Fix FullScreen crash in Webapp
[platform/framework/web/chromium-efl.git] / media / remoting / demuxer_stream_adapter.cc
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/remoting/demuxer_stream_adapter.h"
6
7 #include <utility>
8
9 #include "base/base64.h"
10 #include "base/functional/bind.h"
11 #include "base/functional/callback_helpers.h"
12 #include "base/task/bind_post_task.h"
13 #include "base/task/sequenced_task_runner.h"
14 #include "base/task/single_thread_task_runner.h"
15 #include "media/base/decoder_buffer.h"
16 #include "media/base/timestamp_constants.h"
17 #include "media/cast/openscreen/remoting_proto_enum_utils.h"
18 #include "media/cast/openscreen/remoting_proto_utils.h"
19 #include "media/mojo/common/media_type_converters.h"
20
21 // Convenience logging macro used throughout this file.
22 #define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: "
23
24 using openscreen::cast::RpcMessenger;
25
26 namespace media {
27 namespace remoting {
28
29 namespace {
30 // base::Bind* doesn't understand openscreen::WeakPtr, so we must manually
31 // check the RpcMessenger pointer before calling into it.
32 void RegisterForRpcTask(
33     openscreen::WeakPtr<openscreen::cast::RpcMessenger> rpc_messenger,
34     int rpc_handle,
35     openscreen::cast::RpcMessenger::ReceiveMessageCallback message_cb) {
36   if (rpc_messenger) {
37     rpc_messenger->RegisterMessageReceiverCallback(rpc_handle,
38                                                    std::move(message_cb));
39   }
40 }
41 void DeregisterFromRpcTask(
42     openscreen::WeakPtr<openscreen::cast::RpcMessenger> rpc_messenger,
43     int rpc_handle) {
44   if (rpc_messenger) {
45     rpc_messenger->UnregisterMessageReceiverCallback(rpc_handle);
46   }
47 }
48 }  // namespace
49
50 DemuxerStreamAdapter::DemuxerStreamAdapter(
51     scoped_refptr<base::SingleThreadTaskRunner> main_task_runner,
52     scoped_refptr<base::SequencedTaskRunner> media_task_runner,
53     const std::string& name,
54     DemuxerStream* demuxer_stream,
55     const openscreen::WeakPtr<RpcMessenger>& rpc_messenger,
56     int rpc_handle,
57     mojo::PendingRemote<mojom::RemotingDataStreamSender> stream_sender_remote,
58     mojo::ScopedDataPipeProducerHandle producer_handle,
59     ErrorCallback error_callback)
60     : main_task_runner_(std::move(main_task_runner)),
61       media_task_runner_(std::move(media_task_runner)),
62       name_(name),
63       rpc_messenger_(rpc_messenger),
64       rpc_handle_(rpc_handle),
65       demuxer_stream_(demuxer_stream),
66       type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN),
67       error_callback_(std::move(error_callback)),
68       remote_callback_handle_(RpcMessenger::kInvalidHandle),
69       read_until_callback_handle_(RpcMessenger::kInvalidHandle),
70       read_until_count_(0),
71       last_count_(0),
72       pending_flush_(false),
73       media_status_(DemuxerStream::kOk),
74       data_pipe_writer_(std::move(producer_handle)),
75       bytes_written_to_pipe_(0) {
76   DCHECK(main_task_runner_);
77   DCHECK(media_task_runner_);
78   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
79   DCHECK(demuxer_stream);
80   DCHECK(!error_callback_.is_null());
81
82   RegisterForRpcMessaging();
83
84   stream_sender_.Bind(std::move(stream_sender_remote));
85   stream_sender_.set_disconnect_handler(
86       base::BindOnce(&DemuxerStreamAdapter::OnFatalError,
87                      weak_factory_.GetWeakPtr(), MOJO_DISCONNECTED));
88 }
89
90 DemuxerStreamAdapter::~DemuxerStreamAdapter() {
91   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
92   DeregisterFromRpcMessaging();
93 }
94
95 int64_t DemuxerStreamAdapter::GetBytesWrittenAndReset() {
96   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
97   const int64_t current_count = bytes_written_to_pipe_;
98   bytes_written_to_pipe_ = 0;
99   return current_count;
100 }
101
102 absl::optional<uint32_t> DemuxerStreamAdapter::SignalFlush(bool flushing) {
103   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
104   DEMUXER_VLOG(2) << "flushing=" << flushing;
105
106   // Ignores if |pending_flush_| states is same.
107   if (pending_flush_ == flushing)
108     return absl::nullopt;
109
110   // Invalidates pending Read() tasks.
111   request_buffer_weak_factory_.InvalidateWeakPtrs();
112
113   // Cancels in flight data in browser process.
114   pending_flush_ = flushing;
115   if (flushing) {
116     stream_sender_->CancelInFlightData();
117   } else {
118     // Sets callback handle invalid to abort ongoing read request.
119     read_until_callback_handle_ = RpcMessenger::kInvalidHandle;
120   }
121   return last_count_;
122 }
123
124 void DemuxerStreamAdapter::OnReceivedRpc(
125     std::unique_ptr<openscreen::cast::RpcMessage> message) {
126   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
127   DCHECK(message);
128   DCHECK(rpc_handle_ == message->handle());
129
130   switch (message->proc()) {
131     case openscreen::cast::RpcMessage::RPC_DS_INITIALIZE:
132       Initialize(message->integer_value());
133       break;
134     case openscreen::cast::RpcMessage::RPC_DS_READUNTIL:
135       ReadUntil(std::move(message));
136       break;
137     case openscreen::cast::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER:
138       EnableBitstreamConverter();
139       break;
140     case openscreen::cast::RpcMessage::RPC_DS_ONERROR:
141       OnFatalError(UNEXPECTED_FAILURE);
142       break;
143     default:
144       DEMUXER_VLOG(1) << "Unknown RPC: " << message->proc();
145   }
146 }
147
148 void DemuxerStreamAdapter::Initialize(int remote_callback_handle) {
149   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
150   DCHECK(!pending_flush_);
151   DEMUXER_VLOG(2) << "Received RPC_DS_INITIALIZE with remote_callback_handle="
152                   << remote_callback_handle;
153
154   // Checks if initialization had been called or not.
155   if (remote_callback_handle_ != RpcMessenger::kInvalidHandle) {
156     DEMUXER_VLOG(1) << "Duplicated initialization. Have: "
157                     << remote_callback_handle_
158                     << ", Given: " << remote_callback_handle;
159     // Shuts down data pipe if available if providing different remote callback
160     // handle for initialization. Otherwise, just silently ignore the duplicated
161     // request.
162     if (remote_callback_handle_ != remote_callback_handle) {
163       OnFatalError(PEERS_OUT_OF_SYNC);
164     }
165     return;
166   }
167   remote_callback_handle_ = remote_callback_handle;
168
169   // Issues RPC_DS_INITIALIZE_CALLBACK RPC message.
170   std::unique_ptr<openscreen::cast::RpcMessage> rpc(
171       new openscreen::cast::RpcMessage());
172   rpc->set_handle(remote_callback_handle_);
173   rpc->set_proc(openscreen::cast::RpcMessage::RPC_DS_INITIALIZE_CALLBACK);
174   auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc();
175   init_cb_message->set_type(type_);
176   switch (type_) {
177     case DemuxerStream::Type::AUDIO: {
178       audio_config_ = demuxer_stream_->audio_decoder_config();
179       openscreen::cast::AudioDecoderConfig* audio_message =
180           init_cb_message->mutable_audio_decoder_config();
181       media::cast::ConvertAudioDecoderConfigToProto(audio_config_,
182                                                     audio_message);
183       break;
184     }
185     case DemuxerStream::Type::VIDEO: {
186       video_config_ = demuxer_stream_->video_decoder_config();
187       openscreen::cast::VideoDecoderConfig* video_message =
188           init_cb_message->mutable_video_decoder_config();
189       media::cast::ConvertVideoDecoderConfigToProto(video_config_,
190                                                     video_message);
191       break;
192     }
193     default:
194       NOTREACHED();
195   }
196
197   DEMUXER_VLOG(2) << "Sending RPC_DS_INITIALIZE_CALLBACK to " << rpc->handle()
198                   << " with decoder_config={"
199                   << (type_ == DemuxerStream::Type::AUDIO
200                           ? audio_config_.AsHumanReadableString()
201                           : video_config_.AsHumanReadableString())
202                   << '}';
203   main_task_runner_->PostTask(
204       FROM_HERE,
205       base::BindOnce(&RpcMessenger::SendMessageToRemote, rpc_messenger_, *rpc));
206 }
207
208 void DemuxerStreamAdapter::ReadUntil(
209     std::unique_ptr<openscreen::cast::RpcMessage> message) {
210   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
211   DCHECK(message);
212   if (!message->has_demuxerstream_readuntil_rpc()) {
213     DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC";
214     OnFatalError(RPC_INVALID);
215     return;
216   }
217
218   const openscreen::cast::DemuxerStreamReadUntil& rpc_message =
219       message->demuxerstream_readuntil_rpc();
220   DEMUXER_VLOG(2) << "Received RPC_DS_READUNTIL with callback_handle="
221                   << rpc_message.callback_handle()
222                   << ", count=" << rpc_message.count();
223
224   if (pending_flush_) {
225     DEMUXER_VLOG(2) << "Skip actions since it's in the flushing state";
226     return;
227   }
228
229   if (is_processing_read_request()) {
230     DEMUXER_VLOG(2) << "Ignore read request while it's in the reading state.";
231     return;
232   }
233
234   if (rpc_message.count() <= last_count_) {
235     DEMUXER_VLOG(1) << "Request count shouldn't be smaller than or equal to "
236                        "current frame count";
237     return;
238   }
239
240   read_until_count_ = rpc_message.count();
241   read_until_callback_handle_ = rpc_message.callback_handle();
242   RequestBuffer();
243 }
244
245 void DemuxerStreamAdapter::EnableBitstreamConverter() {
246   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
247   DEMUXER_VLOG(2) << "Received RPC_DS_ENABLEBITSTREAMCONVERTER";
248   bool is_command_sent = true;
249 #if BUILDFLAG(USE_PROPRIETARY_CODECS)
250   demuxer_stream_->EnableBitstreamConverter();
251 #else
252   is_command_sent = false;
253   DEMUXER_VLOG(1) << "Ignoring EnableBitstreamConverter() RPC: Proprietary "
254                      "codecs not enabled in this Chromium build.";
255 #endif
256
257   if (remote_callback_handle_ != RpcMessenger::kInvalidHandle) {
258     auto rpc = std::make_unique<openscreen::cast::RpcMessage>();
259     rpc->set_handle(remote_callback_handle_);
260     rpc->set_proc(
261         openscreen::cast::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER_CALLBACK);
262     rpc->set_boolean_value(is_command_sent);
263     main_task_runner_->PostTask(
264         FROM_HERE, base::BindOnce(&RpcMessenger::SendMessageToRemote,
265                                   rpc_messenger_, *rpc));
266   }
267 }
268
269 void DemuxerStreamAdapter::RequestBuffer() {
270   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
271   if (!is_processing_read_request() || pending_flush_) {
272     DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state";
273     return;
274   }
275   demuxer_stream_->Read(
276       1, base::BindOnce(&DemuxerStreamAdapter::OnNewBuffersRead,
277                         request_buffer_weak_factory_.GetWeakPtr()));
278 }
279
280 void DemuxerStreamAdapter::OnNewBuffersRead(
281     DemuxerStream::Status status,
282     DemuxerStream::DecoderBufferVector buffers_queue) {
283   DCHECK_LE(buffers_queue.size(), 1u)
284       << "DemuxerStreamAdapter only reads a single-buffer.";
285   OnNewBuffer(status,
286               buffers_queue.empty() ? nullptr : std::move(buffers_queue[0]));
287 }
288
289 void DemuxerStreamAdapter::OnNewBuffer(DemuxerStream::Status status,
290                                        scoped_refptr<DecoderBuffer> input) {
291   DEMUXER_VLOG(3) << "status=" << status;
292   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
293   if (!is_processing_read_request() || pending_flush_) {
294     DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state";
295     return;
296   }
297
298   switch (status) {
299     case DemuxerStream::kAborted:
300       DCHECK(!input);
301       SendReadAck();
302       return;
303     case DemuxerStream::kError:
304       // Currently kError can only happen because of DECRYPTION_ERROR.
305       OnFatalError(DECRYPTION_ERROR);
306       return;
307     case DemuxerStream::kConfigChanged:
308       // TODO(erickung): Notify controller of new decoder config, just in case
309       // that will require remoting to be shutdown (due to known
310       // lack-of-support).
311       // Stores available audio/video decoder config and issues
312       // RPC_DS_READUNTIL_CALLBACK RPC to notify receiver.
313       DCHECK(!input);
314       media_status_ = status;
315       if (demuxer_stream_->type() == DemuxerStream::VIDEO)
316         video_config_ = demuxer_stream_->video_decoder_config();
317       if (demuxer_stream_->type() == DemuxerStream::AUDIO)
318         audio_config_ = demuxer_stream_->audio_decoder_config();
319       SendReadAck();
320       return;
321     case DemuxerStream::kOk: {
322       media_status_ = status;
323       DCHECK(!pending_frame_);
324       if (!data_pipe_writer_.IsPipeValid())
325         return;  // Do not start sending (due to previous fatal error).
326       pending_frame_ = std::move(input);
327       WriteFrame();
328     } break;
329   }
330 }
331
332 void DemuxerStreamAdapter::WriteFrame() {
333   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
334   DCHECK(!pending_flush_);
335   DCHECK(is_processing_read_request());
336   DCHECK(pending_frame_);
337
338   if (!stream_sender_ || !data_pipe_writer_.IsPipeValid()) {
339     DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid";
340     return;
341   }
342
343   // Unretained is safe because `this` owns the mojo::Remote.
344   stream_sender_->SendFrame(
345       media::mojom::DecoderBuffer::From(*pending_frame_),
346       base::BindOnce(&DemuxerStreamAdapter::OnWrittenFrameRead,
347                      base::Unretained(this)));
348
349   if (!pending_frame_->end_of_stream()) {
350     data_pipe_writer_.Write(
351         pending_frame_->data(), pending_frame_->data_size(),
352         base::BindOnce(&DemuxerStreamAdapter::OnFrameWritten,
353                        base::Unretained(this)));
354   } else {
355     DemuxerStreamAdapter::OnFrameWritten(true);
356   }
357 }
358
359 void DemuxerStreamAdapter::OnFrameWritten(bool success) {
360   if (!success) {
361     OnFatalError(DATA_PIPE_WRITE_ERROR);
362     return;
363   }
364
365   was_pending_frame_written_ = true;
366   TryCompleteFrameWrite();
367 }
368
369 void DemuxerStreamAdapter::OnWrittenFrameRead() {
370   was_pending_frame_read_ = true;
371   TryCompleteFrameWrite();
372 }
373
374 void DemuxerStreamAdapter::TryCompleteFrameWrite() {
375   if (!was_pending_frame_written_ || !was_pending_frame_read_) {
376     return;
377   }
378
379   // Resets frame buffer variables.
380   const bool pending_frame_is_eos = pending_frame_->end_of_stream();
381   if (!pending_frame_is_eos) {
382     bytes_written_to_pipe_ += pending_frame_->data_size();
383   }
384
385   ++last_count_;
386   ResetPendingFrame();
387
388   // Checks if it needs to send RPC_DS_READUNTIL_CALLBACK RPC message.
389   if (read_until_count_ == last_count_ || pending_frame_is_eos) {
390     SendReadAck();
391     return;
392   }
393
394   // Contiune to read decoder buffer until reaching |read_until_count_| or
395   // end of stream.
396   media_task_runner_->PostTask(
397       FROM_HERE, base::BindOnce(&DemuxerStreamAdapter::RequestBuffer,
398                                 weak_factory_.GetWeakPtr()));
399 }
400
401 void DemuxerStreamAdapter::SendReadAck() {
402   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
403   DEMUXER_VLOG(3) << "last_count_=" << last_count_
404                   << ", remote_read_callback_handle="
405                   << read_until_callback_handle_
406                   << ", media_status=" << media_status_;
407   // Issues RPC_DS_READUNTIL_CALLBACK RPC message.
408   std::unique_ptr<openscreen::cast::RpcMessage> rpc(
409       new openscreen::cast::RpcMessage());
410   rpc->set_handle(read_until_callback_handle_);
411   rpc->set_proc(openscreen::cast::RpcMessage::RPC_DS_READUNTIL_CALLBACK);
412   auto* message = rpc->mutable_demuxerstream_readuntilcb_rpc();
413   message->set_count(last_count_);
414   message->set_status(
415       media::cast::ToProtoDemuxerStreamStatus(media_status_).value());
416   if (media_status_ == DemuxerStream::kConfigChanged) {
417     if (audio_config_.IsValidConfig()) {
418       openscreen::cast::AudioDecoderConfig* audio_message =
419           message->mutable_audio_decoder_config();
420       media::cast::ConvertAudioDecoderConfigToProto(audio_config_,
421                                                     audio_message);
422     } else if (video_config_.IsValidConfig()) {
423       openscreen::cast::VideoDecoderConfig* video_message =
424           message->mutable_video_decoder_config();
425       media::cast::ConvertVideoDecoderConfigToProto(video_config_,
426                                                     video_message);
427     } else {
428       NOTREACHED();
429     }
430   }
431
432   DEMUXER_VLOG(2) << "Sending RPC_DS_READUNTIL_CALLBACK to " << rpc->handle()
433                   << " with count=" << message->count()
434                   << ", status=" << message->status() << ", decoder_config={"
435                   << (audio_config_.IsValidConfig()
436                           ? audio_config_.AsHumanReadableString()
437                           : video_config_.IsValidConfig()
438                                 ? video_config_.AsHumanReadableString()
439                                 : "DID NOT CHANGE")
440                   << '}';
441   main_task_runner_->PostTask(
442       FROM_HERE,
443       base::BindOnce(&RpcMessenger::SendMessageToRemote, rpc_messenger_, *rpc));
444   // Resets callback handle after completing the reading request.
445   read_until_callback_handle_ = RpcMessenger::kInvalidHandle;
446
447   // Resets audio/video decoder config since it only sends once.
448   if (audio_config_.IsValidConfig())
449     audio_config_ = AudioDecoderConfig();
450   if (video_config_.IsValidConfig())
451     video_config_ = VideoDecoderConfig();
452 }
453
454 void DemuxerStreamAdapter::ResetPendingFrame() {
455   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
456   pending_frame_.reset();
457   was_pending_frame_read_ = false;
458   was_pending_frame_written_ = false;
459 }
460
461 void DemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) {
462   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
463
464   DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger;
465
466   if (error_callback_.is_null())
467     return;
468
469   data_pipe_writer_.Close();
470
471   std::move(error_callback_).Run(stop_trigger);
472 }
473
474 void DemuxerStreamAdapter::RegisterForRpcMessaging() {
475   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
476   auto receive_callback =
477       base::BindPostTaskToCurrentDefault(base::BindRepeating(
478           &DemuxerStreamAdapter::OnReceivedRpc, weak_factory_.GetWeakPtr()));
479   main_task_runner_->PostTask(
480       FROM_HERE,
481       base::BindOnce(
482           &RegisterForRpcTask, rpc_messenger_, rpc_handle_,
483           [cb = std::move(receive_callback)](
484               std::unique_ptr<openscreen::cast::RpcMessage> message) {
485             cb.Run(std::move(message));
486           }));
487 }
488
489 void DemuxerStreamAdapter::DeregisterFromRpcMessaging() {
490   DCHECK(media_task_runner_->RunsTasksInCurrentSequence());
491   if (rpc_messenger_) {
492     main_task_runner_->PostTask(
493         FROM_HERE,
494         base::BindOnce(&DeregisterFromRpcTask, rpc_messenger_, rpc_handle_));
495   }
496 }
497
498 }  // namespace remoting
499 }  // namespace media