d747dbe6558ae98623f5ea92b840c01b11bf8b6f
[platform/framework/web/crosswalk.git] / src / tools / android / forwarder2 / forwarder.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "tools/android/forwarder2/forwarder.h"
6
7 #include "base/basictypes.h"
8 #include "base/bind.h"
9 #include "base/logging.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/message_loop/message_loop_proxy.h"
12 #include "base/posix/eintr_wrapper.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/threading/thread.h"
15 #include "tools/android/forwarder2/pipe_notifier.h"
16 #include "tools/android/forwarder2/socket.h"
17
18 namespace forwarder2 {
19 namespace {
20
21 // Helper class to buffer reads and writes from one socket to another.
22 // Each implements a small buffer connected two one input socket, and
23 // one output socket.
24 //
25 //   socket_from_ ---> [BufferedCopier] ---> socket_to_
26 //
27 // These objects are used in a pair to handle duplex traffic, as in:
28 //
29 //                    ------> [BufferedCopier_1] --->
30 //                  /                                \
31 //      socket_1   *                                  * socket_2
32 //                  \                                /
33 //                   <------ [BufferedCopier_2] <----
34 //
35 // When a BufferedCopier is in the READING state (see below), it only listens
36 // to events on its input socket, and won't detect when its output socket
37 // disconnects. To work around this, its peer will call its Close() method
38 // when that happens.
39
40 class BufferedCopier {
41  public:
42   // Possible states:
43   //    READING - Empty buffer and Waiting for input.
44   //    WRITING - Data in buffer, and waiting for output.
45   //    CLOSING - Like WRITING, but do not try to read after that.
46   //    CLOSED  - Completely closed.
47   //
48   // State transitions are:
49   //
50   //   T01:  READING ---[receive data]---> WRITING
51   //   T02:  READING ---[error on input socket]---> CLOSED
52   //   T03:  READING ---[Close() call]---> CLOSED
53   //
54   //   T04:  WRITING ---[write partial data]---> WRITING
55   //   T05:  WRITING ---[write all data]----> READING
56   //   T06:  WRITING ---[error on output socket]----> CLOSED
57   //   T07:  WRITING ---[Close() call]---> CLOSING
58   //
59   //   T08:  CLOSING ---[write partial data]---> CLOSING
60   //   T09:  CLOSING ---[write all data]----> CLOSED
61   //   T10:  CLOSING ---[Close() call]---> CLOSING
62   //   T11:  CLOSING ---[error on output socket] ---> CLOSED
63   //
64   enum State {
65     STATE_READING = 0,
66     STATE_WRITING = 1,
67     STATE_CLOSING = 2,
68     STATE_CLOSED = 3,
69   };
70
71   // Does NOT own the pointers.
72   BufferedCopier(Socket* socket_from, Socket* socket_to)
73       : socket_from_(socket_from),
74         socket_to_(socket_to),
75         bytes_read_(0),
76         write_offset_(0),
77         peer_(NULL),
78         state_(STATE_READING) {}
79
80   // Sets the 'peer_' field pointing to the other BufferedCopier in a pair.
81   void SetPeer(BufferedCopier* peer) {
82     DCHECK(!peer_);
83     peer_ = peer;
84   }
85
86   // Gently asks to close a buffer. Called either by the peer or the forwarder.
87   void Close() {
88     switch (state_) {
89       case STATE_READING:
90         state_ = STATE_CLOSED;  // T03
91         break;
92       case STATE_WRITING:
93         state_ = STATE_CLOSING;  // T07
94         break;
95       case STATE_CLOSING:
96         break;  // T10
97       case STATE_CLOSED:
98         ;
99     }
100   }
101
102   // Call this before select(). This updates |read_fds|,
103   // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed.
104   void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
105     int fd;
106     switch (state_) {
107       case STATE_READING:
108         DCHECK(bytes_read_ == 0);
109         DCHECK(write_offset_ == 0);
110         fd = socket_from_->fd();
111         if (fd < 0) {
112           ForceClose();  // T02
113           return;
114         }
115         FD_SET(fd, read_fds);
116         break;
117
118       case STATE_WRITING:
119       case STATE_CLOSING:
120         DCHECK(bytes_read_ > 0);
121         DCHECK(write_offset_ < bytes_read_);
122         fd = socket_to_->fd();
123         if (fd < 0) {
124           ForceClose();  // T06
125           return;
126         }
127         FD_SET(fd, write_fds);
128         break;
129
130       case STATE_CLOSED:
131         return;
132     }
133     *max_fd = std::max(*max_fd, fd);
134   }
135
136   // Call this after a select() call to operate over the buffer.
137   void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) {
138     int fd, ret;
139     switch (state_) {
140       case STATE_READING:
141         fd = socket_from_->fd();
142         if (fd < 0) {
143           state_ = STATE_CLOSED;  // T02
144           return;
145         }
146         if (!FD_ISSET(fd, &read_fds))
147           return;
148
149         ret = socket_from_->NonBlockingRead(buffer_, kBufferSize);
150         if (ret <= 0) {
151           ForceClose();  // T02
152           return;
153         }
154         bytes_read_ = ret;
155         write_offset_ = 0;
156         state_ = STATE_WRITING;  // T01
157         break;
158
159       case STATE_WRITING:
160       case STATE_CLOSING:
161         fd = socket_to_->fd();
162         if (fd < 0) {
163           ForceClose();  // T06 + T11
164           return;
165         }
166         if (!FD_ISSET(fd, &write_fds))
167           return;
168
169         ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_,
170                                            bytes_read_ - write_offset_);
171         if (ret <= 0) {
172           ForceClose();  // T06 + T11
173           return;
174         }
175
176         write_offset_ += ret;
177         if (write_offset_ < bytes_read_)
178           return;  // T08 + T04
179
180         write_offset_ = 0;
181         bytes_read_ = 0;
182         if (state_ == STATE_CLOSING) {
183           ForceClose();  // T09
184           return;
185         }
186         state_ = STATE_READING;  // T05
187         break;
188
189       case STATE_CLOSED:
190         ;
191     }
192   }
193
194  private:
195   // Internal method used to close the buffer and notify the peer, if any.
196   void ForceClose() {
197     if (peer_) {
198       peer_->Close();
199       peer_ = NULL;
200     }
201     state_ = STATE_CLOSED;
202   }
203
204   // Not owned.
205   Socket* socket_from_;
206   Socket* socket_to_;
207
208   // A big buffer to let the file-over-http bridge work more like real file.
209   static const int kBufferSize = 1024 * 128;
210   int bytes_read_;
211   int write_offset_;
212   BufferedCopier* peer_;
213   State state_;
214   char buffer_[kBufferSize];
215
216   DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
217 };
218
219 }  // namespace
220
221 Forwarder::Forwarder(scoped_ptr<Socket> socket1,
222                      scoped_ptr<Socket> socket2,
223                      PipeNotifier* deletion_notifier,
224                      const ErrorCallback& error_callback)
225     : self_deleter_helper_(this, error_callback),
226       deletion_notifier_(deletion_notifier),
227       socket1_(socket1.Pass()),
228       socket2_(socket2.Pass()),
229       thread_("ForwarderThread") {
230   DCHECK(deletion_notifier_);
231 }
232
233 Forwarder::~Forwarder() {}
234
235 void Forwarder::Start() {
236   thread_.Start();
237   thread_.message_loop_proxy()->PostTask(
238       FROM_HERE,
239       base::Bind(&Forwarder::ThreadHandler, base::Unretained(this)));
240 }
241
242 void Forwarder::ThreadHandler() {
243   fd_set read_fds;
244   fd_set write_fds;
245
246   // Copy from socket1 to socket2
247   BufferedCopier buffer1(socket1_.get(), socket2_.get());
248   // Copy from socket2 to socket1
249   BufferedCopier buffer2(socket2_.get(), socket1_.get());
250
251   buffer1.SetPeer(&buffer2);
252   buffer2.SetPeer(&buffer1);
253
254   for (;;) {
255     FD_ZERO(&read_fds);
256     FD_ZERO(&write_fds);
257
258     int max_fd = -1;
259     buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd);
260     buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd);
261
262     if (max_fd < 0) {
263       // Both buffers are closed. Exit immediately.
264       break;
265     }
266
267     const int deletion_fd = deletion_notifier_->receiver_fd();
268     if (deletion_fd >= 0) {
269       FD_SET(deletion_fd, &read_fds);
270       max_fd = std::max(max_fd, deletion_fd);
271     }
272
273     if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <=
274         0) {
275       PLOG(ERROR) << "select";
276       break;
277     }
278
279     buffer1.ProcessSelect(read_fds, write_fds);
280     buffer2.ProcessSelect(read_fds, write_fds);
281
282     if (deletion_fd >= 0 && FD_ISSET(deletion_fd, &read_fds)) {
283       buffer1.Close();
284       buffer2.Close();
285     }
286   }
287
288   // Note that the thread that the destructor will run on could be temporarily
289   // blocked on I/O (e.g. select()) therefore it is safer to close the sockets
290   // now rather than relying on the destructor.
291   socket1_.reset();
292   socket2_.reset();
293
294   self_deleter_helper_.MaybeSelfDeleteSoon();
295 }
296
297 }  // namespace forwarder2