return 0;
}
-
- case PA_SINK_MESSAGE_SET_STATE:
-
- switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
-
- case PA_SINK_SUSPENDED: {
- pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
-
- suspend(u);
-
- break;
- }
-
- case PA_SINK_IDLE:
- case PA_SINK_RUNNING: {
- int r;
-
- if (u->sink->thread_info.state == PA_SINK_INIT) {
- if (build_pollfd(u) < 0)
- /* FIXME: This will cause an assertion failure in
- * pa_sink_put(), because with the current design
- * pa_sink_put() is not allowed to fail. */
- return -PA_ERR_IO;
- }
-
- if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
- if ((r = unsuspend(u)) < 0)
- return r;
- }
-
- break;
- }
-
- case PA_SINK_UNLINKED:
- case PA_SINK_INIT:
- case PA_SINK_INVALID_STATE:
- ;
- }
-
- break;
}
return pa_sink_process_msg(o, code, data, offset, chunk);
return 0;
}
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ switch (new_state) {
+
+ case PA_SINK_SUSPENDED: {
+ pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
+
+ suspend(u);
+
+ break;
+ }
+
+ case PA_SINK_IDLE:
+ case PA_SINK_RUNNING: {
+ int r;
+
+ if (u->sink->thread_info.state == PA_SINK_INIT) {
+ if (build_pollfd(u) < 0)
+ /* FIXME: This will cause an assertion failure, because
+ * with the current design pa_sink_put() is not allowed
+ * to fail and pa_sink_put() has no fallback code that
+ * would start the sink suspended if opening the device
+ * fails. */
+ return -PA_ERR_IO;
+ }
+
+ if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
+ if ((r = unsuspend(u)) < 0)
+ return r;
+ }
+
+ break;
+ }
+
+ case PA_SINK_UNLINKED:
+ case PA_SINK_INIT:
+ case PA_SINK_INVALID_STATE:
+ break;
+ }
+
+ return 0;
+}
+
static int ctl_mixer_callback(snd_mixer_elem_t *elem, unsigned int mask) {
struct userdata *u = snd_mixer_elem_get_callback_private(elem);
if (u->use_tsched)
u->sink->update_requested_latency = sink_update_requested_latency_cb;
u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
if (u->ucm_context)
u->sink->set_port = sink_set_port_ucm_cb;
else
return 0;
}
-
- case PA_SOURCE_MESSAGE_SET_STATE:
-
- switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) {
-
- case PA_SOURCE_SUSPENDED: {
- pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state));
-
- suspend(u);
-
- break;
- }
-
- case PA_SOURCE_IDLE:
- case PA_SOURCE_RUNNING: {
- int r;
-
- if (u->source->thread_info.state == PA_SOURCE_INIT) {
- if (build_pollfd(u) < 0)
- /* FIXME: This will cause an assertion failure in
- * pa_source_put(), because with the current design
- * pa_source_put() is not allowed to fail. */
- return -PA_ERR_IO;
- }
-
- if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) {
- if ((r = unsuspend(u)) < 0)
- return r;
- }
-
- break;
- }
-
- case PA_SOURCE_UNLINKED:
- case PA_SOURCE_INIT:
- case PA_SOURCE_INVALID_STATE:
- ;
- }
-
- break;
}
return pa_source_process_msg(o, code, data, offset, chunk);
return 0;
}
+/* Called from the IO thread. */
+static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ switch (new_state) {
+
+ case PA_SOURCE_SUSPENDED: {
+ pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state));
+
+ suspend(u);
+
+ break;
+ }
+
+ case PA_SOURCE_IDLE:
+ case PA_SOURCE_RUNNING: {
+ int r;
+
+ if (u->source->thread_info.state == PA_SOURCE_INIT) {
+ if (build_pollfd(u) < 0)
+ /* FIXME: This will cause an assertion failure, because
+ * with the current design pa_source_put() is not allowed
+ * to fail and pa_source_put() has no fallback code that
+ * would start the source suspended if opening the device
+ * fails. */
+ return -PA_ERR_IO;
+ }
+
+ if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) {
+ if ((r = unsuspend(u)) < 0)
+ return r;
+ }
+
+ break;
+ }
+
+ case PA_SOURCE_UNLINKED:
+ case PA_SOURCE_INIT:
+ case PA_SOURCE_INVALID_STATE:
+ ;
+ }
+
+ return 0;
+}
+
static int ctl_mixer_callback(snd_mixer_elem_t *elem, unsigned int mask) {
struct userdata *u = snd_mixer_elem_get_callback_private(elem);
if (u->use_tsched)
u->source->update_requested_latency = source_update_requested_latency_cb;
u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
+ u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
if (u->ucm_context)
u->source->set_port = source_set_port_ucm_cb;
else
switch (code) {
- case PA_SINK_MESSAGE_SET_STATE:
-
- switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
-
- case PA_SINK_SUSPENDED:
- /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */
- if (!PA_SINK_IS_OPENED(u->sink->thread_info.state))
- break;
-
- /* Stop the device if the source is suspended as well */
- if (!u->source || u->source->state == PA_SOURCE_SUSPENDED)
- /* We deliberately ignore whether stopping
- * actually worked. Since the stream_fd is
- * closed it doesn't really matter */
- bt_transport_release(u);
-
- break;
-
- case PA_SINK_IDLE:
- case PA_SINK_RUNNING:
- if (u->sink->thread_info.state != PA_SINK_SUSPENDED)
- break;
-
- /* Resume the device if the source was suspended as well */
- if (!u->source || !PA_SOURCE_IS_OPENED(u->source->thread_info.state)) {
- if (bt_transport_acquire(u, false) < 0)
- return -1;
- else
- setup_stream(u);
- }
- break;
-
- case PA_SINK_UNLINKED:
- case PA_SINK_INIT:
- case PA_SINK_INVALID_STATE:
- ;
- }
- break;
-
case PA_SINK_MESSAGE_GET_LATENCY: {
if (u->read_smoother) {
return pa_sink_process_msg(o, code, data, offset, chunk);
}
-/* Run from IO thread */
-static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
- struct userdata *u = PA_SOURCE(o)->userdata;
-
- pa_assert(u->source == PA_SOURCE(o));
- pa_assert(u->transport);
-
- switch (code) {
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
- case PA_SOURCE_MESSAGE_SET_STATE:
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
- switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) {
+ switch (new_state) {
- case PA_SOURCE_SUSPENDED:
- /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */
- if (!PA_SOURCE_IS_OPENED(u->source->thread_info.state))
- break;
+ case PA_SINK_SUSPENDED:
+ /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */
+ if (!PA_SINK_IS_OPENED(u->sink->thread_info.state))
+ break;
- /* Stop the device if the sink is suspended as well */
- if (!u->sink || u->sink->state == PA_SINK_SUSPENDED)
- bt_transport_release(u);
+ /* Stop the device if the source is suspended as well */
+ if (!u->source || u->source->state == PA_SOURCE_SUSPENDED)
+ /* We deliberately ignore whether stopping
+ * actually worked. Since the stream_fd is
+ * closed it doesn't really matter */
+ bt_transport_release(u);
- if (u->read_smoother)
- pa_smoother_pause(u->read_smoother, pa_rtclock_now());
- break;
+ break;
- case PA_SOURCE_IDLE:
- case PA_SOURCE_RUNNING:
- if (u->source->thread_info.state != PA_SOURCE_SUSPENDED)
- break;
-
- /* Resume the device if the sink was suspended as well */
- if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
- if (bt_transport_acquire(u, false) < 0)
- return -1;
- else
- setup_stream(u);
- }
- /* We don't resume the smoother here. Instead we
- * wait until the first packet arrives */
- break;
+ case PA_SINK_IDLE:
+ case PA_SINK_RUNNING:
+ if (u->sink->thread_info.state != PA_SINK_SUSPENDED)
+ break;
- case PA_SOURCE_UNLINKED:
- case PA_SOURCE_INIT:
- case PA_SOURCE_INVALID_STATE:
- ;
+ /* Resume the device if the source was suspended as well */
+ if (!u->source || !PA_SOURCE_IS_OPENED(u->source->thread_info.state)) {
+ if (bt_transport_acquire(u, false) < 0)
+ return -1;
+ else
+ setup_stream(u);
}
break;
+ case PA_SINK_UNLINKED:
+ case PA_SINK_INIT:
+ case PA_SINK_INVALID_STATE:
+ break;
+ }
+
+ return 0;
+}
+
+/* Run from IO thread */
+static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
+ struct userdata *u = PA_SOURCE(o)->userdata;
+
+ pa_assert(u->source == PA_SOURCE(o));
+ pa_assert(u->transport);
+
+ switch (code) {
+
case PA_SOURCE_MESSAGE_GET_LATENCY: {
int64_t wi, ri;
return pa_source_process_msg(o, code, data, offset, chunk);
}
+/* Called from the IO thread. */
+static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ switch (new_state) {
+
+ case PA_SOURCE_SUSPENDED:
+ /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */
+ if (!PA_SOURCE_IS_OPENED(u->source->thread_info.state))
+ break;
+
+ /* Stop the device if the sink is suspended as well */
+ if (!u->sink || u->sink->state == PA_SINK_SUSPENDED)
+ bt_transport_release(u);
+
+ if (u->read_smoother)
+ pa_smoother_pause(u->read_smoother, pa_rtclock_now());
+ break;
+
+ case PA_SOURCE_IDLE:
+ case PA_SOURCE_RUNNING:
+ if (u->source->thread_info.state != PA_SOURCE_SUSPENDED)
+ break;
+
+ /* Resume the device if the sink was suspended as well */
+ if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
+ if (bt_transport_acquire(u, false) < 0)
+ return -1;
+ else
+ setup_stream(u);
+ }
+ /* We don't resume the smoother here. Instead we
+ * wait until the first packet arrives */
+ break;
+
+ case PA_SOURCE_UNLINKED:
+ case PA_SOURCE_INIT:
+ case PA_SOURCE_INVALID_STATE:
+ break;
+ }
+
+ return 0;
+}
+
/* Called from main thread context */
static int device_process_msg(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct bluetooth_msg *u = BLUETOOTH_MSG(obj);
u->sink->userdata = u;
u->sink->parent.process_msg = sink_process_msg;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
u->sink->set_port = sink_set_port_cb;
}
u->source->userdata = u;
u->source->parent.process_msg = source_process_msg;
+ u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
u->source->set_port = source_set_port_cb;
}
switch (code) {
- case PA_SOURCE_MESSAGE_SET_STATE:
-
- switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) {
-
- case PA_SOURCE_SUSPENDED:
- /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */
- if (!PA_SOURCE_IS_OPENED(u->source->thread_info.state))
- break;
-
- /* Stop the device if the sink is suspended as well */
- if (!u->sink || u->sink->state == PA_SINK_SUSPENDED)
- transport_release(u);
-
- if (u->read_smoother)
- pa_smoother_pause(u->read_smoother, pa_rtclock_now());
-
- break;
-
- case PA_SOURCE_IDLE:
- case PA_SOURCE_RUNNING:
- if (u->source->thread_info.state != PA_SOURCE_SUSPENDED)
- break;
-
- /* Resume the device if the sink was suspended as well */
- if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
- if (!setup_transport_and_stream(u))
- return -1;
- }
-
- /* We don't resume the smoother here. Instead we
- * wait until the first packet arrives */
-
- break;
-
- case PA_SOURCE_UNLINKED:
- case PA_SOURCE_INIT:
- case PA_SOURCE_INVALID_STATE:
- break;
- }
-
- break;
-
case PA_SOURCE_MESSAGE_GET_LATENCY: {
int64_t wi, ri;
return pa_source_process_msg(o, code, data, offset, chunk);
}
+/* Called from the IO thread. */
+static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ switch (new_state) {
+
+ case PA_SOURCE_SUSPENDED:
+ /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */
+ if (!PA_SOURCE_IS_OPENED(u->source->thread_info.state))
+ break;
+
+ /* Stop the device if the sink is suspended as well */
+ if (!u->sink || u->sink->state == PA_SINK_SUSPENDED)
+ transport_release(u);
+
+ if (u->read_smoother)
+ pa_smoother_pause(u->read_smoother, pa_rtclock_now());
+
+ break;
+
+ case PA_SOURCE_IDLE:
+ case PA_SOURCE_RUNNING:
+ if (u->source->thread_info.state != PA_SOURCE_SUSPENDED)
+ break;
+
+ /* Resume the device if the sink was suspended as well */
+ if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state))
+ if (!setup_transport_and_stream(u))
+ return -1;
+
+ /* We don't resume the smoother here. Instead we
+ * wait until the first packet arrives */
+
+ break;
+
+ case PA_SOURCE_UNLINKED:
+ case PA_SOURCE_INIT:
+ case PA_SOURCE_INVALID_STATE:
+ break;
+ }
+
+ return 0;
+}
+
/* Run from main thread */
static void source_set_volume_cb(pa_source *s) {
uint16_t gain;
u->source->userdata = u;
u->source->parent.process_msg = source_process_msg;
+ u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
if (u->profile == PA_BLUETOOTH_PROFILE_HEADSET_HEAD_UNIT || u->profile == PA_BLUETOOTH_PROFILE_HEADSET_AUDIO_GATEWAY) {
pa_source_set_set_volume_callback(u->source, source_set_volume_cb);
switch (code) {
- case PA_SINK_MESSAGE_SET_STATE:
-
- switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
-
- case PA_SINK_SUSPENDED:
- /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */
- if (!PA_SINK_IS_OPENED(u->sink->thread_info.state))
- break;
-
- /* Stop the device if the source is suspended as well */
- if (!u->source || u->source->state == PA_SOURCE_SUSPENDED)
- /* We deliberately ignore whether stopping
- * actually worked. Since the stream_fd is
- * closed it doesn't really matter */
- transport_release(u);
-
- break;
-
- case PA_SINK_IDLE:
- case PA_SINK_RUNNING:
- if (u->sink->thread_info.state != PA_SINK_SUSPENDED)
- break;
-
- /* Resume the device if the source was suspended as well */
- if (!u->source || !PA_SOURCE_IS_OPENED(u->source->thread_info.state)) {
- if (!setup_transport_and_stream(u))
- return -1;
- }
-
- break;
-
- case PA_SINK_UNLINKED:
- case PA_SINK_INIT:
- case PA_SINK_INVALID_STATE:
- break;
- }
-
- break;
-
case PA_SINK_MESSAGE_GET_LATENCY: {
int64_t wi, ri;
return pa_sink_process_msg(o, code, data, offset, chunk);
}
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ switch (new_state) {
+
+ case PA_SINK_SUSPENDED:
+ /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */
+ if (!PA_SINK_IS_OPENED(u->sink->thread_info.state))
+ break;
+
+ /* Stop the device if the source is suspended as well */
+ if (!u->source || u->source->state == PA_SOURCE_SUSPENDED)
+ /* We deliberately ignore whether stopping
+ * actually worked. Since the stream_fd is
+ * closed it doesn't really matter */
+ transport_release(u);
+
+ break;
+
+ case PA_SINK_IDLE:
+ case PA_SINK_RUNNING:
+ if (u->sink->thread_info.state != PA_SINK_SUSPENDED)
+ break;
+
+ /* Resume the device if the source was suspended as well */
+ if (!u->source || !PA_SOURCE_IS_OPENED(u->source->thread_info.state))
+ if (!setup_transport_and_stream(u))
+ return -1;
+
+ break;
+
+ case PA_SINK_UNLINKED:
+ case PA_SINK_INIT:
+ case PA_SINK_INVALID_STATE:
+ break;
+ }
+
+ return 0;
+}
+
/* Run from main thread */
static void sink_set_volume_cb(pa_sink *s) {
uint16_t gain;
u->sink->userdata = u;
u->sink->parent.process_msg = sink_process_msg;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
if (u->profile == PA_BLUETOOTH_PROFILE_HEADSET_HEAD_UNIT || u->profile == PA_BLUETOOTH_PROFILE_HEADSET_AUDIO_GATEWAY) {
pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
return 0;
-
- case PA_SINK_MESSAGE_SET_STATE: {
- pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
-
- /* When set to running or idle for the first time, request a rewind
- * of the master sink to make sure we are heard immediately */
- if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
- pa_log_debug("Requesting rewind due to state change.");
- pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
- }
- break;
- }
-
}
return pa_sink_process_msg(o, code, data, offset, chunk);
return 0;
}
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ /* When set to running or idle for the first time, request a rewind
+ * of the master sink to make sure we are heard immediately */
+ if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+ pa_log_debug("Requesting rewind due to state change.");
+ pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+ }
+
+ return 0;
+}
+
/* Called from source I/O thread context */
static void source_update_requested_latency_cb(pa_source *s) {
struct userdata *u;
u->sink->parent.process_msg = sink_process_msg_cb;
u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
u->sink->update_requested_latency = sink_update_requested_latency_cb;
u->sink->request_rewind = sink_request_rewind_cb;
pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
return 0;
}
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
+ bool running;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ running = new_state == PA_SINK_RUNNING;
+ pa_atomic_store(&u->thread_info.running, running);
+
+ if (running)
+ pa_smoother_resume(u->thread_info.smoother, pa_rtclock_now(), true);
+ else
+ pa_smoother_pause(u->thread_info.smoother, pa_rtclock_now());
+
+ return 0;
+}
+
/* Called from IO context */
static void update_max_request(struct userdata *u) {
size_t max_request = 0;
switch (code) {
- case PA_SINK_MESSAGE_SET_STATE: {
- bool running = (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING);
-
- pa_atomic_store(&u->thread_info.running, running);
-
- if (running)
- pa_smoother_resume(u->thread_info.smoother, pa_rtclock_now(), true);
- else
- pa_smoother_pause(u->thread_info.smoother, pa_rtclock_now());
-
- break;
- }
-
case PA_SINK_MESSAGE_GET_LATENCY: {
pa_usec_t x, y, c;
int64_t *delay = data;
u->sink->parent.process_msg = sink_process_msg;
u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
u->sink->update_requested_latency = sink_update_requested_latency;
u->sink->userdata = u;
//+ pa_bytes_to_usec(u->latency * fs, ss)
return 0;
}
-
- case PA_SINK_MESSAGE_SET_STATE: {
- pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
-
- /* When set to running or idle for the first time, request a rewind
- * of the master sink to make sure we are heard immediately */
- if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
- pa_log_debug("Requesting rewind due to state change.");
- pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
- }
- break;
- }
}
return pa_sink_process_msg(o, code, data, offset, chunk);
return 0;
}
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ /* When set to running or idle for the first time, request a rewind
+ * of the master sink to make sure we are heard immediately */
+ if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+ pa_log_debug("Requesting rewind due to state change.");
+ pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+ }
+
+ return 0;
+}
+
/* Called from I/O thread context */
static void sink_request_rewind_cb(pa_sink *s) {
struct userdata *u;
u->sink->parent.process_msg = sink_process_msg_cb;
u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
u->sink->update_requested_latency = sink_update_requested_latency_cb;
u->sink->request_rewind = sink_request_rewind_cb;
pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
switch (code) {
- case PA_SINK_MESSAGE_SET_STATE:
-
- switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
-
- case PA_SINK_SUSPENDED:
- pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
-
- pa_smoother_pause(u->smoother, pa_rtclock_now());
- break;
-
- case PA_SINK_IDLE:
- case PA_SINK_RUNNING:
-
- if (u->sink->thread_info.state == PA_SINK_SUSPENDED)
- pa_smoother_resume(u->smoother, pa_rtclock_now(), true);
-
- break;
-
- case PA_SINK_UNLINKED:
- case PA_SINK_INIT:
- case PA_SINK_INVALID_STATE:
- ;
- }
-
- break;
-
case PA_SINK_MESSAGE_GET_LATENCY: {
pa_usec_t w, r;
return pa_sink_process_msg(o, code, data, offset, chunk);
}
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ switch (new_state) {
+
+ case PA_SINK_SUSPENDED:
+ pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
+
+ pa_smoother_pause(u->smoother, pa_rtclock_now());
+ break;
+
+ case PA_SINK_IDLE:
+ case PA_SINK_RUNNING:
+
+ if (u->sink->thread_info.state == PA_SINK_SUSPENDED)
+ pa_smoother_resume(u->smoother, pa_rtclock_now(), true);
+
+ break;
+
+ case PA_SINK_UNLINKED:
+ case PA_SINK_INIT:
+ case PA_SINK_INVALID_STATE:
+ ;
+ }
+
+ return 0;
+}
+
static void thread_func(void *userdata) {
struct userdata *u = userdata;
int write_type = 0;
}
u->sink->parent.process_msg = sink_process_msg;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
u->sink->userdata = u;
pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
connect_control_ports(u);
return 0;
-
- case PA_SINK_MESSAGE_SET_STATE: {
- pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
-
- /* When set to running or idle for the first time, request a rewind
- * of the master sink to make sure we are heard immediately */
- if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
- pa_log_debug("Requesting rewind due to state change.");
- pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
- }
- break;
- }
}
return pa_sink_process_msg(o, code, data, offset, chunk);
return 0;
}
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ /* When set to running or idle for the first time, request a rewind
+ * of the master sink to make sure we are heard immediately */
+ if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+ pa_log_debug("Requesting rewind due to state change.");
+ pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+ }
+
+ return 0;
+}
+
/* Called from I/O thread context */
static void sink_request_rewind_cb(pa_sink *s) {
struct userdata *u;
u->sink->parent.process_msg = sink_process_msg_cb;
u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
u->sink->update_requested_latency = sink_update_requested_latency_cb;
u->sink->request_rewind = sink_request_rewind_cb;
pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
struct userdata *u = PA_SINK(o)->userdata;
switch (code) {
- case PA_SINK_MESSAGE_SET_STATE:
-
- if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) {
- if (PA_SINK_IS_OPENED(PA_PTR_TO_UINT(data)))
- u->timestamp = pa_rtclock_now();
- }
-
- break;
-
case PA_SINK_MESSAGE_GET_LATENCY: {
pa_usec_t now;
return pa_sink_process_msg(o, code, data, offset, chunk);
}
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) {
+ if (PA_SINK_IS_OPENED(new_state))
+ u->timestamp = pa_rtclock_now();
+ }
+
+ return 0;
+}
+
static void sink_update_requested_latency_cb(pa_sink *s) {
struct userdata *u;
size_t nbytes;
}
u->sink->parent.process_msg = sink_process_msg;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
u->sink->update_requested_latency = sink_update_requested_latency_cb;
u->sink->userdata = u;
struct userdata *u = PA_SOURCE(o)->userdata;
switch (code) {
- case PA_SOURCE_MESSAGE_SET_STATE:
-
- if (PA_PTR_TO_UINT(data) == PA_SOURCE_RUNNING)
- u->timestamp = pa_rtclock_now();
-
- break;
-
case PA_SOURCE_MESSAGE_GET_LATENCY: {
pa_usec_t now;
return pa_source_process_msg(o, code, data, offset, chunk);
}
+/* Called from the IO thread. */
+static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ if (new_state == PA_SOURCE_RUNNING)
+ u->timestamp = pa_rtclock_now();
+
+ return 0;
+}
+
static void source_update_requested_latency_cb(pa_source *s) {
struct userdata *u;
u->latency_time = latency_time;
u->source->parent.process_msg = source_process_msg;
+ u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
u->source->update_requested_latency = source_update_requested_latency_cb;
u->source->userdata = u;
struct userdata *u = PA_SINK(o)->userdata;
switch (code) {
- case PA_SINK_MESSAGE_SET_STATE:
- if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) {
- if (PA_SINK_IS_OPENED(PA_PTR_TO_UINT(data)))
- u->timestamp = pa_rtclock_now();
- } else if (u->sink->thread_info.state == PA_SINK_RUNNING || u->sink->thread_info.state == PA_SINK_IDLE) {
- if (PA_PTR_TO_UINT(data) == PA_SINK_SUSPENDED) {
- /* Clear potential FIFO error flag */
- u->fifo_error = false;
-
- /* Continuously dropping data (clear counter on entering suspended state. */
- if (u->bytes_dropped != 0) {
- pa_log_debug("Pipe-sink continuously dropping data - clear statistics (%zu -> 0 bytes dropped)", u->bytes_dropped);
- u->bytes_dropped = 0;
- }
- }
- }
- break;
-
case PA_SINK_MESSAGE_GET_LATENCY:
if (u->use_system_clock_for_timing) {
pa_usec_t now;
return pa_sink_process_msg(o, code, data, offset, chunk);
}
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) {
+ if (PA_SINK_IS_OPENED(new_state))
+ u->timestamp = pa_rtclock_now();
+ } else if (u->sink->thread_info.state == PA_SINK_RUNNING || u->sink->thread_info.state == PA_SINK_IDLE) {
+ if (new_state == PA_SINK_SUSPENDED) {
+ /* Clear potential FIFO error flag */
+ u->fifo_error = false;
+
+ /* Continuously dropping data (clear counter on entering suspended state. */
+ if (u->bytes_dropped != 0) {
+ pa_log_debug("Pipe-sink continuously dropping data - clear statistics (%zu -> 0 bytes dropped)", u->bytes_dropped);
+ u->bytes_dropped = 0;
+ }
+ }
+ }
+
+ return 0;
+}
+
static void sink_update_requested_latency_cb(pa_sink *s) {
struct userdata *u;
size_t nbytes;
}
u->sink->parent.process_msg = sink_process_msg;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
if (u->use_system_clock_for_timing)
u->sink->update_requested_latency = sink_update_requested_latency_cb;
u->sink->userdata = u;
pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
return 0;
-
- case PA_SINK_MESSAGE_SET_STATE: {
- pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
-
- /* When set to running or idle for the first time, request a rewind
- * of the master sink to make sure we are heard immediately */
- if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
- pa_log_debug("Requesting rewind due to state change.");
- pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
- }
- break;
- }
}
return pa_sink_process_msg(o, code, data, offset, chunk);
return 0;
}
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ /* When set to running or idle for the first time, request a rewind
+ * of the master sink to make sure we are heard immediately */
+ if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+ pa_log_debug("Requesting rewind due to state change.");
+ pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+ }
+
+ return 0;
+}
+
/* Called from I/O thread context */
static void sink_request_rewind(pa_sink *s) {
struct userdata *u;
u->sink->parent.process_msg = sink_process_msg;
u->sink->set_state_in_main_thread = sink_set_state_in_main_thread;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
u->sink->update_requested_latency = sink_update_requested_latency;
u->sink->request_rewind = sink_request_rewind;
u->sink->userdata = u;
switch (code) {
- case PA_SOURCE_MESSAGE_SET_STATE:
-
- if (PA_PTR_TO_UINT(data) == PA_SOURCE_RUNNING)
- u->timestamp = pa_rtclock_now();
-
- break;
-
case PA_SOURCE_MESSAGE_GET_LATENCY: {
pa_usec_t now, left_to_fill;
return pa_source_process_msg(o, code, data, offset, chunk);
}
+/* Called from the IO thread. */
+static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ if (new_state == PA_SOURCE_RUNNING)
+ u->timestamp = pa_rtclock_now();
+
+ return 0;
+}
+
static void source_update_requested_latency_cb(pa_source *s) {
struct userdata *u;
}
u->source->parent.process_msg = source_process_msg;
+ u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
u->source->update_requested_latency = source_update_requested_latency_cb;
u->source->userdata = u;
case PA_SINK_MESSAGE_GET_LATENCY:
*((int64_t*) data) = sink_get_latency(u, &PA_SINK(o)->sample_spec);
return 0;
+ }
- case PA_SINK_MESSAGE_SET_STATE:
+ return pa_sink_process_msg(o, code, data, offset, chunk);
+}
- switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
- case PA_SINK_SUSPENDED:
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
- pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
+ switch (new_state) {
- pa_smoother_pause(u->smoother, pa_rtclock_now());
+ case PA_SINK_SUSPENDED:
- if (!u->source || u->source_suspended)
- suspend(u);
+ pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
- u->sink_suspended = true;
- break;
+ pa_smoother_pause(u->smoother, pa_rtclock_now());
- case PA_SINK_IDLE:
- case PA_SINK_RUNNING:
-
- if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
- pa_smoother_resume(u->smoother, pa_rtclock_now(), true);
-
- if (!u->source || u->source_suspended) {
- bool mute;
- if (unsuspend(u) < 0)
- return -1;
- u->sink->get_volume(u->sink);
- if (u->sink->get_mute(u->sink, &mute) >= 0)
- pa_sink_set_mute(u->sink, mute, false);
- }
- u->sink_suspended = false;
- }
- break;
+ if (!u->source || u->source_suspended)
+ suspend(u);
- case PA_SINK_INVALID_STATE:
- case PA_SINK_UNLINKED:
- case PA_SINK_INIT:
- ;
- }
+ u->sink_suspended = true;
+ break;
+
+ case PA_SINK_IDLE:
+ case PA_SINK_RUNNING:
+
+ if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
+ pa_smoother_resume(u->smoother, pa_rtclock_now(), true);
+ if (!u->source || u->source_suspended) {
+ bool mute;
+ if (unsuspend(u) < 0)
+ return -1;
+ u->sink->get_volume(u->sink);
+ if (u->sink->get_mute(u->sink, &mute) >= 0)
+ pa_sink_set_mute(u->sink, mute, false);
+ }
+ u->sink_suspended = false;
+ }
break;
+
+ case PA_SINK_INVALID_STATE:
+ case PA_SINK_UNLINKED:
+ case PA_SINK_INIT:
+ ;
}
- return pa_sink_process_msg(o, code, data, offset, chunk);
+ return 0;
}
static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
case PA_SOURCE_MESSAGE_GET_LATENCY:
*((pa_usec_t*) data) = source_get_latency(u, &PA_SOURCE(o)->sample_spec);
return 0;
+ }
- case PA_SOURCE_MESSAGE_SET_STATE:
+ return pa_source_process_msg(o, code, data, offset, chunk);
+}
- switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) {
+/* Called from the IO thread. */
+static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
+ struct userdata *u;
- case PA_SOURCE_SUSPENDED:
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
- pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state));
+ switch (new_state) {
- if (!u->sink || u->sink_suspended)
- suspend(u);
+ case PA_SOURCE_SUSPENDED:
- u->source_suspended = true;
- break;
+ pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state));
- case PA_SOURCE_IDLE:
- case PA_SOURCE_RUNNING:
+ if (!u->sink || u->sink_suspended)
+ suspend(u);
- if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) {
- if (!u->sink || u->sink_suspended) {
- if (unsuspend(u) < 0)
- return -1;
- u->source->get_volume(u->source);
- }
- u->source_suspended = false;
- }
- break;
+ u->source_suspended = true;
+ break;
- case PA_SOURCE_UNLINKED:
- case PA_SOURCE_INIT:
- case PA_SOURCE_INVALID_STATE:
- ;
+ case PA_SOURCE_IDLE:
+ case PA_SOURCE_RUNNING:
+ if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) {
+ if (!u->sink || u->sink_suspended) {
+ if (unsuspend(u) < 0)
+ return -1;
+ u->source->get_volume(u->source);
+ }
+ u->source_suspended = false;
}
break;
+ case PA_SOURCE_UNLINKED:
+ case PA_SOURCE_INIT:
+ case PA_SOURCE_INVALID_STATE:
+ ;
+
}
- return pa_source_process_msg(o, code, data, offset, chunk);
+ return 0;
}
static void sink_set_volume(pa_sink *s) {
u->source->userdata = u;
u->source->parent.process_msg = source_process_msg;
+ u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
pa_source_set_rtpoll(u->source, u->rtpoll);
pa_assert(u->sink);
u->sink->userdata = u;
u->sink->parent.process_msg = sink_process_msg;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
pa_sink_set_rtpoll(u->sink, u->rtpoll);
*((int64_t*) data) = remote_latency;
return 0;
}
- case PA_SINK_MESSAGE_SET_STATE:
- if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
- break;
+ }
+ return pa_sink_process_msg(o, code, data, offset, chunk);
+}
- switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
- case PA_SINK_SUSPENDED: {
- cork_stream(u, true);
- break;
- }
- case PA_SINK_IDLE:
- case PA_SINK_RUNNING: {
- cork_stream(u, false);
- break;
- }
- case PA_SINK_INVALID_STATE:
- case PA_SINK_INIT:
- case PA_SINK_UNLINKED:
- break;
- }
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
+ return 0;
+
+ switch (new_state) {
+ case PA_SINK_SUSPENDED: {
+ cork_stream(u, true);
+ break;
+ }
+ case PA_SINK_IDLE:
+ case PA_SINK_RUNNING: {
+ cork_stream(u, false);
+ break;
+ }
+ case PA_SINK_INVALID_STATE:
+ case PA_SINK_INIT:
+ case PA_SINK_UNLINKED:
break;
}
- return pa_sink_process_msg(o, code, data, offset, chunk);
+
+ return 0;
}
int pa__init(pa_module *m) {
pa_sink_new_data_done(&sink_data);
u->sink->userdata = u;
u->sink->parent.process_msg = sink_process_msg_cb;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
u->sink->update_requested_latency = sink_update_requested_latency_cb;
pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC);
return 0;
}
- case PA_SOURCE_MESSAGE_SET_STATE:
- if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
- break;
+ }
+ return pa_source_process_msg(o, code, data, offset, chunk);
+}
- switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) {
- case PA_SOURCE_SUSPENDED: {
- cork_stream(u, true);
- break;
- }
- case PA_SOURCE_IDLE:
- case PA_SOURCE_RUNNING: {
- cork_stream(u, false);
- break;
- }
- case PA_SOURCE_INVALID_STATE:
- case PA_SOURCE_INIT:
- case PA_SOURCE_UNLINKED:
- break;
- }
+/* Called from the IO thread. */
+static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY)
+ return 0;
+
+ switch (new_state) {
+ case PA_SOURCE_SUSPENDED: {
+ cork_stream(u, true);
+ break;
+ }
+ case PA_SOURCE_IDLE:
+ case PA_SOURCE_RUNNING: {
+ cork_stream(u, false);
+ break;
+ }
+ case PA_SOURCE_INVALID_STATE:
+ case PA_SOURCE_INIT:
+ case PA_SOURCE_UNLINKED:
break;
}
- return pa_source_process_msg(o, code, data, offset, chunk);
+
+ return 0;
}
int pa__init(pa_module *m) {
pa_source_new_data_done(&source_data);
u->source->userdata = u;
u->source->parent.process_msg = source_process_msg_cb;
+ u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
u->source->update_requested_latency = source_update_requested_latency_cb;
pa_source_set_asyncmsgq(u->source, u->thread_mq->inq);
pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
return 0;
-
- case PA_SINK_MESSAGE_SET_STATE: {
- pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
-
- /* When set to running or idle for the first time, request a rewind
- * of the master sink to make sure we are heard immediately */
- if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
- pa_log_debug("Requesting rewind due to state change.");
- pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
- }
- break;
- }
}
return pa_sink_process_msg(o, code, data, offset, chunk);
return 0;
}
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ /* When set to running or idle for the first time, request a rewind
+ * of the master sink to make sure we are heard immediately */
+ if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+ pa_log_debug("Requesting rewind due to state change.");
+ pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+ }
+
+ return 0;
+}
+
/* Called from I/O thread context */
static void sink_request_rewind_cb(pa_sink *s) {
struct userdata *u;
u->sink->parent.process_msg = sink_process_msg_cb;
u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
u->sink->update_requested_latency = sink_update_requested_latency_cb;
u->sink->request_rewind = sink_request_rewind_cb;
pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
return 0;
-
- case PA_SINK_MESSAGE_SET_STATE: {
- pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data);
-
- /* When set to running or idle for the first time, request a rewind
- * of the master sink to make sure we are heard immediately */
- if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
- pa_log_debug("Requesting rewind due to state change.");
- pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
- }
- break;
- }
}
return pa_sink_process_msg(o, code, data, offset, chunk);
return 0;
}
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ /* When set to running or idle for the first time, request a rewind
+ * of the master sink to make sure we are heard immediately */
+ if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) {
+ pa_log_debug("Requesting rewind due to state change.");
+ pa_sink_input_request_rewind(u->sink_input, 0, false, true, true);
+ }
+
+ return 0;
+}
+
/* Called from I/O thread context */
static void sink_request_rewind_cb(pa_sink *s) {
struct userdata *u;
u->sink->parent.process_msg = sink_process_msg_cb;
u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
u->sink->update_requested_latency = sink_update_requested_latency_cb;
u->sink->request_rewind = sink_request_rewind_cb;
pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
/* Called from IO context */
static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct userdata *u = PA_SINK(o)->userdata;
- bool do_trigger = false, quick = true;
- pa_sink_state_t new_state;
switch (code) {
return 0;
}
+ }
- case PA_SINK_MESSAGE_SET_STATE:
- new_state = PA_PTR_TO_UINT(data);
+ return pa_sink_process_msg(o, code, data, offset, chunk);
+}
- switch (new_state) {
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
+ bool do_trigger = false;
+ bool quick = true;
- case PA_SINK_SUSPENDED:
- pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
- if (!u->source || u->source_suspended)
- suspend(u);
+ switch (new_state) {
- do_trigger = true;
+ case PA_SINK_SUSPENDED:
+ pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
- u->sink_suspended = true;
- break;
+ if (!u->source || u->source_suspended)
+ suspend(u);
- case PA_SINK_IDLE:
- case PA_SINK_RUNNING:
+ do_trigger = true;
- if (u->sink->thread_info.state == PA_SINK_INIT) {
- do_trigger = true;
- quick = u->source && PA_SOURCE_IS_OPENED(u->source->thread_info.state);
- }
+ u->sink_suspended = true;
+ break;
- if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
+ case PA_SINK_IDLE:
+ case PA_SINK_RUNNING:
- if (!u->source || u->source_suspended) {
- if (unsuspend(u) < 0)
- return -1;
- quick = false;
- }
+ if (u->sink->thread_info.state == PA_SINK_INIT) {
+ do_trigger = true;
+ quick = u->source && PA_SOURCE_IS_OPENED(u->source->thread_info.state);
+ }
- do_trigger = true;
+ if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
- u->out_mmap_current = 0;
- u->out_mmap_saved_nfrags = 0;
+ if (!u->source || u->source_suspended) {
+ if (unsuspend(u) < 0)
+ return -1;
+ quick = false;
+ }
- u->sink_suspended = false;
- }
+ do_trigger = true;
- break;
+ u->out_mmap_current = 0;
+ u->out_mmap_saved_nfrags = 0;
- case PA_SINK_INVALID_STATE:
- case PA_SINK_UNLINKED:
- case PA_SINK_INIT:
- ;
+ u->sink_suspended = false;
}
break;
+
+ case PA_SINK_INVALID_STATE:
+ case PA_SINK_UNLINKED:
+ case PA_SINK_INIT:
+ ;
}
if (do_trigger)
trigger(u, new_state, u->source ? u->source->thread_info.state : PA_SOURCE_INVALID_STATE, quick);
- return pa_sink_process_msg(o, code, data, offset, chunk);
+ return 0;
}
static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct userdata *u = PA_SOURCE(o)->userdata;
- bool do_trigger = false, quick = true;
- pa_source_state_t new_state;
switch (code) {
*((int64_t*) data) = (int64_t)r;
return 0;
}
+ }
- case PA_SOURCE_MESSAGE_SET_STATE:
- new_state = PA_PTR_TO_UINT(data);
+ return pa_source_process_msg(o, code, data, offset, chunk);
+}
- switch (new_state) {
+/* Called from the IO thread. */
+static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
+ struct userdata *u;
+ bool do_trigger = false;
+ bool quick = true;
- case PA_SOURCE_SUSPENDED:
- pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state));
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
- if (!u->sink || u->sink_suspended)
- suspend(u);
+ switch (new_state) {
- do_trigger = true;
+ case PA_SOURCE_SUSPENDED:
+ pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state));
- u->source_suspended = true;
- break;
+ if (!u->sink || u->sink_suspended)
+ suspend(u);
- case PA_SOURCE_IDLE:
- case PA_SOURCE_RUNNING:
+ do_trigger = true;
- if (u->source->thread_info.state == PA_SOURCE_INIT) {
- do_trigger = true;
- quick = u->sink && PA_SINK_IS_OPENED(u->sink->thread_info.state);
- }
+ u->source_suspended = true;
+ break;
- if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) {
+ case PA_SOURCE_IDLE:
+ case PA_SOURCE_RUNNING:
- if (!u->sink || u->sink_suspended) {
- if (unsuspend(u) < 0)
- return -1;
- quick = false;
- }
+ if (u->source->thread_info.state == PA_SOURCE_INIT) {
+ do_trigger = true;
+ quick = u->sink && PA_SINK_IS_OPENED(u->sink->thread_info.state);
+ }
- do_trigger = true;
+ if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) {
- u->in_mmap_current = 0;
- u->in_mmap_saved_nfrags = 0;
+ if (!u->sink || u->sink_suspended) {
+ if (unsuspend(u) < 0)
+ return -1;
+ quick = false;
+ }
- u->source_suspended = false;
- }
- break;
+ do_trigger = true;
- case PA_SOURCE_UNLINKED:
- case PA_SOURCE_INIT:
- case PA_SOURCE_INVALID_STATE:
- ;
+ u->in_mmap_current = 0;
+ u->in_mmap_saved_nfrags = 0;
+ u->source_suspended = false;
}
break;
+
+ case PA_SOURCE_UNLINKED:
+ case PA_SOURCE_INIT:
+ case PA_SOURCE_INVALID_STATE:
+ ;
}
if (do_trigger)
trigger(u, u->sink ? u->sink->thread_info.state : PA_SINK_INVALID_STATE, new_state, quick);
- return pa_source_process_msg(o, code, data, offset, chunk);
+ return 0;
}
static void sink_get_volume(pa_sink *s) {
}
u->source->parent.process_msg = source_process_msg;
+ u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
u->source->userdata = u;
pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
}
u->sink->parent.process_msg = sink_process_msg;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
u->sink->userdata = u;
pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
pa_assert(u->raop);
switch (code) {
- case PA_SINK_MESSAGE_SET_STATE: {
- switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
- case PA_SINK_SUSPENDED: {
- pa_log_debug("RAOP: SUSPENDED");
-
- pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
-
- /* Issue a TEARDOWN if we are still connected */
- if (pa_raop_client_is_alive(u->raop)) {
- pa_raop_client_teardown(u->raop);
- }
-
- break;
- }
-
- case PA_SINK_IDLE: {
- pa_log_debug("RAOP: IDLE");
-
- /* Issue a FLUSH if we're comming from running state */
- if (u->sink->thread_info.state == PA_SINK_RUNNING) {
- pa_rtpoll_set_timer_disabled(u->rtpoll);
- pa_raop_client_flush(u->raop);
- }
-
- break;
- }
-
- case PA_SINK_RUNNING: {
- pa_usec_t now;
-
- pa_log_debug("RAOP: RUNNING");
-
- now = pa_rtclock_now();
- pa_smoother_reset(u->smoother, now, false);
-
- if (!pa_raop_client_is_alive(u->raop)) {
- /* Connecting will trigger a RECORD and start steaming */
- pa_raop_client_announce(u->raop);
- } else if (!pa_raop_client_can_stream(u->raop)) {
- /* RECORD alredy sent, simply start streaming */
- pa_raop_client_stream(u->raop);
- pa_rtpoll_set_timer_absolute(u->rtpoll, now);
- u->write_count = 0;
- u->start = now;
- }
-
- break;
- }
-
- case PA_SINK_UNLINKED:
- case PA_SINK_INIT:
- case PA_SINK_INVALID_STATE:
- break;
- }
-
- break;
- }
-
case PA_SINK_MESSAGE_GET_LATENCY: {
int64_t r = 0;
return pa_sink_process_msg(o, code, data, offset, chunk);
}
+/* Called from the IO thread. */
+static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
+ struct userdata *u;
+
+ pa_assert(s);
+ pa_assert_se(u = s->userdata);
+
+ switch (new_state) {
+ case PA_SINK_SUSPENDED:
+ pa_log_debug("RAOP: SUSPENDED");
+
+ pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
+
+ /* Issue a TEARDOWN if we are still connected */
+ if (pa_raop_client_is_alive(u->raop)) {
+ pa_raop_client_teardown(u->raop);
+ }
+
+ break;
+
+ case PA_SINK_IDLE:
+ pa_log_debug("RAOP: IDLE");
+
+ /* Issue a FLUSH if we're comming from running state */
+ if (u->sink->thread_info.state == PA_SINK_RUNNING) {
+ pa_rtpoll_set_timer_disabled(u->rtpoll);
+ pa_raop_client_flush(u->raop);
+ }
+
+ break;
+
+ case PA_SINK_RUNNING: {
+ pa_usec_t now;
+
+ pa_log_debug("RAOP: RUNNING");
+
+ now = pa_rtclock_now();
+ pa_smoother_reset(u->smoother, now, false);
+
+ if (!pa_raop_client_is_alive(u->raop)) {
+ /* Connecting will trigger a RECORD and start steaming */
+ pa_raop_client_announce(u->raop);
+ } else if (!pa_raop_client_can_stream(u->raop)) {
+ /* RECORD alredy sent, simply start streaming */
+ pa_raop_client_stream(u->raop);
+ pa_rtpoll_set_timer_absolute(u->rtpoll, now);
+ u->write_count = 0;
+ u->start = now;
+ }
+
+ break;
+ }
+
+ case PA_SINK_UNLINKED:
+ case PA_SINK_INIT:
+ case PA_SINK_INVALID_STATE:
+ break;
+ }
+
+ return 0;
+}
+
static void sink_set_volume_cb(pa_sink *s) {
struct userdata *u = s->userdata;
pa_cvolume hw;
}
u->sink->parent.process_msg = sink_process_msg;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
u->sink->userdata = u;
pa_assert(s);
s->set_state_in_main_thread = NULL;
+ s->set_state_in_io_thread = NULL;
s->get_volume = NULL;
s->set_volume = NULL;
s->write_volume = NULL;
(s->thread_info.state == PA_SINK_SUSPENDED && PA_SINK_IS_OPENED(PA_PTR_TO_UINT(userdata))) ||
(PA_SINK_IS_OPENED(s->thread_info.state) && PA_PTR_TO_UINT(userdata) == PA_SINK_SUSPENDED);
+ if (s->set_state_in_io_thread) {
+ int r;
+
+ if ((r = s->set_state_in_io_thread(s, PA_PTR_TO_UINT(userdata))) < 0)
+ return r;
+ }
+
s->thread_info.state = PA_PTR_TO_UINT(userdata);
if (s->thread_info.state == PA_SINK_SUSPENDED) {
bool set_mute_in_progress;
- /* Called when the main loop requests a state change. Called from
- * main loop context. If returns -1 the state change will be
- * inhibited. This will also be called even if only the suspend cause
+ /* Callbacks for doing things when the sink state and/or suspend cause is
+ * changed. It's fine to set either or both of the callbacks to NULL if the
+ * implementation doesn't have anything to do on state or suspend cause
* changes.
*
- * s->state and s->suspend_cause haven't been updated yet when this is
- * called, so the callback can get the old state through those variables.
+ * set_state_in_main_thread() is called first. The callback is allowed to
+ * report failure if and only if the sink changes its state from
+ * SUSPENDED to IDLE or RUNNING. (FIXME: It would make sense to allow
+ * failure also when changing state from INIT to IDLE or RUNNING, but
+ * currently that will crash pa_sink_put().) If
+ * set_state_in_main_thread() fails, set_state_in_io_thread() won't be
+ * called.
*
- * If set_state_in_main_thread() is successful, the IO thread will be
- * notified with the SET_STATE message. The message handler is allowed to
- * fail, in which case the old state is restored, and
- * set_state_in_main_thread() is called again. */
+ * If set_state_in_main_thread() is successful (or not set), then
+ * set_state_in_io_thread() is called. Again, failure is allowed if and
+ * only if the sink changes state from SUSPENDED to IDLE or RUNNING. If
+ * set_state_in_io_thread() fails, then set_state_in_main_thread() is
+ * called again, this time with the state parameter set to SUSPENDED and
+ * the suspend_cause parameter set to 0.
+ *
+ * pa_sink.state, pa_sink.thread_info.state and pa_sink.suspend_cause
+ * are updated only after all the callback calls. In case of failure, the
+ * state is set to SUSPENDED and the suspend cause is set to 0. */
int (*set_state_in_main_thread)(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause); /* may be NULL */
+ int (*set_state_in_io_thread)(pa_sink *s, pa_sink_state_t state); /* may be NULL */
/* Sink drivers that support hardware volume may set this
* callback. This is called when the current volume needs to be
pa_assert(s);
s->set_state_in_main_thread = NULL;
+ s->set_state_in_io_thread = NULL;
s->get_volume = NULL;
s->set_volume = NULL;
s->write_volume = NULL;
(s->thread_info.state == PA_SOURCE_SUSPENDED && PA_SOURCE_IS_OPENED(PA_PTR_TO_UINT(userdata))) ||
(PA_SOURCE_IS_OPENED(s->thread_info.state) && PA_PTR_TO_UINT(userdata) == PA_SOURCE_SUSPENDED);
+ if (s->set_state_in_io_thread) {
+ int r;
+
+ if ((r = s->set_state_in_io_thread(s, PA_PTR_TO_UINT(userdata))) < 0)
+ return r;
+ }
+
s->thread_info.state = PA_PTR_TO_UINT(userdata);
if (suspend_change) {
bool set_mute_in_progress;
- /* Called when the main loop requests a state change. Called from
- * main loop context. If returns -1 the state change will be
- * inhibited. This will also be called even if only the suspend cause
+ /* Callbacks for doing things when the source state and/or suspend cause is
+ * changed. It's fine to set either or both of the callbacks to NULL if the
+ * implementation doesn't have anything to do on state or suspend cause
* changes.
*
- * s->state and s->suspend_cause haven't been updated yet when this is
- * called, so the callback can get the old state through those variables.
+ * set_state_in_main_thread() is called first. The callback is allowed to
+ * report failure if and only if the source changes its state from
+ * SUSPENDED to IDLE or RUNNING. (FIXME: It would make sense to allow
+ * failure also when changing state from INIT to IDLE or RUNNING, but
+ * currently that will crash pa_source_put().) If
+ * set_state_in_main_thread() fails, set_state_in_io_thread() won't be
+ * called.
*
- * If set_state_in_main_thread() is successful, the IO thread will be
- * notified with the SET_STATE message. The message handler is allowed to
- * fail, in which case the old state is restored, and
- * set_state_in_main_thread() is called again. */
- int (*set_state_in_main_thread)(pa_source *source, pa_source_state_t state, pa_suspend_cause_t suspend_cause); /* may be NULL */
+ * If set_state_in_main_thread() is successful (or not set), then
+ * set_state_in_io_thread() is called. Again, failure is allowed if and
+ * only if the source changes state from SUSPENDED to IDLE or RUNNING. If
+ * set_state_in_io_thread() fails, then set_state_in_main_thread() is
+ * called again, this time with the state parameter set to SUSPENDED and
+ * the suspend_cause parameter set to 0.
+ *
+ * pa_source.state, pa_source.thread_info.state and pa_source.suspend_cause
+ * are updated only after all the callback calls. In case of failure, the
+ * state is set to SUSPENDED and the suspend cause is set to 0. */
+ int (*set_state_in_main_thread)(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause); /* may be NULL */
+ int (*set_state_in_io_thread)(pa_source *s, pa_source_state_t state); /* may be NULL */
/* Called when the volume is queried. Called from main loop
* context. If this is NULL a PA_SOURCE_MESSAGE_GET_VOLUME message