// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

#pragma warning disable 0420

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
//
// A class of default partitioners for Partitioner
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Security.Permissions;
using System.Threading;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Runtime.InteropServices;

namespace System.Collections.Concurrent
{
    /// <summary>
    /// Out-of-the-box partitioners are created with a set of default behaviors.
    /// For example, by default, some form of buffering and chunking will be employed to achieve
    /// optimal performance in the common scenario where an IEnumerable&lt;&gt; implementation is fast and
    /// non-blocking.  These behaviors can be overridden via this enumeration.
    /// </summary>
    [Flags]
    public enum EnumerablePartitionerOptions
    {
        /// <summary>
        /// Use the default behavior (i.e., use buffering to achieve optimal performance)
        /// </summary>
        None = 0x0,

        /// <summary>
        /// Creates a partitioner that will take items from the source enumerable one at a time
        /// and will not use intermediate storage that can be accessed more efficiently by multiple threads.
        /// This option provides support for low latency (items will be processed as soon as they are available from
        /// the source) and partial support for dependencies between items (a thread cannot deadlock waiting for an item
        /// that it, itself, is responsible for processing).
        /// </summary>
        NoBuffering = 0x1
    }

    // The static class Partitioners implements 3 default partitioning strategies:
    // 1. dynamic load balance partitioning for indexable data source (IList and arrays)
    // 2. static partitioning for indexable data source (IList and arrays)
    // 3. dynamic load balance partitioning for enumerables.
Enumerables have indexes, which are the natural order // of elements, but enuemrators are not indexable // - data source of type IList/arrays have both dynamic and static partitioning, as 1 and 3. // We assume that the source data of IList/Array is not changing concurrently. // - data source of type IEnumerable can only be partitioned dynamically (load-balance) // - Dynamic partitioning methods 1 and 3 are same, both being dynamic and load-balance. But the // implementation is different for data source of IList/Array vs. IEnumerable: // * When the source collection is IList/Arrays, we use Interlocked on the shared index; // * When the source collection is IEnumerable, we use Monitor to wrap around the access to the source // enumerator. /// /// Provides common partitioning strategies for arrays, lists, and enumerables. /// /// /// /// The static methods on are all thread-safe and may be used concurrently /// from multiple threads. However, while a created partitioner is in use, the underlying data source /// should not be modified, whether from the same thread that's using a partitioner or from a separate /// thread. /// /// public static class Partitioner { /// /// Creates an orderable partitioner from an /// instance. /// /// Type of the elements in source list. /// The list to be partitioned. /// /// A Boolean value that indicates whether the created partitioner should dynamically /// load balance between partitions rather than statically partition. /// /// /// An orderable partitioner based on the input list. /// public static OrderablePartitioner Create(IList list, bool loadBalance) { if (list == null) { throw new ArgumentNullException(nameof(list)); } if (loadBalance) { return (new DynamicPartitionerForIList(list)); } else { return (new StaticIndexRangePartitionerForIList(list)); } } /// /// Creates an orderable partitioner from a instance. /// /// Type of the elements in source array. /// The array to be partitioned. 
/// /// A Boolean value that indicates whether the created partitioner should dynamically load balance /// between partitions rather than statically partition. /// /// /// An orderable partitioner based on the input array. /// public static OrderablePartitioner Create(TSource[] array, bool loadBalance) { // This implementation uses 'ldelem' instructions for element retrieval, rather than using a // method call. if (array == null) { throw new ArgumentNullException(nameof(array)); } if (loadBalance) { return (new DynamicPartitionerForArray(array)); } else { return (new StaticIndexRangePartitionerForArray(array)); } } /// /// Creates an orderable partitioner from a instance. /// /// Type of the elements in source enumerable. /// The enumerable to be partitioned. /// /// An orderable partitioner based on the input array. /// /// /// The ordering used in the created partitioner is determined by the natural order of the elements /// as retrieved from the source enumerable. /// public static OrderablePartitioner Create(IEnumerable source) { return Create(source, EnumerablePartitionerOptions.None); } /// /// Creates an orderable partitioner from a instance. /// /// Type of the elements in source enumerable. /// The enumerable to be partitioned. /// Options to control the buffering behavior of the partitioner. /// /// The argument specifies an invalid value for . /// /// /// An orderable partitioner based on the input array. /// /// /// The ordering used in the created partitioner is determined by the natural order of the elements /// as retrieved from the source enumerable. 
/// public static OrderablePartitioner Create(IEnumerable source, EnumerablePartitionerOptions partitionerOptions) { if (source == null) { throw new ArgumentNullException(nameof(source)); } if ((partitionerOptions & (~EnumerablePartitionerOptions.NoBuffering)) != 0) throw new ArgumentOutOfRangeException(nameof(partitionerOptions)); return (new DynamicPartitionerForIEnumerable(source, partitionerOptions)); } /// Creates a partitioner that chunks the user-specified range. /// The lower, inclusive bound of the range. /// The upper, exclusive bound of the range. /// A partitioner. /// The argument is /// less than or equal to the argument. public static OrderablePartitioner> Create(long fromInclusive, long toExclusive) { // How many chunks do we want to divide the range into? If this is 1, then the // answer is "one chunk per core". Generally, though, you'll achieve better // load balancing on a busy system if you make it higher than 1. int coreOversubscriptionRate = 3; if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException(nameof(toExclusive)); long rangeSize = (toExclusive - fromInclusive) / (PlatformHelper.ProcessorCount * coreOversubscriptionRate); if (rangeSize == 0) rangeSize = 1; return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time } /// Creates a partitioner that chunks the user-specified range. /// The lower, inclusive bound of the range. /// The upper, exclusive bound of the range. /// The size of each subrange. /// A partitioner. /// The argument is /// less than or equal to the argument. /// The argument is /// less than or equal to 0. 
public static OrderablePartitioner> Create(long fromInclusive, long toExclusive, long rangeSize) { if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException(nameof(toExclusive)); if (rangeSize <= 0) throw new ArgumentOutOfRangeException(nameof(rangeSize)); return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time } // Private method to parcel out range tuples. private static IEnumerable> CreateRanges(long fromInclusive, long toExclusive, long rangeSize) { // Enumerate all of the ranges long from, to; bool shouldQuit = false; for (long i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize) { from = i; try { checked { to = i + rangeSize; } } catch (OverflowException) { to = toExclusive; shouldQuit = true; } if (to > toExclusive) to = toExclusive; yield return new Tuple(from, to); } } /// Creates a partitioner that chunks the user-specified range. /// The lower, inclusive bound of the range. /// The upper, exclusive bound of the range. /// A partitioner. /// The argument is /// less than or equal to the argument. public static OrderablePartitioner> Create(int fromInclusive, int toExclusive) { // How many chunks do we want to divide the range into? If this is 1, then the // answer is "one chunk per core". Generally, though, you'll achieve better // load balancing on a busy system if you make it higher than 1. int coreOversubscriptionRate = 3; if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException(nameof(toExclusive)); int rangeSize = (toExclusive - fromInclusive) / (PlatformHelper.ProcessorCount * coreOversubscriptionRate); if (rangeSize == 0) rangeSize = 1; return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time } /// Creates a partitioner that chunks the user-specified range. /// The lower, inclusive bound of the range. 
/// The upper, exclusive bound of the range. /// The size of each subrange. /// A partitioner. /// The argument is /// less than or equal to the argument. /// The argument is /// less than or equal to 0. public static OrderablePartitioner> Create(int fromInclusive, int toExclusive, int rangeSize) { if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException(nameof(toExclusive)); if (rangeSize <= 0) throw new ArgumentOutOfRangeException(nameof(rangeSize)); return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time } // Private method to parcel out range tuples. private static IEnumerable> CreateRanges(int fromInclusive, int toExclusive, int rangeSize) { // Enumerate all of the ranges int from, to; bool shouldQuit = false; for (int i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize) { from = i; try { checked { to = i + rangeSize; } } catch (OverflowException) { to = toExclusive; shouldQuit = true; } if (to > toExclusive) to = toExclusive; yield return new Tuple(from, to); } } #region DynamicPartitionEnumerator_Abstract class /// /// DynamicPartitionEnumerator_Abstract defines the enumerator for each partition for the dynamic load-balance /// partitioning algorithm. /// - Partition is an enumerator of KeyValuePairs, each corresponding to an item in the data source: /// the key is the index in the source collection; the value is the item itself. /// - a set of such partitions share a reader over data source. The type of the reader is specified by /// TSourceReader. /// - each partition requests a contiguous chunk of elements at a time from the source data. The chunk /// size is initially 1, and doubles every time until it reaches the maximum chunk size. /// The implementation for GrabNextChunk() method has two versions: one for data source of IndexRange /// types (IList and the array), one for data source of IEnumerable. 
/// - The method "Reset" is not supported for any partitioning algorithm. /// - The implementation for MoveNext() method is same for all dynanmic partitioners, so we provide it /// in this abstract class. /// /// Type of the elements in the data source /// Type of the reader on the data source //TSourceReader is // - IList, when source data is IList, the shared reader is source data itself // - TSource[], when source data is TSource[], the shared reader is source data itself // - IEnumerator, when source data is IEnumerable, and the shared reader is an // enumerator of the source data private abstract class DynamicPartitionEnumerator_Abstract : IEnumerator> { //----------------- common fields and constructor for all dynamic partitioners ----------------- //--- shared by all dervied class with souce data type: IList, Array, and IEnumerator protected readonly TSourceReader m_sharedReader; protected static int s_defaultMaxChunkSize = GetDefaultChunkSize(); //deferred allocating in MoveNext() with initial value 0, to avoid false sharing //we also use the fact that: (m_currentChunkSize==null) means MoveNext is never called on this enumerator protected SharedInt m_currentChunkSize; //deferring allocation in MoveNext() with initial value -1, to avoid false sharing protected SharedInt m_localOffset; private const int CHUNK_DOUBLING_RATE = 3; // Double the chunk size every this many grabs private int m_doublingCountdown; // Number of grabs remaining until chunk size doubles protected readonly int m_maxChunkSize; // s_defaultMaxChunkSize unless single-chunking is requested by the caller // m_sharedIndex shared by this set of partitions, and particularly when m_sharedReader is IEnuerable // it serves as tracking of the natual order of elements in m_sharedReader // the value of this field is passed in from outside (already initialized) by the constructor, protected readonly SharedLong m_sharedIndex; protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, 
SharedLong sharedIndex) : this(sharedReader, sharedIndex, false) { } protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, SharedLong sharedIndex, bool useSingleChunking) { m_sharedReader = sharedReader; m_sharedIndex = sharedIndex; m_maxChunkSize = useSingleChunking ? 1 : s_defaultMaxChunkSize; } // ---------------- abstract method declarations -------------- /// /// Abstract method to request a contiguous chunk of elements from the source collection /// /// specified number of elements requested /// /// true if we successfully reserved at least one element (up to #=requestedChunkSize) /// false if all elements in the source collection have been reserved. /// //GrabNextChunk does the following: // - grab # of requestedChunkSize elements from source data through shared reader, // - at the time of function returns, m_currentChunkSize is updated with the number of // elements actually got assigned (<=requestedChunkSize). // - GrabNextChunk returns true if at least one element is assigned to this partition; // false if the shared reader already hits the last element of the source data before // we call GrabNextChunk protected abstract bool GrabNextChunk(int requestedChunkSize); /// /// Abstract property, returns whether or not the shared reader has already read the last /// element of the source data /// protected abstract bool HasNoElementsLeft { get; set; } /// /// Get the current element in the current partition. Property required by IEnumerator interface /// This property is abstract because the implementation is different depending on the type /// of the source data: IList, Array or IEnumerable /// public abstract KeyValuePair Current { get; } /// /// Dispose is abstract, and depends on the type of the source data: /// - For source data type IList and Array, the type of the shared reader is just the dataitself. /// We don't do anything in Dispose method for IList and Array. 
/// - For source data type IEnumerable, the type of the shared reader is an enumerator we created. /// Thus we need to dispose this shared reader enumerator, when there is no more active partitions /// left. /// public abstract void Dispose(); /// /// Reset on partitions is not supported /// public void Reset() { throw new NotSupportedException(); } /// /// Get the current element in the current partition. Property required by IEnumerator interface /// Object IEnumerator.Current { get { return ((DynamicPartitionEnumerator_Abstract)this).Current; } } /// /// Moves to the next element if any. /// Try current chunk first, if the current chunk do not have any elements left, then we /// attempt to grab a chunk from the source collection. /// /// /// true if successfully moving to the next position; /// false otherwise, if and only if there is no more elements left in the current chunk /// AND the source collection is exhausted. /// public bool MoveNext() { //perform deferred allocating of the local variables. if (m_localOffset == null) { Debug.Assert(m_currentChunkSize == null); m_localOffset = new SharedInt(-1); m_currentChunkSize = new SharedInt(0); m_doublingCountdown = CHUNK_DOUBLING_RATE; } if (m_localOffset.Value < m_currentChunkSize.Value - 1) //attempt to grab the next element from the local chunk { m_localOffset.Value++; return true; } else //otherwise it means we exhausted the local chunk //grab a new chunk from the source enumerator { // The second part of the || condition is necessary to handle the case when MoveNext() is called // after a previous MoveNext call returned false. 
Debug.Assert(m_localOffset.Value == m_currentChunkSize.Value - 1 || m_currentChunkSize.Value == 0); //set the requested chunk size to a proper value int requestedChunkSize; if (m_currentChunkSize.Value == 0) //first time grabbing from source enumerator { requestedChunkSize = 1; } else if (m_doublingCountdown > 0) { requestedChunkSize = m_currentChunkSize.Value; } else { requestedChunkSize = Math.Min(m_currentChunkSize.Value * 2, m_maxChunkSize); m_doublingCountdown = CHUNK_DOUBLING_RATE; // reset } // Decrement your doubling countdown m_doublingCountdown--; Debug.Assert(requestedChunkSize > 0 && requestedChunkSize <= m_maxChunkSize); //GrabNextChunk will update the value of m_currentChunkSize if (GrabNextChunk(requestedChunkSize)) { Debug.Assert(m_currentChunkSize.Value <= requestedChunkSize && m_currentChunkSize.Value > 0); m_localOffset.Value = 0; return true; } else { return false; } } } } #endregion #region Dynamic Partitioner for source data of IEnuemrable<> type /// /// Inherits from DynamicPartitioners /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance /// of EnumerableOfPartitionsForIEnumerator defined internally /// /// Type of elements in the source data private class DynamicPartitionerForIEnumerable : OrderablePartitioner { IEnumerable m_source; readonly bool m_useSingleChunking; //constructor internal DynamicPartitionerForIEnumerable(IEnumerable source, EnumerablePartitionerOptions partitionerOptions) : base(true, false, true) { m_source = source; m_useSingleChunking = ((partitionerOptions & EnumerablePartitionerOptions.NoBuffering) != 0); } /// /// Overrides OrderablePartitioner.GetOrderablePartitions. /// Partitions the underlying collection into the given number of orderable partitions. /// /// number of partitions requested /// A list containing enumerators. 
override public IList>> GetOrderablePartitions(int partitionCount) { if (partitionCount <= 0) { throw new ArgumentOutOfRangeException(nameof(partitionCount)); } IEnumerator>[] partitions = new IEnumerator>[partitionCount]; IEnumerable> partitionEnumerable = new InternalPartitionEnumerable(m_source.GetEnumerator(), m_useSingleChunking, true); for (int i = 0; i < partitionCount; i++) { partitions[i] = partitionEnumerable.GetEnumerator(); } return partitions; } /// /// Overrides OrderablePartitioner.GetOrderableDyanmicPartitions /// /// a enumerable collection of orderable partitions override public IEnumerable> GetOrderableDynamicPartitions() { return new InternalPartitionEnumerable(m_source.GetEnumerator(), m_useSingleChunking, false); } /// /// Whether additional partitions can be created dynamically. /// override public bool SupportsDynamicPartitions { get { return true; } } #region Internal classes: InternalPartitionEnumerable, InternalPartitionEnumerator /// /// Provides customized implementation for source data of IEnumerable /// Different from the counterpart for IList/Array, this enumerable maintains several additional fields /// shared by the partitions it owns, including a boolean "m_hasNoElementsLef", a shared lock, and a /// shared count "m_activePartitionCount" used to track active partitions when they were created statically /// private class InternalPartitionEnumerable : IEnumerable>, IDisposable { //reader through which we access the source data private readonly IEnumerator m_sharedReader; private SharedLong m_sharedIndex;//initial value -1 private volatile KeyValuePair[] m_FillBuffer; // intermediate buffer to reduce locking private volatile int m_FillBufferSize; // actual number of elements in m_FillBuffer. 
Will start // at m_FillBuffer.Length, and might be reduced during the last refill private volatile int m_FillBufferCurrentPosition; //shared value to be accessed by Interlock.Increment only private volatile int m_activeCopiers; //number of active copiers //fields shared by all partitions that this Enumerable owns, their allocation is deferred private SharedBool m_hasNoElementsLeft; // no elements left at all. private SharedBool m_sourceDepleted; // no elements left in the enumerator, but there may be elements in the Fill Buffer //shared synchronization lock, created by this Enumerable private object m_sharedLock;//deferring allocation by enumerator private bool m_disposed; // If dynamic partitioning, then m_activePartitionCount == null // If static partitioning, then it keeps track of active partition count private SharedInt m_activePartitionCount; // records whether or not the user has requested single-chunking behavior private readonly bool m_useSingleChunking; internal InternalPartitionEnumerable(IEnumerator sharedReader, bool useSingleChunking, bool isStaticPartitioning) { m_sharedReader = sharedReader; m_sharedIndex = new SharedLong(-1); m_hasNoElementsLeft = new SharedBool(false); m_sourceDepleted = new SharedBool(false); m_sharedLock = new object(); m_useSingleChunking = useSingleChunking; // Only allocate the fill-buffer if single-chunking is not in effect if (!m_useSingleChunking) { // Time to allocate the fill buffer which is used to reduce the contention on the shared lock. // First pick the buffer size multiplier. We use 4 for when there are more than 4 cores, and just 1 for below. This is based on empirical evidence. int fillBufferMultiplier = (PlatformHelper.ProcessorCount > 4) ? 4 : 1; // and allocate the fill buffer using these two numbers m_FillBuffer = new KeyValuePair[fillBufferMultiplier * Partitioner.GetDefaultChunkSize()]; } if (isStaticPartitioning) { // If this object is created for static partitioning (ie. 
via GetPartitions(int partitionCount), // GetOrderablePartitions(int partitionCount)), we track the active partitions, in order to dispose // this object when all the partitions have been disposed. m_activePartitionCount = new SharedInt(0); } else { // Otherwise this object is created for dynamic partitioning (ie, via GetDynamicPartitions(), // GetOrderableDynamicPartitions()), we do not need tracking. This object must be disposed // explicitly m_activePartitionCount = null; } } public IEnumerator> GetEnumerator() { if (m_disposed) { throw new ObjectDisposedException(Environment.GetResourceString("PartitionerStatic_CanNotCallGetEnumeratorAfterSourceHasBeenDisposed")); } else { return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex, m_hasNoElementsLeft, m_sharedLock, m_activePartitionCount, this, m_useSingleChunking); } } IEnumerator IEnumerable.GetEnumerator() { return ((InternalPartitionEnumerable)this).GetEnumerator(); } /////////////////// // // Used by GrabChunk_Buffered() private void TryCopyFromFillBuffer(KeyValuePair[] destArray, int requestedChunkSize, ref int actualNumElementsGrabbed) { actualNumElementsGrabbed = 0; // making a local defensive copy of the fill buffer reference, just in case it gets nulled out KeyValuePair[] fillBufferLocalRef = m_FillBuffer; if (fillBufferLocalRef == null) return; // first do a quick check, and give up if the current position is at the end // so that we don't do an unncessary pair of Interlocked.Increment / Decrement calls if (m_FillBufferCurrentPosition >= m_FillBufferSize) { return; // no elements in the buffer to copy from } // We might have a chance to grab elements from the buffer. We will know for sure // when we do the Interlocked.Add below. // But first we must register as a potential copier in order to make sure // the elements we're copying from don't get overwritten by another thread // that starts refilling the buffer right after our Interlocked.Add. 
Interlocked.Increment(ref m_activeCopiers); int endPos = Interlocked.Add(ref m_FillBufferCurrentPosition, requestedChunkSize); int beginPos = endPos - requestedChunkSize; if (beginPos < m_FillBufferSize) { // adjust index and do the actual copy actualNumElementsGrabbed = (endPos < m_FillBufferSize) ? endPos : m_FillBufferSize - beginPos; Array.Copy(fillBufferLocalRef, beginPos, destArray, 0, actualNumElementsGrabbed); } // let the record show we are no longer accessing the buffer Interlocked.Decrement(ref m_activeCopiers); } /// /// This is the common entry point for consuming items from the source enumerable /// /// /// true if we successfully reserved at least one element /// false if all elements in the source collection have been reserved. /// internal bool GrabChunk(KeyValuePair[] destArray, int requestedChunkSize, ref int actualNumElementsGrabbed) { actualNumElementsGrabbed = 0; if (m_hasNoElementsLeft.Value) { return false; } if (m_useSingleChunking) { return GrabChunk_Single(destArray, requestedChunkSize, ref actualNumElementsGrabbed); } else { return GrabChunk_Buffered(destArray, requestedChunkSize, ref actualNumElementsGrabbed); } } /// /// Version of GrabChunk that grabs a single element at a time from the source enumerable /// /// /// true if we successfully reserved an element /// false if all elements in the source collection have been reserved. 
/// internal bool GrabChunk_Single(KeyValuePair[] destArray, int requestedChunkSize, ref int actualNumElementsGrabbed) { Debug.Assert(m_useSingleChunking, "Expected m_useSingleChecking to be true"); Debug.Assert(requestedChunkSize == 1, "Got requested chunk size of " + requestedChunkSize + " when single-chunking was on"); Debug.Assert(actualNumElementsGrabbed == 0, "Expected actualNumElementsGrabbed == 0, instead it is " + actualNumElementsGrabbed); Debug.Assert(destArray.Length == 1, "Expected destArray to be of length 1, instead its length is " + destArray.Length); lock (m_sharedLock) { if (m_hasNoElementsLeft.Value) return false; try { if (m_sharedReader.MoveNext()) { m_sharedIndex.Value = checked(m_sharedIndex.Value + 1); destArray[0] = new KeyValuePair(m_sharedIndex.Value, m_sharedReader.Current); actualNumElementsGrabbed = 1; return true; } else { //if MoveNext() return false, we set the flag to inform other partitions m_sourceDepleted.Value = true; m_hasNoElementsLeft.Value = true; return false; } } catch { // On an exception, make sure that no additional items are hereafter enumerated m_sourceDepleted.Value = true; m_hasNoElementsLeft.Value = true; throw; } } } /// /// Version of GrabChunk that uses buffering scheme to grab items out of source enumerable /// /// /// true if we successfully reserved at least one element (up to #=requestedChunkSize) /// false if all elements in the source collection have been reserved. /// internal bool GrabChunk_Buffered(KeyValuePair[] destArray, int requestedChunkSize, ref int actualNumElementsGrabbed) { Debug.Assert(requestedChunkSize > 0); Debug.Assert(!m_useSingleChunking, "Did not expect to be in single-chunking mode"); TryCopyFromFillBuffer(destArray, requestedChunkSize, ref actualNumElementsGrabbed); if (actualNumElementsGrabbed == requestedChunkSize) { // that was easy. 
return true; } else if (m_sourceDepleted.Value) { // looks like we both reached the end of the fill buffer, and the source was depleted previously // this means no more work to do for any other worker m_hasNoElementsLeft.Value = true; m_FillBuffer = null; return (actualNumElementsGrabbed > 0); } // // now's the time to take the shared lock and enumerate // lock (m_sharedLock) { if (m_sourceDepleted.Value) { return (actualNumElementsGrabbed > 0); } try { // we need to make sure all array copiers are finished if (m_activeCopiers > 0) { SpinWait sw = new SpinWait(); while( m_activeCopiers > 0) sw.SpinOnce(); } Debug.Assert(m_sharedIndex != null); //already been allocated in MoveNext() before calling GrabNextChunk // Now's the time to actually enumerate the source // We first fill up the requested # of elements in the caller's array // continue from the where TryCopyFromFillBuffer() left off for (; actualNumElementsGrabbed < requestedChunkSize; actualNumElementsGrabbed++) { if (m_sharedReader.MoveNext()) { m_sharedIndex.Value = checked(m_sharedIndex.Value + 1); destArray[actualNumElementsGrabbed] = new KeyValuePair(m_sharedIndex.Value, m_sharedReader.Current); } else { //if MoveNext() return false, we set the flag to inform other partitions m_sourceDepleted.Value = true; break; } } // taking a local snapshot of m_FillBuffer in case some other thread decides to null out m_FillBuffer // in the entry of this method after observing m_sourceCompleted = true var localFillBufferRef = m_FillBuffer; // If the big buffer seems to be depleted, we will also fill that up while we are under the lock // Note that this is the only place that m_FillBufferCurrentPosition can be reset if (m_sourceDepleted.Value == false && localFillBufferRef != null && m_FillBufferCurrentPosition >= localFillBufferRef.Length) { for (int i = 0; i < localFillBufferRef.Length; i++) { if( m_sharedReader.MoveNext()) { m_sharedIndex.Value = checked(m_sharedIndex.Value + 1); localFillBufferRef[i] = new 
KeyValuePair<long, TSource>(m_sharedIndex.Value, m_sharedReader.Current);
                                    }
                                    else
                                    {
                                        // No more elements left in the enumerator.
                                        // Record this, so that the next request can skip the lock
                                        m_sourceDepleted.Value = true;

                                        // also record the current count in m_FillBufferSize
                                        m_FillBufferSize = i;

                                        // and exit the for loop so that we don't keep incrementing m_FillBufferSize
                                        break;
                                    }
                                }
                                m_FillBufferCurrentPosition = 0;
                            }
                        }
                        catch
                        {
                            // If an exception occurs, don't let the other enumerators try to enumerate.
                            // NOTE: this could instead throw an InvalidOperationException, but that would be unexpected
                            // and not helpful to the end user.  We know the root cause is being communicated already.)
                            m_sourceDepleted.Value = true;
                            m_hasNoElementsLeft.Value = true;
                            throw;
                        }
                    }

                    return (actualNumElementsGrabbed > 0);
                }

                public void Dispose()
                {
                    if (!m_disposed)
                    {
                        m_disposed = true;
                        m_sharedReader.Dispose();
                    }
                }
            }

            /// <summary>
            /// Inherits from DynamicPartitionEnumerator_Abstract directly
            /// Provides customized implementation for: GrabNextChunk, HasNoElementsLeft, Current, Dispose
            /// </summary>
            private class InternalPartitionEnumerator : DynamicPartitionEnumerator_Abstract<TSource, IEnumerator<TSource>>
            {
                //---- fields ----
                //cached local copy of the current chunk
                private KeyValuePair<long, TSource>[] m_localList; //defer allocating to avoid false sharing

                // the values of the following fields are passed in from
                // outside (already initialized) by the constructor
                private readonly SharedBool m_hasNoElementsLeft;
                private readonly object m_sharedLock;
                private readonly SharedInt m_activePartitionCount;
                private InternalPartitionEnumerable m_enumerable;

                //constructor
                internal InternalPartitionEnumerator(
                    IEnumerator<TSource> sharedReader,
                    SharedLong sharedIndex,
                    SharedBool hasNoElementsLeft,
                    object sharedLock,
                    SharedInt activePartitionCount,
                    InternalPartitionEnumerable enumerable,
                    bool useSingleChunking)
                    : base(sharedReader, sharedIndex, useSingleChunking)
                {
                    m_hasNoElementsLeft = hasNoElementsLeft;
                    m_sharedLock = sharedLock;
                    m_enumerable = enumerable;
                    m_activePartitionCount = activePartitionCount;

                    if (m_activePartitionCount != null)
                    {
                        // If static partitioning, we need to increase the active partition count.
                        Interlocked.Increment(ref m_activePartitionCount.Value);
                    }
                }

                //overriding methods

                /// <summary>
                /// Reserves a contiguous range of elements from source data
                /// </summary>
                /// <param name="requestedChunkSize">specified number of elements requested</param>
                /// <returns>
                /// true if we successfully reserved at least one element (up to #=requestedChunkSize)
                /// false if all elements in the source collection have been reserved.
                /// </returns>
                protected override bool GrabNextChunk(int requestedChunkSize)
                {
                    Debug.Assert(requestedChunkSize > 0);

                    if (HasNoElementsLeft)
                    {
                        return false;
                    }

                    // defer allocation to avoid false sharing
                    if (m_localList == null)
                    {
                        m_localList = new KeyValuePair<long, TSource>[m_maxChunkSize];
                    }

                    // make the actual call to the enumerable that grabs a chunk
                    return m_enumerable.GrabChunk(m_localList, requestedChunkSize, ref m_currentChunkSize.Value);
                }

                /// <summary>
                /// Returns whether or not the shared reader has already read the last
                /// element of the source data
                /// </summary>
                /// <remarks>
                /// We cannot call m_sharedReader.MoveNext(), to see if it hits the last element
                /// or not, because we can't undo MoveNext(). Thus we need to maintain a shared
                /// boolean value m_hasNoElementsLeft across all partitions
                /// </remarks>
                protected override bool HasNoElementsLeft
                {
                    get { return m_hasNoElementsLeft.Value; }
                    set
                    {
                        //we only set it from false to true once
                        //we should never set it back in any circumstances
                        Debug.Assert(value);
                        Debug.Assert(!m_hasNoElementsLeft.Value);
                        m_hasNoElementsLeft.Value = true;
                    }
                }

                public override KeyValuePair<long, TSource> Current
                {
                    get
                    {
                        //verify that MoveNext is at least called once before Current is called
                        if (m_currentChunkSize == null)
                        {
                            throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
                        }
                        Debug.Assert(m_localList != null);
                        Debug.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
                        return (m_localList[m_localOffset.Value]);
                    }
                }

                public override void Dispose()
                {
                    // If this is static partitioning, ie. m_activePartitionCount != null, since the current partition
                    // is disposed, we decrement the number of active partitions for the shared reader.
                    if (m_activePartitionCount != null && Interlocked.Decrement(ref m_activePartitionCount.Value) == 0)
                    {
                        // If the number of active partitions becomes 0, we need to dispose the shared
                        // reader we created in the m_enumerable object.
                        m_enumerable.Dispose();
                    }
                    // If this is dynamic partitioning, ie. m_activePartitionCount == null, then m_enumerable needs to
                    // be disposed explicitly by the user, and we do not need to do anything here
                }
            }
            #endregion
        }
        #endregion

        #region Dynamic Partitioner for source data of IndexRange types (IList<> and Array<>)
        /// <summary>
        /// Dynamic load-balance partitioner. This class is abstract and to be derived from by
        /// the customized partitioner classes for IList, Array, and IEnumerable
        /// </summary>
        /// <typeparam name="TSource">Type of the elements in the source data</typeparam>
        /// <typeparam name="TCollection">Type of the source data collection</typeparam>
        private abstract class DynamicPartitionerForIndexRange_Abstract<TSource, TCollection> : OrderablePartitioner<TSource>
        {
            // TCollection can be: IList<TSource>, TSource[] and IEnumerable<TSource>
            // Derived classes specify TCollection, and implement the abstract method GetOrderableDynamicPartitions_Factory accordingly
            TCollection m_data;

            /// <summary>
            /// Constructs a new orderable partitioner
            /// </summary>
            /// <param name="data">source data collection</param>
            protected DynamicPartitionerForIndexRange_Abstract(TCollection data)
                : base(true, false, true)
            {
                m_data = data;
            }

            /// <summary>
            /// Partition the source data and create an enumerable over the resulting partitions.
            /// </summary>
            /// <param name="data">the source data collection</param>
            /// <returns>an enumerable of partitions of <paramref name="data"/></returns>
            protected abstract IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(TCollection data);

            /// <summary>
            /// Overrides OrderablePartitioner.GetOrderablePartitions.
            /// Partitions the underlying collection into the given number of orderable partitions.
            /// </summary>
            /// <param name="partitionCount">number of partitions requested</param>
            /// <returns>A list containing <paramref name="partitionCount"/> enumerators.</returns>
            public override IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount)
            {
                if (partitionCount <= 0)
                {
                    throw new ArgumentOutOfRangeException(nameof(partitionCount));
                }
                IEnumerator<KeyValuePair<long, TSource>>[] partitions
                    = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount];

                // all partitions share the same dynamic-partition enumerable (and hence the same shared index)
                IEnumerable<KeyValuePair<long, TSource>> partitionEnumerable = GetOrderableDynamicPartitions_Factory(m_data);
                for (int i = 0; i < partitionCount; i++)
                {
                    partitions[i] = partitionEnumerable.GetEnumerator();
                }
                return partitions;
            }

            /// <summary>
            /// Overrides OrderablePartitioner.GetOrderableDynamicPartitions
            /// </summary>
            /// <returns>a enumerable collection of orderable partitions</returns>
            public override IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions()
            {
                return GetOrderableDynamicPartitions_Factory(m_data);
            }

            /// <summary>
            /// Whether additional partitions can be created dynamically.
            /// </summary>
            public override bool SupportsDynamicPartitions
            {
                get { return true; }
            }
        }

        /// <summary>
        /// Defines dynamic partition for source data of IList and Array.
        /// This class inherits DynamicPartitionEnumerator_Abstract
        ///   - implements GrabNextChunk, HasNoElementsLeft, and Dispose methods for IList and Array
        ///   - Current property still remains abstract, implementation is different for IList and Array
        ///   - introduces another abstract method SourceCount, which returns the number of elements in
        ///     the source data. Implementation differs for IList and Array
        /// </summary>
        /// <typeparam name="TSource">Type of the elements in the data source</typeparam>
        /// <typeparam name="TSourceReader">Type of the reader on the source data</typeparam>
        private abstract class DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, TSourceReader> : DynamicPartitionEnumerator_Abstract<TSource, TSourceReader>
        {
            //fields
            protected int m_startIndex; //initially zero

            //constructor
            protected DynamicPartitionEnumeratorForIndexRange_Abstract(TSourceReader sharedReader, SharedLong sharedIndex)
                : base(sharedReader, sharedIndex)
            {
            }

            //abstract methods
            //the Current property is still abstract, and will be implemented by derived classes
            //we add another abstract method SourceCount to get the number of elements from the source reader

            /// <summary>
            /// Get the number of elements from the source reader.
            /// It calls IList.Count or Array.Length
            /// </summary>
            protected abstract int SourceCount { get; }

            //overriding methods

            /// <summary>
            /// Reserves a contiguous range of elements from source data
            /// </summary>
            /// <param name="requestedChunkSize">specified number of elements requested</param>
            /// <returns>
            /// true if we successfully reserved at least one element (up to #=requestedChunkSize)
            /// false if all elements in the source collection have been reserved.
            /// </returns>
            protected override bool GrabNextChunk(int requestedChunkSize)
            {
                Debug.Assert(requestedChunkSize > 0);

                while (!HasNoElementsLeft)
                {
                    Debug.Assert(m_sharedIndex != null);
                    // use the new Volatile.Read method because it is cheaper than Interlocked.Read on AMD64 architecture
                    long oldSharedIndex = Volatile.Read(ref m_sharedIndex.Value);

                    if (HasNoElementsLeft)
                    {
                        //HasNoElementsLeft situation changed from false to true immediately
                        //and oldSharedIndex becomes stale
                        return false;
                    }

                    //there won't be overflow, because the index of IList/array is int, and we
                    //have casted it to long.
                    long newSharedIndex = Math.Min(SourceCount - 1, oldSharedIndex + requestedChunkSize);

                    //the following CAS, if successful, reserves a chunk of elements [oldSharedIndex+1, newSharedIndex]
                    //inclusive in the source collection
                    if (Interlocked.CompareExchange(ref m_sharedIndex.Value, newSharedIndex, oldSharedIndex)
                        == oldSharedIndex)
                    {
                        //set up local indexes.
                        //m_currentChunkSize is always set to requestedChunkSize when source data had
                        //enough elements of what we requested
                        m_currentChunkSize.Value = (int)(newSharedIndex - oldSharedIndex);
                        m_localOffset.Value = -1;
                        m_startIndex = (int)(oldSharedIndex + 1);
                        return true;
                    }
                    // CAS lost the race with another partition: re-read the shared index and retry
                }

                //didn't get any element, return false;
                return false;
            }

            /// <summary>
            /// Returns whether or not the shared reader has already read the last
            /// element of the source data
            /// </summary>
            protected override bool HasNoElementsLeft
            {
                get
                {
                    Debug.Assert(m_sharedIndex != null);
                    // use the new Volatile.Read method because it is cheaper than Interlocked.Read on AMD64 architecture
                    return Volatile.Read(ref m_sharedIndex.Value) >= SourceCount - 1;
                }
                set
                {
                    // the shared index alone determines depletion for indexable sources; the setter must never be used
                    Debug.Assert(false);
                }
            }

            /// <summary>
            /// For source data type IList and Array, the type of the shared reader is just the data itself.
            /// We don't do anything in Dispose method for IList and Array.
            /// </summary>
            public override void Dispose()
            { }
        }

        /// <summary>
        /// Inherits from DynamicPartitioners
        /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
        /// of EnumerableOfPartitionsForIList defined internally
        /// </summary>
        /// <typeparam name="TSource">Type of elements in the source data</typeparam>
        private class DynamicPartitionerForIList<TSource> : DynamicPartitionerForIndexRange_Abstract<TSource, IList<TSource>>
        {
            //constructor
            internal DynamicPartitionerForIList(IList<TSource> source)
                : base(source)
            { }

            //override methods
            protected override IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(IList<TSource> m_data)
            {
                //m_data itself serves as shared reader
                return new InternalPartitionEnumerable(m_data);
            }

            /// <summary>
            /// Inherits from PartitionList_Abstract
            /// Provides customized implementation for source data of IList
            /// </summary>
            private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>>
            {
                //reader through which we access the source data
                private readonly IList<TSource> m_sharedReader;
                private SharedLong m_sharedIndex;

                internal InternalPartitionEnumerable(IList<TSource> sharedReader)
                {
                    m_sharedReader = sharedReader;
                    m_sharedIndex = new SharedLong(-1);
                }

                public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
                {
                    return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex);
                }

                IEnumerator IEnumerable.GetEnumerator()
                {
                    return ((InternalPartitionEnumerable)this).GetEnumerator();
                }
            }

            /// <summary>
            /// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract
            /// Provides customized implementation of SourceCount property and Current property for IList
            /// </summary>
            private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, IList<TSource>>
            {
                //constructor
                internal InternalPartitionEnumerator(IList<TSource> sharedReader, SharedLong sharedIndex)
                    : base(sharedReader, sharedIndex)
                { }

                //overriding methods
                protected override int SourceCount
                {
                    get { return m_sharedReader.Count; }
                }

                /// <summary>
                /// return a KeyValuePair of the current element and its key
                /// </summary>
                public override KeyValuePair<long, TSource> Current
                {
                    get
                    {
                        //verify that MoveNext is at least called once before Current is called
                        if (m_currentChunkSize == null)
                        {
                            throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
                        }
                        Debug.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
                        return new KeyValuePair<long, TSource>(m_startIndex + m_localOffset.Value,
                            m_sharedReader[m_startIndex + m_localOffset.Value]);
                    }
                }
            }
        }

        /// <summary>
        /// Inherits from DynamicPartitioners
        /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
        /// of EnumerableOfPartitionsForArray defined internally
        /// </summary>
        /// <typeparam name="TSource">Type of elements in the source data</typeparam>
        private class DynamicPartitionerForArray<TSource> : DynamicPartitionerForIndexRange_Abstract<TSource, TSource[]>
        {
            //constructor
            internal DynamicPartitionerForArray(TSource[] source)
                : base(source)
            { }

            //override methods
            protected override IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(TSource[] m_data)
            {
                return new InternalPartitionEnumerable(m_data);
            }

            /// <summary>
            /// Inherits from PartitionList_Abstract
            /// Provides customized implementation for source data of Array
            /// </summary>
            private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>>
            {
                //reader through which we access the source data
                private readonly TSource[] m_sharedReader;
                private SharedLong m_sharedIndex;

                internal InternalPartitionEnumerable(TSource[] sharedReader)
                {
                    m_sharedReader = sharedReader;
                    m_sharedIndex = new SharedLong(-1);
                }

                IEnumerator IEnumerable.GetEnumerator()
                {
                    return ((InternalPartitionEnumerable)this).GetEnumerator();
                }

                public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
                {
                    return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex);
                }
            }

            /// <summary>
            /// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract
            /// Provides customized implementation of SourceCount property and Current property for Array
            /// </summary>
            private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, TSource[]>
            {
                //constructor
                internal InternalPartitionEnumerator(TSource[] sharedReader, SharedLong sharedIndex)
                    : base(sharedReader, sharedIndex)
                { }

                //overriding methods
                protected override int SourceCount
                {
                    get { return m_sharedReader.Length; }
                }

                public override KeyValuePair<long, TSource> Current
                {
                    get
                    {
                        //verify that MoveNext is at least called once before Current is called
                        if (m_currentChunkSize == null)
                        {
                            throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
                        }
                        Debug.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
                        return new KeyValuePair<long, TSource>(m_startIndex + m_localOffset.Value,
                            m_sharedReader[m_startIndex + m_localOffset.Value]);
                    }
                }
            }
        }
        #endregion

        #region Static partitioning for IList and Array, abstract classes
        /// <summary>
        /// Static partitioning over IList.
        /// - dynamic and load-balance
        /// - Keys are ordered within each partition
        /// - Keys are ordered across partitions
        /// - Keys are normalized
        /// - Number of partitions is fixed once specified, and the elements of the source data are
        /// distributed to each partition as evenly as possible.
        /// </summary>
        /// <typeparam name="TSource">type of the elements</typeparam>
        /// <typeparam name="TCollection">Type of the source data collection</typeparam>
        private abstract class StaticIndexRangePartitioner<TSource, TCollection> : OrderablePartitioner<TSource>
        {
            protected StaticIndexRangePartitioner()
                : base(true, true, true)
            { }

            /// <summary>
            /// Abstract method to return the number of elements in the source data
            /// </summary>
            protected abstract int SourceCount { get; }

            /// <summary>
            /// Abstract method to create a partition that covers a range over source data,
            /// starting from "startIndex", ending at "endIndex"
            /// </summary>
            /// <param name="startIndex">start index of the current partition on the source data</param>
            /// <param name="endIndex">end index of the current partition on the source data</param>
            /// <returns>a partition enumerator over the specified range</returns>
            // The partitioning algorithm is implemented in GetOrderablePartitions method
            // This method delegates according to source data type IList/Array
            protected abstract IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex);

            /// <summary>
            /// Overrides OrderablePartitioner.GetOrderablePartitions
            /// Return a list of partitions, each of which enumerate a fixed part of the source data
            /// The elements of the source data are distributed to each partition as evenly as possible.
            /// Specifically, if the total number of elements is N, and number of partitions is x, and N = a*x +b,
            /// where a is the quotient, and b is the remainder. Then the first b partitions each has a + 1 elements,
            /// and the last x-b partitions each has a elements.
            /// For example, if N=10, x =3, then
            ///    partition 0 ranges [0,3],
            ///    partition 1 ranges [4,6],
            ///    partition 2 ranges [7,9].
            /// This also takes care of the situation of (x&gt;N), the last x-N partitions are empty enumerators.
            /// An empty enumerator is indicated by
            ///      (m_startIndex == list.Count &amp;&amp; m_endIndex == list.Count -1)
            /// </summary>
            /// <param name="partitionCount">specified number of partitions</param>
            /// <returns>a list of partitions</returns>
            public override IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount)
            {
                if (partitionCount <= 0)
                {
                    throw new ArgumentOutOfRangeException(nameof(partitionCount));
                }

                int quotient, remainder;
                quotient = Math.DivRem(SourceCount, partitionCount, out remainder);

                IEnumerator<KeyValuePair<long, TSource>>[] partitions = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount];
                int lastEndIndex = -1;
                for (int i = 0; i < partitionCount; i++)
                {
                    int startIndex = lastEndIndex + 1;

                    if (i < remainder)
                    {
                        // the first "remainder" partitions get one extra element each
                        lastEndIndex = startIndex + quotient;
                    }
                    else
                    {
                        lastEndIndex = startIndex + quotient - 1;
                    }
                    partitions[i] = CreatePartition(startIndex, lastEndIndex);
                }
                return partitions;
            }
        }

        /// <summary>
        /// Static Partition for IList/Array.
        /// This class implements all methods required by IEnumerator interface, except for the Current property.
        /// Current Property is different for IList and Array. Arrays calls 'ldelem' instructions for faster element
        /// retrieval.
        /// </summary>
        //We assume the source collection is not being updated concurrently. Otherwise it will break the
        //static partitioning, since each partition operates on the source collection directly, it does
        //not have a local cache of the elements assigned to them.
        private abstract class StaticIndexRangePartition<TSource> : IEnumerator<KeyValuePair<long, TSource>>
        {
            //the start and end position in the source collection for the current partition
            //the partition is empty if and only if
            // (m_startIndex == m_data.Count && m_endIndex == m_data.Count-1)
            protected readonly int m_startIndex;
            protected readonly int m_endIndex;

            //the current index of the current partition while enumerating on the source collection
            protected volatile int m_offset;

            /// <summary>
            /// Constructs an instance of StaticIndexRangePartition
            /// </summary>
            /// <param name="startIndex">the start index in the source collection for the current partition </param>
            /// <param name="endIndex">the end index in the source collection for the current partition</param>
            protected StaticIndexRangePartition(int startIndex, int endIndex)
            {
                m_startIndex = startIndex;
                m_endIndex = endIndex;
                m_offset = startIndex - 1;
            }

            /// <summary>
            /// Current Property is different for IList and Array. Arrays calls 'ldelem' instructions for faster
            /// element retrieval.
            /// </summary>
            public abstract KeyValuePair<long, TSource> Current { get; }

            /// <summary>
            /// We don't dispose the source for IList and array
            /// </summary>
            public void Dispose()
            { }

            public void Reset()
            {
                throw new NotSupportedException();
            }

            /// <summary>
            /// Moves to the next item
            /// Before the first MoveNext is called: m_offset == m_startIndex-1;
            /// </summary>
            /// <returns>true if successful, false if there is no item left</returns>
            public bool MoveNext()
            {
                if (m_offset < m_endIndex)
                {
                    m_offset++;
                    return true;
                }
                else
                {
                    //After we have enumerated over all elements, we set m_offset to m_endIndex +1.
                    //The reason we do this is, for an empty enumerator, we need to tell the Current
                    //property whether MoveNext has been called or not.
                    //For an empty enumerator, it starts with (m_offset == m_startIndex-1 == m_endIndex),
                    //and we don't set a new value to m_offset, then the above condition will always be
                    //true, and the Current property will mistakenly assume MoveNext is never called.
                    m_offset = m_endIndex + 1;
                    return false;
                }
            }

            Object IEnumerator.Current
            {
                get
                {
                    return ((StaticIndexRangePartition<TSource>)this).Current;
                }
            }
        }
        #endregion

        #region Static partitioning for IList
        /// <summary>
        /// Inherits from StaticIndexRangePartitioner
        /// Provides customized implementation of SourceCount and CreatePartition
        /// </summary>
        /// <typeparam name="TSource"></typeparam>
        private class StaticIndexRangePartitionerForIList<TSource> : StaticIndexRangePartitioner<TSource, IList<TSource>>
        {
            IList<TSource> m_list;
            internal StaticIndexRangePartitionerForIList(IList<TSource> list)
                : base()
            {
                Debug.Assert(list != null);
                m_list = list;
            }
            protected override int SourceCount
            {
                get { return m_list.Count; }
            }
            protected override IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex)
            {
                return new StaticIndexRangePartitionForIList<TSource>(m_list, startIndex, endIndex);
            }
        }

        /// <summary>
        /// Inherits from StaticIndexRangePartition
        /// Provides customized implementation of Current property
        /// </summary>
        /// <typeparam name="TSource"></typeparam>
        private class StaticIndexRangePartitionForIList<TSource> : StaticIndexRangePartition<TSource>
        {
            //the source collection shared by all partitions
            private volatile IList<TSource> m_list;

            internal StaticIndexRangePartitionForIList(IList<TSource> list, int startIndex, int endIndex)
                : base(startIndex, endIndex)
            {
                Debug.Assert(startIndex >= 0 && endIndex <= list.Count - 1);
                m_list = list;
            }

            public override KeyValuePair<long, TSource> Current
            {
                get
                {
                    //verify that MoveNext is at least called once before Current is called
                    if (m_offset < m_startIndex)
                    {
                        throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
                    }
                    Debug.Assert(m_offset >= m_startIndex && m_offset <= m_endIndex);
                    return (new KeyValuePair<long, TSource>(m_offset, m_list[m_offset]));
                }
            }
        }
        #endregion

        #region static partitioning for Arrays
        /// <summary>
        /// Inherits from StaticIndexRangePartitioner
        /// Provides customized implementation of SourceCount and CreatePartition for Array
        /// </summary>
        private class StaticIndexRangePartitionerForArray<TSource> : StaticIndexRangePartitioner<TSource, TSource[]>
        {
            TSource[] m_array;
            internal StaticIndexRangePartitionerForArray(TSource[] array)
                : base()
            {
                Debug.Assert(array != null);
                m_array = array;
            }
            protected override int SourceCount
            {
                get { return m_array.Length; }
            }
            protected override IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex)
            {
                return new StaticIndexRangePartitionForArray<TSource>(m_array, startIndex, endIndex);
            }
        }

        /// <summary>
        /// Inherits from StaticIndexRangePartitioner
        /// Provides customized implementation of SourceCount and CreatePartition
        /// </summary>
        private class StaticIndexRangePartitionForArray<TSource> : StaticIndexRangePartition<TSource>
        {
            //the source collection shared by all partitions
            private volatile TSource[] m_array;

            internal StaticIndexRangePartitionForArray(TSource[] array, int startIndex, int endIndex)
                : base(startIndex, endIndex)
            {
                Debug.Assert(startIndex >= 0 && endIndex <= array.Length - 1);
                m_array = array;
            }

            public override KeyValuePair<long, TSource> Current
            {
                get
                {
                    //verify that MoveNext is at least called once before Current is called
                    if (m_offset < m_startIndex)
                    {
                        throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
                    }
                    Debug.Assert(m_offset >= m_startIndex && m_offset <= m_endIndex);
                    return (new KeyValuePair<long, TSource>(m_offset, m_array[m_offset]));
                }
            }
        }
        #endregion

        #region Utility functions
        /// <summary>
        /// A very simple primitive that allows us to share a value across multiple threads.
        /// </summary>
        /// <typeparam name="TSource"></typeparam>
        private class SharedInt
        {
            internal volatile int Value;

            internal SharedInt(int value)
            {
                this.Value = value;
            }
        }

        /// <summary>
        /// A very simple primitive that allows us to share a value across multiple threads.
        /// </summary>
        private class SharedBool
        {
            internal volatile bool Value;

            internal SharedBool(bool value)
            {
                this.Value = value;
            }
        }

        /// <summary>
        /// A very simple primitive that allows us to share a value across multiple threads.
        /// </summary>
        private class SharedLong
        {
            // not volatile: readers use Volatile.Read / Interlocked on this field instead
            internal long Value;
            internal SharedLong(long value)
            {
                this.Value = value;
            }
        }

        //--------------------
        // The following part calculates the default chunk size. It is copied from System.Linq.Parallel.Scheduling,
        // because mscorlib.dll cannot access System.Linq.Parallel.Scheduling
        //--------------------

        // The number of bytes we want "chunks" to be, when partitioning, etc. We choose 4 cache
        // lines worth, assuming 128b cache line.  Most (popular) architectures use 64b cache lines,
        // but choosing 128b works for 64b too whereas a multiple of 64b isn't necessarily sufficient
        // for 128b cache systems.  So 128b it is.
        private const int DEFAULT_BYTES_PER_CHUNK = 128 * 4;

        private static int GetDefaultChunkSize<TSource>()
        {
            int chunkSize;

            if (typeof(TSource).IsValueType)
            {
                // @TODO: It would be nice to use StructLayoutAttribute to compute a chunk based on size,
                // but the number of bytes per element cannot be computed for a value type in general,
                // so we pick a fixed element count instead.
                chunkSize = 128;
            }
            else
            {
                Debug.Assert((DEFAULT_BYTES_PER_CHUNK % IntPtr.Size) == 0, "bytes per chunk should be a multiple of pointer size");
                chunkSize = (DEFAULT_BYTES_PER_CHUNK / IntPtr.Size);
            }
            return chunkSize;
        }
        #endregion
    }
}