Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / chromecast / media / cma / ipc / media_message_fifo.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chromecast/media/cma/ipc/media_message_fifo.h"
6
7 #include "base/atomicops.h"
8 #include "base/bind.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/message_loop/message_loop_proxy.h"
12 #include "chromecast/media/cma/base/cma_logging.h"
13 #include "chromecast/media/cma/ipc/media_memory_chunk.h"
14 #include "chromecast/media/cma/ipc/media_message.h"
15 #include "chromecast/media/cma/ipc/media_message_type.h"
16
17 namespace chromecast {
18 namespace media {
19
20 class MediaMessageFlag
21     : public base::RefCountedThreadSafe<MediaMessageFlag> {
22  public:
23   // |offset| is the offset in the fifo of the media message.
24   explicit MediaMessageFlag(size_t offset);
25
26   bool IsValid() const;
27
28   void Invalidate();
29
30   size_t offset() const { return offset_; }
31
32  private:
33   friend class base::RefCountedThreadSafe<MediaMessageFlag>;
34   virtual ~MediaMessageFlag();
35
36   const size_t offset_;
37   bool flag_;
38
39   DISALLOW_COPY_AND_ASSIGN(MediaMessageFlag);
40 };
41
42 MediaMessageFlag::MediaMessageFlag(size_t offset)
43   : offset_(offset),
44     flag_(true) {
45 }
46
47 MediaMessageFlag::~MediaMessageFlag() {
48 }
49
50 bool MediaMessageFlag::IsValid() const {
51   return flag_;
52 }
53
54 void MediaMessageFlag::Invalidate() {
55   flag_ = false;
56 }
57
58 class FifoOwnedMemory : public MediaMemoryChunk {
59  public:
60   FifoOwnedMemory(void* data, size_t size,
61                   const scoped_refptr<MediaMessageFlag>& flag,
62                   const base::Closure& release_msg_cb);
63   virtual ~FifoOwnedMemory();
64
65   // MediaMemoryChunk implementation.
66   virtual void* data() const OVERRIDE { return data_; }
67   virtual size_t size() const OVERRIDE { return size_; }
68   virtual bool valid() const OVERRIDE { return flag_->IsValid(); }
69
70  private:
71   scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
72   base::Closure release_msg_cb_;
73
74   void* const data_;
75   const size_t size_;
76   scoped_refptr<MediaMessageFlag> flag_;
77
78   DISALLOW_COPY_AND_ASSIGN(FifoOwnedMemory);
79 };
80
81 FifoOwnedMemory::FifoOwnedMemory(
82     void* data, size_t size,
83     const scoped_refptr<MediaMessageFlag>& flag,
84     const base::Closure& release_msg_cb)
85   : task_runner_(base::MessageLoopProxy::current()),
86     release_msg_cb_(release_msg_cb),
87     data_(data),
88     size_(size),
89     flag_(flag) {
90 }
91
92 FifoOwnedMemory::~FifoOwnedMemory() {
93   // Release the flag before notifying that the message has been released.
94   flag_ = scoped_refptr<MediaMessageFlag>();
95   if (!release_msg_cb_.is_null()) {
96     if (task_runner_->BelongsToCurrentThread()) {
97       release_msg_cb_.Run();
98     } else {
99       task_runner_->PostTask(FROM_HERE, release_msg_cb_);
100     }
101   }
102 }
103
104 MediaMessageFifo::MediaMessageFifo(
105     scoped_ptr<MediaMemoryChunk> mem, bool init)
106   : mem_(mem.Pass()),
107     weak_factory_(this) {
108   CHECK_EQ(reinterpret_cast<uintptr_t>(mem_->data()) % ALIGNOF(Descriptor),
109            0u);
110   CHECK_GE(mem_->size(), sizeof(Descriptor));
111   Descriptor* desc = static_cast<Descriptor*>(mem_->data());
112   base_ = static_cast<void*>(&desc->first_item);
113
114   // TODO(damienv): remove cast when atomic size_t is defined in Chrome.
115   // Currently, the sign differs.
116   rd_offset_ = reinterpret_cast<AtomicSize*>(&(desc->rd_offset));
117   wr_offset_ = reinterpret_cast<AtomicSize*>(&(desc->wr_offset));
118
119   size_t max_size = mem_->size() -
120       (static_cast<char*>(base_) - static_cast<char*>(mem_->data()));
121   if (init) {
122     size_ = max_size;
123     desc->size = size_;
124     internal_rd_offset_ = 0;
125     internal_wr_offset_ = 0;
126     base::subtle::Acquire_Store(rd_offset_, 0);
127     base::subtle::Acquire_Store(wr_offset_, 0);
128   } else {
129     size_ = desc->size;
130     CHECK_LE(size_, max_size);
131     internal_rd_offset_ = current_rd_offset();
132     internal_wr_offset_ = current_wr_offset();
133   }
134   CMALOG(kLogControl)
135       << "MediaMessageFifo:" << " init=" << init << " size=" << size_;
136   CHECK_GT(size_, 0) << size_;
137
138   weak_this_ = weak_factory_.GetWeakPtr();
139   thread_checker_.DetachFromThread();
140 }
141
142 MediaMessageFifo::~MediaMessageFifo() {
143   DCHECK(thread_checker_.CalledOnValidThread());
144 }
145
146 void MediaMessageFifo::ObserveReadActivity(
147     const base::Closure& read_event_cb) {
148   read_event_cb_ = read_event_cb;
149 }
150
151 void MediaMessageFifo::ObserveWriteActivity(
152     const base::Closure& write_event_cb) {
153   write_event_cb_ = write_event_cb;
154 }
155
156 scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemory(
157     size_t size_to_reserve) {
158   DCHECK(thread_checker_.CalledOnValidThread());
159
160   // Capture first both the read and write offsets.
161   // and exit right away if not enough free space.
162   size_t wr_offset = internal_wr_offset();
163   size_t rd_offset = current_rd_offset();
164   size_t allocated_size = (size_ + wr_offset - rd_offset) % size_;
165   size_t free_size = size_ - 1 - allocated_size;
166   if (free_size < size_to_reserve)
167     return scoped_ptr<MediaMemoryChunk>();
168   CHECK_LE(MediaMessage::minimum_msg_size(), size_to_reserve);
169
170   // Note: in the next 2 conditions, we have:
171   // trailing_byte_count < size_to_reserve
172   // and since at this stage: size_to_reserve <= free_size
173   // we also have trailing_byte_count <= free_size
174   // which means that all the trailing bytes are free space in the fifo.
175   size_t trailing_byte_count = size_ - wr_offset;
176   if (trailing_byte_count < MediaMessage::minimum_msg_size()) {
177     // If there is no space to even write the smallest message,
178     // skip the trailing bytes and come back to the beginning of the fifo.
179     // (no way to insert a padding message).
180     if (free_size < trailing_byte_count)
181       return scoped_ptr<MediaMemoryChunk>();
182     wr_offset = 0;
183     CommitInternalWrite(wr_offset);
184
185   } else if (trailing_byte_count < size_to_reserve) {
186     // At this point, we know we have at least the space to write a message.
187     // However, to avoid splitting a message, a padding message is needed.
188     scoped_ptr<MediaMemoryChunk> mem(
189         ReserveMemoryNoCheck(trailing_byte_count));
190     scoped_ptr<MediaMessage> padding_message(
191         MediaMessage::CreateMessage(PaddingMediaMsg, mem.Pass()));
192   }
193
194   // Recalculate the free size and exit if not enough free space.
195   wr_offset = internal_wr_offset();
196   allocated_size = (size_ + wr_offset - rd_offset) % size_;
197   free_size = size_ - 1 - allocated_size;
198   if (free_size < size_to_reserve)
199     return scoped_ptr<MediaMemoryChunk>();
200
201   return ReserveMemoryNoCheck(size_to_reserve);
202 }
203
204 scoped_ptr<MediaMessage> MediaMessageFifo::Pop() {
205   DCHECK(thread_checker_.CalledOnValidThread());
206
207   // Capture the read and write offsets.
208   size_t rd_offset = internal_rd_offset();
209   size_t wr_offset = current_wr_offset();
210   size_t allocated_size = (size_ + wr_offset - rd_offset) % size_;
211
212   if (allocated_size < MediaMessage::minimum_msg_size())
213     return scoped_ptr<MediaMessage>();
214
215   size_t trailing_byte_count = size_ - rd_offset;
216   if (trailing_byte_count < MediaMessage::minimum_msg_size()) {
217     // If there is no space to even have the smallest message,
218     // skip the trailing bytes and come back to the beginning of the fifo.
219     // Note: all the trailing bytes correspond to allocated bytes since:
220     // trailing_byte_count < MediaMessage::minimum_msg_size() <= allocated_size
221     rd_offset = 0;
222     allocated_size -= trailing_byte_count;
223     trailing_byte_count = size_;
224     CommitInternalRead(rd_offset);
225   }
226
227   // The message should not be longer than the allocated size
228   // but since a message is a contiguous area of memory, it should also be
229   // smaller than |trailing_byte_count|.
230   size_t max_msg_size = std::min(allocated_size, trailing_byte_count);
231   if (max_msg_size < MediaMessage::minimum_msg_size())
232     return scoped_ptr<MediaMessage>();
233   void* msg_src = static_cast<uint8*>(base_) + rd_offset;
234
235   // Create a flag to protect the serialized structure of the message
236   // from being overwritten.
237   // The serialized structure starts at offset |rd_offset|.
238   scoped_refptr<MediaMessageFlag> rd_flag(new MediaMessageFlag(rd_offset));
239   rd_flags_.push_back(rd_flag);
240   scoped_ptr<MediaMemoryChunk> mem(
241       new FifoOwnedMemory(
242           msg_src, max_msg_size, rd_flag,
243           base::Bind(&MediaMessageFifo::OnRdMemoryReleased, weak_this_)));
244
245   // Create the message which wraps its the serialized structure.
246   scoped_ptr<MediaMessage> message(MediaMessage::MapMessage(mem.Pass()));
247   CHECK(message);
248
249   // Update the internal read pointer.
250   rd_offset = (rd_offset + message->size()) % size_;
251   CommitInternalRead(rd_offset);
252
253   return message.Pass();
254 }
255
256 void MediaMessageFifo::Flush() {
257   DCHECK(thread_checker_.CalledOnValidThread());
258
259   size_t wr_offset = current_wr_offset();
260
261   // Invalidate every memory region before flushing.
262   while (!rd_flags_.empty()) {
263     CMALOG(kLogControl) << "Invalidate flag";
264     rd_flags_.front()->Invalidate();
265     rd_flags_.pop_front();
266   }
267
268   // Flush by setting the read pointer to the value of the write pointer.
269   // Update first the internal read pointer then the public one.
270   CommitInternalRead(wr_offset);
271   CommitRead(wr_offset);
272 }
273
274 scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemoryNoCheck(
275     size_t size_to_reserve) {
276   size_t wr_offset = internal_wr_offset();
277
278   // Memory block corresponding to the serialized structure of the message.
279   void* msg_start = static_cast<uint8*>(base_) + wr_offset;
280   scoped_refptr<MediaMessageFlag> wr_flag(new MediaMessageFlag(wr_offset));
281   wr_flags_.push_back(wr_flag);
282   scoped_ptr<MediaMemoryChunk> mem(
283       new FifoOwnedMemory(
284           msg_start, size_to_reserve, wr_flag,
285           base::Bind(&MediaMessageFifo::OnWrMemoryReleased, weak_this_)));
286
287   // Update the internal write pointer.
288   wr_offset = (wr_offset + size_to_reserve) % size_;
289   CommitInternalWrite(wr_offset);
290
291   return mem.Pass();
292 }
293
294 void MediaMessageFifo::OnWrMemoryReleased() {
295   DCHECK(thread_checker_.CalledOnValidThread());
296
297   if (wr_flags_.empty()) {
298     // Sanity check: when there is no protected memory area,
299     // the external write offset has no reason to be different from
300     // the internal write offset.
301     DCHECK_EQ(current_wr_offset(), internal_wr_offset());
302     return;
303   }
304
305   // Update the external write offset.
306   while (!wr_flags_.empty() &&
307          (!wr_flags_.front()->IsValid() || wr_flags_.front()->HasOneRef())) {
308     // TODO(damienv): Could add a sanity check to make sure the offset is
309     // between the external write offset and the read offset (not included).
310     wr_flags_.pop_front();
311   }
312
313   // Update the read offset to the first locked memory area
314   // or to the internal read pointer if nothing prevents it.
315   size_t external_wr_offset = internal_wr_offset();
316   if (!wr_flags_.empty())
317     external_wr_offset = wr_flags_.front()->offset();
318   CommitWrite(external_wr_offset);
319 }
320
321 void MediaMessageFifo::OnRdMemoryReleased() {
322   DCHECK(thread_checker_.CalledOnValidThread());
323
324   if (rd_flags_.empty()) {
325     // Sanity check: when there is no protected memory area,
326     // the external read offset has no reason to be different from
327     // the internal read offset.
328     DCHECK_EQ(current_rd_offset(), internal_rd_offset());
329     return;
330   }
331
332   // Update the external read offset.
333   while (!rd_flags_.empty() &&
334          (!rd_flags_.front()->IsValid() || rd_flags_.front()->HasOneRef())) {
335     // TODO(damienv): Could add a sanity check to make sure the offset is
336     // between the external read offset and the write offset.
337     rd_flags_.pop_front();
338   }
339
340   // Update the read offset to the first locked memory area
341   // or to the internal read pointer if nothing prevents it.
342   size_t external_rd_offset = internal_rd_offset();
343   if (!rd_flags_.empty())
344     external_rd_offset = rd_flags_.front()->offset();
345   CommitRead(external_rd_offset);
346 }
347
348 size_t MediaMessageFifo::current_rd_offset() const {
349   DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize));
350   size_t rd_offset = base::subtle::Acquire_Load(rd_offset_);
351   CHECK_LT(rd_offset, size_);
352   return rd_offset;
353 }
354
355 size_t MediaMessageFifo::current_wr_offset() const {
356   DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize));
357
358   // When the fifo consumer acquires the write offset,
359   // we have to make sure that any possible following reads are actually
360   // returning results at least inline with the memory snapshot taken
361   // when the write offset was sampled.
362   // That's why an Acquire_Load is used here.
363   size_t wr_offset = base::subtle::Acquire_Load(wr_offset_);
364   CHECK_LT(wr_offset, size_);
365   return wr_offset;
366 }
367
368 void MediaMessageFifo::CommitRead(size_t new_rd_offset) {
369   // Add a memory fence to ensure the message content is completely read
370   // before updating the read offset.
371   base::subtle::Release_Store(rd_offset_, new_rd_offset);
372
373   // Make sure the read pointer has been updated before sending a notification.
374   if (!read_event_cb_.is_null()) {
375     base::subtle::MemoryBarrier();
376     read_event_cb_.Run();
377   }
378 }
379
380 void MediaMessageFifo::CommitWrite(size_t new_wr_offset) {
381   // Add a memory fence to ensure the message content is written
382   // before updating the write offset.
383   base::subtle::Release_Store(wr_offset_, new_wr_offset);
384
385   // Make sure the write pointer has been updated before sending a notification.
386   if (!write_event_cb_.is_null()) {
387     base::subtle::MemoryBarrier();
388     write_event_cb_.Run();
389   }
390 }
391
392 void MediaMessageFifo::CommitInternalRead(size_t new_rd_offset) {
393   internal_rd_offset_ = new_rd_offset;
394 }
395
396 void MediaMessageFifo::CommitInternalWrite(size_t new_wr_offset) {
397   internal_wr_offset_ = new_wr_offset;
398 }
399
400 }  // namespace media
401 }  // namespace chromecast