diff options
Diffstat (limited to 'src/mscorlib/src/System/Collections/Concurrent')
7 files changed, 6127 insertions, 0 deletions
diff --git a/src/mscorlib/src/System/Collections/Concurrent/ConcurrentDictionary.cs b/src/mscorlib/src/System/Collections/Concurrent/ConcurrentDictionary.cs new file mode 100644 index 0000000000..d805dc8be7 --- /dev/null +++ b/src/mscorlib/src/System/Collections/Concurrent/ConcurrentDictionary.cs @@ -0,0 +1,2095 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +// +/*============================================================ +** +** +** +** Purpose: A scalable dictionary for concurrent access +** +** +===========================================================*/ + +using System; +using System.Collections; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Diagnostics.Contracts; +using System.Runtime.InteropServices; +using System.Runtime.Serialization; +using System.Text; +using System.Threading; +using System.Security; +using System.Security.Permissions; + +namespace System.Collections.Concurrent +{ + + /// <summary> + /// Represents a thread-safe collection of keys and values. + /// </summary> + /// <typeparam name="TKey">The type of the keys in the dictionary.</typeparam> + /// <typeparam name="TValue">The type of the values in the dictionary.</typeparam> + /// <remarks> + /// All public and protected members of <see cref="ConcurrentDictionary{TKey,TValue}"/> are thread-safe and may be used + /// concurrently from multiple threads. + /// </remarks> +#if !FEATURE_CORECLR + [Serializable] +#endif + [ComVisible(false)] + [DebuggerTypeProxy(typeof(Mscorlib_DictionaryDebugView<,>))] + [DebuggerDisplay("Count = {Count}")] + [HostProtection(Synchronization = true, ExternalThreading = true)] + public class ConcurrentDictionary<TKey, TValue> : IDictionary<TKey, TValue>, IDictionary, IReadOnlyDictionary<TKey, TValue> + { + /// <summary> + /// Tables that hold the internal state of the ConcurrentDictionary + /// + /// Wrapping the three tables in a single object allows us to atomically + /// replace all tables at once. + /// </summary> + private class Tables + { + internal readonly Node[] m_buckets; // A singly-linked list for each bucket. + internal readonly object[] m_locks; // A set of locks, each guarding a section of the table. + internal volatile int[] m_countPerLock; // The number of elements guarded by each lock. + internal readonly IEqualityComparer<TKey> m_comparer; // Key equality comparer + + internal Tables(Node[] buckets, object[] locks, int[] countPerLock, IEqualityComparer<TKey> comparer) + { + m_buckets = buckets; + m_locks = locks; + m_countPerLock = countPerLock; + m_comparer = comparer; + } + } +#if !FEATURE_CORECLR + [NonSerialized] +#endif + private volatile Tables m_tables; // Internal tables of the dictionary + // NOTE: this is only used for compat reasons to serialize the comparer. + // This should not be accessed from anywhere else outside of the serialization methods. + internal IEqualityComparer<TKey> m_comparer; +#if !FEATURE_CORECLR + [NonSerialized] +#endif + private readonly bool m_growLockArray; // Whether to dynamically increase the size of the striped lock + + // How many times we resized becaused of collisions. + // This is used to make sure we don't resize the dictionary because of multi-threaded Add() calls + // that generate collisions. Whenever a GrowTable() should be the only place that changes this +#if !FEATURE_CORECLR + // The field should be have been marked as NonSerialized but because we shipped it without that attribute in 4.5.1. + // we can't add it back without breaking compat. To maximize compat we are going to keep the OptionalField attribute + // This will prevent cases where the field was not serialized. + [OptionalField] +#endif + private int m_keyRehashCount; + +#if !FEATURE_CORECLR + [NonSerialized] +#endif + private int m_budget; // The maximum number of elements per lock before a resize operation is triggered + +#if !FEATURE_CORECLR // These fields are not used in CoreCLR + private KeyValuePair<TKey, TValue>[] m_serializationArray; // Used for custom serialization + + private int m_serializationConcurrencyLevel; // used to save the concurrency level in serialization + + private int m_serializationCapacity; // used to save the capacity in serialization +#endif + // The default concurrency level is DEFAULT_CONCURRENCY_MULTIPLIER * #CPUs. The higher the + // DEFAULT_CONCURRENCY_MULTIPLIER, the more concurrent writes can take place without interference + // and blocking, but also the more expensive operations that require all locks become (e.g. table + // resizing, ToArray, Count, etc). According to brief benchmarks that we ran, 4 seems like a good + // compromise. + private const int DEFAULT_CONCURRENCY_MULTIPLIER = 4; + + // The default capacity, i.e. the initial # of buckets. When choosing this value, we are making + // a trade-off between the size of a very small dictionary, and the number of resizes when + // constructing a large dictionary. Also, the capacity should not be divisible by a small prime. + private const int DEFAULT_CAPACITY = 31; + + // The maximum size of the striped lock that will not be exceeded when locks are automatically + // added as the dictionary grows. However, the user is allowed to exceed this limit by passing + // a concurrency level larger than MAX_LOCK_NUMBER into the constructor. + private const int MAX_LOCK_NUMBER = 1024; + + // Whether TValue is a type that can be written atomically (i.e., with no danger of torn reads) + private static readonly bool s_isValueWriteAtomic = IsValueWriteAtomic(); + + + /// <summary> + /// Determines whether type TValue can be written atomically + /// </summary> + private static bool IsValueWriteAtomic() + { + Type valueType = typeof(TValue); + + // + // Section 12.6.6 of ECMA CLI explains which types can be read and written atomically without + // the risk of tearing. + // + // See http://www.ecma-international.org/publications/files/ECMA-ST/Ecma-335.pdf + // + bool isAtomic = + (valueType.IsClass) + || valueType == typeof(Boolean) + || valueType == typeof(Char) + || valueType == typeof(Byte) + || valueType == typeof(SByte) + || valueType == typeof(Int16) + || valueType == typeof(UInt16) + || valueType == typeof(Int32) + || valueType == typeof(UInt32) + || valueType == typeof(Single); + + if (!isAtomic && IntPtr.Size == 8) + { + isAtomic |= valueType == typeof(Double) || valueType == typeof(Int64); + } + + return isAtomic; + } + + /// <summary> + /// Initializes a new instance of the <see + /// cref="ConcurrentDictionary{TKey,TValue}"/> + /// class that is empty, has the default concurrency level, has the default initial capacity, and + /// uses the default comparer for the key type. + /// </summary> + public ConcurrentDictionary() : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true, EqualityComparer<TKey>.Default) { } + + /// <summary> + /// Initializes a new instance of the <see + /// cref="ConcurrentDictionary{TKey,TValue}"/> + /// class that is empty, has the specified concurrency level and capacity, and uses the default + /// comparer for the key type. + /// </summary> + /// <param name="concurrencyLevel">The estimated number of threads that will update the + /// <see cref="ConcurrentDictionary{TKey,TValue}"/> concurrently.</param> + /// <param name="capacity">The initial number of elements that the <see + /// cref="ConcurrentDictionary{TKey,TValue}"/> + /// can contain.</param> + /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="concurrencyLevel"/> is + /// less than 1.</exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"> <paramref name="capacity"/> is less than + /// 0.</exception> + public ConcurrentDictionary(int concurrencyLevel, int capacity) : this(concurrencyLevel, capacity, false, EqualityComparer<TKey>.Default) { } + + /// <summary> + /// Initializes a new instance of the <see cref="ConcurrentDictionary{TKey,TValue}"/> + /// class that contains elements copied from the specified <see + /// cref="T:System.Collections.IEnumerable{KeyValuePair{TKey,TValue}}"/>, has the default concurrency + /// level, has the default initial capacity, and uses the default comparer for the key type. + /// </summary> + /// <param name="collection">The <see + /// cref="T:System.Collections.IEnumerable{KeyValuePair{TKey,TValue}}"/> whose elements are copied to + /// the new + /// <see cref="ConcurrentDictionary{TKey,TValue}"/>.</param> + /// <exception cref="T:System.ArgumentNullException"><paramref name="collection"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="T:System.ArgumentException"><paramref name="collection"/> contains one or more + /// duplicate keys.</exception> + public ConcurrentDictionary(IEnumerable<KeyValuePair<TKey, TValue>> collection) : this(collection, EqualityComparer<TKey>.Default) { } + + /// <summary> + /// Initializes a new instance of the <see cref="ConcurrentDictionary{TKey,TValue}"/> + /// class that is empty, has the specified concurrency level and capacity, and uses the specified + /// <see cref="T:System.Collections.Generic.IEqualityComparer{TKey}"/>. + /// </summary> + /// <param name="comparer">The <see cref="T:System.Collections.Generic.IEqualityComparer{TKey}"/> + /// implementation to use when comparing keys.</param> + /// <exception cref="T:System.ArgumentNullException"><paramref name="comparer"/> is a null reference + /// (Nothing in Visual Basic).</exception> + public ConcurrentDictionary(IEqualityComparer<TKey> comparer) : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true, comparer) { } + + /// <summary> + /// Initializes a new instance of the <see cref="ConcurrentDictionary{TKey,TValue}"/> + /// class that contains elements copied from the specified <see + /// cref="T:System.Collections.IEnumerable"/>, has the default concurrency level, has the default + /// initial capacity, and uses the specified + /// <see cref="T:System.Collections.Generic.IEqualityComparer{TKey}"/>. + /// </summary> + /// <param name="collection">The <see + /// cref="T:System.Collections.IEnumerable{KeyValuePair{TKey,TValue}}"/> whose elements are copied to + /// the new + /// <see cref="ConcurrentDictionary{TKey,TValue}"/>.</param> + /// <param name="comparer">The <see cref="T:System.Collections.Generic.IEqualityComparer{TKey}"/> + /// implementation to use when comparing keys.</param> + /// <exception cref="T:System.ArgumentNullException"><paramref name="collection"/> is a null reference + /// (Nothing in Visual Basic). -or- + /// <paramref name="comparer"/> is a null reference (Nothing in Visual Basic). + /// </exception> + public ConcurrentDictionary(IEnumerable<KeyValuePair<TKey, TValue>> collection, IEqualityComparer<TKey> comparer) + : this(comparer) + { + if (collection == null) throw new ArgumentNullException("collection"); + + InitializeFromCollection(collection); + } + + /// <summary> + /// Initializes a new instance of the <see cref="ConcurrentDictionary{TKey,TValue}"/> + /// class that contains elements copied from the specified <see cref="T:System.Collections.IEnumerable"/>, + /// has the specified concurrency level, has the specified initial capacity, and uses the specified + /// <see cref="T:System.Collections.Generic.IEqualityComparer{TKey}"/>. + /// </summary> + /// <param name="concurrencyLevel">The estimated number of threads that will update the + /// <see cref="ConcurrentDictionary{TKey,TValue}"/> concurrently.</param> + /// <param name="collection">The <see cref="T:System.Collections.IEnumerable{KeyValuePair{TKey,TValue}}"/> whose elements are copied to the new + /// <see cref="ConcurrentDictionary{TKey,TValue}"/>.</param> + /// <param name="comparer">The <see cref="T:System.Collections.Generic.IEqualityComparer{TKey}"/> implementation to use + /// when comparing keys.</param> + /// <exception cref="T:System.ArgumentNullException"> + /// <paramref name="collection"/> is a null reference (Nothing in Visual Basic). + /// -or- + /// <paramref name="comparer"/> is a null reference (Nothing in Visual Basic). + /// </exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// <paramref name="concurrencyLevel"/> is less than 1. + /// </exception> + /// <exception cref="T:System.ArgumentException"><paramref name="collection"/> contains one or more duplicate keys.</exception> + public ConcurrentDictionary( + int concurrencyLevel, IEnumerable<KeyValuePair<TKey, TValue>> collection, IEqualityComparer<TKey> comparer) + : this(concurrencyLevel, DEFAULT_CAPACITY, false, comparer) + { + if (collection == null) throw new ArgumentNullException("collection"); + if (comparer == null) throw new ArgumentNullException("comparer"); + + InitializeFromCollection(collection); + } + + private void InitializeFromCollection(IEnumerable<KeyValuePair<TKey, TValue>> collection) + { + TValue dummy; + foreach (KeyValuePair<TKey, TValue> pair in collection) + { + if (pair.Key == null) throw new ArgumentNullException("key"); + + if (!TryAddInternal(pair.Key, pair.Value, false, false, out dummy)) + { + throw new ArgumentException(GetResource("ConcurrentDictionary_SourceContainsDuplicateKeys")); + } + } + + if (m_budget == 0) + { + m_budget = m_tables.m_buckets.Length / m_tables.m_locks.Length; + } + + } + + /// <summary> + /// Initializes a new instance of the <see cref="ConcurrentDictionary{TKey,TValue}"/> + /// class that is empty, has the specified concurrency level, has the specified initial capacity, and + /// uses the specified <see cref="T:System.Collections.Generic.IEqualityComparer{TKey}"/>. + /// </summary> + /// <param name="concurrencyLevel">The estimated number of threads that will update the + /// <see cref="ConcurrentDictionary{TKey,TValue}"/> concurrently.</param> + /// <param name="capacity">The initial number of elements that the <see + /// cref="ConcurrentDictionary{TKey,TValue}"/> + /// can contain.</param> + /// <param name="comparer">The <see cref="T:System.Collections.Generic.IEqualityComparer{TKey}"/> + /// implementation to use when comparing keys.</param> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// <paramref name="concurrencyLevel"/> is less than 1. -or- + /// <paramref name="capacity"/> is less than 0. + /// </exception> + /// <exception cref="T:System.ArgumentNullException"><paramref name="comparer"/> is a null reference + /// (Nothing in Visual Basic).</exception> + public ConcurrentDictionary(int concurrencyLevel, int capacity, IEqualityComparer<TKey> comparer) + : this(concurrencyLevel, capacity, false, comparer) + { + } + + internal ConcurrentDictionary(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer) + { + if (concurrencyLevel < 1) + { + throw new ArgumentOutOfRangeException("concurrencyLevel", GetResource("ConcurrentDictionary_ConcurrencyLevelMustBePositive")); + } + if (capacity < 0) + { + throw new ArgumentOutOfRangeException("capacity", GetResource("ConcurrentDictionary_CapacityMustNotBeNegative")); + } + if (comparer == null) throw new ArgumentNullException("comparer"); + + // The capacity should be at least as large as the concurrency level. Otherwise, we would have locks that don't guard + // any buckets. + if (capacity < concurrencyLevel) + { + capacity = concurrencyLevel; + } + + object[] locks = new object[concurrencyLevel]; + for (int i = 0; i < locks.Length; i++) + { + locks[i] = new object(); + } + + int[] countPerLock = new int[locks.Length]; + Node[] buckets = new Node[capacity]; + m_tables = new Tables(buckets, locks, countPerLock, comparer); + + m_growLockArray = growLockArray; + m_budget = buckets.Length / locks.Length; + } + + + /// <summary> + /// Attempts to add the specified key and value to the <see cref="ConcurrentDictionary{TKey, + /// TValue}"/>. + /// </summary> + /// <param name="key">The key of the element to add.</param> + /// <param name="value">The value of the element to add. The value can be a null reference (Nothing + /// in Visual Basic) for reference types.</param> + /// <returns>true if the key/value pair was added to the <see cref="ConcurrentDictionary{TKey, + /// TValue}"/> + /// successfully; otherwise, false.</returns> + /// <exception cref="T:System.ArgumentNullException"><paramref name="key"/> is null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="T:System.OverflowException">The <see cref="ConcurrentDictionary{TKey, TValue}"/> + /// contains too many elements.</exception> + public bool TryAdd(TKey key, TValue value) + { + if (key == null) throw new ArgumentNullException("key"); + TValue dummy; + return TryAddInternal(key, value, false, true, out dummy); + } + + /// <summary> + /// Determines whether the <see cref="ConcurrentDictionary{TKey, TValue}"/> contains the specified + /// key. + /// </summary> + /// <param name="key">The key to locate in the <see cref="ConcurrentDictionary{TKey, + /// TValue}"/>.</param> + /// <returns>true if the <see cref="ConcurrentDictionary{TKey, TValue}"/> contains an element with + /// the specified key; otherwise, false.</returns> + /// <exception cref="T:System.ArgumentNullException"><paramref name="key"/> is a null reference + /// (Nothing in Visual Basic).</exception> + public bool ContainsKey(TKey key) + { + if (key == null) throw new ArgumentNullException("key"); + + TValue throwAwayValue; + return TryGetValue(key, out throwAwayValue); + } + + /// <summary> + /// Attempts to remove and return the the value with the specified key from the + /// <see cref="ConcurrentDictionary{TKey, TValue}"/>. + /// </summary> + /// <param name="key">The key of the element to remove and return.</param> + /// <param name="value">When this method returns, <paramref name="value"/> contains the object removed from the + /// <see cref="ConcurrentDictionary{TKey,TValue}"/> or the default value of <typeparamref + /// name="TValue"/> + /// if the operation failed.</param> + /// <returns>true if an object was removed successfully; otherwise, false.</returns> + /// <exception cref="T:System.ArgumentNullException"><paramref name="key"/> is a null reference + /// (Nothing in Visual Basic).</exception> + public bool TryRemove(TKey key, out TValue value) + { + if (key == null) throw new ArgumentNullException("key"); + + return TryRemoveInternal(key, out value, false, default(TValue)); + } + + /// <summary> + /// Removes the specified key from the dictionary if it exists and returns its associated value. + /// If matchValue flag is set, the key will be removed only if is associated with a particular + /// value. + /// </summary> + /// <param name="key">The key to search for and remove if it exists.</param> + /// <param name="value">The variable into which the removed value, if found, is stored.</param> + /// <param name="matchValue">Whether removal of the key is conditional on its value.</param> + /// <param name="oldValue">The conditional value to compare against if <paramref name="matchValue"/> is true</param> + /// <returns></returns> + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue) + { + while (true) + { + Tables tables = m_tables; + + IEqualityComparer<TKey> comparer = tables.m_comparer; + + int bucketNo, lockNo; + GetBucketAndLockNo(comparer.GetHashCode(key), out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length); + + lock (tables.m_locks[lockNo]) + { + // If the table just got resized, we may not be holding the right lock, and must retry. + // This should be a rare occurence. + if (tables != m_tables) + { + continue; + } + + Node prev = null; + for (Node curr = tables.m_buckets[bucketNo]; curr != null; curr = curr.m_next) + { + Assert((prev == null && curr == tables.m_buckets[bucketNo]) || prev.m_next == curr); + + if (comparer.Equals(curr.m_key, key)) + { + if (matchValue) + { + bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr.m_value); + if (!valuesMatch) + { + value = default(TValue); + return false; + } + } + + if (prev == null) + { + Volatile.Write<Node>(ref tables.m_buckets[bucketNo], curr.m_next); + } + else + { + prev.m_next = curr.m_next; + } + + value = curr.m_value; + tables.m_countPerLock[lockNo]--; + return true; + } + prev = curr; + } + } + + value = default(TValue); + return false; + } + } + + /// <summary> + /// Attempts to get the value associated with the specified key from the <see + /// cref="ConcurrentDictionary{TKey,TValue}"/>. + /// </summary> + /// <param name="key">The key of the value to get.</param> + /// <param name="value">When this method returns, <paramref name="value"/> contains the object from + /// the + /// <see cref="ConcurrentDictionary{TKey,TValue}"/> with the specified key or the default value of + /// <typeparamref name="TValue"/>, if the operation failed.</param> + /// <returns>true if the key was found in the <see cref="ConcurrentDictionary{TKey,TValue}"/>; + /// otherwise, false.</returns> + /// <exception cref="T:System.ArgumentNullException"><paramref name="key"/> is a null reference + /// (Nothing in Visual Basic).</exception> + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + public bool TryGetValue(TKey key, out TValue value) + { + if (key == null) throw new ArgumentNullException("key"); + + int bucketNo, lockNoUnused; + + // We must capture the m_buckets field in a local variable. It is set to a new table on each table resize. + Tables tables = m_tables; + IEqualityComparer<TKey> comparer = tables.m_comparer; + GetBucketAndLockNo(comparer.GetHashCode(key), out bucketNo, out lockNoUnused, tables.m_buckets.Length, tables.m_locks.Length); + + // We can get away w/out a lock here. + // The Volatile.Read ensures that the load of the fields of 'n' doesn't move before the load from buckets[i]. + Node n = Volatile.Read<Node>(ref tables.m_buckets[bucketNo]); + + while (n != null) + { + if (comparer.Equals(n.m_key, key)) + { + value = n.m_value; + return true; + } + n = n.m_next; + } + + value = default(TValue); + return false; + } + + /// <summary> + /// Compares the existing value for the specified key with a specified value, and if they're equal, + /// updates the key with a third value. + /// </summary> + /// <param name="key">The key whose value is compared with <paramref name="comparisonValue"/> and + /// possibly replaced.</param> + /// <param name="newValue">The value that replaces the value of the element with <paramref + /// name="key"/> if the comparison results in equality.</param> + /// <param name="comparisonValue">The value that is compared to the value of the element with + /// <paramref name="key"/>.</param> + /// <returns>true if the value with <paramref name="key"/> was equal to <paramref + /// name="comparisonValue"/> and replaced with <paramref name="newValue"/>; otherwise, + /// false.</returns> + /// <exception cref="T:System.ArgumentNullException"><paramref name="key"/> is a null + /// reference.</exception> + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + public bool TryUpdate(TKey key, TValue newValue, TValue comparisonValue) + { + if (key == null) throw new ArgumentNullException("key"); + + IEqualityComparer<TValue> valueComparer = EqualityComparer<TValue>.Default; + + while (true) + { + int bucketNo; + int lockNo; + int hashcode; + + Tables tables = m_tables; + IEqualityComparer<TKey> comparer = tables.m_comparer; + + hashcode = comparer.GetHashCode(key); + GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length); + + lock (tables.m_locks[lockNo]) + { + // If the table just got resized, we may not be holding the right lock, and must retry. + // This should be a rare occurence. + if (tables != m_tables) + { + continue; + } + + // Try to find this key in the bucket + Node prev = null; + for (Node node = tables.m_buckets[bucketNo]; node != null; node = node.m_next) + { + Assert((prev == null && node == tables.m_buckets[bucketNo]) || prev.m_next == node); + if (comparer.Equals(node.m_key, key)) + { + if (valueComparer.Equals(node.m_value, comparisonValue)) + { + if (s_isValueWriteAtomic) + { + node.m_value = newValue; + } + else + { + Node newNode = new Node(node.m_key, newValue, hashcode, node.m_next); + + if (prev == null) + { + tables.m_buckets[bucketNo] = newNode; + } + else + { + prev.m_next = newNode; + } + } + + return true; + } + + return false; + } + + prev = node; + } + + //didn't find the key + return false; + } + } + } + + /// <summary> + /// Removes all keys and values from the <see cref="ConcurrentDictionary{TKey,TValue}"/>. + /// </summary> + public void Clear() + { + int locksAcquired = 0; + try + { + AcquireAllLocks(ref locksAcquired); + + Tables newTables = new Tables(new Node[DEFAULT_CAPACITY], m_tables.m_locks, new int[m_tables.m_countPerLock.Length], m_tables.m_comparer); + m_tables = newTables; + m_budget = Math.Max(1, newTables.m_buckets.Length / newTables.m_locks.Length); + } + finally + { + ReleaseLocks(0, locksAcquired); + } + } + + /// <summary> + /// Copies the elements of the <see cref="T:System.Collections.Generic.ICollection"/> to an array of + /// type <see cref="T:System.Collections.Generic.KeyValuePair{TKey,TValue}"/>, starting at the + /// specified array index. + /// </summary> + /// <param name="array">The one-dimensional array of type <see + /// cref="T:System.Collections.Generic.KeyValuePair{TKey,TValue}"/> + /// that is the destination of the <see + /// cref="T:System.Collections.Generic.KeyValuePair{TKey,TValue}"/> elements copied from the <see + /// cref="T:System.Collections.ICollection"/>. The array must have zero-based indexing.</param> + /// <param name="index">The zero-based index in <paramref name="array"/> at which copying + /// begins.</param> + /// <exception cref="T:System.ArgumentNullException"><paramref name="array"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="index"/> is less than + /// 0.</exception> + /// <exception cref="T:System.ArgumentException"><paramref name="index"/> is equal to or greater than + /// the length of the <paramref name="array"/>. -or- The number of elements in the source <see + /// cref="T:System.Collections.ICollection"/> + /// is greater than the available space from <paramref name="index"/> to the end of the destination + /// <paramref name="array"/>.</exception> + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "ConcurrencyCop just doesn't know about these locks")] + void ICollection<KeyValuePair<TKey, TValue>>.CopyTo(KeyValuePair<TKey, TValue>[] array, int index) + { + if (array == null) throw new ArgumentNullException("array"); + if (index < 0) throw new ArgumentOutOfRangeException("index", GetResource("ConcurrentDictionary_IndexIsNegative")); + + int locksAcquired = 0; + try + { + AcquireAllLocks(ref locksAcquired); + + int count = 0; + + for (int i = 0; i < m_tables.m_locks.Length && count >= 0; i++) + { + count += m_tables.m_countPerLock[i]; + } + + if (array.Length - count < index || count < 0) //"count" itself or "count + index" can overflow + { + throw new ArgumentException(GetResource("ConcurrentDictionary_ArrayNotLargeEnough")); + } + + CopyToPairs(array, index); + } + finally + { + ReleaseLocks(0, locksAcquired); + } + } + + /// <summary> + /// Copies the key and value pairs stored in the <see cref="ConcurrentDictionary{TKey,TValue}"/> to a + /// new array. + /// </summary> + /// <returns>A new array containing a snapshot of key and value pairs copied from the <see + /// cref="ConcurrentDictionary{TKey,TValue}"/>.</returns> + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "ConcurrencyCop just doesn't know about these locks")] + public KeyValuePair<TKey, TValue>[] ToArray() + { + int locksAcquired = 0; + try + { + AcquireAllLocks(ref locksAcquired); + int count = 0; + checked + { + for (int i = 0; i < m_tables.m_locks.Length; i++) + { + count += m_tables.m_countPerLock[i]; + } + } + + KeyValuePair<TKey, TValue>[] array = new KeyValuePair<TKey, TValue>[count]; + + CopyToPairs(array, 0); + return array; + } + finally + { + ReleaseLocks(0, locksAcquired); + } + } + + /// <summary> + /// Copy dictionary contents to an array - shared implementation between ToArray and CopyTo. + /// + /// Important: the caller must hold all locks in m_locks before calling CopyToPairs. + /// </summary> + private void CopyToPairs(KeyValuePair<TKey, TValue>[] array, int index) + { + Node[] buckets = m_tables.m_buckets; + for (int i = 0; i < buckets.Length; i++) + { + for (Node current = buckets[i]; current != null; current = current.m_next) + { + array[index] = new KeyValuePair<TKey, TValue>(current.m_key, current.m_value); + index++; //this should never flow, CopyToPairs is only called when there's no overflow risk + } + } + } + + /// <summary> + /// Copy dictionary contents to an array - shared implementation between ToArray and CopyTo. + /// + /// Important: the caller must hold all locks in m_locks before calling CopyToEntries. + /// </summary> + private void CopyToEntries(DictionaryEntry[] array, int index) + { + Node[] buckets = m_tables.m_buckets; + for (int i = 0; i < buckets.Length; i++) + { + for (Node current = buckets[i]; current != null; current = current.m_next) + { + array[index] = new DictionaryEntry(current.m_key, current.m_value); + index++; //this should never flow, CopyToEntries is only called when there's no overflow risk + } + } + } + + /// <summary> + /// Copy dictionary contents to an array - shared implementation between ToArray and CopyTo. + /// + /// Important: the caller must hold all locks in m_locks before calling CopyToObjects. + /// </summary> + private void CopyToObjects(object[] array, int index) + { + Node[] buckets = m_tables.m_buckets; + for (int i = 0; i < buckets.Length; i++) + { + for (Node current = buckets[i]; current != null; current = current.m_next) + { + array[index] = new KeyValuePair<TKey, TValue>(current.m_key, current.m_value); + index++; //this should never flow, CopyToObjects is only called when there's no overflow risk + } + } + } + + /// <summary>Returns an enumerator that iterates through the <see + /// cref="ConcurrentDictionary{TKey,TValue}"/>.</summary> + /// <returns>An enumerator for the <see cref="ConcurrentDictionary{TKey,TValue}"/>.</returns> + /// <remarks> + /// The enumerator returned from the dictionary is safe to use concurrently with + /// reads and writes to the dictionary, however it does not represent a moment-in-time snapshot + /// of the dictionary. The contents exposed through the enumerator may contain modifications + /// made to the dictionary after <see cref="GetEnumerator"/> was called. + /// </remarks> + public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator() + { + Node[] buckets = m_tables.m_buckets; + + for (int i = 0; i < buckets.Length; i++) + { + // The Volatile.Read ensures that the load of the fields of 'current' doesn't move before the load from buckets[i]. + Node current = Volatile.Read<Node>(ref buckets[i]); + + while (current != null) + { + yield return new KeyValuePair<TKey, TValue>(current.m_key, current.m_value); + current = current.m_next; + } + } + } + + /// <summary> + /// Shared internal implementation for inserts and updates. + /// If key exists, we always return false; and if updateIfExists == true we force update with value; + /// If key doesn't exist, we always add value and return true; + /// </summary> + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + private bool TryAddInternal(TKey key, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue) + { + while (true) + { + int bucketNo, lockNo; + int hashcode; + + Tables tables = m_tables; + IEqualityComparer<TKey> comparer = tables.m_comparer; + hashcode = comparer.GetHashCode(key); + GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length); + + bool resizeDesired = false; + bool lockTaken = false; +#if FEATURE_RANDOMIZED_STRING_HASHING +#if !FEATURE_CORECLR + bool resizeDueToCollisions = false; +#endif // !FEATURE_CORECLR +#endif + + try + { + if (acquireLock) + Monitor.Enter(tables.m_locks[lockNo], ref lockTaken); + + // If the table just got resized, we may not be holding the right lock, and must retry. + // This should be a rare occurence. + if (tables != m_tables) + { + continue; + } + +#if FEATURE_RANDOMIZED_STRING_HASHING +#if !FEATURE_CORECLR + int collisionCount = 0; +#endif // !FEATURE_CORECLR +#endif + + // Try to find this key in the bucket + Node prev = null; + for (Node node = tables.m_buckets[bucketNo]; node != null; node = node.m_next) + { + Assert((prev == null && node == tables.m_buckets[bucketNo]) || prev.m_next == node); + if (comparer.Equals(node.m_key, key)) + { + // The key was found in the dictionary. If updates are allowed, update the value for that key. + // We need to create a new node for the update, in order to support TValue types that cannot + // be written atomically, since lock-free reads may be happening concurrently. + if (updateIfExists) + { + if (s_isValueWriteAtomic) + { + node.m_value = value; + } + else + { + Node newNode = new Node(node.m_key, value, hashcode, node.m_next); + if (prev == null) + { + tables.m_buckets[bucketNo] = newNode; + } + else + { + prev.m_next = newNode; + } + } + resultingValue = value; + } + else + { + resultingValue = node.m_value; + } + return false; + } + prev = node; + +#if FEATURE_RANDOMIZED_STRING_HASHING +#if !FEATURE_CORECLR + collisionCount++; +#endif // !FEATURE_CORECLR +#endif + } + +#if FEATURE_RANDOMIZED_STRING_HASHING +#if !FEATURE_CORECLR + if(collisionCount > HashHelpers.HashCollisionThreshold && HashHelpers.IsWellKnownEqualityComparer(comparer)) + { + resizeDesired = true; + resizeDueToCollisions = true; + } +#endif // !FEATURE_CORECLR +#endif + + // The key was not found in the bucket. Insert the key-value pair. + Volatile.Write<Node>(ref tables.m_buckets[bucketNo], new Node(key, value, hashcode, tables.m_buckets[bucketNo])); + checked + { + tables.m_countPerLock[lockNo]++; + } + + // + // If the number of elements guarded by this lock has exceeded the budget, resize the bucket table. + // It is also possible that GrowTable will increase the budget but won't resize the bucket table. + // That happens if the bucket table is found to be poorly utilized due to a bad hash function. + // + if (tables.m_countPerLock[lockNo] > m_budget) + { + resizeDesired = true; + } + } + finally + { + if (lockTaken) + Monitor.Exit(tables.m_locks[lockNo]); + } + + // + // The fact that we got here means that we just performed an insertion. If necessary, we will grow the table. + // + // Concurrency notes: + // - Notice that we are not holding any locks at when calling GrowTable. This is necessary to prevent deadlocks. + // - As a result, it is possible that GrowTable will be called unnecessarily. But, GrowTable will obtain lock 0 + // and then verify that the table we passed to it as the argument is still the current table. + // + if (resizeDesired) + { +#if FEATURE_RANDOMIZED_STRING_HASHING +#if !FEATURE_CORECLR + if (resizeDueToCollisions) + { + GrowTable(tables, (IEqualityComparer<TKey>)HashHelpers.GetRandomizedEqualityComparer(comparer), true, m_keyRehashCount); + } + else +#endif // !FEATURE_CORECLR + { + GrowTable(tables, tables.m_comparer, false, m_keyRehashCount); + } +#else + GrowTable(tables, tables.m_comparer, false, m_keyRehashCount); +#endif + } + + resultingValue = value; + return true; + } + } + + /// <summary> + /// Gets or sets the value associated with the specified key. + /// </summary> + /// <param name="key">The key of the value to get or set.</param> + /// <value>The value associated with the specified key. If the specified key is not found, a get + /// operation throws a + /// <see cref="T:Sytem.Collections.Generic.KeyNotFoundException"/>, and a set operation creates a new + /// element with the specified key.</value> + /// <exception cref="T:System.ArgumentNullException"><paramref name="key"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="T:System.Collections.Generic.KeyNotFoundException">The property is retrieved and + /// <paramref name="key"/> + /// does not exist in the collection.</exception> + public TValue this[TKey key] + { + get + { + TValue value; + if (!TryGetValue(key, out value)) + { + throw new KeyNotFoundException(); + } + return value; + } + set + { + if (key == null) throw new ArgumentNullException("key"); + TValue dummy; + TryAddInternal(key, value, true, true, out dummy); + } + } + + /// <summary> + /// Gets the number of key/value pairs contained in the <see + /// cref="ConcurrentDictionary{TKey,TValue}"/>. + /// </summary> + /// <exception cref="T:System.OverflowException">The dictionary contains too many + /// elements.</exception> + /// <value>The number of key/value paris contained in the <see + /// cref="ConcurrentDictionary{TKey,TValue}"/>.</value> + /// <remarks>Count has snapshot semantics and represents the number of items in the <see + /// cref="ConcurrentDictionary{TKey,TValue}"/> + /// at the moment when Count was accessed.</remarks> + public int Count + { + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "ConcurrencyCop just doesn't know about these locks")] + get + { + int count = 0; + + int acquiredLocks = 0; + try + { + // Acquire all locks + AcquireAllLocks(ref acquiredLocks); + + // Compute the count, we allow overflow + for (int i = 0; i < m_tables.m_countPerLock.Length; i++) + { + count += m_tables.m_countPerLock[i]; + } + + } + finally + { + // Release locks that have been acquired earlier + ReleaseLocks(0, acquiredLocks); + } + + return count; + } + } + + /// <summary> + /// Adds a key/value pair to the <see cref="ConcurrentDictionary{TKey,TValue}"/> + /// if the key does not already exist. + /// </summary> + /// <param name="key">The key of the element to add.</param> + /// <param name="valueFactory">The function used to generate a value for the key</param> + /// <exception cref="T:System.ArgumentNullException"><paramref name="key"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="T:System.ArgumentNullException"><paramref name="valueFactory"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="T:System.OverflowException">The dictionary contains too many + /// elements.</exception> + /// <returns>The value for the key. This will be either the existing value for the key if the + /// key is already in the dictionary, or the new value for the key as returned by valueFactory + /// if the key was not in the dictionary.</returns> + public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory) + { + if (key == null) throw new ArgumentNullException("key"); + if (valueFactory == null) throw new ArgumentNullException("valueFactory"); + + TValue resultingValue; + if (TryGetValue(key, out resultingValue)) + { + return resultingValue; + } + TryAddInternal(key, valueFactory(key), false, true, out resultingValue); + return resultingValue; + } + + /// <summary> + /// Adds a key/value pair to the <see cref="ConcurrentDictionary{TKey,TValue}"/> + /// if the key does not already exist. + /// </summary> + /// <param name="key">The key of the element to add.</param> + /// <param name="value">the value to be added, if the key does not already exist</param> + /// <exception cref="T:System.ArgumentNullException"><paramref name="key"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="T:System.OverflowException">The dictionary contains too many + /// elements.</exception> + /// <returns>The value for the key. This will be either the existing value for the key if the + /// key is already in the dictionary, or the new value if the key was not in the dictionary.</returns> + public TValue GetOrAdd(TKey key, TValue value) + { + if (key == null) throw new ArgumentNullException("key"); + + TValue resultingValue; + TryAddInternal(key, value, false, true, out resultingValue); + return resultingValue; + } + + /// <summary> + /// Adds a key/value pair to the <see cref="ConcurrentDictionary{TKey,TValue}"/> if the key does not already + /// exist, or updates a key/value pair in the <see cref="ConcurrentDictionary{TKey,TValue}"/> if the key + /// already exists. + /// </summary> + /// <param name="key">The key to be added or whose value should be updated</param> + /// <param name="addValueFactory">The function used to generate a value for an absent key</param> + /// <param name="updateValueFactory">The function used to generate a new value for an existing key + /// based on the key's existing value</param> + /// <exception cref="T:System.ArgumentNullException"><paramref name="key"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="T:System.ArgumentNullException"><paramref name="addValueFactory"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="T:System.ArgumentNullException"><paramref name="updateValueFactory"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="T:System.OverflowException">The dictionary contains too many + /// elements.</exception> + /// <returns>The new value for the key. This will be either be the result of addValueFactory (if the key was + /// absent) or the result of updateValueFactory (if the key was present).</returns> + public TValue AddOrUpdate(TKey key, Func<TKey, TValue> addValueFactory, Func<TKey, TValue, TValue> updateValueFactory) + { + if (key == null) throw new ArgumentNullException("key"); + if (addValueFactory == null) throw new ArgumentNullException("addValueFactory"); + if (updateValueFactory == null) throw new ArgumentNullException("updateValueFactory"); + + TValue newValue, resultingValue; + while (true) + { + TValue oldValue; + if (TryGetValue(key, out oldValue)) + //key exists, try to update + { + newValue = updateValueFactory(key, oldValue); + if (TryUpdate(key, newValue, oldValue)) + { + return newValue; + } + } + else //try add + { + newValue = addValueFactory(key); + if (TryAddInternal(key, newValue, false, true, out resultingValue)) + { + return resultingValue; + } + } + } + } + + /// <summary> + /// Adds a key/value pair to the <see cref="ConcurrentDictionary{TKey,TValue}"/> if the key does not already + /// exist, or updates a key/value pair in the <see cref="ConcurrentDictionary{TKey,TValue}"/> if the key + /// already exists. + /// </summary> + /// <param name="key">The key to be added or whose value should be updated</param> + /// <param name="addValue">The value to be added for an absent key</param> + /// <param name="updateValueFactory">The function used to generate a new value for an existing key based on + /// the key's existing value</param> + /// <exception cref="T:System.ArgumentNullException"><paramref name="key"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="T:System.ArgumentNullException"><paramref name="updateValueFactory"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="T:System.OverflowException">The dictionary contains too many + /// elements.</exception> + /// <returns>The new value for the key. This will be either be the result of addValueFactory (if the key was + /// absent) or the result of updateValueFactory (if the key was present).</returns> + public TValue AddOrUpdate(TKey key, TValue addValue, Func<TKey, TValue, TValue> updateValueFactory) + { + if (key == null) throw new ArgumentNullException("key"); + if (updateValueFactory == null) throw new ArgumentNullException("updateValueFactory"); + TValue newValue, resultingValue; + while (true) + { + TValue oldValue; + if (TryGetValue(key, out oldValue)) + //key exists, try to update + { + newValue = updateValueFactory(key, oldValue); + if (TryUpdate(key, newValue, oldValue)) + { + return newValue; + } + } + else //try add + { + if (TryAddInternal(key, addValue, false, true, out resultingValue)) + { + return resultingValue; + } + } + } + } + + + + /// <summary> + /// Gets a value that indicates whether the <see cref="ConcurrentDictionary{TKey,TValue}"/> is empty. + /// </summary> + /// <value>true if the <see cref="ConcurrentDictionary{TKey,TValue}"/> is empty; otherwise, + /// false.</value> + public bool IsEmpty + { + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "ConcurrencyCop just doesn't know about these locks")] + get + { + int acquiredLocks = 0; + try + { + // Acquire all locks + AcquireAllLocks(ref acquiredLocks); + + for (int i = 0; i < m_tables.m_countPerLock.Length; i++) + { + if (m_tables.m_countPerLock[i] != 0) + { + return false; + } + } + } + finally + { + // Release locks that have been acquired earlier + ReleaseLocks(0, acquiredLocks); + } + + return true; + } + } + + #region IDictionary<TKey,TValue> members + + /// <summary> + /// Adds the specified key and value to the <see + /// cref="T:System.Collections.Generic.IDictionary{TKey,TValue}"/>. + /// </summary> + /// <param name="key">The object to use as the key of the element to add.</param> + /// <param name="value">The object to use as the value of the element to add.</param> + /// <exception cref="T:System.ArgumentNullException"><paramref name="key"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="T:System.OverflowException">The dictionary contains too many + /// elements.</exception> + /// <exception cref="T:System.ArgumentException"> + /// An element with the same key already exists in the <see + /// cref="ConcurrentDictionary{TKey,TValue}"/>.</exception> + void IDictionary<TKey, TValue>.Add(TKey key, TValue value) + { + if (!TryAdd(key, value)) + { + throw new ArgumentException(GetResource("ConcurrentDictionary_KeyAlreadyExisted")); + } + } + + /// <summary> + /// Removes the element with the specified key from the <see + /// cref="T:System.Collections.Generic.IDictionary{TKey,TValue}"/>. + /// </summary> + /// <param name="key">The key of the element to remove.</param> + /// <returns>true if the element is successfully remove; otherwise false. This method also returns + /// false if + /// <paramref name="key"/> was not found in the original <see + /// cref="T:System.Collections.Generic.IDictionary{TKey,TValue}"/>. + /// </returns> + /// <exception cref="T:System.ArgumentNullException"><paramref name="key"/> is a null reference + /// (Nothing in Visual Basic).</exception> + bool IDictionary<TKey, TValue>.Remove(TKey key) + { + TValue throwAwayValue; + return TryRemove(key, out throwAwayValue); + } + + /// <summary> + /// Gets a collection containing the keys in the <see + /// cref="T:System.Collections.Generic.Dictionary{TKey,TValue}"/>. + /// </summary> + /// <value>An <see cref="T:System.Collections.Generic.ICollection{TKey}"/> containing the keys in the + /// <see cref="T:System.Collections.Generic.Dictionary{TKey,TValue}"/>.</value> + public ICollection<TKey> Keys + { + get { return GetKeys(); } + } + + /// <summary> + /// Gets an <see cref="T:System.Collections.Generic.IEnumerable{TKey}"/> containing the keys of + /// the <see cref="T:System.Collections.Generic.IReadOnlyDictionary{TKey,TValue}"/>. + /// </summary> + /// <value>An <see cref="T:System.Collections.Generic.IEnumerable{TKey}"/> containing the keys of + /// the <see cref="T:System.Collections.Generic.IReadOnlyDictionary{TKey,TValue}"/>.</value> + IEnumerable<TKey> IReadOnlyDictionary<TKey, TValue>.Keys + { + get { return GetKeys(); } + } + + /// <summary> + /// Gets a collection containing the values in the <see + /// cref="T:System.Collections.Generic.Dictionary{TKey,TValue}"/>. + /// </summary> + /// <value>An <see cref="T:System.Collections.Generic.ICollection{TValue}"/> containing the values in + /// the + /// <see cref="T:System.Collections.Generic.Dictionary{TKey,TValue}"/>.</value> + public ICollection<TValue> Values + { + get { return GetValues(); } + } + + /// <summary> + /// Gets an <see cref="T:System.Collections.Generic.IEnumerable{TValue}"/> containing the values + /// in the <see cref="T:System.Collections.Generic.IReadOnlyDictionary{TKey,TValue}"/>. + /// </summary> + /// <value>An <see cref="T:System.Collections.Generic.IEnumerable{TValue}"/> containing the + /// values in the <see cref="T:System.Collections.Generic.IReadOnlyDictionary{TKey,TValue}"/>.</value> + IEnumerable<TValue> IReadOnlyDictionary<TKey, TValue>.Values + { + get { return GetValues(); } + } + #endregion + + #region ICollection<KeyValuePair<TKey,TValue>> Members + + /// <summary> + /// Adds the specified value to the <see cref="T:System.Collections.Generic.ICollection{TValue}"/> + /// with the specified key. + /// </summary> + /// <param name="keyValuePair">The <see cref="T:System.Collections.Generic.KeyValuePair{TKey,TValue}"/> + /// structure representing the key and value to add to the <see + /// cref="T:System.Collections.Generic.Dictionary{TKey,TValue}"/>.</param> + /// <exception cref="T:System.ArgumentNullException">The <paramref name="keyValuePair"/> of <paramref + /// name="keyValuePair"/> is null.</exception> + /// <exception cref="T:System.OverflowException">The <see + /// cref="T:System.Collections.Generic.Dictionary{TKey,TValue}"/> + /// contains too many elements.</exception> + /// <exception cref="T:System.ArgumentException">An element with the same key already exists in the + /// <see cref="T:System.Collections.Generic.Dictionary{TKey,TValue}"/></exception> + void ICollection<KeyValuePair<TKey, TValue>>.Add(KeyValuePair<TKey, TValue> keyValuePair) + { + ((IDictionary<TKey, TValue>)this).Add(keyValuePair.Key, keyValuePair.Value); + } + + /// <summary> + /// Determines whether the <see cref="T:System.Collections.Generic.ICollection{TKey,TValue}"/> + /// contains a specific key and value. + /// </summary> + /// <param name="keyValuePair">The <see cref="T:System.Collections.Generic.KeyValuePair{TKey,TValue}"/> + /// structure to locate in the <see + /// cref="T:System.Collections.Generic.ICollection{TValue}"/>.</param> + /// <returns>true if the <paramref name="keyValuePair"/> is found in the <see + /// cref="T:System.Collections.Generic.ICollection{TKey,TValue}"/>; otherwise, false.</returns> + bool ICollection<KeyValuePair<TKey, TValue>>.Contains(KeyValuePair<TKey, TValue> keyValuePair) + { + TValue value; + if (!TryGetValue(keyValuePair.Key, out value)) + { + return false; + } + return EqualityComparer<TValue>.Default.Equals(value, keyValuePair.Value); + } + + /// <summary> + /// Gets a value indicating whether the dictionary is read-only. + /// </summary> + /// <value>true if the <see cref="T:System.Collections.Generic.ICollection{TKey,TValue}"/> is + /// read-only; otherwise, false. For <see + /// cref="T:System.Collections.Generic.Dictionary{TKey,TValue}"/>, this property always returns + /// false.</value> + bool ICollection<KeyValuePair<TKey, TValue>>.IsReadOnly + { + get { return false; } + } + + /// <summary> + /// Removes a key and value from the dictionary. + /// </summary> + /// <param name="keyValuePair">The <see + /// cref="T:System.Collections.Generic.KeyValuePair{TKey,TValue}"/> + /// structure representing the key and value to remove from the <see + /// cref="T:System.Collections.Generic.Dictionary{TKey,TValue}"/>.</param> + /// <returns>true if the key and value represented by <paramref name="keyValuePair"/> is successfully + /// found and removed; otherwise, false.</returns> + /// <exception cref="T:System.ArgumentNullException">The Key property of <paramref + /// name="keyValuePair"/> is a null reference (Nothing in Visual Basic).</exception> + bool ICollection<KeyValuePair<TKey, TValue>>.Remove(KeyValuePair<TKey, TValue> keyValuePair) + { + if (keyValuePair.Key == null) throw new ArgumentNullException(GetResource("ConcurrentDictionary_ItemKeyIsNull")); + + TValue throwAwayValue; + return TryRemoveInternal(keyValuePair.Key, out throwAwayValue, true, keyValuePair.Value); + } + + #endregion + + #region IEnumerable Members + + /// <summary>Returns an enumerator that iterates through the <see + /// cref="ConcurrentDictionary{TKey,TValue}"/>.</summary> + /// <returns>An enumerator for the <see cref="ConcurrentDictionary{TKey,TValue}"/>.</returns> + /// <remarks> + /// The enumerator returned from the dictionary is safe to use concurrently with + /// reads and writes to the dictionary, however it does not represent a moment-in-time snapshot + /// of the dictionary. The contents exposed through the enumerator may contain modifications + /// made to the dictionary after <see cref="GetEnumerator"/> was called. + /// </remarks> + IEnumerator IEnumerable.GetEnumerator() + { + return ((ConcurrentDictionary<TKey, TValue>)this).GetEnumerator(); + } + + #endregion + + #region IDictionary Members + + /// <summary> + /// Adds the specified key and value to the dictionary. + /// </summary> + /// <param name="key">The object to use as the key.</param> + /// <param name="value">The object to use as the value.</param> + /// <exception cref="T:System.ArgumentNullException"><paramref name="key"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="T:System.OverflowException">The dictionary contains too many + /// elements.</exception> + /// <exception cref="T:System.ArgumentException"> + /// <paramref name="key"/> is of a type that is not assignable to the key type <typeparamref + /// name="TKey"/> of the <see cref="T:System.Collections.Generic.Dictionary{TKey,TValue}"/>. -or- + /// <paramref name="value"/> is of a type that is not assignable to <typeparamref name="TValue"/>, + /// the type of values in the <see cref="T:System.Collections.Generic.Dictionary{TKey,TValue}"/>. + /// -or- A value with the same key already exists in the <see + /// cref="T:System.Collections.Generic.Dictionary{TKey,TValue}"/>. + /// </exception> + void IDictionary.Add(object key, object value) + { + if (key == null) throw new ArgumentNullException("key"); + if (!(key is TKey)) throw new ArgumentException(GetResource("ConcurrentDictionary_TypeOfKeyIncorrect")); + + TValue typedValue; + try + { + typedValue = (TValue)value; + } + catch (InvalidCastException) + { + throw new ArgumentException(GetResource("ConcurrentDictionary_TypeOfValueIncorrect")); + } + + ((IDictionary<TKey, TValue>)this).Add((TKey)key, typedValue); + } + + /// <summary> + /// Gets whether the <see cref="T:System.Collections.Generic.IDictionary{TKey,TValue}"/> contains an + /// element with the specified key. + /// </summary> + /// <param name="key">The key to locate in the <see + /// cref="T:System.Collections.Generic.IDictionary{TKey,TValue}"/>.</param> + /// <returns>true if the <see cref="T:System.Collections.Generic.IDictionary{TKey,TValue}"/> contains + /// an element with the specified key; otherwise, false.</returns> + /// <exception cref="T:System.ArgumentNullException"> <paramref name="key"/> is a null reference + /// (Nothing in Visual Basic).</exception> + bool IDictionary.Contains(object key) + { + if (key == null) throw new ArgumentNullException("key"); + + return (key is TKey) && ((ConcurrentDictionary<TKey, TValue>)this).ContainsKey((TKey)key); + } + + /// <summary>Provides an <see cref="T:System.Collections.Generics.IDictionaryEnumerator"/> for the + /// <see cref="T:System.Collections.Generic.IDictionary{TKey,TValue}"/>.</summary> + /// <returns>An <see cref="T:System.Collections.Generics.IDictionaryEnumerator"/> for the <see + /// cref="T:System.Collections.Generic.IDictionary{TKey,TValue}"/>.</returns> + IDictionaryEnumerator IDictionary.GetEnumerator() + { + return new DictionaryEnumerator(this); + } + + /// <summary> + /// Gets a value indicating whether the <see + /// cref="T:System.Collections.Generic.IDictionary{TKey,TValue}"/> has a fixed size. + /// </summary> + /// <value>true if the <see cref="T:System.Collections.Generic.IDictionary{TKey,TValue}"/> has a + /// fixed size; otherwise, false. For <see + /// cref="T:System.Collections.Generic.ConcurrentDictionary{TKey,TValue}"/>, this property always + /// returns false.</value> + bool IDictionary.IsFixedSize + { + get { return false; } + } + + /// <summary> + /// Gets a value indicating whether the <see + /// cref="T:System.Collections.Generic.IDictionary{TKey,TValue}"/> is read-only. + /// </summary> + /// <value>true if the <see cref="T:System.Collections.Generic.IDictionary{TKey,TValue}"/> is + /// read-only; otherwise, false. For <see + /// cref="T:System.Collections.Generic.ConcurrentDictionary{TKey,TValue}"/>, this property always + /// returns false.</value> + bool IDictionary.IsReadOnly + { + get { return false; } + } + + /// <summary> + /// Gets an <see cref="T:System.Collections.ICollection"/> containing the keys of the <see + /// cref="T:System.Collections.Generic.IDictionary{TKey,TValue}"/>. + /// </summary> + /// <value>An <see cref="T:System.Collections.ICollection"/> containing the keys of the <see + /// cref="T:System.Collections.Generic.IDictionary{TKey,TValue}"/>.</value> + ICollection IDictionary.Keys + { + get { return GetKeys(); } + } + + /// <summary> + /// Removes the element with the specified key from the <see + /// cref="T:System.Collections.IDictionary"/>. + /// </summary> + /// <param name="key">The key of the element to remove.</param> + /// <exception cref="T:System.ArgumentNullException"><paramref name="key"/> is a null reference + /// (Nothing in Visual Basic).</exception> + void IDictionary.Remove(object key) + { + if (key == null) throw new ArgumentNullException("key"); + + TValue throwAwayValue; + if (key is TKey) + { + this.TryRemove((TKey)key, out throwAwayValue); + } + } + + /// <summary> + /// Gets an <see cref="T:System.Collections.ICollection"/> containing the values in the <see + /// cref="T:System.Collections.IDictionary"/>. + /// </summary> + /// <value>An <see cref="T:System.Collections.ICollection"/> containing the values in the <see + /// cref="T:System.Collections.IDictionary"/>.</value> + ICollection IDictionary.Values + { + get { return GetValues(); } + } + + /// <summary> + /// Gets or sets the value associated with the specified key. + /// </summary> + /// <param name="key">The key of the value to get or set.</param> + /// <value>The value associated with the specified key, or a null reference (Nothing in Visual Basic) + /// if <paramref name="key"/> is not in the dictionary or <paramref name="key"/> is of a type that is + /// not assignable to the key type <typeparamref name="TKey"/> of the <see + /// cref="T:System.Collections.Generic.ConcurrentDictionary{TKey,TValue}"/>.</value> + /// <exception cref="T:System.ArgumentNullException"><paramref name="key"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="T:System.ArgumentException"> + /// A value is being assigned, and <paramref name="key"/> is of a type that is not assignable to the + /// key type <typeparamref name="TKey"/> of the <see + /// cref="T:System.Collections.Generic.ConcurrentDictionary{TKey,TValue}"/>. -or- A value is being + /// assigned, and <paramref name="key"/> is of a type that is not assignable to the value type + /// <typeparamref name="TValue"/> of the <see + /// cref="T:System.Collections.Generic.ConcurrentDictionary{TKey,TValue}"/> + /// </exception> + object IDictionary.this[object key] + { + get + { + if (key == null) throw new ArgumentNullException("key"); + + TValue value; + if (key is TKey && this.TryGetValue((TKey)key, out value)) + { + return value; + } + + return null; + } + set + { + if (key == null) throw new ArgumentNullException("key"); + + if (!(key is TKey)) throw new ArgumentException(GetResource("ConcurrentDictionary_TypeOfKeyIncorrect")); + if (!(value is TValue)) throw new ArgumentException(GetResource("ConcurrentDictionary_TypeOfValueIncorrect")); + + ((ConcurrentDictionary<TKey, TValue>)this)[(TKey)key] = (TValue)value; + } + } + + #endregion + + #region ICollection Members + + /// <summary> + /// Copies the elements of the <see cref="T:System.Collections.ICollection"/> to an array, starting + /// at the specified array index. + /// </summary> + /// <param name="array">The one-dimensional array that is the destination of the elements copied from + /// the <see cref="T:System.Collections.ICollection"/>. The array must have zero-based + /// indexing.</param> + /// <param name="index">The zero-based index in <paramref name="array"/> at which copying + /// begins.</param> + /// <exception cref="T:System.ArgumentNullException"><paramref name="array"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="index"/> is less than + /// 0.</exception> + /// <exception cref="T:System.ArgumentException"><paramref name="index"/> is equal to or greater than + /// the length of the <paramref name="array"/>. -or- The number of elements in the source <see + /// cref="T:System.Collections.ICollection"/> + /// is greater than the available space from <paramref name="index"/> to the end of the destination + /// <paramref name="array"/>.</exception> + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "ConcurrencyCop just doesn't know about these locks")] + void ICollection.CopyTo(Array array, int index) + { + if (array == null) throw new ArgumentNullException("array"); + if (index < 0) throw new ArgumentOutOfRangeException("index", GetResource("ConcurrentDictionary_IndexIsNegative")); + + int locksAcquired = 0; + try + { + AcquireAllLocks(ref locksAcquired); + Tables tables = m_tables; + + int count = 0; + + for (int i = 0; i < tables.m_locks.Length && count >= 0; i++) + { + count += tables.m_countPerLock[i]; + } + + if (array.Length - count < index || count < 0) //"count" itself or "count + index" can overflow + { + throw new ArgumentException(GetResource("ConcurrentDictionary_ArrayNotLargeEnough")); + } + + // To be consistent with the behavior of ICollection.CopyTo() in Dictionary<TKey,TValue>, + // we recognize three types of target arrays: + // - an array of KeyValuePair<TKey, TValue> structs + // - an array of DictionaryEntry structs + // - an array of objects + + KeyValuePair<TKey, TValue>[] pairs = array as KeyValuePair<TKey, TValue>[]; + if (pairs != null) + { + CopyToPairs(pairs, index); + return; + } + + DictionaryEntry[] entries = array as DictionaryEntry[]; + if (entries != null) + { + CopyToEntries(entries, index); + return; + } + + object[] objects = array as object[]; + if (objects != null) + { + CopyToObjects(objects, index); + return; + } + + throw new ArgumentException(GetResource("ConcurrentDictionary_ArrayIncorrectType"), "array"); + } + finally + { + ReleaseLocks(0, locksAcquired); + } + } + + /// <summary> + /// Gets a value indicating whether access to the <see cref="T:System.Collections.ICollection"/> is + /// synchronized with the SyncRoot. + /// </summary> + /// <value>true if access to the <see cref="T:System.Collections.ICollection"/> is synchronized + /// (thread safe); otherwise, false. For <see + /// cref="T:System.Collections.Concurrent.ConcurrentDictionary{TKey,TValue}"/>, this property always + /// returns false.</value> + bool ICollection.IsSynchronized + { + get { return false; } + } + + /// <summary> + /// Gets an object that can be used to synchronize access to the <see + /// cref="T:System.Collections.ICollection"/>. This property is not supported. + /// </summary> + /// <exception cref="T:System.NotSupportedException">The SyncRoot property is not supported.</exception> + object ICollection.SyncRoot + { + get + { + throw new NotSupportedException(Environment.GetResourceString("ConcurrentCollection_SyncRoot_NotSupported")); + } + } + + #endregion + + /// <summary> + /// Replaces the bucket table with a larger one. To prevent multiple threads from resizing the + /// table as a result of a race condition, the Tables instance that holds the table of buckets deemed too + /// small is passed in as an argument to GrowTable(). GrowTable() obtains a lock, and then checks + /// the Tables instance has been replaced in the meantime or not. + /// The <paramref name="rehashCount"/> will be used to ensure that we don't do two subsequent resizes + /// because of a collision + /// </summary> + private void GrowTable(Tables tables, IEqualityComparer<TKey> newComparer, bool regenerateHashKeys, int rehashCount) + { + int locksAcquired = 0; + try + { + // The thread that first obtains m_locks[0] will be the one doing the resize operation + AcquireLocks(0, 1, ref locksAcquired); + + if (regenerateHashKeys && rehashCount == m_keyRehashCount) + { + // This method is called with regenerateHashKeys==true when we detected + // more than HashHelpers.HashCollisionThreshold collisions when adding a new element. + // In that case we are in the process of switching to another (randomized) comparer + // and we have to re-hash all the keys in the table. + // We are only going to do this if we did not just rehash the entire table while waiting for the lock + tables = m_tables; + } + else + { + // If we don't require a regeneration of hash keys we want to make sure we don't do work when + // we don't have to + if (tables != m_tables) + { + // We assume that since the table reference is different, it was already resized (or the budget + // was adjusted). If we ever decide to do table shrinking, or replace the table for other reasons, + // we will have to revisit this logic. + return; + } + + // Compute the (approx.) total size. Use an Int64 accumulation variable to avoid an overflow. + long approxCount = 0; + for (int i = 0; i < tables.m_countPerLock.Length; i++) + { + approxCount += tables.m_countPerLock[i]; + } + + // + // If the bucket array is too empty, double the budget instead of resizing the table + // + if (approxCount < tables.m_buckets.Length / 4) + { + m_budget = 2 * m_budget; + if (m_budget < 0) + { + m_budget = int.MaxValue; + } + + return; + } + } + // Compute the new table size. We find the smallest integer larger than twice the previous table size, and not divisible by + // 2,3,5 or 7. We can consider a different table-sizing policy in the future. + int newLength = 0; + bool maximizeTableSize = false; + try + { + checked + { + // Double the size of the buckets table and add one, so that we have an odd integer. + newLength = tables.m_buckets.Length * 2 + 1; + + // Now, we only need to check odd integers, and find the first that is not divisible + // by 3, 5 or 7. + while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0) + { + newLength += 2; + } + + Assert(newLength % 2 != 0); + + if (newLength > Array.MaxArrayLength) + { + maximizeTableSize = true; + } + } + } + catch (OverflowException) + { + maximizeTableSize = true; + } + + if (maximizeTableSize) + { + newLength = Array.MaxArrayLength; + + // We want to make sure that GrowTable will not be called again, since table is at the maximum size. + // To achieve that, we set the budget to int.MaxValue. + // + // (There is one special case that would allow GrowTable() to be called in the future: + // calling Clear() on the ConcurrentDictionary will shrink the table and lower the budget.) + m_budget = int.MaxValue; + } + + // Now acquire all other locks for the table + AcquireLocks(1, tables.m_locks.Length, ref locksAcquired); + + object[] newLocks = tables.m_locks; + + // Add more locks + if (m_growLockArray && tables.m_locks.Length < MAX_LOCK_NUMBER) + { + newLocks = new object[tables.m_locks.Length * 2]; + Array.Copy(tables.m_locks, newLocks, tables.m_locks.Length); + + for (int i = tables.m_locks.Length; i < newLocks.Length; i++) + { + newLocks[i] = new object(); + } + } + + Node[] newBuckets = new Node[newLength]; + int[] newCountPerLock = new int[newLocks.Length]; + + // Copy all data into a new table, creating new nodes for all elements + for (int i = 0; i < tables.m_buckets.Length; i++) + { + Node current = tables.m_buckets[i]; + while (current != null) + { + Node next = current.m_next; + int newBucketNo, newLockNo; + int nodeHashCode = current.m_hashcode; + + if (regenerateHashKeys) + { + // Recompute the hash from the key + nodeHashCode = newComparer.GetHashCode(current.m_key); + } + + GetBucketAndLockNo(nodeHashCode, out newBucketNo, out newLockNo, newBuckets.Length, newLocks.Length); + + newBuckets[newBucketNo] = new Node(current.m_key, current.m_value, nodeHashCode, newBuckets[newBucketNo]); + + checked + { + newCountPerLock[newLockNo]++; + } + + current = next; + } + } + + // If this resize regenerated the hashkeys, increment the count + if (regenerateHashKeys) + { + // We use unchecked here because we don't want to throw an exception if + // an overflow happens + unchecked + { + m_keyRehashCount++; + } + } + + // Adjust the budget + m_budget = Math.Max(1, newBuckets.Length / newLocks.Length); + + // Replace tables with the new versions + m_tables = new Tables(newBuckets, newLocks, newCountPerLock, newComparer); + } + finally + { + // Release all locks that we took earlier + ReleaseLocks(0, locksAcquired); + } + } + + /// <summary> + /// Computes the bucket and lock number for a particular key. + /// </summary> + private void GetBucketAndLockNo( + int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount) + { + bucketNo = (hashcode & 0x7fffffff) % bucketCount; + lockNo = bucketNo % lockCount; + + Assert(bucketNo >= 0 && bucketNo < bucketCount); + Assert(lockNo >= 0 && lockNo < lockCount); + } + + /// <summary> + /// The number of concurrent writes for which to optimize by default. + /// </summary> + private static int DefaultConcurrencyLevel + { + + get { return DEFAULT_CONCURRENCY_MULTIPLIER * PlatformHelper.ProcessorCount; } + } + + /// <summary> + /// Acquires all locks for this hash table, and increments locksAcquired by the number + /// of locks that were successfully acquired. The locks are acquired in an increasing + /// order. + /// </summary> + private void AcquireAllLocks(ref int locksAcquired) + { +#if !FEATURE_CORECLR + if (CDSCollectionETWBCLProvider.Log.IsEnabled()) + { + CDSCollectionETWBCLProvider.Log.ConcurrentDictionary_AcquiringAllLocks(m_tables.m_buckets.Length); + } +#endif //!FEATURE_CORECLR + + // First, acquire lock 0 + AcquireLocks(0, 1, ref locksAcquired); + + // Now that we have lock 0, the m_locks array will not change (i.e., grow), + // and so we can safely read m_locks.Length. + AcquireLocks(1, m_tables.m_locks.Length, ref locksAcquired); + Assert(locksAcquired == m_tables.m_locks.Length); + } + + /// <summary> + /// Acquires a contiguous range of locks for this hash table, and increments locksAcquired + /// by the number of locks that were successfully acquired. The locks are acquired in an + /// increasing order. + /// </summary> + private void AcquireLocks(int fromInclusive, int toExclusive, ref int locksAcquired) + { + Assert(fromInclusive <= toExclusive); + object[] locks = m_tables.m_locks; + + for (int i = fromInclusive; i < toExclusive; i++) + { + bool lockTaken = false; + try + { + Monitor.Enter(locks[i], ref lockTaken); + } + finally + { + if (lockTaken) + { + locksAcquired++; + } + } + } + } + + /// <summary> + /// Releases a contiguous range of locks. + /// </summary> + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + private void ReleaseLocks(int fromInclusive, int toExclusive) + { + Assert(fromInclusive <= toExclusive); + + for (int i = fromInclusive; i < toExclusive; i++) + { + Monitor.Exit(m_tables.m_locks[i]); + } + } + + /// <summary> + /// Gets a collection containing the keys in the dictionary. + /// </summary> + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "ConcurrencyCop just doesn't know about these locks")] + private ReadOnlyCollection<TKey> GetKeys() + { + int locksAcquired = 0; + try + { + AcquireAllLocks(ref locksAcquired); + List<TKey> keys = new List<TKey>(); + + for (int i = 0; i < m_tables.m_buckets.Length; i++) + { + Node current = m_tables.m_buckets[i]; + while (current != null) + { + keys.Add(current.m_key); + current = current.m_next; + } + } + + return new ReadOnlyCollection<TKey>(keys); + } + finally + { + ReleaseLocks(0, locksAcquired); + } + } + + /// <summary> + /// Gets a collection containing the values in the dictionary. + /// </summary> + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "ConcurrencyCop just doesn't know about these locks")] + private ReadOnlyCollection<TValue> GetValues() + { + int locksAcquired = 0; + try + { + AcquireAllLocks(ref locksAcquired); + List<TValue> values = new List<TValue>(); + + for (int i = 0; i < m_tables.m_buckets.Length; i++) + { + Node current = m_tables.m_buckets[i]; + while (current != null) + { + values.Add(current.m_value); + current = current.m_next; + } + } + + return new ReadOnlyCollection<TValue>(values); + } + finally + { + ReleaseLocks(0, locksAcquired); + } + } + + /// <summary> + /// A helper method for asserts. + /// </summary> + [Conditional("DEBUG")] + private void Assert(bool condition) + { + Contract.Assert(condition); + } + + /// <summary> + /// A helper function to obtain the string for a particular resource key. + /// </summary> + /// <param name="key"></param> + /// <returns></returns> + private string GetResource(string key) + { + Assert(key != null); + + return Environment.GetResourceString(key); + } + + /// <summary> + /// A node in a singly-linked list representing a particular hash table bucket. + /// </summary> + private class Node + { + internal TKey m_key; + internal TValue m_value; + internal volatile Node m_next; + internal int m_hashcode; + + internal Node(TKey key, TValue value, int hashcode, Node next) + { + m_key = key; + m_value = value; + m_next = next; + m_hashcode = hashcode; + } + } + + /// <summary> + /// A private class to represent enumeration over the dictionary that implements the + /// IDictionaryEnumerator interface. + /// </summary> + private class DictionaryEnumerator : IDictionaryEnumerator + { + IEnumerator<KeyValuePair<TKey, TValue>> m_enumerator; // Enumerator over the dictionary. + + internal DictionaryEnumerator(ConcurrentDictionary<TKey, TValue> dictionary) + { + m_enumerator = dictionary.GetEnumerator(); + } + + public DictionaryEntry Entry + { + get { return new DictionaryEntry(m_enumerator.Current.Key, m_enumerator.Current.Value); } + } + + public object Key + { + get { return m_enumerator.Current.Key; } + } + + public object Value + { + get { return m_enumerator.Current.Value; } + } + + public object Current + { + get { return this.Entry; } + } + + public bool MoveNext() + { + return m_enumerator.MoveNext(); + } + + public void Reset() + { + m_enumerator.Reset(); + } + } + +#if !FEATURE_CORECLR + /// <summary> + /// Get the data array to be serialized + /// </summary> + [OnSerializing] + private void OnSerializing(StreamingContext context) + { + Tables tables = m_tables; + + // save the data into the serialization array to be saved + m_serializationArray = ToArray(); + m_serializationConcurrencyLevel = tables.m_locks.Length; + m_serializationCapacity = tables.m_buckets.Length; + m_comparer = (IEqualityComparer<TKey>)HashHelpers.GetEqualityComparerForSerialization(tables.m_comparer); + } + + /// <summary> + /// Construct the dictionary from a previously serialized one + /// </summary> + [OnDeserialized] + private void OnDeserialized(StreamingContext context) + { + KeyValuePair<TKey, TValue>[] array = m_serializationArray; + + var buckets = new Node[m_serializationCapacity]; + var countPerLock = new int[m_serializationConcurrencyLevel]; + + var locks = new object[m_serializationConcurrencyLevel]; + for (int i = 0; i < locks.Length; i++) + { + locks[i] = new object(); + } + + m_tables = new Tables(buckets, locks, countPerLock, m_comparer); + + InitializeFromCollection(array); + m_serializationArray = null; + + } +#endif + } +} diff --git a/src/mscorlib/src/System/Collections/Concurrent/ConcurrentQueue.cs b/src/mscorlib/src/System/Collections/Concurrent/ConcurrentQueue.cs new file mode 100644 index 0000000000..9164eadad1 --- /dev/null +++ b/src/mscorlib/src/System/Collections/Concurrent/ConcurrentQueue.cs @@ -0,0 +1,960 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. +#pragma warning disable 0420 + + +// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// +// +// +// A lock-free, concurrent queue primitive, and its associated debugger view type. +// +// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + +using System; +using System.Collections; +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.Contracts; +using System.Runtime.ConstrainedExecution; +using System.Runtime.InteropServices; +using System.Runtime.Serialization; +using System.Security; +using System.Security.Permissions; +using System.Threading; + +namespace System.Collections.Concurrent +{ + + /// <summary> + /// Represents a thread-safe first-in, first-out collection of objects. + /// </summary> + /// <typeparam name="T">Specifies the type of elements in the queue.</typeparam> + /// <remarks> + /// All public and protected members of <see cref="ConcurrentQueue{T}"/> are thread-safe and may be used + /// concurrently from multiple threads. + /// </remarks> + [ComVisible(false)] + [DebuggerDisplay("Count = {Count}")] + [DebuggerTypeProxy(typeof(SystemCollectionsConcurrent_ProducerConsumerCollectionDebugView<>))] + [HostProtection(Synchronization = true, ExternalThreading = true)] + [Serializable] + public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T> + { + //fields of ConcurrentQueue + [NonSerialized] + private volatile Segment m_head; + + [NonSerialized] + private volatile Segment m_tail; + + private T[] m_serializationArray; // Used for custom serialization. + + private const int SEGMENT_SIZE = 32; + + //number of snapshot takers, GetEnumerator(), ToList() and ToArray() operations take snapshot. + [NonSerialized] + internal volatile int m_numSnapshotTakers = 0; + + /// <summary> + /// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class. + /// </summary> + public ConcurrentQueue() + { + m_head = m_tail = new Segment(0, this); + } + + /// <summary> + /// Initializes the contents of the queue from an existing collection. + /// </summary> + /// <param name="collection">A collection from which to copy elements.</param> + private void InitializeFromCollection(IEnumerable<T> collection) + { + Segment localTail = new Segment(0, this);//use this local variable to avoid the extra volatile read/write. this is safe because it is only called from ctor + m_head = localTail; + + int index = 0; + foreach (T element in collection) + { + Contract.Assert(index >= 0 && index < SEGMENT_SIZE); + localTail.UnsafeAdd(element); + index++; + + if (index >= SEGMENT_SIZE) + { + localTail = localTail.UnsafeGrow(); + index = 0; + } + } + + m_tail = localTail; + } + + /// <summary> + /// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> + /// class that contains elements copied from the specified collection + /// </summary> + /// <param name="collection">The collection whose elements are copied to the new <see + /// cref="ConcurrentQueue{T}"/>.</param> + /// <exception cref="T:System.ArgumentNullException">The <paramref name="collection"/> argument is + /// null.</exception> + public ConcurrentQueue(IEnumerable<T> collection) + { + if (collection == null) + { + throw new ArgumentNullException("collection"); + } + + InitializeFromCollection(collection); + } + + /// <summary> + /// Get the data array to be serialized + /// </summary> + [OnSerializing] + private void OnSerializing(StreamingContext context) + { + // save the data into the serialization array to be saved + m_serializationArray = ToArray(); + } + + /// <summary> + /// Construct the queue from a previously seiralized one + /// </summary> + [OnDeserialized] + private void OnDeserialized(StreamingContext context) + { + Contract.Assert(m_serializationArray != null); + InitializeFromCollection(m_serializationArray); + m_serializationArray = null; + } + + /// <summary> + /// Copies the elements of the <see cref="T:System.Collections.ICollection"/> to an <see + /// cref="T:System.Array"/>, starting at a particular + /// <see cref="T:System.Array"/> index. + /// </summary> + /// <param name="array">The one-dimensional <see cref="T:System.Array">Array</see> that is the + /// destination of the elements copied from the + /// <see cref="T:System.Collections.Concurrent.ConcurrentBag"/>. The <see + /// cref="T:System.Array">Array</see> must have zero-based indexing.</param> + /// <param name="index">The zero-based index in <paramref name="array"/> at which copying + /// begins.</param> + /// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in + /// Visual Basic).</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than + /// zero.</exception> + /// <exception cref="ArgumentException"> + /// <paramref name="array"/> is multidimensional. -or- + /// <paramref name="array"/> does not have zero-based indexing. -or- + /// <paramref name="index"/> is equal to or greater than the length of the <paramref name="array"/> + /// -or- The number of elements in the source <see cref="T:System.Collections.ICollection"/> is + /// greater than the available space from <paramref name="index"/> to the end of the destination + /// <paramref name="array"/>. -or- The type of the source <see + /// cref="T:System.Collections.ICollection"/> cannot be cast automatically to the type of the + /// destination <paramref name="array"/>. + /// </exception> + void ICollection.CopyTo(Array array, int index) + { + // Validate arguments. + if (array == null) + { + throw new ArgumentNullException("array"); + } + + // We must be careful not to corrupt the array, so we will first accumulate an + // internal list of elements that we will then copy to the array. This requires + // some extra allocation, but is necessary since we don't know up front whether + // the array is sufficiently large to hold the stack's contents. + ((ICollection)ToList()).CopyTo(array, index); + } + + /// <summary> + /// Gets a value indicating whether access to the <see cref="T:System.Collections.ICollection"/> is + /// synchronized with the SyncRoot. + /// </summary> + /// <value>true if access to the <see cref="T:System.Collections.ICollection"/> is synchronized + /// with the SyncRoot; otherwise, false. For <see cref="ConcurrentQueue{T}"/>, this property always + /// returns false.</value> + bool ICollection.IsSynchronized + { + // Gets a value indicating whether access to this collection is synchronized. Always returns + // false. The reason is subtle. While access is in face thread safe, it's not the case that + // locking on the SyncRoot would have prevented concurrent pushes and pops, as this property + // would typically indicate; that's because we internally use CAS operations vs. true locks. + get { return false; } + } + + + /// <summary> + /// Gets an object that can be used to synchronize access to the <see + /// cref="T:System.Collections.ICollection"/>. This property is not supported. + /// </summary> + /// <exception cref="T:System.NotSupportedException">The SyncRoot property is not supported.</exception> + object ICollection.SyncRoot + { + get + { + throw new NotSupportedException(Environment.GetResourceString("ConcurrentCollection_SyncRoot_NotSupported")); + } + } + + /// <summary> + /// Returns an enumerator that iterates through a collection. + /// </summary> + /// <returns>An <see cref="T:System.Collections.IEnumerator"/> that can be used to iterate through the collection.</returns> + IEnumerator IEnumerable.GetEnumerator() + { + return ((IEnumerable<T>)this).GetEnumerator(); + } + + /// <summary> + /// Attempts to add an object to the <see + /// cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>. + /// </summary> + /// <param name="item">The object to add to the <see + /// cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>. The value can be a null + /// reference (Nothing in Visual Basic) for reference types. + /// </param> + /// <returns>true if the object was added successfully; otherwise, false.</returns> + /// <remarks>For <see cref="ConcurrentQueue{T}"/>, this operation will always add the object to the + /// end of the <see cref="ConcurrentQueue{T}"/> + /// and return true.</remarks> + bool IProducerConsumerCollection<T>.TryAdd(T item) + { + Enqueue(item); + return true; + } + + /// <summary> + /// Attempts to remove and return an object from the <see + /// cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>. + /// </summary> + /// <param name="item"> + /// When this method returns, if the operation was successful, <paramref name="item"/> contains the + /// object removed. If no object was available to be removed, the value is unspecified. + /// </param> + /// <returns>true if an element was removed and returned succesfully; otherwise, false.</returns> + /// <remarks>For <see cref="ConcurrentQueue{T}"/>, this operation will attempt to remove the object + /// from the beginning of the <see cref="ConcurrentQueue{T}"/>. + /// </remarks> + bool IProducerConsumerCollection<T>.TryTake(out T item) + { + return TryDequeue(out item); + } + + /// <summary> + /// Gets a value that indicates whether the <see cref="ConcurrentQueue{T}"/> is empty. + /// </summary> + /// <value>true if the <see cref="ConcurrentQueue{T}"/> is empty; otherwise, false.</value> + /// <remarks> + /// For determining whether the collection contains any items, use of this property is recommended + /// rather than retrieving the number of items from the <see cref="Count"/> property and comparing it + /// to 0. However, as this collection is intended to be accessed concurrently, it may be the case + /// that another thread will modify the collection after <see cref="IsEmpty"/> returns, thus invalidating + /// the result. + /// </remarks> + public bool IsEmpty + { + get + { + Segment head = m_head; + if (!head.IsEmpty) + //fast route 1: + //if current head is not empty, then queue is not empty + return false; + else if (head.Next == null) + //fast route 2: + //if current head is empty and it's the last segment + //then queue is empty + return true; + else + //slow route: + //current head is empty and it is NOT the last segment, + //it means another thread is growing new segment + { + SpinWait spin = new SpinWait(); + while (head.IsEmpty) + { + if (head.Next == null) + return true; + + spin.SpinOnce(); + head = m_head; + } + return false; + } + } + } + + /// <summary> + /// Copies the elements stored in the <see cref="ConcurrentQueue{T}"/> to a new array. + /// </summary> + /// <returns>A new array containing a snapshot of elements copied from the <see + /// cref="ConcurrentQueue{T}"/>.</returns> + public T[] ToArray() + { + return ToList().ToArray(); + } + + /// <summary> + /// Copies the <see cref="ConcurrentQueue{T}"/> elements to a new <see + /// cref="T:System.Collections.Generic.List{T}"/>. + /// </summary> + /// <returns>A new <see cref="T:System.Collections.Generic.List{T}"/> containing a snapshot of + /// elements copied from the <see cref="ConcurrentQueue{T}"/>.</returns> + private List<T> ToList() + { + // Increments the number of active snapshot takers. This increment must happen before the snapshot is + // taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it + // eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0. + Interlocked.Increment(ref m_numSnapshotTakers); + + List<T> list = new List<T>(); + try + { + //store head and tail positions in buffer, + Segment head, tail; + int headLow, tailHigh; + GetHeadTailPositions(out head, out tail, out headLow, out tailHigh); + + if (head == tail) + { + head.AddToList(list, headLow, tailHigh); + } + else + { + head.AddToList(list, headLow, SEGMENT_SIZE - 1); + Segment curr = head.Next; + while (curr != tail) + { + curr.AddToList(list, 0, SEGMENT_SIZE - 1); + curr = curr.Next; + } + //Add tail segment + tail.AddToList(list, 0, tailHigh); + } + } + finally + { + // This Decrement must happen after copying is over. + Interlocked.Decrement(ref m_numSnapshotTakers); + } + return list; + } + + /// <summary> + /// Store the position of the current head and tail positions. + /// </summary> + /// <param name="head">return the head segment</param> + /// <param name="tail">return the tail segment</param> + /// <param name="headLow">return the head offset, value range [0, SEGMENT_SIZE]</param> + /// <param name="tailHigh">return the tail offset, value range [-1, SEGMENT_SIZE-1]</param> + private void GetHeadTailPositions(out Segment head, out Segment tail, + out int headLow, out int tailHigh) + { + head = m_head; + tail = m_tail; + headLow = head.Low; + tailHigh = tail.High; + SpinWait spin = new SpinWait(); + + //we loop until the observed values are stable and sensible. + //This ensures that any update order by other methods can be tolerated. + while ( + //if head and tail changed, retry + head != m_head || tail != m_tail + //if low and high pointers, retry + || headLow != head.Low || tailHigh != tail.High + //if head jumps ahead of tail because of concurrent grow and dequeue, retry + || head.m_index > tail.m_index) + { + spin.SpinOnce(); + head = m_head; + tail = m_tail; + headLow = head.Low; + tailHigh = tail.High; + } + } + + + /// <summary> + /// Gets the number of elements contained in the <see cref="ConcurrentQueue{T}"/>. + /// </summary> + /// <value>The number of elements contained in the <see cref="ConcurrentQueue{T}"/>.</value> + /// <remarks> + /// For determining whether the collection contains any items, use of the <see cref="IsEmpty"/> + /// property is recommended rather than retrieving the number of items from the <see cref="Count"/> + /// property and comparing it to 0. + /// </remarks> + public int Count + { + get + { + //store head and tail positions in buffer, + Segment head, tail; + int headLow, tailHigh; + GetHeadTailPositions(out head, out tail, out headLow, out tailHigh); + + if (head == tail) + { + return tailHigh - headLow + 1; + } + + //head segment + int count = SEGMENT_SIZE - headLow; + + //middle segment(s), if any, are full. + //We don't deal with overflow to be consistent with the behavior of generic types in CLR. + count += SEGMENT_SIZE * ((int)(tail.m_index - head.m_index - 1)); + + //tail segment + count += tailHigh + 1; + + return count; + } + } + + + /// <summary> + /// Copies the <see cref="ConcurrentQueue{T}"/> elements to an existing one-dimensional <see + /// cref="T:System.Array">Array</see>, starting at the specified array index. + /// </summary> + /// <param name="array">The one-dimensional <see cref="T:System.Array">Array</see> that is the + /// destination of the elements copied from the + /// <see cref="ConcurrentQueue{T}"/>. The <see cref="T:System.Array">Array</see> must have zero-based + /// indexing.</param> + /// <param name="index">The zero-based index in <paramref name="array"/> at which copying + /// begins.</param> + /// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in + /// Visual Basic).</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than + /// zero.</exception> + /// <exception cref="ArgumentException"><paramref name="index"/> is equal to or greater than the + /// length of the <paramref name="array"/> + /// -or- The number of elements in the source <see cref="ConcurrentQueue{T}"/> is greater than the + /// available space from <paramref name="index"/> to the end of the destination <paramref + /// name="array"/>. + /// </exception> + public void CopyTo(T[] array, int index) + { + if (array == null) + { + throw new ArgumentNullException("array"); + } + + // We must be careful not to corrupt the array, so we will first accumulate an + // internal list of elements that we will then copy to the array. This requires + // some extra allocation, but is necessary since we don't know up front whether + // the array is sufficiently large to hold the stack's contents. + ToList().CopyTo(array, index); + } + + + /// <summary> + /// Returns an enumerator that iterates through the <see + /// cref="ConcurrentQueue{T}"/>. + /// </summary> + /// <returns>An enumerator for the contents of the <see + /// cref="ConcurrentQueue{T}"/>.</returns> + /// <remarks> + /// The enumeration represents a moment-in-time snapshot of the contents + /// of the queue. It does not reflect any updates to the collection after + /// <see cref="GetEnumerator"/> was called. The enumerator is safe to use + /// concurrently with reads from and writes to the queue. + /// </remarks> + public IEnumerator<T> GetEnumerator() + { + // Increments the number of active snapshot takers. This increment must happen before the snapshot is + // taken. At the same time, Decrement must happen after the enumeration is over. Only in this way, can it + // eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0. + Interlocked.Increment(ref m_numSnapshotTakers); + + // Takes a snapshot of the queue. + // A design flaw here: if a Thread.Abort() happens, we cannot decrement m_numSnapshotTakers. But we cannot + // wrap the following with a try/finally block, otherwise the decrement will happen before the yield return + // statements in the GetEnumerator (head, tail, headLow, tailHigh) method. + Segment head, tail; + int headLow, tailHigh; + GetHeadTailPositions(out head, out tail, out headLow, out tailHigh); + + //If we put yield-return here, the iterator will be lazily evaluated. As a result a snapshot of + // the queue is not taken when GetEnumerator is initialized but when MoveNext() is first called. + // This is inconsistent with existing generic collections. In order to prevent it, we capture the + // value of m_head in a buffer and call out to a helper method. + //The old way of doing this was to return the ToList().GetEnumerator(), but ToList() was an + // unnecessary perfomance hit. + return GetEnumerator(head, tail, headLow, tailHigh); + } + + /// <summary> + /// Helper method of GetEnumerator to seperate out yield return statement, and prevent lazy evaluation. + /// </summary> + private IEnumerator<T> GetEnumerator(Segment head, Segment tail, int headLow, int tailHigh) + { + try + { + SpinWait spin = new SpinWait(); + + if (head == tail) + { + for (int i = headLow; i <= tailHigh; i++) + { + // If the position is reserved by an Enqueue operation, but the value is not written into, + // spin until the value is available. + spin.Reset(); + while (!head.m_state[i].m_value) + { + spin.SpinOnce(); + } + yield return head.m_array[i]; + } + } + else + { + //iterate on head segment + for (int i = headLow; i < SEGMENT_SIZE; i++) + { + // If the position is reserved by an Enqueue operation, but the value is not written into, + // spin until the value is available. + spin.Reset(); + while (!head.m_state[i].m_value) + { + spin.SpinOnce(); + } + yield return head.m_array[i]; + } + //iterate on middle segments + Segment curr = head.Next; + while (curr != tail) + { + for (int i = 0; i < SEGMENT_SIZE; i++) + { + // If the position is reserved by an Enqueue operation, but the value is not written into, + // spin until the value is available. + spin.Reset(); + while (!curr.m_state[i].m_value) + { + spin.SpinOnce(); + } + yield return curr.m_array[i]; + } + curr = curr.Next; + } + + //iterate on tail segment + for (int i = 0; i <= tailHigh; i++) + { + // If the position is reserved by an Enqueue operation, but the value is not written into, + // spin until the value is available. + spin.Reset(); + while (!tail.m_state[i].m_value) + { + spin.SpinOnce(); + } + yield return tail.m_array[i]; + } + } + } + finally + { + // This Decrement must happen after the enumeration is over. + Interlocked.Decrement(ref m_numSnapshotTakers); + } + } + + /// <summary> + /// Adds an object to the end of the <see cref="ConcurrentQueue{T}"/>. + /// </summary> + /// <param name="item">The object to add to the end of the <see + /// cref="ConcurrentQueue{T}"/>. The value can be a null reference + /// (Nothing in Visual Basic) for reference types. + /// </param> + public void Enqueue(T item) + { + SpinWait spin = new SpinWait(); + while (true) + { + Segment tail = m_tail; + if (tail.TryAppend(item)) + return; + spin.SpinOnce(); + } + } + + + /// <summary> + /// Attempts to remove and return the object at the beginning of the <see + /// cref="ConcurrentQueue{T}"/>. + /// </summary> + /// <param name="result"> + /// When this method returns, if the operation was successful, <paramref name="result"/> contains the + /// object removed. If no object was available to be removed, the value is unspecified. + /// </param> + /// <returns>true if an element was removed and returned from the beggining of the <see + /// cref="ConcurrentQueue{T}"/> + /// succesfully; otherwise, false.</returns> + public bool TryDequeue(out T result) + { + while (!IsEmpty) + { + Segment head = m_head; + if (head.TryRemove(out result)) + return true; + //since method IsEmpty spins, we don't need to spin in the while loop + } + result = default(T); + return false; + } + + /// <summary> + /// Attempts to return an object from the beginning of the <see cref="ConcurrentQueue{T}"/> + /// without removing it. + /// </summary> + /// <param name="result">When this method returns, <paramref name="result"/> contains an object from + /// the beginning of the <see cref="T:System.Collections.Concurrent.ConccurrentQueue{T}"/> or an + /// unspecified value if the operation failed.</param> + /// <returns>true if and object was returned successfully; otherwise, false.</returns> + public bool TryPeek(out T result) + { + Interlocked.Increment(ref m_numSnapshotTakers); + + while (!IsEmpty) + { + Segment head = m_head; + if (head.TryPeek(out result)) + { + Interlocked.Decrement(ref m_numSnapshotTakers); + return true; + } + //since method IsEmpty spins, we don't need to spin in the while loop + } + result = default(T); + Interlocked.Decrement(ref m_numSnapshotTakers); + return false; + } + + + /// <summary> + /// private class for ConcurrentQueue. + /// a queue is a linked list of small arrays, each node is called a segment. + /// A segment contains an array, a pointer to the next segment, and m_low, m_high indices recording + /// the first and last valid elements of the array. + /// </summary> + private class Segment + { + //we define two volatile arrays: m_array and m_state. Note that the accesses to the array items + //do not get volatile treatment. But we don't need to worry about loading adjacent elements or + //store/load on adjacent elements would suffer reordering. + // - Two stores: these are at risk, but CLRv2 memory model guarantees store-release hence we are safe. + // - Two loads: because one item from two volatile arrays are accessed, the loads of the array references + // are sufficient to prevent reordering of the loads of the elements. + internal volatile T[] m_array; + + // For each entry in m_array, the corresponding entry in m_state indicates whether this position contains + // a valid value. m_state is initially all false. + internal volatile VolatileBool[] m_state; + + //pointer to the next segment. null if the current segment is the last segment + private volatile Segment m_next; + + //We use this zero based index to track how many segments have been created for the queue, and + //to compute how many active segments are there currently. + // * The number of currently active segments is : m_tail.m_index - m_head.m_index + 1; + // * m_index is incremented with every Segment.Grow operation. We use Int64 type, and we can safely + // assume that it never overflows. To overflow, we need to do 2^63 increments, even at a rate of 4 + // billion (2^32) increments per second, it takes 2^31 seconds, which is about 64 years. + internal readonly long m_index; + + //indices of where the first and last valid values + // - m_low points to the position of the next element to pop from this segment, range [0, infinity) + // m_low >= SEGMENT_SIZE implies the segment is disposable + // - m_high points to the position of the latest pushed element, range [-1, infinity) + // m_high == -1 implies the segment is new and empty + // m_high >= SEGMENT_SIZE-1 means this segment is ready to grow. + // and the thread who sets m_high to SEGMENT_SIZE-1 is responsible to grow the segment + // - Math.Min(m_low, SEGMENT_SIZE) > Math.Min(m_high, SEGMENT_SIZE-1) implies segment is empty + // - initially m_low =0 and m_high=-1; + private volatile int m_low; + private volatile int m_high; + + private volatile ConcurrentQueue<T> m_source; + + /// <summary> + /// Create and initialize a segment with the specified index. + /// </summary> + internal Segment(long index, ConcurrentQueue<T> source) + { + m_array = new T[SEGMENT_SIZE]; + m_state = new VolatileBool[SEGMENT_SIZE]; //all initialized to false + m_high = -1; + Contract.Assert(index >= 0); + m_index = index; + m_source = source; + } + + /// <summary> + /// return the next segment + /// </summary> + internal Segment Next + { + get { return m_next; } + } + + + /// <summary> + /// return true if the current segment is empty (doesn't have any element available to dequeue, + /// false otherwise + /// </summary> + internal bool IsEmpty + { + get { return (Low > High); } + } + + /// <summary> + /// Add an element to the tail of the current segment + /// exclusively called by ConcurrentQueue.InitializedFromCollection + /// InitializeFromCollection is responsible to guaratee that there is no index overflow, + /// and there is no contention + /// </summary> + /// <param name="value"></param> + internal void UnsafeAdd(T value) + { + Contract.Assert(m_high < SEGMENT_SIZE - 1); + m_high++; + m_array[m_high] = value; + m_state[m_high].m_value = true; + } + + /// <summary> + /// Create a new segment and append to the current one + /// Does not update the m_tail pointer + /// exclusively called by ConcurrentQueue.InitializedFromCollection + /// InitializeFromCollection is responsible to guaratee that there is no index overflow, + /// and there is no contention + /// </summary> + /// <returns>the reference to the new Segment</returns> + internal Segment UnsafeGrow() + { + Contract.Assert(m_high >= SEGMENT_SIZE - 1); + Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow + m_next = newSegment; + return newSegment; + } + + /// <summary> + /// Create a new segment and append to the current one + /// Update the m_tail pointer + /// This method is called when there is no contention + /// </summary> + internal void Grow() + { + //no CAS is needed, since there is no contention (other threads are blocked, busy waiting) + Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow + m_next = newSegment; + Contract.Assert(m_source.m_tail == this); + m_source.m_tail = m_next; + } + + + /// <summary> + /// Try to append an element at the end of this segment. + /// </summary> + /// <param name="value">the element to append</param> + /// <param name="tail">The tail.</param> + /// <returns>true if the element is appended, false if the current segment is full</returns> + /// <remarks>if appending the specified element succeeds, and after which the segment is full, + /// then grow the segment</remarks> + internal bool TryAppend(T value) + { + //quickly check if m_high is already over the boundary, if so, bail out + if (m_high >= SEGMENT_SIZE - 1) + { + return false; + } + + //Now we will use a CAS to increment m_high, and store the result in newhigh. + //Depending on how many free spots left in this segment and how many threads are doing this Increment + //at this time, the returning "newhigh" can be + // 1) < SEGMENT_SIZE - 1 : we took a spot in this segment, and not the last one, just insert the value + // 2) == SEGMENT_SIZE - 1 : we took the last spot, insert the value AND grow the segment + // 3) > SEGMENT_SIZE - 1 : we failed to reserve a spot in this segment, we return false to + // Queue.Enqueue method, telling it to try again in the next segment. + + int newhigh = SEGMENT_SIZE; //initial value set to be over the boundary + + //We need do Interlocked.Increment and value/state update in a finally block to ensure that they run + //without interuption. This is to prevent anything from happening between them, and another dequeue + //thread maybe spinning forever to wait for m_state[] to be true; + try + { } + finally + { + newhigh = Interlocked.Increment(ref m_high); + if (newhigh <= SEGMENT_SIZE - 1) + { + m_array[newhigh] = value; + m_state[newhigh].m_value = true; + } + + //if this thread takes up the last slot in the segment, then this thread is responsible + //to grow a new segment. Calling Grow must be in the finally block too for reliability reason: + //if thread abort during Grow, other threads will be left busy spinning forever. + if (newhigh == SEGMENT_SIZE - 1) + { + Grow(); + } + } + + //if newhigh <= SEGMENT_SIZE-1, it means the current thread successfully takes up a spot + return newhigh <= SEGMENT_SIZE - 1; + } + + + /// <summary> + /// try to remove an element from the head of current segment + /// </summary> + /// <param name="result">The result.</param> + /// <param name="head">The head.</param> + /// <returns>return false only if the current segment is empty</returns> + internal bool TryRemove(out T result) + { + SpinWait spin = new SpinWait(); + int lowLocal = Low, highLocal = High; + while (lowLocal <= highLocal) + { + //try to update m_low + if (Interlocked.CompareExchange(ref m_low, lowLocal + 1, lowLocal) == lowLocal) + { + //if the specified value is not available (this spot is taken by a push operation, + // but the value is not written into yet), then spin + SpinWait spinLocal = new SpinWait(); + while (!m_state[lowLocal].m_value) + { + spinLocal.SpinOnce(); + } + result = m_array[lowLocal]; + + // If there is no other thread taking snapshot (GetEnumerator(), ToList(), etc), reset the deleted entry to null. + // It is ok if after this conditional check m_numSnapshotTakers becomes > 0, because new snapshots won't include + // the deleted entry at m_array[lowLocal]. + if (m_source.m_numSnapshotTakers <= 0) + { + m_array[lowLocal] = default(T); //release the reference to the object. + } + + //if the current thread sets m_low to SEGMENT_SIZE, which means the current segment becomes + //disposable, then this thread is responsible to dispose this segment, and reset m_head + if (lowLocal + 1 >= SEGMENT_SIZE) + { + // Invariant: we only dispose the current m_head, not any other segment + // In usual situation, disposing a segment is simply seting m_head to m_head.m_next + // But there is one special case, where m_head and m_tail points to the same and ONLY + //segment of the queue: Another thread A is doing Enqueue and finds that it needs to grow, + //while the *current* thread is doing *this* Dequeue operation, and finds that it needs to + //dispose the current (and ONLY) segment. Then we need to wait till thread A finishes its + //Grow operation, this is the reason of having the following while loop + spinLocal = new SpinWait(); + while (m_next == null) + { + spinLocal.SpinOnce(); + } + Contract.Assert(m_source.m_head == this); + m_source.m_head = m_next; + } + return true; + } + else + { + //CAS failed due to contention: spin briefly and retry + spin.SpinOnce(); + lowLocal = Low; highLocal = High; + } + }//end of while + result = default(T); + return false; + } + + /// <summary> + /// try to peek the current segment + /// </summary> + /// <param name="result">holds the return value of the element at the head position, + /// value set to default(T) if there is no such an element</param> + /// <returns>true if there are elements in the current segment, false otherwise</returns> + internal bool TryPeek(out T result) + { + result = default(T); + int lowLocal = Low; + if (lowLocal > High) + return false; + SpinWait spin = new SpinWait(); + while (!m_state[lowLocal].m_value) + { + spin.SpinOnce(); + } + result = m_array[lowLocal]; + return true; + } + + /// <summary> + /// Adds part or all of the current segment into a List. + /// </summary> + /// <param name="list">the list to which to add</param> + /// <param name="start">the start position</param> + /// <param name="end">the end position</param> + internal void AddToList(List<T> list, int start, int end) + { + for (int i = start; i <= end; i++) + { + SpinWait spin = new SpinWait(); + while (!m_state[i].m_value) + { + spin.SpinOnce(); + } + list.Add(m_array[i]); + } + } + + /// <summary> + /// return the position of the head of the current segment + /// Value range [0, SEGMENT_SIZE], if it's SEGMENT_SIZE, it means this segment is exhausted and thus empty + /// </summary> + internal int Low + { + get + { + return Math.Min(m_low, SEGMENT_SIZE); + } + } + + /// <summary> + /// return the logical position of the tail of the current segment + /// Value range [-1, SEGMENT_SIZE-1]. When it's -1, it means this is a new segment and has no elemnet yet + /// </summary> + internal int High + { + get + { + //if m_high > SEGMENT_SIZE, it means it's out of range, we should return + //SEGMENT_SIZE-1 as the logical position + return Math.Min(m_high, SEGMENT_SIZE - 1); + } + } + + } + }//end of class Segment + + /// <summary> + /// A wrapper struct for volatile bool, please note the copy of the struct it self will not be volatile + /// for example this statement will not include in volatilness operation volatileBool1 = volatileBool2 the jit will copy the struct and will ignore the volatile + /// </summary> + struct VolatileBool + { + public VolatileBool(bool value) + { + m_value = value; + } + public volatile bool m_value; + } +} diff --git a/src/mscorlib/src/System/Collections/Concurrent/ConcurrentStack.cs b/src/mscorlib/src/System/Collections/Concurrent/ConcurrentStack.cs new file mode 100644 index 0000000000..15d4176cff --- /dev/null +++ b/src/mscorlib/src/System/Collections/Concurrent/ConcurrentStack.cs @@ -0,0 +1,840 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. +#pragma warning disable 0420 + + +// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// +// +// +// A lock-free, concurrent stack primitive, and its associated debugger view type. +// +// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + +using System; +using System.Collections; +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.Contracts; +using System.Runtime.ConstrainedExecution; +using System.Runtime.Serialization; +using System.Security; +using System.Security.Permissions; +using System.Threading; + +namespace System.Collections.Concurrent +{ + // A stack that uses CAS operations internally to maintain thread-safety in a lock-free + // manner. Attempting to push or pop concurrently from the stack will not trigger waiting, + // although some optimistic concurrency and retry is used, possibly leading to lack of + // fairness and/or livelock. The stack uses spinning and backoff to add some randomization, + // in hopes of statistically decreasing the possibility of livelock. + // + // Note that we currently allocate a new node on every push. This avoids having to worry + // about potential ABA issues, since the CLR GC ensures that a memory address cannot be + // reused before all references to it have died. + + /// <summary> + /// Represents a thread-safe last-in, first-out collection of objects. + /// </summary> + /// <typeparam name="T">Specifies the type of elements in the stack.</typeparam> + /// <remarks> + /// All public and protected members of <see cref="ConcurrentStack{T}"/> are thread-safe and may be used + /// concurrently from multiple threads. + /// </remarks> + [DebuggerDisplay("Count = {Count}")] + [DebuggerTypeProxy(typeof(SystemCollectionsConcurrent_ProducerConsumerCollectionDebugView<>))] + [HostProtection(Synchronization = true, ExternalThreading = true)] +#if !FEATURE_CORECLR + [Serializable] +#endif //!FEATURE_CORECLR + public class ConcurrentStack<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T> + { + /// <summary> + /// A simple (internal) node type used to store elements of concurrent stacks and queues. + /// </summary> + private class Node + { + internal readonly T m_value; // Value of the node. + internal Node m_next; // Next pointer. + + /// <summary> + /// Constructs a new node with the specified value and no next node. + /// </summary> + /// <param name="value">The value of the node.</param> + internal Node(T value) + { + m_value = value; + m_next = null; + } + } + +#if !FEATURE_CORECLR + [NonSerialized] +#endif //!FEATURE_CORECLR + private volatile Node m_head; // The stack is a singly linked list, and only remembers the head. + +#if !FEATURE_CORECLR + private T[] m_serializationArray; // Used for custom serialization. +#endif //!FEATURE_CORECLR + + private const int BACKOFF_MAX_YIELDS = 8; // Arbitrary number to cap backoff. + + /// <summary> + /// Initializes a new instance of the <see cref="ConcurrentStack{T}"/> + /// class. + /// </summary> + public ConcurrentStack() + { + } + + /// <summary> + /// Initializes a new instance of the <see cref="ConcurrentStack{T}"/> + /// class that contains elements copied from the specified collection + /// </summary> + /// <param name="collection">The collection whose elements are copied to the new <see + /// cref="ConcurrentStack{T}"/>.</param> + /// <exception cref="T:System.ArgumentNullException">The <paramref name="collection"/> argument is + /// null.</exception> + public ConcurrentStack(IEnumerable<T> collection) + { + if (collection == null) + { + throw new ArgumentNullException("collection"); + } + InitializeFromCollection(collection); + } + + /// <summary> + /// Initializes the contents of the stack from an existing collection. + /// </summary> + /// <param name="collection">A collection from which to copy elements.</param> + private void InitializeFromCollection(IEnumerable<T> collection) + { + // We just copy the contents of the collection to our stack. + Node lastNode = null; + foreach (T element in collection) + { + Node newNode = new Node(element); + newNode.m_next = lastNode; + lastNode = newNode; + } + + m_head = lastNode; + } + +#if !FEATURE_CORECLR + /// <summary> + /// Get the data array to be serialized + /// </summary> + [OnSerializing] + private void OnSerializing(StreamingContext context) + { + // save the data into the serialization array to be saved + m_serializationArray = ToArray(); + } + + /// <summary> + /// Construct the stack from a previously seiralized one + /// </summary> + [OnDeserialized] + private void OnDeserialized(StreamingContext context) + { + Contract.Assert(m_serializationArray != null); + // Add the elements to our stack. We need to add them from head-to-tail, to + // preserve the original ordering of the stack before serialization. + Node prevNode = null; + Node head = null; + for (int i = 0; i < m_serializationArray.Length; i++) + { + Node currNode = new Node(m_serializationArray[i]); + + if (prevNode == null) + { + head = currNode; + } + else + { + prevNode.m_next = currNode; + } + + prevNode = currNode; + } + + m_head = head; + m_serializationArray = null; + } +#endif //!FEATURE_CORECLR + + + /// <summary> + /// Gets a value that indicates whether the <see cref="ConcurrentStack{T}"/> is empty. + /// </summary> + /// <value>true if the <see cref="ConcurrentStack{T}"/> is empty; otherwise, false.</value> + /// <remarks> + /// For determining whether the collection contains any items, use of this property is recommended + /// rather than retrieving the number of items from the <see cref="Count"/> property and comparing it + /// to 0. However, as this collection is intended to be accessed concurrently, it may be the case + /// that another thread will modify the collection after <see cref="IsEmpty"/> returns, thus invalidating + /// the result. + /// </remarks> + public bool IsEmpty + { + // Checks whether the stack is empty. Clearly the answer may be out of date even prior to + // the function returning (i.e. if another thread concurrently adds to the stack). It does + // guarantee, however, that, if another thread does not mutate the stack, a subsequent call + // to TryPop will return true -- i.e. it will also read the stack as non-empty. + get { return m_head == null; } + } + + /// <summary> + /// Gets the number of elements contained in the <see cref="ConcurrentStack{T}"/>. + /// </summary> + /// <value>The number of elements contained in the <see cref="ConcurrentStack{T}"/>.</value> + /// <remarks> + /// For determining whether the collection contains any items, use of the <see cref="IsEmpty"/> + /// property is recommended rather than retrieving the number of items from the <see cref="Count"/> + /// property and comparing it to 0. + /// </remarks> + public int Count + { + // Counts the number of entries in the stack. This is an O(n) operation. The answer may be out + // of date before returning, but guarantees to return a count that was once valid. Conceptually, + // the implementation snaps a copy of the list and then counts the entries, though physically + // this is not what actually happens. + get + { + int count = 0; + + // Just whip through the list and tally up the number of nodes. We rely on the fact that + // node next pointers are immutable after being enqueued for the first time, even as + // they are being dequeued. If we ever changed this (e.g. to pool nodes somehow), + // we'd need to revisit this implementation. + + for (Node curr = m_head; curr != null; curr = curr.m_next) + { + count++; //we don't handle overflow, to be consistent with existing generic collection types in CLR + } + + return count; + } + } + + + /// <summary> + /// Gets a value indicating whether access to the <see cref="T:System.Collections.ICollection"/> is + /// synchronized with the SyncRoot. + /// </summary> + /// <value>true if access to the <see cref="T:System.Collections.ICollection"/> is synchronized + /// with the SyncRoot; otherwise, false. For <see cref="ConcurrentStack{T}"/>, this property always + /// returns false.</value> + bool ICollection.IsSynchronized + { + // Gets a value indicating whether access to this collection is synchronized. Always returns + // false. The reason is subtle. While access is in face thread safe, it's not the case that + // locking on the SyncRoot would have prevented concurrent pushes and pops, as this property + // would typically indicate; that's because we internally use CAS operations vs. true locks. + get { return false; } + } + + /// <summary> + /// Gets an object that can be used to synchronize access to the <see + /// cref="T:System.Collections.ICollection"/>. This property is not supported. + /// </summary> + /// <exception cref="T:System.NotSupportedException">The SyncRoot property is not supported</exception> + object ICollection.SyncRoot + { + get + { + throw new NotSupportedException(Environment.GetResourceString("ConcurrentCollection_SyncRoot_NotSupported")); + } + } + + /// <summary> + /// Removes all objects from the <see cref="ConcurrentStack{T}"/>. + /// </summary> + public void Clear() + { + // Clear the list by setting the head to null. We don't need to use an atomic + // operation for this: anybody who is mutating the head by pushing or popping + // will need to use an atomic operation to guarantee they serialize and don't + // overwrite our setting of the head to null. + m_head = null; + } + + /// <summary> + /// Copies the elements of the <see cref="T:System.Collections.ICollection"/> to an <see + /// cref="T:System.Array"/>, starting at a particular + /// <see cref="T:System.Array"/> index. + /// </summary> + /// <param name="array">The one-dimensional <see cref="T:System.Array"/> that is the destination of + /// the elements copied from the + /// <see cref="ConcurrentStack{T}"/>. The <see cref="T:System.Array"/> must + /// have zero-based indexing.</param> + /// <param name="index">The zero-based index in <paramref name="array"/> at which copying + /// begins.</param> + /// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in + /// Visual Basic).</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than + /// zero.</exception> + /// <exception cref="ArgumentException"> + /// <paramref name="array"/> is multidimensional. -or- + /// <paramref name="array"/> does not have zero-based indexing. -or- + /// <paramref name="index"/> is equal to or greater than the length of the <paramref name="array"/> + /// -or- The number of elements in the source <see cref="T:System.Collections.ICollection"/> is + /// greater than the available space from <paramref name="index"/> to the end of the destination + /// <paramref name="array"/>. -or- The type of the source <see + /// cref="T:System.Collections.ICollection"/> cannot be cast automatically to the type of the + /// destination <paramref name="array"/>. + /// </exception> + void ICollection.CopyTo(Array array, int index) + { + // Validate arguments. + if (array == null) + { + throw new ArgumentNullException("array"); + } + + // We must be careful not to corrupt the array, so we will first accumulate an + // internal list of elements that we will then copy to the array. This requires + // some extra allocation, but is necessary since we don't know up front whether + // the array is sufficiently large to hold the stack's contents. + ((ICollection)ToList()).CopyTo(array, index); + } + + /// <summary> + /// Copies the <see cref="ConcurrentStack{T}"/> elements to an existing one-dimensional <see + /// cref="T:System.Array"/>, starting at the specified array index. + /// </summary> + /// <param name="array">The one-dimensional <see cref="T:System.Array"/> that is the destination of + /// the elements copied from the + /// <see cref="ConcurrentStack{T}"/>. The <see cref="T:System.Array"/> must have zero-based + /// indexing.</param> + /// <param name="index">The zero-based index in <paramref name="array"/> at which copying + /// begins.</param> + /// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in + /// Visual Basic).</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than + /// zero.</exception> + /// <exception cref="ArgumentException"><paramref name="index"/> is equal to or greater than the + /// length of the <paramref name="array"/> + /// -or- The number of elements in the source <see cref="ConcurrentStack{T}"/> is greater than the + /// available space from <paramref name="index"/> to the end of the destination <paramref + /// name="array"/>. + /// </exception> + public void CopyTo(T[] array, int index) + { + if (array == null) + { + throw new ArgumentNullException("array"); + } + + // We must be careful not to corrupt the array, so we will first accumulate an + // internal list of elements that we will then copy to the array. This requires + // some extra allocation, but is necessary since we don't know up front whether + // the array is sufficiently large to hold the stack's contents. + ToList().CopyTo(array, index); + } + + + /// <summary> + /// Inserts an object at the top of the <see cref="ConcurrentStack{T}"/>. + /// </summary> + /// <param name="item">The object to push onto the <see cref="ConcurrentStack{T}"/>. The value can be + /// a null reference (Nothing in Visual Basic) for reference types. + /// </param> + public void Push(T item) + { + // Pushes a node onto the front of the stack thread-safely. Internally, this simply + // swaps the current head pointer using a (thread safe) CAS operation to accomplish + // lock freedom. If the CAS fails, we add some back off to statistically decrease + // contention at the head, and then go back around and retry. + + Node newNode = new Node(item); + newNode.m_next = m_head; + if (Interlocked.CompareExchange(ref m_head, newNode, newNode.m_next) == newNode.m_next) + { + return; + } + + // If we failed, go to the slow path and loop around until we succeed. + PushCore(newNode, newNode); + } + + /// <summary> + /// Inserts multiple objects at the top of the <see cref="ConcurrentStack{T}"/> atomically. + /// </summary> + /// <param name="items">The objects to push onto the <see cref="ConcurrentStack{T}"/>.</param> + /// <exception cref="ArgumentNullException"><paramref name="items"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <remarks> + /// When adding multiple items to the stack, using PushRange is a more efficient + /// mechanism than using <see cref="Push"/> one item at a time. Additionally, PushRange + /// guarantees that all of the elements will be added atomically, meaning that no other threads will + /// be able to inject elements between the elements being pushed. Items at lower indices in + /// the <paramref name="items"/> array will be pushed before items at higher indices. + /// </remarks> + public void PushRange(T[] items) + { + if (items == null) + { + throw new ArgumentNullException("items"); + } + PushRange(items, 0, items.Length); + } + + /// <summary> + /// Inserts multiple objects at the top of the <see cref="ConcurrentStack{T}"/> atomically. + /// </summary> + /// <param name="items">The objects to push onto the <see cref="ConcurrentStack{T}"/>.</param> + /// <param name="startIndex">The zero-based offset in <paramref name="items"/> at which to begin + /// inserting elements onto the top of the <see cref="ConcurrentStack{T}"/>.</param> + /// <param name="count">The number of elements to be inserted onto the top of the <see + /// cref="ConcurrentStack{T}"/>.</param> + /// <exception cref="ArgumentNullException"><paramref name="items"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="startIndex"/> or <paramref + /// name="count"/> is negative. Or <paramref name="startIndex"/> is greater than or equal to the length + /// of <paramref name="items"/>.</exception> + /// <exception cref="ArgumentException"><paramref name="startIndex"/> + <paramref name="count"/> is + /// greater than the length of <paramref name="items"/>.</exception> + /// <remarks> + /// When adding multiple items to the stack, using PushRange is a more efficient + /// mechanism than using <see cref="Push"/> one item at a time. Additionally, PushRange + /// guarantees that all of the elements will be added atomically, meaning that no other threads will + /// be able to inject elements between the elements being pushed. Items at lower indices in the + /// <paramref name="items"/> array will be pushed before items at higher indices. + /// </remarks> + public void PushRange(T[] items, int startIndex, int count) + { + ValidatePushPopRangeInput(items, startIndex, count); + + // No op if the count is zero + if (count == 0) + return; + + + Node head, tail; + head = tail = new Node(items[startIndex]); + for (int i = startIndex + 1; i < startIndex + count; i++) + { + Node node = new Node(items[i]); + node.m_next = head; + head = node; + } + + tail.m_next = m_head; + if (Interlocked.CompareExchange(ref m_head, head, tail.m_next) == tail.m_next) + { + return; + } + + // If we failed, go to the slow path and loop around until we succeed. + PushCore(head, tail); + + } + + + /// <summary> + /// Push one or many nodes into the stack, if head and tails are equal then push one node to the stack other wise push the list between head + /// and tail to the stack + /// </summary> + /// <param name="head">The head pointer to the new list</param> + /// <param name="tail">The tail pointer to the new list</param> + private void PushCore(Node head, Node tail) + { + SpinWait spin = new SpinWait(); + + // Keep trying to CAS the exising head with the new node until we succeed. + do + { + spin.SpinOnce(); + // Reread the head and link our new node. + tail.m_next = m_head; + } + while (Interlocked.CompareExchange( + ref m_head, head, tail.m_next) != tail.m_next); + +#if !FEATURE_CORECLR + if (CDSCollectionETWBCLProvider.Log.IsEnabled()) + { + CDSCollectionETWBCLProvider.Log.ConcurrentStack_FastPushFailed(spin.Count); + } +#endif // !FEATURE_CORECLR + } + + /// <summary> + /// Local helper function to validate the Pop Push range methods input + /// </summary> + private void ValidatePushPopRangeInput(T[] items, int startIndex, int count) + { + if (items == null) + { + throw new ArgumentNullException("items"); + } + if (count < 0) + { + throw new ArgumentOutOfRangeException("count", Environment.GetResourceString("ConcurrentStack_PushPopRange_CountOutOfRange")); + } + int length = items.Length; + if (startIndex >= length || startIndex < 0) + { + throw new ArgumentOutOfRangeException("startIndex", Environment.GetResourceString("ConcurrentStack_PushPopRange_StartOutOfRange")); + } + if (length - count < startIndex) //instead of (startIndex + count > items.Length) to prevent overflow + { + throw new ArgumentException(Environment.GetResourceString("ConcurrentStack_PushPopRange_InvalidCount")); + } + } + + /// <summary> + /// Attempts to add an object to the <see + /// cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>. + /// </summary> + /// <param name="item">The object to add to the <see + /// cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>. The value can be a null + /// reference (Nothing in Visual Basic) for reference types. + /// </param> + /// <returns>true if the object was added successfully; otherwise, false.</returns> + /// <remarks>For <see cref="ConcurrentStack{T}"/>, this operation + /// will always insert the object onto the top of the <see cref="ConcurrentStack{T}"/> + /// and return true.</remarks> + bool IProducerConsumerCollection<T>.TryAdd(T item) + { + Push(item); + return true; + } + + /// <summary> + /// Attempts to return an object from the top of the <see cref="ConcurrentStack{T}"/> + /// without removing it. + /// </summary> + /// <param name="result">When this method returns, <paramref name="result"/> contains an object from + /// the top of the <see cref="T:System.Collections.Concurrent.ConccurrentStack{T}"/> or an + /// unspecified value if the operation failed.</param> + /// <returns>true if and object was returned successfully; otherwise, false.</returns> + public bool TryPeek(out T result) + { + Node head = m_head; + + // If the stack is empty, return false; else return the element and true. + if (head == null) + { + result = default(T); + return false; + } + else + { + result = head.m_value; + return true; + } + } + + /// <summary> + /// Attempts to pop and return the object at the top of the <see cref="ConcurrentStack{T}"/>. + /// </summary> + /// <param name="result"> + /// When this method returns, if the operation was successful, <paramref name="result"/> contains the + /// object removed. If no object was available to be removed, the value is unspecified. + /// </param> + /// <returns>true if an element was removed and returned from the top of the <see + /// cref="ConcurrentStack{T}"/> + /// succesfully; otherwise, false.</returns> + public bool TryPop(out T result) + { + Node head = m_head; + //stack is empty + if (head == null) + { + result = default(T); + return false; + } + if (Interlocked.CompareExchange(ref m_head, head.m_next, head) == head) + { + result = head.m_value; + return true; + } + + // Fall through to the slow path. + return TryPopCore(out result); + } + + /// <summary> + /// Attempts to pop and return multiple objects from the top of the <see cref="ConcurrentStack{T}"/> + /// atomically. + /// </summary> + /// <param name="items"> + /// The <see cref="T:System.Array"/> to which objects popped from the top of the <see + /// cref="ConcurrentStack{T}"/> will be added. + /// </param> + /// <returns>The number of objects successfully popped from the top of the <see + /// cref="ConcurrentStack{T}"/> and inserted in + /// <paramref name="items"/>.</returns> + /// <exception cref="ArgumentNullException"><paramref name="items"/> is a null argument (Nothing + /// in Visual Basic).</exception> + /// <remarks> + /// When popping multiple items, if there is little contention on the stack, using + /// TryPopRange can be more efficient than using <see cref="TryPop"/> + /// once per item to be removed. Nodes fill the <paramref name="items"/> + /// with the first node to be popped at the startIndex, the second node to be popped + /// at startIndex + 1, and so on. + /// </remarks> + public int TryPopRange(T[] items) + { + if (items == null) + { + throw new ArgumentNullException("items"); + } + + return TryPopRange(items, 0, items.Length); + } + + /// <summary> + /// Attempts to pop and return multiple objects from the top of the <see cref="ConcurrentStack{T}"/> + /// atomically. + /// </summary> + /// <param name="items"> + /// The <see cref="T:System.Array"/> to which objects popped from the top of the <see + /// cref="ConcurrentStack{T}"/> will be added. + /// </param> + /// <param name="startIndex">The zero-based offset in <paramref name="items"/> at which to begin + /// inserting elements from the top of the <see cref="ConcurrentStack{T}"/>.</param> + /// <param name="count">The number of elements to be popped from top of the <see + /// cref="ConcurrentStack{T}"/> and inserted into <paramref name="items"/>.</param> + /// <returns>The number of objects successfully popped from the top of + /// the <see cref="ConcurrentStack{T}"/> and inserted in <paramref name="items"/>.</returns> + /// <exception cref="ArgumentNullException"><paramref name="items"/> is a null reference + /// (Nothing in Visual Basic).</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="startIndex"/> or <paramref + /// name="count"/> is negative. Or <paramref name="startIndex"/> is greater than or equal to the length + /// of <paramref name="items"/>.</exception> + /// <exception cref="ArgumentException"><paramref name="startIndex"/> + <paramref name="count"/> is + /// greater than the length of <paramref name="items"/>.</exception> + /// <remarks> + /// When popping multiple items, if there is little contention on the stack, using + /// TryPopRange can be more efficient than using <see cref="TryPop"/> + /// once per item to be removed. Nodes fill the <paramref name="items"/> + /// with the first node to be popped at the startIndex, the second node to be popped + /// at startIndex + 1, and so on. + /// </remarks> + public int TryPopRange(T[] items, int startIndex, int count) + { + ValidatePushPopRangeInput(items, startIndex, count); + + // No op if the count is zero + if (count == 0) + return 0; + + Node poppedHead; + int nodesCount = TryPopCore(count, out poppedHead); + if (nodesCount > 0) + { + CopyRemovedItems(poppedHead, items, startIndex, nodesCount); + + } + return nodesCount; + + } + + /// <summary> + /// Local helper function to Pop an item from the stack, slow path + /// </summary> + /// <param name="result">The popped item</param> + /// <returns>True if succeeded, false otherwise</returns> + private bool TryPopCore(out T result) + { + Node poppedNode; + + if (TryPopCore(1, out poppedNode) == 1) + { + result = poppedNode.m_value; + return true; + } + + result = default(T); + return false; + + } + + /// <summary> + /// Slow path helper for TryPop. This method assumes an initial attempt to pop an element + /// has already occurred and failed, so it begins spinning right away. + /// </summary> + /// <param name="count">The number of items to pop.</param> + /// <param name="poppedHead"> + /// When this method returns, if the pop succeeded, contains the removed object. If no object was + /// available to be removed, the value is unspecified. This parameter is passed uninitialized. + /// </param> + /// <returns>True if an element was removed and returned; otherwise, false.</returns> + private int TryPopCore(int count, out Node poppedHead) + { + SpinWait spin = new SpinWait(); + + // Try to CAS the head with its current next. We stop when we succeed or + // when we notice that the stack is empty, whichever comes first. + Node head; + Node next; + int backoff = 1; + Random r = new Random(Environment.TickCount & Int32.MaxValue); // avoid the case where TickCount could return Int32.MinValue + while (true) + { + head = m_head; + // Is the stack empty? + if (head == null) + { +#if !FEATURE_CORECLR + if (count == 1 && CDSCollectionETWBCLProvider.Log.IsEnabled()) + { + CDSCollectionETWBCLProvider.Log.ConcurrentStack_FastPopFailed(spin.Count); + } +#endif //!FEATURE_CORECLR + poppedHead = null; + return 0; + } + next = head; + int nodesCount = 1; + for (; nodesCount < count && next.m_next != null; nodesCount++) + { + next = next.m_next; + } + + // Try to swap the new head. If we succeed, break out of the loop. + if (Interlocked.CompareExchange(ref m_head, next.m_next, head) == head) + { +#if !FEATURE_CORECLR + if (count == 1 && CDSCollectionETWBCLProvider.Log.IsEnabled()) + { + CDSCollectionETWBCLProvider.Log.ConcurrentStack_FastPopFailed(spin.Count); + } +#endif //!FEATURE_CORECLR + // Return the popped Node. + poppedHead = head; + return nodesCount; + } + + // We failed to CAS the new head. Spin briefly and retry. + for (int i = 0; i < backoff; i++) + { + spin.SpinOnce(); + } + + backoff = spin.NextSpinWillYield ? r.Next(1, BACKOFF_MAX_YIELDS) : backoff * 2; + } + } + + + /// <summary> + /// Local helper function to copy the poped elements into a given collection + /// </summary> + /// <param name="head">The head of the list to be copied</param> + /// <param name="collection">The collection to place the popped items in</param> + /// <param name="startIndex">the beginning of index of where to place the popped items</param> + /// <param name="nodesCount">The number of nodes.</param> + private void CopyRemovedItems(Node head, T[] collection, int startIndex, int nodesCount) + { + Node current = head; + for (int i = startIndex; i < startIndex + nodesCount; i++) + { + collection[i] = current.m_value; + current = current.m_next; + } + + } + + /// <summary> + /// Attempts to remove and return an object from the <see + /// cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>. + /// </summary> + /// <param name="item"> + /// When this method returns, if the operation was successful, <paramref name="item"/> contains the + /// object removed. If no object was available to be removed, the value is unspecified. + /// </param> + /// <returns>true if an element was removed and returned succesfully; otherwise, false.</returns> + /// <remarks>For <see cref="ConcurrentStack{T}"/>, this operation will attempt to pope the object at + /// the top of the <see cref="ConcurrentStack{T}"/>. + /// </remarks> + bool IProducerConsumerCollection<T>.TryTake(out T item) + { + return TryPop(out item); + } + + /// <summary> + /// Copies the items stored in the <see cref="ConcurrentStack{T}"/> to a new array. + /// </summary> + /// <returns>A new array containing a snapshot of elements copied from the <see + /// cref="ConcurrentStack{T}"/>.</returns> + public T[] ToArray() + { + return ToList().ToArray(); + } + + /// <summary> + /// Returns an array containing a snapshot of the list's contents, using + /// the target list node as the head of a region in the list. + /// </summary> + /// <returns>An array of the list's contents.</returns> + private List<T> ToList() + { + List<T> list = new List<T>(); + + Node curr = m_head; + while (curr != null) + { + list.Add(curr.m_value); + curr = curr.m_next; + } + + return list; + } + + /// <summary> + /// Returns an enumerator that iterates through the <see cref="ConcurrentStack{T}"/>. + /// </summary> + /// <returns>An enumerator for the <see cref="ConcurrentStack{T}"/>.</returns> + /// <remarks> + /// The enumeration represents a moment-in-time snapshot of the contents + /// of the stack. It does not reflect any updates to the collection after + /// <see cref="GetEnumerator"/> was called. The enumerator is safe to use + /// concurrently with reads from and writes to the stack. + /// </remarks> + public IEnumerator<T> GetEnumerator() + { + // Returns an enumerator for the stack. This effectively takes a snapshot + // of the stack's contents at the time of the call, i.e. subsequent modifications + // (pushes or pops) will not be reflected in the enumerator's contents. + + //If we put yield-return here, the iterator will be lazily evaluated. As a result a snapshot of + //the stack is not taken when GetEnumerator is initialized but when MoveNext() is first called. + //This is inconsistent with existing generic collections. In order to prevent it, we capture the + //value of m_head in a buffer and call out to a helper method + return GetEnumerator(m_head); + } + + private IEnumerator<T> GetEnumerator(Node head) + { + Node current = head; + while (current != null) + { + yield return current.m_value; + current = current.m_next; + } + } + + /// <summary> + /// Returns an enumerator that iterates through a collection. + /// </summary> + /// <returns>An <see cref="T:System.Collections.IEnumerator"/> that can be used to iterate through + /// the collection.</returns> + /// <remarks> + /// The enumeration represents a moment-in-time snapshot of the contents of the stack. It does not + /// reflect any updates to the collection after + /// <see cref="GetEnumerator"/> was called. The enumerator is safe to use concurrently with reads + /// from and writes to the stack. + /// </remarks> + IEnumerator IEnumerable.GetEnumerator() + { + return ((IEnumerable<T>)this).GetEnumerator(); + } + } +} diff --git a/src/mscorlib/src/System/Collections/Concurrent/IProducerConsumerCollection.cs b/src/mscorlib/src/System/Collections/Concurrent/IProducerConsumerCollection.cs new file mode 100644 index 0000000000..a74f69069a --- /dev/null +++ b/src/mscorlib/src/System/Collections/Concurrent/IProducerConsumerCollection.cs @@ -0,0 +1,116 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// +// +// +// A common interface for all concurrent collections. +// +// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + +using System.Collections; +using System.Collections.Generic; +using System.Diagnostics; + +namespace System.Collections.Concurrent +{ + + /// <summary> + /// Defines methods to manipulate thread-safe collections intended for producer/consumer usage. + /// </summary> + /// <typeparam name="T">Specifies the type of elements in the collection.</typeparam> + /// <remarks> + /// All implementations of this interface must enable all members of this interface + /// to be used concurrently from multiple threads. + /// </remarks> + public interface IProducerConsumerCollection<T> : IEnumerable<T>, ICollection + { + + /// <summary> + /// Copies the elements of the <see cref="IProducerConsumerCollection{T}"/> to + /// an + /// <see cref="T:System.Array"/>, starting at a specified index. + /// </summary> + /// <param name="array">The one-dimensional <see cref="T:System.Array"/> that is the destination of + /// the elements copied from the <see cref="IProducerConsumerCollection{T}"/>. + /// The array must have zero-based indexing.</param> + /// <param name="index">The zero-based index in <paramref name="array"/> at which copying + /// begins.</param> + /// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in + /// Visual Basic).</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than + /// zero.</exception> + /// <exception cref="ArgumentException"><paramref name="index"/> is equal to or greater than the + /// length of the <paramref name="array"/> + /// -or- The number of elements in the source <see cref="ConcurrentQueue{T}"/> is greater than the + /// available space from <paramref name="index"/> to the end of the destination <paramref + /// name="array"/>. + /// </exception> + void CopyTo(T[] array, int index); + + /// <summary> + /// Attempts to add an object to the <see + /// cref="IProducerConsumerCollection{T}"/>. + /// </summary> + /// <param name="item">The object to add to the <see + /// cref="IProducerConsumerCollection{T}"/>.</param> + /// <returns>true if the object was added successfully; otherwise, false.</returns> + /// <exception cref="T:System.ArgumentException">The <paramref name="item"/> was invalid for this collection.</exception> + bool TryAdd(T item); + + /// <summary> + /// Attempts to remove and return an object from the <see cref="IProducerConsumerCollection{T}"/>. + /// </summary> + /// <param name="item"> + /// When this method returns, if the object was removed and returned successfully, <paramref + /// name="item"/> contains the removed object. If no object was available to be removed, the value is + /// unspecified. + /// </param> + /// <returns>true if an object was removed and returned successfully; otherwise, false.</returns> + bool TryTake(out T item); + + /// <summary> + /// Copies the elements contained in the <see cref="IProducerConsumerCollection{T}"/> to a new array. + /// </summary> + /// <returns>A new array containing the elements copied from the <see cref="IProducerConsumerCollection{T}"/>.</returns> + T[] ToArray(); + + } + + + /// <summary> + /// A debugger view of the IProducerConsumerCollection that makes it simple to browse the + /// collection's contents at a point in time. + /// </summary> + /// <typeparam name="T">The type of elements stored within.</typeparam> + internal sealed class SystemCollectionsConcurrent_ProducerConsumerCollectionDebugView<T> + { + private IProducerConsumerCollection<T> m_collection; // The collection being viewed. + + /// <summary> + /// Constructs a new debugger view object for the provided collection object. + /// </summary> + /// <param name="collection">A collection to browse in the debugger.</param> + public SystemCollectionsConcurrent_ProducerConsumerCollectionDebugView(IProducerConsumerCollection<T> collection) + { + if (collection == null) + { + throw new ArgumentNullException("collection"); + } + + m_collection = collection; + } + + /// <summary> + /// Returns a snapshot of the underlying collection's elements. + /// </summary> + [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)] + public T[] Items + { + get { return m_collection.ToArray(); } + } + + } +} diff --git a/src/mscorlib/src/System/Collections/Concurrent/OrderablePartitioner.cs b/src/mscorlib/src/System/Collections/Concurrent/OrderablePartitioner.cs new file mode 100644 index 0000000000..02263b7f97 --- /dev/null +++ b/src/mscorlib/src/System/Collections/Concurrent/OrderablePartitioner.cs @@ -0,0 +1,281 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// +// +// +// +// +// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + +using System; +using System.Collections.Generic; +using System.Security.Permissions; +using System.Threading; + +namespace System.Collections.Concurrent +{ + + /// <summary> + /// Represents a particular manner of splitting an orderable data source into multiple partitions. + /// </summary> + /// <typeparam name="TSource">Type of the elements in the collection.</typeparam> + /// <remarks> + /// <para> + /// Each element in each partition has an integer index associated with it, which determines the relative + /// order of that element against elements in other partitions. + /// </para> + /// <para> + /// Inheritors of <see cref="OrderablePartitioner{TSource}"/> must adhere to the following rules: + /// <ol> + /// <li>All indices must be unique, such that there may not be duplicate indices. If all indices are not + /// unique, the output ordering may be scrambled.</li> + /// <li>All indices must be non-negative. If any indices are negative, consumers of the implementation + /// may throw exceptions.</li> + /// <li><see cref="GetPartitions"/> and <see cref="GetOrderablePartitions"/> should throw a + /// <see cref="T:System.ArgumentOutOfRangeException"/> if the requested partition count is less than or + /// equal to zero.</li> + /// <li><see cref="GetPartitions"/> and <see cref="GetOrderablePartitions"/> should always return a number + /// of enumerables equal to the requested partition count. If the partitioner runs out of data and cannot + /// create as many partitions as requested, an empty enumerator should be returned for each of the + /// remaining partitions. If this rule is not followed, consumers of the implementation may throw a <see + /// cref="T:System.InvalidOperationException"/>.</li> + /// <li><see cref="GetPartitions"/>, <see cref="GetOrderablePartitions"/>, + /// <see cref="GetDynamicPartitions"/>, and <see cref="GetOrderableDynamicPartitions"/> + /// should never return null. If null is returned, a consumer of the implementation may throw a + /// <see cref="T:System.InvalidOperationException"/>.</li> + /// <li><see cref="GetPartitions"/>, <see cref="GetOrderablePartitions"/>, + /// <see cref="GetDynamicPartitions"/>, and <see cref="GetOrderableDynamicPartitions"/> + /// should always return partitions that can fully and uniquely enumerate the input data source. All of + /// the data and only the data contained in the input source should be enumerated, with no duplication + /// that was not already in the input, unless specifically required by the particular partitioner's + /// design. If this is not followed, the output ordering may be scrambled.</li> + /// <li>If <see cref="KeysOrderedInEachPartition"/> returns true, each partition must return elements + /// with increasing key indices.</li> + /// <li>If <see cref="KeysOrderedAcrossPartitions"/> returns true, all the keys in partition numbered N + /// must be larger than all the keys in partition numbered N-1.</li> + /// <li>If <see cref="KeysNormalized"/> returns true, all indices must be monotonically increasing from + /// 0, though not necessarily within a single partition.</li> + /// </ol> + /// </para> + /// </remarks> + [HostProtection(Synchronization = true, ExternalThreading = true)] + public abstract class OrderablePartitioner<TSource> : Partitioner<TSource> + { + /// <summary> + /// Initializes a new instance of the <see cref="OrderablePartitioner{TSource}"/> class with the + /// specified constraints on the index keys. + /// </summary> + /// <param name="keysOrderedInEachPartition"> + /// Indicates whether the elements in each partition are yielded in the order of + /// increasing keys. + /// </param> + /// <param name="keysOrderedAcrossPartitions"> + /// Indicates whether elements in an earlier partition always come before + /// elements in a later partition. If true, each element in partition 0 has a smaller order key than + /// any element in partition 1, each element in partition 1 has a smaller order key than any element + /// in partition 2, and so on. + /// </param> + /// <param name="keysNormalized"> + /// Indicates whether keys are normalized. If true, all order keys are distinct + /// integers in the range [0 .. numberOfElements-1]. If false, order keys must still be dictinct, but + /// only their relative order is considered, not their absolute values. + /// </param> + protected OrderablePartitioner(bool keysOrderedInEachPartition, bool keysOrderedAcrossPartitions, bool keysNormalized) + { + KeysOrderedInEachPartition = keysOrderedInEachPartition; + KeysOrderedAcrossPartitions = keysOrderedAcrossPartitions; + KeysNormalized = keysNormalized; + } + + /// <summary> + /// Partitions the underlying collection into the specified number of orderable partitions. + /// </summary> + /// <remarks> + /// Each partition is represented as an enumerator over key-value pairs. + /// The value of the pair is the element itself, and the key is an integer which determines + /// the relative ordering of this element against other elements in the data source. + /// </remarks> + /// <param name="partitionCount">The number of partitions to create.</param> + /// <returns>A list containing <paramref name="partitionCount"/> enumerators.</returns> + public abstract IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount); + + /// <summary> + /// Creates an object that can partition the underlying collection into a variable number of + /// partitions. + /// </summary> + /// <remarks> + /// <para> + /// The returned object implements the <see + /// cref="T:System.Collections.Generic.IEnumerable{TSource}"/> interface. Calling <see + /// cref="System.Collections.Generic.IEnumerable{TSource}.GetEnumerator">GetEnumerator</see> on the + /// object creates another partition over the sequence. + /// </para> + /// <para> + /// Each partition is represented as an enumerator over key-value pairs. The value in the pair is the element + /// itself, and the key is an integer which determines the relative ordering of this element against + /// other elements. + /// </para> + /// <para> + /// The <see cref="GetOrderableDynamicPartitions"/> method is only supported if the <see + /// cref="System.Collections.Concurrent.Partitioner{TSource}.SupportsDynamicPartitions">SupportsDynamicPartitions</see> + /// property returns true. + /// </para> + /// </remarks> + /// <returns>An object that can create partitions over the underlying data source.</returns> + /// <exception cref="NotSupportedException">Dynamic partitioning is not supported by this + /// partitioner.</exception> + public virtual IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions() + { + throw new NotSupportedException(Environment.GetResourceString("Partitioner_DynamicPartitionsNotSupported")); + } + + /// <summary> + /// Gets whether elements in each partition are yielded in the order of increasing keys. + /// </summary> + public bool KeysOrderedInEachPartition { get; private set; } + + /// <summary> + /// Gets whether elements in an earlier partition always come before elements in a later partition. + /// </summary> + /// <remarks> + /// If <see cref="KeysOrderedAcrossPartitions"/> returns true, each element in partition 0 has a + /// smaller order key than any element in partition 1, each element in partition 1 has a smaller + /// order key than any element in partition 2, and so on. + /// </remarks> + public bool KeysOrderedAcrossPartitions { get; private set; } + + /// <summary> + /// Gets whether order keys are normalized. + /// </summary> + /// <remarks> + /// If <see cref="KeysNormalized"/> returns true, all order keys are distinct integers in the range + /// [0 .. numberOfElements-1]. If the property returns false, order keys must still be dictinct, but + /// only their relative order is considered, not their absolute values. + /// </remarks> + public bool KeysNormalized { get; private set; } + + /// <summary> + /// Partitions the underlying collection into the given number of ordered partitions. + /// </summary> + /// <remarks> + /// The default implementation provides the same behavior as <see cref="GetOrderablePartitions"/> except + /// that the returned set of partitions does not provide the keys for the elements. + /// </remarks> + /// <param name="partitionCount">The number of partitions to create.</param> + /// <returns>A list containing <paramref name="partitionCount"/> enumerators.</returns> + public override IList<IEnumerator<TSource>> GetPartitions(int partitionCount) + { + IList<IEnumerator<KeyValuePair<long, TSource>>> orderablePartitions = GetOrderablePartitions(partitionCount); + + if (orderablePartitions.Count != partitionCount) + { + throw new InvalidOperationException("OrderablePartitioner_GetPartitions_WrongNumberOfPartitions"); + } + + IEnumerator<TSource>[] partitions = new IEnumerator<TSource>[partitionCount]; + for (int i = 0; i < partitionCount; i++) + { + partitions[i] = new EnumeratorDropIndices(orderablePartitions[i]); + } + return partitions; + } + + /// <summary> + /// Creates an object that can partition the underlying collection into a variable number of + /// partitions. + /// </summary> + /// <remarks> + /// <para> + /// The returned object implements the <see + /// cref="T:System.Collections.Generic.IEnumerable{TSource}"/> interface. Calling <see + /// cref="System.Collections.Generic.IEnumerable{TSource}.GetEnumerator">GetEnumerator</see> on the + /// object creates another partition over the sequence. + /// </para> + /// <para> + /// The default implementation provides the same behavior as <see cref="GetOrderableDynamicPartitions"/> except + /// that the returned set of partitions does not provide the keys for the elements. + /// </para> + /// <para> + /// The <see cref="GetDynamicPartitions"/> method is only supported if the <see + /// cref="System.Collections.Concurrent.Partitioner{TSource}.SupportsDynamicPartitions"/> + /// property returns true. + /// </para> + /// </remarks> + /// <returns>An object that can create partitions over the underlying data source.</returns> + /// <exception cref="NotSupportedException">Dynamic partitioning is not supported by this + /// partitioner.</exception> + public override IEnumerable<TSource> GetDynamicPartitions() + { + IEnumerable<KeyValuePair<long, TSource>> orderablePartitions = GetOrderableDynamicPartitions(); + return new EnumerableDropIndices(orderablePartitions); + } + + /// <summary> + /// Converts an enumerable over key-value pairs to an enumerable over values. + /// </summary> + private class EnumerableDropIndices : IEnumerable<TSource>, IDisposable + { + private readonly IEnumerable<KeyValuePair<long, TSource>> m_source; + public EnumerableDropIndices(IEnumerable<KeyValuePair<long, TSource>> source) + { + m_source = source; + } + public IEnumerator<TSource> GetEnumerator() + { + return new EnumeratorDropIndices(m_source.GetEnumerator()); + } + IEnumerator IEnumerable.GetEnumerator() + { + return ((EnumerableDropIndices)this).GetEnumerator(); + } + public void Dispose() + { + IDisposable d = m_source as IDisposable; + if (d != null) + { + d.Dispose(); + } + } + } + + private class EnumeratorDropIndices : IEnumerator<TSource> + { + private readonly IEnumerator<KeyValuePair<long, TSource>> m_source; + public EnumeratorDropIndices(IEnumerator<KeyValuePair<long, TSource>> source) + { + m_source = source; + } + public bool MoveNext() + { + return m_source.MoveNext(); + } + public TSource Current + { + get + { + return m_source.Current.Value; + } + } + Object IEnumerator.Current + { + get + { + return ((EnumeratorDropIndices)this).Current; + } + } + public void Dispose() + { + m_source.Dispose(); + } + public void Reset() + { + m_source.Reset(); + } + } + + } + +} diff --git a/src/mscorlib/src/System/Collections/Concurrent/Partitioner.cs b/src/mscorlib/src/System/Collections/Concurrent/Partitioner.cs new file mode 100644 index 0000000000..3d54c1471b --- /dev/null +++ b/src/mscorlib/src/System/Collections/Concurrent/Partitioner.cs @@ -0,0 +1,102 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// +// +// +// Represents a particular way of splitting a collection into multiple partitions. +// +// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + +using System; +using System.Collections.Generic; +using System.Security.Permissions; +using System.Threading; + +namespace System.Collections.Concurrent +{ + /// <summary> + /// Represents a particular manner of splitting a data source into multiple partitions. + /// </summary> + /// <typeparam name="TSource">Type of the elements in the collection.</typeparam> + /// <remarks> + /// <para> + /// Inheritors of <see cref="Partitioner{TSource}"/> must adhere to the following rules: + /// <ol> + /// <li><see cref="GetPartitions"/> should throw a + /// <see cref="T:System.ArgumentOutOfRangeException"/> if the requested partition count is less than or + /// equal to zero.</li> + /// <li><see cref="GetPartitions"/> should always return a number of enumerables equal to the requested + /// partition count. If the partitioner runs out of data and cannot create as many partitions as + /// requested, an empty enumerator should be returned for each of the remaining partitions. If this rule + /// is not followed, consumers of the implementation may throw a <see + /// cref="T:System.InvalidOperationException"/>.</li> + /// <li><see cref="GetPartitions"/> and <see cref="GetDynamicPartitions"/> + /// should never return null. If null is returned, a consumer of the implementation may throw a + /// <see cref="T:System.InvalidOperationException"/>.</li> + /// <li><see cref="GetPartitions"/> and <see cref="GetDynamicPartitions"/> should always return + /// partitions that can fully and uniquely enumerate the input data source. All of the data and only the + /// data contained in the input source should be enumerated, with no duplication that was not already in + /// the input, unless specifically required by the particular partitioner's design. If this is not + /// followed, the output ordering may be scrambled.</li> + /// </ol> + /// </para> + /// </remarks> + [HostProtection(Synchronization = true, ExternalThreading = true)] + public abstract class Partitioner<TSource> + { + /// <summary> + /// Partitions the underlying collection into the given number of partitions. + /// </summary> + /// <param name="partitionCount">The number of partitions to create.</param> + /// <returns>A list containing <paramref name="partitionCount"/> enumerators.</returns> + public abstract IList<IEnumerator<TSource>> GetPartitions(int partitionCount); + + /// <summary> + /// Gets whether additional partitions can be created dynamically. + /// </summary> + /// <returns> + /// true if the <see cref="Partitioner{TSource}"/> can create partitions dynamically as they are + /// requested; false if the <see cref="Partitioner{TSource}"/> can only allocate + /// partitions statically. + /// </returns> + /// <remarks> + /// <para> + /// If a derived class does not override and implement <see cref="GetDynamicPartitions"/>, + /// <see cref="SupportsDynamicPartitions"/> should return false. The value of <see + /// cref="SupportsDynamicPartitions"/> should not vary over the lifetime of this instance. + /// </para> + /// </remarks> + public virtual bool SupportsDynamicPartitions + { + get { return false; } + } + + /// <summary> + /// Creates an object that can partition the underlying collection into a variable number of + /// partitions. + /// </summary> + /// <remarks> + /// <para> + /// The returned object implements the <see + /// cref="T:System.Collections.Generic.IEnumerable{TSource}"/> interface. Calling <see + /// cref="System.Collections.Generic.IEnumerable{TSource}.GetEnumerator">GetEnumerator</see> on the + /// object creates another partition over the sequence. + /// </para> + /// <para> + /// The <see cref="GetDynamicPartitions"/> method is only supported if the <see + /// cref="SupportsDynamicPartitions"/> + /// property returns true. + /// </para> + /// </remarks> + /// <returns>An object that can create partitions over the underlying data source.</returns> + /// <exception cref="NotSupportedException">Dynamic partitioning is not supported by this + /// partitioner.</exception> + public virtual IEnumerable<TSource> GetDynamicPartitions() + { + throw new NotSupportedException(Environment.GetResourceString("Partitioner_DynamicPartitionsNotSupported")); + } + } +} diff --git a/src/mscorlib/src/System/Collections/Concurrent/PartitionerStatic.cs b/src/mscorlib/src/System/Collections/Concurrent/PartitionerStatic.cs new file mode 100644 index 0000000000..2169c6dee7 --- /dev/null +++ b/src/mscorlib/src/System/Collections/Concurrent/PartitionerStatic.cs @@ -0,0 +1,1733 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. +#pragma warning disable 0420 + +// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// +// +// +// A class of default partitioners for Partitioner<TSource> +// +// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + +using System.Collections.Generic; +using System.Security.Permissions; +using System.Threading; +using System.Diagnostics.Contracts; +using System.Runtime.InteropServices; + +namespace System.Collections.Concurrent +{ + /// <summary> + /// Out-of-the-box partitioners are created with a set of default behaviors. + /// For example, by default, some form of buffering and chunking will be employed to achieve + /// optimal performance in the common scenario where an IEnumerable<> implementation is fast and + /// non-blocking. These behaviors can be overridden via this enumeration. + /// </summary> + [Flags] +#if !FEATURE_CORECLR + [Serializable] +#endif + public enum EnumerablePartitionerOptions + { + /// <summary> + /// Use the default behavior (i.e., use buffering to achieve optimal performance) + /// </summary> + None = 0x0, + + /// <summary> + /// Creates a partitioner that will take items from the source enumerable one at a time + /// and will not use intermediate storage that can be accessed more efficiently by multiple threads. + /// This option provides support for low latency (items will be processed as soon as they are available from + /// the source) and partial support for dependencies between items (a thread cannot deadlock waiting for an item + /// that it, itself, is responsible for processing). + /// </summary> + NoBuffering = 0x1 + } + + // The static class Partitioners implements 3 default partitioning strategies: + // 1. dynamic load balance partitioning for indexable data source (IList and arrays) + // 2. static partitioning for indexable data source (IList and arrays) + // 3. dynamic load balance partitioning for enumerables. Enumerables have indexes, which are the natural order + // of elements, but enuemrators are not indexable + // - data source of type IList/arrays have both dynamic and static partitioning, as 1 and 3. + // We assume that the source data of IList/Array is not changing concurrently. + // - data source of type IEnumerable can only be partitioned dynamically (load-balance) + // - Dynamic partitioning methods 1 and 3 are same, both being dynamic and load-balance. But the + // implementation is different for data source of IList/Array vs. IEnumerable: + // * When the source collection is IList/Arrays, we use Interlocked on the shared index; + // * When the source collection is IEnumerable, we use Monitor to wrap around the access to the source + // enumerator. + + /// <summary> + /// Provides common partitioning strategies for arrays, lists, and enumerables. + /// </summary> + /// <remarks> + /// <para> + /// The static methods on <see cref="Partitioner"/> are all thread-safe and may be used concurrently + /// from multiple threads. However, while a created partitioner is in use, the underlying data source + /// should not be modified, whether from the same thread that's using a partitioner or from a separate + /// thread. + /// </para> + /// </remarks> + [HostProtection(Synchronization = true, ExternalThreading = true)] + public static class Partitioner + { + /// <summary> + /// Creates an orderable partitioner from an <see cref="System.Collections.Generic.IList{T}"/> + /// instance. + /// </summary> + /// <typeparam name="TSource">Type of the elements in source list.</typeparam> + /// <param name="list">The list to be partitioned.</param> + /// <param name="loadBalance"> + /// A Boolean value that indicates whether the created partitioner should dynamically + /// load balance between partitions rather than statically partition. + /// </param> + /// <returns> + /// An orderable partitioner based on the input list. + /// </returns> + public static OrderablePartitioner<TSource> Create<TSource>(IList<TSource> list, bool loadBalance) + { + if (list == null) + { + throw new ArgumentNullException("list"); + } + if (loadBalance) + { + return (new DynamicPartitionerForIList<TSource>(list)); + } + else + { + return (new StaticIndexRangePartitionerForIList<TSource>(list)); + } + } + + /// <summary> + /// Creates an orderable partitioner from a <see cref="System.Array"/> instance. + /// </summary> + /// <typeparam name="TSource">Type of the elements in source array.</typeparam> + /// <param name="array">The array to be partitioned.</param> + /// <param name="loadBalance"> + /// A Boolean value that indicates whether the created partitioner should dynamically load balance + /// between partitions rather than statically partition. + /// </param> + /// <returns> + /// An orderable partitioner based on the input array. + /// </returns> + public static OrderablePartitioner<TSource> Create<TSource>(TSource[] array, bool loadBalance) + { + // This implementation uses 'ldelem' instructions for element retrieval, rather than using a + // method call. + + if (array == null) + { + throw new ArgumentNullException("array"); + } + if (loadBalance) + { + return (new DynamicPartitionerForArray<TSource>(array)); + } + else + { + return (new StaticIndexRangePartitionerForArray<TSource>(array)); + } + } + + /// <summary> + /// Creates an orderable partitioner from a <see cref="System.Collections.Generic.IEnumerable{TSource}"/> instance. + /// </summary> + /// <typeparam name="TSource">Type of the elements in source enumerable.</typeparam> + /// <param name="source">The enumerable to be partitioned.</param> + /// <returns> + /// An orderable partitioner based on the input array. + /// </returns> + /// <remarks> + /// The ordering used in the created partitioner is determined by the natural order of the elements + /// as retrieved from the source enumerable. + /// </remarks> + public static OrderablePartitioner<TSource> Create<TSource>(IEnumerable<TSource> source) + { + return Create<TSource>(source, EnumerablePartitionerOptions.None); + } + + /// <summary> + /// Creates an orderable partitioner from a <see cref="System.Collections.Generic.IEnumerable{TSource}"/> instance. + /// </summary> + /// <typeparam name="TSource">Type of the elements in source enumerable.</typeparam> + /// <param name="source">The enumerable to be partitioned.</param> + /// <param name="partitionerOptions">Options to control the buffering behavior of the partitioner.</param> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// The <paramref name="partitionerOptions"/> argument specifies an invalid value for <see + /// cref="T:System.Collections.Concurrent.EnumerablePartitionerOptions"/>. + /// </exception> + /// <returns> + /// An orderable partitioner based on the input array. + /// </returns> + /// <remarks> + /// The ordering used in the created partitioner is determined by the natural order of the elements + /// as retrieved from the source enumerable. + /// </remarks> + public static OrderablePartitioner<TSource> Create<TSource>(IEnumerable<TSource> source, EnumerablePartitionerOptions partitionerOptions) + { + if (source == null) + { + throw new ArgumentNullException("source"); + } + + if ((partitionerOptions & (~EnumerablePartitionerOptions.NoBuffering)) != 0) + throw new ArgumentOutOfRangeException("partitionerOptions"); + + return (new DynamicPartitionerForIEnumerable<TSource>(source, partitionerOptions)); + } + + /// <summary>Creates a partitioner that chunks the user-specified range.</summary> + /// <param name="fromInclusive">The lower, inclusive bound of the range.</param> + /// <param name="toExclusive">The upper, exclusive bound of the range.</param> + /// <returns>A partitioner.</returns> + /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is + /// less than or equal to the <paramref name="fromInclusive"/> argument.</exception> + public static OrderablePartitioner<Tuple<long, long>> Create(long fromInclusive, long toExclusive) + { + // How many chunks do we want to divide the range into? If this is 1, then the + // answer is "one chunk per core". Generally, though, you'll achieve better + // load balancing on a busy system if you make it higher than 1. + int coreOversubscriptionRate = 3; + + if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive"); + long rangeSize = (toExclusive - fromInclusive) / + (PlatformHelper.ProcessorCount * coreOversubscriptionRate); + if (rangeSize == 0) rangeSize = 1; + return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time + } + + /// <summary>Creates a partitioner that chunks the user-specified range.</summary> + /// <param name="fromInclusive">The lower, inclusive bound of the range.</param> + /// <param name="toExclusive">The upper, exclusive bound of the range.</param> + /// <param name="rangeSize">The size of each subrange.</param> + /// <returns>A partitioner.</returns> + /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is + /// less than or equal to the <paramref name="fromInclusive"/> argument.</exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="rangeSize"/> argument is + /// less than or equal to 0.</exception> + public static OrderablePartitioner<Tuple<long, long>> Create(long fromInclusive, long toExclusive, long rangeSize) + { + if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive"); + if (rangeSize <= 0) throw new ArgumentOutOfRangeException("rangeSize"); + return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time + } + + // Private method to parcel out range tuples. + private static IEnumerable<Tuple<long, long>> CreateRanges(long fromInclusive, long toExclusive, long rangeSize) + { + // Enumerate all of the ranges + long from, to; + bool shouldQuit = false; + + for (long i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize) + { + from = i; + try { checked { to = i + rangeSize; } } + catch (OverflowException) + { + to = toExclusive; + shouldQuit = true; + } + if (to > toExclusive) to = toExclusive; + yield return new Tuple<long, long>(from, to); + } + } + + /// <summary>Creates a partitioner that chunks the user-specified range.</summary> + /// <param name="fromInclusive">The lower, inclusive bound of the range.</param> + /// <param name="toExclusive">The upper, exclusive bound of the range.</param> + /// <returns>A partitioner.</returns> + /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is + /// less than or equal to the <paramref name="fromInclusive"/> argument.</exception> + public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive) + { + // How many chunks do we want to divide the range into? If this is 1, then the + // answer is "one chunk per core". Generally, though, you'll achieve better + // load balancing on a busy system if you make it higher than 1. + int coreOversubscriptionRate = 3; + + if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive"); + int rangeSize = (toExclusive - fromInclusive) / + (PlatformHelper.ProcessorCount * coreOversubscriptionRate); + if (rangeSize == 0) rangeSize = 1; + return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time + } + + /// <summary>Creates a partitioner that chunks the user-specified range.</summary> + /// <param name="fromInclusive">The lower, inclusive bound of the range.</param> + /// <param name="toExclusive">The upper, exclusive bound of the range.</param> + /// <param name="rangeSize">The size of each subrange.</param> + /// <returns>A partitioner.</returns> + /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is + /// less than or equal to the <paramref name="fromInclusive"/> argument.</exception> + /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="rangeSize"/> argument is + /// less than or equal to 0.</exception> + public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive, int rangeSize) + { + if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive"); + if (rangeSize <= 0) throw new ArgumentOutOfRangeException("rangeSize"); + return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); // chunk one range at a time + } + + // Private method to parcel out range tuples. + private static IEnumerable<Tuple<int, int>> CreateRanges(int fromInclusive, int toExclusive, int rangeSize) + { + // Enumerate all of the ranges + int from, to; + bool shouldQuit = false; + + for (int i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize) + { + from = i; + try { checked { to = i + rangeSize; } } + catch (OverflowException) + { + to = toExclusive; + shouldQuit = true; + } + if (to > toExclusive) to = toExclusive; + yield return new Tuple<int, int>(from, to); + } + } + + #region DynamicPartitionEnumerator_Abstract class + /// <summary> + /// DynamicPartitionEnumerator_Abstract defines the enumerator for each partition for the dynamic load-balance + /// partitioning algorithm. + /// - Partition is an enumerator of KeyValuePairs, each corresponding to an item in the data source: + /// the key is the index in the source collection; the value is the item itself. + /// - a set of such partitions share a reader over data source. The type of the reader is specified by + /// TSourceReader. + /// - each partition requests a contiguous chunk of elements at a time from the source data. The chunk + /// size is initially 1, and doubles every time until it reaches the maximum chunk size. + /// The implementation for GrabNextChunk() method has two versions: one for data source of IndexRange + /// types (IList and the array), one for data source of IEnumerable. + /// - The method "Reset" is not supported for any partitioning algorithm. + /// - The implementation for MoveNext() method is same for all dynanmic partitioners, so we provide it + /// in this abstract class. + /// </summary> + /// <typeparam name="TSource">Type of the elements in the data source</typeparam> + /// <typeparam name="TSourceReader">Type of the reader on the data source</typeparam> + //TSourceReader is + // - IList<TSource>, when source data is IList<TSource>, the shared reader is source data itself + // - TSource[], when source data is TSource[], the shared reader is source data itself + // - IEnumerator<TSource>, when source data is IEnumerable<TSource>, and the shared reader is an + // enumerator of the source data + private abstract class DynamicPartitionEnumerator_Abstract<TSource, TSourceReader> : IEnumerator<KeyValuePair<long, TSource>> + { + //----------------- common fields and constructor for all dynamic partitioners ----------------- + //--- shared by all dervied class with souce data type: IList, Array, and IEnumerator + protected readonly TSourceReader m_sharedReader; + + protected static int s_defaultMaxChunkSize = GetDefaultChunkSize<TSource>(); + + //deferred allocating in MoveNext() with initial value 0, to avoid false sharing + //we also use the fact that: (m_currentChunkSize==null) means MoveNext is never called on this enumerator + protected SharedInt m_currentChunkSize; + + //deferring allocation in MoveNext() with initial value -1, to avoid false sharing + protected SharedInt m_localOffset; + + private const int CHUNK_DOUBLING_RATE = 3; // Double the chunk size every this many grabs + private int m_doublingCountdown; // Number of grabs remaining until chunk size doubles + protected readonly int m_maxChunkSize; // s_defaultMaxChunkSize unless single-chunking is requested by the caller + + // m_sharedIndex shared by this set of partitions, and particularly when m_sharedReader is IEnuerable + // it serves as tracking of the natual order of elements in m_sharedReader + // the value of this field is passed in from outside (already initialized) by the constructor, + protected readonly SharedLong m_sharedIndex; + + protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, SharedLong sharedIndex) + : this(sharedReader, sharedIndex, false) + { + } + + protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, SharedLong sharedIndex, bool useSingleChunking) + { + m_sharedReader = sharedReader; + m_sharedIndex = sharedIndex; + m_maxChunkSize = useSingleChunking ? 1 : s_defaultMaxChunkSize; + } + + // ---------------- abstract method declarations -------------- + + /// <summary> + /// Abstract method to request a contiguous chunk of elements from the source collection + /// </summary> + /// <param name="requestedChunkSize">specified number of elements requested</param> + /// <returns> + /// true if we successfully reserved at least one element (up to #=requestedChunkSize) + /// false if all elements in the source collection have been reserved. + /// </returns> + //GrabNextChunk does the following: + // - grab # of requestedChunkSize elements from source data through shared reader, + // - at the time of function returns, m_currentChunkSize is updated with the number of + // elements actually got assigned (<=requestedChunkSize). + // - GrabNextChunk returns true if at least one element is assigned to this partition; + // false if the shared reader already hits the last element of the source data before + // we call GrabNextChunk + protected abstract bool GrabNextChunk(int requestedChunkSize); + + /// <summary> + /// Abstract property, returns whether or not the shared reader has already read the last + /// element of the source data + /// </summary> + protected abstract bool HasNoElementsLeft { get; set; } + + /// <summary> + /// Get the current element in the current partition. Property required by IEnumerator interface + /// This property is abstract because the implementation is different depending on the type + /// of the source data: IList, Array or IEnumerable + /// </summary> + public abstract KeyValuePair<long, TSource> Current { get; } + + /// <summary> + /// Dispose is abstract, and depends on the type of the source data: + /// - For source data type IList and Array, the type of the shared reader is just the dataitself. + /// We don't do anything in Dispose method for IList and Array. + /// - For source data type IEnumerable, the type of the shared reader is an enumerator we created. + /// Thus we need to dispose this shared reader enumerator, when there is no more active partitions + /// left. + /// </summary> + public abstract void Dispose(); + + /// <summary> + /// Reset on partitions is not supported + /// </summary> + public void Reset() + { + throw new NotSupportedException(); + } + + + /// <summary> + /// Get the current element in the current partition. Property required by IEnumerator interface + /// </summary> + Object IEnumerator.Current + { + get + { + return ((DynamicPartitionEnumerator_Abstract<TSource, TSourceReader>)this).Current; + } + } + + /// <summary> + /// Moves to the next element if any. + /// Try current chunk first, if the current chunk do not have any elements left, then we + /// attempt to grab a chunk from the source collection. + /// </summary> + /// <returns> + /// true if successfully moving to the next position; + /// false otherwise, if and only if there is no more elements left in the current chunk + /// AND the source collection is exhausted. + /// </returns> + public bool MoveNext() + { + //perform deferred allocating of the local variables. + if (m_localOffset == null) + { + Contract.Assert(m_currentChunkSize == null); + m_localOffset = new SharedInt(-1); + m_currentChunkSize = new SharedInt(0); + m_doublingCountdown = CHUNK_DOUBLING_RATE; + } + + if (m_localOffset.Value < m_currentChunkSize.Value - 1) + //attempt to grab the next element from the local chunk + { + m_localOffset.Value++; + return true; + } + else + //otherwise it means we exhausted the local chunk + //grab a new chunk from the source enumerator + { + // The second part of the || condition is necessary to handle the case when MoveNext() is called + // after a previous MoveNext call returned false. + Contract.Assert(m_localOffset.Value == m_currentChunkSize.Value - 1 || m_currentChunkSize.Value == 0); + + //set the requested chunk size to a proper value + int requestedChunkSize; + if (m_currentChunkSize.Value == 0) //first time grabbing from source enumerator + { + requestedChunkSize = 1; + } + else if (m_doublingCountdown > 0) + { + requestedChunkSize = m_currentChunkSize.Value; + } + else + { + requestedChunkSize = Math.Min(m_currentChunkSize.Value * 2, m_maxChunkSize); + m_doublingCountdown = CHUNK_DOUBLING_RATE; // reset + } + + // Decrement your doubling countdown + m_doublingCountdown--; + + Contract.Assert(requestedChunkSize > 0 && requestedChunkSize <= m_maxChunkSize); + //GrabNextChunk will update the value of m_currentChunkSize + if (GrabNextChunk(requestedChunkSize)) + { + Contract.Assert(m_currentChunkSize.Value <= requestedChunkSize && m_currentChunkSize.Value > 0); + m_localOffset.Value = 0; + return true; + } + else + { + return false; + } + } + } + } + #endregion + + #region Dynamic Partitioner for source data of IEnuemrable<> type + /// <summary> + /// Inherits from DynamicPartitioners + /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance + /// of EnumerableOfPartitionsForIEnumerator defined internally + /// </summary> + /// <typeparam name="TSource">Type of elements in the source data</typeparam> + private class DynamicPartitionerForIEnumerable<TSource> : OrderablePartitioner<TSource> + { + IEnumerable<TSource> m_source; + readonly bool m_useSingleChunking; + + //constructor + internal DynamicPartitionerForIEnumerable(IEnumerable<TSource> source, EnumerablePartitionerOptions partitionerOptions) + : base(true, false, true) + { + m_source = source; + m_useSingleChunking = ((partitionerOptions & EnumerablePartitionerOptions.NoBuffering) != 0); + } + + /// <summary> + /// Overrides OrderablePartitioner.GetOrderablePartitions. + /// Partitions the underlying collection into the given number of orderable partitions. + /// </summary> + /// <param name="partitionCount">number of partitions requested</param> + /// <returns>A list containing <paramref name="partitionCount"/> enumerators.</returns> + override public IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount) + { + if (partitionCount <= 0) + { + throw new ArgumentOutOfRangeException("partitionCount"); + } + IEnumerator<KeyValuePair<long, TSource>>[] partitions + = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount]; + + IEnumerable<KeyValuePair<long, TSource>> partitionEnumerable = new InternalPartitionEnumerable(m_source.GetEnumerator(), m_useSingleChunking, true); + for (int i = 0; i < partitionCount; i++) + { + partitions[i] = partitionEnumerable.GetEnumerator(); + } + return partitions; + } + + /// <summary> + /// Overrides OrderablePartitioner.GetOrderableDyanmicPartitions + /// </summary> + /// <returns>a enumerable collection of orderable partitions</returns> + override public IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions() + { + return new InternalPartitionEnumerable(m_source.GetEnumerator(), m_useSingleChunking, false); + } + + /// <summary> + /// Whether additional partitions can be created dynamically. + /// </summary> + override public bool SupportsDynamicPartitions + { + get { return true; } + } + + #region Internal classes: InternalPartitionEnumerable, InternalPartitionEnumerator + /// <summary> + /// Provides customized implementation for source data of IEnumerable + /// Different from the counterpart for IList/Array, this enumerable maintains several additional fields + /// shared by the partitions it owns, including a boolean "m_hasNoElementsLef", a shared lock, and a + /// shared count "m_activePartitionCount" used to track active partitions when they were created statically + /// </summary> + private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>>, IDisposable + { + //reader through which we access the source data + private readonly IEnumerator<TSource> m_sharedReader; + private SharedLong m_sharedIndex;//initial value -1 + + private volatile KeyValuePair<long, TSource>[] m_FillBuffer; // intermediate buffer to reduce locking + private volatile int m_FillBufferSize; // actual number of elements in m_FillBuffer. Will start + // at m_FillBuffer.Length, and might be reduced during the last refill + private volatile int m_FillBufferCurrentPosition; //shared value to be accessed by Interlock.Increment only + private volatile int m_activeCopiers; //number of active copiers + + //fields shared by all partitions that this Enumerable owns, their allocation is deferred + private SharedBool m_hasNoElementsLeft; // no elements left at all. + private SharedBool m_sourceDepleted; // no elements left in the enumerator, but there may be elements in the Fill Buffer + + //shared synchronization lock, created by this Enumerable + private object m_sharedLock;//deferring allocation by enumerator + + private bool m_disposed; + + // If dynamic partitioning, then m_activePartitionCount == null + // If static partitioning, then it keeps track of active partition count + private SharedInt m_activePartitionCount; + + // records whether or not the user has requested single-chunking behavior + private readonly bool m_useSingleChunking; + + internal InternalPartitionEnumerable(IEnumerator<TSource> sharedReader, bool useSingleChunking, bool isStaticPartitioning) + { + m_sharedReader = sharedReader; + m_sharedIndex = new SharedLong(-1); + m_hasNoElementsLeft = new SharedBool(false); + m_sourceDepleted = new SharedBool(false); + m_sharedLock = new object(); + m_useSingleChunking = useSingleChunking; + + // Only allocate the fill-buffer if single-chunking is not in effect + if (!m_useSingleChunking) + { + // Time to allocate the fill buffer which is used to reduce the contention on the shared lock. + // First pick the buffer size multiplier. We use 4 for when there are more than 4 cores, and just 1 for below. This is based on empirical evidence. + int fillBufferMultiplier = (PlatformHelper.ProcessorCount > 4) ? 4 : 1; + + // and allocate the fill buffer using these two numbers + m_FillBuffer = new KeyValuePair<long, TSource>[fillBufferMultiplier * Partitioner.GetDefaultChunkSize<TSource>()]; + } + + if (isStaticPartitioning) + { + // If this object is created for static partitioning (ie. via GetPartitions(int partitionCount), + // GetOrderablePartitions(int partitionCount)), we track the active partitions, in order to dispose + // this object when all the partitions have been disposed. + m_activePartitionCount = new SharedInt(0); + } + else + { + // Otherwise this object is created for dynamic partitioning (ie, via GetDynamicPartitions(), + // GetOrderableDynamicPartitions()), we do not need tracking. This object must be disposed + // explicitly + m_activePartitionCount = null; + } + } + + public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator() + { + if (m_disposed) + { + throw new ObjectDisposedException(Environment.GetResourceString("PartitionerStatic_CanNotCallGetEnumeratorAfterSourceHasBeenDisposed")); + } + else + { + return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex, + m_hasNoElementsLeft, m_sharedLock, m_activePartitionCount, this, m_useSingleChunking); + } + } + + + IEnumerator IEnumerable.GetEnumerator() + { + return ((InternalPartitionEnumerable)this).GetEnumerator(); + } + + + /////////////////// + // + // Used by GrabChunk_Buffered() + private void TryCopyFromFillBuffer(KeyValuePair<long, TSource>[] destArray, + int requestedChunkSize, + ref int actualNumElementsGrabbed) + { + actualNumElementsGrabbed = 0; + + + // making a local defensive copy of the fill buffer reference, just in case it gets nulled out + KeyValuePair<long, TSource>[] fillBufferLocalRef = m_FillBuffer; + if (fillBufferLocalRef == null) return; + + // first do a quick check, and give up if the current position is at the end + // so that we don't do an unncessary pair of Interlocked.Increment / Decrement calls + if (m_FillBufferCurrentPosition >= m_FillBufferSize) + { + return; // no elements in the buffer to copy from + } + + // We might have a chance to grab elements from the buffer. We will know for sure + // when we do the Interlocked.Add below. + // But first we must register as a potential copier in order to make sure + // the elements we're copying from don't get overwritten by another thread + // that starts refilling the buffer right after our Interlocked.Add. + Interlocked.Increment(ref m_activeCopiers); + + int endPos = Interlocked.Add(ref m_FillBufferCurrentPosition, requestedChunkSize); + int beginPos = endPos - requestedChunkSize; + + if (beginPos < m_FillBufferSize) + { + // adjust index and do the actual copy + actualNumElementsGrabbed = (endPos < m_FillBufferSize) ? endPos : m_FillBufferSize - beginPos; + Array.Copy(fillBufferLocalRef, beginPos, destArray, 0, actualNumElementsGrabbed); + } + + // let the record show we are no longer accessing the buffer + Interlocked.Decrement(ref m_activeCopiers); + } + + /// <summary> + /// This is the common entry point for consuming items from the source enumerable + /// </summary> + /// <returns> + /// true if we successfully reserved at least one element + /// false if all elements in the source collection have been reserved. + /// </returns> + internal bool GrabChunk(KeyValuePair<long, TSource>[] destArray, int requestedChunkSize, ref int actualNumElementsGrabbed) + { + actualNumElementsGrabbed = 0; + + if (m_hasNoElementsLeft.Value) + { + return false; + } + + if (m_useSingleChunking) + { + return GrabChunk_Single(destArray, requestedChunkSize, ref actualNumElementsGrabbed); + } + else + { + return GrabChunk_Buffered(destArray, requestedChunkSize, ref actualNumElementsGrabbed); + } + } + + /// <summary> + /// Version of GrabChunk that grabs a single element at a time from the source enumerable + /// </summary> + /// <returns> + /// true if we successfully reserved an element + /// false if all elements in the source collection have been reserved. + /// </returns> + internal bool GrabChunk_Single(KeyValuePair<long,TSource>[] destArray, int requestedChunkSize, ref int actualNumElementsGrabbed) + { + Contract.Assert(m_useSingleChunking, "Expected m_useSingleChecking to be true"); + Contract.Assert(requestedChunkSize == 1, "Got requested chunk size of " + requestedChunkSize + " when single-chunking was on"); + Contract.Assert(actualNumElementsGrabbed == 0, "Expected actualNumElementsGrabbed == 0, instead it is " + actualNumElementsGrabbed); + Contract.Assert(destArray.Length == 1, "Expected destArray to be of length 1, instead its length is " + destArray.Length); + + lock (m_sharedLock) + { + if (m_hasNoElementsLeft.Value) return false; + + try + { + if (m_sharedReader.MoveNext()) + { + m_sharedIndex.Value = checked(m_sharedIndex.Value + 1); + destArray[0] + = new KeyValuePair<long, TSource>(m_sharedIndex.Value, + m_sharedReader.Current); + actualNumElementsGrabbed = 1; + return true; + } + else + { + //if MoveNext() return false, we set the flag to inform other partitions + m_sourceDepleted.Value = true; + m_hasNoElementsLeft.Value = true; + return false; + } + } + catch + { + // On an exception, make sure that no additional items are hereafter enumerated + m_sourceDepleted.Value = true; + m_hasNoElementsLeft.Value = true; + throw; + } + } + } + + + + /// <summary> + /// Version of GrabChunk that uses buffering scheme to grab items out of source enumerable + /// </summary> + /// <returns> + /// true if we successfully reserved at least one element (up to #=requestedChunkSize) + /// false if all elements in the source collection have been reserved. + /// </returns> + internal bool GrabChunk_Buffered(KeyValuePair<long,TSource>[] destArray, int requestedChunkSize, ref int actualNumElementsGrabbed) + { + Contract.Assert(requestedChunkSize > 0); + Contract.Assert(!m_useSingleChunking, "Did not expect to be in single-chunking mode"); + + TryCopyFromFillBuffer(destArray, requestedChunkSize, ref actualNumElementsGrabbed); + + if (actualNumElementsGrabbed == requestedChunkSize) + { + // that was easy. + return true; + } + else if (m_sourceDepleted.Value) + { + // looks like we both reached the end of the fill buffer, and the source was depleted previously + // this means no more work to do for any other worker + m_hasNoElementsLeft.Value = true; + m_FillBuffer = null; + return (actualNumElementsGrabbed > 0); + } + + + // + // now's the time to take the shared lock and enumerate + // + lock (m_sharedLock) + { + if (m_sourceDepleted.Value) + { + return (actualNumElementsGrabbed > 0); + } + + try + { + // we need to make sure all array copiers are finished + if (m_activeCopiers > 0) + { + SpinWait sw = new SpinWait(); + while( m_activeCopiers > 0) sw.SpinOnce(); + } + + Contract.Assert(m_sharedIndex != null); //already been allocated in MoveNext() before calling GrabNextChunk + + // Now's the time to actually enumerate the source + + // We first fill up the requested # of elements in the caller's array + // continue from the where TryCopyFromFillBuffer() left off + for (; actualNumElementsGrabbed < requestedChunkSize; actualNumElementsGrabbed++) + { + if (m_sharedReader.MoveNext()) + { + m_sharedIndex.Value = checked(m_sharedIndex.Value + 1); + destArray[actualNumElementsGrabbed] + = new KeyValuePair<long, TSource>(m_sharedIndex.Value, + m_sharedReader.Current); + } + else + { + //if MoveNext() return false, we set the flag to inform other partitions + m_sourceDepleted.Value = true; + break; + } + } + + // taking a local snapshot of m_FillBuffer in case some other thread decides to null out m_FillBuffer + // in the entry of this method after observing m_sourceCompleted = true + var localFillBufferRef = m_FillBuffer; + + // If the big buffer seems to be depleted, we will also fill that up while we are under the lock + // Note that this is the only place that m_FillBufferCurrentPosition can be reset + if (m_sourceDepleted.Value == false && localFillBufferRef != null && + m_FillBufferCurrentPosition >= localFillBufferRef.Length) + { + for (int i = 0; i < localFillBufferRef.Length; i++) + { + if( m_sharedReader.MoveNext()) + { + m_sharedIndex.Value = checked(m_sharedIndex.Value + 1); + localFillBufferRef[i] + = new KeyValuePair<long, TSource>(m_sharedIndex.Value, + m_sharedReader.Current); + } + else + { + // No more elements left in the enumerator. + // Record this, so that the next request can skip the lock + m_sourceDepleted.Value = true; + + // also record the current count in m_FillBufferSize + m_FillBufferSize = i; + + // and exit the for loop so that we don't keep incrementing m_FillBufferSize + break; + } + + } + + m_FillBufferCurrentPosition = 0; + } + + + } + catch + { + // If an exception occurs, don't let the other enumerators try to enumerate. + // NOTE: this could instead throw an InvalidOperationException, but that would be unexpected + // and not helpful to the end user. We know the root cause is being communicated already.) + m_sourceDepleted.Value = true; + m_hasNoElementsLeft.Value = true; + throw; + } + } + + return (actualNumElementsGrabbed > 0); + } + + public void Dispose() + { + if (!m_disposed) + { + m_disposed = true; + m_sharedReader.Dispose(); + } + } + } + + /// <summary> + /// Inherits from DynamicPartitionEnumerator_Abstract directly + /// Provides customized implementation for: GrabNextChunk, HasNoElementsLeft, Current, Dispose + /// </summary> + private class InternalPartitionEnumerator : DynamicPartitionEnumerator_Abstract<TSource, IEnumerator<TSource>> + { + //---- fields ---- + //cached local copy of the current chunk + private KeyValuePair<long, TSource>[] m_localList; //defer allocating to avoid false sharing + + // the values of the following two fields are passed in from + // outside(already initialized) by the constructor, + private readonly SharedBool m_hasNoElementsLeft; + private readonly object m_sharedLock; + private readonly SharedInt m_activePartitionCount; + private InternalPartitionEnumerable m_enumerable; + + //constructor + internal InternalPartitionEnumerator( + IEnumerator<TSource> sharedReader, + SharedLong sharedIndex, + SharedBool hasNoElementsLeft, + object sharedLock, + SharedInt activePartitionCount, + InternalPartitionEnumerable enumerable, + bool useSingleChunking) + : base(sharedReader, sharedIndex, useSingleChunking) + { + m_hasNoElementsLeft = hasNoElementsLeft; + m_sharedLock = sharedLock; + m_enumerable = enumerable; + m_activePartitionCount = activePartitionCount; + + if (m_activePartitionCount != null) + { + // If static partitioning, we need to increase the active partition count. + Interlocked.Increment(ref m_activePartitionCount.Value); + } + } + + //overriding methods + + /// <summary> + /// Reserves a contiguous range of elements from source data + /// </summary> + /// <param name="requestedChunkSize">specified number of elements requested</param> + /// <returns> + /// true if we successfully reserved at least one element (up to #=requestedChunkSize) + /// false if all elements in the source collection have been reserved. + /// </returns> + override protected bool GrabNextChunk(int requestedChunkSize) + { + Contract.Assert(requestedChunkSize > 0); + + if (HasNoElementsLeft) + { + return false; + } + + // defer allocation to avoid false sharing + if (m_localList == null) + { + m_localList = new KeyValuePair<long, TSource>[m_maxChunkSize]; + } + + // make the actual call to the enumerable that grabs a chunk + return m_enumerable.GrabChunk(m_localList, requestedChunkSize, ref m_currentChunkSize.Value); + } + + /// <summary> + /// Returns whether or not the shared reader has already read the last + /// element of the source data + /// </summary> + /// <remarks> + /// We cannot call m_sharedReader.MoveNext(), to see if it hits the last element + /// or not, because we can't undo MoveNext(). Thus we need to maintain a shared + /// boolean value m_hasNoElementsLeft across all partitions + /// </remarks> + override protected bool HasNoElementsLeft + { + get { return m_hasNoElementsLeft.Value; } + set + { + //we only set it from false to true once + //we should never set it back in any circumstances + Contract.Assert(value); + Contract.Assert(!m_hasNoElementsLeft.Value); + m_hasNoElementsLeft.Value = true; + } + } + + override public KeyValuePair<long, TSource> Current + { + get + { + //verify that MoveNext is at least called once before Current is called + if (m_currentChunkSize == null) + { + throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext")); + } + Contract.Assert(m_localList != null); + Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value); + return (m_localList[m_localOffset.Value]); + } + } + + override public void Dispose() + { + // If this is static partitioning, ie. m_activePartitionCount != null, since the current partition + // is disposed, we decrement the number of active partitions for the shared reader. + if (m_activePartitionCount != null && Interlocked.Decrement(ref m_activePartitionCount.Value) == 0) + { + // If the number of active partitions becomes 0, we need to dispose the shared + // reader we created in the m_enumerable object. + m_enumerable.Dispose(); + } + // If this is dynamic partitioning, ie. m_activePartitionCount != null, then m_enumerable needs to + // be disposed explicitly by the user, and we do not need to anything here + } + } + #endregion + + } + #endregion + + #region Dynamic Partitioner for source data of IndexRange types (IList<> and Array<>) + /// <summary> + /// Dynamic load-balance partitioner. This class is abstract and to be derived from by + /// the customized partitioner classes for IList, Array, and IEnumerable + /// </summary> + /// <typeparam name="TSource">Type of the elements in the source data</typeparam> + /// <typeparam name="TCollection"> Type of the source data collection</typeparam> + private abstract class DynamicPartitionerForIndexRange_Abstract<TSource, TCollection> : OrderablePartitioner<TSource> + { + // TCollection can be: IList<TSource>, TSource[] and IEnumerable<TSource> + // Derived classes specify TCollection, and implement the abstract method GetOrderableDynamicPartitions_Factory accordingly + TCollection m_data; + + /// <summary> + /// Constructs a new orderable partitioner + /// </summary> + /// <param name="data">source data collection</param> + protected DynamicPartitionerForIndexRange_Abstract(TCollection data) + : base(true, false, true) + { + m_data = data; + } + + /// <summary> + /// Partition the source data and create an enumerable over the resulting partitions. + /// </summary> + /// <param name="data">the source data collection</param> + /// <returns>an enumerable of partitions of </returns> + protected abstract IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(TCollection data); + + /// <summary> + /// Overrides OrderablePartitioner.GetOrderablePartitions. + /// Partitions the underlying collection into the given number of orderable partitions. + /// </summary> + /// <param name="partitionCount">number of partitions requested</param> + /// <returns>A list containing <paramref name="partitionCount"/> enumerators.</returns> + override public IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount) + { + if (partitionCount <= 0) + { + throw new ArgumentOutOfRangeException("partitionCount"); + } + IEnumerator<KeyValuePair<long, TSource>>[] partitions + = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount]; + IEnumerable<KeyValuePair<long, TSource>> partitionEnumerable = GetOrderableDynamicPartitions_Factory(m_data); + for (int i = 0; i < partitionCount; i++) + { + partitions[i] = partitionEnumerable.GetEnumerator(); + } + return partitions; + } + + /// <summary> + /// Overrides OrderablePartitioner.GetOrderableDyanmicPartitions + /// </summary> + /// <returns>a enumerable collection of orderable partitions</returns> + override public IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions() + { + return GetOrderableDynamicPartitions_Factory(m_data); + } + + /// <summary> + /// Whether additional partitions can be created dynamically. + /// </summary> + override public bool SupportsDynamicPartitions + { + get { return true; } + } + + } + + /// <summary> + /// Defines dynamic partition for source data of IList and Array. + /// This class inherits DynamicPartitionEnumerator_Abstract + /// - implements GrabNextChunk, HasNoElementsLeft, and Dispose methods for IList and Array + /// - Current property still remains abstract, implementation is different for IList and Array + /// - introduces another abstract method SourceCount, which returns the number of elements in + /// the source data. Implementation differs for IList and Array + /// </summary> + /// <typeparam name="TSource">Type of the elements in the data source</typeparam> + /// <typeparam name="TSourceReader">Type of the reader on the source data</typeparam> + private abstract class DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, TSourceReader> : DynamicPartitionEnumerator_Abstract<TSource, TSourceReader> + { + //fields + protected int m_startIndex; //initially zero + + //constructor + protected DynamicPartitionEnumeratorForIndexRange_Abstract(TSourceReader sharedReader, SharedLong sharedIndex) + : base(sharedReader, sharedIndex) + { + } + + //abstract methods + //the Current property is still abstract, and will be implemented by derived classes + //we add another abstract method SourceCount to get the number of elements from the source reader + + /// <summary> + /// Get the number of elements from the source reader. + /// It calls IList.Count or Array.Length + /// </summary> + protected abstract int SourceCount { get; } + + //overriding methods + + /// <summary> + /// Reserves a contiguous range of elements from source data + /// </summary> + /// <param name="requestedChunkSize">specified number of elements requested</param> + /// <returns> + /// true if we successfully reserved at least one element (up to #=requestedChunkSize) + /// false if all elements in the source collection have been reserved. + /// </returns> + override protected bool GrabNextChunk(int requestedChunkSize) + { + Contract.Assert(requestedChunkSize > 0); + + while (!HasNoElementsLeft) + { + Contract.Assert(m_sharedIndex != null); + // use the new Volatile.Read method because it is cheaper than Interlocked.Read on AMD64 architecture + long oldSharedIndex = Volatile.Read(ref m_sharedIndex.Value); + + if (HasNoElementsLeft) + { + //HasNoElementsLeft situation changed from false to true immediately + //and oldSharedIndex becomes stale + return false; + } + + //there won't be overflow, because the index of IList/array is int, and we + //have casted it to long. + long newSharedIndex = Math.Min(SourceCount - 1, oldSharedIndex + requestedChunkSize); + + + //the following CAS, if successful, reserves a chunk of elements [oldSharedIndex+1, newSharedIndex] + //inclusive in the source collection + if (Interlocked.CompareExchange(ref m_sharedIndex.Value, newSharedIndex, oldSharedIndex) + == oldSharedIndex) + { + //set up local indexes. + //m_currentChunkSize is always set to requestedChunkSize when source data had + //enough elements of what we requested + m_currentChunkSize.Value = (int)(newSharedIndex - oldSharedIndex); + m_localOffset.Value = -1; + m_startIndex = (int)(oldSharedIndex + 1); + return true; + } + } + //didn't get any element, return false; + return false; + } + + /// <summary> + /// Returns whether or not the shared reader has already read the last + /// element of the source data + /// </summary> + override protected bool HasNoElementsLeft + { + get + { + Contract.Assert(m_sharedIndex != null); + // use the new Volatile.Read method because it is cheaper than Interlocked.Read on AMD64 architecture + return Volatile.Read(ref m_sharedIndex.Value) >= SourceCount - 1; + } + set + { + Contract.Assert(false); + } + } + + /// <summary> + /// For source data type IList and Array, the type of the shared reader is just the data itself. + /// We don't do anything in Dispose method for IList and Array. + /// </summary> + override public void Dispose() + { } + } + + + /// <summary> + /// Inherits from DynamicPartitioners + /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance + /// of EnumerableOfPartitionsForIList defined internally + /// </summary> + /// <typeparam name="TSource">Type of elements in the source data</typeparam> + private class DynamicPartitionerForIList<TSource> : DynamicPartitionerForIndexRange_Abstract<TSource, IList<TSource>> + { + //constructor + internal DynamicPartitionerForIList(IList<TSource> source) + : base(source) + { } + + //override methods + override protected IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(IList<TSource> m_data) + { + //m_data itself serves as shared reader + return new InternalPartitionEnumerable(m_data); + } + + /// <summary> + /// Inherits from PartitionList_Abstract + /// Provides customized implementation for source data of IList + /// </summary> + private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>> + { + //reader through which we access the source data + private readonly IList<TSource> m_sharedReader; + private SharedLong m_sharedIndex; + + internal InternalPartitionEnumerable(IList<TSource> sharedReader) + { + m_sharedReader = sharedReader; + m_sharedIndex = new SharedLong(-1); + } + + public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator() + { + return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return ((InternalPartitionEnumerable)this).GetEnumerator(); + } + } + + /// <summary> + /// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract + /// Provides customized implementation of SourceCount property and Current property for IList + /// </summary> + private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, IList<TSource>> + { + //constructor + internal InternalPartitionEnumerator(IList<TSource> sharedReader, SharedLong sharedIndex) + : base(sharedReader, sharedIndex) + { } + + //overriding methods + override protected int SourceCount + { + get { return m_sharedReader.Count; } + } + /// <summary> + /// return a KeyValuePair of the current element and its key + /// </summary> + override public KeyValuePair<long, TSource> Current + { + get + { + //verify that MoveNext is at least called once before Current is called + if (m_currentChunkSize == null) + { + throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext")); + } + + Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value); + return new KeyValuePair<long, TSource>(m_startIndex + m_localOffset.Value, + m_sharedReader[m_startIndex + m_localOffset.Value]); + } + } + } + } + + + + /// <summary> + /// Inherits from DynamicPartitioners + /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance + /// of EnumerableOfPartitionsForArray defined internally + /// </summary> + /// <typeparam name="TSource">Type of elements in the source data</typeparam> + private class DynamicPartitionerForArray<TSource> : DynamicPartitionerForIndexRange_Abstract<TSource, TSource[]> + { + //constructor + internal DynamicPartitionerForArray(TSource[] source) + : base(source) + { } + + //override methods + override protected IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(TSource[] m_data) + { + return new InternalPartitionEnumerable(m_data); + } + + /// <summary> + /// Inherits from PartitionList_Abstract + /// Provides customized implementation for source data of Array + /// </summary> + private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>> + { + //reader through which we access the source data + private readonly TSource[] m_sharedReader; + private SharedLong m_sharedIndex; + + internal InternalPartitionEnumerable(TSource[] sharedReader) + { + m_sharedReader = sharedReader; + m_sharedIndex = new SharedLong(-1); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return ((InternalPartitionEnumerable)this).GetEnumerator(); + } + + + public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator() + { + return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex); + } + } + + /// <summary> + /// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract + /// Provides customized implementation of SourceCount property and Current property for Array + /// </summary> + private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, TSource[]> + { + //constructor + internal InternalPartitionEnumerator(TSource[] sharedReader, SharedLong sharedIndex) + : base(sharedReader, sharedIndex) + { } + + //overriding methods + override protected int SourceCount + { + get { return m_sharedReader.Length; } + } + + override public KeyValuePair<long, TSource> Current + { + get + { + //verify that MoveNext is at least called once before Current is called + if (m_currentChunkSize == null) + { + throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext")); + } + + Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value); + return new KeyValuePair<long, TSource>(m_startIndex + m_localOffset.Value, + m_sharedReader[m_startIndex + m_localOffset.Value]); + } + } + } + } + #endregion + + + #region Static partitioning for IList and Array, abstract classes + /// <summary> + /// Static partitioning over IList. + /// - dynamic and load-balance + /// - Keys are ordered within each partition + /// - Keys are ordered across partitions + /// - Keys are normalized + /// - Number of partitions is fixed once specified, and the elements of the source data are + /// distributed to each partition as evenly as possible. + /// </summary> + /// <typeparam name="TSource">type of the elements</typeparam> + /// <typeparam name="TCollection">Type of the source data collection</typeparam> + private abstract class StaticIndexRangePartitioner<TSource, TCollection> : OrderablePartitioner<TSource> + { + protected StaticIndexRangePartitioner() + : base(true, true, true) + { } + + /// <summary> + /// Abstract method to return the number of elements in the source data + /// </summary> + protected abstract int SourceCount { get; } + + /// <summary> + /// Abstract method to create a partition that covers a range over source data, + /// starting from "startIndex", ending at "endIndex" + /// </summary> + /// <param name="startIndex">start index of the current partition on the source data</param> + /// <param name="endIndex">end index of the current partition on the source data</param> + /// <returns>a partition enumerator over the specified range</returns> + // The partitioning algorithm is implemented in GetOrderablePartitions method + // This method delegates according to source data type IList/Array + protected abstract IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex); + + /// <summary> + /// Overrides OrderablePartitioner.GetOrderablePartitions + /// Return a list of partitions, each of which enumerate a fixed part of the source data + /// The elements of the source data are distributed to each partition as evenly as possible. + /// Specifically, if the total number of elements is N, and number of partitions is x, and N = a*x +b, + /// where a is the quotient, and b is the remainder. Then the first b partitions each has a + 1 elements, + /// and the last x-b partitions each has a elements. + /// For example, if N=10, x =3, then + /// partition 0 ranges [0,3], + /// partition 1 ranges [4,6], + /// partition 2 ranges [7,9]. + /// This also takes care of the situation of (x>N), the last x-N partitions are empty enumerators. + /// An empty enumerator is indicated by + /// (m_startIndex == list.Count && m_endIndex == list.Count -1) + /// </summary> + /// <param name="partitionCount">specified number of partitions</param> + /// <returns>a list of partitions</returns> + override public IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount) + { + if (partitionCount <= 0) + { + throw new ArgumentOutOfRangeException("partitionCount"); + } + + int quotient, remainder; + quotient = Math.DivRem(SourceCount, partitionCount, out remainder); + + IEnumerator<KeyValuePair<long, TSource>>[] partitions = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount]; + int lastEndIndex = -1; + for (int i = 0; i < partitionCount; i++) + { + int startIndex = lastEndIndex + 1; + + if (i < remainder) + { + lastEndIndex = startIndex + quotient; + } + else + { + lastEndIndex = startIndex + quotient - 1; + } + partitions[i] = CreatePartition(startIndex, lastEndIndex); + } + return partitions; + } + } + + /// <summary> + /// Static Partition for IList/Array. + /// This class implements all methods required by IEnumerator interface, except for the Current property. + /// Current Property is different for IList and Array. Arrays calls 'ldelem' instructions for faster element + /// retrieval. + /// </summary> + //We assume the source collection is not being updated concurrently. Otherwise it will break the + //static partitioning, since each partition operates on the source collection directly, it does + //not have a local cache of the elements assigned to them. + private abstract class StaticIndexRangePartition<TSource> : IEnumerator<KeyValuePair<long, TSource>> + { + //the start and end position in the source collection for the current partition + //the partition is empty if and only if + // (m_startIndex == m_data.Count && m_endIndex == m_data.Count-1) + protected readonly int m_startIndex; + protected readonly int m_endIndex; + + //the current index of the current partition while enumerating on the source collection + protected volatile int m_offset; + + /// <summary> + /// Constructs an instance of StaticIndexRangePartition + /// </summary> + /// <param name="startIndex">the start index in the source collection for the current partition </param> + /// <param name="endIndex">the end index in the source collection for the current partition</param> + protected StaticIndexRangePartition(int startIndex, int endIndex) + { + m_startIndex = startIndex; + m_endIndex = endIndex; + m_offset = startIndex - 1; + } + + /// <summary> + /// Current Property is different for IList and Array. Arrays calls 'ldelem' instructions for faster + /// element retrieval. + /// </summary> + public abstract KeyValuePair<long, TSource> Current { get; } + + /// <summary> + /// We don't dispose the source for IList and array + /// </summary> + public void Dispose() + { } + + public void Reset() + { + throw new NotSupportedException(); + } + + /// <summary> + /// Moves to the next item + /// Before the first MoveNext is called: m_offset == m_startIndex-1; + /// </summary> + /// <returns>true if successful, false if there is no item left</returns> + public bool MoveNext() + { + if (m_offset < m_endIndex) + { + m_offset++; + return true; + } + else + { + //After we have enumerated over all elements, we set m_offset to m_endIndex +1. + //The reason we do this is, for an empty enumerator, we need to tell the Current + //property whether MoveNext has been called or not. + //For an empty enumerator, it starts with (m_offset == m_startIndex-1 == m_endIndex), + //and we don't set a new value to m_offset, then the above condition will always be + //true, and the Current property will mistakenly assume MoveNext is never called. + m_offset = m_endIndex + 1; + return false; + } + } + + Object IEnumerator.Current + { + get + { + return ((StaticIndexRangePartition<TSource>)this).Current; + } + } + } + #endregion + + #region Static partitioning for IList + /// <summary> + /// Inherits from StaticIndexRangePartitioner + /// Provides customized implementation of SourceCount and CreatePartition + /// </summary> + /// <typeparam name="TSource"></typeparam> + private class StaticIndexRangePartitionerForIList<TSource> : StaticIndexRangePartitioner<TSource, IList<TSource>> + { + IList<TSource> m_list; + internal StaticIndexRangePartitionerForIList(IList<TSource> list) + : base() + { + Contract.Assert(list != null); + m_list = list; + } + override protected int SourceCount + { + get { return m_list.Count; } + } + override protected IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex) + { + return new StaticIndexRangePartitionForIList<TSource>(m_list, startIndex, endIndex); + } + } + + /// <summary> + /// Inherits from StaticIndexRangePartition + /// Provides customized implementation of Current property + /// </summary> + /// <typeparam name="TSource"></typeparam> + private class StaticIndexRangePartitionForIList<TSource> : StaticIndexRangePartition<TSource> + { + //the source collection shared by all partitions + private volatile IList<TSource> m_list; + + internal StaticIndexRangePartitionForIList(IList<TSource> list, int startIndex, int endIndex) + : base(startIndex, endIndex) + { + Contract.Assert(startIndex >= 0 && endIndex <= list.Count - 1); + m_list = list; + } + + override public KeyValuePair<long, TSource> Current + { + get + { + //verify that MoveNext is at least called once before Current is called + if (m_offset < m_startIndex) + { + throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext")); + } + + Contract.Assert(m_offset >= m_startIndex && m_offset <= m_endIndex); + return (new KeyValuePair<long, TSource>(m_offset, m_list[m_offset])); + } + } + } + #endregion + + #region static partitioning for Arrays + /// <summary> + /// Inherits from StaticIndexRangePartitioner + /// Provides customized implementation of SourceCount and CreatePartition for Array + /// </summary> + private class StaticIndexRangePartitionerForArray<TSource> : StaticIndexRangePartitioner<TSource, TSource[]> + { + TSource[] m_array; + internal StaticIndexRangePartitionerForArray(TSource[] array) + : base() + { + Contract.Assert(array != null); + m_array = array; + } + override protected int SourceCount + { + get { return m_array.Length; } + } + override protected IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex) + { + return new StaticIndexRangePartitionForArray<TSource>(m_array, startIndex, endIndex); + } + } + + /// <summary> + /// Inherits from StaticIndexRangePartitioner + /// Provides customized implementation of SourceCount and CreatePartition + /// </summary> + private class StaticIndexRangePartitionForArray<TSource> : StaticIndexRangePartition<TSource> + { + //the source collection shared by all partitions + private volatile TSource[] m_array; + + internal StaticIndexRangePartitionForArray(TSource[] array, int startIndex, int endIndex) + : base(startIndex, endIndex) + { + Contract.Assert(startIndex >= 0 && endIndex <= array.Length - 1); + m_array = array; + } + + override public KeyValuePair<long, TSource> Current + { + get + { + //verify that MoveNext is at least called once before Current is called + if (m_offset < m_startIndex) + { + throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext")); + } + + Contract.Assert(m_offset >= m_startIndex && m_offset <= m_endIndex); + return (new KeyValuePair<long, TSource>(m_offset, m_array[m_offset])); + } + } + } + #endregion + + + #region Utility functions + /// <summary> + /// A very simple primitive that allows us to share a value across multiple threads. + /// </summary> + /// <typeparam name="TSource"></typeparam> + private class SharedInt + { + internal volatile int Value; + + internal SharedInt(int value) + { + this.Value = value; + } + + } + + /// <summary> + /// A very simple primitive that allows us to share a value across multiple threads. + /// </summary> + private class SharedBool + { + internal volatile bool Value; + + internal SharedBool(bool value) + { + this.Value = value; + } + + } + + /// <summary> + /// A very simple primitive that allows us to share a value across multiple threads. + /// </summary> + private class SharedLong + { + internal long Value; + internal SharedLong(long value) + { + this.Value = value; + } + + } + + //-------------------- + // The following part calculates the default chunk size. It is copied from System.Linq.Parallel.Scheduling, + // because mscorlib.dll cannot access System.Linq.Parallel.Scheduling + //-------------------- + + // The number of bytes we want "chunks" to be, when partitioning, etc. We choose 4 cache + // lines worth, assuming 128b cache line. Most (popular) architectures use 64b cache lines, + // but choosing 128b works for 64b too whereas a multiple of 64b isn't necessarily sufficient + // for 128b cache systems. So 128b it is. + private const int DEFAULT_BYTES_PER_CHUNK = 128 * 4; + + private static int GetDefaultChunkSize<TSource>() + { + int chunkSize; + + if (typeof(TSource).IsValueType) + { +#if !FEATURE_CORECLR // Marshal.SizeOf is not supported in CoreCLR + + if (typeof(TSource).StructLayoutAttribute.Value == LayoutKind.Explicit) + { + chunkSize = Math.Max(1, DEFAULT_BYTES_PER_CHUNK / Marshal.SizeOf(typeof(TSource))); + } + else + { + // We choose '128' because this ensures, no matter the actual size of the value type, + // the total bytes used will be a multiple of 128. This ensures it's cache aligned. + chunkSize = 128; + } +#else + chunkSize = 128; +#endif + } + else + { + Contract.Assert((DEFAULT_BYTES_PER_CHUNK % IntPtr.Size) == 0, "bytes per chunk should be a multiple of pointer size"); + chunkSize = (DEFAULT_BYTES_PER_CHUNK / IntPtr.Size); + } + return chunkSize; + } + #endregion + + } +} |