fixup! acm: Fix coverity defects
[platform/core/multimedia/pulseaudio-modules-tizen.git] / src / acm.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2019 Sangchul Lee <sc11.lee@samsung.com>
5
6   PulseAudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as published
8   by the Free Software Foundation; either version 2.1 of the License,
9   or (at your option) any later version.
10
11   PulseAudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15
16   You should have received a copy of the GNU Lesser General Public License
17   along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
18 ***/
19
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23 #include <stdlib.h>
24 #include <stdio.h>
25 #include <errno.h>
26 #include <unistd.h>
27 #include <sched.h>
28
29 #include <sys/socket.h>
30 #include <sys/un.h>
31 #include <errno.h>
32 #include <fcntl.h>
33
34 #include <pulse/xmalloc.h>
35 #include <pulse/timeval.h>
36 #include <pulse/rtclock.h>
37
38 #include <pulsecore/core-error.h>
39 #include <pulsecore/sink.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/thread.h>
45 #include <pulsecore/thread-mq.h>
46 #include <pulsecore/rtpoll.h>
47 #include <pulsecore/mutex.h>
48
49 #include "acm.h"
50
51 #define DEFAULT_SINK_NAME "ACM Sink"
52 #define DEFAULT_WRITE_BLOCK_SIZE 1024
53 #define DEFAULT_ACM_EVENT_SOCKET_PATH "/tmp/.acm_event_socket"
54 #define DEFAULT_ACM_DATA_SOCKET_PATH "/tmp/.acm_data_socket"
55
56 #define IPC_ERR -1
57 #define IPC_MAX_MSG_LEN 256
58 #define SOCKET_ENOUGH_DATA_SIZE 3072
59
60 #define MSG_DATA_ENOUGH "data_enough"
61 #define MSG_DRAIN_REQUEST "drain_request"
62 #define MSG_DRAIN_COMPLETE "drain_complete"
63 #define MSG_FLUSH_REQUEST "flush_request"
64 #define MSG_SEND_PAUSE "acm_reader_queue_full"
65 #define MSG_SEND_RESUME "acm_reader_queue_need_data"
66
67 #define MSG_WAIT_TIMEOUT 1000  /* msec */
68
69 //#define DUMP_PCM
70 //#define DEBUG_LOG
71
72 struct userdata {
73     pa_core *core;
74     pa_module *module;
75     pa_sink *sink;
76
77     pa_thread *thread;
78     pa_thread_mq thread_mq;
79     pa_rtpoll *rtpoll;
80
81     pa_usec_t timestamp;
82
83     size_t write_block_size;
84
85     pa_thread *msg_thread;
86     bool exit_msg_thread;
87     char *data_socket_path;
88     char *msg_socket_path;
89     int data_fd;
90     int msg_fd;
91     bool enough_data_sent;
92     int initial_sent_size;
93
94     pa_mutex *msg_mutex;
95     pa_cond *msg_cond;
96     bool need_pause;
97
98 #ifdef DUMP_PCM
99     FILE *dump_fp;
100 #endif
101 };
102
103 static int send_data(struct userdata *u, pa_memchunk *chunk);
104 static void ipc_request_drain(struct userdata *u);
105
106 #ifdef DUMP_PCM
107 #include <time.h>
108 #define DUMP_PATH_PREFIX "/tmp/pcm"
109 static void dump_open(struct userdata *u) {
110     char date_time[7];
111     struct timeval cur_time;
112     struct tm tm;
113     char *dump_time;
114     char *dump_path;
115
116     pa_assert(u);
117
118     pa_gettimeofday(&cur_time);
119     localtime_r(&cur_time.tv_sec, &tm);
120     memset(&date_time[0], 0x00, sizeof(date_time));
121     strftime(&date_time[0], sizeof(date_time), "%H%M%S", &tm);
122
123     dump_time = pa_sprintf_malloc("%s.%03ld", &date_time[0], cur_time.tv_usec / 1000);
124     dump_path = pa_sprintf_malloc("%s_%s_%uHz_%uch_%s.raw", DUMP_PATH_PREFIX,
125                                                             pa_sample_format_to_string(u->sink->sample_spec.format),
126                                                             u->sink->sample_spec.rate, u->sink->sample_spec.channels,
127                                                             pa_strempty(dump_time));
128
129     if (u->dump_fp)
130         fclose(u->dump_fp);
131
132     u->dump_fp = fopen(dump_path, "w");
133     if (u->dump_fp)
134         pa_log_info("%s opened",  dump_path);
135     else
136         pa_log_warn("%s open failed", dump_path);
137
138     pa_xfree(dump_time);
139     pa_xfree(dump_path);
140 }
141
142 static void dump_write(struct userdata *u, pa_memchunk *chunk) {
143     pa_assert(u);
144     pa_assert(chunk);
145
146     if (u->dump_fp) {
147         void *ptr = NULL;
148         if ((ptr = pa_memblock_acquire(chunk->memblock))) {
149             fwrite((uint8_t *)ptr + chunk->index, 1, chunk->length, u->dump_fp);
150             pa_log_info("ptr(%p), chunk->index(%zu), write data of index %p, length %zu",
151                 (uint8_t *)ptr, chunk->index, (uint8_t *)ptr + chunk->index, chunk->length);
152             pa_memblock_release(chunk->memblock);
153         }
154     }
155 }
156
157 static void dump_close(struct userdata *u) {
158     pa_assert(u);
159
160     if (u->dump_fp) {
161         fclose(u->dump_fp);
162         u->dump_fp = NULL;
163         pa_log_info("dump closed");
164     }
165 }
166 #endif
167
168 static void msg_cond_wait(struct userdata *u) {
169     pa_assert(u);
170
171     pa_mutex_lock(u->msg_mutex);
172     if (pa_cond_timedwait(u->msg_cond, u->msg_mutex, MSG_WAIT_TIMEOUT))
173         pa_log_error("msg cond wait failed, timeout..");
174     else
175         pa_log_info("msg cond wakeup");
176     pa_mutex_unlock(u->msg_mutex);
177 }
178
179 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
180     struct userdata *u = PA_SINK(o)->userdata;
181
182     switch (code) {
183
184         case PA_SINK_MESSAGE_SET_STATE: {
185             pa_sink_state_t new_state = PA_PTR_TO_UINT(data);
186
187             if (pa_sink_get_state(u->sink) == PA_SINK_SUSPENDED || pa_sink_get_state(u->sink) == PA_SINK_INIT) {
188                 if (new_state == PA_SINK_RUNNING || new_state == PA_SINK_IDLE)
189                     u->timestamp = pa_rtclock_now();
190             }
191 #ifdef DUMP_PCM
192             if  (new_state == PA_SINK_RUNNING)
193                 dump_open(u);
194             else
195                 dump_close(u);
196 #endif
197             if (pa_sink_get_state(u->sink) == PA_SINK_RUNNING) {
198                 if (new_state != PA_SINK_RUNNING)
199                     ipc_request_drain(u);
200             }
201             break;
202         }
203
204         case PA_SINK_MESSAGE_GET_LATENCY: {
205             pa_usec_t now;
206
207             now = pa_rtclock_now();
208             *((int64_t*) data) = (int64_t)u->timestamp - (int64_t)now;
209             return 0;
210         }
211     }
212
213     return pa_sink_process_msg(o, code, data, offset, chunk);
214 }
215
216 static void process_rewind(struct userdata *u) {
217     /* Rewind not supported */
218     pa_sink_process_rewind(u->sink, 0);
219 }
220
221 typedef enum {
222     IPC_CHANNEL_MSG,
223     IPC_CHANNEL_DATA
224 } ipc_channel_t;
225
226 static int ipc_get_client_fd(struct userdata *u, ipc_channel_t channel)
227 {
228     struct sockaddr_un address;
229     int len, ret = IPC_ERR;
230     int sockfd;
231     int n_opt_val;
232     unsigned int n_opt_len = sizeof (n_opt_val);
233     char *socket_path = u->msg_socket_path;
234
235     /*Create socket*/
236     if ((sockfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
237         pa_log_error("socket failure: %s",  pa_cstrerror(errno));
238         return -1;
239     }
240
241     if (channel == IPC_CHANNEL_DATA) {
242         n_opt_val = 19200;
243         ret = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (char *) &n_opt_val, n_opt_len);
244         if (ret == -1) {
245             pa_log_error("unable to setsockopt SO_SNDBUF socket fd %d: %s", sockfd, pa_cstrerror(errno));
246             close(sockfd);
247             return -1;
248         }
249
250         socket_path = u->data_socket_path;
251     }
252     ret = getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (char *) &n_opt_val, &n_opt_len);
253     if (ret == -1) {
254         pa_log_error("unable to getsockopt SO_SNDBUF socket fd %d: %s", sockfd, pa_cstrerror(errno));
255         close(sockfd);
256         return -1;
257     }
258     pa_log_info("sockfd: %d,  socket send buffer size: %d", sockfd, n_opt_val);
259
260     if (fcntl(sockfd, F_SETFD, FD_CLOEXEC) < 0) {
261         pa_log_error("unable to set on ctrls socket fd %d: %s", sockfd, pa_cstrerror(errno));
262         close(sockfd);
263         return -1;
264     }
265
266     if (channel == IPC_CHANNEL_MSG) {
267         int flag = fcntl(sockfd, F_GETFL, 0);
268         if (flag == -1) {
269             pa_log_error("unable to get file status on socket fd %d: %s", sockfd, pa_cstrerror(errno));
270             close(sockfd);
271             return -1;
272         }
273
274         if (fcntl(sockfd, F_SETFL, flag | O_NONBLOCK) != 0) {
275             pa_log_error("unable to set file status on socket fd %d: %s", sockfd, pa_cstrerror(errno));
276             close(sockfd);
277             return -1;
278         }
279     }
280
281     memset(&address, 0, sizeof(address));
282     address.sun_family = AF_UNIX;
283     strncpy(address.sun_path, socket_path, sizeof(address.sun_path) - 1);
284     len = sizeof(address);
285
286     if ((ret = connect(sockfd, (struct sockaddr *)&address, len)) < 0) {
287         pa_log_error("connect failure: %s, path: %s", pa_cstrerror(errno), socket_path);
288         close(sockfd);
289         return -1;
290     }
291
292     pa_log_debug("connected well, fd[%d] for [%s]", sockfd, socket_path);
293
294     return sockfd;
295 }
296
297 static int ipc_push_data(struct userdata *u, void *data, int size) {
298     int sent = 0;
299
300     pa_assert(u);
301
302     if (u->data_fd < 0)
303         return -1;
304
305     if ((sent = send(u->data_fd, data, size, 0)) < 0) {
306         pa_log_error("[fd:%d] fail to send data: %s", u->data_fd, pa_cstrerror(errno));
307         return -1;
308     }
309
310     if (!u->enough_data_sent) {
311         u->initial_sent_size += sent;
312         if (u->initial_sent_size > SOCKET_ENOUGH_DATA_SIZE ) {
313             int msg_sent_len = 0;
314             pa_log_info("send data enough message to ipc server");
315             if ((msg_sent_len = send(u->msg_fd, MSG_DATA_ENOUGH, strlen(MSG_DATA_ENOUGH), 0)) < 0)
316                 pa_log_error("send message was failed, msg(%s), msg_sent_len =%d", MSG_DATA_ENOUGH, msg_sent_len);
317             u->enough_data_sent = true;
318             u->initial_sent_size = 0;
319         }
320     }
321
322 #ifdef DEBUG_LOG
323     if (sent > 0)
324         pa_log_debug("sent length:  %d", sent);
325 #endif
326
327     return sent;
328 }
329
330 static void ipc_request_drain(struct userdata *u) {
331     pa_assert(u);
332
333     if (u->msg_fd >= 0 && u->enough_data_sent) {
334         int msg_sent_len = 0;
335         if ((msg_sent_len = send(u->msg_fd, MSG_DRAIN_REQUEST, strlen(MSG_DRAIN_REQUEST), 0)) < 0)
336             pa_log_error("send message was failed, msg(%s), msg_sent_len =%d", MSG_DRAIN_REQUEST, msg_sent_len);
337
338         u->enough_data_sent = false;
339     }
340 }
341
342 static int initialize_ipc(struct userdata *u) {
343     int retry = 10;
344
345     pa_assert(u);
346
347     while (retry--) {
348         u->msg_fd = ipc_get_client_fd(u, IPC_CHANNEL_MSG);
349         if (u->msg_fd < 0) {
350             pa_log_warn("msg_fd is not connected. try again(%d)", retry);
351             usleep(100 * 1000);
352             continue;
353         }
354         pa_log_debug("got fd for message %d", u->msg_fd);
355         break;
356     }
357     if (u->msg_fd < 0) {
358         pa_log_error("failed to get msg_fd");
359         return -1;
360     }
361
362     retry = 10;
363     while (retry--) {
364         u->data_fd = ipc_get_client_fd(u, IPC_CHANNEL_DATA);
365         if (u->data_fd < 0) {
366             pa_log_warn("data_fd is not connected. try again(%d)", retry);
367             usleep(100 * 1000);
368             continue;
369         }
370         pa_log_debug("got fd for data %d", u->data_fd);
371         break;
372     }
373     if (u->data_fd < 0) {
374         pa_log_error("failed to get data_fd");
375         return -1;
376     }
377
378     u->enough_data_sent = false;
379     u->initial_sent_size = 0;
380
381     return 0;
382 }
383
384 static void deinitialize_ipc(struct userdata *u) {
385     pa_assert(u);
386
387     ipc_request_drain(u);
388
389     u->initial_sent_size = 0;
390
391     if (u->data_fd >= 0) {
392         close(u->data_fd);
393         u->data_fd = -1;
394     }
395
396     if (u->msg_fd >= 0) {
397         close(u->msg_fd);
398         u->msg_fd = -1;
399     }
400 }
401
402 static int send_data(struct userdata *u, pa_memchunk *chunk) {
403     size_t index = 0;
404     size_t length = 0;
405     size_t total_size = 0;
406     void *p;
407
408     pa_assert(u);
409     pa_assert(chunk);
410
411     p = pa_memblock_acquire(chunk->memblock);
412     pa_assert(p);
413
414     index = chunk->index;
415     length = chunk->length;
416
417     for (;;) {
418         ssize_t sent;
419
420         if (u->need_pause) {
421             pa_log_info("msg cond wait for resume");
422             msg_cond_wait(u);
423         }
424
425         sent = ipc_push_data(u, p + index, u->write_block_size);
426
427         if (sent < 0) {
428             if (errno == EINTR || errno == EAGAIN) {
429                 pa_log_warn("Failed to write data to fd: %s", pa_cstrerror(errno));
430                 continue;
431             }
432             pa_log_error("Failed to write data to fd: %s", pa_cstrerror(errno));
433             return -1;
434         } else {
435             index += (size_t) sent;
436             if (length <= sent) {
437                 total_size += sent;
438 #ifdef DEBUG_LOG
439                 pa_log_debug("Wrote %zu bytes.", total_size);
440 #endif
441                 break;
442             }
443             length -= (size_t) sent;
444             total_size += (size_t) sent;
445             if (length < u->write_block_size) {
446 #ifdef DEBUG_LOG
447                 pa_log_warn("Unexpected write, remained %zu, skip it", length);
448 #endif
449                 chunk->index = index;
450                 chunk->length = length;
451                 goto finish;
452             }
453         }
454     }
455
456 finish:
457     pa_memblock_release(chunk->memblock);
458
459     return 0;
460 }
461
462 static void process_render(struct userdata *u, pa_usec_t now) {
463     size_t ate = 0;
464
465     pa_assert(u);
466
467     while (u->timestamp < now) {
468         pa_memchunk chunk;
469
470         pa_sink_render_full(u->sink, u->sink->thread_info.max_request, &chunk);
471
472 #ifdef DUMP_PCM
473         dump_write(u, &chunk);
474 #endif
475         if (!send_data(u, &chunk)) {
476             u->timestamp += pa_bytes_to_usec(chunk.length, &u->sink->sample_spec);
477             ate += chunk.length;
478         }
479
480         pa_memblock_unref(chunk.memblock);
481
482         if (ate >= u->sink->thread_info.max_request)
483             break;
484     }
485 #ifdef DEBUG_LOG
486     pa_log_debug("end of while loop, timestamp[%" PRIu64 "]", u->timestamp);
487 #endif
488 }
489
490 static void thread_func(void *userdata) {
491     struct userdata *u = userdata;
492
493     pa_assert(u);
494
495     pa_log_debug("Thread starting up");
496
497     pa_thread_mq_install(&u->thread_mq);
498
499     u->timestamp = pa_rtclock_now();
500
501     for (;;) {
502         pa_usec_t now = 0;
503         int ret;
504
505         if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
506             now = pa_rtclock_now();
507 #ifdef DEBUG_LOG
508             pa_log_debug("now[%" PRIu64 "]", now);
509 #endif
510         }
511
512         if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
513             process_rewind(u);
514
515         if (PA_SINK_IS_RUNNING(u->sink->thread_info.state)) {
516 #ifdef DEBUG_LOG
517             pa_log_debug("timestamp[%" PRIu64 "], now[%" PRIu64 "]", u->timestamp, now);
518 #endif
519             if (u->timestamp <= now)
520                 process_render(u, now);
521
522             pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp);
523         } else {
524             pa_rtpoll_set_timer_disabled(u->rtpoll);
525         }
526
527         /* Hmm, nothing to do. Let's sleep */
528         if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
529             goto fail;
530
531         if (ret == 0)
532             goto finish;
533     }
534
535 fail:
536     /* If this was no regular exit from the loop we have to continue
537      * processing messages until we received PA_MESSAGE_SHUTDOWN */
538     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
539     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
540
541 finish:
542     pa_log_debug("Thread shutting down");
543 }
544
545 static void msg_thread_func(void *userdata) {
546     struct userdata *u = userdata;
547     char recv_msg[IPC_MAX_MSG_LEN];
548     int recv_len = 0;
549
550     pa_assert(u);
551
552     pa_log_debug("MSG thread starting up");
553
554     while (!u->exit_msg_thread) {
555         if (u->msg_fd < 0) {
556             usleep(10*1000);
557             continue;
558         }
559
560         if ((recv_len = recv(u->msg_fd, recv_msg , IPC_MAX_MSG_LEN, 0)) > 0) {
561             if (!strcmp(recv_msg, MSG_DRAIN_COMPLETE)) {
562                 pa_log_debug("got drain complete signal [%s]", recv_msg);
563                 pa_cond_signal(u->msg_cond, false);
564             } else if (!strcmp(recv_msg, MSG_SEND_PAUSE)) {
565                 pa_log_debug("got pause signal [%s]", recv_msg);
566                 u->need_pause = true;
567             } else if (!strcmp(recv_msg, MSG_SEND_RESUME) && u->need_pause) {
568                 pa_log_debug("got resume signal [%s]", recv_msg);
569                 u->need_pause = false;
570                 pa_cond_signal(u->msg_cond, false);
571             }
572         }
573
574         sched_yield();
575
576         usleep(10*1000);
577     }
578
579     pa_log_debug("MSG Thread shutting down");
580 }
581
582 int pa_acm_init(pa_module *m, const char* const v_modargs[]) {
583     struct userdata *u;
584     pa_sample_spec ss;
585     pa_channel_map map;
586     uint32_t write_block_size;
587     pa_modargs *ma;
588     pa_sink_new_data data;
589     char st[PA_SAMPLE_SPEC_SNPRINT_MAX];
590     char cm[PA_CHANNEL_MAP_SNPRINT_MAX];
591
592     pa_assert(m);
593
594     if (!(ma = pa_modargs_new(m->argument, v_modargs))) {
595         pa_log("Failed to parse module arguments.");
596         goto fail;
597     }
598
599     ss = m->core->default_sample_spec;
600     map = m->core->default_channel_map;
601     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
602         pa_log("Invalid sample format specification or channel map");
603         goto fail;
604     }
605     pa_log_info("sample spec: %s, channel map: %s",
606                 pa_sample_spec_snprint(st, sizeof(st), &ss),
607                 pa_channel_map_snprint(cm, sizeof(cm), &map));
608
609     write_block_size = DEFAULT_WRITE_BLOCK_SIZE;
610     if (pa_modargs_get_value_u32(ma, "write_block_size", &write_block_size) < 0) {
611         pa_log("Failed to parse write_block_size argument");
612         goto fail;
613     }
614     pa_log_info("write_block_size: %d", write_block_size);
615
616     u = pa_xnew0(struct userdata, 1);
617     u->msg_socket_path = pa_xstrdup(pa_modargs_get_value(ma, "msg_socket", DEFAULT_ACM_EVENT_SOCKET_PATH));
618     u->data_socket_path = pa_xstrdup(pa_modargs_get_value(ma, "data_socket", DEFAULT_ACM_DATA_SOCKET_PATH));
619     u->core = m->core;
620     u->module = m;
621     m->userdata = u;
622
623     if (initialize_ipc(u)) {
624         pa_log("Failed to initialize IPC");
625         goto fail;
626     }
627
628     u->rtpoll = pa_rtpoll_new();
629     if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
630         pa_log("pa_thread_mq_init() failed.");
631         goto fail;
632     }
633
634     pa_sink_new_data_init(&data);
635     data.driver = __FILE__;
636     data.module = m;
637     pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
638     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Send PCM data to ACM core via socket fd");
639     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_API, "acm");
640     pa_sink_new_data_set_sample_spec(&data, &ss);
641     pa_sink_new_data_set_channel_map(&data, &map);
642
643     u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY);
644
645     pa_sink_new_data_done(&data);
646
647     if (!u->sink) {
648         pa_log("Failed to create sink.");
649         goto fail;
650     }
651
652     u->sink->parent.process_msg = sink_process_msg;
653     u->sink->userdata = u;
654
655     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
656     pa_sink_set_rtpoll(u->sink, u->rtpoll);
657
658     if (!u->msg_mutex)
659         u->msg_mutex = pa_mutex_new(false, false);
660
661     if (!u->msg_cond)
662         u->msg_cond = pa_cond_new();
663
664     if (!(u->thread = pa_thread_new("acm-sink", thread_func, u))) {
665         pa_log("Failed to create thread");
666         goto fail;
667     }
668
669     if (!(u->msg_thread = pa_thread_new("acm-msg-thread", msg_thread_func, u))) {
670         pa_log("Failed to create message thread");
671         goto fail;
672     }
673
674     u->write_block_size = pa_frame_align(write_block_size, &u->sink->sample_spec);
675     pa_sink_set_fixed_latency(u->sink, pa_bytes_to_usec(u->write_block_size, &u->sink->sample_spec));
676     pa_sink_set_max_request(u->sink, u->write_block_size);
677
678     pa_sink_put(u->sink);
679
680     pa_modargs_free(ma);
681
682     return 0;
683
684 fail:
685     pa_acm_done(m);
686
687     if (ma)
688         pa_modargs_free(ma);
689
690     return -1;
691 }
692
693 void pa_acm_done(pa_module *m) {
694     struct userdata* u;
695
696     pa_assert(m);
697
698     pa_log("pa_acm_done()");
699
700     if (!(u = m->userdata))
701         return;
702
703     if (u->sink)
704         pa_sink_unlink(u->sink);
705
706     if (u->thread) {
707         pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
708         pa_thread_free(u->thread);
709     }
710
711     if (u->msg_thread) {
712         u->exit_msg_thread = true;
713         pa_thread_free(u->msg_thread);
714     }
715
716     pa_thread_mq_done(&u->thread_mq);
717
718     if (u->sink)
719         pa_sink_unref(u->sink);
720
721     if (u->rtpoll)
722         pa_rtpoll_free(u->rtpoll);
723
724     deinitialize_ipc(u);
725
726     if (u->msg_mutex) {
727         pa_mutex_free(u->msg_mutex);
728         u->msg_mutex = NULL;
729     }
730
731     if (u->msg_cond) {
732         pa_cond_free(u->msg_cond);
733         u->msg_cond = NULL;
734     }
735
736     pa_xfree(u->msg_socket_path);
737     pa_xfree(u->data_socket_path);
738     pa_xfree(u);
739 }