2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 // Official repository: https://github.com/boostorg/beast
10 #ifndef BOOST_BEAST_CORE_IMPL_BASIC_STREAM_HPP
11 #define BOOST_BEAST_CORE_IMPL_BASIC_STREAM_HPP
13 #include <boost/beast/core/async_base.hpp>
14 #include <boost/beast/core/buffer_traits.hpp>
15 #include <boost/beast/core/buffers_prefix.hpp>
16 #include <boost/beast/websocket/teardown.hpp>
17 #include <boost/asio/coroutine.hpp>
18 #include <boost/assert.hpp>
19 #include <boost/make_shared.hpp>
20 #include <boost/core/exchange.hpp>
22 #include <type_traits>
28 //------------------------------------------------------------------------------
30 template<class Protocol, class Executor, class RatePolicy>
31 template<class... Args>
32 basic_stream<Protocol, Executor, RatePolicy>::
34 impl_type(std::false_type, Args&&... args)
35 : socket(std::forward<Args>(args)...)
43 template<class Protocol, class Executor, class RatePolicy>
44 template<class RatePolicy_, class... Args>
45 basic_stream<Protocol, Executor, RatePolicy>::
47 impl_type(std::true_type,
48 RatePolicy_&& policy, Args&&... args)
49 : boost::empty_value<RatePolicy>(
50 boost::empty_init_t{},
51 std::forward<RatePolicy_>(policy))
52 , socket(std::forward<Args>(args)...)
60 template<class Protocol, class Executor, class RatePolicy>
61 template<class Executor2>
63 basic_stream<Protocol, Executor, RatePolicy>::
65 on_timer(Executor2 const& ex2)
67 BOOST_ASSERT(waiting > 0);
69 // the last waiter starts the new slice
73 // update the expiration time
74 BOOST_VERIFY(timer.expires_after(
75 std::chrono::seconds(1)) == 0);
77 rate_policy_access::on_timer(policy());
79 struct handler : boost::empty_value<Executor2>
81 boost::weak_ptr<impl_type> wp;
83 using executor_type = Executor2;
86 get_executor() const noexcept
93 boost::shared_ptr<impl_type> const& sp)
94 : boost::empty_value<Executor2>(
95 boost::empty_init_t{}, ex2)
101 operator()(error_code ec)
106 if(ec == net::error::operation_aborted)
111 sp->on_timer(this->get());
115 // wait on the timer again
117 timer.async_wait(handler(ex2, this->shared_from_this()));
120 template<class Protocol, class Executor, class RatePolicy>
122 basic_stream<Protocol, Executor, RatePolicy>::
126 // If assert goes off, it means that there are
127 // already read or write (or connect) operations
128 // outstanding, so there is nothing to apply
129 // the expiration time to!
131 BOOST_ASSERT(! read.pending || ! write.pending);
135 read.timer.expires_at(never()) == 0);
139 write.timer.expires_at(never()) == 0);
142 template<class Protocol, class Executor, class RatePolicy>
144 basic_stream<Protocol, Executor, RatePolicy>::
155 // have to let the read/write ops cancel the timer,
156 // otherwise we will get error::timeout on close when
157 // we actually want net::error::operation_aborted.
159 //read.timer.cancel();
160 //write.timer.cancel();
163 //------------------------------------------------------------------------------
165 template<class Protocol, class Executor, class RatePolicy>
166 template<class Executor2>
167 struct basic_stream<Protocol, Executor, RatePolicy>::
170 using executor_type = Executor2;
173 boost::weak_ptr<impl_type> wp;
177 executor_type get_executor() const noexcept
183 operator()(error_code ec)
186 if(ec == net::error::operation_aborted)
197 if(tick < state.tick)
199 BOOST_ASSERT(tick == state.tick);
202 BOOST_ASSERT(! state.timeout);
204 state.timeout = true;
208 //------------------------------------------------------------------------------
210 template<class Protocol, class Executor, class RatePolicy>
211 struct basic_stream<Protocol, Executor, RatePolicy>::ops
214 template<bool isRead, class Buffers, class Handler>
216 : public async_base<Handler, Executor>
217 , public boost::asio::coroutine
219 boost::shared_ptr<impl_type> impl_;
223 using is_read = std::integral_constant<bool, isRead>;
238 return rate_policy_access::
239 available_read_bytes(impl_->policy());
241 return rate_policy_access::
242 available_write_bytes(impl_->policy());
246 transfer_bytes(std::size_t n)
250 transfer_read_bytes(impl_->policy(), n);
253 transfer_write_bytes(impl_->policy(), n);
258 std::size_t amount, std::true_type)
260 impl_->socket.async_read_some(
261 beast::buffers_prefix(amount, b_),
267 std::size_t amount, std::false_type)
269 impl_->socket.async_write_some(
270 beast::buffers_prefix(amount, b_),
275 template<class Handler_>
280 : async_base<Handler, Executor>(
281 std::forward<Handler_>(h), s.get_executor())
283 , pg_(state().pending)
292 std::size_t bytes_transferred = 0)
294 BOOST_ASIO_CORO_REENTER(*this)
296 // handle empty buffers
297 if(detail::buffers_empty(b_))
299 // make sure we perform the no-op
300 BOOST_ASIO_CORO_YIELD
301 async_perform(0, is_read{});
302 // apply the timeout manually, otherwise
303 // behavior varies across platforms.
304 if(state().timer.expiry() <= clock_type::now())
307 ec = beast::error::timeout;
312 // if a timeout is active, wait on the timer
313 if(state().timer.expiry() != never())
314 state().timer.async_wait(
315 timeout_handler<decltype(this->get_executor())>{
319 this->get_executor()});
321 // check rate limit, maybe wait
323 amount = available_bytes();
327 BOOST_ASIO_CORO_YIELD
328 impl_->timer.async_wait(std::move(*this));
331 // socket was closed, or a timeout
333 net::error::operation_aborted);
334 // timeout handler invoked?
337 // yes, socket already closed
338 ec = beast::error::timeout;
339 state().timeout = false;
343 impl_->on_timer(this->get_executor());
345 // Allow at least one byte, otherwise
346 // bytes_transferred could be 0.
347 amount = std::max<std::size_t>(
348 available_bytes(), 1);
351 BOOST_ASIO_CORO_YIELD
352 async_perform(amount, is_read{});
354 if(state().timer.expiry() != never())
358 // try cancelling timer
360 state().timer.cancel();
363 // timeout handler invoked?
366 // yes, socket already closed
367 ec = beast::error::timeout;
368 state().timeout = false;
373 BOOST_ASSERT(n == 1);
374 BOOST_ASSERT(! state().timeout);
380 transfer_bytes(bytes_transferred);
381 this->complete_now(ec, bytes_transferred);
386 template<class Handler>
388 : public async_base<Handler, Executor>
390 boost::shared_ptr<impl_type> impl_;
401 template<class Handler_>
406 : async_base<Handler, Executor>(
407 std::forward<Handler_>(h), s.get_executor())
409 , pg0_(impl_->read.pending)
410 , pg1_(impl_->write.pending)
412 if(state().timer.expiry() != stream_base::never())
413 impl_->write.timer.async_wait(
414 timeout_handler<decltype(this->get_executor())>{
418 this->get_executor()});
420 impl_->socket.async_connect(
421 ep, std::move(*this));
422 // *this is now moved-from
426 class Endpoints, class Condition,
431 Endpoints const& eps,
432 Condition const& cond)
433 : async_base<Handler, Executor>(
434 std::forward<Handler_>(h), s.get_executor())
436 , pg0_(impl_->read.pending)
437 , pg1_(impl_->write.pending)
439 if(state().timer.expiry() != stream_base::never())
440 impl_->write.timer.async_wait(
441 timeout_handler<decltype(this->get_executor())>{
445 this->get_executor()});
447 net::async_connect(impl_->socket,
448 eps, cond, std::move(*this));
449 // *this is now moved-from
453 class Iterator, class Condition,
458 Iterator begin, Iterator end,
459 Condition const& cond)
460 : async_base<Handler, Executor>(
461 std::forward<Handler_>(h), s.get_executor())
463 , pg0_(impl_->read.pending)
464 , pg1_(impl_->write.pending)
466 if(state().timer.expiry() != stream_base::never())
467 impl_->write.timer.async_wait(
468 timeout_handler<decltype(this->get_executor())>{
472 this->get_executor()});
474 net::async_connect(impl_->socket,
475 begin, end, cond, std::move(*this));
476 // *this is now moved-from
479 template<class... Args>
481 operator()(error_code ec, Args&&... args)
483 if(state().timer.expiry() != stream_base::never())
487 // try cancelling timer
489 impl_->write.timer.cancel();
492 // timeout handler invoked?
495 // yes, socket already closed
496 ec = beast::error::timeout;
497 state().timeout = false;
502 BOOST_ASSERT(n == 1);
503 BOOST_ASSERT(! state().timeout);
509 this->complete_now(ec, std::forward<Args>(args)...);
515 template<class ReadHandler, class Buffers>
522 // If you get an error on the following line it means
523 // that your handler does not meet the documented type
524 // requirements for the handler.
527 detail::is_invocable<ReadHandler,
528 void(error_code, std::size_t)>::value,
529 "ReadHandler type requirements not met");
534 typename std::decay<ReadHandler>::type>(
535 std::forward<ReadHandler>(h), *s, b);
541 template<class WriteHandler, class Buffers>
548 // If you get an error on the following line it means
549 // that your handler does not meet the documented type
550 // requirements for the handler.
553 detail::is_invocable<WriteHandler,
554 void(error_code, std::size_t)>::value,
555 "WriteHandler type requirements not met");
560 typename std::decay<WriteHandler>::type>(
561 std::forward<WriteHandler>(h), *s, b);
565 struct run_connect_op
567 template<class ConnectHandler>
572 endpoint_type const& ep)
574 // If you get an error on the following line it means
575 // that your handler does not meet the documented type
576 // requirements for the handler.
579 detail::is_invocable<ConnectHandler,
580 void(error_code)>::value,
581 "ConnectHandler type requirements not met");
583 connect_op<typename std::decay<ConnectHandler>::type>(
584 std::forward<ConnectHandler>(h), *s, ep);
588 struct run_connect_range_op
591 class RangeConnectHandler,
592 class EndpointSequence,
596 RangeConnectHandler&& h,
598 EndpointSequence const& eps,
599 Condition const& cond)
601 // If you get an error on the following line it means
602 // that your handler does not meet the documented type
603 // requirements for the handler.
606 detail::is_invocable<RangeConnectHandler,
607 void(error_code, typename Protocol::endpoint)>::value,
608 "RangeConnectHandler type requirements not met");
610 connect_op<typename std::decay<RangeConnectHandler>::type>(
611 std::forward<RangeConnectHandler>(h), *s, eps, cond);
615 struct run_connect_iter_op
618 class IteratorConnectHandler,
623 IteratorConnectHandler&& h,
625 Iterator begin, Iterator end,
626 Condition const& cond)
628 // If you get an error on the following line it means
629 // that your handler does not meet the documented type
630 // requirements for the handler.
633 detail::is_invocable<IteratorConnectHandler,
634 void(error_code, Iterator)>::value,
635 "IteratorConnectHandler type requirements not met");
637 connect_op<typename std::decay<IteratorConnectHandler>::type>(
638 std::forward<IteratorConnectHandler>(h), *s, begin, end, cond);
644 //------------------------------------------------------------------------------
646 template<class Protocol, class Executor, class RatePolicy>
647 basic_stream<Protocol, Executor, RatePolicy>::
650 // the shared object can outlive *this,
651 // cancel any operations so the shared
652 // object is destroyed as soon as possible.
656 template<class Protocol, class Executor, class RatePolicy>
657 template<class Arg0, class... Args, class>
658 basic_stream<Protocol, Executor, RatePolicy>::
659 basic_stream(Arg0&& arg0, Args&&... args)
660 : impl_(boost::make_shared<impl_type>(
662 std::forward<Arg0>(arg0),
663 std::forward<Args>(args)...))
667 template<class Protocol, class Executor, class RatePolicy>
668 template<class RatePolicy_, class Arg0, class... Args, class>
669 basic_stream<Protocol, Executor, RatePolicy>::
671 RatePolicy_&& policy, Arg0&& arg0, Args&&... args)
672 : impl_(boost::make_shared<impl_type>(
674 std::forward<RatePolicy_>(policy),
675 std::forward<Arg0>(arg0),
676 std::forward<Args>(args)...))
680 template<class Protocol, class Executor, class RatePolicy>
681 basic_stream<Protocol, Executor, RatePolicy>::
682 basic_stream(basic_stream&& other)
683 : impl_(boost::make_shared<impl_type>(
684 std::move(*other.impl_)))
686 // VFALCO I'm not sure this implementation is correct...
689 //------------------------------------------------------------------------------
691 template<class Protocol, class Executor, class RatePolicy>
693 basic_stream<Protocol, Executor, RatePolicy>::
698 return std::move(impl_->socket);
701 template<class Protocol, class Executor, class RatePolicy>
703 basic_stream<Protocol, Executor, RatePolicy>::
704 expires_after(std::chrono::nanoseconds expiry_time)
706 // If assert goes off, it means that there are
707 // already read or write (or connect) operations
708 // outstanding, so there is nothing to apply
709 // the expiration time to!
712 ! impl_->read.pending ||
713 ! impl_->write.pending);
715 if(! impl_->read.pending)
717 impl_->read.timer.expires_after(
720 if(! impl_->write.pending)
722 impl_->write.timer.expires_after(
726 template<class Protocol, class Executor, class RatePolicy>
728 basic_stream<Protocol, Executor, RatePolicy>::
730 net::steady_timer::time_point expiry_time)
732 // If assert goes off, it means that there are
733 // already read or write (or connect) operations
734 // outstanding, so there is nothing to apply
735 // the expiration time to!
738 ! impl_->read.pending ||
739 ! impl_->write.pending);
741 if(! impl_->read.pending)
743 impl_->read.timer.expires_at(
746 if(! impl_->write.pending)
748 impl_->write.timer.expires_at(
752 template<class Protocol, class Executor, class RatePolicy>
754 basic_stream<Protocol, Executor, RatePolicy>::
760 template<class Protocol, class Executor, class RatePolicy>
762 basic_stream<Protocol, Executor, RatePolicy>::
766 impl_->socket.cancel(ec);
767 impl_->timer.cancel();
770 template<class Protocol, class Executor, class RatePolicy>
772 basic_stream<Protocol, Executor, RatePolicy>::
778 //------------------------------------------------------------------------------
780 template<class Protocol, class Executor, class RatePolicy>
781 template<class ConnectHandler>
782 BOOST_BEAST_ASYNC_RESULT1(ConnectHandler)
783 basic_stream<Protocol, Executor, RatePolicy>::
785 endpoint_type const& ep,
786 ConnectHandler&& handler)
788 return net::async_initiate<
791 typename ops::run_connect_op{},
797 template<class Protocol, class Executor, class RatePolicy>
799 class EndpointSequence,
800 class RangeConnectHandler,
802 BOOST_ASIO_INITFN_RESULT_TYPE(RangeConnectHandler,void(error_code, typename Protocol::endpoint))
803 basic_stream<Protocol, Executor, RatePolicy>::
805 EndpointSequence const& endpoints,
806 RangeConnectHandler&& handler)
808 return net::async_initiate<
810 void(error_code, typename Protocol::endpoint)>(
811 typename ops::run_connect_range_op{},
815 detail::any_endpoint{});
818 template<class Protocol, class Executor, class RatePolicy>
820 class EndpointSequence,
821 class ConnectCondition,
822 class RangeConnectHandler,
824 BOOST_ASIO_INITFN_RESULT_TYPE(RangeConnectHandler,void (error_code, typename Protocol::endpoint))
825 basic_stream<Protocol, Executor, RatePolicy>::
827 EndpointSequence const& endpoints,
828 ConnectCondition connect_condition,
829 RangeConnectHandler&& handler)
831 return net::async_initiate<
833 void(error_code, typename Protocol::endpoint)>(
834 typename ops::run_connect_range_op{},
841 template<class Protocol, class Executor, class RatePolicy>
844 class IteratorConnectHandler>
845 BOOST_ASIO_INITFN_RESULT_TYPE(IteratorConnectHandler,void (error_code, Iterator))
846 basic_stream<Protocol, Executor, RatePolicy>::
848 Iterator begin, Iterator end,
849 IteratorConnectHandler&& handler)
851 return net::async_initiate<
852 IteratorConnectHandler,
853 void(error_code, Iterator)>(
854 typename ops::run_connect_iter_op{},
858 detail::any_endpoint{});
861 template<class Protocol, class Executor, class RatePolicy>
864 class ConnectCondition,
865 class IteratorConnectHandler>
866 BOOST_ASIO_INITFN_RESULT_TYPE(IteratorConnectHandler,void (error_code, Iterator))
867 basic_stream<Protocol, Executor, RatePolicy>::
869 Iterator begin, Iterator end,
870 ConnectCondition connect_condition,
871 IteratorConnectHandler&& handler)
873 return net::async_initiate<
874 IteratorConnectHandler,
875 void(error_code, Iterator)>(
876 typename ops::run_connect_iter_op{},
883 //------------------------------------------------------------------------------
885 template<class Protocol, class Executor, class RatePolicy>
886 template<class MutableBufferSequence, class ReadHandler>
887 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
888 basic_stream<Protocol, Executor, RatePolicy>::
890 MutableBufferSequence const& buffers,
891 ReadHandler&& handler)
893 static_assert(net::is_mutable_buffer_sequence<
894 MutableBufferSequence>::value,
895 "MutableBufferSequence type requirements not met");
896 return net::async_initiate<
898 void(error_code, std::size_t)>(
899 typename ops::run_read_op{},
905 template<class Protocol, class Executor, class RatePolicy>
906 template<class ConstBufferSequence, class WriteHandler>
907 BOOST_BEAST_ASYNC_RESULT2(WriteHandler)
908 basic_stream<Protocol, Executor, RatePolicy>::
910 ConstBufferSequence const& buffers,
911 WriteHandler&& handler)
913 static_assert(net::is_const_buffer_sequence<
914 ConstBufferSequence>::value,
915 "ConstBufferSequence type requirements not met");
916 return net::async_initiate<
918 void(error_code, std::size_t)>(
919 typename ops::run_write_op{},
925 //------------------------------------------------------------------------------
927 // Customization points
930 #if ! BOOST_BEAST_DOXYGEN
933 class Protocol, class Executor, class RatePolicy>
936 basic_stream<Protocol, Executor, RatePolicy>& stream)
939 stream.socket().close(ec);
943 class Protocol, class Executor, class RatePolicy>
947 basic_stream<Protocol, Executor, RatePolicy>& stream,
950 using beast::websocket::teardown;
951 teardown(role, stream.socket(), ec);
955 class Protocol, class Executor, class RatePolicy,
956 class TeardownHandler>
960 basic_stream<Protocol, Executor, RatePolicy>& stream,
961 TeardownHandler&& handler)
963 using beast::websocket::async_teardown;
964 async_teardown(role, stream.socket(),
965 std::forward<TeardownHandler>(handler));