// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// A pair of schedulers that together support concurrent (reader) / exclusive (writer)
// task scheduling.  Using just the exclusive scheduler can simulate a serial
// processing queue, and using just the concurrent scheduler with a specified
// MaximumConcurrencyLevel can achieve a MaxDegreeOfParallelism across
// a bunch of tasks, parallel loops, dataflow blocks, etc.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
using System.Security;
using System.Security.Permissions;

namespace System.Threading.Tasks
{
    /// <summary>
    /// Provides concurrent and exclusive task schedulers that coordinate to execute
    /// tasks while ensuring that concurrent tasks may run concurrently and exclusive tasks never do.
    /// </summary>
    [DebuggerDisplay("Concurrent={ConcurrentTaskCountForDebugger}, Exclusive={ExclusiveTaskCountForDebugger}, Mode={ModeForDebugger}")]
    [DebuggerTypeProxy(typeof(ConcurrentExclusiveSchedulerPair.DebugView))]
    public class ConcurrentExclusiveSchedulerPair
    {
        /// <summary>A dictionary mapping thread ID to a processing mode to denote what kinds of tasks are currently being processed on this thread.</summary>
        private readonly ConcurrentDictionary<int, ProcessingMode> m_threadProcessingMapping = new ConcurrentDictionary<int, ProcessingMode>();
        /// <summary>The scheduler used to queue and execute "concurrent" tasks that may run concurrently with other concurrent tasks.</summary>
        private readonly ConcurrentExclusiveTaskScheduler m_concurrentTaskScheduler;
        /// <summary>The scheduler used to queue and execute "exclusive" tasks that must run exclusively while no other tasks for this pair are running.</summary>
        private readonly ConcurrentExclusiveTaskScheduler m_exclusiveTaskScheduler;
        /// <summary>The underlying task scheduler to which all work should be scheduled.</summary>
        private readonly TaskScheduler m_underlyingTaskScheduler;
        /// <summary>
        /// The maximum number of tasks allowed to run concurrently.  This only applies to concurrent tasks,
        /// since exclusive tasks are inherently limited to 1.
        /// </summary>
        private readonly int m_maxConcurrencyLevel;
        /// <summary>The maximum number of tasks we can process before recycling our runner tasks.</summary>
        private readonly int m_maxItemsPerTask;
        /// <summary>
        /// If positive, it represents the number of concurrently running concurrent tasks.
        /// If negative, it means an exclusive task has been scheduled.
        /// If 0, nothing has been scheduled.
        /// </summary>
        private int m_processingCount;
        /// <summary>Completion state for a task representing the completion of this pair.</summary>
        /// <remarks>Lazily-initialized only if the scheduler pair is shutting down or if the Completion is requested.</remarks>
        private CompletionState m_completionState;

        /// <summary>A constant value used to signal unlimited processing.</summary>
        private const int UNLIMITED_PROCESSING = -1;
        /// <summary>Constant used for m_processingCount to indicate that an exclusive task is being processed.</summary>
        private const int EXCLUSIVE_PROCESSING_SENTINEL = -1;
        /// <summary>Default MaxItemsPerTask to use for processing if none is specified.</summary>
        private const int DEFAULT_MAXITEMSPERTASK = UNLIMITED_PROCESSING;
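        // Illustrative usage sketch (not part of this implementation; "pair", "ReadCache", and
        // "UpdateCache" below are hypothetical).  Tasks queued to ConcurrentScheduler may run in
        // parallel with one another, while tasks queued to ExclusiveScheduler run one at a time
        // and never alongside any concurrent task:
        //
        //     var pair = new ConcurrentExclusiveSchedulerPair();
        //     var concurrent = new TaskFactory(pair.ConcurrentScheduler);
        //     var exclusive = new TaskFactory(pair.ExclusiveScheduler);
        //
        //     Task read  = concurrent.StartNew(() => ReadCache());   // reader: may overlap other readers
        //     Task write = exclusive.StartNew(() => UpdateCache());  // writer: runs alone, like a serial queue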
        /// <summary>Default MaxConcurrencyLevel is the processor count if not otherwise specified.</summary>
        private static Int32 DefaultMaxConcurrencyLevel { get { return Environment.ProcessorCount; } }

        /// <summary>Gets the sync obj used to protect all state on this instance.</summary>
        private object ValueLock { get { return m_threadProcessingMapping; } }

        /// <summary>
        /// Initializes the ConcurrentExclusiveSchedulerPair.
        /// </summary>
        public ConcurrentExclusiveSchedulerPair() :
            this(TaskScheduler.Default, DefaultMaxConcurrencyLevel, DEFAULT_MAXITEMSPERTASK) { }

        /// <summary>
        /// Initializes the ConcurrentExclusiveSchedulerPair to target the specified scheduler.
        /// </summary>
        /// <param name="taskScheduler">The target scheduler on which this pair should execute.</param>
        public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler) :
            this(taskScheduler, DefaultMaxConcurrencyLevel, DEFAULT_MAXITEMSPERTASK) { }

        /// <summary>
        /// Initializes the ConcurrentExclusiveSchedulerPair to target the specified scheduler with a maximum concurrency level.
        /// </summary>
        /// <param name="taskScheduler">The target scheduler on which this pair should execute.</param>
        /// <param name="maxConcurrencyLevel">The maximum number of tasks to run concurrently.</param>
        public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler, int maxConcurrencyLevel) :
            this(taskScheduler, maxConcurrencyLevel, DEFAULT_MAXITEMSPERTASK) { }

        /// <summary>
        /// Initializes the ConcurrentExclusiveSchedulerPair to target the specified scheduler with a maximum
        /// concurrency level and a maximum number of scheduled tasks that may be processed as a unit.
        /// </summary>
        /// <param name="taskScheduler">The target scheduler on which this pair should execute.</param>
        /// <param name="maxConcurrencyLevel">The maximum number of tasks to run concurrently.</param>
        /// <param name="maxItemsPerTask">The maximum number of tasks to process for each underlying scheduled task used by the pair.</param>
        public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler, int maxConcurrencyLevel, int maxItemsPerTask)
        {
            // Validate arguments
            if (taskScheduler == null) throw new ArgumentNullException(nameof(taskScheduler));
            if (maxConcurrencyLevel == 0 || maxConcurrencyLevel < -1) throw new ArgumentOutOfRangeException(nameof(maxConcurrencyLevel));
            if (maxItemsPerTask == 0 || maxItemsPerTask < -1) throw new ArgumentOutOfRangeException(nameof(maxItemsPerTask));
            Contract.EndContractBlock();

            // Store configuration
            m_underlyingTaskScheduler = taskScheduler;
            m_maxConcurrencyLevel = maxConcurrencyLevel;
            m_maxItemsPerTask = maxItemsPerTask;

            // Downgrade to the underlying scheduler's max degree of parallelism if it's lower than the user-supplied level
            int mcl = taskScheduler.MaximumConcurrencyLevel;
            if (mcl > 0 && mcl < m_maxConcurrencyLevel) m_maxConcurrencyLevel = mcl;

            // Treat UNLIMITED_PROCESSING/-1 for both MCL and MIPT as the biggest possible value so that we don't
            // have to special-case UNLIMITED_PROCESSING later on in processing.
            if (m_maxConcurrencyLevel == UNLIMITED_PROCESSING) m_maxConcurrencyLevel = Int32.MaxValue;
            if (m_maxItemsPerTask == UNLIMITED_PROCESSING) m_maxItemsPerTask = Int32.MaxValue;

            // Create the concurrent/exclusive schedulers for this pair
            m_exclusiveTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, 1, ProcessingMode.ProcessingExclusiveTask);
            m_concurrentTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, m_maxConcurrencyLevel, ProcessingMode.ProcessingConcurrentTasks);
        }

        /// <summary>Informs the scheduler pair that it should not accept any more tasks.</summary>
        /// <remarks>
        /// Calling <see cref="Complete"/> is optional, and it's only necessary if the <see cref="Completion"/>
        /// will be relied on for notification of all processing being completed.
        /// </remarks>
        public void Complete()
        {
            lock (ValueLock)
            {
                if (!CompletionRequested)
                {
                    RequestCompletion();
                    CleanupStateIfCompletingAndQuiesced();
                }
            }
        }
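        // Illustrative sketch of the Complete/Completion handshake ("pair" is hypothetical and the
        // snippet is assumed to run inside an async method): Complete() stops the pair from accepting
        // new tasks, and Completion finishes once all previously queued work has been processed.
        //
        //     pair.Complete();          // no further tasks may be queued to either scheduler
        //     await pair.Completion;    // resumes once all queued work has been processed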
        /// <summary>Gets a <see cref="System.Threading.Tasks.Task"/> that will complete when the scheduler has completed processing.</summary>
        public Task Completion
        {
            // ValueLock not needed, but it's ok if it's held
            get { return EnsureCompletionStateInitialized().Task; }
        }

        /// <summary>Gets the lazily-initialized completion state.</summary>
        private CompletionState EnsureCompletionStateInitialized()
        {
            // ValueLock not needed, but it's ok if it's held
            return LazyInitializer.EnsureInitialized(ref m_completionState, () => new CompletionState());
        }

        /// <summary>Gets whether completion has been requested.</summary>
        private bool CompletionRequested
        {
            // ValueLock not needed, but it's ok if it's held
            get { return m_completionState != null && Volatile.Read(ref m_completionState.m_completionRequested); }
        }

        /// <summary>Sets that completion has been requested.</summary>
        private void RequestCompletion()
        {
            ContractAssertMonitorStatus(ValueLock, held: true);
            EnsureCompletionStateInitialized().m_completionRequested = true;
        }

        /// <summary>
        /// Cleans up state if and only if there's no processing currently happening
        /// and no more to be done later.
        /// </summary>
        private void CleanupStateIfCompletingAndQuiesced()
        {
            ContractAssertMonitorStatus(ValueLock, held: true);
            if (ReadyToComplete) CompleteTaskAsync();
        }

        /// <summary>Gets whether the pair is ready to complete.</summary>
        private bool ReadyToComplete
        {
            get
            {
                ContractAssertMonitorStatus(ValueLock, held: true);

                // We can only complete if completion has been requested and no processing is currently happening.
                if (!CompletionRequested || m_processingCount != 0) return false;

                // Now, only allow shutdown if an exception occurred or if there are no more tasks to process.
                var cs = EnsureCompletionStateInitialized();
                return
                    (cs.m_exceptions != null && cs.m_exceptions.Count > 0) ||
                    (m_concurrentTaskScheduler.m_tasks.IsEmpty && m_exclusiveTaskScheduler.m_tasks.IsEmpty);
            }
        }

        /// <summary>Completes the completion task asynchronously.</summary>
        private void CompleteTaskAsync()
        {
            Contract.Requires(ReadyToComplete, "The block must be ready to complete to be here.");
            ContractAssertMonitorStatus(ValueLock, held: true);

            // Ensure we only try to complete once, then schedule completion
            // in order to escape held locks and the caller's context
            var cs = EnsureCompletionStateInitialized();
            if (!cs.m_completionQueued)
            {
                cs.m_completionQueued = true;
                ThreadPool.QueueUserWorkItem(state =>
                {
                    var localCs = (CompletionState)state; // don't use 'cs', as it'll force a closure
                    Debug.Assert(!localCs.Task.IsCompleted, "Completion should only happen once.");

                    var exceptions = localCs.m_exceptions;
                    bool success = (exceptions != null && exceptions.Count > 0) ?
                        localCs.TrySetException(exceptions) :
                        localCs.TrySetResult(default(VoidTaskResult));
                    Debug.Assert(success, "Expected to complete completion task.");
                }, cs);
            }
        }

        /// <summary>Initiates scheduler shutdown due to a worker task faulting.</summary>
        /// <param name="faultedTask">The faulted worker task that's initiating the shutdown.</param>
        private void FaultWithTask(Task faultedTask)
        {
            Contract.Requires(faultedTask != null && faultedTask.IsFaulted && faultedTask.Exception.InnerExceptions.Count > 0,
                "Needs a task in the faulted state and thus with exceptions.");
            ContractAssertMonitorStatus(ValueLock, held: true);

            // Store the faulted task's exceptions
            var cs = EnsureCompletionStateInitialized();
            if (cs.m_exceptions == null) cs.m_exceptions = new List<Exception>();
            cs.m_exceptions.AddRange(faultedTask.Exception.InnerExceptions);

            // Now that we're doomed, request completion
            RequestCompletion();
        }
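        // Illustrative sketch ("pair" is hypothetical; assumed to run inside an async method): if a
        // processing task faults -- for example because the underlying scheduler throws from QueueTask --
        // the pair records the exception, requests completion, and surfaces the failure via Completion.
        //
        //     try
        //     {
        //         await pair.Completion;
        //     }
        //     catch (Exception e)
        //     {
        //         // e describes the unrecoverable scheduling failure that shut the pair down
        //     }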
        /// <summary>
        /// Gets a TaskScheduler that can be used to schedule tasks to this pair
        /// that may run concurrently with other tasks on this pair.
        /// </summary>
        public TaskScheduler ConcurrentScheduler { get { return m_concurrentTaskScheduler; } }

        /// <summary>
        /// Gets a TaskScheduler that can be used to schedule tasks to this pair
        /// that must run exclusively with regards to other tasks on this pair.
        /// </summary>
        public TaskScheduler ExclusiveScheduler { get { return m_exclusiveTaskScheduler; } }

        /// <summary>Gets the number of tasks waiting to run concurrently.</summary>
        /// <remarks>This does not take the necessary lock, as it's only called from under the debugger.</remarks>
        [SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
        private int ConcurrentTaskCountForDebugger { get { return m_concurrentTaskScheduler.m_tasks.Count; } }

        /// <summary>Gets the number of tasks waiting to run exclusively.</summary>
        /// <remarks>This does not take the necessary lock, as it's only called from under the debugger.</remarks>
        [SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
        private int ExclusiveTaskCountForDebugger { get { return m_exclusiveTaskScheduler.m_tasks.Count; } }
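        // Illustrative sketch ("pair", "items", and "Process" are hypothetical): any API that accepts a
        // TaskScheduler can target the pair.  For example, a parallel loop run on ConcurrentScheduler is
        // limited to the pair's maximum concurrency level and never overlaps tasks queued to
        // ExclusiveScheduler.
        //
        //     var options = new ParallelOptions { TaskScheduler = pair.ConcurrentScheduler };
        //     Parallel.ForEach(items, options, item => Process(item));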
        /// <summary>Notifies the pair that new work has arrived to be processed.</summary>
        /// <param name="fairly">Whether tasks should be scheduled fairly with regards to other tasks.</param>
        /// <remarks>Must only be called while holding the lock.</remarks>
        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
        [SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals")]
        [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
        private void ProcessAsyncIfNecessary(bool fairly = false)
        {
            ContractAssertMonitorStatus(ValueLock, held: true);

            // If the current processing count is >= 0, we can potentially launch further processing.
            if (m_processingCount >= 0)
            {
                // We snap whether there are any exclusive tasks or concurrent tasks waiting.
                // (We grab the concurrent count below only once we know we need it.)
                // With processing happening concurrent to this operation, this data may
                // immediately be out of date, but it can only go from non-empty
                // to empty and not the other way around.  As such, this is safe,
                // as worst case is we'll schedule an extra task when we didn't
                // otherwise need to, and we'll just eat its overhead.
                bool exclusiveTasksAreWaiting = !m_exclusiveTaskScheduler.m_tasks.IsEmpty;

                // If there's no processing currently happening but there are waiting exclusive tasks,
                // let's start processing those exclusive tasks.
                Task processingTask = null;
                if (m_processingCount == 0 && exclusiveTasksAreWaiting)
                {
                    // Launch exclusive task processing
                    m_processingCount = EXCLUSIVE_PROCESSING_SENTINEL; // -1
                    try
                    {
                        processingTask = new Task(thisPair => ((ConcurrentExclusiveSchedulerPair)thisPair).ProcessExclusiveTasks(), this,
                            default(CancellationToken), GetCreationOptionsForTask(fairly));
                        processingTask.Start(m_underlyingTaskScheduler);
                        // When we call Start, if the underlying scheduler throws in QueueTask, TPL will fault the task and rethrow
                        // the exception.  To deal with that, we need a reference to the task object, so that we can observe its exception.
                        // Hence, we separate creation and starting, so that we can store a reference to the task before we attempt QueueTask.
                    }
                    catch
                    {
                        m_processingCount = 0;
                        FaultWithTask(processingTask);
                    }
                }
                // If there are no waiting exclusive tasks, there are concurrent tasks, and we haven't reached our maximum
                // concurrency level for processing, let's start processing more concurrent tasks.
                else
                {
                    int concurrentTasksWaitingCount = m_concurrentTaskScheduler.m_tasks.Count;
                    if (concurrentTasksWaitingCount > 0 && !exclusiveTasksAreWaiting && m_processingCount < m_maxConcurrencyLevel)
                    {
                        // Launch concurrent task processing, up to the allowed limit
                        for (int i = 0; i < concurrentTasksWaitingCount && m_processingCount < m_maxConcurrencyLevel; ++i)
                        {
                            ++m_processingCount;
                            try
                            {
                                processingTask = new Task(thisPair => ((ConcurrentExclusiveSchedulerPair)thisPair).ProcessConcurrentTasks(), this,
                                    default(CancellationToken), GetCreationOptionsForTask(fairly));
                                processingTask.Start(m_underlyingTaskScheduler); // See above logic for why we use new + Start rather than StartNew
                            }
                            catch
                            {
                                --m_processingCount;
                                FaultWithTask(processingTask);
                            }
                        }
                    }
                }

                // Check to see if all tasks have completed and if completion has been requested.
                CleanupStateIfCompletingAndQuiesced();
            }
            else Debug.Assert(m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL, "The processing count must be the sentinel if it's not >= 0.");
        }

        /// <summary>
        /// Processes exclusive tasks serially until either there are no more to process
        /// or we've reached our user-specified maximum limit.
        /// </summary>
        private void ProcessExclusiveTasks()
        {
            Contract.Requires(m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL, "Processing exclusive tasks requires being in exclusive mode.");
            Contract.Requires(!m_exclusiveTaskScheduler.m_tasks.IsEmpty, "Processing exclusive tasks requires tasks to be processed.");
            ContractAssertMonitorStatus(ValueLock, held: false);
            try
            {
                // Note that we're processing exclusive tasks on the current thread
                Debug.Assert(!m_threadProcessingMapping.ContainsKey(Thread.CurrentThread.ManagedThreadId),
                    "This thread should not yet be involved in this pair's processing.");
                m_threadProcessingMapping[Thread.CurrentThread.ManagedThreadId] = ProcessingMode.ProcessingExclusiveTask;

                // Process up to the maximum number of items per task allowed
                for (int i = 0; i < m_maxItemsPerTask; i++)
                {
                    // Get the next available exclusive task.  If we can't find one, bail.
                    Task exclusiveTask;
                    if (!m_exclusiveTaskScheduler.m_tasks.TryDequeue(out exclusiveTask)) break;

                    // Execute the task.  If the scheduler was previously faulted,
                    // this task could have been faulted when it was queued; ignore such tasks.
                    if (!exclusiveTask.IsFaulted) m_exclusiveTaskScheduler.ExecuteTask(exclusiveTask);
                }
            }
            finally
            {
                // We're no longer processing exclusive tasks on the current thread
                ProcessingMode currentMode;
                m_threadProcessingMapping.TryRemove(Thread.CurrentThread.ManagedThreadId, out currentMode);
                Debug.Assert(currentMode == ProcessingMode.ProcessingExclusiveTask, "Somehow we ended up escaping exclusive mode.");

                lock (ValueLock)
                {
                    // When this task was launched, we tracked it by setting m_processingCount to EXCLUSIVE_PROCESSING_SENTINEL.
                    // Now reset it to 0.  Then check to see whether there's more processing to be done.
                    // There might be more concurrent tasks available, for example, if concurrent tasks arrived
                    // after we exited the loop, or if we exited the loop while concurrent tasks were still
                    // available but we hit our maxItemsPerTask limit.
                    Debug.Assert(m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL, "The processing mode should not have deviated from exclusive.");
                    m_processingCount = 0;
                    ProcessAsyncIfNecessary(true);
                }
            }
        }
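        // Illustrative sketch: maxItemsPerTask bounds how many queued tasks a single worker task (like
        // ProcessExclusiveTasks above or ProcessConcurrentTasks below) may execute before the pair
        // recycles it by scheduling a replacement.  The values below are arbitrary:
        //
        //     var pair = new ConcurrentExclusiveSchedulerPair(
        //         TaskScheduler.Default,
        //         maxConcurrencyLevel: 4,   // at most 4 concurrent tasks running at once
        //         maxItemsPerTask: 16);     // recycle each worker after it has run 16 queued tasks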
        /// <summary>
        /// Processes concurrent tasks serially until either there are no more to process,
        /// we've reached our user-specified maximum limit, or exclusive tasks have arrived.
        /// </summary>
        private void ProcessConcurrentTasks()
        {
            Contract.Requires(m_processingCount > 0, "Processing concurrent tasks requires us to be in concurrent mode.");
            ContractAssertMonitorStatus(ValueLock, held: false);
            try
            {
                // Note that we're processing concurrent tasks on the current thread
                Debug.Assert(!m_threadProcessingMapping.ContainsKey(Thread.CurrentThread.ManagedThreadId),
                    "This thread should not yet be involved in this pair's processing.");
                m_threadProcessingMapping[Thread.CurrentThread.ManagedThreadId] = ProcessingMode.ProcessingConcurrentTasks;

                // Process up to the maximum number of items per task allowed
                for (int i = 0; i < m_maxItemsPerTask; i++)
                {
                    // Get the next available concurrent task.  If we can't find one, bail.
                    Task concurrentTask;
                    if (!m_concurrentTaskScheduler.m_tasks.TryDequeue(out concurrentTask)) break;

                    // Execute the task.  If the scheduler was previously faulted,
                    // this task could have been faulted when it was queued; ignore such tasks.
                    if (!concurrentTask.IsFaulted) m_concurrentTaskScheduler.ExecuteTask(concurrentTask);

                    // Now check to see if exclusive tasks have arrived; if any have, they take priority
                    // so we'll bail out here.  Note that we could have checked this condition
                    // in the for loop's condition, but that could lead to extra overhead
                    // in the case where a concurrent task arrives, this task is launched, and then
                    // before entering the loop an exclusive task arrives.  If we didn't execute at
                    // least one task, we would have spent all of the overhead to launch a
                    // task but with none of the benefit.  There's of course also an inherent
                    // race condition here with regards to exclusive tasks arriving, and we're ok with
                    // executing one more concurrent task than we should before giving priority to exclusive tasks.
                    if (!m_exclusiveTaskScheduler.m_tasks.IsEmpty) break;
                }
            }
            finally
            {
                // We're no longer processing concurrent tasks on the current thread
                ProcessingMode currentMode;
                m_threadProcessingMapping.TryRemove(Thread.CurrentThread.ManagedThreadId, out currentMode);
                Debug.Assert(currentMode == ProcessingMode.ProcessingConcurrentTasks, "Somehow we ended up escaping concurrent mode.");

                lock (ValueLock)
                {
                    // When this task was launched, we tracked it with a positive processing count;
                    // decrement that count.  Then check to see whether there's more processing to be done.
                    // There might be more concurrent tasks available, for example, if concurrent tasks arrived
                    // after we exited the loop, or if we exited the loop while concurrent tasks were still
                    // available but we hit our maxItemsPerTask limit.
                    Debug.Assert(m_processingCount > 0, "The processing mode should not have deviated from concurrent.");
                    if (m_processingCount > 0) --m_processingCount;
                    ProcessAsyncIfNecessary(true);
                }
            }
        }

#if PRENET45
        /// <summary>
        /// Type used with TaskCompletionSource(Of TResult) as the TResult
        /// to ensure that the resulting task can't be upcast to something
        /// that in the future could lead to compat problems.
        /// </summary>
        [SuppressMessage("Microsoft.Performance", "CA1812:AvoidUninstantiatedInternalClasses")]
        [DebuggerNonUserCode]
        private struct VoidTaskResult { }
#endif
        /// <summary>
        /// Holder for lazily-initialized state about the completion of a scheduler pair.
        /// Completion is only triggered either by rare exceptional conditions or by
        /// the user calling Complete, and as such we only lazily initialize this
        /// state in one of those conditions or if the user explicitly asks for
        /// the Completion.
        /// </summary>
        [SuppressMessage("Microsoft.Performance", "CA1812:AvoidUninstantiatedInternalClasses")]
        private sealed class CompletionState : TaskCompletionSource<VoidTaskResult>
        {
            /// <summary>Whether the scheduler has had completion requested.</summary>
            /// <remarks>This variable is not volatile, so to guarantee safe reads, Volatile.Read is used when it's read in CompletionRequested.</remarks>
            internal bool m_completionRequested;
            /// <summary>Whether completion processing has been queued.</summary>
            internal bool m_completionQueued;
            /// <summary>Unrecoverable exceptions incurred while processing.</summary>
            internal List<Exception> m_exceptions;
        }

        /// <summary>
        /// A scheduler shim used to queue tasks to the pair and execute those tasks on request of the pair.
        /// </summary>
        [DebuggerDisplay("Count={CountForDebugger}, MaxConcurrencyLevel={m_maxConcurrencyLevel}, Id={Id}")]
        [DebuggerTypeProxy(typeof(ConcurrentExclusiveTaskScheduler.DebugView))]
        private sealed class ConcurrentExclusiveTaskScheduler : TaskScheduler
        {
            /// <summary>Cached delegate for invoking TryExecuteTaskShim.</summary>
            private static readonly Func<object, bool> s_tryExecuteTaskShim = new Func<object, bool>(TryExecuteTaskShim);
            /// <summary>The parent pair.</summary>
            private readonly ConcurrentExclusiveSchedulerPair m_pair;
            /// <summary>The maximum concurrency level for the scheduler.</summary>
            private readonly int m_maxConcurrencyLevel;
            /// <summary>The processing mode of this scheduler, exclusive or concurrent.</summary>
            private readonly ProcessingMode m_processingMode;
            /// <summary>Gets the queue of tasks for this scheduler.</summary>
            internal readonly IProducerConsumerQueue<Task> m_tasks;

            /// <summary>Initializes the scheduler.</summary>
            /// <param name="pair">The parent pair.</param>
            /// <param name="maxConcurrencyLevel">The maximum degree of concurrency this scheduler may use.</param>
            /// <param name="processingMode">The processing mode of this scheduler.</param>
            internal ConcurrentExclusiveTaskScheduler(ConcurrentExclusiveSchedulerPair pair, int maxConcurrencyLevel, ProcessingMode processingMode)
            {
                Contract.Requires(pair != null, "Scheduler must be associated with a valid pair.");
                Contract.Requires(processingMode == ProcessingMode.ProcessingConcurrentTasks || processingMode == ProcessingMode.ProcessingExclusiveTask,
                    "Scheduler must be for concurrent or exclusive processing.");
                Contract.Requires(
                    (processingMode == ProcessingMode.ProcessingConcurrentTasks && (maxConcurrencyLevel >= 1 || maxConcurrencyLevel == UNLIMITED_PROCESSING)) ||
                    (processingMode == ProcessingMode.ProcessingExclusiveTask && maxConcurrencyLevel == 1),
                    "If we're in concurrent mode, our concurrency level should be positive or unlimited.  If exclusive, it should be 1.");

                m_pair = pair;
                m_maxConcurrencyLevel = maxConcurrencyLevel;
                m_processingMode = processingMode;
                m_tasks = (processingMode == ProcessingMode.ProcessingExclusiveTask) ?
                    (IProducerConsumerQueue<Task>)new SingleProducerSingleConsumerQueue<Task>() :
                    (IProducerConsumerQueue<Task>)new MultiProducerMultiConsumerQueue<Task>();
            }

            /// <summary>Gets the maximum concurrency level this scheduler is able to support.</summary>
            public override int MaximumConcurrencyLevel { get { return m_maxConcurrencyLevel; } }

            /// <summary>Queues a task to the scheduler.</summary>
            /// <param name="task">The task to be queued.</param>
            protected internal override void QueueTask(Task task)
            {
                Debug.Assert(task != null, "Infrastructure should have provided a non-null task.");
                lock (m_pair.ValueLock)
                {
                    // If the scheduler has already had completion requested, no new work is allowed to be scheduled
                    if (m_pair.CompletionRequested) throw new InvalidOperationException(GetType().Name);

                    // Queue the task, and then let the pair know that more work is now available to be scheduled
                    m_tasks.Enqueue(task);
                    m_pair.ProcessAsyncIfNecessary();
                }
            }
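            // Illustrative sketch ("pair" is hypothetical): once Complete() has been requested on the
            // pair, QueueTask above rejects new work; TPL typically surfaces that rejection to the caller
            // of Start/StartNew as a TaskSchedulerException wrapping the InvalidOperationException.
            //
            //     pair.Complete();
            //     Task.Factory.StartNew(() => { }, CancellationToken.None, TaskCreationOptions.None,
            //         pair.ConcurrentScheduler);   // throws TaskSchedulerException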
            /// <summary>Executes a task on this scheduler.</summary>
            /// <param name="task">The task to be executed.</param>
            internal void ExecuteTask(Task task)
            {
                Debug.Assert(task != null, "Infrastructure should have provided a non-null task.");
                base.TryExecuteTask(task);
            }

            /// <summary>Tries to execute the task synchronously on this scheduler.</summary>
            /// <param name="task">The task to execute.</param>
            /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued to the scheduler.</param>
            /// <returns>true if the task could be executed; otherwise, false.</returns>
            protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
            {
                Debug.Assert(task != null, "Infrastructure should have provided a non-null task.");

                // If the scheduler has had completion requested, no new work is allowed to be scheduled.
                // A non-locked read on m_completionRequested (in CompletionRequested) is acceptable here because:
                // a) we don't need to be exact... a Complete call could come in later in the function anyway
                // b) this is only a fast path escape hatch.  To actually inline the task,
                //    we need to be inside of an already executing task, and in such a case,
                //    while completion may have been requested, we can't have shutdown yet.
                if (!taskWasPreviouslyQueued && m_pair.CompletionRequested) return false;

                // We know the implementation of the default scheduler and how it will behave.
                // As it's the most common underlying scheduler, we optimize for it.
                bool isDefaultScheduler = m_pair.m_underlyingTaskScheduler == TaskScheduler.Default;

                // If we're targeting the default scheduler and taskWasPreviouslyQueued is true,
                // we know that the default scheduler will only allow it to be inlined
                // if we're on a thread pool thread (but it won't always allow it in that case,
                // since it'll only allow inlining if it can find the task in the local queue).
                // As such, if we're not on a thread pool thread, we know for sure the
                // task won't be inlined, so let's not even try.
                if (isDefaultScheduler && taskWasPreviouslyQueued && !Thread.CurrentThread.IsThreadPoolThread)
                {
                    return false;
                }
                else
                {
                    // If a task is already running on this thread, allow inline execution to proceed.
                    // If there's already a task from this scheduler running on the current thread, we know it's safe
                    // to run this task, in effect temporarily taking that task's count allocation.
                    ProcessingMode currentThreadMode;
                    if (m_pair.m_threadProcessingMapping.TryGetValue(Thread.CurrentThread.ManagedThreadId, out currentThreadMode) &&
                        currentThreadMode == m_processingMode)
                    {
                        // If we're targeting the default scheduler and taskWasPreviouslyQueued is false,
                        // we know the default scheduler will allow it, so we can just execute it here.
                        // Otherwise, delegate to the target scheduler's inlining.
                        return (isDefaultScheduler && !taskWasPreviouslyQueued) ?
                            TryExecuteTask(task) :
                            TryExecuteTaskInlineOnTargetScheduler(task);
                    }
                }

                // We're not in the context of a task already executing on this scheduler.  Bail.
                return false;
            }
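            // Illustrative sketch ("pair" is hypothetical): inlining is only permitted when the current
            // thread is already running a task of the same mode for this pair.  For example, an exclusive
            // task that waits on a nested exclusive task can typically have that nested task inlined
            // rather than waiting behind itself in the exclusive queue:
            //
            //     Task.Factory.StartNew(() =>
            //     {
            //         var nested = Task.Factory.StartNew(() => { /* more exclusive work */ },
            //             CancellationToken.None, TaskCreationOptions.None, pair.ExclusiveScheduler);
            //         nested.Wait();   // may be satisfied by inlining, since this thread is in exclusive mode
            //     }, CancellationToken.None, TaskCreationOptions.None, pair.ExclusiveScheduler);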
            /// <summary>
            /// Implements a reasonable approximation for TryExecuteTaskInline on the underlying scheduler,
            /// which we can't call directly on the underlying scheduler.
            /// </summary>
            /// <param name="task">The task to execute inline if possible.</param>
            /// <returns>true if the task was inlined successfully; otherwise, false.</returns>
            [SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals", MessageId = "ignored")]
            private bool TryExecuteTaskInlineOnTargetScheduler(Task task)
            {
                // We'd like to simply call TryExecuteTaskInline here, but we can't.
                // As there's no built-in API for this, a workaround is to create a new task that,
                // when executed, will simply call TryExecuteTask to run the real task, and then
                // we run our new shim task synchronously on the target scheduler.  If all goes well,
                // our synchronous invocation will succeed in running the shim task on the current thread,
                // which will in turn run the real task on the current thread.  If the scheduler
                // doesn't allow that execution, RunSynchronously will block until the underlying scheduler
                // is able to invoke the task, which might account for an additional but unavoidable delay.
                // Once it's done, we can return whether the task executed by returning the
                // shim task's Result, which is in turn the result of TryExecuteTask.
                var t = new Task<bool>(s_tryExecuteTaskShim, Tuple.Create(this, task));
                try
                {
                    t.RunSynchronously(m_pair.m_underlyingTaskScheduler);
                    return t.Result;
                }
                catch
                {
                    Debug.Assert(t.IsFaulted, "Task should be faulted due to the scheduler faulting it and throwing the exception.");
                    var ignored = t.Exception;
                    throw;
                }
                finally { t.Dispose(); }
            }

            /// <summary>Shim used to invoke this.TryExecuteTask(task).</summary>
            /// <param name="state">A tuple of the ConcurrentExclusiveTaskScheduler and the task to execute.</param>
            /// <returns>true if the task was successfully inlined; otherwise, false.</returns>
            /// <remarks>
            /// This method is separated out not because of performance reasons but so that
            /// the SecuritySafeCritical attribute may be employed.
            /// </remarks>
            private static bool TryExecuteTaskShim(object state)
            {
                var tuple = (Tuple<ConcurrentExclusiveTaskScheduler, Task>)state;
                return tuple.Item1.TryExecuteTask(tuple.Item2);
            }

            /// <summary>Gets for debugging purposes the tasks scheduled to this scheduler.</summary>
            /// <returns>An enumerable of the tasks queued.</returns>
            protected override IEnumerable<Task> GetScheduledTasks() { return m_tasks; }

            /// <summary>Gets the number of tasks queued to this scheduler.</summary>
            [SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
            private int CountForDebugger { get { return m_tasks.Count; } }

            /// <summary>Provides a debug view for ConcurrentExclusiveTaskScheduler.</summary>
            private sealed class DebugView
            {
                /// <summary>The scheduler being debugged.</summary>
                private readonly ConcurrentExclusiveTaskScheduler m_taskScheduler;

                /// <summary>Initializes the debug view.</summary>
                /// <param name="scheduler">The scheduler being debugged.</param>
                public DebugView(ConcurrentExclusiveTaskScheduler scheduler)
                {
                    Contract.Requires(scheduler != null, "Need a scheduler with which to construct the debug view.");
                    m_taskScheduler = scheduler;
                }

                /// <summary>Gets this pair's maximum allowed concurrency level.</summary>
                public int MaximumConcurrencyLevel { get { return m_taskScheduler.m_maxConcurrencyLevel; } }
                /// <summary>Gets the tasks scheduled to this scheduler.</summary>
                public IEnumerable<Task> ScheduledTasks { get { return m_taskScheduler.m_tasks; } }
                /// <summary>Gets the scheduler pair with which this scheduler is associated.</summary>
                public ConcurrentExclusiveSchedulerPair SchedulerPair { get { return m_taskScheduler.m_pair; } }
            }
        }
        /// <summary>Provides a debug view for ConcurrentExclusiveSchedulerPair.</summary>
        private sealed class DebugView
        {
            /// <summary>The pair being debugged.</summary>
            private readonly ConcurrentExclusiveSchedulerPair m_pair;

            /// <summary>Initializes the debug view.</summary>
            /// <param name="pair">The pair being debugged.</param>
            public DebugView(ConcurrentExclusiveSchedulerPair pair)
            {
                Contract.Requires(pair != null, "Need a pair with which to construct the debug view.");
                m_pair = pair;
            }

            /// <summary>Gets a representation of the execution state of the pair.</summary>
            public ProcessingMode Mode { get { return m_pair.ModeForDebugger; } }
            /// <summary>Gets the tasks waiting to run exclusively.</summary>
            public IEnumerable<Task> ScheduledExclusive { get { return m_pair.m_exclusiveTaskScheduler.m_tasks; } }
            /// <summary>Gets the tasks waiting to run concurrently.</summary>
            public IEnumerable<Task> ScheduledConcurrent { get { return m_pair.m_concurrentTaskScheduler.m_tasks; } }
            /// <summary>Gets the number of tasks currently being executed.</summary>
            public int CurrentlyExecutingTaskCount
            {
                get { return (m_pair.m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL) ? 1 : m_pair.m_processingCount; }
            }
            /// <summary>Gets the underlying task scheduler that actually executes the tasks.</summary>
            public TaskScheduler TargetScheduler { get { return m_pair.m_underlyingTaskScheduler; } }
        }

        /// <summary>Gets an enumeration for debugging that represents the current state of the scheduler pair.</summary>
        /// <remarks>This is only for debugging.  It does not take the necessary locks to be useful for runtime usage.</remarks>
        private ProcessingMode ModeForDebugger
        {
            get
            {
                // If our completion task is done, so are we.
                if (m_completionState != null && m_completionState.Task.IsCompleted) return ProcessingMode.Completed;

                // Otherwise, summarize our current state.
                var mode = ProcessingMode.NotCurrentlyProcessing;
                if (m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL) mode |= ProcessingMode.ProcessingExclusiveTask;
                if (m_processingCount >= 1) mode |= ProcessingMode.ProcessingConcurrentTasks;
                if (CompletionRequested) mode |= ProcessingMode.Completing;
                return mode;
            }
        }

        /// <summary>Asserts that a given synchronization object is either held or not held.</summary>
        /// <param name="syncObj">The monitor to check.</param>
        /// <param name="held">Whether we want to assert that it's currently held or not held.</param>
        [Conditional("DEBUG")]
        internal static void ContractAssertMonitorStatus(object syncObj, bool held)
        {
            Contract.Requires(syncObj != null, "The monitor object to check must be provided.");
#if PRENET45
#if DEBUG
            // This check is expensive, which is why it's protected by ShouldCheckMonitorStatus
            // and controlled by an environment variable DEBUGSYNC.
            if (ShouldCheckMonitorStatus)
            {
                bool exceptionThrown;
                try
                {
                    Monitor.Pulse(syncObj); // throws a SynchronizationLockException if the monitor isn't held by this thread
                    exceptionThrown = false;
                }
                catch (SynchronizationLockException) { exceptionThrown = true; }
                Debug.Assert(held == !exceptionThrown, "The locking scheme was not correctly followed.");
            }
#endif
#else
            Debug.Assert(Monitor.IsEntered(syncObj) == held, "The locking scheme was not correctly followed.");
#endif
        }

        /// <summary>Gets the options to use for tasks.</summary>
        /// <param name="isReplacementReplica">If this task is being created to replace another.</param>
        /// <remarks>
        /// These options should be used for all tasks that have the potential to run user code or
        /// that are repeatedly spawned and thus need a modicum of fair treatment.
        /// </remarks>
        /// <returns>The options to use.</returns>
        internal static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica = false)
        {
            TaskCreationOptions options =
#if PRENET45
                TaskCreationOptions.None;
#else
                TaskCreationOptions.DenyChildAttach;
#endif
            if (isReplacementReplica) options |= TaskCreationOptions.PreferFairness;
            return options;
        }

        /// <summary>Provides an enumeration that represents the current state of the scheduler pair.</summary>
        [Flags]
        private enum ProcessingMode : byte
        {
            /// <summary>The scheduler pair is currently dormant, with no work scheduled.</summary>
            NotCurrentlyProcessing = 0x0,
            /// <summary>The scheduler pair has queued processing for exclusive tasks.</summary>
            ProcessingExclusiveTask = 0x1,
            /// <summary>The scheduler pair has queued processing for concurrent tasks.</summary>
            ProcessingConcurrentTasks = 0x2,
            /// <summary>Completion has been requested.</summary>
            Completing = 0x4,
            /// <summary>The scheduler pair is finished processing.</summary>
            Completed = 0x8
        }
    }
}