acm-sink: Add new sink module for ACM service 37/206937/7 accepted/tizen/unified/20190610.082555 submit/tizen/20190606.233237
authorSangchul Lee <sc11.lee@samsung.com>
Tue, 14 May 2019 05:13:50 +0000 (14:13 +0900)
committerSangchul Lee <sc11.lee@samsung.com>
Wed, 5 Jun 2019 07:31:26 +0000 (16:31 +0900)
It is for Tizen speaker profile.
pulseaudio-module-acm.rpm is newly added.

[Version] 11.1.46
[Issue type] New feature

Change-Id: If3ef8993014f61e61ed65ba8dc658418e08af169
Signed-off-by: Sangchul Lee <sc11.lee@samsung.com>
Makefile.am
configure.ac
packaging/pulseaudio-modules-tizen.spec
src/acm.c [new file with mode: 0644]
src/acm.h [new file with mode: 0644]
src/module-acm-sink.c [new file with mode: 0644]
src/module-tizenaudio-policy.c

index aa0a418..424938d 100644 (file)
@@ -43,6 +43,11 @@ if ENABLE_HALTC
 pulsemodlibexec_LTLIBRARIES += \
                module-tizenaudio-haltc.la
 endif
+if ENABLE_ACM
+pulsemodlibexec_LTLIBRARIES += \
+               module-acm-sink.la
+endif
+
 
 # These are generated by an M4 script
 SYMDEF_FILES = \
@@ -59,6 +64,12 @@ SYMDEF_FILES += \
                module-tizenaudio-haltc-symdef.h
 endif
 
+if ENABLE_ACM
+SYMDEF_FILES += \
+               module-acm-sink-symdef.h
+endif
+
+
 if ENABLE_VCONF_HELPER
 SYMDEF_FILES += \
                module-vconf-symdef.h
@@ -135,6 +146,13 @@ module_poweroff_la_LDFLAGS = $(MODULE_LDFLAGS)
 module_poweroff_la_LIBADD = $(MODULE_LIBADD) $(DBUS_LIBS) libhal-interface.la
 module_poweroff_la_CFLAGS = $(MODULE_CFLAGS) $(DBUS_CFLAGS)
 
+if ENABLE_ACM
+module_acm_sink_la_SOURCES = src/module-acm-sink.c   src/acm.c   src/acm.h
+module_acm_sink_la_LDFLAGS = $(MODULE_LDFLAGS)
+module_acm_sink_la_LIBADD = $(MODULE_LIBADD)
+module_acm_sink_la_CFLAGS = $(MODULE_CFLAGS)
+endif
+
 if ENABLE_VCONF_HELPER
 pulsemodlibexec_LTLIBRARIES += module-vconf.la
 
index f578a47..f38d675 100644 (file)
@@ -373,6 +373,18 @@ AC_ARG_ENABLE(haltc, AC_HELP_STRING([--enable-haltc], [using haltc]),
 AM_CONDITIONAL(ENABLE_HALTC, test "x$ENABLE_HALTC" = "xyes")
 dnl end --------------------------------------------------------------------
 
+dnl use acm ----------------------------------------------------------------
+AC_ARG_ENABLE(acm, AC_HELP_STRING([--enable-acm], [using acm]),
+[
+ case "${enableval}" in
+        yes) ENABLE_ACM=yes ;;
+        no)  ENABLE_ACM=no ;;
+        *)   AC_MSG_ERROR(bad value ${enableval} for --enable-acm) ;;
+ esac
+ ],[USE_ACM=no])
+AM_CONDITIONAL(ENABLE_ACM, test "x$ENABLE_ACM" = "xyes")
+dnl end --------------------------------------------------------------------
+
 #### D-Bus support (optional) ####
 
 AC_ARG_ENABLE([dbus],
index f06556f..1d8ca37 100644 (file)
@@ -1,6 +1,6 @@
 Name:             pulseaudio-modules-tizen
 Summary:          Pulseaudio modules for Tizen
-Version:          11.1.45
+Version:          11.1.46
 Release:          0
 Group:            Multimedia/Audio
 License:          LGPL-2.1+
@@ -28,6 +28,13 @@ Requires(postun): /sbin/ldconfig
 %description
 This package contains pulseaudio modules for tizen audio system.
 
+%package -n pulseaudio-module-acm
+Summary: PA module-acm-sink
+Group:   Multimedia/Audio
+
+%description -n pulseaudio-module-acm
+PulseAudio module-acm-sink for sending PCM data to ACM core.
+
 %prep
 %setup -q
 
@@ -37,6 +44,7 @@ export CFLAGS="%{optflags} -fno-strict-aliasing -D__TIZEN__ -DSYSCONFDIR=\\\"%{_
 export LD_AS_NEEDED=0
 %reconfigure --prefix=%{_prefix} \
         --disable-static \
+        --enable-acm
 %if "%{tizen_profile_name}" == "tv"
         --enable-vconf-helper
 %endif
@@ -57,10 +65,15 @@ install -m 0644 %SOURCE1 %{buildroot}%{_tmpfilesdir}/pulseaudio.conf
 /sbin/ldconfig
 
 %files
-%manifest pulseaudio-modules-tizen.manifest
+%manifest %{name}.manifest
 %defattr(-,root,root,-)
 %license LICENSE.LGPL-2.1+
-%{_libdir}/pulse-11.1/modules/module-*.so
+%{_libdir}/pulse-11.1/modules/module-hw-keysound.so
+%{_libdir}/pulse-11.1/modules/module-poweroff.so
+%{_libdir}/pulse-11.1/modules/module-sound-player.so
+%{_libdir}/pulse-11.1/modules/module-tizenaudio-policy.so
+%{_libdir}/pulse-11.1/modules/module-tizenaudio-sink.so
+%{_libdir}/pulse-11.1/modules/module-tizenaudio-source.so
 %{_libdir}/pulse-11.1/modules/libhal-interface.so
 %{_libdir}/pulse-11.1/modules/libcommunicator.so
 %{_tmpfilesdir}/pulseaudio.conf
@@ -69,3 +82,10 @@ install -m 0644 %SOURCE1 %{buildroot}%{_tmpfilesdir}/pulseaudio.conf
 %{_libexecdir}/pulse/vconf-helper
 %endif
 %{_libdir}/ladspa/*.so
+
+%files -n pulseaudio-module-acm
+%manifest %{name}.manifest
+%defattr(-,root,root,-)
+%license LICENSE.LGPL-2.1+
+%{_libdir}/pulse-11.1/modules/module-acm-sink.so
+
diff --git a/src/acm.c b/src/acm.c
new file mode 100644 (file)
index 0000000..f6023f2
--- /dev/null
+++ b/src/acm.c
@@ -0,0 +1,709 @@
+/***
+  This file is part of PulseAudio.
+
+  Copyright 2019 Sangchul Lee <sc11.lee@samsung.com>
+
+  PulseAudio is free software; you can redistribute it and/or modify
+  it under the terms of the GNU Lesser General Public License as published
+  by the Free Software Foundation; either version 2.1 of the License,
+  or (at your option) any later version.
+
+  PulseAudio is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public License
+  along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <stdlib.h>
+#include <stdio.h>
+#include <errno.h>
+#include <unistd.h>
+#include <sched.h>
+
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <errno.h>
+#include <fcntl.h>
+
+#include <pulse/xmalloc.h>
+#include <pulse/timeval.h>
+#include <pulse/rtclock.h>
+
+#include <pulsecore/core-error.h>
+#include <pulsecore/sink.h>
+#include <pulsecore/module.h>
+#include <pulsecore/core-util.h>
+#include <pulsecore/modargs.h>
+#include <pulsecore/log.h>
+#include <pulsecore/thread.h>
+#include <pulsecore/thread-mq.h>
+#include <pulsecore/rtpoll.h>
+#include <pulsecore/mutex.h>
+
+#include "acm.h"
+
+#define DEFAULT_SINK_NAME "ACM Sink"
+#define DEFAULT_WRITE_BLOCK_SIZE 1024
+#define DEFAULT_ACM_EVENT_SOCKET_PATH "/tmp/.acm_event_socket"
+#define DEFAULT_ACM_DATA_SOCKET_PATH "/tmp/.acm_data_socket"
+
+#define IPC_ERR -1
+#define IPC_MAX_MSG_LEN 256
+#define SOCKET_ENOUGH_DATA_SIZE 3072
+
+#define MSG_DATA_ENOUGH "data_enough"
+#define MSG_DRAIN_REQUEST "drain_request"
+#define MSG_DRAIN_COMPLETE "drain_complete"
+#define MSG_FLUSH_REQUEST "flush_request"
+#define MSG_SEND_PAUSE "acm_reader_queue_full"
+#define MSG_SEND_RESUME "acm_reader_queue_need_data"
+
+#define MSG_WAIT_TIMEOUT 1000  /* msec */
+
+//#define DUMP_PCM
+//#define DEBUG_LOG
+
+struct userdata {
+    pa_core *core;
+    pa_module *module;
+    pa_sink *sink;
+
+    pa_thread *thread;
+    pa_thread_mq thread_mq;
+    pa_rtpoll *rtpoll;
+
+    pa_usec_t timestamp;
+
+    size_t write_block_size;
+
+    pa_thread *msg_thread;
+    bool exit_msg_thread;
+    char *data_socket_path;
+    char *msg_socket_path;
+    int data_fd;
+    int msg_fd;
+    bool msg_sent;
+    int initial_sent_size;
+
+    pa_mutex *msg_mutex;
+    pa_cond *msg_cond;
+    bool need_pause;
+
+#ifdef DUMP_PCM
+    FILE *dump_fp;
+#endif
+};
+
+static int initialize_IPC(struct userdata *u);
+static void deinitialize_IPC(struct userdata *u);
+static int send_data(struct userdata *u, pa_memchunk *chunk);
+
+#ifdef DUMP_PCM
+#include <time.h>
+#define DUMP_PATH_PREFIX "/tmp/pcm"
+static void dump_open(struct userdata *u) {
+    char date_time[7];
+    struct timeval cur_time;
+    struct tm tm;
+    char *dump_time;
+    char *dump_path;
+
+    pa_assert(u);
+
+    pa_gettimeofday(&cur_time);
+    localtime_r(&cur_time.tv_sec, &tm);
+    memset(&date_time[0], 0x00, sizeof(date_time));
+    strftime(&date_time[0], sizeof(date_time), "%H%M%S", &tm);
+
+    dump_time = pa_sprintf_malloc("%s.%03ld", &date_time[0], cur_time.tv_usec / 1000);
+    dump_path = pa_sprintf_malloc("%s_%s_%uHz_%uch_%s.raw", DUMP_PATH_PREFIX,
+                                                            pa_sample_format_to_string(u->sink->sample_spec.format),
+                                                            u->sink->sample_spec.rate, u->sink->sample_spec.channels,
+                                                            pa_strempty(dump_time));
+
+    if (u->dump_fp)
+        fclose(u->dump_fp);
+
+    u->dump_fp = fopen(dump_path, "w");
+    if (u->dump_fp)
+        pa_log_info("%s opened",  dump_path);
+    else
+        pa_log_warn("%s open failed", dump_path);
+
+    pa_xfree(dump_time);
+    pa_xfree(dump_path);
+}
+
+static void dump_write(struct userdata *u, pa_memchunk *chunk) {
+    pa_assert(u);
+    pa_assert(chunk);
+
+    if (u->dump_fp) {
+        void *ptr = NULL;
+        if ((ptr = pa_memblock_acquire(chunk->memblock))) {
+            fwrite((uint8_t *)ptr + chunk->index, 1, chunk->length, u->dump_fp);
+            pa_log_info("ptr(%p), chunk->index(%zu), write data of index %p, length %zu",
+                (uint8_t *)ptr, chunk->index, (uint8_t *)ptr + chunk->index, chunk->length);
+            pa_memblock_release(chunk->memblock);
+        }
+    }
+}
+
+static void dump_close(struct userdata *u) {
+    pa_assert(u);
+
+    if (u->dump_fp) {
+        fclose(u->dump_fp);
+        u->dump_fp = NULL;
+        pa_log_info("dump closed");
+    }
+}
+#endif
+
+static void msg_cond_wait(struct userdata *u) {
+    pa_assert(u);
+
+    pa_mutex_lock(u->msg_mutex);
+    if (pa_cond_timedwait(u->msg_cond, u->msg_mutex, MSG_WAIT_TIMEOUT))
+        pa_log_error("msg cond wait failed, timeout..");
+    else
+        pa_log_info("msg cond wakeup");
+    pa_mutex_unlock(u->msg_mutex);
+}
+
+static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
+    struct userdata *u = PA_SINK(o)->userdata;
+
+    switch (code) {
+
+        case PA_SINK_MESSAGE_SET_STATE:
+            if (pa_sink_get_state(u->sink) == PA_SINK_SUSPENDED || pa_sink_get_state(u->sink) == PA_SINK_INIT) {
+                if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING || PA_PTR_TO_UINT(data) == PA_SINK_IDLE)
+                    u->timestamp = pa_rtclock_now();
+            }
+#ifdef DUMP_PCM
+            if  (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING)
+                dump_open(u);
+            else
+                dump_close(u);
+#endif
+            break;
+
+        case PA_SINK_MESSAGE_GET_LATENCY: {
+            pa_usec_t now;
+
+            now = pa_rtclock_now();
+            *((int64_t*) data) = (int64_t)u->timestamp - (int64_t)now;
+            return 0;
+        }
+    }
+
+    return pa_sink_process_msg(o, code, data, offset, chunk);
+}
+
+static void process_rewind(struct userdata *u) {
+    /* Rewind not supported */
+    pa_sink_process_rewind(u->sink, 0);
+}
+
+typedef enum {
+    IPC_CHANNEL_MSG,
+    IPC_CHANNEL_DATA
+} ipc_channel_t;
+
+static int ipc_get_client_fd(struct userdata *u, ipc_channel_t channel)
+{
+    struct sockaddr_un address;
+    int len, ret = IPC_ERR;
+    int sockfd;
+    int n_opt_val;
+    unsigned int n_opt_len = sizeof (n_opt_val);
+    char *socket_path = u->msg_socket_path;
+
+    /*Create socket*/
+    if ((sockfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
+        pa_log_error("socket failure: %s",  pa_cstrerror(errno));
+        return -1;
+    }
+
+    if (channel == IPC_CHANNEL_DATA) {
+        n_opt_val = 19200;
+        setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (char *) &n_opt_val, n_opt_len);
+        socket_path = u->data_socket_path;
+    }
+    getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (char *) &n_opt_val, &n_opt_len);
+    pa_log_info("sockfd: %d,  socket send buffer size: %d", sockfd, n_opt_val);
+
+    if (fcntl(sockfd, F_SETFD, FD_CLOEXEC) < 0) {
+        pa_log_error("unable to set on ctrls socket fd %d: %s", sockfd, pa_cstrerror(errno));
+        close(sockfd);
+        return -1;
+    }
+
+    if (channel == IPC_CHANNEL_MSG) {
+        int flag = fcntl(sockfd, F_GETFL, 0);
+        fcntl(sockfd, F_SETFL, flag | O_NONBLOCK);
+    }
+
+    memset(&address, 0, sizeof(address));
+    address.sun_family = AF_UNIX;
+    strncpy(address.sun_path, socket_path, sizeof(address.sun_path));
+    len = sizeof(address);
+
+    if ((ret = connect(sockfd, (struct sockaddr *)&address, len)) < 0) {
+        pa_log_error("connect failure: %s, path: %s", pa_cstrerror(errno), socket_path);
+        close(sockfd);
+        return -1;
+    }
+
+    pa_log_debug("connected well, fd[%d] for [%s]", sockfd, socket_path);
+
+    return sockfd;
+}
+
+static int ipc_push_data(struct userdata *u, void *data, int size) {
+    int sent = 0;
+
+    pa_assert(u);
+
+    if (u->data_fd < 0)
+        return -1;
+
+    if ((sent = send(u->data_fd, data, size, 0)) < 0) {
+        pa_log_error("[fd:%d] fail to send data: %s", u->data_fd, pa_cstrerror(errno));
+        return -1;
+    }
+
+    if (!u->msg_sent) {
+        u->initial_sent_size += sent;
+        if (u->initial_sent_size > SOCKET_ENOUGH_DATA_SIZE ) {
+            int msg_sent_len = 0;
+            pa_log_info("send data enough message to ipc server");
+            if ((msg_sent_len = send(u->msg_fd, MSG_DATA_ENOUGH, strlen(MSG_DATA_ENOUGH), 0)) < 0)
+                pa_log_error("send message was failed, msg(%s), msg_sent_len =%d", MSG_DATA_ENOUGH, msg_sent_len);
+            u->msg_sent = true;
+            u->initial_sent_size = 0;
+        }
+    }
+
+#ifdef DEBUG_LOG
+    if (sent > 0)
+        pa_log_debug("sent length:  %d", sent);
+#endif
+
+    return sent;
+}
+
+static int initialize_IPC(struct userdata *u) {
+    int retry = 10;
+
+    pa_assert(u);
+
+    while (retry--) {
+        u->msg_fd = ipc_get_client_fd(u, IPC_CHANNEL_MSG);
+        if (u->msg_fd < 0) {
+            pa_log_warn("msg_fd is not connected. try again(%d)", retry);
+            usleep(100 * 1000);
+            continue;
+        }
+        pa_log_debug("got fd for message %d", u->msg_fd);
+        break;
+    }
+    if (u->msg_fd < 0) {
+        pa_log_error("failed to get msg_fd");
+        return -1;
+    }
+
+    retry = 10;
+    while (retry--) {
+        u->data_fd = ipc_get_client_fd(u, IPC_CHANNEL_DATA);
+        if (u->data_fd < 0) {
+            pa_log_warn("data_fd is not connected. try again(%d)", retry);
+            usleep(100 * 1000);
+            continue;
+        }
+        pa_log_debug("got fd for data %d", u->data_fd);
+        break;
+    }
+    if (u->data_fd < 0) {
+        pa_log_error("failed to get data_fd");
+        return -1;
+    }
+
+    u->msg_sent = false;
+    u->initial_sent_size = 0;
+
+    return 0;
+}
+
+static void deinitialize_IPC(struct userdata *u) {
+    pa_assert(u);
+
+    if (u->msg_fd >= 0) {
+        int msg_sent_len = 0;
+        if ((msg_sent_len = send(u->msg_fd, MSG_DRAIN_REQUEST, strlen(MSG_DRAIN_REQUEST), 0)) < 0) {
+            pa_log_error("send message was failed, msg(%s), msg_sent_len =%d", MSG_DRAIN_REQUEST, msg_sent_len);
+        } else {
+            pa_log_info("msg cond wait for drain completed");
+            msg_cond_wait(u);
+        }
+    }
+
+    u->msg_sent = false;
+    u->initial_sent_size = 0;
+
+    if (u->data_fd >= 0) {
+        close(u->data_fd);
+        u->data_fd = -1;
+    }
+
+    if (u->msg_fd >= 0) {
+        close(u->msg_fd);
+        u->msg_fd = -1;
+    }
+}
+
+static int send_data(struct userdata *u, pa_memchunk *chunk) {
+    size_t index = 0;
+    size_t length = 0;
+    size_t total_size = 0;
+    void *p;
+
+    pa_assert(u);
+    pa_assert(chunk);
+
+    p = pa_memblock_acquire(chunk->memblock);
+    pa_assert(p);
+
+    index = chunk->index;
+    length = chunk->length;
+
+    for (;;) {
+        ssize_t sent;
+
+        if (u->need_pause) {
+            pa_log_info("msg cond wait for resume");
+            msg_cond_wait(u);
+        }
+
+        sent = ipc_push_data(u, p + index, u->write_block_size);
+
+        if (sent < 0) {
+            if (errno == EINTR || errno == EAGAIN) {
+                pa_log_warn("Failed to write data to fd: %s", pa_cstrerror(errno));
+                continue;
+            }
+            pa_log_error("Failed to write data to fd: %s", pa_cstrerror(errno));
+            return -1;
+        } else {
+            index += (size_t) sent;
+            if (length <= sent) {
+                total_size += sent;
+#ifdef DEBUG_LOG
+                pa_log_debug("Wrote %zu bytes.", total_size);
+#endif
+                break;
+            }
+            length -= (size_t) sent;
+            total_size += (size_t) sent;
+            if (length < u->write_block_size) {
+#ifdef DEBUG_LOG
+                pa_log_warn("Unexpected write, remained %zu, skip it", length);
+#endif
+                chunk->index = index;
+                chunk->length = length;
+                goto finish;
+            }
+        }
+    }
+
+finish:
+    pa_memblock_release(chunk->memblock);
+
+    return 0;
+}
+
+static void process_render(struct userdata *u, pa_usec_t now) {
+    size_t ate = 0;
+
+    pa_assert(u);
+
+    while (u->timestamp < now) {
+        pa_memchunk chunk;
+
+        pa_sink_render_full(u->sink, u->sink->thread_info.max_request, &chunk);
+
+#ifdef DUMP_PCM
+        dump_write(u, &chunk);
+#endif
+        if (!send_data(u, &chunk)) {
+            u->timestamp += pa_bytes_to_usec(chunk.length, &u->sink->sample_spec);
+            ate += chunk.length;
+        }
+
+        pa_memblock_unref(chunk.memblock);
+
+        if (ate >= u->sink->thread_info.max_request)
+            break;
+    }
+#ifdef DEBUG_LOG
+    pa_log_debug("end of while loop, timestamp[%" PRIu64 "]", u->timestamp);
+#endif
+}
+
+static void thread_func(void *userdata) {
+    struct userdata *u = userdata;
+
+    pa_assert(u);
+
+    pa_log_debug("Thread starting up");
+
+    pa_thread_mq_install(&u->thread_mq);
+
+    u->timestamp = pa_rtclock_now();
+
+    for (;;) {
+        pa_usec_t now = 0;
+        int ret;
+
+        if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
+            now = pa_rtclock_now();
+#ifdef DEBUG_LOG
+            pa_log_debug("now[%" PRIu64 "]", now);
+#endif
+        }
+
+        if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
+            process_rewind(u);
+
+        if (PA_SINK_IS_RUNNING(u->sink->thread_info.state)) {
+#ifdef DEBUG_LOG
+            pa_log_debug("timestamp[%" PRIu64 "], now[%" PRIu64 "]", u->timestamp, now);
+#endif
+            if (u->timestamp <= now)
+                process_render(u, now);
+
+            pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp);
+        } else {
+            pa_rtpoll_set_timer_disabled(u->rtpoll);
+        }
+
+        /* Hmm, nothing to do. Let's sleep */
+        if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
+            goto fail;
+
+        if (ret == 0)
+            goto finish;
+    }
+
+fail:
+    /* If this was no regular exit from the loop we have to continue
+     * processing messages until we received PA_MESSAGE_SHUTDOWN */
+    pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
+    pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
+
+finish:
+    pa_log_debug("Thread shutting down");
+}
+
+static void msg_thread_func(void *userdata) {
+    struct userdata *u = userdata;
+    char recv_msg[IPC_MAX_MSG_LEN];
+    int recv_len = 0;
+
+    pa_assert(u);
+
+    pa_log_debug("MSG thread starting up");
+
+    while (!u->exit_msg_thread) {
+        if (u->msg_fd < 0) {
+            usleep(10*1000);
+            continue;
+        }
+
+        if ((recv_len = recv(u->msg_fd, recv_msg , IPC_MAX_MSG_LEN, 0)) > 0) {
+            if (!strcmp(recv_msg, MSG_DRAIN_COMPLETE)) {
+                pa_log_debug("got drain complete signal [%s]", recv_msg);
+                pa_cond_signal(u->msg_cond, false);
+            } else if (!strcmp(recv_msg, MSG_SEND_PAUSE)) {
+                pa_log_debug("got pause signal [%s]", recv_msg);
+                u->need_pause = true;
+            } else if (!strcmp(recv_msg, MSG_SEND_RESUME) && u->need_pause) {
+                pa_log_debug("got resume signal [%s]", recv_msg);
+                u->need_pause = false;
+                pa_cond_signal(u->msg_cond, false);
+            }
+        }
+
+        sched_yield();
+
+        usleep(10*1000);
+    }
+
+    pa_log_debug("MSG Thread shutting down");
+}
+
+int pa_acm_init(pa_module *m, const char* const v_modargs[]) {
+    struct userdata *u;
+    pa_sample_spec ss;
+    pa_channel_map map;
+    uint32_t write_block_size;
+    pa_modargs *ma;
+    pa_sink_new_data data;
+    char st[PA_SAMPLE_SPEC_SNPRINT_MAX];
+    char cm[PA_CHANNEL_MAP_SNPRINT_MAX];
+
+    pa_assert(m);
+
+    if (!(ma = pa_modargs_new(m->argument, v_modargs))) {
+        pa_log("Failed to parse module arguments.");
+        goto fail;
+    }
+
+    ss = m->core->default_sample_spec;
+    map = m->core->default_channel_map;
+    if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
+        pa_log("Invalid sample format specification or channel map");
+        goto fail;
+    }
+    pa_log_info("sample spec: %s, channel map: %s",
+                pa_sample_spec_snprint(st, sizeof(st), &ss),
+                pa_channel_map_snprint(cm, sizeof(cm), &map));
+
+    write_block_size = DEFAULT_WRITE_BLOCK_SIZE;
+    if (pa_modargs_get_value_u32(ma, "write_block_size", &write_block_size) < 0) {
+        pa_log("Failed to parse write_block_size argument");
+        goto fail;
+    }
+    pa_log_info("write_block_size: %d", write_block_size);
+
+    u = pa_xnew0(struct userdata, 1);
+    u->msg_socket_path = pa_xstrdup(pa_modargs_get_value(ma, "msg_socket", DEFAULT_ACM_EVENT_SOCKET_PATH));
+    u->data_socket_path = pa_xstrdup(pa_modargs_get_value(ma, "data_socket", DEFAULT_ACM_DATA_SOCKET_PATH));
+    u->core = m->core;
+    u->module = m;
+    m->userdata = u;
+
+    if (initialize_IPC(u)) {
+        pa_log("Failed to initialize IPC");
+        goto fail;
+    }
+
+    u->rtpoll = pa_rtpoll_new();
+    if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
+        pa_log("pa_thread_mq_init() failed.");
+        goto fail;
+    }
+
+    pa_sink_new_data_init(&data);
+    data.driver = __FILE__;
+    data.module = m;
+    pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
+    pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Send PCM data to ACM core via socket fd");
+    pa_proplist_sets(data.proplist, PA_PROP_DEVICE_API, "acm");
+    pa_sink_new_data_set_sample_spec(&data, &ss);
+    pa_sink_new_data_set_channel_map(&data, &map);
+
+    u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);
+
+    pa_sink_new_data_done(&data);
+
+    if (!u->sink) {
+        pa_log("Failed to create sink.");
+        goto fail;
+    }
+
+    u->sink->parent.process_msg = sink_process_msg;
+    u->sink->userdata = u;
+
+    pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
+    pa_sink_set_rtpoll(u->sink, u->rtpoll);
+
+    if (!u->msg_mutex)
+        u->msg_mutex = pa_mutex_new(false, false);
+
+    if (!u->msg_cond)
+        u->msg_cond = pa_cond_new();
+
+    if (!(u->thread = pa_thread_new("acm-sink", thread_func, u))) {
+        pa_log("Failed to create thread");
+        goto fail;
+    }
+
+    if (!(u->msg_thread = pa_thread_new("acm-msg-thread", msg_thread_func, u))) {
+        pa_log("Failed to create message thread");
+        goto fail;
+    }
+
+    u->write_block_size = pa_frame_align(write_block_size, &u->sink->sample_spec);
+    pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(u->write_block_size, &u->sink->sample_spec));
+    pa_sink_set_max_request(u->sink, u->write_block_size);
+
+    pa_sink_put(u->sink);
+
+    pa_modargs_free(ma);
+
+    return 0;
+
+fail:
+    pa_acm_done(m);
+
+    if (ma)
+        pa_modargs_free(ma);
+
+    return -1;
+}
+
+void pa_acm_done(pa_module *m) {
+    struct userdata* u;
+
+    pa_assert(m);
+
+    pa_log("pa_acm_done()");
+
+    if (!(u = m->userdata))
+        return;
+
+    if (u->sink)
+        pa_sink_unlink(u->sink);
+
+    if (u->thread) {
+        pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
+        pa_thread_free(u->thread);
+    }
+
+    if (u->msg_thread) {
+        u->exit_msg_thread = true;
+        pa_thread_free(u->msg_thread);
+    }
+
+    pa_thread_mq_done(&u->thread_mq);
+
+    if (u->sink)
+        pa_sink_unref(u->sink);
+
+    if (u->rtpoll)
+        pa_rtpoll_free(u->rtpoll);
+
+    deinitialize_IPC(u);
+
+    if (u->msg_mutex) {
+        pa_mutex_free(u->msg_mutex);
+        u->msg_mutex = NULL;
+    }
+
+    if (u->msg_cond) {
+        pa_cond_free(u->msg_cond);
+        u->msg_cond = NULL;
+    }
+
+    pa_xfree(u->msg_socket_path);
+    pa_xfree(u->data_socket_path);
+    pa_xfree(u);
+}
diff --git a/src/acm.h b/src/acm.h
new file mode 100644 (file)
index 0000000..2663937
--- /dev/null
+++ b/src/acm.h
@@ -0,0 +1,21 @@
+/***
+  This file is part of PulseAudio.
+
+  Copyright 2019 Sangchul Lee <sc11.lee@samsung.com>
+
+  PulseAudio is free software; you can redistribute it and/or modify
+  it under the terms of the GNU Lesser General Public License as published
+  by the Free Software Foundation; either version 2.1 of the License,
+  or (at your option) any later version.
+
+  PulseAudio is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public License
+  along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+int pa_acm_init(pa_module *m, const char* const v_modargs[]);
+void pa_acm_done(pa_module *m);
diff --git a/src/module-acm-sink.c b/src/module-acm-sink.c
new file mode 100644 (file)
index 0000000..dfb3234
--- /dev/null
@@ -0,0 +1,67 @@
+/***
+  This file is part of PulseAudio.
+
+  Copyright 2019 Sangchul Lee <sc11.lee@samsung.com>
+
+  PulseAudio is free software; you can redistribute it and/or modify
+  it under the terms of the GNU Lesser General Public License as published
+  by the Free Software Foundation; either version 2.1 of the License,
+  or (at your option) any later version.
+
+  PulseAudio is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public License
+  along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <pulsecore/macro.h>
+#include <pulsecore/core.h>
+#include "acm.h"
+
+#include "module-acm-sink-symdef.h"
+
+PA_MODULE_AUTHOR("Sangchul Lee");
+PA_MODULE_DESCRIPTION("ACM Sink");
+PA_MODULE_VERSION(PACKAGE_VERSION);
+PA_MODULE_LOAD_ONCE(true);
+PA_MODULE_USAGE(
+        "format=<sample format>"
+        "rate=<sample rate>"
+        "channels=<number of channels>"
+        "channel_map=<channel map>"
+        "msg_socket=<path of ACM message socket>"
+        "data_socket=<path of ACM data socket>"
+        "write_block_size=<block size to be written to ACM data fd>"
+);
+
+static const char* const valid_modargs[] = {
+    "format",
+    "rate",
+    "channels",
+    "channel_map",
+    "msg_socket",
+    "data_socket",
+    "write_block_size",
+    NULL
+};
+
+int pa__init(pa_module *m) {
+
+    pa_assert(m);
+
+    return pa_acm_init(m, valid_modargs);
+}
+
+void pa__done(pa_module *m) {
+
+    pa_assert(m);
+
+    pa_acm_done(m);
+}
index 10cfcce..58540c2 100644 (file)
@@ -1001,7 +1001,6 @@ static void route_change_rollback_streams(struct userdata *u, pa_stream_manager_
         return;
     if (!IS_AUTO_ROUTE_TYPE_SERIES(data->route_type))
         return;
-
     if (data->stream_type == STREAM_SINK_INPUT) {
         if (!(sink = pa_tz_device_get_sink(device, data->device_role)))
             return;