526b92a6ed4426a2cc482a778e967b764f6539ec
[platform/upstream/dotnet/runtime.git] /
1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4
5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
6 //
7 // DoubleAverageAggregationOperator.cs
8 //
9 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
10
11 using System.Collections.Generic;
12 using System.Diagnostics;
13 using System.Threading;
14
15 namespace System.Linq.Parallel
16 {
17     /// <summary>
18     /// An inlined average aggregation operator and its enumerator, for doubles.
19     /// </summary>
20     internal sealed class DoubleAverageAggregationOperator : InlinedAggregationOperator<double, Pair<double, long>, double>
21     {
22         //---------------------------------------------------------------------------------------
23         // Constructs a new instance of an average associative operator.
24         //
25
26         internal DoubleAverageAggregationOperator(IEnumerable<double> child) : base(child)
27         {
28         }
29
30         //---------------------------------------------------------------------------------------
31         // Executes the entire query tree, and aggregates the intermediate results into the
32         // final result based on the binary operators and final reduction.
33         //
34         // Return Value:
35         //     The single result of aggregation.
36         //
37
38         protected override double InternalAggregate(ref Exception singularExceptionToThrow)
39         {
40             // Because the final reduction is typically much cheaper than the intermediate
41             // reductions over the individual partitions, and because each parallel partition
42             // will do a lot of work to produce a single output element, we prefer to turn off
43             // pipelining, and process the final reductions serially.
44             using (IEnumerator<Pair<double, long>> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
45             {
46                 // Throw an error for empty results.
47                 if (!enumerator.MoveNext())
48                 {
49                     singularExceptionToThrow = new InvalidOperationException(SR.NoElements);
50                     return default(double);
51                 }
52
53                 Pair<double, long> result = enumerator.Current;
54
55                 // Simply add together the sums and totals.
56                 while (enumerator.MoveNext())
57                 {
58                     checked
59                     {
60                         result.First += enumerator.Current.First;
61                         result.Second += enumerator.Current.Second;
62                     }
63                 }
64
65                 // And divide the sum by the total to obtain the final result.
66                 return result.First / result.Second;
67             }
68         }
69
70         //---------------------------------------------------------------------------------------
71         // Creates an enumerator that is used internally for the final aggregation step.
72         //
73
74         protected override QueryOperatorEnumerator<Pair<double, long>, int> CreateEnumerator<TKey>(
75             int index, int count, QueryOperatorEnumerator<double, TKey> source, object sharedData,
76             CancellationToken cancellationToken)
77         {
78             return new DoubleAverageAggregationOperatorEnumerator<TKey>(source, index, cancellationToken);
79         }
80
81         //---------------------------------------------------------------------------------------
82         // This enumerator type encapsulates the intermediary aggregation over the underlying
83         // (possibly partitioned) data source.
84         //
85
86         private class DoubleAverageAggregationOperatorEnumerator<TKey> : InlinedAggregationOperatorEnumerator<Pair<double, long>>
87         {
88             private readonly QueryOperatorEnumerator<double, TKey> _source; // The source data.
89
90             //---------------------------------------------------------------------------------------
91             // Instantiates a new aggregation operator.
92             //
93
94             internal DoubleAverageAggregationOperatorEnumerator(QueryOperatorEnumerator<double, TKey> source, int partitionIndex,
95                 CancellationToken cancellationToken) :
96                 base(partitionIndex, cancellationToken)
97             {
98                 Debug.Assert(source != null);
99                 _source = source;
100             }
101
102             //---------------------------------------------------------------------------------------
103             // Tallies up the average of the underlying data source, walking the entire thing the first
104             // time MoveNext is called on this object.
105             //
106
107             protected override bool MoveNextCore(ref Pair<double, long> currentElement)
108             {
109                 // The temporary result contains the running sum and count, respectively.
110                 double sum = 0.0;
111                 long count = 0;
112
113                 QueryOperatorEnumerator<double, TKey> source = _source;
114                 double current = default(double);
115                 TKey keyUnused = default(TKey);
116
117                 if (source.MoveNext(ref current, ref keyUnused))
118                 {
119                     int i = 0;
120                     do
121                     {
122                         if ((i++ & CancellationState.POLL_INTERVAL) == 0)
123                             CancellationState.ThrowIfCanceled(_cancellationToken);
124
125                         checked
126                         {
127                             sum += current;
128                             count++;
129                         }
130                     }
131                     while (source.MoveNext(ref current, ref keyUnused));
132
133                     currentElement = new Pair<double, long>(sum, count);
134
135                     return true;
136                 }
137
138                 return false;
139             }
140
141             //---------------------------------------------------------------------------------------
142             // Dispose of resources associated with the underlying enumerator.
143             //
144
145             protected override void Dispose(bool disposing)
146             {
147                 Debug.Assert(_source != null);
148                 _source.Dispose();
149             }
150         }
151     }
152 }