/*
    Copyright (c) 2005-2019 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/
18 #include "tbb/parallel_reduce.h"
19 #include "tbb/atomic.h"
20 #include "harness_assert.h"
24 static tbb::atomic<long> ForkCount;
25 static tbb::atomic<long> FooBodyCount;
27 //! Class with public interface that is exactly minimal requirements for Range concept
31 explicit MinimalRange( size_t i ) : begin(0), end(i) {}
32 friend void Flog( int nthread, bool inteference );
34 MinimalRange( MinimalRange& r, tbb::split ) : end(r.end) {
35 begin = r.end = (r.begin+r.end)/2;
37 bool is_divisible() const {return end-begin>=2;}
38 bool empty() const {return begin==end;}
41 //! Class with public interface that is exactly minimal requirements for Body of a parallel_reduce
44 FooBody( const FooBody& ); // Deny access
45 void operator=( const FooBody& ); // Deny access
46 friend void Flog( int nthread, bool interference );
47 //! Parent that created this body via split operation. NULL if original body.
49 //! Total number of index values processed by body and its children.
51 //! Number of join operations done so far on this body and its children.
53 //! Range that has been processed so far by this body and its children.
55 //! True if body has not yet been processed at least once by operator().
57 //! 1 if body was created by split; 0 if original body.
59 FooBody() {++FooBodyCount;}
64 join_count=0xDEADBEEF;
67 FooBody( FooBody& other, tbb::split ) {
76 void join( FooBody& s ) {
77 ASSERT( s.forked==1, NULL );
78 ASSERT( this!=&s, NULL );
79 ASSERT( this==s.parent, NULL );
80 ASSERT( end==s.begin, NULL );
83 join_count += s.join_count + 1;
86 void operator()( const MinimalRange& r ) {
87 for( size_t k=r.begin; k<r.end; ++k )
93 ASSERT( end==r.begin, NULL );
100 #include "tbb/tick_count.h"
102 void Flog( int nthread, bool interference=false ) {
103 for (int mode = 0; mode < 4; mode++) {
104 tbb::tick_count T0 = tbb::tick_count::now();
106 tbb::affinity_partitioner ap;
107 for( size_t i=0; i<=1000; ++i ) {
114 f.begin = ~size_t(0);
116 ASSERT( FooBodyCount==1, NULL );
119 tbb::parallel_reduce( MinimalRange(i), f );
122 tbb::parallel_reduce( MinimalRange(i), f, tbb::simple_partitioner() );
125 tbb::parallel_reduce( MinimalRange(i), f, tbb::auto_partitioner() );
128 tbb::parallel_reduce( MinimalRange(i), f, ap );
131 join_count += f.join_count;
132 ASSERT( FooBodyCount==1, NULL );
133 ASSERT( f.sum==i, NULL );
134 ASSERT( f.begin==(i==0 ? ~size_t(0) : 0), NULL );
135 ASSERT( f.end==(i==0 ? ~size_t(0) : i), NULL );
137 tbb::tick_count T1 = tbb::tick_count::now();
138 REMARK("time=%g join_count=%ld ForkCount=%ld nthread=%d%s\n",
139 (T1-T0).seconds(),join_count,long(ForkCount), nthread, interference ? " with interference":"");
143 #include "tbb/blocked_range.h"
146 typedef tbb::internal::uint64_t ValueType;
148 typedef uint64_t ValueType;
153 T operator() ( const T& v1, const T& v2 ) const {
159 ValueType operator() ( const tbb::blocked_range<ValueType*>& r, ValueType value ) const {
160 for ( ValueType* pv = r.begin(); pv != r.end(); ++pv )
166 class ParallelSumTester: public NoAssign {
168 ParallelSumTester() : m_range(NULL, NULL) {
169 m_array = new ValueType[unsigned(N)];
170 for ( ValueType i = 0; i < N; ++i )
172 m_range = tbb::blocked_range<ValueType*>( m_array, m_array + N );
174 ~ParallelSumTester() { delete[] m_array; }
175 template<typename Partitioner>
176 void CheckParallelReduce() {
177 Partitioner partitioner;
178 ValueType r1 = tbb::parallel_reduce( m_range, I, Accumulator(), Sum(), partitioner );
179 ASSERT( r1 == R, NULL );
180 #if __TBB_CPP11_LAMBDAS_PRESENT
181 ValueType r2 = tbb::parallel_reduce(
183 [](const tbb::blocked_range<ValueType*>& r, ValueType value) -> ValueType {
184 for ( const ValueType* pv = r.begin(); pv != r.end(); ++pv )
191 ASSERT( r2 == R, NULL );
194 void CheckParallelReduceDefault() {
195 ValueType r1 = tbb::parallel_reduce( m_range, I, Accumulator(), Sum() );
196 ASSERT( r1 == R, NULL );
197 #if __TBB_CPP11_LAMBDAS_PRESENT
198 ValueType r2 = tbb::parallel_reduce(
200 [](const tbb::blocked_range<ValueType*>& r, ValueType value) -> ValueType {
201 for ( const ValueType* pv = r.begin(); pv != r.end(); ++pv )
207 ASSERT( r2 == R, NULL );
212 tbb::blocked_range<ValueType*> m_range;
213 static const ValueType I, N, R;
216 const ValueType ParallelSumTester::I = 0;
217 const ValueType ParallelSumTester::N = 1000000;
218 const ValueType ParallelSumTester::R = N * (N + 1) / 2;
220 void ParallelSum () {
221 ParallelSumTester pst;
222 pst.CheckParallelReduceDefault();
223 pst.CheckParallelReduce<tbb::simple_partitioner>();
224 pst.CheckParallelReduce<tbb::auto_partitioner>();
225 pst.CheckParallelReduce<tbb::affinity_partitioner>();
226 pst.CheckParallelReduce<tbb::static_partitioner>();
229 #include "harness_concurrency_tracker.h"
234 int operator() ( int x, int i ) const {
237 int join( int x, int y ) const {
238 return operator()( x, y );
244 typedef typename Op::Type result_type;
245 result_type my_value;
247 ReduceBody() : my_value() {}
248 ReduceBody( ReduceBody &, tbb::split ) : my_value() {}
250 void operator() ( const tbb::blocked_range<int>& r ) {
251 Harness::ConcurrencyTracker ct;
252 for ( int i = r.begin(); i != r.end(); ++i ) {
254 my_value = op(my_value, i);
258 void join( const ReduceBody& y ) {
260 my_value = op.join(my_value, y.my_value);
264 //! Type-tag for automatic testing algorithm deduction
265 struct harness_default_partitioner {};
267 template<typename Body, typename Partitioner>
268 struct parallel_deterministic_reduce_invoker {
269 template<typename Range>
270 static typename Body::result_type run( const Range& range ) {
272 tbb::parallel_deterministic_reduce(range, body, Partitioner());
273 return body.my_value;
277 template<typename Body>
278 struct parallel_deterministic_reduce_invoker<Body, harness_default_partitioner> {
279 template<typename Range>
280 static typename Body::result_type run( const Range& range ) {
282 tbb::parallel_deterministic_reduce(range, body);
283 return body.my_value;
287 template<typename ResultType, typename Partitioner>
288 struct parallel_deterministic_reduce_lambda_invoker {
289 template<typename Range, typename Func, typename Reduction>
290 static ResultType run( const Range& range, Func f, Reduction r ) {
291 return tbb::parallel_deterministic_reduce(range, ResultType(), f, r, Partitioner());
295 template<typename ResultType>
296 struct parallel_deterministic_reduce_lambda_invoker<ResultType, harness_default_partitioner> {
297 template<typename Range, typename Func, typename Reduction>
298 static ResultType run(const Range& range, Func f, Reduction r) {
299 return tbb::parallel_deterministic_reduce(range, ResultType(), f, r);
303 //! Define overloads of parallel_deterministic_reduce that accept "undesired" types of partitioners
304 namespace unsupported {
306 template<typename Range, typename Body>
307 void parallel_deterministic_reduce(const Range&, Body&, const tbb::auto_partitioner&) { }
309 template<typename Range, typename Body>
310 void parallel_deterministic_reduce(const Range&, Body&, tbb::affinity_partitioner&) { }
312 template<typename Range, typename Value, typename RealBody, typename Reduction>
313 Value parallel_deterministic_reduce(const Range& , const Value& identity, const RealBody& , const Reduction& , const tbb::auto_partitioner&) {
317 template<typename Range, typename Value, typename RealBody, typename Reduction>
318 Value parallel_deterministic_reduce(const Range& , const Value& identity, const RealBody& , const Reduction& , tbb::affinity_partitioner&) {
327 Body(Body&, tbb::split) { value = 0; }
328 void operator()(const tbb::blocked_range<int>&) {}
332 //! Check that other types of partitioners are not supported (auto, affinity)
333 //! In the case of "unsupported" API unexpectedly sneaking into namespace tbb,
334 //! this test should result in a compilation error due to overload resolution ambiguity
335 static void TestUnsupportedPartitioners() {
337 using namespace unsupported;
339 parallel_deterministic_reduce(blocked_range<int>(0, 10), body, tbb::auto_partitioner());
341 tbb::affinity_partitioner ap;
342 parallel_deterministic_reduce(blocked_range<int>(0, 10), body, ap);
344 #if __TBB_CPP11_LAMBDAS_PRESENT
345 parallel_deterministic_reduce(
346 blocked_range<int>(0, 10),
348 [](const blocked_range<int>&, int init)->int {
351 [](int x, int y)->int {
354 tbb::auto_partitioner()
356 parallel_deterministic_reduce(
357 blocked_range<int>(0, 10),
359 [](const blocked_range<int>&, int init)->int {
362 [](int x, int y)->int {
370 template <class Partitioner>
371 void TestDeterministicReductionFor() {
373 const tbb::blocked_range<int> range(0, N);
374 typedef ReduceBody<RotOp> BodyType;
375 BodyType::result_type R1 =
376 parallel_deterministic_reduce_invoker<BodyType, Partitioner>::run(range);
377 for ( int i=0; i<100; ++i ) {
378 BodyType::result_type R2 =
379 parallel_deterministic_reduce_invoker<BodyType, Partitioner>::run(range);
380 ASSERT( R1 == R2, "parallel_deterministic_reduce behaves differently from run to run" );
381 #if __TBB_CPP11_LAMBDAS_PRESENT
382 typedef RotOp::Type Type;
383 Type R3 = parallel_deterministic_reduce_lambda_invoker<Type, Partitioner>::run(
385 [](const tbb::blocked_range<int>& br, Type value) -> Type {
386 Harness::ConcurrencyTracker ct;
387 for ( int ii = br.begin(); ii != br.end(); ++ii ) {
389 value = op(value, ii);
393 [](const Type& v1, const Type& v2) -> Type {
395 return op.join(v1,v2);
398 ASSERT( R1 == R3, "lambda-based parallel_deterministic_reduce behaves differently from run to run" );
403 void TestDeterministicReduction () {
404 TestDeterministicReductionFor<tbb::simple_partitioner>();
405 TestDeterministicReductionFor<tbb::static_partitioner>();
406 TestDeterministicReductionFor<harness_default_partitioner>();
407 ASSERT_WARNING((Harness::ConcurrencyTracker::PeakParallelism() > 1), "no parallel execution\n");
410 #include "tbb/task_scheduler_init.h"
411 #include "harness_cpu.h"
412 #include "test_partitioner.h"
414 namespace interaction_with_range_and_partitioner {
416 // Test checks compatibility of parallel_reduce algorithm with various range implementations
419 using namespace test_partitioner_utils::interaction_with_range_and_partitioner;
421 test_partitioner_utils::SimpleReduceBody body;
422 tbb::affinity_partitioner ap;
424 parallel_reduce(Range1(/*assert_in_split*/ true, /*assert_in_proportional_split*/ false), body, ap);
425 parallel_reduce(Range2(true, false), body, ap);
426 parallel_reduce(Range3(true, false), body, ap);
427 parallel_reduce(Range4(false, true), body, ap);
428 parallel_reduce(Range5(false, true), body, ap);
429 parallel_reduce(Range6(false, true), body, ap);
431 parallel_reduce(Range1(/*assert_in_split*/ true, /*assert_in_proportional_split*/ false),
432 body, tbb::static_partitioner());
433 parallel_reduce(Range2(true, false), body, tbb::static_partitioner());
434 parallel_reduce(Range3(true, false), body, tbb::static_partitioner());
435 parallel_reduce(Range4(false, true), body, tbb::static_partitioner());
436 parallel_reduce(Range5(false, true), body, tbb::static_partitioner());
437 parallel_reduce(Range6(false, true), body, tbb::static_partitioner());
439 parallel_reduce(Range1(/*assert_in_split*/ false, /*assert_in_proportional_split*/ true),
440 body, tbb::simple_partitioner());
441 parallel_reduce(Range2(false, true), body, tbb::simple_partitioner());
442 parallel_reduce(Range3(false, true), body, tbb::simple_partitioner());
443 parallel_reduce(Range4(false, true), body, tbb::simple_partitioner());
444 parallel_reduce(Range5(false, true), body, tbb::simple_partitioner());
445 parallel_reduce(Range6(false, true), body, tbb::simple_partitioner());
447 parallel_reduce(Range1(/*assert_in_split*/ false, /*assert_in_proportional_split*/ true),
448 body, tbb::auto_partitioner());
449 parallel_reduce(Range2(false, true), body, tbb::auto_partitioner());
450 parallel_reduce(Range3(false, true), body, tbb::auto_partitioner());
451 parallel_reduce(Range4(false, true), body, tbb::auto_partitioner());
452 parallel_reduce(Range5(false, true), body, tbb::auto_partitioner());
453 parallel_reduce(Range6(false, true), body, tbb::auto_partitioner());
455 parallel_deterministic_reduce(Range1(/*assert_in_split*/true, /*assert_in_proportional_split*/ false),
456 body, tbb::static_partitioner());
457 parallel_deterministic_reduce(Range2(true, false), body, tbb::static_partitioner());
458 parallel_deterministic_reduce(Range3(true, false), body, tbb::static_partitioner());
459 parallel_deterministic_reduce(Range4(false, true), body, tbb::static_partitioner());
460 parallel_deterministic_reduce(Range5(false, true), body, tbb::static_partitioner());
461 parallel_deterministic_reduce(Range6(false, true), body, tbb::static_partitioner());
463 parallel_deterministic_reduce(Range1(/*assert_in_split*/false, /*assert_in_proportional_split*/ true),
464 body, tbb::simple_partitioner());
465 parallel_deterministic_reduce(Range2(false, true), body, tbb::simple_partitioner());
466 parallel_deterministic_reduce(Range3(false, true), body, tbb::simple_partitioner());
467 parallel_deterministic_reduce(Range4(false, true), body, tbb::simple_partitioner());
468 parallel_deterministic_reduce(Range5(false, true), body, tbb::simple_partitioner());
469 parallel_deterministic_reduce(Range6(false, true), body, tbb::simple_partitioner());
472 } // interaction_with_range_and_partitioner
475 TestUnsupportedPartitioners();
477 REPORT("Usage: nthread must be positive\n");
480 for( int p=MinThread; p<=MaxThread; ++p ) {
481 tbb::task_scheduler_init init( p );
485 TestDeterministicReduction();
486 // Test that all workers sleep when no work
489 interaction_with_range_and_partitioner::test();
490 return Harness::Done;