1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chromecast/media/cma/ipc/media_message_fifo.h"
7 #include "base/atomicops.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/message_loop/message_loop_proxy.h"
12 #include "chromecast/media/cma/base/cma_logging.h"
13 #include "chromecast/media/cma/ipc/media_memory_chunk.h"
14 #include "chromecast/media/cma/ipc/media_message.h"
15 #include "chromecast/media/cma/ipc/media_message_type.h"
17 namespace chromecast {
20 class MediaMessageFlag
21 : public base::RefCountedThreadSafe<MediaMessageFlag> {
23 // |offset| is the offset in the fifo of the media message.
24 explicit MediaMessageFlag(size_t offset);
30 size_t offset() const { return offset_; }
33 friend class base::RefCountedThreadSafe<MediaMessageFlag>;
34 virtual ~MediaMessageFlag();
39 DISALLOW_COPY_AND_ASSIGN(MediaMessageFlag);
42 MediaMessageFlag::MediaMessageFlag(size_t offset)
47 MediaMessageFlag::~MediaMessageFlag() {
50 bool MediaMessageFlag::IsValid() const {
54 void MediaMessageFlag::Invalidate() {
58 class FifoOwnedMemory : public MediaMemoryChunk {
60 FifoOwnedMemory(void* data, size_t size,
61 const scoped_refptr<MediaMessageFlag>& flag,
62 const base::Closure& release_msg_cb);
63 virtual ~FifoOwnedMemory();
65 // MediaMemoryChunk implementation.
66 virtual void* data() const OVERRIDE { return data_; }
67 virtual size_t size() const OVERRIDE { return size_; }
68 virtual bool valid() const OVERRIDE { return flag_->IsValid(); }
71 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
72 base::Closure release_msg_cb_;
76 scoped_refptr<MediaMessageFlag> flag_;
78 DISALLOW_COPY_AND_ASSIGN(FifoOwnedMemory);
81 FifoOwnedMemory::FifoOwnedMemory(
82 void* data, size_t size,
83 const scoped_refptr<MediaMessageFlag>& flag,
84 const base::Closure& release_msg_cb)
85 : task_runner_(base::MessageLoopProxy::current()),
86 release_msg_cb_(release_msg_cb),
92 FifoOwnedMemory::~FifoOwnedMemory() {
93 // Release the flag before notifying that the message has been released.
94 flag_ = scoped_refptr<MediaMessageFlag>();
95 if (!release_msg_cb_.is_null()) {
96 if (task_runner_->BelongsToCurrentThread()) {
97 release_msg_cb_.Run();
99 task_runner_->PostTask(FROM_HERE, release_msg_cb_);
104 MediaMessageFifo::MediaMessageFifo(
105 scoped_ptr<MediaMemoryChunk> mem, bool init)
107 weak_factory_(this) {
108 CHECK_EQ(reinterpret_cast<uintptr_t>(mem_->data()) % ALIGNOF(Descriptor),
110 CHECK_GE(mem_->size(), sizeof(Descriptor));
111 Descriptor* desc = static_cast<Descriptor*>(mem_->data());
112 base_ = static_cast<void*>(&desc->first_item);
114 // TODO(damienv): remove cast when atomic size_t is defined in Chrome.
115 // Currently, the sign differs.
116 rd_offset_ = reinterpret_cast<AtomicSize*>(&(desc->rd_offset));
117 wr_offset_ = reinterpret_cast<AtomicSize*>(&(desc->wr_offset));
119 size_t max_size = mem_->size() -
120 (static_cast<char*>(base_) - static_cast<char*>(mem_->data()));
124 internal_rd_offset_ = 0;
125 internal_wr_offset_ = 0;
126 base::subtle::Acquire_Store(rd_offset_, 0);
127 base::subtle::Acquire_Store(wr_offset_, 0);
130 CHECK_LE(size_, max_size);
131 internal_rd_offset_ = current_rd_offset();
132 internal_wr_offset_ = current_wr_offset();
135 << "MediaMessageFifo:" << " init=" << init << " size=" << size_;
136 CHECK_GT(size_, 0) << size_;
138 weak_this_ = weak_factory_.GetWeakPtr();
139 thread_checker_.DetachFromThread();
142 MediaMessageFifo::~MediaMessageFifo() {
143 DCHECK(thread_checker_.CalledOnValidThread());
146 void MediaMessageFifo::ObserveReadActivity(
147 const base::Closure& read_event_cb) {
148 read_event_cb_ = read_event_cb;
151 void MediaMessageFifo::ObserveWriteActivity(
152 const base::Closure& write_event_cb) {
153 write_event_cb_ = write_event_cb;
156 scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemory(
157 size_t size_to_reserve) {
158 DCHECK(thread_checker_.CalledOnValidThread());
160 // Capture first both the read and write offsets.
161 // and exit right away if not enough free space.
162 size_t wr_offset = internal_wr_offset();
163 size_t rd_offset = current_rd_offset();
164 size_t allocated_size = (size_ + wr_offset - rd_offset) % size_;
165 size_t free_size = size_ - 1 - allocated_size;
166 if (free_size < size_to_reserve)
167 return scoped_ptr<MediaMemoryChunk>();
168 CHECK_LE(MediaMessage::minimum_msg_size(), size_to_reserve);
170 // Note: in the next 2 conditions, we have:
171 // trailing_byte_count < size_to_reserve
172 // and since at this stage: size_to_reserve <= free_size
173 // we also have trailing_byte_count <= free_size
174 // which means that all the trailing bytes are free space in the fifo.
175 size_t trailing_byte_count = size_ - wr_offset;
176 if (trailing_byte_count < MediaMessage::minimum_msg_size()) {
177 // If there is no space to even write the smallest message,
178 // skip the trailing bytes and come back to the beginning of the fifo.
179 // (no way to insert a padding message).
180 if (free_size < trailing_byte_count)
181 return scoped_ptr<MediaMemoryChunk>();
183 CommitInternalWrite(wr_offset);
185 } else if (trailing_byte_count < size_to_reserve) {
186 // At this point, we know we have at least the space to write a message.
187 // However, to avoid splitting a message, a padding message is needed.
188 scoped_ptr<MediaMemoryChunk> mem(
189 ReserveMemoryNoCheck(trailing_byte_count));
190 scoped_ptr<MediaMessage> padding_message(
191 MediaMessage::CreateMessage(PaddingMediaMsg, mem.Pass()));
194 // Recalculate the free size and exit if not enough free space.
195 wr_offset = internal_wr_offset();
196 allocated_size = (size_ + wr_offset - rd_offset) % size_;
197 free_size = size_ - 1 - allocated_size;
198 if (free_size < size_to_reserve)
199 return scoped_ptr<MediaMemoryChunk>();
201 return ReserveMemoryNoCheck(size_to_reserve);
204 scoped_ptr<MediaMessage> MediaMessageFifo::Pop() {
205 DCHECK(thread_checker_.CalledOnValidThread());
207 // Capture the read and write offsets.
208 size_t rd_offset = internal_rd_offset();
209 size_t wr_offset = current_wr_offset();
210 size_t allocated_size = (size_ + wr_offset - rd_offset) % size_;
212 if (allocated_size < MediaMessage::minimum_msg_size())
213 return scoped_ptr<MediaMessage>();
215 size_t trailing_byte_count = size_ - rd_offset;
216 if (trailing_byte_count < MediaMessage::minimum_msg_size()) {
217 // If there is no space to even have the smallest message,
218 // skip the trailing bytes and come back to the beginning of the fifo.
219 // Note: all the trailing bytes correspond to allocated bytes since:
220 // trailing_byte_count < MediaMessage::minimum_msg_size() <= allocated_size
222 allocated_size -= trailing_byte_count;
223 trailing_byte_count = size_;
224 CommitInternalRead(rd_offset);
227 // The message should not be longer than the allocated size
228 // but since a message is a contiguous area of memory, it should also be
229 // smaller than |trailing_byte_count|.
230 size_t max_msg_size = std::min(allocated_size, trailing_byte_count);
231 if (max_msg_size < MediaMessage::minimum_msg_size())
232 return scoped_ptr<MediaMessage>();
233 void* msg_src = static_cast<uint8*>(base_) + rd_offset;
235 // Create a flag to protect the serialized structure of the message
236 // from being overwritten.
237 // The serialized structure starts at offset |rd_offset|.
238 scoped_refptr<MediaMessageFlag> rd_flag(new MediaMessageFlag(rd_offset));
239 rd_flags_.push_back(rd_flag);
240 scoped_ptr<MediaMemoryChunk> mem(
242 msg_src, max_msg_size, rd_flag,
243 base::Bind(&MediaMessageFifo::OnRdMemoryReleased, weak_this_)));
245 // Create the message which wraps its the serialized structure.
246 scoped_ptr<MediaMessage> message(MediaMessage::MapMessage(mem.Pass()));
249 // Update the internal read pointer.
250 rd_offset = (rd_offset + message->size()) % size_;
251 CommitInternalRead(rd_offset);
253 return message.Pass();
256 void MediaMessageFifo::Flush() {
257 DCHECK(thread_checker_.CalledOnValidThread());
259 size_t wr_offset = current_wr_offset();
261 // Invalidate every memory region before flushing.
262 while (!rd_flags_.empty()) {
263 CMALOG(kLogControl) << "Invalidate flag";
264 rd_flags_.front()->Invalidate();
265 rd_flags_.pop_front();
268 // Flush by setting the read pointer to the value of the write pointer.
269 // Update first the internal read pointer then the public one.
270 CommitInternalRead(wr_offset);
271 CommitRead(wr_offset);
274 scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemoryNoCheck(
275 size_t size_to_reserve) {
276 size_t wr_offset = internal_wr_offset();
278 // Memory block corresponding to the serialized structure of the message.
279 void* msg_start = static_cast<uint8*>(base_) + wr_offset;
280 scoped_refptr<MediaMessageFlag> wr_flag(new MediaMessageFlag(wr_offset));
281 wr_flags_.push_back(wr_flag);
282 scoped_ptr<MediaMemoryChunk> mem(
284 msg_start, size_to_reserve, wr_flag,
285 base::Bind(&MediaMessageFifo::OnWrMemoryReleased, weak_this_)));
287 // Update the internal write pointer.
288 wr_offset = (wr_offset + size_to_reserve) % size_;
289 CommitInternalWrite(wr_offset);
294 void MediaMessageFifo::OnWrMemoryReleased() {
295 DCHECK(thread_checker_.CalledOnValidThread());
297 if (wr_flags_.empty()) {
298 // Sanity check: when there is no protected memory area,
299 // the external write offset has no reason to be different from
300 // the internal write offset.
301 DCHECK_EQ(current_wr_offset(), internal_wr_offset());
305 // Update the external write offset.
306 while (!wr_flags_.empty() &&
307 (!wr_flags_.front()->IsValid() || wr_flags_.front()->HasOneRef())) {
308 // TODO(damienv): Could add a sanity check to make sure the offset is
309 // between the external write offset and the read offset (not included).
310 wr_flags_.pop_front();
313 // Update the read offset to the first locked memory area
314 // or to the internal read pointer if nothing prevents it.
315 size_t external_wr_offset = internal_wr_offset();
316 if (!wr_flags_.empty())
317 external_wr_offset = wr_flags_.front()->offset();
318 CommitWrite(external_wr_offset);
321 void MediaMessageFifo::OnRdMemoryReleased() {
322 DCHECK(thread_checker_.CalledOnValidThread());
324 if (rd_flags_.empty()) {
325 // Sanity check: when there is no protected memory area,
326 // the external read offset has no reason to be different from
327 // the internal read offset.
328 DCHECK_EQ(current_rd_offset(), internal_rd_offset());
332 // Update the external read offset.
333 while (!rd_flags_.empty() &&
334 (!rd_flags_.front()->IsValid() || rd_flags_.front()->HasOneRef())) {
335 // TODO(damienv): Could add a sanity check to make sure the offset is
336 // between the external read offset and the write offset.
337 rd_flags_.pop_front();
340 // Update the read offset to the first locked memory area
341 // or to the internal read pointer if nothing prevents it.
342 size_t external_rd_offset = internal_rd_offset();
343 if (!rd_flags_.empty())
344 external_rd_offset = rd_flags_.front()->offset();
345 CommitRead(external_rd_offset);
348 size_t MediaMessageFifo::current_rd_offset() const {
349 DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize));
350 size_t rd_offset = base::subtle::Acquire_Load(rd_offset_);
351 CHECK_LT(rd_offset, size_);
355 size_t MediaMessageFifo::current_wr_offset() const {
356 DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize));
358 // When the fifo consumer acquires the write offset,
359 // we have to make sure that any possible following reads are actually
360 // returning results at least inline with the memory snapshot taken
361 // when the write offset was sampled.
362 // That's why an Acquire_Load is used here.
363 size_t wr_offset = base::subtle::Acquire_Load(wr_offset_);
364 CHECK_LT(wr_offset, size_);
368 void MediaMessageFifo::CommitRead(size_t new_rd_offset) {
369 // Add a memory fence to ensure the message content is completely read
370 // before updating the read offset.
371 base::subtle::Release_Store(rd_offset_, new_rd_offset);
373 // Make sure the read pointer has been updated before sending a notification.
374 if (!read_event_cb_.is_null()) {
375 base::subtle::MemoryBarrier();
376 read_event_cb_.Run();
380 void MediaMessageFifo::CommitWrite(size_t new_wr_offset) {
381 // Add a memory fence to ensure the message content is written
382 // before updating the write offset.
383 base::subtle::Release_Store(wr_offset_, new_wr_offset);
385 // Make sure the write pointer has been updated before sending a notification.
386 if (!write_event_cb_.is_null()) {
387 base::subtle::MemoryBarrier();
388 write_event_cb_.Run();
392 void MediaMessageFifo::CommitInternalRead(size_t new_rd_offset) {
393 internal_rd_offset_ = new_rd_offset;
396 void MediaMessageFifo::CommitInternalWrite(size_t new_wr_offset) {
397 internal_wr_offset_ = new_wr_offset;
401 } // namespace chromecast