From 6a4f938baca4dc550ea1f180b71f0b7925c2b5e9 Mon Sep 17 00:00:00 2001 From: Emil Axelsson Date: Tue, 14 Nov 2017 13:47:22 +0100 Subject: [PATCH] Thread safety in http requests --- include/openspace/util/httprequest.h | 18 +++----- modules/sync/syncs/httpsynchronization.cpp | 46 ++++++++++++------- modules/sync/syncs/httpsynchronization.h | 4 +- src/util/httprequest.cpp | 51 +++++++++++----------- 4 files changed, 64 insertions(+), 55 deletions(-) diff --git a/include/openspace/util/httprequest.h b/include/openspace/util/httprequest.h index b94a4ef816..85ab36e57a 100644 --- a/include/openspace/util/httprequest.h +++ b/include/openspace/util/httprequest.h @@ -33,9 +33,8 @@ #include #include #include -#include -#include #include +#include namespace openspace { @@ -125,10 +124,11 @@ private: class HttpDownload { public: - HttpDownload(); - HttpDownload(HttpDownload&& d); - virtual ~HttpDownload() = default; using ProgressCallback = std::function; + + HttpDownload(); + HttpDownload(HttpDownload&& d) = default; + virtual ~HttpDownload() = default; void onProgress(ProgressCallback progressCallback); bool hasStarted(); bool hasFailed(); @@ -144,7 +144,6 @@ protected: void markAsSuccessful(); void markAsFailed(); private: - std::mutex _onProgressMutex; ProgressCallback _onProgress; bool _started = false; bool _failed = false; @@ -174,9 +173,10 @@ protected: private: HttpRequest _httpRequest; std::thread _downloadThread; - std::mutex _mutex; + std::mutex _conditionMutex; std::condition_variable _downloadFinishCondition; bool _shouldCancel = false; + std::mutex _stateChangeMutex; }; class HttpFileDownload : public virtual HttpDownload { @@ -212,7 +212,6 @@ private: class SyncHttpMemoryDownload : public SyncHttpDownload, public HttpMemoryDownload { public: SyncHttpMemoryDownload(std::string url); - SyncHttpMemoryDownload(SyncHttpMemoryDownload&&) = default; virtual ~SyncHttpMemoryDownload() = default; }; @@ -220,7 +219,6 @@ public: class SyncHttpFileDownload : public SyncHttpDownload, public HttpFileDownload { public: SyncHttpFileDownload(std::string url, std::string destinationPath); - SyncHttpFileDownload(SyncHttpFileDownload&&) = default; virtual ~SyncHttpFileDownload() = default; }; @@ -228,7 +226,6 @@ public: class AsyncHttpMemoryDownload : public AsyncHttpDownload, public HttpMemoryDownload { public: AsyncHttpMemoryDownload(std::string url); - AsyncHttpMemoryDownload(AsyncHttpMemoryDownload&&) = default; virtual ~AsyncHttpMemoryDownload() = default; }; @@ -236,7 +233,6 @@ public: class AsyncHttpFileDownload : public AsyncHttpDownload, public HttpFileDownload { public: AsyncHttpFileDownload(std::string url, std::string destinationPath); - AsyncHttpFileDownload(AsyncHttpFileDownload&&) = default; virtual ~AsyncHttpFileDownload() = default; }; diff --git a/modules/sync/syncs/httpsynchronization.cpp b/modules/sync/syncs/httpsynchronization.cpp index 105754e948..aded39c779 100644 --- a/modules/sync/syncs/httpsynchronization.cpp +++ b/modules/sync/syncs/httpsynchronization.cpp @@ -37,6 +37,7 @@ #include #include #include +#include namespace { const char* _loggerCat = "HttpSynchronization"; @@ -71,6 +72,13 @@ HttpSynchronization::HttpSynchronization(const ghoul::Dictionary& dict) _synchronizationRepositories = syncModule->httpSynchronizationRepositories(); } +HttpSynchronization::~HttpSynchronization() { + if (_syncThread.joinable()) { + cancel(); + _syncThread.join(); + } +} + documentation::Documentation HttpSynchronization::Documentation() { using namespace openspace::documentation; return { @@ -127,9 +135,11 @@ void HttpSynchronization::start() { } void HttpSynchronization::cancel() { + _shouldCancel = true; } void HttpSynchronization::clear() { + // TODO: Remove all files from directory. } std::vector HttpSynchronization::fileListUrls() { @@ -165,7 +175,6 @@ bool HttpSynchronization::trySyncFromUrl(std::string listUrl) { } const std::vector& buffer = fileListDownload.downloadedData(); - _nSynchronizedBytes = 0; _nTotalBytes = 0; _nTotalBytesKnown = false; @@ -179,7 +188,7 @@ bool HttpSynchronization::trySyncFromUrl(std::string listUrl) { std::atomic_bool startedAllDownloads = false; std::atomic_size_t nDownloads = 0; - std::vector downloads; + std::vector> downloads; while (fileList >> line) { size_t lastSlash = line.find_last_of('/'); @@ -189,20 +198,18 @@ bool HttpSynchronization::trySyncFromUrl(std::string listUrl) { ghoul::filesystem::FileSystem::PathSeparator + filename; - downloads.push_back(AsyncHttpFileDownload(line, fileDestination)); - AsyncHttpFileDownload& fileDownload = downloads.back(); + downloads.push_back(std::make_unique(line, fileDestination)); + auto& fileDownload = downloads.back(); - bool reportedFileSize = false; ++nDownloads; - fileDownload.onProgress( - [this, line, &reportedFileSize, &fileSizes, &fileSizeMutex, + fileDownload->onProgress( + [this, line, &fileSizes, &fileSizeMutex, &startedAllDownloads, &nDownloads](HttpRequest::Progress p) { - if (!reportedFileSize && p.totalBytesKnown) { + if (p.totalBytesKnown) { std::lock_guard guard(fileSizeMutex); fileSizes[line] = p.totalBytes; - reportedFileSize = true; if (!_nTotalBytesKnown && startedAllDownloads && fileSizes.size() == nDownloads) { _nTotalBytesKnown = true; @@ -215,17 +222,24 @@ bool HttpSynchronization::trySyncFromUrl(std::string listUrl) { return !_shouldCancel; }); - fileDownload.start(opt); + fileDownload->start(opt); } startedAllDownloads = true; + bool failed = false; for (auto& d : downloads) { - d.wait(); - if (!d.hasSucceeded()) { - return false; + d->wait(); + if (!d->hasSucceeded()) { + failed = true; } } - return true; + if (!failed) { + return true; + } + for (auto& d : downloads) { + d->cancel(); + } + return false; } void HttpSynchronization::createSyncFile() { @@ -242,11 +256,11 @@ size_t HttpSynchronization::nSynchronizedBytes() { } size_t HttpSynchronization::nTotalBytes() { - return 0; + return _nTotalBytes; } bool HttpSynchronization::nTotalBytesIsKnown() { - return false; + return _nTotalBytesKnown; } } // namespace openspace diff --git a/modules/sync/syncs/httpsynchronization.h b/modules/sync/syncs/httpsynchronization.h index 4c95098fff..0ae89e989c 100644 --- a/modules/sync/syncs/httpsynchronization.h +++ b/modules/sync/syncs/httpsynchronization.h @@ -35,7 +35,7 @@ namespace openspace { class HttpSynchronization : public ResourceSynchronization { public: HttpSynchronization(const ghoul::Dictionary& dict); - virtual ~HttpSynchronization() = default; + virtual ~HttpSynchronization(); static documentation::Documentation Documentation(); @@ -58,7 +58,7 @@ private: std::atomic_size_t _nTotalBytes; std::atomic_size_t _nSynchronizedBytes; - std::atomic_bool _shouldCancel; + std::atomic_bool _shouldCancel = false; std::string _identifier; int _version; std::string _synchronizationRoot; diff --git a/src/util/httprequest.cpp b/src/util/httprequest.cpp index 8d7c81246d..91432b8678 100644 --- a/src/util/httprequest.cpp +++ b/src/util/httprequest.cpp @@ -70,9 +70,6 @@ int progressCallback( static_cast(nDownloadedBytes) } ); - - LINFO("Transfer info from curl: " << nDownloadedBytes << " out of " << nTotalBytes); - return 0; } size_t writeCallback(char* ptr, size_t size, size_t nmemb, void* userData) { @@ -129,22 +126,14 @@ void HttpRequest::perform(RequestOptions opt) { void HttpRequest::setReadyState(openspace::HttpRequest::ReadyState state) { _readyState = state; + _onReadyStateChange(state); } HttpDownload::HttpDownload() : _onProgress([] (HttpRequest::Progress) { return true; }) {} -HttpDownload::HttpDownload(HttpDownload&& d) - : _onProgress(std::move(d._onProgress)) - , _started(std::move(d._started)) - , _failed(std::move(d._failed)) - , _successful(std::move(d._successful)) -{} - - void HttpDownload::onProgress(ProgressCallback progressCallback) { - std::lock_guard guard(_onProgressMutex); _onProgress = progressCallback; } @@ -171,7 +160,6 @@ void HttpDownload::markAsSuccessful() { } bool HttpDownload::callOnProgress(HttpRequest::Progress p) { - std::lock_guard guard(_onProgressMutex); return _onProgress(p); } @@ -189,6 +177,13 @@ void SyncHttpDownload::download(HttpRequest::RequestOptions opt) { // if onProgress returns false. return callOnProgress(p) ? 0 : 1; }); + _httpRequest.onReadyStateChange([this](HttpRequest::ReadyState rs) { + if (rs == HttpRequest::ReadyState::Success) { + markAsSuccessful(); + } else if (rs == HttpRequest::ReadyState::Fail) { + markAsFailed(); + } + }); _httpRequest.perform(opt); deinitDownload(); } @@ -204,42 +199,41 @@ AsyncHttpDownload::AsyncHttpDownload(AsyncHttpDownload&& d) {} void AsyncHttpDownload::start(HttpRequest::RequestOptions opt) { - std::lock_guard guard(_mutex); + std::lock_guard guard(_stateChangeMutex); if (hasStarted()) { return; } + markAsStarted(); _downloadThread = std::thread([this, opt] { download(opt); }); - markAsStarted(); } void AsyncHttpDownload::cancel() { - std::lock_guard guard(_mutex); _shouldCancel = true; } void AsyncHttpDownload::wait() { - std::unique_lock lock(_mutex); + std::unique_lock lock(_conditionMutex); _downloadFinishCondition.wait(lock, [this] { return hasFailed() || hasSucceeded(); }); + if (_downloadThread.joinable()) { + _downloadThread.join(); + } } void AsyncHttpDownload::download(HttpRequest::RequestOptions opt) { - std::unique_lock lock(_mutex); - initDownload(); _httpRequest.onData([this](HttpRequest::Data d) { - std::lock_guard guard(_mutex); return handleData(d); }); _httpRequest.onProgress([this](HttpRequest::Progress p) { // Return a non-zero value to cancel download // if onProgress returns false. - std::lock_guard guard(_mutex); + //std::lock_guard guard(_mutex); bool shouldContinue = callOnProgress(p); if (!shouldContinue) { return 1; @@ -247,19 +241,23 @@ void AsyncHttpDownload::download(HttpRequest::RequestOptions opt) { if (_shouldCancel) { return 1; } - if (p.totalBytesKnown && p.downloadedBytes == p.totalBytes) { - markAsSuccessful(); - } return 0; }); - lock.release(); + _httpRequest.onReadyStateChange([this](HttpRequest::ReadyState rs) { + if (rs == HttpRequest::ReadyState::Success) { + markAsSuccessful(); + } + else if (rs == HttpRequest::ReadyState::Fail) { + markAsFailed(); + } + }); + _httpRequest.perform(opt); if (!hasSucceeded()) { markAsFailed(); } _downloadFinishCondition.notify_all(); - lock.lock(); deinitDownload(); } @@ -331,4 +329,5 @@ AsyncHttpFileDownload::AsyncHttpFileDownload(std::string url, std::string destin , HttpFileDownload(std::move(destinationPath)) {} + } // namespace openspace