1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "google_apis/gcm/base/socket_stream.h"
8 #include "base/callback.h"
9 #include "net/base/io_buffer.h"
10 #include "net/socket/stream_socket.h"
16 // TODO(zea): consider having dynamically-sized buffers if this becomes too
// NOTE(review): the TODO sentence above is cut off in this listing; its
// continuation line is not visible.
//
// Fixed size (8*1024 = 8192) passed to every net::IOBuffer allocation visible
// in this file, and to the output stream's net::DrainableIOBuffer wrapper.
18 const uint32 kDefaultBufferSize = 8*1024;
// Constructs an input stream for |socket|.
// Allocates a net::IOBuffer of kDefaultBufferSize (|io_buffer_|), wraps it in
// a net::DrainableIOBuffer (|read_buffer_|) and binds the weak pointer factory
// to this object. DCHECKs that |socket| is already connected.
// NOTE(review): some member initializers (e.g. for |socket_|, |next_pos_| and
// |last_error_|) are not visible in this listing — confirm against the full
// file.
22 SocketInputStream::SocketInputStream(net::StreamSocket* socket)
24 io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
25 read_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
29 weak_ptr_factory_(this) {
30 DCHECK(socket->IsConnected());
// Destructor. No body statements are visible in this listing.
33 SocketInputStream::~SocketInputStream() {
// Hands every currently unread byte to the caller.
//
// On the visible success path:
//   |*data| points at offset |next_pos_| inside |io_buffer_|,
//   |*size| is UnreadByteCount(),
//   |next_pos_| is advanced to read_buffer_->BytesConsumed(), so all unread
//   bytes now count as handed out.
//
// The stream must be in the EMPTY or READY state; any other state hits
// NOTREACHED(). In the EMPTY state only a log line is emitted before the read
// ends.
// NOTE(review): the return statements are not visible in this listing;
// presumably false on the two early exits and true at the end — confirm.
36 bool SocketInputStream::Next(const void** data, int* size) {
37 if (GetState() != EMPTY && GetState() != READY) {
38 NOTREACHED() << "Invalid input stream read attempt.";
42 if (GetState() == EMPTY) {
43 DVLOG(1) << "No unread data remaining, ending read.";
47 DCHECK_EQ(GetState(), READY)
48 << " Input stream must have pending data before reading.";
// There must be at least one byte between the read position and the end of
// the data recorded as consumed in |read_buffer_|.
49 DCHECK_LT(next_pos_, read_buffer_->BytesConsumed());
50 *data = io_buffer_->data() + next_pos_;
51 *size = UnreadByteCount();
52 next_pos_ = read_buffer_->BytesConsumed();
53 DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
// Per the log message below, moves the read position back by |count| bytes so
// they can be handed out again.
// DCHECKs that the stream is READY or EMPTY, and CHECKs that |count| does not
// exceed |next_pos_| (a hard check kept while crbug.com/409985 is
// investigated).
// NOTE(review): the statement that actually adjusts |next_pos_| is not visible
// in this listing.
57 void SocketInputStream::BackUp(int count) {
58 DCHECK(GetState() == READY || GetState() == EMPTY);
59 // TODO(zea): investigating crbug.com/409985
61 CHECK_LE(count, next_pos_);
64 DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
65 << "Current position now at " << next_pos_
66 << " of " << read_buffer_->BytesConsumed();
// Skip |count| bytes of input.
// NOTE(review): the purpose is inferred from the name only; the body is not
// visible in this listing, so whether skipping is actually supported must be
// confirmed against the full file.
69 bool SocketInputStream::Skip(int count) {
// Byte-count accessor for the input stream.
// DCHECKs that the stream is neither CLOSED nor READING.
// NOTE(review): the returned expression is not visible in this listing.
74 int64 SocketInputStream::ByteCount() const {
75 DCHECK_NE(GetState(), CLOSED);
76 DCHECK_NE(GetState(), READING);
// Returns how many bytes have been recorded as consumed in |read_buffer_| but
// lie beyond the current read position:
// read_buffer_->BytesConsumed() - next_pos_.
// DCHECKs that the stream is neither CLOSED nor READING.
80 int SocketInputStream::UnreadByteCount() const {
81 DCHECK_NE(GetState(), CLOSED);
82 DCHECK_NE(GetState(), READING);
83 return read_buffer_->BytesConsumed() - next_pos_;
// Starts a socket read that pulls more data into |read_buffer_|.
//
// Preconditions (DCHECK): the stream is neither CLOSED nor READING, and
// |byte_limit| is positive.
//
// Visible outcomes:
//  - |byte_limit| exceeds read_buffer_->BytesRemaining(): the stream is closed
//    with net::ERR_FILE_TOO_BIG.
//  - The socket is no longer connected: the stream is closed with
//    net::ERR_CONNECTION_CLOSED.
//  - |result| is net::ERR_IO_PENDING: that value is stored in |last_error_|
//    and returned. RefreshCompletionCallback, bound through a weak pointer,
//    is the completion callback handed to Read().
//  - Any other |result|: it is passed to RefreshCompletionCallback right away
//    with a null closure.
//
// NOTE(review): the second parameter's declaration, part of the Read()
// argument list, the declaration of |result| and the remaining return
// statements are not visible in this listing.
86 net::Error SocketInputStream::Refresh(const base::Closure& callback,
88 DCHECK_NE(GetState(), CLOSED);
89 DCHECK_NE(GetState(), READING);
90 DCHECK_GT(byte_limit, 0);
92 if (byte_limit > read_buffer_->BytesRemaining()) {
93 LOG(ERROR) << "Out of buffer space, closing input stream.";
// A null closure is passed here, so CloseStream has no callback to invoke.
94 CloseStream(net::ERR_FILE_TOO_BIG, base::Closure());
98 if (!socket_->IsConnected()) {
99 LOG(ERROR) << "Socket was disconnected, closing input stream";
100 CloseStream(net::ERR_CONNECTION_CLOSED, base::Closure());
104 DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes.";
106 socket_->Read(read_buffer_.get(),
108 base::Bind(&SocketInputStream::RefreshCompletionCallback,
109 weak_ptr_factory_.GetWeakPtr(),
111 DVLOG(1) << "Read returned " << result;
112 if (result == net::ERR_IO_PENDING) {
// Recording ERR_IO_PENDING in |last_error_| is what GetState() tests for in
// its second branch.
113 last_error_ = net::ERR_IO_PENDING;
114 return net::ERR_IO_PENDING;
// Not pending: process the result inline, with no closure to run.
117 RefreshCompletionCallback(base::Closure(), result);
// Compacts the input buffer: fetches whatever is still unread via Next(),
// moves it to the start of |io_buffer_| when it is not already there, and
// marks that many bytes as consumed in |read_buffer_|.
// DCHECKs that the stream is neither READING nor CLOSED.
// NOTE(review): several lines between the visible statements are missing from
// this listing (after the Next() call and around the two log branches).
// Presumably the buffer positions are reset before the unread bytes are
// re-marked as consumed — confirm against the full file.
121 void SocketInputStream::RebuildBuffer() {
122 DVLOG(1) << "Rebuilding input stream, consumed "
123 << next_pos_ << " bytes.";
124 DCHECK_NE(GetState(), READING);
125 DCHECK_NE(GetState(), CLOSED);
// Defaults for the case where Next() does not fill in its outputs; in the
// EMPTY state Next() only logs before ending the read.
127 int unread_data_size = 0;
128 const void* unread_data_ptr = NULL;
129 Next(&unread_data_ptr, &unread_data_size);
132 if (unread_data_ptr != io_buffer_->data()) {
133 DVLOG(1) << "Have " << unread_data_size
134 << " unread bytes remaining, shifting.";
135 // Move any remaining unread data to the start of the buffer;
// memmove rather than memcpy: source and destination are both inside
// |io_buffer_| and may overlap.
136 std::memmove(io_buffer_->data(), unread_data_ptr, unread_data_size);
138 DVLOG(1) << "Have " << unread_data_size << " unread bytes remaining.";
140 read_buffer_->DidConsume(unread_data_size);
141 // TODO(zea): investigating crbug.com/409985
142 CHECK_GE(UnreadByteCount(), 0);
// Accessor for the input stream's most recent error.
// NOTE(review): the body is not visible in this listing; presumably it returns
// |last_error_| — confirm.
145 net::Error SocketInputStream::last_error() const {
// Derives the stream state from |last_error_| and the buffer positions.
// Visible tests, in order:
//   1. last_error_ <  net::ERR_IO_PENDING
//   2. last_error_ == net::ERR_IO_PENDING
//   3. otherwise last_error_ must be net::OK (DCHECK), and the last test
//      checks whether every byte recorded as consumed has been handed out
//      (read_buffer_->BytesConsumed() == next_pos_).
// NOTE(review): the State value returned by each branch is not visible in
// this listing; presumably CLOSED, READING, EMPTY and otherwise READY —
// confirm.
149 SocketInputStream::State SocketInputStream::GetState() const {
150 if (last_error_ < net::ERR_IO_PENDING)
153 if (last_error_ == net::ERR_IO_PENDING)
156 DCHECK_EQ(last_error_, net::OK);
157 if (read_buffer_->BytesConsumed() == next_pos_)
// Processes the result of a socket read started by Refresh().
//
// |callback| is the closure associated with the refresh (may be null).
// |result| is the read result passed in by Refresh() or by the bound socket
// callback.
//
// Visible behavior:
//  - If the stream is already CLOSED, the result is ignored (per the existing
//    comment).
//  - A negative |result| closes the stream with that error and forwards
//    |callback| to CloseStream.
//  - Otherwise |last_error_| becomes net::OK, |result| bytes are marked as
//    consumed in |read_buffer_|, and a CHECK requires that unread data now
//    exists.
//
// NOTE(review): the condition guarding the EOF rewrite to
// net::ERR_CONNECTION_CLOSED, the early returns, and the statement executed
// when |callback| is non-null are not visible in this listing.
163 void SocketInputStream::RefreshCompletionCallback(
164 const base::Closure& callback, int result) {
165 // If an error occurred before the completion callback could complete, ignore
167 if (GetState() == CLOSED)
170 // Result == 0 implies EOF, which is treated as an error.
172 result = net::ERR_CONNECTION_CLOSED;
174 DCHECK_NE(result, net::ERR_IO_PENDING);
176 if (result < net::OK) {
177 DVLOG(1) << "Failed to refresh socket: " << result;
178 CloseStream(static_cast<net::Error>(result), callback);
182 DCHECK_GT(result, 0);
// Leaving the pending state: with |last_error_| at OK, GetState() falls
// through to its buffer-position test.
183 last_error_ = net::OK;
184 read_buffer_->DidConsume(result);
185 // TODO(zea): investigating crbug.com/409985
186 CHECK_GT(UnreadByteCount(), 0);
188 DVLOG(1) << "Refresh complete with " << result << " new bytes. "
189 << "Current position " << next_pos_
190 << " of " << read_buffer_->BytesConsumed() << ".";
192 if (!callback.is_null())
// Returns the stream to a clean state: rewinds |read_buffer_| to offset 0,
// sets |last_error_| to net::OK and invalidates outstanding weak pointers so
// that callbacks bound through them no longer reach this object.
// NOTE(review): one statement between the SetOffset() call and the
// |last_error_| assignment is not visible in this listing (presumably it
// resets |next_pos_|) — confirm.
196 void SocketInputStream::ResetInternal() {
197 read_buffer_->SetOffset(0);
199 last_error_ = net::OK;
200 weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks.
// Closes the input stream with |error|.
// |error| must be strictly below net::ERR_IO_PENDING (DCHECK), i.e. neither
// net::OK nor the pending marker. The error is logged.
// NOTE(review): the statements between the DCHECK and the log line, and the
// action taken when |callback| is non-null, are not visible in this listing.
// Presumably the stream is reset, |error| is stored in |last_error_| and
// |callback| is run — confirm.
203 void SocketInputStream::CloseStream(net::Error error,
204 const base::Closure& callback) {
205 DCHECK_LT(error, net::ERR_IO_PENDING);
208 LOG(ERROR) << "Closing stream with result " << error;
209 if (!callback.is_null())
// Constructs an output stream for |socket|.
// Allocates a net::IOBuffer of kDefaultBufferSize (|io_buffer_|), wraps it in
// a net::DrainableIOBuffer of the same size (|write_buffer_|), starts with
// |last_error_| at net::OK and binds the weak pointer factory to this object.
// DCHECKs that |socket| is already connected.
// NOTE(review): some member initializers (e.g. for |socket_| and |next_pos_|)
// are not visible in this listing.
213 SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
215 io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
216 write_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
217 kDefaultBufferSize)),
219 last_error_(net::OK),
220 weak_ptr_factory_(this) {
221 DCHECK(socket->IsConnected());
// Destructor. No body statements are visible in this listing.
224 SocketOutputStream::~SocketOutputStream() {
// Hands the caller the still-unused tail of |write_buffer_| to write into.
//
// On the visible success path:
//   |*data| points at offset |next_pos_| in |write_buffer_|,
//   |*size| is the space from there to write_buffer_->size(),
//   |next_pos_| is advanced to write_buffer_->size(), so the whole tail now
//   counts as used.
//
// DCHECKs that the stream is neither CLOSED nor FLUSHING.
// NOTE(review): the statement taken when the buffer is already full
// (next_pos_ == write_buffer_->size()) and the final return are not visible in
// this listing; presumably false and true respectively — confirm.
227 bool SocketOutputStream::Next(void** data, int* size) {
228 DCHECK_NE(GetState(), CLOSED);
229 DCHECK_NE(GetState(), FLUSHING);
230 if (next_pos_ == write_buffer_->size())
233 *data = write_buffer_->data() + next_pos_;
234 *size = write_buffer_->size() - next_pos_;
235 next_pos_ = write_buffer_->size();
// Per the log message below, gives back |count| bytes of the region handed
// out by Next(), leaving |next_pos_| as the number of bytes used.
// A |count| larger than |next_pos_| is special-cased by the visible guard.
// NOTE(review): the guard's body and the statement that adjusts |next_pos_|
// are not visible in this listing.
239 void SocketOutputStream::BackUp(int count) {
241 if (count > next_pos_)
244 DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
245 << next_pos_ << " bytes used.";
// Byte-count accessor for the output stream.
// DCHECKs that the stream is neither CLOSED nor FLUSHING.
// NOTE(review): the returned expression is not visible in this listing.
248 int64 SocketOutputStream::ByteCount() const {
249 DCHECK_NE(GetState(), CLOSED);
250 DCHECK_NE(GetState(), FLUSHING);
// Writes the buffered bytes (|next_pos_| of them, per the log line) to the
// socket.
//
// Precondition (DCHECK): the stream is READY.
//
// Visible outcomes:
//  - The socket is no longer connected: an error is logged and |last_error_|
//    is set to net::ERR_CONNECTION_CLOSED.
//  - |result| is net::ERR_IO_PENDING: that value is stored in |last_error_|
//    and returned. FlushCompletionCallback, bound through a weak pointer, is
//    the completion callback handed to Write().
//  - Any other |result|: it is passed to FlushCompletionCallback right away
//    with a null closure.
//
// NOTE(review): part of the Write() argument list, the declaration of
// |result| and the remaining return statements are not visible in this
// listing.
254 net::Error SocketOutputStream::Flush(const base::Closure& callback) {
255 DCHECK_EQ(GetState(), READY);
257 if (!socket_->IsConnected()) {
258 LOG(ERROR) << "Socket was disconnected, closing output stream";
259 last_error_ = net::ERR_CONNECTION_CLOSED;
263 DVLOG(1) << "Flushing " << next_pos_ << " bytes into socket.";
265 socket_->Write(write_buffer_.get(),
267 base::Bind(&SocketOutputStream::FlushCompletionCallback,
268 weak_ptr_factory_.GetWeakPtr(),
270 DVLOG(1) << "Write returned " << result;
271 if (result == net::ERR_IO_PENDING) {
272 last_error_ = net::ERR_IO_PENDING;
273 return net::ERR_IO_PENDING;
// Not pending: process the result inline, with no closure to run.
276 FlushCompletionCallback(base::Closure(), result);
// Derives the output stream state from |last_error_|.
// Visible tests, in order:
//   1. last_error_ <  net::ERR_IO_PENDING
//   2. last_error_ == net::ERR_IO_PENDING
//   3. otherwise last_error_ must be net::OK (DCHECK).
// NOTE(review): the State values returned, and any further tests after the
// DCHECK, are not visible in this listing; presumably CLOSED for (1) and
// FLUSHING for (2) — confirm.
280 SocketOutputStream::State SocketOutputStream::GetState() const{
281 if (last_error_ < net::ERR_IO_PENDING)
284 if (last_error_ == net::ERR_IO_PENDING)
287 DCHECK_EQ(last_error_, net::OK);
// Accessor for the output stream's most recent error.
// NOTE(review): the body is not visible in this listing; presumably it returns
// |last_error_| — confirm.
294 net::Error SocketOutputStream::last_error() const {
// Processes the result of a socket write started by Flush().
//
// |callback| is the closure associated with the flush (may be null).
// |result| is the write result passed in by Flush() or by the bound socket
// callback.
//
// Visible behavior:
//  - If the stream is already CLOSED, the result is ignored (per the existing
//    comment).
//  - A negative |result| is logged and stored in |last_error_|.
//  - Otherwise |last_error_| becomes net::OK. If the bytes already consumed
//    plus |result| are still fewer than |next_pos_|, only part of the data
//    went out: |result| bytes are marked as consumed and, per the existing
//    comment, the flush is retried. If everything was written, |write_buffer_|
//    is rewound to offset 0.
//
// NOTE(review): the condition guarding the EOF rewrite to
// net::ERR_CONNECTION_CLOSED, the early returns, the retry call itself, and
// the statements executed when |callback| is non-null are not visible in this
// listing; the function also continues past the last visible line.
298 void SocketOutputStream::FlushCompletionCallback(
299 const base::Closure& callback, int result) {
300 // If an error occurred before the completion callback could complete, ignore
302 if (GetState() == CLOSED)
305 // Result == 0 implies EOF, which is treated as an error.
307 result = net::ERR_CONNECTION_CLOSED;
309 DCHECK_NE(result, net::ERR_IO_PENDING);
311 if (result < net::OK) {
312 LOG(ERROR) << "Failed to flush socket.";
313 last_error_ = static_cast<net::Error>(result);
314 if (!callback.is_null())
319 DCHECK_GT(result, net::OK);
320 last_error_ = net::OK;
// BytesConsumed() is what earlier partial writes already sent; adding this
// write's |result| and comparing with |next_pos_| detects a short write.
322 if (write_buffer_->BytesConsumed() + result < next_pos_) {
323 DVLOG(1) << "Partial flush complete. Retrying.";
324 // Only a partial write was completed. Flush again to finish the write.
325 write_buffer_->DidConsume(result);
330 DVLOG(1) << "Socket flush complete.";
331 write_buffer_->SetOffset(0);
333 if (!callback.is_null())