mirror of
https://github.com/Kitware/CMake.git
synced 2026-01-09 15:20:56 -06:00
Autogen: Redo locking and state machine for fence handling and the worker pool
(1) All CV use must hold the corresponding mutex, otherwise race conditions happen. This is mandated by the C++ standard. (2) Introduce a separate CV for the thread waiting for other jobs to finish before running a fence. This avoids waking up all other workers blindly. Correctly wake that thread up when the processing of outstanding jobs is done. (3) Split the waiting for a fence to become runnable from a fence is pending. This avoids problems if more than one fence can end up on the queue. The thread that took a fence off the queue is responsible for clearing the fence processing flag.
This commit is contained in:
committed by
Brad King
parent
9ec09f22ff
commit
01ad902588
@@ -469,11 +469,9 @@ void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
|
||||
|
||||
void cmWorkerPoolWorker::UVProcessFinished()
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(Proc_.Mutex);
|
||||
if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) {
|
||||
Proc_.ROP.reset();
|
||||
}
|
||||
std::lock_guard<std::mutex> lock(Proc_.Mutex);
|
||||
if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) {
|
||||
Proc_.ROP.reset();
|
||||
}
|
||||
// Notify idling thread
|
||||
Proc_.Condition.notify_one();
|
||||
@@ -532,6 +530,7 @@ public:
|
||||
unsigned int JobsProcessing = 0;
|
||||
std::deque<cmWorkerPool::JobHandleT> Queue;
|
||||
std::condition_variable Condition;
|
||||
std::condition_variable ConditionFence;
|
||||
std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
|
||||
|
||||
// -- References
|
||||
@@ -593,19 +592,12 @@ bool cmWorkerPoolInternal::Process()
|
||||
|
||||
void cmWorkerPoolInternal::Abort()
|
||||
{
|
||||
bool notifyThreads = false;
|
||||
// Clear all jobs and set abort flag
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(Mutex);
|
||||
if (Processing && !Aborting) {
|
||||
// Register abort and clear queue
|
||||
Aborting = true;
|
||||
Queue.clear();
|
||||
notifyThreads = true;
|
||||
}
|
||||
}
|
||||
if (notifyThreads) {
|
||||
// Wake threads
|
||||
std::lock_guard<std::mutex> guard(Mutex);
|
||||
if (!Aborting) {
|
||||
// Register abort and clear queue
|
||||
Aborting = true;
|
||||
Queue.clear();
|
||||
Condition.notify_all();
|
||||
}
|
||||
}
|
||||
@@ -669,7 +661,7 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex)
|
||||
if (Aborting) {
|
||||
break;
|
||||
}
|
||||
// Wait for new jobs
|
||||
// Wait for new jobs on the main CV
|
||||
if (Queue.empty()) {
|
||||
++WorkersIdle;
|
||||
Condition.wait(uLock);
|
||||
@@ -677,20 +669,34 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex)
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check for fence jobs
|
||||
if (FenceProcessing || Queue.front()->IsFence()) {
|
||||
if (JobsProcessing != 0) {
|
||||
Condition.wait(uLock);
|
||||
continue;
|
||||
}
|
||||
// No jobs get processed. Set the fence job processing flag.
|
||||
FenceProcessing = true;
|
||||
// If there is a fence currently active or waiting,
|
||||
// sleep on the main CV and try again.
|
||||
if (FenceProcessing) {
|
||||
Condition.wait(uLock);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Pop next job from queue
|
||||
jobHandle = std::move(Queue.front());
|
||||
Queue.pop_front();
|
||||
|
||||
// Check for fence jobs
|
||||
bool raisedFence = false;
|
||||
if (jobHandle->IsFence()) {
|
||||
FenceProcessing = true;
|
||||
raisedFence = true;
|
||||
// Wait on the Fence CV until all pending jobs are done.
|
||||
while (JobsProcessing != 0 && !Aborting) {
|
||||
ConditionFence.wait(uLock);
|
||||
}
|
||||
// When aborting, explicitly kick all threads alive once more.
|
||||
if (Aborting) {
|
||||
FenceProcessing = false;
|
||||
Condition.notify_all();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Unlocked scope for job processing
|
||||
++JobsProcessing;
|
||||
{
|
||||
@@ -701,11 +707,18 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex)
|
||||
}
|
||||
--JobsProcessing;
|
||||
|
||||
// Was this a fence job?
|
||||
if (FenceProcessing) {
|
||||
// If this was the thread that entered fence processing
|
||||
// originally, notify all idling workers that the fence
|
||||
// is done.
|
||||
if (raisedFence) {
|
||||
FenceProcessing = false;
|
||||
Condition.notify_all();
|
||||
}
|
||||
// If fence processing is still not done, notify the
|
||||
// the fencing worker when all active jobs are done.
|
||||
if (FenceProcessing && JobsProcessing == 0) {
|
||||
ConditionFence.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
// Decrement running workers count
|
||||
|
||||
Reference in New Issue
Block a user