1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
|
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
//
//
// Implements the algorithm for distributing loop indices to parallel loop workers
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System;
using System.Threading;
using System.Diagnostics;
using System.Diagnostics.Contracts;
#pragma warning disable 0420
namespace System.Threading.Tasks
{
/// <summary>
/// Represents an index range
/// </summary>
internal struct IndexRange
{
    // the From and To values for this range. These do not change after the
    // RangeManager constructor assigns them, so workers may read them freely.
    internal long m_nFromInclusive;
    internal long m_nToExclusive;

    // The shared index, stored as the offset from nFromInclusive. Using an offset rather than the actual
    // value saves us from overflows that can happen due to multiple workers racing to increment this.
    // All updates to this field need to be interlocked. Allocated lazily (starts out null) by the
    // first worker that touches the range; see RangeWorker.FindNewWork.
    internal volatile Shared<long> m_nSharedCurrentIndexOffset;

    // to be set to 1 by the worker that finishes this range. It's OK to do a non-interlocked write here.
    // Other workers use it as a quick "skip this range" hint, so a stale read only costs a little work.
    internal int m_bRangeFinished;
}
/// <summary>
/// The RangeWorker struct wraps the state needed by a task that services the parallel loop
/// </summary>
internal struct RangeWorker
{
    // reference to the IndexRange array allocated by the range manager (shared by all workers)
    internal readonly IndexRange[] m_indexRanges;

    // index of the current index range that this worker is grabbing chunks from
    internal int m_nCurrentIndexRange;

    // the step for this loop. Duplicated here for quick access (rather than jumping to rangemanager)
    internal long m_nStep;

    // increment value is the current amount that this worker will use
    // to increment the shared index of the range it's working on
    internal long m_nIncrementValue;

    // the increment value is doubled each time this worker finds work, and is capped at this value
    internal readonly long m_nMaxIncrementValue;

    /// <summary>
    /// Initializes a RangeWorker struct
    /// </summary>
    /// <param name="ranges">Shared array of index ranges created by the RangeManager.</param>
    /// <param name="nInitialRange">Index of the range this worker starts taking chunks from.</param>
    /// <param name="nStep">The loop step; also this worker's initial chunk size.</param>
    internal RangeWorker(IndexRange[] ranges, int nInitialRange, long nStep)
    {
        m_indexRanges = ranges;
        m_nCurrentIndexRange = nInitialRange;
        m_nStep = nStep;

        // Start with the smallest chunk (one step). FindNewWork doubles this each time
        // work is found, capped at DEFAULT_LOOP_STRIDE steps.
        m_nIncrementValue = nStep;
        m_nMaxIncrementValue = Parallel.DEFAULT_LOOP_STRIDE * nStep;
    }

    /// <summary>
    /// Implements the core work search algorithm that will be used for this range worker.
    /// </summary>
    ///
    /// Usage pattern is:
    /// 1) the thread associated with this rangeworker calls FindNewWork
    /// 2) if we return true, the worker uses the nFromInclusiveLocal and nToExclusiveLocal values
    ///    to execute the sequential loop
    /// 3) if we return false it means there is no more work left. It's time to quit.
    ///
    internal bool FindNewWork(out long nFromInclusiveLocal, out long nToExclusiveLocal)
    {
        // since we iterate over index ranges circularly, we will use the
        // count of visited ranges as our exit condition
        int numIndexRangesToVisit = m_indexRanges.Length;

        do
        {
            // local snap to save array access bounds checks in places where we only read fields
            IndexRange currentRange = m_indexRanges[m_nCurrentIndexRange];

            if (currentRange.m_bRangeFinished == 0)
            {
                // Lazily allocate the shared offset cell. CompareExchange ensures exactly one
                // worker's allocation wins if several race here at the same time.
                if (m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset == null)
                {
                    Interlocked.CompareExchange(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset, new Shared<long>(0), null);
                }

                // Atomically claim a chunk of m_nIncrementValue indices: the value before our
                // add (Add's return minus our increment) is the offset where our chunk begins.
                // this access needs to be on the array slot
                long nMyOffset = Interlocked.Add(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset.Value,
                                                m_nIncrementValue) - m_nIncrementValue;

                if (currentRange.m_nToExclusive - currentRange.m_nFromInclusive > nMyOffset)
                {
                    // we found work

                    nFromInclusiveLocal = currentRange.m_nFromInclusive + nMyOffset;
                    nToExclusiveLocal = nFromInclusiveLocal + m_nIncrementValue;

                    // Check for going past end of range, or wrapping
                    // (the add above may overflow; a wrapped value shows up as < m_nFromInclusive)
                    if ( (nToExclusiveLocal > currentRange.m_nToExclusive) || (nToExclusiveLocal < currentRange.m_nFromInclusive) )
                    {
                        nToExclusiveLocal = currentRange.m_nToExclusive;
                    }

                    // We will double our unit of increment until it reaches the maximum.
                    if (m_nIncrementValue < m_nMaxIncrementValue)
                    {
                        m_nIncrementValue *= 2;
                        if (m_nIncrementValue > m_nMaxIncrementValue)
                        {
                            m_nIncrementValue = m_nMaxIncrementValue;
                        }
                    }

                    return true;
                }
                else
                {
                    // this index range is completed, mark it so that others can skip it quickly
                    Interlocked.Exchange(ref m_indexRanges[m_nCurrentIndexRange].m_bRangeFinished, 1);
                }
            }

            // move on to the next index range, in circular order.
            m_nCurrentIndexRange = (m_nCurrentIndexRange + 1) % m_indexRanges.Length;
            numIndexRangesToVisit--;

        } while (numIndexRangesToVisit > 0);

        // we've visited all index ranges possible => there's no work remaining
        nFromInclusiveLocal = 0;
        nToExclusiveLocal = 0;

        return false;
    }

    /// <summary>
    /// 32 bit integer version of FindNewWork. Assumes the ranges were initialized with 32 bit values.
    /// </summary>
    internal bool FindNewWork32(out int nFromInclusiveLocal32, out int nToExclusiveLocal32)
    {
        long nFromInclusiveLocal;
        long nToExclusiveLocal;

        bool bRetVal = FindNewWork(out nFromInclusiveLocal, out nToExclusiveLocal);

        // The caller promised 32-bit ranges, so the 64-bit results must fit; assert rather than check.
        Debug.Assert((nFromInclusiveLocal <= Int32.MaxValue) && (nFromInclusiveLocal >= Int32.MinValue) &&
                     (nToExclusiveLocal <= Int32.MaxValue) && (nToExclusiveLocal >= Int32.MinValue));

        // convert to 32 bit before returning
        nFromInclusiveLocal32 = (int)nFromInclusiveLocal;
        nToExclusiveLocal32 = (int)nToExclusiveLocal;

        return bRetVal;
    }
}
/// <summary>
/// Represents the entire loop operation, keeping track of workers and ranges.
/// </summary>
///
/// The usage pattern is:
/// 1) The Parallel loop entry function (ForWorker) creates an instance of this class
/// 2) Every thread joining to service the parallel loop calls RegisterNewWorker to grab a
///    RangeWorker struct to wrap the state it will need to find and execute work,
///    and they keep interacting with that struct until the end of the loop
internal class RangeManager
{
    internal readonly IndexRange[] m_indexRanges;
    internal int m_nCurrentIndexRangeToAssign;
    internal long m_nStep;

    /// <summary>
    /// Initializes a RangeManager with the given loop parameters, and the desired number of outer ranges
    /// </summary>
    /// <param name="nFromInclusive">First index of the loop (inclusive).</param>
    /// <param name="nToExclusive">End of the loop (exclusive).</param>
    /// <param name="nStep">Loop step; every range boundary is kept on a multiple of this.</param>
    /// <param name="nNumExpectedWorkers">Expected worker count, used to pick the range size.</param>
    internal RangeManager(long nFromInclusive, long nToExclusive, long nStep, int nNumExpectedWorkers)
    {
        m_nCurrentIndexRangeToAssign = 0;
        m_nStep = nStep;

        // A single worker breaks the signed-size math below, so pretend there are two.
        if (nNumExpectedWorkers == 1)
            nNumExpectedWorkers = 2;

        // Size each outer range as roughly span / workers, snapped down to a multiple of
        // nStep so that crossing a range boundary never knocks the loop off-stride.
        ulong totalSpan = (ulong)(nToExclusive - nFromInclusive);
        ulong rangeSpan = totalSpan / (ulong)nNumExpectedWorkers;
        rangeSpan -= rangeSpan % (ulong)nStep;
        if (rangeSpan == 0)
        {
            rangeSpan = (ulong)nStep;
        }

        // Count how many ranges are needed to cover the span; a nonzero remainder
        // means one extra, shorter range at the end.
        Debug.Assert((totalSpan / rangeSpan) < Int32.MaxValue);
        int rangeCount = (int)(totalSpan / rangeSpan);
        if (totalSpan % rangeSpan != 0)
        {
            rangeCount++;
        }

        // Safe to go signed: rangeSpan fits in Int64 because we forced #workers >= 2 above.
        long signedRangeSpan = (long)rangeSpan;

        m_indexRanges = new IndexRange[rangeCount];

        // Walk the span, carving out one range per iteration.
        long rangeStart = nFromInclusive;
        for (int i = 0; i < rangeCount; i++)
        {
            m_indexRanges[i].m_nFromInclusive = rangeStart;
            m_indexRanges[i].m_nSharedCurrentIndexOffset = null;
            m_indexRanges[i].m_bRangeFinished = 0;

            long rangeEnd = rangeStart + signedRangeSpan;

            // rangeEnd < rangeStart means the (unchecked) add wrapped around Int64;
            // in that case, or if we ran past the loop end, clamp to nToExclusive.
            if (rangeEnd < rangeStart || rangeEnd > nToExclusive)
            {
                // only the final range can need clamping
                Debug.Assert(i == rangeCount - 1);
                rangeEnd = nToExclusive;
            }

            m_indexRanges[i].m_nToExclusive = rangeEnd;
            rangeStart = rangeEnd;
        }
    }

    /// <summary>
    /// The function that needs to be called by each new worker thread servicing the parallel loop
    /// in order to get a RangeWorker struct that wraps the state for finding and executing indices
    /// </summary>
    internal RangeWorker RegisterNewWorker()
    {
        Debug.Assert(m_indexRanges != null && m_indexRanges.Length != 0);

        // Hand each new worker a different starting range (round-robin via the shared
        // counter) so workers fan out across the array instead of piling onto range 0.
        int startingRange = (Interlocked.Increment(ref m_nCurrentIndexRangeToAssign) - 1) % m_indexRanges.Length;
        return new RangeWorker(m_indexRanges, startingRange, m_nStep);
    }
}
}
#pragma warning restore 0420
|