Upstream version 11.40.271.0
[platform/framework/web/crosswalk.git] / src / mojo / services / network / tcp_connected_socket_impl.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/services/network/tcp_connected_socket_impl.h"
6
7 #include "base/message_loop/message_loop.h"
8 #include "mojo/services/network/net_adapters.h"
9 #include "net/base/net_errors.h"
10
11 namespace mojo {
12
13 TCPConnectedSocketImpl::TCPConnectedSocketImpl(
14     scoped_ptr<net::TCPSocket> socket,
15     ScopedDataPipeConsumerHandle send_stream,
16     ScopedDataPipeProducerHandle receive_stream)
17     : socket_(socket.Pass()),
18       send_stream_(send_stream.Pass()),
19       receive_stream_(receive_stream.Pass()),
20       weak_ptr_factory_(this) {
21   // Queue up async communication.
22   ReceiveMore();
23   SendMore();
24 }
25
26 TCPConnectedSocketImpl::~TCPConnectedSocketImpl() {
27 }
28
29 void TCPConnectedSocketImpl::ReceiveMore() {
30   DCHECK(!pending_receive_.get());
31
32   uint32_t num_bytes;
33   MojoResult result = NetToMojoPendingBuffer::BeginWrite(
34       &receive_stream_, &pending_receive_, &num_bytes);
35   if (result == MOJO_RESULT_SHOULD_WAIT) {
36     // The pipe is full. We need to wait for it to have more space.
37     receive_handle_watcher_.Start(
38         receive_stream_.get(),
39         MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE,
40         base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady,
41                    weak_ptr_factory_.GetWeakPtr()));
42     return;
43   } else if (result != MOJO_RESULT_OK) {
44     // The receive stream is in a bad state.
45     // TODO(darin): How should this be communicated to our client?
46     socket_->Close();
47     return;
48   }
49
50   // Mojo is ready for the receive.
51   CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes);
52   scoped_refptr<net::IOBuffer> buf(
53       new NetToMojoIOBuffer(pending_receive_.get()));
54   int read_result = socket_->Read(
55       buf.get(), static_cast<int>(num_bytes),
56       base::Bind(&TCPConnectedSocketImpl::DidReceive, base::Unretained(this),
57                  false));
58   if (read_result == net::ERR_IO_PENDING) {
59     // Pending I/O, wait for result in DidReceive().
60   } else if (read_result >= 0) {
61     // Synchronous data ready.
62     DidReceive(true, read_result);
63   } else {
64     // Some kind of error.
65     // TODO(brettw) notify caller of error.
66     socket_->Close();
67   }
68 }
69
70 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) {
71   // TODO(darin): Handle a bad |result| value.
72   ReceiveMore();
73 }
74
75 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously,
76                                         int result) {
77   if (result < 0) {
78     // Error.
79     pending_receive_ = NULL;  // Closes the pipe (owned by the pending write).
80     // TODO(brettw) notify the caller of an error?
81     socket_->Close();
82     return;
83   }
84
85   receive_stream_ = pending_receive_->Complete(result);
86   pending_receive_ = NULL;
87
88   // Schedule more reading.
89   if (completed_synchronously) {
90     // Don't recursively call ReceiveMore if this is a sync read.
91     base::MessageLoop::current()->PostTask(
92         FROM_HERE,
93         base::Bind(&TCPConnectedSocketImpl::ReceiveMore,
94                    weak_ptr_factory_.GetWeakPtr()));
95   } else {
96     ReceiveMore();
97   }
98 }
99
100 void TCPConnectedSocketImpl::SendMore() {
101   uint32_t num_bytes = 0;
102   MojoResult result = MojoToNetPendingBuffer::BeginRead(
103       &send_stream_, &pending_send_, &num_bytes);
104   if (result == MOJO_RESULT_SHOULD_WAIT) {
105     // Data not ready, wait for it.
106     send_handle_watcher_.Start(
107         send_stream_.get(),
108         MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE,
109         base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady,
110                    weak_ptr_factory_.GetWeakPtr()));
111     return;
112   } else if (result != MOJO_RESULT_OK) {
113     // TODO(brettw) notify caller of error.
114     socket_->Close();
115     return;
116   }
117
118   // Got a buffer from Mojo, give it to the socket. Note that the sockets may
119   // do partial writes.
120   scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get()));
121   int write_result = socket_->Write(
122       buf.get(), static_cast<int>(num_bytes),
123       base::Bind(&TCPConnectedSocketImpl::DidSend, base::Unretained(this),
124                  false));
125   if (write_result == net::ERR_IO_PENDING) {
126     // Pending I/O, wait for result in DidSend().
127   } else if (write_result >= 0) {
128     // Synchronous data consumed.
129     DidSend(true, write_result);
130   }
131 }
132
133 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) {
134   // TODO(brettw): Handle a bad |result| value.
135   SendMore();
136 }
137
138 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously,
139                                      int result) {
140   if (result < 0) {
141     // TODO(brettw) report error.
142     pending_send_ = NULL;
143     socket_->Close();
144     return;
145   }
146
147   // Take back ownership of the stream and free the IOBuffer.
148   send_stream_ = pending_send_->Complete(result);
149   pending_send_ = NULL;
150
151   // Schedule more writing.
152   if (completed_synchronously) {
153     // Don't recursively call SendMore if this is a sync read.
154     base::MessageLoop::current()->PostTask(
155         FROM_HERE,
156         base::Bind(&TCPConnectedSocketImpl::SendMore,
157                    weak_ptr_factory_.GetWeakPtr()));
158   } else {
159     SendMore();
160   }
161 }
162
163 }  // namespace mojo