Diffstat (limited to 'src/mscorlib/src/System/Threading/ThreadPool.cs')
-rw-r--r--  src/mscorlib/src/System/Threading/ThreadPool.cs | 204 +++++++---
1 file changed, 135 insertions(+), 69 deletions(-)
diff --git a/src/mscorlib/src/System/Threading/ThreadPool.cs b/src/mscorlib/src/System/Threading/ThreadPool.cs
index 0084050c43..ec9ceef156 100644
--- a/src/mscorlib/src/System/Threading/ThreadPool.cs
+++ b/src/mscorlib/src/System/Threading/ThreadPool.cs
@@ -39,6 +39,7 @@ namespace System.Threading
public static readonly ThreadPoolWorkQueue workQueue = new ThreadPoolWorkQueue();
}
+ [StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing
internal sealed class ThreadPoolWorkQueue
{
internal static class WorkStealingQueueList
@@ -47,6 +48,12 @@ namespace System.Threading
public static WorkStealingQueue[] Queues => _queues;
+ // Track whether the WorkStealingQueueList is empty
+ // Three states simplify reasoning about the races. The states may be read as:
+ // Now Active --> Maybe Inactive --> Confirmed Inactive
+ public const int WsqNowActive = 2;
+ public static int wsqActive;
+
public static void Add(WorkStealingQueue queue)
{
Debug.Assert(queue != null);
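To make the three-state protocol above concrete, here is a minimal standalone sketch (the state names and the StepDown helper are illustrative; only WsqNowActive = 2 comes from the patch). A consumer needs two consecutive idle scans to drive the flag from Now Active down to Confirmed Inactive, which is what makes the racy decrement safe:

    using System.Threading;

    static class WsqStates
    {
        // Names are illustrative; the patch only defines WsqNowActive = 2.
        public const int ConfirmedInactive = 0; // no work-stealing queue can hold work
        public const int MaybeInactive     = 1; // one idle scan has happened since the last push
        public const int NowActive         = 2; // a push has happened recently

        // A consumer that scanned all queues and found nothing steps the flag
        // down by one, but only if nothing else touched it during the scan;
        // reaching ConfirmedInactive therefore takes two consecutive idle scans.
        public static void StepDown(ref int flag, int observed) =>
            Interlocked.CompareExchange(ref flag, observed - 1, observed);
    }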
@@ -378,8 +385,12 @@ namespace System.Threading
internal bool loggingEnabled;
internal readonly ConcurrentQueue<IThreadPoolWorkItem> workItems = new ConcurrentQueue<IThreadPoolWorkItem>();
+ private Internal.PaddingFor32 pad1;
+
private volatile int numOutstandingThreadRequests = 0;
+ private Internal.PaddingFor32 pad2;
+
public ThreadPoolWorkQueue()
{
loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer);
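The pad1/pad2 fields exist purely to keep workItems and numOutstandingThreadRequests off each other's cache line, so a CAS on the counter does not invalidate the line holding the queue reference. Internal.PaddingFor32 is CoreCLR-internal; a rough standalone approximation (64 bytes is assumed here as a typical cache-line size) could look like:

    using System.Runtime.InteropServices;

    // Hypothetical stand-in for Internal.PaddingFor32: a struct whose only
    // job is to occupy one cache line worth of bytes between hot fields.
    [StructLayout(LayoutKind.Explicit, Size = 64)]
    struct CacheLinePadding { }

    [StructLayout(LayoutKind.Sequential)] // keep field order so the padding stays between the fields
    sealed class HotFields
    {
        public int writtenByProducers;   // hot field one
        private CacheLinePadding _pad;   // separates the two ints in memory
        public int writtenByConsumers;   // hot field two
    }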
@@ -389,15 +400,20 @@ namespace System.Threading
ThreadPoolWorkQueueThreadLocals.threadLocals ??
(ThreadPoolWorkQueueThreadLocals.threadLocals = new ThreadPoolWorkQueueThreadLocals(this));
+ internal bool ThreadRequestNeeded(int count) => (count < ThreadPoolGlobals.processorCount) &&
+ (!workItems.IsEmpty || (WorkStealingQueueList.wsqActive > 0));
+
internal void EnsureThreadRequested()
{
//
- // If we have not yet requested #procs threads from the VM, then request a new thread.
+ // If we have not yet requested #procs threads from the VM, then request a new thread
+ // when one is needed
+ //
// Note that there is a separate count in the VM which will also be incremented in this case,
// which is handled by RequestWorkerThread.
//
int count = numOutstandingThreadRequests;
- while (count < ThreadPoolGlobals.processorCount)
+ while (ThreadRequestNeeded(count))
{
int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count + 1, count);
if (prev == count)
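EnsureThreadRequested is a standard capped compare-and-swap loop: retry from the freshly observed value whenever another thread wins the race, and stop as soon as ThreadRequestNeeded says no. The same pattern in isolation (the Cap constant stands in for ThreadPoolGlobals.processorCount; the emptiness check is elided):

    using System.Threading;

    static class CapIncrementSketch
    {
        private static int _outstanding;
        private const int Cap = 8; // stands in for ThreadPoolGlobals.processorCount

        // Returns true if we claimed a slot and so should request one thread.
        public static bool TryClaim()
        {
            int count = _outstanding;
            while (count < Cap) // the patch additionally requires pending work here
            {
                int prev = Interlocked.CompareExchange(ref _outstanding, count + 1, count);
                if (prev == count)
                    return true;  // we won the race; caller requests the thread
                count = prev;     // lost the race; retry from the value we actually saw
            }
            return false;         // at the cap (or no work), nothing to request
        }
    }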
@@ -409,7 +425,7 @@ namespace System.Threading
}
}
- internal void MarkThreadRequestSatisfied()
+ internal void MarkThreadRequestSatisfied(bool dequeueSuccessful)
{
//
// The VM has called us, so one of our outstanding thread requests has been satisfied.
@@ -418,8 +434,17 @@ namespace System.Threading
// by the time we reach this point.
//
int count = numOutstandingThreadRequests;
+
while (count > 0)
{
+ if (dequeueSuccessful && (count == ThreadPoolGlobals.processorCount) && ThreadRequestNeeded(count - 1))
+ {
+ // If we gated thread creation due to too many outstanding requests, and the
+ // queue was not empty, recycle this request into another thread instead.
+ ThreadPool.RequestWorkerThread();
+ return;
+ }
+
int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count - 1, count);
if (prev == count)
{
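The reworked MarkThreadRequestSatisfied folds a re-request into the decrement loop: when the counter sits at the cap (so EnsureThreadRequested has been gating) and this thread just dequeued work with more still visible, the outstanding request is recycled into a fresh RequestWorkerThread call instead of being counted down. A compressed sketch of that decision, with hypothetical delegates standing in for the VM call and for ThreadRequestNeeded:

    using System;
    using System.Threading;

    static class SatisfySketch
    {
        private static int _outstanding;
        private const int Cap = 8;                               // stands in for processorCount
        private static Func<int, bool> _stillNeeded = _ => true;  // stands in for ThreadRequestNeeded
        private static Action _requestWorkerThread = () => { };   // stands in for ThreadPool.RequestWorkerThread

        public static void MarkSatisfied(bool dequeueSuccessful)
        {
            int count = _outstanding;
            while (count > 0)
            {
                // At the cap with work still pending: keep the request alive by
                // immediately asking for another thread instead of decrementing.
                if (dequeueSuccessful && count == Cap && _stillNeeded(count - 1))
                {
                    _requestWorkerThread();
                    return;
                }
                int prev = Interlocked.CompareExchange(ref _outstanding, count - 1, count);
                if (prev == count)
                    break;     // decrement landed
                count = prev;  // raced with another thread; retry
            }
        }
    }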
@@ -441,6 +466,18 @@ namespace System.Threading
if (null != tl)
{
tl.workStealingQueue.LocalPush(callback);
+
+ // We must guarantee that wsqActive is set to WsqNowActive only after we push.
+ // The ordering must be global, because we rely on other threads
+ // observing these two writes in this order
+ Interlocked.MemoryBarrier();
+
+ // We do not want to simply write: avoiding redundant writes keeps
+ // us from needlessly invalidating the readers' caches
+ if (WorkStealingQueueList.wsqActive != WorkStealingQueueList.WsqNowActive)
+ {
+ Volatile.Write(ref WorkStealingQueueList.wsqActive, WorkStealingQueueList.WsqNowActive);
+ }
}
else
{
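The push side pairs a full fence with a read-before-write: the fence keeps every observer from seeing the flag become active before the enqueued item is visible, and the read skips the store when the flag is already Now Active, so steady-state pushes do not keep dirtying the cache line that idle readers poll. A toy producer showing the same publish pattern (names are illustrative):

    using System.Collections.Concurrent;
    using System.Threading;

    sealed class PublishSketch
    {
        private readonly ConcurrentQueue<int> _queue = new ConcurrentQueue<int>();
        private int _active; // 2 once work is published (see WsqStates above)

        public void Push(int item)
        {
            _queue.Enqueue(item);

            // Full fence: every thread must see the enqueue before it sees the
            // flag move to "active", or a consumer could skip a non-empty queue.
            Interlocked.MemoryBarrier();

            // Read first: skip the write when the flag is already set, so
            // repeated pushes do not invalidate the line readers are polling.
            if (Volatile.Read(ref _active) != 2)
                Volatile.Write(ref _active, 2);
        }
    }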
@@ -458,33 +495,56 @@ namespace System.Threading
public IThreadPoolWorkItem Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal)
{
- WorkStealingQueue localWsq = tl.workStealingQueue;
IThreadPoolWorkItem callback;
-
- if ((callback = localWsq.LocalPop()) == null && // first try the local queue
- !workItems.TryDequeue(out callback)) // then try the global queue
+ int wsqActiveObserved = WorkStealingQueueList.wsqActive;
+ if (wsqActiveObserved > 0)
{
- // finally try to steal from another thread's local queue
- WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
- int c = queues.Length;
- Debug.Assert(c > 0, "There must at least be a queue for this thread.");
- int maxIndex = c - 1;
- int i = tl.random.Next(c);
- while (c > 0)
+ WorkStealingQueue localWsq = tl.workStealingQueue;
+
+ if ((callback = localWsq.LocalPop()) == null && // first try the local queue
+ !workItems.TryDequeue(out callback)) // then try the global queue
{
- i = (i < maxIndex) ? i + 1 : 0;
- WorkStealingQueue otherQueue = queues[i];
- if (otherQueue != localWsq && otherQueue.CanSteal)
+ // finally try to steal from another thread's local queue
+ WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
+ int c = queues.Length;
+ Debug.Assert(c > 0, "There must at least be a queue for this thread.");
+ int maxIndex = c - 1;
+ int i = tl.random.Next(c);
+ while (c > 0)
{
- callback = otherQueue.TrySteal(ref missedSteal);
- if (callback != null)
+ i = (i < maxIndex) ? i + 1 : 0;
+ WorkStealingQueue otherQueue = queues[i];
+ if (otherQueue != localWsq && otherQueue.CanSteal)
{
- break;
+ callback = otherQueue.TrySteal(ref missedSteal);
+ if (callback != null)
+ {
+ break;
+ }
}
+ c--;
+ }
+ if ((callback == null) && !missedSteal)
+ {
+ // Only decrement if the value is unchanged since we started looking for work
+ // This prevents multiple threads decrementing based on overlapping scans.
+ //
+ // When we decrement from Now Active, a producer may have inserted a queue item during
+ // our scan, so we can only fall to Maybe Inactive, never to Confirmed Inactive.
+ //
+ // When we decrement from Maybe Inactive, if a producer inserted a queue item during our
+ // scan, that producer must write Now Active. We may reach Confirmed Inactive briefly if
+ // we beat the producer's write, but the producer will then overwrite us before waking
+ // threads. So, effectively, we can never mark the list empty while an item remains queued.
+ Interlocked.CompareExchange(ref WorkStealingQueueList.wsqActive, wsqActiveObserved - 1, wsqActiveObserved);
}
- c--;
}
}
+ else
+ {
+ // We only need to look at the global queue since WorkStealingQueueList is inactive
+ workItems.TryDequeue(out callback);
+ }
return callback;
}
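Taken together, the new Dequeue is a three-tier lookup gated by the flag: local pop, then the global queue, then a randomized steal sweep, with the step-down CAS only when a full sweep found nothing and no steal was missed; when the flag already reads zero, only the global queue is consulted. A condensed sketch of that control flow (work items reduced to int, local pop and the steal sweep stubbed out):

    using System.Collections.Concurrent;
    using System.Threading;

    sealed class DequeueSketch
    {
        private readonly ConcurrentQueue<int> _global = new ConcurrentQueue<int>();
        private int _wsqActive;

        // Stand-ins for the local queue and the randomized steal sweep.
        private bool TryLocalPop(out int item) { item = 0; return false; }
        private bool TrySteal(out int item, ref bool missedSteal) { item = 0; return false; }

        public bool TryDequeue(out int item, ref bool missedSteal)
        {
            int observed = Volatile.Read(ref _wsqActive);
            if (observed > 0)
            {
                // Tier 1: local queue; tier 2: global queue; tier 3: steal sweep.
                if (TryLocalPop(out item) || _global.TryDequeue(out item) ||
                    TrySteal(out item, ref missedSteal))
                    return true;

                // A full sweep found nothing and no steal was missed: step the
                // flag down one state, but only if no push raced with the scan.
                if (!missedSteal)
                    Interlocked.CompareExchange(ref _wsqActive, observed - 1, observed);
                return false;
            }

            // Flag says no local queue can have work: global queue only.
            return _global.TryDequeue(out item);
        }
    }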
@@ -498,15 +558,7 @@ namespace System.Threading
//
int quantumStartTime = Environment.TickCount;
- //
- // Update our records to indicate that an outstanding request for a thread has now been fulfilled.
- // From this point on, we are responsible for requesting another thread if we stop working for any
- // reason, and we believe there might still be work in the queue.
- //
- // Note that if this thread is aborted before we get a chance to request another one, the VM will
- // record a thread request on our behalf. So we don't need to worry about getting aborted right here.
- //
- workQueue.MarkThreadRequestSatisfied();
+ bool markThreadRequestSatisfied = true;
// Has the desire for logging changed since the last time we entered?
workQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer);
@@ -555,7 +607,21 @@ namespace System.Threading
// If we found work, there may be more work. Ask for another thread so that the other work can be processed
// in parallel. Note that this will only ask for a max of #procs threads, so it's safe to call it for every dequeue.
//
- workQueue.EnsureThreadRequested();
+ if (markThreadRequestSatisfied)
+ {
+ //
+ // Update our records to indicate that an outstanding request for a thread has now been
+ // fulfilled, that an item was successfully dispatched, and that another thread may be needed
+ //
+ // From this point on, we are responsible for requesting another thread if we stop working for any
+ // reason, and we believe there might still be work in the queue.
+ //
+ // Note that if this thread is aborted before we get a chance to request another one, the VM will
+ // record a thread request on our behalf. So we don't need to worry about getting aborted right here.
+ //
+ workQueue.MarkThreadRequestSatisfied(true);
+ markThreadRequestSatisfied = false;
+ }
//
// Execute the workitem outside of any finally blocks, so that it can be aborted if needed.
@@ -610,6 +676,15 @@ namespace System.Threading
}
finally
{
+ if (markThreadRequestSatisfied)
+ {
+ //
+ // Update our records to indicate that an outstanding request for a thread has now been fulfilled
+ // and that no item was successfully dispatched. We will request a thread below if needed
+ //
+ workQueue.MarkThreadRequestSatisfied(false);
+ }
+
//
// If we are exiting for any reason other than that the queue is definitely empty, ask for another
// thread to pick up where we left off.
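Across this hunk and the previous one, Dispatch defers MarkThreadRequestSatisfied until it knows the outcome of the first dequeue: the success path reports true once work is in hand, and the finally reports false if the loop never got that far, so the VM hears exactly one answer either way. The shape of that once-only handshake, abstracted from the pool internals:

    using System;

    static class DeferredHandshakeSketch
    {
        // pending plays the role of Dispatch's markThreadRequestSatisfied local.
        public static void RunLoop(Func<object> tryDequeue, Action<bool> markSatisfied)
        {
            bool pending = true;
            try
            {
                object work;
                while ((work = tryDequeue()) != null)
                {
                    if (pending)
                    {
                        // First successful dequeue: report "satisfied, and the
                        // dispatched item may justify another thread".
                        markSatisfied(true);
                        pending = false;
                    }
                    // ... execute work ...
                }
            }
            finally
            {
                // Never dequeued anything (or threw first): report "satisfied,
                // but nothing was dispatched" exactly once.
                if (pending)
                    markSatisfied(false);
            }
        }
    }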
@@ -715,16 +790,11 @@ namespace System.Threading
{
// needed for DangerousAddRef
RuntimeHelpers.PrepareConstrainedRegions();
- try
- {
- }
- finally
+
+ m_internalWaitObject = waitObject;
+ if (waitObject != null)
{
- m_internalWaitObject = waitObject;
- if (waitObject != null)
- {
- m_internalWaitObject.SafeWaitHandle.DangerousAddRef(ref bReleaseNeeded);
- }
+ m_internalWaitObject.SafeWaitHandle.DangerousAddRef(ref bReleaseNeeded);
}
}
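The scaffolding deleted here is the classic empty-try/finally idiom: on .NET Framework, asynchronous thread aborts are not injected while a finally block runs, so putting the real work in the finally made it effectively uninterruptible; this patch drops the wrapper and keeps only PrepareConstrainedRegions. For readers unfamiliar with the idiom, a minimal illustration (not the pool code itself):

    using System.Runtime.CompilerServices;

    static class EmptyTryFinallySketch
    {
        private static object _resource;

        public static void AssignUninterruptibly(object value)
        {
            RuntimeHelpers.PrepareConstrainedRegions();
            try
            {
                // intentionally empty
            }
            finally
            {
                // On .NET Framework, thread aborts are deferred while a
                // finally block runs, so this assignment cannot be torn
                // away from any bookkeeping that would follow it here.
                _resource = value;
            }
        }
    }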
@@ -735,47 +805,43 @@ namespace System.Threading
bool result = false;
// needed for DangerousRelease
RuntimeHelpers.PrepareConstrainedRegions();
- try
- {
- }
- finally
+
+ // lock(this) cannot be used reliably in a CER, since the thin lock could be
+ // promoted to a sync block, and that promotion is not a guaranteed operation
+ bool bLockTaken = false;
+ do
{
- // lock(this) cannot be used reliably in Cer since thin lock could be
- // promoted to syncblock and that is not a guaranteed operation
- bool bLockTaken = false;
- do
+ if (Interlocked.CompareExchange(ref m_lock, 1, 0) == 0)
{
- if (Interlocked.CompareExchange(ref m_lock, 1, 0) == 0)
+ bLockTaken = true;
+ try
{
- bLockTaken = true;
- try
+ if (ValidHandle())
{
- if (ValidHandle())
+ result = UnregisterWaitNative(GetHandle(), waitObject == null ? null : waitObject.SafeWaitHandle);
+ if (result == true)
{
- result = UnregisterWaitNative(GetHandle(), waitObject == null ? null : waitObject.SafeWaitHandle);
- if (result == true)
+ if (bReleaseNeeded)
{
- if (bReleaseNeeded)
- {
- m_internalWaitObject.SafeWaitHandle.DangerousRelease();
- bReleaseNeeded = false;
- }
- // if result not true don't release/suppress here so finalizer can make another attempt
- SetHandle(InvalidHandle);
- m_internalWaitObject = null;
- GC.SuppressFinalize(this);
+ m_internalWaitObject.SafeWaitHandle.DangerousRelease();
+ bReleaseNeeded = false;
}
+ // if result not true don't release/suppress here so finalizer can make another attempt
+ SetHandle(InvalidHandle);
+ m_internalWaitObject = null;
+ GC.SuppressFinalize(this);
}
}
- finally
- {
- m_lock = 0;
- }
}
- Thread.SpinWait(1); // yield to processor
+ finally
+ {
+ m_lock = 0;
+ }
}
- while (!bLockTaken);
+ Thread.SpinWait(1); // yield to processor
}
+ while (!bLockTaken);
+
return result;
}
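The unregister path keeps its hand-rolled spin lock, since (per the comment above) lock(this) is not dependable here: Interlocked.CompareExchange(ref m_lock, 1, 0) acquires, the finally releases, and Thread.SpinWait backs off between attempts. The lock in isolation, as a sketch:

    using System.Threading;

    sealed class SpinLockSketch
    {
        private int _lock; // 0 = free, 1 = held

        public void WithLock(System.Action body)
        {
            bool taken = false;
            do
            {
                // Atomically move 0 -> 1; winning the exchange means we hold the lock.
                if (Interlocked.CompareExchange(ref _lock, 1, 0) == 0)
                {
                    taken = true;
                    try
                    {
                        body();
                    }
                    finally
                    {
                        _lock = 0; // release; the patch likewise uses a plain int store
                    }
                }
                Thread.SpinWait(1); // back off briefly between attempts
            }
            while (!taken);
        }
    }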