This commit is contained in:
Alexander Bock
2018-02-24 14:49:30 -05:00
9 changed files with 207 additions and 232 deletions
@@ -1,7 +1,7 @@
local Kernels = asset.syncedResource({
Name = "Mars Spice Kernels",
Type = "TorrentSynchronization",
Identifier = "mat097",
Identifier = "mar097",
Magnet = "magnet:?xt=urn:btih:308F326B9AF864294D73042FBBED33B17291E27E&dn=mar097.bsp&tr=udp%3a%2f%2ftracker.openbittorrent.com%3a80%2fannounce&tr=udp%3a%2f%2ftracker.publicbt.com%3a80%2fannounce&tr=udp%3a%2f%2ftracker.ccc.de%3a80%2fannounce"
})
@@ -27,19 +27,15 @@
#include <openspace/documentation/documentation.h>
#include <openspace/util/concurrentjobmanager.h>
#include <ghoul/filesystem/directory.h>
#include <ghoul/misc/dictionary.h>
#include <unordered_map>
namespace openspace {
class ResourceSynchronization
: public std::enable_shared_from_this<ResourceSynchronization>
{
class ResourceSynchronization {
public:
enum class State : int {
enum class State {
Unsynced,
Syncing,
Resolved,
@@ -49,12 +45,12 @@ public:
using CallbackHandle = size_t;
using StateChangeCallback = std::function<void(State)>;
static documentation::Documentation Documentation();
static std::unique_ptr<ResourceSynchronization> createFromDictionary(
const ghoul::Dictionary& dictionary);
ResourceSynchronization(const ghoul::Dictionary& dictionary);
virtual ~ResourceSynchronization();
virtual ~ResourceSynchronization() = default;
virtual std::string directory() = 0;
virtual void start() = 0;
virtual void cancel() = 0;
@@ -73,6 +69,8 @@ public:
CallbackHandle addStateChangeCallback(StateChangeCallback cb);
void removeStateChangeCallback(CallbackHandle id);
static documentation::Documentation Documentation();
protected:
void resolve();
void reject();
@@ -83,7 +81,7 @@ private:
void setState(State state);
std::string _name;
std::atomic<State> _state;
std::atomic<State> _state = State::Unsynced;
std::mutex _callbackMutex;
CallbackHandle _nextCallbackId = 0;
std::unordered_map<CallbackHandle, StateChangeCallback> _stateChangeCallbacks;
+59 -73
View File
@@ -47,27 +47,44 @@ namespace {
constexpr const char* QueryKeyFileVersion = "file_version";
constexpr const char* QueryKeyApplicationVersion = "application_version";
constexpr const int ApplicationVersion = 1;
}
} // namespace
namespace openspace {
HttpSynchronization::HttpSynchronization(
const ghoul::Dictionary& dict,
const std::string& synchronizationRoot,
const std::vector<std::string>& synchronizationRepositories
documentation::Documentation HttpSynchronization::Documentation() {
using namespace openspace::documentation;
return {
"HttpSynchronization",
"http_synchronization",
{
{
KeyIdentifier,
new StringVerifier,
Optional::No,
"A unique identifier for this resource"
},
{
KeyVersion,
new IntVerifier,
Optional::No,
"The version of this resource"
}
}
};
}
HttpSynchronization::HttpSynchronization(const ghoul::Dictionary& dict,
const std::string& synchronizationRoot,
const std::vector<std::string>& synchronizationRepositories
)
: openspace::ResourceSynchronization(dict)
, _nTotalBytesKnown(false)
, _nTotalBytes(0)
, _nSynchronizedBytes(0)
, _shouldCancel(false)
, _synchronizationRoot(synchronizationRoot)
, _synchronizationRepositories(synchronizationRepositories)
{
documentation::testSpecificationAndThrow(
Documentation(),
dict,
"HttpSynchroniztion"
"HttpSynchronization"
);
_identifier = dict.value<std::string>(KeyIdentifier);
@@ -81,28 +98,6 @@ HttpSynchronization::~HttpSynchronization() {
}
}
documentation::Documentation HttpSynchronization::Documentation() {
using namespace openspace::documentation;
return {
"HttpSynchronization",
"http_synchronization",
{
{
KeyIdentifier,
new StringVerifier,
Optional::No,
"A unique identifier for this resource"
},
{
KeyVersion,
new IntVerifier,
Optional::No,
"The version of this resource"
}
}
};
}
std::string HttpSynchronization::directory() {
ghoul::filesystem::Directory d(
_synchronizationRoot +
@@ -128,10 +123,13 @@ void HttpSynchronization::start() {
return;
}
std::vector<std::string> listUrls = fileListUrls();
_syncThread = std::thread([this, listUrls] {
for (const auto& url : listUrls) {
if (trySyncFromUrl(url)) {
std::string query = std::string("?") + QueryKeyIdentifier + "=" + _identifier +
"&" + QueryKeyFileVersion + "=" + std::to_string(_version) +
"&" + QueryKeyApplicationVersion + "=" + std::to_string(ApplicationVersion);
_syncThread = std::thread([this](const std::string& query) {
for (const std::string& url : _synchronizationRepositories) {
if (trySyncFromUrl(url + query)) {
createSyncFile();
resolve();
return;
@@ -140,7 +138,7 @@ void HttpSynchronization::start() {
if (!_shouldCancel) {
reject();
}
});
}, query);
}
void HttpSynchronization::cancel() {
@@ -153,17 +151,27 @@ void HttpSynchronization::clear() {
// TODO: Remove all files from directory.
}
std::vector<std::string> HttpSynchronization::fileListUrls() {
std::string query = std::string("?") + QueryKeyIdentifier + "=" + _identifier +
"&" + QueryKeyFileVersion + "=" + std::to_string(_version) +
"&" + QueryKeyApplicationVersion + "=" + std::to_string(ApplicationVersion);
size_t HttpSynchronization::nSynchronizedBytes() {
return _nSynchronizedBytes;
}
std::vector<std::string> urls;
for (const auto& repoUrl : _synchronizationRepositories) {
urls.push_back(repoUrl + query);
}
size_t HttpSynchronization::nTotalBytes() {
return _nTotalBytes;
}
return urls;
bool HttpSynchronization::nTotalBytesIsKnown() {
return _nTotalBytesKnown;
}
void HttpSynchronization::createSyncFile() {
std::string directoryName = directory();
std::string filepath = directoryName + ".ossync";
FileSys.createDirectory(directoryName, ghoul::filesystem::Directory::Recursive::Yes);
std::ofstream syncFile(filepath, std::ofstream::out);
syncFile << "Synchronized";
syncFile.close();
}
bool HttpSynchronization::hasSyncFile() {
@@ -175,9 +183,9 @@ bool HttpSynchronization::trySyncFromUrl(std::string listUrl) {
HttpRequest::RequestOptions opt;
opt.requestTimeoutSeconds = 0;
SyncHttpMemoryDownload fileListDownload(listUrl);
fileListDownload.onProgress([this](HttpRequest::Progress) {
return !_shouldCancel;
SyncHttpMemoryDownload fileListDownload(std::move(listUrl));
fileListDownload.onProgress([&c = _shouldCancel](HttpRequest::Progress) {
return !c;
});
fileListDownload.download(opt);
@@ -246,7 +254,7 @@ bool HttpSynchronization::trySyncFromUrl(std::string listUrl) {
startedAllDownloads = true;
bool failed = false;
for (auto& d : downloads) {
for (std::unique_ptr<AsyncHttpFileDownload>& d : downloads) {
d->wait();
if (!d->hasSucceeded()) {
failed = true;
@@ -255,32 +263,10 @@ bool HttpSynchronization::trySyncFromUrl(std::string listUrl) {
if (!failed) {
return true;
}
for (auto& d : downloads) {
for (std::unique_ptr<AsyncHttpFileDownload>& d : downloads) {
d->cancel();
}
return false;
}
void HttpSynchronization::createSyncFile() {
std::string dir = directory();
std::string filepath = dir + ".ossync";
FileSys.createDirectory(dir, ghoul::filesystem::Directory::Recursive::Yes);
std::ofstream syncFile(filepath, std::ofstream::out);
syncFile << "Synchronized";
syncFile.close();
}
size_t HttpSynchronization::nSynchronizedBytes() {
return _nSynchronizedBytes;
}
size_t HttpSynchronization::nTotalBytes() {
return _nTotalBytes;
}
bool HttpSynchronization::nTotalBytesIsKnown() {
return _nTotalBytesKnown;
}
} // namespace openspace
+10 -11
View File
@@ -26,23 +26,21 @@
#define __OPENSPACE_MODULE_SYNC___HTTPSYNCHRONIZATION___H__
#include <openspace/util/resourcesynchronization.h>
#include <openspace/documentation/documentation.h>
#include <openspace/documentation/documentation.h>
#include <ghoul/misc/dictionary.h>
namespace openspace {
class HttpSynchronization : public ResourceSynchronization {
public:
HttpSynchronization(
const ghoul::Dictionary& dict,
HttpSynchronization(const ghoul::Dictionary& dict,
const std::string& synchronizationRoot,
const std::vector<std::string>& synchronizationRepositories
);
virtual ~HttpSynchronization();
static documentation::Documentation Documentation();
std::string directory() override;
void start() override;
@@ -53,17 +51,18 @@ public:
size_t nTotalBytes() override;
bool nTotalBytesIsKnown() override;
static documentation::Documentation Documentation();
private:
std::vector<std::string> fileListUrls();
bool trySyncFromUrl(std::string url);
bool hasSyncFile();
void createSyncFile();
bool hasSyncFile();
bool trySyncFromUrl(std::string url);
std::atomic_bool _nTotalBytesKnown;
std::atomic_size_t _nTotalBytes;
std::atomic_size_t _nSynchronizedBytes;
std::atomic_bool _nTotalBytesKnown = false;
std::atomic_size_t _nTotalBytes = 0;
std::atomic_size_t _nSynchronizedBytes = 0;
std::atomic_bool _shouldCancel = false;
std::atomic_bool _shouldCancel;
std::string _identifier;
int _version;
std::string _synchronizationRoot;
+38 -39
View File
@@ -22,7 +22,7 @@
* OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. *
****************************************************************************************/
#include "torrentsynchronization.h"
#include <modules/sync/syncs/torrentsynchronization.h>
#include <modules/sync/syncmodule.h>
@@ -41,28 +41,43 @@ namespace {
constexpr const char* _loggerCat = "TorrentSynchronization";
constexpr const char* KeyIdentifier = "Identifier";
constexpr const char* KeyMagnet = "Magnet";
}
} // namespace
namespace openspace {
documentation::Documentation TorrentSynchronization::Documentation() {
using namespace openspace::documentation;
return {
"TorrentSynchronization",
"torrent_synchronization",
{
{
KeyIdentifier,
new StringVerifier,
Optional::No,
"A unique identifier for this torrent"
},
{
KeyMagnet,
new StringVerifier,
Optional::No,
"A magnet link identifying the torrent"
}
}
};
}
TorrentSynchronization::TorrentSynchronization(const ghoul::Dictionary& dict,
const std::string& synchronizationRoot,
TorrentClient& torrentClient)
: ResourceSynchronization(dict)
, _enabled(false)
, _synchronizationRoot(synchronizationRoot)
, _torrentClient(torrentClient)
{
documentation::testSpecificationAndThrow(
ResourceSynchronization::Documentation(),
dict,
"ResourceSynchronization::TorrentSynchronization"
);
documentation::testSpecificationAndThrow(
Documentation(),
dict,
"TorrentSynchronization::TorrentSynchronization"
"TorrentSynchronization"
);
_identifier = dict.value<std::string>(KeyIdentifier);
@@ -73,36 +88,18 @@ TorrentSynchronization::~TorrentSynchronization() {
cancel();
}
documentation::Documentation TorrentSynchronization::Documentation() {
using namespace openspace::documentation;
return {
"TorrentSynchronization",
"torrent_synchronization",
{
{
KeyIdentifier,
new StringVerifier,
Optional::No,
"A unique identifier for this torrent"
},
{
KeyMagnet,
new StringVerifier,
Optional::No,
"A magnet link identifying the torrent"
}
}
};
}
std::string TorrentSynchronization::uniformResourceName() const {
size_t begin = _magnetLink.find("=urn") + 1;
size_t end = _magnetLink.find('&', begin);
std::string xs = _magnetLink.substr(begin, end == std::string::npos ?
end : (end - begin));
std::string xs = _magnetLink.substr(
begin,
end == std::string::npos ? end : (end - begin)
);
std::transform(xs.begin(), xs.end(), xs.begin(), [](char x) {
if (x == ':') return '.';
if (x == ':') {
return '.';
}
return x;
});
return xs;
@@ -168,9 +165,11 @@ bool TorrentSynchronization::hasSyncFile() {
}
void TorrentSynchronization::createSyncFile() {
std::string dir = directory();
std::string filepath = dir + ".ossync";
FileSys.createDirectory(dir, ghoul::filesystem::Directory::Recursive::Yes);
std::string directoryName = directory();
std::string filepath = directoryName + ".ossync";
FileSys.createDirectory(directoryName, ghoul::filesystem::Directory::Recursive::Yes);
std::ofstream syncFile(filepath, std::ofstream::out);
syncFile << "Synchronized";
syncFile.close();
@@ -196,7 +195,7 @@ void TorrentSynchronization::updateTorrentProgress(
{
std::lock_guard<std::mutex> g(_progressMutex);
_progress = progress;
if (progress.finished && state() == State::Syncing) {
if (progress.finished && (state() == State::Syncing)) {
createSyncFile();
resolve();
}
+7 -8
View File
@@ -25,11 +25,10 @@
#ifndef __OPENSPACE_MODULE_SYNC___TORRENTSYNCHRONIZATION___H__
#define __OPENSPACE_MODULE_SYNC___TORRENTSYNCHRONIZATION___H__
#include <modules/sync/torrentclient.h>
#include <openspace/util/resourcesynchronization.h>
#include <openspace/documentation/documentation.h>
#include <modules/sync/torrentclient.h>
#include <openspace/documentation/documentation.h>
#include <ghoul/misc/dictionary.h>
namespace openspace {
@@ -39,12 +38,9 @@ class TorrentSynchronizationJob;
class TorrentSynchronization : public ResourceSynchronization {
public:
TorrentSynchronization(const ghoul::Dictionary& dict,
const std::string& synchronizationRoot,
TorrentClient& client);
const std::string& synchronizationRoot, TorrentClient& client);
virtual ~TorrentSynchronization();
static documentation::Documentation Documentation();
std::string directory() override;
void start() override;
@@ -55,13 +51,16 @@ public:
size_t nTotalBytes() override;
bool nTotalBytesIsKnown() override;
static documentation::Documentation Documentation();
private:
void updateTorrentProgress(TorrentClient::TorrentProgress p);
std::string uniformResourceName() const;
bool hasSyncFile();
void createSyncFile();
std::atomic_bool _enabled;
std::atomic_bool _enabled = false;
TorrentClient::TorrentId _torrentId;
TorrentClient::TorrentProgress _progress;
std::mutex _progressMutex;
+44 -47
View File
@@ -39,7 +39,7 @@
namespace {
constexpr const char* _loggerCat = "TorrentClient";
std::chrono::milliseconds PollInterval(1000);
constexpr const std::chrono::milliseconds PollInterval(1000);
} // namespace
namespace openspace {
@@ -48,8 +48,6 @@ TorrentError::TorrentError(std::string message)
: RuntimeError(std::move(message), "TorrentClient")
{}
TorrentClient::TorrentClient() : _active(false) {}
TorrentClient::~TorrentClient() {
deinitialize();
}
@@ -57,9 +55,6 @@ TorrentClient::~TorrentClient() {
void TorrentClient::initialize() {
#ifdef SYNC_USE_LIBTORRENT
libtorrent::settings_pack settings;
_session = std::make_unique<libtorrent::session>();
settings.set_str(libtorrent::settings_pack::user_agent, "OpenSpace/" +
std::to_string(openspace::OPENSPACE_VERSION_MAJOR) + "." +
std::to_string(openspace::OPENSPACE_VERSION_MINOR) + "." +
@@ -73,21 +68,23 @@ void TorrentClient::initialize() {
settings.set_int(libtorrent::settings_pack::active_seeds, -1);
settings.set_int(libtorrent::settings_pack::active_limit, 30);
settings.set_int(libtorrent::settings_pack::dht_announce_interval, 15);
_session->apply_settings(settings);
_session.apply_settings(settings);
_session->add_dht_router({ "router.utorrent.com", 6881 });
_session->add_dht_router({ "dht.transmissionbt.com", 6881 });
_session->add_dht_router({ "router.bittorrent.com", 6881 });
_session->add_dht_router({ "router.bitcomet.com", 6881 });
_session.add_dht_router({ "router.utorrent.com", 6881 });
_session.add_dht_router({ "dht.transmissionbt.com", 6881 });
_session.add_dht_router({ "router.bittorrent.com", 6881 });
_session.add_dht_router({ "router.bitcomet.com", 6881 });
libtorrent::error_code ec;
_session->listen_on(std::make_pair(20280, 20290), ec);
_session->start_upnp();
_session.listen_on(std::make_pair(20280, 20290), ec);
_session.start_upnp();
_active = true;
_isInitialized = true;
_isActive = true;
_torrentThread = std::thread([this]() {
while (_active) {
while (_isActive) {
pollAlerts();
std::unique_lock<std::mutex> lock(_abortMutex);
_abortNotifier.wait_for(lock, PollInterval);
@@ -98,24 +95,26 @@ void TorrentClient::initialize() {
void TorrentClient::deinitialize() {
#ifdef SYNC_USE_LIBTORRENT
if (!_active) {
if (!_isActive) {
return;
}
_active = false;
_isActive = false;
_abortNotifier.notify_all();
if (_torrentThread.joinable()) {
_torrentThread.join();
}
std::vector<lt::torrent_handle> handles = _session->get_torrents();
for (lt::torrent_handle h : handles) {
_session->remove_torrent(h);
std::vector<lt::torrent_handle> handles = _session.get_torrents();
for (const lt::torrent_handle& h : handles) {
_session.remove_torrent(h);
}
_torrents.clear();
_session->abort();
_session = nullptr;
_session.abort();
_isInitialized = false;
#endif // SYNC_USE_LIBTORRENT
}
@@ -142,76 +141,75 @@ void TorrentClient::pollAlerts() {
std::vector<lt::torrent_handle> handles;
{
std::lock_guard<std::mutex> guard(_mutex);
handles = _session->get_torrents();
handles = _session.get_torrents();
}
for (lt::torrent_handle h : handles) {
for (const lt::torrent_handle& h : handles) {
notify(h.id());
}
#endif // SYNC_USE_LIBTORRENT
}
TorrentClient::TorrentId TorrentClient::addTorrentFile(std::string torrentFile,
std::string destination,
TorrentClient::TorrentId TorrentClient::addTorrentFile(const std::string& torrentFile,
const std::string& destination,
TorrentProgressCallback cb)
{
#ifdef SYNC_USE_LIBTORRENT
std::lock_guard<std::mutex> guard(_mutex);
if (!_session) {
if (!_isInitialized) {
LERROR("Torrent session not initialized when adding torrent");
return -1;
}
libtorrent::error_code ec;
libtorrent::add_torrent_params p;
p.save_path = destination;
p.ti = std::make_shared<libtorrent::torrent_info>(torrentFile, ec, 0);
libtorrent::torrent_handle h = _session->add_torrent(p, ec);
if (ec) {
LERROR(torrentFile << ": " << ec.message());
}
libtorrent::torrent_handle h = _session.add_torrent(p, ec);
if (ec) {
LERROR(torrentFile << ": " << ec.message());
}
TorrentId id = h.id();
_torrents.emplace(id, Torrent{id, h, cb});
_torrents.emplace(id, Torrent{id, h, std::move(cb)});
return id;
#else // SYNC_USE_LIBTORRENT
throw TorrentError("SyncModule is compiled without libtorrent");
throw TorrentError("SyncModule is compiled without libtorrent support");
#endif // SYNC_USE_LIBTORRENT
}
TorrentClient::TorrentId TorrentClient::addMagnetLink(std::string magnetLink,
std::string destination,
TorrentClient::TorrentId TorrentClient::addMagnetLink(const std::string& magnetLink,
const std::string& destination,
TorrentProgressCallback cb)
{
#ifdef SYNC_USE_LIBTORRENT
std::lock_guard<std::mutex> guard(_mutex);
// TODO: register callback!
if (!_session) {
if (!_isInitialized) {
LERROR("Torrent session not initialized when adding torrent");
return -1;
}
libtorrent::error_code ec;
libtorrent::add_torrent_params p = libtorrent::parse_magnet_uri(magnetLink, ec);
if (ec) {
LERROR(magnetLink << ": " << ec.message());
}
p.save_path = destination;
p.storage_mode = libtorrent::storage_mode_allocate;
libtorrent::torrent_handle h = _session->add_torrent(p, ec);
libtorrent::torrent_handle h = _session.add_torrent(p, ec);
if (ec) {
LERROR(magnetLink << ": " << ec.message());
}
TorrentId id = h.id();
_torrents.emplace(id, Torrent{id, h, cb});
_torrents.emplace(id, Torrent{id, h, std::move(cb)});
return id;
#else // SYNC_USE_LIBTORRENT
throw TorrentError("SyncModule is compiled without libtorrent");
throw TorrentError("SyncModule is compiled without libtorrent support");
#endif // SYNC_USE_LIBTORRENT
}
@@ -225,7 +223,7 @@ void TorrentClient::removeTorrent(TorrentId id) {
}
libtorrent::torrent_handle h = it->second.handle;
_session->remove_torrent(h);
_session.remove_torrent(h);
_torrents.erase(it);
#endif // SYNC_USE_LIBTORRENT
@@ -239,12 +237,12 @@ void TorrentClient::notify(TorrentId id) {
{
std::lock_guard<std::mutex> guard(_mutex);
auto torrent = _torrents.find(id);
if (torrent == _torrents.end()) {
auto it = _torrents.find(id);
if (it == _torrents.end()) {
return;
}
libtorrent::torrent_handle h = torrent->second.handle;
libtorrent::torrent_handle h = it->second.handle;
libtorrent::torrent_status status = h.status();
progress.finished = status.is_finished;
@@ -252,7 +250,7 @@ void TorrentClient::notify(TorrentId id) {
progress.nTotalBytes = status.total_wanted;
progress.nDownloadedBytes = status.total_wanted_done;
callback = torrent->second.callback;
callback = it->second.callback;
}
callback(progress);
@@ -260,4 +258,3 @@ void TorrentClient::notify(TorrentId id) {
}
} // namespace openspace
+24 -24
View File
@@ -26,20 +26,17 @@
#define __OPENSPACE_MODULE_SYNC___TORRENTCLIENT___H__
#include <ghoul/misc/exception.h>
#include <functional>
#include <atomic>
#include <string>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#ifdef SYNC_USE_LIBTORRENT
#include "libtorrent/torrent_handle.hpp"
namespace libtorrent { class session; }
#include <libtorrent/torrent_handle.hpp>
#include <libtorrent/session.hpp>
#else // SYNC_USE_LIBTORRENT
// Dummy definition to make TorrentClient compile, these is not actually used if
@@ -47,11 +44,10 @@ namespace libtorrent { class session; }
namespace libtorrent {
using torrent_handle = void*;
using session = void*;
}
} // namespace libtorrent
#endif // SYNC_USE_LIBTORRENT
namespace openspace {
struct TorrentError : public ghoul::RuntimeError {
@@ -77,20 +73,20 @@ public:
using TorrentProgressCallback = std::function<void(TorrentProgress)>;
using TorrentId = int32_t;
TorrentClient();
virtual ~TorrentClient();
//TorrentClient();
~TorrentClient();
void initialize();
void deinitialize();
TorrentId addTorrentFile(std::string torrentFile, std::string destination,
TorrentProgressCallback cb);
TorrentId addMagnetLink(std::string magnetLink, std::string destination,
TorrentId addTorrentFile(const std::string& torrentFile,
const std::string& destination, TorrentProgressCallback cb);
TorrentId addMagnetLink(const std::string& magnetLink, const std::string& destination,
TorrentProgressCallback cb);
void removeTorrent(TorrentId id);
void pollAlerts();
private:
struct Torrent {
TorrentId id;
@@ -99,14 +95,18 @@ private:
};
void notify(TorrentId id);
void pollAlerts();
libtorrent::session _session;
bool _isInitialized = false;
std::atomic_bool _isActive = false;
std::thread _torrentThread;
std::condition_variable _abortNotifier;
std::mutex _abortMutex;
std::mutex _mutex;
std::unordered_map<TorrentId, Torrent> _torrents;
std::unique_ptr<libtorrent::session> _session;
std::thread _torrentThread;
std::mutex _mutex;
std::atomic_bool _active;
std::mutex _abortMutex;
std::condition_variable _abortNotifier;
};
} // namespace openspace
+17 -20
View File
@@ -39,7 +39,7 @@ namespace {
constexpr const char* KeyType = "Type";
constexpr const char* KeyName = "Name";
constexpr const char* _loggerCat = "ResourceSynchronization";
}
} // namespace
namespace openspace {
@@ -72,14 +72,6 @@ documentation::Documentation ResourceSynchronization::Documentation() {
};
}
ResourceSynchronization::ResourceSynchronization(const ghoul::Dictionary& dict)
: _state(State::Unsynced)
{
_name = dict.value<std::string>(KeyName);
}
ResourceSynchronization::~ResourceSynchronization() {}
std::unique_ptr<ResourceSynchronization> ResourceSynchronization::createFromDictionary(
const ghoul::Dictionary & dictionary)
{
@@ -92,16 +84,19 @@ std::unique_ptr<ResourceSynchronization> ResourceSynchronization::createFromDict
ghoul_assert(factory, "ResourceSynchronization factory did not exist");
std::unique_ptr<ResourceSynchronization> result =
factory->create(synchronizationType, dictionary);
if (result == nullptr) {
LERROR("Failed to create a ResourceSynchronization object of type '" <<
synchronizationType << "'");
return nullptr;
}
return result;
}
ResourceSynchronization::ResourceSynchronization(const ghoul::Dictionary& dict) {
documentation::testSpecificationAndThrow(
Documentation(),
dict,
"ResourceSynchronization"
);
_name = dict.value<std::string>(KeyName);
}
ResourceSynchronization::State ResourceSynchronization::state() const {
return _state;
}
@@ -119,7 +114,7 @@ bool ResourceSynchronization::isSyncing() {
}
ResourceSynchronization::CallbackHandle
ResourceSynchronization::addStateChangeCallback(StateChangeCallback cb)
ResourceSynchronization::addStateChangeCallback(StateChangeCallback cb)
{
std::lock_guard<std::mutex> guard(_callbackMutex);
CallbackHandle callbackId = _nextCallbackId++;
@@ -150,14 +145,16 @@ void ResourceSynchronization::begin() {
void ResourceSynchronization::setState(State state) {
_state = state;
_callbackMutex.lock();
std::vector<StateChangeCallback> callbacks;
callbacks.reserve(_stateChangeCallbacks.size());
for (const auto& it : _stateChangeCallbacks) {
for (const std::pair<CallbackHandle, StateChangeCallback>& it : _stateChangeCallbacks)
{
callbacks.push_back(it.second);
}
_callbackMutex.unlock();
for (auto& cb : callbacks) {
for (const StateChangeCallback& cb : callbacks) {
cb(state);
}
}
@@ -176,4 +173,4 @@ std::string ResourceSynchronization::name() const {
return _name;
}
}
} // namespace openspace