2 Copyright (c) 2005-2019 Intel Corporation
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
18 #include "harness_graph.h"
19 #include "harness_barrier.h"
20 #include "tbb/concurrent_queue.h"
21 #include "tbb/flow_graph.h"
23 #include "tbb/tbb_thread.h"
24 #include "tbb/mutex.h"
25 #include "tbb/compat/condition_variable"
// Visible members of a small value-holding type (the constructors below name
// it minimal_type).
// NOTE(review): the class header and the declaration of `value` are not part
// of this listing (the embedded numbering skips them) - confirm against the
// full file.
// place_wrapper is befriended; its converting constructor reads `value`.
31 friend struct place_wrapper;
// Default construction stores -1.
36 minimal_type() : value(-1) {}
// Construction from an int stores that int.
37 minimal_type(int v) : value(v) {}
// Copy construction and copy assignment transfer `value` only.
38 minimal_type(const minimal_type &m) : value(m.value) { }
39 minimal_type &operator=(const minimal_type &m) { value = m.value; return *this; }
// Wraps a value together with the id of the thread and the address of the TBB
// task that were current when the wrapper was constructed.
// NOTE(review): the template header, the `value` and `task_ptr` member
// declarations and the closing braces are missing from this listing.
43 struct place_wrapper {
44 typedef T wrapped_type;
46 tbb::tbb_thread::id thread_id;
// Default construction: value 0, records the constructing thread and task.
49 place_wrapper( ) : value(0) {
50 thread_id = tbb::this_tbb_thread::get_id();
51 task_ptr = &tbb::task::self();
// Construction from an int: same recording, caller-supplied value.
53 place_wrapper( int v ) : value(v) {
54 thread_id = tbb::this_tbb_thread::get_id();
55 task_ptr = &tbb::task::self();
// Construction from the <int> and <minimal_type> instantiations: these copy the
// already-recorded thread id and task pointer instead of re-sampling them.
58 place_wrapper( const place_wrapper<int> &v ) : value(v.value), thread_id(v.thread_id), task_ptr(v.task_ptr) { }
60 place_wrapper( const place_wrapper<minimal_type> &v ) : value(v.value), thread_id(v.thread_id), task_ptr(v.task_ptr) { }
// Primary wrapper_helper template, used for every type pair that is not a pair
// of place_wrappers (those have their own specialization).
63 template<typename T1, typename T2>
64 struct wrapper_helper {
// Nothing to verify for plain types.
65 static void check(const T1 &, const T2 &) { }
// Transfers `in` to `out`.
// NOTE(review): the body of copy_value and the closing braces are not part of
// this listing.
67 static void copy_value(const T1 &in, T2 &out) {
// Specialization of wrapper_helper for a pair of place_wrappers.
72 template<typename T1, typename T2>
73 struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > {
// Logs both recorded task pointers, then requires that the two wrappers were
// created on different threads and inside different tasks.
74 static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) {
75 REMARK("a.task_ptr == %p != b.task_ptr == %p\n", a.task_ptr, b.task_ptr);
76 ASSERT( (a.thread_id != b.thread_id), "same thread used to execute adjacent nodes");
77 ASSERT( (a.task_ptr != b.task_ptr), "same task used to execute adjacent nodes");
// NOTE(review): the body of copy_value and the closing braces are not part of
// this listing.
80 static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) {
// Number of messages pushed through a graph per test phase.
85 const int NUMBER_OF_MSGS = 10;
// Sentinel for "the async activity does not know how many items to expect".
86 const int UNKNOWN_NUMBER_OF_ITEMS = -1;
// Global counters shared by the tests below: async node body executions,
// messages handled by the async activity, and end-node body executions.
87 tbb::atomic<int> async_body_exec_count;
88 tbb::atomic<int> async_activity_processed_msg_count;
89 tbb::atomic<int> end_body_exec_count;
91 // queueing required in test_reset for testing of cancellation
92 typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type;
93 typedef counting_async_node_type::gateway_type counting_gateway_type;
// Async node body that counts its own executions (per body copy) as well as
// the global async_body_exec_count, and forwards its input through the gateway.
95 struct counting_async_body {
96 tbb::atomic<int> my_async_body_exec_count;
// A freshly constructed body starts with a local count of 0.
98 counting_async_body() {
99 my_async_body_exec_count = 0;
102 void operator()( const int &input, counting_gateway_type& gateway) {
103 REMARK( "Body execution with input == %d\n", input);
104 ++my_async_body_exec_count;
105 ++async_body_exec_count;
// Cancels the task group of the currently running task; the assert expects this
// to be the first cancellation request.
// NOTE(review): the condition guarding this cancellation (embedded line 106) is
// missing from this listing - confirm which input triggers it.
107 bool result = tbb::task::self().group()->cancel_group_execution();
108 REMARK( "Canceling graph execution\n" );
109 ASSERT( result == true, "attempted to cancel graph twice" );
// Passes the input on to the node's successors via the gateway.
112 gateway.try_put(input);
// Body of the graph reset test (the comment above the node typedef refers to it
// as test_reset). It runs a counting async node through a cancellation and
// several graph.reset() variants, checking global/local body counts and the
// receiver counts after each phase.
// NOTE(review): the function header, the declarations of `g` and `R`, the
// bodies of the put loops and the waits that follow them are not part of this
// listing (the embedded numbering skips them).
117 const int N = NUMBER_OF_MSGS;
118 async_body_exec_count = 0;
// Serial node: messages are handed to the body one at a time.
121 counting_async_node_type a(g, tbb::flow::serial, counting_async_body() );
// R counting receivers, each connected to the node's output below.
124 std::vector< harness_counting_receiver<int> > r(R, harness_counting_receiver<int>(g));
126 for (int i = 0; i < R; ++i) {
127 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
128 tbb::flow::make_edge(a, r[i]);
130 tbb::flow::make_edge( tbb::flow::output_port<0>(a), r[i] );
134 REMARK( "One body execution\n" );
136 for (int i = 0; i < N; ++i) {
140 // should be canceled with only 1 item reaching the async_body and the counting receivers
141 // and N items left in the node's queue
142 ASSERT( g.is_cancelled() == true, "task group not canceled" );
// copy_body returns a copy of the node's current body, which exposes its local count.
144 counting_async_body b1 = tbb::flow::copy_body<counting_async_body>(a);
145 ASSERT( int(async_body_exec_count) == int(b1.my_async_body_exec_count), "body and global body counts are different" );
146 ASSERT( int(async_body_exec_count) == 1, "global body execution count not 1" );
147 for (int i = 0; i < R; ++i) {
148 ASSERT( int(r[i].my_count) == 1, "counting receiver count not 1" );
151 // should clear the async_node queue, but retain its local count at 1 and keep all edges
152 g.reset(tbb::flow::rf_reset_protocol);
154 REMARK( "N body executions\n" );
155 for (int i = 0; i < N; ++i) {
// NOTE(review): this and the later `is_cancelled() == false` checks reuse the
// message "task group not canceled"; the condition actually verifies that the
// graph is NOT cancelled, so the message reads inverted.
159 ASSERT( g.is_cancelled() == false, "task group not canceled" );
161 // a total of N+1 items should have passed through the node body
162 // the local body count should also be N+1
163 // and the counting receivers should all have a count of N+1
164 counting_async_body b2 = tbb::flow::copy_body<counting_async_body>(a);
165 ASSERT( int(async_body_exec_count) == int(b2.my_async_body_exec_count), "local and global body execution counts are different" );
166 REMARK( "async_body_exec_count==%d\n", int(async_body_exec_count) );
// NOTE(review): "globcal" in the message below is a typo for "global".
167 ASSERT( int(async_body_exec_count) == N+1, "globcal body execution count not N+1" );
168 for (int i = 0; i < R; ++i) {
169 ASSERT( int(r[i].my_count) == N+1, "counting receiver has not received N+1 items" );
172 REMARK( "N body executions with new bodies\n" );
173 // should clear the async_node queue and reset its local count to 0, but keep all edges
174 g.reset(tbb::flow::rf_reset_bodies);
175 for (int i = 0; i < N; ++i) {
179 ASSERT( g.is_cancelled() == false, "task group not canceled" );
181 // a total of 2N+1 items should have passed through the node body
182 // the local body count should be N
183 // and the counting receivers should all have a count of 2N+1
184 counting_async_body b3 = tbb::flow::copy_body<counting_async_body>(a);
185 ASSERT( int(async_body_exec_count) == 2*N+1, "global body execution count not 2N+1" );
186 ASSERT( int(b3.my_async_body_exec_count) == N, "local body execution count not N" );
187 for (int i = 0; i < R; ++i) {
188 ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
191 // should clear the async_node queue and keep its local count at N and remove all edges
192 REMARK( "N body executions with no edges\n" );
193 g.reset(tbb::flow::rf_clear_edges);
194 for (int i = 0; i < N; ++i) {
198 ASSERT( g.is_cancelled() == false, "task group not canceled" );
200 // a total of 3N+1 items should have passed through the node body
201 // the local body count should now be 2*N
202 // and the counting receivers should remain at a count of 2N+1
203 counting_async_body b4 = tbb::flow::copy_body<counting_async_body>(a);
204 ASSERT( int(async_body_exec_count) == 3*N+1, "global body execution count not 3N+1" );
205 ASSERT( int(b4.my_async_body_exec_count) == 2*N, "local body execution count not 2N" );
206 for (int i = 0; i < R; ++i) {
207 ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
210 // put back 1 edge to receiver 0
211 REMARK( "N body executions with 1 edge\n" );
212 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
213 tbb::flow::make_edge(a, r[0]);
215 tbb::flow::make_edge( tbb::flow::output_port<0>(a), r[0] );
217 for (int i = 0; i < N; ++i) {
221 ASSERT( g.is_cancelled() == false, "task group not canceled" );
223 // a total of 4N+1 items should have passed through the node body
224 // the local body count should now be 3*N
225 // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
226 counting_async_body b5 = tbb::flow::copy_body<counting_async_body>(a);
227 ASSERT( int(async_body_exec_count) == 4*N+1, "global body execution count not 4N+1" );
228 ASSERT( int(b5.my_async_body_exec_count) == 3*N, "local body execution count not 3N" );
229 ASSERT( int(r[0].my_count) == 3*N+1, "counting receiver has not received 3N+1 items" );
230 for (int i = 1; i < R; ++i) {
231 ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
234 // should clear the async_node queue, reset its local count to 0 and remove all edges
235 REMARK( "N body executions with no edges and new body\n" );
236 g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges));
237 for (int i = 0; i < N; ++i) {
241 ASSERT( g.is_cancelled() == false, "task group not canceled" );
243 // a total of 5N+1 items should have passed through the node body
244 // the local body count should now be N (the body was replaced by the reset)
245 // and, with all edges removed, every receiver keeps its previous count: 3N+1 for r[0], 2N+1 for the others
246 counting_async_body b6 = tbb::flow::copy_body<counting_async_body>(a);
247 ASSERT( int(async_body_exec_count) == 5*N+1, "global body execution count not 5N+1" );
248 ASSERT( int(b6.my_async_body_exec_count) == N, "local body execution count not N" );
249 ASSERT( int(r[0].my_count) == 3*N+1, "counting receiver has not received 3N+1 items" );
250 for (int i = 1; i < R; ++i) {
251 ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
// Simulated external activity for async_node tests: node bodies submit work to
// a queue, and a dedicated service thread pops it and feeds results back into
// the graph through the gateway stored with each work item.
// NOTE(review): access specifiers, the work_type definition, several member
// function headers and all closing braces are missing from this listing.
255 template< typename Input, typename Output >
256 class async_activity : NoAssign {
258 typedef Input input_type;
259 typedef Output output_type;
260 typedef tbb::flow::async_node< input_type, output_type > async_node_type;
261 typedef typename async_node_type::gateway_type gateway_type;
// Gateway pointer; together with `input` it makes up one work item (see submit()).
265 gateway_type* gateway;
// Functor executed by the service thread; it just runs the activity's process() loop.
268 class ServiceThreadBody {
270 ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {}
273 my_activity->process();
276 async_activity* my_activity;
// expected_items: total number of items after which the wait is released, or
//                 UNKNOWN_NUMBER_OF_ITEMS to release after every item.
// deferred:       when true the activity starts inactive (is_active == false), so
//                 the service loop does not pop work yet.
// sleep_time:     value passed to Harness::Sleep before each item is processed.
279 async_activity(int expected_items, bool deferred = false, int sleep_time = 50)
280 : my_expected_items(expected_items), my_sleep_time(sleep_time) {
281 is_active = !deferred;
// Starts the service thread and moves it into the member via swap.
283 tbb::tbb_thread( ServiceThreadBody( this ) ).swap( my_service_thread );
// Copy construction does not copy the source's settings: the copy uses an
// unknown item count and no sleep.
// NOTE(review): the body of this constructor is not visible here.
288 async_activity( const async_activity& )
289 : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0) {
// Waits for the service thread to finish.
// NOTE(review): presumably inside the destructor; its header is not visible.
296 my_service_thread.join();
// Called from the async node body: queues the input with the gateway to reply on.
299 void submit( const input_type &input, gateway_type& gateway ) {
300 work_type work = { input, &gateway};
301 my_work_queue.push( work );
// Service loop (header not visible): only while the activity is active, pop one
// work item, sleep, count it, build the output and put it back into the graph.
307 if( is_active && my_work_queue.try_pop( work ) ) {
308 Harness::Sleep(my_sleep_time);
309 ++async_activity_processed_msg_count;
311 wrapper_helper<output_type, output_type>::copy_value(work.input, output);
312 wrapper_helper<output_type, output_type>::check(work.input, output);
313 work.gateway->try_put(output);
// Release the graph's wait either after every item (unknown count) or exactly
// once, when the global processed-message counter reaches the expected total.
314 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ||
315 int(async_activity_processed_msg_count) == my_expected_items ) {
316 work.gateway->release_wait();
// Keep looping until quit has been requested and the queue has been drained.
319 } while( my_quit == false || !my_work_queue.empty());
// Tells node bodies whether they must call reserve_wait() per submitted item.
// NOTE(review): only the unknown-count test is visible; the return statements
// are missing from this listing.
330 bool should_reserve_each_time() {
331 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS )
339 const int my_expected_items;
340 const int my_sleep_time;
341 tbb::atomic< bool > is_active;
343 tbb::concurrent_queue< work_type > my_work_queue;
345 tbb::atomic< bool > my_quit;
347 tbb::tbb_thread my_service_thread;
// Basic start -> async (offload) -> end pipeline test, parameterized by the
// node's input and output types. Callers refer to it as basic_test<I, O>::run().
// NOTE(review): the struct/class header, the graph declaration, the wait after
// the put loop and the closing braces are missing from this listing.
350 template<typename Input, typename Output>
352 typedef Input input_type;
353 typedef Output output_type;
354 typedef tbb::flow::async_node< input_type, output_type > async_node_type;
355 typedef typename async_node_type::gateway_type gateway_type;
// Start node body: converts the int message into the node's input type.
357 class start_body_type {
358 typedef Input input_type;
360 input_type operator()( int input ) {
361 return input_type(input);
// Functor form of the async body, compiled only when lambdas are unavailable;
// the lambda in run() does the same three steps.
365 #if !__TBB_CPP11_LAMBDAS_PRESENT
366 class async_body_type {
367 typedef Input input_type;
368 typedef Output output_type;
369 typedef tbb::flow::async_node< input_type, output_type > async_node_type;
370 typedef typename async_node_type::gateway_type gateway_type;
372 typedef async_activity<input_type, output_type> async_activity_type;
374 async_body_type( async_activity_type* aa ) : my_async_activity( aa ) { }
376 async_body_type( const async_body_type& other ) : my_async_activity( other.my_async_activity ) { }
// Count the execution, hand the input to the activity, and reserve a wait for
// this item when the activity releases after every item.
378 void operator()( const input_type &input, gateway_type& gateway ) {
379 ++async_body_exec_count;
380 my_async_activity->submit( input, gateway);
381 if ( my_async_activity->should_reserve_each_time() )
382 gateway.reserve_wait();
386 async_activity_type* my_async_activity;
// End node body: counts executions and runs the wrapper_helper check between
// the received value and a local `output` (its declaration is not visible).
390 class end_body_type {
391 typedef Output output_type;
393 void operator()( const output_type &input ) {
394 ++end_body_exec_count;
396 wrapper_helper<output_type, output_type>::check(input, output);
// Runs the pipeline with NUMBER_OF_MSGS messages.
// async_expected_items: passed to the activity; UNKNOWN_NUMBER_OF_ITEMS selects
// per-item reserve/release, any other value a single up-front reservation.
// Returns Harness::Done.
404 static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
405 async_activity<input_type, output_type> my_async_activity(async_expected_items);
407 tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() );
408 #if __TBB_CPP11_LAMBDAS_PRESENT
409 async_node_type offload_node(g, tbb::flow::unlimited, [&] (const input_type &input, gateway_type& gateway) {
410 ++async_body_exec_count;
411 my_async_activity.submit(input, gateway);
412 if(my_async_activity.should_reserve_each_time())
413 gateway.reserve_wait();
416 async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( &my_async_activity ) );
419 tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type() );
421 tbb::flow::make_edge( start_node, offload_node );
422 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
423 tbb::flow::make_edge( offload_node, end_node );
425 tbb::flow::make_edge( tbb::flow::output_port<0>(offload_node), end_node );
// Reset the shared counters before any message is put.
427 async_body_exec_count = 0;
428 async_activity_processed_msg_count = 0;
429 end_body_exec_count = 0;
// Known item count: reserve once here; the activity releases once when its
// processed-message counter reaches that count.
431 if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
432 offload_node.gateway().reserve_wait();
434 for (int i = 0; i < NUMBER_OF_MSGS; ++i) {
435 start_node.try_put(i);
// Every stage must have seen every message exactly once.
438 ASSERT( async_body_exec_count == NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
439 ASSERT( async_activity_processed_msg_count == NUMBER_OF_MSGS, "AsyncActivity processed wrong number of signals" );
440 ASSERT( end_body_exec_count == NUMBER_OF_MSGS, "EndBody processed wrong number of signals");
441 REMARK("async_body_exec_count == %d == async_activity_processed_msg_count == %d == end_body_exec_count == %d\n",
442 int(async_body_exec_count), int(async_activity_processed_msg_count), int(end_body_exec_count));
443 return Harness::Done;
// Checks that a copy-constructed async node works as an independent node:
// node `b` is a copy of `a`, and each is connected to its own receiver.
// Returns Harness::Done.
// NOTE(review): the graph declaration and the bodies of both put loops (plus
// the waits after them) are missing from this listing. Judging by the asserts,
// the first loop feeds `a` and the second feeds `b` - confirm.
448 int test_copy_ctor() {
449 const int N = NUMBER_OF_MSGS;
450 async_body_exec_count = 0;
454 harness_counting_receiver<int> r1(g);
455 harness_counting_receiver<int> r2(g);
457 counting_async_node_type a(g, tbb::flow::unlimited, counting_async_body() );
458 counting_async_node_type b(a);
459 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
460 tbb::flow::make_edge(a, r1);
461 tbb::flow::make_edge(b, r2);
463 tbb::flow::make_edge(tbb::flow::output_port<0>(a), r1);
464 tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2);
467 for (int i = 0; i < N; ++i) {
// After the first phase only r1 may have received items.
472 REMARK("async_body_exec_count = %d\n", int(async_body_exec_count));
473 REMARK("r1.my_count == %d and r2.my_count = %d\n", int(r1.my_count), int(r2.my_count));
474 ASSERT( int(async_body_exec_count) == NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
475 ASSERT( int(r1.my_count) == N, "counting receiver r1 has not received N items" );
476 ASSERT( int(r2.my_count) == 0, "counting receiver r2 has not received 0 items" );
478 for (int i = 0; i < N; ++i) {
// After the second phase r2 has caught up while r1 is unchanged; the global
// body count covers both nodes.
483 REMARK("async_body_exec_count = %d\n", int(async_body_exec_count));
484 REMARK("r1.my_count == %d and r2.my_count = %d\n", int(r1.my_count), int(r2.my_count));
485 ASSERT( int(async_body_exec_count) == 2*NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
486 ASSERT( int(r1.my_count) == N, "counting receiver r1 has not received N items" );
487 ASSERT( int(r2.my_count) == N, "counting receiver r2 has not received N items" );
488 return Harness::Done;
// Counter checked by the spin test's "Main thread did not participate in
// end_body tasks" warning.
// NOTE(review): the statement that increments it is not visible in this listing.
491 tbb::atomic<int> main_tid_count;
// Variant of the start -> async -> end pipeline used by test_for_spin_avoidance
// (invoked as spin_test<I, O>::run(nthreads)). The end body knows the main
// thread's id and shares a spin barrier sized for `nthreads`.
// NOTE(review): the struct/class header, the graph declaration, the wait after
// the put loop and the closing braces are missing from this listing.
493 template<typename Input, typename Output>
495 typedef Input input_type;
496 typedef Output output_type;
497 typedef tbb::flow::async_node< input_type, output_type > async_node_type;
498 typedef typename async_node_type::gateway_type gateway_type;
// Start node body: converts the int message into the node's input type.
500 class start_body_type {
501 typedef Input input_type;
503 input_type operator()( int input ) {
504 return input_type(input);
// Functor form of the async body for builds without lambdas; same steps as the
// lambda used in run().
508 #if !__TBB_CPP11_LAMBDAS_PRESENT
509 class async_body_type {
510 typedef Input input_type;
511 typedef Output output_type;
512 typedef tbb::flow::async_node< input_type, output_type > async_node_type;
513 typedef typename async_node_type::gateway_type gateway_type;
515 typedef async_activity<input_type, output_type> async_activity_type;
517 async_body_type( async_activity_type* aa ) : my_async_activity( aa ) { }
519 async_body_type( const async_body_type& other ) : my_async_activity( other.my_async_activity ) { }
521 void operator()(const input_type &input, gateway_type& gateway) {
522 ++async_body_exec_count;
523 my_async_activity->submit(input, gateway);
524 if(my_async_activity->should_reserve_each_time())
525 gateway.reserve_wait();
529 async_activity_type* my_async_activity;
// End node body: stores the main thread id and a pointer to the shared barrier.
533 class end_body_type {
534 typedef Output output_type;
535 tbb::tbb_thread::id my_main_tid;
536 Harness::SpinBarrier *my_barrier;
538 end_body_type(tbb::tbb_thread::id t, Harness::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { }
// Counts the execution, detects whether it runs on the main thread, and waits
// on the barrier via timed_wait_noerror(10).
// NOTE(review): the statements inside the main-thread branch are missing here,
// so whether the barrier wait belongs to that branch cannot be told.
540 void operator()( const output_type & ) {
541 ++end_body_exec_count;
542 if (tbb::this_tbb_thread::get_id() == my_main_tid) {
545 my_barrier->timed_wait_noerror(10);
// Runs the pipeline with nthreads*NUMBER_OF_MSGS messages.
// nthreads:             sizes the spin barrier and scales the message count.
// async_expected_items: forwarded to the activity (see the reserve_wait below).
// Returns Harness::Done.
551 static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
// Activity is started immediately (deferred == false) with a sleep time of 0.
552 async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0);
553 Harness::SpinBarrier spin_barrier(nthreads);
555 tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() );
556 #if __TBB_CPP11_LAMBDAS_PRESENT
557 async_node_type offload_node(g, tbb::flow::unlimited, [&](const input_type &input, gateway_type& gateway) {
558 ++async_body_exec_count;
559 my_async_activity.submit(input, gateway);
560 if(my_async_activity.should_reserve_each_time())
561 gateway.reserve_wait();
564 async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( &my_async_activity ) );
// The end body is given the id of the thread that calls run().
566 tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type(tbb::this_tbb_thread::get_id(), spin_barrier) );
567 tbb::flow::make_edge( start_node, offload_node );
568 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
569 tbb::flow::make_edge( offload_node, end_node );
571 tbb::flow::make_edge( tbb::flow::output_port<0>(offload_node), end_node );
573 async_body_exec_count = 0;
574 async_activity_processed_msg_count = 0;
575 end_body_exec_count = 0;
// Known item count: a single up-front reservation on the node's gateway.
578 if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
579 offload_node.gateway().reserve_wait();
581 for (int i = 0; i < nthreads*NUMBER_OF_MSGS; ++i) {
582 start_node.try_put(i);
// Every stage must have processed every message; main-thread participation is
// only a warning, not a failure.
585 ASSERT( async_body_exec_count == nthreads*NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
586 ASSERT( async_activity_processed_msg_count == nthreads*NUMBER_OF_MSGS, "AsyncActivity processed wrong number of signals" );
587 ASSERT( end_body_exec_count == nthreads*NUMBER_OF_MSGS, "EndBody processed wrong number of signals");
588 ASSERT_WARNING( main_tid_count != 0, "Main thread did not participate in end_body tasks");
589 REMARK("async_body_exec_count == %d == async_activity_processed_msg_count == %d == end_body_exec_count == %d\n",
590 int(async_body_exec_count), int(async_activity_processed_msg_count), int(end_body_exec_count));
591 return Harness::Done;
// Runs the spin test for int -> int messages with a thread count of 4.
// NOTE(review): the closing brace is missing from this listing.
596 void test_for_spin_avoidance() {
597 spin_test<int, int>::run(4);
// Runs basic_test for the given types and for their place_wrapper-wrapped
// counterparts, each once with an unknown item count (default argument) and
// once with the exact count NUMBER_OF_MSGS. Returns Harness::Done.
// NOTE(review): the function header (called as run_tests<I, O>() elsewhere in
// the file) and the closing brace are missing from this listing.
600 template< typename Input, typename Output >
602 basic_test<Input, Output>::run();
603 basic_test<Input, Output>::run(NUMBER_OF_MSGS);
604 basic_test<place_wrapper<Input>, place_wrapper<Output> >::run();
605 basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS);
606 return Harness::Done;
609 #include "tbb/parallel_for.h"
// Test in which every parallel_for iteration builds its own graph containing an
// async node, all graphs sharing one deferred async_activity.
// NOTE(review): access specifiers, the graph declaration inside operator(), the
// wait on that graph, the header of run() and the closing braces are missing
// from this listing.
610 template<typename Input, typename Output>
611 class equeueing_on_inner_level {
612 typedef Input input_type;
613 typedef Output output_type;
614 typedef async_activity<input_type, output_type> async_activity_type;
615 typedef tbb::flow::async_node<Input, Output> async_node_type;
616 typedef typename async_node_type::gateway_type gateway_type;
// Start node body: converts the int message into the node's input type.
618 class start_body_type {
620 input_type operator() ( int input ) {
621 return input_type( input);
// Async node body: always reserves a wait, then submits the item to the shared
// activity.
625 class async_body_type {
627 async_body_type( async_activity_type& activity ) : my_async_activity(&activity) {}
629 void operator() ( const input_type &input, gateway_type& gateway ) {
630 gateway.reserve_wait();
631 my_async_activity->submit( input, gateway );
634 async_activity_type* my_async_activity;
// End node body: discards the value.
637 class end_body_type {
639 void operator()( output_type ) {}
// parallel_for body; holds pointers to the shared barrier and activity.
642 class body_graph_with_async {
644 body_graph_with_async( Harness::SpinBarrier& barrier, async_activity_type& activity )
645 : spin_barrier(&barrier), my_async_activity(&activity) {}
// Builds the three-node pipeline, puts one message, waits at the barrier, and
// only then activates the (initially deferred) activity.
647 void operator()(int) const {
649 tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() );
651 async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( *my_async_activity ) );
653 tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type() );
655 tbb::flow::make_edge( start_node, offload_node );
656 tbb::flow::make_edge( offload_node, end_node );
658 start_node.try_put(1);
660 spin_barrier->wait();
662 my_async_activity->activate();
668 Harness::SpinBarrier* spin_barrier;
669 async_activity_type* my_async_activity;
// Driver (invoked as equeueing_on_inner_level<I, O>::run()): one iteration and
// one barrier slot per unit of the arena's max concurrency; the activity is
// created deferred with an unknown item count. Returns Harness::Done.
676 const int nthreads = tbb::this_task_arena::max_concurrency();
677 Harness::SpinBarrier spin_barrier( nthreads );
679 async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true );
681 tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) );
682 return Harness::Done;
// Runs the inner-level enqueueing test for int -> int and returns Harness::Done.
// NOTE(review): the closing brace is missing from this listing.
686 int run_test_equeueing_on_inner_level() {
687 equeueing_on_inner_level<int, int>::run();
688 return Harness::Done;
691 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Minimal async activity used by the follows/precedes tests: a worker thread
// pops submitted ints from a queue and sends the doubled value back through the
// gateway stored with each work item.
// NOTE(review): the work_type definition, the declarations of `thr`,
// `stop_limit` and `c`, the statement that advances `c`, the queue push in
// submit() and the closing braces are missing from this listing.
695 template<typename NodeType>
696 class AsyncActivity {
698 using gateway_t = typename NodeType::gateway_type;
// limit: value compared against `c` by end_of_work() to stop the worker loop.
// The worker lambda captures `this` and keeps draining the queue until
// end_of_work() reports true.
705 AsyncActivity(size_t limit) : thr([this]() {
706 while(!end_of_work()) {
708 while( my_q.try_pop(w) ) {
709 int res = do_work(w.input);
710 w.gateway->try_put(res);
// Balances the reserve_wait() made in submit().
711 w.gateway->release_wait();
715 }), stop_limit(limit), c(0) {}
// Packages the input with its gateway and reserves a wait on that gateway.
717 void submit(int i, gateway_t* gateway) {
718 work_type w = {i, gateway};
719 gateway->reserve_wait();
// Blocks until the worker thread has exited.
723 void wait_for_all() { thr.join(); }
726 bool end_of_work() { return c >= stop_limit; }
// The "work" is doubling the input.
728 int do_work(int& i) { return i + i; }
730 tbb::concurrent_queue<work_type> my_q;
// Checks the `follows` node-set constructor of async_node: the node is built as
// a successor of three broadcast nodes, and exactly three results must arrive in
// the buffer connected to its output.
// NOTE(review): the graph and `input_t`/`storage` declarations, the body of the
// loop over `preds`, the graph wait and the closing braces are missing from
// this listing.
736 void test_follows() {
737 using namespace tbb::flow;
740 using output_t = int;
741 using node_t = async_node<input_t, output_t>;
// Activity constructed with a limit of 3, matching the three predecessors.
745 AsyncActivity<node_t> async_activity(3);
747 std::array<broadcast_node<input_t>, 3> preds = {
748 broadcast_node<input_t>(g),
749 broadcast_node<input_t>(g),
750 broadcast_node<input_t>(g)
753 node_t node(follows(preds[0], preds[1], preds[2]), unlimited, [&](int input, node_t::gateway_type& gtw) {
// NOTE(review): `>w` looks like a mangled `&gtw` (an HTML-entity decode of
// "&gt"): submit() takes a gateway pointer and the lambda parameter is named
// gtw. Confirm against the original file.
754 async_activity.submit(input, >w);
757 buffer_node<output_t> buf(g);
758 make_edge(node, buf);
760 for(auto& pred: preds) {
765 async_activity.wait_for_all();
// Exactly three items: three successful gets followed by one failing get.
768 ASSERT((buf.try_get(storage) && buf.try_get(storage) && buf.try_get(storage) && !buf.try_get(storage)),
769 "Not exact edge quantity was made");
// Checks the `precedes` node-set constructor of async_node: the node is built as
// a predecessor of two buffer nodes, and each buffer must end up with exactly
// one item.
// NOTE(review): the graph and `input_t`/`storage` declarations, the put into
// `start`, the graph wait and the closing braces are missing from this listing.
772 void test_precedes() {
773 using namespace tbb::flow;
776 using output_t = int;
777 using node_t = async_node<input_t, output_t>;
// Activity constructed with a limit of 1.
781 AsyncActivity<node_t> async_activity(1);
783 std::array<buffer_node<input_t>, 2> successors = {
784 buffer_node<input_t>(g),
785 buffer_node<input_t>(g)
788 broadcast_node<input_t> start(g);
790 node_t node(precedes(successors[0], successors[1]), unlimited, [&](int input, node_t::gateway_type& gtw) {
// NOTE(review): `>w` looks like a mangled `&gtw` (an HTML-entity decode of
// "&gt"): submit() takes a gateway pointer and the lambda parameter is named
// gtw. Confirm against the original file.
791 async_activity.submit(input, >w);
794 make_edge(start, node);
799 async_activity.wait_for_all();
// Each successor: one successful get followed by one failing get.
801 for(auto& successor : successors) {
803 ASSERT(successor.try_get(storage) && !successor.try_get(storage),
804 "Not exact edge quantity was made");
// Entry point for the node-set API tests, called from the test driver when
// __TBB_PREVIEW_FLOW_GRAPH_NODE_SET is enabled.
// NOTE(review): the body is missing from this listing; presumably it calls
// test_follows() and test_precedes() - confirm.
808 void test_follows_and_precedes_api() {
812 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Body of the test entry point: runs every test in this file and returns
// Harness::Done.
// NOTE(review): the function header and a few statements between the calls are
// missing from this listing (the embedded numbering skips them).
// Scheduler initialized with 4 threads for the whole run.
815 tbb::task_scheduler_init init(4);
// Basic pipeline tests for three input/output type combinations.
816 run_tests<int, int>();
817 run_tests<minimal_type, minimal_type>();
818 run_tests<int, minimal_type>();
820 lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS);
824 test_for_spin_avoidance();
825 run_test_equeueing_on_inner_level();
826 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
827 test_follows_and_precedes_api();
829 return Harness::Done;