dd84479fb75d30fe3ca950a9c2a1a1e36ebb6689
[platform/upstream/dotnet/runtime.git] /
1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4
5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
6 //
7 // HashRepartitionEnumerator.cs
8 //
9 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
10
11 using System.Collections.Generic;
12 using System.Threading;
13 using System.Diagnostics;
14
15 namespace System.Linq.Parallel
16 {
17     /// <summary>
18     /// This enumerator handles the actual coordination among partitions required to
19     /// accomplish the repartitioning operation, as explained above.
20     /// </summary>
21     /// <typeparam name="TInputOutput">The kind of elements.</typeparam>
22     /// <typeparam name="THashKey">The key used to distribute elements.</typeparam>
23     /// <typeparam name="TIgnoreKey">The kind of keys found in the source (ignored).</typeparam>
24     internal class HashRepartitionEnumerator<TInputOutput, THashKey, TIgnoreKey> : QueryOperatorEnumerator<Pair<TInputOutput, THashKey>, int>
25
26     {
27         private const int ENUMERATION_NOT_STARTED = -1; // Sentinel to note we haven't begun enumerating yet.
28
29         private readonly int _partitionCount; // The number of partitions.
30         private readonly int _partitionIndex; // Our unique partition index.
31         private readonly Func<TInputOutput, THashKey> _keySelector; // A key-selector function.
32         private readonly HashRepartitionStream<TInputOutput, THashKey, int> _repartitionStream; // A repartitioning stream.
33         private readonly ListChunk<Pair<TInputOutput, THashKey>>[][] _valueExchangeMatrix; // Matrix to do inter-task communication.
34         private readonly QueryOperatorEnumerator<TInputOutput, TIgnoreKey> _source; // The immediate source of data.
35         private CountdownEvent _barrier; // Used to signal and wait for repartitions to complete.
36         private readonly CancellationToken _cancellationToken; // A token for canceling the process.
37         private Mutables _mutables; // Mutable fields for this enumerator.
38
39         private class Mutables
40         {
41             internal int _currentBufferIndex; // Current buffer index.
42             internal ListChunk<Pair<TInputOutput, THashKey>> _currentBuffer; // The buffer we're currently enumerating.
43             internal int _currentIndex; // Current index into the buffer.
44
45             internal Mutables()
46             {
47                 _currentBufferIndex = ENUMERATION_NOT_STARTED;
48             }
49         }
50
51         //---------------------------------------------------------------------------------------
52         // Creates a new repartitioning enumerator.
53         //
54         // Arguments:
55         //     source            - the data stream from which to pull elements
56         //     useOrdinalOrderPreservation - whether order preservation is required
57         //     partitionCount    - total number of partitions
58         //     partitionIndex    - this operator's unique partition index
59         //     repartitionStream - the stream object to use for partition selection
60         //     barrier           - a latch used to signal task completion
61         //     buffers           - a set of buffers for inter-task communication
62         //
63
64         internal HashRepartitionEnumerator(
65             QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source, int partitionCount, int partitionIndex,
66             Func<TInputOutput, THashKey> keySelector, HashRepartitionStream<TInputOutput, THashKey, int> repartitionStream,
67             CountdownEvent barrier, ListChunk<Pair<TInputOutput, THashKey>>[][] valueExchangeMatrix, CancellationToken cancellationToken)
68         {
69             Debug.Assert(source != null);
70             Debug.Assert(keySelector != null || typeof(THashKey) == typeof(NoKeyMemoizationRequired));
71             Debug.Assert(repartitionStream != null);
72             Debug.Assert(barrier != null);
73             Debug.Assert(valueExchangeMatrix != null);
74             Debug.Assert(valueExchangeMatrix.GetLength(0) == partitionCount, "expected square matrix of buffers (NxN)");
75             Debug.Assert(partitionCount > 0 && valueExchangeMatrix[0].Length == partitionCount, "expected square matrix of buffers (NxN)");
76             Debug.Assert(0 <= partitionIndex && partitionIndex < partitionCount);
77
78             _source = source;
79             _partitionCount = partitionCount;
80             _partitionIndex = partitionIndex;
81             _keySelector = keySelector;
82             _repartitionStream = repartitionStream;
83             _barrier = barrier;
84             _valueExchangeMatrix = valueExchangeMatrix;
85             _cancellationToken = cancellationToken;
86         }
87
88         //---------------------------------------------------------------------------------------
89         // Retrieves the next element from this partition.  All repartitioning operators across
90         // all partitions cooperate in a barrier-style algorithm.  The first time an element is
91         // requested, the repartitioning operator will enter the 1st phase: during this phase, it
92         // scans its entire input and compute the destination partition for each element.  During
93         // the 2nd phase, each partition scans the elements found by all other partitions for
94         // it, and yield this to callers.  The only synchronization required is the barrier itself
95         // -- all other parts of this algorithm are synchronization-free.
96         //
97         // Notes: One rather large penalty that this algorithm incurs is higher memory usage and a
98         // larger time-to-first-element latency, at least compared with our old implementation; this
99         // happens because all input elements must be fetched before we can produce a single output
100         // element.  In many cases this isn't too terrible: e.g. a GroupBy requires this to occur
101         // anyway, so having the repartitioning operator do so isn't complicating matters much at all.
102         //
103
104         internal override bool MoveNext(ref Pair<TInputOutput, THashKey> currentElement, ref int currentKey)
105         {
106             if (_partitionCount == 1)
107             {
108                 // If there's only one partition, no need to do any sort of exchanges.
109                 TIgnoreKey keyUnused = default(TIgnoreKey);
110                 TInputOutput current = default(TInputOutput);
111 #if DEBUG
112                 currentKey = unchecked((int)0xdeadbeef);
113 #endif
114                 if (_source.MoveNext(ref current, ref keyUnused))
115                 {
116                     currentElement = new Pair<TInputOutput, THashKey>(
117                         current, _keySelector == null ? default(THashKey) : _keySelector(current));
118                     return true;
119                 }
120                 return false;
121             }
122
123             Mutables mutables = _mutables;
124             if (mutables == null)
125                 mutables = _mutables = new Mutables();
126
127             // If we haven't enumerated the source yet, do that now.  This is the first phase
128             // of a two-phase barrier style operation.
129             if (mutables._currentBufferIndex == ENUMERATION_NOT_STARTED)
130             {
131                 EnumerateAndRedistributeElements();
132                 Debug.Assert(mutables._currentBufferIndex != ENUMERATION_NOT_STARTED);
133             }
134
135             // Once we've enumerated our contents, we can then go back and walk the buffers that belong
136             // to the current partition.  This is phase two.  Note that we slyly move on to the first step
137             // of phase two before actually waiting for other partitions.  That's because we can enumerate
138             // the buffer we wrote to above, as already noted.
139             while (mutables._currentBufferIndex < _partitionCount)
140             {
141                 // If the queue is non-null and still has elements, yield them.
142                 if (mutables._currentBuffer != null)
143                 {
144                     if (++mutables._currentIndex < mutables._currentBuffer.Count)
145                     {
146                         // Return the current element.
147                         currentElement = mutables._currentBuffer._chunk[mutables._currentIndex];
148                         return true;
149                     }
150                     else
151                     {
152                         // If the chunk is empty, advance to the next one (if any).
153                         mutables._currentIndex = ENUMERATION_NOT_STARTED;
154                         mutables._currentBuffer = mutables._currentBuffer.Next;
155                         Debug.Assert(mutables._currentBuffer == null || mutables._currentBuffer.Count > 0);
156                         continue; // Go back around and invoke this same logic.
157                     }
158                 }
159
160                 // We're done with the current partition.  Slightly different logic depending on whether
161                 // we're on our own buffer or one that somebody else found for us.
162                 if (mutables._currentBufferIndex == _partitionIndex)
163                 {
164                     // We now need to wait at the barrier, in case some other threads aren't done.
165                     // Once we wake up, we reset our index and will increment it immediately after.
166                     _barrier.Wait(_cancellationToken);
167                     mutables._currentBufferIndex = ENUMERATION_NOT_STARTED;
168                 }
169
170                 // Advance to the next buffer.
171                 mutables._currentBufferIndex++;
172                 mutables._currentIndex = ENUMERATION_NOT_STARTED;
173
174                 if (mutables._currentBufferIndex == _partitionIndex)
175                 {
176                     // Skip our current buffer (since we already enumerated it).
177                     mutables._currentBufferIndex++;
178                 }
179
180                 // Assuming we're within bounds, retrieve the next buffer object.
181                 if (mutables._currentBufferIndex < _partitionCount)
182                 {
183                     mutables._currentBuffer = _valueExchangeMatrix[mutables._currentBufferIndex][_partitionIndex];
184                 }
185             }
186
187             // We're done. No more buffers to enumerate.
188             return false;
189         }
190
191         //---------------------------------------------------------------------------------------
192         // Called when this enumerator is first enumerated; it must walk through the source
193         // and redistribute elements to their slot in the exchange matrix.
194         //
195
196         private void EnumerateAndRedistributeElements()
197         {
198             Mutables mutables = _mutables;
199             Debug.Assert(mutables != null);
200
201             ListChunk<Pair<TInputOutput, THashKey>>[] privateBuffers = new ListChunk<Pair<TInputOutput, THashKey>>[_partitionCount];
202
203             TInputOutput element = default(TInputOutput);
204             TIgnoreKey ignoreKey = default(TIgnoreKey);
205             int loopCount = 0;
206             while (_source.MoveNext(ref element, ref ignoreKey))
207             {
208                 if ((loopCount++ & CancellationState.POLL_INTERVAL) == 0)
209                     CancellationState.ThrowIfCanceled(_cancellationToken);
210
211                 // Calculate the element's destination partition index, placing it into the
212                 // appropriate buffer from which partitions will later enumerate.
213                 int destinationIndex;
214                 THashKey elementHashKey = default(THashKey);
215                 if (_keySelector != null)
216                 {
217                     elementHashKey = _keySelector(element);
218                     destinationIndex = _repartitionStream.GetHashCode(elementHashKey) % _partitionCount;
219                 }
220                 else
221                 {
222                     Debug.Assert(typeof(THashKey) == typeof(NoKeyMemoizationRequired));
223                     destinationIndex = _repartitionStream.GetHashCode(element) % _partitionCount;
224                 }
225
226                 Debug.Assert(0 <= destinationIndex && destinationIndex < _partitionCount,
227                                 "destination partition outside of the legal range of partitions");
228
229                 // Get the buffer for the destination partition, lazily allocating if needed.  We maintain
230                 // this list in our own private cache so that we avoid accessing shared memory locations
231                 // too much.  In the original implementation, we'd access the buffer in the matrix ([N,M],
232                 // where N is the current partition and M is the destination), but some rudimentary
233                 // performance profiling indicates copying at the end performs better.
234                 ListChunk<Pair<TInputOutput, THashKey>> buffer = privateBuffers[destinationIndex];
235                 if (buffer == null)
236                 {
237                     const int INITIAL_PRIVATE_BUFFER_SIZE = 128;
238                     privateBuffers[destinationIndex] = buffer = new ListChunk<Pair<TInputOutput, THashKey>>(INITIAL_PRIVATE_BUFFER_SIZE);
239                 }
240
241                 buffer.Add(new Pair<TInputOutput, THashKey>(element, elementHashKey));
242             }
243
244             // Copy the local buffers to the shared space and then signal to other threads that
245             // we are done.  We can then immediately move on to enumerating the elements we found
246             // for the current partition before waiting at the barrier.  If we found a lot, we will
247             // hopefully never have to physically wait.
248             for (int i = 0; i < _partitionCount; i++)
249             {
250                 _valueExchangeMatrix[_partitionIndex][i] = privateBuffers[i];
251             }
252
253             _barrier.Signal();
254
255             // Begin at our own buffer.
256             mutables._currentBufferIndex = _partitionIndex;
257             mutables._currentBuffer = privateBuffers[_partitionIndex];
258             mutables._currentIndex = ENUMERATION_NOT_STARTED;
259         }
260
261         protected override void Dispose(bool disposed)
262         {
263             if (_barrier != null)
264             {
265                 // Since this enumerator is being disposed, we will decrement the barrier,
266                 // in case other enumerators will wait on the barrier.
267                 if (_mutables == null || (_mutables._currentBufferIndex == ENUMERATION_NOT_STARTED))
268                 {
269                     _barrier.Signal();
270                     _barrier = null;
271                 }
272
273                 _source.Dispose();
274             }
275         }
276     }
277 }