// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
//
//
// Specialized producer/consumer queues.
//
//
// *************************
//
// src\ndp\clr\src\bcl\system\threading\tasks\producerConsumerQueue.cs
// src\ndp\fx\src\dataflow\system\threading\tasks\dataflow\internal\producerConsumerQueue.cs
// Keep both of them consistent by changing the other file when you change this one. Also avoid:
//    1- referencing internal types in mscorlib
//    2- referencing any dataflow-specific types
// This should be fixed post Dev11 when this class becomes public.
//
// *************************
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Runtime.InteropServices;

namespace System.Threading.Tasks
{
    /// <summary>Represents a producer/consumer queue used internally by dataflow blocks.</summary>
    /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
    internal interface IProducerConsumerQueue<T> : IEnumerable<T>
    {
        /// <summary>Enqueues an item into the queue.</summary>
        /// <param name="item">The item to enqueue.</param>
        /// <remarks>This method is meant to be thread-safe subject to the particular nature of the implementation.</remarks>
        void Enqueue(T item);

        /// <summary>Attempts to dequeue an item from the queue.</summary>
        /// <param name="result">The dequeued item.</param>
        /// <returns>true if an item could be dequeued; otherwise, false.</returns>
        /// <remarks>This method is meant to be thread-safe subject to the particular nature of the implementation.</remarks>
        bool TryDequeue(out T result);

        /// <summary>Gets whether the collection is currently empty.</summary>
        /// <remarks>This method may or may not be thread-safe.</remarks>
        bool IsEmpty { get; }

        /// <summary>Gets the number of items in the collection.</summary>
        /// <remarks>In many implementations, this method will not be thread-safe.</remarks>
        int Count { get; }

        /// <summary>
        /// A thread-safe way to get the number of items in the collection.
        /// May synchronize access by locking the provided synchronization object.
        /// </summary>
        /// <param name="syncObj">The sync object used to lock.</param>
        /// <returns>The collection count.</returns>
        int GetCountSafe(object syncObj);
    }

    /// <summary>
    /// Provides a producer/consumer queue safe to be used by any number of producers and consumers concurrently.
    /// </summary>
    /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
    [DebuggerDisplay("Count = {Count}")]
    internal sealed class MultiProducerMultiConsumerQueue<T> : ConcurrentQueue<T>, IProducerConsumerQueue<T>
    {
        /// <summary>Enqueues an item into the queue.</summary>
        /// <param name="item">The item to enqueue.</param>
        void IProducerConsumerQueue<T>.Enqueue(T item) { base.Enqueue(item); }

        /// <summary>Attempts to dequeue an item from the queue.</summary>
        /// <param name="result">The dequeued item.</param>
        /// <returns>true if an item could be dequeued; otherwise, false.</returns>
        bool IProducerConsumerQueue<T>.TryDequeue(out T result) { return base.TryDequeue(out result); }

        /// <summary>Gets whether the collection is currently empty.</summary>
        bool IProducerConsumerQueue<T>.IsEmpty { get { return base.IsEmpty; } }

        /// <summary>Gets the number of items in the collection.</summary>
        int IProducerConsumerQueue<T>.Count { get { return base.Count; } }

        /// <summary>
        /// A thread-safe way to get the number of items in the collection.
        /// May synchronize access by locking the provided synchronization object.
        /// </summary>
        /// <remarks>ConcurrentQueue.Count is thread safe, so there is no need to acquire the lock.</remarks>
        int IProducerConsumerQueue<T>.GetCountSafe(object syncObj) { return base.Count; }
    }

    /// <summary>
    /// Provides a producer/consumer queue safe to be used by only one producer and one consumer concurrently.
    /// </summary>
    /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
    [DebuggerDisplay("Count = {Count}")]
    [DebuggerTypeProxy(typeof(SingleProducerSingleConsumerQueue<>.SingleProducerSingleConsumerQueue_DebugView))]
    internal sealed class SingleProducerSingleConsumerQueue<T> : IProducerConsumerQueue<T>
    {
        // Design:
        //
        // SingleProducerSingleConsumerQueue (SPSCQueue) is a concurrent queue designed to be used
        // by one producer thread and one consumer thread. SPSCQueue does not work correctly when used by
        // multiple producer threads concurrently or multiple consumer threads concurrently.
        //
        // SPSCQueue is based on segments that behave like circular buffers. Each circular buffer is represented
        // as an array with two indexes: m_first and m_last. m_first is the index of the array slot for the consumer
        // to read next, and m_last is the slot for the producer to write next. The circular buffer is empty when
        // (m_first == m_last), and full when ((m_last+1) % m_array.Length == m_first).
        //
        // Since m_first is only ever modified by the consumer thread and m_last by the producer, the two indices can
        // be updated without interlocked operations. As long as the queue size fits inside a single circular buffer,
        // enqueues and dequeues simply advance the corresponding indices around the circular buffer. If an enqueue finds
        // that there is no room in the existing buffer, however, a new circular buffer is allocated that is twice as big
        // as the old buffer (capped at MAX_SEGMENT_SIZE). From then on, the producer will insert values into the new buffer.
        // The consumer will first empty out the old buffer and only then follow the producer into the new (larger) buffer.
        //
        // As described above, the enqueue operation on the fast path only modifies the m_last field of the current segment.
        // However, it also needs to read m_first in order to verify that there is room in the current segment. Similarly, the
        // dequeue operation on the fast path only needs to modify m_first, but also needs to read m_last to verify that the
        // queue is non-empty. This results in true cache line sharing between the producer and the consumer.
        //
        // The cache line sharing issue can be mitigated by having a possibly stale copy of m_first that is owned by the producer,
        // and a possibly stale copy of m_last that is owned by the consumer. So, the consumer state is described using
        // (m_first, m_lastCopy) and the producer state using (m_firstCopy, m_last). The consumer state is separated from
        // the producer state by padding, which prevents fast-path enqueues and dequeues from hitting shared cache lines.
        // m_lastCopy is the consumer's copy of m_last. Whenever the consumer can tell that there is data in the buffer
        // simply by observing m_lastCopy, the consumer thread does not need to read m_last and thus encounter a cache miss. Only
        // when the buffer appears to be empty will the consumer refresh m_lastCopy from m_last. m_firstCopy is used by the producer
        // in the same way to avoid reading m_first on the hot path.

        /// <summary>The initial size to use for segments (in number of elements).</summary>
        private const int INIT_SEGMENT_SIZE = 32; // must be a power of 2
        /// <summary>The maximum size to use for segments (in number of elements).</summary>
        private const int MAX_SEGMENT_SIZE = 0x1000000; // this could be made as large as Int32.MaxValue / 2

        /// <summary>The head of the linked list of segments (only advanced by the consumer).</summary>
        private volatile Segment m_head;
        /// <summary>The tail of the linked list of segments (only advanced by the producer).</summary>
        private volatile Segment m_tail;

        /// <summary>Initializes the queue with a single empty segment of INIT_SEGMENT_SIZE slots.</summary>
        internal SingleProducerSingleConsumerQueue()
        {
            // Validate constants in ctor rather than in an explicit cctor that would cause perf degradation
            Debug.Assert(INIT_SEGMENT_SIZE > 0, "Initial segment size must be > 0.");
            Debug.Assert((INIT_SEGMENT_SIZE & (INIT_SEGMENT_SIZE - 1)) == 0, "Initial segment size must be a power of 2");
            Debug.Assert(INIT_SEGMENT_SIZE <= MAX_SEGMENT_SIZE, "Initial segment size should be <= maximum.");
            Debug.Assert(MAX_SEGMENT_SIZE < Int32.MaxValue / 2, "Max segment size * 2 must be < Int32.MaxValue, or else overflow could occur.");

            // Initialize the queue
            m_head = m_tail = new Segment(INIT_SEGMENT_SIZE);
        }

        /// <summary>Enqueues an item into the queue.</summary>
        /// <param name="item">The item to enqueue.</param>
        /// <remarks>Must only be called from the single producer thread.</remarks>
        public void Enqueue(T item)
        {
            Segment segment = m_tail;
            T[] array = segment.m_array;
            int last = segment.m_state.m_last; // local copy to avoid multiple volatile reads

            // Fast path: there's obviously room in the current segment
            int tail2 = (last + 1) & (array.Length - 1);
            if (tail2 != segment.m_state.m_firstCopy)
            {
                array[last] = item;
                segment.m_state.m_last = tail2; // volatile write publishes the stored item to the consumer
            }
            // Slow path: there may not be room in the current segment.
            else
            {
                EnqueueSlow(item, ref segment);
            }
        }

        /// <summary>Enqueues an item into the queue when the fast path found no apparent room.</summary>
        /// <param name="item">The item to enqueue.</param>
        /// <param name="segment">The segment in which to first attempt to store the item.</param>
        private void EnqueueSlow(T item, ref Segment segment)
        {
            Debug.Assert(segment != null, "Expected a non-null segment.");

            // The producer's m_firstCopy may be stale: if the consumer has advanced m_first since, refresh it and retry.
            if (segment.m_state.m_firstCopy != segment.m_state.m_first)
            {
                segment.m_state.m_firstCopy = segment.m_state.m_first;
                Enqueue(item); // will only recur once for this enqueue operation
                return;
            }

            int newSegmentSize = m_tail.m_array.Length << 1; // double size
            Debug.Assert(newSegmentSize > 0, "The max size should always be small enough that we don't overflow.");
            if (newSegmentSize > MAX_SEGMENT_SIZE) newSegmentSize = MAX_SEGMENT_SIZE;

            // Fully populate the new segment (item in slot 0) before it becomes reachable by the consumer.
            var newSegment = new Segment(newSegmentSize);
            newSegment.m_array[0] = item;
            newSegment.m_state.m_last = 1;
            newSegment.m_state.m_lastCopy = 1;

            try { }
            finally
            {
                // Finally block to protect against corruption due to a thread abort
                // between setting m_next and setting m_tail.
                Volatile.Write(ref m_tail.m_next, newSegment); // ensure segment not published until item is fully stored
                m_tail = newSegment;
            }
        }

        /// <summary>Attempts to dequeue an item from the queue.</summary>
        /// <param name="result">The dequeued item.</param>
        /// <returns>true if an item could be dequeued; otherwise, false.</returns>
        /// <remarks>Must only be called from the single consumer thread.</remarks>
        public bool TryDequeue(out T result)
        {
            Segment segment = m_head;
            T[] array = segment.m_array;
            int first = segment.m_state.m_first; // local copy to avoid multiple volatile reads

            // Fast path: there's obviously data available in the current segment
            if (first != segment.m_state.m_lastCopy)
            {
                result = array[first];
                array[first] = default(T); // Clear the slot to release the element
                segment.m_state.m_first = (first + 1) & (array.Length - 1);
                return true;
            }
            // Slow path: there may not be data available in the current segment
            else
            {
                return TryDequeueSlow(ref segment, ref array, out result);
            }
        }

        /// <summary>Attempts to dequeue an item from the queue when the fast path found no apparent data.</summary>
        /// <param name="segment">The segment from which the item was dequeued.</param>
        /// <param name="array">The array from which the item was dequeued.</param>
        /// <param name="result">The dequeued item.</param>
        /// <returns>true if an item could be dequeued; otherwise, false.</returns>
        private bool TryDequeueSlow(ref Segment segment, ref T[] array, out T result)
        {
            Debug.Assert(segment != null, "Expected a non-null segment.");
            Debug.Assert(array != null, "Expected a non-null item array.");

            // The consumer's m_lastCopy may be stale: if the producer has advanced m_last since, refresh it and retry.
            if (segment.m_state.m_last != segment.m_state.m_lastCopy)
            {
                segment.m_state.m_lastCopy = segment.m_state.m_last;
                return TryDequeue(out result); // will only recur once for this dequeue operation
            }

            // The current segment is drained and the producer has moved on: follow it to the next segment.
            if (segment.m_next != null && segment.m_state.m_first == segment.m_state.m_last)
            {
                segment = segment.m_next;
                array = segment.m_array;
                m_head = segment;
            }

            var first = segment.m_state.m_first; // local copy to avoid extraneous volatile reads

            if (first == segment.m_state.m_last)
            {
                result = default(T);
                return false;
            }

            result = array[first];
            array[first] = default(T); // Clear the slot to release the element
            segment.m_state.m_first = (first + 1) & (segment.m_array.Length - 1);
            segment.m_state.m_lastCopy = segment.m_state.m_last; // Refresh m_lastCopy to ensure that m_first has not passed m_lastCopy

            return true;
        }

        /// <summary>Attempts to peek at an item in the queue.</summary>
        /// <param name="result">The peeked item.</param>
        /// <returns>true if an item could be peeked; otherwise, false.</returns>
        /// <remarks>
        /// Must only be called from the single consumer thread: the slow path may advance m_head
        /// and refresh m_lastCopy, both of which are consumer-owned state.
        /// </remarks>
        public bool TryPeek(out T result)
        {
            Segment segment = m_head;
            T[] array = segment.m_array;
            int first = segment.m_state.m_first; // local copy to avoid multiple volatile reads

            // Fast path: there's obviously data available in the current segment
            if (first != segment.m_state.m_lastCopy)
            {
                result = array[first];
                return true;
            }
            // Slow path: there may not be data available in the current segment
            else
            {
                return TryPeekSlow(ref segment, ref array, out result);
            }
        }

        /// <summary>Attempts to peek at an item in the queue when the fast path found no apparent data.</summary>
        /// <param name="segment">The segment from which the item is peeked.</param>
        /// <param name="array">The array from which the item is peeked.</param>
        /// <param name="result">The peeked item.</param>
        /// <returns>true if an item could be peeked; otherwise, false.</returns>
        private bool TryPeekSlow(ref Segment segment, ref T[] array, out T result)
        {
            Debug.Assert(segment != null, "Expected a non-null segment.");
            Debug.Assert(array != null, "Expected a non-null item array.");

            if (segment.m_state.m_last != segment.m_state.m_lastCopy)
            {
                segment.m_state.m_lastCopy = segment.m_state.m_last;
                return TryPeek(out result); // will only recur once for this peek operation
            }

            if (segment.m_next != null && segment.m_state.m_first == segment.m_state.m_last)
            {
                segment = segment.m_next;
                array = segment.m_array;
                m_head = segment;
            }

            var first = segment.m_state.m_first; // local copy to avoid extraneous volatile reads

            if (first == segment.m_state.m_last)
            {
                result = default(T);
                return false;
            }

            result = array[first];
            return true;
        }

        /// <summary>Attempts to dequeue an item from the queue if it satisfies a predicate.</summary>
        /// <param name="predicate">The predicate that must return true for the item to be dequeued. If null, all items implicitly return true.</param>
        /// <param name="result">The dequeued item, or default(T) if nothing was dequeued.</param>
        /// <returns>true if an item could be dequeued; otherwise, false.</returns>
        /// <remarks>Must only be called from the single consumer thread.</remarks>
        public bool TryDequeueIf(Predicate<T> predicate, out T result)
        {
            Segment segment = m_head;
            T[] array = segment.m_array;
            int first = segment.m_state.m_first; // local copy to avoid multiple volatile reads

            // Fast path: there's obviously data available in the current segment
            if (first != segment.m_state.m_lastCopy)
            {
                result = array[first];
                if (predicate == null || predicate(result))
                {
                    array[first] = default(T); // Clear the slot to release the element
                    segment.m_state.m_first = (first + 1) & (array.Length - 1);
                    return true;
                }
                else
                {
                    result = default(T);
                    return false;
                }
            }
            // Slow path: there may not be data available in the current segment
            else
            {
                return TryDequeueIfSlow(predicate, ref segment, ref array, out result);
            }
        }

        /// <summary>Attempts to conditionally dequeue an item when the fast path found no apparent data.</summary>
        /// <param name="predicate">The predicate that must return true for the item to be dequeued. If null, all items implicitly return true.</param>
        /// <param name="segment">The segment from which the item was dequeued.</param>
        /// <param name="array">The array from which the item was dequeued.</param>
        /// <param name="result">The dequeued item, or default(T) if nothing was dequeued.</param>
        /// <returns>true if an item could be dequeued; otherwise, false.</returns>
        private bool TryDequeueIfSlow(Predicate<T> predicate, ref Segment segment, ref T[] array, out T result)
        {
            Debug.Assert(segment != null, "Expected a non-null segment.");
            Debug.Assert(array != null, "Expected a non-null item array.");

            if (segment.m_state.m_last != segment.m_state.m_lastCopy)
            {
                segment.m_state.m_lastCopy = segment.m_state.m_last;
                return TryDequeueIf(predicate, out result); // will only recur once for this dequeue operation
            }

            if (segment.m_next != null && segment.m_state.m_first == segment.m_state.m_last)
            {
                segment = segment.m_next;
                array = segment.m_array;
                m_head = segment;
            }

            var first = segment.m_state.m_first; // local copy to avoid extraneous volatile reads

            if (first == segment.m_state.m_last)
            {
                result = default(T);
                return false;
            }

            result = array[first];
            if (predicate == null || predicate(result))
            {
                array[first] = default(T); // Clear the slot to release the element
                segment.m_state.m_first = (first + 1) & (segment.m_array.Length - 1);
                segment.m_state.m_lastCopy = segment.m_state.m_last; // Refresh m_lastCopy to ensure that m_first has not passed m_lastCopy
                return true;
            }
            else
            {
                result = default(T);
                return false;
            }
        }

        /// <summary>Removes all items by repeatedly dequeuing until the queue reports empty.</summary>
        /// <remarks>Must only be called from the single consumer thread, since it uses TryDequeue.</remarks>
        public void Clear()
        {
            T ignored;
            while (TryDequeue(out ignored))
            {
                // Keep dequeuing; the items are discarded.
            }
        }

        /// <summary>Gets whether the collection is currently empty.</summary>
        /// <remarks>WARNING: This should not be used concurrently without further vetting.</remarks>
        public bool IsEmpty
        {
            // This implementation is optimized for calls from the consumer.
            get
            {
                var head = m_head;
                if (head.m_state.m_first != head.m_state.m_lastCopy) return false; // m_first is volatile, so the read of m_lastCopy cannot get reordered
                if (head.m_state.m_first != head.m_state.m_last) return false;
                return head.m_next == null;
            }
        }

        /// <summary>Gets an enumerator over the items, from head segment to tail segment.</summary>
        /// <remarks>WARNING: This should only be used for debugging purposes. It is not safe to be used concurrently.</remarks>
        public IEnumerator<T> GetEnumerator()
        {
            for (Segment segment = m_head; segment != null; segment = segment.m_next)
            {
                for (int pt = segment.m_state.m_first;
                    pt != segment.m_state.m_last;
                    pt = (pt + 1) & (segment.m_array.Length - 1))
                {
                    yield return segment.m_array[pt];
                }
            }
        }

        /// <summary>Gets a non-generic enumerator over the items.</summary>
        /// <remarks>WARNING: This should only be used for debugging purposes. It is not safe to be used concurrently.</remarks>
        IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }

        /// <summary>Gets the number of items in the collection, summed across all segments.</summary>
        /// <remarks>WARNING: This should only be used for debugging purposes. It is not meant to be used concurrently.</remarks>
        public int Count
        {
            get
            {
                int count = 0;
                for (Segment segment = m_head; segment != null; segment = segment.m_next)
                {
                    int arraySize = segment.m_array.Length;
                    int first, last;
                    while (true) // Count is not meant to be used concurrently, but this helps to avoid issues if it is
                    {
                        first = segment.m_state.m_first;
                        last = segment.m_state.m_last;
                        if (first == segment.m_state.m_first) break;
                    }
                    // Array sizes are powers of 2, so masking yields the wrapped distance from first to last.
                    count += (last - first) & (arraySize - 1);
                }
                return count;
            }
        }

        /// <summary>
        /// A thread-safe way to get the number of items in the collection.
        /// May synchronize access by locking the provided synchronization object.
        /// </summary>
        /// <remarks>
        /// Count is not thread safe, so the lock is acquired. NOTE(review): this only excludes concurrent
        /// mutation if producers/consumers also hold <paramref name="syncObj"/>; presumably the caller's convention — confirm.
        /// </remarks>
        int IProducerConsumerQueue<T>.GetCountSafe(object syncObj)
        {
            Debug.Assert(syncObj != null, "The syncObj parameter is null.");
            lock (syncObj)
            {
                return Count;
            }
        }

        /// <summary>A segment in the queue containing one or more items.</summary>
        [StructLayout(LayoutKind.Sequential)]
        private sealed class Segment
        {
            /// <summary>The next segment in the linked list of segments.</summary>
            internal Segment m_next;
            /// <summary>The data stored in this segment (a circular buffer whose length is a power of 2).</summary>
            internal readonly T[] m_array;
            /// <summary>Details about the segment.</summary>
            internal SegmentState m_state; // separated out to enable StructLayout attribute to take effect

            /// <summary>Initializes the segment.</summary>
            /// <param name="size">The size to use for this segment; must be a power of 2.</param>
            internal Segment(int size)
            {
                Debug.Assert((size & (size - 1)) == 0, "Size must be a power of 2");
                m_array = new T[size];
            }
        }

        /// <summary>Stores information about a segment.</summary>
        [StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing
        private struct SegmentState
        {
            /// <summary>Padding to reduce false sharing between the segment's array and m_first.</summary>
            internal PaddingFor32 m_pad0;

            /// <summary>The index of the current head in the segment (written by the consumer).</summary>
            internal volatile int m_first;
            /// <summary>The consumer's possibly stale copy of the current tail index.</summary>
            internal int m_lastCopy; // not volatile as read and written by the consumer (and initialized by the producer before the segment is published), except for IsEmpty, and there m_lastCopy is only read after reading the volatile m_first

            /// <summary>Padding to reduce false sharing between the consumer state and the producer state.</summary>
            internal PaddingFor32 m_pad1;

            /// <summary>The producer's possibly stale copy of the current head index.</summary>
            internal int m_firstCopy; // not volatile as only read and written by the producer thread
            /// <summary>The index of the current tail in the segment (written by the producer).</summary>
            internal volatile int m_last;

            /// <summary>Padding to reduce false sharing with the last and what's after the segment.</summary>
            internal PaddingFor32 m_pad2;
        }

        /// <summary>Debugger type proxy for a SingleProducerSingleConsumerQueue of T.</summary>
        private sealed class SingleProducerSingleConsumerQueue_DebugView
        {
            /// <summary>The queue being visualized.</summary>
            private readonly SingleProducerSingleConsumerQueue<T> m_queue;

            /// <summary>Initializes the debug view.</summary>
            /// <param name="queue">The queue being debugged.</param>
            public SingleProducerSingleConsumerQueue_DebugView(SingleProducerSingleConsumerQueue<T> queue)
            {
                Debug.Assert(queue != null, "Expected a non-null queue.");
                m_queue = queue;
            }

            /// <summary>Gets a snapshot of the contents of the queue, in dequeue order.</summary>
            [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
            public T[] Items
            {
                get
                {
                    List<T> list = new List<T>();
                    foreach (T item in m_queue)
                        list.Add(item);
                    return list.ToArray();
                }
            }
        }
    }

    /// <summary>A placeholder class for common padding constants and eventually routines.</summary>
    internal static class PaddingHelpers
    {
        /// <summary>A size greater than or equal to the size of the most common CPU cache lines.</summary>
        internal const int CACHE_LINE_SIZE = 128;
    }

    /// <summary>Padding structure used to minimize false sharing in SingleProducerSingleConsumerQueue{T}.</summary>
    /// <remarks>
    /// Sized as CACHE_LINE_SIZE minus one Int32, so a padding field plus the 32-bit field next to it
    /// together span CACHE_LINE_SIZE bytes.
    /// </remarks>
    [StructLayout(LayoutKind.Explicit, Size = PaddingHelpers.CACHE_LINE_SIZE - sizeof(Int32))]
    internal struct PaddingFor32
    {
    }
}