// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
//
//
// Specialized producer/consumer queues.
//
//
// *************************
//
// src\ndp\clr\src\bcl\system\threading\tasks\producerConsumerQueue.cs
// src\ndp\fx\src\dataflow\system\threading\tasks\dataflow\internal\producerConsumerQueue.cs
// Keep both of them consistent by changing the other file when you change this one, also avoid:
// 1- To reference interneal types in mscorlib
// 2- To reference any dataflow specific types
// This should be fixed post Dev11 when this class becomes public.
//
// *************************
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Runtime.InteropServices;
namespace System.Threading.Tasks
{
/// Represents a producer/consumer queue used internally by dataflow blocks.
/// Specifies the type of data contained in the queue.
internal interface IProducerConsumerQueue : IEnumerable
{
/// Enqueues an item into the queue.
/// The item to enqueue.
/// This method is meant to be thread-safe subject to the particular nature of the implementation.
void Enqueue(T item);
/// Attempts to dequeue an item from the queue.
/// The dequeued item.
/// true if an item could be dequeued; otherwise, false.
/// This method is meant to be thread-safe subject to the particular nature of the implementation.
bool TryDequeue(out T result);
/// Gets whether the collection is currently empty.
/// This method may or may not be thread-safe.
bool IsEmpty { get; }
/// Gets the number of items in the collection.
/// In many implementations, this method will not be thread-safe.
int Count { get; }
/// A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.
/// The sync object used to lock
/// The collection count
int GetCountSafe(object syncObj);
}
///
/// Provides a producer/consumer queue safe to be used by any number of producers and consumers concurrently.
///
/// Specifies the type of data contained in the queue.
[DebuggerDisplay("Count = {Count}")]
internal sealed class MultiProducerMultiConsumerQueue : ConcurrentQueue, IProducerConsumerQueue
{
/// Enqueues an item into the queue.
/// The item to enqueue.
void IProducerConsumerQueue.Enqueue(T item) { base.Enqueue(item); }
/// Attempts to dequeue an item from the queue.
/// The dequeued item.
/// true if an item could be dequeued; otherwise, false.
bool IProducerConsumerQueue.TryDequeue(out T result) { return base.TryDequeue(out result); }
/// Gets whether the collection is currently empty.
bool IProducerConsumerQueue.IsEmpty { get { return base.IsEmpty; } }
/// Gets the number of items in the collection.
int IProducerConsumerQueue.Count { get { return base.Count; } }
/// A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.
/// ConcurrentQueue.Count is thread safe, no need to acquire the lock.
int IProducerConsumerQueue.GetCountSafe(object syncObj) { return base.Count; }
}
///
/// Provides a producer/consumer queue safe to be used by only one producer and one consumer concurrently.
///
/// Specifies the type of data contained in the queue.
[DebuggerDisplay("Count = {Count}")]
[DebuggerTypeProxy(typeof(SingleProducerSingleConsumerQueue<>.SingleProducerSingleConsumerQueue_DebugView))]
internal sealed class SingleProducerSingleConsumerQueue : IProducerConsumerQueue
{
// Design:
//
// SingleProducerSingleConsumerQueue (SPSCQueue) is a concurrent queue designed to be used
// by one producer thread and one consumer thread. SPSCQueue does not work correctly when used by
// multiple producer threads concurrently or multiple consumer threads concurrently.
//
// SPSCQueue is based on segments that behave like circular buffers. Each circular buffer is represented
// as an array with two indexes: m_first and m_last. m_first is the index of the array slot for the consumer
// to read next, and m_last is the slot for the producer to write next. The circular buffer is empty when
// (m_first == m_last), and full when ((m_last+1) % m_array.Length == m_first).
//
// Since m_first is only ever modified by the consumer thread and m_last by the producer, the two indices can
// be updated without interlocked operations. As long as the queue size fits inside a single circular buffer,
// enqueues and dequeues simply advance the corresponding indices around the circular buffer. If an enqueue finds
// that there is no room in the existing buffer, however, a new circular buffer is allocated that is twice as big
// as the old buffer. From then on, the producer will insert values into the new buffer. The consumer will first
// empty out the old buffer and only then follow the producer into the new (larger) buffer.
//
// As described above, the enqueue operation on the fast path only modifies the m_first field of the current segment.
// However, it also needs to read m_last in order to verify that there is room in the current segment. Similarly, the
// dequeue operation on the fast path only needs to modify m_last, but also needs to read m_first to verify that the
// queue is non-empty. This results in true cache line sharing between the producer and the consumer.
//
// The cache line sharing issue can be mitigating by having a possibly stale copy of m_first that is owned by the producer,
// and a possibly stale copy of m_last that is owned by the consumer. So, the consumer state is described using
// (m_first, m_lastCopy) and the producer state using (m_firstCopy, m_last). The consumer state is separated from
// the producer state by padding, which allows fast-path enqueues and dequeues from hitting shared cache lines.
// m_lastCopy is the consumer's copy of m_last. Whenever the consumer can tell that there is room in the buffer
// simply by observing m_lastCopy, the consumer thread does not need to read m_last and thus encounter a cache miss. Only
// when the buffer appears to be empty will the consumer refresh m_lastCopy from m_last. m_firstCopy is used by the producer
// in the same way to avoid reading m_first on the hot path.
/// The initial size to use for segments (in number of elements).
private const int INIT_SEGMENT_SIZE = 32; // must be a power of 2
/// The maximum size to use for segments (in number of elements).
private const int MAX_SEGMENT_SIZE = 0x1000000; // this could be made as large as Int32.MaxValue / 2
/// The head of the linked list of segments.
private volatile Segment m_head;
/// The tail of the linked list of segments.
private volatile Segment m_tail;
/// Initializes the queue.
internal SingleProducerSingleConsumerQueue()
{
// Validate constants in ctor rather than in an explicit cctor that would cause perf degradation
Debug.Assert(INIT_SEGMENT_SIZE > 0, "Initial segment size must be > 0.");
Debug.Assert((INIT_SEGMENT_SIZE & (INIT_SEGMENT_SIZE - 1)) == 0, "Initial segment size must be a power of 2");
Debug.Assert(INIT_SEGMENT_SIZE <= MAX_SEGMENT_SIZE, "Initial segment size should be <= maximum.");
Debug.Assert(MAX_SEGMENT_SIZE < Int32.MaxValue / 2, "Max segment size * 2 must be < Int32.MaxValue, or else overflow could occur.");
// Initialize the queue
m_head = m_tail = new Segment(INIT_SEGMENT_SIZE);
}
/// Enqueues an item into the queue.
/// The item to enqueue.
public void Enqueue(T item)
{
Segment segment = m_tail;
var array = segment.m_array;
int last = segment.m_state.m_last; // local copy to avoid multiple volatile reads
// Fast path: there's obviously room in the current segment
int tail2 = (last + 1) & (array.Length - 1);
if (tail2 != segment.m_state.m_firstCopy)
{
array[last] = item;
segment.m_state.m_last = tail2;
}
// Slow path: there may not be room in the current segment.
else EnqueueSlow(item, ref segment);
}
/// Enqueues an item into the queue.
/// The item to enqueue.
/// The segment in which to first attempt to store the item.
private void EnqueueSlow(T item, ref Segment segment)
{
Contract.Requires(segment != null, "Expected a non-null segment.");
if (segment.m_state.m_firstCopy != segment.m_state.m_first)
{
segment.m_state.m_firstCopy = segment.m_state.m_first;
Enqueue(item); // will only recur once for this enqueue operation
return;
}
int newSegmentSize = m_tail.m_array.Length << 1; // double size
Debug.Assert(newSegmentSize > 0, "The max size should always be small enough that we don't overflow.");
if (newSegmentSize > MAX_SEGMENT_SIZE) newSegmentSize = MAX_SEGMENT_SIZE;
var newSegment = new Segment(newSegmentSize);
newSegment.m_array[0] = item;
newSegment.m_state.m_last = 1;
newSegment.m_state.m_lastCopy = 1;
try { } finally
{
// Finally block to protect against corruption due to a thread abort
// between setting m_next and setting m_tail.
Volatile.Write(ref m_tail.m_next, newSegment); // ensure segment not published until item is fully stored
m_tail = newSegment;
}
}
/// Attempts to dequeue an item from the queue.
/// The dequeued item.
/// true if an item could be dequeued; otherwise, false.
public bool TryDequeue(out T result)
{
Segment segment = m_head;
var array = segment.m_array;
int first = segment.m_state.m_first; // local copy to avoid multiple volatile reads
// Fast path: there's obviously data available in the current segment
if (first != segment.m_state.m_lastCopy)
{
result = array[first];
array[first] = default(T); // Clear the slot to release the element
segment.m_state.m_first = (first + 1) & (array.Length - 1);
return true;
}
// Slow path: there may not be data available in the current segment
else return TryDequeueSlow(ref segment, ref array, out result);
}
/// Attempts to dequeue an item from the queue.
/// The array from which the item was dequeued.
/// The segment from which the item was dequeued.
/// The dequeued item.
/// true if an item could be dequeued; otherwise, false.
private bool TryDequeueSlow(ref Segment segment, ref T[] array, out T result)
{
Contract.Requires(segment != null, "Expected a non-null segment.");
Contract.Requires(array != null, "Expected a non-null item array.");
if (segment.m_state.m_last != segment.m_state.m_lastCopy)
{
segment.m_state.m_lastCopy = segment.m_state.m_last;
return TryDequeue(out result); // will only recur once for this dequeue operation
}
if (segment.m_next != null && segment.m_state.m_first == segment.m_state.m_last)
{
segment = segment.m_next;
array = segment.m_array;
m_head = segment;
}
var first = segment.m_state.m_first; // local copy to avoid extraneous volatile reads
if (first == segment.m_state.m_last)
{
result = default(T);
return false;
}
result = array[first];
array[first] = default(T); // Clear the slot to release the element
segment.m_state.m_first = (first + 1) & (segment.m_array.Length - 1);
segment.m_state.m_lastCopy = segment.m_state.m_last; // Refresh m_lastCopy to ensure that m_first has not passed m_lastCopy
return true;
}
/// Attempts to peek at an item in the queue.
/// The peeked item.
/// true if an item could be peeked; otherwise, false.
public bool TryPeek(out T result)
{
Segment segment = m_head;
var array = segment.m_array;
int first = segment.m_state.m_first; // local copy to avoid multiple volatile reads
// Fast path: there's obviously data available in the current segment
if (first != segment.m_state.m_lastCopy)
{
result = array[first];
return true;
}
// Slow path: there may not be data available in the current segment
else return TryPeekSlow(ref segment, ref array, out result);
}
/// Attempts to peek at an item in the queue.
/// The array from which the item is peeked.
/// The segment from which the item is peeked.
/// The peeked item.
/// true if an item could be peeked; otherwise, false.
private bool TryPeekSlow(ref Segment segment, ref T[] array, out T result)
{
Contract.Requires(segment != null, "Expected a non-null segment.");
Contract.Requires(array != null, "Expected a non-null item array.");
if (segment.m_state.m_last != segment.m_state.m_lastCopy)
{
segment.m_state.m_lastCopy = segment.m_state.m_last;
return TryPeek(out result); // will only recur once for this peek operation
}
if (segment.m_next != null && segment.m_state.m_first == segment.m_state.m_last)
{
segment = segment.m_next;
array = segment.m_array;
m_head = segment;
}
var first = segment.m_state.m_first; // local copy to avoid extraneous volatile reads
if (first == segment.m_state.m_last)
{
result = default(T);
return false;
}
result = array[first];
return true;
}
/// Attempts to dequeue an item from the queue.
/// The predicate that must return true for the item to be dequeued. If null, all items implicitly return true.
/// The dequeued item.
/// true if an item could be dequeued; otherwise, false.
public bool TryDequeueIf(Predicate predicate, out T result)
{
Segment segment = m_head;
var array = segment.m_array;
int first = segment.m_state.m_first; // local copy to avoid multiple volatile reads
// Fast path: there's obviously data available in the current segment
if (first != segment.m_state.m_lastCopy)
{
result = array[first];
if (predicate == null || predicate(result))
{
array[first] = default(T); // Clear the slot to release the element
segment.m_state.m_first = (first + 1) & (array.Length - 1);
return true;
}
else
{
result = default(T);
return false;
}
}
// Slow path: there may not be data available in the current segment
else return TryDequeueIfSlow(predicate, ref segment, ref array, out result);
}
/// Attempts to dequeue an item from the queue.
/// The predicate that must return true for the item to be dequeued. If null, all items implicitly return true.
/// The array from which the item was dequeued.
/// The segment from which the item was dequeued.
/// The dequeued item.
/// true if an item could be dequeued; otherwise, false.
private bool TryDequeueIfSlow(Predicate predicate, ref Segment segment, ref T[] array, out T result)
{
Contract.Requires(segment != null, "Expected a non-null segment.");
Contract.Requires(array != null, "Expected a non-null item array.");
if (segment.m_state.m_last != segment.m_state.m_lastCopy)
{
segment.m_state.m_lastCopy = segment.m_state.m_last;
return TryDequeueIf(predicate, out result); // will only recur once for this dequeue operation
}
if (segment.m_next != null && segment.m_state.m_first == segment.m_state.m_last)
{
segment = segment.m_next;
array = segment.m_array;
m_head = segment;
}
var first = segment.m_state.m_first; // local copy to avoid extraneous volatile reads
if (first == segment.m_state.m_last)
{
result = default(T);
return false;
}
result = array[first];
if (predicate == null || predicate(result))
{
array[first] = default(T); // Clear the slot to release the element
segment.m_state.m_first = (first + 1) & (segment.m_array.Length - 1);
segment.m_state.m_lastCopy = segment.m_state.m_last; // Refresh m_lastCopy to ensure that m_first has not passed m_lastCopy
return true;
}
else
{
result = default(T);
return false;
}
}
public void Clear()
{
T ignored;
while (TryDequeue(out ignored)) ;
}
/// Gets whether the collection is currently empty.
/// WARNING: This should not be used concurrently without further vetting.
public bool IsEmpty
{
// This implementation is optimized for calls from the consumer.
get
{
var head = m_head;
if (head.m_state.m_first != head.m_state.m_lastCopy) return false; // m_first is volatile, so the read of m_lastCopy cannot get reordered
if (head.m_state.m_first != head.m_state.m_last) return false;
return head.m_next == null;
}
}
/// Gets an enumerable for the collection.
/// WARNING: This should only be used for debugging purposes. It is not safe to be used concurrently.
public IEnumerator GetEnumerator()
{
for (Segment segment = m_head; segment != null; segment = segment.m_next)
{
for (int pt = segment.m_state.m_first;
pt != segment.m_state.m_last;
pt = (pt + 1) & (segment.m_array.Length - 1))
{
yield return segment.m_array[pt];
}
}
}
/// Gets an enumerable for the collection.
/// WARNING: This should only be used for debugging purposes. It is not safe to be used concurrently.
IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
/// Gets the number of items in the collection.
/// WARNING: This should only be used for debugging purposes. It is not meant to be used concurrently.
public int Count
{
get
{
int count = 0;
for (Segment segment = m_head; segment != null; segment = segment.m_next)
{
int arraySize = segment.m_array.Length;
int first, last;
while (true) // Count is not meant to be used concurrently, but this helps to avoid issues if it is
{
first = segment.m_state.m_first;
last = segment.m_state.m_last;
if (first == segment.m_state.m_first) break;
}
count += (last - first) & (arraySize - 1);
}
return count;
}
}
/// A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.
/// The Count is not thread safe, so we need to acquire the lock.
int IProducerConsumerQueue.GetCountSafe(object syncObj)
{
Debug.Assert(syncObj != null, "The syncObj parameter is null.");
lock (syncObj)
{
return Count;
}
}
/// A segment in the queue containing one or more items.
[StructLayout(LayoutKind.Sequential)]
private sealed class Segment
{
/// The next segment in the linked list of segments.
internal Segment m_next;
/// The data stored in this segment.
internal readonly T[] m_array;
/// Details about the segment.
internal SegmentState m_state; // separated out to enable StructLayout attribute to take effect
/// Initializes the segment.
/// The size to use for this segment.
internal Segment(int size)
{
Contract.Requires((size & (size - 1)) == 0, "Size must be a power of 2");
m_array = new T[size];
}
}
/// Stores information about a segment.
[StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing
private struct SegmentState
{
/// Padding to reduce false sharing between the segment's array and m_first.
internal PaddingFor32 m_pad0;
/// The index of the current head in the segment.
internal volatile int m_first;
/// A copy of the current tail index.
internal int m_lastCopy; // not volatile as read and written by the producer, except for IsEmpty, and there m_lastCopy is only read after reading the volatile m_first
/// Padding to reduce false sharing between the first and last.
internal PaddingFor32 m_pad1;
/// A copy of the current head index.
internal int m_firstCopy; // not voliatle as only read and written by the consumer thread
/// The index of the current tail in the segment.
internal volatile int m_last;
/// Padding to reduce false sharing with the last and what's after the segment.
internal PaddingFor32 m_pad2;
}
/// Debugger type proxy for a SingleProducerSingleConsumerQueue of T.
private sealed class SingleProducerSingleConsumerQueue_DebugView
{
/// The queue being visualized.
private readonly SingleProducerSingleConsumerQueue m_queue;
/// Initializes the debug view.
/// The queue being debugged.
public SingleProducerSingleConsumerQueue_DebugView(SingleProducerSingleConsumerQueue queue)
{
Contract.Requires(queue != null, "Expected a non-null queue.");
m_queue = queue;
}
/// Gets the contents of the list.
[DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
public T[] Items
{
get
{
List list = new List();
foreach (T item in m_queue)
list.Add(item);
return list.ToArray();
}
}
}
}
/// A placeholder class for common padding constants and eventually routines.
static class PaddingHelpers
{
/// A size greater than or equal to the size of the most common CPU cache lines.
internal const int CACHE_LINE_SIZE = 128;
}
/// Padding structure used to minimize false sharing in SingleProducerSingleConsumerQueue{T}.
[StructLayout(LayoutKind.Explicit, Size = PaddingHelpers.CACHE_LINE_SIZE - sizeof(Int32))] // Based on common case of 64-byte cache lines
struct PaddingFor32
{
}
}