Committing TBB 2019 Update 9 source code
[platform/upstream/tbb.git] / src / test / test_continue_node.cpp
1 /*
2     Copyright (c) 2005-2019 Intel Corporation
3
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7
8         http://www.apache.org/licenses/LICENSE-2.0
9
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16
17 #if __TBB_CPF_BUILD
18 #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
19 #endif
20
21 #include "harness.h"
22 #include "harness_graph.h"
23
24 #include "tbb/flow_graph.h"
25 #include "tbb/task_scheduler_init.h"
26 #include "test_follows_and_precedes_api.h"
27
28 #define N 1000
29 #define MAX_NODES 4
30 #define C 8
31
32 struct empty_no_assign : private NoAssign {
33    empty_no_assign() {}
34    empty_no_assign( int ) {}
35    operator int() { return 0; }
36 };
37
38 // A class to use as a fake predecessor of continue_node
39 struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg>
40 {
41     typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type;
42     // Define implementations of virtual methods that are abstract in the base class
43     bool register_successor( successor_type& ) __TBB_override { return false; }
44     bool remove_successor( successor_type& )   __TBB_override { return false; }
45 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
46     typedef tbb::flow::sender<tbb::flow::continue_msg>::built_successors_type built_successors_type;
47     built_successors_type bst;
48     built_successors_type &built_successors() __TBB_override { return bst; }
49     void internal_add_built_successor( successor_type &) __TBB_override { }
50     void internal_delete_built_successor( successor_type &) __TBB_override { }
51     void copy_successors(successor_list_type &) __TBB_override {}
52     size_t successor_count() __TBB_override {return 0;}
53 #endif
54 };
55
56 template< typename InputType >
57 struct parallel_puts : private NoAssign {
58
59     tbb::flow::receiver< InputType > * const my_exe_node;
60
61     parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
62
63     void operator()( int ) const  {
64         for ( int i = 0; i < N; ++i ) {
65             // the nodes will accept all puts
66             ASSERT( my_exe_node->try_put( InputType() ) == true, NULL );
67         }
68     }
69
70 };
71
72 template< typename OutputType >
73 void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) {
74     fake_continue_sender fake_sender;
75     for (size_t i = 0; i < N; ++i) {
76         n.register_predecessor( fake_sender );
77     }
78
79     for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
80         std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));
81         harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0;
82
83         for (size_t r = 0; r < num_receivers; ++r ) {
84             tbb::flow::make_edge( n, receivers[r] );
85         }
86 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
87         ASSERT(n.successor_count() == (size_t)num_receivers, NULL);
88         ASSERT(n.predecessor_count() == 0, NULL);
89         typename tbb::flow::continue_node<OutputType>::successor_list_type my_succs;
90         typedef typename tbb::flow::continue_node<OutputType>::successor_list_type::iterator sv_iter_type;
91         n.copy_successors(my_succs);
92         ASSERT(my_succs.size() == num_receivers, NULL);
93 #endif
94
95         NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) );
96         g.wait_for_all();
97
98         // 2) the nodes will receive puts from multiple predecessors simultaneously,
99         size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count;
100         ASSERT( (int)ec == p, NULL );
101         for (size_t r = 0; r < num_receivers; ++r ) {
102             size_t c = receivers[r].my_count;
103             // 3) the nodes will send to multiple successors.
104             ASSERT( (int)c == p, NULL );
105         }
106
107 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
108         for(sv_iter_type si=my_succs.begin(); si != my_succs.end(); ++si) {
109             tbb::flow::remove_edge( n, **si );
110         }
111 #else
112         for (size_t r = 0; r < num_receivers; ++r ) {
113             tbb::flow::remove_edge( n, receivers[r] );
114         }
115 #endif
116     }
117 }
118
119 template< typename OutputType, typename Body >
120 void continue_nodes( Body body ) {
121     for (int p = 1; p < 2*MaxThread; ++p) {
122         tbb::flow::graph g;
123         tbb::flow::continue_node< OutputType > exe_node( g, body );
124         run_continue_nodes( p, g, exe_node);
125         exe_node.try_put(tbb::flow::continue_msg());
126         tbb::flow::continue_node< OutputType > exe_node_copy( exe_node );
127         run_continue_nodes( p, g, exe_node_copy);
128     }
129 }
130
131 const size_t Offset = 123;
132 tbb::atomic<size_t> global_execute_count;
133
134 template< typename OutputType >
135 struct inc_functor {
136
137     tbb::atomic<size_t> local_execute_count;
138     inc_functor( ) { local_execute_count = 0; }
139     inc_functor( const inc_functor &f ) { local_execute_count = f.local_execute_count; }
140     void operator=(const inc_functor &f) { local_execute_count = f.local_execute_count; }
141
142     OutputType operator()( tbb::flow::continue_msg ) {
143        ++global_execute_count;
144        ++local_execute_count;
145        return OutputType();
146     }
147
148 };
149
150 template< typename OutputType >
151 void continue_nodes_with_copy( ) {
152
153     for (int p = 1; p < 2*MaxThread; ++p) {
154         tbb::flow::graph g;
155         inc_functor<OutputType> cf;
156         cf.local_execute_count = Offset;
157         global_execute_count = Offset;
158
159         tbb::flow::continue_node< OutputType > exe_node( g, cf );
160         fake_continue_sender fake_sender;
161         for (size_t i = 0; i < N; ++i) {
162            exe_node.register_predecessor( fake_sender );
163         }
164
165         for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
166             std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));
167
168             for (size_t r = 0; r < num_receivers; ++r ) {
169                 tbb::flow::make_edge( exe_node, receivers[r] );
170             }
171
172             NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) );
173             g.wait_for_all();
174
175             // 2) the nodes will receive puts from multiple predecessors simultaneously,
176             for (size_t r = 0; r < num_receivers; ++r ) {
177                 size_t c = receivers[r].my_count;
178                 // 3) the nodes will send to multiple successors.
179                 ASSERT( (int)c == p, NULL );
180             }
181             for (size_t r = 0; r < num_receivers; ++r ) {
182                 tbb::flow::remove_edge( exe_node, receivers[r] );
183             }
184         }
185
186         // validate that the local body matches the global execute_count and both are correct
187         inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
188         const size_t expected_count = p*MAX_NODES + Offset;
189         size_t global_count = global_execute_count;
190         size_t inc_count = body_copy.local_execute_count;
191         ASSERT( global_count == expected_count && global_count == inc_count, NULL );
192         g.reset(tbb::flow::rf_reset_bodies);
193         body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
194         inc_count = body_copy.local_execute_count;
195         ASSERT( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" );
196
197     }
198 }
199
200 template< typename OutputType >
201 void run_continue_nodes() {
202     harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0;
203     #if __TBB_CPP11_LAMBDAS_PRESENT
204     continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } );
205     #endif
206     continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func );
207     continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() );
208     continue_nodes_with_copy<OutputType>();
209 }
210
211 //! Tests limited concurrency cases for nodes that accept data messages
212 void test_concurrency(int num_threads) {
213     tbb::task_scheduler_init init(num_threads);
214     run_continue_nodes<tbb::flow::continue_msg>();
215     run_continue_nodes<int>();
216     run_continue_nodes<empty_no_assign>();
217 }
218 /*
219  * Connection of two graphs is not currently supported, but works to some limited extent.
220  * This test is included to check for backward compatibility. It checks that a continue_node
221  * with predecessors in two different graphs receives the required
222  * number of continue messages before it executes.
223  */
224 using namespace tbb::flow;
225
226 struct add_to_counter {
227     int* counter;
228     add_to_counter(int& var):counter(&var){}
229     void operator()(continue_msg){*counter+=1;}
230 };
231
232 void test_two_graphs(){
233     int count=0;
234
235     //graph g with broadcast_node and continue_node
236     graph g;
237     broadcast_node<continue_msg> start_g(g);
238     continue_node<continue_msg> first_g(g, add_to_counter(count));
239
240     //graph h with broadcast_node
241     graph h;
242     broadcast_node<continue_msg> start_h(h);
243
244     //making two edges to first_g from the two graphs
245     make_edge(start_g,first_g);
246     make_edge(start_h, first_g);
247
248     //two try_puts from the two graphs
249     start_g.try_put(continue_msg());
250     start_h.try_put(continue_msg());
251     g.wait_for_all();
252     ASSERT(count==1, "Not all continue messages received");
253
254     //two try_puts from the graph that doesn't contain the node
255     count=0;
256     start_h.try_put(continue_msg());
257     start_h.try_put(continue_msg());
258     g.wait_for_all();
259     ASSERT(count==1, "Not all continue messages received -1");
260
261     //only one try_put
262     count=0;
263     start_g.try_put(continue_msg());
264     g.wait_for_all();
265     ASSERT(count==0, "Node executed without waiting for all predecessors");
266 }
267
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Exercises extract() on a small graph (b0 and b1 feeding c0, c0 feeding q0).
// After each extraction the predecessor/successor counts and the firing
// behaviour of the continue_node c0 are checked. The whole sequence runs
// twice; the edges are rebuilt at the end of each pass.
void test_extract() {
    typedef tbb::flow::continue_msg msg_type;
    typedef tbb::flow::broadcast_node<msg_type> broadcast_type;

    int fired = 0;
    msg_type received;
    tbb::flow::graph g;
    broadcast_type b0(g);
    broadcast_type b1(g);
    tbb::flow::continue_node<msg_type> c0(g, add_to_counter(fired));
    tbb::flow::queue_node<msg_type> q0(g);

    tbb::flow::make_edge(b0, c0);
    tbb::flow::make_edge(b1, c0);
    tbb::flow::make_edge(c0, q0);

    for( int pass = 0; pass < 2; ++pass ) {
        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts");
        ASSERT(c0.predecessor_count() == 2 && c0.successor_count() == 1, "c0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts");

        /* b0         */
        /*   \        */
        /*    c0 - q0 */
        /*   /        */
        /* b1         */

        // c0 has two predecessors, so it must fire only after both have sent.
        b0.try_put(msg_type());
        g.wait_for_all();
        ASSERT(fired == 0, "continue_node fired too soon");
        b1.try_put(msg_type());
        g.wait_for_all();
        ASSERT(fired == 1, "continue_node didn't fire");
        ASSERT(q0.try_get(received), "continue_node didn't forward");

        b0.extract();

        /* b0         */
        /*            */
        /*    c0 - q0 */
        /*   /        */
        /* b1         */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts");
        ASSERT(c0.predecessor_count() == 1 && c0.successor_count() == 1, "c0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts");

        // b0 is disconnected: its messages must not reach c0, while one from b1 is enough.
        b0.try_put(msg_type());
        b0.try_put(msg_type());
        g.wait_for_all();
        ASSERT(fired == 1, "b0 messages being forwarded to continue_node even though it is disconnected");
        b1.try_put(msg_type());
        g.wait_for_all();
        ASSERT(fired == 2, "continue_node didn't fire though it has only one predecessor");
        ASSERT(q0.try_get(received), "continue_node didn't forward second time");

        c0.extract();

        /* b0         */
        /*            */
        /*    c0   q0 */
        /*            */
        /* b1         */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts");
        ASSERT(c0.predecessor_count() == 0 && c0.successor_count() == 0, "c0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts");

        // nothing is connected any more: no firing and nothing queued
        b0.try_put(msg_type());
        b0.try_put(msg_type());
        b1.try_put(msg_type());
        b1.try_put(msg_type());
        g.wait_for_all();
        ASSERT(fired == 2, "continue didn't fire though it has only one predecessor");
        ASSERT(!q0.try_get(received), "continue_node forwarded though it shouldn't");

        make_edge(b0, c0);

        /* b0         */
        /*   \        */
        /*    c0   q0 */
        /*            */
        /* b1         */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts");
        ASSERT(c0.predecessor_count() == 1 && c0.successor_count() == 0, "c0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts");

        // c0 fires on a single message from b0 but has no successor to forward to
        b0.try_put(msg_type());
        g.wait_for_all();

        ASSERT(fired == 3, "continue didn't fire though it has only one predecessor");
        ASSERT(!q0.try_get(received), "continue_node forwarded though it shouldn't");

        // restore the original topology and counter for the next pass
        tbb::flow::make_edge(b1, c0);
        tbb::flow::make_edge(c0, q0);
        fired = 0;
    }
}
#endif
366
367 struct lightweight_policy_body : NoAssign {
368     const tbb::tbb_thread::id my_thread_id;
369     tbb::atomic<size_t> my_count;
370
371     lightweight_policy_body() : my_thread_id(tbb::this_tbb_thread::get_id()) {
372         my_count = 0;
373     }
374     void operator()(tbb::flow::continue_msg) {
375         ++my_count;
376         tbb::tbb_thread::id body_thread_id = tbb::this_tbb_thread::get_id();
377         ASSERT(body_thread_id == my_thread_id, "Body executed as not lightweight");
378     }
379 };
380
381 void test_lightweight_policy() {
382     tbb::flow::graph g;
383     tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> node1(g, lightweight_policy_body());
384     tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> node2(g, lightweight_policy_body());
385
386     tbb::flow::make_edge(node1, node2);
387     const size_t n = 10;
388     for(size_t i = 0; i < n; ++i) {
389         node1.try_put(tbb::flow::continue_msg());
390     }
391     g.wait_for_all();
392
393     lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1);
394     lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2);
395     ASSERT(body1.my_count == n, "Body of the first node needs to be executed N times");
396     ASSERT(body2.my_count == n, "Body of the second node needs to be executed N times");
397 }
398
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>
// Runs the shared follows/precedes tests for continue_node<continue_msg>,
// using a body that returns the message it was given.
void test_follows_and_precedes_api() {
    using msg_t = tbb::flow::continue_msg;
    using node_t = tbb::flow::continue_node<msg_t>;

    auto pass_through = [](const msg_t& msg) { return msg; };

    // three messages for the follows() case
    std::array<msg_t, 3> messages_for_follows = { msg_t(), msg_t(), msg_t() };
    follows_and_precedes_testing::test_follows<msg_t, node_t>(messages_for_follows, pass_through);

    // a single message for the precedes() case
    std::vector<msg_t> messages_for_precedes  = { msg_t() };
    follows_and_precedes_testing::test_precedes<msg_t, node_t>(messages_for_precedes, pass_through);
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
419
420 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
421
422 template <typename ExpectedType, typename Body>
423 void test_deduction_guides_common(Body body) {
424     using namespace tbb::flow;
425     graph g;
426
427     continue_node c1(g, body);
428     static_assert(std::is_same_v<decltype(c1), continue_node<ExpectedType>>);
429
430     continue_node c2(g, body, lightweight());
431     static_assert(std::is_same_v<decltype(c2), continue_node<ExpectedType, lightweight>>);
432
433     continue_node c3(g, 5, body);
434     static_assert(std::is_same_v<decltype(c3), continue_node<ExpectedType>>);
435
436     continue_node c4(g, 5, body, lightweight());
437     static_assert(std::is_same_v<decltype(c4), continue_node<ExpectedType, lightweight>>);
438
439 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
440     continue_node c5(g, body, node_priority_t(5));
441     static_assert(std::is_same_v<decltype(c5), continue_node<ExpectedType>>);
442
443     continue_node c6(g, body, lightweight(), node_priority_t(5));
444     static_assert(std::is_same_v<decltype(c6), continue_node<ExpectedType, lightweight>>);
445
446     continue_node c7(g, 5, body, node_priority_t(5));
447     static_assert(std::is_same_v<decltype(c7), continue_node<ExpectedType>>);
448
449     continue_node c8(g, 5, body, lightweight(), node_priority_t(5));
450     static_assert(std::is_same_v<decltype(c8), continue_node<ExpectedType, lightweight>>);
451 #endif
452
453 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
454     broadcast_node<continue_msg> b(g);
455
456     continue_node c9(follows(b), body);
457     static_assert(std::is_same_v<decltype(c9), continue_node<ExpectedType>>);
458
459     continue_node c10(follows(b), body, lightweight());
460     static_assert(std::is_same_v<decltype(c10), continue_node<ExpectedType, lightweight>>);
461
462     continue_node c11(follows(b), 5, body);
463     static_assert(std::is_same_v<decltype(c11), continue_node<ExpectedType>>);
464
465     continue_node c12(follows(b), 5, body, lightweight());
466     static_assert(std::is_same_v<decltype(c12), continue_node<ExpectedType, lightweight>>);
467
468 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
469     continue_node c13(follows(b), body, node_priority_t(5));
470     static_assert(std::is_same_v<decltype(c13), continue_node<ExpectedType>>);
471
472     continue_node c14(follows(b), body, lightweight(), node_priority_t(5));
473     static_assert(std::is_same_v<decltype(c14), continue_node<ExpectedType, lightweight>>);
474
475     continue_node c15(follows(b), 5, body, node_priority_t(5));
476     static_assert(std::is_same_v<decltype(c15), continue_node<ExpectedType>>);
477
478     continue_node c16(follows(b), 5, body, lightweight(), node_priority_t(5));
479     static_assert(std::is_same_v<decltype(c16), continue_node<ExpectedType, lightweight>>);
480 #endif
481 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
482
483     continue_node c17(c1);
484     static_assert(std::is_same_v<decltype(c17), continue_node<ExpectedType>>);
485 }
486
487 int continue_body_f(const tbb::flow::continue_msg&) { return 1; }
488 void continue_void_body_f(const tbb::flow::continue_msg&) {}
489
490 void test_deduction_guides() {
491     using tbb::flow::continue_msg;
492     test_deduction_guides_common<int>([](const continue_msg&)->int { return 1; } );
493     test_deduction_guides_common<continue_msg>([](const continue_msg&) {});
494
495     test_deduction_guides_common<int>([](const continue_msg&) mutable ->int { return 1; });
496     test_deduction_guides_common<continue_msg>([](const continue_msg&) mutable {});
497
498     test_deduction_guides_common<int>(continue_body_f);
499     test_deduction_guides_common<continue_msg>(continue_void_body_f);
500 }
501
502 #endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
503
// TODO: use pass_through from test_function_node instead
// Body that returns a copy of the value it is called with.
template<typename T>
struct passing_body {
    T operator()(const T& val) {
        T echoed(val);
        return echoed;
    }
};
511
/*
    The test covers the case where a node with a non-default mutex type is a predecessor of a continue_node.
    There used to be a bug where make_edge(node, continue_node)
    did not update the continue_node's predecessor threshold,
    because the specialization of the node's successor_cache for a continue_node was not chosen.
*/
518 void test_successor_cache_specialization() {
519     using namespace tbb::flow;
520
521     graph g;
522
523     broadcast_node<continue_msg> node_with_default_mutex_type(g);
524     buffer_node<continue_msg> node_with_non_default_mutex_type(g);
525
526     continue_node<continue_msg> node(g, passing_body<continue_msg>());
527
528     make_edge(node_with_default_mutex_type, node);
529     make_edge(node_with_non_default_mutex_type, node);
530
531     buffer_node<continue_msg> buf(g);
532
533     make_edge(node, buf);
534
535     node_with_default_mutex_type.try_put(continue_msg());
536     node_with_non_default_mutex_type.try_put(continue_msg());
537
538     g.wait_for_all();
539
540     continue_msg storage;
541     ASSERT((buf.try_get(storage) && !buf.try_get(storage)),
542             "Wrong number of messages is passed via continue_node");
543 }
544
545 int TestMain() {
546     if( MinThread<1 ) {
547         REPORT("number of threads must be positive\n");
548         exit(1);
549     }
550     for( int p=MinThread; p<=MaxThread; ++p ) {
551        test_concurrency(p);
552    }
553    test_two_graphs();
554 #if __TBB_PREVIEW_LIGHTWEIGHT_POLICY
555    test_lightweight_policy();
556 #endif
557 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
558    test_extract();
559 #endif
560 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
561     test_follows_and_precedes_api();
562 #endif
563 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
564     test_deduction_guides();
565 #endif
566    test_successor_cache_specialization();
567    return Harness::Done;
568 }