Imported Upstream version 1.64.0
[platform/upstream/boost.git] / boost / fiber / unbuffered_channel.hpp
1
2 //          Copyright Oliver Kowalke 2016.
3 // Distributed under the Boost Software License, Version 1.0.
4 //    (See accompanying file LICENSE_1_0.txt or copy at
5 //          http://www.boost.org/LICENSE_1_0.txt)
6
7 #ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H
8 #define BOOST_FIBERS_UNBUFFERED_CHANNEL_H
9
10 #include <atomic>
11 #include <chrono>
12 #include <cstddef>
13 #include <cstdint>
14 #include <memory>
15 #include <vector>
16
17 #include <boost/config.hpp>
18
19 #include <boost/fiber/channel_op_status.hpp>
20 #include <boost/fiber/context.hpp>
21 #include <boost/fiber/detail/config.hpp>
22 #include <boost/fiber/detail/convert.hpp>
23 #include <boost/fiber/detail/spinlock.hpp>
24 #include <boost/fiber/exceptions.hpp>
25
26 #ifdef BOOST_HAS_ABI_HEADERS
27 #  include BOOST_ABI_PREFIX
28 #endif
29
30 namespace boost {
31 namespace fibers {
32
33 template< typename T >
34 class unbuffered_channel {
35 public:
36     typedef T   value_type;
37
38 private:
39     typedef context::wait_queue_t   wait_queue_type;
40
41     struct alignas(cache_alignment) slot {
42         value_type  value;
43         context *   ctx;
44
45         slot( value_type const& value_, context * ctx_) :
46             value{ value_ },
47             ctx{ ctx_ } {
48         }
49
50         slot( value_type && value_, context * ctx_) :
51             value{ std::move( value_) },
52             ctx{ ctx_ } {
53         }
54     };
55
56     // shared cacheline
57     alignas(cache_alignment) std::atomic< slot * >      slot_{ nullptr };
58     // shared cacheline
59     alignas(cache_alignment) std::atomic_bool           closed_{ false };
60     alignas(cache_alignment) mutable detail::spinlock   splk_producers_{};
61     wait_queue_type                                     waiting_producers_{};
62     alignas( cache_alignment) mutable detail::spinlock  splk_consumers_{};
63     wait_queue_type                                     waiting_consumers_{};
64     char                                                pad_[cacheline_length];
65
66     bool is_empty_() {
67         return nullptr == slot_.load( std::memory_order_acquire);
68     }
69
70     bool try_push_( slot * own_slot) {
71         for (;;) {
72             slot * s = slot_.load( std::memory_order_acquire);
73             if ( nullptr == s) {
74                 if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
75                     continue;
76                 }
77                 return true;
78             } else {
79                 return false;
80             }
81         }
82     }
83
84     slot * try_pop_() {
85         slot * nil_slot = nullptr;
86         for (;;) {
87             slot * s = slot_.load( std::memory_order_acquire);
88             if ( nullptr != s) {
89                 if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
90                     continue;}
91             }
92             return s;
93         }
94     }
95
96 public:
97     unbuffered_channel() = default;
98
99     ~unbuffered_channel() {
100         close();
101         slot * s = nullptr;
102         if ( nullptr != ( s = try_pop_() ) ) {
103             BOOST_ASSERT( nullptr != s);
104             BOOST_ASSERT( nullptr != s->ctx);
105             // value will be destructed in the context of the waiting fiber
106             context::active()->schedule( s->ctx);
107         }
108     }
109
110     unbuffered_channel( unbuffered_channel const&) = delete;
111     unbuffered_channel & operator=( unbuffered_channel const&) = delete;
112
113     bool is_closed() const noexcept {
114         return closed_.load( std::memory_order_acquire);
115     }
116
117     void close() noexcept {
118         context * active_ctx = context::active();
119         // notify all waiting producers
120         closed_.store( true, std::memory_order_release);
121         detail::spinlock_lock lk1{ splk_producers_ };
122         while ( ! waiting_producers_.empty() ) {
123             context * producer_ctx = & waiting_producers_.front();
124             waiting_producers_.pop_front();
125             active_ctx->schedule( producer_ctx);
126         }
127         // notify all waiting consumers
128         detail::spinlock_lock lk2{ splk_consumers_ };
129         while ( ! waiting_consumers_.empty() ) {
130             context * consumer_ctx = & waiting_consumers_.front();
131             waiting_consumers_.pop_front();
132             active_ctx->schedule( consumer_ctx);
133         }
134     }
135
136     channel_op_status push( value_type const& value) {
137         context * active_ctx = context::active();
138         slot s{ value, active_ctx };
139         for (;;) {
140             if ( is_closed() ) {
141                 return channel_op_status::closed;
142             }
143             if ( try_push_( & s) ) {
144                 detail::spinlock_lock lk{ splk_consumers_ };
145                 // notify one waiting consumer
146                 if ( ! waiting_consumers_.empty() ) {
147                     context * consumer_ctx = & waiting_consumers_.front();
148                     waiting_consumers_.pop_front();
149                     active_ctx->schedule( consumer_ctx);
150                 }
151                 // suspend till value has been consumed
152                 active_ctx->suspend( lk);
153                 // resumed, value has been consumed
154                 return channel_op_status::success;
155             } else {
156                 detail::spinlock_lock lk{ splk_producers_ };
157                 if ( is_closed() ) {
158                     return channel_op_status::closed;
159                 }
160                 if ( is_empty_() ) {
161                     continue;
162                 }
163                 active_ctx->wait_link( waiting_producers_);
164                 // suspend this producer
165                 active_ctx->suspend( lk);
166                 // resumed, slot mabye free
167             }
168         }
169     }
170
171     channel_op_status push( value_type && value) {
172         context * active_ctx = context::active();
173         slot s{ std::move( value), active_ctx };
174         for (;;) {
175             if ( is_closed() ) {
176                 return channel_op_status::closed;
177             }
178             if ( try_push_( & s) ) {
179                 detail::spinlock_lock lk{ splk_consumers_ };
180                 // notify one waiting consumer
181                 if ( ! waiting_consumers_.empty() ) {
182                     context * consumer_ctx = & waiting_consumers_.front();
183                     waiting_consumers_.pop_front();
184                     active_ctx->schedule( consumer_ctx);
185                 }
186                 // suspend till value has been consumed
187                 active_ctx->suspend( lk);
188                 // resumed, value has been consumed
189                 return channel_op_status::success;
190             } else {
191                 detail::spinlock_lock lk{ splk_producers_ };
192                 if ( is_closed() ) {
193                     return channel_op_status::closed;
194                 }
195                 if ( is_empty_() ) {
196                     continue;
197                 }
198                 active_ctx->wait_link( waiting_producers_);
199                 // suspend this producer
200                 active_ctx->suspend( lk);
201                 // resumed, slot mabye free
202             }
203         }
204     }
205
206     template< typename Rep, typename Period >
207     channel_op_status push_wait_for( value_type const& value,
208                                      std::chrono::duration< Rep, Period > const& timeout_duration) {
209         return push_wait_until( value,
210                                 std::chrono::steady_clock::now() + timeout_duration);
211     }
212
213     template< typename Rep, typename Period >
214     channel_op_status push_wait_for( value_type && value,
215                                      std::chrono::duration< Rep, Period > const& timeout_duration) {
216         return push_wait_until( std::forward< value_type >( value),
217                                 std::chrono::steady_clock::now() + timeout_duration);
218     }
219
220     template< typename Clock, typename Duration >
221     channel_op_status push_wait_until( value_type const& value,
222                                        std::chrono::time_point< Clock, Duration > const& timeout_time_) {
223         context * active_ctx = context::active();
224         slot s{ value, active_ctx };
225         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
226         for (;;) {
227             if ( is_closed() ) {
228                 return channel_op_status::closed;
229             }
230             if ( try_push_( & s) ) {
231                 detail::spinlock_lock lk{ splk_consumers_ };
232                 // notify one waiting consumer
233                 if ( ! waiting_consumers_.empty() ) {
234                     context * consumer_ctx = & waiting_consumers_.front();
235                     waiting_consumers_.pop_front();
236                     active_ctx->schedule( consumer_ctx);
237                 }
238                 // suspend this producer
239                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
240                     // clear slot
241                     slot * nil_slot = nullptr, * own_slot = & s;
242                     slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
243                     // resumed, value has not been consumed
244                     return channel_op_status::timeout;
245                 }
246                 // resumed, value has been consumed
247                 return channel_op_status::success;
248             } else {
249                 detail::spinlock_lock lk{ splk_producers_ };
250                 if ( is_closed() ) {
251                     return channel_op_status::closed;
252                 }
253                 if ( is_empty_() ) {
254                     continue;
255                 }
256                 active_ctx->wait_link( waiting_producers_);
257                 // suspend this producer
258                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
259                     // relock local lk
260                     lk.lock();
261                     // remove from waiting-queue
262                     waiting_producers_.remove( * active_ctx);
263                     return channel_op_status::timeout;
264                 }
265                 // resumed, slot maybe free
266             }
267         }
268     }
269
270     template< typename Clock, typename Duration >
271     channel_op_status push_wait_until( value_type && value,
272                                        std::chrono::time_point< Clock, Duration > const& timeout_time_) {
273         context * active_ctx = context::active();
274         slot s{ std::move( value), active_ctx };
275         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
276         for (;;) {
277             if ( is_closed() ) {
278                 return channel_op_status::closed;
279             }
280             if ( try_push_( & s) ) {
281                 detail::spinlock_lock lk{ splk_consumers_ };
282                 // notify one waiting consumer
283                 if ( ! waiting_consumers_.empty() ) {
284                     context * consumer_ctx = & waiting_consumers_.front();
285                     waiting_consumers_.pop_front();
286                     active_ctx->schedule( consumer_ctx);
287                 }
288                 // suspend this producer
289                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
290                     // clear slot
291                     slot * nil_slot = nullptr, * own_slot = & s;
292                     slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
293                     // resumed, value has not been consumed
294                     return channel_op_status::timeout;
295                 }
296                 // resumed, value has been consumed
297                 return channel_op_status::success;
298             } else {
299                 detail::spinlock_lock lk{ splk_producers_ };
300                 if ( is_closed() ) {
301                     return channel_op_status::closed;
302                 }
303                 if ( is_empty_() ) {
304                     continue;
305                 }
306                 active_ctx->wait_link( waiting_producers_);
307                 // suspend this producer
308                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
309                     // relock local lk
310                     lk.lock();
311                     // remove from waiting-queue
312                     waiting_producers_.remove( * active_ctx);
313                     return channel_op_status::timeout;
314                 }
315                 // resumed, slot maybe free
316             }
317         }
318     }
319
320     channel_op_status pop( value_type & value) {
321         context * active_ctx = context::active();
322         slot * s = nullptr;
323         for (;;) {
324             if ( nullptr != ( s = try_pop_() ) ) {
325                 {
326                     detail::spinlock_lock lk{ splk_producers_ };
327                     // notify one waiting producer
328                     if ( ! waiting_producers_.empty() ) {
329                         context * producer_ctx = & waiting_producers_.front();
330                         waiting_producers_.pop_front();
331                         lk.unlock();
332                         active_ctx->schedule( producer_ctx);
333                     }
334                 }
335                 // consume value
336                 value = std::move( s->value);
337                 // resume suspended producer
338                 active_ctx->schedule( s->ctx);
339                 return channel_op_status::success;
340             } else {
341                 detail::spinlock_lock lk{ splk_consumers_ };
342                 if ( is_closed() ) {
343                     return channel_op_status::closed;
344                 }
345                 if ( ! is_empty_() ) {
346                     continue;
347                 }
348                 active_ctx->wait_link( waiting_consumers_);
349                 // suspend this consumer
350                 active_ctx->suspend( lk);
351                 // resumed, slot mabye set
352             }
353         }
354     }
355
356     value_type value_pop() {
357         context * active_ctx = context::active();
358         slot * s = nullptr;
359         for (;;) {
360             if ( nullptr != ( s = try_pop_() ) ) {
361                 {
362                     detail::spinlock_lock lk{ splk_producers_ };
363                     // notify one waiting producer
364                     if ( ! waiting_producers_.empty() ) {
365                         context * producer_ctx = & waiting_producers_.front();
366                         waiting_producers_.pop_front();
367                         lk.unlock();
368                         active_ctx->schedule( producer_ctx);
369                     }
370                 }
371                 // consume value
372                 value_type value{ std::move( s->value) };
373                 // resume suspended producer
374                 active_ctx->schedule( s->ctx);
375                 return std::move( value);
376             } else {
377                 detail::spinlock_lock lk{ splk_consumers_ };
378                 if ( is_closed() ) {
379                     throw fiber_error{
380                             std::make_error_code( std::errc::operation_not_permitted),
381                             "boost fiber: channel is closed" };
382                 }
383                 if ( ! is_empty_() ) {
384                     continue;
385                 }
386                 active_ctx->wait_link( waiting_consumers_);
387                 // suspend this consumer
388                 active_ctx->suspend( lk);
389                 // resumed, slot mabye set
390             }
391         }
392     }
393
394     template< typename Rep, typename Period >
395     channel_op_status pop_wait_for( value_type & value,
396                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
397         return pop_wait_until( value,
398                                std::chrono::steady_clock::now() + timeout_duration);
399     }
400
401     template< typename Clock, typename Duration >
402     channel_op_status pop_wait_until( value_type & value,
403                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {
404         context * active_ctx = context::active();
405         slot * s = nullptr;
406         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
407         for (;;) {
408             if ( nullptr != ( s = try_pop_() ) ) {
409                 {
410                     detail::spinlock_lock lk{ splk_producers_ };
411                     // notify one waiting producer
412                     if ( ! waiting_producers_.empty() ) {
413                         context * producer_ctx = & waiting_producers_.front();
414                         waiting_producers_.pop_front();
415                         lk.unlock();
416                         active_ctx->schedule( producer_ctx);
417                     }
418                 }
419                 // consume value
420                 value = std::move( s->value);
421                 // resume suspended producer
422                 active_ctx->schedule( s->ctx);
423                 return channel_op_status::success;
424             } else {
425                 detail::spinlock_lock lk{ splk_consumers_ };
426                 if ( is_closed() ) {
427                     return channel_op_status::closed;
428                 }
429                 if ( ! is_empty_() ) {
430                     continue;
431                 }
432                 active_ctx->wait_link( waiting_consumers_);
433                 // suspend this consumer
434                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
435                     // relock local lk
436                     lk.lock();
437                     // remove from waiting-queue
438                     waiting_consumers_.remove( * active_ctx);
439                     return channel_op_status::timeout;
440                 }
441             }
442         }
443     }
444
445     class iterator : public std::iterator< std::input_iterator_tag, typename std::remove_reference< value_type >::type > {
446     private:
447         typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type  storage_type;
448
449         unbuffered_channel  *   chan_{ nullptr };
450         storage_type            storage_;
451
452         void increment_() {
453             BOOST_ASSERT( nullptr != chan_);
454             try {
455                 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
456             } catch ( fiber_error const&) {
457                 chan_ = nullptr;
458             }
459         }
460
461     public:
462         typedef typename iterator::pointer pointer_t;
463         typedef typename iterator::reference reference_t;
464
465         iterator() noexcept = default;
466
467         explicit iterator( unbuffered_channel< T > * chan) noexcept :
468             chan_{ chan } {
469             increment_();
470         }
471
472         iterator( iterator const& other) noexcept :
473             chan_{ other.chan_ } {
474         }
475
476         iterator & operator=( iterator const& other) noexcept {
477             if ( this == & other) return * this;
478             chan_ = other.chan_;
479             return * this;
480         }
481
482         bool operator==( iterator const& other) const noexcept {
483             return other.chan_ == chan_;
484         }
485
486         bool operator!=( iterator const& other) const noexcept {
487             return other.chan_ != chan_;
488         }
489
490         iterator & operator++() {
491             increment_();
492             return * this;
493         }
494
495         iterator operator++( int) = delete;
496
497         reference_t operator*() noexcept {
498             return * reinterpret_cast< value_type * >( std::addressof( storage_) );
499         }
500
501         pointer_t operator->() noexcept {
502             return reinterpret_cast< value_type * >( std::addressof( storage_) );
503         }
504     };
505
506     friend class iterator;
507 };
508
509 template< typename T >
510 typename unbuffered_channel< T >::iterator
511 begin( unbuffered_channel< T > & chan) {
512     return typename unbuffered_channel< T >::iterator( & chan);
513 }
514
515 template< typename T >
516 typename unbuffered_channel< T >::iterator
517 end( unbuffered_channel< T > &) {
518     return typename unbuffered_channel< T >::iterator();
519 }
520
521 }}
522
523 #ifdef BOOST_HAS_ABI_HEADERS
524 #  include BOOST_ABI_SUFFIX
525 #endif
526
527 #endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H