Committing Intel(R) TBB 2018 source code
[platform/upstream/tbb.git] / src / test / test_pipeline.cpp
1 /*
2     Copyright (c) 2005-2017 Intel Corporation
3
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7
8         http://www.apache.org/licenses/LICENSE-2.0
9
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15
16
17
18
19 */
20
21 #include "tbb/tbb_stddef.h"
22 #include "tbb/pipeline.h"
23 #include "tbb/spin_mutex.h"
24 #include "tbb/atomic.h"
25 #include <cstdlib>
26 #include <cstdio>
27 #include "harness.h"
28
29 // In the test, variables related to token counting are declared
30 // as unsigned long to match definition of tbb::internal::Token.
31
32 struct Buffer {
33     //! Indicates that the buffer is not used.
34     static const unsigned long unused = ~0ul;
35     unsigned long id;
36     //! True if Buffer is in use.
37     bool is_busy;
38     unsigned long sequence_number;
39     Buffer() : id(unused), is_busy(false), sequence_number(unused) {}
40 };
41
42 class waiting_probe {
43     size_t check_counter;
44 public:
45     waiting_probe() : check_counter(0) {}
46     bool required( ) {
47         ++check_counter;
48         return !((check_counter+1)&size_t(0x7FFF));
49     }
50     void probe( ); // defined below
51 };
52
53 static const unsigned MaxStreamSize = 8000;
54 static const unsigned MaxStreamItemsPerThread = 1000;
55 //! Maximum number of filters allowed
56 static const unsigned MaxFilters = 5;
57 static unsigned StreamSize;
58 static const unsigned MaxBuffer = 8;
59 static bool Done[MaxFilters][MaxStreamSize];
60 static waiting_probe WaitTest;
61 static unsigned out_of_order_count;
62
63 #include "harness_concurrency_tracker.h"
64
65 class BaseFilter: public tbb::filter {
66     bool* const my_done;
67     const bool my_is_last;
68     bool my_is_running;
69 public:
70     tbb::atomic<tbb::internal::Token> current_token;
71     BaseFilter( tbb::filter::mode type, bool done[], bool is_last ) :
72         filter(type),
73         my_done(done),
74         my_is_last(is_last),
75         my_is_running(false),
76         current_token()
77     {}
78     virtual Buffer* get_buffer( void* item ) {
79         current_token++;
80         return static_cast<Buffer*>(item);
81     }
82     void* operator()( void* item ) __TBB_override {
83         Harness::ConcurrencyTracker ct;
84         if( is_serial() )
85             ASSERT( !my_is_running, "premature entry to serial stage" );
86         my_is_running = true;
87         Buffer* b = get_buffer(item);
88         if( b ) {
89             if( is_ordered() ) {
90                 if( b->sequence_number == Buffer::unused )
91                     b->sequence_number = current_token-1;
92                 else
93                     ASSERT( b->sequence_number==current_token-1, "item arrived out of order" );
94             } else if( is_serial() ) {
95                 if( b->sequence_number != current_token-1 && b->sequence_number != Buffer::unused )
96                     out_of_order_count++;
97             }
98             ASSERT( b->id < StreamSize, NULL );
99             ASSERT( !my_done[b->id], "duplicate processing of token?" );
100             ASSERT( b->is_busy, NULL );
101             my_done[b->id] = true;
102             if( my_is_last ) {
103                 b->id = Buffer::unused;
104                 b->sequence_number = Buffer::unused;
105                 __TBB_store_with_release(b->is_busy, false);
106             }
107         }
108         my_is_running = false;
109         return b;
110     }
111 };
112
113 class InputFilter: public BaseFilter {
114     tbb::spin_mutex input_lock;
115     Buffer buffer[MaxBuffer];
116     const tbb::internal::Token my_number_of_tokens;
117 public:
118     InputFilter( tbb::filter::mode type, tbb::internal::Token ntokens, bool done[], bool is_last ) :
119         BaseFilter(type, done, is_last),
120         my_number_of_tokens(ntokens)
121     {}
122     Buffer* get_buffer( void* ) __TBB_override {
123         unsigned long next_input;
124         unsigned free_buffer = 0;
125         { // lock protected scope
126             tbb::spin_mutex::scoped_lock lock(input_lock);
127             if( current_token>=StreamSize )
128                 return NULL;
129             next_input = current_token++;
130             // once in a while, emulate waiting for input; this only makes sense for serial input
131             if( is_serial() && WaitTest.required() )
132                 WaitTest.probe( );
133             while( free_buffer<MaxBuffer )
134                 if( __TBB_load_with_acquire(buffer[free_buffer].is_busy) )
135                     ++free_buffer;
136                 else {
137                     buffer[free_buffer].is_busy = true;
138                     break;
139                 }
140         }
141         ASSERT( free_buffer<my_number_of_tokens, "premature reuse of buffer" );
142         Buffer* b = &buffer[free_buffer];
143         ASSERT( &buffer[0] <= b, NULL );
144         ASSERT( b <= &buffer[MaxBuffer-1], NULL );
145         ASSERT( b->id == Buffer::unused, NULL);
146         b->id = next_input;
147         ASSERT( b->sequence_number == Buffer::unused, NULL);
148         return b;
149     }
150 };
151
152 //! The struct below repeats layout of tbb::pipeline.
153 struct hacked_pipeline {
154     tbb::filter* filter_list;
155     tbb::filter* filter_end;
156     tbb::empty_task* end_counter;
157     tbb::atomic<tbb::internal::Token> input_tokens;
158     tbb::atomic<tbb::internal::Token> token_counter;
159     bool end_of_input;
160     bool has_thread_bound_filters;
161
162     virtual ~hacked_pipeline();
163 };
164
165 //! The struct below repeats layout of tbb::internal::input_buffer.
166 struct hacked_input_buffer {
167     void* array; // This should be changed to task_info* if ever used
168     void* my_sem; // This should be changed to semaphore* if ever used
169     tbb::internal::Token array_size;
170     tbb::internal::Token low_token;
171     tbb::spin_mutex array_mutex;
172     tbb::internal::Token high_token;
173     bool is_ordered;
174     bool is_bound;
175 };
176
177 //! The struct below repeats layout of tbb::filter.
178 struct hacked_filter {
179     tbb::filter* next_filter_in_pipeline;
180     hacked_input_buffer* my_input_buffer;
181     unsigned char my_filter_mode;
182     tbb::filter* prev_filter_in_pipeline;
183     tbb::pipeline* my_pipeline;
184     tbb::filter* next_segment;
185
186     virtual ~hacked_filter();
187 };
188
189 bool do_hacking_tests = true;
190 const tbb::internal::Token tokens_before_wraparound = 0xF;
191
192 void TestTrivialPipeline( unsigned nthread, unsigned number_of_filters ) {
193     // There are 3 filter types: parallel, serial_in_order and serial_out_of_order
194     static const tbb::filter::mode filter_table[] = { tbb::filter::parallel, tbb::filter::serial_in_order, tbb::filter::serial_out_of_order};
195     const unsigned number_of_filter_types = sizeof(filter_table)/sizeof(filter_table[0]);
196     REMARK( "testing with %lu threads and %lu filters\n", nthread, number_of_filters );
197     ASSERT( number_of_filters<=MaxFilters, "too many filters" );
198     ASSERT( sizeof(hacked_pipeline) == sizeof(tbb::pipeline), "layout changed for tbb::pipeline?" );
199     ASSERT( sizeof(hacked_filter) == sizeof(tbb::filter), "layout changed for tbb::filter?" );
200     tbb::internal::Token ntokens = nthread<MaxBuffer ? nthread : MaxBuffer;
201     // Count maximum iterations number
202     unsigned limit = 1;
203     for( unsigned i=0; i<number_of_filters; ++i)
204         limit *= number_of_filter_types;
205     // Iterate over possible filter sequences
206     for( unsigned numeral=0; numeral<limit; ++numeral ) {
207         // Build pipeline
208         tbb::pipeline pipeline;
209         if( do_hacking_tests ) {
210             // A private member of pipeline is hacked there for sake of testing wrap-around immunity.
211             tbb::internal::punned_cast<hacked_pipeline*>(&pipeline)->token_counter = ~tokens_before_wraparound;
212         }
213         tbb::filter* filter[MaxFilters];
214         unsigned temp = numeral;
215         // parallelism_limit is the upper bound on the possible parallelism
216         unsigned parallelism_limit = 0;
217         for( unsigned i=0; i<number_of_filters; ++i, temp/=number_of_filter_types ) {
218             tbb::filter::mode filter_type = filter_table[temp%number_of_filter_types];
219             const bool is_last = i==number_of_filters-1;
220             if( i==0 )
221                 filter[i] = new InputFilter(filter_type,ntokens,Done[i],is_last);
222             else
223                 filter[i] = new BaseFilter(filter_type,Done[i],is_last);
224             pipeline.add_filter(*filter[i]);
225             // The ordered buffer of serial filters is hacked as well.
226             if ( filter[i]->is_serial() ) {
227                 if( do_hacking_tests ) {
228                     ((hacked_filter*)(void*)filter[i])->my_input_buffer->low_token = ~tokens_before_wraparound;
229                     ((hacked_filter*)(void*)filter[i])->my_input_buffer->high_token = ~tokens_before_wraparound;
230                 }
231                 parallelism_limit += 1;
232             } else {
233                 parallelism_limit = nthread;
234             }
235         }
236         // Account for clipping of parallelism.
237         if( parallelism_limit>nthread )
238             parallelism_limit = nthread;
239         if( parallelism_limit>ntokens )
240             parallelism_limit = (unsigned)ntokens;
241         Harness::ConcurrencyTracker::Reset();
242         unsigned streamSizeLimit = min( MaxStreamSize, nthread * MaxStreamItemsPerThread );
243         for( StreamSize=0; StreamSize<=streamSizeLimit; ) {
244             memset( Done, 0, sizeof(Done) );
245             for( unsigned i=0; i<number_of_filters; ++i ) {
246                 static_cast<BaseFilter*>(filter[i])->current_token=0;
247             }
248             pipeline.run( ntokens );
249             ASSERT( !Harness::ConcurrencyTracker::InstantParallelism(), "filter still running?" );
250             for( unsigned i=0; i<number_of_filters; ++i )
251                 ASSERT( static_cast<BaseFilter*>(filter[i])->current_token==StreamSize, NULL );
252             for( unsigned i=0; i<MaxFilters; ++i )
253                 for( unsigned j=0; j<StreamSize; ++j ) {
254                     ASSERT( Done[i][j]==(i<number_of_filters), NULL );
255                 }
256             if( StreamSize < min(nthread*8, 32u) ) {
257                 ++StreamSize;
258             } else {
259                 StreamSize = StreamSize*8/3;
260             }
261         }
262         if( Harness::ConcurrencyTracker::PeakParallelism() < parallelism_limit )
263             REMARK( "nthread=%lu ntokens=%lu MaxParallelism=%lu parallelism_limit=%lu\n",
264                 nthread, ntokens, Harness::ConcurrencyTracker::PeakParallelism(), parallelism_limit );
265         for( unsigned i=0; i < number_of_filters; ++i ) {
266             delete filter[i];
267             filter[i] = NULL;
268         }
269         pipeline.clear();
270     }
271 }
272
273 #include "harness_cpu.h"
274
275 static int nthread; // knowing number of threads is necessary to call TestCPUUserTime
276
277 void waiting_probe::probe( ) {
278     if( nthread==1 ) return;
279     REMARK("emulating wait for input\n");
280     // Test that threads sleep while no work.
281     // The master doesn't sleep so there could be 2 active threads if a worker is waiting for input
282     TestCPUUserTime(nthread, 2);
283 }
284
285 #include "tbb/task_scheduler_init.h"
286
287 int TestMain () {
288     out_of_order_count = 0;
289     if( MinThread<1 ) {
290         REPORT("must have at least one thread");
291         exit(1);
292     }
293     if( tbb::TBB_runtime_interface_version()>TBB_INTERFACE_VERSION) {
294         REMARK("Warning: implementation dependent tests disabled\n");
295         do_hacking_tests = false;
296     }
297
298     // Test with varying number of threads.
299     for( nthread=MinThread; nthread<=MaxThread; ++nthread ) {
300         // Initialize TBB task scheduler
301         tbb::task_scheduler_init init(nthread);
302
303         // Test pipelines with n filters
304         for( unsigned n=0; n<=MaxFilters; ++n )
305             TestTrivialPipeline(nthread,n);
306
307         // Test that all workers sleep when no work
308         TestCPUUserTime(nthread);
309     }
310     if( !out_of_order_count )
311         REPORT("Warning: out of order serial filter received tokens in order\n");
312     return Harness::Done;
313 }