Imported Upstream version 1.72.0
[platform/upstream/boost.git] / boost / fiber / unbuffered_channel.hpp
1
2 //          Copyright Oliver Kowalke 2016.
3 // Distributed under the Boost Software License, Version 1.0.
4 //    (See accompanying file LICENSE_1_0.txt or copy at
5 //          http://www.boost.org/LICENSE_1_0.txt)
6
7 #ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H
8 #define BOOST_FIBERS_UNBUFFERED_CHANNEL_H
9
10 #include <atomic>
11 #include <chrono>
12 #include <cstddef>
13 #include <cstdint>
14 #include <memory>
15 #include <vector>
16
17 #include <boost/config.hpp>
18
19 #include <boost/fiber/channel_op_status.hpp>
20 #include <boost/fiber/context.hpp>
21 #include <boost/fiber/detail/config.hpp>
22 #include <boost/fiber/detail/convert.hpp>
23 #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
24 #include <boost/fiber/detail/exchange.hpp>
25 #endif
26 #include <boost/fiber/detail/spinlock.hpp>
27 #include <boost/fiber/exceptions.hpp>
28
29 #ifdef BOOST_HAS_ABI_HEADERS
30 #  include BOOST_ABI_PREFIX
31 #endif
32
33 namespace boost {
34 namespace fibers {
35
36 template< typename T >
37 class unbuffered_channel {
38 public:
39     typedef typename std::remove_reference< T >::type   value_type;
40
41 private:
42     typedef context::wait_queue_t   wait_queue_type;
43
44     struct slot {
45         value_type  value;
46         context *   ctx;
47
48         slot( value_type const& value_, context * ctx_) :
49             value{ value_ },
50             ctx{ ctx_ } {
51         }
52
53         slot( value_type && value_, context * ctx_) :
54             value{ std::move( value_) },
55             ctx{ ctx_ } {
56         }
57     };
58
59     // shared cacheline
60     std::atomic< slot * >       slot_{ nullptr };
61     // shared cacheline
62     std::atomic_bool            closed_{ false };
63     mutable detail::spinlock    splk_producers_{};
64     wait_queue_type             waiting_producers_{};
65     mutable detail::spinlock    splk_consumers_{};
66     wait_queue_type             waiting_consumers_{};
67     char                        pad_[cacheline_length];
68
69     bool is_empty_() {
70         return nullptr == slot_.load( std::memory_order_acquire);
71     }
72
73     bool try_push_( slot * own_slot) {
74         for (;;) {
75             slot * s = slot_.load( std::memory_order_acquire);
76             if ( nullptr == s) {
77                 if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
78                     continue;
79                 }
80                 return true;
81             } else {
82                 return false;
83             }
84         }
85     }
86
87     slot * try_pop_() {
88         slot * nil_slot = nullptr;
89         for (;;) {
90             slot * s = slot_.load( std::memory_order_acquire);
91             if ( nullptr != s) {
92                 if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
93                     continue;}
94             }
95             return s;
96         }
97     }
98
99 public:
100     unbuffered_channel() {
101     }
102
103     ~unbuffered_channel() {
104         close();
105     }
106
107     unbuffered_channel( unbuffered_channel const&) = delete;
108     unbuffered_channel & operator=( unbuffered_channel const&) = delete;
109
110     bool is_closed() const noexcept {
111         return closed_.load( std::memory_order_acquire);
112     }
113
114     void close() noexcept {
115         context * active_ctx = context::active();
116         // set flag
117         if ( ! closed_.exchange( true, std::memory_order_acquire) ) {
118             // notify current waiting  
119             slot * s = slot_.load( std::memory_order_acquire);
120             if ( nullptr != s) {
121                 // notify context
122                 active_ctx->schedule( s->ctx);
123             }
124             // notify all waiting producers
125             detail::spinlock_lock lk1{ splk_producers_ };
126             while ( ! waiting_producers_.empty() ) {
127                 context * producer_ctx = & waiting_producers_.front();
128                 waiting_producers_.pop_front();
129                 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
130                 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
131                     // notify context
132                     active_ctx->schedule( producer_ctx);
133                 } else if ( static_cast< std::intptr_t >( 0) == expected) {
134                     // no timed-wait op.
135                     // notify context
136                     active_ctx->schedule( producer_ctx);
137                 }
138             }
139             // notify all waiting consumers
140             detail::spinlock_lock lk2{ splk_consumers_ };
141             while ( ! waiting_consumers_.empty() ) {
142                 context * consumer_ctx = & waiting_consumers_.front();
143                 waiting_consumers_.pop_front();
144                 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
145                 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
146                     // notify context
147                     active_ctx->schedule( consumer_ctx);
148                 } else if ( static_cast< std::intptr_t >( 0) == expected) {
149                     // no timed-wait op.
150                     // notify context
151                     active_ctx->schedule( consumer_ctx);
152                 }
153             }
154         }
155     }
156
157     channel_op_status push( value_type const& value) {
158         context * active_ctx = context::active();
159         slot s{ value, active_ctx };
160         for (;;) {
161             if ( BOOST_UNLIKELY( is_closed() ) ) {
162                 return channel_op_status::closed;
163             }
164             if ( try_push_( & s) ) {
165                 detail::spinlock_lock lk{ splk_consumers_ };
166                 // notify one waiting consumer
167                 while ( ! waiting_consumers_.empty() ) {
168                     context * consumer_ctx = & waiting_consumers_.front();
169                     waiting_consumers_.pop_front();
170                     std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
171                     if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
172                         // notify context
173                         active_ctx->schedule( consumer_ctx);
174                         break;
175                     } else if ( static_cast< std::intptr_t >( 0) == expected) {
176                         // no timed-wait op.
177                         // notify context
178                         active_ctx->schedule( consumer_ctx);
179                         break;
180                     }
181                 }
182                 // suspend till value has been consumed
183                 active_ctx->suspend( lk);
184                 // resumed
185                 if ( nullptr == s.ctx) {
186                     // value has been consumed
187                     return channel_op_status::success;
188                 } else {
189                     // channel was closed before value was consumed
190                     return channel_op_status::closed;
191                 }
192             } else {
193                 detail::spinlock_lock lk{ splk_producers_ };
194                 if ( BOOST_UNLIKELY( is_closed() ) ) {
195                     return channel_op_status::closed;
196                 }
197                 if ( is_empty_() ) {
198                     continue;
199                 }
200                 active_ctx->wait_link( waiting_producers_);
201                 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
202                 // suspend this producer
203                 active_ctx->suspend( lk);
204                 // resumed, slot mabye free
205             }
206         }
207     }
208
209     channel_op_status push( value_type && value) {
210         context * active_ctx = context::active();
211         slot s{ std::move( value), active_ctx };
212         for (;;) {
213             if ( BOOST_UNLIKELY( is_closed() ) ) {
214                 return channel_op_status::closed;
215             }
216             if ( try_push_( & s) ) {
217                 detail::spinlock_lock lk{ splk_consumers_ };
218                 // notify one waiting consumer
219                 while ( ! waiting_consumers_.empty() ) {
220                     context * consumer_ctx = & waiting_consumers_.front();
221                     waiting_consumers_.pop_front();
222                     std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
223                     if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
224                         // notify context
225                         active_ctx->schedule( consumer_ctx);
226                         break;
227                     } else if ( static_cast< std::intptr_t >( 0) == expected) {
228                         // no timed-wait op.
229                         // notify context
230                         active_ctx->schedule( consumer_ctx);
231                         break;
232                     }
233                 }
234                 // suspend till value has been consumed
235                 active_ctx->suspend( lk);
236                 // resumed
237                 if ( nullptr == s.ctx) {
238                     // value has been consumed
239                     return channel_op_status::success;
240                 } else {
241                     // channel was closed before value was consumed
242                     return channel_op_status::closed;
243                 }
244             } else {
245                 detail::spinlock_lock lk{ splk_producers_ };
246                 if ( BOOST_UNLIKELY( is_closed() ) ) {
247                     return channel_op_status::closed;
248                 }
249                 if ( is_empty_() ) {
250                     continue;
251                 }
252                 active_ctx->wait_link( waiting_producers_);
253                 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
254                 // suspend this producer
255                 active_ctx->suspend( lk);
256                 // resumed, slot mabye free
257             }
258         }
259     }
260
261     template< typename Rep, typename Period >
262     channel_op_status push_wait_for( value_type const& value,
263                                      std::chrono::duration< Rep, Period > const& timeout_duration) {
264         return push_wait_until( value,
265                                 std::chrono::steady_clock::now() + timeout_duration);
266     }
267
268     template< typename Rep, typename Period >
269     channel_op_status push_wait_for( value_type && value,
270                                      std::chrono::duration< Rep, Period > const& timeout_duration) {
271         return push_wait_until( std::forward< value_type >( value),
272                                 std::chrono::steady_clock::now() + timeout_duration);
273     }
274
275     template< typename Clock, typename Duration >
276     channel_op_status push_wait_until( value_type const& value,
277                                        std::chrono::time_point< Clock, Duration > const& timeout_time_) {
278         context * active_ctx = context::active();
279         slot s{ value, active_ctx };
280         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
281         for (;;) {
282             if ( BOOST_UNLIKELY( is_closed() ) ) {
283                 return channel_op_status::closed;
284             }
285             if ( try_push_( & s) ) {
286                 detail::spinlock_lock lk{ splk_consumers_ };
287                 // notify one waiting consumer
288                 while ( ! waiting_consumers_.empty() ) {
289                     context * consumer_ctx = & waiting_consumers_.front();
290                     waiting_consumers_.pop_front();
291                     std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
292                     if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
293                         // notify context
294                         active_ctx->schedule( consumer_ctx);
295                         break;
296                     } else if ( static_cast< std::intptr_t >( 0) == expected) {
297                         // no timed-wait op.
298                         // notify context
299                         active_ctx->schedule( consumer_ctx);
300                         break;
301                     }
302                 }
303                 // suspend this producer
304                 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
305                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
306                     // clear slot
307                     slot * nil_slot = nullptr, * own_slot = & s;
308                     slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
309                     // resumed, value has not been consumed
310                     return channel_op_status::timeout;
311                 }
312                 // resumed
313                 if ( nullptr == s.ctx) {
314                     // value has been consumed
315                     return channel_op_status::success;
316                 } else {
317                     // channel was closed before value was consumed
318                     return channel_op_status::closed;
319                 }
320             } else {
321                 detail::spinlock_lock lk{ splk_producers_ };
322                 if ( BOOST_UNLIKELY( is_closed() ) ) {
323                     return channel_op_status::closed;
324                 }
325                 if ( is_empty_() ) {
326                     continue;
327                 }
328                 active_ctx->wait_link( waiting_producers_);
329                 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
330                 // suspend this producer
331                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
332                     // relock local lk
333                     lk.lock();
334                     // remove from waiting-queue
335                     waiting_producers_.remove( * active_ctx);
336                     return channel_op_status::timeout;
337                 }
338                 // resumed, slot maybe free
339             }
340         }
341     }
342
343     template< typename Clock, typename Duration >
344     channel_op_status push_wait_until( value_type && value,
345                                        std::chrono::time_point< Clock, Duration > const& timeout_time_) {
346         context * active_ctx = context::active();
347         slot s{ std::move( value), active_ctx };
348         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
349         for (;;) {
350             if ( BOOST_UNLIKELY( is_closed() ) ) {
351                 return channel_op_status::closed;
352             }
353             if ( try_push_( & s) ) {
354                 detail::spinlock_lock lk{ splk_consumers_ };
355                 // notify one waiting consumer
356                 while ( ! waiting_consumers_.empty() ) {
357                     context * consumer_ctx = & waiting_consumers_.front();
358                     waiting_consumers_.pop_front();
359                     std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
360                     if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
361                         // notify context
362                         active_ctx->schedule( consumer_ctx);
363                         break;
364                     } else if ( static_cast< std::intptr_t >( 0) == expected) {
365                         // no timed-wait op.
366                         // notify context
367                         active_ctx->schedule( consumer_ctx);
368                         break;
369                     }
370                 }
371                 // suspend this producer
372                 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
373                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
374                     // clear slot
375                     slot * nil_slot = nullptr, * own_slot = & s;
376                     slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
377                     // resumed, value has not been consumed
378                     return channel_op_status::timeout;
379                 }
380                 // resumed
381                 if ( nullptr == s.ctx) {
382                     // value has been consumed
383                     return channel_op_status::success;
384                 } else {
385                     // channel was closed before value was consumed
386                     return channel_op_status::closed;
387                 }
388             } else {
389                 detail::spinlock_lock lk{ splk_producers_ };
390                 if ( BOOST_UNLIKELY( is_closed() ) ) {
391                     return channel_op_status::closed;
392                 }
393                 if ( is_empty_() ) {
394                     continue;
395                 }
396                 active_ctx->wait_link( waiting_producers_);
397                 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
398                 // suspend this producer
399                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
400                     // relock local lk
401                     lk.lock();
402                     // remove from waiting-queue
403                     waiting_producers_.remove( * active_ctx);
404                     return channel_op_status::timeout;
405                 }
406                 // resumed, slot maybe free
407             }
408         }
409     }
410
411     channel_op_status pop( value_type & value) {
412         context * active_ctx = context::active();
413         slot * s = nullptr;
414         for (;;) {
415             if ( nullptr != ( s = try_pop_() ) ) {
416                 {
417                     detail::spinlock_lock lk{ splk_producers_ };
418                     // notify one waiting producer
419                     while ( ! waiting_producers_.empty() ) {
420                         context * producer_ctx = & waiting_producers_.front();
421                         waiting_producers_.pop_front();
422                         std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
423                         if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
424                             lk.unlock();
425                             // notify context
426                             active_ctx->schedule( producer_ctx);
427                             break;
428                         } else if ( static_cast< std::intptr_t >( 0) == expected) {
429                             lk.unlock();
430                             // no timed-wait op.
431                             // notify context
432                             active_ctx->schedule( producer_ctx);
433                             break;
434                         }
435                     }
436                 }
437                 value = std::move( s->value);
438                 // notify context
439 #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
440                 active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
441 #else
442                 active_ctx->schedule( std::exchange( s->ctx, nullptr) );
443 #endif
444                 return channel_op_status::success;
445             } else {
446                 detail::spinlock_lock lk{ splk_consumers_ };
447                 if ( BOOST_UNLIKELY( is_closed() ) ) {
448                     return channel_op_status::closed;
449                 }
450                 if ( ! is_empty_() ) {
451                     continue;
452                 }
453                 active_ctx->wait_link( waiting_consumers_);
454                 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
455                 // suspend this consumer
456                 active_ctx->suspend( lk);
457                 // resumed, slot mabye set
458             }
459         }
460     }
461
462     value_type value_pop() {
463         context * active_ctx = context::active();
464         slot * s = nullptr;
465         for (;;) {
466             if ( nullptr != ( s = try_pop_() ) ) {
467                 {
468                     detail::spinlock_lock lk{ splk_producers_ };
469                     // notify one waiting producer
470                     while ( ! waiting_producers_.empty() ) {
471                         context * producer_ctx = & waiting_producers_.front();
472                         waiting_producers_.pop_front();
473                         std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
474                         if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
475                             lk.unlock();
476                             // notify context
477                             active_ctx->schedule( producer_ctx);
478                             break;
479                         } else if ( static_cast< std::intptr_t >( 0) == expected) {
480                             lk.unlock();
481                             // no timed-wait op.
482                             // notify context
483                             active_ctx->schedule( producer_ctx);
484                             break;
485                         }
486                     }
487                 }
488                 // consume value
489                 value_type value = std::move( s->value);
490                 // notify context
491 #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
492                 active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
493 #else
494                 active_ctx->schedule( std::exchange( s->ctx, nullptr) );
495 #endif
496                 return std::move( value);
497             } else {
498                 detail::spinlock_lock lk{ splk_consumers_ };
499                 if ( BOOST_UNLIKELY( is_closed() ) ) {
500                     throw fiber_error{
501                             std::make_error_code( std::errc::operation_not_permitted),
502                             "boost fiber: channel is closed" };
503                 }
504                 if ( ! is_empty_() ) {
505                     continue;
506                 }
507                 active_ctx->wait_link( waiting_consumers_);
508                 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
509                 // suspend this consumer
510                 active_ctx->suspend( lk);
511                 // resumed, slot mabye set
512             }
513         }
514     }
515
516     template< typename Rep, typename Period >
517     channel_op_status pop_wait_for( value_type & value,
518                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
519         return pop_wait_until( value,
520                                std::chrono::steady_clock::now() + timeout_duration);
521     }
522
523     template< typename Clock, typename Duration >
524     channel_op_status pop_wait_until( value_type & value,
525                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {
526         context * active_ctx = context::active();
527         slot * s = nullptr;
528         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
529         for (;;) {
530             if ( nullptr != ( s = try_pop_() ) ) {
531                 {
532                     detail::spinlock_lock lk{ splk_producers_ };
533                     // notify one waiting producer
534                     while ( ! waiting_producers_.empty() ) {
535                         context * producer_ctx = & waiting_producers_.front();
536                         waiting_producers_.pop_front();
537                         std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
538                         if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
539                             lk.unlock();
540                             // notify context
541                             active_ctx->schedule( producer_ctx);
542                             break;
543                         } else if ( static_cast< std::intptr_t >( 0) == expected) {
544                             lk.unlock();
545                             // no timed-wait op.
546                             // notify context
547                             active_ctx->schedule( producer_ctx);
548                             break;
549                         }
550                     }
551                 }
552                 // consume value
553                 value = std::move( s->value);
554                 // notify context
555 #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
556                 active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
557 #else
558                 active_ctx->schedule( std::exchange( s->ctx, nullptr) );
559 #endif
560                 return channel_op_status::success;
561             } else {
562                 detail::spinlock_lock lk{ splk_consumers_ };
563                 if ( BOOST_UNLIKELY( is_closed() ) ) {
564                     return channel_op_status::closed;
565                 }
566                 if ( ! is_empty_() ) {
567                     continue;
568                 }
569                 active_ctx->wait_link( waiting_consumers_);
570                 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
571                 // suspend this consumer
572                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
573                     // relock local lk
574                     lk.lock();
575                     // remove from waiting-queue
576                     waiting_consumers_.remove( * active_ctx);
577                     return channel_op_status::timeout;
578                 }
579             }
580         }
581     }
582
583     class iterator {
584     private:
585         typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type  storage_type;
586
587         unbuffered_channel  *   chan_{ nullptr };
588         storage_type            storage_;
589
590         void increment_() {
591             BOOST_ASSERT( nullptr != chan_);
592             try {
593                 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
594             } catch ( fiber_error const&) {
595                 chan_ = nullptr;
596             }
597         }
598
599     public:
600         typedef std::input_iterator_tag                     iterator_category;
601         typedef std::ptrdiff_t                              difference_type;
602         typedef value_type                              *   pointer;
603         typedef value_type                              &   reference;
604
605         typedef pointer     pointer_t;
606         typedef reference   reference_t;
607
608         iterator() noexcept = default;
609
610         explicit iterator( unbuffered_channel< T > * chan) noexcept :
611             chan_{ chan } {
612             increment_();
613         }
614
615         iterator( iterator const& other) noexcept :
616             chan_{ other.chan_ } {
617         }
618
619         iterator & operator=( iterator const& other) noexcept {
620             if ( this == & other) return * this;
621             chan_ = other.chan_;
622             return * this;
623         }
624
625         bool operator==( iterator const& other) const noexcept {
626             return other.chan_ == chan_;
627         }
628
629         bool operator!=( iterator const& other) const noexcept {
630             return other.chan_ != chan_;
631         }
632
633         iterator & operator++() {
634             reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
635             increment_();
636             return * this;
637         }
638
639         iterator operator++( int) = delete;
640
641         reference_t operator*() noexcept {
642             return * reinterpret_cast< value_type * >( std::addressof( storage_) );
643         }
644
645         pointer_t operator->() noexcept {
646             return reinterpret_cast< value_type * >( std::addressof( storage_) );
647         }
648     };
649
650     friend class iterator;
651 };
652
653 template< typename T >
654 typename unbuffered_channel< T >::iterator
655 begin( unbuffered_channel< T > & chan) {
656     return typename unbuffered_channel< T >::iterator( & chan);
657 }
658
659 template< typename T >
660 typename unbuffered_channel< T >::iterator
661 end( unbuffered_channel< T > &) {
662     return typename unbuffered_channel< T >::iterator();
663 }
664
665 }}
666
667 #ifdef BOOST_HAS_ABI_HEADERS
668 #  include BOOST_ABI_SUFFIX
669 #endif
670
671 #endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H