Imported Upstream version 1.72.0
[platform/upstream/boost.git] / boost / fiber / buffered_channel.hpp
1
2 //          Copyright Oliver Kowalke 2016.
3 // Distributed under the Boost Software License, Version 1.0.
4 //    (See accompanying file LICENSE_1_0.txt or copy at
5 //          http://www.boost.org/LICENSE_1_0.txt)
6 //
7
8 #ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
9 #define BOOST_FIBERS_BUFFERED_CHANNEL_H
10
11 #include <atomic>
12 #include <chrono>
13 #include <cstddef>
14 #include <cstdint>
15 #include <memory>
16 #include <type_traits>
17
18 #include <boost/config.hpp>
19
20 #include <boost/fiber/channel_op_status.hpp>
21 #include <boost/fiber/context.hpp>
22 #include <boost/fiber/detail/config.hpp>
23 #include <boost/fiber/detail/convert.hpp>
24 #include <boost/fiber/detail/spinlock.hpp>
25 #include <boost/fiber/exceptions.hpp>
26
27 #ifdef BOOST_HAS_ABI_HEADERS
28 #  include BOOST_ABI_PREFIX
29 #endif
30
31 namespace boost {
32 namespace fibers {
33
34 template< typename T >
35 class buffered_channel {
36 public:
37     typedef typename std::remove_reference< T >::type   value_type;
38
39 private:
40     typedef context::wait_queue_t                       wait_queue_type;
41         typedef value_type                                  slot_type;
42
43     mutable detail::spinlock   splk_{};
44     wait_queue_type                                     waiting_producers_{};
45     wait_queue_type                                     waiting_consumers_{};
46         slot_type                                       *   slots_;
47         std::size_t                                         pidx_{ 0 };
48         std::size_t                                         cidx_{ 0 };
49         std::size_t                                         capacity_;
50     bool                                                closed_{ false };
51
52         bool is_full_() const noexcept {
53                 return cidx_ == ((pidx_ + 1) % capacity_);
54         }
55
56         bool is_empty_() const noexcept {
57                 return cidx_ == pidx_;
58         }
59
60     bool is_closed_() const noexcept {
61         return closed_;
62     }
63
64 public:
65     explicit buffered_channel( std::size_t capacity) :
66             capacity_{ capacity } {
67         if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) { 
68             throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
69                                "boost fiber: buffer capacity is invalid" };
70         }
71         slots_ = new slot_type[capacity_];
72     }
73
74     ~buffered_channel() {
75         close();
76         delete [] slots_;
77     }
78
79     buffered_channel( buffered_channel const&) = delete;
80     buffered_channel & operator=( buffered_channel const&) = delete;
81
82     bool is_closed() const noexcept {
83         detail::spinlock_lock lk{ splk_ };
84         return is_closed_();
85     }
86
87     void close() noexcept {
88         context * active_ctx = context::active();
89         detail::spinlock_lock lk{ splk_ };
90         if ( ! closed_) {
91             closed_ = true;
92             // notify all waiting producers
93             while ( ! waiting_producers_.empty() ) {
94                 context * producer_ctx = & waiting_producers_.front();
95                 waiting_producers_.pop_front();
96                 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
97                 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
98                     // notify context
99                     active_ctx->schedule( producer_ctx);
100                 } else if ( static_cast< std::intptr_t >( 0) == expected) {
101                     // no timed-wait op.
102                     // notify context
103                     active_ctx->schedule( producer_ctx);
104                 }
105             }
106             // notify all waiting consumers
107             while ( ! waiting_consumers_.empty() ) {
108                 context * consumer_ctx = & waiting_consumers_.front();
109                 waiting_consumers_.pop_front();
110                 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
111                 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
112                     // notify context
113                     active_ctx->schedule( consumer_ctx);
114                 } else if ( static_cast< std::intptr_t >( 0) == expected) {
115                     // no timed-wait op.
116                     // notify context
117                     active_ctx->schedule( consumer_ctx);
118                 }
119             }
120         }
121     }
122
123     channel_op_status try_push( value_type const& value) {
124         context * active_ctx = context::active();
125         detail::spinlock_lock lk{ splk_ };
126         if ( BOOST_UNLIKELY( is_closed_() ) ) {
127             return channel_op_status::closed;
128         } else if ( is_full_() ) {
129             return channel_op_status::full;
130         } else {
131             slots_[pidx_] = value;
132             pidx_ = (pidx_ + 1) % capacity_;
133             // notify one waiting consumer
134             while ( ! waiting_consumers_.empty() ) {
135                 context * consumer_ctx = & waiting_consumers_.front();
136                 waiting_consumers_.pop_front();
137                 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
138                 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
139                     lk.unlock();
140                     // notify context
141                     active_ctx->schedule( consumer_ctx);
142                     break;
143                 } else if ( static_cast< std::intptr_t >( 0) == expected) {
144                     lk.unlock();
145                     // no timed-wait op.
146                     // notify context
147                     active_ctx->schedule( consumer_ctx);
148                     break;
149                 }
150             }
151             return channel_op_status::success;
152         }
153     }
154
155     channel_op_status try_push( value_type && value) {
156         context * active_ctx = context::active();
157         detail::spinlock_lock lk{ splk_ };
158         if ( BOOST_UNLIKELY( is_closed_() ) ) {
159             return channel_op_status::closed;
160         } else if ( is_full_() ) {
161             return channel_op_status::full;
162         } else {
163             slots_[pidx_] = std::move( value);
164             pidx_ = (pidx_ + 1) % capacity_;
165             // notify one waiting consumer
166             while ( ! waiting_consumers_.empty() ) {
167                 context * consumer_ctx = & waiting_consumers_.front();
168                 waiting_consumers_.pop_front();
169                 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
170                 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
171                     lk.unlock();
172                     // notify context
173                     active_ctx->schedule( consumer_ctx);
174                     break;
175                 } else if ( static_cast< std::intptr_t >( 0) == expected) {
176                     lk.unlock();
177                     // no timed-wait op.
178                     // notify context
179                     active_ctx->schedule( consumer_ctx);
180                     break;
181                 }
182             }
183             return channel_op_status::success;
184         }
185     }
186
187     channel_op_status push( value_type const& value) {
188         context * active_ctx = context::active();
189         for (;;) {
190             detail::spinlock_lock lk{ splk_ };
191             if ( BOOST_UNLIKELY( is_closed_() ) ) {
192                 return channel_op_status::closed;
193             } else if ( is_full_() ) {
194                 active_ctx->wait_link( waiting_producers_);
195                 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
196                 // suspend this producer
197                 active_ctx->suspend( lk);
198             } else {
199                 slots_[pidx_] = value;
200                 pidx_ = (pidx_ + 1) % capacity_;
201                 // notify one waiting consumer
202                 while ( ! waiting_consumers_.empty() ) {
203                     context * consumer_ctx = & waiting_consumers_.front();
204                     waiting_consumers_.pop_front();
205                     std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
206                     if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
207                         lk.unlock();
208                         // notify context
209                         active_ctx->schedule( consumer_ctx);
210                         break;
211                     } else if ( static_cast< std::intptr_t >( 0) == expected) {
212                         lk.unlock();
213                         // no timed-wait op.
214                         // notify context
215                         active_ctx->schedule( consumer_ctx);
216                         break;
217                     }
218                 }
219                 return channel_op_status::success;
220             }
221         }
222     }
223
224     channel_op_status push( value_type && value) {
225         context * active_ctx = context::active();
226         for (;;) {
227             detail::spinlock_lock lk{ splk_ };
228             if ( BOOST_UNLIKELY( is_closed_() ) ) {
229                 return channel_op_status::closed;
230             } else if ( is_full_() ) {
231                 active_ctx->wait_link( waiting_producers_);
232                 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
233                 // suspend this producer
234                 active_ctx->suspend( lk);
235             } else {
236                 slots_[pidx_] = std::move( value);
237                 pidx_ = (pidx_ + 1) % capacity_;
238                 // notify one waiting consumer
239                 while ( ! waiting_consumers_.empty() ) {
240                     context * consumer_ctx = & waiting_consumers_.front();
241                     waiting_consumers_.pop_front();
242                     std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
243                     if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
244                         lk.unlock();
245                         // notify context
246                         active_ctx->schedule( consumer_ctx);
247                         break;
248                     } else if ( static_cast< std::intptr_t >( 0) == expected) {
249                         lk.unlock();
250                         // no timed-wait op.
251                         // notify context
252                         active_ctx->schedule( consumer_ctx);
253                         break;
254                     }
255                 }
256                 return channel_op_status::success;
257             }
258         }
259     }
260
261     template< typename Rep, typename Period >
262     channel_op_status push_wait_for( value_type const& value,
263                                      std::chrono::duration< Rep, Period > const& timeout_duration) {
264         return push_wait_until( value,
265                                 std::chrono::steady_clock::now() + timeout_duration);
266     }
267
268     template< typename Rep, typename Period >
269     channel_op_status push_wait_for( value_type && value,
270                                      std::chrono::duration< Rep, Period > const& timeout_duration) {
271         return push_wait_until( std::forward< value_type >( value),
272                                 std::chrono::steady_clock::now() + timeout_duration);
273     }
274
275     template< typename Clock, typename Duration >
276     channel_op_status push_wait_until( value_type const& value,
277                                        std::chrono::time_point< Clock, Duration > const& timeout_time_) {
278         context * active_ctx = context::active();
279         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
280         for (;;) {
281             detail::spinlock_lock lk{ splk_ };
282             if ( BOOST_UNLIKELY( is_closed_() ) ) {
283                 return channel_op_status::closed;
284             } else if ( is_full_() ) {
285                 active_ctx->wait_link( waiting_producers_);
286                 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
287                 // suspend this producer
288                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
289                     // relock local lk
290                     lk.lock();
291                     // remove from waiting-queue
292                     waiting_producers_.remove( * active_ctx);
293                     return channel_op_status::timeout;
294                 }
295             } else {
296                 slots_[pidx_] = value;
297                 pidx_ = (pidx_ + 1) % capacity_;
298                 // notify one waiting consumer
299                 while ( ! waiting_consumers_.empty() ) {
300                     context * consumer_ctx = & waiting_consumers_.front();
301                     waiting_consumers_.pop_front();
302                     std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
303                     if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
304                         lk.unlock();
305                         // notify context
306                         active_ctx->schedule( consumer_ctx);
307                         break;
308                     } else if ( static_cast< std::intptr_t >( 0) == expected) {
309                         lk.unlock();
310                         // no timed-wait op.
311                         // notify context
312                         active_ctx->schedule( consumer_ctx);
313                         break;
314                     }
315                 }
316                 return channel_op_status::success;
317             }
318         }
319     }
320
321     template< typename Clock, typename Duration >
322     channel_op_status push_wait_until( value_type && value,
323                                        std::chrono::time_point< Clock, Duration > const& timeout_time_) {
324         context * active_ctx = context::active();
325         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
326         for (;;) {
327             detail::spinlock_lock lk{ splk_ };
328             if ( BOOST_UNLIKELY( is_closed_() ) ) {
329                 return channel_op_status::closed;
330             } else if ( is_full_() ) {
331                 active_ctx->wait_link( waiting_producers_);
332                 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
333                 // suspend this producer
334                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
335                     // relock local lk
336                     lk.lock();
337                     // remove from waiting-queue
338                     waiting_producers_.remove( * active_ctx);
339                     return channel_op_status::timeout;
340                 }
341             } else {
342                 slots_[pidx_] = std::move( value);
343                 pidx_ = (pidx_ + 1) % capacity_;
344                 // notify one waiting consumer
345                 while ( ! waiting_consumers_.empty() ) {
346                     context * consumer_ctx = & waiting_consumers_.front();
347                     waiting_consumers_.pop_front();
348                     std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
349                     if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
350                         lk.unlock();
351                         // notify context
352                         active_ctx->schedule( consumer_ctx);
353                         break;
354                     } else if ( static_cast< std::intptr_t >( 0) == expected) {
355                         lk.unlock();
356                         // no timed-wait op.
357                         // notify context
358                         active_ctx->schedule( consumer_ctx);
359                         break;
360                     }
361                 }
362                 return channel_op_status::success;
363             }
364         }
365     }
366
367     channel_op_status try_pop( value_type & value) {
368         context * active_ctx = context::active();
369         detail::spinlock_lock lk{ splk_ };
370         if ( is_empty_() ) {
371             return is_closed_()
372                 ? channel_op_status::closed
373                 : channel_op_status::empty;
374         } else {
375             value = std::move( slots_[cidx_]);
376             cidx_ = (cidx_ + 1) % capacity_;
377             // notify one waiting producer
378             while ( ! waiting_producers_.empty() ) {
379                 context * producer_ctx = & waiting_producers_.front();
380                 waiting_producers_.pop_front();
381                 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
382                 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
383                     lk.unlock();
384                     // notify context
385                     active_ctx->schedule( producer_ctx);
386                     break;
387                 } else if ( static_cast< std::intptr_t >( 0) == expected) {
388                     lk.unlock();
389                     // no timed-wait op.
390                     // notify context
391                     active_ctx->schedule( producer_ctx);
392                     break;
393                 }
394             }
395             return channel_op_status::success;
396         }
397     }
398
399     channel_op_status pop( value_type & value) {
400         context * active_ctx = context::active();
401         for (;;) {
402             detail::spinlock_lock lk{ splk_ };
403             if ( is_empty_() ) {
404                 if ( BOOST_UNLIKELY( is_closed_() ) ) {
405                     return channel_op_status::closed;
406                 } else {
407                     active_ctx->wait_link( waiting_consumers_);
408                     active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
409                     // suspend this consumer
410                     active_ctx->suspend( lk);
411                 }
412             } else {
413                 value = std::move( slots_[cidx_]);
414                 cidx_ = (cidx_ + 1) % capacity_;
415                 // notify one waiting producer
416                 while ( ! waiting_producers_.empty() ) {
417                     context * producer_ctx = & waiting_producers_.front();
418                     waiting_producers_.pop_front();
419                     std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
420                     if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
421                         lk.unlock();
422                         // notify context
423                         active_ctx->schedule( producer_ctx);
424                         break;
425                     } else if ( static_cast< std::intptr_t >( 0) == expected) {
426                         lk.unlock();
427                         // no timed-wait op.
428                         // notify context
429                         active_ctx->schedule( producer_ctx);
430                         break;
431                     }
432                 }
433                 return channel_op_status::success;
434             }
435         }
436     }
437
438     value_type value_pop() {
439         context * active_ctx = context::active();
440         for (;;) {
441             detail::spinlock_lock lk{ splk_ };
442             if ( is_empty_() ) {
443                 if ( BOOST_UNLIKELY( is_closed_() ) ) {
444                     throw fiber_error{
445                         std::make_error_code( std::errc::operation_not_permitted),
446                         "boost fiber: channel is closed" };
447                 } else {
448                     active_ctx->wait_link( waiting_consumers_);
449                     active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
450                     // suspend this consumer
451                     active_ctx->suspend( lk);
452                 }
453             } else {
454                 value_type value = std::move( slots_[cidx_]);
455                 cidx_ = (cidx_ + 1) % capacity_;
456                 // notify one waiting producer
457                 while ( ! waiting_producers_.empty() ) {
458                     context * producer_ctx = & waiting_producers_.front();
459                     waiting_producers_.pop_front();
460                     std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
461                     if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
462                         lk.unlock();
463                         // notify context
464                         active_ctx->schedule( producer_ctx);
465                         break;
466                     } else if ( static_cast< std::intptr_t >( 0) == expected) {
467                         lk.unlock();
468                         // no timed-wait op.
469                         // notify context
470                         active_ctx->schedule( producer_ctx);
471                         break;
472                     }
473                 }
474                 return std::move( value);
475             }
476         }
477     }
478
479     template< typename Rep, typename Period >
480     channel_op_status pop_wait_for( value_type & value,
481                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
482         return pop_wait_until( value,
483                                std::chrono::steady_clock::now() + timeout_duration);
484     }
485
486     template< typename Clock, typename Duration >
487     channel_op_status pop_wait_until( value_type & value,
488                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {
489         context * active_ctx = context::active();
490         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
491         for (;;) {
492             detail::spinlock_lock lk{ splk_ };
493             if ( is_empty_() ) {
494                 if ( BOOST_UNLIKELY( is_closed_() ) ) {
495                     return channel_op_status::closed;
496                 } else {
497                     active_ctx->wait_link( waiting_consumers_);
498                     active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
499                     // suspend this consumer
500                     if ( ! active_ctx->wait_until( timeout_time, lk) ) {
501                         // relock local lk
502                         lk.lock();
503                         // remove from waiting-queue
504                         waiting_consumers_.remove( * active_ctx);
505                         return channel_op_status::timeout;
506                     }
507                 }
508             } else {
509                 value = std::move( slots_[cidx_]);
510                 cidx_ = (cidx_ + 1) % capacity_;
511                 // notify one waiting producer
512                 while ( ! waiting_producers_.empty() ) {
513                     context * producer_ctx = & waiting_producers_.front();
514                     waiting_producers_.pop_front();
515                     std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
516                     if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
517                         lk.unlock();
518                         // notify context
519                         active_ctx->schedule( producer_ctx);
520                         break;
521                     } else if ( static_cast< std::intptr_t >( 0) == expected) {
522                         lk.unlock();
523                         // no timed-wait op.
524                         // notify context
525                         active_ctx->schedule( producer_ctx);
526                         break;
527                     }
528                 }
529                 return channel_op_status::success;
530             }
531         }
532     }
533
534     class iterator {
535     private:
536         typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type  storage_type;
537
538         buffered_channel    *   chan_{ nullptr };
539         storage_type            storage_;
540
541         void increment_() {
542             BOOST_ASSERT( nullptr != chan_);
543             try {
544                 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
545             } catch ( fiber_error const&) {
546                 chan_ = nullptr;
547             }
548         }
549
550     public:
551         typedef std::input_iterator_tag                     iterator_category;
552         typedef std::ptrdiff_t                              difference_type;
553         typedef value_type                              *   pointer;
554         typedef value_type                              &   reference;
555
556         typedef pointer     pointer_t;
557         typedef reference   reference_t;
558
559         iterator() noexcept = default;
560
561         explicit iterator( buffered_channel< T > * chan) noexcept :
562             chan_{ chan } {
563             increment_();
564         }
565
566         iterator( iterator const& other) noexcept :
567             chan_{ other.chan_ } {
568         }
569
570         iterator & operator=( iterator const& other) noexcept {
571             if ( BOOST_LIKELY( this != & other) ) {
572                 chan_ = other.chan_;
573             }
574             return * this;
575         }
576
577         bool operator==( iterator const& other) const noexcept {
578             return other.chan_ == chan_;
579         }
580
581         bool operator!=( iterator const& other) const noexcept {
582             return other.chan_ != chan_;
583         }
584
585         iterator & operator++() {
586             reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
587             increment_();
588             return * this;
589         }
590
591         iterator operator++( int) = delete;
592
593         reference_t operator*() noexcept {
594             return * reinterpret_cast< value_type * >( std::addressof( storage_) );
595         }
596
597         pointer_t operator->() noexcept {
598             return reinterpret_cast< value_type * >( std::addressof( storage_) );
599         }
600     };
601
602     friend class iterator;
603 };
604
605 template< typename T >
606 typename buffered_channel< T >::iterator
607 begin( buffered_channel< T > & chan) {
608     return typename buffered_channel< T >::iterator( & chan);
609 }
610
611 template< typename T >
612 typename buffered_channel< T >::iterator
613 end( buffered_channel< T > &) {
614     return typename buffered_channel< T >::iterator();
615 }
616
617 }}
618
619 #ifdef BOOST_HAS_ABI_HEADERS
620 #  include BOOST_ABI_SUFFIX
621 #endif
622
623 #endif // BOOST_FIBERS_BUFFERED_CHANNEL_H