Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / google_apis / gcm / base / socket_stream.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "google_apis/gcm/base/socket_stream.h"
6
7 #include "base/bind.h"
8 #include "base/callback.h"
9 #include "net/base/io_buffer.h"
10 #include "net/socket/stream_socket.h"
11
12 namespace gcm {
13
14 namespace {
15
16 // TODO(zea): consider having dynamically-sized buffers if this becomes too
17 // expensive.
18 const uint32 kDefaultBufferSize = 8*1024;
19
20 }  // namespace
21
22 SocketInputStream::SocketInputStream(net::StreamSocket* socket)
23     : socket_(socket),
24       io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
25       read_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
26                                               kDefaultBufferSize)),
27       next_pos_(0),
28       last_error_(net::OK),
29       weak_ptr_factory_(this) {
30   DCHECK(socket->IsConnected());
31 }
32
33 SocketInputStream::~SocketInputStream() {
34 }
35
36 bool SocketInputStream::Next(const void** data, int* size) {
37   if (GetState() != EMPTY && GetState() != READY) {
38     NOTREACHED() << "Invalid input stream read attempt.";
39     return false;
40   }
41
42   if (GetState() == EMPTY) {
43     DVLOG(1) << "No unread data remaining, ending read.";
44     return false;
45   }
46
47   DCHECK_EQ(GetState(), READY)
48       << " Input stream must have pending data before reading.";
49   DCHECK_LT(next_pos_, read_buffer_->BytesConsumed());
50   *data = io_buffer_->data() + next_pos_;
51   *size = UnreadByteCount();
52   next_pos_ = read_buffer_->BytesConsumed();
53   DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
54   return true;
55 }
56
57 void SocketInputStream::BackUp(int count) {
58   DCHECK(GetState() == READY || GetState() == EMPTY);
59   // TODO(zea): investigating crbug.com/409985
60   CHECK_GT(count, 0);
61   CHECK_LE(count, next_pos_);
62
63   next_pos_ -= count;
64   DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
65            << "Current position now at " << next_pos_
66            << " of " << read_buffer_->BytesConsumed();
67 }
68
69 bool SocketInputStream::Skip(int count) {
70   NOTIMPLEMENTED();
71   return false;
72 }
73
74 int64 SocketInputStream::ByteCount() const {
75   DCHECK_NE(GetState(), CLOSED);
76   DCHECK_NE(GetState(), READING);
77   return next_pos_;
78 }
79
80 int SocketInputStream::UnreadByteCount() const {
81   DCHECK_NE(GetState(), CLOSED);
82   DCHECK_NE(GetState(), READING);
83   return read_buffer_->BytesConsumed() - next_pos_;
84 }
85
86 net::Error SocketInputStream::Refresh(const base::Closure& callback,
87                                       int byte_limit) {
88   DCHECK_NE(GetState(), CLOSED);
89   DCHECK_NE(GetState(), READING);
90   DCHECK_GT(byte_limit, 0);
91
92   if (byte_limit > read_buffer_->BytesRemaining()) {
93     LOG(ERROR) << "Out of buffer space, closing input stream.";
94     CloseStream(net::ERR_FILE_TOO_BIG, base::Closure());
95     return net::OK;
96   }
97
98   if (!socket_->IsConnected()) {
99     LOG(ERROR) << "Socket was disconnected, closing input stream";
100     CloseStream(net::ERR_CONNECTION_CLOSED, base::Closure());
101     return net::OK;
102   }
103
104   DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes.";
105   int result =
106       socket_->Read(read_buffer_.get(),
107                     byte_limit,
108                     base::Bind(&SocketInputStream::RefreshCompletionCallback,
109                                weak_ptr_factory_.GetWeakPtr(),
110                                callback));
111   DVLOG(1) << "Read returned " << result;
112   if (result == net::ERR_IO_PENDING) {
113     last_error_ = net::ERR_IO_PENDING;
114     return net::ERR_IO_PENDING;
115   }
116
117   RefreshCompletionCallback(base::Closure(), result);
118   return net::OK;
119 }
120
121 void SocketInputStream::RebuildBuffer() {
122   DVLOG(1) << "Rebuilding input stream, consumed "
123            << next_pos_ << " bytes.";
124   DCHECK_NE(GetState(), READING);
125   DCHECK_NE(GetState(), CLOSED);
126
127   int unread_data_size = 0;
128   const void* unread_data_ptr = NULL;
129   Next(&unread_data_ptr, &unread_data_size);
130   ResetInternal();
131
132   if (unread_data_ptr != io_buffer_->data()) {
133     DVLOG(1) << "Have " << unread_data_size
134              << " unread bytes remaining, shifting.";
135     // Move any remaining unread data to the start of the buffer;
136     std::memmove(io_buffer_->data(), unread_data_ptr, unread_data_size);
137   } else {
138     DVLOG(1) << "Have " << unread_data_size << " unread bytes remaining.";
139   }
140   read_buffer_->DidConsume(unread_data_size);
141   // TODO(zea): investigating crbug.com/409985
142   CHECK_GE(UnreadByteCount(), 0);
143 }
144
145 net::Error SocketInputStream::last_error() const {
146   return last_error_;
147 }
148
149 SocketInputStream::State SocketInputStream::GetState() const {
150   if (last_error_ < net::ERR_IO_PENDING)
151     return CLOSED;
152
153   if (last_error_ == net::ERR_IO_PENDING)
154     return READING;
155
156   DCHECK_EQ(last_error_, net::OK);
157   if (read_buffer_->BytesConsumed() == next_pos_)
158     return EMPTY;
159
160   return READY;
161 }
162
163 void SocketInputStream::RefreshCompletionCallback(
164     const base::Closure& callback, int result) {
165   // If an error occurred before the completion callback could complete, ignore
166   // the result.
167   if (GetState() == CLOSED)
168     return;
169
170   // Result == 0 implies EOF, which is treated as an error.
171   if (result == 0)
172     result = net::ERR_CONNECTION_CLOSED;
173
174   DCHECK_NE(result, net::ERR_IO_PENDING);
175
176   if (result < net::OK) {
177     DVLOG(1) << "Failed to refresh socket: " << result;
178     CloseStream(static_cast<net::Error>(result), callback);
179     return;
180   }
181
182   DCHECK_GT(result, 0);
183   last_error_ = net::OK;
184   read_buffer_->DidConsume(result);
185   // TODO(zea): investigating crbug.com/409985
186   CHECK_GT(UnreadByteCount(), 0);
187
188   DVLOG(1) << "Refresh complete with " << result << " new bytes. "
189            << "Current position " << next_pos_
190            << " of " << read_buffer_->BytesConsumed() << ".";
191
192   if (!callback.is_null())
193     callback.Run();
194 }
195
196 void SocketInputStream::ResetInternal() {
197   read_buffer_->SetOffset(0);
198   next_pos_ = 0;
199   last_error_ = net::OK;
200   weak_ptr_factory_.InvalidateWeakPtrs();  // Invalidate any callbacks.
201 }
202
203 void SocketInputStream::CloseStream(net::Error error,
204                                     const base::Closure& callback) {
205   DCHECK_LT(error, net::ERR_IO_PENDING);
206   ResetInternal();
207   last_error_ = error;
208   LOG(ERROR) << "Closing stream with result " << error;
209   if (!callback.is_null())
210     callback.Run();
211 }
212
213 SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
214     : socket_(socket),
215       io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
216       write_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
217                                                kDefaultBufferSize)),
218       next_pos_(0),
219       last_error_(net::OK),
220       weak_ptr_factory_(this) {
221   DCHECK(socket->IsConnected());
222 }
223
224 SocketOutputStream::~SocketOutputStream() {
225 }
226
227 bool SocketOutputStream::Next(void** data, int* size) {
228   DCHECK_NE(GetState(), CLOSED);
229   DCHECK_NE(GetState(), FLUSHING);
230   if (next_pos_ == write_buffer_->size())
231     return false;
232
233   *data = write_buffer_->data() + next_pos_;
234   *size = write_buffer_->size() - next_pos_;
235   next_pos_ = write_buffer_->size();
236   return true;
237 }
238
239 void SocketOutputStream::BackUp(int count) {
240   DCHECK_GE(count, 0);
241   if (count > next_pos_)
242     next_pos_ = 0;
243   next_pos_ -= count;
244   DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
245            << next_pos_ << " bytes used.";
246 }
247
248 int64 SocketOutputStream::ByteCount() const {
249   DCHECK_NE(GetState(), CLOSED);
250   DCHECK_NE(GetState(), FLUSHING);
251   return next_pos_;
252 }
253
254 net::Error SocketOutputStream::Flush(const base::Closure& callback) {
255   DCHECK_EQ(GetState(), READY);
256
257   if (!socket_->IsConnected()) {
258     LOG(ERROR) << "Socket was disconnected, closing output stream";
259     last_error_ = net::ERR_CONNECTION_CLOSED;
260     return net::OK;
261   }
262
263   DVLOG(1) << "Flushing " << next_pos_ << " bytes into socket.";
264   int result =
265       socket_->Write(write_buffer_.get(),
266                      next_pos_,
267                      base::Bind(&SocketOutputStream::FlushCompletionCallback,
268                                 weak_ptr_factory_.GetWeakPtr(),
269                                 callback));
270   DVLOG(1) << "Write returned " << result;
271   if (result == net::ERR_IO_PENDING) {
272     last_error_ = net::ERR_IO_PENDING;
273     return net::ERR_IO_PENDING;
274   }
275
276   FlushCompletionCallback(base::Closure(), result);
277   return net::OK;
278 }
279
280 SocketOutputStream::State SocketOutputStream::GetState() const{
281   if (last_error_ < net::ERR_IO_PENDING)
282     return CLOSED;
283
284   if (last_error_ == net::ERR_IO_PENDING)
285     return FLUSHING;
286
287   DCHECK_EQ(last_error_, net::OK);
288   if (next_pos_ == 0)
289     return EMPTY;
290
291   return READY;
292 }
293
294 net::Error SocketOutputStream::last_error() const {
295   return last_error_;
296 }
297
298 void SocketOutputStream::FlushCompletionCallback(
299     const base::Closure& callback, int result) {
300   // If an error occurred before the completion callback could complete, ignore
301   // the result.
302   if (GetState() == CLOSED)
303     return;
304
305   // Result == 0 implies EOF, which is treated as an error.
306   if (result == 0)
307     result = net::ERR_CONNECTION_CLOSED;
308
309   DCHECK_NE(result, net::ERR_IO_PENDING);
310
311   if (result < net::OK) {
312     LOG(ERROR) << "Failed to flush socket.";
313     last_error_ = static_cast<net::Error>(result);
314     if (!callback.is_null())
315       callback.Run();
316     return;
317   }
318
319   DCHECK_GT(result, net::OK);
320   last_error_ = net::OK;
321
322   if (write_buffer_->BytesConsumed() + result < next_pos_) {
323     DVLOG(1) << "Partial flush complete. Retrying.";
324      // Only a partial write was completed. Flush again to finish the write.
325     write_buffer_->DidConsume(result);
326     Flush(callback);
327     return;
328   }
329
330   DVLOG(1) << "Socket flush complete.";
331   write_buffer_->SetOffset(0);
332   next_pos_ = 0;
333   if (!callback.is_null())
334     callback.Run();
335 }
336
337 }  // namespace gcm