8e191f77146efe7a4620d0a94a75776bb15a6f1f
[platform/upstream/dotnet/runtime.git] /
1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4
5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
6 //
7 // ExceptQueryOperator.cs
8 //
9 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
10
11 using System.Collections.Generic;
12 using System.Diagnostics;
13 using System.Threading;
14
namespace System.Linq.Parallel
{
    /// <summary>
    /// Operator that yields the elements from the first data source that aren't in the second.
    /// This is known as the set relative complement, i.e. left - right.
    /// </summary>
    /// <typeparam name="TInputOutput">The element type of both input sources and of the output.</typeparam>
    internal sealed class ExceptQueryOperator<TInputOutput> :
        BinaryQueryOperator<TInputOutput, TInputOutput, TInputOutput>
    {
        private readonly IEqualityComparer<TInputOutput> _comparer; // Comparer handed to the hash repartitioning and to the lookup sets.

        //---------------------------------------------------------------------------------------
        // Constructs a new set except operator.
        //
        // Arguments:
        //     left      - the source whose elements may be yielded
        //     right     - the source whose elements are excluded from the output
        //     comparer  - equality comparer, stored as given and passed on to the helpers below
        //

        internal ExceptQueryOperator(ParallelQuery<TInputOutput> left, ParallelQuery<TInputOutput> right, IEqualityComparer<TInputOutput> comparer)
            : base(left, right)
        {
            Debug.Assert(left != null && right != null, "child data sources cannot be null");
            _comparer = comparer;

            // The output is reported as ordered exactly when the left child is; the ordinal
            // index state of the output is always declared as Shuffled.
            _outputOrdered = LeftChild.OutputOrdered;
            SetOrdinalIndex(OrdinalIndexState.Shuffled);
        }

        //---------------------------------------------------------------------------------------
        // Opens both children (left first, then right) and wraps their results.
        //

        internal override QueryResults<TInputOutput> Open(
            QuerySettings settings, bool preferStriping)
        {
            // We just open our child operators, left and then right.  Do not propagate the preferStriping value, but
            // instead explicitly set it to false. Regardless of whether the parent prefers striping or range
            // partitioning, the output will be hash-partitioned.
            QueryResults<TInputOutput> leftChildResults = LeftChild.Open(settings, false);
            QueryResults<TInputOutput> rightChildResults = RightChild.Open(settings, false);

            return new BinaryQueryOperatorResults(leftChildResults, rightChildResults, this, settings, false);
        }

        //---------------------------------------------------------------------------------------
        // Hash-repartitions the left stream and hands it, together with the (not yet
        // repartitioned) right stream, to WrapPartitionedStreamHelper.
        //
        // When the output is ordered, the ordered repartitioning is used and the left key type
        // is kept as TLeftKey. Otherwise the plain repartitioning is used and the helper is
        // instantiated with int as the left key type.
        //

        public override void WrapPartitionedStream<TLeftKey, TRightKey>(
            PartitionedStream<TInputOutput, TLeftKey> leftStream, PartitionedStream<TInputOutput, TRightKey> rightStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient, bool preferStriping, QuerySettings settings)
        {
            Debug.Assert(leftStream.PartitionCount == rightStream.PartitionCount);

            if (OutputOrdered)
            {
                WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
                    ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                        leftStream, null, null, _comparer, settings.CancellationState.MergedCancellationToken),
                    rightStream, outputRecipient, settings.CancellationState.MergedCancellationToken);
            }
            else
            {
                WrapPartitionedStreamHelper<int, TRightKey>(
                    ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                        leftStream, null, null, _comparer, settings.CancellationState.MergedCancellationToken),
                    rightStream, outputRecipient, settings.CancellationState.MergedCancellationToken);
            }
        }

        //---------------------------------------------------------------------------------------
        // This is a helper method. WrapPartitionedStream decides what type TLeftKey is going
        // to be, and then calls this method with that key as a generic parameter.
        //
        // The right stream is hash-repartitioned here, and one enumerator per partition is
        // created over the matching pair of left/right partitions. The resulting stream is
        // then passed to the recipient.
        //

        private void WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream, PartitionedStream<TInputOutput, TRightKey> rightPartitionedStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient, CancellationToken cancellationToken)
        {
            int partitionCount = leftHashStream.PartitionCount;

            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightHashStream =
                ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
                    rightPartitionedStream, null, null, _comparer, cancellationToken);

            PartitionedStream<TInputOutput, TLeftKey> outputStream =
                new PartitionedStream<TInputOutput, TLeftKey>(partitionCount, leftHashStream.KeyComparer, OrdinalIndexState.Shuffled);

            for (int i = 0; i < partitionCount; i++)
            {
                if (OutputOrdered)
                {
                    outputStream[i] = new OrderedExceptQueryOperatorEnumerator<TLeftKey>(
                        leftHashStream[i], rightHashStream[i], _comparer, leftHashStream.KeyComparer, cancellationToken);
                }
                else
                {
                    // The unordered enumerator always exposes int keys, so its static type does not
                    // match QueryOperatorEnumerator<TInputOutput, TLeftKey>. The cast goes through
                    // object and succeeds at run time because WrapPartitionedStream instantiates
                    // this helper with TLeftKey == int on the unordered path.
                    outputStream[i] = (QueryOperatorEnumerator<TInputOutput, TLeftKey>)(object)
                        new ExceptQueryOperatorEnumerator<TLeftKey>(leftHashStream[i], rightHashStream[i], _comparer, cancellationToken);
                }
            }

            outputRecipient.Receive(outputStream);
        }


        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
        // Both children are wrapped so that they observe the cancellation token, and the
        // sequential Except is applied with the same comparer.
        //

        internal override IEnumerable<TInputOutput> AsSequentialQuery(CancellationToken token)
        {
            IEnumerable<TInputOutput> wrappedLeftChild = CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token);
            IEnumerable<TInputOutput> wrappedRightChild = CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token);
            return wrappedLeftChild.Except(wrappedRightChild, _comparer);
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge that would not be performed in
        // a similar sequential operation (i.e., in LINQ to Objects).
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }

        //---------------------------------------------------------------------------------------
        // This enumerator calculates the set difference (left - right) for one partition when
        // the output is unordered. On the first MoveNext it loads every element of the right
        // source into a set. It then walks the left source and returns only those elements
        // that can still be added to that set, i.e. elements that are neither in the right
        // source nor already returned from the left source.
        //

        private sealed class ExceptQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, int>
        {
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> _leftSource; // Left data source.
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> _rightSource; // Right data source.
            private readonly IEqualityComparer<TInputOutput> _comparer; // A comparer used for equality checks/hash-coding.
            private Set<TInputOutput> _hashLookup; // Right elements plus left elements already returned; null until the first MoveNext.
            private readonly CancellationToken _cancellationToken;
            private Shared<int> _outputLoopCount; // Number of left elements examined so far; drives cancellation polling.

            //---------------------------------------------------------------------------------------
            // Instantiates a new except query operator enumerator.
            //

            internal ExceptQueryOperatorEnumerator(
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
                IEqualityComparer<TInputOutput> comparer,
                CancellationToken cancellationToken)
            {
                Debug.Assert(leftSource != null);
                Debug.Assert(rightSource != null);

                _leftSource = leftSource;
                _rightSource = rightSource;
                _comparer = comparer;
                _cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Walks the two data sources, right and then left, to produce the set difference.
            //
            // Returns true and sets currentElement when another left element qualifies; returns
            // false once the left source is exhausted. currentKey is only written in DEBUG
            // builds, where it receives a placeholder value.
            //

            internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey)
            {
                Debug.Assert(_leftSource != null);
                Debug.Assert(_rightSource != null);

                // Build the set out of the right data source, if we haven't already.

                if (_hashLookup == null)
                {
                    _outputLoopCount = new Shared<int>(0);

                    _hashLookup = new Set<TInputOutput>(_comparer);

                    Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                    int rightKeyUnused = default(int);

                    int i = 0;
                    while (_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
                    {
                        // Poll for cancellation periodically rather than on every element.
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(_cancellationToken);
                        }

                        _hashLookup.Add(rightElement.First);
                    }
                }

                // Now iterate over the left data source, looking for elements that are not yet in the set.
                Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                TLeftKey leftKeyUnused = default(TLeftKey);

                while (_leftSource.MoveNext(ref leftElement, ref leftKeyUnused))
                {
                    if ((_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
                    {
                        CancellationState.ThrowIfCanceled(_cancellationToken);
                    }

                    // Adding to the same set both tests membership and records the element, so a
                    // repeated left element is returned at most once.
                    if (_hashLookup.Add(leftElement.First))
                    {
                        // This element has never been seen. Return it.
                        currentElement = leftElement.First;
#if DEBUG
                        currentKey = unchecked((int)0xdeadbeef);
#endif
                        return true;
                    }
                }

                return false;
            }

            //---------------------------------------------------------------------------------------
            // Disposes both underlying sources.
            //

            protected override void Dispose(bool disposing)
            {
                Debug.Assert(_leftSource != null && _rightSource != null);
                _leftSource.Dispose();
                _rightSource.Dispose();
            }
        }

        //---------------------------------------------------------------------------------------
        // This enumerator calculates the set difference (left - right) for one partition when
        // the output is ordered. On the first MoveNext it consumes both sources completely:
        // the right source into a set, and the qualifying left elements into a dictionary that
        // keeps, per distinct element, the occurrence with the smallest order key. It then
        // returns the dictionary entries together with their order keys.
        //

        private sealed class OrderedExceptQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, TLeftKey>
        {
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> _leftSource; // Left data source.
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> _rightSource; // Right data source.
            private readonly IEqualityComparer<TInputOutput> _comparer; // A comparer used for equality checks/hash-coding.
            private readonly IComparer<TLeftKey> _leftKeyComparer; // A comparer for order keys.
            private IEnumerator<KeyValuePair<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>>> _outputEnumerator; // The enumerator output elements + order keys.
            private readonly CancellationToken _cancellationToken;

            //---------------------------------------------------------------------------------------
            // Instantiates a new except query operator enumerator.
            //

            internal OrderedExceptQueryOperatorEnumerator(
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
                IEqualityComparer<TInputOutput> comparer, IComparer<TLeftKey> leftKeyComparer,
                CancellationToken cancellationToken)
            {
                Debug.Assert(leftSource != null);
                Debug.Assert(rightSource != null);

                _leftSource = leftSource;
                _rightSource = rightSource;
                _comparer = comparer;
                _leftKeyComparer = leftKeyComparer;
                _cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Walks the two data sources, right and then left, to produce the set difference.
            //
            // Returns true and sets currentElement/currentKey for each surviving left element;
            // returns false once all of them have been returned.
            //

            internal override bool MoveNext(ref TInputOutput currentElement, ref TLeftKey currentKey)
            {
                Debug.Assert(_leftSource != null);
                Debug.Assert(_rightSource != null);

                // Build the output out of both data sources, if we haven't already.
                if (_outputEnumerator == null)
                {
                    // First, collect every element of the right data source.
                    Set<TInputOutput> rightLookup = new Set<TInputOutput>(_comparer);

                    Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                    int rightKeyUnused = default(int);
                    int i = 0; // Shared by both loops below for cancellation polling.
                    while (_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(_cancellationToken);
                        }

                        rightLookup.Add(rightElement.First);
                    }

                    // Then, collect the left elements that are not in the right data source.
                    var leftLookup =
                        new Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>>(
                            new WrapperEqualityComparer<TInputOutput>(_comparer));

                    Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                    TLeftKey leftKey = default(TLeftKey);
                    while (_leftSource.MoveNext(ref leftElement, ref leftKey))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(_cancellationToken);
                        }

                        if (rightLookup.Contains(leftElement.First))
                        {
                            // Present on the right: excluded from the output.
                            continue;
                        }

                        Pair<TInputOutput, TLeftKey> oldEntry;
                        Wrapper<TInputOutput> wrappedLeftElement = new Wrapper<TInputOutput>(leftElement.First);
                        if (!leftLookup.TryGetValue(wrappedLeftElement, out oldEntry) || _leftKeyComparer.Compare(leftKey, oldEntry.Second) < 0)
                        {
                            // For each "elem" value, we store the smallest key, and the element value that had that key.
                            // Note that even though two element values are "equal" according to the EqualityComparer,
                            // we still cannot choose arbitrarily which of the two to yield.
                            leftLookup[wrappedLeftElement] = new Pair<TInputOutput, TLeftKey>(leftElement.First, leftKey);
                        }
                    }

                    _outputEnumerator = leftLookup.GetEnumerator();
                }

                if (_outputEnumerator.MoveNext())
                {
                    Pair<TInputOutput, TLeftKey> currentPair = _outputEnumerator.Current.Value;
                    currentElement = currentPair.First;
                    currentKey = currentPair.Second;
                    return true;
                }

                return false;
            }

            //---------------------------------------------------------------------------------------
            // Disposes both underlying sources.
            //

            protected override void Dispose(bool disposing)
            {
                Debug.Assert(_leftSource != null && _rightSource != null);
                _leftSource.Dispose();
                _rightSource.Dispose();
            }
        }
    }
}