Work on http synchronization

This commit is contained in:
Emil Axelsson
2017-11-13 23:01:22 +01:00
parent c171e6ae86
commit 81f45aed71
8 changed files with 80 additions and 28 deletions

View File

@@ -35,6 +35,7 @@
#include <vector>
#include <mutex>
#include <atomic>
#include <thread>
namespace openspace {
@@ -125,6 +126,7 @@ private:
class HttpDownload {
public:
HttpDownload();
virtual ~HttpDownload() = default;
using ProgressCallback = std::function<bool(HttpRequest::Progress)>;
void onProgress(ProgressCallback progressCallback);
bool hasStarted();
@@ -151,6 +153,7 @@ private:
class SyncHttpDownload : public virtual HttpDownload {
public:
SyncHttpDownload(std::string url);
virtual ~SyncHttpDownload() = default;
void download(HttpRequest::RequestOptions opt);
protected:
HttpRequest _httpRequest;
@@ -176,6 +179,7 @@ private:
class HttpFileDownload : public virtual HttpDownload {
public:
HttpFileDownload(std::string destination);
virtual ~HttpFileDownload() = default;
protected:
bool initDownload() override;
bool deinitDownload() override;
@@ -187,6 +191,7 @@ private:
class HttpMemoryDownload : public virtual HttpDownload {
public:
virtual ~HttpMemoryDownload() = default;
const std::vector<char>& downloadedData();
protected:
bool initDownload() override;
@@ -200,6 +205,7 @@ private:
class SyncHttpMemoryDownload : public SyncHttpDownload, public HttpMemoryDownload {
public:
SyncHttpMemoryDownload(std::string url);
SyncHttpMemoryDownload(SyncHttpMemoryDownload&&);
virtual ~SyncHttpMemoryDownload() = default;
};
@@ -207,6 +213,7 @@ public:
class SyncHttpFileDownload : public SyncHttpDownload, public HttpFileDownload {
public:
SyncHttpFileDownload(std::string url, std::string destinationPath);
SyncHttpFileDownload(SyncHttpFileDownload&&);
virtual ~SyncHttpFileDownload() = default;
};
@@ -214,6 +221,7 @@ public:
class AsyncHttpMemoryDownload : public AsyncHttpDownload, public HttpMemoryDownload {
public:
AsyncHttpMemoryDownload(std::string url);
AsyncHttpMemoryDownload(AsyncHttpMemoryDownload&&);
virtual ~AsyncHttpMemoryDownload() = default;
};
@@ -221,6 +229,7 @@ public:
class AsyncHttpFileDownload : public AsyncHttpDownload, public HttpFileDownload {
public:
AsyncHttpFileDownload(std::string url, std::string destinationPath);
AsyncHttpFileDownload(AsyncHttpFileDownload&&);
virtual ~AsyncHttpFileDownload() = default;
};

View File

@@ -53,19 +53,21 @@ public:
virtual void cancel() = 0;
virtual void clear() = 0;
virtual float nSynchronizedBytes() = 0;
virtual float nTotalBytes() = 0;
virtual size_t nSynchronizedBytes() = 0;
virtual size_t nTotalBytes() = 0;
virtual bool nTotalBytesIsKnown() = 0;
virtual float progress();
void wait();
bool isResolved();
void resolve();
void reject();
void updateProgress(float t);
private:
std::atomic<bool> _started;
std::atomic<bool> _resolved;
std::atomic<bool> _rejected;
};

View File

@@ -36,6 +36,7 @@
#include <sstream>
#include <fstream>
#include <numeric>
namespace {
const char* _loggerCat = "HttpSynchronization";
@@ -116,11 +117,12 @@ void HttpSynchronization::start() {
_syncThread = std::thread([this, listUrls] {
for (const auto& url : listUrls) {
if (trySyncFromUrl(url)) {
createSyncFile();
resolve();
return;
}
}
//fail();
reject();
});
}
@@ -153,7 +155,7 @@ bool HttpSynchronization::trySyncFromUrl(std::string listUrl) {
opt.requestTimeoutSeconds = 0;
SyncHttpMemoryDownload fileListDownload(listUrl);
fileListDownload.onProgress([this](HttpRequest::Progress p) {
fileListDownload.onProgress([this](HttpRequest::Progress) {
return !_shouldCancel;
});
fileListDownload.download(opt);
@@ -164,10 +166,21 @@ bool HttpSynchronization::trySyncFromUrl(std::string listUrl) {
const std::vector<char>& buffer = fileListDownload.downloadedData();
_nSynchronizedBytes = 0;
_nTotalBytes = 0;
_nTotalBytesKnown = false;
std::istringstream fileList(std::string(buffer.begin(), buffer.end()));
std::vector<std::thread> downloadThreads;
std::string line = "";
std::unordered_map<std::string, size_t> fileSizes;
std::mutex fileSizeMutex;
std::atomic_bool startedAllDownloads = false;
std::atomic_size_t nDownloads = 0;
std::vector<AsyncHttpFileDownload> downloads;
while (fileList >> line) {
size_t lastSlash = line.find_last_of('/');
std::string filename = line.substr(lastSlash + 1);
@@ -175,19 +188,43 @@ bool HttpSynchronization::trySyncFromUrl(std::string listUrl) {
std::string fileDestination = directory() +
ghoul::filesystem::FileSystem::PathSeparator +
filename;
std::thread t([opt, line, fileDestination]() {
SyncHttpFileDownload fileDownload(line, fileDestination);
fileDownload.download(opt);
downloads.push_back(AsyncHttpFileDownload(line, fileDestination));
AsyncHttpFileDownload& fileDownload = downloads.back();
bool reportedFileSize = false;
++nDownloads;
fileDownload.onProgress(
[this, line, &reportedFileSize, &fileSizes, &fileSizeMutex,
&startedAllDownloads, &nDownloads](HttpRequest::Progress p)
{
if (!reportedFileSize && p.totalBytesKnown) {
std::lock_guard<std::mutex> guard(fileSizeMutex);
fileSizes[line] = p.totalBytes;
reportedFileSize = true;
if (!_nTotalBytesKnown && startedAllDownloads && fileSizes.size() == nDownloads) {
_nTotalBytesKnown = true;
_nTotalBytes = std::accumulate(fileSizes.begin(), fileSizes.end(), size_t(0),
[](size_t a, auto b) {
return a + b.second;
});
}
}
return !_shouldCancel;
});
downloadThreads.push_back(std::move(t));
}
for (auto& t : downloadThreads) {
t.join();
fileDownload.start(opt);
}
startedAllDownloads = true;
for (auto& d : downloads) {
d.wait();
if (!d.hasSucceeded()) {
return false;
}
}
createSyncFile();
return true;
}
@@ -200,11 +237,11 @@ void HttpSynchronization::createSyncFile() {
syncFile.close();
}
float HttpSynchronization::nSynchronizedBytes() {
return 0;
size_t HttpSynchronization::nSynchronizedBytes() {
return _nSynchronizedBytes;
}
float HttpSynchronization::nTotalBytes() {
size_t HttpSynchronization::nTotalBytes() {
return 0;
}

View File

@@ -46,8 +46,8 @@ public:
void cancel() override;
void clear() override;
float nSynchronizedBytes() override;
float nTotalBytes() override;
size_t nSynchronizedBytes() override;
size_t nTotalBytes() override;
bool nTotalBytesIsKnown() override;
private:
@@ -57,8 +57,8 @@ private:
void createSyncFile();
std::atomic_bool _nTotalBytesKnown;
std::atomic<float> _nTotalBytes;
std::atomic<float> _nSynchronizedBytes;
std::atomic_size_t _nTotalBytes;
std::atomic_size_t _nSynchronizedBytes;
std::atomic_bool _shouldCancel;
std::string _identifier;

View File

@@ -123,11 +123,11 @@ void TorrentSynchronization::clear() {
}
float TorrentSynchronization::nSynchronizedBytes() {
size_t TorrentSynchronization::nSynchronizedBytes() {
return 0;
}
float TorrentSynchronization::nTotalBytes() {
size_t TorrentSynchronization::nTotalBytes() {
return 0;
}

View File

@@ -49,8 +49,8 @@ public:
void cancel() override;
void clear() override;
float nSynchronizedBytes() override;
float nTotalBytes() override;
size_t nSynchronizedBytes() override;
size_t nTotalBytes() override;
bool nTotalBytesIsKnown() override;
private:

View File

@@ -105,10 +105,10 @@ float AssetSynchronizer::assetProgress(Asset* asset) {
for (const auto& sync : syncs) {
if (sync->nTotalBytesIsKnown()) {
return 0;
} else {
nTotalBytes += sync->nTotalBytes();
nSyncedBytes += sync->nSynchronizedBytes();
} else {
return 0;
}
}

View File

@@ -106,6 +106,10 @@ void ResourceSynchronization::resolve() {
_resolved = true;
}
void ResourceSynchronization::reject() {
_rejected = true;
}
float ResourceSynchronization::progress() {
if (!nTotalBytesIsKnown()) {
return 0.f;