2 Copyright (c) 2005-2019 Intel Corporation
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
17 #define HARNESS_DEFAULT_MIN_THREADS 2
18 #define HARNESS_DEFAULT_MAX_THREADS 4
19 #include "harness_defs.h"
22 #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
25 #if __TBB_MSVC_UNREACHABLE_CODE_IGNORED
26 // Suppress "unreachable code" warning by VC++ 17.0-18.0 (VS 2012 or newer)
27 #pragma warning (disable: 4702)
32 // global task_scheduler_observer is an imperfect tool to find how many threads are really
33 // participating. That was the hope, but it counts the entries into the marketplace,
35 // #define USE_TASK_SCHEDULER_OBSERVER 1
37 #if _MSC_VER && defined(__INTEL_COMPILER) && !TBB_USE_DEBUG
38 #define TBB_RUN_BUFFERING_TEST __INTEL_COMPILER > 1210
40 #define TBB_RUN_BUFFERING_TEST 1
43 #if TBB_USE_EXCEPTIONS
44 #if USE_TASK_SCHEDULER_OBSERVER
45 #include "tbb/task_scheduler_observer.h"
47 #include "tbb/flow_graph.h"
48 #include "tbb/task_scheduler_init.h"
51 #include "harness_assert.h"
52 #include "harness_checktype.h"
// Defined ahead of the harness_eh.h include that follows; unconditionally returns INT_MAX.
54 inline intptr_t Existed() { return INT_MAX; } // resolve Existed in harness_eh.h
56 #include "harness_eh.h"
// File-scope atomic counters (one unsigned, one intptr_t).
62 tbb::atomic<unsigned> nExceptions;
63 tbb::atomic<intptr_t> g_TGCCancelled;
// Compile-time switch for node bodies: the isThrowing WaitThrow specializations call
// ThrowTestException(1), the nonThrowing ones contain no such call.
65 enum TestNodeTypeEnum { nonThrowing, isThrowing };
// Concurrency selectors used as the Conc template argument of the bodies and tests.
// run_one_functype_node_test also passes Conc as the concurrency argument when it
// constructs the node under test.
67 static const size_t unlimited_type = 0;
68 static const size_t serial_type = 1;
69 static const size_t limited_type = 4;
// Maps a TestNodeTypeEnum value to its printable name ("nonThrowing" / "isThrowing").
// Only the two specializations are defined; the primary template is declared but not defined.
71 template<TestNodeTypeEnum T> struct TestNodeTypeName;
72 template<> struct TestNodeTypeName<nonThrowing> { static const char *name() { return "nonThrowing"; } };
73 template<> struct TestNodeTypeName<isThrowing> { static const char *name() { return "isThrowing"; } };
// Maps a concurrency selector (serial_type, unlimited_type, limited_type) to a printable
// name ("serial" / "unlimited" / "limited"). The primary template is declared but not defined.
75 template<size_t Conc> struct concurrencyName;
76 template<> struct concurrencyName<serial_type>{ static const char *name() { return "serial"; } };
77 template<> struct concurrencyName<unlimited_type>{ static const char *name() { return "unlimited"; } };
78 template<> struct concurrencyName<limited_type>{ static const char *name() { return "limited"; } };
80 // Class that provides waiting and throwing behavior. If we are not throwing, do nothing.
81 // If serial, we can't wait for concurrency to peak; we may be the bottleneck and will
82 // stop further processing. We will execute g_NumThreads + 10 times (the "10" is somewhat
83 // arbitrary, and just makes sure there are enough items in the graph to keep it flowing).
84 // If parallel, or serial and throwing, use Harness::ConcurrencyTracker to wait.
// WaitThrow<Conc,t>: helper the node bodies in this file derive from. It is selected by
// concurrency kind (Conc) and throwing behavior (t, defaulting to nonThrowing).
86 template<size_t Conc, TestNodeTypeEnum t = nonThrowing>
// Serial, non-throwing variant: once cnt exceeds g_NumThreads + 10 it creates a
// Harness::ConcurrencyTracker and calls WaitUntilConcurrencyPeaks(). No throw call is present.
90 class WaitThrow<serial_type,nonThrowing> {
// cnt  - counter value passed in by the calling body
// name - unused (parameter name is commented out)
92 void WaitAndThrow(int cnt, const char * /*name*/) {
93 if(cnt > g_NumThreads + 10) {
94 Harness::ConcurrencyTracker ct;
95 WaitUntilConcurrencyPeaks();
// Serial, throwing variant: gated on cnt > g_NumThreads + 10. Inside the gate it creates a
// Harness::ConcurrencyTracker, calls WaitUntilConcurrencyPeaks(), then ThrowTestException(1).
101 class WaitThrow<serial_type,isThrowing> {
// cnt  - counter value passed in by the calling body
// name - unused (parameter name is commented out)
103 void WaitAndThrow(int cnt, const char * /*name*/) {
104 if(cnt > g_NumThreads + 10) {
105 Harness::ConcurrencyTracker ct;
106 WaitUntilConcurrencyPeaks();
107 ThrowTestException(1);
112 // For nodes with limited concurrency, if that concurrency is < g_NumThreads, we need
113 // to make sure enough other nodes wait for concurrency to peak. If we are attached to
114 // N successors, for each item we pass to a successor, we will get N executions of the
115 // "absorbers" (because we broadcast to successors). For an odd number of threads we
116 // need (g_NumThreads - limited + 1) / 2 items (that will give us one extra execution
117 // of an "absorber", but we can't change that without changing the behavior of the node).
// Limited-concurrency, non-throwing variant.
119 class WaitThrow<limited_type,nonThrowing> {
// cnt  - counter value passed in by the calling body
// name - unused (parameter name is commented out)
121 void WaitAndThrow(int cnt, const char * /*name*/) {
// Threshold is (g_NumThreads - limited_type + 1) / 2, evaluated in int arithmetic.
// NOTE(review): the statements executed when this test is true are not visible in this excerpt.
122 if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
// Create a Harness::ConcurrencyTracker and call WaitUntilConcurrencyPeaks().
125 Harness::ConcurrencyTracker ct;
126 WaitUntilConcurrencyPeaks();
// Limited-concurrency, throwing variant.
131 class WaitThrow<limited_type,isThrowing> {
// cnt  - counter value passed in by the calling body
// name - unused (parameter name is commented out)
133 void WaitAndThrow(int cnt, const char * /*name*/) {
// Unlike the non-throwing limited variant, the tracker is created before the threshold test.
134 Harness::ConcurrencyTracker ct;
// Threshold is (g_NumThreads - limited_type + 1) / 2, evaluated in int arithmetic.
// NOTE(review): the statements executed when this test is true are not visible in this excerpt.
135 if(cnt <= (g_NumThreads - (int)limited_type + 1)/2) {
138 WaitUntilConcurrencyPeaks();
139 ThrowTestException(1);
// Unlimited-concurrency, non-throwing variant: every call creates a
// Harness::ConcurrencyTracker and calls WaitUntilConcurrencyPeaks(); both arguments are unused.
144 class WaitThrow<unlimited_type,nonThrowing> {
146 void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
147 Harness::ConcurrencyTracker ct;
148 WaitUntilConcurrencyPeaks();
// Unlimited-concurrency, throwing variant: every call creates a Harness::ConcurrencyTracker,
// calls WaitUntilConcurrencyPeaks(), then ThrowTestException(1); both arguments are unused.
153 class WaitThrow<unlimited_type,isThrowing> {
155 void WaitAndThrow(int /*cnt*/, const char * /*name*/) {
156 Harness::ConcurrencyTracker ct;
157 WaitUntilConcurrencyPeaks();
158 ThrowTestException(1);
// Forwards both flags to ResetEhGlobals(). Both parameters have defaults
// (throwException = true, flog = false).
// NOTE(review): any other statements of this function are not visible in this excerpt.
163 ResetGlobals(bool throwException = true, bool flog = false) {
166 ResetEhGlobals(throwException, flog);
169 // -------source_node body ------------------
// Body for a source_node emitting OutputType items. It derives from the serial WaitThrow
// variant and counts through a pointer to an atomic<int> owned by the caller, so the count
// is shared with (and can be zeroed by) the test function.
170 template <class OutputType, TestNodeTypeEnum TType>
171 class test_source_body : WaitThrow<serial_type, TType> {
172 using WaitThrow<serial_type, TType>::WaitAndThrow;
173 tbb::atomic<int> *my_current_val;
// my_cnt     - caller-owned counter; only its address is stored
// multiplier - factor applied to the counter when producing an item (default 1)
// NOTE(review): "%lx" is paired with a size_t argument here and in operator(); that is a
// format/argument mismatch on platforms where size_t is not unsigned long.
176 test_source_body(tbb::atomic<int> &my_cnt, int multiplier = 1) : my_current_val(&my_cnt), my_mult(multiplier) {
177 REMARK("- --------- - - - constructed %lx\n", (size_t)(my_current_val));
// Pre-increments the shared counter and stores my_mult * counter in out.
180 bool operator()(OutputType & out) {
182 out = OutputType(my_mult * ++(*my_current_val));
183 REMARK("xx(%lx) out == %d\n", (size_t)(my_current_val), (int)out);
// Past the last item: clamp the counter back to g_NumItems.
184 if(*my_current_val > g_NumItems) {
185 REMARK(" ------ End of the line!\n");
186 *my_current_val = g_NumItems;
// Wait and, for isThrowing, throw; the emitted value is passed as the count.
189 WaitAndThrow((int)out,"test_source_body");
// Current value of the shared counter.
193 int count_value() { return (int)*my_current_val; }
// Specialization of test_source_body for continue_msg output: there is no multiplier, and the
// counter value is kept in a local int because continue_msg carries no value.
196 template <TestNodeTypeEnum TType>
197 class test_source_body<tbb::flow::continue_msg, TType> : WaitThrow<serial_type, TType> {
198 using WaitThrow<serial_type, TType>::WaitAndThrow;
199 tbb::atomic<int> *my_current_val;
// my_cnt - caller-owned counter; only its address is stored
201 test_source_body(tbb::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
// Pre-increments the shared counter and emits a default-constructed continue_msg.
203 bool operator()(tbb::flow::continue_msg & out) {
205 int outint = ++(*my_current_val);
206 out = tbb::flow::continue_msg();
// Past the last item: clamp the counter back to g_NumItems.
207 if(*my_current_val > g_NumItems) {
208 *my_current_val = g_NumItems;
211 WaitAndThrow(outint,"test_source_body");
// Current value of the shared counter.
215 int count_value() { return (int)*my_current_val; }
218 // -------{function/continue}_node body ------------------
// Body that ignores its input, bumps a caller-owned atomic counter and then delegates to
// WaitThrow<Conc,T>::WaitAndThrow with the new count.
219 template<class InputType, class OutputType, TestNodeTypeEnum T, size_t Conc>
220 class absorber_body : WaitThrow<Conc,T> {
221 using WaitThrow<Conc,T>::WaitAndThrow;
222 tbb::atomic<int> *my_count;
// my_cnt - caller-owned counter; only its address is stored
224 absorber_body(tbb::atomic<int> &my_cnt) : my_count(&my_cnt) { }
// The input argument is unused (its name is commented out).
// NOTE(review): the return statement is not visible in this excerpt.
225 OutputType operator()(const InputType &/*p_in*/) {
227 int out = ++(*my_count);
228 WaitAndThrow(out,"absorber_body");
// Current value of the shared counter.
231 int count_value() { return *my_count; }
234 // -------multifunction_node body ------------------
// IssueOutput<N,PortsType>: compile-time recursion that puts one default-constructed item to
// each of ports N-1 .. 0 of a ports tuple. The general case handles port N-1 and recurses with
// N-1; the N == 1 specialization handles port 0 and ends the recursion.
237 template<int N,class PortsType>
// Item type accepted by port N-1.
239 typedef typename tbb::flow::tuple_element<N-1,PortsType>::type::output_type my_type;
// NOTE(review): try_put() is evaluated inside ASSERT; this relies on ASSERT always evaluating
// its argument - confirm against the harness definition.
241 static void issue_tuple_element( PortsType &my_ports) {
242 ASSERT(tbb::flow::get<N-1>(my_ports).try_put(my_type()), "Error putting to successor");
243 IssueOutput<N-1,PortsType>::issue_tuple_element(my_ports);
// Terminal case: only port 0 remains.
247 template<class PortsType>
248 struct IssueOutput<1,PortsType> {
249 typedef typename tbb::flow::tuple_element<0,PortsType>::type::output_type my_type;
251 static void issue_tuple_element( PortsType &my_ports) {
252 ASSERT(tbb::flow::get<0>(my_ports).try_put(my_type()), "Error putting to successor");
// Body for a multifunction_node: counts invocations in a caller-owned atomic, delegates to
// WaitThrow<Conc,T>::WaitAndThrow, then puts one item to every output port.
256 template<class InputType, class OutputTupleType, TestNodeTypeEnum T, size_t Conc>
257 class multifunction_node_body : WaitThrow<Conc,T> {
258 using WaitThrow<Conc,T>::WaitAndThrow;
// Number of output ports, taken from the size of the output tuple.
259 static const int N = tbb::flow::tuple_size<OutputTupleType>::value;
260 typedef typename tbb::flow::multifunction_node<InputType,OutputTupleType> NodeType;
261 typedef typename NodeType::output_ports_type PortsType;
262 tbb::atomic<int> *my_count;
// my_cnt - caller-owned counter; only its address is stored
264 multifunction_node_body(tbb::atomic<int> &my_cnt) : my_count(&my_cnt) { }
// The input item is unused; WaitAndThrow runs before any output is issued, so a throwing
// body emits nothing for that invocation.
265 void operator()(const InputType& /*in*/, PortsType &my_ports) {
267 int out = ++(*my_count);
268 WaitAndThrow(out,"multifunction_node_body");
269 // issue an item to each output port.
270 IssueOutput<N,PortsType>::issue_tuple_element(my_ports);
// Current value of the shared counter.
273 int count_value() { return *my_count; }
276 // --------- body to sort items in sequencer_node
// Maps an item to a zero-based sequence index: asserts the item is non-zero and returns
// size_t(item) - 1.
277 template<class BufferItemType>
278 struct sequencer_body {
279 size_t operator()(const BufferItemType &s) {
280 ASSERT(s, "sequencer item out of range (== 0)");
281 return size_t(s) - 1;
285 // --------- body to compare the "priorities" of objects for priority_queue_node five priority levels 0-4.
// Orders two items by their int value modulo 5 (strict less-than).
// NOTE(review): the enclosing template/struct header is not visible in this excerpt.
288 bool operator()(const T &t1, const T &t2) {
289 return (int(t1) % 5) < (int(t2) % 5);
293 // --------- type for < comparison in priority_queue_node.
// Orders two items by their int value modulo 3 (strict less-than).
// NOTE(review): the struct header that follows the template line is not visible in this excerpt.
294 template<class ItemType>
296 bool operator()(const ItemType &lhs, const ItemType &rhs) {
297 return (int(lhs) % 3) < (int(rhs) % 3);
301 // --------- tag methods for tag_matching join_node
// Functor that derives a tag_value from an item by dividing it by a stored multiplier.
302 template<typename TT>
// multiplier - divisor applied to every item in operator()
306 tag_func(TT multiplier) : my_mult(multiplier) { }
// Copies the multiplier from other; returns void, so assignments cannot be chained.
307 void operator=( const tag_func& other){my_mult = other.my_mult;}
308 // operator() will return [0 .. Count)
309 tbb::flow::tag_value operator()( TT v) {
310 tbb::flow::tag_value t = tbb::flow::tag_value(v / my_mult);
315 // --------- Source body for split_node test.
// Source body emitting a two-element tuple whose elements are both built from the same
// counter value. It derives from the serial WaitThrow variant and counts through a pointer to
// a caller-owned atomic<int>.
316 template <class OutputTuple, TestNodeTypeEnum TType>
317 class tuple_test_source_body : WaitThrow<serial_type, TType> {
318 typedef typename tbb::flow::tuple_element<0,OutputTuple>::type ItemType0;
319 typedef typename tbb::flow::tuple_element<1,OutputTuple>::type ItemType1;
320 using WaitThrow<serial_type, TType>::WaitAndThrow;
321 tbb::atomic<int> *my_current_val;
// my_cnt - caller-owned counter; only its address is stored
323 tuple_test_source_body(tbb::atomic<int> &my_cnt) : my_current_val(&my_cnt) { }
// Pre-increments the shared counter and builds the output tuple from the new value.
325 bool operator()(OutputTuple & out) {
327 int ival = ++(*my_current_val);
328 out = OutputTuple(ItemType0(ival),ItemType1(ival));
329 if(*my_current_val > g_NumItems) {
330 *my_current_val = g_NumItems; // jam the final value; we assert on it later.
333 WaitAndThrow(ival,"tuple_test_source_body");
// Current value of the shared counter.
337 int count_value() { return (int)*my_current_val; }
340 // ------- end of node bodies
342 // source_node is only-serial. source_node can throw, or the function_node can throw.
343 // graph being tested is
345 // source_node+---+parallel function_node
347 // After each run the graph is reset(), to test the reset functionality.
// Template parameters:
//   ItemType        - item type passed from the source_node to the function_node
//   srcThrowType    - whether the source body throws
//   absorbThrowType - whether the function_node (absorber) body throws
// Parameters:
//   throwException, flog - forwarded to ResetGlobals() before each of the two runs
351 template<class ItemType, TestNodeTypeEnum srcThrowType, TestNodeTypeEnum absorbThrowType>
352 void run_one_source_node_test(bool throwException, bool flog) {
353 typedef test_source_body<ItemType,srcThrowType> src_body_type;
354 typedef absorber_body<ItemType, tbb::flow::continue_msg, absorbThrowType, unlimited_type> parallel_absorb_body_type;
// Counters live here and are handed to the bodies by reference, so copies of the bodies
// made by the nodes all count into the same atomics.
355 tbb::atomic<int> source_body_count;
356 tbb::atomic<int> absorber_body_count;
357 source_body_count = 0;
358 absorber_body_count = 0;
362 g_Master = Harness::CurrentTid();
364 #if USE_TASK_SCHEDULER_OBSERVER
// The source is constructed inactive (is_active == false).
369 tbb::flow::source_node<ItemType> sn(g, src_body_type(source_body_count),/*is_active*/false);
370 parallel_absorb_body_type ab2(absorber_body_count);
371 tbb::flow::function_node<ItemType> parallel_fn(g,tbb::flow::unlimited,ab2);
372 make_edge(sn, parallel_fn);
// Run twice so the second pass exercises the graph after reset().
373 for(int runcnt = 0; runcnt < 2; ++runcnt) {
374 ResetGlobals(throwException,flog);
// NOTE(review): the statements that activate the source and wait on the graph are not
// visible in this excerpt.
// True when no exception is expected to have reached the graph: throwing is disabled, or
// the thread kind selected by g_ExceptionInMaster never executed a throw.
388 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
389 int src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
390 int sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
// NOTE(review): the condition choosing between the next two assertion groups is not visible
// here; judging by the messages, the first group covers runs where a throw may have occurred
// (upper bounds only) and the second covers runs with no throw (exact counts).
392 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception flag in flow::graph not set");
393 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "canceled flag not set");
394 ASSERT(src_cnt <= g_NumItems, "Too many source_node items emitted");
395 ASSERT(sink_cnt <= src_cnt, "Too many source_node items received");
398 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
399 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
400 ASSERT(src_cnt == g_NumItems, "Incorrect # source_node items emitted");
401 ASSERT(sink_cnt == src_cnt, "Incorrect # source_node items received");
403 g.reset(); // resets the body of the source_node and the absorb_nodes.
// The bodies only hold pointers to these counters, so the counts are zeroed explicitly here.
404 source_body_count = 0;
405 absorber_body_count = 0;
406 ASSERT(!g.exception_thrown(), "Reset didn't clear exception_thrown()");
407 ASSERT(!g.is_cancelled(), "Reset didn't clear is_cancelled()");
408 src_cnt = tbb::flow::copy_body<src_body_type>(sn).count_value();
409 sink_cnt = tbb::flow::copy_body<parallel_absorb_body_type>(parallel_fn).count_value();
410 ASSERT(src_cnt == 0, "source_node count not reset");
411 ASSERT(sink_cnt == 0, "sink_node count not reset");
413 #if USE_TASK_SCHEDULER_OBSERVER
416 } // run_one_source_node_test
// Runs run_one_source_node_test three times with (throwException, flog) =
// (false,false), (true,false) and (true,true); the (false,true) combination is not run.
419 template<class ItemType, TestNodeTypeEnum srcThrowType, TestNodeTypeEnum absorbThrowType>
420 void run_source_node_test() {
421 run_one_source_node_test<ItemType,srcThrowType,absorbThrowType>(false,false);
422 run_one_source_node_test<ItemType,srcThrowType,absorbThrowType>(true,false);
423 run_one_source_node_test<ItemType,srcThrowType,absorbThrowType>(true,true);
424 } // run_source_node_test
// Entry point for the source_node tests. Sets g_Wakeup_Msg to a numbered message before
// each configuration and restores g_Orig_Wakeup_Msg at the end. The check_type<int> runs are
// followed by an assertion that check_type_counter is zero.
426 void test_source_node() {
427 REMARK("Testing source_node\n");
428 check_type<int>::check_type_counter = 0;
429 g_Wakeup_Msg = "source_node(1): Missed wakeup or machine is overloaded?";
// (1) source throws, absorber does not; check_type<int> items.
430 run_source_node_test<check_type<int>, isThrowing, nonThrowing>();
431 ASSERT(!check_type<int>::check_type_counter, "Some items leaked in test");
432 g_Wakeup_Msg = "source_node(2): Missed wakeup or machine is overloaded?";
// (2) source throws, absorber does not; int items.
433 run_source_node_test<int, isThrowing, nonThrowing>();
434 g_Wakeup_Msg = "source_node(3): Missed wakeup or machine is overloaded?";
// (3) absorber throws, source does not.
435 run_source_node_test<int, nonThrowing, isThrowing>();
436 g_Wakeup_Msg = "source_node(4): Missed wakeup or machine is overloaded?";
// (4) both throw; int items.
437 run_source_node_test<int, isThrowing, isThrowing>();
438 g_Wakeup_Msg = "source_node(5): Missed wakeup or machine is overloaded?";
// (5) both throw; check_type<int> items.
439 run_source_node_test<check_type<int>, isThrowing, isThrowing>();
440 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
441 ASSERT(!check_type<int>::check_type_counter, "Some items leaked in test");
444 // -------- utilities & types to test function_node and multifunction_node.
446 // need to tell the template which node type I am using so it attaches successors correctly.
// Used as the NFT template argument that selects an AttachPoint specialization.
447 enum NodeFetchType { func_node_type, multifunc_node_type };
// AttachPoint<NodeType,ItemType,indx,NFT>::GetSender(n) returns the sender<ItemType> that
// successors of the node under test are attached to.
449 template<class NodeType, class ItemType, int indx, NodeFetchType NFT>
// multifunction_node case: the sender is output port number indx of the node.
452 template<class NodeType, class ItemType, int indx>
453 struct AttachPoint<NodeType,ItemType,indx,multifunc_node_type> {
454 static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
455 return tbb::flow::output_port<indx>(n);
// function_node case.
// NOTE(review): the return statement is not visible in this excerpt; presumably the node
// itself is returned and indx is ignored - confirm.
459 template<class NodeType, class ItemType, int indx>
460 struct AttachPoint<NodeType,ItemType,indx,func_node_type> {
461 static tbb::flow::sender<ItemType> &GetSender(NodeType &n) {
467 // common template for running function_node, multifunction_node. continue_node
468 // has different firing requirements, so it needs a different graph topology.
// Builds two inactive sources feeding one node under test, whose outputs feed two
// unlimited-concurrency sinks; runs the graph twice with a reset in between and checks the
// body counters and the graph's exception/cancellation flags.
// NOTE(review): some template parameters used below (NFT, TestNodeType, Conc) are declared on
// lines that are not visible in this excerpt.
470 class SourceNodeType,
471 class SourceNodeBodyType0,
472 class SourceNodeBodyType1,
475 class TestNodeBodyType,
476 class TypeToSink0, // what kind of item are we sending to sink0
477 class TypeToSink1, // what kind of item are we sending to sink1
478 class SinkNodeType0, // will be same for function;
479 class SinkNodeType1, // may differ for multifunction_node
480 class SinkNodeBodyType0,
481 class SinkNodeBodyType1,
// throwException, flog - forwarded to ResetGlobals() before each run; the third
// (name) parameter is unused.
485 run_one_functype_node_test(bool throwException, bool flog, const char * /*name*/) {
// Remember the caller's wakeup message so it can be restored on exit.
// NOTE(review): saved_msg is only read (sprintf argument, assignment back to g_Wakeup_Msg),
// so the const_cast looks unnecessary if g_Wakeup_Msg is a const char* - confirm.
488 char *saved_msg = const_cast<char *>(g_Wakeup_Msg);
// Counters are owned here; the bodies keep pointers to them.
491 tbb::atomic<int> source0_count;
492 tbb::atomic<int> source1_count;
493 tbb::atomic<int> sink0_count;
494 tbb::atomic<int> sink1_count;
495 tbb::atomic<int> test_count;
496 source0_count = source1_count = sink0_count = sink1_count = test_count = 0;
498 #if USE_TASK_SCHEDULER_OBSERVER
503 g_Master = Harness::CurrentTid();
504 SourceNodeType source0(g, SourceNodeBodyType0(source0_count),/*is_active*/false);
505 SourceNodeType source1(g, SourceNodeBodyType1(source1_count),/*is_active*/false);
506 TestNodeType node_to_test(g, Conc, TestNodeBodyType(test_count));
507 SinkNodeType0 sink0(g,tbb::flow::unlimited,SinkNodeBodyType0(sink0_count));
508 SinkNodeType1 sink1(g,tbb::flow::unlimited,SinkNodeBodyType1(sink1_count));
// Topology: both sources -> node_to_test; sender 0 -> sink0, sender 1 -> sink1.
509 make_edge(source0, node_to_test);
510 make_edge(source1, node_to_test);
511 make_edge(AttachPoint<TestNodeType, TypeToSink0, 0, NFT>::GetSender(node_to_test), sink0);
512 make_edge(AttachPoint<TestNodeType, TypeToSink1, 1, NFT>::GetSender(node_to_test), sink1);
514 for(int iter = 0; iter < 2; ++iter) { // run, reset, run again
// Extend the wakeup message with the current iteration and settings.
// NOTE(review): sprintf is unbounded and the declaration of mymsg is not visible here;
// confirm the buffer fits the longest g_Wakeup_Msg plus this suffix, or use a bounded call.
515 sprintf(mymsg, "%s iter=%d, threads=%d, throw=%s, flog=%s", saved_msg, iter, g_NumThreads,
516 throwException?"T":"F", flog?"T":"F");
517 g_Wakeup_Msg = mymsg;
518 ResetGlobals(throwException,flog);
// NOTE(review): the statements that activate the sources and wait on the graph are not
// visible in this excerpt.
// True when no exception is expected to have reached the graph: throwing is disabled, or
// the thread kind selected by g_ExceptionInMaster never executed a throw.
533 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
534 int sb0_cnt = tbb::flow::copy_body<SourceNodeBodyType0>(source0).count_value();
535 int sb1_cnt = tbb::flow::copy_body<SourceNodeBodyType1>(source1).count_value();
536 int t_cnt = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
537 int nb0_cnt = tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value();
538 int nb1_cnt = tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value();
// NOTE(review): the condition choosing between the next two assertion groups is not visible
// here; judging by the messages, the first group covers runs where a throw may have occurred
// (upper bounds only) and the second covers runs with no throw (exact counts).
540 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
541 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
542 ASSERT(sb0_cnt + sb1_cnt <= 2*g_NumItems, "Too many items sent by sources");
543 ASSERT(sb0_cnt + sb1_cnt >= t_cnt, "Too many items received by test node");
544 ASSERT(nb0_cnt + nb1_cnt <= t_cnt*2, "Too many items received by sink nodes");
547 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
548 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
549 ASSERT(sb0_cnt + sb1_cnt == 2*g_NumItems, "Missing invocations of source_nodes");
550 ASSERT(t_cnt == 2*g_NumItems, "Not all items reached test node");
551 ASSERT(nb0_cnt == 2*g_NumItems && nb1_cnt == 2*g_NumItems, "Missing items in absorbers");
553 g.reset(); // resets the body of the source_nodes, test_node and the absorb_nodes.
// The bodies only hold pointers to these counters, so the counts are zeroed explicitly here.
554 source0_count = source1_count = sink0_count = sink1_count = test_count = 0;
555 ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType0>(source0).count_value(),"Reset source 0 failed");
556 ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType1>(source1).count_value(),"Reset source 1 failed");
557 ASSERT(0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value(),"Reset test_node failed");
558 ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType0>(sink0).count_value(),"Reset sink 0 failed");
559 ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType1>(sink1).count_value(),"Reset sink 1 failed");
// Restore the caller's wakeup message (mymsg must not be referenced after this function).
561 g_Wakeup_Msg = saved_msg;
563 #if USE_TASK_SCHEDULER_OBSERVER
568 // Test function_node
570 // graph being tested is
572 // source_node -\ /- parallel function_node
576 // source_node -/ \- parallel function_node
578 // After each run the graph is reset(), to test the reset functionality.
// Instantiates run_one_functype_node_test for a function_node under test and runs it with
// (throw, flog) = (false,false), (true,false) and (true,true).
581 TestNodeTypeEnum SType1, // does source node 1 throw?
582 TestNodeTypeEnum SType2, // does source node 2 throw?
583 class Item12, // type of item passed between sources and test node
584 TestNodeTypeEnum FType, // does function node throw?
585 class Item23, // type passed from function_node to sink nodes
586 TestNodeTypeEnum NType1, // does sink node 1 throw?
587 TestNodeTypeEnum NType2, // does sink node 2 throw?
588 class NodePolicy, // rejecting,queueing
589 size_t Conc // is node concurrent? {serial | limited | unlimited}
591 void run_function_node_test() {
// Body types: two sources, the function_node body under test, and two unlimited sinks.
593 typedef test_source_body<Item12,SType1> SBodyType1;
594 typedef test_source_body<Item12,SType2> SBodyType2;
595 typedef absorber_body<Item12, Item23, FType, Conc> TestBodyType;
596 typedef absorber_body<Item23,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
597 typedef absorber_body<Item23,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
// Node types; both sinks share one function_node type because both receive Item23.
599 typedef tbb::flow::source_node<Item12> SrcType;
600 typedef tbb::flow::function_node<Item12, Item23, NodePolicy> TestType;
601 typedef tbb::flow::function_node<Item23,tbb::flow::continue_msg> SnkType;
// Bit 0 of i selects throwing, bit 1 selects flogging; i == 2 (flog without throw) is skipped.
603 for(int i = 0; i < 4; ++i ) {
604 if(i != 2) { // doesn't make sense to flog a non-throwing test
605 bool doThrow = (i & 0x1) != 0;
606 bool doFlog = (i & 0x2) != 0;
607 run_one_functype_node_test<
608 /*SourceNodeType*/ SrcType,
609 /*SourceNodeBodyType0*/ SBodyType1,
610 /*SourceNodeBodyType1*/ SBodyType2,
611 /* NFT */ func_node_type,
612 /*TestNodeType*/ TestType,
613 /*TestNodeBodyType*/ TestBodyType,
614 /*TypeToSink0 */ Item23,
615 /*TypeToSink1 */ Item23,
616 /*SinkNodeType0*/ SnkType,
617 /*SinkNodeType1*/ SnkType,
618 /*SinkNodeBodyType0*/ SinkBodyType1,
619 /*SinkNodeBodyType1*/ SinkBodyType2,
621 (doThrow,doFlog,"function_node");
624 } // run_function_node_test
// Entry point for the function_node tests. Template arguments of each call are, in order:
// source 1 throws, source 2 throws, input item type, function_node throws, output item type,
// sink 1 throws, sink 2 throws, buffering policy, concurrency. g_Wakeup_Msg is set per group
// and restored to g_Orig_Wakeup_Msg at the end.
626 void test_function_node() {
627 REMARK("Testing function_node\n");
// serial rejecting: source 1 throws (1a), function_node throws (1b), sink 1 throws (1c)
629 g_Wakeup_Msg = "function_node(1a): Missed wakeup or machine is overloaded?";
630 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
631 g_Wakeup_Msg = "function_node(1b): Missed wakeup or machine is overloaded?";
632 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
633 g_Wakeup_Msg = "function_node(1c): Missed wakeup or machine is overloaded?";
634 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
// serial queueing; the last run uses check_type<int> items and then checks for leaked items
637 g_Wakeup_Msg = "function_node(2): Missed wakeup or machine is overloaded?";
638 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
639 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
640 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
641 check_type<int>::check_type_counter = 0;
642 run_function_node_test<nonThrowing, nonThrowing, check_type<int>, nonThrowing, check_type<int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
643 ASSERT(!check_type<int>::check_type_counter, "Some items leaked in test");
645 // unlimited parallel rejecting
646 g_Wakeup_Msg = "function_node(3): Missed wakeup or machine is overloaded?";
647 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
648 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
649 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
651 // limited parallel rejecting
652 g_Wakeup_Msg = "function_node(4): Missed wakeup or machine is overloaded?";
653 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
654 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
655 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
657 // limited parallel queueing
658 g_Wakeup_Msg = "function_node(5): Missed wakeup or machine is overloaded?";
659 run_function_node_test<isThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
660 run_function_node_test<nonThrowing, nonThrowing, int, isThrowing, int, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
661 run_function_node_test<nonThrowing, nonThrowing, int, nonThrowing, int, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
// unlimited rejecting with both sources, the function_node and both sinks all throwing
664 g_Wakeup_Msg = "function_node(6): Missed wakeup or machine is overloaded?";
665 run_function_node_test<isThrowing, isThrowing, int, isThrowing, int, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
666 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
669 // ----------------------------------- multifunction_node ----------------------------------
670 // Test multifunction_node.
672 // graph being tested is
674 // source_node -\ /- parallel function_node
676 // +multifunction_node+
678 // source_node -/ \- parallel function_node
680 // After each run the graph is reset(), to test the reset functionality. The
681 // multifunction_node will put an item to each successor for every item
// Instantiates run_one_functype_node_test for a multifunction_node under test and runs it
// with (throw, flog) = (false,false), (true,false) and (true,true).
685 TestNodeTypeEnum SType0, // does source node 1 throw?
686 TestNodeTypeEnum SType1, // does source node 2 throw?
687 class Item12, // type of item passed between sources and test node
688 TestNodeTypeEnum FType, // does multifunction node throw?
689 class ItemTuple, // tuple of types passed from multifunction_node to sink nodes
690 TestNodeTypeEnum NType1, // does sink node 1 throw?
691 TestNodeTypeEnum NType2, // does sink node 2 throw?
692 class NodePolicy, // rejecting,queueing
693 size_t Conc // is node concurrent? {serial | limited | unlimited}
695 void run_multifunction_node_test() {
// Element types of the output tuple; each sink receives one of them.
697 typedef typename tbb::flow::tuple_element<0,ItemTuple>::type Item23Type0;
698 typedef typename tbb::flow::tuple_element<1,ItemTuple>::type Item23Type1;
699 typedef test_source_body<Item12,SType0> SBodyType1;
700 typedef test_source_body<Item12,SType1> SBodyType2;
701 typedef multifunction_node_body<Item12, ItemTuple, FType, Conc> TestBodyType;
702 typedef absorber_body<Item23Type0,tbb::flow::continue_msg, NType1, unlimited_type> SinkBodyType1;
703 typedef absorber_body<Item23Type1,tbb::flow::continue_msg, NType2, unlimited_type> SinkBodyType2;
// Node types; the two sinks may differ because the tuple elements may differ.
705 typedef tbb::flow::source_node<Item12> SrcType;
706 typedef tbb::flow::multifunction_node<Item12, ItemTuple, NodePolicy> TestType;
707 typedef tbb::flow::function_node<Item23Type0,tbb::flow::continue_msg> SnkType0;
708 typedef tbb::flow::function_node<Item23Type1,tbb::flow::continue_msg> SnkType1;
// Bit 0 of i selects throwing, bit 1 selects flogging; i == 2 (flog without throw) is skipped.
710 for(int i = 0; i < 4; ++i ) {
711 if(i != 2) { // doesn't make sense to flog a non-throwing test
712 bool doThrow = (i & 0x1) != 0;
713 bool doFlog = (i & 0x2) != 0;
714 run_one_functype_node_test<
715 /*SourceNodeType*/ SrcType,
716 /*SourceNodeBodyType0*/ SBodyType1,
717 /*SourceNodeBodyType1*/ SBodyType2,
718 /*NFT*/ multifunc_node_type,
719 /*TestNodeType*/ TestType,
720 /*TestNodeBodyType*/ TestBodyType,
721 /*TypeToSink0*/ Item23Type0,
722 /*TypeToSink1*/ Item23Type1,
723 /*SinkNodeType0*/ SnkType0,
724 /*SinkNodeType1*/ SnkType1,
725 /*SinkNodeBodyType0*/ SinkBodyType1,
726 /*SinkNodeBodyType1*/ SinkBodyType2,
728 (doThrow,doFlog,"multifunction_node");
731 } // run_multifunction_node_test
// Entry point for the multifunction_node tests. Template arguments of each call are, in order:
// source 1 throws, source 2 throws, input item type, multifunction_node throws, output tuple,
// sink 1 throws, sink 2 throws, buffering policy, concurrency. g_Wakeup_Msg is set per group
// and restored to g_Orig_Wakeup_Msg at the end.
733 void test_multifunction_node() {
734 REMARK("Testing multifunction_node\n");
735 g_Wakeup_Msg = "multifunction_node(source throws,rejecting,serial): Missed wakeup or machine is overloaded?";
// serial rejecting; this first run is the only one with a mixed tuple<int,float> output
737 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,float>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
738 g_Wakeup_Msg = "multifunction_node(test throws,rejecting,serial): Missed wakeup or machine is overloaded?";
739 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
740 g_Wakeup_Msg = "multifunction_node(sink throws,rejecting,serial): Missed wakeup or machine is overloaded?";
741 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::rejecting, serial_type>();
743 g_Wakeup_Msg = "multifunction_node(2): Missed wakeup or machine is overloaded?";
// serial queueing; the last run uses check_type<int> items and then checks for leaked items
745 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
746 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
747 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
748 check_type<int>::check_type_counter = 0;
749 run_multifunction_node_test<nonThrowing, nonThrowing, check_type<int>, nonThrowing, tbb::flow::tuple<check_type<int>, check_type<int> >, isThrowing, nonThrowing, tbb::flow::queueing, serial_type>();
750 ASSERT(!check_type<int>::check_type_counter, "Some items leaked in test");
752 g_Wakeup_Msg = "multifunction_node(3): Missed wakeup or machine is overloaded?";
753 // unlimited parallel rejecting
754 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
755 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, unlimited_type>();
756 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
758 g_Wakeup_Msg = "multifunction_node(4): Missed wakeup or machine is overloaded?";
759 // limited parallel rejecting
760 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, limited_type>();
761 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::rejecting, (size_t)limited_type>();
762 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::rejecting, (size_t)limited_type>();
764 g_Wakeup_Msg = "multifunction_node(5): Missed wakeup or machine is overloaded?";
765 // limited parallel queueing
766 run_multifunction_node_test<isThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
767 run_multifunction_node_test<nonThrowing, nonThrowing, int, isThrowing, tbb::flow::tuple<int,int>, nonThrowing, nonThrowing, tbb::flow::queueing, (size_t)limited_type>();
768 run_multifunction_node_test<nonThrowing, nonThrowing, int, nonThrowing, tbb::flow::tuple<int,int>, nonThrowing, isThrowing, tbb::flow::queueing, (size_t)limited_type>();
770 g_Wakeup_Msg = "multifunction_node(6): Missed wakeup or machine is overloaded?";
// unlimited rejecting with both sources, the multifunction_node and both sinks all throwing
772 run_multifunction_node_test<isThrowing, isThrowing, int, isThrowing, tbb::flow::tuple<int,int>, isThrowing, isThrowing, tbb::flow::rejecting, unlimited_type>();
773 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
777 // Continue node has T predecessors. when it receives messages (continue_msg) on T predecessors
778 // it executes the body of the node, and forwards a continue_msg to its successors.
779 // However many predecessors the continue_node has, that's how many continue_msgs it receives
780 // on input before forwarding a message.
782 // The graph will look like
786 // source_node+------>+broadcast_node+ +continue_node+--->+absorber
790 // The continue_node has unlimited parallelism, no input buffering, and broadcasts to successors.
791 // The absorber is parallel, so each item emitted by the source will result in one thread
792 // spinning. So for N threads we pass N-1 continue_messages, then spin wait and then throw if
793 // we are allowed to.
// Runs one exception-handling scenario on a graph whose node under test (TTestNodeType)
// is fed by broadcast nodes and drains into a sink.
//   throwException, flog: forwarded to ResetGlobals() at the top of every iteration;
//                         throwException also feeds okayNoExceptionsCaught below.
// NOTE(review): several short lines of this function are not visible in this excerpt
// (e.g. the declaration of g and the code that actually runs the graph), so the comments
// below describe only the statements that can be seen.
795 template < class SourceNodeType, class SourceNodeBodyType, class TTestNodeType, class TestNodeBodyType,
796 class SinkNodeType, class SinkNodeBodyType>
797 void run_one_continue_node_test (bool throwException, bool flog) {
// Counters passed to the body constructors below; all start at zero.
800 tbb::atomic<int> source_count;
801 tbb::atomic<int> test_count;
802 tbb::atomic<int> sink_count;
803 source_count = test_count = sink_count = 0;
804 #if USE_TASK_SCHEDULER_OBSERVER
// Record the calling thread's id in g_Master.
808 g_Master = Harness::CurrentTid();
// The source is constructed inactive (is_active == false).
809 SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false);
810 TTestNodeType node_to_test(g, TestNodeBodyType(test_count));
// The sink is constructed with tbb::flow::unlimited concurrency.
811 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
812 tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g), b2(g), b3(g);
813 make_edge(source, b1);
// Both b2 and b3 are connected to node_to_test, giving it two predecessors.
// (The edges from b1 to b2/b3 are presumably made on lines not shown here.)
816 make_edge(b2,node_to_test);
817 make_edge(b3,node_to_test);
818 make_edge(node_to_test, sink);
// The scenario is executed twice; the graph and the counters are reset at the end of each pass.
819 for(int iter = 0; iter < 2; ++iter) {
820 ResetGlobals(throwException,flog);
// okayNoExceptionsCaught is true when throwing was not requested, or when the thread
// category selected by g_ExceptionInMaster did not execute a throw.
833 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
// Read the per-node counts back from copies of the node bodies.
834 int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value();
835 int t_cnt = tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value();
836 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
// Checks for a run in which an exception may have been thrown: the graph must report the
// exception and the cancellation (unless none was expected), and counts may only be upper-bounded.
// (Presumably guarded by if(throwException); the guard line is not visible here.)
838 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
839 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
840 ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources");
841 ASSERT(sb_cnt >= t_cnt, "Too many items received by test node");
842 ASSERT(nb_cnt <= t_cnt, "Too many items received by sink nodes");
// Checks for a clean run: no exception or cancellation flag, and every stage saw g_NumItems items.
845 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
846 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
847 ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_node");
848 ASSERT(t_cnt == g_NumItems, "Not all items reached test node");
849 ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
851 g.reset(); // resets the body of the source_nodes, test_node and the absorb_nodes.
// Zero the shared counters and verify that both the counters and the node bodies read zero.
852 source_count = test_count = sink_count = 0;
853 ASSERT(0 == (int)test_count, "Atomic wasn't reset properly");
854 ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed");
855 ASSERT(0 == tbb::flow::copy_body<TestNodeBodyType>(node_to_test).count_value(),"Reset test_node failed");
856 ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed");
858 #if USE_TASK_SCHEDULER_OBSERVER
// Instantiates run_one_continue_node_test for one item type and one choice of which
// nodes throw (SType / CType / AType), then runs it for each (doThrow, doFlog) combination
// selected by the loop below.
// NOTE(review): the opening of the template parameter list and the call's argument list
// are not visible in this excerpt.
865 TestNodeTypeEnum SType, // does source node throw?
866 TestNodeTypeEnum CType, // does continue_node throw?
867 TestNodeTypeEnum AType> // does absorber throw
868 void run_continue_node_test() {
// Body types: the source emits continue_msg, the continue_node body maps continue_msg to
// ItemType, and the sink body maps ItemType back to continue_msg.
869 typedef test_source_body<tbb::flow::continue_msg,SType> SBodyType;
870 typedef absorber_body<tbb::flow::continue_msg,ItemType,CType,unlimited_type> ContBodyType;
871 typedef absorber_body<ItemType,tbb::flow::continue_msg, AType, unlimited_type> SinkBodyType;
// Node types matching the bodies above.
873 typedef tbb::flow::source_node<tbb::flow::continue_msg> SrcType;
874 typedef tbb::flow::continue_node<ItemType> TestType;
875 typedef tbb::flow::function_node<ItemType,tbb::flow::continue_msg> SnkType;
// Bit 0 of i selects doThrow and bit 1 selects doFlog; i == 2 (flog without throw) is skipped,
// leaving (false,false), (true,false) and (true,true).
877 for(int i = 0; i < 4; ++i ) {
878 if(i == 2) continue; // don't run (false,true); it doesn't make sense.
879 bool doThrow = (i & 0x1) != 0;
880 bool doFlog = (i & 0x2) != 0;
881 run_one_continue_node_test<
882 /*SourceNodeType*/ SrcType,
883 /*SourceNodeBodyType*/ SBodyType,
884 /*TestNodeType*/ TestType,
885 /*TestNodeBodyType*/ ContBodyType,
886 /*SinkNodeType*/ SnkType,
887 /*SinkNodeBodyType*/ SinkBodyType>
// Entry point for the continue_node exception tests.
// Each run_continue_node_test<ItemType, source, continue_node, absorber> call selects which
// of the three nodes throw. Before each call g_Wakeup_Msg is set to a diagnostic naming the
// node type and the (source,continue_node,absorber) configuration; it is restored to
// g_Orig_Wakeup_Msg at the end.
// The diagnostics name continue_node so that a reported message identifies this test rather
// than the buffer_node tests.
893 void test_continue_node() {
894 REMARK("Testing continue_node\n");
895 g_Wakeup_Msg = "continue_node(non,is,non): Missed wakeup or machine is overloaded?";
896 run_continue_node_test<int,nonThrowing,isThrowing,nonThrowing>();
897 g_Wakeup_Msg = "continue_node(non,non,is): Missed wakeup or machine is overloaded?";
898 run_continue_node_test<int,nonThrowing,nonThrowing,isThrowing>();
899 g_Wakeup_Msg = "continue_node(is,non,non): Missed wakeup or machine is overloaded?";
900 run_continue_node_test<int,isThrowing,nonThrowing,nonThrowing>();
901 g_Wakeup_Msg = "continue_node(is,is,is): Missed wakeup or machine is overloaded?";
902 run_continue_node_test<int,isThrowing,isThrowing,isThrowing>();
// Repeat the all-throwing configuration with check_type<double> items: the counter is zeroed
// first and must be zero again afterwards, otherwise objects were dropped.
903 check_type<double>::check_type_counter = 0;
904 run_continue_node_test<check_type<double>,isThrowing,isThrowing,isThrowing>();
905 ASSERT(!check_type<double>::check_type_counter, "Dropped objects in continue_node test");
906 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
909 // ---------- buffer_node queue_node overwrite_node --------------
// Runs one exception-handling scenario on a source -> node_to_test -> sink graph, where
// node_to_test is a buffering node constructed from the graph alone.
//   throwException, flog: forwarded to ResetGlobals() at the top of every iteration;
//                         throwException also feeds okayNoExceptionsCaught below.
// NOTE(review): several short lines of this function are not visible in this excerpt
// (e.g. the declaration of g, the code that runs the graph, and the declaration of tmp),
// so the comments describe only the statements that can be seen.
912 class BufferItemType, //
913 class SourceNodeType,
914 class SourceNodeBodyType,
917 class SinkNodeBodyType >
918 void run_one_buffer_node_test(bool throwException,bool flog) {
// Counters passed to the body constructors below; both start at zero.
921 tbb::atomic<int> source_count;
922 tbb::atomic<int> sink_count;
923 source_count = sink_count = 0;
924 #if USE_TASK_SCHEDULER_OBSERVER
// Record the calling thread's id in g_Master.
928 g_Master = Harness::CurrentTid();
// The source is constructed inactive; the sink has tbb::flow::unlimited concurrency.
929 SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false);
930 TestNodeType node_to_test(g);
931 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
932 make_edge(source,node_to_test);
933 make_edge(node_to_test, sink);
// The scenario is executed twice.
934 for(int iter = 0; iter < 2; ++iter) {
935 ResetGlobals(throwException,flog);
// okayNoExceptionsCaught is true when throwing was not requested, or when the thread
// category selected by g_ExceptionInMaster did not execute a throw.
948 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
// Read the counts back from copies of the node bodies.
949 int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value();
950 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
// Checks for a run in which an exception may have been thrown (guard line not visible here).
952 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
953 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
954 ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources");
955 ASSERT(nb_cnt <= sb_cnt, "Too many items received by sink nodes");
// Checks for a clean run: no exception or cancellation flag, and all g_NumItems items seen.
958 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
959 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
960 ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_node");
961 ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
// Detach the sink and put one default-constructed item into the node under test.
964 remove_edge(node_to_test, sink);
965 node_to_test.try_put(BufferItemType());
968 source_count = sink_count = 0;
// The node must be empty again at this point (presumably after a graph reset on a line
// not shown here); then the sink is reattached.
970 ASSERT(!node_to_test.try_get(tmp), "node not empty");
971 make_edge(node_to_test, sink);
// Zero the counters and verify that the node bodies also read zero.
976 source_count = sink_count = 0;
978 ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed");
979 ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed");
982 #if USE_TASK_SCHEDULER_OBSERVER
// Runs run_one_buffer_node_test with buffer_node, queue_node and overwrite_node as the node
// under test, for one item type and one choice of throwing source / throwing sink.
986 template<class BufferItemType,
987 TestNodeTypeEnum SourceThrowType,
988 TestNodeTypeEnum SinkThrowType>
989 void run_buffer_queue_and_overwrite_node_test() {
// Bodies: the source produces BufferItemType; the sink consumes it and emits continue_msg.
990 typedef test_source_body<BufferItemType,SourceThrowType> SourceBodyType;
991 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
// Node types: one source, the three candidate nodes under test, and a function_node sink.
993 typedef tbb::flow::source_node<BufferItemType> SrcType;
994 typedef tbb::flow::buffer_node<BufferItemType> BufType;
995 typedef tbb::flow::queue_node<BufferItemType> QueType;
996 typedef tbb::flow::overwrite_node<BufferItemType> OvrType;
997 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
// Bit 0 of i selects throwException and bit 1 selects doFlog; i == 2 is skipped,
// leaving (false,false), (true,false) and (true,true).
999 for(int i = 0; i < 4; ++i) {
1000 if(i == 2) continue; // no need to test flog w/o throws
1001 bool throwException = (i & 0x1) != 0;
1002 bool doFlog = (i & 0x2) != 0;
// The buffer_node and queue_node runs follow this #if; the matching #endif is not visible
// in this excerpt (presumably it precedes the overwrite_node run).
1003 #if TBB_RUN_BUFFERING_TEST
1004 run_one_buffer_node_test<
1005 /* class BufferItemType*/ BufferItemType,
1006 /*class SourceNodeType*/ SrcType,
1007 /*class SourceNodeBodyType*/ SourceBodyType,
1008 /*class TestNodeType*/ BufType,
1009 /*class SinkNodeType*/ SnkType,
1010 /*class SinkNodeBodyType*/ SinkBodyType
1011 >(throwException, doFlog);
1012 run_one_buffer_node_test<
1013 /* class BufferItemType*/ BufferItemType,
1014 /*class SourceNodeType*/ SrcType,
1015 /*class SourceNodeBodyType*/ SourceBodyType,
1016 /*class TestNodeType*/ QueType,
1017 /*class SinkNodeType*/ SnkType,
1018 /*class SinkNodeBodyType*/ SinkBodyType
1019 >(throwException, doFlog);
// overwrite_node run.
1021 run_one_buffer_node_test<
1022 /* class BufferItemType*/ BufferItemType,
1023 /*class SourceNodeType*/ SrcType,
1024 /*class SourceNodeBodyType*/ SourceBodyType,
1025 /*class TestNodeType*/ OvrType,
1026 /*class SinkNodeType*/ SnkType,
1027 /*class SinkNodeBodyType*/ SinkBodyType
1028 >(throwException, doFlog);
// Entry point for the buffer_node / queue_node / overwrite_node exception tests.
// Runs the (source,sink) throwing combinations (is,non), (non,is) and (is,is) with int items.
// g_Wakeup_Msg is set to a message naming the configuration before each run and restored
// to g_Orig_Wakeup_Msg at the end.
1032 void test_buffer_queue_and_overwrite_node() {
1033 REMARK("Testing buffer_node, queue_node and overwrite_node\n");
// The skip notice below belongs to the branch taken when TBB_RUN_BUFFERING_TEST is zero
// (presumably via an #else on a line not shown in this excerpt).
1034 #if TBB_RUN_BUFFERING_TEST
1036 REMARK("skip buffer and queue test (known issue)\n");
1038 g_Wakeup_Msg = "buffer, queue, overwrite(is,non): Missed wakeup or machine is overloaded?";
1039 run_buffer_queue_and_overwrite_node_test<int,isThrowing,nonThrowing>();
1040 g_Wakeup_Msg = "buffer, queue, overwrite(non,is): Missed wakeup or machine is overloaded?";
1041 run_buffer_queue_and_overwrite_node_test<int,nonThrowing,isThrowing>();
1042 g_Wakeup_Msg = "buffer, queue, overwrite(is,is): Missed wakeup or machine is overloaded?";
1043 run_buffer_queue_and_overwrite_node_test<int,isThrowing,isThrowing>();
1044 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1047 // ---------- sequencer_node -------------------------
// Runs one exception-handling scenario on a source -> node_to_test -> sink graph, where
// node_to_test is constructed from the graph and a SeqBodyType object.
//   throwException: selects which set of assertions is checked after the run;
//   flog:           forwarded, with throwException, to ResetGlobals().
// NOTE(review): several short lines of this function are not visible in this excerpt
// (e.g. the declaration of g and the code that runs the graph), so the comments describe
// only the statements that can be seen.
1051 class BufferItemType, //
1052 class SourceNodeType,
1053 class SourceNodeBodyType,
1057 class SinkNodeBodyType >
1058 void run_one_sequencer_node_test(bool throwException,bool flog) {
// Counters passed to the body constructors below; both start at zero.
1061 tbb::atomic<int> source_count;
1062 tbb::atomic<int> sink_count;
1063 source_count = sink_count = 0;
1064 #if USE_TASK_SCHEDULER_OBSERVER
// Record the calling thread's id in g_Master.
1068 g_Master = Harness::CurrentTid();
// The source is constructed inactive; the sink has tbb::flow::unlimited concurrency.
1069 SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false);
1070 TestNodeType node_to_test(g,SeqBodyType());
1071 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1072 make_edge(source,node_to_test);
1073 make_edge(node_to_test, sink);
// The scenario is executed twice.
1074 for(int iter = 0; iter < 2; ++iter) {
1075 ResetGlobals(throwException,flog);
1076 if(throwException) {
// okayNoExceptionsCaught is true when throwing was not requested, or when the thread
// category selected by g_ExceptionInMaster did not execute a throw.
1088 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
// Read the counts back from copies of the node bodies.
1089 int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value();
1090 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
// Throwing run: the graph must report the exception and cancellation (unless none was
// expected) and the counts are only upper-bounded.
1091 if(throwException) {
1092 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1093 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1094 ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources");
1095 ASSERT(nb_cnt <= sb_cnt, "Too many items received by sink nodes");
// Clean run: no exception or cancellation flag, and all g_NumItems items seen.
1098 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1099 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1100 ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_node");
1101 ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
// Detach the sink and put two items (values g_NumItems + 1 and 1) into the node under test.
1104 remove_edge(node_to_test, sink);
1105 node_to_test.try_put(BufferItemType(g_NumItems + 1));
1106 node_to_test.try_put(BufferItemType(1));
// Zero the counters and reattach the sink (statements in between are not visible here).
1109 source_count = sink_count = 0;
1110 make_edge(node_to_test, sink);
// Zero the counters again and verify that the node bodies also read zero.
1115 source_count = sink_count = 0;
1117 ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed");
1118 ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed");
1121 #if USE_TASK_SCHEDULER_OBSERVER
// Runs run_one_sequencer_node_test with a sequencer_node as the node under test, for one
// item type and one choice of throwing source / throwing sink.
1126 template<class BufferItemType,
1127 TestNodeTypeEnum SourceThrowType,
1128 TestNodeTypeEnum SinkThrowType>
1129 void run_sequencer_node_test() {
// Bodies: source, sink, and the sequencer body handed to the sequencer_node constructor.
1130 typedef test_source_body<BufferItemType,SourceThrowType> SourceBodyType;
1131 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1132 typedef sequencer_body<BufferItemType> SeqBodyType;
// Node types matching the bodies above.
1134 typedef tbb::flow::source_node<BufferItemType> SrcType;
1135 typedef tbb::flow::sequencer_node<BufferItemType> SeqType;
1136 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
// Bit 0 of i selects throwException and bit 1 selects doFlog; i == 2 is skipped,
// leaving (false,false), (true,false) and (true,true).
1138 for(int i = 0; i < 4; ++i) {
1139 if(i == 2) continue; // no need to test flog w/o throws
1140 bool throwException = (i & 0x1) != 0;
1141 bool doFlog = (i & 0x2) != 0;
1142 run_one_sequencer_node_test<
1143 /* class BufferItemType*/ BufferItemType,
1144 /*class SourceNodeType*/ SrcType,
1145 /*class SourceNodeBodyType*/ SourceBodyType,
1146 /*class TestNodeType*/ SeqType,
1147 /*class SeqBodyType*/ SeqBodyType,
1148 /*class SinkNodeType*/ SnkType,
1149 /*class SinkNodeBodyType*/ SinkBodyType
1150 >(throwException, doFlog);
// Entry point for the sequencer_node exception tests.
// Runs the (source,sink) throwing combinations (is,non), (non,is) and (is,is).
// g_Wakeup_Msg is set to a message naming the configuration before each run and restored
// to g_Orig_Wakeup_Msg at the end.
1156 void test_sequencer_node() {
1157 REMARK("Testing sequencer_node\n");
1158 g_Wakeup_Msg = "sequencer_node(is,non): Missed wakeup or machine is overloaded?";
1159 run_sequencer_node_test<int, isThrowing,nonThrowing>();
// The (non,is) run uses check_type<int> items: the counter is zeroed first and must be
// zero again afterwards, otherwise objects were dropped.
1160 check_type<int>::check_type_counter = 0;
1161 g_Wakeup_Msg = "sequencer_node(non,is): Missed wakeup or machine is overloaded?";
1162 run_sequencer_node_test<check_type<int>, nonThrowing,isThrowing>();
1163 ASSERT(!check_type<int>::check_type_counter, "Dropped objects in sequencer_node test");
1164 g_Wakeup_Msg = "sequencer_node(is,is): Missed wakeup or machine is overloaded?";
1165 run_sequencer_node_test<int, isThrowing,isThrowing>();
1166 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1169 // ------------ priority_queue_node ------------------
// Runs one exception-handling scenario on a source -> node_to_test -> sink graph, where
// node_to_test is constructed from the graph alone.
//   throwException: selects which set of assertions is checked after the run;
//   flog:           forwarded, with throwException, to ResetGlobals().
// NOTE(review): several short lines of this function are not visible in this excerpt
// (e.g. the declaration of g and the code that runs the graph), so the comments describe
// only the statements that can be seen.
1172 class BufferItemType,
1173 class SourceNodeType,
1174 class SourceNodeBodyType,
1177 class SinkNodeBodyType >
1178 void run_one_priority_queue_node_test(bool throwException,bool flog) {
// Counters passed to the body constructors below; both start at zero.
1181 tbb::atomic<int> source_count;
1182 tbb::atomic<int> sink_count;
1183 source_count = sink_count = 0;
1184 #if USE_TASK_SCHEDULER_OBSERVER
// Record the calling thread's id in g_Master.
1188 g_Master = Harness::CurrentTid();
// The source is constructed inactive; the sink has tbb::flow::unlimited concurrency.
1189 SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false);
1191 TestNodeType node_to_test(g);
1193 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1195 make_edge(source,node_to_test);
1196 make_edge(node_to_test, sink);
// The scenario is executed twice.
1197 for(int iter = 0; iter < 2; ++iter) {
1198 ResetGlobals(throwException,flog);
1199 if(throwException) {
// okayNoExceptionsCaught is true when throwing was not requested, or when the thread
// category selected by g_ExceptionInMaster did not execute a throw.
1211 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
// Read the counts back from copies of the node bodies.
1212 int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value();
1213 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
// Throwing run: the graph must report the exception and cancellation (unless none was
// expected) and the counts are only upper-bounded.
1214 if(throwException) {
1215 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1216 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1217 ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources");
1218 ASSERT(nb_cnt <= sb_cnt, "Too many items received by sink nodes");
// Clean run: no exception or cancellation flag, and all g_NumItems items seen.
1221 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1222 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1223 ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_node");
1224 ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
// Detach the sink and put three items into the node under test
// (values g_NumItems + 1, g_NumItems + 2 and a default-constructed item).
1227 remove_edge(node_to_test, sink);
1228 node_to_test.try_put(BufferItemType(g_NumItems + 1));
1229 node_to_test.try_put(BufferItemType(g_NumItems + 2));
1230 node_to_test.try_put(BufferItemType());
// Zero the counters and reattach the sink (statements in between are not visible here).
1233 source_count = sink_count = 0;
1234 make_edge(node_to_test, sink);
// Zero the counters again and verify that the node bodies also read zero.
1239 source_count = sink_count = 0;
1241 ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed");
1242 ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed");
1245 #if USE_TASK_SCHEDULER_OBSERVER
// Runs run_one_priority_queue_node_test with a priority_queue_node as the node under test,
// for one item type and one choice of throwing source / throwing sink.
1250 template<class BufferItemType,
1251 TestNodeTypeEnum SourceThrowType,
1252 TestNodeTypeEnum SinkThrowType>
1253 void run_priority_queue_node_test() {
// Bodies: source, sink, and the comparison type given to priority_queue_node.
1254 typedef test_source_body<BufferItemType,SourceThrowType> SourceBodyType;
1255 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
1256 typedef less_body<BufferItemType> LessBodyType;
// Node types matching the bodies above.
1258 typedef tbb::flow::source_node<BufferItemType> SrcType;
1259 typedef tbb::flow::priority_queue_node<BufferItemType,LessBodyType> PrqType;
1260 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
// Bit 0 of i selects throwException and bit 1 selects doFlog; i == 2 is skipped,
// leaving (false,false), (true,false) and (true,true).
1262 for(int i = 0; i < 4; ++i) {
1263 if(i == 2) continue; // no need to test flog w/o throws
1264 bool throwException = (i & 0x1) != 0;
1265 bool doFlog = (i & 0x2) != 0;
1266 run_one_priority_queue_node_test<
1267 /* class BufferItemType*/ BufferItemType,
1268 /*class SourceNodeType*/ SrcType,
1269 /*class SourceNodeBodyType*/ SourceBodyType,
1270 /*class TestNodeType*/ PrqType,
1271 /*class SinkNodeType*/ SnkType,
1272 /*class SinkNodeBodyType*/ SinkBodyType
1273 >(throwException, doFlog);
// Entry point for the priority_queue_node exception tests.
// Runs the (source,sink) throwing combinations (is,non), (non,is) and (is,is).
// g_Wakeup_Msg is set to a message naming the configuration before each run and restored
// to g_Orig_Wakeup_Msg at the end.
1277 void test_priority_queue_node() {
1278 REMARK("Testing priority_queue_node\n");
1279 g_Wakeup_Msg = "priority_queue_node(is,non): Missed wakeup or machine is overloaded?";
1280 run_priority_queue_node_test<int, isThrowing,nonThrowing>();
// The (non,is) run uses check_type<int> items: the counter is zeroed first and must be
// zero again afterwards, otherwise objects were dropped.
1281 check_type<int>::check_type_counter = 0;
1282 g_Wakeup_Msg = "priority_queue_node(non,is): Missed wakeup or machine is overloaded?";
1283 run_priority_queue_node_test<check_type<int>, nonThrowing,isThrowing>();
1284 ASSERT(!check_type<int>::check_type_counter, "Dropped objects in priority_queue_node test");
1285 g_Wakeup_Msg = "priority_queue_node(is,is): Missed wakeup or machine is overloaded?";
1286 run_priority_queue_node_test<int, isThrowing,isThrowing>();
1287 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1290 // ------------------- join_node ----------------
// Maps a join policy type to a printable name.
// The primary template returns "unknown"; the specializations below return the name of
// the queueing, reserving and tag_matching policies respectively.
1291 template<class JP> struct graph_policy_name{
1292 static const char* name() {return "unknown"; }
// Specialization for tbb::flow::queueing.
1294 template<> struct graph_policy_name<tbb::flow::queueing> {
1295 static const char* name() {return "queueing"; }
// Specialization for tbb::flow::reserving.
1297 template<> struct graph_policy_name<tbb::flow::reserving> {
1298 static const char* name() {return "reserving"; }
// Specialization for tbb::flow::tag_matching.
1300 template<> struct graph_policy_name<tbb::flow::tag_matching> {
1301 static const char* name() {return "tag_matching"; }
// Primary template: runs one exception-handling scenario on a two-input join graph,
// source0/source1 -> input ports 0/1 of node_to_test -> sink.
// NOTE(review): parts of the template parameter list and several short lines of the body
// are not visible in this excerpt (e.g. the declaration of g and the code that runs the
// graph), so the comments describe only the statements that can be seen.
1309 class SourceBodyType0,
1311 class SourceBodyType1,
1316 struct run_one_join_node_test {
1317 run_one_join_node_test() {}
// execute_test(throwException, flog):
//   throwException: selects which set of assertions is checked after the run;
//   flog:           forwarded, with throwException, to ResetGlobals().
1318 static void execute_test(bool throwException,bool flog) {
// Element types of the join's output tuple, one per input port.
1319 typedef typename tbb::flow::tuple_element<0,OutputTuple>::type ItemType0;
1320 typedef typename tbb::flow::tuple_element<1,OutputTuple>::type ItemType1;
// Counters passed to the body constructors below; all start at zero.
1323 tbb::atomic<int>source0_count;
1324 tbb::atomic<int>source1_count;
1325 tbb::atomic<int>sink_count;
1326 source0_count = source1_count = sink_count = 0;
1327 #if USE_TASK_SCHEDULER_OBSERVER
// Record the calling thread's id in g_Master.
1331 g_Master = Harness::CurrentTid();
// Both sources are constructed inactive; the sink has tbb::flow::unlimited concurrency.
1332 SourceType0 source0(g, SourceBodyType0(source0_count),/*is_active*/false);
1333 SourceType1 source1(g, SourceBodyType1(source1_count),/*is_active*/false);
1334 TestJoinType node_to_test(g);
1335 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1336 make_edge(source0,tbb::flow::input_port<0>(node_to_test));
1337 make_edge(source1,tbb::flow::input_port<1>(node_to_test));
1338 make_edge(node_to_test, sink);
// The scenario is executed twice.
1339 for(int iter = 0; iter < 2; ++iter) {
1340 ResetGlobals(throwException,flog);
1341 if(throwException) {
// okayNoExceptionsCaught is true when throwing was not requested, or when the thread
// category selected by g_ExceptionInMaster did not execute a throw.
1355 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
// Read the counts back from copies of the node bodies.
1356 int sb0_cnt = tbb::flow::copy_body<SourceBodyType0>(source0).count_value();
1357 int sb1_cnt = tbb::flow::copy_body<SourceBodyType1>(source1).count_value();
1358 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
// Throwing run: the graph must report the exception and cancellation (unless none was
// expected); the sink may not have seen more tuples than the smaller source count.
1359 if(throwException) {
1360 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1361 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1362 ASSERT(sb0_cnt <= g_NumItems && sb1_cnt <= g_NumItems, "Too many items sent by sources");
1363 ASSERT(nb_cnt <= ((sb0_cnt < sb1_cnt) ? sb0_cnt : sb1_cnt), "Too many items received by sink nodes");
// Clean run: no exception or cancellation flag, and all g_NumItems items seen everywhere.
1366 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1367 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
// Print diagnostic details before the source0 count assertion below can fail.
1368 if(sb0_cnt != g_NumItems) {
1369 REMARK("throwException == %s\n", throwException ? "true" : "false");
1370 REMARK("iter == %d\n", (int)iter);
1371 REMARK("sb0_cnt == %d\n", (int)sb0_cnt);
1372 REMARK("g_NumItems == %d\n", (int)g_NumItems);
1374 ASSERT(sb0_cnt == g_NumItems, "Missing invocations of source_node0"); // this one
1375 ASSERT(sb1_cnt == g_NumItems, "Missing invocations of source_node1");
1376 ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
// Detach the sink and put one item into each input port of the join.
1379 remove_edge(node_to_test, sink);
1380 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 1));
1381 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
// Zero the counters and reattach the sink (statements in between are not visible here).
1384 source0_count = source1_count = sink_count = 0;
1385 make_edge(node_to_test, sink);
// Zero the counters again and verify that the node bodies also read zero.
1391 source0_count = source1_count = sink_count = 0;
1393 ASSERT(0 == tbb::flow::copy_body<SourceBodyType0>(source0).count_value(),"Reset source failed");
1394 ASSERT(0 == tbb::flow::copy_body<SourceBodyType1>(source1).count_value(),"Reset source failed");
// NOTE(review): nb_cnt is refreshed here, but the assertion below re-reads the body
// instead of using it.
1395 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1396 ASSERT(0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value(),"Reset sink failed");
1399 #if USE_TASK_SCHEDULER_OBSERVER
1403 }; // run_one_join_node_test
// Specialization for tbb::flow::tag_matching: same scenario as the primary template, but
// the source bodies and the join node take extra constructor arguments (see below).
// NOTE(review): parts of the template parameter/argument lists and several short lines of
// the body are not visible in this excerpt, so the comments describe only the statements
// that can be seen.
1408 class SourceBodyType0,
1410 class SourceBodyType1,
1415 struct run_one_join_node_test<
1416 tbb::flow::tag_matching,
1426 run_one_join_node_test() {}
// execute_test(throwException, flog):
//   throwException: selects which set of assertions is checked after the run;
//   flog:           forwarded, with throwException, to ResetGlobals().
1427 static void execute_test(bool throwException,bool flog) {
// Element types of the join's output tuple, one per input port.
1428 typedef typename tbb::flow::tuple_element<0,OutputTuple>::type ItemType0;
1429 typedef typename tbb::flow::tuple_element<1,OutputTuple>::type ItemType1;
// Counters passed to the body constructors below; all start at zero.
1433 tbb::atomic<int>source0_count;
1434 tbb::atomic<int>source1_count;
1435 tbb::atomic<int>sink_count;
1436 source0_count = source1_count = sink_count = 0;
1437 #if USE_TASK_SCHEDULER_OBSERVER
// Record the calling thread's id in g_Master.
1441 g_Master = Harness::CurrentTid();
// The source bodies receive a second argument (2 and 3) and the join is given tag_func
// objects built from the same values — presumably so both ports yield matching tags;
// confirm against test_source_body and tag_func.
1442 SourceType0 source0(g, SourceBodyType0(source0_count, 2),/*is_active*/false);
1443 SourceType1 source1(g, SourceBodyType1(source1_count, 3),/*is_active*/false);
1444 TestJoinType node_to_test(g, tag_func<ItemType0>(ItemType0(2)), tag_func<ItemType1>(ItemType1(3)));
1445 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
1446 make_edge(source0,tbb::flow::input_port<0>(node_to_test));
1447 make_edge(source1,tbb::flow::input_port<1>(node_to_test));
1448 make_edge(node_to_test, sink);
// The scenario is executed twice.
1449 for(int iter = 0; iter < 2; ++iter) {
1450 ResetGlobals(throwException,flog);
1451 if(throwException) {
// okayNoExceptionsCaught is true when throwing was not requested, or when the thread
// category selected by g_ExceptionInMaster did not execute a throw.
1465 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
// Read the counts back from copies of the node bodies.
1466 int sb0_cnt = tbb::flow::copy_body<SourceBodyType0>(source0).count_value();
1467 int sb1_cnt = tbb::flow::copy_body<SourceBodyType1>(source1).count_value();
1468 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
// Throwing run: the graph must report the exception and cancellation (unless none was
// expected); the sink may not have seen more tuples than the smaller source count.
1469 if(throwException) {
1470 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1471 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1472 ASSERT(sb0_cnt <= g_NumItems && sb1_cnt <= g_NumItems, "Too many items sent by sources");
1473 ASSERT(nb_cnt <= ((sb0_cnt < sb1_cnt) ? sb0_cnt : sb1_cnt), "Too many items received by sink nodes");
// Clean run: no exception or cancellation flag, and all g_NumItems items seen everywhere.
1476 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1477 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1478 ASSERT(sb0_cnt == g_NumItems, "Missing invocations of source_node0");
1479 ASSERT(sb1_cnt == g_NumItems, "Missing invocations of source_node1");
1480 ASSERT(nb_cnt == g_NumItems, "Missing items in absorbers");
// Detach the sink, put one item into each input port, wait for the graph to go idle,
// reset it, zero the counters, reattach the sink and wait again.
1483 remove_edge(node_to_test, sink);
1484 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
1485 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1486 g.wait_for_all(); // have to wait for the graph to stop again....
1487 g.reset(); // resets the body of the source_nodes, test_node and the absorb_nodes.
1488 source0_count = source1_count = sink_count = 0;
1489 make_edge(node_to_test, sink);
1490 g.wait_for_all(); // have to wait for the graph to stop again....
// Zero the counters again and verify that the node bodies also read zero.
1495 source0_count = source1_count = sink_count = 0;
1497 ASSERT(0 == tbb::flow::copy_body<SourceBodyType0>(source0).count_value(),"Reset source failed");
1498 ASSERT(0 == tbb::flow::copy_body<SourceBodyType1>(source1).count_value(),"Reset source failed");
// NOTE(review): nb_cnt is refreshed here, but the assertion below re-reads the body
// instead of using it.
1499 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1500 ASSERT(0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value(),"Reset sink failed");
1503 #if USE_TASK_SCHEDULER_OBSERVER
1507 }; // run_one_join_node_test<tag_matching>
// Runs run_one_join_node_test<...>::execute_test for one join policy (JP), one two-element
// output tuple type and one choice of throwing sources / throwing sink.
// NOTE(review): most of the template argument list of the call is not visible in this excerpt.
1509 template<class JP, class OutputTuple,
1510 TestNodeTypeEnum SourceThrowType,
1511 TestNodeTypeEnum SinkThrowType>
1512 void run_join_node_test() {
// Element types of the output tuple; each gets its own source body. Both sources share
// SourceThrowType.
1513 typedef typename tbb::flow::tuple_element<0,OutputTuple>::type ItemType0;
1514 typedef typename tbb::flow::tuple_element<1,OutputTuple>::type ItemType1;
1515 typedef test_source_body<ItemType0,SourceThrowType> SourceBodyType0;
1516 typedef test_source_body<ItemType1,SourceThrowType> SourceBodyType1;
1517 typedef absorber_body<OutputTuple,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
// Node types: two sources, the join under test, and a function_node sink.
1519 typedef typename tbb::flow::source_node<ItemType0> SourceType0;
1520 typedef typename tbb::flow::source_node<ItemType1> SourceType1;
1521 typedef typename tbb::flow::join_node<OutputTuple,JP> TestJoinType;
1522 typedef typename tbb::flow::function_node<OutputTuple,tbb::flow::continue_msg> SinkType;
// Bit 0 of i selects throwException and bit 1 selects doFlog; i == 2 (flog without throw)
// is skipped, leaving (false,false), (true,false) and (true,true).
1524 for(int i = 0; i < 4; ++i) {
1525 if(2 == i) continue;
1526 bool throwException = (i & 0x1) != 0;
1527 bool doFlog = (i & 0x2) != 0;
1528 run_one_join_node_test<
1537 SinkBodyType>::execute_test(throwException,doFlog);
// Entry point for the join_node exception tests for join policy JP.
// Runs the (source,sink) throwing combinations (is,non), (non,is) and (is,is) on two-input
// joins. g_Wakeup_Msg is set to a message naming the configuration before each run and
// restored to g_Orig_Wakeup_Msg at the end.
1542 void test_join_node() {
1543 REMARK("Testing join_node<%s>\n", graph_policy_name<JP>::name());
1544 // only doing two-input joins
1545 g_Wakeup_Msg = "join(is,non): Missed wakeup or machine is overloaded?";
1546 run_join_node_test<JP, tbb::flow::tuple<int,int>, isThrowing, nonThrowing>();
// The (non,is) run uses check_type<int> as the first tuple element: the counter is zeroed
// first and must be zero again afterwards, otherwise items were dropped.
1547 check_type<int>::check_type_counter = 0;
1548 g_Wakeup_Msg = "join(non,is): Missed wakeup or machine is overloaded?";
1549 run_join_node_test<JP, tbb::flow::tuple<check_type<int>,int>, nonThrowing, isThrowing>();
1550 ASSERT(!check_type<int>::check_type_counter, "Dropped items in test");
1551 g_Wakeup_Msg = "join(is,is): Missed wakeup or machine is overloaded?";
1552 run_join_node_test<JP, tbb::flow::tuple<int,int>, isThrowing, isThrowing>();
1553 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1556 // ------------------- limiter_node -------------
// Runs one exception-handling scenario on a source -> node_to_test -> sink graph, where
// node_to_test is constructed with the graph and the value g_NumThreads + 1.
//   throwException: selects which set of assertions is checked after the run;
//   flog:           forwarded, with throwException, to ResetGlobals().
// NOTE(review): several short lines of this function are not visible in this excerpt
// (e.g. the declaration of g, the code that runs the graph, and the declaration of tmp),
// so the comments describe only the statements that can be seen.
1559 class BufferItemType, //
1560 class SourceNodeType,
1561 class SourceNodeBodyType,
1564 class SinkNodeBodyType >
1565 void run_one_limiter_node_test(bool throwException,bool flog) {
// Counters passed to the body constructors below; both start at zero.
1568 tbb::atomic<int> source_count;
1569 tbb::atomic<int> sink_count;
1570 source_count = sink_count = 0;
1571 #if USE_TASK_SCHEDULER_OBSERVER
// Record the calling thread's id in g_Master.
1575 g_Master = Harness::CurrentTid();
// The source is constructed inactive; the sink has tbb::flow::unlimited concurrency.
1576 SourceNodeType source(g, SourceNodeBodyType(source_count),/*is_active*/false);
// g_NumThreads + 1 is the limiter's limit (see the comment before the count checks below).
1577 TestNodeType node_to_test(g,g_NumThreads + 1);
1578 SinkNodeType sink(g,tbb::flow::unlimited,SinkNodeBodyType(sink_count));
1579 make_edge(source,node_to_test);
1580 make_edge(node_to_test, sink);
// The scenario is executed twice.
1581 for(int iter = 0; iter < 2; ++iter) {
1582 ResetGlobals(throwException,flog);
1583 if(throwException) {
// okayNoExceptionsCaught is true when throwing was not requested, or when the thread
// category selected by g_ExceptionInMaster did not execute a throw.
1595 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
// Read the counts back from copies of the node bodies.
1596 int sb_cnt = tbb::flow::copy_body<SourceNodeBodyType>(source).count_value();
1597 int nb_cnt = tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value();
// Throwing run: the graph must report the exception and cancellation (unless none was
// expected) and the counts are only upper-bounded.
1598 if(throwException) {
1599 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1600 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1601 ASSERT(sb_cnt <= g_NumItems, "Too many items sent by sources");
1602 ASSERT(nb_cnt <= sb_cnt, "Too many items received by sink nodes");
// Clean run: no exception or cancellation flag, and exact counts determined by the limit.
1605 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1606 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1607 // we stop after limiter's limit, which is g_NumThreads + 1. The source_node
1608 // is invoked one extra time, filling its buffer, so its limit is g_NumThreads + 2.
1609 ASSERT(sb_cnt == g_NumThreads + 2, "Missing invocations of source_node");
1610 ASSERT(nb_cnt == g_NumThreads + 1, "Missing items in absorbers");
// Detach the sink and put two default-constructed items into the node under test.
1613 remove_edge(node_to_test, sink);
1614 node_to_test.try_put(BufferItemType());
1615 node_to_test.try_put(BufferItemType());
1618 source_count = sink_count = 0;
// The node must yield nothing at this point (presumably after a graph reset on a line
// not shown here); then the sink is reattached.
1620 ASSERT(!node_to_test.try_get(tmp), "node not empty");
1621 make_edge(node_to_test, sink);
// Zero the counters again and verify that the node bodies also read zero.
1626 source_count = sink_count = 0;
1628 ASSERT(0 == tbb::flow::copy_body<SourceNodeBodyType>(source).count_value(),"Reset source failed");
1629 ASSERT(0 == tbb::flow::copy_body<SinkNodeBodyType>(sink).count_value(),"Reset sink failed");
1632 #if USE_TASK_SCHEDULER_OBSERVER
// Runs run_one_limiter_node_test with a limiter_node as the node under test, for one item
// type and one choice of throwing source / throwing sink.
1637 template<class BufferItemType,
1638 TestNodeTypeEnum SourceThrowType,
1639 TestNodeTypeEnum SinkThrowType>
1640 void run_limiter_node_test() {
// Bodies: the source produces BufferItemType; the sink consumes it and emits continue_msg.
1641 typedef test_source_body<BufferItemType,SourceThrowType> SourceBodyType;
1642 typedef absorber_body<BufferItemType,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
// Node types matching the bodies above.
1644 typedef tbb::flow::source_node<BufferItemType> SrcType;
1645 typedef tbb::flow::limiter_node<BufferItemType> LmtType;
1646 typedef tbb::flow::function_node<BufferItemType,tbb::flow::continue_msg> SnkType;
// Bit 0 of i selects throwException and bit 1 selects doFlog; i == 2 is skipped,
// leaving (false,false), (true,false) and (true,true).
1648 for(int i = 0; i < 4; ++i) {
1649 if(i == 2) continue; // no need to test flog w/o throws
1650 bool throwException = (i & 0x1) != 0;
1651 bool doFlog = (i & 0x2) != 0;
1652 run_one_limiter_node_test<
1653 /* class BufferItemType*/ BufferItemType,
1654 /*class SourceNodeType*/ SrcType,
1655 /*class SourceNodeBodyType*/ SourceBodyType,
1656 /*class TestNodeType*/ LmtType,
1657 /*class SinkNodeType*/ SnkType,
1658 /*class SinkNodeBodyType*/ SinkBodyType
1659 >(throwException, doFlog);
// Entry point for the limiter_node exception tests.
// Runs the (source,sink) throwing combinations (is,non), (non,is) and (is,is) with int items.
// g_Wakeup_Msg is set to a message naming the configuration before each run and restored
// to g_Orig_Wakeup_Msg at the end.
1663 void test_limiter_node() {
1664 REMARK("Testing limiter_node\n");
1665 g_Wakeup_Msg = "limiter_node(is,non): Missed wakeup or machine is overloaded?";
1666 run_limiter_node_test<int,isThrowing,nonThrowing>();
1667 g_Wakeup_Msg = "limiter_node(non,is): Missed wakeup or machine is overloaded?";
1668 run_limiter_node_test<int,nonThrowing,isThrowing>();
1669 g_Wakeup_Msg = "limiter_node(is,is): Missed wakeup or machine is overloaded?";
1670 run_limiter_node_test<int,isThrowing,isThrowing>();
1671 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1674 // -------- split_node --------------------
// Runs one split_node exception scenario on the graph  source -> split node -> {sink0, sink1}.
//   throwException - forwarded to ResetGlobals; also selects which set of post-run checks applies.
//   flog           - forwarded to ResetGlobals (its effect is defined there).
// The scenario is executed twice ("run, reset, run again") so that the state left by g.reset()
// is itself verified before the second pass.
1679 class SourceBodyType,
1680 class TestSplitType,
1682 class SinkBodyType0,
1684 class SinkBodyType1>
1685 void run_one_split_node_test(bool throwException, bool flog) {
// One counter per body; each body is constructed from its counter below.
1689 tbb::atomic<int> source_count;
1690 tbb::atomic<int> sink0_count;
1691 tbb::atomic<int> sink1_count;
1692 source_count = sink0_count = sink1_count = 0;
1693 #if USE_TASK_SCHEDULER_OBSERVER
// Record the calling thread's id as the master thread.
1698 g_Master = Harness::CurrentTid();
// The source is constructed with is_active=false; both sinks use unlimited concurrency.
1699 SourceType source(g, SourceBodyType(source_count),/*is_active*/false);
1700 TestSplitType node_to_test(g);
1701 SinkType0 sink0(g,tbb::flow::unlimited,SinkBodyType0(sink0_count));
1702 SinkType1 sink1(g,tbb::flow::unlimited,SinkBodyType1(sink1_count));
// Wire the source into the split node and each of its two output ports into its own sink.
1703 make_edge(source, node_to_test);
1704 make_edge(tbb::flow::output_port<0>(node_to_test), sink0);
1705 make_edge(tbb::flow::output_port<1>(node_to_test), sink1);
1707 for(int iter = 0; iter < 2; ++iter) { // run, reset, run again
1708 ResetGlobals(throwException,flog);
1709 if(throwException) {
// True when a missing exception is acceptable: the thread class selected by g_ExceptionInMaster
// (master if set, non-master otherwise) never executed a throw, or no throw was requested at all.
1721 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
// Read the invocation counts from copies of the bodies currently held by the nodes.
1722 int sb_cnt = tbb::flow::copy_body<SourceBodyType>(source).count_value();
1723 int nb0_cnt = tbb::flow::copy_body<SinkBodyType0>(sink0).count_value();
1724 int nb1_cnt = tbb::flow::copy_body<SinkBodyType1>(sink1).count_value();
// Throwing scenario: the graph must report the exception and the cancellation (unless excused
// above); the counts can only be bounded from above.
1725 if(throwException) {
1726 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1727 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1728 ASSERT(sb_cnt <= 2*g_NumItems, "Too many items sent by source");
1729 ASSERT(nb0_cnt + nb1_cnt <= sb_cnt*2, "Too many items received by sink nodes");
// Expectations when no throw occurred: both flags clear and every item seen by source and sinks.
1732 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1733 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1734 ASSERT(sb_cnt == g_NumItems, "Missing invocations of source_nodes");
1735 ASSERT(nb0_cnt == g_NumItems && nb1_cnt == g_NumItems, "Missing items in absorbers");
1737 g.reset(); // resets the body of the source_nodes and the absorb_nodes.
// Zero the local counters as well, then confirm every body copy reports a count of zero.
1738 source_count = sink0_count = sink1_count = 0;
1739 ASSERT(0 == tbb::flow::copy_body<SourceBodyType>(source).count_value(),"Reset source failed");
1740 ASSERT(0 == tbb::flow::copy_body<SinkBodyType0>(sink0).count_value(),"Reset sink 0 failed");
1741 ASSERT(0 == tbb::flow::copy_body<SinkBodyType1>(sink1).count_value(),"Reset sink 1 failed");
1743 #if USE_TASK_SCHEDULER_OBSERVER
// Instantiates the split_node scenario for one (InputTuple, SourceThrowType, SinkThrowType)
// combination and runs it for three settings of (throwException, doFlog).
//   InputTuple      - tuple type carried by the source and split by the node under test; its
//                     elements 0 and 1 become the item types of the two sinks.
//   SourceThrowType - throw behaviour selected for the source body.
//   SinkThrowType   - throw behaviour selected for both sink bodies.
1748 template<class InputTuple,
1749 TestNodeTypeEnum SourceThrowType,
1750 TestNodeTypeEnum SinkThrowType>
1751 void run_split_node_test() {
1752 typedef typename tbb::flow::tuple_element<0,InputTuple>::type ItemType0;
1753 typedef typename tbb::flow::tuple_element<1,InputTuple>::type ItemType1;
// Body types: a tuple-producing source and one absorber per split output.
1754 typedef tuple_test_source_body<InputTuple,SourceThrowType> SourceBodyType;
1755 typedef absorber_body<ItemType0,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType0;
1756 typedef absorber_body<ItemType1,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType1;
// Node types wrapped around those bodies.
1758 typedef typename tbb::flow::source_node<InputTuple> SourceType;
1759 typedef typename tbb::flow::split_node<InputTuple> TestSplitType;
1760 typedef typename tbb::flow::function_node<ItemType0,tbb::flow::continue_msg> SinkType0;
1761 typedef typename tbb::flow::function_node<ItemType1,tbb::flow::continue_msg> SinkType1;
// Bit 0 of i selects throwException, bit 1 selects doFlog; i == 2 (flog without a throw) is
// skipped, leaving i = 0, 1 and 3.
1763 for(int i = 0; i < 4; ++i) {
1764 if(2 == i) continue;
1765 bool throwException = (i & 0x1) != 0;
1766 bool doFlog = (i & 0x2) != 0;
1767 run_one_split_node_test<
1776 (throwException,doFlog);
// Driver for the split_node exception tests.
// Calls run_split_node_test on tuple<int,int> three times, once for each pairing of throw types in
// which at least one of the two arguments is isThrowing. g_Wakeup_Msg is set to a message naming
// the pairing before each call and restored to g_Orig_Wakeup_Msg at the end.
1780 void test_split_node() {
1781 REMARK("Testing split_node\n");
1782 g_Wakeup_Msg = "split_node(is,non): Missed wakeup or machine is overloaded?";
1783 run_split_node_test<tbb::flow::tuple<int,int>, isThrowing, nonThrowing>();
1784 g_Wakeup_Msg = "split_node(non,is): Missed wakeup or machine is overloaded?";
1785 run_split_node_test<tbb::flow::tuple<int,int>, nonThrowing, isThrowing>();
1786 g_Wakeup_Msg = "split_node(is,is): Missed wakeup or machine is overloaded?";
1787 run_split_node_test<tbb::flow::tuple<int,int>, isThrowing, isThrowing>();
// Put the default message back so later tests are not labelled as split_node.
1788 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1791 // --------- indexer_node ----------------------
// Runs one indexer_node exception scenario on the graph  {source0, source1} -> indexer -> sink.
//   throwException - forwarded to ResetGlobals; also selects which set of post-run checks applies.
//   flog           - forwarded to ResetGlobals (its effect is defined there).
// The scenario is executed twice; after each pass the body counts are checked to be back at zero.
1793 template < class InputTuple,
1795 class SourceBodyType0,
1797 class SourceBodyType1,
1801 void run_one_indexer_node_test(bool throwException,bool flog) {
// Item types of the two indexer inputs, taken from elements 0 and 1 of InputTuple.
1802 typedef typename tbb::flow::tuple_element<0,InputTuple>::type ItemType0;
1803 typedef typename tbb::flow::tuple_element<1,InputTuple>::type ItemType1;
// One counter per body; each body is constructed from its counter below.
1807 tbb::atomic<int> source0_count;
1808 tbb::atomic<int> source1_count;
1809 tbb::atomic<int> sink_count;
1810 source0_count = source1_count = sink_count = 0;
1811 #if USE_TASK_SCHEDULER_OBSERVER
// Record the calling thread's id as the master thread.
1815 g_Master = Harness::CurrentTid();
// Both sources are constructed with is_active=false; the sink uses unlimited concurrency.
1816 SourceType0 source0(g, SourceBodyType0(source0_count),/*is_active*/false);
1817 SourceType1 source1(g, SourceBodyType1(source1_count),/*is_active*/false);
1818 TestNodeType node_to_test(g);
1819 SinkType sink(g,tbb::flow::unlimited,SinkBodyType(sink_count));
// Each source feeds its own input port of the indexer; the indexer's output feeds the sink.
1820 make_edge(source0,tbb::flow::input_port<0>(node_to_test));
1821 make_edge(source1,tbb::flow::input_port<1>(node_to_test));
1822 make_edge(node_to_test, sink);
1823 for(int iter = 0; iter < 2; ++iter) {
1824 ResetGlobals(throwException,flog);
1825 if(throwException) {
// True when a missing exception is acceptable: the thread class selected by g_ExceptionInMaster
// (master if set, non-master otherwise) never executed a throw, or no throw was requested at all.
1839 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) || !throwException;
// Read the invocation counts from copies of the bodies currently held by the nodes.
1840 int sb0_cnt = tbb::flow::copy_body<SourceBodyType0>(source0).count_value();
1841 int sb1_cnt = tbb::flow::copy_body<SourceBodyType1>(source1).count_value();
1842 int nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
// Throwing scenario: the graph must report the exception and the cancellation (unless excused
// above); the counts can only be bounded from above.
1843 if(throwException) {
1844 ASSERT(g.exception_thrown() || okayNoExceptionsCaught, "Exception not caught by graph");
1845 ASSERT(g.is_cancelled() || okayNoExceptionsCaught, "Cancellation not signalled in graph");
1846 ASSERT(sb0_cnt <= g_NumItems && sb1_cnt <= g_NumItems, "Too many items sent by sources");
1847 ASSERT(nb_cnt <= sb0_cnt + sb1_cnt, "Too many items received by sink nodes");
// Expectations when no throw occurred: both flags clear, each source ran g_NumItems times and
// the sink received the items of both sources.
1850 ASSERT(!g.exception_thrown(), "Exception flag in flow::graph set but no throw occurred");
1851 ASSERT(!g.is_cancelled(), "canceled flag set but no throw occurred");
1852 ASSERT(sb0_cnt == g_NumItems, "Missing invocations of source_node0");
1853 ASSERT(sb1_cnt == g_NumItems, "Missing invocations of source_node1");
1854 ASSERT(nb_cnt == 2*g_NumItems, "Missing items in absorbers");
// With the sink detached, one item is put directly into each input port (values g_NumItems + 4
// and g_NumItems + 2); the edge to the sink is re-attached a few statements later.
// NOTE(review): presumably this checks behaviour with no successor attached -- confirm intent.
1857 remove_edge(node_to_test, sink);
1858 tbb::flow::input_port<0>(node_to_test).try_put(ItemType0(g_NumItems + 4));
1859 tbb::flow::input_port<1>(node_to_test).try_put(ItemType1(g_NumItems + 2));
1862 source0_count = source1_count = sink_count = 0;
1863 make_edge(node_to_test, sink);
1869 source0_count = source1_count = sink_count = 0;
// Every body copy must report a count of zero before the next pass.
1871 ASSERT(0 == tbb::flow::copy_body<SourceBodyType0>(source0).count_value(),"Reset source failed");
1872 ASSERT(0 == tbb::flow::copy_body<SourceBodyType1>(source1).count_value(),"Reset source failed");
// NOTE(review): nb_cnt is reassigned here, but the assertion below re-reads the count from a
// fresh body copy instead of using it; the store looks dead -- confirm and consider removing.
1873 nb_cnt = tbb::flow::copy_body<SinkBodyType>(sink).count_value();
1874 ASSERT(0 == tbb::flow::copy_body<SinkBodyType>(sink).count_value(),"Reset sink failed");
1877 #if USE_TASK_SCHEDULER_OBSERVER
// Instantiates the indexer_node scenario for one (InputTuple, SourceThrowType, SinkThrowType)
// combination and runs it for three settings of (throwException, doFlog).
//   InputTuple      - supplies the two input item types (elements 0 and 1).
//   SourceThrowType - throw behaviour selected for both source bodies.
//   SinkThrowType   - throw behaviour selected for the sink body.
1882 template<class InputTuple,
1883 TestNodeTypeEnum SourceThrowType,
1884 TestNodeTypeEnum SinkThrowType>
1885 void run_indexer_node_test() {
1886 typedef typename tbb::flow::tuple_element<0,InputTuple>::type ItemType0;
1887 typedef typename tbb::flow::tuple_element<1,InputTuple>::type ItemType1;
// One source body per input; the sink body absorbs the indexer's own output_type.
1888 typedef test_source_body<ItemType0,SourceThrowType> SourceBodyType0;
1889 typedef test_source_body<ItemType1,SourceThrowType> SourceBodyType1;
1890 typedef typename tbb::flow::indexer_node<ItemType0, ItemType1> TestNodeType;
1891 typedef absorber_body<typename TestNodeType::output_type,tbb::flow::continue_msg,SinkThrowType,unlimited_type> SinkBodyType;
// Node types wrapped around those bodies.
1893 typedef typename tbb::flow::source_node<ItemType0> SourceType0;
1894 typedef typename tbb::flow::source_node<ItemType1> SourceType1;
1895 typedef typename tbb::flow::function_node<typename TestNodeType::output_type,tbb::flow::continue_msg> SinkType;
// Bit 0 of i selects throwException, bit 1 selects doFlog; i == 2 (flog without a throw) is
// skipped, leaving i = 0, 1 and 3.
1897 for(int i = 0; i < 4; ++i) {
1898 if(2 == i) continue;
1899 bool throwException = (i & 0x1) != 0;
1900 bool doFlog = (i & 0x2) != 0;
1901 run_one_indexer_node_test<
1909 SinkBodyType>(throwException,doFlog);
// Driver for the indexer_node exception tests.
// Calls run_indexer_node_test on tuple<int,int> three times, once for each pairing of throw types
// in which at least one of the two arguments is isThrowing. g_Wakeup_Msg is set to a message
// naming the pairing before each call and restored to g_Orig_Wakeup_Msg at the end.
1913 void test_indexer_node() {
1914 REMARK("Testing indexer_node\n");
1915 g_Wakeup_Msg = "indexer_node(is,non): Missed wakeup or machine is overloaded?";
1916 run_indexer_node_test<tbb::flow::tuple<int,int>, isThrowing, nonThrowing>();
1917 g_Wakeup_Msg = "indexer_node(non,is): Missed wakeup or machine is overloaded?";
1918 run_indexer_node_test<tbb::flow::tuple<int,int>, nonThrowing, isThrowing>();
1919 g_Wakeup_Msg = "indexer_node(is,is): Missed wakeup or machine is overloaded?";
1920 run_indexer_node_test<tbb::flow::tuple<int,int>, isThrowing, isThrowing>();
// Put the default message back so later tests are not labelled as indexer_node.
1921 g_Wakeup_Msg = g_Orig_Wakeup_Msg;
1924 ///////////////////////////////////////////////
1925 // whole-graph exception test
// Node body used by the whole-graph exception test: invoking it performs an out-of-range
// vector access so that an exception escapes from inside a graph node.
// The vector is held through a pointer; the reference-member variant is kept commented out.
// NOTE(review): presumably the pointer keeps the body assignable -- confirm.
1929 // std::vector<int>& m_vec;
1930 std::vector<int>* m_vec;
// Stores the address of the caller's vector; the vector itself is not copied.
1932 Foo(std::vector<int>& vec) : m_vec(&vec) { }
1933 void operator() (tbb::flow::continue_msg) const {
// Index size() is one past the last valid element, so at() is expected to throw and the ASSERT
// below should never be reached.
1935 m_vec->at(m_vec->size()); // Will throw out_of_range exception
1936 ASSERT(false, "Exception not thrown by invalid access");
1940 // test from user ahelwer: http://software.intel.com/en-us/forums/showthread.php?t=103786
1941 // exception thrown in graph node, not caught in wait_for_all()
// Builds a two-node graph (broadcast_node -> continue_node running a throwing body), triggers
// it with a single continue_msg and checks that:
//   1. the exception surfaces to the caller and the graph reports exception_thrown() and
//      is_cancelled();
//   2. a further wait with no work pending reports no exception and leaves both flags cleared.
1943 test_flow_graph_exception0() {
// Empty vector handed to the throwing body.
1945 std::vector<int> vec;
1950 // Construct graph and nodes
1952 tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g);
1953 tbb::flow::continue_node<tbb::flow::continue_msg> fooNode(g, f);
1956 tbb::flow::make_edge(start, fooNode);
// Precondition: a fresh graph has neither flag set.
1959 ASSERT(!g.exception_thrown(), "exception_thrown flag already set");
1960 ASSERT(!g.is_cancelled(), "canceled flag already set");
// Fire the graph once; reaching the ASSERT below means no exception propagated.
1962 start.try_put(tbb::flow::continue_msg());
1964 ASSERT(false, "Exception not thrown");
// NOTE(review): the handler does not modify ex; catching by const reference would be conventional.
1966 catch(std::out_of_range& ex) {
1967 REMARK("Exception: %s (expected)\n", ex.what());
1970 REMARK("Unknown exception caught (expected)\n");
// An exception reached the caller, so some body must have recorded that it threw.
1972 ASSERT(nExceptions > 0, "Exception caught, but no body signaled exception being thrown");
1974 ASSERT(g.exception_thrown(), "Exception not intercepted");
1975 // if exception set, cancellation also set.
1976 ASSERT(g.is_cancelled(), "Exception cancellation not signaled");
1977 // in case we got an exception
1979 g.wait_for_all(); // context still signalled canceled, my_exception still set.
// No task ran during the second wait, so nothing should have been thrown or counted and the
// graph's flags are expected to be cleared again.
1982 ASSERT(false, "Second exception thrown but no task executing");
1984 ASSERT(nExceptions == 0, "body signaled exception being thrown, but no body executed");
1985 ASSERT(!g.exception_thrown(), "exception_thrown flag not reset");
1986 ASSERT(!g.is_cancelled(), "canceled flag not reset");
// Runs the whole exception test suite with the scheduler initialised for nThread threads.
//   nThread - thread count passed to tbb::task_scheduler_init and stored in g_NumThreads.
// The whole-graph test runs once; the per-node tests run for every combination of
// g_ExceptionInMaster and g_SolitaryException.
1989 void TestOneThreadNum(int nThread) {
1990 REMARK("Testing %d threads\n", nThread);
// Use NUM_ITEMS items unless there are more threads than that, in which case use two per thread.
1991 g_NumItems = ((nThread > NUM_ITEMS) ? nThread *2 : NUM_ITEMS);
1992 g_NumThreads = nThread;
1993 tbb::task_scheduler_init init(nThread);
1994 // whole-graph exception catch and rethrow test
1995 test_flow_graph_exception0();
// Bit 0 of i selects g_ExceptionInMaster, bit 1 selects g_SolitaryException.
1996 for(int i = 0; i < 4; ++i) {
1997 g_ExceptionInMaster = (i & 1) != 0;
1998 g_SolitaryException = (i & 2) != 0;
1999 REMARK("g_ExceptionInMaster == %s, g_SolitaryException == %s\n",
2000 g_ExceptionInMaster ? "T":"F",
2001 g_SolitaryException ? "T":"F");
// Functional nodes.
2003 test_function_node();
2004 test_continue_node(); // also test broadcast_node
2005 test_multifunction_node();
2006 // single- and multi-item buffering nodes
2007 test_buffer_queue_and_overwrite_node();
2008 test_sequencer_node();
2009 test_priority_queue_node();
// join_node under each of the three policies exercised here.
2012 test_join_node<tbb::flow::queueing>();
2013 test_join_node<tbb::flow::reserving>();
2014 test_join_node<tbb::flow::tag_matching>();
2016 test_limiter_node();
2018 // graph for write_once_node will be complicated by the fact the node will
2019 // not do try_puts after it has been set. To get parallelism of N we have
2020 // to attach N successor nodes to the write_once (or play some similar game).
2021 // test_write_once_node();
2022 test_indexer_node();
2025 #endif // TBB_USE_EXCEPTIONS
// Test entry logic. When TBB_USE_EXCEPTIONS is enabled, TestOneThreadNum is run for every thread
// count from MaxThread down to MinThread and Harness::Done is returned.
// NOTE(review): the Harness::Skipped return below is presumably the branch for builds without
// exception support -- confirm against the preprocessor structure of the full file.
2027 #if TBB_USE_EXCEPTIONS
2029 // reversing the order of tests
2030 for(int nThread=MaxThread; nThread >= MinThread; --nThread) {
2031 TestOneThreadNum(nThread);
2034 return Harness::Done;
2038 return Harness::Skipped;
2040 #endif // TBB_USE_EXCEPTIONS