322fc70e82ab636dee7bc91d66c92e8c48a6e4e2
[platform/upstream/tbb.git] / src / test / test_limiter_node.cpp
1 /*
2     Copyright (c) 2005-2019 Intel Corporation
3
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7
8         http://www.apache.org/licenses/LICENSE-2.0
9
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16
17 #include "harness.h"
18 #if __TBB_CPF_BUILD
19 #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
20 #include "harness_graph.h"
21 #endif
22 #include "tbb/flow_graph.h"
23 #include "tbb/atomic.h"
24 #include "tbb/task_scheduler_init.h"
25
26 const int L = 10;
27 const int N = 1000;
28
29 using tbb::flow::internal::SUCCESSFULLY_ENQUEUED;
30
31 template< typename T >
32 struct serial_receiver : public tbb::flow::receiver<T>, NoAssign {
33    T next_value;
34    tbb::flow::graph& my_graph;
35
36    serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {}
37
38    tbb::task *try_put_task( const T &v ) __TBB_override {
39        ASSERT( next_value++  == v, NULL );
40        return const_cast<tbb::task *>(SUCCESSFULLY_ENQUEUED);
41    }
42
43     tbb::flow::graph& graph_reference() __TBB_override {
44         return my_graph;
45     }
46
47 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
48     typedef typename tbb::flow::receiver<T>::built_predecessors_type built_predecessors_type;
49     typedef typename tbb::flow::receiver<T>::predecessor_list_type predecessor_list_type;
50     typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
51     built_predecessors_type bpt;
52     built_predecessors_type &built_predecessors() __TBB_override { return bpt; }
53     void internal_add_built_predecessor( predecessor_type & ) __TBB_override { }
54     void internal_delete_built_predecessor( predecessor_type & ) __TBB_override { }
55     void copy_predecessors( predecessor_list_type & ) __TBB_override { }
56     size_t predecessor_count() __TBB_override { return 0; }
57 #endif
58
59    void reset_receiver(tbb::flow::reset_flags /*f*/) __TBB_override {next_value = T(0);}
60 };
61
62 template< typename T >
63 struct parallel_receiver : public tbb::flow::receiver<T>, NoAssign {
64
65     tbb::atomic<int> my_count;
66     tbb::flow::graph& my_graph;
67
68     parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; }
69
70     tbb::task *try_put_task( const T &/*v*/ ) __TBB_override {
71        ++my_count;
72        return const_cast<tbb::task *>(tbb::flow::internal::SUCCESSFULLY_ENQUEUED);
73     }
74
75     tbb::flow::graph& graph_reference() __TBB_override {
76         return my_graph;
77     }
78
79 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
80     typedef typename tbb::flow::receiver<T>::built_predecessors_type built_predecessors_type;
81     typedef typename tbb::flow::receiver<T>::predecessor_list_type predecessor_list_type;
82     typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
83     built_predecessors_type bpt;
84     built_predecessors_type &built_predecessors() __TBB_override { return bpt; }
85     void internal_add_built_predecessor( predecessor_type & ) __TBB_override { }
86     void internal_delete_built_predecessor( predecessor_type & ) __TBB_override { }
87     void copy_predecessors( predecessor_list_type & ) __TBB_override { }
88     size_t predecessor_count( ) __TBB_override { return 0; }
89 #endif
90     void reset_receiver(tbb::flow::reset_flags /*f*/) __TBB_override {my_count = 0;}
91 };
92
93 template< typename T >
94 struct empty_sender : public tbb::flow::sender<T> {
95         typedef typename tbb::flow::sender<T>::successor_type successor_type;
96
97         bool register_successor( successor_type & ) __TBB_override { return false; }
98         bool remove_successor( successor_type & ) __TBB_override { return false; }
99 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
100         typedef typename tbb::flow::sender<T>::built_successors_type built_successors_type;
101         typedef typename tbb::flow::sender<T>::successor_list_type successor_list_type;
102         built_successors_type bst;
103         built_successors_type &built_successors() __TBB_override { return bst; }
104         void    internal_add_built_successor( successor_type & ) __TBB_override { }
105         void internal_delete_built_successor( successor_type & ) __TBB_override { }
106         void copy_successors( successor_list_type & ) __TBB_override { }
107         size_t successor_count() __TBB_override { return 0; }
108 #endif
109 };
110
111
112 template< typename T >
113 struct put_body : NoAssign {
114
115     tbb::flow::limiter_node<T> &my_lim;
116     tbb::atomic<int> &my_accept_count;
117
118     put_body( tbb::flow::limiter_node<T> &lim, tbb::atomic<int> &accept_count ) :
119         my_lim(lim), my_accept_count(accept_count) {}
120
121     void operator()( int ) const {
122         for ( int i = 0; i < L; ++i ) {
123             bool msg = my_lim.try_put( T(i) );
124             if ( msg == true )
125                ++my_accept_count;
126         }
127     }
128 };
129
130 template< typename T >
131 struct put_dec_body : NoAssign {
132
133     tbb::flow::limiter_node<T> &my_lim;
134     tbb::atomic<int> &my_accept_count;
135
136     put_dec_body( tbb::flow::limiter_node<T> &lim, tbb::atomic<int> &accept_count ) :
137         my_lim(lim), my_accept_count(accept_count) {}
138
139     void operator()( int ) const {
140         int local_accept_count = 0;
141         while ( local_accept_count < N ) {
142             bool msg = my_lim.try_put( T(local_accept_count) );
143             if ( msg == true ) {
144                 ++local_accept_count;
145                 ++my_accept_count;
146                 my_lim.decrement.try_put( tbb::flow::continue_msg() );
147             }
148         }
149     }
150
151 };
152
153 template< typename T >
154 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) {
155     parallel_receiver<T> r(g);
156     empty_sender< tbb::flow::continue_msg > s;
157     tbb::atomic<int> accept_count;
158     accept_count = 0;
159     tbb::flow::make_edge( lim, r );
160     tbb::flow::make_edge(s, lim.decrement);
161 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
162     ASSERT(lim.decrement.predecessor_count() == 1, NULL);
163     ASSERT(lim.successor_count() == 1, NULL);
164     ASSERT(lim.predecessor_count() == 0, NULL);
165     typename tbb::flow::interface10::internal::decrementer
166         <tbb::flow::limiter_node<T>, tbb::flow::continue_msg>::predecessor_list_type dec_preds;
167     lim.decrement.copy_predecessors(dec_preds);
168     ASSERT(dec_preds.size() == 1, NULL);
169 #endif
170     // test puts with decrements
171     NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) );
172     int c = accept_count;
173     ASSERT( c == N*num_threads, NULL );
174     ASSERT( r.my_count == N*num_threads, NULL );
175 }
176
177 //
178 // Tests
179 //
180 // limiter only forwards below the limit, multiple parallel senders / single receiver
181 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages
182 //
183 //
184 template< typename T >
185 int test_parallel(int num_threads) {
186
187    // test puts with no decrements
188    for ( int i = 0; i < L; ++i ) {
189        tbb::flow::graph g;
190        tbb::flow::limiter_node< T > lim(g, i);
191        parallel_receiver<T> r(g);
192        tbb::atomic<int> accept_count;
193        accept_count = 0;
194        tbb::flow::make_edge( lim, r );
195        // test puts with no decrements
196        NativeParallelFor( num_threads, put_body<T>(lim, accept_count) );
197        g.wait_for_all();
198        int c = accept_count;
199        ASSERT( c == i, NULL );
200    }
201
202    // test puts with decrements
203    for ( int i = 1; i < L; ++i ) {
204        tbb::flow::graph g;
205        tbb::flow::limiter_node< T > lim(g, i);
206        test_puts_with_decrements(num_threads, lim, g);
207        tbb::flow::limiter_node< T > lim_copy( lim );
208        test_puts_with_decrements(num_threads, lim_copy, g);
209    }
210
211    return 0;
212 }
213
214 //
215 // Tests
216 //
217 // limiter only forwards below the limit, single sender / single receiver
218 // at reject, a put to decrement, will cause next message to be accepted
219 //
220 template< typename T >
221 int test_serial() {
222
223    // test puts with no decrements
224    for ( int i = 0; i < L; ++i ) {
225        tbb::flow::graph g;
226        tbb::flow::limiter_node< T > lim(g, i);
227        serial_receiver<T> r(g);
228        tbb::flow::make_edge( lim, r );
229        for ( int j = 0; j < L; ++j ) {
230            bool msg = lim.try_put( T(j) );
231            ASSERT( ( j < i && msg == true ) || ( j >= i && msg == false ), NULL );
232        }
233        g.wait_for_all();
234    }
235
236    // test puts with decrements
237    for ( int i = 1; i < L; ++i ) {
238        tbb::flow::graph g;
239        tbb::flow::limiter_node< T > lim(g, i);
240        serial_receiver<T> r(g);
241        empty_sender< tbb::flow::continue_msg > s;
242        tbb::flow::make_edge( lim, r );
243        tbb::flow::make_edge(s, lim.decrement);
244        for ( int j = 0; j < N; ++j ) {
245            bool msg = lim.try_put( T(j) );
246            ASSERT( ( j < i && msg == true ) || ( j >= i && msg == false ), NULL );
247            if ( msg == false ) {
248                lim.decrement.try_put( tbb::flow::continue_msg() );
249                msg = lim.try_put( T(j) );
250                ASSERT( msg == true, NULL );
251            }
252        }
253    }
254    return 0;
255 }
256
257 // reported bug in limiter (http://software.intel.com/en-us/comment/1752355)
258 #define DECREMENT_OUTPUT 1  // the port number of the decrement output of the multifunction_node
259 #define LIMITER_OUTPUT 0    // port number of the integer output
260
261 typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int,tbb::flow::continue_msg> > mfnode_type;
262
263 tbb::atomic<size_t> emit_count;
264 tbb::atomic<size_t> emit_sum;
265 tbb::atomic<size_t> receive_count;
266 tbb::atomic<size_t> receive_sum;
267
268 struct mfnode_body {
269     int max_cnt;
270     tbb::atomic<int>* my_cnt;
271     mfnode_body(const int& _max, tbb::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my)  { }
272     void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) {
273         int lcnt = ++(*my_cnt);
274         if(lcnt > max_cnt) {
275             return;
276         }
277         // put one continue_msg to the decrement of the limiter.
278         if(!tbb::flow::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) {
279             ASSERT(false,"Unexpected rejection of decrement");
280         }
281         {
282             // put messages to the input of the limiter_node until it rejects.
283             while( tbb::flow::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) {
284                 emit_sum += lcnt;
285                 ++emit_count;
286             }
287         }
288     }
289 };
290
291 struct fn_body {
292     int operator()(const int &in) {
293         receive_sum += in;
294         ++receive_count;
295         return in;
296     }
297 };
298
299 //                   +------------+
300 //    +---------+    |            v
301 //    | mf_node |0---+       +----------+          +----------+
302 // +->|         |1---------->| lim_node |--------->| fn_node  |--+
303 // |  +---------+            +----------+          +----------+  |
304 // |                                                             |
305 // |                                                             |
306 // +-------------------------------------------------------------+
307 //
308 void
309 test_multifunction_to_limiter(int _max, int _nparallel) {
310     tbb::flow::graph g;
311     emit_count = 0;
312     emit_sum = 0;
313     receive_count = 0;
314     receive_sum = 0;
315     tbb::atomic<int> local_cnt;
316     local_cnt = 0;
317     mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt));
318     tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body());
319     tbb::flow::limiter_node<int> lim_node(g, _nparallel);
320     tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node);
321     tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrement);
322     tbb::flow::make_edge(lim_node, fn_node);
323     tbb::flow::make_edge(fn_node, mf_node);
324 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
325     REMARK("pred cnt == %d\n",(int)(lim_node.predecessor_count()));
326     REMARK("succ cnt == %d\n",(int)(lim_node.successor_count()));
327     tbb::flow::limiter_node<int>::successor_list_type my_succs;
328     lim_node.copy_successors(my_succs);
329     REMARK("succ cnt from vector  == %d\n",(int)(my_succs.size()));
330     tbb::flow::limiter_node<int>::predecessor_list_type my_preds;
331     lim_node.copy_predecessors(my_preds);
332     REMARK("pred cnt from vector  == %d\n",(int)(my_preds.size()));
333 #endif
334     mf_node.try_put(1);
335     g.wait_for_all();
336     ASSERT(emit_count == receive_count, "counts do not match");
337     ASSERT(emit_sum == receive_sum, "sums do not match");
338
339     // reset, test again
340     g.reset();
341     emit_count = 0;
342     emit_sum = 0;
343     receive_count = 0;
344     receive_sum = 0;
345     local_cnt = 0;;
346     mf_node.try_put(1);
347     g.wait_for_all();
348     ASSERT(emit_count == receive_count, "counts do not match");
349     ASSERT(emit_sum == receive_sum, "sums do not match");
350 }
351
352
353 void
354 test_continue_msg_reception() {
355     tbb::flow::graph g;
356     tbb::flow::limiter_node<int> ln(g,2);
357     tbb::flow::queue_node<int>   qn(g);
358     tbb::flow::make_edge(ln, qn);
359     ln.decrement.try_put(tbb::flow::continue_msg());
360     ln.try_put(42);
361     g.wait_for_all();
362     int outint;
363     ASSERT(qn.try_get(outint) && outint == 42, "initial put to decrement stops node");
364 }
365
366 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
367 using namespace tbb::flow;
368 void run_and_check_result(graph& g, limiter_node<int>& limit, queue_node<int>& queue, broadcast_node<continue_msg>& broad) {
369     ASSERT( limit.try_put(1), NULL );
370     ASSERT( limit.try_put(2), NULL );
371     ASSERT( !limit.try_put(3), NULL );
372     ASSERT( broad.try_put(continue_msg()), NULL );
373     ASSERT( limit.decrement.try_put(continue_msg()), NULL );
374     ASSERT( limit.try_put(4), NULL );
375     ASSERT( !limit.try_put(5), NULL );
376     g.wait_for_all();
377
378     int list[] = {1, 2, 4};
379     int var = 0;
380     for (size_t i = 0; i < sizeof(list)/sizeof(list[0]); i++) {
381         queue.try_get(var);
382         ASSERT(var==list[i], "some data dropped, input does not match output");
383     }
384 }
385
386 void test_num_decrement_predecessors() {
387     graph g;
388     queue_node<int> output_queue(g);
389     limiter_node<int> limit1(g, 2, /*number_of_predecessors*/1);
390     limiter_node<int, continue_msg> limit2(g, 2, /*number_of_predecessors*/1);
391     broadcast_node<continue_msg> broadcast(g);
392
393     make_edge(limit1, output_queue);
394     make_edge(limit2, output_queue);
395
396     make_edge(broadcast, limit1.decrement);
397     make_edge(broadcast, limit2.decrement);
398
399     run_and_check_result(g, limit1, output_queue, broadcast);
400     run_and_check_result(g, limit2, output_queue, broadcast);
401 }
402 #else // TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
403 //
404 // This test ascertains that if a message is not successfully put
405 // to a successor, the message is not dropped but released.
406 //
407
408 void test_reserve_release_messages() {
409     using namespace tbb::flow;
410     graph g;
411
412     //making two queue_nodes: one broadcast_node and one limiter_node
413     queue_node<int> input_queue(g);
414     queue_node<int> output_queue(g);
415     broadcast_node<int> broad(g);
416     limiter_node<int, int> limit(g,2); //threshold of 2
417
418     //edges
419     make_edge(input_queue, limit);
420     make_edge(limit, output_queue);
421     make_edge(broad,limit.decrement);
422
423     int list[4] = {19, 33, 72, 98}; //list to be put to the input queue
424
425     input_queue.try_put(list[0]); // succeeds
426     input_queue.try_put(list[1]); // succeeds
427     input_queue.try_put(list[2]); // fails, stored in upstream buffer
428     g.wait_for_all();
429
430     remove_edge(limit, output_queue); //remove successor
431
432     //sending message to the decrement port of the limiter
433     broad.try_put(1); //failed message retrieved.
434     g.wait_for_all();
435
436     make_edge(limit, output_queue); //putting the successor back
437
438     broad.try_put(1);  //drop the count
439
440     input_queue.try_put(list[3]);  //success
441     g.wait_for_all();
442
443     int var=0;
444
445     for (int i=0; i<4; i++) {
446         output_queue.try_get(var);
447         ASSERT(var==list[i], "some data dropped, input does not match output");
448         g.wait_for_all();
449     }
450 }
451
452 void test_decrementer() {
453     const int threshold = 5;
454     tbb::flow::graph g;
455     tbb::flow::limiter_node<int, int> limit(g, threshold);
456     tbb::flow::queue_node<int> queue(g);
457     make_edge(limit, queue);
458     int m = 0;
459     ASSERT( limit.try_put( m++ ), "Newly constructed limiter node does not accept message." );
460     ASSERT( limit.decrement.try_put( -threshold ), // close limiter's gate
461             "Limiter node decrementer's port does not accept message." );
462     ASSERT( !limit.try_put( m++ ), "Closed limiter node's accepts message." );
463     ASSERT( limit.decrement.try_put( threshold + 5 ),  // open limiter's gate
464             "Limiter node decrementer's port does not accept message." );
465     for( int i = 0; i < threshold; ++i )
466         ASSERT( limit.try_put( m++ ), "Limiter node does not accept message while open." );
467     ASSERT( !limit.try_put( m ), "Limiter node's gate is not closed." );
468     g.wait_for_all();
469     int expected[] = {0, 2, 3, 4, 5, 6};
470     int actual = -1; m = 0;
471     while( queue.try_get(actual) )
472         ASSERT( actual == expected[m++], NULL );
473     ASSERT( sizeof(expected) / sizeof(expected[0]) == m, "Not all messages have been processed." );
474     g.wait_for_all();
475
476     const size_t threshold2 = size_t(-1);
477     tbb::flow::limiter_node<int, long long> limit2(g, threshold2);
478     make_edge(limit2, queue);
479     ASSERT( limit2.try_put( 1 ), "Newly constructed limiter node does not accept message." );
480     long long decrement_value = (long long)( size_t(-1)/2 );
481     ASSERT( limit2.decrement.try_put( -decrement_value ),
482             "Limiter node decrementer's port does not accept message" );
483     ASSERT( limit2.try_put( 2 ), "Limiter's gate should not be closed yet." );
484     ASSERT( limit2.decrement.try_put( -decrement_value ),
485             "Limiter node decrementer's port does not accept message" );
486     ASSERT( !limit2.try_put( 3 ), "Overflow happened for internal counter." );
487     int expected2[] = {1, 2};
488     actual = -1; m = 0;
489     while( queue.try_get(actual) )
490         ASSERT( actual == expected2[m++], NULL );
491     ASSERT( sizeof(expected2) / sizeof(expected2[0]) == m, "Not all messages have been processed." );
492     g.wait_for_all();
493 }
494 #endif // TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
495
496 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
497 void test_extract() {
498     tbb::flow::graph g;
499     int j;
500     tbb::flow::limiter_node<int> node0(g, /*threshold*/1);
501     tbb::flow::queue_node<int> q0(g);
502     tbb::flow::queue_node<int> q1(g);
503     tbb::flow::queue_node<int> q2(g);
504     tbb::flow::broadcast_node<tbb::flow::continue_msg> b0(g);
505     tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g);
506
507     for( int i = 0; i < 2; ++i ) {
508         REMARK("At pass %d\n", i);
509         ASSERT(node0.predecessor_count() == 0, "incorrect predecessor count at start");
510         ASSERT(node0.successor_count() == 0, "incorrect successor count at start");
511         ASSERT(node0.decrement.predecessor_count() == 0, "incorrect decrement pred count at start");
512
513         tbb::flow::make_edge(q0, node0);
514         tbb::flow::make_edge(q1, node0);
515         tbb::flow::make_edge(node0, q2);
516         tbb::flow::make_edge(b0, node0.decrement);
517         tbb::flow::make_edge(b1, node0.decrement);
518         g.wait_for_all();
519
520         /*    b0   b1              */
521         /*      \  |               */
522         /*  q0\  \ |               */
523         /*     \  \|               */
524         /*      +-node0---q2       */
525         /*     /                   */
526         /*  q1/                    */
527
528         q0.try_put(i);
529         g.wait_for_all();
530         ASSERT(node0.predecessor_count() == 2, "incorrect predecessor count after construction");
531         ASSERT(node0.successor_count() == 1, "incorrect successor count after construction");
532         ASSERT(node0.decrement.predecessor_count() == 2, "incorrect decrement pred count after construction");
533         ASSERT(q2.try_get(j), "fetch of value forwarded to output queue failed");
534         ASSERT(j == i, "improper value forwarded to output queue");
535         q0.try_put(2*i);
536         g.wait_for_all();
537         ASSERT(!q2.try_get(j), "limiter_node forwarded item improperly");
538         b0.try_put(tbb::flow::continue_msg());
539         g.wait_for_all();
540         ASSERT(!q2.try_get(j), "limiter_node forwarded item improperly");
541         b0.try_put(tbb::flow::continue_msg());
542         g.wait_for_all();
543         ASSERT(q2.try_get(j) && j == 2*i, "limiter_node failed to forward item");
544
545         tbb::flow::limiter_node<int>::successor_list_type sv;
546         tbb::flow::limiter_node<int>::predecessor_list_type pv;
547         tbb::flow::continue_receiver::predecessor_list_type dv;
548         tbb::flow::limiter_node<int>::successor_list_type sv1;
549         tbb::flow::limiter_node<int>::predecessor_list_type pv1;
550         tbb::flow::continue_receiver::predecessor_list_type dv1;
551
552         node0.copy_predecessors(pv);
553         node0.copy_successors(sv);
554         node0.decrement.copy_predecessors(dv);
555         pv1.push_back(&(q0));
556         pv1.push_back(&(q1));
557         sv1.push_back(&(q2));
558         dv1.push_back(&(b0));
559         dv1.push_back(&(b1));
560
561         ASSERT(pv.size() == 2, "improper size for predecessors");
562         ASSERT(sv.size() == 1, "improper size for successors");
563         ASSERT(lists_match(pv,pv1), "predecessor lists do not match");
564         ASSERT(lists_match(sv,sv1), "successor lists do not match");
565         ASSERT(lists_match(dv,dv1), "successor lists do not match");
566
567         if(i == 0) {
568             node0.extract();
569             ASSERT(node0.predecessor_count() == 0, "incorrect predecessor count after extraction");
570             ASSERT(node0.successor_count() == 0, "incorrect successor count after extraction");
571             ASSERT(node0.decrement.predecessor_count() == 0, "incorrect decrement pred count after extraction");
572         }
573         else {
574             q0.extract();
575             b0.extract();
576             q2.extract();
577
578             ASSERT(node0.predecessor_count() == 1, "incorrect predecessor count after extract second iter");
579             ASSERT(node0.successor_count() == 0, "incorrect successor count after extract second iter");
580             ASSERT(node0.decrement.predecessor_count() == 1, "incorrect decrement pred count after extract second iter");
581
582             node0.copy_predecessors(pv);
583             node0.copy_successors(sv);
584             node0.decrement.copy_predecessors(dv);
585             pv1.clear();
586             sv1.clear();
587             dv1.clear();
588             pv1.push_back(&(q1));
589             dv1.push_back(&(b1));
590
591             ASSERT(lists_match(pv,pv1), "predecessor lists do not match second iter");
592             ASSERT(lists_match(sv,sv1), "successor lists do not match second iter");
593             ASSERT(lists_match(dv,dv1), "successor lists do not match second iter");
594
595             q1.extract();
596             b1.extract();
597         }
598         ASSERT(node0.predecessor_count() == 0, "incorrect predecessor count after extract");
599         ASSERT(node0.successor_count() == 0, "incorrect successor count after extract");
600         ASSERT(node0.decrement.predecessor_count() == 0, "incorrect decrement pred count after extract");
601
602     }
603
604 }
605 #endif  // TBB_DEPRECATED_FLOW_NODE_EXTRACTION
606
607 int TestMain() {
608     for (int i = 1; i <= 8; ++i) {
609         tbb::task_scheduler_init init(i);
610         test_serial<int>();
611         test_parallel<int>(i);
612     }
613     test_continue_msg_reception();
614     test_multifunction_to_limiter(30,3);
615     test_multifunction_to_limiter(300,13);
616     test_multifunction_to_limiter(3000,1);
617 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
618     test_num_decrement_predecessors();
619 #else
620     test_reserve_release_messages();
621     test_decrementer();
622 #endif
623 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
624     test_extract();
625 #endif
626    return Harness::Done;
627 }