2 Copyright (c) 2005-2019 Intel Corporation
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
19 #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
20 #include "harness_graph.h"
22 #include "tbb/flow_graph.h"
23 #include "tbb/atomic.h"
24 #include "tbb/task_scheduler_init.h"
29 using tbb::flow::internal::SUCCESSFULLY_ENQUEUED;
// Receiver used by the single-sender tests in this file: every value put to
// it is compared with next_value, which starts at T(0) and is post-incremented
// on each put, so values are expected in the order T(0), T(1), T(2), ...
31 template< typename T >
32 struct serial_receiver : public tbb::flow::receiver<T>, NoAssign {
// Graph this receiver was constructed with.
34 tbb::flow::graph& my_graph;
// Stores the graph reference and starts the expected value at T(0).
36 serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {}
// Asserts that v matches the expected value, advances the expectation, and
// returns tbb::flow::internal::SUCCESSFULLY_ENQUEUED with constness cast away.
38 tbb::task *try_put_task( const T &v ) __TBB_override {
39 ASSERT( next_value++ == v, NULL );
40 return const_cast<tbb::task *>(SUCCESSFULLY_ENQUEUED);
// NOTE(review): the body of graph_reference() is not visible in this view;
// presumably it returns my_graph - confirm.
43 tbb::flow::graph& graph_reference() __TBB_override {
// Stubs for the deprecated node-extraction interface: the add/delete/copy
// hooks do nothing and predecessor_count() always reports 0.
47 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
48 typedef typename tbb::flow::receiver<T>::built_predecessors_type built_predecessors_type;
49 typedef typename tbb::flow::receiver<T>::predecessor_list_type predecessor_list_type;
50 typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
51 built_predecessors_type bpt;
52 built_predecessors_type &built_predecessors() __TBB_override { return bpt; }
53 void internal_add_built_predecessor( predecessor_type & ) __TBB_override { }
54 void internal_delete_built_predecessor( predecessor_type & ) __TBB_override { }
55 void copy_predecessors( predecessor_list_type & ) __TBB_override { }
56 size_t predecessor_count() __TBB_override { return 0; }
// Resets the expected value back to T(0); the reset flags are ignored.
59 void reset_receiver(tbb::flow::reset_flags /*f*/) __TBB_override {next_value = T(0);}
// Receiver used by the multi-sender tests. It carries an atomic counter
// (my_count) that is zeroed in the constructor and in reset_receiver; the
// tests compare it with the number of accepted puts.
63 template< typename T >
64 struct parallel_receiver : public tbb::flow::receiver<T>, NoAssign {
65 tbb::atomic<int> my_count;
// Graph this receiver was constructed with.
66 tbb::flow::graph& my_graph;
// Stores the graph reference and zeroes the counter.
68 parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; }
// Ignores the value itself and returns SUCCESSFULLY_ENQUEUED.
// NOTE(review): the statement that updates my_count is not visible in this
// view; presumably each call increments it - confirm.
70 tbb::task *try_put_task( const T &/*v*/ ) __TBB_override {
72 return const_cast<tbb::task *>(tbb::flow::internal::SUCCESSFULLY_ENQUEUED);
// NOTE(review): the body of graph_reference() is not visible in this view;
// presumably it returns my_graph - confirm.
75 tbb::flow::graph& graph_reference() __TBB_override {
// Stubs for the deprecated node-extraction interface: the add/delete/copy
// hooks do nothing and predecessor_count() always reports 0.
79 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
80 typedef typename tbb::flow::receiver<T>::built_predecessors_type built_predecessors_type;
81 typedef typename tbb::flow::receiver<T>::predecessor_list_type predecessor_list_type;
82 typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
83 built_predecessors_type bpt;
84 built_predecessors_type &built_predecessors() __TBB_override { return bpt; }
85 void internal_add_built_predecessor( predecessor_type & ) __TBB_override { }
86 void internal_delete_built_predecessor( predecessor_type & ) __TBB_override { }
87 void copy_predecessors( predecessor_list_type & ) __TBB_override { }
88 size_t predecessor_count( ) __TBB_override { return 0; }
// Zeroes the counter; the reset flags are ignored.
90 void reset_receiver(tbb::flow::reset_flags /*f*/) __TBB_override {my_count = 0;}
// Sender stub used as a placeholder predecessor (the tests connect it to a
// limiter's decrement port). It refuses both registering and removing a
// successor by returning false.
93 template< typename T >
94 struct empty_sender : public tbb::flow::sender<T> {
95 typedef typename tbb::flow::sender<T>::successor_type successor_type;
// Both edge-management calls report failure; no successor is ever recorded.
97 bool register_successor( successor_type & ) __TBB_override { return false; }
98 bool remove_successor( successor_type & ) __TBB_override { return false; }
// Stubs for the deprecated node-extraction interface: the add/delete/copy
// hooks do nothing and successor_count() always reports 0.
99 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
100 typedef typename tbb::flow::sender<T>::built_successors_type built_successors_type;
101 typedef typename tbb::flow::sender<T>::successor_list_type successor_list_type;
102 built_successors_type bst;
103 built_successors_type &built_successors() __TBB_override { return bst; }
104 void internal_add_built_successor( successor_type & ) __TBB_override { }
105 void internal_delete_built_successor( successor_type & ) __TBB_override { }
106 void copy_successors( successor_list_type & ) __TBB_override { }
107 size_t successor_count() __TBB_override { return 0; }
// Worker body for NativeParallelFor: each invocation tries to put the values
// T(0) .. T(L-1) into the limiter, never touching the decrement port.
112 template< typename T >
113 struct put_body : NoAssign {
// Limiter under test and the counter shared by all workers.
115 tbb::flow::limiter_node<T> &my_lim;
116 tbb::atomic<int> &my_accept_count;
// Binds the body to the limiter and the shared accept counter.
118 put_body( tbb::flow::limiter_node<T> &lim, tbb::atomic<int> &accept_count ) :
119 my_lim(lim), my_accept_count(accept_count) {}
// The argument (worker index) is unused.
// NOTE(review): the code that consumes msg is not visible in this view;
// presumably my_accept_count is incremented for each accepted put - confirm.
121 void operator()( int ) const {
122 for ( int i = 0; i < L; ++i ) {
123 bool msg = my_lim.try_put( T(i) );
// Worker body for NativeParallelFor: each invocation keeps putting to the
// limiter until its local accept count reaches N, and also sends continue_msg
// to the limiter's decrement port.
130 template< typename T >
131 struct put_dec_body : NoAssign {
// Limiter under test and the counter shared by all workers.
133 tbb::flow::limiter_node<T> &my_lim;
134 tbb::atomic<int> &my_accept_count;
// Binds the body to the limiter and the shared accept counter.
136 put_dec_body( tbb::flow::limiter_node<T> &lim, tbb::atomic<int> &accept_count ) :
137 my_lim(lim), my_accept_count(accept_count) {}
// The argument (worker index) is unused.
// NOTE(review): the condition guarding the statements below is not visible
// in this view; presumably the local count is advanced and the decrement is
// sent only after an accepted put - confirm.
139 void operator()( int ) const {
140 int local_accept_count = 0;
141 while ( local_accept_count < N ) {
142 bool msg = my_lim.try_put( T(local_accept_count) );
144 ++local_accept_count;
146 my_lim.decrement.try_put( tbb::flow::continue_msg() );
// Runs put_dec_body on num_threads native threads against the given limiter
// and checks that the total number of accepted messages, and the count seen
// by the downstream parallel_receiver, both equal N*num_threads.
//   num_threads - number of concurrent workers
//   lim         - limiter under test (edges are added to it here)
//   g           - graph used to construct the receiver
153 template< typename T >
154 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) {
155 parallel_receiver<T> r(g);
156 empty_sender< tbb::flow::continue_msg > s;
// NOTE(review): the initialization of accept_count is not visible in this view.
157 tbb::atomic<int> accept_count;
// Wire limiter -> receiver, and the stub sender -> decrement port.
159 tbb::flow::make_edge( lim, r );
160 tbb::flow::make_edge(s, lim.decrement);
// With the deprecated extraction API enabled, verify the edge bookkeeping:
// one predecessor on the decrement port, one successor and no predecessor
// on the limiter itself.
161 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
162 ASSERT(lim.decrement.predecessor_count() == 1, NULL);
163 ASSERT(lim.successor_count() == 1, NULL);
164 ASSERT(lim.predecessor_count() == 0, NULL);
165 typename tbb::flow::interface10::internal::decrementer
166 <tbb::flow::limiter_node<T>, tbb::flow::continue_msg>::predecessor_list_type dec_preds;
167 lim.decrement.copy_predecessors(dec_preds);
168 ASSERT(dec_preds.size() == 1, NULL);
170 // test puts with decrements
171 NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) );
// Every worker must have reached N accepted puts.
172 int c = accept_count;
173 ASSERT( c == N*num_threads, NULL );
174 ASSERT( r.my_count == N*num_threads, NULL );
180 // limiter only forwards below the limit, multiple parallel senders / single receiver
181 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages
// Parallel test driver. For each threshold it runs concurrent senders against
// a fresh limiter, first without and then with decrements.
//   num_threads - number of concurrent workers passed to NativeParallelFor
184 template< typename T >
185 int test_parallel(int num_threads) {
187 // test puts with no decrements
// Threshold i ranges over 0 .. L-1; without decrements exactly i puts are
// expected to be accepted in total.
188 for ( int i = 0; i < L; ++i ) {
190 tbb::flow::limiter_node< T > lim(g, i);
191 parallel_receiver<T> r(g);
// NOTE(review): the initialization of accept_count is not visible in this view.
192 tbb::atomic<int> accept_count;
194 tbb::flow::make_edge( lim, r );
195 // test puts with no decrements
196 NativeParallelFor( num_threads, put_body<T>(lim, accept_count) );
198 int c = accept_count;
199 ASSERT( c == i, NULL );
202 // test puts with decrements
// Threshold i ranges over 1 .. L-1; the same scenario is repeated on a
// copy-constructed limiter.
203 for ( int i = 1; i < L; ++i ) {
205 tbb::flow::limiter_node< T > lim(g, i);
206 test_puts_with_decrements(num_threads, lim, g);
207 tbb::flow::limiter_node< T > lim_copy( lim );
208 test_puts_with_decrements(num_threads, lim_copy, g);
217 // limiter only forwards below the limit, single sender / single receiver
218 // at reject, a put to decrement, will cause next message to be accepted
// NOTE(review): the signature line of this function template is not visible
// in this view.
220 template< typename T >
223 // test puts with no decrements
// For threshold i in 0 .. L-1: the first i puts must be accepted and every
// later put must be rejected.
224 for ( int i = 0; i < L; ++i ) {
226 tbb::flow::limiter_node< T > lim(g, i);
227 serial_receiver<T> r(g);
228 tbb::flow::make_edge( lim, r );
229 for ( int j = 0; j < L; ++j ) {
230 bool msg = lim.try_put( T(j) );
231 ASSERT( ( j < i && msg == true ) || ( j >= i && msg == false ), NULL );
236 // test puts with decrements
// For threshold i in 1 .. L-1: once a put is rejected, one continue_msg sent
// to the decrement port must allow the same value to be accepted on retry.
237 for ( int i = 1; i < L; ++i ) {
239 tbb::flow::limiter_node< T > lim(g, i);
240 serial_receiver<T> r(g);
241 empty_sender< tbb::flow::continue_msg > s;
242 tbb::flow::make_edge( lim, r );
243 tbb::flow::make_edge(s, lim.decrement);
244 for ( int j = 0; j < N; ++j ) {
245 bool msg = lim.try_put( T(j) );
246 ASSERT( ( j < i && msg == true ) || ( j >= i && msg == false ), NULL );
247 if ( msg == false ) {
// Free one slot, then retry the rejected value.
248 lim.decrement.try_put( tbb::flow::continue_msg() );
249 msg = lim.try_put( T(j) );
250 ASSERT( msg == true, NULL );
257 // reported bug in limiter (http://software.intel.com/en-us/comment/1752355)
258 #define DECREMENT_OUTPUT 1 // the port number of the decrement output of the multifunction_node
259 #define LIMITER_OUTPUT 0 // port number of the integer output
// multifunction_node taking int input; output port 0 carries int and output
// port 1 carries continue_msg, matching the two port macros above.
261 typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int,tbb::flow::continue_msg> > mfnode_type;
// Global tallies; test_multifunction_to_limiter asserts that the emit_* and
// receive_* values agree.
// NOTE(review): the statements that update these counters are not visible in
// this view.
263 tbb::atomic<size_t> emit_count;
264 tbb::atomic<size_t> emit_sum;
265 tbb::atomic<size_t> receive_count;
266 tbb::atomic<size_t> receive_sum;
// NOTE(review): the struct header and the declaration of max_cnt are not
// visible in this view; the construction site in test_multifunction_to_limiter
// names this type mfnode_body.
// Pointer to the invocation counter shared with the caller.
270 tbb::atomic<int>* my_cnt;
// Records the maximum count and the address of the shared counter.
271 mfnode_body(const int& _max, tbb::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my) { }
// Body of the multifunction_node; the input value is ignored.
272 void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) {
// lcnt is this invocation's value of the shared counter after incrementing.
// NOTE(review): statements between here and the decrement put are not visible
// in this view (presumably a check against max_cnt - confirm).
273 int lcnt = ++(*my_cnt);
277 // put one continue_msg to the decrement of the limiter.
278 if(!tbb::flow::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) {
279 ASSERT(false,"Unexpected rejection of decrement");
282 // put messages to the input of the limiter_node until it rejects.
// NOTE(review): the loop body is not visible in this view.
283 while( tbb::flow::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) {
// NOTE(review): presumably the call operator of fn_body (the body type used
// for fn_node in test_multifunction_to_limiter); its enclosing struct header
// and its body are not visible in this view - confirm.
292 int operator()(const int &in) {
301 // | mf_node |0---+ +----------+ +----------+
302 // +->| |1---------->| lim_node |--------->| fn_node |--+
303 // | +---------+ +----------+ +----------+ |
306 // +-------------------------------------------------------------+
// Builds a cycle mf_node -> lim_node -> fn_node -> mf_node, with mf_node's
// second output feeding lim_node's decrement port, then checks that the
// emitted and received counts and sums agree.
//   _max       - passed to mfnode_body as its maximum count
//   _nparallel - threshold of the limiter_node
309 test_multifunction_to_limiter(int _max, int _nparallel) {
// Counter shared with mfnode_body.
// NOTE(review): its initialization is not visible in this view.
315 tbb::atomic<int> local_cnt;
317 mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt));
318 tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body());
319 tbb::flow::limiter_node<int> lim_node(g, _nparallel);
// Port 0 (int) feeds the limiter input; port 1 (continue_msg) feeds its
// decrement port; fn_node closes the loop back to mf_node.
320 tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node);
321 tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrement);
322 tbb::flow::make_edge(lim_node, fn_node);
323 tbb::flow::make_edge(fn_node, mf_node);
// With the deprecated extraction API enabled, report the limiter's edge
// counts both directly and via the copied lists.
324 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
325 REMARK("pred cnt == %d\n",(int)(lim_node.predecessor_count()));
326 REMARK("succ cnt == %d\n",(int)(lim_node.successor_count()));
327 tbb::flow::limiter_node<int>::successor_list_type my_succs;
328 lim_node.copy_successors(my_succs);
329 REMARK("succ cnt from vector == %d\n",(int)(my_succs.size()));
330 tbb::flow::limiter_node<int>::predecessor_list_type my_preds;
331 lim_node.copy_predecessors(my_preds);
332 REMARK("pred cnt from vector == %d\n",(int)(my_preds.size()));
// Everything emitted toward the limiter must have been received.
336 ASSERT(emit_count == receive_count, "counts do not match");
337 ASSERT(emit_sum == receive_sum, "sums do not match");
// The same pair of checks is repeated; the statements between the two pairs
// are not visible in this view.
348 ASSERT(emit_count == receive_count, "counts do not match");
349 ASSERT(emit_sum == receive_sum, "sums do not match");
// Checks that sending a continue_msg to the decrement port before any data
// has passed does not stop the limiter: the downstream queue must afterwards
// yield the value 42.
// NOTE(review): the put of 42 into the limiter and the declaration of outint
// are not visible in this view.
354 test_continue_msg_reception() {
// Limiter with threshold 2 feeding a queue.
356 tbb::flow::limiter_node<int> ln(g,2);
357 tbb::flow::queue_node<int> qn(g);
358 tbb::flow::make_edge(ln, qn);
// Decrement first, while nothing has been forwarded yet.
359 ln.decrement.try_put(tbb::flow::continue_msg());
363 ASSERT(qn.try_get(outint) && outint == 42, "initial put to decrement stops node");
366 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
367 using namespace tbb::flow;
// Drives one limiter through a fixed put/decrement sequence and then checks
// that the queue holds exactly the accepted values 1, 2, 4 in that order.
//   g     - graph owning the nodes
//   limit - limiter under test (callers construct it with threshold 2)
//   queue - queue connected downstream of the limiter
//   broad - broadcast_node connected to the limiter's decrement port
368 void run_and_check_result(graph& g, limiter_node<int>& limit, queue_node<int>& queue, broadcast_node<continue_msg>& broad) {
// Two puts are accepted, the third is rejected.
369 ASSERT( limit.try_put(1), NULL );
370 ASSERT( limit.try_put(2), NULL );
371 ASSERT( !limit.try_put(3), NULL );
// One continue_msg arrives through the broadcast edge and one is put directly
// to the decrement port; afterwards exactly one more put is accepted.
// NOTE(review): presumably both messages are needed for a single decrement
// because the callers declare one extra predecessor at construction - confirm.
372 ASSERT( broad.try_put(continue_msg()), NULL );
373 ASSERT( limit.decrement.try_put(continue_msg()), NULL );
374 ASSERT( limit.try_put(4), NULL );
375 ASSERT( !limit.try_put(5), NULL );
// Expected queue contents: the rejected values 3 and 5 must be absent.
378 int list[] = {1, 2, 4};
// NOTE(review): the declaration of var and the statement that reads it from
// the queue are not visible in this view.
380 for (size_t i = 0; i < sizeof(list)/sizeof(list[0]); i++) {
382 ASSERT(var==list[i], "some data dropped, input does not match output");
// Exercises the deprecated limiter_node constructor that takes a number of
// predecessors, for both the default decrement type and an explicit
// continue_msg decrement type.
386 void test_num_decrement_predecessors() {
388 queue_node<int> output_queue(g);
// Both limiters: threshold 2, one declared predecessor.
389 limiter_node<int> limit1(g, 2, /*number_of_predecessors*/1);
390 limiter_node<int, continue_msg> limit2(g, 2, /*number_of_predecessors*/1);
391 broadcast_node<continue_msg> broadcast(g);
// Both limiters share one output queue ...
393 make_edge(limit1, output_queue);
394 make_edge(limit2, output_queue);
// ... and one broadcast node feeding their decrement ports.
396 make_edge(broadcast, limit1.decrement);
397 make_edge(broadcast, limit2.decrement);
// Run the same scenario on each limiter in turn.
399 run_and_check_result(g, limit1, output_queue, broadcast);
400 run_and_check_result(g, limit2, output_queue, broadcast);
402 #else // TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
404 // This test ascertains that if a message is not successfully put
405 // to a successor, the message is not dropped but released.
408 void test_reserve_release_messages() {
409 using namespace tbb::flow;
412 // Create two queue_nodes, one broadcast_node and one limiter_node.
413 queue_node<int> input_queue(g);
414 queue_node<int> output_queue(g);
415 broadcast_node<int> broad(g);
416 limiter_node<int, int> limit(g,2); //threshold of 2
// Topology: input_queue -> limit -> output_queue, with broad feeding the
// limiter's decrement port.
419 make_edge(input_queue, limit);
420 make_edge(limit, output_queue);
421 make_edge(broad,limit.decrement);
423 int list[4] = {19, 33, 72, 98}; //list to be put to the input queue
425 input_queue.try_put(list[0]); // succeeds
426 input_queue.try_put(list[1]); // succeeds
427 input_queue.try_put(list[2]); // fails, stored in upstream buffer
// Detach the successor so that the next forwarding attempt cannot be delivered.
430 remove_edge(limit, output_queue); //remove successor
432 //sending message to the decrement port of the limiter
433 broad.try_put(1); //failed message retrieved.
436 make_edge(limit, output_queue); //putting the successor back
438 broad.try_put(1); //drop the count
440 input_queue.try_put(list[3]); //success
// All four values must come out of the output queue in their input order.
// NOTE(review): the declaration of var is not visible in this view.
445 for (int i=0; i<4; i++) {
446 output_queue.try_get(var);
447 ASSERT(var==list[i], "some data dropped, input does not match output");
// Tests a limiter whose decrement port takes numeric values: a negative value
// closes the gate, a positive value reopens it, and a very large threshold
// combined with large negative decrements must not overflow the counter.
// NOTE(review): the declarations of the graph g and of the counter m are not
// visible in this view.
452 void test_decrementer() {
453 const int threshold = 5;
455 tbb::flow::limiter_node<int, int> limit(g, threshold);
456 tbb::flow::queue_node<int> queue(g);
457 make_edge(limit, queue);
// m is post-incremented on every attempt, including rejected ones, so a
// rejected put leaves a gap in the values seen downstream.
459 ASSERT( limit.try_put( m++ ), "Newly constructed limiter node does not accept message." );
460 ASSERT( limit.decrement.try_put( -threshold ), // close limiter's gate
461 "Limiter node decrementer's port does not accept message." );
// NOTE(review): the message below reads "node's accepts"; likely meant
// "node accepts".
462 ASSERT( !limit.try_put( m++ ), "Closed limiter node's accepts message." );
463 ASSERT( limit.decrement.try_put( threshold + 5 ), // open limiter's gate
464 "Limiter node decrementer's port does not accept message." );
// After reopening, exactly `threshold` puts are accepted and the next is not.
465 for( int i = 0; i < threshold; ++i )
466 ASSERT( limit.try_put( m++ ), "Limiter node does not accept message while open." );
467 ASSERT( !limit.try_put( m ), "Limiter node's gate is not closed." );
// Value 1 is missing from the expected output because that put was rejected.
469 int expected[] = {0, 2, 3, 4, 5, 6};
470 int actual = -1; m = 0;
471 while( queue.try_get(actual) )
472 ASSERT( actual == expected[m++], NULL );
473 ASSERT( sizeof(expected) / sizeof(expected[0]) == m, "Not all messages have been processed." );
// Second scenario: threshold is the largest size_t value and the decrement
// type is long long.
476 const size_t threshold2 = size_t(-1);
477 tbb::flow::limiter_node<int, long long> limit2(g, threshold2);
478 make_edge(limit2, queue);
479 ASSERT( limit2.try_put( 1 ), "Newly constructed limiter node does not accept message." );
// Each negative decrement is half of the largest size_t value (rounded down);
// after the first the gate must still be open, after the second it must be
// closed.
480 long long decrement_value = (long long)( size_t(-1)/2 );
481 ASSERT( limit2.decrement.try_put( -decrement_value ),
482 "Limiter node decrementer's port does not accept message" );
483 ASSERT( limit2.try_put( 2 ), "Limiter's gate should not be closed yet." );
484 ASSERT( limit2.decrement.try_put( -decrement_value ),
485 "Limiter node decrementer's port does not accept message" );
486 ASSERT( !limit2.try_put( 3 ), "Overflow happened for internal counter." );
487 int expected2[] = {1, 2};
// NOTE(review): the reset of m before this loop is not visible in this view.
489 while( queue.try_get(actual) )
490 ASSERT( actual == expected2[m++], NULL );
491 ASSERT( sizeof(expected2) / sizeof(expected2[0]) == m, "Not all messages have been processed." );
494 #endif // TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
496 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Tests the deprecated extraction API on a limiter_node: edge counts and
// copied predecessor/successor lists are checked before and after extraction,
// over two passes.
// NOTE(review): many statements of this function (the graph declaration, the
// puts, the extract calls and the per-pass branching) are not visible in this
// view; the comments below describe only what the visible lines check.
497 void test_extract() {
// Node under test (threshold 1), two input queues, one output queue and two
// broadcast nodes for the decrement port.
500 tbb::flow::limiter_node<int> node0(g, /*threshold*/1);
501 tbb::flow::queue_node<int> q0(g);
502 tbb::flow::queue_node<int> q1(g);
503 tbb::flow::queue_node<int> q2(g);
504 tbb::flow::broadcast_node<tbb::flow::continue_msg> b0(g);
505 tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g);
507 for( int i = 0; i < 2; ++i ) {
508 REMARK("At pass %d\n", i);
// Each pass starts from a fully disconnected node.
509 ASSERT(node0.predecessor_count() == 0, "incorrect predecessor count at start");
510 ASSERT(node0.successor_count() == 0, "incorrect successor count at start");
511 ASSERT(node0.decrement.predecessor_count() == 0, "incorrect decrement pred count at start");
// Connect q0, q1 -> node0 -> q2 and b0, b1 -> node0.decrement.
513 tbb::flow::make_edge(q0, node0);
514 tbb::flow::make_edge(q1, node0);
515 tbb::flow::make_edge(node0, q2);
516 tbb::flow::make_edge(b0, node0.decrement);
517 tbb::flow::make_edge(b1, node0.decrement);
530 ASSERT(node0.predecessor_count() == 2, "incorrect predecessor count after construction");
531 ASSERT(node0.successor_count() == 1, "incorrect successor count after construction");
532 ASSERT(node0.decrement.predecessor_count() == 2, "incorrect decrement pred count after construction");
// The first forwarded value must equal the pass index.
533 ASSERT(q2.try_get(j), "fetch of value forwarded to output queue failed");
534 ASSERT(j == i, "improper value forwarded to output queue");
// Nothing more may be forwarded until b0 has sent two continue_msgs.
// NOTE(review): presumably two are required because the decrement port has
// two predecessors - confirm.
537 ASSERT(!q2.try_get(j), "limiter_node forwarded item improperly");
538 b0.try_put(tbb::flow::continue_msg());
540 ASSERT(!q2.try_get(j), "limiter_node forwarded item improperly");
541 b0.try_put(tbb::flow::continue_msg());
543 ASSERT(q2.try_get(j) && j == 2*i, "limiter_node failed to forward item");
// pv/sv/dv receive the lists reported by the node; pv1/sv1/dv1 hold the
// expected contents.
545 tbb::flow::limiter_node<int>::successor_list_type sv;
546 tbb::flow::limiter_node<int>::predecessor_list_type pv;
547 tbb::flow::continue_receiver::predecessor_list_type dv;
548 tbb::flow::limiter_node<int>::successor_list_type sv1;
549 tbb::flow::limiter_node<int>::predecessor_list_type pv1;
550 tbb::flow::continue_receiver::predecessor_list_type dv1;
552 node0.copy_predecessors(pv);
553 node0.copy_successors(sv);
554 node0.decrement.copy_predecessors(dv);
555 pv1.push_back(&(q0));
556 pv1.push_back(&(q1));
557 sv1.push_back(&(q2));
558 dv1.push_back(&(b0));
559 dv1.push_back(&(b1));
561 ASSERT(pv.size() == 2, "improper size for predecessors");
562 ASSERT(sv.size() == 1, "improper size for successors");
563 ASSERT(lists_match(pv,pv1), "predecessor lists do not match");
564 ASSERT(lists_match(sv,sv1), "successor lists do not match");
// NOTE(review): this compares the decrement port's predecessor lists, although
// the message says "successor lists".
565 ASSERT(lists_match(dv,dv1), "successor lists do not match");
// After extraction the node must have no edges left.
569 ASSERT(node0.predecessor_count() == 0, "incorrect predecessor count after extraction");
570 ASSERT(node0.successor_count() == 0, "incorrect successor count after extraction");
571 ASSERT(node0.decrement.predecessor_count() == 0, "incorrect decrement pred count after extraction");
// Checks labelled "second iter": one predecessor and one decrement
// predecessor remain (q1 and b1, per the expected lists built below), and no
// successor.
578 ASSERT(node0.predecessor_count() == 1, "incorrect predecessor count after extract second iter");
579 ASSERT(node0.successor_count() == 0, "incorrect successor count after extract second iter");
580 ASSERT(node0.decrement.predecessor_count() == 1, "incorrect decrement pred count after extract second iter");
582 node0.copy_predecessors(pv);
583 node0.copy_successors(sv);
584 node0.decrement.copy_predecessors(dv);
// NOTE(review): any clearing of the lists before these push_backs is not
// visible in this view.
588 pv1.push_back(&(q1));
589 dv1.push_back(&(b1));
591 ASSERT(lists_match(pv,pv1), "predecessor lists do not match second iter");
592 ASSERT(lists_match(sv,sv1), "successor lists do not match second iter");
// NOTE(review): as above, dv/dv1 are decrement predecessor lists, not
// successor lists.
593 ASSERT(lists_match(dv,dv1), "successor lists do not match second iter");
// Final state: fully disconnected again.
598 ASSERT(node0.predecessor_count() == 0, "incorrect predecessor count after extract");
599 ASSERT(node0.successor_count() == 0, "incorrect successor count after extract");
600 ASSERT(node0.decrement.predecessor_count() == 0, "incorrect decrement pred count after extract");
605 #endif // TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Test entry point body: runs the test suite and reports Harness::Done.
// NOTE(review): the function header is not visible in this view, and some
// calls and the closing of the loop and preprocessor blocks are missing, so it
// cannot be told from here which of the calls below run inside the loop.
// A task_scheduler_init is created for each thread count from 1 to 8.
608 for (int i = 1; i <= 8; ++i) {
609 tbb::task_scheduler_init init(i);
611 test_parallel<int>(i);
613 test_continue_msg_reception();
// Arguments are (_max, _nparallel): maximum count and limiter threshold.
614 test_multifunction_to_limiter(30,3);
615 test_multifunction_to_limiter(300,13);
616 test_multifunction_to_limiter(3000,1);
// One of two test sets is selected by the deprecated-constructor macro.
617 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
618 test_num_decrement_predecessors();
620 test_reserve_release_messages();
623 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
626 return Harness::Done;