1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/browser/streams/stream.h"
8 #include "base/location.h"
9 #include "base/message_loop/message_loop_proxy.h"
10 #include "content/browser/streams/stream_handle_impl.h"
11 #include "content/browser/streams/stream_read_observer.h"
12 #include "content/browser/streams/stream_registry.h"
13 #include "content/browser/streams/stream_write_observer.h"
14 #include "net/base/io_buffer.h"
17 // Start throttling the connection at about 1MB.
18 const size_t kDeferSizeThreshold = 40 * 32768;
23 Stream::Stream(StreamRegistry* registry,
24 StreamWriteObserver* write_observer,
26 : can_add_data_(true),
30 last_total_buffered_bytes_(0),
33 write_observer_(write_observer),
35 weak_ptr_factory_(this) {
36 CreateByteStream(base::MessageLoopProxy::current(),
37 base::MessageLoopProxy::current(),
42 // Setup callback for writing.
43 writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable,
44 weak_ptr_factory_.GetWeakPtr()));
45 reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable,
46 weak_ptr_factory_.GetWeakPtr()));
48 registry_->RegisterStream(this);
54 bool Stream::SetReadObserver(StreamReadObserver* observer) {
57 read_observer_ = observer;
61 void Stream::RemoveReadObserver(StreamReadObserver* observer) {
62 DCHECK(observer == read_observer_);
63 read_observer_ = NULL;
66 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
67 DCHECK(observer == write_observer_);
68 write_observer_ = NULL;
71 void Stream::Abort() {
72 // Clear all buffer. It's safe to clear reader_ here since the same thread
73 // is used for both input and output operation.
77 can_add_data_ = false;
78 registry_->UnregisterStream(url());
81 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
85 size_t current_buffered_bytes = writer_->GetTotalBufferedBytes();
86 if (!registry_->UpdateMemoryUsage(url(), current_buffered_bytes, size)) {
91 // Now it's guaranteed that this doesn't overflow. This must be done before
92 // Write() since GetTotalBufferedBytes() may return different value after
93 // Write() call, so if we use the new value, information in this instance and
94 // one in |registry_| become inconsistent.
95 last_total_buffered_bytes_ = current_buffered_bytes + size;
97 can_add_data_ = writer_->Write(buffer, size);
100 void Stream::AddData(const char* data, size_t size) {
104 scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
105 memcpy(io_buffer->data(), data, size);
106 AddData(io_buffer, size);
109 void Stream::Finalize() {
116 // Continue asynchronously.
117 base::MessageLoopProxy::current()->PostTask(
119 base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr()));
122 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
130 DCHECK(!data_length_);
131 DCHECK(!data_bytes_read_);
134 return STREAM_ABORTED;
136 ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
138 case ByteStreamReader::STREAM_HAS_DATA:
140 case ByteStreamReader::STREAM_COMPLETE:
141 registry_->UnregisterStream(url());
142 return STREAM_COMPLETE;
143 case ByteStreamReader::STREAM_EMPTY:
148 const size_t remaining_bytes = data_length_ - data_bytes_read_;
150 static_cast<size_t>(buf_size) < remaining_bytes ?
151 buf_size : remaining_bytes;
152 memcpy(buf->data(), data_->data() + data_bytes_read_, to_read);
153 data_bytes_read_ += to_read;
154 if (data_bytes_read_ >= data_length_)
157 *bytes_read = to_read;
158 return STREAM_HAS_DATA;
161 scoped_ptr<StreamHandle> Stream::CreateHandle(const GURL& original_url,
162 const std::string& mime_type) {
163 CHECK(!stream_handle_);
164 stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(),
167 return scoped_ptr<StreamHandle>(stream_handle_).Pass();
170 void Stream::CloseHandle() {
171 // Prevent deletion until this function ends.
172 scoped_refptr<Stream> ref(this);
174 CHECK(stream_handle_);
175 stream_handle_ = NULL;
176 registry_->UnregisterStream(url());
178 write_observer_->OnClose(this);
181 void Stream::OnSpaceAvailable() {
182 can_add_data_ = true;
184 write_observer_->OnSpaceAvailable(this);
187 void Stream::OnDataAvailable() {
189 read_observer_->OnDataAvailable(this);
192 void Stream::ClearBuffer() {
195 data_bytes_read_ = 0;
198 } // namespace content