From ce9b75117e762dde9ec5d2cbce3f463233b6c2f1 Mon Sep 17 00:00:00 2001 From: Emil Axelsson Date: Tue, 14 Nov 2017 17:23:58 +0100 Subject: [PATCH] More work on assets (not compiling) --- include/openspace/scene/assetmanager.h | 8 +- include/openspace/scene/assetsynchronizer.h | 33 ++--- modules/sync/syncs/torrentsynchronization.cpp | 24 +++- modules/sync/syncs/torrentsynchronization.h | 6 + modules/sync/torrentclient.cpp | 5 +- modules/sync/torrentclient.h | 10 +- src/scene/assetmanager.cpp | 128 ++++++++++++------ src/scene/assetsynchronizer.cpp | 48 +++---- 8 files changed, 168 insertions(+), 94 deletions(-) diff --git a/include/openspace/scene/assetmanager.h b/include/openspace/scene/assetmanager.h index 84a76a903b..b7802f85bd 100644 --- a/include/openspace/scene/assetmanager.h +++ b/include/openspace/scene/assetmanager.h @@ -53,12 +53,18 @@ public: Loaded, LoadingFailed, Synchronized, - SynchronizatoinFailed, + SynchronizationFailed, Initialized, InitializationFailed }; bool update(); + + + std::shared_ptr updateLoadState(std::string path, AssetState targetState); + void updateSyncState(Asset* asset, AssetState targetState); + void handleSyncStateChange(AssetSynchronizer::StateChange stateChange); + void setTargetAssetState(const std::string& path, AssetState targetState); AssetState currentAssetState(Asset* asset); void clearAllTargetAssets(); diff --git a/include/openspace/scene/assetsynchronizer.h b/include/openspace/scene/assetsynchronizer.h index f21013e5a4..5dd58d5196 100644 --- a/include/openspace/scene/assetsynchronizer.h +++ b/include/openspace/scene/assetsynchronizer.h @@ -46,32 +46,29 @@ class AssetSynchronizer { public: enum class SynchronizationState : int { Unknown, - Added, Synchronizing, - Synchronized + Resolved, + Rejected }; - AssetSynchronizer(); - void addAsset(std::shared_ptr asset); - void removeAsset(Asset* asset); - void syncAsset(Asset* asset); - SynchronizationState assetState(Asset* asset); - float assetProgress(Asset* asset); - void syncUnsynced(); - - std::vector> getSynchronizedAssets(); - -private: - bool assetIsSynchronized(Asset* asset); - - struct AssetSynchronization { + struct StateChange { std::shared_ptr asset; SynchronizationState state; }; - std::unordered_map _managedAssets; + AssetSynchronizer(); + void startSync(std::shared_ptr asset); + void cancelSync(Asset* asset); + + SynchronizationState assetState(Asset* asset); + float assetProgress(Asset* asset); + + std::vector getStateChanges(); + +private: + std::unordered_map> _synchronizingAssets; + std::unordered_map _stateChanges; std::unordered_map _resourceToAssetMap; - std::vector _trivialSynchronizations; }; } // namespace openspace diff --git a/modules/sync/syncs/torrentsynchronization.cpp b/modules/sync/syncs/torrentsynchronization.cpp index f47985f257..73a8ca80fc 100644 --- a/modules/sync/syncs/torrentsynchronization.cpp +++ b/modules/sync/syncs/torrentsynchronization.cpp @@ -111,15 +111,29 @@ std::string TorrentSynchronization::directory() { } void TorrentSynchronization::start() { - size_t torrentId = _torrentClient->addMagnetLink(_magnetLink, directory()); - resolve(); - return; + if (_enabled) { + return; + } + _enabled = true; + _torrentId = _torrentClient->addMagnetLink( + _magnetLink, + directory(), + [this](TorrentClient::Progress p) { + updateTorrentProgress(p); + } + ); + } void TorrentSynchronization::cancel() { + if (_enabled) { + _torrentClient->removeTorrent(_torrentId); + _enabled = false; + } } void TorrentSynchronization::clear() { + // Todo: remove directory! } @@ -135,4 +149,8 @@ bool TorrentSynchronization::nTotalBytesIsKnown() { return false; } +void TorrentSynchronization::updateTorrentProgress(TorrentClient::Progress p) { + // TODO: implement this +} + } // namespace openspace diff --git a/modules/sync/syncs/torrentsynchronization.h b/modules/sync/syncs/torrentsynchronization.h index ac30b85d9f..3ed2a125f3 100644 --- a/modules/sync/syncs/torrentsynchronization.h +++ b/modules/sync/syncs/torrentsynchronization.h @@ -25,6 +25,8 @@ #ifndef __OPENSPACE_MODULE_SYNC___TORRENTSYNCHRONIZATION___H__ #define __OPENSPACE_MODULE_SYNC___TORRENTSYNCHRONIZATION___H__ +#include + #include #include @@ -54,6 +56,10 @@ public: bool nTotalBytesIsKnown() override; private: + void updateTorrentProgress(TorrentClient::Progress p); + + std::atomic_bool _enabled = false; + size_t _torrentId; std::string uniformResourceName() const; std::string _identifier; std::string _magnetLink; diff --git a/modules/sync/torrentclient.cpp b/modules/sync/torrentclient.cpp index 86634583a2..e1b20d2f43 100644 --- a/modules/sync/torrentclient.cpp +++ b/modules/sync/torrentclient.cpp @@ -101,7 +101,7 @@ void TorrentClient::pollAlerts() { } } -size_t TorrentClient::addTorrentFile(std::string torrentFile, std::string destination) { +size_t TorrentClient::addTorrentFile(std::string torrentFile, std::string destination, TorrentProgressCallback cb) { if (!_session) { LERROR("Torrent session not initialized when adding torrent"); return -1; @@ -122,7 +122,8 @@ size_t TorrentClient::addTorrentFile(std::string torrentFile, std::string destin return id; } -size_t TorrentClient::addMagnetLink(std::string magnetLink, std::string destination) { +size_t TorrentClient::addMagnetLink(std::string magnetLink, std::string destination, TorrentProgressCallback cb) { + // TODO: register callback! if (!_session) { LERROR("Torrent session not initialized when adding torrent"); return -1; diff --git a/modules/sync/torrentclient.h b/modules/sync/torrentclient.h index b9b3ad85b4..798692d7c4 100644 --- a/modules/sync/torrentclient.h +++ b/modules/sync/torrentclient.h @@ -48,11 +48,17 @@ public: libtorrent::torrent_handle handle; }; + struct Progress { + + }; + + using TorrentProgressCallback = std::function; + TorrentClient(); ~TorrentClient(); void initialize(); - size_t addTorrentFile(std::string torrentFile, std::string destination); - size_t addMagnetLink(std::string magnetLink, std::string destination); + size_t addTorrentFile(std::string torrentFile, std::string destination, TorrentProgressCallback cb); + size_t addMagnetLink(std::string magnetLink, std::string destination, TorrentProgressCallback cb); void removeTorrent(size_t id); void pollAlerts(); private: diff --git a/src/scene/assetmanager.cpp b/src/scene/assetmanager.cpp index 08fa74d597..07e3a96529 100644 --- a/src/scene/assetmanager.cpp +++ b/src/scene/assetmanager.cpp @@ -43,75 +43,105 @@ AssetManager::AssetManager(std::unique_ptr loader, , _assetSynchronizer(std::move(synchronizer)) {} + bool AssetManager::update() { - bool changedInititializations = false; - std::unordered_map> loadedAssets; - // Load and unload assets for (const auto& c : _pendingStateChangeCommands) { const std::string& path = c.first; const AssetState targetState = c.second; - const bool shouldBeLoaded = targetState != AssetState::Unloaded; - - const std::shared_ptr asset = _assetLoader->loadedAsset(path); - const bool isLoaded = asset != nullptr; - - if (isLoaded && !shouldBeLoaded) { - _currentStates.erase(asset.get()); - _assetLoader->unloadAsset(asset.get()); - - } else if (!isLoaded && shouldBeLoaded) { - std::shared_ptr loadedAsset = tryLoadAsset(path); - if (loadedAsset) { - loadedAssets.emplace(path, loadedAsset); - _currentStates[loadedAsset.get()] = AssetState::Loaded; - } else { - _currentStates[loadedAsset.get()] = AssetState::LoadingFailed; - } + std::shared_ptr asset = updateLoadState(path, targetState); + if (asset) { + loadedAssets.emplace(path, asset); } } // Collect all assets for synchronization for (const auto& loadedAsset : loadedAssets) { - const AssetState targetState = _pendingStateChangeCommands[loadedAsset.first]; + const std::string& path = loadedAsset.first; + const AssetState targetState = _pendingStateChangeCommands[path]; - const bool shouldSync = - targetState == AssetState::Synchronized || - targetState == AssetState::Initialized; + updateSyncState(path, targetState); + } - if (!shouldSync) { - continue; + // Collect assets that were resolved and rejected. + // Initialize if requested. + // Update current state accordingly. + for (const auto& stateChange : _assetSynchronizer->getStateChanges()) { + handleSyncStateChange(stateChange); + } + + _pendingStateChangeCommands.clear(); + return false; +} + +/** + * Load or unload asset depening on target state + * Return shared pointer to asset if this loads the asset + */ +std::shared_ptr AssetManager::updateLoadState(std::string path, AssetState targetState) { + const bool shouldBeLoaded = targetState != AssetState::Unloaded; + + const std::shared_ptr asset = _assetLoader->loadedAsset(path); + const bool isLoaded = asset != nullptr; + + if (isLoaded && !shouldBeLoaded) { + _currentStates.erase(asset.get()); + _assetLoader->unloadAsset(asset.get()); + } + else if (!isLoaded && shouldBeLoaded) { + std::shared_ptr loadedAsset = tryLoadAsset(path); + if (loadedAsset) { + _currentStates[loadedAsset.get()] = AssetState::Loaded; + return loadedAsset; } + else { + _currentStates[loadedAsset.get()] = AssetState::LoadingFailed; + } + } + return nullptr; +} +/** + * Start or cancel synchronizations depending on target state + */ +void AssetManager::updateSyncState(Asset* asset, AssetState targetState) { + const bool shouldSync = + targetState == AssetState::Synchronized || + targetState == AssetState::Initialized; + + if (shouldSync) { std::vector> importedAssets = loadedAsset.second->allAssets(); for (const auto& a : importedAssets) { - _assetSynchronizer->addAsset(a); + _assetSynchronizer->startSync(a); _syncAncestors[a].insert(loadedAsset.second); + //_syncDependencies[loadedAsset.second].insert(a); } _stateChangesInProgress.emplace( loadedAsset.second, _pendingStateChangeCommands[loadedAsset.first] ); + } else { + _assetSynchronizer->cancelSync(a); + // Todo: Also cancel syncing of dependendencies } +} - // Start asset synchronization. (Async operation) - _assetSynchronizer->syncUnsynced(); - - // Collect finished synchronizations and initialize assets - std::vector> syncedAssets = - _assetSynchronizer->getSynchronizedAssets(); +void handleSyncStateChange(AssetSynchronizer::StateChange stateChange) { - for (const auto& syncedAsset : syncedAssets) { - // Retrieve ancestors that were waiting for this asset to sync - const auto it = _syncAncestors.find(syncedAsset); - if (it == _syncAncestors.end()) { - continue; // Should not happen. (No ancestor to this synchronization) - } - std::unordered_set>& ancestors = it->second; + // Retrieve ancestors that were waiting for this asset to sync + const auto it = _syncAncestors.find(stateChange.asset); + if (it == _syncAncestors.end()) { + continue; // Should not happen. (No ancestor to this synchronization) + } + std::unordered_set>& ancestors = it->second; + + if (stateChange.state == + AssetSynchronizer::SynchronizationState::Resolved) + { for (const auto& ancestor : ancestors) { const bool initReady = ancestor->isInitReady(); @@ -124,19 +154,27 @@ bool AssetManager::update() { if (tryInitializeAsset(*ancestor)) { changedInititializations = true; _currentStates[ancestor.get()] = AssetState::Initialized; - } else { + } + else { _currentStates[ancestor.get()] = AssetState::InitializationFailed; } - } else { + } + else { _currentStates[ancestor.get()] = AssetState::Synchronized; } } } - _syncAncestors.erase(syncedAsset); + + } + else if (stateChange.state == + AssetSynchronizer::SynchronizationState::Rejected) + { + for (const auto& ancestor : ancestors) { + _currentStates[ancestor.get()] = AssetState::SynchronizationFailed; + } } - _pendingStateChangeCommands.clear(); - return changedInititializations; + _syncAncestors.erase(stateChange.asset); } void AssetManager::setTargetAssetState(const std::string& path, AssetState targetState) { diff --git a/src/scene/assetsynchronizer.cpp b/src/scene/assetsynchronizer.cpp index 7add472e21..1f52d48360 100644 --- a/src/scene/assetsynchronizer.cpp +++ b/src/scene/assetsynchronizer.cpp @@ -34,7 +34,7 @@ namespace { namespace openspace { AssetSynchronizer::AssetSynchronizer() {} - +/* void AssetSynchronizer::addAsset(std::shared_ptr asset) { _managedAssets.emplace(asset.get(), AssetSynchronization{ asset, SynchronizationState::Added } @@ -57,17 +57,20 @@ void AssetSynchronizer::removeAsset(Asset* asset) { } _managedAssets.erase(asset); -} +}*/ -void AssetSynchronizer::syncAsset(Asset* asset) { +void AssetSynchronizer::startSync(std::shared_ptr asset) { std::vector> resourceSyncs = asset->synchronizations(); if (resourceSyncs.empty()) { - _trivialSynchronizations.push_back(asset); + _stateChanges.emplace( + asset.get(), + StateChange{ asset, SynchronizationState::Resolved } + ); } - _managedAssets[asset].state = SynchronizationState::Synchronizing; + _synchronizingAssets[asset.get()] = asset; for (const auto& s : resourceSyncs) { if (!s->isResolved()) { @@ -76,22 +79,10 @@ void AssetSynchronizer::syncAsset(Asset* asset) { } } -void AssetSynchronizer::syncUnsynced() { - for (auto& it : _managedAssets) { - if (it.second.state == SynchronizationState::Added) { - syncAsset(it.first); - } - } +void AssetSynchroinier::cancelSync(Asset* asset) { + // Todo: cancel sync } - -AssetSynchronizer::SynchronizationState AssetSynchronizer::assetState(Asset* asset) { - auto it = _managedAssets.find(asset); - if (it == _managedAssets.end()) { - return SynchronizationState::Unknown; - } - return it->second.state; -} - +/* float AssetSynchronizer::assetProgress(Asset* asset) { auto it = _managedAssets.find(asset); if (it == _managedAssets.end()) { @@ -118,13 +109,16 @@ float AssetSynchronizer::assetProgress(Asset* asset) { return static_cast(nSyncedBytes)/static_cast(nTotalBytes); } +*/ -std::vector> AssetSynchronizer::getSynchronizedAssets() { +std::vector AssetSynchronizer::getStateChanges() { + /* std::vector> finishedResourceSyncs; + for (auto a : _managedAssets) { std::vector> syncs = a.first->synchronizations(); for (auto s : syncs) { - if (s->isResolved()) { + if (s->isResolved() || s->isRejected()) { finishedResourceSyncs.push_back(s); } } @@ -161,8 +155,16 @@ std::vector> AssetSynchronizer::getSynchronizedAssets() { _trivialSynchronizations.clear(); return synchronizedAssets; + */ + std::vector stateChangesVector; + for (auto& s : _stateChanges) { + stateChangesVector.push_back(std::move(s.second)); + } + _stateChanges.clear(); + return stateChangesVector; } +/* bool AssetSynchronizer::assetIsSynchronized(Asset * asset) { std::vector> syncs = asset->synchronizations(); for (const auto& s : syncs) { @@ -171,6 +173,6 @@ bool AssetSynchronizer::assetIsSynchronized(Asset * asset) { } } return true; -} +}*/ }