ebc37e793667ba0510302eb2d24458c944b4d1b1
[profile/ivi/qtbase.git] / src / corelib / concurrent / qtconcurrentreducekernel.h
1 /****************************************************************************
2 **
3 ** Copyright (C) 2011 Nokia Corporation and/or its subsidiary(-ies).
4 ** All rights reserved.
5 ** Contact: Nokia Corporation (qt-info@nokia.com)
6 **
7 ** This file is part of the QtCore module of the Qt Toolkit.
8 **
9 ** $QT_BEGIN_LICENSE:LGPL$
10 ** GNU Lesser General Public License Usage
11 ** This file may be used under the terms of the GNU Lesser General Public
12 ** License version 2.1 as published by the Free Software Foundation and
13 ** appearing in the file LICENSE.LGPL included in the packaging of this
14 ** file. Please review the following information to ensure the GNU Lesser
15 ** General Public License version 2.1 requirements will be met:
16 ** http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
17 **
18 ** In addition, as a special exception, Nokia gives you certain additional
19 ** rights. These rights are described in the Nokia Qt LGPL Exception
20 ** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
21 **
22 ** GNU General Public License Usage
23 ** Alternatively, this file may be used under the terms of the GNU General
24 ** Public License version 3.0 as published by the Free Software Foundation
25 ** and appearing in the file LICENSE.GPL included in the packaging of this
26 ** file. Please review the following information to ensure the GNU General
27 ** Public License version 3.0 requirements will be met:
28 ** http://www.gnu.org/copyleft/gpl.html.
29 **
30 ** Other Usage
31 ** Alternatively, this file may be used in accordance with the terms and
32 ** conditions contained in a signed written agreement between you and Nokia.
33 **
34 **
35 **
36 **
37 **
38 ** $QT_END_LICENSE$
39 **
40 ****************************************************************************/
41
42 #ifndef QTCONCURRENT_REDUCEKERNEL_H
43 #define QTCONCURRENT_REDUCEKERNEL_H
44
45 #include <QtCore/qglobal.h>
46
47 #ifndef QT_NO_CONCURRENT
48
49 #include <QtCore/qatomic.h>
50 #include <QtCore/qlist.h>
51 #include <QtCore/qmap.h>
52 #include <QtCore/qmutex.h>
53 #include <QtCore/qthread.h>
54 #include <QtCore/qthreadpool.h>
55 #include <QtCore/qvector.h>
56
57 QT_BEGIN_HEADER
58 QT_BEGIN_NAMESPACE
59
60 QT_MODULE(Core)
61
62 namespace QtConcurrent {
63
64 #ifndef qdoc
65
/*
    Per-thread limits on the number of queued (not yet reduced) result
    blocks in a MapReduce operation. ReduceKernel scales both values by
    its thread count before comparing them with the queue size:

    - once the queue holds more than ReduceQueueStartLimit blocks per
      thread, no new threads should be started;
    - once it holds more than ReduceQueueThrottleLimit blocks per thread,
      running threads should be throttled.
*/
enum {
    ReduceQueueStartLimit    = 20,
    ReduceQueueThrottleLimit = 30
};
77
78 // IntermediateResults holds a block of intermediate results from a
79 // map or filter functor. The begin/end offsets indicates the origin
80 // and range of the block.
81 template <typename T>
82 class IntermediateResults
83 {
84 public:
85     int begin, end;
86     QVector<T> vector;
87 };
88
89 #endif // qdoc
90
91 enum ReduceOption {
92     UnorderedReduce = 0x1,
93     OrderedReduce = 0x2,
94     SequentialReduce = 0x4
95     // ParallelReduce = 0x8
96 };
97 Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
98 Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
99
100 #ifndef qdoc
101
102 // supports both ordered and out-of-order reduction
103 template <typename ReduceFunctor, typename ReduceResultType, typename T>
104 class ReduceKernel
105 {
106     typedef QMap<int, IntermediateResults<T> > ResultsMap;
107
108     const ReduceOptions reduceOptions;
109
110     QMutex mutex;
111     int progress, resultsMapSize, threadCount;
112     ResultsMap resultsMap;
113
114     bool canReduce(int begin) const
115     {
116         return (((reduceOptions & UnorderedReduce)
117                  && progress == 0)
118                 || ((reduceOptions & OrderedReduce)
119                     && progress == begin));
120     }
121
122     void reduceResult(ReduceFunctor &reduce,
123                       ReduceResultType &r,
124                       const IntermediateResults<T> &result)
125     {
126         for (int i = 0; i < result.vector.size(); ++i) {
127             reduce(r, result.vector.at(i));
128         }
129     }
130
131     void reduceResults(ReduceFunctor &reduce,
132                        ReduceResultType &r,
133                        ResultsMap &map)
134     {
135         typename ResultsMap::iterator it = map.begin();
136         while (it != map.end()) {
137             reduceResult(reduce, r, it.value());
138             ++it;
139         }
140     }
141
142 public:
143     ReduceKernel(ReduceOptions _reduceOptions)
144         : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0), 
145           threadCount(QThreadPool::globalInstance()->maxThreadCount())
146     { }
147
148     void runReduce(ReduceFunctor &reduce,
149                    ReduceResultType &r,
150                    const IntermediateResults<T> &result)
151     {
152         QMutexLocker locker(&mutex);
153         if (!canReduce(result.begin)) {
154             ++resultsMapSize;
155             resultsMap.insert(result.begin, result);
156             return;
157         }
158
159         if (reduceOptions & UnorderedReduce) {
160             // UnorderedReduce
161             progress = -1;
162
163             // reduce this result
164             locker.unlock();
165             reduceResult(reduce, r, result);
166             locker.relock();
167
168             // reduce all stored results as well
169             while (!resultsMap.isEmpty()) {
170                 ResultsMap resultsMapCopy = resultsMap;
171                 resultsMap.clear();
172
173                 locker.unlock();
174                 reduceResults(reduce, r, resultsMapCopy);
175                 locker.relock();
176
177                 resultsMapSize -= resultsMapCopy.size();
178             }
179
180             progress = 0;
181         } else {
182             // reduce this result
183             locker.unlock();
184             reduceResult(reduce, r, result);
185             locker.relock();
186
187             // OrderedReduce
188             progress += result.end - result.begin;
189
190             // reduce as many other results as possible
191             typename ResultsMap::iterator it = resultsMap.begin();
192             while (it != resultsMap.end()) {
193                 if (it.value().begin != progress)
194                     break;
195
196                 locker.unlock();
197                 reduceResult(reduce, r, it.value());
198                 locker.relock();
199
200                 --resultsMapSize;
201                 progress += it.value().end - it.value().begin;
202                 it = resultsMap.erase(it);
203             }
204         }
205     }
206
207     // final reduction
208     void finish(ReduceFunctor &reduce, ReduceResultType &r)
209     {
210         reduceResults(reduce, r, resultsMap);
211     }
212
213     inline bool shouldThrottle()
214     {
215         return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
216     }
217
218     inline bool shouldStartThread()
219     {
220         return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
221     }
222 };
223
224 template <typename Sequence, typename Base, typename Functor1, typename Functor2>
225 struct SequenceHolder2 : public Base
226 {
227     SequenceHolder2(const Sequence &_sequence,
228                     Functor1 functor1,
229                     Functor2 functor2,
230                     ReduceOptions reduceOptions)
231         : Base(_sequence.begin(), _sequence.end(), functor1, functor2, reduceOptions),
232           sequence(_sequence)
233     { }
234
235     Sequence sequence;
236
237     void finish()
238     {
239         Base::finish();
240         // Clear the sequence to make sure all temporaries are destroyed
241         // before finished is signaled.
242         sequence = Sequence();
243     }
244 };
245
246 #endif //qdoc
247
248 } // namespace QtConcurrent
249
250 QT_END_NAMESPACE
251 QT_END_HEADER
252
253 #endif // QT_NO_CONCURRENT
254
255 #endif