Git init
[framework/multimedia/pulseaudio.git] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3   This file is part of PulseAudio.
4
5   Copyright 2006 Lennart Poettering
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <poll.h>
35
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/core-error.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/llist.h>
43 #include <pulsecore/sink.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/memblockq.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-rtclock.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/namereg.h>
51 #include <pulsecore/sample-util.h>
52 #include <pulsecore/macro.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/atomic.h>
55 #include <pulsecore/time-smoother.h>
56 #include <pulsecore/socket-util.h>
57 #include <pulsecore/once.h>
58
59 #include "module-rtp-recv-symdef.h"
60
61 #include "rtp.h"
62 #include "sdp.h"
63 #include "sap.h"
64
65 PA_MODULE_AUTHOR("Lennart Poettering");
66 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
67 PA_MODULE_VERSION(PACKAGE_VERSION);
68 PA_MODULE_LOAD_ONCE(FALSE);
69 PA_MODULE_USAGE(
70         "sink=<name of the sink> "
71         "sap_address=<multicast address to listen on> "
72 );
73
/* UDP port the SAP listening socket is bound to in pa__init(). */
#define SAP_PORT 9875

/* Address used when the "sap_address" module argument is not given. */
#define DEFAULT_SAP_ADDRESS "224.0.0.56"

/* Passed twice to pa_memblockq_new() when a session queue is created
 * (presumably a byte count -- confirm against the memblockq API). */
#define MEMBLOCKQ_MAXLENGTH (1024*1024*40)

/* session_new() refuses to create more sessions than this. */
#define MAX_SESSIONS 16

/* Seconds without activity after which check_death_event_cb() frees a
 * session; also the period of that check. */
#define DEATH_TIMEOUT 20

/* Minimum time, in usec, between two sample rate adjustments. */
#define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)

/* Initial value of a session's intended latency, in usec. */
#define LATENCY_USEC (500*PA_USEC_PER_MSEC)

/* Module arguments accepted by pa__init(); NULL-terminated. */
static const char* const valid_modargs[] = {
    "sink",
    "sap_address",
    NULL
};
87
88 struct session {
89     struct userdata *userdata;
90     PA_LLIST_FIELDS(struct session);
91
92     pa_sink_input *sink_input;
93     pa_memblockq *memblockq;
94
95     pa_bool_t first_packet;
96     uint32_t ssrc;
97     uint32_t offset;
98
99     struct pa_sdp_info sdp_info;
100
101     pa_rtp_context rtp_context;
102
103     pa_rtpoll_item *rtpoll_item;
104
105     pa_atomic_t timestamp;
106
107     pa_smoother *smoother;
108     pa_usec_t intended_latency;
109     pa_usec_t sink_latency;
110
111     pa_usec_t last_rate_update;
112 };
113
114 struct userdata {
115     pa_module *module;
116     pa_core *core;
117
118     pa_sap_context sap_context;
119     pa_io_event* sap_event;
120
121     pa_time_event *check_death_event;
122
123     char *sink_name;
124
125     PA_LLIST_HEAD(struct session, sessions);
126     pa_hashmap *by_origin;
127     int n_sessions;
128 };
129
130 static void session_free(struct session *s);
131
132 /* Called from I/O thread context */
133 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
134     struct session *s = PA_SINK_INPUT(o)->userdata;
135
136     switch (code) {
137         case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
138             *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
139
140             /* Fall through, the default handler will add in the extra
141              * latency added by the resampler */
142             break;
143     }
144
145     return pa_sink_input_process_msg(o, code, data, offset, chunk);
146 }
147
148 /* Called from I/O thread context */
149 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
150     struct session *s;
151     pa_sink_input_assert_ref(i);
152     pa_assert_se(s = i->userdata);
153
154     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
155         return -1;
156
157     pa_memblockq_drop(s->memblockq, chunk->length);
158
159     return 0;
160 }
161
162 /* Called from I/O thread context */
163 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
164     struct session *s;
165
166     pa_sink_input_assert_ref(i);
167     pa_assert_se(s = i->userdata);
168
169     pa_memblockq_rewind(s->memblockq, nbytes);
170 }
171
172 /* Called from I/O thread context */
173 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
174     struct session *s;
175
176     pa_sink_input_assert_ref(i);
177     pa_assert_se(s = i->userdata);
178
179     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
180 }
181
182 /* Called from main context */
183 static void sink_input_kill(pa_sink_input* i) {
184     struct session *s;
185     pa_sink_input_assert_ref(i);
186     pa_assert_se(s = i->userdata);
187
188     session_free(s);
189 }
190
191 /* Called from IO context */
192 static void sink_input_suspend_within_thread(pa_sink_input* i, pa_bool_t b) {
193     struct session *s;
194     pa_sink_input_assert_ref(i);
195     pa_assert_se(s = i->userdata);
196
197     if (b) {
198         pa_smoother_pause(s->smoother, pa_rtclock_now());
199         pa_memblockq_flush_read(s->memblockq);
200     } else
201         s->first_packet = FALSE;
202 }
203
204 /* Called from I/O thread context */
205 static int rtpoll_work_cb(pa_rtpoll_item *i) {
206     pa_memchunk chunk;
207     int64_t k, j, delta;
208     struct timeval now = { 0, 0 };
209     struct session *s;
210     struct pollfd *p;
211
212     pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
213
214     p = pa_rtpoll_item_get_pollfd(i, NULL);
215
216     if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
217         pa_log("poll() signalled bad revents.");
218         return -1;
219     }
220
221     if ((p->revents & POLLIN) == 0)
222         return 0;
223
224     p->revents = 0;
225
226     if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
227         return 0;
228
229     if (s->sdp_info.payload != s->rtp_context.payload ||
230         !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
231         pa_memblock_unref(chunk.memblock);
232         return 0;
233     }
234
235     if (!s->first_packet) {
236         s->first_packet = TRUE;
237
238         s->ssrc = s->rtp_context.ssrc;
239         s->offset = s->rtp_context.timestamp;
240
241         if (s->ssrc == s->userdata->module->core->cookie)
242             pa_log_warn("Detected RTP packet loop!");
243     } else {
244         if (s->ssrc != s->rtp_context.ssrc) {
245             pa_memblock_unref(chunk.memblock);
246             return 0;
247         }
248     }
249
250     /* Check whether there was a timestamp overflow */
251     k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
252     j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
253
254     if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
255         delta = k;
256     else
257         delta = j;
258
259     pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
260
261     if (now.tv_sec == 0) {
262         PA_ONCE_BEGIN {
263             pa_log_warn("Using artificial time instead of timestamp");
264         } PA_ONCE_END;
265         pa_rtclock_get(&now);
266     } else
267         pa_rtclock_from_wallclock(&now);
268
269     pa_smoother_put(s->smoother, pa_timeval_load(&now), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec));
270
271     /* Tell the smoother that we are rolling now, in case it is still paused */
272     pa_smoother_resume(s->smoother, pa_timeval_load(&now), TRUE);
273
274     if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
275         pa_log_warn("Queue overrun");
276         pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
277     }
278
279 /*     pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
280
281     pa_memblock_unref(chunk.memblock);
282
283     /* The next timestamp we expect */
284     s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
285
286     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
287
288     if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
289         pa_usec_t wi, ri, render_delay, sink_delay = 0, latency, fix;
290         unsigned fix_samples;
291
292         pa_log_debug("Updating sample rate");
293
294         wi = pa_smoother_get(s->smoother, pa_timeval_load(&now));
295         ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
296
297         pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
298
299         sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
300         render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
301
302         if (ri > render_delay+sink_delay)
303             ri -= render_delay+sink_delay;
304         else
305             ri = 0;
306
307         if (wi < ri)
308             latency = 0;
309         else
310             latency = wi - ri;
311
312         pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double)  s->intended_latency/PA_USEC_PER_MSEC);
313
314         /* Calculate deviation */
315         if (latency < s->intended_latency)
316             fix = s->intended_latency - latency;
317         else
318             fix = latency - s->intended_latency;
319
320         /* How many samples is this per second? */
321         fix_samples = (unsigned) (fix * (pa_usec_t) s->sink_input->thread_info.sample_spec.rate / (pa_usec_t) RATE_UPDATE_INTERVAL);
322
323         /* Check if deviation is in bounds */
324         if (fix_samples > s->sink_input->sample_spec.rate*.50)
325             pa_log_debug("Hmmm, rate fix is too large (%lu Hz), not applying.", (unsigned long) fix_samples);
326         else {
327             /* Fix up rate */
328             if (latency < s->intended_latency)
329                 s->sink_input->sample_spec.rate -= fix_samples;
330             else
331                 s->sink_input->sample_spec.rate += fix_samples;
332
333             if (s->sink_input->sample_spec.rate > PA_RATE_MAX)
334                 s->sink_input->sample_spec.rate = PA_RATE_MAX;
335         }
336
337         pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
338
339         pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
340
341         pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
342
343         s->last_rate_update = pa_timeval_load(&now);
344     }
345
346     if (pa_memblockq_is_readable(s->memblockq) &&
347         s->sink_input->thread_info.underrun_for > 0) {
348         pa_log_debug("Requesting rewind due to end of underrun");
349         pa_sink_input_request_rewind(s->sink_input, 0, FALSE, TRUE, FALSE);
350     }
351
352     return 1;
353 }
354
355 /* Called from I/O thread context */
356 static void sink_input_attach(pa_sink_input *i) {
357     struct session *s;
358     struct pollfd *p;
359
360     pa_sink_input_assert_ref(i);
361     pa_assert_se(s = i->userdata);
362
363     pa_assert(!s->rtpoll_item);
364     s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
365
366     p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
367     p->fd = s->rtp_context.fd;
368     p->events = POLLIN;
369     p->revents = 0;
370
371     pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
372     pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
373 }
374
375 /* Called from I/O thread context */
376 static void sink_input_detach(pa_sink_input *i) {
377     struct session *s;
378     pa_sink_input_assert_ref(i);
379     pa_assert_se(s = i->userdata);
380
381     pa_assert(s->rtpoll_item);
382     pa_rtpoll_item_free(s->rtpoll_item);
383     s->rtpoll_item = NULL;
384 }
385
386 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
387     int af, fd = -1, r, one;
388
389     pa_assert(sa);
390     pa_assert(salen > 0);
391
392     af = sa->sa_family;
393     if ((fd = socket(af, SOCK_DGRAM, 0)) < 0) {
394         pa_log("Failed to create socket: %s", pa_cstrerror(errno));
395         goto fail;
396     }
397
398     pa_make_udp_socket_low_delay(fd);
399
400     one = 1;
401     if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
402         pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
403         goto fail;
404     }
405
406     one = 1;
407     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
408         pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
409         goto fail;
410     }
411
412     if (af == AF_INET) {
413         struct ip_mreq mr4;
414         memset(&mr4, 0, sizeof(mr4));
415         mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
416         r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
417 #ifdef HAVE_IPV6
418     } else {
419         struct ipv6_mreq mr6;
420         memset(&mr6, 0, sizeof(mr6));
421         mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
422         r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
423 #endif
424     }
425
426     if (r < 0) {
427         pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
428         goto fail;
429     }
430
431     if (bind(fd, sa, salen) < 0) {
432         pa_log("bind() failed: %s", pa_cstrerror(errno));
433         goto fail;
434     }
435
436     return fd;
437
438 fail:
439     if (fd >= 0)
440         close(fd);
441
442     return -1;
443 }
444
445 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
446     struct session *s = NULL;
447     pa_sink *sink;
448     int fd = -1;
449     pa_memchunk silence;
450     pa_sink_input_new_data data;
451     struct timeval now;
452
453     pa_assert(u);
454     pa_assert(sdp_info);
455
456     if (u->n_sessions >= MAX_SESSIONS) {
457         pa_log("Session limit reached.");
458         goto fail;
459     }
460
461     if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
462         pa_log("Sink does not exist.");
463         goto fail;
464     }
465
466     pa_rtclock_get(&now);
467
468     s = pa_xnew0(struct session, 1);
469     s->userdata = u;
470     s->first_packet = FALSE;
471     s->sdp_info = *sdp_info;
472     s->rtpoll_item = NULL;
473     s->intended_latency = LATENCY_USEC;
474     s->smoother = pa_smoother_new(
475             PA_USEC_PER_SEC*5,
476             PA_USEC_PER_SEC*2,
477             TRUE,
478             TRUE,
479             10,
480             pa_timeval_load(&now),
481             TRUE);
482     s->last_rate_update = pa_timeval_load(&now);
483     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
484
485     if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
486         goto fail;
487
488     pa_sink_input_new_data_init(&data);
489     data.sink = sink;
490     data.driver = __FILE__;
491     pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
492     pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
493                      "RTP Stream%s%s%s",
494                      sdp_info->session_name ? " (" : "",
495                      sdp_info->session_name ? sdp_info->session_name : "",
496                      sdp_info->session_name ? ")" : "");
497
498     if (sdp_info->session_name)
499         pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
500     pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
501     pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
502     data.module = u->module;
503     pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
504     data.flags = PA_SINK_INPUT_VARIABLE_RATE;
505
506     pa_sink_input_new(&s->sink_input, u->module->core, &data);
507     pa_sink_input_new_data_done(&data);
508
509     if (!s->sink_input) {
510         pa_log("Failed to create sink input.");
511         goto fail;
512     }
513
514     s->sink_input->userdata = s;
515
516     s->sink_input->parent.process_msg = sink_input_process_msg;
517     s->sink_input->pop = sink_input_pop_cb;
518     s->sink_input->process_rewind = sink_input_process_rewind_cb;
519     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
520     s->sink_input->kill = sink_input_kill;
521     s->sink_input->attach = sink_input_attach;
522     s->sink_input->detach = sink_input_detach;
523     s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
524
525     pa_sink_input_get_silence(s->sink_input, &silence);
526
527     s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
528
529     if (s->intended_latency < s->sink_latency*2)
530         s->intended_latency = s->sink_latency*2;
531
532     s->memblockq = pa_memblockq_new(
533             0,
534             MEMBLOCKQ_MAXLENGTH,
535             MEMBLOCKQ_MAXLENGTH,
536             pa_frame_size(&s->sink_input->sample_spec),
537             pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
538             0,
539             0,
540             &silence);
541
542     pa_memblock_unref(silence.memblock);
543
544     pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
545
546     pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
547     u->n_sessions++;
548     PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
549
550     pa_sink_input_put(s->sink_input);
551
552     pa_log_info("New session '%s'", s->sdp_info.session_name);
553
554     return s;
555
556 fail:
557     pa_xfree(s);
558
559     if (fd >= 0)
560         pa_close(fd);
561
562     return NULL;
563 }
564
565 static void session_free(struct session *s) {
566     pa_assert(s);
567
568     pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
569
570     pa_sink_input_unlink(s->sink_input);
571     pa_sink_input_unref(s->sink_input);
572
573     PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
574     pa_assert(s->userdata->n_sessions >= 1);
575     s->userdata->n_sessions--;
576     pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
577
578     pa_memblockq_free(s->memblockq);
579     pa_sdp_info_destroy(&s->sdp_info);
580     pa_rtp_context_destroy(&s->rtp_context);
581
582     pa_smoother_free(s->smoother);
583
584     pa_xfree(s);
585 }
586
587 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
588     struct userdata *u = userdata;
589     pa_bool_t goodbye = FALSE;
590     pa_sdp_info info;
591     struct session *s;
592
593     pa_assert(m);
594     pa_assert(e);
595     pa_assert(u);
596     pa_assert(fd == u->sap_context.fd);
597     pa_assert(flags == PA_IO_EVENT_INPUT);
598
599     if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
600         return;
601
602     if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
603         return;
604
605     if (goodbye) {
606
607         if ((s = pa_hashmap_get(u->by_origin, info.origin)))
608             session_free(s);
609
610         pa_sdp_info_destroy(&info);
611     } else {
612
613         if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
614             if (!session_new(u, &info))
615                 pa_sdp_info_destroy(&info);
616
617         } else {
618             struct timeval now;
619             pa_rtclock_get(&now);
620             pa_atomic_store(&s->timestamp, (int) now.tv_sec);
621
622             pa_sdp_info_destroy(&info);
623         }
624     }
625 }
626
627 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
628     struct session *s, *n;
629     struct userdata *u = userdata;
630     struct timeval now;
631
632     pa_assert(m);
633     pa_assert(t);
634     pa_assert(u);
635
636     pa_rtclock_get(&now);
637
638     pa_log_debug("Checking for dead streams ...");
639
640     for (s = u->sessions; s; s = n) {
641         int k;
642         n = s->next;
643
644         k = pa_atomic_load(&s->timestamp);
645
646         if (k + DEATH_TIMEOUT < now.tv_sec)
647             session_free(s);
648     }
649
650     /* Restart timer */
651     pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
652 }
653
654 int pa__init(pa_module*m) {
655     struct userdata *u;
656     pa_modargs *ma = NULL;
657     struct sockaddr_in sa4;
658 #ifdef HAVE_IPV6
659     struct sockaddr_in6 sa6;
660 #endif
661     struct sockaddr *sa;
662     socklen_t salen;
663     const char *sap_address;
664     int fd = -1;
665
666     pa_assert(m);
667
668     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
669         pa_log("failed to parse module arguments");
670         goto fail;
671     }
672
673     sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
674
675     if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
676         sa4.sin_family = AF_INET;
677         sa4.sin_port = htons(SAP_PORT);
678         sa = (struct sockaddr*) &sa4;
679         salen = sizeof(sa4);
680 #ifdef HAVE_IPV6
681     } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
682         sa6.sin6_family = AF_INET6;
683         sa6.sin6_port = htons(SAP_PORT);
684         sa = (struct sockaddr*) &sa6;
685         salen = sizeof(sa6);
686 #endif
687     } else {
688         pa_log("Invalid SAP address '%s'", sap_address);
689         goto fail;
690     }
691
692     if ((fd = mcast_socket(sa, salen)) < 0)
693         goto fail;
694
695     m->userdata = u = pa_xnew(struct userdata, 1);
696     u->module = m;
697     u->core = m->core;
698     u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
699
700     u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
701     pa_sap_context_init_recv(&u->sap_context, fd);
702
703     PA_LLIST_HEAD_INIT(struct session, u->sessions);
704     u->n_sessions = 0;
705     u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
706
707     u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
708
709     pa_modargs_free(ma);
710
711     return 0;
712
713 fail:
714     if (ma)
715         pa_modargs_free(ma);
716
717     if (fd >= 0)
718         pa_close(fd);
719
720     return -1;
721 }
722
723 void pa__done(pa_module*m) {
724     struct userdata *u;
725     struct session *s;
726
727     pa_assert(m);
728
729     if (!(u = m->userdata))
730         return;
731
732     if (u->sap_event)
733         m->core->mainloop->io_free(u->sap_event);
734
735     if (u->check_death_event)
736         m->core->mainloop->time_free(u->check_death_event);
737
738     pa_sap_context_destroy(&u->sap_context);
739
740     if (u->by_origin) {
741         while ((s = pa_hashmap_first(u->by_origin)))
742             session_free(s);
743
744         pa_hashmap_free(u->by_origin, NULL, NULL);
745     }
746
747     pa_xfree(u->sink_name);
748     pa_xfree(u);
749 }