2 Copyright (c) 2005-2019 Intel Corporation
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
17 #define HARNESS_DEFAULT_MIN_THREADS 3
18 #define HARNESS_DEFAULT_MAX_THREADS 4
21 #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
22 #if _MSC_VER==1700 && !defined(__INTEL_COMPILER)
23 // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012)
24 #pragma warning (disable: 4702)
29 #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations)
31 // need these to get proper external names for private methods in library.
32 #include "tbb/spin_mutex.h"
33 #include "tbb/spin_rw_mutex.h"
35 #include "tbb/task_arena.h"
37 #define private public
38 #define protected public
39 #include "tbb/flow_graph.h"
42 #include "tbb/task_scheduler_init.h"
43 #include "harness_graph.h"
// Call operator of a node body functor (the enclosing struct header is not
// visible in this excerpt). The incoming message is ignored and a
// default-constructed continue_msg is returned.
47 tbb::flow::continue_msg operator()(const T &/*in*/) {
48 return tbb::flow::continue_msg(); // result does not depend on the input
// White-box test of the successor list kept by output port 0 of a split_node.
// It checks my_successors after construction, after make_edge, and after
// graph resets.
// NOTE(review): some statements of this function are not visible in this
// excerpt, including the declarations of `g` and `snode`.
52 // A split_node cannot have predecessors;
53 // it does not reject messages and always forwards them;
54 // it rejects edge reversals requested by its successors.
55 void TestSplitNode() {
56 typedef tbb::flow::split_node<tbb::flow::tuple<int> > snode_type;
59 tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>()); // receiver that becomes the port-0 successor below
60 REMARK("Testing split_node\n");
// No edge has been made yet, so the port must report an empty successor list.
61 ASSERT(tbb::flow::output_port<0>(snode).my_successors.empty(), "Constructed split_node has successors");
62 // tbb::flow::output_port<0>(snode)
63 tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr);
64 ASSERT(!(tbb::flow::output_port<0>(snode).my_successors.empty()), "after make_edge, split_node has no successor.");
65 snode.try_put(tbb::flow::tuple<int>(1)); // send one single-element tuple through the node
// NOTE(review): the statements between the try_put and the next check are not
// visible here; the assertion message suggests a plain reset() precedes it.
68 ASSERT(!(tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(), split_node has no successor.");
// Resetting with rf_clear_edges is expected to drop the edge made above.
69 g.reset(tbb::flow::rf_clear_edges);
70 ASSERT(tbb::flow::output_port<0>(snode).my_successors.empty(), "after reset(rf_clear_edges), split_node has a successor.");
// White-box test of the successor list (my_successors) of a buffering node of
// type B, first against a rejecting serial function_node and then against
// input port 0 of a reserving join_node.
// `name` is only used in the progress message.
// NOTE(review): several statements are not visible in this excerpt, including
// the declarations of `g` and `bnode` and some control-flow lines.
// BACKOFF_WAIT appears to spin while its condition is true: the assertions
// that follow it expect the negated condition. Confirm against the harness.
73 // buffering nodes cannot have predecessors
74 // they do not reject messages and always save or forward
75 // they allow edge reversals from successors
76 template< typename B >
77 void TestBufferingNode(const char * name) {
80 tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
81 REMARK("Testing %s:", name);
82 for(int icnt = 0; icnt < 2; icnt++) {
// NOTE(review): icnt only takes the values 0 and 1, so (icnt & 0x2) is always
// 0 and reverse_edge is always false. Any code guarded by it cannot run, and
// both loop iterations behave the same. A mask of 0x1 may have been intended.
83 bool reverse_edge = (icnt & 0x2) != 0;
84 serial_fn_state0 = 0; // reset to waiting state.
86 tbb::flow::make_edge(bnode, fnode);
87 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after make_edge");
89 bnode.try_put(1); // will forward to the fnode
90 BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting for first put");
93 bnode.try_put(2); // will reverse the edge
94 // cannot do a wait_for_all here; the function_node is still executing
95 BACKOFF_WAIT(!bnode.my_successors.empty(), "Timed out waiting after 2nd put");
96 // at this point the only task running is the one for the function_node.
97 ASSERT(bnode.my_successors.empty(), "successor not removed");
100 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after forwarding message");
102 serial_fn_state0 = 0; // release the function_node.
104 // have to do a second release because the function_node will get the 2nd item
105 BACKOFF_WAIT( serial_fn_state0 == 0, "Timed out waiting after 2nd put");
106 serial_fn_state0 = 0; // release the function_node.
109 REMARK(" remove_edge");
110 tbb::flow::remove_edge(bnode, fnode);
111 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after remove_edge");
// Second half: attach the buffering node to port 0 of a reserving join_node.
113 tbb::flow::join_node<tbb::flow::tuple<int,int>,tbb::flow::reserving> jnode(g);
114 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task
116 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after attaching to join");
118 bnode.try_put(1); // the edge should reverse
120 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving");
123 g.reset(); // should be in forward direction again
124 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after reset()");
// NOTE(review): the progress message says "remove_edge", but the edge is
// removed here by reset(rf_clear_edges), not by remove_edge().
125 REMARK(" remove_edge");
126 g.reset(tbb::flow::rf_clear_edges);
127 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)");
128 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // add edge again
129 // reverse edge by adding to buffer.
130 bnode.try_put(1); // the edge should reverse
132 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving");
133 REMARK(" remove_edge(reversed)");
134 g.reset(tbb::flow::rf_clear_edges);
// NOTE(review): this asserts that the successor list IS empty after
// reset(rf_clear_edges), but the failure message reads "has no successor
// after reset()". The message does not match the condition being checked.
135 ASSERT(bnode.my_successors.empty(), "buffering node has no successor after reset()");
136 ASSERT(tbb::flow::input_port<0>(jnode).my_predecessors.empty(), "predecessor not reset");
// White-box test of continue_node bookkeeping: my_predecessor_count,
// my_current_count and the successor list, in the chain
// fnode0 -> cnode -> fnode1.
// The loop runs twice: the first pass (icnt == 0) lets the continue_node fire
// and then does a plain reset(); the second pass checks reset(rf_clear_edges).
// NOTE(review): some statements (e.g. the declaration of `g` and of `i`) are
// not visible in this excerpt.
141 // A continue_node keeps only a predecessor count:
142 // it does not hold its predecessors, only the counts.
143 // Its successor edges cannot be reversed.
144 void TestContinueNode() {
146 tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
147 tbb::flow::continue_node<int> cnode(g, 1, serial_continue_body<int>(serial_continue_state0));
148 tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1));
149 tbb::flow::make_edge(fnode0, cnode);
150 tbb::flow::make_edge(cnode, fnode1);
151 REMARK("Testing continue_node:");
152 for( int icnt = 0; icnt < 2; ++icnt ) {
153 REMARK( " initial%d", icnt);
// Expected count is 2: presumably 1 from the constructor argument plus 1 for
// the fnode0 edge. Confirm against the continue_node documentation.
154 ASSERT(cnode.my_predecessor_count == 2, "predecessor addition didn't increment count");
// NOTE(review): this check goes through successors() while the later checks
// read my_successors directly.
155 ASSERT(!cnode.successors().empty(), "successors empty though we added one");
156 ASSERT(cnode.my_current_count == 0, "state of continue_receiver incorrect");
// Put all three body-state flags back to 0 before driving the graph.
157 serial_continue_state0 = 0;
158 serial_fn_state0 = 0;
159 serial_fn_state1 = 0;
161 fnode0.try_put(1); // start the first function node.
162 BACKOFF_WAIT(!serial_fn_state0, "Timed out waiting for function_node to start");
163 // Now the body of function_node 0 is executing.
164 serial_fn_state0 = 0; // release the node
165 // wait for node to count the message (or for the node body to execute, which would be wrong)
166 BACKOFF_WAIT(serial_continue_state0 == 0 && cnode.my_current_count == 0, "Timed out waiting for continue_state0 to change");
// After one message out of two, the body must not have run and the count is 1.
167 ASSERT(serial_continue_state0 == 0, "Improperly released continue_node");
168 ASSERT(cnode.my_current_count == 1, "state of continue_receiver incorrect");
169 if(icnt == 0) { // first time through, let the continue_node fire
171 fnode0.try_put(1); // second message
172 BACKOFF_WAIT(serial_fn_state0 == 0, "timeout waiting for continue_body to execute");
173 // Now the body of function_node 0 is executing.
174 serial_fn_state0 = 0; // release the node
176 BACKOFF_WAIT(!serial_continue_state0,"continue_node didn't start"); // now we wait for the continue_node.
177 ASSERT(cnode.my_current_count == 0, " my_current_count not reset before body of continue_node started");
178 serial_continue_state0 = 0; // release the continue_node
179 BACKOFF_WAIT(!serial_fn_state1,"successor function_node didn't start"); // wait for the successor function_node to enter body
180 serial_fn_state1 = 0; // release successor function_node.
// The continue_node is expected to refuse try_get.
186 ASSERT(!cnode.try_get(i), "try_get not rejected");
// A plain reset() must keep both the successor edge and the predecessor count.
190 ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (before reset)");
191 ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (before reset)");
192 g.reset(); // should still be the same
193 ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (after reset)" );
194 ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (after reset)");
196 else { // we're going to see if the rf_clear_edges resets things.
198 REMARK(" reset(rf_clear_edges)");
199 ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (before reset)");
200 ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (before reset)");
201 g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again
202 ASSERT(cnode.my_current_count == 0, "state of continue_receiver incorrect after reset(rf_clear_edges)");
// NOTE(review): the message says "buffering node", but the node being checked
// is the continue_node. The text looks copied from the buffering-node test.
203 ASSERT(cnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)");
// With edges cleared, the count must fall back to its initial value.
204 ASSERT(cnode.my_predecessor_count == cnode.my_initial_predecessor_count, "predecessor count not reset");
// White-box test of function_node edge bookkeeping with a rejecting node
// (fnode0) and a default-policy node (fnode1), wired as
// qnode0 -> function_node -> qnode1.
// Both function nodes share the same body state flag, serial_fn_state0.
// NOTE(review): several statements are not visible in this excerpt, including
// the declarations of `g` and `ii` and the puts/waits between visible lines.
212 // function_node has predecessors and successors
214 // successor edges cannot be reversed
215 // predecessors will reverse (only rejecting will reverse)
216 void TestFunctionNode() {
218 tbb::flow::queue_node<int> qnode0(g);
219 tbb::flow::function_node<int,int, tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
220 // queueing function node
221 tbb::flow::function_node<int,int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
223 tbb::flow::queue_node<int> qnode1(g);
// Smoke test 1: pass a value through the rejecting node, then unhook it.
225 tbb::flow::make_edge(fnode0, qnode1);
226 tbb::flow::make_edge(qnode0, fnode0);
228 serial_fn_state0 = 2; // just let it go
229 // see if the darned thing will work....
233 ASSERT(qnode1.try_get(ii) && ii == 1, "output not passed");
234 tbb::flow::remove_edge(qnode0, fnode0);
235 tbb::flow::remove_edge(fnode0, qnode1);
// Smoke test 2: the same round trip through fnode1.
237 tbb::flow::make_edge(fnode1, qnode1);
238 tbb::flow::make_edge(qnode0, fnode1);
240 serial_fn_state0 = 2; // just let it go
241 // see if the darned thing will work....
244 ASSERT(qnode1.try_get(ii) && ii == 1, "output not passed");
245 tbb::flow::remove_edge(qnode0, fnode1);
246 tbb::flow::remove_edge(fnode1, qnode1);
// Rejecting node: it must have no input queue, and a second put while the
// body is busy should show up as a predecessor of fnode0.
249 serial_fn_state0 = 0;
250 tbb::flow::make_edge(fnode0, qnode1);
251 tbb::flow::make_edge(qnode0, fnode0);
252 REMARK("Testing rejecting function_node:");
253 ASSERT(!fnode0.my_queue, "node should have no queue");
254 ASSERT(!fnode0.my_successors.empty(), "successor edge not added");
256 BACKOFF_WAIT(!serial_fn_state0,"rejecting function_node didn't start");
257 qnode0.try_put(2); // rejecting node should reject, reverse.
258 BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Missing predecessor ---");
259 serial_fn_state0 = 2; // release function_node body.
262 g.reset(); // should reverse the edge from the input to the function node.
// After reset() the edge is expected on the sender side again.
263 ASSERT(!qnode0.my_successors.empty(), "empty successors after reset()");
264 ASSERT(fnode0.my_predecessors.empty(), "predecessor not reversed");
265 tbb::flow::remove_edge(qnode0, fnode0);
266 tbb::flow::remove_edge(fnode0, qnode1);
270 tbb::flow::make_edge(fnode1, qnode1);
271 REMARK("Testing queueing function_node:");
// NOTE(review): the condition requires my_queue to be non-null, i.e. the node
// HAS a queue, but the failure message says "node should have no queue".
// The text looks copied from the rejecting-node check above.
272 ASSERT(fnode1.my_queue, "node should have no queue");
273 ASSERT(!fnode1.my_successors.empty(), "successor edge not added");
// The predecessor is registered by hand here, so fnode1 holds it directly.
275 ASSERT(fnode1.register_predecessor(qnode0), "Cannot register as predecessor");
276 ASSERT(!fnode1.my_predecessors.empty(), "Missing predecessor");
279 g.reset(); // should reverse the edge from the input to the function node.
280 ASSERT(!qnode0.my_successors.empty(), "empty successors after reset()");
281 ASSERT(fnode1.my_predecessors.empty(), "predecessor not reversed");
282 tbb::flow::remove_edge(qnode0, fnode1);
283 tbb::flow::remove_edge(fnode1, qnode1);
// Final scenario: reverse the qnode0 -> fnode0 edge, cancel the graph's root
// task, then check that reset(rf_clear_edges) removes the edge on both ends.
286 serial_fn_state0 = 0; // make the function_node wait
287 tbb::flow::make_edge(qnode0, fnode0);
288 REMARK(" start_func");
290 BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting after 1st put");
291 // now if we put an item to the queues the edges to the function_node will reverse.
292 REMARK(" put_node(2)");
293 qnode0.try_put(2); // start queue node.
294 // wait for the edges to reverse
295 BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Timed out waiting");
296 ASSERT(!fnode0.my_predecessors.empty(), "function_node edge not reversed");
297 g.my_root_task->cancel_group_execution();
298 // release the function_node
299 serial_fn_state0 = 2;
301 ASSERT(!fnode0.my_predecessors.empty() && qnode0.my_successors.empty(), "function_node edge not reversed");
302 g.reset(tbb::flow::rf_clear_edges);
303 ASSERT(fnode0.my_predecessors.empty() && qnode0.my_successors.empty(), "function_node edge not removed");
304 ASSERT(fnode0.my_successors.empty(), "successor to fnode not removed");
// Tag-extraction functor parameterised on the message type TT.
// It maps a value to a tag by dividing it by a stored multiplier.
// NOTE(review): the struct header, the my_mult member declaration and the end
// of operator() are not visible in this excerpt.
308 template<typename TT>
312 tag_func(TT multiplier) : my_mult(multiplier) { } // remember the divisor used by operator()
313 void operator=( const tag_func& other){my_mult = other.my_mult;} // copies only the multiplier; returns nothing
314 // operator() will return [0 .. Count)
315 tbb::flow::tag_value operator()( TT v) {
316 tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult); // tag is the value divided by the multiplier
// Checks the successor list of a single-input join_node with policy
// JNODE_TYPE: non-empty after make_edge, and empty after
// reset(rf_clear_edges).
// `name` is only used in the progress message.
// NOTE(review): the return type, the declaration of `g` and the statement
// before the "after reset()" check are not visible in this excerpt.
321 template<typename JNODE_TYPE>
323 TestSimpleSuccessorArc(const char *name) {
326 REMARK("Join<%s> successor test ", name);
327 tbb::flow::join_node<tbb::flow::tuple<int>, JNODE_TYPE> qj(g);
328 tbb::flow::broadcast_node<tbb::flow::tuple<int> > bnode(g);
329 tbb::flow::make_edge(qj, bnode); // the broadcast_node becomes the join's only successor
330 ASSERT(!qj.my_successors.empty(),"successor missing after linking");
332 ASSERT(!qj.my_successors.empty(),"successor missing after reset()");
333 g.reset(tbb::flow::rf_clear_edges);
334 ASSERT(qj.my_successors.empty(), "successors not removed after reset(rf_clear_edges)");
// tag_matching variant of the successor-list check. It uses a two-input
// join_node, and performs the same checks as the generic version.
// NOTE(review): the remaining constructor arguments of `qj`, the declaration
// of `g` and the statement before the "after reset()" check are not visible
// in this excerpt.
340 TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) {
343 REMARK("Join<%s> successor test ", name);
344 typedef tbb::flow::tuple<int,int> my_tuple;
345 tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g,
349 tbb::flow::broadcast_node<my_tuple > bnode(g);
350 tbb::flow::make_edge(qj, bnode); // the broadcast_node becomes the join's only successor
351 ASSERT(!qj.my_successors.empty(),"successor missing after linking");
353 ASSERT(!qj.my_successors.empty(),"successor missing after reset()");
354 g.reset(tbb::flow::rf_clear_edges);
355 ASSERT(qj.my_successors.empty(), "successors not removed after reset(rf_clear_edges)");
363 TestSimpleSuccessorArc<tbb::flow::queueing>("queueing");
364 TestSimpleSuccessorArc<tbb::flow::reserving>("reserving");
365 TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching");
367 // queueing and tagging join nodes have input queues, so the input ports do not reverse.
368 REMARK(" reserving preds");
370 tbb::flow::join_node<tbb::flow::tuple<int,int>, tbb::flow::reserving> rj(g);
371 tbb::flow::queue_node<int> q0(g);
372 tbb::flow::queue_node<int> q1(g);
373 tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj));
374 tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj));
376 g.wait_for_all(); // quiesce
377 ASSERT(!(tbb::flow::input_port<0>(rj).my_predecessors.empty()),"reversed port missing predecessor");
378 ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"non-reversed port has pred");
380 ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"reversed port has pred after reset()");
381 ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()");
383 g.wait_for_all(); // quiesce
384 ASSERT(!(tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port missing predecessor");
385 ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred");
387 ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port has pred after reset()");
388 ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()");
389 // should reset predecessors just as regular reset.
391 g.wait_for_all(); // quiesce
392 ASSERT(!(tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port missing predecessor");
393 ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred");
394 g.reset(tbb::flow::rf_clear_edges);
395 ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port has pred after reset()");
396 ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()");
397 ASSERT(q0.my_successors.empty(), "edge not removed by reset(rf_clear_edges)");
398 ASSERT(q1.my_successors.empty(), "edge not removed by reset(rf_clear_edges)");
407 tbb::flow::limiter_node<int> ln(g,1);
408 REMARK("Testing limiter_node: preds and succs");
409 ASSERT(ln.decrement.my_predecessor_count == 0, "error in pred count");
410 ASSERT(ln.decrement.my_initial_predecessor_count == 0, "error in initial pred count");
411 ASSERT(ln.decrement.my_current_count == 0, "error in current count");
412 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
413 ASSERT(ln.init_decrement_predecessors == 0, "error in decrement predecessors");
415 ASSERT(ln.my_threshold == 1, "error in my_threshold");
416 tbb::flow::queue_node<int> inq(g);
417 tbb::flow::queue_node<int> outq(g);
418 tbb::flow::broadcast_node<tbb::flow::continue_msg> bn(g);
420 tbb::flow::make_edge(inq,ln);
421 tbb::flow::make_edge(ln,outq);
422 tbb::flow::make_edge(bn,ln.decrement);
425 ASSERT(!(ln.my_successors.empty()),"successors empty after make_edge");
426 ASSERT(ln.my_predecessors.empty(), "input edge reversed");
429 ASSERT(outq.try_get(out_int) && out_int == 1, "limiter_node didn't pass first value");
430 ASSERT(ln.my_predecessors.empty(), "input edge reversed");
433 ASSERT(!outq.try_get(out_int), "limiter_node incorrectly passed second input");
434 ASSERT(!ln.my_predecessors.empty(), "input edge to limiter_node not reversed");
435 bn.try_put(tbb::flow::continue_msg());
437 ASSERT(outq.try_get(out_int) && out_int == 2, "limiter_node didn't pass second value");
439 ASSERT(!ln.my_predecessors.empty(), "input edge was reversed(after try_get())");
441 ASSERT(ln.my_predecessors.empty(), "input edge not reset");
444 ASSERT(outq.try_get(out_int) && out_int == 3, "limiter_node didn't pass third value");
446 REMARK(" rf_clear_edges");
447 // currently the limiter_node will not pass another message
448 g.reset(tbb::flow::rf_clear_edges);
449 ASSERT(ln.decrement.my_predecessor_count == 0, "error in pred count");
450 ASSERT(ln.decrement.my_initial_predecessor_count == 0, "error in initial pred count");
451 ASSERT(ln.decrement.my_current_count == 0, "error in current count");
452 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
453 ASSERT(ln.init_decrement_predecessors == 0, "error in decrement predecessors");
455 ASSERT(ln.my_threshold == 1, "error in my_threshold");
456 ASSERT(ln.my_predecessors.empty(), "preds not reset(rf_clear_edges)");
457 ASSERT(ln.my_successors.empty(), "preds not reset(rf_clear_edges)");
458 ASSERT(inq.my_successors.empty(), "Arc not removed on reset(rf_clear_edges)");
459 ASSERT(inq.my_successors.empty(), "Arc not removed on reset(rf_clear_edges)");
460 ASSERT(bn.my_successors.empty(), "control edge not removed on reset(rf_clear_edges)");
461 tbb::flow::make_edge(inq,ln);
462 tbb::flow::make_edge(ln,outq);
466 ASSERT(outq.try_get(out_int),"missing output after reset(rf_clear_edges)");
467 ASSERT(out_int == 4, "input incorrect (4)");
468 bn.try_put(tbb::flow::continue_msg());
470 ASSERT(!outq.try_get(out_int),"second output incorrectly passed (rf_clear_edges)");
// Body functor for a multifunction_node of type MF_TYPE.
// It holds a pointer to an atomic flag supplied by the test and routes each
// input by parity: odd values go to output port 1, even values to port 0.
// NOTE(review): the struct header and the statements of operator() before the
// BACKOFF_WAIT are not visible in this excerpt.
474 template<typename MF_TYPE>
476 tbb::atomic<int> *_flag;
477 mf_body( tbb::atomic<int> &myatomic) : _flag(&myatomic) { } // keeps the address of the caller's flag, not a copy
478 void operator()( const int& in, typename MF_TYPE::output_ports_type &outports) {
// Holds the body here while the flag equals 1 (assuming BACKOFF_WAIT spins
// while its condition is true; confirm against the harness).
481 BACKOFF_WAIT(*_flag == 1, "multifunction_node not released");
484 if(in & 0x1) tbb::flow::get<1>(outports).try_put(in);
485 else tbb::flow::get<0>(outports).try_put(in);
// Policy-dependent predicate on a node's my_predecessors list.
// The primary template is only declared; the two specialisations below give
// the expected state for each buffering policy P.
// NOTE(review): the `template<typename T>` lines and closing braces of the
// specialisations are not visible in this excerpt.
489 template<typename P, typename T>
490 struct test_reversal;
// queueing: returns true when the node holds no predecessors.
492 struct test_reversal<tbb::flow::queueing, T> {
493 test_reversal() { REMARK("<queueing>"); }
494 // queueing node will not reverse.
495 bool operator()( T &node) { return node.my_predecessors.empty(); }
// rejecting: returns true when the node holds at least one predecessor.
499 struct test_reversal<tbb::flow::rejecting, T> {
500 test_reversal() { REMARK("<rejecting>"); }
501 bool operator()( T &node) { return !node.my_predecessors.empty(); }
// White-box test of a serial multifunction_node with buffering policy P, fed
// by qin and writing to qeven_out (port 0) and qodd_out (port 1).
// test_reversal<P, ...> supplies the expected predecessor state for the policy.
// NOTE(review): the template header and return type, the declaration of `g`
// and several statements (puts, waits, resets) are not visible in this excerpt.
506 TestMultifunctionNode() {
507 typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int, int>, P> multinode_type;
508 REMARK("Testing multifunction_node");
509 test_reversal<P,multinode_type> my_test;
512 multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0));
513 tbb::flow::queue_node<int> qin(g);
514 tbb::flow::queue_node<int> qodd_out(g);
515 tbb::flow::queue_node<int> qeven_out(g);
516 tbb::flow::make_edge(qin,mf);
517 tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out);
518 tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out);
// Two passes; the last assertion in the loop expects qin to still have its
// successor on pass 0 and to have none on pass 1.
520 for( int ii = 0; ii < 2 ; ++ii) {
521 serial_fn_state0 = 0;
522 if(ii == 0) REMARK(" reset preds"); else REMARK(" 2nd");
524 // wait for node to be active
525 BACKOFF_WAIT(serial_fn_state0 == 0, "timed out waiting for first put");
// Wait until the node's predecessor list matches the policy's expectation.
527 BACKOFF_WAIT((!my_test(mf)), "Timed out waiting");
528 ASSERT(my_test(mf), "fail second put test");
529 g.my_root_task->cancel_group_execution();
531 serial_fn_state0 = 2;
// The predecessor state is expected to be unchanged at this point.
533 ASSERT(my_test(mf), "fail cancel group test");
535 REMARK(" rf_clear_edges");
536 g.reset(tbb::flow::rf_clear_edges);
537 ASSERT(tbb::flow::output_port<0>(mf).my_successors.empty(), "output_port<0> not reset (rf_clear_edges)");
538 ASSERT(tbb::flow::output_port<1>(mf).my_successors.empty(), "output_port<1> not reset (rf_clear_edges)");
544 ASSERT(mf.my_predecessors.empty(), "edge didn't reset");
545 ASSERT((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty()), "edge didn't reset");
550 // indexer_node is like a broadcast_node in that none of its inputs reverse and it
551 // never allows a successor to reverse its edge, so we only need to test the successors.
555 typedef tbb::flow::indexer_node< int, int > indexernode_type;
556 indexernode_type inode(g);
557 REMARK("Testing indexer_node:");
558 tbb::flow::queue_node<indexernode_type::output_type> qout(g);
559 tbb::flow::make_edge(inode,qout);
561 ASSERT(!inode.my_successors.empty(), "successor of indexer_node missing");
563 ASSERT(!inode.my_successors.empty(), "successor of indexer_node missing after reset");
564 g.reset(tbb::flow::rf_clear_edges);
565 ASSERT(inode.my_successors.empty(), "successor of indexer_node not removed by reset(rf_clear_edges)");
// Checks the successor list of a node of type Node connected to a queue_node:
// non-empty after make_edge, and empty after reset(rf_clear_edges).
// `name` is only used in the progress message.
// NOTE(review): the return type, the declarations of `g` and `on`, and the
// statements between the visible checks are not visible in this excerpt.
569 template<typename Node>
571 TestScalarNode(const char *name) {
574 tbb::flow::queue_node<int> qout(g);
575 REMARK("Testing %s:", name);
576 tbb::flow::make_edge(on,qout); // qout becomes the successor of the node under test
578 ASSERT(!on.my_successors.empty(), "edge not added");
580 ASSERT(!on.my_successors.empty(), "edge improperly removed");
581 g.reset(tbb::flow::rf_clear_edges);
582 ASSERT(on.my_successors.empty(), "edge not removed by reset(rf_clear_edges)");
// Call operator of a functor that maps an int to a size_t (the enclosing
// struct header is not visible in this excerpt). The result is the input
// divided by 3 using integer division.
587 size_t operator()(const int &in) {
588 return size_t(in / 3); // 0,1,2 -> 0; 3,4,5 -> 1; ...
// White-box test of a sequencer_node's successor list, first against a serial
// function_node and then against input port 0 of a reserving join_node.
// NOTE(review): the return type, the declaration of `g` and several
// statements between the visible lines are not visible in this excerpt.
592 // sequencer_node behaves like a queueing node, but requires a different constructor.
594 TestSequencerNode() {
596 tbb::flow::sequencer_node<int> bnode(g, seq_body());
597 REMARK("Testing sequencer_node:");
598 tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
// NOTE(review): this progress message is printed a second time; it duplicates
// the REMARK two lines above.
599 REMARK("Testing sequencer_node:");
600 serial_fn_state0 = 0; // reset to waiting state.
601 REMARK(" make_edge");
602 tbb::flow::make_edge(bnode, fnode);
603 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after make_edge");
605 bnode.try_put(0); // will forward to the fnode
606 BACKOFF_WAIT( serial_fn_state0 == 0, "timeout waiting for function_node"); // wait for the function_node to fire up
607 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after forwarding message");
608 serial_fn_state0 = 0;
610 REMARK(" remove_edge");
611 tbb::flow::remove_edge(bnode, fnode);
612 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after remove_edge");
// Second half: attach the sequencer to port 0 of a reserving join_node.
613 tbb::flow::join_node<tbb::flow::tuple<int,int>,tbb::flow::reserving> jnode(g);
614 tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode)); // will spawn a task
616 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after attaching to join");
618 bnode.try_put(3); // the edge should reverse
620 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving");
623 g.reset(); // should be in forward direction again
624 ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after reset()");
// NOTE(review): the progress message says "remove_edge", but the edge is
// removed by reset(rf_clear_edges). The trailing comment on the reset line
// also looks copied from the plain reset() above: the next assertion expects
// the successor list to be empty, not restored.
625 REMARK(" remove_edge");
626 g.reset(tbb::flow::rf_clear_edges); // should be in forward direction again
627 ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)");
// NOTE(review): this checks the function_node's predecessor list, although
// the message talks about the buffering node.
628 ASSERT(fnode.my_predecessors.empty(), "buffering node reversed after reset(rf_clear_edges)");
// Body functor with a call counter: it refuses (returns false) once it has
// been called max_cnt times.
// NOTE(review): the struct header, the member declarations and the rest of
// operator() (including how `out` is written) are not visible in this excerpt.
636 snode_body( const int &in) : max_cnt(in) { my_cnt = 0; } // store the limit and start counting from zero
637 bool operator()(int &out) {
// my_cnt is post-incremented on every call, including the calls that return false.
638 if(max_cnt <= my_cnt++) return false;
647 tbb::flow::source_node<int> sn(g, snode_body(4), false);
648 REMARK("Testing source_node:");
649 tbb::flow::queue_node<int> qin(g);
650 tbb::flow::join_node<tbb::flow::tuple<int,int>, tbb::flow::reserving> jn(g);
651 tbb::flow::queue_node<tbb::flow::tuple<int,int> > qout(g);
653 REMARK(" make_edges");
654 tbb::flow::make_edge(sn, tbb::flow::input_port<0>(jn));
655 tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
656 tbb::flow::make_edge(jn,qout);
657 ASSERT(!sn.my_successors.empty(), "source node has no successor after make_edge");
660 ASSERT(!sn.my_successors.empty(), "source node has no successor after reset");
662 g.reset(tbb::flow::rf_clear_edges);
663 ASSERT(sn.my_successors.empty(), "source node has successor after reset(rf_clear_edges)");
664 tbb::flow::make_edge(sn, tbb::flow::input_port<0>(jn));
665 tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
666 tbb::flow::make_edge(jn,qout);
669 sn.activate(); // will forward to the fnode
671 BACKOFF_WAIT( !sn.my_successors.empty(), "Timed out waiting for edge to reverse");
672 ASSERT(sn.my_successors.empty(), "source node has no successor after forwarding message");
676 ASSERT(!sn.my_successors.empty(), "source_node has no successors after reset");
677 ASSERT(tbb::flow::input_port<0>(jn).my_predecessors.empty(), "successor if source_node has pred after reset.");
683 if(MinThread < 3) MinThread = 3;
684 tbb::task_scheduler_init init(MinThread); // tests presume at least three threads
686 TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node");
687 TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node");
688 TestBufferingNode< tbb::flow::queue_node<int> >("queue_node");
691 TestMultifunctionNode<tbb::flow::rejecting>();
692 TestMultifunctionNode<tbb::flow::queueing>();
702 TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node");
703 TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node");
704 TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node");
706 return Harness::Done;