Imported Upstream version 1.72.0
[platform/upstream/boost.git] / boost / beast / core / impl / basic_stream.hpp
1 //
2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 #ifndef BOOST_BEAST_CORE_IMPL_BASIC_STREAM_HPP
11 #define BOOST_BEAST_CORE_IMPL_BASIC_STREAM_HPP
12
13 #include <boost/beast/core/async_base.hpp>
14 #include <boost/beast/core/buffer_traits.hpp>
15 #include <boost/beast/core/buffers_prefix.hpp>
16 #include <boost/beast/websocket/teardown.hpp>
17 #include <boost/asio/coroutine.hpp>
18 #include <boost/assert.hpp>
19 #include <boost/make_shared.hpp>
20 #include <boost/core/exchange.hpp>
21 #include <cstdlib>
22 #include <type_traits>
23 #include <utility>
24
25 namespace boost {
26 namespace beast {
27
28 //------------------------------------------------------------------------------
29
30 template<class Protocol, class Executor, class RatePolicy>
31 template<class... Args>
32 basic_stream<Protocol, Executor, RatePolicy>::
33 impl_type::
34 impl_type(std::false_type, Args&&... args)
35     : socket(std::forward<Args>(args)...)
36     , read(ex())
37     , write(ex())
38     , timer(ex())
39 {
40     reset();
41 }
42
43 template<class Protocol, class Executor, class RatePolicy>
44 template<class RatePolicy_, class... Args>
45 basic_stream<Protocol, Executor, RatePolicy>::
46 impl_type::
47 impl_type(std::true_type,
48     RatePolicy_&& policy, Args&&... args)
49     : boost::empty_value<RatePolicy>(
50         boost::empty_init_t{},
51         std::forward<RatePolicy_>(policy))
52     , socket(std::forward<Args>(args)...)
53     , read(ex())
54     , write(ex())
55     , timer(ex())
56 {
57     reset();
58 }
59
60 template<class Protocol, class Executor, class RatePolicy>
61 template<class Executor2>
62 void
63 basic_stream<Protocol, Executor, RatePolicy>::
64 impl_type::
65 on_timer(Executor2 const& ex2)
66 {
67     BOOST_ASSERT(waiting > 0);
68
69     // the last waiter starts the new slice
70     if(--waiting > 0)
71         return;
72
73     // update the expiration time
74     BOOST_VERIFY(timer.expires_after(
75         std::chrono::seconds(1)) == 0);
76
77     rate_policy_access::on_timer(policy());
78
79     struct handler : boost::empty_value<Executor2>
80     {
81         boost::weak_ptr<impl_type> wp;
82
83         using executor_type = Executor2;
84
85         executor_type
86         get_executor() const noexcept
87         {
88             return this->get();
89         }
90
91         handler(
92             Executor2 const& ex2,
93             boost::shared_ptr<impl_type> const& sp)
94             : boost::empty_value<Executor2>(
95                 boost::empty_init_t{}, ex2)
96             , wp(sp)
97         {
98         }
99
100         void
101         operator()(error_code ec)
102         {
103             auto sp = wp.lock();
104             if(! sp)
105                 return;
106             if(ec == net::error::operation_aborted)
107                 return;
108             BOOST_ASSERT(! ec);
109             if(ec)
110                 return;
111             sp->on_timer(this->get());
112         }
113     };
114
115     // wait on the timer again
116     ++waiting;
117     timer.async_wait(handler(ex2, this->shared_from_this()));
118 }
119
120 template<class Protocol, class Executor, class RatePolicy>
121 void
122 basic_stream<Protocol, Executor, RatePolicy>::
123 impl_type::
124 reset()
125 {
126     // If assert goes off, it means that there are
127     // already read or write (or connect) operations
128     // outstanding, so there is nothing to apply
129     // the expiration time to!
130     //
131     BOOST_ASSERT(! read.pending || ! write.pending);
132
133     if(! read.pending)
134         BOOST_VERIFY(
135             read.timer.expires_at(never()) == 0);
136
137     if(! write.pending)
138         BOOST_VERIFY(
139             write.timer.expires_at(never()) == 0);
140 }
141
142 template<class Protocol, class Executor, class RatePolicy>
143 void
144 basic_stream<Protocol, Executor, RatePolicy>::
145 impl_type::
146 close()
147 {
148     {
149         error_code ec;
150         socket.close(ec);
151     }
152     timer.cancel();
153
154
155     // have to let the read/write ops cancel the timer,
156     // otherwise we will get error::timeout on close when
157     // we actually want net::error::operation_aborted.
158     //
159     //read.timer.cancel();
160     //write.timer.cancel();
161 }
162
163 //------------------------------------------------------------------------------
164
165 template<class Protocol, class Executor, class RatePolicy>
166 template<class Executor2>
167 struct basic_stream<Protocol, Executor, RatePolicy>::
168     timeout_handler
169 {
170     using executor_type = Executor2;
171
172     op_state& state;
173     boost::weak_ptr<impl_type> wp;
174     tick_type tick;
175     executor_type ex;
176
177     executor_type get_executor() const noexcept
178     {
179         return ex;
180     }
181
182     void
183     operator()(error_code ec)
184     {
185         // timer canceled
186         if(ec == net::error::operation_aborted)
187             return;
188         BOOST_ASSERT(! ec);
189
190         auto sp = wp.lock();
191
192         // stream destroyed
193         if(! sp)
194             return;
195
196         // stale timer
197         if(tick < state.tick)
198             return;
199         BOOST_ASSERT(tick == state.tick);
200
201         // timeout
202         BOOST_ASSERT(! state.timeout);
203         sp->close();
204         state.timeout = true;
205     }
206 };
207
208 //------------------------------------------------------------------------------
209
210 template<class Protocol, class Executor, class RatePolicy>
211 struct basic_stream<Protocol, Executor, RatePolicy>::ops
212 {
213
214 template<bool isRead, class Buffers, class Handler>
215 class transfer_op
216     : public async_base<Handler, Executor>
217     , public boost::asio::coroutine
218 {
219     boost::shared_ptr<impl_type> impl_;
220     pending_guard pg_;
221     Buffers b_;
222
223     using is_read = std::integral_constant<bool, isRead>;
224
225     op_state&
226     state()
227     {
228         if (isRead)
229             return impl_->read;
230         else
231             return impl_->write;
232     }
233
234     std::size_t
235     available_bytes()
236     {
237         if (isRead)
238             return rate_policy_access::
239                 available_read_bytes(impl_->policy());
240         else
241             return rate_policy_access::
242                 available_write_bytes(impl_->policy());
243     }
244
245     void
246     transfer_bytes(std::size_t n)
247     {
248         if (isRead)
249             rate_policy_access::
250                 transfer_read_bytes(impl_->policy(), n);
251         else
252             rate_policy_access::
253                 transfer_write_bytes(impl_->policy(), n);
254     }
255
256     void
257     async_perform(
258         std::size_t amount, std::true_type)
259     {
260         impl_->socket.async_read_some(
261             beast::buffers_prefix(amount, b_),
262                 std::move(*this));
263     }
264
265     void
266     async_perform(
267         std::size_t amount, std::false_type)
268     {
269         impl_->socket.async_write_some(
270             beast::buffers_prefix(amount, b_),
271                 std::move(*this));
272     }
273
274 public:
275     template<class Handler_>
276     transfer_op(
277         Handler_&& h,
278         basic_stream& s,
279         Buffers const& b)
280         : async_base<Handler, Executor>(
281             std::forward<Handler_>(h), s.get_executor())
282         , impl_(s.impl_)
283         , pg_(state().pending)
284         , b_(b)
285     {
286         (*this)({});
287     }
288
289     void
290     operator()(
291         error_code ec,
292         std::size_t bytes_transferred = 0)
293     {
294         BOOST_ASIO_CORO_REENTER(*this)
295         {
296             // handle empty buffers
297             if(detail::buffers_empty(b_))
298             {
299                 // make sure we perform the no-op
300                 BOOST_ASIO_CORO_YIELD
301                 async_perform(0, is_read{});
302                 // apply the timeout manually, otherwise
303                 // behavior varies across platforms.
304                 if(state().timer.expiry() <= clock_type::now())
305                 {
306                     impl_->close();
307                     ec = beast::error::timeout;
308                 }
309                 goto upcall;
310             }
311
312             // if a timeout is active, wait on the timer
313             if(state().timer.expiry() != never())
314                 state().timer.async_wait(
315                     timeout_handler<decltype(this->get_executor())>{
316                         state(),
317                         impl_,
318                         state().tick,
319                         this->get_executor()});
320
321             // check rate limit, maybe wait
322             std::size_t amount;
323             amount = available_bytes();
324             if(amount == 0)
325             {
326                 ++impl_->waiting;
327                 BOOST_ASIO_CORO_YIELD
328                 impl_->timer.async_wait(std::move(*this));
329                 if(ec)
330                 {
331                     // socket was closed, or a timeout
332                     BOOST_ASSERT(ec ==
333                         net::error::operation_aborted);
334                     // timeout handler invoked?
335                     if(state().timeout)
336                     {
337                         // yes, socket already closed
338                         ec = beast::error::timeout;
339                         state().timeout = false;
340                     }
341                     goto upcall;
342                 }
343                 impl_->on_timer(this->get_executor());
344
345                 // Allow at least one byte, otherwise
346                 // bytes_transferred could be 0.
347                 amount = std::max<std::size_t>(
348                     available_bytes(), 1);
349             }
350
351             BOOST_ASIO_CORO_YIELD
352             async_perform(amount, is_read{});
353
354             if(state().timer.expiry() != never())
355             {
356                 ++state().tick;
357
358                 // try cancelling timer
359                 auto const n =
360                     state().timer.cancel();
361                 if(n == 0)
362                 {
363                     // timeout handler invoked?
364                     if(state().timeout)
365                     {
366                         // yes, socket already closed
367                         ec = beast::error::timeout;
368                         state().timeout = false;
369                     }
370                 }
371                 else
372                 {
373                     BOOST_ASSERT(n == 1);
374                     BOOST_ASSERT(! state().timeout);
375                 }
376             }
377
378         upcall:
379             pg_.reset();
380             transfer_bytes(bytes_transferred);
381             this->complete_now(ec, bytes_transferred);
382         }
383     }
384 };
385
386 template<class Handler>
387 class connect_op
388     : public async_base<Handler, Executor>
389 {
390     boost::shared_ptr<impl_type> impl_;
391     pending_guard pg0_;
392     pending_guard pg1_;
393
394     op_state&
395     state() noexcept
396     {
397         return impl_->write;
398     }
399
400 public:
401     template<class Handler_>
402     connect_op(
403         Handler_&& h,
404         basic_stream& s,
405         endpoint_type ep)
406         : async_base<Handler, Executor>(
407             std::forward<Handler_>(h), s.get_executor())
408         , impl_(s.impl_)
409         , pg0_(impl_->read.pending)
410         , pg1_(impl_->write.pending)
411     {
412         if(state().timer.expiry() != stream_base::never())
413             impl_->write.timer.async_wait(
414                 timeout_handler<decltype(this->get_executor())>{
415                     state(),
416                     impl_,
417                     state().tick,
418                     this->get_executor()});
419
420         impl_->socket.async_connect(
421             ep, std::move(*this));
422         // *this is now moved-from
423     }
424
425     template<
426         class Endpoints, class Condition,
427         class Handler_>
428     connect_op(
429         Handler_&& h,
430         basic_stream& s,
431         Endpoints const& eps,
432         Condition const& cond)
433         : async_base<Handler, Executor>(
434             std::forward<Handler_>(h), s.get_executor())
435         , impl_(s.impl_)
436         , pg0_(impl_->read.pending)
437         , pg1_(impl_->write.pending)
438     {
439         if(state().timer.expiry() != stream_base::never())
440             impl_->write.timer.async_wait(
441                 timeout_handler<decltype(this->get_executor())>{
442                     state(),
443                     impl_,
444                     state().tick,
445                     this->get_executor()});
446
447         net::async_connect(impl_->socket,
448             eps, cond, std::move(*this));
449         // *this is now moved-from
450     }
451
452     template<
453         class Iterator, class Condition,
454         class Handler_>
455     connect_op(
456         Handler_&& h,
457         basic_stream& s,
458         Iterator begin, Iterator end,
459         Condition const& cond)
460         : async_base<Handler, Executor>(
461             std::forward<Handler_>(h), s.get_executor())
462         , impl_(s.impl_)
463         , pg0_(impl_->read.pending)
464         , pg1_(impl_->write.pending)
465     {
466         if(state().timer.expiry() != stream_base::never())
467             impl_->write.timer.async_wait(
468                 timeout_handler<decltype(this->get_executor())>{
469                     state(),
470                     impl_,
471                     state().tick,
472                     this->get_executor()});
473
474         net::async_connect(impl_->socket,
475             begin, end, cond, std::move(*this));
476         // *this is now moved-from
477     }
478
479     template<class... Args>
480     void
481     operator()(error_code ec, Args&&... args)
482     {
483         if(state().timer.expiry() != stream_base::never())
484         {
485             ++state().tick;
486
487             // try cancelling timer
488             auto const n =
489                 impl_->write.timer.cancel();
490             if(n == 0)
491             {
492                 // timeout handler invoked?
493                 if(state().timeout)
494                 {
495                     // yes, socket already closed
496                     ec = beast::error::timeout;
497                     state().timeout = false;
498                 }
499             }
500             else
501             {
502                 BOOST_ASSERT(n == 1);
503                 BOOST_ASSERT(! state().timeout);
504             }
505         }
506
507         pg0_.reset();
508         pg1_.reset();
509         this->complete_now(ec, std::forward<Args>(args)...);
510     }
511 };
512
513 struct run_read_op
514 {
515     template<class ReadHandler, class Buffers>
516     void
517     operator()(
518         ReadHandler&& h,
519         basic_stream* s,
520         Buffers const& b)
521     {
522         // If you get an error on the following line it means
523         // that your handler does not meet the documented type
524         // requirements for the handler.
525
526         static_assert(
527             detail::is_invocable<ReadHandler,
528                 void(error_code, std::size_t)>::value,
529             "ReadHandler type requirements not met");
530
531         transfer_op<
532             true,
533             Buffers,
534             typename std::decay<ReadHandler>::type>(
535                 std::forward<ReadHandler>(h), *s, b);
536     }
537 };
538
539 struct run_write_op
540 {
541     template<class WriteHandler, class Buffers>
542     void
543     operator()(
544         WriteHandler&& h,
545         basic_stream* s,
546         Buffers const& b)
547     {
548         // If you get an error on the following line it means
549         // that your handler does not meet the documented type
550         // requirements for the handler.
551
552         static_assert(
553             detail::is_invocable<WriteHandler,
554                 void(error_code, std::size_t)>::value,
555             "WriteHandler type requirements not met");
556
557         transfer_op<
558             false,
559             Buffers,
560             typename std::decay<WriteHandler>::type>(
561                 std::forward<WriteHandler>(h), *s, b);
562     }
563 };
564
565 struct run_connect_op
566 {
567     template<class ConnectHandler>
568     void
569     operator()(
570         ConnectHandler&& h,
571         basic_stream* s,
572         endpoint_type const& ep)
573     {
574         // If you get an error on the following line it means
575         // that your handler does not meet the documented type
576         // requirements for the handler.
577
578         static_assert(
579             detail::is_invocable<ConnectHandler,
580                 void(error_code)>::value,
581             "ConnectHandler type requirements not met");
582
583         connect_op<typename std::decay<ConnectHandler>::type>(
584             std::forward<ConnectHandler>(h), *s, ep);
585     }
586 };
587
588 struct run_connect_range_op
589 {
590     template<
591         class RangeConnectHandler,
592         class EndpointSequence,
593         class Condition>
594     void
595     operator()(
596         RangeConnectHandler&& h,
597         basic_stream* s,
598         EndpointSequence const& eps,
599         Condition const& cond)
600     {
601         // If you get an error on the following line it means
602         // that your handler does not meet the documented type
603         // requirements for the handler.
604
605         static_assert(
606             detail::is_invocable<RangeConnectHandler,
607                 void(error_code, typename Protocol::endpoint)>::value,
608             "RangeConnectHandler type requirements not met");
609
610         connect_op<typename std::decay<RangeConnectHandler>::type>(
611             std::forward<RangeConnectHandler>(h), *s, eps, cond);
612     }
613 };
614
615 struct run_connect_iter_op
616 {
617     template<
618         class IteratorConnectHandler,
619         class Iterator,
620         class Condition>
621     void
622     operator()(
623         IteratorConnectHandler&& h,
624         basic_stream* s,
625         Iterator begin, Iterator end,
626         Condition const& cond)
627     {
628         // If you get an error on the following line it means
629         // that your handler does not meet the documented type
630         // requirements for the handler.
631
632         static_assert(
633             detail::is_invocable<IteratorConnectHandler,
634                 void(error_code, Iterator)>::value,
635             "IteratorConnectHandler type requirements not met");
636
637         connect_op<typename std::decay<IteratorConnectHandler>::type>(
638             std::forward<IteratorConnectHandler>(h), *s, begin, end, cond);
639     }
640 };
641
642 };
643
644 //------------------------------------------------------------------------------
645
646 template<class Protocol, class Executor, class RatePolicy>
647 basic_stream<Protocol, Executor, RatePolicy>::
648 ~basic_stream()
649 {
650     // the shared object can outlive *this,
651     // cancel any operations so the shared
652     // object is destroyed as soon as possible.
653     impl_->close();
654 }
655
656 template<class Protocol, class Executor, class RatePolicy>
657 template<class Arg0, class... Args, class>
658 basic_stream<Protocol, Executor, RatePolicy>::
659 basic_stream(Arg0&& arg0, Args&&... args)
660     : impl_(boost::make_shared<impl_type>(
661         std::false_type{},
662         std::forward<Arg0>(arg0),
663         std::forward<Args>(args)...))
664 {
665 }
666
667 template<class Protocol, class Executor, class RatePolicy>
668 template<class RatePolicy_, class Arg0, class... Args, class>
669 basic_stream<Protocol, Executor, RatePolicy>::
670 basic_stream(
671     RatePolicy_&& policy, Arg0&& arg0, Args&&... args)
672     : impl_(boost::make_shared<impl_type>(
673         std::true_type{},
674         std::forward<RatePolicy_>(policy),
675         std::forward<Arg0>(arg0),
676         std::forward<Args>(args)...))
677 {
678 }
679
680 template<class Protocol, class Executor, class RatePolicy>
681 basic_stream<Protocol, Executor, RatePolicy>::
682 basic_stream(basic_stream&& other)
683     : impl_(boost::make_shared<impl_type>(
684         std::move(*other.impl_)))
685 {
686     // VFALCO I'm not sure this implementation is correct...
687 }
688
689 //------------------------------------------------------------------------------
690
691 template<class Protocol, class Executor, class RatePolicy>
692 auto
693 basic_stream<Protocol, Executor, RatePolicy>::
694 release_socket() ->
695     socket_type
696 {
697     this->cancel();
698     return std::move(impl_->socket);
699 }
700
701 template<class Protocol, class Executor, class RatePolicy>
702 void
703 basic_stream<Protocol, Executor, RatePolicy>::
704 expires_after(std::chrono::nanoseconds expiry_time)
705 {
706     // If assert goes off, it means that there are
707     // already read or write (or connect) operations
708     // outstanding, so there is nothing to apply
709     // the expiration time to!
710     //
711     BOOST_ASSERT(
712         ! impl_->read.pending ||
713         ! impl_->write.pending);
714
715     if(! impl_->read.pending)
716         BOOST_VERIFY(
717             impl_->read.timer.expires_after(
718                 expiry_time) == 0);
719
720     if(! impl_->write.pending)
721         BOOST_VERIFY(
722             impl_->write.timer.expires_after(
723                 expiry_time) == 0);
724 }
725
726 template<class Protocol, class Executor, class RatePolicy>
727 void
728 basic_stream<Protocol, Executor, RatePolicy>::
729 expires_at(
730     net::steady_timer::time_point expiry_time)
731 {
732     // If assert goes off, it means that there are
733     // already read or write (or connect) operations
734     // outstanding, so there is nothing to apply
735     // the expiration time to!
736     //
737     BOOST_ASSERT(
738         ! impl_->read.pending ||
739         ! impl_->write.pending);
740
741     if(! impl_->read.pending)
742         BOOST_VERIFY(
743             impl_->read.timer.expires_at(
744                 expiry_time) == 0);
745
746     if(! impl_->write.pending)
747         BOOST_VERIFY(
748             impl_->write.timer.expires_at(
749                 expiry_time) == 0);
750 }
751
752 template<class Protocol, class Executor, class RatePolicy>
753 void
754 basic_stream<Protocol, Executor, RatePolicy>::
755 expires_never()
756 {
757     impl_->reset();
758 }
759
760 template<class Protocol, class Executor, class RatePolicy>
761 void
762 basic_stream<Protocol, Executor, RatePolicy>::
763 cancel()
764 {
765     error_code ec;
766     impl_->socket.cancel(ec);
767     impl_->timer.cancel();
768 }
769
770 template<class Protocol, class Executor, class RatePolicy>
771 void
772 basic_stream<Protocol, Executor, RatePolicy>::
773 close()
774 {
775     impl_->close();
776 }
777
778 //------------------------------------------------------------------------------
779
780 template<class Protocol, class Executor, class RatePolicy>
781 template<class ConnectHandler>
782 BOOST_BEAST_ASYNC_RESULT1(ConnectHandler)
783 basic_stream<Protocol, Executor, RatePolicy>::
784 async_connect(
785     endpoint_type const& ep,
786     ConnectHandler&& handler)
787 {
788     return net::async_initiate<
789         ConnectHandler,
790         void(error_code)>(
791             typename ops::run_connect_op{},
792             handler,
793             this,
794             ep);
795 }
796
797 template<class Protocol, class Executor, class RatePolicy>
798 template<
799     class EndpointSequence,
800     class RangeConnectHandler,
801     class>
802 BOOST_ASIO_INITFN_RESULT_TYPE(RangeConnectHandler,void(error_code, typename Protocol::endpoint))
803 basic_stream<Protocol, Executor, RatePolicy>::
804 async_connect(
805     EndpointSequence const& endpoints,
806     RangeConnectHandler&& handler)
807 {
808     return net::async_initiate<
809         RangeConnectHandler,
810         void(error_code, typename Protocol::endpoint)>(
811             typename ops::run_connect_range_op{},
812             handler,
813             this,
814             endpoints,
815             detail::any_endpoint{});
816 }
817
818 template<class Protocol, class Executor, class RatePolicy>
819 template<
820     class EndpointSequence,
821     class ConnectCondition,
822     class RangeConnectHandler,
823     class>
824 BOOST_ASIO_INITFN_RESULT_TYPE(RangeConnectHandler,void (error_code, typename Protocol::endpoint))
825 basic_stream<Protocol, Executor, RatePolicy>::
826 async_connect(
827     EndpointSequence const& endpoints,
828     ConnectCondition connect_condition,
829     RangeConnectHandler&& handler)
830 {
831     return net::async_initiate<
832         RangeConnectHandler,
833         void(error_code, typename Protocol::endpoint)>(
834             typename ops::run_connect_range_op{},
835             handler,
836             this,
837             endpoints,
838             connect_condition);
839 }
840
841 template<class Protocol, class Executor, class RatePolicy>
842 template<
843     class Iterator,
844     class IteratorConnectHandler>
845 BOOST_ASIO_INITFN_RESULT_TYPE(IteratorConnectHandler,void (error_code, Iterator))
846 basic_stream<Protocol, Executor, RatePolicy>::
847 async_connect(
848     Iterator begin, Iterator end,
849     IteratorConnectHandler&& handler)
850 {
851     return net::async_initiate<
852         IteratorConnectHandler,
853         void(error_code, Iterator)>(
854             typename ops::run_connect_iter_op{},
855             handler,
856             this,
857             begin, end,
858             detail::any_endpoint{});
859 }
860
861 template<class Protocol, class Executor, class RatePolicy>
862 template<
863     class Iterator,
864     class ConnectCondition,
865     class IteratorConnectHandler>
866 BOOST_ASIO_INITFN_RESULT_TYPE(IteratorConnectHandler,void (error_code, Iterator))
867 basic_stream<Protocol, Executor, RatePolicy>::
868 async_connect(
869     Iterator begin, Iterator end,
870     ConnectCondition connect_condition,
871     IteratorConnectHandler&& handler)
872 {
873     return net::async_initiate<
874         IteratorConnectHandler,
875         void(error_code, Iterator)>(
876             typename ops::run_connect_iter_op{},
877             handler,
878             this,
879             begin, end,
880             connect_condition);
881 }
882
883 //------------------------------------------------------------------------------
884
885 template<class Protocol, class Executor, class RatePolicy>
886 template<class MutableBufferSequence, class ReadHandler>
887 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
888 basic_stream<Protocol, Executor, RatePolicy>::
889 async_read_some(
890     MutableBufferSequence const& buffers,
891     ReadHandler&& handler)
892 {
893     static_assert(net::is_mutable_buffer_sequence<
894         MutableBufferSequence>::value,
895         "MutableBufferSequence type requirements not met");
896     return net::async_initiate<
897         ReadHandler,
898         void(error_code, std::size_t)>(
899             typename ops::run_read_op{},
900             handler,
901             this,
902             buffers);
903 }
904
905 template<class Protocol, class Executor, class RatePolicy>
906 template<class ConstBufferSequence, class WriteHandler>
907 BOOST_BEAST_ASYNC_RESULT2(WriteHandler)
908 basic_stream<Protocol, Executor, RatePolicy>::
909 async_write_some(
910     ConstBufferSequence const& buffers,
911     WriteHandler&& handler)
912 {
913     static_assert(net::is_const_buffer_sequence<
914         ConstBufferSequence>::value,
915         "ConstBufferSequence type requirements not met");
916     return net::async_initiate<
917         WriteHandler,
918         void(error_code, std::size_t)>(
919             typename ops::run_write_op{},
920             handler,
921             this,
922             buffers);
923 }
924
925 //------------------------------------------------------------------------------
926 //
927 // Customization points
928 //
929
930 #if ! BOOST_BEAST_DOXYGEN
931
932 template<
933     class Protocol, class Executor, class RatePolicy>
934 void
935 beast_close_socket(
936     basic_stream<Protocol, Executor, RatePolicy>& stream)
937 {
938     error_code ec;
939     stream.socket().close(ec);
940 }
941
942 template<
943     class Protocol, class Executor, class RatePolicy>
944 void
945 teardown(
946     role_type role,
947     basic_stream<Protocol, Executor, RatePolicy>& stream,
948     error_code& ec)
949 {
950     using beast::websocket::teardown;
951     teardown(role, stream.socket(), ec);
952 }
953
954 template<
955     class Protocol, class Executor, class RatePolicy,
956     class TeardownHandler>
957 void
958 async_teardown(
959     role_type role,
960     basic_stream<Protocol, Executor, RatePolicy>& stream,
961     TeardownHandler&& handler)
962 {
963     using beast::websocket::async_teardown;
964     async_teardown(role, stream.socket(),
965         std::forward<TeardownHandler>(handler));
966 }
967
968 #endif
969
970 } // beast
971 } // boost
972
973 #endif