4d40bed7af1684c447ac61747f4561a5f7e6ec7d
[platform/upstream/tbb.git] / src / test / test_sequencer_node.cpp
1 /*
2     Copyright (c) 2005-2019 Intel Corporation
3
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7
8         http://www.apache.org/licenses/LICENSE-2.0
9
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16
17 #include "harness.h"
18
19 #if __TBB_CPF_BUILD
20 #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
21 #endif
22
23 #include "tbb/flow_graph.h"
24 #include "tbb/task_scheduler_init.h"
25 #include "tbb/tick_count.h"
26 #include "tbb/atomic.h"
27 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
28 #include "harness_graph.h"
29 #endif
30
31 #include <cstdio>
32
33 #define N 1000
34 #define C 10
35
36 template< typename T >
37 struct seq_inspector {
38     size_t operator()(const T &v) const { return size_t(v); }
39 };
40
41 template< typename T >
42 bool wait_try_get( tbb::flow::graph &g, tbb::flow::sequencer_node<T> &q, T &value ) {
43     g.wait_for_all();
44     return q.try_get(value);
45 }
46
47 template< typename T >
48 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
49     while ( q.try_get(value) != true ) ;
50 }
51
52 template< typename T >
53 struct parallel_puts : NoAssign {
54
55     tbb::flow::sequencer_node<T> &my_q;
56     int my_num_threads;
57
58     parallel_puts( tbb::flow::sequencer_node<T> &q, int num_threads ) : my_q(q), my_num_threads(num_threads) {}
59
60     void operator()(int tid) const {
61         for (int j = tid; j < N; j+=my_num_threads) {
62             bool msg = my_q.try_put( T(j) );
63             ASSERT( msg == true, NULL );
64         }
65     }
66
67 };
68
69 template< typename T >
70 struct touches {
71
72     bool **my_touches;
73     T *my_last_touch;
74     int my_num_threads;
75
76     touches( int num_threads ) : my_num_threads(num_threads) {
77         my_last_touch = new T[my_num_threads];
78         my_touches = new bool* [my_num_threads];
79         for ( int p = 0; p < my_num_threads; ++p) {
80             my_last_touch[p] = T(-1);
81             my_touches[p] = new bool[N];
82             for ( int n = 0; n < N; ++n)
83                 my_touches[p][n] = false;
84         }
85     }
86
87     ~touches() {
88         for ( int p = 0; p < my_num_threads; ++p) {
89             delete [] my_touches[p];
90         }
91         delete [] my_touches;
92         delete [] my_last_touch;
93     }
94
95     bool check( int tid, T v ) {
96         if ( my_touches[tid][v] != false ) {
97             printf("Error: value seen twice by local thread\n");
98             return false;
99         }
100         if ( v <= my_last_touch[tid] ) {
101             printf("Error: value seen in wrong order by local thread\n");
102             return false;
103         }
104         my_last_touch[tid] = v;
105         my_touches[tid][v] = true;
106         return true;
107     }
108
109     bool validate_touches() {
110         bool *all_touches = new bool[N];
111         for ( int n = 0; n < N; ++n)
112             all_touches[n] = false;
113
114         for ( int p = 0; p < my_num_threads; ++p) {
115             for ( int n = 0; n < N; ++n) {
116                 if ( my_touches[p][n] == true ) {
117                     ASSERT( all_touches[n] == false, "value see by more than one thread\n" );
118                     all_touches[n] = true;
119                 }
120             }
121         }
122         for ( int n = 0; n < N; ++n) {
123             if ( !all_touches[n] )
124                 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
125             //ASSERT( all_touches[n] == true, "value not seen by any thread\n" );
126         }
127         delete [] all_touches;
128         return true;
129     }
130
131 };
132
133 template< typename T >
134 struct parallel_gets : NoAssign {
135
136     tbb::flow::sequencer_node<T> &my_q;
137     int my_num_threads;
138     touches<T> &my_touches;
139
140     parallel_gets( tbb::flow::sequencer_node<T> &q, int num_threads, touches<T> &t ) : my_q(q), my_num_threads(num_threads), my_touches(t) {}
141
142     void operator()(int tid) const {
143         for (int j = tid; j < N; j+=my_num_threads) {
144             T v;
145             spin_try_get( my_q, v );
146             my_touches.check( tid, v );
147         }
148     }
149
150 };
151
152 template< typename T >
153 struct parallel_put_get : NoAssign {
154
155     tbb::flow::sequencer_node<T> &my_s1;
156     tbb::flow::sequencer_node<T> &my_s2;
157     int my_num_threads;
158     tbb::atomic< int > &my_counter;
159     touches<T> &my_touches;
160
161     parallel_put_get( tbb::flow::sequencer_node<T> &s1, tbb::flow::sequencer_node<T> &s2, int num_threads,
162                       tbb::atomic<int> &counter, touches<T> &t ) : my_s1(s1), my_s2(s2), my_num_threads(num_threads), my_counter(counter), my_touches(t) {}
163
164     void operator()(int tid) const {
165         int i_start = 0;
166
167         while ( (i_start = my_counter.fetch_and_add(C)) < N ) {
168             int i_end = ( N < i_start + C ) ? N : i_start + C;
169             for (int i = i_start; i < i_end; ++i) {
170                 bool msg = my_s1.try_put( T(i) );
171                 ASSERT( msg == true, NULL );
172             }
173
174             for (int i = i_start; i < i_end; ++i) {
175                 T v;
176                 spin_try_get( my_s2, v );
177                 my_touches.check( tid, v );
178             }
179         }
180     }
181
182 };
183
184 //
185 // Tests
186 //
187 // multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output
188 // chained sequencers, multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output
189 //
190
191 template< typename T >
192 int test_parallel(int num_threads) {
193     tbb::flow::graph g;
194
195     tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
196     NativeParallelFor( num_threads, parallel_puts<T>(s, num_threads) );
197     {
198         touches<T> t( num_threads );
199         NativeParallelFor( num_threads, parallel_gets<T>(s, num_threads, t) );
200         g.wait_for_all();
201         ASSERT( t.validate_touches(), NULL );
202     }
203     T bogus_value(-1);
204     T j = bogus_value;
205     ASSERT( s.try_get( j ) == false, NULL );
206     ASSERT( j == bogus_value, NULL );
207     g.wait_for_all();
208
209     tbb::flow::sequencer_node<T> s1(g, seq_inspector<T>());
210     tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
211     tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
212     tbb::flow::make_edge( s1, s2 );
213     tbb::flow::make_edge( s2, s3 );
214
215     {
216         touches<T> t( num_threads );
217         tbb::atomic<int> counter;
218         counter = 0;
219         NativeParallelFor( num_threads, parallel_put_get<T>(s1, s3, num_threads, counter, t) );
220         g.wait_for_all();
221         t.validate_touches();
222     }
223     g.wait_for_all();
224     ASSERT( s1.try_get( j ) == false, NULL );
225     g.wait_for_all();
226     ASSERT( s2.try_get( j ) == false, NULL );
227     g.wait_for_all();
228     ASSERT( s3.try_get( j ) == false, NULL );
229     ASSERT( j == bogus_value, NULL );
230
231     // test copy constructor
232     tbb::flow::sequencer_node<T> s_copy(s);
233     NativeParallelFor( num_threads, parallel_puts<T>(s_copy, num_threads) );
234     for (int i = 0; i < N; ++i) {
235         j = bogus_value;
236         spin_try_get( s_copy, j );
237         ASSERT( i == j, NULL );
238     }
239     j = bogus_value;
240     g.wait_for_all();
241     ASSERT( s_copy.try_get( j ) == false, NULL );
242     ASSERT( j == bogus_value, NULL );
243
244     return 0;
245 }
246
247
248 //
249 // Tests
250 //
251 // No predecessors can be registered
252 // Request from empty buffer fails
253 // In-order puts, single sender, single receiver, properly sequenced at output
254 // Reverse-order puts, single sender, single receiver, properly sequenced at output
255 // Chained sequencers (3), in-order and reverse-order tests, properly sequenced at output
256 //
257
258 template< typename T >
259 int test_serial() {
260     tbb::flow::graph g;
261     T bogus_value(-1);
262
263     tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
264     tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
265     T j = bogus_value;
266
267     //
268     // Rejects attempts to add / remove predecessor
269     // Rejects request from empty Q
270     //
271     ASSERT( s.register_predecessor( s2 ) == false, NULL );
272     ASSERT( s.remove_predecessor( s2 ) == false, NULL );
273     ASSERT( s.try_get( j ) == false, NULL );
274     ASSERT( j == bogus_value, NULL );
275
276     //
277     // In-order simple puts and gets
278     //
279
280     for (int i = 0; i < N; ++i) {
281         bool msg = s.try_put( T(i) );
282         ASSERT( msg == true, NULL );
283         ASSERT(!s.try_put( T(i) ), NULL);  // second attempt to put should reject
284     }
285
286
287     for (int i = 0; i < N; ++i) {
288         j = bogus_value;
289         ASSERT(wait_try_get( g, s, j ) == true, NULL);
290         ASSERT( i == j, NULL );
291         ASSERT(!s.try_put( T(i) ),NULL );  // after retrieving value, subsequent put should fail
292     }
293     j = bogus_value;
294     g.wait_for_all();
295     ASSERT( s.try_get( j ) == false, NULL );
296     ASSERT( j == bogus_value, NULL );
297
298     //
299     // Reverse-order simple puts and gets
300     //
301
302     for (int i = N-1; i >= 0; --i) {
303         bool msg = s2.try_put( T(i) );
304         ASSERT( msg == true, NULL );
305     }
306
307     for (int i = 0; i < N; ++i) {
308         j = bogus_value;
309         ASSERT(wait_try_get( g, s2, j ) == true, NULL);
310         ASSERT( i == j, NULL );
311     }
312     j = bogus_value;
313     g.wait_for_all();
314     ASSERT( s2.try_get( j ) == false, NULL );
315     ASSERT( j == bogus_value, NULL );
316
317     //
318     // Chained in-order simple puts and gets
319     //
320
321     tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
322     tbb::flow::sequencer_node<T> s4(g, seq_inspector<T>());
323     tbb::flow::sequencer_node<T> s5(g, seq_inspector<T>());
324     tbb::flow::make_edge( s3, s4 );
325     tbb::flow::make_edge( s4, s5 );
326
327     for (int i = 0; i < N; ++i) {
328         bool msg = s3.try_put( T(i) );
329         ASSERT( msg == true, NULL );
330     }
331
332     for (int i = 0; i < N; ++i) {
333         j = bogus_value;
334         ASSERT(wait_try_get( g, s5, j ) == true, NULL);
335         ASSERT( i == j, NULL );
336     }
337     j = bogus_value;
338     ASSERT( wait_try_get( g, s3, j ) == false, NULL );
339     ASSERT( wait_try_get( g, s4, j ) == false, NULL );
340     ASSERT( wait_try_get( g, s5, j ) == false, NULL );
341     ASSERT( j == bogus_value, NULL );
342
343     g.wait_for_all();
344     tbb::flow::remove_edge( s3, s4 );
345     ASSERT( s3.try_put( N ) == true, NULL );
346     ASSERT( wait_try_get( g, s4, j ) == false, NULL );
347     ASSERT( j == bogus_value, NULL );
348     ASSERT( wait_try_get( g, s5, j ) == false, NULL );
349     ASSERT( j == bogus_value, NULL );
350     ASSERT( wait_try_get( g, s3, j ) == true, NULL );
351     ASSERT( j == N, NULL );
352
353     //
354     // Chained reverse-order simple puts and gets
355     //
356
357     tbb::flow::sequencer_node<T> s6(g, seq_inspector<T>());
358     tbb::flow::sequencer_node<T> s7(g, seq_inspector<T>());
359     tbb::flow::sequencer_node<T> s8(g, seq_inspector<T>());
360     tbb::flow::make_edge( s6, s7 );
361     tbb::flow::make_edge( s7, s8 );
362
363     for (int i = N-1; i >= 0; --i) {
364         bool msg = s6.try_put( T(i) );
365         ASSERT( msg == true, NULL );
366     }
367
368     for (int i = 0; i < N; ++i) {
369         j = bogus_value;
370         ASSERT( wait_try_get( g, s8, j ) == true, NULL );
371         ASSERT( i == j, NULL );
372     }
373     j = bogus_value;
374     ASSERT( wait_try_get( g, s6, j ) == false, NULL );
375     ASSERT( wait_try_get( g, s7, j ) == false, NULL );
376     ASSERT( wait_try_get( g, s8, j ) == false, NULL );
377     ASSERT( j == bogus_value, NULL );
378
379     g.wait_for_all();
380     tbb::flow::remove_edge( s6, s7 );
381     ASSERT( s6.try_put( N ) == true, NULL );
382     ASSERT( wait_try_get( g, s7, j ) == false, NULL );
383     ASSERT( j == bogus_value, NULL );
384     ASSERT( wait_try_get( g, s8, j ) == false, NULL );
385     ASSERT( j == bogus_value, NULL );
386     ASSERT( wait_try_get( g, s6, j ) == true, NULL );
387     ASSERT( j == N, NULL );
388
389     return 0;
390 }
391
392 int TestMain() {
393     tbb::tick_count start = tbb::tick_count::now(), stop;
394     for (int p = 2; p <= 4; ++p) {
395         tbb::task_scheduler_init init(p);
396         test_serial<int>();
397         test_parallel<int>(p);
398     }
399 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
400     test_buffer_extract<tbb::flow::sequencer_node<int> >().run_tests();
401 #endif
402     stop = tbb::tick_count::now();
403     REMARK("Sequencer_Node Time=%6.6f\n", (stop-start).seconds());
404     return Harness::Done;
405 }