diff options
author | Jan Kotas <jkotas@microsoft.com> | 2018-12-22 10:01:00 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-12-22 10:01:00 -0800 |
commit | dc3f080b89b7d3c85afdb8b6d2b9086363c48c14 (patch) | |
tree | 21ff4ad69efdadb9ee1d323481b5e36b9a1dc7d7 /src/System.Private.CoreLib/shared | |
parent | 0f8e9ee7a47423852b6112006e90504bc60e28e5 (diff) | |
download | coreclr-dc3f080b89b7d3c85afdb8b6d2b9086363c48c14.tar.gz coreclr-dc3f080b89b7d3c85afdb8b6d2b9086363c48c14.tar.bz2 coreclr-dc3f080b89b7d3c85afdb8b6d2b9086363c48c14.zip |
Move some Task related files to shared CoreLib partition (#21650)
Diffstat (limited to 'src/System.Private.CoreLib/shared')
8 files changed, 4195 insertions, 0 deletions
diff --git a/src/System.Private.CoreLib/shared/System.Private.CoreLib.Shared.projitems b/src/System.Private.CoreLib/shared/System.Private.CoreLib.Shared.projitems index 397853e3da..96c5b52f68 100644 --- a/src/System.Private.CoreLib/shared/System.Private.CoreLib.Shared.projitems +++ b/src/System.Private.CoreLib/shared/System.Private.CoreLib.Shared.projitems @@ -676,6 +676,8 @@ <Compile Include="$(MSBuildThisFileDirectory)System\Threading\AsyncLocal.cs" /> <Compile Include="$(MSBuildThisFileDirectory)System\Threading\AutoResetEvent.cs" /> <Compile Include="$(MSBuildThisFileDirectory)System\Threading\CancellationToken.cs" /> + <Compile Include="$(MSBuildThisFileDirectory)System\Threading\CancellationTokenRegistration.cs" /> + <Compile Include="$(MSBuildThisFileDirectory)System\Threading\CancellationTokenSource.cs" /> <Compile Include="$(MSBuildThisFileDirectory)System\Threading\DeferredDisposableLifetime.cs" /> <Compile Include="$(MSBuildThisFileDirectory)System\Threading\EventResetMode.cs" /> <Compile Include="$(MSBuildThisFileDirectory)System\Threading\EventWaitHandle.cs" /> @@ -698,11 +700,16 @@ <Compile Include="$(MSBuildThisFileDirectory)System\Threading\SynchronizationLockException.cs" /> <Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadLocal.cs" /> <Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\ConcurrentExclusiveSchedulerPair.cs" /> + <Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\Future.cs" /> + <Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\ProducerConsumerQueues.cs" /> <Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\TaskCanceledException.cs" /> <Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\TaskCompletionSource.cs" /> + <Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\TaskExceptionHolder.cs" /> <Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\TaskExtensions.cs" /> <Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\TaskToApm.cs" /> + <Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\TaskScheduler.cs" /> <Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\TaskSchedulerException.cs" /> + <Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\ThreadPoolTaskScheduler.cs" /> <Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\ValueTask.cs" /> <Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\Sources\ManualResetValueTaskSourceCore.cs" /> <Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\Sources\IValueTaskSource.cs" /> diff --git a/src/System.Private.CoreLib/shared/System/Threading/CancellationTokenRegistration.cs b/src/System.Private.CoreLib/shared/System/Threading/CancellationTokenRegistration.cs new file mode 100644 index 0000000000..bab2ce9c03 --- /dev/null +++ b/src/System.Private.CoreLib/shared/System/Threading/CancellationTokenRegistration.cs @@ -0,0 +1,170 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading.Tasks; + +namespace System.Threading +{ + /// <summary> + /// Represents a callback delegate that has been registered with a <see cref="T:System.Threading.CancellationToken">CancellationToken</see>. + /// </summary> + /// <remarks> + /// To unregister a callback, dispose the corresponding Registration instance. + /// </remarks> + public readonly struct CancellationTokenRegistration : IEquatable<CancellationTokenRegistration>, IDisposable, IAsyncDisposable + { + private readonly long _id; + private readonly CancellationTokenSource.CallbackNode _node; + + internal CancellationTokenRegistration(long id, CancellationTokenSource.CallbackNode node) + { + _id = id; + _node = node; + } + + /// <summary> + /// Disposes of the registration and unregisters the target callback from the associated + /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see>. + /// If the target callback is currently executing, this method will wait until it completes, except + /// in the degenerate cases where a callback method unregisters itself. + /// </summary> + public void Dispose() + { + CancellationTokenSource.CallbackNode node = _node; + if (node != null && !node.Partition.Unregister(_id, node)) + { + WaitForCallbackIfNecessary(); + } + } + + /// <summary> + /// Disposes of the registration and unregisters the target callback from the associated + /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see>. + /// The returned <see cref="ValueTask"/> will complete once the associated callback + /// is unregistered without having executed or once it's finished executing, except + /// in the degenerate case where the callback itself is unregistering itself. + /// </summary> + public ValueTask DisposeAsync() + { + CancellationTokenSource.CallbackNode node = _node; + return node != null && !node.Partition.Unregister(_id, node) ? + WaitForCallbackIfNecessaryAsync() : + default; + } + + /// <summary> + /// Gets the <see cref="CancellationToken"/> with which this registration is associated. If the + /// registration isn't associated with a token (such as after the registration has been disposed), + /// this will return a default token. + /// </summary> + public CancellationToken Token + { + get + { + CancellationTokenSource.CallbackNode node = _node; + return node != null ? + new CancellationToken(node.Partition.Source) : // avoid CTS.Token, which throws after disposal + default; + } + } + + /// <summary> + /// Disposes of the registration and unregisters the target callback from the associated + /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see>. + /// </summary> + public bool Unregister() + { + CancellationTokenSource.CallbackNode node = _node; + return node != null && node.Partition.Unregister(_id, node); + } + + private void WaitForCallbackIfNecessary() + { + // We're a valid registration but we were unable to unregister, which means the callback wasn't in the list, + // which means either it already executed or it's currently executing. We guarantee that we will not return + // if the callback is being executed (assuming we are not currently called by the callback itself) + // We achieve this by the following rules: + // 1. If we are called in the context of an executing callback, no need to wait (determined by tracking callback-executor threadID) + // - if the currently executing callback is this CTR, then waiting would deadlock. (We choose to return rather than deadlock) + // - if not, then this CTR cannot be the one executing, hence no need to wait + // 2. If unregistration failed, and we are on a different thread, then the callback may be running under control of cts.Cancel() + // => poll until cts.ExecutingCallback is not the one we are trying to unregister. + CancellationTokenSource source = _node.Partition.Source; + if (source.IsCancellationRequested && // Running callbacks has commenced. + !source.IsCancellationCompleted && // Running callbacks hasn't finished. + source.ThreadIDExecutingCallbacks != Environment.CurrentManagedThreadId) // The executing thread ID is not this thread's ID. + { + // Callback execution is in progress, the executing thread is different from this thread and has taken the callback for execution + // so observe and wait until this target callback is no longer the executing callback. + source.WaitForCallbackToComplete(_id); + } + } + + private ValueTask WaitForCallbackIfNecessaryAsync() + { + // Same as WaitForCallbackIfNecessary, except returning a task that'll be completed when callbacks complete. + + CancellationTokenSource source = _node.Partition.Source; + if (source.IsCancellationRequested && // Running callbacks has commenced. + !source.IsCancellationCompleted && // Running callbacks hasn't finished. + source.ThreadIDExecutingCallbacks != Environment.CurrentManagedThreadId) // The executing thread ID is not this thread's ID. + { + // Callback execution is in progress, the executing thread is different from this thread and has taken the callback for execution + // so get a task that'll complete when this target callback is no longer the executing callback. + return source.WaitForCallbackToCompleteAsync(_id); + } + + // Callback is either already completed, won't execute, or the callback itself is calling this. + return default; + } + + /// <summary> + /// Determines whether two <see + /// cref="T:System.Threading.CancellationTokenRegistration">CancellationTokenRegistration</see> + /// instances are equal. + /// </summary> + /// <param name="left">The first instance.</param> + /// <param name="right">The second instance.</param> + /// <returns>True if the instances are equal; otherwise, false.</returns> + public static bool operator ==(CancellationTokenRegistration left, CancellationTokenRegistration right) => left.Equals(right); + + /// <summary> + /// Determines whether two <see cref="T:System.Threading.CancellationTokenRegistration">CancellationTokenRegistration</see> instances are not equal. + /// </summary> + /// <param name="left">The first instance.</param> + /// <param name="right">The second instance.</param> + /// <returns>True if the instances are not equal; otherwise, false.</returns> + public static bool operator !=(CancellationTokenRegistration left, CancellationTokenRegistration right) => !left.Equals(right); + + /// <summary> + /// Determines whether the current <see cref="T:System.Threading.CancellationTokenRegistration">CancellationTokenRegistration</see> instance is equal to the + /// specified <see cref="T:System.Object"/>. + /// </summary> + /// <param name="obj">The other object to which to compare this instance.</param> + /// <returns>True, if both this and <paramref name="obj"/> are equal. False, otherwise. + /// Two <see cref="T:System.Threading.CancellationTokenRegistration">CancellationTokenRegistration</see> instances are equal if + /// they both refer to the output of a single call to the same Register method of a + /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see>. + /// </returns> + public override bool Equals(object obj) => obj is CancellationTokenRegistration && Equals((CancellationTokenRegistration)obj); + + /// <summary> + /// Determines whether the current <see cref="T:System.Threading.CancellationToken">CancellationToken</see> instance is equal to the + /// specified <see cref="T:System.Object"/>. + /// </summary> + /// <param name="other">The other <see cref="T:System.Threading.CancellationTokenRegistration">CancellationTokenRegistration</see> to which to compare this instance.</param> + /// <returns>True, if both this and <paramref name="other"/> are equal. False, otherwise. + /// Two <see cref="T:System.Threading.CancellationTokenRegistration">CancellationTokenRegistration</see> instances are equal if + /// they both refer to the output of a single call to the same Register method of a + /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see>. + /// </returns> + public bool Equals(CancellationTokenRegistration other) => _node == other._node && _id == other._id; + + /// <summary> + /// Serves as a hash function for a <see cref="T:System.Threading.CancellationTokenRegistration">CancellationTokenRegistration.</see>. + /// </summary> + /// <returns>A hash code for the current <see cref="T:System.Threading.CancellationTokenRegistration">CancellationTokenRegistration</see> instance.</returns> + public override int GetHashCode() => _node != null ? _node.GetHashCode() ^ _id.GetHashCode() : _id.GetHashCode(); + } +} diff --git a/src/System.Private.CoreLib/shared/System/Threading/CancellationTokenSource.cs b/src/System.Private.CoreLib/shared/System/Threading/CancellationTokenSource.cs new file mode 100644 index 0000000000..937bbaf456 --- /dev/null +++ b/src/System.Private.CoreLib/shared/System/Threading/CancellationTokenSource.cs @@ -0,0 +1,1021 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading.Tasks; + +namespace System.Threading +{ + /// <summary>Signals to a <see cref="CancellationToken"/> that it should be canceled.</summary> + /// <remarks> + /// <para> + /// <see cref="CancellationTokenSource"/> is used to instantiate a <see cref="CancellationToken"/> (via + /// the source's <see cref="Token">Token</see> property) that can be handed to operations that wish to be + /// notified of cancellation or that can be used to register asynchronous operations for cancellation. That + /// token may have cancellation requested by calling to the source's <see cref="Cancel()"/> method. + /// </para> + /// <para> + /// All members of this class, except <see cref="Dispose()"/>, are thread-safe and may be used + /// concurrently from multiple threads. + /// </para> + /// </remarks> + public class CancellationTokenSource : IDisposable + { + /// <summary>A <see cref="CancellationTokenSource"/> that's already canceled.</summary> + internal static readonly CancellationTokenSource s_canceledSource = new CancellationTokenSource() { _state = NotifyingCompleteState }; + /// <summary>A <see cref="CancellationTokenSource"/> that's never canceled. This isn't enforced programmatically, only by usage. Do not cancel!</summary> + internal static readonly CancellationTokenSource s_neverCanceledSource = new CancellationTokenSource(); + + /// <summary>Delegate used with <see cref="Timer"/> to trigger cancellation of a <see cref="CancellationTokenSource"/>.</summary> + private static readonly TimerCallback s_timerCallback = obj => + ((CancellationTokenSource)obj).NotifyCancellation(throwOnFirstException: false); // skip ThrowIfDisposed() check in Cancel() + + /// <summary>The number of callback partitions to use in a <see cref="CancellationTokenSource"/>. Must be a power of 2.</summary> + private static readonly int s_numPartitions = GetPartitionCount(); + /// <summary><see cref="s_numPartitions"/> - 1, used to quickly mod into <see cref="_callbackPartitions"/>.</summary> + private static readonly int s_numPartitionsMask = s_numPartitions - 1; + + /// <summary>The current state of the CancellationTokenSource.</summary> + private volatile int _state; + /// <summary>The ID of the thread currently executing the main body of CTS.Cancel()</summary> + /// <remarks> + /// This helps us to know if a call to ctr.Dispose() is running 'within' a cancellation callback. + /// This is updated as we move between the main thread calling cts.Cancel() and any syncContexts + /// that are used to actually run the callbacks. + /// </remarks> + private volatile int _threadIDExecutingCallbacks = -1; + /// <summary>Tracks the running callback to assist ctr.Dispose() to wait for the target callback to complete.</summary> + private long _executingCallbackId; + /// <summary>Partitions of callbacks. Split into multiple partitions to help with scalability of registering/unregistering; each is protected by its own lock.</summary> + private volatile CallbackPartition[] _callbackPartitions; + /// <summary>TimerQueueTimer used by CancelAfter and Timer-related ctors. Used instead of Timer to avoid extra allocations and because the rooted behavior is desired.</summary> + private volatile TimerQueueTimer _timer; + /// <summary><see cref="System.Threading.WaitHandle"/> lazily initialized and returned from <see cref="WaitHandle"/>.</summary> + private volatile ManualResetEvent _kernelEvent; + /// <summary>Whether this <see cref="CancellationTokenSource"/> has been disposed.</summary> + private bool _disposed; + + // legal values for _state + private const int NotCanceledState = 1; + private const int NotifyingState = 2; + private const int NotifyingCompleteState = 3; + + /// <summary>Gets whether cancellation has been requested for this <see cref="CancellationTokenSource" />.</summary> + /// <value>Whether cancellation has been requested for this <see cref="CancellationTokenSource" />.</value> + /// <remarks> + /// <para> + /// This property indicates whether cancellation has been requested for this token source, such as + /// due to a call to its <see cref="Cancel()"/> method. + /// </para> + /// <para> + /// If this property returns true, it only guarantees that cancellation has been requested. It does not + /// guarantee that every handler registered with the corresponding token has finished executing, nor + /// that cancellation requests have finished propagating to all registered handlers. Additional + /// synchronization may be required, particularly in situations where related objects are being + /// canceled concurrently. + /// </para> + /// </remarks> + public bool IsCancellationRequested => _state >= NotifyingState; + + /// <summary>A simple helper to determine whether cancellation has finished.</summary> + internal bool IsCancellationCompleted => _state == NotifyingCompleteState; + + /// <summary>A simple helper to determine whether disposal has occurred.</summary> + internal bool IsDisposed => _disposed; + + /// <summary>The ID of the thread that is running callbacks.</summary> + internal int ThreadIDExecutingCallbacks + { + get => _threadIDExecutingCallbacks; + set => _threadIDExecutingCallbacks = value; + } + + /// <summary>Gets the <see cref="CancellationToken"/> associated with this <see cref="CancellationTokenSource"/>.</summary> + /// <value>The <see cref="CancellationToken"/> associated with this <see cref="CancellationTokenSource"/>.</value> + /// <exception cref="ObjectDisposedException">The token source has been disposed.</exception> + public CancellationToken Token + { + get + { + ThrowIfDisposed(); + return new CancellationToken(this); + } + } + + internal WaitHandle WaitHandle + { + get + { + ThrowIfDisposed(); + + // Return the handle if it was already allocated. + if (_kernelEvent != null) + { + return _kernelEvent; + } + + // Lazily-initialize the handle. + var mre = new ManualResetEvent(false); + if (Interlocked.CompareExchange(ref _kernelEvent, mre, null) != null) + { + mre.Dispose(); + } + + // There is a race condition between checking IsCancellationRequested and setting the event. + // However, at this point, the kernel object definitely exists and the cases are: + // 1. if IsCancellationRequested = true, then we will call Set() + // 2. if IsCancellationRequested = false, then NotifyCancellation will see that the event exists, and will call Set(). + if (IsCancellationRequested) + { + _kernelEvent.Set(); + } + + return _kernelEvent; + } + } + + + /// <summary>Gets the ID of the currently executing callback.</summary> + internal long ExecutingCallback => Volatile.Read(ref _executingCallbackId); + + /// <summary>Initializes the <see cref="CancellationTokenSource"/>.</summary> + public CancellationTokenSource() => _state = NotCanceledState; + + /// <summary> + /// Constructs a <see cref="CancellationTokenSource"/> that will be canceled after a specified time span. + /// </summary> + /// <param name="delay">The time span to wait before canceling this <see cref="CancellationTokenSource"/></param> + /// <exception cref="ArgumentOutOfRangeException"> + /// The exception that is thrown when <paramref name="delay"/> is less than -1 or greater than int.MaxValue. + /// </exception> + /// <remarks> + /// <para> + /// The countdown for the delay starts during the call to the constructor. When the delay expires, + /// the constructed <see cref="CancellationTokenSource"/> is canceled, if it has + /// not been canceled already. + /// </para> + /// <para> + /// Subsequent calls to CancelAfter will reset the delay for the constructed + /// <see cref="CancellationTokenSource"/>, if it has not been + /// canceled already. + /// </para> + /// </remarks> + public CancellationTokenSource(TimeSpan delay) + { + long totalMilliseconds = (long)delay.TotalMilliseconds; + if (totalMilliseconds < -1 || totalMilliseconds > int.MaxValue) + { + throw new ArgumentOutOfRangeException(nameof(delay)); + } + + InitializeWithTimer((int)totalMilliseconds); + } + + /// <summary> + /// Constructs a <see cref="CancellationTokenSource"/> that will be canceled after a specified time span. + /// </summary> + /// <param name="millisecondsDelay">The time span to wait before canceling this <see cref="CancellationTokenSource"/></param> + /// <exception cref="ArgumentOutOfRangeException"> + /// The exception that is thrown when <paramref name="millisecondsDelay"/> is less than -1. + /// </exception> + /// <remarks> + /// <para> + /// The countdown for the millisecondsDelay starts during the call to the constructor. When the millisecondsDelay expires, + /// the constructed <see cref="CancellationTokenSource"/> is canceled (if it has + /// not been canceled already). + /// </para> + /// <para> + /// Subsequent calls to CancelAfter will reset the millisecondsDelay for the constructed + /// <see cref="CancellationTokenSource"/>, if it has not been + /// canceled already. + /// </para> + /// </remarks> + public CancellationTokenSource(int millisecondsDelay) + { + if (millisecondsDelay < -1) + { + throw new ArgumentOutOfRangeException(nameof(millisecondsDelay)); + } + + InitializeWithTimer(millisecondsDelay); + } + + /// <summary>Common initialization logic when constructing a CTS with a delay parameter</summary> + private void InitializeWithTimer(int millisecondsDelay) + { + _state = NotCanceledState; + _timer = new TimerQueueTimer(s_timerCallback, this, (uint)millisecondsDelay, Timeout.UnsignedInfinite, flowExecutionContext: false); + + // The timer roots this CTS instance while it's scheduled. That is by design, so + // that code like: + // CancellationToken ct = new CancellationTokenSource(timeout).Token; + // will successfully cancel the token after the timeout. + } + + /// <summary>Communicates a request for cancellation.</summary> + /// <remarks> + /// <para> + /// The associated <see cref="CancellationToken" /> will be notified of the cancellation + /// and will transition to a state where <see cref="CancellationToken.IsCancellationRequested"/> returns true. + /// Any callbacks or cancelable operations registered with the <see cref="CancellationToken"/> will be executed. + /// </para> + /// <para> + /// Cancelable operations and callbacks registered with the token should not throw exceptions. + /// However, this overload of Cancel will aggregate any exceptions thrown into a <see cref="AggregateException"/>, + /// such that one callback throwing an exception will not prevent other registered callbacks from being executed. + /// </para> + /// <para> + /// The <see cref="ExecutionContext"/> that was captured when each callback was registered + /// will be reestablished when the callback is invoked. + /// </para> + /// </remarks> + /// <exception cref="AggregateException">An aggregate exception containing all the exceptions thrown + /// by the registered callbacks on the associated <see cref="CancellationToken"/>.</exception> + /// <exception cref="ObjectDisposedException">This <see cref="CancellationTokenSource"/> has been disposed.</exception> + public void Cancel() => Cancel(false); + + /// <summary>Communicates a request for cancellation.</summary> + /// <remarks> + /// <para> + /// The associated <see cref="CancellationToken" /> will be notified of the cancellation and will transition to a state where + /// <see cref="CancellationToken.IsCancellationRequested"/> returns true. Any callbacks or cancelable operationsregistered + /// with the <see cref="CancellationToken"/> will be executed. + /// </para> + /// <para> + /// Cancelable operations and callbacks registered with the token should not throw exceptions. + /// If <paramref name="throwOnFirstException"/> is true, an exception will immediately propagate out of the + /// call to Cancel, preventing the remaining callbacks and cancelable operations from being processed. + /// If <paramref name="throwOnFirstException"/> is false, this overload will aggregate any + /// exceptions thrown into a <see cref="AggregateException"/>, + /// such that one callback throwing an exception will not prevent other registered callbacks from being executed. + /// </para> + /// <para> + /// The <see cref="ExecutionContext"/> that was captured when each callback was registered + /// will be reestablished when the callback is invoked. + /// </para> + /// </remarks> + /// <param name="throwOnFirstException">Specifies whether exceptions should immediately propagate.</param> + /// <exception cref="AggregateException">An aggregate exception containing all the exceptions thrown + /// by the registered callbacks on the associated <see cref="CancellationToken"/>.</exception> + /// <exception cref="ObjectDisposedException">This <see cref="CancellationTokenSource"/> has been disposed.</exception> + public void Cancel(bool throwOnFirstException) + { + ThrowIfDisposed(); + NotifyCancellation(throwOnFirstException); + } + + /// <summary>Schedules a Cancel operation on this <see cref="CancellationTokenSource"/>.</summary> + /// <param name="delay">The time span to wait before canceling this <see cref="CancellationTokenSource"/>. + /// </param> + /// <exception cref="ObjectDisposedException">The exception thrown when this <see + /// cref="CancellationTokenSource"/> has been disposed. + /// </exception> + /// <exception cref="ArgumentOutOfRangeException"> + /// The exception thrown when <paramref name="delay"/> is less than -1 or + /// greater than int.MaxValue. + /// </exception> + /// <remarks> + /// <para> + /// The countdown for the delay starts during this call. When the delay expires, + /// this <see cref="CancellationTokenSource"/> is canceled, if it has + /// not been canceled already. + /// </para> + /// <para> + /// Subsequent calls to CancelAfter will reset the delay for this + /// <see cref="CancellationTokenSource"/>, if it has not been canceled already. + /// </para> + /// </remarks> + public void CancelAfter(TimeSpan delay) + { + long totalMilliseconds = (long)delay.TotalMilliseconds; + if (totalMilliseconds < -1 || totalMilliseconds > int.MaxValue) + { + throw new ArgumentOutOfRangeException(nameof(delay)); + } + + CancelAfter((int)totalMilliseconds); + } + + /// <summary> + /// Schedules a Cancel operation on this <see cref="CancellationTokenSource"/>. + /// </summary> + /// <param name="millisecondsDelay">The time span to wait before canceling this <see + /// cref="CancellationTokenSource"/>. + /// </param> + /// <exception cref="ObjectDisposedException">The exception thrown when this <see + /// cref="CancellationTokenSource"/> has been disposed. + /// </exception> + /// <exception cref="ArgumentOutOfRangeException"> + /// The exception thrown when <paramref name="millisecondsDelay"/> is less than -1. + /// </exception> + /// <remarks> + /// <para> + /// The countdown for the millisecondsDelay starts during this call. When the millisecondsDelay expires, + /// this <see cref="CancellationTokenSource"/> is canceled, if it has + /// not been canceled already. + /// </para> + /// <para> + /// Subsequent calls to CancelAfter will reset the millisecondsDelay for this + /// <see cref="CancellationTokenSource"/>, if it has not been + /// canceled already. + /// </para> + /// </remarks> + public void CancelAfter(int millisecondsDelay) + { + ThrowIfDisposed(); + + if (millisecondsDelay < -1) + { + throw new ArgumentOutOfRangeException(nameof(millisecondsDelay)); + } + + if (IsCancellationRequested) + { + return; + } + + // There is a race condition here as a Cancel could occur between the check of + // IsCancellationRequested and the creation of the timer. This is benign; in the + // worst case, a timer will be created that has no effect when it expires. + + // Also, if Dispose() is called right here (after ThrowIfDisposed(), before timer + // creation), it would result in a leaked Timer object (at least until the timer + // expired and Disposed itself). But this would be considered bad behavior, as + // Dispose() is not thread-safe and should not be called concurrently with CancelAfter(). + + TimerQueueTimer timer = _timer; + if (timer == null) + { + // Lazily initialize the timer in a thread-safe fashion. + // Initially set to "never go off" because we don't want to take a + // chance on a timer "losing" the initialization and then + // cancelling the token before it (the timer) can be disposed. + timer = new TimerQueueTimer(s_timerCallback, this, Timeout.UnsignedInfinite, Timeout.UnsignedInfinite, flowExecutionContext: false); + TimerQueueTimer currentTimer = Interlocked.CompareExchange(ref _timer, timer, null); + if (currentTimer != null) + { + // We did not initialize the timer. Dispose the new timer. + timer.Close(); + timer = currentTimer; + } + } + + // It is possible that _timer has already been disposed, so we must do + // the following in a try/catch block. + try + { + timer.Change((uint)millisecondsDelay, Timeout.UnsignedInfinite); + } + catch (ObjectDisposedException) + { + // Just eat the exception. There is no other way to tell that + // the timer has been disposed, and even if there were, there + // would not be a good way to deal with the observe/dispose + // race condition. + } + } + + + + /// <summary>Releases the resources used by this <see cref="CancellationTokenSource" />.</summary> + /// <remarks>This method is not thread-safe for any other concurrent calls.</remarks> + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// <summary> + /// Releases the unmanaged resources used by the <see cref="CancellationTokenSource" /> class and optionally releases the managed resources. + /// </summary> + /// <param name="disposing">true to release both managed and unmanaged resources; false to release only unmanaged resources.</param> + protected virtual void Dispose(bool disposing) + { + if (disposing && !_disposed) + { + // We specifically tolerate that a callback can be unregistered + // after the CTS has been disposed and/or concurrently with cts.Dispose(). + // This is safe without locks because Dispose doesn't interact with values + // in the callback partitions, only nulling out the ref to existing partitions. + // + // We also tolerate that a callback can be registered after the CTS has been + // disposed. This is safe because InternalRegister is tolerant + // of _callbackPartitions becoming null during its execution. However, + // we run the acceptable risk of _callbackPartitions getting reinitialized + // to non-null if there is a race between Dispose and Register, in which case this + // instance may unnecessarily hold onto a registered callback. But that's no worse + // than if Dispose wasn't safe to use concurrently, as Dispose would never be called, + // and thus no handlers would be dropped. + // + // And, we tolerate Dispose being used concurrently with Cancel. This is necessary + // to properly support, e.g., LinkedCancellationTokenSource, where, due to common usage patterns, + // it's possible for this pairing to occur with valid usage (e.g. a component accepts + // an external CancellationToken and uses CreateLinkedTokenSource to combine it with an + // internal source of cancellation, then Disposes of that linked source, which could + // happen at the same time the external entity is requesting cancellation). + + TimerQueueTimer timer = _timer; + if (timer != null) + { + _timer = null; + timer.Close(); // TimerQueueTimer.Close is thread-safe + } + + _callbackPartitions = null; // free for GC; Cancel correctly handles a null field + + // If a kernel event was created via WaitHandle, we'd like to Dispose of it. However, + // we only want to do so if it's not being used by Cancel concurrently. First, we + // interlocked exchange it to be null, and then we check whether cancellation is currently + // in progress. NotifyCancellation will only try to set the event if it exists after it's + // transitioned to and while it's in the NotifyingState. + if (_kernelEvent != null) + { + ManualResetEvent mre = Interlocked.Exchange(ref _kernelEvent, null); + if (mre != null && _state != NotifyingState) + { + mre.Dispose(); + } + } + + _disposed = true; + } + } + + /// <summary>Throws an exception if the source has been disposed.</summary> + private void ThrowIfDisposed() + { + if (_disposed) + { + ThrowObjectDisposedException(); + } + } + + /// <summary>Throws an <see cref="ObjectDisposedException"/>. Separated out from ThrowIfDisposed to help with inlining.</summary> + private static void ThrowObjectDisposedException() => + throw new ObjectDisposedException(null, SR.CancellationTokenSource_Disposed); + + /// <summary> + /// Registers a callback object. If cancellation has already occurred, the + /// callback will have been run by the time this method returns. + /// </summary> + internal CancellationTokenRegistration InternalRegister( + Action<object> callback, object stateForCallback, SynchronizationContext syncContext, ExecutionContext executionContext) + { + Debug.Assert(this != s_neverCanceledSource, "This source should never be exposed via a CancellationToken."); + + // If not canceled, register the handler; if canceled already, run the callback synchronously. + // This also ensures that during ExecuteCallbackHandlers() there will be no mutation of the _callbackPartitions. + if (!IsCancellationRequested) + { + // In order to enable code to not leak too many handlers, we allow Dispose to be called concurrently + // with Register. While this is not a recommended practice, consumers can and do use it this way. + // We don't make any guarantees about whether the CTS will hold onto the supplied callback if the CTS + // has already been disposed when the callback is registered, but we try not to while at the same time + // not paying any non-negligible overhead. The simple compromise is to check whether we're disposed + // (not volatile), and if we see we are, to return an empty registration. If there's a race and _disposed + // is false even though it's been disposed, or if the disposal request comes in after this line, we simply + // run the minor risk of having _callbackPartitions reinitialized (after it was cleared to null during Dispose). + if (_disposed) + { + return new CancellationTokenRegistration(); + } + + // Get the partitions... + CallbackPartition[] partitions = _callbackPartitions; + if (partitions == null) + { + partitions = new CallbackPartition[s_numPartitions]; + partitions = Interlocked.CompareExchange(ref _callbackPartitions, partitions, null) ?? partitions; + } + + // ...and determine which partition to use. + int partitionIndex = Environment.CurrentManagedThreadId & s_numPartitionsMask; + Debug.Assert(partitionIndex < partitions.Length, $"Expected {partitionIndex} to be less than {partitions.Length}"); + CallbackPartition partition = partitions[partitionIndex]; + if (partition == null) + { + partition = new CallbackPartition(this); + partition = Interlocked.CompareExchange(ref partitions[partitionIndex], partition, null) ?? partition; + } + + // Store the callback information into the callback arrays. + long id; + CallbackNode node; + bool lockTaken = false; + partition.Lock.Enter(ref lockTaken); + try + { + // Assign the next available unique ID. + id = partition.NextAvailableId++; + + // Get a node, from the free list if possible or else a new one. + node = partition.FreeNodeList; + if (node != null) + { + partition.FreeNodeList = node.Next; + Debug.Assert(node.Prev == null, "Nodes in the free list should all have a null Prev"); + // node.Next will be overwritten below so no need to set it here. + } + else + { + node = new CallbackNode(partition); + } + + // Configure the node. + node.Id = id; + node.Callback = callback; + node.CallbackState = stateForCallback; + node.ExecutionContext = executionContext; + node.SynchronizationContext = syncContext; + + // Add it to the callbacks list. + node.Next = partition.Callbacks; + if (node.Next != null) + { + node.Next.Prev = node; + } + partition.Callbacks = node; + } + finally + { + partition.Lock.Exit(useMemoryBarrier: false); // no check on lockTaken needed without thread aborts + } + + // If cancellation hasn't been requested, return the registration. + // if cancellation has been requested, try to undo the registration and run the callback + // ourselves, but if we can't unregister it (e.g. the thread running Cancel snagged + // our callback for execution), return the registration so that the caller can wait + // for callback completion in ctr.Dispose(). + var ctr = new CancellationTokenRegistration(id, node); + if (!IsCancellationRequested || !partition.Unregister(id, node)) + { + return ctr; + } + } + + // Cancellation already occurred. Run the callback on this thread and return an empty registration. + callback(stateForCallback); + return default; + } + + private void NotifyCancellation(bool throwOnFirstException) + { + // If we're the first to signal cancellation, do the main extra work. + if (!IsCancellationRequested && Interlocked.CompareExchange(ref _state, NotifyingState, NotCanceledState) == NotCanceledState) + { + // Dispose of the timer, if any. Dispose may be running concurrently here, but TimerQueueTimer.Close is thread-safe. + TimerQueueTimer timer = _timer; + if (timer != null) + { + _timer = null; + timer.Close(); + } + + // Set the event if it's been lazily initialized and hasn't yet been disposed of. Dispose may + // be running concurrently, in which case either it'll have set m_kernelEvent back to null and + // we won't see it here, or it'll see that we've transitioned to NOTIFYING and will skip disposing it, + // leaving cleanup to finalization. + _kernelEvent?.Set(); // update the MRE value. + + // - late enlisters to the Canceled event will have their callbacks called immediately in the Register() methods. + // - Callbacks are not called inside a lock. + // - After transition, no more delegates will be added to the + // - list of handlers, and hence it can be consumed and cleared at leisure by ExecuteCallbackHandlers. + ExecuteCallbackHandlers(throwOnFirstException); + Debug.Assert(IsCancellationCompleted, "Expected cancellation to have finished"); + } + } + + /// <summary>Invoke all registered callbacks.</summary> + /// <remarks>The handlers are invoked synchronously in LIFO order.</remarks> + private void ExecuteCallbackHandlers(bool throwOnFirstException) + { + Debug.Assert(IsCancellationRequested, "ExecuteCallbackHandlers should only be called after setting IsCancellationRequested->true"); + + // Record the threadID being used for running the callbacks. + ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId; + + // If there are no callbacks to run, we can safely exit. Any race conditions to lazy initialize it + // will see IsCancellationRequested and will then run the callback themselves. + CallbackPartition[] partitions = Interlocked.Exchange(ref _callbackPartitions, null); + if (partitions == null) + { + Interlocked.Exchange(ref _state, NotifyingCompleteState); + return; + } + + List<Exception> exceptionList = null; + try + { + // For each partition, and each callback in that partition, execute the associated handler. + // We call the delegates in LIFO order on each partition so that callbacks fire 'deepest first'. + // This is intended to help with nesting scenarios so that child enlisters cancel before their parents. + foreach (CallbackPartition partition in partitions) + { + if (partition == null) + { + // Uninitialized partition. Nothing to do. + continue; + } + + // Iterate through all nodes in the partition. We remove each node prior + // to processing it. This allows for unregistration of subsequent registrations + // to still be effective even as other registrations are being invoked. + while (true) + { + CallbackNode node; + bool lockTaken = false; + partition.Lock.Enter(ref lockTaken); + try + { + // Pop the next registration from the callbacks list. + node = partition.Callbacks; + if (node == null) + { + // No more registrations to process. + break; + } + else + { + Debug.Assert(node.Prev == null); + if (node.Next != null) node.Next.Prev = null; + partition.Callbacks = node.Next; + } + + // Publish the intended callback ID, to ensure ctr.Dispose can tell if a wait is necessary. + // This write happens while the lock is held so that Dispose is either able to successfully + // unregister or is guaranteed to see an accurate executing callback ID, since it takes + // the same lock to remove the node from the callback list. + _executingCallbackId = node.Id; + + // Now that we've grabbed the Id, reset the node's Id to 0. This signals + // to code unregistering that the node is no longer associated with a callback. + node.Id = 0; + } + finally + { + partition.Lock.Exit(useMemoryBarrier: false); // no check on lockTaken needed without thread aborts + } + + // Invoke the callback on this thread if there's no sync context or on the + // target sync context if there is one. + try + { + if (node.SynchronizationContext != null) + { + // Transition to the target syncContext and continue there. + node.SynchronizationContext.Send(s => + { + var n = (CallbackNode)s; + n.Partition.Source.ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId; + n.ExecuteCallback(); + }, node); + ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId; // above may have altered ThreadIDExecutingCallbacks, so reset it + } + else + { + node.ExecuteCallback(); + } + } + catch (Exception ex) when (!throwOnFirstException) + { + // Store the exception and continue + (exceptionList ?? (exceptionList = new List<Exception>())).Add(ex); + } + + // Drop the node. While we could add it to the free list, doing so has cost (we'd need to take the lock again) + // and very limited value. Since a source can only be canceled once, and after it's canceled registrations don't + // need nodes, the only benefit to putting this on the free list would be if Register raced with cancellation + // occurring, such that it could have used this free node but would instead need to allocate a new node (if + // there wasn't another free node available). + } + } + } + finally + { + _state = NotifyingCompleteState; + Volatile.Write(ref _executingCallbackId, 0); + Interlocked.MemoryBarrier(); // for safety, prevent reorderings crossing this point and seeing inconsistent state. + } + + if (exceptionList != null) + { + Debug.Assert(exceptionList.Count > 0, $"Expected {exceptionList.Count} > 0"); + throw new AggregateException(exceptionList); + } + } + + /// <summary>Gets the number of callback partitions to use based on the number of cores.</summary> + /// <returns>A power of 2 representing the number of partitions to use.</returns> + private static int GetPartitionCount() + { + int procs = PlatformHelper.ProcessorCount; + int count = + procs > 8 ? 16 : // capped at 16 to limit memory usage on larger machines + procs > 4 ? 8 : + procs > 2 ? 4 : + procs > 1 ? 2 : + 1; + Debug.Assert(count > 0 && (count & (count - 1)) == 0, $"Got {count}, but expected a power of 2"); + return count; + } + + /// <summary> + /// Creates a <see cref="CancellationTokenSource"/> that will be in the canceled state + /// when any of the source tokens are in the canceled state. + /// </summary> + /// <param name="token1">The first <see cref="CancellationToken">CancellationToken</see> to observe.</param> + /// <param name="token2">The second <see cref="CancellationToken">CancellationToken</see> to observe.</param> + /// <returns>A <see cref="CancellationTokenSource"/> that is linked + /// to the source tokens.</returns> + public static CancellationTokenSource CreateLinkedTokenSource(CancellationToken token1, CancellationToken token2) => + !token1.CanBeCanceled ? CreateLinkedTokenSource(token2) : + token2.CanBeCanceled ? new Linked2CancellationTokenSource(token1, token2) : + (CancellationTokenSource)new Linked1CancellationTokenSource(token1); + + /// <summary> + /// Creates a <see cref="CancellationTokenSource"/> that will be in the canceled state + /// when any of the source tokens are in the canceled state. + /// </summary> + /// <param name="token">The first <see cref="CancellationToken">CancellationToken</see> to observe.</param> + /// <returns>A <see cref="CancellationTokenSource"/> that is linked to the source tokens.</returns> + internal static CancellationTokenSource CreateLinkedTokenSource(CancellationToken token) => + token.CanBeCanceled ? new Linked1CancellationTokenSource(token) : new CancellationTokenSource(); + + /// <summary> + /// Creates a <see cref="CancellationTokenSource"/> that will be in the canceled state + /// when any of the source tokens are in the canceled state. + /// </summary> + /// <param name="tokens">The <see cref="CancellationToken">CancellationToken</see> instances to observe.</param> + /// <returns>A <see cref="CancellationTokenSource"/> that is linked to the source tokens.</returns> + /// <exception cref="System.ArgumentNullException"><paramref name="tokens"/> is null.</exception> + public static CancellationTokenSource CreateLinkedTokenSource(params CancellationToken[] tokens) + { + if (tokens == null) + { + throw new ArgumentNullException(nameof(tokens)); + } + + switch (tokens.Length) + { + case 0: + throw new ArgumentException(SR.CancellationToken_CreateLinkedToken_TokensIsEmpty); + case 1: + return CreateLinkedTokenSource(tokens[0]); + case 2: + return CreateLinkedTokenSource(tokens[0], tokens[1]); + default: + // a defensive copy is not required as the array has value-items that have only a single reference field, + // hence each item cannot be null itself, and reads of the payloads cannot be torn. + return new LinkedNCancellationTokenSource(tokens); + } + } + + /// <summary> + /// Wait for a single callback to complete (or, more specifically, to not be running). + /// It is ok to call this method if the callback has already finished. + /// Calling this method before the target callback has been selected for execution would be an error. + /// </summary> + internal void WaitForCallbackToComplete(long id) + { + var sw = new SpinWait(); + while (ExecutingCallback == id) + { + sw.SpinOnce(); // spin, as we assume callback execution is fast and that this situation is rare. + } + } + + /// <summary> + /// Asynchronously wait for a single callback to complete (or, more specifically, to not be running). + /// It is ok to call this method if the callback has already finished. + /// Calling this method before the target callback has been selected for execution would be an error. + /// </summary> + internal ValueTask WaitForCallbackToCompleteAsync(long id) + { + // If the currently executing callback is not the target one, then the target one has already + // completed and we can simply return. This should be the most common case, as the caller + // calls if we're currently canceling but doesn't know what callback is running, if any. + if (ExecutingCallback != id) + { + return default; + } + + // The specified callback is actually running: queue a task that'll poll for the currently + // executing callback to complete. In general scheduling such a work item that polls is a really + // unfortunate thing to do. However, we expect this to be a rare case (disposing while the associated + // callback is running), and brief when it happens (so the polling will be minimal), and making + // this work with a callback mechanism will add additional cost to other more common cases. + return new ValueTask(Task.Factory.StartNew(s => + { + var state = (Tuple<CancellationTokenSource, long>)s; + state.Item1.WaitForCallbackToComplete(state.Item2); + }, Tuple.Create(this, id), CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default)); + } + + private sealed class Linked1CancellationTokenSource : CancellationTokenSource + { + private readonly CancellationTokenRegistration _reg1; + + internal Linked1CancellationTokenSource(CancellationToken token1) + { + _reg1 = token1.UnsafeRegister(LinkedNCancellationTokenSource.s_linkedTokenCancelDelegate, this); + } + + protected override void Dispose(bool disposing) + { + if (!disposing || _disposed) + { + return; + } + + _reg1.Dispose(); + base.Dispose(disposing); + } + } + + private sealed class Linked2CancellationTokenSource : CancellationTokenSource + { + private readonly CancellationTokenRegistration _reg1; + private readonly CancellationTokenRegistration _reg2; + + internal Linked2CancellationTokenSource(CancellationToken token1, CancellationToken token2) + { + _reg1 = token1.UnsafeRegister(LinkedNCancellationTokenSource.s_linkedTokenCancelDelegate, this); + _reg2 = token2.UnsafeRegister(LinkedNCancellationTokenSource.s_linkedTokenCancelDelegate, this); + } + + protected override void Dispose(bool disposing) + { + if (!disposing || _disposed) + { + return; + } + + _reg1.Dispose(); + _reg2.Dispose(); + base.Dispose(disposing); + } + } + + private sealed class LinkedNCancellationTokenSource : CancellationTokenSource + { + internal static readonly Action<object> s_linkedTokenCancelDelegate = + s => ((CancellationTokenSource)s).NotifyCancellation(throwOnFirstException: false); // skip ThrowIfDisposed() check in Cancel() + private CancellationTokenRegistration[] _linkingRegistrations; + + internal LinkedNCancellationTokenSource(params CancellationToken[] tokens) + { + _linkingRegistrations = new CancellationTokenRegistration[tokens.Length]; + + for (int i = 0; i < tokens.Length; i++) + { + if (tokens[i].CanBeCanceled) + { + _linkingRegistrations[i] = tokens[i].UnsafeRegister(s_linkedTokenCancelDelegate, this); + } + // Empty slots in the array will be default(CancellationTokenRegistration), which are nops to Dispose. + // Based on usage patterns, such occurrences should also be rare, such that it's not worth resizing + // the array and incurring the related costs. + } + } + + protected override void Dispose(bool disposing) + { + if (!disposing || _disposed) + { + return; + } + + CancellationTokenRegistration[] linkingRegistrations = _linkingRegistrations; + if (linkingRegistrations != null) + { + _linkingRegistrations = null; // release for GC once we're done enumerating + for (int i = 0; i < linkingRegistrations.Length; i++) + { + linkingRegistrations[i].Dispose(); + } + } + + base.Dispose(disposing); + } + } + + internal sealed class CallbackPartition + { + /// <summary>The associated source that owns this partition.</summary> + public readonly CancellationTokenSource Source; + /// <summary>Lock that protects all state in the partition.</summary> + public SpinLock Lock = new SpinLock(enableThreadOwnerTracking: false); // mutable struct; do not make this readonly + /// <summary>Doubly-linked list of callbacks registered with the partition. Callbacks are removed during unregistration and as they're invoked.</summary> + public CallbackNode Callbacks; + /// <summary>Singly-linked list of free nodes that can be used for subsequent callback registrations.</summary> + public CallbackNode FreeNodeList; + /// <summary>Every callback is assigned a unique, never-reused ID. This defines the next available ID.</summary> + public long NextAvailableId = 1; // avoid using 0, as that's the default long value and used to represent an empty node + + public CallbackPartition(CancellationTokenSource source) + { + Debug.Assert(source != null, "Expected non-null source"); + Source = source; + } + + internal bool Unregister(long id, CallbackNode node) + { + Debug.Assert(id != 0, "Expected non-zero id"); + Debug.Assert(node != null, "Expected non-null node"); + + bool lockTaken = false; + Lock.Enter(ref lockTaken); + try + { + if (node.Id != id) + { + // Either: + // - The callback is currently or has already been invoked, in which case node.Id + // will no longer equal the assigned id, as it will have transitioned to 0. + // - The registration was already disposed of, in which case node.Id will similarly + // no longer equal the assigned id, as it will have transitioned to 0 and potentially + // then to another (larger) value when reused for a new registration. + // In either case, there's nothing to unregister. + return false; + } + + // The registration must still be in the callbacks list. Remove it. + if (Callbacks == node) + { + Debug.Assert(node.Prev == null); + Callbacks = node.Next; + } + else + { + Debug.Assert(node.Prev != null); + node.Prev.Next = node.Next; + } + + if (node.Next != null) + { + node.Next.Prev = node.Prev; + } + + // Clear out the now unused node and put it on the singly-linked free list. + // The only field we don't clear out is the associated Partition, as that's fixed + // throughout the nodes lifetime, regardless of how many times its reused by + // the same partition (it's never used on a different partition). + node.Id = 0; + node.Callback = null; + node.CallbackState = null; + node.ExecutionContext = null; + node.SynchronizationContext = null; + node.Prev = null; + node.Next = FreeNodeList; + FreeNodeList = node; + + return true; + } + finally + { + Lock.Exit(useMemoryBarrier: false); // no check on lockTaken needed without thread aborts + } + } + } + + /// <summary>All of the state associated a registered callback, in a node that's part of a linked list of registered callbacks.</summary> + internal sealed class CallbackNode + { + public readonly CallbackPartition Partition; + public CallbackNode Prev; + public CallbackNode Next; + + public long Id; + public Action<object> Callback; + public object CallbackState; + public ExecutionContext ExecutionContext; + public SynchronizationContext SynchronizationContext; + + public CallbackNode(CallbackPartition partition) + { + Debug.Assert(partition != null, "Expected non-null partition"); + Partition = partition; + } + + public void ExecuteCallback() + { + ExecutionContext context = ExecutionContext; + if (context != null) + { + ExecutionContext.RunInternal(context, s => + { + CallbackNode n = (CallbackNode)s; + n.Callback(n.CallbackState); + }, this); + } + else + { + Callback(CallbackState); + } + } + } + } +} diff --git a/src/System.Private.CoreLib/shared/System/Threading/Tasks/Future.cs b/src/System.Private.CoreLib/shared/System/Threading/Tasks/Future.cs new file mode 100644 index 0000000000..e3339efd42 --- /dev/null +++ b/src/System.Private.CoreLib/shared/System/Threading/Tasks/Future.cs @@ -0,0 +1,1481 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// +// +// +// A task that produces a value. +// +// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + +using System.Collections.Generic; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Runtime.ExceptionServices; + +// Disable the "reference to volatile field not treated as volatile" error. +#pragma warning disable 0420 + +namespace System.Threading.Tasks +{ + /// <summary> + /// Represents an asynchronous operation that produces a result at some time in the future. + /// </summary> + /// <typeparam name="TResult"> + /// The type of the result produced by this <see cref="Task{TResult}"/>. + /// </typeparam> + /// <remarks> + /// <para> + /// <see cref="Task{TResult}"/> instances may be created in a variety of ways. The most common approach is by + /// using the task's <see cref="Factory"/> property to retrieve a <see + /// cref="System.Threading.Tasks.TaskFactory{TResult}"/> instance that can be used to create tasks for several + /// purposes. For example, to create a <see cref="Task{TResult}"/> that runs a function, the factory's StartNew + /// method may be used: + /// <code> + /// // C# + /// var t = Task<int>.Factory.StartNew(() => GenerateResult()); + /// - or - + /// var t = Task.Factory.StartNew(() => GenerateResult()); + /// + /// ' Visual Basic + /// Dim t = Task<int>.Factory.StartNew(Function() GenerateResult()) + /// - or - + /// Dim t = Task.Factory.StartNew(Function() GenerateResult()) + /// </code> + /// </para> + /// <para> + /// The <see cref="Task{TResult}"/> class also provides constructors that initialize the task but that do not + /// schedule it for execution. For performance reasons, the StartNew method should be the + /// preferred mechanism for creating and scheduling computational tasks, but for scenarios where creation + /// and scheduling must be separated, the constructors may be used, and the task's + /// <see cref="System.Threading.Tasks.Task.Start()">Start</see> + /// method may then be used to schedule the task for execution at a later time. + /// </para> + /// <para> + /// All members of <see cref="Task{TResult}"/>, except for + /// <see cref="System.Threading.Tasks.Task.Dispose()">Dispose</see>, are thread-safe + /// and may be used from multiple threads concurrently. + /// </para> + /// </remarks> + [DebuggerTypeProxy(typeof(SystemThreadingTasks_FutureDebugView<>))] + [DebuggerDisplay("Id = {Id}, Status = {Status}, Method = {DebuggerDisplayMethodDescription}, Result = {DebuggerDisplayResultDescription}")] + public class Task<TResult> : Task + { + internal TResult m_result; // The value itself, if set. + + private static readonly TaskFactory<TResult> s_Factory = new TaskFactory<TResult>(); + + // Delegate used by: + // public static Task<Task<TResult>> WhenAny<TResult>(IEnumerable<Task<TResult>> tasks); + // public static Task<Task<TResult>> WhenAny<TResult>(params Task<TResult>[] tasks); + // Used to "cast" from Task<Task> to Task<Task<TResult>>. + internal static readonly Func<Task<Task>, Task<TResult>> TaskWhenAnyCast = completed => (Task<TResult>)completed.Result; + + // Construct a promise-style task without any options. + internal Task() : + base() + { + } + + // Construct a promise-style task with state and options. + internal Task(object state, TaskCreationOptions options) : + base(state, options, promiseStyle: true) + { + } + + + // Construct a pre-completed Task<TResult> + internal Task(TResult result) : + base(false, TaskCreationOptions.None, default) + { + m_result = result; + } + + internal Task(bool canceled, TResult result, TaskCreationOptions creationOptions, CancellationToken ct) + : base(canceled, creationOptions, ct) + { + if (!canceled) + { + m_result = result; + } + } + + // Uncomment if/when we want Task.FromException + //// Construct a pre-faulted Task<TResult> + //internal Task(Exception exception) + // : base(exception) + //{ + //} + + /// <summary> + /// Initializes a new <see cref="Task{TResult}"/> with the specified function. + /// </summary> + /// <param name="function"> + /// The delegate that represents the code to execute in the task. When the function has completed, + /// the task's <see cref="Result"/> property will be set to return the result value of the function. + /// </param> + /// <exception cref="T:System.ArgumentException"> + /// The <paramref name="function"/> argument is null. + /// </exception> + public Task(Func<TResult> function) + : this(function, null, default, + TaskCreationOptions.None, InternalTaskOptions.None, null) + { + } + + + /// <summary> + /// Initializes a new <see cref="Task{TResult}"/> with the specified function. + /// </summary> + /// <param name="function"> + /// The delegate that represents the code to execute in the task. When the function has completed, + /// the task's <see cref="Result"/> property will be set to return the result value of the function. + /// </param> + /// <param name="cancellationToken">The <see cref="CancellationToken"/> to be assigned to this task.</param> + /// <exception cref="T:System.ArgumentException"> + /// The <paramref name="function"/> argument is null. + /// </exception> + /// <exception cref="T:System.ObjectDisposedException">The provided <see cref="System.Threading.CancellationToken">CancellationToken</see> + /// has already been disposed. + /// </exception> + public Task(Func<TResult> function, CancellationToken cancellationToken) + : this(function, null, cancellationToken, + TaskCreationOptions.None, InternalTaskOptions.None, null) + { + } + + /// <summary> + /// Initializes a new <see cref="Task{TResult}"/> with the specified function and creation options. + /// </summary> + /// <param name="function"> + /// The delegate that represents the code to execute in the task. When the function has completed, + /// the task's <see cref="Result"/> property will be set to return the result value of the function. + /// </param> + /// <param name="creationOptions"> + /// The <see cref="System.Threading.Tasks.TaskCreationOptions">TaskCreationOptions</see> used to + /// customize the task's behavior. + /// </param> + /// <exception cref="T:System.ArgumentException"> + /// The <paramref name="function"/> argument is null. + /// </exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// The <paramref name="creationOptions"/> argument specifies an invalid value for <see + /// cref="T:System.Threading.Tasks.TaskCreationOptions"/>. + /// </exception> + public Task(Func<TResult> function, TaskCreationOptions creationOptions) + : this(function, Task.InternalCurrentIfAttached(creationOptions), default, creationOptions, InternalTaskOptions.None, null) + { + } + + /// <summary> + /// Initializes a new <see cref="Task{TResult}"/> with the specified function and creation options. + /// </summary> + /// <param name="function"> + /// The delegate that represents the code to execute in the task. When the function has completed, + /// the task's <see cref="Result"/> property will be set to return the result value of the function. + /// </param> + /// <param name="cancellationToken">The <see cref="CancellationToken"/> that will be assigned to the new task.</param> + /// <param name="creationOptions"> + /// The <see cref="System.Threading.Tasks.TaskCreationOptions">TaskCreationOptions</see> used to + /// customize the task's behavior. + /// </param> + /// <exception cref="T:System.ArgumentException"> + /// The <paramref name="function"/> argument is null. + /// </exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// The <paramref name="creationOptions"/> argument specifies an invalid value for <see + /// cref="T:System.Threading.Tasks.TaskCreationOptions"/>. + /// </exception> + /// <exception cref="T:System.ObjectDisposedException">The provided <see cref="System.Threading.CancellationToken">CancellationToken</see> + /// has already been disposed. + /// </exception> + public Task(Func<TResult> function, CancellationToken cancellationToken, TaskCreationOptions creationOptions) + : this(function, Task.InternalCurrentIfAttached(creationOptions), cancellationToken, creationOptions, InternalTaskOptions.None, null) + { + } + + /// <summary> + /// Initializes a new <see cref="Task{TResult}"/> with the specified function and state. + /// </summary> + /// <param name="function"> + /// The delegate that represents the code to execute in the task. When the function has completed, + /// the task's <see cref="Result"/> property will be set to return the result value of the function. + /// </param> + /// <param name="state">An object representing data to be used by the action.</param> + /// <exception cref="T:System.ArgumentException"> + /// The <paramref name="function"/> argument is null. + /// </exception> + public Task(Func<object, TResult> function, object state) + : this(function, state, null, default, + TaskCreationOptions.None, InternalTaskOptions.None, null) + { + } + + /// <summary> + /// Initializes a new <see cref="Task{TResult}"/> with the specified action, state, and options. + /// </summary> + /// <param name="function"> + /// The delegate that represents the code to execute in the task. When the function has completed, + /// the task's <see cref="Result"/> property will be set to return the result value of the function. + /// </param> + /// <param name="state">An object representing data to be used by the function.</param> + /// <param name="cancellationToken">The <see cref="CancellationToken"/> to be assigned to the new task.</param> + /// <exception cref="T:System.ArgumentException"> + /// The <paramref name="function"/> argument is null. + /// </exception> + /// <exception cref="T:System.ObjectDisposedException">The provided <see cref="System.Threading.CancellationToken">CancellationToken</see> + /// has already been disposed. + /// </exception> + public Task(Func<object, TResult> function, object state, CancellationToken cancellationToken) + : this(function, state, null, cancellationToken, + TaskCreationOptions.None, InternalTaskOptions.None, null) + { + } + + /// <summary> + /// Initializes a new <see cref="Task{TResult}"/> with the specified action, state, and options. + /// </summary> + /// <param name="function"> + /// The delegate that represents the code to execute in the task. When the function has completed, + /// the task's <see cref="Result"/> property will be set to return the result value of the function. + /// </param> + /// <param name="state">An object representing data to be used by the function.</param> + /// <param name="creationOptions"> + /// The <see cref="System.Threading.Tasks.TaskCreationOptions">TaskCreationOptions</see> used to + /// customize the task's behavior. + /// </param> + /// <exception cref="T:System.ArgumentException"> + /// The <paramref name="function"/> argument is null. + /// </exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// The <paramref name="creationOptions"/> argument specifies an invalid value for <see + /// cref="T:System.Threading.Tasks.TaskCreationOptions"/>. + /// </exception> + public Task(Func<object, TResult> function, object state, TaskCreationOptions creationOptions) + : this(function, state, Task.InternalCurrentIfAttached(creationOptions), default, + creationOptions, InternalTaskOptions.None, null) + { + } + + + /// <summary> + /// Initializes a new <see cref="Task{TResult}"/> with the specified action, state, and options. + /// </summary> + /// <param name="function"> + /// The delegate that represents the code to execute in the task. When the function has completed, + /// the task's <see cref="Result"/> property will be set to return the result value of the function. + /// </param> + /// <param name="state">An object representing data to be used by the function.</param> + /// <param name="cancellationToken">The <see cref="CancellationToken"/> to be assigned to the new task.</param> + /// <param name="creationOptions"> + /// The <see cref="System.Threading.Tasks.TaskCreationOptions">TaskCreationOptions</see> used to + /// customize the task's behavior. + /// </param> + /// <exception cref="T:System.ArgumentException"> + /// The <paramref name="function"/> argument is null. + /// </exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// The <paramref name="creationOptions"/> argument specifies an invalid value for <see + /// cref="T:System.Threading.Tasks.TaskCreationOptions"/>. + /// </exception> + /// <exception cref="T:System.ObjectDisposedException">The provided <see cref="System.Threading.CancellationToken">CancellationToken</see> + /// has already been disposed. + /// </exception> + public Task(Func<object, TResult> function, object state, CancellationToken cancellationToken, TaskCreationOptions creationOptions) + : this(function, state, Task.InternalCurrentIfAttached(creationOptions), cancellationToken, + creationOptions, InternalTaskOptions.None, null) + { + } + + /// <summary> + /// Creates a new future object. + /// </summary> + /// <param name="parent">The parent task for this future.</param> + /// <param name="valueSelector">A function that yields the future value.</param> + /// <param name="scheduler">The task scheduler which will be used to execute the future.</param> + /// <param name="cancellationToken">The CancellationToken for the task.</param> + /// <param name="creationOptions">Options to control the future's behavior.</param> + /// <param name="internalOptions">Internal options to control the future's behavior.</param> + internal Task(Func<TResult> valueSelector, Task parent, CancellationToken cancellationToken, + TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler) : + base(valueSelector, null, parent, cancellationToken, creationOptions, internalOptions, scheduler) + { + } + + /// <summary> + /// Creates a new future object. + /// </summary> + /// <param name="parent">The parent task for this future.</param> + /// <param name="state">An object containing data to be used by the action; may be null.</param> + /// <param name="valueSelector">A function that yields the future value.</param> + /// <param name="cancellationToken">The CancellationToken for the task.</param> + /// <param name="scheduler">The task scheduler which will be used to execute the future.</param> + /// <param name="creationOptions">Options to control the future's behavior.</param> + /// <param name="internalOptions">Internal options to control the future's behavior.</param> + internal Task(Delegate valueSelector, object state, Task parent, CancellationToken cancellationToken, + TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler) : + base(valueSelector, state, parent, cancellationToken, creationOptions, internalOptions, scheduler) + { + } + + + // Internal method used by TaskFactory<TResult>.StartNew() methods + internal static Task<TResult> StartNew(Task parent, Func<TResult> function, CancellationToken cancellationToken, + TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler) + { + if (function == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.function); + } + if (scheduler == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.scheduler); + } + + // Create and schedule the future. + Task<TResult> f = new Task<TResult>(function, parent, cancellationToken, creationOptions, internalOptions | InternalTaskOptions.QueuedByRuntime, scheduler); + + f.ScheduleAndStart(false); + return f; + } + + // Internal method used by TaskFactory<TResult>.StartNew() methods + internal static Task<TResult> StartNew(Task parent, Func<object, TResult> function, object state, CancellationToken cancellationToken, + TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler) + { + if (function == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.function); + } + if (scheduler == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.scheduler); + } + + // Create and schedule the future. + Task<TResult> f = new Task<TResult>(function, state, parent, cancellationToken, creationOptions, internalOptions | InternalTaskOptions.QueuedByRuntime, scheduler); + + f.ScheduleAndStart(false); + return f; + } + + // Debugger support + private string DebuggerDisplayResultDescription + { + get + { + return IsCompletedSuccessfully ? "" + m_result : SR.TaskT_DebuggerNoResult; + } + } + + // Debugger support + private string DebuggerDisplayMethodDescription + { + get + { + Delegate d = m_action; + return d != null ? d.Method.ToString() : "{null}"; + } + } + + + // internal helper function breaks out logic used by TaskCompletionSource + internal bool TrySetResult(TResult result) + { + Debug.Assert(m_action == null, "Task<T>.TrySetResult(): non-null m_action"); + + // "Reserve" the completion for this task, while making sure that: (1) No prior reservation + // has been made, (2) The result has not already been set, (3) An exception has not previously + // been recorded, and (4) Cancellation has not been requested. + // + // If the reservation is successful, then set the result and finish completion processing. + if (AtomicStateUpdate(TASK_STATE_COMPLETION_RESERVED, + TASK_STATE_COMPLETION_RESERVED | TASK_STATE_RAN_TO_COMPLETION | TASK_STATE_FAULTED | TASK_STATE_CANCELED)) + { + m_result = result; + + // Signal completion, for waiting tasks + + // This logic used to be: + // Finish(false); + // However, that goes through a windy code path, involves many non-inlineable functions + // and which can be summarized more concisely with the following snippet from + // FinishStageTwo, omitting everything that doesn't pertain to TrySetResult. + Interlocked.Exchange(ref m_stateFlags, m_stateFlags | TASK_STATE_RAN_TO_COMPLETION); + ContingentProperties props = m_contingentProperties; + if (props != null) + { + NotifyParentIfPotentiallyAttachedTask(); + props.SetCompleted(); + } + FinishContinuations(); + return true; + } + + return false; + } + + // Transitions the promise task into a successfully completed state with the specified result. + // This is dangerous, as no synchronization is used, and thus must only be used + // before this task is handed out to any consumers, before any continuations are hooked up, + // before its wait handle is accessed, etc. It's use is limited to places like in FromAsync + // where the operation completes synchronously, and thus we know we can forcefully complete + // the task, avoiding expensive completion paths, before the task is actually given to anyone. + internal void DangerousSetResult(TResult result) + { + Debug.Assert(!IsCompleted, "The promise must not yet be completed."); + + // If we have a parent, we need to notify it of the completion. Take the slow path to handle that. + if (m_contingentProperties?.m_parent != null) + { + bool success = TrySetResult(result); + + // Nobody else has had a chance to complete this Task yet, so we should succeed. + Debug.Assert(success); + } + else + { + m_result = result; + m_stateFlags |= TASK_STATE_RAN_TO_COMPLETION; + } + } + + /// <summary> + /// Gets the result value of this <see cref="Task{TResult}"/>. + /// </summary> + /// <remarks> + /// The get accessor for this property ensures that the asynchronous operation is complete before + /// returning. Once the result of the computation is available, it is stored and will be returned + /// immediately on later calls to <see cref="Result"/>. + /// </remarks> + [DebuggerBrowsable(DebuggerBrowsableState.Never)] + public TResult Result + { + get { return IsWaitNotificationEnabledOrNotRanToCompletion ? GetResultCore(waitCompletionNotification: true) : m_result; } + } + + /// <summary> + /// Gets the result value of this <see cref="Task{TResult}"/> once the task has completed successfully. + /// </summary> + /// <remarks> + /// This version of Result should only be used if the task completed successfully and if there's + /// no debugger wait notification enabled for this task. + /// </remarks> + internal TResult ResultOnSuccess + { + get + { + Debug.Assert(!IsWaitNotificationEnabledOrNotRanToCompletion, + "Should only be used when the task completed successfully and there's no wait notification enabled"); + return m_result; + } + } + + // Implements Result. Result delegates to this method if the result isn't already available. + internal TResult GetResultCore(bool waitCompletionNotification) + { + // If the result has not been calculated yet, wait for it. + if (!IsCompleted) InternalWait(Timeout.Infinite, default); // won't throw if task faulted or canceled; that's handled below + + // Notify the debugger of the wait completion if it's requested such a notification + if (waitCompletionNotification) NotifyDebuggerOfWaitCompletionIfNecessary(); + + // Throw an exception if appropriate. + if (!IsCompletedSuccessfully) ThrowIfExceptional(includeTaskCanceledExceptions: true); + + // We shouldn't be here if the result has not been set. + Debug.Assert(IsCompletedSuccessfully, "Task<T>.Result getter: Expected result to have been set."); + + return m_result; + } + + // Allow multiple exceptions to be assigned to a promise-style task. + // This is useful when a TaskCompletionSource<T> stands in as a proxy + // for a "real" task (as we do in Unwrap(), ContinueWhenAny() and ContinueWhenAll()) + // and the "real" task ends up with multiple exceptions, which is possible when + // a task has children. + // + // Called from TaskCompletionSource<T>.SetException(IEnumerable<Exception>). + internal bool TrySetException(object exceptionObject) + { + Debug.Assert(m_action == null, "Task<T>.TrySetException(): non-null m_action"); + + // TCS.{Try}SetException() should have checked for this + Debug.Assert(exceptionObject != null, "Expected non-null exceptionObject argument"); + + // Only accept these types. + Debug.Assert( + (exceptionObject is Exception) || (exceptionObject is IEnumerable<Exception>) || + (exceptionObject is ExceptionDispatchInfo) || (exceptionObject is IEnumerable<ExceptionDispatchInfo>), + "Expected exceptionObject to be either Exception, ExceptionDispatchInfo, or IEnumerable<> of one of those"); + + bool returnValue = false; + + // "Reserve" the completion for this task, while making sure that: (1) No prior reservation + // has been made, (2) The result has not already been set, (3) An exception has not previously + // been recorded, and (4) Cancellation has not been requested. + // + // If the reservation is successful, then add the exception(s) and finish completion processing. + // + // The lazy initialization may not be strictly necessary, but I'd like to keep it here + // anyway. Some downstream logic may depend upon an inflated m_contingentProperties. + EnsureContingentPropertiesInitialized(); + if (AtomicStateUpdate(TASK_STATE_COMPLETION_RESERVED, + TASK_STATE_COMPLETION_RESERVED | TASK_STATE_RAN_TO_COMPLETION | TASK_STATE_FAULTED | TASK_STATE_CANCELED)) + { + AddException(exceptionObject); // handles singleton exception or exception collection + Finish(false); + returnValue = true; + } + + return returnValue; + } + + // internal helper function breaks out logic used by TaskCompletionSource and AsyncMethodBuilder + // If the tokenToRecord is not None, it will be stored onto the task. + // This method is only valid for promise tasks. + internal bool TrySetCanceled(CancellationToken tokenToRecord) + { + return TrySetCanceled(tokenToRecord, null); + } + + // internal helper function breaks out logic used by TaskCompletionSource and AsyncMethodBuilder + // If the tokenToRecord is not None, it will be stored onto the task. + // If the OperationCanceledException is not null, it will be stored into the task's exception holder. + // This method is only valid for promise tasks. + internal bool TrySetCanceled(CancellationToken tokenToRecord, object cancellationException) + { + Debug.Assert(m_action == null, "Task<T>.TrySetCanceled(): non-null m_action"); +#if DEBUG + var ceAsEdi = cancellationException as ExceptionDispatchInfo; + Debug.Assert( + cancellationException == null || + cancellationException is OperationCanceledException || + (ceAsEdi != null && ceAsEdi.SourceException is OperationCanceledException), + "Expected null or an OperationCanceledException"); +#endif + + bool returnValue = false; + + // "Reserve" the completion for this task, while making sure that: (1) No prior reservation + // has been made, (2) The result has not already been set, (3) An exception has not previously + // been recorded, and (4) Cancellation has not been requested. + // + // If the reservation is successful, then record the cancellation and finish completion processing. + // + // Note: I had to access static Task variables through Task<object> + // instead of Task, because I have a property named Task and that + // was confusing the compiler. + if (AtomicStateUpdate(Task<object>.TASK_STATE_COMPLETION_RESERVED, + Task<object>.TASK_STATE_COMPLETION_RESERVED | Task<object>.TASK_STATE_CANCELED | + Task<object>.TASK_STATE_FAULTED | Task<object>.TASK_STATE_RAN_TO_COMPLETION)) + { + RecordInternalCancellationRequest(tokenToRecord, cancellationException); + CancellationCleanupLogic(); // perform cancellation cleanup actions + returnValue = true; + } + + return returnValue; + } + + /// <summary> + /// Provides access to factory methods for creating <see cref="Task{TResult}"/> instances. + /// </summary> + /// <remarks> + /// The factory returned from <see cref="Factory"/> is a default instance + /// of <see cref="System.Threading.Tasks.TaskFactory{TResult}"/>, as would result from using + /// the default constructor on the factory type. + /// </remarks> + public new static TaskFactory<TResult> Factory { get { return s_Factory; } } + + /// <summary> + /// Evaluates the value selector of the Task which is passed in as an object and stores the result. + /// </summary> + internal override void InnerInvoke() + { + // Invoke the delegate + Debug.Assert(m_action != null); + var func = m_action as Func<TResult>; + if (func != null) + { + m_result = func(); + return; + } + var funcWithState = m_action as Func<object, TResult>; + if (funcWithState != null) + { + m_result = funcWithState(m_stateObject); + return; + } + Debug.Fail("Invalid m_action in Task<TResult>"); + } + + #region Await Support + + /// <summary>Gets an awaiter used to await this <see cref="System.Threading.Tasks.Task{TResult}"/>.</summary> + /// <returns>An awaiter instance.</returns> + /// <remarks>This method is intended for compiler user rather than use directly in code.</remarks> + public new TaskAwaiter<TResult> GetAwaiter() + { + return new TaskAwaiter<TResult>(this); + } + + /// <summary>Configures an awaiter used to await this <see cref="System.Threading.Tasks.Task{TResult}"/>.</summary> + /// <param name="continueOnCapturedContext"> + /// true to attempt to marshal the continuation back to the original context captured; otherwise, false. + /// </param> + /// <returns>An object used to await this task.</returns> + public new ConfiguredTaskAwaitable<TResult> ConfigureAwait(bool continueOnCapturedContext) + { + return new ConfiguredTaskAwaitable<TResult>(this, continueOnCapturedContext); + } + + #endregion + + #region Continuation methods + + #region Action<Task<TResult>> continuations + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <param name="continuationAction"> + /// An action to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task as an argument. + /// </param> + /// <returns>A new continuation <see cref="Task"/>.</returns> + /// <remarks> + /// The returned <see cref="Task"/> will not be scheduled for execution until the current task has + /// completed, whether it completes due to running to completion successfully, faulting due to an + /// unhandled exception, or exiting out early due to being canceled. + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationAction"/> argument is null. + /// </exception> + public Task ContinueWith(Action<Task<TResult>> continuationAction) + { + return ContinueWith(continuationAction, TaskScheduler.Current, default, TaskContinuationOptions.None); + } + + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <param name="continuationAction"> + /// An action to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task as an argument. + /// </param> + /// <param name="cancellationToken">The <see cref="CancellationToken"/> that will be assigned to the new continuation task.</param> + /// <returns>A new continuation <see cref="Task"/>.</returns> + /// <remarks> + /// The returned <see cref="Task"/> will not be scheduled for execution until the current task has + /// completed, whether it completes due to running to completion successfully, faulting due to an + /// unhandled exception, or exiting out early due to being canceled. + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationAction"/> argument is null. + /// </exception> + /// <exception cref="T:System.ObjectDisposedException">The provided <see cref="System.Threading.CancellationToken">CancellationToken</see> + /// has already been disposed. + /// </exception> + public Task ContinueWith(Action<Task<TResult>> continuationAction, CancellationToken cancellationToken) + { + return ContinueWith(continuationAction, TaskScheduler.Current, cancellationToken, TaskContinuationOptions.None); + } + + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <param name="continuationAction"> + /// An action to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task as an argument. + /// </param> + /// <param name="scheduler"> + /// The <see cref="TaskScheduler"/> to associate with the continuation task and to use for its execution. + /// </param> + /// <returns>A new continuation <see cref="Task"/>.</returns> + /// <remarks> + /// The returned <see cref="Task"/> will not be scheduled for execution until the current task has + /// completed, whether it completes due to running to completion successfully, faulting due to an + /// unhandled exception, or exiting out early due to being canceled. + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationAction"/> argument is null. + /// </exception> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="scheduler"/> argument is null. + /// </exception> + public Task ContinueWith(Action<Task<TResult>> continuationAction, TaskScheduler scheduler) + { + return ContinueWith(continuationAction, scheduler, default, TaskContinuationOptions.None); + } + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <param name="continuationAction"> + /// An action to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task as an argument. + /// </param> + /// <param name="continuationOptions"> + /// Options for when the continuation is scheduled and how it behaves. This includes criteria, such + /// as <see + /// cref="System.Threading.Tasks.TaskContinuationOptions.OnlyOnCanceled">OnlyOnCanceled</see>, as + /// well as execution options, such as <see + /// cref="System.Threading.Tasks.TaskContinuationOptions.ExecuteSynchronously">ExecuteSynchronously</see>. + /// </param> + /// <returns>A new continuation <see cref="Task"/>.</returns> + /// <remarks> + /// The returned <see cref="Task"/> will not be scheduled for execution until the current task has + /// completed. If the continuation criteria specified through the <paramref + /// name="continuationOptions"/> parameter are not met, the continuation task will be canceled + /// instead of scheduled. + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationAction"/> argument is null. + /// </exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// The <paramref name="continuationOptions"/> argument specifies an invalid value for <see + /// cref="T:System.Threading.Tasks.TaskContinuationOptions">TaskContinuationOptions</see>. + /// </exception> + public Task ContinueWith(Action<Task<TResult>> continuationAction, TaskContinuationOptions continuationOptions) + { + return ContinueWith(continuationAction, TaskScheduler.Current, default, continuationOptions); + } + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <param name="continuationAction"> + /// An action to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task as an argument. + /// </param> + /// <param name="cancellationToken">The <see cref="CancellationToken"/> that will be assigned to the new continuation task.</param> + /// <param name="continuationOptions"> + /// Options for when the continuation is scheduled and how it behaves. This includes criteria, such + /// as <see + /// cref="System.Threading.Tasks.TaskContinuationOptions.OnlyOnCanceled">OnlyOnCanceled</see>, as + /// well as execution options, such as <see + /// cref="System.Threading.Tasks.TaskContinuationOptions.ExecuteSynchronously">ExecuteSynchronously</see>. + /// </param> + /// <param name="scheduler"> + /// The <see cref="TaskScheduler"/> to associate with the continuation task and to use for its + /// execution. + /// </param> + /// <returns>A new continuation <see cref="Task"/>.</returns> + /// <remarks> + /// The returned <see cref="Task"/> will not be scheduled for execution until the current task has + /// completed. If the criteria specified through the <paramref name="continuationOptions"/> parameter + /// are not met, the continuation task will be canceled instead of scheduled. + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationAction"/> argument is null. + /// </exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// The <paramref name="continuationOptions"/> argument specifies an invalid value for <see + /// cref="T:System.Threading.Tasks.TaskContinuationOptions">TaskContinuationOptions</see>. + /// </exception> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="scheduler"/> argument is null. + /// </exception> + /// <exception cref="T:System.ObjectDisposedException">The provided <see cref="System.Threading.CancellationToken">CancellationToken</see> + /// has already been disposed. + /// </exception> + public Task ContinueWith(Action<Task<TResult>> continuationAction, CancellationToken cancellationToken, + TaskContinuationOptions continuationOptions, TaskScheduler scheduler) + { + return ContinueWith(continuationAction, scheduler, cancellationToken, continuationOptions); + } + + // Same as the above overload, only with a stack mark. + internal Task ContinueWith(Action<Task<TResult>> continuationAction, TaskScheduler scheduler, CancellationToken cancellationToken, + TaskContinuationOptions continuationOptions) + { + if (continuationAction == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.continuationAction); + } + + if (scheduler == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.scheduler); + } + + TaskCreationOptions creationOptions; + InternalTaskOptions internalOptions; + CreationOptionsFromContinuationOptions( + continuationOptions, + out creationOptions, + out internalOptions); + + Task continuationTask = new ContinuationTaskFromResultTask<TResult>( + this, continuationAction, null, + creationOptions, internalOptions + ); + + // Register the continuation. If synchronous execution is requested, this may + // actually invoke the continuation before returning. + ContinueWithCore(continuationTask, scheduler, cancellationToken, continuationOptions); + + return continuationTask; + } + #endregion + + #region Action<Task<TResult>, Object> continuations + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <param name="continuationAction"> + /// An action to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task and the caller-supplied state object as arguments. + /// </param> + /// <param name="state">An object representing data to be used by the continuation action.</param> + /// <returns>A new continuation <see cref="Task"/>.</returns> + /// <remarks> + /// The returned <see cref="Task"/> will not be scheduled for execution until the current task has + /// completed, whether it completes due to running to completion successfully, faulting due to an + /// unhandled exception, or exiting out early due to being canceled. + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationAction"/> argument is null. + /// </exception> + public Task ContinueWith(Action<Task<TResult>, object> continuationAction, object state) + { + return ContinueWith(continuationAction, state, TaskScheduler.Current, default, TaskContinuationOptions.None); + } + + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <param name="continuationAction"> + /// An action to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task and the caller-supplied state object as arguments. + /// </param> + /// <param name="state">An object representing data to be used by the continuation action.</param> + /// <param name="cancellationToken">The <see cref="CancellationToken"/> that will be assigned to the new continuation task.</param> + /// <returns>A new continuation <see cref="Task"/>.</returns> + /// <remarks> + /// The returned <see cref="Task"/> will not be scheduled for execution until the current task has + /// completed, whether it completes due to running to completion successfully, faulting due to an + /// unhandled exception, or exiting out early due to being canceled. + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationAction"/> argument is null. + /// </exception> + /// <exception cref="T:System.ObjectDisposedException">The provided <see cref="System.Threading.CancellationToken">CancellationToken</see> + /// has already been disposed. + /// </exception> + public Task ContinueWith(Action<Task<TResult>, object> continuationAction, object state, CancellationToken cancellationToken) + { + return ContinueWith(continuationAction, state, TaskScheduler.Current, cancellationToken, TaskContinuationOptions.None); + } + + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <param name="continuationAction"> + /// An action to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task and the caller-supplied state object as arguments. + /// </param> + /// <param name="state">An object representing data to be used by the continuation action.</param> + /// <param name="scheduler"> + /// The <see cref="TaskScheduler"/> to associate with the continuation task and to use for its execution. + /// </param> + /// <returns>A new continuation <see cref="Task"/>.</returns> + /// <remarks> + /// The returned <see cref="Task"/> will not be scheduled for execution until the current task has + /// completed, whether it completes due to running to completion successfully, faulting due to an + /// unhandled exception, or exiting out early due to being canceled. + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationAction"/> argument is null. + /// </exception> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="scheduler"/> argument is null. + /// </exception> + public Task ContinueWith(Action<Task<TResult>, object> continuationAction, object state, TaskScheduler scheduler) + { + return ContinueWith(continuationAction, state, scheduler, default, TaskContinuationOptions.None); + } + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <param name="continuationAction"> + /// An action to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task and the caller-supplied state object as arguments. + /// </param> + /// <param name="state">An object representing data to be used by the continuation action.</param> + /// <param name="continuationOptions"> + /// Options for when the continuation is scheduled and how it behaves. This includes criteria, such + /// as <see + /// cref="System.Threading.Tasks.TaskContinuationOptions.OnlyOnCanceled">OnlyOnCanceled</see>, as + /// well as execution options, such as <see + /// cref="System.Threading.Tasks.TaskContinuationOptions.ExecuteSynchronously">ExecuteSynchronously</see>. + /// </param> + /// <returns>A new continuation <see cref="Task"/>.</returns> + /// <remarks> + /// The returned <see cref="Task"/> will not be scheduled for execution until the current task has + /// completed. If the continuation criteria specified through the <paramref + /// name="continuationOptions"/> parameter are not met, the continuation task will be canceled + /// instead of scheduled. + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationAction"/> argument is null. + /// </exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// The <paramref name="continuationOptions"/> argument specifies an invalid value for <see + /// cref="T:System.Threading.Tasks.TaskContinuationOptions">TaskContinuationOptions</see>. + /// </exception> + public Task ContinueWith(Action<Task<TResult>, object> continuationAction, object state, TaskContinuationOptions continuationOptions) + { + return ContinueWith(continuationAction, state, TaskScheduler.Current, default, continuationOptions); + } + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <param name="continuationAction"> + /// An action to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task and the caller-supplied state object as arguments. + /// </param> + /// <param name="state">An object representing data to be used by the continuation action.</param> + /// <param name="cancellationToken">The <see cref="CancellationToken"/> that will be assigned to the new continuation task.</param> + /// <param name="continuationOptions"> + /// Options for when the continuation is scheduled and how it behaves. This includes criteria, such + /// as <see + /// cref="System.Threading.Tasks.TaskContinuationOptions.OnlyOnCanceled">OnlyOnCanceled</see>, as + /// well as execution options, such as <see + /// cref="System.Threading.Tasks.TaskContinuationOptions.ExecuteSynchronously">ExecuteSynchronously</see>. + /// </param> + /// <param name="scheduler"> + /// The <see cref="TaskScheduler"/> to associate with the continuation task and to use for its + /// execution. + /// </param> + /// <returns>A new continuation <see cref="Task"/>.</returns> + /// <remarks> + /// The returned <see cref="Task"/> will not be scheduled for execution until the current task has + /// completed. If the criteria specified through the <paramref name="continuationOptions"/> parameter + /// are not met, the continuation task will be canceled instead of scheduled. + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationAction"/> argument is null. + /// </exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// The <paramref name="continuationOptions"/> argument specifies an invalid value for <see + /// cref="T:System.Threading.Tasks.TaskContinuationOptions">TaskContinuationOptions</see>. + /// </exception> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="scheduler"/> argument is null. + /// </exception> + /// <exception cref="T:System.ObjectDisposedException">The provided <see cref="System.Threading.CancellationToken">CancellationToken</see> + /// has already been disposed. + /// </exception> + public Task ContinueWith(Action<Task<TResult>, object> continuationAction, object state, CancellationToken cancellationToken, + TaskContinuationOptions continuationOptions, TaskScheduler scheduler) + { + return ContinueWith(continuationAction, state, scheduler, cancellationToken, continuationOptions); + } + + // Same as the above overload, only with a stack mark. + internal Task ContinueWith(Action<Task<TResult>, object> continuationAction, object state, TaskScheduler scheduler, CancellationToken cancellationToken, + TaskContinuationOptions continuationOptions) + { + if (continuationAction == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.continuationAction); + } + + if (scheduler == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.scheduler); + } + + TaskCreationOptions creationOptions; + InternalTaskOptions internalOptions; + CreationOptionsFromContinuationOptions( + continuationOptions, + out creationOptions, + out internalOptions); + + Task continuationTask = new ContinuationTaskFromResultTask<TResult>( + this, continuationAction, state, + creationOptions, internalOptions + ); + + // Register the continuation. If synchronous execution is requested, this may + // actually invoke the continuation before returning. + ContinueWithCore(continuationTask, scheduler, cancellationToken, continuationOptions); + + return continuationTask; + } + + #endregion + + #region Func<Task<TResult>,TNewResult> continuations + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <typeparam name="TNewResult"> + /// The type of the result produced by the continuation. + /// </typeparam> + /// <param name="continuationFunction"> + /// A function to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task as an argument. + /// </param> + /// <returns>A new continuation <see cref="Task{TNewResult}"/>.</returns> + /// <remarks> + /// The returned <see cref="Task{TNewResult}"/> will not be scheduled for execution until the current + /// task has completed, whether it completes due to running to completion successfully, faulting due + /// to an unhandled exception, or exiting out early due to being canceled. + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationFunction"/> argument is null. + /// </exception> + public Task<TNewResult> ContinueWith<TNewResult>(Func<Task<TResult>, TNewResult> continuationFunction) + { + return ContinueWith<TNewResult>(continuationFunction, TaskScheduler.Current, default, TaskContinuationOptions.None); + } + + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <typeparam name="TNewResult"> + /// The type of the result produced by the continuation. + /// </typeparam> + /// <param name="continuationFunction"> + /// A function to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task as an argument. + /// </param> + /// <param name="cancellationToken">The <see cref="CancellationToken"/> that will be assigned to the new task.</param> + /// <returns>A new continuation <see cref="Task{TNewResult}"/>.</returns> + /// <remarks> + /// The returned <see cref="Task{TNewResult}"/> will not be scheduled for execution until the current + /// task has completed, whether it completes due to running to completion successfully, faulting due + /// to an unhandled exception, or exiting out early due to being canceled. + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationFunction"/> argument is null. + /// </exception> + /// <exception cref="T:System.ObjectDisposedException">The provided <see cref="System.Threading.CancellationToken">CancellationToken</see> + /// has already been disposed. + /// </exception> + public Task<TNewResult> ContinueWith<TNewResult>(Func<Task<TResult>, TNewResult> continuationFunction, CancellationToken cancellationToken) + { + return ContinueWith<TNewResult>(continuationFunction, TaskScheduler.Current, cancellationToken, TaskContinuationOptions.None); + } + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <typeparam name="TNewResult"> + /// The type of the result produced by the continuation. + /// </typeparam> + /// <param name="continuationFunction"> + /// A function to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task as an argument. + /// </param> + /// <param name="scheduler"> + /// The <see cref="TaskScheduler"/> to associate with the continuation task and to use for its execution. + /// </param> + /// <returns>A new continuation <see cref="Task{TNewResult}"/>.</returns> + /// <remarks> + /// The returned <see cref="Task{TNewResult}"/> will not be scheduled for execution until the current task has + /// completed, whether it completes due to running to completion successfully, faulting due to an + /// unhandled exception, or exiting out early due to being canceled. + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationFunction"/> argument is null. + /// </exception> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="scheduler"/> argument is null. + /// </exception> + public Task<TNewResult> ContinueWith<TNewResult>(Func<Task<TResult>, TNewResult> continuationFunction, TaskScheduler scheduler) + { + return ContinueWith<TNewResult>(continuationFunction, scheduler, default, TaskContinuationOptions.None); + } + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <typeparam name="TNewResult"> + /// The type of the result produced by the continuation. + /// </typeparam> + /// <param name="continuationFunction"> + /// A function to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task as an argument. + /// </param> + /// <param name="continuationOptions"> + /// Options for when the continuation is scheduled and how it behaves. This includes criteria, such + /// as <see + /// cref="System.Threading.Tasks.TaskContinuationOptions.OnlyOnCanceled">OnlyOnCanceled</see>, as + /// well as execution options, such as <see + /// cref="System.Threading.Tasks.TaskContinuationOptions.ExecuteSynchronously">ExecuteSynchronously</see>. + /// </param> + /// <returns>A new continuation <see cref="Task{TNewResult}"/>.</returns> + /// <remarks> + /// <para> + /// The returned <see cref="Task{TNewResult}"/> will not be scheduled for execution until the current + /// task has completed, whether it completes due to running to completion successfully, faulting due + /// to an unhandled exception, or exiting out early due to being canceled. + /// </para> + /// <para> + /// The <paramref name="continuationFunction"/>, when executed, should return a <see + /// cref="Task{TNewResult}"/>. This task's completion state will be transferred to the task returned + /// from the ContinueWith call. + /// </para> + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationFunction"/> argument is null. + /// </exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// The <paramref name="continuationOptions"/> argument specifies an invalid value for <see + /// cref="T:System.Threading.Tasks.TaskContinuationOptions">TaskContinuationOptions</see>. + /// </exception> + public Task<TNewResult> ContinueWith<TNewResult>(Func<Task<TResult>, TNewResult> continuationFunction, TaskContinuationOptions continuationOptions) + { + return ContinueWith<TNewResult>(continuationFunction, TaskScheduler.Current, default, continuationOptions); + } + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <typeparam name="TNewResult"> + /// The type of the result produced by the continuation. + /// </typeparam> + /// <param name="continuationFunction"> + /// A function to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be passed as + /// an argument this completed task. + /// </param> + /// <param name="cancellationToken">The <see cref="CancellationToken"/> that will be assigned to the new task.</param> + /// <param name="continuationOptions"> + /// Options for when the continuation is scheduled and how it behaves. This includes criteria, such + /// as <see + /// cref="System.Threading.Tasks.TaskContinuationOptions.OnlyOnCanceled">OnlyOnCanceled</see>, as + /// well as execution options, such as <see + /// cref="System.Threading.Tasks.TaskContinuationOptions.ExecuteSynchronously">ExecuteSynchronously</see>. + /// </param> + /// <param name="scheduler"> + /// The <see cref="TaskScheduler"/> to associate with the continuation task and to use for its + /// execution. + /// </param> + /// <returns>A new continuation <see cref="Task{TNewResult}"/>.</returns> + /// <remarks> + /// <para> + /// The returned <see cref="Task{TNewResult}"/> will not be scheduled for execution until the current task has + /// completed, whether it completes due to running to completion successfully, faulting due to an + /// unhandled exception, or exiting out early due to being canceled. + /// </para> + /// <para> + /// The <paramref name="continuationFunction"/>, when executed, should return a <see cref="Task{TNewResult}"/>. + /// This task's completion state will be transferred to the task returned from the + /// ContinueWith call. + /// </para> + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationFunction"/> argument is null. + /// </exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// The <paramref name="continuationOptions"/> argument specifies an invalid value for <see + /// cref="T:System.Threading.Tasks.TaskContinuationOptions">TaskContinuationOptions</see>. + /// </exception> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="scheduler"/> argument is null. + /// </exception> + /// <exception cref="T:System.ObjectDisposedException">The provided <see cref="System.Threading.CancellationToken">CancellationToken</see> + /// has already been disposed. + /// </exception> + public Task<TNewResult> ContinueWith<TNewResult>(Func<Task<TResult>, TNewResult> continuationFunction, CancellationToken cancellationToken, + TaskContinuationOptions continuationOptions, TaskScheduler scheduler) + { + return ContinueWith<TNewResult>(continuationFunction, scheduler, cancellationToken, continuationOptions); + } + + // Same as the above overload, just with a stack mark. + internal Task<TNewResult> ContinueWith<TNewResult>(Func<Task<TResult>, TNewResult> continuationFunction, TaskScheduler scheduler, + CancellationToken cancellationToken, TaskContinuationOptions continuationOptions) + { + if (continuationFunction == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.continuationFunction); + } + + if (scheduler == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.scheduler); + } + + TaskCreationOptions creationOptions; + InternalTaskOptions internalOptions; + CreationOptionsFromContinuationOptions( + continuationOptions, + out creationOptions, + out internalOptions); + + Task<TNewResult> continuationFuture = new ContinuationResultTaskFromResultTask<TResult, TNewResult>( + this, continuationFunction, null, + creationOptions, internalOptions + ); + + // Register the continuation. If synchronous execution is requested, this may + // actually invoke the continuation before returning. + ContinueWithCore(continuationFuture, scheduler, cancellationToken, continuationOptions); + + return continuationFuture; + } + #endregion + + #region Func<Task<TResult>, Object,TNewResult> continuations + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <typeparam name="TNewResult"> + /// The type of the result produced by the continuation. + /// </typeparam> + /// <param name="continuationFunction"> + /// A function to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task and the caller-supplied state object as arguments. + /// </param> + /// <param name="state">An object representing data to be used by the continuation function.</param> + /// <returns>A new continuation <see cref="Task{TNewResult}"/>.</returns> + /// <remarks> + /// The returned <see cref="Task{TNewResult}"/> will not be scheduled for execution until the current + /// task has completed, whether it completes due to running to completion successfully, faulting due + /// to an unhandled exception, or exiting out early due to being canceled. + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationFunction"/> argument is null. + /// </exception> + public Task<TNewResult> ContinueWith<TNewResult>(Func<Task<TResult>, object, TNewResult> continuationFunction, object state) + { + return ContinueWith<TNewResult>(continuationFunction, state, TaskScheduler.Current, default, TaskContinuationOptions.None); + } + + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <typeparam name="TNewResult"> + /// The type of the result produced by the continuation. + /// </typeparam> + /// <param name="continuationFunction"> + /// A function to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task and the caller-supplied state object as arguments. + /// </param> + /// <param name="state">An object representing data to be used by the continuation function.</param> + /// <param name="cancellationToken">The <see cref="CancellationToken"/> that will be assigned to the new task.</param> + /// <returns>A new continuation <see cref="Task{TNewResult}"/>.</returns> + /// <remarks> + /// The returned <see cref="Task{TNewResult}"/> will not be scheduled for execution until the current + /// task has completed, whether it completes due to running to completion successfully, faulting due + /// to an unhandled exception, or exiting out early due to being canceled. + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationFunction"/> argument is null. + /// </exception> + /// <exception cref="T:System.ObjectDisposedException">The provided <see cref="System.Threading.CancellationToken">CancellationToken</see> + /// has already been disposed. + /// </exception> + public Task<TNewResult> ContinueWith<TNewResult>(Func<Task<TResult>, object, TNewResult> continuationFunction, object state, + CancellationToken cancellationToken) + { + return ContinueWith<TNewResult>(continuationFunction, state, TaskScheduler.Current, cancellationToken, TaskContinuationOptions.None); + } + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <typeparam name="TNewResult"> + /// The type of the result produced by the continuation. + /// </typeparam> + /// <param name="continuationFunction"> + /// A function to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task and the caller-supplied state object as arguments. + /// </param> + /// <param name="state">An object representing data to be used by the continuation function.</param> + /// <param name="scheduler"> + /// The <see cref="TaskScheduler"/> to associate with the continuation task and to use for its execution. + /// </param> + /// <returns>A new continuation <see cref="Task{TNewResult}"/>.</returns> + /// <remarks> + /// The returned <see cref="Task{TNewResult}"/> will not be scheduled for execution until the current task has + /// completed, whether it completes due to running to completion successfully, faulting due to an + /// unhandled exception, or exiting out early due to being canceled. + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationFunction"/> argument is null. + /// </exception> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="scheduler"/> argument is null. + /// </exception> + public Task<TNewResult> ContinueWith<TNewResult>(Func<Task<TResult>, object, TNewResult> continuationFunction, object state, + TaskScheduler scheduler) + { + return ContinueWith<TNewResult>(continuationFunction, state, scheduler, default, TaskContinuationOptions.None); + } + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <typeparam name="TNewResult"> + /// The type of the result produced by the continuation. + /// </typeparam> + /// <param name="continuationFunction"> + /// A function to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task and the caller-supplied state object as arguments. + /// </param> + /// <param name="state">An object representing data to be used by the continuation function.</param> + /// <param name="continuationOptions"> + /// Options for when the continuation is scheduled and how it behaves. This includes criteria, such + /// as <see + /// cref="System.Threading.Tasks.TaskContinuationOptions.OnlyOnCanceled">OnlyOnCanceled</see>, as + /// well as execution options, such as <see + /// cref="System.Threading.Tasks.TaskContinuationOptions.ExecuteSynchronously">ExecuteSynchronously</see>. + /// </param> + /// <returns>A new continuation <see cref="Task{TNewResult}"/>.</returns> + /// <remarks> + /// <para> + /// The returned <see cref="Task{TNewResult}"/> will not be scheduled for execution until the current + /// task has completed, whether it completes due to running to completion successfully, faulting due + /// to an unhandled exception, or exiting out early due to being canceled. + /// </para> + /// <para> + /// The <paramref name="continuationFunction"/>, when executed, should return a <see + /// cref="Task{TNewResult}"/>. This task's completion state will be transferred to the task returned + /// from the ContinueWith call. + /// </para> + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationFunction"/> argument is null. + /// </exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// The <paramref name="continuationOptions"/> argument specifies an invalid value for <see + /// cref="T:System.Threading.Tasks.TaskContinuationOptions">TaskContinuationOptions</see>. + /// </exception> + public Task<TNewResult> ContinueWith<TNewResult>(Func<Task<TResult>, object, TNewResult> continuationFunction, object state, + TaskContinuationOptions continuationOptions) + { + return ContinueWith<TNewResult>(continuationFunction, state, TaskScheduler.Current, default, continuationOptions); + } + + /// <summary> + /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes. + /// </summary> + /// <typeparam name="TNewResult"> + /// The type of the result produced by the continuation. + /// </typeparam> + /// <param name="continuationFunction"> + /// A function to run when the <see cref="Task{TResult}"/> completes. When run, the delegate will be + /// passed the completed task and the caller-supplied state object as arguments. + /// </param> + /// <param name="state">An object representing data to be used by the continuation function.</param> + /// <param name="cancellationToken">The <see cref="CancellationToken"/> that will be assigned to the new task.</param> + /// <param name="continuationOptions"> + /// Options for when the continuation is scheduled and how it behaves. This includes criteria, such + /// as <see + /// cref="System.Threading.Tasks.TaskContinuationOptions.OnlyOnCanceled">OnlyOnCanceled</see>, as + /// well as execution options, such as <see + /// cref="System.Threading.Tasks.TaskContinuationOptions.ExecuteSynchronously">ExecuteSynchronously</see>. + /// </param> + /// <param name="scheduler"> + /// The <see cref="TaskScheduler"/> to associate with the continuation task and to use for its + /// execution. + /// </param> + /// <returns>A new continuation <see cref="Task{TNewResult}"/>.</returns> + /// <remarks> + /// <para> + /// The returned <see cref="Task{TNewResult}"/> will not be scheduled for execution until the current task has + /// completed, whether it completes due to running to completion successfully, faulting due to an + /// unhandled exception, or exiting out early due to being canceled. + /// </para> + /// <para> + /// The <paramref name="continuationFunction"/>, when executed, should return a <see cref="Task{TNewResult}"/>. + /// This task's completion state will be transferred to the task returned from the + /// ContinueWith call. + /// </para> + /// </remarks> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="continuationFunction"/> argument is null. + /// </exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// The <paramref name="continuationOptions"/> argument specifies an invalid value for <see + /// cref="T:System.Threading.Tasks.TaskContinuationOptions">TaskContinuationOptions</see>. + /// </exception> + /// <exception cref="T:System.ArgumentNullException"> + /// The <paramref name="scheduler"/> argument is null. + /// </exception> + /// <exception cref="T:System.ObjectDisposedException">The provided <see cref="System.Threading.CancellationToken">CancellationToken</see> + /// has already been disposed. + /// </exception> + public Task<TNewResult> ContinueWith<TNewResult>(Func<Task<TResult>, object, TNewResult> continuationFunction, object state, + CancellationToken cancellationToken, TaskContinuationOptions continuationOptions, TaskScheduler scheduler) + { + return ContinueWith<TNewResult>(continuationFunction, state, scheduler, cancellationToken, continuationOptions); + } + + // Same as the above overload, just with a stack mark. + internal Task<TNewResult> ContinueWith<TNewResult>(Func<Task<TResult>, object, TNewResult> continuationFunction, object state, + TaskScheduler scheduler, CancellationToken cancellationToken, TaskContinuationOptions continuationOptions) + { + if (continuationFunction == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.continuationFunction); + } + + if (scheduler == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.scheduler); + } + + TaskCreationOptions creationOptions; + InternalTaskOptions internalOptions; + CreationOptionsFromContinuationOptions( + continuationOptions, + out creationOptions, + out internalOptions); + + Task<TNewResult> continuationFuture = new ContinuationResultTaskFromResultTask<TResult, TNewResult>( + this, continuationFunction, state, + creationOptions, internalOptions + ); + + // Register the continuation. If synchronous execution is requested, this may + // actually invoke the continuation before returning. + ContinueWithCore(continuationFuture, scheduler, cancellationToken, continuationOptions); + + return continuationFuture; + } + + #endregion + + #endregion + } + + // Proxy class for better debugging experience + internal class SystemThreadingTasks_FutureDebugView<TResult> + { + private Task<TResult> m_task; + + public SystemThreadingTasks_FutureDebugView(Task<TResult> task) + { + m_task = task; + } + + public TResult Result { get { return m_task.Status == TaskStatus.RanToCompletion ? m_task.Result : default; } } + public object AsyncState { get { return m_task.AsyncState; } } + public TaskCreationOptions CreationOptions { get { return m_task.CreationOptions; } } + public Exception Exception { get { return m_task.Exception; } } + public int Id { get { return m_task.Id; } } + public bool CancellationPending { get { return (m_task.Status == TaskStatus.WaitingToRun) && m_task.CancellationToken.IsCancellationRequested; } } + public TaskStatus Status { get { return m_task.Status; } } + } +} diff --git a/src/System.Private.CoreLib/shared/System/Threading/Tasks/ProducerConsumerQueues.cs b/src/System.Private.CoreLib/shared/System/Threading/Tasks/ProducerConsumerQueues.cs new file mode 100644 index 0000000000..1880288c51 --- /dev/null +++ b/src/System.Private.CoreLib/shared/System/Threading/Tasks/ProducerConsumerQueues.cs @@ -0,0 +1,368 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// +// +// +// Specialized producer/consumer queues. +// +// +// ************<IMPORTANT NOTE>************* +// +// src\ndp\clr\src\bcl\system\threading\tasks\producerConsumerQueue.cs +// src\ndp\fx\src\dataflow\system\threading\tasks\dataflow\internal\producerConsumerQueue.cs +// Keep both of them consistent by changing the other file when you change this one, also avoid: +// 1- To reference interneal types in mscorlib +// 2- To reference any dataflow specific types +// This should be fixed post Dev11 when this class becomes public. +// +// ************</IMPORTANT NOTE>************* +// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + +using System.Collections; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Runtime.InteropServices; + +namespace System.Threading.Tasks +{ + /// <summary>Represents a producer/consumer queue used internally by dataflow blocks.</summary> + /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam> + internal interface IProducerConsumerQueue<T> : IEnumerable<T> + { + /// <summary>Enqueues an item into the queue.</summary> + /// <param name="item">The item to enqueue.</param> + /// <remarks>This method is meant to be thread-safe subject to the particular nature of the implementation.</remarks> + void Enqueue(T item); + + /// <summary>Attempts to dequeue an item from the queue.</summary> + /// <param name="result">The dequeued item.</param> + /// <returns>true if an item could be dequeued; otherwise, false.</returns> + /// <remarks>This method is meant to be thread-safe subject to the particular nature of the implementation.</remarks> + bool TryDequeue(out T result); + + /// <summary>Gets whether the collection is currently empty.</summary> + /// <remarks>This method may or may not be thread-safe.</remarks> + bool IsEmpty { get; } + + /// <summary>Gets the number of items in the collection.</summary> + /// <remarks>In many implementations, this method will not be thread-safe.</remarks> + int Count { get; } + } + + /// <summary> + /// Provides a producer/consumer queue safe to be used by any number of producers and consumers concurrently. + /// </summary> + /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam> + [DebuggerDisplay("Count = {Count}")] + internal sealed class MultiProducerMultiConsumerQueue<T> : ConcurrentQueue<T>, IProducerConsumerQueue<T> + { + /// <summary>Enqueues an item into the queue.</summary> + /// <param name="item">The item to enqueue.</param> + void IProducerConsumerQueue<T>.Enqueue(T item) { base.Enqueue(item); } + + /// <summary>Attempts to dequeue an item from the queue.</summary> + /// <param name="result">The dequeued item.</param> + /// <returns>true if an item could be dequeued; otherwise, false.</returns> + bool IProducerConsumerQueue<T>.TryDequeue(out T result) { return base.TryDequeue(out result); } + + /// <summary>Gets whether the collection is currently empty.</summary> + bool IProducerConsumerQueue<T>.IsEmpty { get { return base.IsEmpty; } } + + /// <summary>Gets the number of items in the collection.</summary> + int IProducerConsumerQueue<T>.Count { get { return base.Count; } } + } + + /// <summary> + /// Provides a producer/consumer queue safe to be used by only one producer and one consumer concurrently. + /// </summary> + /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam> + [DebuggerDisplay("Count = {Count}")] + [DebuggerTypeProxy(typeof(SingleProducerSingleConsumerQueue<>.SingleProducerSingleConsumerQueue_DebugView))] + internal sealed class SingleProducerSingleConsumerQueue<T> : IProducerConsumerQueue<T> + { + // Design: + // + // SingleProducerSingleConsumerQueue (SPSCQueue) is a concurrent queue designed to be used + // by one producer thread and one consumer thread. SPSCQueue does not work correctly when used by + // multiple producer threads concurrently or multiple consumer threads concurrently. + // + // SPSCQueue is based on segments that behave like circular buffers. Each circular buffer is represented + // as an array with two indexes: m_first and m_last. m_first is the index of the array slot for the consumer + // to read next, and m_last is the slot for the producer to write next. The circular buffer is empty when + // (m_first == m_last), and full when ((m_last+1) % m_array.Length == m_first). + // + // Since m_first is only ever modified by the consumer thread and m_last by the producer, the two indices can + // be updated without interlocked operations. As long as the queue size fits inside a single circular buffer, + // enqueues and dequeues simply advance the corresponding indices around the circular buffer. If an enqueue finds + // that there is no room in the existing buffer, however, a new circular buffer is allocated that is twice as big + // as the old buffer. From then on, the producer will insert values into the new buffer. The consumer will first + // empty out the old buffer and only then follow the producer into the new (larger) buffer. + // + // As described above, the enqueue operation on the fast path only modifies the m_first field of the current segment. + // However, it also needs to read m_last in order to verify that there is room in the current segment. Similarly, the + // dequeue operation on the fast path only needs to modify m_last, but also needs to read m_first to verify that the + // queue is non-empty. This results in true cache line sharing between the producer and the consumer. + // + // The cache line sharing issue can be mitigating by having a possibly stale copy of m_first that is owned by the producer, + // and a possibly stale copy of m_last that is owned by the consumer. So, the consumer state is described using + // (m_first, m_lastCopy) and the producer state using (m_firstCopy, m_last). The consumer state is separated from + // the producer state by padding, which allows fast-path enqueues and dequeues from hitting shared cache lines. + // m_lastCopy is the consumer's copy of m_last. Whenever the consumer can tell that there is room in the buffer + // simply by observing m_lastCopy, the consumer thread does not need to read m_last and thus encounter a cache miss. Only + // when the buffer appears to be empty will the consumer refresh m_lastCopy from m_last. m_firstCopy is used by the producer + // in the same way to avoid reading m_first on the hot path. + + /// <summary>The initial size to use for segments (in number of elements).</summary> + private const int INIT_SEGMENT_SIZE = 32; // must be a power of 2 + /// <summary>The maximum size to use for segments (in number of elements).</summary> + private const int MAX_SEGMENT_SIZE = 0x1000000; // this could be made as large as int.MaxValue / 2 + + /// <summary>The head of the linked list of segments.</summary> + private volatile Segment m_head; + /// <summary>The tail of the linked list of segments.</summary> + private volatile Segment m_tail; + + /// <summary>Initializes the queue.</summary> + internal SingleProducerSingleConsumerQueue() + { + // Validate constants in ctor rather than in an explicit cctor that would cause perf degradation + Debug.Assert(INIT_SEGMENT_SIZE > 0, "Initial segment size must be > 0."); + Debug.Assert((INIT_SEGMENT_SIZE & (INIT_SEGMENT_SIZE - 1)) == 0, "Initial segment size must be a power of 2"); + Debug.Assert(INIT_SEGMENT_SIZE <= MAX_SEGMENT_SIZE, "Initial segment size should be <= maximum."); + Debug.Assert(MAX_SEGMENT_SIZE < int.MaxValue / 2, "Max segment size * 2 must be < int.MaxValue, or else overflow could occur."); + + // Initialize the queue + m_head = m_tail = new Segment(INIT_SEGMENT_SIZE); + } + + /// <summary>Enqueues an item into the queue.</summary> + /// <param name="item">The item to enqueue.</param> + public void Enqueue(T item) + { + Segment segment = m_tail; + var array = segment.m_array; + int last = segment.m_state.m_last; // local copy to avoid multiple volatile reads + + // Fast path: there's obviously room in the current segment + int tail2 = (last + 1) & (array.Length - 1); + if (tail2 != segment.m_state.m_firstCopy) + { + array[last] = item; + segment.m_state.m_last = tail2; + } + // Slow path: there may not be room in the current segment. + else EnqueueSlow(item, ref segment); + } + + /// <summary>Enqueues an item into the queue.</summary> + /// <param name="item">The item to enqueue.</param> + /// <param name="segment">The segment in which to first attempt to store the item.</param> + private void EnqueueSlow(T item, ref Segment segment) + { + Debug.Assert(segment != null, "Expected a non-null segment."); + + if (segment.m_state.m_firstCopy != segment.m_state.m_first) + { + segment.m_state.m_firstCopy = segment.m_state.m_first; + Enqueue(item); // will only recur once for this enqueue operation + return; + } + + int newSegmentSize = m_tail.m_array.Length << 1; // double size + Debug.Assert(newSegmentSize > 0, "The max size should always be small enough that we don't overflow."); + if (newSegmentSize > MAX_SEGMENT_SIZE) newSegmentSize = MAX_SEGMENT_SIZE; + + var newSegment = new Segment(newSegmentSize); + newSegment.m_array[0] = item; + newSegment.m_state.m_last = 1; + newSegment.m_state.m_lastCopy = 1; + + try { } + finally + { + // Finally block to protect against corruption due to a thread abort + // between setting m_next and setting m_tail. + Volatile.Write(ref m_tail.m_next, newSegment); // ensure segment not published until item is fully stored + m_tail = newSegment; + } + } + + /// <summary>Attempts to dequeue an item from the queue.</summary> + /// <param name="result">The dequeued item.</param> + /// <returns>true if an item could be dequeued; otherwise, false.</returns> + public bool TryDequeue(out T result) + { + Segment segment = m_head; + var array = segment.m_array; + int first = segment.m_state.m_first; // local copy to avoid multiple volatile reads + + // Fast path: there's obviously data available in the current segment + if (first != segment.m_state.m_lastCopy) + { + result = array[first]; + array[first] = default; // Clear the slot to release the element + segment.m_state.m_first = (first + 1) & (array.Length - 1); + return true; + } + // Slow path: there may not be data available in the current segment + else return TryDequeueSlow(ref segment, ref array, out result); + } + + /// <summary>Attempts to dequeue an item from the queue.</summary> + /// <param name="array">The array from which the item was dequeued.</param> + /// <param name="segment">The segment from which the item was dequeued.</param> + /// <param name="result">The dequeued item.</param> + /// <returns>true if an item could be dequeued; otherwise, false.</returns> + private bool TryDequeueSlow(ref Segment segment, ref T[] array, out T result) + { + Debug.Assert(segment != null, "Expected a non-null segment."); + Debug.Assert(array != null, "Expected a non-null item array."); + + if (segment.m_state.m_last != segment.m_state.m_lastCopy) + { + segment.m_state.m_lastCopy = segment.m_state.m_last; + return TryDequeue(out result); // will only recur once for this dequeue operation + } + + if (segment.m_next != null && segment.m_state.m_first == segment.m_state.m_last) + { + segment = segment.m_next; + array = segment.m_array; + m_head = segment; + } + + var first = segment.m_state.m_first; // local copy to avoid extraneous volatile reads + + if (first == segment.m_state.m_last) + { + result = default; + return false; + } + + result = array[first]; + array[first] = default; // Clear the slot to release the element + segment.m_state.m_first = (first + 1) & (segment.m_array.Length - 1); + segment.m_state.m_lastCopy = segment.m_state.m_last; // Refresh m_lastCopy to ensure that m_first has not passed m_lastCopy + + return true; + } + + /// <summary>Gets whether the collection is currently empty.</summary> + /// <remarks>WARNING: This should not be used concurrently without further vetting.</remarks> + public bool IsEmpty + { + // This implementation is optimized for calls from the consumer. + get + { + var head = m_head; + if (head.m_state.m_first != head.m_state.m_lastCopy) return false; // m_first is volatile, so the read of m_lastCopy cannot get reordered + if (head.m_state.m_first != head.m_state.m_last) return false; + return head.m_next == null; + } + } + + /// <summary>Gets an enumerable for the collection.</summary> + /// <remarks>WARNING: This should only be used for debugging purposes. It is not safe to be used concurrently.</remarks> + public IEnumerator<T> GetEnumerator() + { + for (Segment segment = m_head; segment != null; segment = segment.m_next) + { + for (int pt = segment.m_state.m_first; + pt != segment.m_state.m_last; + pt = (pt + 1) & (segment.m_array.Length - 1)) + { + yield return segment.m_array[pt]; + } + } + } + /// <summary>Gets an enumerable for the collection.</summary> + /// <remarks>WARNING: This should only be used for debugging purposes. It is not safe to be used concurrently.</remarks> + IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } + + /// <summary>Gets the number of items in the collection.</summary> + /// <remarks>WARNING: This should only be used for debugging purposes. It is not meant to be used concurrently.</remarks> + public int Count + { + get + { + int count = 0; + for (Segment segment = m_head; segment != null; segment = segment.m_next) + { + int arraySize = segment.m_array.Length; + int first, last; + while (true) // Count is not meant to be used concurrently, but this helps to avoid issues if it is + { + first = segment.m_state.m_first; + last = segment.m_state.m_last; + if (first == segment.m_state.m_first) break; + } + count += (last - first) & (arraySize - 1); + } + return count; + } + } + + /// <summary>A segment in the queue containing one or more items.</summary> + [StructLayout(LayoutKind.Sequential)] + private sealed class Segment + { + /// <summary>The next segment in the linked list of segments.</summary> + internal Segment m_next; + /// <summary>The data stored in this segment.</summary> + internal readonly T[] m_array; + /// <summary>Details about the segment.</summary> + internal SegmentState m_state; // separated out to enable StructLayout attribute to take effect + + /// <summary>Initializes the segment.</summary> + /// <param name="size">The size to use for this segment.</param> + internal Segment(int size) + { + Debug.Assert((size & (size - 1)) == 0, "Size must be a power of 2"); + m_array = new T[size]; + } + } + + /// <summary>Stores information about a segment.</summary> + [StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing + private struct SegmentState + { + /// <summary>Padding to reduce false sharing between the segment's array and m_first.</summary> + internal Internal.PaddingFor32 m_pad0; + + /// <summary>The index of the current head in the segment.</summary> + internal volatile int m_first; + /// <summary>A copy of the current tail index.</summary> + internal int m_lastCopy; // not volatile as read and written by the producer, except for IsEmpty, and there m_lastCopy is only read after reading the volatile m_first + + /// <summary>Padding to reduce false sharing between the first and last.</summary> + internal Internal.PaddingFor32 m_pad1; + + /// <summary>A copy of the current head index.</summary> + internal int m_firstCopy; // not voliatle as only read and written by the consumer thread + /// <summary>The index of the current tail in the segment.</summary> + internal volatile int m_last; + + /// <summary>Padding to reduce false sharing with the last and what's after the segment.</summary> + internal Internal.PaddingFor32 m_pad2; + } + + /// <summary>Debugger type proxy for a SingleProducerSingleConsumerQueue of T.</summary> + private sealed class SingleProducerSingleConsumerQueue_DebugView + { + /// <summary>The queue being visualized.</summary> + private readonly SingleProducerSingleConsumerQueue<T> m_queue; + + /// <summary>Initializes the debug view.</summary> + /// <param name="queue">The queue being debugged.</param> + public SingleProducerSingleConsumerQueue_DebugView(SingleProducerSingleConsumerQueue<T> queue) + { + Debug.Assert(queue != null, "Expected a non-null queue."); + m_queue = queue; + } + } + } +} diff --git a/src/System.Private.CoreLib/shared/System/Threading/Tasks/TaskExceptionHolder.cs b/src/System.Private.CoreLib/shared/System/Threading/Tasks/TaskExceptionHolder.cs new file mode 100644 index 0000000000..0037923bdb --- /dev/null +++ b/src/System.Private.CoreLib/shared/System/Threading/Tasks/TaskExceptionHolder.cs @@ -0,0 +1,317 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// +// +// +// An abstraction for holding and aggregating exceptions. +// +// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + +// Disable the "reference to volatile field not treated as volatile" error. +#pragma warning disable 0420 + +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Diagnostics; +using System.Runtime.ExceptionServices; + +namespace System.Threading.Tasks +{ + /// <summary> + /// An exception holder manages a list of exceptions for one particular task. + /// It offers the ability to aggregate, but more importantly, also offers intrinsic + /// support for propagating unhandled exceptions that are never observed. It does + /// this by aggregating and calling UnobservedTaskException event event if the holder + /// is ever GC'd without the holder's contents ever having been requested + /// (e.g. by a Task.Wait, Task.get_Exception, etc). + /// </summary> + internal class TaskExceptionHolder + { + /// <summary>The task with which this holder is associated.</summary> + private readonly Task m_task; + /// <summary> + /// The lazily-initialized list of faulting exceptions. Volatile + /// so that it may be read to determine whether any exceptions were stored. + /// </summary> + private volatile List<ExceptionDispatchInfo> m_faultExceptions; + /// <summary>An exception that triggered the task to cancel.</summary> + private ExceptionDispatchInfo m_cancellationException; + /// <summary>Whether the holder was "observed" and thus doesn't cause finalization behavior.</summary> + private volatile bool m_isHandled; + + /// <summary> + /// Creates a new holder; it will be registered for finalization. + /// </summary> + /// <param name="task">The task this holder belongs to.</param> + internal TaskExceptionHolder(Task task) + { + Debug.Assert(task != null, "Expected a non-null task."); + m_task = task; + } + + /// <summary> + /// A finalizer that repropagates unhandled exceptions. + /// </summary> + ~TaskExceptionHolder() + { + if (m_faultExceptions != null && !m_isHandled) + { + // We will only propagate if this is truly unhandled. The reason this could + // ever occur is somewhat subtle: if a Task's exceptions are observed in some + // other finalizer, and the Task was finalized before the holder, the holder + // will have been marked as handled before even getting here. + + // Publish the unobserved exception and allow users to observe it + AggregateException exceptionToThrow = new AggregateException( + SR.TaskExceptionHolder_UnhandledException, + m_faultExceptions); + UnobservedTaskExceptionEventArgs ueea = new UnobservedTaskExceptionEventArgs(exceptionToThrow); + TaskScheduler.PublishUnobservedTaskException(m_task, ueea); + } + } + + /// <summary>Gets whether the exception holder is currently storing any exceptions for faults.</summary> + internal bool ContainsFaultList { get { return m_faultExceptions != null; } } + + /// <summary> + /// Add an exception to the holder. This will ensure the holder is + /// in the proper state (handled/unhandled) depending on the list's contents. + /// </summary> + /// <param name="representsCancellation"> + /// Whether the exception represents a cancellation request (true) or a fault (false). + /// </param> + /// <param name="exceptionObject"> + /// An exception object (either an Exception, an ExceptionDispatchInfo, + /// an IEnumerable{Exception}, or an IEnumerable{ExceptionDispatchInfo}) + /// to add to the list. + /// </param> + /// <remarks> + /// Must be called under lock. + /// </remarks> + internal void Add(object exceptionObject, bool representsCancellation) + { + Debug.Assert(exceptionObject != null, "TaskExceptionHolder.Add(): Expected a non-null exceptionObject"); + Debug.Assert( + exceptionObject is Exception || exceptionObject is IEnumerable<Exception> || + exceptionObject is ExceptionDispatchInfo || exceptionObject is IEnumerable<ExceptionDispatchInfo>, + "TaskExceptionHolder.Add(): Expected Exception, IEnumerable<Exception>, ExceptionDispatchInfo, or IEnumerable<ExceptionDispatchInfo>"); + + if (representsCancellation) SetCancellationException(exceptionObject); + else AddFaultException(exceptionObject); + } + + /// <summary>Sets the cancellation exception.</summary> + /// <param name="exceptionObject">The cancellation exception.</param> + /// <remarks> + /// Must be called under lock. + /// </remarks> + private void SetCancellationException(object exceptionObject) + { + Debug.Assert(exceptionObject != null, "Expected exceptionObject to be non-null."); + + Debug.Assert(m_cancellationException == null, + "Expected SetCancellationException to be called only once."); + // Breaking this assumption will overwrite a previously OCE, + // and implies something may be wrong elsewhere, since there should only ever be one. + + Debug.Assert(m_faultExceptions == null, + "Expected SetCancellationException to be called before any faults were added."); + // Breaking this assumption shouldn't hurt anything here, but it implies something may be wrong elsewhere. + // If this changes, make sure to only conditionally mark as handled below. + + // Store the cancellation exception + var oce = exceptionObject as OperationCanceledException; + if (oce != null) + { + m_cancellationException = ExceptionDispatchInfo.Capture(oce); + } + else + { + var edi = exceptionObject as ExceptionDispatchInfo; + Debug.Assert(edi != null && edi.SourceException is OperationCanceledException, + "Expected an OCE or an EDI that contained an OCE"); + m_cancellationException = edi; + } + + // This is just cancellation, and there are no faults, so mark the holder as handled. + MarkAsHandled(false); + } + + /// <summary>Adds the exception to the fault list.</summary> + /// <param name="exceptionObject">The exception to store.</param> + /// <remarks> + /// Must be called under lock. + /// </remarks> + private void AddFaultException(object exceptionObject) + { + Debug.Assert(exceptionObject != null, "AddFaultException(): Expected a non-null exceptionObject"); + + // Initialize the exceptions list if necessary. The list should be non-null iff it contains exceptions. + var exceptions = m_faultExceptions; + if (exceptions == null) m_faultExceptions = exceptions = new List<ExceptionDispatchInfo>(1); + else Debug.Assert(exceptions.Count > 0, "Expected existing exceptions list to have > 0 exceptions."); + + // Handle Exception by capturing it into an ExceptionDispatchInfo and storing that + var exception = exceptionObject as Exception; + if (exception != null) + { + exceptions.Add(ExceptionDispatchInfo.Capture(exception)); + } + else + { + // Handle ExceptionDispatchInfo by storing it into the list + var edi = exceptionObject as ExceptionDispatchInfo; + if (edi != null) + { + exceptions.Add(edi); + } + else + { + // Handle enumerables of exceptions by capturing each of the contained exceptions into an EDI and storing it + var exColl = exceptionObject as IEnumerable<Exception>; + if (exColl != null) + { +#if DEBUG + int numExceptions = 0; +#endif + foreach (var exc in exColl) + { +#if DEBUG + Debug.Assert(exc != null, "No exceptions should be null"); + numExceptions++; +#endif + exceptions.Add(ExceptionDispatchInfo.Capture(exc)); + } +#if DEBUG + Debug.Assert(numExceptions > 0, "Collection should contain at least one exception."); +#endif + } + else + { + // Handle enumerables of EDIs by storing them directly + var ediColl = exceptionObject as IEnumerable<ExceptionDispatchInfo>; + if (ediColl != null) + { + exceptions.AddRange(ediColl); +#if DEBUG + Debug.Assert(exceptions.Count > 0, "There should be at least one dispatch info."); + foreach (var tmp in exceptions) + { + Debug.Assert(tmp != null, "No dispatch infos should be null"); + } +#endif + } + // Anything else is a programming error + else + { + throw new ArgumentException(SR.TaskExceptionHolder_UnknownExceptionType, nameof(exceptionObject)); + } + } + } + } + + if (exceptions.Count > 0) + MarkAsUnhandled(); + } + + /// <summary> + /// A private helper method that ensures the holder is considered + /// unhandled, i.e. it is registered for finalization. + /// </summary> + private void MarkAsUnhandled() + { + // If a thread partially observed this thread's exceptions, we + // should revert back to "not handled" so that subsequent exceptions + // must also be seen. Otherwise, some could go missing. We also need + // to reregister for finalization. + if (m_isHandled) + { + GC.ReRegisterForFinalize(this); + m_isHandled = false; + } + } + + /// <summary> + /// A private helper method that ensures the holder is considered + /// handled, i.e. it is not registered for finalization. + /// </summary> + /// <param name="calledFromFinalizer">Whether this is called from the finalizer thread.</param> + internal void MarkAsHandled(bool calledFromFinalizer) + { + if (!m_isHandled) + { + if (!calledFromFinalizer) + { + GC.SuppressFinalize(this); + } + + m_isHandled = true; + } + } + + /// <summary> + /// Allocates a new aggregate exception and adds the contents of the list to + /// it. By calling this method, the holder assumes exceptions to have been + /// "observed", such that the finalization check will be subsequently skipped. + /// </summary> + /// <param name="calledFromFinalizer">Whether this is being called from a finalizer.</param> + /// <param name="includeThisException">An extra exception to be included (optionally).</param> + /// <returns>The aggregate exception to throw.</returns> + internal AggregateException CreateExceptionObject(bool calledFromFinalizer, Exception includeThisException) + { + var exceptions = m_faultExceptions; + Debug.Assert(exceptions != null, "Expected an initialized list."); + Debug.Assert(exceptions.Count > 0, "Expected at least one exception."); + + // Mark as handled and aggregate the exceptions. + MarkAsHandled(calledFromFinalizer); + + // If we're only including the previously captured exceptions, + // return them immediately in an aggregate. + if (includeThisException == null) + return new AggregateException(exceptions); + + // Otherwise, the caller wants a specific exception to be included, + // so return an aggregate containing that exception and the rest. + Exception[] combinedExceptions = new Exception[exceptions.Count + 1]; + for (int i = 0; i < combinedExceptions.Length - 1; i++) + { + combinedExceptions[i] = exceptions[i].SourceException; + } + combinedExceptions[combinedExceptions.Length - 1] = includeThisException; + return new AggregateException(combinedExceptions); + } + + /// <summary> + /// Wraps the exception dispatch infos into a new read-only collection. By calling this method, + /// the holder assumes exceptions to have been "observed", such that the finalization + /// check will be subsequently skipped. + /// </summary> + internal ReadOnlyCollection<ExceptionDispatchInfo> GetExceptionDispatchInfos() + { + var exceptions = m_faultExceptions; + Debug.Assert(exceptions != null, "Expected an initialized list."); + Debug.Assert(exceptions.Count > 0, "Expected at least one exception."); + MarkAsHandled(false); + return new ReadOnlyCollection<ExceptionDispatchInfo>(exceptions); + } + + /// <summary> + /// Gets the ExceptionDispatchInfo representing the singular exception + /// that was the cause of the task's cancellation. + /// </summary> + /// <returns> + /// The ExceptionDispatchInfo for the cancellation exception. May be null. + /// </returns> + internal ExceptionDispatchInfo GetCancellationExceptionDispatchInfo() + { + var edi = m_cancellationException; + Debug.Assert(edi == null || edi.SourceException is OperationCanceledException, + "Expected the EDI to be for an OperationCanceledException"); + return edi; + } + } +} diff --git a/src/System.Private.CoreLib/shared/System/Threading/Tasks/TaskScheduler.cs b/src/System.Private.CoreLib/shared/System/Threading/Tasks/TaskScheduler.cs new file mode 100644 index 0000000000..c5bf02b9bc --- /dev/null +++ b/src/System.Private.CoreLib/shared/System/Threading/Tasks/TaskScheduler.cs @@ -0,0 +1,703 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// +// +// +// This file contains the primary interface and management of tasks and queues. +// +// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- +// Disable the "reference to volatile field not treated as volatile" error. +#pragma warning disable 0420 + +using System.Collections.Generic; +using System.Diagnostics; +using System.Runtime.CompilerServices; + +namespace System.Threading.Tasks +{ + /// <summary> + /// Represents an abstract scheduler for tasks. + /// </summary> + /// <remarks> + /// <para> + /// <see cref="System.Threading.Tasks.TaskScheduler">TaskScheduler</see> acts as the extension point for all + /// pluggable scheduling logic. This includes mechanisms such as how to schedule a task for execution, and + /// how scheduled tasks should be exposed to debuggers. + /// </para> + /// <para> + /// All members of the abstract <see cref="TaskScheduler"/> type are thread-safe + /// and may be used from multiple threads concurrently. + /// </para> + /// </remarks> + [DebuggerDisplay("Id={Id}")] + [DebuggerTypeProxy(typeof(SystemThreadingTasks_TaskSchedulerDebugView))] + public abstract class TaskScheduler + { + //////////////////////////////////////////////////////////// + // + // User Provided Methods and Properties + // + + /// <summary> + /// Queues a <see cref="T:System.Threading.Tasks.Task">Task</see> to the scheduler. + /// </summary> + /// <remarks> + /// <para> + /// A class derived from <see cref="T:System.Threading.Tasks.TaskScheduler">TaskScheduler</see> + /// implements this method to accept tasks being scheduled on the scheduler. + /// A typical implementation would store the task in an internal data structure, which would + /// be serviced by threads that would execute those tasks at some time in the future. + /// </para> + /// <para> + /// This method is only meant to be called by the .NET Framework and + /// should not be called directly by the derived class. This is necessary + /// for maintaining the consistency of the system. + /// </para> + /// </remarks> + /// <param name="task">The <see cref="T:System.Threading.Tasks.Task">Task</see> to be queued.</param> + /// <exception cref="T:System.ArgumentNullException">The <paramref name="task"/> argument is null.</exception> + protected internal abstract void QueueTask(Task task); + + /// <summary> + /// Determines whether the provided <see cref="T:System.Threading.Tasks.Task">Task</see> + /// can be executed synchronously in this call, and if it can, executes it. + /// </summary> + /// <remarks> + /// <para> + /// A class derived from <see cref="TaskScheduler">TaskScheduler</see> implements this function to + /// support inline execution of a task on a thread that initiates a wait on that task object. Inline + /// execution is optional, and the request may be rejected by returning false. However, better + /// scalability typically results the more tasks that can be inlined, and in fact a scheduler that + /// inlines too little may be prone to deadlocks. A proper implementation should ensure that a + /// request executing under the policies guaranteed by the scheduler can successfully inline. For + /// example, if a scheduler uses a dedicated thread to execute tasks, any inlining requests from that + /// thread should succeed. + /// </para> + /// <para> + /// If a scheduler decides to perform the inline execution, it should do so by calling to the base + /// TaskScheduler's + /// <see cref="TryExecuteTask">TryExecuteTask</see> method with the provided task object, propagating + /// the return value. It may also be appropriate for the scheduler to remove an inlined task from its + /// internal data structures if it decides to honor the inlining request. Note, however, that under + /// some circumstances a scheduler may be asked to inline a task that was not previously provided to + /// it with the <see cref="QueueTask"/> method. + /// </para> + /// <para> + /// The derived scheduler is responsible for making sure that the calling thread is suitable for + /// executing the given task as far as its own scheduling and execution policies are concerned. + /// </para> + /// </remarks> + /// <param name="task">The <see cref="T:System.Threading.Tasks.Task">Task</see> to be + /// executed.</param> + /// <param name="taskWasPreviouslyQueued">A Boolean denoting whether or not task has previously been + /// queued. If this parameter is True, then the task may have been previously queued (scheduled); if + /// False, then the task is known not to have been queued, and this call is being made in order to + /// execute the task inline without queueing it.</param> + /// <returns>A Boolean value indicating whether the task was executed inline.</returns> + /// <exception cref="T:System.ArgumentNullException">The <paramref name="task"/> argument is + /// null.</exception> + /// <exception cref="T:System.InvalidOperationException">The <paramref name="task"/> was already + /// executed.</exception> + protected abstract bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued); + + /// <summary> + /// Generates an enumerable of <see cref="T:System.Threading.Tasks.Task">Task</see> instances + /// currently queued to the scheduler waiting to be executed. + /// </summary> + /// <remarks> + /// <para> + /// A class derived from <see cref="TaskScheduler"/> implements this method in order to support + /// integration with debuggers. This method will only be invoked by the .NET Framework when the + /// debugger requests access to the data. The enumerable returned will be traversed by debugging + /// utilities to access the tasks currently queued to this scheduler, enabling the debugger to + /// provide a representation of this information in the user interface. + /// </para> + /// <para> + /// It is important to note that, when this method is called, all other threads in the process will + /// be frozen. Therefore, it's important to avoid synchronization with other threads that may lead to + /// blocking. If synchronization is necessary, the method should prefer to throw a <see + /// cref="System.NotSupportedException"/> + /// than to block, which could cause a debugger to experience delays. Additionally, this method and + /// the enumerable returned must not modify any globally visible state. + /// </para> + /// <para> + /// The returned enumerable should never be null. If there are currently no queued tasks, an empty + /// enumerable should be returned instead. + /// </para> + /// <para> + /// For developers implementing a custom debugger, this method shouldn't be called directly, but + /// rather this functionality should be accessed through the internal wrapper method + /// GetScheduledTasksForDebugger: + /// <c>internal Task[] GetScheduledTasksForDebugger()</c>. This method returns an array of tasks, + /// rather than an enumerable. In order to retrieve a list of active schedulers, a debugger may use + /// another internal method: <c>internal static TaskScheduler[] GetTaskSchedulersForDebugger()</c>. + /// This static method returns an array of all active TaskScheduler instances. + /// GetScheduledTasksForDebugger then may be used on each of these scheduler instances to retrieve + /// the list of scheduled tasks for each. + /// </para> + /// </remarks> + /// <returns>An enumerable that allows traversal of tasks currently queued to this scheduler. + /// </returns> + /// <exception cref="T:System.NotSupportedException"> + /// This scheduler is unable to generate a list of queued tasks at this time. + /// </exception> + protected abstract IEnumerable<Task> GetScheduledTasks(); + + /// <summary> + /// Indicates the maximum concurrency level this + /// <see cref="TaskScheduler"/> is able to support. + /// </summary> + public virtual int MaximumConcurrencyLevel + { + get + { + return int.MaxValue; + } + } + + //////////////////////////////////////////////////////////// + // + // Internal overridable methods + // + + + /// <summary> + /// Attempts to execute the target task synchronously. + /// </summary> + /// <param name="task">The task to run.</param> + /// <param name="taskWasPreviouslyQueued">True if the task may have been previously queued, + /// false if the task was absolutely not previously queued.</param> + /// <returns>True if it ran, false otherwise.</returns> + internal bool TryRunInline(Task task, bool taskWasPreviouslyQueued) + { + // Do not inline unstarted tasks (i.e., task.ExecutingTaskScheduler == null). + // Do not inline TaskCompletionSource-style (a.k.a. "promise") tasks. + // No need to attempt inlining if the task body was already run (i.e. either TASK_STATE_DELEGATE_INVOKED or TASK_STATE_CANCELED bits set) + TaskScheduler ets = task.ExecutingTaskScheduler; + + // Delegate cross-scheduler inlining requests to target scheduler + if (ets != this && ets != null) return ets.TryRunInline(task, taskWasPreviouslyQueued); + + StackGuard currentStackGuard; + if ((ets == null) || + (task.m_action == null) || + task.IsDelegateInvoked || + task.IsCanceled || + (currentStackGuard = Task.CurrentStackGuard).TryBeginInliningScope() == false) + { + return false; + } + + // Task class will still call into TaskScheduler.TryRunInline rather than TryExecuteTaskInline() so that + // 1) we can adjust the return code from TryExecuteTaskInline in case a buggy custom scheduler lies to us + // 2) we maintain a mechanism for the TLS lookup optimization that we used to have for the ConcRT scheduler (will potentially introduce the same for TP) + bool bInlined = false; + try + { +#if CORECLR + if (TplEtwProvider.Log.IsEnabled()) + { + task.FireTaskScheduledIfNeeded(this); + } +#endif + bInlined = TryExecuteTaskInline(task, taskWasPreviouslyQueued); + } + finally + { + currentStackGuard.EndInliningScope(); + } + + // If the custom scheduler returned true, we should either have the TASK_STATE_DELEGATE_INVOKED or TASK_STATE_CANCELED bit set + // Otherwise the scheduler is buggy + if (bInlined && !(task.IsDelegateInvoked || task.IsCanceled)) + { + throw new InvalidOperationException(SR.TaskScheduler_InconsistentStateAfterTryExecuteTaskInline); + } + + return bInlined; + } + + /// <summary> + /// Attempts to dequeue a <see cref="T:System.Threading.Tasks.Task">Task</see> that was previously queued to + /// this scheduler. + /// </summary> + /// <param name="task">The <see cref="T:System.Threading.Tasks.Task">Task</see> to be dequeued.</param> + /// <returns>A Boolean denoting whether the <paramref name="task"/> argument was successfully dequeued.</returns> + /// <exception cref="T:System.ArgumentNullException">The <paramref name="task"/> argument is null.</exception> + protected internal virtual bool TryDequeue(Task task) + { + return false; + } + + /// <summary> + /// Notifies the scheduler that a work item has made progress. + /// </summary> + internal virtual void NotifyWorkItemProgress() + { + } + + /// <summary> + /// Indicates whether this is a custom scheduler, in which case the safe code paths will be taken upon task entry + /// using a CAS to transition from queued state to executing. + /// </summary> + internal virtual bool RequiresAtomicStartTransition + { + get { return true; } + } + + /// <summary> + /// Calls QueueTask() after performing any needed firing of events + /// </summary> + internal void InternalQueueTask(Task task) + { + Debug.Assert(task != null); + +#if CORECLR + if (TplEtwProvider.Log.IsEnabled()) + { + task.FireTaskScheduledIfNeeded(this); + } +#endif + + this.QueueTask(task); + } + + + //////////////////////////////////////////////////////////// + // + // Member variables + // + + // The global container that keeps track of TaskScheduler instances for debugging purposes. + private static ConditionalWeakTable<TaskScheduler, object> s_activeTaskSchedulers; + + // An AppDomain-wide default manager. + private static readonly TaskScheduler s_defaultTaskScheduler = new ThreadPoolTaskScheduler(); + + //static counter used to generate unique TaskScheduler IDs + internal static int s_taskSchedulerIdCounter; + + // this TaskScheduler's unique ID + private volatile int m_taskSchedulerId; + + + + //////////////////////////////////////////////////////////// + // + // Constructors and public properties + // + + /// <summary> + /// Initializes the <see cref="System.Threading.Tasks.TaskScheduler"/>. + /// </summary> + protected TaskScheduler() + { +#if CORECLR // Debugger support + // Register the scheduler in the active scheduler list. This is only relevant when debugging, + // so we only pay the cost if the debugger is attached when the scheduler is created. This + // means that the internal TaskScheduler.GetTaskSchedulersForDebugger() will only include + // schedulers created while the debugger is attached. + if (Debugger.IsAttached) + { + AddToActiveTaskSchedulers(); + } +#endif + } + + /// <summary>Adds this scheduler ot the active schedulers tracking collection for debugging purposes.</summary> + private void AddToActiveTaskSchedulers() + { + ConditionalWeakTable<TaskScheduler, object> activeTaskSchedulers = s_activeTaskSchedulers; + if (activeTaskSchedulers == null) + { + Interlocked.CompareExchange(ref s_activeTaskSchedulers, new ConditionalWeakTable<TaskScheduler, object>(), null); + activeTaskSchedulers = s_activeTaskSchedulers; + } + activeTaskSchedulers.Add(this, null); + } + + /// <summary> + /// Gets the default <see cref="System.Threading.Tasks.TaskScheduler">TaskScheduler</see> instance. + /// </summary> + public static TaskScheduler Default + { + get + { + return s_defaultTaskScheduler; + } + } + + /// <summary> + /// Gets the <see cref="System.Threading.Tasks.TaskScheduler">TaskScheduler</see> + /// associated with the currently executing task. + /// </summary> + /// <remarks> + /// When not called from within a task, <see cref="Current"/> will return the <see cref="Default"/> scheduler. + /// </remarks> + public static TaskScheduler Current + { + get + { + TaskScheduler current = InternalCurrent; + return current ?? TaskScheduler.Default; + } + } + + /// <summary> + /// Gets the <see cref="System.Threading.Tasks.TaskScheduler">TaskScheduler</see> + /// associated with the currently executing task. + /// </summary> + /// <remarks> + /// When not called from within a task, <see cref="InternalCurrent"/> will return null. + /// </remarks> + internal static TaskScheduler InternalCurrent + { + get + { + Task currentTask = Task.InternalCurrent; + return ((currentTask != null) + && ((currentTask.CreationOptions & TaskCreationOptions.HideScheduler) == 0) + ) ? currentTask.ExecutingTaskScheduler : null; + } + } + + /// <summary> + /// Creates a <see cref="TaskScheduler"/> + /// associated with the current <see cref="T:System.Threading.SynchronizationContext"/>. + /// </summary> + /// <remarks> + /// All <see cref="System.Threading.Tasks.Task">Task</see> instances queued to + /// the returned scheduler will be executed through a call to the + /// <see cref="System.Threading.SynchronizationContext.Post">Post</see> method + /// on that context. + /// </remarks> + /// <returns> + /// A <see cref="TaskScheduler"/> associated with + /// the current <see cref="T:System.Threading.SynchronizationContext">SynchronizationContext</see>, as + /// determined by <see cref="System.Threading.SynchronizationContext.Current">SynchronizationContext.Current</see>. + /// </returns> + /// <exception cref="T:System.InvalidOperationException"> + /// The current SynchronizationContext may not be used as a TaskScheduler. + /// </exception> + public static TaskScheduler FromCurrentSynchronizationContext() + { + return new SynchronizationContextTaskScheduler(); + } + + /// <summary> + /// Gets the unique ID for this <see cref="TaskScheduler"/>. + /// </summary> + public int Id + { + get + { + if (m_taskSchedulerId == 0) + { + int newId = 0; + + // We need to repeat if Interlocked.Increment wraps around and returns 0. + // Otherwise next time this scheduler's Id is queried it will get a new value + do + { + newId = Interlocked.Increment(ref s_taskSchedulerIdCounter); + } while (newId == 0); + + Interlocked.CompareExchange(ref m_taskSchedulerId, newId, 0); + } + + return m_taskSchedulerId; + } + } + + /// <summary> + /// Attempts to execute the provided <see cref="T:System.Threading.Tasks.Task">Task</see> + /// on this scheduler. + /// </summary> + /// <remarks> + /// <para> + /// Scheduler implementations are provided with <see cref="T:System.Threading.Tasks.Task">Task</see> + /// instances to be executed through either the <see cref="QueueTask"/> method or the + /// <see cref="TryExecuteTaskInline"/> method. When the scheduler deems it appropriate to run the + /// provided task, <see cref="TryExecuteTask"/> should be used to do so. TryExecuteTask handles all + /// aspects of executing a task, including action invocation, exception handling, state management, + /// and lifecycle control. + /// </para> + /// <para> + /// <see cref="TryExecuteTask"/> must only be used for tasks provided to this scheduler by the .NET + /// Framework infrastructure. It should not be used to execute arbitrary tasks obtained through + /// custom mechanisms. + /// </para> + /// </remarks> + /// <param name="task"> + /// A <see cref="T:System.Threading.Tasks.Task">Task</see> object to be executed.</param> + /// <exception cref="T:System.InvalidOperationException"> + /// The <paramref name="task"/> is not associated with this scheduler. + /// </exception> + /// <returns>A Boolean that is true if <paramref name="task"/> was successfully executed, false if it + /// was not. A common reason for execution failure is that the task had previously been executed or + /// is in the process of being executed by another thread.</returns> + protected bool TryExecuteTask(Task task) + { + if (task.ExecutingTaskScheduler != this) + { + throw new InvalidOperationException(SR.TaskScheduler_ExecuteTask_WrongTaskScheduler); + } + + return task.ExecuteEntry(); + } + + //////////////////////////////////////////////////////////// + // + // Events + // + + /// <summary> + /// Occurs when a faulted <see cref="System.Threading.Tasks.Task"/>'s unobserved exception is about to trigger exception escalation + /// policy, which, by default, would terminate the process. + /// </summary> + /// <remarks> + /// This AppDomain-wide event provides a mechanism to prevent exception + /// escalation policy (which, by default, terminates the process) from triggering. + /// Each handler is passed a <see cref="T:System.Threading.Tasks.UnobservedTaskExceptionEventArgs"/> + /// instance, which may be used to examine the exception and to mark it as observed. + /// </remarks> + public static event EventHandler<UnobservedTaskExceptionEventArgs> UnobservedTaskException; + + //////////////////////////////////////////////////////////// + // + // Internal methods + // + + // This is called by the TaskExceptionHolder finalizer. + internal static void PublishUnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs ueea) + { + UnobservedTaskException?.Invoke(sender, ueea); + } + + /// <summary> + /// Provides an array of all queued <see cref="System.Threading.Tasks.Task">Task</see> instances + /// for the debugger. + /// </summary> + /// <remarks> + /// The returned array is populated through a call to <see cref="GetScheduledTasks"/>. + /// Note that this function is only meant to be invoked by a debugger remotely. + /// It should not be called by any other codepaths. + /// </remarks> + /// <returns>An array of <see cref="System.Threading.Tasks.Task">Task</see> instances.</returns> + /// <exception cref="T:System.NotSupportedException"> + /// This scheduler is unable to generate a list of queued tasks at this time. + /// </exception> + internal Task[] GetScheduledTasksForDebugger() + { + // this can throw InvalidOperationException indicating that they are unable to provide the info + // at the moment. We should let the debugger receive that exception so that it can indicate it in the UI + IEnumerable<Task> activeTasksSource = GetScheduledTasks(); + + if (activeTasksSource == null) + return null; + + // If it can be cast to an array, use it directly + Task[] activeTasksArray = activeTasksSource as Task[]; + if (activeTasksArray == null) + { + activeTasksArray = (new List<Task>(activeTasksSource)).ToArray(); + } + + // touch all Task.Id fields so that the debugger doesn't need to do a lot of cross-proc calls to generate them + foreach (Task t in activeTasksArray) + { + int tmp = t.Id; + } + + return activeTasksArray; + } + + /// <summary> + /// Provides an array of all active <see cref="System.Threading.Tasks.TaskScheduler">TaskScheduler</see> + /// instances for the debugger. + /// </summary> + /// <remarks> + /// This function is only meant to be invoked by a debugger remotely. + /// It should not be called by any other codepaths. + /// </remarks> + /// <returns>An array of <see cref="System.Threading.Tasks.TaskScheduler">TaskScheduler</see> instances.</returns> + internal static TaskScheduler[] GetTaskSchedulersForDebugger() + { + if (s_activeTaskSchedulers == null) + { + // No schedulers were tracked. Just give back the default. + return new TaskScheduler[] { s_defaultTaskScheduler }; + } + + List<TaskScheduler> schedulers = new List<TaskScheduler>(); + foreach (var item in s_activeTaskSchedulers) + { + schedulers.Add(item.Key); + } + + if (!schedulers.Contains(s_defaultTaskScheduler)) + { + // Make sure the default is included, in case the debugger attached + // after it was created. + schedulers.Add(s_defaultTaskScheduler); + } + + var arr = schedulers.ToArray(); + foreach (var scheduler in arr) + { + Debug.Assert(scheduler != null, "Table returned an incorrect Count or CopyTo failed"); + int tmp = scheduler.Id; // force Ids for debugger + } + return arr; + } + + /// <summary> + /// Nested class that provides debugger view for TaskScheduler + /// </summary> + internal sealed class SystemThreadingTasks_TaskSchedulerDebugView + { + private readonly TaskScheduler m_taskScheduler; + public SystemThreadingTasks_TaskSchedulerDebugView(TaskScheduler scheduler) + { + m_taskScheduler = scheduler; + } + + // returns the scheduler's Id + public int Id + { + get { return m_taskScheduler.Id; } + } + + // returns the scheduler's GetScheduledTasks + public IEnumerable<Task> ScheduledTasks + { + get { return m_taskScheduler.GetScheduledTasks(); } + } + } + } + + + + + /// <summary> + /// A TaskScheduler implementation that executes all tasks queued to it through a call to + /// <see cref="System.Threading.SynchronizationContext.Post"/> on the <see cref="T:System.Threading.SynchronizationContext"/> + /// that its associated with. The default constructor for this class binds to the current <see cref="T:System.Threading.SynchronizationContext"/> + /// </summary> + internal sealed class SynchronizationContextTaskScheduler : TaskScheduler + { + private SynchronizationContext m_synchronizationContext; + + /// <summary> + /// Constructs a SynchronizationContextTaskScheduler associated with <see cref="T:System.Threading.SynchronizationContext.Current"/> + /// </summary> + /// <exception cref="T:System.InvalidOperationException">This constructor expects <see cref="T:System.Threading.SynchronizationContext.Current"/> to be set.</exception> + internal SynchronizationContextTaskScheduler() + { + SynchronizationContext synContext = SynchronizationContext.Current; + + // make sure we have a synccontext to work with + if (synContext == null) + { + throw new InvalidOperationException(SR.TaskScheduler_FromCurrentSynchronizationContext_NoCurrent); + } + + m_synchronizationContext = synContext; + } + + /// <summary> + /// Implementation of <see cref="T:System.Threading.Tasks.TaskScheduler.QueueTask"/> for this scheduler class. + /// + /// Simply posts the tasks to be executed on the associated <see cref="T:System.Threading.SynchronizationContext"/>. + /// </summary> + /// <param name="task"></param> + protected internal override void QueueTask(Task task) + { + m_synchronizationContext.Post(s_postCallback, (object)task); + } + + /// <summary> + /// Implementation of <see cref="T:System.Threading.Tasks.TaskScheduler.TryExecuteTaskInline"/> for this scheduler class. + /// + /// The task will be executed inline only if the call happens within + /// the associated <see cref="T:System.Threading.SynchronizationContext"/>. + /// </summary> + /// <param name="task"></param> + /// <param name="taskWasPreviouslyQueued"></param> + protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + if (SynchronizationContext.Current == m_synchronizationContext) + { + return TryExecuteTask(task); + } + else + return false; + } + + // not implemented + protected override IEnumerable<Task> GetScheduledTasks() + { + return null; + } + + /// <summary> + /// Implements the <see cref="T:System.Threading.Tasks.TaskScheduler.MaximumConcurrencyLevel"/> property for + /// this scheduler class. + /// + /// By default it returns 1, because a <see cref="T:System.Threading.SynchronizationContext"/> based + /// scheduler only supports execution on a single thread. + /// </summary> + public override int MaximumConcurrencyLevel + { + get + { + return 1; + } + } + + // preallocated SendOrPostCallback delegate + private static readonly SendOrPostCallback s_postCallback = s => ((Task)s).ExecuteEntry(); // with double-execute check because SC could be buggy + } + + /// <summary> + /// Provides data for the event that is raised when a faulted <see cref="System.Threading.Tasks.Task"/>'s + /// exception goes unobserved. + /// </summary> + /// <remarks> + /// The Exception property is used to examine the exception without marking it + /// as observed, whereas the <see cref="SetObserved"/> method is used to mark the exception + /// as observed. Marking the exception as observed prevents it from triggering exception escalation policy + /// which, by default, terminates the process. + /// </remarks> + public class UnobservedTaskExceptionEventArgs : EventArgs + { + private AggregateException m_exception; + internal bool m_observed = false; + + /// <summary> + /// Initializes a new instance of the <see cref="UnobservedTaskExceptionEventArgs"/> class + /// with the unobserved exception. + /// </summary> + /// <param name="exception">The Exception that has gone unobserved.</param> + public UnobservedTaskExceptionEventArgs(AggregateException exception) { m_exception = exception; } + + /// <summary> + /// Marks the <see cref="Exception"/> as "observed," thus preventing it + /// from triggering exception escalation policy which, by default, terminates the process. + /// </summary> + public void SetObserved() { m_observed = true; } + + /// <summary> + /// Gets whether this exception has been marked as "observed." + /// </summary> + public bool Observed { get { return m_observed; } } + + /// <summary> + /// The Exception that went unobserved. + /// </summary> + public AggregateException Exception { get { return m_exception; } } + } +} diff --git a/src/System.Private.CoreLib/shared/System/Threading/Tasks/ThreadPoolTaskScheduler.cs b/src/System.Private.CoreLib/shared/System/Threading/Tasks/ThreadPoolTaskScheduler.cs new file mode 100644 index 0000000000..339a9d8fb5 --- /dev/null +++ b/src/System.Private.CoreLib/shared/System/Threading/Tasks/ThreadPoolTaskScheduler.cs @@ -0,0 +1,128 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// +// TaskScheduler.cs +// +// +// This file contains the primary interface and management of tasks and queues. +// +// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + +using System; +using System.Security; +using System.Diagnostics; +using System.Collections.Generic; +using System.Text; + +using Internal.Runtime.Augments; + +namespace System.Threading.Tasks +{ + /// <summary> + /// An implementation of TaskScheduler that uses the ThreadPool scheduler + /// </summary> + internal sealed class ThreadPoolTaskScheduler : TaskScheduler + { + /// <summary> + /// Constructs a new ThreadPool task scheduler object + /// </summary> + internal ThreadPoolTaskScheduler() + { + int id = base.Id; // force ID creation of the default scheduler + } + + // static delegate for threads allocated to handle LongRunning tasks. + private static readonly ParameterizedThreadStart s_longRunningThreadWork = s => ((Task)s).ExecuteEntryUnsafe(threadPoolThread: null); + + /// <summary> + /// Schedules a task to the ThreadPool. + /// </summary> + /// <param name="task">The task to schedule.</param> + protected internal override void QueueTask(Task task) + { + TaskCreationOptions options = task.Options; + if ((options & TaskCreationOptions.LongRunning) != 0) + { + // Run LongRunning tasks on their own dedicated thread. + RuntimeThread thread = RuntimeThread.Create(s_longRunningThreadWork); + thread.IsBackground = true; // Keep this thread from blocking process shutdown + thread.Start(task); + } + else + { + // Normal handling for non-LongRunning tasks. + bool preferLocal = ((options & TaskCreationOptions.PreferFairness) == 0); + ThreadPool.UnsafeQueueUserWorkItemInternal(task, preferLocal); + } + } + + /// <summary> + /// This internal function will do this: + /// (1) If the task had previously been queued, attempt to pop it and return false if that fails. + /// (2) Return whether the task is executed + /// + /// IMPORTANT NOTE: TryExecuteTaskInline will NOT throw task exceptions itself. Any wait code path using this function needs + /// to account for exceptions that need to be propagated, and throw themselves accordingly. + /// </summary> + protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + // If the task was previously scheduled, and we can't pop it, then return false. + if (taskWasPreviouslyQueued && !ThreadPool.TryPopCustomWorkItem(task)) + return false; + + try + { + task.ExecuteEntryUnsafe(threadPoolThread: null); // handles switching Task.Current etc. + } + finally + { + // Only call NWIP() if task was previously queued + if (taskWasPreviouslyQueued) NotifyWorkItemProgress(); + } + + return true; + } + + protected internal override bool TryDequeue(Task task) + { + // just delegate to TP + return ThreadPool.TryPopCustomWorkItem(task); + } + + protected override IEnumerable<Task> GetScheduledTasks() + { + return FilterTasksFromWorkItems(ThreadPool.GetQueuedWorkItems()); + } + + private IEnumerable<Task> FilterTasksFromWorkItems(IEnumerable<object> tpwItems) + { + foreach (object tpwi in tpwItems) + { + if (tpwi is Task t) + { + yield return t; + } + } + } + + /// <summary> + /// Notifies the scheduler that work is progressing (no-op). + /// </summary> + internal override void NotifyWorkItemProgress() + { + ThreadPool.NotifyWorkItemProgress(); + } + + /// <summary> + /// This is the only scheduler that returns false for this property, indicating that the task entry codepath is unsafe (CAS free) + /// since we know that the underlying scheduler already takes care of atomic transitions from queued to non-queued. + /// </summary> + internal override bool RequiresAtomicStartTransition + { + get { return false; } + } + } +} |