Merge commit 'origin/master-tx'
[profile/ivi/pulseaudio.git] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3   This file is part of PulseAudio.
4
5   Copyright 2006 Lennart Poettering
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <poll.h>
35
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-util.h>
47 #include <pulsecore/modargs.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/sample-util.h>
50 #include <pulsecore/macro.h>
51 #include <pulsecore/atomic.h>
52 #include <pulsecore/rtclock.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/time-smoother.h>
55
56 #include "module-rtp-recv-symdef.h"
57
58 #include "rtp.h"
59 #include "sdp.h"
60 #include "sap.h"
61
62 PA_MODULE_AUTHOR("Lennart Poettering");
63 PA_MODULE_DESCRIPTION("Recieve data from a network via RTP/SAP/SDP");
64 PA_MODULE_VERSION(PACKAGE_VERSION);
65 PA_MODULE_LOAD_ONCE(FALSE);
66 PA_MODULE_USAGE(
67         "sink=<name of the sink> "
68         "sap_address=<multicast address to listen on> "
69 );
70
71 #define SAP_PORT 9875
72 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
73 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
74 #define MAX_SESSIONS 16
75 #define DEATH_TIMEOUT 20
76 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
77 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
78
79 static const char* const valid_modargs[] = {
80     "sink",
81     "sap_address",
82     NULL
83 };
84
85 struct session {
86     struct userdata *userdata;
87     PA_LLIST_FIELDS(struct session);
88
89     pa_sink_input *sink_input;
90     pa_memblockq *memblockq;
91
92     pa_bool_t first_packet;
93     uint32_t ssrc;
94     uint32_t offset;
95
96     struct pa_sdp_info sdp_info;
97
98     pa_rtp_context rtp_context;
99
100     pa_rtpoll_item *rtpoll_item;
101
102     pa_atomic_t timestamp;
103
104     pa_smoother *smoother;
105     pa_usec_t intended_latency;
106     pa_usec_t sink_latency;
107
108     pa_usec_t last_rate_update;
109 };
110
111 struct userdata {
112     pa_module *module;
113
114     pa_sap_context sap_context;
115     pa_io_event* sap_event;
116
117     pa_time_event *check_death_event;
118
119     char *sink_name;
120
121     PA_LLIST_HEAD(struct session, sessions);
122     pa_hashmap *by_origin;
123     int n_sessions;
124 };
125
126 static void session_free(struct session *s);
127
128 /* Called from I/O thread context */
129 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
130     struct session *s = PA_SINK_INPUT(o)->userdata;
131
132     switch (code) {
133         case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
134             *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
135
136             /* Fall through, the default handler will add in the extra
137              * latency added by the resampler */
138             break;
139     }
140
141     return pa_sink_input_process_msg(o, code, data, offset, chunk);
142 }
143
144 /* Called from I/O thread context */
145 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
146     struct session *s;
147     pa_sink_input_assert_ref(i);
148     pa_assert_se(s = i->userdata);
149
150     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
151         return -1;
152
153     pa_memblockq_drop(s->memblockq, chunk->length);
154
155     return 0;
156 }
157
158 /* Called from I/O thread context */
159 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
160     struct session *s;
161
162     pa_sink_input_assert_ref(i);
163     pa_assert_se(s = i->userdata);
164
165     pa_memblockq_rewind(s->memblockq, nbytes);
166 }
167
168 /* Called from thread context */
169 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
170     struct session *s;
171
172     pa_sink_input_assert_ref(i);
173     pa_assert_se(s = i->userdata);
174
175     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
176 }
177
178 /* Called from main context */
179 static void sink_input_kill(pa_sink_input* i) {
180     struct session *s;
181     pa_sink_input_assert_ref(i);
182     pa_assert_se(s = i->userdata);
183
184     session_free(s);
185 }
186
187 /* Called from I/O thread context */
188 static int rtpoll_work_cb(pa_rtpoll_item *i) {
189     pa_memchunk chunk;
190     int64_t k, j, delta;
191     struct timeval now;
192     struct session *s;
193     struct pollfd *p;
194
195     pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
196
197     p = pa_rtpoll_item_get_pollfd(i, NULL);
198
199     if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
200         pa_log("poll() signalled bad revents.");
201         return -1;
202     }
203
204     if ((p->revents & POLLIN) == 0)
205         return 0;
206
207     p->revents = 0;
208
209     if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool) < 0)
210         return 0;
211
212     if (s->sdp_info.payload != s->rtp_context.payload) {
213         pa_memblock_unref(chunk.memblock);
214         return 0;
215     }
216
217     if (!s->first_packet) {
218         s->first_packet = TRUE;
219
220         s->ssrc = s->rtp_context.ssrc;
221         s->offset = s->rtp_context.timestamp;
222
223         if (s->ssrc == s->userdata->module->core->cookie)
224             pa_log_warn("Detected RTP packet loop!");
225     } else {
226         if (s->ssrc != s->rtp_context.ssrc) {
227             pa_memblock_unref(chunk.memblock);
228             return 0;
229         }
230     }
231
232     /* Check whether there was a timestamp overflow */
233     k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
234     j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
235
236     if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
237         delta = k;
238     else
239         delta = j;
240
241     pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
242
243     pa_rtclock_get(&now);
244
245     pa_smoother_put(s->smoother, pa_timeval_load(&now), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec));
246
247     if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
248         pa_log_warn("Queue overrun");
249         pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
250     }
251
252 /*     pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
253
254     pa_memblock_unref(chunk.memblock);
255
256     /* The next timestamp we expect */
257     s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
258
259     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
260
261     if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
262         pa_usec_t wi, ri, render_delay, sink_delay = 0, latency, fix;
263         unsigned fix_samples;
264
265         pa_log("Updating sample rate");
266
267         wi = pa_smoother_get(s->smoother, pa_timeval_load(&now));
268         ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
269
270         if (PA_MSGOBJECT(s->sink_input->sink)->process_msg(PA_MSGOBJECT(s->sink_input->sink), PA_SINK_MESSAGE_GET_LATENCY, &sink_delay, 0, NULL) < 0)
271             sink_delay = 0;
272
273         render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
274
275         if (ri > render_delay+sink_delay)
276             ri -= render_delay+sink_delay;
277         else
278             ri = 0;
279
280         if (wi < ri)
281             latency = 0;
282         else
283             latency = wi - ri;
284
285         pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double)  s->intended_latency/PA_USEC_PER_MSEC);
286
287         /* Calculate deviation */
288         if (latency < s->intended_latency)
289             fix = s->intended_latency - latency;
290         else
291             fix = latency - s->intended_latency;
292
293         /* How many samples is this per second? */
294         fix_samples = (unsigned) (fix * (pa_usec_t) s->sink_input->thread_info.sample_spec.rate / (pa_usec_t) RATE_UPDATE_INTERVAL);
295
296         /* Check if deviation is in bounds */
297         if (fix_samples > s->sink_input->sample_spec.rate*.20)
298             pa_log_debug("Hmmm, rate fix is too large (%lu Hz), not applying.", (unsigned long) fix_samples);
299
300         /* Fix up rate */
301         if (latency < s->intended_latency)
302             s->sink_input->sample_spec.rate -= fix_samples;
303         else
304             s->sink_input->sample_spec.rate += fix_samples;
305
306         pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
307
308         pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
309
310         s->last_rate_update = pa_timeval_load(&now);
311     }
312
313     if (pa_memblockq_is_readable(s->memblockq) &&
314         s->sink_input->thread_info.underrun_for > 0) {
315         pa_log_debug("Requesting rewind due to end of underrun");
316         pa_sink_input_request_rewind(s->sink_input, 0, FALSE, TRUE, FALSE);
317     }
318
319     return 1;
320 }
321
322 /* Called from I/O thread context */
323 static void sink_input_attach(pa_sink_input *i) {
324     struct session *s;
325     struct pollfd *p;
326
327     pa_sink_input_assert_ref(i);
328     pa_assert_se(s = i->userdata);
329
330     pa_assert(!s->rtpoll_item);
331     s->rtpoll_item = pa_rtpoll_item_new(i->sink->rtpoll, PA_RTPOLL_LATE, 1);
332
333     p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
334     p->fd = s->rtp_context.fd;
335     p->events = POLLIN;
336     p->revents = 0;
337
338     pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
339     pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
340 }
341
342 /* Called from I/O thread context */
343 static void sink_input_detach(pa_sink_input *i) {
344     struct session *s;
345     pa_sink_input_assert_ref(i);
346     pa_assert_se(s = i->userdata);
347
348     pa_assert(s->rtpoll_item);
349     pa_rtpoll_item_free(s->rtpoll_item);
350     s->rtpoll_item = NULL;
351 }
352
353 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
354     int af, fd = -1, r, one;
355
356     pa_assert(sa);
357     pa_assert(salen > 0);
358
359     af = sa->sa_family;
360     if ((fd = socket(af, SOCK_DGRAM, 0)) < 0) {
361         pa_log("Failed to create socket: %s", pa_cstrerror(errno));
362         goto fail;
363     }
364
365     one = 1;
366     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
367         pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
368         goto fail;
369     }
370
371     if (af == AF_INET) {
372         struct ip_mreq mr4;
373         memset(&mr4, 0, sizeof(mr4));
374         mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
375         r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
376 #ifdef HAVE_IPV6
377     } else {
378         struct ipv6_mreq mr6;
379         memset(&mr6, 0, sizeof(mr6));
380         mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
381         r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
382 #endif
383     }
384
385     if (r < 0) {
386         pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
387         goto fail;
388     }
389
390     if (bind(fd, sa, salen) < 0) {
391         pa_log("bind() failed: %s", pa_cstrerror(errno));
392         goto fail;
393     }
394
395     return fd;
396
397 fail:
398     if (fd >= 0)
399         close(fd);
400
401     return -1;
402 }
403
404 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
405     struct session *s = NULL;
406     pa_sink *sink;
407     int fd = -1;
408     pa_memchunk silence;
409     pa_sink_input_new_data data;
410     struct timeval now;
411
412     pa_assert(u);
413     pa_assert(sdp_info);
414
415     if (u->n_sessions >= MAX_SESSIONS) {
416         pa_log("Session limit reached.");
417         goto fail;
418     }
419
420     if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
421         pa_log("Sink does not exist.");
422         goto fail;
423     }
424
425     pa_rtclock_get(&now);
426
427     s = pa_xnew0(struct session, 1);
428     s->userdata = u;
429     s->first_packet = FALSE;
430     s->sdp_info = *sdp_info;
431     s->rtpoll_item = NULL;
432     s->intended_latency = LATENCY_USEC;
433     s->smoother = pa_smoother_new(
434             PA_USEC_PER_SEC*5,
435             PA_USEC_PER_SEC*2,
436             TRUE,
437             TRUE,
438             10,
439             pa_timeval_load(&now),
440             FALSE);
441     s->last_rate_update = pa_timeval_load(&now);
442     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
443
444     if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
445         goto fail;
446
447     pa_sink_input_new_data_init(&data);
448     data.sink = sink;
449     data.driver = __FILE__;
450     pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
451     pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
452                      "RTP Stream%s%s%s",
453                      sdp_info->session_name ? " (" : "",
454                      sdp_info->session_name ? sdp_info->session_name : "",
455                      sdp_info->session_name ? ")" : "");
456
457     if (sdp_info->session_name)
458         pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
459     pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
460     pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
461     data.module = u->module;
462     pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
463
464     pa_sink_input_new(&s->sink_input, u->module->core, &data, PA_SINK_INPUT_VARIABLE_RATE);
465     pa_sink_input_new_data_done(&data);
466
467     if (!s->sink_input) {
468         pa_log("Failed to create sink input.");
469         goto fail;
470     }
471
472     s->sink_input->userdata = s;
473
474     s->sink_input->parent.process_msg = sink_input_process_msg;
475     s->sink_input->pop = sink_input_pop_cb;
476     s->sink_input->process_rewind = sink_input_process_rewind_cb;
477     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
478     s->sink_input->kill = sink_input_kill;
479     s->sink_input->attach = sink_input_attach;
480     s->sink_input->detach = sink_input_detach;
481
482     pa_sink_input_get_silence(s->sink_input, &silence);
483
484     s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
485
486     if (s->intended_latency < s->sink_latency*2)
487         s->intended_latency = s->sink_latency*2;
488
489     s->memblockq = pa_memblockq_new(
490             0,
491             MEMBLOCKQ_MAXLENGTH,
492             MEMBLOCKQ_MAXLENGTH,
493             pa_frame_size(&s->sink_input->sample_spec),
494             pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
495             0,
496             0,
497             &silence);
498
499     pa_memblock_unref(silence.memblock);
500
501     pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
502
503     pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
504     u->n_sessions++;
505     PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
506
507     pa_sink_input_put(s->sink_input);
508
509     pa_log_info("New session '%s'", s->sdp_info.session_name);
510
511     return s;
512
513 fail:
514     pa_xfree(s);
515
516     if (fd >= 0)
517         pa_close(fd);
518
519     return NULL;
520 }
521
522 static void session_free(struct session *s) {
523     pa_assert(s);
524
525     pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
526
527     pa_sink_input_unlink(s->sink_input);
528     pa_sink_input_unref(s->sink_input);
529
530     PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
531     pa_assert(s->userdata->n_sessions >= 1);
532     s->userdata->n_sessions--;
533     pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
534
535     pa_memblockq_free(s->memblockq);
536     pa_sdp_info_destroy(&s->sdp_info);
537     pa_rtp_context_destroy(&s->rtp_context);
538
539     pa_smoother_free(s->smoother);
540
541     pa_xfree(s);
542 }
543
544 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
545     struct userdata *u = userdata;
546     pa_bool_t goodbye = FALSE;
547     pa_sdp_info info;
548     struct session *s;
549
550     pa_assert(m);
551     pa_assert(e);
552     pa_assert(u);
553     pa_assert(fd == u->sap_context.fd);
554     pa_assert(flags == PA_IO_EVENT_INPUT);
555
556     if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
557         return;
558
559     if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
560         return;
561
562     if (goodbye) {
563
564         if ((s = pa_hashmap_get(u->by_origin, info.origin)))
565             session_free(s);
566
567         pa_sdp_info_destroy(&info);
568     } else {
569
570         if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
571             if (!session_new(u, &info))
572                 pa_sdp_info_destroy(&info);
573
574         } else {
575             struct timeval now;
576             pa_rtclock_get(&now);
577             pa_atomic_store(&s->timestamp, (int) now.tv_sec);
578
579             pa_sdp_info_destroy(&info);
580         }
581     }
582 }
583
584 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *ptv, void *userdata) {
585     struct session *s, *n;
586     struct userdata *u = userdata;
587     struct timeval now;
588     struct timeval tv;
589
590     pa_assert(m);
591     pa_assert(t);
592     pa_assert(ptv);
593     pa_assert(u);
594
595     pa_rtclock_get(&now);
596
597     pa_log_debug("Checking for dead streams ...");
598
599     for (s = u->sessions; s; s = n) {
600         int k;
601         n = s->next;
602
603         k = pa_atomic_load(&s->timestamp);
604
605         if (k + DEATH_TIMEOUT < now.tv_sec)
606             session_free(s);
607     }
608
609     /* Restart timer */
610     pa_gettimeofday(&tv);
611     pa_timeval_add(&tv, DEATH_TIMEOUT*PA_USEC_PER_SEC);
612     m->time_restart(t, &tv);
613 }
614
615 int pa__init(pa_module*m) {
616     struct userdata *u;
617     pa_modargs *ma = NULL;
618     struct sockaddr_in sa4;
619 #ifdef HAVE_IPV6
620     struct sockaddr_in6 sa6;
621 #endif
622     struct sockaddr *sa;
623     socklen_t salen;
624     const char *sap_address;
625     int fd = -1;
626     struct timeval tv;
627
628     pa_assert(m);
629
630     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
631         pa_log("failed to parse module arguments");
632         goto fail;
633     }
634
635     sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
636
637     if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
638         sa4.sin_family = AF_INET;
639         sa4.sin_port = htons(SAP_PORT);
640         sa = (struct sockaddr*) &sa4;
641         salen = sizeof(sa4);
642 #ifdef HAVE_IPV6
643     } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
644         sa6.sin6_family = AF_INET6;
645         sa6.sin6_port = htons(SAP_PORT);
646         sa = (struct sockaddr*) &sa6;
647         salen = sizeof(sa6);
648 #endif
649     } else {
650         pa_log("Invalid SAP address '%s'", sap_address);
651         goto fail;
652     }
653
654     if ((fd = mcast_socket(sa, salen)) < 0)
655         goto fail;
656
657     u = pa_xnew(struct userdata, 1);
658     m->userdata = u;
659     u->module = m;
660     u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
661
662     u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
663     pa_sap_context_init_recv(&u->sap_context, fd);
664
665     PA_LLIST_HEAD_INIT(struct session, u->sessions);
666     u->n_sessions = 0;
667     u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
668
669     pa_gettimeofday(&tv);
670     pa_timeval_add(&tv, DEATH_TIMEOUT * PA_USEC_PER_SEC);
671     u->check_death_event = m->core->mainloop->time_new(m->core->mainloop, &tv, check_death_event_cb, u);
672
673     pa_modargs_free(ma);
674
675     return 0;
676
677 fail:
678     if (ma)
679         pa_modargs_free(ma);
680
681     if (fd >= 0)
682         pa_close(fd);
683
684     return -1;
685 }
686
687 void pa__done(pa_module*m) {
688     struct userdata *u;
689     struct session *s;
690
691     pa_assert(m);
692
693     if (!(u = m->userdata))
694         return;
695
696     if (u->sap_event)
697         m->core->mainloop->io_free(u->sap_event);
698
699     if (u->check_death_event)
700         m->core->mainloop->time_free(u->check_death_event);
701
702     pa_sap_context_destroy(&u->sap_context);
703
704     if (u->by_origin) {
705         while ((s = pa_hashmap_first(u->by_origin)))
706             session_free(s);
707
708         pa_hashmap_free(u->by_origin, NULL, NULL);
709     }
710
711     pa_xfree(u->sink_name);
712     pa_xfree(u);
713 }