More work on assets (not compiling)

This commit is contained in:
Emil Axelsson
2017-11-14 17:23:58 +01:00
parent 6a4f938bac
commit ce9b75117e
8 changed files with 168 additions and 94 deletions

View File

@@ -53,12 +53,18 @@ public:
Loaded,
LoadingFailed,
Synchronized,
SynchronizatoinFailed,
SynchronizationFailed,
Initialized,
InitializationFailed
};
bool update();
std::shared_ptr<Asset> updateLoadState(std::string path, AssetState targetState);
void updateSyncState(Asset* asset, AssetState targetState);
void handleSyncStateChange(AssetSynchronizer::StateChange stateChange);
void setTargetAssetState(const std::string& path, AssetState targetState);
AssetState currentAssetState(Asset* asset);
void clearAllTargetAssets();

View File

@@ -46,32 +46,29 @@ class AssetSynchronizer {
public:
enum class SynchronizationState : int {
Unknown,
Added,
Synchronizing,
Synchronized
Resolved,
Rejected
};
AssetSynchronizer();
void addAsset(std::shared_ptr<Asset> asset);
void removeAsset(Asset* asset);
void syncAsset(Asset* asset);
SynchronizationState assetState(Asset* asset);
float assetProgress(Asset* asset);
void syncUnsynced();
std::vector<std::shared_ptr<Asset>> getSynchronizedAssets();
private:
bool assetIsSynchronized(Asset* asset);
struct AssetSynchronization {
struct StateChange {
std::shared_ptr<Asset> asset;
SynchronizationState state;
};
std::unordered_map<Asset*, AssetSynchronization> _managedAssets;
AssetSynchronizer();
void startSync(std::shared_ptr<Asset> asset);
void cancelSync(Asset* asset);
SynchronizationState assetState(Asset* asset);
float assetProgress(Asset* asset);
std::vector<StateChange> getStateChanges();
private:
std::unordered_map<Asset*, std::shared_ptr<Asset>> _synchronizingAssets;
std::unordered_map<Asset*, StateChange> _stateChanges;
std::unordered_map<ResourceSynchronization*, Asset*> _resourceToAssetMap;
std::vector<Asset*> _trivialSynchronizations;
};
} // namespace openspace

View File

@@ -111,15 +111,29 @@ std::string TorrentSynchronization::directory() {
}
void TorrentSynchronization::start() {
size_t torrentId = _torrentClient->addMagnetLink(_magnetLink, directory());
resolve();
return;
if (_enabled) {
return;
}
_enabled = true;
_torrentId = _torrentClient->addMagnetLink(
_magnetLink,
directory(),
[this](TorrentClient::Progress p) {
updateTorrentProgress(p);
}
);
}
void TorrentSynchronization::cancel() {
if (_enabled) {
_torrentClient->removeTorrent(_torrentId);
_enabled = false;
}
}
void TorrentSynchronization::clear() {
// Todo: remove directory!
}
@@ -135,4 +149,8 @@ bool TorrentSynchronization::nTotalBytesIsKnown() {
return false;
}
void TorrentSynchronization::updateTorrentProgress(TorrentClient::Progress p) {
// TODO: implement this
}
} // namespace openspace

View File

@@ -25,6 +25,8 @@
#ifndef __OPENSPACE_MODULE_SYNC___TORRENTSYNCHRONIZATION___H__
#define __OPENSPACE_MODULE_SYNC___TORRENTSYNCHRONIZATION___H__
#include <modules/sync/torrentclient.h>
#include <openspace/util/resourcesynchronization.h>
#include <openspace/documentation/documentation.h>
@@ -54,6 +56,10 @@ public:
bool nTotalBytesIsKnown() override;
private:
void updateTorrentProgress(TorrentClient::Progress p);
std::atomic_bool _enabled = false;
size_t _torrentId;
std::string uniformResourceName() const;
std::string _identifier;
std::string _magnetLink;

View File

@@ -101,7 +101,7 @@ void TorrentClient::pollAlerts() {
}
}
size_t TorrentClient::addTorrentFile(std::string torrentFile, std::string destination) {
size_t TorrentClient::addTorrentFile(std::string torrentFile, std::string destination, TorrentProgressCallback cb) {
if (!_session) {
LERROR("Torrent session not initialized when adding torrent");
return -1;
@@ -122,7 +122,8 @@ size_t TorrentClient::addTorrentFile(std::string torrentFile, std::string destin
return id;
}
size_t TorrentClient::addMagnetLink(std::string magnetLink, std::string destination) {
size_t TorrentClient::addMagnetLink(std::string magnetLink, std::string destination, TorrentProgressCallback cb) {
// TODO: register callback!
if (!_session) {
LERROR("Torrent session not initialized when adding torrent");
return -1;

View File

@@ -48,11 +48,17 @@ public:
libtorrent::torrent_handle handle;
};
struct Progress {
};
using TorrentProgressCallback = std::function<void(Progress)>;
TorrentClient();
~TorrentClient();
void initialize();
size_t addTorrentFile(std::string torrentFile, std::string destination);
size_t addMagnetLink(std::string magnetLink, std::string destination);
size_t addTorrentFile(std::string torrentFile, std::string destination, TorrentProgressCallback cb);
size_t addMagnetLink(std::string magnetLink, std::string destination, TorrentProgressCallback cb);
void removeTorrent(size_t id);
void pollAlerts();
private:

View File

@@ -43,75 +43,105 @@ AssetManager::AssetManager(std::unique_ptr<AssetLoader> loader,
, _assetSynchronizer(std::move(synchronizer))
{}
bool AssetManager::update() {
bool changedInititializations = false;
std::unordered_map<std::string, std::shared_ptr<Asset>> loadedAssets;
// Load and unload assets
for (const auto& c : _pendingStateChangeCommands) {
const std::string& path = c.first;
const AssetState targetState = c.second;
const bool shouldBeLoaded = targetState != AssetState::Unloaded;
const std::shared_ptr<Asset> asset = _assetLoader->loadedAsset(path);
const bool isLoaded = asset != nullptr;
if (isLoaded && !shouldBeLoaded) {
_currentStates.erase(asset.get());
_assetLoader->unloadAsset(asset.get());
} else if (!isLoaded && shouldBeLoaded) {
std::shared_ptr<Asset> loadedAsset = tryLoadAsset(path);
if (loadedAsset) {
loadedAssets.emplace(path, loadedAsset);
_currentStates[loadedAsset.get()] = AssetState::Loaded;
} else {
_currentStates[loadedAsset.get()] = AssetState::LoadingFailed;
}
std::shared_ptr<Asset> asset = updateLoadState(path, targetState);
if (asset) {
loadedAssets.emplace(path, asset);
}
}
// Collect all assets for synchronization
for (const auto& loadedAsset : loadedAssets) {
const AssetState targetState = _pendingStateChangeCommands[loadedAsset.first];
const std::string& path = loadedAsset.first;
const AssetState targetState = _pendingStateChangeCommands[path];
const bool shouldSync =
targetState == AssetState::Synchronized ||
targetState == AssetState::Initialized;
updateSyncState(path, targetState);
}
if (!shouldSync) {
continue;
// Collect assets that were resolved and rejected.
// Initialize if requested.
// Update current state accordingly.
for (const auto& stateChange : _assetSynchronizer->getStateChanges()) {
handleSyncStateChange(stateChange);
}
_pendingStateChangeCommands.clear();
return false;
}
/**
* Load or unload asset depening on target state
* Return shared pointer to asset if this loads the asset
*/
std::shared_ptr<Asset> AssetManager::updateLoadState(std::string path, AssetState targetState) {
const bool shouldBeLoaded = targetState != AssetState::Unloaded;
const std::shared_ptr<Asset> asset = _assetLoader->loadedAsset(path);
const bool isLoaded = asset != nullptr;
if (isLoaded && !shouldBeLoaded) {
_currentStates.erase(asset.get());
_assetLoader->unloadAsset(asset.get());
}
else if (!isLoaded && shouldBeLoaded) {
std::shared_ptr<Asset> loadedAsset = tryLoadAsset(path);
if (loadedAsset) {
_currentStates[loadedAsset.get()] = AssetState::Loaded;
return loadedAsset;
}
else {
_currentStates[loadedAsset.get()] = AssetState::LoadingFailed;
}
}
return nullptr;
}
/**
* Start or cancel synchronizations depending on target state
*/
void AssetManager::updateSyncState(Asset* asset, AssetState targetState) {
const bool shouldSync =
targetState == AssetState::Synchronized ||
targetState == AssetState::Initialized;
if (shouldSync) {
std::vector<std::shared_ptr<Asset>> importedAssets =
loadedAsset.second->allAssets();
for (const auto& a : importedAssets) {
_assetSynchronizer->addAsset(a);
_assetSynchronizer->startSync(a);
_syncAncestors[a].insert(loadedAsset.second);
//_syncDependencies[loadedAsset.second].insert(a);
}
_stateChangesInProgress.emplace(
loadedAsset.second,
_pendingStateChangeCommands[loadedAsset.first]
);
} else {
_assetSynchronizer->cancelSync(a);
// Todo: Also cancel syncing of dependendencies
}
}
// Start asset synchronization. (Async operation)
_assetSynchronizer->syncUnsynced();
// Collect finished synchronizations and initialize assets
std::vector<std::shared_ptr<Asset>> syncedAssets =
_assetSynchronizer->getSynchronizedAssets();
void handleSyncStateChange(AssetSynchronizer::StateChange stateChange) {
for (const auto& syncedAsset : syncedAssets) {
// Retrieve ancestors that were waiting for this asset to sync
const auto it = _syncAncestors.find(syncedAsset);
if (it == _syncAncestors.end()) {
continue; // Should not happen. (No ancestor to this synchronization)
}
std::unordered_set<std::shared_ptr<Asset>>& ancestors = it->second;
// Retrieve ancestors that were waiting for this asset to sync
const auto it = _syncAncestors.find(stateChange.asset);
if (it == _syncAncestors.end()) {
continue; // Should not happen. (No ancestor to this synchronization)
}
std::unordered_set<std::shared_ptr<Asset>>& ancestors = it->second;
if (stateChange.state ==
AssetSynchronizer::SynchronizationState::Resolved)
{
for (const auto& ancestor : ancestors) {
const bool initReady = ancestor->isInitReady();
@@ -124,19 +154,27 @@ bool AssetManager::update() {
if (tryInitializeAsset(*ancestor)) {
changedInititializations = true;
_currentStates[ancestor.get()] = AssetState::Initialized;
} else {
}
else {
_currentStates[ancestor.get()] = AssetState::InitializationFailed;
}
} else {
}
else {
_currentStates[ancestor.get()] = AssetState::Synchronized;
}
}
}
_syncAncestors.erase(syncedAsset);
}
else if (stateChange.state ==
AssetSynchronizer::SynchronizationState::Rejected)
{
for (const auto& ancestor : ancestors) {
_currentStates[ancestor.get()] = AssetState::SynchronizationFailed;
}
}
_pendingStateChangeCommands.clear();
return changedInititializations;
_syncAncestors.erase(stateChange.asset);
}
void AssetManager::setTargetAssetState(const std::string& path, AssetState targetState) {

View File

@@ -34,7 +34,7 @@ namespace {
namespace openspace {
AssetSynchronizer::AssetSynchronizer() {}
/*
void AssetSynchronizer::addAsset(std::shared_ptr<Asset> asset) {
_managedAssets.emplace(asset.get(),
AssetSynchronization{ asset, SynchronizationState::Added }
@@ -57,17 +57,20 @@ void AssetSynchronizer::removeAsset(Asset* asset) {
}
_managedAssets.erase(asset);
}
}*/
void AssetSynchronizer::syncAsset(Asset* asset) {
void AssetSynchronizer::startSync(std::shared_ptr<Asset> asset) {
std::vector<std::shared_ptr<ResourceSynchronization>> resourceSyncs =
asset->synchronizations();
if (resourceSyncs.empty()) {
_trivialSynchronizations.push_back(asset);
_stateChanges.emplace(
asset.get(),
StateChange{ asset, SynchronizationState::Resolved }
);
}
_managedAssets[asset].state = SynchronizationState::Synchronizing;
_synchronizingAssets[asset.get()] = asset;
for (const auto& s : resourceSyncs) {
if (!s->isResolved()) {
@@ -76,22 +79,10 @@ void AssetSynchronizer::syncAsset(Asset* asset) {
}
}
void AssetSynchronizer::syncUnsynced() {
for (auto& it : _managedAssets) {
if (it.second.state == SynchronizationState::Added) {
syncAsset(it.first);
}
}
void AssetSynchroinier::cancelSync(Asset* asset) {
// Todo: cancel sync
}
AssetSynchronizer::SynchronizationState AssetSynchronizer::assetState(Asset* asset) {
auto it = _managedAssets.find(asset);
if (it == _managedAssets.end()) {
return SynchronizationState::Unknown;
}
return it->second.state;
}
/*
float AssetSynchronizer::assetProgress(Asset* asset) {
auto it = _managedAssets.find(asset);
if (it == _managedAssets.end()) {
@@ -118,13 +109,16 @@ float AssetSynchronizer::assetProgress(Asset* asset) {
return static_cast<float>(nSyncedBytes)/static_cast<float>(nTotalBytes);
}
*/
std::vector<std::shared_ptr<Asset>> AssetSynchronizer::getSynchronizedAssets() {
std::vector<AssetSynchronizer::StateChange> AssetSynchronizer::getStateChanges() {
/*
std::vector<std::shared_ptr<ResourceSynchronization>> finishedResourceSyncs;
for (auto a : _managedAssets) {
std::vector<std::shared_ptr<ResourceSynchronization>> syncs = a.first->synchronizations();
for (auto s : syncs) {
if (s->isResolved()) {
if (s->isResolved() || s->isRejected()) {
finishedResourceSyncs.push_back(s);
}
}
@@ -161,8 +155,16 @@ std::vector<std::shared_ptr<Asset>> AssetSynchronizer::getSynchronizedAssets() {
_trivialSynchronizations.clear();
return synchronizedAssets;
*/
std::vector<StateChange> stateChangesVector;
for (auto& s : _stateChanges) {
stateChangesVector.push_back(std::move(s.second));
}
_stateChanges.clear();
return stateChangesVector;
}
/*
bool AssetSynchronizer::assetIsSynchronized(Asset * asset) {
std::vector<std::shared_ptr<ResourceSynchronization>> syncs = asset->synchronizations();
for (const auto& s : syncs) {
@@ -171,6 +173,6 @@ bool AssetSynchronizer::assetIsSynchronized(Asset * asset) {
}
}
return true;
}
}*/
}