- add sources.
[platform/framework/web/crosswalk.git] / src / content / browser / streams / stream.cc
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "content/browser/streams/stream.h"
6
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/message_loop/message_loop_proxy.h"
10 #include "content/browser/streams/stream_handle_impl.h"
11 #include "content/browser/streams/stream_read_observer.h"
12 #include "content/browser/streams/stream_registry.h"
13 #include "content/browser/streams/stream_write_observer.h"
14 #include "net/base/io_buffer.h"
15
16 namespace {
17 // Start throttling the connection at about 1MB.
18 const size_t kDeferSizeThreshold = 40 * 32768;
19 }
20
21 namespace content {
22
23 Stream::Stream(StreamRegistry* registry,
24                StreamWriteObserver* write_observer,
25                const GURL& url)
26     : can_add_data_(true),
27       url_(url),
28       data_length_(0),
29       data_bytes_read_(0),
30       last_total_buffered_bytes_(0),
31       registry_(registry),
32       read_observer_(NULL),
33       write_observer_(write_observer),
34       stream_handle_(NULL),
35       weak_ptr_factory_(this) {
36   CreateByteStream(base::MessageLoopProxy::current(),
37                    base::MessageLoopProxy::current(),
38                    kDeferSizeThreshold,
39                    &writer_,
40                    &reader_);
41
42   // Setup callback for writing.
43   writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable,
44                                        weak_ptr_factory_.GetWeakPtr()));
45   reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable,
46                                        weak_ptr_factory_.GetWeakPtr()));
47
48   registry_->RegisterStream(this);
49 }
50
51 Stream::~Stream() {
52 }
53
54 bool Stream::SetReadObserver(StreamReadObserver* observer) {
55   if (read_observer_)
56     return false;
57   read_observer_ = observer;
58   return true;
59 }
60
61 void Stream::RemoveReadObserver(StreamReadObserver* observer) {
62   DCHECK(observer == read_observer_);
63   read_observer_ = NULL;
64 }
65
66 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
67   DCHECK(observer == write_observer_);
68   write_observer_ = NULL;
69 }
70
71 void Stream::Abort() {
72   // Clear all buffer. It's safe to clear reader_ here since the same thread
73   // is used for both input and output operation.
74   writer_.reset();
75   reader_.reset();
76   ClearBuffer();
77   can_add_data_ = false;
78   registry_->UnregisterStream(url());
79 }
80
81 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
82   if (!writer_.get())
83     return;
84
85   size_t current_buffered_bytes = writer_->GetTotalBufferedBytes();
86   if (!registry_->UpdateMemoryUsage(url(), current_buffered_bytes, size)) {
87     Abort();
88     return;
89   }
90
91   // Now it's guaranteed that this doesn't overflow. This must be done before
92   // Write() since GetTotalBufferedBytes() may return different value after
93   // Write() call, so if we use the new value, information in this instance and
94   // one in |registry_| become inconsistent.
95   last_total_buffered_bytes_ = current_buffered_bytes + size;
96
97   can_add_data_ = writer_->Write(buffer, size);
98 }
99
100 void Stream::AddData(const char* data, size_t size) {
101   if (!writer_.get())
102     return;
103
104   scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
105   memcpy(io_buffer->data(), data, size);
106   AddData(io_buffer, size);
107 }
108
109 void Stream::Finalize() {
110   if (!writer_.get())
111     return;
112
113   writer_->Close(0);
114   writer_.reset();
115
116   // Continue asynchronously.
117   base::MessageLoopProxy::current()->PostTask(
118       FROM_HERE,
119       base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr()));
120 }
121
122 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
123                                         int buf_size,
124                                         int* bytes_read) {
125   DCHECK(buf);
126   DCHECK(bytes_read);
127
128   *bytes_read = 0;
129   if (!data_.get()) {
130     DCHECK(!data_length_);
131     DCHECK(!data_bytes_read_);
132
133     if (!reader_.get())
134       return STREAM_ABORTED;
135
136     ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
137     switch (state) {
138       case ByteStreamReader::STREAM_HAS_DATA:
139         break;
140       case ByteStreamReader::STREAM_COMPLETE:
141         registry_->UnregisterStream(url());
142         return STREAM_COMPLETE;
143       case ByteStreamReader::STREAM_EMPTY:
144         return STREAM_EMPTY;
145     }
146   }
147
148   const size_t remaining_bytes = data_length_ - data_bytes_read_;
149   size_t to_read =
150       static_cast<size_t>(buf_size) < remaining_bytes ?
151       buf_size : remaining_bytes;
152   memcpy(buf->data(), data_->data() + data_bytes_read_, to_read);
153   data_bytes_read_ += to_read;
154   if (data_bytes_read_ >= data_length_)
155     ClearBuffer();
156
157   *bytes_read = to_read;
158   return STREAM_HAS_DATA;
159 }
160
161 scoped_ptr<StreamHandle> Stream::CreateHandle(const GURL& original_url,
162                                               const std::string& mime_type) {
163   CHECK(!stream_handle_);
164   stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(),
165                                         original_url,
166                                         mime_type);
167   return scoped_ptr<StreamHandle>(stream_handle_).Pass();
168 }
169
170 void Stream::CloseHandle() {
171   // Prevent deletion until this function ends.
172   scoped_refptr<Stream> ref(this);
173
174   CHECK(stream_handle_);
175   stream_handle_ = NULL;
176   registry_->UnregisterStream(url());
177   if (write_observer_)
178     write_observer_->OnClose(this);
179 }
180
181 void Stream::OnSpaceAvailable() {
182   can_add_data_ = true;
183   if (write_observer_)
184     write_observer_->OnSpaceAvailable(this);
185 }
186
187 void Stream::OnDataAvailable() {
188   if (read_observer_)
189     read_observer_->OnDataAvailable(this);
190 }
191
192 void Stream::ClearBuffer() {
193   data_ = NULL;
194   data_length_ = 0;
195   data_bytes_read_ = 0;
196 }
197
198 }  // namespace content