Committing TBB 2019 Update 9 source code
[platform/upstream/tbb.git] / src / test / test_flow_graph_whitebox.cpp
1 /*
2     Copyright (c) 2005-2019 Intel Corporation
3
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7
8         http://www.apache.org/licenses/LICENSE-2.0
9
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16
17 #define HARNESS_DEFAULT_MIN_THREADS 3
18 #define HARNESS_DEFAULT_MAX_THREADS 4
19
20 #if _MSC_VER
21     #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
22     #if _MSC_VER==1700 && !defined(__INTEL_COMPILER)
23         // Suppress "unreachable code" warning by VC++ 17.0 (VS 2012)
24         #pragma warning (disable: 4702)
25     #endif
26 #endif
27
28 #include "harness.h"
29 #include <string> // merely prevents LNK2001 error to happen (on ICL+VC9 configurations)
30
31 // need these to get proper external names for private methods in library.
32 #include "tbb/spin_mutex.h"
33 #include "tbb/spin_rw_mutex.h"
34 #include "tbb/task.h"
35 #include "tbb/task_arena.h"
36
37 #define private public
38 #define protected public
39 #include "tbb/flow_graph.h"
40 #undef protected
41 #undef private
42 #include "tbb/task_scheduler_init.h"
43 #include "harness_graph.h"
44
45 template<typename T>
46 struct receiverBody {
47     tbb::flow::continue_msg operator()(const T &/*in*/) {
48         return tbb::flow::continue_msg();
49     }
50 };
51
52 // split_nodes cannot have predecessors
53 // they do not reject messages and always forward.
54 // they reject edge reversals from successors.
55 void TestSplitNode() {
56     typedef tbb::flow::split_node<tbb::flow::tuple<int> > snode_type;
57     tbb::flow::graph g;
58     snode_type snode(g);
59     tbb::flow::function_node<int> rcvr(g,tbb::flow::unlimited, receiverBody<int>());
60     REMARK("Testing split_node\n");
61     ASSERT(tbb::flow::output_port<0>(snode).my_successors.empty(), "Constructed split_node has successors");
62     // tbb::flow::output_port<0>(snode)
63     tbb::flow::make_edge(tbb::flow::output_port<0>(snode), rcvr);
64     ASSERT(!(tbb::flow::output_port<0>(snode).my_successors.empty()), "after make_edge, split_node has no successor.");
65     snode.try_put(tbb::flow::tuple<int>(1));
66     g.wait_for_all();
67     g.reset();
68     ASSERT(!(tbb::flow::output_port<0>(snode).my_successors.empty()), "after reset(), split_node has no successor.");
69     g.reset(tbb::flow::rf_clear_edges);
70     ASSERT(tbb::flow::output_port<0>(snode).my_successors.empty(), "after reset(rf_clear_edges), split_node has a successor.");
71 }
72
73 // buffering nodes cannot have predecessors
74 // they do not reject messages and always save or forward
75 // they allow edge reversals from successors
76 template< typename B >
77 void TestBufferingNode(const char * name) {
78     tbb::flow::graph g;
79     B                bnode(g);
80     tbb::flow::function_node<int,int,tbb::flow::rejecting> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
81     REMARK("Testing %s:", name);
82     for(int icnt = 0; icnt < 2; icnt++) {
83         bool reverse_edge = (icnt & 0x2) != 0;
84         serial_fn_state0 = 0;  // reset to waiting state.
85         REMARK(" make_edge");
86         tbb::flow::make_edge(bnode, fnode);
87         ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after make_edge");
88         REMARK(" try_put");
89         bnode.try_put(1);  // will forward to the fnode
90         BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting for first put");
91         if(reverse_edge) {
92             REMARK(" try_put2");
93             bnode.try_put(2);  // will reverse the edge
94             // cannot do a wait_for_all here; the function_node is still executing
95             BACKOFF_WAIT(!bnode.my_successors.empty(), "Timed out waiting after 2nd put");
96             // at this point the only task running is the one for the function_node.
97             ASSERT(bnode.my_successors.empty(), "successor not removed");
98         }
99         else {
100             ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after forwarding message");
101         }
102         serial_fn_state0 = 0;  // release the function_node.
103         if(reverse_edge) {
104             // have to do a second release because the function_node will get the 2nd item
105             BACKOFF_WAIT( serial_fn_state0 == 0, "Timed out waiting after 2nd put");
106             serial_fn_state0 = 0;  // release the function_node.
107         }
108         g.wait_for_all();
109         REMARK(" remove_edge");
110         tbb::flow::remove_edge(bnode, fnode);
111         ASSERT(bnode.my_successors.empty(), "buffering node has a successor after remove_edge");
112     }
113     tbb::flow::join_node<tbb::flow::tuple<int,int>,tbb::flow::reserving> jnode(g);
114     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // will spawn a task
115     g.wait_for_all();
116     ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after attaching to join");
117     REMARK(" reverse");
118     bnode.try_put(1);  // the edge should reverse
119     g.wait_for_all();
120     ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving");
121     REMARK(" reset()");
122     g.wait_for_all();
123     g.reset();  // should be in forward direction again
124     ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after reset()");
125     REMARK(" remove_edge");
126     g.reset(tbb::flow::rf_clear_edges);
127     ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)");
128     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // add edge again
129     // reverse edge by adding to buffer.
130     bnode.try_put(1);  // the edge should reverse
131     g.wait_for_all();
132     ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving");
133     REMARK(" remove_edge(reversed)");
134     g.reset(tbb::flow::rf_clear_edges);
135     ASSERT(bnode.my_successors.empty(), "buffering node has no successor after reset()");
136     ASSERT(tbb::flow::input_port<0>(jnode).my_predecessors.empty(), "predecessor not reset");
137     REMARK("  done\n");
138     g.wait_for_all();
139 }
140
141 // continue_node has only predecessor count
142 // they do not have predecessors, only the counts
143 // successor edges cannot be reversed
144 void TestContinueNode() {
145     tbb::flow::graph g;
146     tbb::flow::function_node<int> fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
147     tbb::flow::continue_node<int> cnode(g, 1, serial_continue_body<int>(serial_continue_state0));
148     tbb::flow::function_node<int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state1));
149     tbb::flow::make_edge(fnode0, cnode);
150     tbb::flow::make_edge(cnode, fnode1);
151     REMARK("Testing continue_node:");
152     for( int icnt = 0; icnt < 2; ++icnt ) {
153         REMARK( " initial%d", icnt);
154         ASSERT(cnode.my_predecessor_count == 2, "predecessor addition didn't increment count");
155         ASSERT(!cnode.successors().empty(), "successors empty though we added one");
156         ASSERT(cnode.my_current_count == 0, "state of continue_receiver incorrect");
157         serial_continue_state0 = 0;
158         serial_fn_state0 = 0;
159         serial_fn_state1 = 0;
160
161         fnode0.try_put(1);  // start the first function node.
162         BACKOFF_WAIT(!serial_fn_state0, "Timed out waiting for function_node to start");
163         // Now the body of function_node 0 is executing.
164         serial_fn_state0 = 0;  // release the node
165         // wait for node to count the message (or for the node body to execute, which would be wrong)
166         BACKOFF_WAIT(serial_continue_state0 == 0 && cnode.my_current_count == 0, "Timed out waiting for continue_state0 to change");
167         ASSERT(serial_continue_state0 == 0, "Improperly released continue_node");
168         ASSERT(cnode.my_current_count == 1, "state of continue_receiver incorrect");
169         if(icnt == 0) {  // first time through, let the continue_node fire
170             REMARK(" firing");
171             fnode0.try_put(1);  // second message
172             BACKOFF_WAIT(serial_fn_state0 == 0, "timeout waiting for continue_body to execute");
173             // Now the body of function_node 0 is executing.
174             serial_fn_state0 = 0;  // release the node
175
176             BACKOFF_WAIT(!serial_continue_state0,"continue_node didn't start");  // now we wait for the continue_node.
177             ASSERT(cnode.my_current_count == 0, " my_current_count not reset before body of continue_node started");
178             serial_continue_state0 = 0;  // release the continue_node
179             BACKOFF_WAIT(!serial_fn_state1,"successor function_node didn't start");    // wait for the successor function_node to enter body
180             serial_fn_state1 = 0;  // release successor function_node.
181             g.wait_for_all();
182
183             // try a try_get()
184             {
185                 int i;
186                 ASSERT(!cnode.try_get(i), "try_get not rejected");
187             }
188
189             REMARK(" reset");
190             ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (before reset)");
191             ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (before reset)");
192             g.reset();  // should still be the same
193             ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (after reset)" );
194             ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (after reset)");
195         }
196         else {  // we're going to see if the rf_clear_edges resets things.
197             g.wait_for_all();
198             REMARK(" reset(rf_clear_edges)");
199             ASSERT(!cnode.my_successors.empty(), "Empty successors in built graph (before reset)");
200             ASSERT(cnode.my_predecessor_count == 2, "predecessor_count reset (before reset)");
201             g.reset(tbb::flow::rf_clear_edges);  // should be in forward direction again
202             ASSERT(cnode.my_current_count == 0, "state of continue_receiver incorrect after reset(rf_clear_edges)");
203             ASSERT(cnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)");
204             ASSERT(cnode.my_predecessor_count == cnode.my_initial_predecessor_count, "predecessor count not reset");
205         }
206     }
207
208     REMARK(" done\n");
209
210 }
211
212 // function_node has predecessors and successors
213 // try_get() rejects
214 // successor edges cannot be reversed
215 // predecessors will reverse (only rejecting will reverse)
216 void TestFunctionNode() {
217     tbb::flow::graph g;
218     tbb::flow::queue_node<int> qnode0(g);
219     tbb::flow::function_node<int,int, tbb::flow::rejecting > fnode0(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
220     // queueing function node
221     tbb::flow::function_node<int,int> fnode1(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
222
223     tbb::flow::queue_node<int> qnode1(g);
224
225     tbb::flow::make_edge(fnode0, qnode1);
226     tbb::flow::make_edge(qnode0, fnode0);
227
228     serial_fn_state0 = 2;  // just let it go
229     // see if the darned thing will work....
230     qnode0.try_put(1);
231     g.wait_for_all();
232     int ii;
233     ASSERT(qnode1.try_get(ii) && ii == 1, "output not passed");
234     tbb::flow::remove_edge(qnode0, fnode0);
235     tbb::flow::remove_edge(fnode0, qnode1);
236
237     tbb::flow::make_edge(fnode1, qnode1);
238     tbb::flow::make_edge(qnode0, fnode1);
239
240     serial_fn_state0 = 2;  // just let it go
241     // see if the darned thing will work....
242     qnode0.try_put(1);
243     g.wait_for_all();
244     ASSERT(qnode1.try_get(ii) && ii == 1, "output not passed");
245     tbb::flow::remove_edge(qnode0, fnode1);
246     tbb::flow::remove_edge(fnode1, qnode1);
247
248     // rejecting
249     serial_fn_state0 = 0;
250     tbb::flow::make_edge(fnode0, qnode1);
251     tbb::flow::make_edge(qnode0, fnode0);
252     REMARK("Testing rejecting function_node:");
253     ASSERT(!fnode0.my_queue, "node should have no queue");
254     ASSERT(!fnode0.my_successors.empty(), "successor edge not added");
255     qnode0.try_put(1);
256     BACKOFF_WAIT(!serial_fn_state0,"rejecting function_node didn't start");
257     qnode0.try_put(2);   // rejecting node should reject, reverse.
258     BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Missing predecessor ---");
259     serial_fn_state0 = 2;   // release function_node body.
260     g.wait_for_all();
261     REMARK(" reset");
262     g.reset();  // should reverse the edge from the input to the function node.
263     ASSERT(!qnode0.my_successors.empty(), "empty successors after reset()");
264     ASSERT(fnode0.my_predecessors.empty(), "predecessor not reversed");
265     tbb::flow::remove_edge(qnode0, fnode0);
266     tbb::flow::remove_edge(fnode0, qnode1);
267     REMARK("\n");
268
269     // queueing
270     tbb::flow::make_edge(fnode1, qnode1);
271     REMARK("Testing queueing function_node:");
272     ASSERT(fnode1.my_queue, "node should have no queue");
273     ASSERT(!fnode1.my_successors.empty(), "successor edge not added");
274     REMARK(" add_pred");
275     ASSERT(fnode1.register_predecessor(qnode0), "Cannot register as predecessor");
276     ASSERT(!fnode1.my_predecessors.empty(), "Missing predecessor");
277     REMARK(" reset");
278     g.wait_for_all();
279     g.reset();  // should reverse the edge from the input to the function node.
280     ASSERT(!qnode0.my_successors.empty(), "empty successors after reset()");
281     ASSERT(fnode1.my_predecessors.empty(), "predecessor not reversed");
282     tbb::flow::remove_edge(qnode0, fnode1);
283     tbb::flow::remove_edge(fnode1, qnode1);
284     REMARK("\n");
285
286     serial_fn_state0 = 0;  // make the function_node wait
287     tbb::flow::make_edge(qnode0, fnode0);
288     REMARK(" start_func");
289     qnode0.try_put(1);
290     BACKOFF_WAIT(serial_fn_state0 == 0, "Timed out waiting after 1st put");
291     // now if we put an item to the queues the edges to the function_node will reverse.
292     REMARK(" put_node(2)");
293     qnode0.try_put(2);   // start queue node.
294     // wait for the edges to reverse
295     BACKOFF_WAIT(fnode0.my_predecessors.empty(), "Timed out waiting");
296     ASSERT(!fnode0.my_predecessors.empty(), "function_node edge not reversed");
297     g.my_root_task->cancel_group_execution();
298     // release the function_node
299     serial_fn_state0 = 2;
300     g.wait_for_all();
301     ASSERT(!fnode0.my_predecessors.empty() && qnode0.my_successors.empty(), "function_node edge not reversed");
302     g.reset(tbb::flow::rf_clear_edges);
303     ASSERT(fnode0.my_predecessors.empty() && qnode0.my_successors.empty(), "function_node edge not removed");
304     ASSERT(fnode0.my_successors.empty(), "successor to fnode not removed");
305     REMARK(" done\n");
306 }
307
308 template<typename TT>
309 class tag_func {
310     TT my_mult;
311 public:
312     tag_func(TT multiplier) : my_mult(multiplier) { }
313     // operator() will return [0 .. Count)
314     tbb::flow::tag_value operator()( TT v) {
315         tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
316         return t;
317     }
318 };
319
320 template<typename JNODE_TYPE>
321 void
322 TestSimpleSuccessorArc(const char *name) {
323     tbb::flow::graph g;
324     {
325         REMARK("Join<%s> successor test ", name);
326         tbb::flow::join_node<tbb::flow::tuple<int>, JNODE_TYPE> qj(g);
327         tbb::flow::broadcast_node<tbb::flow::tuple<int> > bnode(g);
328         tbb::flow::make_edge(qj, bnode);
329         ASSERT(!qj.my_successors.empty(),"successor missing after linking");
330         g.reset();
331         ASSERT(!qj.my_successors.empty(),"successor missing after reset()");
332         g.reset(tbb::flow::rf_clear_edges);
333         ASSERT(qj.my_successors.empty(), "successors not removed after reset(rf_clear_edges)");
334     }
335 }
336
337 template<>
338 void
339 TestSimpleSuccessorArc<tbb::flow::tag_matching>(const char *name) {
340     tbb::flow::graph g;
341     {
342         REMARK("Join<%s> successor test ", name);
343         typedef tbb::flow::tuple<int,int> my_tuple;
344         tbb::flow::join_node<my_tuple, tbb::flow::tag_matching> qj(g,
345                 tag_func<int>(1),
346                 tag_func<int>(1)
347                 );
348         tbb::flow::broadcast_node<my_tuple > bnode(g);
349         tbb::flow::make_edge(qj, bnode);
350         ASSERT(!qj.my_successors.empty(),"successor missing after linking");
351         g.reset();
352         ASSERT(!qj.my_successors.empty(),"successor missing after reset()");
353         g.reset(tbb::flow::rf_clear_edges);
354         ASSERT(qj.my_successors.empty(), "successors not removed after reset(rf_clear_edges)");
355     }
356 }
357
358 void
359 TestJoinNode() {
360     tbb::flow::graph g;
361
362     TestSimpleSuccessorArc<tbb::flow::queueing>("queueing");
363     TestSimpleSuccessorArc<tbb::flow::reserving>("reserving");
364     TestSimpleSuccessorArc<tbb::flow::tag_matching>("tag_matching");
365
366     // queueing and tagging join nodes have input queues, so the input ports do not reverse.
367     REMARK(" reserving preds");
368     {
369         tbb::flow::join_node<tbb::flow::tuple<int,int>, tbb::flow::reserving> rj(g);
370         tbb::flow::queue_node<int> q0(g);
371         tbb::flow::queue_node<int> q1(g);
372         tbb::flow::make_edge(q0,tbb::flow::input_port<0>(rj));
373         tbb::flow::make_edge(q1,tbb::flow::input_port<1>(rj));
374         q0.try_put(1);
375         g.wait_for_all();  // quiesce
376         ASSERT(!(tbb::flow::input_port<0>(rj).my_predecessors.empty()),"reversed port missing predecessor");
377         ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"non-reversed port has pred");
378         g.reset();
379         ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"reversed port has pred after reset()");
380         ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()");
381         q1.try_put(2);
382         g.wait_for_all();  // quiesce
383         ASSERT(!(tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port missing predecessor");
384         ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred");
385         g.reset();
386         ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port has pred after reset()");
387         ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()");
388         // should reset predecessors just as regular reset.
389         q1.try_put(3);
390         g.wait_for_all();  // quiesce
391         ASSERT(!(tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port missing predecessor");
392         ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred");
393         g.reset(tbb::flow::rf_clear_edges);
394         ASSERT((tbb::flow::input_port<1>(rj).my_predecessors.empty()),"reversed port has pred after reset()");
395         ASSERT((tbb::flow::input_port<0>(rj).my_predecessors.empty()),"non-reversed port has pred after reset()");
396         ASSERT(q0.my_successors.empty(), "edge not removed by reset(rf_clear_edges)");
397         ASSERT(q1.my_successors.empty(), "edge not removed by reset(rf_clear_edges)");
398     }
399     REMARK(" done\n");
400 }
401
402 void
403 TestLimiterNode() {
404     int out_int;
405     tbb::flow::graph g;
406     tbb::flow::limiter_node<int> ln(g,1);
407     REMARK("Testing limiter_node: preds and succs");
408     ASSERT(ln.decrement.my_predecessor_count == 0, "error in pred count");
409     ASSERT(ln.decrement.my_initial_predecessor_count == 0, "error in initial pred count");
410     ASSERT(ln.decrement.my_current_count == 0, "error in current count");
411 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
412     ASSERT(ln.init_decrement_predecessors == 0, "error in decrement predecessors");
413 #endif
414     ASSERT(ln.my_threshold == 1, "error in my_threshold");
415     tbb::flow::queue_node<int> inq(g);
416     tbb::flow::queue_node<int> outq(g);
417     tbb::flow::broadcast_node<tbb::flow::continue_msg> bn(g);
418
419     tbb::flow::make_edge(inq,ln);
420     tbb::flow::make_edge(ln,outq);
421     tbb::flow::make_edge(bn,ln.decrement);
422
423     g.wait_for_all();
424     ASSERT(!(ln.my_successors.empty()),"successors empty after make_edge");
425     ASSERT(ln.my_predecessors.empty(), "input edge reversed");
426     inq.try_put(1);
427     g.wait_for_all();
428     ASSERT(outq.try_get(out_int) && out_int == 1, "limiter_node didn't pass first value");
429     ASSERT(ln.my_predecessors.empty(), "input edge reversed");
430     inq.try_put(2);
431     g.wait_for_all();
432     ASSERT(!outq.try_get(out_int), "limiter_node incorrectly passed second input");
433     ASSERT(!ln.my_predecessors.empty(), "input edge to limiter_node not reversed");
434     bn.try_put(tbb::flow::continue_msg());
435     g.wait_for_all();
436     ASSERT(outq.try_get(out_int) && out_int == 2, "limiter_node didn't pass second value");
437     g.wait_for_all();
438     ASSERT(!ln.my_predecessors.empty(), "input edge was reversed(after try_get())");
439     g.reset();
440     ASSERT(ln.my_predecessors.empty(), "input edge not reset");
441     inq.try_put(3);
442     g.wait_for_all();
443     ASSERT(outq.try_get(out_int) && out_int == 3, "limiter_node didn't pass third value");
444
445     REMARK(" rf_clear_edges");
446     // currently the limiter_node will not pass another message
447     g.reset(tbb::flow::rf_clear_edges);
448     ASSERT(ln.decrement.my_predecessor_count == 0, "error in pred count");
449     ASSERT(ln.decrement.my_initial_predecessor_count == 0, "error in initial pred count");
450     ASSERT(ln.decrement.my_current_count == 0, "error in current count");
451 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
452     ASSERT(ln.init_decrement_predecessors == 0, "error in decrement predecessors");
453 #endif
454     ASSERT(ln.my_threshold == 1, "error in my_threshold");
455     ASSERT(ln.my_predecessors.empty(), "preds not reset(rf_clear_edges)");
456     ASSERT(ln.my_successors.empty(), "preds not reset(rf_clear_edges)");
457     ASSERT(inq.my_successors.empty(), "Arc not removed on reset(rf_clear_edges)");
458     ASSERT(inq.my_successors.empty(), "Arc not removed on reset(rf_clear_edges)");
459     ASSERT(bn.my_successors.empty(), "control edge not removed on reset(rf_clear_edges)");
460     tbb::flow::make_edge(inq,ln);
461     tbb::flow::make_edge(ln,outq);
462     inq.try_put(4);
463     inq.try_put(5);
464     g.wait_for_all();
465     ASSERT(outq.try_get(out_int),"missing output after reset(rf_clear_edges)");
466     ASSERT(out_int == 4, "input incorrect (4)");
467     bn.try_put(tbb::flow::continue_msg());
468     g.wait_for_all();
469     ASSERT(!outq.try_get(out_int),"second output incorrectly passed (rf_clear_edges)");
470     REMARK(" done\n");
471 }
472
473 template<typename MF_TYPE>
474 struct mf_body {
475     tbb::atomic<int> *_flag;
476     mf_body( tbb::atomic<int> &myatomic) : _flag(&myatomic) { }
477     void operator()( const int& in, typename MF_TYPE::output_ports_type &outports) {
478         if(*_flag == 0) {
479             *_flag = 1;
480             BACKOFF_WAIT(*_flag == 1, "multifunction_node not released");
481         }
482
483         if(in & 0x1) tbb::flow::get<1>(outports).try_put(in);
484         else         tbb::flow::get<0>(outports).try_put(in);
485     }
486 };
487
488 template<typename P, typename T>
489 struct test_reversal;
490 template<typename T>
491 struct test_reversal<tbb::flow::queueing, T> {
492     test_reversal() { REMARK("<queueing>"); }
493     // queueing node will not reverse.
494     bool operator()( T &node) { return node.my_predecessors.empty(); }
495 };
496
497 template<typename T>
498 struct test_reversal<tbb::flow::rejecting, T> {
499     test_reversal() { REMARK("<rejecting>"); }
500     bool operator()( T &node) { return !node.my_predecessors.empty(); }
501 };
502
503 template<typename P>
504 void
505 TestMultifunctionNode() {
506     typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int, int>, P> multinode_type;
507     REMARK("Testing multifunction_node");
508     test_reversal<P,multinode_type> my_test;
509     REMARK(":");
510     tbb::flow::graph g;
511     multinode_type mf(g, tbb::flow::serial, mf_body<multinode_type>(serial_fn_state0));
512     tbb::flow::queue_node<int> qin(g);
513     tbb::flow::queue_node<int> qodd_out(g);
514     tbb::flow::queue_node<int> qeven_out(g);
515     tbb::flow::make_edge(qin,mf);
516     tbb::flow::make_edge(tbb::flow::output_port<0>(mf), qeven_out);
517     tbb::flow::make_edge(tbb::flow::output_port<1>(mf), qodd_out);
518     g.wait_for_all();
519     for( int ii = 0; ii < 2 ; ++ii) {
520         serial_fn_state0 = 0;
521         if(ii == 0) REMARK(" reset preds"); else REMARK(" 2nd");
522         qin.try_put(0);
523         // wait for node to be active
524         BACKOFF_WAIT(serial_fn_state0 == 0, "timed out waiting for first put");
525         qin.try_put(1);
526         BACKOFF_WAIT((!my_test(mf)), "Timed out waiting");
527         ASSERT(my_test(mf), "fail second put test");
528         g.my_root_task->cancel_group_execution();
529         // release node
530         serial_fn_state0 = 2;
531         g.wait_for_all();
532         ASSERT(my_test(mf), "fail cancel group test");
533         if( ii == 1) {
534             REMARK(" rf_clear_edges");
535             g.reset(tbb::flow::rf_clear_edges);
536             ASSERT(tbb::flow::output_port<0>(mf).my_successors.empty(), "output_port<0> not reset (rf_clear_edges)");
537             ASSERT(tbb::flow::output_port<1>(mf).my_successors.empty(), "output_port<1> not reset (rf_clear_edges)");
538         }
539         else
540         {
541             g.reset();
542         }
543         ASSERT(mf.my_predecessors.empty(), "edge didn't reset");
544         ASSERT((ii == 0 && !qin.my_successors.empty()) || (ii == 1 && qin.my_successors.empty()), "edge didn't reset");
545     }
546     REMARK(" done\n");
547 }
548
549 // indexer_node is like a broadcast_node, in that none of its inputs reverse, and it
550 // never allows a successor to reverse its edge, so we only need test the successors.
551 void
552 TestIndexerNode() {
553     tbb::flow::graph g;
554     typedef tbb::flow::indexer_node< int, int > indexernode_type;
555     indexernode_type inode(g);
556     REMARK("Testing indexer_node:");
557     tbb::flow::queue_node<indexernode_type::output_type> qout(g);
558     tbb::flow::make_edge(inode,qout);
559     g.wait_for_all();
560     ASSERT(!inode.my_successors.empty(), "successor of indexer_node missing");
561     g.reset();
562     ASSERT(!inode.my_successors.empty(), "successor of indexer_node missing after reset");
563     g.reset(tbb::flow::rf_clear_edges);
564     ASSERT(inode.my_successors.empty(), "successor of indexer_node not removed by reset(rf_clear_edges)");
565     REMARK(" done\n");
566 }
567
568 template<typename Node>
569 void
570 TestScalarNode(const char *name) {
571     tbb::flow::graph g;
572     Node on(g);
573     tbb::flow::queue_node<int> qout(g);
574     REMARK("Testing %s:", name);
575     tbb::flow::make_edge(on,qout);
576     g.wait_for_all();
577     ASSERT(!on.my_successors.empty(), "edge not added");
578     g.reset();
579     ASSERT(!on.my_successors.empty(), "edge improperly removed");
580     g.reset(tbb::flow::rf_clear_edges);
581     ASSERT(on.my_successors.empty(), "edge not removed by reset(rf_clear_edges)");
582     REMARK(" done\n");
583 }
584
585 struct seq_body {
586     size_t operator()(const int &in) {
587         return size_t(in / 3);
588     }
589 };
590
591 // sequencer_node behaves like a queueing node, but requires a different constructor.
592 void
593 TestSequencerNode() {
594     tbb::flow::graph g;
595     tbb::flow::sequencer_node<int> bnode(g, seq_body());
596     REMARK("Testing sequencer_node:");
597     tbb::flow::function_node<int> fnode(g, tbb::flow::serial, serial_fn_body<int>(serial_fn_state0));
598     REMARK("Testing sequencer_node:");
599     serial_fn_state0 = 0;  // reset to waiting state.
600     REMARK(" make_edge");
601     tbb::flow::make_edge(bnode, fnode);
602     ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after make_edge");
603     REMARK(" try_put");
604     bnode.try_put(0);  // will forward to the fnode
605     BACKOFF_WAIT( serial_fn_state0 == 0, "timeout waiting for function_node");  // wait for the function_node to fire up
606     ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after forwarding message");
607     serial_fn_state0 = 0;
608     g.wait_for_all();
609     REMARK(" remove_edge");
610     tbb::flow::remove_edge(bnode, fnode);
611     ASSERT(bnode.my_successors.empty(), "buffering node has a successor after remove_edge");
612     tbb::flow::join_node<tbb::flow::tuple<int,int>,tbb::flow::reserving> jnode(g);
613     tbb::flow::make_edge(bnode, tbb::flow::input_port<0>(jnode));  // will spawn a task
614     g.wait_for_all();
615     ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after attaching to join");
616     REMARK(" reverse");
617     bnode.try_put(3);  // the edge should reverse
618     g.wait_for_all();
619     ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reserving");
620     REMARK(" reset()");
621     g.wait_for_all();
622     g.reset();  // should be in forward direction again
623     ASSERT(!bnode.my_successors.empty(), "buffering node has no successor after reset()");
624     REMARK(" remove_edge");
625     g.reset(tbb::flow::rf_clear_edges);  // should be in forward direction again
626     ASSERT(bnode.my_successors.empty(), "buffering node has a successor after reset(rf_clear_edges)");
627     ASSERT(fnode.my_predecessors.empty(), "buffering node reversed after reset(rf_clear_edges)");
628     REMARK("  done\n");
629     g.wait_for_all();
630 }
631
632 struct snode_body {
633     int max_cnt;
634     int my_cnt;
635     snode_body( const int &in) : max_cnt(in) { my_cnt = 0; }
636     bool operator()(int &out) {
637         if(max_cnt <= my_cnt++) return false;
638         out = my_cnt;
639         return true;
640     }
641 };
642
643 void
644 TestSourceNode() {
645     tbb::flow::graph g;
646     tbb::flow::source_node<int> sn(g, snode_body(4), false);
647     REMARK("Testing source_node:");
648     tbb::flow::queue_node<int> qin(g);
649     tbb::flow::join_node<tbb::flow::tuple<int,int>, tbb::flow::reserving> jn(g);
650     tbb::flow::queue_node<tbb::flow::tuple<int,int> > qout(g);
651
652     REMARK(" make_edges");
653     tbb::flow::make_edge(sn, tbb::flow::input_port<0>(jn));
654     tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
655     tbb::flow::make_edge(jn,qout);
656     ASSERT(!sn.my_successors.empty(), "source node has no successor after make_edge");
657     g.wait_for_all();
658     g.reset();
659     ASSERT(!sn.my_successors.empty(), "source node has no successor after reset");
660     g.wait_for_all();
661     g.reset(tbb::flow::rf_clear_edges);
662     ASSERT(sn.my_successors.empty(), "source node has successor after reset(rf_clear_edges)");
663     tbb::flow::make_edge(sn, tbb::flow::input_port<0>(jn));
664     tbb::flow::make_edge(qin, tbb::flow::input_port<1>(jn));
665     tbb::flow::make_edge(jn,qout);
666     g.wait_for_all();
667     REMARK(" activate");
668     sn.activate();  // will forward to the fnode
669     REMARK(" wait1");
670     BACKOFF_WAIT( !sn.my_successors.empty(), "Timed out waiting for edge to reverse");
671     ASSERT(sn.my_successors.empty(), "source node has no successor after forwarding message");
672
673     g.wait_for_all();
674     g.reset();
675     ASSERT(!sn.my_successors.empty(), "source_node has no successors after reset");
676     ASSERT(tbb::flow::input_port<0>(jn).my_predecessors.empty(), "successor if source_node has pred after reset.");
677     REMARK(" done\n");
678 }
679
680 int TestMain() {
681
682     if(MinThread < 3) MinThread = 3;
683     tbb::task_scheduler_init init(MinThread);  // tests presume at least three threads
684
685     TestBufferingNode< tbb::flow::buffer_node<int> >("buffer_node");
686     TestBufferingNode< tbb::flow::priority_queue_node<int> >("priority_queue_node");
687     TestBufferingNode< tbb::flow::queue_node<int> >("queue_node");
688     TestSequencerNode();
689
690     TestMultifunctionNode<tbb::flow::rejecting>();
691     TestMultifunctionNode<tbb::flow::queueing>();
692     TestSourceNode();
693     TestContinueNode();
694     TestFunctionNode();
695
696     TestJoinNode();
697
698     TestLimiterNode();
699     TestIndexerNode();
700     TestSplitNode();
701     TestScalarNode<tbb::flow::broadcast_node<int> >("broadcast_node");
702     TestScalarNode<tbb::flow::overwrite_node<int> >("overwrite_node");
703     TestScalarNode<tbb::flow::write_once_node<int> >("write_once_node");
704
705     return Harness::Done;
706 }
707