summaryrefslogtreecommitdiff
path: root/Source/cmWorkerPool.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'Source/cmWorkerPool.cxx')
-rw-r--r--Source/cmWorkerPool.cxx71
1 files changed, 42 insertions, 29 deletions
diff --git a/Source/cmWorkerPool.cxx b/Source/cmWorkerPool.cxx
index aa0d6b3d5..12aba4f96 100644
--- a/Source/cmWorkerPool.cxx
+++ b/Source/cmWorkerPool.cxx
@@ -13,7 +13,7 @@
#include <cm/memory>
-#include "cm_uv.h"
+#include <cm3p/uv.h>
#include "cmRange.h"
#include "cmStringAlgorithms.h"
@@ -469,11 +469,9 @@ void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
void cmWorkerPoolWorker::UVProcessFinished()
{
- {
- std::lock_guard<std::mutex> lock(Proc_.Mutex);
- if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) {
- Proc_.ROP.reset();
- }
+ std::lock_guard<std::mutex> lock(Proc_.Mutex);
+ if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) {
+ Proc_.ROP.reset();
}
// Notify idling thread
Proc_.Condition.notify_one();
@@ -532,6 +530,7 @@ public:
unsigned int JobsProcessing = 0;
std::deque<cmWorkerPool::JobHandleT> Queue;
std::condition_variable Condition;
+ std::condition_variable ConditionFence;
std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
// -- References
@@ -593,19 +592,12 @@ bool cmWorkerPoolInternal::Process()
void cmWorkerPoolInternal::Abort()
{
- bool notifyThreads = false;
// Clear all jobs and set abort flag
- {
- std::lock_guard<std::mutex> guard(Mutex);
- if (Processing && !Aborting) {
- // Register abort and clear queue
- Aborting = true;
- Queue.clear();
- notifyThreads = true;
- }
- }
- if (notifyThreads) {
- // Wake threads
+ std::lock_guard<std::mutex> guard(Mutex);
+ if (!Aborting) {
+ // Register abort and clear queue
+ Aborting = true;
+ Queue.clear();
Condition.notify_all();
}
}
@@ -669,7 +661,7 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex)
if (Aborting) {
break;
}
- // Wait for new jobs
+ // Wait for new jobs on the main CV
if (Queue.empty()) {
++WorkersIdle;
Condition.wait(uLock);
@@ -677,20 +669,34 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex)
continue;
}
- // Check for fence jobs
- if (FenceProcessing || Queue.front()->IsFence()) {
- if (JobsProcessing != 0) {
- Condition.wait(uLock);
- continue;
- }
- // No jobs get processed. Set the fence job processing flag.
- FenceProcessing = true;
+ // If there is a fence currently active or waiting,
+ // sleep on the main CV and try again.
+ if (FenceProcessing) {
+ Condition.wait(uLock);
+ continue;
}
// Pop next job from queue
jobHandle = std::move(Queue.front());
Queue.pop_front();
+ // Check for fence jobs
+ bool raisedFence = false;
+ if (jobHandle->IsFence()) {
+ FenceProcessing = true;
+ raisedFence = true;
+ // Wait on the Fence CV until all pending jobs are done.
+ while (JobsProcessing != 0 && !Aborting) {
+ ConditionFence.wait(uLock);
+ }
+ // When aborting, explicitly kick all threads alive once more.
+ if (Aborting) {
+ FenceProcessing = false;
+ Condition.notify_all();
+ break;
+ }
+ }
+
// Unlocked scope for job processing
++JobsProcessing;
{
@@ -701,11 +707,18 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex)
}
--JobsProcessing;
- // Was this a fence job?
- if (FenceProcessing) {
+ // If this was the thread that entered fence processing
+ // originally, notify all idling workers that the fence
+ // is done.
+ if (raisedFence) {
FenceProcessing = false;
Condition.notify_all();
}
+ // If fence processing is still not done, notify the
+ // the fencing worker when all active jobs are done.
+ if (FenceProcessing && JobsProcessing == 0) {
+ ConditionFence.notify_all();
+ }
}
// Decrement running workers count