2 // Copyright Oliver Kowalke 2016.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
7 #ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H
8 #define BOOST_FIBERS_UNBUFFERED_CHANNEL_H
17 #include <boost/config.hpp>
19 #include <boost/fiber/channel_op_status.hpp>
20 #include <boost/fiber/context.hpp>
21 #include <boost/fiber/detail/config.hpp>
22 #include <boost/fiber/detail/convert.hpp>
23 #include <boost/fiber/detail/spinlock.hpp>
24 #include <boost/fiber/exceptions.hpp>
26 #ifdef BOOST_HAS_ABI_HEADERS
27 # include BOOST_ABI_PREFIX
33 template< typename T >
34 class unbuffered_channel {
39 typedef context::wait_queue_t wait_queue_type;
41 struct alignas(cache_alignment) slot {
45 slot( value_type const& value_, context * ctx_) :
50 slot( value_type && value_, context * ctx_) :
51 value{ std::move( value_) },
57 alignas(cache_alignment) std::atomic< slot * > slot_{ nullptr };
59 alignas(cache_alignment) std::atomic_bool closed_{ false };
60 alignas(cache_alignment) mutable detail::spinlock splk_producers_{};
61 wait_queue_type waiting_producers_{};
62 alignas( cache_alignment) mutable detail::spinlock splk_consumers_{};
63 wait_queue_type waiting_consumers_{};
64 char pad_[cacheline_length];
67 return nullptr == slot_.load( std::memory_order_acquire);
70 bool try_push_( slot * own_slot) {
72 slot * s = slot_.load( std::memory_order_acquire);
74 if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
85 slot * nil_slot = nullptr;
87 slot * s = slot_.load( std::memory_order_acquire);
89 if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
97 unbuffered_channel() = default;
99 ~unbuffered_channel() {
102 if ( nullptr != ( s = try_pop_() ) ) {
103 BOOST_ASSERT( nullptr != s);
104 BOOST_ASSERT( nullptr != s->ctx);
105 // value will be destructed in the context of the waiting fiber
106 context::active()->schedule( s->ctx);
110 unbuffered_channel( unbuffered_channel const&) = delete;
111 unbuffered_channel & operator=( unbuffered_channel const&) = delete;
113 bool is_closed() const noexcept {
114 return closed_.load( std::memory_order_acquire);
117 void close() noexcept {
118 context * active_ctx = context::active();
119 // notify all waiting producers
120 closed_.store( true, std::memory_order_release);
121 detail::spinlock_lock lk1{ splk_producers_ };
122 while ( ! waiting_producers_.empty() ) {
123 context * producer_ctx = & waiting_producers_.front();
124 waiting_producers_.pop_front();
125 active_ctx->schedule( producer_ctx);
127 // notify all waiting consumers
128 detail::spinlock_lock lk2{ splk_consumers_ };
129 while ( ! waiting_consumers_.empty() ) {
130 context * consumer_ctx = & waiting_consumers_.front();
131 waiting_consumers_.pop_front();
132 active_ctx->schedule( consumer_ctx);
136 channel_op_status push( value_type const& value) {
137 context * active_ctx = context::active();
138 slot s{ value, active_ctx };
141 return channel_op_status::closed;
143 if ( try_push_( & s) ) {
144 detail::spinlock_lock lk{ splk_consumers_ };
145 // notify one waiting consumer
146 if ( ! waiting_consumers_.empty() ) {
147 context * consumer_ctx = & waiting_consumers_.front();
148 waiting_consumers_.pop_front();
149 active_ctx->schedule( consumer_ctx);
151 // suspend till value has been consumed
152 active_ctx->suspend( lk);
153 // resumed, value has been consumed
154 return channel_op_status::success;
156 detail::spinlock_lock lk{ splk_producers_ };
158 return channel_op_status::closed;
163 active_ctx->wait_link( waiting_producers_);
164 // suspend this producer
165 active_ctx->suspend( lk);
166 // resumed, slot mabye free
171 channel_op_status push( value_type && value) {
172 context * active_ctx = context::active();
173 slot s{ std::move( value), active_ctx };
176 return channel_op_status::closed;
178 if ( try_push_( & s) ) {
179 detail::spinlock_lock lk{ splk_consumers_ };
180 // notify one waiting consumer
181 if ( ! waiting_consumers_.empty() ) {
182 context * consumer_ctx = & waiting_consumers_.front();
183 waiting_consumers_.pop_front();
184 active_ctx->schedule( consumer_ctx);
186 // suspend till value has been consumed
187 active_ctx->suspend( lk);
188 // resumed, value has been consumed
189 return channel_op_status::success;
191 detail::spinlock_lock lk{ splk_producers_ };
193 return channel_op_status::closed;
198 active_ctx->wait_link( waiting_producers_);
199 // suspend this producer
200 active_ctx->suspend( lk);
201 // resumed, slot mabye free
206 template< typename Rep, typename Period >
207 channel_op_status push_wait_for( value_type const& value,
208 std::chrono::duration< Rep, Period > const& timeout_duration) {
209 return push_wait_until( value,
210 std::chrono::steady_clock::now() + timeout_duration);
213 template< typename Rep, typename Period >
214 channel_op_status push_wait_for( value_type && value,
215 std::chrono::duration< Rep, Period > const& timeout_duration) {
216 return push_wait_until( std::forward< value_type >( value),
217 std::chrono::steady_clock::now() + timeout_duration);
220 template< typename Clock, typename Duration >
221 channel_op_status push_wait_until( value_type const& value,
222 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
223 context * active_ctx = context::active();
224 slot s{ value, active_ctx };
225 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
228 return channel_op_status::closed;
230 if ( try_push_( & s) ) {
231 detail::spinlock_lock lk{ splk_consumers_ };
232 // notify one waiting consumer
233 if ( ! waiting_consumers_.empty() ) {
234 context * consumer_ctx = & waiting_consumers_.front();
235 waiting_consumers_.pop_front();
236 active_ctx->schedule( consumer_ctx);
238 // suspend this producer
239 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
241 slot * nil_slot = nullptr, * own_slot = & s;
242 slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
243 // resumed, value has not been consumed
244 return channel_op_status::timeout;
246 // resumed, value has been consumed
247 return channel_op_status::success;
249 detail::spinlock_lock lk{ splk_producers_ };
251 return channel_op_status::closed;
256 active_ctx->wait_link( waiting_producers_);
257 // suspend this producer
258 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
261 // remove from waiting-queue
262 waiting_producers_.remove( * active_ctx);
263 return channel_op_status::timeout;
265 // resumed, slot maybe free
270 template< typename Clock, typename Duration >
271 channel_op_status push_wait_until( value_type && value,
272 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
273 context * active_ctx = context::active();
274 slot s{ std::move( value), active_ctx };
275 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
278 return channel_op_status::closed;
280 if ( try_push_( & s) ) {
281 detail::spinlock_lock lk{ splk_consumers_ };
282 // notify one waiting consumer
283 if ( ! waiting_consumers_.empty() ) {
284 context * consumer_ctx = & waiting_consumers_.front();
285 waiting_consumers_.pop_front();
286 active_ctx->schedule( consumer_ctx);
288 // suspend this producer
289 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
291 slot * nil_slot = nullptr, * own_slot = & s;
292 slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
293 // resumed, value has not been consumed
294 return channel_op_status::timeout;
296 // resumed, value has been consumed
297 return channel_op_status::success;
299 detail::spinlock_lock lk{ splk_producers_ };
301 return channel_op_status::closed;
306 active_ctx->wait_link( waiting_producers_);
307 // suspend this producer
308 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
311 // remove from waiting-queue
312 waiting_producers_.remove( * active_ctx);
313 return channel_op_status::timeout;
315 // resumed, slot maybe free
320 channel_op_status pop( value_type & value) {
321 context * active_ctx = context::active();
324 if ( nullptr != ( s = try_pop_() ) ) {
326 detail::spinlock_lock lk{ splk_producers_ };
327 // notify one waiting producer
328 if ( ! waiting_producers_.empty() ) {
329 context * producer_ctx = & waiting_producers_.front();
330 waiting_producers_.pop_front();
332 active_ctx->schedule( producer_ctx);
336 value = std::move( s->value);
337 // resume suspended producer
338 active_ctx->schedule( s->ctx);
339 return channel_op_status::success;
341 detail::spinlock_lock lk{ splk_consumers_ };
343 return channel_op_status::closed;
345 if ( ! is_empty_() ) {
348 active_ctx->wait_link( waiting_consumers_);
349 // suspend this consumer
350 active_ctx->suspend( lk);
351 // resumed, slot mabye set
356 value_type value_pop() {
357 context * active_ctx = context::active();
360 if ( nullptr != ( s = try_pop_() ) ) {
362 detail::spinlock_lock lk{ splk_producers_ };
363 // notify one waiting producer
364 if ( ! waiting_producers_.empty() ) {
365 context * producer_ctx = & waiting_producers_.front();
366 waiting_producers_.pop_front();
368 active_ctx->schedule( producer_ctx);
372 value_type value{ std::move( s->value) };
373 // resume suspended producer
374 active_ctx->schedule( s->ctx);
375 return std::move( value);
377 detail::spinlock_lock lk{ splk_consumers_ };
380 std::make_error_code( std::errc::operation_not_permitted),
381 "boost fiber: channel is closed" };
383 if ( ! is_empty_() ) {
386 active_ctx->wait_link( waiting_consumers_);
387 // suspend this consumer
388 active_ctx->suspend( lk);
389 // resumed, slot mabye set
394 template< typename Rep, typename Period >
395 channel_op_status pop_wait_for( value_type & value,
396 std::chrono::duration< Rep, Period > const& timeout_duration) {
397 return pop_wait_until( value,
398 std::chrono::steady_clock::now() + timeout_duration);
401 template< typename Clock, typename Duration >
402 channel_op_status pop_wait_until( value_type & value,
403 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
404 context * active_ctx = context::active();
406 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
408 if ( nullptr != ( s = try_pop_() ) ) {
410 detail::spinlock_lock lk{ splk_producers_ };
411 // notify one waiting producer
412 if ( ! waiting_producers_.empty() ) {
413 context * producer_ctx = & waiting_producers_.front();
414 waiting_producers_.pop_front();
416 active_ctx->schedule( producer_ctx);
420 value = std::move( s->value);
421 // resume suspended producer
422 active_ctx->schedule( s->ctx);
423 return channel_op_status::success;
425 detail::spinlock_lock lk{ splk_consumers_ };
427 return channel_op_status::closed;
429 if ( ! is_empty_() ) {
432 active_ctx->wait_link( waiting_consumers_);
433 // suspend this consumer
434 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
437 // remove from waiting-queue
438 waiting_consumers_.remove( * active_ctx);
439 return channel_op_status::timeout;
445 class iterator : public std::iterator< std::input_iterator_tag, typename std::remove_reference< value_type >::type > {
447 typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
449 unbuffered_channel * chan_{ nullptr };
450 storage_type storage_;
453 BOOST_ASSERT( nullptr != chan_);
455 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
456 } catch ( fiber_error const&) {
462 typedef typename iterator::pointer pointer_t;
463 typedef typename iterator::reference reference_t;
465 iterator() noexcept = default;
467 explicit iterator( unbuffered_channel< T > * chan) noexcept :
472 iterator( iterator const& other) noexcept :
473 chan_{ other.chan_ } {
476 iterator & operator=( iterator const& other) noexcept {
477 if ( this == & other) return * this;
482 bool operator==( iterator const& other) const noexcept {
483 return other.chan_ == chan_;
486 bool operator!=( iterator const& other) const noexcept {
487 return other.chan_ != chan_;
490 iterator & operator++() {
495 iterator operator++( int) = delete;
497 reference_t operator*() noexcept {
498 return * reinterpret_cast< value_type * >( std::addressof( storage_) );
501 pointer_t operator->() noexcept {
502 return reinterpret_cast< value_type * >( std::addressof( storage_) );
506 friend class iterator;
509 template< typename T >
510 typename unbuffered_channel< T >::iterator
511 begin( unbuffered_channel< T > & chan) {
512 return typename unbuffered_channel< T >::iterator( & chan);
515 template< typename T >
516 typename unbuffered_channel< T >::iterator
517 end( unbuffered_channel< T > &) {
518 return typename unbuffered_channel< T >::iterator();
523 #ifdef BOOST_HAS_ABI_HEADERS
524 # include BOOST_ABI_SUFFIX
527 #endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H