diff options
Diffstat (limited to 'src/System.Private.CoreLib/shared/System/Threading/ThreadPool.cs')
-rw-r--r-- | src/System.Private.CoreLib/shared/System/Threading/ThreadPool.cs | 1250 |
1 files changed, 1250 insertions, 0 deletions
diff --git a/src/System.Private.CoreLib/shared/System/Threading/ThreadPool.cs b/src/System.Private.CoreLib/shared/System/Threading/ThreadPool.cs new file mode 100644 index 0000000000..e0447c5599 --- /dev/null +++ b/src/System.Private.CoreLib/shared/System/Threading/ThreadPool.cs @@ -0,0 +1,1250 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +/*============================================================================= +** +** +** +** Purpose: Class for creating and managing a threadpool +** +** +=============================================================================*/ + +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Diagnostics.Tracing; +using System.Runtime.CompilerServices; +using System.Runtime.ConstrainedExecution; +using System.Runtime.InteropServices; +using System.Threading.Tasks; +using Internal.Runtime.CompilerServices; + +using Thread = Internal.Runtime.Augments.RuntimeThread; + +namespace System.Threading +{ + internal static class ThreadPoolGlobals + { + public static readonly int processorCount = Environment.ProcessorCount; + + public static volatile bool threadPoolInitialized; + public static bool enableWorkerTracking; + + public static readonly ThreadPoolWorkQueue workQueue = new ThreadPoolWorkQueue(); + + /// <summary>Shim used to invoke <see cref="IAsyncStateMachineBox.MoveNext"/> of the supplied <see cref="IAsyncStateMachineBox"/>.</summary> + internal static readonly Action<object> s_invokeAsyncStateMachineBox = state => + { + if (!(state is IAsyncStateMachineBox box)) + { + ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.state); + return; + } + + box.MoveNext(); + }; + } + + [StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing + internal sealed partial class ThreadPoolWorkQueue + { + internal static class WorkStealingQueueList + { + private static volatile WorkStealingQueue[] _queues = new WorkStealingQueue[0]; + + public static WorkStealingQueue[] Queues => _queues; + + public static void Add(WorkStealingQueue queue) + { + Debug.Assert(queue != null); + while (true) + { + WorkStealingQueue[] oldQueues = _queues; + Debug.Assert(Array.IndexOf(oldQueues, queue) == -1); + + var newQueues = new WorkStealingQueue[oldQueues.Length + 1]; + Array.Copy(oldQueues, 0, newQueues, 0, oldQueues.Length); + newQueues[newQueues.Length - 1] = queue; + if (Interlocked.CompareExchange(ref _queues, newQueues, oldQueues) == oldQueues) + { + break; + } + } + } + + public static void Remove(WorkStealingQueue queue) + { + Debug.Assert(queue != null); + while (true) + { + WorkStealingQueue[] oldQueues = _queues; + if (oldQueues.Length == 0) + { + return; + } + + int pos = Array.IndexOf(oldQueues, queue); + if (pos == -1) + { + Debug.Fail("Should have found the queue"); + return; + } + + var newQueues = new WorkStealingQueue[oldQueues.Length - 1]; + if (pos == 0) + { + Array.Copy(oldQueues, 1, newQueues, 0, newQueues.Length); + } + else if (pos == oldQueues.Length - 1) + { + Array.Copy(oldQueues, 0, newQueues, 0, newQueues.Length); + } + else + { + Array.Copy(oldQueues, 0, newQueues, 0, pos); + Array.Copy(oldQueues, pos + 1, newQueues, pos, newQueues.Length - pos); + } + + if (Interlocked.CompareExchange(ref _queues, newQueues, oldQueues) == oldQueues) + { + break; + } + } + } + } + + internal sealed class WorkStealingQueue + { + private const int INITIAL_SIZE = 32; + internal volatile object[] m_array = new object[INITIAL_SIZE]; // SOS's ThreadPool command depends on this name + private volatile int m_mask = INITIAL_SIZE - 1; + +#if DEBUG + // in debug builds, start at the end so we exercise the index reset logic. + private const int START_INDEX = int.MaxValue; +#else + private const int START_INDEX = 0; +#endif + + private volatile int m_headIndex = START_INDEX; + private volatile int m_tailIndex = START_INDEX; + + private SpinLock m_foreignLock = new SpinLock(enableThreadOwnerTracking: false); + + public void LocalPush(object obj) + { + int tail = m_tailIndex; + + // We're going to increment the tail; if we'll overflow, then we need to reset our counts + if (tail == int.MaxValue) + { + bool lockTaken = false; + try + { + m_foreignLock.Enter(ref lockTaken); + + if (m_tailIndex == int.MaxValue) + { + // + // Rather than resetting to zero, we'll just mask off the bits we don't care about. + // This way we don't need to rearrange the items already in the queue; they'll be found + // correctly exactly where they are. One subtlety here is that we need to make sure that + // if head is currently < tail, it remains that way. This happens to just fall out from + // the bit-masking, because we only do this if tail == int.MaxValue, meaning that all + // bits are set, so all of the bits we're keeping will also be set. Thus it's impossible + // for the head to end up > than the tail, since you can't set any more bits than all of + // them. + // + m_headIndex = m_headIndex & m_mask; + m_tailIndex = tail = m_tailIndex & m_mask; + Debug.Assert(m_headIndex <= m_tailIndex); + } + } + finally + { + if (lockTaken) + m_foreignLock.Exit(useMemoryBarrier: true); + } + } + + // When there are at least 2 elements' worth of space, we can take the fast path. + if (tail < m_headIndex + m_mask) + { + Volatile.Write(ref m_array[tail & m_mask], obj); + m_tailIndex = tail + 1; + } + else + { + // We need to contend with foreign pops, so we lock. + bool lockTaken = false; + try + { + m_foreignLock.Enter(ref lockTaken); + + int head = m_headIndex; + int count = m_tailIndex - m_headIndex; + + // If there is still space (one left), just add the element. + if (count >= m_mask) + { + // We're full; expand the queue by doubling its size. + var newArray = new object[m_array.Length << 1]; + for (int i = 0; i < m_array.Length; i++) + newArray[i] = m_array[(i + head) & m_mask]; + + // Reset the field values, incl. the mask. + m_array = newArray; + m_headIndex = 0; + m_tailIndex = tail = count; + m_mask = (m_mask << 1) | 1; + } + + Volatile.Write(ref m_array[tail & m_mask], obj); + m_tailIndex = tail + 1; + } + finally + { + if (lockTaken) + m_foreignLock.Exit(useMemoryBarrier: false); + } + } + } + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + public bool LocalFindAndPop(object obj) + { + // Fast path: check the tail. If equal, we can skip the lock. + if (m_array[(m_tailIndex - 1) & m_mask] == obj) + { + object unused = LocalPop(); + Debug.Assert(unused == null || unused == obj); + return unused != null; + } + + // Else, do an O(N) search for the work item. The theory of work stealing and our + // inlining logic is that most waits will happen on recently queued work. And + // since recently queued work will be close to the tail end (which is where we + // begin our search), we will likely find it quickly. In the worst case, we + // will traverse the whole local queue; this is typically not going to be a + // problem (although degenerate cases are clearly an issue) because local work + // queues tend to be somewhat shallow in length, and because if we fail to find + // the work item, we are about to block anyway (which is very expensive). + for (int i = m_tailIndex - 2; i >= m_headIndex; i--) + { + if (m_array[i & m_mask] == obj) + { + // If we found the element, block out steals to avoid interference. + bool lockTaken = false; + try + { + m_foreignLock.Enter(ref lockTaken); + + // If we encountered a race condition, bail. + if (m_array[i & m_mask] == null) + return false; + + // Otherwise, null out the element. + Volatile.Write(ref m_array[i & m_mask], null); + + // And then check to see if we can fix up the indexes (if we're at + // the edge). If we can't, we just leave nulls in the array and they'll + // get filtered out eventually (but may lead to superfluous resizing). + if (i == m_tailIndex) + m_tailIndex -= 1; + else if (i == m_headIndex) + m_headIndex += 1; + + return true; + } + finally + { + if (lockTaken) + m_foreignLock.Exit(useMemoryBarrier: false); + } + } + } + + return false; + } + + public object LocalPop() => m_headIndex < m_tailIndex ? LocalPopCore() : null; + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + private object LocalPopCore() + { + while (true) + { + int tail = m_tailIndex; + if (m_headIndex >= tail) + { + return null; + } + + // Decrement the tail using a fence to ensure subsequent read doesn't come before. + tail -= 1; + Interlocked.Exchange(ref m_tailIndex, tail); + + // If there is no interaction with a take, we can head down the fast path. + if (m_headIndex <= tail) + { + int idx = tail & m_mask; + object obj = Volatile.Read(ref m_array[idx]); + + // Check for nulls in the array. + if (obj == null) continue; + + m_array[idx] = null; + return obj; + } + else + { + // Interaction with takes: 0 or 1 elements left. + bool lockTaken = false; + try + { + m_foreignLock.Enter(ref lockTaken); + + if (m_headIndex <= tail) + { + // Element still available. Take it. + int idx = tail & m_mask; + object obj = Volatile.Read(ref m_array[idx]); + + // Check for nulls in the array. + if (obj == null) continue; + + m_array[idx] = null; + return obj; + } + else + { + // If we encountered a race condition and element was stolen, restore the tail. + m_tailIndex = tail + 1; + return null; + } + } + finally + { + if (lockTaken) + m_foreignLock.Exit(useMemoryBarrier: false); + } + } + } + } + + public bool CanSteal => m_headIndex < m_tailIndex; + + public object TrySteal(ref bool missedSteal) + { + while (true) + { + if (CanSteal) + { + bool taken = false; + try + { + m_foreignLock.TryEnter(ref taken); + if (taken) + { + // Increment head, and ensure read of tail doesn't move before it (fence). + int head = m_headIndex; + Interlocked.Exchange(ref m_headIndex, head + 1); + + if (head < m_tailIndex) + { + int idx = head & m_mask; + object obj = Volatile.Read(ref m_array[idx]); + + // Check for nulls in the array. + if (obj == null) continue; + + m_array[idx] = null; + return obj; + } + else + { + // Failed, restore head. + m_headIndex = head; + } + } + } + finally + { + if (taken) + m_foreignLock.Exit(useMemoryBarrier: false); + } + + missedSteal = true; + } + + return null; + } + } + } + + internal bool loggingEnabled; + internal readonly ConcurrentQueue<object> workItems = new ConcurrentQueue<object>(); // SOS's ThreadPool command depends on this name + + private Internal.PaddingFor32 pad1; + + private volatile int numOutstandingThreadRequests = 0; + + private Internal.PaddingFor32 pad2; + + public ThreadPoolWorkQueue() + { + loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer); + } + + public ThreadPoolWorkQueueThreadLocals GetOrCreateThreadLocals() => + ThreadPoolWorkQueueThreadLocals.threadLocals ?? CreateThreadLocals(); + + [MethodImpl(MethodImplOptions.NoInlining)] + private ThreadPoolWorkQueueThreadLocals CreateThreadLocals() + { + Debug.Assert(ThreadPoolWorkQueueThreadLocals.threadLocals == null); + + return (ThreadPoolWorkQueueThreadLocals.threadLocals = new ThreadPoolWorkQueueThreadLocals(this)); + } + + internal void EnsureThreadRequested() + { + // + // If we have not yet requested #procs threads, then request a new thread. + // + // CoreCLR: Note that there is a separate count in the VM which has already been incremented + // by the VM by the time we reach this point. + // + int count = numOutstandingThreadRequests; + while (count < ThreadPoolGlobals.processorCount) + { + int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count + 1, count); + if (prev == count) + { + ThreadPool.RequestWorkerThread(); + break; + } + count = prev; + } + } + + internal void MarkThreadRequestSatisfied() + { + // + // One of our outstanding thread requests has been satisfied. + // Decrement the count so that future calls to EnsureThreadRequested will succeed. + // + // CoreCLR: Note that there is a separate count in the VM which has already been decremented + // by the VM by the time we reach this point. + // + int count = numOutstandingThreadRequests; + while (count > 0) + { + int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count - 1, count); + if (prev == count) + { + break; + } + count = prev; + } + } + + public void Enqueue(object callback, bool forceGlobal) + { + Debug.Assert((callback is IThreadPoolWorkItem) ^ (callback is Task)); + + if (loggingEnabled) + System.Diagnostics.Tracing.FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback); + + ThreadPoolWorkQueueThreadLocals tl = null; + if (!forceGlobal) + tl = ThreadPoolWorkQueueThreadLocals.threadLocals; + + if (null != tl) + { + tl.workStealingQueue.LocalPush(callback); + } + else + { + workItems.Enqueue(callback); + } + + EnsureThreadRequested(); + } + + internal bool LocalFindAndPop(object callback) + { + ThreadPoolWorkQueueThreadLocals tl = ThreadPoolWorkQueueThreadLocals.threadLocals; + return tl != null && tl.workStealingQueue.LocalFindAndPop(callback); + } + + public object Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal) + { + WorkStealingQueue localWsq = tl.workStealingQueue; + object callback; + + if ((callback = localWsq.LocalPop()) == null && // first try the local queue + !workItems.TryDequeue(out callback)) // then try the global queue + { + // finally try to steal from another thread's local queue + WorkStealingQueue[] queues = WorkStealingQueueList.Queues; + int c = queues.Length; + Debug.Assert(c > 0, "There must at least be a queue for this thread."); + int maxIndex = c - 1; + int i = tl.random.Next(c); + while (c > 0) + { + i = (i < maxIndex) ? i + 1 : 0; + WorkStealingQueue otherQueue = queues[i]; + if (otherQueue != localWsq && otherQueue.CanSteal) + { + callback = otherQueue.TrySteal(ref missedSteal); + if (callback != null) + { + break; + } + } + c--; + } + } + + return callback; + } + + /// <summary> + /// Dispatches work items to this thread. + /// </summary> + /// <returns> + /// <c>true</c> if this thread did as much work as was available or its quantum expired. + /// <c>false</c> if this thread stopped working early. + /// </returns> + internal static bool Dispatch() + { + ThreadPoolWorkQueue outerWorkQueue = ThreadPoolGlobals.workQueue; + + // + // Save the start time + // + int startTickCount = Environment.TickCount; + + // + // Update our records to indicate that an outstanding request for a thread has now been fulfilled. + // From this point on, we are responsible for requesting another thread if we stop working for any + // reason, and we believe there might still be work in the queue. + // + // CoreCLR: Note that if this thread is aborted before we get a chance to request another one, the VM will + // record a thread request on our behalf. So we don't need to worry about getting aborted right here. + // + outerWorkQueue.MarkThreadRequestSatisfied(); + + // Has the desire for logging changed since the last time we entered? + outerWorkQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer); + + // + // Assume that we're going to need another thread if this one returns to the VM. We'll set this to + // false later, but only if we're absolutely certain that the queue is empty. + // + bool needAnotherThread = true; + object outerWorkItem = null; + try + { + // + // Set up our thread-local data + // + // Use operate on workQueue local to try block so it can be enregistered + ThreadPoolWorkQueue workQueue = outerWorkQueue; + ThreadPoolWorkQueueThreadLocals tl = workQueue.GetOrCreateThreadLocals(); + Thread currentThread = tl.currentThread; + + // Start on clean ExecutionContext and SynchronizationContext + currentThread.ExecutionContext = null; + currentThread.SynchronizationContext = null; + + // + // Loop until our quantum expires or there is no work. + // + while (ThreadPool.KeepDispatching(startTickCount)) + { + bool missedSteal = false; + // Use operate on workItem local to try block so it can be enregistered + object workItem = outerWorkItem = workQueue.Dequeue(tl, ref missedSteal); + + if (workItem == null) + { + // + // No work. + // If we missed a steal, though, there may be more work in the queue. + // Instead of looping around and trying again, we'll just request another thread. Hopefully the thread + // that owns the contended work-stealing queue will pick up its own workitems in the meantime, + // which will be more efficient than this thread doing it anyway. + // + needAnotherThread = missedSteal; + + // Tell the VM we're returning normally, not because Hill Climbing asked us to return. + return true; + } + + if (workQueue.loggingEnabled) + System.Diagnostics.Tracing.FrameworkEventSource.Log.ThreadPoolDequeueWorkObject(workItem); + + // + // If we found work, there may be more work. Ask for another thread so that the other work can be processed + // in parallel. Note that this will only ask for a max of #procs threads, so it's safe to call it for every dequeue. + // + workQueue.EnsureThreadRequested(); + + // + // Execute the workitem outside of any finally blocks, so that it can be aborted if needed. + // + if (ThreadPoolGlobals.enableWorkerTracking) + { + bool reportedStatus = false; + try + { + ThreadPool.ReportThreadStatus(isWorking: true); + reportedStatus = true; + if (workItem is Task task) + { + task.ExecuteFromThreadPool(currentThread); + } + else + { + Debug.Assert(workItem is IThreadPoolWorkItem); + Unsafe.As<IThreadPoolWorkItem>(workItem).Execute(); + } + } + finally + { + if (reportedStatus) + ThreadPool.ReportThreadStatus(isWorking: false); + } + } + else if (workItem is Task task) + { + // Check for Task first as it's currently faster to type check + // for Task and then Unsafe.As for the interface, rather than + // vice versa, in particular when the object implements a bunch + // of interfaces. + task.ExecuteFromThreadPool(currentThread); + } + else + { + Debug.Assert(workItem is IThreadPoolWorkItem); + Unsafe.As<IThreadPoolWorkItem>(workItem).Execute(); + } + + currentThread.ResetThreadPoolThread(); + + // Release refs + outerWorkItem = workItem = null; + + // Return to clean ExecutionContext and SynchronizationContext + ExecutionContext.ResetThreadPoolThread(currentThread); + + // + // Notify the VM that we executed this workitem. This is also our opportunity to ask whether Hill Climbing wants + // us to return the thread to the pool or not. + // + if (!ThreadPool.NotifyWorkItemComplete()) + return false; + } + + // If we get here, it's because our quantum expired. Tell the VM we're returning normally. + return true; + } + finally + { + // + // If we are exiting for any reason other than that the queue is definitely empty, ask for another + // thread to pick up where we left off. + // + if (needAnotherThread) + outerWorkQueue.EnsureThreadRequested(); + } + } + } + + // Simple random number generator. We don't need great randomness, we just need a little and for it to be fast. + internal struct FastRandom // xorshift prng + { + private uint _w, _x, _y, _z; + + public FastRandom(int seed) + { + _x = (uint)seed; + _w = 88675123; + _y = 362436069; + _z = 521288629; + } + + public int Next(int maxValue) + { + Debug.Assert(maxValue > 0); + + uint t = _x ^ (_x << 11); + _x = _y; _y = _z; _z = _w; + _w = _w ^ (_w >> 19) ^ (t ^ (t >> 8)); + + return (int)(_w % (uint)maxValue); + } + } + + // Holds a WorkStealingQueue, and removes it from the list when this object is no longer referenced. + internal sealed class ThreadPoolWorkQueueThreadLocals + { + [ThreadStatic] + public static ThreadPoolWorkQueueThreadLocals threadLocals; + + public readonly ThreadPoolWorkQueue workQueue; + public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue; + public readonly Thread currentThread; + public FastRandom random = new FastRandom(Thread.CurrentThread.ManagedThreadId); // mutable struct, do not copy or make readonly + + public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq) + { + workQueue = tpq; + workStealingQueue = new ThreadPoolWorkQueue.WorkStealingQueue(); + ThreadPoolWorkQueue.WorkStealingQueueList.Add(workStealingQueue); + currentThread = Thread.CurrentThread; + } + + private void CleanUp() + { + if (null != workStealingQueue) + { + if (null != workQueue) + { + object cb; + while ((cb = workStealingQueue.LocalPop()) != null) + { + Debug.Assert(null != cb); + workQueue.Enqueue(cb, forceGlobal: true); + } + } + + ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue); + } + } + + ~ThreadPoolWorkQueueThreadLocals() + { + // Since the purpose of calling CleanUp is to transfer any pending workitems into the global + // queue so that they will be executed by another thread, there's no point in doing this cleanup + // if we're in the process of shutting down or unloading the AD. In those cases, the work won't + // execute anyway. And there are subtle race conditions involved there that would lead us to do the wrong + // thing anyway. So we'll only clean up if this is a "normal" finalization. + if (!Environment.HasShutdownStarted) + CleanUp(); + } + } + + public delegate void WaitCallback(object state); + + public delegate void WaitOrTimerCallback(object state, bool timedOut); // signaled or timed out + + internal abstract class QueueUserWorkItemCallbackBase : IThreadPoolWorkItem + { +#if DEBUG + private volatile int executed; + + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1821:RemoveEmptyFinalizers")] + ~QueueUserWorkItemCallbackBase() + { + Debug.Assert( + executed != 0 || Environment.HasShutdownStarted, + "A QueueUserWorkItemCallback was never called!"); + } +#endif + + public virtual void Execute() + { +#if DEBUG + GC.SuppressFinalize(this); + Debug.Assert( + 0 == Interlocked.Exchange(ref executed, 1), + "A QueueUserWorkItemCallback was called twice!"); +#endif + } + } + + internal sealed class QueueUserWorkItemCallback : QueueUserWorkItemCallbackBase + { + private WaitCallback _callback; // SOS's ThreadPool command depends on this name + private readonly object _state; + private readonly ExecutionContext _context; + + private static readonly Action<QueueUserWorkItemCallback> s_executionContextShim = quwi => + { + WaitCallback callback = quwi._callback; + quwi._callback = null; + + callback(quwi._state); + }; + + internal QueueUserWorkItemCallback(WaitCallback callback, object state, ExecutionContext context) + { + Debug.Assert(context != null); + + _callback = callback; + _state = state; + _context = context; + } + + public override void Execute() + { + base.Execute(); + + ExecutionContext.RunForThreadPoolUnsafe(_context, s_executionContextShim, this); + } + } + + internal sealed class QueueUserWorkItemCallback<TState> : QueueUserWorkItemCallbackBase + { + private Action<TState> _callback; // SOS's ThreadPool command depends on this name + private readonly TState _state; + private readonly ExecutionContext _context; + + internal QueueUserWorkItemCallback(Action<TState> callback, TState state, ExecutionContext context) + { + Debug.Assert(callback != null); + + _callback = callback; + _state = state; + _context = context; + } + + public override void Execute() + { + base.Execute(); + + Action<TState> callback = _callback; + _callback = null; + + ExecutionContext.RunForThreadPoolUnsafe(_context, callback, in _state); + } + } + + internal sealed class QueueUserWorkItemCallbackDefaultContext : QueueUserWorkItemCallbackBase + { + private WaitCallback _callback; // SOS's ThreadPool command depends on this name + private readonly object _state; + + internal QueueUserWorkItemCallbackDefaultContext(WaitCallback callback, object state) + { + Debug.Assert(callback != null); + + _callback = callback; + _state = state; + } + + public override void Execute() + { + ExecutionContext.CheckThreadPoolAndContextsAreDefault(); + base.Execute(); + + WaitCallback callback = _callback; + _callback = null; + + callback(_state); + + // ThreadPoolWorkQueue.Dispatch will handle notifications and reset EC and SyncCtx back to default + } + } + + internal sealed class QueueUserWorkItemCallbackDefaultContext<TState> : QueueUserWorkItemCallbackBase + { + private Action<TState> _callback; // SOS's ThreadPool command depends on this name + private readonly TState _state; + + internal QueueUserWorkItemCallbackDefaultContext(Action<TState> callback, TState state) + { + Debug.Assert(callback != null); + + _callback = callback; + _state = state; + } + + public override void Execute() + { + ExecutionContext.CheckThreadPoolAndContextsAreDefault(); + base.Execute(); + + Action<TState> callback = _callback; + _callback = null; + + callback(_state); + + // ThreadPoolWorkQueue.Dispatch will handle notifications and reset EC and SyncCtx back to default + } + } + + internal class _ThreadPoolWaitOrTimerCallback + { + private WaitOrTimerCallback _waitOrTimerCallback; + private ExecutionContext _executionContext; + private object _state; + private static readonly ContextCallback _ccbt = new ContextCallback(WaitOrTimerCallback_Context_t); + private static readonly ContextCallback _ccbf = new ContextCallback(WaitOrTimerCallback_Context_f); + + internal _ThreadPoolWaitOrTimerCallback(WaitOrTimerCallback waitOrTimerCallback, object state, bool flowExecutionContext) + { + _waitOrTimerCallback = waitOrTimerCallback; + _state = state; + + if (flowExecutionContext) + { + // capture the exection context + _executionContext = ExecutionContext.Capture(); + } + } + + private static void WaitOrTimerCallback_Context_t(object state) => + WaitOrTimerCallback_Context(state, timedOut: true); + + private static void WaitOrTimerCallback_Context_f(object state) => + WaitOrTimerCallback_Context(state, timedOut: false); + + private static void WaitOrTimerCallback_Context(object state, bool timedOut) + { + _ThreadPoolWaitOrTimerCallback helper = (_ThreadPoolWaitOrTimerCallback)state; + helper._waitOrTimerCallback(helper._state, timedOut); + } + + // call back helper + internal static void PerformWaitOrTimerCallback(_ThreadPoolWaitOrTimerCallback helper, bool timedOut) + { + Debug.Assert(helper != null, "Null state passed to PerformWaitOrTimerCallback!"); + // call directly if it is an unsafe call OR EC flow is suppressed + ExecutionContext context = helper._executionContext; + if (context == null) + { + WaitOrTimerCallback callback = helper._waitOrTimerCallback; + callback(helper._state, timedOut); + } + else + { + ExecutionContext.Run(context, timedOut ? _ccbt : _ccbf, helper); + } + } + } + + public static partial class ThreadPool + { + [CLSCompliant(false)] + public static RegisteredWaitHandle RegisterWaitForSingleObject( + WaitHandle waitObject, + WaitOrTimerCallback callBack, + object state, + uint millisecondsTimeOutInterval, + bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC + ) + { + if (millisecondsTimeOutInterval > (uint)int.MaxValue && millisecondsTimeOutInterval != uint.MaxValue) + throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1); + return RegisterWaitForSingleObject(waitObject, callBack, state, millisecondsTimeOutInterval, executeOnlyOnce, true); + } + + [CLSCompliant(false)] + public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject( + WaitHandle waitObject, + WaitOrTimerCallback callBack, + object state, + uint millisecondsTimeOutInterval, + bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC + ) + { + if (millisecondsTimeOutInterval > (uint)int.MaxValue && millisecondsTimeOutInterval != uint.MaxValue) + throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1); + return RegisterWaitForSingleObject(waitObject, callBack, state, millisecondsTimeOutInterval, executeOnlyOnce, false); + } + + public static RegisteredWaitHandle RegisterWaitForSingleObject( + WaitHandle waitObject, + WaitOrTimerCallback callBack, + object state, + int millisecondsTimeOutInterval, + bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC + ) + { + if (millisecondsTimeOutInterval < -1) + throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1); + return RegisterWaitForSingleObject(waitObject, callBack, state, (uint)millisecondsTimeOutInterval, executeOnlyOnce, true); + } + + public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject( + WaitHandle waitObject, + WaitOrTimerCallback callBack, + object state, + int millisecondsTimeOutInterval, + bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC + ) + { + if (millisecondsTimeOutInterval < -1) + throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1); + return RegisterWaitForSingleObject(waitObject, callBack, state, (uint)millisecondsTimeOutInterval, executeOnlyOnce, false); + } + + public static RegisteredWaitHandle RegisterWaitForSingleObject( + WaitHandle waitObject, + WaitOrTimerCallback callBack, + object state, + long millisecondsTimeOutInterval, + bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC + ) + { + if (millisecondsTimeOutInterval < -1) + throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1); + return RegisterWaitForSingleObject(waitObject, callBack, state, (uint)millisecondsTimeOutInterval, executeOnlyOnce, true); + } + + public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject( + WaitHandle waitObject, + WaitOrTimerCallback callBack, + object state, + long millisecondsTimeOutInterval, + bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC + ) + { + if (millisecondsTimeOutInterval < -1) + throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1); + return RegisterWaitForSingleObject(waitObject, callBack, state, (uint)millisecondsTimeOutInterval, executeOnlyOnce, false); + } + + public static RegisteredWaitHandle RegisterWaitForSingleObject( + WaitHandle waitObject, + WaitOrTimerCallback callBack, + object state, + TimeSpan timeout, + bool executeOnlyOnce + ) + { + long tm = (long)timeout.TotalMilliseconds; + if (tm < -1) + throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_NeedNonNegOrNegative1); + if (tm > (long)int.MaxValue) + throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal); + return RegisterWaitForSingleObject(waitObject, callBack, state, (uint)tm, executeOnlyOnce, true); + } + + public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject( + WaitHandle waitObject, + WaitOrTimerCallback callBack, + object state, + TimeSpan timeout, + bool executeOnlyOnce + ) + { + long tm = (long)timeout.TotalMilliseconds; + if (tm < -1) + throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_NeedNonNegOrNegative1); + if (tm > (long)int.MaxValue) + throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal); + return RegisterWaitForSingleObject(waitObject, callBack, state, (uint)tm, executeOnlyOnce, false); + } + + public static bool QueueUserWorkItem(WaitCallback callBack) => + QueueUserWorkItem(callBack, null); + + public static bool QueueUserWorkItem(WaitCallback callBack, object state) + { + if (callBack == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack); + } + + EnsureInitialized(); + + ExecutionContext context = ExecutionContext.Capture(); + + object tpcallBack = (context == null || context.IsDefault) ? + new QueueUserWorkItemCallbackDefaultContext(callBack, state) : + (object)new QueueUserWorkItemCallback(callBack, state, context); + + ThreadPoolGlobals.workQueue.Enqueue(tpcallBack, forceGlobal: true); + + return true; + } + + public static bool QueueUserWorkItem<TState>(Action<TState> callBack, TState state, bool preferLocal) + { + if (callBack == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack); + } + + EnsureInitialized(); + + ExecutionContext context = ExecutionContext.Capture(); + + object tpcallBack = (context == null || context.IsDefault) ? + new QueueUserWorkItemCallbackDefaultContext<TState>(callBack, state) : + (object)new QueueUserWorkItemCallback<TState>(callBack, state, context); + + ThreadPoolGlobals.workQueue.Enqueue(tpcallBack, forceGlobal: !preferLocal); + + return true; + } + + public static bool UnsafeQueueUserWorkItem<TState>(Action<TState> callBack, TState state, bool preferLocal) + { + if (callBack == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack); + } + + // If the callback is the runtime-provided invocation of an IAsyncStateMachineBox, + // then we can queue the Task state directly to the ThreadPool instead of + // wrapping it in a QueueUserWorkItemCallback. + // + // This occurs when user code queues its provided continuation to the ThreadPool; + // internally we call UnsafeQueueUserWorkItemInternal directly for Tasks. + if (ReferenceEquals(callBack, ThreadPoolGlobals.s_invokeAsyncStateMachineBox)) + { + if (!(state is IAsyncStateMachineBox)) + { + // The provided state must be the internal IAsyncStateMachineBox (Task) type + ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.state); + } + + UnsafeQueueUserWorkItemInternal((object)state, preferLocal); + return true; + } + + EnsureInitialized(); + + ThreadPoolGlobals.workQueue.Enqueue( + new QueueUserWorkItemCallbackDefaultContext<TState>(callBack, state), forceGlobal: !preferLocal); + + return true; + } + + public static bool UnsafeQueueUserWorkItem(WaitCallback callBack, object state) + { + if (callBack == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack); + } + + EnsureInitialized(); + + object tpcallBack = new QueueUserWorkItemCallbackDefaultContext(callBack, state); + + ThreadPoolGlobals.workQueue.Enqueue(tpcallBack, forceGlobal: true); + + return true; + } + + public static bool UnsafeQueueUserWorkItem(IThreadPoolWorkItem callBack, bool preferLocal) + { + if (callBack == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack); + } + if (callBack is Task) + { + // Prevent code from queueing a derived Task that also implements the interface, + // as that would bypass Task.Start and its safety checks. + ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.callBack); + } + + UnsafeQueueUserWorkItemInternal(callBack, preferLocal); + return true; + } + + internal static void UnsafeQueueUserWorkItemInternal(object callBack, bool preferLocal) + { + Debug.Assert((callBack is IThreadPoolWorkItem) ^ (callBack is Task)); + + EnsureInitialized(); + + ThreadPoolGlobals.workQueue.Enqueue(callBack, forceGlobal: !preferLocal); + } + + // This method tries to take the target callback out of the current thread's queue. + internal static bool TryPopCustomWorkItem(object workItem) + { + Debug.Assert(null != workItem); + return + ThreadPoolGlobals.threadPoolInitialized && // if not initialized, so there's no way this workitem was ever queued. + ThreadPoolGlobals.workQueue.LocalFindAndPop(workItem); + } + + // Get all workitems. Called by TaskScheduler in its debugger hooks. + internal static IEnumerable<object> GetQueuedWorkItems() + { + // Enumerate global queue + foreach (object workItem in ThreadPoolGlobals.workQueue.workItems) + { + yield return workItem; + } + + // Enumerate each local queue + foreach (ThreadPoolWorkQueue.WorkStealingQueue wsq in ThreadPoolWorkQueue.WorkStealingQueueList.Queues) + { + if (wsq != null && wsq.m_array != null) + { + object[] items = wsq.m_array; + for (int i = 0; i < items.Length; i++) + { + object item = items[i]; + if (item != null) + { + yield return item; + } + } + } + } + } + + internal static IEnumerable<object> GetLocallyQueuedWorkItems() + { + ThreadPoolWorkQueue.WorkStealingQueue wsq = ThreadPoolWorkQueueThreadLocals.threadLocals.workStealingQueue; + if (wsq != null && wsq.m_array != null) + { + object[] items = wsq.m_array; + for (int i = 0; i < items.Length; i++) + { + object item = items[i]; + if (item != null) + yield return item; + } + } + } + + internal static IEnumerable<object> GetGloballyQueuedWorkItems() => ThreadPoolGlobals.workQueue.workItems; + + private static object[] ToObjectArray(IEnumerable<object> workitems) + { + int i = 0; + foreach (object item in workitems) + { + i++; + } + + object[] result = new object[i]; + i = 0; + foreach (object item in workitems) + { + if (i < result.Length) //just in case someone calls us while the queues are in motion + result[i] = item; + i++; + } + + return result; + } + + // This is the method the debugger will actually call, if it ends up calling + // into ThreadPool directly. Tests can use this to simulate a debugger, as well. + internal static object[] GetQueuedWorkItemsForDebugger() => + ToObjectArray(GetQueuedWorkItems()); + + internal static object[] GetGloballyQueuedWorkItemsForDebugger() => + ToObjectArray(GetGloballyQueuedWorkItems()); + + internal static object[] GetLocallyQueuedWorkItemsForDebugger() => + ToObjectArray(GetLocallyQueuedWorkItems()); + } +} |