#define STREAM_MAP_STREAM_AVAIL_OUT_DEVICES "avail-out-devices"
#define STREAM_MAP_STREAM_AVAIL_FRAMEWORKS "avail-frameworks"
+PA_DEFINE_PRIVATE_CLASS(stream_manager_msg, pa_msgobject);
+
+enum {
+ MESSAGE_RAMP_FINISHED,
+};
+
+struct stream_manager_param {
+ pa_stream_manager *m;
+ pa_sink_input *sink_input;
+ uint32_t index;
+};
+
+/* Called from main context */
+static void process_ramp_finish(struct stream_manager_param *param) {
+ stream_ducking *sd;
+ void *state;
+
+ /* Find a context id from all the ducked stream list by this stream index.
+ * Check the number of managed streams of the context id, if it is the last one
+ * then broadcast a signal with context id.*/
+ PA_HASHMAP_FOREACH(sd, param->m->stream_duckings, state) {
+ if (!pa_idxset_get_by_data(sd->idx_ducking_streams, param->sink_input, NULL)) {
+ pa_log_debug("not found matched stream(%p, index:%u) in sd(%p)",
+ param->sink_input, param->index, sd);
+ continue;
+ }
+
+ pa_log_info("found matched stream(%p, index:%u) in sd(%p, ducking_stream_count:%d, state:%u)",
+ param->sink_input, param->index, sd, sd->ducking_stream_count, sd->state);
+
+ if (sd->ducking_stream_count <= 0)
+ continue;
+
+ /* Remove trigger when unducked */
+ if (sd->state == STREAM_DUCKING_STATE_UNDUCKING)
+ pa_idxset_remove_by_data(sd->idx_ducking_streams, (void *)param->sink_input, NULL);
+
+ /* Send signal when all streams are ducked.
+ * Note that the condition of increasing count value below is located in
+ * handle_activate_ducking() of DBus handler. */
+ if (--sd->ducking_stream_count == 0) {
+ if (sd->state == STREAM_DUCKING_STATE_DUCKING) {
+ sd->state = STREAM_DUCKING_STATE_DUCKED;
+ } else if (sd->state == STREAM_DUCKING_STATE_UNDUCKING) {
+ sd->state = STREAM_DUCKING_STATE_UNDUCKED;
+ } else {
+ pa_log_warn("sd->state(%d), already ducked or unducked, skip sending signal", sd->state);
+ continue;
+ }
+
+ pa_log_info("send signal for ramp finished - sd(%p, state:%u)", sd, sd->state);
+ send_ducking_state_changed_signal(pa_dbus_connection_get(param->m->dbus_conn), sd->trigger_index, is_stream_ducked(sd));
+ }
+ }
+}
+
static bool is_valid_notify_command(notify_command_type_t command) {
return (command < sizeof(notify_command_type_str) / sizeof(char *));
}
pa_assert(stream);
if (type == STREAM_SINK_INPUT) {
- /* Parse request formats for samplerate, channel, format infomation */
+ /* Parse request formats for samplerate, channel, format information */
if (((pa_sink_input_new_data*)stream)->req_formats) {
req_format = pa_idxset_first(((pa_sink_input_new_data*)stream)->req_formats, NULL);
if (req_format && req_format->plist) {
pa_assert(i);
pa_assert(m);
- pa_log_debug("sink-input(%p, index:%u), state(%d)", i, i->index, i->state);
+ pa_log_debug("sink-input(%p, index:%u, state:%d)", i, i->index, i->state);
switch (i->state) {
case PA_SINK_INPUT_CORKED:
return PA_HOOK_OK;
}
+static bool is_in_main_thread()
+{
+ return (getpid() == gettid());
+}
+
+/* Called from either IO thread context or main context */
+/* FIXME : make this callback be invoked from the main context only */
static pa_hook_result_t sink_input_ramp_finish_cb(pa_core *core, pa_sink_input *i, pa_stream_manager *m) {
- stream_ducking *sd;
- void *state;
+ struct stream_manager_param param;
pa_core_assert_ref(core);
pa_sink_input_assert_ref(i);
if (core->state == PA_CORE_SHUTDOWN)
return PA_HOOK_OK;
- pa_log_debug("sink-input(%p, index:%u)", i, i->index);
-
- /* Find a context id from all the ducked stream list by this stream index.
- * Check the number of managed streams of the context id, if it is the last one
- * then broadcast a signal with context id.*/
- PA_HASHMAP_FOREACH(sd, m->stream_duckings, state) {
- if (!pa_idxset_get_by_data(sd->idx_ducking_streams, i, NULL)) {
- pa_log_debug("not found matched stream(%p, index:%u) in sd(%p)", i, i->index, sd);
- continue;
- }
-
- pa_log_info("found matched stream(%p, index:%u) in sd(%p, ducking_stream_count:%d, state:%u)",
- i, i->index, sd, sd->ducking_stream_count, sd->state);
-
- if (sd->ducking_stream_count <= 0)
- continue;
-
- /* Remove trigger when unducked */
- if (sd->state == STREAM_DUCKING_STATE_UNDUCKING)
- pa_idxset_remove_by_data(sd->idx_ducking_streams, (void *)i, NULL);
+ param.m = m;
+ param.sink_input = i;
+ param.index = i->index;
- /* Send signal when all streams are ducked.
- * Note that the condition of increasing count value below is located in
- * handle_activate_ducking() of DBus handler. */
- if (--sd->ducking_stream_count == 0) {
- if (sd->state == STREAM_DUCKING_STATE_DUCKING) {
- sd->state = STREAM_DUCKING_STATE_DUCKED;
- } else if (sd->state == STREAM_DUCKING_STATE_UNDUCKING) {
- sd->state = STREAM_DUCKING_STATE_UNDUCKED;
- } else {
- pa_log_warn("sd->state(%d), already ducked or unducked, skip sending signal", sd->state);
- continue;
- }
+ pa_log_info("sink-input(%p, index:%u)", i, i->index);
- pa_log_info("send signal for ramp finished - sd(%p, state:%u)", sd, sd->state);
- send_ducking_state_changed_signal(pa_dbus_connection_get(m->dbus_conn), sd->trigger_index, is_stream_ducked(sd));
- }
+ if (is_in_main_thread()) {
+ process_ramp_finish(¶m);
+ pa_log_info("sink-input(%p, index:%u) : direct process_ramp_finish() done", i, i->index);
+ } else {
+ /* Post message to make process_ramp_finish() run from main thread */
+ pa_asyncmsgq_post(m->thread_mq.outq, PA_MSGOBJECT(m->msg), MESSAGE_RAMP_FINISHED,
+ pa_xmemdup(¶m, sizeof(struct stream_manager_param)), 0, NULL, pa_xfree);
+ pa_log_info("sink-input(%p, index:%u) : posting MESSAGE_RAMP_FINISHED done", i, i->index);
}
return PA_HOOK_OK;
return s->volume_types[stream_type == STREAM_SINK_INPUT ? STREAM_DIRECTION_OUT : STREAM_DIRECTION_IN];
}
+static int stream_manager_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
+ struct stream_manager_param *param = (struct stream_manager_param *)data;
+
+ pa_assert(param);
+ pa_assert(param->m);
+ pa_assert(param->m->core);
+
+ pa_log_info("code(%d), sink-input(%p, index:%u)", code, param->sink_input, param->index);
+
+ if (param->m->core->state == PA_CORE_SHUTDOWN)
+ return 0;
+
+ switch (code) {
+ case MESSAGE_RAMP_FINISHED:
+ process_ramp_finish(param);
+ break;
+
+ default:
+ pa_assert_not_reached();
+ }
+
+ return 0;
+}
+
pa_stream_manager* pa_stream_manager_get(pa_core *c) {
pa_stream_manager *m;
PA_REFCNT_INIT(m);
m->core = c;
+ m->rtpoll = pa_rtpoll_new();
+ if (pa_thread_mq_init(&m->thread_mq, m->core->mainloop, m->rtpoll) < 0) {
+ pa_log("pa_thread_mq_init() failed.");
+ goto fail;
+ }
+ m->msg = pa_msgobject_new(stream_manager_msg);
+ m->msg->parent.process_msg = stream_manager_process_msg;
+
if (!(m->hal = pa_hal_interface_get(c)))
goto fail;
fail:
pa_log_error("failed to initialize stream-manager");
+
+ pa_thread_mq_done(&m->thread_mq);
+ pa_rtpoll_free(m->rtpoll);
+ pa_xfree(m->msg);
+
deinit_volumes(m);
deinit_stream_map(m);
deinit_filters(m);
if (PA_REFCNT_DEC(m) > 0)
return;
+ pa_thread_mq_done(&m->thread_mq);
+ pa_rtpoll_free(m->rtpoll);
+ pa_xfree(m->msg);
+
free_hook_slots(m);
if (m->comm.comm)