/*
    Copyright (c) 2005-2019 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/
17 #ifndef __TBB_concurrent_queue_H
18 #define __TBB_concurrent_queue_H
20 #define __TBB_concurrent_queue_H_include_area
21 #include "internal/_warning_suppress_enable_notice.h"
23 #include "internal/_concurrent_queue_impl.h"
24 #include "internal/_allocator_traits.h"
28 namespace strict_ppl {
30 //! A high-performance thread-safe non-blocking concurrent queue.
31 /** Multiple threads may each push and pop concurrently.
32 Assignment construction is not allowed.
33 @ingroup containers */
34 template<typename T, typename A = cache_aligned_allocator<T> >
35 class concurrent_queue: public internal::concurrent_queue_base_v3<T> {
36 template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
39 typedef typename tbb::internal::allocator_rebind<A, char>::type page_allocator_type;
40 page_allocator_type my_allocator;
42 //! Allocates a block of size n (bytes)
43 virtual void *allocate_block( size_t n ) __TBB_override {
44 void *b = reinterpret_cast<void*>(my_allocator.allocate( n ));
46 internal::throw_exception(internal::eid_bad_alloc);
50 //! Deallocates block created by allocate_block.
51 virtual void deallocate_block( void *b, size_t n ) __TBB_override {
52 my_allocator.deallocate( reinterpret_cast<char*>(b), n );
55 static void copy_construct_item(T* location, const void* src){
56 new (location) T(*static_cast<const T*>(src));
59 #if __TBB_CPP11_RVALUE_REF_PRESENT
60 static void move_construct_item(T* location, const void* src) {
61 new (location) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
63 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
65 //! Element type in the queue.
71 //! Const reference type
72 typedef const T& const_reference;
74 //! Integral type for representing size of the queue.
75 typedef size_t size_type;
77 //! Difference type for iterator
78 typedef ptrdiff_t difference_type;
81 typedef A allocator_type;
83 //! Construct empty queue
84 explicit concurrent_queue(const allocator_type& a = allocator_type()) :
89 //! [begin,end) constructor
90 template<typename InputIterator>
91 concurrent_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
94 for( ; begin != end; ++begin )
99 concurrent_queue( const concurrent_queue& src, const allocator_type& a = allocator_type()) :
100 internal::concurrent_queue_base_v3<T>(), my_allocator( a )
102 this->assign( src, copy_construct_item );
105 #if __TBB_CPP11_RVALUE_REF_PRESENT
106 //! Move constructors
107 concurrent_queue( concurrent_queue&& src ) :
108 internal::concurrent_queue_base_v3<T>(), my_allocator( std::move(src.my_allocator) )
110 this->internal_swap( src );
113 concurrent_queue( concurrent_queue&& src, const allocator_type& a ) :
114 internal::concurrent_queue_base_v3<T>(), my_allocator( a )
116 // checking that memory allocated by one instance of allocator can be deallocated
118 if( my_allocator == src.my_allocator) {
119 this->internal_swap( src );
121 // allocators are different => performing per-element move
122 this->assign( src, move_construct_item );
126 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
131 //! Enqueue an item at tail of queue.
132 void push( const T& source ) {
133 this->internal_push( &source, copy_construct_item );
136 #if __TBB_CPP11_RVALUE_REF_PRESENT
137 void push( T&& source ) {
138 this->internal_push( &source, move_construct_item );
141 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
142 template<typename... Arguments>
143 void emplace( Arguments&&... args ) {
144 push( T(std::forward<Arguments>( args )...) );
146 #endif //__TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
147 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
149 //! Attempt to dequeue an item from head of queue.
150 /** Does not wait for item to become available.
151 Returns true if successful; false otherwise. */
152 bool try_pop( T& result ) {
153 return this->internal_try_pop( &result );
156 //! Return the number of items in the queue; thread unsafe
157 size_type unsafe_size() const {return this->internal_size();}
159 //! Equivalent to size()==0.
160 bool empty() const {return this->internal_empty();}
162 //! Clear the queue. not thread-safe.
165 //! Return allocator object
166 allocator_type get_allocator() const { return this->my_allocator; }
168 typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
169 typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
171 //------------------------------------------------------------------------
172 // The iterators are intended only for debugging. They are slow and not thread safe.
173 //------------------------------------------------------------------------
174 iterator unsafe_begin() {return iterator(*this);}
175 iterator unsafe_end() {return iterator();}
176 const_iterator unsafe_begin() const {return const_iterator(*this);}
177 const_iterator unsafe_end() const {return const_iterator();}
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the iterator-pair constructor: the element type comes from the
// iterator's value_type, and the allocator defaults to cache_aligned_allocator of it.
template<typename Iterator,
         typename Element = typename std::iterator_traits<Iterator>::value_type,
         typename Allocator = cache_aligned_allocator<Element>
> concurrent_queue(Iterator, Iterator, const Allocator& = Allocator())
-> concurrent_queue<Element, Allocator>;
#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
189 template<typename T, class A>
190 concurrent_queue<T,A>::~concurrent_queue() {
192 this->internal_finish_clear();
195 template<typename T, class A>
196 void concurrent_queue<T,A>::clear() {
198 while( !empty() ) try_pop(value);
201 } // namespace strict_ppl
203 //! A high-performance thread-safe blocking concurrent bounded queue.
204 /** This is the pre-PPL TBB concurrent queue which supports boundedness and blocking semantics.
205 Note that method names agree with the PPL-style concurrent queue.
206 Multiple threads may each push and pop concurrently.
207 Assignment construction is not allowed.
208 @ingroup containers */
209 template<typename T, class A = cache_aligned_allocator<T> >
210 class concurrent_bounded_queue: public internal::concurrent_queue_base_v8 {
211 template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
212 typedef typename tbb::internal::allocator_rebind<A, char>::type page_allocator_type;
215 page_allocator_type my_allocator;
217 typedef typename concurrent_queue_base_v3::padded_page<T> padded_page;
218 typedef typename concurrent_queue_base_v3::copy_specifics copy_specifics;
220 //! Class used to ensure exception-safety of method "pop"
221 class destroyer: internal::no_copy {
224 destroyer( T& value ) : my_value(value) {}
225 ~destroyer() {my_value.~T();}
228 T& get_ref( page& p, size_t index ) {
229 __TBB_ASSERT( index<items_per_page, NULL );
230 return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
233 virtual void copy_item( page& dst, size_t index, const void* src ) __TBB_override {
234 new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
237 #if __TBB_CPP11_RVALUE_REF_PRESENT
238 virtual void move_item( page& dst, size_t index, const void* src ) __TBB_override {
239 new( &get_ref(dst,index) ) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
242 virtual void move_item( page&, size_t, const void* ) __TBB_override {
243 __TBB_ASSERT( false, "Unreachable code" );
247 virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override {
248 new( &get_ref(dst,dindex) ) T( get_ref( const_cast<page&>(src), sindex ) );
251 #if __TBB_CPP11_RVALUE_REF_PRESENT
252 virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override {
253 new( &get_ref(dst,dindex) ) T( std::move(get_ref( const_cast<page&>(src), sindex )) );
256 virtual void move_page_item( page&, size_t, const page&, size_t ) __TBB_override {
257 __TBB_ASSERT( false, "Unreachable code" );
261 virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) __TBB_override {
262 T& from = get_ref(src,index);
264 *static_cast<T*>(dst) = tbb::internal::move( from );
267 virtual page *allocate_page() __TBB_override {
268 size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
269 page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
271 internal::throw_exception(internal::eid_bad_alloc);
275 virtual void deallocate_page( page *p ) __TBB_override {
276 size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
277 my_allocator.deallocate( reinterpret_cast<char*>(p), n );
281 //! Element type in the queue.
282 typedef T value_type;
285 typedef A allocator_type;
288 typedef T& reference;
290 //! Const reference type
291 typedef const T& const_reference;
293 //! Integral type for representing size of the queue.
294 /** Note that the size_type is a signed integral type.
295 This is because the size can be negative if there are pending pops without corresponding pushes. */
296 typedef std::ptrdiff_t size_type;
298 //! Difference type for iterator
299 typedef std::ptrdiff_t difference_type;
301 //! Construct empty queue
302 explicit concurrent_bounded_queue(const allocator_type& a = allocator_type()) :
303 concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
308 concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a = allocator_type())
309 : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
314 #if __TBB_CPP11_RVALUE_REF_PRESENT
315 //! Move constructors
316 concurrent_bounded_queue( concurrent_bounded_queue&& src )
317 : concurrent_queue_base_v8( sizeof(T) ), my_allocator( std::move(src.my_allocator) )
319 internal_swap( src );
322 concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a )
323 : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
325 // checking that memory allocated by one instance of allocator can be deallocated
327 if( my_allocator == src.my_allocator) {
328 this->internal_swap( src );
330 // allocators are different => performing per-element move
331 this->move_content( src );
335 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
337 //! [begin,end) constructor
338 template<typename InputIterator>
339 concurrent_bounded_queue( InputIterator begin, InputIterator end,
340 const allocator_type& a = allocator_type())
341 : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
343 for( ; begin != end; ++begin )
344 internal_push_if_not_full(&*begin);
348 ~concurrent_bounded_queue();
350 //! Enqueue an item at tail of queue.
351 void push( const T& source ) {
352 internal_push( &source );
355 #if __TBB_CPP11_RVALUE_REF_PRESENT
356 //! Move an item at tail of queue.
357 void push( T&& source ) {
358 internal_push_move( &source );
361 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
362 template<typename... Arguments>
363 void emplace( Arguments&&... args ) {
364 push( T(std::forward<Arguments>( args )...) );
366 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
367 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
369 //! Dequeue item from head of queue.
370 /** Block until an item becomes available, and then dequeue it. */
371 void pop( T& destination ) {
372 internal_pop( &destination );
375 #if TBB_USE_EXCEPTIONS
376 //! Abort all pending queue operations
382 //! Enqueue an item at tail of queue if queue is not already full.
383 /** Does not wait for queue to become not full.
384 Returns true if item is pushed; false if queue was already full. */
385 bool try_push( const T& source ) {
386 return internal_push_if_not_full( &source );
389 #if __TBB_CPP11_RVALUE_REF_PRESENT
390 //! Move an item at tail of queue if queue is not already full.
391 /** Does not wait for queue to become not full.
392 Returns true if item is pushed; false if queue was already full. */
393 bool try_push( T&& source ) {
394 return internal_push_move_if_not_full( &source );
396 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
397 template<typename... Arguments>
398 bool try_emplace( Arguments&&... args ) {
399 return try_push( T(std::forward<Arguments>( args )...) );
401 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
402 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
404 //! Attempt to dequeue an item from head of queue.
405 /** Does not wait for item to become available.
406 Returns true if successful; false otherwise. */
407 bool try_pop( T& destination ) {
408 return internal_pop_if_present( &destination );
411 //! Return number of pushes minus number of pops.
412 /** Note that the result can be negative if there are pops waiting for the
413 corresponding pushes. The result can also exceed capacity() if there
414 are push operations in flight. */
415 size_type size() const {return internal_size();}
417 //! Equivalent to size()<=0.
418 bool empty() const {return internal_empty();}
420 //! Maximum number of allowed elements
421 size_type capacity() const {
426 /** Setting the capacity to 0 causes subsequent try_push operations to always fail,
427 and subsequent push operations to block forever. */
428 void set_capacity( size_type new_capacity ) {
429 internal_set_capacity( new_capacity, sizeof(T) );
432 //! return allocator object
433 allocator_type get_allocator() const { return this->my_allocator; }
435 //! clear the queue. not thread-safe.
438 typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,T> iterator;
439 typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,const T> const_iterator;
441 //------------------------------------------------------------------------
442 // The iterators are intended only for debugging. They are slow and not thread safe.
443 //------------------------------------------------------------------------
444 iterator unsafe_begin() {return iterator(*this);}
445 iterator unsafe_end() {return iterator();}
446 const_iterator unsafe_begin() const {return const_iterator(*this);}
447 const_iterator unsafe_end() const {return const_iterator();}
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for concurrent_bounded_queue(Iterator, Iterator, ...): the element
// type comes from the iterator's value_type, and the allocator defaults to
// cache_aligned_allocator of it.
template<typename Iterator,
         typename Element = typename std::iterator_traits<Iterator>::value_type,
         typename Allocator = cache_aligned_allocator<Element>
> concurrent_bounded_queue(Iterator, Iterator, const Allocator& = Allocator())
-> concurrent_bounded_queue<Element, Allocator>;
#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
460 template<typename T, class A>
461 concurrent_bounded_queue<T,A>::~concurrent_bounded_queue() {
463 internal_finish_clear();
466 template<typename T, class A>
467 void concurrent_bounded_queue<T,A>::clear() {
469 while( try_pop(value) ) /*noop*/;
472 using strict_ppl::concurrent_queue;
476 #include "internal/_warning_suppress_disable_notice.h"
477 #undef __TBB_concurrent_queue_H_include_area
479 #endif /* __TBB_concurrent_queue_H */