--- /dev/null
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2019 Sangchul Lee <sc11.lee@samsung.com>
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <stdlib.h>
+#include <stdio.h>
+#include <errno.h>
+#include <unistd.h>
+#include <sched.h>
+
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <errno.h>
+#include <fcntl.h>
+
+#include <pulse/xmalloc.h>
+#include <pulse/timeval.h>
+#include <pulse/rtclock.h>
+
+#include <pulsecore/core-error.h>
+#include <pulsecore/sink.h>
+#include <pulsecore/module.h>
+#include <pulsecore/core-util.h>
+#include <pulsecore/modargs.h>
+#include <pulsecore/log.h>
+#include <pulsecore/thread.h>
+#include <pulsecore/thread-mq.h>
+#include <pulsecore/rtpoll.h>
+#include <pulsecore/mutex.h>
+
+#include "acm.h"
+
+#define DEFAULT_SINK_NAME "ACM Sink"
+#define DEFAULT_WRITE_BLOCK_SIZE 1024
+#define DEFAULT_ACM_EVENT_SOCKET_PATH "/tmp/.acm_event_socket"
+#define DEFAULT_ACM_DATA_SOCKET_PATH "/tmp/.acm_data_socket"
+
+#define IPC_ERR -1
+#define IPC_MAX_MSG_LEN 256
+#define SOCKET_ENOUGH_DATA_SIZE 3072
+
+#define MSG_DATA_ENOUGH "data_enough"
+#define MSG_DRAIN_REQUEST "drain_request"
+#define MSG_DRAIN_COMPLETE "drain_complete"
+#define MSG_FLUSH_REQUEST "flush_request"
+#define MSG_SEND_PAUSE "acm_reader_queue_full"
+#define MSG_SEND_RESUME "acm_reader_queue_need_data"
+
+#define MSG_WAIT_TIMEOUT 1000 /* msec */
+
+//#define DUMP_PCM
+//#define DEBUG_LOG
+
+struct userdata {
+ pa_core *core;
+ pa_module *module;
+ pa_sink *sink;
+
+ pa_thread *thread;
+ pa_thread_mq thread_mq;
+ pa_rtpoll *rtpoll;
+
+ pa_usec_t timestamp;
+
+ size_t write_block_size;
+
+ pa_thread *msg_thread;
+ bool exit_msg_thread;
+ char *data_socket_path;
+ char *msg_socket_path;
+ int data_fd;
+ int msg_fd;
+ bool msg_sent;
+ int initial_sent_size;
+
+ pa_mutex *msg_mutex;
+ pa_cond *msg_cond;
+ bool need_pause;
+
+#ifdef DUMP_PCM
+ FILE *dump_fp;
+#endif
+};
+
+static int initialize_IPC(struct userdata *u);
+static void deinitialize_IPC(struct userdata *u);
+static int send_data(struct userdata *u, pa_memchunk *chunk);
+
+#ifdef DUMP_PCM
+#include <time.h>
+#define DUMP_PATH_PREFIX "/tmp/pcm"
+static void dump_open(struct userdata *u) {
+ char date_time[7];
+ struct timeval cur_time;
+ struct tm tm;
+ char *dump_time;
+ char *dump_path;
+
+ pa_assert(u);
+
+ pa_gettimeofday(&cur_time);
+ localtime_r(&cur_time.tv_sec, &tm);
+ memset(&date_time[0], 0x00, sizeof(date_time));
+ strftime(&date_time[0], sizeof(date_time), "%H%M%S", &tm);
+
+ dump_time = pa_sprintf_malloc("%s.%03ld", &date_time[0], cur_time.tv_usec / 1000);
+ dump_path = pa_sprintf_malloc("%s_%s_%uHz_%uch_%s.raw", DUMP_PATH_PREFIX,
+ pa_sample_format_to_string(u->sink->sample_spec.format),
+ u->sink->sample_spec.rate, u->sink->sample_spec.channels,
+ pa_strempty(dump_time));
+
+ if (u->dump_fp)
+ fclose(u->dump_fp);
+
+ u->dump_fp = fopen(dump_path, "w");
+ if (u->dump_fp)
+ pa_log_info("%s opened", dump_path);
+ else
+ pa_log_warn("%s open failed", dump_path);
+
+ pa_xfree(dump_time);
+ pa_xfree(dump_path);
+}
+
+static void dump_write(struct userdata *u, pa_memchunk *chunk) {
+ pa_assert(u);
+ pa_assert(chunk);
+
+ if (u->dump_fp) {
+ void *ptr = NULL;
+ if ((ptr = pa_memblock_acquire(chunk->memblock))) {
+ fwrite((uint8_t *)ptr + chunk->index, 1, chunk->length, u->dump_fp);
+ pa_log_info("ptr(%p), chunk->index(%zu), write data of index %p, length %zu",
+ (uint8_t *)ptr, chunk->index, (uint8_t *)ptr + chunk->index, chunk->length);
+ pa_memblock_release(chunk->memblock);
+ }
+ }
+}
+
+static void dump_close(struct userdata *u) {
+ pa_assert(u);
+
+ if (u->dump_fp) {
+ fclose(u->dump_fp);
+ u->dump_fp = NULL;
+ pa_log_info("dump closed");
+ }
+}
+#endif
+
+static void msg_cond_wait(struct userdata *u) {
+ pa_assert(u);
+
+ pa_mutex_lock(u->msg_mutex);
+ if (pa_cond_timedwait(u->msg_cond, u->msg_mutex, MSG_WAIT_TIMEOUT))
+ pa_log_error("msg cond wait failed, timeout..");
+ else
+ pa_log_info("msg cond wakeup");
+ pa_mutex_unlock(u->msg_mutex);
+}
+
+static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
+ struct userdata *u = PA_SINK(o)->userdata;
+
+ switch (code) {
+
+ case PA_SINK_MESSAGE_SET_STATE:
+ if (pa_sink_get_state(u->sink) == PA_SINK_SUSPENDED || pa_sink_get_state(u->sink) == PA_SINK_INIT) {
+ if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING || PA_PTR_TO_UINT(data) == PA_SINK_IDLE)
+ u->timestamp = pa_rtclock_now();
+ }
+#ifdef DUMP_PCM
+ if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING)
+ dump_open(u);
+ else
+ dump_close(u);
+#endif
+ break;
+
+ case PA_SINK_MESSAGE_GET_LATENCY: {
+ pa_usec_t now;
+
+ now = pa_rtclock_now();
+ *((int64_t*) data) = (int64_t)u->timestamp - (int64_t)now;
+ return 0;
+ }
+ }
+
+ return pa_sink_process_msg(o, code, data, offset, chunk);
+}
+
+static void process_rewind(struct userdata *u) {
+ /* Rewind not supported */
+ pa_sink_process_rewind(u->sink, 0);
+}
+
+typedef enum {
+ IPC_CHANNEL_MSG,
+ IPC_CHANNEL_DATA
+} ipc_channel_t;
+
+static int ipc_get_client_fd(struct userdata *u, ipc_channel_t channel)
+{
+ struct sockaddr_un address;
+ int len, ret = IPC_ERR;
+ int sockfd;
+ int n_opt_val;
+ unsigned int n_opt_len = sizeof (n_opt_val);
+ char *socket_path = u->msg_socket_path;
+
+ /*Create socket*/
+ if ((sockfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
+ pa_log_error("socket failure: %s", pa_cstrerror(errno));
+ return -1;
+ }
+
+ if (channel == IPC_CHANNEL_DATA) {
+ n_opt_val = 19200;
+ setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (char *) &n_opt_val, n_opt_len);
+ socket_path = u->data_socket_path;
+ }
+ getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (char *) &n_opt_val, &n_opt_len);
+ pa_log_info("sockfd: %d, socket send buffer size: %d", sockfd, n_opt_val);
+
+ if (fcntl(sockfd, F_SETFD, FD_CLOEXEC) < 0) {
+ pa_log_error("unable to set on ctrls socket fd %d: %s", sockfd, pa_cstrerror(errno));
+ close(sockfd);
+ return -1;
+ }
+
+ if (channel == IPC_CHANNEL_MSG) {
+ int flag = fcntl(sockfd, F_GETFL, 0);
+ fcntl(sockfd, F_SETFL, flag | O_NONBLOCK);
+ }
+
+ memset(&address, 0, sizeof(address));
+ address.sun_family = AF_UNIX;
+ strncpy(address.sun_path, socket_path, sizeof(address.sun_path));
+ len = sizeof(address);
+
+ if ((ret = connect(sockfd, (struct sockaddr *)&address, len)) < 0) {
+ pa_log_error("connect failure: %s, path: %s", pa_cstrerror(errno), socket_path);
+ close(sockfd);
+ return -1;
+ }
+
+ pa_log_debug("connected well, fd[%d] for [%s]", sockfd, socket_path);
+
+ return sockfd;
+}
+
+static int ipc_push_data(struct userdata *u, void *data, int size) {
+ int sent = 0;
+
+ pa_assert(u);
+
+ if (u->data_fd < 0)
+ return -1;
+
+ if ((sent = send(u->data_fd, data, size, 0)) < 0) {
+ pa_log_error("[fd:%d] fail to send data: %s", u->data_fd, pa_cstrerror(errno));
+ return -1;
+ }
+
+ if (!u->msg_sent) {
+ u->initial_sent_size += sent;
+ if (u->initial_sent_size > SOCKET_ENOUGH_DATA_SIZE ) {
+ int msg_sent_len = 0;
+ pa_log_info("send data enough message to ipc server");
+ if ((msg_sent_len = send(u->msg_fd, MSG_DATA_ENOUGH, strlen(MSG_DATA_ENOUGH), 0)) < 0)
+ pa_log_error("send message was failed, msg(%s), msg_sent_len =%d", MSG_DATA_ENOUGH, msg_sent_len);
+ u->msg_sent = true;
+ u->initial_sent_size = 0;
+ }
+ }
+
+#ifdef DEBUG_LOG
+ if (sent > 0)
+ pa_log_debug("sent length: %d", sent);
+#endif
+
+ return sent;
+}
+
+static int initialize_IPC(struct userdata *u) {
+ int retry = 10;
+
+ pa_assert(u);
+
+ while (retry--) {
+ u->msg_fd = ipc_get_client_fd(u, IPC_CHANNEL_MSG);
+ if (u->msg_fd < 0) {
+ pa_log_warn("msg_fd is not connected. try again(%d)", retry);
+ usleep(100 * 1000);
+ continue;
+ }
+ pa_log_debug("got fd for message %d", u->msg_fd);
+ break;
+ }
+ if (u->msg_fd < 0) {
+ pa_log_error("failed to get msg_fd");
+ return -1;
+ }
+
+ retry = 10;
+ while (retry--) {
+ u->data_fd = ipc_get_client_fd(u, IPC_CHANNEL_DATA);
+ if (u->data_fd < 0) {
+ pa_log_warn("data_fd is not connected. try again(%d)", retry);
+ usleep(100 * 1000);
+ continue;
+ }
+ pa_log_debug("got fd for data %d", u->data_fd);
+ break;
+ }
+ if (u->data_fd < 0) {
+ pa_log_error("failed to get data_fd");
+ return -1;
+ }
+
+ u->msg_sent = false;
+ u->initial_sent_size = 0;
+
+ return 0;
+}
+
+static void deinitialize_IPC(struct userdata *u) {
+ pa_assert(u);
+
+ if (u->msg_fd >= 0) {
+ int msg_sent_len = 0;
+ if ((msg_sent_len = send(u->msg_fd, MSG_DRAIN_REQUEST, strlen(MSG_DRAIN_REQUEST), 0)) < 0) {
+ pa_log_error("send message was failed, msg(%s), msg_sent_len =%d", MSG_DRAIN_REQUEST, msg_sent_len);
+ } else {
+ pa_log_info("msg cond wait for drain completed");
+ msg_cond_wait(u);
+ }
+ }
+
+ u->msg_sent = false;
+ u->initial_sent_size = 0;
+
+ if (u->data_fd >= 0) {
+ close(u->data_fd);
+ u->data_fd = -1;
+ }
+
+ if (u->msg_fd >= 0) {
+ close(u->msg_fd);
+ u->msg_fd = -1;
+ }
+}
+
+static int send_data(struct userdata *u, pa_memchunk *chunk) {
+ size_t index = 0;
+ size_t length = 0;
+ size_t total_size = 0;
+ void *p;
+
+ pa_assert(u);
+ pa_assert(chunk);
+
+ p = pa_memblock_acquire(chunk->memblock);
+ pa_assert(p);
+
+ index = chunk->index;
+ length = chunk->length;
+
+ for (;;) {
+ ssize_t sent;
+
+ if (u->need_pause) {
+ pa_log_info("msg cond wait for resume");
+ msg_cond_wait(u);
+ }
+
+ sent = ipc_push_data(u, p + index, u->write_block_size);
+
+ if (sent < 0) {
+ if (errno == EINTR || errno == EAGAIN) {
+ pa_log_warn("Failed to write data to fd: %s", pa_cstrerror(errno));
+ continue;
+ }
+ pa_log_error("Failed to write data to fd: %s", pa_cstrerror(errno));
+ return -1;
+ } else {
+ index += (size_t) sent;
+ if (length <= sent) {
+ total_size += sent;
+#ifdef DEBUG_LOG
+ pa_log_debug("Wrote %zu bytes.", total_size);
+#endif
+ break;
+ }
+ length -= (size_t) sent;
+ total_size += (size_t) sent;
+ if (length < u->write_block_size) {
+#ifdef DEBUG_LOG
+ pa_log_warn("Unexpected write, remained %zu, skip it", length);
+#endif
+ chunk->index = index;
+ chunk->length = length;
+ goto finish;
+ }
+ }
+ }
+
+finish:
+ pa_memblock_release(chunk->memblock);
+
+ return 0;
+}
+
+static void process_render(struct userdata *u, pa_usec_t now) {
+ size_t ate = 0;
+
+ pa_assert(u);
+
+ while (u->timestamp < now) {
+ pa_memchunk chunk;
+
+ pa_sink_render_full(u->sink, u->sink->thread_info.max_request, &chunk);
+
+#ifdef DUMP_PCM
+ dump_write(u, &chunk);
+#endif
+ if (!send_data(u, &chunk)) {
+ u->timestamp += pa_bytes_to_usec(chunk.length, &u->sink->sample_spec);
+ ate += chunk.length;
+ }
+
+ pa_memblock_unref(chunk.memblock);
+
+ if (ate >= u->sink->thread_info.max_request)
+ break;
+ }
+#ifdef DEBUG_LOG
+ pa_log_debug("end of while loop, timestamp[%" PRIu64 "]", u->timestamp);
+#endif
+}
+
+static void thread_func(void *userdata) {
+ struct userdata *u = userdata;
+
+ pa_assert(u);
+
+ pa_log_debug("Thread starting up");
+
+ pa_thread_mq_install(&u->thread_mq);
+
+ u->timestamp = pa_rtclock_now();
+
+ for (;;) {
+ pa_usec_t now = 0;
+ int ret;
+
+ if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
+ now = pa_rtclock_now();
+#ifdef DEBUG_LOG
+ pa_log_debug("now[%" PRIu64 "]", now);
+#endif
+ }
+
+ if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
+ process_rewind(u);
+
+ if (PA_SINK_IS_RUNNING(u->sink->thread_info.state)) {
+#ifdef DEBUG_LOG
+ pa_log_debug("timestamp[%" PRIu64 "], now[%" PRIu64 "]", u->timestamp, now);
+#endif
+ if (u->timestamp <= now)
+ process_render(u, now);
+
+ pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp);
+ } else {
+ pa_rtpoll_set_timer_disabled(u->rtpoll);
+ }
+
+ /* Hmm, nothing to do. Let's sleep */
+ if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
+ goto fail;
+
+ if (ret == 0)
+ goto finish;
+ }
+
+fail:
+ /* If this was no regular exit from the loop we have to continue
+ * processing messages until we received PA_MESSAGE_SHUTDOWN */
+ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
+ pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
+
+finish:
+ pa_log_debug("Thread shutting down");
+}
+
+static void msg_thread_func(void *userdata) {
+ struct userdata *u = userdata;
+ char recv_msg[IPC_MAX_MSG_LEN];
+ int recv_len = 0;
+
+ pa_assert(u);
+
+ pa_log_debug("MSG thread starting up");
+
+ while (!u->exit_msg_thread) {
+ if (u->msg_fd < 0) {
+ usleep(10*1000);
+ continue;
+ }
+
+ if ((recv_len = recv(u->msg_fd, recv_msg , IPC_MAX_MSG_LEN, 0)) > 0) {
+ if (!strcmp(recv_msg, MSG_DRAIN_COMPLETE)) {
+ pa_log_debug("got drain complete signal [%s]", recv_msg);
+ pa_cond_signal(u->msg_cond, false);
+ } else if (!strcmp(recv_msg, MSG_SEND_PAUSE)) {
+ pa_log_debug("got pause signal [%s]", recv_msg);
+ u->need_pause = true;
+ } else if (!strcmp(recv_msg, MSG_SEND_RESUME) && u->need_pause) {
+ pa_log_debug("got resume signal [%s]", recv_msg);
+ u->need_pause = false;
+ pa_cond_signal(u->msg_cond, false);
+ }
+ }
+
+ sched_yield();
+
+ usleep(10*1000);
+ }
+
+ pa_log_debug("MSG Thread shutting down");
+}
+
+int pa_acm_init(pa_module *m, const char* const v_modargs[]) {
+ struct userdata *u;
+ pa_sample_spec ss;
+ pa_channel_map map;
+ uint32_t write_block_size;
+ pa_modargs *ma;
+ pa_sink_new_data data;
+ char st[PA_SAMPLE_SPEC_SNPRINT_MAX];
+ char cm[PA_CHANNEL_MAP_SNPRINT_MAX];
+
+ pa_assert(m);
+
+ if (!(ma = pa_modargs_new(m->argument, v_modargs))) {
+ pa_log("Failed to parse module arguments.");
+ goto fail;
+ }
+
+ ss = m->core->default_sample_spec;
+ map = m->core->default_channel_map;
+ if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
+ pa_log("Invalid sample format specification or channel map");
+ goto fail;
+ }
+ pa_log_info("sample spec: %s, channel map: %s",
+ pa_sample_spec_snprint(st, sizeof(st), &ss),
+ pa_channel_map_snprint(cm, sizeof(cm), &map));
+
+ write_block_size = DEFAULT_WRITE_BLOCK_SIZE;
+ if (pa_modargs_get_value_u32(ma, "write_block_size", &write_block_size) < 0) {
+ pa_log("Failed to parse write_block_size argument");
+ goto fail;
+ }
+ pa_log_info("write_block_size: %d", write_block_size);
+
+ u = pa_xnew0(struct userdata, 1);
+ u->msg_socket_path = pa_xstrdup(pa_modargs_get_value(ma, "msg_socket", DEFAULT_ACM_EVENT_SOCKET_PATH));
+ u->data_socket_path = pa_xstrdup(pa_modargs_get_value(ma, "data_socket", DEFAULT_ACM_DATA_SOCKET_PATH));
+ u->core = m->core;
+ u->module = m;
+ m->userdata = u;
+
+ if (initialize_IPC(u)) {
+ pa_log("Failed to initialize IPC");
+ goto fail;
+ }
+
+ u->rtpoll = pa_rtpoll_new();
+ if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
+ pa_log("pa_thread_mq_init() failed.");
+ goto fail;
+ }
+
+ pa_sink_new_data_init(&data);
+ data.driver = __FILE__;
+ data.module = m;
+ pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
+ pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Send PCM data to ACM core via socket fd");
+ pa_proplist_sets(data.proplist, PA_PROP_DEVICE_API, "acm");
+ pa_sink_new_data_set_sample_spec(&data, &ss);
+ pa_sink_new_data_set_channel_map(&data, &map);
+
+ u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);
+
+ pa_sink_new_data_done(&data);
+
+ if (!u->sink) {
+ pa_log("Failed to create sink.");
+ goto fail;
+ }
+
+ u->sink->parent.process_msg = sink_process_msg;
+ u->sink->userdata = u;
+
+ pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
+ pa_sink_set_rtpoll(u->sink, u->rtpoll);
+
+ if (!u->msg_mutex)
+ u->msg_mutex = pa_mutex_new(false, false);
+
+ if (!u->msg_cond)
+ u->msg_cond = pa_cond_new();
+
+ if (!(u->thread = pa_thread_new("acm-sink", thread_func, u))) {
+ pa_log("Failed to create thread");
+ goto fail;
+ }
+
+ if (!(u->msg_thread = pa_thread_new("acm-msg-thread", msg_thread_func, u))) {
+ pa_log("Failed to create message thread");
+ goto fail;
+ }
+
+ u->write_block_size = pa_frame_align(write_block_size, &u->sink->sample_spec);
+ pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(u->write_block_size, &u->sink->sample_spec));
+ pa_sink_set_max_request(u->sink, u->write_block_size);
+
+ pa_sink_put(u->sink);
+
+ pa_modargs_free(ma);
+
+ return 0;
+
+fail:
+ pa_acm_done(m);
+
+ if (ma)
+ pa_modargs_free(ma);
+
+ return -1;
+}
+
+void pa_acm_done(pa_module *m) {
+ struct userdata* u;
+
+ pa_assert(m);
+
+ pa_log("pa_acm_done()");
+
+ if (!(u = m->userdata))
+ return;
+
+ if (u->sink)
+ pa_sink_unlink(u->sink);
+
+ if (u->thread) {
+ pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
+ pa_thread_free(u->thread);
+ }
+
+ if (u->msg_thread) {
+ u->exit_msg_thread = true;
+ pa_thread_free(u->msg_thread);
+ }
+
+ pa_thread_mq_done(&u->thread_mq);
+
+ if (u->sink)
+ pa_sink_unref(u->sink);
+
+ if (u->rtpoll)
+ pa_rtpoll_free(u->rtpoll);
+
+ deinitialize_IPC(u);
+
+ if (u->msg_mutex) {
+ pa_mutex_free(u->msg_mutex);
+ u->msg_mutex = NULL;
+ }
+
+ if (u->msg_cond) {
+ pa_cond_free(u->msg_cond);
+ u->msg_cond = NULL;
+ }
+
+ pa_xfree(u->msg_socket_path);
+ pa_xfree(u->data_socket_path);
+ pa_xfree(u);
+}