1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "tools/android/forwarder2/forwarder.h"
7 #include "base/basictypes.h"
9 #include "base/logging.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/message_loop/message_loop_proxy.h"
12 #include "base/posix/eintr_wrapper.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/threading/thread.h"
15 #include "tools/android/forwarder2/pipe_notifier.h"
16 #include "tools/android/forwarder2/socket.h"
18 namespace forwarder2 {
21 // Helper class to buffer reads and writes from one socket to another.
22 // Each implements a small buffer connected two one input socket, and
25 // socket_from_ ---> [BufferedCopier] ---> socket_to_
27 // These objects are used in a pair to handle duplex traffic, as in:
29 // ------> [BufferedCopier_1] --->
31 // socket_1 * * socket_2
33 // <------ [BufferedCopier_2] <----
35 // When a BufferedCopier is in the READING state (see below), it only listens
36 // to events on its input socket, and won't detect when its output socket
37 // disconnects. To work around this, its peer will call its Close() method
40 class BufferedCopier {
43 // READING - Empty buffer and Waiting for input.
44 // WRITING - Data in buffer, and waiting for output.
45 // CLOSING - Like WRITING, but do not try to read after that.
46 // CLOSED - Completely closed.
48 // State transitions are:
50 // T01: READING ---[receive data]---> WRITING
51 // T02: READING ---[error on input socket]---> CLOSED
52 // T03: READING ---[Close() call]---> CLOSED
54 // T04: WRITING ---[write partial data]---> WRITING
55 // T05: WRITING ---[write all data]----> READING
56 // T06: WRITING ---[error on output socket]----> CLOSED
57 // T07: WRITING ---[Close() call]---> CLOSING
59 // T08: CLOSING ---[write partial data]---> CLOSING
60 // T09: CLOSING ---[write all data]----> CLOSED
61 // T10: CLOSING ---[Close() call]---> CLOSING
62 // T11: CLOSING ---[error on output socket] ---> CLOSED
71 // Does NOT own the pointers.
72 BufferedCopier(Socket* socket_from, Socket* socket_to)
73 : socket_from_(socket_from),
74 socket_to_(socket_to),
78 state_(STATE_READING) {}
80 // Sets the 'peer_' field pointing to the other BufferedCopier in a pair.
81 void SetPeer(BufferedCopier* peer) {
86 // Gently asks to close a buffer. Called either by the peer or the forwarder.
90 state_ = STATE_CLOSED; // T03
93 state_ = STATE_CLOSING; // T07
102 // Call this before select(). This updates |read_fds|,
103 // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed.
104 void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
108 DCHECK(bytes_read_ == 0);
109 DCHECK(write_offset_ == 0);
110 fd = socket_from_->fd();
115 FD_SET(fd, read_fds);
120 DCHECK(bytes_read_ > 0);
121 DCHECK(write_offset_ < bytes_read_);
122 fd = socket_to_->fd();
127 FD_SET(fd, write_fds);
133 *max_fd = std::max(*max_fd, fd);
136 // Call this after a select() call to operate over the buffer.
137 void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) {
141 fd = socket_from_->fd();
143 state_ = STATE_CLOSED; // T02
146 if (!FD_ISSET(fd, &read_fds))
149 ret = socket_from_->NonBlockingRead(buffer_, kBufferSize);
156 state_ = STATE_WRITING; // T01
161 fd = socket_to_->fd();
163 ForceClose(); // T06 + T11
166 if (!FD_ISSET(fd, &write_fds))
169 ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_,
170 bytes_read_ - write_offset_);
172 ForceClose(); // T06 + T11
176 write_offset_ += ret;
177 if (write_offset_ < bytes_read_)
182 if (state_ == STATE_CLOSING) {
186 state_ = STATE_READING; // T05
195 // Internal method used to close the buffer and notify the peer, if any.
201 state_ = STATE_CLOSED;
205 Socket* socket_from_;
208 // A big buffer to let the file-over-http bridge work more like real file.
209 static const int kBufferSize = 1024 * 128;
212 BufferedCopier* peer_;
214 char buffer_[kBufferSize];
216 DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
221 Forwarder::Forwarder(scoped_ptr<Socket> socket1,
222 scoped_ptr<Socket> socket2,
223 PipeNotifier* deletion_notifier,
224 const ErrorCallback& error_callback)
225 : self_deleter_helper_(this, error_callback),
226 deletion_notifier_(deletion_notifier),
227 socket1_(socket1.Pass()),
228 socket2_(socket2.Pass()),
229 thread_("ForwarderThread") {
230 DCHECK(deletion_notifier_);
233 Forwarder::~Forwarder() {}
235 void Forwarder::Start() {
237 thread_.message_loop_proxy()->PostTask(
239 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this)));
242 void Forwarder::ThreadHandler() {
246 // Copy from socket1 to socket2
247 BufferedCopier buffer1(socket1_.get(), socket2_.get());
248 // Copy from socket2 to socket1
249 BufferedCopier buffer2(socket2_.get(), socket1_.get());
251 buffer1.SetPeer(&buffer2);
252 buffer2.SetPeer(&buffer1);
259 buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd);
260 buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd);
263 // Both buffers are closed. Exit immediately.
267 const int deletion_fd = deletion_notifier_->receiver_fd();
268 if (deletion_fd >= 0) {
269 FD_SET(deletion_fd, &read_fds);
270 max_fd = std::max(max_fd, deletion_fd);
273 if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <=
275 PLOG(ERROR) << "select";
279 buffer1.ProcessSelect(read_fds, write_fds);
280 buffer2.ProcessSelect(read_fds, write_fds);
282 if (deletion_fd >= 0 && FD_ISSET(deletion_fd, &read_fds)) {
288 // Note that the thread that the destructor will run on could be temporarily
289 // blocked on I/O (e.g. select()) therefore it is safer to close the sockets
290 // now rather than relying on the destructor.
294 self_deleter_helper_.MaybeSelfDeleteSoon();
297 } // namespace forwarder2