Thread safety in torrentclient. Add syncfile to torrentsync.

This commit is contained in:
Emil Axelsson
2017-12-12 18:36:09 +01:00
parent a774b7b5b2
commit 1926b59f91
6 changed files with 75 additions and 11 deletions

View File

@@ -34,6 +34,8 @@
#include <ghoul/logging/logmanager.h>
#include <ghoul/filesystem/filesystem.h>
#include <fstream>
namespace {
const char* _loggerCat = "TorrentSynchronization";
@@ -115,6 +117,11 @@ void TorrentSynchronization::start() {
return;
}
begin();
if (hasSyncFile()) {
resolve();
}
_enabled = true;
_torrentId = _torrentClient->addMagnetLink(
_magnetLink,
@@ -123,7 +130,6 @@ void TorrentSynchronization::start() {
updateTorrentProgress(p);
}
);
}
void TorrentSynchronization::cancel() {
@@ -139,6 +145,19 @@ void TorrentSynchronization::clear() {
cancel();
}
bool TorrentSynchronization::hasSyncFile() {
std::string path = directory() + ".ossync";
return FileSys.fileExists(path);
}
void TorrentSynchronization::createSyncFile() {
std::string dir = directory();
std::string filepath = dir + ".ossync";
FileSys.createDirectory(dir, ghoul::filesystem::Directory::Recursive::Yes);
std::ofstream syncFile(filepath, std::ofstream::out);
syncFile << "Synchronized";
syncFile.close();
}
size_t TorrentSynchronization::nSynchronizedBytes() {
return _progress.nDownloadedBytes;
@@ -155,6 +174,7 @@ bool TorrentSynchronization::nTotalBytesIsKnown() {
void TorrentSynchronization::updateTorrentProgress(TorrentClient::TorrentProgress progress) {
_progress = progress;
if (progress.finished && state() == State::Syncing) {
createSyncFile();
resolve();
}
}

View File

@@ -58,6 +58,8 @@ public:
private:
void updateTorrentProgress(TorrentClient::TorrentProgress p);
std::string uniformResourceName() const;
bool hasSyncFile();
void createSyncFile();
std::atomic_bool _enabled = false;
size_t _torrentId;

View File

@@ -87,8 +87,11 @@ void TorrentClient::initialize() {
void TorrentClient::pollAlerts() {
std::vector<libtorrent::alert*> alerts;
_session->pop_alerts(&alerts);
{
std::lock_guard<std::mutex> guard(_mutex);
_session->pop_alerts(&alerts);
}
for (lt::alert const* a : alerts) {
//LINFO(a->message());
// if we receive the finished alert or an error, we're done
@@ -111,6 +114,8 @@ void TorrentClient::pollAlerts() {
}
size_t TorrentClient::addTorrentFile(std::string torrentFile, std::string destination, TorrentProgressCallback cb) {
std::lock_guard<std::mutex> guard(_mutex);
if (!_session) {
LERROR("Torrent session not initialized when adding torrent");
return -1;
@@ -132,6 +137,8 @@ size_t TorrentClient::addTorrentFile(std::string torrentFile, std::string destin
}
size_t TorrentClient::addMagnetLink(std::string magnetLink, std::string destination, TorrentProgressCallback cb) {
std::lock_guard<std::mutex> guard(_mutex);
// TODO: register callback!
if (!_session) {
LERROR("Torrent session not initialized when adding torrent");
@@ -153,9 +160,23 @@ size_t TorrentClient::addMagnetLink(std::string magnetLink, std::string destinat
}
void TorrentClient::removeTorrent(TorrentId id) {
std::lock_guard<std::mutex> guard(_mutex);
const auto it = _torrents.find(id);
if (it == _torrents.end()) {
return;
}
libtorrent::torrent_handle h = it->second.handle;
_session->remove_torrent(h);
_torrents.erase(it);
}
void TorrentClient::notify(TorrentId id) {
std::lock_guard<std::mutex> guard(_mutex);
const auto& torrent = _torrents.find(id);
if (torrent == _torrents.end()) {
return;

View File

@@ -29,6 +29,7 @@
#include <string>
#include <memory>
#include <thread>
#include <mutex>
#include <unordered_map>
#include "libtorrent/torrent_handle.hpp"
@@ -73,6 +74,7 @@ private:
std::unordered_map<TorrentId, Torrent> _torrents;
std::unique_ptr<libtorrent::session> _session;
std::thread _torrentThread;
std::mutex _mutex;
std::atomic_bool _keepRunning = true;
};

View File

@@ -108,9 +108,6 @@ void Asset::requiredAssetChangedState(std::shared_ptr<Asset> child) {
void Asset::requestedAssetChangedState(std::shared_ptr<Asset> child) {
State childState = child->state();
LINFO("requestedAssetChangedState: " << child->id() << " to " << static_cast<int>(child->state()));
if (child->hasInitializedParent()) {
if (childState == State::Loaded) {
child->startSynchronizations();
@@ -340,7 +337,7 @@ bool Asset::startSynchronizations() {
bool Asset::cancelAllSynchronizations() {
bool cancelledAnySync = false;
for (auto& child : requiredAssets()) {
for (auto& child : childAssets()) {
bool cancelled = child->cancelAllSynchronizations();
if (cancelled) {
cancelledAnySync = true;
@@ -360,7 +357,27 @@ bool Asset::cancelAllSynchronizations() {
}
bool Asset::cancelUnwantedSynchronizations() {
return false;
if (hasSyncingOrResolvedParent()) {
return false;
}
bool cancelledAnySync = false;
for (auto& child : childAssets()) {
bool cancelled = child->cancelUnwantedSynchronizations();
if (cancelled) {
cancelledAnySync = true;
}
}
for (const auto& s : ownSynchronizations()) {
if (s->isSyncing()) {
cancelledAnySync = true;
s->cancel();
setState(State::Loaded);
}
}
if (cancelledAnySync) {
setState(State::Loaded);
}
return cancelledAnySync;
}
bool Asset::restartAllSynchronizations() {
@@ -522,6 +539,8 @@ void Asset::deinitialize() {
return;
}
LDEBUG("Denitializing asset " << id());
// Notify children
for (auto& dependency : _requiredAssets) {
try {

View File

@@ -82,15 +82,15 @@ bool AssetManager::update() {
}
void AssetManager::assetStateChanged(std::shared_ptr<Asset> asset, Asset::State state) {
LINFO(asset->id() << " changed state to " << static_cast<int>(state));
//LINFO(asset->id() << " changed state to " << static_cast<int>(state));
}
void AssetManager::assetRequested(std::shared_ptr<Asset> parent, std::shared_ptr<Asset> child) {
LINFO(parent->id() << " requested " << child->id());
//LINFO(parent->id() << " requested " << child->id());
}
void AssetManager::assetUnrequested(std::shared_ptr<Asset> parent, std::shared_ptr<Asset> child) {
LINFO(parent->id() << " unrequested " << child->id());
//LINFO(parent->id() << " unrequested " << child->id());
}
void AssetManager::add(const std::string& path) {