Upstream version 11.40.277.0
[platform/framework/web/crosswalk.git] / src / content / browser / byte_stream.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "content/browser/byte_stream.h"
6
#include <deque>
#include <limits>
#include <set>
#include <utility>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/memory/ref_counted.h"
14 #include "base/sequenced_task_runner.h"
15
16 namespace content {
17 namespace {
18
19 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> >
20 ContentVector;
21
22 class ByteStreamReaderImpl;
23
24 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
25 // cleared in an object destructor and accessed to check for object
26 // existence.  We can't use weak pointers because they're tightly tied to
27 // threads rather than task runners.
28 // TODO(rdsmith): A better solution would be extending weak pointers
29 // to support SequencedTaskRunners.
30 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
31  public:
32   LifetimeFlag() : is_alive(true) { }
33   bool is_alive;
34
35  protected:
36   friend class base::RefCountedThreadSafe<LifetimeFlag>;
37   virtual ~LifetimeFlag() { }
38
39  private:
40   DISALLOW_COPY_AND_ASSIGN(LifetimeFlag);
41 };
42
43 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
44 // SetPeer may happen anywhere; all other operations on each class must
45 // happen in the context of their SequencedTaskRunner.
46 class ByteStreamWriterImpl : public ByteStreamWriter {
47  public:
48   ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
49                        scoped_refptr<LifetimeFlag> lifetime_flag,
50                        size_t buffer_size);
51   ~ByteStreamWriterImpl() override;
52
53   // Must be called before any operations are performed.
54   void SetPeer(ByteStreamReaderImpl* peer,
55                scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
56                scoped_refptr<LifetimeFlag> peer_lifetime_flag);
57
58   // Overridden from ByteStreamWriter.
59   bool Write(scoped_refptr<net::IOBuffer> buffer, size_t byte_count) override;
60   void Flush() override;
61   void Close(int status) override;
62   void RegisterCallback(const base::Closure& source_callback) override;
63   size_t GetTotalBufferedBytes() const override;
64
65   // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
66   static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
67                            ByteStreamWriterImpl* target,
68                            size_t bytes_consumed);
69
70  private:
71   // Called from UpdateWindow when object existence has been validated.
72   void UpdateWindowInternal(size_t bytes_consumed);
73
74   void PostToPeer(bool complete, int status);
75
76   const size_t total_buffer_size_;
77
78   // All data objects in this class are only valid to access on
79   // this task runner except as otherwise noted.
80   scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
81
82   // True while this object is alive.
83   scoped_refptr<LifetimeFlag> my_lifetime_flag_;
84
85   base::Closure space_available_callback_;
86   ContentVector input_contents_;
87   size_t input_contents_size_;
88
89   // ** Peer information.
90
91   scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
92
93   // How much we've sent to the output that for flow control purposes we
94   // must assume hasn't been read yet.
95   size_t output_size_used_;
96
97   // Only valid to access on peer_task_runner_.
98   scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
99
100   // Only valid to access on peer_task_runner_ if
101   // |*peer_lifetime_flag_ == true|
102   ByteStreamReaderImpl* peer_;
103 };
104
105 class ByteStreamReaderImpl : public ByteStreamReader {
106  public:
107   ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
108                        scoped_refptr<LifetimeFlag> lifetime_flag,
109                        size_t buffer_size);
110   ~ByteStreamReaderImpl() override;
111
112   // Must be called before any operations are performed.
113   void SetPeer(ByteStreamWriterImpl* peer,
114                scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
115                scoped_refptr<LifetimeFlag> peer_lifetime_flag);
116
117   // Overridden from ByteStreamReader.
118   StreamState Read(scoped_refptr<net::IOBuffer>* data, size_t* length) override;
119   int GetStatus() const override;
120   void RegisterCallback(const base::Closure& sink_callback) override;
121
122   // PostTask target from |ByteStreamWriterImpl::Write| and
123   // |ByteStreamWriterImpl::Close|.
124   // Receive data from our peer.
125   // static because it may be called after the object it is targeting
126   // has been destroyed.  It may not access |*target|
127   // if |*object_lifetime_flag| is false.
128   static void TransferData(
129       scoped_refptr<LifetimeFlag> object_lifetime_flag,
130       ByteStreamReaderImpl* target,
131       scoped_ptr<ContentVector> transfer_buffer,
132       size_t transfer_buffer_bytes,
133       bool source_complete,
134       int status);
135
136  private:
137   // Called from TransferData once object existence has been validated.
138   void TransferDataInternal(
139       scoped_ptr<ContentVector> transfer_buffer,
140       size_t transfer_buffer_bytes,
141       bool source_complete,
142       int status);
143
144   void MaybeUpdateInput();
145
146   const size_t total_buffer_size_;
147
148   scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
149
150   // True while this object is alive.
151   scoped_refptr<LifetimeFlag> my_lifetime_flag_;
152
153   ContentVector available_contents_;
154
155   bool received_status_;
156   int status_;
157
158   base::Closure data_available_callback_;
159
160   // Time of last point at which data in stream transitioned from full
161   // to non-full.  Nulled when a callback is sent.
162   base::Time last_non_full_time_;
163
164   // ** Peer information
165
166   scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
167
168   // How much has been removed from this class that we haven't told
169   // the input about yet.
170   size_t unreported_consumed_bytes_;
171
172   // Only valid to access on peer_task_runner_.
173   scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
174
175   // Only valid to access on peer_task_runner_ if
176   // |*peer_lifetime_flag_ == true|
177   ByteStreamWriterImpl* peer_;
178 };
179
180 ByteStreamWriterImpl::ByteStreamWriterImpl(
181     scoped_refptr<base::SequencedTaskRunner> task_runner,
182     scoped_refptr<LifetimeFlag> lifetime_flag,
183     size_t buffer_size)
184     : total_buffer_size_(buffer_size),
185       my_task_runner_(task_runner),
186       my_lifetime_flag_(lifetime_flag),
187       input_contents_size_(0),
188       output_size_used_(0),
189       peer_(NULL) {
190   DCHECK(my_lifetime_flag_.get());
191   my_lifetime_flag_->is_alive = true;
192 }
193
194 ByteStreamWriterImpl::~ByteStreamWriterImpl() {
195   // No RunsTasksOnCurrentThread() check to allow deleting a created writer
196   // before we start using it. Once started, should be deleted on the specified
197   // task runner.
198   my_lifetime_flag_->is_alive = false;
199 }
200
201 void ByteStreamWriterImpl::SetPeer(
202     ByteStreamReaderImpl* peer,
203     scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
204     scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
205   peer_ = peer;
206   peer_task_runner_ = peer_task_runner;
207   peer_lifetime_flag_ = peer_lifetime_flag;
208 }
209
210 bool ByteStreamWriterImpl::Write(
211     scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
212   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
213
214   // Check overflow.
215   //
216   // TODO(tyoshino): Discuss with content/browser/download developer and if
217   // they're fine with, set smaller limit and make it configurable.
218   size_t space_limit = std::numeric_limits<size_t>::max() -
219       GetTotalBufferedBytes();
220   if (byte_count > space_limit) {
221     // TODO(tyoshino): Tell the user that Write() failed.
222     // Ignore input.
223     return false;
224   }
225
226   input_contents_.push_back(std::make_pair(buffer, byte_count));
227   input_contents_size_ += byte_count;
228
229   // Arbitrarily, we buffer to a third of the total size before sending.
230   if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
231     PostToPeer(false, 0);
232
233   return GetTotalBufferedBytes() <= total_buffer_size_;
234 }
235
236 void ByteStreamWriterImpl::Flush() {
237   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
238   if (input_contents_size_ > 0)
239     PostToPeer(false, 0);
240 }
241
242 void ByteStreamWriterImpl::Close(int status) {
243   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
244   PostToPeer(true, status);
245 }
246
247 void ByteStreamWriterImpl::RegisterCallback(
248     const base::Closure& source_callback) {
249   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
250   space_available_callback_ = source_callback;
251 }
252
253 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const {
254   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
255   // This sum doesn't overflow since Write() fails if this sum is going to
256   // overflow.
257   return input_contents_size_ + output_size_used_;
258 }
259
260 // static
261 void ByteStreamWriterImpl::UpdateWindow(
262     scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target,
263     size_t bytes_consumed) {
264   // If the target object isn't alive anymore, we do nothing.
265   if (!lifetime_flag->is_alive) return;
266
267   target->UpdateWindowInternal(bytes_consumed);
268 }
269
270 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) {
271   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
272
273   bool was_above_limit = GetTotalBufferedBytes() > total_buffer_size_;
274
275   DCHECK_GE(output_size_used_, bytes_consumed);
276   output_size_used_ -= bytes_consumed;
277
278   // Callback if we were above the limit and we're now <= to it.
279   bool no_longer_above_limit = GetTotalBufferedBytes() <= total_buffer_size_;
280
281   if (no_longer_above_limit && was_above_limit &&
282       !space_available_callback_.is_null())
283     space_available_callback_.Run();
284 }
285
286 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) {
287   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
288   // Valid contexts in which to call.
289   DCHECK(complete || 0 != input_contents_size_);
290
291   scoped_ptr<ContentVector> transfer_buffer;
292   size_t buffer_size = 0;
293   if (0 != input_contents_size_) {
294     transfer_buffer.reset(new ContentVector);
295     transfer_buffer->swap(input_contents_);
296     buffer_size = input_contents_size_;
297     output_size_used_ += input_contents_size_;
298     input_contents_size_ = 0;
299   }
300   peer_task_runner_->PostTask(
301       FROM_HERE, base::Bind(
302           &ByteStreamReaderImpl::TransferData,
303           peer_lifetime_flag_,
304           peer_,
305           base::Passed(&transfer_buffer),
306           buffer_size,
307           complete,
308           status));
309 }
310
311 ByteStreamReaderImpl::ByteStreamReaderImpl(
312     scoped_refptr<base::SequencedTaskRunner> task_runner,
313     scoped_refptr<LifetimeFlag> lifetime_flag,
314     size_t buffer_size)
315     : total_buffer_size_(buffer_size),
316       my_task_runner_(task_runner),
317       my_lifetime_flag_(lifetime_flag),
318       received_status_(false),
319       status_(0),
320       unreported_consumed_bytes_(0),
321       peer_(NULL) {
322   DCHECK(my_lifetime_flag_.get());
323   my_lifetime_flag_->is_alive = true;
324 }
325
326 ByteStreamReaderImpl::~ByteStreamReaderImpl() {
327   // No RunsTasksOnCurrentThread() check to allow deleting a created writer
328   // before we start using it. Once started, should be deleted on the specified
329   // task runner.
330   my_lifetime_flag_->is_alive = false;
331 }
332
333 void ByteStreamReaderImpl::SetPeer(
334     ByteStreamWriterImpl* peer,
335     scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
336     scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
337   peer_ = peer;
338   peer_task_runner_ = peer_task_runner;
339   peer_lifetime_flag_ = peer_lifetime_flag;
340 }
341
342 ByteStreamReaderImpl::StreamState
343 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data,
344                            size_t* length) {
345   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
346
347   if (available_contents_.size()) {
348     *data = available_contents_.front().first;
349     *length = available_contents_.front().second;
350     available_contents_.pop_front();
351     unreported_consumed_bytes_ += *length;
352
353     MaybeUpdateInput();
354     return STREAM_HAS_DATA;
355   }
356   if (received_status_) {
357     return STREAM_COMPLETE;
358   }
359   return STREAM_EMPTY;
360 }
361
362 int ByteStreamReaderImpl::GetStatus() const {
363   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
364   DCHECK(received_status_);
365   return status_;
366 }
367
368 void ByteStreamReaderImpl::RegisterCallback(
369     const base::Closure& sink_callback) {
370   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
371
372   data_available_callback_ = sink_callback;
373 }
374
375 // static
376 void ByteStreamReaderImpl::TransferData(
377     scoped_refptr<LifetimeFlag> object_lifetime_flag,
378     ByteStreamReaderImpl* target,
379     scoped_ptr<ContentVector> transfer_buffer,
380     size_t buffer_size,
381     bool source_complete,
382     int status) {
383   // If our target is no longer alive, do nothing.
384   if (!object_lifetime_flag->is_alive) return;
385
386   target->TransferDataInternal(
387       transfer_buffer.Pass(), buffer_size, source_complete, status);
388 }
389
390 void ByteStreamReaderImpl::TransferDataInternal(
391     scoped_ptr<ContentVector> transfer_buffer,
392     size_t buffer_size,
393     bool source_complete,
394     int status) {
395   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
396
397   bool was_empty = available_contents_.empty();
398
399   if (transfer_buffer) {
400     available_contents_.insert(available_contents_.end(),
401                                transfer_buffer->begin(),
402                                transfer_buffer->end());
403   }
404
405   if (source_complete) {
406     received_status_ = true;
407     status_ = status;
408   }
409
410   // Callback on transition from empty to non-empty, or
411   // source complete.
412   if (((was_empty && !available_contents_.empty()) ||
413        source_complete) &&
414       !data_available_callback_.is_null())
415     data_available_callback_.Run();
416 }
417
418 // Decide whether or not to send the input a window update.
419 // Currently we do that whenever we've got unreported consumption
420 // greater than 1/3 of total size.
421 void ByteStreamReaderImpl::MaybeUpdateInput() {
422   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
423
424   if (unreported_consumed_bytes_ <=
425       total_buffer_size_ / kFractionReadBeforeWindowUpdate)
426     return;
427
428   peer_task_runner_->PostTask(
429       FROM_HERE, base::Bind(
430           &ByteStreamWriterImpl::UpdateWindow,
431           peer_lifetime_flag_,
432           peer_,
433           unreported_consumed_bytes_));
434   unreported_consumed_bytes_ = 0;
435 }
436
437 }  // namespace
438
439 const int ByteStreamWriter::kFractionBufferBeforeSending = 3;
440 const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3;
441
442 ByteStreamReader::~ByteStreamReader() { }
443
444 ByteStreamWriter::~ByteStreamWriter() { }
445
446 void CreateByteStream(
447     scoped_refptr<base::SequencedTaskRunner> input_task_runner,
448     scoped_refptr<base::SequencedTaskRunner> output_task_runner,
449     size_t buffer_size,
450     scoped_ptr<ByteStreamWriter>* input,
451     scoped_ptr<ByteStreamReader>* output) {
452   scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
453   scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
454
455   ByteStreamWriterImpl* in = new ByteStreamWriterImpl(
456       input_task_runner, input_flag, buffer_size);
457   ByteStreamReaderImpl* out = new ByteStreamReaderImpl(
458       output_task_runner, output_flag, buffer_size);
459
460   in->SetPeer(out, output_task_runner, output_flag);
461   out->SetPeer(in, input_task_runner, input_flag);
462   input->reset(in);
463   output->reset(out);
464 }
465
466 }  // namespace content