DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
was_read_aborted_ = true;
if (pending_read_cb_)
- std::move(pending_read_cb_).Run(Status::kAborted, nullptr);
+ std::move(pending_read_cb_).Run(Status::kAborted, {});
}
protected:
is_bitstream_enable_in_progress_ = false;
if (pending_read_cb_) {
- Read(std::move(pending_read_cb_));
+ Read(1, std::move(pending_read_cb_));
}
}
}
if (buffer->end_of_stream()) {
- std::move(pending_read_cb_).Run(Status::kError, nullptr);
+ std::move(pending_read_cb_).Run(Status::kError, {});
} else {
- std::move(pending_read_cb_).Run(Status::kOk, std::move(buffer));
+ std::move(pending_read_cb_).Run(Status::kOk, {std::move(buffer)});
}
}
pending_config_change_ = true;
if (pending_read_cb_) {
- Read(std::move(pending_read_cb_));
+ Read(1, std::move(pending_read_cb_));
}
}
buffer_requester_.reset();
if (pending_read_cb_) {
- std::move(pending_read_cb_)
- .Run(Status::kAborted, scoped_refptr<media::DecoderBuffer>(nullptr));
+ std::move(pending_read_cb_).Run(Status::kAborted, {});
}
}
// DemuxerStream partial implementation.
- void Read(ReadCB read_cb) final {
+ void Read(uint32_t count, ReadCB read_cb) final {
DVLOG(3) << __func__;
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!pending_read_cb_);
DCHECK(!buffer_requester_ || current_buffer_provider_);
+ DCHECK_EQ(count, 1u)
+ << "FrameInjectingDemuxerStream only reads a single buffer.";
pending_read_cb_ = std::move(read_cb);
// Check whether OnMojoDisconnect() has been called and abort if so.
if (!buffer_requester_) {
- std::move(pending_read_cb_)
- .Run(Status::kAborted, scoped_refptr<media::DecoderBuffer>(nullptr));
+ std::move(pending_read_cb_).Run(Status::kAborted, {});
return;
}
// Handle the special case of a config change.
if (pending_config_change_) {
pending_config_change_ = false;
- std::move(pending_read_cb_).Run(Status::kConfigChanged, nullptr);
+ std::move(pending_read_cb_).Run(Status::kConfigChanged, {});
return;
}
is_demuxer_read_pending_ = true;
demuxer_stream_->Read(
- base::BindOnce(&WebEngineAudioRenderer::OnDemuxerStreamReadDone,
- weak_factory_.GetWeakPtr()));
+ 1, base::BindOnce(&WebEngineAudioRenderer::OnDemuxerStreamReadDone,
+ weak_factory_.GetWeakPtr()));
}
void WebEngineAudioRenderer::OnDemuxerStreamReadDone(
media::DemuxerStream::Status read_status,
- scoped_refptr<media::DecoderBuffer> buffer) {
+ media::DemuxerStream::DecoderBufferVector buffers) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(is_demuxer_read_pending_);
+ DCHECK_LE(buffers.size(), 1u)
+ << "ReadDemuxerStream() only reads a single buffer.";
is_demuxer_read_pending_ = false;
return;
}
+ scoped_refptr<media::DecoderBuffer> buffer = std::move(buffers[0]);
+ DCHECK(buffer);
+
if (buffer->end_of_stream()) {
is_at_end_of_stream_ = true;
} else {
// Helpers to pump data from |demuxer_stream_| to |stream_sink_|.
void ScheduleReadDemuxerStream();
void ReadDemuxerStream();
- void OnDemuxerStreamReadDone(media::DemuxerStream::Status status,
- scoped_refptr<media::DecoderBuffer> buffer);
+ void OnDemuxerStreamReadDone(
+ media::DemuxerStream::Status status,
+ media::DemuxerStream::DecoderBufferVector buffers);
// Sends the specified packet to |stream_sink_|.
void SendInputPacket(media::StreamProcessorHelper::IoPacket packet);
while (!read_queue_.empty())
read_queue_.pop();
if (read_cb_)
- std::move(read_cb_).Run(kAborted, nullptr);
+ std::move(read_cb_).Run(kAborted, {});
}
bool is_read_pending() const { return !!read_cb_; }
// DemuxerStream implementation.
- void Read(ReadCB read_cb) override {
+ void Read(uint32_t count, ReadCB read_cb) override {
+ DCHECK_EQ(count, 1u) << "TestDemuxerStream only reads a single buffer.";
read_cb_ = std::move(read_cb);
SatisfyRead();
}
status = kOk;
}
- std::move(read_cb_).Run(status, result.buffer);
+ std::move(read_cb_).Run(status, {result.buffer});
}
media::AudioDecoderConfig config_;
// Returns the total size of buffers inside the queue.
size_t data_size() const { return data_size_; }
+ // Returns the number of buffers in the queue.
+ size_t queue_size() const { return queue_.size(); }
+
private:
using Queue = base::circular_deque<scoped_refptr<DecoderBuffer>>;
Queue queue_;
return "aborted";
case DemuxerStream::kConfigChanged:
return "config_changed";
+#if defined(TIZEN_MULTIMEDIA)
+ case DemuxerStream::kNeedBuffer:
+ return "need_buffer";
+#endif // defined(TIZEN_MULTIMEDIA)
case DemuxerStream::kError:
return "error";
}
#ifndef MEDIA_BASE_DEMUXER_STREAM_H_
#define MEDIA_BASE_DEMUXER_STREAM_H_
+#include <vector>
+
#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "media/base/media_export.h"
static const char* GetStatusName(Status status);
- // Request a buffer to returned via the provided callback.
- //
- // The first parameter indicates the status of the read.
- // The second parameter is non-NULL and contains media data
- // or the end of the stream if the first parameter is kOk. NULL otherwise.
- typedef base::OnceCallback<void(Status, scoped_refptr<DecoderBuffer>)> ReadCB;
- virtual void Read(ReadCB read_cb) = 0;
+ using DecoderBufferVector = std::vector<scoped_refptr<DecoderBuffer>>;
+ using ReadCB = base::OnceCallback<void(Status, DecoderBufferVector)>;
+
+ // Request buffers to be returned via the provided callback.
+ // The first parameter indicates the status of the read request.
+ // If the status is kAborted, kConfigChanged or kError, the vector must be
+ // empty. If the status is kOk, the size of vector should be 1<=n<=N, where
+ // N is the requested count. The last buffer of the vector could be EOS.
+ virtual void Read(uint32_t count, ReadCB read_cb) = 0;
// Returns the audio/video decoder configuration. It is an error to call the
// audio method on a video stream and vice versa. After |kConfigChanged| is
next_read_num_ = 0;
}
-void FakeDemuxerStream::Read(ReadCB read_cb) {
+// Only return one buffer at a time so we ignore the count.
+void FakeDemuxerStream::Read(uint32_t /*count*/, ReadCB read_cb) {
DCHECK(task_runner_->BelongsToCurrentThread());
DCHECK(!read_cb_);
read_to_hold_ = -1;
if (read_cb_)
- std::move(read_cb_).Run(kAborted, nullptr);
+ std::move(read_cb_).Run(kAborted, {});
}
void FakeDemuxerStream::Error() {
read_to_hold_ = -1;
if (read_cb_)
- std::move(read_cb_).Run(kError, nullptr);
+ std::move(read_cb_).Run(kError, {});
}
void FakeDemuxerStream::SeekToStart() {
if (num_buffers_left_in_current_config_ == 0) {
// End of stream.
if (num_configs_left_ == 0) {
- std::move(read_cb_).Run(kOk, DecoderBuffer::CreateEOSBuffer());
+ std::move(read_cb_).Run(kOk, {DecoderBuffer::CreateEOSBuffer()});
return;
}
// Config change.
num_buffers_left_in_current_config_ = num_buffers_in_one_config_;
UpdateVideoDecoderConfig();
- std::move(read_cb_).Run(kConfigChanged, nullptr);
+ std::move(read_cb_).Run(kConfigChanged, {});
return;
}
num_configs_left_--;
num_buffers_returned_++;
- std::move(read_cb_).Run(kOk, buffer);
+ std::move(read_cb_).Run(kOk, {std::move(buffer)});
}
FakeMediaResource::FakeMediaResource(int num_video_configs,
~FakeDemuxerStream() override;
// DemuxerStream implementation.
- void Read(ReadCB read_cb) override;
+ void Read(uint32_t count, ReadCB read_cb) override;
AudioDecoderConfig audio_decoder_config() override;
VideoDecoderConfig video_decoder_config() override;
Type type() const override;
~FakeDemuxerStreamTest() override = default;
void BufferReady(DemuxerStream::Status status,
- scoped_refptr<DecoderBuffer> buffer) {
+ DemuxerStream::DecoderBufferVector buffers) {
DCHECK(read_pending_);
+ DCHECK_LE(buffers.size(), 1u)
+ << "FakeDemuxerStreamTest only reads a single-buffer.";
+ scoped_refptr<DecoderBuffer> buffer =
+ buffers.empty() ? nullptr : std::move(buffers[0]);
+
read_pending_ = false;
status_ = status;
if (status == DemuxerStream::kOk && !buffer->end_of_stream())
void ReadAndExpect(ReadResult result) {
EXPECT_FALSE(read_pending_);
read_pending_ = true;
- stream_->Read(base::BindOnce(&FakeDemuxerStreamTest::BufferReady,
- base::Unretained(this)));
+ stream_->Read(1, base::BindOnce(&FakeDemuxerStreamTest::BufferReady,
+ base::Unretained(this)));
base::RunLoop().RunUntilIdle();
ExpectReadResult(result);
}
void ReadUntilPending() {
while (true) {
read_pending_ = true;
- stream_->Read(base::BindOnce(&FakeDemuxerStreamTest::BufferReady,
- base::Unretained(this)));
+ stream_->Read(1, base::BindOnce(&FakeDemuxerStreamTest::BufferReady,
+ base::Unretained(this)));
base::RunLoop().RunUntilIdle();
if (read_pending_)
break;
DCHECK(!read_cb_);
}
-void FakeTextTrackStream::Read(ReadCB read_cb) {
+// Only return one buffer at a time so we ignore the count.
+void FakeTextTrackStream::Read(uint32_t /*count*/, ReadCB read_cb) {
DCHECK(read_cb);
DCHECK(!read_cb_);
OnRead();
// Assume all fake text buffers are keyframes.
buffer->set_is_key_frame(true);
- std::move(read_cb_).Run(kOk, buffer);
+ std::move(read_cb_).Run(kOk, {std::move(buffer)});
}
void FakeTextTrackStream::AbortPendingRead() {
DCHECK(read_cb_);
- std::move(read_cb_).Run(kAborted, nullptr);
+ std::move(read_cb_).Run(kAborted, {});
}
void FakeTextTrackStream::SendEosNotification() {
DCHECK(read_cb_);
- std::move(read_cb_).Run(kOk, DecoderBuffer::CreateEOSBuffer());
+ std::move(read_cb_).Run(kOk, {DecoderBuffer::CreateEOSBuffer()});
}
void FakeTextTrackStream::Stop() {
~FakeTextTrackStream() override;
// DemuxerStream implementation.
- void Read(ReadCB) override;
+ void Read(uint32_t count, ReadCB) override;
MOCK_METHOD0(audio_decoder_config, AudioDecoderConfig());
MOCK_METHOD0(video_decoder_config, VideoDecoderConfig());
Type type() const override;
// DemuxerStream implementation.
Type type() const override;
StreamLiveness liveness() const override;
- void Read(ReadCB read_cb) override { OnRead(read_cb); }
+ void Read(uint32_t count, ReadCB read_cb) override { OnRead(read_cb); }
MOCK_METHOD1(OnRead, void(ReadCB& read_cb));
AudioDecoderConfig audio_decoder_config() override;
VideoDecoderConfig video_decoder_config() override;
return arg.Matches(config);
}
-MATCHER_P(HasTimestamp, timestamp_in_ms, "") {
- return arg.get() && !arg->end_of_stream() &&
- arg->timestamp().InMilliseconds() == timestamp_in_ms;
+MATCHER_P(ReadOneAndHasTimestamp, timestamp_in_ms, "") {
+ DCHECK_EQ(arg.size(), 1u);
+ return !arg[0]->end_of_stream() &&
+ arg[0]->timestamp().InMilliseconds() == timestamp_in_ms;
}
-MATCHER(IsEndOfStream, "") {
- return arg.get() && arg->end_of_stream();
+MATCHER(ReadOneAndIsEndOfStream, "") {
+ DCHECK_EQ(arg.size(), 1u);
+ return arg[0]->end_of_stream();
+}
+
+MATCHER(IsEmpty, "") {
+ return arg.empty();
}
MATCHER(EosBeforeHaveMetadata, "") {
return !text_track_state_map_.empty();
}
+void TextRenderer::OnBuffersRead(DemuxerStream* text_stream,
+ DemuxerStream::Status status,
+ DemuxerStream::DecoderBufferVector buffers) {
+ DCHECK_LE(buffers.size(), 1u) << "TextRenderer only reads a single-buffer.";
+ BufferReady(text_stream, status,
+ buffers.empty() ? nullptr : std::move(buffers[0]));
+}
+
void TextRenderer::BufferReady(DemuxerStream* stream,
DemuxerStream::Status status,
scoped_refptr<DecoderBuffer> input) {
state->read_state = TextTrackState::kReadPending;
++pending_read_count_;
- text_stream->Read(base::BindOnce(&TextRenderer::BufferReady,
- weak_factory_.GetWeakPtr(), text_stream));
+ text_stream->Read(1, base::BindOnce(&TextRenderer::OnBuffersRead,
+ weak_factory_.GetWeakPtr(), text_stream));
}
TextRenderer::TextTrackState::TextTrackState(std::unique_ptr<TextTrack> tt)
TextRanges text_ranges_;
};
+ void OnBuffersRead(DemuxerStream* text_stream,
+ DemuxerStream::Status status,
+ DemuxerStream::DecoderBufferVector buffers);
+
// Callback delivered by the demuxer |text_stream| when
// a read from the stream completes.
void BufferReady(DemuxerStream* text_stream,
ASSERT_NE(first_decoder, nullptr);
// Make a regular DemuxerStream::Read().
+ scoped_refptr<DecoderBuffer> buffer = base::MakeRefCounted<DecoderBuffer>(12);
+ DemuxerStream::DecoderBufferVector buffers;
+ buffers.emplace_back(buffer);
EXPECT_CALL(*demuxer_stream(), OnRead(_))
- .WillOnce(RunOnceCallback<0>(DemuxerStream::kOk, new DecoderBuffer(12)));
+ .WillOnce(RunOnceCallback<0>(DemuxerStream::kOk, buffers));
EXPECT_CALL(*decoder(), Decode(IsRegularDecoderBuffer(), _))
.WillOnce(Invoke(this, &AudioDecoderStreamTest::ProduceDecoderOutput));
base::RunLoop run_loop0;
// Expect the decoder to be flushed. Upon flushing, the decoder releases
// internally buffered output.
EXPECT_CALL(*demuxer_stream(), OnRead(_))
- .WillOnce(RunOnceCallback<0>(DemuxerStream::kConfigChanged, nullptr));
+ .WillOnce(RunOnceCallback<0>(DemuxerStream::kConfigChanged,
+ DemuxerStream::DecoderBufferVector()));
EXPECT_CALL(*decoder(), Decode(IsEOSDecoderBuffer(), _))
.WillOnce(Invoke(this, &AudioDecoderStreamTest::ProduceDecoderOutput));
base::RunLoop run_loop1;
base::AutoLock auto_lock(lock_);
ChangeState_Locked(RETURNING_ABORT_FOR_READS);
if (read_cb_)
- std::move(read_cb_).Run(kAborted, nullptr);
+ std::move(read_cb_).Run(kAborted, {});
}
void ChunkDemuxerStream::CompletePendingReadIfPossible() {
// data will be sent.
if (read_cb_) {
std::move(read_cb_).Run(DemuxerStream::kOk,
- StreamParserBuffer::CreateEOSBuffer());
+ {StreamParserBuffer::CreateEOSBuffer()});
}
}
}
// DemuxerStream methods.
-void ChunkDemuxerStream::Read(ReadCB read_cb) {
+void ChunkDemuxerStream::Read(uint32_t count, ReadCB read_cb) {
base::AutoLock auto_lock(lock_);
DCHECK_NE(state_, UNINITIALIZED);
DCHECK(!read_cb_);
read_cb_ = BindToCurrentLoop(std::move(read_cb));
+ requested_buffer_count_ = count;
if (!is_enabled_) {
DVLOG(1) << "Read from disabled stream, returning EOS";
- std::move(read_cb_).Run(kOk, StreamParserBuffer::CreateEOSBuffer());
+ std::move(read_cb_).Run(DemuxerStream::kOk,
+ {StreamParserBuffer::CreateEOSBuffer()});
return;
}
stream_->Seek(timestamp);
} else if (read_cb_) {
DVLOG(1) << "Read from disabled stream, returning EOS";
- std::move(read_cb_).Run(kOk, StreamParserBuffer::CreateEOSBuffer());
+ std::move(read_cb_).Run(kOk, {StreamParserBuffer::CreateEOSBuffer()});
}
}
lock_.AssertAcquired();
DCHECK(read_cb_);
- DemuxerStream::Status status = DemuxerStream::kAborted;
- scoped_refptr<StreamParserBuffer> buffer;
-
switch (state_) {
case UNINITIALIZED:
+ requested_buffer_count_ = 0;
NOTREACHED();
return;
- case RETURNING_DATA_FOR_READS:
- switch (stream_->GetNextBuffer(&buffer)) {
- case SourceBufferStreamStatus::kSuccess:
- status = DemuxerStream::kOk;
- DVLOG(2) << __func__ << ": returning kOk, type " << type_ << ", dts "
- << buffer->GetDecodeTimestamp().InSecondsF() << ", pts "
- << buffer->timestamp().InSecondsF() << ", dur "
- << buffer->duration().InSecondsF() << ", key "
- << buffer->is_key_frame();
-#if BUILDFLAG(IS_TIZEN_TV)
- buffer->set_dts(buffer->GetDecodeTimestamp().ToPresentationTime());
-#endif
- break;
- case SourceBufferStreamStatus::kNeedBuffer:
- DVLOG(2) << __func__ << ": returning kNeedBuffer, type " << type_;
-#if defined(TIZEN_MULTIMEDIA)
- if (IsUpstreamArchitectureEnabled() ||
- (type_ == DemuxerStream::VIDEO &&
- stream_->GetCurrentVideoDecoderConfig().is_rtc())) {
- // Return early without calling |read_cb_| since we don't have
- // any data yet. Not invoking read_cb_ here, will make sure that
- // it's available once we have some actual chunks to share.
- return;
- } else {
- // DemuxerStream::kNeedBuffer is a custom enum value handled by
- // ESPP. Keep in mind that here, the read_cb_ at the function's
- // bottom will be invoked.
- status = DemuxerStream::kNeedBuffer;
- buffer = nullptr;
- break;
- }
-#else
- return;
-#endif
- case SourceBufferStreamStatus::kEndOfStream:
- status = DemuxerStream::kOk;
- buffer = StreamParserBuffer::CreateEOSBuffer();
- DVLOG(2) << __func__ << ": returning kOk with EOS buffer, type "
- << type_;
- break;
- case SourceBufferStreamStatus::kConfigChange:
- status = kConfigChanged;
- buffer = nullptr;
- DVLOG(2) << __func__ << ": returning kConfigChange, type " << type_;
- break;
- }
- break;
case RETURNING_ABORT_FOR_READS:
// Null buffers should be returned in this state since we are waiting
// for a seek. Any buffers in the SourceBuffer should NOT be returned
// because they are associated with the seek.
- status = DemuxerStream::kAborted;
- buffer = nullptr;
+ requested_buffer_count_ = 0;
+ std::move(read_cb_).Run(kAborted, {});
DVLOG(2) << __func__ << ": returning kAborted, type " << type_;
- break;
+ return;
case SHUTDOWN:
- status = DemuxerStream::kOk;
- buffer = StreamParserBuffer::CreateEOSBuffer();
+ requested_buffer_count_ = 0;
+ std::move(read_cb_).Run(kOk, {StreamParserBuffer::CreateEOSBuffer()});
DVLOG(2) << __func__ << ": returning kOk with EOS buffer, type " << type_;
+ return;
+ case RETURNING_DATA_FOR_READS:
break;
}
+ DCHECK(state_ == RETURNING_DATA_FOR_READS);
+
+ auto [status, buffers] = GetPendingBuffers_Locked();
+
+ // If the status from |stream_| is kNeedBuffer and there's no buffers,
+ // then after ChunkDemuxerStream::Append, try to read data again,
+ // 'requested_buffer_count_' does not need to be cleared to 0.
+ if (status == SourceBufferStreamStatus::kNeedBuffer && buffers.empty()) {
+ return;
+ }
+ // If the status from |stream_| is kConfigChange, the vector muse be
+ // empty, then need to notify new config by running |read_cb_|.
+ if (status == SourceBufferStreamStatus::kConfigChange) {
+ DCHECK(buffers.empty());
+ requested_buffer_count_ = 0;
+ std::move(read_cb_).Run(kConfigChanged, std::move(buffers));
+ return;
+ }
+ // Other cases are kOk and just return the buffers.
+ DCHECK(!buffers.empty());
+ requested_buffer_count_ = 0;
+ std::move(read_cb_).Run(kOk, std::move(buffers));
+}
+
+std::pair<SourceBufferStreamStatus, DemuxerStream::DecoderBufferVector>
+ChunkDemuxerStream::GetPendingBuffers_Locked() {
+ lock_.AssertAcquired();
+ DemuxerStream::DecoderBufferVector output_buffers;
+ for (uint32_t i = 0; i < requested_buffer_count_; ++i) {
+ // This aims to avoid send out buffers with different config. To
+ // simply the config change handling on renderer(receiver) side, prefer to
+ // send out buffers before config change happens.
+ if (stream_->IsNextBufferConfigChanged() && !output_buffers.empty()) {
+ DVLOG(3) << __func__ << " status=0"
+ << ", type=" << type_ << ", req_size=" << requested_buffer_count_
+ << ", out_size=" << output_buffers.size();
+ return {SourceBufferStreamStatus::kSuccess, std::move(output_buffers)};
+ }
+
+ scoped_refptr<StreamParserBuffer> buffer;
+ SourceBufferStreamStatus status = stream_->GetNextBuffer(&buffer);
+ switch (status) {
+ case SourceBufferStreamStatus::kSuccess:
+#if BUILDFLAG(IS_TIZEN_TV)
+ buffer->set_dts(buffer->GetDecodeTimestamp().ToPresentationTime());
+#endif
+ output_buffers.emplace_back(buffer);
+ break;
+ case SourceBufferStreamStatus::kNeedBuffer:
+ // Return early with calling |read_cb_| if output_buffers has buffers
+ // since there is no more readable data.
+ DVLOG(3) << __func__ << " status=" << (int)status << ", type=" << type_
+ << ", req_size=" << requested_buffer_count_
+ << ", out_size=" << output_buffers.size();
+ return {status, std::move(output_buffers)};
+ case SourceBufferStreamStatus::kEndOfStream:
+ output_buffers.emplace_back(StreamParserBuffer::CreateEOSBuffer());
+ DVLOG(3) << __func__ << " status=" << (int)status << ", type=" << type_
+ << ", req_size=" << requested_buffer_count_
+ << ", out_size=" << output_buffers.size();
+ return {status, std::move(output_buffers)};
+ case SourceBufferStreamStatus::kConfigChange:
+ // Since IsNextBufferConfigChanged has detected config change happen and
+ // send out buffers if |output_buffers| has buffer. When confige
+ // change actually happen it should be the first time run this |for
+ // loop|, i.e. output_buffers should be empty.
+ DCHECK(output_buffers.empty());
+ DVLOG(3) << __func__ << " status=" << (int)status << ", type=" << type_
+ << ", req_size=" << requested_buffer_count_
+ << ", out_size=" << output_buffers.size();
+ return {status, std::move(output_buffers)};
+ }
+ }
- std::move(read_cb_).Run(status, buffer);
+ DCHECK_EQ(output_buffers.size(),
+ static_cast<size_t>(requested_buffer_count_));
+ DVLOG(3) << __func__ << " status are always kSuccess"
+ << ", type=" << type_ << ", req_size=" << requested_buffer_count_
+ << ", out_size=" << output_buffers.size();
+ return {SourceBufferStreamStatus::kSuccess, std::move(output_buffers)};
}
ChunkDemuxer::ChunkDemuxer(
void UnmarkEndOfStream();
// DemuxerStream methods.
- void Read(ReadCB read_cb) override;
+ void Read(uint32_t count, ReadCB read_cb) override;
Type type() const override;
StreamLiveness liveness() const override;
AudioDecoderConfig audio_decoder_config() override;
void CompletePendingReadIfPossible_Locked() EXCLUSIVE_LOCKS_REQUIRED(lock_);
+ std::pair<SourceBufferStreamStatus, DemuxerStream::DecoderBufferVector>
+ GetPendingBuffers_Locked() EXCLUSIVE_LOCKS_REQUIRED(lock_);
+
// Specifies the type of the stream.
const Type type_;
AppendObserverCB append_observer_cb_;
GroupStartObserverCB group_start_observer_cb_;
+ // Requested buffer count. The actual returned buffer count could be less
+ // according to DemuxerStream::Read() API.
+ uint32_t requested_buffer_count_ = 0;
+
mutable base::Lock lock_;
State state_ GUARDED_BY(lock_);
ReadCB read_cb_ GUARDED_BY(lock_);
}
}
-static void OnReadDone(const base::TimeDelta& expected_time,
- bool* called,
- DemuxerStream::Status status,
- scoped_refptr<DecoderBuffer> buffer) {
+static void CheckBuffers(base::TimeDelta expected_start_time,
+ const int expected_duration_time_ms,
+ const DemuxerStream::DecoderBufferVector& buffers) {
+ for (const auto& buffer : buffers) {
+ EXPECT_EQ(expected_start_time, buffer->timestamp());
+ expected_start_time += base::Milliseconds(expected_duration_time_ms);
+ }
+}
+
+static void OnReadDone_Ok(base::TimeDelta expected_start_time,
+ const int expected_duration_time_ms,
+ const size_t expected_read_count,
+ bool* called,
+ DemuxerStream::Status status,
+ DemuxerStream::DecoderBufferVector buffers) {
EXPECT_EQ(status, DemuxerStream::kOk);
- EXPECT_EQ(expected_time, buffer->timestamp());
+ DVLOG(3) << __func__ << "buffers.size=" << buffers.size();
+ EXPECT_EQ(buffers.size(), expected_read_count);
+ CheckBuffers(expected_start_time, expected_duration_time_ms, buffers);
+
*called = true;
}
-static void OnReadDone_AbortExpected(bool* called,
- DemuxerStream::Status status,
- scoped_refptr<DecoderBuffer> buffer) {
+static void OnReadDone_AbortExpected(
+ bool* called,
+ DemuxerStream::Status status,
+ DemuxerStream::DecoderBufferVector buffers) {
EXPECT_EQ(status, DemuxerStream::kAborted);
- EXPECT_EQ(NULL, buffer.get());
+ EXPECT_EQ(buffers.size(), 0u);
*called = true;
}
-static void OnReadDone_EOSExpected(bool* called,
- DemuxerStream::Status status,
- scoped_refptr<DecoderBuffer> buffer) {
+static void OnReadDone_LastBufferEOSExpected(
+ bool* called,
+ DemuxerStream::Status status,
+ DemuxerStream::DecoderBufferVector buffers) {
EXPECT_EQ(status, DemuxerStream::kOk);
- EXPECT_TRUE(buffer->end_of_stream());
+ DCHECK_GE(buffers.size(), 1u);
+ DVLOG(3) << __func__ << "buffers.size=" << buffers.size();
+ for (size_t i = 0; i < buffers.size() - 1; ++i) {
+ EXPECT_FALSE(buffers[0]->end_of_stream());
+ }
+ EXPECT_TRUE(buffers.back()->end_of_stream());
*called = true;
}
*called = true;
}
+static void StoreStatusAndBuffers(
+ DemuxerStream::Status* status_out,
+ DemuxerStream::DecoderBufferVector* buffers_out,
+ DemuxerStream::Status status,
+ DemuxerStream::DecoderBufferVector buffers) {
+ *status_out = status;
+ *buffers_out = std::move(buffers);
+}
+
class ChunkDemuxerTest : public ::testing::Test {
public:
// Public method because test cases use it directly.
return nullptr;
}
- void Read(DemuxerStream::Type type, DemuxerStream::ReadCB read_cb) {
- GetStream(type)->Read(std::move(read_cb));
+ void Read(uint32_t count,
+ DemuxerStream::Type type,
+ DemuxerStream::ReadCB read_cb) {
+ GetStream(type)->Read(count, std::move(read_cb));
base::RunLoop().RunUntilIdle();
}
- void ReadAudio(DemuxerStream::ReadCB read_cb) {
- Read(DemuxerStream::AUDIO, std::move(read_cb));
+ void ReadAudio(uint32_t count, DemuxerStream::ReadCB read_cb) {
+ Read(count, DemuxerStream::AUDIO, std::move(read_cb));
}
- void ReadVideo(DemuxerStream::ReadCB read_cb) {
- Read(DemuxerStream::VIDEO, std::move(read_cb));
+ void ReadVideo(uint32_t count, DemuxerStream::ReadCB read_cb) {
+ Read(count, DemuxerStream::VIDEO, std::move(read_cb));
}
void GenerateExpectedReads(int timecode, int block_count) {
MOCK_METHOD2(ReadDone,
void(DemuxerStream::Status status,
- scoped_refptr<DecoderBuffer>));
-
- void StoreStatusAndBuffer(DemuxerStream::Status* status_out,
- scoped_refptr<DecoderBuffer>* buffer_out,
- DemuxerStream::Status status,
- scoped_refptr<DecoderBuffer> buffer) {
- *status_out = status;
- *buffer_out = buffer;
- }
+ DemuxerStream::DecoderBufferVector buffers));
void ReadUntilNotOkOrEndOfStream(DemuxerStream::Type type,
DemuxerStream::Status* status,
base::TimeDelta* last_timestamp) {
DemuxerStream* stream = GetStream(type);
- scoped_refptr<DecoderBuffer> buffer;
+ DemuxerStream::DecoderBufferVector buffers;
*last_timestamp = kNoTimestamp;
do {
- stream->Read(base::BindOnce(&ChunkDemuxerTest::StoreStatusAndBuffer,
- base::Unretained(this), status, &buffer));
+ stream->Read(1, base::BindOnce(StoreStatusAndBuffers, status, &buffers));
+ EXPECT_LE(buffers.size(), 1u);
base::RunLoop().RunUntilIdle();
- if (*status == DemuxerStream::kOk && !buffer->end_of_stream())
- *last_timestamp = buffer->timestamp();
- } while (*status == DemuxerStream::kOk && !buffer->end_of_stream());
+ if (*status == DemuxerStream::kOk && !buffers[0]->end_of_stream())
+ *last_timestamp = buffers[0]->timestamp();
+ } while (*status == DemuxerStream::kOk && !buffers[0]->end_of_stream());
}
void ExpectEndOfStream(DemuxerStream::Type type) {
- EXPECT_CALL(*this, ReadDone(DemuxerStream::kOk, IsEndOfStream()));
+ EXPECT_CALL(*this, ReadDone(DemuxerStream::kOk, ReadOneAndIsEndOfStream()));
GetStream(type)->Read(
- base::BindOnce(&ChunkDemuxerTest::ReadDone, base::Unretained(this)));
+ 1, base::BindOnce(&ChunkDemuxerTest::ReadDone, base::Unretained(this)));
base::RunLoop().RunUntilIdle();
}
void ExpectRead(DemuxerStream::Type type, int64_t timestamp_in_ms) {
EXPECT_CALL(*this, ReadDone(DemuxerStream::kOk,
- HasTimestamp(timestamp_in_ms)));
+ ReadOneAndHasTimestamp(timestamp_in_ms)));
GetStream(type)->Read(
- base::BindOnce(&ChunkDemuxerTest::ReadDone, base::Unretained(this)));
+ 1, base::BindOnce(&ChunkDemuxerTest::ReadDone, base::Unretained(this)));
base::RunLoop().RunUntilIdle();
}
void ExpectConfigChanged(DemuxerStream::Type type) {
EXPECT_CALL(*this, ReadDone(DemuxerStream::kConfigChanged, _));
GetStream(type)->Read(
- base::BindOnce(&ChunkDemuxerTest::ReadDone, base::Unretained(this)));
+ 1, base::BindOnce(&ChunkDemuxerTest::ReadDone, base::Unretained(this)));
base::RunLoop().RunUntilIdle();
}
std::stringstream ss;
for (size_t i = 0; i < timestamps.size(); ++i) {
// Initialize status to kAborted since it's possible for Read() to return
- // without calling StoreStatusAndBuffer() if it doesn't have any buffers
+ // without calling StoreStatusAndBuffers() if it doesn't have any buffers
// left to return.
DemuxerStream::Status status = DemuxerStream::kAborted;
- scoped_refptr<DecoderBuffer> buffer;
- stream->Read(base::BindOnce(&ChunkDemuxerTest::StoreStatusAndBuffer,
- base::Unretained(this), &status, &buffer));
+ DemuxerStream::DecoderBufferVector buffers;
+ stream->Read(1, base::BindOnce(StoreStatusAndBuffers, &status, &buffers));
+ EXPECT_LE(buffers.size(), 1u);
base::RunLoop().RunUntilIdle();
- if (status != DemuxerStream::kOk || buffer->end_of_stream())
+ if (status != DemuxerStream::kOk || buffers[0]->end_of_stream())
break;
if (i > 0)
ss << " ";
- ss << buffer->timestamp().InMilliseconds();
+ ss << buffers[0]->timestamp().InMilliseconds();
- if (buffer->is_key_frame())
+ if (buffers[0]->is_key_frame())
ss << "K";
// Handle preroll buffers.
if (base::EndsWith(timestamps[i], "P", base::CompareCase::SENSITIVE)) {
- ASSERT_EQ(kInfiniteDuration, buffer->discard_padding().first);
- ASSERT_EQ(base::TimeDelta(), buffer->discard_padding().second);
+ ASSERT_EQ(kInfiniteDuration, buffers[0]->discard_padding().first);
+ ASSERT_EQ(base::TimeDelta(), buffers[0]->discard_padding().second);
ss << "P";
}
}
bool video_read_done = false;
if (timestamps[i].audio_time_ms != kSkip) {
- ReadAudio(base::BindOnce(
- &OnReadDone, base::Milliseconds(timestamps[i].audio_time_ms),
- &audio_read_done));
+ ReadAudio(
+ 1, base::BindOnce(&OnReadDone_Ok,
+ base::Milliseconds(timestamps[i].audio_time_ms),
+ kAudioBlockDuration, 1, &audio_read_done));
EXPECT_TRUE(audio_read_done);
}
if (timestamps[i].video_time_ms != kSkip) {
- ReadVideo(base::BindOnce(
- &OnReadDone, base::Milliseconds(timestamps[i].video_time_ms),
- &video_read_done));
+ ReadVideo(
+ 1, base::BindOnce(&OnReadDone_Ok,
+ base::Milliseconds(timestamps[i].video_time_ms),
+ kVideoBlockDuration, 1, &video_read_done));
EXPECT_TRUE(video_read_done);
}
}
bool audio_read_done = false;
bool video_read_done = false;
- audio_stream->Read(base::BindOnce(&OnReadDone_EOSExpected, &audio_read_done));
- video_stream->Read(base::BindOnce(&OnReadDone_EOSExpected, &video_read_done));
+ audio_stream->Read(
+ 1, base::BindOnce(&OnReadDone_LastBufferEOSExpected, &audio_read_done));
+ video_stream->Read(
+ 1, base::BindOnce(&OnReadDone_LastBufferEOSExpected, &video_read_done));
base::RunLoop().RunUntilIdle();
EXPECT_FALSE(audio_read_done);
×tamp_offset_map_[kSourceId]));
}
+// Testing for batch read.
+TEST_F(ChunkDemuxerTest, ReadMultiBuffer_kOK) {
+ ASSERT_TRUE(InitDemuxer(HAS_AUDIO | HAS_VIDEO));
+
+ ASSERT_TRUE(AppendCluster(kDefaultFirstCluster()));
+
+ bool audio_read_done = false;
+ bool video_read_done = false;
+ ReadAudio(3, base::BindOnce(&OnReadDone_Ok, base::Milliseconds(0),
+ kAudioBlockDuration, 2, &audio_read_done));
+ ReadVideo(2, base::BindOnce(&OnReadDone_Ok, base::Milliseconds(0),
+ kVideoBlockDuration, 2, &video_read_done));
+
+ EXPECT_TRUE(audio_read_done);
+ EXPECT_TRUE(video_read_done);
+}
+
+TEST_F(ChunkDemuxerTest, ReadMultiBuffer_ActualReadCount_LE_RequestedCount) {
+ ASSERT_TRUE(InitDemuxer(HAS_AUDIO | HAS_VIDEO));
+
+ ASSERT_TRUE(AppendCluster(kDefaultFirstCluster()));
+
+ DemuxerStream::Status audio_status;
+ DemuxerStream::Status vedio_status;
+ DemuxerStream::DecoderBufferVector audio_buffers;
+ DemuxerStream::DecoderBufferVector video_buffers;
+ DemuxerStream* auido_stream = GetStream(DemuxerStream::Type::AUDIO);
+
+ auido_stream->Read(100, base::BindOnce(StoreStatusAndBuffers, &audio_status,
+ &audio_buffers));
+ DemuxerStream* video_stream = GetStream(DemuxerStream::Type::VIDEO);
+ video_stream->Read(100, base::BindOnce(StoreStatusAndBuffers, &vedio_status,
+ &video_buffers));
+ base::RunLoop().RunUntilIdle();
+
+ EXPECT_EQ(vedio_status, DemuxerStream::Status::kOk);
+ EXPECT_EQ(audio_buffers.size(), 2u);
+ CheckBuffers(base::Milliseconds(0), kAudioBlockDuration, audio_buffers);
+ EXPECT_EQ(vedio_status, DemuxerStream::Status::kOk);
+ EXPECT_EQ(video_buffers.size(), 2u);
+ CheckBuffers(base::Milliseconds(0), kVideoBlockDuration, video_buffers);
+}
+
+TEST_F(ChunkDemuxerTest, ReadMultiBuffer_LastBufferIsEOS) {
+ ASSERT_TRUE(InitDemuxer(HAS_AUDIO | HAS_VIDEO));
+ ASSERT_TRUE(AppendCluster(kDefaultFirstCluster()));
+
+ EXPECT_CALL(host_, SetDuration(base::Milliseconds(66)));
+ MarkEndOfStream(PIPELINE_OK);
+
+ DemuxerStream::DecoderBufferVector video_buffers;
+ DemuxerStream* video_stream = GetStream(DemuxerStream::Type::VIDEO);
+
+ bool audio_read_done = false;
+ // Totally 3 buffers and last buffer is EOS.
+ video_stream->Read(
+ 100, base::BindOnce(OnReadDone_LastBufferEOSExpected, &audio_read_done));
+ base::RunLoop().RunUntilIdle();
+
+ EXPECT_TRUE(audio_read_done);
+}
+
// Make sure Read() callbacks are dispatched with the proper data.
-TEST_F(ChunkDemuxerTest, Read) {
+TEST_F(ChunkDemuxerTest, ReadOneBuffer) {
ASSERT_TRUE(InitDemuxer(HAS_AUDIO | HAS_VIDEO));
ASSERT_TRUE(AppendCluster(kDefaultFirstCluster()));
bool audio_read_done = false;
bool video_read_done = false;
- ReadAudio(
- base::BindOnce(&OnReadDone, base::Milliseconds(0), &audio_read_done));
- ReadVideo(
- base::BindOnce(&OnReadDone, base::Milliseconds(0), &video_read_done));
+ ReadAudio(1, base::BindOnce(&OnReadDone_Ok, base::Milliseconds(0),
+ kAudioBlockDuration, 1, &audio_read_done));
+ ReadVideo(1, base::BindOnce(&OnReadDone_Ok, base::Milliseconds(0),
+ kVideoBlockDuration, 1, &video_read_done));
EXPECT_TRUE(audio_read_done);
EXPECT_TRUE(video_read_done);
EXPECT_FALSE(audio_read_done_);
EXPECT_FALSE(video_read_done_);
- audio_stream_->Read(
- base::BindOnce(&OnEndOfStreamReadDone, &audio_read_done_));
- video_stream_->Read(
- base::BindOnce(&OnEndOfStreamReadDone, &video_read_done_));
+ audio_stream_->Read(1, base::BindOnce(&OnReadDone_LastBufferEOSExpected,
+ &audio_read_done_));
+ video_stream_->Read(1, base::BindOnce(&OnReadDone_LastBufferEOSExpected,
+ &video_read_done_));
base::RunLoop().RunUntilIdle();
}
}
private:
- static void OnEndOfStreamReadDone(bool* called,
- DemuxerStream::Status status,
- scoped_refptr<DecoderBuffer> buffer) {
- EXPECT_EQ(status, DemuxerStream::kOk);
- EXPECT_TRUE(buffer->end_of_stream());
- *called = true;
- }
-
raw_ptr<DemuxerStream> audio_stream_;
raw_ptr<DemuxerStream> video_stream_;
bool audio_read_done_;
EndOfStreamHelper end_of_stream_helper_1(audio_stream, video_stream);
EndOfStreamHelper end_of_stream_helper_2(audio_stream, video_stream);
- ReadAudio(
- base::BindOnce(&OnReadDone, base::Milliseconds(0), &audio_read_done_1));
- ReadVideo(
- base::BindOnce(&OnReadDone, base::Milliseconds(0), &video_read_done_1));
+ ReadAudio(1, base::BindOnce(&OnReadDone_Ok, base::Milliseconds(0),
+ kAudioBlockDuration, 1, &audio_read_done_1));
+ ReadVideo(1, base::BindOnce(&OnReadDone_Ok, base::Milliseconds(0),
+ kVideoBlockDuration, 1, &video_read_done_1));
base::RunLoop().RunUntilIdle();
EXPECT_TRUE(audio_read_done_1);
EndOfStreamHelper end_of_stream_helper_2(audio_stream, video_stream);
EndOfStreamHelper end_of_stream_helper_3(audio_stream, video_stream);
- ReadAudio(
- base::BindOnce(&OnReadDone, base::Milliseconds(0), &audio_read_done_1));
- ReadVideo(
- base::BindOnce(&OnReadDone, base::Milliseconds(0), &video_read_done_1));
+ ReadAudio(1, base::BindOnce(&OnReadDone_Ok, base::Milliseconds(0),
+ kAudioBlockDuration, 1, &audio_read_done_1));
+ ReadVideo(1, base::BindOnce(&OnReadDone_Ok, base::Milliseconds(0),
+ kVideoBlockDuration, 1, &video_read_done_1));
end_of_stream_helper_1.RequestReads();
bool audio_read_done = false;
bool video_read_done = false;
- ReadAudio(
- base::BindOnce(&OnReadDone, base::Milliseconds(0), &audio_read_done));
- ReadVideo(
- base::BindOnce(&OnReadDone, base::Milliseconds(0), &video_read_done));
-
+ ReadAudio(1, base::BindOnce(&OnReadDone_Ok, base::Milliseconds(0),
+ kAudioBlockDuration, 1, &audio_read_done));
+ ReadVideo(1, base::BindOnce(&OnReadDone_Ok, base::Milliseconds(0),
+ kVideoBlockDuration, 1, &video_read_done));
// Make sure the reads haven't completed yet.
EXPECT_FALSE(audio_read_done);
EXPECT_FALSE(video_read_done);
audio_read_done = false;
video_read_done = false;
- ReadAudio(
- base::BindOnce(&OnReadDone, base::Milliseconds(23), &audio_read_done));
- ReadVideo(
- base::BindOnce(&OnReadDone, base::Milliseconds(33), &video_read_done));
+ ReadAudio(1, base::BindOnce(&OnReadDone_Ok, base::Milliseconds(23),
+ kAudioBlockDuration, 1, &audio_read_done));
+ ReadVideo(1, base::BindOnce(&OnReadDone_Ok, base::Milliseconds(33),
+ kVideoBlockDuration, 1, &video_read_done));
// Make sure the reads haven't completed yet.
EXPECT_FALSE(audio_read_done);
// Read() from audio should return "end of stream" buffers.
bool audio_read_done = false;
- audio_stream->Read(base::BindOnce(&OnReadDone_EOSExpected, &audio_read_done));
+ audio_stream->Read(
+ 1, base::BindOnce(&OnReadDone_LastBufferEOSExpected, &audio_read_done));
base::RunLoop().RunUntilIdle();
EXPECT_TRUE(audio_read_done);
// Attempt to read in unbuffered area; should not fulfill the read.
bool audio_read_done = false;
bool video_read_done = false;
- ReadAudio(base::BindOnce(&OnReadDone_AbortExpected, &audio_read_done));
- ReadVideo(base::BindOnce(&OnReadDone_AbortExpected, &video_read_done));
+ ReadAudio(1, base::BindOnce(&OnReadDone_AbortExpected, &audio_read_done));
+ ReadVideo(1, base::BindOnce(&OnReadDone_AbortExpected, &video_read_done));
EXPECT_FALSE(audio_read_done);
EXPECT_FALSE(video_read_done);
// Read requests should be fulfilled with empty buffers.
bool audio_read_done = false;
bool video_read_done = false;
- ReadAudio(base::BindOnce(&OnReadDone_AbortExpected, &audio_read_done));
- ReadVideo(base::BindOnce(&OnReadDone_AbortExpected, &video_read_done));
+ ReadAudio(1, base::BindOnce(&OnReadDone_AbortExpected, &audio_read_done));
+ ReadVideo(1, base::BindOnce(&OnReadDone_AbortExpected, &video_read_done));
EXPECT_TRUE(audio_read_done);
EXPECT_TRUE(video_read_done);
// Read() should return buffers at 0.
bool audio_read_done = false;
bool video_read_done = false;
- ReadAudio(
- base::BindOnce(&OnReadDone, base::Milliseconds(0), &audio_read_done));
- ReadVideo(
- base::BindOnce(&OnReadDone, base::Milliseconds(0), &video_read_done));
+ ReadAudio(1, base::BindOnce(&OnReadDone_Ok, base::Milliseconds(0),
+ kAudioBlockDuration, 1, &audio_read_done));
+ ReadVideo(1, base::BindOnce(&OnReadDone_Ok, base::Milliseconds(0),
+ kVideoBlockDuration, 1, &video_read_done));
EXPECT_TRUE(audio_read_done);
EXPECT_TRUE(video_read_done);
audio_read_done = false;
video_read_done = false;
- ReadAudio(base::BindOnce(&OnReadDone, base::Seconds(3), &audio_read_done));
- ReadVideo(base::BindOnce(&OnReadDone, base::Seconds(3), &video_read_done));
+ ReadAudio(1, base::BindOnce(&OnReadDone_Ok, base::Seconds(3),
+ kAudioBlockDuration, 1, &audio_read_done));
+ ReadVideo(1, base::BindOnce(&OnReadDone_Ok, base::Seconds(3),
+ kVideoBlockDuration, 1, &video_read_done));
// Read()s should not return until after data is appended at the Seek point.
EXPECT_FALSE(audio_read_done);
EXPECT_FALSE(video_read_done);
bool audio_read_done = false;
bool video_read_done = false;
- ReadAudio(
- base::BindOnce(&OnReadDone, base::Milliseconds(138), &audio_read_done));
- ReadVideo(
- base::BindOnce(&OnReadDone, base::Milliseconds(138), &video_read_done));
+ ReadAudio(1, base::BindOnce(&OnReadDone_Ok, base::Milliseconds(138),
+ kAudioBlockDuration, 1, &audio_read_done));
+ ReadVideo(1, base::BindOnce(&OnReadDone_Ok, base::Milliseconds(138),
+ kVideoBlockDuration, 1, &video_read_done));
// Verify that the reads didn't complete
EXPECT_FALSE(audio_read_done);
// Verify that reads block because the append cleared the end of stream state.
audio_read_done = false;
video_read_done = false;
- ReadAudio(base::BindOnce(&OnReadDone_EOSExpected, &audio_read_done));
- ReadVideo(base::BindOnce(&OnReadDone_EOSExpected, &video_read_done));
+ ReadAudio(
+ 1, base::BindOnce(&OnReadDone_LastBufferEOSExpected, &audio_read_done));
+ ReadVideo(
+ 1, base::BindOnce(&OnReadDone_LastBufferEOSExpected, &video_read_done));
// Verify that the reads don't complete.
EXPECT_FALSE(audio_read_done);
// Verify stream status changes with pending read.
bool read_done = false;
- audio_stream->Read(base::BindOnce(&OnReadDone_EOSExpected, &read_done));
+ audio_stream->Read(
+ 1, base::BindOnce(&OnReadDone_LastBufferEOSExpected, &read_done));
DisableAndEnableDemuxerTracks(demuxer_.get(), &task_environment_);
EXPECT_TRUE(read_done);
read_done = false;
- video_stream->Read(base::BindOnce(&OnReadDone_EOSExpected, &read_done));
+ video_stream->Read(
+ 1, base::BindOnce(&OnReadDone_LastBufferEOSExpected, &read_done));
DisableAndEnableDemuxerTracks(demuxer_.get(), &task_environment_);
EXPECT_TRUE(read_done);
}
TRACE_EVENT_ASYNC_BEGIN0("media", GetDemuxerReadTraceString<StreamType>(),
this);
pending_demuxer_read_ = true;
- stream_->Read(base::BindOnce(&DecoderStream<StreamType>::OnBufferReady,
- weak_factory_.GetWeakPtr()));
+ stream_->Read(1, base::BindOnce(&DecoderStream<StreamType>::OnBuffersRead,
+ weak_factory_.GetWeakPtr()));
+}
+
+template <DemuxerStream::Type StreamType>
+void DecoderStream<StreamType>::OnBuffersRead(
+ DemuxerStream::Status status,
+ DemuxerStream::DecoderBufferVector buffers) {
+ DCHECK_LE(buffers.size(), 1u) << "DecoderStream only reads a single-buffer.";
+ OnBufferReady(status, buffers.empty() ? nullptr : std::move(buffers[0]));
}
template <DemuxerStream::Type StreamType>
// Reads a buffer from |stream_| and returns the result via OnBufferReady().
void ReadFromDemuxerStream();
+ void OnBuffersRead(DemuxerStream::Status status,
+ DemuxerStream::DecoderBufferVector buffers);
+
// Callback for DemuxerStream::Read().
void OnBufferReady(DemuxerStream::Status status,
scoped_refptr<DecoderBuffer> buffer);
std::move(init_cb_).Run(PIPELINE_OK);
}
-void DecryptingDemuxerStream::Read(ReadCB read_cb) {
+void DecryptingDemuxerStream::Read(uint32_t count, ReadCB read_cb) {
DVLOG(3) << __func__;
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK_EQ(state_, kIdle) << state_;
DCHECK(read_cb);
CHECK(!read_cb_) << "Overlapping reads are not supported.";
+ DCHECK_EQ(count, 1u) << "DecryptingDemuxerStream only reads a single-buffer.";
read_cb_ = BindToCurrentLoop(std::move(read_cb));
state_ = kPendingDemuxerRead;
demuxer_stream_->Read(
- base::BindOnce(&DecryptingDemuxerStream::OnBufferReadFromDemuxerStream,
+ 1,
+ base::BindOnce(&DecryptingDemuxerStream::OnBuffersReadFromDemuxerStream,
weak_factory_.GetWeakPtr()));
}
CompleteWaitingForDecryptionKey();
DCHECK(read_cb_);
pending_buffer_to_decrypt_ = nullptr;
- std::move(read_cb_).Run(kAborted, nullptr);
+ std::move(read_cb_).Run(kAborted, {});
}
DCHECK(!read_cb_);
if (init_cb_)
std::move(init_cb_).Run(PIPELINE_ERROR_ABORT);
if (read_cb_)
- std::move(read_cb_).Run(kAborted, nullptr);
+ std::move(read_cb_).Run(kAborted, {});
if (reset_cb_)
std::move(reset_cb_).Run();
pending_buffer_to_decrypt_ = nullptr;
}
+void DecryptingDemuxerStream::OnBuffersReadFromDemuxerStream(
+ DemuxerStream::Status status,
+ DemuxerStream::DecoderBufferVector buffers) {
+ DCHECK_LE(buffers.size(), 1u)
+ << "DecryptingDemuxerStream only reads a single-buffer.";
+ OnBufferReadFromDemuxerStream(
+ status, buffers.empty() ? nullptr : std::move(buffers[0]));
+}
+
void DecryptingDemuxerStream::OnBufferReadFromDemuxerStream(
DemuxerStream::Status status,
scoped_refptr<DecoderBuffer> buffer) {
InitializeDecoderConfig();
state_ = kIdle;
- std::move(read_cb_).Run(kConfigChanged, nullptr);
+ std::move(read_cb_).Run(kConfigChanged, {});
if (reset_cb_)
DoReset();
return;
}
if (reset_cb_) {
- std::move(read_cb_).Run(kAborted, nullptr);
+ std::move(read_cb_).Run(kAborted, {});
DoReset();
return;
}
<< GetDisplayName() << ": demuxer stream read error.";
}
state_ = kIdle;
- std::move(read_cb_).Run(status, nullptr);
+ std::move(read_cb_).Run(status, {});
return;
}
#if BUILDFLAG(IS_TIZEN_TV)
if (status == kNeedBuffer) {
DVLOG(2) << __func__ << ": need buffer";
- std::move(read_cb_).Run(status, nullptr);
+ std::move(read_cb_).Run(status, {});
return;
}
if (buffer->end_of_stream()) {
DVLOG(2) << __func__ << ": EOS buffer";
state_ = kIdle;
- std::move(read_cb_).Run(kOk, std::move(buffer));
+ std::move(read_cb_).Run(kOk, {std::move(buffer)});
return;
}
if (!buffer->decrypt_config()) {
DVLOG(2) << __func__ << ": clear buffer";
state_ = kIdle;
- std::move(read_cb_).Run(kOk, std::move(buffer));
+ std::move(read_cb_).Run(kOk, {std::move(buffer)});
return;
}
if (reset_cb_) {
pending_buffer_to_decrypt_ = nullptr;
- std::move(read_cb_).Run(kAborted, nullptr);
+ std::move(read_cb_).Run(kAborted, {});
DoReset();
return;
}
<< GetDisplayName() << ": decrypt error " << status;
pending_buffer_to_decrypt_ = nullptr;
state_ = kIdle;
- std::move(read_cb_).Run(kError, nullptr);
+ std::move(read_cb_).Run(kError, {});
return;
}
if (decrypted_buffer.get() == nullptr) {
pending_buffer_to_decrypt_ = nullptr;
state_ = kIdle;
- std::move(read_cb_).Run(kAborted, nullptr);
+ std::move(read_cb_).Run(kAborted, {});
return;
}
#else
pending_buffer_to_decrypt_ = nullptr;
state_ = kIdle;
- std::move(read_cb_).Run(kOk, std::move(decrypted_buffer));
+ std::move(read_cb_).Run(kOk, {std::move(decrypted_buffer)});
}
void DecryptingDemuxerStream::OnCdmContextEvent(CdmContext::Event event) {
std::string GetDisplayName() const;
// DemuxerStream implementation.
- void Read(ReadCB read_cb) override;
+ void Read(uint32_t count, ReadCB read_cb) override;
AudioDecoderConfig audio_decoder_config() override;
VideoDecoderConfig video_decoder_config() override;
Type type() const override;
kWaitingForKey
};
+ void OnBuffersReadFromDemuxerStream(
+ DemuxerStream::Status status,
+ DemuxerStream::DecoderBufferVector buffers);
// Callback for DemuxerStream::Read().
void OnBufferReadFromDemuxerStream(DemuxerStream::Status status,
scoped_refptr<DecoderBuffer> buffer);
using ::testing::InSequence;
using ::testing::Invoke;
using ::testing::InvokeWithoutArgs;
-using ::testing::IsNull;
using ::testing::Return;
using ::testing::SaveArg;
using ::testing::StrictMock;
ACTION_P(ReturnBuffer, buffer) {
std::move(arg0).Run(
- buffer.get() ? DemuxerStream::kOk : DemuxerStream::kAborted, buffer);
+ buffer.get() ? DemuxerStream::kOk : DemuxerStream::kAborted, {buffer});
}
} // namespace
DemuxerStream::Status status,
scoped_refptr<DecoderBuffer> decrypted_buffer) {
if (status != DemuxerStream::kOk)
- EXPECT_CALL(*this, BufferReady(status, IsNull()));
+ EXPECT_CALL(*this, BufferReady(status, IsEmpty()));
else if (decrypted_buffer->end_of_stream())
- EXPECT_CALL(*this, BufferReady(status, IsEndOfStream()));
- else
- EXPECT_CALL(*this, BufferReady(status, decrypted_buffer));
+ EXPECT_CALL(*this, BufferReady(status, ReadOneAndIsEndOfStream()));
+ else {
+ DemuxerStream::DecoderBufferVector buffers;
+ buffers.emplace_back(decrypted_buffer_);
+ EXPECT_CALL(*this, BufferReady(status, buffers));
+ }
- demuxer_stream_->Read(base::BindOnce(
- &DecryptingDemuxerStreamTest::BufferReady, base::Unretained(this)));
+ demuxer_stream_->Read(
+ 1, base::BindOnce(&DecryptingDemuxerStreamTest::BufferReady,
+ base::Unretained(this)));
base::RunLoop().RunUntilIdle();
}
: clear_buffer_));
// For clearbuffer, decryptor->Decrypt() will not be called.
-
- scoped_refptr<DecoderBuffer> decrypted_buffer;
+ DemuxerStream::DecoderBufferVector buffers;
EXPECT_CALL(*this, BufferReady(DemuxerStream::kOk, _))
- .WillOnce(SaveArg<1>(&decrypted_buffer));
- demuxer_stream_->Read(base::BindOnce(
- &DecryptingDemuxerStreamTest::BufferReady, base::Unretained(this)));
+ .WillOnce(SaveArg<1>(&buffers));
+ demuxer_stream_->Read(
+ 1, base::BindOnce(&DecryptingDemuxerStreamTest::BufferReady,
+ base::Unretained(this)));
base::RunLoop().RunUntilIdle();
-
- EXPECT_FALSE(decrypted_buffer->decrypt_config());
+ DCHECK_EQ(buffers.size(), 1u);
+ EXPECT_FALSE(buffers[0]->decrypt_config());
}
// Sets up expectations and actions to put DecryptingDemuxerStream in an
EXPECT_TRUE(!pending_demuxer_read_cb_);
EXPECT_CALL(*input_audio_stream_, OnRead(_))
.WillOnce(MoveArg<0>(&pending_demuxer_read_cb_));
- demuxer_stream_->Read(base::BindOnce(
- &DecryptingDemuxerStreamTest::BufferReady, base::Unretained(this)));
+ demuxer_stream_->Read(
+ 1, base::BindOnce(&DecryptingDemuxerStreamTest::BufferReady,
+ base::Unretained(this)));
base::RunLoop().RunUntilIdle();
// Make sure the Read() triggers a Read() on the input demuxer stream.
EXPECT_FALSE(!pending_demuxer_read_cb_);
pending_decrypt_cb_ = std::move(callback);
})));
- demuxer_stream_->Read(base::BindOnce(
- &DecryptingDemuxerStreamTest::BufferReady, base::Unretained(this)));
+ demuxer_stream_->Read(
+ 1, base::BindOnce(&DecryptingDemuxerStreamTest::BufferReady,
+ base::Unretained(this)));
base::RunLoop().RunUntilIdle();
// Make sure Read() triggers a Decrypt() on the decryptor.
EXPECT_FALSE(!pending_decrypt_cb_);
scoped_refptr<DecoderBuffer>()));
EXPECT_MEDIA_LOG(HasSubstr("DecryptingDemuxerStream: no key for key ID"));
EXPECT_CALL(*this, OnWaiting(WaitingReason::kNoDecryptionKey));
- demuxer_stream_->Read(base::BindOnce(
- &DecryptingDemuxerStreamTest::BufferReady, base::Unretained(this)));
+ demuxer_stream_->Read(
+ 1, base::BindOnce(&DecryptingDemuxerStreamTest::BufferReady,
+ base::Unretained(this)));
base::RunLoop().RunUntilIdle();
}
void SatisfyPendingDemuxerReadCB(DemuxerStream::Status status) {
scoped_refptr<DecoderBuffer> buffer =
(status == DemuxerStream::kOk) ? encrypted_buffer_ : nullptr;
- std::move(pending_demuxer_read_cb_).Run(status, buffer);
+ std::move(pending_demuxer_read_cb_).Run(status, {buffer});
}
void Reset() {
}
MOCK_METHOD2(BufferReady,
- void(DemuxerStream::Status, scoped_refptr<DecoderBuffer>));
+ void(DemuxerStream::Status, DemuxerStream::DecoderBufferVector));
MOCK_METHOD1(OnWaiting, void(WaitingReason));
base::test::SingleThreadTaskEnvironment task_environment_;
EXPECT_CALL(*decryptor_, Decrypt(_, encrypted_buffer_, _))
.WillRepeatedly(
RunOnceCallback<2>(Decryptor::kSuccess, decrypted_buffer_));
- EXPECT_CALL(*this, BufferReady(DemuxerStream::kOk, decrypted_buffer_));
+ DemuxerStream::DecoderBufferVector buffers;
+ buffers.emplace_back(decrypted_buffer_);
+ EXPECT_CALL(*this, BufferReady(DemuxerStream::kOk, buffers));
event_cb_.Run(CdmContext::Event::kHasAdditionalUsableKey);
base::RunLoop().RunUntilIdle();
}
EXPECT_CALL(*decryptor_, Decrypt(_, encrypted_buffer_, _))
.WillRepeatedly(
RunOnceCallback<2>(Decryptor::kSuccess, decrypted_buffer_));
- EXPECT_CALL(*this, BufferReady(DemuxerStream::kOk, decrypted_buffer_));
+ DemuxerStream::DecoderBufferVector buffers;
+ buffers.emplace_back(decrypted_buffer_);
+ EXPECT_CALL(*this, BufferReady(DemuxerStream::kOk, buffers));
// The decrypt callback is returned after the correct decryption key is added.
event_cb_.Run(CdmContext::Event::kHasAdditionalUsableKey);
std::move(pending_decrypt_cb_).Run(Decryptor::kNoKey, nullptr);
Initialize();
EnterPendingReadState();
- EXPECT_CALL(*this, BufferReady(DemuxerStream::kAborted, IsNull()));
+ EXPECT_CALL(*this, BufferReady(DemuxerStream::kAborted, IsEmpty()));
Reset();
SatisfyPendingDemuxerReadCB(DemuxerStream::kOk);
Initialize();
EnterPendingDecryptState();
- EXPECT_CALL(*this, BufferReady(DemuxerStream::kAborted, IsNull()));
+ EXPECT_CALL(*this, BufferReady(DemuxerStream::kAborted, IsEmpty()));
Reset();
}
Initialize();
EnterWaitingForKeyState();
- EXPECT_CALL(*this, BufferReady(DemuxerStream::kAborted, IsNull()));
+ EXPECT_CALL(*this, BufferReady(DemuxerStream::kAborted, IsEmpty()));
Reset();
}
EnterPendingReadState();
// Make sure we get a null audio frame returned.
- EXPECT_CALL(*this, BufferReady(DemuxerStream::kAborted, IsNull()));
+ EXPECT_CALL(*this, BufferReady(DemuxerStream::kAborted, IsEmpty()));
Reset();
SatisfyPendingDemuxerReadCB(DemuxerStream::kAborted);
EXPECT_CALL(*input_audio_stream_, OnRead(_))
.WillOnce(RunOnceCallback<0>(DemuxerStream::kConfigChanged,
- scoped_refptr<DecoderBuffer>()));
+ DemuxerStream::DecoderBufferVector()));
ReadAndExpectBufferReadyWith(DemuxerStream::kConfigChanged, nullptr);
}
EnterPendingReadState();
// Make sure we get a |kConfigChanged| instead of a |kAborted|.
- EXPECT_CALL(*this, BufferReady(DemuxerStream::kConfigChanged, IsNull()));
+ EXPECT_CALL(*this, BufferReady(DemuxerStream::kConfigChanged, IsEmpty()));
Reset();
SatisfyPendingDemuxerReadCB(DemuxerStream::kConfigChanged);
Initialize();
EnterPendingReadState();
- EXPECT_CALL(*this, BufferReady(DemuxerStream::kAborted, IsNull()));
+ EXPECT_CALL(*this, BufferReady(DemuxerStream::kAborted, IsEmpty()));
}
// Test destruction in kPendingDecrypt state.
Initialize();
EnterPendingDecryptState();
- EXPECT_CALL(*this, BufferReady(DemuxerStream::kAborted, IsNull()));
+ EXPECT_CALL(*this, BufferReady(DemuxerStream::kAborted, IsEmpty()));
}
// Test destruction in kWaitingForKey state.
Initialize();
EnterWaitingForKeyState();
- EXPECT_CALL(*this, BufferReady(DemuxerStream::kAborted, IsNull()));
+ EXPECT_CALL(*this, BufferReady(DemuxerStream::kAborted, IsEmpty()));
}
// Test destruction after reset.
ACTION_P(ReturnBuffer, buffer) {
std::move(arg0).Run(
- buffer.get() ? DemuxerStream::kOk : DemuxerStream::kAborted, buffer);
+ buffer.get() ? DemuxerStream::kOk : DemuxerStream::kAborted, {buffer});
}
} // namespace
}
MOCK_METHOD2(BufferReady,
- void(DemuxerStream::Status, scoped_refptr<DecoderBuffer>));
+ void(DemuxerStream::Status, DemuxerStream::DecoderBufferVector));
protected:
base::test::TaskEnvironment task_environment_;
decrypting_media_resource_->Initialize(
decrypting_media_resource_init_cb_.Get(), waiting_cb_.Get());
- decrypting_media_resource_->GetAllStreams().front()->Read(base::BindOnce(
- &DecryptingMediaResourceTest::BufferReady, base::Unretained(this)));
+ decrypting_media_resource_->GetAllStreams().front()->Read(
+ 1, base::BindOnce(&DecryptingMediaResourceTest::BufferReady,
+ base::Unretained(this)));
task_environment_.RunUntilIdle();
}
-} // namespace media
+} // namespace media
\ No newline at end of file
bool* end_of_stream,
base::TimeDelta* timestamp,
media::DemuxerStream::Status status,
- scoped_refptr<DecoderBuffer> buffer);
+ DemuxerStream::DecoderBufferVector buffers);
int GetNextStreamIndexToRead();
Streams streams_;
base::TimeDelta timestamp;
base::RunLoop run_loop;
- streams_[index]->Read(base::BindOnce(
- &StreamReader::OnReadDone, base::Unretained(this),
- base::ThreadTaskRunnerHandle::Get(), run_loop.QuitWhenIdleClosure(),
- &end_of_stream, ×tamp));
+ streams_[index]->Read(
+ 1, base::BindOnce(&StreamReader::OnReadDone, base::Unretained(this),
+ base::SingleThreadTaskRunner::GetCurrentDefault(),
+ run_loop.QuitWhenIdleClosure(), &end_of_stream,
+ ×tamp));
run_loop.Run();
CHECK(end_of_stream || timestamp != media::kNoTimestamp);
bool* end_of_stream,
base::TimeDelta* timestamp,
media::DemuxerStream::Status status,
- scoped_refptr<DecoderBuffer> buffer) {
+ DemuxerStream::DecoderBufferVector buffers) {
CHECK_EQ(status, media::DemuxerStream::kOk);
- CHECK(buffer);
+ CHECK_EQ(buffers.size(), 1u) << "StreamReader only reads a single-buffer.";
+ scoped_refptr<DecoderBuffer> buffer = std::move(buffers[0]);
*end_of_stream = buffer->end_of_stream();
*timestamp = *end_of_stream ? media::kNoTimestamp : buffer->timestamp();
task_runner->PostTask(FROM_HERE, std::move(quit_when_idle_closure));
void FFmpegDemuxerStream::Abort() {
aborted_ = true;
if (read_cb_)
- std::move(read_cb_).Run(DemuxerStream::kAborted, nullptr);
+ std::move(read_cb_).Run(DemuxerStream::kAborted, {});
}
void FFmpegDemuxerStream::Stop() {
end_of_stream_ = true;
if (read_cb_) {
std::move(read_cb_).Run(DemuxerStream::kOk,
- DecoderBuffer::CreateEOSBuffer());
+ {DecoderBuffer::CreateEOSBuffer()});
}
}
return liveness_;
}
-void FFmpegDemuxerStream::Read(ReadCB read_cb) {
+void FFmpegDemuxerStream::Read(uint32_t count, ReadCB read_cb) {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
if (read_cb_) {
LOG(WARNING) << "Overlapping reads are not supported";
return;
}
read_cb_ = BindToCurrentLoop(std::move(read_cb));
-
+ requested_buffer_count_ = static_cast<size_t>(count);
+ DVLOG(3) << __func__
+ << " requested_buffer_count_ = " << requested_buffer_count_;
// Don't accept any additional reads if we've been told to stop.
// The |demuxer_| may have been destroyed in the pipeline thread.
//
// TODO(scherkus): it would be cleaner to reply with an error message.
if (!demuxer_) {
std::move(read_cb_).Run(DemuxerStream::kOk,
- DecoderBuffer::CreateEOSBuffer());
+ {DecoderBuffer::CreateEOSBuffer()});
return;
}
if (!is_enabled_) {
DVLOG(1) << "Read from disabled stream, returning EOS";
- std::move(read_cb_).Run(kOk, DecoderBuffer::CreateEOSBuffer());
+ std::move(read_cb_).Run(kOk, {DecoderBuffer::CreateEOSBuffer()});
return;
}
if (aborted_) {
- std::move(read_cb_).Run(kAborted, nullptr);
+ std::move(read_cb_).Run(kAborted, {});
return;
}
}
if (!is_enabled_ && read_cb_) {
DVLOG(1) << "Read from disabled stream, returning EOS";
- std::move(read_cb_).Run(kOk, DecoderBuffer::CreateEOSBuffer());
+ std::move(read_cb_).Run(kOk, {DecoderBuffer::CreateEOSBuffer()});
}
}
DCHECK(task_runner_->RunsTasksInCurrentSequence());
if (read_cb_) {
if (!buffer_queue_.IsEmpty()) {
- std::move(read_cb_).Run(DemuxerStream::kOk, buffer_queue_.Pop());
+ DemuxerStream::DecoderBufferVector output_buffers;
+
+ for (size_t i = 0;
+ i < std::min(requested_buffer_count_, buffer_queue_.queue_size());
+ ++i) {
+ output_buffers.emplace_back(buffer_queue_.Pop());
+ }
+ DVLOG(3) << __func__ << " Status:kOk, return output_buffers.size = "
+ << output_buffers.size();
+ std::move(read_cb_).Run(DemuxerStream::kOk, std::move(output_buffers));
} else if (end_of_stream_) {
std::move(read_cb_).Run(DemuxerStream::kOk,
- DecoderBuffer::CreateEOSBuffer());
+ {DecoderBuffer::CreateEOSBuffer()});
}
}
-
// Have capacity? Ask for more!
if (HasAvailableCapacity() && !end_of_stream_) {
demuxer_->NotifyCapacityAvailable();
// DemuxerStream implementation.
Type type() const override;
StreamLiveness liveness() const override;
- void Read(ReadCB read_cb) override;
+ void Read(uint32_t count, ReadCB read_cb) override;
void EnableBitstreamConverter() override;
bool SupportsConfigChanges() override;
AudioDecoderConfig audio_decoder_config() override;
int num_discarded_packet_warnings_;
int64_t last_packet_pos_;
int64_t last_packet_dts_;
+ // Requested buffer count. The actual returned buffer count could be less
+ // according to DemuxerStream::Read() API.
+ size_t requested_buffer_count_ = 0;
};
class MEDIA_EXPORT FFmpegDemuxer : public Demuxer {
namespace media {
-MATCHER(IsEndOfStreamBuffer,
- std::string(negation ? "isn't" : "is") + " end of stream") {
- return arg->end_of_stream();
-}
-
// This does not verify any of the codec parameters that may be included in the
// log entry.
MATCHER_P(SimpleCreatedFFmpegDemuxerStream, stream_type, "") {
static void EosOnReadDone(bool* got_eos_buffer,
DemuxerStream::Status status,
- scoped_refptr<DecoderBuffer> buffer) {
- base::ThreadTaskRunnerHandle::Get()->PostTask(
+ DemuxerStream::DecoderBufferVector buffers) {
+ // TODO(crbug.com/1347395): add multi read unit tests in next CL.
+ DCHECK_EQ(buffers.size(), 1u)
+ << "FFmpegDemuxerTest only reads a single-buffer.";
+ scoped_refptr<DecoderBuffer> buffer = std::move(buffers[0]);
+ base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE, base::RunLoop::QuitCurrentWhenIdleClosureDeprecated());
EXPECT_EQ(status, DemuxerStream::kOk);
const ReadExpectation& read_expectation,
base::OnceClosure quit_closure,
DemuxerStream::Status status,
- scoped_refptr<DecoderBuffer> buffer) {
+ DemuxerStream::DecoderBufferVector buffers) {
+ // TODO(crbug.com/1347395): add multi read unit tests in next CL.
+ DCHECK_LE(buffers.size(), 1u)
+ << "FFmpegDemuxerTest only reads a single-buffer.";
std::string location_str = location.ToString();
location_str += "\n";
SCOPED_TRACE(location_str);
EXPECT_EQ(read_expectation.status, status);
if (status == DemuxerStream::kOk) {
+ DCHECK_EQ(buffers.size(), 1u);
+ scoped_refptr<DecoderBuffer> buffer = std::move(buffers[0]);
EXPECT_TRUE(buffer);
EXPECT_EQ(read_expectation.size, buffer->data_size());
EXPECT_EQ(read_expectation.timestamp_us,
DemuxerStream::Status status = DemuxerStream::Status::kOk,
base::TimeDelta discard_front_padding = base::TimeDelta()) {
base::RunLoop run_loop;
- stream->Read(NewReadCBWithCheckedDiscard(
- location, size, timestamp_us, discard_front_padding, is_key_frame,
- status, run_loop.QuitClosure()));
+ stream->Read(1, NewReadCBWithCheckedDiscard(
+ location, size, timestamp_us, discard_front_padding,
+ is_key_frame, status, run_loop.QuitClosure()));
run_loop.Run();
// Ensure tasks posted after the ReadCB is satisfied run. These are always
bool got_eos_buffer = false;
const int kMaxBuffers = 170;
for (int i = 0; !got_eos_buffer && i < kMaxBuffers; i++) {
- stream->Read(base::BindOnce(&EosOnReadDone, &got_eos_buffer));
+ stream->Read(1, base::BindOnce(&EosOnReadDone, &got_eos_buffer));
base::RunLoop().Run();
}
format_context()->pb->eof_reached = 1;
{
base::RunLoop run_loop;
- audio->Read(NewReadCBWithCheckedDiscard(FROM_HERE, 29, 0, base::TimeDelta(),
- true, DemuxerStream::kAborted,
- run_loop.QuitClosure()));
+ audio->Read(1, NewReadCBWithCheckedDiscard(
+ FROM_HERE, 29, 0, base::TimeDelta(), true,
+ DemuxerStream::kAborted, run_loop.QuitClosure()));
demuxer_->AbortPendingReads();
run_loop.Run();
task_environment_.RunUntilIdle();
// Reads after being stopped are all EOS buffers.
StrictMock<base::MockCallback<DemuxerStream::ReadCB>> callback;
- EXPECT_CALL(callback, Run(DemuxerStream::kOk, IsEndOfStreamBuffer()));
+ EXPECT_CALL(callback, Run(DemuxerStream::kOk, ReadOneAndIsEndOfStream()));
// Attempt the read...
- audio->Read(callback.Get());
+ audio->Read(1, callback.Get());
task_environment_.RunUntilIdle();
// Don't let the test call Stop() again.
#if BUILDFLAG(USE_PROPRIETARY_CODECS)
static void ValidateAnnexB(DemuxerStream* stream,
DemuxerStream::Status status,
- scoped_refptr<DecoderBuffer> buffer) {
+ DemuxerStream::DecoderBufferVector buffers) {
EXPECT_EQ(status, DemuxerStream::kOk);
-
+ EXPECT_EQ(buffers.size(), 1u);
+ scoped_refptr<DecoderBuffer> buffer = std::move(buffers[0]);
if (buffer->end_of_stream()) {
base::ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::RunLoop::QuitCurrentWhenIdleClosureDeprecated());
return;
}
- stream->Read(base::BindOnce(&ValidateAnnexB, stream));
+ stream->Read(1, base::BindOnce(&ValidateAnnexB, stream));
}
TEST_F(FFmpegDemuxerTest, IsValidAnnexB) {
ASSERT_TRUE(stream);
stream->EnableBitstreamConverter();
- stream->Read(base::BindOnce(&ValidateAnnexB, stream));
+ stream->Read(1, base::BindOnce(&ValidateAnnexB, stream));
base::RunLoop().Run();
demuxer_->Stop();
}
void OnReadDoneExpectEos(DemuxerStream::Status status,
- const scoped_refptr<DecoderBuffer> buffer) {
+ DemuxerStream::DecoderBufferVector buffers) {
EXPECT_EQ(status, DemuxerStream::kOk);
- EXPECT_TRUE(buffer->end_of_stream());
+ EXPECT_EQ(buffers.size(), 1u);
+ EXPECT_TRUE(buffers[0]->end_of_stream());
}
} // namespace
audio_stream->FlushBuffers(true);
video_stream->FlushBuffers(true);
- audio_stream->Read(base::BindOnce(&OnReadDoneExpectEos));
- video_stream->Read(base::BindOnce(&OnReadDoneExpectEos));
+ audio_stream->Read(1, base::BindOnce(&OnReadDoneExpectEos));
+ video_stream->Read(1, base::BindOnce(&OnReadDoneExpectEos));
DisableAndEnableDemuxerTracks(demuxer_.get(), &task_environment_);
}
do {
read_callback_called_ = false;
- stream->Read(base::BindOnce(&FrameProcessorTest::StoreStatusAndBuffer,
- base::Unretained(this)));
+ stream->Read(1, base::BindOnce(&FrameProcessorTest::StoreStatusAndBuffer,
+ base::Unretained(this)));
base::RunLoop().RunUntilIdle();
} while (++loop_count < 2 && read_callback_called_ &&
last_read_status_ == DemuxerStream::kAborted);
do {
read_callback_called_ = false;
- stream->Read(base::BindOnce(&FrameProcessorTest::StoreStatusAndBuffer,
+ stream->Read(1,
+ base::BindOnce(&FrameProcessorTest::StoreStatusAndBuffer,
base::Unretained(this)));
base::RunLoop().RunUntilIdle();
EXPECT_TRUE(read_callback_called_);
private:
void StoreStatusAndBuffer(DemuxerStream::Status status,
- scoped_refptr<DecoderBuffer> buffer) {
+ DemuxerStream::DecoderBufferVector buffers) {
+ DCHECK_LE(buffers.size(), 1u)
+ << "FrameProcessorTest only reads a single-buffer.";
+ scoped_refptr<DecoderBuffer> buffer =
+ (buffers.empty() ? nullptr : std::move(buffers[0]));
+
if (status == DemuxerStream::kOk && buffer.get()) {
DVLOG(3) << __func__ << "status: " << status
<< " ts: " << buffer->timestamp().InSecondsF();
#include "base/numerics/safe_conversions.h"
#include "media/base/audio_decoder_config.h"
#include "media/base/decoder_buffer.h"
+#include "media/base/demuxer_stream.h"
#include "media/base/video_decoder_config.h"
#include "media/mojo/common/media_type_converters.h"
#include "media/mojo/common/mojo_decoder_buffer_converter.h"
audio_config, video_config);
}
-void MojoDemuxerStreamImpl::Read(ReadCallback callback) {
- stream_->Read(base::BindOnce(&MojoDemuxerStreamImpl::OnBufferReady,
- weak_factory_.GetWeakPtr(),
- std::move(callback)));
+void MojoDemuxerStreamImpl::Read(uint32_t count, ReadCallback callback) {
+ DVLOG(3) << __func__ << " client receive count:" << count;
+ stream_->Read(
+ count, base::BindOnce(&MojoDemuxerStreamImpl::OnBufferReady,
+ weak_factory_.GetWeakPtr(), std::move(callback)));
}
void MojoDemuxerStreamImpl::EnableBitstreamConverter() {
stream_->EnableBitstreamConverter();
}
-void MojoDemuxerStreamImpl::OnBufferReady(ReadCallback callback,
- Status status,
- scoped_refptr<DecoderBuffer> buffer) {
+void MojoDemuxerStreamImpl::OnBufferReady(
+ ReadCallback callback,
+ Status status,
+ media::DemuxerStream::DecoderBufferVector buffers) {
absl::optional<AudioDecoderConfig> audio_config;
absl::optional<VideoDecoderConfig> video_config;
+ DVLOG(3) << __func__ << "status:" << status
+ << " buffers.size:" << buffers.size();
if (status == Status::kConfigChanged) {
- DVLOG(2) << __func__ << ": ConfigChange!";
+ // To simply the config change handling on renderer(receiver) side, prefer
+ // to send out buffers before config change happens. For FFmpegDemuxer, it
+ // doesn't make config change. For ChunkDemuxer, it send out buffer before
+ // confige change happen. |buffers| is empty at this point.
+ DCHECK(buffers.empty());
+
// Send the config change so our client can read it once it parses the
// Status obtained via Run() below.
if (stream_->type() == Type::AUDIO) {
NOTREACHED() << "Unsupported config change encountered for type: "
<< stream_->type();
}
-
- std::move(callback).Run(Status::kConfigChanged, mojom::DecoderBufferPtr(),
- audio_config, video_config);
+ std::move(callback).Run(Status::kConfigChanged, {}, audio_config,
+ video_config);
return;
}
LOG_IF(INFO, log_on) << __func__ << " ("
<< media::DemuxerStream::GetTypeName(stream_->type())
<< ") kNeedBuffer!";
- std::move(callback).Run(Status::kNeedBuffer, mojom::DecoderBufferPtr(),
- audio_config, video_config);
+ std::move(callback).Run(Status::kNeedBuffer, {}, audio_config,
+ video_config);
return;
}
#endif
if (status == Status::kAborted) {
- std::move(callback).Run(Status::kAborted, mojom::DecoderBufferPtr(),
- audio_config, video_config);
+ std::move(callback).Run(Status::kAborted, {}, audio_config, video_config);
return;
}
DCHECK_EQ(status, Status::kOk);
-#if BUILDFLAG(IS_TIZEN_TV)
- if (!buffer.get()) {
- LOG(ERROR) << __func__ << " ; buffer is nullptr";
- return;
- }
-#endif
-
- mojom::DecoderBufferPtr mojo_buffer =
- mojo_decoder_buffer_writer_->WriteDecoderBuffer(std::move(buffer));
- if (!mojo_buffer) {
- std::move(callback).Run(Status::kAborted, mojom::DecoderBufferPtr(),
- audio_config, video_config);
- return;
+ std::vector<mojom::DecoderBufferPtr> output_mojo_buffers;
+ for (auto& buffer : buffers) {
+ mojom::DecoderBufferPtr mojo_buffer =
+ mojo_decoder_buffer_writer_->WriteDecoderBuffer(std::move(buffer));
+ if (!mojo_buffer) {
+ std::move(callback).Run(Status::kAborted, {}, audio_config, video_config);
+ return;
+ }
+ output_mojo_buffers.emplace_back(std::move(mojo_buffer));
}
// TODO(dalecurtis): Once we can write framed data to the DataPipe, fill via
// the producer handle and then read more to keep the pipe full. Waiting for
// space can be accomplished using an AsyncWaiter.
- std::move(callback).Run(status, std::move(mojo_buffer), audio_config,
+ std::move(callback).Run(status, std::move(output_mojo_buffers), audio_config,
video_config);
}
// InitializeCallback and ReadCallback are defined in
// mojom::DemuxerStream.
void Initialize(InitializeCallback callback) override;
- void Read(ReadCallback callback) override;
+ void Read(uint32_t count, ReadCallback callback) override;
void EnableBitstreamConverter() override;
// Sets an error handler that will be called if a connection error occurs on
void OnBufferReady(ReadCallback callback,
Status status,
- scoped_refptr<DecoderBuffer> buffer);
+ media::DemuxerStream::DecoderBufferVector buffers);
mojo::Receiver<mojom::DemuxerStream> receiver_;
AudioDecoderConfig? audio_config,
VideoDecoderConfig? video_config);
- // Requests a DecoderBuffer from this stream for decoding and rendering.
+ // Requests multi DecoderBuffer from this stream for decoding and rendering.
// See media::DemuxerStream::ReadCB for a general explanation of the fields.
//
// Notes on the callback:
- // - If |status| is OK, |buffer| should be non-null and clients must fill out
- // the data section of the returned media::DecoderBuffer by reading from
- // the |pipe| provided during Initialize().
- // - If |status| is ABORTED, all other fields should be null.
+ // - If |status| is OK, the size of |batch_buffers| should be 1<=n<=count
+ // and clients must fill out the data section of the returned
+ // array<DecoderBuffer> by reading from the |pipe| provided during
+ // Initialize().
+ // - If |status| is ABORTED, |audio_config| and |video_config| should be null
+ // and size of |batch_buffers| is zero.
// - If |status| is CONFIG_CHANGED, the config for the stream type should be
- // non-null.
+ // non-null and size of |batch_buffers| is zero.
//
// TODO(dalecurtis): Remove this method in favor of serializing everything
// into the DataPipe given to Initialize() once DataPipe supports framed data
// in a nicer fashion.
- Read() => (Status status,
- DecoderBuffer? buffer,
- AudioDecoderConfig? audio_config,
- VideoDecoderConfig? video_config);
+ Read(uint32 count) => (Status status,
+ array<DecoderBuffer> batch_buffers,
+ AudioDecoderConfig? audio_config,
+ VideoDecoderConfig? video_config);
// Enables converting bitstream to a format that is expected by the decoder.
// For example, H.264/AAC bitstream based packets into H.264 Annex B format.
mojo::PendingRemote<mojom::DemuxerStream> demuxer_stream,
base::OnceClosure stream_ready_cb)
: demuxer_stream_(std::move(demuxer_stream)),
- stream_ready_cb_(std::move(stream_ready_cb)),
- type_(UNKNOWN) {
+ stream_ready_cb_(std::move(stream_ready_cb)) {
DVLOG(1) << __func__;
demuxer_stream_->Initialize(base::BindOnce(
&MojoDemuxerStreamAdapter::OnStreamReady, weak_factory_.GetWeakPtr()));
DVLOG(1) << __func__;
}
-void MojoDemuxerStreamAdapter::Read(ReadCB read_cb) {
+void MojoDemuxerStreamAdapter::Read(uint32_t count, ReadCB read_cb) {
DVLOG(3) << __func__;
// We shouldn't be holding on to a previous callback if a new Read() came in.
DCHECK(!read_cb_);
read_cb_ = std::move(read_cb);
- demuxer_stream_->Read(base::BindOnce(&MojoDemuxerStreamAdapter::OnBufferReady,
+ demuxer_stream_->Read(count,
+ base::BindOnce(&MojoDemuxerStreamAdapter::OnBufferReady,
weak_factory_.GetWeakPtr()));
}
void MojoDemuxerStreamAdapter::OnBufferReady(
Status status,
- mojom::DecoderBufferPtr buffer,
+ std::vector<mojom::DecoderBufferPtr> batch_buffers,
const absl::optional<AudioDecoderConfig>& audio_config,
const absl::optional<VideoDecoderConfig>& video_config) {
- DVLOG(3) << __func__;
+ DVLOG(3) << __func__ << ": status=" << status
+ << ", batch_buffers.size=" << batch_buffers.size();
DCHECK(read_cb_);
DCHECK_NE(type_, UNKNOWN);
if (status == kConfigChanged) {
UpdateConfig(std::move(audio_config), std::move(video_config));
- std::move(read_cb_).Run(kConfigChanged, nullptr);
+ std::move(read_cb_).Run(kConfigChanged, {});
return;
}
if (status == kAborted) {
- std::move(read_cb_).Run(kAborted, nullptr);
+ std::move(read_cb_).Run(kAborted, {});
return;
}
#if defined(TIZEN_MULTIMEDIA)
if (status == kNeedBuffer) {
- std::move(read_cb_).Run(kNeedBuffer, nullptr);
+ std::move(read_cb_).Run(kNeedBuffer, {});
return;
}
#endif
DCHECK_EQ(status, kOk);
- mojo_decoder_buffer_reader_->ReadDecoderBuffer(
- std::move(buffer), base::BindOnce(&MojoDemuxerStreamAdapter::OnBufferRead,
- weak_factory_.GetWeakPtr()));
+ status_ = status;
+ actual_read_count_ = batch_buffers.size();
+ for (mojom::DecoderBufferPtr& buffer : batch_buffers) {
+ mojo_decoder_buffer_reader_->ReadDecoderBuffer(
+ std::move(buffer),
+ base::BindOnce(&MojoDemuxerStreamAdapter::OnBufferRead,
+ weak_factory_.GetWeakPtr()));
+ }
}
void MojoDemuxerStreamAdapter::OnBufferRead(
scoped_refptr<DecoderBuffer> buffer) {
if (!buffer) {
- std::move(read_cb_).Run(kAborted, nullptr);
+ std::move(read_cb_).Run(kAborted, {});
+ buffer_queue_.clear();
return;
}
- std::move(read_cb_).Run(kOk, buffer);
+ buffer_queue_.push_back(buffer);
+ if (buffer_queue_.size() == actual_read_count_) {
+ std::move(read_cb_).Run(status_, buffer_queue_);
+ actual_read_count_ = 0;
+ buffer_queue_.clear();
+ }
}
void MojoDemuxerStreamAdapter::UpdateConfig(
const absl::optional<AudioDecoderConfig>& audio_config,
const absl::optional<VideoDecoderConfig>& video_config) {
- DCHECK_NE(type_, UNKNOWN);
+ DCHECK_NE(type_, Type::UNKNOWN);
switch(type_) {
case AUDIO:
~MojoDemuxerStreamAdapter() override;
// DemuxerStream implementation.
- void Read(ReadCB read_cb) override;
+ void Read(uint32_t count, ReadCB read_cb) override;
AudioDecoderConfig audio_decoder_config() override;
VideoDecoderConfig video_decoder_config() override;
Type type() const override;
// The callback from |demuxer_stream_| that a read operation has completed.
// |read_cb| is a callback from the client who invoked Read() on |this|.
void OnBufferReady(Status status,
- mojom::DecoderBufferPtr buffer,
+ std::vector<mojom::DecoderBufferPtr> batch_buffers,
const absl::optional<AudioDecoderConfig>& audio_config,
const absl::optional<VideoDecoderConfig>& video_config);
AudioDecoderConfig audio_config_;
VideoDecoderConfig video_config_;
- Type type_;
+ Type type_ = Type::UNKNOWN;
+ Status status_ = Status::kOk;
+
+ size_t actual_read_count_ = 0;
+
+ DemuxerStream::DecoderBufferVector buffer_queue_;
std::unique_ptr<MojoDecoderBufferReader> mojo_decoder_buffer_reader_;
return;
}
demuxer_stream_->Read(
- base::BindOnce(&DemuxerStreamAdapter::OnNewBuffer,
- request_buffer_weak_factory_.GetWeakPtr()));
+ 1, base::BindOnce(&DemuxerStreamAdapter::OnNewBuffersRead,
+ request_buffer_weak_factory_.GetWeakPtr()));
+}
+
+void DemuxerStreamAdapter::OnNewBuffersRead(
+ DemuxerStream::Status status,
+ DemuxerStream::DecoderBufferVector buffers_queue) {
+ DCHECK_LE(buffers_queue.size(), 1u)
+ << "DemuxerStreamAdapter only reads a single-buffer.";
+ OnNewBuffer(status,
+ buffers_queue.empty() ? nullptr : std::move(buffers_queue[0]));
}
void DemuxerStreamAdapter::OnNewBuffer(DemuxerStream::Status status,
void SendReadAck();
// Callback function when retrieving data from demuxer.
+ void OnNewBuffersRead(DemuxerStream::Status status,
+ DemuxerStream::DecoderBufferVector buffers_queue);
void OnNewBuffer(DemuxerStream::Status status,
scoped_refptr<DecoderBuffer> input);
+
// Write the current frame into the mojo data pipe. OnFrameWritten() will be
// called when the writing has finished.
void WriteFrame();
TEST_F(DemuxerStreamAdapterTest, SingleReadUntil) {
// Read will be called once since it doesn't return frame buffer in the dummy
// implementation.
- EXPECT_CALL(*demuxer_stream_, Read(_)).Times(1);
+ EXPECT_CALL(*demuxer_stream_, Read(_, _)).Times(1);
demuxer_stream_adapter_->FakeReadUntil(3, 999);
RunPendingTasks();
TEST_F(DemuxerStreamAdapterTest, MultiReadUntil) {
// Read will be called once since it doesn't return frame buffer in the dummy
// implementation, and 2nd one will not proceed when there is ongoing read.
- EXPECT_CALL(*demuxer_stream_, Read(_)).Times(1);
+ EXPECT_CALL(*demuxer_stream_, Read(_, _)).Times(1);
demuxer_stream_adapter_->FakeReadUntil(1, 100);
RunPendingTasks();
}
TEST_F(DemuxerStreamAdapterTest, WriteOneFrameSmallerThanCapacity) {
- EXPECT_CALL(*demuxer_stream_, Read(_)).Times(1);
+ EXPECT_CALL(*demuxer_stream_, Read(_, _)).Times(1);
// Sends a frame with size 50 bytes, pts = 1 and key frame.
demuxer_stream_->CreateFakeFrame(50, true, 1 /* pts */);
demuxer_stream_adapter_->FakeReadUntil(1, 999);
}
TEST_F(DemuxerStreamAdapterTest, WriteOneFrameLargerThanCapacity) {
- EXPECT_CALL(*demuxer_stream_, Read(_)).Times(1);
+ EXPECT_CALL(*demuxer_stream_, Read(_, _)).Times(1);
// Sends a frame with size 800 bytes, pts = 1 and key frame.
demuxer_stream_->CreateFakeFrame(800, true, 1 /* pts */);
demuxer_stream_adapter_->FakeReadUntil(1, 999);
}
TEST_F(DemuxerStreamAdapterTest, SendFrameAndSignalFlushMix) {
- EXPECT_CALL(*demuxer_stream_, Read(_)).Times(4);
+ EXPECT_CALL(*demuxer_stream_, Read(_, _)).Times(4);
// Sends a frame with size 50 bytes, pts = 1 and key frame.
demuxer_stream_->CreateFakeFrame(50, true, 1 /* pts */);
// Issues ReadUntil request with frame count up to 1 (fetch #0).
}
TEST_F(DemuxerStreamAdapterTest, ClosingDataPipeCausesWriteError) {
- EXPECT_CALL(*demuxer_stream_, Read(_)).Times(1);
+ EXPECT_CALL(*demuxer_stream_, Read(_, _)).Times(1);
std::vector<StopTrigger> errors;
demuxer_stream_adapter_->TakeErrors(&errors);
rect, size, std::vector<uint8_t>(),
EncryptionScheme::kUnencrypted);
}
- ON_CALL(*this, Read(_))
+ ON_CALL(*this, Read)
.WillByDefault(Invoke(this, &FakeDemuxerStream::FakeRead));
}
FakeDemuxerStream::~FakeDemuxerStream() = default;
-void FakeDemuxerStream::FakeRead(ReadCB read_cb) {
+// Only return one buffer at a time so we ignore the count.
+void FakeDemuxerStream::FakeRead(uint32_t /*count*/, ReadCB read_cb) {
if (buffer_queue_.empty()) {
// Silent return to simulate waiting for buffer available.
pending_read_cb_ = std::move(read_cb);
}
scoped_refptr<DecoderBuffer> buffer = buffer_queue_.front();
buffer_queue_.pop_front();
- std::move(read_cb).Run(kOk, buffer);
+ std::move(read_cb).Run(kOk, {std::move(buffer)});
}
AudioDecoderConfig FakeDemuxerStream::audio_decoder_config() {
if (!pending_read_cb_) {
buffer_queue_.push_back(input_buffer);
} else {
- std::move(pending_read_cb_).Run(kOk, input_buffer);
+ std::move(pending_read_cb_).Run(kOk, {std::move(input_buffer)});
}
}
~FakeDemuxerStream() override;
// DemuxerStream implementation.
- MOCK_METHOD1(Read, void(ReadCB read_cb));
- void FakeRead(ReadCB read_cb);
+ MOCK_METHOD2(Read, void(uint32_t count, ReadCB read_cb));
+ void FakeRead(uint32_t count, ReadCB read_cb);
AudioDecoderConfig audio_decoder_config() override;
VideoDecoderConfig video_decoder_config() override;
Type type() const override;
read_until_sent_ = true;
}
-void StreamProvider::MediaStream::Read(ReadCB read_cb) {
- DCHECK(media_task_runner_->BelongsToCurrentThread());
+// Only return one buffer at a time so we ignore the count.
+void StreamProvider::MediaStream::Read(uint32_t /*count*/, ReadCB read_cb) {
+ DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
DCHECK(read_complete_callback_.is_null());
DCHECK(read_cb);
video_decoder_config_ = next_video_decoder_config_;
next_video_decoder_config_ = media::VideoDecoderConfig();
}
- std::move(read_complete_callback_).Run(status, nullptr);
+ std::move(read_complete_callback_).Run(status, {});
return;
case DemuxerStream::kAborted:
case DemuxerStream::kError:
- std::move(read_complete_callback_).Run(status, nullptr);
+ std::move(read_complete_callback_).Run(status, {});
return;
case DemuxerStream::kOk:
DCHECK(read_complete_callback_);
scoped_refptr<DecoderBuffer> frame_data = buffers_.front();
buffers_.pop_front();
++current_frame_count_;
- std::move(read_complete_callback_).Run(status, frame_data);
+ std::move(read_complete_callback_).Run(status, {frame_data});
return;
}
}
const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner);
// DemuxerStream implementation.
- void Read(ReadCB read_cb) override;
+ void Read(uint32_t count, ReadCB read_cb) override;
AudioDecoderConfig audio_decoder_config() override;
VideoDecoderConfig video_decoder_config() override;
DemuxerStream::Type type() const override;
return stream_provider_->video_stream_->current_frame_count_;
}
- void OnBufferReadFromDemuxerStream(DemuxerStream::Type type,
- DemuxerStream::Status status,
- scoped_refptr<DecoderBuffer> buffer) {
+ void OnBufferReadFromDemuxerStream(
+ DemuxerStream::Type type,
+ DemuxerStream::Status status,
+ DemuxerStream::DecoderBufferVector buffers) {
EXPECT_EQ(status, DemuxerStream::Status::kOk);
+ EXPECT_EQ(buffers.size(), 1u)
+ << "StreamProviderTest only reads a single-buffer.";
+ scoped_refptr<DecoderBuffer> buffer = std::move(buffers[0]);
switch (type) {
case DemuxerStream::Type::AUDIO:
received_audio_buffer_ = buffer;
EXPECT_TRUE(stream_provider_initialized_);
audio_stream_->Read(
- base::BindOnce(&StreamProviderTest::OnBufferReadFromDemuxerStream,
- base::Unretained(this), DemuxerStream::Type::AUDIO));
+ 1, base::BindOnce(&StreamProviderTest::OnBufferReadFromDemuxerStream,
+ base::Unretained(this), DemuxerStream::Type::AUDIO));
task_environment_.RunUntilIdle();
EXPECT_EQ(audio_buffer_->data_size(), received_audio_buffer_->data_size());
EXPECT_EQ(audio_buffer_->end_of_stream(),
received_audio_buffer_->is_key_frame());
video_stream_->Read(
- base::BindOnce(&StreamProviderTest::OnBufferReadFromDemuxerStream,
- base::Unretained(this), DemuxerStream::Type::VIDEO));
+ 1, base::BindOnce(&StreamProviderTest::OnBufferReadFromDemuxerStream,
+ base::Unretained(this), DemuxerStream::Type::VIDEO));
task_environment_.RunUntilIdle();
EXPECT_EQ(video_buffer_->end_of_stream(),
received_video_buffer_->end_of_stream());
return;
}
scoped_refptr<DecoderBuffer> decoder_buffer(new DecoderBuffer(0));
- std::move(read_cb).Run(DemuxerStream::kOk, decoder_buffer);
+ std::move(read_cb).Run(DemuxerStream::kOk, {std::move(decoder_buffer)});
}
bool IsDemuxerStalled() { return !!stalled_demixer_read_cb_; }
DCHECK(decode_cb_);
// Return EOS buffer to trigger EOS frame.
+ DemuxerStream::DecoderBufferVector buffers;
+ buffers.emplace_back(DecoderBuffer::CreateEOSBuffer());
EXPECT_CALL(demuxer_stream_, OnRead(_))
- .WillOnce(RunOnceCallback<0>(DemuxerStream::kOk,
- DecoderBuffer::CreateEOSBuffer()));
+ .WillOnce(RunOnceCallback<0>(DemuxerStream::kOk, buffers));
// Satify pending |decode_cb_| to trigger a new DemuxerStream::Read().
main_thread_task_runner_->PostTask(
// Test hook for to specify a custom buffer duration.
decoder_buffer->set_duration(buffer_duration_);
- std::move(read_cb).Run(DemuxerStream::kOk, decoder_buffer);
+ std::move(read_cb).Run(DemuxerStream::kOk, {std::move(decoder_buffer)});
}
bool IsDemuxerStalled() { return !!stalled_demixer_read_cb_; }
DCHECK(decode_cb_);
// Return EOS buffer to trigger EOS frame.
+ DemuxerStream::DecoderBufferVector buffers;
+ buffers.emplace_back(DecoderBuffer::CreateEOSBuffer());
EXPECT_CALL(demuxer_stream_, OnRead(_))
- .WillOnce(RunOnceCallback<0>(DemuxerStream::kOk,
- DecoderBuffer::CreateEOSBuffer()));
+ .WillOnce(RunOnceCallback<0>(DemuxerStream::kOk, buffers));
// Satify pending |decode_cb_| to trigger a new DemuxerStream::Read().
task_environment_.GetMainThreadTaskRunner()->PostTask(
.WillRepeatedly(Return(true));
// Signal a config change at the next DemuxerStream::Read().
+ DemuxerStream::DecoderBufferVector buffers;
EXPECT_CALL(demuxer_stream_, OnRead(_))
- .WillOnce(RunOnceCallback<0>(DemuxerStream::kConfigChanged, nullptr));
+ .WillOnce(RunOnceCallback<0>(DemuxerStream::kConfigChanged, buffers));
// Use LargeEncrypted config (non-default) to ensure its plumbed through to
// callback.
#include <mferror.h>
#include "base/bind.h"
+#include "base/task/sequenced_task_runner.h"
+#include "base/trace_event/base_tracing.h"
#include "media/base/video_codecs.h"
#include "media/base/win/mf_helpers.h"
#include "media/renderers/win/media_foundation_audio_stream.h"
using Microsoft::WRL::ComPtr;
namespace {
+
+// Requested buffer count. The actual returned buffer count could be less
+// according to DemuxerStream::Read() API.
+const uint32_t kBatchReadCount = 1;
+
// |guid_string| is a binary serialization of a GUID in network byte order
// format.
GUID GetGUIDFromString(const std::string& guid_string) {
return time.InNanoseconds() / 100;
}
+PendingInputBuffer::PendingInputBuffer(DemuxerStream::Status status,
+ scoped_refptr<DecoderBuffer> buffer)
+ : status(status), buffer(std::move(buffer)) {}
+
+PendingInputBuffer::PendingInputBuffer(DemuxerStream::Status status)
+ : status(status) {}
+
+PendingInputBuffer::PendingInputBuffer(const PendingInputBuffer& other) =
+ default;
+
+PendingInputBuffer::~PendingInputBuffer() = default;
+
} // namespace
MediaFoundationStreamWrapper::MediaFoundationStreamWrapper() = default;
base::AutoLock auto_lock(lock_);
flushed_ = flushed;
if (flushed_) {
+ DVLOG_FUNC(2) << "flush buffer_queue_";
+ buffer_queue_.clear();
while (!post_flush_buffers_.empty()) {
post_flush_buffers_.pop();
}
return;
}
- if (!demuxer_stream_ || pending_stream_read_)
+ if (!demuxer_stream_) {
return;
+ }
- demuxer_stream_->Read(
- base::BindOnce(&MediaFoundationStreamWrapper::OnDemuxerStreamRead,
- weak_factory_.GetWeakPtr()));
- pending_stream_read_ = true;
+ base::AutoLock auto_lock(lock_);
+ if (!buffer_queue_.empty()) {
+ // Using queued buffer for multi buffers read from Renderer process. If
+ // a valid buffer already exists in queued buffer, return the buffer
+ // directly without IPC calls for buffer requested from MediaEngine.
+ OnDemuxerStreamRead(buffer_queue_.front().status,
+ std::move(buffer_queue_.front().buffer));
+ buffer_queue_.pop_front();
+ return;
+ }
+
+ // Request multi buffers by sending IPC to 'MojoDemuxerStreamImpl'.
+ if (!pending_stream_read_) {
+ DVLOG_FUNC(3) << " IPC send, BatchReadCount=" << kBatchReadCount;
+ TRACE_EVENT2("media", "MFGetBuffersFromRendererByIPC",
+ "StreamType:", DemuxerStream::GetTypeName(stream_type_),
+ "kBatchReadCount:", kBatchReadCount);
+ pending_stream_read_ = true;
+ demuxer_stream_->Read(
+ kBatchReadCount,
+ base::BindOnce(
+ &MediaFoundationStreamWrapper::OnDemuxerStreamReadBuffers,
+ weak_factory_.GetWeakPtr()));
+ }
+}
+
+void MediaFoundationStreamWrapper::OnDemuxerStreamReadBuffers(
+ DemuxerStream::Status status,
+ DemuxerStream::DecoderBufferVector buffers) {
+ DCHECK(task_runner_->RunsTasksInCurrentSequence());
+ DVLOG_FUNC(3) << "receive data, status="
+ << DemuxerStream::GetStatusName(status)
+ << ", buffer count= " << buffers.size()
+ << ", stream type=" << DemuxerStream::GetTypeName(stream_type_);
+ {
+ base::AutoLock auto_lock(lock_);
+ DCHECK(pending_stream_read_);
+ pending_stream_read_ = false;
+
+ DemuxerStream::DecoderBufferVector pending_buffers =
+ (status == DemuxerStream::Status::kOk)
+ ? std::move(buffers)
+ : DemuxerStream::DecoderBufferVector{nullptr};
+ for (auto& buffer : pending_buffers) {
+ DVLOG_FUNC(3) << "push buffer to buffer_queue_, status="
+ << DemuxerStream::GetStatusName(status) << ", buffer="
+ << (buffer ? buffer->AsHumanReadableString(false) : "null");
+ buffer_queue_.emplace_back(PendingInputBuffer(status, std::move(buffer)));
+ }
+ }
+
+ // Restart processing of queued requests when we receive buffers.
+ ProcessRequestsIfPossible();
}
HRESULT MediaFoundationStreamWrapper::ServiceSampleRequest(
DVLOG_FUNC(3) << "status=" << status
<< (buffer ? " buffer=" + buffer->AsHumanReadableString(true)
: "");
-
{
- base::AutoLock auto_lock(lock_);
- DCHECK(pending_stream_read_);
- pending_stream_read_ = false;
-
+ lock_.AssertAcquired();
ComPtr<IUnknown> token = pending_sample_request_tokens_.front();
HRESULT hr = S_OK;
}
}
- ProcessRequestsIfPossible();
+ // ProcessRequestsIfPossible calls OnDemuxerStreamRead, OnDemuxerStreamRead
+ // calls ProcessRequestsIfPossible, so use PostTask to avoid deadlock here.
+ task_runner_->PostTask(
+ FROM_HERE,
+ base::BindOnce(&MediaFoundationStreamWrapper::ProcessRequestsIfPossible,
+ weak_factory_.GetWeakPtr()));
}
HRESULT MediaFoundationStreamWrapper::GenerateSampleFromDecoderBuffer(
DWORD cipher_bytes = 0;
};
+struct PendingInputBuffer {
+ PendingInputBuffer(DemuxerStream::Status status,
+ scoped_refptr<media::DecoderBuffer> buffer);
+ explicit PendingInputBuffer(DemuxerStream::Status status);
+ PendingInputBuffer(const PendingInputBuffer& other);
+ ~PendingInputBuffer();
+
+ DemuxerStream::Status status;
+ scoped_refptr<media::DecoderBuffer> buffer;
+};
+
} // namespace
// IMFMediaStream implementation
void ProcessRequestsIfPossible();
void OnDemuxerStreamRead(DemuxerStream::Status status,
scoped_refptr<DecoderBuffer> buffer);
+ // Receive the data from MojoDemuxerStreamAdapter.
+ void OnDemuxerStreamReadBuffers(DemuxerStream::Status status,
+ DemuxerStream::DecoderBufferVector buffers);
// IMFMediaStream implementation - it is in general running in MF threadpool
// thread.
// If true, there is a pending a read completion from Chromium media stack.
bool pending_stream_read_ = false;
+ // Maintain the buffer obtained by batch read. We push buffer into
+ // |buffer_queue_| by OnDemuxerStreamReadBuffers(), pop buffer by
+ // ProcessRequestsIfPossible(), these two operations are both on media stack
+ // thread. SetFlush() can be invoked by media stack thread or MF threadpool
+ // thread, it clears the buffer in |buffer_queue_|. So |buffer_queue_| needs
+ // to be guardedby the lock.
+ std::deque<PendingInputBuffer> buffer_queue_ GUARDED_BY(lock_);
+
bool stream_ended_ = false;
GUID last_key_id_ = GUID_NULL;
~NullDemuxerStream() override = default;
- void Read(ReadCB read_cb) override { NOTREACHED(); }
+ void Read(uint32_t count, ReadCB read_cb) override { NOTREACHED(); }
void Configure(DecoderConfigType config);
~InternalDemuxerStream() override = default;
- // DemuxerStream
- void Read(ReadCB read_cb) override {
+ // DemuxerStream, only return one buffer at a time so we ignore the count.
+ void Read(uint32_t /*count*/, ReadCB read_cb) override {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!pending_read_);
pending_read_ = std::move(read_cb);
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
buffers_.clear();
if (pending_read_)
- std::move(pending_read_).Run(DemuxerStream::Status::kAborted, nullptr);
+ std::move(pending_read_).Run(DemuxerStream::Status::kAborted, {});
}
// If enabled, we'll drop any queued buffers when we're given a keyframe.
// change first, and keep the buffer for the next call.
if (buffers_.front()->new_config) {
config_ = std::move(*(buffers_.front()->new_config));
- std::move(pending_read_)
- .Run(DemuxerStream::Status::kConfigChanged, nullptr);
+ std::move(pending_read_).Run(DemuxerStream::Status::kConfigChanged, {});
return;
}
buffers_.pop_front();
std::move(pending_read_)
- .Run(DemuxerStream::Status::kOk, std::move(pending_buffer->buffer));
+ .Run(DemuxerStream::Status::kOk, {std::move(pending_buffer->buffer)});
}
media::VideoDecoderConfig config_;
//////////////////// media::DemuxerStream ////////////////////
-void MediaStreamImpl::Read(ReadCB read_cb) {
+void MediaStreamImpl::Read(uint32_t count, ReadCB read_cb) {
EMSS_VERBOSE_TYPED();
auto dispatcher = dispatcher_.lock();
base::AutoLock lock{stream_lock_};
read_cb_ = media::BindToCurrentLoop(std::move(read_cb));
+ read_count_ = count;
if (!dispatcher || aborting_until_seek_) {
EMSS_VERBOSE_TYPED() << "Read aborted, aborting until seek = "
void MediaStreamImpl::AbortReadCb() {
EMSS_VERBOSE_TYPED();
EMSS_LOG_ASSERT(read_cb_);
- std::move(read_cb_).Run(media::DemuxerStream::kAborted, nullptr);
+ std::move(read_cb_).Run(media::DemuxerStream::kAborted, {});
}
void MediaStreamImpl::RunReadCb(scoped_refptr<media::DecoderBuffer> buffer) {
EMSS_VERBOSE_TYPED() << buffer->AsHumanReadableString();
EMSS_LOG_ASSERT(read_cb_);
- std::move(read_cb_).Run(media::DemuxerStream::kOk, std::move(buffer));
+ std::move(read_cb_).Run(media::DemuxerStream::kOk, {std::move(buffer)});
}
} // namespace any_thread
~MediaStreamImpl() override;
// media::DemuxerStream:
- void Read(ReadCB read_cb) override;
+ void Read(uint32_t count, ReadCB read_cb) override;
media::AudioDecoderConfig audio_decoder_config() override;
media::VideoDecoderConfig video_decoder_config() override;
Type type() const override;
bool is_read_pending_ GUARDED_BY(stream_lock_);
std::queue<Packet> packets_ GUARDED_BY(stream_lock_);
ReadCB read_cb_ GUARDED_BY(stream_lock_);
+ uint32_t read_count_ GUARDED_BY(stream_lock_);
uint32_t session_id_ GUARDED_BY(stream_lock_);
base::Lock stream_lock_;
}
SetReadRequested(type, true);
- GetDemuxerStream(type)->Read(
+
+ constexpr uint32_t kBufferReadCount = 1;
+ GetDemuxerStream(type)->Read(kBufferReadCount,
base::BindOnce(&MediaPlayerESPlusPlayer::OnBufferReady,
weak_factory_.GetWeakPtr(), type));
}
void MediaPlayerESPlusPlayer::OnBufferReady(
DemuxerStream::Type type,
DemuxerStream::Status status,
- scoped_refptr<DecoderBuffer> buffer) {
+ DemuxerStream::DecoderBufferVector buffers) {
bool should_delay_read = false;
switch (status) {
case DemuxerStream::kAborted:
InitializeStreamConfig(type);
break;
case DemuxerStream::kOk: {
- GetBufferQueue(type).push_back(buffer);
+ for (auto const& buffer: buffers) {
+ GetBufferQueue(type).push_back(buffer);
+ }
break;
}
}
void PostReadBuffer(DemuxerStream::Type type);
void OnBufferReady(DemuxerStream::Type type,
DemuxerStream::Status status,
- scoped_refptr<DecoderBuffer> buffer);
+ DemuxerStream::DecoderBufferVector buffers);
+ bool should_delay_read = false;
esplusplayer_submit_status SubmitEosPacket(DemuxerStream::Type type);
void SeekInternal(base::TimeDelta time);