2 // Copyright Oliver Kowalke 2016.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
7 #ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H
8 #define BOOST_FIBERS_UNBUFFERED_CHANNEL_H
17 #include <boost/config.hpp>
19 #include <boost/fiber/channel_op_status.hpp>
20 #include <boost/fiber/context.hpp>
21 #include <boost/fiber/detail/config.hpp>
22 #include <boost/fiber/detail/convert.hpp>
23 #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
24 #include <boost/fiber/detail/exchange.hpp>
26 #include <boost/fiber/detail/spinlock.hpp>
27 #include <boost/fiber/exceptions.hpp>
29 #ifdef BOOST_HAS_ABI_HEADERS
30 # include BOOST_ABI_PREFIX
// Rendezvous ("unbuffered") channel: a single atomic slot hands a value
// directly from one producer fiber to one consumer fiber; both sides block
// until the hand-off completes. (NOTE: this view of the file is missing
// lines; code below is kept exactly as extracted.)
36 template< typename T >
37 class unbuffered_channel {
// value transferred through the channel (reference stripped from T)
39     typedef typename std::remove_reference< T >::type value_type;
// intrusive queue of suspended fibers (from context)
42     typedef context::wait_queue_t wait_queue_type;
// slot couples the pending value with the producer's context so the
// consumer can reschedule the producer after taking the value
48         slot( value_type const& value_, context * ctx_) :
53         slot( value_type && value_, context * ctx_) :
54             value{ std::move( value_) },
// the rendezvous point: non-null while a producer's value awaits a consumer
60     std::atomic< slot * >       slot_{ nullptr };
// set once by close(); checked by producers and consumers
62     std::atomic_bool            closed_{ false };
// spinlock + queue for producers blocked waiting for the slot to free up
63     mutable detail::spinlock    splk_producers_{};
64     wait_queue_type             waiting_producers_{};
// spinlock + queue for consumers blocked waiting for a value
65     mutable detail::spinlock    splk_consumers_{};
66     wait_queue_type             waiting_consumers_{};
// padding — presumably to avoid false sharing with adjacent objects
// (cacheline_length comes from detail/config.hpp) — TODO confirm
67     char                        pad_[cacheline_length];
// is_empty_: channel is empty when no slot is published
70         return nullptr == slot_.load( std::memory_order_acquire);
// try to publish own_slot; succeeds only if the slot is currently empty
73     bool try_push_( slot * own_slot) {
75             slot * s = slot_.load( std::memory_order_acquire);
// CAS nullptr -> own_slot; on failure another producer won the race
77                 if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
// try_pop_: atomically take the published slot, leaving the channel empty
88         slot * nil_slot = nullptr;
90             slot * s = slot_.load( std::memory_order_acquire);
// CAS s -> nullptr claims the value exclusively for this consumer
92                 if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
// default ctor/dtor; channel starts open and empty
100     unbuffered_channel() {
103     ~unbuffered_channel() {
// non-copyable: the slot pointer and wait queues cannot be duplicated
107     unbuffered_channel( unbuffered_channel const&) = delete;
108     unbuffered_channel & operator=( unbuffered_channel const&) = delete;
// true once close() has been called (acquire pairs with close()'s exchange)
110     bool is_closed() const noexcept {
111         return closed_.load( std::memory_order_acquire);
// close(): mark the channel closed and wake every blocked fiber so it can
// observe the closed state and return channel_op_status::closed.
// twstatus protocol (visible in push/pop below): a waiter stores 0 before an
// untimed suspend, or its channel's this-pointer before a timed wait; a waker
// CASes this-pointer -> -1 to claim a timed waiter, and treats 0 as an
// untimed waiter that may be scheduled directly.
114     void close() noexcept {
115         context * active_ctx = context::active();
// only the first close() performs the notifications
117         if ( ! closed_.exchange( true, std::memory_order_acquire) ) {
118             // notify current waiting
// a producer may be parked inside the published slot — reschedule it
119             slot * s = slot_.load( std::memory_order_acquire);
122                     active_ctx->schedule( s->ctx);
124             // notify all waiting producers
125             detail::spinlock_lock lk1{ splk_producers_ };
126             while ( ! waiting_producers_.empty() ) {
127                 context * producer_ctx = & waiting_producers_.front();
128                 waiting_producers_.pop_front();
129                 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// claim a timed waiter (this -> -1) so its timeout path won't double-resume
130                 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
132                     active_ctx->schedule( producer_ctx);
// 0 == expected: waiter suspended without a timeout — safe to schedule
133                 } else if ( static_cast< std::intptr_t >( 0) == expected) {
136                     active_ctx->schedule( producer_ctx);
139             // notify all waiting consumers
140             detail::spinlock_lock lk2{ splk_consumers_ };
141             while ( ! waiting_consumers_.empty() ) {
142                 context * consumer_ctx = & waiting_consumers_.front();
143                 waiting_consumers_.pop_front();
144                 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// same claim protocol as for producers above
145                 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
147                     active_ctx->schedule( consumer_ctx);
148                 } else if ( static_cast< std::intptr_t >( 0) == expected) {
151                     active_ctx->schedule( consumer_ctx);
// push (copy): blocks until a consumer has taken the value or the channel is
// closed. The slot lives on this fiber's stack — safe because the producer
// does not return while the slot is published.
// Returns: success once consumed, closed if the channel closes first.
157     channel_op_status push( value_type const& value) {
158         context * active_ctx = context::active();
// stack-allocated rendezvous slot carrying the value and this producer
159         slot s{ value, active_ctx };
161             if ( BOOST_UNLIKELY( is_closed() ) ) {
162                 return channel_op_status::closed;
// slot published: wake one consumer, then wait for consumption
164             if ( try_push_( & s) ) {
165                 detail::spinlock_lock lk{ splk_consumers_ };
166                 // notify one waiting consumer
167                 while ( ! waiting_consumers_.empty() ) {
168                     context * consumer_ctx = & waiting_consumers_.front();
169                     waiting_consumers_.pop_front();
170                     std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// claim timed waiter (this -> -1); see twstatus protocol in close()
171                     if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
173                         active_ctx->schedule( consumer_ctx);
// 0: consumer suspended without timeout — schedule directly
175                     } else if ( static_cast< std::intptr_t >( 0) == expected) {
178                         active_ctx->schedule( consumer_ctx);
182                 // suspend till value has been consumed
183                 active_ctx->suspend( lk);
// consumer nulls s.ctx when it takes the value (see pop()/value_pop())
185                 if ( nullptr == s.ctx) {
186                     // value has been consumed
187                     return channel_op_status::success;
189                 // channel was closed before value was consumed
190                 return channel_op_status::closed;
// slot occupied: enqueue as waiting producer and retry after wake-up
193                 detail::spinlock_lock lk{ splk_producers_ };
194                 if ( BOOST_UNLIKELY( is_closed() ) ) {
195                     return channel_op_status::closed;
200                 active_ctx->wait_link( waiting_producers_);
// 0 == untimed wait marker for the twstatus protocol
201                 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
202                 // suspend this producer
203                 active_ctx->suspend( lk);
204                 // resumed, slot maybe free
// push (move): identical flow to the copy overload above, but moves the
// value into the stack-allocated slot.
// Returns: success once consumed, closed if the channel closes first.
209     channel_op_status push( value_type && value) {
210         context * active_ctx = context::active();
211         slot s{ std::move( value), active_ctx };
213             if ( BOOST_UNLIKELY( is_closed() ) ) {
214                 return channel_op_status::closed;
// slot published: wake one consumer, then wait for consumption
216             if ( try_push_( & s) ) {
217                 detail::spinlock_lock lk{ splk_consumers_ };
218                 // notify one waiting consumer
219                 while ( ! waiting_consumers_.empty() ) {
220                     context * consumer_ctx = & waiting_consumers_.front();
221                     waiting_consumers_.pop_front();
222                     std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// claim timed waiter (this -> -1); see twstatus protocol in close()
223                     if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
225                         active_ctx->schedule( consumer_ctx);
227                     } else if ( static_cast< std::intptr_t >( 0) == expected) {
230                         active_ctx->schedule( consumer_ctx);
234                 // suspend till value has been consumed
235                 active_ctx->suspend( lk);
// consumer nulls s.ctx when it takes the value
237                 if ( nullptr == s.ctx) {
238                     // value has been consumed
239                     return channel_op_status::success;
241                 // channel was closed before value was consumed
242                 return channel_op_status::closed;
// slot occupied: park as waiting producer and retry after wake-up
245                 detail::spinlock_lock lk{ splk_producers_ };
246                 if ( BOOST_UNLIKELY( is_closed() ) ) {
247                     return channel_op_status::closed;
252                 active_ctx->wait_link( waiting_producers_);
// 0 == untimed wait marker for the twstatus protocol
253                 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
254                 // suspend this producer
255                 active_ctx->suspend( lk);
256                 // resumed, slot maybe free
// push_wait_for (copy): relative-timeout convenience wrapper — converts the
// duration to a steady_clock deadline and delegates to push_wait_until().
261     template< typename Rep, typename Period >
262     channel_op_status push_wait_for( value_type const& value,
263                                      std::chrono::duration< Rep, Period > const& timeout_duration) {
264         return push_wait_until( value,
265                                 std::chrono::steady_clock::now() + timeout_duration);
// push_wait_for (move): same, forwarding the value into the deadline overload
// (value_type is a non-deduced object type here, so forward == move)
268     template< typename Rep, typename Period >
269     channel_op_status push_wait_for( value_type && value,
270                                      std::chrono::duration< Rep, Period > const& timeout_duration) {
271         return push_wait_until( std::forward< value_type >( value),
272                                 std::chrono::steady_clock::now() + timeout_duration);
// push_wait_until (copy): like push(), but waits no later than timeout_time_.
// Returns: success once consumed, closed if the channel closes first,
// timeout if the deadline passes before a consumer takes the value.
275     template< typename Clock, typename Duration >
276     channel_op_status push_wait_until( value_type const& value,
277                                        std::chrono::time_point< Clock, Duration > const& timeout_time_) {
278         context * active_ctx = context::active();
279         slot s{ value, active_ctx };
// normalize the user's clock to the steady_clock used by the scheduler
280         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
282             if ( BOOST_UNLIKELY( is_closed() ) ) {
283                 return channel_op_status::closed;
// slot published: wake one consumer, then timed-wait for consumption
285             if ( try_push_( & s) ) {
286                 detail::spinlock_lock lk{ splk_consumers_ };
287                 // notify one waiting consumer
288                 while ( ! waiting_consumers_.empty() ) {
289                     context * consumer_ctx = & waiting_consumers_.front();
290                     waiting_consumers_.pop_front();
291                     std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// claim timed waiter (this -> -1); see twstatus protocol in close()
292                     if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
294                         active_ctx->schedule( consumer_ctx);
296                     } else if ( static_cast< std::intptr_t >( 0) == expected) {
299                         active_ctx->schedule( consumer_ctx);
303                 // suspend this producer
// publish this-pointer: marks a timed wait so close()/consumers must CAS it
304                 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
305                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// deadline hit: try to retract our slot before anyone consumes it
307                     slot * nil_slot = nullptr, * own_slot = & s;
308                     slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
309                     // resumed, value has not been consumed
310                     return channel_op_status::timeout;
// consumer nulls s.ctx when it takes the value
313                 if ( nullptr == s.ctx) {
314                     // value has been consumed
315                     return channel_op_status::success;
317                 // channel was closed before value was consumed
318                 return channel_op_status::closed;
// slot occupied: park as waiting producer with a deadline
321                 detail::spinlock_lock lk{ splk_producers_ };
322                 if ( BOOST_UNLIKELY( is_closed() ) ) {
323                     return channel_op_status::closed;
328                 active_ctx->wait_link( waiting_producers_);
329                 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
330                 // suspend this producer
331                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
334                     // remove from waiting-queue
// not rescheduled by a consumer/close — unlink ourselves before returning
335                     waiting_producers_.remove( * active_ctx);
336                     return channel_op_status::timeout;
338                 // resumed, slot maybe free
// push_wait_until (move): identical flow to the copy overload above, but
// moves the value into the stack-allocated slot.
// Returns: success / closed / timeout (see copy overload).
343     template< typename Clock, typename Duration >
344     channel_op_status push_wait_until( value_type && value,
345                                        std::chrono::time_point< Clock, Duration > const& timeout_time_) {
346         context * active_ctx = context::active();
347         slot s{ std::move( value), active_ctx };
// normalize the user's clock to steady_clock
348         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
350             if ( BOOST_UNLIKELY( is_closed() ) ) {
351                 return channel_op_status::closed;
353             if ( try_push_( & s) ) {
354                 detail::spinlock_lock lk{ splk_consumers_ };
355                 // notify one waiting consumer
356                 while ( ! waiting_consumers_.empty() ) {
357                     context * consumer_ctx = & waiting_consumers_.front();
358                     waiting_consumers_.pop_front();
359                     std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// claim timed waiter (this -> -1); see twstatus protocol in close()
360                     if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
362                         active_ctx->schedule( consumer_ctx);
364                     } else if ( static_cast< std::intptr_t >( 0) == expected) {
367                         active_ctx->schedule( consumer_ctx);
371                 // suspend this producer
// this-pointer marks a timed wait (close()/consumers must CAS to claim us)
372                 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
373                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// deadline hit: retract the slot before anyone consumes it
375                     slot * nil_slot = nullptr, * own_slot = & s;
376                     slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
377                     // resumed, value has not been consumed
378                     return channel_op_status::timeout;
381                 if ( nullptr == s.ctx) {
382                     // value has been consumed
383                     return channel_op_status::success;
385                 // channel was closed before value was consumed
386                 return channel_op_status::closed;
// slot occupied: park as waiting producer with a deadline
389                 detail::spinlock_lock lk{ splk_producers_ };
390                 if ( BOOST_UNLIKELY( is_closed() ) ) {
391                     return channel_op_status::closed;
396                 active_ctx->wait_link( waiting_producers_);
397                 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
398                 // suspend this producer
399                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
402                     // remove from waiting-queue
403                     waiting_producers_.remove( * active_ctx);
404                     return channel_op_status::timeout;
406                 // resumed, slot maybe free
// pop: blocks until a value is available (moved into `value`) or the channel
// is closed. Taking the value also reschedules the producer parked in the
// slot. Returns: success with `value` set, or closed.
411     channel_op_status pop( value_type & value) {
412         context * active_ctx = context::active();
// claimed a published slot — first let another producer in
415             if ( nullptr != ( s = try_pop_() ) ) {
417                     detail::spinlock_lock lk{ splk_producers_ };
418                     // notify one waiting producer
419                     while ( ! waiting_producers_.empty() ) {
420                         context * producer_ctx = & waiting_producers_.front();
421                         waiting_producers_.pop_front();
422                         std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// claim timed waiter (this -> -1); see twstatus protocol in close()
423                         if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
426                             active_ctx->schedule( producer_ctx);
428                         } else if ( static_cast< std::intptr_t >( 0) == expected) {
432                             active_ctx->schedule( producer_ctx);
// take the value, then null s->ctx (signals "consumed" to the producer)
// and reschedule the producer that was parked in the slot
437                 value = std::move( s->value);
439 #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
440                 active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
442                 active_ctx->schedule( std::exchange( s->ctx, nullptr) );
444                 return channel_op_status::success;
// no value yet: park as waiting consumer and retry after wake-up
446                 detail::spinlock_lock lk{ splk_consumers_ };
447                 if ( BOOST_UNLIKELY( is_closed() ) ) {
448                     return channel_op_status::closed;
// re-check under the lock: a producer may have published meanwhile
450                 if ( ! is_empty_() ) {
453                 active_ctx->wait_link( waiting_consumers_);
// 0 == untimed wait marker for the twstatus protocol
454                 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
455                 // suspend this consumer
456                 active_ctx->suspend( lk);
457                 // resumed, slot maybe set
// value_pop: like pop(), but returns the value directly and throws
// fiber_error (operation_not_permitted) if the channel is closed.
462     value_type value_pop() {
463         context * active_ctx = context::active();
466             if ( nullptr != ( s = try_pop_() ) ) {
468                     detail::spinlock_lock lk{ splk_producers_ };
469                     // notify one waiting producer
470                     while ( ! waiting_producers_.empty() ) {
471                         context * producer_ctx = & waiting_producers_.front();
472                         waiting_producers_.pop_front();
473                         std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// claim timed waiter (this -> -1); see twstatus protocol in close()
474                         if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
477                             active_ctx->schedule( producer_ctx);
479                         } else if ( static_cast< std::intptr_t >( 0) == expected) {
483                             active_ctx->schedule( producer_ctx);
// take the value before rescheduling the parked producer; nulling s->ctx
// signals "consumed"
489                 value_type value = std::move( s->value);
491 #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
492                 active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
494                 active_ctx->schedule( std::exchange( s->ctx, nullptr) );
// NOTE(review): 'return std::move( value)' inhibits NRVO; plain
// 'return value;' would permit copy elision — confirm before changing
496                 return std::move( value);
// no value yet: closed channel is an error here (unlike pop())
498                 detail::spinlock_lock lk{ splk_consumers_ };
499                 if ( BOOST_UNLIKELY( is_closed() ) ) {
501                             std::make_error_code( std::errc::operation_not_permitted),
502                             "boost fiber: channel is closed" };
// re-check under the lock: a producer may have published meanwhile
504                 if ( ! is_empty_() ) {
507                 active_ctx->wait_link( waiting_consumers_);
// 0 == untimed wait marker for the twstatus protocol
508                 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
509                 // suspend this consumer
510                 active_ctx->suspend( lk);
511                 // resumed, slot maybe set
// pop_wait_for: relative-timeout convenience wrapper — converts the duration
// to a steady_clock deadline and delegates to pop_wait_until().
516     template< typename Rep, typename Period >
517     channel_op_status pop_wait_for( value_type & value,
518                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
519         return pop_wait_until( value,
520                                std::chrono::steady_clock::now() + timeout_duration);
// pop_wait_until: like pop(), but waits no later than timeout_time_.
// Returns: success with `value` set, closed, or timeout.
523     template< typename Clock, typename Duration >
524     channel_op_status pop_wait_until( value_type & value,
525                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {
526         context * active_ctx = context::active();
// normalize the user's clock to steady_clock
528         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
// claimed a published slot — first let another producer in
530             if ( nullptr != ( s = try_pop_() ) ) {
532                     detail::spinlock_lock lk{ splk_producers_ };
533                     // notify one waiting producer
534                     while ( ! waiting_producers_.empty() ) {
535                         context * producer_ctx = & waiting_producers_.front();
536                         waiting_producers_.pop_front();
537                         std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
// claim timed waiter (this -> -1); see twstatus protocol in close()
538                         if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
541                             active_ctx->schedule( producer_ctx);
543                         } else if ( static_cast< std::intptr_t >( 0) == expected) {
547                             active_ctx->schedule( producer_ctx);
// take the value, null s->ctx ("consumed"), reschedule parked producer
553                 value = std::move( s->value);
555 #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
556                 active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
558                 active_ctx->schedule( std::exchange( s->ctx, nullptr) );
560                 return channel_op_status::success;
// no value yet: park as waiting consumer with a deadline
562                 detail::spinlock_lock lk{ splk_consumers_ };
563                 if ( BOOST_UNLIKELY( is_closed() ) ) {
564                     return channel_op_status::closed;
// re-check under the lock: a producer may have published meanwhile
566                 if ( ! is_empty_() ) {
569                 active_ctx->wait_link( waiting_consumers_);
// this-pointer marks a timed wait (producers/close() must CAS to claim us)
570                 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
571                 // suspend this consumer
572                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
575                     // remove from waiting-queue
// not rescheduled by a producer/close — unlink ourselves before returning
576                     waiting_consumers_.remove( * active_ctx);
577                     return channel_op_status::timeout;
// input iterator over the channel: each increment pops the next value
// (blocking); a closed channel turns the iterator into the end() sentinel
// (chan_ == nullptr compares equal to a default-constructed iterator).
// raw storage for one value_type, constructed/destroyed in place
585         typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
// nullptr chan_ == end-of-range sentinel
587         unbuffered_channel  *   chan_{ nullptr };
588         storage_type            storage_;
// increment_: pop the next value into storage_; on fiber_error (channel
// closed) the iterator presumably becomes the end sentinel — TODO confirm,
// the catch body is outside this view
591             BOOST_ASSERT( nullptr != chan_);
// placement-new the popped value into the local buffer
593                 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
594             } catch ( fiber_error const&) {
// standard input-iterator typedefs
600         typedef std::input_iterator_tag                 iterator_category;
601         typedef std::ptrdiff_t                          difference_type;
602         typedef value_type                          *   pointer;
603         typedef value_type                          &   reference;
605         typedef pointer     pointer_t;
606         typedef reference   reference_t;
// default ctor: the end() sentinel
608         iterator() noexcept = default;
// begin-style ctor: binds to a channel (presumably pops the first value;
// body outside this view — TODO confirm)
610         explicit iterator( unbuffered_channel< T > * chan) noexcept :
// copy ctor/assignment share only the channel pointer, not the buffered value
615         iterator( iterator const& other) noexcept :
616             chan_{ other.chan_ } {
619         iterator & operator=( iterator const& other) noexcept {
620             if ( this == & other) return * this;
// iterators are equal iff they refer to the same channel (or both are end)
625         bool operator==( iterator const& other) const noexcept {
626             return other.chan_ == chan_;
629         bool operator!=( iterator const& other) const noexcept {
630             return other.chan_ != chan_;
// pre-increment: destroy the buffered value, then pop the next one
633         iterator & operator++() {
634             reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
// post-increment deleted: would require copying a popped value
639         iterator operator++( int) = delete;
// dereference the value buffered in storage_
641         reference_t operator*() noexcept {
642             return * reinterpret_cast< value_type * >( std::addressof( storage_) );
645         pointer_t operator->() noexcept {
646             return reinterpret_cast< value_type * >( std::addressof( storage_) );
650     friend class iterator;
// begin(): iterator bound to the channel — enables range-based for over
// values until the channel is closed
653 template< typename T >
654 typename unbuffered_channel< T >::iterator
655 begin( unbuffered_channel< T > & chan) {
656     return typename unbuffered_channel< T >::iterator( & chan);
// end(): default-constructed (nullptr-channel) sentinel
659 template< typename T >
660 typename unbuffered_channel< T >::iterator
661 end( unbuffered_channel< T > &) {
662     return typename unbuffered_channel< T >::iterator();
667 #ifdef BOOST_HAS_ABI_HEADERS
668 # include BOOST_ABI_SUFFIX
671 #endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H