caf2d4ae26da206164a092c97acf02979ebd68a5
[platform/upstream/dotnet/runtime.git] /
1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4
5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
6 //
7 // DoubleSumAggregationOperator.cs
8 //
9 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
10
11 using System.Collections.Generic;
12 using System.Diagnostics;
13 using System.Threading;
14
15 namespace System.Linq.Parallel
16 {
17     /// <summary>
18     /// An inlined sum aggregation and its enumerator, for doubles.
19     /// </summary>
20     internal sealed class DoubleSumAggregationOperator : InlinedAggregationOperator<double, double, double>
21     {
22         //---------------------------------------------------------------------------------------
23         // Constructs a new instance of a sum associative operator.
24         //
25
26         internal DoubleSumAggregationOperator(IEnumerable<double> child) : base(child)
27         {
28         }
29
30         //---------------------------------------------------------------------------------------
31         // Executes the entire query tree, and aggregates the intermediate results into the
32         // final result based on the binary operators and final reduction.
33         //
34         // Return Value:
35         //     The single result of aggregation.
36         //
37
38         protected override double InternalAggregate(ref Exception singularExceptionToThrow)
39         {
40             // Because the final reduction is typically much cheaper than the intermediate
41             // reductions over the individual partitions, and because each parallel partition
42             // will do a lot of work to produce a single output element, we prefer to turn off
43             // pipelining, and process the final reductions serially.
44             using (IEnumerator<double> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
45             {
46                 // We just reduce the elements in each output partition.
47                 double sum = 0.0;
48                 while (enumerator.MoveNext())
49                 {
50                     sum += enumerator.Current;
51                 }
52
53                 return sum;
54             }
55         }
56
57         //---------------------------------------------------------------------------------------
58         // Creates an enumerator that is used internally for the final aggregation step.
59         //
60
61         protected override QueryOperatorEnumerator<double, int> CreateEnumerator<TKey>(
62             int index, int count, QueryOperatorEnumerator<double, TKey> source, object sharedData, CancellationToken cancellationToken)
63         {
64             return new DoubleSumAggregationOperatorEnumerator<TKey>(source, index, cancellationToken);
65         }
66
67         //---------------------------------------------------------------------------------------
68         // This enumerator type encapsulates the intermediary aggregation over the underlying
69         // (possibly partitioned) data source.
70         //
71
72         private class DoubleSumAggregationOperatorEnumerator<TKey> : InlinedAggregationOperatorEnumerator<double>
73         {
74             private readonly QueryOperatorEnumerator<double, TKey> _source; // The source data.
75
76             //---------------------------------------------------------------------------------------
77             // Instantiates a new aggregation operator.
78             //
79
80             internal DoubleSumAggregationOperatorEnumerator(QueryOperatorEnumerator<double, TKey> source, int partitionIndex,
81                 CancellationToken cancellationToken) :
82                 base(partitionIndex, cancellationToken)
83             {
84                 Debug.Assert(source != null);
85                 _source = source;
86             }
87
88             //---------------------------------------------------------------------------------------
89             // Tallies up the sum of the underlying data source, walking the entire thing the first
90             // time MoveNext is called on this object.
91             //
92
93             protected override bool MoveNextCore(ref double currentElement)
94             {
95                 double element = default(double);
96                 TKey keyUnused = default(TKey);
97
98                 QueryOperatorEnumerator<double, TKey> source = _source;
99                 if (source.MoveNext(ref element, ref keyUnused))
100                 {
101                     // We just scroll through the enumerator and accumulate the sum.
102                     double tempSum = 0.0;
103                     int i = 0;
104                     do
105                     {
106                         if ((i++ & CancellationState.POLL_INTERVAL) == 0)
107                             CancellationState.ThrowIfCanceled(_cancellationToken);
108                         tempSum += element;
109                     }
110                     while (source.MoveNext(ref element, ref keyUnused));
111
112                     // The sum has been calculated. Now just return.
113                     currentElement = tempSum;
114                     return true;
115                 }
116
117                 return false;
118             }
119
120             //---------------------------------------------------------------------------------------
121             // Dispose of resources associated with the underlying enumerator.
122             //
123
124             protected override void Dispose(bool disposing)
125             {
126                 Debug.Assert(_source != null);
127                 _source.Dispose();
128             }
129         }
130     }
131 }