62ec3095c3aab4c0079db31f744a1b1d53024024
[platform/upstream/dotnet/runtime.git] /
1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4
5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
6 //
7 // NullableDoubleSumAggregationOperator.cs
8 //
9 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
10
11 using System.Collections.Generic;
12 using System.Diagnostics;
13 using System.Threading;
14
15 namespace System.Linq.Parallel
16 {
17     /// <summary>
18     /// An inlined sum aggregation and its enumerator, for nullable doubles.
19     /// </summary>
20     internal sealed class NullableDoubleSumAggregationOperator : InlinedAggregationOperator<double?, double?, double?>
21     {
22         //---------------------------------------------------------------------------------------
23         // Constructs a new instance of a sum associative operator.
24         //
25
26         internal NullableDoubleSumAggregationOperator(IEnumerable<double?> child) : base(child)
27         {
28         }
29
30         //---------------------------------------------------------------------------------------
31         // Executes the entire query tree, and aggregates the intermediate results into the
32         // final result based on the binary operators and final reduction.
33         //
34         // Return Value:
35         //     The single result of aggregation.
36         //
37
38         protected override double? InternalAggregate(ref Exception singularExceptionToThrow)
39         {
40             // Because the final reduction is typically much cheaper than the intermediate
41             // reductions over the individual partitions, and because each parallel partition
42             // will do a lot of work to produce a single output element, we prefer to turn off
43             // pipelining, and process the final reductions serially.
44             using (IEnumerator<double?> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
45             {
46                 // We just reduce the elements in each output partition.
47                 double sum = 0.0;
48                 while (enumerator.MoveNext())
49                 {
50                     sum += enumerator.Current.GetValueOrDefault();
51                 }
52
53                 return sum;
54             }
55         }
56
57         //---------------------------------------------------------------------------------------
58         // Creates an enumerator that is used internally for the final aggregation step.
59         //
60
61         protected override QueryOperatorEnumerator<double?, int> CreateEnumerator<TKey>(
62             int index, int count, QueryOperatorEnumerator<double?, TKey> source, object sharedData, CancellationToken cancellationToken)
63         {
64             return new NullableDoubleSumAggregationOperatorEnumerator<TKey>(source, index, cancellationToken);
65         }
66
67         //---------------------------------------------------------------------------------------
68         // This enumerator type encapsulates the intermediary aggregation over the underlying
69         // (possibly partitioned) data source.
70         //
71
72         private class NullableDoubleSumAggregationOperatorEnumerator<TKey> : InlinedAggregationOperatorEnumerator<double?>
73         {
74             private readonly QueryOperatorEnumerator<double?, TKey> _source; // The source data.
75
76             //---------------------------------------------------------------------------------------
77             // Instantiates a new aggregation operator.
78             //
79
80             internal NullableDoubleSumAggregationOperatorEnumerator(QueryOperatorEnumerator<double?, TKey> source, int partitionIndex,
81                 CancellationToken cancellationToken) :
82                 base(partitionIndex, cancellationToken)
83             {
84                 Debug.Assert(source != null);
85                 _source = source;
86             }
87
88             //---------------------------------------------------------------------------------------
89             // Tallies up the sum of the underlying data source, walking the entire thing the first
90             // time MoveNext is called on this object.
91             //
92
93             protected override bool MoveNextCore(ref double? currentElement)
94             {
95                 double? element = default(double?);
96                 TKey keyUnused = default(TKey);
97
98                 QueryOperatorEnumerator<double?, TKey> source = _source;
99                 if (source.MoveNext(ref element, ref keyUnused))
100                 {
101                     // We just scroll through the enumerator and accumulate the sum.
102                     double tempSum = 0.0;
103                     int i = 0;
104                     do
105                     {
106                         if ((i++ & CancellationState.POLL_INTERVAL) == 0)
107                             CancellationState.ThrowIfCanceled(_cancellationToken);
108
109                         tempSum += element.GetValueOrDefault();
110                     }
111                     while (source.MoveNext(ref element, ref keyUnused));
112
113                     // The sum has been calculated. Now just return.
114                     currentElement = tempSum;
115                     return true;
116                 }
117
118                 return false;
119             }
120
121
122             //---------------------------------------------------------------------------------------
123             // Dispose of resources associated with the underlying enumerator.
124             //
125
126             protected override void Dispose(bool disposing)
127             {
128                 Debug.Assert(_source != null);
129                 _source.Dispose();
130             }
131         }
132     }
133 }