diff --git a/Source/cmWorkerPool.cxx b/Source/cmWorkerPool.cxx index 75ca47a035..cbf070e91c 100644 --- a/Source/cmWorkerPool.cxx +++ b/Source/cmWorkerPool.cxx @@ -370,88 +370,144 @@ void cmUVReadOnlyProcess::UVTryFinish() FinishedCallback_(); } +/** + * @brief Worker pool worker thread + */ +class cmWorkerPoolWorker +{ +public: + cmWorkerPoolWorker(uv_loop_t& uvLoop); + ~cmWorkerPoolWorker(); + + cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete; + cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete; + + /** + * Set the internal thread + */ + void SetThread(std::thread&& aThread) { Thread_ = std::move(aThread); } + + /** + * Run an external process + */ + bool RunProcess(cmWorkerPool::ProcessResultT& result, + std::vector const& command, + std::string const& workingDirectory); + +private: + // -- Libuv callbacks + static void UVProcessStart(uv_async_t* handle); + void UVProcessFinished(); + +private: + // -- Process management + struct + { + std::mutex Mutex; + cm::uv_async_ptr Request; + std::condition_variable Condition; + std::unique_ptr ROP; + } Proc_; + // -- System thread + std::thread Thread_; +}; + +cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop) +{ + Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this); +} + +cmWorkerPoolWorker::~cmWorkerPoolWorker() +{ + if (Thread_.joinable()) { + Thread_.join(); + } +} + +bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result, + std::vector const& command, + std::string const& workingDirectory) +{ + if (command.empty()) { + return false; + } + // Create process instance + { + std::lock_guard lock(Proc_.Mutex); + Proc_.ROP = cm::make_unique(); + Proc_.ROP->setup(&result, true, command, workingDirectory); + } + // Send asynchronous process start request to libuv loop + Proc_.Request.send(); + // Wait until the process has been finished and destroyed + { + std::unique_lock ulock(Proc_.Mutex); + while (Proc_.ROP) { + Proc_.Condition.wait(ulock); + } + } + return !result.error(); +} + +void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle) +{ + auto* wrk = reinterpret_cast(handle->data); + bool startFailed = false; + { + auto& Proc = wrk->Proc_; + std::lock_guard lock(Proc.Mutex); + if (Proc.ROP && !Proc.ROP->IsStarted()) { + startFailed = + !Proc.ROP->start(handle->loop, [wrk] { wrk->UVProcessFinished(); }); + } + } + // Clean up if starting of the process failed + if (startFailed) { + wrk->UVProcessFinished(); + } +} + +void cmWorkerPoolWorker::UVProcessFinished() +{ + { + std::lock_guard lock(Proc_.Mutex); + if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) { + Proc_.ROP.reset(); + } + } + // Notify idling thread + Proc_.Condition.notify_one(); +} + /** * @brief Private worker pool internals */ class cmWorkerPoolInternal { -public: - // -- Types - - /** - * @brief Worker thread - */ - class WorkerT - { - public: - WorkerT(unsigned int index); - ~WorkerT(); - - WorkerT(WorkerT const&) = delete; - WorkerT& operator=(WorkerT const&) = delete; - - /** - * Start the thread - */ - void Start(cmWorkerPoolInternal* internal); - - /** - * @brief Run an external process - */ - bool RunProcess(cmWorkerPool::ProcessResultT& result, - std::vector const& command, - std::string const& workingDirectory); - - // -- Accessors - unsigned int Index() const { return Index_; } - cmWorkerPool::JobHandleT& JobHandle() { return JobHandle_; } - - private: - // -- Libuv callbacks - static void UVProcessStart(uv_async_t* handle); - void UVProcessFinished(); - - private: - //! @brief Job handle - cmWorkerPool::JobHandleT JobHandle_; - //! @brief Worker index - unsigned int Index_; - // -- Process management - struct - { - std::mutex Mutex; - cm::uv_async_ptr Request; - std::condition_variable Condition; - std::unique_ptr ROP; - } Proc_; - // -- System thread - std::thread Thread_; - }; - public: // -- Constructors cmWorkerPoolInternal(cmWorkerPool* pool); ~cmWorkerPoolInternal(); /** - * @brief Runs the libuv loop + * Runs the libuv loop. */ bool Process(); /** - * @brief Clear queue and abort threads + * Clear queue and abort threads. */ void Abort(); /** - * @brief Push a job to the queue and notify a worker + * Push a job to the queue and notify a worker. */ bool PushJob(cmWorkerPool::JobHandleT&& jobHandle); /** - * @brief Worker thread main loop method + * Worker thread main loop method. */ - void Work(WorkerT* worker); + void Work(unsigned int workerIndex); // -- Request slots static void UVSlotBegin(uv_async_t* handle); @@ -476,85 +532,12 @@ public: unsigned int JobsProcessing = 0; std::deque Queue; std::condition_variable Condition; - std::vector> Workers; + std::vector> Workers; // -- References cmWorkerPool* Pool = nullptr; }; -cmWorkerPoolInternal::WorkerT::WorkerT(unsigned int index) - : Index_(index) -{ -} - -cmWorkerPoolInternal::WorkerT::~WorkerT() -{ - if (Thread_.joinable()) { - Thread_.join(); - } -} - -void cmWorkerPoolInternal::WorkerT::Start(cmWorkerPoolInternal* internal) -{ - Proc_.Request.init(*(internal->UVLoop), &WorkerT::UVProcessStart, this); - Thread_ = std::thread(&cmWorkerPoolInternal::Work, internal, this); -} - -bool cmWorkerPoolInternal::WorkerT::RunProcess( - cmWorkerPool::ProcessResultT& result, - std::vector const& command, std::string const& workingDirectory) -{ - if (command.empty()) { - return false; - } - // Create process instance - { - std::lock_guard lock(Proc_.Mutex); - Proc_.ROP = cm::make_unique(); - Proc_.ROP->setup(&result, true, command, workingDirectory); - } - // Send asynchronous process start request to libuv loop - Proc_.Request.send(); - // Wait until the process has been finished and destroyed - { - std::unique_lock ulock(Proc_.Mutex); - while (Proc_.ROP) { - Proc_.Condition.wait(ulock); - } - } - return !result.error(); -} - -void cmWorkerPoolInternal::WorkerT::UVProcessStart(uv_async_t* handle) -{ - auto* wrk = reinterpret_cast(handle->data); - bool startFailed = false; - { - auto& Proc = wrk->Proc_; - std::lock_guard lock(Proc.Mutex); - if (Proc.ROP && !Proc.ROP->IsStarted()) { - startFailed = - !Proc.ROP->start(handle->loop, [wrk] { wrk->UVProcessFinished(); }); - } - } - // Clean up if starting of the process failed - if (startFailed) { - wrk->UVProcessFinished(); - } -} - -void cmWorkerPoolInternal::WorkerT::UVProcessFinished() -{ - { - std::lock_guard lock(Proc_.Mutex); - if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) { - Proc_.ROP.reset(); - } - } - // Notify idling thread - Proc_.Condition.notify_one(); -} - void cmWorkerPool::ProcessResultT::reset() { ExitStatus = 0; @@ -652,11 +635,13 @@ void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle) // Create workers gint.Workers.reserve(num); for (unsigned int ii = 0; ii != num; ++ii) { - gint.Workers.emplace_back(cm::make_unique(ii)); + gint.Workers.emplace_back( + cm::make_unique(*gint.UVLoop)); } - // Start workers - for (auto& wrk : gint.Workers) { - wrk->Start(&gint); + // Start worker threads + for (unsigned int ii = 0; ii != num; ++ii) { + gint.Workers[ii]->SetThread( + std::thread(&cmWorkerPoolInternal::Work, &gint, ii)); } } // Destroy begin request @@ -672,8 +657,9 @@ void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle) gint.UVRequestEnd.reset(); } -void cmWorkerPoolInternal::Work(WorkerT* worker) +void cmWorkerPoolInternal::Work(unsigned int workerIndex) { + cmWorkerPool::JobHandleT jobHandle; std::unique_lock uLock(Mutex); // Increment running workers count ++WorkersRunning; @@ -702,15 +688,15 @@ void cmWorkerPoolInternal::Work(WorkerT* worker) } // Pop next job from queue - worker->JobHandle() = std::move(Queue.front()); + jobHandle = std::move(Queue.front()); Queue.pop_front(); // Unlocked scope for job processing ++JobsProcessing; { uLock.unlock(); - worker->JobHandle()->Work(Pool, worker->Index()); // Process job - worker->JobHandle().reset(); // Destroy job + jobHandle->Work(Pool, workerIndex); // Process job + jobHandle.reset(); // Destroy job uLock.lock(); } --JobsProcessing;