Committing TBB 2019 Update 9 source code
[platform/upstream/tbb.git] / src / test / test_async_node.cpp
1 /*
2     Copyright (c) 2005-2019 Intel Corporation
3
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7
8         http://www.apache.org/licenses/LICENSE-2.0
9
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16
17 #include "harness.h"
18 #include "harness_graph.h"
19 #include "harness_barrier.h"
20 #include "tbb/concurrent_queue.h"
21 #include "tbb/flow_graph.h"
22 #include "tbb/task.h"
23 #include "tbb/tbb_thread.h"
24 #include "tbb/mutex.h"
25 #include "tbb/compat/condition_variable"
26
27 #include <string>
28
29 class minimal_type {
30     template<typename T>
31     friend struct place_wrapper;
32
33     int value;
34
35 public:
36     minimal_type() : value(-1) {}
37     minimal_type(int v) : value(v) {}
38     minimal_type(const minimal_type &m) : value(m.value) { }
39     minimal_type &operator=(const minimal_type &m) { value = m.value; return *this; }
40 };
41
42 template <typename T>
43 struct place_wrapper {
44     typedef T wrapped_type;
45     T value;
46     tbb::tbb_thread::id thread_id;
47     tbb::task* task_ptr;
48
49     place_wrapper( ) : value(0) {
50         thread_id = tbb::this_tbb_thread::get_id();
51         task_ptr = &tbb::task::self();
52     }
53     place_wrapper( int v ) : value(v) {
54         thread_id = tbb::this_tbb_thread::get_id();
55         task_ptr = &tbb::task::self();
56     }
57
58     place_wrapper( const place_wrapper<int> &v ) : value(v.value), thread_id(v.thread_id), task_ptr(v.task_ptr) { }
59
60     place_wrapper( const place_wrapper<minimal_type> &v ) : value(v.value), thread_id(v.thread_id), task_ptr(v.task_ptr) { }
61 };
62
63 template<typename T1, typename T2>
64 struct wrapper_helper {
65     static void check(const T1 &, const T2 &) { }
66
67     static void copy_value(const T1 &in, T2 &out) {
68         out = in;
69     }
70 };
71
72 template<typename T1, typename T2>
73 struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > {
74     static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) {
75        REMARK("a.task_ptr == %p != b.task_ptr == %p\n", a.task_ptr, b.task_ptr);
76        ASSERT( (a.thread_id != b.thread_id), "same thread used to execute adjacent nodes");
77        ASSERT( (a.task_ptr != b.task_ptr), "same task used to execute adjacent nodes");
78        return;
79     }
80     static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) {
81         out.value = in.value;
82     }
83 };
84
85 const int NUMBER_OF_MSGS = 10;
86 const int UNKNOWN_NUMBER_OF_ITEMS = -1;
87 tbb::atomic<int> async_body_exec_count;
88 tbb::atomic<int> async_activity_processed_msg_count;
89 tbb::atomic<int> end_body_exec_count;
90
91 // queueing required in test_reset for testing of cancellation
92 typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type;
93 typedef counting_async_node_type::gateway_type counting_gateway_type;
94
95 struct counting_async_body {
96     tbb::atomic<int> my_async_body_exec_count;
97
98     counting_async_body() {
99         my_async_body_exec_count = 0;
100     }
101
102     void operator()( const int &input, counting_gateway_type& gateway) {
103         REMARK( "Body execution with input == %d\n", input);
104         ++my_async_body_exec_count;
105         ++async_body_exec_count;
106         if ( input == -1 ) {
107             bool result = tbb::task::self().group()->cancel_group_execution();
108             REMARK( "Canceling graph execution\n" );
109             ASSERT( result == true, "attempted to cancel graph twice" );
110             Harness::Sleep(50);
111         }
112         gateway.try_put(input);
113     }
114 };
115
116 void test_reset() {
117     const int N = NUMBER_OF_MSGS;
118     async_body_exec_count = 0;
119
120     tbb::flow::graph g;
121     counting_async_node_type a(g, tbb::flow::serial, counting_async_body() );
122
123     const int R = 3;
124     std::vector< harness_counting_receiver<int> > r(R, harness_counting_receiver<int>(g));
125
126     for (int i = 0; i < R; ++i) {
127 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
128         tbb::flow::make_edge(a, r[i]);
129 #else
130         tbb::flow::make_edge( tbb::flow::output_port<0>(a), r[i] );
131 #endif
132     }
133
134     REMARK( "One body execution\n" );
135     a.try_put(-1);
136     for (int i = 0; i < N; ++i) {
137        a.try_put(i);
138     }
139     g.wait_for_all();
140     // should be canceled with only 1 item reaching the async_body and the counting receivers
141     // and N items left in the node's queue
142     ASSERT( g.is_cancelled() == true, "task group not canceled" );
143
144     counting_async_body b1 = tbb::flow::copy_body<counting_async_body>(a);
145     ASSERT( int(async_body_exec_count) == int(b1.my_async_body_exec_count), "body and global body counts are different" );
146     ASSERT( int(async_body_exec_count) == 1, "global body execution count not 1"  );
147     for (int i = 0; i < R; ++i) {
148         ASSERT( int(r[i].my_count) == 1, "counting receiver count not 1" );
149     }
150
151     // should clear the async_node queue, but retain its local count at 1 and keep all edges
152     g.reset(tbb::flow::rf_reset_protocol);
153
154     REMARK( "N body executions\n" );
155     for (int i = 0; i < N; ++i) {
156        a.try_put(i);
157     }
158     g.wait_for_all();
159     ASSERT( g.is_cancelled() == false, "task group not canceled" );
160
161     // a total of N+1 items should have passed through the node body
162     // the local body count should also be N+1
163     // and the counting receivers should all have a count of N+1
164     counting_async_body b2 = tbb::flow::copy_body<counting_async_body>(a);
165     ASSERT( int(async_body_exec_count) == int(b2.my_async_body_exec_count), "local and global body execution counts are different" );
166     REMARK( "async_body_exec_count==%d\n", int(async_body_exec_count) );
167     ASSERT( int(async_body_exec_count) == N+1, "globcal body execution count not N+1"  );
168     for (int i = 0; i < R; ++i) {
169         ASSERT( int(r[i].my_count) == N+1, "counting receiver has not received N+1 items" );
170     }
171
172     REMARK( "N body executions with new bodies\n" );
173     // should clear the async_node queue and reset its local count to 0, but keep all edges
174     g.reset(tbb::flow::rf_reset_bodies);
175     for (int i = 0; i < N; ++i) {
176        a.try_put(i);
177     }
178     g.wait_for_all();
179     ASSERT( g.is_cancelled() == false, "task group not canceled" );
180
181     // a total of 2N+1 items should have passed through the node body
182     // the local body count should be N
183     // and the counting receivers should all have a count of 2N+1
184     counting_async_body b3 = tbb::flow::copy_body<counting_async_body>(a);
185     ASSERT( int(async_body_exec_count) == 2*N+1, "global body execution count not 2N+1"  );
186     ASSERT( int(b3.my_async_body_exec_count) == N, "local body execution count not N"  );
187     for (int i = 0; i < R; ++i) {
188         ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
189     }
190
191     // should clear the async_node queue and keep its local count at N and remove all edges
192     REMARK( "N body executions with no edges\n" );
193     g.reset(tbb::flow::rf_clear_edges);
194     for (int i = 0; i < N; ++i) {
195        a.try_put(i);
196     }
197     g.wait_for_all();
198     ASSERT( g.is_cancelled() == false, "task group not canceled" );
199
200     // a total of 3N+1 items should have passed through the node body
201     // the local body count should now be 2*N
202     // and the counting receivers should remain at a count of 2N+1
203     counting_async_body b4 = tbb::flow::copy_body<counting_async_body>(a);
204     ASSERT( int(async_body_exec_count) == 3*N+1, "global body execution count not 3N+1"  );
205     ASSERT( int(b4.my_async_body_exec_count) == 2*N, "local body execution count not 2N"  );
206     for (int i = 0; i < R; ++i) {
207         ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
208     }
209
210     // put back 1 edge to receiver 0
211     REMARK( "N body executions with 1 edge\n" );
212 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
213     tbb::flow::make_edge(a, r[0]);
214 #else
215     tbb::flow::make_edge( tbb::flow::output_port<0>(a), r[0] );
216 #endif
217     for (int i = 0; i < N; ++i) {
218        a.try_put(i);
219     }
220     g.wait_for_all();
221     ASSERT( g.is_cancelled() == false, "task group not canceled" );
222
223     // a total of 4N+1 items should have passed through the node body
224     // the local body count should now be 3*N
225     // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
226     counting_async_body b5 = tbb::flow::copy_body<counting_async_body>(a);
227     ASSERT( int(async_body_exec_count) == 4*N+1, "global body execution count not 4N+1"  );
228     ASSERT( int(b5.my_async_body_exec_count) == 3*N, "local body execution count not 3N"  );
229     ASSERT( int(r[0].my_count) == 3*N+1, "counting receiver has not received 3N+1 items" );
230     for (int i = 1; i < R; ++i) {
231         ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
232     }
233
234     // should clear the async_node queue and keep its local count at N and remove all edges
235     REMARK( "N body executions with no edges and new body\n" );
236     g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges));
237     for (int i = 0; i < N; ++i) {
238        a.try_put(i);
239     }
240     g.wait_for_all();
241     ASSERT( g.is_cancelled() == false, "task group not canceled" );
242
243     // a total of 4N+1 items should have passed through the node body
244     // the local body count should now be 3*N
245     // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
246     counting_async_body b6 = tbb::flow::copy_body<counting_async_body>(a);
247     ASSERT( int(async_body_exec_count) == 5*N+1, "global body execution count not 5N+1"  );
248     ASSERT( int(b6.my_async_body_exec_count) == N, "local body execution count not N"  );
249     ASSERT( int(r[0].my_count) == 3*N+1, "counting receiver has not received 3N+1 items" );
250     for (int i = 1; i < R; ++i) {
251         ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
252     }
253 }
254
255 template< typename Input, typename Output >
256 class async_activity : NoAssign {
257 public:
258     typedef Input input_type;
259     typedef Output output_type;
260     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
261     typedef typename async_node_type::gateway_type gateway_type;
262
263     struct work_type {
264         input_type input;
265         gateway_type* gateway;
266     };
267
268     class ServiceThreadBody {
269     public:
270         ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {}
271
272         void operator()() {
273             my_activity->process();
274         }
275     private:
276         async_activity* my_activity;
277     };
278
279     async_activity(int expected_items, bool deferred = false, int sleep_time = 50)
280         : my_expected_items(expected_items), my_sleep_time(sleep_time) {
281         is_active = !deferred;
282         my_quit = false;
283         tbb::tbb_thread( ServiceThreadBody( this ) ).swap( my_service_thread );
284     }
285
286 private:
287
288     async_activity( const async_activity& )
289         : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0) {
290         is_active = true;
291     }
292
293 public:
294     ~async_activity() {
295         stop();
296         my_service_thread.join();
297     }
298
299     void submit( const input_type &input, gateway_type& gateway ) {
300         work_type work = { input, &gateway};
301         my_work_queue.push( work );
302     }
303
304     void process() {
305         do {
306             work_type work;
307             if( is_active && my_work_queue.try_pop( work ) ) {
308                 Harness::Sleep(my_sleep_time);
309                 ++async_activity_processed_msg_count;
310                 output_type output;
311                 wrapper_helper<output_type, output_type>::copy_value(work.input, output);
312                 wrapper_helper<output_type, output_type>::check(work.input, output);
313                 work.gateway->try_put(output);
314                 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ||
315                      int(async_activity_processed_msg_count) == my_expected_items ) {
316                     work.gateway->release_wait();
317                 }
318             }
319         } while( my_quit == false || !my_work_queue.empty());
320     }
321
322     void stop() {
323         my_quit = true;
324     }
325
326     void activate() {
327         is_active = true;
328     }
329
330     bool should_reserve_each_time() {
331         if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS )
332             return true;
333         else
334             return false;
335     }
336
337 private:
338
339     const int my_expected_items;
340     const int my_sleep_time;
341     tbb::atomic< bool > is_active;
342
343     tbb::concurrent_queue< work_type > my_work_queue;
344
345     tbb::atomic< bool > my_quit;
346
347     tbb::tbb_thread my_service_thread;
348 };
349
350 template<typename Input, typename Output>
351 struct basic_test {
352     typedef Input input_type;
353     typedef Output output_type;
354     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
355     typedef typename async_node_type::gateway_type gateway_type;
356
357     class start_body_type {
358         typedef Input input_type;
359     public:
360         input_type operator()( int input ) {
361             return input_type(input);
362         }
363     };
364
365 #if !__TBB_CPP11_LAMBDAS_PRESENT
366     class async_body_type {
367         typedef Input input_type;
368         typedef Output output_type;
369         typedef tbb::flow::async_node< input_type, output_type > async_node_type;
370         typedef typename async_node_type::gateway_type gateway_type;
371     public:
372         typedef async_activity<input_type, output_type> async_activity_type;
373
374         async_body_type( async_activity_type* aa ) : my_async_activity( aa ) { }
375
376         async_body_type( const async_body_type& other ) : my_async_activity( other.my_async_activity ) { }
377
378         void operator()( const input_type &input, gateway_type& gateway ) {
379             ++async_body_exec_count;
380             my_async_activity->submit( input, gateway);
381             if ( my_async_activity->should_reserve_each_time() )
382                 gateway.reserve_wait();
383         }
384
385     private:
386         async_activity_type* my_async_activity;
387     };
388 #endif
389
390     class end_body_type {
391         typedef Output output_type;
392     public:
393         void operator()( const output_type &input ) {
394             ++end_body_exec_count;
395             output_type output;
396             wrapper_helper<output_type, output_type>::check(input, output);
397         }
398     };
399
400     basic_test() {}
401
402 public:
403
404     static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
405         async_activity<input_type, output_type> my_async_activity(async_expected_items);
406         tbb::flow::graph g;
407         tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() );
408 #if __TBB_CPP11_LAMBDAS_PRESENT
409         async_node_type offload_node(g, tbb::flow::unlimited, [&] (const input_type &input, gateway_type& gateway) {
410             ++async_body_exec_count;
411             my_async_activity.submit(input, gateway);
412             if(my_async_activity.should_reserve_each_time())
413                 gateway.reserve_wait();
414         } );
415 #else
416         async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( &my_async_activity ) );
417 #endif
418
419         tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type() );
420
421         tbb::flow::make_edge( start_node, offload_node );
422 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
423         tbb::flow::make_edge( offload_node, end_node );
424 #else
425         tbb::flow::make_edge( tbb::flow::output_port<0>(offload_node), end_node );
426 #endif
427         async_body_exec_count = 0;
428         async_activity_processed_msg_count = 0;
429         end_body_exec_count = 0;
430
431         if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
432             offload_node.gateway().reserve_wait();
433         }
434         for (int i = 0; i < NUMBER_OF_MSGS; ++i) {
435             start_node.try_put(i);
436         }
437         g.wait_for_all();
438         ASSERT( async_body_exec_count == NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
439         ASSERT( async_activity_processed_msg_count == NUMBER_OF_MSGS, "AsyncActivity processed wrong number of signals" );
440         ASSERT( end_body_exec_count == NUMBER_OF_MSGS, "EndBody processed wrong number of signals");
441         REMARK("async_body_exec_count == %d == async_activity_processed_msg_count == %d == end_body_exec_count == %d\n",
442                 int(async_body_exec_count), int(async_activity_processed_msg_count), int(end_body_exec_count));
443         return Harness::Done;
444     }
445
446 };
447
448 int test_copy_ctor() {
449     const int N = NUMBER_OF_MSGS;
450     async_body_exec_count = 0;
451
452     tbb::flow::graph g;
453
454     harness_counting_receiver<int> r1(g);
455     harness_counting_receiver<int> r2(g);
456
457     counting_async_node_type a(g, tbb::flow::unlimited, counting_async_body() );
458     counting_async_node_type b(a);
459 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
460     tbb::flow::make_edge(a, r1);
461     tbb::flow::make_edge(b, r2);
462 #else
463     tbb::flow::make_edge(tbb::flow::output_port<0>(a), r1);
464     tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2);
465 #endif
466
467     for (int i = 0; i < N; ++i) {
468        a.try_put(i);
469     }
470     g.wait_for_all();
471
472     REMARK("async_body_exec_count = %d\n", int(async_body_exec_count));
473     REMARK("r1.my_count == %d and r2.my_count = %d\n", int(r1.my_count), int(r2.my_count));
474     ASSERT( int(async_body_exec_count) == NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
475     ASSERT( int(r1.my_count) == N, "counting receiver r1 has not received N items" );
476     ASSERT( int(r2.my_count) == 0, "counting receiver r2 has not received 0 items" );
477
478     for (int i = 0; i < N; ++i) {
479        b.try_put(i);
480     }
481     g.wait_for_all();
482
483     REMARK("async_body_exec_count = %d\n", int(async_body_exec_count));
484     REMARK("r1.my_count == %d and r2.my_count = %d\n", int(r1.my_count), int(r2.my_count));
485     ASSERT( int(async_body_exec_count) == 2*NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
486     ASSERT( int(r1.my_count) == N, "counting receiver r1 has not received N items" );
487     ASSERT( int(r2.my_count) == N, "counting receiver r2 has not received N items" );
488     return Harness::Done;
489 }
490
491 tbb::atomic<int> main_tid_count;
492
493 template<typename Input, typename Output>
494 struct spin_test {
495     typedef Input input_type;
496     typedef Output output_type;
497     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
498     typedef typename async_node_type::gateway_type gateway_type;
499
500     class start_body_type {
501         typedef Input input_type;
502     public:
503         input_type operator()( int input ) {
504             return input_type(input);
505         }
506     };
507
508 #if !__TBB_CPP11_LAMBDAS_PRESENT
509     class async_body_type {
510         typedef Input input_type;
511         typedef Output output_type;
512         typedef tbb::flow::async_node< input_type, output_type > async_node_type;
513         typedef typename async_node_type::gateway_type gateway_type;
514     public:
515         typedef async_activity<input_type, output_type> async_activity_type;
516
517         async_body_type( async_activity_type* aa ) : my_async_activity( aa ) { }
518
519         async_body_type( const async_body_type& other ) : my_async_activity( other.my_async_activity ) { }
520
521         void operator()(const input_type &input, gateway_type& gateway) {
522             ++async_body_exec_count;
523             my_async_activity->submit(input, gateway);
524             if(my_async_activity->should_reserve_each_time())
525                 gateway.reserve_wait();
526         }
527
528     private:
529         async_activity_type* my_async_activity;
530     };
531 #endif
532
533     class end_body_type {
534         typedef Output output_type;
535         tbb::tbb_thread::id my_main_tid;
536         Harness::SpinBarrier *my_barrier;
537     public:
538         end_body_type(tbb::tbb_thread::id t, Harness::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { }
539
540         void operator()( const output_type & ) {
541             ++end_body_exec_count;
542             if (tbb::this_tbb_thread::get_id() == my_main_tid) {
543                ++main_tid_count;
544             }
545             my_barrier->timed_wait_noerror(10);
546         }
547     };
548
549     spin_test() {}
550
551     static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
552         async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0);
553         Harness::SpinBarrier spin_barrier(nthreads);
554         tbb::flow::graph g;
555         tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() );
556 #if __TBB_CPP11_LAMBDAS_PRESENT
557         async_node_type offload_node(g, tbb::flow::unlimited, [&](const input_type &input, gateway_type& gateway) {
558             ++async_body_exec_count;
559             my_async_activity.submit(input, gateway);
560             if(my_async_activity.should_reserve_each_time())
561                 gateway.reserve_wait();
562         });
563 #else
564         async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( &my_async_activity ) );
565 #endif
566         tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type(tbb::this_tbb_thread::get_id(), spin_barrier) );
567         tbb::flow::make_edge( start_node, offload_node );
568 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
569         tbb::flow::make_edge( offload_node, end_node );
570 #else
571         tbb::flow::make_edge( tbb::flow::output_port<0>(offload_node), end_node );
572 #endif
573         async_body_exec_count = 0;
574         async_activity_processed_msg_count = 0;
575         end_body_exec_count = 0;
576         main_tid_count = 0;
577
578         if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
579             offload_node.gateway().reserve_wait();
580         }
581         for (int i = 0; i < nthreads*NUMBER_OF_MSGS; ++i) {
582             start_node.try_put(i);
583         }
584         g.wait_for_all();
585         ASSERT( async_body_exec_count == nthreads*NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
586         ASSERT( async_activity_processed_msg_count == nthreads*NUMBER_OF_MSGS, "AsyncActivity processed wrong number of signals" );
587         ASSERT( end_body_exec_count == nthreads*NUMBER_OF_MSGS, "EndBody processed wrong number of signals");
588         ASSERT_WARNING( main_tid_count != 0, "Main thread did not participate in end_body tasks");
589         REMARK("async_body_exec_count == %d == async_activity_processed_msg_count == %d == end_body_exec_count == %d\n",
590                 int(async_body_exec_count), int(async_activity_processed_msg_count), int(end_body_exec_count));
591         return Harness::Done;
592     }
593
594 };
595
596 void test_for_spin_avoidance() {
597     spin_test<int, int>::run(4);
598 }
599
600 template< typename Input, typename Output >
601 int run_tests() {
602     basic_test<Input, Output>::run();
603     basic_test<Input, Output>::run(NUMBER_OF_MSGS);
604     basic_test<place_wrapper<Input>, place_wrapper<Output> >::run();
605     basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS);
606     return Harness::Done;
607 }
608
609 #include "tbb/parallel_for.h"
610 template<typename Input, typename Output>
611 class equeueing_on_inner_level {
612     typedef Input input_type;
613     typedef Output output_type;
614     typedef async_activity<input_type, output_type> async_activity_type;
615     typedef tbb::flow::async_node<Input, Output> async_node_type;
616     typedef typename async_node_type::gateway_type gateway_type;
617
618     class start_body_type {
619     public:
620         input_type operator() ( int input ) {
621             return input_type( input);
622         }
623     };
624
625     class async_body_type {
626     public:
627         async_body_type( async_activity_type& activity ) : my_async_activity(&activity) {}
628
629         void operator() ( const input_type &input, gateway_type& gateway ) {
630             gateway.reserve_wait();
631             my_async_activity->submit( input, gateway );
632         }
633     private:
634         async_activity_type* my_async_activity;
635     };
636
637     class end_body_type {
638     public:
639         void operator()( output_type ) {}
640     };
641
642     class body_graph_with_async {
643     public:
644         body_graph_with_async( Harness::SpinBarrier& barrier, async_activity_type& activity )
645             : spin_barrier(&barrier), my_async_activity(&activity) {}
646
647         void operator()(int) const {
648             tbb::flow::graph g;
649             tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() );
650
651             async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( *my_async_activity ) );
652
653             tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type() );
654
655             tbb::flow::make_edge( start_node, offload_node );
656             tbb::flow::make_edge( offload_node, end_node );
657
658             start_node.try_put(1);
659
660             spin_barrier->wait();
661
662             my_async_activity->activate();
663
664             g.wait_for_all();
665         }
666
667     private:
668         Harness::SpinBarrier* spin_barrier;
669         async_activity_type* my_async_activity;
670     };
671
672
673 public:
674     static int run ()
675     {
676         const int nthreads = tbb::this_task_arena::max_concurrency();
677         Harness::SpinBarrier spin_barrier( nthreads );
678
679         async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true );
680
681         tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) );
682         return Harness::Done;
683     }
684 };
685
686 int run_test_equeueing_on_inner_level() {
687     equeueing_on_inner_level<int, int>::run();
688     return Harness::Done;
689 }
690
691 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
692 #include <array>
693 #include <thread>
694
695 template<typename NodeType>
696 class AsyncActivity {
697 public:
698     using gateway_t = typename NodeType::gateway_type;
699
700     struct work_type {
701         int input;
702         gateway_t* gateway;
703     };
704
705     AsyncActivity(size_t limit) : thr([this]() {
706         while(!end_of_work()) {
707             work_type w;
708             while( my_q.try_pop(w) ) {
709                 int res = do_work(w.input);
710                 w.gateway->try_put(res);
711                 w.gateway->release_wait();
712                 ++c;
713             }
714         }
715     }), stop_limit(limit), c(0) {}
716
717     void submit(int i, gateway_t* gateway) {
718         work_type w = {i, gateway};
719         gateway->reserve_wait();
720         my_q.push(w);
721     }
722
723     void wait_for_all() { thr.join(); }
724
725 private:
726     bool end_of_work() { return c >= stop_limit; }
727
728     int do_work(int& i) { return i + i; }
729
730     tbb::concurrent_queue<work_type> my_q;
731     tbb::tbb_thread thr;
732     size_t stop_limit;
733     size_t c;
734 };
735
736 void test_follows() {
737     using namespace tbb::flow;
738
739     using input_t = int;
740     using output_t = int;
741     using node_t = async_node<input_t, output_t>;
742
743     graph g;
744
745     AsyncActivity<node_t> async_activity(3);
746
747     std::array<broadcast_node<input_t>, 3> preds = {
748         broadcast_node<input_t>(g),
749         broadcast_node<input_t>(g),
750         broadcast_node<input_t>(g)
751     };
752
753     node_t node(follows(preds[0], preds[1], preds[2]), unlimited, [&](int input, node_t::gateway_type& gtw) {
754         async_activity.submit(input, &gtw);
755     });
756
757     buffer_node<output_t> buf(g);
758     make_edge(node, buf);
759
760     for(auto& pred: preds) {
761         pred.try_put(1);
762     }
763
764     g.wait_for_all();
765     async_activity.wait_for_all();
766
767     output_t storage;
768     ASSERT((buf.try_get(storage) && buf.try_get(storage) && buf.try_get(storage) && !buf.try_get(storage)),
769             "Not exact edge quantity was made");
770 }
771
772 void test_precedes() {
773     using namespace tbb::flow;
774
775     using input_t = int;
776     using output_t = int;
777     using node_t = async_node<input_t, output_t>;
778
779     graph g;
780
781     AsyncActivity<node_t> async_activity(1);
782
783     std::array<buffer_node<input_t>, 2> successors = {
784         buffer_node<input_t>(g),
785         buffer_node<input_t>(g)
786     };
787
788     broadcast_node<input_t> start(g);
789
790     node_t node(precedes(successors[0], successors[1]), unlimited, [&](int input, node_t::gateway_type& gtw) {
791         async_activity.submit(input, &gtw);
792     });
793
794     make_edge(start, node);
795
796     start.try_put(1);
797
798     g.wait_for_all();
799     async_activity.wait_for_all();
800
801     for(auto& successor : successors) {
802         output_t storage;
803         ASSERT(successor.try_get(storage) && !successor.try_get(storage),
804                "Not exact edge quantity was made");
805     }
806 }
807
808 void test_follows_and_precedes_api() {
809     test_follows();
810     test_precedes();
811 }
812 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
813
814 int TestMain() {
815     tbb::task_scheduler_init init(4);
816     run_tests<int, int>();
817     run_tests<minimal_type, minimal_type>();
818     run_tests<int, minimal_type>();
819
820     lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS);
821
822     test_reset();
823     test_copy_ctor();
824     test_for_spin_avoidance();
825     run_test_equeueing_on_inner_level();
826 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
827     test_follows_and_precedes_api();
828 #endif
829     return Harness::Done;
830 }
831