cmWorkerPool: Factor our worker thread class (internals)

This moves the `cmWorkerPoolInternal::WorkerT` class to `cmWorkerPoolWorker`
and changes the thread start interface to make it independent of the
`cmWorkerPoolInternal` type.
This commit is contained in:
Sebastian Holtermann
2019-04-24 11:54:56 +02:00
parent 9794b72d38
commit 56890ede2a

View File

@@ -370,88 +370,144 @@ void cmUVReadOnlyProcess::UVTryFinish()
FinishedCallback_();
}
/**
* @brief Worker pool worker thread
*/
class cmWorkerPoolWorker
{
public:
cmWorkerPoolWorker(uv_loop_t& uvLoop);
~cmWorkerPoolWorker();
cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete;
cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete;
/**
* Set the internal thread
*/
void SetThread(std::thread&& aThread) { Thread_ = std::move(aThread); }
/**
* Run an external process
*/
bool RunProcess(cmWorkerPool::ProcessResultT& result,
std::vector<std::string> const& command,
std::string const& workingDirectory);
private:
// -- Libuv callbacks
static void UVProcessStart(uv_async_t* handle);
void UVProcessFinished();
private:
// -- Process management
struct
{
std::mutex Mutex;
cm::uv_async_ptr Request;
std::condition_variable Condition;
std::unique_ptr<cmUVReadOnlyProcess> ROP;
} Proc_;
// -- System thread
std::thread Thread_;
};
cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop)
{
Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this);
}
cmWorkerPoolWorker::~cmWorkerPoolWorker()
{
if (Thread_.joinable()) {
Thread_.join();
}
}
bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result,
std::vector<std::string> const& command,
std::string const& workingDirectory)
{
if (command.empty()) {
return false;
}
// Create process instance
{
std::lock_guard<std::mutex> lock(Proc_.Mutex);
Proc_.ROP = cm::make_unique<cmUVReadOnlyProcess>();
Proc_.ROP->setup(&result, true, command, workingDirectory);
}
// Send asynchronous process start request to libuv loop
Proc_.Request.send();
// Wait until the process has been finished and destroyed
{
std::unique_lock<std::mutex> ulock(Proc_.Mutex);
while (Proc_.ROP) {
Proc_.Condition.wait(ulock);
}
}
return !result.error();
}
void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
{
auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data);
bool startFailed = false;
{
auto& Proc = wrk->Proc_;
std::lock_guard<std::mutex> lock(Proc.Mutex);
if (Proc.ROP && !Proc.ROP->IsStarted()) {
startFailed =
!Proc.ROP->start(handle->loop, [wrk] { wrk->UVProcessFinished(); });
}
}
// Clean up if starting of the process failed
if (startFailed) {
wrk->UVProcessFinished();
}
}
void cmWorkerPoolWorker::UVProcessFinished()
{
{
std::lock_guard<std::mutex> lock(Proc_.Mutex);
if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) {
Proc_.ROP.reset();
}
}
// Notify idling thread
Proc_.Condition.notify_one();
}
/**
* @brief Private worker pool internals
*/
class cmWorkerPoolInternal
{
public:
// -- Types
/**
* @brief Worker thread
*/
class WorkerT
{
public:
WorkerT(unsigned int index);
~WorkerT();
WorkerT(WorkerT const&) = delete;
WorkerT& operator=(WorkerT const&) = delete;
/**
* Start the thread
*/
void Start(cmWorkerPoolInternal* internal);
/**
* @brief Run an external process
*/
bool RunProcess(cmWorkerPool::ProcessResultT& result,
std::vector<std::string> const& command,
std::string const& workingDirectory);
// -- Accessors
unsigned int Index() const { return Index_; }
cmWorkerPool::JobHandleT& JobHandle() { return JobHandle_; }
private:
// -- Libuv callbacks
static void UVProcessStart(uv_async_t* handle);
void UVProcessFinished();
private:
//! @brief Job handle
cmWorkerPool::JobHandleT JobHandle_;
//! @brief Worker index
unsigned int Index_;
// -- Process management
struct
{
std::mutex Mutex;
cm::uv_async_ptr Request;
std::condition_variable Condition;
std::unique_ptr<cmUVReadOnlyProcess> ROP;
} Proc_;
// -- System thread
std::thread Thread_;
};
public:
// -- Constructors
cmWorkerPoolInternal(cmWorkerPool* pool);
~cmWorkerPoolInternal();
/**
* @brief Runs the libuv loop
* Runs the libuv loop.
*/
bool Process();
/**
* @brief Clear queue and abort threads
* Clear queue and abort threads.
*/
void Abort();
/**
* @brief Push a job to the queue and notify a worker
* Push a job to the queue and notify a worker.
*/
bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
/**
* @brief Worker thread main loop method
* Worker thread main loop method.
*/
void Work(WorkerT* worker);
void Work(unsigned int workerIndex);
// -- Request slots
static void UVSlotBegin(uv_async_t* handle);
@@ -476,85 +532,12 @@ public:
unsigned int JobsProcessing = 0;
std::deque<cmWorkerPool::JobHandleT> Queue;
std::condition_variable Condition;
std::vector<std::unique_ptr<WorkerT>> Workers;
std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
// -- References
cmWorkerPool* Pool = nullptr;
};
cmWorkerPoolInternal::WorkerT::WorkerT(unsigned int index)
: Index_(index)
{
}
cmWorkerPoolInternal::WorkerT::~WorkerT()
{
if (Thread_.joinable()) {
Thread_.join();
}
}
void cmWorkerPoolInternal::WorkerT::Start(cmWorkerPoolInternal* internal)
{
Proc_.Request.init(*(internal->UVLoop), &WorkerT::UVProcessStart, this);
Thread_ = std::thread(&cmWorkerPoolInternal::Work, internal, this);
}
bool cmWorkerPoolInternal::WorkerT::RunProcess(
cmWorkerPool::ProcessResultT& result,
std::vector<std::string> const& command, std::string const& workingDirectory)
{
if (command.empty()) {
return false;
}
// Create process instance
{
std::lock_guard<std::mutex> lock(Proc_.Mutex);
Proc_.ROP = cm::make_unique<cmUVReadOnlyProcess>();
Proc_.ROP->setup(&result, true, command, workingDirectory);
}
// Send asynchronous process start request to libuv loop
Proc_.Request.send();
// Wait until the process has been finished and destroyed
{
std::unique_lock<std::mutex> ulock(Proc_.Mutex);
while (Proc_.ROP) {
Proc_.Condition.wait(ulock);
}
}
return !result.error();
}
void cmWorkerPoolInternal::WorkerT::UVProcessStart(uv_async_t* handle)
{
auto* wrk = reinterpret_cast<WorkerT*>(handle->data);
bool startFailed = false;
{
auto& Proc = wrk->Proc_;
std::lock_guard<std::mutex> lock(Proc.Mutex);
if (Proc.ROP && !Proc.ROP->IsStarted()) {
startFailed =
!Proc.ROP->start(handle->loop, [wrk] { wrk->UVProcessFinished(); });
}
}
// Clean up if starting of the process failed
if (startFailed) {
wrk->UVProcessFinished();
}
}
void cmWorkerPoolInternal::WorkerT::UVProcessFinished()
{
{
std::lock_guard<std::mutex> lock(Proc_.Mutex);
if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) {
Proc_.ROP.reset();
}
}
// Notify idling thread
Proc_.Condition.notify_one();
}
void cmWorkerPool::ProcessResultT::reset()
{
ExitStatus = 0;
@@ -652,11 +635,13 @@ void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle)
// Create workers
gint.Workers.reserve(num);
for (unsigned int ii = 0; ii != num; ++ii) {
gint.Workers.emplace_back(cm::make_unique<WorkerT>(ii));
gint.Workers.emplace_back(
cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop));
}
// Start workers
for (auto& wrk : gint.Workers) {
wrk->Start(&gint);
// Start worker threads
for (unsigned int ii = 0; ii != num; ++ii) {
gint.Workers[ii]->SetThread(
std::thread(&cmWorkerPoolInternal::Work, &gint, ii));
}
}
// Destroy begin request
@@ -672,8 +657,9 @@ void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle)
gint.UVRequestEnd.reset();
}
void cmWorkerPoolInternal::Work(WorkerT* worker)
void cmWorkerPoolInternal::Work(unsigned int workerIndex)
{
cmWorkerPool::JobHandleT jobHandle;
std::unique_lock<std::mutex> uLock(Mutex);
// Increment running workers count
++WorkersRunning;
@@ -702,15 +688,15 @@ void cmWorkerPoolInternal::Work(WorkerT* worker)
}
// Pop next job from queue
worker->JobHandle() = std::move(Queue.front());
jobHandle = std::move(Queue.front());
Queue.pop_front();
// Unlocked scope for job processing
++JobsProcessing;
{
uLock.unlock();
worker->JobHandle()->Work(Pool, worker->Index()); // Process job
worker->JobHandle().reset(); // Destroy job
jobHandle->Work(Pool, workerIndex); // Process job
jobHandle.reset(); // Destroy job
uLock.lock();
}
--JobsProcessing;