2 Copyright (c) 2005-2017 Intel Corporation
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
21 /* Example program that computes Fibonacci numbers in different ways.
22 Arguments are: [ Number [Threads [Repeats]]]
23 The defaults are Number=500 Threads=1:4 Repeats=1.
25 The point of this program is to check that the library is working properly.
26 Most of the computations are deliberately silly and not expected to
27 show any speedup on multiprocessors.
40 #include "tbb/task_scheduler_init.h"
41 #include "tbb/tick_count.h"
42 #include "tbb/blocked_range.h"
43 #include "tbb/concurrent_vector.h"
44 #include "tbb/concurrent_queue.h"
45 #include "tbb/concurrent_hash_map.h"
46 #include "tbb/parallel_while.h"
47 #include "tbb/parallel_for.h"
48 #include "tbb/parallel_reduce.h"
49 #include "tbb/parallel_scan.h"
50 #include "tbb/pipeline.h"
51 #include "tbb/atomic.h"
52 #include "tbb/mutex.h"
53 #include "tbb/spin_mutex.h"
54 #include "tbb/queuing_mutex.h"
55 #include "tbb/tbb_thread.h"
60 //! type used for Fibonacci number computations
61 typedef long long value;
69 Matrix2x2(value v00, value v01, value v10, value v11) {
70 v[0][0] = v00; v[0][1] = v01; v[1][0] = v10; v[1][1] = v11;
72 Matrix2x2 operator * (const Matrix2x2 &to) const; //< Multiply two Matrices
75 static const Matrix2x2 MatrixIdentity(1, 0, 0, 1);
76 //! Default matrix to multiply
77 static const Matrix2x2 Matrix1110(1, 1, 1, 0);
78 //! Raw arrays matrices multiply
79 void Matrix2x2Multiply(const value a[2][2], const value b[2][2], value c[2][2]);
81 /////////////////////// Serial methods ////////////////////////
84 value SerialFib(int n)
88 value a = 0, b = 1, sum; int i;
89 for( i = 2; i <= n; i++ )
90 { // n is really index of Fibonacci number
91 sum = a + b; a = b; b = sum;
95 //! Serial n-1 matrices multiplication
96 value SerialMatrixFib(int n)
98 value c[2][2], a[2][2] = {{1, 1}, {1, 0}}, b[2][2] = {{1, 1}, {1, 0}}; int i;
99 for(i = 2; i < n; i++)
100 { // Using condition to prevent copying of values
101 if(i & 1) Matrix2x2Multiply(a, c, b);
102 else Matrix2x2Multiply(a, b, c);
104 return (i & 1) ? c[0][0] : b[0][0]; // get result from upper left cell
106 //! Recursive summing. Just for complete list of serial algorithms, not used
107 value SerialRecursiveFib(int n)
113 result = SerialRecursiveFib(n - 1) + SerialRecursiveFib(n - 2);
116 //! Introducing of queue method in serial
117 value SerialQueueFib(int n)
119 concurrent_queue<Matrix2x2> Q;
120 for(int i = 1; i < n; i++)
124 while( !Q.try_pop(A) ) this_tbb_thread::yield();
126 while( !Q.try_pop(B) ) this_tbb_thread::yield();
131 //! Trying to use concurrent_vector
132 value SerialVectorFib(int n)
134 concurrent_vector<value> A;
137 for( int i = 2; i <= n; i++)
139 A.grow_to_at_least(i+1);
140 A[i] = A[i-1] + A[i-2];
145 ///////////////////// Parallel methods ////////////////////////
147 // *** Serial shared by mutexes *** //
150 value SharedA = 0, SharedB = 1; int SharedI = 1, SharedN;
152 //! Template task class which computes Fibonacci numbers with shared globals
154 class SharedSerialFibBody {
157 SharedSerialFibBody( M &m ) : mutex( m ) {}
159 void operator()( const blocked_range<int>& range ) const {
161 typename M::scoped_lock lock( mutex );
162 if(SharedI >= SharedN) break;
163 value sum = SharedA + SharedB;
164 SharedA = SharedB; SharedB = sum;
172 value SharedSerialFib(int n)
174 SharedA = 0; SharedB = 1; SharedI = 1; SharedN = n; M mutex;
175 parallel_for( blocked_range<int>(0,4,1), SharedSerialFibBody<M>( mutex ) );
179 // *** Serial shared by concurrent hash map *** //
182 struct IntHashCompare {
183 bool equal( const int j, const int k ) const { return j == k; }
184 unsigned long hash( const int k ) const { return (unsigned long)k; }
186 //! NumbersTable type based on concurrent_hash_map
187 typedef concurrent_hash_map<int, value, IntHashCompare> NumbersTable;
188 //! task for serial method using shared concurrent_hash_map
189 class ConcurrentHashSerialFibTask: public task {
194 ConcurrentHashSerialFibTask( NumbersTable &cht, int n ) : Fib(cht), my_n(n) { }
196 task* execute() /*override*/ {
197 for( int i = 2; i <= my_n; ++i ) { // there is no difference in to recycle or to make loop
198 NumbersTable::const_accessor f1, f2; // same as iterators
199 if( !Fib.find(f1, i-1) || !Fib.find(f2, i-2) ) {
200 // Something is seriously wrong, because i-1 and i-2 must have been inserted
201 // earlier by this thread or another thread.
204 value sum = f1->second + f2->second;
205 NumbersTable::const_accessor fsum;
206 Fib.insert(fsum, make_pair(i, sum)); // inserting
207 assert( fsum->second == sum ); // check value
214 value ConcurrentHashSerialFib(int n)
218 okay = Fib.insert( make_pair(0, 0) ); assert(okay); // assign initial values
219 okay = Fib.insert( make_pair(1, 1) ); assert(okay);
223 list.push_back(*new(task::allocate_root()) ConcurrentHashSerialFibTask(Fib, n));
224 list.push_back(*new(task::allocate_root()) ConcurrentHashSerialFibTask(Fib, n));
225 task::spawn_root_and_wait(list);
226 NumbersTable::const_accessor fresult;
227 okay = Fib.find( fresult, n );
229 return fresult->second;
232 // *** Queue with parallel_for and parallel_while *** //
234 //! Stream of matrices
236 volatile bool producer_is_done;
237 concurrent_queue<Matrix2x2> Queue;
238 //! Get pair of matricies if present
239 bool pop_if_present( pair<Matrix2x2, Matrix2x2> &mm ) {
240 // get first matrix if present
241 if(!Queue.try_pop(mm.first)) return false;
242 // get second matrix if present
243 if(!Queue.try_pop(mm.second)) {
244 // if not, then push back first matrix
245 Queue.push(mm.first); return false;
251 //! Functor for parallel_for which fills the queue
252 struct parallel_forFibBody {
253 QueueStream &my_stream;
254 //! fill functor arguments
255 parallel_forFibBody(QueueStream &s) : my_stream(s) { }
256 //! iterate thorough range
257 void operator()( const blocked_range<int> &range ) const {
258 int i_end = range.end();
259 for( int i = range.begin(); i != i_end; ++i ) {
260 my_stream.Queue.push( Matrix1110 ); // push initial matrix
264 //! Functor for parallel_while which process the queue
265 class parallel_whileFibBody
267 QueueStream &my_stream;
268 parallel_while<parallel_whileFibBody> &my_while;
270 typedef pair<Matrix2x2, Matrix2x2> argument_type;
271 //! fill functor arguments
272 parallel_whileFibBody(parallel_while<parallel_whileFibBody> &w, QueueStream &s)
273 : my_while(w), my_stream(s) { }
274 //! process pair of matrices
275 void operator() (argument_type mm) const {
276 mm.first = mm.first * mm.second;
277 // note: it can run concurrently with QueueStream::pop_if_present()
278 if(my_stream.Queue.try_pop(mm.second))
279 my_while.add( mm ); // now, two matrices available. Add next iteration.
280 else my_stream.Queue.push( mm.first ); // or push back calculated value if queue is empty
284 //! Parallel queue's filling task
285 struct QueueInsertTask: public task {
286 QueueStream &my_stream;
288 //! fill task arguments
289 QueueInsertTask( int n, QueueStream &s ) : my_n(n), my_stream(s) { }
291 task* execute() /*override*/ {
292 // Execute of parallel pushing of n-1 initial matrices
293 parallel_for( blocked_range<int>( 1, my_n, 10 ), parallel_forFibBody(my_stream) );
294 my_stream.producer_is_done = true;
298 //! Parallel queue's processing task
299 struct QueueProcessTask: public task {
300 QueueStream &my_stream;
301 //! fill task argument
302 QueueProcessTask( QueueStream &s ) : my_stream(s) { }
304 task* execute() /*override*/ {
305 while( !my_stream.producer_is_done || my_stream.Queue.unsafe_size()>1 ) {
306 parallel_while<parallel_whileFibBody> w; // run while loop in parallel
307 w.run( my_stream, parallel_whileFibBody( w, my_stream ) );
313 value ParallelQueueFib(int n)
316 stream.producer_is_done = false;
318 list.push_back(*new(task::allocate_root()) QueueInsertTask( n, stream ));
319 list.push_back(*new(task::allocate_root()) QueueProcessTask( stream ));
320 // If there is only a single thread, the first task in the list runs to completion
321 // before the second task in the list starts.
322 task::spawn_root_and_wait(list);
323 assert(stream.Queue.unsafe_size() == 1); // it is easy to lose some work
325 bool result = stream.Queue.try_pop( M ); // get last matrix
327 return M.v[0][0]; // and result number
330 // *** Queue with pipeline *** //
332 //! filter to fills queue
333 class InputFilter: public filter {
334 tbb::atomic<int> N; //< index of Fibonacci number minus 1
336 concurrent_queue<Matrix2x2> Queue;
337 //! fill filter arguments
338 InputFilter( int n ) : filter(false /*is not serial*/) { N = n; }
340 void* operator()(void*) /*override*/ {
343 Queue.push( Matrix1110 );
347 //! filter to process queue
348 class MultiplyFilter: public filter {
350 MultiplyFilter( ) : filter(false /*is not serial*/) { }
352 void* operator()(void*p) /*override*/ {
353 concurrent_queue<Matrix2x2> &Queue = *static_cast<concurrent_queue<Matrix2x2> *>(p);
356 while( !Queue.try_pop( m1 ) ) this_tbb_thread::yield();
357 while( !Queue.try_pop( m2 ) ) this_tbb_thread::yield();
358 m1 = m1 * m2; // process them
359 Queue.push( m1 ); // and push back
360 return this; // just nothing
364 value ParallelPipeFib(int n)
366 InputFilter input( n-1 );
367 MultiplyFilter process;
368 // Create the pipeline
371 pipeline.add_filter( input ); // first
372 pipeline.add_filter( process ); // second
374 input.Queue.push( Matrix1110 );
376 pipeline.run( n ); // must be larger then max threads number
377 pipeline.clear(); // do not forget clear the pipeline
379 assert( input.Queue.unsafe_size()==1 );
381 bool result = input.Queue.try_pop( M ); // get last element
383 return M.v[0][0]; // get value
386 // *** parallel_reduce *** //
388 //! Functor for parallel_reduce
389 struct parallel_reduceFibBody {
391 int splitted; //< flag to make one less operation for splitted bodies
392 //! Constructor fills sum with initial matrix
393 parallel_reduceFibBody() : sum( Matrix1110 ), splitted(0) { }
394 //! Splitting constructor
395 parallel_reduceFibBody( parallel_reduceFibBody& other, split ) : sum( Matrix1110 ), splitted(1/*note that it is splitted*/) {}
397 void join( parallel_reduceFibBody &s ) {
400 //! Process multiplications
401 void operator()( const blocked_range<int> &r ) {
402 for( int k = r.begin() + splitted; k < r.end(); ++k )
403 sum = sum * Matrix1110;
404 splitted = 0; // reset flag, because this method can be reused for next range
408 value parallel_reduceFib(int n)
410 parallel_reduceFibBody b;
411 parallel_reduce(blocked_range<int>(2, n, 3), b); // do parallel reduce on range [2, n) for b
412 return b.sum.v[0][0];
415 // *** parallel_scan *** //
417 //! Functor for parallel_scan
418 struct parallel_scanFibBody {
419 /** Though parallel_scan is usually used to accumulate running sums,
420 it can be used to accumulate running products too. */
422 /** Pointer to output sequence */
424 //! Constructor sets product to identity matrix
425 parallel_scanFibBody(value* output_) : product( MatrixIdentity ), output(output_) {}
426 //! Splitting constructor
427 parallel_scanFibBody( parallel_scanFibBody &b, split) : product( MatrixIdentity ), output(b.output) {}
428 //! Method for merging summary information from a, which was split off from *this, into *this.
429 void reverse_join( parallel_scanFibBody &a ) {
430 // When using non-commutative reduction operation, reverse_join
431 // should put argument "a" on the left side of the operation.
432 // The reversal from the argument order is why the method is
433 // called "reverse_join" instead of "join".
434 product = a.product * product;
436 //! Method for assigning final result back to original body.
437 void assign( parallel_scanFibBody &b ) {
440 //! Compute matrix running product.
441 /** Tag indicates whether is is the final scan over the range, or
442 just a helper "prescan" that is computing a partial reduction. */
443 template<typename Tag>
444 void operator()( const blocked_range<int> &r, Tag tag) {
445 for( int k = r.begin(); k < r.end(); ++k ) {
446 // Code performs an "exclusive" scan, which outputs a value *before* updating the product.
447 // For an "inclusive" scan, output the value after the update.
448 if( tag.is_final_scan() )
449 output[k] = product.v[0][1];
450 product = product * Matrix1110;
455 value parallel_scanFib(int n)
457 value* output = new value[n];
458 parallel_scanFibBody b(output);
459 parallel_scan(blocked_range<int>(0, n, 3), b);
460 // output[0..n-1] now contains the Fibonacci sequence (modulo integer wrap-around).
461 // Check the last two values for correctness.
462 assert( n<2 || output[n-2]+output[n-1]==b.product.v[0][1] );
464 return b.product.v[0][1];
467 // *** Raw tasks *** //
469 //! task class which computes Fibonacci numbers by Lucas formula
470 struct FibTask: public task {
474 bool second_phase; //< flag of continuation
476 FibTask( int n_, value& sum_ ) :
477 n(n_), sum(sum_), second_phase(false)
480 task* execute() /*override*/ {
481 // Using Lucas' formula here
482 if( second_phase ) { // children finished
483 sum = n&1 ? x*x + y*y : x*x - y*y;
490 recycle_as_continuation(); // repeat this task when children finish
491 second_phase = true; // mark second phase
492 FibTask& a = *new( allocate_child() ) FibTask( n/2 + 1, x );
493 FibTask& b = *new( allocate_child() ) FibTask( n/2 - 1 + (n&1), y );
501 value ParallelTaskFib(int n) {
503 FibTask& a = *new(task::allocate_root()) FibTask(n, sum);
504 task::spawn_root_and_wait(a);
508 /////////////////////////// Main ////////////////////////////////////////////////////
510 //! A closed range of int.
514 void set_from_string( const char* s );
515 IntRange( int low_, int high_ ) : low(low_), high(high_) {}
518 void IntRange::set_from_string( const char* s ) {
520 high = low = strtol(s,&end,0);
523 high = strtol(end+1,0,0);
528 printf("unexpected character = %c\n",*end);
532 //! Tick count for start
533 static tick_count t0;
535 //! Verbose output flag
536 static bool Verbose = false;
538 typedef value (*MeasureFunc)(int);
539 //! Measure ticks count in loop [2..n]
540 value Measure(const char *name, MeasureFunc func, int n)
543 if(Verbose) printf("%s",name);
544 t0 = tick_count::now();
545 for(int number = 2; number <= n; number++)
546 result = func(number);
547 if(Verbose) printf("\t- in %f msec\n", (tick_count::now() - t0).seconds()*1000);
552 int main(int argc, char* argv[])
554 if(argc>1) Verbose = true;
555 int NumbersCount = argc>1 ? strtol(argv[1],0,0) : 500;
556 IntRange NThread(1,4);// Number of threads to use.
557 if(argc>2) NThread.set_from_string(argv[2]);
558 unsigned long ntrial = argc>3? (unsigned long)strtoul(argv[3],0,0) : 1;
561 if(Verbose) printf("Fibonacci numbers example. Generating %d numbers..\n", NumbersCount);
563 result = Measure("Serial loop", SerialFib, NumbersCount);
564 sum = Measure("Serial matrix", SerialMatrixFib, NumbersCount); assert(result == sum);
565 sum = Measure("Serial vector", SerialVectorFib, NumbersCount); assert(result == sum);
566 sum = Measure("Serial queue", SerialQueueFib, NumbersCount); assert(result == sum);
568 for( unsigned long i=0; i<ntrial; ++i ) {
569 for(int threads = NThread.low; threads <= NThread.high; threads *= 2)
571 task_scheduler_init scheduler_init(threads);
572 if(Verbose) printf("\nThreads number is %d\n", threads);
574 sum = Measure("Shared serial (mutex)\t", SharedSerialFib<mutex>, NumbersCount); assert(result == sum);
575 sum = Measure("Shared serial (spin_mutex)", SharedSerialFib<spin_mutex>, NumbersCount); assert(result == sum);
576 sum = Measure("Shared serial (queuing_mutex)", SharedSerialFib<queuing_mutex>, NumbersCount); assert(result == sum);
577 sum = Measure("Shared serial (Conc.HashTable)", ConcurrentHashSerialFib, NumbersCount); assert(result == sum);
578 sum = Measure("Parallel while+for/queue", ParallelQueueFib, NumbersCount); assert(result == sum);
579 sum = Measure("Parallel pipe/queue\t", ParallelPipeFib, NumbersCount); assert(result == sum);
580 sum = Measure("Parallel reduce\t\t", parallel_reduceFib, NumbersCount); assert(result == sum);
581 sum = Measure("Parallel scan\t\t", parallel_scanFib, NumbersCount); assert(result == sum);
582 sum = Measure("Parallel tasks\t\t", ParallelTaskFib, NumbersCount); assert(result == sum);
586 if(Verbose) printf("Fibonacci number #%d modulo 2^64 is %lld\n\n", NumbersCount, result);
588 if(Verbose) printf("Fibonacci number #%d modulo 2^64 is %I64d\n\n", NumbersCount, result);
591 if(!Verbose) printf("TEST PASSED\n");
597 void Matrix2x2Multiply(const value a[2][2], const value b[2][2], value c[2][2])
599 for( int i = 0; i <= 1; i++)
600 for( int j = 0; j <= 1; j++)
601 c[i][j] = a[i][0]*b[0][j] + a[i][1]*b[1][j];
604 Matrix2x2 Matrix2x2::operator *(const Matrix2x2 &to) const
607 Matrix2x2Multiply(v, to.v, result.v);