- add sources.
[platform/framework/web/crosswalk.git] / src / content / browser / byte_stream.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "content/browser/byte_stream.h"
6
#include <deque>
#include <limits>
#include <set>
#include <utility>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/memory/ref_counted.h"
14 #include "base/sequenced_task_runner.h"
15
16 namespace content {
17 namespace {
18
19 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> >
20 ContentVector;
21
22 class ByteStreamReaderImpl;
23
24 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
25 // cleared in an object destructor and accessed to check for object
26 // existence.  We can't use weak pointers because they're tightly tied to
27 // threads rather than task runners.
28 // TODO(rdsmith): A better solution would be extending weak pointers
29 // to support SequencedTaskRunners.
30 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
31  public:
32   LifetimeFlag() : is_alive(true) { }
33   bool is_alive;
34
35  protected:
36   friend class base::RefCountedThreadSafe<LifetimeFlag>;
37   virtual ~LifetimeFlag() { }
38
39  private:
40   DISALLOW_COPY_AND_ASSIGN(LifetimeFlag);
41 };
42
43 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
44 // SetPeer may happen anywhere; all other operations on each class must
45 // happen in the context of their SequencedTaskRunner.
46 class ByteStreamWriterImpl : public ByteStreamWriter {
47  public:
48   ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
49                        scoped_refptr<LifetimeFlag> lifetime_flag,
50                        size_t buffer_size);
51   virtual ~ByteStreamWriterImpl();
52
53   // Must be called before any operations are performed.
54   void SetPeer(ByteStreamReaderImpl* peer,
55                scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
56                scoped_refptr<LifetimeFlag> peer_lifetime_flag);
57
58   // Overridden from ByteStreamWriter.
59   virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
60                      size_t byte_count) OVERRIDE;
61   virtual void Flush() OVERRIDE;
62   virtual void Close(int status) OVERRIDE;
63   virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE;
64   virtual size_t GetTotalBufferedBytes() const OVERRIDE;
65
66   // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
67   static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
68                            ByteStreamWriterImpl* target,
69                            size_t bytes_consumed);
70
71  private:
72   // Called from UpdateWindow when object existence has been validated.
73   void UpdateWindowInternal(size_t bytes_consumed);
74
75   void PostToPeer(bool complete, int status);
76
77   const size_t total_buffer_size_;
78
79   // All data objects in this class are only valid to access on
80   // this task runner except as otherwise noted.
81   scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
82
83   // True while this object is alive.
84   scoped_refptr<LifetimeFlag> my_lifetime_flag_;
85
86   base::Closure space_available_callback_;
87   ContentVector input_contents_;
88   size_t input_contents_size_;
89
90   // ** Peer information.
91
92   scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
93
94   // How much we've sent to the output that for flow control purposes we
95   // must assume hasn't been read yet.
96   size_t output_size_used_;
97
98   // Only valid to access on peer_task_runner_.
99   scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
100
101   // Only valid to access on peer_task_runner_ if
102   // |*peer_lifetime_flag_ == true|
103   ByteStreamReaderImpl* peer_;
104 };
105
106 class ByteStreamReaderImpl : public ByteStreamReader {
107  public:
108   ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
109                        scoped_refptr<LifetimeFlag> lifetime_flag,
110                        size_t buffer_size);
111   virtual ~ByteStreamReaderImpl();
112
113   // Must be called before any operations are performed.
114   void SetPeer(ByteStreamWriterImpl* peer,
115                scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
116                scoped_refptr<LifetimeFlag> peer_lifetime_flag);
117
118   // Overridden from ByteStreamReader.
119   virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
120                            size_t* length) OVERRIDE;
121   virtual int GetStatus() const OVERRIDE;
122   virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE;
123
124   // PostTask target from |ByteStreamWriterImpl::Write| and
125   // |ByteStreamWriterImpl::Close|.
126   // Receive data from our peer.
127   // static because it may be called after the object it is targeting
128   // has been destroyed.  It may not access |*target|
129   // if |*object_lifetime_flag| is false.
130   static void TransferData(
131       scoped_refptr<LifetimeFlag> object_lifetime_flag,
132       ByteStreamReaderImpl* target,
133       scoped_ptr<ContentVector> transfer_buffer,
134       size_t transfer_buffer_bytes,
135       bool source_complete,
136       int status);
137
138  private:
139   // Called from TransferData once object existence has been validated.
140   void TransferDataInternal(
141       scoped_ptr<ContentVector> transfer_buffer,
142       size_t transfer_buffer_bytes,
143       bool source_complete,
144       int status);
145
146   void MaybeUpdateInput();
147
148   const size_t total_buffer_size_;
149
150   scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
151
152   // True while this object is alive.
153   scoped_refptr<LifetimeFlag> my_lifetime_flag_;
154
155   ContentVector available_contents_;
156
157   bool received_status_;
158   int status_;
159
160   base::Closure data_available_callback_;
161
162   // Time of last point at which data in stream transitioned from full
163   // to non-full.  Nulled when a callback is sent.
164   base::Time last_non_full_time_;
165
166   // ** Peer information
167
168   scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
169
170   // How much has been removed from this class that we haven't told
171   // the input about yet.
172   size_t unreported_consumed_bytes_;
173
174   // Only valid to access on peer_task_runner_.
175   scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
176
177   // Only valid to access on peer_task_runner_ if
178   // |*peer_lifetime_flag_ == true|
179   ByteStreamWriterImpl* peer_;
180 };
181
182 ByteStreamWriterImpl::ByteStreamWriterImpl(
183     scoped_refptr<base::SequencedTaskRunner> task_runner,
184     scoped_refptr<LifetimeFlag> lifetime_flag,
185     size_t buffer_size)
186     : total_buffer_size_(buffer_size),
187       my_task_runner_(task_runner),
188       my_lifetime_flag_(lifetime_flag),
189       input_contents_size_(0),
190       output_size_used_(0),
191       peer_(NULL) {
192   DCHECK(my_lifetime_flag_.get());
193   my_lifetime_flag_->is_alive = true;
194 }
195
196 ByteStreamWriterImpl::~ByteStreamWriterImpl() {
197   my_lifetime_flag_->is_alive = false;
198 }
199
200 void ByteStreamWriterImpl::SetPeer(
201     ByteStreamReaderImpl* peer,
202     scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
203     scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
204   peer_ = peer;
205   peer_task_runner_ = peer_task_runner;
206   peer_lifetime_flag_ = peer_lifetime_flag;
207 }
208
209 bool ByteStreamWriterImpl::Write(
210     scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
211   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
212
213   // Check overflow.
214   //
215   // TODO(tyoshino): Discuss with content/browser/download developer and if
216   // they're fine with, set smaller limit and make it configurable.
217   size_t space_limit = std::numeric_limits<size_t>::max() -
218       GetTotalBufferedBytes();
219   if (byte_count > space_limit) {
220     // TODO(tyoshino): Tell the user that Write() failed.
221     // Ignore input.
222     return false;
223   }
224
225   input_contents_.push_back(std::make_pair(buffer, byte_count));
226   input_contents_size_ += byte_count;
227
228   // Arbitrarily, we buffer to a third of the total size before sending.
229   if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
230     PostToPeer(false, 0);
231
232   return GetTotalBufferedBytes() <= total_buffer_size_;
233 }
234
235 void ByteStreamWriterImpl::Flush() {
236   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
237   if (input_contents_size_ > 0)
238     PostToPeer(false, 0);
239 }
240
241 void ByteStreamWriterImpl::Close(int status) {
242   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
243   PostToPeer(true, status);
244 }
245
246 void ByteStreamWriterImpl::RegisterCallback(
247     const base::Closure& source_callback) {
248   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
249   space_available_callback_ = source_callback;
250 }
251
252 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const {
253   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
254   // This sum doesn't overflow since Write() fails if this sum is going to
255   // overflow.
256   return input_contents_size_ + output_size_used_;
257 }
258
259 // static
260 void ByteStreamWriterImpl::UpdateWindow(
261     scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target,
262     size_t bytes_consumed) {
263   // If the target object isn't alive anymore, we do nothing.
264   if (!lifetime_flag->is_alive) return;
265
266   target->UpdateWindowInternal(bytes_consumed);
267 }
268
269 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) {
270   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
271
272   bool was_above_limit = GetTotalBufferedBytes() > total_buffer_size_;
273
274   DCHECK_GE(output_size_used_, bytes_consumed);
275   output_size_used_ -= bytes_consumed;
276
277   // Callback if we were above the limit and we're now <= to it.
278   bool no_longer_above_limit = GetTotalBufferedBytes() <= total_buffer_size_;
279
280   if (no_longer_above_limit && was_above_limit &&
281       !space_available_callback_.is_null())
282     space_available_callback_.Run();
283 }
284
285 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) {
286   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
287   // Valid contexts in which to call.
288   DCHECK(complete || 0 != input_contents_size_);
289
290   scoped_ptr<ContentVector> transfer_buffer;
291   size_t buffer_size = 0;
292   if (0 != input_contents_size_) {
293     transfer_buffer.reset(new ContentVector);
294     transfer_buffer->swap(input_contents_);
295     buffer_size = input_contents_size_;
296     output_size_used_ += input_contents_size_;
297     input_contents_size_ = 0;
298   }
299   peer_task_runner_->PostTask(
300       FROM_HERE, base::Bind(
301           &ByteStreamReaderImpl::TransferData,
302           peer_lifetime_flag_,
303           peer_,
304           base::Passed(&transfer_buffer),
305           buffer_size,
306           complete,
307           status));
308 }
309
310 ByteStreamReaderImpl::ByteStreamReaderImpl(
311     scoped_refptr<base::SequencedTaskRunner> task_runner,
312     scoped_refptr<LifetimeFlag> lifetime_flag,
313     size_t buffer_size)
314     : total_buffer_size_(buffer_size),
315       my_task_runner_(task_runner),
316       my_lifetime_flag_(lifetime_flag),
317       received_status_(false),
318       status_(0),
319       unreported_consumed_bytes_(0),
320       peer_(NULL) {
321   DCHECK(my_lifetime_flag_.get());
322   my_lifetime_flag_->is_alive = true;
323 }
324
325 ByteStreamReaderImpl::~ByteStreamReaderImpl() {
326   my_lifetime_flag_->is_alive = false;
327 }
328
329 void ByteStreamReaderImpl::SetPeer(
330     ByteStreamWriterImpl* peer,
331     scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
332     scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
333   peer_ = peer;
334   peer_task_runner_ = peer_task_runner;
335   peer_lifetime_flag_ = peer_lifetime_flag;
336 }
337
338 ByteStreamReaderImpl::StreamState
339 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data,
340                            size_t* length) {
341   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
342
343   if (available_contents_.size()) {
344     *data = available_contents_.front().first;
345     *length = available_contents_.front().second;
346     available_contents_.pop_front();
347     unreported_consumed_bytes_ += *length;
348
349     MaybeUpdateInput();
350     return STREAM_HAS_DATA;
351   }
352   if (received_status_) {
353     return STREAM_COMPLETE;
354   }
355   return STREAM_EMPTY;
356 }
357
358 int ByteStreamReaderImpl::GetStatus() const {
359   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
360   DCHECK(received_status_);
361   return status_;
362 }
363
364 void ByteStreamReaderImpl::RegisterCallback(
365     const base::Closure& sink_callback) {
366   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
367
368   data_available_callback_ = sink_callback;
369 }
370
371 // static
372 void ByteStreamReaderImpl::TransferData(
373     scoped_refptr<LifetimeFlag> object_lifetime_flag,
374     ByteStreamReaderImpl* target,
375     scoped_ptr<ContentVector> transfer_buffer,
376     size_t buffer_size,
377     bool source_complete,
378     int status) {
379   // If our target is no longer alive, do nothing.
380   if (!object_lifetime_flag->is_alive) return;
381
382   target->TransferDataInternal(
383       transfer_buffer.Pass(), buffer_size, source_complete, status);
384 }
385
386 void ByteStreamReaderImpl::TransferDataInternal(
387     scoped_ptr<ContentVector> transfer_buffer,
388     size_t buffer_size,
389     bool source_complete,
390     int status) {
391   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
392
393   bool was_empty = available_contents_.empty();
394
395   if (transfer_buffer) {
396     available_contents_.insert(available_contents_.end(),
397                                transfer_buffer->begin(),
398                                transfer_buffer->end());
399   }
400
401   if (source_complete) {
402     received_status_ = true;
403     status_ = status;
404   }
405
406   // Callback on transition from empty to non-empty, or
407   // source complete.
408   if (((was_empty && !available_contents_.empty()) ||
409        source_complete) &&
410       !data_available_callback_.is_null())
411     data_available_callback_.Run();
412 }
413
414 // Decide whether or not to send the input a window update.
415 // Currently we do that whenever we've got unreported consumption
416 // greater than 1/3 of total size.
417 void ByteStreamReaderImpl::MaybeUpdateInput() {
418   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
419
420   if (unreported_consumed_bytes_ <=
421       total_buffer_size_ / kFractionReadBeforeWindowUpdate)
422     return;
423
424   peer_task_runner_->PostTask(
425       FROM_HERE, base::Bind(
426           &ByteStreamWriterImpl::UpdateWindow,
427           peer_lifetime_flag_,
428           peer_,
429           unreported_consumed_bytes_));
430   unreported_consumed_bytes_ = 0;
431 }
432
433 }  // namespace
434
435 const int ByteStreamWriter::kFractionBufferBeforeSending = 3;
436 const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3;
437
438 ByteStreamReader::~ByteStreamReader() { }
439
440 ByteStreamWriter::~ByteStreamWriter() { }
441
442 void CreateByteStream(
443     scoped_refptr<base::SequencedTaskRunner> input_task_runner,
444     scoped_refptr<base::SequencedTaskRunner> output_task_runner,
445     size_t buffer_size,
446     scoped_ptr<ByteStreamWriter>* input,
447     scoped_ptr<ByteStreamReader>* output) {
448   scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
449   scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
450
451   ByteStreamWriterImpl* in = new ByteStreamWriterImpl(
452       input_task_runner, input_flag, buffer_size);
453   ByteStreamReaderImpl* out = new ByteStreamReaderImpl(
454       output_task_runner, output_flag, buffer_size);
455
456   in->SetPeer(out, output_task_runner, output_flag);
457   out->SetPeer(in, input_task_runner, input_flag);
458   input->reset(in);
459   output->reset(out);
460 }
461
462 }  // namespace content