2929455e437a805c1affaa0c2bbe447c6a2cac56
[platform/upstream/tbb.git] / src / test / test_flow_graph_whitebox.cpp
1 /*
2     Copyright (c) 2005-2019 Intel Corporation
3
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7
8         http://www.apache.org/licenses/LICENSE-2.0
9
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16
17 #define HARNESS_DEFAULT_MIN_THREADS 3
18 #define HARNESS_DEFAULT_MAX_THREADS 4
19
20 #if _MSC_VER
21     #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
22     #if _MSC_VER==1700 && !defined(__INTEL_COMPILER)
23         // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012)
24         #pragma warning (disable: 4702)
25     #endif
26 #endif
27
28 #include "harness.h"
29 #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations)
30
31 // need these to get proper external names for private methods in library.
32 #include "tbb/spin_mutex.h"
33 #include "tbb/spin_rw_mutex.h"
34 #include "tbb/task.h"
35 #include "tbb/task_arena.h"
36
37 #define private public
38 #define protected public
39 #include "tbb/flow_graph.h"
40 #undef protected
41 #undef private
42 #include "tbb/task_scheduler_init.h"
43 #include "harness_graph.h"
44
45 template<typename T>
46 struct receiverBody {
47     tbb::flow::continue_msg operator()(const T &/*in*/) {
48         return tbb::flow::continue_msg();
49     }
50 };
51
52 // split_nodes cannot have predecessors
53 // they do not reject messages and always forward.
54 // they reject edge reversals from successors.
55 void TestSplitNode() {
56     typedef tbb::flow::split_node<tbb::flow::tuple<int> > snode_type;
57     tbb::flow::graph g;
58     snode_type snode(g);
59     tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>());
60     REMARK("Testing split_node\n");
61     ASSERT(tbb::flow::output_port<0>(snode).my_successors.empty(), "Constructed split_node has successors");
62     // tbb::flow::output_port<0>(snode)
63     tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr);
64     ASSERT(!(tbb::flow::output_port<0>(snode).my_successors.empty()), "after make_edge, split_node has no successor.");
65     snode.try_put(tbb::flow::tuple<int>(1));
66     g.wait_for_all();
67     g.reset();
68     ASSERT(!(tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(), split_node has no successor.");
69     g.reset(tbb::flow::rf_clear_edges);
70     ASSERT(tbb::flow::output_port<0>(snode).my_successors.empty(), "after reset(rf_clear_edges), split_node has a successor.");
71 }
72
73 // buffering nodes cannot have predecessors
74 // they do not reject messages and always save or forward
75 // they allow edge reversals from successors
76 template< typename B >
77 void TestBufferingNode(const char * name) {
78     tbb::flow::graph g;
79     B                bnode(g);
80     tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
81     REMARK("Testing %s:", name);
82     for(int icnt = 0; icnt < 2; icnt++) {
83         bool reverse_edge = (icnt & 0x2) != 0;
84         serial_fn_state0 = 0;  // reset to waiting state.
85         REMARK(" make_edge");
86         tbb::flow::make_edge(bnode, fnode);
87         ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after make_edge");
88         REMARK(" try_put");
89         bnode.try_put(1);  // will forward to the fnode
90         BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting for first put");
91         if(reverse_edge) {
92             REMARK(" try_put2");
93             bnode.try_put(2);  // will reverse the edge
94             // cannot do a wait_for_all here; the function_node is still executing
95             BACKOFF_WAIT(!bnode.my_successors.empty(), "Timed out waiting after 2nd put");
96             // at this point the only task running is the one for the function_node.
97             ASSERT(bnode.my_successors.empty(), "successor not removed");
98         }
99         else {
100             ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after forwarding message");
101         }
102         serial_fn_state0 = 0;  // release the function_node.
103         if(reverse_edge) {
104             // have to do a second release because the function_node will get the 2nd item
105             BACKOFF_WAIT( serial_fn_state0 == 0, "Timed out waiting after 2nd put");
106             serial_fn_state0 = 0;  // release the function_node.
107         }
108         g.wait_for_all();
109         REMARK(" remove_edge");
110         tbb::flow::remove_edge(bnode, fnode);
111         ASSERT(bnode.my_successors.empty(), "buffering node has a successor after remove_edge");
112     }
113     tbb::flow::join_node<tbb::flow::tuple<int,int>,tbb::flow::reserving> jnode(g);
114     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // will spawn a task
115     g.wait_for_all();
116     ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after attaching to join");
117     REMARK(" reverse");
118     bnode.try_put(1);  // the edge should reverse
119     g.wait_for_all();
120     ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving");
121     REMARK(" reset()");
122     g.wait_for_all();
123     g.reset();  // should be in forward direction again
124     ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after reset()");
125     REMARK(" remove_edge");
126     g.reset(tbb::flow::rf_clear_edges);
127     ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)");
128     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // add edge again
129     // reverse edge by adding to buffer.
130     bnode.try_put(1);  // the edge should reverse
131     g.wait_for_all();
132     ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving");
133     REMARK(" remove_edge(reversed)");
134     g.reset(tbb::flow::rf_clear_edges);
135     ASSERT(bnode.my_successors.empty(), "buffering node has no successor after reset()");
136     ASSERT(tbb::flow::input_port<0>(jnode).my_predecessors.empty(), "predecessor not reset");
137     REMARK("  done\n");
138     g.wait_for_all();
139 }
140
141 // continue_node has only predecessor count
142 // they do not have predecessors, only the counts
143 // successor edges cannot be reversed
144 void TestContinueNode() {
145     tbb::flow::graph g;
146     tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
147     tbb::flow::continue_node<int> cnode(g, 1, serial_continue_body<int>(serial_continue_state0));
148     tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1));
149     tbb::flow::make_edge(fnode0, cnode);
150     tbb::flow::make_edge(cnode, fnode1);
151     REMARK("Testing continue_node:");
152     for( int icnt = 0; icnt < 2; ++icnt ) {
153         REMARK( " initial%d", icnt);
154         ASSERT(cnode.my_predecessor_count == 2, "predecessor addition didn't increment count");
155         ASSERT(!cnode.successors().empty(), "successors empty though we added one");
156         ASSERT(cnode.my_current_count == 0, "state of continue_receiver incorrect");
157         serial_continue_state0 = 0;
158         serial_fn_state0 = 0;
159         serial_fn_state1 = 0;
160
161         fnode0.try_put(1);  // start the first function node.
162         BACKOFF_WAIT(!serial_fn_state0, "Timed out waiting for function_node to start");
163         // Now the body of function_node 0 is executing.
164         serial_fn_state0 = 0;  // release the node
165         // wait for node to count the message (or for the node body to execute, which would be wrong)
166         BACKOFF_WAIT(serial_continue_state0 == 0 && cnode.my_current_count == 0, "Timed out waiting for continue_state0 to change");
167         ASSERT(serial_continue_state0 == 0, "Improperly released continue_node");
168         ASSERT(cnode.my_current_count == 1, "state of continue_receiver incorrect");
169         if(icnt == 0) {  // first time through, let the continue_node fire
170             REMARK(" firing");
171             fnode0.try_put(1);  // second message
172             BACKOFF_WAIT(serial_fn_state0 == 0, "timeout waiting for continue_body to execute");
173             // Now the body of function_node 0 is executing.
174             serial_fn_state0 = 0;  // release the node
175
176             BACKOFF_WAIT(!serial_continue_state0,"continue_node didn't start");  // now we wait for the continue_node.
177             ASSERT(cnode.my_current_count == 0, " my_current_count not reset before body of continue_node started");
178             serial_continue_state0 = 0;  // release the continue_node
179             BACKOFF_WAIT(!serial_fn_state1,"successor function_node didn't start");    // wait for the successor function_node to enter body
180             serial_fn_state1 = 0;  // release successor function_node.
181             g.wait_for_all();
182
183             // try a try_get()
184             {
185                 int i;
186                 ASSERT(!cnode.try_get(i), "try_get not rejected");
187             }
188
189             REMARK(" reset");
190             ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (before reset)");
191             ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (before reset)");
192             g.reset();  // should still be the same
193             ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (after reset)" );
194             ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (after reset)");
195         }
196         else {  // we're going to see if the rf_clear_edges resets things.
197             g.wait_for_all();
198             REMARK(" reset(rf_clear_edges)");
199             ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (before reset)");
200             ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (before reset)");
201             g.reset(tbb::flow::rf_clear_edges);  // should be in forward direction again
202             ASSERT(cnode.my_current_count == 0, "state of continue_receiver incorrect after reset(rf_clear_edges)");
203             ASSERT(cnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)");
204             ASSERT(cnode.my_predecessor_count == cnode.my_initial_predecessor_count, "predecessor count not reset");
205         }
206     }
207
208     REMARK(" done\n");
209
210 }
211
212 // function_node has predecessors and successors
213 // try_get() rejects
214 // successor edges cannot be reversed
215 // predecessors will reverse (only rejecting will reverse)
216 void TestFunctionNode() {
217     tbb::flow::graph g;
218     tbb::flow::queue_node<int> qnode0(g);
219     tbb::flow::function_node<int,int, tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
220     // queueing function node
221     tbb::flow::function_node<int,int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
222
223     tbb::flow::queue_node<int> qnode1(g);
224
225     tbb::flow::make_edge(fnode0, qnode1);
226     tbb::flow::make_edge(qnode0, fnode0);
227
228     serial_fn_state0 = 2;  // just let it go
229     // see if the darned thing will work....
230     qnode0.try_put(1);
231     g.wait_for_all();
232     int ii;
233     ASSERT(qnode1.try_get(ii) && ii == 1, "output not passed");
234     tbb::flow::remove_edge(qnode0, fnode0);
235     tbb::flow::remove_edge(fnode0, qnode1);
236
237     tbb::flow::make_edge(fnode1, qnode1);
238     tbb::flow::make_edge(qnode0, fnode1);
239
240     serial_fn_state0 = 2;  // just let it go
241     // see if the darned thing will work....
242     qnode0.try_put(1);
243     g.wait_for_all();
244     ASSERT(qnode1.try_get(ii) && ii == 1, "output not passed");
245     tbb::flow::remove_edge(qnode0, fnode1);
246     tbb::flow::remove_edge(fnode1, qnode1);
247
248     // rejecting
249     serial_fn_state0 = 0;
250     tbb::flow::make_edge(fnode0, qnode1);
251     tbb::flow::make_edge(qnode0, fnode0);
252     REMARK("Testing rejecting function_node:");
253     ASSERT(!fnode0.my_queue, "node should have no queue");
254     ASSERT(!fnode0.my_successors.empty(), "successor edge not added");
255     qnode0.try_put(1);
256     BACKOFF_WAIT(!serial_fn_state0,"rejecting function_node didn't start");
257     qnode0.try_put(2);   // rejecting node should reject, reverse.
258     BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Missing predecessor ---");
259     serial_fn_state0 = 2;   // release function_node body.
260     g.wait_for_all();
261     REMARK(" reset");
262     g.reset();  // should reverse the edge from the input to the function node.
263     ASSERT(!qnode0.my_successors.empty(), "empty successors after reset()");
264     ASSERT(fnode0.my_predecessors.empty(), "predecessor not reversed");
265     tbb::flow::remove_edge(qnode0, fnode0);
266     tbb::flow::remove_edge(fnode0, qnode1);
267     REMARK("\n");
268
269     // queueing
270     tbb::flow::make_edge(fnode1, qnode1);
271     REMARK("Testing queueing function_node:");
272     ASSERT(fnode1.my_queue, "node should have no queue");
273     ASSERT(!fnode1.my_successors.empty(), "successor edge not added");
274     REMARK(" add_pred");
275     ASSERT(fnode1.register_predecessor(qnode0), "Cannot register as predecessor");
276     ASSERT(!fnode1.my_predecessors.empty(), "Missing predecessor");
277     REMARK(" reset");
278     g.wait_for_all();
279     g.reset();  // should reverse the edge from the input to the function node.
280     ASSERT(!qnode0.my_successors.empty(), "empty successors after reset()");
281     ASSERT(fnode1.my_predecessors.empty(), "predecessor not reversed");
282     tbb::flow::remove_edge(qnode0, fnode1);
283     tbb::flow::remove_edge(fnode1, qnode1);
284     REMARK("\n");
285
286     serial_fn_state0 = 0;  // make the function_node wait
287     tbb::flow::make_edge(qnode0, fnode0);
288     REMARK(" start_func");
289     qnode0.try_put(1);
290     BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting after 1st put");
291     // now if we put an item to the queues the edges to the function_node will reverse.
292     REMARK(" put_node(2)");
293     qnode0.try_put(2);   // start queue node.
294     // wait for the edges to reverse
295     BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Timed out waiting");
296     ASSERT(!fnode0.my_predecessors.empty(), "function_node edge not reversed");
297     g.my_root_task->cancel_group_execution();
298     // release the function_node
299     serial_fn_state0 = 2;
300     g.wait_for_all();
301     ASSERT(!fnode0.my_predecessors.empty() && qnode0.my_successors.empty(), "function_node edge not reversed");
302     g.reset(tbb::flow::rf_clear_edges);
303     ASSERT(fnode0.my_predecessors.empty() && qnode0.my_successors.empty(), "function_node edge not removed");
304     ASSERT(fnode0.my_successors.empty(), "successor to fnode not removed");
305     REMARK(" done\n");
306 }
307
308 template<typename TT>
309 class tag_func {
310     TT my_mult;
311 public:
312     tag_func(TT multiplier) : my_mult(multiplier) { }
313     void operator=( const tag_func& other){my_mult = other.my_mult;}
314     // operator() will return [0 .. Count)
315     tbb::flow::tag_value operator()( TT v) {
316         tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
317         return t;
318     }
319 };
320
321 template<typename JNODE_TYPE>
322 void
323 TestSimpleSuccessorArc(const char *name) {
324     tbb::flow::graph g;
325     {
326         REMARK("Join<%s> successor test ", name);
327         tbb::flow::join_node<tbb::flow::tuple<int>, JNODE_TYPE> qj(g);
328         tbb::flow::broadcast_node<tbb::flow::tuple<int> > bnode(g);
329         tbb::flow::make_edge(qj, bnode);
330         ASSERT(!qj.my_successors.empty(),"successor missing after linking");
331         g.reset();
332         ASSERT(!qj.my_successors.empty(),"successor missing after reset()");
333         g.reset(tbb::flow::rf_clear_edges);
334         ASSERT(qj.my_successors.empty(), "successors not removed after reset(rf_clear_edges)");
335     }
336 }
337
338 template<>
339 void
340 TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) {
341     tbb::flow::graph g;
342     {
343         REMARK("Join<%s> successor test ", name);
344         typedef tbb::flow::tuple<int,int> my_tuple;
345         tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g,
346                 tag_func<int>(1),
347                 tag_func<int>(1)
348                 );
349         tbb::flow::broadcast_node<my_tuple > bnode(g);
350         tbb::flow::make_edge(qj, bnode);
351         ASSERT(!qj.my_successors.empty(),"successor missing after linking");
352         g.reset();
353         ASSERT(!qj.my_successors.empty(),"successor missing after reset()");
354         g.reset(tbb::flow::rf_clear_edges);
355         ASSERT(qj.my_successors.empty(), "successors not removed after reset(rf_clear_edges)");
356     }
357 }
358
359 void
360 TestJoinNode() {
361     tbb::flow::graph g;
362
363     TestSimpleSuccessorArc<tbb::flow::queueing>("queueing");
364     TestSimpleSuccessorArc<tbb::flow::reserving>("reserving");
365     TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching");
366
367     // queueing and tagging join nodes have input queues, so the input ports do not reverse.
368     REMARK(" reserving preds");
369     {
370         tbb::flow::join_node<tbb::flow::tuple<int,int>, tbb::flow::reserving> rj(g);
371         tbb::flow::queue_node<int> q0(g);
372         tbb::flow::queue_node<int> q1(g);
373         tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj));
374         tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj));
375         q0.try_put(1);
376         g.wait_for_all();  // quiesce
377         ASSERT(!(tbb::flow::input_port<0>(rj).my_predecessors.empty()),"reversed port missing predecessor");
378         ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"non-reversed port has pred");
379         g.reset();
380         ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"reversed port has pred after reset()");
381         ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()");
382         q1.try_put(2);
383         g.wait_for_all();  // quiesce
384         ASSERT(!(tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port missing predecessor");
385         ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred");
386         g.reset();
387         ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port has pred after reset()");
388         ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()");
389         // should reset predecessors just as regular reset.
390         q1.try_put(3);
391         g.wait_for_all();  // quiesce
392         ASSERT(!(tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port missing predecessor");
393         ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred");
394         g.reset(tbb::flow::rf_clear_edges);
395         ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port has pred after reset()");
396         ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()");
397         ASSERT(q0.my_successors.empty(), "edge not removed by reset(rf_clear_edges)");
398         ASSERT(q1.my_successors.empty(), "edge not removed by reset(rf_clear_edges)");
399     }
400     REMARK(" done\n");
401 }
402
403 void
404 TestLimiterNode() {
405     int out_int;
406     tbb::flow::graph g;
407     tbb::flow::limiter_node<int> ln(g,1);
408     REMARK("Testing limiter_node: preds and succs");
409     ASSERT(ln.decrement.my_predecessor_count == 0, "error in pred count");
410     ASSERT(ln.decrement.my_initial_predecessor_count == 0, "error in initial pred count");
411     ASSERT(ln.decrement.my_current_count == 0, "error in current count");
412 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
413     ASSERT(ln.init_decrement_predecessors == 0, "error in decrement predecessors");
414 #endif
415     ASSERT(ln.my_threshold == 1, "error in my_threshold");
416     tbb::flow::queue_node<int> inq(g);
417     tbb::flow::queue_node<int> outq(g);
418     tbb::flow::broadcast_node<tbb::flow::continue_msg> bn(g);
419
420     tbb::flow::make_edge(inq,ln);
421     tbb::flow::make_edge(ln,outq);
422     tbb::flow::make_edge(bn,ln.decrement);
423
424     g.wait_for_all();
425     ASSERT(!(ln.my_successors.empty()),"successors empty after make_edge");
426     ASSERT(ln.my_predecessors.empty(), "input edge reversed");
427     inq.try_put(1);
428     g.wait_for_all();
429     ASSERT(outq.try_get(out_int) && out_int == 1, "limiter_node didn't pass first value");
430     ASSERT(ln.my_predecessors.empty(), "input edge reversed");
431     inq.try_put(2);
432     g.wait_for_all();
433     ASSERT(!outq.try_get(out_int), "limiter_node incorrectly passed second input");
434     ASSERT(!ln.my_predecessors.empty(), "input edge to limiter_node not reversed");
435     bn.try_put(tbb::flow::continue_msg());
436     g.wait_for_all();
437     ASSERT(outq.try_get(out_int) && out_int == 2, "limiter_node didn't pass second value");
438     g.wait_for_all();
439     ASSERT(!ln.my_predecessors.empty(), "input edge was reversed(after try_get())");
440     g.reset();
441     ASSERT(ln.my_predecessors.empty(), "input edge not reset");
442     inq.try_put(3);
443     g.wait_for_all();
444     ASSERT(outq.try_get(out_int) && out_int == 3, "limiter_node didn't pass third value");
445
446     REMARK(" rf_clear_edges");
447     // currently the limiter_node will not pass another message
448     g.reset(tbb::flow::rf_clear_edges);
449     ASSERT(ln.decrement.my_predecessor_count == 0, "error in pred count");
450     ASSERT(ln.decrement.my_initial_predecessor_count == 0, "error in initial pred count");
451     ASSERT(ln.decrement.my_current_count == 0, "error in current count");
452 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
453     ASSERT(ln.init_decrement_predecessors == 0, "error in decrement predecessors");
454 #endif
455     ASSERT(ln.my_threshold == 1, "error in my_threshold");
456     ASSERT(ln.my_predecessors.empty(), "preds not reset(rf_clear_edges)");
457     ASSERT(ln.my_successors.empty(), "preds not reset(rf_clear_edges)");
458     ASSERT(inq.my_successors.empty(), "Arc not removed on reset(rf_clear_edges)");
459     ASSERT(inq.my_successors.empty(), "Arc not removed on reset(rf_clear_edges)");
460     ASSERT(bn.my_successors.empty(), "control edge not removed on reset(rf_clear_edges)");
461     tbb::flow::make_edge(inq,ln);
462     tbb::flow::make_edge(ln,outq);
463     inq.try_put(4);
464     inq.try_put(5);
465     g.wait_for_all();
466     ASSERT(outq.try_get(out_int),"missing output after reset(rf_clear_edges)");
467     ASSERT(out_int == 4, "input incorrect (4)");
468     bn.try_put(tbb::flow::continue_msg());
469     g.wait_for_all();
470     ASSERT(!outq.try_get(out_int),"second output incorrectly passed (rf_clear_edges)");
471     REMARK(" done\n");
472 }
473
474 template<typename MF_TYPE>
475 struct mf_body {
476     tbb::atomic<int> *_flag;
477     mf_body( tbb::atomic<int> &myatomic) : _flag(&myatomic) { }
478     void operator()( const int& in, typename MF_TYPE::output_ports_type &outports) {
479         if(*_flag == 0) {
480             *_flag = 1;
481             BACKOFF_WAIT(*_flag == 1, "multifunction_node not released");
482         }
483
484         if(in & 0x1) tbb::flow::get<1>(outports).try_put(in);
485         else         tbb::flow::get<0>(outports).try_put(in);
486     }
487 };
488
489 template<typename P, typename T>
490 struct test_reversal;
491 template<typename T>
492 struct test_reversal<tbb::flow::queueing, T> {
493     test_reversal() { REMARK("<queueing>"); }
494     // queueing node will not reverse.
495     bool operator()( T &node) { return node.my_predecessors.empty(); }
496 };
497
498 template<typename T>
499 struct test_reversal<tbb::flow::rejecting, T> {
500     test_reversal() { REMARK("<rejecting>"); }
501     bool operator()( T &node) { return !node.my_predecessors.empty(); }
502 };
503
504 template<typename P>
505 void
506 TestMultifunctionNode() {
507     typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int, int>, P> multinode_type;
508     REMARK("Testing multifunction_node");
509     test_reversal<P,multinode_type> my_test;
510     REMARK(":");
511     tbb::flow::graph g;
512     multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0));
513     tbb::flow::queue_node<int> qin(g);
514     tbb::flow::queue_node<int> qodd_out(g);
515     tbb::flow::queue_node<int> qeven_out(g);
516     tbb::flow::make_edge(qin,mf);
517     tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out);
518     tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out);
519     g.wait_for_all();
520     for( int ii = 0; ii < 2 ; ++ii) {
521         serial_fn_state0 = 0;
522         if(ii == 0) REMARK(" reset preds"); else REMARK(" 2nd");
523         qin.try_put(0);
524         // wait for node to be active
525         BACKOFF_WAIT(serial_fn_state0 == 0, "timed out waiting for first put");
526         qin.try_put(1);
527         BACKOFF_WAIT((!my_test(mf)), "Timed out waiting");
528         ASSERT(my_test(mf), "fail second put test");
529         g.my_root_task->cancel_group_execution();
530         // release node
531         serial_fn_state0 = 2;
532         g.wait_for_all();
533         ASSERT(my_test(mf), "fail cancel group test");
534         if( ii == 1) {
535             REMARK(" rf_clear_edges");
536             g.reset(tbb::flow::rf_clear_edges);
537             ASSERT(tbb::flow::output_port<0>(mf).my_successors.empty(), "output_port<0> not reset (rf_clear_edges)");
538             ASSERT(tbb::flow::output_port<1>(mf).my_successors.empty(), "output_port<1> not reset (rf_clear_edges)");
539         }
540         else
541         {
542             g.reset();
543         }
544         ASSERT(mf.my_predecessors.empty(), "edge didn't reset");
545         ASSERT((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty()), "edge didn't reset");
546     }
547     REMARK(" done\n");
548 }
549
550 // indexer_node is like a broadcast_node, in that none of its inputs reverse, and it
551 // never allows a successor to reverse its edge, so we only need test the successors.
552 void
553 TestIndexerNode() {
554     tbb::flow::graph g;
555     typedef tbb::flow::indexer_node< int, int > indexernode_type;
556     indexernode_type inode(g);
557     REMARK("Testing indexer_node:");
558     tbb::flow::queue_node<indexernode_type::output_type> qout(g);
559     tbb::flow::make_edge(inode,qout);
560     g.wait_for_all();
561     ASSERT(!inode.my_successors.empty(), "successor of indexer_node missing");
562     g.reset();
563     ASSERT(!inode.my_successors.empty(), "successor of indexer_node missing after reset");
564     g.reset(tbb::flow::rf_clear_edges);
565     ASSERT(inode.my_successors.empty(), "successor of indexer_node not removed by reset(rf_clear_edges)");
566     REMARK(" done\n");
567 }
568
569 template<typename Node>
570 void
571 TestScalarNode(const char *name) {
572     tbb::flow::graph g;
573     Node on(g);
574     tbb::flow::queue_node<int> qout(g);
575     REMARK("Testing %s:", name);
576     tbb::flow::make_edge(on,qout);
577     g.wait_for_all();
578     ASSERT(!on.my_successors.empty(), "edge not added");
579     g.reset();
580     ASSERT(!on.my_successors.empty(), "edge improperly removed");
581     g.reset(tbb::flow::rf_clear_edges);
582     ASSERT(on.my_successors.empty(), "edge not removed by reset(rf_clear_edges)");
583     REMARK(" done\n");
584 }
585
// Sequencer functor: the sequence number of a value is the value divided by 3
// (integer division), converted to size_t.
struct seq_body {
    size_t operator()(const int &in) {
        const int sequence = in / 3;
        return size_t(sequence);
    }
};
591
592 // sequencer_node behaves like a queueing node, but requires a different constructor.
593 void
594 TestSequencerNode() {
595     tbb::flow::graph g;
596     tbb::flow::sequencer_node<int> bnode(g, seq_body());
597     REMARK("Testing sequencer_node:");
598     tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
599     REMARK("Testing sequencer_node:");
600     serial_fn_state0 = 0;  // reset to waiting state.
601     REMARK(" make_edge");
602     tbb::flow::make_edge(bnode, fnode);
603     ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after make_edge");
604     REMARK(" try_put");
605     bnode.try_put(0);  // will forward to the fnode
606     BACKOFF_WAIT( serial_fn_state0 == 0, "timeout waiting for function_node");  // wait for the function_node to fire up
607     ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after forwarding message");
608     serial_fn_state0 = 0;
609     g.wait_for_all();
610     REMARK(" remove_edge");
611     tbb::flow::remove_edge(bnode, fnode);
612     ASSERT(bnode.my_successors.empty(), "buffering node has a successor after remove_edge");
613     tbb::flow::join_node<tbb::flow::tuple<int,int>,tbb::flow::reserving> jnode(g);
614     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // will spawn a task
615     g.wait_for_all();
616     ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after attaching to join");
617     REMARK(" reverse");
618     bnode.try_put(3);  // the edge should reverse
619     g.wait_for_all();
620     ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving");
621     REMARK(" reset()");
622     g.wait_for_all();
623     g.reset();  // should be in forward direction again
624     ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after reset()");
625     REMARK(" remove_edge");
626     g.reset(tbb::flow::rf_clear_edges);  // should be in forward direction again
627     ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)");
628     ASSERT(fnode.my_predecessors.empty(), "buffering node reversed after reset(rf_clear_edges)");
629     REMARK("  done\n");
630     g.wait_for_all();
631 }
632
// Source body that yields 1, 2, ..., max_cnt and then reports exhaustion.
// my_cnt counts every call, including the ones that return false.
struct snode_body {
    int max_cnt;  // number of values to produce
    int my_cnt;   // number of calls made so far
    snode_body( const int &in) : max_cnt(in), my_cnt(0) { }
    bool operator()(int &out) {
        const int calls_before = my_cnt;
        ++my_cnt;
        if (calls_before >= max_cnt) {
            return false;  // exhausted; out is left untouched
        }
        out = my_cnt;
        return true;
    }
};
643
644 void
645 TestSourceNode() {
646     tbb::flow::graph g;
647     tbb::flow::source_node<int> sn(g, snode_body(4), false);
648     REMARK("Testing source_node:");
649     tbb::flow::queue_node<int> qin(g);
650     tbb::flow::join_node<tbb::flow::tuple<int,int>, tbb::flow::reserving> jn(g);
651     tbb::flow::queue_node<tbb::flow::tuple<int,int> > qout(g);
652
653     REMARK(" make_edges");
654     tbb::flow::make_edge(sn, tbb::flow::input_port<0>(jn));
655     tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
656     tbb::flow::make_edge(jn,qout);
657     ASSERT(!sn.my_successors.empty(), "source node has no successor after make_edge");
658     g.wait_for_all();
659     g.reset();
660     ASSERT(!sn.my_successors.empty(), "source node has no successor after reset");
661     g.wait_for_all();
662     g.reset(tbb::flow::rf_clear_edges);
663     ASSERT(sn.my_successors.empty(), "source node has successor after reset(rf_clear_edges)");
664     tbb::flow::make_edge(sn, tbb::flow::input_port<0>(jn));
665     tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
666     tbb::flow::make_edge(jn,qout);
667     g.wait_for_all();
668     REMARK(" activate");
669     sn.activate();  // will forward to the fnode
670     REMARK(" wait1");
671     BACKOFF_WAIT( !sn.my_successors.empty(), "Timed out waiting for edge to reverse");
672     ASSERT(sn.my_successors.empty(), "source node has no successor after forwarding message");
673
674     g.wait_for_all();
675     g.reset();
676     ASSERT(!sn.my_successors.empty(), "source_node has no successors after reset");
677     ASSERT(tbb::flow::input_port<0>(jn).my_predecessors.empty(), "successor if source_node has pred after reset.");
678     REMARK(" done\n");
679 }
680
681 int TestMain() {
682
683     if(MinThread < 3) MinThread = 3;
684     tbb::task_scheduler_init init(MinThread);  // tests presume at least three threads
685
686     TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node");
687     TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node");
688     TestBufferingNode< tbb::flow::queue_node<int> >("queue_node");
689     TestSequencerNode();
690
691     TestMultifunctionNode<tbb::flow::rejecting>();
692     TestMultifunctionNode<tbb::flow::queueing>();
693     TestSourceNode();
694     TestContinueNode();
695     TestFunctionNode();
696
697     TestJoinNode();
698
699     TestLimiterNode();
700     TestIndexerNode();
701     TestSplitNode();
702     TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node");
703     TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node");
704     TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node");
705
706     return Harness::Done;
707 }
708