Imported Upstream version 1.57.0
[platform/upstream/boost.git] / doc / html / boost_asio / example / cpp03 / timeouts / server.cpp
1 //
2 // server.cpp
3 // ~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2014 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10
11 #include <algorithm>
12 #include <cstdlib>
13 #include <deque>
14 #include <iostream>
15 #include <set>
16 #include <boost/bind.hpp>
17 #include <boost/shared_ptr.hpp>
18 #include <boost/enable_shared_from_this.hpp>
19 #include <boost/asio/deadline_timer.hpp>
20 #include <boost/asio/io_service.hpp>
21 #include <boost/asio/ip/tcp.hpp>
22 #include <boost/asio/ip/udp.hpp>
23 #include <boost/asio/read_until.hpp>
24 #include <boost/asio/streambuf.hpp>
25 #include <boost/asio/write.hpp>
26
27 using boost::asio::deadline_timer;
28 using boost::asio::ip::tcp;
29 using boost::asio::ip::udp;
30
31 //----------------------------------------------------------------------
32
33 class subscriber
34 {
35 public:
36   virtual ~subscriber() {}
37   virtual void deliver(const std::string& msg) = 0;
38 };
39
40 typedef boost::shared_ptr<subscriber> subscriber_ptr;
41
42 //----------------------------------------------------------------------
43
44 class channel
45 {
46 public:
47   void join(subscriber_ptr subscriber)
48   {
49     subscribers_.insert(subscriber);
50   }
51
52   void leave(subscriber_ptr subscriber)
53   {
54     subscribers_.erase(subscriber);
55   }
56
57   void deliver(const std::string& msg)
58   {
59     std::for_each(subscribers_.begin(), subscribers_.end(),
60         boost::bind(&subscriber::deliver, _1, boost::ref(msg)));
61   }
62
63 private:
64   std::set<subscriber_ptr> subscribers_;
65 };
66
67 //----------------------------------------------------------------------
68
69 //
70 // This class manages socket timeouts by applying the concept of a deadline.
71 // Some asynchronous operations are given deadlines by which they must complete.
72 // Deadlines are enforced by two "actors" that persist for the lifetime of the
73 // session object, one for input and one for output:
74 //
75 //  +----------------+                     +----------------+
76 //  |                |                     |                |
77 //  | check_deadline |<---+                | check_deadline |<---+
78 //  |                |    | async_wait()   |                |    | async_wait()
79 //  +----------------+    |  on input      +----------------+    |  on output
80 //              |         |  deadline                  |         |  deadline
81 //              +---------+                            +---------+
82 //
83 // If either deadline actor determines that the corresponding deadline has
84 // expired, the socket is closed and any outstanding operations are cancelled.
85 //
86 // The input actor reads messages from the socket, where messages are delimited
87 // by the newline character:
88 //
89 //  +------------+
90 //  |            |
91 //  | start_read |<---+
92 //  |            |    |
93 //  +------------+    |
94 //          |         |
95 //  async_- |    +-------------+
96 //   read_- |    |             |
97 //  until() +--->| handle_read |
98 //               |             |
99 //               +-------------+
100 //
101 // The deadline for receiving a complete message is 30 seconds. If a non-empty
102 // message is received, it is delivered to all subscribers. If a heartbeat (a
103 // message that consists of a single newline character) is received, a heartbeat
104 // is enqueued for the client, provided there are no other messages waiting to
105 // be sent.
106 //
107 // The output actor is responsible for sending messages to the client:
108 //
109 //  +--------------+
110 //  |              |<---------------------+
111 //  | await_output |                      |
112 //  |              |<---+                 |
113 //  +--------------+    |                 |
114 //      |      |        | async_wait()    |
115 //      |      +--------+                 |
116 //      V                                 |
117 //  +-------------+               +--------------+
118 //  |             | async_write() |              |
119 //  | start_write |-------------->| handle_write |
120 //  |             |               |              |
121 //  +-------------+               +--------------+
122 //
123 // The output actor first waits for an output message to be enqueued. It does
124 // this by using a deadline_timer as an asynchronous condition variable. The
125 // deadline_timer will be signalled whenever the output queue is non-empty.
126 //
127 // Once a message is available, it is sent to the client. The deadline for
128 // sending a complete message is 30 seconds. After the message is successfully
129 // sent, the output actor again waits for the output queue to become non-empty.
130 //
131 class tcp_session
132   : public subscriber,
133     public boost::enable_shared_from_this<tcp_session>
134 {
135 public:
136   tcp_session(boost::asio::io_service& io_service, channel& ch)
137     : channel_(ch),
138       socket_(io_service),
139       input_deadline_(io_service),
140       non_empty_output_queue_(io_service),
141       output_deadline_(io_service)
142   {
143     input_deadline_.expires_at(boost::posix_time::pos_infin);
144     output_deadline_.expires_at(boost::posix_time::pos_infin);
145
146     // The non_empty_output_queue_ deadline_timer is set to pos_infin whenever
147     // the output queue is empty. This ensures that the output actor stays
148     // asleep until a message is put into the queue.
149     non_empty_output_queue_.expires_at(boost::posix_time::pos_infin);
150   }
151
152   tcp::socket& socket()
153   {
154     return socket_;
155   }
156
157   // Called by the server object to initiate the four actors.
158   void start()
159   {
160     channel_.join(shared_from_this());
161
162     start_read();
163
164     input_deadline_.async_wait(
165         boost::bind(&tcp_session::check_deadline,
166         shared_from_this(), &input_deadline_));
167
168     await_output();
169
170     output_deadline_.async_wait(
171         boost::bind(&tcp_session::check_deadline,
172         shared_from_this(), &output_deadline_));
173   }
174
175 private:
176   void stop()
177   {
178     channel_.leave(shared_from_this());
179
180     boost::system::error_code ignored_ec;
181     socket_.close(ignored_ec);
182     input_deadline_.cancel();
183     non_empty_output_queue_.cancel();
184     output_deadline_.cancel();
185   }
186
187   bool stopped() const
188   {
189     return !socket_.is_open();
190   }
191
192   void deliver(const std::string& msg)
193   {
194     output_queue_.push_back(msg + "\n");
195
196     // Signal that the output queue contains messages. Modifying the expiry
197     // will wake the output actor, if it is waiting on the timer.
198     non_empty_output_queue_.expires_at(boost::posix_time::neg_infin);
199   }
200
201   void start_read()
202   {
203     // Set a deadline for the read operation.
204     input_deadline_.expires_from_now(boost::posix_time::seconds(30));
205
206     // Start an asynchronous operation to read a newline-delimited message.
207     boost::asio::async_read_until(socket_, input_buffer_, '\n',
208         boost::bind(&tcp_session::handle_read, shared_from_this(), _1));
209   }
210
211   void handle_read(const boost::system::error_code& ec)
212   {
213     if (stopped())
214       return;
215
216     if (!ec)
217     {
218       // Extract the newline-delimited message from the buffer.
219       std::string msg;
220       std::istream is(&input_buffer_);
221       std::getline(is, msg);
222
223       if (!msg.empty())
224       {
225         channel_.deliver(msg);
226       }
227       else
228       {
229         // We received a heartbeat message from the client. If there's nothing
230         // else being sent or ready to be sent, send a heartbeat right back.
231         if (output_queue_.empty())
232         {
233           output_queue_.push_back("\n");
234
235           // Signal that the output queue contains messages. Modifying the
236           // expiry will wake the output actor, if it is waiting on the timer.
237           non_empty_output_queue_.expires_at(boost::posix_time::neg_infin);
238         }
239       }
240
241       start_read();
242     }
243     else
244     {
245       stop();
246     }
247   }
248
249   void await_output()
250   {
251     if (stopped())
252       return;
253
254     if (output_queue_.empty())
255     {
256       // There are no messages that are ready to be sent. The actor goes to
257       // sleep by waiting on the non_empty_output_queue_ timer. When a new
258       // message is added, the timer will be modified and the actor will wake.
259       non_empty_output_queue_.expires_at(boost::posix_time::pos_infin);
260       non_empty_output_queue_.async_wait(
261           boost::bind(&tcp_session::await_output, shared_from_this()));
262     }
263     else
264     {
265       start_write();
266     }
267   }
268
269   void start_write()
270   {
271     // Set a deadline for the write operation.
272     output_deadline_.expires_from_now(boost::posix_time::seconds(30));
273
274     // Start an asynchronous operation to send a message.
275     boost::asio::async_write(socket_,
276         boost::asio::buffer(output_queue_.front()),
277         boost::bind(&tcp_session::handle_write, shared_from_this(), _1));
278   }
279
280   void handle_write(const boost::system::error_code& ec)
281   {
282     if (stopped())
283       return;
284
285     if (!ec)
286     {
287       output_queue_.pop_front();
288
289       await_output();
290     }
291     else
292     {
293       stop();
294     }
295   }
296
297   void check_deadline(deadline_timer* deadline)
298   {
299     if (stopped())
300       return;
301
302     // Check whether the deadline has passed. We compare the deadline against
303     // the current time since a new asynchronous operation may have moved the
304     // deadline before this actor had a chance to run.
305     if (deadline->expires_at() <= deadline_timer::traits_type::now())
306     {
307       // The deadline has passed. Stop the session. The other actors will
308       // terminate as soon as possible.
309       stop();
310     }
311     else
312     {
313       // Put the actor back to sleep.
314       deadline->async_wait(
315           boost::bind(&tcp_session::check_deadline,
316           shared_from_this(), deadline));
317     }
318   }
319
320   channel& channel_;
321   tcp::socket socket_;
322   boost::asio::streambuf input_buffer_;
323   deadline_timer input_deadline_;
324   std::deque<std::string> output_queue_;
325   deadline_timer non_empty_output_queue_;
326   deadline_timer output_deadline_;
327 };
328
329 typedef boost::shared_ptr<tcp_session> tcp_session_ptr;
330
331 //----------------------------------------------------------------------
332
333 class udp_broadcaster
334   : public subscriber
335 {
336 public:
337   udp_broadcaster(boost::asio::io_service& io_service,
338       const udp::endpoint& broadcast_endpoint)
339     : socket_(io_service)
340   {
341     socket_.connect(broadcast_endpoint);
342   }
343
344 private:
345   void deliver(const std::string& msg)
346   {
347     boost::system::error_code ignored_ec;
348     socket_.send(boost::asio::buffer(msg), 0, ignored_ec);
349   }
350
351   udp::socket socket_;
352 };
353
354 //----------------------------------------------------------------------
355
356 class server
357 {
358 public:
359   server(boost::asio::io_service& io_service,
360       const tcp::endpoint& listen_endpoint,
361       const udp::endpoint& broadcast_endpoint)
362     : io_service_(io_service),
363       acceptor_(io_service, listen_endpoint)
364   {
365     subscriber_ptr bc(new udp_broadcaster(io_service_, broadcast_endpoint));
366     channel_.join(bc);
367
368     start_accept();
369   }
370
371   void start_accept()
372   {
373     tcp_session_ptr new_session(new tcp_session(io_service_, channel_));
374
375     acceptor_.async_accept(new_session->socket(),
376         boost::bind(&server::handle_accept, this, new_session, _1));
377   }
378
379   void handle_accept(tcp_session_ptr session,
380       const boost::system::error_code& ec)
381   {
382     if (!ec)
383     {
384       session->start();
385     }
386
387     start_accept();
388   }
389
390 private:
391   boost::asio::io_service& io_service_;
392   tcp::acceptor acceptor_;
393   channel channel_;
394 };
395
396 //----------------------------------------------------------------------
397
398 int main(int argc, char* argv[])
399 {
400   try
401   {
402     using namespace std; // For atoi.
403
404     if (argc != 4)
405     {
406       std::cerr << "Usage: server <listen_port> <bcast_address> <bcast_port>\n";
407       return 1;
408     }
409
410     boost::asio::io_service io_service;
411
412     tcp::endpoint listen_endpoint(tcp::v4(), atoi(argv[1]));
413
414     udp::endpoint broadcast_endpoint(
415         boost::asio::ip::address::from_string(argv[2]), atoi(argv[3]));
416
417     server s(io_service, listen_endpoint, broadcast_endpoint);
418
419     io_service.run();
420   }
421   catch (std::exception& e)
422   {
423     std::cerr << "Exception: " << e.what() << "\n";
424   }
425
426   return 0;
427 }