Asset syncing

This commit is contained in:
Emil Axelsson
2017-11-15 13:12:10 +01:00
parent 6b182bdfad
commit 27823e9af6
10 changed files with 227 additions and 100 deletions

View File

@@ -70,7 +70,6 @@ public:
void addSynchronization(std::shared_ptr<ResourceSynchronization> synchronization);
std::vector<std::shared_ptr<ResourceSynchronization>> synchronizations();
std::vector<std::shared_ptr<Asset>> allActiveAssets();
std::vector<std::shared_ptr<Asset>> allAssets();
bool isInitReady() const;

View File

@@ -41,6 +41,16 @@ namespace openspace {
class Asset;
/**
* Interface for managing assets.
* The asset manager interface is only concerned with "top level" assets,
* i.e. assets that are loaded using setTargetAssetState, and not their dependencies.
* However, an asset is not considered synchronized before all its deps are
* synchronized.
* Also, setting a target state of an asset to Unloaded will only unload an asset
* from the system if it is not a dependency of a loaded asset.
*/
class AssetManager {
public:
AssetManager(
@@ -65,12 +75,12 @@ public:
};
bool update();
std::shared_ptr<Asset> updateLoadState(std::string path, AssetState targetState);
void updateSyncState(Asset* asset, AssetState targetState);
void handleSyncStateChange(AssetSynchronizer::StateChange stateChange);
void setTargetAssetState(const std::string& path, AssetState targetState);
void setTargetAssetState(Asset* asset, AssetState targetState);
AssetState currentAssetState(Asset* asset);
AssetState currentAssetState(const std::string& path);
void clearAllTargetAssets();
std::vector<std::shared_ptr<Asset>> loadedAssets();
scripting::LuaLibrary luaLibrary();

View File

@@ -45,7 +45,7 @@ namespace openspace {
class AssetSynchronizer {
public:
enum class SynchronizationState : int {
Unknown,
Unsynced,
Synchronizing,
Resolved,
Rejected
@@ -56,9 +56,15 @@ public:
SynchronizationState state;
};
struct AssetResourceSync {
std::shared_ptr<Asset> asset;
std::shared_ptr<ResourceSynchronization> sync;
};
AssetSynchronizer();
void startSync(std::shared_ptr<Asset> asset);
void cancelSync(Asset* asset);
void cancelSync(std::shared_ptr<Asset> asset);
void restartSync(std::shared_ptr<Asset> asset);
SynchronizationState assetState(Asset* asset);
float assetProgress(Asset* asset);
@@ -66,9 +72,13 @@ public:
std::vector<StateChange> getStateChanges();
private:
std::unordered_map<Asset*, std::shared_ptr<Asset>> _synchronizingAssets;
void startAssetResourceSync(std::shared_ptr<Asset> a, std::shared_ptr<ResourceSynchronization> rs);
void cancelAssetResourceSync(std::shared_ptr<Asset> a, std::shared_ptr<ResourceSynchronization> rs);
void setState(std::shared_ptr<Asset> a, SynchronizationState state);
std::vector<std::shared_ptr<Asset>> _synchronizingAssets;
std::unordered_map<Asset*, StateChange> _stateChanges;
std::unordered_map<ResourceSynchronization*, Asset*> _resourceToAssetMap;
std::unordered_map<ResourceSynchronization*, std::vector<Asset*>> _resourceToAssetMap;
};
} // namespace openspace

View File

@@ -34,14 +34,20 @@
namespace openspace {
class TorrentClient;
class ResourceSynchronization;
class ResourceSynchronization
: public std::enable_shared_from_this<ResourceSynchronization>
{
public:
enum class State : int {
Unsynced,
Syncing,
Resolved,
Rejected
};
using CallbackHandle = size_t;
using StateChangeCallback = std::function<void(State)>;
static documentation::Documentation Documentation();
static std::unique_ptr<ResourceSynchronization> createFromDictionary(
const ghoul::Dictionary& dictionary);
@@ -60,14 +66,25 @@ public:
void wait();
bool isResolved();
bool isRejected();
bool isSyncing();
CallbackHandle addStateChangeCallback(StateChangeCallback cb);
void removeStateChangeCallback(CallbackHandle id);
protected:
void resolve();
void reject();
void reset();
void begin();
void updateProgress(float t);
private:
std::atomic<bool> _started;
std::atomic<bool> _resolved;
std::atomic<bool> _rejected;
void setState(State state);
std::atomic<State> _state = State::Unsynced;
std::mutex _callbackMutex;
CallbackHandle _nextCallbackId = 0;
std::unordered_map<CallbackHandle, StateChangeCallback> _stateChangeCallbacks;
};
} // namespace openspace

View File

@@ -116,6 +116,11 @@ std::string HttpSynchronization::directory() {
}
void HttpSynchronization::start() {
if (isSyncing()) {
return;
}
begin();
if (hasSyncFile()) {
resolve();
return;
@@ -130,15 +135,19 @@ void HttpSynchronization::start() {
return;
}
}
reject();
if (!_shouldCancel) {
reject();
}
});
}
void HttpSynchronization::cancel() {
_shouldCancel = true;
reset();
}
void HttpSynchronization::clear() {
cancel();
// TODO: Remove all files from directory.
}

View File

@@ -73,17 +73,7 @@ std::vector<std::shared_ptr<ResourceSynchronization>> Asset::synchronizations()
std::vector<std::shared_ptr<Asset>> Asset::allAssets() {
std::set<std::shared_ptr<Asset>> assets({ shared_from_this() });
for (auto& dep : _dependencies) {
std::vector<std::shared_ptr<Asset>> depAssets = dep->allActiveAssets();
std::copy(depAssets.begin(), depAssets.end(), std::inserter(assets, assets.end()));
}
std::vector<std::shared_ptr<Asset>> assetVector(assets.begin(), assets.end());
return assetVector;
}
std::vector<std::shared_ptr<Asset >> Asset::allActiveAssets() {
std::set<std::shared_ptr<Asset>> assets({ shared_from_this() });
for (auto& dep : _dependencies) {
std::vector<std::shared_ptr<Asset>> depAssets = dep->allActiveAssets();
std::vector<std::shared_ptr<Asset>> depAssets = dep->allAssets();
std::copy(depAssets.begin(), depAssets.end(), std::inserter(assets, assets.end()));
}
std::vector<std::shared_ptr<Asset>> assetVector(assets.begin(), assets.end());

View File

@@ -43,9 +43,11 @@ AssetManager::AssetManager(std::unique_ptr<AssetLoader> loader,
, _assetSynchronizer(std::move(synchronizer))
{}
bool AssetManager::update() {
// 1. Load assets.
// 2. Start/cancel synchronizations
// 3. Unload assets.
// Load assets
for (const auto& c : _pendingStateChangeCommands) {
const std::string& path = c.first;
@@ -55,7 +57,7 @@ bool AssetManager::update() {
}
}
// Start synchronizations
// Start/cancel synchronizations
for (const auto& c : _pendingStateChangeCommands) {
const std::string& path = c.first;
const AssetState targetState = c.second;
@@ -77,7 +79,7 @@ bool AssetManager::update() {
targetState == AssetState::Initialized);
if (shouldSync && !alreadySyncedOrSyncing) {
//startSynchronization(asset);
startSynchronization(asset);
}
}
@@ -224,6 +226,21 @@ AssetManager::AssetState AssetManager::currentAssetState(Asset* asset) {
return it->state;
}
AssetManager::AssetState AssetManager::currentAssetState(const std::string& assetIdentifier) {
const auto it = std::find_if(
_managedAssets.begin(),
_managedAssets.end(),
[&assetIdentifier](const ManagedAsset& ma) {
return ma.path == assetIdentifier;
}
);
if (it == _managedAssets.end()) {
return AssetManager::AssetState::Unloaded;
}
return it->state;
}
void AssetManager::clearAllTargetAssets() {
_pendingStateChangeCommands.clear();
for (const auto& i : _assetLoader->loadedAssets()) {
@@ -253,10 +270,24 @@ scripting::LuaLibrary AssetManager::luaLibrary() {
"string",
""
},
{
"synchronizeAsset",
&luascriptfunctions::synchronizeAsset,
{this},
"string",
""
},
{
"initializeAsset",
&luascriptfunctions::initializeAsset,
{ this },
"string",
""
},
{
"unloadAsset",
&luascriptfunctions::unloadAsset,
{this},
{ this },
"string",
""
}

View File

@@ -29,7 +29,34 @@ int loadAsset(lua_State* state) {
reinterpret_cast<AssetManager*>(lua_touserdata(state, lua_upvalueindex(1)));
int nArguments = lua_gettop(state);
SCRIPT_CHECK_ARGUMENTS("importAsset", state, 1, nArguments);
SCRIPT_CHECK_ARGUMENTS("loadAsset", state, 1, nArguments);
std::string assetName = luaL_checkstring(state, -1);
assetManager->setTargetAssetState(assetName, AssetManager::AssetState::Loaded);
return 0;
}
int synchronizeAsset(lua_State* state) {
AssetManager *assetManager =
reinterpret_cast<AssetManager*>(lua_touserdata(state, lua_upvalueindex(1)));
int nArguments = lua_gettop(state);
SCRIPT_CHECK_ARGUMENTS("synchronizeAsset", state, 1, nArguments);
std::string assetName = luaL_checkstring(state, -1);
assetManager->setTargetAssetState(assetName, AssetManager::AssetState::Synchronized);
return 0;
}
int initializeAsset(lua_State* state) {
AssetManager *assetManager =
reinterpret_cast<AssetManager*>(lua_touserdata(state, lua_upvalueindex(1)));
int nArguments = lua_gettop(state);
SCRIPT_CHECK_ARGUMENTS("initializeAsset", state, 1, nArguments);
std::string assetName = luaL_checkstring(state, -1);
@@ -42,7 +69,7 @@ int unloadAsset(lua_State* state) {
reinterpret_cast<AssetManager*>(lua_touserdata(state, lua_upvalueindex(1)));
int nArguments = lua_gettop(state);
SCRIPT_CHECK_ARGUMENTS("unimportAsset", state, 1, nArguments);
SCRIPT_CHECK_ARGUMENTS("unloadAsset", state, 1, nArguments);
std::string assetName = luaL_checkstring(state, -1);

View File

@@ -34,82 +34,77 @@ namespace {
namespace openspace {
AssetSynchronizer::AssetSynchronizer() {}
/*
void AssetSynchronizer::addAsset(std::shared_ptr<Asset> asset) {
_managedAssets.emplace(asset.get(),
AssetSynchronization{ asset, SynchronizationState::Added }
);
for (const auto& sync : asset->synchronizations()) {
_resourceToAssetMap[sync.get()] = asset.get();
}
}
void AssetSynchronizer::removeAsset(Asset* asset) {
AssetSynchronization a = _managedAssets[asset];
std::vector<std::shared_ptr<ResourceSynchronization>> resourceSyncs =
asset->synchronizations();
for (const auto& s : resourceSyncs) {
_resourceToAssetMap.erase(s.get());
s->cancel();
}
_managedAssets.erase(asset);
}*/
void AssetSynchronizer::startSync(std::shared_ptr<Asset> asset) {
std::vector<std::shared_ptr<ResourceSynchronization>> resourceSyncs =
asset->synchronizations();
std::vector<std::shared_ptr<Asset>> assets = asset->allAssets();
for (const auto& a : assets) {
std::vector<std::shared_ptr<ResourceSynchronization>> syncs =
a->synchronizations();
if (resourceSyncs.empty()) {
_stateChanges.emplace(
asset.get(),
StateChange{ asset, SynchronizationState::Resolved }
);
}
_synchronizingAssets[asset.get()] = asset;
for (const auto& s : resourceSyncs) {
if (!s->isResolved()) {
s->start();
bool startedAnySync = false;
for (const auto& s : syncs) {
if (!s->isResolved()) {
startedAnySync = true;
startAssetResourceSync(a, s);
}
}
if (!startedAnySync) {
setState(a, SynchronizationState::Resolved);
}
}
}
void AssetSynchronizer::cancelSync(Asset* asset) {
// Todo: cancel sync
}
/*
float AssetSynchronizer::assetProgress(Asset* asset) {
auto it = _managedAssets.find(asset);
if (it == _managedAssets.end()) {
return 0.f;
void AssetSynchronizer::cancelSync(std::shared_ptr<Asset> asset) {
std::vector<std::shared_ptr<Asset>> assets = asset->allAssets();
for (const auto& a : assets) {
std::vector<std::shared_ptr<ResourceSynchronization>> syncs =
a->synchronizations();
bool cancelledAnySync = false;
for (const auto& s : syncs) {
if (s->isSyncing()) {
cancelledAnySync = true;
cancelAssetResourceSync(a, s);
}
}
if (cancelledAnySync) {
setState(a, SynchronizationState::Unsynced);
}
}
const std::vector<std::shared_ptr<ResourceSynchronization>> syncs =
asset->synchronizations();
}
void AssetSynchronizer::restartSync(std::shared_ptr<Asset> asset) {
cancelSync(asset);
startSync(asset);
}
float AssetSynchronizer::assetProgress(Asset* asset) {
std::vector<std::shared_ptr<Asset>> assets = asset->allAssets();
size_t nTotalBytes = 0;
size_t nSyncedBytes = 0;
for (const auto& sync : syncs) {
if (sync->nTotalBytesIsKnown()) {
nTotalBytes += sync->nTotalBytes();
nSyncedBytes += sync->nSynchronizedBytes();
} else {
return 0;
for (const auto& a : assets) {
const std::vector<std::shared_ptr<ResourceSynchronization>> syncs =
asset->synchronizations();
for (const auto& sync : syncs) {
if (sync->nTotalBytesIsKnown()) {
nTotalBytes += sync->nTotalBytes();
nSyncedBytes += sync->nSynchronizedBytes();
} else {
return 0;
}
}
}
if (nTotalBytes == 0) {
return 1.f;
}
return static_cast<float>(nSyncedBytes)/static_cast<float>(nTotalBytes);
}
*/
std::vector<AssetSynchronizer::StateChange> AssetSynchronizer::getStateChanges() {
/*

View File

@@ -65,13 +65,9 @@ documentation::Documentation ResourceSynchronization::Documentation() {
};
}
ResourceSynchronization::ResourceSynchronization()
: _started(false)
, _resolved(false)
{}
ResourceSynchronization::ResourceSynchronization() {}
ResourceSynchronization::~ResourceSynchronization()
{}
ResourceSynchronization::~ResourceSynchronization() {}
std::unique_ptr<ResourceSynchronization> ResourceSynchronization::createFromDictionary(
const ghoul::Dictionary & dictionary)
@@ -99,15 +95,58 @@ void ResourceSynchronization::wait() {
}
bool ResourceSynchronization::isResolved() {
return _resolved;
return _state == State::Resolved;
}
bool ResourceSynchronization::isRejected() {
return _state == State::Rejected;
}
bool ResourceSynchronization::isSyncing() {
return _state == State::Syncing;
}
ResourceSynchronization::CallbackHandle
ResourceSynchronization::addStateChangeCallback(StateChangeCallback cb)
{
std::lock_guard<std::mutex> guard(_callbackMutex);
CallbackHandle callbackId = _nextCallbackId++;
_stateChangeCallbacks[callbackId] = cb;
return callbackId;
}
void ResourceSynchronization::removeStateChangeCallback(CallbackHandle id) {
std::lock_guard<std::mutex> guard(_callbackMutex);
_stateChangeCallbacks.erase(id);
}
void ResourceSynchronization::resolve() {
_resolved = true;
setState(State::Resolved);
}
void ResourceSynchronization::reject() {
_rejected = true;
setState(State::Rejected);
}
void ResourceSynchronization::reset() {
setState(State::Unsynced);
}
void ResourceSynchronization::begin() {
setState(State::Syncing);
}
void ResourceSynchronization::setState(State state) {
_state = state;
_callbackMutex.lock();
std::vector<StateChangeCallback> callbacks(
_stateChangeCallbacks.begin(),
_stateChangeCallbacks.end()
);
_callbackMutex.unlock();
for (auto& cb : callbacks) {
cb(state);
}
}
float ResourceSynchronization::progress() {