2 This file is part of PulseAudio.
4 Copyright 2019 Sangchul Lee <sc11.lee@samsung.com>
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2.1 of the License,
9 or (at your option) any later version.
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License
17 along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
29 #include <sys/socket.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/timeval.h>
36 #include <pulse/rtclock.h>
38 #include <pulsecore/core-error.h>
39 #include <pulsecore/sink.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/thread.h>
45 #include <pulsecore/thread-mq.h>
46 #include <pulsecore/rtpoll.h>
47 #include <pulsecore/mutex.h>
/* Defaults applied in pa_acm_init() when the matching module argument
 * ("sink_name", "write_block_size", "msg_socket", "data_socket") is absent. */
51 #define DEFAULT_SINK_NAME "ACM Sink"
52 #define DEFAULT_WRITE_BLOCK_SIZE 1024
53 #define DEFAULT_ACM_EVENT_SOCKET_PATH "/tmp/.acm_event_socket"
54 #define DEFAULT_ACM_DATA_SOCKET_PATH "/tmp/.acm_data_socket"
/* Size in bytes of the receive buffer used by msg_thread_func(). */
57 #define IPC_MAX_MSG_LEN 256
/* ipc_push_data() sends MSG_DATA_ENOUGH once the number of bytes pushed on
 * the data socket exceeds this value. */
58 #define SOCKET_ENOUGH_DATA_SIZE 3072
/* Text messages exchanged on the message socket.
 * Sent by this module: MSG_DATA_ENOUGH, MSG_DRAIN_REQUEST.
 * Received in msg_thread_func(): MSG_DRAIN_COMPLETE, MSG_SEND_PAUSE,
 * MSG_SEND_RESUME.
 * NOTE(review): no use of MSG_FLUSH_REQUEST is visible in this excerpt. */
60 #define MSG_DATA_ENOUGH "data_enough"
61 #define MSG_DRAIN_REQUEST "drain_request"
62 #define MSG_DRAIN_COMPLETE "drain_complete"
63 #define MSG_FLUSH_REQUEST "flush_request"
64 #define MSG_SEND_PAUSE "acm_reader_queue_full"
65 #define MSG_SEND_RESUME "acm_reader_queue_need_data"
/* Timeout handed to pa_cond_timedwait() in msg_cond_wait(). */
67 #define MSG_WAIT_TIMEOUT 1000 /* msec */
/* Message queues installed by thread_func() and used as the sink's asyncmsgq. */
78 pa_thread_mq thread_mq;
/* Bytes passed to each ipc_push_data() call; frame-aligned in pa_acm_init(). */
83 size_t write_block_size;
/* Thread running msg_thread_func(). */
85 pa_thread *msg_thread;
/* Socket paths duplicated from module arguments (or the defaults) in
 * pa_acm_init() and released in pa_acm_done(). */
87 char *data_socket_path;
88 char *msg_socket_path;
/* Set once MSG_DATA_ENOUGH has been issued; cleared by ipc_request_drain()
 * and initialize_ipc(). */
91 bool enough_data_sent;
/* Running total of bytes sent while enough_data_sent is still false. */
92 int initial_sent_size;
/* Forward declarations: both functions are referenced before their definitions
 * (ipc_request_drain() is called from sink_process_msg()). */
103 static int send_data(struct userdata *u, pa_memchunk *chunk);
104 static void ipc_request_drain(struct userdata *u);
/* Path prefix of the raw PCM dump file created by dump_open(). */
108 #define DUMP_PATH_PREFIX "/tmp/pcm"
/* Opens a dump file for the sink's PCM data and stores the handle in
 * u->dump_fp.
 *
 * The file name is built from DUMP_PATH_PREFIX, the sink's sample format,
 * rate and channel count, and the current local time formatted as
 * "HHMMSS.mmm". Success or failure of fopen() is only logged.
 *
 * NOTE(review): dump_time and dump_path come from pa_sprintf_malloc(); the
 * lines that free them are not visible here -- confirm they are released on
 * every path.
 * NOTE(review): the file is opened with mode "w" although the content is raw
 * binary data; "wb" would be the portable choice. */
109 static void dump_open(struct userdata *u) {
111 struct timeval cur_time;
118 pa_gettimeofday(&cur_time);
119 localtime_r(&cur_time.tv_sec, &tm);
120 memset(&date_time[0], 0x00, sizeof(date_time));
121 strftime(&date_time[0], sizeof(date_time), "%H%M%S", &tm);
/* tv_usec / 1000 yields the millisecond part of the timestamp. */
123 dump_time = pa_sprintf_malloc("%s.%03ld", &date_time[0], cur_time.tv_usec / 1000);
124 dump_path = pa_sprintf_malloc("%s_%s_%uHz_%uch_%s.raw", DUMP_PATH_PREFIX,
125 pa_sample_format_to_string(u->sink->sample_spec.format),
126 u->sink->sample_spec.rate, u->sink->sample_spec.channels,
127 pa_strempty(dump_time));
132 u->dump_fp = fopen(dump_path, "w");
134 pa_log_info("%s opened", dump_path);
136 pa_log_warn("%s open failed", dump_path);
/* Appends the payload of `chunk` (chunk->length bytes starting at
 * chunk->index inside its memblock) to u->dump_fp.
 *
 * The memblock is acquired for the duration of the write and released
 * afterwards.
 *
 * NOTE(review): the fwrite() return value is ignored, so short writes or a
 * full disk go unnoticed.
 * NOTE(review): an info-level log line is emitted per chunk; this may be noisy
 * when dumping is enabled. */
142 static void dump_write(struct userdata *u, pa_memchunk *chunk) {
148 if ((ptr = pa_memblock_acquire(chunk->memblock))) {
149 fwrite((uint8_t *)ptr + chunk->index, 1, chunk->length, u->dump_fp);
150 pa_log_info("ptr(%p), chunk->index(%zu), write data of index %p, length %zu",
151 (uint8_t *)ptr, chunk->index, (uint8_t *)ptr + chunk->index, chunk->length);
152 pa_memblock_release(chunk->memblock);
/* Counterpart of dump_open(). Only the final log statement is visible in this
 * excerpt; presumably the hidden lines close u->dump_fp -- TODO confirm. */
157 static void dump_close(struct userdata *u) {
163 pa_log_info("dump closed");
/* Blocks the caller on u->msg_cond, holding u->msg_mutex, for at most
 * MSG_WAIT_TIMEOUT. The condition is signalled from msg_thread_func() when a
 * drain-complete or resume message arrives.
 *
 * A non-zero result from pa_cond_timedwait() is logged as a timeout failure.
 *
 * NOTE(review): no predicate is re-checked around the wait in the visible
 * lines. A signal delivered before this function takes the mutex would be
 * missed and the caller would sleep for the full timeout; a spurious wakeup
 * would return early. Confirm whether that is acceptable. */
168 static void msg_cond_wait(struct userdata *u) {
171 pa_mutex_lock(u->msg_mutex);
172 if (pa_cond_timedwait(u->msg_cond, u->msg_mutex, MSG_WAIT_TIMEOUT))
173 pa_log_error("msg cond wait failed, timeout..");
175 pa_log_info("msg cond wakeup");
176 pa_mutex_unlock(u->msg_mutex);
/* Message handler installed as the sink's process_msg callback.
 *
 * Handles PA_SINK_MESSAGE_SET_STATE and PA_SINK_MESSAGE_GET_LATENCY locally
 * and finally delegates to pa_sink_process_msg(), whose result is returned. */
179 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
180 struct userdata *u = PA_SINK(o)->userdata;
184 case PA_SINK_MESSAGE_SET_STATE: {
185 pa_sink_state_t new_state = PA_PTR_TO_UINT(data);
/* Leaving SUSPENDED/INIT for RUNNING/IDLE: restart the render clock from now. */
187 if (pa_sink_get_state(u->sink) == PA_SINK_SUSPENDED || pa_sink_get_state(u->sink) == PA_SINK_INIT) {
188 if (new_state == PA_SINK_RUNNING || new_state == PA_SINK_IDLE)
189 u->timestamp = pa_rtclock_now();
/* The action taken for this condition is on lines not visible here. */
192 if (new_state == PA_SINK_RUNNING)
/* Leaving RUNNING: ask the IPC peer to drain what has been sent so far. */
197 if (pa_sink_get_state(u->sink) == PA_SINK_RUNNING) {
198 if (new_state != PA_SINK_RUNNING)
199 ipc_request_drain(u);
204 case PA_SINK_MESSAGE_GET_LATENCY: {
207 now = pa_rtclock_now();
/* Latency is reported as how far the render clock is ahead of the wall clock.
 * NOTE(review): the result is negative whenever u->timestamp < now; no
 * clamping is visible here. */
208 *((int64_t*) data) = (int64_t)u->timestamp - (int64_t)now;
213 return pa_sink_process_msg(o, code, data, offset, chunk);
/* Answers a rewind request from the sink by reporting that zero bytes were
 * rewound. Called from the I/O thread when rewind_requested is set. */
216 static void process_rewind(struct userdata *u) {
217 /* Rewind not supported */
218 pa_sink_process_rewind(u->sink, 0);
/* Creates an AF_UNIX stream socket for `channel` and connects it to the
 * matching server path.
 *
 * IPC_CHANNEL_DATA connects to u->data_socket_path and has SO_SNDBUF
 * adjusted; otherwise u->msg_socket_path is used. IPC_CHANNEL_MSG sockets are
 * additionally switched to O_NONBLOCK. All sockets get FD_CLOEXEC.
 *
 * The return statements are not visible in this excerpt; initialize_ipc()
 * treats a negative result as failure.
 *
 * NOTE(review): the error branches after each log call are hidden -- confirm
 * that sockfd is closed on every failure path so descriptors are not leaked. */
226 static int ipc_get_client_fd(struct userdata *u, ipc_channel_t channel)
228 struct sockaddr_un address;
229 int len, ret = IPC_ERR;
232 unsigned int n_opt_len = sizeof (n_opt_val);
/* Default to the message socket; overridden below for the data channel. */
233 char *socket_path = u->msg_socket_path;
236 if ((sockfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
237 pa_log_error("socket failure: %s", pa_cstrerror(errno));
241 if (channel == IPC_CHANNEL_DATA) {
243 ret = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (char *) &n_opt_val, n_opt_len);
245 pa_log_error("unable to setsockopt SO_SNDBUF socket fd %d: %s", sockfd, pa_cstrerror(errno));
250 socket_path = u->data_socket_path;
/* Read the send buffer size back so the effective value can be logged. */
252 ret = getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (char *) &n_opt_val, &n_opt_len);
254 pa_log_error("unable to getsockopt SO_SNDBUF socket fd %d: %s", sockfd, pa_cstrerror(errno));
258 pa_log_info("sockfd: %d, socket send buffer size: %d", sockfd, n_opt_val);
260 if (fcntl(sockfd, F_SETFD, FD_CLOEXEC) < 0) {
261 pa_log_error("unable to set on ctrls socket fd %d: %s", sockfd, pa_cstrerror(errno));
/* Only the message channel is made non-blocking. */
266 if (channel == IPC_CHANNEL_MSG) {
267 int flag = fcntl(sockfd, F_GETFL, 0);
269 pa_log_error("unable to get file status on socket fd %d: %s", sockfd, pa_cstrerror(errno));
274 if (fcntl(sockfd, F_SETFL, flag | O_NONBLOCK) != 0) {
275 pa_log_error("unable to set file status on socket fd %d: %s", sockfd, pa_cstrerror(errno));
/* The address is zeroed first and at most sizeof(sun_path) - 1 bytes are
 * copied, so sun_path stays NUL-terminated.
 * NOTE(review): a socket path longer than that is silently truncated. */
281 memset(&address, 0, sizeof(address));
282 address.sun_family = AF_UNIX;
283 strncpy(address.sun_path, socket_path, sizeof(address.sun_path) - 1);
284 len = sizeof(address);
286 if ((ret = connect(sockfd, (struct sockaddr *)&address, len)) < 0) {
287 pa_log_error("connect failure: %s, path: %s", pa_cstrerror(errno), socket_path);
292 pa_log_debug("connected well, fd[%d] for [%s]", sockfd, socket_path);
/* Sends `size` bytes from `data` on the data socket (u->data_fd).
 *
 * Until enough_data_sent is set, the bytes sent are accumulated in
 * u->initial_sent_size. Once the total exceeds SOCKET_ENOUGH_DATA_SIZE,
 * MSG_DATA_ENOUGH is sent on the message socket, enough_data_sent is set and
 * the counter is reset.
 *
 * The return statements are not visible here; send_data() uses the result as
 * the number of bytes sent and inspects errno on failure. */
297 static int ipc_push_data(struct userdata *u, void *data, int size) {
305 if ((sent = send(u->data_fd, data, size, 0)) < 0) {
306 pa_log_error("[fd:%d] fail to send data: %s", u->data_fd, pa_cstrerror(errno));
310 if (!u->enough_data_sent) {
311 u->initial_sent_size += sent;
312 if (u->initial_sent_size > SOCKET_ENOUGH_DATA_SIZE ) {
313 int msg_sent_len = 0;
314 pa_log_info("send data enough message to ipc server");
/* NOTE(review): msg_fd is non-blocking, so this send can fail (e.g. EAGAIN).
 * The failure is only logged and enough_data_sent is still set below, so the
 * peer may never receive the notification. */
315 if ((msg_sent_len = send(u->msg_fd, MSG_DATA_ENOUGH, strlen(MSG_DATA_ENOUGH), 0)) < 0)
316 pa_log_error("send message was failed, msg(%s), msg_sent_len =%d", MSG_DATA_ENOUGH, msg_sent_len);
317 u->enough_data_sent = true;
318 u->initial_sent_size = 0;
324 pa_log_debug("sent length: %d", sent);
/* Sends MSG_DRAIN_REQUEST on the message socket, but only when the socket is
 * valid and MSG_DATA_ENOUGH has been issued before (enough_data_sent), then
 * clears enough_data_sent.
 *
 * A failed send is only logged.
 * NOTE(review): the line between the error log and the flag reset is not
 * visible; whether the function waits for MSG_DRAIN_COMPLETE cannot be told
 * from this excerpt. */
330 static void ipc_request_drain(struct userdata *u) {
333 if (u->msg_fd >= 0 && u->enough_data_sent) {
334 int msg_sent_len = 0;
335 if ((msg_sent_len = send(u->msg_fd, MSG_DRAIN_REQUEST, strlen(MSG_DRAIN_REQUEST), 0)) < 0)
336 pa_log_error("send message was failed, msg(%s), msg_sent_len =%d", MSG_DRAIN_REQUEST, msg_sent_len);
338 u->enough_data_sent = false;
/* Connects the message socket and then the data socket via
 * ipc_get_client_fd(), storing the descriptors in u->msg_fd and u->data_fd,
 * and resets the "enough data" bookkeeping.
 *
 * The log messages reference a `retry` counter, so each connection is
 * evidently attempted more than once; the loop bounds are on lines not
 * visible here. pa_acm_init() treats a non-zero return as failure. */
342 static int initialize_ipc(struct userdata *u) {
348 u->msg_fd = ipc_get_client_fd(u, IPC_CHANNEL_MSG);
350 pa_log_warn("msg_fd is not connected. try again(%d)", retry);
354 pa_log_debug("got fd for message %d", u->msg_fd);
358 pa_log_error("failed to get msg_fd");
364 u->data_fd = ipc_get_client_fd(u, IPC_CHANNEL_DATA);
365 if (u->data_fd < 0) {
366 pa_log_warn("data_fd is not connected. try again(%d)", retry);
370 pa_log_debug("got fd for data %d", u->data_fd);
/* Still no descriptor after the retries: give up. */
373 if (u->data_fd < 0) {
374 pa_log_error("failed to get data_fd");
378 u->enough_data_sent = false;
379 u->initial_sent_size = 0;
/* Tears down the IPC state: requests a drain, resets initial_sent_size and
 * then handles each socket descriptor that is still valid (>= 0).
 * The bodies of the two descriptor checks are not visible here; presumably
 * they close the sockets -- TODO confirm. */
384 static void deinitialize_ipc(struct userdata *u) {
387 ipc_request_drain(u);
389 u->initial_sent_size = 0;
391 if (u->data_fd >= 0) {
396 if (u->msg_fd >= 0) {
/* Pushes the payload of `chunk` to the data socket in pieces of
 * u->write_block_size bytes via ipc_push_data().
 *
 * chunk->index and chunk->length are updated to describe what is left when a
 * remainder shorter than one write block is encountered. process_render()
 * treats a zero return as success.
 *
 * The loop construct and the return statements are on lines not visible in
 * this excerpt. */
402 static int send_data(struct userdata *u, pa_memchunk *chunk) {
405 size_t total_size = 0;
411 p = pa_memblock_acquire(chunk->memblock);
414 index = chunk->index;
415 length = chunk->length;
/* Emitted before waiting for a resume from the peer; the wait call itself and
 * its condition are on hidden lines. */
421 pa_log_info("msg cond wait for resume");
/* NOTE(review): the size passed is always u->write_block_size, not the
 * remaining length. If the chunk is shorter than one write block this reads
 * past the chunk payload -- confirm callers always render at least one full
 * block. */
425 sent = ipc_push_data(u, p + index, u->write_block_size);
/* NOTE(review): ipc_push_data() logs before returning on failure, so errno
 * may no longer hold the value set by send() when it is examined here. */
428 if (errno == EINTR || errno == EAGAIN) {
429 pa_log_warn("Failed to write data to fd: %s", pa_cstrerror(errno));
432 pa_log_error("Failed to write data to fd: %s", pa_cstrerror(errno));
435 index += (size_t) sent;
/* Everything that was left has been written. */
436 if (length <= sent) {
439 pa_log_debug("Wrote %zu bytes.", total_size);
443 length -= (size_t) sent;
444 total_size += (size_t) sent;
/* A tail smaller than one write block is reported and left in the chunk. */
445 if (length < u->write_block_size) {
447 pa_log_warn("Unexpected write, remained %zu, skip it", length);
449 chunk->index = index;
450 chunk->length = length;
457 pa_memblock_release(chunk->memblock);
/* Renders audio from the sink and forwards it to the IPC peer until the
 * render clock (u->timestamp) has caught up with `now`.
 *
 * Each iteration renders exactly max_request bytes, optionally dumps them,
 * sends them with send_data() and, on success, advances u->timestamp by the
 * playback duration of the chunk. */
462 static void process_render(struct userdata *u, pa_usec_t now) {
467 while (u->timestamp < now) {
470 pa_sink_render_full(u->sink, u->sink->thread_info.max_request, &chunk);
473 dump_write(u, &chunk);
/* NOTE(review): send_data() may rewrite chunk.length before it is used here
 * to advance the clock -- verify the value is still the rendered length.
 * NOTE(review): on failure the timestamp is not advanced; loop termination
 * then depends on the hidden accounting of `ate`. */
475 if (!send_data(u, &chunk)) {
476 u->timestamp += pa_bytes_to_usec(chunk.length, &u->sink->sample_spec);
480 pa_memblock_unref(chunk.memblock);
/* Stop after one max_request worth of data has been consumed. */
482 if (ate >= u->sink->thread_info.max_request)
486 pa_log_debug("end of while loop, timestamp[%" PRIu64 "]", u->timestamp);
/* I/O thread of the sink.
 *
 * Installs the thread message queue, initialises the render clock and then
 * loops: while the sink is opened it services rewind requests, renders data
 * when the sink is running and the clock is due, and arms the rtpoll timer
 * for the next due time; otherwise the timer is disabled. pa_rtpoll_run()
 * blocks until the next event. */
490 static void thread_func(void *userdata) {
491 struct userdata *u = userdata;
495 pa_log_debug("Thread starting up");
497 pa_thread_mq_install(&u->thread_mq);
499 u->timestamp = pa_rtclock_now();
505 if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
506 now = pa_rtclock_now();
508 pa_log_debug("now[%" PRIu64 "]", now);
512 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
515 if (PA_SINK_IS_RUNNING(u->sink->thread_info.state)) {
517 pa_log_debug("timestamp[%" PRIu64 "], now[%" PRIu64 "]", u->timestamp, now);
/* Render only when the clock says the next block is due. */
519 if (u->timestamp <= now)
520 process_render(u, now);
/* Wake up again when the render clock is next due. */
522 pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp);
524 pa_rtpoll_set_timer_disabled(u->rtpoll);
527 /* Hmm, nothing to do. Let's sleep */
528 if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
536 /* If this was no regular exit from the loop we have to continue
537 * processing messages until we received PA_MESSAGE_SHUTDOWN */
538 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
539 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
542 pa_log_debug("Thread shutting down");
/* Message thread: receives control messages from the IPC peer on u->msg_fd
 * until u->exit_msg_thread becomes true.
 *
 *  - MSG_DRAIN_COMPLETE: wakes a waiter on u->msg_cond.
 *  - MSG_SEND_PAUSE:     sets u->need_pause.
 *  - MSG_SEND_RESUME:    if paused, clears u->need_pause and wakes a waiter.
 *
 * NOTE(review): u->exit_msg_thread and u->need_pause are written and read
 * from different threads; no atomic or lock is visible for them here. */
545 static void msg_thread_func(void *userdata) {
546 struct userdata *u = userdata;
547 char recv_msg[IPC_MAX_MSG_LEN];
552 pa_log_debug("MSG thread starting up");
554 while (!u->exit_msg_thread) {
/* NOTE(review): strcmp() below needs a NUL-terminated string, but recv() may
 * fill all IPC_MAX_MSG_LEN bytes and no terminator is appended in the
 * visible lines. The socket is SOCK_STREAM, so two messages may also arrive
 * in one read or one message may be split -- confirm the peer's framing. */
560 if ((recv_len = recv(u->msg_fd, recv_msg , IPC_MAX_MSG_LEN, 0)) > 0) {
561 if (!strcmp(recv_msg, MSG_DRAIN_COMPLETE)) {
562 pa_log_debug("got drain complete signal [%s]", recv_msg);
563 pa_cond_signal(u->msg_cond, false);
564 } else if (!strcmp(recv_msg, MSG_SEND_PAUSE)) {
565 pa_log_debug("got pause signal [%s]", recv_msg);
566 u->need_pause = true;
/* A resume is only honoured while a pause is pending. */
567 } else if (!strcmp(recv_msg, MSG_SEND_RESUME) && u->need_pause) {
568 pa_log_debug("got resume signal [%s]", recv_msg);
569 u->need_pause = false;
570 pa_cond_signal(u->msg_cond, false);
579 pa_log_debug("MSG Thread shutting down");
/* Module initialisation.
 *
 * Parses the module arguments (m->argument, validated against v_modargs by
 * pa_modargs_new()), connects the IPC sockets, creates and configures the
 * sink, creates the message mutex/condition, starts the I/O thread and the
 * message thread, and finally publishes the sink with pa_sink_put().
 *
 * The return statements and the error unwinding after each log call are on
 * lines not visible in this excerpt. */
582 int pa_acm_init(pa_module *m, const char* const v_modargs[]) {
586 uint32_t write_block_size;
588 pa_sink_new_data data;
589 char st[PA_SAMPLE_SPEC_SNPRINT_MAX];
590 char cm[PA_CHANNEL_MAP_SNPRINT_MAX];
594 if (!(ma = pa_modargs_new(m->argument, v_modargs))) {
595 pa_log("Failed to parse module arguments.");
/* Start from the core defaults; module arguments may override them. */
599 ss = m->core->default_sample_spec;
600 map = m->core->default_channel_map;
601 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
602 pa_log("Invalid sample format specification or channel map");
605 pa_log_info("sample spec: %s, channel map: %s",
606 pa_sample_spec_snprint(st, sizeof(st), &ss),
607 pa_channel_map_snprint(cm, sizeof(cm), &map));
609 write_block_size = DEFAULT_WRITE_BLOCK_SIZE;
610 if (pa_modargs_get_value_u32(ma, "write_block_size", &write_block_size) < 0) {
611 pa_log("Failed to parse write_block_size argument");
/* NOTE(review): write_block_size is uint32_t but is printed with %d; %u (or
 * PRIu32) would match the type. */
614 pa_log_info("write_block_size: %d", write_block_size);
616 u = pa_xnew0(struct userdata, 1);
617 u->msg_socket_path = pa_xstrdup(pa_modargs_get_value(ma, "msg_socket", DEFAULT_ACM_EVENT_SOCKET_PATH));
618 u->data_socket_path = pa_xstrdup(pa_modargs_get_value(ma, "data_socket", DEFAULT_ACM_DATA_SOCKET_PATH));
623 if (initialize_ipc(u)) {
624 pa_log("Failed to initialize IPC");
628 u->rtpoll = pa_rtpoll_new();
629 if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
630 pa_log("pa_thread_mq_init() failed.");
634 pa_sink_new_data_init(&data);
635 data.driver = __FILE__;
637 pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
638 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Send PCM data to ACM core via socket fd");
639 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_API, "acm");
640 pa_sink_new_data_set_sample_spec(&data, &ss);
641 pa_sink_new_data_set_channel_map(&data, &map);
643 u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);
645 pa_sink_new_data_done(&data);
648 pa_log("Failed to create sink.");
/* Route sink messages through sink_process_msg() and attach the I/O thread's
 * message queue and rtpoll to the sink. */
652 u->sink->parent.process_msg = sink_process_msg;
653 u->sink->userdata = u;
655 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
656 pa_sink_set_rtpoll(u->sink, u->rtpoll);
/* Mutex/condition pair used by msg_cond_wait() and msg_thread_func(). */
659 u->msg_mutex = pa_mutex_new(false, false);
662 u->msg_cond = pa_cond_new();
/* NOTE(review): both threads are started before u->write_block_size, the
 * fixed latency and max_request are set below. The I/O thread only renders
 * while the sink is opened, which presumably happens after pa_sink_put() --
 * confirm that ordering is safe. */
664 if (!(u->thread = pa_thread_new("acm-sink", thread_func, u))) {
665 pa_log("Failed to create thread");
669 if (!(u->msg_thread = pa_thread_new("acm-msg-thread", msg_thread_func, u))) {
670 pa_log("Failed to create message thread");
/* The block size is aligned to whole frames and also used as the sink's
 * fixed latency and max_request. */
674 u->write_block_size = pa_frame_align(write_block_size, &u->sink->sample_spec);
675 pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(u->write_block_size, &u->sink->sample_spec));
676 pa_sink_set_max_request(u->sink, u->write_block_size);
678 pa_sink_put(u->sink);
/* Module teardown: releases everything set up by pa_acm_init().
 *
 * Order visible here: unlink the sink, shut down and free the I/O thread,
 * stop and free the message thread, then release the thread queues, sink,
 * rtpoll, mutex, condition and the socket path strings.
 *
 * NOTE(review): the guards around the individual steps (NULL checks for a
 * partially initialised userdata) and the IPC/userdata cleanup are on lines
 * not visible in this excerpt. */
693 void pa_acm_done(pa_module *m) {
/* NOTE(review): this looks like a trace message but uses pa_log(); confirm
 * the intended log level. */
698 pa_log("pa_acm_done()");
/* Nothing to do if initialisation never stored userdata. */
700 if (!(u = m->userdata))
704 pa_sink_unlink(u->sink);
/* Ask the I/O thread to leave its loop, then free it. */
707 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
708 pa_thread_free(u->thread);
/* The message thread polls this flag in its loop condition. */
712 u->exit_msg_thread = true;
713 pa_thread_free(u->msg_thread);
716 pa_thread_mq_done(&u->thread_mq);
719 pa_sink_unref(u->sink);
722 pa_rtpoll_free(u->rtpoll);
727 pa_mutex_free(u->msg_mutex);
732 pa_cond_free(u->msg_cond);
736 pa_xfree(u->msg_socket_path);
737 pa_xfree(u->data_socket_path);