diff options
Diffstat (limited to 'src/mscorlib/src/System/Threading/Tasks/Parallel.cs')
-rw-r--r-- | src/mscorlib/src/System/Threading/Tasks/Parallel.cs | 3593 |
1 files changed, 0 insertions, 3593 deletions
diff --git a/src/mscorlib/src/System/Threading/Tasks/Parallel.cs b/src/mscorlib/src/System/Threading/Tasks/Parallel.cs deleted file mode 100644 index 7808943870..0000000000 --- a/src/mscorlib/src/System/Threading/Tasks/Parallel.cs +++ /dev/null @@ -1,3593 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. -// See the LICENSE file in the project root for more information. - -// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ -// -// -// -// A helper class that contains parallel versions of various looping constructs. This -// internally uses the task parallel library, but takes care to expose very little -// evidence of this infrastructure being used. -// -// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - -using System; -using System.Diagnostics; -using System.Collections; -using System.Collections.Generic; -using System.Collections.Concurrent; -using System.Security.Permissions; -using System.Threading; -using System.Threading.Tasks; -using System.Diagnostics.Contracts; - - -namespace System.Threading.Tasks -{ - /// <summary> - /// Stores options that configure the operation of methods on the - /// <see cref="T:System.Threading.Tasks.Parallel">Parallel</see> class. - /// </summary> - /// <remarks> - /// By default, methods on the Parallel class attempt to utilize all available processors, are non-cancelable, and target - /// the default TaskScheduler (TaskScheduler.Default). <see cref="ParallelOptions"/> enables - /// overriding these defaults. - /// </remarks> - public class ParallelOptions - { - private TaskScheduler m_scheduler; - private int m_maxDegreeOfParallelism; - private CancellationToken m_cancellationToken; - - /// <summary> - /// Initializes a new instance of the <see cref="ParallelOptions"/> class. - /// </summary> - /// <remarks> - /// This constructor initializes the instance with default values. <see cref="MaxDegreeOfParallelism"/> - /// is initialized to -1, signifying that there is no upper bound set on how much parallelism should - /// be employed. <see cref="CancellationToken"/> is initialized to a non-cancelable token, - /// and <see cref="TaskScheduler"/> is initialized to the default scheduler (TaskScheduler.Default). - /// All of these defaults may be overwritten using the property set accessors on the instance. - /// </remarks> - public ParallelOptions() - { - m_scheduler = TaskScheduler.Default; - m_maxDegreeOfParallelism = -1; - m_cancellationToken = CancellationToken.None; - } - - /// <summary> - /// Gets or sets the <see cref="T:System.Threading.Tasks.TaskScheduler">TaskScheduler</see> - /// associated with this <see cref="ParallelOptions"/> instance. Setting this property to null - /// indicates that the current scheduler should be used. - /// </summary> - public TaskScheduler TaskScheduler - { - get { return m_scheduler; } - set { m_scheduler = value; } - } - - // Convenience property used by TPL logic - internal TaskScheduler EffectiveTaskScheduler - { - get - { - if (m_scheduler == null) return TaskScheduler.Current; - else return m_scheduler; - } - } - - /// <summary> - /// Gets or sets the maximum degree of parallelism enabled by this ParallelOptions instance. - /// </summary> - /// <remarks> - /// The <see cref="MaxDegreeOfParallelism"/> limits the number of concurrent operations run by <see - /// cref="T:System.Threading.Tasks.Parallel">Parallel</see> method calls that are passed this - /// ParallelOptions instance to the set value, if it is positive. If <see - /// cref="MaxDegreeOfParallelism"/> is -1, then there is no limit placed on the number of concurrently - /// running operations. - /// </remarks> - /// <exception cref="T:System.ArgumentOutOfRangeException"> - /// The exception that is thrown when this <see cref="MaxDegreeOfParallelism"/> is set to 0 or some - /// value less than -1. - /// </exception> - public int MaxDegreeOfParallelism - { - get { return m_maxDegreeOfParallelism; } - set - { - if ((value == 0) || (value < -1)) - throw new ArgumentOutOfRangeException(nameof(MaxDegreeOfParallelism)); - m_maxDegreeOfParallelism = value; - } - } - - /// <summary> - /// Gets or sets the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> - /// associated with this <see cref="ParallelOptions"/> instance. - /// </summary> - /// <remarks> - /// Providing a <see cref="T:System.Threading.CancellationToken">CancellationToken</see> - /// to a <see cref="T:System.Threading.Tasks.Parallel">Parallel</see> method enables the operation to be - /// exited early. Code external to the operation may cancel the token, and if the operation observes the - /// token being set, it may exit early by throwing an - /// <see cref="T:System.OperationCanceledException"/>. - /// </remarks> - public CancellationToken CancellationToken - { - get { return m_cancellationToken; } - set { m_cancellationToken = value; } - } - - internal int EffectiveMaxConcurrencyLevel - { - get - { - int rval = MaxDegreeOfParallelism; - int schedulerMax = EffectiveTaskScheduler.MaximumConcurrencyLevel; - if ((schedulerMax > 0) && (schedulerMax != Int32.MaxValue)) - { - rval = (rval == -1) ? schedulerMax : Math.Min(schedulerMax, rval); - } - return rval; - } - } - } - - /// <summary> - /// Provides support for parallel loops and regions. - /// </summary> - /// <remarks> - /// The <see cref="T:System.Threading.Tasks.Parallel"/> class provides library-based data parallel replacements - /// for common operations such as for loops, for each loops, and execution of a set of statements. - /// </remarks> - public static class Parallel - { - // static counter for generating unique Fork/Join Context IDs to be used in ETW events - internal static int s_forkJoinContextID; - - // We use a stride for loops to amortize the frequency of interlocked operations. - internal const int DEFAULT_LOOP_STRIDE = 16; - - // Static variable to hold default parallel options - internal static ParallelOptions s_defaultParallelOptions = new ParallelOptions(); - - /// <summary> - /// Executes each of the provided actions, possibly in parallel. - /// </summary> - /// <param name="actions">An array of <see cref="T:System.Action">Actions</see> to execute.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="actions"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentException">The exception that is thrown when the - /// <paramref name="actions"/> array contains a null element.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown when any - /// action in the <paramref name="actions"/> array throws an exception.</exception> - /// <remarks> - /// This method can be used to execute a set of operations, potentially in parallel. - /// No guarantees are made about the order in which the operations execute or whether - /// they execute in parallel. This method does not return until each of the - /// provided operations has completed, regardless of whether completion - /// occurs due to normal or exceptional termination. - /// </remarks> - public static void Invoke(params Action[] actions) - { - Invoke(s_defaultParallelOptions, actions); - } - - /// <summary> - /// Executes each of the provided actions, possibly in parallel. - /// </summary> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="actions">An array of <see cref="T:System.Action">Actions</see> to execute.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="actions"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentException">The exception that is thrown when the - /// <paramref name="actions"/> array contains a null element.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> is set.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown when any - /// action in the <paramref name="actions"/> array throws an exception.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <remarks> - /// This method can be used to execute a set of operations, potentially in parallel. - /// No guarantees are made about the order in which the operations execute or whether - /// the they execute in parallel. This method does not return until each of the - /// provided operations has completed, regardless of whether completion - /// occurs due to normal or exceptional termination. - /// </remarks> - public static void Invoke(ParallelOptions parallelOptions, params Action[] actions) - { - if (actions == null) - { - throw new ArgumentNullException(nameof(actions)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - // Throw an ODE if we're passed a disposed CancellationToken. - if (parallelOptions.CancellationToken.CanBeCanceled - && AppContextSwitches.ThrowExceptionIfDisposedCancellationTokenSource) - { - parallelOptions.CancellationToken.ThrowIfSourceDisposed(); - } - // Quit early if we're already canceled -- avoid a bunch of work. - if (parallelOptions.CancellationToken.IsCancellationRequested) - throw new OperationCanceledException(parallelOptions.CancellationToken); - - // We must validate that the actions array contains no null elements, and also - // make a defensive copy of the actions array. - Action[] actionsCopy = new Action[actions.Length]; - for (int i = 0; i < actionsCopy.Length; i++) - { - actionsCopy[i] = actions[i]; - if (actionsCopy[i] == null) - { - throw new ArgumentException(Environment.GetResourceString("Parallel_Invoke_ActionNull")); - } - } - - // ETW event for Parallel Invoke Begin - int forkJoinContextID = 0; - Task callerTask = null; - if (TplEtwProvider.Log.IsEnabled()) - { - forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID); - callerTask = Task.InternalCurrent; - TplEtwProvider.Log.ParallelInvokeBegin((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0), - forkJoinContextID, TplEtwProvider.ForkJoinOperationType.ParallelInvoke, - actionsCopy.Length); - } - -#if DEBUG - actions = null; // Ensure we don't accidentally use this below. -#endif - - // If we have no work to do, we are done. - if (actionsCopy.Length < 1) return; - - // In the algorithm below, if the number of actions is greater than this, we automatically - // use Parallel.For() to handle the actions, rather than the Task-per-Action strategy. - const int SMALL_ACTIONCOUNT_LIMIT = 10; - - try - { - // If we've gotten this far, it's time to process the actions. - if ((actionsCopy.Length > SMALL_ACTIONCOUNT_LIMIT) || - (parallelOptions.MaxDegreeOfParallelism != -1 && parallelOptions.MaxDegreeOfParallelism < actionsCopy.Length)) - { - // Used to hold any exceptions encountered during action processing - ConcurrentQueue<Exception> exceptionQ = null; // will be lazily initialized if necessary - - // This is more efficient for a large number of actions, or for enforcing MaxDegreeOfParallelism. - try - { - // Launch a self-replicating task to handle the execution of all actions. - // The use of a self-replicating task allows us to use as many cores - // as are available, and no more. The exception to this rule is - // that, in the case of a blocked action, the ThreadPool may inject - // extra threads, which means extra tasks can run. - int actionIndex = 0; - ParallelForReplicatingTask rootTask = new ParallelForReplicatingTask(parallelOptions, delegate - { - // Each for-task will pull an action at a time from the list - int myIndex = Interlocked.Increment(ref actionIndex); // = index to use + 1 - while (myIndex <= actionsCopy.Length) - { - // Catch and store any exceptions. If we don't catch them, the self-replicating - // task will exit, and that may cause other SR-tasks to exit. - // And (absent cancellation) we want all actions to execute. - try - { - actionsCopy[myIndex - 1](); - } - catch (Exception e) - { - LazyInitializer.EnsureInitialized<ConcurrentQueue<Exception>>(ref exceptionQ, () => { return new ConcurrentQueue<Exception>(); }); - exceptionQ.Enqueue(e); - } - - // Check for cancellation. If it is encountered, then exit the delegate. - if (parallelOptions.CancellationToken.IsCancellationRequested) - throw new OperationCanceledException(parallelOptions.CancellationToken); - - // You're still in the game. Grab your next action index. - myIndex = Interlocked.Increment(ref actionIndex); - } - }, TaskCreationOptions.None, InternalTaskOptions.SelfReplicating); - - rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler); - rootTask.Wait(); - } - catch (Exception e) - { - LazyInitializer.EnsureInitialized<ConcurrentQueue<Exception>>(ref exceptionQ, () => { return new ConcurrentQueue<Exception>(); }); - - // Since we're consuming all action exceptions, there are very few reasons that - // we would see an exception here. Two that come to mind: - // (1) An OCE thrown by one or more actions (AggregateException thrown) - // (2) An exception thrown from the ParallelForReplicatingTask constructor - // (regular exception thrown). - // We'll need to cover them both. - AggregateException ae = e as AggregateException; - if (ae != null) - { - // Strip off outer container of an AggregateException, because downstream - // logic needs OCEs to be at the top level. - foreach (Exception exc in ae.InnerExceptions) exceptionQ.Enqueue(exc); - } - else - { - exceptionQ.Enqueue(e); - } - } - - // If we have encountered any exceptions, then throw. - if ((exceptionQ != null) && (exceptionQ.Count > 0)) - { - ThrowIfReducableToSingleOCE(exceptionQ, parallelOptions.CancellationToken); - throw new AggregateException(exceptionQ); - } - } - else - { - // This is more efficient for a small number of actions and no DOP support - - // Initialize our array of tasks, one per action. - Task[] tasks = new Task[actionsCopy.Length]; - - // One more check before we begin... - if (parallelOptions.CancellationToken.IsCancellationRequested) - throw new OperationCanceledException(parallelOptions.CancellationToken); - - // Launch all actions as tasks - for (int i = 1; i < tasks.Length; i++) - { - tasks[i] = Task.Factory.StartNew(actionsCopy[i], parallelOptions.CancellationToken, TaskCreationOptions.None, - InternalTaskOptions.None, parallelOptions.EffectiveTaskScheduler); - } - - // Optimization: Use current thread to run something before we block waiting for all tasks. - tasks[0] = new Task(actionsCopy[0]); - tasks[0].RunSynchronously(parallelOptions.EffectiveTaskScheduler); - - // Now wait for the tasks to complete. This will not unblock until all of - // them complete, and it will throw an exception if one or more of them also - // threw an exception. We let such exceptions go completely unhandled. - try - { - if (tasks.Length <= 4) - { - // for 4 or less tasks, the sequential waitall version is faster - Task.FastWaitAll(tasks); - } - else - { - // otherwise we revert to the regular WaitAll which delegates the multiple wait to the cooperative event. - Task.WaitAll(tasks); - } - } - catch (AggregateException aggExp) - { - // see if we can combine it into a single OCE. If not propagate the original exception - ThrowIfReducableToSingleOCE(aggExp.InnerExceptions, parallelOptions.CancellationToken); - throw; - } - finally - { - for (int i = 0; i < tasks.Length; i++) - { - if (tasks[i].IsCompleted) tasks[i].Dispose(); - } - } - } - } - finally - { - // ETW event for Parallel Invoke End - if (TplEtwProvider.Log.IsEnabled()) - { - TplEtwProvider.Log.ParallelInvokeEnd((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0), - forkJoinContextID); - } - } - } - - /// <summary> - /// Executes a for loop in which iterations may run in parallel. - /// </summary> - /// <param name="fromInclusive">The start index, inclusive.</param> - /// <param name="toExclusive">The end index, exclusive.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the iteration count (an Int32) as a parameter. - /// </remarks> - public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return ForWorker<object>( - fromInclusive, toExclusive, - s_defaultParallelOptions, - body, null, null, null, null); - } - - /// <summary> - /// Executes a for loop in which iterations may run in parallel. - /// </summary> - /// <param name="fromInclusive">The start index, inclusive.</param> - /// <param name="toExclusive">The end index, exclusive.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the iteration count (an Int64) as a parameter. - /// </remarks> - public static ParallelLoopResult For(long fromInclusive, long toExclusive, Action<long> body) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return ForWorker64<object>( - fromInclusive, toExclusive, s_defaultParallelOptions, - body, null, null, null, null); - } - - /// <summary> - /// Executes a for loop in which iterations may run in parallel. - /// </summary> - /// <param name="fromInclusive">The start index, inclusive.</param> - /// <param name="toExclusive">The end index, exclusive.</param> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the - /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> - /// argument is set.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the iteration count (an Int32) as a parameter. - /// </remarks> - public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForWorker<object>( - fromInclusive, toExclusive, parallelOptions, - body, null, null, null, null); - } - - /// <summary> - /// Executes a for loop in which iterations may run in parallel. - /// </summary> - /// <param name="fromInclusive">The start index, inclusive.</param> - /// <param name="toExclusive">The end index, exclusive.</param> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the - /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> - /// argument is set.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the iteration count (an Int64) as a parameter. - /// </remarks> - public static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long> body) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForWorker64<object>( - fromInclusive, toExclusive, parallelOptions, - body, null, null, null, null); - } - - /// <summary> - /// Executes a for loop in which iterations may run in parallel. - /// </summary> - /// <param name="fromInclusive">The start index, inclusive.</param> - /// <param name="toExclusive">The end index, exclusive.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int32), - /// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely. - /// </para> - /// <para> - /// Calling <see cref="System.Threading.Tasks.ParallelLoopState.Break()">ParallelLoopState.Break()</see> - /// informs the For operation that iterations after the current one need not - /// execute. However, all iterations before the current one will still need to be executed if they have not already. - /// Therefore, calling Break is similar to using a break operation within a - /// conventional for loop in a language like C#, but it is not a perfect substitute: for example, there is no guarantee that iterations - /// after the current one will definitely not execute. - /// </para> - /// <para> - /// If executing all iterations before the current one is not necessary, - /// <see cref="System.Threading.Tasks.ParallelLoopState.Stop()">ParallelLoopState.Stop()</see> - /// should be preferred to using Break. Calling Stop informs the For loop that it may abandon all remaining - /// iterations, regardless of whether they're for interations above or below the current, - /// since all required work has already been completed. As with Break, however, there are no guarantees regarding - /// which other iterations will not execute. - /// </para> - /// <para> - /// When a loop is ended prematurely, the <see cref="T:ParallelLoopState"/> that's returned will contain - /// relevant information about the loop's completion. - /// </para> - /// </remarks> - public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return ForWorker<object>( - fromInclusive, toExclusive, s_defaultParallelOptions, - null, body, null, null, null); - } - - /// <summary> - /// Executes a for loop in which iterations may run in parallel. - /// </summary> - /// <param name="fromInclusive">The start index, inclusive.</param> - /// <param name="toExclusive">The end index, exclusive.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int64), - /// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely. - /// </remarks> - public static ParallelLoopResult For(long fromInclusive, long toExclusive, Action<long, ParallelLoopState> body) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return ForWorker64<object>( - fromInclusive, toExclusive, s_defaultParallelOptions, - null, body, null, null, null); - } - - /// <summary> - /// Executes a for loop in which iterations may run in parallel. - /// </summary> - /// <param name="fromInclusive">The start index, inclusive.</param> - /// <param name="toExclusive">The end index, exclusive.</param> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the - /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> - /// argument is set.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int32), - /// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely. - /// </remarks> - public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForWorker<object>( - fromInclusive, toExclusive, parallelOptions, - null, body, null, null, null); - } - - /// <summary> - /// Executes a for loop in which iterations may run in parallel. - /// </summary> - /// <param name="fromInclusive">The start index, inclusive.</param> - /// <param name="toExclusive">The end index, exclusive.</param> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the - /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> - /// argument is set.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int64), - /// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely. - /// </remarks> - public static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, - Action<long, ParallelLoopState> body) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForWorker64<object>( - fromInclusive, toExclusive, parallelOptions, - null, body, null, null, null); - } - - /// <summary> - /// Executes a for loop in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TLocal">The type of the thread-local data.</typeparam> - /// <param name="fromInclusive">The start index, inclusive.</param> - /// <param name="toExclusive">The end index, exclusive.</param> - /// <param name="localInit">The function delegate that returns the initial state of the local data - /// for each thread.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <param name="localFinally">The delegate that performs a final action on the local state of each - /// thread.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localInit"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localFinally"/> argument is null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int32), - /// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely, and some local state that may be shared amongst iterations - /// that execute on the same thread. - /// </para> - /// <para> - /// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// </para> - /// </remarks> - public static ParallelLoopResult For<TLocal>( - int fromInclusive, int toExclusive, - Func<TLocal> localInit, - Func<int, ParallelLoopState, TLocal, TLocal> body, - Action<TLocal> localFinally) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - - return ForWorker( - fromInclusive, toExclusive, s_defaultParallelOptions, - null, null, body, localInit, localFinally); - } - - /// <summary> - /// Executes a for loop in which iterations may run in parallel. Supports 64-bit indices. - /// </summary> - /// <typeparam name="TLocal">The type of the thread-local data.</typeparam> - /// <param name="fromInclusive">The start index, inclusive.</param> - /// <param name="toExclusive">The end index, exclusive.</param> - /// <param name="localInit">The function delegate that returns the initial state of the local data - /// for each thread.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <param name="localFinally">The delegate that performs a final action on the local state of each - /// thread.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localInit"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localFinally"/> argument is null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int64), - /// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely, and some local state that may be shared amongst iterations - /// that execute on the same thread. - /// </para> - /// <para> - /// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// </para> - /// </remarks> - public static ParallelLoopResult For<TLocal>( - long fromInclusive, long toExclusive, - Func<TLocal> localInit, - Func<long, ParallelLoopState, TLocal, TLocal> body, - Action<TLocal> localFinally) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - - return ForWorker64( - fromInclusive, toExclusive, s_defaultParallelOptions, - null, null, body, localInit, localFinally); - } - - /// <summary> - /// Executes a for loop in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TLocal">The type of the thread-local data.</typeparam> - /// <param name="fromInclusive">The start index, inclusive.</param> - /// <param name="toExclusive">The end index, exclusive.</param> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="localInit">The function delegate that returns the initial state of the local data - /// for each thread.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <param name="localFinally">The delegate that performs a final action on the local state of each - /// thread.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localInit"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localFinally"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the - /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> - /// argument is set.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int32), - /// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely, and some local state that may be shared amongst iterations - /// that execute on the same thread. - /// </para> - /// <para> - /// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// </para> - /// </remarks> - public static ParallelLoopResult For<TLocal>( - int fromInclusive, int toExclusive, ParallelOptions parallelOptions, - Func<TLocal> localInit, - Func<int, ParallelLoopState, TLocal, TLocal> body, - Action<TLocal> localFinally) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForWorker( - fromInclusive, toExclusive, parallelOptions, - null, null, body, localInit, localFinally); - } - - /// <summary> - /// Executes a for loop in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TLocal">The type of the thread-local data.</typeparam> - /// <param name="fromInclusive">The start index, inclusive.</param> - /// <param name="toExclusive">The end index, exclusive.</param> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="localInit">The function delegate that returns the initial state of the local data - /// for each thread.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <param name="localFinally">The delegate that performs a final action on the local state of each - /// thread.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localInit"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localFinally"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the - /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> - /// argument is set.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range: - /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int64), - /// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely, and some local state that may be shared amongst iterations - /// that execute on the same thread. - /// </para> - /// <para> - /// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// </para> - /// </remarks> - public static ParallelLoopResult For<TLocal>( - long fromInclusive, long toExclusive, ParallelOptions parallelOptions, - Func<TLocal> localInit, - Func<long, ParallelLoopState, TLocal, TLocal> body, - Action<TLocal> localFinally) - { - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - - return ForWorker64( - fromInclusive, toExclusive, parallelOptions, - null, null, body, localInit, localFinally); - } - - - - - - - - /// <summary> - /// Performs the major work of the parallel for loop. It assumes that argument validation has already - /// been performed by the caller. This function's whole purpose in life is to enable as much reuse of - /// common implementation details for the various For overloads we offer. Without it, we'd end up - /// with lots of duplicate code. It handles: (1) simple for loops, (2) for loops that depend on - /// ParallelState, and (3) for loops with thread local data. - /// - /// </summary> - /// <typeparam name="TLocal">The type of the local data.</typeparam> - /// <param name="fromInclusive">The loop's start index, inclusive.</param> - /// <param name="toExclusive">The loop's end index, exclusive.</param> - /// <param name="parallelOptions">A ParallelOptions instance.</param> - /// <param name="body">The simple loop body.</param> - /// <param name="bodyWithState">The loop body for ParallelState overloads.</param> - /// <param name="bodyWithLocal">The loop body for thread local state overloads.</param> - /// <param name="localInit">A selector function that returns new thread local state.</param> - /// <param name="localFinally">A cleanup function to destroy thread local state.</param> - /// <remarks>Only one of the body arguments may be supplied (i.e. they are exclusive).</remarks> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult"/> structure.</returns> - private static ParallelLoopResult ForWorker<TLocal>( - int fromInclusive, int toExclusive, - ParallelOptions parallelOptions, - Action<int> body, - Action<int, ParallelLoopState> bodyWithState, - Func<int, ParallelLoopState, TLocal, TLocal> bodyWithLocal, - Func<TLocal> localInit, Action<TLocal> localFinally) - { - Debug.Assert(((body == null ? 0 : 1) + (bodyWithState == null ? 0 : 1) + (bodyWithLocal == null ? 0 : 1)) == 1, - "expected exactly one body function to be supplied"); - Debug.Assert(bodyWithLocal != null || (localInit == null && localFinally == null), - "thread local functions should only be supplied for loops w/ thread local bodies"); - - // Instantiate our result. Specifics will be filled in later. - ParallelLoopResult result = new ParallelLoopResult(); - - // We just return immediately if 'to' is smaller (or equal to) 'from'. - if (toExclusive <= fromInclusive) - { - result.m_completed = true; - return result; - } - - // For all loops we need a shared flag even though we don't have a body with state, - // because the shared flag contains the exceptional bool, which triggers other workers - // to exit their loops if one worker catches an exception - ParallelLoopStateFlags32 sharedPStateFlags = new ParallelLoopStateFlags32(); - - TaskCreationOptions creationOptions = TaskCreationOptions.None; - InternalTaskOptions internalOptions = InternalTaskOptions.SelfReplicating; - - // Before getting started, do a quick peek to see if we have been canceled already - if (parallelOptions.CancellationToken.IsCancellationRequested) - { - throw new OperationCanceledException(parallelOptions.CancellationToken); - } - - // initialize ranges with passed in loop arguments and expected number of workers - int numExpectedWorkers = (parallelOptions.EffectiveMaxConcurrencyLevel == -1) ? - PlatformHelper.ProcessorCount : - parallelOptions.EffectiveMaxConcurrencyLevel; - RangeManager rangeManager = new RangeManager(fromInclusive, toExclusive, 1, numExpectedWorkers); - - // Keep track of any cancellations - OperationCanceledException oce = null; - - CancellationTokenRegistration ctr = new CancellationTokenRegistration(); - - // if cancellation is enabled, we need to register a callback to stop the loop when it gets signaled - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr = parallelOptions.CancellationToken.InternalRegisterWithoutEC((o) => - { - // Cause processing to stop - sharedPStateFlags.Cancel(); - // Record our cancellation - oce = new OperationCanceledException(parallelOptions.CancellationToken); - }, null); - } - - // ETW event for Parallel For begin - int forkJoinContextID = 0; - Task callingTask = null; - if (TplEtwProvider.Log.IsEnabled()) - { - forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID); - callingTask = Task.InternalCurrent; - TplEtwProvider.Log.ParallelLoopBegin((callingTask != null ? callingTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callingTask != null ? callingTask.Id : 0), - forkJoinContextID, TplEtwProvider.ForkJoinOperationType.ParallelFor, - fromInclusive, toExclusive); - } - - ParallelForReplicatingTask rootTask = null; - - try - { - // this needs to be in try-block because it can throw in BuggyScheduler.MaxConcurrencyLevel - rootTask = new ParallelForReplicatingTask( - parallelOptions, - delegate - { - // - // first thing we do upon enterying the task is to register as a new "RangeWorker" with the - // shared RangeManager instance. - // - // If this call returns a RangeWorker struct which wraps the state needed by this task - // - // We need to call FindNewWork32() on it to see whether there's a chunk available. - // - - - // Cache some information about the current task - Task currentWorkerTask = Task.InternalCurrent; - bool bIsRootTask = (currentWorkerTask == rootTask); - - RangeWorker currentWorker = new RangeWorker(); - Object savedStateFromPreviousReplica = currentWorkerTask.SavedStateFromPreviousReplica; - - if (savedStateFromPreviousReplica is RangeWorker) - currentWorker = (RangeWorker)savedStateFromPreviousReplica; - else - currentWorker = rangeManager.RegisterNewWorker(); - - - - // These are the local index values to be used in the sequential loop. - // Their values filled in by FindNewWork32 - int nFromInclusiveLocal; - int nToExclusiveLocal; - - if (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) == false || - sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal) == true) - { - return; // no need to run - } - - // ETW event for ParallelFor Worker Fork - if (TplEtwProvider.Log.IsEnabled()) - { - TplEtwProvider.Log.ParallelFork((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0), - forkJoinContextID); - } - - TLocal localValue = default(TLocal); - bool bLocalValueInitialized = false; // Tracks whether localInit ran without exceptions, so that we can skip localFinally if it wasn't - - try - { - // Create a new state object that references the shared "stopped" and "exceptional" flags - // If needed, it will contain a new instance of thread-local state by invoking the selector. - ParallelLoopState32 state = null; - - if (bodyWithState != null) - { - Debug.Assert(sharedPStateFlags != null); - state = new ParallelLoopState32(sharedPStateFlags); - } - else if (bodyWithLocal != null) - { - Debug.Assert(sharedPStateFlags != null); - state = new ParallelLoopState32(sharedPStateFlags); - if (localInit != null) - { - localValue = localInit(); - bLocalValueInitialized = true; - } - } - - // initialize a loop timer which will help us decide whether we should exit early - LoopTimer loopTimer = new LoopTimer(rootTask.ActiveChildCount); - - // Now perform the loop itself. - do - { - if (body != null) - { - for (int j = nFromInclusiveLocal; - j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline - || !sharedPStateFlags.ShouldExitLoop()); // the no-arg version is used since we have no state - j += 1) - { - - body(j); - } - } - else if (bodyWithState != null) - { - for (int j = nFromInclusiveLocal; - j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline - || !sharedPStateFlags.ShouldExitLoop(j)); - j += 1) - { - - state.CurrentIteration = j; - bodyWithState(j, state); - } - } - else - { - for (int j = nFromInclusiveLocal; - j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline - || !sharedPStateFlags.ShouldExitLoop(j)); - j += 1) - { - state.CurrentIteration = j; - localValue = bodyWithLocal(j, state, localValue); - } - } - - // Cooperative multitasking workaround for AppDomain fairness. - // Check if allowed loop time is exceeded, if so save current state and return. The self replicating task logic - // will detect this, and queue up a replacement task. Note that we don't do this on the root task. - if (!bIsRootTask && loopTimer.LimitExceeded()) - { - currentWorkerTask.SavedStateForNextReplica = (object)currentWorker; - break; - } - - } - // Exit if we can't find new work, or if the loop was stoppped. - while (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) && - ((sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE) || - !sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal))); - } - catch - { - // if we catch an exception in a worker, we signal the other workers to exit the loop, and we rethrow - sharedPStateFlags.SetExceptional(); - throw; - } - finally - { - // If a cleanup function was specified, call it. Otherwise, if the type is - // IDisposable, we will invoke Dispose on behalf of the user. - if (localFinally != null && bLocalValueInitialized) - { - localFinally(localValue); - } - - // ETW event for ParallelFor Worker Join - if (TplEtwProvider.Log.IsEnabled()) - { - TplEtwProvider.Log.ParallelJoin((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0), - forkJoinContextID); - } - } - }, - creationOptions, internalOptions); - - rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler); // might throw TSE - rootTask.Wait(); - - // If we made a cancellation registration, we need to clean it up now before observing the OCE - // Otherwise we could be caught in the middle of a callback, and observe PLS_STOPPED, but oce = null - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - - // If we got through that with no exceptions, and we were canceled, then - // throw our cancellation exception - if (oce != null) throw oce; - } - catch (AggregateException aggExp) - { - // if we made a cancellation registration, and rootTask.Wait threw, we need to clean it up here - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - - // see if we can combine it into a single OCE. If not propagate the original exception - ThrowIfReducableToSingleOCE(aggExp.InnerExceptions, parallelOptions.CancellationToken); - throw; - } - catch (TaskSchedulerException) - { - // if we made a cancellation registration, and rootTask.RunSynchronously threw, we need to clean it up here - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - throw; - } - finally - { - int sb_status = sharedPStateFlags.LoopStateFlags; - result.m_completed = (sb_status == ParallelLoopStateFlags.PLS_NONE); - if ((sb_status & ParallelLoopStateFlags.PLS_BROKEN) != 0) - { - result.m_lowestBreakIteration = sharedPStateFlags.LowestBreakIteration; - } - - if ((rootTask != null) && rootTask.IsCompleted) rootTask.Dispose(); - - // ETW event for Parallel For End - if (TplEtwProvider.Log.IsEnabled()) - { - int nTotalIterations = 0; - - // calculate how many iterations we ran in total - if (sb_status == ParallelLoopStateFlags.PLS_NONE) - nTotalIterations = toExclusive - fromInclusive; - else if ((sb_status & ParallelLoopStateFlags.PLS_BROKEN) != 0) - nTotalIterations = sharedPStateFlags.LowestBreakIteration - fromInclusive; - else - nTotalIterations = -1; //PLS_STOPPED! We can't determine this if we were stopped.. - - TplEtwProvider.Log.ParallelLoopEnd((callingTask != null ? callingTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callingTask != null ? callingTask.Id : 0), - forkJoinContextID, nTotalIterations); - } - } - - return result; - } - - /// <summary> - /// Performs the major work of the 64-bit parallel for loop. It assumes that argument validation has already - /// been performed by the caller. This function's whole purpose in life is to enable as much reuse of - /// common implementation details for the various For overloads we offer. Without it, we'd end up - /// with lots of duplicate code. It handles: (1) simple for loops, (2) for loops that depend on - /// ParallelState, and (3) for loops with thread local data. - /// - /// </summary> - /// <typeparam name="TLocal">The type of the local data.</typeparam> - /// <param name="fromInclusive">The loop's start index, inclusive.</param> - /// <param name="toExclusive">The loop's end index, exclusive.</param> - /// <param name="parallelOptions">A ParallelOptions instance.</param> - /// <param name="body">The simple loop body.</param> - /// <param name="bodyWithState">The loop body for ParallelState overloads.</param> - /// <param name="bodyWithLocal">The loop body for thread local state overloads.</param> - /// <param name="localInit">A selector function that returns new thread local state.</param> - /// <param name="localFinally">A cleanup function to destroy thread local state.</param> - /// <remarks>Only one of the body arguments may be supplied (i.e. they are exclusive).</remarks> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult"/> structure.</returns> - private static ParallelLoopResult ForWorker64<TLocal>( - long fromInclusive, long toExclusive, - ParallelOptions parallelOptions, - Action<long> body, - Action<long, ParallelLoopState> bodyWithState, - Func<long, ParallelLoopState, TLocal, TLocal> bodyWithLocal, - Func<TLocal> localInit, Action<TLocal> localFinally) - { - Debug.Assert(((body == null ? 0 : 1) + (bodyWithState == null ? 0 : 1) + (bodyWithLocal == null ? 0 : 1)) == 1, - "expected exactly one body function to be supplied"); - Debug.Assert(bodyWithLocal != null || (localInit == null && localFinally == null), - "thread local functions should only be supplied for loops w/ thread local bodies"); - - // Instantiate our result. Specifics will be filled in later. - ParallelLoopResult result = new ParallelLoopResult(); - - // We just return immediately if 'to' is smaller (or equal to) 'from'. - if (toExclusive <= fromInclusive) - { - result.m_completed = true; - return result; - } - - // For all loops we need a shared flag even though we don't have a body with state, - // because the shared flag contains the exceptional bool, which triggers other workers - // to exit their loops if one worker catches an exception - ParallelLoopStateFlags64 sharedPStateFlags = new ParallelLoopStateFlags64(); - - TaskCreationOptions creationOptions = TaskCreationOptions.None; - InternalTaskOptions internalOptions = InternalTaskOptions.SelfReplicating; - - // Before getting started, do a quick peek to see if we have been canceled already - if (parallelOptions.CancellationToken.IsCancellationRequested) - { - throw new OperationCanceledException(parallelOptions.CancellationToken); - } - - // initialize ranges with passed in loop arguments and expected number of workers - int numExpectedWorkers = (parallelOptions.EffectiveMaxConcurrencyLevel == -1) ? - PlatformHelper.ProcessorCount : - parallelOptions.EffectiveMaxConcurrencyLevel; - RangeManager rangeManager = new RangeManager(fromInclusive, toExclusive, 1, numExpectedWorkers); - - // Keep track of any cancellations - OperationCanceledException oce = null; - - CancellationTokenRegistration ctr = new CancellationTokenRegistration(); - - // if cancellation is enabled, we need to register a callback to stop the loop when it gets signaled - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr = parallelOptions.CancellationToken.InternalRegisterWithoutEC((o) => - { - // Cause processing to stop - sharedPStateFlags.Cancel(); - // Record our cancellation - oce = new OperationCanceledException(parallelOptions.CancellationToken); - }, null); - } - - // ETW event for Parallel For begin - Task callerTask = null; - int forkJoinContextID = 0; - if (TplEtwProvider.Log.IsEnabled()) - { - forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID); - callerTask = Task.InternalCurrent; - TplEtwProvider.Log.ParallelLoopBegin((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0), - forkJoinContextID, TplEtwProvider.ForkJoinOperationType.ParallelFor, - fromInclusive, toExclusive); - } - - ParallelForReplicatingTask rootTask = null; - - try - { - // this needs to be in try-block because it can throw in BuggyScheduler.MaxConcurrencyLevel - rootTask = new ParallelForReplicatingTask( - parallelOptions, - delegate - { - // - // first thing we do upon enterying the task is to register as a new "RangeWorker" with the - // shared RangeManager instance. - // - // If this call returns a RangeWorker struct which wraps the state needed by this task - // - // We need to call FindNewWork() on it to see whether there's a chunk available. - // - - // Cache some information about the current task - Task currentWorkerTask = Task.InternalCurrent; - bool bIsRootTask = (currentWorkerTask == rootTask); - - RangeWorker currentWorker = new RangeWorker(); - Object savedStateFromPreviousReplica = currentWorkerTask.SavedStateFromPreviousReplica; - - if (savedStateFromPreviousReplica is RangeWorker) - currentWorker = (RangeWorker)savedStateFromPreviousReplica; - else - currentWorker = rangeManager.RegisterNewWorker(); - - - // These are the local index values to be used in the sequential loop. - // Their values filled in by FindNewWork - long nFromInclusiveLocal; - long nToExclusiveLocal; - - if (currentWorker.FindNewWork(out nFromInclusiveLocal, out nToExclusiveLocal) == false || - sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal) == true) - { - return; // no need to run - } - - // ETW event for ParallelFor Worker Fork - if (TplEtwProvider.Log.IsEnabled()) - { - TplEtwProvider.Log.ParallelFork((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0), - forkJoinContextID); - } - - TLocal localValue = default(TLocal); - bool bLocalValueInitialized = false; // Tracks whether localInit ran without exceptions, so that we can skip localFinally if it wasn't - - try - { - - // Create a new state object that references the shared "stopped" and "exceptional" flags - // If needed, it will contain a new instance of thread-local state by invoking the selector. - ParallelLoopState64 state = null; - - if (bodyWithState != null) - { - Debug.Assert(sharedPStateFlags != null); - state = new ParallelLoopState64(sharedPStateFlags); - } - else if (bodyWithLocal != null) - { - Debug.Assert(sharedPStateFlags != null); - state = new ParallelLoopState64(sharedPStateFlags); - - // If a thread-local selector was supplied, invoke it. Otherwise, use the default. - if (localInit != null) - { - localValue = localInit(); - bLocalValueInitialized = true; - } - } - - // initialize a loop timer which will help us decide whether we should exit early - LoopTimer loopTimer = new LoopTimer(rootTask.ActiveChildCount); - - // Now perform the loop itself. - do - { - if (body != null) - { - for (long j = nFromInclusiveLocal; - j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline - || !sharedPStateFlags.ShouldExitLoop()); // the no-arg version is used since we have no state - j += 1) - { - body(j); - } - } - else if (bodyWithState != null) - { - for (long j = nFromInclusiveLocal; - j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline - || !sharedPStateFlags.ShouldExitLoop(j)); - j += 1) - { - state.CurrentIteration = j; - bodyWithState(j, state); - } - } - else - { - for (long j = nFromInclusiveLocal; - j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline - || !sharedPStateFlags.ShouldExitLoop(j)); - j += 1) - { - state.CurrentIteration = j; - localValue = bodyWithLocal(j, state, localValue); - } - } - - // Cooperative multitasking workaround for AppDomain fairness. - // Check if allowed loop time is exceeded, if so save current state and return. The self replicating task logic - // will detect this, and queue up a replacement task. Note that we don't do this on the root task. - if (!bIsRootTask && loopTimer.LimitExceeded()) - { - currentWorkerTask.SavedStateForNextReplica = (object)currentWorker; - break; - } - } - // Exit if we can't find new work, or if the loop was stoppped. - while (currentWorker.FindNewWork(out nFromInclusiveLocal, out nToExclusiveLocal) && - ((sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE) || - !sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal))); - } - catch - { - // if we catch an exception in a worker, we signal the other workers to exit the loop, and we rethrow - sharedPStateFlags.SetExceptional(); - throw; - } - finally - { - // If a cleanup function was specified, call it. Otherwise, if the type is - // IDisposable, we will invoke Dispose on behalf of the user. - if (localFinally != null && bLocalValueInitialized) - { - localFinally(localValue); - } - - // ETW event for ParallelFor Worker Join - if (TplEtwProvider.Log.IsEnabled()) - { - TplEtwProvider.Log.ParallelJoin((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0), - forkJoinContextID); - } - } - }, - creationOptions, internalOptions); - - rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler); // might throw TSE - rootTask.Wait(); - - // If we made a cancellation registration, we need to clean it up now before observing the OCE - // Otherwise we could be caught in the middle of a callback, and observe PLS_STOPPED, but oce = null - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - - // If we got through that with no exceptions, and we were canceled, then - // throw our cancellation exception - if (oce != null) throw oce; - } - catch (AggregateException aggExp) - { - // if we made a cancellation registration, and rootTask.Wait threw, we need to clean it up here - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - - // see if we can combine it into a single OCE. If not propagate the original exception - ThrowIfReducableToSingleOCE(aggExp.InnerExceptions, parallelOptions.CancellationToken); - throw; - } - catch (TaskSchedulerException) - { - // if we made a cancellation registration, and rootTask.RunSynchronously threw, we need to clean it up here - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - throw; - } - finally - { - int sb_status = sharedPStateFlags.LoopStateFlags; - result.m_completed = (sb_status == ParallelLoopStateFlags.PLS_NONE); - if ((sb_status & ParallelLoopStateFlags.PLS_BROKEN) != 0) - { - result.m_lowestBreakIteration = sharedPStateFlags.LowestBreakIteration; - } - - if ((rootTask != null) && rootTask.IsCompleted) rootTask.Dispose(); - - // ETW event for Parallel For End - if (TplEtwProvider.Log.IsEnabled()) - { - long nTotalIterations = 0; - - // calculate how many iterations we ran in total - if (sb_status == ParallelLoopStateFlags.PLS_NONE) - nTotalIterations = toExclusive - fromInclusive; - else if ((sb_status & ParallelLoopStateFlags.PLS_BROKEN) != 0) - nTotalIterations = sharedPStateFlags.LowestBreakIteration - fromInclusive; - else - nTotalIterations = -1; //PLS_STOPPED! We can't determine this if we were stopped.. - - TplEtwProvider.Log.ParallelLoopEnd((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0), - forkJoinContextID, nTotalIterations); - } - } - - return result; - } - - - /// <summary> - /// Executes a for each operation on an <see cref="T:System.Collections.IEnumerable{TSource}"/> - /// in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the data in the source.</typeparam> - /// <param name="source">An enumerable data source.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// enumerable. It is provided with the current element as a parameter. - /// </remarks> - public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return ForEachWorker<TSource, object>( - source, s_defaultParallelOptions, body, null, null, null, null, null, null); - } - - /// <summary> - /// Executes a for each operation on an <see cref="T:System.Collections.IEnumerable{TSource}"/> - /// in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the data in the source.</typeparam> - /// <param name="source">An enumerable data source.</param> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the - /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> - /// argument is set</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// enumerable. It is provided with the current element as a parameter. - /// </remarks> - public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource> body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForEachWorker<TSource, object>( - source, parallelOptions, body, null, null, null, null, null, null); - } - - /// <summary> - /// Executes a for each operation on an <see cref="T:System.Collections.IEnumerable{TSource}"/> - /// in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the data in the source.</typeparam> - /// <param name="source">An enumerable data source.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// enumerable. It is provided with the following parameters: the current element, - /// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely. - /// </remarks> - public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return ForEachWorker<TSource, object>( - source, s_defaultParallelOptions, null, body, null, null, null, null, null); - } - - /// <summary> - /// Executes a for each operation on an <see cref="T:System.Collections.IEnumerable{TSource}"/> - /// in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the data in the source.</typeparam> - /// <param name="source">An enumerable data source.</param> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the - /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> - /// argument is set</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// enumerable. It is provided with the following parameters: the current element, - /// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely. - /// </remarks> - public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState> body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForEachWorker<TSource, object>( - source, parallelOptions, null, body, null, null, null, null, null); - } - - /// <summary> - /// Executes a for each operation on an <see cref="T:System.Collections.IEnumerable{TSource}"/> - /// in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the data in the source.</typeparam> - /// <param name="source">An enumerable data source.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// enumerable. It is provided with the following parameters: the current element, - /// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely, and the current element's index (an Int64). - /// </remarks> - public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState, long> body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return ForEachWorker<TSource, object>( - source, s_defaultParallelOptions, null, null, body, null, null, null, null); - } - - /// <summary> - /// Executes a for each operation on an <see cref="T:System.Collections.IEnumerable{TSource}"/> - /// in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the data in the source.</typeparam> - /// <param name="source">An enumerable data source.</param> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the - /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> - /// argument is set</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// enumerable. It is provided with the following parameters: the current element, - /// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely, and the current element's index (an Int64). - /// </remarks> - public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState, long> body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForEachWorker<TSource, object>( - source, parallelOptions, null, null, body, null, null, null, null); - } - - /// <summary> - /// Executes a for each operation on an <see cref="T:System.Collections.IEnumerable{TSource}"/> - /// in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the data in the source.</typeparam> - /// <typeparam name="TLocal">The type of the thread-local data.</typeparam> - /// <param name="source">An enumerable data source.</param> - /// <param name="localInit">The function delegate that returns the initial state of the local data - /// for each thread.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <param name="localFinally">The delegate that performs a final action on the local state of each - /// thread.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localInit"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localFinally"/> argument is null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// enumerable. It is provided with the following parameters: the current element, - /// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely, and some local state that may be shared amongst iterations - /// that execute on the same thread. - /// </para> - /// <para> - /// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// </para> - /// </remarks> - public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source, Func<TLocal> localInit, - Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - - return ForEachWorker<TSource, TLocal>( - source, s_defaultParallelOptions, null, null, null, body, null, localInit, localFinally); - } - - /// <summary> - /// Executes a for each operation on an <see cref="T:System.Collections.IEnumerable{TSource}"/> - /// in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the data in the source.</typeparam> - /// <typeparam name="TLocal">The type of the thread-local data.</typeparam> - /// <param name="source">An enumerable data source.</param> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="localInit">The function delegate that returns the initial state of the local data - /// for each thread.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <param name="localFinally">The delegate that performs a final action on the local state of each - /// thread.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localInit"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localFinally"/> argument is null.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the - /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> - /// argument is set</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// enumerable. It is provided with the following parameters: the current element, - /// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely, and some local state that may be shared amongst iterations - /// that execute on the same thread. - /// </para> - /// <para> - /// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// </para> - /// </remarks> - public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source, - ParallelOptions parallelOptions, Func<TLocal> localInit, - Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForEachWorker<TSource, TLocal>( - source, parallelOptions, null, null, null, body, null, localInit, localFinally); - } - - /// <summary> - /// Executes a for each operation on an <see cref="T:System.Collections.IEnumerable{TSource}"/> - /// in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the data in the source.</typeparam> - /// <typeparam name="TLocal">The type of the thread-local data.</typeparam> - /// <param name="source">An enumerable data source.</param> - /// <param name="localInit">The function delegate that returns the initial state of the local data - /// for each thread.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <param name="localFinally">The delegate that performs a final action on the local state of each - /// thread.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localInit"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localFinally"/> argument is null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// enumerable. It is provided with the following parameters: the current element, - /// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely, the current element's index (an Int64), and some local - /// state that may be shared amongst iterations that execute on the same thread. - /// </para> - /// <para> - /// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// </para> - /// </remarks> - public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source, Func<TLocal> localInit, - Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action<TLocal> localFinally) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - - return ForEachWorker<TSource, TLocal>( - source, s_defaultParallelOptions, null, null, null, null, body, localInit, localFinally); - } - - /// <summary> - /// Executes a for each operation on an <see cref="T:System.Collections.IEnumerable{TSource}"/> - /// in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the data in the source.</typeparam> - /// <typeparam name="TLocal">The type of the thread-local data.</typeparam> - /// <param name="source">An enumerable data source.</param> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="localInit">The function delegate that returns the initial state of the local data - /// for each thread.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <param name="localFinally">The delegate that performs a final action on the local state of each - /// thread.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localInit"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localFinally"/> argument is null.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the - /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> - /// argument is set</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// enumerable. It is provided with the following parameters: the current element, - /// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely, the current element's index (an Int64), and some local - /// state that may be shared amongst iterations that execute on the same thread. - /// </para> - /// <para> - /// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// </para> - /// </remarks> - public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Func<TLocal> localInit, - Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action<TLocal> localFinally) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return ForEachWorker<TSource, TLocal>( - source, parallelOptions, null, null, null, null, body, localInit, localFinally); - } - - - /// <summary> - /// Performs the major work of the parallel foreach loop. It assumes that argument validation has - /// already been performed by the caller. This function's whole purpose in life is to enable as much - /// reuse of common implementation details for the various For overloads we offer. Without it, we'd - /// end up with lots of duplicate code. It handles: (1) simple foreach loops, (2) foreach loops that - /// depend on ParallelState, and (3) foreach loops that access indices, (4) foreach loops with thread - /// local data, and any necessary permutations thereof. - /// - /// </summary> - /// <typeparam name="TSource">The type of the source data.</typeparam> - /// <typeparam name="TLocal">The type of the local data.</typeparam> - /// <param name="source">An enumerable data source.</param> - /// <param name="parallelOptions">ParallelOptions instance to use with this ForEach-loop</param> - /// <param name="body">The simple loop body.</param> - /// <param name="bodyWithState">The loop body for ParallelState overloads.</param> - /// <param name="bodyWithStateAndIndex">The loop body for ParallelState/indexed overloads.</param> - /// <param name="bodyWithStateAndLocal">The loop body for ParallelState/thread local state overloads.</param> - /// <param name="bodyWithEverything">The loop body for ParallelState/indexed/thread local state overloads.</param> - /// <param name="localInit">A selector function that returns new thread local state.</param> - /// <param name="localFinally">A cleanup function to destroy thread local state.</param> - /// <remarks>Only one of the bodyXX arguments may be supplied (i.e. they are exclusive).</remarks> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult"/> structure.</returns> - private static ParallelLoopResult ForEachWorker<TSource, TLocal>( - IEnumerable<TSource> source, - ParallelOptions parallelOptions, - Action<TSource> body, - Action<TSource, ParallelLoopState> bodyWithState, - Action<TSource, ParallelLoopState, long> bodyWithStateAndIndex, - Func<TSource, ParallelLoopState, TLocal, TLocal> bodyWithStateAndLocal, - Func<TSource, ParallelLoopState, long, TLocal, TLocal> bodyWithEverything, - Func<TLocal> localInit, Action<TLocal> localFinally) - { - Debug.Assert(((body == null ? 0 : 1) + (bodyWithState == null ? 0 : 1) + - (bodyWithStateAndIndex == null ? 0 : 1) + (bodyWithStateAndLocal == null ? 0 : 1) + (bodyWithEverything == null ? 0 : 1)) == 1, - "expected exactly one body function to be supplied"); - Debug.Assert((bodyWithStateAndLocal != null) || (bodyWithEverything != null) || (localInit == null && localFinally == null), - "thread local functions should only be supplied for loops w/ thread local bodies"); - - // Before getting started, do a quick peek to see if we have been canceled already - if (parallelOptions.CancellationToken.IsCancellationRequested) - { - throw new OperationCanceledException(parallelOptions.CancellationToken); - } - - // If it's an array, we can use a fast-path that uses ldelems in the IL. - TSource[] sourceAsArray = source as TSource[]; - if (sourceAsArray != null) - { - return ForEachWorker<TSource, TLocal>( - sourceAsArray, parallelOptions, body, bodyWithState, bodyWithStateAndIndex, bodyWithStateAndLocal, - bodyWithEverything, localInit, localFinally); - } - - // If we can index into the list, we can use a faster code-path that doesn't result in - // contention for the single, shared enumerator object. - IList<TSource> sourceAsList = source as IList<TSource>; - if (sourceAsList != null) - { - return ForEachWorker<TSource, TLocal>( - sourceAsList, parallelOptions, body, bodyWithState, bodyWithStateAndIndex, bodyWithStateAndLocal, - bodyWithEverything, localInit, localFinally); - } - - // This is an honest-to-goodness IEnumerable. Wrap it in a Partitioner and defer to our - // ForEach(Partitioner) logic. - return PartitionerForEachWorker<TSource, TLocal>(Partitioner.Create(source), parallelOptions, body, bodyWithState, - bodyWithStateAndIndex, bodyWithStateAndLocal, bodyWithEverything, localInit, localFinally); - - } - - /// <summary> - /// A fast path for the more general ForEachWorker method above. This uses ldelem instructions to - /// access the individual elements of the array, which will be faster. - /// </summary> - /// <typeparam name="TSource">The type of the source data.</typeparam> - /// <typeparam name="TLocal">The type of the local data.</typeparam> - /// <param name="array">An array data source.</param> - /// <param name="parallelOptions">The options to use for execution.</param> - /// <param name="body">The simple loop body.</param> - /// <param name="bodyWithState">The loop body for ParallelState overloads.</param> - /// <param name="bodyWithStateAndIndex">The loop body for indexed/ParallelLoopState overloads.</param> - /// <param name="bodyWithStateAndLocal">The loop body for local/ParallelLoopState overloads.</param> - /// <param name="bodyWithEverything">The loop body for the most generic overload.</param> - /// <param name="localInit">A selector function that returns new thread local state.</param> - /// <param name="localFinally">A cleanup function to destroy thread local state.</param> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult"/> structure.</returns> - private static ParallelLoopResult ForEachWorker<TSource, TLocal>( - TSource[] array, - ParallelOptions parallelOptions, - Action<TSource> body, - Action<TSource, ParallelLoopState> bodyWithState, - Action<TSource, ParallelLoopState, long> bodyWithStateAndIndex, - Func<TSource, ParallelLoopState, TLocal, TLocal> bodyWithStateAndLocal, - Func<TSource, ParallelLoopState, long, TLocal, TLocal> bodyWithEverything, - Func<TLocal> localInit, Action<TLocal> localFinally) - { - Debug.Assert(array != null); - Debug.Assert(parallelOptions != null, "ForEachWorker(array): parallelOptions is null"); - - int from = array.GetLowerBound(0); - int to = array.GetUpperBound(0) + 1; - - if (body != null) - { - return ForWorker<object>( - from, to, parallelOptions, (i) => body(array[i]), null, null, null, null); - } - else if (bodyWithState != null) - { - return ForWorker<object>( - from, to, parallelOptions, null, (i, state) => bodyWithState(array[i], state), null, null, null); - } - else if (bodyWithStateAndIndex != null) - { - return ForWorker<object>( - from, to, parallelOptions, null, (i, state) => bodyWithStateAndIndex(array[i], state, i), null, null, null); - } - else if (bodyWithStateAndLocal != null) - { - return ForWorker<TLocal>( - from, to, parallelOptions, null, null, (i, state, local) => bodyWithStateAndLocal(array[i], state, local), localInit, localFinally); - } - else - { - return ForWorker<TLocal>( - from, to, parallelOptions, null, null, (i, state, local) => bodyWithEverything(array[i], state, i, local), localInit, localFinally); - } - } - - /// <summary> - /// A fast path for the more general ForEachWorker method above. This uses IList<T>'s indexer - /// capabilities to access the individual elements of the list rather than an enumerator. - /// </summary> - /// <typeparam name="TSource">The type of the source data.</typeparam> - /// <typeparam name="TLocal">The type of the local data.</typeparam> - /// <param name="list">A list data source.</param> - /// <param name="parallelOptions">The options to use for execution.</param> - /// <param name="body">The simple loop body.</param> - /// <param name="bodyWithState">The loop body for ParallelState overloads.</param> - /// <param name="bodyWithStateAndIndex">The loop body for indexed/ParallelLoopState overloads.</param> - /// <param name="bodyWithStateAndLocal">The loop body for local/ParallelLoopState overloads.</param> - /// <param name="bodyWithEverything">The loop body for the most generic overload.</param> - /// <param name="localInit">A selector function that returns new thread local state.</param> - /// <param name="localFinally">A cleanup function to destroy thread local state.</param> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult"/> structure.</returns> - private static ParallelLoopResult ForEachWorker<TSource, TLocal>( - IList<TSource> list, - ParallelOptions parallelOptions, - Action<TSource> body, - Action<TSource, ParallelLoopState> bodyWithState, - Action<TSource, ParallelLoopState, long> bodyWithStateAndIndex, - Func<TSource, ParallelLoopState, TLocal, TLocal> bodyWithStateAndLocal, - Func<TSource, ParallelLoopState, long, TLocal, TLocal> bodyWithEverything, - Func<TLocal> localInit, Action<TLocal> localFinally) - { - Debug.Assert(list != null); - Debug.Assert(parallelOptions != null, "ForEachWorker(list): parallelOptions is null"); - - if (body != null) - { - return ForWorker<object>( - 0, list.Count, parallelOptions, (i) => body(list[i]), null, null, null, null); - } - else if (bodyWithState != null) - { - return ForWorker<object>( - 0, list.Count, parallelOptions, null, (i, state) => bodyWithState(list[i], state), null, null, null); - } - else if (bodyWithStateAndIndex != null) - { - return ForWorker<object>( - 0, list.Count, parallelOptions, null, (i, state) => bodyWithStateAndIndex(list[i], state, i), null, null, null); - } - else if (bodyWithStateAndLocal != null) - { - return ForWorker<TLocal>( - 0, list.Count, parallelOptions, null, null, (i, state, local) => bodyWithStateAndLocal(list[i], state, local), localInit, localFinally); - } - else - { - return ForWorker<TLocal>( - 0, list.Count, parallelOptions, null, null, (i, state, local) => bodyWithEverything(list[i], state, i, local), localInit, localFinally); - } - } - - - - /// <summary> - /// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.Partitioner{TSource}"> - /// Partitioner</see> in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam> - /// <param name="source">The Partitioner that contains the original data source.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// SupportsDynamicPartitions property in the <paramref name="source"/> Partitioner returns - /// false.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any - /// methods in the <paramref name="source"/> Partitioner return null.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() method in the <paramref name="source"/> Partitioner does not return - /// the correct number of partitions.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() method in the <paramref name="source"/> Partitioner returns an IList - /// with at least one null value.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetDynamicPartitions() method in the <paramref name="source"/> Partitioner returns an - /// IEnumerable whose GetEnumerator() method returns null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner"> - /// OrderablePartitioner</see>. - /// </para> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// Partitioner. It is provided with the current element as a parameter. - /// </para> - /// </remarks> - public static ParallelLoopResult ForEach<TSource>( - Partitioner<TSource> source, - Action<TSource> body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return PartitionerForEachWorker<TSource, object>(source, s_defaultParallelOptions, body, null, null, null, null, null, null); - } - - /// <summary> - /// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.Partitioner{TSource}"> - /// Partitioner</see> in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam> - /// <param name="source">The Partitioner that contains the original data source.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// SupportsDynamicPartitions property in the <paramref name="source"/> Partitioner returns - /// false.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any - /// methods in the <paramref name="source"/> Partitioner return null.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() method in the <paramref name="source"/> Partitioner does not return - /// the correct number of partitions.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() method in the <paramref name="source"/> Partitioner returns an IList - /// with at least one null value.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetDynamicPartitions() method in the <paramref name="source"/> Partitioner returns an - /// IEnumerable whose GetEnumerator() method returns null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner"> - /// OrderablePartitioner</see>. - /// </para> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// Partitioner. It is provided with the following parameters: the current element, - /// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely. - /// </para> - /// </remarks> - public static ParallelLoopResult ForEach<TSource>( - Partitioner<TSource> source, - Action<TSource, ParallelLoopState> body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - return PartitionerForEachWorker<TSource, object>(source, s_defaultParallelOptions, null, body, null, null, null, null, null); - } - - /// <summary> - /// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}"> - /// OrderablePartitioner</see> in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam> - /// <param name="source">The OrderablePartitioner that contains the original data source.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// SupportsDynamicPartitions property in the <paramref name="source"/> OrderablePartitioner returns - /// false.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// KeysNormalized property in the <paramref name="source"/> OrderablePartitioner returns - /// false.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any - /// methods in the <paramref name="source"/> OrderablePartitioner return null.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() or GetOrderablePartitions() methods in the <paramref name="source"/> - /// OrderablePartitioner do not return the correct number of partitions.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() or GetOrderablePartitions() methods in the <paramref name="source"/> - /// OrderablePartitioner return an IList with at least one null value.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetDynamicPartitions() or GetDynamicOrderablePartitions() methods in the <paramref name="source"/> - /// OrderablePartitioner return an IEnumerable whose GetEnumerator() method returns null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner"> - /// OrderablePartitioner</see>. - /// </para> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// Partitioner. It is provided with the following parameters: the current element, - /// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely, and the current element's index (an Int64). - /// </para> - /// </remarks> - public static ParallelLoopResult ForEach<TSource>( - OrderablePartitioner<TSource> source, - Action<TSource, ParallelLoopState, long> body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - - if (!source.KeysNormalized) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_OrderedPartitionerKeysNotNormalized")); - } - - return PartitionerForEachWorker<TSource, object>(source, s_defaultParallelOptions, null, null, body, null, null, null, null); - } - - /// <summary> - /// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.Partitioner{TSource}"> - /// Partitioner</see> in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam> - /// <typeparam name="TLocal">The type of the thread-local data.</typeparam> - /// <param name="source">The Partitioner that contains the original data source.</param> - /// <param name="localInit">The function delegate that returns the initial state of the local data - /// for each thread.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <param name="localFinally">The delegate that performs a final action on the local state of each - /// thread.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localInit"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localFinally"/> argument is null.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// SupportsDynamicPartitions property in the <paramref name="source"/> Partitioner returns - /// false.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any - /// methods in the <paramref name="source"/> Partitioner return null.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() method in the <paramref name="source"/> Partitioner does not return - /// the correct number of partitions.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() method in the <paramref name="source"/> Partitioner returns an IList - /// with at least one null value.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetDynamicPartitions() method in the <paramref name="source"/> Partitioner returns an - /// IEnumerable whose GetEnumerator() method returns null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner"> - /// OrderablePartitioner</see>. - /// </para> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// Partitioner. It is provided with the following parameters: the current element, - /// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely, and some local state that may be shared amongst iterations - /// that execute on the same thread. - /// </para> - /// <para> - /// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// </para> - /// </remarks> - public static ParallelLoopResult ForEach<TSource, TLocal>( - Partitioner<TSource> source, - Func<TLocal> localInit, - Func<TSource, ParallelLoopState, TLocal, TLocal> body, - Action<TLocal> localFinally) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - - return PartitionerForEachWorker<TSource, TLocal>(source, s_defaultParallelOptions, null, null, null, body, null, localInit, localFinally); - } - - /// <summary> - /// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}"> - /// OrderablePartitioner</see> in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam> - /// <typeparam name="TLocal">The type of the thread-local data.</typeparam> - /// <param name="source">The OrderablePartitioner that contains the original data source.</param> - /// <param name="localInit">The function delegate that returns the initial state of the local data - /// for each thread.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <param name="localFinally">The delegate that performs a final action on the local state of each - /// thread.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localInit"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localFinally"/> argument is null.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// SupportsDynamicPartitions property in the <paramref name="source"/> OrderablePartitioner returns - /// false.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// KeysNormalized property in the <paramref name="source"/> OrderablePartitioner returns - /// false.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any - /// methods in the <paramref name="source"/> OrderablePartitioner return null.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() or GetOrderablePartitions() methods in the <paramref name="source"/> - /// OrderablePartitioner do not return the correct number of partitions.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() or GetOrderablePartitions() methods in the <paramref name="source"/> - /// OrderablePartitioner return an IList with at least one null value.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetDynamicPartitions() or GetDynamicOrderablePartitions() methods in the <paramref name="source"/> - /// OrderablePartitioner return an IEnumerable whose GetEnumerator() method returns null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner"> - /// OrderablePartitioner</see>. - /// </para> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// Partitioner. It is provided with the following parameters: the current element, - /// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely, the current element's index (an Int64), and some local - /// state that may be shared amongst iterations that execute on the same thread. - /// </para> - /// <para> - /// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// </para> - /// </remarks> - public static ParallelLoopResult ForEach<TSource, TLocal>( - OrderablePartitioner<TSource> source, - Func<TLocal> localInit, - Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, - Action<TLocal> localFinally) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - - if (!source.KeysNormalized) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_OrderedPartitionerKeysNotNormalized")); - } - - return PartitionerForEachWorker<TSource, TLocal>(source, s_defaultParallelOptions, null, null, null, null, body, localInit, localFinally); - } - - /// <summary> - /// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.Partitioner{TSource}"> - /// Partitioner</see> in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam> - /// <param name="source">The Partitioner that contains the original data source.</param> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the - /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> - /// argument is set</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// SupportsDynamicPartitions property in the <paramref name="source"/> Partitioner returns - /// false.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any - /// methods in the <paramref name="source"/> Partitioner return null.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() method in the <paramref name="source"/> Partitioner does not return - /// the correct number of partitions.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() method in the <paramref name="source"/> Partitioner returns an IList - /// with at least one null value.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetDynamicPartitions() method in the <paramref name="source"/> Partitioner returns an - /// IEnumerable whose GetEnumerator() method returns null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner"> - /// OrderablePartitioner</see>. - /// </para> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// Partitioner. It is provided with the current element as a parameter. - /// </para> - /// </remarks> - public static ParallelLoopResult ForEach<TSource>( - Partitioner<TSource> source, - ParallelOptions parallelOptions, - Action<TSource> body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return PartitionerForEachWorker<TSource, object>(source, parallelOptions, body, null, null, null, null, null, null); - } - - /// <summary> - /// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.Partitioner{TSource}"> - /// Partitioner</see> in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam> - /// <param name="source">The Partitioner that contains the original data source.</param> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the - /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> - /// argument is set</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// SupportsDynamicPartitions property in the <paramref name="source"/> Partitioner returns - /// false.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any - /// methods in the <paramref name="source"/> Partitioner return null.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() method in the <paramref name="source"/> Partitioner does not return - /// the correct number of partitions.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() method in the <paramref name="source"/> Partitioner returns an IList - /// with at least one null value.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetDynamicPartitions() method in the <paramref name="source"/> Partitioner returns an - /// IEnumerable whose GetEnumerator() method returns null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner"> - /// OrderablePartitioner</see>. - /// </para> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// Partitioner. It is provided with the following parameters: the current element, - /// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely. - /// </para> - /// </remarks> - public static ParallelLoopResult ForEach<TSource>( - Partitioner<TSource> source, - ParallelOptions parallelOptions, - Action<TSource, ParallelLoopState> body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return PartitionerForEachWorker<TSource, object>(source, parallelOptions, null, body, null, null, null, null, null); - } - - /// <summary> - /// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}"> - /// OrderablePartitioner</see> in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam> - /// <param name="source">The OrderablePartitioner that contains the original data source.</param> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the - /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> - /// argument is set</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// SupportsDynamicPartitions property in the <paramref name="source"/> OrderablePartitioner returns - /// false.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// KeysNormalized property in the <paramref name="source"/> OrderablePartitioner returns - /// false.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any - /// methods in the <paramref name="source"/> OrderablePartitioner return null.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() or GetOrderablePartitions() methods in the <paramref name="source"/> - /// OrderablePartitioner do not return the correct number of partitions.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() or GetOrderablePartitions() methods in the <paramref name="source"/> - /// OrderablePartitioner return an IList with at least one null value.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetDynamicPartitions() or GetDynamicOrderablePartitions() methods in the <paramref name="source"/> - /// OrderablePartitioner return an IEnumerable whose GetEnumerator() method returns null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner"> - /// OrderablePartitioner</see>. - /// </para> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// Partitioner. It is provided with the following parameters: the current element, - /// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely, and the current element's index (an Int64). - /// </para> - /// </remarks> - public static ParallelLoopResult ForEach<TSource>( - OrderablePartitioner<TSource> source, - ParallelOptions parallelOptions, - Action<TSource, ParallelLoopState, long> body) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - if (!source.KeysNormalized) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_OrderedPartitionerKeysNotNormalized")); - } - - return PartitionerForEachWorker<TSource, object>(source, parallelOptions, null, null, body, null, null, null, null); - } - - /// <summary> - /// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.Partitioner{TSource}"> - /// Partitioner</see> in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam> - /// <typeparam name="TLocal">The type of the thread-local data.</typeparam> - /// <param name="source">The Partitioner that contains the original data source.</param> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="localInit">The function delegate that returns the initial state of the local data - /// for each thread.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <param name="localFinally">The delegate that performs a final action on the local state of each - /// thread.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localInit"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localFinally"/> argument is null.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the - /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> - /// argument is set</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// SupportsDynamicPartitions property in the <paramref name="source"/> Partitioner returns - /// false.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any - /// methods in the <paramref name="source"/> Partitioner return null.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() method in the <paramref name="source"/> Partitioner does not return - /// the correct number of partitions.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() method in the <paramref name="source"/> Partitioner returns an IList - /// with at least one null value.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetDynamicPartitions() method in the <paramref name="source"/> Partitioner returns an - /// IEnumerable whose GetEnumerator() method returns null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner"> - /// OrderablePartitioner</see>. - /// </para> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// Partitioner. It is provided with the following parameters: the current element, - /// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely, and some local state that may be shared amongst iterations - /// that execute on the same thread. - /// </para> - /// <para> - /// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// </para> - /// </remarks> - public static ParallelLoopResult ForEach<TSource, TLocal>( - Partitioner<TSource> source, - ParallelOptions parallelOptions, - Func<TLocal> localInit, - Func<TSource, ParallelLoopState, TLocal, TLocal> body, - Action<TLocal> localFinally) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - return PartitionerForEachWorker<TSource, TLocal>(source, parallelOptions, null, null, null, body, null, localInit, localFinally); - } - - /// <summary> - /// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}"> - /// OrderablePartitioner</see> in which iterations may run in parallel. - /// </summary> - /// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam> - /// <typeparam name="TLocal">The type of the thread-local data.</typeparam> - /// <param name="source">The OrderablePartitioner that contains the original data source.</param> - /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> - /// instance that configures the behavior of this operation.</param> - /// <param name="localInit">The function delegate that returns the initial state of the local data - /// for each thread.</param> - /// <param name="body">The delegate that is invoked once per iteration.</param> - /// <param name="localFinally">The delegate that performs a final action on the local state of each - /// thread.</param> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="parallelOptions"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> - /// argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localInit"/> argument is null.</exception> - /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the - /// <paramref name="localFinally"/> argument is null.</exception> - /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the - /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> - /// argument is set</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// SupportsDynamicPartitions property in the <paramref name="source"/> OrderablePartitioner returns - /// false.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// KeysNormalized property in the <paramref name="source"/> OrderablePartitioner returns - /// false.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any - /// methods in the <paramref name="source"/> OrderablePartitioner return null.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() or GetOrderablePartitions() methods in the <paramref name="source"/> - /// OrderablePartitioner do not return the correct number of partitions.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetPartitions() or GetOrderablePartitions() methods in the <paramref name="source"/> - /// OrderablePartitioner return an IList with at least one null value.</exception> - /// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the - /// GetDynamicPartitions() or GetDynamicOrderablePartitions() methods in the <paramref name="source"/> - /// OrderablePartitioner return an IEnumerable whose GetEnumerator() method returns null.</exception> - /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception - /// thrown from one of the specified delegates.</exception> - /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the - /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the - /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the - /// <paramref name="parallelOptions"/> has been disposed.</exception> - /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure - /// that contains information on what portion of the loop completed.</returns> - /// <remarks> - /// <para> - /// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve - /// the elements to be processed, in place of the original data source. If the current element's - /// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner"> - /// OrderablePartitioner</see>. - /// </para> - /// <para> - /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> - /// Partitioner. It is provided with the following parameters: the current element, - /// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be - /// used to break out of the loop prematurely, the current element's index (an Int64), and some local - /// state that may be shared amongst iterations that execute on the same thread. - /// </para> - /// <para> - /// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's - /// execution and returns the initial local state for each of those threads. These initial states are passed to the first - /// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly - /// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value - /// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final - /// action on each thread's local state. - /// </para> - /// </remarks> - public static ParallelLoopResult ForEach<TSource, TLocal>( - OrderablePartitioner<TSource> source, - ParallelOptions parallelOptions, - Func<TLocal> localInit, - Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, - Action<TLocal> localFinally) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - if (body == null) - { - throw new ArgumentNullException(nameof(body)); - } - if (localInit == null) - { - throw new ArgumentNullException(nameof(localInit)); - } - if (localFinally == null) - { - throw new ArgumentNullException(nameof(localFinally)); - } - if (parallelOptions == null) - { - throw new ArgumentNullException(nameof(parallelOptions)); - } - - if (!source.KeysNormalized) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_OrderedPartitionerKeysNotNormalized")); - } - - return PartitionerForEachWorker<TSource, TLocal>(source, parallelOptions, null, null, null, null, body, localInit, localFinally); - } - - // Main worker method for Parallel.ForEach() calls w/ Partitioners. - private static ParallelLoopResult PartitionerForEachWorker<TSource, TLocal>( - Partitioner<TSource> source, // Might be OrderablePartitioner - ParallelOptions parallelOptions, - Action<TSource> simpleBody, - Action<TSource, ParallelLoopState> bodyWithState, - Action<TSource, ParallelLoopState, long> bodyWithStateAndIndex, - Func<TSource, ParallelLoopState, TLocal, TLocal> bodyWithStateAndLocal, - Func<TSource, ParallelLoopState, long, TLocal, TLocal> bodyWithEverything, - Func<TLocal> localInit, - Action<TLocal> localFinally) - { - Debug.Assert(((simpleBody == null ? 0 : 1) + (bodyWithState == null ? 0 : 1) + - (bodyWithStateAndIndex == null ? 0 : 1) + (bodyWithStateAndLocal == null ? 0 : 1) + (bodyWithEverything == null ? 0 : 1)) == 1, - "PartitionForEach: expected exactly one body function to be supplied"); - Debug.Assert((bodyWithStateAndLocal != null) || (bodyWithEverything != null) || (localInit == null && localFinally == null), - "PartitionForEach: thread local functions should only be supplied for loops w/ thread local bodies"); - - OrderablePartitioner<TSource> orderedSource = source as OrderablePartitioner<TSource>; - Debug.Assert((orderedSource != null) || (bodyWithStateAndIndex == null && bodyWithEverything == null), - "PartitionForEach: bodies with indices are only allowable for OrderablePartitioner"); - - if (!source.SupportsDynamicPartitions) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_PartitionerNotDynamic")); - } - - // Before getting started, do a quick peek to see if we have been canceled already - if (parallelOptions.CancellationToken.IsCancellationRequested) - { - throw new OperationCanceledException(parallelOptions.CancellationToken); - } - - // ETW event for Parallel For begin - int forkJoinContextID = 0; - Task callerTask = null; - if (TplEtwProvider.Log.IsEnabled()) - { - forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID); - callerTask = Task.InternalCurrent; - TplEtwProvider.Log.ParallelLoopBegin((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0), - forkJoinContextID, TplEtwProvider.ForkJoinOperationType.ParallelForEach, - 0, 0); - } - - // For all loops we need a shared flag even though we don't have a body with state, - // because the shared flag contains the exceptional bool, which triggers other workers - // to exit their loops if one worker catches an exception - ParallelLoopStateFlags64 sharedPStateFlags = new ParallelLoopStateFlags64(); - - // Instantiate our result. Specifics will be filled in later. - ParallelLoopResult result = new ParallelLoopResult(); - - // Keep track of any cancellations - OperationCanceledException oce = null; - - CancellationTokenRegistration ctr = new CancellationTokenRegistration(); - - // if cancellation is enabled, we need to register a callback to stop the loop when it gets signaled - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr = parallelOptions.CancellationToken.InternalRegisterWithoutEC((o) => - { - // Cause processing to stop - sharedPStateFlags.Cancel(); - // Record our cancellation - oce = new OperationCanceledException(parallelOptions.CancellationToken); - }, null); - } - - // Get our dynamic partitioner -- depends on whether source is castable to OrderablePartitioner - // Also, do some error checking. - IEnumerable<TSource> partitionerSource = null; - IEnumerable<KeyValuePair<long, TSource>> orderablePartitionerSource = null; - if (orderedSource != null) - { - orderablePartitionerSource = orderedSource.GetOrderableDynamicPartitions(); - if (orderablePartitionerSource == null) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_PartitionerReturnedNull")); - } - } - else - { - partitionerSource = source.GetDynamicPartitions(); - if (partitionerSource == null) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_PartitionerReturnedNull")); - } - } - - ParallelForReplicatingTask rootTask = null; - - // This is the action that will be run by each replicable task. - Action partitionAction = delegate - { - Task currentWorkerTask = Task.InternalCurrent; - - // ETW event for ParallelForEach Worker Fork - if (TplEtwProvider.Log.IsEnabled()) - { - TplEtwProvider.Log.ParallelFork((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0), - forkJoinContextID); - } - - TLocal localValue = default(TLocal); - bool bLocalValueInitialized = false; // Tracks whether localInit ran without exceptions, so that we can skip localFinally if it wasn't - IDisposable myPartitionToDispose = null; - - try - { - // Create a new state object that references the shared "stopped" and "exceptional" flags. - // If needed, it will contain a new instance of thread-local state by invoking the selector. - ParallelLoopState64 state = null; - - if (bodyWithState != null || bodyWithStateAndIndex != null) - { - state = new ParallelLoopState64(sharedPStateFlags); - } - else if (bodyWithStateAndLocal != null || bodyWithEverything != null) - { - state = new ParallelLoopState64(sharedPStateFlags); - // If a thread-local selector was supplied, invoke it. Otherwise, stick with the default. - if (localInit != null) - { - localValue = localInit(); - bLocalValueInitialized = true; - } - } - - - bool bIsRootTask = (rootTask == currentWorkerTask); - - // initialize a loop timer which will help us decide whether we should exit early - LoopTimer loopTimer = new LoopTimer(rootTask.ActiveChildCount); - - if (orderedSource != null) - { - // Use this path for OrderablePartitioner - - - // first check if there's saved state from a previous replica that we might be replacing. - // the only state to be passed down in such a transition is the enumerator - IEnumerator<KeyValuePair<long, TSource>> myPartition = currentWorkerTask.SavedStateFromPreviousReplica as IEnumerator<KeyValuePair<long, TSource>>; - if (myPartition == null) - { - // apparently we're a brand new replica, get a fresh enumerator from the partitioner - myPartition = orderablePartitionerSource.GetEnumerator(); - if (myPartition == null) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_NullEnumerator")); - } - } - myPartitionToDispose = myPartition; - - while (myPartition.MoveNext()) - { - KeyValuePair<long, TSource> kvp = myPartition.Current; - long index = kvp.Key; - TSource value = kvp.Value; - - // Update our iteration index - if (state != null) state.CurrentIteration = index; - - if (simpleBody != null) - simpleBody(value); - else if (bodyWithState != null) - bodyWithState(value, state); - else if (bodyWithStateAndIndex != null) - bodyWithStateAndIndex(value, state, index); - else if (bodyWithStateAndLocal != null) - localValue = bodyWithStateAndLocal(value, state, localValue); - else - localValue = bodyWithEverything(value, state, index, localValue); - - if (sharedPStateFlags.ShouldExitLoop(index)) break; - - // Cooperative multitasking workaround for AppDomain fairness. - // Check if allowed loop time is exceeded, if so save current state and return. The self replicating task logic - // will detect this, and queue up a replacement task. Note that we don't do this on the root task. - if (!bIsRootTask && loopTimer.LimitExceeded()) - { - currentWorkerTask.SavedStateForNextReplica = myPartition; - myPartitionToDispose = null; - break; - } - } - - } - else - { - // Use this path for Partitioner that is not OrderablePartitioner - - // first check if there's saved state from a previous replica that we might be replacing. - // the only state to be passed down in such a transition is the enumerator - IEnumerator<TSource> myPartition = currentWorkerTask.SavedStateFromPreviousReplica as IEnumerator<TSource>; - if (myPartition == null) - { - // apparently we're a brand new replica, get a fresh enumerator from the partitioner - myPartition = partitionerSource.GetEnumerator(); - if (myPartition == null) - { - throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_NullEnumerator")); - } - } - myPartitionToDispose = myPartition; - - // I'm not going to try to maintain this - if (state != null) - state.CurrentIteration = 0; - - while (myPartition.MoveNext()) - { - TSource t = myPartition.Current; - - if (simpleBody != null) - simpleBody(t); - else if (bodyWithState != null) - bodyWithState(t, state); - else if (bodyWithStateAndLocal != null) - localValue = bodyWithStateAndLocal(t, state, localValue); - else - Debug.Assert(false, "PartitionerForEach: illegal body type in Partitioner handler"); - - - // Any break, stop or exception causes us to halt - // We don't have the global indexing information to discriminate whether or not - // we are before or after a break point. - if (sharedPStateFlags.LoopStateFlags != ParallelLoopStateFlags.PLS_NONE) - break; - - // Cooperative multitasking workaround for AppDomain fairness. - // Check if allowed loop time is exceeded, if so save current state and return. The self replicating task logic - // will detect this, and queue up a replacement task. Note that we don't do this on the root task. - if (!bIsRootTask && loopTimer.LimitExceeded()) - { - currentWorkerTask.SavedStateForNextReplica = myPartition; - myPartitionToDispose = null; - break; - } - } - } - } - catch - { - // Inform other tasks of the exception, then rethrow - sharedPStateFlags.SetExceptional(); - throw; - } - finally - { - if (localFinally != null && bLocalValueInitialized) - { - localFinally(localValue); - } - - if (myPartitionToDispose != null) - { - myPartitionToDispose.Dispose(); - } - - // ETW event for ParallelFor Worker Join - if (TplEtwProvider.Log.IsEnabled()) - { - TplEtwProvider.Log.ParallelJoin((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0), - forkJoinContextID); - } - } - }; - - try - { - // Create and start the self-replicating task. - // This needs to be in try-block because it can throw in BuggyScheduler.MaxConcurrencyLevel - rootTask = new ParallelForReplicatingTask(parallelOptions, partitionAction, TaskCreationOptions.None, - InternalTaskOptions.SelfReplicating); - - // And process it's completion... - // Moved inside try{} block because faulty scheduler may throw here. - rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler); - - rootTask.Wait(); - - // If we made a cancellation registration, we need to clean it up now before observing the OCE - // Otherwise we could be caught in the middle of a callback, and observe PLS_STOPPED, but oce = null - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - - // If we got through that with no exceptions, and we were canceled, then - // throw our cancellation exception - if (oce != null) throw oce; - } - catch (AggregateException aggExp) - { - // if we made a cancellation registration, and rootTask.Wait threw, we need to clean it up here - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - - // see if we can combine it into a single OCE. If not propagate the original exception - ThrowIfReducableToSingleOCE(aggExp.InnerExceptions, parallelOptions.CancellationToken); - throw; - } - catch (TaskSchedulerException) - { - // if we made a cancellation registration, and either we threw an exception constructing rootTask or - // rootTask.RunSynchronously threw, we need to clean it up here. - if (parallelOptions.CancellationToken.CanBeCanceled) - { - ctr.Dispose(); - } - throw; - } - finally - { - int sb_status = sharedPStateFlags.LoopStateFlags; - result.m_completed = (sb_status == ParallelLoopStateFlags.PLS_NONE); - if ((sb_status & ParallelLoopStateFlags.PLS_BROKEN) != 0) - { - result.m_lowestBreakIteration = sharedPStateFlags.LowestBreakIteration; - } - - if ((rootTask != null) && rootTask.IsCompleted) rootTask.Dispose(); - - //dispose the partitioner source if it implements IDisposable - IDisposable d = null; - if (orderablePartitionerSource != null) - { - d = orderablePartitionerSource as IDisposable; - } - else - { - d = partitionerSource as IDisposable; - } - - if (d != null) - { - d.Dispose(); - } - - // ETW event for Parallel For End - if (TplEtwProvider.Log.IsEnabled()) - { - TplEtwProvider.Log.ParallelLoopEnd((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0), - forkJoinContextID, 0); - } - } - - return result; - } - - /// <summary> - /// Internal utility function that implements the OCE filtering behavior for all Parallel.* APIs. - /// Throws a single OperationCancelledException object with the token if the Exception collection only contains - /// OperationCancelledExceptions with the given CancellationToken. - /// - /// </summary> - /// <param name="excCollection"> The exception collection to filter</param> - /// <param name="ct"> The CancellationToken expected on all inner exceptions</param> - /// <returns></returns> - internal static void ThrowIfReducableToSingleOCE(IEnumerable<Exception> excCollection, CancellationToken ct) - { - bool bCollectionNotZeroLength = false; - if (ct.IsCancellationRequested) - { - foreach (Exception e in excCollection) - { - bCollectionNotZeroLength = true; - OperationCanceledException oce = e as OperationCanceledException; - if (oce == null || oce.CancellationToken != ct) - { - // mismatch found - return; - } - } - - // all exceptions are OCEs with this token, let's throw it - if (bCollectionNotZeroLength) throw new OperationCanceledException(ct); - } - } - - internal struct LoopTimer - { - public LoopTimer(int nWorkerTaskIndex) - { - // This logic ensures that we have a diversity of timeouts across worker tasks (100, 150, 200, 250, 100, etc) - // Otherwise all worker will try to timeout at precisely the same point, which is bad if the work is just about to finish - int timeOut = s_BaseNotifyPeriodMS + (nWorkerTaskIndex % PlatformHelper.ProcessorCount) * s_NotifyPeriodIncrementMS; - - m_timeLimit = Environment.TickCount + timeOut; - } - - public bool LimitExceeded() - { - Debug.Assert(m_timeLimit != 0, "Probably the default initializer for LoopTimer was used somewhere"); - - // comparing against the next expected time saves an addition operation here - // Also we omit the comparison for wrap around here. The only side effect is one extra early yield every 38 days. - return (Environment.TickCount > m_timeLimit); - } - - const int s_BaseNotifyPeriodMS = 100; - const int s_NotifyPeriodIncrementMS = 50; - - private int m_timeLimit; - } - - } - -} |