summaryrefslogtreecommitdiff
path: root/src/mscorlib/src/System/Collections/Concurrent/PartitionerStatic.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/mscorlib/src/System/Collections/Concurrent/PartitionerStatic.cs')
-rw-r--r--src/mscorlib/src/System/Collections/Concurrent/PartitionerStatic.cs1733
1 files changed, 1733 insertions, 0 deletions
diff --git a/src/mscorlib/src/System/Collections/Concurrent/PartitionerStatic.cs b/src/mscorlib/src/System/Collections/Concurrent/PartitionerStatic.cs
new file mode 100644
index 0000000000..2169c6dee7
--- /dev/null
+++ b/src/mscorlib/src/System/Collections/Concurrent/PartitionerStatic.cs
@@ -0,0 +1,1733 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+#pragma warning disable 0420
+
+// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
+//
+//
+//
+// A class of default partitioners for Partitioner<TSource>
+//
+// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
+
+using System.Collections.Generic;
+using System.Security.Permissions;
+using System.Threading;
+using System.Diagnostics.Contracts;
+using System.Runtime.InteropServices;
+
+namespace System.Collections.Concurrent
+{
    /// <summary>
    /// Out-of-the-box partitioners are created with a set of default behaviors.
    /// For example, by default, some form of buffering and chunking will be employed to achieve
    /// optimal performance in the common scenario where an IEnumerable<> implementation is fast and
    /// non-blocking. These behaviors can be overridden via this enumeration.
    /// </summary>
    [Flags]
#if !FEATURE_CORECLR
    [Serializable]
#endif
    public enum EnumerablePartitionerOptions
    {
        /// <summary>
        /// Use the default behavior (i.e., use buffering to achieve optimal performance)
        /// </summary>
        None = 0x0,

        /// <summary>
        /// Creates a partitioner that will take items from the source enumerable one at a time
        /// and will not use intermediate storage that can be accessed more efficiently by multiple threads.
        /// This option provides support for low latency (items will be processed as soon as they are available from
        /// the source) and partial support for dependencies between items (a thread cannot deadlock waiting for an item
        /// that it, itself, is responsible for processing).
        /// </summary>
        // NOTE: this is a [Flags] enum; any future option must use a distinct power-of-two value
        // so that the validity mask checks (options & ~AllKnownOptions) keep working.
        NoBuffering = 0x1
    }
+
+ // The static class Partitioners implements 3 default partitioning strategies:
+ // 1. dynamic load balance partitioning for indexable data source (IList and arrays)
+ // 2. static partitioning for indexable data source (IList and arrays)
+ // 3. dynamic load balance partitioning for enumerables. Enumerables have indexes, which are the natural order
+ // of elements, but enumerators are not indexable
+ // - data source of type IList/arrays have both dynamic and static partitioning, as 1 and 3.
+ // We assume that the source data of IList/Array is not changing concurrently.
+ // - data source of type IEnumerable can only be partitioned dynamically (load-balance)
+ // - Dynamic partitioning methods 1 and 3 are same, both being dynamic and load-balance. But the
+ // implementation is different for data source of IList/Array vs. IEnumerable:
+ // * When the source collection is IList/Arrays, we use Interlocked on the shared index;
+ // * When the source collection is IEnumerable, we use Monitor to wrap around the access to the source
+ // enumerator.
+
+ /// <summary>
+ /// Provides common partitioning strategies for arrays, lists, and enumerables.
+ /// </summary>
+ /// <remarks>
+ /// <para>
+ /// The static methods on <see cref="Partitioner"/> are all thread-safe and may be used concurrently
+ /// from multiple threads. However, while a created partitioner is in use, the underlying data source
+ /// should not be modified, whether from the same thread that's using a partitioner or from a separate
+ /// thread.
+ /// </para>
+ /// </remarks>
+ [HostProtection(Synchronization = true, ExternalThreading = true)]
+ public static class Partitioner
+ {
+ /// <summary>
+ /// Creates an orderable partitioner from an <see cref="System.Collections.Generic.IList{T}"/>
+ /// instance.
+ /// </summary>
+ /// <typeparam name="TSource">Type of the elements in source list.</typeparam>
+ /// <param name="list">The list to be partitioned.</param>
+ /// <param name="loadBalance">
+ /// A Boolean value that indicates whether the created partitioner should dynamically
+ /// load balance between partitions rather than statically partition.
+ /// </param>
+ /// <returns>
+ /// An orderable partitioner based on the input list.
+ /// </returns>
+ public static OrderablePartitioner<TSource> Create<TSource>(IList<TSource> list, bool loadBalance)
+ {
+ if (list == null)
+ {
+ throw new ArgumentNullException("list");
+ }
+ if (loadBalance)
+ {
+ return (new DynamicPartitionerForIList<TSource>(list));
+ }
+ else
+ {
+ return (new StaticIndexRangePartitionerForIList<TSource>(list));
+ }
+ }
+
+ /// <summary>
+ /// Creates an orderable partitioner from a <see cref="System.Array"/> instance.
+ /// </summary>
+ /// <typeparam name="TSource">Type of the elements in source array.</typeparam>
+ /// <param name="array">The array to be partitioned.</param>
+ /// <param name="loadBalance">
+ /// A Boolean value that indicates whether the created partitioner should dynamically load balance
+ /// between partitions rather than statically partition.
+ /// </param>
+ /// <returns>
+ /// An orderable partitioner based on the input array.
+ /// </returns>
+ public static OrderablePartitioner<TSource> Create<TSource>(TSource[] array, bool loadBalance)
+ {
+ // This implementation uses 'ldelem' instructions for element retrieval, rather than using a
+ // method call.
+
+ if (array == null)
+ {
+ throw new ArgumentNullException("array");
+ }
+ if (loadBalance)
+ {
+ return (new DynamicPartitionerForArray<TSource>(array));
+ }
+ else
+ {
+ return (new StaticIndexRangePartitionerForArray<TSource>(array));
+ }
+ }
+
+ /// <summary>
+ /// Creates an orderable partitioner from a <see cref="System.Collections.Generic.IEnumerable{TSource}"/> instance.
+ /// </summary>
+ /// <typeparam name="TSource">Type of the elements in source enumerable.</typeparam>
+ /// <param name="source">The enumerable to be partitioned.</param>
+ /// <returns>
+ /// An orderable partitioner based on the input array.
+ /// </returns>
+ /// <remarks>
+ /// The ordering used in the created partitioner is determined by the natural order of the elements
+ /// as retrieved from the source enumerable.
+ /// </remarks>
+ public static OrderablePartitioner<TSource> Create<TSource>(IEnumerable<TSource> source)
+ {
+ return Create<TSource>(source, EnumerablePartitionerOptions.None);
+ }
+
+ /// <summary>
+ /// Creates an orderable partitioner from a <see cref="System.Collections.Generic.IEnumerable{TSource}"/> instance.
+ /// </summary>
+ /// <typeparam name="TSource">Type of the elements in source enumerable.</typeparam>
+ /// <param name="source">The enumerable to be partitioned.</param>
+ /// <param name="partitionerOptions">Options to control the buffering behavior of the partitioner.</param>
+ /// <exception cref="T:System.ArgumentOutOfRangeException">
+ /// The <paramref name="partitionerOptions"/> argument specifies an invalid value for <see
+ /// cref="T:System.Collections.Concurrent.EnumerablePartitionerOptions"/>.
+ /// </exception>
+ /// <returns>
+ /// An orderable partitioner based on the input array.
+ /// </returns>
+ /// <remarks>
+ /// The ordering used in the created partitioner is determined by the natural order of the elements
+ /// as retrieved from the source enumerable.
+ /// </remarks>
+ public static OrderablePartitioner<TSource> Create<TSource>(IEnumerable<TSource> source, EnumerablePartitionerOptions partitionerOptions)
+ {
+ if (source == null)
+ {
+ throw new ArgumentNullException("source");
+ }
+
+ if ((partitionerOptions & (~EnumerablePartitionerOptions.NoBuffering)) != 0)
+ throw new ArgumentOutOfRangeException("partitionerOptions");
+
+ return (new DynamicPartitionerForIEnumerable<TSource>(source, partitionerOptions));
+ }
+
+ /// <summary>Creates a partitioner that chunks the user-specified range.</summary>
+ /// <param name="fromInclusive">The lower, inclusive bound of the range.</param>
+ /// <param name="toExclusive">The upper, exclusive bound of the range.</param>
+ /// <returns>A partitioner.</returns>
+ /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is
+ /// less than or equal to the <paramref name="fromInclusive"/> argument.</exception>
+ public static OrderablePartitioner<Tuple<long, long>> Create(long fromInclusive, long toExclusive)
+ {
+ // How many chunks do we want to divide the range into? If this is 1, then the
+ // answer is "one chunk per core". Generally, though, you'll achieve better
+ // load balancing on a busy system if you make it higher than 1.
+ int coreOversubscriptionRate = 3;
+
+ if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
+ long rangeSize = (toExclusive - fromInclusive) /
+ (PlatformHelper.ProcessorCount * coreOversubscriptionRate);
+ if (rangeSize == 0) rangeSize = 1;
+ return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time
+ }
+
+ /// <summary>Creates a partitioner that chunks the user-specified range.</summary>
+ /// <param name="fromInclusive">The lower, inclusive bound of the range.</param>
+ /// <param name="toExclusive">The upper, exclusive bound of the range.</param>
+ /// <param name="rangeSize">The size of each subrange.</param>
+ /// <returns>A partitioner.</returns>
+ /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is
+ /// less than or equal to the <paramref name="fromInclusive"/> argument.</exception>
+ /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="rangeSize"/> argument is
+ /// less than or equal to 0.</exception>
+ public static OrderablePartitioner<Tuple<long, long>> Create(long fromInclusive, long toExclusive, long rangeSize)
+ {
+ if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
+ if (rangeSize <= 0) throw new ArgumentOutOfRangeException("rangeSize");
+ return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time
+ }
+
+ // Private method to parcel out range tuples.
+ private static IEnumerable<Tuple<long, long>> CreateRanges(long fromInclusive, long toExclusive, long rangeSize)
+ {
+ // Enumerate all of the ranges
+ long from, to;
+ bool shouldQuit = false;
+
+ for (long i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize)
+ {
+ from = i;
+ try { checked { to = i + rangeSize; } }
+ catch (OverflowException)
+ {
+ to = toExclusive;
+ shouldQuit = true;
+ }
+ if (to > toExclusive) to = toExclusive;
+ yield return new Tuple<long, long>(from, to);
+ }
+ }
+
+ /// <summary>Creates a partitioner that chunks the user-specified range.</summary>
+ /// <param name="fromInclusive">The lower, inclusive bound of the range.</param>
+ /// <param name="toExclusive">The upper, exclusive bound of the range.</param>
+ /// <returns>A partitioner.</returns>
+ /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is
+ /// less than or equal to the <paramref name="fromInclusive"/> argument.</exception>
+ public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive)
+ {
+ // How many chunks do we want to divide the range into? If this is 1, then the
+ // answer is "one chunk per core". Generally, though, you'll achieve better
+ // load balancing on a busy system if you make it higher than 1.
+ int coreOversubscriptionRate = 3;
+
+ if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
+ int rangeSize = (toExclusive - fromInclusive) /
+ (PlatformHelper.ProcessorCount * coreOversubscriptionRate);
+ if (rangeSize == 0) rangeSize = 1;
+ return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time
+ }
+
+ /// <summary>Creates a partitioner that chunks the user-specified range.</summary>
+ /// <param name="fromInclusive">The lower, inclusive bound of the range.</param>
+ /// <param name="toExclusive">The upper, exclusive bound of the range.</param>
+ /// <param name="rangeSize">The size of each subrange.</param>
+ /// <returns>A partitioner.</returns>
+ /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is
+ /// less than or equal to the <paramref name="fromInclusive"/> argument.</exception>
+ /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="rangeSize"/> argument is
+ /// less than or equal to 0.</exception>
+ public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive, int rangeSize)
+ {
+ if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
+ if (rangeSize <= 0) throw new ArgumentOutOfRangeException("rangeSize");
+ return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time
+ }
+
+ // Private method to parcel out range tuples.
+ private static IEnumerable<Tuple<int, int>> CreateRanges(int fromInclusive, int toExclusive, int rangeSize)
+ {
+ // Enumerate all of the ranges
+ int from, to;
+ bool shouldQuit = false;
+
+ for (int i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize)
+ {
+ from = i;
+ try { checked { to = i + rangeSize; } }
+ catch (OverflowException)
+ {
+ to = toExclusive;
+ shouldQuit = true;
+ }
+ if (to > toExclusive) to = toExclusive;
+ yield return new Tuple<int, int>(from, to);
+ }
+ }
+
+ #region DynamicPartitionEnumerator_Abstract class
+ /// <summary>
+ /// DynamicPartitionEnumerator_Abstract defines the enumerator for each partition for the dynamic load-balance
+ /// partitioning algorithm.
+ /// - Partition is an enumerator of KeyValuePairs, each corresponding to an item in the data source:
+ /// the key is the index in the source collection; the value is the item itself.
+ /// - a set of such partitions share a reader over data source. The type of the reader is specified by
+ /// TSourceReader.
+ /// - each partition requests a contiguous chunk of elements at a time from the source data. The chunk
+ /// size is initially 1, and doubles every time until it reaches the maximum chunk size.
+ /// The implementation for GrabNextChunk() method has two versions: one for data source of IndexRange
+ /// types (IList and the array), one for data source of IEnumerable.
+ /// - The method "Reset" is not supported for any partitioning algorithm.
+ /// - The implementation for MoveNext() method is the same for all dynamic partitioners, so we provide it
+ /// in this abstract class.
+ /// </summary>
+ /// <typeparam name="TSource">Type of the elements in the data source</typeparam>
+ /// <typeparam name="TSourceReader">Type of the reader on the data source</typeparam>
+ //TSourceReader is
+ // - IList<TSource>, when source data is IList<TSource>, the shared reader is source data itself
+ // - TSource[], when source data is TSource[], the shared reader is source data itself
+ // - IEnumerator<TSource>, when source data is IEnumerable<TSource>, and the shared reader is an
+ // enumerator of the source data
        private abstract class DynamicPartitionEnumerator_Abstract<TSource, TSourceReader> : IEnumerator<KeyValuePair<long, TSource>>
        {
            //----------------- common fields and constructor for all dynamic partitioners -----------------
            //--- shared by all derived classes with source data type: IList, Array, and IEnumerator
            protected readonly TSourceReader m_sharedReader;

            // Per-closed-generic-type default: computed once per TSource instantiation.
            protected static int s_defaultMaxChunkSize = GetDefaultChunkSize<TSource>();

            //deferred allocating in MoveNext() with initial value 0, to avoid false sharing
            //we also use the fact that: (m_currentChunkSize==null) means MoveNext is never called on this enumerator
            protected SharedInt m_currentChunkSize;

            //deferring allocation in MoveNext() with initial value -1, to avoid false sharing
            //m_localOffset is the 0-based position within the currently held chunk
            protected SharedInt m_localOffset;

            private const int CHUNK_DOUBLING_RATE = 3; // Double the chunk size every this many grabs
            private int m_doublingCountdown; // Number of grabs remaining until chunk size doubles
            protected readonly int m_maxChunkSize; // s_defaultMaxChunkSize unless single-chunking is requested by the caller

            // m_sharedIndex is shared by this set of partitions, and particularly when m_sharedReader is IEnumerable
            // it serves as tracking of the natural order of elements in m_sharedReader
            // the value of this field is passed in from outside (already initialized) by the constructor,
            protected readonly SharedLong m_sharedIndex;

            /// <summary>
            /// Constructs a partition enumerator with default (multi-element) chunking.
            /// </summary>
            protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, SharedLong sharedIndex)
                : this(sharedReader, sharedIndex, false)
            {
            }

            /// <summary>
            /// Constructs a partition enumerator.
            /// </summary>
            /// <param name="sharedReader">reader shared by all partitions of this set</param>
            /// <param name="sharedIndex">shared element-index counter, already initialized by the caller</param>
            /// <param name="useSingleChunking">when true, chunk size is pinned to 1 (no chunk growth)</param>
            protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, SharedLong sharedIndex, bool useSingleChunking)
            {
                m_sharedReader = sharedReader;
                m_sharedIndex = sharedIndex;
                m_maxChunkSize = useSingleChunking ? 1 : s_defaultMaxChunkSize;
            }

            // ---------------- abstract method declarations --------------

            /// <summary>
            /// Abstract method to request a contiguous chunk of elements from the source collection
            /// </summary>
            /// <param name="requestedChunkSize">specified number of elements requested</param>
            /// <returns>
            /// true if we successfully reserved at least one element (up to #=requestedChunkSize)
            /// false if all elements in the source collection have been reserved.
            /// </returns>
            //GrabNextChunk does the following:
            //  - grab # of requestedChunkSize elements from source data through shared reader,
            //  - at the time of function returns, m_currentChunkSize is updated with the number of
            //    elements actually got assigned (<=requestedChunkSize).
            //  - GrabNextChunk returns true if at least one element is assigned to this partition;
            //    false if the shared reader already hits the last element of the source data before
            //    we call GrabNextChunk
            protected abstract bool GrabNextChunk(int requestedChunkSize);

            /// <summary>
            /// Abstract property, returns whether or not the shared reader has already read the last
            /// element of the source data
            /// </summary>
            protected abstract bool HasNoElementsLeft { get; set; }

            /// <summary>
            /// Get the current element in the current partition. Property required by IEnumerator interface
            /// This property is abstract because the implementation is different depending on the type
            /// of the source data: IList, Array or IEnumerable
            /// </summary>
            public abstract KeyValuePair<long, TSource> Current { get; }

            /// <summary>
            /// Dispose is abstract, and depends on the type of the source data:
            /// - For source data type IList and Array, the type of the shared reader is just the data itself.
            ///   We don't do anything in Dispose method for IList and Array.
            /// - For source data type IEnumerable, the type of the shared reader is an enumerator we created.
            ///   Thus we need to dispose this shared reader enumerator, when there is no more active partitions
            ///   left.
            /// </summary>
            public abstract void Dispose();

            /// <summary>
            /// Reset on partitions is not supported
            /// </summary>
            public void Reset()
            {
                throw new NotSupportedException();
            }


            /// <summary>
            /// Get the current element in the current partition. Property required by IEnumerator interface
            /// (non-generic IEnumerator; boxes the KeyValuePair returned by the generic Current)
            /// </summary>
            Object IEnumerator.Current
            {
                get
                {
                    return ((DynamicPartitionEnumerator_Abstract<TSource, TSourceReader>)this).Current;
                }
            }

            /// <summary>
            /// Moves to the next element if any.
            /// Try current chunk first, if the current chunk do not have any elements left, then we
            /// attempt to grab a chunk from the source collection.
            /// </summary>
            /// <returns>
            /// true if successfully moving to the next position;
            /// false otherwise, if and only if there is no more elements left in the current chunk
            /// AND the source collection is exhausted.
            /// </returns>
            public bool MoveNext()
            {
                //perform deferred allocating of the local variables.
                //(m_localOffset == null) is the "first call" sentinel documented on the fields above.
                if (m_localOffset == null)
                {
                    Contract.Assert(m_currentChunkSize == null);
                    m_localOffset = new SharedInt(-1);
                    m_currentChunkSize = new SharedInt(0);
                    m_doublingCountdown = CHUNK_DOUBLING_RATE;
                }

                if (m_localOffset.Value < m_currentChunkSize.Value - 1)
                //attempt to grab the next element from the local chunk
                {
                    m_localOffset.Value++;
                    return true;
                }
                else
                //otherwise it means we exhausted the local chunk
                //grab a new chunk from the source enumerator
                {
                    // The second part of the || condition is necessary to handle the case when MoveNext() is called
                    // after a previous MoveNext call returned false.
                    Contract.Assert(m_localOffset.Value == m_currentChunkSize.Value - 1 || m_currentChunkSize.Value == 0);

                    //set the requested chunk size to a proper value:
                    //start at 1, repeat the current size for CHUNK_DOUBLING_RATE grabs,
                    //then double it, capped at m_maxChunkSize.
                    int requestedChunkSize;
                    if (m_currentChunkSize.Value == 0) //first time grabbing from source enumerator
                    {
                        requestedChunkSize = 1;
                    }
                    else if (m_doublingCountdown > 0)
                    {
                        requestedChunkSize = m_currentChunkSize.Value;
                    }
                    else
                    {
                        requestedChunkSize = Math.Min(m_currentChunkSize.Value * 2, m_maxChunkSize);
                        m_doublingCountdown = CHUNK_DOUBLING_RATE; // reset
                    }

                    // Decrement your doubling countdown
                    m_doublingCountdown--;

                    Contract.Assert(requestedChunkSize > 0 && requestedChunkSize <= m_maxChunkSize);
                    //GrabNextChunk will update the value of m_currentChunkSize
                    if (GrabNextChunk(requestedChunkSize))
                    {
                        Contract.Assert(m_currentChunkSize.Value <= requestedChunkSize && m_currentChunkSize.Value > 0);
                        m_localOffset.Value = 0;
                        return true;
                    }
                    else
                    {
                        return false;
                    }
                }
            }
        }
+ #endregion
+
+ #region Dynamic Partitioner for source data of IEnumerable<> type
+ /// <summary>
+ /// Inherits from DynamicPartitioners
+ /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
+ /// of EnumerableOfPartitionsForIEnumerator defined internally
+ /// </summary>
+ /// <typeparam name="TSource">Type of elements in the source data</typeparam>
+ private class DynamicPartitionerForIEnumerable<TSource> : OrderablePartitioner<TSource>
+ {
            IEnumerable<TSource> m_source;      // the data source being partitioned
            readonly bool m_useSingleChunking;  // true when NoBuffering was requested by the caller
+
+ //constructor
+ internal DynamicPartitionerForIEnumerable(IEnumerable<TSource> source, EnumerablePartitionerOptions partitionerOptions)
+ : base(true, false, true)
+ {
+ m_source = source;
+ m_useSingleChunking = ((partitionerOptions & EnumerablePartitionerOptions.NoBuffering) != 0);
+ }
+
+ /// <summary>
+ /// Overrides OrderablePartitioner.GetOrderablePartitions.
+ /// Partitions the underlying collection into the given number of orderable partitions.
+ /// </summary>
+ /// <param name="partitionCount">number of partitions requested</param>
+ /// <returns>A list containing <paramref name="partitionCount"/> enumerators.</returns>
+ override public IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount)
+ {
+ if (partitionCount <= 0)
+ {
+ throw new ArgumentOutOfRangeException("partitionCount");
+ }
+ IEnumerator<KeyValuePair<long, TSource>>[] partitions
+ = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount];
+
+ IEnumerable<KeyValuePair<long, TSource>> partitionEnumerable = new InternalPartitionEnumerable(m_source.GetEnumerator(), m_useSingleChunking, true);
+ for (int i = 0; i < partitionCount; i++)
+ {
+ partitions[i] = partitionEnumerable.GetEnumerator();
+ }
+ return partitions;
+ }
+
+ /// <summary>
+ /// Overrides OrderablePartitioner.GetOrderableDyanmicPartitions
+ /// </summary>
+ /// <returns>a enumerable collection of orderable partitions</returns>
+ override public IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions()
+ {
+ return new InternalPartitionEnumerable(m_source.GetEnumerator(), m_useSingleChunking, false);
+ }
+
            /// <summary>
            /// Whether additional partitions can be created dynamically.
            /// </summary>
            /// <remarks>Always true for this partitioner; GetOrderableDynamicPartitions is fully supported.</remarks>
            override public bool SupportsDynamicPartitions
            {
                get { return true; }
            }
+
+ #region Internal classes: InternalPartitionEnumerable, InternalPartitionEnumerator
+ /// <summary>
+ /// Provides customized implementation for source data of IEnumerable
+ /// Different from the counterpart for IList/Array, this enumerable maintains several additional fields
+ /// shared by the partitions it owns, including a boolean "m_hasNoElementsLeft", a shared lock, and a
+ /// shared count "m_activePartitionCount" used to track active partitions when they were created statically
+ /// </summary>
+ private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>>, IDisposable
+ {
                //reader through which we access the source data
                private readonly IEnumerator<TSource> m_sharedReader;
                private SharedLong m_sharedIndex;//initial value -1; tracks the natural-order index of elements

                private volatile KeyValuePair<long, TSource>[] m_FillBuffer;    // intermediate buffer to reduce locking
                private volatile int m_FillBufferSize;              // actual number of elements in m_FillBuffer. Will start
                                                                    // at m_FillBuffer.Length, and might be reduced during the last refill
                private volatile int m_FillBufferCurrentPosition;   //shared value to be accessed by Interlock.Increment only
                private volatile int m_activeCopiers;               //number of active copiers

                //fields shared by all partitions that this Enumerable owns, their allocation is deferred
                private SharedBool m_hasNoElementsLeft;     // no elements left at all.
                private SharedBool m_sourceDepleted;        // no elements left in the enumerator, but there may be elements in the Fill Buffer

                //shared synchronization lock, created by this Enumerable
                private object m_sharedLock;//deferring allocation by enumerator

                private bool m_disposed;    // once set, GetEnumerator throws ObjectDisposedException

                // If dynamic partitioning, then m_activePartitionCount == null
                // If static partitioning, then it keeps track of active partition count
                private SharedInt m_activePartitionCount;

                // records whether or not the user has requested single-chunking behavior
                private readonly bool m_useSingleChunking;
+
+ internal InternalPartitionEnumerable(IEnumerator<TSource> sharedReader, bool useSingleChunking, bool isStaticPartitioning)
+ {
+ m_sharedReader = sharedReader;
+ m_sharedIndex = new SharedLong(-1);
+ m_hasNoElementsLeft = new SharedBool(false);
+ m_sourceDepleted = new SharedBool(false);
+ m_sharedLock = new object();
+ m_useSingleChunking = useSingleChunking;
+
+ // Only allocate the fill-buffer if single-chunking is not in effect
+ if (!m_useSingleChunking)
+ {
+ // Time to allocate the fill buffer which is used to reduce the contention on the shared lock.
+ // First pick the buffer size multiplier. We use 4 for when there are more than 4 cores, and just 1 for below. This is based on empirical evidence.
+ int fillBufferMultiplier = (PlatformHelper.ProcessorCount > 4) ? 4 : 1;
+
+ // and allocate the fill buffer using these two numbers
+ m_FillBuffer = new KeyValuePair<long, TSource>[fillBufferMultiplier * Partitioner.GetDefaultChunkSize<TSource>()];
+ }
+
+ if (isStaticPartitioning)
+ {
+ // If this object is created for static partitioning (ie. via GetPartitions(int partitionCount),
+ // GetOrderablePartitions(int partitionCount)), we track the active partitions, in order to dispose
+ // this object when all the partitions have been disposed.
+ m_activePartitionCount = new SharedInt(0);
+ }
+ else
+ {
+ // Otherwise this object is created for dynamic partitioning (ie, via GetDynamicPartitions(),
+ // GetOrderableDynamicPartitions()), we do not need tracking. This object must be disposed
+ // explicitly
+ m_activePartitionCount = null;
+ }
+ }
+
+ public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
+ {
+ if (m_disposed)
+ {
+ throw new ObjectDisposedException(Environment.GetResourceString("PartitionerStatic_CanNotCallGetEnumeratorAfterSourceHasBeenDisposed"));
+ }
+ else
+ {
+ return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex,
+ m_hasNoElementsLeft, m_sharedLock, m_activePartitionCount, this, m_useSingleChunking);
+ }
+ }
+
+
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ return ((InternalPartitionEnumerable)this).GetEnumerator();
+ }
+
+
                ///////////////////
                //
                // Used by GrabChunk_Buffered()
                /// <summary>
                /// Lock-free attempt to copy up to requestedChunkSize already-buffered elements
                /// into destArray. Sets actualNumElementsGrabbed to the number copied (0 if none).
                /// Safe to call concurrently from multiple partitions: reservation is done with a
                /// single Interlocked.Add on the shared position, and m_activeCopiers guards the
                /// copy against a concurrent buffer refill.
                /// </summary>
                private void TryCopyFromFillBuffer(KeyValuePair<long, TSource>[] destArray,
                                                  int requestedChunkSize,
                                                  ref int actualNumElementsGrabbed)
                {
                    actualNumElementsGrabbed = 0;


                    // making a local defensive copy of the fill buffer reference, just in case it gets nulled out
                    // (GrabChunk_Buffered sets m_FillBuffer to null once the source is depleted)
                    KeyValuePair<long, TSource>[] fillBufferLocalRef = m_FillBuffer;
                    if (fillBufferLocalRef == null) return;

                    // first do a quick check, and give up if the current position is at the end
                    // so that we don't do an unnecessary pair of Interlocked.Increment / Decrement calls
                    if (m_FillBufferCurrentPosition >= m_FillBufferSize)
                    {
                        return; // no elements in the buffer to copy from
                    }

                    // We might have a chance to grab elements from the buffer. We will know for sure
                    // when we do the Interlocked.Add below.
                    // But first we must register as a potential copier in order to make sure
                    // the elements we're copying from don't get overwritten by another thread
                    // that starts refilling the buffer right after our Interlocked.Add.
                    Interlocked.Increment(ref m_activeCopiers);

                    // Atomically reserve the [beginPos, endPos) slice; endPos may land past
                    // m_FillBufferSize, in which case the reservation is partial or empty.
                    int endPos = Interlocked.Add(ref m_FillBufferCurrentPosition, requestedChunkSize);
                    int beginPos = endPos - requestedChunkSize;

                    if (beginPos < m_FillBufferSize)
                    {
                        // adjust index and do the actual copy
                        actualNumElementsGrabbed = (endPos < m_FillBufferSize) ? endPos : m_FillBufferSize - beginPos;
                        Array.Copy(fillBufferLocalRef, beginPos, destArray, 0, actualNumElementsGrabbed);
                    }

                    // let the record show we are no longer accessing the buffer
                    Interlocked.Decrement(ref m_activeCopiers);
                }
+
+ /// <summary>
+ /// This is the common entry point for consuming items from the source enumerable
+ /// </summary>
+ /// <returns>
+ /// true if we successfully reserved at least one element
+ /// false if all elements in the source collection have been reserved.
+ /// </returns>
+ internal bool GrabChunk(KeyValuePair<long, TSource>[] destArray, int requestedChunkSize, ref int actualNumElementsGrabbed)
+ {
+ actualNumElementsGrabbed = 0;
+
+ if (m_hasNoElementsLeft.Value)
+ {
+ return false;
+ }
+
+ if (m_useSingleChunking)
+ {
+ return GrabChunk_Single(destArray, requestedChunkSize, ref actualNumElementsGrabbed);
+ }
+ else
+ {
+ return GrabChunk_Buffered(destArray, requestedChunkSize, ref actualNumElementsGrabbed);
+ }
+ }
+
                /// <summary>
                /// Version of GrabChunk that grabs a single element at a time from the source enumerable
                /// </summary>
                /// <param name="destArray">array of length 1 receiving the grabbed (index, item) pair</param>
                /// <param name="requestedChunkSize">must be 1 in single-chunking mode</param>
                /// <param name="actualNumElementsGrabbed">set to 1 on success, left at 0 otherwise</param>
                /// <returns>
                /// true if we successfully reserved an element
                /// false if all elements in the source collection have been reserved.
                /// </returns>
                internal bool GrabChunk_Single(KeyValuePair<long,TSource>[] destArray, int requestedChunkSize, ref int actualNumElementsGrabbed)
                {
                    Contract.Assert(m_useSingleChunking, "Expected m_useSingleChecking to be true");
                    Contract.Assert(requestedChunkSize == 1, "Got requested chunk size of " + requestedChunkSize + " when single-chunking was on");
                    Contract.Assert(actualNumElementsGrabbed == 0, "Expected actualNumElementsGrabbed == 0, instead it is " + actualNumElementsGrabbed);
                    Contract.Assert(destArray.Length == 1, "Expected destArray to be of length 1, instead its length is " + destArray.Length);

                    // All access to the shared reader is serialized under m_sharedLock;
                    // single-chunking never touches the fill buffer.
                    lock (m_sharedLock)
                    {
                        // Re-check under the lock: another partition may have depleted the
                        // source after the caller's lock-free check.
                        if (m_hasNoElementsLeft.Value) return false;

                        try
                        {
                            if (m_sharedReader.MoveNext())
                            {
                                // checked: overflow of the 64-bit element index should fail loudly
                                m_sharedIndex.Value = checked(m_sharedIndex.Value + 1);
                                destArray[0]
                                    = new KeyValuePair<long, TSource>(m_sharedIndex.Value,
                                                                      m_sharedReader.Current);
                                actualNumElementsGrabbed = 1;
                                return true;
                            }
                            else
                            {
                                //if MoveNext() return false, we set the flag to inform other partitions
                                m_sourceDepleted.Value = true;
                                m_hasNoElementsLeft.Value = true;
                                return false;
                            }
                        }
                        catch
                        {
                            // On an exception, make sure that no additional items are hereafter enumerated
                            m_sourceDepleted.Value = true;
                            m_hasNoElementsLeft.Value = true;
                            throw;
                        }
                    }
                }
+
+
+
/// <summary>
/// Version of GrabChunk that uses buffering scheme to grab items out of source enumerable.
/// Works in two phases: a lock-free copy out of the shared fill buffer, then (only if
/// the request is not fully satisfied) a locked section that enumerates the source
/// directly and refills the shared buffer.
/// </summary>
/// <param name="destArray">caller's array that receives the grabbed (key, element) pairs</param>
/// <param name="requestedChunkSize">maximum number of elements to reserve</param>
/// <param name="actualNumElementsGrabbed">number of elements actually reserved</param>
/// <returns>
/// true if we successfully reserved at least one element (up to #=requestedChunkSize)
/// false if all elements in the source collection have been reserved.
/// </returns>
internal bool GrabChunk_Buffered(KeyValuePair<long,TSource>[] destArray, int requestedChunkSize, ref int actualNumElementsGrabbed)
{
    Contract.Assert(requestedChunkSize > 0);
    Contract.Assert(!m_useSingleChunking, "Did not expect to be in single-chunking mode");

    // Phase 1 (lock-free): try to satisfy the request from the shared fill buffer.
    TryCopyFromFillBuffer(destArray, requestedChunkSize, ref actualNumElementsGrabbed);

    if (actualNumElementsGrabbed == requestedChunkSize)
    {
        // that was easy.
        return true;
    }
    else if (m_sourceDepleted.Value)
    {
        // looks like we both reached the end of the fill buffer, and the source was depleted previously
        // this means no more work to do for any other worker
        m_hasNoElementsLeft.Value = true;
        m_FillBuffer = null;
        return (actualNumElementsGrabbed > 0);
    }


    //
    // now's the time to take the shared lock and enumerate
    //
    lock (m_sharedLock)
    {
        // Re-check under the lock: another thread may have drained the source while we waited.
        if (m_sourceDepleted.Value)
        {
            return (actualNumElementsGrabbed > 0);
        }

        try
        {
            // we need to make sure all array copiers are finished
            // (they read m_FillBuffer lock-free; we must not refill it underneath them)
            if (m_activeCopiers > 0)
            {
                SpinWait sw = new SpinWait();
                while( m_activeCopiers > 0) sw.SpinOnce();
            }

            Contract.Assert(m_sharedIndex != null); //already been allocated in MoveNext() before calling GrabNextChunk

            // Now's the time to actually enumerate the source

            // We first fill up the requested # of elements in the caller's array
            // continue from the where TryCopyFromFillBuffer() left off
            for (; actualNumElementsGrabbed < requestedChunkSize; actualNumElementsGrabbed++)
            {
                if (m_sharedReader.MoveNext())
                {
                    // key = running ordinal position in the source; checked so overflow throws
                    m_sharedIndex.Value = checked(m_sharedIndex.Value + 1);
                    destArray[actualNumElementsGrabbed]
                        = new KeyValuePair<long, TSource>(m_sharedIndex.Value,
                                                          m_sharedReader.Current);
                }
                else
                {
                    //if MoveNext() return false, we set the flag to inform other partitions
                    m_sourceDepleted.Value = true;
                    break;
                }
            }

            // taking a local snapshot of m_FillBuffer in case some other thread decides to null out m_FillBuffer
            // in the entry of this method after observing m_sourceDepleted = true
            var localFillBufferRef = m_FillBuffer;

            // If the big buffer seems to be depleted, we will also fill that up while we are under the lock
            // Note that this is the only place that m_FillBufferCurrentPosition can be reset
            if (m_sourceDepleted.Value == false && localFillBufferRef != null &&
                m_FillBufferCurrentPosition >= localFillBufferRef.Length)
            {
                for (int i = 0; i < localFillBufferRef.Length; i++)
                {
                    if( m_sharedReader.MoveNext())
                    {
                        m_sharedIndex.Value = checked(m_sharedIndex.Value + 1);
                        localFillBufferRef[i]
                            = new KeyValuePair<long, TSource>(m_sharedIndex.Value,
                                                              m_sharedReader.Current);
                    }
                    else
                    {
                        // No more elements left in the enumerator.
                        // Record this, so that the next request can skip the lock
                        m_sourceDepleted.Value = true;

                        // also record the current count in m_FillBufferSize
                        m_FillBufferSize = i;

                        // and exit the for loop so that we don't keep incrementing m_FillBufferSize
                        break;
                    }

                }

                // Resetting the cursor is done last, after the buffer contents are in place,
                // so lock-free readers never see a reset position over stale data.
                m_FillBufferCurrentPosition = 0;
            }


        }
        catch
        {
            // If an exception occurs, don't let the other enumerators try to enumerate.
            // NOTE: this could instead throw an InvalidOperationException, but that would be unexpected
            // and not helpful to the end user. We know the root cause is being communicated already.)
            m_sourceDepleted.Value = true;
            m_hasNoElementsLeft.Value = true;
            throw;
        }
    }

    return (actualNumElementsGrabbed > 0);
}
+
public void Dispose()
{
    // Dispose the shared source enumerator exactly once; subsequent calls are no-ops.
    if (m_disposed)
    {
        return;
    }

    m_disposed = true;
    m_sharedReader.Dispose();
}
+ }
+
/// <summary>
/// Inherits from DynamicPartitionEnumerator_Abstract directly
/// Provides customized implementation for: GrabNextChunk, HasNoElementsLeft, Current, Dispose
/// </summary>
private class InternalPartitionEnumerator : DynamicPartitionEnumerator_Abstract<TSource, IEnumerator<TSource>>
{
    //---- fields ----
    //cached local copy of the current chunk
    private KeyValuePair<long, TSource>[] m_localList; //defer allocating to avoid false sharing

    // the values of the following fields are passed in from
    // outside (already initialized) by the constructor; they are shared
    // with the sibling partitions created from the same enumerable
    private readonly SharedBool m_hasNoElementsLeft;
    private readonly object m_sharedLock;
    private readonly SharedInt m_activePartitionCount; // non-null only for static partitioning
    private InternalPartitionEnumerable m_enumerable;

    //constructor
    internal InternalPartitionEnumerator(
        IEnumerator<TSource> sharedReader,
        SharedLong sharedIndex,
        SharedBool hasNoElementsLeft,
        object sharedLock,
        SharedInt activePartitionCount,
        InternalPartitionEnumerable enumerable,
        bool useSingleChunking)
        : base(sharedReader, sharedIndex, useSingleChunking)
    {
        m_hasNoElementsLeft = hasNoElementsLeft;
        m_sharedLock = sharedLock;
        m_enumerable = enumerable;
        m_activePartitionCount = activePartitionCount;

        if (m_activePartitionCount != null)
        {
            // If static partitioning, we need to increase the active partition count.
            Interlocked.Increment(ref m_activePartitionCount.Value);
        }
    }

    //overriding methods

    /// <summary>
    /// Reserves a contiguous range of elements from source data
    /// </summary>
    /// <param name="requestedChunkSize">specified number of elements requested</param>
    /// <returns>
    /// true if we successfully reserved at least one element (up to #=requestedChunkSize)
    /// false if all elements in the source collection have been reserved.
    /// </returns>
    override protected bool GrabNextChunk(int requestedChunkSize)
    {
        Contract.Assert(requestedChunkSize > 0);

        if (HasNoElementsLeft)
        {
            return false;
        }

        // defer allocation to avoid false sharing
        if (m_localList == null)
        {
            m_localList = new KeyValuePair<long, TSource>[m_maxChunkSize];
        }

        // make the actual call to the enumerable that grabs a chunk
        return m_enumerable.GrabChunk(m_localList, requestedChunkSize, ref m_currentChunkSize.Value);
    }

    /// <summary>
    /// Returns whether or not the shared reader has already read the last
    /// element of the source data
    /// </summary>
    /// <remarks>
    /// We cannot call m_sharedReader.MoveNext(), to see if it hits the last element
    /// or not, because we can't undo MoveNext(). Thus we need to maintain a shared
    /// boolean value m_hasNoElementsLeft across all partitions
    /// </remarks>
    override protected bool HasNoElementsLeft
    {
        get { return m_hasNoElementsLeft.Value; }
        set
        {
            //we only set it from false to true once
            //we should never set it back in any circumstances
            Contract.Assert(value);
            Contract.Assert(!m_hasNoElementsLeft.Value);
            m_hasNoElementsLeft.Value = true;
        }
    }

    /// <summary>
    /// Returns the current (key, element) pair out of the locally cached chunk.
    /// </summary>
    override public KeyValuePair<long, TSource> Current
    {
        get
        {
            //verify that MoveNext is at least called once before Current is called
            if (m_currentChunkSize == null)
            {
                throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
            }
            Contract.Assert(m_localList != null);
            Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
            return (m_localList[m_localOffset.Value]);
        }
    }

    override public void Dispose()
    {
        // If this is static partitioning, ie. m_activePartitionCount != null, since the current partition
        // is disposed, we decrement the number of active partitions for the shared reader.
        if (m_activePartitionCount != null && Interlocked.Decrement(ref m_activePartitionCount.Value) == 0)
        {
            // If the number of active partitions becomes 0, we need to dispose the shared
            // reader we created in the m_enumerable object.
            m_enumerable.Dispose();
        }
        // If this is dynamic partitioning, ie. m_activePartitionCount == null, then m_enumerable needs to
        // be disposed explicitly by the user, and we do not need to do anything here
        // (the original comment incorrectly said "!= null" for the dynamic case)
    }
}
+ #endregion
+
+ }
+ #endregion
+
+ #region Dynamic Partitioner for source data of IndexRange types (IList<> and Array<>)
/// <summary>
/// Dynamic load-balance partitioner. This class is abstract and to be derived from by
/// the customized partitioner classes for IList, Array, and IEnumerable
/// </summary>
/// <typeparam name="TSource">Type of the elements in the source data</typeparam>
/// <typeparam name="TCollection">Type of the source data collection</typeparam>
private abstract class DynamicPartitionerForIndexRange_Abstract<TSource, TCollection> : OrderablePartitioner<TSource>
{
    // TCollection can be: IList<TSource>, TSource[] and IEnumerable<TSource>.
    // Derived classes specify TCollection and implement GetOrderableDynamicPartitions_Factory accordingly.
    TCollection m_data;

    /// <summary>
    /// Constructs a new orderable partitioner over the given collection.
    /// </summary>
    /// <param name="data">source data collection</param>
    protected DynamicPartitionerForIndexRange_Abstract(TCollection data)
        : base(true, false, true)
    {
        m_data = data;
    }

    /// <summary>
    /// Partition the source data and create an enumerable over the resulting partitions.
    /// </summary>
    /// <param name="data">the source data collection</param>
    /// <returns>an enumerable whose enumerators are the dynamic partitions</returns>
    protected abstract IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(TCollection data);

    /// <summary>
    /// Overrides OrderablePartitioner.GetOrderablePartitions.
    /// Partitions the underlying collection into the given number of orderable partitions.
    /// All of the returned enumerators draw from one shared dynamic-partition enumerable,
    /// so elements are handed out on demand rather than pre-assigned.
    /// </summary>
    /// <param name="partitionCount">number of partitions requested</param>
    /// <returns>A list containing <paramref name="partitionCount"/> enumerators.</returns>
    override public IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount)
    {
        if (partitionCount <= 0)
        {
            throw new ArgumentOutOfRangeException("partitionCount");
        }

        IEnumerable<KeyValuePair<long, TSource>> sharedPartitions = GetOrderableDynamicPartitions_Factory(m_data);
        var result = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            result[i] = sharedPartitions.GetEnumerator();
        }
        return result;
    }

    /// <summary>
    /// Overrides OrderablePartitioner.GetOrderableDynamicPartitions
    /// </summary>
    /// <returns>an enumerable collection of orderable partitions</returns>
    override public IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions()
    {
        return GetOrderableDynamicPartitions_Factory(m_data);
    }

    /// <summary>
    /// Whether additional partitions can be created dynamically.
    /// </summary>
    override public bool SupportsDynamicPartitions
    {
        get { return true; }
    }
}
+
/// <summary>
/// Defines dynamic partition for source data of IList and Array.
/// This class inherits DynamicPartitionEnumerator_Abstract
/// - implements GrabNextChunk, HasNoElementsLeft, and Dispose methods for IList and Array
/// - Current property still remains abstract, implementation is different for IList and Array
/// - introduces another abstract method SourceCount, which returns the number of elements in
/// the source data. Implementation differs for IList and Array
/// </summary>
/// <typeparam name="TSource">Type of the elements in the data source</typeparam>
/// <typeparam name="TSourceReader">Type of the reader on the source data</typeparam>
private abstract class DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, TSourceReader> : DynamicPartitionEnumerator_Abstract<TSource, TSourceReader>
{
    //fields

    // First source index (inclusive) of the chunk most recently reserved by this
    // partition (set to oldSharedIndex + 1 in GrabNextChunk); initially zero.
    protected int m_startIndex;

    //constructor
    protected DynamicPartitionEnumeratorForIndexRange_Abstract(TSourceReader sharedReader, SharedLong sharedIndex)
        : base(sharedReader, sharedIndex)
    {
    }

    //abstract methods
    //the Current property is still abstract, and will be implemented by derived classes
    //we add another abstract method SourceCount to get the number of elements from the source reader

    /// <summary>
    /// Get the number of elements from the source reader.
    /// It calls IList.Count or Array.Length
    /// </summary>
    protected abstract int SourceCount { get; }

    //overriding methods

    /// <summary>
    /// Reserves a contiguous range of elements from source data via a lock-free
    /// compare-and-swap on the shared index.
    /// </summary>
    /// <param name="requestedChunkSize">specified number of elements requested</param>
    /// <returns>
    /// true if we successfully reserved at least one element (up to #=requestedChunkSize)
    /// false if all elements in the source collection have been reserved.
    /// </returns>
    override protected bool GrabNextChunk(int requestedChunkSize)
    {
        Contract.Assert(requestedChunkSize > 0);

        while (!HasNoElementsLeft)
        {
            Contract.Assert(m_sharedIndex != null);
            // use the new Volatile.Read method because it is cheaper than Interlocked.Read on AMD64 architecture
            long oldSharedIndex = Volatile.Read(ref m_sharedIndex.Value);

            if (HasNoElementsLeft)
            {
                //HasNoElementsLeft situation changed from false to true immediately
                //and oldSharedIndex becomes stale
                return false;
            }

            //there won't be overflow, because the index of IList/array is int, and we
            //have casted it to long.
            long newSharedIndex = Math.Min(SourceCount - 1, oldSharedIndex + requestedChunkSize);


            //the following CAS, if successful, reserves a chunk of elements [oldSharedIndex+1, newSharedIndex]
            //inclusive in the source collection
            if (Interlocked.CompareExchange(ref m_sharedIndex.Value, newSharedIndex, oldSharedIndex)
                == oldSharedIndex)
            {
                //set up local indexes.
                //m_currentChunkSize is always set to requestedChunkSize when source data had
                //enough elements of what we requested
                m_currentChunkSize.Value = (int)(newSharedIndex - oldSharedIndex);
                m_localOffset.Value = -1;
                m_startIndex = (int)(oldSharedIndex + 1);
                return true;
            }
            // CAS failed: another partition reserved a chunk first; loop and retry
            // with a freshly read shared index.
        }
        //didn't get any element, return false;
        return false;
    }

    /// <summary>
    /// Returns whether or not the shared reader has already read the last
    /// element of the source data
    /// </summary>
    override protected bool HasNoElementsLeft
    {
        get
        {
            Contract.Assert(m_sharedIndex != null);
            // use the new Volatile.Read method because it is cheaper than Interlocked.Read on AMD64 architecture
            return Volatile.Read(ref m_sharedIndex.Value) >= SourceCount - 1;
        }
        set
        {
            // The setter is never meaningful for index-range partitions: exhaustion is
            // derived from the shared index, so any attempt to set it is a logic error.
            Contract.Assert(false);
        }
    }

    /// <summary>
    /// For source data type IList and Array, the type of the shared reader is just the data itself.
    /// We don't do anything in Dispose method for IList and Array.
    /// </summary>
    override public void Dispose()
    { }
}
+
+
/// <summary>
/// Inherits from DynamicPartitioners
/// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
/// of EnumerableOfPartitionsForIList defined internally
/// </summary>
/// <typeparam name="TSource">Type of elements in the source data</typeparam>
private class DynamicPartitionerForIList<TSource> : DynamicPartitionerForIndexRange_Abstract<TSource, IList<TSource>>
{
    //constructor
    internal DynamicPartitionerForIList(IList<TSource> source)
        : base(source)
    { }

    //override methods
    // NOTE: parameter renamed from "m_data" to "data" -- the old name followed the
    // field-naming convention and misleadingly read as instance state.
    override protected IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(IList<TSource> data)
    {
        //the list itself serves as the shared reader
        return new InternalPartitionEnumerable(data);
    }

    /// <summary>
    /// Shared enumerable for IList sources: every GetEnumerator() call creates a new
    /// partition that competes for index ranges through the shared index.
    /// </summary>
    private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>>
    {
        //reader through which we access the source data
        private readonly IList<TSource> m_sharedReader;
        // index of the last element already reserved by any partition (-1 == none yet)
        private SharedLong m_sharedIndex;

        internal InternalPartitionEnumerable(IList<TSource> sharedReader)
        {
            m_sharedReader = sharedReader;
            m_sharedIndex = new SharedLong(-1);
        }

        public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
        {
            return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex);
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }
    }

    /// <summary>
    /// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract
    /// Provides customized implementation of SourceCount property and Current property for IList
    /// </summary>
    private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, IList<TSource>>
    {
        //constructor
        internal InternalPartitionEnumerator(IList<TSource> sharedReader, SharedLong sharedIndex)
            : base(sharedReader, sharedIndex)
        { }

        //overriding methods
        override protected int SourceCount
        {
            get { return m_sharedReader.Count; }
        }

        /// <summary>
        /// Returns a KeyValuePair of the current element and its key (its index in the list).
        /// </summary>
        override public KeyValuePair<long, TSource> Current
        {
            get
            {
                //verify that MoveNext is at least called once before Current is called
                if (m_currentChunkSize == null)
                {
                    throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
                }

                Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
                return new KeyValuePair<long, TSource>(m_startIndex + m_localOffset.Value,
                    m_sharedReader[m_startIndex + m_localOffset.Value]);
            }
        }
    }
}
+
+
+
/// <summary>
/// Inherits from DynamicPartitioners
/// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
/// of EnumerableOfPartitionsForArray defined internally
/// </summary>
/// <typeparam name="TSource">Type of elements in the source data</typeparam>
private class DynamicPartitionerForArray<TSource> : DynamicPartitionerForIndexRange_Abstract<TSource, TSource[]>
{
    //constructor
    internal DynamicPartitionerForArray(TSource[] source)
        : base(source)
    { }

    //override methods
    // NOTE: parameter renamed from "m_data" to "data" -- the old name followed the
    // field-naming convention and misleadingly read as instance state.
    override protected IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(TSource[] data)
    {
        return new InternalPartitionEnumerable(data);
    }

    /// <summary>
    /// Shared enumerable for array sources: every GetEnumerator() call creates a new
    /// partition that competes for index ranges through the shared index.
    /// </summary>
    private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>>
    {
        //reader through which we access the source data
        private readonly TSource[] m_sharedReader;
        // index of the last element already reserved by any partition (-1 == none yet)
        private SharedLong m_sharedIndex;

        internal InternalPartitionEnumerable(TSource[] sharedReader)
        {
            m_sharedReader = sharedReader;
            m_sharedIndex = new SharedLong(-1);
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }

        public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
        {
            return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex);
        }
    }

    /// <summary>
    /// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract
    /// Provides customized implementation of SourceCount property and Current property for Array
    /// </summary>
    private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, TSource[]>
    {
        //constructor
        internal InternalPartitionEnumerator(TSource[] sharedReader, SharedLong sharedIndex)
            : base(sharedReader, sharedIndex)
        { }

        //overriding methods
        override protected int SourceCount
        {
            get { return m_sharedReader.Length; }
        }

        /// <summary>
        /// Returns a KeyValuePair of the current element and its key (its index in the array).
        /// </summary>
        override public KeyValuePair<long, TSource> Current
        {
            get
            {
                //verify that MoveNext is at least called once before Current is called
                if (m_currentChunkSize == null)
                {
                    throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
                }

                Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
                return new KeyValuePair<long, TSource>(m_startIndex + m_localOffset.Value,
                    m_sharedReader[m_startIndex + m_localOffset.Value]);
            }
        }
    }
}
+ #endregion
+
+
+ #region Static partitioning for IList and Array, abstract classes
/// <summary>
/// Static partitioning over IList/Array sources.
/// - Keys are ordered within each partition and across partitions, and are normalized
/// - The number of partitions is fixed once specified; the source elements are
///   distributed to each partition as evenly as possible.
/// </summary>
/// <typeparam name="TSource">type of the elements</typeparam>
/// <typeparam name="TCollection">Type of the source data collection</typeparam>
private abstract class StaticIndexRangePartitioner<TSource, TCollection> : OrderablePartitioner<TSource>
{
    protected StaticIndexRangePartitioner()
        : base(true, true, true)
    { }

    /// <summary>
    /// Abstract method to return the number of elements in the source data
    /// </summary>
    protected abstract int SourceCount { get; }

    /// <summary>
    /// Abstract method to create a partition that covers a range over source data,
    /// starting from "startIndex", ending at "endIndex" (both inclusive).
    /// The concrete behavior depends on the source data type (IList vs. Array).
    /// </summary>
    /// <param name="startIndex">start index of the current partition on the source data</param>
    /// <param name="endIndex">end index of the current partition on the source data</param>
    /// <returns>a partition enumerator over the specified range</returns>
    protected abstract IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex);

    /// <summary>
    /// Overrides OrderablePartitioner.GetOrderablePartitions.
    /// Returns a list of partitions, each enumerating a fixed slice of the source data,
    /// distributed as evenly as possible: with N elements and x partitions where
    /// N = a*x + b, the first b partitions get a+1 elements and the remaining x-b get a.
    /// For example, N=10, x=3 yields ranges [0,3], [4,6], [7,9].
    /// When x &gt; N the last x-N partitions are empty; an empty partition is encoded as
    /// (startIndex == count &amp;&amp; endIndex == count - 1).
    /// </summary>
    /// <param name="partitionCount">specified number of partitions</param>
    /// <returns>a list of partitions</returns>
    override public IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount)
    {
        if (partitionCount <= 0)
        {
            throw new ArgumentOutOfRangeException("partitionCount");
        }

        int remainder;
        int quotient = Math.DivRem(SourceCount, partitionCount, out remainder);

        var partitions = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount];
        int startIndex = 0;
        for (int i = 0; i < partitionCount; i++)
        {
            // The first 'remainder' partitions each receive one extra element.
            int partitionSize = (i < remainder) ? quotient + 1 : quotient;
            int endIndex = startIndex + partitionSize - 1;
            partitions[i] = CreatePartition(startIndex, endIndex);
            startIndex = endIndex + 1;
        }
        return partitions;
    }
}
+
/// <summary>
/// Static Partition for IList/Array.
/// Implements every IEnumerator member except Current, which differs between IList
/// and Array (arrays use 'ldelem' instructions for faster element retrieval).
/// </summary>
// The source collection is assumed not to be updated concurrently; partitions read
// it directly and keep no local cache of their assigned elements.
private abstract class StaticIndexRangePartition<TSource> : IEnumerator<KeyValuePair<long, TSource>>
{
    // Inclusive bounds of this partition within the source collection.
    // The partition is empty if and only if
    //   (m_startIndex == m_data.Count && m_endIndex == m_data.Count - 1)
    protected readonly int m_startIndex;
    protected readonly int m_endIndex;

    // Index of the element Current refers to; starts one before m_startIndex.
    protected volatile int m_offset;

    /// <summary>
    /// Constructs an instance of StaticIndexRangePartition
    /// </summary>
    /// <param name="startIndex">the start index in the source collection for the current partition</param>
    /// <param name="endIndex">the end index in the source collection for the current partition</param>
    protected StaticIndexRangePartition(int startIndex, int endIndex)
    {
        m_startIndex = startIndex;
        m_endIndex = endIndex;
        m_offset = startIndex - 1;
    }

    /// <summary>
    /// Implemented per source type; arrays use direct element loads.
    /// </summary>
    public abstract KeyValuePair<long, TSource> Current { get; }

    /// <summary>
    /// We don't dispose the source for IList and array
    /// </summary>
    public void Dispose()
    { }

    public void Reset()
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Moves to the next item.
    /// Before the first MoveNext is called: m_offset == m_startIndex - 1.
    /// </summary>
    /// <returns>true if successful, false if there is no item left</returns>
    public bool MoveNext()
    {
        if (m_offset >= m_endIndex)
        {
            // Park m_offset one past the end. This matters for empty partitions,
            // which start with m_offset == m_startIndex - 1 == m_endIndex: without
            // this write, Current could not tell that MoveNext was ever called.
            m_offset = m_endIndex + 1;
            return false;
        }

        m_offset++;
        return true;
    }

    object IEnumerator.Current
    {
        get { return this.Current; }
    }
}
+ #endregion
+
+ #region Static partitioning for IList
/// <summary>
/// Static-partitioning flavor for IList sources: supplies SourceCount and
/// CreatePartition on top of StaticIndexRangePartitioner.
/// </summary>
/// <typeparam name="TSource">Type of the elements in the source list</typeparam>
private class StaticIndexRangePartitionerForIList<TSource> : StaticIndexRangePartitioner<TSource, IList<TSource>>
{
    // The list shared (read-only) by every partition handed out.
    IList<TSource> m_source;

    internal StaticIndexRangePartitionerForIList(IList<TSource> list)
        : base()
    {
        Contract.Assert(list != null);
        m_source = list;
    }

    override protected int SourceCount
    {
        get { return m_source.Count; }
    }

    override protected IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex)
    {
        return new StaticIndexRangePartitionForIList<TSource>(m_source, startIndex, endIndex);
    }
}
+
/// <summary>
/// Inherits from StaticIndexRangePartition
/// Provides the customized Current property for IList sources
/// </summary>
/// <typeparam name="TSource">Type of the elements in the source list</typeparam>
private class StaticIndexRangePartitionForIList<TSource> : StaticIndexRangePartition<TSource>
{
    // The source list shared by all partitions.
    private volatile IList<TSource> m_sharedList;

    internal StaticIndexRangePartitionForIList(IList<TSource> list, int startIndex, int endIndex)
        : base(startIndex, endIndex)
    {
        Contract.Assert(startIndex >= 0 && endIndex <= list.Count - 1);
        m_sharedList = list;
    }

    override public KeyValuePair<long, TSource> Current
    {
        get
        {
            // m_offset only rises above m_startIndex - 1 once MoveNext has been called.
            if (m_offset < m_startIndex)
            {
                throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
            }

            Contract.Assert(m_offset >= m_startIndex && m_offset <= m_endIndex);
            return (new KeyValuePair<long, TSource>(m_offset, m_sharedList[m_offset]));
        }
    }
}
+ #endregion
+
+ #region static partitioning for Arrays
/// <summary>
/// Static-partitioning flavor for Array sources: supplies SourceCount and
/// CreatePartition on top of StaticIndexRangePartitioner.
/// </summary>
private class StaticIndexRangePartitionerForArray<TSource> : StaticIndexRangePartitioner<TSource, TSource[]>
{
    // The array shared (read-only) by every partition handed out.
    TSource[] m_source;

    internal StaticIndexRangePartitionerForArray(TSource[] array)
        : base()
    {
        Contract.Assert(array != null);
        m_source = array;
    }

    override protected int SourceCount
    {
        get { return m_source.Length; }
    }

    override protected IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex)
    {
        return new StaticIndexRangePartitionForArray<TSource>(m_source, startIndex, endIndex);
    }
}
+
/// <summary>
/// Inherits from StaticIndexRangePartition
/// Provides the customized Current property for Array sources
/// (the original summary incorrectly said it inherits StaticIndexRangePartitioner)
/// </summary>
private class StaticIndexRangePartitionForArray<TSource> : StaticIndexRangePartition<TSource>
{
    // The source array shared by all partitions.
    private volatile TSource[] m_sharedArray;

    internal StaticIndexRangePartitionForArray(TSource[] array, int startIndex, int endIndex)
        : base(startIndex, endIndex)
    {
        Contract.Assert(startIndex >= 0 && endIndex <= array.Length - 1);
        m_sharedArray = array;
    }

    override public KeyValuePair<long, TSource> Current
    {
        get
        {
            // m_offset only rises above m_startIndex - 1 once MoveNext has been called.
            if (m_offset < m_startIndex)
            {
                throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
            }

            Contract.Assert(m_offset >= m_startIndex && m_offset <= m_endIndex);
            return (new KeyValuePair<long, TSource>(m_offset, m_sharedArray[m_offset]));
        }
    }
}
+ #endregion
+
+
+ #region Utility functions
/// <summary>
/// A very simple primitive that allows us to share a mutable int value across
/// multiple threads; the field is volatile so reads always see the latest write.
/// </summary>
private class SharedInt
{
    internal volatile int Value;

    internal SharedInt(int value)
    {
        Value = value;
    }
}
+
/// <summary>
/// A very simple primitive that allows us to share a mutable bool value across
/// multiple threads; the field is volatile so reads always see the latest write.
/// </summary>
private class SharedBool
{
    internal volatile bool Value;

    internal SharedBool(bool value)
    {
        Value = value;
    }
}
+
/// <summary>
/// A very simple primitive that allows us to share a mutable long value across
/// multiple threads. Note the field is NOT volatile (long writes are not atomic
/// on all platforms); callers access it via Volatile.Read/Interlocked operations.
/// </summary>
private class SharedLong
{
    internal long Value;

    internal SharedLong(long value)
    {
        Value = value;
    }
}
+
//--------------------
// The following part calculates the default chunk size. It is copied from System.Linq.Parallel.Scheduling,
// because mscorlib.dll cannot access System.Linq.Parallel.Scheduling
//--------------------

// The number of bytes we want "chunks" to be, when partitioning, etc. We choose 4 cache
// lines worth, assuming 128b cache line. Most (popular) architectures use 64b cache lines,
// but choosing 128b works for 64b too whereas a multiple of 64b isn't necessarily sufficient
// for 128b cache systems. So 128b it is.
private const int DEFAULT_BYTES_PER_CHUNK = 128 * 4;

/// <summary>
/// Computes the default number of elements per chunk for elements of type TSource,
/// targeting roughly DEFAULT_BYTES_PER_CHUNK bytes per chunk.
/// </summary>
private static int GetDefaultChunkSize<TSource>()
{
    int chunkSize;

    if (typeof(TSource).IsValueType)
    {
#if !FEATURE_CORECLR // Marshal.SizeOf is not supported in CoreCLR

        // Only explicit-layout structs use their marshaled size here.
        // NOTE(review): presumably Marshal.SizeOf is considered reliable only for
        // LayoutKind.Explicit types -- confirm before changing this branch.
        if (typeof(TSource).StructLayoutAttribute.Value == LayoutKind.Explicit)
        {
            // At least one element per chunk, even for very large structs.
            chunkSize = Math.Max(1, DEFAULT_BYTES_PER_CHUNK / Marshal.SizeOf(typeof(TSource)));
        }
        else
        {
            // We choose '128' because this ensures, no matter the actual size of the value type,
            // the total bytes used will be a multiple of 128. This ensures it's cache aligned.
            chunkSize = 128;
        }
#else
        chunkSize = 128;
#endif
    }
    else
    {
        // Reference types: each element occupies one pointer in the chunk array.
        Contract.Assert((DEFAULT_BYTES_PER_CHUNK % IntPtr.Size) == 0, "bytes per chunk should be a multiple of pointer size");
        chunkSize = (DEFAULT_BYTES_PER_CHUNK / IntPtr.Size);
    }
    return chunkSize;
}
+ #endregion
+
+ }
+}