Committing TBB 2019 Update 9 source code
[platform/upstream/tbb.git] / src / test / test_queue_node.cpp
1 /*
2     Copyright (c) 2005-2019 Intel Corporation
3
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7
8         http://www.apache.org/licenses/LICENSE-2.0
9
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16
17 // TO DO: Add overlapping put / receive tests
18
19 #if __TBB_CPF_BUILD
20 #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
21 #endif
22
23 #include "harness.h"
24
25 #include "tbb/flow_graph.h"
26 #include "tbb/task_scheduler_init.h"
27 #include "tbb/tick_count.h"
28 #include "harness_checktype.h"
29 #include "harness_graph.h"
30 #include "test_follows_and_precedes_api.h"
31
32 #include <cstdio>
33
34 #define N 1000
35 #define C 10
36
37 template< typename T >
38 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
39     while ( q.try_get(value) != true ) ;
40 }
41
42 template< typename T >
43 void check_item( T* next_value, T &value ) {
44     int tid = value / N;
45     int offset = value % N;
46     ASSERT( next_value[tid] == T(offset), NULL );
47     ++next_value[tid];
48 }
49
50 template< typename T >
51 struct parallel_puts : NoAssign {
52
53     tbb::flow::queue_node<T> &my_q;
54
55     parallel_puts( tbb::flow::queue_node<T> &q ) : my_q(q) {}
56
57     void operator()(int i) const {
58         for (int j = 0; j < N; ++j) {
59             bool msg = my_q.try_put( T(N*i + j) );
60             ASSERT( msg == true, NULL );
61         }
62     }
63
64 };
65
66
67
68 template< typename T >
69 struct touches {
70
71     bool **my_touches;
72     T **my_last_touch;
73     int my_num_threads;
74
75     touches( int num_threads ) : my_num_threads(num_threads) {
76         my_last_touch = new T* [my_num_threads];
77         my_touches = new bool* [my_num_threads];
78         for ( int p = 0; p < my_num_threads; ++p) {
79             my_last_touch[p] = new T[my_num_threads];
80             for ( int p2 = 0; p2 < my_num_threads; ++p2)
81                 my_last_touch[p][p2] = -1;
82
83             my_touches[p] = new bool[N*my_num_threads];
84             for ( int n = 0; n < N*my_num_threads; ++n)
85                 my_touches[p][n] = false;
86         }
87     }
88
89     ~touches() {
90         for ( int p = 0; p < my_num_threads; ++p) {
91             delete [] my_touches[p];
92             delete [] my_last_touch[p];
93         }
94         delete [] my_touches;
95         delete [] my_last_touch;
96     }
97
98     bool check( int tid, T v ) {
99         int v_tid = v / N;
100         if ( my_touches[tid][v] != false ) {
101             printf("Error: value seen twice by local thread\n");
102             return false;
103         }
104         if ( v <= my_last_touch[tid][v_tid] ) {
105             printf("Error: value seen in wrong order by local thread\n");
106             return false;
107         }
108         my_last_touch[tid][v_tid] = v;
109         my_touches[tid][v] = true;
110         return true;
111     }
112
113     bool validate_touches() {
114         bool *all_touches = new bool[N*my_num_threads];
115         for ( int n = 0; n < N*my_num_threads; ++n)
116             all_touches[n] = false;
117
118         for ( int p = 0; p < my_num_threads; ++p) {
119             for ( int n = 0; n < N*my_num_threads; ++n) {
120                 if ( my_touches[p][n] == true ) {
121                     ASSERT( all_touches[n] == false, "value see by more than one thread\n" );
122                     all_touches[n] = true;
123                 }
124             }
125         }
126         for ( int n = 0; n < N*my_num_threads; ++n) {
127             if ( !all_touches[n] )
128                 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
129             //ASSERT( all_touches[n] == true, "value not seen by any thread\n" );
130         }
131         delete [] all_touches;
132         return true;
133     }
134
135 };
136
137 template< typename T >
138 struct parallel_gets : NoAssign {
139
140     tbb::flow::queue_node<T> &my_q;
141     touches<T> &my_touches;
142
143     parallel_gets( tbb::flow::queue_node<T> &q, touches<T> &t) : my_q(q), my_touches(t) {}
144
145     void operator()(int tid) const {
146         for (int j = 0; j < N; ++j) {
147             T v;
148             spin_try_get( my_q, v );
149             my_touches.check( tid, v );
150         }
151     }
152
153 };
154
155 template< typename T >
156 struct parallel_put_get : NoAssign {
157
158     tbb::flow::queue_node<T> &my_q;
159     touches<T> &my_touches;
160
161     parallel_put_get( tbb::flow::queue_node<T> &q, touches<T> &t ) : my_q(q), my_touches(t) {}
162
163     void operator()(int tid) const {
164
165         for ( int i = 0; i < N; i+=C ) {
166             int j_end = ( N < i + C ) ? N : i + C;
167             // dump about C values into the Q
168             for ( int j = i; j < j_end; ++j ) {
169                 ASSERT( my_q.try_put( T (N*tid + j ) ) == true, NULL );
170             }
171             // receiver about C values from the Q
172             for ( int j = i; j < j_end; ++j ) {
173                 T v;
174                 spin_try_get( my_q, v );
175                 my_touches.check( tid, v );
176             }
177         }
178     }
179
180 };
181
182 //
183 // Tests
184 //
185 // Item can be reserved, released, consumed ( single serial receiver )
186 //
187 template< typename T >
188 int test_reservation() {
189     tbb::flow::graph g;
190     T bogus_value(-1);
191
192     // Simple tests
193     tbb::flow::queue_node<T> q(g);
194
195     q.try_put(T(1));
196     q.try_put(T(2));
197     q.try_put(T(3));
198
199     T v;
200     ASSERT( q.reserve_item(v) == true, NULL );
201     ASSERT( v == T(1), NULL );
202     ASSERT( q.release_reservation() == true, NULL );
203     v = bogus_value;
204     g.wait_for_all();
205     ASSERT( q.reserve_item(v) == true, NULL );
206     ASSERT( v == T(1), NULL );
207     ASSERT( q.consume_reservation() == true, NULL );
208     v = bogus_value;
209     g.wait_for_all();
210
211     ASSERT( q.try_get(v) == true, NULL );
212     ASSERT( v == T(2), NULL );
213     v = bogus_value;
214     g.wait_for_all();
215
216     ASSERT( q.reserve_item(v) == true, NULL );
217     ASSERT( v == T(3), NULL );
218     ASSERT( q.release_reservation() == true, NULL );
219     v = bogus_value;
220     g.wait_for_all();
221     ASSERT( q.reserve_item(v) == true, NULL );
222     ASSERT( v == T(3), NULL );
223     ASSERT( q.consume_reservation() == true, NULL );
224     v = bogus_value;
225     g.wait_for_all();
226
227     return 0;
228 }
229
230 //
231 // Tests
232 //
233 // multiple parallel senders, items in FIFO (relatively to sender) order
234 // multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received
235 //   * overlapped puts / gets
236 //   * all puts finished before any getS
237 //
238 template< typename T >
239 int test_parallel(int num_threads) {
240     tbb::flow::graph g;
241     tbb::flow::queue_node<T> q(g);
242     tbb::flow::queue_node<T> q2(g);
243     tbb::flow::queue_node<T> q3(g);
244     {
245         Check< T > my_check;
246         T bogus_value(-1);
247         T j = bogus_value;
248         NativeParallelFor( num_threads, parallel_puts<T>(q) );
249
250         T *next_value = new T[num_threads];
251         for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0);
252
253         for (int i = 0; i < num_threads * N; ++i ) {
254             spin_try_get( q, j );
255             check_item( next_value, j );
256             j = bogus_value;
257         }
258         for (int tid = 0; tid < num_threads; ++tid)  {
259             ASSERT( next_value[tid] == T(N), NULL );
260         }
261         delete[] next_value;
262
263         j = bogus_value;
264         g.wait_for_all();
265         ASSERT( q.try_get( j ) == false, NULL );
266         ASSERT( j == bogus_value, NULL );
267
268         NativeParallelFor( num_threads, parallel_puts<T>(q) );
269
270         {
271             touches< T > t( num_threads );
272             NativeParallelFor( num_threads, parallel_gets<T>(q, t) );
273             g.wait_for_all();
274             ASSERT( t.validate_touches(), NULL );
275         }
276         j = bogus_value;
277         ASSERT( q.try_get( j ) == false, NULL );
278         ASSERT( j == bogus_value, NULL );
279
280         g.wait_for_all();
281         {
282             touches< T > t2( num_threads );
283             NativeParallelFor( num_threads, parallel_put_get<T>(q, t2) );
284             g.wait_for_all();
285             ASSERT( t2.validate_touches(), NULL );
286         }
287         j = bogus_value;
288         ASSERT( q.try_get( j ) == false, NULL );
289         ASSERT( j == bogus_value, NULL );
290
291         tbb::flow::make_edge( q, q2 );
292         tbb::flow::make_edge( q2, q3 );
293
294         NativeParallelFor( num_threads, parallel_puts<T>(q) );
295         {
296             touches< T > t3( num_threads );
297             NativeParallelFor( num_threads, parallel_gets<T>(q3, t3) );
298             g.wait_for_all();
299             ASSERT( t3.validate_touches(), NULL );
300         }
301         j = bogus_value;
302         g.wait_for_all();
303         ASSERT( q.try_get( j ) == false, NULL );
304         g.wait_for_all();
305         ASSERT( q2.try_get( j ) == false, NULL );
306         g.wait_for_all();
307         ASSERT( q3.try_get( j ) == false, NULL );
308         ASSERT( j == bogus_value, NULL );
309
310         // test copy constructor
311         ASSERT( q.remove_successor( q2 ), NULL );
312         NativeParallelFor( num_threads, parallel_puts<T>(q) );
313         tbb::flow::queue_node<T> q_copy(q);
314         j = bogus_value;
315         g.wait_for_all();
316         ASSERT( q_copy.try_get( j ) == false, NULL );
317         ASSERT( q.register_successor( q_copy ) == true, NULL );
318         {
319             touches< T > t( num_threads );
320             NativeParallelFor( num_threads, parallel_gets<T>(q_copy, t) );
321             g.wait_for_all();
322             ASSERT( t.validate_touches(), NULL );
323         }
324         j = bogus_value;
325         ASSERT( q.try_get( j ) == false, NULL );
326         ASSERT( j == bogus_value, NULL );
327         ASSERT( q_copy.try_get( j ) == false, NULL );
328         ASSERT( j == bogus_value, NULL );
329     }
330
331     return 0;
332 }
333
334 //
335 // Tests
336 //
337 // Predecessors cannot be registered
338 // Empty Q rejects item requests
339 // Single serial sender, items in FIFO order
340 // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order
341 //
342
343 template< typename T >
344 int test_serial() {
345     tbb::flow::graph g;
346     tbb::flow::queue_node<T> q(g);
347     tbb::flow::queue_node<T> q2(g);
348     {   // destroy the graph after manipulating it, and see if all the items in the buffers
349         // have been destroyed before the graph
350         Check<T> my_check;  // if check_type< U > count constructions and destructions
351         T bogus_value(-1);
352         T j = bogus_value;
353
354         //
355         // Rejects attempts to add / remove predecessor
356         // Rejects request from empty Q
357         //
358         ASSERT( q.register_predecessor( q2 ) == false, NULL );
359         ASSERT( q.remove_predecessor( q2 ) == false, NULL );
360         ASSERT( q.try_get( j ) == false, NULL );
361         ASSERT( j == bogus_value, NULL );
362
363         //
364         // Simple puts and gets
365         //
366
367         for (int i = 0; i < N; ++i) {
368             bool msg = q.try_put( T(i) );
369             ASSERT( msg == true, NULL );
370         }
371
372
373         for (int i = 0; i < N; ++i) {
374             j = bogus_value;
375             spin_try_get( q, j );
376             ASSERT( i == j, NULL );
377         }
378         j = bogus_value;
379         g.wait_for_all();
380         ASSERT( q.try_get( j ) == false, NULL );
381         ASSERT( j == bogus_value, NULL );
382
383         tbb::flow::make_edge( q, q2 );
384
385         for (int i = 0; i < N; ++i) {
386             bool msg = q.try_put( T(i) );
387             ASSERT( msg == true, NULL );
388         }
389
390
391         for (int i = 0; i < N; ++i) {
392             j = bogus_value;
393             spin_try_get( q2, j );
394             ASSERT( i == j, NULL );
395         }
396         j = bogus_value;
397         g.wait_for_all();
398         ASSERT( q.try_get( j ) == false, NULL );
399         g.wait_for_all();
400         ASSERT( q2.try_get( j ) == false, NULL );
401         ASSERT( j == bogus_value, NULL );
402
403         tbb::flow::remove_edge( q, q2 );
404         ASSERT( q.try_put( 1 ) == true, NULL );
405         g.wait_for_all();
406         ASSERT( q2.try_get( j ) == false, NULL );
407         ASSERT( j == bogus_value, NULL );
408         g.wait_for_all();
409         ASSERT( q.try_get( j ) == true, NULL );
410         ASSERT( j == 1, NULL );
411
412         tbb::flow::queue_node<T> q3(g);
413         tbb::flow::make_edge( q, q2 );
414         tbb::flow::make_edge( q2, q3 );
415
416         for (int i = 0; i < N; ++i) {
417             bool msg = q.try_put( T(i) );
418             ASSERT( msg == true, NULL );
419         }
420
421         for (int i = 0; i < N; ++i) {
422             j = bogus_value;
423             spin_try_get( q3, j );
424             ASSERT( i == j, NULL );
425         }
426         j = bogus_value;
427         g.wait_for_all();
428         ASSERT( q.try_get( j ) == false, NULL );
429         g.wait_for_all();
430         ASSERT( q2.try_get( j ) == false, NULL );
431         g.wait_for_all();
432         ASSERT( q3.try_get( j ) == false, NULL );
433         ASSERT( j == bogus_value, NULL );
434
435         tbb::flow::remove_edge( q,  q2 );
436         ASSERT( q.try_put( 1 ) == true, NULL );
437         g.wait_for_all();
438         ASSERT( q2.try_get( j ) == false, NULL );
439         ASSERT( j == bogus_value, NULL );
440         g.wait_for_all();
441         ASSERT( q3.try_get( j ) == false, NULL );
442         ASSERT( j == bogus_value, NULL );
443         g.wait_for_all();
444         ASSERT( q.try_get( j ) == true, NULL );
445         ASSERT( j == 1, NULL );
446     }
447
448     return 0;
449 }
450
451 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
452 #include <array>
453 #include <vector>
454 void test_follows_and_precedes_api() {
455     std::array<int, 3> messages_for_follows = {0, 1, 2};
456     std::vector<int> messages_for_precedes = {0, 1, 2};
457
458     follows_and_precedes_testing::test_follows <int, tbb::flow::queue_node<int>>(messages_for_follows);
459     follows_and_precedes_testing::test_precedes <int, tbb::flow::queue_node<int>>(messages_for_precedes);
460 }
461 #endif
462
463 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
464 void test_deduction_guides() {
465     using namespace tbb::flow;
466     graph g;
467     broadcast_node<int> br(g);
468     queue_node<int> q0(g);
469
470 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
471     queue_node q1(follows(br));
472     static_assert(std::is_same_v<decltype(q1), queue_node<int>>);
473
474     queue_node q2(precedes(br));
475     static_assert(std::is_same_v<decltype(q2), queue_node<int>>);
476 #endif
477
478     queue_node q3(q0);
479     static_assert(std::is_same_v<decltype(q3), queue_node<int>>);
480     g.wait_for_all();
481 }
482 #endif
483
484 int TestMain() {
485     tbb::tick_count start = tbb::tick_count::now(), stop;
486     for (int p = 2; p <= 4; ++p) {
487         tbb::task_scheduler_init init(p);
488         test_serial<int>();
489         test_serial<check_type<int> >();
490         test_parallel<int>(p);
491         test_parallel<check_type<int> >(p);
492     }
493     stop = tbb::tick_count::now();
494     REMARK("Queue_Node Time=%6.6f\n", (stop-start).seconds());
495     REMARK("Testing resets\n");
496     test_resets<int, tbb::flow::queue_node<int> >();
497     test_resets<float, tbb::flow::queue_node<float> >();
498 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
499     test_buffer_extract<tbb::flow::queue_node<int> >().run_tests();
500 #endif
501 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
502     test_follows_and_precedes_api();
503 #endif
504 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
505     test_deduction_guides();
506 #endif
507     return Harness::Done;
508 }