2 Copyright (c) 2005-2019 Intel Corporation
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
17 // TODO: Add overlapping put / receive tests
20 #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
25 #include "tbb/flow_graph.h"
26 #include "tbb/task_scheduler_init.h"
27 #include "tbb/tick_count.h"
28 #include "harness_checktype.h"
29 #include "harness_graph.h"
30 #include "test_follows_and_precedes_api.h"
// Busy-waits until an item can be taken from the queue_node `q`.
// On return `value` holds whatever the successful q.try_get(value) call
// stored into it.
37 template< typename T >
38 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
// Empty loop body: keep retrying, with no back-off, until try_get succeeds.
39 while ( q.try_get(value) != true ) ;
// Checks one received item against the per-sender expectation table.
//   next_value : array of expected values, indexed by tid
//   value      : the item just received
// NOTE(review): the declaration of `tid` and any update of next_value are not
// among the visible lines; `tid` is presumably derived from `value` — confirm.
42 template< typename T >
43 void check_item( T* next_value, T &value ) {
// Position of the item within its sender's run of N values.
45 int offset = value % N;
// The item must be exactly the one expected next for that tid.
46 ASSERT( next_value[tid] == T(offset), NULL );
// Body functor that puts N items into a queue_node.
// Invoked with index i, it puts the values N*i, N*i+1, ..., N*i+N-1 and
// asserts that every try_put is accepted.
50 template< typename T >
51 struct parallel_puts : NoAssign {
// Queue under test; held by reference, so it must outlive the functor.
53 tbb::flow::queue_node<T> &my_q;
55 parallel_puts( tbb::flow::queue_node<T> &q ) : my_q(q) {}
// i selects which block of N consecutive values this invocation puts.
57 void operator()(int i) const {
58 for (int j = 0; j < N; ++j) {
59 bool msg = my_q.try_put( T(N*i + j) );
60 ASSERT( msg == true, NULL );
// Bookkeeping used by the parallel receivers: records which values each
// receiving thread obtained (my_touches) and the last value recorded per
// [tid][v_tid] pair (my_last_touch).
// NOTE(review): the struct header, the member declarations, the destructor
// signature, several closing braces and the return statements are not among
// the visible lines; the comments below describe only what is shown.
68 template< typename T >
// Allocates and initializes both tables for num_threads threads:
//   my_last_touch : num_threads x num_threads, every entry set to -1
//   my_touches    : num_threads x (N*num_threads), every entry set to false
75 touches( int num_threads ) : my_num_threads(num_threads) {
76 my_last_touch = new T* [my_num_threads];
77 my_touches = new bool* [my_num_threads];
78 for ( int p = 0; p < my_num_threads; ++p) {
79 my_last_touch[p] = new T[my_num_threads];
80 for ( int p2 = 0; p2 < my_num_threads; ++p2)
81 my_last_touch[p][p2] = -1;
83 my_touches[p] = new bool[N*my_num_threads];
84 for ( int n = 0; n < N*my_num_threads; ++n)
85 my_touches[p][n] = false;
// Cleanup: frees each per-thread row of both tables, then the outer
// my_last_touch array.
// NOTE(review): a matching delete [] for the outer my_touches array is not
// among the visible lines — confirm it exists in the full file.
90 for ( int p = 0; p < my_num_threads; ++p) {
91 delete [] my_touches[p];
92 delete [] my_last_touch[p];
95 delete [] my_last_touch;
// Records that thread `tid` received value `v`.
// Prints an error if this tid has already marked v, or if v is not greater
// than the value last stored in my_last_touch[tid][v_tid]. The visible
// fall-through path then stores v as the new last value and marks it.
// NOTE(review): the definition of v_tid and the return statements are not
// visible here.
98 bool check( int tid, T v ) {
100 if ( my_touches[tid][v] != false ) {
101 printf("Error: value seen twice by local thread\n");
104 if ( v <= my_last_touch[tid][v_tid] ) {
105 printf("Error: value seen in wrong order by local thread\n");
108 my_last_touch[tid][v_tid] = v;
109 my_touches[tid][v] = true;
// Cross-checks the tables of all threads.
// Asserts that no value is marked by more than one thread and prints a
// message for every value in [0, N*my_num_threads) that no thread marked.
// NOTE(review): the return statement is not visible here.
113 bool validate_touches() {
// Scratch array: all_touches[n] becomes true once some thread has marked n.
114 bool *all_touches = new bool[N*my_num_threads];
115 for ( int n = 0; n < N*my_num_threads; ++n)
116 all_touches[n] = false;
118 for ( int p = 0; p < my_num_threads; ++p) {
119 for ( int n = 0; n < N*my_num_threads; ++n) {
120 if ( my_touches[p][n] == true ) {
121 ASSERT( all_touches[n] == false, "value see by more than one thread\n" );
122 all_touches[n] = true;
// A value nobody received is only reported, not asserted: the ASSERT below
// is commented out.
126 for ( int n = 0; n < N*my_num_threads; ++n) {
127 if ( !all_touches[n] )
128 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
129 //ASSERT( all_touches[n] == true, "value not seen by any thread\n" );
131 delete [] all_touches;
// Body functor that takes N items from a queue_node and records each one in
// a shared touches<T> table under the invoking thread's id.
137 template< typename T >
138 struct parallel_gets : NoAssign {
// Both members are references; the queue and the table must outlive the functor.
140 tbb::flow::queue_node<T> &my_q;
141 touches<T> &my_touches;
143 parallel_gets( tbb::flow::queue_node<T> &q, touches<T> &t) : my_q(q), my_touches(t) {}
// tid identifies this receiver in the touches table.
145 void operator()(int tid) const {
146 for (int j = 0; j < N; ++j) {
// NOTE(review): the declaration of `v` is not among the visible lines.
// Spin until an item is available, then record it for this tid.
148 spin_try_get( my_q, v );
149 my_touches.check( tid, v );
// Body functor that interleaves puts and gets on the same queue_node.
// It works through its N values in chunks of at most C: each chunk is first
// put into the queue, then the same number of items is taken back out and
// recorded in the shared touches<T> table under tid.
155 template< typename T >
156 struct parallel_put_get : NoAssign {
// Both members are references; the queue and the table must outlive the functor.
158 tbb::flow::queue_node<T> &my_q;
159 touches<T> &my_touches;
161 parallel_put_get( tbb::flow::queue_node<T> &q, touches<T> &t ) : my_q(q), my_touches(t) {}
// tid selects the block of values N*tid .. N*tid+N-1 that this invocation puts.
163 void operator()(int tid) const {
165 for ( int i = 0; i < N; i+=C ) {
// Clamp the chunk end so the last chunk does not run past N.
166 int j_end = ( N < i + C ) ? N : i + C;
167 // dump about C values into the Q
168 for ( int j = i; j < j_end; ++j ) {
169 ASSERT( my_q.try_put( T (N*tid + j ) ) == true, NULL );
171 // receive about C values from the Q
172 for ( int j = i; j < j_end; ++j ) {
// NOTE(review): the declaration of `v` is not among the visible lines.
174 spin_try_get( my_q, v );
175 my_touches.check( tid, v );
185 // Item can be reserved, released, consumed ( single serial receiver )
// Exercises reserve_item / release_reservation / consume_reservation on a
// single queue_node from one thread.
// NOTE(review): the graph `g`, the puts that fill the queue, the declaration
// of `v` and the return statement are not among the visible lines.
187 template< typename T >
188 int test_reservation() {
193 tbb::flow::queue_node<T> q(g);
// Reserving yields T(1); releasing is expected to leave the item in the queue.
200 ASSERT( q.reserve_item(v) == true, NULL );
201 ASSERT( v == T(1), NULL );
202 ASSERT( q.release_reservation() == true, NULL );
// The same item is reserved again, and this time consumed.
205 ASSERT( q.reserve_item(v) == true, NULL );
206 ASSERT( v == T(1), NULL );
207 ASSERT( q.consume_reservation() == true, NULL );
// With T(1) consumed, a plain try_get yields the next item, T(2).
211 ASSERT( q.try_get(v) == true, NULL );
212 ASSERT( v == T(2), NULL );
// Repeat the release-then-consume sequence on T(3).
216 ASSERT( q.reserve_item(v) == true, NULL );
217 ASSERT( v == T(3), NULL );
218 ASSERT( q.release_reservation() == true, NULL );
221 ASSERT( q.reserve_item(v) == true, NULL );
222 ASSERT( v == T(3), NULL );
223 ASSERT( q.consume_reservation() == true, NULL );
233 // multiple parallel senders, items in FIFO order (relative to sender)
234 // multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received
235 // * overlapped puts / gets
236 // * all puts finished before any gets
// Runs the multi-threaded queue_node scenarios with num_threads threads.
// Each scenario ends by asserting that the queue(s) are empty again.
// NOTE(review): the graph `g`, the declarations of `j` and `bogus_value`, the
// assignments that reset `j`, scope braces and the return statement are not
// among the visible lines.
238 template< typename T >
239 int test_parallel(int num_threads) {
241 tbb::flow::queue_node<T> q(g);
242 tbb::flow::queue_node<T> q2(g);
243 tbb::flow::queue_node<T> q3(g);
// Scenario 1: parallel puts, then a single serial receiver drains all
// num_threads * N items and checks each one with check_item.
248 NativeParallelFor( num_threads, parallel_puts<T>(q) );
250 T *next_value = new T[num_threads];
251 for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0);
253 for (int i = 0; i < num_threads * N; ++i ) {
254 spin_try_get( q, j );
255 check_item( next_value, j );
// After draining, every per-sender entry is expected to equal T(N).
258 for (int tid = 0; tid < num_threads; ++tid) {
259 ASSERT( next_value[tid] == T(N), NULL );
// The queue must now reject a get, and j must still equal bogus_value.
265 ASSERT( q.try_get( j ) == false, NULL );
266 ASSERT( j == bogus_value, NULL );
// Scenario 2: all puts complete before any get; gets run in parallel and are
// recorded in a touches table that is validated afterwards.
268 NativeParallelFor( num_threads, parallel_puts<T>(q) );
271 touches< T > t( num_threads );
272 NativeParallelFor( num_threads, parallel_gets<T>(q, t) );
274 ASSERT( t.validate_touches(), NULL );
277 ASSERT( q.try_get( j ) == false, NULL );
278 ASSERT( j == bogus_value, NULL );
// Scenario 3: every thread both puts and gets (parallel_put_get).
282 touches< T > t2( num_threads );
283 NativeParallelFor( num_threads, parallel_put_get<T>(q, t2) );
285 ASSERT( t2.validate_touches(), NULL );
288 ASSERT( q.try_get( j ) == false, NULL );
289 ASSERT( j == bogus_value, NULL );
// Scenario 4: chain q -> q2 -> q3; items are put into q and received from q3.
291 tbb::flow::make_edge( q, q2 );
292 tbb::flow::make_edge( q2, q3 );
294 NativeParallelFor( num_threads, parallel_puts<T>(q) );
296 touches< T > t3( num_threads );
297 NativeParallelFor( num_threads, parallel_gets<T>(q3, t3) );
299 ASSERT( t3.validate_touches(), NULL );
// All three queues in the chain must be empty afterwards.
303 ASSERT( q.try_get( j ) == false, NULL );
305 ASSERT( q2.try_get( j ) == false, NULL );
307 ASSERT( q3.try_get( j ) == false, NULL );
308 ASSERT( j == bogus_value, NULL );
310 // test copy constructor
// Detach q from q2, fill q, then copy-construct q_copy from it. The assertion
// on q_copy.try_get shows the copy is expected to hold no items until it is
// registered as a successor of q.
311 ASSERT( q.remove_successor( q2 ), NULL );
312 NativeParallelFor( num_threads, parallel_puts<T>(q) );
313 tbb::flow::queue_node<T> q_copy(q);
316 ASSERT( q_copy.try_get( j ) == false, NULL );
317 ASSERT( q.register_successor( q_copy ) == true, NULL );
// Once registered, the items put into q are received through q_copy.
319 touches< T > t( num_threads );
320 NativeParallelFor( num_threads, parallel_gets<T>(q_copy, t) );
322 ASSERT( t.validate_touches(), NULL );
325 ASSERT( q.try_get( j ) == false, NULL );
326 ASSERT( j == bogus_value, NULL );
327 ASSERT( q_copy.try_get( j ) == false, NULL );
328 ASSERT( j == bogus_value, NULL );
337 // Predecessors cannot be registered
338 // Empty Q rejects item requests
339 // Single serial sender, items in FIFO order
340 // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order
// Single-threaded queue_node checks: predecessor registration is rejected,
// an empty queue rejects gets, and items come out in the order they were put,
// both directly and through chains of two and three queues.
// NOTE(review): the function signature (presumably test_serial), the graph
// `g`, the declarations of `j` and `bogus_value`, the assignments that reset
// `j`, closing braces and the return statement are not among the visible lines.
343 template< typename T >
346 tbb::flow::queue_node<T> q(g);
347 tbb::flow::queue_node<T> q2(g);
348 { // destroy the graph after manipulating it, and see if all the items in the buffers
349 // have been destroyed before the graph
350 Check<T> my_check; // if check_type< U > count constructions and destructions
355 // Rejects attempts to add / remove predecessor
356 // Rejects request from empty Q
358 ASSERT( q.register_predecessor( q2 ) == false, NULL );
359 ASSERT( q.remove_predecessor( q2 ) == false, NULL );
360 ASSERT( q.try_get( j ) == false, NULL );
361 ASSERT( j == bogus_value, NULL );
364 // Simple puts and gets
367 for (int i = 0; i < N; ++i) {
368 bool msg = q.try_put( T(i) );
369 ASSERT( msg == true, NULL );
// Items must come back in the order they were put: 0, 1, ..., N-1.
373 for (int i = 0; i < N; ++i) {
375 spin_try_get( q, j );
376 ASSERT( i == j, NULL );
380 ASSERT( q.try_get( j ) == false, NULL );
381 ASSERT( j == bogus_value, NULL );
// Two-queue chain q -> q2: put into q, receive in order from q2.
383 tbb::flow::make_edge( q, q2 );
385 for (int i = 0; i < N; ++i) {
386 bool msg = q.try_put( T(i) );
387 ASSERT( msg == true, NULL );
391 for (int i = 0; i < N; ++i) {
393 spin_try_get( q2, j );
394 ASSERT( i == j, NULL );
398 ASSERT( q.try_get( j ) == false, NULL );
400 ASSERT( q2.try_get( j ) == false, NULL );
401 ASSERT( j == bogus_value, NULL );
// With the edge removed, an item put into q must stay in q and not reach q2.
403 tbb::flow::remove_edge( q, q2 );
404 ASSERT( q.try_put( 1 ) == true, NULL );
406 ASSERT( q2.try_get( j ) == false, NULL );
407 ASSERT( j == bogus_value, NULL );
409 ASSERT( q.try_get( j ) == true, NULL );
410 ASSERT( j == 1, NULL );
// Three-queue chain q -> q2 -> q3: put into q, receive in order from q3.
412 tbb::flow::queue_node<T> q3(g);
413 tbb::flow::make_edge( q, q2 );
414 tbb::flow::make_edge( q2, q3 );
416 for (int i = 0; i < N; ++i) {
417 bool msg = q.try_put( T(i) );
418 ASSERT( msg == true, NULL );
421 for (int i = 0; i < N; ++i) {
423 spin_try_get( q3, j );
424 ASSERT( i == j, NULL );
428 ASSERT( q.try_get( j ) == false, NULL );
430 ASSERT( q2.try_get( j ) == false, NULL );
432 ASSERT( q3.try_get( j ) == false, NULL );
433 ASSERT( j == bogus_value, NULL );
// Removing only the q -> q2 edge: an item put into q reaches neither q2 nor q3.
435 tbb::flow::remove_edge( q, q2 );
436 ASSERT( q.try_put( 1 ) == true, NULL );
438 ASSERT( q2.try_get( j ) == false, NULL );
439 ASSERT( j == bogus_value, NULL );
441 ASSERT( q3.try_get( j ) == false, NULL );
442 ASSERT( j == bogus_value, NULL );
444 ASSERT( q.try_get( j ) == true, NULL );
445 ASSERT( j == 1, NULL );
// Compiled only when __TBB_PREVIEW_FLOW_GRAPH_NODE_SET is enabled.
// Runs the shared follows/precedes helpers from the
// follows_and_precedes_testing namespace against queue_node<int>, using the
// messages {0, 1, 2} for both.
// NOTE(review): the closing brace and the matching #endif are not among the
// visible lines.
451 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
454 void test_follows_and_precedes_api() {
455 std::array<int, 3> messages_for_follows = {0, 1, 2};
456 std::vector<int> messages_for_precedes = {0, 1, 2};
458 follows_and_precedes_testing::test_follows <int, tbb::flow::queue_node<int>>(messages_for_follows);
459 follows_and_precedes_testing::test_precedes <int, tbb::flow::queue_node<int>>(messages_for_precedes);
// Compiled only when __TBB_CPP17_DEDUCTION_GUIDES_PRESENT is enabled.
// Compile-time checks that queue_node declared without template arguments
// is deduced as queue_node<int>.
// NOTE(review): the graph `g`, the declaration of `q3`, the #endif lines and
// the closing brace are not among the visible lines.
463 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
464 void test_deduction_guides() {
465 using namespace tbb::flow;
467 broadcast_node<int> br(g);
468 queue_node<int> q0(g);
// The follows()/precedes() forms exist only with the node-set preview feature.
470 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
471 queue_node q1(follows(br));
472 static_assert(std::is_same_v<decltype(q1), queue_node<int>>);
474 queue_node q2(precedes(br));
475 static_assert(std::is_same_v<decltype(q2), queue_node<int>>);
479 static_assert(std::is_same_v<decltype(q3), queue_node<int>>);
// Test driver body.
// NOTE(review): the enclosing function's signature line is not visible; the
// final `return Harness::Done` suggests this is the harness entry point
// (presumably TestMain) — confirm. Closing braces and #endif lines are also
// not among the visible lines.
485 tbb::tick_count start = tbb::tick_count::now(), stop;
// Run the serial and parallel tests with the scheduler initialized for
// p = 2, 3 and 4.
486 for (int p = 2; p <= 4; ++p) {
487 tbb::task_scheduler_init init(p);
489 test_serial<check_type<int> >();
490 test_parallel<int>(p);
491 test_parallel<check_type<int> >(p);
// Report the time elapsed since `start` was taken.
493 stop = tbb::tick_count::now();
494 REMARK("Queue_Node Time=%6.6f\n", (stop-start).seconds());
495 REMARK("Testing resets\n");
496 test_resets<int, tbb::flow::queue_node<int> >();
497 test_resets<float, tbb::flow::queue_node<float> >();
// The remaining tests are each guarded by their own feature macro.
498 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
499 test_buffer_extract<tbb::flow::queue_node<int> >().run_tests();
501 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
502 test_follows_and_precedes_api();
504 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
505 test_deduction_guides();
507 return Harness::Done;