1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 // ExceptQueryOperator.cs
9 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
11 using System.Collections.Generic;
12 using System.Diagnostics;
13 using System.Threading;
15 namespace System.Linq.Parallel
18 /// Operator that yields the elements from the first data source that aren't in the second.
19 /// This is known as the set relative complement, i.e. left - right.
21 /// <typeparam name="TInputOutput">The element type of both input sources and of the output.</typeparam>
22 internal sealed class ExceptQueryOperator<TInputOutput> :
23 BinaryQueryOperator<TInputOutput, TInputOutput, TInputOutput>
25 private readonly IEqualityComparer<TInputOutput> _comparer; // An equality comparer.
27 //---------------------------------------------------------------------------------------
28 // Constructs a new set except operator.
// left:     the source whose elements are yielded (those not present in right).
// right:    the source whose elements are excluded from the result.
// comparer: the equality comparer used both to hash-repartition the inputs and to match
//           left elements against right elements.
// NOTE(review): no assignment of `comparer` to the `_comparer` field is visible in this
// listing; confirm the constructor stores it, since every other member reads `_comparer`.
31 internal ExceptQueryOperator(ParallelQuery<TInputOutput> left, ParallelQuery<TInputOutput> right, IEqualityComparer<TInputOutput> comparer)
34 Debug.Assert(left != null && right != null, "child data sources cannot be null");
// The output is ordered exactly when the left (yielded) source is ordered.
36 _outputOrdered = LeftChild.OutputOrdered;
// The output is hash-partitioned, so the original ordinal positions are not preserved.
37 SetOrdinalIndex(OrdinalIndexState.Shuffled);
// Opens both child operators and combines their results into a single binary result set.
// The caller's preferStriping is deliberately ignored (see the comment in the body).
40 internal override QueryResults<TInputOutput> Open(
41 QuerySettings settings, bool preferStriping)
43 // We just open our child operators, left and then right. Do not propagate the preferStriping value, but
44 // instead explicitly set it to false. Regardless of whether the parent prefers striping or range
45 // partitioning, the output will be hash-partitioned.
46 QueryResults<TInputOutput> leftChildResults = LeftChild.Open(settings, false);
47 QueryResults<TInputOutput> rightChildResults = RightChild.Open(settings, false);
// NOTE(review): the trailing `false` presumably is the preferStriping flag for this
// operator's own results, passed for the same reason as above; confirm against
// BinaryQueryOperatorResults.
49 return new BinaryQueryOperatorResults(leftChildResults, rightChildResults, this, settings, false);
// Builds this operator's partitioned output from the two partitioned inputs.
// Both inputs are hash-repartitioned with the same `_comparer`. Equal elements from the
// left and right sources therefore land in the same partition index, and output
// partition i can be computed from left partition i and right partition i alone.
// preferStriping is not referenced here: the output is always hash-partitioned.
52 public override void WrapPartitionedStream<TLeftKey, TRightKey>(
53 PartitionedStream<TInputOutput, TLeftKey> leftStream, PartitionedStream<TInputOutput, TRightKey> rightStream,
54 IPartitionedStreamRecipient<TInputOutput> outputRecipient, bool preferStriping, QuerySettings settings)
56 Debug.Assert(leftStream.PartitionCount == rightStream.PartitionCount);
// NOTE(review): the condition that selects between the two calls below is not visible in
// this listing. Presumably it tests whether the output must be ordered (the constructor
// sets `_outputOrdered` from the left child); confirm.
// Ordered case: HashRepartitionOrdered keeps the left source's TLeftKey order keys, so
// the helper can attach them to the output elements.
60 WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
61 ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
62 leftStream, null, null, _comparer, settings.CancellationState.MergedCancellationToken),
63 rightStream, outputRecipient, settings.CancellationState.MergedCancellationToken);
// Unordered case: plain HashRepartition yields int-keyed partitions, so the helper is
// instantiated with TLeftKey = int.
67 WrapPartitionedStreamHelper<int, TRightKey>(
68 ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
69 leftStream, null, null, _comparer, settings.CancellationState.MergedCancellationToken),
70 rightStream, outputRecipient, settings.CancellationState.MergedCancellationToken);
74 //---------------------------------------------------------------------------------------
75 // This is a helper method. WrapPartitionedStream decides what type TLeftKey is going
76 // to be, and then calls this method with that key as a generic parameter.
// It hash-repartitions the right stream with `_comparer`. It then creates one except
// enumerator per output partition, pairing partition i of the left hash stream with
// partition i of the right hash stream. The output stream reuses the left stream's key
// comparer and is marked Shuffled. Finally the stream is handed to outputRecipient.
79 private void WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
80 PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream, PartitionedStream<TInputOutput, TRightKey> rightPartitionedStream,
81 IPartitionedStreamRecipient<TInputOutput> outputRecipient, CancellationToken cancellationToken)
83 int partitionCount = leftHashStream.PartitionCount;
// Only membership of right-side elements matters (the enumerators ignore the right keys),
// so the right side is repartitioned into plain int-keyed partitions.
85 PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightHashStream =
86 ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
87 rightPartitionedStream, null, null, _comparer, cancellationToken);
89 PartitionedStream<TInputOutput, TLeftKey> outputStream =
90 new PartitionedStream<TInputOutput, TLeftKey>(partitionCount, leftHashStream.KeyComparer, OrdinalIndexState.Shuffled);
// NOTE(review): the per-partition choice between the ordered and unordered enumerator
// depends on a condition not visible in this listing (presumably the ordered-output flag).
92 for (int i = 0; i < partitionCount; i++)
// Ordered: keeps each surviving element's smallest left order key.
96 outputStream[i] = new OrderedExceptQueryOperatorEnumerator<TLeftKey>(
97 leftHashStream[i], rightHashStream[i], _comparer, leftHashStream.KeyComparer, cancellationToken);
// Unordered: ExceptQueryOperatorEnumerator always produces int keys. The cast through
// object is only valid because WrapPartitionedStream's unordered path instantiates this
// helper with TLeftKey = int.
101 outputStream[i] = (QueryOperatorEnumerator<TInputOutput, TLeftKey>)(object)
102 new ExceptQueryOperatorEnumerator<TLeftKey>(leftHashStream[i], rightHashStream[i], _comparer, cancellationToken);
106 outputRecipient.Receive(outputStream);
110 //---------------------------------------------------------------------------------------
111 // Returns an enumerable that represents the query executing sequentially.
// Each child's sequential query is wrapped with CancellableEnumerable.Wrap using `token`
// (presumably so enumeration stops once the token is canceled). The set difference is
// then computed by LINQ to Objects' Enumerable.Except, using the same `_comparer` as the
// parallel path.
114 internal override IEnumerable<TInputOutput> AsSequentialQuery(CancellationToken token)
116 IEnumerable<TInputOutput> wrappedLeftChild = CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token);
117 IEnumerable<TInputOutput> wrappedRightChild = CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token);
118 return wrappedLeftChild.Except(wrappedRightChild, _comparer);
121 //---------------------------------------------------------------------------------------
122 // Whether this operator performs a premature merge that would not be performed in
123 // a similar sequential operation (i.e., in LINQ to Objects).
// Always false for Except: its inputs are hash-repartitioned, not merged.
126 internal override bool LimitsParallelism
128 get { return false; }
131 //---------------------------------------------------------------------------------------
132 // This enumerator computes the set difference (left - right) for one hash partition, for
133 // unordered output. On the first MoveNext it loads every right element into a set. It then
134 // yields each left element the set does not already contain, adding it to suppress duplicates.
// Left elements are streamed: each MoveNext call resumes pulling from the left source.
// Output keys are a constant placeholder, because unordered output carries no order.
137 private class ExceptQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, int>
139 private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> _leftSource; // Left data source.
140 private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> _rightSource; // Right data source.
141 private readonly IEqualityComparer<TInputOutput> _comparer; // A comparer used for equality checks/hash-coding.
142 private Set<TInputOutput> _hashLookup; // Right elements plus left elements already yielded; null until the first MoveNext.
143 private readonly CancellationToken _cancellationToken;
// Iteration counter for cancellation polling over the left source. It is a field so that
// the count carries over between MoveNext calls; it is created on the first MoveNext.
144 private Shared<int> _outputLoopCount;
146 //---------------------------------------------------------------------------------------
147 // Instantiates a new except query operator enumerator.
// leftSource/rightSource: the same-index partitions of the left and right hash streams.
// comparer: element equality used by the lookup set.
150 internal ExceptQueryOperatorEnumerator(
151 QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
152 QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
153 IEqualityComparer<TInputOutput> comparer,
154 CancellationToken cancellationToken)
156 Debug.Assert(leftSource != null);
157 Debug.Assert(rightSource != null);
159 _leftSource = leftSource;
160 _rightSource = rightSource;
161 _comparer = comparer;
162 _cancellationToken = cancellationToken;
165 //---------------------------------------------------------------------------------------
166 // Walks the two data sources, right and then left, to produce the set difference (left - right)
// On output, currentElement is the next surviving left element and currentKey is a
// constant placeholder.
// NOTE(review): the return statements are not visible in this listing. Presumably the
// method returns true after producing an element and false once the left partition is
// exhausted; confirm.
169 internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey)
171 Debug.Assert(_leftSource != null);
172 Debug.Assert(_rightSource != null);
174 // Build the set out of the right data source, if we haven't already.
176 if (_hashLookup == null)
178 _outputLoopCount = new Shared<int>(0);
180 _hashLookup = new Set<TInputOutput>(_comparer);
182 Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
183 int rightKeyUnused = default(int);
// NOTE(review): the declaration of the polling counter `i` is not visible in this
// listing; presumably a local int initialized to 0.
186 while (_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
188 if ((i++ & CancellationState.POLL_INTERVAL) == 0)
189 CancellationState.ThrowIfCanceled(_cancellationToken);
191 _hashLookup.Add(rightElement.First);
195 // Now iterate over the left data source, yielding elements that have no match in the set.
196 Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
197 TLeftKey leftKeyUnused = default(TLeftKey);
199 while (_leftSource.MoveNext(ref leftElement, ref leftKeyUnused))
201 if ((_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
202 CancellationState.ThrowIfCanceled(_cancellationToken);
// Add succeeds only if the element is neither in the right source nor already yielded.
// Adding it also ensures later duplicates from the left are skipped.
204 if (_hashLookup.Add(leftElement.First))
206 // This element is not in the right source and has not been yielded yet. Return it.
207 currentElement = leftElement.First;
// Unordered output: the key is a fixed dummy value.
209 currentKey = unchecked((int)0xdeadbeef);
// Disposes both child partition enumerators.
218 protected override void Dispose(bool disposing)
220 Debug.Assert(_leftSource != null && _rightSource != null);
221 _leftSource.Dispose();
222 _rightSource.Dispose();
226 private class OrderedExceptQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, TLeftKey>
228 private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> _leftSource; // Left data source.
229 private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> _rightSource; // Right data source.
230 private readonly IEqualityComparer<TInputOutput> _comparer; // A comparer used for equality checks/hash-coding.
231 private readonly IComparer<TLeftKey> _leftKeyComparer; // A comparer for order keys.
232 private IEnumerator<KeyValuePair<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>>> _outputEnumerator; // The enumerator output elements + order keys.
233 private readonly CancellationToken _cancellationToken;
235 //---------------------------------------------------------------------------------------
236 // Instantiates a new ordered except query operator enumerator.
// leftSource/rightSource: the same-index partitions of the left and right hash streams.
// comparer: element equality used for the right-side set and the left-side lookup.
// leftKeyComparer: orders left keys; used to keep the smallest key for each surviving element.
239 internal OrderedExceptQueryOperatorEnumerator(
240 QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
241 QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
242 IEqualityComparer<TInputOutput> comparer, IComparer<TLeftKey> leftKeyComparer,
243 CancellationToken cancellationToken)
245 Debug.Assert(leftSource != null);
246 Debug.Assert(rightSource != null);
248 _leftSource = leftSource;
249 _rightSource = rightSource;
250 _comparer = comparer;
251 _leftKeyComparer = leftKeyComparer;
252 _cancellationToken = cancellationToken;
255 //---------------------------------------------------------------------------------------
256 // Walks the two data sources, right and then left, to produce the set difference (left - right)
// while preserving left order keys.
// The first call does all the work. It loads the right partition into a set. It then
// scans the whole left partition and keeps, for each distinct element (per `_comparer`)
// absent from the right, the occurrence with the smallest left key. Subsequent calls
// only drain that lookup.
// NOTE(review): the return statements are not visible in this listing. Presumably the
// method returns true while the lookup has entries and false afterwards; confirm.
259 internal override bool MoveNext(ref TInputOutput currentElement, ref TLeftKey currentKey)
261 Debug.Assert(_leftSource != null);
262 Debug.Assert(_rightSource != null);
264 // On the first call, build the set out of the right data source, then the left lookup.
265 if (_outputEnumerator == null)
267 Set<TInputOutput> rightLookup = new Set<TInputOutput>(_comparer);
269 Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
270 int rightKeyUnused = default(int);
// NOTE(review): the declaration of the polling counter `i` (used in both loops below) is
// not visible in this listing; presumably a local int initialized to 0.
272 while (_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
274 if ((i++ & CancellationState.POLL_INTERVAL) == 0)
275 CancellationState.ThrowIfCanceled(_cancellationToken);
277 rightLookup.Add(rightElement.First);
// Left lookup: keyed by Wrapper<TInputOutput> under WrapperEqualityComparer(_comparer), so
// left elements are deduplicated by `_comparer`. Each value holds the element and its key.
// NOTE(review): the declaration receiving this dictionary (referenced below as
// `leftLookup`) is not visible in this listing.
281 new Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>>(
282 new WrapperEqualityComparer<TInputOutput>(_comparer));
284 Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
285 TLeftKey leftKey = default(TLeftKey);
286 while (_leftSource.MoveNext(ref leftElement, ref leftKey))
288 if ((i++ & CancellationState.POLL_INTERVAL) == 0)
289 CancellationState.ThrowIfCanceled(_cancellationToken);
// Left elements present in the right source are excluded.
// NOTE(review): the statement governed by this check is not visible; presumably `continue`.
291 if (rightLookup.Contains(leftElement.First))
296 Pair<TInputOutput, TLeftKey> oldEntry;
297 Wrapper<TInputOutput> wrappedLeftElement = new Wrapper<TInputOutput>(leftElement.First);
298 if (!leftLookup.TryGetValue(wrappedLeftElement, out oldEntry) || _leftKeyComparer.Compare(leftKey, oldEntry.Second) < 0)
300 // For each "elem" value, we store the smallest key, and the element value that had that key.
301 // Note that even though two element values are "equal" according to the EqualityComparer,
302 // we still cannot choose arbitrarily which of the two to yield.
303 leftLookup[wrappedLeftElement] = new Pair<TInputOutput, TLeftKey>(leftElement.First, leftKey);
307 _outputEnumerator = leftLookup.GetEnumerator();
// Drain the lookup. Dictionary enumeration order is unspecified, so elements leave this
// partition in arbitrary order. Each carries its smallest left key, which presumably lets
// a downstream ordered merge restore the original order.
310 if (_outputEnumerator.MoveNext())
312 Pair<TInputOutput, TLeftKey> currentPair = _outputEnumerator.Current.Value;
313 currentElement = currentPair.First;
314 currentKey = currentPair.Second;
// Disposes both child partition enumerators.
// NOTE(review): `_outputEnumerator` is not disposed here. That is presumably harmless for
// a Dictionary enumerator; revisit if the lookup type ever changes.
321 protected override void Dispose(bool disposing)
323 Debug.Assert(_leftSource != null && _rightSource != null);
324 _leftSource.Dispose();
325 _rightSource.Dispose();