2 Copyright (c) 2005-2019 Intel Corporation
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
20 #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
23 #include "tbb/flow_graph.h"
24 #include "tbb/task_scheduler_init.h"
25 #include "tbb/tick_count.h"
26 #include "tbb/atomic.h"
27 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
28 #include "harness_graph.h"
36 template< typename T >
37 struct seq_inspector {
38 size_t operator()(const T &v) const { return size_t(v); }
41 template< typename T >
42 bool wait_try_get( tbb::flow::graph &g, tbb::flow::sequencer_node<T> &q, T &value ) {
44 return q.try_get(value);
47 template< typename T >
48 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
49 while ( q.try_get(value) != true ) ;
52 template< typename T >
53 struct parallel_puts : NoAssign {
55 tbb::flow::sequencer_node<T> &my_q;
58 parallel_puts( tbb::flow::sequencer_node<T> &q, int num_threads ) : my_q(q), my_num_threads(num_threads) {}
60 void operator()(int tid) const {
61 for (int j = tid; j < N; j+=my_num_threads) {
62 bool msg = my_q.try_put( T(j) );
63 ASSERT( msg == true, NULL );
69 template< typename T >
76 touches( int num_threads ) : my_num_threads(num_threads) {
77 my_last_touch = new T[my_num_threads];
78 my_touches = new bool* [my_num_threads];
79 for ( int p = 0; p < my_num_threads; ++p) {
80 my_last_touch[p] = T(-1);
81 my_touches[p] = new bool[N];
82 for ( int n = 0; n < N; ++n)
83 my_touches[p][n] = false;
88 for ( int p = 0; p < my_num_threads; ++p) {
89 delete [] my_touches[p];
92 delete [] my_last_touch;
95 bool check( int tid, T v ) {
96 if ( my_touches[tid][v] != false ) {
97 printf("Error: value seen twice by local thread\n");
100 if ( v <= my_last_touch[tid] ) {
101 printf("Error: value seen in wrong order by local thread\n");
104 my_last_touch[tid] = v;
105 my_touches[tid][v] = true;
109 bool validate_touches() {
110 bool *all_touches = new bool[N];
111 for ( int n = 0; n < N; ++n)
112 all_touches[n] = false;
114 for ( int p = 0; p < my_num_threads; ++p) {
115 for ( int n = 0; n < N; ++n) {
116 if ( my_touches[p][n] == true ) {
117 ASSERT( all_touches[n] == false, "value see by more than one thread\n" );
118 all_touches[n] = true;
122 for ( int n = 0; n < N; ++n) {
123 if ( !all_touches[n] )
124 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
125 //ASSERT( all_touches[n] == true, "value not seen by any thread\n" );
127 delete [] all_touches;
133 template< typename T >
134 struct parallel_gets : NoAssign {
136 tbb::flow::sequencer_node<T> &my_q;
138 touches<T> &my_touches;
140 parallel_gets( tbb::flow::sequencer_node<T> &q, int num_threads, touches<T> &t ) : my_q(q), my_num_threads(num_threads), my_touches(t) {}
142 void operator()(int tid) const {
143 for (int j = tid; j < N; j+=my_num_threads) {
145 spin_try_get( my_q, v );
146 my_touches.check( tid, v );
152 template< typename T >
153 struct parallel_put_get : NoAssign {
155 tbb::flow::sequencer_node<T> &my_s1;
156 tbb::flow::sequencer_node<T> &my_s2;
158 tbb::atomic< int > &my_counter;
159 touches<T> &my_touches;
161 parallel_put_get( tbb::flow::sequencer_node<T> &s1, tbb::flow::sequencer_node<T> &s2, int num_threads,
162 tbb::atomic<int> &counter, touches<T> &t ) : my_s1(s1), my_s2(s2), my_num_threads(num_threads), my_counter(counter), my_touches(t) {}
164 void operator()(int tid) const {
167 while ( (i_start = my_counter.fetch_and_add(C)) < N ) {
168 int i_end = ( N < i_start + C ) ? N : i_start + C;
169 for (int i = i_start; i < i_end; ++i) {
170 bool msg = my_s1.try_put( T(i) );
171 ASSERT( msg == true, NULL );
174 for (int i = i_start; i < i_end; ++i) {
176 spin_try_get( my_s2, v );
177 my_touches.check( tid, v );
187 // multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output
188 // chained sequencers, multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output
191 template< typename T >
192 int test_parallel(int num_threads) {
195 tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
196 NativeParallelFor( num_threads, parallel_puts<T>(s, num_threads) );
198 touches<T> t( num_threads );
199 NativeParallelFor( num_threads, parallel_gets<T>(s, num_threads, t) );
201 ASSERT( t.validate_touches(), NULL );
205 ASSERT( s.try_get( j ) == false, NULL );
206 ASSERT( j == bogus_value, NULL );
209 tbb::flow::sequencer_node<T> s1(g, seq_inspector<T>());
210 tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
211 tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
212 tbb::flow::make_edge( s1, s2 );
213 tbb::flow::make_edge( s2, s3 );
216 touches<T> t( num_threads );
217 tbb::atomic<int> counter;
219 NativeParallelFor( num_threads, parallel_put_get<T>(s1, s3, num_threads, counter, t) );
221 t.validate_touches();
224 ASSERT( s1.try_get( j ) == false, NULL );
226 ASSERT( s2.try_get( j ) == false, NULL );
228 ASSERT( s3.try_get( j ) == false, NULL );
229 ASSERT( j == bogus_value, NULL );
231 // test copy constructor
232 tbb::flow::sequencer_node<T> s_copy(s);
233 NativeParallelFor( num_threads, parallel_puts<T>(s_copy, num_threads) );
234 for (int i = 0; i < N; ++i) {
236 spin_try_get( s_copy, j );
237 ASSERT( i == j, NULL );
241 ASSERT( s_copy.try_get( j ) == false, NULL );
242 ASSERT( j == bogus_value, NULL );
251 // No predecessors can be registered
252 // Request from empty buffer fails
253 // In-order puts, single sender, single receiver, properly sequenced at output
254 // Reverse-order puts, single sender, single receiver, properly sequenced at output
255 // Chained sequencers (3), in-order and reverse-order tests, properly sequenced at output
258 template< typename T >
263 tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
264 tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
268 // Rejects attempts to add / remove predecessor
269 // Rejects request from empty Q
271 ASSERT( s.register_predecessor( s2 ) == false, NULL );
272 ASSERT( s.remove_predecessor( s2 ) == false, NULL );
273 ASSERT( s.try_get( j ) == false, NULL );
274 ASSERT( j == bogus_value, NULL );
277 // In-order simple puts and gets
280 for (int i = 0; i < N; ++i) {
281 bool msg = s.try_put( T(i) );
282 ASSERT( msg == true, NULL );
283 ASSERT(!s.try_put( T(i) ), NULL); // second attempt to put should reject
287 for (int i = 0; i < N; ++i) {
289 ASSERT(wait_try_get( g, s, j ) == true, NULL);
290 ASSERT( i == j, NULL );
291 ASSERT(!s.try_put( T(i) ),NULL ); // after retrieving value, subsequent put should fail
295 ASSERT( s.try_get( j ) == false, NULL );
296 ASSERT( j == bogus_value, NULL );
299 // Reverse-order simple puts and gets
302 for (int i = N-1; i >= 0; --i) {
303 bool msg = s2.try_put( T(i) );
304 ASSERT( msg == true, NULL );
307 for (int i = 0; i < N; ++i) {
309 ASSERT(wait_try_get( g, s2, j ) == true, NULL);
310 ASSERT( i == j, NULL );
314 ASSERT( s2.try_get( j ) == false, NULL );
315 ASSERT( j == bogus_value, NULL );
318 // Chained in-order simple puts and gets
321 tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
322 tbb::flow::sequencer_node<T> s4(g, seq_inspector<T>());
323 tbb::flow::sequencer_node<T> s5(g, seq_inspector<T>());
324 tbb::flow::make_edge( s3, s4 );
325 tbb::flow::make_edge( s4, s5 );
327 for (int i = 0; i < N; ++i) {
328 bool msg = s3.try_put( T(i) );
329 ASSERT( msg == true, NULL );
332 for (int i = 0; i < N; ++i) {
334 ASSERT(wait_try_get( g, s5, j ) == true, NULL);
335 ASSERT( i == j, NULL );
338 ASSERT( wait_try_get( g, s3, j ) == false, NULL );
339 ASSERT( wait_try_get( g, s4, j ) == false, NULL );
340 ASSERT( wait_try_get( g, s5, j ) == false, NULL );
341 ASSERT( j == bogus_value, NULL );
344 tbb::flow::remove_edge( s3, s4 );
345 ASSERT( s3.try_put( N ) == true, NULL );
346 ASSERT( wait_try_get( g, s4, j ) == false, NULL );
347 ASSERT( j == bogus_value, NULL );
348 ASSERT( wait_try_get( g, s5, j ) == false, NULL );
349 ASSERT( j == bogus_value, NULL );
350 ASSERT( wait_try_get( g, s3, j ) == true, NULL );
351 ASSERT( j == N, NULL );
354 // Chained reverse-order simple puts and gets
357 tbb::flow::sequencer_node<T> s6(g, seq_inspector<T>());
358 tbb::flow::sequencer_node<T> s7(g, seq_inspector<T>());
359 tbb::flow::sequencer_node<T> s8(g, seq_inspector<T>());
360 tbb::flow::make_edge( s6, s7 );
361 tbb::flow::make_edge( s7, s8 );
363 for (int i = N-1; i >= 0; --i) {
364 bool msg = s6.try_put( T(i) );
365 ASSERT( msg == true, NULL );
368 for (int i = 0; i < N; ++i) {
370 ASSERT( wait_try_get( g, s8, j ) == true, NULL );
371 ASSERT( i == j, NULL );
374 ASSERT( wait_try_get( g, s6, j ) == false, NULL );
375 ASSERT( wait_try_get( g, s7, j ) == false, NULL );
376 ASSERT( wait_try_get( g, s8, j ) == false, NULL );
377 ASSERT( j == bogus_value, NULL );
380 tbb::flow::remove_edge( s6, s7 );
381 ASSERT( s6.try_put( N ) == true, NULL );
382 ASSERT( wait_try_get( g, s7, j ) == false, NULL );
383 ASSERT( j == bogus_value, NULL );
384 ASSERT( wait_try_get( g, s8, j ) == false, NULL );
385 ASSERT( j == bogus_value, NULL );
386 ASSERT( wait_try_get( g, s6, j ) == true, NULL );
387 ASSERT( j == N, NULL );
393 tbb::tick_count start = tbb::tick_count::now(), stop;
394 for (int p = 2; p <= 4; ++p) {
395 tbb::task_scheduler_init init(p);
397 test_parallel<int>(p);
399 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
400 test_buffer_extract<tbb::flow::sequencer_node<int> >().run_tests();
402 stop = tbb::tick_count::now();
403 REMARK("Sequencer_Node Time=%6.6f\n", (stop-start).seconds());
404 return Harness::Done;