// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
//
//
// A pair of schedulers that together support concurrent (reader) / exclusive (writer)
// task scheduling. Using just the exclusive scheduler can be used to simulate a serial
// processing queue, and using just the concurrent scheduler with a specified
// MaximumConcurrencyLevel can be used to achieve a MaxDegreeOfParallelism across
// a bunch of tasks, parallel loops, dataflow blocks, etc.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
using System.Security;
using System.Security.Permissions;
namespace System.Threading.Tasks
{
/// <summary>
/// Provides concurrent and exclusive task schedulers that coordinate to execute
/// tasks while ensuring that concurrent tasks may run concurrently and exclusive tasks never do.
/// </summary>
[DebuggerDisplay("Concurrent={ConcurrentTaskCountForDebugger}, Exclusive={ExclusiveTaskCountForDebugger}, Mode={ModeForDebugger}")]
[DebuggerTypeProxy(typeof(ConcurrentExclusiveSchedulerPair.DebugView))]
public class ConcurrentExclusiveSchedulerPair
{
/// <summary>A dictionary mapping thread ID to a processing mode to denote what kinds of tasks are currently being processed on this thread.</summary>
/// <remarks>
/// NOTE(review): the generic type arguments appear to have been stripped from this declaration during extraction.
/// Usage elsewhere in this file indexes by Thread.CurrentThread.ManagedThreadId and stores ProcessingMode values,
/// so this is presumably ConcurrentDictionary&lt;int, ProcessingMode&gt; — confirm against the original source.
/// </remarks>
private readonly ConcurrentDictionary m_threadProcessingMapping = new ConcurrentDictionary();
/// <summary>The scheduler used to queue and execute "concurrent" tasks that may run concurrently with other concurrent tasks.</summary>
private readonly ConcurrentExclusiveTaskScheduler m_concurrentTaskScheduler;
/// <summary>The scheduler used to queue and execute "exclusive" tasks that must run exclusively while no other tasks for this pair are running.</summary>
private readonly ConcurrentExclusiveTaskScheduler m_exclusiveTaskScheduler;
/// <summary>The underlying task scheduler to which all work should be scheduled.</summary>
private readonly TaskScheduler m_underlyingTaskScheduler;
/// <summary>
/// The maximum number of tasks allowed to run concurrently. This only applies to concurrent tasks,
/// since exclusive tasks are inherently limited to 1.
/// </summary>
private readonly int m_maxConcurrencyLevel;
/// <summary>The maximum number of tasks we can process before recycling our runner tasks.</summary>
private readonly int m_maxItemsPerTask;
/// <summary>
/// If positive, it represents the number of concurrently running concurrent tasks.
/// If negative, it means an exclusive task has been scheduled.
/// If 0, nothing has been scheduled.
/// </summary>
private int m_processingCount;
/// <summary>
/// Completion state for a task representing the completion of this pair.
/// Lazily-initialized only if the scheduler pair is shutting down or if the Completion is requested.
/// </summary>
private CompletionState m_completionState;
/// <summary>A constant value used to signal unlimited processing.</summary>
private const int UNLIMITED_PROCESSING = -1;
/// <summary>Constant used for m_processingCount to indicate that an exclusive task is being processed.</summary>
private const int EXCLUSIVE_PROCESSING_SENTINEL = -1;
/// <summary>Default MaxItemsPerTask to use for processing if none is specified.</summary>
private const int DEFAULT_MAXITEMSPERTASK = UNLIMITED_PROCESSING;
/// <summary>Default MaxConcurrencyLevel is the processor count if not otherwise specified.</summary>
private static Int32 DefaultMaxConcurrencyLevel { get { return Environment.ProcessorCount; } }
/// <summary>Gets the sync obj used to protect all state on this instance.</summary>
/// <remarks>Reuses the thread-processing dictionary instance as the monitor, so no dedicated lock object is allocated.</remarks>
private object ValueLock { get { return m_threadProcessingMapping; } }
/// <summary>
/// Initializes the ConcurrentExclusiveSchedulerPair targeting TaskScheduler.Default with
/// the default maximum concurrency level and the default (unlimited) items-per-task limit.
/// </summary>
public ConcurrentExclusiveSchedulerPair() :
this(TaskScheduler.Default, DefaultMaxConcurrencyLevel, DEFAULT_MAXITEMSPERTASK) { }
/// <summary>
/// Initializes the ConcurrentExclusiveSchedulerPair to target the specified scheduler.
/// </summary>
/// <param name="taskScheduler">The target scheduler on which this pair should execute.</param>
public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler) :
this(taskScheduler, DefaultMaxConcurrencyLevel, DEFAULT_MAXITEMSPERTASK) { }
/// <summary>
/// Initializes the ConcurrentExclusiveSchedulerPair to target the specified scheduler with a maximum concurrency level.
/// </summary>
/// <param name="taskScheduler">The target scheduler on which this pair should execute.</param>
/// <param name="maxConcurrencyLevel">The maximum number of tasks to run concurrently.</param>
public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler, int maxConcurrencyLevel) :
this(taskScheduler, maxConcurrencyLevel, DEFAULT_MAXITEMSPERTASK) { }
/// <summary>
/// Initializes the ConcurrentExclusiveSchedulerPair to target the specified scheduler with a maximum
/// concurrency level and a maximum number of scheduled tasks that may be processed as a unit.
/// </summary>
/// <param name="taskScheduler">The target scheduler on which this pair should execute.</param>
/// <param name="maxConcurrencyLevel">The maximum number of tasks to run concurrently, or -1 for no limit.</param>
/// <param name="maxItemsPerTask">The maximum number of tasks to process for each underlying scheduled task used by the pair, or -1 for no limit.</param>
/// <exception cref="ArgumentNullException">The scheduler is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">Either limit is 0 or less than -1.</exception>
public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler, int maxConcurrencyLevel, int maxItemsPerTask)
{
    // Both limits must be positive or the -1 "unlimited" sentinel; 0 is never valid.
    if (taskScheduler == null) throw new ArgumentNullException(nameof(taskScheduler));
    if (maxConcurrencyLevel == 0 || maxConcurrencyLevel < -1) throw new ArgumentOutOfRangeException(nameof(maxConcurrencyLevel));
    if (maxItemsPerTask == 0 || maxItemsPerTask < -1) throw new ArgumentOutOfRangeException(nameof(maxItemsPerTask));
    Contract.EndContractBlock();

    // Capture configuration.
    m_underlyingTaskScheduler = taskScheduler;
    m_maxConcurrencyLevel = maxConcurrencyLevel;
    m_maxItemsPerTask = maxItemsPerTask;

    // Never exceed the underlying scheduler's own maximum degree of parallelism.
    int underlyingLimit = taskScheduler.MaximumConcurrencyLevel;
    if (underlyingLimit > 0 && underlyingLimit < m_maxConcurrencyLevel)
    {
        m_maxConcurrencyLevel = underlyingLimit;
    }

    // Normalize the UNLIMITED_PROCESSING/-1 sentinels to Int32.MaxValue so that
    // later comparisons against these limits need no special casing.
    if (m_maxConcurrencyLevel == UNLIMITED_PROCESSING) m_maxConcurrencyLevel = Int32.MaxValue;
    if (m_maxItemsPerTask == UNLIMITED_PROCESSING) m_maxItemsPerTask = Int32.MaxValue;

    // Both schedulers are instances of the same shim type; "exclusive" is simply
    // the concurrency-1 instance.
    m_exclusiveTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, 1, ProcessingMode.ProcessingExclusiveTask);
    m_concurrentTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, m_maxConcurrencyLevel, ProcessingMode.ProcessingConcurrentTasks);
}
/// <summary>Informs the scheduler pair that it should not accept any more tasks.</summary>
/// <remarks>
/// Calling Complete is optional, and it's only necessary if the Completion task
/// will be relied on for notification of all processing being completed.
/// </remarks>
public void Complete()
{
    lock (ValueLock)
    {
        // Completion is idempotent: only the first request does any work.
        if (CompletionRequested) return;
        RequestCompletion();
        CleanupStateIfCompletingAndQuiesced();
    }
}
/// <summary>Gets a Task that will complete when the scheduler has completed processing.</summary>
public Task Completion
{
// ValueLock not needed, but it's ok if it's held
get { return EnsureCompletionStateInitialized().Task; }
}
/// <summary>Gets the lazily-initialized completion state, creating it on first access.</summary>
private CompletionState EnsureCompletionStateInitialized()
{
// ValueLock not needed, but it's ok if it's held
return LazyInitializer.EnsureInitialized(ref m_completionState, () => new CompletionState());
}
/// <summary>Gets whether completion has been requested.</summary>
private bool CompletionRequested
{
// ValueLock not needed, but it's ok if it's held
// (m_completionState is only non-null once someone has requested completion or asked for Completion,
// so a null check avoids allocating the completion state just to answer "no".)
get { return m_completionState != null && Volatile.Read(ref m_completionState.m_completionRequested); }
}
/// <summary>Sets that completion has been requested. Must be called while holding ValueLock.</summary>
private void RequestCompletion()
{
ContractAssertMonitorStatus(ValueLock, held: true);
EnsureCompletionStateInitialized().m_completionRequested = true;
}
/// <summary>
/// Cleans up state if and only if there's no processing currently happening
/// and no more to be done later.
/// </summary>
/// <remarks>Must be called while holding ValueLock.</remarks>
private void CleanupStateIfCompletingAndQuiesced()
{
ContractAssertMonitorStatus(ValueLock, held: true);
if (ReadyToComplete) CompleteTaskAsync();
}
/// <summary>Gets whether the pair is ready to complete. Must be read while holding ValueLock.</summary>
private bool ReadyToComplete
{
    get
    {
        ContractAssertMonitorStatus(ValueLock, held: true);

        // Completion must have been requested, and nothing may currently be running.
        if (!CompletionRequested) return false;
        if (m_processingCount != 0) return false;

        // Shutdown is then allowed if an exception occurred (we're doomed anyway)...
        var completionState = EnsureCompletionStateInitialized();
        if (completionState.m_exceptions != null && completionState.m_exceptions.Count > 0) return true;

        // ...or if there is no queued work left on either scheduler.
        return m_concurrentTaskScheduler.m_tasks.IsEmpty && m_exclusiveTaskScheduler.m_tasks.IsEmpty;
    }
}
/// <summary>Completes the completion task asynchronously.</summary>
/// <remarks>Must be called while holding ValueLock.</remarks>
private void CompleteTaskAsync()
{
Contract.Requires(ReadyToComplete, "The block must be ready to complete to be here.");
ContractAssertMonitorStatus(ValueLock, held: true);
// Ensure we only try to complete once, then schedule completion
// in order to escape held locks and the caller's context
var cs = EnsureCompletionStateInitialized();
if (!cs.m_completionQueued)
{
cs.m_completionQueued = true;
ThreadPool.QueueUserWorkItem(state =>
{
var localCs = (CompletionState)state; // don't use 'cs', as it'll force a closure
Debug.Assert(!localCs.Task.IsCompleted, "Completion should only happen once.");
// Fault the completion task if any unrecoverable exceptions were recorded;
// otherwise complete it successfully.
var exceptions = localCs.m_exceptions;
bool success = (exceptions != null && exceptions.Count > 0) ?
localCs.TrySetException(exceptions) :
localCs.TrySetResult(default(VoidTaskResult));
Debug.Assert(success, "Expected to complete completion task.");
}, cs);
}
}
/// <summary>Initiates scheduler shutdown due to a worker task faulting.</summary>
/// <param name="faultedTask">The faulted worker task that's initiating the shutdown.</param>
/// <remarks>Must be called while holding ValueLock.</remarks>
private void FaultWithTask(Task faultedTask)
{
    Contract.Requires(faultedTask != null && faultedTask.IsFaulted && faultedTask.Exception.InnerExceptions.Count > 0,
        "Needs a task in the faulted state and thus with exceptions.");
    ContractAssertMonitorStatus(ValueLock, held: true);

    // Record the faulted task's exceptions on the completion state so the pair's
    // Completion task surfaces them once shutdown finishes.
    // (Restored the stripped generic type argument: was the non-compiling "new List()".)
    var cs = EnsureCompletionStateInitialized();
    if (cs.m_exceptions == null) cs.m_exceptions = new List<Exception>();
    cs.m_exceptions.AddRange(faultedTask.Exception.InnerExceptions);

    // Now that we're doomed, request completion so no further work is accepted.
    RequestCompletion();
}
/// <summary>
/// Gets a TaskScheduler that can be used to schedule tasks to this pair
/// that may run concurrently with other tasks on this pair.
/// </summary>
public TaskScheduler ConcurrentScheduler { get { return m_concurrentTaskScheduler; } }
/// <summary>
/// Gets a TaskScheduler that can be used to schedule tasks to this pair
/// that must run exclusively with regards to other tasks on this pair.
/// </summary>
public TaskScheduler ExclusiveScheduler { get { return m_exclusiveTaskScheduler; } }
/// <summary>Gets the number of tasks waiting to run concurrently.</summary>
/// <remarks>This does not take the necessary lock, as it's only called from under the debugger.</remarks>
[SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
private int ConcurrentTaskCountForDebugger { get { return m_concurrentTaskScheduler.m_tasks.Count; } }
/// <summary>Gets the number of tasks waiting to run exclusively.</summary>
/// <remarks>This does not take the necessary lock, as it's only called from under the debugger.</remarks>
[SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
private int ExclusiveTaskCountForDebugger { get { return m_exclusiveTaskScheduler.m_tasks.Count; } }
/// <summary>Notifies the pair that new work has arrived to be processed.</summary>
/// <param name="fairly">Whether tasks should be scheduled fairly with regards to other tasks.</param>
/// <remarks>Must only be called while holding the lock.</remarks>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
[SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals")]
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
private void ProcessAsyncIfNecessary(bool fairly = false)
{
ContractAssertMonitorStatus(ValueLock, held: true);
// If the current processing count is >= 0, we can potentially launch further processing.
// (A negative count is the exclusive-processing sentinel; the exclusive runner will
// re-invoke this method when it finishes.)
if (m_processingCount >= 0)
{
// We snap whether there are any exclusive tasks or concurrent tasks waiting.
// (We grab the concurrent count below only once we know we need it.)
// With processing happening concurrent to this operation, this data may
// immediately be out of date, but it can only go from non-empty
// to empty and not the other way around. As such, this is safe,
// as worst case is we'll schedule an extra task when we didn't
// otherwise need to, and we'll just eat its overhead.
bool exclusiveTasksAreWaiting = !m_exclusiveTaskScheduler.m_tasks.IsEmpty;
// If there's no processing currently happening but there are waiting exclusive tasks,
// let's start processing those exclusive tasks.
Task processingTask = null;
if (m_processingCount == 0 && exclusiveTasksAreWaiting)
{
// Launch exclusive task processing
m_processingCount = EXCLUSIVE_PROCESSING_SENTINEL; // -1
try
{
processingTask = new Task(thisPair => ((ConcurrentExclusiveSchedulerPair)thisPair).ProcessExclusiveTasks(), this,
default(CancellationToken), GetCreationOptionsForTask(fairly));
processingTask.Start(m_underlyingTaskScheduler);
// When we call Start, if the underlying scheduler throws in QueueTask, TPL will fault the task and rethrow
// the exception. To deal with that, we need a reference to the task object, so that we can observe its exception.
// Hence, we separate creation and starting, so that we can store a reference to the task before we attempt QueueTask.
}
catch
{
// Undo the sentinel and record the scheduling failure so the pair shuts down.
m_processingCount = 0;
FaultWithTask(processingTask);
}
}
// If there are no waiting exclusive tasks, there are concurrent tasks, and we haven't reached our maximum
// concurrency level for processing, let's start processing more concurrent tasks.
else
{
int concurrentTasksWaitingCount = m_concurrentTaskScheduler.m_tasks.Count;
if (concurrentTasksWaitingCount > 0 && !exclusiveTasksAreWaiting && m_processingCount < m_maxConcurrencyLevel)
{
// Launch concurrent task processing, up to the allowed limit
for (int i = 0; i < concurrentTasksWaitingCount && m_processingCount < m_maxConcurrencyLevel; ++i)
{
++m_processingCount;
try
{
processingTask = new Task(thisPair => ((ConcurrentExclusiveSchedulerPair)thisPair).ProcessConcurrentTasks(), this,
default(CancellationToken), GetCreationOptionsForTask(fairly));
processingTask.Start(m_underlyingTaskScheduler); // See above logic for why we use new + Start rather than StartNew
}
catch
{
// Undo the count increment and record the scheduling failure so the pair shuts down.
--m_processingCount;
FaultWithTask(processingTask);
}
}
}
}
// Check to see if all tasks have completed and if completion has been requested.
CleanupStateIfCompletingAndQuiesced();
}
else Debug.Assert(m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL, "The processing count must be the sentinel if it's not >= 0.");
}
/// <summary>
/// Processes exclusive tasks serially until either there are no more to process
/// or we've reached our user-specified maximum limit.
/// </summary>
/// <remarks>Runs as the body of an exclusive "runner" task queued to the underlying scheduler; must not hold ValueLock on entry.</remarks>
private void ProcessExclusiveTasks()
{
Contract.Requires(m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL, "Processing exclusive tasks requires being in exclusive mode.");
Contract.Requires(!m_exclusiveTaskScheduler.m_tasks.IsEmpty, "Processing exclusive tasks requires tasks to be processed.");
ContractAssertMonitorStatus(ValueLock, held: false);
try
{
// Note that we're processing exclusive tasks on the current thread
Debug.Assert(!m_threadProcessingMapping.ContainsKey(Thread.CurrentThread.ManagedThreadId),
"This thread should not yet be involved in this pair's processing.");
m_threadProcessingMapping[Thread.CurrentThread.ManagedThreadId] = ProcessingMode.ProcessingExclusiveTask;
// Process up to the maximum number of items per task allowed
for (int i = 0; i < m_maxItemsPerTask; i++)
{
// Get the next available exclusive task. If we can't find one, bail.
Task exclusiveTask;
if (!m_exclusiveTaskScheduler.m_tasks.TryDequeue(out exclusiveTask)) break;
// Execute the task. If the scheduler was previously faulted,
// this task could have been faulted when it was queued; ignore such tasks.
if (!exclusiveTask.IsFaulted) m_exclusiveTaskScheduler.ExecuteTask(exclusiveTask);
}
}
finally
{
// We're no longer processing exclusive tasks on the current thread
ProcessingMode currentMode;
m_threadProcessingMapping.TryRemove(Thread.CurrentThread.ManagedThreadId, out currentMode);
Debug.Assert(currentMode == ProcessingMode.ProcessingExclusiveTask,
"Somehow we ended up escaping exclusive mode.");
lock (ValueLock)
{
// When this task was launched, we tracked it by setting m_processingCount to the
// EXCLUSIVE_PROCESSING_SENTINEL; now reset it to 0. Then check to see whether there's
// more processing to be done. There might be more tasks available, for example, if
// tasks arrived after we exited the loop, or if we exited the loop while tasks were
// still available but we hit our maxItemsPerTask limit.
Debug.Assert(m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL, "The processing mode should not have deviated from exclusive.");
m_processingCount = 0;
ProcessAsyncIfNecessary(true);
}
}
}
/// <summary>
/// Processes concurrent tasks serially until either there are no more to process,
/// we've reached our user-specified maximum limit, or exclusive tasks have arrived.
/// </summary>
/// <remarks>
/// Runs as the body of a concurrent "runner" task queued to the underlying scheduler;
/// must not hold ValueLock on entry. (Only change from the original: fixed the
/// misspelled "procesing" in the final assertion message.)
/// </remarks>
private void ProcessConcurrentTasks()
{
    Contract.Requires(m_processingCount > 0, "Processing concurrent tasks requires us to be in concurrent mode.");
    ContractAssertMonitorStatus(ValueLock, held: false);
    try
    {
        // Note that we're processing concurrent tasks on the current thread
        Debug.Assert(!m_threadProcessingMapping.ContainsKey(Thread.CurrentThread.ManagedThreadId),
            "This thread should not yet be involved in this pair's processing.");
        m_threadProcessingMapping[Thread.CurrentThread.ManagedThreadId] = ProcessingMode.ProcessingConcurrentTasks;
        // Process up to the maximum number of items per task allowed
        for (int i = 0; i < m_maxItemsPerTask; i++)
        {
            // Get the next available concurrent task. If we can't find one, bail.
            Task concurrentTask;
            if (!m_concurrentTaskScheduler.m_tasks.TryDequeue(out concurrentTask)) break;
            // Execute the task. If the scheduler was previously faulted,
            // this task could have been faulted when it was queued; ignore such tasks.
            if (!concurrentTask.IsFaulted) m_concurrentTaskScheduler.ExecuteTask(concurrentTask);
            // Now check to see if exclusive tasks have arrived; if any have, they take priority
            // so we'll bail out here. Note that we could have checked this condition
            // in the for loop's condition, but that could lead to extra overhead
            // in the case where a concurrent task arrives, this task is launched, and then
            // before entering the loop an exclusive task arrives. If we didn't execute at
            // least one task, we would have spent all of the overhead to launch a
            // task but with none of the benefit. There's of course also an inherent
            // race condition here with regards to exclusive tasks arriving, and we're ok with
            // executing one more concurrent task than we should before giving priority to exclusive tasks.
            if (!m_exclusiveTaskScheduler.m_tasks.IsEmpty) break;
        }
    }
    finally
    {
        // We're no longer processing concurrent tasks on the current thread
        ProcessingMode currentMode;
        m_threadProcessingMapping.TryRemove(Thread.CurrentThread.ManagedThreadId, out currentMode);
        Debug.Assert(currentMode == ProcessingMode.ProcessingConcurrentTasks,
            "Somehow we ended up escaping concurrent mode.");
        lock (ValueLock)
        {
            // When this task was launched, we tracked it with a positive processing count;
            // decrement that count. Then check to see whether there's more processing to be done.
            // There might be more concurrent tasks available, for example, if concurrent tasks arrived
            // after we exited the loop, or if we exited the loop while concurrent tasks were still
            // available but we hit our maxItemsPerTask limit.
            Debug.Assert(m_processingCount > 0, "The processing mode should not have deviated from concurrent.");
            if (m_processingCount > 0) --m_processingCount;
            ProcessAsyncIfNecessary(true);
        }
    }
}
#if PRENET45
/// <summary>
/// Type used with TaskCompletionSource(Of TResult) as the TResult
/// to ensure that the resulting task can't be upcast to something
/// that in the future could lead to compat problems.
/// </summary>
[SuppressMessage("Microsoft.Performance", "CA1812:AvoidUninstantiatedInternalClasses")]
[DebuggerNonUserCode]
private struct VoidTaskResult { }
#endif
/// <summary>
/// Holder for lazily-initialized state about the completion of a scheduler pair.
/// Completion is only triggered either by rare exceptional conditions or by
/// the user calling Complete, and as such we only lazily initialize this
/// state in one of those conditions or if the user explicitly asks for
/// the Completion.
/// </summary>
/// <remarks>
/// Generic type arguments restored here: the base class must be
/// TaskCompletionSource&lt;VoidTaskResult&gt; (this file completes it via
/// TrySetResult(default(VoidTaskResult))) and the exception field must be
/// List&lt;Exception&gt; (it receives AddRange of a faulted task's InnerExceptions
/// and is passed to TrySetException). Both had been stripped to bare names.
/// </remarks>
[SuppressMessage("Microsoft.Performance", "CA1812:AvoidUninstantiatedInternalClasses")]
private sealed class CompletionState : TaskCompletionSource<VoidTaskResult>
{
    /// <summary>Whether the scheduler has had completion requested.</summary>
    /// <remarks>This variable is not volatile, so to guarantee safe reads, Volatile.Read is used in TryExecuteTaskInline.</remarks>
    internal bool m_completionRequested;
    /// <summary>Whether completion processing has been queued.</summary>
    internal bool m_completionQueued;
    /// <summary>Unrecoverable exceptions incurred while processing.</summary>
    internal List<Exception> m_exceptions;
}
///
/// A scheduler shim used to queue tasks to the pair and execute those tasks on request of the pair.
///
[DebuggerDisplay("Count={CountForDebugger}, MaxConcurrencyLevel={m_maxConcurrencyLevel}, Id={Id}")]
[DebuggerTypeProxy(typeof(ConcurrentExclusiveTaskScheduler.DebugView))]
private sealed class ConcurrentExclusiveTaskScheduler : TaskScheduler
{
/// Cached delegate for invoking TryExecuteTaskShim.
private static readonly Func