client_->OnLoadDeferredUntilSourceOpen();
const auto emss_pipeline_mode = client_->GetEmssPipelineMode();
auto emss_demuxer_dispatcher = EmssDemuxerDispatcher::Create(
- emss_pipeline_mode, base::SingleThreadTaskRunner::GetCurrentDefault());
+ emss_pipeline_mode, base::SingleThreadTaskRunner::GetCurrentDefault(),
+ media_task_runner_);
auto [blink_source_dispatcher, any_thread_source_impl,
control_thread_source_impl, worker_thread_source_impl] =
WebElementaryMediaStreamSourceDispatcher::Create(
std::shared_ptr<DemuxerDispatcher> DemuxerDispatcher::Create(
PipelineMode pipeline_mode,
const scoped_refptr<base::SingleThreadTaskRunner>&
- control_thread_task_runner) {
+ control_thread_task_runner,
+ scoped_refptr<base::SingleThreadTaskRunner> worker_task_runner) {
EMSS_DEBUG_NO_INSTANCE();
+ if (!worker_task_runner) {
+ worker_task_runner = base::ThreadPool::CreateSingleThreadTaskRunner(
+ {base::TaskPriority::USER_BLOCKING,
+ base::TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN},
+ base::SingleThreadTaskRunnerThreadMode::DEDICATED);
+ }
+
auto [demuxer_dispatcher, maybe_pending_data_stream_src] = ([&]() {
if (pipeline_mode.demuxer_mode == DemuxerMode::kMediaStream) {
- return std::make_tuple(std::make_shared<DemuxerDispatcher>(
- std::nullopt, control_thread_task_runner),
- worker_thread::PendingDataStreamSrcInterfaces{});
+ return std::make_tuple(
+ std::make_shared<DemuxerDispatcher>(
+ std::nullopt, control_thread_task_runner, worker_task_runner),
+ worker_thread::PendingDataStreamSrcInterfaces{});
}
auto [pending_data_stream_src, pending_data_stream_sink] =
CreateDataStream();
auto dispatcher = std::make_shared<DemuxerDispatcher>(
- std::move(pending_data_stream_sink), control_thread_task_runner);
+ std::move(pending_data_stream_sink), control_thread_task_runner,
+ worker_task_runner);
return std::make_tuple(std::move(dispatcher),
std::move(pending_data_stream_src));
DispatchTask(&WorkerLifecycleObserver::OnDispatcherDisconnected);
}
-DemuxerDispatcher::DemuxerDispatcher(
- std::optional<PendingDataStreamSinkInterfaces> pending_data_stream_sink,
- const scoped_refptr<base::SingleThreadTaskRunner>&
- control_thread_task_runner)
- : DemuxerDispatcher(
- std::move(pending_data_stream_sink),
- control_thread_task_runner,
- base::ThreadPool::CreateSingleThreadTaskRunner(
- {base::TaskPriority::USER_BLOCKING,
- base::TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN},
- base::SingleThreadTaskRunnerThreadMode::DEDICATED)) {}
-
DemuxerDispatcher::DemuxerDispatcher(
std::optional<PendingDataStreamSinkInterfaces> pending_data_stream_sink,
const scoped_refptr<base::SingleThreadTaskRunner>&
using WorkerThreadDemuxerAdapterClient =
worker_thread::PlatformDemuxerAdapterClient;
+ // * control_task_runner is the task runner that controls playback and reports
+ // to the app (typically the main JS thread runner).
+ // * worker_task_runner is the task runner that's used to process packets (if
+ // unset a new task runner will be created).
static std::shared_ptr<DemuxerDispatcher> Create(
PipelineMode,
- const scoped_refptr<base::SingleThreadTaskRunner>&);
+ const scoped_refptr<base::SingleThreadTaskRunner>& control_task_runner,
+ scoped_refptr<base::SingleThreadTaskRunner> worker_task_runner = {});
DemuxerDispatcher(
std::optional<PendingDataStreamSinkInterfaces> pending_data_stream_sink,
const scoped_refptr<base::SingleThreadTaskRunner>&
- control_thread_task_runner);
+ control_thread_task_runner,
+ const scoped_refptr<base::SingleThreadTaskRunner>&
+ worker_thread_task_runner);
~DemuxerDispatcher() override;
std::optional<PendingDataStreamSinkInterfaces> ConsumeDataStreamSink();
return worker_thread_task_runner_;
}
- protected:
- DemuxerDispatcher(
- std::optional<PendingDataStreamSinkInterfaces> pending_data_stream_sink,
- const scoped_refptr<base::SingleThreadTaskRunner>&
- control_thread_task_runner,
- const scoped_refptr<base::SingleThreadTaskRunner>&
- worker_thread_task_runner);
-
private:
template <typename DemuxerImplT>
static void SetAnyClients(
#include <sstream>
+#include "base/check.h"
#include "base/functional/bind.h"
#include "base/functional/callback_helpers.h"
#include "base/task/sequenced_task_runner.h"
void StartInitializingVideoDecoder(const media::VideoDecoderConfig&);
private:
- static std::unique_ptr<media::VideoDecoder>
- CreateVideoDecoderOnGpuFactoriesThread(
- media::GpuVideoAcceleratorFactories* gpu_factories,
- media::MediaLog* media_log);
void OnVideoDecoderInitialized(std::unique_ptr<media::VideoDecoder>,
media::DecoderStatus);
return;
}
- gpu_factories_->GetTaskRunner()->PostTaskAndReplyWithResult(
- FROM_HERE,
- base::BindOnce(&MsDecodingStream::VideoStream::
- CreateVideoDecoderOnGpuFactoriesThread,
- gpu_factories_, media_log_),
- base::BindOnce(&MsDecodingStream::VideoStream::InitializeDecoder,
- weak_ptr_factory_.GetWeakPtr(), video_conf));
-}
-
-// static
-std::unique_ptr<media::VideoDecoder>
-MsDecodingStream::VideoStream::CreateVideoDecoderOnGpuFactoriesThread(
- media::GpuVideoAcceleratorFactories* gpu_factories,
- media::MediaLog* media_log) {
- EMSS_DEBUG_NO_INSTANCE();
- return gpu_factories->CreateVideoDecoder(
- media_log, base::BindRepeating(&OnRequestOverlayInfo));
+ auto decoder = gpu_factories_->CreateVideoDecoder(
+ media_log_, base::BindRepeating(&OnRequestOverlayInfo));
+ InitializeDecoder(video_conf, std::move(decoder));
}
void MsDecodingStream::VideoStream::OnDecoderInitialized(
media::GpuVideoAcceleratorFactories* gpu_factories) {
EMSS_DEBUG() << gpu_factories;
+ if (!gpu_factories) {
+ EMSS_LOG(ERROR) << "GPU factories are empty?";
+ EmitError(BackendError::kPipelineError,
+ "Video acceleration failed to initialize.");
+ return;
+ }
+
gpu_factories_ = gpu_factories;
+ CHECK(gpu_factories_->GetTaskRunner() ==
+ base::SingleThreadTaskRunner::GetCurrentDefault())
+ << "worker_thread must be GPU factories thread or MojoVideoDecoder may "
+ "crash.";
+
if (video_stream_)
video_stream_->SetGpuFactories(gpu_factories);
}