Merge commit 'origin/master-tx'
[platform/upstream/pulseaudio.git] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3   This file is part of PulseAudio.
4
5   Copyright 2006 Lennart Poettering
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <poll.h>
35
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-util.h>
47 #include <pulsecore/modargs.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/sample-util.h>
50 #include <pulsecore/macro.h>
51 #include <pulsecore/atomic.h>
52 #include <pulsecore/rtclock.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/time-smoother.h>
55 #include <pulsecore/socket-util.h>
56 #include <pulsecore/once.h>
57
58 #include "module-rtp-recv-symdef.h"
59
60 #include "rtp.h"
61 #include "sdp.h"
62 #include "sap.h"
63
64 PA_MODULE_AUTHOR("Lennart Poettering");
65 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
66 PA_MODULE_VERSION(PACKAGE_VERSION);
67 PA_MODULE_LOAD_ONCE(FALSE);
68 PA_MODULE_USAGE(
69         "sink=<name of the sink> "
70         "sap_address=<multicast address to listen on> "
71 );
72
/* UDP port the SAP listening socket is bound to */
#define SAP_PORT 9875

/* Address used when the "sap_address" module argument is not given */
#define DEFAULT_SAP_ADDRESS "224.0.0.56"

/* Size limit (in bytes) handed to pa_memblockq_new() for each session queue */
#define MEMBLOCKQ_MAXLENGTH (1024*1024*40)

/* Upper bound on the number of sessions that may exist at the same time */
#define MAX_SESSIONS 16

/* Seconds without any sign of life after which a session is dropped; also
 * the period of the timer that checks for dead sessions */
#define DEATH_TIMEOUT 20

/* Minimum time between two adjustments of a session's sample rate */
#define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)

/* Initial value of a session's intended latency */
#define LATENCY_USEC (500*PA_USEC_PER_MSEC)
78 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
79 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
80
/* NULL-terminated list of the argument names pa_modargs_new() accepts for
 * this module. */
static const char* const valid_modargs[] = {
    "sink",
    "sap_address",
    NULL
};
86
87 struct session {
88     struct userdata *userdata;
89     PA_LLIST_FIELDS(struct session);
90
91     pa_sink_input *sink_input;
92     pa_memblockq *memblockq;
93
94     pa_bool_t first_packet;
95     uint32_t ssrc;
96     uint32_t offset;
97
98     struct pa_sdp_info sdp_info;
99
100     pa_rtp_context rtp_context;
101
102     pa_rtpoll_item *rtpoll_item;
103
104     pa_atomic_t timestamp;
105
106     pa_smoother *smoother;
107     pa_usec_t intended_latency;
108     pa_usec_t sink_latency;
109
110     pa_usec_t last_rate_update;
111 };
112
113 struct userdata {
114     pa_module *module;
115
116     pa_sap_context sap_context;
117     pa_io_event* sap_event;
118
119     pa_time_event *check_death_event;
120
121     char *sink_name;
122
123     PA_LLIST_HEAD(struct session, sessions);
124     pa_hashmap *by_origin;
125     int n_sessions;
126 };
127
128 static void session_free(struct session *s);
129
130 /* Called from I/O thread context */
131 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
132     struct session *s = PA_SINK_INPUT(o)->userdata;
133
134     switch (code) {
135         case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
136             *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
137
138             /* Fall through, the default handler will add in the extra
139              * latency added by the resampler */
140             break;
141     }
142
143     return pa_sink_input_process_msg(o, code, data, offset, chunk);
144 }
145
146 /* Called from I/O thread context */
147 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
148     struct session *s;
149     pa_sink_input_assert_ref(i);
150     pa_assert_se(s = i->userdata);
151
152     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
153         return -1;
154
155     pa_memblockq_drop(s->memblockq, chunk->length);
156
157     return 0;
158 }
159
160 /* Called from I/O thread context */
161 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
162     struct session *s;
163
164     pa_sink_input_assert_ref(i);
165     pa_assert_se(s = i->userdata);
166
167     pa_memblockq_rewind(s->memblockq, nbytes);
168 }
169
170 /* Called from I/O thread context */
171 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
172     struct session *s;
173
174     pa_sink_input_assert_ref(i);
175     pa_assert_se(s = i->userdata);
176
177     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
178 }
179
180 /* Called from main context */
181 static void sink_input_kill(pa_sink_input* i) {
182     struct session *s;
183     pa_sink_input_assert_ref(i);
184     pa_assert_se(s = i->userdata);
185
186     session_free(s);
187 }
188
189 /* Called from IO context */
190 static void sink_input_suspend_within_thread(pa_sink_input* i, pa_bool_t b) {
191     struct session *s;
192     pa_sink_input_assert_ref(i);
193     pa_assert_se(s = i->userdata);
194
195     if (b) {
196         pa_smoother_pause(s->smoother, pa_rtclock_usec());
197         pa_memblockq_flush_read(s->memblockq);
198     } else
199         s->first_packet = FALSE;
200 }
201
202 /* Called from I/O thread context */
203 static int rtpoll_work_cb(pa_rtpoll_item *i) {
204     pa_memchunk chunk;
205     int64_t k, j, delta;
206     struct timeval now = { 0, 0 };
207     struct session *s;
208     struct pollfd *p;
209
210     pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
211
212     p = pa_rtpoll_item_get_pollfd(i, NULL);
213
214     if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
215         pa_log("poll() signalled bad revents.");
216         return -1;
217     }
218
219     if ((p->revents & POLLIN) == 0)
220         return 0;
221
222     p->revents = 0;
223
224     if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
225         return 0;
226
227     if (s->sdp_info.payload != s->rtp_context.payload ||
228         !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
229         pa_memblock_unref(chunk.memblock);
230         return 0;
231     }
232
233     if (!s->first_packet) {
234         s->first_packet = TRUE;
235
236         s->ssrc = s->rtp_context.ssrc;
237         s->offset = s->rtp_context.timestamp;
238
239         if (s->ssrc == s->userdata->module->core->cookie)
240             pa_log_warn("Detected RTP packet loop!");
241     } else {
242         if (s->ssrc != s->rtp_context.ssrc) {
243             pa_memblock_unref(chunk.memblock);
244             return 0;
245         }
246     }
247
248     /* Check whether there was a timestamp overflow */
249     k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
250     j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
251
252     if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
253         delta = k;
254     else
255         delta = j;
256
257     pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
258
259     if (now.tv_sec == 0) {
260         PA_ONCE_BEGIN {
261             pa_log_warn("Using artificial time instead of timestamp");
262         } PA_ONCE_END;
263         pa_rtclock_get(&now);
264     } else
265         pa_rtclock_from_wallclock(&now);
266
267     pa_smoother_put(s->smoother, pa_timeval_load(&now), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec));
268
269     /* Tell the smoother that we are rolling now, in case it is still paused */
270     pa_smoother_resume(s->smoother, pa_timeval_load(&now), TRUE);
271
272     if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
273         pa_log_warn("Queue overrun");
274         pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
275     }
276
277 /*     pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
278
279     pa_memblock_unref(chunk.memblock);
280
281     /* The next timestamp we expect */
282     s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
283
284     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
285
286     if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
287         pa_usec_t wi, ri, render_delay, sink_delay = 0, latency, fix;
288         unsigned fix_samples;
289
290         pa_log_debug("Updating sample rate");
291
292         wi = pa_smoother_get(s->smoother, pa_timeval_load(&now));
293         ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
294
295         pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
296
297         sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
298         render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
299
300         if (ri > render_delay+sink_delay)
301             ri -= render_delay+sink_delay;
302         else
303             ri = 0;
304
305         if (wi < ri)
306             latency = 0;
307         else
308             latency = wi - ri;
309
310         pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double)  s->intended_latency/PA_USEC_PER_MSEC);
311
312         /* Calculate deviation */
313         if (latency < s->intended_latency)
314             fix = s->intended_latency - latency;
315         else
316             fix = latency - s->intended_latency;
317
318         /* How many samples is this per second? */
319         fix_samples = (unsigned) (fix * (pa_usec_t) s->sink_input->thread_info.sample_spec.rate / (pa_usec_t) RATE_UPDATE_INTERVAL);
320
321         /* Check if deviation is in bounds */
322         if (fix_samples > s->sink_input->sample_spec.rate*.50)
323             pa_log_debug("Hmmm, rate fix is too large (%lu Hz), not applying.", (unsigned long) fix_samples);
324         else {
325             /* Fix up rate */
326             if (latency < s->intended_latency)
327                 s->sink_input->sample_spec.rate -= fix_samples;
328             else
329                 s->sink_input->sample_spec.rate += fix_samples;
330
331             if (s->sink_input->sample_spec.rate > PA_RATE_MAX)
332                 s->sink_input->sample_spec.rate = PA_RATE_MAX;
333         }
334
335         pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
336
337         pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
338
339         pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
340
341         s->last_rate_update = pa_timeval_load(&now);
342     }
343
344     if (pa_memblockq_is_readable(s->memblockq) &&
345         s->sink_input->thread_info.underrun_for > 0) {
346         pa_log_debug("Requesting rewind due to end of underrun");
347         pa_sink_input_request_rewind(s->sink_input, 0, FALSE, TRUE, FALSE);
348     }
349
350     return 1;
351 }
352
353 /* Called from I/O thread context */
354 static void sink_input_attach(pa_sink_input *i) {
355     struct session *s;
356     struct pollfd *p;
357
358     pa_sink_input_assert_ref(i);
359     pa_assert_se(s = i->userdata);
360
361     pa_assert(!s->rtpoll_item);
362     s->rtpoll_item = pa_rtpoll_item_new(i->sink->rtpoll, PA_RTPOLL_LATE, 1);
363
364     p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
365     p->fd = s->rtp_context.fd;
366     p->events = POLLIN;
367     p->revents = 0;
368
369     pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
370     pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
371 }
372
373 /* Called from I/O thread context */
374 static void sink_input_detach(pa_sink_input *i) {
375     struct session *s;
376     pa_sink_input_assert_ref(i);
377     pa_assert_se(s = i->userdata);
378
379     pa_assert(s->rtpoll_item);
380     pa_rtpoll_item_free(s->rtpoll_item);
381     s->rtpoll_item = NULL;
382 }
383
384 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
385     int af, fd = -1, r, one;
386
387     pa_assert(sa);
388     pa_assert(salen > 0);
389
390     af = sa->sa_family;
391     if ((fd = socket(af, SOCK_DGRAM, 0)) < 0) {
392         pa_log("Failed to create socket: %s", pa_cstrerror(errno));
393         goto fail;
394     }
395
396     pa_make_udp_socket_low_delay(fd);
397
398     one = 1;
399     if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
400         pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
401         goto fail;
402     }
403
404     one = 1;
405     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
406         pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
407         goto fail;
408     }
409
410     if (af == AF_INET) {
411         struct ip_mreq mr4;
412         memset(&mr4, 0, sizeof(mr4));
413         mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
414         r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
415 #ifdef HAVE_IPV6
416     } else {
417         struct ipv6_mreq mr6;
418         memset(&mr6, 0, sizeof(mr6));
419         mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
420         r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
421 #endif
422     }
423
424     if (r < 0) {
425         pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
426         goto fail;
427     }
428
429     if (bind(fd, sa, salen) < 0) {
430         pa_log("bind() failed: %s", pa_cstrerror(errno));
431         goto fail;
432     }
433
434     return fd;
435
436 fail:
437     if (fd >= 0)
438         close(fd);
439
440     return -1;
441 }
442
443 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
444     struct session *s = NULL;
445     pa_sink *sink;
446     int fd = -1;
447     pa_memchunk silence;
448     pa_sink_input_new_data data;
449     struct timeval now;
450
451     pa_assert(u);
452     pa_assert(sdp_info);
453
454     if (u->n_sessions >= MAX_SESSIONS) {
455         pa_log("Session limit reached.");
456         goto fail;
457     }
458
459     if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
460         pa_log("Sink does not exist.");
461         goto fail;
462     }
463
464     pa_rtclock_get(&now);
465
466     s = pa_xnew0(struct session, 1);
467     s->userdata = u;
468     s->first_packet = FALSE;
469     s->sdp_info = *sdp_info;
470     s->rtpoll_item = NULL;
471     s->intended_latency = LATENCY_USEC;
472     s->smoother = pa_smoother_new(
473             PA_USEC_PER_SEC*5,
474             PA_USEC_PER_SEC*2,
475             TRUE,
476             TRUE,
477             10,
478             pa_timeval_load(&now),
479             TRUE);
480     s->last_rate_update = pa_timeval_load(&now);
481     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
482
483     if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
484         goto fail;
485
486     pa_sink_input_new_data_init(&data);
487     data.sink = sink;
488     data.driver = __FILE__;
489     pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
490     pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
491                      "RTP Stream%s%s%s",
492                      sdp_info->session_name ? " (" : "",
493                      sdp_info->session_name ? sdp_info->session_name : "",
494                      sdp_info->session_name ? ")" : "");
495
496     if (sdp_info->session_name)
497         pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
498     pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
499     pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
500     data.module = u->module;
501     pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
502
503     pa_sink_input_new(&s->sink_input, u->module->core, &data, PA_SINK_INPUT_VARIABLE_RATE);
504     pa_sink_input_new_data_done(&data);
505
506     if (!s->sink_input) {
507         pa_log("Failed to create sink input.");
508         goto fail;
509     }
510
511     s->sink_input->userdata = s;
512
513     s->sink_input->parent.process_msg = sink_input_process_msg;
514     s->sink_input->pop = sink_input_pop_cb;
515     s->sink_input->process_rewind = sink_input_process_rewind_cb;
516     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
517     s->sink_input->kill = sink_input_kill;
518     s->sink_input->attach = sink_input_attach;
519     s->sink_input->detach = sink_input_detach;
520     s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
521
522     pa_sink_input_get_silence(s->sink_input, &silence);
523
524     s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
525
526     if (s->intended_latency < s->sink_latency*2)
527         s->intended_latency = s->sink_latency*2;
528
529     s->memblockq = pa_memblockq_new(
530             0,
531             MEMBLOCKQ_MAXLENGTH,
532             MEMBLOCKQ_MAXLENGTH,
533             pa_frame_size(&s->sink_input->sample_spec),
534             pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
535             0,
536             0,
537             &silence);
538
539     pa_memblock_unref(silence.memblock);
540
541     pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
542
543     pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
544     u->n_sessions++;
545     PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
546
547     pa_sink_input_put(s->sink_input);
548
549     pa_log_info("New session '%s'", s->sdp_info.session_name);
550
551     return s;
552
553 fail:
554     pa_xfree(s);
555
556     if (fd >= 0)
557         pa_close(fd);
558
559     return NULL;
560 }
561
562 static void session_free(struct session *s) {
563     pa_assert(s);
564
565     pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
566
567     pa_sink_input_unlink(s->sink_input);
568     pa_sink_input_unref(s->sink_input);
569
570     PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
571     pa_assert(s->userdata->n_sessions >= 1);
572     s->userdata->n_sessions--;
573     pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
574
575     pa_memblockq_free(s->memblockq);
576     pa_sdp_info_destroy(&s->sdp_info);
577     pa_rtp_context_destroy(&s->rtp_context);
578
579     pa_smoother_free(s->smoother);
580
581     pa_xfree(s);
582 }
583
584 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
585     struct userdata *u = userdata;
586     pa_bool_t goodbye = FALSE;
587     pa_sdp_info info;
588     struct session *s;
589
590     pa_assert(m);
591     pa_assert(e);
592     pa_assert(u);
593     pa_assert(fd == u->sap_context.fd);
594     pa_assert(flags == PA_IO_EVENT_INPUT);
595
596     if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
597         return;
598
599     if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
600         return;
601
602     if (goodbye) {
603
604         if ((s = pa_hashmap_get(u->by_origin, info.origin)))
605             session_free(s);
606
607         pa_sdp_info_destroy(&info);
608     } else {
609
610         if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
611             if (!session_new(u, &info))
612                 pa_sdp_info_destroy(&info);
613
614         } else {
615             struct timeval now;
616             pa_rtclock_get(&now);
617             pa_atomic_store(&s->timestamp, (int) now.tv_sec);
618
619             pa_sdp_info_destroy(&info);
620         }
621     }
622 }
623
624 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *ptv, void *userdata) {
625     struct session *s, *n;
626     struct userdata *u = userdata;
627     struct timeval now;
628     struct timeval tv;
629
630     pa_assert(m);
631     pa_assert(t);
632     pa_assert(ptv);
633     pa_assert(u);
634
635     pa_rtclock_get(&now);
636
637     pa_log_debug("Checking for dead streams ...");
638
639     for (s = u->sessions; s; s = n) {
640         int k;
641         n = s->next;
642
643         k = pa_atomic_load(&s->timestamp);
644
645         if (k + DEATH_TIMEOUT < now.tv_sec)
646             session_free(s);
647     }
648
649     /* Restart timer */
650     pa_gettimeofday(&tv);
651     pa_timeval_add(&tv, DEATH_TIMEOUT*PA_USEC_PER_SEC);
652     m->time_restart(t, &tv);
653 }
654
655 int pa__init(pa_module*m) {
656     struct userdata *u;
657     pa_modargs *ma = NULL;
658     struct sockaddr_in sa4;
659 #ifdef HAVE_IPV6
660     struct sockaddr_in6 sa6;
661 #endif
662     struct sockaddr *sa;
663     socklen_t salen;
664     const char *sap_address;
665     int fd = -1;
666     struct timeval tv;
667
668     pa_assert(m);
669
670     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
671         pa_log("failed to parse module arguments");
672         goto fail;
673     }
674
675     sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
676
677     if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
678         sa4.sin_family = AF_INET;
679         sa4.sin_port = htons(SAP_PORT);
680         sa = (struct sockaddr*) &sa4;
681         salen = sizeof(sa4);
682 #ifdef HAVE_IPV6
683     } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
684         sa6.sin6_family = AF_INET6;
685         sa6.sin6_port = htons(SAP_PORT);
686         sa = (struct sockaddr*) &sa6;
687         salen = sizeof(sa6);
688 #endif
689     } else {
690         pa_log("Invalid SAP address '%s'", sap_address);
691         goto fail;
692     }
693
694     if ((fd = mcast_socket(sa, salen)) < 0)
695         goto fail;
696
697     m->userdata = u = pa_xnew(struct userdata, 1);
698     u->module = m;
699     u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
700
701     u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
702     pa_sap_context_init_recv(&u->sap_context, fd);
703
704     PA_LLIST_HEAD_INIT(struct session, u->sessions);
705     u->n_sessions = 0;
706     u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
707
708     pa_gettimeofday(&tv);
709     pa_timeval_add(&tv, DEATH_TIMEOUT * PA_USEC_PER_SEC);
710     u->check_death_event = m->core->mainloop->time_new(m->core->mainloop, &tv, check_death_event_cb, u);
711
712     pa_modargs_free(ma);
713
714     return 0;
715
716 fail:
717     if (ma)
718         pa_modargs_free(ma);
719
720     if (fd >= 0)
721         pa_close(fd);
722
723     return -1;
724 }
725
726 void pa__done(pa_module*m) {
727     struct userdata *u;
728     struct session *s;
729
730     pa_assert(m);
731
732     if (!(u = m->userdata))
733         return;
734
735     if (u->sap_event)
736         m->core->mainloop->io_free(u->sap_event);
737
738     if (u->check_death_event)
739         m->core->mainloop->time_free(u->check_death_event);
740
741     pa_sap_context_destroy(&u->sap_context);
742
743     if (u->by_origin) {
744         while ((s = pa_hashmap_first(u->by_origin)))
745             session_free(s);
746
747         pa_hashmap_free(u->by_origin, NULL, NULL);
748     }
749
750     pa_xfree(u->sink_name);
751     pa_xfree(u);
752 }