2 Copyright (c) 2005-2019 Intel Corporation
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
18 #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
23 #include "tbb/flow_graph.h"
25 #include "tbb/atomic.h"
26 #include "test_follows_and_precedes_api.h"
// Test payload type that wraps a single int, so the broadcast tests can be
// instantiated with a user-defined T in addition to int and float.
// NOTE(review): NoAssign and the my_value member declaration are not visible
// in this excerpt; NoAssign presumably disables assignment -- confirm.
31 class int_convertable_type : private NoAssign {
// Converting constructor from int: stores v in my_value.
37 int_convertable_type( int v ) : my_value(v) {}
// Conversion back to int; returns the stored value. The (int)v cast in
// counting_array_receiver::try_put_task goes through this operator.
38 operator int() const { return my_value; }
// Receiver used as a test sink: keeps an array of N atomic counters and, for
// every item it receives, increments the counter selected by the item's int
// value. N is defined outside this excerpt.
43 template< typename T >
44 class counting_array_receiver : public tbb::flow::receiver<T> {
// One counter per possible message value.
46 tbb::atomic<size_t> my_counters[N];
// Graph this receiver reports itself as belonging to.
47 tbb::flow::graph& my_graph;
// Binds the receiver to graph g and walks all N counters.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// zero-initializes each counter -- confirm.
51 counting_array_receiver(tbb::flow::graph& g) : my_graph(g) {
52 for (int i = 0; i < N; ++i )
// Reads counter i into a local size_t (the return statement is not visible
// in this excerpt).
56 size_t operator[]( int i ) {
57 size_t v = my_counters[i];
// Called when an item arrives: bumps the counter indexed by (int)v and always
// reports the item as accepted via SUCCESSFULLY_ENQUEUED.
61 tbb::task * try_put_task( const T &v ) __TBB_override {
62 ++my_counters[(int)v];
63 return const_cast<tbb::task *>(tbb::flow::internal::SUCCESSFULLY_ENQUEUED);
// Accessor for the owning graph (body not visible here; presumably returns
// my_graph).
66 tbb::flow::graph& graph_reference() const __TBB_override {
// Overrides required only when the deprecated node-extraction API is enabled.
// The add/delete/copy overrides have empty bodies, built_predecessors() just
// returns the mbp member, and predecessor_count() reports 0.
70 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
71 typedef typename tbb::flow::receiver<T>::built_predecessors_type built_predecessors_type;
72 built_predecessors_type mbp;
73 built_predecessors_type &built_predecessors() __TBB_override { return mbp; }
74 typedef typename tbb::flow::receiver<T>::predecessor_list_type predecessor_list_type;
75 typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
76 void internal_add_built_predecessor(predecessor_type &) __TBB_override {}
77 void internal_delete_built_predecessor(predecessor_type &) __TBB_override {}
78 void copy_predecessors(predecessor_list_type &) __TBB_override {}
79 size_t predecessor_count() __TBB_override { return 0; }
// Reset hook: intentionally empty, the flags argument is ignored.
81 void reset_receiver(tbb::flow::reset_flags /*f*/) __TBB_override { }
// Single-threaded broadcast test. For every receiver count in [1, R) it
// attaches that many counting receivers to one broadcast_node, puts the values
// 0..N-1 once, and checks that every receiver saw every value exactly once.
// It then removes the edges and checks that a further put reaches nobody.
// R and N are defined outside this excerpt; the graph object g is declared on
// a line not visible here.
85 template< typename T >
86 void test_serial_broadcasts() {
89 tbb::flow::broadcast_node<T> b(g);
91 for ( int num_receivers = 1; num_receivers < R; ++num_receivers ) {
92 std::vector< counting_array_receiver<T> > receivers(num_receivers, counting_array_receiver<T>(g));
// With the deprecated extraction API enabled, verify that an unconnected node
// reports no successors and no predecessors, both by count and by copied list.
93 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
94 ASSERT(b.successor_count() == 0, NULL);
95 ASSERT(b.predecessor_count() == 0, NULL);
96 typename tbb::flow::broadcast_node<T>::successor_list_type my_succs;
97 b.copy_successors(my_succs);
98 ASSERT(my_succs.size() == 0, NULL);
99 typename tbb::flow::broadcast_node<T>::predecessor_list_type my_preds;
100 b.copy_predecessors(my_preds);
101 ASSERT(my_preds.size() == 0, NULL);
// Connect every receiver to the broadcast node.
104 for ( int r = 0; r < num_receivers; ++r ) {
105 tbb::flow::make_edge( b, receivers[r] );
107 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
108 ASSERT( b.successor_count() == (size_t)num_receivers, NULL);
// Broadcast each value once; every put must be accepted.
111 for (int n = 0; n < N; ++n ) {
112 ASSERT( b.try_put( (T)n ), NULL );
// Each receiver must have counted each value exactly once; then detach it.
115 for ( int r = 0; r < num_receivers; ++r ) {
116 for (int n = 0; n < N; ++n ) {
117 ASSERT( receivers[r][n] == 1, NULL );
119 tbb::flow::remove_edge( b, receivers[r] );
// With no edges left the put still succeeds but must not reach any receiver,
// so every receiver's counter for value 0 has to stay at 1. Index by r so that
// each receiver is checked (previously only receivers[0] was inspected on
// every iteration), matching the check in run_parallel_broadcasts.
121 ASSERT( b.try_put( (T)0 ), NULL );
122 for ( int r = 0; r < num_receivers; ++r )
123 ASSERT( receivers[r][0] == 1, NULL );
// Functor run by each worker in the parallel test: holds a reference to a
// broadcast_node and pushes the values 0..N-1 into it.
// NOTE(review): NoAssign is declared outside this excerpt.
128 template< typename T >
129 class native_body : private NoAssign {
// Node that every invocation puts into (shared by all copies of the functor).
131 tbb::flow::broadcast_node<T> &my_b;
135 native_body( tbb::flow::broadcast_node<T> &b ) : my_b(b) {}
// The int argument is unused; each call puts N values and asserts that every
// try_put is accepted.
137 void operator()(int) const {
138 for (int n = 0; n < N; ++n ) {
139 ASSERT( my_b.try_put( (T)n ), NULL );
// Parallel broadcast check on an existing node b in graph g. For every
// receiver count in [1, R) it connects that many counting receivers, runs
// native_body through NativeParallelFor with argument p, and expects each
// receiver to have counted each value p times.
// NOTE(review): NativeParallelFor comes from outside this excerpt; presumably
// it invokes the body once on each of p native threads -- confirm.
145 template< typename T >
146 void run_parallel_broadcasts(tbb::flow::graph& g, int p, tbb::flow::broadcast_node<T>& b) {
147 for ( int num_receivers = 1; num_receivers < R; ++num_receivers ) {
148 std::vector< counting_array_receiver<T> > receivers(num_receivers, counting_array_receiver<T>(g));
// Attach all receivers before any puts are issued.
150 for ( int r = 0; r < num_receivers; ++r ) {
151 tbb::flow::make_edge( b, receivers[r] );
154 NativeParallelFor( p, native_body<T>( b ) );
// Every value 0..N-1 must have arrived p times at every receiver; detach the
// receiver once it has been verified.
156 for ( int r = 0; r < num_receivers; ++r ) {
157 for (int n = 0; n < N; ++n ) {
158 ASSERT( (int)receivers[r][n] == p, NULL );
160 tbb::flow::remove_edge( b, receivers[r] );
// After all edges are removed, a put is still accepted by the node but must
// leave every receiver's counter for value 0 unchanged at p.
162 ASSERT( b.try_put( (T)0 ), NULL );
163 for ( int r = 0; r < num_receivers; ++r )
164 ASSERT( (int)receivers[r][0] == p, NULL );
// Runs the parallel broadcast check with argument p twice: once on a freshly
// constructed broadcast_node and once on a copy-constructed one.
// NOTE(review): the declaration of the graph g falls on a line not visible in
// this excerpt.
168 template< typename T >
169 void test_parallel_broadcasts(int p) {
172 tbb::flow::broadcast_node<T> b(g);
173 run_parallel_broadcasts(g, p, b);
175 // test copy constructor
176 tbb::flow::broadcast_node<T> b_copy(b);
177 run_parallel_broadcasts(g, p, b_copy);
// NOTE(review): the function header for this body is on lines not present in
// this excerpt; the call test_resets<float>() later in the file suggests this
// is test_resets<T>() -- confirm. The graph g, the variable j and the bodies
// of the put loops are likewise not visible.
//
// Visible structure: a chain b0 -> b1 -> q0 is built; for two rounds the
// values 0..3 are expected to come out of q0 in order with nothing left over,
// with a plain g.reset() between the rounds. After g.reset(rf_clear_edges)
// the queue is expected to stay empty.
180 // broadcast_node does not allow successors to try_get from it (it does not allow
181 // the flow edge to switch) so we only need test the forward direction.
185 tbb::flow::broadcast_node<T> b0(g);
186 tbb::flow::broadcast_node<T> b1(g);
187 tbb::flow::queue_node<T> q0(g);
188 tbb::flow::make_edge(b0,b1);
189 tbb::flow::make_edge(b1,q0);
192 // test standard reset
193 for(int testNo = 0; testNo < 2; ++testNo) {
// Loop body not visible; presumably puts i into b0 and waits -- confirm.
194 for(T i= 0; i <= 3; i += 1) {
// The queue must yield 0,1,2,3 in order and then be empty.
198 for(T i= 0; i <= 3; i += 1) {
199 ASSERT(q0.try_get(j) && j == i, "Bad value in queue");
201 ASSERT(!q0.try_get(j), "extra value in queue");
203 // reset the graph. It should work as before.
204 if (testNo == 0) g.reset();
// Reset with rf_clear_edges: after this, the two asserts below expect that
// nothing reaches q0 any more.
207 g.reset(tbb::flow::rf_clear_edges);
// Loop bodies not visible; presumably one loop puts into b0 and the other
// into b1 -- confirm.
208 for(T i= 0; i <= 3; i += 1) {
212 ASSERT(!q0.try_get(j), "edge between nodes not removed");
213 for(T i= 0; i <= 3; i += 1) {
217 ASSERT(!q0.try_get(j), "edge between nodes not removed");
// Test of the deprecated extract() API on broadcast_node; compiled only when
// TBB_DEPRECATED_FLOW_NODE_EXTRACTION is set.
// Edges built below:
//   b0 -> b1, b0 -> b2
//   b1 -> b3, b1 -> b4
//   b2 -> b4, b2 -> b5
//   b3 -> q0, b4 -> q0, b5 -> q0
// NOTE(review): the graph g, the variables j and dont_care, and the statements
// that inject a message and wait are on lines not visible in this excerpt;
// the asserts imply a single message with value 1 is put each time -- confirm.
220 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
221 void test_extract() {
224 tbb::flow::broadcast_node<int> b0(g);
225 tbb::flow::broadcast_node<int> b1(g);
226 tbb::flow::broadcast_node<int> b2(g);
227 tbb::flow::broadcast_node<int> b3(g);
228 tbb::flow::broadcast_node<int> b4(g);
229 tbb::flow::broadcast_node<int> b5(g);
230 tbb::flow::queue_node<int> q0(g);
231 tbb::flow::make_edge(b0,b1);
232 tbb::flow::make_edge(b0,b2);
233 tbb::flow::make_edge(b1,b3);
234 tbb::flow::make_edge(b1,b4);
235 tbb::flow::make_edge(b2,b4);
236 tbb::flow::make_edge(b2,b5);
237 tbb::flow::make_edge(b3,q0);
238 tbb::flow::make_edge(b4,q0);
239 tbb::flow::make_edge(b5,q0);
// Full graph: there are four distinct paths from b0 to q0 (via b3, via b4
// twice, via b5), so four copies of the message are expected in the queue.
254 for( int i = 0; i < 4; ++i ) {
256 ASSERT(q0.try_get(j) && j == 1, "missing or incorrect message");
258 ASSERT(!q0.try_get(dont_care), "extra message in queue");
// Predecessor/successor counts of every node before extraction.
259 ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 2, "improper count for b0");
260 ASSERT(b1.predecessor_count() == 1 && b1.successor_count() == 2, "improper count for b1");
261 ASSERT(b2.predecessor_count() == 1 && b2.successor_count() == 2, "improper count for b2");
262 ASSERT(b3.predecessor_count() == 1 && b3.successor_count() == 1, "improper count for b3");
263 ASSERT(b4.predecessor_count() == 2 && b4.successor_count() == 1, "improper count before extract of b4");
264 ASSERT(b5.predecessor_count() == 1 && b5.successor_count() == 1, "improper count for b5");
265 b4.extract(); // remove from tree of nodes.
// After extraction b4 must be fully disconnected, and b1 and b2 must each have
// lost the successor edge that pointed at b4; all other counts are unchanged.
266 ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 2, "improper count for b0 after");
267 ASSERT(b1.predecessor_count() == 1 && b1.successor_count() == 1, "improper succ count for b1 after");
268 ASSERT(b2.predecessor_count() == 1 && b2.successor_count() == 1, "improper succ count for b2 after");
269 ASSERT(b3.predecessor_count() == 1 && b3.successor_count() == 1, "improper succ count for b3 after");
270 ASSERT(b4.predecessor_count() == 0 && b4.successor_count() == 0, "improper succ count after extract");
271 ASSERT(b5.predecessor_count() == 1 && b5.successor_count() == 1, "improper succ count for b5 after");
// Only the paths through b3 and b5 remain, so two copies are expected.
285 for( int i = 0; i < 2; ++i ) {
287 ASSERT(q0.try_get(j) && j == 1, "missing or incorrect message");
289 ASSERT(!q0.try_get(dont_care), "extra message in queue");
// Re-insert b4, this time directly between b0 and q0.
290 tbb::flow::make_edge(b0,b4);
291 tbb::flow::make_edge(b4,q0);
// b0 now has three successors and b4 has one predecessor and one successor.
293 ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 3, "improper count for b0 after");
294 ASSERT(b1.predecessor_count() == 1 && b1.successor_count() == 1, "improper succ count for b1 after");
295 ASSERT(b2.predecessor_count() == 1 && b2.successor_count() == 1, "improper succ count for b2 after");
296 ASSERT(b3.predecessor_count() == 1 && b3.successor_count() == 1, "improper succ count for b3 after");
297 ASSERT(b4.predecessor_count() == 1 && b4.successor_count() == 1, "improper succ count after extract");
298 ASSERT(b5.predecessor_count() == 1 && b5.successor_count() == 1, "improper succ count for b5 after");
// Three paths to q0 now exist (via b3, b4 and b5), so three copies are expected.
312 for( int i = 0; i < 3; ++i ) {
314 ASSERT(q0.try_get(j) && j == 1, "missing or incorrect message");
316 ASSERT(!q0.try_get(dont_care), "extra message in queue");
318 #endif // TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Compiled only when the preview node-set feature macro is set. Delegates to
// the shared follows/precedes helpers, instantiated for
// broadcast_node<continue_msg>, passing three messages to the follows test and
// one to the precedes test.
// NOTE(review): the helpers live in test_follows_and_precedes_api.h, which is
// not visible here; what they verify is not shown in this file.
320 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
323 void test_follows_and_precedes_api() {
324 using msg_t = tbb::flow::continue_msg;
326 std::array<msg_t, 3> messages_for_follows= {msg_t(), msg_t(), msg_t()};
327 std::vector<msg_t> messages_for_precedes = {msg_t()};
329 follows_and_precedes_testing::test_follows <msg_t, tbb::flow::broadcast_node<msg_t>>(messages_for_follows);
330 follows_and_precedes_testing::test_precedes <msg_t, tbb::flow::broadcast_node<msg_t>>(messages_for_precedes);
// Compile-time checks that class template argument deduction picks
// broadcast_node<int> in each supported construction form. All verification is
// done by static_assert, so the function has nothing to check at run time.
// NOTE(review): the declaration of the graph g is on a line not visible here.
334 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
335 void test_deduction_guides() {
336 using namespace tbb::flow;
340 broadcast_node<int> b0(g);
// Deduction from follows(...) / precedes(...) node sets is only available
// with the preview node-set feature.
341 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
342 buffer_node<int> buf(g);
344 broadcast_node b1(follows(buf));
345 static_assert(std::is_same_v<decltype(b1), broadcast_node<int>>);
347 broadcast_node b2(precedes(buf));
348 static_assert(std::is_same_v<decltype(b2), broadcast_node<int>>);
// Deduction from the copy constructor.
351 broadcast_node b3(b0);
352 static_assert(std::is_same_v<decltype(b3), broadcast_node<int>>);
// NOTE(review): the header of this function and the condition guarding the
// REPORT call are on lines not present in this excerpt; the final
// `return Harness::Done` suggests this is the test harness entry point --
// confirm. MinThread and MaxThread are defined outside this file view.
359 REPORT("number of threads must be positive\n");
// Serial tests for each payload type.
363 test_serial_broadcasts<int>();
364 test_serial_broadcasts<float>();
365 test_serial_broadcasts<int_convertable_type>();
// Parallel tests for each payload type, repeated for every p in
// [MinThread, MaxThread].
367 for( int p=MinThread; p<=MaxThread; ++p ) {
368 test_parallel_broadcasts<int>(p);
369 test_parallel_broadcasts<float>(p);
370 test_parallel_broadcasts<int_convertable_type>(p);
374 test_resets<float>();
// Feature-gated tests. The statement under the first guard is not visible in
// this excerpt (presumably the call to test_extract -- confirm).
375 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
378 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
379 test_follows_and_precedes_api();
381 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
382 test_deduction_guides();
384 return Harness::Done;