5 // Copyright (c) 2003-2014 Christopher M. Kohlhoff (chris at kohlhoff dot com)
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
16 #include <boost/bind.hpp>
17 #include <boost/shared_ptr.hpp>
18 #include <boost/enable_shared_from_this.hpp>
19 #include <boost/asio/deadline_timer.hpp>
20 #include <boost/asio/io_service.hpp>
21 #include <boost/asio/ip/tcp.hpp>
22 #include <boost/asio/ip/udp.hpp>
23 #include <boost/asio/read_until.hpp>
24 #include <boost/asio/streambuf.hpp>
25 #include <boost/asio/write.hpp>
27 using boost::asio::deadline_timer;
28 using boost::asio::ip::tcp;
29 using boost::asio::ip::udp;
31 //----------------------------------------------------------------------
36 virtual ~subscriber() {}
37 virtual void deliver(const std::string& msg) = 0;
40 typedef boost::shared_ptr<subscriber> subscriber_ptr;
42 //----------------------------------------------------------------------
47 void join(subscriber_ptr subscriber)
49 subscribers_.insert(subscriber);
52 void leave(subscriber_ptr subscriber)
54 subscribers_.erase(subscriber);
57 void deliver(const std::string& msg)
59 std::for_each(subscribers_.begin(), subscribers_.end(),
60 boost::bind(&subscriber::deliver, _1, boost::ref(msg)));
64 std::set<subscriber_ptr> subscribers_;
67 //----------------------------------------------------------------------
70 // This class manages socket timeouts by applying the concept of a deadline.
71 // Some asynchronous operations are given deadlines by which they must complete.
72 // Deadlines are enforced by two "actors" that persist for the lifetime of the
73 // session object, one for input and one for output:
75 // +----------------+ +----------------+
77 // | check_deadline |<---+ | check_deadline |<---+
78 // | | | async_wait() | | | async_wait()
79 // +----------------+ | on input +----------------+ | on output
80 // | | deadline | | deadline
81 // +---------+ +---------+
83 // If either deadline actor determines that the corresponding deadline has
84 // expired, the socket is closed and any outstanding operations are cancelled.
86 // The input actor reads messages from the socket, where messages are delimited
87 // by the newline character:
91 // | start_read |<---+
95 // async_- | +-------------+
97 // until() +--->| handle_read |
101 // The deadline for receiving a complete message is 30 seconds. If a non-empty
102 // message is received, it is delivered to all subscribers. If a heartbeat (a
103 // message that consists of a single newline character) is received, a heartbeat
104 // is enqueued for the client, provided there are no other messages waiting to
107 // The output actor is responsible for sending messages to the client:
110 // | |<---------------------+
111 // | await_output | |
113 // +--------------+ | |
114 // | | | async_wait() |
117 // +-------------+ +--------------+
118 // | | async_write() | |
119 // | start_write |-------------->| handle_write |
121 // +-------------+ +--------------+
123 // The output actor first waits for an output message to be enqueued. It does
124 // this by using a deadline_timer as an asynchronous condition variable. The
125 // deadline_timer will be signalled whenever the output queue is non-empty.
127 // Once a message is available, it is sent to the client. The deadline for
128 // sending a complete message is 30 seconds. After the message is successfully
129 // sent, the output actor again waits for the output queue to become non-empty.
133 public boost::enable_shared_from_this<tcp_session>
136 tcp_session(boost::asio::io_service& io_service, channel& ch)
139 input_deadline_(io_service),
140 non_empty_output_queue_(io_service),
141 output_deadline_(io_service)
143 input_deadline_.expires_at(boost::posix_time::pos_infin);
144 output_deadline_.expires_at(boost::posix_time::pos_infin);
146 // The non_empty_output_queue_ deadline_timer is set to pos_infin whenever
147 // the output queue is empty. This ensures that the output actor stays
148 // asleep until a message is put into the queue.
149 non_empty_output_queue_.expires_at(boost::posix_time::pos_infin);
152 tcp::socket& socket()
157 // Called by the server object to initiate the four actors.
160 channel_.join(shared_from_this());
164 input_deadline_.async_wait(
165 boost::bind(&tcp_session::check_deadline,
166 shared_from_this(), &input_deadline_));
170 output_deadline_.async_wait(
171 boost::bind(&tcp_session::check_deadline,
172 shared_from_this(), &output_deadline_));
178 channel_.leave(shared_from_this());
180 boost::system::error_code ignored_ec;
181 socket_.close(ignored_ec);
182 input_deadline_.cancel();
183 non_empty_output_queue_.cancel();
184 output_deadline_.cancel();
189 return !socket_.is_open();
192 void deliver(const std::string& msg)
194 output_queue_.push_back(msg + "\n");
196 // Signal that the output queue contains messages. Modifying the expiry
197 // will wake the output actor, if it is waiting on the timer.
198 non_empty_output_queue_.expires_at(boost::posix_time::neg_infin);
203 // Set a deadline for the read operation.
204 input_deadline_.expires_from_now(boost::posix_time::seconds(30));
206 // Start an asynchronous operation to read a newline-delimited message.
207 boost::asio::async_read_until(socket_, input_buffer_, '\n',
208 boost::bind(&tcp_session::handle_read, shared_from_this(), _1));
211 void handle_read(const boost::system::error_code& ec)
218 // Extract the newline-delimited message from the buffer.
220 std::istream is(&input_buffer_);
221 std::getline(is, msg);
225 channel_.deliver(msg);
229 // We received a heartbeat message from the client. If there's nothing
230 // else being sent or ready to be sent, send a heartbeat right back.
231 if (output_queue_.empty())
233 output_queue_.push_back("\n");
235 // Signal that the output queue contains messages. Modifying the
236 // expiry will wake the output actor, if it is waiting on the timer.
237 non_empty_output_queue_.expires_at(boost::posix_time::neg_infin);
254 if (output_queue_.empty())
256 // There are no messages that are ready to be sent. The actor goes to
257 // sleep by waiting on the non_empty_output_queue_ timer. When a new
258 // message is added, the timer will be modified and the actor will wake.
259 non_empty_output_queue_.expires_at(boost::posix_time::pos_infin);
260 non_empty_output_queue_.async_wait(
261 boost::bind(&tcp_session::await_output, shared_from_this()));
271 // Set a deadline for the write operation.
272 output_deadline_.expires_from_now(boost::posix_time::seconds(30));
274 // Start an asynchronous operation to send a message.
275 boost::asio::async_write(socket_,
276 boost::asio::buffer(output_queue_.front()),
277 boost::bind(&tcp_session::handle_write, shared_from_this(), _1));
280 void handle_write(const boost::system::error_code& ec)
287 output_queue_.pop_front();
297 void check_deadline(deadline_timer* deadline)
302 // Check whether the deadline has passed. We compare the deadline against
303 // the current time since a new asynchronous operation may have moved the
304 // deadline before this actor had a chance to run.
305 if (deadline->expires_at() <= deadline_timer::traits_type::now())
307 // The deadline has passed. Stop the session. The other actors will
308 // terminate as soon as possible.
313 // Put the actor back to sleep.
314 deadline->async_wait(
315 boost::bind(&tcp_session::check_deadline,
316 shared_from_this(), deadline));
322 boost::asio::streambuf input_buffer_;
323 deadline_timer input_deadline_;
324 std::deque<std::string> output_queue_;
325 deadline_timer non_empty_output_queue_;
326 deadline_timer output_deadline_;
329 typedef boost::shared_ptr<tcp_session> tcp_session_ptr;
331 //----------------------------------------------------------------------
333 class udp_broadcaster
337 udp_broadcaster(boost::asio::io_service& io_service,
338 const udp::endpoint& broadcast_endpoint)
339 : socket_(io_service)
341 socket_.connect(broadcast_endpoint);
345 void deliver(const std::string& msg)
347 boost::system::error_code ignored_ec;
348 socket_.send(boost::asio::buffer(msg), 0, ignored_ec);
354 //----------------------------------------------------------------------
359 server(boost::asio::io_service& io_service,
360 const tcp::endpoint& listen_endpoint,
361 const udp::endpoint& broadcast_endpoint)
362 : io_service_(io_service),
363 acceptor_(io_service, listen_endpoint)
365 subscriber_ptr bc(new udp_broadcaster(io_service_, broadcast_endpoint));
373 tcp_session_ptr new_session(new tcp_session(io_service_, channel_));
375 acceptor_.async_accept(new_session->socket(),
376 boost::bind(&server::handle_accept, this, new_session, _1));
379 void handle_accept(tcp_session_ptr session,
380 const boost::system::error_code& ec)
391 boost::asio::io_service& io_service_;
392 tcp::acceptor acceptor_;
396 //----------------------------------------------------------------------
398 int main(int argc, char* argv[])
402 using namespace std; // For atoi.
406 std::cerr << "Usage: server <listen_port> <bcast_address> <bcast_port>\n";
410 boost::asio::io_service io_service;
412 tcp::endpoint listen_endpoint(tcp::v4(), atoi(argv[1]));
414 udp::endpoint broadcast_endpoint(
415 boost::asio::ip::address::from_string(argv[2]), atoi(argv[3]));
417 server s(io_service, listen_endpoint, broadcast_endpoint);
421 catch (std::exception& e)
423 std::cerr << "Exception: " << e.what() << "\n";