From 1926b59f9195ff2cd9d0cf55d6ea5ae725263794 Mon Sep 17 00:00:00 2001 From: Emil Axelsson Date: Tue, 12 Dec 2017 18:36:09 +0100 Subject: [PATCH] Thread safety in torrentclient. Add syncfile to torrentsync. --- modules/sync/syncs/torrentsynchronization.cpp | 22 +++++++++++++- modules/sync/syncs/torrentsynchronization.h | 2 ++ modules/sync/torrentclient.cpp | 25 ++++++++++++++-- modules/sync/torrentclient.h | 2 ++ src/scene/asset.cpp | 29 +++++++++++++++---- src/scene/assetmanager.cpp | 6 ++-- 6 files changed, 75 insertions(+), 11 deletions(-) diff --git a/modules/sync/syncs/torrentsynchronization.cpp b/modules/sync/syncs/torrentsynchronization.cpp index a6e5e03929..84d27ef13d 100644 --- a/modules/sync/syncs/torrentsynchronization.cpp +++ b/modules/sync/syncs/torrentsynchronization.cpp @@ -34,6 +34,8 @@ #include #include +#include + namespace { const char* _loggerCat = "TorrentSynchronization"; @@ -115,6 +117,11 @@ void TorrentSynchronization::start() { return; } begin(); + + if (hasSyncFile()) { + resolve(); + } + _enabled = true; _torrentId = _torrentClient->addMagnetLink( _magnetLink, @@ -123,7 +130,6 @@ void TorrentSynchronization::start() { updateTorrentProgress(p); } ); - } void TorrentSynchronization::cancel() { @@ -139,6 +145,19 @@ void TorrentSynchronization::clear() { cancel(); } +bool TorrentSynchronization::hasSyncFile() { + std::string path = directory() + ".ossync"; + return FileSys.fileExists(path); +} + +void TorrentSynchronization::createSyncFile() { + std::string dir = directory(); + std::string filepath = dir + ".ossync"; + FileSys.createDirectory(dir, ghoul::filesystem::Directory::Recursive::Yes); + std::ofstream syncFile(filepath, std::ofstream::out); + syncFile << "Synchronized"; + syncFile.close(); +} size_t TorrentSynchronization::nSynchronizedBytes() { return _progress.nDownloadedBytes; @@ -155,6 +174,7 @@ bool TorrentSynchronization::nTotalBytesIsKnown() { void TorrentSynchronization::updateTorrentProgress(TorrentClient::TorrentProgress progress) { _progress = progress; if (progress.finished && state() == State::Syncing) { + createSyncFile(); resolve(); } } diff --git a/modules/sync/syncs/torrentsynchronization.h b/modules/sync/syncs/torrentsynchronization.h index 3392533cd9..4865b47bb5 100644 --- a/modules/sync/syncs/torrentsynchronization.h +++ b/modules/sync/syncs/torrentsynchronization.h @@ -58,6 +58,8 @@ public: private: void updateTorrentProgress(TorrentClient::TorrentProgress p); std::string uniformResourceName() const; + bool hasSyncFile(); + void createSyncFile(); std::atomic_bool _enabled = false; size_t _torrentId; diff --git a/modules/sync/torrentclient.cpp b/modules/sync/torrentclient.cpp index 4492b2a990..812f2317d1 100644 --- a/modules/sync/torrentclient.cpp +++ b/modules/sync/torrentclient.cpp @@ -87,8 +87,11 @@ void TorrentClient::initialize() { void TorrentClient::pollAlerts() { std::vector alerts; - _session->pop_alerts(&alerts); - + { + std::lock_guard guard(_mutex); + _session->pop_alerts(&alerts); + } + for (lt::alert const* a : alerts) { //LINFO(a->message()); // if we receive the finished alert or an error, we're done @@ -111,6 +114,8 @@ void TorrentClient::pollAlerts() { } size_t TorrentClient::addTorrentFile(std::string torrentFile, std::string destination, TorrentProgressCallback cb) { + std::lock_guard guard(_mutex); + if (!_session) { LERROR("Torrent session not initialized when adding torrent"); return -1; @@ -132,6 +137,8 @@ size_t TorrentClient::addTorrentFile(std::string torrentFile, std::string destin } size_t TorrentClient::addMagnetLink(std::string magnetLink, std::string destination, TorrentProgressCallback cb) { + std::lock_guard guard(_mutex); + // TODO: register callback! if (!_session) { LERROR("Torrent session not initialized when adding torrent"); @@ -153,9 +160,23 @@ size_t TorrentClient::addMagnetLink(std::string magnetLink, std::string destinat } void TorrentClient::removeTorrent(TorrentId id) { + std::lock_guard guard(_mutex); + + const auto it = _torrents.find(id); + if (it == _torrents.end()) { + return; + } + + libtorrent::torrent_handle h = it->second.handle; + _session->remove_torrent(h); + + _torrents.erase(it); + } void TorrentClient::notify(TorrentId id) { + std::lock_guard guard(_mutex); + const auto& torrent = _torrents.find(id); if (torrent == _torrents.end()) { return; diff --git a/modules/sync/torrentclient.h b/modules/sync/torrentclient.h index 46b8397590..2dcbfd2256 100644 --- a/modules/sync/torrentclient.h +++ b/modules/sync/torrentclient.h @@ -29,6 +29,7 @@ #include #include #include +#include #include #include "libtorrent/torrent_handle.hpp" @@ -73,6 +74,7 @@ private: std::unordered_map _torrents; std::unique_ptr _session; std::thread _torrentThread; + std::mutex _mutex; std::atomic_bool _keepRunning = true; }; diff --git a/src/scene/asset.cpp b/src/scene/asset.cpp index f382d7f6d6..bba05fc657 100644 --- a/src/scene/asset.cpp +++ b/src/scene/asset.cpp @@ -108,9 +108,6 @@ void Asset::requiredAssetChangedState(std::shared_ptr child) { void Asset::requestedAssetChangedState(std::shared_ptr child) { State childState = child->state(); - - LINFO("requestedAssetChangedState: " << child->id() << " to " << static_cast(child->state())); - if (child->hasInitializedParent()) { if (childState == State::Loaded) { child->startSynchronizations(); @@ -340,7 +337,7 @@ bool Asset::startSynchronizations() { bool Asset::cancelAllSynchronizations() { bool cancelledAnySync = false; - for (auto& child : requiredAssets()) { + for (auto& child : childAssets()) { bool cancelled = child->cancelAllSynchronizations(); if (cancelled) { cancelledAnySync = true; @@ -360,7 +357,27 @@ bool Asset::cancelAllSynchronizations() { } bool Asset::cancelUnwantedSynchronizations() { - return false; + if (hasSyncingOrResolvedParent()) { + return false; + } + bool cancelledAnySync = false; + for (auto& child : childAssets()) { + bool cancelled = child->cancelUnwantedSynchronizations(); + if (cancelled) { + cancelledAnySync = true; + } + } + for (const auto& s : ownSynchronizations()) { + if (s->isSyncing()) { + cancelledAnySync = true; + s->cancel(); + setState(State::Loaded); + } + } + if (cancelledAnySync) { + setState(State::Loaded); + } + return cancelledAnySync; } bool Asset::restartAllSynchronizations() { @@ -522,6 +539,8 @@ void Asset::deinitialize() { return; } + LDEBUG("Denitializing asset " << id()); + // Notify children for (auto& dependency : _requiredAssets) { try { diff --git a/src/scene/assetmanager.cpp b/src/scene/assetmanager.cpp index 4de2d0eac4..1022ac1790 100644 --- a/src/scene/assetmanager.cpp +++ b/src/scene/assetmanager.cpp @@ -82,15 +82,15 @@ bool AssetManager::update() { } void AssetManager::assetStateChanged(std::shared_ptr asset, Asset::State state) { - LINFO(asset->id() << " changed state to " << static_cast(state)); + //LINFO(asset->id() << " changed state to " << static_cast(state)); } void AssetManager::assetRequested(std::shared_ptr parent, std::shared_ptr child) { - LINFO(parent->id() << " requested " << child->id()); + //LINFO(parent->id() << " requested " << child->id()); } void AssetManager::assetUnrequested(std::shared_ptr parent, std::shared_ptr child) { - LINFO(parent->id() << " unrequested " << child->id()); + //LINFO(parent->id() << " unrequested " << child->id()); } void AssetManager::add(const std::string& path) {