mirror of
https://github.com/Kitware/CMake.git
synced 2026-04-27 09:29:15 -05:00
Merge topic 'cmWorkerPool_Tweaks'
56890ede2acmWorkerPool: Factor our worker thread class (internals)9794b72d38cmWorkerPool: Set worker thread count separately to Process() Acked-by: Kitware Robot <kwrobot@kitware.com> Merge-request: !3260
This commit is contained in:
@@ -1186,6 +1186,7 @@ bool cmQtAutoMocUic::Init(cmMakefile* makefile)
|
||||
num = std::min<unsigned long>(num, ParallelMax);
|
||||
Base_.NumThreads = static_cast<unsigned int>(num);
|
||||
}
|
||||
WorkerPool_.SetThreadCount(Base_.NumThreads);
|
||||
}
|
||||
|
||||
// - Files and directories
|
||||
@@ -1482,15 +1483,12 @@ bool cmQtAutoMocUic::Process()
|
||||
if (!CreateDirectories()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!WorkerPool_.Process(Base().NumThreads, this)) {
|
||||
if (!WorkerPool_.Process(this)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (JobError_) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return SettingsFileWrite();
|
||||
}
|
||||
|
||||
|
||||
+132
-139
@@ -371,137 +371,62 @@ void cmUVReadOnlyProcess::UVTryFinish()
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Private worker pool internals
|
||||
* @brief Worker pool worker thread
|
||||
*/
|
||||
class cmWorkerPoolInternal
|
||||
class cmWorkerPoolWorker
|
||||
{
|
||||
public:
|
||||
// -- Types
|
||||
cmWorkerPoolWorker(uv_loop_t& uvLoop);
|
||||
~cmWorkerPoolWorker();
|
||||
|
||||
cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete;
|
||||
cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete;
|
||||
|
||||
/**
|
||||
* @brief Worker thread
|
||||
* Set the internal thread
|
||||
*/
|
||||
class WorkerT
|
||||
void SetThread(std::thread&& aThread) { Thread_ = std::move(aThread); }
|
||||
|
||||
/**
|
||||
* Run an external process
|
||||
*/
|
||||
bool RunProcess(cmWorkerPool::ProcessResultT& result,
|
||||
std::vector<std::string> const& command,
|
||||
std::string const& workingDirectory);
|
||||
|
||||
private:
|
||||
// -- Libuv callbacks
|
||||
static void UVProcessStart(uv_async_t* handle);
|
||||
void UVProcessFinished();
|
||||
|
||||
private:
|
||||
// -- Process management
|
||||
struct
|
||||
{
|
||||
public:
|
||||
WorkerT(unsigned int index);
|
||||
~WorkerT();
|
||||
|
||||
WorkerT(WorkerT const&) = delete;
|
||||
WorkerT& operator=(WorkerT const&) = delete;
|
||||
|
||||
/**
|
||||
* Start the thread
|
||||
*/
|
||||
void Start(cmWorkerPoolInternal* internal);
|
||||
|
||||
/**
|
||||
* @brief Run an external process
|
||||
*/
|
||||
bool RunProcess(cmWorkerPool::ProcessResultT& result,
|
||||
std::vector<std::string> const& command,
|
||||
std::string const& workingDirectory);
|
||||
|
||||
// -- Accessors
|
||||
unsigned int Index() const { return Index_; }
|
||||
cmWorkerPool::JobHandleT& JobHandle() { return JobHandle_; }
|
||||
|
||||
private:
|
||||
// -- Libuv callbacks
|
||||
static void UVProcessStart(uv_async_t* handle);
|
||||
void UVProcessFinished();
|
||||
|
||||
private:
|
||||
//! @brief Job handle
|
||||
cmWorkerPool::JobHandleT JobHandle_;
|
||||
//! @brief Worker index
|
||||
unsigned int Index_;
|
||||
// -- Process management
|
||||
struct
|
||||
{
|
||||
std::mutex Mutex;
|
||||
cm::uv_async_ptr Request;
|
||||
std::condition_variable Condition;
|
||||
std::unique_ptr<cmUVReadOnlyProcess> ROP;
|
||||
} Proc_;
|
||||
// -- System thread
|
||||
std::thread Thread_;
|
||||
};
|
||||
|
||||
public:
|
||||
// -- Constructors
|
||||
cmWorkerPoolInternal(cmWorkerPool* pool);
|
||||
~cmWorkerPoolInternal();
|
||||
|
||||
/**
|
||||
* @brief Runs the libuv loop
|
||||
*/
|
||||
bool Process();
|
||||
|
||||
/**
|
||||
* @brief Clear queue and abort threads
|
||||
*/
|
||||
void Abort();
|
||||
|
||||
/**
|
||||
* @brief Push a job to the queue and notify a worker
|
||||
*/
|
||||
bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
|
||||
|
||||
/**
|
||||
* @brief Worker thread main loop method
|
||||
*/
|
||||
void Work(WorkerT* worker);
|
||||
|
||||
// -- Request slots
|
||||
static void UVSlotBegin(uv_async_t* handle);
|
||||
static void UVSlotEnd(uv_async_t* handle);
|
||||
|
||||
public:
|
||||
// -- UV loop
|
||||
#ifdef CMAKE_UV_SIGNAL_HACK
|
||||
std::unique_ptr<cmUVSignalHackRAII> UVHackRAII;
|
||||
#endif
|
||||
std::unique_ptr<uv_loop_t> UVLoop;
|
||||
cm::uv_async_ptr UVRequestBegin;
|
||||
cm::uv_async_ptr UVRequestEnd;
|
||||
|
||||
// -- Thread pool and job queue
|
||||
std::mutex Mutex;
|
||||
bool Aborting = false;
|
||||
bool FenceProcessing = false;
|
||||
unsigned int WorkersRunning = 0;
|
||||
unsigned int WorkersIdle = 0;
|
||||
unsigned int JobsProcessing = 0;
|
||||
std::deque<cmWorkerPool::JobHandleT> Queue;
|
||||
std::condition_variable Condition;
|
||||
std::vector<std::unique_ptr<WorkerT>> Workers;
|
||||
|
||||
// -- References
|
||||
cmWorkerPool* Pool = nullptr;
|
||||
std::mutex Mutex;
|
||||
cm::uv_async_ptr Request;
|
||||
std::condition_variable Condition;
|
||||
std::unique_ptr<cmUVReadOnlyProcess> ROP;
|
||||
} Proc_;
|
||||
// -- System thread
|
||||
std::thread Thread_;
|
||||
};
|
||||
|
||||
cmWorkerPoolInternal::WorkerT::WorkerT(unsigned int index)
|
||||
: Index_(index)
|
||||
cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop)
|
||||
{
|
||||
Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this);
|
||||
}
|
||||
|
||||
cmWorkerPoolInternal::WorkerT::~WorkerT()
|
||||
cmWorkerPoolWorker::~cmWorkerPoolWorker()
|
||||
{
|
||||
if (Thread_.joinable()) {
|
||||
Thread_.join();
|
||||
}
|
||||
}
|
||||
|
||||
void cmWorkerPoolInternal::WorkerT::Start(cmWorkerPoolInternal* internal)
|
||||
{
|
||||
Proc_.Request.init(*(internal->UVLoop), &WorkerT::UVProcessStart, this);
|
||||
Thread_ = std::thread(&cmWorkerPoolInternal::Work, internal, this);
|
||||
}
|
||||
|
||||
bool cmWorkerPoolInternal::WorkerT::RunProcess(
|
||||
cmWorkerPool::ProcessResultT& result,
|
||||
std::vector<std::string> const& command, std::string const& workingDirectory)
|
||||
bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result,
|
||||
std::vector<std::string> const& command,
|
||||
std::string const& workingDirectory)
|
||||
{
|
||||
if (command.empty()) {
|
||||
return false;
|
||||
@@ -524,9 +449,9 @@ bool cmWorkerPoolInternal::WorkerT::RunProcess(
|
||||
return !result.error();
|
||||
}
|
||||
|
||||
void cmWorkerPoolInternal::WorkerT::UVProcessStart(uv_async_t* handle)
|
||||
void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
|
||||
{
|
||||
auto* wrk = reinterpret_cast<WorkerT*>(handle->data);
|
||||
auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data);
|
||||
bool startFailed = false;
|
||||
{
|
||||
auto& Proc = wrk->Proc_;
|
||||
@@ -542,7 +467,7 @@ void cmWorkerPoolInternal::WorkerT::UVProcessStart(uv_async_t* handle)
|
||||
}
|
||||
}
|
||||
|
||||
void cmWorkerPoolInternal::WorkerT::UVProcessFinished()
|
||||
void cmWorkerPoolWorker::UVProcessFinished()
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(Proc_.Mutex);
|
||||
@@ -554,6 +479,65 @@ void cmWorkerPoolInternal::WorkerT::UVProcessFinished()
|
||||
Proc_.Condition.notify_one();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Private worker pool internals
|
||||
*/
|
||||
class cmWorkerPoolInternal
|
||||
{
|
||||
public:
|
||||
// -- Constructors
|
||||
cmWorkerPoolInternal(cmWorkerPool* pool);
|
||||
~cmWorkerPoolInternal();
|
||||
|
||||
/**
|
||||
* Runs the libuv loop.
|
||||
*/
|
||||
bool Process();
|
||||
|
||||
/**
|
||||
* Clear queue and abort threads.
|
||||
*/
|
||||
void Abort();
|
||||
|
||||
/**
|
||||
* Push a job to the queue and notify a worker.
|
||||
*/
|
||||
bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
|
||||
|
||||
/**
|
||||
* Worker thread main loop method.
|
||||
*/
|
||||
void Work(unsigned int workerIndex);
|
||||
|
||||
// -- Request slots
|
||||
static void UVSlotBegin(uv_async_t* handle);
|
||||
static void UVSlotEnd(uv_async_t* handle);
|
||||
|
||||
public:
|
||||
// -- UV loop
|
||||
#ifdef CMAKE_UV_SIGNAL_HACK
|
||||
std::unique_ptr<cmUVSignalHackRAII> UVHackRAII;
|
||||
#endif
|
||||
std::unique_ptr<uv_loop_t> UVLoop;
|
||||
cm::uv_async_ptr UVRequestBegin;
|
||||
cm::uv_async_ptr UVRequestEnd;
|
||||
|
||||
// -- Thread pool and job queue
|
||||
std::mutex Mutex;
|
||||
bool Processing = false;
|
||||
bool Aborting = false;
|
||||
bool FenceProcessing = false;
|
||||
unsigned int WorkersRunning = 0;
|
||||
unsigned int WorkersIdle = 0;
|
||||
unsigned int JobsProcessing = 0;
|
||||
std::deque<cmWorkerPool::JobHandleT> Queue;
|
||||
std::condition_variable Condition;
|
||||
std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
|
||||
|
||||
// -- References
|
||||
cmWorkerPool* Pool = nullptr;
|
||||
};
|
||||
|
||||
void cmWorkerPool::ProcessResultT::reset()
|
||||
{
|
||||
ExitStatus = 0;
|
||||
@@ -591,7 +575,8 @@ cmWorkerPoolInternal::~cmWorkerPoolInternal()
|
||||
|
||||
bool cmWorkerPoolInternal::Process()
|
||||
{
|
||||
// Reset state
|
||||
// Reset state flags
|
||||
Processing = true;
|
||||
Aborting = false;
|
||||
// Initialize libuv asynchronous request
|
||||
UVRequestBegin.init(*UVLoop, &cmWorkerPoolInternal::UVSlotBegin, this);
|
||||
@@ -599,23 +584,27 @@ bool cmWorkerPoolInternal::Process()
|
||||
// Send begin request
|
||||
UVRequestBegin.send();
|
||||
// Run libuv loop
|
||||
return (uv_run(UVLoop.get(), UV_RUN_DEFAULT) == 0);
|
||||
bool success = (uv_run(UVLoop.get(), UV_RUN_DEFAULT) == 0);
|
||||
// Update state flags
|
||||
Processing = false;
|
||||
Aborting = false;
|
||||
return success;
|
||||
}
|
||||
|
||||
void cmWorkerPoolInternal::Abort()
|
||||
{
|
||||
bool firstCall = false;
|
||||
bool notifyThreads = false;
|
||||
// Clear all jobs and set abort flag
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(Mutex);
|
||||
if (!Aborting) {
|
||||
if (Processing && !Aborting) {
|
||||
// Register abort and clear queue
|
||||
Aborting = true;
|
||||
Queue.clear();
|
||||
firstCall = true;
|
||||
notifyThreads = true;
|
||||
}
|
||||
}
|
||||
if (firstCall) {
|
||||
if (notifyThreads) {
|
||||
// Wake threads
|
||||
Condition.notify_all();
|
||||
}
|
||||
@@ -627,15 +616,13 @@ inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle)
|
||||
if (Aborting) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Append the job to the queue
|
||||
Queue.emplace_back(std::move(jobHandle));
|
||||
|
||||
// Notify an idle worker if there's one
|
||||
if (WorkersIdle != 0) {
|
||||
Condition.notify_one();
|
||||
}
|
||||
|
||||
// Return success
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -648,11 +635,13 @@ void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle)
|
||||
// Create workers
|
||||
gint.Workers.reserve(num);
|
||||
for (unsigned int ii = 0; ii != num; ++ii) {
|
||||
gint.Workers.emplace_back(cm::make_unique<WorkerT>(ii));
|
||||
gint.Workers.emplace_back(
|
||||
cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop));
|
||||
}
|
||||
// Start workers
|
||||
for (auto& wrk : gint.Workers) {
|
||||
wrk->Start(&gint);
|
||||
// Start worker threads
|
||||
for (unsigned int ii = 0; ii != num; ++ii) {
|
||||
gint.Workers[ii]->SetThread(
|
||||
std::thread(&cmWorkerPoolInternal::Work, &gint, ii));
|
||||
}
|
||||
}
|
||||
// Destroy begin request
|
||||
@@ -668,8 +657,9 @@ void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle)
|
||||
gint.UVRequestEnd.reset();
|
||||
}
|
||||
|
||||
void cmWorkerPoolInternal::Work(WorkerT* worker)
|
||||
void cmWorkerPoolInternal::Work(unsigned int workerIndex)
|
||||
{
|
||||
cmWorkerPool::JobHandleT jobHandle;
|
||||
std::unique_lock<std::mutex> uLock(Mutex);
|
||||
// Increment running workers count
|
||||
++WorkersRunning;
|
||||
@@ -698,15 +688,15 @@ void cmWorkerPoolInternal::Work(WorkerT* worker)
|
||||
}
|
||||
|
||||
// Pop next job from queue
|
||||
worker->JobHandle() = std::move(Queue.front());
|
||||
jobHandle = std::move(Queue.front());
|
||||
Queue.pop_front();
|
||||
|
||||
// Unlocked scope for job processing
|
||||
++JobsProcessing;
|
||||
{
|
||||
uLock.unlock();
|
||||
worker->JobHandle()->Work(Pool, worker->Index()); // Process job
|
||||
worker->JobHandle().reset(); // Destroy job
|
||||
jobHandle->Work(Pool, workerIndex); // Process job
|
||||
jobHandle.reset(); // Destroy job
|
||||
uLock.lock();
|
||||
}
|
||||
--JobsProcessing;
|
||||
@@ -743,19 +733,22 @@ cmWorkerPool::cmWorkerPool()
|
||||
|
||||
cmWorkerPool::~cmWorkerPool() = default;
|
||||
|
||||
bool cmWorkerPool::Process(unsigned int threadCount, void* userData)
|
||||
void cmWorkerPool::SetThreadCount(unsigned int threadCount)
|
||||
{
|
||||
if (!Int_->Processing) {
|
||||
ThreadCount_ = (threadCount > 0) ? threadCount : 1u;
|
||||
}
|
||||
}
|
||||
|
||||
bool cmWorkerPool::Process(void* userData)
|
||||
{
|
||||
// Setup user data
|
||||
UserData_ = userData;
|
||||
ThreadCount_ = (threadCount > 0) ? threadCount : 1u;
|
||||
|
||||
// Run libuv loop
|
||||
bool success = Int_->Process();
|
||||
|
||||
// Clear user data
|
||||
UserData_ = nullptr;
|
||||
ThreadCount_ = 0;
|
||||
|
||||
// Return
|
||||
return success;
|
||||
}
|
||||
|
||||
|
||||
+27
-20
@@ -50,12 +50,12 @@ public:
|
||||
JobT& operator=(JobT const&) = delete;
|
||||
|
||||
/**
|
||||
* @brief Virtual destructor.
|
||||
* Virtual destructor.
|
||||
*/
|
||||
virtual ~JobT();
|
||||
|
||||
/**
|
||||
* @brief Fence job flag
|
||||
* Fence job flag
|
||||
*
|
||||
* Fence jobs require that:
|
||||
* - all jobs before in the queue have been processed
|
||||
@@ -66,7 +66,7 @@ public:
|
||||
|
||||
protected:
|
||||
/**
|
||||
* @brief Protected default constructor
|
||||
* Protected default constructor
|
||||
*/
|
||||
JobT(bool fence = false)
|
||||
: Fence_(fence)
|
||||
@@ -125,12 +125,12 @@ public:
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Job handle type
|
||||
* Job handle type
|
||||
*/
|
||||
typedef std::unique_ptr<JobT> JobHandleT;
|
||||
|
||||
/**
|
||||
* @brief Fence job base class
|
||||
* Fence job base class
|
||||
*/
|
||||
class JobFenceT : public JobT
|
||||
{
|
||||
@@ -144,8 +144,9 @@ public:
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Fence job that aborts the worker pool.
|
||||
* This class is useful as the last job in the job queue.
|
||||
* Fence job that aborts the worker pool.
|
||||
*
|
||||
* Useful as the last job in the job queue.
|
||||
*/
|
||||
class JobEndT : JobFenceT
|
||||
{
|
||||
@@ -160,23 +161,29 @@ public:
|
||||
~cmWorkerPool();
|
||||
|
||||
/**
|
||||
* @brief Blocking function that starts threads to process all Jobs in
|
||||
* the queue.
|
||||
* Number of worker threads.
|
||||
*/
|
||||
unsigned int ThreadCount() const { return ThreadCount_; }
|
||||
|
||||
/**
|
||||
* Set the number of worker threads.
|
||||
*
|
||||
* Calling this method during Process() has no effect.
|
||||
*/
|
||||
void SetThreadCount(unsigned int threadCount);
|
||||
|
||||
/**
|
||||
* Blocking function that starts threads to process all Jobs in the queue.
|
||||
*
|
||||
* This method blocks until a job calls the Abort() method.
|
||||
* @arg threadCount Number of threads to process jobs.
|
||||
* @arg userData Common user data pointer available in all Jobs.
|
||||
*/
|
||||
bool Process(unsigned int threadCount, void* userData = nullptr);
|
||||
|
||||
/**
|
||||
* Number of worker threads passed to Process().
|
||||
* Only valid during Process().
|
||||
*/
|
||||
unsigned int ThreadCount() const { return ThreadCount_; }
|
||||
bool Process(void* userData = nullptr);
|
||||
|
||||
/**
|
||||
* User data reference passed to Process().
|
||||
*
|
||||
* Only valid during Process().
|
||||
*/
|
||||
void* UserData() const { return UserData_; }
|
||||
@@ -184,14 +191,14 @@ public:
|
||||
// -- Job processing interface
|
||||
|
||||
/**
|
||||
* @brief Clears the job queue and aborts all worker threads.
|
||||
* Clears the job queue and aborts all worker threads.
|
||||
*
|
||||
* This method is thread safe and can be called from inside a job.
|
||||
*/
|
||||
void Abort();
|
||||
|
||||
/**
|
||||
* @brief Push job to the queue.
|
||||
* Push job to the queue.
|
||||
*
|
||||
* This method is thread safe and can be called from inside a job or before
|
||||
* Process().
|
||||
@@ -199,7 +206,7 @@ public:
|
||||
bool PushJob(JobHandleT&& jobHandle);
|
||||
|
||||
/**
|
||||
* @brief Push job to the queue
|
||||
* Push job to the queue
|
||||
*
|
||||
* This method is thread safe and can be called from inside a job or before
|
||||
* Process().
|
||||
@@ -212,7 +219,7 @@ public:
|
||||
|
||||
private:
|
||||
void* UserData_ = nullptr;
|
||||
unsigned int ThreadCount_ = 0;
|
||||
unsigned int ThreadCount_ = 1;
|
||||
std::unique_ptr<cmWorkerPoolInternal> Int_;
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user