2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 // Official repository: https://github.com/boostorg/beast
10 #include "snippets.hpp"
12 #include <boost/beast/_experimental/unit_test/suite.hpp>
13 #include <boost/beast/_experimental/test/stream.hpp>
14 #include <boost/beast/core/async_base.hpp>
15 #include <boost/beast/core/buffers_prefix.hpp>
16 #include <boost/beast/core/error.hpp>
17 #include <boost/beast/core/flat_buffer.hpp>
18 #include <boost/beast/core/stream_traits.hpp>
19 #include <boost/beast/core/tcp_stream.hpp>
20 #include <boost/beast/http.hpp>
21 #include <boost/beast/ssl/ssl_stream.hpp>
22 #include <boost/asio/buffer.hpp>
23 #include <boost/asio/read.hpp>
24 #include <boost/asio/spawn.hpp>
36 template<class... Args>
37 void operator()(Args&&...)
43 core_3_timeouts_snippets()
47 #include "snippets.ipp"
50 //[code_core_3_timeouts_1
52 // `ioc` will be used to dispatch completion handlers
53 tcp_stream stream(ioc);
59 //[code_core_3_timeouts_2
61 // The resolver is used to look up the IP addresses for a domain name
62 net::ip::tcp::resolver resolver(ioc);
64 // The stream will use the same executor as the resolver
65 tcp_stream stream(resolver.get_executor());
71 //[code_core_3_timeouts_3
73 // The strand will be used to invoke all completion handlers
74 tcp_stream stream(net::make_strand(ioc));
78 net::ip::tcp::resolver resolver(ioc);
80 //[code_core_3_timeouts_4
82 // Set the logical operation timer to 30 seconds
83 stream.expires_after (std::chrono::seconds(30));
85 // If the connection is not established within 30 seconds,
86 // the operation will be canceled and the handler will receive
87 // error::timeout as the error code.
89 stream.async_connect(resolver.resolve("www.example.com", "http"),
90 [](error_code ec, net::ip::tcp::endpoint ep)
92 if(ec == error::timeout)
93 std::cerr << "async_connect took too long\n";
95 std::cout << "Connected to " << ep << "\n";
99 // The timer is still running. If we don't want the next
100 // operation to time out 30 seconds relative to the previous
101 // call to `expires_after`, we need to turn it off before
102 // starting another asynchronous operation.
104 stream.expires_never();
110 //[code_core_3_timeouts_5
112 // The acceptor is used to listen and accept incoming connections.
113 // We construct the acceptor to use a new strand, and listen
114 // on the loopback address with an operating-system assigned port.
116 net::ip::tcp::acceptor acceptor(net::make_strand(ioc));
117 acceptor.bind(net::ip::tcp::endpoint(net::ip::make_address_v4("127.0.0.1"), 0));
120 // This blocks until a new incoming connection is established.
121 // Upon success, the function returns a new socket which is
122 // connected to the peer. The socket will have its own executor,
123 // which in the call below is a new strand for the I/O context.
125 net::ip::tcp::socket s = acceptor.accept(net::make_strand(ioc));
127 // Construct a new tcp_stream from the connected socket.
128 // The stream will use the strand created when the connection
131 tcp_stream stream(std::move(s));
136 tcp_stream stream(ioc);
138 //[code_core_3_timeouts_6
142 // Set the logical operation timer to 30 seconds.
143 stream.expires_after (std::chrono::seconds(30));
145 // Read a line from the stream into the string.
146 net::async_read_until(stream, net::dynamic_buffer(s), '\n',
147 [&s, &stream](error_code ec, std::size_t bytes_transferred)
152 // read_until can read past the '\n', these will end up in
153 // our buffer but we don't want to echo those extra received
154 // bytes. `bytes_transferred` will be the number of bytes
155 // up to and including the '\n'. We use `buffers_prefix` so
156 // that extra data is not written.
158 net::async_write(stream, buffers_prefix(bytes_transferred, net::buffer(s)),
159 [&s](error_code ec, std::size_t bytes_transferred)
161 // Consume the line from the buffer
162 s.erase(s.begin(), s.begin() + bytes_transferred);
165 std::cerr << "Error: " << ec.message() << "\n";
172 tcp_stream stream(ioc);
174 //[code_core_3_timeouts_7
179 // Set the logical operation timer to 15 seconds.
180 stream.expires_after (std::chrono::seconds(15));
182 // Read another line from the stream into our dynamic buffer.
183 // The operation will time out after 15 seconds.
185 net::async_read_until(stream, net::dynamic_buffer(s1), '\n', handler);
187 // Set the logical operation timer to 30 seconds.
188 stream.expires_after (std::chrono::seconds(30));
190 // Write the contents of the other buffer.
191 // This operation will time out after 30 seconds.
193 net::async_write(stream, net::buffer(s2), handler);
199 //[code_core_3_timeouts_8
201 // To declare a stream with a rate policy, it is necessary to
202 // write out all of the template parameter types.
204 // `simple_rate_policy` is default constructible, but
205 // if the choice of RatePolicy is not DefaultConstructible,
206 // an instance of the type may be passed to the constructor.
208 basic_stream<net::ip::tcp, net::executor, simple_rate_policy> stream(ioc);
210 // The policy object, which is default constructed, or
211 // decay-copied upon construction, is attached to the stream
212 // and may be accessed through the function `rate_policy`.
214 // Here we set individual rate limits for reading and writing
216 stream.rate_policy().read_limit(10000); // bytes per second
218 stream.rate_policy().write_limit(850000); // bytes per second
223 //[code_core_3_timeouts_1f
225 /** This function echoes back received lines from a peer, with a timeout.
227 The algorithm terminates upon any error (including timeout).
229 template <class Protocol, class Executor>
230 void do_async_echo (basic_stream<Protocol, Executor>& stream)
232 // This object will hold our state when reading the line.
236 basic_stream<Protocol, Executor>& stream;
238 // The shared pointer is used to extend the lifetime of the
239 // string until the last asynchronous operation completes.
240 std::shared_ptr<std::string> s;
242 // This starts a new operation to read and echo a line
245 // If a line is not sent and received within 30 seconds, then
246 // the connection will be closed and this algorithm will terminate.
248 stream.expires_after(std::chrono::seconds(30));
250 // Read a line from the stream into our dynamic buffer, with a timeout
251 net::async_read_until(stream, net::dynamic_buffer(*s), '\n', std::move(*this));
254 // This function is called when the read completes
255 void operator()(error_code ec, std::size_t bytes_transferred)
260 net::async_write(stream, buffers_prefix(bytes_transferred, net::buffer(*s)),
261 [this](error_code ec, std::size_t bytes_transferred)
263 s->erase(s->begin(), s->begin() + bytes_transferred);
267 // Run this algorithm again
268 echo_line{stream, std::move(s)}();
272 std::cerr << "Error: " << ec.message() << "\n";
278 // Create the operation and run it
279 echo_line{stream, std::make_shared<std::string>()}();
284 //[code_core_3_timeouts_2f
286 /** Request an HTTP resource from a TLS host and return it as a string, with a timeout.
288 This example uses fibers (stackful coroutines) and its own I/O context.
291 https_get (std::string const& host, std::string const& target, error_code& ec)
293 // It is the responsibility of the algorithm to clear the error first.
296 // We use our own I/O context, to make this function blocking.
299 // This context is used to hold client and server certificates.
300 // We do not perform certificate verification in this example.
302 net::ssl::context ctx(net::ssl::context::tlsv12);
304 // This string will hold the body of the HTTP response, if any.
307 // Note that Networking TS does not come with spawn. This function
308 // launches a "fiber" which is a coroutine that has its own separately
311 boost::asio::spawn(ioc,
312 [&](boost::asio::yield_context yield)
314 // We use the Beast ssl_stream wrapped around a beast tcp_stream.
315 ssl_stream<tcp_stream> stream(ioc, ctx);
317 // The resolver will be used to look up the IP addresses for the host name
318 net::ip::tcp::resolver resolver(ioc);
320 // First, look up the name. Networking has its own timeout for this.
321 // The `yield` object is a CompletionToken which specializes the
322 // `net::async_result` customization point to make the fiber work.
324 // This call will appear to "block" until the operation completes.
325 // It isn't really blocking. Instead, the fiber implementation saves
326 // the call stack and suspends the function until the asynchronous
327 // operation is complete. Then it restores the call stack, and resumes
328 // the function to the statement following the async_resolve. This
329 // allows an asynchronous algorithm to be expressed synchronously.
331 auto const endpoints = resolver.async_resolve(host, "https", {}, yield[ec]);
335 // The function `get_lowest_layer` retrieves the "bottom most" object
336 // in the stack of stream layers. In this case it will be the tcp_stream.
337 // This timeout will apply to all subsequent operations collectively.
338 // That is to say, they must all complete within the same 30 second
341 get_lowest_layer(stream).expires_after(std::chrono::seconds(30));
343 // `tcp_stream` range connect algorithms are member functions, unlike net::
344 get_lowest_layer(stream).async_connect(endpoints, yield[ec]);
348 // Perform the TLS handshake
349 stream.async_handshake(net::ssl::stream_base::client, yield[ec]);
353 // Send an HTTP GET request for the target
355 http::request<http::empty_body> req;
356 req.method(http::verb::get);
359 req.set(http::field::host, host);
360 req.set(http::field::user_agent, "Beast");
361 http::async_write(stream, req, yield[ec]);
366 // Now read the response
368 http::response<http::string_body> res;
369 http::async_read(stream, buffer, res, yield[ec]);
373 // Try to perform the TLS shutdown handshake
374 stream.async_shutdown(yield[ec]);
376 // `net::ssl::error::stream_truncated`, also known as an SSL "short read",
377 // indicates the peer closed the connection without performing the
378 // required closing handshake (for example, Google does this to
379 // improve performance). Generally this can be a security issue,
380 // but if your communication protocol is self-terminated (as
381 // it is with both HTTP and WebSocket) then you may simply
382 // ignore the lack of close_notify:
384 // https://github.com/boostorg/beast/issues/38
386 // https://security.stackexchange.com/questions/91435/how-to-handle-a-malicious-ssl-tls-shutdown
388 // When a short read would cut off the end of an HTTP message,
389 // Beast returns the error beast::http::error::partial_message.
390 // Therefore, if we see a short read here, it has occurred
391 // after the message has been completed, so it is safe to ignore it.
393 if(ec == net::ssl::error::stream_truncated)
398 // Set the string to return to the caller
399 result = std::move(res.body());
402 // `run` will dispatch completion handlers, and block until there is
403 // no more "work" remaining. When this call returns, the operations
404 // are complete and we can give the caller the result.
412 //[code_core_3_timeouts_3f
416 std::size_t value_ = 0;
418 // The size of the exponential window, in seconds.
419 // This should be a power of two.
421 static std::size_t constexpr Window = 4;
424 /** Returns the number of elapsed seconds since the given time, and adjusts the time.
426 This function returns the number of elapsed seconds since the
427 specified time point, rounding down. It also moves the specified
428 time point forward by the number of elapsed seconds.
430 @param since The time point from which to calculate elapsed time.
431 The function will modify the value, by adding the number of elapsed
434 @return The number of elapsed seconds.
436 template<class Clock, class Duration>
439 get_elapsed(std::chrono::time_point<Clock, Duration>& since) noexcept
441 auto const elapsed = std::chrono::duration_cast<
442 std::chrono::seconds>(Clock::now() - since);
447 /// Returns the current value, after adding the given sample.
449 update(std::size_t sample, std::chrono::seconds elapsed) noexcept
451 // Apply exponential decay.
453 // This formula is fast (no division or multiplication) but inaccurate.
454 // It overshoots by `n*(1-a)/(1-a^n), where a=(window-1)/window`.
455 // Could be good enough for a rough approximation, but if relying
456 // on this for production please perform tests!
458 auto count = elapsed.count();
460 value_ -= (value_ + Window - 1) / Window;
462 return value_ / Window;
464 /// Returns the current value
466 value() const noexcept
468 return value_ / Window;
474 //[code_core_3_timeouts_4f
476 /** A RatePolicy to measure instantaneous throughput.
478 This measures the rate of transfer for reading and writing
479 using a simple exponential decay function.
483 // The clock used to measure elapsed time
484 using clock_type = std::chrono::steady_clock;
486 // This implements an exponential smoothing window function.
487 // The value `Seconds` is the size of the window in seconds.
489 clock_type::time_point when_;
490 std::size_t read_bytes_ = 0;
491 std::size_t write_bytes_ = 0;
493 window write_window_;
495 // Friending this type allows us to mark the
496 // member functions required by RatePolicy as private.
497 friend class rate_policy_access;
499 // Returns the number of bytes available to read currently
500 // Required by RatePolicy
502 available_read_bytes() const noexcept
505 return (std::numeric_limits<std::size_t>::max)();
508 // Returns the number of bytes available to write currently
509 // Required by RatePolicy
511 available_write_bytes() const noexcept
514 return (std::numeric_limits<std::size_t>::max)();
517 // Called every time bytes are read
518 // Required by RatePolicy
520 transfer_read_bytes(std::size_t n) noexcept
522 // Add this to our running total of bytes read
526 // Called every time bytes are written
527 // Required by RatePolicy
529 transfer_write_bytes(std::size_t n) noexcept
531 // Add this to our running total of bytes written
535 // Called approximately once per second
536 // Required by RatePolicy
540 // Calculate elapsed time in seconds, and adjust our time point
541 auto const elapsed = window::get_elapsed(when_);
543 // Skip the update when elapsed==0,
544 // otherwise the measurement will have jitter
545 if(elapsed.count() == 0)
548 // Add our samples and apply exponential decay
549 read_window_.update(read_bytes_, elapsed);
550 write_window_.update(write_bytes_, elapsed);
552 // Reset our counts of bytes transferred
559 : when_(clock_type::now())
563 /// Returns the current rate of reading in bytes per second
565 read_bytes_per_second() const noexcept
567 return read_window_.value();
570 /// Returns the current rate of writing in bytes per second
572 write_bytes_per_second() const noexcept
574 return write_window_.value();
581 core_3_timeouts_snippets2()
583 #include "snippets.ipp"
586 //[code_core_3_timeouts_9
588 // This stream will use our new rate_gauge policy
589 basic_stream<net::ip::tcp, net::executor, rate_gauge> stream(ioc);
593 // Print the current rates
595 stream.rate_policy().read_bytes_per_second() << " bytes/second read\n" <<
596 stream.rate_policy().write_bytes_per_second() << " bytes/second written\n";
603 template class basic_stream<net::ip::tcp, net::executor, rate_gauge>;
605 struct core_3_timeouts_test
606 : public beast::unit_test::suite
612 std::size_t v0 = w.value();
613 std::size_t const N = 100000;
614 for(std::size_t n = 1; n <= 2; ++n)
616 for(std::size_t i = 0;;++i)
618 auto const v = w.update(n * N, std::chrono::seconds(n));
624 "update(" << n*N << ", " << n <<
625 ") converged to " << w.value() <<
626 " in " << i << std::endl;
645 BEAST_EXPECT(&core_3_timeouts_snippets);
646 BEAST_EXPECT(&core_3_timeouts_snippets2);
647 BEAST_EXPECT((&do_async_echo<net::ip::tcp, net::io_context::executor_type>));
648 BEAST_EXPECT(&https_get);
652 BEAST_DEFINE_TESTSUITE(beast,doc,core_3_timeouts);