1 /****************************************************************************
3 ** Copyright (C) 2012 Nokia Corporation and/or its subsidiary(-ies).
4 ** Contact: http://www.qt-project.org/
6 ** This file is part of the QtCore module of the Qt Toolkit.
8 ** $QT_BEGIN_LICENSE:LGPL$
9 ** GNU Lesser General Public License Usage
10 ** This file may be used under the terms of the GNU Lesser General Public
11 ** License version 2.1 as published by the Free Software Foundation and
12 ** appearing in the file LICENSE.LGPL included in the packaging of this
13 ** file. Please review the following information to ensure the GNU Lesser
14 ** General Public License version 2.1 requirements will be met:
15 ** http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
17 ** In addition, as a special exception, Nokia gives you certain additional
18 ** rights. These rights are described in the Nokia Qt LGPL Exception
19 ** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
21 ** GNU General Public License Usage
22 ** Alternatively, this file may be used under the terms of the GNU General
23 ** Public License version 3.0 as published by the Free Software Foundation
24 ** and appearing in the file LICENSE.GPL included in the packaging of this
25 ** file. Please review the following information to ensure the GNU General
26 ** Public License version 3.0 requirements will be met:
27 ** http://www.gnu.org/copyleft/gpl.html.
30 ** Alternatively, this file may be used in accordance with the terms and
31 ** conditions contained in a signed written agreement between you and Nokia.
40 ****************************************************************************/
42 #ifndef QTCONCURRENT_REDUCEKERNEL_H
43 #define QTCONCURRENT_REDUCEKERNEL_H
45 #include <QtConcurrent/qtconcurrent_global.h>
47 #ifndef QT_NO_CONCURRENT
49 #include <QtCore/qatomic.h>
50 #include <QtCore/qlist.h>
51 #include <QtCore/qmap.h>
52 #include <QtCore/qmutex.h>
53 #include <QtCore/qthread.h>
54 #include <QtCore/qthreadpool.h>
55 #include <QtCore/qvector.h>
61 namespace QtConcurrent {
66 The ReduceQueueStartLimit and ReduceQueueThrottleLimit constants
67 limit the reduce queue size for MapReduce. When the number of
68 reduce blocks in the queue exceeds ReduceQueueStartLimit,
69 MapReduce won't start any new threads, and when it exceeds
70 ReduceQueueThrottleLimit, running threads will be stopped.
// Queue-size limits for stored reduce blocks. shouldStartThread() and
// shouldThrottle() multiply them by threadCount before comparing against
// resultsMapSize.
73 ReduceQueueStartLimit = 20,
74 ReduceQueueThrottleLimit = 30
77 // IntermediateResults holds a block of intermediate results from a
78 // map or filter functor. The begin/end offsets indicate the origin
79 // and range of the block.
// Within this file the reduce kernel reads its `begin`, `end` and `vector`
// members.
81 class IntermediateResults
// Reduce-mode flag values; the reduce kernel tests them with '&' in
// canReduce() and runReduce().
// NOTE(review): OrderedReduce is referenced by canReduce() but no enumerator
// for it is visible here (the values jump from 0x1 to 0x4) -- presumably 0x2;
// confirm.
91 UnorderedReduce = 0x1,
93 SequentialReduce = 0x4
94 // ParallelReduce = 0x8
// ReduceOptions is the flags type that combines ReduceOption values; the
// second macro provides the operators needed to OR those flags together.
96 Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
97 Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
101 // supports both ordered and out-of-order reduction
// Template parameters: ReduceFunctor is invoked as reduce(r, value);
// ReduceResultType is the type of the accumulator `r`; T is the element type
// of the IntermediateResults blocks being reduced.
102 template <typename ReduceFunctor, typename ReduceResultType, typename T>
// Stored result blocks; runReduce() inserts them keyed by their begin offset.
105 typedef QMap<int, IntermediateResults<T> > ResultsMap;
// Reduce-mode flags, fixed at construction.
107 const ReduceOptions reduceOptions;
// progress:       offset that canReduce() compares against a block's begin;
//                 runReduce() advances it by (end - begin) per ordered block.
// resultsMapSize: count compared against the queue limits in
//                 shouldThrottle() / shouldStartThread().
// threadCount:    the global thread pool's maxThreadCount(), sampled in the
//                 constructor.
110 int progress, resultsMapSize, threadCount;
// Blocks that could not be reduced yet; drained by runReduce() and finish().
111 ResultsMap resultsMap;
// Reports whether a block starting at offset `begin` may be reduced right
// away. The visible terms accept UnorderedReduce unconditionally and
// OrderedReduce only when `begin` equals the current `progress`.
// NOTE(review): the parentheses below do not balance as written -- check
// that no term of the condition has been lost.
113 bool canReduce(int begin) const
115 return (((reduceOptions & UnorderedReduce)
117 || ((reduceOptions & OrderedReduce)
118 && progress == begin));
// Folds a single block into the accumulator: calls reduce(r, element) for
// every element of result.vector, in index order.
121 void reduceResult(ReduceFunctor &reduce,
123 const IntermediateResults<T> &result)
125 for (int i = 0; i < result.vector.size(); ++i) {
126 reduce(r, result.vector.at(i));
// Folds the blocks stored in `map` into `r` by calling reduceResult() on each
// iterator value, starting from map.begin().
130 void reduceResults(ReduceFunctor &reduce,
134 typename ResultsMap::iterator it = map.begin();
135 while (it != map.end()) {
136 reduceResult(reduce, r, it.value());
// Stores the reduce options, starts progress and resultsMapSize at zero, and
// samples the global QThreadPool's maxThreadCount() once, at construction.
142 ReduceKernel(ReduceOptions _reduceOptions)
143 : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0),
144 threadCount(QThreadPool::globalInstance()->maxThreadCount())
// Entry point for a finished block of intermediate results. A QMutexLocker
// on `mutex` is created first. A block that canReduce() rejects is inserted
// into resultsMap under its begin offset. Otherwise the block is reduced
// into `r`: in UnorderedReduce mode the stored blocks are then drained as
// well, while the code after that branch advances `progress` and reduces
// stored blocks whose begin offset matches it.
147 void runReduce(ReduceFunctor &reduce,
149 const IntermediateResults<T> &result)
151 QMutexLocker locker(&mutex);
152 if (!canReduce(result.begin)) {
// Not reducible yet: keep the block, keyed by where it starts.
154 resultsMap.insert(result.begin, result);
158 if (reduceOptions & UnorderedReduce) {
162 // reduce this result
164 reduceResult(reduce, r, result);
167 // reduce all stored results as well
168 while (!resultsMap.isEmpty()) {
// Reduce from a copy of the map rather than from resultsMap itself.
169 ResultsMap resultsMapCopy = resultsMap;
173 reduceResults(reduce, r, resultsMapCopy);
// Account for the blocks that were just reduced.
176 resultsMapSize -= resultsMapCopy.size();
181 // reduce this result
183 reduceResult(reduce, r, result);
// Advance past the range covered by this block.
187 progress += result.end - result.begin;
189 // reduce as many other results as possible
190 typename ResultsMap::iterator it = resultsMap.begin();
191 while (it != resultsMap.end()) {
// Only a block that starts exactly at `progress` is handled next.
192 if (it.value().begin != progress)
196 reduceResult(reduce, r, it.value());
200 progress += it.value().end - it.value().begin;
// Drop the reduced block; erase() yields the iterator to the next entry.
201 it = resultsMap.erase(it);
// Reduces whatever is still stored in resultsMap into `r` via reduceResults().
207 void finish(ReduceFunctor &reduce, ReduceResultType &r)
209 reduceResults(reduce, r, resultsMap);
// True when resultsMapSize exceeds ReduceQueueThrottleLimit * threadCount.
212 inline bool shouldThrottle()
214 return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
// True while resultsMapSize is at most ReduceQueueStartLimit * threadCount.
217 inline bool shouldStartThread()
219 return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
// Wrapper that derives from Base and is constructed from a whole Sequence:
// the constructor hands _sequence's begin()/end() iterators, the two functors
// and the reduce options to Base.
223 template <typename Sequence, typename Base, typename Functor1, typename Functor2>
224 struct SequenceHolder2 : public Base
226 SequenceHolder2(const Sequence &_sequence,
229 ReduceOptions reduceOptions)
230 : Base(_sequence.begin(), _sequence.end(), functor1, functor2, reduceOptions),
239 // Clear the sequence to make sure all temporaries are destroyed
240 // before finished is signaled.
// Assigning a default-constructed Sequence replaces whatever `sequence` held.
241 sequence = Sequence();
247 } // namespace QtConcurrent
252 #endif // QT_NO_CONCURRENT