diff options
Diffstat (limited to 'src/mscorlib/src/System/Threading/Tasks/ParallelRangeManager.cs')
-rw-r--r-- | src/mscorlib/src/System/Threading/Tasks/ParallelRangeManager.cs | 278 |
1 file changed, 278 insertions, 0 deletions
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
//
//
// Implements the algorithm for distributing loop indices to parallel loop workers
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System;
using System.Threading;
using System.Diagnostics.Contracts;

// Volatile fields are passed by ref to Interlocked.* below, which triggers CS0420;
// that is safe here because Interlocked provides the required memory semantics.
#pragma warning disable 0420

namespace System.Threading.Tasks
{
    /// <summary>
    /// Represents an index range
    /// </summary>
    internal struct IndexRange
    {
        // the From and To values for this range. These do not change after construction.
        internal long m_nFromInclusive;
        internal long m_nToExclusive;

        // The shared index, stored as the offset from nFromInclusive. Using an offset rather than the actual
        // value saves us from overflows that can happen due to multiple workers racing to increment this.
        // All updates to this field need to be interlocked. The boxed Shared<long> is allocated lazily
        // (via Interlocked.CompareExchange in FindNewWork) the first time a worker touches this range.
        internal volatile Shared<long> m_nSharedCurrentIndexOffset;

        // to be set to 1 by the worker that finishes this range. It's OK to do a non-interlocked write here.
        // NOTE(review): FindNewWork actually sets this with Interlocked.Exchange — confirm whether the
        // plain-write claim above is still accurate or just historical.
        internal int m_bRangeFinished;
    }


    /// <summary>
    /// The RangeWorker struct wraps the state needed by a task that services the parallel loop
    /// </summary>
    internal struct RangeWorker
    {
        // reference to the IndexRange array allocated by the range manager; shared by all workers
        internal readonly IndexRange[] m_indexRanges;

        // index of the current index range that this worker is grabbing chunks from
        internal int m_nCurrentIndexRange;

        // the step for this loop. Duplicated here for quick access (rather than jumping to rangemanager)
        internal long m_nStep;

        // increment value is the current amount that this worker will use
        // to increment the shared index of the range it's working on
        internal long m_nIncrementValue;

        // the increment value is doubled each time this worker finds work, and is capped at this value
        internal readonly long m_nMaxIncrementValue;

        /// <summary>
        /// Initializes a RangeWorker struct
        /// </summary>
        /// <param name="ranges">The shared array of index ranges produced by the RangeManager.</param>
        /// <param name="nInitialRange">Index of the range this worker starts grabbing chunks from.</param>
        /// <param name="nStep">The loop step; also the initial chunk size for this worker.</param>
        internal RangeWorker(IndexRange[] ranges, int nInitialRange, long nStep)
        {
            m_indexRanges = ranges;
            m_nCurrentIndexRange = nInitialRange;
            m_nStep = nStep;

            // start with the smallest chunk (one step); FindNewWork doubles this up to the cap
            m_nIncrementValue = nStep;

            m_nMaxIncrementValue = Parallel.DEFAULT_LOOP_STRIDE * nStep;
        }

        /// <summary>
        /// Implements the core work search algorithm that will be used for this range worker.
        /// </summary>
        /// <param name="nFromInclusiveLocal">On success, the inclusive start of the claimed chunk; 0 otherwise.</param>
        /// <param name="nToExclusiveLocal">On success, the exclusive end of the claimed chunk; 0 otherwise.</param>
        /// <returns>true if a chunk of work was claimed; false if no work remains anywhere.</returns>
        ///
        /// Usage pattern is:
        ///    1) the thread associated with this rangeworker calls FindNewWork
        ///    2) if we return true, the worker uses the nFromInclusiveLocal and nToExclusiveLocal values
        ///       to execute the sequential loop
        ///    3) if we return false it means there is no more work left. It's time to quit.
        ///
        internal bool FindNewWork(out long nFromInclusiveLocal, out long nToExclusiveLocal)
        {
            // since we iterate over index ranges circularly, we will use the
            // count of visited ranges as our exit condition
            int numIndexRangesToVisit = m_indexRanges.Length;

            do
            {
                // local snap to save array access bounds checks in places where we only read fields
                // (a struct copy — only safe for the immutable From/To fields and a racy read of the flag)
                IndexRange currentRange = m_indexRanges[m_nCurrentIndexRange];

                if (currentRange.m_bRangeFinished == 0)
                {
                    // lazily allocate the shared offset cell; CompareExchange ensures exactly one
                    // worker's allocation wins if several race here
                    if (m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset == null)
                    {
                        Interlocked.CompareExchange(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset, new Shared<long>(0), null);
                    }

                    // this access needs to be on the array slot (not the local copy) so all workers
                    // increment the same shared cell; subtracting the increment back yields the
                    // offset at which THIS worker's claim begins
                    long nMyOffset = Interlocked.Add(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset.Value,
                                                     m_nIncrementValue) - m_nIncrementValue;

                    if (currentRange.m_nToExclusive - currentRange.m_nFromInclusive > nMyOffset)
                    {
                        // we found work

                        nFromInclusiveLocal = currentRange.m_nFromInclusive + nMyOffset;
                        nToExclusiveLocal = nFromInclusiveLocal + m_nIncrementValue;

                        // Check for going past end of range, or wrapping (the < check catches
                        // signed overflow of the addition above)
                        if ((nToExclusiveLocal > currentRange.m_nToExclusive) || (nToExclusiveLocal < currentRange.m_nFromInclusive))
                        {
                            nToExclusiveLocal = currentRange.m_nToExclusive;
                        }

                        // We will double our unit of increment until it reaches the maximum.
                        if (m_nIncrementValue < m_nMaxIncrementValue)
                        {
                            m_nIncrementValue *= 2;
                            if (m_nIncrementValue > m_nMaxIncrementValue)
                            {
                                m_nIncrementValue = m_nMaxIncrementValue;
                            }
                        }

                        return true;
                    }
                    else
                    {
                        // this index range is completed, mark it so that others can skip it quickly
                        Interlocked.Exchange(ref m_indexRanges[m_nCurrentIndexRange].m_bRangeFinished, 1);
                    }
                }

                // move on to the next index range, in circular order.
                m_nCurrentIndexRange = (m_nCurrentIndexRange + 1) % m_indexRanges.Length;
                numIndexRangesToVisit--;

            } while (numIndexRangesToVisit > 0);
            // we've visited all index ranges possible => there's no work remaining

            nFromInclusiveLocal = 0;
            nToExclusiveLocal = 0;

            return false;
        }


        /// <summary>
        /// 32 bit integer version of FindNewWork. Assumes the ranges were initialized with 32 bit values.
        /// </summary>
        /// <param name="nFromInclusiveLocal32">On success, the inclusive start of the claimed chunk, narrowed to 32 bits.</param>
        /// <param name="nToExclusiveLocal32">On success, the exclusive end of the claimed chunk, narrowed to 32 bits.</param>
        /// <returns>true if a chunk of work was claimed; false if no work remains.</returns>
        internal bool FindNewWork32(out int nFromInclusiveLocal32, out int nToExclusiveLocal32)
        {
            long nFromInclusiveLocal;
            long nToExclusiveLocal;

            bool bRetVal = FindNewWork(out nFromInclusiveLocal, out nToExclusiveLocal);

            // debug-only guard: the narrowing below is only valid if the ranges really were 32-bit
            Contract.Assert((nFromInclusiveLocal <= Int32.MaxValue) && (nFromInclusiveLocal >= Int32.MinValue) &&
                            (nToExclusiveLocal <= Int32.MaxValue) && (nToExclusiveLocal >= Int32.MinValue));

            // convert to 32 bit before returning
            nFromInclusiveLocal32 = (int)nFromInclusiveLocal;
            nToExclusiveLocal32 = (int)nToExclusiveLocal;

            return bRetVal;
        }
    }


    /// <summary>
    /// Represents the entire loop operation, keeping track of workers and ranges.
    /// </summary>
    ///
    /// The usage pattern is:
    ///    1) The Parallel loop entry function (ForWorker) creates an instance of this class
    ///    2) Every thread joining to service the parallel loop calls RegisterWorker to grab a
    ///       RangeWorker struct to wrap the state it will need to find and execute work,
    ///       and they keep interacting with that struct until the end of the loop
    internal class RangeManager
    {
        // the partitioned index ranges, shared (by reference) with every RangeWorker
        internal readonly IndexRange[] m_indexRanges;

        // round-robin counter used by RegisterNewWorker to spread workers across ranges
        internal int m_nCurrentIndexRangeToAssign;
        internal long m_nStep;

        /// <summary>
        /// Initializes a RangeManager with the given loop parameters, and the desired number of outer ranges
        /// </summary>
        /// <param name="nFromInclusive">Inclusive start of the whole loop.</param>
        /// <param name="nToExclusive">Exclusive end of the whole loop.</param>
        /// <param name="nStep">The loop step; every range boundary is snapped to a multiple of this.</param>
        /// <param name="nNumExpectedWorkers">Expected worker count, used to size the ranges.</param>
        internal RangeManager(long nFromInclusive, long nToExclusive, long nStep, int nNumExpectedWorkers)
        {
            m_nCurrentIndexRangeToAssign = 0;
            m_nStep = nStep;

            // Our signed math breaks down w/ nNumExpectedWorkers == 1.  So change it to 2.
            if (nNumExpectedWorkers == 1)
                nNumExpectedWorkers = 2;

            //
            // calculate the size of each index range
            // (unsigned math so a span that doesn't fit in a signed long still divides correctly)
            //

            ulong uSpan = (ulong)(nToExclusive - nFromInclusive);
            ulong uRangeSize = uSpan / (ulong)nNumExpectedWorkers; // rough estimate first

            uRangeSize -= uRangeSize % (ulong)nStep; // snap to multiples of nStep
                                                     // otherwise index range transitions will derail us from nStep

            if (uRangeSize == 0)
            {
                uRangeSize = (ulong)nStep;
            }

            //
            // find the actual number of index ranges we will need
            //
            Contract.Assert((uSpan / uRangeSize) < Int32.MaxValue);

            int nNumRanges = (int)(uSpan / uRangeSize);

            if (uSpan % uRangeSize != 0)
            {
                nNumRanges++;
            }


            // Convert to signed so the rest of the logic works.
            // Should be fine so long as uRangeSize < Int64.MaxValue, which we guaranteed by setting #workers >= 2.
            long nRangeSize = (long)uRangeSize;

            // allocate the array of index ranges
            m_indexRanges = new IndexRange[nNumRanges];

            long nCurrentIndex = nFromInclusive;
            for (int i = 0; i < nNumRanges; i++)
            {
                // the fromInclusive of the new index range is always on nCurrentIndex
                m_indexRanges[i].m_nFromInclusive = nCurrentIndex;
                m_indexRanges[i].m_nSharedCurrentIndexOffset = null;
                m_indexRanges[i].m_bRangeFinished = 0;

                // now increment it to find the toExclusive value for our range
                nCurrentIndex += nRangeSize;

                // detect integer overflow or range overage and snap to nToExclusive
                // (the first comparison is true exactly when the addition above wrapped)
                if (nCurrentIndex < nCurrentIndex - nRangeSize ||
                    nCurrentIndex > nToExclusive)
                {
                    // this should only happen at the last index
                    Contract.Assert(i == nNumRanges - 1);

                    nCurrentIndex = nToExclusive;
                }

                // now that the end point of the new range is calculated, assign it.
                m_indexRanges[i].m_nToExclusive = nCurrentIndex;
            }
        }

        /// <summary>
        /// The function that needs to be called by each new worker thread servicing the parallel loop
        /// in order to get a RangeWorker struct that wraps the state for finding and executing indices
        /// </summary>
        /// <returns>A RangeWorker whose starting range is assigned round-robin across the range array.</returns>
        internal RangeWorker RegisterNewWorker()
        {
            Contract.Assert(m_indexRanges != null && m_indexRanges.Length != 0);

            // Interlocked.Increment hands each joining worker a distinct starting range (mod array length)
            int nInitialRange = (Interlocked.Increment(ref m_nCurrentIndexRangeToAssign) - 1) % m_indexRanges.Length;

            return new RangeWorker(m_indexRanges, nInitialRange, m_nStep);
        }
    }
}
#pragma warning restore 0420