diff options
Diffstat (limited to 'src/mscorlib/src/System/Threading/ThreadPool.cs')
-rw-r--r-- | src/mscorlib/src/System/Threading/ThreadPool.cs | 960 |
1 files changed, 325 insertions, 635 deletions
diff --git a/src/mscorlib/src/System/Threading/ThreadPool.cs b/src/mscorlib/src/System/Threading/ThreadPool.cs index 451b15d22f..adf0615819 100644 --- a/src/mscorlib/src/System/Threading/ThreadPool.cs +++ b/src/mscorlib/src/System/Threading/ThreadPool.cs @@ -11,36 +11,20 @@ ** =============================================================================*/ -#pragma warning disable 0420 - -/* - * Below you'll notice two sets of APIs that are separated by the - * use of 'Unsafe' in their names. The unsafe versions are called - * that because they do not propagate the calling stack onto the - * worker thread. This allows code to lose the calling stack and - * thereby elevate its security privileges. Note that this operation - * is much akin to the combined ability to control security policy - * and control security evidence. With these privileges, a person - * can gain the right to load assemblies that are fully trusted which - * then assert full trust and can call any code they want regardless - * of the previous stack information. - */ +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Diagnostics.Contracts; +using System.Diagnostics.Tracing; +using System.Runtime.CompilerServices; +using System.Runtime.ConstrainedExecution; +using System.Runtime.InteropServices; +using System.Security; +using Microsoft.Win32; namespace System.Threading { - using System.Security; - using System.Security.Permissions; - using System; - using Microsoft.Win32; - using System.Runtime.CompilerServices; - using System.Runtime.ConstrainedExecution; - using System.Runtime.InteropServices; - using System.Collections.Generic; - using System.Diagnostics; - using System.Diagnostics.Contracts; - using System.Diagnostics.CodeAnalysis; - using System.Diagnostics.Tracing; - internal static class ThreadPoolGlobals { //Per-appDomain quantum (in ms) for which the thread keeps processing @@ -55,78 +39,76 @@ namespace System.Threading public static bool enableWorkerTracking; public static readonly ThreadPoolWorkQueue workQueue = new ThreadPoolWorkQueue(); - - static ThreadPoolGlobals() - { - } } internal sealed class ThreadPoolWorkQueue { - // Simple sparsely populated array to allow lock-free reading. - internal class SparseArray<T> where T : class + internal static class WorkStealingQueueList { - private volatile T[] m_array; + private static volatile WorkStealingQueue[] _queues = new WorkStealingQueue[0]; - internal SparseArray(int initialSize) - { - m_array = new T[initialSize]; - } + public static WorkStealingQueue[] Queues => _queues; - internal T[] Current - { - get { return m_array; } - } - - internal int Add(T e) + public static void Add(WorkStealingQueue queue) { + Debug.Assert(queue != null); while (true) { - T[] array = m_array; - lock (array) + WorkStealingQueue[] oldQueues = _queues; + Debug.Assert(Array.IndexOf(oldQueues, queue) == -1); + + var newQueues = new WorkStealingQueue[oldQueues.Length + 1]; + Array.Copy(oldQueues, 0, newQueues, 0, oldQueues.Length); + newQueues[newQueues.Length - 1] = queue; + if (Interlocked.CompareExchange(ref _queues, newQueues, oldQueues) == oldQueues) { - for (int i = 0; i < array.Length; i++) - { - if (array[i] == null) - { - Volatile.Write(ref array[i], e); - return i; - } - else if (i == array.Length - 1) - { - // Must resize. If there was a race condition, we start over again. - if (array != m_array) - continue; - - T[] newArray = new T[array.Length * 2]; - Array.Copy(array, newArray, i + 1); - newArray[i + 1] = e; - m_array = newArray; - return i + 1; - } - } + break; } } } - internal void Remove(T e) + public static void Remove(WorkStealingQueue queue) { - T[] array = m_array; - lock (array) + Debug.Assert(queue != null); + while (true) { - for (int i = 0; i < m_array.Length; i++) + WorkStealingQueue[] oldQueues = _queues; + if (oldQueues.Length == 0) { - if (m_array[i] == e) - { - Volatile.Write(ref m_array[i], null); - break; - } + return; + } + + int pos = Array.IndexOf(oldQueues, queue); + if (pos == -1) + { + Debug.Fail("Should have found the queue"); + return; + } + + var newQueues = new WorkStealingQueue[oldQueues.Length - 1]; + if (pos == 0) + { + Array.Copy(oldQueues, 1, newQueues, 0, newQueues.Length); + } + else if (pos == oldQueues.Length - 1) + { + Array.Copy(oldQueues, 0, newQueues, 0, newQueues.Length); + } + else + { + Array.Copy(oldQueues, 0, newQueues, 0, pos); + Array.Copy(oldQueues, pos + 1, newQueues, pos, newQueues.Length - pos); + } + + if (Interlocked.CompareExchange(ref _queues, newQueues, oldQueues) == oldQueues) + { + break; } } } } - internal class WorkStealingQueue + internal sealed class WorkStealingQueue { private const int INITIAL_SIZE = 32; internal volatile IThreadPoolWorkItem[] m_array = new IThreadPoolWorkItem[INITIAL_SIZE]; @@ -142,7 +124,7 @@ namespace System.Threading private volatile int m_headIndex = START_INDEX; private volatile int m_tailIndex = START_INDEX; - private SpinLock m_foreignLock = new SpinLock(false); + private SpinLock m_foreignLock = new SpinLock(enableThreadOwnerTracking:false); public void LocalPush(IThreadPoolWorkItem obj) { @@ -176,7 +158,7 @@ namespace System.Threading finally { if (lockTaken) - m_foreignLock.Exit(true); + m_foreignLock.Exit(useMemoryBarrier:true); } } @@ -201,7 +183,7 @@ namespace System.Threading if (count >= m_mask) { // We're full; expand the queue by doubling its size. - IThreadPoolWorkItem[] newArray = new IThreadPoolWorkItem[m_array.Length << 1]; + var newArray = new IThreadPoolWorkItem[m_array.Length << 1]; for (int i = 0; i < m_array.Length; i++) newArray[i] = m_array[(i + head) & m_mask]; @@ -218,7 +200,7 @@ namespace System.Threading finally { if (lockTaken) - m_foreignLock.Exit(false); + m_foreignLock.Exit(useMemoryBarrier:false); } } } @@ -229,13 +211,9 @@ namespace System.Threading // Fast path: check the tail. If equal, we can skip the lock. if (m_array[(m_tailIndex - 1) & m_mask] == obj) { - IThreadPoolWorkItem unused; - if (LocalPop(out unused)) - { - Debug.Assert(unused == obj); - return true; - } - return false; + IThreadPoolWorkItem unused = LocalPop(); + Debug.Assert(unused == null || unused == obj); + return unused != null; } // Else, do an O(N) search for the work item. The theory of work stealing and our @@ -276,7 +254,7 @@ namespace System.Threading finally { if (lockTaken) - m_foreignLock.Exit(false); + m_foreignLock.Exit(useMemoryBarrier:false); } } } @@ -284,19 +262,20 @@ namespace System.Threading return false; } + public IThreadPoolWorkItem LocalPop() => m_headIndex < m_tailIndex ? LocalPopCore() : null; + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] - public bool LocalPop(out IThreadPoolWorkItem obj) + private IThreadPoolWorkItem LocalPopCore() { while (true) { - // Decrement the tail using a fence to ensure subsequent read doesn't come before. int tail = m_tailIndex; if (m_headIndex >= tail) { - obj = null; - return false; + return null; } + // Decrement the tail using a fence to ensure subsequent read doesn't come before. tail -= 1; Interlocked.Exchange(ref m_tailIndex, tail); @@ -304,13 +283,13 @@ namespace System.Threading if (m_headIndex <= tail) { int idx = tail & m_mask; - obj = Volatile.Read(ref m_array[idx]); + IThreadPoolWorkItem obj = Volatile.Read(ref m_array[idx]); // Check for nulls in the array. if (obj == null) continue; m_array[idx] = null; - return true; + return obj; } else { @@ -324,241 +303,93 @@ namespace System.Threading { // Element still available. Take it. int idx = tail & m_mask; - obj = Volatile.Read(ref m_array[idx]); + IThreadPoolWorkItem obj = Volatile.Read(ref m_array[idx]); // Check for nulls in the array. if (obj == null) continue; m_array[idx] = null; - return true; + return obj; } else { // If we encountered a race condition and element was stolen, restore the tail. m_tailIndex = tail + 1; - obj = null; - return false; + return null; } } finally { if (lockTaken) - m_foreignLock.Exit(false); + m_foreignLock.Exit(useMemoryBarrier:false); } } } } - public bool TrySteal(out IThreadPoolWorkItem obj, ref bool missedSteal) - { - return TrySteal(out obj, ref missedSteal, 0); // no blocking by default. - } + public bool CanSteal => m_headIndex < m_tailIndex; - private bool TrySteal(out IThreadPoolWorkItem obj, ref bool missedSteal, int millisecondsTimeout) + public IThreadPoolWorkItem TrySteal(ref bool missedSteal) { - obj = null; - while (true) { - if (m_headIndex >= m_tailIndex) - return false; - - bool taken = false; - try + if (CanSteal) { - m_foreignLock.TryEnter(millisecondsTimeout, ref taken); - if (taken) + bool taken = false; + try { - // Increment head, and ensure read of tail doesn't move before it (fence). - int head = m_headIndex; - Interlocked.Exchange(ref m_headIndex, head + 1); - - if (head < m_tailIndex) + m_foreignLock.TryEnter(ref taken); + if (taken) { - int idx = head & m_mask; - obj = Volatile.Read(ref m_array[idx]); + // Increment head, and ensure read of tail doesn't move before it (fence). + int head = m_headIndex; + Interlocked.Exchange(ref m_headIndex, head + 1); - // Check for nulls in the array. - if (obj == null) continue; + if (head < m_tailIndex) + { + int idx = head & m_mask; + IThreadPoolWorkItem obj = Volatile.Read(ref m_array[idx]); - m_array[idx] = null; - return true; - } - else - { - // Failed, restore head. - m_headIndex = head; - obj = null; - missedSteal = true; + // Check for nulls in the array. + if (obj == null) continue; + + m_array[idx] = null; + return obj; + } + else + { + // Failed, restore head. + m_headIndex = head; + } } } - else + finally { - missedSteal = true; + if (taken) + m_foreignLock.Exit(useMemoryBarrier:false); } - } - finally - { - if (taken) - m_foreignLock.Exit(false); - } - - return false; - } - } - } - - internal class QueueSegment - { - // Holds a segment of the queue. Enqueues/Dequeues start at element 0, and work their way up. - internal readonly IThreadPoolWorkItem[] nodes; - private const int QueueSegmentLength = 256; - - // Holds the indexes of the lowest and highest valid elements of the nodes array. - // The low index is in the lower 16 bits, high index is in the upper 16 bits. - // Use GetIndexes and CompareExchangeIndexes to manipulate this. - private volatile int indexes; - - // The next segment in the queue. - public volatile QueueSegment Next; - - - const int SixteenBits = 0xffff; - - void GetIndexes(out int upper, out int lower) - { - int i = indexes; - upper = (i >> 16) & SixteenBits; - lower = i & SixteenBits; - - Debug.Assert(upper >= lower); - Debug.Assert(upper <= nodes.Length); - Debug.Assert(lower <= nodes.Length); - Debug.Assert(upper >= 0); - Debug.Assert(lower >= 0); - } - bool CompareExchangeIndexes(ref int prevUpper, int newUpper, ref int prevLower, int newLower) - { - Debug.Assert(newUpper >= newLower); - Debug.Assert(newUpper <= nodes.Length); - Debug.Assert(newLower <= nodes.Length); - Debug.Assert(newUpper >= 0); - Debug.Assert(newLower >= 0); - Debug.Assert(newUpper >= prevUpper); - Debug.Assert(newLower >= prevLower); - Debug.Assert(newUpper == prevUpper ^ newLower == prevLower); - - int oldIndexes = (prevUpper << 16) | (prevLower & SixteenBits); - int newIndexes = (newUpper << 16) | (newLower & SixteenBits); - int prevIndexes = Interlocked.CompareExchange(ref indexes, newIndexes, oldIndexes); - prevUpper = (prevIndexes >> 16) & SixteenBits; - prevLower = prevIndexes & SixteenBits; - return prevIndexes == oldIndexes; - } - - [ReliabilityContract(Consistency.WillNotCorruptState, Cer.MayFail)] - public QueueSegment() - { - Debug.Assert(QueueSegmentLength <= SixteenBits); - nodes = new IThreadPoolWorkItem[QueueSegmentLength]; - } - - - public bool IsUsedUp() - { - int upper, lower; - GetIndexes(out upper, out lower); - return (upper == nodes.Length) && - (lower == nodes.Length); - } - - public bool TryEnqueue(IThreadPoolWorkItem node) - { - // - // If there's room in this segment, atomically increment the upper count (to reserve - // space for this node), then store the node. - // Note that this leaves a window where it will look like there is data in that - // array slot, but it hasn't been written yet. This is taken care of in TryDequeue - // with a busy-wait loop, waiting for the element to become non-null. This implies - // that we can never store null nodes in this data structure. - // - Debug.Assert(null != node); - - int upper, lower; - GetIndexes(out upper, out lower); - - while (true) - { - if (upper == nodes.Length) - return false; - - if (CompareExchangeIndexes(ref upper, upper + 1, ref lower, lower)) - { - Debug.Assert(Volatile.Read(ref nodes[upper]) == null); - Volatile.Write(ref nodes[upper], node); - return true; + missedSteal = true; } - } - } - - [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] - public bool TryDequeue(out IThreadPoolWorkItem node) - { - // - // If there are nodes in this segment, increment the lower count, then take the - // element we find there. - // - int upper, lower; - GetIndexes(out upper, out lower); - - while(true) - { - if (lower == upper) - { - node = null; - return false; - } - - if (CompareExchangeIndexes(ref upper, upper, ref lower, lower + 1)) - { - // It's possible that a concurrent call to Enqueue hasn't yet - // written the node reference to the array. We need to spin until - // it shows up. - SpinWait spinner = new SpinWait(); - while ((node = Volatile.Read(ref nodes[lower])) == null) - spinner.SpinOnce(); - - // Null-out the reference so the object can be GC'd earlier. - nodes[lower] = null; - return true; - } + return null; } } } - // The head and tail of the queue. We enqueue to the head, and dequeue from the tail. - internal volatile QueueSegment queueHead; - internal volatile QueueSegment queueTail; internal bool loggingEnabled; - - internal static readonly SparseArray<WorkStealingQueue> allThreadQueues = new SparseArray<WorkStealingQueue>(16); + internal readonly ConcurrentQueue<IThreadPoolWorkItem> workItems = new ConcurrentQueue<IThreadPoolWorkItem>(); private volatile int numOutstandingThreadRequests = 0; public ThreadPoolWorkQueue() { - queueTail = queueHead = new QueueSegment(); loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool|FrameworkEventSource.Keywords.ThreadTransfer); } - public ThreadPoolWorkQueueThreadLocals EnsureCurrentThreadHasQueue() - { - if (null == ThreadPoolWorkQueueThreadLocals.threadLocals) - ThreadPoolWorkQueueThreadLocals.threadLocals = new ThreadPoolWorkQueueThreadLocals(this); - return ThreadPoolWorkQueueThreadLocals.threadLocals; - } + public ThreadPoolWorkQueueThreadLocals EnsureCurrentThreadHasQueue() => + ThreadPoolWorkQueueThreadLocals.threadLocals ?? + (ThreadPoolWorkQueueThreadLocals.threadLocals = new ThreadPoolWorkQueueThreadLocals(this)); internal void EnsureThreadRequested() { @@ -602,12 +433,12 @@ namespace System.Threading public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal) { + if (loggingEnabled) + System.Diagnostics.Tracing.FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback); + ThreadPoolWorkQueueThreadLocals tl = null; if (!forceGlobal) tl = ThreadPoolWorkQueueThreadLocals.threadLocals; - - if (loggingEnabled) - System.Diagnostics.Tracing.FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback); if (null != tl) { @@ -615,18 +446,7 @@ namespace System.Threading } else { - QueueSegment head = queueHead; - - while (!head.TryEnqueue(callback)) - { - Interlocked.CompareExchange(ref head.Next, new QueueSegment(), null); - - while (head.Next != null) - { - Interlocked.CompareExchange(ref queueHead, head.Next, head); - head = queueHead; - } - } + workItems.Enqueue(callback); } EnsureThreadRequested(); @@ -635,67 +455,43 @@ namespace System.Threading internal bool LocalFindAndPop(IThreadPoolWorkItem callback) { ThreadPoolWorkQueueThreadLocals tl = ThreadPoolWorkQueueThreadLocals.threadLocals; - if (null == tl) - return false; - - return tl.workStealingQueue.LocalFindAndPop(callback); + return tl != null && tl.workStealingQueue.LocalFindAndPop(callback); } - public void Dequeue(ThreadPoolWorkQueueThreadLocals tl, out IThreadPoolWorkItem callback, out bool missedSteal) + public IThreadPoolWorkItem Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal) { - callback = null; - missedSteal = false; - WorkStealingQueue wsq = tl.workStealingQueue; + WorkStealingQueue localWsq = tl.workStealingQueue; + IThreadPoolWorkItem callback; - if (wsq.LocalPop(out callback)) - Debug.Assert(null != callback); - - if (null == callback) + if ((callback = localWsq.LocalPop()) == null && // first try the local queue + !workItems.TryDequeue(out callback)) // then try the global queue { - QueueSegment tail = queueTail; - while (true) - { - if (tail.TryDequeue(out callback)) - { - Debug.Assert(null != callback); - break; - } - - if (null == tail.Next || !tail.IsUsedUp()) - { - break; - } - else - { - Interlocked.CompareExchange(ref queueTail, tail.Next, tail); - tail = queueTail; - } - } - } - - if (null == callback) - { - WorkStealingQueue[] otherQueues = allThreadQueues.Current; - int c = otherQueues.Length; + // finally try to steal from another thread's local queue + WorkStealingQueue[] queues = WorkStealingQueueList.Queues; + int c = queues.Length; + Debug.Assert(c > 0, "There must at least be a queue for this thread."); int maxIndex = c - 1; int i = tl.random.Next(c); while (c > 0) { i = (i < maxIndex) ? i + 1 : 0; - WorkStealingQueue otherQueue = Volatile.Read(ref otherQueues[i]); - if (otherQueue != null && - otherQueue != wsq && - otherQueue.TrySteal(out callback, ref missedSteal)) + WorkStealingQueue otherQueue = queues[i]; + if (otherQueue != localWsq && otherQueue.CanSteal) { - Debug.Assert(null != callback); - break; + callback = otherQueue.TrySteal(ref missedSteal); + if (callback != null) + { + break; + } } c--; } } + + return callback; } - static internal bool Dispatch() + internal static bool Dispatch() { var workQueue = ThreadPoolGlobals.workQueue; // @@ -735,85 +531,66 @@ namespace System.Threading // while ((Environment.TickCount - quantumStartTime) < ThreadPoolGlobals.TP_QUANTUM) { - // - // Dequeue and EnsureThreadRequested must be protected from ThreadAbortException. - // These are fast, so this will not delay aborts/AD-unloads for very long. - // - try { } - finally - { - bool missedSteal = false; - workQueue.Dequeue(tl, out workItem, out missedSteal); - - if (workItem == null) - { - // - // No work. We're going to return to the VM once we leave this protected region. - // If we missed a steal, though, there may be more work in the queue. - // Instead of looping around and trying again, we'll just request another thread. This way - // we won't starve other AppDomains while we spin trying to get locks, and hopefully the thread - // that owns the contended work-stealing queue will pick up its own workitems in the meantime, - // which will be more efficient than this thread doing it anyway. - // - needAnotherThread = missedSteal; - } - else - { - // - // If we found work, there may be more work. Ask for another thread so that the other work can be processed - // in parallel. Note that this will only ask for a max of #procs threads, so it's safe to call it for every dequeue. - // - workQueue.EnsureThreadRequested(); - } - } + bool missedSteal = false; + workItem = workQueue.Dequeue(tl, ref missedSteal); if (workItem == null) { + // + // No work. We're going to return to the VM once we leave this protected region. + // If we missed a steal, though, there may be more work in the queue. + // Instead of looping around and trying again, we'll just request another thread. This way + // we won't starve other AppDomains while we spin trying to get locks, and hopefully the thread + // that owns the contended work-stealing queue will pick up its own workitems in the meantime, + // which will be more efficient than this thread doing it anyway. + // + needAnotherThread = missedSteal; + // Tell the VM we're returning normally, not because Hill Climbing asked us to return. return true; } - else - { - if (workQueue.loggingEnabled) - System.Diagnostics.Tracing.FrameworkEventSource.Log.ThreadPoolDequeueWorkObject(workItem); - // - // Execute the workitem outside of any finally blocks, so that it can be aborted if needed. - // - if (ThreadPoolGlobals.enableWorkerTracking) + if (workQueue.loggingEnabled) + System.Diagnostics.Tracing.FrameworkEventSource.Log.ThreadPoolDequeueWorkObject(workItem); + + // + // If we found work, there may be more work. Ask for another thread so that the other work can be processed + // in parallel. Note that this will only ask for a max of #procs threads, so it's safe to call it for every dequeue. + // + workQueue.EnsureThreadRequested(); + + // + // Execute the workitem outside of any finally blocks, so that it can be aborted if needed. + // + if (ThreadPoolGlobals.enableWorkerTracking) + { + bool reportedStatus = false; + try { - bool reportedStatus = false; - try - { - try { } - finally - { - ThreadPool.ReportThreadStatus(true); - reportedStatus = true; - } - workItem.ExecuteWorkItem(); - workItem = null; - } - finally - { - if (reportedStatus) - ThreadPool.ReportThreadStatus(false); - } + ThreadPool.ReportThreadStatus(isWorking: true); + reportedStatus = true; + workItem.ExecuteWorkItem(); } - else + finally { - workItem.ExecuteWorkItem(); - workItem = null; + if (reportedStatus) + ThreadPool.ReportThreadStatus(isWorking: false); } - - // - // Notify the VM that we executed this workitem. This is also our opportunity to ask whether Hill Climbing wants - // us to return the thread to the pool or not. - // - if (!ThreadPool.NotifyWorkItemComplete()) - return false; } + else + { + workItem.ExecuteWorkItem(); + } + workItem = null; + + // + // Notify the VM that we executed this workitem. This is also our opportunity to ask whether Hill Climbing wants + // us to return the thread to the pool or not. + // + if (!ThreadPool.NotifyWorkItemComplete()) + return false; } + // If we get here, it's because our quantum expired. Tell the VM we're returning normally. return true; } @@ -825,8 +602,7 @@ namespace System.Threading // it was executed or not (in debug builds only). Task uses this to communicate the ThreadAbortException to anyone // who waits for the task to complete. // - if (workItem != null) - workItem.MarkAborted(tae); + workItem?.MarkAborted(tae); // // In this case, the VM is going to request another thread on our behalf. No need to do it twice. @@ -845,11 +621,36 @@ namespace System.Threading } // we can never reach this point, but the C# compiler doesn't know that, because it doesn't know the ThreadAbortException will be reraised above. - Debug.Assert(false); + Debug.Fail("Should never reach this point"); return true; } } + // Simple random number generator. We don't need great randomness, we just need a little and for it to be fast. + internal struct FastRandom // xorshift prng + { + private uint _w, _x, _y, _z; + + public FastRandom(int seed) + { + _x = (uint)seed; + _w = 88675123; + _y = 362436069; + _z = 521288629; + } + + public int Next(int maxValue) + { + Debug.Assert(maxValue > 0); + + uint t = _x ^ (_x << 11); + _x = _y; _y = _z; _z = _w; + _w = _w ^ (_w >> 19) ^ (t ^ (t >> 8)); + + return (int)(_w % (uint)maxValue); + } + } + // Holds a WorkStealingQueue, and remmoves it from the list when this object is no longer referened. internal sealed class ThreadPoolWorkQueueThreadLocals { @@ -858,13 +659,13 @@ namespace System.Threading public readonly ThreadPoolWorkQueue workQueue; public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue; - public readonly Random random = new Random(Thread.CurrentThread.ManagedThreadId); + public FastRandom random = new FastRandom(Thread.CurrentThread.ManagedThreadId); // mutable struct, do not copy or make readonly public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq) { workQueue = tpq; workStealingQueue = new ThreadPoolWorkQueue.WorkStealingQueue(); - ThreadPoolWorkQueue.allThreadQueues.Add(workStealingQueue); + ThreadPoolWorkQueue.WorkStealingQueueList.Add(workStealingQueue); } private void CleanUp() @@ -873,28 +674,15 @@ namespace System.Threading { if (null != workQueue) { - bool done = false; - while (!done) + IThreadPoolWorkItem cb; + while ((cb = workStealingQueue.LocalPop()) != null) { - // Ensure that we won't be aborted between LocalPop and Enqueue. - try { } - finally - { - IThreadPoolWorkItem cb = null; - if (workStealingQueue.LocalPop(out cb)) - { - Debug.Assert(null != cb); - workQueue.Enqueue(cb, true); - } - else - { - done = true; - } - } + Debug.Assert(null != cb); + workQueue.Enqueue(cb, forceGlobal: true); } } - ThreadPoolWorkQueue.allThreadQueues.Remove(workStealingQueue); + ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue); } } @@ -912,34 +700,19 @@ namespace System.Threading internal sealed class RegisteredWaitHandleSafe : CriticalFinalizerObject { - private static IntPtr InvalidHandle - { - get - { - return Win32Native.INVALID_HANDLE_VALUE; - } - } - private IntPtr registeredWaitHandle; + private static IntPtr InvalidHandle => Win32Native.INVALID_HANDLE_VALUE; + private IntPtr registeredWaitHandle = InvalidHandle; private WaitHandle m_internalWaitObject; private bool bReleaseNeeded = false; private volatile int m_lock = 0; - internal RegisteredWaitHandleSafe() - { - registeredWaitHandle = InvalidHandle; - } - - internal IntPtr GetHandle() - { - return registeredWaitHandle; - } + internal IntPtr GetHandle() => registeredWaitHandle; internal void SetHandle(IntPtr handle) { registeredWaitHandle = handle; } - [ReliabilityContract(Consistency.WillNotCorruptState, Cer.MayFail)] internal void SetWaitObject(WaitHandle waitObject) { // needed for DangerousAddRef @@ -957,7 +730,6 @@ namespace System.Threading } } - [ReliabilityContract(Consistency.WillNotCorruptState, Cer.MayFail)] internal bool Unregister( WaitHandle waitObject // object to be notified when all callbacks to delegates have completed ) @@ -1009,10 +781,8 @@ namespace System.Threading return result; } - private bool ValidHandle() - { - return (registeredWaitHandle != InvalidHandle && registeredWaitHandle != IntPtr.Zero); - } + private bool ValidHandle() => + registeredWaitHandle != InvalidHandle && registeredWaitHandle != IntPtr.Zero; ~RegisteredWaitHandleSafe() { @@ -1071,9 +841,8 @@ namespace System.Threading private static extern bool UnregisterWaitNative(IntPtr handle, SafeHandle waitObject); } -[System.Runtime.InteropServices.ComVisible(true)] public sealed class RegisteredWaitHandle : MarshalByRefObject { - private RegisteredWaitHandleSafe internalRegisteredWait; + private readonly RegisteredWaitHandleSafe internalRegisteredWait; internal RegisteredWaitHandle() { @@ -1090,8 +859,6 @@ namespace System.Threading internalRegisteredWait.SetWaitObject(waitObject); } - -[System.Runtime.InteropServices.ComVisible(true)] // This is the only public method on this class public bool Unregister( WaitHandle waitObject // object to be notified when all callbacks to delegates have completed @@ -1101,10 +868,8 @@ namespace System.Threading } } - [System.Runtime.InteropServices.ComVisible(true)] public delegate void WaitCallback(Object state); - [System.Runtime.InteropServices.ComVisible(true)] public delegate void WaitOrTimerCallback(Object state, bool timedOut); // signalled or timed out // @@ -1115,10 +880,7 @@ namespace System.Threading // internal static class _ThreadPoolWaitCallback { - static internal bool PerformWaitCallback() - { - return ThreadPoolWorkQueue.Dispatch(); - } + internal static bool PerformWaitCallback() => ThreadPoolWorkQueue.Dispatch(); } // @@ -1138,11 +900,9 @@ namespace System.Threading internal sealed class QueueUserWorkItemCallback : IThreadPoolWorkItem { - static QueueUserWorkItemCallback() {} - private WaitCallback callback; - private ExecutionContext context; - private Object state; + private readonly ExecutionContext context; + private readonly Object state; #if DEBUG volatile int executed; @@ -1173,7 +933,7 @@ namespace System.Threading void IThreadPoolWorkItem.ExecuteWorkItem() { #if DEBUG - MarkExecuted(false); + MarkExecuted(aborted:false); #endif // call directly if it is an unsafe call OR EC flow is suppressed if (context == null) @@ -1184,7 +944,7 @@ namespace System.Threading } else { - ExecutionContext.Run(context, ccb, this, true); + ExecutionContext.Run(context, ccb, this); } } @@ -1193,16 +953,16 @@ namespace System.Threading #if DEBUG // this workitem didn't execute because we got a ThreadAbortException prior to the call to ExecuteWorkItem. // This counts as being executed for our purposes. - MarkExecuted(true); + MarkExecuted(aborted:true); #endif } - static internal ContextCallback ccb = new ContextCallback(WaitCallback_Context); + internal static readonly ContextCallback ccb = new ContextCallback(WaitCallback_Context); - static private void WaitCallback_Context(Object state) + private static void WaitCallback_Context(Object state) { QueueUserWorkItemCallback obj = (QueueUserWorkItemCallback)state; - WaitCallback wc = obj.callback as WaitCallback; + WaitCallback wc = obj.callback; Debug.Assert(null != wc); wc(obj.state); } @@ -1210,10 +970,8 @@ namespace System.Threading internal sealed class QueueUserWorkItemCallbackDefaultContext : IThreadPoolWorkItem { - static QueueUserWorkItemCallbackDefaultContext() { } - private WaitCallback callback; - private Object state; + private readonly Object state; #if DEBUG private volatile int executed; @@ -1243,9 +1001,9 @@ namespace System.Threading void IThreadPoolWorkItem.ExecuteWorkItem() { #if DEBUG - MarkExecuted(false); + MarkExecuted(aborted:false); #endif - ExecutionContext.Run(ExecutionContext.PreAllocatedDefault, ccb, this, true); + ExecutionContext.Run(ExecutionContext.Default, ccb, this); } void IThreadPoolWorkItem.MarkAborted(ThreadAbortException tae) @@ -1253,16 +1011,16 @@ namespace System.Threading #if DEBUG // this workitem didn't execute because we got a ThreadAbortException prior to the call to ExecuteWorkItem. // This counts as being executed for our purposes. - MarkExecuted(true); + MarkExecuted(aborted:true); #endif } - static internal ContextCallback ccb = new ContextCallback(WaitCallback_Context); + internal static readonly ContextCallback ccb = new ContextCallback(WaitCallback_Context); - static private void WaitCallback_Context(Object state) + private static void WaitCallback_Context(Object state) { QueueUserWorkItemCallbackDefaultContext obj = (QueueUserWorkItemCallbackDefaultContext)state; - WaitCallback wc = obj.callback as WaitCallback; + WaitCallback wc = obj.callback; Debug.Assert(null != wc); obj.callback = null; wc(obj.state); @@ -1271,46 +1029,38 @@ namespace System.Threading internal class _ThreadPoolWaitOrTimerCallback { - static _ThreadPoolWaitOrTimerCallback() {} - WaitOrTimerCallback _waitOrTimerCallback; ExecutionContext _executionContext; Object _state; - static private ContextCallback _ccbt = new ContextCallback(WaitOrTimerCallback_Context_t); - static private ContextCallback _ccbf = new ContextCallback(WaitOrTimerCallback_Context_f); + private static readonly ContextCallback _ccbt = new ContextCallback(WaitOrTimerCallback_Context_t); + private static readonly ContextCallback _ccbf = new ContextCallback(WaitOrTimerCallback_Context_f); - internal _ThreadPoolWaitOrTimerCallback(WaitOrTimerCallback waitOrTimerCallback, Object state, bool compressStack, ref StackCrawlMark stackMark) + internal _ThreadPoolWaitOrTimerCallback(WaitOrTimerCallback waitOrTimerCallback, Object state, bool compressStack) { _waitOrTimerCallback = waitOrTimerCallback; _state = state; - if (compressStack && !ExecutionContext.IsFlowSuppressed()) + if (compressStack) { // capture the exection context - _executionContext = ExecutionContext.Capture( - ref stackMark, - ExecutionContext.CaptureOptions.IgnoreSyncCtx | ExecutionContext.CaptureOptions.OptimizeDefaultCase); + _executionContext = ExecutionContext.Capture(); } } - static private void WaitOrTimerCallback_Context_t(Object state) - { - WaitOrTimerCallback_Context(state, true); - } + private static void WaitOrTimerCallback_Context_t(Object state) => + WaitOrTimerCallback_Context(state, timedOut:true); - static private void WaitOrTimerCallback_Context_f(Object state) - { - WaitOrTimerCallback_Context(state, false); - } + private static void WaitOrTimerCallback_Context_f(Object state) => + WaitOrTimerCallback_Context(state, timedOut:false); - static private void WaitOrTimerCallback_Context(Object state, bool timedOut) + private static void WaitOrTimerCallback_Context(Object state, bool timedOut) { _ThreadPoolWaitOrTimerCallback helper = (_ThreadPoolWaitOrTimerCallback)state; helper._waitOrTimerCallback(helper._state, timedOut); } // call back helper - static internal void PerformWaitOrTimerCallback(Object state, bool timedOut) + internal static void PerformWaitOrTimerCallback(Object state, bool timedOut) { _ThreadPoolWaitOrTimerCallback helper = (_ThreadPoolWaitOrTimerCallback)state; Debug.Assert(helper != null, "Null state passed to PerformWaitOrTimerCallback!"); @@ -1322,20 +1072,13 @@ namespace System.Threading } else { - using (ExecutionContext executionContext = helper._executionContext.CreateCopy()) - { - if (timedOut) - ExecutionContext.Run(executionContext, _ccbt, helper, true); - else - ExecutionContext.Run(executionContext, _ccbf, helper, true); - } + ExecutionContext.Run(helper._executionContext, timedOut ? _ccbt : _ccbf, helper); } } } [CLSCompliant(false)] - [System.Runtime.InteropServices.ComVisible(true)] unsafe public delegate void IOCompletionCallback(uint errorCode, // Error code uint numBytes, // No. of bytes transferred NativeOverlapped* pOVERLAP // ptr to OVERLAP structure @@ -1343,7 +1086,6 @@ namespace System.Threading public static class ThreadPool { - public static bool SetMaxThreads(int workerThreads, int completionPortThreads) { return SetMaxThreadsNative(workerThreads, completionPortThreads); @@ -1412,7 +1154,7 @@ namespace System.Threading if (callBack != null) { - _ThreadPoolWaitOrTimerCallback callBackHelper = new _ThreadPoolWaitOrTimerCallback(callBack, state, compressStack, ref stackMark); + _ThreadPoolWaitOrTimerCallback callBackHelper = new _ThreadPoolWaitOrTimerCallback(callBack, state, compressStack); state = (Object)callBackHelper; // call SetWaitObject before native call so that waitObject won't be closed before threadpoolmgr registration // this could occur if callback were to fire before SetWaitObject does its addref @@ -1533,141 +1275,84 @@ namespace System.Threading StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller; return RegisterWaitForSingleObject(waitObject,callBack,state,(UInt32)tm,executeOnlyOnce,ref stackMark,false); } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // Methods containing StackCrawlMark local var has to be marked non-inlineable - public static bool QueueUserWorkItem( - WaitCallback callBack, // NOTE: we do not expose options that allow the callback to be queued as an APC - Object state - ) - { - StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller; - return QueueUserWorkItemHelper(callBack,state,ref stackMark,true); - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // Methods containing StackCrawlMark local var has to be marked non-inlineable - public static bool QueueUserWorkItem( - WaitCallback callBack // NOTE: we do not expose options that allow the callback to be queued as an APC - ) - { - StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller; - return QueueUserWorkItemHelper(callBack,null,ref stackMark,true); - } - - [MethodImplAttribute(MethodImplOptions.NoInlining)] // Methods containing StackCrawlMark local var has to be marked non-inlineable - public static bool UnsafeQueueUserWorkItem( - WaitCallback callBack, // NOTE: we do not expose options that allow the callback to be queued as an APC - Object state - ) - { - StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller; - return QueueUserWorkItemHelper(callBack,state,ref stackMark,false); - } - //ThreadPool has per-appdomain managed queue of work-items. The VM is - //responsible for just scheduling threads into appdomains. After that - //work-items are dispatched from the managed queue. - private static bool QueueUserWorkItemHelper(WaitCallback callBack, Object state, ref StackCrawlMark stackMark, bool compressStack ) - { - bool success = true; + public static bool QueueUserWorkItem(WaitCallback callBack) => + QueueUserWorkItem(callBack, null); - if (callBack != null) + public static bool QueueUserWorkItem(WaitCallback callBack, object state) + { + if (callBack == null) { - //The thread pool maintains a per-appdomain managed work queue. - //New thread pool entries are added in the managed queue. - //The VM is responsible for the actual growing/shrinking of - //threads. + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack); + } - EnsureVMInitialized(); + EnsureVMInitialized(); - // - // If we are able to create the workitem, we need to get it in the queue without being interrupted - // by a ThreadAbortException. - // - try { } - finally - { - ExecutionContext context = compressStack && !ExecutionContext.IsFlowSuppressed() ? - ExecutionContext.Capture(ref stackMark, ExecutionContext.CaptureOptions.IgnoreSyncCtx | ExecutionContext.CaptureOptions.OptimizeDefaultCase) : - null; + ExecutionContext context = ExecutionContext.Capture(); - IThreadPoolWorkItem tpcallBack = context == ExecutionContext.PreAllocatedDefault ? - new QueueUserWorkItemCallbackDefaultContext(callBack, state) : - (IThreadPoolWorkItem)new QueueUserWorkItemCallback(callBack, state, context); + IThreadPoolWorkItem tpcallBack = context == ExecutionContext.Default ? + new QueueUserWorkItemCallbackDefaultContext(callBack, state) : + (IThreadPoolWorkItem)new QueueUserWorkItemCallback(callBack, state, context); - ThreadPoolGlobals.workQueue.Enqueue(tpcallBack, true); - success = true; - } - } - else + ThreadPoolGlobals.workQueue.Enqueue(tpcallBack, forceGlobal: true); + + return true; + } + + public static bool UnsafeQueueUserWorkItem(WaitCallback callBack, Object state) + { + if (callBack == null) { - throw new ArgumentNullException(nameof(WaitCallback)); + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack); } - return success; + + EnsureVMInitialized(); + + IThreadPoolWorkItem tpcallBack = new QueueUserWorkItemCallback(callBack, state, null); + + ThreadPoolGlobals.workQueue.Enqueue(tpcallBack, forceGlobal: true); + + return true; } internal static void UnsafeQueueCustomWorkItem(IThreadPoolWorkItem workItem, bool forceGlobal) { Debug.Assert(null != workItem); EnsureVMInitialized(); - - // - // Enqueue needs to be protected from ThreadAbort - // - try { } - finally - { - ThreadPoolGlobals.workQueue.Enqueue(workItem, forceGlobal); - } + ThreadPoolGlobals.workQueue.Enqueue(workItem, forceGlobal); } // This method tries to take the target callback out of the current thread's queue. internal static bool TryPopCustomWorkItem(IThreadPoolWorkItem workItem) { Debug.Assert(null != workItem); - if (!ThreadPoolGlobals.vmTpInitialized) - return false; //Not initialized, so there's no way this workitem was ever queued. - return ThreadPoolGlobals.workQueue.LocalFindAndPop(workItem); + return + ThreadPoolGlobals.vmTpInitialized && // if not initialized, so there's no way this workitem was ever queued. + ThreadPoolGlobals.workQueue.LocalFindAndPop(workItem); } // Get all workitems. Called by TaskScheduler in its debugger hooks. internal static IEnumerable<IThreadPoolWorkItem> GetQueuedWorkItems() { - return EnumerateQueuedWorkItems(ThreadPoolWorkQueue.allThreadQueues.Current, ThreadPoolGlobals.workQueue.queueTail); - } - - internal static IEnumerable<IThreadPoolWorkItem> EnumerateQueuedWorkItems(ThreadPoolWorkQueue.WorkStealingQueue[] wsQueues, ThreadPoolWorkQueue.QueueSegment globalQueueTail) - { - if (wsQueues != null) + // Enumerate global queue + foreach (IThreadPoolWorkItem workItem in ThreadPoolGlobals.workQueue.workItems) { - // First, enumerate all workitems in thread-local queues. - foreach (ThreadPoolWorkQueue.WorkStealingQueue wsq in wsQueues) - { - if (wsq != null && wsq.m_array != null) - { - IThreadPoolWorkItem[] items = wsq.m_array; - for (int i = 0; i < items.Length; i++) - { - IThreadPoolWorkItem item = items[i]; - if (item != null) - yield return item; - } - } - } + yield return workItem; } - if (globalQueueTail != null) + // Enumerate each local queue + foreach (ThreadPoolWorkQueue.WorkStealingQueue wsq in ThreadPoolWorkQueue.WorkStealingQueueList.Queues) { - // Now the global queue - for (ThreadPoolWorkQueue.QueueSegment segment = globalQueueTail; - segment != null; - segment = segment.Next) + if (wsq != null && wsq.m_array != null) { - IThreadPoolWorkItem[] items = segment.nodes; + IThreadPoolWorkItem[] items = wsq.m_array; for (int i = 0; i < items.Length; i++) { IThreadPoolWorkItem item = items[i]; if (item != null) + { yield return item; + } } } } @@ -1675,13 +1360,20 @@ namespace System.Threading internal static IEnumerable<IThreadPoolWorkItem> GetLocallyQueuedWorkItems() { - return EnumerateQueuedWorkItems(new ThreadPoolWorkQueue.WorkStealingQueue[] { ThreadPoolWorkQueueThreadLocals.threadLocals.workStealingQueue }, null); + ThreadPoolWorkQueue.WorkStealingQueue wsq = ThreadPoolWorkQueueThreadLocals.threadLocals.workStealingQueue; + if (wsq != null && wsq.m_array != null) + { + IThreadPoolWorkItem[] items = wsq.m_array; + for (int i = 0; i < items.Length; i++) + { + IThreadPoolWorkItem item = items[i]; + if (item != null) + yield return item; + } + } } - internal static IEnumerable<IThreadPoolWorkItem> GetGloballyQueuedWorkItems() - { - return EnumerateQueuedWorkItems(null, ThreadPoolGlobals.workQueue.queueTail); - } + internal static IEnumerable<IThreadPoolWorkItem> GetGloballyQueuedWorkItems() => ThreadPoolGlobals.workQueue.workItems; private static object[] ToObjectArray(IEnumerable<IThreadPoolWorkItem> workitems) { @@ -1705,20 +1397,14 @@ namespace System.Threading // This is the method the debugger will actually call, if it ends up calling // into ThreadPool directly. Tests can use this to simulate a debugger, as well. - internal static object[] GetQueuedWorkItemsForDebugger() - { - return ToObjectArray(GetQueuedWorkItems()); - } + internal static object[] GetQueuedWorkItemsForDebugger() => + ToObjectArray(GetQueuedWorkItems()); - internal static object[] GetGloballyQueuedWorkItemsForDebugger() - { - return ToObjectArray(GetGloballyQueuedWorkItems()); - } + internal static object[] GetGloballyQueuedWorkItemsForDebugger() => + ToObjectArray(GetGloballyQueuedWorkItems()); - internal static object[] GetLocallyQueuedWorkItemsForDebugger() - { - return ToObjectArray(GetLocallyQueuedWorkItems()); - } + internal static object[] GetLocallyQueuedWorkItemsForDebugger() => + ToObjectArray(GetLocallyQueuedWorkItems()); [DllImport(JitHelpers.QCall, CharSet = CharSet.Unicode)] [SuppressUnmanagedCodeSecurity] @@ -1728,19 +1414,26 @@ namespace System.Threading unsafe private static extern bool PostQueuedCompletionStatus(NativeOverlapped* overlapped); [CLSCompliant(false)] - unsafe public static bool UnsafeQueueNativeOverlapped(NativeOverlapped* overlapped) - { - return PostQueuedCompletionStatus(overlapped); - } + unsafe public static bool UnsafeQueueNativeOverlapped(NativeOverlapped* overlapped) => + PostQueuedCompletionStatus(overlapped); + // The thread pool maintains a per-appdomain managed work queue. + // New thread pool entries are added in the managed queue. + // The VM is responsible for the actual growing/shrinking of + // threads. private static void EnsureVMInitialized() { if (!ThreadPoolGlobals.vmTpInitialized) { - ThreadPool.InitializeVMTp(ref ThreadPoolGlobals.enableWorkerTracking); - ThreadPoolGlobals.vmTpInitialized = true; + EnsureVMInitializedCore(); // separate out to help with inlining } } + + private static void EnsureVMInitializedCore() + { + ThreadPool.InitializeVMTp(ref ThreadPoolGlobals.enableWorkerTracking); + ThreadPoolGlobals.vmTpInitialized = true; + } // Native methods: @@ -1795,9 +1488,7 @@ namespace System.Threading [Obsolete("ThreadPool.BindHandle(IntPtr) has been deprecated. Please use ThreadPool.BindHandle(SafeHandle) instead.", false)] - public static bool BindHandle( - IntPtr osHandle - ) + public static bool BindHandle(IntPtr osHandle) { return BindIOCompletionCallbackNative(osHandle); } @@ -1822,7 +1513,6 @@ namespace System.Threading } [MethodImplAttribute(MethodImplOptions.InternalCall)] - [ReliabilityContract(Consistency.WillNotCorruptState, Cer.MayFail)] private static extern bool BindIOCompletionCallbackNative(IntPtr fileHandle); } } |