diff options
Diffstat (limited to 'src/mscorlib/src/System/Threading/ThreadPool.cs')
-rw-r--r-- | src/mscorlib/src/System/Threading/ThreadPool.cs | 204 |
1 files changed, 135 insertions, 69 deletions
diff --git a/src/mscorlib/src/System/Threading/ThreadPool.cs b/src/mscorlib/src/System/Threading/ThreadPool.cs index 0084050c43..ec9ceef156 100644 --- a/src/mscorlib/src/System/Threading/ThreadPool.cs +++ b/src/mscorlib/src/System/Threading/ThreadPool.cs @@ -39,6 +39,7 @@ namespace System.Threading public static readonly ThreadPoolWorkQueue workQueue = new ThreadPoolWorkQueue(); } + [StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing internal sealed class ThreadPoolWorkQueue { internal static class WorkStealingQueueList @@ -47,6 +48,12 @@ namespace System.Threading public static WorkStealingQueue[] Queues => _queues; + // Track whether the WorkStealingQueueList is empty + // Three states simplifies race conditions. They may be considered. + // Now Active --> Maybe Inactive -> Confirmed Inactive + public const int WsqNowActive = 2; + public static int wsqActive; + public static void Add(WorkStealingQueue queue) { Debug.Assert(queue != null); @@ -378,8 +385,12 @@ namespace System.Threading internal bool loggingEnabled; internal readonly ConcurrentQueue<IThreadPoolWorkItem> workItems = new ConcurrentQueue<IThreadPoolWorkItem>(); + private Internal.PaddingFor32 pad1; + private volatile int numOutstandingThreadRequests = 0; + private Internal.PaddingFor32 pad2; + public ThreadPoolWorkQueue() { loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer); @@ -389,15 +400,20 @@ namespace System.Threading ThreadPoolWorkQueueThreadLocals.threadLocals ?? (ThreadPoolWorkQueueThreadLocals.threadLocals = new ThreadPoolWorkQueueThreadLocals(this)); + internal bool ThreadRequestNeeded(int count) => (count < ThreadPoolGlobals.processorCount) && + (!workItems.IsEmpty || (WorkStealingQueueList.wsqActive > 0)); + internal void EnsureThreadRequested() { // - // If we have not yet requested #procs threads from the VM, then request a new thread. + // If we have not yet requested #procs threads from the VM, then request a new thread + // as needed + // // Note that there is a separate count in the VM which will also be incremented in this case, // which is handled by RequestWorkerThread. // int count = numOutstandingThreadRequests; - while (count < ThreadPoolGlobals.processorCount) + while (ThreadRequestNeeded(count)) { int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count + 1, count); if (prev == count) @@ -409,7 +425,7 @@ namespace System.Threading } } - internal void MarkThreadRequestSatisfied() + internal void MarkThreadRequestSatisfied(bool dequeueSuccessful) { // // The VM has called us, so one of our outstanding thread requests has been satisfied. @@ -418,8 +434,17 @@ namespace System.Threading // by the time we reach this point. // int count = numOutstandingThreadRequests; + while (count > 0) { + if (dequeueSuccessful && (count == ThreadPoolGlobals.processorCount) && ThreadRequestNeeded(count - 1)) + { + // If we gated threads due to too many outstanding requests and queue was not empty + // Request another thread. + ThreadPool.RequestWorkerThread(); + return; + } + int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count - 1, count); if (prev == count) { @@ -441,6 +466,18 @@ namespace System.Threading if (null != tl) { tl.workStealingQueue.LocalPush(callback); + + // We must guarantee wsqActive is set to WsqNowActive after we push + // The ordering must be global because we rely on other threads + // observing in this order + Interlocked.MemoryBarrier(); + + // We do not want to simply write. We want to prevent unnecessary writes + // which would invalidate reader's caches + if (WorkStealingQueueList.wsqActive != WorkStealingQueueList.WsqNowActive) + { + Volatile.Write(ref WorkStealingQueueList.wsqActive, WorkStealingQueueList.WsqNowActive); + } } else { @@ -458,33 +495,56 @@ namespace System.Threading public IThreadPoolWorkItem Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal) { - WorkStealingQueue localWsq = tl.workStealingQueue; IThreadPoolWorkItem callback; - - if ((callback = localWsq.LocalPop()) == null && // first try the local queue - !workItems.TryDequeue(out callback)) // then try the global queue + int wsqActiveObserved = WorkStealingQueueList.wsqActive; + if (wsqActiveObserved > 0) { - // finally try to steal from another thread's local queue - WorkStealingQueue[] queues = WorkStealingQueueList.Queues; - int c = queues.Length; - Debug.Assert(c > 0, "There must at least be a queue for this thread."); - int maxIndex = c - 1; - int i = tl.random.Next(c); - while (c > 0) + WorkStealingQueue localWsq = tl.workStealingQueue; + + if ((callback = localWsq.LocalPop()) == null && // first try the local queue + !workItems.TryDequeue(out callback)) // then try the global queue { - i = (i < maxIndex) ? i + 1 : 0; - WorkStealingQueue otherQueue = queues[i]; - if (otherQueue != localWsq && otherQueue.CanSteal) + // finally try to steal from another thread's local queue + WorkStealingQueue[] queues = WorkStealingQueueList.Queues; + int c = queues.Length; + Debug.Assert(c > 0, "There must at least be a queue for this thread."); + int maxIndex = c - 1; + int i = tl.random.Next(c); + while (c > 0) { - callback = otherQueue.TrySteal(ref missedSteal); - if (callback != null) + i = (i < maxIndex) ? i + 1 : 0; + WorkStealingQueue otherQueue = queues[i]; + if (otherQueue != localWsq && otherQueue.CanSteal) { - break; + callback = otherQueue.TrySteal(ref missedSteal); + if (callback != null) + { + break; + } } + c--; + } + if ((callback == null) && !missedSteal) + { + // Only decrement if the value is unchanged since we started looking for work + // This prevents multiple threads decrementing based on overlapping scans. + // + // When we decrement from active, the producer may have inserted a queue item during our scan + // therefore we cannot transition to empty + // + // When we decrement from Maybe Inactive, if the producer inserted a queue item during our scan, + // the producer must write Active. We may transition to empty briefly if we beat the + // producer's write, but the producer will then overwrite us before waking threads. + // So effectively we cannot mark the queue empty when an item is in the queue. + Interlocked.CompareExchange(ref WorkStealingQueueList.wsqActive, wsqActiveObserved - 1, wsqActiveObserved); } - c--; } } + else + { + // We only need to look at the global queue since WorkStealingQueueList is inactive + workItems.TryDequeue(out callback); + } return callback; } @@ -498,15 +558,7 @@ namespace System.Threading // int quantumStartTime = Environment.TickCount; - // - // Update our records to indicate that an outstanding request for a thread has now been fulfilled. - // From this point on, we are responsible for requesting another thread if we stop working for any - // reason, and we believe there might still be work in the queue. - // - // Note that if this thread is aborted before we get a chance to request another one, the VM will - // record a thread request on our behalf. So we don't need to worry about getting aborted right here. - // - workQueue.MarkThreadRequestSatisfied(); + bool markThreadRequestSatisfied = true; // Has the desire for logging changed since the last time we entered? workQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer); @@ -555,7 +607,21 @@ namespace System.Threading // If we found work, there may be more work. Ask for another thread so that the other work can be processed // in parallel. Note that this will only ask for a max of #procs threads, so it's safe to call it for every dequeue. // - workQueue.EnsureThreadRequested(); + if (markThreadRequestSatisfied) + { + // + // Update our records to indicate that an outstanding request for a thread has now been fulfilled + // and that an item was successfully dispatched and another thread may be needed + // + // From this point on, we are responsible for requesting another thread if we stop working for any + // reason, and we believe there might still be work in the queue. + // + // Note that if this thread is aborted before we get a chance to request another one, the VM will + // record a thread request on our behalf. So we don't need to worry about getting aborted right here. + // + workQueue.MarkThreadRequestSatisfied(true); + markThreadRequestSatisfied = false; + } // // Execute the workitem outside of any finally blocks, so that it can be aborted if needed. @@ -610,6 +676,15 @@ namespace System.Threading } finally { + if (markThreadRequestSatisfied) + { + // + // Update our records to indicate that an outstanding request for a thread has now been fulfilled + // and that an item was not successfully dispatched. We will request thread below if needed + // + workQueue.MarkThreadRequestSatisfied(false); + } + // // If we are exiting for any reason other than that the queue is definitely empty, ask for another // thread to pick up where we left off. @@ -715,16 +790,11 @@ namespace System.Threading { // needed for DangerousAddRef RuntimeHelpers.PrepareConstrainedRegions(); - try - { - } - finally + + m_internalWaitObject = waitObject; + if (waitObject != null) { - m_internalWaitObject = waitObject; - if (waitObject != null) - { - m_internalWaitObject.SafeWaitHandle.DangerousAddRef(ref bReleaseNeeded); - } + m_internalWaitObject.SafeWaitHandle.DangerousAddRef(ref bReleaseNeeded); } } @@ -735,47 +805,43 @@ namespace System.Threading bool result = false; // needed for DangerousRelease RuntimeHelpers.PrepareConstrainedRegions(); - try - { - } - finally + + // lock(this) cannot be used reliably in Cer since thin lock could be + // promoted to syncblock and that is not a guaranteed operation + bool bLockTaken = false; + do { - // lock(this) cannot be used reliably in Cer since thin lock could be - // promoted to syncblock and that is not a guaranteed operation - bool bLockTaken = false; - do + if (Interlocked.CompareExchange(ref m_lock, 1, 0) == 0) { - if (Interlocked.CompareExchange(ref m_lock, 1, 0) == 0) + bLockTaken = true; + try { - bLockTaken = true; - try + if (ValidHandle()) { - if (ValidHandle()) + result = UnregisterWaitNative(GetHandle(), waitObject == null ? null : waitObject.SafeWaitHandle); + if (result == true) { - result = UnregisterWaitNative(GetHandle(), waitObject == null ? null : waitObject.SafeWaitHandle); - if (result == true) + if (bReleaseNeeded) { - if (bReleaseNeeded) - { - m_internalWaitObject.SafeWaitHandle.DangerousRelease(); - bReleaseNeeded = false; - } - // if result not true don't release/suppress here so finalizer can make another attempt - SetHandle(InvalidHandle); - m_internalWaitObject = null; - GC.SuppressFinalize(this); + m_internalWaitObject.SafeWaitHandle.DangerousRelease(); + bReleaseNeeded = false; } + // if result not true don't release/suppress here so finalizer can make another attempt + SetHandle(InvalidHandle); + m_internalWaitObject = null; + GC.SuppressFinalize(this); } } - finally - { - m_lock = 0; - } } - Thread.SpinWait(1); // yield to processor + finally + { + m_lock = 0; + } } - while (!bLockTaken); + Thread.SpinWait(1); // yield to processor } + while (!bLockTaken); + return result; } |