94b8e788a834dba5cdd7a5ab3f38b0f553a32fe3
[platform/upstream/dotnet/runtime.git] /
1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4
5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
6 //
7 // FloatSumAggregationOperator.cs
8 //
9 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
10
11 using System.Collections.Generic;
12 using System.Diagnostics;
13 using System.Threading;
14
15 namespace System.Linq.Parallel
16 {
17     /// <summary>
18     /// An inlined sum aggregation and its enumerator, for floats.
19     /// </summary>
20     internal sealed class FloatSumAggregationOperator : InlinedAggregationOperator<float, double, float>
21     {
22         //---------------------------------------------------------------------------------------
23         // Constructs a new instance of a sum associative operator.
24         //
25
26         internal FloatSumAggregationOperator(IEnumerable<float> child) : base(child)
27         {
28         }
29
30         //---------------------------------------------------------------------------------------
31         // Executes the entire query tree, and aggregates the intermediate results into the
32         // final result based on the binary operators and final reduction.
33         //
34         // Return Value:
35         //     The single result of aggregation.
36         //
37
38         protected override float InternalAggregate(ref Exception singularExceptionToThrow)
39         {
40             // Because the final reduction is typically much cheaper than the intermediate
41             // reductions over the individual partitions, and because each parallel partition
42             // will do a lot of work to produce a single output element, we prefer to turn off
43             // pipelining, and process the final reductions serially.
44             using (IEnumerator<double> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
45             {
46                 // We just reduce the elements in each output partition.
47                 double sum = 0.0;
48                 while (enumerator.MoveNext())
49                 {
50                     sum += enumerator.Current;
51                 }
52
53                 return (float)sum;
54             }
55         }
56
57         //---------------------------------------------------------------------------------------
58         // Creates an enumerator that is used internally for the final aggregation step.
59         //
60
61         protected override QueryOperatorEnumerator<double, int> CreateEnumerator<TKey>(
62             int index, int count, QueryOperatorEnumerator<float, TKey> source, object sharedData,
63             CancellationToken cancellationToken)
64         {
65             return new FloatSumAggregationOperatorEnumerator<TKey>(source, index, cancellationToken);
66         }
67
68         //---------------------------------------------------------------------------------------
69         // This enumerator type encapsulates the intermediary aggregation over the underlying
70         // (possibly partitioned) data source.
71         //
72
73         private class FloatSumAggregationOperatorEnumerator<TKey> : InlinedAggregationOperatorEnumerator<double>
74         {
75             private readonly QueryOperatorEnumerator<float, TKey> _source; // The source data.
76
77             //---------------------------------------------------------------------------------------
78             // Instantiates a new aggregation operator.
79             //
80
81             internal FloatSumAggregationOperatorEnumerator(QueryOperatorEnumerator<float, TKey> source, int partitionIndex,
82                 CancellationToken cancellationToken) :
83                 base(partitionIndex, cancellationToken)
84             {
85                 Debug.Assert(source != null);
86                 _source = source;
87             }
88
89             //---------------------------------------------------------------------------------------
90             // Tallies up the sum of the underlying data source, walking the entire thing the first
91             // time MoveNext is called on this object.
92             //
93
94             protected override bool MoveNextCore(ref double currentElement)
95             {
96                 float element = default(float);
97                 TKey keyUnused = default(TKey);
98
99                 QueryOperatorEnumerator<float, TKey> source = _source;
100                 if (source.MoveNext(ref element, ref keyUnused))
101                 {
102                     // We just scroll through the enumerator and accumulate the sum.
103                     double tempSum = 0.0f;
104                     int i = 0;
105                     do
106                     {
107                         if ((i++ & CancellationState.POLL_INTERVAL) == 0)
108                             CancellationState.ThrowIfCanceled(_cancellationToken);
109
110                         tempSum += element;
111                     }
112                     while (source.MoveNext(ref element, ref keyUnused));
113
114                     // The sum has been calculated. Now just return.
115                     currentElement = tempSum;
116                     return true;
117                 }
118
119                 return false;
120             }
121
122             //---------------------------------------------------------------------------------------
123             // Dispose of resources associated with the underlying enumerator.
124             //
125
126             protected override void Dispose(bool disposing)
127             {
128                 Debug.Assert(_source != null);
129                 _source.Dispose();
130             }
131         }
132     }
133 }