1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/filters/decoder_stream.h"
8 #include "base/callback_helpers.h"
9 #include "base/debug/trace_event.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/single_thread_task_runner.h"
13 #include "media/base/audio_decoder.h"
14 #include "media/base/bind_to_current_loop.h"
15 #include "media/base/decoder_buffer.h"
16 #include "media/base/demuxer_stream.h"
17 #include "media/base/video_decoder.h"
18 #include "media/filters/decrypting_demuxer_stream.h"
22 // TODO(rileya): Devise a better way of specifying trace/UMA/etc strings for
23 // templated classes such as this.
// Declaration of the per-stream-type trace event name lookup. The VIDEO and
// AUDIO definitions follow below; the result is passed to the
// TRACE_EVENT_ASYNC_* macros in Decode() and OnDecodeDone().
24 template <DemuxerStream::Type StreamType>
25 static const char* GetTraceString();
// Debug-logging helper: at DVLOG verbosity |level|, streams the name of the
// enclosing function followed by the stream type in angle brackets (as given
// by DecoderStreamTraits<StreamType>::ToString()). Callers append their own
// message with <<. Must be expanded where |StreamType| is in scope.
27 #define FUNCTION_DVLOG(level) \
28 DVLOG(level) << __FUNCTION__ << \
29 "<" << DecoderStreamTraits<StreamType>::ToString() << ">"
// Returns the trace event name used for the VIDEO stream type.
32 const char* GetTraceString<DemuxerStream::VIDEO>() {
33 return "DecoderStream<VIDEO>::Decode";
// Returns the trace event name used for the AUDIO stream type.
37 const char* GetTraceString<DemuxerStream::AUDIO>() {
38 return "DecoderStream<AUDIO>::Decode";
// Constructs a stream in STATE_UNINITIALIZED with no active splice and no
// pending decode requests.
//   task_runner: stored in |task_runner_| and also passed to the
//       DecoderSelector created here.
//   decoders: candidate decoders.
//       NOTE(review): presumably handed to the DecoderSelector; the line that
//       would show this is not visible here -- confirm.
//   set_decryptor_ready_cb: passed to the DecoderSelector.
//   media_log: stored in |media_log_|.
// |weak_factory_| is bound to |this| so that callbacks created later can be
// bound through weak pointers.
41 template <DemuxerStream::Type StreamType>
42 DecoderStream<StreamType>::DecoderStream(
43 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
44 ScopedVector<Decoder> decoders,
45 const SetDecryptorReadyCB& set_decryptor_ready_cb,
46 const scoped_refptr<MediaLog>& media_log)
47 : task_runner_(task_runner),
48 media_log_(media_log),
49 state_(STATE_UNINITIALIZED),
53 new DecoderSelector<StreamType>(task_runner,
55 set_decryptor_ready_cb)),
56 active_splice_(false),
58 pending_decode_requests_(0),
59 weak_factory_(this) {}
// Tears the stream down on the task runner thread. Any callback that is still
// pending is answered by posting it to |task_runner_| (not by running it
// inline): the init callback with false, the read callback with ABORTED and a
// null output, and the reset callback with no arguments.
61 template <DemuxerStream::Type StreamType>
62 DecoderStream<StreamType>::~DecoderStream() {
64 DCHECK(task_runner_->BelongsToCurrentThread());
// Drop the selector first, before any pending callbacks are answered.
66 decoder_selector_.reset();
// Initialization never completed: report failure.
68 if (!init_cb_.is_null()) {
69 task_runner_->PostTask(FROM_HERE,
70 base::Bind(base::ResetAndReturn(&init_cb_), false));
// A read is still outstanding: abort it with no output.
72 if (!read_cb_.is_null()) {
73 task_runner_->PostTask(FROM_HERE, base::Bind(
74 base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>()));
// A reset is still outstanding: complete it.
76 if (!reset_cb_.is_null())
77 task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&reset_cb_));
81 decrypting_demuxer_stream_.reset();
// Starts asynchronous decoder selection.
//
// Preconditions (DCHECKed): called on the task runner thread, in
// STATE_UNINITIALIZED, with no initialization already pending and a non-null
// |init_cb|.
//
// Stores |statistics_cb| and the low-delay flag, moves to STATE_INITIALIZING
// and asks |decoder_selector_| to pick a decoder. The selection result is
// delivered to OnDecoderSelected() and decoded outputs to
// OnDecodeOutputReady(); both are bound through weak pointers, so they are not
// run once this object has been destroyed.
// NOTE(review): the |low_delay| parameter and the lines storing |stream| and
// |init_cb| are not visible in this listing -- confirm against the full file.
84 template <DemuxerStream::Type StreamType>
85 void DecoderStream<StreamType>::Initialize(DemuxerStream* stream,
87 const StatisticsCB& statistics_cb,
88 const InitCB& init_cb) {
90 DCHECK(task_runner_->BelongsToCurrentThread());
91 DCHECK_EQ(state_, STATE_UNINITIALIZED) << state_;
92 DCHECK(init_cb_.is_null());
93 DCHECK(!init_cb.is_null());
95 statistics_cb_ = statistics_cb;
98 low_delay_ = low_delay;
100 state_ = STATE_INITIALIZING;
101 // TODO(xhwang): DecoderSelector only needs a config to select a decoder.
102 decoder_selector_->SelectDecoder(
104 base::Bind(&DecoderStream<StreamType>::OnDecoderSelected,
105 weak_factory_.GetWeakPtr()),
106 base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady,
107 weak_factory_.GetWeakPtr()));
// Requests one decoded output. In every branch visible here |read_cb| is
// answered through a task posted to |task_runner_|, never inline.
//
// Preconditions (DCHECKed): called on the task runner thread, after
// initialization has finished, with no other read outstanding and no reset
// pending.
110 template <DemuxerStream::Type StreamType>
111 void DecoderStream<StreamType>::Read(const ReadCB& read_cb) {
113 DCHECK(task_runner_->BelongsToCurrentThread());
114 DCHECK(state_ != STATE_UNINITIALIZED && state_ != STATE_INITIALIZING)
116 // No two reads in flight at any time.
117 DCHECK(read_cb_.is_null());
118 // No read during resetting or stopping process.
119 DCHECK(reset_cb_.is_null());
// The stream has already failed: report DECODE_ERROR with a null output.
121 if (state_ == STATE_ERROR) {
122 task_runner_->PostTask(
123 FROM_HERE, base::Bind(read_cb, DECODE_ERROR, scoped_refptr<Output>()));
// End of stream was reached and nothing is left queued: report an EOS output.
127 if (state_ == STATE_END_OF_STREAM && ready_outputs_.empty()) {
128 task_runner_->PostTask(
129 FROM_HERE, base::Bind(read_cb, OK, StreamTraits::CreateEOSOutput()));
// A decoded output is already queued: hand out the oldest one.
133 if (!ready_outputs_.empty()) {
134 task_runner_->PostTask(FROM_HERE,
135 base::Bind(read_cb, OK, ready_outputs_.front()));
136 ready_outputs_.pop_front();
// Keep the decoder fed while it can still accept more work.
141 if (state_ == STATE_NORMAL && CanDecodeMore())
142 ReadFromDemuxerStream();
// Begins resetting the stream.
//
// Preconditions (DCHECKed): called on the task runner thread, not in
// STATE_UNINITIALIZED, and with no other reset already pending.
//
// An outstanding read is aborted (posted with ABORTED and a null output) and
// all queued outputs are discarded. When a DecryptingDemuxerStream is in use
// it is reset first and ResetDecoder() runs as its completion callback.
// NOTE(review): the line storing |closure| and the statements guarded by the
// two state checks below are not visible in this listing -- confirm.
145 template <DemuxerStream::Type StreamType>
146 void DecoderStream<StreamType>::Reset(const base::Closure& closure) {
148 DCHECK(task_runner_->BelongsToCurrentThread());
149 DCHECK(state_ != STATE_UNINITIALIZED)<< state_;
150 DCHECK(reset_cb_.is_null());
// Abort the outstanding read, if any, before anything else happens.
154 if (!read_cb_.is_null()) {
155 task_runner_->PostTask(FROM_HERE, base::Bind(
156 base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>()));
// Outputs decoded before the reset are no longer wanted.
159 ready_outputs_.clear();
161 // During decoder reinitialization, the Decoder does not need to be and
162 // cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder
// reinitialization.
164 if (state_ == STATE_REINITIALIZING_DECODER)
167 // During pending demuxer read and when not using DecryptingDemuxerStream,
168 // the Decoder will be reset after demuxer read is returned
169 // (in OnBufferReady()).
170 if (state_ == STATE_PENDING_DEMUXER_READ && !decrypting_demuxer_stream_)
// With a DecryptingDemuxerStream, reset it first; the decoder is reset from
// its completion callback.
173 if (decrypting_demuxer_stream_) {
174 decrypting_demuxer_stream_->Reset(base::Bind(
175 &DecoderStream<StreamType>::ResetDecoder, weak_factory_.GetWeakPtr()));
// Returns true when a Read() can be answered without stalling: either an
// output is already queued in |ready_outputs_|, or the decoder itself reports
// that it can be read without stalling. Must be called on the task runner
// thread.
182 template <DemuxerStream::Type StreamType>
183 bool DecoderStream<StreamType>::CanReadWithoutStalling() const {
184 DCHECK(task_runner_->BelongsToCurrentThread());
185 return !ready_outputs_.empty() || decoder_->CanReadWithoutStalling();
// AUDIO-specific version of CanReadWithoutStalling(). Must be called on the
// task runner thread.
// NOTE(review): the return statement is not visible in this listing -- confirm
// the returned value against the full file.
189 bool DecoderStream<DemuxerStream::AUDIO>::CanReadWithoutStalling() const {
190 DCHECK(task_runner_->BelongsToCurrentThread());
// Returns the decoder's own limit on decode requests, as reported by
// |decoder_|. Used by CanDecodeMore() and DCHECKed in Decode().
194 template <DemuxerStream::Type StreamType>
195 int DecoderStream<StreamType>::GetMaxDecodeRequests() const {
196 return decoder_->GetMaxDecodeRequests();
// AUDIO-specific version of GetMaxDecodeRequests().
// NOTE(review): the body is not visible in this listing -- confirm the
// returned limit against the full file.
200 int DecoderStream<DemuxerStream::AUDIO>::GetMaxDecodeRequests() const {
// Returns true when another decode request may be issued: no end-of-stream
// buffer is currently being decoded, and the number of queued outputs plus
// in-flight decode requests is below GetMaxDecodeRequests(). Must be called on
// the task runner thread.
204 template <DemuxerStream::Type StreamType>
205 bool DecoderStream<StreamType>::CanDecodeMore() const {
206 DCHECK(task_runner_->BelongsToCurrentThread());
208 // Limit total number of outputs stored in |ready_outputs_| and being decoded.
209 // It only makes sense to saturate decoder completely when output queue is
212 static_cast<int>(ready_outputs_.size()) + pending_decode_requests_;
213 return !decoding_eos_ && num_decodes < GetMaxDecodeRequests();
// Completion callback for the decoder selection started in Initialize().
//
//   selected_decoder: the chosen decoder, or null when none was selected.
//   decrypting_demuxer_stream: when non-null, replaces |stream_| as the source
//       of buffers and is retained by this object on success.
//
// On failure the state goes back to STATE_UNINITIALIZED and the init callback
// is run with false. On success the state becomes STATE_NORMAL, the decoder
// and decrypting stream are taken over, both are recorded in |media_log_|, the
// bitstream converter is enabled if the decoder needs it, and the init
// callback is run with true. The init callback is run synchronously here.
216 template <DemuxerStream::Type StreamType>
217 void DecoderStream<StreamType>::OnDecoderSelected(
218 scoped_ptr<Decoder> selected_decoder,
219 scoped_ptr<DecryptingDemuxerStream> decrypting_demuxer_stream) {
220 FUNCTION_DVLOG(2) << ": "
221 << (selected_decoder ? selected_decoder->GetDisplayName()
222 : "No decoder selected.");
223 DCHECK(task_runner_->BelongsToCurrentThread());
224 DCHECK_EQ(state_, STATE_INITIALIZING) << state_;
225 DCHECK(!init_cb_.is_null());
226 DCHECK(read_cb_.is_null());
227 DCHECK(reset_cb_.is_null());
// Selection is finished; the selector is no longer needed.
229 decoder_selector_.reset();
// Read through the decrypting stream from now on when one was created.
230 if (decrypting_demuxer_stream)
231 stream_ = decrypting_demuxer_stream.get();
// No usable decoder: report initialization failure.
233 if (!selected_decoder) {
234 state_ = STATE_UNINITIALIZED;
235 base::ResetAndReturn(&init_cb_).Run(false);
239 state_ = STATE_NORMAL;
240 decoder_ = selected_decoder.Pass();
241 decrypting_demuxer_stream_ = decrypting_demuxer_stream.Pass();
// Record "<type>_dds" and "<type>_decoder" properties in the media log.
243 const std::string stream_type = DecoderStreamTraits<StreamType>::ToString();
244 media_log_->SetBooleanProperty((stream_type + "_dds").c_str(),
245 decrypting_demuxer_stream_);
246 media_log_->SetStringProperty((stream_type + "_decoder").c_str(),
247 decoder_->GetDisplayName());
249 if (StreamTraits::NeedsBitstreamConversion(decoder_.get()))
250 stream_->EnableBitstreamConverter();
251 base::ResetAndReturn(&init_cb_).Run(true);
// Completes the outstanding read: clears |read_cb_| and runs it synchronously
// with |status| and |output|. A read must be pending (DCHECKed). The callback
// is cleared before it runs, so |read_cb_| is already null while it executes.
// NOTE(review): the declaration of the |status| parameter is not visible in
// this listing.
254 template <DemuxerStream::Type StreamType>
255 void DecoderStream<StreamType>::SatisfyRead(
257 const scoped_refptr<Output>& output) {
258 DCHECK(!read_cb_.is_null());
259 base::ResetAndReturn(&read_cb_).Run(status, output);
// Submits |buffer| to the decoder.
//
// Preconditions (DCHECKed): state is STATE_NORMAL or STATE_FLUSHING_DECODER,
// the number of in-flight requests is below GetMaxDecodeRequests(), no reset
// is pending, and |buffer| is non-null.
//
// Starts an async trace event, marks |decoding_eos_| when |buffer| is an
// end-of-stream buffer, counts the request in |pending_decode_requests_|, and
// binds OnDecodeDone() (through a weak pointer) as the completion callback.
262 template <DemuxerStream::Type StreamType>
263 void DecoderStream<StreamType>::Decode(
264 const scoped_refptr<DecoderBuffer>& buffer) {
266 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_;
267 DCHECK_LT(pending_decode_requests_, GetMaxDecodeRequests());
268 DCHECK(reset_cb_.is_null());
269 DCHECK(buffer.get());
// End-of-stream buffers are treated as carrying zero bytes.
271 int buffer_size = buffer->end_of_stream() ? 0 : buffer->data_size();
273 TRACE_EVENT_ASYNC_BEGIN0("media", GetTraceString<StreamType>(), this);
275 if (buffer->end_of_stream())
276 decoding_eos_ = true;
278 ++pending_decode_requests_;
279 decoder_->Decode(buffer,
280 base::Bind(&DecoderStream<StreamType>::OnDecodeDone,
281 weak_factory_.GetWeakPtr(),
283 buffer->end_of_stream()));
// Flushes the decoder by submitting an end-of-stream buffer through Decode().
286 template <DemuxerStream::Type StreamType>
287 void DecoderStream<StreamType>::FlushDecoder() {
288 Decode(DecoderBuffer::CreateEOSBuffer());
// Completion callback for one Decoder::Decode() request (bound in Decode()).
//
//   buffer_size: size recorded in Decode() for the submitted buffer; reported
//       through StreamTraits::ReportStatistics() on success.
//   status: the decoder's result for this request.
// NOTE(review): a further parameter between these two is not visible in this
// listing; Decode() binds buffer->end_of_stream() in that position -- confirm.
//
// Decrements |pending_decode_requests_| and ends the trace event started in
// Decode(). Nothing further is done in STATE_ERROR or while a reset is
// pending. On a decode or decrypt error the stream enters STATE_ERROR, queued
// outputs are discarded and any outstanding read is completed with
// DECODE_ERROR. When the decoder is being flushed and no requests remain in
// flight, decoder reinitialization is started.
291 template <DemuxerStream::Type StreamType>
292 void DecoderStream<StreamType>::OnDecodeDone(int buffer_size,
294 typename Decoder::Status status) {
295 FUNCTION_DVLOG(2) << ": " << status;
296 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
297 state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR)
299 DCHECK_GT(pending_decode_requests_, 0);
301 --pending_decode_requests_;
303 TRACE_EVENT_ASYNC_END0("media", GetTraceString<StreamType>(), this);
306 DCHECK(!pending_decode_requests_);
307 decoding_eos_ = false;
310 if (state_ == STATE_ERROR) {
311 DCHECK(read_cb_.is_null());
315 // Drop decoding result if Reset() was called during decoding.
316 // The resetting process will be handled when the decoder is reset.
317 if (!reset_cb_.is_null())
321 case Decoder::kDecodeError:
322 case Decoder::kDecryptError:
323 state_ = STATE_ERROR;
324 ready_outputs_.clear();
325 if (!read_cb_.is_null())
326 SatisfyRead(DECODE_ERROR, NULL);
329 case Decoder::kAborted:
330 // Decoder can return kAborted only when Reset is pending.
335 // Any successful decode counts!
337 StreamTraits::ReportStatistics(statistics_cb_, buffer_size);
339 if (state_ == STATE_NORMAL) {
341 state_ = STATE_END_OF_STREAM;
342 if (ready_outputs_.empty() && !read_cb_.is_null())
343 SatisfyRead(OK, StreamTraits::CreateEOSOutput());
348 ReadFromDemuxerStream();
// The flush is complete once the last in-flight request has returned.
352 if (state_ == STATE_FLUSHING_DECODER && !pending_decode_requests_)
353 ReinitializeDecoder();
358 template <DemuxerStream::Type StreamType>
359 void DecoderStream<StreamType>::OnDecodeOutputReady(
360 const scoped_refptr<Output>& output) {
361 FUNCTION_DVLOG(2) << ": " << output->timestamp().InMilliseconds() << " ms";
362 DCHECK(output.get());
363 DCHECK(!output->end_of_stream());
364 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
365 state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR)
368 if (state_ == STATE_ERROR) {
369 DCHECK(read_cb_.is_null());
373 // Drop decoding result if Reset() was called during decoding.
374 // The resetting process will be handled when the decoder is reset.
375 if (!reset_cb_.is_null())
378 // TODO(xhwang): VideoDecoder doesn't need to return EOS after it's flushed.
379 // Fix all decoders and remove this block.
380 // Store decoded output.
381 ready_outputs_.push_back(output);
383 if (read_cb_.is_null())
386 // Satisfy outstanding read request, if any.
387 scoped_refptr<Output> read_result = ready_outputs_.front();
388 ready_outputs_.pop_front();
389 SatisfyRead(OK, output);
// Issues a read on |stream_| for the next buffer.
//
// Preconditions (DCHECKed): state is STATE_NORMAL, CanDecodeMore() holds, and
// no reset is pending.
//
// Moves to STATE_PENDING_DEMUXER_READ; the result is delivered to
// OnBufferReady(), bound through a weak pointer.
392 template <DemuxerStream::Type StreamType>
393 void DecoderStream<StreamType>::ReadFromDemuxerStream() {
395 DCHECK_EQ(state_, STATE_NORMAL) << state_;
396 DCHECK(CanDecodeMore());
397 DCHECK(reset_cb_.is_null());
399 state_ = STATE_PENDING_DEMUXER_READ;
400 stream_->Read(base::Bind(&DecoderStream<StreamType>::OnBufferReady,
401 weak_factory_.GetWeakPtr()));
// Completion callback for the demuxer read issued by ReadFromDemuxerStream().
//
//   status: result of the demuxer read.
//   buffer: non-null exactly when |status| is DemuxerStream::kOk (DCHECKed).
//
// Handles, in order: a stream that has entered STATE_ERROR in the meantime; a
// config change (the observer is notified and the state becomes
// STATE_FLUSHING_DECODER); a reset that was requested while the read was
// pending; an aborted read (the outstanding read is completed with
// DEMUXER_READ_ABORTED); and finally a normal buffer, for which the splice
// observer is notified where applicable.
// NOTE(review): several statements of this function (early returns, the
// hand-off of |buffer| to the decoder, the condition guarding the final
// demuxer read) are not visible in this listing -- confirm against the full
// file.
404 template <DemuxerStream::Type StreamType>
405 void DecoderStream<StreamType>::OnBufferReady(
406 DemuxerStream::Status status,
407 const scoped_refptr<DecoderBuffer>& buffer) {
408 FUNCTION_DVLOG(2) << ": " << status << ", "
409 << (buffer.get() ? buffer->AsHumanReadableString()
412 DCHECK(task_runner_->BelongsToCurrentThread());
413 DCHECK(state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR)
415 DCHECK_EQ(buffer.get() != NULL, status == DemuxerStream::kOk) << status;
417 // Decoding has been stopped (e.g. due to an error).
418 if (state_ != STATE_PENDING_DEMUXER_READ) {
419 DCHECK(state_ == STATE_ERROR);
420 DCHECK(read_cb_.is_null());
// The demuxer read is no longer pending.
424 state_ = STATE_NORMAL;
426 if (status == DemuxerStream::kConfigChanged) {
427 FUNCTION_DVLOG(2) << ": " << "ConfigChanged";
428 DCHECK(stream_->SupportsConfigChanges());
430 if (!config_change_observer_cb_.is_null())
431 config_change_observer_cb_.Run();
433 state_ = STATE_FLUSHING_DECODER;
434 if (!reset_cb_.is_null()) {
435 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
436 // which will continue the resetting process in its callback.
437 if (!decrypting_demuxer_stream_)
438 Reset(base::ResetAndReturn(&reset_cb_));
439 // Reinitialization will continue after Reset() is done.
446 if (!reset_cb_.is_null()) {
447 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
448 // which will continue the resetting process in its callback.
449 if (!decrypting_demuxer_stream_)
450 Reset(base::ResetAndReturn(&reset_cb_));
454 if (status == DemuxerStream::kAborted) {
455 if (!read_cb_.is_null())
456 SatisfyRead(DEMUXER_READ_ABORTED, NULL);
// Notify the splice observer for every non-EOS buffer while a splice is
// active, and for the first buffer that carries a splice timestamp.
460 if (!splice_observer_cb_.is_null() && !buffer->end_of_stream()) {
461 const bool has_splice_ts = buffer->splice_timestamp() != kNoTimestamp();
462 if (active_splice_ || has_splice_ts) {
463 splice_observer_cb_.Run(buffer->splice_timestamp());
464 active_splice_ = has_splice_ts;
468 DCHECK(status == DemuxerStream::kOk) << status;
471 // Read more data if the decoder supports multiple parallel decoding requests.
473 ReadFromDemuxerStream();
// Reinitializes the decoder with the stream's current decoder config.
//
// Preconditions (DCHECKed): called on the task runner thread, in
// STATE_FLUSHING_DECODER, with no decode requests in flight and a valid
// decoder config on |stream_|.
//
// Moves to STATE_REINITIALIZING_DECODER. Completion is reported to
// OnDecoderReinitialized() and decoded outputs keep going to
// OnDecodeOutputReady(); both are bound through weak pointers.
476 template <DemuxerStream::Type StreamType>
477 void DecoderStream<StreamType>::ReinitializeDecoder() {
479 DCHECK(task_runner_->BelongsToCurrentThread());
480 DCHECK_EQ(state_, STATE_FLUSHING_DECODER) << state_;
481 DCHECK_EQ(pending_decode_requests_, 0);
483 DCHECK(StreamTraits::GetDecoderConfig(*stream_).IsValidConfig());
484 state_ = STATE_REINITIALIZING_DECODER;
485 DecoderStreamTraits<StreamType>::Initialize(
487 StreamTraits::GetDecoderConfig(*stream_),
489 base::Bind(&DecoderStream<StreamType>::OnDecoderReinitialized,
490 weak_factory_.GetWeakPtr()),
491 base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady,
492 weak_factory_.GetWeakPtr()));
// Completion callback for ReinitializeDecoder().
//
//   status: PIPELINE_OK moves the stream to STATE_NORMAL; any other value
//       moves it to STATE_ERROR.
//
// A pending reset is completed first. Otherwise, if a read is outstanding, it
// is either failed with DECODE_ERROR (on error) or served by resuming demuxer
// reads.
495 template <DemuxerStream::Type StreamType>
496 void DecoderStream<StreamType>::OnDecoderReinitialized(PipelineStatus status) {
498 DCHECK(task_runner_->BelongsToCurrentThread());
499 DCHECK_EQ(state_, STATE_REINITIALIZING_DECODER) << state_;
501 // ReinitializeDecoder() can be called in two cases:
502 // 1, Flushing decoder finished (see OnDecodeDone()).
503 // 2, Reset() was called during flushing decoder (see OnDecoderReset()).
504 // Also, Reset() can be called during pending ReinitializeDecoder().
505 // This function needs to handle them all!
507 state_ = (status == PIPELINE_OK) ? STATE_NORMAL : STATE_ERROR;
// Reset() arrived while reinitializing: finish it now.
509 if (!reset_cb_.is_null()) {
510 base::ResetAndReturn(&reset_cb_).Run();
514 if (read_cb_.is_null())
517 if (state_ == STATE_ERROR) {
518 SatisfyRead(DECODE_ERROR, NULL);
522 ReadFromDemuxerStream();
// Resets the decoder as part of a pending Reset().
//
// Preconditions (DCHECKed): called on the task runner thread; state is
// STATE_NORMAL, STATE_FLUSHING_DECODER, STATE_ERROR or STATE_END_OF_STREAM;
// and a reset callback is pending.
//
// Completion is reported to OnDecoderReset(), bound through a weak pointer.
525 template <DemuxerStream::Type StreamType>
526 void DecoderStream<StreamType>::ResetDecoder() {
528 DCHECK(task_runner_->BelongsToCurrentThread());
529 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
530 state_ == STATE_ERROR || state_ == STATE_END_OF_STREAM) << state_;
531 DCHECK(!reset_cb_.is_null());
533 decoder_->Reset(base::Bind(&DecoderStream<StreamType>::OnDecoderReset,
534 weak_factory_.GetWeakPtr()));
// Completion callback for the decoder reset started in ResetDecoder().
//
// Unless the decoder was being flushed, the stream returns to STATE_NORMAL,
// the active-splice flag is cleared and the reset callback is run. When the
// decoder was being flushed (STATE_FLUSHING_DECODER), the reset callback stays
// pending and decoder reinitialization is started instead.
537 template <DemuxerStream::Type StreamType>
538 void DecoderStream<StreamType>::OnDecoderReset() {
540 DCHECK(task_runner_->BelongsToCurrentThread());
541 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
542 state_ == STATE_ERROR || state_ == STATE_END_OF_STREAM) << state_;
543 // If Reset() was called during pending read, read callback should be fired
544 // before the reset callback is fired.
545 DCHECK(read_cb_.is_null());
546 DCHECK(!reset_cb_.is_null());
548 if (state_ != STATE_FLUSHING_DECODER) {
549 state_ = STATE_NORMAL;
550 active_splice_ = false;
551 base::ResetAndReturn(&reset_cb_).Run();
555 // The resetting process will be continued in OnDecoderReinitialized().
556 ReinitializeDecoder();
// Explicit instantiations: the member templates are defined in this file, so
// the two supported stream types are instantiated here.
559 template class DecoderStream<DemuxerStream::VIDEO>;
560 template class DecoderStream<DemuxerStream::AUDIO>;