2 Copyright (c) 2005-2017 Intel Corporation
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
21 #include "tbb/tbb_stddef.h"
22 #include "tbb/pipeline.h"
23 #include "tbb/spin_mutex.h"
24 #include "tbb/atomic.h"
29 // In the test, variables related to token counting are declared
30 // as unsigned long to match definition of tbb::internal::Token.
// --- struct Buffer (excerpt: the "struct Buffer {" header, the id and
// is_busy member declarations, and the closing brace are not visible in
// this chunk) ---
// A Buffer carries one item through the pipeline: an id identifying the
// stream item, a sequence number used by in-order filters to verify
// arrival order, and a busy flag marking the slot as claimed (see
// InputFilter::get_buffer and BaseFilter::operator() below).
33 //! Indicates that the buffer is not used.
34 static const unsigned long unused = ~0ul;
36 //! True if Buffer is in use.
// NOTE(review): the member declaration this comment documents
// (presumably "bool is_busy;") is missing from the excerpt.
38 unsigned long sequence_number;
// Construct a free buffer: unused id, not busy, no sequence number yet.
39 Buffer() : id(unused), is_busy(false), sequence_number(unused) {}
// --- class waiting_probe (excerpt: the class header, the check_counter
// member declaration, and the signature line of required() are not
// visible in this chunk) ---
// Periodically triggers an emulated "waiting for input" pause in the
// serial input filter (see InputFilter::get_buffer and probe() below).
// Start with a zeroed probe counter.
45 waiting_probe() : check_counter(0) {}
// Fires when the low 15 bits of check_counter+1 are all zero, i.e.
// roughly once every 0x8000 checks; presumably check_counter is advanced
// on a line not visible here — TODO confirm against the full file.
48 return !((check_counter+1)&size_t(0x7FFF));
// Emulates waiting for input and checks that idle threads sleep;
// definition appears near the bottom of the file.
50 void probe( ); // defined below
// Upper bound on the number of items pushed through the pipeline in one run.
53 static const unsigned MaxStreamSize = 8000;
// Together with the thread count, bounds the stream size per run:
// TestTrivialPipeline uses min(MaxStreamSize, nthread*MaxStreamItemsPerThread).
54 static const unsigned MaxStreamItemsPerThread = 1000;
55 //! Maximum number of filters allowed
56 static const unsigned MaxFilters = 5;
// Number of items sent through the pipeline in the current run; set by
// the stream-size loop inside TestTrivialPipeline.
57 static unsigned StreamSize;
// Size of InputFilter's buffer pool; also caps the token count per run.
58 static const unsigned MaxBuffer = 8;
// Done[f][i] records that filter f has processed item i exactly once;
// cleared with memset before each run and checked afterwards.
59 static bool Done[MaxFilters][MaxStreamSize];
// Shared probe that occasionally makes the serial input stage emulate
// waiting for input.
60 static waiting_probe WaitTest;
// Presumably incremented (on lines not visible in this excerpt) when a
// serial_out_of_order filter sees tokens out of order; TestMain warns if
// it stays zero — TODO confirm against the full file.
61 static unsigned out_of_order_count;
63 #include "harness_concurrency_tracker.h"
// --- class BaseFilter (excerpt: several original lines — remaining data
// members such as my_is_running/my_done, access specifiers, the
// constructor initializer list and body, some branches of operator(),
// and the closing braces — are not visible in this chunk) ---
// A generic pipeline stage that validates each token it receives:
// ordering (for serial stages), uniqueness, and buffer state.
65 class BaseFilter: public tbb::filter {
// True for the last filter in the pipeline (the one that recycles buffers).
67 const bool my_is_last;
// Number of tokens this filter has seen so far; also serves as the
// expected sequence number for in-order checking. Reset to 0 by
// TestTrivialPipeline before each run and checked to equal StreamSize after.
70 tbb::atomic<tbb::internal::Token> current_token;
71 BaseFilter( tbb::filter::mode type, bool done[], bool is_last ) :
// Interpret the incoming item as a Buffer*. InputFilter overrides this
// to claim a fresh buffer from its pool instead.
78 virtual Buffer* get_buffer( void* item ) {
80 return static_cast<Buffer*>(item);
// Process one token: check ordering/uniqueness invariants, mark the item
// done, and (per the is-last path, partly not visible here) recycle the
// buffer for reuse by the input stage.
82 void* operator()( void* item ) __TBB_override {
83 Harness::ConcurrencyTracker ct;
// A serial stage must never be entered concurrently or re-entered.
85 ASSERT( !my_is_running, "premature entry to serial stage" );
87 Buffer* b = get_buffer(item);
// (In-order branch; its enclosing condition is on a line not visible
// here.) The first token through pins the base sequence number; every
// later token must arrive exactly in order.
90 if( b->sequence_number == Buffer::unused )
91 b->sequence_number = current_token-1;
93 ASSERT( b->sequence_number==current_token-1, "item arrived out of order" );
// serial_out_of_order stages tolerate out-of-order arrival; presumably
// the missing body of this branch counts such arrivals
// (out_of_order_count) — TODO confirm against the full file.
94 } else if( is_serial() ) {
95 if( b->sequence_number != current_token-1 && b->sequence_number != Buffer::unused )
// Every item id must be in range and processed exactly once per filter.
98 ASSERT( b->id < StreamSize, NULL );
99 ASSERT( !my_done[b->id], "duplicate processing of token?" );
100 ASSERT( b->is_busy, NULL );
101 my_done[b->id] = true;
// Recycle the buffer: clear its fields, then release is_busy last so the
// input stage's acquire-load sees a fully reset buffer.
103 b->id = Buffer::unused;
104 b->sequence_number = Buffer::unused;
105 __TBB_store_with_release(b->is_busy, false);
108 my_is_running = false;
// --- class InputFilter (excerpt: several original lines — access
// specifiers, parts of the buffer-scan loop, end-of-stream handling, and
// closing braces — are not visible in this chunk) ---
// The first pipeline stage: generates StreamSize items, handing each one
// out in a Buffer claimed from a fixed pool of MaxBuffer slots.
113 class InputFilter: public BaseFilter {
// Serializes token assignment and the free-buffer scan (the input stage
// may itself be a parallel filter).
114 tbb::spin_mutex input_lock;
// Buffer pool; slots are released by the last filter in the pipeline.
115 Buffer buffer[MaxBuffer];
// Token count the pipeline was started with; used below to assert that
// no more than that many buffers are ever in flight.
116 const tbb::internal::Token my_number_of_tokens;
118 InputFilter( tbb::filter::mode type, tbb::internal::Token ntokens, bool done[], bool is_last ) :
119 BaseFilter(type, done, is_last),
120 my_number_of_tokens(ntokens)
// Produce the next item: grab the next input index and claim a free
// buffer slot. (Ignores the incoming item pointer.)
122 Buffer* get_buffer( void* ) __TBB_override {
123 unsigned long next_input;
124 unsigned free_buffer = 0;
125 { // lock protected scope
126 tbb::spin_mutex::scoped_lock lock(input_lock);
// End of stream once StreamSize items have been issued (the return on
// the missing line presumably signals end-of-input — TODO confirm).
127 if( current_token>=StreamSize )
129 next_input = current_token++;
130 // once in a while, emulate waiting for input; this only makes sense for serial input
131 if( is_serial() && WaitTest.required() )
// Scan the pool for a slot that is not busy; the acquire-load pairs with
// the release-store in BaseFilter::operator(). The advance/claim logic is
// partly on lines not visible here.
133 while( free_buffer<MaxBuffer )
134 if( __TBB_load_with_acquire(buffer[free_buffer].is_busy) )
137 buffer[free_buffer].is_busy = true;
// With ntokens tokens in flight, at most that many buffers may be busy.
141 ASSERT( free_buffer<my_number_of_tokens, "premature reuse of buffer" );
142 Buffer* b = &buffer[free_buffer];
// Sanity: the claimed slot is inside the pool and fully recycled.
143 ASSERT( &buffer[0] <= b, NULL );
144 ASSERT( b <= &buffer[MaxBuffer-1], NULL );
145 ASSERT( b->id == Buffer::unused, NULL);
147 ASSERT( b->sequence_number == Buffer::unused, NULL);
152 //! The struct below repeats layout of tbb::pipeline.
// Used to poke private fields (token_counter) for the wrap-around test;
// TestTrivialPipeline guards this with
// ASSERT(sizeof(hacked_pipeline)==sizeof(tbb::pipeline)).
// NOTE(review): some members of the mirrored layout appear to be on
// lines missing from this excerpt — verify against tbb::pipeline.
153 struct hacked_pipeline {
154 tbb::filter* filter_list;
155 tbb::filter* filter_end;
156 tbb::empty_task* end_counter;
// Hacked by TestTrivialPipeline to a near-wrap-around value so token
// counters overflow early in the run.
157 tbb::atomic<tbb::internal::Token> input_tokens;
158 tbb::atomic<tbb::internal::Token> token_counter;
160 bool has_thread_bound_filters;
// Virtual destructor only to reproduce tbb::pipeline's vtable pointer in
// the layout; never invoked polymorphically here.
162 virtual ~hacked_pipeline();
165 //! The struct below repeats layout of tbb::internal::input_buffer.
// Lets the test pre-set low_token/high_token of a serial filter's
// ordered buffer to a near-wrap-around value (see TestTrivialPipeline).
// NOTE(review): trailing members and the closing brace are on lines not
// visible in this excerpt — verify against tbb::internal::input_buffer.
166 struct hacked_input_buffer {
167 void* array; // This should be changed to task_info* if ever used
168 void* my_sem; // This should be changed to semaphore* if ever used
169 tbb::internal::Token array_size;
// Hacked to ~tokens_before_wraparound for the wrap-around immunity test.
170 tbb::internal::Token low_token;
171 tbb::spin_mutex array_mutex;
// Likewise hacked to ~tokens_before_wraparound.
172 tbb::internal::Token high_token;
177 //! The struct below repeats layout of tbb::filter.
// Used to reach a filter's private my_input_buffer pointer;
// TestTrivialPipeline guards this with
// ASSERT(sizeof(hacked_filter)==sizeof(tbb::filter)).
178 struct hacked_filter {
179 tbb::filter* next_filter_in_pipeline;
// The ordered buffer of a serial filter; its tokens are hacked for the
// wrap-around test.
180 hacked_input_buffer* my_input_buffer;
181 unsigned char my_filter_mode;
182 tbb::filter* prev_filter_in_pipeline;
183 tbb::pipeline* my_pipeline;
184 tbb::filter* next_segment;
// Virtual destructor only to reproduce tbb::filter's vtable pointer in
// the layout; never invoked polymorphically here.
186 virtual ~hacked_filter();
// Layout-hacking (implementation-dependent) tests are enabled only when
// the runtime library version matches the headers; TestMain clears this
// flag when TBB_runtime_interface_version() > TBB_INTERFACE_VERSION.
189 bool do_hacking_tests = true;
// Token counters are pre-set to ~tokens_before_wraparound so that they
// wrap around after only a few tokens, exercising wrap-around immunity.
190 const tbb::internal::Token tokens_before_wraparound = 0xF;
// Builds and runs every possible sequence of filter modes of the given
// length, over a range of stream sizes, verifying ordering, uniqueness,
// completion counts, and observed parallelism.
// NOTE(review): this excerpt is missing several original lines, including
// the declaration initializing `limit`, some else-branches, closing
// braces, and the body of the final cleanup loop (presumably deleting
// the filters) — confirm against the full file.
192 void TestTrivialPipeline( unsigned nthread, unsigned number_of_filters ) {
193 // There are 3 filter types: parallel, serial_in_order and serial_out_of_order
194 static const tbb::filter::mode filter_table[] = { tbb::filter::parallel, tbb::filter::serial_in_order, tbb::filter::serial_out_of_order};
195 const unsigned number_of_filter_types = sizeof(filter_table)/sizeof(filter_table[0]);
196 REMARK( "testing with %lu threads and %lu filters\n", nthread, number_of_filters );
197 ASSERT( number_of_filters<=MaxFilters, "too many filters" );
// The hacked_* mirror structs are only safe to use if their layout still
// matches the real TBB classes.
198 ASSERT( sizeof(hacked_pipeline) == sizeof(tbb::pipeline), "layout changed for tbb::pipeline?" );
199 ASSERT( sizeof(hacked_filter) == sizeof(tbb::filter), "layout changed for tbb::filter?" );
// One token per thread, capped by the input filter's buffer pool size.
200 tbb::internal::Token ntokens = nthread<MaxBuffer ? nthread : MaxBuffer;
201 // Count maximum iterations number
// limit = number_of_filter_types ^ number_of_filters (its declaration is
// on a line not visible in this excerpt).
203 for( unsigned i=0; i<number_of_filters; ++i)
204 limit *= number_of_filter_types;
205 // Iterate over possible filter sequences
// Each `numeral`, read as a base-number_of_filter_types number, encodes
// one mode assignment per filter position.
206 for( unsigned numeral=0; numeral<limit; ++numeral ) {
208 tbb::pipeline pipeline;
209 if( do_hacking_tests ) {
210 // A private member of pipeline is hacked there for sake of testing wrap-around immunity.
211 tbb::internal::punned_cast<hacked_pipeline*>(&pipeline)->token_counter = ~tokens_before_wraparound;
213 tbb::filter* filter[MaxFilters];
214 unsigned temp = numeral;
215 // parallelism_limit is the upper bound on the possible parallelism
216 unsigned parallelism_limit = 0;
// Decode one filter mode per position from `numeral`'s digits.
217 for( unsigned i=0; i<number_of_filters; ++i, temp/=number_of_filter_types ) {
218 tbb::filter::mode filter_type = filter_table[temp%number_of_filter_types];
219 const bool is_last = i==number_of_filters-1;
// The first stage is an InputFilter (generates items); later stages are
// plain BaseFilters. The selecting condition is on a line not visible here.
221 filter[i] = new InputFilter(filter_type,ntokens,Done[i],is_last);
223 filter[i] = new BaseFilter(filter_type,Done[i],is_last);
224 pipeline.add_filter(*filter[i]);
225 // The ordered buffer of serial filters is hacked as well.
226 if ( filter[i]->is_serial() ) {
227 if( do_hacking_tests ) {
228 ((hacked_filter*)(void*)filter[i])->my_input_buffer->low_token = ~tokens_before_wraparound;
229 ((hacked_filter*)(void*)filter[i])->my_input_buffer->high_token = ~tokens_before_wraparound;
// A serial filter contributes at most 1 to concurrency; a parallel one
// can use every thread (the else is on a line not visible here).
231 parallelism_limit += 1;
233 parallelism_limit = nthread;
236 // Account for clipping of parallelism.
237 if( parallelism_limit>nthread )
238 parallelism_limit = nthread;
239 if( parallelism_limit>ntokens )
240 parallelism_limit = (unsigned)ntokens;
241 Harness::ConcurrencyTracker::Reset();
242 unsigned streamSizeLimit = min( MaxStreamSize, nthread * MaxStreamItemsPerThread );
// Run the same pipeline repeatedly with growing stream sizes; StreamSize
// is advanced inside the loop body, not in the for-header.
243 for( StreamSize=0; StreamSize<=streamSizeLimit; ) {
244 memset( Done, 0, sizeof(Done) );
245 for( unsigned i=0; i<number_of_filters; ++i ) {
246 static_cast<BaseFilter*>(filter[i])->current_token=0;
248 pipeline.run( ntokens );
249 ASSERT( !Harness::ConcurrencyTracker::InstantParallelism(), "filter still running?" );
// Every filter must have seen exactly StreamSize tokens...
250 for( unsigned i=0; i<number_of_filters; ++i )
251 ASSERT( static_cast<BaseFilter*>(filter[i])->current_token==StreamSize, NULL );
// ...and every item must be marked done by active filters only.
252 for( unsigned i=0; i<MaxFilters; ++i )
253 for( unsigned j=0; j<StreamSize; ++j ) {
254 ASSERT( Done[i][j]==(i<number_of_filters), NULL );
// Grow the stream: small sizes step finely, larger ones by a factor of
// 8/3 (the fine-step branch body is on lines not visible here).
256 if( StreamSize < min(nthread*8, 32u) ) {
259 StreamSize = StreamSize*8/3;
// Not reaching the parallelism limit is possible (timing-dependent), so
// it is only remarked on, not asserted.
262 if( Harness::ConcurrencyTracker::PeakParallelism() < parallelism_limit )
263 REMARK( "nthread=%lu ntokens=%lu MaxParallelism=%lu parallelism_limit=%lu\n",
264 nthread, ntokens, Harness::ConcurrencyTracker::PeakParallelism(), parallelism_limit );
// Cleanup of the heap-allocated filters; loop body is on lines not
// visible in this excerpt.
265 for( unsigned i=0; i < number_of_filters; ++i ) {
273 #include "harness_cpu.h"
// Set by the MinThread..MaxThread loop in TestMain; read by
// waiting_probe::probe() when calling TestCPUUserTime.
275 static int nthread; // knowing number of threads is necessary to call TestCPUUserTime
// Emulate the input stage waiting for input and verify that the other
// threads sleep (consume no CPU user time) meanwhile.
// NOTE(review): the closing brace is on a line not visible in this excerpt.
277 void waiting_probe::probe( ) {
// With one thread there are no workers whose sleeping could be observed.
278 if( nthread==1 ) return;
279 REMARK("emulating wait for input\n");
280 // Test that threads sleep while no work.
281 // The master doesn't sleep so there could be 2 active threads if a worker is waiting for input
282 TestCPUUserTime(nthread, 2);
285 #include "tbb/task_scheduler_init.h"
// --- TestMain body (excerpt: the function header and several lines,
// including the guard condition for the REPORT below and closing braces,
// are not visible in this chunk) ---
288 out_of_order_count = 0;
// Presumably guarded by a MinThread>=1 check on a missing line — TODO
// confirm against the full file.
290 REPORT("must have at least one thread");
// The hacked_* layout mirrors are only trusted when the loaded runtime
// is not newer than the headers this test was built against.
293 if( tbb::TBB_runtime_interface_version()>TBB_INTERFACE_VERSION) {
294 REMARK("Warning: implementation dependent tests disabled\n");
295 do_hacking_tests = false;
298 // Test with varying number of threads.
299 for( nthread=MinThread; nthread<=MaxThread; ++nthread ) {
300 // Initialize TBB task scheduler
301 tbb::task_scheduler_init init(nthread);
303 // Test pipelines with n filters
// n ranges over 0..MaxFilters inclusive, so the empty pipeline is
// exercised as well.
304 for( unsigned n=0; n<=MaxFilters; ++n )
305 TestTrivialPipeline(nthread,n);
307 // Test that all workers sleep when no work
308 TestCPUUserTime(nthread);
// Across all runs a serial_out_of_order filter should have seen at least
// one out-of-order token; in-order-only delivery is suspicious but not
// wrong, hence a warning rather than an assertion.
310 if( !out_of_order_count )
311 REPORT("Warning: out of order serial filter received tokens in order\n");
312 return Harness::Done;