// Source path: root/src/mscorlib/src/System/Threading/Tasks/ParallelRangeManager.cs
// Source blob: 49f61a661497cf20ad55f8588f940b62024c07a3
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
//
//
// Implements the algorithm for distributing loop indices to parallel loop workers
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System;
using System.Threading;
using System.Diagnostics;
using System.Diagnostics.Contracts;

#pragma warning disable 0420

namespace System.Threading.Tasks
{
    /// <summary>
    /// Represents an index range: the half-open interval [m_nFromInclusive, m_nToExclusive) together with
    /// the shared progress state that workers use to claim chunks of it.
    /// </summary>
    internal struct IndexRange
    {
        // The From and To values for this range. They are assigned by the RangeManager constructor and are
        // not written anywhere else in this file.
        internal long m_nFromInclusive;
        internal long m_nToExclusive;

        // The shared index, stored as the offset from m_nFromInclusive. Using an offset rather than the actual
        // value saves us from overflows that can happen due to multiple workers racing to increment this.
        // All updates to this field need to be interlocked.
        // The RangeManager constructor leaves this null; RangeWorker.FindNewWork allocates the Shared<long>
        // lazily (via Interlocked.CompareExchange) the first time a worker visits the range.
        internal volatile Shared<long> m_nSharedCurrentIndexOffset;

        // Set to 1 by a worker that observes this range as exhausted, so that later visitors can skip it
        // without touching the shared offset. FindNewWork performs the write with Interlocked.Exchange and
        // reads the flag through a by-value snapshot of the struct.
        internal int m_bRangeFinished;
    }


    /// <summary>
    /// The RangeWorker struct wraps the state needed by a task that services the parallel loop:
    /// the shared array of index ranges, the range this worker is currently draining, and the size of
    /// the next chunk it will try to claim.
    /// </summary>
    internal struct RangeWorker
    {
        // reference to the IndexRange array allocated by the range manager
        internal readonly IndexRange[] m_indexRanges;

        // index of the current index range that this worker is grabbing chunks from
        internal int m_nCurrentIndexRange;        

        // the step for this loop. Duplicated here for quick access (rather than jumping to rangemanager)
        internal long m_nStep;

        // increment value is the current amount that this worker will use 
        // to increment the shared index of the range it's working on.
        // It starts at nStep and grows as described on m_nMaxIncrementValue.
        internal long m_nIncrementValue;

        // the increment value is doubled each time this worker finds work, and is capped at this value
        // (Parallel.DEFAULT_LOOP_STRIDE * nStep, computed once in the constructor)
        internal readonly long m_nMaxIncrementValue;

        /// <summary>
        /// Initializes a RangeWorker struct
        /// </summary>
        /// <param name="ranges">The index range array shared by all workers of the loop.</param>
        /// <param name="nInitialRange">Index into <paramref name="ranges"/> of the first range this worker will try.</param>
        /// <param name="nStep">The loop step; also used as the initial chunk size.</param>
        internal RangeWorker(IndexRange[] ranges, int nInitialRange, long nStep)
        {
            m_indexRanges = ranges;
            m_nCurrentIndexRange = nInitialRange;
            m_nStep = nStep;

            // the first chunk claimed by this worker is a single step wide
            m_nIncrementValue = nStep;

            m_nMaxIncrementValue = Parallel.DEFAULT_LOOP_STRIDE * nStep;
        }

        /// <summary>
        /// Implements the core work search algorithm that will be used for this range worker. 
        /// </summary> 
        /// <remarks>
        /// Usage pattern is:
        ///    1) the thread associated with this rangeworker calls FindNewWork
        ///    2) if we return true, the worker uses the nFromInclusiveLocal and nToExclusiveLocal values
        ///       to execute the sequential loop
        ///    3) if we return false it means there is no more work left. It's time to quit.
        ///
        /// Ranges are visited circularly starting at m_nCurrentIndexRange, and at most
        /// m_indexRanges.Length ranges are examined per call. When work is found the current range index
        /// is left unchanged, so the next call starts with the same range.
        /// </remarks>
        /// <param name="nFromInclusiveLocal">On success, the inclusive start of the claimed chunk; 0 when no work is found.</param>
        /// <param name="nToExclusiveLocal">On success, the exclusive end of the claimed chunk (clamped to the end of its range); 0 when no work is found.</param>
        /// <returns>true if a chunk was claimed; false if every range was visited without finding work.</returns>
        internal bool FindNewWork(out long nFromInclusiveLocal, out long nToExclusiveLocal)
        {
            // since we iterate over index ranges circularly, we will use the
            // count of visited ranges as our exit condition
            int numIndexRangesToVisit = m_indexRanges.Length;

            do
            {
                // local snap to save array access bounds checks in places where we only read fields.
                // This is a by-value copy of the struct: later writes to the array slot are not reflected in it.
                IndexRange currentRange = m_indexRanges[m_nCurrentIndexRange];

                if (currentRange.m_bRangeFinished == 0)
                {
                    // Lazily allocate the shared offset. If several workers race here, CompareExchange lets
                    // exactly one allocation win and the others are discarded.
                    if (m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset == null)
                    {
                        Interlocked.CompareExchange(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset, new Shared<long>(0), null);
                    }

                    // this access needs to be on the array slot: the snapshot above was taken before the lazy
                    // initialization, so its copy of the reference may still be null.
                    // Interlocked.Add returns the value after the addition; subtracting our increment yields
                    // the offset at which the chunk we just claimed begins.
                    long nMyOffset = Interlocked.Add(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset.Value,
                                                    m_nIncrementValue) - m_nIncrementValue;

                    // the claimed chunk is real work only if it starts before the end of the range
                    if (currentRange.m_nToExclusive - currentRange.m_nFromInclusive > nMyOffset)
                    {
                        // we found work

                        nFromInclusiveLocal = currentRange.m_nFromInclusive + nMyOffset;
                        nToExclusiveLocal = nFromInclusiveLocal + m_nIncrementValue;

                        // Check for going past end of range, or wrapping
                        if ( (nToExclusiveLocal > currentRange.m_nToExclusive) || (nToExclusiveLocal < currentRange.m_nFromInclusive) )
                        {
                            nToExclusiveLocal = currentRange.m_nToExclusive;
                        }

                        // We will double our unit of increment until it reaches the maximum.
                        if (m_nIncrementValue < m_nMaxIncrementValue)
                        {
                            m_nIncrementValue *= 2;
                            if (m_nIncrementValue > m_nMaxIncrementValue)
                            {
                                m_nIncrementValue = m_nMaxIncrementValue;
                            }
                        }

                        return true;
                    }
                    else
                    {
                        // this index range is completed, mark it so that others can skip it quickly
                        Interlocked.Exchange(ref m_indexRanges[m_nCurrentIndexRange].m_bRangeFinished, 1);
                    }
                }

                // move on to the next index range, in circular order.
                m_nCurrentIndexRange = (m_nCurrentIndexRange + 1) % m_indexRanges.Length;
                numIndexRangesToVisit--;

            } while (numIndexRangesToVisit > 0);
            // we've visited all index ranges possible => there's no work remaining

            nFromInclusiveLocal = 0;
            nToExclusiveLocal = 0;

            return false;
        }


        /// <summary>
        /// 32 bit integer version of FindNewWork. Assumes the ranges were initialized with 32 bit values.
        /// </summary> 
        /// <param name="nFromInclusiveLocal32">The inclusive start reported by FindNewWork, cast to int.</param>
        /// <param name="nToExclusiveLocal32">The exclusive end reported by FindNewWork, cast to int.</param>
        /// <returns>The value returned by FindNewWork.</returns>
        internal bool FindNewWork32(out int nFromInclusiveLocal32, out int nToExclusiveLocal32)
        {
            long nFromInclusiveLocal;
            long nToExclusiveLocal;

            bool bRetVal = FindNewWork(out nFromInclusiveLocal, out nToExclusiveLocal);

            // The casts below are plain (int) casts with no runtime range check of their own; this assert is
            // the only validation that both bounds fit in 32 bits.
            Debug.Assert((nFromInclusiveLocal <= Int32.MaxValue) && (nFromInclusiveLocal >= Int32.MinValue) &&
                            (nToExclusiveLocal <= Int32.MaxValue) && (nToExclusiveLocal >= Int32.MinValue));
            
            // convert to 32 bit before returning
            nFromInclusiveLocal32 = (int)nFromInclusiveLocal;
            nToExclusiveLocal32 = (int)nToExclusiveLocal;
            
            return bRetVal;
        }
    }


    /// <summary>
    /// Represents the entire loop operation: it splits the iteration space into index ranges and
    /// hands out the RangeWorker structs that consume them.
    /// </summary>
    /// <remarks>
    /// The usage pattern is:
    ///    1) The Parallel loop entry function (ForWorker) creates an instance of this class
    ///    2) Every thread joining to service the parallel loop calls RegisterNewWorker to grab a
    ///       RangeWorker struct to wrap the state it will need to find and execute work,
    ///       and they keep interacting with that struct until the end of the loop
    /// </remarks>
    internal class RangeManager
    {
        internal readonly IndexRange[] m_indexRanges;

        internal int m_nCurrentIndexRangeToAssign;
        internal long m_nStep;

        /// <summary>
        /// Builds the index range table for a loop over [nFromInclusive, nToExclusive) with the given step.
        /// </summary>
        /// <param name="nFromInclusive">Inclusive lower bound of the loop.</param>
        /// <param name="nToExclusive">Exclusive upper bound of the loop.</param>
        /// <param name="nStep">Loop step; every range boundary except the last is aligned to a multiple of it.</param>
        /// <param name="nNumExpectedWorkers">Expected number of workers, used to size the ranges. A value of 1 is treated as 2.</param>
        internal RangeManager(long nFromInclusive, long nToExclusive, long nStep, int nNumExpectedWorkers)
        {
            m_nCurrentIndexRangeToAssign = 0;
            m_nStep = nStep;

            // All sizing is done in unsigned arithmetic so that a span wider than Int64.MaxValue is handled.
            ulong uTotalSpan = (ulong)(nToExclusive - nFromInclusive);

            // The signed math further down breaks with a single worker, so one worker is sized as two.
            ulong uWorkers = (ulong)(nNumExpectedWorkers == 1 ? 2 : nNumExpectedWorkers);
            ulong uStep = (ulong)nStep;

            // Rough per-worker share, snapped down to a multiple of the step so that moving from one
            // range to the next keeps indices on the step grid. Never smaller than one step.
            ulong uChunk = uTotalSpan / uWorkers;
            uChunk -= uChunk % uStep;
            if (uChunk == 0)
            {
                uChunk = uStep;
            }

            // Number of ranges needed: the full chunks, plus one more for any remainder.
            ulong uWholeChunks = uTotalSpan / uChunk;
            Debug.Assert(uWholeChunks < Int32.MaxValue);

            int rangeCount = (int)uWholeChunks;
            if (uTotalSpan % uChunk != 0)
            {
                rangeCount++;
            }

            // Back to signed for the boundary walk. The chunk stays below Int64.MaxValue because the
            // span was divided by at least two workers.
            long chunk = (long)uChunk;

            m_indexRanges = new IndexRange[rangeCount];

            long cursor = nFromInclusive;
            for (int i = 0; i < rangeCount; i++)
            {
                long rangeStart = cursor;
                long rangeEnd = rangeStart + chunk;

                // A wrapped addition or an end beyond the loop bound means this is the tail range;
                // clamp it to the loop's exclusive bound.
                if (rangeEnd < rangeStart || rangeEnd > nToExclusive)
                {
                    // this should only happen at the last index
                    Debug.Assert(i == rangeCount - 1);

                    rangeEnd = nToExclusive;
                }

                m_indexRanges[i].m_nFromInclusive = rangeStart;
                m_indexRanges[i].m_nSharedCurrentIndexOffset = null;
                m_indexRanges[i].m_bRangeFinished = 0;
                m_indexRanges[i].m_nToExclusive = rangeEnd;

                // the next range begins where this one ends
                cursor = rangeEnd;
            }
        }

        /// <summary>
        /// Called by each new worker thread servicing the parallel loop to obtain the RangeWorker struct
        /// that wraps its state for finding and executing indices.
        /// </summary>
        /// <returns>A RangeWorker whose starting range is assigned round-robin over the index ranges.</returns>
        internal RangeWorker RegisterNewWorker()
        {
            Debug.Assert(m_indexRanges != null && m_indexRanges.Length != 0);

            // Atomically take the next ticket (zero-based) and wrap it onto the range table.
            int ticket = Interlocked.Increment(ref m_nCurrentIndexRangeToAssign) - 1;
            int startingRange = ticket % m_indexRanges.Length;

            return new RangeWorker(m_indexRanges, startingRange, m_nStep);
        }
    }
}
#pragma warning restore 0420