1 /****************************************************************************
3 ** Copyright (C) 2011 Nokia Corporation and/or its subsidiary(-ies).
4 ** All rights reserved.
5 ** Contact: Nokia Corporation (qt-info@nokia.com)
7 ** This file is part of the QtCore module of the Qt Toolkit.
9 ** $QT_BEGIN_LICENSE:LGPL$
10 ** GNU Lesser General Public License Usage
11 ** This file may be used under the terms of the GNU Lesser General Public
12 ** License version 2.1 as published by the Free Software Foundation and
13 ** appearing in the file LICENSE.LGPL included in the packaging of this
14 ** file. Please review the following information to ensure the GNU Lesser
15 ** General Public License version 2.1 requirements will be met:
16 ** http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
18 ** In addition, as a special exception, Nokia gives you certain additional
19 ** rights. These rights are described in the Nokia Qt LGPL Exception
20 ** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
22 ** GNU General Public License Usage
23 ** Alternatively, this file may be used under the terms of the GNU General
24 ** Public License version 3.0 as published by the Free Software Foundation
25 ** and appearing in the file LICENSE.GPL included in the packaging of this
26 ** file. Please review the following information to ensure the GNU General
27 ** Public License version 3.0 requirements will be met:
28 ** http://www.gnu.org/copyleft/gpl.html.
31 ** Alternatively, this file may be used in accordance with the terms and
32 ** conditions contained in a signed written agreement between you and Nokia.
40 ****************************************************************************/
42 #ifndef QTCONCURRENT_REDUCEKERNEL_H
43 #define QTCONCURRENT_REDUCEKERNEL_H
45 #include <QtCore/qglobal.h>
47 #ifndef QT_NO_CONCURRENT
49 #include <QtCore/qatomic.h>
50 #include <QtCore/qlist.h>
51 #include <QtCore/qmap.h>
52 #include <QtCore/qmutex.h>
53 #include <QtCore/qthread.h>
54 #include <QtCore/qthreadpool.h>
55 #include <QtCore/qvector.h>
62 namespace QtConcurrent {
67 The ReduceQueueStartLimit and ReduceQueueThrottleLimit constants
68 limit the reduce queue size for MapReduce. When the number of
69 reduce blocks in the queue exceeds ReduceQueueStartLimit,
70 MapReduce won't start any new threads, and when it exceeds
71 ReduceQueueThrottleLimit running threads will be stopped.
74 ReduceQueueStartLimit = 20,
75 ReduceQueueThrottleLimit = 30
78 // IntermediateResults holds a block of intermediate results from a
79 // map or filter functor. The begin/end offsets indicates the origin
80 // and range of the block.
82 class IntermediateResults
92 UnorderedReduce = 0x1,
94 SequentialReduce = 0x4
95 // ParallelReduce = 0x8
97 Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
98 Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
102 // supports both ordered and out-of-order reduction
103 template <typename ReduceFunctor, typename ReduceResultType, typename T>
106 typedef QMap<int, IntermediateResults<T> > ResultsMap;
108 const ReduceOptions reduceOptions;
111 int progress, resultsMapSize, threadCount;
112 ResultsMap resultsMap;
114 bool canReduce(int begin) const
116 return (((reduceOptions & UnorderedReduce)
118 || ((reduceOptions & OrderedReduce)
119 && progress == begin));
122 void reduceResult(ReduceFunctor &reduce,
124 const IntermediateResults<T> &result)
126 for (int i = 0; i < result.vector.size(); ++i) {
127 reduce(r, result.vector.at(i));
131 void reduceResults(ReduceFunctor &reduce,
135 typename ResultsMap::iterator it = map.begin();
136 while (it != map.end()) {
137 reduceResult(reduce, r, it.value());
143 ReduceKernel(ReduceOptions _reduceOptions)
144 : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0),
145 threadCount(QThreadPool::globalInstance()->maxThreadCount())
148 void runReduce(ReduceFunctor &reduce,
150 const IntermediateResults<T> &result)
152 QMutexLocker locker(&mutex);
153 if (!canReduce(result.begin)) {
155 resultsMap.insert(result.begin, result);
159 if (reduceOptions & UnorderedReduce) {
163 // reduce this result
165 reduceResult(reduce, r, result);
168 // reduce all stored results as well
169 while (!resultsMap.isEmpty()) {
170 ResultsMap resultsMapCopy = resultsMap;
174 reduceResults(reduce, r, resultsMapCopy);
177 resultsMapSize -= resultsMapCopy.size();
182 // reduce this result
184 reduceResult(reduce, r, result);
188 progress += result.end - result.begin;
190 // reduce as many other results as possible
191 typename ResultsMap::iterator it = resultsMap.begin();
192 while (it != resultsMap.end()) {
193 if (it.value().begin != progress)
197 reduceResult(reduce, r, it.value());
201 progress += it.value().end - it.value().begin;
202 it = resultsMap.erase(it);
208 void finish(ReduceFunctor &reduce, ReduceResultType &r)
210 reduceResults(reduce, r, resultsMap);
213 inline bool shouldThrottle()
215 return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
218 inline bool shouldStartThread()
220 return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
224 template <typename Sequence, typename Base, typename Functor1, typename Functor2>
225 struct SequenceHolder2 : public Base
227 SequenceHolder2(const Sequence &_sequence,
230 ReduceOptions reduceOptions)
231 : Base(_sequence.begin(), _sequence.end(), functor1, functor2, reduceOptions),
240 // Clear the sequence to make sure all temporaries are destroyed
241 // before finished is signaled.
242 sequence = Sequence();
248 } // namespace QtConcurrent
253 #endif // QT_NO_CONCURRENT