2 // Copyright Oliver Kowalke 2016.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
8 #ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
9 #define BOOST_FIBERS_BUFFERED_CHANNEL_H
16 #include <type_traits>
18 #include <boost/config.hpp>
20 #include <boost/fiber/channel_op_status.hpp>
21 #include <boost/fiber/context.hpp>
22 #include <boost/fiber/detail/config.hpp>
23 #include <boost/fiber/detail/convert.hpp>
24 #include <boost/fiber/detail/spinlock.hpp>
25 #include <boost/fiber/exceptions.hpp>
27 #ifdef BOOST_HAS_ABI_HEADERS
28 # include BOOST_ABI_PREFIX
// Fiber channel backed by a fixed number of slots that are indexed circularly:
// pidx_ is the next slot to write and cidx_ the next slot to read (see
// is_full_() and is_empty_()). The public operations visible in this file take
// splk_ before touching the state declared below.
34 template< typename T >
35 class buffered_channel {
// Element type: T with any reference qualifier removed.
37 typedef typename std::remove_reference< T >::type value_type;
// Queue type used to park contexts (fibers) that wait on this channel.
40 typedef context::wait_queue_t wait_queue_type;
// One buffer slot holds exactly one value_type.
41 typedef value_type slot_type;
// Spinlock guarding the channel state; mutable so that const members such as
// is_closed() can lock it.
43 mutable detail::spinlock splk_{};
// Contexts linked in by push()/push_wait_until() while the buffer is full.
44 wait_queue_type waiting_producers_{};
// Contexts linked in by pop()/value_pop()/pop_wait_until().
45 wait_queue_type waiting_consumers_{};
// Producer (write) index into the slot array.
47 std::size_t pidx_{ 0 };
// Consumer (read) index into the slot array.
48 std::size_t cidx_{ 0 };
// Number of slots; validated by the constructor.
49 std::size_t capacity_;
// Closed flag, initially false.
50 bool closed_{ false };
// Returns true when advancing pidx_ by one (modulo capacity_) would reach
// cidx_, i.e. no further element can be stored. With this test one slot always
// stays unused, so at most capacity_ - 1 elements are buffered at a time.
52 bool is_full_() const noexcept {
53 return cidx_ == ((pidx_ + 1) % capacity_);
// Returns true when the read and write indices coincide, i.e. nothing is
// currently buffered.
56 bool is_empty_() const noexcept {
57 return cidx_ == pidx_;
// Internal closed-state query; the push/pop members call it after they have
// taken splk_.
// NOTE(review): the body is not visible in this view; presumably it returns
// closed_ - confirm.
60 bool is_closed_() const noexcept {
// Creates a channel with `capacity` slots.
// @param capacity  number of slots; must be at least 2 and a power of two.
// @throws fiber_error carrying std::errc::invalid_argument when the capacity
//         check below fails.
65 explicit buffered_channel( std::size_t capacity) :
66 capacity_{ capacity } {
// Reject values below 2 and values with more than one bit set
// (x & (x - 1) is non-zero exactly when x is not a power of two).
67 if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) {
68 throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
69 "boost fiber: buffer capacity is invalid" };
// Allocate the slot array; array-new requires slot_type to be
// default-constructible.
71 slots_ = new slot_type[capacity_];
// Channels are non-copyable: copy construction and copy assignment are deleted.
79 buffered_channel( buffered_channel const&) = delete;
80 buffered_channel & operator=( buffered_channel const&) = delete;
// Public closed-state query; acquires splk_ for the duration of the call.
// NOTE(review): the return statement is not visible in this view; presumably it
// forwards to is_closed_() - confirm.
82 bool is_closed() const noexcept {
83 detail::spinlock_lock lk{ splk_ };
// Closes the channel and wakes every context parked in either wait queue.
// NOTE(review): the statement that sets closed_ is not visible in this view.
//
// twstatus protocol as used throughout this file: timed waits store `this`
// (push_wait_until/pop_wait_until), untimed waits store 0 (push/pop/value_pop),
// and a waker tries to swap `this` -> -1 before scheduling the waiter.
87 void close() noexcept {
88 context * active_ctx = context::active();
89 detail::spinlock_lock lk{ splk_ };
92 // notify all waiting producers
93 while ( ! waiting_producers_.empty() ) {
94 context * producer_ctx = & waiting_producers_.front();
95 waiting_producers_.pop_front();
96 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// Timed waiter still registered on this channel: claim it, then schedule it.
97 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
99 active_ctx->schedule( producer_ctx);
// CAS failed and the observed value is 0: untimed waiter, schedule it directly.
100 } else if ( static_cast< std::intptr_t >( 0) == expected) {
103 active_ctx->schedule( producer_ctx);
106 // notify all waiting consumers
107 while ( ! waiting_consumers_.empty() ) {
108 context * consumer_ctx = & waiting_consumers_.front();
109 waiting_consumers_.pop_front();
110 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// Same claim-or-direct-schedule handling as for the producers above.
111 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
113 active_ctx->schedule( consumer_ctx);
114 } else if ( static_cast< std::intptr_t >( 0) == expected) {
117 active_ctx->schedule( consumer_ctx);
// Non-blocking push of a copy of `value`.
// @return channel_op_status::closed if the channel is closed,
//         channel_op_status::full if no slot is free,
//         channel_op_status::success after the value has been stored.
123 channel_op_status try_push( value_type const& value) {
124 context * active_ctx = context::active();
125 detail::spinlock_lock lk{ splk_ };
126 if ( BOOST_UNLIKELY( is_closed_() ) ) {
127 return channel_op_status::closed;
128 } else if ( is_full_() ) {
129 return channel_op_status::full;
// Copy into the write slot and advance the write index circularly.
131 slots_[pidx_] = value;
132 pidx_ = (pidx_ + 1) % capacity_;
133 // notify one waiting consumer
134 while ( ! waiting_consumers_.empty() ) {
135 context * consumer_ctx = & waiting_consumers_.front();
136 waiting_consumers_.pop_front();
137 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// Timed waiter (twstatus == this): claim it by swapping in -1, then schedule.
138 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
141 active_ctx->schedule( consumer_ctx);
// Untimed waiter (twstatus == 0): schedule directly.
143 } else if ( static_cast< std::intptr_t >( 0) == expected) {
147 active_ctx->schedule( consumer_ctx);
151 return channel_op_status::success;
// Non-blocking push that moves `value` into the channel.
// @return channel_op_status::closed if the channel is closed,
//         channel_op_status::full if no slot is free,
//         channel_op_status::success after the value has been stored.
155 channel_op_status try_push( value_type && value) {
156 context * active_ctx = context::active();
157 detail::spinlock_lock lk{ splk_ };
158 if ( BOOST_UNLIKELY( is_closed_() ) ) {
159 return channel_op_status::closed;
160 } else if ( is_full_() ) {
161 return channel_op_status::full;
// Move into the write slot and advance the write index circularly.
163 slots_[pidx_] = std::move( value);
164 pidx_ = (pidx_ + 1) % capacity_;
165 // notify one waiting consumer
166 while ( ! waiting_consumers_.empty() ) {
167 context * consumer_ctx = & waiting_consumers_.front();
168 waiting_consumers_.pop_front();
169 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// Timed waiter (twstatus == this): claim it by swapping in -1, then schedule.
170 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
173 active_ctx->schedule( consumer_ctx);
// Untimed waiter (twstatus == 0): schedule directly.
175 } else if ( static_cast< std::intptr_t >( 0) == expected) {
179 active_ctx->schedule( consumer_ctx);
183 return channel_op_status::success;
// Blocking push of a copy of `value`.
// @return channel_op_status::closed if the channel is closed, otherwise
//         channel_op_status::success after the value has been stored.
// While the buffer is full the calling context links itself into
// waiting_producers_ and suspends.
// NOTE(review): the control flow that follows the suspend (retry/loop) is not
// visible in this view.
187 channel_op_status push( value_type const& value) {
188 context * active_ctx = context::active();
190 detail::spinlock_lock lk{ splk_ };
191 if ( BOOST_UNLIKELY( is_closed_() ) ) {
192 return channel_op_status::closed;
193 } else if ( is_full_() ) {
// Register as waiting producer; twstatus 0 marks an untimed wait for wakers.
194 active_ctx->wait_link( waiting_producers_);
195 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
196 // suspend this producer
197 active_ctx->suspend( lk);
// Copy into the write slot and advance the write index circularly.
199 slots_[pidx_] = value;
200 pidx_ = (pidx_ + 1) % capacity_;
201 // notify one waiting consumer
202 while ( ! waiting_consumers_.empty() ) {
203 context * consumer_ctx = & waiting_consumers_.front();
204 waiting_consumers_.pop_front();
205 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// Timed waiter (twstatus == this): claim it by swapping in -1, then schedule.
206 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
209 active_ctx->schedule( consumer_ctx);
// Untimed waiter (twstatus == 0): schedule directly.
211 } else if ( static_cast< std::intptr_t >( 0) == expected) {
215 active_ctx->schedule( consumer_ctx);
219 return channel_op_status::success;
// Blocking push that moves `value` into the channel.
// @return channel_op_status::closed if the channel is closed, otherwise
//         channel_op_status::success after the value has been stored.
// While the buffer is full the calling context links itself into
// waiting_producers_ and suspends.
// NOTE(review): the control flow that follows the suspend (retry/loop) is not
// visible in this view.
224 channel_op_status push( value_type && value) {
225 context * active_ctx = context::active();
227 detail::spinlock_lock lk{ splk_ };
228 if ( BOOST_UNLIKELY( is_closed_() ) ) {
229 return channel_op_status::closed;
230 } else if ( is_full_() ) {
// Register as waiting producer; twstatus 0 marks an untimed wait for wakers.
231 active_ctx->wait_link( waiting_producers_);
232 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
233 // suspend this producer
234 active_ctx->suspend( lk);
// Move into the write slot and advance the write index circularly.
236 slots_[pidx_] = std::move( value);
237 pidx_ = (pidx_ + 1) % capacity_;
238 // notify one waiting consumer
239 while ( ! waiting_consumers_.empty() ) {
240 context * consumer_ctx = & waiting_consumers_.front();
241 waiting_consumers_.pop_front();
242 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// Timed waiter (twstatus == this): claim it by swapping in -1, then schedule.
243 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
246 active_ctx->schedule( consumer_ctx);
// Untimed waiter (twstatus == 0): schedule directly.
248 } else if ( static_cast< std::intptr_t >( 0) == expected) {
252 active_ctx->schedule( consumer_ctx);
256 return channel_op_status::success;
// Push a copy of `value`, waiting at most `timeout_duration`.
// Converts the relative timeout into an absolute steady_clock deadline and
// delegates to push_wait_until(); the result is whatever that call returns.
261 template< typename Rep, typename Period >
262 channel_op_status push_wait_for( value_type const& value,
263 std::chrono::duration< Rep, Period > const& timeout_duration) {
264 return push_wait_until( value,
265 std::chrono::steady_clock::now() + timeout_duration);
// Push `value` by move, waiting at most `timeout_duration`.
// Converts the relative timeout into an absolute steady_clock deadline and
// delegates to push_wait_until(). Because value_type is never a reference
// type, std::forward< value_type > here yields an rvalue, so the rvalue
// overload of push_wait_until() is selected.
268 template< typename Rep, typename Period >
269 channel_op_status push_wait_for( value_type && value,
270 std::chrono::duration< Rep, Period > const& timeout_duration) {
271 return push_wait_until( std::forward< value_type >( value),
272 std::chrono::steady_clock::now() + timeout_duration);
// Push a copy of `value`, waiting no later than `timeout_time_`.
// @return channel_op_status::closed if the channel is closed,
//         channel_op_status::timeout if wait_until() reports failure while the
//         buffer is full,
//         channel_op_status::success after the value has been stored.
// NOTE(review): the control flow after a successful wait (retry/loop) is not
// visible in this view.
275 template< typename Clock, typename Duration >
276 channel_op_status push_wait_until( value_type const& value,
277 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
278 context * active_ctx = context::active();
// Normalise the caller's deadline to a steady_clock time point.
279 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
281 detail::spinlock_lock lk{ splk_ };
282 if ( BOOST_UNLIKELY( is_closed_() ) ) {
283 return channel_op_status::closed;
284 } else if ( is_full_() ) {
// Register as waiting producer; storing `this` in twstatus marks a timed wait
// on this channel, which wakers claim by swapping in -1.
285 active_ctx->wait_link( waiting_producers_);
286 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
287 // suspend this producer
288 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
291 // remove from waiting-queue
292 waiting_producers_.remove( * active_ctx);
293 return channel_op_status::timeout;
// Copy into the write slot and advance the write index circularly.
296 slots_[pidx_] = value;
297 pidx_ = (pidx_ + 1) % capacity_;
298 // notify one waiting consumer
299 while ( ! waiting_consumers_.empty() ) {
300 context * consumer_ctx = & waiting_consumers_.front();
301 waiting_consumers_.pop_front();
302 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// Timed waiter (twstatus == this): claim it by swapping in -1, then schedule.
303 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
306 active_ctx->schedule( consumer_ctx);
// Untimed waiter (twstatus == 0): schedule directly.
308 } else if ( static_cast< std::intptr_t >( 0) == expected) {
312 active_ctx->schedule( consumer_ctx);
316 return channel_op_status::success;
// Push `value` by move, waiting no later than `timeout_time_`.
// @return channel_op_status::closed if the channel is closed,
//         channel_op_status::timeout if wait_until() reports failure while the
//         buffer is full,
//         channel_op_status::success after the value has been stored.
// NOTE(review): the control flow after a successful wait (retry/loop) is not
// visible in this view.
321 template< typename Clock, typename Duration >
322 channel_op_status push_wait_until( value_type && value,
323 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
324 context * active_ctx = context::active();
// Normalise the caller's deadline to a steady_clock time point.
325 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
327 detail::spinlock_lock lk{ splk_ };
328 if ( BOOST_UNLIKELY( is_closed_() ) ) {
329 return channel_op_status::closed;
330 } else if ( is_full_() ) {
// Register as waiting producer; storing `this` in twstatus marks a timed wait
// on this channel, which wakers claim by swapping in -1.
331 active_ctx->wait_link( waiting_producers_);
332 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
333 // suspend this producer
334 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
337 // remove from waiting-queue
338 waiting_producers_.remove( * active_ctx);
339 return channel_op_status::timeout;
// Move into the write slot and advance the write index circularly.
342 slots_[pidx_] = std::move( value);
343 pidx_ = (pidx_ + 1) % capacity_;
344 // notify one waiting consumer
345 while ( ! waiting_consumers_.empty() ) {
346 context * consumer_ctx = & waiting_consumers_.front();
347 waiting_consumers_.pop_front();
348 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// Timed waiter (twstatus == this): claim it by swapping in -1, then schedule.
349 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
352 active_ctx->schedule( consumer_ctx);
// Untimed waiter (twstatus == 0): schedule directly.
354 } else if ( static_cast< std::intptr_t >( 0) == expected) {
358 active_ctx->schedule( consumer_ctx);
362 return channel_op_status::success;
// Non-blocking pop into `value`.
// @return channel_op_status::closed or channel_op_status::empty on the early
//         exit path, channel_op_status::success after an element has been
//         moved into `value`.
// NOTE(review): the conditions guarding the early exit are not visible in this
// view; presumably it is taken when is_empty_() holds - confirm.
367 channel_op_status try_pop( value_type & value) {
368 context * active_ctx = context::active();
369 detail::spinlock_lock lk{ splk_ };
372 ? channel_op_status::closed
373 : channel_op_status::empty;
// Move the element out of the read slot and advance the read index circularly.
375 value = std::move( slots_[cidx_]);
376 cidx_ = (cidx_ + 1) % capacity_;
377 // notify one waiting producer
378 while ( ! waiting_producers_.empty() ) {
379 context * producer_ctx = & waiting_producers_.front();
380 waiting_producers_.pop_front();
381 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// Timed waiter (twstatus == this): claim it by swapping in -1, then schedule.
382 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
385 active_ctx->schedule( producer_ctx);
// Untimed waiter (twstatus == 0): schedule directly.
387 } else if ( static_cast< std::intptr_t >( 0) == expected) {
391 active_ctx->schedule( producer_ctx);
395 return channel_op_status::success;
// Blocking pop into `value`.
// @return channel_op_status::closed on the closed-channel path below,
//         channel_op_status::success after an element has been moved into
//         `value`.
// NOTE(review): the outer condition around the closed check / suspend
// (presumably is_empty_()) and the retry structure are not visible in this view.
399 channel_op_status pop( value_type & value) {
400 context * active_ctx = context::active();
402 detail::spinlock_lock lk{ splk_ };
404 if ( BOOST_UNLIKELY( is_closed_() ) ) {
405 return channel_op_status::closed;
// Register as waiting consumer; twstatus 0 marks an untimed wait for wakers.
407 active_ctx->wait_link( waiting_consumers_);
408 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
409 // suspend this consumer
410 active_ctx->suspend( lk);
// Move the element out of the read slot and advance the read index circularly.
413 value = std::move( slots_[cidx_]);
414 cidx_ = (cidx_ + 1) % capacity_;
415 // notify one waiting producer
416 while ( ! waiting_producers_.empty() ) {
417 context * producer_ctx = & waiting_producers_.front();
418 waiting_producers_.pop_front();
419 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// Timed waiter (twstatus == this): claim it by swapping in -1, then schedule.
420 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
423 active_ctx->schedule( producer_ctx);
// Untimed waiter (twstatus == 0): schedule directly.
425 } else if ( static_cast< std::intptr_t >( 0) == expected) {
429 active_ctx->schedule( producer_ctx);
433 return channel_op_status::success;
// Blocking pop that returns the element by value instead of a status code.
// On the closed-channel path an error is raised with
// std::errc::operation_not_permitted and the message below.
// NOTE(review): the throw expression itself is not visible in this view;
// presumably it is fiber_error, which the iterator's increment helper catches.
// The outer condition (presumably is_empty_()) and retry structure are likewise
// not visible.
438 value_type value_pop() {
439 context * active_ctx = context::active();
441 detail::spinlock_lock lk{ splk_ };
443 if ( BOOST_UNLIKELY( is_closed_() ) ) {
445 std::make_error_code( std::errc::operation_not_permitted),
446 "boost fiber: channel is closed" };
// Register as waiting consumer; twstatus 0 marks an untimed wait for wakers.
448 active_ctx->wait_link( waiting_consumers_);
449 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
450 // suspend this consumer
451 active_ctx->suspend( lk);
// Move the element out of the read slot and advance the read index circularly.
454 value_type value = std::move( slots_[cidx_]);
455 cidx_ = (cidx_ + 1) % capacity_;
456 // notify one waiting producer
457 while ( ! waiting_producers_.empty() ) {
458 context * producer_ctx = & waiting_producers_.front();
459 waiting_producers_.pop_front();
460 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// Timed waiter (twstatus == this): claim it by swapping in -1, then schedule.
461 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
464 active_ctx->schedule( producer_ctx);
// Untimed waiter (twstatus == 0): schedule directly.
466 } else if ( static_cast< std::intptr_t >( 0) == expected) {
470 active_ctx->schedule( producer_ctx);
// NOTE(review): std::move on a local in a return statement prevents copy
// elision; a plain `return value;` would already move.
474 return std::move( value);
// Pop into `value`, waiting at most `timeout_duration`.
// Converts the relative timeout into an absolute steady_clock deadline and
// delegates to pop_wait_until(); the result is whatever that call returns.
479 template< typename Rep, typename Period >
480 channel_op_status pop_wait_for( value_type & value,
481 std::chrono::duration< Rep, Period > const& timeout_duration) {
482 return pop_wait_until( value,
483 std::chrono::steady_clock::now() + timeout_duration);
// Pop into `value`, waiting no later than `timeout_time_`.
// @return channel_op_status::closed on the closed-channel path below,
//         channel_op_status::timeout if wait_until() reports failure,
//         channel_op_status::success after an element has been moved into
//         `value`.
// NOTE(review): the outer condition around the closed check / wait (presumably
// is_empty_()) and the retry structure are not visible in this view.
486 template< typename Clock, typename Duration >
487 channel_op_status pop_wait_until( value_type & value,
488 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
489 context * active_ctx = context::active();
// Normalise the caller's deadline to a steady_clock time point.
490 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
492 detail::spinlock_lock lk{ splk_ };
494 if ( BOOST_UNLIKELY( is_closed_() ) ) {
495 return channel_op_status::closed;
// Register as waiting consumer; storing `this` in twstatus marks a timed wait
// on this channel, which wakers claim by swapping in -1.
497 active_ctx->wait_link( waiting_consumers_);
498 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
499 // suspend this consumer
500 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
503 // remove from waiting-queue
504 waiting_consumers_.remove( * active_ctx);
505 return channel_op_status::timeout;
// Move the element out of the read slot and advance the read index circularly.
509 value = std::move( slots_[cidx_]);
510 cidx_ = (cidx_ + 1) % capacity_;
511 // notify one waiting producer
512 while ( ! waiting_producers_.empty() ) {
513 context * producer_ctx = & waiting_producers_.front();
514 waiting_producers_.pop_front();
515 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// Timed waiter (twstatus == this): claim it by swapping in -1, then schedule.
516 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
519 active_ctx->schedule( producer_ctx);
// Untimed waiter (twstatus == 0): schedule directly.
521 } else if ( static_cast< std::intptr_t >( 0) == expected) {
525 active_ctx->schedule( producer_ctx);
529 return channel_op_status::success;
// Iterator state plus the helper that fetches the next element.
// NOTE(review): the enclosing class header and the helper's signature are not
// visible in this view.
//
// Raw storage with the size and alignment of one value_type.
536 typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
// Channel being iterated; nullptr for a default-constructed iterator.
538 buffered_channel * chan_{ nullptr };
// Backing bytes in which the current element is constructed.
539 storage_type storage_;
// Fetching requires an attached channel.
542 BOOST_ASSERT( nullptr != chan_);
// Placement-new the value returned by value_pop() into storage_.
544 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
// A fiber_error from value_pop() is caught here; the handler body is not
// visible in this view.
545 } catch ( fiber_error const&) {
// Standard iterator member types: the iterator is declared as an input iterator
// over value_type.
551 typedef std::input_iterator_tag iterator_category;
552 typedef std::ptrdiff_t difference_type;
553 typedef value_type * pointer;
554 typedef value_type & reference;
// Aliases used as return types of operator-> and operator*.
556 typedef pointer pointer_t;
557 typedef reference reference_t;
// Default construction leaves chan_ == nullptr (its in-class initialiser).
559 iterator() noexcept = default;
// Construct an iterator attached to `chan`.
// NOTE(review): the initialiser list and body are not visible in this view.
561 explicit iterator( buffered_channel< T > * chan) noexcept :
// Copy construction initialises chan_ from the source iterator.
566 iterator( iterator const& other) noexcept :
567 chan_{ other.chan_ } {
// Copy assignment with a self-assignment guard.
// NOTE(review): the guarded statements are not visible in this view.
570 iterator & operator=( iterator const& other) noexcept {
571 if ( BOOST_LIKELY( this != & other) ) {
// Two iterators compare equal when they refer to the same channel pointer;
// the stored element is not part of the comparison.
577 bool operator==( iterator const& other) const noexcept {
578 return other.chan_ == chan_;
// Negation of operator==: true when the channel pointers differ.
581 bool operator!=( iterator const& other) const noexcept {
582 return other.chan_ != chan_;
// Pre-increment: explicitly destroys the element currently held in storage_.
// NOTE(review): the remaining statements (presumably fetching the next element
// and returning *this) are not visible in this view.
585 iterator & operator++() {
586 reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
// Post-increment is not provided.
591 iterator operator++( int) = delete;
// Access the element held in storage_ by reference.
593 reference_t operator*() noexcept {
594 return * reinterpret_cast< value_type * >( std::addressof( storage_) );
// Access the element held in storage_ by pointer.
597 pointer_t operator->() noexcept {
598 return reinterpret_cast< value_type * >( std::addressof( storage_) );
// Grant the iterator access to the channel's non-public members.
602 friend class iterator;
// Free function: returns an iterator attached to `chan`, constructed from its
// address.
605 template< typename T >
606 typename buffered_channel< T >::iterator
607 begin( buffered_channel< T > & chan) {
608 return typename buffered_channel< T >::iterator( & chan);
// Free function: returns a default-constructed iterator (chan_ == nullptr); the
// channel argument is unused.
611 template< typename T >
612 typename buffered_channel< T >::iterator
613 end( buffered_channel< T > &) {
614 return typename buffered_channel< T >::iterator();
619 #ifdef BOOST_HAS_ABI_HEADERS
620 # include BOOST_ABI_SUFFIX
623 #endif // BOOST_FIBERS_BUFFERED_CHANNEL_H