70f087305883838d5990b734c60693757281e2c5
[platform/upstream/tbb.git] / src / test / test_eh_flow_graph.cpp
1 /*
2     Copyright (c) 2005-2019 Intel Corporation
3
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7
8         http://www.apache.org/licenses/LICENSE-2.0
9
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16
17 #define HARNESS_DEFAULT_MIN_THREADS 2
18 #define HARNESS_DEFAULT_MAX_THREADS 4
19 #include "harness_defs.h"
20
21 #if _MSC_VER
22     #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
23 #endif
24
25 #if __TBB_MSVC_UNREACHABLE_CODE_IGNORED
26     // Suppress "unreachable code" warning by VC++ 17.0-18.0 (VS 2012 or newer)
27     #pragma warning (disable: 4702)
28 #endif
29
30 #include "harness.h"
31
32 // global task_scheduler_observer is an imperfect tool to find how many threads are really
33 // participating.  That was the hope, but it counts the entries into the marketplace,
34 // not the arena.
35 // #define USE_TASK_SCHEDULER_OBSERVER 1
36
37 #if _MSC_VER && defined(__INTEL_COMPILER) && !TBB_USE_DEBUG
38     #define TBB_RUN_BUFFERING_TEST __INTEL_COMPILER > 1210
39 #else
40     #define TBB_RUN_BUFFERING_TEST 1
41 #endif
42
43 #if TBB_USE_EXCEPTIONS
44 #if USE_TASK_SCHEDULER_OBSERVER
45 #include "tbb/task_scheduler_observer.h"
46 #endif
47 #include "tbb/flow_graph.h"
48 #include "tbb/task_scheduler_init.h"
49 #include <iostream>
50 #include <vector>
51 #include "harness_assert.h"
52 #include "harness_checktype.h"
53
54 inline intptr_t Existed() { return INT_MAX; }  // resolve Existed in harness_eh.h
55
56 #include "harness_eh.h"
57 #include <stdexcept>
58
59 #define NUM_ITEMS 15
60 int g_NumItems;
61
62 tbb::atomic<unsigned> nExceptions;
63 tbb::atomic<intptr_t> g_TGCCancelled;
64
65 enum TestNodeTypeEnum { nonThrowing, isThrowing };
66
67 static const size_t unlimited_type = 0;
68 static const size_t serial_type = 1;
69 static const size_t limited_type = 4;
70
71 template<TestNodeTypeEnum T> struct TestNodeTypeName;
72 template<> struct TestNodeTypeName<nonThrowing> { static const char *name() { return "nonThrowing"; } };
73 template<> struct TestNodeTypeName<isThrowing> { static const char *name() { return "isThrowing"; } };
74
75 template<size_t Conc> struct concurrencyName;
76 template<> struct concurrencyName<serial_type>{ static const char *name() { return "serial"; } };
77 template<> struct concurrencyName<unlimited_type>{ static const char *name() { return "unlimited"; } };
78 template<> struct concurrencyName<limited_type>{ static const char *name() { return "limited"; } };
79
80 // Class that provides waiting and throwing behavior.  If we are not throwing, do nothing
81 // If serial, we can't wait for concurrency to peak; we may be the bottleneck and will
82 // stop further processing.  We will execute g_NumThreads + 10 times (the "10" is somewhat
83 // arbitrary, and just makes sure there are enough items in the graph to keep it flowing),
84 // If parallel or serial and throwing, use Harness::ConcurrencyTracker to wait.
85
86 template<size_t Conc, TestNodeTypeEnum t = nonThrowing>
87 class WaitThrow;
88
89 template<>
90 class WaitThrow<serial_type,nonThrowing> {
91 protected:
92     void WaitAndThrow(int cnt, const char * /*name*/) {
93         if(cnt > g_NumThreads + 10) {
94             Harness::ConcurrencyTracker ct;
95             WaitUntilConcurrencyPeaks();
96         }
97     }
98 };
99
100 template<>
101 class WaitThrow<serial_type,isThrowing> {
102 protected:
103     void WaitAndThrow(int cnt, const char * /*name*/) {
104         if(cnt > g_NumThreads + 10) {
105             Harness::ConcurrencyTracker ct;
106             WaitUntilConcurrencyPeaks();
107             ThrowTestException(1);
108         }
109     }
110 };
111
112 // for nodes with limited concurrency, if that concurrency is < g_NumThreads, we need
113 // to make sure enough other nodes wait for concurrency to peak.  If we are attached to
114 // N successors, for each item we pass to a successor, we will get N executions of the
115 // "absorbers" (because we broadcast to successors.)  for an odd number of threads we
116 // need (g_NumThreads - limited + 1) / 2 items (that will give us one extra execution
117 // of an "absorber", but we can't change that without changing the behavior of the node.)
118 template<>
119 class WaitThrow<limited_type,nonThrowing> {
120 protected:
121     void WaitAndThrow(int cnt, const char * /*name*/) {
122         if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
123             return;
124         }
125         Harness::ConcurrencyTracker ct;
126         WaitUntilConcurrencyPeaks();
127     }
128 };
129
130 template<>
131 class WaitThrow<limited_type,isThrowing> {
132 protected:
133     void WaitAndThrow(int cnt, const char * /*name*/) {
134         Harness::ConcurrencyTracker ct;
135         if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
136             return;
137         }
138         WaitUntilConcurrencyPeaks();
139         ThrowTestException(1);
140     }
141 };
142
143 template<>
144 class WaitThrow<unlimited_type,nonThrowing> {
145 protected:
146     void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
147         Harness::ConcurrencyTracker ct;
148         WaitUntilConcurrencyPeaks();
149     }
150 };
151
152 template<>
153 class WaitThrow<unlimited_type,isThrowing> {
154 protected:
155     void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
156         Harness::ConcurrencyTracker ct;
157         WaitUntilConcurrencyPeaks();
158         ThrowTestException(1);
159     }
160 };
161
162 void
163 ResetGlobals(bool throwException = true, bool flog = false) {
164     nExceptions = 0;
165     g_TGCCancelled = 0;
166     ResetEhGlobals(throwException, flog);
167 }
168
169 // -------source_node body ------------------
170 template <class OutputType, TestNodeTypeEnum TType>
171 class test_source_body : WaitThrow<serial_type, TType> {
172     using WaitThrow<serial_type, TType>::WaitAndThrow;
173     tbb::atomic<int> *my_current_val;
174     int my_mult;
175 public:
176     test_source_body(tbb::atomic<int> &my_cnt, int multiplier = 1) : my_current_val(&my_cnt), my_mult(multiplier) {
177         REMARK("- --------- - - -   constructed %lx\n", (size_t)(my_current_val));
178     }
179
180     bool operator()(OutputType & out) {
181         UPDATE_COUNTS();
182         out = OutputType(my_mult * ++(*my_current_val));
183         REMARK("xx(%lx) out == %d\n", (size_t)(my_current_val), (int)out);
184         if(*my_current_val > g_NumItems) {
185             REMARK(" ------ End of the line!\n");
186             *my_current_val = g_NumItems;
187             return false;
188         }
189         WaitAndThrow((int)out,"test_source_body");
190         return true;
191     }
192
193     int count_value() { return (int)*my_current_val; }
194 };
195
196 template <TestNodeTypeEnum TType>
197 class test_source_body<tbb::flow::continue_msg, TType> : WaitThrow<serial_type, TType> {
198     using WaitThrow<serial_type, TType>::WaitAndThrow;
199     tbb::atomic<int> *my_current_val;
200 public:
201     test_source_body(tbb::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
202
203     bool operator()(tbb::flow::continue_msg & out) {
204         UPDATE_COUNTS();
205         int outint = ++(*my_current_val);
206         out = tbb::flow::continue_msg();
207         if(*my_current_val > g_NumItems) {
208             *my_current_val = g_NumItems;
209             return false;
210         }
211         WaitAndThrow(outint,"test_source_body");
212         return true;
213     }
214
215     int count_value() { return (int)*my_current_val; }
216 };
217
218 // -------{function/continue}_node body ------------------
219 template<class InputType, class OutputType, TestNodeTypeEnum T, size_t Conc>
220 class absorber_body : WaitThrow<Conc,T> {
221     using WaitThrow<Conc,T>::WaitAndThrow;
222     tbb::atomic<int> *my_count;
223 public:
224     absorber_body(tbb::atomic<int> &my_cnt) : my_count(&my_cnt) { }
225     OutputType operator()(const InputType &/*p_in*/) {
226         UPDATE_COUNTS();
227         int out = ++(*my_count);
228         WaitAndThrow(out,"absorber_body");
229         return OutputType();
230     }
231     int count_value() { return *my_count; }
232 };
233
234 // -------multifunction_node body ------------------
235
236 // helper classes
237 template<int N,class PortsType>
238 struct IssueOutput {
239     typedef typename tbb::flow::tuple_element<N-1,PortsType>::type::output_type my_type;
240
241     static void issue_tuple_element( PortsType &my_ports) {
242         ASSERT(tbb::flow::get<N-1>(my_ports).try_put(my_type()), "Error putting to successor");
243         IssueOutput<N-1,PortsType>::issue_tuple_element(my_ports);
244     }
245 };
246
247 template<class PortsType>
248 struct IssueOutput<1,PortsType> {
249     typedef typename tbb::flow::tuple_element<0,PortsType>::type::output_type my_type;
250
251     static void issue_tuple_element( PortsType &my_ports) {
252         ASSERT(tbb::flow::get<0>(my_ports).try_put(my_type()), "Error putting to successor");
253     }
254 };
255
256 template<class InputType, class OutputTupleType, TestNodeTypeEnum T, size_t Conc>
257 class multifunction_node_body : WaitThrow<Conc,T> {
258     using WaitThrow<Conc,T>::WaitAndThrow;
259     static const int N = tbb::flow::tuple_size<OutputTupleType>::value;
260     typedef typename tbb::flow::multifunction_node<InputType,OutputTupleType> NodeType;
261     typedef typename NodeType::output_ports_type PortsType;
262     tbb::atomic<int> *my_count;
263 public:
264     multifunction_node_body(tbb::atomic<int> &my_cnt) : my_count(&my_cnt) { }
265     void operator()(const InputType& /*in*/, PortsType &my_ports) {
266         UPDATE_COUNTS();
267         int out = ++(*my_count);
268         WaitAndThrow(out,"multifunction_node_body");
269         // issue an item to each output port.
270         IssueOutput<N,PortsType>::issue_tuple_element(my_ports);
271     }
272
273     int count_value() { return *my_count; }
274 };
275
276 // --------- body to sort items in sequencer_node
277 template<class BufferItemType>
278 struct sequencer_body {
279     size_t operator()(const BufferItemType &s) {
280         ASSERT(s, "sequencer item out of range (== 0)");
281         return size_t(s) - 1;
282     }
283 };
284
285 // --------- body to compare the "priorities" of objects for priority_queue_node  five priority levels 0-4.
286 template<class T>
287 struct myLess {
288     bool operator()(const T &t1, const T &t2) {
289         return (int(t1) % 5) < (int(t2) % 5);
290     }
291 };
292
293 // --------- type for < comparison in priority_queue_node.
294 template<class ItemType>
295 struct less_body {
296     bool operator()(const ItemType &lhs, const ItemType &rhs) {
297         return (int(lhs) % 3) < (int(rhs) % 3);
298     }
299 };
300
301 // --------- tag methods for tag_matching join_node
302 template<typename TT>
303 class tag_func {
304     TT my_mult;
305 public:
306     tag_func(TT multiplier) : my_mult(multiplier) { }
307     void operator=( const tag_func& other){my_mult = other.my_mult;}
308     // operator() will return [0 .. Count)
309     tbb::flow::tag_value operator()( TT v) {
310         tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
311         return t;
312     }
313 };
314
315 // --------- Source body for split_node test.
316 template <class OutputTuple, TestNodeTypeEnum TType>
317 class tuple_test_source_body : WaitThrow<serial_type, TType> {
318     typedef typename tbb::flow::tuple_element<0,OutputTuple>::type ItemType0;
319     typedef typename tbb::flow::tuple_element<1,OutputTuple>::type ItemType1;
320     using WaitThrow<serial_type, TType>::WaitAndThrow;
321     tbb::atomic<int> *my_current_val;
322 public:
323     tuple_test_source_body(tbb::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
324
325     bool operator()(OutputTuple & out) {
326         UPDATE_COUNTS();
327         int ival = ++(*my_current_val);
328         out = OutputTuple(ItemType0(ival),ItemType1(ival));
329         if(*my_current_val > g_NumItems) {
330             *my_current_val = g_NumItems;  // jam the final value; we assert on it later.
331             return false;
332         }
333         WaitAndThrow(ival,"tuple_test_source_body");
334         return true;
335     }
336
337     int count_value() { return (int)*my_current_val; }
338 };
339
340 // ------- end of node bodies
341
342 // source_node is only-serial.  source_node can throw, or the function_node can throw.
343 // graph being tested is
344 //
345 //      source_node+---+parallel function_node
346 //
347 //    After each run the graph is reset(), to test the reset functionality.
348 //
349
350
351 template<class ItemType, TestNodeTypeEnum srcThrowType, TestNodeTypeEnum absorbThrowType>
352 void run_one_source_node_test(bool throwException, bool flog) {
353     typedef test_source_body<ItemType,srcThrowType> src_body_type;
354     typedef absorber_body<ItemType, tbb::flow::continue_msg, absorbThrowType, unlimited_type> parallel_absorb_body_type;
355     tbb::atomic<int> source_body_count;
356     tbb::atomic<int> absorber_body_count;
357     source_body_count = 0;
358     absorber_body_count = 0;
359
360     tbb::flow::graph g;
361
362     g_Master = Harness::CurrentTid();
363
364 #if USE_TASK_SCHEDULER_OBSERVER
365     eh_test_observer o;
366     o.observe(true);
367 #endif
368
369     tbb::flow::source_node<ItemType> sn(g, src_body_type(source_body_count),/*is_active*/false);
370     parallel_absorb_body_type ab2(absorber_body_count);
371     tbb::flow::function_node<ItemType> parallel_fn(g,tbb::flow::unlimited,ab2);
372     make_edge(sn, parallel_fn);
373     for(int runcnt = 0; runcnt < 2; ++runcnt) {
374         ResetGlobals(throwException,flog);
375         if(throwException) {
376             TRY();
377                 sn.activate();
378                 g.wait_for_all();
379             CATCH_AND_ASSERT();
380         }
381         else {
382             TRY();
383                 sn.activate();
384                 g.wait_for_all();
385             CATCH_AND_FAIL();
386         }
387
388         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
389         int src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
390         int sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
391         if(throwException) {
392             ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception flag in flow::graph not set");
393             ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "canceled flag not set");
394             ASSERT(src_cnt <= g_NumItems, "Too many source_node items emitted");
395             ASSERT(sink_cnt <= src_cnt, "Too many source_node items received");
396         }
397         else {
398             ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
399             ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
400             ASSERT(src_cnt == g_NumItems, "Incorrect # source_node items emitted");
401             ASSERT(sink_cnt == src_cnt, "Incorrect # source_node items received");
402         }
403         g.reset();  // resets the body of the source_node and the absorb_nodes.
404         source_body_count = 0;
405         absorber_body_count = 0;
406         ASSERT(!g.exception_thrown(), "Reset didn't clear exception_thrown()");
407         ASSERT(!g.is_cancelled(), "Reset didn't clear is_cancelled()");
408         src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
409         sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
410         ASSERT(src_cnt == 0, "source_node count not reset");
411         ASSERT(sink_cnt == 0, "sink_node count not reset");
412     }
413 #if USE_TASK_SCHEDULER_OBSERVER
414     o.observe(false);
415 #endif
416 }  // run_one_source_node_test
417
418
419 template<class ItemType, TestNodeTypeEnum srcThrowType, TestNodeTypeEnum absorbThrowType>
420 void run_source_node_test() {
421     run_one_source_node_test<ItemType,srcThrowType,absorbThrowType>(false,false);
422     run_one_source_node_test<ItemType,srcThrowType,absorbThrowType>(true,false);
423     run_one_source_node_test<ItemType,srcThrowType,absorbThrowType>(true,true);
424 }  // run_source_node_test
425
426 void test_source_node() {
427     REMARK("Testing source_node\n");
428     check_type<int>::check_type_counter = 0;
429     g_Wakeup_Msg = "source_node(1): Missed wakeup or machine is overloaded?";
430     run_source_node_test<check_type<int>, isThrowing, nonThrowing>();
431     ASSERT(!check_type<int>::check_type_counter, "Some items leaked in test");
432     g_Wakeup_Msg = "source_node(2): Missed wakeup or machine is overloaded?";
433     run_source_node_test<int, isThrowing, nonThrowing>();
434     g_Wakeup_Msg = "source_node(3): Missed wakeup or machine is overloaded?";
435     run_source_node_test<int, nonThrowing, isThrowing>();
436     g_Wakeup_Msg = "source_node(4): Missed wakeup or machine is overloaded?";
437     run_source_node_test<int, isThrowing, isThrowing>();
438     g_Wakeup_Msg = "source_node(5): Missed wakeup or machine is overloaded?";
439     run_source_node_test<check_type<int>, isThrowing, isThrowing>();
440     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
441     ASSERT(!check_type<int>::check_type_counter, "Some items leaked in test");
442 }
443
444 // -------- utilities & types to test function_node and multifunction_node.
445
446 // need to tell the template which node type I am using so it attaches successors correctly.
447 enum NodeFetchType { func_node_type, multifunc_node_type };
448
449 template<class NodeType, class ItemType, int indx, NodeFetchType NFT>
450 struct AttachPoint;
451
452 template<class NodeType, class ItemType, int indx>
453 struct AttachPoint<NodeType,ItemType,indx,multifunc_node_type> {
454     static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
455         return tbb::flow::output_port<indx>(n);
456     }
457 };
458
459 template<class NodeType, class ItemType, int indx>
460 struct AttachPoint<NodeType,ItemType,indx,func_node_type> {
461     static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
462         return n;
463     }
464 };
465
466
467 // common template for running function_node, multifunction_node.  continue_node
468 // has different firing requirements, so it needs a different graph topology.
469 template<
470     class SourceNodeType,
471     class SourceNodeBodyType0,
472     class SourceNodeBodyType1,
473     NodeFetchType NFT,
474     class TestNodeType,
475     class TestNodeBodyType,
476     class TypeToSink0,          // what kind of item are we sending to sink0
477     class TypeToSink1,          // what kind of item are we sending to sink1
478     class SinkNodeType0,        // will be same for function;
479     class SinkNodeType1,        // may differ for multifunction_node
480     class SinkNodeBodyType0,
481     class SinkNodeBodyType1,
482     size_t Conc
483     >
484 void
485 run_one_functype_node_test(bool throwException, bool flog, const char * /*name*/) {
486
487     char mymsg[132];
488     char *saved_msg = const_cast<char *>(g_Wakeup_Msg);
489     tbb::flow::graph g;
490
491     tbb::atomic<int> source0_count;
492     tbb::atomic<int> source1_count;
493     tbb::atomic<int> sink0_count;
494     tbb::atomic<int> sink1_count;
495     tbb::atomic<int> test_count;
496     source0_count = source1_count = sink0_count = sink1_count = test_count = 0;
497
498 #if USE_TASK_SCHEDULER_OBSERVER
499     eh_test_observer o;
500     o.observe(true);
501 #endif
502
503     g_Master = Harness::CurrentTid();
504     SourceNodeType source0(g, SourceNodeBodyType0(source0_count),/*is_active*/false);
505     SourceNodeType source1(g, SourceNodeBodyType1(source1_count),/*is_active*/false);
506     TestNodeType node_to_test(g, Conc, TestNodeBodyType(test_count));
507     SinkNodeType0 sink0(g,tbb::flow::unlimited,SinkNodeBodyType0(sink0_count));
508     SinkNodeType1 sink1(g,tbb::flow::unlimited,SinkNodeBodyType1(sink1_count));
509     make_edge(source0, node_to_test);
510     make_edge(source1, node_to_test);
511     make_edge(AttachPoint<TestNodeType, TypeToSink0, 0, NFT>::GetSender(node_to_test), sink0);
512     make_edge(AttachPoint<TestNodeType, TypeToSink1, 1, NFT>::GetSender(node_to_test), sink1);
513
514     for(int iter = 0; iter < 2; ++iter) {  // run, reset, run again
515         sprintf(mymsg, "%s iter=%d, threads=%d, throw=%s, flog=%s", saved_msg, iter, g_NumThreads,
516                throwException?"T":"F", flog?"T":"F");
517         g_Wakeup_Msg = mymsg;
518         ResetGlobals(throwException,flog);
519         if(throwException) {
520             TRY();
521                 source0.activate();
522                 source1.activate();
523                 g.wait_for_all();
524             CATCH_AND_ASSERT();
525         }
526         else {
527             TRY();
528                 source0.activate();
529                 source1.activate();
530                 g.wait_for_all();
531             CATCH_AND_FAIL();
532         }
533         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
534         int sb0_cnt = tbb::flow::copy_body<SourceNodeBodyType0>(source0).count_value();
535         int sb1_cnt = tbb::flow::copy_body<SourceNodeBodyType1>(source1).count_value();
536         int t_cnt   = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
537         int nb0_cnt = tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value();
538         int nb1_cnt = tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value();
539         if(throwException) {
540             ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
541             ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
542             ASSERT(sb0_cnt + sb1_cnt <= 2*g_NumItems, "Too many items sent by sources");
543             ASSERT(sb0_cnt + sb1_cnt >= t_cnt, "Too many items received by test node");
544             ASSERT(nb0_cnt + nb1_cnt <= t_cnt*2, "Too many items received by sink nodes");
545         }
546         else {
547             ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
548             ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
549             ASSERT(sb0_cnt + sb1_cnt == 2*g_NumItems, "Missing invocations of source_nodes");
550             ASSERT(t_cnt == 2*g_NumItems, "Not all items reached test node");
551             ASSERT(nb0_cnt == 2*g_NumItems && nb1_cnt == 2*g_NumItems, "Missing items in absorbers");
552         }
553         g.reset();  // resets the body of the source_nodes, test_node and the absorb_nodes.
554         source0_count = source1_count = sink0_count = sink1_count = test_count = 0;
555         ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType0>(source0).count_value(),"Reset source 0 failed");
556         ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType1>(source1).count_value(),"Reset source 1 failed");
557         ASSERT(0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value(),"Reset test_node failed");
558         ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value(),"Reset sink 0 failed");
559         ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value(),"Reset sink 1 failed");
560
561         g_Wakeup_Msg = saved_msg;
562     }
563 #if USE_TASK_SCHEDULER_OBSERVER
564     o.observe(false);
565 #endif
566 }
567
568 //  Test function_node
569 //
570 // graph being tested is
571 //
572 //        source_node -\                 /- parallel function_node
573 //                      \               /
574 //                       +function_node+
575 //                      /               \                                  x
576 //        source_node -/                 \- parallel function_node
577 //
578 //    After each run the graph is reset(), to test the reset functionality.
579 //
580 template<
581     TestNodeTypeEnum SType1,                          // does source node 1 throw?
582     TestNodeTypeEnum SType2,                          // does source node 2 throw?
583     class Item12,                                     // type of item passed between sources and test node
584     TestNodeTypeEnum FType,                           // does function node throw?
585     class Item23,                                     // type passed from function_node to sink nodes
586     TestNodeTypeEnum NType1,                          // does sink node 1 throw?
587     TestNodeTypeEnum NType2,                          // does sink node 1 throw?
588     class NodePolicy,                                 // rejecting,queueing
589     size_t Conc                                       // is node concurrent? {serial | limited | unlimited}
590 >
591 void run_function_node_test() {
592
593     typedef test_source_body<Item12,SType1> SBodyType1;
594     typedef test_source_body<Item12,SType2> SBodyType2;
595     typedef absorber_body<Item12, Item23, FType, Conc> TestBodyType;
596     typedef absorber_body<Item23,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
597     typedef absorber_body<Item23,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
598
599     typedef tbb::flow::source_node<Item12> SrcType;
600     typedef tbb::flow::function_node<Item12, Item23, NodePolicy> TestType;
601     typedef tbb::flow::function_node<Item23,tbb::flow::continue_msg> SnkType;
602
603     for(int i = 0; i < 4; ++i ) {
604         if(i != 2) {  // doesn't make sense to flog a non-throwing test
605             bool doThrow = (i & 0x1) != 0;
606             bool doFlog = (i & 0x2) != 0;
607             run_one_functype_node_test<
608                 /*SourceNodeType*/      SrcType,
609                 /*SourceNodeBodyType0*/ SBodyType1,
610                 /*SourceNodeBodyType1*/ SBodyType2,
611                 /* NFT */               func_node_type,
612                 /*TestNodeType*/        TestType,
613                 /*TestNodeBodyType*/    TestBodyType,
614                 /*TypeToSink0 */        Item23,
615                 /*TypeToSink1 */        Item23,
616                 /*SinkNodeType0*/       SnkType,
617                 /*SinkNodeType1*/       SnkType,
618                 /*SinkNodeBodyType1*/   SinkBodyType1,
619                 /*SinkNodeBodyType2*/   SinkBodyType2,
620                 /*Conc*/                Conc>
621                     (doThrow,doFlog,"function_node");
622         }
623     }
624 }  // run_function_node_test
625
626 void test_function_node() {
627     REMARK("Testing function_node\n");
628     // serial rejecting
629     g_Wakeup_Msg = "function_node(1a): Missed wakeup or machine is overloaded?";
630     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
631     g_Wakeup_Msg = "function_node(1b): Missed wakeup or machine is overloaded?";
632     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
633     g_Wakeup_Msg = "function_node(1c): Missed wakeup or machine is overloaded?";
634     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
635
636     // serial queueing
637     g_Wakeup_Msg = "function_node(2): Missed wakeup or machine is overloaded?";
638     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
639     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
640     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
641     check_type<int>::check_type_counter = 0;
642     run_function_node_test<nonThrowing, nonThrowing, check_type<int>, nonThrowing, check_type<int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
643     ASSERT(!check_type<int>::check_type_counter, "Some items leaked in test");
644
645     // unlimited parallel rejecting
646     g_Wakeup_Msg = "function_node(3): Missed wakeup or machine is overloaded?";
647     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
648     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
649     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
650
651     // limited parallel rejecting
652     g_Wakeup_Msg = "function_node(4): Missed wakeup or machine is overloaded?";
653     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
654     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
655     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
656
657     // limited parallel queueing
658     g_Wakeup_Msg = "function_node(5): Missed wakeup or machine is overloaded?";
659     run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
660     run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
661     run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
662
663     // everyone throwing
664     g_Wakeup_Msg = "function_node(6): Missed wakeup or machine is overloaded?";
665     run_function_node_test<isThrowing, isThrowing, int, isThrowing, int, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
666     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
667 }
668
669 // ----------------------------------- multifunction_node ----------------------------------
670 //  Test multifunction_node.
671 //
672 // graph being tested is
673 //
674 //        source_node -\                      /- parallel function_node
675 //                      \                    /
676 //                       +multifunction_node+
677 //                      /                    \                                  x
678 //        source_node -/                      \- parallel function_node
679 //
680 //    After each run the graph is reset(), to test the reset functionality.  The
681 //    multifunction_node will put an item to each successor for every item
682 //    received.
683 //
684 template<
685     TestNodeTypeEnum SType0,                          // does source node 1 throw?
686     TestNodeTypeEnum SType1,                          // does source node 2 thorw?
687     class Item12,                                 // type of item passed between sources and test node
688     TestNodeTypeEnum FType,                           // does multifunction node throw?
689     class ItemTuple,                              // tuple of types passed from multifunction_node to sink nodes
690     TestNodeTypeEnum NType1,                          // does sink node 1 throw?
691     TestNodeTypeEnum NType2,                          // does sink node 2 throw?
692     class  NodePolicy,                            // rejecting,queueing
693     size_t Conc                                   // is node concurrent? {serial | limited | unlimited}
694 >
695 void run_multifunction_node_test() {
696
697     typedef typename tbb::flow::tuple_element<0,ItemTuple>::type Item23Type0;
698     typedef typename tbb::flow::tuple_element<1,ItemTuple>::type Item23Type1;
699     typedef test_source_body<Item12,SType0> SBodyType1;
700     typedef test_source_body<Item12,SType1> SBodyType2;
701     typedef multifunction_node_body<Item12, ItemTuple, FType, Conc> TestBodyType;
702     typedef absorber_body<Item23Type0,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
703     typedef absorber_body<Item23Type1,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
704
705     typedef tbb::flow::source_node<Item12> SrcType;
706     typedef tbb::flow::multifunction_node<Item12, ItemTuple, NodePolicy> TestType;
707     typedef tbb::flow::function_node<Item23Type0,tbb::flow::continue_msg> SnkType0;
708     typedef tbb::flow::function_node<Item23Type1,tbb::flow::continue_msg> SnkType1;
709
710     for(int i = 0; i < 4; ++i ) {
711         if(i != 2) {  // doesn't make sense to flog a non-throwing test
712             bool doThrow = (i & 0x1) != 0;
713             bool doFlog = (i & 0x2) != 0;
714     run_one_functype_node_test<
715         /*SourceNodeType*/      SrcType,
716         /*SourceNodeBodyType0*/ SBodyType1,
717         /*SourceNodeBodyType1*/ SBodyType2,
718         /*NFT*/                 multifunc_node_type,
719         /*TestNodeType*/        TestType,
720         /*TestNodeBodyType*/    TestBodyType,
721         /*TypeToSink0*/         Item23Type0,
722         /*TypeToSink1*/         Item23Type1,
723         /*SinkNodeType0*/       SnkType0,
724         /*SinkNodeType1*/       SnkType1,
725         /*SinkNodeBodyType0*/   SinkBodyType1,
726         /*SinkNodeBodyType1*/   SinkBodyType2,
727         /*Conc*/                Conc>
728             (doThrow,doFlog,"multifunction_node");
729         }
730     }
731 }  // run_multifunction_node_test
732
733 void test_multifunction_node() {
734     REMARK("Testing multifunction_node\n");
735     g_Wakeup_Msg = "multifunction_node(source throws,rejecting,serial): Missed wakeup or machine is overloaded?";
736     // serial rejecting
737     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,float>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
738     g_Wakeup_Msg = "multifunction_node(test throws,rejecting,serial): Missed wakeup or machine is overloaded?";
739     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
740     g_Wakeup_Msg = "multifunction_node(sink throws,rejecting,serial): Missed wakeup or machine is overloaded?";
741     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
742
743     g_Wakeup_Msg = "multifunction_node(2): Missed wakeup or machine is overloaded?";
744     // serial queueing
745     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
746     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
747     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
748     check_type<int>::check_type_counter = 0;
749     run_multifunction_node_test<nonThrowing, nonThrowing, check_type<int>, nonThrowing, tbb::flow::tuple<check_type<int>, check_type<int> >, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
750     ASSERT(!check_type<int>::check_type_counter, "Some items leaked in test");
751
752     g_Wakeup_Msg = "multifunction_node(3): Missed wakeup or machine is overloaded?";
753     // unlimited parallel rejecting
754     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
755     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
756     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
757
758     g_Wakeup_Msg = "multifunction_node(4): Missed wakeup or machine is overloaded?";
759     // limited parallel rejecting
760     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
761     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
762     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
763
764     g_Wakeup_Msg = "multifunction_node(5): Missed wakeup or machine is overloaded?";
765     // limited parallel queueing
766     run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
767     run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
768     run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
769
770     g_Wakeup_Msg = "multifunction_node(6): Missed wakeup or machine is overloaded?";
771     // everyone throwing
772     run_multifunction_node_test<isThrowing, isThrowing, int, isThrowing, tbb::flow::tuple<int,int>, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
773     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
774 }
775
776 //
777 // Continue node has T predecessors.  when it receives messages (continue_msg) on T predecessors
778 // it executes the body of the node, and forwards a continue_msg to its successors.
779 // However many predecessors the continue_node has, that's how many continue_msgs it receives
780 // on input before forwarding a message.
781 //
782 // The graph will look like
783 //
784 //                                          +broadcast_node+
785 //                                         /                \             ___
786 //      source_node+------>+broadcast_node+                  +continue_node+--->+absorber
787 //                                         \                /
788 //                                          +broadcast_node+
789 //
790 // The continue_node has unlimited parallelism, no input buffering, and broadcasts to successors.
791 // The absorber is parallel, so each item emitted by the source will result in one thread
792 // spinning.  So for N threads we pass N-1 continue_messages, then spin wait and then throw if
793 // we are allowed to.
794
795 template < class SourceNodeType, class SourceNodeBodyType, class TTestNodeType, class TestNodeBodyType,
796         class SinkNodeType, class SinkNodeBodyType>
797 void run_one_continue_node_test (bool throwException, bool flog) {
798     tbb::flow::graph g;
799
800     tbb::atomic<int> source_count;
801     tbb::atomic<int> test_count;
802     tbb::atomic<int> sink_count;
803     source_count = test_count = sink_count = 0;
804 #if USE_TASK_SCHEDULER_OBSERVER
805     eh_test_observer o;
806     o.observe(true);
807 #endif
808     g_Master = Harness::CurrentTid();
809     SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false);
810     TTestNodeType node_to_test(g, TestNodeBodyType(test_count));
811     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
812     tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g), b2(g), b3(g);
813     make_edge(source, b1);
814     make_edge(b1,b2);
815     make_edge(b1,b3);
816     make_edge(b2,node_to_test);
817     make_edge(b3,node_to_test);
818     make_edge(node_to_test, sink);
819     for(int iter = 0; iter < 2; ++iter) {
820         ResetGlobals(throwException,flog);
821         if(throwException) {
822             TRY();
823                 source.activate();
824                 g.wait_for_all();
825             CATCH_AND_ASSERT();
826         }
827         else {
828             TRY();
829                 source.activate();
830                 g.wait_for_all();
831             CATCH_AND_FAIL();
832         }
833         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
834         int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value();
835         int t_cnt   = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
836         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
837         if(throwException) {
838             ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
839             ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
840             ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources");
841             ASSERT(sb_cnt >= t_cnt, "Too many items received by test node");
842             ASSERT(nb_cnt <= t_cnt, "Too many items received by sink nodes");
843         }
844         else {
845             ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
846             ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
847             ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_node");
848             ASSERT(t_cnt == g_NumItems, "Not all items reached test node");
849             ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
850         }
851         g.reset();  // resets the body of the source_nodes, test_node and the absorb_nodes.
852         source_count = test_count = sink_count = 0;
853         ASSERT(0 == (int)test_count, "Atomic wasn't reset properly");
854         ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed");
855         ASSERT(0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value(),"Reset test_node failed");
856         ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed");
857     }
858 #if USE_TASK_SCHEDULER_OBSERVER
859     o.observe(false);
860 #endif
861 }
862
863 template<
864     class ItemType,
865     TestNodeTypeEnum SType,   // does source node throw?
866     TestNodeTypeEnum CType,   // does continue_node throw?
867     TestNodeTypeEnum AType>    // does absorber throw
868 void run_continue_node_test() {
869     typedef test_source_body<tbb::flow::continue_msg,SType> SBodyType;
870     typedef absorber_body<tbb::flow::continue_msg,ItemType,CType,unlimited_type> ContBodyType;
871     typedef absorber_body<ItemType,tbb::flow::continue_msg, AType, unlimited_type> SinkBodyType;
872
873     typedef tbb::flow::source_node<tbb::flow::continue_msg> SrcType;
874     typedef tbb::flow::continue_node<ItemType> TestType;
875     typedef tbb::flow::function_node<ItemType,tbb::flow::continue_msg> SnkType;
876
877     for(int i = 0; i < 4; ++i ) {
878         if(i == 2) continue;  // don't run (false,true); it doesn't make sense.
879         bool doThrow = (i & 0x1) != 0;
880         bool doFlog = (i & 0x2) != 0;
881         run_one_continue_node_test<
882             /*SourceNodeType*/      SrcType,
883             /*SourceNodeBodyType*/  SBodyType,
884             /*TestNodeType*/        TestType,
885             /*TestNodeBodyType*/    ContBodyType,
886             /*SinkNodeType*/        SnkType,
887             /*SinkNodeBodyType*/    SinkBodyType>
888             (doThrow,doFlog);
889     }
890 }
891
892 //
893 void test_continue_node() {
894     REMARK("Testing continue_node\n");
895     g_Wakeup_Msg = "buffer_node(non,is,non): Missed wakeup or machine is overloaded?";
896     run_continue_node_test<int,nonThrowing,isThrowing,nonThrowing>();
897     g_Wakeup_Msg = "buffer_node(non,non,is): Missed wakeup or machine is overloaded?";
898     run_continue_node_test<int,nonThrowing,nonThrowing,isThrowing>();
899     g_Wakeup_Msg = "buffer_node(is,non,non): Missed wakeup or machine is overloaded?";
900     run_continue_node_test<int,isThrowing,nonThrowing,nonThrowing>();
901     g_Wakeup_Msg = "buffer_node(is,is,is): Missed wakeup or machine is overloaded?";
902     run_continue_node_test<int,isThrowing,isThrowing,isThrowing>();
903     check_type<double>::check_type_counter = 0;
904     run_continue_node_test<check_type<double>,isThrowing,isThrowing,isThrowing>();
905     ASSERT(!check_type<double>::check_type_counter, "Dropped objects in continue_node test");
906     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
907 }
908
909 // ---------- buffer_node queue_node overwrite_node --------------
910
911 template<
912     class BufferItemType,       //
913     class SourceNodeType,
914     class SourceNodeBodyType,
915     class TestNodeType,
916     class SinkNodeType,
917     class SinkNodeBodyType >
918 void run_one_buffer_node_test(bool throwException,bool flog) {
919     tbb::flow::graph g;
920
921     tbb::atomic<int> source_count;
922     tbb::atomic<int> sink_count;
923     source_count = sink_count = 0;
924 #if USE_TASK_SCHEDULER_OBSERVER
925     eh_test_observer o;
926     o.observe(true);
927 #endif
928     g_Master = Harness::CurrentTid();
929     SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false);
930     TestNodeType node_to_test(g);
931     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
932     make_edge(source,node_to_test);
933     make_edge(node_to_test, sink);
934     for(int iter = 0; iter < 2; ++iter) {
935         ResetGlobals(throwException,flog);
936         if(throwException) {
937             TRY();
938                 source.activate();
939                 g.wait_for_all();
940             CATCH_AND_ASSERT();
941         }
942         else {
943             TRY();
944                 source.activate();
945                 g.wait_for_all();
946             CATCH_AND_FAIL();
947         }
948         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
949         int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value();
950         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
951         if(throwException) {
952             ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
953             ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
954             ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources");
955             ASSERT(nb_cnt <= sb_cnt, "Too many items received by sink nodes");
956         }
957         else {
958             ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
959             ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
960             ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_node");
961             ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
962         }
963         if(iter == 0) {
964             remove_edge(node_to_test, sink);
965             node_to_test.try_put(BufferItemType());
966             g.wait_for_all();
967             g.reset();
968             source_count = sink_count = 0;
969             BufferItemType tmp;
970             ASSERT(!node_to_test.try_get(tmp), "node not empty");
971             make_edge(node_to_test, sink);
972             g.wait_for_all();
973         }
974         else {
975             g.reset();
976             source_count = sink_count = 0;
977         }
978         ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed");
979         ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed");
980     }
981
982 #if USE_TASK_SCHEDULER_OBSERVER
983     o.observe(false);
984 #endif
985 }
986 template<class BufferItemType,
987          TestNodeTypeEnum SourceThrowType,
988          TestNodeTypeEnum SinkThrowType>
989 void run_buffer_queue_and_overwrite_node_test() {
990     typedef test_source_body<BufferItemType,SourceThrowType> SourceBodyType;
991     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
992
993     typedef tbb::flow::source_node<BufferItemType> SrcType;
994     typedef tbb::flow::buffer_node<BufferItemType> BufType;
995     typedef tbb::flow::queue_node<BufferItemType>  QueType;
996     typedef tbb::flow::overwrite_node<BufferItemType>  OvrType;
997     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
998
999     for(int i = 0; i < 4; ++i) {
1000         if(i == 2) continue;  // no need to test flog w/o throws
1001         bool throwException = (i & 0x1) != 0;
1002         bool doFlog = (i & 0x2) != 0;
1003 #if TBB_RUN_BUFFERING_TEST
1004         run_one_buffer_node_test<
1005             /* class BufferItemType*/     BufferItemType,
1006             /*class SourceNodeType*/      SrcType,
1007             /*class SourceNodeBodyType*/  SourceBodyType,
1008             /*class TestNodeType*/        BufType,
1009             /*class SinkNodeType*/        SnkType,
1010             /*class SinkNodeBodyType*/    SinkBodyType
1011             >(throwException, doFlog);
1012         run_one_buffer_node_test<
1013             /* class BufferItemType*/     BufferItemType,
1014             /*class SourceNodeType*/      SrcType,
1015             /*class SourceNodeBodyType*/  SourceBodyType,
1016             /*class TestNodeType*/        QueType,
1017             /*class SinkNodeType*/        SnkType,
1018             /*class SinkNodeBodyType*/    SinkBodyType
1019             >(throwException, doFlog);
1020 #endif
1021         run_one_buffer_node_test<
1022             /* class BufferItemType*/     BufferItemType,
1023             /*class SourceNodeType*/      SrcType,
1024             /*class SourceNodeBodyType*/  SourceBodyType,
1025             /*class TestNodeType*/        OvrType,
1026             /*class SinkNodeType*/        SnkType,
1027             /*class SinkNodeBodyType*/    SinkBodyType
1028             >(throwException, doFlog);
1029     }
1030 }
1031
1032 void test_buffer_queue_and_overwrite_node() {
1033     REMARK("Testing buffer_node, queue_node and overwrite_node\n");
1034 #if TBB_RUN_BUFFERING_TEST
1035 #else
1036     REMARK("skip buffer and queue test (known issue)\n");
1037 #endif
1038     g_Wakeup_Msg = "buffer, queue, overwrite(is,non): Missed wakeup or machine is overloaded?";
1039     run_buffer_queue_and_overwrite_node_test<int,isThrowing,nonThrowing>();
1040     g_Wakeup_Msg = "buffer, queue, overwrite(non,is): Missed wakeup or machine is overloaded?";
1041     run_buffer_queue_and_overwrite_node_test<int,nonThrowing,isThrowing>();
1042     g_Wakeup_Msg = "buffer, queue, overwrite(is,is): Missed wakeup or machine is overloaded?";
1043     run_buffer_queue_and_overwrite_node_test<int,isThrowing,isThrowing>();
1044     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1045 }
1046
1047 // ---------- sequencer_node -------------------------
1048
1049
1050 template<
1051     class BufferItemType,       //
1052     class SourceNodeType,
1053     class SourceNodeBodyType,
1054     class TestNodeType,
1055     class SeqBodyType,
1056     class SinkNodeType,
1057     class SinkNodeBodyType >
1058 void run_one_sequencer_node_test(bool throwException,bool flog) {
1059     tbb::flow::graph g;
1060
1061     tbb::atomic<int> source_count;
1062     tbb::atomic<int> sink_count;
1063     source_count = sink_count = 0;
1064 #if USE_TASK_SCHEDULER_OBSERVER
1065     eh_test_observer o;
1066     o.observe(true);
1067 #endif
1068     g_Master = Harness::CurrentTid();
1069     SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false);
1070     TestNodeType node_to_test(g,SeqBodyType());
1071     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1072     make_edge(source,node_to_test);
1073     make_edge(node_to_test, sink);
1074     for(int iter = 0; iter < 2; ++iter) {
1075         ResetGlobals(throwException,flog);
1076         if(throwException) {
1077             TRY();
1078                 source.activate();
1079                 g.wait_for_all();
1080             CATCH_AND_ASSERT();
1081         }
1082         else {
1083             TRY();
1084                 source.activate();
1085                 g.wait_for_all();
1086             CATCH_AND_FAIL();
1087         }
1088         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1089         int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value();
1090         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1091         if(throwException) {
1092             ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1093             ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1094             ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources");
1095             ASSERT(nb_cnt <= sb_cnt, "Too many items received by sink nodes");
1096         }
1097         else {
1098             ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1099             ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1100             ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_node");
1101             ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
1102         }
1103         if(iter == 0) {
1104             remove_edge(node_to_test, sink);
1105             node_to_test.try_put(BufferItemType(g_NumItems + 1));
1106             node_to_test.try_put(BufferItemType(1));
1107             g.wait_for_all();
1108             g.reset();
1109             source_count = sink_count = 0;
1110             make_edge(node_to_test, sink);
1111             g.wait_for_all();
1112         }
1113         else {
1114             g.reset();
1115             source_count = sink_count = 0;
1116         }
1117         ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed");
1118         ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed");
1119     }
1120
1121 #if USE_TASK_SCHEDULER_OBSERVER
1122     o.observe(false);
1123 #endif
1124 }
1125
1126 template<class BufferItemType,
1127          TestNodeTypeEnum SourceThrowType,
1128          TestNodeTypeEnum SinkThrowType>
1129 void run_sequencer_node_test() {
1130     typedef test_source_body<BufferItemType,SourceThrowType> SourceBodyType;
1131     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1132     typedef sequencer_body<BufferItemType> SeqBodyType;
1133
1134     typedef tbb::flow::source_node<BufferItemType> SrcType;
1135     typedef tbb::flow::sequencer_node<BufferItemType>  SeqType;
1136     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1137
1138     for(int i = 0; i < 4; ++i) {
1139         if(i == 2) continue;  // no need to test flog w/o throws
1140         bool throwException = (i & 0x1) != 0;
1141         bool doFlog = (i & 0x2) != 0;
1142         run_one_sequencer_node_test<
1143             /* class BufferItemType*/     BufferItemType,
1144             /*class SourceNodeType*/      SrcType,
1145             /*class SourceNodeBodyType*/  SourceBodyType,
1146             /*class TestNodeType*/        SeqType,
1147             /*class SeqBodyType*/         SeqBodyType,
1148             /*class SinkNodeType*/        SnkType,
1149             /*class SinkNodeBodyType*/    SinkBodyType
1150             >(throwException, doFlog);
1151     }
1152 }
1153
1154
1155
1156 void test_sequencer_node() {
1157     REMARK("Testing sequencer_node\n");
1158     g_Wakeup_Msg = "sequencer_node(is,non): Missed wakeup or machine is overloaded?";
1159     run_sequencer_node_test<int, isThrowing,nonThrowing>();
1160     check_type<int>::check_type_counter = 0;
1161     g_Wakeup_Msg = "sequencer_node(non,is): Missed wakeup or machine is overloaded?";
1162     run_sequencer_node_test<check_type<int>, nonThrowing,isThrowing>();
1163     ASSERT(!check_type<int>::check_type_counter, "Dropped objects in sequencer_node test");
1164     g_Wakeup_Msg = "sequencer_node(is,is): Missed wakeup or machine is overloaded?";
1165     run_sequencer_node_test<int, isThrowing,isThrowing>();
1166     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1167 }
1168
1169 // ------------ priority_queue_node ------------------
1170
1171 template<
1172     class BufferItemType,
1173     class SourceNodeType,
1174     class SourceNodeBodyType,
1175     class TestNodeType,
1176     class SinkNodeType,
1177     class SinkNodeBodyType >
1178 void run_one_priority_queue_node_test(bool throwException,bool flog) {
1179     tbb::flow::graph g;
1180
1181     tbb::atomic<int> source_count;
1182     tbb::atomic<int> sink_count;
1183     source_count = sink_count = 0;
1184 #if USE_TASK_SCHEDULER_OBSERVER
1185     eh_test_observer o;
1186     o.observe(true);
1187 #endif
1188     g_Master = Harness::CurrentTid();
1189     SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false);
1190
1191     TestNodeType node_to_test(g);
1192
1193     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1194
1195     make_edge(source,node_to_test);
1196     make_edge(node_to_test, sink);
1197     for(int iter = 0; iter < 2; ++iter) {
1198         ResetGlobals(throwException,flog);
1199         if(throwException) {
1200             TRY();
1201                 source.activate();
1202                 g.wait_for_all();
1203             CATCH_AND_ASSERT();
1204         }
1205         else {
1206             TRY();
1207                 source.activate();
1208                 g.wait_for_all();
1209             CATCH_AND_FAIL();
1210         }
1211         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1212         int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value();
1213         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1214         if(throwException) {
1215             ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1216             ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1217             ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources");
1218             ASSERT(nb_cnt <= sb_cnt, "Too many items received by sink nodes");
1219         }
1220         else {
1221             ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1222             ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1223             ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_node");
1224             ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
1225         }
1226         if(iter == 0) {
1227             remove_edge(node_to_test, sink);
1228             node_to_test.try_put(BufferItemType(g_NumItems + 1));
1229             node_to_test.try_put(BufferItemType(g_NumItems + 2));
1230             node_to_test.try_put(BufferItemType());
1231             g.wait_for_all();
1232             g.reset();
1233             source_count = sink_count = 0;
1234             make_edge(node_to_test, sink);
1235             g.wait_for_all();
1236         }
1237         else {
1238             g.reset();
1239             source_count = sink_count = 0;
1240         }
1241         ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed");
1242         ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed");
1243     }
1244
1245 #if USE_TASK_SCHEDULER_OBSERVER
1246     o.observe(false);
1247 #endif
1248 }
1249
1250 template<class BufferItemType,
1251          TestNodeTypeEnum SourceThrowType,
1252          TestNodeTypeEnum SinkThrowType>
1253 void run_priority_queue_node_test() {
1254     typedef test_source_body<BufferItemType,SourceThrowType> SourceBodyType;
1255     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1256     typedef less_body<BufferItemType> LessBodyType;
1257
1258     typedef tbb::flow::source_node<BufferItemType> SrcType;
1259     typedef tbb::flow::priority_queue_node<BufferItemType,LessBodyType>  PrqType;
1260     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1261
1262     for(int i = 0; i < 4; ++i) {
1263         if(i == 2) continue;  // no need to test flog w/o throws
1264         bool throwException = (i & 0x1) != 0;
1265         bool doFlog = (i & 0x2) != 0;
1266         run_one_priority_queue_node_test<
1267             /* class BufferItemType*/     BufferItemType,
1268             /*class SourceNodeType*/      SrcType,
1269             /*class SourceNodeBodyType*/  SourceBodyType,
1270             /*class TestNodeType*/        PrqType,
1271             /*class SinkNodeType*/        SnkType,
1272             /*class SinkNodeBodyType*/    SinkBodyType
1273             >(throwException, doFlog);
1274     }
1275 }
1276
1277 void test_priority_queue_node() {
1278     REMARK("Testing priority_queue_node\n");
1279     g_Wakeup_Msg = "priority_queue_node(is,non): Missed wakeup or machine is overloaded?";
1280     run_priority_queue_node_test<int, isThrowing,nonThrowing>();
1281     check_type<int>::check_type_counter = 0;
1282     g_Wakeup_Msg = "priority_queue_node(non,is): Missed wakeup or machine is overloaded?";
1283     run_priority_queue_node_test<check_type<int>, nonThrowing,isThrowing>();
1284     ASSERT(!check_type<int>::check_type_counter, "Dropped objects in priority_queue_node test");
1285     g_Wakeup_Msg = "priority_queue_node(is,is): Missed wakeup or machine is overloaded?";
1286     run_priority_queue_node_test<int, isThrowing,isThrowing>();
1287     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1288 }
1289
1290 // ------------------- join_node ----------------
1291 template<class JP> struct graph_policy_name{
1292     static const char* name() {return "unknown"; }
1293 };
1294 template<> struct graph_policy_name<tbb::flow::queueing>  {
1295     static const char* name() {return "queueing"; }
1296 };
1297 template<> struct graph_policy_name<tbb::flow::reserving> {
1298     static const char* name() {return "reserving"; }
1299 };
1300 template<> struct graph_policy_name<tbb::flow::tag_matching> {
1301     static const char* name() {return "tag_matching"; }
1302 };
1303
1304
1305 template<
1306     class JP,
1307     class OutputTuple,
1308     class SourceType0,
1309     class SourceBodyType0,
1310     class SourceType1,
1311     class SourceBodyType1,
1312     class TestJoinType,
1313     class SinkType,
1314     class SinkBodyType
1315     >
1316 struct run_one_join_node_test {
1317     run_one_join_node_test() {}
1318     static void execute_test(bool throwException,bool flog) {
1319         typedef typename tbb::flow::tuple_element<0,OutputTuple>::type ItemType0;
1320         typedef typename tbb::flow::tuple_element<1,OutputTuple>::type ItemType1;
1321
1322         tbb::flow::graph g;
1323         tbb::atomic<int>source0_count;
1324         tbb::atomic<int>source1_count;
1325         tbb::atomic<int>sink_count;
1326         source0_count = source1_count = sink_count = 0;
1327 #if USE_TASK_SCHEDULER_OBSERVER
1328         eh_test_observer o;
1329         o.observe(true);
1330 #endif
1331         g_Master = Harness::CurrentTid();
1332         SourceType0 source0(g, SourceBodyType0(source0_count),/*is_active*/false);
1333         SourceType1 source1(g, SourceBodyType1(source1_count),/*is_active*/false);
1334         TestJoinType node_to_test(g);
1335         SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1336         make_edge(source0,tbb::flow::input_port<0>(node_to_test));
1337         make_edge(source1,tbb::flow::input_port<1>(node_to_test));
1338         make_edge(node_to_test, sink);
1339         for(int iter = 0; iter < 2; ++iter) {
1340             ResetGlobals(throwException,flog);
1341             if(throwException) {
1342                 TRY();
1343                     source0.activate();
1344                     source1.activate();
1345                     g.wait_for_all();
1346                 CATCH_AND_ASSERT();
1347             }
1348             else {
1349                 TRY();
1350                     source0.activate();
1351                     source1.activate();
1352                     g.wait_for_all();
1353                 CATCH_AND_FAIL();
1354             }
1355             bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1356             int sb0_cnt = tbb::flow::copy_body<SourceBodyType0>(source0).count_value();
1357             int sb1_cnt = tbb::flow::copy_body<SourceBodyType1>(source1).count_value();
1358             int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1359             if(throwException) {
1360                 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1361                 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1362                 ASSERT(sb0_cnt <= g_NumItems && sb1_cnt <= g_NumItems, "Too many items sent by sources");
1363                 ASSERT(nb_cnt <= ((sb0_cnt < sb1_cnt) ? sb0_cnt : sb1_cnt), "Too many items received by sink nodes");
1364             }
1365             else {
1366                 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1367                 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1368                 if(sb0_cnt != g_NumItems) {
1369                     REMARK("throwException == %s\n", throwException ? "true" : "false");
1370                     REMARK("iter == %d\n", (int)iter);
1371                     REMARK("sb0_cnt == %d\n", (int)sb0_cnt);
1372                     REMARK("g_NumItems == %d\n", (int)g_NumItems);
1373                 }
1374                 ASSERT(sb0_cnt == g_NumItems, "Missing invocations of source_node0");  // this one
1375                 ASSERT(sb1_cnt == g_NumItems, "Missing invocations of source_node1");
1376                 ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
1377             }
1378             if(iter == 0) {
1379                 remove_edge(node_to_test, sink);
1380                 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 1));
1381                 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1382                 g.wait_for_all();
1383                 g.reset();
1384                 source0_count = source1_count = sink_count = 0;
1385                 make_edge(node_to_test, sink);
1386                 g.wait_for_all();
1387             }
1388             else {
1389                 g.wait_for_all();
1390                 g.reset();
1391                 source0_count = source1_count = sink_count = 0;
1392             }
1393             ASSERT(0 == tbb::flow::copy_body<SourceBodyType0>(source0).count_value(),"Reset source failed");
1394             ASSERT(0 == tbb::flow::copy_body<SourceBodyType1>(source1).count_value(),"Reset source failed");
1395             nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1396             ASSERT(0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value(),"Reset sink failed");
1397         }
1398
1399 #if USE_TASK_SCHEDULER_OBSERVER
1400         o.observe(false);
1401 #endif
1402     }
1403 };  // run_one_join_node_test
1404
1405 template<
1406     class OutputTuple,
1407     class SourceType0,
1408     class SourceBodyType0,
1409     class SourceType1,
1410     class SourceBodyType1,
1411     class TestJoinType,
1412     class SinkType,
1413     class SinkBodyType
1414     >
1415 struct run_one_join_node_test<
1416         tbb::flow::tag_matching,
1417         OutputTuple,
1418         SourceType0,
1419         SourceBodyType0,
1420         SourceType1,
1421         SourceBodyType1,
1422         TestJoinType,
1423         SinkType,
1424         SinkBodyType
1425     > {
1426     run_one_join_node_test() {}
1427     static void execute_test(bool throwException,bool flog) {
1428         typedef typename tbb::flow::tuple_element<0,OutputTuple>::type ItemType0;
1429         typedef typename tbb::flow::tuple_element<1,OutputTuple>::type ItemType1;
1430
1431         tbb::flow::graph g;
1432
1433         tbb::atomic<int>source0_count;
1434         tbb::atomic<int>source1_count;
1435         tbb::atomic<int>sink_count;
1436         source0_count = source1_count = sink_count = 0;
1437 #if USE_TASK_SCHEDULER_OBSERVER
1438         eh_test_observer o;
1439         o.observe(true);
1440 #endif
1441         g_Master = Harness::CurrentTid();
1442         SourceType0 source0(g, SourceBodyType0(source0_count, 2),/*is_active*/false);
1443         SourceType1 source1(g, SourceBodyType1(source1_count, 3),/*is_active*/false);
1444         TestJoinType node_to_test(g, tag_func<ItemType0>(ItemType0(2)), tag_func<ItemType1>(ItemType1(3)));
1445         SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1446         make_edge(source0,tbb::flow::input_port<0>(node_to_test));
1447         make_edge(source1,tbb::flow::input_port<1>(node_to_test));
1448         make_edge(node_to_test, sink);
1449         for(int iter = 0; iter < 2; ++iter) {
1450             ResetGlobals(throwException,flog);
1451             if(throwException) {
1452                 TRY();
1453                     source0.activate();
1454                     source1.activate();
1455                     g.wait_for_all();
1456                 CATCH_AND_ASSERT();
1457             }
1458             else {
1459                 TRY();
1460                     source0.activate();
1461                     source1.activate();
1462                     g.wait_for_all();
1463                 CATCH_AND_FAIL();
1464             }
1465             bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1466             int sb0_cnt = tbb::flow::copy_body<SourceBodyType0>(source0).count_value();
1467             int sb1_cnt = tbb::flow::copy_body<SourceBodyType1>(source1).count_value();
1468             int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1469             if(throwException) {
1470                 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1471                 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1472                 ASSERT(sb0_cnt <= g_NumItems && sb1_cnt <= g_NumItems, "Too many items sent by sources");
1473                 ASSERT(nb_cnt <= ((sb0_cnt < sb1_cnt) ? sb0_cnt : sb1_cnt), "Too many items received by sink nodes");
1474             }
1475             else {
1476                 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1477                 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1478                 ASSERT(sb0_cnt == g_NumItems, "Missing invocations of source_node0");
1479                 ASSERT(sb1_cnt == g_NumItems, "Missing invocations of source_node1");
1480                 ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
1481             }
1482             if(iter == 0) {
1483                 remove_edge(node_to_test, sink);
1484                 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
1485                 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1486                 g.wait_for_all();   // have to wait for the graph to stop again....
1487                 g.reset();  // resets the body of the source_nodes, test_node and the absorb_nodes.
1488                 source0_count = source1_count = sink_count = 0;
1489                 make_edge(node_to_test, sink);
1490                 g.wait_for_all();   // have to wait for the graph to stop again....
1491             }
1492             else {
1493                 g.wait_for_all();
1494                 g.reset();
1495                 source0_count = source1_count = sink_count = 0;
1496             }
1497             ASSERT(0 == tbb::flow::copy_body<SourceBodyType0>(source0).count_value(),"Reset source failed");
1498             ASSERT(0 == tbb::flow::copy_body<SourceBodyType1>(source1).count_value(),"Reset source failed");
1499             nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1500             ASSERT(0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value(),"Reset sink failed");
1501         }
1502
1503 #if USE_TASK_SCHEDULER_OBSERVER
1504         o.observe(false);
1505 #endif
1506     }
1507 };  // run_one_join_node_test<tag_matching>
1508
1509 template<class JP, class OutputTuple,
1510              TestNodeTypeEnum SourceThrowType,
1511              TestNodeTypeEnum SinkThrowType>
1512 void run_join_node_test() {
1513     typedef typename tbb::flow::tuple_element<0,OutputTuple>::type ItemType0;
1514     typedef typename tbb::flow::tuple_element<1,OutputTuple>::type ItemType1;
1515     typedef test_source_body<ItemType0,SourceThrowType> SourceBodyType0;
1516     typedef test_source_body<ItemType1,SourceThrowType> SourceBodyType1;
1517     typedef absorber_body<OutputTuple,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1518
1519     typedef typename tbb::flow::source_node<ItemType0> SourceType0;
1520     typedef typename tbb::flow::source_node<ItemType1> SourceType1;
1521     typedef typename tbb::flow::join_node<OutputTuple,JP> TestJoinType;
1522     typedef typename tbb::flow::function_node<OutputTuple,tbb::flow::continue_msg> SinkType;
1523
1524     for(int i = 0; i < 4; ++i) {
1525         if(2 == i) continue;
1526         bool throwException = (i & 0x1) != 0;
1527         bool doFlog = (i & 0x2) != 0;
1528         run_one_join_node_test<
1529              JP,
1530              OutputTuple,
1531              SourceType0,
1532              SourceBodyType0,
1533              SourceType1,
1534              SourceBodyType1,
1535              TestJoinType,
1536              SinkType,
1537              SinkBodyType>::execute_test(throwException,doFlog);
1538     }
1539 }
1540
1541 template<class JP>
1542 void test_join_node() {
1543     REMARK("Testing join_node<%s>\n", graph_policy_name<JP>::name());
1544     // only doing two-input joins
1545     g_Wakeup_Msg = "join(is,non): Missed wakeup or machine is overloaded?";
1546     run_join_node_test<JP, tbb::flow::tuple<int,int>,  isThrowing, nonThrowing>();
1547     check_type<int>::check_type_counter = 0;
1548     g_Wakeup_Msg = "join(non,is): Missed wakeup or machine is overloaded?";
1549     run_join_node_test<JP, tbb::flow::tuple<check_type<int>,int>, nonThrowing, isThrowing>();
1550     ASSERT(!check_type<int>::check_type_counter, "Dropped items in test");
1551     g_Wakeup_Msg = "join(is,is): Missed wakeup or machine is overloaded?";
1552     run_join_node_test<JP, tbb::flow::tuple<int,int>,  isThrowing, isThrowing>();
1553     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1554 }
1555
1556 // ------------------- limiter_node -------------
1557
1558 template<
1559     class BufferItemType,       //
1560     class SourceNodeType,
1561     class SourceNodeBodyType,
1562     class TestNodeType,
1563     class SinkNodeType,
1564     class SinkNodeBodyType >
1565 void run_one_limiter_node_test(bool throwException,bool flog) {
1566     tbb::flow::graph g;
1567
1568     tbb::atomic<int> source_count;
1569     tbb::atomic<int> sink_count;
1570     source_count = sink_count = 0;
1571 #if USE_TASK_SCHEDULER_OBSERVER
1572     eh_test_observer o;
1573     o.observe(true);
1574 #endif
1575     g_Master = Harness::CurrentTid();
1576     SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false);
1577     TestNodeType node_to_test(g,g_NumThreads + 1);
1578     SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1579     make_edge(source,node_to_test);
1580     make_edge(node_to_test, sink);
1581     for(int iter = 0; iter < 2; ++iter) {
1582         ResetGlobals(throwException,flog);
1583         if(throwException) {
1584             TRY();
1585                 source.activate();
1586                 g.wait_for_all();
1587             CATCH_AND_ASSERT();
1588         }
1589         else {
1590             TRY();
1591                 source.activate();
1592                 g.wait_for_all();
1593             CATCH_AND_FAIL();
1594         }
1595         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1596         int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value();
1597         int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
1598         if(throwException) {
1599             ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1600             ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1601             ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources");
1602             ASSERT(nb_cnt <= sb_cnt, "Too many items received by sink nodes");
1603         }
1604         else {
1605             ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1606             ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1607             // we stop after limiter's limit, which is g_NumThreads + 1.  The source_node
1608             // is invoked one extra time, filling its buffer, so its limit is g_NumThreads + 2.
1609             ASSERT(sb_cnt == g_NumThreads + 2, "Missing invocations of source_node");
1610             ASSERT(nb_cnt == g_NumThreads + 1, "Missing items in absorbers");
1611         }
1612         if(iter == 0) {
1613             remove_edge(node_to_test, sink);
1614             node_to_test.try_put(BufferItemType());
1615             node_to_test.try_put(BufferItemType());
1616             g.wait_for_all();
1617             g.reset();
1618             source_count = sink_count = 0;
1619             BufferItemType tmp;
1620             ASSERT(!node_to_test.try_get(tmp), "node not empty");
1621             make_edge(node_to_test, sink);
1622             g.wait_for_all();
1623         }
1624         else {
1625             g.reset();
1626             source_count = sink_count = 0;
1627         }
1628         ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed");
1629         ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed");
1630     }
1631
1632 #if USE_TASK_SCHEDULER_OBSERVER
1633     o.observe(false);
1634 #endif
1635 }
1636
1637 template<class BufferItemType,
1638          TestNodeTypeEnum SourceThrowType,
1639          TestNodeTypeEnum SinkThrowType>
1640 void run_limiter_node_test() {
1641     typedef test_source_body<BufferItemType,SourceThrowType> SourceBodyType;
1642     typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1643
1644     typedef tbb::flow::source_node<BufferItemType> SrcType;
1645     typedef tbb::flow::limiter_node<BufferItemType>  LmtType;
1646     typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
1647
1648     for(int i = 0; i < 4; ++i) {
1649         if(i == 2) continue;  // no need to test flog w/o throws
1650         bool throwException = (i & 0x1) != 0;
1651         bool doFlog = (i & 0x2) != 0;
1652         run_one_limiter_node_test<
1653             /* class BufferItemType*/     BufferItemType,
1654             /*class SourceNodeType*/      SrcType,
1655             /*class SourceNodeBodyType*/  SourceBodyType,
1656             /*class TestNodeType*/        LmtType,
1657             /*class SinkNodeType*/        SnkType,
1658             /*class SinkNodeBodyType*/    SinkBodyType
1659             >(throwException, doFlog);
1660     }
1661 }
1662
1663 void test_limiter_node() {
1664     REMARK("Testing limiter_node\n");
1665     g_Wakeup_Msg = "limiter_node(is,non): Missed wakeup or machine is overloaded?";
1666     run_limiter_node_test<int,isThrowing,nonThrowing>();
1667     g_Wakeup_Msg = "limiter_node(non,is): Missed wakeup or machine is overloaded?";
1668     run_limiter_node_test<int,nonThrowing,isThrowing>();
1669     g_Wakeup_Msg = "limiter_node(is,is): Missed wakeup or machine is overloaded?";
1670     run_limiter_node_test<int,isThrowing,isThrowing>();
1671     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1672 }
1673
1674 // -------- split_node --------------------
1675
1676 template<
1677     class InputTuple,
1678     class SourceType,
1679     class SourceBodyType,
1680     class TestSplitType,
1681     class SinkType0,
1682     class SinkBodyType0,
1683     class SinkType1,
1684     class SinkBodyType1>
1685 void run_one_split_node_test(bool throwException, bool flog) {
1686
1687     tbb::flow::graph g;
1688
1689     tbb::atomic<int> source_count;
1690     tbb::atomic<int> sink0_count;
1691     tbb::atomic<int> sink1_count;
1692     source_count = sink0_count = sink1_count = 0;
1693 #if USE_TASK_SCHEDULER_OBSERVER
1694     eh_test_observer o;
1695     o.observe(true);
1696 #endif
1697
1698     g_Master = Harness::CurrentTid();
1699     SourceType source(g, SourceBodyType(source_count),/*is_active*/false);
1700     TestSplitType node_to_test(g);
1701     SinkType0 sink0(g,tbb::flow::unlimited,SinkBodyType0(sink0_count));
1702     SinkType1 sink1(g,tbb::flow::unlimited,SinkBodyType1(sink1_count));
1703     make_edge(source, node_to_test);
1704     make_edge(tbb::flow::output_port<0>(node_to_test), sink0);
1705     make_edge(tbb::flow::output_port<1>(node_to_test), sink1);
1706
1707     for(int iter = 0; iter < 2; ++iter) {  // run, reset, run again
1708         ResetGlobals(throwException,flog);
1709         if(throwException) {
1710             TRY();
1711                 source.activate();
1712                 g.wait_for_all();
1713             CATCH_AND_ASSERT();
1714         }
1715         else {
1716             TRY();
1717                 source.activate();
1718                 g.wait_for_all();
1719             CATCH_AND_FAIL();
1720         }
1721         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1722         int sb_cnt = tbb::flow::copy_body<SourceBodyType>(source).count_value();
1723         int nb0_cnt = tbb::flow::copy_body<SinkBodyType0>(sink0).count_value();
1724         int nb1_cnt = tbb::flow::copy_body<SinkBodyType1>(sink1).count_value();
1725         if(throwException) {
1726             ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1727             ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1728             ASSERT(sb_cnt <= 2*g_NumItems, "Too many items sent by source");
1729             ASSERT(nb0_cnt + nb1_cnt <= sb_cnt*2, "Too many items received by sink nodes");
1730         }
1731         else {
1732             ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1733             ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1734             ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_nodes");
1735             ASSERT(nb0_cnt == g_NumItems && nb1_cnt == g_NumItems, "Missing items in absorbers");
1736         }
1737         g.reset();  // resets the body of the source_nodes and the absorb_nodes.
1738         source_count = sink0_count = sink1_count = 0;
1739         ASSERT(0 == tbb::flow::copy_body<SourceBodyType>(source).count_value(),"Reset source failed");
1740         ASSERT(0 == tbb::flow::copy_body<SinkBodyType0>(sink0).count_value(),"Reset sink 0 failed");
1741         ASSERT(0 == tbb::flow::copy_body<SinkBodyType1>(sink1).count_value(),"Reset sink 1 failed");
1742     }
1743 #if USE_TASK_SCHEDULER_OBSERVER
1744     o.observe(false);
1745 #endif
1746 }
1747
1748 template<class InputTuple,
1749              TestNodeTypeEnum SourceThrowType,
1750              TestNodeTypeEnum SinkThrowType>
1751 void run_split_node_test() {
1752     typedef typename tbb::flow::tuple_element<0,InputTuple>::type ItemType0;
1753     typedef typename tbb::flow::tuple_element<1,InputTuple>::type ItemType1;
1754     typedef tuple_test_source_body<InputTuple,SourceThrowType> SourceBodyType;
1755     typedef absorber_body<ItemType0,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType0;
1756     typedef absorber_body<ItemType1,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType1;
1757
1758     typedef typename tbb::flow::source_node<InputTuple> SourceType;
1759     typedef typename tbb::flow::split_node<InputTuple> TestSplitType;
1760     typedef typename tbb::flow::function_node<ItemType0,tbb::flow::continue_msg> SinkType0;
1761     typedef typename tbb::flow::function_node<ItemType1,tbb::flow::continue_msg> SinkType1;
1762
1763     for(int i = 0; i < 4; ++i) {
1764         if(2 == i) continue;
1765         bool throwException = (i & 0x1) != 0;
1766         bool doFlog = (i & 0x2) != 0;
1767         run_one_split_node_test<
1768             InputTuple,
1769             SourceType,
1770             SourceBodyType,
1771             TestSplitType,
1772             SinkType0,
1773             SinkBodyType0,
1774             SinkType1,
1775             SinkBodyType1>
1776                 (throwException,doFlog);
1777     }
1778 }
1779
1780 void test_split_node() {
1781     REMARK("Testing split_node\n");
1782     g_Wakeup_Msg = "split_node(is,non): Missed wakeup or machine is overloaded?";
1783     run_split_node_test<tbb::flow::tuple<int,int>, isThrowing, nonThrowing>();
1784     g_Wakeup_Msg = "split_node(non,is): Missed wakeup or machine is overloaded?";
1785     run_split_node_test<tbb::flow::tuple<int,int>, nonThrowing, isThrowing>();
1786     g_Wakeup_Msg = "split_node(is,is): Missed wakeup or machine is overloaded?";
1787     run_split_node_test<tbb::flow::tuple<int,int>, isThrowing,  isThrowing>();
1788     g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1789 }
1790
1791 // --------- indexer_node ----------------------
1792
1793 template < class InputTuple,
1794     class SourceType0,
1795     class SourceBodyType0,
1796     class SourceType1,
1797     class SourceBodyType1,
1798     class TestNodeType,
1799     class SinkType,
1800     class SinkBodyType>
1801 void run_one_indexer_node_test(bool throwException,bool flog) {
1802     typedef typename tbb::flow::tuple_element<0,InputTuple>::type ItemType0;
1803     typedef typename tbb::flow::tuple_element<1,InputTuple>::type ItemType1;
1804
1805     tbb::flow::graph g;
1806
1807     tbb::atomic<int> source0_count;
1808     tbb::atomic<int> source1_count;
1809     tbb::atomic<int> sink_count;
1810     source0_count = source1_count = sink_count = 0;
1811 #if USE_TASK_SCHEDULER_OBSERVER
1812     eh_test_observer o;
1813     o.observe(true);
1814 #endif
1815     g_Master = Harness::CurrentTid();
1816     SourceType0 source0(g, SourceBodyType0(source0_count),/*is_active*/false);
1817     SourceType1 source1(g, SourceBodyType1(source1_count),/*is_active*/false);
1818     TestNodeType node_to_test(g);
1819     SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1820     make_edge(source0,tbb::flow::input_port<0>(node_to_test));
1821     make_edge(source1,tbb::flow::input_port<1>(node_to_test));
1822     make_edge(node_to_test, sink);
1823     for(int iter = 0; iter < 2; ++iter) {
1824         ResetGlobals(throwException,flog);
1825         if(throwException) {
1826             TRY();
1827                 source0.activate();
1828                 source1.activate();
1829                 g.wait_for_all();
1830             CATCH_AND_ASSERT();
1831         }
1832         else {
1833             TRY();
1834                 source0.activate();
1835                 source1.activate();
1836                 g.wait_for_all();
1837             CATCH_AND_FAIL();
1838         }
1839         bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
1840         int sb0_cnt = tbb::flow::copy_body<SourceBodyType0>(source0).count_value();
1841         int sb1_cnt = tbb::flow::copy_body<SourceBodyType1>(source1).count_value();
1842         int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1843         if(throwException) {
1844             ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1845             ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1846             ASSERT(sb0_cnt <= g_NumItems && sb1_cnt <= g_NumItems, "Too many items sent by sources");
1847             ASSERT(nb_cnt <= sb0_cnt + sb1_cnt, "Too many items received by sink nodes");
1848         }
1849         else {
1850             ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1851             ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1852             ASSERT(sb0_cnt == g_NumItems, "Missing invocations of source_node0");
1853             ASSERT(sb1_cnt == g_NumItems, "Missing invocations of source_node1");
1854             ASSERT(nb_cnt == 2*g_NumItems, "Missing items in absorbers");
1855         }
1856         if(iter == 0) {
1857             remove_edge(node_to_test, sink);
1858             tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
1859             tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1860             g.wait_for_all();
1861             g.reset();
1862             source0_count = source1_count = sink_count = 0;
1863             make_edge(node_to_test, sink);
1864             g.wait_for_all();
1865         }
1866         else {
1867             g.wait_for_all();
1868             g.reset();
1869             source0_count = source1_count = sink_count = 0;
1870         }
1871         ASSERT(0 == tbb::flow::copy_body<SourceBodyType0>(source0).count_value(),"Reset source failed");
1872         ASSERT(0 == tbb::flow::copy_body<SourceBodyType1>(source1).count_value(),"Reset source failed");
1873         nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1874         ASSERT(0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value(),"Reset sink failed");
1875     }
1876
1877 #if USE_TASK_SCHEDULER_OBSERVER
1878     o.observe(false);
1879 #endif
1880 }
1881
1882 template<class InputTuple,
1883     TestNodeTypeEnum SourceThrowType,
1884     TestNodeTypeEnum SinkThrowType>
1885 void run_indexer_node_test() {
1886     typedef typename tbb::flow::tuple_element<0,InputTuple>::type ItemType0;
1887     typedef typename tbb::flow::tuple_element<1,InputTuple>::type ItemType1;
1888     typedef test_source_body<ItemType0,SourceThrowType> SourceBodyType0;
1889     typedef test_source_body<ItemType1,SourceThrowType> SourceBodyType1;
1890     typedef typename tbb::flow::indexer_node<ItemType0, ItemType1> TestNodeType;
1891     typedef absorber_body<typename TestNodeType::output_type,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1892
1893     typedef typename tbb::flow::source_node<ItemType0> SourceType0;
1894     typedef typename tbb::flow::source_node<ItemType1> SourceType1;
1895     typedef typename tbb::flow::function_node<typename TestNodeType::output_type,tbb::flow::continue_msg> SinkType;
1896
1897     for(int i = 0; i < 4; ++i) {
1898         if(2 == i) continue;
1899         bool throwException = (i & 0x1) != 0;
1900         bool doFlog = (i & 0x2) != 0;
1901         run_one_indexer_node_test<
1902              InputTuple,
1903              SourceType0,
1904              SourceBodyType0,
1905              SourceType1,
1906              SourceBodyType1,
1907              TestNodeType,
1908              SinkType,
1909              SinkBodyType>(throwException,doFlog);
1910     }
1911 }
1912
1913 void test_indexer_node() {
1914     REMARK("Testing indexer_node\n");
1915     g_Wakeup_Msg = "indexer_node(is,non): Missed wakeup or machine is overloaded?";
1916     run_indexer_node_test<tbb::flow::tuple<int,int>, isThrowing, nonThrowing>();
1917     g_Wakeup_Msg = "indexer_node(non,is): Missed wakeup or machine is overloaded?";
1918     run_indexer_node_test<tbb::flow::tuple<int,int>, nonThrowing, isThrowing>();
1919     g_Wakeup_Msg = "indexer_node(is,is): Missed wakeup or machine is overloaded?";
1920     run_indexer_node_test<tbb::flow::tuple<int,int>, isThrowing,  isThrowing>();
1921     g_Wakeup_Msg = g_Orig_Wakeup_Msg;;
1922 }
1923
1924 ///////////////////////////////////////////////
1925 // whole-graph exception test
1926
1927 class Foo {
1928 private:
1929     // std::vector<int>& m_vec;
1930     std::vector<int>* m_vec;
1931 public:
1932     Foo(std::vector<int>& vec) : m_vec(&vec) { }
1933     void operator() (tbb::flow::continue_msg) const {
1934         ++nExceptions;
1935         m_vec->at(m_vec->size()); // Will throw out_of_range exception
1936         ASSERT(false, "Exception not thrown by invalid access");
1937     }
1938 };
1939
1940 // test from user ahelwer: http://software.intel.com/en-us/forums/showthread.php?t=103786
1941 // exception thrown in graph node, not caught in wait_for_all()
1942 void
1943 test_flow_graph_exception0() {
1944     // Initializes body
1945     std::vector<int> vec;
1946     vec.push_back(0);
1947     Foo f(vec);
1948     nExceptions = 0;
1949
1950     // Construct graph and nodes
1951     tbb::flow::graph g;
1952     tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g);
1953     tbb::flow::continue_node<tbb::flow::continue_msg> fooNode(g, f);
1954
1955     // Construct edge
1956     tbb::flow::make_edge(start, fooNode);
1957
1958     // Execute graph
1959     ASSERT(!g.exception_thrown(), "exception_thrown flag already set");
1960     ASSERT(!g.is_cancelled(), "canceled flag already set");
1961     try {
1962         start.try_put(tbb::flow::continue_msg());
1963         g.wait_for_all();
1964         ASSERT(false, "Exception not thrown");
1965     }
1966     catch(std::out_of_range& ex) {
1967         REMARK("Exception: %s (expected)\n", ex.what());
1968     }
1969     catch(...) {
1970         REMARK("Unknown exception caught (expected)\n");
1971     }
1972     ASSERT(nExceptions > 0, "Exception caught, but no body signaled exception being thrown");
1973     nExceptions = 0;
1974     ASSERT(g.exception_thrown(), "Exception not intercepted");
1975     // if exception set, cancellation also set.
1976     ASSERT(g.is_cancelled(), "Exception cancellation not signaled");
1977     // in case we got an exception
1978     try {
1979         g.wait_for_all();  // context still signalled canceled, my_exception still set.
1980     }
1981     catch(...) {
1982         ASSERT(false, "Second exception thrown but no task executing");
1983     }
1984     ASSERT(nExceptions == 0, "body signaled exception being thrown, but no body executed");
1985     ASSERT(!g.exception_thrown(), "exception_thrown flag not reset");
1986     ASSERT(!g.is_cancelled(), "canceled flag not reset");
1987 }
1988
1989 void TestOneThreadNum(int nThread) {
1990     REMARK("Testing %d threads\n", nThread);
1991     g_NumItems = ((nThread > NUM_ITEMS) ? nThread *2 : NUM_ITEMS);
1992     g_NumThreads = nThread;
1993     tbb::task_scheduler_init init(nThread);
1994     // whole-graph exception catch and rethrow test
1995     test_flow_graph_exception0();
1996     for(int i = 0; i < 4; ++i) {
1997         g_ExceptionInMaster = (i & 1) != 0;
1998         g_SolitaryException = (i & 2) != 0;
1999         REMARK("g_ExceptionInMaster == %s, g_SolitaryException == %s\n",
2000                 g_ExceptionInMaster ? "T":"F",
2001                 g_SolitaryException ? "T":"F");
2002         test_source_node();
2003         test_function_node();
2004         test_continue_node();  // also test broadcast_node
2005         test_multifunction_node();
2006         // single- and multi-item buffering nodes
2007         test_buffer_queue_and_overwrite_node();
2008         test_sequencer_node();
2009         test_priority_queue_node();
2010
2011         // join_nodes
2012         test_join_node<tbb::flow::queueing>();
2013         test_join_node<tbb::flow::reserving>();
2014         test_join_node<tbb::flow::tag_matching>();
2015
2016         test_limiter_node();
2017         test_split_node();
2018         // graph for write_once_node will be complicated by the fact the node will
2019         // not do try_puts after it has been set.  To get parallelism of N we have
2020         // to attach N successor nodes to the write_once (or play some similar game).
2021         // test_write_once_node();
2022         test_indexer_node();
2023     }
2024 }
2025 #endif // TBB_USE_EXCEPTIONS
2026
2027 #if TBB_USE_EXCEPTIONS
2028 int TestMain() {
2029     // reversing the order of tests
2030     for(int nThread=MaxThread; nThread >= MinThread; --nThread) {
2031         TestOneThreadNum(nThread);
2032     }
2033
2034     return Harness::Done;
2035 }
2036 #else
2037 int TestMain() {
2038     return Harness::Skipped;
2039 }
2040 #endif // TBB_USE_EXCEPTIONS