3bbeb1fcbd82dbb72fb9b9375930b9ccb90ad13a
[profile/ivi/pulseaudio-panda.git] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3   This file is part of PulseAudio.
4
5   Copyright 2006 Lennart Poettering
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <poll.h>
35
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/core-error.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/llist.h>
43 #include <pulsecore/sink.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/memblockq.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-rtclock.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/namereg.h>
51 #include <pulsecore/sample-util.h>
52 #include <pulsecore/macro.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/atomic.h>
55 #include <pulsecore/time-smoother.h>
56 #include <pulsecore/socket-util.h>
57 #include <pulsecore/once.h>
58
59 #include "module-rtp-recv-symdef.h"
60
61 #include "rtp.h"
62 #include "sdp.h"
63 #include "sap.h"
64
65 PA_MODULE_AUTHOR("Lennart Poettering");
66 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
67 PA_MODULE_VERSION(PACKAGE_VERSION);
68 PA_MODULE_LOAD_ONCE(FALSE);
69 PA_MODULE_USAGE(
70         "sink=<name of the sink> "
71         "sap_address=<multicast address to listen on> "
72 );
73
74 #define SAP_PORT 9875
75 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
76 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
77 #define MAX_SESSIONS 16
78 #define DEATH_TIMEOUT 20
79 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
80 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
81
82 static const char* const valid_modargs[] = {
83     "sink",
84     "sap_address",
85     NULL
86 };
87
88 struct session {
89     struct userdata *userdata;
90     PA_LLIST_FIELDS(struct session);
91
92     pa_sink_input *sink_input;
93     pa_memblockq *memblockq;
94
95     pa_bool_t first_packet;
96     uint32_t ssrc;
97     uint32_t offset;
98
99     struct pa_sdp_info sdp_info;
100
101     pa_rtp_context rtp_context;
102
103     pa_rtpoll_item *rtpoll_item;
104
105     pa_atomic_t timestamp;
106
107     pa_smoother *smoother;
108     pa_usec_t intended_latency;
109     pa_usec_t sink_latency;
110
111     pa_usec_t last_rate_update;
112 };
113
114 struct userdata {
115     pa_module *module;
116     pa_core *core;
117
118     pa_sap_context sap_context;
119     pa_io_event* sap_event;
120
121     pa_time_event *check_death_event;
122
123     char *sink_name;
124
125     PA_LLIST_HEAD(struct session, sessions);
126     pa_hashmap *by_origin;
127     int n_sessions;
128 };
129
130 static void session_free(struct session *s);
131
132 /* Called from I/O thread context */
133 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
134     struct session *s = PA_SINK_INPUT(o)->userdata;
135
136     switch (code) {
137         case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
138             *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
139
140             /* Fall through, the default handler will add in the extra
141              * latency added by the resampler */
142             break;
143     }
144
145     return pa_sink_input_process_msg(o, code, data, offset, chunk);
146 }
147
148 /* Called from I/O thread context */
149 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
150     struct session *s;
151     pa_sink_input_assert_ref(i);
152     pa_assert_se(s = i->userdata);
153
154     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
155         return -1;
156
157     pa_memblockq_drop(s->memblockq, chunk->length);
158
159     return 0;
160 }
161
162 /* Called from I/O thread context */
163 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
164     struct session *s;
165
166     pa_sink_input_assert_ref(i);
167     pa_assert_se(s = i->userdata);
168
169     pa_memblockq_rewind(s->memblockq, nbytes);
170 }
171
172 /* Called from I/O thread context */
173 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
174     struct session *s;
175
176     pa_sink_input_assert_ref(i);
177     pa_assert_se(s = i->userdata);
178
179     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
180 }
181
182 /* Called from main context */
183 static void sink_input_kill(pa_sink_input* i) {
184     struct session *s;
185     pa_sink_input_assert_ref(i);
186     pa_assert_se(s = i->userdata);
187
188     session_free(s);
189 }
190
191 /* Called from IO context */
192 static void sink_input_suspend_within_thread(pa_sink_input* i, pa_bool_t b) {
193     struct session *s;
194     pa_sink_input_assert_ref(i);
195     pa_assert_se(s = i->userdata);
196
197     if (b) {
198         pa_smoother_pause(s->smoother, pa_rtclock_now());
199         pa_memblockq_flush_read(s->memblockq);
200     } else
201         s->first_packet = FALSE;
202 }
203
204 /* Called from I/O thread context */
205 static int rtpoll_work_cb(pa_rtpoll_item *i) {
206     pa_memchunk chunk;
207     int64_t k, j, delta;
208     struct timeval now = { 0, 0 };
209     struct session *s;
210     struct pollfd *p;
211
212     pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
213
214     p = pa_rtpoll_item_get_pollfd(i, NULL);
215
216     if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
217         pa_log("poll() signalled bad revents.");
218         return -1;
219     }
220
221     if ((p->revents & POLLIN) == 0)
222         return 0;
223
224     p->revents = 0;
225
226     if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
227         return 0;
228
229     if (s->sdp_info.payload != s->rtp_context.payload ||
230         !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
231         pa_memblock_unref(chunk.memblock);
232         return 0;
233     }
234
235     if (!s->first_packet) {
236         s->first_packet = TRUE;
237
238         s->ssrc = s->rtp_context.ssrc;
239         s->offset = s->rtp_context.timestamp;
240
241         if (s->ssrc == s->userdata->module->core->cookie)
242             pa_log_warn("Detected RTP packet loop!");
243     } else {
244         if (s->ssrc != s->rtp_context.ssrc) {
245             pa_memblock_unref(chunk.memblock);
246             return 0;
247         }
248     }
249
250     /* Check whether there was a timestamp overflow */
251     k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
252     j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
253
254     if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
255         delta = k;
256     else
257         delta = j;
258
259     pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
260
261     if (now.tv_sec == 0) {
262         PA_ONCE_BEGIN {
263             pa_log_warn("Using artificial time instead of timestamp");
264         } PA_ONCE_END;
265         pa_rtclock_get(&now);
266     } else
267         pa_rtclock_from_wallclock(&now);
268
269     pa_smoother_put(s->smoother, pa_timeval_load(&now), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec));
270
271     /* Tell the smoother that we are rolling now, in case it is still paused */
272     pa_smoother_resume(s->smoother, pa_timeval_load(&now), TRUE);
273
274     if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
275         pa_log_warn("Queue overrun");
276         pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
277     }
278
279 /*     pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
280
281     pa_memblock_unref(chunk.memblock);
282
283     /* The next timestamp we expect */
284     s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
285
286     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
287
288     if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
289         pa_usec_t wi, ri, render_delay, sink_delay = 0, latency, fix;
290         unsigned fix_samples;
291         uint32_t base_rate = s->sink_input->sink->sample_spec.rate;
292         uint32_t current_rate = s->sink_input->sample_spec.rate;
293         uint32_t new_rate;
294
295         pa_log_debug("Updating sample rate");
296
297         wi = pa_smoother_get(s->smoother, pa_timeval_load(&now));
298         ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
299
300         pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
301
302         sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
303         render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
304
305         if (ri > render_delay+sink_delay)
306             ri -= render_delay+sink_delay;
307         else
308             ri = 0;
309
310         if (wi < ri)
311             latency = 0;
312         else
313             latency = wi - ri;
314
315         pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
316
317         /* Calculate deviation */
318         if (latency < s->intended_latency)
319             fix = s->intended_latency - latency;
320         else
321             fix = latency - s->intended_latency;
322
323         /* How many samples is this per second? */
324         fix_samples = (unsigned) (fix * (pa_usec_t) s->sink_input->thread_info.sample_spec.rate / (pa_usec_t) RATE_UPDATE_INTERVAL);
325
326         if (latency < s->intended_latency)
327             new_rate = current_rate - fix_samples;
328         else
329             new_rate = current_rate + fix_samples;
330
331         if (new_rate < (uint32_t) (base_rate*0.8) || new_rate > (uint32_t) (base_rate*1.25)) {
332             pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate, new_rate);
333             new_rate = base_rate;
334         } else {
335             if (base_rate < new_rate + 20 && new_rate < base_rate + 20)
336               new_rate = base_rate;
337             /* Do the adjustment in small steps; 2‰ can be considered inaudible */
338             if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
339                 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
340                 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
341             }
342         }
343         s->sink_input->sample_spec.rate = new_rate;
344
345         pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
346
347         pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
348
349         pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
350
351         s->last_rate_update = pa_timeval_load(&now);
352     }
353
354     if (pa_memblockq_is_readable(s->memblockq) &&
355         s->sink_input->thread_info.underrun_for > 0) {
356         pa_log_debug("Requesting rewind due to end of underrun");
357         pa_sink_input_request_rewind(s->sink_input, 0, FALSE, TRUE, FALSE);
358     }
359
360     return 1;
361 }
362
363 /* Called from I/O thread context */
364 static void sink_input_attach(pa_sink_input *i) {
365     struct session *s;
366     struct pollfd *p;
367
368     pa_sink_input_assert_ref(i);
369     pa_assert_se(s = i->userdata);
370
371     pa_assert(!s->rtpoll_item);
372     s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
373
374     p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
375     p->fd = s->rtp_context.fd;
376     p->events = POLLIN;
377     p->revents = 0;
378
379     pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
380     pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
381 }
382
383 /* Called from I/O thread context */
384 static void sink_input_detach(pa_sink_input *i) {
385     struct session *s;
386     pa_sink_input_assert_ref(i);
387     pa_assert_se(s = i->userdata);
388
389     pa_assert(s->rtpoll_item);
390     pa_rtpoll_item_free(s->rtpoll_item);
391     s->rtpoll_item = NULL;
392 }
393
394 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
395     int af, fd = -1, r, one;
396
397     pa_assert(sa);
398     pa_assert(salen > 0);
399
400     af = sa->sa_family;
401     if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
402         pa_log("Failed to create socket: %s", pa_cstrerror(errno));
403         goto fail;
404     }
405
406     pa_make_udp_socket_low_delay(fd);
407
408     one = 1;
409     if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
410         pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
411         goto fail;
412     }
413
414     one = 1;
415     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
416         pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
417         goto fail;
418     }
419
420     if (af == AF_INET) {
421         struct ip_mreq mr4;
422         memset(&mr4, 0, sizeof(mr4));
423         mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
424         r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
425 #ifdef HAVE_IPV6
426     } else {
427         struct ipv6_mreq mr6;
428         memset(&mr6, 0, sizeof(mr6));
429         mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
430         r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
431 #endif
432     }
433
434     if (r < 0) {
435         pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
436         goto fail;
437     }
438
439     if (bind(fd, sa, salen) < 0) {
440         pa_log("bind() failed: %s", pa_cstrerror(errno));
441         goto fail;
442     }
443
444     return fd;
445
446 fail:
447     if (fd >= 0)
448         close(fd);
449
450     return -1;
451 }
452
453 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
454     struct session *s = NULL;
455     pa_sink *sink;
456     int fd = -1;
457     pa_memchunk silence;
458     pa_sink_input_new_data data;
459     struct timeval now;
460
461     pa_assert(u);
462     pa_assert(sdp_info);
463
464     if (u->n_sessions >= MAX_SESSIONS) {
465         pa_log("Session limit reached.");
466         goto fail;
467     }
468
469     if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
470         pa_log("Sink does not exist.");
471         goto fail;
472     }
473
474     pa_rtclock_get(&now);
475
476     s = pa_xnew0(struct session, 1);
477     s->userdata = u;
478     s->first_packet = FALSE;
479     s->sdp_info = *sdp_info;
480     s->rtpoll_item = NULL;
481     s->intended_latency = LATENCY_USEC;
482     s->smoother = pa_smoother_new(
483             PA_USEC_PER_SEC*5,
484             PA_USEC_PER_SEC*2,
485             TRUE,
486             TRUE,
487             10,
488             pa_timeval_load(&now),
489             TRUE);
490     s->last_rate_update = pa_timeval_load(&now);
491     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
492
493     if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
494         goto fail;
495
496     pa_sink_input_new_data_init(&data);
497     data.sink = sink;
498     data.driver = __FILE__;
499     pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
500     pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
501                      "RTP Stream%s%s%s",
502                      sdp_info->session_name ? " (" : "",
503                      sdp_info->session_name ? sdp_info->session_name : "",
504                      sdp_info->session_name ? ")" : "");
505
506     if (sdp_info->session_name)
507         pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
508     pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
509     pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
510     data.module = u->module;
511     pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
512     data.flags = PA_SINK_INPUT_VARIABLE_RATE;
513
514     pa_sink_input_new(&s->sink_input, u->module->core, &data);
515     pa_sink_input_new_data_done(&data);
516
517     if (!s->sink_input) {
518         pa_log("Failed to create sink input.");
519         goto fail;
520     }
521
522     s->sink_input->userdata = s;
523
524     s->sink_input->parent.process_msg = sink_input_process_msg;
525     s->sink_input->pop = sink_input_pop_cb;
526     s->sink_input->process_rewind = sink_input_process_rewind_cb;
527     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
528     s->sink_input->kill = sink_input_kill;
529     s->sink_input->attach = sink_input_attach;
530     s->sink_input->detach = sink_input_detach;
531     s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
532
533     pa_sink_input_get_silence(s->sink_input, &silence);
534
535     s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
536
537     if (s->intended_latency < s->sink_latency*2)
538         s->intended_latency = s->sink_latency*2;
539
540     s->memblockq = pa_memblockq_new(
541             0,
542             MEMBLOCKQ_MAXLENGTH,
543             MEMBLOCKQ_MAXLENGTH,
544             pa_frame_size(&s->sink_input->sample_spec),
545             pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
546             0,
547             0,
548             &silence);
549
550     pa_memblock_unref(silence.memblock);
551
552     pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
553
554     pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
555     u->n_sessions++;
556     PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
557
558     pa_sink_input_put(s->sink_input);
559
560     pa_log_info("New session '%s'", s->sdp_info.session_name);
561
562     return s;
563
564 fail:
565     pa_xfree(s);
566
567     if (fd >= 0)
568         pa_close(fd);
569
570     return NULL;
571 }
572
573 static void session_free(struct session *s) {
574     pa_assert(s);
575
576     pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
577
578     pa_sink_input_unlink(s->sink_input);
579     pa_sink_input_unref(s->sink_input);
580
581     PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
582     pa_assert(s->userdata->n_sessions >= 1);
583     s->userdata->n_sessions--;
584     pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
585
586     pa_memblockq_free(s->memblockq);
587     pa_sdp_info_destroy(&s->sdp_info);
588     pa_rtp_context_destroy(&s->rtp_context);
589
590     pa_smoother_free(s->smoother);
591
592     pa_xfree(s);
593 }
594
595 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
596     struct userdata *u = userdata;
597     pa_bool_t goodbye = FALSE;
598     pa_sdp_info info;
599     struct session *s;
600
601     pa_assert(m);
602     pa_assert(e);
603     pa_assert(u);
604     pa_assert(fd == u->sap_context.fd);
605     pa_assert(flags == PA_IO_EVENT_INPUT);
606
607     if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
608         return;
609
610     if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
611         return;
612
613     if (goodbye) {
614
615         if ((s = pa_hashmap_get(u->by_origin, info.origin)))
616             session_free(s);
617
618         pa_sdp_info_destroy(&info);
619     } else {
620
621         if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
622             if (!session_new(u, &info))
623                 pa_sdp_info_destroy(&info);
624
625         } else {
626             struct timeval now;
627             pa_rtclock_get(&now);
628             pa_atomic_store(&s->timestamp, (int) now.tv_sec);
629
630             pa_sdp_info_destroy(&info);
631         }
632     }
633 }
634
635 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
636     struct session *s, *n;
637     struct userdata *u = userdata;
638     struct timeval now;
639
640     pa_assert(m);
641     pa_assert(t);
642     pa_assert(u);
643
644     pa_rtclock_get(&now);
645
646     pa_log_debug("Checking for dead streams ...");
647
648     for (s = u->sessions; s; s = n) {
649         int k;
650         n = s->next;
651
652         k = pa_atomic_load(&s->timestamp);
653
654         if (k + DEATH_TIMEOUT < now.tv_sec)
655             session_free(s);
656     }
657
658     /* Restart timer */
659     pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
660 }
661
662 int pa__init(pa_module*m) {
663     struct userdata *u;
664     pa_modargs *ma = NULL;
665     struct sockaddr_in sa4;
666 #ifdef HAVE_IPV6
667     struct sockaddr_in6 sa6;
668 #endif
669     struct sockaddr *sa;
670     socklen_t salen;
671     const char *sap_address;
672     int fd = -1;
673
674     pa_assert(m);
675
676     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
677         pa_log("failed to parse module arguments");
678         goto fail;
679     }
680
681     sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
682
683     if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
684         sa4.sin_family = AF_INET;
685         sa4.sin_port = htons(SAP_PORT);
686         sa = (struct sockaddr*) &sa4;
687         salen = sizeof(sa4);
688 #ifdef HAVE_IPV6
689     } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
690         sa6.sin6_family = AF_INET6;
691         sa6.sin6_port = htons(SAP_PORT);
692         sa = (struct sockaddr*) &sa6;
693         salen = sizeof(sa6);
694 #endif
695     } else {
696         pa_log("Invalid SAP address '%s'", sap_address);
697         goto fail;
698     }
699
700     if ((fd = mcast_socket(sa, salen)) < 0)
701         goto fail;
702
703     m->userdata = u = pa_xnew(struct userdata, 1);
704     u->module = m;
705     u->core = m->core;
706     u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
707
708     u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
709     pa_sap_context_init_recv(&u->sap_context, fd);
710
711     PA_LLIST_HEAD_INIT(struct session, u->sessions);
712     u->n_sessions = 0;
713     u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
714
715     u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
716
717     pa_modargs_free(ma);
718
719     return 0;
720
721 fail:
722     if (ma)
723         pa_modargs_free(ma);
724
725     if (fd >= 0)
726         pa_close(fd);
727
728     return -1;
729 }
730
731 void pa__done(pa_module*m) {
732     struct userdata *u;
733     struct session *s;
734
735     pa_assert(m);
736
737     if (!(u = m->userdata))
738         return;
739
740     if (u->sap_event)
741         m->core->mainloop->io_free(u->sap_event);
742
743     if (u->check_death_event)
744         m->core->mainloop->time_free(u->check_death_event);
745
746     pa_sap_context_destroy(&u->sap_context);
747
748     if (u->by_origin) {
749         while ((s = pa_hashmap_first(u->by_origin)))
750             session_free(s);
751
752         pa_hashmap_free(u->by_origin, NULL, NULL);
753     }
754
755     pa_xfree(u->sink_name);
756     pa_xfree(u);
757 }