diff options
author | Koundinya Veluri <kouvel@users.noreply.github.com> | 2017-09-15 14:15:25 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-15 14:15:25 -0700 |
commit | b8dda0cbf7eae770fc685378ad7c542e2468a209 (patch) | |
tree | e6a5ace0a08363f22021ed46246e9e8125310be3 /src | |
parent | a38aa2433942a2a200ca483c104fb0adf71db6e3 (diff) | |
download | coreclr-b8dda0cbf7eae770fc685378ad7c542e2468a209.tar.gz coreclr-b8dda0cbf7eae770fc685378ad7c542e2468a209.tar.bz2 coreclr-b8dda0cbf7eae770fc685378ad7c542e2468a209.zip |
Revert two changes to thread requests (#14015)
Reverting 99db31c41d5057e08cc4701c79f11246b9191a9b and fd91ee1fa23f35130f576c19dfaf35934dc2ce24 to unblock others while trying to figure out what the issues are and how to fix them.
fd91ee1fa23f35130f576c19dfaf35934dc2ce24 is causing @benaadams thread pool perf test (https://github.com/benaadams/ThreadPoolTaskTesting) to hang due to a missed thread request. Somehow wsqActive is ending up at zero while there is a work item in the queue and with no pending thread requests. I don't understand how yet.
99db31c41d5057e08cc4701c79f11246b9191a9b appears to have a potential issue because the order of MarkThreadRequestSatisfied and Dequeue are reversed. For instance, assuming a proc count of 1:
- Initial state: 1 work item enqueued, 1 thread request
- T1 Dispatch: dequeues a work item and requests a thread (0 work items, 1 thread request)
- T1 Dispatch: sees no more work items, returns
- T1 calls Dispatch again due to its own thread request
- T1 Dispatch: After Dequeue (which saw 0 work items) and before MarkThreadRequestSatisfied:
- Current state: 0 work items, 1 thread request
- T2 enqueues a work item, sees 1 thread request and does not request a thread (1 work item, 1 thread request)
- T1 Dispatch: MarkThreadRequestSatisfied decrements thread requests (1 work item, 0 thread requests)
- Now after T1 returns, it won't wake up again but there is still one work item in the queue
Diffstat (limited to 'src')
-rw-r--r-- | src/mscorlib/src/System/Threading/ThreadPool.cs | 128 |
1 files changed, 30 insertions, 98 deletions
diff --git a/src/mscorlib/src/System/Threading/ThreadPool.cs b/src/mscorlib/src/System/Threading/ThreadPool.cs index ec9ceef156..fa1dd095c0 100644 --- a/src/mscorlib/src/System/Threading/ThreadPool.cs +++ b/src/mscorlib/src/System/Threading/ThreadPool.cs @@ -48,12 +48,6 @@ namespace System.Threading public static WorkStealingQueue[] Queues => _queues; - // Track whether the WorkStealingQueueList is empty - // Three states simplifies race conditions. They may be considered. - // Now Active --> Maybe Inactive -> Confirmed Inactive - public const int WsqNowActive = 2; - public static int wsqActive; - public static void Add(WorkStealingQueue queue) { Debug.Assert(queue != null); @@ -400,9 +394,6 @@ namespace System.Threading ThreadPoolWorkQueueThreadLocals.threadLocals ?? (ThreadPoolWorkQueueThreadLocals.threadLocals = new ThreadPoolWorkQueueThreadLocals(this)); - internal bool ThreadRequestNeeded(int count) => (count < ThreadPoolGlobals.processorCount) && - (!workItems.IsEmpty || (WorkStealingQueueList.wsqActive > 0)); - internal void EnsureThreadRequested() { // @@ -413,7 +404,7 @@ namespace System.Threading // which is handled by RequestWorkerThread. // int count = numOutstandingThreadRequests; - while (ThreadRequestNeeded(count)) + while (count < ThreadPoolGlobals.processorCount) { int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count + 1, count); if (prev == count) @@ -425,7 +416,7 @@ namespace System.Threading } } - internal void MarkThreadRequestSatisfied(bool dequeueSuccessful) + internal void MarkThreadRequestSatisfied() { // // The VM has called us, so one of our outstanding thread requests has been satisfied. @@ -434,17 +425,8 @@ namespace System.Threading // by the time we reach this point. // int count = numOutstandingThreadRequests; - while (count > 0) { - if (dequeueSuccessful && (count == ThreadPoolGlobals.processorCount) && ThreadRequestNeeded(count - 1)) - { - // If we gated threads due to too many outstanding requests and queue was not empty - // Request another thread. - ThreadPool.RequestWorkerThread(); - return; - } - int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count - 1, count); if (prev == count) { @@ -466,18 +448,6 @@ namespace System.Threading if (null != tl) { tl.workStealingQueue.LocalPush(callback); - - // We must guarantee wsqActive is set to WsqNowActive after we push - // The ordering must be global because we rely on other threads - // observing in this order - Interlocked.MemoryBarrier(); - - // We do not want to simply write. We want to prevent unnecessary writes - // which would invalidate reader's caches - if (WorkStealingQueueList.wsqActive != WorkStealingQueueList.WsqNowActive) - { - Volatile.Write(ref WorkStealingQueueList.wsqActive, WorkStealingQueueList.WsqNowActive); - } } else { @@ -495,56 +465,33 @@ namespace System.Threading public IThreadPoolWorkItem Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal) { + WorkStealingQueue localWsq = tl.workStealingQueue; IThreadPoolWorkItem callback; - int wsqActiveObserved = WorkStealingQueueList.wsqActive; - if (wsqActiveObserved > 0) - { - WorkStealingQueue localWsq = tl.workStealingQueue; - if ((callback = localWsq.LocalPop()) == null && // first try the local queue - !workItems.TryDequeue(out callback)) // then try the global queue + if ((callback = localWsq.LocalPop()) == null && // first try the local queue + !workItems.TryDequeue(out callback)) // then try the global queue + { + // finally try to steal from another thread's local queue + WorkStealingQueue[] queues = WorkStealingQueueList.Queues; + int c = queues.Length; + Debug.Assert(c > 0, "There must at least be a queue for this thread."); + int maxIndex = c - 1; + int i = tl.random.Next(c); + while (c > 0) { - // finally try to steal from another thread's local queue - WorkStealingQueue[] queues = WorkStealingQueueList.Queues; - int c = queues.Length; - Debug.Assert(c > 0, "There must at least be a queue for this thread."); - int maxIndex = c - 1; - int i = tl.random.Next(c); - while (c > 0) + i = (i < maxIndex) ? i + 1 : 0; + WorkStealingQueue otherQueue = queues[i]; + if (otherQueue != localWsq && otherQueue.CanSteal) { - i = (i < maxIndex) ? i + 1 : 0; - WorkStealingQueue otherQueue = queues[i]; - if (otherQueue != localWsq && otherQueue.CanSteal) + callback = otherQueue.TrySteal(ref missedSteal); + if (callback != null) { - callback = otherQueue.TrySteal(ref missedSteal); - if (callback != null) - { - break; - } + break; } - c--; - } - if ((callback == null) && !missedSteal) - { - // Only decrement if the value is unchanged since we started looking for work - // This prevents multiple threads decrementing based on overlapping scans. - // - // When we decrement from active, the producer may have inserted a queue item during our scan - // therefore we cannot transition to empty - // - // When we decrement from Maybe Inactive, if the producer inserted a queue item during our scan, - // the producer must write Active. We may transition to empty briefly if we beat the - // producer's write, but the producer will then overwrite us before waking threads. - // So effectively we cannot mark the queue empty when an item is in the queue. - Interlocked.CompareExchange(ref WorkStealingQueueList.wsqActive, wsqActiveObserved - 1, wsqActiveObserved); } + c--; } } - else - { - // We only need to look at the global queue since WorkStealingQueueList is inactive - workItems.TryDequeue(out callback); - } return callback; } @@ -558,7 +505,15 @@ namespace System.Threading // int quantumStartTime = Environment.TickCount; - bool markThreadRequestSatisfied = true; + // + // Update our records to indicate that an outstanding request for a thread has now been fulfilled. + // From this point on, we are responsible for requesting another thread if we stop working for any + // reason, and we believe there might still be work in the queue. + // + // Note that if this thread is aborted before we get a chance to request another one, the VM will + // record a thread request on our behalf. So we don't need to worry about getting aborted right here. + // + workQueue.MarkThreadRequestSatisfied(); // Has the desire for logging changed since the last time we entered? workQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer); @@ -607,21 +562,7 @@ namespace System.Threading // If we found work, there may be more work. Ask for another thread so that the other work can be processed // in parallel. Note that this will only ask for a max of #procs threads, so it's safe to call it for every dequeue. // - if (markThreadRequestSatisfied) - { - // - // Update our records to indicate that an outstanding request for a thread has now been fulfilled - // and that an item was successfully dispatched and another thread may be needed - // - // From this point on, we are responsible for requesting another thread if we stop working for any - // reason, and we believe there might still be work in the queue. - // - // Note that if this thread is aborted before we get a chance to request another one, the VM will - // record a thread request on our behalf. So we don't need to worry about getting aborted right here. - // - workQueue.MarkThreadRequestSatisfied(true); - markThreadRequestSatisfied = false; - } + workQueue.EnsureThreadRequested(); // // Execute the workitem outside of any finally blocks, so that it can be aborted if needed. @@ -676,15 +617,6 @@ namespace System.Threading } finally { - if (markThreadRequestSatisfied) - { - // - // Update our records to indicate that an outstanding request for a thread has now been fulfilled - // and that an item was not successfully dispatched. We will request thread below if needed - // - workQueue.MarkThreadRequestSatisfied(false); - } - // // If we are exiting for any reason other than that the queue is definitely empty, ask for another // thread to pick up where we left off. |