Committing TBB 2019 Update 9 source code
[platform/upstream/tbb.git] / src / test / test_broadcast_node.cpp
1 /*
2     Copyright (c) 2005-2019 Intel Corporation
3
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7
8         http://www.apache.org/licenses/LICENSE-2.0
9
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16
17 #if __TBB_CPF_BUILD
18 #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
19 #endif
20
21 #include "harness.h"
22
23 #include "tbb/flow_graph.h"
24 #include "tbb/task.h"
25 #include "tbb/atomic.h"
26 #include "test_follows_and_precedes_api.h"
27
28 const int N = 1000;
29 const int R = 4;
30
31 class int_convertable_type : private NoAssign {
32
33    int my_value;
34
35 public:
36
37    int_convertable_type( int v ) : my_value(v) {}
38    operator int() const { return my_value; }
39
40 };
41
42
43 template< typename T >
44 class counting_array_receiver : public tbb::flow::receiver<T> {
45
46     tbb::atomic<size_t> my_counters[N];
47     tbb::flow::graph& my_graph;
48
49 public:
50
51     counting_array_receiver(tbb::flow::graph& g) : my_graph(g) {
52         for (int i = 0; i < N; ++i )
53            my_counters[i] = 0;
54     }
55
56     size_t operator[]( int i ) {
57         size_t v = my_counters[i];
58         return v;
59     }
60
61     tbb::task * try_put_task( const T &v ) __TBB_override {
62         ++my_counters[(int)v];
63         return const_cast<tbb::task *>(tbb::flow::internal::SUCCESSFULLY_ENQUEUED);
64     }
65
66     tbb::flow::graph& graph_reference() const __TBB_override {
67         return my_graph;
68     }
69
70 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
71     typedef typename tbb::flow::receiver<T>::built_predecessors_type built_predecessors_type;
72     built_predecessors_type mbp;
73     built_predecessors_type &built_predecessors() __TBB_override { return mbp; }
74     typedef typename tbb::flow::receiver<T>::predecessor_list_type predecessor_list_type;
75     typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
76     void internal_add_built_predecessor(predecessor_type &) __TBB_override {}
77     void internal_delete_built_predecessor(predecessor_type &) __TBB_override {}
78     void copy_predecessors(predecessor_list_type &) __TBB_override {}
79     size_t predecessor_count() __TBB_override { return 0; }
80 #endif
81     void reset_receiver(tbb::flow::reset_flags /*f*/) __TBB_override { }
82
83 };
84
85 template< typename T >
86 void test_serial_broadcasts() {
87
88     tbb::flow::graph g;
89     tbb::flow::broadcast_node<T> b(g);
90
91     for ( int num_receivers = 1; num_receivers < R; ++num_receivers ) {
92         std::vector< counting_array_receiver<T> > receivers(num_receivers, counting_array_receiver<T>(g));
93 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
94         ASSERT(b.successor_count() == 0, NULL);
95         ASSERT(b.predecessor_count() == 0, NULL);
96         typename tbb::flow::broadcast_node<T>::successor_list_type my_succs;
97         b.copy_successors(my_succs);
98         ASSERT(my_succs.size() == 0, NULL);
99         typename tbb::flow::broadcast_node<T>::predecessor_list_type my_preds;
100         b.copy_predecessors(my_preds);
101         ASSERT(my_preds.size() == 0, NULL);
102 #endif
103
104         for ( int r = 0; r < num_receivers; ++r ) {
105             tbb::flow::make_edge( b, receivers[r] );
106         }
107 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
108         ASSERT( b.successor_count() == (size_t)num_receivers, NULL);
109 #endif
110
111         for (int n = 0; n < N; ++n ) {
112             ASSERT( b.try_put( (T)n ), NULL );
113         }
114
115         for ( int r = 0; r < num_receivers; ++r ) {
116             for (int n = 0; n < N; ++n ) {
117                 ASSERT( receivers[r][n] == 1, NULL );
118             }
119             tbb::flow::remove_edge( b, receivers[r] );
120         }
121         ASSERT( b.try_put( (T)0 ), NULL );
122         for ( int r = 0; r < num_receivers; ++r )
123             ASSERT( receivers[0][0] == 1, NULL );
124     }
125
126 }
127
128 template< typename T >
129 class native_body : private NoAssign {
130
131     tbb::flow::broadcast_node<T> &my_b;
132
133 public:
134
135     native_body( tbb::flow::broadcast_node<T> &b ) : my_b(b) {}
136
137     void operator()(int) const {
138         for (int n = 0; n < N; ++n ) {
139             ASSERT( my_b.try_put( (T)n ), NULL );
140         }
141     }
142
143 };
144
145 template< typename T >
146 void run_parallel_broadcasts(tbb::flow::graph& g, int p, tbb::flow::broadcast_node<T>& b) {
147     for ( int num_receivers = 1; num_receivers < R; ++num_receivers ) {
148         std::vector< counting_array_receiver<T> > receivers(num_receivers, counting_array_receiver<T>(g));
149
150         for ( int r = 0; r < num_receivers; ++r ) {
151             tbb::flow::make_edge( b, receivers[r] );
152         }
153
154         NativeParallelFor( p, native_body<T>( b ) );
155
156         for ( int r = 0; r < num_receivers; ++r ) {
157             for (int n = 0; n < N; ++n ) {
158                 ASSERT( (int)receivers[r][n] == p, NULL );
159             }
160             tbb::flow::remove_edge( b, receivers[r] );
161         }
162         ASSERT( b.try_put( (T)0 ), NULL );
163         for ( int r = 0; r < num_receivers; ++r )
164             ASSERT( (int)receivers[r][0] == p, NULL );
165     }
166 }
167
168 template< typename T >
169 void test_parallel_broadcasts(int p) {
170
171     tbb::flow::graph g;
172     tbb::flow::broadcast_node<T> b(g);
173     run_parallel_broadcasts(g, p, b);
174
175     // test copy constructor
176     tbb::flow::broadcast_node<T> b_copy(b);
177     run_parallel_broadcasts(g, p, b_copy);
178 }
179
180 // broadcast_node does not allow successors to try_get from it (it does not allow
181 // the flow edge to switch) so we only need test the forward direction.
182 template<typename T>
183 void test_resets() {
184     tbb::flow::graph g;
185     tbb::flow::broadcast_node<T> b0(g);
186     tbb::flow::broadcast_node<T> b1(g);
187     tbb::flow::queue_node<T> q0(g);
188     tbb::flow::make_edge(b0,b1);
189     tbb::flow::make_edge(b1,q0);
190     T j;
191
192     // test standard reset
193     for(int testNo = 0; testNo < 2; ++testNo) {
194         for(T i= 0; i <= 3; i += 1) {
195             b0.try_put(i);
196         }
197         g.wait_for_all();
198         for(T i= 0; i <= 3; i += 1) {
199             ASSERT(q0.try_get(j) && j == i, "Bad value in queue");
200         }
201         ASSERT(!q0.try_get(j), "extra value in queue");
202
203         // reset the graph.  It should work as before.
204         if (testNo == 0) g.reset();
205     }
206
207     g.reset(tbb::flow::rf_clear_edges);
208     for(T i= 0; i <= 3; i += 1) {
209         b0.try_put(i);
210     }
211     g.wait_for_all();
212     ASSERT(!q0.try_get(j), "edge between nodes not removed");
213     for(T i= 0; i <= 3; i += 1) {
214         b1.try_put(i);
215     }
216     g.wait_for_all();
217     ASSERT(!q0.try_get(j), "edge between nodes not removed");
218 }
219
220 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
221 void test_extract() {
222     int dont_care;
223     tbb::flow::graph g;
224     tbb::flow::broadcast_node<int> b0(g);
225     tbb::flow::broadcast_node<int> b1(g);
226     tbb::flow::broadcast_node<int> b2(g);
227     tbb::flow::broadcast_node<int> b3(g);
228     tbb::flow::broadcast_node<int> b4(g);
229     tbb::flow::broadcast_node<int> b5(g);
230     tbb::flow::queue_node<int> q0(g);
231     tbb::flow::make_edge(b0,b1);
232     tbb::flow::make_edge(b0,b2);
233     tbb::flow::make_edge(b1,b3);
234     tbb::flow::make_edge(b1,b4);
235     tbb::flow::make_edge(b2,b4);
236     tbb::flow::make_edge(b2,b5);
237     tbb::flow::make_edge(b3,q0);
238     tbb::flow::make_edge(b4,q0);
239     tbb::flow::make_edge(b5,q0);
240
241     /*          b3       */
242     /*         /  \      */
243     /*        b1   \     */
244     /*       / \    \    */
245     /*     b0   b4---q0  */
246     /*       \ /    /    */
247     /*        b2   /     */
248     /*         \  /      */
249     /*          b5       */
250
251     g.wait_for_all();
252     b0.try_put(1);
253     g.wait_for_all();
254     for( int i = 0; i < 4; ++i ) {
255         int j;
256         ASSERT(q0.try_get(j) && j == 1, "missing or incorrect message");
257     }
258     ASSERT(!q0.try_get(dont_care), "extra message in queue");
259     ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 2, "improper count for b0");
260     ASSERT(b1.predecessor_count() == 1 && b1.successor_count() == 2, "improper count for b1");
261     ASSERT(b2.predecessor_count() == 1 && b2.successor_count() == 2, "improper count for b2");
262     ASSERT(b3.predecessor_count() == 1 && b3.successor_count() == 1, "improper count for b3");
263     ASSERT(b4.predecessor_count() == 2 && b4.successor_count() == 1, "improper count before extract of b4");
264     ASSERT(b5.predecessor_count() == 1 && b5.successor_count() == 1, "improper count for b5");
265     b4.extract();  // remove from tree of nodes.
266     ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 2, "improper count for b0 after");
267     ASSERT(b1.predecessor_count() == 1 && b1.successor_count() == 1, "improper succ count for b1 after");
268     ASSERT(b2.predecessor_count() == 1 && b2.successor_count() == 1, "improper succ count for b2 after");
269     ASSERT(b3.predecessor_count() == 1 && b3.successor_count() == 1, "improper succ count for b3 after");
270     ASSERT(b4.predecessor_count() == 0 && b4.successor_count() == 0, "improper succ count after extract");
271     ASSERT(b5.predecessor_count() == 1 && b5.successor_count() == 1, "improper succ count for b5 after");
272
273     /*          b3       */
274     /*         /  \      */
275     /*        b1   \     */
276     /*       /      \    */
277     /*     b0        q0  */
278     /*       \      /    */
279     /*        b2   /     */
280     /*         \  /      */
281     /*          b5       */
282
283     b0.try_put(1);
284     g.wait_for_all();
285     for( int i = 0; i < 2; ++i ) {
286         int j;
287         ASSERT(q0.try_get(j) && j == 1, "missing or incorrect message");
288     }
289     ASSERT(!q0.try_get(dont_care), "extra message in queue");
290     tbb::flow::make_edge(b0,b4);
291     tbb::flow::make_edge(b4,q0);
292     g.wait_for_all();
293     ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 3, "improper count for b0 after");
294     ASSERT(b1.predecessor_count() == 1 && b1.successor_count() == 1, "improper succ count for b1 after");
295     ASSERT(b2.predecessor_count() == 1 && b2.successor_count() == 1, "improper succ count for b2 after");
296     ASSERT(b3.predecessor_count() == 1 && b3.successor_count() == 1, "improper succ count for b3 after");
297     ASSERT(b4.predecessor_count() == 1 && b4.successor_count() == 1, "improper succ count after extract");
298     ASSERT(b5.predecessor_count() == 1 && b5.successor_count() == 1, "improper succ count for b5 after");
299
300     /*          b3       */
301     /*         /  \      */
302     /*        b1   \     */
303     /*       /      \    */
304     /*     b0---b4---q0  */
305     /*       \      /    */
306     /*        b2   /     */
307     /*         \  /      */
308     /*          b5       */
309
310     b0.try_put(1);
311     g.wait_for_all();
312     for( int i = 0; i < 3; ++i ) {
313         int j;
314         ASSERT(q0.try_get(j) && j == 1, "missing or incorrect message");
315     }
316     ASSERT(!q0.try_get(dont_care), "extra message in queue");
317 }
318 #endif  // TBB_DEPRECATED_FLOW_NODE_EXTRACTION
319
320 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
321 #include <array>
322 #include <vector>
323 void test_follows_and_precedes_api() {
324     using msg_t = tbb::flow::continue_msg;
325
326     std::array<msg_t, 3> messages_for_follows= {msg_t(), msg_t(), msg_t()};
327     std::vector<msg_t> messages_for_precedes = {msg_t()};
328
329     follows_and_precedes_testing::test_follows <msg_t, tbb::flow::broadcast_node<msg_t>>(messages_for_follows);
330     follows_and_precedes_testing::test_precedes <msg_t, tbb::flow::broadcast_node<msg_t>>(messages_for_precedes);
331 }
332 #endif
333
334 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
335 void test_deduction_guides() {
336     using namespace tbb::flow;
337
338     graph g;
339
340     broadcast_node<int> b0(g);
341 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
342     buffer_node<int> buf(g);
343
344     broadcast_node b1(follows(buf));
345     static_assert(std::is_same_v<decltype(b1), broadcast_node<int>>);
346
347     broadcast_node b2(precedes(buf));
348     static_assert(std::is_same_v<decltype(b2), broadcast_node<int>>);
349 #endif
350
351     broadcast_node b3(b0);
352     static_assert(std::is_same_v<decltype(b3), broadcast_node<int>>);
353     g.wait_for_all();
354 }
355 #endif
356
357 int TestMain() {
358     if( MinThread<1 ) {
359         REPORT("number of threads must be positive\n");
360         exit(1);
361     }
362
363    test_serial_broadcasts<int>();
364    test_serial_broadcasts<float>();
365    test_serial_broadcasts<int_convertable_type>();
366
367    for( int p=MinThread; p<=MaxThread; ++p ) {
368        test_parallel_broadcasts<int>(p);
369        test_parallel_broadcasts<float>(p);
370        test_parallel_broadcasts<int_convertable_type>(p);
371    }
372
373    test_resets<int>();
374    test_resets<float>();
375 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
376     test_extract();
377 #endif
378 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
379     test_follows_and_precedes_api();
380 #endif
381 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
382     test_deduction_guides();
383 #endif
384    return Harness::Done;
385 }