1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/remoting/stream_provider.h"
8 #include "base/containers/circular_deque.h"
9 #include "base/functional/bind.h"
10 #include "base/functional/callback.h"
11 #include "base/functional/callback_helpers.h"
12 #include "base/logging.h"
13 #include "base/task/bind_post_task.h"
14 #include "base/task/sequenced_task_runner.h"
15 #include "base/task/single_thread_task_runner.h"
16 #include "media/base/decoder_buffer.h"
17 #include "media/base/demuxer.h"
18 #include "media/base/video_transformation.h"
19 #include "media/cast/openscreen/remoting_proto_enum_utils.h"
20 #include "media/cast/openscreen/remoting_proto_utils.h"
21 #include "media/mojo/common/mojo_decoder_buffer_converter.h"
22 #include "media/remoting/receiver_controller.h"
23 #include "third_party/openscreen/src/cast/streaming/rpc_messenger.h"
25 using openscreen::cast::RpcMessenger;
31 // The number of frames requested in each ReadUntil RPC message.
32 constexpr int kNumFramesInEachReadUntil = 10;
36 void StreamProvider::MediaStream::CreateOnMainThread(
37 RpcMessenger* rpc_messenger,
40 const scoped_refptr<base::SequencedTaskRunner>& media_task_runner,
41 base::OnceCallback<void(MediaStream::UniquePtr)> callback) {
42 MediaStream::UniquePtr stream(
43 new MediaStream(rpc_messenger, type, handle, media_task_runner),
45 std::move(callback).Run(std::move(stream));
49 void StreamProvider::MediaStream::DestructionHelper(MediaStream* stream) {
53 StreamProvider::MediaStream::MediaStream(
54 RpcMessenger* rpc_messenger,
57 const scoped_refptr<base::SequencedTaskRunner>& media_task_runner)
58 : main_task_runner_(base::SingleThreadTaskRunner::GetCurrentDefault()),
59 media_task_runner_(media_task_runner),
60 rpc_messenger_(rpc_messenger),
62 remote_handle_(remote_handle),
63 rpc_handle_(rpc_messenger_->GetUniqueHandle()) {
64 DCHECK(remote_handle_ != RpcMessenger::kInvalidHandle);
66 media_weak_this_ = media_weak_factory_.GetWeakPtr();
68 auto receive_callback = base::BindPostTask(
70 BindRepeating(&MediaStream::OnReceivedRpc, media_weak_this_));
71 rpc_messenger_->RegisterMessageReceiverCallback(
72 rpc_handle_, [cb = std::move(receive_callback)](
73 std::unique_ptr<openscreen::cast::RpcMessage> message) {
74 cb.Run(std::move(message));
78 StreamProvider::MediaStream::~MediaStream() {
79 DCHECK(main_task_runner_->BelongsToCurrentThread());
80 rpc_messenger_->UnregisterMessageReceiverCallback(rpc_handle_);
83 void StreamProvider::MediaStream::Destroy() {
84 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
86 // Invalid weak pointers to prevent |this| from receiving RPC calls on the
88 media_weak_factory_.InvalidateWeakPtrs();
90 // Unbind all mojo pipes and bindings.
92 decoder_buffer_reader_.reset();
94 // After invalidating all weak ptrs of |media_weak_factory_|, MediaStream
95 // won't be access anymore, so using |this| here is safe.
96 main_task_runner_->DeleteSoon(FROM_HERE, this);
99 void StreamProvider::MediaStream::SendRpcMessageOnMainThread(
100 std::unique_ptr<openscreen::cast::RpcMessage> message) {
101 // |rpc_messenger_| is owned by |receiver_controller_| which is a singleton
102 // per process, so it's safe to use Unretained() here.
103 main_task_runner_->PostTask(
104 FROM_HERE, base::BindOnce(&RpcMessenger::SendMessageToRemote,
105 base::Unretained(rpc_messenger_), *message));
108 void StreamProvider::MediaStream::Initialize(
109 base::OnceClosure init_done_callback) {
110 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
111 DCHECK(init_done_callback);
113 if (init_done_callback_) {
114 OnError("Duplicate initialization");
118 init_done_callback_ = std::move(init_done_callback);
120 auto rpc = std::make_unique<openscreen::cast::RpcMessage>();
121 rpc->set_handle(remote_handle_);
122 rpc->set_proc(openscreen::cast::RpcMessage::RPC_DS_INITIALIZE);
123 rpc->set_integer_value(rpc_handle_);
124 SendRpcMessageOnMainThread(std::move(rpc));
127 void StreamProvider::MediaStream::InitializeDataPipe(
128 mojo::ScopedDataPipeConsumerHandle data_pipe) {
129 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
131 decoder_buffer_reader_ =
132 std::make_unique<MojoDecoderBufferReader>(std::move(data_pipe));
133 CompleteInitialize();
136 void StreamProvider::MediaStream::ReceiveFrame(uint32_t count,
137 mojom::DecoderBufferPtr buffer) {
138 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
139 DCHECK(decoder_buffer_reader_);
141 auto callback = base::BindPostTaskToCurrentDefault(
142 base::BindOnce(&MediaStream::AppendBuffer, media_weak_this_, count));
143 decoder_buffer_reader_->ReadDecoderBuffer(std::move(buffer),
144 std::move(callback));
147 void StreamProvider::MediaStream::FlushUntil(uint32_t count) {
148 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
150 if (count < current_frame_count_)
153 uint32_t buffers_to_erase = count - current_frame_count_;
155 if (buffers_to_erase > buffers_.size()) {
158 buffers_.erase(buffers_.begin(), buffers_.begin() + buffers_to_erase);
161 current_frame_count_ = count;
163 if (!read_complete_callback_.is_null())
164 CompleteRead(DemuxerStream::kAborted);
166 read_until_sent_ = false;
169 void StreamProvider::MediaStream::OnReceivedRpc(
170 std::unique_ptr<openscreen::cast::RpcMessage> message) {
171 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
172 DCHECK(message->handle() == rpc_handle_);
174 switch (message->proc()) {
175 case openscreen::cast::RpcMessage::RPC_DS_INITIALIZE_CALLBACK:
176 OnInitializeCallback(std::move(message));
178 case openscreen::cast::RpcMessage::RPC_DS_READUNTIL_CALLBACK:
179 OnReadUntilCallback(std::move(message));
182 VLOG(3) << __func__ << "Unknow RPC message.";
186 void StreamProvider::MediaStream::OnInitializeCallback(
187 std::unique_ptr<openscreen::cast::RpcMessage> message) {
188 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
189 const openscreen::cast::DemuxerStreamInitializeCallback callback_message =
190 message->demuxerstream_initializecb_rpc();
191 if (callback_message.type() != type_) {
192 OnError("Wrong type");
196 if ((type_ == DemuxerStream::AUDIO &&
197 audio_decoder_config_.IsValidConfig()) ||
198 (type_ == DemuxerStream::VIDEO &&
199 video_decoder_config_.IsValidConfig())) {
200 OnError("Duplicate initialization");
204 if (type_ == DemuxerStream::AUDIO &&
205 callback_message.has_audio_decoder_config()) {
206 const openscreen::cast::AudioDecoderConfig audio_message =
207 callback_message.audio_decoder_config();
208 media::cast::ConvertProtoToAudioDecoderConfig(audio_message,
209 &audio_decoder_config_);
210 if (!audio_decoder_config_.IsValidConfig()) {
211 OnError("Invalid audio config");
214 } else if (type_ == DemuxerStream::VIDEO &&
215 callback_message.has_video_decoder_config()) {
216 const openscreen::cast::VideoDecoderConfig video_message =
217 callback_message.video_decoder_config();
218 media::cast::ConvertProtoToVideoDecoderConfig(video_message,
219 &video_decoder_config_);
220 if (!video_decoder_config_.IsValidConfig()) {
221 OnError("Invalid video config");
225 OnError("Config missing");
229 rpc_initialized_ = true;
230 CompleteInitialize();
233 void StreamProvider::MediaStream::CompleteInitialize() {
234 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
236 // Initialization finished when received RPC_DS_INITIALIZE_CALLBACK and
237 // |decoder_buffer_reader_| is created.
238 if (!rpc_initialized_ || !decoder_buffer_reader_)
241 if (!init_done_callback_) {
242 OnError("Initialize callback missing");
246 std::move(init_done_callback_).Run();
249 void StreamProvider::MediaStream::OnReadUntilCallback(
250 std::unique_ptr<openscreen::cast::RpcMessage> message) {
251 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
253 if (!read_until_sent_) {
254 OnError("Unexpected ReadUntilCallback");
257 read_until_sent_ = false;
258 const openscreen::cast::DemuxerStreamReadUntilCallback callback_message =
259 message->demuxerstream_readuntilcb_rpc();
260 total_received_frame_count_ = callback_message.count();
262 if (media::cast::ToDemuxerStreamStatus(callback_message.status()) ==
264 if (callback_message.has_audio_decoder_config()) {
265 const openscreen::cast::AudioDecoderConfig audio_message =
266 callback_message.audio_decoder_config();
267 UpdateAudioConfig(audio_message);
270 if (callback_message.has_video_decoder_config()) {
271 const openscreen::cast::VideoDecoderConfig video_message =
272 callback_message.video_decoder_config();
273 UpdateVideoConfig(video_message);
276 if (buffers_.empty() && !read_complete_callback_.is_null())
277 CompleteRead(DemuxerStream::kConfigChanged);
282 if (buffers_.empty() && !read_complete_callback_.is_null())
286 void StreamProvider::MediaStream::UpdateAudioConfig(
287 const openscreen::cast::AudioDecoderConfig& audio_message) {
288 DCHECK(type_ == AUDIO);
289 AudioDecoderConfig audio_config;
290 media::cast::ConvertProtoToAudioDecoderConfig(audio_message, &audio_config);
291 if (!audio_config.IsValidConfig()) {
292 OnError("Invalid audio config");
295 next_audio_decoder_config_ = audio_config;
298 void StreamProvider::MediaStream::UpdateVideoConfig(
299 const openscreen::cast::VideoDecoderConfig& video_message) {
300 DCHECK(type_ == VIDEO);
301 VideoDecoderConfig video_config;
302 media::cast::ConvertProtoToVideoDecoderConfig(video_message, &video_config);
303 if (!video_config.IsValidConfig()) {
304 OnError("Invalid video config");
307 next_video_decoder_config_ = video_config;
310 void StreamProvider::MediaStream::SendReadUntil() {
311 if (read_until_sent_)
314 std::unique_ptr<openscreen::cast::RpcMessage> rpc(
315 new openscreen::cast::RpcMessage());
316 rpc->set_handle(remote_handle_);
317 rpc->set_proc(openscreen::cast::RpcMessage::RPC_DS_READUNTIL);
318 auto* message = rpc->mutable_demuxerstream_readuntil_rpc();
319 message->set_count(total_received_frame_count_ + kNumFramesInEachReadUntil);
320 message->set_callback_handle(rpc_handle_);
321 SendRpcMessageOnMainThread(std::move(rpc));
322 read_until_sent_ = true;
325 // Only return one buffer at a time so we ignore the count.
326 void StreamProvider::MediaStream::Read(uint32_t /*count*/, ReadCB read_cb) {
327 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
328 DCHECK(read_complete_callback_.is_null());
331 read_complete_callback_ = std::move(read_cb);
332 if (buffers_.empty() && (next_audio_decoder_config_.IsValidConfig() ||
333 next_video_decoder_config_.IsValidConfig())) {
334 CompleteRead(DemuxerStream::kConfigChanged);
338 // Wait for more data.
339 if (buffers_.empty()) {
344 CompleteRead(DemuxerStream::kOk);
347 void StreamProvider::MediaStream::CompleteRead(DemuxerStream::Status status) {
348 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
351 case DemuxerStream::kConfigChanged:
352 if (next_audio_decoder_config_.IsValidConfig()) {
353 audio_decoder_config_ = next_audio_decoder_config_;
354 next_audio_decoder_config_ = media::AudioDecoderConfig();
356 if (next_video_decoder_config_.IsValidConfig()) {
357 video_decoder_config_ = next_video_decoder_config_;
358 next_video_decoder_config_ = media::VideoDecoderConfig();
360 std::move(read_complete_callback_).Run(status, {});
362 case DemuxerStream::kAborted:
363 case DemuxerStream::kError:
364 std::move(read_complete_callback_).Run(status, {});
366 case DemuxerStream::kOk:
367 DCHECK(read_complete_callback_);
368 DCHECK(!buffers_.empty());
369 DCHECK_LT(current_frame_count_, buffered_frame_count_);
370 scoped_refptr<DecoderBuffer> frame_data = buffers_.front();
371 buffers_.pop_front();
372 ++current_frame_count_;
373 std::move(read_complete_callback_).Run(status, {frame_data});
378 AudioDecoderConfig StreamProvider::MediaStream::audio_decoder_config() {
379 DCHECK(type_ == DemuxerStream::AUDIO);
380 return audio_decoder_config_;
383 VideoDecoderConfig StreamProvider::MediaStream::video_decoder_config() {
384 DCHECK(type_ == DemuxerStream::VIDEO);
385 return video_decoder_config_;
388 DemuxerStream::Type StreamProvider::MediaStream::type() const {
392 StreamLiveness StreamProvider::MediaStream::liveness() const {
393 return StreamLiveness::kLive;
396 bool StreamProvider::MediaStream::SupportsConfigChanges() {
400 void StreamProvider::MediaStream::AppendBuffer(
402 scoped_refptr<DecoderBuffer> buffer) {
403 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
405 // Drop flushed frame.
406 if (count < current_frame_count_)
410 DCHECK(buffers_.empty() || buffered_frame_count_ == count);
412 buffers_.push_back(buffer);
413 buffered_frame_count_ = count + 1;
415 if (!read_complete_callback_.is_null())
416 CompleteRead(DemuxerStream::kOk);
419 void StreamProvider::MediaStream::OnError(const std::string& error) {
420 auto rpc = std::make_unique<openscreen::cast::RpcMessage>();
421 rpc->set_handle(remote_handle_);
422 rpc->set_proc(openscreen::cast::RpcMessage::RPC_DS_ONERROR);
423 SendRpcMessageOnMainThread(std::move(rpc));
426 StreamProvider::StreamProvider(
427 ReceiverController* receiver_controller,
428 const scoped_refptr<base::SequencedTaskRunner>& media_task_runner)
429 : main_task_runner_(base::SingleThreadTaskRunner::GetCurrentDefault()),
430 media_task_runner_(media_task_runner),
431 receiver_controller_(receiver_controller),
432 rpc_messenger_(receiver_controller_->rpc_messenger()) {
433 DCHECK(receiver_controller_);
434 DCHECK(rpc_messenger_);
436 media_weak_this_ = media_weak_factory_.GetWeakPtr();
438 auto callback = base::BindPostTask(
440 base::BindRepeating(&StreamProvider::OnReceivedRpc, media_weak_this_));
441 rpc_messenger_->RegisterMessageReceiverCallback(
442 RpcMessenger::kAcquireDemuxerHandle,
443 [cb = std::move(callback)](
444 std::unique_ptr<openscreen::cast::RpcMessage> message) {
445 cb.Run(std::move(message));
449 StreamProvider::~StreamProvider() {
450 DCHECK(main_task_runner_->BelongsToCurrentThread());
451 rpc_messenger_->UnregisterMessageReceiverCallback(
452 RpcMessenger::kAcquireDemuxerHandle);
455 std::string StreamProvider::GetDisplayName() const {
456 return "media::remoting::StreamProvider";
459 DemuxerType StreamProvider::GetDemuxerType() const {
460 return DemuxerType::kStreamProviderDemuxer;
463 void StreamProvider::Initialize(DemuxerHost* host,
464 PipelineStatusCallback status_cb) {
465 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
466 init_done_callback_ = std::move(status_cb);
467 CompleteInitialize();
470 void StreamProvider::AbortPendingReads() {}
472 void StreamProvider::StartWaitingForSeek(base::TimeDelta seek_time) {}
474 void StreamProvider::CancelPendingSeek(base::TimeDelta seek_time) {}
476 void StreamProvider::Seek(base::TimeDelta time,
477 PipelineStatusCallback seek_cb) {
478 media_task_runner_->PostTask(FROM_HERE,
479 base::BindOnce(std::move(seek_cb), PIPELINE_OK));
482 bool StreamProvider::IsSeekable() const {
486 void StreamProvider::Stop() {}
488 base::TimeDelta StreamProvider::GetStartTime() const {
489 return base::TimeDelta();
492 base::Time StreamProvider::GetTimelineOffset() const {
496 int64_t StreamProvider::GetMemoryUsage() const {
500 absl::optional<container_names::MediaContainerName>
501 StreamProvider::GetContainerForMetrics() const {
502 return absl::optional<container_names::MediaContainerName>();
505 void StreamProvider::OnEnabledAudioTracksChanged(
506 const std::vector<MediaTrack::Id>& track_ids,
507 base::TimeDelta curr_time,
508 TrackChangeCB change_completed_cb) {
509 std::vector<DemuxerStream*> streams;
510 std::move(change_completed_cb).Run(DemuxerStream::AUDIO, streams);
511 DVLOG(1) << "Track changes are not supported.";
514 void StreamProvider::OnSelectedVideoTrackChanged(
515 const std::vector<MediaTrack::Id>& track_ids,
516 base::TimeDelta curr_time,
517 TrackChangeCB change_completed_cb) {
518 std::vector<DemuxerStream*> streams;
519 std::move(change_completed_cb).Run(DemuxerStream::VIDEO, streams);
520 DVLOG(1) << "Track changes are not supported.";
523 void StreamProvider::Destroy() {
524 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
526 if (init_done_callback_)
527 std::move(init_done_callback_).Run(PIPELINE_ERROR_ABORT);
529 // Invalid weak pointers to prevent |this| from receiving RPC calls on the
531 media_weak_factory_.InvalidateWeakPtrs();
533 audio_stream_.reset();
534 video_stream_.reset();
536 // After invalidating all weak ptrs of |media_weak_factory_|, StreamProvider
537 // won't be access anymore, so using |this| here is safe.
538 main_task_runner_->DeleteSoon(FROM_HERE, this);
541 void StreamProvider::OnReceivedRpc(
542 std::unique_ptr<openscreen::cast::RpcMessage> message) {
543 switch (message->proc()) {
544 case openscreen::cast::RpcMessage::RPC_ACQUIRE_DEMUXER:
545 OnAcquireDemuxer(std::move(message));
548 VLOG(3) << __func__ << "Unknown RPC message.";
552 void StreamProvider::OnAcquireDemuxer(
553 std::unique_ptr<openscreen::cast::RpcMessage> message) {
554 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
555 DCHECK(message->has_acquire_demuxer_rpc());
557 int32_t audio_demuxer_handle =
558 message->acquire_demuxer_rpc().audio_demuxer_handle();
559 int32_t video_demuxer_handle =
560 message->acquire_demuxer_rpc().video_demuxer_handle();
561 has_audio_ = audio_demuxer_handle != RpcMessenger::kInvalidHandle;
562 has_video_ = video_demuxer_handle != RpcMessenger::kInvalidHandle;
564 DCHECK(has_audio_ || has_video_);
567 auto callback = base::BindPostTaskToCurrentDefault(base::BindOnce(
568 &StreamProvider::OnAudioStreamCreated, media_weak_this_));
569 main_task_runner_->PostTask(
571 base::BindOnce(&MediaStream::CreateOnMainThread, rpc_messenger_,
572 DemuxerStream::AUDIO, audio_demuxer_handle,
573 media_task_runner_, std::move(callback)));
577 auto callback = base::BindPostTaskToCurrentDefault(base::BindOnce(
578 &StreamProvider::OnVideoStreamCreated, media_weak_this_));
579 main_task_runner_->PostTask(
581 base::BindOnce(&MediaStream::CreateOnMainThread, rpc_messenger_,
582 DemuxerStream::VIDEO, video_demuxer_handle,
583 media_task_runner_, std::move(callback)));
587 void StreamProvider::OnAudioStreamCreated(MediaStream::UniquePtr stream) {
588 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
589 audio_stream_ = std::move(stream);
590 audio_stream_->Initialize(base::BindOnce(
591 &StreamProvider::OnAudioStreamInitialized, media_weak_this_));
592 InitializeDataPipe();
595 void StreamProvider::OnVideoStreamCreated(MediaStream::UniquePtr stream) {
596 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
597 video_stream_ = std::move(stream);
598 video_stream_->Initialize(base::BindOnce(
599 &StreamProvider::OnVideoStreamInitialized, media_weak_this_));
600 InitializeDataPipe();
603 void StreamProvider::InitializeDataPipe() {
604 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
606 if ((has_audio_ && !audio_stream_) || (has_video_ && !video_stream_))
609 receiver_controller_->StartDataStreams(
610 has_audio_ ? audio_stream_->BindNewPipeAndPassRemote()
611 : mojo::NullRemote(),
612 has_video_ ? video_stream_->BindNewPipeAndPassRemote()
613 : mojo::NullRemote());
616 void StreamProvider::OnAudioStreamInitialized() {
617 audio_stream_initialized_ = true;
618 CompleteInitialize();
621 void StreamProvider::OnVideoStreamInitialized() {
622 video_stream_initialized_ = true;
623 CompleteInitialize();
626 void StreamProvider::CompleteInitialize() {
627 DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
629 // Haven't receive RpcAcquireRenderer message
630 if (!has_audio_ && !has_video_)
633 if ((has_audio_ && !audio_stream_initialized_) ||
634 (has_video_ && !video_stream_initialized_) || !init_done_callback_)
637 // |init_done_callback_| should be called on |media_task_runner_|.
638 std::move(init_done_callback_).Run(PIPELINE_OK);
641 std::vector<DemuxerStream*> StreamProvider::GetAllStreams() {
642 std::vector<DemuxerStream*> streams;
644 streams.push_back(audio_stream_.get());
646 streams.push_back(video_stream_.get());
650 } // namespace remoting
655 void default_delete<media::remoting::StreamProvider>::operator()(
656 media::remoting::StreamProvider* ptr) const {