module-rtp-recv: Average the estimated real sample rate
[profile/ivi/pulseaudio-panda.git] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3   This file is part of PulseAudio.
4
5   Copyright 2006 Lennart Poettering
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <poll.h>
35
36 #include <pulse/rtclock.h>
37 #include <pulse/timeval.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/core-error.h>
41 #include <pulsecore/module.h>
42 #include <pulsecore/llist.h>
43 #include <pulsecore/sink.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/memblockq.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-rtclock.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/namereg.h>
51 #include <pulsecore/sample-util.h>
52 #include <pulsecore/macro.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/atomic.h>
55 #include <pulsecore/time-smoother.h>
56 #include <pulsecore/socket-util.h>
57 #include <pulsecore/once.h>
58
59 #include "module-rtp-recv-symdef.h"
60
61 #include "rtp.h"
62 #include "sdp.h"
63 #include "sap.h"
64
65 PA_MODULE_AUTHOR("Lennart Poettering");
66 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
67 PA_MODULE_VERSION(PACKAGE_VERSION);
68 PA_MODULE_LOAD_ONCE(FALSE);
69 PA_MODULE_USAGE(
70         "sink=<name of the sink> "
71         "sap_address=<multicast address to listen on> "
72 );
73
74 #define SAP_PORT 9875
75 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
76 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
77 #define MAX_SESSIONS 16
78 #define DEATH_TIMEOUT 20
79 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
80 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
81
82 static const char* const valid_modargs[] = {
83     "sink",
84     "sap_address",
85     NULL
86 };
87
88 struct session {
89     struct userdata *userdata;
90     PA_LLIST_FIELDS(struct session);
91
92     pa_sink_input *sink_input;
93     pa_memblockq *memblockq;
94
95     pa_bool_t first_packet;
96     uint32_t ssrc;
97     uint32_t offset;
98
99     struct pa_sdp_info sdp_info;
100
101     pa_rtp_context rtp_context;
102
103     pa_rtpoll_item *rtpoll_item;
104
105     pa_atomic_t timestamp;
106
107     pa_smoother *smoother;
108     pa_usec_t intended_latency;
109     pa_usec_t sink_latency;
110
111     pa_usec_t last_rate_update;
112     pa_usec_t last_latency;
113     double estimated_rate;
114     double avg_estimated_rate;
115 };
116
117 struct userdata {
118     pa_module *module;
119     pa_core *core;
120
121     pa_sap_context sap_context;
122     pa_io_event* sap_event;
123
124     pa_time_event *check_death_event;
125
126     char *sink_name;
127
128     PA_LLIST_HEAD(struct session, sessions);
129     pa_hashmap *by_origin;
130     int n_sessions;
131 };
132
133 static void session_free(struct session *s);
134
135 /* Called from I/O thread context */
136 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
137     struct session *s = PA_SINK_INPUT(o)->userdata;
138
139     switch (code) {
140         case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
141             *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
142
143             /* Fall through, the default handler will add in the extra
144              * latency added by the resampler */
145             break;
146     }
147
148     return pa_sink_input_process_msg(o, code, data, offset, chunk);
149 }
150
151 /* Called from I/O thread context */
152 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
153     struct session *s;
154     pa_sink_input_assert_ref(i);
155     pa_assert_se(s = i->userdata);
156
157     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
158         return -1;
159
160     pa_memblockq_drop(s->memblockq, chunk->length);
161
162     return 0;
163 }
164
165 /* Called from I/O thread context */
166 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
167     struct session *s;
168
169     pa_sink_input_assert_ref(i);
170     pa_assert_se(s = i->userdata);
171
172     pa_memblockq_rewind(s->memblockq, nbytes);
173 }
174
175 /* Called from I/O thread context */
176 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
177     struct session *s;
178
179     pa_sink_input_assert_ref(i);
180     pa_assert_se(s = i->userdata);
181
182     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
183 }
184
185 /* Called from main context */
186 static void sink_input_kill(pa_sink_input* i) {
187     struct session *s;
188     pa_sink_input_assert_ref(i);
189     pa_assert_se(s = i->userdata);
190
191     session_free(s);
192 }
193
194 /* Called from IO context */
195 static void sink_input_suspend_within_thread(pa_sink_input* i, pa_bool_t b) {
196     struct session *s;
197     pa_sink_input_assert_ref(i);
198     pa_assert_se(s = i->userdata);
199
200     if (b) {
201         pa_smoother_pause(s->smoother, pa_rtclock_now());
202         pa_memblockq_flush_read(s->memblockq);
203     } else
204         s->first_packet = FALSE;
205 }
206
207 /* Called from I/O thread context */
208 static int rtpoll_work_cb(pa_rtpoll_item *i) {
209     pa_memchunk chunk;
210     int64_t k, j, delta;
211     struct timeval now = { 0, 0 };
212     struct session *s;
213     struct pollfd *p;
214
215     pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
216
217     p = pa_rtpoll_item_get_pollfd(i, NULL);
218
219     if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
220         pa_log("poll() signalled bad revents.");
221         return -1;
222     }
223
224     if ((p->revents & POLLIN) == 0)
225         return 0;
226
227     p->revents = 0;
228
229     if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
230         return 0;
231
232     if (s->sdp_info.payload != s->rtp_context.payload ||
233         !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
234         pa_memblock_unref(chunk.memblock);
235         return 0;
236     }
237
238     if (!s->first_packet) {
239         s->first_packet = TRUE;
240
241         s->ssrc = s->rtp_context.ssrc;
242         s->offset = s->rtp_context.timestamp;
243
244         if (s->ssrc == s->userdata->module->core->cookie)
245             pa_log_warn("Detected RTP packet loop!");
246     } else {
247         if (s->ssrc != s->rtp_context.ssrc) {
248             pa_memblock_unref(chunk.memblock);
249             return 0;
250         }
251     }
252
253     /* Check whether there was a timestamp overflow */
254     k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
255     j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
256
257     if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
258         delta = k;
259     else
260         delta = j;
261
262     pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
263
264     if (now.tv_sec == 0) {
265         PA_ONCE_BEGIN {
266             pa_log_warn("Using artificial time instead of timestamp");
267         } PA_ONCE_END;
268         pa_rtclock_get(&now);
269     } else
270         pa_rtclock_from_wallclock(&now);
271
272     pa_smoother_put(s->smoother, pa_timeval_load(&now), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec));
273
274     /* Tell the smoother that we are rolling now, in case it is still paused */
275     pa_smoother_resume(s->smoother, pa_timeval_load(&now), TRUE);
276
277     if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
278         pa_log_warn("Queue overrun");
279         pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
280     }
281
282 /*     pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
283
284     pa_memblock_unref(chunk.memblock);
285
286     /* The next timestamp we expect */
287     s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
288
289     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
290
291     if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
292         pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
293         uint32_t base_rate = s->sink_input->sink->sample_spec.rate;
294         uint32_t current_rate = s->sink_input->sample_spec.rate;
295         uint32_t new_rate;
296         double estimated_rate, alpha = 0.02;
297
298         pa_log_debug("Updating sample rate");
299
300         wi = pa_smoother_get(s->smoother, pa_timeval_load(&now));
301         ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
302
303         pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
304
305         sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
306         render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
307
308         if (ri > render_delay+sink_delay)
309             ri -= render_delay+sink_delay;
310         else
311             ri = 0;
312
313         if (wi < ri)
314             latency = 0;
315         else
316             latency = wi - ri;
317
318         pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
319
320         /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
321          * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
322          * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
323          *                                           T
324          *                                 R̂ = ─────────────── Rⁿ .                             (1)
325          *                                     T - (Lⁿ - Lⁿ⁻ⁱ)
326          *
327          * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
328          * is correct).  But there is also the requirement to keep the buffer at a predefined target
329          * latency L̂.  So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
330          * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
331          * aT the latency is reduced from Lⁿ to L̂.  This strategy translates to the requirements
332          *            ₐ      R̂ - Rⁿ⁺ʲ                            a-j+1         j-1
333          *            Σ  T ────────── = L̂ - Lⁿ    with    Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
334          *           ʲ⁼ⁱ        R̂                                  a            a
335          * Solving for Rⁿ⁺ⁱ gives
336          *                                     T - ²∕ₐ₊₁(L̂ - Lⁿ)
337          *                              Rⁿ⁺ⁱ = ───────────────── R̂ .                            (2)
338          *                                            T
339          * In the code below a = 7 is used.
340          *
341          * Equation (1) is not directly used in (2), but instead an exponentially weighted average
342          * of the estimated rate R̂ is used.  This average R̅ is defined as
343          *                                R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
344          * Because it is difficult to find a fixed value for the coefficient α such that the
345          * averaging is without significant lag but oscillations are filtered out, a heuristic is
346          * used.  When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
347          * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
348          */
349         estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
350         if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
351           double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
352           alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
353         }
354         s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
355         s->estimated_rate = estimated_rate;
356         pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz  (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
357         new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
358         s->last_latency = latency;
359
360         if (new_rate < (uint32_t) (base_rate*0.8) || new_rate > (uint32_t) (base_rate*1.25)) {
361             pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate, new_rate);
362             new_rate = base_rate;
363         } else {
364             if (base_rate < new_rate + 20 && new_rate < base_rate + 20)
365               new_rate = base_rate;
366             /* Do the adjustment in small steps; 2‰ can be considered inaudible */
367             if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
368                 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
369                 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
370             }
371         }
372         s->sink_input->sample_spec.rate = new_rate;
373
374         pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
375
376         pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
377
378         pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
379
380         s->last_rate_update = pa_timeval_load(&now);
381     }
382
383     if (pa_memblockq_is_readable(s->memblockq) &&
384         s->sink_input->thread_info.underrun_for > 0) {
385         pa_log_debug("Requesting rewind due to end of underrun");
386         pa_sink_input_request_rewind(s->sink_input, 0, FALSE, TRUE, FALSE);
387     }
388
389     return 1;
390 }
391
392 /* Called from I/O thread context */
393 static void sink_input_attach(pa_sink_input *i) {
394     struct session *s;
395     struct pollfd *p;
396
397     pa_sink_input_assert_ref(i);
398     pa_assert_se(s = i->userdata);
399
400     pa_assert(!s->rtpoll_item);
401     s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
402
403     p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
404     p->fd = s->rtp_context.fd;
405     p->events = POLLIN;
406     p->revents = 0;
407
408     pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
409     pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
410 }
411
412 /* Called from I/O thread context */
413 static void sink_input_detach(pa_sink_input *i) {
414     struct session *s;
415     pa_sink_input_assert_ref(i);
416     pa_assert_se(s = i->userdata);
417
418     pa_assert(s->rtpoll_item);
419     pa_rtpoll_item_free(s->rtpoll_item);
420     s->rtpoll_item = NULL;
421 }
422
423 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
424     int af, fd = -1, r, one;
425
426     pa_assert(sa);
427     pa_assert(salen > 0);
428
429     af = sa->sa_family;
430     if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
431         pa_log("Failed to create socket: %s", pa_cstrerror(errno));
432         goto fail;
433     }
434
435     pa_make_udp_socket_low_delay(fd);
436
437     one = 1;
438     if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
439         pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
440         goto fail;
441     }
442
443     one = 1;
444     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
445         pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
446         goto fail;
447     }
448
449     if (af == AF_INET) {
450         struct ip_mreq mr4;
451         memset(&mr4, 0, sizeof(mr4));
452         mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
453         r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
454 #ifdef HAVE_IPV6
455     } else {
456         struct ipv6_mreq mr6;
457         memset(&mr6, 0, sizeof(mr6));
458         mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
459         r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
460 #endif
461     }
462
463     if (r < 0) {
464         pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
465         goto fail;
466     }
467
468     if (bind(fd, sa, salen) < 0) {
469         pa_log("bind() failed: %s", pa_cstrerror(errno));
470         goto fail;
471     }
472
473     return fd;
474
475 fail:
476     if (fd >= 0)
477         close(fd);
478
479     return -1;
480 }
481
482 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
483     struct session *s = NULL;
484     pa_sink *sink;
485     int fd = -1;
486     pa_memchunk silence;
487     pa_sink_input_new_data data;
488     struct timeval now;
489
490     pa_assert(u);
491     pa_assert(sdp_info);
492
493     if (u->n_sessions >= MAX_SESSIONS) {
494         pa_log("Session limit reached.");
495         goto fail;
496     }
497
498     if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
499         pa_log("Sink does not exist.");
500         goto fail;
501     }
502
503     pa_rtclock_get(&now);
504
505     s = pa_xnew0(struct session, 1);
506     s->userdata = u;
507     s->first_packet = FALSE;
508     s->sdp_info = *sdp_info;
509     s->rtpoll_item = NULL;
510     s->intended_latency = LATENCY_USEC;
511     s->smoother = pa_smoother_new(
512             PA_USEC_PER_SEC*5,
513             PA_USEC_PER_SEC*2,
514             TRUE,
515             TRUE,
516             10,
517             pa_timeval_load(&now),
518             TRUE);
519     s->last_rate_update = pa_timeval_load(&now);
520     s->last_latency = LATENCY_USEC;
521     s->estimated_rate = (double) sink->sample_spec.rate;
522     s->avg_estimated_rate = (double) sink->sample_spec.rate;
523     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
524
525     if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
526         goto fail;
527
528     pa_sink_input_new_data_init(&data);
529     data.sink = sink;
530     data.driver = __FILE__;
531     pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
532     pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
533                      "RTP Stream%s%s%s",
534                      sdp_info->session_name ? " (" : "",
535                      sdp_info->session_name ? sdp_info->session_name : "",
536                      sdp_info->session_name ? ")" : "");
537
538     if (sdp_info->session_name)
539         pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
540     pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
541     pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
542     data.module = u->module;
543     pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
544     data.flags = PA_SINK_INPUT_VARIABLE_RATE;
545
546     pa_sink_input_new(&s->sink_input, u->module->core, &data);
547     pa_sink_input_new_data_done(&data);
548
549     if (!s->sink_input) {
550         pa_log("Failed to create sink input.");
551         goto fail;
552     }
553
554     s->sink_input->userdata = s;
555
556     s->sink_input->parent.process_msg = sink_input_process_msg;
557     s->sink_input->pop = sink_input_pop_cb;
558     s->sink_input->process_rewind = sink_input_process_rewind_cb;
559     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
560     s->sink_input->kill = sink_input_kill;
561     s->sink_input->attach = sink_input_attach;
562     s->sink_input->detach = sink_input_detach;
563     s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
564
565     pa_sink_input_get_silence(s->sink_input, &silence);
566
567     s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
568
569     if (s->intended_latency < s->sink_latency*2)
570         s->intended_latency = s->sink_latency*2;
571
572     s->memblockq = pa_memblockq_new(
573             0,
574             MEMBLOCKQ_MAXLENGTH,
575             MEMBLOCKQ_MAXLENGTH,
576             pa_frame_size(&s->sink_input->sample_spec),
577             pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
578             0,
579             0,
580             &silence);
581
582     pa_memblock_unref(silence.memblock);
583
584     pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
585
586     pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
587     u->n_sessions++;
588     PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
589
590     pa_sink_input_put(s->sink_input);
591
592     pa_log_info("New session '%s'", s->sdp_info.session_name);
593
594     return s;
595
596 fail:
597     pa_xfree(s);
598
599     if (fd >= 0)
600         pa_close(fd);
601
602     return NULL;
603 }
604
605 static void session_free(struct session *s) {
606     pa_assert(s);
607
608     pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
609
610     pa_sink_input_unlink(s->sink_input);
611     pa_sink_input_unref(s->sink_input);
612
613     PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
614     pa_assert(s->userdata->n_sessions >= 1);
615     s->userdata->n_sessions--;
616     pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
617
618     pa_memblockq_free(s->memblockq);
619     pa_sdp_info_destroy(&s->sdp_info);
620     pa_rtp_context_destroy(&s->rtp_context);
621
622     pa_smoother_free(s->smoother);
623
624     pa_xfree(s);
625 }
626
627 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
628     struct userdata *u = userdata;
629     pa_bool_t goodbye = FALSE;
630     pa_sdp_info info;
631     struct session *s;
632
633     pa_assert(m);
634     pa_assert(e);
635     pa_assert(u);
636     pa_assert(fd == u->sap_context.fd);
637     pa_assert(flags == PA_IO_EVENT_INPUT);
638
639     if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
640         return;
641
642     if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
643         return;
644
645     if (goodbye) {
646
647         if ((s = pa_hashmap_get(u->by_origin, info.origin)))
648             session_free(s);
649
650         pa_sdp_info_destroy(&info);
651     } else {
652
653         if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
654             if (!session_new(u, &info))
655                 pa_sdp_info_destroy(&info);
656
657         } else {
658             struct timeval now;
659             pa_rtclock_get(&now);
660             pa_atomic_store(&s->timestamp, (int) now.tv_sec);
661
662             pa_sdp_info_destroy(&info);
663         }
664     }
665 }
666
667 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
668     struct session *s, *n;
669     struct userdata *u = userdata;
670     struct timeval now;
671
672     pa_assert(m);
673     pa_assert(t);
674     pa_assert(u);
675
676     pa_rtclock_get(&now);
677
678     pa_log_debug("Checking for dead streams ...");
679
680     for (s = u->sessions; s; s = n) {
681         int k;
682         n = s->next;
683
684         k = pa_atomic_load(&s->timestamp);
685
686         if (k + DEATH_TIMEOUT < now.tv_sec)
687             session_free(s);
688     }
689
690     /* Restart timer */
691     pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
692 }
693
694 int pa__init(pa_module*m) {
695     struct userdata *u;
696     pa_modargs *ma = NULL;
697     struct sockaddr_in sa4;
698 #ifdef HAVE_IPV6
699     struct sockaddr_in6 sa6;
700 #endif
701     struct sockaddr *sa;
702     socklen_t salen;
703     const char *sap_address;
704     int fd = -1;
705
706     pa_assert(m);
707
708     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
709         pa_log("failed to parse module arguments");
710         goto fail;
711     }
712
713     sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
714
715     if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
716         sa4.sin_family = AF_INET;
717         sa4.sin_port = htons(SAP_PORT);
718         sa = (struct sockaddr*) &sa4;
719         salen = sizeof(sa4);
720 #ifdef HAVE_IPV6
721     } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
722         sa6.sin6_family = AF_INET6;
723         sa6.sin6_port = htons(SAP_PORT);
724         sa = (struct sockaddr*) &sa6;
725         salen = sizeof(sa6);
726 #endif
727     } else {
728         pa_log("Invalid SAP address '%s'", sap_address);
729         goto fail;
730     }
731
732     if ((fd = mcast_socket(sa, salen)) < 0)
733         goto fail;
734
735     m->userdata = u = pa_xnew(struct userdata, 1);
736     u->module = m;
737     u->core = m->core;
738     u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
739
740     u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
741     pa_sap_context_init_recv(&u->sap_context, fd);
742
743     PA_LLIST_HEAD_INIT(struct session, u->sessions);
744     u->n_sessions = 0;
745     u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
746
747     u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
748
749     pa_modargs_free(ma);
750
751     return 0;
752
753 fail:
754     if (ma)
755         pa_modargs_free(ma);
756
757     if (fd >= 0)
758         pa_close(fd);
759
760     return -1;
761 }
762
763 void pa__done(pa_module*m) {
764     struct userdata *u;
765     struct session *s;
766
767     pa_assert(m);
768
769     if (!(u = m->userdata))
770         return;
771
772     if (u->sap_event)
773         m->core->mainloop->io_free(u->sap_event);
774
775     if (u->check_death_event)
776         m->core->mainloop->time_free(u->check_death_event);
777
778     pa_sap_context_destroy(&u->sap_context);
779
780     if (u->by_origin) {
781         while ((s = pa_hashmap_first(u->by_origin)))
782             session_free(s);
783
784         pa_hashmap_free(u->by_origin, NULL, NULL);
785     }
786
787     pa_xfree(u->sink_name);
788     pa_xfree(u);
789 }