1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 // HashRepartitionEnumerator.cs
9 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
11 using System.Collections.Generic;
12 using System.Threading;
13 using System.Diagnostics;
15 namespace System.Linq.Parallel
18 /// This enumerator handles the actual coordination among partitions required to
19 /// accomplish the repartitioning operation, as explained above.
21 /// <typeparam name="TInputOutput">The kind of elements.</typeparam>
22 /// <typeparam name="THashKey">The key used to distribute elements.</typeparam>
23 /// <typeparam name="TIgnoreKey">The kind of keys found in the source (ignored).</typeparam>
internal class HashRepartitionEnumerator<TInputOutput, THashKey, TIgnoreKey> : QueryOperatorEnumerator<Pair<TInputOutput, THashKey>, int>
    private const int ENUMERATION_NOT_STARTED = -1; // Sentinel to note we haven't begun enumerating yet.

    private readonly int _partitionCount; // The number of partitions.
    private readonly int _partitionIndex; // Our unique partition index.
    private readonly Func<TInputOutput, THashKey> _keySelector; // A key-selector function; null when THashKey is NoKeyMemoizationRequired.
    private readonly HashRepartitionStream<TInputOutput, THashKey, int> _repartitionStream; // Maps an element's hash code to a destination partition.
    private readonly ListChunk<Pair<TInputOutput, THashKey>>[][] _valueExchangeMatrix; // NxN matrix of buffers; [producer][consumer] for inter-task communication.
    private readonly QueryOperatorEnumerator<TInputOutput, TIgnoreKey> _source; // The immediate source of data.
    private CountdownEvent _barrier; // Used to signal and wait for repartitions to complete; not readonly — cleared on dispose.
    private readonly CancellationToken _cancellationToken; // A token for canceling the process.
    private Mutables _mutables; // Mutable fields for this enumerator; lazily allocated on first MoveNext.
//---------------------------------------------------------------------------------------
// Mutable per-enumerator state, kept out of the enclosing class so the (otherwise
// immutable) enumerator fields can remain readonly.
//

private class Mutables
{
    internal int _currentBufferIndex; // Index of the partition whose buffer we're enumerating.
    internal ListChunk<Pair<TInputOutput, THashKey>> _currentBuffer; // The buffer we're currently enumerating.
    internal int _currentIndex; // Current index into the buffer.

    internal Mutables()
    {
        // Start at the sentinel so the first MoveNext knows phase one hasn't run yet.
        _currentBufferIndex = ENUMERATION_NOT_STARTED;
    }
}
51 //---------------------------------------------------------------------------------------
52 // Creates a new repartitioning enumerator.
//     source - the data stream from which to pull elements
//     partitionCount - total number of partitions
//     partitionIndex - this operator's unique partition index
//     keySelector - function used to extract the distribution key from an element
//     repartitionStream - the stream object to use for partition selection
//     barrier - a latch used to signal task completion
//     valueExchangeMatrix - a set of buffers for inter-task communication
//     cancellationToken - a token for canceling the process
internal HashRepartitionEnumerator(
    QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source, int partitionCount, int partitionIndex,
    Func<TInputOutput, THashKey> keySelector, HashRepartitionStream<TInputOutput, THashKey, int> repartitionStream,
    CountdownEvent barrier, ListChunk<Pair<TInputOutput, THashKey>>[][] valueExchangeMatrix, CancellationToken cancellationToken)
{
    Debug.Assert(source != null);
    Debug.Assert(keySelector != null || typeof(THashKey) == typeof(NoKeyMemoizationRequired));
    Debug.Assert(repartitionStream != null);
    Debug.Assert(barrier != null);
    Debug.Assert(valueExchangeMatrix != null);
    Debug.Assert(valueExchangeMatrix.GetLength(0) == partitionCount, "expected square matrix of buffers (NxN)");
    Debug.Assert(partitionCount > 0 && valueExchangeMatrix[0].Length == partitionCount, "expected square matrix of buffers (NxN)");
    Debug.Assert(0 <= partitionIndex && partitionIndex < partitionCount);

    // BUG FIX: _source and _barrier were never assigned; MoveNext and Dispose
    // dereference both, so leaving them null would throw NullReferenceException.
    _source = source;
    _partitionCount = partitionCount;
    _partitionIndex = partitionIndex;
    _keySelector = keySelector;
    _repartitionStream = repartitionStream;
    _barrier = barrier;
    _valueExchangeMatrix = valueExchangeMatrix;
    _cancellationToken = cancellationToken;
}
88 //---------------------------------------------------------------------------------------
89 // Retrieves the next element from this partition. All repartitioning operators across
90 // all partitions cooperate in a barrier-style algorithm. The first time an element is
91 // requested, the repartitioning operator will enter the 1st phase: during this phase, it
92 // scans its entire input and compute the destination partition for each element. During
93 // the 2nd phase, each partition scans the elements found by all other partitions for
94 // it, and yield this to callers. The only synchronization required is the barrier itself
95 // -- all other parts of this algorithm are synchronization-free.
97 // Notes: One rather large penalty that this algorithm incurs is higher memory usage and a
98 // larger time-to-first-element latency, at least compared with our old implementation; this
99 // happens because all input elements must be fetched before we can produce a single output
100 // element. In many cases this isn't too terrible: e.g. a GroupBy requires this to occur
101 // anyway, so having the repartitioning operator do so isn't complicating matters much at all.
//---------------------------------------------------------------------------------------
// Retrieves the next element from this partition. All repartitioning operators across
// all partitions cooperate in a barrier-style algorithm. The first time an element is
// requested, the repartitioning operator enters the 1st phase: during this phase, it
// scans its entire input and computes the destination partition for each element. During
// the 2nd phase, each partition scans the elements found by all other partitions for
// it, and yields them to callers. The only synchronization required is the barrier itself
// -- all other parts of this algorithm are synchronization-free.
//
// Notes: One rather large penalty that this algorithm incurs is higher memory usage and a
// larger time-to-first-element latency, at least compared with our old implementation; this
// happens because all input elements must be fetched before we can produce a single output
// element. In many cases this isn't too terrible: e.g. a GroupBy requires this to occur
// anyway, so having the repartitioning operator do so isn't complicating matters much at all.
//

internal override bool MoveNext(ref Pair<TInputOutput, THashKey> currentElement, ref int currentKey)
{
    if (_partitionCount == 1)
    {
        // If there's only one partition, no need to do any sort of exchanges.
        TIgnoreKey keyUnused = default(TIgnoreKey);
        TInputOutput current = default(TInputOutput);

        // The ordinal key is unused downstream for hash repartitioning; use a
        // recognizable sentinel value.
        currentKey = unchecked((int)0xdeadbeef);

        if (_source.MoveNext(ref current, ref keyUnused))
        {
            currentElement = new Pair<TInputOutput, THashKey>(
                current, _keySelector == null ? default(THashKey) : _keySelector(current));
            return true;
        }

        return false;
    }

    Mutables mutables = _mutables;
    if (mutables == null)
        mutables = _mutables = new Mutables();

    // If we haven't enumerated the source yet, do that now. This is the first phase
    // of a two-phase barrier style operation.
    if (mutables._currentBufferIndex == ENUMERATION_NOT_STARTED)
    {
        EnumerateAndRedistributeElements();
        Debug.Assert(mutables._currentBufferIndex != ENUMERATION_NOT_STARTED);
    }

    // Once we've enumerated our contents, we can then go back and walk the buffers that belong
    // to the current partition. This is phase two. Note that we slyly move on to the first step
    // of phase two before actually waiting for other partitions. That's because we can enumerate
    // the buffer we wrote to above, as already noted.
    while (mutables._currentBufferIndex < _partitionCount)
    {
        // If the queue is non-null and still has elements, yield them.
        if (mutables._currentBuffer != null)
        {
            if (++mutables._currentIndex < mutables._currentBuffer.Count)
            {
                // Return the current element.
                currentElement = mutables._currentBuffer._chunk[mutables._currentIndex];
                return true;
            }
            else
            {
                // If the chunk is empty, advance to the next one (if any).
                mutables._currentIndex = ENUMERATION_NOT_STARTED;
                mutables._currentBuffer = mutables._currentBuffer.Next;
                Debug.Assert(mutables._currentBuffer == null || mutables._currentBuffer.Count > 0);
                continue; // Go back around and invoke this same logic.
            }
        }

        // We're done with the current partition. Slightly different logic depending on whether
        // we're on our own buffer or one that somebody else found for us.
        if (mutables._currentBufferIndex == _partitionIndex)
        {
            // We now need to wait at the barrier, in case some other threads aren't done.
            // Once we wake up, we reset our index and will increment it immediately after.
            _barrier.Wait(_cancellationToken);
            mutables._currentBufferIndex = ENUMERATION_NOT_STARTED;
        }

        // Advance to the next buffer.
        mutables._currentBufferIndex++;
        mutables._currentIndex = ENUMERATION_NOT_STARTED;

        if (mutables._currentBufferIndex == _partitionIndex)
        {
            // Skip our current buffer (since we already enumerated it).
            mutables._currentBufferIndex++;
        }

        // Assuming we're within bounds, retrieve the next buffer object.
        if (mutables._currentBufferIndex < _partitionCount)
        {
            mutables._currentBuffer = _valueExchangeMatrix[mutables._currentBufferIndex][_partitionIndex];
        }
    }

    // We're done. No more buffers to enumerate.
    return false;
}
191 //---------------------------------------------------------------------------------------
192 // Called when this enumerator is first enumerated; it must walk through the source
193 // and redistribute elements to their slot in the exchange matrix.
//---------------------------------------------------------------------------------------
// Called when this enumerator is first enumerated; it must walk through the source
// and redistribute elements to their slot in the exchange matrix.
//

private void EnumerateAndRedistributeElements()
{
    Mutables mutables = _mutables;
    Debug.Assert(mutables != null);

    // One private (unshared) buffer per destination partition; copied into the shared
    // matrix only once, at the end, to minimize shared-memory traffic.
    ListChunk<Pair<TInputOutput, THashKey>>[] privateBuffers = new ListChunk<Pair<TInputOutput, THashKey>>[_partitionCount];

    TInputOutput element = default(TInputOutput);
    TIgnoreKey ignoreKey = default(TIgnoreKey);
    int loopCount = 0; // Used to poll for cancellation only periodically.

    while (_source.MoveNext(ref element, ref ignoreKey))
    {
        if ((loopCount++ & CancellationState.POLL_INTERVAL) == 0)
            CancellationState.ThrowIfCanceled(_cancellationToken);

        // Calculate the element's destination partition index, placing it into the
        // appropriate buffer from which partitions will later enumerate.
        int destinationIndex;
        THashKey elementHashKey = default(THashKey);
        if (_keySelector != null)
        {
            elementHashKey = _keySelector(element);
            destinationIndex = _repartitionStream.GetHashCode(elementHashKey) % _partitionCount;
        }
        else
        {
            Debug.Assert(typeof(THashKey) == typeof(NoKeyMemoizationRequired));
            destinationIndex = _repartitionStream.GetHashCode(element) % _partitionCount;
        }

        Debug.Assert(0 <= destinationIndex && destinationIndex < _partitionCount,
                     "destination partition outside of the legal range of partitions");

        // Get the buffer for the destination partition, lazily allocating if needed. We maintain
        // this list in our own private cache so that we avoid accessing shared memory locations
        // too much. In the original implementation, we'd access the buffer in the matrix ([N,M],
        // where N is the current partition and M is the destination), but some rudimentary
        // performance profiling indicates copying at the end performs better.
        ListChunk<Pair<TInputOutput, THashKey>> buffer = privateBuffers[destinationIndex];
        if (buffer == null)
        {
            const int INITIAL_PRIVATE_BUFFER_SIZE = 128;
            privateBuffers[destinationIndex] = buffer = new ListChunk<Pair<TInputOutput, THashKey>>(INITIAL_PRIVATE_BUFFER_SIZE);
        }

        buffer.Add(new Pair<TInputOutput, THashKey>(element, elementHashKey));
    }

    // Copy the local buffers to the shared space and then signal to other threads that
    // we are done. We can then immediately move on to enumerating the elements we found
    // for the current partition before waiting at the barrier. If we found a lot, we will
    // hopefully never have to physically wait.
    for (int i = 0; i < _partitionCount; i++)
    {
        _valueExchangeMatrix[_partitionIndex][i] = privateBuffers[i];
    }

    // BUG FIX: without signaling, every other partition deadlocks at _barrier.Wait.
    _barrier.Signal();

    // Begin at our own buffer.
    mutables._currentBufferIndex = _partitionIndex;
    mutables._currentBuffer = privateBuffers[_partitionIndex];
    mutables._currentIndex = ENUMERATION_NOT_STARTED;
}
261 protected override void Dispose(bool disposed)
263 if (_barrier != null)
265 // Since this enumerator is being disposed, we will decrement the barrier,
266 // in case other enumerators will wait on the barrier.
267 if (_mutables == null || (_mutables._currentBufferIndex == ENUMERATION_NOT_STARTED))