Work on asset synchronization

This commit is contained in:
Emil Axelsson
2017-11-11 18:48:07 +01:00
parent 916f94d70e
commit a73f2e0d00
10 changed files with 148 additions and 43 deletions
+10 -5
View File
@@ -45,10 +45,19 @@ namespace openspace {
class AssetSynchronizer : public ResourceSyncClient {
public:
enum class SynchronizationState : int {
Unknown,
Added,
Synchronizing,
Synchronized
};
AssetSynchronizer(ResourceSynchronizer* resourceSynchronizer);
void addAsset(std::shared_ptr<Asset> asset);
void removeAsset(Asset* asset);
void syncAsset(Asset* asset);
SynchronizationState assetState(Asset* asset);
float assetProgress(Asset* asset);
void syncUnsynced();
std::vector<std::shared_ptr<Asset>> getSynchronizedAssets();
@@ -56,11 +65,7 @@ public:
private:
bool assetIsSynchronized(Asset* asset);
enum class SynchronizationState : int {
Added,
Synchronizing,
Synchronized
};
struct AssetSynchronization {
std::shared_ptr<Asset> asset;
@@ -61,13 +61,16 @@ public:
ResourceSynchronization();
virtual ~ResourceSynchronization();
virtual void synchronize() = 0;
virtual std::string directory() = 0;
virtual void synchronize() = 0;
virtual float nSynchronizedBytes() = 0;
virtual float nTotalBytes() = 0;
virtual bool nTotalBytesIsKnown() = 0;
virtual float progress();
void wait();
bool isResolved();
void resolve();
float progress();
void updateProgress(float t);
std::shared_ptr<SynchronizationJob> job();
@@ -75,7 +78,6 @@ private:
std::shared_ptr<SynchronizationJob> _job;
std::atomic<bool> _started;
std::atomic<bool> _resolved;
std::atomic<float> _progress;
};
@@ -182,4 +182,17 @@ void HttpSynchronization::createSyncFile() {
syncFile.close();
}
float HttpSynchronization::nSynchronizedBytes() {
return 0;
}
float HttpSynchronization::nTotalBytes() {
return 0;
}
bool HttpSynchronization::nTotalBytesIsKnown() {
return false;
}
} // namespace openspace
+5
View File
@@ -37,10 +37,15 @@ class HttpSynchronizationJob;
class HttpSynchronization : public ResourceSynchronization {
public:
HttpSynchronization(const ghoul::Dictionary& dict);
virtual ~HttpSynchronization() = default;
static documentation::Documentation Documentation();
std::string directory() override;
void synchronize() override;
float nSynchronizedBytes() override;
float nTotalBytes() override;
bool nTotalBytesIsKnown() override;
private:
std::vector<std::string> fileListUrls();
@@ -120,4 +120,16 @@ void TorrentSynchronization::synchronize() {
return;
}
float TorrentSynchronization::nSynchronizedBytes() {
return 0;
}
float TorrentSynchronization::nTotalBytes() {
return 0;
}
bool TorrentSynchronization::nTotalBytesIsKnown() {
return false;
}
} // namespace openspace
@@ -47,6 +47,10 @@ public:
std::string directory() override;
void synchronize() override;
float nSynchronizedBytes() override;
float nTotalBytes() override;
bool nTotalBytesIsKnown() override;
private:
std::string uniformResourceName() const;
std::string _identifier;
+41 -22
View File
@@ -29,6 +29,7 @@
#include <libtorrent/session.hpp>
#include <libtorrent/alert_types.hpp>
#include <libtorrent/torrent_info.hpp>
#include <libtorrent/magnet_uri.hpp>
#include <openspace/openspace.h>
@@ -36,6 +37,7 @@
namespace {
const char* _loggerCat = "TorrentClient";
std::chrono::milliseconds PollInterval(200);
}
namespace openspace {
@@ -77,25 +79,29 @@ void TorrentClient::initialize() {
_torrentThread = std::thread([this]() {
while (_keepRunning) {
std::vector<libtorrent::alert*> alerts;
_session->pop_alerts(&alerts);
for (lt::alert const* a : alerts) {
LINFO(a->message());
// if we receive the finished alert or an error, we're done
if (lt::alert_cast<lt::torrent_finished_alert>(a)) {
LINFO(a->message());
}
if (lt::alert_cast<lt::torrent_error_alert>(a)) {
LINFO(a->message());
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
pollAlerts();
std::this_thread::sleep_for(PollInterval);
}
});
}
int TorrentClient::addTorrentFile(std::string torrentFile, std::string destination) {
void TorrentClient::pollAlerts() {
std::vector<libtorrent::alert*> alerts;
_session->pop_alerts(&alerts);
for (lt::alert const* a : alerts) {
LINFO(a->message());
// if we receive the finished alert or an error, we're done
if (lt::alert_cast<lt::torrent_finished_alert>(a)) {
LINFO(a->message());
}
if (lt::alert_cast<lt::torrent_error_alert>(a)) {
LINFO(a->message());
}
}
}
size_t TorrentClient::addTorrentFile(std::string torrentFile, std::string destination) {
if (!_session) {
LERROR("Torrent session not initialized when adding torrent");
return -1;
@@ -106,24 +112,37 @@ int TorrentClient::addTorrentFile(std::string torrentFile, std::string destinati
p.save_path = destination;
p.ti = std::make_shared<libtorrent::torrent_info>(torrentFile, ec, 0);
_session->add_torrent(p, ec);
libtorrent::torrent_handle h = _session->add_torrent(p, ec);
if (ec) {
LERROR(torrentFile << ": " << ec.message());
}
size_t id = _nextId++;
_torrents.emplace(id, Torrent{id, h});
return id;
}
int TorrentClient::addMagnetLink(std::string magnetLink, std::string destination) {
size_t TorrentClient::addMagnetLink(std::string magnetLink, std::string destination) {
if (!_session) {
LERROR("Torrent session not initialized when adding torrent");
return -1;
}
libtorrent::error_code ec;
libtorrent::add_torrent_params p;
libtorrent::add_torrent_params p = libtorrent::parse_magnet_uri(magnetLink, ec);
p.save_path = destination;
p.url = magnetLink;
p.storage_mode = libtorrent::storage_mode_allocate;
_session->add_torrent(p, ec);
libtorrent::torrent_handle h = _session->add_torrent(p, ec);
if (ec) {
LERROR(magnetLink << ": " << ec.message());
}
size_t id = _nextId++;
_torrents.emplace(id, Torrent{id, h});
return id;
}
void TorrentClient::removeTorrent(int id) {
void TorrentClient::removeTorrent(size_t id) {
}
+15 -7
View File
@@ -29,6 +29,9 @@
#include <string>
#include <memory>
#include <thread>
#include <unordered_map>
#include "libtorrent/torrent_handle.hpp"
namespace libtorrent {
class session;
@@ -36,20 +39,25 @@ namespace libtorrent {
namespace openspace {
struct Torrent {
std::string _torrentFile;
std::string _destination;
};
class TorrentClient {
public:
struct Torrent {
size_t id;
libtorrent::torrent_handle handle;
};
TorrentClient();
~TorrentClient();
void initialize();
int addTorrentFile(std::string torrentFile, std::string destination);
int addMagnetLink(std::string magnetLink, std::string destination);
void removeTorrent(int id);
size_t addTorrentFile(std::string torrentFile, std::string destination);
size_t addMagnetLink(std::string magnetLink, std::string destination);
void removeTorrent(size_t id);
void pollAlerts();
private:
size_t _nextId = 0;
std::unordered_map<size_t, Torrent> _torrents;
std::unique_ptr<libtorrent::session> _session;
std::thread _torrentThread;
std::atomic_bool _keepRunning = true;
+36
View File
@@ -26,6 +26,7 @@
#include <algorithm>
#include <memory>
#include <numeric>
namespace {
const char* _loggerCat = "AssetSynchronizer";
@@ -85,6 +86,41 @@ void AssetSynchronizer::syncUnsynced() {
}
}
AssetSynchronizer::SynchronizationState AssetSynchronizer::assetState(Asset* asset) {
auto it = _managedAssets.find(asset);
if (it == _managedAssets.end()) {
return SynchronizationState::Unknown;
}
return it->second.state;
}
float AssetSynchronizer::assetProgress(Asset* asset) {
auto it = _managedAssets.find(asset);
if (it == _managedAssets.end()) {
return 0.f;
}
const std::vector<std::shared_ptr<ResourceSynchronization>> syncs =
asset->synchronizations();
size_t nTotalBytes = 0;
size_t nSyncedBytes = 0;
for (const auto& sync : syncs) {
if (sync->nTotalBytesIsKnown()) {
return 0;
} else {
nTotalBytes += sync->nTotalBytes();
nSyncedBytes += sync->nSynchronizedBytes();
}
}
if (nTotalBytes == 0) {
return 1.f;
}
return static_cast<float>(nSyncedBytes)/static_cast<float>(nTotalBytes);
}
std::vector<std::shared_ptr<Asset>> AssetSynchronizer::getSynchronizedAssets() {
std::vector<std::shared_ptr<ResourceSynchronization>> syncs =
_resourceSynchronizer->finishedSynchronizations(this);
+7 -6
View File
@@ -92,7 +92,6 @@ std::unique_ptr<ResourceSynchronization> ResourceSynchronization::createFromDict
return nullptr;
}
const SyncModule* syncModule = OsEng.moduleEngine().module<SyncModule>();
return result;
}
@@ -112,11 +111,13 @@ void ResourceSynchronization::resolve() {
}
float ResourceSynchronization::progress() {
return _progress;
}
void ResourceSynchronization::updateProgress(float t) {
_progress = std::min(1.0f, std::max(t, 0.0f));
if (!nTotalBytesIsKnown()) {
return 0.f;
}
if (nTotalBytes() == 0) {
return 1.f;
}
return static_cast<float>(nSynchronizedBytes()) / static_cast<float>(nTotalBytes());
}
// SynchronizationJob methods