path: root/src/mscorlib/src/System/Threading/Tasks/ConcurrentExclusiveSchedulerPair.cs
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
//
//
// A pair of schedulers that together support concurrent (reader) / exclusive (writer)
// task scheduling.  Using just the exclusive scheduler simulates a serial processing
// queue, and using just the concurrent scheduler with a specified
// MaximumConcurrencyLevel can be used to achieve a MaxDegreeOfParallelism across
// a set of tasks, parallel loops, dataflow blocks, etc.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
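
// A minimal usage sketch (illustrative only; it assumes the standard public surface of this type,
// and ReadSharedState/MutateSharedState below are hypothetical placeholders for user code):
//
//     var pair = new ConcurrentExclusiveSchedulerPair();
//     var readers = new TaskFactory(pair.ConcurrentScheduler); // these tasks may run in parallel with each other
//     var writers = new TaskFactory(pair.ExclusiveScheduler);  // these tasks run one at a time, never alongside reader tasks
//
//     Task r1 = readers.StartNew(() => ReadSharedState());
//     Task r2 = readers.StartNew(() => ReadSharedState());
//     Task w1 = writers.StartNew(() => MutateSharedState());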

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
using System.Security;
using System.Security.Permissions;

namespace System.Threading.Tasks
{
    /// <summary>
    /// Provides concurrent and exclusive task schedulers that coordinate to execute
    /// tasks while ensuring that concurrent tasks may run concurrently and exclusive tasks never do.
    /// </summary>
    [HostProtection(Synchronization = true, ExternalThreading = true)]
    [DebuggerDisplay("Concurrent={ConcurrentTaskCountForDebugger}, Exclusive={ExclusiveTaskCountForDebugger}, Mode={ModeForDebugger}")]
    [DebuggerTypeProxy(typeof(ConcurrentExclusiveSchedulerPair.DebugView))]
    public class ConcurrentExclusiveSchedulerPair
    {
        /// <summary>A dictionary mapping thread ID to a processing mode to denote what kinds of tasks are currently being processed on this thread.</summary>
        private readonly ConcurrentDictionary<int, ProcessingMode> m_threadProcessingMapping = new ConcurrentDictionary<int, ProcessingMode>();
        /// <summary>The scheduler used to queue and execute "concurrent" tasks that may run concurrently with other concurrent tasks.</summary>
        private readonly ConcurrentExclusiveTaskScheduler m_concurrentTaskScheduler;
        /// <summary>The scheduler used to queue and execute "exclusive" tasks that must run exclusively while no other tasks for this pair are running.</summary>
        private readonly ConcurrentExclusiveTaskScheduler m_exclusiveTaskScheduler;
        /// <summary>The underlying task scheduler to which all work should be scheduled.</summary>
        private readonly TaskScheduler m_underlyingTaskScheduler;
        /// <summary>
        /// The maximum number of tasks allowed to run concurrently.  This only applies to concurrent tasks, 
        /// since exclusive tasks are inherently limited to 1.
        /// </summary>
        private readonly int m_maxConcurrencyLevel;
        /// <summary>The maximum number of tasks we can process before recycling our runner tasks.</summary>
        private readonly int m_maxItemsPerTask;
        /// <summary>
        /// If positive, it represents the number of concurrently running concurrent tasks.
        /// If negative, it means an exclusive task has been scheduled.
        /// If 0, nothing has been scheduled.
        /// </summary>
        private int m_processingCount;
        /// <summary>Completion state for a task representing the completion of this pair.</summary>
        /// <remarks>Lazily-initialized only if the scheduler pair is shutting down or if the Completion is requested.</remarks>
        private CompletionState m_completionState;

        /// <summary>A constant value used to signal unlimited processing.</summary>
        private const int UNLIMITED_PROCESSING = -1;
        /// <summary>Constant used for m_processingCount to indicate that an exclusive task is being processed.</summary>
        private const int EXCLUSIVE_PROCESSING_SENTINEL = -1;
        /// <summary>Default MaxItemsPerTask to use for processing if none is specified.</summary>
        private const int DEFAULT_MAXITEMSPERTASK = UNLIMITED_PROCESSING;
        /// <summary>Default MaxConcurrencyLevel is the processor count if not otherwise specified.</summary>
        private static Int32 DefaultMaxConcurrencyLevel { get { return Environment.ProcessorCount; } }

        /// <summary>Gets the sync obj used to protect all state on this instance.</summary>
        private object ValueLock { get { return m_threadProcessingMapping; } }

        /// <summary>
        /// Initializes the ConcurrentExclusiveSchedulerPair.
        /// </summary>
        public ConcurrentExclusiveSchedulerPair() :
            this(TaskScheduler.Default, DefaultMaxConcurrencyLevel, DEFAULT_MAXITEMSPERTASK) { }

        /// <summary>
        /// Initializes the ConcurrentExclusiveSchedulerPair to target the specified scheduler.
        /// </summary>
        /// <param name="taskScheduler">The target scheduler on which this pair should execute.</param>
        public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler) :
            this(taskScheduler, DefaultMaxConcurrencyLevel, DEFAULT_MAXITEMSPERTASK) { }

        /// <summary>
        /// Initializes the ConcurrentExclusiveSchedulerPair to target the specified scheduler with a maximum concurrency level.
        /// </summary>
        /// <param name="taskScheduler">The target scheduler on which this pair should execute.</param>
        /// <param name="maxConcurrencyLevel">The maximum number of tasks to run concurrently.</param>
        public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler, int maxConcurrencyLevel) : 
            this(taskScheduler, maxConcurrencyLevel, DEFAULT_MAXITEMSPERTASK) { }

        /// <summary>
        /// Initializes the ConcurrentExclusiveSchedulerPair to target the specified scheduler with a maximum 
        /// concurrency level and a maximum number of scheduled tasks that may be processed as a unit.
        /// </summary>
        /// <param name="taskScheduler">The target scheduler on which this pair should execute.</param>
        /// <param name="maxConcurrencyLevel">The maximum number of tasks to run concurrently.</param>
        /// <param name="maxItemsPerTask">The maximum number of tasks to process for each underlying scheduled task used by the pair.</param>
        public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler, int maxConcurrencyLevel, int maxItemsPerTask)
        {
            // Validate arguments
            if (taskScheduler == null) throw new ArgumentNullException("taskScheduler");
            if (maxConcurrencyLevel == 0 || maxConcurrencyLevel < -1) throw new ArgumentOutOfRangeException("maxConcurrencyLevel");
            if (maxItemsPerTask == 0 || maxItemsPerTask < -1) throw new ArgumentOutOfRangeException("maxItemsPerTask");
            Contract.EndContractBlock();

            // Store configuration
            m_underlyingTaskScheduler = taskScheduler;
            m_maxConcurrencyLevel = maxConcurrencyLevel;
            m_maxItemsPerTask = maxItemsPerTask;

            // Downgrade to the underlying scheduler's max degree of parallelism if it's lower than the user-supplied level
            int mcl = taskScheduler.MaximumConcurrencyLevel;
            if (mcl > 0 && mcl < m_maxConcurrencyLevel) m_maxConcurrencyLevel = mcl;

            // Treat UNLIMITED_PROCESSING/-1 for both MCL and MIPT as the biggest possible value so that we don't
            // have to special case UNLIMITED_PROCESSING later on in processing.
            if (m_maxConcurrencyLevel == UNLIMITED_PROCESSING) m_maxConcurrencyLevel = Int32.MaxValue;
            if (m_maxItemsPerTask == UNLIMITED_PROCESSING) m_maxItemsPerTask = Int32.MaxValue;

            // Create the concurrent/exclusive schedulers for this pair
            m_exclusiveTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, 1, ProcessingMode.ProcessingExclusiveTask);
            m_concurrentTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, m_maxConcurrencyLevel, ProcessingMode.ProcessingConcurrentTasks);
        }
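
        // For instance (illustrative only), constructing the pair as
        //     new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 4)
        // caps the concurrent scheduler at four simultaneously running tasks (or at the underlying
        // scheduler's own MaximumConcurrencyLevel, whichever is lower), while the exclusive scheduler
        // remains limited to one.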

        /// <summary>Informs the scheduler pair that it should not accept any more tasks.</summary>
        /// <remarks>
        /// Calling <see cref="Complete"/> is optional, and it's only necessary if the <see cref="Completion"/>
        /// will be relied on for notification of all processing being completed.
        /// </remarks>
        public void Complete()
        {
            lock (ValueLock)
            {
                if (!CompletionRequested)
                {
                    RequestCompletion();
                    CleanupStateIfCompletingAndQuiesced();
                }
            }
        }

        /// <summary>Gets a <see cref="System.Threading.Tasks.Task"/> that will complete when the scheduler has completed processing.</summary>
        public Task Completion 
        {
            // ValueLock not needed, but it's ok if it's held
            get { return EnsureCompletionStateInitialized().Task; }
        }
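
        // Illustrative shutdown pattern (a sketch, not part of this type's implementation): callers that
        // rely on Completion typically signal Complete() once all work has been queued, e.g.:
        //     pair.Complete();        // no further tasks may be queued to either scheduler
        //     await pair.Completion;  // completes once all previously queued work has finished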

        /// <summary>Gets the lazily-initialized completion state.</summary>
        private CompletionState EnsureCompletionStateInitialized()
        {
            // ValueLock not needed, but it's ok if it's held
            return LazyInitializer.EnsureInitialized(ref m_completionState, () => new CompletionState());
        }

        /// <summary>Gets whether completion has been requested.</summary>
        private bool CompletionRequested
        {
            // ValueLock not needed, but it's ok if it's held
            get { return m_completionState != null && Volatile.Read(ref m_completionState.m_completionRequested); }
        }

        /// <summary>Sets that completion has been requested.</summary>
        private void RequestCompletion() 
        {
            ContractAssertMonitorStatus(ValueLock, held: true);
            EnsureCompletionStateInitialized().m_completionRequested = true;
        }

        /// <summary>
        /// Cleans up state if and only if there's no processing currently happening
        /// and no more to be done later.
        /// </summary>
        private void CleanupStateIfCompletingAndQuiesced()
        {
            ContractAssertMonitorStatus(ValueLock, held: true);
            if (ReadyToComplete) CompleteTaskAsync();
        }

        /// <summary>Gets whether the pair is ready to complete.</summary>
        private bool ReadyToComplete
        {
            get
            {
                ContractAssertMonitorStatus(ValueLock, held: true);

                // We can only complete if completion has been requested and no processing is currently happening.
                if (!CompletionRequested || m_processingCount != 0) return false;

                // Now, only allow shutdown if an exception occurred or if there are no more tasks to process.
                var cs = EnsureCompletionStateInitialized();
                return 
                    (cs.m_exceptions != null && cs.m_exceptions.Count > 0) ||
                    (m_concurrentTaskScheduler.m_tasks.IsEmpty && m_exclusiveTaskScheduler.m_tasks.IsEmpty); 
            }
        }

        /// <summary>Completes the completion task asynchronously.</summary>
        private void CompleteTaskAsync()
        {
            Contract.Requires(ReadyToComplete, "The block must be ready to complete to be here.");
            ContractAssertMonitorStatus(ValueLock, held: true);

            // Ensure we only try to complete once, then schedule completion
            // in order to escape held locks and the caller's context
            var cs = EnsureCompletionStateInitialized();
            if (!cs.m_completionQueued)
            {
                cs.m_completionQueued = true;
                ThreadPool.QueueUserWorkItem(state =>
                {
                    var localCs = (CompletionState)state; // don't use 'cs', as it'll force a closure
                    Contract.Assert(!localCs.Task.IsCompleted, "Completion should only happen once.");

                    var exceptions = localCs.m_exceptions;
                    bool success = (exceptions != null && exceptions.Count > 0) ?
                        localCs.TrySetException(exceptions) :
                        localCs.TrySetResult(default(VoidTaskResult));
                    Contract.Assert(success, "Expected to complete completion task.");
                }, cs);
            }
        }

        /// <summary>Initiates scheduler shutdown due to a worker task faulting.</summary>
        /// <param name="faultedTask">The faulted worker task that's initiating the shutdown.</param>
        private void FaultWithTask(Task faultedTask)
        {
            Contract.Requires(faultedTask != null && faultedTask.IsFaulted && faultedTask.Exception.InnerExceptions.Count > 0,
                "Needs a task in the faulted state and thus with exceptions.");
            ContractAssertMonitorStatus(ValueLock, held: true);

            // Store the faulted task's exceptions
            var cs = EnsureCompletionStateInitialized();
            if (cs.m_exceptions == null) cs.m_exceptions = new List<Exception>();
            cs.m_exceptions.AddRange(faultedTask.Exception.InnerExceptions);

            // Now that we're doomed, request completion
            RequestCompletion();
        }

        /// <summary>
        /// Gets a TaskScheduler that can be used to schedule tasks to this pair
        /// that may run concurrently with other tasks on this pair.
        /// </summary>
        public TaskScheduler ConcurrentScheduler { get { return m_concurrentTaskScheduler; } }
        /// <summary>
        /// Gets a TaskScheduler that can be used to schedule tasks to this pair
        /// that must run exclusively with regards to other tasks on this pair.
        /// </summary>
        public TaskScheduler ExclusiveScheduler { get { return m_exclusiveTaskScheduler; } }

        /// <summary>Gets the number of tasks waiting to run concurrently.</summary>
        /// <remarks>This does not take the necessary lock, as it's only called from under the debugger.</remarks>
        [SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
        private int ConcurrentTaskCountForDebugger { get { return m_concurrentTaskScheduler.m_tasks.Count; } }

        /// <summary>Gets the number of tasks waiting to run exclusively.</summary>
        /// <remarks>This does not take the necessary lock, as it's only called from under the debugger.</remarks>
        [SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
        private int ExclusiveTaskCountForDebugger { get { return m_exclusiveTaskScheduler.m_tasks.Count; } }

        /// <summary>Notifies the pair that new work has arrived to be processed.</summary>
        /// <param name="fairly">Whether tasks should be scheduled fairly with regards to other tasks.</param>
        /// <remarks>Must only be called while holding the lock.</remarks>
        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
        [SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals")]
        [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
        private void ProcessAsyncIfNecessary(bool fairly = false)
        {
            ContractAssertMonitorStatus(ValueLock, held: true);

            // If the current processing count is >= 0, we can potentially launch further processing.
            if (m_processingCount >= 0)
            {
                // We snap whether there are any exclusive tasks or concurrent tasks waiting.
                // (We grab the concurrent count below only once we know we need it.)
                // With processing happening concurrent to this operation, this data may 
                // immediately be out of date, but it can only go from non-empty
                // to empty and not the other way around.  As such, this is safe, 
                // as the worst case is that we'll schedule an extra task when we didn't
                // otherwise need to, and we'll just eat its overhead.
                bool exclusiveTasksAreWaiting = !m_exclusiveTaskScheduler.m_tasks.IsEmpty;

                // If there's no processing currently happening but there are waiting exclusive tasks,
                // let's start processing those exclusive tasks.
                Task processingTask = null;
                if (m_processingCount == 0 && exclusiveTasksAreWaiting)
                {
                    // Launch exclusive task processing
                    m_processingCount = EXCLUSIVE_PROCESSING_SENTINEL; // -1
                    try
                    {
                        processingTask = new Task(thisPair => ((ConcurrentExclusiveSchedulerPair)thisPair).ProcessExclusiveTasks(), this,
                            default(CancellationToken), GetCreationOptionsForTask(fairly));
                        processingTask.Start(m_underlyingTaskScheduler);
                        // When we call Start, if the underlying scheduler throws in QueueTask, TPL will fault the task and rethrow
                        // the exception.  To deal with that, we need a reference to the task object, so that we can observe its exception.
                        // Hence, we separate creation and starting, so that we can store a reference to the task before we attempt QueueTask.
                    }
                    catch
                    {
                        m_processingCount = 0;
                        FaultWithTask(processingTask);
                    }
                }
                // If there are no waiting exclusive tasks, there are concurrent tasks, and we haven't reached our maximum
                // concurrency level for processing, let's start processing more concurrent tasks.
                else
                {
                    int concurrentTasksWaitingCount = m_concurrentTaskScheduler.m_tasks.Count;

                    if (concurrentTasksWaitingCount > 0 && !exclusiveTasksAreWaiting && m_processingCount < m_maxConcurrencyLevel)
                    {
                        // Launch concurrent task processing, up to the allowed limit
                        for (int i = 0; i < concurrentTasksWaitingCount && m_processingCount < m_maxConcurrencyLevel; ++i)
                        {
                            ++m_processingCount;
                            try
                            {
                                processingTask = new Task(thisPair => ((ConcurrentExclusiveSchedulerPair)thisPair).ProcessConcurrentTasks(), this,
                                    default(CancellationToken), GetCreationOptionsForTask(fairly));
                                processingTask.Start(m_underlyingTaskScheduler); // See above logic for why we use new + Start rather than StartNew
                            }
                            catch
                            {
                                --m_processingCount;
                                FaultWithTask(processingTask);
                            }
                        }
                    }
                }
                
                // Check to see if all tasks have completed and if completion has been requested.
                CleanupStateIfCompletingAndQuiesced();
            }
            else Contract.Assert(m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL, "The processing count must be the sentinel if it's not >= 0.");
        }

        /// <summary>
        /// Processes exclusive tasks serially until either there are no more to process
        /// or we've reached our user-specified maximum limit.
        /// </summary>
        private void ProcessExclusiveTasks()
        {
            Contract.Requires(m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL, "Processing exclusive tasks requires being in exclusive mode.");
            Contract.Requires(!m_exclusiveTaskScheduler.m_tasks.IsEmpty, "Processing exclusive tasks requires tasks to be processed.");
            ContractAssertMonitorStatus(ValueLock, held: false);
            try
            {
                // Note that we're processing exclusive tasks on the current thread
                Contract.Assert(!m_threadProcessingMapping.ContainsKey(Thread.CurrentThread.ManagedThreadId),
                    "This thread should not yet be involved in this pair's processing.");
                m_threadProcessingMapping[Thread.CurrentThread.ManagedThreadId] = ProcessingMode.ProcessingExclusiveTask;

                // Process up to the maximum number of items per task allowed
                for (int i = 0; i < m_maxItemsPerTask; i++)
                {
                    // Get the next available exclusive task.  If we can't find one, bail.
                    Task exclusiveTask;
                    if (!m_exclusiveTaskScheduler.m_tasks.TryDequeue(out exclusiveTask)) break;

                    // Execute the task.  If the scheduler was previously faulted,
                    // this task could have been faulted when it was queued; ignore such tasks.
                    if (!exclusiveTask.IsFaulted) m_exclusiveTaskScheduler.ExecuteTask(exclusiveTask);
                }
            }
            finally
            {
                // We're no longer processing exclusive tasks on the current thread
                ProcessingMode currentMode;
                m_threadProcessingMapping.TryRemove(Thread.CurrentThread.ManagedThreadId, out currentMode);
                Contract.Assert(currentMode == ProcessingMode.ProcessingExclusiveTask, 
                    "Somehow we ended up escaping exclusive mode.");

                lock (ValueLock)
                {
                    // When this task was launched, we tracked it by setting m_processingCount to EXCLUSIVE_PROCESSING_SENTINEL.
                    // Now reset it to 0.  Then check to see whether there's more processing to be done.
                    // There might be more concurrent tasks available, for example, if concurrent tasks arrived
                    // after we exited the loop, or if we exited the loop while concurrent tasks were still
                    // available but we hit our maxItemsPerTask limit.
                    Contract.Assert(m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL, "The processing mode should not have deviated from exclusive.");
                    m_processingCount = 0;
                    ProcessAsyncIfNecessary(true);
                }
            }
        }

        /// <summary>
        /// Processes concurrent tasks serially until either there are no more to process,
        /// we've reached our user-specified maximum limit, or exclusive tasks have arrived.
        /// </summary>
        private void ProcessConcurrentTasks()
        {
            Contract.Requires(m_processingCount > 0, "Processing concurrent tasks requires us to be in concurrent mode.");
            ContractAssertMonitorStatus(ValueLock, held: false);
            try
            {
                // Note that we're processing concurrent tasks on the current thread
                Contract.Assert(!m_threadProcessingMapping.ContainsKey(Thread.CurrentThread.ManagedThreadId),
                    "This thread should not yet be involved in this pair's processing.");
                m_threadProcessingMapping[Thread.CurrentThread.ManagedThreadId] = ProcessingMode.ProcessingConcurrentTasks;

                // Process up to the maximum number of items per task allowed
                for (int i = 0; i < m_maxItemsPerTask; i++)
                {
                    // Get the next available concurrent task.  If we can't find one, bail.
                    Task concurrentTask;
                    if (!m_concurrentTaskScheduler.m_tasks.TryDequeue(out concurrentTask)) break;

                    // Execute the task.  If the scheduler was previously faulted,
                    // this task could have been faulted when it was queued; ignore such tasks.
                    if (!concurrentTask.IsFaulted) m_concurrentTaskScheduler.ExecuteTask(concurrentTask);

                    // Now check to see if exclusive tasks have arrived; if any have, they take priority
                    // so we'll bail out here.  Note that we could have checked this condition
                    // in the for loop's condition, but that could lead to extra overhead
                    // in the case where a concurrent task arrives, this task is launched, and then
                    // before entering the loop an exclusive task arrives.  If we didn't execute at
                    // least one task, we would have spent all of the overhead to launch a
                    // task but with none of the benefit.  There's of course also an inherent
                    // race condition here with regards to exclusive tasks arriving, and we're ok with
                    // executing one more concurrent task than we should before giving priority to exclusive tasks.
                    if (!m_exclusiveTaskScheduler.m_tasks.IsEmpty) break;
                }
            }
            finally
            {
                // We're no longer processing concurrent tasks on the current thread
                ProcessingMode currentMode;
                m_threadProcessingMapping.TryRemove(Thread.CurrentThread.ManagedThreadId, out currentMode);
                Contract.Assert(currentMode == ProcessingMode.ProcessingConcurrentTasks,
                    "Somehow we ended up escaping concurrent mode.");

                lock (ValueLock)
                {
                    // When this task was launched, we tracked it with a positive processing count;
                    // decrement that count.  Then check to see whether there's more processing to be done.
                    // There might be more concurrent tasks available, for example, if concurrent tasks arrived
                    // after we exited the loop, or if we exited the loop while concurrent tasks were still
                    // available but we hit our maxItemsPerTask limit.
                    Contract.Assert(m_processingCount > 0, "The processing mode should not have deviated from concurrent.");
                    if (m_processingCount > 0) --m_processingCount;
                    ProcessAsyncIfNecessary(true);
                }
            }
        }

#if PRENET45
        /// <summary>
        /// Type used with TaskCompletionSource(Of TResult) as the TResult
        /// to ensure that the resulting task can't be upcast to something
        /// that in the future could lead to compat problems.
        /// </summary>
        [SuppressMessage("Microsoft.Performance", "CA1812:AvoidUninstantiatedInternalClasses")]
        [DebuggerNonUserCode]
        private struct VoidTaskResult { }
#endif

        /// <summary>
        /// Holder for lazily-initialized state about the completion of a scheduler pair.
        /// Completion is only triggered either by rare exceptional conditions or by
        /// the user calling Complete, and as such we only lazily initialize this
        /// state in one of those conditions or if the user explicitly asks for
        /// the Completion.
        /// </summary>
        [SuppressMessage("Microsoft.Performance", "CA1812:AvoidUninstantiatedInternalClasses")]
        private sealed class CompletionState : TaskCompletionSource<VoidTaskResult>
        {
            /// <summary>Whether the scheduler has had completion requested.</summary>
            /// <remarks>This variable is not volatile, so to guarantee safe reads, Volatile.Read is used when reading it (see CompletionRequested).</remarks>
            internal bool m_completionRequested;
            /// <summary>Whether completion processing has been queued.</summary>
            internal bool m_completionQueued;
            /// <summary>Unrecoverable exceptions incurred while processing.</summary>
            internal List<Exception> m_exceptions;
        }

        /// <summary>
        /// A scheduler shim used to queue tasks to the pair and execute those tasks on request of the pair.
        /// </summary>
        [DebuggerDisplay("Count={CountForDebugger}, MaxConcurrencyLevel={m_maxConcurrencyLevel}, Id={Id}")]
        [DebuggerTypeProxy(typeof(ConcurrentExclusiveTaskScheduler.DebugView))]
        private sealed class ConcurrentExclusiveTaskScheduler : TaskScheduler
        {
            /// <summary>Cached delegate for invoking TryExecuteTaskShim.</summary>
            private static readonly Func<object, bool> s_tryExecuteTaskShim = new Func<object, bool>(TryExecuteTaskShim);
            /// <summary>The parent pair.</summary>
            private readonly ConcurrentExclusiveSchedulerPair m_pair;
            /// <summary>The maximum concurrency level for the scheduler.</summary>
            private readonly int m_maxConcurrencyLevel;
            /// <summary>The processing mode of this scheduler, exclusive or concurrent.</summary>
            private readonly ProcessingMode m_processingMode;
            /// <summary>Gets the queue of tasks for this scheduler.</summary>
            internal readonly IProducerConsumerQueue<Task> m_tasks;

            /// <summary>Initializes the scheduler.</summary>
            /// <param name="pair">The parent pair.</param>
            /// <param name="maxConcurrencyLevel">The maximum degree of concurrency this scheduler may use.</param>
            /// <param name="processingMode">The processing mode of this scheduler.</param>
            internal ConcurrentExclusiveTaskScheduler(ConcurrentExclusiveSchedulerPair pair, int maxConcurrencyLevel, ProcessingMode processingMode)
            {
                Contract.Requires(pair != null, "Scheduler must be associated with a valid pair.");
                Contract.Requires(processingMode == ProcessingMode.ProcessingConcurrentTasks || processingMode == ProcessingMode.ProcessingExclusiveTask,
                    "Scheduler must be for concurrent or exclusive processing.");
                Contract.Requires(
                    (processingMode == ProcessingMode.ProcessingConcurrentTasks && (maxConcurrencyLevel >= 1 || maxConcurrencyLevel == UNLIMITED_PROCESSING)) ||
                    (processingMode == ProcessingMode.ProcessingExclusiveTask && maxConcurrencyLevel == 1),
                    "If we're in concurrent mode, our concurrency level should be positive or unlimited.  If exclusive, it should be 1.");

                m_pair = pair;
                m_maxConcurrencyLevel = maxConcurrencyLevel;
                m_processingMode = processingMode;
                m_tasks = (processingMode == ProcessingMode.ProcessingExclusiveTask) ?
                    (IProducerConsumerQueue<Task>)new SingleProducerSingleConsumerQueue<Task>() :
                    (IProducerConsumerQueue<Task>)new MultiProducerMultiConsumerQueue<Task>();
            }

            /// <summary>Gets the maximum concurrency level this scheduler is able to support.</summary>
            public override int MaximumConcurrencyLevel { get { return m_maxConcurrencyLevel; } }

            /// <summary>Queues a task to the scheduler.</summary>
            /// <param name="task">The task to be queued.</param>
            [SecurityCritical]
            protected internal override void QueueTask(Task task)
            {
                Contract.Assert(task != null, "Infrastructure should have provided a non-null task.");
                lock (m_pair.ValueLock)
                {
                    // If the scheduler has already had completion requested, no new work is allowed to be scheduled
                    if (m_pair.CompletionRequested) throw new InvalidOperationException(GetType().Name);

                    // Queue the task, and then let the pair know that more work is now available to be scheduled
                    m_tasks.Enqueue(task);
                    m_pair.ProcessAsyncIfNecessary();
                }
            }

            /// <summary>Executes a task on this scheduler.</summary>
            /// <param name="task">The task to be executed.</param>
            [SecuritySafeCritical]
            internal void ExecuteTask(Task task)
            {
                Contract.Assert(task != null, "Infrastructure should have provided a non-null task.");
                base.TryExecuteTask(task);
            }

            /// <summary>Tries to execute the task synchronously on this scheduler.</summary>
            /// <param name="task">The task to execute.</param>
            /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued to the scheduler.</param>
            /// <returns>true if the task could be executed; otherwise, false.</returns>
            [SecurityCritical]
            protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
            {
                Contract.Assert(task != null, "Infrastructure should have provided a non-null task.");

                // If the scheduler has had completion requested, no new work is allowed to be scheduled.
                // A non-locked read on m_completionRequested (in CompletionRequested) is acceptable here because:
                // a) we don't need to be exact... a Complete call could come in later in the function anyway
                // b) this is only a fast path escape hatch.  To actually inline the task,
                //    we need to be inside of an already executing task, and in such a case,
                //    while completion may have been requested, we can't have shutdown yet.
                if (!taskWasPreviouslyQueued && m_pair.CompletionRequested) return false;

                // We know the implementation of the default scheduler and how it will behave. 
                // As it's the most common underlying scheduler, we optimize for it.
                bool isDefaultScheduler = m_pair.m_underlyingTaskScheduler == TaskScheduler.Default;

                // If we're targeting the default scheduler and taskWasPreviouslyQueued is true,
                // we know that the default scheduler will only allow it to be inlined
                // if we're on a thread pool thread (but it won't always allow it in that case,
                // since it'll only allow inlining if it can find the task in the local queue).
                // As such, if we're not on a thread pool thread, we know for sure the
                // task won't be inlined, so let's not even try.
                if (isDefaultScheduler && taskWasPreviouslyQueued && !Thread.CurrentThread.IsThreadPoolThread)
                {
                    return false;
                }
                else
                {
                    // If a task is already running on this thread, allow inline execution to proceed.
                    // If there's already a task from this scheduler running on the current thread, we know it's safe
                    // to run this task, in effect temporarily taking that task's count allocation.
                    ProcessingMode currentThreadMode;
                    if (m_pair.m_threadProcessingMapping.TryGetValue(Thread.CurrentThread.ManagedThreadId, out currentThreadMode) &&
                        currentThreadMode == m_processingMode)
                    {
                        // If we're targeting the default scheduler and taskWasPreviouslyQueued is false,
                        // we know the default scheduler will allow it, so we can just execute it here.
                        // Otherwise, delegate to the target scheduler's inlining.
                        return (isDefaultScheduler && !taskWasPreviouslyQueued) ?
                            TryExecuteTask(task) :
                            TryExecuteTaskInlineOnTargetScheduler(task);
                    }
                }

                // We're not in the context of a task already executing on this scheduler.  Bail.
                return false;
            }

            /// <summary>
            /// Implements a reasonable approximation for TryExecuteTaskInline on the underlying scheduler, 
            /// which we can't call directly on the underlying scheduler.
            /// </summary>
            /// <param name="task">The task to execute inline if possible.</param>
            /// <returns>true if the task was inlined successfully; otherwise, false.</returns>
            [SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals", MessageId = "ignored")]
            private bool TryExecuteTaskInlineOnTargetScheduler(Task task)
            {
                // We'd like to simply call TryExecuteTaskInline here, but we can't.
                // As there's no built-in API for this, a workaround is to create a new task that,
                // when executed, will simply call TryExecuteTask to run the real task, and then
                // we run our new shim task synchronously on the target scheduler.  If all goes well,
                // our synchronous invocation will succeed in running the shim task on the current thread,
                // which will in turn run the real task on the current thread.  If the scheduler
                // doesn't allow that execution, RunSynchronously will block until the underlying scheduler
                // is able to invoke the task, which might account for an additional but unavoidable delay.
                // Once it's done, we can return whether the task executed by returning the
                // shim task's Result, which is in turn the result of TryExecuteTask.
                var t = new Task<bool>(s_tryExecuteTaskShim, Tuple.Create(this, task));
                try
                {
                    t.RunSynchronously(m_pair.m_underlyingTaskScheduler);
                    return t.Result;
                }
                catch
                {
                    Contract.Assert(t.IsFaulted, "Task should be faulted due to the scheduler faulting it and throwing the exception.");
                    var ignored = t.Exception;
                    throw;
                }
                finally { t.Dispose(); }
            }

            /// <summary>Shim used to invoke this.TryExecuteTask(task).</summary>
            /// <param name="state">A tuple of the ConcurrentExclusiveTaskScheduler and the task to execute.</param>
            /// <returns>true if the task was successfully inlined; otherwise, false.</returns>
            /// <remarks>
            /// This method is separated out not because of performance reasons but so that
            /// the SecuritySafeCritical attribute may be employed.
            /// </remarks>
            [SecuritySafeCritical]
            private static bool TryExecuteTaskShim(object state)
            {
                var tuple = (Tuple<ConcurrentExclusiveTaskScheduler, Task>)state;
                return tuple.Item1.TryExecuteTask(tuple.Item2);
            }

            /// <summary>Gets for debugging purposes the tasks scheduled to this scheduler.</summary>
            /// <returns>An enumerable of the tasks queued.</returns>
            [SecurityCritical]
            protected override IEnumerable<Task> GetScheduledTasks() { return m_tasks; }

            /// <summary>Gets the number of tasks queued to this scheduler.</summary>
            [SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
            private int CountForDebugger { get { return m_tasks.Count; } }

            /// <summary>Provides a debug view for ConcurrentExclusiveTaskScheduler.</summary>
            private sealed class DebugView
            {
                /// <summary>The scheduler being debugged.</summary>
                private readonly ConcurrentExclusiveTaskScheduler m_taskScheduler;

                /// <summary>Initializes the debug view.</summary>
                /// <param name="scheduler">The scheduler being debugged.</param>
                public DebugView(ConcurrentExclusiveTaskScheduler scheduler)
                {
                    Contract.Requires(scheduler != null, "Need a scheduler with which to construct the debug view.");
                    m_taskScheduler = scheduler;
                }

                /// <summary>Gets this pair's maximum allowed concurrency level.</summary>
                public int MaximumConcurrencyLevel { get { return m_taskScheduler.m_maxConcurrencyLevel; } }
                /// <summary>Gets the tasks scheduled to this scheduler.</summary>
                public IEnumerable<Task> ScheduledTasks { get { return m_taskScheduler.m_tasks; } }
                /// <summary>Gets the scheduler pair with which this scheduler is associated.</summary>
                public ConcurrentExclusiveSchedulerPair SchedulerPair { get { return m_taskScheduler.m_pair; } }
            }
        }

        /// <summary>Provides a debug view for ConcurrentExclusiveSchedulerPair.</summary>
        private sealed class DebugView
        {
            /// <summary>The pair being debugged.</summary>
            private readonly ConcurrentExclusiveSchedulerPair m_pair;

            /// <summary>Initializes the debug view.</summary>
            /// <param name="pair">The pair being debugged.</param>
            public DebugView(ConcurrentExclusiveSchedulerPair pair)
            {
                Contract.Requires(pair != null, "Need a pair with which to construct the debug view.");
                m_pair = pair;
            }

            /// <summary>Gets a representation of the execution state of the pair.</summary>
            public ProcessingMode Mode { get { return m_pair.ModeForDebugger; } }
            /// <summary>Gets the tasks waiting to run exclusively.</summary>
            public IEnumerable<Task> ScheduledExclusive { get { return m_pair.m_exclusiveTaskScheduler.m_tasks; } }
            /// <summary>Gets the tasks waiting to run concurrently.</summary>
            public IEnumerable<Task> ScheduledConcurrent { get { return m_pair.m_concurrentTaskScheduler.m_tasks; } }
            /// <summary>Gets the number of tasks currently being executed.</summary>
            public int CurrentlyExecutingTaskCount
            {
                get { return (m_pair.m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL) ? 1 : m_pair.m_processingCount; }
            }
            /// <summary>Gets the underlying task scheduler that actually executes the tasks.</summary>
            public TaskScheduler TargetScheduler { get { return m_pair.m_underlyingTaskScheduler; } }
        }

        /// <summary>Gets an enumeration for debugging that represents the current state of the scheduler pair.</summary>
        /// <remarks>This is only for debugging.  It does not take the necessary locks to be useful for runtime usage.</remarks>
        private ProcessingMode ModeForDebugger
        {
            get
            {
                // If our completion task is done, so are we.
                if (m_completionState != null && m_completionState.Task.IsCompleted) return ProcessingMode.Completed;

                // Otherwise, summarize our current state.
                var mode = ProcessingMode.NotCurrentlyProcessing;
                if (m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL) mode |= ProcessingMode.ProcessingExclusiveTask;
                if (m_processingCount >= 1) mode |= ProcessingMode.ProcessingConcurrentTasks;
                if (CompletionRequested) mode |= ProcessingMode.Completing;
                return mode;
            }
        }
        
        /// <summary>Asserts that a given synchronization object is either held or not held.</summary>
        /// <param name="syncObj">The monitor to check.</param>
        /// <param name="held">Whether we want to assert that it's currently held or not held.</param>
        [Conditional("DEBUG")]
        internal static void ContractAssertMonitorStatus(object syncObj, bool held)
        {
            Contract.Requires(syncObj != null, "The monitor object to check must be provided.");
#if PRENET45
#if DEBUG
            // This check is expensive,
            // which is why it's protected by ShouldCheckMonitorStatus and controlled by an environment variable DEBUGSYNC.
            if (ShouldCheckMonitorStatus)
            {
                bool exceptionThrown;
                try
                {
                    Monitor.Pulse(syncObj); // throws a SynchronizationLockException if the monitor isn't held by this thread
                    exceptionThrown = false;
                }
                catch (SynchronizationLockException) { exceptionThrown = true; }
                Contract.Assert(held == !exceptionThrown, "The locking scheme was not correctly followed.");
            }
#endif
#else
            Contract.Assert(Monitor.IsEntered(syncObj) == held, "The locking scheme was not correctly followed.");
#endif
        }
        
        /// <summary>Gets the options to use for tasks.</summary>
        /// <param name="isReplacementReplica">If this task is being created to replace another.</param>
        /// <remarks>
        /// These options should be used for all tasks that have the potential to run user code or
        /// that are repeatedly spawned and thus need a modicum of fair treatment.
        /// </remarks>
        /// <returns>The options to use.</returns>
        internal static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica = false)
        {
            TaskCreationOptions options = 
#if PRENET45
                TaskCreationOptions.None;
#else
                TaskCreationOptions.DenyChildAttach;
#endif
            if (isReplacementReplica) options |= TaskCreationOptions.PreferFairness;
            return options;
        }

        /// <summary>Provides an enumeration that represents the current state of the scheduler pair.</summary>
        [Flags]
        private enum ProcessingMode : byte
        {
            /// <summary>The scheduler pair is currently dormant, with no work scheduled.</summary>
            NotCurrentlyProcessing = 0x0,
            /// <summary>The scheduler pair has queued processing for exclusive tasks.</summary>
            ProcessingExclusiveTask = 0x1,
            /// <summary>The scheduler pair has queued processing for concurrent tasks.</summary>
            ProcessingConcurrentTasks = 0x2,
            /// <summary>Completion has been requested.</summary>
            Completing = 0x4,
            /// <summary>The scheduler pair is finished processing.</summary>
            Completed = 0x8
        }
    }
    
}