2 Copyright (c) 2005-2019 Intel Corporation
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
18 #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
22 #include "harness_graph.h"
24 #include "tbb/flow_graph.h"
25 #include "tbb/task_scheduler_init.h"
26 #include "test_follows_and_precedes_api.h"
32 struct empty_no_assign : private NoAssign {
34 empty_no_assign( int ) {}
35 operator int() { return 0; }
38 // A class to use as a fake predecessor of continue_node
39 struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg>
41 typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type;
42 // Define implementations of virtual methods that are abstract in the base class
43 bool register_successor( successor_type& ) __TBB_override { return false; }
44 bool remove_successor( successor_type& ) __TBB_override { return false; }
45 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
46 typedef tbb::flow::sender<tbb::flow::continue_msg>::built_successors_type built_successors_type;
47 built_successors_type bst;
48 built_successors_type &built_successors() __TBB_override { return bst; }
49 void internal_add_built_successor( successor_type &) __TBB_override { }
50 void internal_delete_built_successor( successor_type &) __TBB_override { }
51 void copy_successors(successor_list_type &) __TBB_override {}
52 size_t successor_count() __TBB_override {return 0;}
56 template< typename InputType >
57 struct parallel_puts : private NoAssign {
59 tbb::flow::receiver< InputType > * const my_exe_node;
61 parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
63 void operator()( int ) const {
64 for ( int i = 0; i < N; ++i ) {
65 // the nodes will accept all puts
66 ASSERT( my_exe_node->try_put( InputType() ) == true, NULL );
72 template< typename OutputType >
73 void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) {
74 fake_continue_sender fake_sender;
75 for (size_t i = 0; i < N; ++i) {
76 n.register_predecessor( fake_sender );
79 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
80 std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));
81 harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0;
83 for (size_t r = 0; r < num_receivers; ++r ) {
84 tbb::flow::make_edge( n, receivers[r] );
86 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
87 ASSERT(n.successor_count() == (size_t)num_receivers, NULL);
88 ASSERT(n.predecessor_count() == 0, NULL);
89 typename tbb::flow::continue_node<OutputType>::successor_list_type my_succs;
90 typedef typename tbb::flow::continue_node<OutputType>::successor_list_type::iterator sv_iter_type;
91 n.copy_successors(my_succs);
92 ASSERT(my_succs.size() == num_receivers, NULL);
95 NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) );
98 // 2) the nodes will receive puts from multiple predecessors simultaneously,
99 size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count;
100 ASSERT( (int)ec == p, NULL );
101 for (size_t r = 0; r < num_receivers; ++r ) {
102 size_t c = receivers[r].my_count;
103 // 3) the nodes will send to multiple successors.
104 ASSERT( (int)c == p, NULL );
107 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
108 for(sv_iter_type si=my_succs.begin(); si != my_succs.end(); ++si) {
109 tbb::flow::remove_edge( n, **si );
112 for (size_t r = 0; r < num_receivers; ++r ) {
113 tbb::flow::remove_edge( n, receivers[r] );
119 template< typename OutputType, typename Body >
120 void continue_nodes( Body body ) {
121 for (int p = 1; p < 2*MaxThread; ++p) {
123 tbb::flow::continue_node< OutputType > exe_node( g, body );
124 run_continue_nodes( p, g, exe_node);
125 exe_node.try_put(tbb::flow::continue_msg());
126 tbb::flow::continue_node< OutputType > exe_node_copy( exe_node );
127 run_continue_nodes( p, g, exe_node_copy);
131 const size_t Offset = 123;
132 tbb::atomic<size_t> global_execute_count;
134 template< typename OutputType >
137 tbb::atomic<size_t> local_execute_count;
138 inc_functor( ) { local_execute_count = 0; }
139 inc_functor( const inc_functor &f ) { local_execute_count = f.local_execute_count; }
140 void operator=(const inc_functor &f) { local_execute_count = f.local_execute_count; }
142 OutputType operator()( tbb::flow::continue_msg ) {
143 ++global_execute_count;
144 ++local_execute_count;
150 template< typename OutputType >
151 void continue_nodes_with_copy( ) {
153 for (int p = 1; p < 2*MaxThread; ++p) {
155 inc_functor<OutputType> cf;
156 cf.local_execute_count = Offset;
157 global_execute_count = Offset;
159 tbb::flow::continue_node< OutputType > exe_node( g, cf );
160 fake_continue_sender fake_sender;
161 for (size_t i = 0; i < N; ++i) {
162 exe_node.register_predecessor( fake_sender );
165 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
166 std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));
168 for (size_t r = 0; r < num_receivers; ++r ) {
169 tbb::flow::make_edge( exe_node, receivers[r] );
172 NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) );
175 // 2) the nodes will receive puts from multiple predecessors simultaneously,
176 for (size_t r = 0; r < num_receivers; ++r ) {
177 size_t c = receivers[r].my_count;
178 // 3) the nodes will send to multiple successors.
179 ASSERT( (int)c == p, NULL );
181 for (size_t r = 0; r < num_receivers; ++r ) {
182 tbb::flow::remove_edge( exe_node, receivers[r] );
186 // validate that the local body matches the global execute_count and both are correct
187 inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
188 const size_t expected_count = p*MAX_NODES + Offset;
189 size_t global_count = global_execute_count;
190 size_t inc_count = body_copy.local_execute_count;
191 ASSERT( global_count == expected_count && global_count == inc_count, NULL );
192 g.reset(tbb::flow::rf_reset_bodies);
193 body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
194 inc_count = body_copy.local_execute_count;
195 ASSERT( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" );
200 template< typename OutputType >
201 void run_continue_nodes() {
202 harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0;
203 #if __TBB_CPP11_LAMBDAS_PRESENT
204 continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } );
206 continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func );
207 continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() );
208 continue_nodes_with_copy<OutputType>();
211 //! Tests limited concurrency cases for nodes that accept data messages
212 void test_concurrency(int num_threads) {
213 tbb::task_scheduler_init init(num_threads);
214 run_continue_nodes<tbb::flow::continue_msg>();
215 run_continue_nodes<int>();
216 run_continue_nodes<empty_no_assign>();
219 * Connection of two graphs is not currently supported, but works to some limited extent.
220 * This test is included to check for backward compatibility. It checks that a continue_node
221 * with predecessors in two different graphs receives the required
222 * number of continue messages before it executes.
224 using namespace tbb::flow;
226 struct add_to_counter {
228 add_to_counter(int& var):counter(&var){}
229 void operator()(continue_msg){*counter+=1;}
232 void test_two_graphs(){
235 //graph g with broadcast_node and continue_node
237 broadcast_node<continue_msg> start_g(g);
238 continue_node<continue_msg> first_g(g, add_to_counter(count));
240 //graph h with broadcast_node
242 broadcast_node<continue_msg> start_h(h);
244 //making two edges to first_g from the two graphs
245 make_edge(start_g,first_g);
246 make_edge(start_h, first_g);
248 //two try_puts from the two graphs
249 start_g.try_put(continue_msg());
250 start_h.try_put(continue_msg());
252 ASSERT(count==1, "Not all continue messages received");
254 //two try_puts from the graph that doesn't contain the node
256 start_h.try_put(continue_msg());
257 start_h.try_put(continue_msg());
259 ASSERT(count==1, "Not all continue messages received -1");
263 start_g.try_put(continue_msg());
265 ASSERT(count==0, "Node executed without waiting for all predecessors");
268 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
269 void test_extract() {
271 tbb::flow::continue_msg cm;
273 tbb::flow::broadcast_node<tbb::flow::continue_msg> b0(g);
274 tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g);
275 tbb::flow::continue_node<tbb::flow::continue_msg> c0(g, add_to_counter(my_count));
276 tbb::flow::queue_node<tbb::flow::continue_msg> q0(g);
278 tbb::flow::make_edge(b0, c0);
279 tbb::flow::make_edge(b1, c0);
280 tbb::flow::make_edge(c0, q0);
281 for( int i = 0; i < 2; ++i ) {
282 ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts");
283 ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts");
284 ASSERT(c0.predecessor_count() == 2 && c0.successor_count() == 1, "c0 has incorrect counts");
285 ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts");
293 b0.try_put(tbb::flow::continue_msg());
295 ASSERT(my_count == 0, "continue_node fired too soon");
296 b1.try_put(tbb::flow::continue_msg());
298 ASSERT(my_count == 1, "continue_node didn't fire");
299 ASSERT(q0.try_get(cm), "continue_node didn't forward");
309 ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts");
310 ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts");
311 ASSERT(c0.predecessor_count() == 1 && c0.successor_count() == 1, "c0 has incorrect counts");
312 ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts");
313 b0.try_put(tbb::flow::continue_msg());
314 b0.try_put(tbb::flow::continue_msg());
316 ASSERT(my_count == 1, "b0 messages being forwarded to continue_node even though it is disconnected");
317 b1.try_put(tbb::flow::continue_msg());
319 ASSERT(my_count == 2, "continue_node didn't fire though it has only one predecessor");
320 ASSERT(q0.try_get(cm), "continue_node didn't forward second time");
330 ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts");
331 ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts");
332 ASSERT(c0.predecessor_count() == 0 && c0.successor_count() == 0, "c0 has incorrect counts");
333 ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts");
334 b0.try_put(tbb::flow::continue_msg());
335 b0.try_put(tbb::flow::continue_msg());
336 b1.try_put(tbb::flow::continue_msg());
337 b1.try_put(tbb::flow::continue_msg());
339 ASSERT(my_count == 2, "continue didn't fire though it has only one predecessor");
340 ASSERT(!q0.try_get(cm), "continue_node forwarded though it shouldn't");
349 ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts");
350 ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts");
351 ASSERT(c0.predecessor_count() == 1 && c0.successor_count() == 0, "c0 has incorrect counts");
352 ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts");
354 b0.try_put(tbb::flow::continue_msg());
357 ASSERT(my_count == 3, "continue didn't fire though it has only one predecessor");
358 ASSERT(!q0.try_get(cm), "continue_node forwarded though it shouldn't");
360 tbb::flow::make_edge(b1, c0);
361 tbb::flow::make_edge(c0, q0);
367 struct lightweight_policy_body : NoAssign {
368 const tbb::tbb_thread::id my_thread_id;
369 tbb::atomic<size_t> my_count;
371 lightweight_policy_body() : my_thread_id(tbb::this_tbb_thread::get_id()) {
374 void operator()(tbb::flow::continue_msg) {
376 tbb::tbb_thread::id body_thread_id = tbb::this_tbb_thread::get_id();
377 ASSERT(body_thread_id == my_thread_id, "Body executed as not lightweight");
381 void test_lightweight_policy() {
383 tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> node1(g, lightweight_policy_body());
384 tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> node2(g, lightweight_policy_body());
386 tbb::flow::make_edge(node1, node2);
388 for(size_t i = 0; i < n; ++i) {
389 node1.try_put(tbb::flow::continue_msg());
393 lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1);
394 lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2);
395 ASSERT(body1.my_count == n, "Body of the first node needs to be executed N times");
396 ASSERT(body2.my_count == n, "Body of the second node needs to be executed N times");
399 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
402 void test_follows_and_precedes_api() {
403 using msg_t = tbb::flow::continue_msg;
405 std::array<msg_t, 3> messages_for_follows = { msg_t(), msg_t(), msg_t() };
406 std::vector<msg_t> messages_for_precedes = { msg_t() };
408 auto pass_through = [](const msg_t& msg) { return msg; };
410 follows_and_precedes_testing::test_follows
411 <msg_t, tbb::flow::continue_node<msg_t>>
412 (messages_for_follows, pass_through);
414 follows_and_precedes_testing::test_precedes
415 <msg_t, tbb::flow::continue_node<msg_t>>
416 (messages_for_precedes, pass_through);
418 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
420 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
422 template <typename ExpectedType, typename Body>
423 void test_deduction_guides_common(Body body) {
424 using namespace tbb::flow;
427 continue_node c1(g, body);
428 static_assert(std::is_same_v<decltype(c1), continue_node<ExpectedType>>);
430 continue_node c2(g, body, lightweight());
431 static_assert(std::is_same_v<decltype(c2), continue_node<ExpectedType, lightweight>>);
433 continue_node c3(g, 5, body);
434 static_assert(std::is_same_v<decltype(c3), continue_node<ExpectedType>>);
436 continue_node c4(g, 5, body, lightweight());
437 static_assert(std::is_same_v<decltype(c4), continue_node<ExpectedType, lightweight>>);
439 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
440 continue_node c5(g, body, node_priority_t(5));
441 static_assert(std::is_same_v<decltype(c5), continue_node<ExpectedType>>);
443 continue_node c6(g, body, lightweight(), node_priority_t(5));
444 static_assert(std::is_same_v<decltype(c6), continue_node<ExpectedType, lightweight>>);
446 continue_node c7(g, 5, body, node_priority_t(5));
447 static_assert(std::is_same_v<decltype(c7), continue_node<ExpectedType>>);
449 continue_node c8(g, 5, body, lightweight(), node_priority_t(5));
450 static_assert(std::is_same_v<decltype(c8), continue_node<ExpectedType, lightweight>>);
453 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
454 broadcast_node<continue_msg> b(g);
456 continue_node c9(follows(b), body);
457 static_assert(std::is_same_v<decltype(c9), continue_node<ExpectedType>>);
459 continue_node c10(follows(b), body, lightweight());
460 static_assert(std::is_same_v<decltype(c10), continue_node<ExpectedType, lightweight>>);
462 continue_node c11(follows(b), 5, body);
463 static_assert(std::is_same_v<decltype(c11), continue_node<ExpectedType>>);
465 continue_node c12(follows(b), 5, body, lightweight());
466 static_assert(std::is_same_v<decltype(c12), continue_node<ExpectedType, lightweight>>);
468 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
469 continue_node c13(follows(b), body, node_priority_t(5));
470 static_assert(std::is_same_v<decltype(c13), continue_node<ExpectedType>>);
472 continue_node c14(follows(b), body, lightweight(), node_priority_t(5));
473 static_assert(std::is_same_v<decltype(c14), continue_node<ExpectedType, lightweight>>);
475 continue_node c15(follows(b), 5, body, node_priority_t(5));
476 static_assert(std::is_same_v<decltype(c15), continue_node<ExpectedType>>);
478 continue_node c16(follows(b), 5, body, lightweight(), node_priority_t(5));
479 static_assert(std::is_same_v<decltype(c16), continue_node<ExpectedType, lightweight>>);
481 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
483 continue_node c17(c1);
484 static_assert(std::is_same_v<decltype(c17), continue_node<ExpectedType>>);
487 int continue_body_f(const tbb::flow::continue_msg&) { return 1; }
488 void continue_void_body_f(const tbb::flow::continue_msg&) {}
490 void test_deduction_guides() {
491 using tbb::flow::continue_msg;
492 test_deduction_guides_common<int>([](const continue_msg&)->int { return 1; } );
493 test_deduction_guides_common<continue_msg>([](const continue_msg&) {});
495 test_deduction_guides_common<int>([](const continue_msg&) mutable ->int { return 1; });
496 test_deduction_guides_common<continue_msg>([](const continue_msg&) mutable {});
498 test_deduction_guides_common<int>(continue_body_f);
499 test_deduction_guides_common<continue_msg>(continue_void_body_f);
502 #endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
504 // TODO: use pass_through from test_function_node instead
506 struct passing_body {
507 T operator()(const T& val) {
513 The test covers the case when a node with non-default mutex type is a predecessor for continue_node,
514 because there used to be a bug when make_edge(node, continue_node)
515 did not update continue_node's predecesosor threshold
516 since the specialization of node's successor_cache for a continue_node was not chosen.
518 void test_successor_cache_specialization() {
519 using namespace tbb::flow;
523 broadcast_node<continue_msg> node_with_default_mutex_type(g);
524 buffer_node<continue_msg> node_with_non_default_mutex_type(g);
526 continue_node<continue_msg> node(g, passing_body<continue_msg>());
528 make_edge(node_with_default_mutex_type, node);
529 make_edge(node_with_non_default_mutex_type, node);
531 buffer_node<continue_msg> buf(g);
533 make_edge(node, buf);
535 node_with_default_mutex_type.try_put(continue_msg());
536 node_with_non_default_mutex_type.try_put(continue_msg());
540 continue_msg storage;
541 ASSERT((buf.try_get(storage) && !buf.try_get(storage)),
542 "Wrong number of messages is passed via continue_node");
547 REPORT("number of threads must be positive\n");
550 for( int p=MinThread; p<=MaxThread; ++p ) {
554 #if __TBB_PREVIEW_LIGHTWEIGHT_POLICY
555 test_lightweight_policy();
557 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
560 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
561 test_follows_and_precedes_api();
563 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
564 test_deduction_guides();
566 test_successor_cache_specialization();
567 return Harness::Done;