Fix FullScreen crash in Webapp
[platform/framework/web/chromium-efl.git] / media / remoting / stream_provider.cc
1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/remoting/stream_provider.h"
6 #include <vector>
7
8 #include "base/containers/circular_deque.h"
9 #include "base/functional/bind.h"
10 #include "base/functional/callback.h"
11 #include "base/functional/callback_helpers.h"
12 #include "base/logging.h"
13 #include "base/task/bind_post_task.h"
14 #include "base/task/sequenced_task_runner.h"
15 #include "base/task/single_thread_task_runner.h"
16 #include "media/base/decoder_buffer.h"
17 #include "media/base/demuxer.h"
18 #include "media/base/video_transformation.h"
19 #include "media/cast/openscreen/remoting_proto_enum_utils.h"
20 #include "media/cast/openscreen/remoting_proto_utils.h"
21 #include "media/mojo/common/mojo_decoder_buffer_converter.h"
22 #include "media/remoting/receiver_controller.h"
23 #include "third_party/openscreen/src/cast/streaming/rpc_messenger.h"
24
25 using openscreen::cast::RpcMessenger;
26
27 namespace media {
28 namespace remoting {
29
namespace {

// How many additional frames each RPC_DS_READUNTIL request asks the remote
// side for, on top of the frames already received.
constexpr int kNumFramesInEachReadUntil = 10;

}  // namespace
34
35 // static
36 void StreamProvider::MediaStream::CreateOnMainThread(
37     RpcMessenger* rpc_messenger,
38     Type type,
39     int32_t handle,
40     const scoped_refptr<base::SequencedTaskRunner>& media_task_runner,
41     base::OnceCallback<void(MediaStream::UniquePtr)> callback) {
42   MediaStream::UniquePtr stream(
43       new MediaStream(rpc_messenger, type, handle, media_task_runner),
44       &DestructionHelper);
45   std::move(callback).Run(std::move(stream));
46 }
47
48 // static
49 void StreamProvider::MediaStream::DestructionHelper(MediaStream* stream) {
50   stream->Destroy();
51 }
52
53 StreamProvider::MediaStream::MediaStream(
54     RpcMessenger* rpc_messenger,
55     Type type,
56     int remote_handle,
57     const scoped_refptr<base::SequencedTaskRunner>& media_task_runner)
58     : main_task_runner_(base::SingleThreadTaskRunner::GetCurrentDefault()),
59       media_task_runner_(media_task_runner),
60       rpc_messenger_(rpc_messenger),
61       type_(type),
62       remote_handle_(remote_handle),
63       rpc_handle_(rpc_messenger_->GetUniqueHandle()) {
64   DCHECK(remote_handle_ != RpcMessenger::kInvalidHandle);
65
66   media_weak_this_ = media_weak_factory_.GetWeakPtr();
67
68   auto receive_callback = base::BindPostTask(
69       media_task_runner_,
70       BindRepeating(&MediaStream::OnReceivedRpc, media_weak_this_));
71   rpc_messenger_->RegisterMessageReceiverCallback(
72       rpc_handle_, [cb = std::move(receive_callback)](
73                        std::unique_ptr<openscreen::cast::RpcMessage> message) {
74         cb.Run(std::move(message));
75       });
76 }
77
78 StreamProvider::MediaStream::~MediaStream() {
79   DCHECK(main_task_runner_->BelongsToCurrentThread());
80   rpc_messenger_->UnregisterMessageReceiverCallback(rpc_handle_);
81 }
82
83 void StreamProvider::MediaStream::Destroy() {
84   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
85
86   // Invalid weak pointers to prevent |this| from receiving RPC calls on the
87   // media thread.
88   media_weak_factory_.InvalidateWeakPtrs();
89
90   // Unbind all mojo pipes and bindings.
91   receiver_.reset();
92   decoder_buffer_reader_.reset();
93
94   // After invalidating all weak ptrs of |media_weak_factory_|, MediaStream
95   // won't be access anymore, so using |this| here is safe.
96   main_task_runner_->DeleteSoon(FROM_HERE, this);
97 }
98
99 void StreamProvider::MediaStream::SendRpcMessageOnMainThread(
100     std::unique_ptr<openscreen::cast::RpcMessage> message) {
101   // |rpc_messenger_| is owned by |receiver_controller_| which is a singleton
102   // per process, so it's safe to use Unretained() here.
103   main_task_runner_->PostTask(
104       FROM_HERE, base::BindOnce(&RpcMessenger::SendMessageToRemote,
105                                 base::Unretained(rpc_messenger_), *message));
106 }
107
108 void StreamProvider::MediaStream::Initialize(
109     base::OnceClosure init_done_callback) {
110   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
111   DCHECK(init_done_callback);
112
113   if (init_done_callback_) {
114     OnError("Duplicate initialization");
115     return;
116   }
117
118   init_done_callback_ = std::move(init_done_callback);
119
120   auto rpc = std::make_unique<openscreen::cast::RpcMessage>();
121   rpc->set_handle(remote_handle_);
122   rpc->set_proc(openscreen::cast::RpcMessage::RPC_DS_INITIALIZE);
123   rpc->set_integer_value(rpc_handle_);
124   SendRpcMessageOnMainThread(std::move(rpc));
125 }
126
127 void StreamProvider::MediaStream::InitializeDataPipe(
128     mojo::ScopedDataPipeConsumerHandle data_pipe) {
129   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
130
131   decoder_buffer_reader_ =
132       std::make_unique<MojoDecoderBufferReader>(std::move(data_pipe));
133   CompleteInitialize();
134 }
135
136 void StreamProvider::MediaStream::ReceiveFrame(uint32_t count,
137                                                mojom::DecoderBufferPtr buffer) {
138   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
139   DCHECK(decoder_buffer_reader_);
140
141   auto callback = base::BindPostTaskToCurrentDefault(
142       base::BindOnce(&MediaStream::AppendBuffer, media_weak_this_, count));
143   decoder_buffer_reader_->ReadDecoderBuffer(std::move(buffer),
144                                             std::move(callback));
145 }
146
147 void StreamProvider::MediaStream::FlushUntil(uint32_t count) {
148   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
149
150   if (count < current_frame_count_)
151     return;
152
153   uint32_t buffers_to_erase = count - current_frame_count_;
154
155   if (buffers_to_erase > buffers_.size()) {
156     buffers_.clear();
157   } else {
158     buffers_.erase(buffers_.begin(), buffers_.begin() + buffers_to_erase);
159   }
160
161   current_frame_count_ = count;
162
163   if (!read_complete_callback_.is_null())
164     CompleteRead(DemuxerStream::kAborted);
165
166   read_until_sent_ = false;
167 }
168
169 void StreamProvider::MediaStream::OnReceivedRpc(
170     std::unique_ptr<openscreen::cast::RpcMessage> message) {
171   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
172   DCHECK(message->handle() == rpc_handle_);
173
174   switch (message->proc()) {
175     case openscreen::cast::RpcMessage::RPC_DS_INITIALIZE_CALLBACK:
176       OnInitializeCallback(std::move(message));
177       break;
178     case openscreen::cast::RpcMessage::RPC_DS_READUNTIL_CALLBACK:
179       OnReadUntilCallback(std::move(message));
180       break;
181     default:
182       VLOG(3) << __func__ << "Unknow RPC message.";
183   }
184 }
185
186 void StreamProvider::MediaStream::OnInitializeCallback(
187     std::unique_ptr<openscreen::cast::RpcMessage> message) {
188   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
189   const openscreen::cast::DemuxerStreamInitializeCallback callback_message =
190       message->demuxerstream_initializecb_rpc();
191   if (callback_message.type() != type_) {
192     OnError("Wrong type");
193     return;
194   }
195
196   if ((type_ == DemuxerStream::AUDIO &&
197        audio_decoder_config_.IsValidConfig()) ||
198       (type_ == DemuxerStream::VIDEO &&
199        video_decoder_config_.IsValidConfig())) {
200     OnError("Duplicate initialization");
201     return;
202   }
203
204   if (type_ == DemuxerStream::AUDIO &&
205       callback_message.has_audio_decoder_config()) {
206     const openscreen::cast::AudioDecoderConfig audio_message =
207         callback_message.audio_decoder_config();
208     media::cast::ConvertProtoToAudioDecoderConfig(audio_message,
209                                                   &audio_decoder_config_);
210     if (!audio_decoder_config_.IsValidConfig()) {
211       OnError("Invalid audio config");
212       return;
213     }
214   } else if (type_ == DemuxerStream::VIDEO &&
215              callback_message.has_video_decoder_config()) {
216     const openscreen::cast::VideoDecoderConfig video_message =
217         callback_message.video_decoder_config();
218     media::cast::ConvertProtoToVideoDecoderConfig(video_message,
219                                                   &video_decoder_config_);
220     if (!video_decoder_config_.IsValidConfig()) {
221       OnError("Invalid video config");
222       return;
223     }
224   } else {
225     OnError("Config missing");
226     return;
227   }
228
229   rpc_initialized_ = true;
230   CompleteInitialize();
231 }
232
233 void StreamProvider::MediaStream::CompleteInitialize() {
234   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
235
236   // Initialization finished when received RPC_DS_INITIALIZE_CALLBACK and
237   // |decoder_buffer_reader_| is created.
238   if (!rpc_initialized_ || !decoder_buffer_reader_)
239     return;
240
241   if (!init_done_callback_) {
242     OnError("Initialize callback missing");
243     return;
244   }
245
246   std::move(init_done_callback_).Run();
247 }
248
249 void StreamProvider::MediaStream::OnReadUntilCallback(
250     std::unique_ptr<openscreen::cast::RpcMessage> message) {
251   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
252
253   if (!read_until_sent_) {
254     OnError("Unexpected ReadUntilCallback");
255     return;
256   }
257   read_until_sent_ = false;
258   const openscreen::cast::DemuxerStreamReadUntilCallback callback_message =
259       message->demuxerstream_readuntilcb_rpc();
260   total_received_frame_count_ = callback_message.count();
261
262   if (media::cast::ToDemuxerStreamStatus(callback_message.status()) ==
263       kConfigChanged) {
264     if (callback_message.has_audio_decoder_config()) {
265       const openscreen::cast::AudioDecoderConfig audio_message =
266           callback_message.audio_decoder_config();
267       UpdateAudioConfig(audio_message);
268     }
269
270     if (callback_message.has_video_decoder_config()) {
271       const openscreen::cast::VideoDecoderConfig video_message =
272           callback_message.video_decoder_config();
273       UpdateVideoConfig(video_message);
274     }
275
276     if (buffers_.empty() && !read_complete_callback_.is_null())
277       CompleteRead(DemuxerStream::kConfigChanged);
278
279     return;
280   }
281
282   if (buffers_.empty() && !read_complete_callback_.is_null())
283     SendReadUntil();
284 }
285
286 void StreamProvider::MediaStream::UpdateAudioConfig(
287     const openscreen::cast::AudioDecoderConfig& audio_message) {
288   DCHECK(type_ == AUDIO);
289   AudioDecoderConfig audio_config;
290   media::cast::ConvertProtoToAudioDecoderConfig(audio_message, &audio_config);
291   if (!audio_config.IsValidConfig()) {
292     OnError("Invalid audio config");
293     return;
294   }
295   next_audio_decoder_config_ = audio_config;
296 }
297
298 void StreamProvider::MediaStream::UpdateVideoConfig(
299     const openscreen::cast::VideoDecoderConfig& video_message) {
300   DCHECK(type_ == VIDEO);
301   VideoDecoderConfig video_config;
302   media::cast::ConvertProtoToVideoDecoderConfig(video_message, &video_config);
303   if (!video_config.IsValidConfig()) {
304     OnError("Invalid video config");
305     return;
306   }
307   next_video_decoder_config_ = video_config;
308 }
309
310 void StreamProvider::MediaStream::SendReadUntil() {
311   if (read_until_sent_)
312     return;
313
314   std::unique_ptr<openscreen::cast::RpcMessage> rpc(
315       new openscreen::cast::RpcMessage());
316   rpc->set_handle(remote_handle_);
317   rpc->set_proc(openscreen::cast::RpcMessage::RPC_DS_READUNTIL);
318   auto* message = rpc->mutable_demuxerstream_readuntil_rpc();
319   message->set_count(total_received_frame_count_ + kNumFramesInEachReadUntil);
320   message->set_callback_handle(rpc_handle_);
321   SendRpcMessageOnMainThread(std::move(rpc));
322   read_until_sent_ = true;
323 }
324
325 // Only return one buffer at a time so we ignore the count.
326 void StreamProvider::MediaStream::Read(uint32_t /*count*/, ReadCB read_cb) {
327   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
328   DCHECK(read_complete_callback_.is_null());
329   DCHECK(read_cb);
330
331   read_complete_callback_ = std::move(read_cb);
332   if (buffers_.empty() && (next_audio_decoder_config_.IsValidConfig() ||
333                            next_video_decoder_config_.IsValidConfig())) {
334     CompleteRead(DemuxerStream::kConfigChanged);
335     return;
336   }
337
338   // Wait for more data.
339   if (buffers_.empty()) {
340     SendReadUntil();
341     return;
342   }
343
344   CompleteRead(DemuxerStream::kOk);
345 }
346
347 void StreamProvider::MediaStream::CompleteRead(DemuxerStream::Status status) {
348   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
349
350   switch (status) {
351     case DemuxerStream::kConfigChanged:
352       if (next_audio_decoder_config_.IsValidConfig()) {
353         audio_decoder_config_ = next_audio_decoder_config_;
354         next_audio_decoder_config_ = media::AudioDecoderConfig();
355       }
356       if (next_video_decoder_config_.IsValidConfig()) {
357         video_decoder_config_ = next_video_decoder_config_;
358         next_video_decoder_config_ = media::VideoDecoderConfig();
359       }
360       std::move(read_complete_callback_).Run(status, {});
361       return;
362     case DemuxerStream::kAborted:
363     case DemuxerStream::kError:
364       std::move(read_complete_callback_).Run(status, {});
365       return;
366     case DemuxerStream::kOk:
367       DCHECK(read_complete_callback_);
368       DCHECK(!buffers_.empty());
369       DCHECK_LT(current_frame_count_, buffered_frame_count_);
370       scoped_refptr<DecoderBuffer> frame_data = buffers_.front();
371       buffers_.pop_front();
372       ++current_frame_count_;
373       std::move(read_complete_callback_).Run(status, {frame_data});
374       return;
375   }
376 }
377
378 AudioDecoderConfig StreamProvider::MediaStream::audio_decoder_config() {
379   DCHECK(type_ == DemuxerStream::AUDIO);
380   return audio_decoder_config_;
381 }
382
383 VideoDecoderConfig StreamProvider::MediaStream::video_decoder_config() {
384   DCHECK(type_ == DemuxerStream::VIDEO);
385   return video_decoder_config_;
386 }
387
388 DemuxerStream::Type StreamProvider::MediaStream::type() const {
389   return type_;
390 }
391
392 StreamLiveness StreamProvider::MediaStream::liveness() const {
393   return StreamLiveness::kLive;
394 }
395
396 bool StreamProvider::MediaStream::SupportsConfigChanges() {
397   return true;
398 }
399
400 void StreamProvider::MediaStream::AppendBuffer(
401     uint32_t count,
402     scoped_refptr<DecoderBuffer> buffer) {
403   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
404
405   // Drop flushed frame.
406   if (count < current_frame_count_)
407     return;
408
409   // Continuity check.
410   DCHECK(buffers_.empty() || buffered_frame_count_ == count);
411
412   buffers_.push_back(buffer);
413   buffered_frame_count_ = count + 1;
414
415   if (!read_complete_callback_.is_null())
416     CompleteRead(DemuxerStream::kOk);
417 }
418
419 void StreamProvider::MediaStream::OnError(const std::string& error) {
420   auto rpc = std::make_unique<openscreen::cast::RpcMessage>();
421   rpc->set_handle(remote_handle_);
422   rpc->set_proc(openscreen::cast::RpcMessage::RPC_DS_ONERROR);
423   SendRpcMessageOnMainThread(std::move(rpc));
424 }
425
426 StreamProvider::StreamProvider(
427     ReceiverController* receiver_controller,
428     const scoped_refptr<base::SequencedTaskRunner>& media_task_runner)
429     : main_task_runner_(base::SingleThreadTaskRunner::GetCurrentDefault()),
430       media_task_runner_(media_task_runner),
431       receiver_controller_(receiver_controller),
432       rpc_messenger_(receiver_controller_->rpc_messenger()) {
433   DCHECK(receiver_controller_);
434   DCHECK(rpc_messenger_);
435
436   media_weak_this_ = media_weak_factory_.GetWeakPtr();
437
438   auto callback = base::BindPostTask(
439       media_task_runner_,
440       base::BindRepeating(&StreamProvider::OnReceivedRpc, media_weak_this_));
441   rpc_messenger_->RegisterMessageReceiverCallback(
442       RpcMessenger::kAcquireDemuxerHandle,
443       [cb = std::move(callback)](
444           std::unique_ptr<openscreen::cast::RpcMessage> message) {
445         cb.Run(std::move(message));
446       });
447 }
448
449 StreamProvider::~StreamProvider() {
450   DCHECK(main_task_runner_->BelongsToCurrentThread());
451   rpc_messenger_->UnregisterMessageReceiverCallback(
452       RpcMessenger::kAcquireDemuxerHandle);
453 }
454
455 std::string StreamProvider::GetDisplayName() const {
456   return "media::remoting::StreamProvider";
457 }
458
459 DemuxerType StreamProvider::GetDemuxerType() const {
460   return DemuxerType::kStreamProviderDemuxer;
461 }
462
463 void StreamProvider::Initialize(DemuxerHost* host,
464                                 PipelineStatusCallback status_cb) {
465   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
466   init_done_callback_ = std::move(status_cb);
467   CompleteInitialize();
468 }
469
470 void StreamProvider::AbortPendingReads() {}
471
472 void StreamProvider::StartWaitingForSeek(base::TimeDelta seek_time) {}
473
474 void StreamProvider::CancelPendingSeek(base::TimeDelta seek_time) {}
475
476 void StreamProvider::Seek(base::TimeDelta time,
477                           PipelineStatusCallback seek_cb) {
478   media_task_runner_->PostTask(FROM_HERE,
479                                base::BindOnce(std::move(seek_cb), PIPELINE_OK));
480 }
481
482 bool StreamProvider::IsSeekable() const {
483   return false;
484 }
485
486 void StreamProvider::Stop() {}
487
488 base::TimeDelta StreamProvider::GetStartTime() const {
489   return base::TimeDelta();
490 }
491
492 base::Time StreamProvider::GetTimelineOffset() const {
493   return base::Time();
494 }
495
496 int64_t StreamProvider::GetMemoryUsage() const {
497   return 0;
498 }
499
500 absl::optional<container_names::MediaContainerName>
501 StreamProvider::GetContainerForMetrics() const {
502   return absl::optional<container_names::MediaContainerName>();
503 }
504
505 void StreamProvider::OnEnabledAudioTracksChanged(
506     const std::vector<MediaTrack::Id>& track_ids,
507     base::TimeDelta curr_time,
508     TrackChangeCB change_completed_cb) {
509   std::vector<DemuxerStream*> streams;
510   std::move(change_completed_cb).Run(DemuxerStream::AUDIO, streams);
511   DVLOG(1) << "Track changes are not supported.";
512 }
513
514 void StreamProvider::OnSelectedVideoTrackChanged(
515     const std::vector<MediaTrack::Id>& track_ids,
516     base::TimeDelta curr_time,
517     TrackChangeCB change_completed_cb) {
518   std::vector<DemuxerStream*> streams;
519   std::move(change_completed_cb).Run(DemuxerStream::VIDEO, streams);
520   DVLOG(1) << "Track changes are not supported.";
521 }
522
523 void StreamProvider::Destroy() {
524   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
525
526   if (init_done_callback_)
527     std::move(init_done_callback_).Run(PIPELINE_ERROR_ABORT);
528
529   // Invalid weak pointers to prevent |this| from receiving RPC calls on the
530   // media thread.
531   media_weak_factory_.InvalidateWeakPtrs();
532
533   audio_stream_.reset();
534   video_stream_.reset();
535
536   // After invalidating all weak ptrs of |media_weak_factory_|, StreamProvider
537   // won't be access anymore, so using |this| here is safe.
538   main_task_runner_->DeleteSoon(FROM_HERE, this);
539 }
540
541 void StreamProvider::OnReceivedRpc(
542     std::unique_ptr<openscreen::cast::RpcMessage> message) {
543   switch (message->proc()) {
544     case openscreen::cast::RpcMessage::RPC_ACQUIRE_DEMUXER:
545       OnAcquireDemuxer(std::move(message));
546       break;
547     default:
548       VLOG(3) << __func__ << "Unknown RPC message.";
549   }
550 }
551
552 void StreamProvider::OnAcquireDemuxer(
553     std::unique_ptr<openscreen::cast::RpcMessage> message) {
554   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
555   DCHECK(message->has_acquire_demuxer_rpc());
556
557   int32_t audio_demuxer_handle =
558       message->acquire_demuxer_rpc().audio_demuxer_handle();
559   int32_t video_demuxer_handle =
560       message->acquire_demuxer_rpc().video_demuxer_handle();
561   has_audio_ = audio_demuxer_handle != RpcMessenger::kInvalidHandle;
562   has_video_ = video_demuxer_handle != RpcMessenger::kInvalidHandle;
563
564   DCHECK(has_audio_ || has_video_);
565
566   if (has_audio_) {
567     auto callback = base::BindPostTaskToCurrentDefault(base::BindOnce(
568         &StreamProvider::OnAudioStreamCreated, media_weak_this_));
569     main_task_runner_->PostTask(
570         FROM_HERE,
571         base::BindOnce(&MediaStream::CreateOnMainThread, rpc_messenger_,
572                        DemuxerStream::AUDIO, audio_demuxer_handle,
573                        media_task_runner_, std::move(callback)));
574   }
575
576   if (has_video_) {
577     auto callback = base::BindPostTaskToCurrentDefault(base::BindOnce(
578         &StreamProvider::OnVideoStreamCreated, media_weak_this_));
579     main_task_runner_->PostTask(
580         FROM_HERE,
581         base::BindOnce(&MediaStream::CreateOnMainThread, rpc_messenger_,
582                        DemuxerStream::VIDEO, video_demuxer_handle,
583                        media_task_runner_, std::move(callback)));
584   }
585 }
586
587 void StreamProvider::OnAudioStreamCreated(MediaStream::UniquePtr stream) {
588   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
589   audio_stream_ = std::move(stream);
590   audio_stream_->Initialize(base::BindOnce(
591       &StreamProvider::OnAudioStreamInitialized, media_weak_this_));
592   InitializeDataPipe();
593 }
594
595 void StreamProvider::OnVideoStreamCreated(MediaStream::UniquePtr stream) {
596   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
597   video_stream_ = std::move(stream);
598   video_stream_->Initialize(base::BindOnce(
599       &StreamProvider::OnVideoStreamInitialized, media_weak_this_));
600   InitializeDataPipe();
601 }
602
603 void StreamProvider::InitializeDataPipe() {
604   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
605
606   if ((has_audio_ && !audio_stream_) || (has_video_ && !video_stream_))
607     return;
608
609   receiver_controller_->StartDataStreams(
610       has_audio_ ? audio_stream_->BindNewPipeAndPassRemote()
611                  : mojo::NullRemote(),
612       has_video_ ? video_stream_->BindNewPipeAndPassRemote()
613                  : mojo::NullRemote());
614 }
615
616 void StreamProvider::OnAudioStreamInitialized() {
617   audio_stream_initialized_ = true;
618   CompleteInitialize();
619 }
620
621 void StreamProvider::OnVideoStreamInitialized() {
622   video_stream_initialized_ = true;
623   CompleteInitialize();
624 }
625
626 void StreamProvider::CompleteInitialize() {
627   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
628
629   // Haven't receive RpcAcquireRenderer message
630   if (!has_audio_ && !has_video_)
631     return;
632
633   if ((has_audio_ && !audio_stream_initialized_) ||
634       (has_video_ && !video_stream_initialized_) || !init_done_callback_)
635     return;
636
637   // |init_done_callback_| should be called on |media_task_runner_|.
638   std::move(init_done_callback_).Run(PIPELINE_OK);
639 }
640
641 std::vector<DemuxerStream*> StreamProvider::GetAllStreams() {
642   std::vector<DemuxerStream*> streams;
643   if (audio_stream_)
644     streams.push_back(audio_stream_.get());
645   if (video_stream_)
646     streams.push_back(video_stream_.get());
647   return streams;
648 }
649
650 }  // namespace remoting
651 }  // namespace media
652
653 namespace std {
654
655 void default_delete<media::remoting::StreamProvider>::operator()(
656     media::remoting::StreamProvider* ptr) const {
657   ptr->Destroy();
658 }
659
660 }  // namespace std