hashmap: Add the ability to free keys
[platform/upstream/pulseaudio.git] / src / modules / rtp / module-rtp-recv.c
1
2 /***
3   This file is part of PulseAudio.
4
5   Copyright 2006 Lennart Poettering
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <errno.h>
31 #include <string.h>
32 #include <unistd.h>
33 #include <math.h>
34
35 #include <pulse/rtclock.h>
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-rtclock.h>
47 #include <pulsecore/core-util.h>
48 #include <pulsecore/modargs.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/sample-util.h>
51 #include <pulsecore/macro.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/once.h>
55 #include <pulsecore/poll.h>
56 #include <pulsecore/arpa-inet.h>
57
58 #include "module-rtp-recv-symdef.h"
59
60 #include "rtp.h"
61 #include "sdp.h"
62 #include "sap.h"
63
64 PA_MODULE_AUTHOR("Lennart Poettering");
65 PA_MODULE_DESCRIPTION("Receive data from a network via RTP/SAP/SDP");
66 PA_MODULE_VERSION(PACKAGE_VERSION);
67 PA_MODULE_LOAD_ONCE(false);
68 PA_MODULE_USAGE(
69         "sink=<name of the sink> "
70         "sap_address=<multicast address to listen on> "
71 );
72
73 #define SAP_PORT 9875
74 #define DEFAULT_SAP_ADDRESS "224.0.0.56"
75 #define MEMBLOCKQ_MAXLENGTH (1024*1024*40)
76 #define MAX_SESSIONS 16
77 #define DEATH_TIMEOUT 20
78 #define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)
79 #define LATENCY_USEC (500*PA_USEC_PER_MSEC)
80
81 static const char* const valid_modargs[] = {
82     "sink",
83     "sap_address",
84     NULL
85 };
86
87 struct session {
88     struct userdata *userdata;
89     PA_LLIST_FIELDS(struct session);
90
91     pa_sink_input *sink_input;
92     pa_memblockq *memblockq;
93
94     bool first_packet;
95     uint32_t ssrc;
96     uint32_t offset;
97
98     struct pa_sdp_info sdp_info;
99
100     pa_rtp_context rtp_context;
101
102     pa_rtpoll_item *rtpoll_item;
103
104     pa_atomic_t timestamp;
105
106     pa_usec_t intended_latency;
107     pa_usec_t sink_latency;
108
109     pa_usec_t last_rate_update;
110     pa_usec_t last_latency;
111     double estimated_rate;
112     double avg_estimated_rate;
113 };
114
115 struct userdata {
116     pa_module *module;
117     pa_core *core;
118
119     pa_sap_context sap_context;
120     pa_io_event* sap_event;
121
122     pa_time_event *check_death_event;
123
124     char *sink_name;
125
126     PA_LLIST_HEAD(struct session, sessions);
127     pa_hashmap *by_origin;
128     int n_sessions;
129 };
130
131 static void session_free(struct session *s);
132
133 /* Called from I/O thread context */
134 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
135     struct session *s = PA_SINK_INPUT(o)->userdata;
136
137     switch (code) {
138         case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
139             *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
140
141             /* Fall through, the default handler will add in the extra
142              * latency added by the resampler */
143             break;
144     }
145
146     return pa_sink_input_process_msg(o, code, data, offset, chunk);
147 }
148
149 /* Called from I/O thread context */
150 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
151     struct session *s;
152     pa_sink_input_assert_ref(i);
153     pa_assert_se(s = i->userdata);
154
155     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
156         return -1;
157
158     pa_memblockq_drop(s->memblockq, chunk->length);
159
160     return 0;
161 }
162
163 /* Called from I/O thread context */
164 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
165     struct session *s;
166
167     pa_sink_input_assert_ref(i);
168     pa_assert_se(s = i->userdata);
169
170     pa_memblockq_rewind(s->memblockq, nbytes);
171 }
172
173 /* Called from I/O thread context */
174 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
175     struct session *s;
176
177     pa_sink_input_assert_ref(i);
178     pa_assert_se(s = i->userdata);
179
180     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
181 }
182
183 /* Called from main context */
184 static void sink_input_kill(pa_sink_input* i) {
185     struct session *s;
186     pa_sink_input_assert_ref(i);
187     pa_assert_se(s = i->userdata);
188
189     pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
190     session_free(s);
191 }
192
193 /* Called from IO context */
194 static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
195     struct session *s;
196     pa_sink_input_assert_ref(i);
197     pa_assert_se(s = i->userdata);
198
199     if (b)
200         pa_memblockq_flush_read(s->memblockq);
201     else
202         s->first_packet = false;
203 }
204
205 /* Called from I/O thread context */
206 static int rtpoll_work_cb(pa_rtpoll_item *i) {
207     pa_memchunk chunk;
208     int64_t k, j, delta;
209     struct timeval now = { 0, 0 };
210     struct session *s;
211     struct pollfd *p;
212
213     pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
214
215     p = pa_rtpoll_item_get_pollfd(i, NULL);
216
217     if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
218         pa_log("poll() signalled bad revents.");
219         return -1;
220     }
221
222     if ((p->revents & POLLIN) == 0)
223         return 0;
224
225     p->revents = 0;
226
227     if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
228         return 0;
229
230     if (s->sdp_info.payload != s->rtp_context.payload ||
231         !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
232         pa_memblock_unref(chunk.memblock);
233         return 0;
234     }
235
236     if (!s->first_packet) {
237         s->first_packet = true;
238
239         s->ssrc = s->rtp_context.ssrc;
240         s->offset = s->rtp_context.timestamp;
241
242         if (s->ssrc == s->userdata->module->core->cookie)
243             pa_log_warn("Detected RTP packet loop!");
244     } else {
245         if (s->ssrc != s->rtp_context.ssrc) {
246             pa_memblock_unref(chunk.memblock);
247             return 0;
248         }
249     }
250
251     /* Check whether there was a timestamp overflow */
252     k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
253     j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
254
255     if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
256         delta = k;
257     else
258         delta = j;
259
260     pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, true);
261
262     if (now.tv_sec == 0) {
263         PA_ONCE_BEGIN {
264             pa_log_warn("Using artificial time instead of timestamp");
265         } PA_ONCE_END;
266         pa_rtclock_get(&now);
267     } else
268         pa_rtclock_from_wallclock(&now);
269
270     if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
271         pa_log_warn("Queue overrun");
272         pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, true);
273     }
274
275 /*     pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
276
277     pa_memblock_unref(chunk.memblock);
278
279     /* The next timestamp we expect */
280     s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
281
282     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
283
284     if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
285         pa_usec_t wi, ri, render_delay, sink_delay = 0, latency;
286         uint32_t base_rate = s->sink_input->sink->sample_spec.rate;
287         uint32_t current_rate = s->sink_input->sample_spec.rate;
288         uint32_t new_rate;
289         double estimated_rate, alpha = 0.02;
290
291         pa_log_debug("Updating sample rate");
292
293         wi = pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec);
294         ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
295
296         pa_log_debug("wi=%lu ri=%lu", (unsigned long) wi, (unsigned long) ri);
297
298         sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
299         render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
300
301         if (ri > render_delay+sink_delay)
302             ri -= render_delay+sink_delay;
303         else
304             ri = 0;
305
306         if (wi < ri)
307             latency = 0;
308         else
309             latency = wi - ri;
310
311         pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
312
313         /* The buffer is filling with some unknown rate R̂ samples/second. If the rate of reading in
314          * the last T seconds was Rⁿ, then the increase in buffer latency ΔLⁿ = Lⁿ - Lⁿ⁻ⁱ in that
315          * same period is ΔLⁿ = (TR̂ - TRⁿ) / R̂, giving the estimated target rate
316          *                                           T
317          *                                 R̂ = ─────────────── Rⁿ .                             (1)
318          *                                     T - (Lⁿ - Lⁿ⁻ⁱ)
319          *
320          * Setting the sample rate to R̂ results in the latency being constant (if the estimate of R̂
321          * is correct).  But there is also the requirement to keep the buffer at a predefined target
322          * latency L̂.  So instead of setting Rⁿ⁺ⁱ to R̂ immediately, the strategy will be to reduce R
323          * from Rⁿ⁺ⁱ to R̂ in a steps of T seconds, where Rⁿ⁺ⁱ is chosen such that in the total time
324          * aT the latency is reduced from Lⁿ to L̂.  This strategy translates to the requirements
325          *            ₐ      R̂ - Rⁿ⁺ʲ                            a-j+1         j-1
326          *            Σ  T ────────── = L̂ - Lⁿ    with    Rⁿ⁺ʲ = ───── Rⁿ⁺ⁱ + ───── R̂ .
327          *           ʲ⁼ⁱ        R̂                                  a            a
328          * Solving for Rⁿ⁺ⁱ gives
329          *                                     T - ²∕ₐ₊₁(L̂ - Lⁿ)
330          *                              Rⁿ⁺ⁱ = ───────────────── R̂ .                            (2)
331          *                                            T
332          * In the code below a = 7 is used.
333          *
334          * Equation (1) is not directly used in (2), but instead an exponentially weighted average
335          * of the estimated rate R̂ is used.  This average R̅ is defined as
336          *                                R̅ⁿ = α R̂ⁿ + (1-α) R̅ⁿ⁻ⁱ .
337          * Because it is difficult to find a fixed value for the coefficient α such that the
338          * averaging is without significant lag but oscillations are filtered out, a heuristic is
339          * used.  When the successive estimates R̂ⁿ do not change much then α→1, but when there is a
340          * sudden spike in the estimated rate α→0, such that the deviation is given little weight.
341          */
342         estimated_rate = (double) current_rate * (double) RATE_UPDATE_INTERVAL / (double) (RATE_UPDATE_INTERVAL + s->last_latency - latency);
343         if (fabs(s->estimated_rate - s->avg_estimated_rate) > 1) {
344           double ratio = (estimated_rate + s->estimated_rate - 2*s->avg_estimated_rate) / (s->estimated_rate - s->avg_estimated_rate);
345           alpha = PA_CLAMP(2 * (ratio + fabs(ratio)) / (4 + ratio*ratio), 0.02, 0.8);
346         }
347         s->avg_estimated_rate = alpha * estimated_rate + (1-alpha) * s->avg_estimated_rate;
348         s->estimated_rate = estimated_rate;
349         pa_log_debug("Estimated target rate: %.0f Hz, using average of %.0f Hz  (α=%.3f)", estimated_rate, s->avg_estimated_rate, alpha);
350         new_rate = (uint32_t) ((double) (RATE_UPDATE_INTERVAL + latency/4 - s->intended_latency/4) / (double) RATE_UPDATE_INTERVAL * s->avg_estimated_rate);
351         s->last_latency = latency;
352
353         if (new_rate < (uint32_t) (base_rate*0.8) || new_rate > (uint32_t) (base_rate*1.25)) {
354             pa_log_warn("Sample rates too different, not adjusting (%u vs. %u).", base_rate, new_rate);
355             new_rate = base_rate;
356         } else {
357             if (base_rate < new_rate + 20 && new_rate < base_rate + 20)
358               new_rate = base_rate;
359             /* Do the adjustment in small steps; 2‰ can be considered inaudible */
360             if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) {
361                 pa_log_info("New rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", new_rate, current_rate);
362                 new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002));
363             }
364         }
365         s->sink_input->sample_spec.rate = new_rate;
366
367         pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
368
369         pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
370
371         pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
372
373         s->last_rate_update = pa_timeval_load(&now);
374     }
375
376     if (pa_memblockq_is_readable(s->memblockq) &&
377         s->sink_input->thread_info.underrun_for > 0) {
378         pa_log_debug("Requesting rewind due to end of underrun");
379         pa_sink_input_request_rewind(s->sink_input,
380                                      (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
381                                      false, true, false);
382     }
383
384     return 1;
385 }
386
387 /* Called from I/O thread context */
388 static void sink_input_attach(pa_sink_input *i) {
389     struct session *s;
390     struct pollfd *p;
391
392     pa_sink_input_assert_ref(i);
393     pa_assert_se(s = i->userdata);
394
395     pa_assert(!s->rtpoll_item);
396     s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
397
398     p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
399     p->fd = s->rtp_context.fd;
400     p->events = POLLIN;
401     p->revents = 0;
402
403     pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
404     pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
405 }
406
407 /* Called from I/O thread context */
408 static void sink_input_detach(pa_sink_input *i) {
409     struct session *s;
410     pa_sink_input_assert_ref(i);
411     pa_assert_se(s = i->userdata);
412
413     pa_assert(s->rtpoll_item);
414     pa_rtpoll_item_free(s->rtpoll_item);
415     s->rtpoll_item = NULL;
416 }
417
418 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
419     int af, fd = -1, r, one;
420
421     pa_assert(sa);
422     pa_assert(salen > 0);
423
424     af = sa->sa_family;
425     if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) {
426         pa_log("Failed to create socket: %s", pa_cstrerror(errno));
427         goto fail;
428     }
429
430     pa_make_udp_socket_low_delay(fd);
431
432 #ifdef SO_TIMESTAMP
433     one = 1;
434     if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) {
435         pa_log("SO_TIMESTAMP failed: %s", pa_cstrerror(errno));
436         goto fail;
437     }
438 #else
439     pa_log("SO_TIMESTAMP unsupported on this platform");
440     goto fail;
441 #endif
442
443     one = 1;
444     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
445         pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
446         goto fail;
447     }
448
449     if (af == AF_INET) {
450         struct ip_mreq mr4;
451         memset(&mr4, 0, sizeof(mr4));
452         mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
453         r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
454 #ifdef HAVE_IPV6
455     } else if (af == AF_INET6) {
456         struct ipv6_mreq mr6;
457         memset(&mr6, 0, sizeof(mr6));
458         mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
459         r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
460 #endif
461     } else
462         pa_assert_not_reached();
463
464     if (r < 0) {
465         pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
466         goto fail;
467     }
468
469     if (bind(fd, sa, salen) < 0) {
470         pa_log("bind() failed: %s", pa_cstrerror(errno));
471         goto fail;
472     }
473
474     return fd;
475
476 fail:
477     if (fd >= 0)
478         close(fd);
479
480     return -1;
481 }
482
483 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
484     struct session *s = NULL;
485     pa_sink *sink;
486     int fd = -1;
487     pa_memchunk silence;
488     pa_sink_input_new_data data;
489     struct timeval now;
490
491     pa_assert(u);
492     pa_assert(sdp_info);
493
494     if (u->n_sessions >= MAX_SESSIONS) {
495         pa_log("Session limit reached.");
496         goto fail;
497     }
498
499     if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
500         pa_log("Sink does not exist.");
501         goto fail;
502     }
503
504     pa_rtclock_get(&now);
505
506     s = pa_xnew0(struct session, 1);
507     s->userdata = u;
508     s->first_packet = false;
509     s->sdp_info = *sdp_info;
510     s->rtpoll_item = NULL;
511     s->intended_latency = LATENCY_USEC;
512     s->last_rate_update = pa_timeval_load(&now);
513     s->last_latency = LATENCY_USEC;
514     s->estimated_rate = (double) sink->sample_spec.rate;
515     s->avg_estimated_rate = (double) sink->sample_spec.rate;
516     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
517
518     if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
519         goto fail;
520
521     pa_sink_input_new_data_init(&data);
522     pa_sink_input_new_data_set_sink(&data, sink, false);
523     data.driver = __FILE__;
524     pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
525     pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
526                      "RTP Stream%s%s%s",
527                      sdp_info->session_name ? " (" : "",
528                      sdp_info->session_name ? sdp_info->session_name : "",
529                      sdp_info->session_name ? ")" : "");
530
531     if (sdp_info->session_name)
532         pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
533     pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
534     pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
535     data.module = u->module;
536     pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
537     data.flags = PA_SINK_INPUT_VARIABLE_RATE;
538
539     pa_sink_input_new(&s->sink_input, u->module->core, &data);
540     pa_sink_input_new_data_done(&data);
541
542     if (!s->sink_input) {
543         pa_log("Failed to create sink input.");
544         goto fail;
545     }
546
547     s->sink_input->userdata = s;
548
549     s->sink_input->parent.process_msg = sink_input_process_msg;
550     s->sink_input->pop = sink_input_pop_cb;
551     s->sink_input->process_rewind = sink_input_process_rewind_cb;
552     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
553     s->sink_input->kill = sink_input_kill;
554     s->sink_input->attach = sink_input_attach;
555     s->sink_input->detach = sink_input_detach;
556     s->sink_input->suspend_within_thread = sink_input_suspend_within_thread;
557
558     pa_sink_input_get_silence(s->sink_input, &silence);
559
560     s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
561
562     if (s->intended_latency < s->sink_latency*2)
563         s->intended_latency = s->sink_latency*2;
564
565     s->memblockq = pa_memblockq_new(
566             "module-rtp-recv memblockq",
567             0,
568             MEMBLOCKQ_MAXLENGTH,
569             MEMBLOCKQ_MAXLENGTH,
570             &s->sink_input->sample_spec,
571             pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
572             0,
573             0,
574             &silence);
575
576     pa_memblock_unref(silence.memblock);
577
578     pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
579
580     pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
581     u->n_sessions++;
582     PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
583
584     pa_sink_input_put(s->sink_input);
585
586     pa_log_info("New session '%s'", s->sdp_info.session_name);
587
588     return s;
589
590 fail:
591     pa_xfree(s);
592
593     if (fd >= 0)
594         pa_close(fd);
595
596     return NULL;
597 }
598
599 static void session_free(struct session *s) {
600     pa_assert(s);
601
602     pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
603
604     pa_sink_input_unlink(s->sink_input);
605     pa_sink_input_unref(s->sink_input);
606
607     PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
608     pa_assert(s->userdata->n_sessions >= 1);
609     s->userdata->n_sessions--;
610
611     pa_memblockq_free(s->memblockq);
612     pa_sdp_info_destroy(&s->sdp_info);
613     pa_rtp_context_destroy(&s->rtp_context);
614
615     pa_xfree(s);
616 }
617
618 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
619     struct userdata *u = userdata;
620     bool goodbye = false;
621     pa_sdp_info info;
622     struct session *s;
623
624     pa_assert(m);
625     pa_assert(e);
626     pa_assert(u);
627     pa_assert(fd == u->sap_context.fd);
628     pa_assert(flags == PA_IO_EVENT_INPUT);
629
630     if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
631         return;
632
633     if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
634         return;
635
636     if (goodbye) {
637
638         if ((s = pa_hashmap_remove(u->by_origin, info.origin)))
639             session_free(s);
640
641         pa_sdp_info_destroy(&info);
642     } else {
643
644         if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
645             if (!session_new(u, &info))
646                 pa_sdp_info_destroy(&info);
647
648         } else {
649             struct timeval now;
650             pa_rtclock_get(&now);
651             pa_atomic_store(&s->timestamp, (int) now.tv_sec);
652
653             pa_sdp_info_destroy(&info);
654         }
655     }
656 }
657
658 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *tv, void *userdata) {
659     struct session *s, *n;
660     struct userdata *u = userdata;
661     struct timeval now;
662
663     pa_assert(m);
664     pa_assert(t);
665     pa_assert(u);
666
667     pa_rtclock_get(&now);
668
669     pa_log_debug("Checking for dead streams ...");
670
671     for (s = u->sessions; s; s = n) {
672         int k;
673         n = s->next;
674
675         k = pa_atomic_load(&s->timestamp);
676
677         if (k + DEATH_TIMEOUT < now.tv_sec) {
678             pa_hashmap_remove(u->by_origin, s->sdp_info.origin);
679             session_free(s);
680         }
681     }
682
683     /* Restart timer */
684     pa_core_rttime_restart(u->module->core, t, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC);
685 }
686
687 int pa__init(pa_module*m) {
688     struct userdata *u;
689     pa_modargs *ma = NULL;
690     struct sockaddr_in sa4;
691 #ifdef HAVE_IPV6
692     struct sockaddr_in6 sa6;
693 #endif
694     struct sockaddr *sa;
695     socklen_t salen;
696     const char *sap_address;
697     int fd = -1;
698
699     pa_assert(m);
700
701     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
702         pa_log("failed to parse module arguments");
703         goto fail;
704     }
705
706     sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
707
708     if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
709         sa4.sin_family = AF_INET;
710         sa4.sin_port = htons(SAP_PORT);
711         sa = (struct sockaddr*) &sa4;
712         salen = sizeof(sa4);
713 #ifdef HAVE_IPV6
714     } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
715         sa6.sin6_family = AF_INET6;
716         sa6.sin6_port = htons(SAP_PORT);
717         sa = (struct sockaddr*) &sa6;
718         salen = sizeof(sa6);
719 #endif
720     } else {
721         pa_log("Invalid SAP address '%s'", sap_address);
722         goto fail;
723     }
724
725     if ((fd = mcast_socket(sa, salen)) < 0)
726         goto fail;
727
728     m->userdata = u = pa_xnew(struct userdata, 1);
729     u->module = m;
730     u->core = m->core;
731     u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
732
733     u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
734     pa_sap_context_init_recv(&u->sap_context, fd);
735
736     PA_LLIST_HEAD_INIT(struct session, u->sessions);
737     u->n_sessions = 0;
738     u->by_origin = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, (pa_free_cb_t) session_free);
739
740     u->check_death_event = pa_core_rttime_new(m->core, pa_rtclock_now() + DEATH_TIMEOUT * PA_USEC_PER_SEC, check_death_event_cb, u);
741
742     pa_modargs_free(ma);
743
744     return 0;
745
746 fail:
747     if (ma)
748         pa_modargs_free(ma);
749
750     if (fd >= 0)
751         pa_close(fd);
752
753     return -1;
754 }
755
756 void pa__done(pa_module*m) {
757     struct userdata *u;
758
759     pa_assert(m);
760
761     if (!(u = m->userdata))
762         return;
763
764     if (u->sap_event)
765         m->core->mainloop->io_free(u->sap_event);
766
767     if (u->check_death_event)
768         m->core->mainloop->time_free(u->check_death_event);
769
770     pa_sap_context_destroy(&u->sap_context);
771
772     if (u->by_origin)
773         pa_hashmap_free(u->by_origin);
774
775     pa_xfree(u->sink_name);
776     pa_xfree(u);
777 }