diff --git a/include/openspace/util/httprequest.h b/include/openspace/util/httprequest.h index e26c177998..bdceebc341 100644 --- a/include/openspace/util/httprequest.h +++ b/include/openspace/util/httprequest.h @@ -74,7 +74,13 @@ public: }; using ReadyStateChangeCallback = std::function; + + // ProgressCallback: Return non-zero value to cancel download. using ProgressCallback = std::function; + + // DataCallback: Return number of bytes successfully stored. + // If this does not match the data buffer sice reported in Data, + // the request will fail. using DataCallback = std::function; struct RequestOptions { @@ -116,15 +122,33 @@ private: int64_t ulnow); }; - -class HttpDownloadInterface { +class HttpDownload { public: + HttpDownload(); + using ProgressCallback = std::function; + void onProgress(ProgressCallback progressCallback); + bool hasStarted(); + bool hasFailed(); + bool hasSucceeded(); + +protected: + virtual size_t handleData(HttpRequest::Data d) = 0; virtual bool initDownload() = 0; virtual bool deinitDownload() = 0; - virtual size_t handleData(HttpRequest::Data d) = 0; + + bool callOnProgress(HttpRequest::Progress p); + void markAsStarted(); + void markAsSuccessful(); + void markAsFailed(); +private: + std::mutex _onProgressMutex; + ProgressCallback _onProgress; + bool _started = false; + bool _failed = false; + bool _successful = false; }; -class SyncHttpDownload : public virtual HttpDownloadInterface { +class SyncHttpDownload : public virtual HttpDownload { public: SyncHttpDownload(std::string url); void download(HttpRequest::RequestOptions opt); @@ -132,7 +156,7 @@ protected: HttpRequest _httpRequest; }; -class AsyncHttpDownload : public virtual HttpDownloadInterface { +class AsyncHttpDownload : public virtual HttpDownload { public: AsyncHttpDownload(std::string url); virtual ~AsyncHttpDownload() = default; @@ -143,17 +167,13 @@ protected: void download(HttpRequest::RequestOptions opt); private: HttpRequest _httpRequest; - std::thread _downloadThread; std::mutex _mutex; std::condition_variable _downloadFinishCondition; - bool _started = false; bool _shouldCancel = false; - bool _finished = false; - bool _successful = false; }; -class HttpFileDownload : public virtual HttpDownloadInterface { +class HttpFileDownload : public virtual HttpDownload { public: HttpFileDownload(std::string destination); protected: @@ -165,7 +185,7 @@ private: std::ofstream _file; }; -class HttpMemoryDownload : public virtual HttpDownloadInterface { +class HttpMemoryDownload : public virtual HttpDownload { public: const std::vector& downloadedData(); protected: diff --git a/modules/sync/syncs/httpsynchronization.cpp b/modules/sync/syncs/httpsynchronization.cpp index 732420d61f..d29dae1734 100644 --- a/modules/sync/syncs/httpsynchronization.cpp +++ b/modules/sync/syncs/httpsynchronization.cpp @@ -153,9 +153,14 @@ bool HttpSynchronization::trySyncFromUrl(std::string listUrl) { opt.requestTimeoutSeconds = 0; SyncHttpMemoryDownload fileListDownload(listUrl); + fileListDownload.onProgress([this](HttpRequest::Progress p) { + return !_shouldCancel; + }); fileListDownload.download(opt); - // ... + if (!fileListDownload.hasSucceeded()) { + return false; + } const std::vector& buffer = fileListDownload.downloadedData(); @@ -177,6 +182,7 @@ bool HttpSynchronization::trySyncFromUrl(std::string listUrl) { }); downloadThreads.push_back(std::move(t)); } + for (auto& t : downloadThreads) { t.join(); } diff --git a/modules/sync/syncs/httpsynchronization.h b/modules/sync/syncs/httpsynchronization.h index 1c9cbf506c..6680bc370b 100644 --- a/modules/sync/syncs/httpsynchronization.h +++ b/modules/sync/syncs/httpsynchronization.h @@ -56,6 +56,11 @@ private: bool hasSyncFile(); void createSyncFile(); + std::atomic_bool _nTotalBytesKnown; + std::atomic _nTotalBytes; + std::atomic _nSynchronizedBytes; + + std::atomic_bool _shouldCancel; std::string _identifier; int _version; std::string _synchronizationRoot; diff --git a/src/util/httprequest.cpp b/src/util/httprequest.cpp index 9239b037f5..daff1f0d62 100644 --- a/src/util/httprequest.cpp +++ b/src/util/httprequest.cpp @@ -131,6 +131,42 @@ void HttpRequest::setReadyState(openspace::HttpRequest::ReadyState state) { _readyState = state; } +HttpDownload::HttpDownload() + : _onProgress([] (HttpRequest::Progress) { return true; }) +{} + +void HttpDownload::onProgress(ProgressCallback progressCallback) { + std::lock_guard guard(_onProgressMutex); + _onProgress = progressCallback; +} + +bool HttpDownload::hasStarted() { + return _started; +} + +bool HttpDownload::hasFailed() { + return _failed; +} +bool HttpDownload::hasSucceeded() { + return _successful; +} + +void HttpDownload::markAsStarted() { + _started = true; +} + +void HttpDownload::markAsFailed() { + _failed = true; +} +void HttpDownload::markAsSuccessful() { + _successful = true; +} + +bool HttpDownload::callOnProgress(HttpRequest::Progress p) { + std::lock_guard guard(_onProgressMutex); + return _onProgress(p); +} + SyncHttpDownload::SyncHttpDownload(std::string url) : _httpRequest(std::move(url)) {} @@ -140,6 +176,11 @@ void SyncHttpDownload::download(HttpRequest::RequestOptions opt) { _httpRequest.onData([this] (HttpRequest::Data d) { return handleData(d); }); + _httpRequest.onProgress([this](HttpRequest::Progress p) { + // Return a non-zero value to cancel download + // if onProgress returns false. + return callOnProgress(p) ? 0 : 1; + }); _httpRequest.perform(opt); deinitDownload(); } @@ -150,13 +191,13 @@ AsyncHttpDownload::AsyncHttpDownload(std::string url) void AsyncHttpDownload::start(HttpRequest::RequestOptions opt) { std::lock_guard guard(_mutex); - if (_started) { + if (hasStarted()) { return; } _downloadThread = std::thread([this, opt] { download(opt); }); - _started = true; + markAsStarted(); } void AsyncHttpDownload::cancel() { @@ -167,7 +208,7 @@ void AsyncHttpDownload::cancel() { void AsyncHttpDownload::wait() { std::unique_lock lock(_mutex); _downloadFinishCondition.wait(lock, [this] { - return _finished; + return hasFailed() || hasSucceeded(); }); } @@ -182,21 +223,28 @@ void AsyncHttpDownload::download(HttpRequest::RequestOptions opt) { }); _httpRequest.onProgress([this](HttpRequest::Progress p) { + // Return a non-zero value to cancel download + // if onProgress returns false. std::lock_guard guard(_mutex); + bool shouldContinue = callOnProgress(p); + if (!shouldContinue) { + return 1; + } if (_shouldCancel) { return 1; } if (p.totalBytesKnown && p.downloadedBytes == p.totalBytes) { - _successful = true; + markAsSuccessful(); } return 0; }); lock.release(); _httpRequest.perform(opt); - _finished = true; + if (!hasSucceeded()) { + markAsFailed(); + } _downloadFinishCondition.notify_all(); - lock.lock(); deinitDownload(); }