Upstream version 7.36.149.0
[platform/framework/web/crosswalk.git] / src / media / filters / decoder_stream.cc
index 2863d22..4f712be 100644 (file)
@@ -50,6 +50,8 @@ DecoderStream<StreamType>::DecoderStream(
           new DecoderSelector<StreamType>(task_runner,
                                           decoders.Pass(),
                                           set_decryptor_ready_cb)),
+      active_splice_(false),
+      pending_decode_requests_(0),
       weak_factory_(this) {}
 
 template <DemuxerStream::Type StreamType>
@@ -59,6 +61,7 @@ DecoderStream<StreamType>::~DecoderStream() {
 
 template <DemuxerStream::Type StreamType>
 void DecoderStream<StreamType>::Initialize(DemuxerStream* stream,
+                                           bool low_delay,
                                            const StatisticsCB& statistics_cb,
                                            const InitCB& init_cb) {
   FUNCTION_DVLOG(2);
@@ -70,11 +73,12 @@ void DecoderStream<StreamType>::Initialize(DemuxerStream* stream,
   statistics_cb_ = statistics_cb;
   init_cb_ = init_cb;
   stream_ = stream;
+  low_delay_ = low_delay;
 
   state_ = STATE_INITIALIZING;
   // TODO(xhwang): DecoderSelector only needs a config to select a decoder.
   decoder_selector_->SelectDecoder(
-      stream,
+      stream, low_delay,
       base::Bind(&DecoderStream<StreamType>::OnDecoderSelected,
                  weak_factory_.GetWeakPtr()));
 }
@@ -84,36 +88,45 @@ void DecoderStream<StreamType>::Read(const ReadCB& read_cb) {
   FUNCTION_DVLOG(2);
   DCHECK(task_runner_->BelongsToCurrentThread());
   DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
-         state_ == STATE_ERROR) << state_;
+         state_ == STATE_ERROR || state_ == STATE_REINITIALIZING_DECODER ||
+         state_ == STATE_PENDING_DEMUXER_READ)
+      << state_;
   // No two reads in the flight at any time.
   DCHECK(read_cb_.is_null());
   // No read during resetting or stopping process.
   DCHECK(reset_cb_.is_null());
   DCHECK(stop_cb_.is_null());
 
+  read_cb_ = read_cb;
+
   if (state_ == STATE_ERROR) {
-    task_runner_->PostTask(FROM_HERE, base::Bind(
-        read_cb, DECODE_ERROR, scoped_refptr<Output>()));
+    task_runner_->PostTask(FROM_HERE,
+                           base::Bind(base::ResetAndReturn(&read_cb_),
+                                      DECODE_ERROR,
+                                      scoped_refptr<Output>()));
     return;
   }
 
-  read_cb_ = read_cb;
+  if (!ready_outputs_.empty()) {
+    task_runner_->PostTask(FROM_HERE, base::Bind(
+        base::ResetAndReturn(&read_cb_), OK, ready_outputs_.front()));
+    ready_outputs_.pop_front();
+  }
 
-  if (state_ == STATE_FLUSHING_DECODER) {
-    FlushDecoder();
+  // Decoder may be in reinitializing state as result of the previous Read().
+  if (state_ == STATE_REINITIALIZING_DECODER)
     return;
-  }
 
-  scoped_refptr<Output> output = decoder_->GetDecodeOutput();
+  if (!CanDecodeMore())
+    return;
 
-  // If the decoder has queued output ready to go we don't need a demuxer read.
-  if (output) {
-    task_runner_->PostTask(
-        FROM_HERE, base::Bind(base::ResetAndReturn(&read_cb_), OK, output));
+  if (state_ == STATE_FLUSHING_DECODER) {
+    FlushDecoder();
     return;
   }
 
-  ReadFromDemuxerStream();
+  if (state_ != STATE_PENDING_DEMUXER_READ)
+    ReadFromDemuxerStream();
 }
 
 template <DemuxerStream::Type StreamType>
@@ -126,6 +139,13 @@ void DecoderStream<StreamType>::Reset(const base::Closure& closure) {
 
   reset_cb_ = closure;
 
+  if (!read_cb_.is_null()) {
+    task_runner_->PostTask(FROM_HERE, base::Bind(
+        base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>()));
+  }
+
+  ready_outputs_.clear();
+
   // During decoder reinitialization, the Decoder does not need to be and
   // cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder
   // reinitialization.
@@ -138,11 +158,6 @@ void DecoderStream<StreamType>::Reset(const base::Closure& closure) {
   if (state_ == STATE_PENDING_DEMUXER_READ && !decrypting_demuxer_stream_)
     return;
 
-  // The Decoder API guarantees that if Decoder::Reset() is called during
-  // a pending decode, the decode callback must be fired before the reset
-  // callback is fired. Therefore, we can call Decoder::Reset() regardless
-  // of if we have a pending decode and always satisfy the reset callback when
-  // the decoder reset is finished.
   if (decrypting_demuxer_stream_) {
     decrypting_demuxer_stream_->Reset(base::Bind(
         &DecoderStream<StreamType>::ResetDecoder, weak_factory_.GetWeakPtr()));
@@ -172,9 +187,10 @@ void DecoderStream<StreamType>::Stop(const base::Closure& closure) {
   weak_factory_.InvalidateWeakPtrs();
 
   // Post callbacks to prevent reentrance into this object.
-  if (!read_cb_.is_null())
+  if (!read_cb_.is_null()) {
     task_runner_->PostTask(FROM_HERE, base::Bind(
         base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>()));
+  }
   if (!reset_cb_.is_null())
     task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&reset_cb_));
 
@@ -210,6 +226,24 @@ bool DecoderStream<DemuxerStream::AUDIO>::CanReadWithoutStalling() const {
 }
 
 template <DemuxerStream::Type StreamType>
+bool DecoderStream<StreamType>::CanDecodeMore() const {
+  DCHECK(task_runner_->BelongsToCurrentThread());
+
+  // Limit total number of outputs stored in |ready_outputs_| and being decoded.
+  // It only makes sense to saturate decoder completely when output queue is
+  // empty.
+  int num_decodes =
+      static_cast<int>(ready_outputs_.size()) + pending_decode_requests_;
+  return num_decodes < decoder_->GetMaxDecodeRequests();
+}
+
+template <>
+bool DecoderStream<DemuxerStream::AUDIO>::CanDecodeMore() const {
+  DCHECK(task_runner_->BelongsToCurrentThread());
+  return !pending_decode_requests_ && ready_outputs_.empty();
+}
+
+template <DemuxerStream::Type StreamType>
 void DecoderStream<StreamType>::OnDecoderSelected(
     scoped_ptr<Decoder> selected_decoder,
     scoped_ptr<DecryptingDemuxerStream> decrypting_demuxer_stream) {
@@ -252,22 +286,11 @@ void DecoderStream<StreamType>::SatisfyRead(
 }
 
 template <DemuxerStream::Type StreamType>
-void DecoderStream<StreamType>::AbortRead() {
-  // Abort read during pending reset. It is safe to fire the |read_cb_| directly
-  // instead of posting it because the renderer won't call into this class
-  // again when it's in kFlushing state.
-  // TODO(xhwang): Improve the resetting process to avoid this dependency on the
-  // caller.
-  DCHECK(!reset_cb_.is_null());
-  SatisfyRead(ABORTED, NULL);
-}
-
-template <DemuxerStream::Type StreamType>
 void DecoderStream<StreamType>::Decode(
     const scoped_refptr<DecoderBuffer>& buffer) {
   FUNCTION_DVLOG(2);
   DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_;
-  DCHECK(!read_cb_.is_null());
+  DCHECK(CanDecodeMore());
   DCHECK(reset_cb_.is_null());
   DCHECK(stop_cb_.is_null());
   DCHECK(buffer);
@@ -275,6 +298,7 @@ void DecoderStream<StreamType>::Decode(
   int buffer_size = buffer->end_of_stream() ? 0 : buffer->data_size();
 
   TRACE_EVENT_ASYNC_BEGIN0("media", GetTraceString<StreamType>(), this);
+  ++pending_decode_requests_;
   decoder_->Decode(buffer,
                    base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady,
                               weak_factory_.GetWeakPtr(),
@@ -283,7 +307,8 @@ void DecoderStream<StreamType>::Decode(
 
 template <DemuxerStream::Type StreamType>
 void DecoderStream<StreamType>::FlushDecoder() {
-  Decode(DecoderBuffer::CreateEOSBuffer());
+  if (pending_decode_requests_ == 0)
+    Decode(DecoderBuffer::CreateEOSBuffer());
 }
 
 template <DemuxerStream::Type StreamType>
@@ -291,28 +316,42 @@ void DecoderStream<StreamType>::OnDecodeOutputReady(
     int buffer_size,
     typename Decoder::Status status,
     const scoped_refptr<Output>& output) {
-  FUNCTION_DVLOG(2);
-  DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_;
-  DCHECK(!read_cb_.is_null());
+  FUNCTION_DVLOG(2) << status << " " << output;
+  DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
+         state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR)
+      << state_;
   DCHECK(stop_cb_.is_null());
   DCHECK_EQ(status == Decoder::kOk, output != NULL);
+  DCHECK_GT(pending_decode_requests_, 0);
+
+  --pending_decode_requests_;
 
   TRACE_EVENT_ASYNC_END0("media", GetTraceString<StreamType>(), this);
 
+  if (state_ == STATE_ERROR) {
+    DCHECK(read_cb_.is_null());
+    return;
+  }
+
   if (status == Decoder::kDecodeError) {
     state_ = STATE_ERROR;
-    SatisfyRead(DECODE_ERROR, NULL);
+    ready_outputs_.clear();
+    if (!read_cb_.is_null())
+      SatisfyRead(DECODE_ERROR, NULL);
     return;
   }
 
   if (status == Decoder::kDecryptError) {
     state_ = STATE_ERROR;
-    SatisfyRead(DECRYPT_ERROR, NULL);
+    ready_outputs_.clear();
+    if (!read_cb_.is_null())
+      SatisfyRead(DECRYPT_ERROR, NULL);
     return;
   }
 
   if (status == Decoder::kAborted) {
-    SatisfyRead(ABORTED, NULL);
+    if (!read_cb_.is_null())
+      SatisfyRead(ABORTED, NULL);
     return;
   }
 
@@ -323,10 +362,8 @@ void DecoderStream<StreamType>::OnDecodeOutputReady(
 
   // Drop decoding result if Reset() was called during decoding.
   // The resetting process will be handled when the decoder is reset.
-  if (!reset_cb_.is_null()) {
-    AbortRead();
+  if (!reset_cb_.is_null())
     return;
-  }
 
   // Decoder flushed. Reinitialize the decoder.
   if (state_ == STATE_FLUSHING_DECODER &&
@@ -344,14 +381,27 @@ void DecoderStream<StreamType>::OnDecodeOutputReady(
   }
 
   DCHECK(output);
-  SatisfyRead(OK, output);
+
+  // Store decoded output.
+  ready_outputs_.push_back(output);
+  scoped_refptr<Output> extra_output;
+  while ((extra_output = decoder_->GetDecodeOutput()) != NULL) {
+    ready_outputs_.push_back(extra_output);
+  }
+
+  // Satisfy outstanding read request, if any.
+  if (!read_cb_.is_null()) {
+    scoped_refptr<Output> read_result = ready_outputs_.front();
+    ready_outputs_.pop_front();
+    SatisfyRead(OK, output);
+  }
 }
 
 template <DemuxerStream::Type StreamType>
 void DecoderStream<StreamType>::ReadFromDemuxerStream() {
   FUNCTION_DVLOG(2);
   DCHECK_EQ(state_, STATE_NORMAL) << state_;
-  DCHECK(!read_cb_.is_null());
+  DCHECK(CanDecodeMore());
   DCHECK(reset_cb_.is_null());
   DCHECK(stop_cb_.is_null());
 
@@ -366,11 +416,19 @@ void DecoderStream<StreamType>::OnBufferReady(
     const scoped_refptr<DecoderBuffer>& buffer) {
   FUNCTION_DVLOG(2) << ": " << status;
   DCHECK(task_runner_->BelongsToCurrentThread());
-  DCHECK_EQ(state_, STATE_PENDING_DEMUXER_READ) << state_;
+  DCHECK(state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR ||
+         state_ == STATE_STOPPED)
+      << state_;
   DCHECK_EQ(buffer.get() != NULL, status == DemuxerStream::kOk) << status;
-  DCHECK(!read_cb_.is_null());
   DCHECK(stop_cb_.is_null());
 
+  // Decoding has been stopped (e.g due to an error).
+  if (state_ != STATE_PENDING_DEMUXER_READ) {
+    DCHECK(state_ == STATE_ERROR || state_ == STATE_STOPPED);
+    DCHECK(read_cb_.is_null());
+    return;
+  }
+
   state_ = STATE_NORMAL;
 
   if (status == DemuxerStream::kConfigChanged) {
@@ -382,7 +440,6 @@ void DecoderStream<StreamType>::OnBufferReady(
 
     state_ = STATE_FLUSHING_DECODER;
     if (!reset_cb_.is_null()) {
-      AbortRead();
       // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
       // which will continue the resetting process in it's callback.
       if (!decrypting_demuxer_stream_)
@@ -395,7 +452,6 @@ void DecoderStream<StreamType>::OnBufferReady(
   }
 
   if (!reset_cb_.is_null()) {
-    AbortRead();
     // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
     // which will continue the resetting process in it's callback.
     if (!decrypting_demuxer_stream_)
@@ -408,13 +464,20 @@ void DecoderStream<StreamType>::OnBufferReady(
     return;
   }
 
-  if (!splice_observer_cb_.is_null() && !buffer->end_of_stream() &&
-      buffer->splice_timestamp() != kNoTimestamp()) {
-    splice_observer_cb_.Run(buffer->splice_timestamp());
+  if (!splice_observer_cb_.is_null() && !buffer->end_of_stream()) {
+    const bool has_splice_ts = buffer->splice_timestamp() != kNoTimestamp();
+    if (active_splice_ || has_splice_ts) {
+      splice_observer_cb_.Run(buffer->splice_timestamp());
+      active_splice_ = has_splice_ts;
+    }
   }
 
   DCHECK(status == DemuxerStream::kOk) << status;
   Decode(buffer);
+
+  // Read more data if the decoder supports multiple parallel decoding requests.
+  if (CanDecodeMore() && !buffer->end_of_stream())
+    ReadFromDemuxerStream();
 }
 
 template <DemuxerStream::Type StreamType>
@@ -422,11 +485,14 @@ void DecoderStream<StreamType>::ReinitializeDecoder() {
   FUNCTION_DVLOG(2);
   DCHECK(task_runner_->BelongsToCurrentThread());
   DCHECK_EQ(state_, STATE_FLUSHING_DECODER) << state_;
+  DCHECK_EQ(pending_decode_requests_, 0);
 
   DCHECK(StreamTraits::GetDecoderConfig(*stream_).IsValidConfig());
   state_ = STATE_REINITIALIZING_DECODER;
-  decoder_->Initialize(
+  DecoderStreamTraits<StreamType>::Initialize(
+      decoder_.get(),
       StreamTraits::GetDecoderConfig(*stream_),
+      low_delay_,
       base::Bind(&DecoderStream<StreamType>::OnDecoderReinitialized,
                  weak_factory_.GetWeakPtr()));
 }
@@ -447,9 +513,8 @@ void DecoderStream<StreamType>::OnDecoderReinitialized(PipelineStatus status) {
   state_ = (status == PIPELINE_OK) ? STATE_NORMAL : STATE_ERROR;
 
   if (!reset_cb_.is_null()) {
-    if (!read_cb_.is_null())
-      AbortRead();
     base::ResetAndReturn(&reset_cb_).Run();
+    return;
   }
 
   if (read_cb_.is_null())
@@ -503,26 +568,14 @@ void DecoderStream<StreamType>::StopDecoder() {
   DCHECK(state_ != STATE_UNINITIALIZED && state_ != STATE_STOPPED) << state_;
   DCHECK(!stop_cb_.is_null());
 
-  decoder_->Stop(base::Bind(&DecoderStream<StreamType>::OnDecoderStopped,
-                            weak_factory_.GetWeakPtr()));
-}
-
-template <DemuxerStream::Type StreamType>
-void DecoderStream<StreamType>::OnDecoderStopped() {
-  FUNCTION_DVLOG(2);
-  DCHECK(task_runner_->BelongsToCurrentThread());
-  DCHECK(state_ != STATE_UNINITIALIZED && state_ != STATE_STOPPED) << state_;
-  // If Stop() was called during pending read/reset, read/reset callback should
-  // be fired before the stop callback is fired.
-  DCHECK(read_cb_.is_null());
-  DCHECK(reset_cb_.is_null());
-  DCHECK(!stop_cb_.is_null());
-
   state_ = STATE_STOPPED;
+  decoder_->Stop();
   stream_ = NULL;
   decoder_.reset();
   decrypting_demuxer_stream_.reset();
-  base::ResetAndReturn(&stop_cb_).Run();
+  // Post |stop_cb_| because pending |read_cb_| and/or |reset_cb_| are also
+  // posted in Stop().
+  task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&stop_cb_));
 }
 
 template class DecoderStream<DemuxerStream::VIDEO>;