2 Copyright (c) 2005-2019 Intel Corporation
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
17 #define HARNESS_DEFAULT_MIN_THREADS 3
18 #define HARNESS_DEFAULT_MAX_THREADS 4
21 #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
22 #if _MSC_VER==1700 && !defined(__INTEL_COMPILER)
23 // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012)
24 #pragma warning (disable: 4702)
29 #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations)
31 // need these to get proper external names for private methods in library.
32 #include "tbb/spin_mutex.h"
33 #include "tbb/spin_rw_mutex.h"
35 #include "tbb/task_arena.h"
37 #define private public
38 #define protected public
39 #include "tbb/flow_graph.h"
42 #include "tbb/task_scheduler_init.h"
43 #include "harness_graph.h"
// Call operator of a body functor (the enclosing struct header is not visible in
// this chunk; presumably receiverBody<T>, which is instantiated further down — confirm).
// The incoming message is ignored and a default-constructed continue_msg is returned.
47 tbb::flow::continue_msg operator()(const T &/*in*/) {
48 return tbb::flow::continue_msg();
52 // split_nodes cannot have predecessors
53 // they do not reject messages and always forward.
54 // they reject edge reversals from successors.
//
// White-box test of split_node: inspects the my_successors list of output port 0
// after construction, after make_edge, after a message is put, and after graph resets.
// NOTE(review): `g` and `snode` are used below but are not declared in the lines
// visible here; their declarations appear to be missing from this copy — confirm.
55 void TestSplitNode() {
56 typedef tbb::flow::split_node<tbb::flow::tuple<int> > snode_type;
59 tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>());
60 REMARK("Testing split_node\n");
// A freshly constructed node must not have any successor on its output port.
61 ASSERT(tbb::flow::output_port<0>(snode).my_successors.empty(), "Constructed split_node has successors");
62 // tbb::flow::output_port<0>(snode)
63 tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr);
64 ASSERT(!(tbb::flow::output_port<0>(snode).my_successors.empty()), "after make_edge, split_node has no successor.");
// Push one tuple into the node; rcvr is the only successor of port 0.
65 snode.try_put(tbb::flow::tuple<int>(1));
// The edge is expected to survive a plain reset (the reset call itself is not
// visible in this copy, see the assertion message).
68 ASSERT(!(tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(), split_node has no successor.");
// A reset with rf_clear_edges is expected to drop the edge to rcvr.
69 g.reset(tbb::flow::rf_clear_edges);
70 ASSERT(tbb::flow::output_port<0>(snode).my_successors.empty(), "after reset(rf_clear_edges), split_node has a successor.");
73 // buffering nodes cannot have predecessors
74 // they do not reject messages and always save or forward
75 // they allow edge reversals from successors
//
// White-box test for a buffering node type B (instantiated elsewhere in this file
// with buffer_node, priority_queue_node and queue_node). It checks the node's
// my_successors list while an edge to a serial rejecting function_node and then
// to a reserving join_node is created, used, reset and cleared.
// `name` is only used in the progress REMARK.
// NOTE(review): `g` and `bnode` are used but not declared in the lines visible
// here; their declarations appear to be missing from this copy — confirm.
76 template< typename B >
77 void TestBufferingNode(const char * name) {
80 tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
81 REMARK("Testing %s:", name);
82 for(int icnt = 0; icnt < 2; icnt++) {
// NOTE(review): icnt only takes the values 0 and 1, so (icnt & 0x2) is always 0
// and reverse_edge is always false. Whether 0x1 was intended cannot be decided
// from this file alone; the expression is deliberately left unchanged.
83 bool reverse_edge = (icnt & 0x2) != 0;
84 serial_fn_state0 = 0; // reset to waiting state.
86 tbb::flow::make_edge(bnode, fnode);
87 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after make_edge");
89 bnode.try_put(1); // will forward to the fnode
90 BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting for first put");
93 bnode.try_put(2); // will reverse the edge
94 // cannot do a wait_for_all here; the function_node is still executing
95 BACKOFF_WAIT(!bnode.my_successors.empty(), "Timed out waiting after 2nd put");
96 // at this point the only task running is the one for the function_node.
97 ASSERT(bnode.my_successors.empty(), "successor not removed");
100 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after forwarding message");
102 serial_fn_state0 = 0; // release the function_node.
104 // have to do a second release because the function_node will get the 2nd item
105 BACKOFF_WAIT( serial_fn_state0 == 0, "Timed out waiting after 2nd put");
106 serial_fn_state0 = 0; // release the function_node.
109 REMARK(" remove_edge");
110 tbb::flow::remove_edge(bnode, fnode);
111 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after remove_edge");
// Second scenario: attach the buffering node to port 0 of a reserving join_node.
113 tbb::flow::join_node<tbb::flow::tuple<int,int>,tbb::flow::reserving> jnode(g);
114 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task
116 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after attaching to join");
118 bnode.try_put(1); // the edge should reverse
120 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving");
123 g.reset(); // should be in forward direction again
124 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after reset()");
125 REMARK(" remove_edge");
126 g.reset(tbb::flow::rf_clear_edges);
127 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)");
128 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // add edge again
129 // reverse edge by adding to buffer.
130 bnode.try_put(1); // the edge should reverse
132 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving");
133 REMARK(" remove_edge(reversed)");
134 g.reset(tbb::flow::rf_clear_edges);
// The check fails when a successor is still present, so the message must say so
// (it previously claimed the opposite).
135 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)");
136 ASSERT(tbb::flow::input_port<0>(jnode).my_predecessors.empty(), "predecessor not reset");
141 // continue_node has only predecessor count
142 // they do not have predecessors, only the counts
143 // successor edges cannot be reversed
//
// White-box test of continue_node: builds fnode0 -> cnode -> fnode1 and checks
// cnode's my_predecessor_count, my_current_count and successor list while
// messages are stepped through, then after reset() (first pass) and after
// reset(rf_clear_edges) (second pass).
// The node bodies are stepped by hand through serial_fn_state0,
// serial_continue_state0 and serial_fn_state1: the test waits for a flag to
// become non-zero and then writes 0 to it (commented "release" below).
// NOTE(review): `g` (and `i` in the try_get check) are not declared in the
// lines visible here; their declarations appear to be missing from this copy.
144 void TestContinueNode() {
146 tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
147 tbb::flow::continue_node<int> cnode(g, 1, serial_continue_body<int>(serial_continue_state0));
148 tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1));
149 tbb::flow::make_edge(fnode0, cnode);
150 tbb::flow::make_edge(cnode, fnode1);
151 REMARK("Testing continue_node:");
152 for( int icnt = 0; icnt < 2; ++icnt ) {
153 REMARK( " initial%d", icnt);
// cnode was constructed with the value 1 and then received one edge from fnode0;
// the test expects a predecessor count of 2.
154 ASSERT(cnode.my_predecessor_count == 2, "predecessor addition didn't increment count");
155 ASSERT(!cnode.successors().empty(), "successors empty though we added one");
156 ASSERT(cnode.my_current_count == 0, "state of continue_receiver incorrect");
157 serial_continue_state0 = 0;
158 serial_fn_state0 = 0;
159 serial_fn_state1 = 0;
161 fnode0.try_put(1); // start the first function node.
162 BACKOFF_WAIT(!serial_fn_state0, "Timed out waiting for function_node to start");
163 // Now the body of function_node 0 is executing.
164 serial_fn_state0 = 0; // release the node
165 // wait for node to count the message (or for the node body to execute, which would be wrong)
166 BACKOFF_WAIT(serial_continue_state0 == 0 && cnode.my_current_count == 0, "Timed out waiting for continue_state0 to change");
167 ASSERT(serial_continue_state0 == 0, "Improperly released continue_node");
168 ASSERT(cnode.my_current_count == 1, "state of continue_receiver incorrect");
169 if(icnt == 0) { // first time through, let the continue_node fire
171 fnode0.try_put(1); // second message
172 BACKOFF_WAIT(serial_fn_state0 == 0, "timeout waiting for continue_body to execute");
173 // Now the body of function_node 0 is executing.
174 serial_fn_state0 = 0; // release the node
176 BACKOFF_WAIT(!serial_continue_state0,"continue_node didn't start"); // now we wait for the continue_node.
177 ASSERT(cnode.my_current_count == 0, " my_current_count not reset before body of continue_node started");
178 serial_continue_state0 = 0; // release the continue_node
179 BACKOFF_WAIT(!serial_fn_state1,"successor function_node didn't start"); // wait for the successor function_node to enter body
180 serial_fn_state1 = 0; // release successor function_node.
186 ASSERT(!cnode.try_get(i), "try_get not rejected");
190 ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (before reset)");
191 ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (before reset)");
192 g.reset(); // should still be the same
193 ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (after reset)" );
194 ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (after reset)");
196 else { // we're going to see if the rf_clear_edges resets things.
198 REMARK(" reset(rf_clear_edges)");
199 ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (before reset)");
200 ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (before reset)");
201 g.reset(tbb::flow::rf_clear_edges); // edges are expected to be removed and counts restored
202 ASSERT(cnode.my_current_count == 0, "state of continue_receiver incorrect after reset(rf_clear_edges)");
// The failure message now names the node under test (it was copied from the
// buffering-node test).
203 ASSERT(cnode.my_successors.empty(), "continue_node has a successor after reset(rf_clear_edges)");
204 ASSERT(cnode.my_predecessor_count == cnode.my_initial_predecessor_count, "predecessor count not reset");
212 // function_node has predecessors and successors
214 // successor edges cannot be reversed
215 // predecessors will reverse (only rejecting will reverse)
//
// White-box test of function_node in both buffering policies:
//   fnode0 - serial, rejecting (expected to have no internal queue)
//   fnode1 - serial, default policy (expected to have an internal queue)
// Both nodes share serial_fn_state0 for their bodies. The test first runs a
// message through each node, then inspects my_queue, my_predecessors and
// my_successors around edge reversal, reset(), task-group cancellation and
// reset(rf_clear_edges).
// NOTE(review): `g` and `ii` are used but not declared in the lines visible
// here, and several put/wait statements appear to be missing from this copy.
216 void TestFunctionNode() {
218 tbb::flow::queue_node<int> qnode0(g);
219 tbb::flow::function_node<int,int, tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
220 // queueing function node
221 tbb::flow::function_node<int,int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
223 tbb::flow::queue_node<int> qnode1(g);
225 tbb::flow::make_edge(fnode0, qnode1);
226 tbb::flow::make_edge(qnode0, fnode0);
228 serial_fn_state0 = 2; // just let it go
229 // see if the darned thing will work....
233 ASSERT(qnode1.try_get(ii) && ii == 1, "output not passed");
234 tbb::flow::remove_edge(qnode0, fnode0);
235 tbb::flow::remove_edge(fnode0, qnode1);
237 tbb::flow::make_edge(fnode1, qnode1);
238 tbb::flow::make_edge(qnode0, fnode1);
240 serial_fn_state0 = 2; // just let it go
241 // see if the darned thing will work....
244 ASSERT(qnode1.try_get(ii) && ii == 1, "output not passed");
245 tbb::flow::remove_edge(qnode0, fnode1);
246 tbb::flow::remove_edge(fnode1, qnode1);
249 serial_fn_state0 = 0;
250 tbb::flow::make_edge(fnode0, qnode1);
251 tbb::flow::make_edge(qnode0, fnode0);
252 REMARK("Testing rejecting function_node:");
253 ASSERT(!fnode0.my_queue, "node should have no queue");
254 ASSERT(!fnode0.my_successors.empty(), "successor edge not added");
256 BACKOFF_WAIT(!serial_fn_state0,"rejecting function_node didn't start");
257 qnode0.try_put(2); // rejecting node should reject, reverse.
258 BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Missing predecessor ---");
259 serial_fn_state0 = 2; // release function_node body.
262 g.reset(); // should reverse the edge from the input to the function node.
263 ASSERT(!qnode0.my_successors.empty(), "empty successors after reset()");
264 ASSERT(fnode0.my_predecessors.empty(), "predecessor not reversed");
265 tbb::flow::remove_edge(qnode0, fnode0);
266 tbb::flow::remove_edge(fnode0, qnode1);
270 tbb::flow::make_edge(fnode1, qnode1);
271 REMARK("Testing queueing function_node:");
// The check fails when the queue is absent, so the message must ask for a queue
// (it previously repeated the rejecting-node wording "should have no queue").
272 ASSERT(fnode1.my_queue, "node should have a queue");
273 ASSERT(!fnode1.my_successors.empty(), "successor edge not added");
275 ASSERT(fnode1.register_predecessor(qnode0), "Cannot register as predecessor");
276 ASSERT(!fnode1.my_predecessors.empty(), "Missing predecessor");
279 g.reset(); // should reverse the edge from the input to the function node.
280 ASSERT(!qnode0.my_successors.empty(), "empty successors after reset()");
281 ASSERT(fnode1.my_predecessors.empty(), "predecessor not reversed");
282 tbb::flow::remove_edge(qnode0, fnode1);
283 tbb::flow::remove_edge(fnode1, qnode1);
286 serial_fn_state0 = 0; // make the function_node wait
287 tbb::flow::make_edge(qnode0, fnode0);
288 REMARK(" start_func");
290 BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting after 1st put");
291 // now if we put an item to the queues the edges to the function_node will reverse.
292 REMARK(" put_node(2)");
293 qnode0.try_put(2); // start queue node.
294 // wait for the edges to reverse
295 BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Timed out waiting");
296 ASSERT(!fnode0.my_predecessors.empty(), "function_node edge not reversed");
// Cancel the graph's task group while the edge is reversed; the following
// assertions check that the reversed state is kept until reset(rf_clear_edges).
297 g.my_root_task->cancel_group_execution();
298 // release the function_node
299 serial_fn_state0 = 2;
301 ASSERT(!fnode0.my_predecessors.empty() && qnode0.my_successors.empty(), "function_node edge not reversed");
302 g.reset(tbb::flow::rf_clear_edges);
303 ASSERT(fnode0.my_predecessors.empty() && qnode0.my_successors.empty(), "function_node edge not removed");
304 ASSERT(fnode0.my_successors.empty(), "successor to fnode not removed");
// Tag-extraction functor template (the struct header and the my_mult member
// declaration are not visible in this copy; the name tag_func is taken from the
// constructor below).
308 template<typename TT>
// Stores the divisor used by operator().
312 tag_func(TT multiplier) : my_mult(multiplier) { }
313 // operator() will return [0 .. Count)
// Computes the tag as v / my_mult converted to tbb::flow::tag_value.
314 tbb::flow::tag_value operator()( TT v) {
315 tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
// Generic successor-edge test for a single-input join_node with policy JNODE_TYPE:
// links the join to a broadcast_node and checks qj.my_successors after linking,
// after a plain reset (per the assertion message; the reset call is not visible
// here) and after reset(rf_clear_edges).
// `name` is only used in the progress REMARK.
// NOTE(review): the return type line and the declaration of `g` are not visible
// in this copy.
320 template<typename JNODE_TYPE>
322 TestSimpleSuccessorArc(const char *name) {
325 REMARK("Join<%s> successor test ", name);
326 tbb::flow::join_node<tbb::flow::tuple<int>, JNODE_TYPE> qj(g);
327 tbb::flow::broadcast_node<tbb::flow::tuple<int> > bnode(g);
328 tbb::flow::make_edge(qj, bnode);
329 ASSERT(!qj.my_successors.empty(),"successor missing after linking");
331 ASSERT(!qj.my_successors.empty(),"successor missing after reset()");
// rf_clear_edges is expected to remove the edge to bnode.
332 g.reset(tbb::flow::rf_clear_edges);
333 ASSERT(qj.my_successors.empty(), "successors not removed after reset(rf_clear_edges)");
// Specialization of the successor-edge test for tag_matching joins. It uses a
// two-element tuple and performs the same successor checks as the generic version.
// NOTE(review): the join_node constructor call below is cut off after `qj(g,`;
// its remaining arguments, the `template<>`/return-type lines and the
// declaration of `g` are not visible in this copy.
339 TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) {
342 REMARK("Join<%s> successor test ", name);
343 typedef tbb::flow::tuple<int,int> my_tuple;
344 tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g,
348 tbb::flow::broadcast_node<my_tuple > bnode(g);
349 tbb::flow::make_edge(qj, bnode);
350 ASSERT(!qj.my_successors.empty(),"successor missing after linking");
352 ASSERT(!qj.my_successors.empty(),"successor missing after reset()");
// rf_clear_edges is expected to remove the edge to bnode.
353 g.reset(tbb::flow::rf_clear_edges);
354 ASSERT(qj.my_successors.empty(), "successors not removed after reset(rf_clear_edges)");
// Body of the join_node test (the enclosing function header and the declaration
// of `g` are not visible in this copy). It first runs the successor-edge test for
// all three join policies, then checks predecessor reversal on the two input
// ports of a reserving join fed by two queue_nodes.
// NOTE(review): the try_put / reset statements between the assertion groups are
// not visible here; the comments below rely on the assertion messages only.
362 TestSimpleSuccessorArc<tbb::flow::queueing>("queueing");
363 TestSimpleSuccessorArc<tbb::flow::reserving>("reserving");
364 TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching");
366 // queueing and tagging join nodes have input queues, so the input ports do not reverse.
367 REMARK(" reserving preds");
369 tbb::flow::join_node<tbb::flow::tuple<int,int>, tbb::flow::reserving> rj(g);
370 tbb::flow::queue_node<int> q0(g);
371 tbb::flow::queue_node<int> q1(g);
372 tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj));
373 tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj));
375 g.wait_for_all(); // quiesce
// Case 1: only port 0 is expected to hold a (reversed) predecessor.
376 ASSERT(!(tbb::flow::input_port<0>(rj).my_predecessors.empty()),"reversed port missing predecessor");
377 ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"non-reversed port has pred");
379 ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"reversed port has pred after reset()");
380 ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()");
382 g.wait_for_all(); // quiesce
// Case 2: the roles are swapped; only port 1 is expected to hold a predecessor.
383 ASSERT(!(tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port missing predecessor");
384 ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred");
386 ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port has pred after reset()");
387 ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()");
388 // should reset predecessors just as regular reset.
390 g.wait_for_all(); // quiesce
// Case 3: same starting state as case 2, but cleared with rf_clear_edges, which
// must also remove the forward edges from q0 and q1.
391 ASSERT(!(tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port missing predecessor");
392 ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred");
393 g.reset(tbb::flow::rf_clear_edges);
394 ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port has pred after reset()");
395 ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()");
396 ASSERT(q0.my_successors.empty(), "edge not removed by reset(rf_clear_edges)");
397 ASSERT(q1.my_successors.empty(), "edge not removed by reset(rf_clear_edges)");
// Body of the limiter_node test (the enclosing function header and the
// declarations of `g` and `out_int` are not visible in this copy).
// A limiter with threshold 1 is placed between two queue_nodes, with a
// broadcast_node of continue_msg wired to its decrement port. The test inspects
// the limiter's counters, my_predecessors and my_successors as values pass
// through, after a reset, and after reset(rf_clear_edges).
// NOTE(review): the try_put / wait_for_all / reset statements between the
// assertion groups and the #endif lines are not visible here.
406 tbb::flow::limiter_node<int> ln(g,1);
407 REMARK("Testing limiter_node: preds and succs");
408 ASSERT(ln.decrement.my_predecessor_count == 0, "error in pred count");
409 ASSERT(ln.decrement.my_initial_predecessor_count == 0, "error in initial pred count");
410 ASSERT(ln.decrement.my_current_count == 0, "error in current count");
411 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
412 ASSERT(ln.init_decrement_predecessors == 0, "error in decrement predecessors");
414 ASSERT(ln.my_threshold == 1, "error in my_threshold");
415 tbb::flow::queue_node<int> inq(g);
416 tbb::flow::queue_node<int> outq(g);
417 tbb::flow::broadcast_node<tbb::flow::continue_msg> bn(g);
419 tbb::flow::make_edge(inq,ln);
420 tbb::flow::make_edge(ln,outq);
421 tbb::flow::make_edge(bn,ln.decrement);
424 ASSERT(!(ln.my_successors.empty()),"successors empty after make_edge");
425 ASSERT(ln.my_predecessors.empty(), "input edge reversed");
428 ASSERT(outq.try_get(out_int) && out_int == 1, "limiter_node didn't pass first value");
429 ASSERT(ln.my_predecessors.empty(), "input edge reversed");
// With the threshold reached, the second value must be held back and the input
// edge reversed until the decrement port receives a continue_msg.
432 ASSERT(!outq.try_get(out_int), "limiter_node incorrectly passed second input");
433 ASSERT(!ln.my_predecessors.empty(), "input edge to limiter_node not reversed");
434 bn.try_put(tbb::flow::continue_msg());
436 ASSERT(outq.try_get(out_int) && out_int == 2, "limiter_node didn't pass second value");
438 ASSERT(!ln.my_predecessors.empty(), "input edge was reversed(after try_get())");
440 ASSERT(ln.my_predecessors.empty(), "input edge not reset");
443 ASSERT(outq.try_get(out_int) && out_int == 3, "limiter_node didn't pass third value");
445 REMARK(" rf_clear_edges");
446 // currently the limiter_node will not pass another message
447 g.reset(tbb::flow::rf_clear_edges);
448 ASSERT(ln.decrement.my_predecessor_count == 0, "error in pred count");
449 ASSERT(ln.decrement.my_initial_predecessor_count == 0, "error in initial pred count");
450 ASSERT(ln.decrement.my_current_count == 0, "error in current count");
451 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
452 ASSERT(ln.init_decrement_predecessors == 0, "error in decrement predecessors");
454 ASSERT(ln.my_threshold == 1, "error in my_threshold");
455 ASSERT(ln.my_predecessors.empty(), "preds not reset(rf_clear_edges)");
// This check is about the successor list, so the message now says "succs".
456 ASSERT(ln.my_successors.empty(), "succs not reset(rf_clear_edges)");
// The input-arc check used to be repeated verbatim on the next line; the
// duplicate was redundant and has been dropped.
457 ASSERT(inq.my_successors.empty(), "Arc not removed on reset(rf_clear_edges)");
459 ASSERT(bn.my_successors.empty(), "control edge not removed on reset(rf_clear_edges)");
// Rebuild only the data path; the decrement edge from bn stays removed, so the
// continue_msg sent below must not let a second value through.
460 tbb::flow::make_edge(inq,ln);
461 tbb::flow::make_edge(ln,outq);
465 ASSERT(outq.try_get(out_int),"missing output after reset(rf_clear_edges)");
466 ASSERT(out_int == 4, "input incorrect (4)");
467 bn.try_put(tbb::flow::continue_msg());
469 ASSERT(!outq.try_get(out_int),"second output incorrectly passed (rf_clear_edges)");
// Body functor for a multifunction_node of type MF_TYPE (the struct header line
// is not visible in this copy; the name mf_body is taken from the constructor).
473 template<typename MF_TYPE>
// Pointer to the atomic flag shared with the test.
475 tbb::atomic<int> *_flag;
476 mf_body( tbb::atomic<int> &myatomic) : _flag(&myatomic) { }
// Waits on the shared flag (statements around the wait are not visible here),
// then routes the input by parity: odd values go to output port 1, even values
// to output port 0.
477 void operator()( const int& in, typename MF_TYPE::output_ports_type &outports) {
480 BACKOFF_WAIT(*_flag == 1, "multifunction_node not released");
483 if(in & 0x1) tbb::flow::get<1>(outports).try_put(in);
484 else tbb::flow::get<0>(outports).try_put(in);
// Policy-dependent predicate telling whether node T is in the state expected
// after an input edge had the chance to reverse. P selects the specialization.
// NOTE(review): the `template<typename T>` lines of the two partial
// specializations are not visible in this copy.
488 template<typename P, typename T>
489 struct test_reversal;
// queueing policy: returns true when the node has NO predecessors.
491 struct test_reversal<tbb::flow::queueing, T> {
492 test_reversal() { REMARK("<queueing>"); }
493 // queueing node will not reverse.
494 bool operator()( T &node) { return node.my_predecessors.empty(); }
// rejecting policy: returns true when the node HAS a predecessor.
498 struct test_reversal<tbb::flow::rejecting, T> {
499 test_reversal() { REMARK("<rejecting>"); }
500 bool operator()( T &node) { return !node.my_predecessors.empty(); }
// White-box test of a serial multifunction_node<int, tuple<int,int>, P>.
// The node is fed from qin and sends to qeven_out (port 0) and qodd_out (port 1).
// test_reversal<P, ...> supplies the policy-specific expectation about
// mf.my_predecessors, which is checked after the puts and after cancelling the
// graph's task group; reset(rf_clear_edges) must then clear the output edges.
// NOTE(review): the `template<typename P>`/return-type lines, the declaration
// of `g`, and the put/wait/reset statements between the checks are not visible
// in this copy.
505 TestMultifunctionNode() {
506 typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int, int>, P> multinode_type;
507 REMARK("Testing multifunction_node");
508 test_reversal<P,multinode_type> my_test;
511 multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0));
512 tbb::flow::queue_node<int> qin(g);
513 tbb::flow::queue_node<int> qodd_out(g);
514 tbb::flow::queue_node<int> qeven_out(g);
515 tbb::flow::make_edge(qin,mf);
516 tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out);
517 tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out);
519 for( int ii = 0; ii < 2 ; ++ii) {
520 serial_fn_state0 = 0;
521 if(ii == 0) REMARK(" reset preds"); else REMARK(" 2nd");
523 // wait for node to be active
524 BACKOFF_WAIT(serial_fn_state0 == 0, "timed out waiting for first put");
// Wait until the policy-specific predicate holds, then assert it.
526 BACKOFF_WAIT((!my_test(mf)), "Timed out waiting");
527 ASSERT(my_test(mf), "fail second put test");
528 g.my_root_task->cancel_group_execution();
// Writing 2 to the shared flag lets the node body proceed (mf_body waits on it).
530 serial_fn_state0 = 2;
532 ASSERT(my_test(mf), "fail cancel group test");
534 REMARK(" rf_clear_edges");
535 g.reset(tbb::flow::rf_clear_edges);
536 ASSERT(tbb::flow::output_port<0>(mf).my_successors.empty(), "output_port<0> not reset (rf_clear_edges)");
537 ASSERT(tbb::flow::output_port<1>(mf).my_successors.empty(), "output_port<1> not reset (rf_clear_edges)");
543 ASSERT(mf.my_predecessors.empty(), "edge didn't reset");
// The qin -> mf edge is expected to exist on the first pass and to be gone on
// the second pass.
544 ASSERT((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty()), "edge didn't reset");
549 // indexer_node is like a broadcast_node, in that none of its inputs reverse, and it
550 // never allows a successor to reverse its edge, so we only need test the successors.
//
// Body of the indexer_node test (the enclosing function header and the
// declaration of `g` are not visible in this copy). Links a two-input
// indexer_node to a queue_node and checks inode.my_successors after linking,
// after a reset (per the assertion message; the call is not visible here) and
// after reset(rf_clear_edges).
554 typedef tbb::flow::indexer_node< int, int > indexernode_type;
555 indexernode_type inode(g);
556 REMARK("Testing indexer_node:");
557 tbb::flow::queue_node<indexernode_type::output_type> qout(g);
558 tbb::flow::make_edge(inode,qout);
560 ASSERT(!inode.my_successors.empty(), "successor of indexer_node missing");
562 ASSERT(!inode.my_successors.empty(), "successor of indexer_node missing after reset");
563 g.reset(tbb::flow::rf_clear_edges);
564 ASSERT(inode.my_successors.empty(), "successor of indexer_node not removed by reset(rf_clear_edges)");
// Successor-edge test for simple single-value node types (instantiated elsewhere
// in this file with broadcast_node, overwrite_node and write_once_node).
// Links the node under test to a queue_node and checks on.my_successors after
// make_edge, once more before clearing, and after reset(rf_clear_edges).
// `name` is only used in the progress REMARK.
// NOTE(review): the return type line and the declarations of `g` and `on`
// (presumably an instance of Node — confirm) are not visible in this copy.
568 template<typename Node>
570 TestScalarNode(const char *name) {
573 tbb::flow::queue_node<int> qout(g);
574 REMARK("Testing %s:", name);
575 tbb::flow::make_edge(on,qout);
577 ASSERT(!on.my_successors.empty(), "edge not added");
579 ASSERT(!on.my_successors.empty(), "edge improperly removed");
580 g.reset(tbb::flow::rf_clear_edges);
581 ASSERT(on.my_successors.empty(), "edge not removed by reset(rf_clear_edges)");
// Call operator of a functor whose struct header is not visible in this copy
// (presumably seq_body, which is passed to the sequencer_node further down — confirm).
// Maps an input value to the index in / 3 (integer division) as size_t.
586 size_t operator()(const int &in) {
587 return size_t(in / 3);
591 // sequencer_node behaves like a queueing node, but requires a different constructor.
//
// White-box test of sequencer_node: checks bnode.my_successors while an edge to
// a serial function_node and then to a reserving join_node is created, used,
// reset and cleared. seq_body supplies the sequence index for each item.
// NOTE(review): the return type line, the declaration of `g`, and some
// wait/reset statements are not visible in this copy.
593 TestSequencerNode() {
595 tbb::flow::sequencer_node<int> bnode(g, seq_body());
// The progress banner used to be emitted a second time after fnode was
// constructed; the duplicate REMARK has been dropped so it is printed once.
596 REMARK("Testing sequencer_node:");
597 tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
599 serial_fn_state0 = 0; // reset to waiting state.
600 REMARK(" make_edge");
601 tbb::flow::make_edge(bnode, fnode);
602 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after make_edge");
604 bnode.try_put(0); // will forward to the fnode
605 BACKOFF_WAIT( serial_fn_state0 == 0, "timeout waiting for function_node"); // wait for the function_node to fire up
606 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after forwarding message");
607 serial_fn_state0 = 0;
609 REMARK(" remove_edge");
610 tbb::flow::remove_edge(bnode, fnode);
611 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after remove_edge");
// Second scenario: attach the sequencer to port 0 of a reserving join_node.
612 tbb::flow::join_node<tbb::flow::tuple<int,int>,tbb::flow::reserving> jnode(g);
613 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task
615 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after attaching to join");
617 bnode.try_put(3); // the edge should reverse
619 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving");
622 g.reset(); // should be in forward direction again
623 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after reset()");
624 REMARK(" remove_edge");
625 g.reset(tbb::flow::rf_clear_edges); // edges are expected to be removed entirely
626 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)");
627 ASSERT(fnode.my_predecessors.empty(), "buffering node reversed after reset(rf_clear_edges)");
// Body functor for a source_node (the struct header, member declarations and the
// rest of operator() are not visible in this copy).
// The constructor stores the item limit in max_cnt and zeroes the running counter.
635 snode_body( const int &in) : max_cnt(in) { my_cnt = 0; }
// Returns false once my_cnt has reached max_cnt; my_cnt is incremented on every call.
636 bool operator()(int &out) {
637 if(max_cnt <= my_cnt++) return false;
// Body of the source_node test (the enclosing function header, the declaration
// of `g`, and some reset/wait statements are not visible in this copy).
// An initially inactive source_node (third constructor argument false) limited
// to 4 items by snode_body feeds port 0 of a reserving join_node; qin feeds
// port 1 and the join outputs to qout. The test checks sn.my_successors after
// make_edge, after resets, and after activation reverses the edge.
646 tbb::flow::source_node<int> sn(g, snode_body(4), false);
647 REMARK("Testing source_node:");
648 tbb::flow::queue_node<int> qin(g);
649 tbb::flow::join_node<tbb::flow::tuple<int,int>, tbb::flow::reserving> jn(g);
650 tbb::flow::queue_node<tbb::flow::tuple<int,int> > qout(g);
652 REMARK(" make_edges");
653 tbb::flow::make_edge(sn, tbb::flow::input_port<0>(jn));
654 tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
655 tbb::flow::make_edge(jn,qout);
656 ASSERT(!sn.my_successors.empty(), "source node has no successor after make_edge");
659 ASSERT(!sn.my_successors.empty(), "source node has no successor after reset");
661 g.reset(tbb::flow::rf_clear_edges);
662 ASSERT(sn.my_successors.empty(), "source node has successor after reset(rf_clear_edges)");
// Rebuild the same topology after the edges were cleared.
663 tbb::flow::make_edge(sn, tbb::flow::input_port<0>(jn));
664 tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
665 tbb::flow::make_edge(jn,qout);
668 sn.activate(); // start the source; its successor is the reserving join port
670 BACKOFF_WAIT( !sn.my_successors.empty(), "Timed out waiting for edge to reverse");
// The check fails when a successor is still present, so the message must say so
// (it previously claimed the opposite).
671 ASSERT(sn.my_successors.empty(), "source node has a successor after forwarding message");
675 ASSERT(!sn.my_successors.empty(), "source_node has no successors after reset");
676 ASSERT(tbb::flow::input_port<0>(jn).my_predecessors.empty(), "successor of source_node has pred after reset.");
// Body of the test driver (the enclosing function header is not visible in this
// copy, and the calls to several of the tests defined above are missing here).
// Raises MinThread to at least 3, initializes the scheduler with that many
// threads, runs the per-node white-box tests and reports Harness::Done.
682 if(MinThread < 3) MinThread = 3;
683 tbb::task_scheduler_init init(MinThread); // tests presume at least three threads
// Buffering node family.
685 TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node");
686 TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node");
687 TestBufferingNode< tbb::flow::queue_node<int> >("queue_node");
// multifunction_node in both buffering policies.
690 TestMultifunctionNode<tbb::flow::rejecting>();
691 TestMultifunctionNode<tbb::flow::queueing>();
// Single-value node types that share the scalar-node test.
701 TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node");
702 TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node");
703 TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node");
705 return Harness::Done;