Thread safety in http requests

This commit is contained in:
Emil Axelsson
2017-11-14 13:47:22 +01:00
parent c032e4a66e
commit 6a4f938bac
4 changed files with 64 additions and 55 deletions

View File

@@ -33,9 +33,8 @@
#include <memory>
#include <string>
#include <vector>
#include <mutex>
#include <atomic>
#include <thread>
#include <condition_variable>
namespace openspace {
@@ -125,10 +124,11 @@ private:
class HttpDownload {
public:
HttpDownload();
HttpDownload(HttpDownload&& d);
virtual ~HttpDownload() = default;
using ProgressCallback = std::function<bool(HttpRequest::Progress)>;
HttpDownload();
HttpDownload(HttpDownload&& d) = default;
virtual ~HttpDownload() = default;
void onProgress(ProgressCallback progressCallback);
bool hasStarted();
bool hasFailed();
@@ -144,7 +144,6 @@ protected:
void markAsSuccessful();
void markAsFailed();
private:
std::mutex _onProgressMutex;
ProgressCallback _onProgress;
bool _started = false;
bool _failed = false;
@@ -174,9 +173,10 @@ protected:
private:
HttpRequest _httpRequest;
std::thread _downloadThread;
std::mutex _mutex;
std::mutex _conditionMutex;
std::condition_variable _downloadFinishCondition;
bool _shouldCancel = false;
std::mutex _stateChangeMutex;
};
class HttpFileDownload : public virtual HttpDownload {
@@ -212,7 +212,6 @@ private:
class SyncHttpMemoryDownload : public SyncHttpDownload, public HttpMemoryDownload {
public:
SyncHttpMemoryDownload(std::string url);
SyncHttpMemoryDownload(SyncHttpMemoryDownload&&) = default;
virtual ~SyncHttpMemoryDownload() = default;
};
@@ -220,7 +219,6 @@ public:
class SyncHttpFileDownload : public SyncHttpDownload, public HttpFileDownload {
public:
SyncHttpFileDownload(std::string url, std::string destinationPath);
SyncHttpFileDownload(SyncHttpFileDownload&&) = default;
virtual ~SyncHttpFileDownload() = default;
};
@@ -228,7 +226,6 @@ public:
class AsyncHttpMemoryDownload : public AsyncHttpDownload, public HttpMemoryDownload {
public:
AsyncHttpMemoryDownload(std::string url);
AsyncHttpMemoryDownload(AsyncHttpMemoryDownload&&) = default;
virtual ~AsyncHttpMemoryDownload() = default;
};
@@ -236,7 +233,6 @@ public:
class AsyncHttpFileDownload : public AsyncHttpDownload, public HttpFileDownload {
public:
AsyncHttpFileDownload(std::string url, std::string destinationPath);
AsyncHttpFileDownload(AsyncHttpFileDownload&&) = default;
virtual ~AsyncHttpFileDownload() = default;
};

View File

@@ -37,6 +37,7 @@
#include <sstream>
#include <fstream>
#include <numeric>
#include <memory>
namespace {
const char* _loggerCat = "HttpSynchronization";
@@ -71,6 +72,13 @@ HttpSynchronization::HttpSynchronization(const ghoul::Dictionary& dict)
_synchronizationRepositories = syncModule->httpSynchronizationRepositories();
}
HttpSynchronization::~HttpSynchronization() {
if (_syncThread.joinable()) {
cancel();
_syncThread.join();
}
}
documentation::Documentation HttpSynchronization::Documentation() {
using namespace openspace::documentation;
return {
@@ -127,9 +135,11 @@ void HttpSynchronization::start() {
}
void HttpSynchronization::cancel() {
_shouldCancel = true;
}
void HttpSynchronization::clear() {
// TODO: Remove all files from directory.
}
std::vector<std::string> HttpSynchronization::fileListUrls() {
@@ -165,7 +175,6 @@ bool HttpSynchronization::trySyncFromUrl(std::string listUrl) {
}
const std::vector<char>& buffer = fileListDownload.downloadedData();
_nSynchronizedBytes = 0;
_nTotalBytes = 0;
_nTotalBytesKnown = false;
@@ -179,7 +188,7 @@ bool HttpSynchronization::trySyncFromUrl(std::string listUrl) {
std::atomic_bool startedAllDownloads = false;
std::atomic_size_t nDownloads = 0;
std::vector<AsyncHttpFileDownload> downloads;
std::vector<std::unique_ptr<AsyncHttpFileDownload>> downloads;
while (fileList >> line) {
size_t lastSlash = line.find_last_of('/');
@@ -189,20 +198,18 @@ bool HttpSynchronization::trySyncFromUrl(std::string listUrl) {
ghoul::filesystem::FileSystem::PathSeparator +
filename;
downloads.push_back(AsyncHttpFileDownload(line, fileDestination));
AsyncHttpFileDownload& fileDownload = downloads.back();
downloads.push_back(std::make_unique<AsyncHttpFileDownload>(line, fileDestination));
auto& fileDownload = downloads.back();
bool reportedFileSize = false;
++nDownloads;
fileDownload.onProgress(
[this, line, &reportedFileSize, &fileSizes, &fileSizeMutex,
fileDownload->onProgress(
[this, line, &fileSizes, &fileSizeMutex,
&startedAllDownloads, &nDownloads](HttpRequest::Progress p)
{
if (!reportedFileSize && p.totalBytesKnown) {
if (p.totalBytesKnown) {
std::lock_guard<std::mutex> guard(fileSizeMutex);
fileSizes[line] = p.totalBytes;
reportedFileSize = true;
if (!_nTotalBytesKnown && startedAllDownloads && fileSizes.size() == nDownloads) {
_nTotalBytesKnown = true;
@@ -215,17 +222,24 @@ bool HttpSynchronization::trySyncFromUrl(std::string listUrl) {
return !_shouldCancel;
});
fileDownload.start(opt);
fileDownload->start(opt);
}
startedAllDownloads = true;
bool failed = false;
for (auto& d : downloads) {
d.wait();
if (!d.hasSucceeded()) {
return false;
d->wait();
if (!d->hasSucceeded()) {
failed = true;
}
}
return true;
if (!failed) {
return true;
}
for (auto& d : downloads) {
d->cancel();
}
return false;
}
void HttpSynchronization::createSyncFile() {
@@ -242,11 +256,11 @@ size_t HttpSynchronization::nSynchronizedBytes() {
}
size_t HttpSynchronization::nTotalBytes() {
return 0;
return _nTotalBytes;
}
bool HttpSynchronization::nTotalBytesIsKnown() {
return false;
return _nTotalBytesKnown;
}
} // namespace openspace

View File

@@ -35,7 +35,7 @@ namespace openspace {
class HttpSynchronization : public ResourceSynchronization {
public:
HttpSynchronization(const ghoul::Dictionary& dict);
virtual ~HttpSynchronization() = default;
virtual ~HttpSynchronization();
static documentation::Documentation Documentation();
@@ -58,7 +58,7 @@ private:
std::atomic_size_t _nTotalBytes;
std::atomic_size_t _nSynchronizedBytes;
std::atomic_bool _shouldCancel;
std::atomic_bool _shouldCancel = false;
std::string _identifier;
int _version;
std::string _synchronizationRoot;

View File

@@ -70,9 +70,6 @@ int progressCallback(
static_cast<size_t>(nDownloadedBytes)
}
);
LINFO("Transfer info from curl: " << nDownloadedBytes << " out of " << nTotalBytes);
return 0;
}
size_t writeCallback(char* ptr, size_t size, size_t nmemb, void* userData) {
@@ -129,22 +126,14 @@ void HttpRequest::perform(RequestOptions opt) {
void HttpRequest::setReadyState(openspace::HttpRequest::ReadyState state) {
_readyState = state;
_onReadyStateChange(state);
}
HttpDownload::HttpDownload()
: _onProgress([] (HttpRequest::Progress) { return true; })
{}
HttpDownload::HttpDownload(HttpDownload&& d)
: _onProgress(std::move(d._onProgress))
, _started(std::move(d._started))
, _failed(std::move(d._failed))
, _successful(std::move(d._successful))
{}
void HttpDownload::onProgress(ProgressCallback progressCallback) {
std::lock_guard<std::mutex> guard(_onProgressMutex);
_onProgress = progressCallback;
}
@@ -171,7 +160,6 @@ void HttpDownload::markAsSuccessful() {
}
bool HttpDownload::callOnProgress(HttpRequest::Progress p) {
std::lock_guard<std::mutex> guard(_onProgressMutex);
return _onProgress(p);
}
@@ -189,6 +177,13 @@ void SyncHttpDownload::download(HttpRequest::RequestOptions opt) {
// if onProgress returns false.
return callOnProgress(p) ? 0 : 1;
});
_httpRequest.onReadyStateChange([this](HttpRequest::ReadyState rs) {
if (rs == HttpRequest::ReadyState::Success) {
markAsSuccessful();
} else if (rs == HttpRequest::ReadyState::Fail) {
markAsFailed();
}
});
_httpRequest.perform(opt);
deinitDownload();
}
@@ -204,42 +199,41 @@ AsyncHttpDownload::AsyncHttpDownload(AsyncHttpDownload&& d)
{}
void AsyncHttpDownload::start(HttpRequest::RequestOptions opt) {
std::lock_guard<std::mutex> guard(_mutex);
std::lock_guard<std::mutex> guard(_stateChangeMutex);
if (hasStarted()) {
return;
}
markAsStarted();
_downloadThread = std::thread([this, opt] {
download(opt);
});
markAsStarted();
}
void AsyncHttpDownload::cancel() {
std::lock_guard<std::mutex> guard(_mutex);
_shouldCancel = true;
}
void AsyncHttpDownload::wait() {
std::unique_lock<std::mutex> lock(_mutex);
std::unique_lock<std::mutex> lock(_conditionMutex);
_downloadFinishCondition.wait(lock, [this] {
return hasFailed() || hasSucceeded();
});
if (_downloadThread.joinable()) {
_downloadThread.join();
}
}
void AsyncHttpDownload::download(HttpRequest::RequestOptions opt) {
std::unique_lock<std::mutex> lock(_mutex);
initDownload();
_httpRequest.onData([this](HttpRequest::Data d) {
std::lock_guard<std::mutex> guard(_mutex);
return handleData(d);
});
_httpRequest.onProgress([this](HttpRequest::Progress p) {
// Return a non-zero value to cancel download
// if onProgress returns false.
std::lock_guard<std::mutex> guard(_mutex);
//std::lock_guard<std::mutex> guard(_mutex);
bool shouldContinue = callOnProgress(p);
if (!shouldContinue) {
return 1;
@@ -247,19 +241,23 @@ void AsyncHttpDownload::download(HttpRequest::RequestOptions opt) {
if (_shouldCancel) {
return 1;
}
if (p.totalBytesKnown && p.downloadedBytes == p.totalBytes) {
markAsSuccessful();
}
return 0;
});
lock.release();
_httpRequest.onReadyStateChange([this](HttpRequest::ReadyState rs) {
if (rs == HttpRequest::ReadyState::Success) {
markAsSuccessful();
}
else if (rs == HttpRequest::ReadyState::Fail) {
markAsFailed();
}
});
_httpRequest.perform(opt);
if (!hasSucceeded()) {
markAsFailed();
}
_downloadFinishCondition.notify_all();
lock.lock();
deinitDownload();
}
@@ -331,4 +329,5 @@ AsyncHttpFileDownload::AsyncHttpFileDownload(std::string url, std::string destin
, HttpFileDownload(std::move(destinationPath))
{}
} // namespace openspace