rtpbin: Fix division by zero when using ts-offset-smoothing-factor
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-rtpbin
22  * @title: rtpbin
23  * @see_also: rtpjitterbuffer, rtpsession, rtpptdemux, rtpssrcdemux
24  *
25  * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
26  * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
27  * RTP sessions that will be synchronized together using RTCP SR packets.
28  *
29  * #GstRtpBin is configured with a number of request pads that define the
30  * functionality that is activated, similar to the #GstRtpSession element.
31  *
32  * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_\%u pad. The session
33  * number must be specified in the pad name.
34  * Data received on the recv_rtp_sink_\%u pad will be processed in the #GstRtpSession
35  * manager and after being validated forwarded on #GstRtpSsrcDemux element. Each
36  * RTP stream is demuxed based on the SSRC and send to a #GstRtpJitterBuffer. After
37  * the packets are released from the jitterbuffer, they will be forwarded to a
38  * #GstRtpPtDemux element. The #GstRtpPtDemux element will demux the packets based
39  * on the payload type and will create a unique pad recv_rtp_src_\%u_\%u_\%u on
40  * rtpbin with the session number, SSRC and payload type respectively as the pad
41  * name.
42  *
43  * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_\%u pad. The
44  * session number must be specified in the pad name.
45  *
46  * If you want the session manager to generate and send RTCP packets, request
47  * the send_rtcp_src_\%u pad with the session number in the pad name. Packet pushed
48  * on this pad contain SR/RR RTCP reports that should be sent to all participants
49  * in the session.
50  *
51  * To use #GstRtpBin as a sender, request a send_rtp_sink_\%u pad, which will
52  * automatically create a send_rtp_src_\%u pad. If the session number is not provided,
53  * the pad from the lowest available session will be returned. The session manager will modify the
54  * SSRC in the RTP packets to its own SSRC and will forward the packets on the
55  * send_rtp_src_\%u pad after updating its internal state.
56  *
57  * The session manager needs the clock-rate of the payload types it is handling
58  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
59  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
60  * signal.
61  *
62  * Access to the internal statistics of rtpbin is provided with the
63  * get-internal-session property. This action signal gives access to the
64  * RTPSession object which further provides action signals to retrieve the
65  * internal source and other sources.
66  *
67  * #GstRtpBin also has signals (#GstRtpBin::request-rtp-encoder,
68  * #GstRtpBin::request-rtp-decoder, #GstRtpBin::request-rtcp-encoder and
69  * #GstRtpBin::request-rtp-decoder) to dynamically request for RTP and RTCP encoders
70  * and decoders in order to support SRTP. The encoders must provide the pads
71  * rtp_sink_\%u and rtp_src_\%u for RTP and rtcp_sink_\%u and rtcp_src_\%u for
72  * RTCP. The session number will be used in the pad name. The decoders must provide
73  * rtp_sink and rtp_src for RTP and rtcp_sink and rtcp_src for RTCP. The decoders will
74  * be placed before the #GstRtpSession element, thus they must support SSRC demuxing
75  * internally.
76  *
77  * #GstRtpBin has signals (#GstRtpBin::request-aux-sender and
78  * #GstRtpBin::request-aux-receiver to dynamically request an element that can be
79  * used to create or merge additional RTP streams. AUX elements are needed to
80  * implement FEC or retransmission (such as RFC 4588). An AUX sender must have one
81  * sink_\%u pad that matches the sessionid in the signal and it should have 1 or
82  * more src_\%u pads. For each src_%\u pad, a session will be made (if needed)
83  * and the pad will be linked to the session send_rtp_sink pad. Each session will
84  * then expose its source pad as send_rtp_src_\%u on #GstRtpBin.
85  * An AUX receiver has 1 src_\%u pad that much match the sessionid in the signal
86  * and 1 or more sink_\%u pads. A session will be made for each sink_\%u pad
87  * when the corresponding recv_rtp_sink_\%u pad is requested on #GstRtpBin.
88  * The #GstRtpBin::request-jitterbuffer signal can be used to provide a custom
89  * element to perform arrival time smoothing, reordering and optionally packet
90  * loss detection and retransmission requests.
91  *
92  * ## Example pipelines
93  *
94  * |[
95  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
96  *     rtpbin ! rtptheoradepay ! theoradec ! xvimagesink
97  * ]| Receive RTP data from port 5000 and send to the session 0 in rtpbin.
98  * |[
99  * gst-launch-1.0 rtpbin name=rtpbin \
100  *         v4l2src ! videoconvert ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
101  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
102  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
103  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
104  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
105  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
106  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
107  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
108  * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
109  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
110  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
111  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
112  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
113  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
114  * is received on port 5007. Since RTCP packets from the sender should be sent
115  * as soon as possible and do not participate in preroll, sync=false and
116  * async=false is configured on udpsink
117  * |[
118  * gst-launch-1.0 -v rtpbin name=rtpbin                                          \
119  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
120  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
121  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
122  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
123  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
124  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
125  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
126  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
127  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
128  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
129  * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
130  * decode and display the video.
131  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
132  * decode and play the audio.
133  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
134  * session 1 on port 5003. These packets will be used for session management and
135  * synchronisation.
136  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
137  * on port 5007.
138  *
139  */
140
141 #ifdef HAVE_CONFIG_H
142 #include "config.h"
143 #endif
144 #include <stdio.h>
145 #include <string.h>
146
147 #include <gst/rtp/gstrtpbuffer.h>
148 #include <gst/rtp/gstrtcpbuffer.h>
149
150 #include "gstrtpbin.h"
151 #include "rtpsession.h"
152 #include "gstrtpsession.h"
153 #include "gstrtpjitterbuffer.h"
154
155 #include <gst/glib-compat-private.h>
156
157 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
158 #define GST_CAT_DEFAULT gst_rtp_bin_debug
159
160 /* sink pads */
161 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
162     GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
163     GST_PAD_SINK,
164     GST_PAD_REQUEST,
165     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
166     );
167
168 /**
169  * GstRtpBin!recv_fec_sink_%u_%u:
170  *
171  * Sink template for receiving Forward Error Correction packets,
172  * in the form recv_fec_sink_<session_idx>_<fec_stream_idx>
173  *
174  * See #GstRTPST_2022_1_FecDec for example usage
175  *
176  * Since: 1.20
177  */
178 static GstStaticPadTemplate rtpbin_recv_fec_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("recv_fec_sink_%u_%u",
180     GST_PAD_SINK,
181     GST_PAD_REQUEST,
182     GST_STATIC_CAPS ("application/x-rtp")
183     );
184
185 /**
186  * GstRtpBin!send_fec_src_%u_%u:
187  *
188  * Src template for sending Forward Error Correction packets,
189  * in the form send_fec_src_<session_idx>_<fec_stream_idx>
190  *
191  * See #GstRTPST_2022_1_FecEnc for example usage
192  *
193  * Since: 1.20
194  */
195 static GstStaticPadTemplate rtpbin_send_fec_src_template =
196 GST_STATIC_PAD_TEMPLATE ("send_fec_src_%u_%u",
197     GST_PAD_SRC,
198     GST_PAD_SOMETIMES,
199     GST_STATIC_CAPS ("application/x-rtp")
200     );
201
202 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
203     GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
204     GST_PAD_SINK,
205     GST_PAD_REQUEST,
206     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
207     );
208
209 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
210 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%u",
211     GST_PAD_SINK,
212     GST_PAD_REQUEST,
213     GST_STATIC_CAPS ("application/x-rtp")
214     );
215
216 /* src pads */
217 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
218 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
219     GST_PAD_SRC,
220     GST_PAD_SOMETIMES,
221     GST_STATIC_CAPS ("application/x-rtp")
222     );
223
224 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
225     GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%u",
226     GST_PAD_SRC,
227     GST_PAD_REQUEST,
228     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
229     );
230
231 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
232     GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%u",
233     GST_PAD_SRC,
234     GST_PAD_SOMETIMES,
235     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
236     );
237
238 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock (&(bin)->priv->bin_lock)
239 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock (&(bin)->priv->bin_lock)
240
241 /* lock to protect dynamic callbacks, like pad-added and new ssrc. */
242 #define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock (&(bin)->priv->dyn_lock)
243 #define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock (&(bin)->priv->dyn_lock)
244
245 /* lock for shutdown */
246 #define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
247 G_STMT_START {                                   \
248   if (g_atomic_int_get (&bin->priv->shutdown))   \
249     goto label;                                  \
250   GST_RTP_BIN_DYN_LOCK (bin);                    \
251   if (g_atomic_int_get (&bin->priv->shutdown)) { \
252     GST_RTP_BIN_DYN_UNLOCK (bin);                \
253     goto label;                                  \
254   }                                              \
255 } G_STMT_END
256
257 /* unlock for shutdown */
258 #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
259   GST_RTP_BIN_DYN_UNLOCK (bin);                  \
260
261 /* Minimum time offset to apply. This compensates for rounding errors in NTP to
262  * RTP timestamp conversions */
263 #define MIN_TS_OFFSET_ROUND_OFF_COMP (4 * GST_MSECOND)
264
265 struct _GstRtpBinPrivate
266 {
267   GMutex bin_lock;
268
269   /* lock protecting dynamic adding/removing */
270   GMutex dyn_lock;
271
272   /* if we are shutting down or not */
273   gint shutdown;
274
275   gboolean autoremove;
276
277   /* NTP time in ns of last SR sync used */
278   guint64 last_ntpnstime;
279
280   /* list of extra elements */
281   GList *elements;
282 };
283
284 /* signals and args */
285 enum
286 {
287   SIGNAL_REQUEST_PT_MAP,
288   SIGNAL_PAYLOAD_TYPE_CHANGE,
289   SIGNAL_CLEAR_PT_MAP,
290   SIGNAL_RESET_SYNC,
291   SIGNAL_GET_SESSION,
292   SIGNAL_GET_INTERNAL_SESSION,
293   SIGNAL_GET_STORAGE,
294   SIGNAL_GET_INTERNAL_STORAGE,
295   SIGNAL_CLEAR_SSRC,
296
297   SIGNAL_ON_NEW_SSRC,
298   SIGNAL_ON_SSRC_COLLISION,
299   SIGNAL_ON_SSRC_VALIDATED,
300   SIGNAL_ON_SSRC_ACTIVE,
301   SIGNAL_ON_SSRC_SDES,
302   SIGNAL_ON_BYE_SSRC,
303   SIGNAL_ON_BYE_TIMEOUT,
304   SIGNAL_ON_TIMEOUT,
305   SIGNAL_ON_SENDER_TIMEOUT,
306   SIGNAL_ON_NPT_STOP,
307
308   SIGNAL_REQUEST_RTP_ENCODER,
309   SIGNAL_REQUEST_RTP_DECODER,
310   SIGNAL_REQUEST_RTCP_ENCODER,
311   SIGNAL_REQUEST_RTCP_DECODER,
312
313   SIGNAL_REQUEST_FEC_DECODER,
314   SIGNAL_REQUEST_FEC_DECODER_FULL,
315   SIGNAL_REQUEST_FEC_ENCODER,
316
317   SIGNAL_REQUEST_JITTERBUFFER,
318
319   SIGNAL_NEW_JITTERBUFFER,
320   SIGNAL_NEW_STORAGE,
321
322   SIGNAL_REQUEST_AUX_SENDER,
323   SIGNAL_REQUEST_AUX_RECEIVER,
324
325   SIGNAL_ON_NEW_SENDER_SSRC,
326   SIGNAL_ON_SENDER_SSRC_ACTIVE,
327
328   SIGNAL_ON_BUNDLED_SSRC,
329
330   LAST_SIGNAL
331 };
332
333 #define DEFAULT_LATENCY_MS           200
334 #define DEFAULT_DROP_ON_LATENCY      FALSE
335 #define DEFAULT_SDES                 NULL
336 #define DEFAULT_DO_LOST              FALSE
337 #define DEFAULT_IGNORE_PT            FALSE
338 #define DEFAULT_NTP_SYNC             FALSE
339 #define DEFAULT_AUTOREMOVE           FALSE
340 #define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
341 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
342 #define DEFAULT_RTCP_SYNC            GST_RTP_BIN_RTCP_SYNC_ALWAYS
343 #define DEFAULT_RTCP_SYNC_INTERVAL   0
344 #define DEFAULT_DO_SYNC_EVENT        FALSE
345 #define DEFAULT_DO_RETRANSMISSION    FALSE
346 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
347 #define DEFAULT_NTP_TIME_SOURCE      GST_RTP_NTP_TIME_SOURCE_NTP
348 #define DEFAULT_RTCP_SYNC_SEND_TIME  TRUE
349 #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
350 #define DEFAULT_MAX_DROPOUT_TIME     60000
351 #define DEFAULT_MAX_MISORDER_TIME    2000
352 #define DEFAULT_RFC7273_SYNC         FALSE
353 #define DEFAULT_ADD_REFERENCE_TIMESTAMP_META FALSE
354 #define DEFAULT_MAX_STREAMS          G_MAXUINT
355 #define DEFAULT_MAX_TS_OFFSET_ADJUSTMENT G_GUINT64_CONSTANT(0)
356 #define DEFAULT_MAX_TS_OFFSET        G_GINT64_CONSTANT(3000000000)
357 #define DEFAULT_MIN_TS_OFFSET        MIN_TS_OFFSET_ROUND_OFF_COMP
358 #define DEFAULT_TS_OFFSET_SMOOTHING_FACTOR  0
359
360 enum
361 {
362   PROP_0,
363   PROP_LATENCY,
364   PROP_DROP_ON_LATENCY,
365   PROP_SDES,
366   PROP_DO_LOST,
367   PROP_IGNORE_PT,
368   PROP_NTP_SYNC,
369   PROP_RTCP_SYNC,
370   PROP_RTCP_SYNC_INTERVAL,
371   PROP_AUTOREMOVE,
372   PROP_BUFFER_MODE,
373   PROP_USE_PIPELINE_CLOCK,
374   PROP_DO_SYNC_EVENT,
375   PROP_DO_RETRANSMISSION,
376   PROP_RTP_PROFILE,
377   PROP_NTP_TIME_SOURCE,
378   PROP_RTCP_SYNC_SEND_TIME,
379   PROP_MAX_RTCP_RTP_TIME_DIFF,
380   PROP_MAX_DROPOUT_TIME,
381   PROP_MAX_MISORDER_TIME,
382   PROP_RFC7273_SYNC,
383   PROP_ADD_REFERENCE_TIMESTAMP_META,
384   PROP_MAX_STREAMS,
385   PROP_MAX_TS_OFFSET_ADJUSTMENT,
386   PROP_MAX_TS_OFFSET,
387   PROP_MIN_TS_OFFSET,
388   PROP_TS_OFFSET_SMOOTHING_FACTOR,
389   PROP_FEC_DECODERS,
390   PROP_FEC_ENCODERS,
391 };
392
393 #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
394 static GType
395 gst_rtp_bin_rtcp_sync_get_type (void)
396 {
397   static GType rtcp_sync_type = 0;
398   static const GEnumValue rtcp_sync_types[] = {
399     {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
400     {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
401     {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
402     {0, NULL, NULL},
403   };
404
405   if (!rtcp_sync_type) {
406     rtcp_sync_type = g_enum_register_static ("GstRTCPSync", rtcp_sync_types);
407   }
408   return rtcp_sync_type;
409 }
410
411 /* helper objects */
412 typedef struct _GstRtpBinSession GstRtpBinSession;
413 typedef struct _GstRtpBinStream GstRtpBinStream;
414 typedef struct _GstRtpBinClient GstRtpBinClient;
415
416 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
417
418 static GstCaps *pt_map_requested (GstElement * element, guint pt,
419     GstRtpBinSession * session);
420 static void payload_type_change (GstElement * element, guint pt,
421     GstRtpBinSession * session);
422 static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
423 static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
424 static void remove_recv_fec (GstRtpBin * rtpbin, GstRtpBinSession * session);
425 static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
426 static void remove_send_fec (GstRtpBin * rtpbin, GstRtpBinSession * session);
427 static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
428 static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
429 static void free_stream (GstRtpBinStream * stream, GstRtpBin * bin);
430 static GstRtpBinSession *create_session (GstRtpBin * rtpbin, gint id);
431 static GstPad *complete_session_sink (GstRtpBin * rtpbin,
432     GstRtpBinSession * session);
433 static void
434 complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session,
435     guint sessid);
436 static GstPad *complete_session_rtcp (GstRtpBin * rtpbin,
437     GstRtpBinSession * session, guint sessid);
438 static GstElement *session_request_element (GstRtpBinSession * session,
439     guint signal);
440
441 /* Manages the RTP stream for one SSRC.
442  *
443  * We pipe the stream (coming from the SSRC demuxer) into a jitterbuffer.
444  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
445  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
446  * together (see below).
447  */
448 struct _GstRtpBinStream
449 {
450   /* the SSRC of this stream */
451   guint32 ssrc;
452
453   /* parent bin */
454   GstRtpBin *bin;
455
456   /* the session this SSRC belongs to */
457   GstRtpBinSession *session;
458
459   /* the jitterbuffer of the SSRC */
460   GstElement *buffer;
461   gulong buffer_handlesync_sig;
462   gulong buffer_ptreq_sig;
463   gulong buffer_ntpstop_sig;
464   gint percent;
465
466   /* the PT demuxer of the SSRC */
467   GstElement *demux;
468   gulong demux_newpad_sig;
469   gulong demux_padremoved_sig;
470   gulong demux_ptreq_sig;
471   gulong demux_ptchange_sig;
472
473   /* if we have calculated a valid rt_delta for this stream */
474   gboolean have_sync;
475   /* mapping to local RTP and NTP time */
476   gint64 rt_delta;
477   gint64 rtp_delta;
478   gint64 avg_ts_offset;
479   gboolean is_initialized;
480   /* base rtptime in gst time */
481   gint64 clock_base;
482 };
483
484 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->lock)
485 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->lock)
486
487 /* Manages the receiving end of the packets.
488  *
489  * There is one such structure for each RTP session (audio/video/...).
490  * We get the RTP/RTCP packets and stuff them into the session manager. From
491  * there they are pushed into an SSRC demuxer that splits the stream based on
492  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
493  * the GstRtpBinStream above).
494  *
495  * Before the SSRC demuxer, a storage element may be inserted for the purpose
496  * of Forward Error Correction.
497  */
498 struct _GstRtpBinSession
499 {
500   /* session id */
501   gint id;
502   /* the parent bin */
503   GstRtpBin *bin;
504   /* the session element */
505   GstElement *session;
506   /* the SSRC demuxer */
507   GstElement *demux;
508   gulong demux_newpad_sig;
509   gulong demux_padremoved_sig;
510
511   /* Fec support */
512   GstElement *storage;
513
514   GMutex lock;
515
516   /* list of GstRtpBinStream */
517   GSList *streams;
518
519   /* list of elements */
520   GSList *elements;
521
522   /* mapping of payload type to caps */
523   GHashTable *ptmap;
524
525   /* the pads of the session */
526   GstPad *recv_rtp_sink;
527   GstPad *recv_rtp_sink_ghost;
528   GstPad *recv_rtp_src;
529   GstPad *recv_rtcp_sink;
530   GstPad *recv_rtcp_sink_ghost;
531   GstPad *sync_src;
532   GstPad *send_rtp_sink;
533   GstPad *send_rtp_sink_ghost;
534   GstPad *send_rtp_src_ghost;
535   GstPad *send_rtcp_src;
536   GstPad *send_rtcp_src_ghost;
537
538   GSList *recv_fec_sinks;
539   GSList *recv_fec_sink_ghosts;
540   /* fec decoder placed before the rtpjitterbuffer but after the rtpssrcdemux.
541    * XXX: This does not yet support multiple ssrc's in the same rtp session
542    */
543   GstElement *early_fec_decoder;
544
545   GSList *send_fec_src_ghosts;
546 };
547
548 /* Manages the RTP streams that come from one client and should therefore be
549  * synchronized.
550  */
551 struct _GstRtpBinClient
552 {
553   /* the common CNAME for the streams */
554   gchar *cname;
555   guint cname_len;
556
557   /* the streams */
558   guint nstreams;
559   GSList *streams;
560 };
561
562 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
563 static GstRtpBinSession *
564 find_session_by_id (GstRtpBin * rtpbin, gint id)
565 {
566   GSList *walk;
567
568   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
569     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
570
571     if (sess->id == id)
572       return sess;
573   }
574   return NULL;
575 }
576
577 static gboolean
578 pad_is_recv_fec (GstRtpBinSession * session, GstPad * pad)
579 {
580   return g_slist_find (session->recv_fec_sink_ghosts, pad) != NULL;
581 }
582
583 /* find a session with the given request pad. Must be called with RTP_BIN_LOCK */
584 static GstRtpBinSession *
585 find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
586 {
587   GSList *walk;
588
589   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
590     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
591
592     if ((sess->recv_rtp_sink_ghost == pad) ||
593         (sess->recv_rtcp_sink_ghost == pad) ||
594         (sess->send_rtp_sink_ghost == pad) ||
595         (sess->send_rtcp_src_ghost == pad) || pad_is_recv_fec (sess, pad))
596       return sess;
597   }
598   return NULL;
599 }
600
601 static void
602 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
603 {
604   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
605       sess->id, ssrc);
606 }
607
608 static void
609 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
610 {
611   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
612       sess->id, ssrc);
613 }
614
615 static void
616 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
617 {
618   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
619       sess->id, ssrc);
620 }
621
622 static void
623 on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
624 {
625   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
626       sess->id, ssrc);
627 }
628
629 static void
630 on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
631 {
632   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
633       sess->id, ssrc);
634 }
635
636 static void
637 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
638 {
639   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
640       sess->id, ssrc);
641 }
642
643 static void
644 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
645 {
646   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
647       sess->id, ssrc);
648
649   if (sess->bin->priv->autoremove)
650     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
651 }
652
653 static void
654 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
655 {
656   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
657       sess->id, ssrc);
658
659   if (sess->bin->priv->autoremove)
660     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
661 }
662
663 static void
664 on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
665 {
666   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
667       sess->id, ssrc);
668 }
669
670 static void
671 on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
672 {
673   g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0,
674       stream->session->id, stream->ssrc);
675 }
676
677 static void
678 on_new_sender_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
679 {
680   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
681       sess->id, ssrc);
682 }
683
684 static void
685 on_sender_ssrc_active (GstElement * session, guint32 ssrc,
686     GstRtpBinSession * sess)
687 {
688   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE],
689       0, sess->id, ssrc);
690 }
691
692 /* must be called with the SESSION lock */
693 static GstRtpBinStream *
694 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
695 {
696   GSList *walk;
697
698   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
699     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
700
701     if (stream->ssrc == ssrc)
702       return stream;
703   }
704   return NULL;
705 }
706
707 static void
708 ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad,
709     GstRtpBinSession * session)
710 {
711   GstRtpBinStream *stream = NULL;
712   GstRtpBin *rtpbin;
713
714   rtpbin = session->bin;
715
716   GST_RTP_BIN_LOCK (rtpbin);
717
718   GST_RTP_SESSION_LOCK (session);
719   if ((stream = find_stream_by_ssrc (session, ssrc)))
720     session->streams = g_slist_remove (session->streams, stream);
721   GST_RTP_SESSION_UNLOCK (session);
722
723   if (stream)
724     free_stream (stream, rtpbin);
725
726   GST_RTP_BIN_UNLOCK (rtpbin);
727 }
728
729 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
730 static GstRtpBinSession *
731 create_session (GstRtpBin * rtpbin, gint id)
732 {
733   GstRtpBinSession *sess;
734   GstElement *session, *demux;
735   GstElement *storage = NULL;
736   GstState target;
737
738   if (!(session = gst_element_factory_make ("rtpsession", NULL)))
739     goto no_session;
740
741   if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
742     goto no_demux;
743
744   if (!(storage = gst_element_factory_make ("rtpstorage", NULL)))
745     goto no_storage;
746
747   /* need to sink the storage or otherwise signal handlers from bindings will
748    * take ownership of it and we don't own it anymore */
749   gst_object_ref_sink (storage);
750   g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_STORAGE], 0, storage,
751       id);
752
753   sess = g_new0 (GstRtpBinSession, 1);
754   g_mutex_init (&sess->lock);
755   sess->id = id;
756   sess->bin = rtpbin;
757   sess->session = session;
758   sess->demux = demux;
759   sess->storage = storage;
760
761   sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
762       (GDestroyNotify) gst_caps_unref);
763   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
764
765   /* configure SDES items */
766   GST_OBJECT_LOCK (rtpbin);
767   g_object_set (demux, "max-streams", rtpbin->max_streams, NULL);
768   g_object_set (session, "sdes", rtpbin->sdes, "rtp-profile",
769       rtpbin->rtp_profile, "rtcp-sync-send-time", rtpbin->rtcp_sync_send_time,
770       NULL);
771   if (rtpbin->use_pipeline_clock)
772     g_object_set (session, "use-pipeline-clock", rtpbin->use_pipeline_clock,
773         NULL);
774   else
775     g_object_set (session, "ntp-time-source", rtpbin->ntp_time_source, NULL);
776
777   g_object_set (session, "max-dropout-time", rtpbin->max_dropout_time,
778       "max-misorder-time", rtpbin->max_misorder_time, NULL);
779   GST_OBJECT_UNLOCK (rtpbin);
780
781   /* provide clock_rate to the session manager when needed */
782   g_signal_connect (session, "request-pt-map",
783       (GCallback) pt_map_requested, sess);
784
785   g_signal_connect (sess->session, "on-new-ssrc",
786       (GCallback) on_new_ssrc, sess);
787   g_signal_connect (sess->session, "on-ssrc-collision",
788       (GCallback) on_ssrc_collision, sess);
789   g_signal_connect (sess->session, "on-ssrc-validated",
790       (GCallback) on_ssrc_validated, sess);
791   g_signal_connect (sess->session, "on-ssrc-active",
792       (GCallback) on_ssrc_active, sess);
793   g_signal_connect (sess->session, "on-ssrc-sdes",
794       (GCallback) on_ssrc_sdes, sess);
795   g_signal_connect (sess->session, "on-bye-ssrc",
796       (GCallback) on_bye_ssrc, sess);
797   g_signal_connect (sess->session, "on-bye-timeout",
798       (GCallback) on_bye_timeout, sess);
799   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
800   g_signal_connect (sess->session, "on-sender-timeout",
801       (GCallback) on_sender_timeout, sess);
802   g_signal_connect (sess->session, "on-new-sender-ssrc",
803       (GCallback) on_new_sender_ssrc, sess);
804   g_signal_connect (sess->session, "on-sender-ssrc-active",
805       (GCallback) on_sender_ssrc_active, sess);
806
807   gst_bin_add (GST_BIN_CAST (rtpbin), session);
808   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
809   gst_bin_add (GST_BIN_CAST (rtpbin), storage);
810
811   /* unref the storage again, the bin has a reference now and
812    * we don't need it anymore */
813   gst_object_unref (storage);
814
815   GST_OBJECT_LOCK (rtpbin);
816   target = GST_STATE_TARGET (rtpbin);
817   GST_OBJECT_UNLOCK (rtpbin);
818
819   /* change state only to what's needed */
820   gst_element_set_state (demux, target);
821   gst_element_set_state (session, target);
822   gst_element_set_state (storage, target);
823
824   return sess;
825
826   /* ERRORS */
827 no_session:
828   {
829     g_warning ("rtpbin: could not create rtpsession element");
830     return NULL;
831   }
832 no_demux:
833   {
834     gst_object_unref (session);
835     g_warning ("rtpbin: could not create rtpssrcdemux element");
836     return NULL;
837   }
838 no_storage:
839   {
840     gst_object_unref (session);
841     gst_object_unref (demux);
842     g_warning ("rtpbin: could not create rtpstorage element");
843     return NULL;
844   }
845 }
846
847 static gboolean
848 bin_manage_element (GstRtpBin * bin, GstElement * element)
849 {
850   GstRtpBinPrivate *priv = bin->priv;
851
852   if (g_list_find (priv->elements, element)) {
853     GST_DEBUG_OBJECT (bin, "requested element %p already in bin", element);
854   } else {
855     GST_DEBUG_OBJECT (bin, "adding requested element %p", element);
856
857     if (g_object_is_floating (element))
858       element = gst_object_ref_sink (element);
859
860     if (!gst_bin_add (GST_BIN_CAST (bin), element))
861       goto add_failed;
862     if (!gst_element_sync_state_with_parent (element))
863       GST_WARNING_OBJECT (bin, "unable to sync element state with rtpbin");
864   }
865   /* we add the element multiple times, each we need an equal number of
866    * removes to really remove the element from the bin */
867   priv->elements = g_list_prepend (priv->elements, element);
868
869   return TRUE;
870
871   /* ERRORS */
872 add_failed:
873   {
874     GST_WARNING_OBJECT (bin, "unable to add element");
875     gst_object_unref (element);
876     return FALSE;
877   }
878 }
879
880 static void
881 remove_bin_element (GstElement * element, GstRtpBin * bin)
882 {
883   GstRtpBinPrivate *priv = bin->priv;
884   GList *find;
885
886   find = g_list_find (priv->elements, element);
887   if (find) {
888     priv->elements = g_list_delete_link (priv->elements, find);
889
890     if (!g_list_find (priv->elements, element)) {
891       gst_element_set_locked_state (element, TRUE);
892       gst_bin_remove (GST_BIN_CAST (bin), element);
893       gst_element_set_state (element, GST_STATE_NULL);
894     }
895
896     gst_object_unref (element);
897   }
898 }
899
900 /* called with RTP_BIN_LOCK */
901 static void
902 free_session (GstRtpBinSession * sess, GstRtpBin * bin)
903 {
904   GST_DEBUG_OBJECT (bin, "freeing session %p", sess);
905
906   gst_element_set_locked_state (sess->demux, TRUE);
907   gst_element_set_locked_state (sess->session, TRUE);
908   gst_element_set_locked_state (sess->storage, TRUE);
909
910   gst_element_set_state (sess->demux, GST_STATE_NULL);
911   gst_element_set_state (sess->session, GST_STATE_NULL);
912   gst_element_set_state (sess->storage, GST_STATE_NULL);
913
914   remove_recv_rtp (bin, sess);
915   remove_recv_rtcp (bin, sess);
916   remove_recv_fec (bin, sess);
917   remove_send_rtp (bin, sess);
918   remove_send_fec (bin, sess);
919   remove_rtcp (bin, sess);
920
921   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
922   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
923   gst_bin_remove (GST_BIN_CAST (bin), sess->storage);
924
925   g_slist_foreach (sess->elements, (GFunc) remove_bin_element, bin);
926   g_slist_free (sess->elements);
927   sess->elements = NULL;
928
929   g_slist_foreach (sess->streams, (GFunc) free_stream, bin);
930   g_slist_free (sess->streams);
931
932   g_mutex_clear (&sess->lock);
933   g_hash_table_destroy (sess->ptmap);
934
935   g_free (sess);
936 }
937
938 /* get the payload type caps for the specific payload @pt in @session */
939 static GstCaps *
940 get_pt_map (GstRtpBinSession * session, guint pt)
941 {
942   GstCaps *caps = NULL;
943   GstRtpBin *bin;
944   GValue ret = { 0 };
945   GValue args[3] = { {0}, {0}, {0} };
946
947   GST_DEBUG ("searching pt %u in cache", pt);
948
949   GST_RTP_SESSION_LOCK (session);
950
951   /* first look in the cache */
952   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
953   if (caps) {
954     gst_caps_ref (caps);
955     goto done;
956   }
957
958   bin = session->bin;
959
960   GST_DEBUG ("emitting signal for pt %u in session %u", pt, session->id);
961
962   /* not in cache, send signal to request caps */
963   g_value_init (&args[0], GST_TYPE_ELEMENT);
964   g_value_set_object (&args[0], bin);
965   g_value_init (&args[1], G_TYPE_UINT);
966   g_value_set_uint (&args[1], session->id);
967   g_value_init (&args[2], G_TYPE_UINT);
968   g_value_set_uint (&args[2], pt);
969
970   g_value_init (&ret, GST_TYPE_CAPS);
971   g_value_set_boxed (&ret, NULL);
972
973   GST_RTP_SESSION_UNLOCK (session);
974
975   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
976
977   GST_RTP_SESSION_LOCK (session);
978
979   g_value_unset (&args[0]);
980   g_value_unset (&args[1]);
981   g_value_unset (&args[2]);
982
983   /* look in the cache again because we let the lock go */
984   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
985   if (caps) {
986     gst_caps_ref (caps);
987     g_value_unset (&ret);
988     goto done;
989   }
990
991   caps = (GstCaps *) g_value_dup_boxed (&ret);
992   g_value_unset (&ret);
993   if (!caps)
994     goto no_caps;
995
996   GST_DEBUG ("caching pt %u as %" GST_PTR_FORMAT, pt, caps);
997
998   /* store in cache, take additional ref */
999   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
1000       gst_caps_ref (caps));
1001
1002 done:
1003   GST_RTP_SESSION_UNLOCK (session);
1004
1005   return caps;
1006
1007   /* ERRORS */
1008 no_caps:
1009   {
1010     GST_RTP_SESSION_UNLOCK (session);
1011     GST_DEBUG ("no pt map could be obtained");
1012     return NULL;
1013   }
1014 }
1015
1016 static gboolean
1017 return_true (gpointer key, gpointer value, gpointer user_data)
1018 {
1019   return TRUE;
1020 }
1021
1022 static void
1023 gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
1024 {
1025   GSList *clients, *streams;
1026
1027   GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");
1028
1029   GST_RTP_BIN_LOCK (rtpbin);
1030   for (clients = rtpbin->clients; clients; clients = g_slist_next (clients)) {
1031     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
1032
1033     /* reset sync on all streams for this client */
1034     for (streams = client->streams; streams; streams = g_slist_next (streams)) {
1035       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1036
1037       /* make use require a new SR packet for this stream before we attempt new
1038        * lip-sync */
1039       stream->have_sync = FALSE;
1040       stream->rt_delta = 0;
1041       stream->avg_ts_offset = 0;
1042       stream->is_initialized = FALSE;
1043       stream->rtp_delta = 0;
1044       stream->clock_base = -100 * GST_SECOND;
1045     }
1046   }
1047   GST_RTP_BIN_UNLOCK (rtpbin);
1048 }
1049
1050 static void
1051 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
1052 {
1053   GSList *sessions, *streams;
1054
1055   GST_RTP_BIN_LOCK (bin);
1056   GST_DEBUG_OBJECT (bin, "clearing pt map");
1057   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1058     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
1059
1060     GST_DEBUG_OBJECT (bin, "clearing session %p", session);
1061     g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
1062
1063     GST_RTP_SESSION_LOCK (session);
1064     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
1065
1066     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
1067       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1068
1069       GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
1070       if (g_signal_lookup ("clear-pt-map", G_OBJECT_TYPE (stream->buffer)) != 0)
1071         g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
1072       if (stream->demux)
1073         g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
1074     }
1075     GST_RTP_SESSION_UNLOCK (session);
1076   }
1077   GST_RTP_BIN_UNLOCK (bin);
1078
1079   /* reset sync too */
1080   gst_rtp_bin_reset_sync (bin);
1081 }
1082
1083 static GstElement *
1084 gst_rtp_bin_get_session (GstRtpBin * bin, guint session_id)
1085 {
1086   GstRtpBinSession *session;
1087   GstElement *ret = NULL;
1088
1089   GST_RTP_BIN_LOCK (bin);
1090   GST_DEBUG_OBJECT (bin, "retrieving GstRtpSession, index: %u", session_id);
1091   session = find_session_by_id (bin, (gint) session_id);
1092   if (session) {
1093     ret = gst_object_ref (session->session);
1094   }
1095   GST_RTP_BIN_UNLOCK (bin);
1096
1097   return ret;
1098 }
1099
1100 static RTPSession *
1101 gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
1102 {
1103   RTPSession *internal_session = NULL;
1104   GstRtpBinSession *session;
1105
1106   GST_RTP_BIN_LOCK (bin);
1107   GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %u",
1108       session_id);
1109   session = find_session_by_id (bin, (gint) session_id);
1110   if (session) {
1111     g_object_get (session->session, "internal-session", &internal_session,
1112         NULL);
1113   }
1114   GST_RTP_BIN_UNLOCK (bin);
1115
1116   return internal_session;
1117 }
1118
1119 static GstElement *
1120 gst_rtp_bin_get_storage (GstRtpBin * bin, guint session_id)
1121 {
1122   GstRtpBinSession *session;
1123   GstElement *res = NULL;
1124
1125   GST_RTP_BIN_LOCK (bin);
1126   GST_DEBUG_OBJECT (bin, "retrieving internal storage object, index: %u",
1127       session_id);
1128   session = find_session_by_id (bin, (gint) session_id);
1129   if (session && session->storage) {
1130     res = gst_object_ref (session->storage);
1131   }
1132   GST_RTP_BIN_UNLOCK (bin);
1133
1134   return res;
1135 }
1136
1137 static GObject *
1138 gst_rtp_bin_get_internal_storage (GstRtpBin * bin, guint session_id)
1139 {
1140   GObject *internal_storage = NULL;
1141   GstRtpBinSession *session;
1142
1143   GST_RTP_BIN_LOCK (bin);
1144   GST_DEBUG_OBJECT (bin, "retrieving internal storage object, index: %u",
1145       session_id);
1146   session = find_session_by_id (bin, (gint) session_id);
1147   if (session && session->storage) {
1148     g_object_get (session->storage, "internal-storage", &internal_storage,
1149         NULL);
1150   }
1151   GST_RTP_BIN_UNLOCK (bin);
1152
1153   return internal_storage;
1154 }
1155
1156 static void
1157 gst_rtp_bin_clear_ssrc (GstRtpBin * bin, guint session_id, guint32 ssrc)
1158 {
1159   GstRtpBinSession *session;
1160   GstElement *demux = NULL;
1161
1162   GST_RTP_BIN_LOCK (bin);
1163   GST_DEBUG_OBJECT (bin, "clearing ssrc %u for session %u", ssrc, session_id);
1164   session = find_session_by_id (bin, (gint) session_id);
1165   if (session)
1166     demux = gst_object_ref (session->demux);
1167   GST_RTP_BIN_UNLOCK (bin);
1168
1169   if (demux) {
1170     g_signal_emit_by_name (demux, "clear-ssrc", ssrc, NULL);
1171     gst_object_unref (demux);
1172   }
1173 }
1174
1175 static GstElement *
1176 gst_rtp_bin_request_encoder (GstRtpBin * bin, guint session_id)
1177 {
1178   GST_DEBUG_OBJECT (bin, "return NULL encoder");
1179   return NULL;
1180 }
1181
1182 static GstElement *
1183 gst_rtp_bin_request_decoder (GstRtpBin * bin, guint session_id)
1184 {
1185   GST_DEBUG_OBJECT (bin, "return NULL decoder");
1186   return NULL;
1187 }
1188
1189 static GstElement *
1190 gst_rtp_bin_request_jitterbuffer (GstRtpBin * bin, guint session_id)
1191 {
1192   return gst_element_factory_make ("rtpjitterbuffer", NULL);
1193 }
1194
1195 static void
1196 gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
1197     const gchar * name, const GValue * value)
1198 {
1199   GSList *sessions, *streams;
1200
1201   GST_RTP_BIN_LOCK (bin);
1202   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1203     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
1204
1205     GST_RTP_SESSION_LOCK (session);
1206     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
1207       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1208       GObjectClass *jb_class;
1209
1210       jb_class = G_OBJECT_GET_CLASS (G_OBJECT (stream->buffer));
1211       if (g_object_class_find_property (jb_class, name))
1212         g_object_set_property (G_OBJECT (stream->buffer), name, value);
1213       else
1214         GST_WARNING_OBJECT (bin,
1215             "Stream jitterbuffer does not expose property %s", name);
1216     }
1217     GST_RTP_SESSION_UNLOCK (session);
1218   }
1219   GST_RTP_BIN_UNLOCK (bin);
1220 }
1221
1222 static void
1223 gst_rtp_bin_propagate_property_to_session (GstRtpBin * bin,
1224     const gchar * name, const GValue * value)
1225 {
1226   GSList *sessions;
1227
1228   GST_RTP_BIN_LOCK (bin);
1229   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1230     GstRtpBinSession *sess = (GstRtpBinSession *) sessions->data;
1231
1232     g_object_set_property (G_OBJECT (sess->session), name, value);
1233   }
1234   GST_RTP_BIN_UNLOCK (bin);
1235 }
1236
1237 /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
1238 static GstRtpBinClient *
1239 get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
1240 {
1241   GstRtpBinClient *result = NULL;
1242   GSList *walk;
1243
1244   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
1245     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
1246
1247     if (len != client->cname_len)
1248       continue;
1249
1250     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
1251       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
1252           client->cname);
1253       result = client;
1254       break;
1255     }
1256   }
1257
1258   /* nothing found, create one */
1259   if (result == NULL) {
1260     result = g_new0 (GstRtpBinClient, 1);
1261     result->cname = g_strndup ((gchar *) data, len);
1262     result->cname_len = len;
1263     bin->clients = g_slist_prepend (bin->clients, result);
1264     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
1265         result->cname);
1266   }
1267   return result;
1268 }
1269
1270 static void
1271 free_client (GstRtpBinClient * client, GstRtpBin * bin)
1272 {
1273   GST_DEBUG_OBJECT (bin, "freeing client %p", client);
1274   g_slist_free (client->streams);
1275   g_free (client->cname);
1276   g_free (client);
1277 }
1278
1279 static void
1280 get_current_times (GstRtpBin * bin, GstClockTime * running_time,
1281     guint64 * ntpnstime)
1282 {
1283   guint64 ntpns = -1;
1284   GstClock *clock;
1285   GstClockTime base_time, rt, clock_time;
1286
1287   GST_OBJECT_LOCK (bin);
1288   if ((clock = GST_ELEMENT_CLOCK (bin))) {
1289     base_time = GST_ELEMENT_CAST (bin)->base_time;
1290     gst_object_ref (clock);
1291     GST_OBJECT_UNLOCK (bin);
1292
1293     /* get current clock time and convert to running time */
1294     clock_time = gst_clock_get_time (clock);
1295     rt = clock_time - base_time;
1296
1297     if (bin->use_pipeline_clock) {
1298       ntpns = rt;
1299       /* add constant to convert from 1970 based time to 1900 based time */
1300       ntpns += (2208988800LL * GST_SECOND);
1301     } else {
1302       switch (bin->ntp_time_source) {
1303         case GST_RTP_NTP_TIME_SOURCE_NTP:
1304         case GST_RTP_NTP_TIME_SOURCE_UNIX:{
1305           /* get current NTP time */
1306           ntpns = g_get_real_time () * GST_USECOND;
1307
1308           /* add constant to convert from 1970 based time to 1900 based time */
1309           if (bin->ntp_time_source == GST_RTP_NTP_TIME_SOURCE_NTP)
1310             ntpns += (2208988800LL * GST_SECOND);
1311           break;
1312         }
1313         case GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME:
1314           ntpns = rt;
1315           break;
1316         case GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME:
1317           ntpns = clock_time;
1318           break;
1319         default:
1320           ntpns = -1;           /* Fix uninited compiler warning */
1321           g_assert_not_reached ();
1322           break;
1323       }
1324     }
1325
1326     gst_object_unref (clock);
1327   } else {
1328     GST_OBJECT_UNLOCK (bin);
1329     rt = -1;
1330     ntpns = -1;
1331   }
1332   if (running_time)
1333     *running_time = rt;
1334   if (ntpnstime)
1335     *ntpnstime = ntpns;
1336 }
1337
1338 static void
1339 stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
1340     gint64 ts_offset, gint64 max_ts_offset, guint64 min_ts_offset,
1341     gboolean allow_positive_ts_offset)
1342 {
1343   gint64 prev_ts_offset;
1344   GObjectClass *jb_class;
1345
1346   jb_class = G_OBJECT_GET_CLASS (G_OBJECT (stream->buffer));
1347
1348   if (!g_object_class_find_property (jb_class, "ts-offset")) {
1349     GST_LOG_OBJECT (bin,
1350         "stream's jitterbuffer does not expose ts-offset property");
1351     return;
1352   }
1353
1354   if (bin->ts_offset_smoothing_factor > 0) {
1355     if (!stream->is_initialized) {
1356       stream->avg_ts_offset = ts_offset;
1357       stream->is_initialized = TRUE;
1358     } else {
1359       /* RMA algorithm using smoothing factor is following, but split into
1360        * parts to check for overflows:
1361        * stream->avg_ts_offset =
1362        *   ((bin->ts_offset_smoothing_factor - 1) * stream->avg_ts_offset
1363        *    + ts_offset) / bin->ts_offset_smoothing_factor
1364        */
1365       guint64 max_possible_smoothing_factor = G_MAXUINT64;
1366       gint64 cur_avg_product =
1367           (bin->ts_offset_smoothing_factor - 1) * stream->avg_ts_offset;
1368       if (stream->avg_ts_offset != 0)
1369         max_possible_smoothing_factor =
1370             G_MAXINT64 / ABS (stream->avg_ts_offset);
1371
1372       if ((max_possible_smoothing_factor < bin->ts_offset_smoothing_factor) ||
1373           (cur_avg_product > 0 && G_MAXINT64 - cur_avg_product < ts_offset) ||
1374           (cur_avg_product < 0 && G_MININT64 - cur_avg_product > ts_offset)) {
1375         GST_WARNING_OBJECT (bin,
1376             "ts-offset-smoothing-factor calculation overflow, fallback to using ts-offset directly");
1377         stream->avg_ts_offset = ts_offset;
1378       } else {
1379         stream->avg_ts_offset =
1380             (cur_avg_product + ts_offset) / bin->ts_offset_smoothing_factor;
1381       }
1382     }
1383   } else {
1384     stream->avg_ts_offset = ts_offset;
1385   }
1386
1387   g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
1388
1389   /* delta changed, see how much */
1390   if (prev_ts_offset != stream->avg_ts_offset) {
1391     gint64 diff;
1392
1393     diff = prev_ts_offset - stream->avg_ts_offset;
1394
1395     GST_DEBUG_OBJECT (bin,
1396         "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
1397         ", diff: %" G_GINT64_FORMAT, stream->avg_ts_offset, prev_ts_offset,
1398         diff);
1399
1400     /* ignore minor offsets */
1401     if (ABS (diff) < min_ts_offset) {
1402       GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
1403       return;
1404     }
1405
1406     /* sanity check offset */
1407     if (max_ts_offset > 0) {
1408       if (stream->avg_ts_offset > 0 && !allow_positive_ts_offset) {
1409         GST_DEBUG_OBJECT (bin,
1410             "offset is positive (clocks are out of sync), ignoring");
1411         return;
1412       }
1413       if (ABS (stream->avg_ts_offset) > max_ts_offset) {
1414         GST_DEBUG_OBJECT (bin, "offset too large, ignoring");
1415         return;
1416       }
1417     }
1418
1419     g_object_set (stream->buffer, "ts-offset", stream->avg_ts_offset, NULL);
1420   }
1421   GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
1422       stream->ssrc, stream->avg_ts_offset);
1423 }
1424
1425 static void
1426 gst_rtp_bin_send_sync_event (GstRtpBinStream * stream)
1427 {
1428   if (stream->bin->send_sync_event) {
1429     GstEvent *event;
1430     GstPad *srcpad;
1431
1432     GST_DEBUG_OBJECT (stream->bin,
1433         "sending GstRTCPSRReceived event downstream");
1434
1435     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1436         gst_structure_new_empty ("GstRTCPSRReceived"));
1437
1438     srcpad = gst_element_get_static_pad (stream->buffer, "src");
1439     gst_pad_push_event (srcpad, event);
1440     gst_object_unref (srcpad);
1441   }
1442 }
1443
1444 /* associate a stream to the given CNAME. This will make sure all streams for
1445  * that CNAME are synchronized together.
1446  * Must be called with GST_RTP_BIN_LOCK */
1447 static void
1448 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
1449     guint8 * data, guint64 ntptime, guint64 last_extrtptime,
1450     guint64 base_rtptime, guint64 base_time, guint clock_rate,
1451     gint64 rtp_clock_base)
1452 {
1453   GstRtpBinClient *client;
1454   gboolean created;
1455   GSList *walk;
1456   GstClockTime running_time, running_time_rtp;
1457   guint64 ntpnstime;
1458
1459   /* first find or create the CNAME */
1460   client = get_client (bin, len, data, &created);
1461
1462   /* find stream in the client */
1463   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1464     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1465
1466     if (ostream == stream)
1467       break;
1468   }
1469   /* not found, add it to the list */
1470   if (walk == NULL) {
1471     GST_DEBUG_OBJECT (bin,
1472         "new association of SSRC %08x with client %p with CNAME %s",
1473         stream->ssrc, client, client->cname);
1474     client->streams = g_slist_prepend (client->streams, stream);
1475     client->nstreams++;
1476   } else {
1477     GST_DEBUG_OBJECT (bin,
1478         "found association of SSRC %08x with client %p with CNAME %s",
1479         stream->ssrc, client, client->cname);
1480   }
1481
1482   if (!GST_CLOCK_TIME_IS_VALID (last_extrtptime)) {
1483     GST_DEBUG_OBJECT (bin, "invalidated sync data");
1484     if (bin->rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1485       /* we don't need that data, so carry on,
1486        * but make some values look saner */
1487       last_extrtptime = base_rtptime;
1488     } else {
1489       /* nothing we can do with this data in this case */
1490       GST_DEBUG_OBJECT (bin, "bailing out");
1491       return;
1492     }
1493   }
1494
1495   /* Take the extended rtptime we found in the SR packet and map it to the
1496    * local rtptime. The local rtp time is used to construct timestamps on the
1497    * buffers so we will calculate what running_time corresponds to the RTP
1498    * timestamp in the SR packet. */
1499   running_time_rtp = last_extrtptime - base_rtptime;
1500
1501   GST_DEBUG_OBJECT (bin,
1502       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
1503       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, "
1504       "clock-base %" G_GINT64_FORMAT, base_rtptime,
1505       last_extrtptime, running_time_rtp, clock_rate, rtp_clock_base);
1506
1507   /* calculate local RTP time in gstreamer timestamp, we essentially perform the
1508    * same conversion that a jitterbuffer would use to convert an rtp timestamp
1509    * into a corresponding gstreamer timestamp. Note that the base_time also
1510    * contains the drift between sender and receiver. */
1511   running_time =
1512       gst_util_uint64_scale_int (running_time_rtp, GST_SECOND, clock_rate);
1513   running_time += base_time;
1514
1515   /* convert ntptime to nanoseconds */
1516   ntpnstime = gst_util_uint64_scale (ntptime, GST_SECOND,
1517       (G_GINT64_CONSTANT (1) << 32));
1518
1519   stream->have_sync = TRUE;
1520
1521   GST_DEBUG_OBJECT (bin,
1522       "SR RTP running time %" G_GUINT64_FORMAT ", SR NTP %" G_GUINT64_FORMAT,
1523       running_time, ntpnstime);
1524
1525   /* recalc inter stream playout offset, but only if there is more than one
1526    * stream or we're doing NTP sync. */
1527   if (bin->ntp_sync) {
1528     gint64 ntpdiff, rtdiff;
1529     guint64 local_ntpnstime;
1530     GstClockTime local_running_time;
1531
1532     /* For NTP sync we need to first get a snapshot of running_time and NTP
1533      * time. We know at what running_time we play a certain RTP time, we also
1534      * calculated when we would play the RTP time in the SR packet. Now we need
1535      * to know how the running_time and the NTP time relate to each other. */
1536     get_current_times (bin, &local_running_time, &local_ntpnstime);
1537
1538     /* see how far away the NTP time is. This is the difference between the
1539      * current NTP time and the NTP time in the last SR packet. */
1540     ntpdiff = local_ntpnstime - ntpnstime;
1541     /* see how far away the running_time is. This is the difference between the
1542      * current running_time and the running_time of the RTP timestamp in the
1543      * last SR packet. */
1544     rtdiff = local_running_time - running_time;
1545
1546     GST_DEBUG_OBJECT (bin,
1547         "local NTP time %" G_GUINT64_FORMAT ", SR NTP time %" G_GUINT64_FORMAT,
1548         local_ntpnstime, ntpnstime);
1549     GST_DEBUG_OBJECT (bin,
1550         "local running time %" G_GUINT64_FORMAT ", SR RTP running time %"
1551         G_GUINT64_FORMAT, local_running_time, running_time);
1552     GST_DEBUG_OBJECT (bin,
1553         "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
1554         rtdiff);
1555
1556     /* combine to get the final diff to apply to the running_time */
1557     stream->rt_delta = rtdiff - ntpdiff;
1558
1559     stream_set_ts_offset (bin, stream, stream->rt_delta, bin->max_ts_offset,
1560         bin->min_ts_offset, FALSE);
1561   } else {
1562     gint64 min, rtp_min, clock_base = stream->clock_base;
1563     gboolean all_sync, use_rtp;
1564     gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
1565
1566     /* calculate delta between server and receiver. ntpnstime is created by
1567      * converting the ntptime in the last SR packet to a gstreamer timestamp. This
1568      * delta expresses the difference to our timeline and the server timeline. The
1569      * difference in itself doesn't mean much but we can combine the delta of
1570      * multiple streams to create a stream specific offset. */
1571     stream->rt_delta = ntpnstime - running_time;
1572
1573     /* calculate the min of all deltas, ignoring streams that did not yet have a
1574      * valid rt_delta because we did not yet receive an SR packet for those
1575      * streams.
1576      * We calculate the minimum because we would like to only apply positive
1577      * offsets to streams, delaying their playback instead of trying to speed up
1578      * other streams (which might be impossible when we have to create negative
1579      * latencies).
1580      * The stream that has the smallest diff is selected as the reference stream,
1581      * all other streams will have a positive offset to this difference. */
1582
1583     /* some alternative setting allow ignoring RTCP as much as possible,
1584      * for servers generating bogus ntp timeline */
1585     min = rtp_min = G_MAXINT64;
1586     use_rtp = FALSE;
1587     if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1588       guint64 ext_base;
1589
1590       use_rtp = TRUE;
1591       /* signed version for convenience */
1592       clock_base = base_rtptime;
1593       /* deal with possible wrap-around */
1594       ext_base = base_rtptime;
1595       rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
1596       /* sanity check; base rtp and provided clock_base should be close */
1597       if (rtp_clock_base >= clock_base) {
1598         if (rtp_clock_base - clock_base < 10 * clock_rate) {
1599           rtp_clock_base = base_time +
1600               gst_util_uint64_scale_int (rtp_clock_base - clock_base,
1601               GST_SECOND, clock_rate);
1602         } else {
1603           use_rtp = FALSE;
1604         }
1605       } else {
1606         if (clock_base - rtp_clock_base < 10 * clock_rate) {
1607           rtp_clock_base = base_time -
1608               gst_util_uint64_scale_int (clock_base - rtp_clock_base,
1609               GST_SECOND, clock_rate);
1610         } else {
1611           use_rtp = FALSE;
1612         }
1613       }
1614       /* warn and bail for clarity out if no sane values */
1615       if (!use_rtp) {
1616         GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
1617         return;
1618       }
1619       /* store to track changes */
1620       clock_base = rtp_clock_base;
1621       /* generate a fake as before,
1622        * now equating rtptime obtained from RTP-Info,
1623        * where the large time represent the otherwise irrelevant npt/ntp time */
1624       stream->rtp_delta = (GST_SECOND << 28) - rtp_clock_base;
1625     } else {
1626       clock_base = rtp_clock_base;
1627     }
1628
1629     all_sync = TRUE;
1630     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1631       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1632
1633       if (!ostream->have_sync) {
1634         all_sync = FALSE;
1635         continue;
1636       }
1637
1638       /* change in current stream's base from previously init'ed value
1639        * leads to reset of all stream's base */
1640       if (stream != ostream && stream->clock_base >= 0 &&
1641           (stream->clock_base != clock_base)) {
1642         GST_DEBUG_OBJECT (bin, "reset upon clock base change");
1643         ostream->clock_base = -100 * GST_SECOND;
1644         ostream->rtp_delta = 0;
1645       }
1646
1647       if (ostream->rt_delta < min)
1648         min = ostream->rt_delta;
1649       if (ostream->rtp_delta < rtp_min)
1650         rtp_min = ostream->rtp_delta;
1651     }
1652
1653     /* arrange to re-sync for each stream upon significant change,
1654      * e.g. post-seek */
1655     all_sync = all_sync && (stream->clock_base == clock_base);
1656     stream->clock_base = clock_base;
1657
1658     /* may need init performed above later on, but nothing more to do now */
1659     if (client->nstreams <= 1)
1660       return;
1661
1662     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
1663         " all sync %d", client, min, all_sync);
1664     GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
1665
1666     switch (rtcp_sync) {
1667       case GST_RTP_BIN_RTCP_SYNC_RTP:
1668         if (!use_rtp)
1669           break;
1670         GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
1671             "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
1672         /* fall-through */
1673       case GST_RTP_BIN_RTCP_SYNC_INITIAL:
1674         /* if all have been synced already, do not bother further */
1675         if (all_sync) {
1676           GST_DEBUG_OBJECT (bin, "all streams already synced; done");
1677           return;
1678         }
1679         break;
1680       default:
1681         break;
1682     }
1683
1684     /* bail out if we adjusted recently enough */
1685     if (all_sync && (ntpnstime - bin->priv->last_ntpnstime) <
1686         bin->rtcp_sync_interval * GST_MSECOND) {
1687       GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
1688           "previous sender info too recent "
1689           "(previous NTP %" G_GUINT64_FORMAT ")", bin->priv->last_ntpnstime);
1690       return;
1691     }
1692     bin->priv->last_ntpnstime = ntpnstime;
1693
1694     /* calculate offsets for each stream */
1695     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1696       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1697       gint64 ts_offset;
1698
1699       /* ignore streams for which we didn't receive an SR packet yet, we
1700        * can't synchronize them yet. We can however sync other streams just
1701        * fine. */
1702       if (!ostream->have_sync)
1703         continue;
1704
1705       /* calculate offset to our reference stream, this should always give a
1706        * positive number. */
1707       if (use_rtp)
1708         ts_offset = ostream->rtp_delta - rtp_min;
1709       else
1710         ts_offset = ostream->rt_delta - min;
1711
1712       stream_set_ts_offset (bin, ostream, ts_offset, bin->max_ts_offset,
1713           bin->min_ts_offset, TRUE);
1714     }
1715   }
1716   gst_rtp_bin_send_sync_event (stream);
1717
1718   return;
1719 }
1720
1721 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
1722   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
1723           (b) = gst_rtcp_packet_move_to_next ((packet)))
1724
1725 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
1726   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
1727           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
1728
1729 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
1730   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
1731           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
1732
1733 static void
1734 gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
1735     GstRtpBinStream * stream)
1736 {
1737   GstRtpBin *bin;
1738   GstRTCPPacket packet;
1739   guint32 ssrc;
1740   guint64 ntptime;
1741   gboolean have_sr, have_sdes;
1742   gboolean more;
1743   guint64 base_rtptime;
1744   guint64 base_time;
1745   guint clock_rate;
1746   guint64 clock_base;
1747   guint64 extrtptime;
1748   GstBuffer *buffer;
1749   GstRTCPBuffer rtcp = { NULL, };
1750
1751   bin = stream->bin;
1752
1753   GST_DEBUG_OBJECT (bin, "sync handler called");
1754
1755   /* get the last relation between the rtp timestamps and the gstreamer
1756    * timestamps. We get this info directly from the jitterbuffer which
1757    * constructs gstreamer timestamps from rtp timestamps and so it know exactly
1758    * what the current situation is. */
1759   base_rtptime =
1760       g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
1761   base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
1762   clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
1763   clock_base = g_value_get_uint64 (gst_structure_get_value (s, "clock-base"));
1764   extrtptime =
1765       g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
1766   buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
1767
1768   have_sr = FALSE;
1769   have_sdes = FALSE;
1770
1771   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
1772
1773   GST_RTCP_BUFFER_FOR_PACKETS (more, &rtcp, &packet) {
1774     /* first packet must be SR or RR or else the validate would have failed */
1775     switch (gst_rtcp_packet_get_type (&packet)) {
1776       case GST_RTCP_TYPE_SR:
1777         /* only parse first. There is only supposed to be one SR in the packet
1778          * but we will deal with malformed packets gracefully */
1779         if (have_sr)
1780           break;
1781         /* get NTP and RTP times */
1782         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL,
1783             NULL, NULL);
1784
1785         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
1786         /* ignore SR that is not ours */
1787         if (ssrc != stream->ssrc)
1788           continue;
1789
1790         have_sr = TRUE;
1791         break;
1792       case GST_RTCP_TYPE_SDES:
1793       {
1794         gboolean more_items, more_entries;
1795
1796         /* only deal with first SDES, there is only supposed to be one SDES in
1797          * the RTCP packet but we deal with bad packets gracefully. Also bail
1798          * out if we have not seen an SR item yet. */
1799         if (have_sdes || !have_sr)
1800           break;
1801
1802         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
1803           /* skip items that are not about the SSRC of the sender */
1804           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
1805             continue;
1806
1807           /* find the CNAME entry */
1808           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
1809             GstRTCPSDESType type;
1810             guint8 len;
1811             guint8 *data;
1812
1813             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
1814
1815             if (type == GST_RTCP_SDES_CNAME) {
1816               GST_RTP_BIN_LOCK (bin);
1817               /* associate the stream to CNAME */
1818               gst_rtp_bin_associate (bin, stream, len, data,
1819                   ntptime, extrtptime, base_rtptime, base_time, clock_rate,
1820                   clock_base);
1821               GST_RTP_BIN_UNLOCK (bin);
1822             }
1823           }
1824         }
1825         have_sdes = TRUE;
1826         break;
1827       }
1828       default:
1829         /* we can ignore these packets */
1830         break;
1831     }
1832   }
1833   gst_rtcp_buffer_unmap (&rtcp);
1834 }
1835
1836 /* create a new stream with @ssrc in @session. Must be called with
1837  * RTP_SESSION_LOCK. */
1838 static GstRtpBinStream *
1839 create_stream (GstRtpBinSession * session, guint32 ssrc)
1840 {
1841   GstElement *buffer, *demux = NULL;
1842   GstRtpBinStream *stream;
1843   GstRtpBin *rtpbin;
1844   GstState target;
1845   GObjectClass *jb_class;
1846
1847   rtpbin = session->bin;
1848
1849   if (g_slist_length (session->streams) >= rtpbin->max_streams)
1850     goto max_streams;
1851
1852   if (!(buffer =
1853           session_request_element (session, SIGNAL_REQUEST_JITTERBUFFER)))
1854     goto no_jitterbuffer;
1855
1856   if (!rtpbin->ignore_pt) {
1857     if (!(demux = gst_element_factory_make ("rtpptdemux", NULL)))
1858       goto no_demux;
1859   }
1860
1861   stream = g_new0 (GstRtpBinStream, 1);
1862   stream->ssrc = ssrc;
1863   stream->bin = rtpbin;
1864   stream->session = session;
1865   stream->buffer = gst_object_ref (buffer);
1866   stream->demux = demux;
1867
1868   stream->have_sync = FALSE;
1869   stream->rt_delta = 0;
1870   stream->avg_ts_offset = 0;
1871   stream->is_initialized = FALSE;
1872   stream->rtp_delta = 0;
1873   stream->percent = 100;
1874   stream->clock_base = -100 * GST_SECOND;
1875   session->streams = g_slist_prepend (session->streams, stream);
1876
1877   jb_class = G_OBJECT_GET_CLASS (G_OBJECT (buffer));
1878
1879   if (g_signal_lookup ("request-pt-map", G_OBJECT_TYPE (buffer)) != 0) {
1880     /* provide clock_rate to the jitterbuffer when needed */
1881     stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
1882         (GCallback) pt_map_requested, session);
1883   }
1884   if (g_signal_lookup ("on-npt-stop", G_OBJECT_TYPE (buffer)) != 0) {
1885     stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
1886         (GCallback) on_npt_stop, stream);
1887   }
1888
1889   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.session", session);
1890   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.stream", stream);
1891
1892   /* configure latency and packet lost */
1893   g_object_set (buffer, "latency", rtpbin->latency_ms, NULL);
1894
1895   if (g_object_class_find_property (jb_class, "drop-on-latency"))
1896     g_object_set (buffer, "drop-on-latency", rtpbin->drop_on_latency, NULL);
1897   if (g_object_class_find_property (jb_class, "do-lost"))
1898     g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
1899   if (g_object_class_find_property (jb_class, "mode"))
1900     g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL);
1901   if (g_object_class_find_property (jb_class, "do-retransmission"))
1902     g_object_set (buffer, "do-retransmission", rtpbin->do_retransmission, NULL);
1903   if (g_object_class_find_property (jb_class, "max-rtcp-rtp-time-diff"))
1904     g_object_set (buffer, "max-rtcp-rtp-time-diff",
1905         rtpbin->max_rtcp_rtp_time_diff, NULL);
1906   if (g_object_class_find_property (jb_class, "max-dropout-time"))
1907     g_object_set (buffer, "max-dropout-time", rtpbin->max_dropout_time, NULL);
1908   if (g_object_class_find_property (jb_class, "max-misorder-time"))
1909     g_object_set (buffer, "max-misorder-time", rtpbin->max_misorder_time, NULL);
1910   if (g_object_class_find_property (jb_class, "rfc7273-sync"))
1911     g_object_set (buffer, "rfc7273-sync", rtpbin->rfc7273_sync, NULL);
1912   if (g_object_class_find_property (jb_class, "add-reference-timestamp-meta"))
1913     g_object_set (buffer, "add-reference-timestamp-meta",
1914         rtpbin->add_reference_timestamp_meta, NULL);
1915   if (g_object_class_find_property (jb_class, "max-ts-offset-adjustment"))
1916     g_object_set (buffer, "max-ts-offset-adjustment",
1917         rtpbin->max_ts_offset_adjustment, NULL);
1918
1919   g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER], 0,
1920       buffer, session->id, ssrc);
1921
1922   if (!rtpbin->ignore_pt)
1923     gst_bin_add (GST_BIN_CAST (rtpbin), demux);
1924
1925   /* link stuff */
1926   if (demux)
1927     gst_element_link_pads_full (buffer, "src", demux, "sink",
1928         GST_PAD_LINK_CHECK_NOTHING);
1929
1930   if (rtpbin->buffering) {
1931     guint64 last_out;
1932
1933     if (g_signal_lookup ("set-active", G_OBJECT_TYPE (buffer)) != 0) {
1934       GST_INFO_OBJECT (rtpbin,
1935           "bin is buffering, set jitterbuffer as not active");
1936       g_signal_emit_by_name (buffer, "set-active", FALSE, (gint64) 0,
1937           &last_out);
1938     }
1939   }
1940
1941
1942   GST_OBJECT_LOCK (rtpbin);
1943   target = GST_STATE_TARGET (rtpbin);
1944   GST_OBJECT_UNLOCK (rtpbin);
1945
1946   /* from sink to source */
1947   if (demux)
1948     gst_element_set_state (demux, target);
1949
1950   gst_element_set_state (buffer, target);
1951
1952   return stream;
1953
1954   /* ERRORS */
1955 max_streams:
1956   {
1957     GST_WARNING_OBJECT (rtpbin, "stream exceeds maximum (%d)",
1958         rtpbin->max_streams);
1959     return NULL;
1960   }
1961 no_jitterbuffer:
1962   {
1963     g_warning ("rtpbin: could not create rtpjitterbuffer element");
1964     return NULL;
1965   }
1966 no_demux:
1967   {
1968     gst_object_unref (buffer);
1969     g_warning ("rtpbin: could not create rtpptdemux element");
1970     return NULL;
1971   }
1972 }
1973
1974 /* called with RTP_BIN_LOCK */
1975 static void
1976 free_stream (GstRtpBinStream * stream, GstRtpBin * bin)
1977 {
1978   GstRtpBinSession *sess = stream->session;
1979   GSList *clients, *next_client;
1980
1981   GST_DEBUG_OBJECT (bin, "freeing stream %p", stream);
1982
1983   gst_element_set_locked_state (stream->buffer, TRUE);
1984   if (stream->demux)
1985     gst_element_set_locked_state (stream->demux, TRUE);
1986
1987   gst_element_set_state (stream->buffer, GST_STATE_NULL);
1988   if (stream->demux)
1989     gst_element_set_state (stream->demux, GST_STATE_NULL);
1990
1991   if (stream->demux) {
1992     g_signal_handler_disconnect (stream->demux, stream->demux_newpad_sig);
1993     g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig);
1994     g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig);
1995     g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
1996   }
1997
1998   if (stream->buffer_handlesync_sig)
1999     g_signal_handler_disconnect (stream->buffer, stream->buffer_handlesync_sig);
2000   if (stream->buffer_ptreq_sig)
2001     g_signal_handler_disconnect (stream->buffer, stream->buffer_ptreq_sig);
2002   if (stream->buffer_ntpstop_sig)
2003     g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig);
2004
2005   sess->elements = g_slist_remove (sess->elements, stream->buffer);
2006   remove_bin_element (stream->buffer, bin);
2007   gst_object_unref (stream->buffer);
2008
2009   if (stream->demux)
2010     gst_bin_remove (GST_BIN_CAST (bin), stream->demux);
2011
2012   for (clients = bin->clients; clients; clients = next_client) {
2013     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
2014     GSList *streams, *next_stream;
2015
2016     next_client = g_slist_next (clients);
2017
2018     for (streams = client->streams; streams; streams = next_stream) {
2019       GstRtpBinStream *ostream = (GstRtpBinStream *) streams->data;
2020
2021       next_stream = g_slist_next (streams);
2022
2023       if (ostream == stream) {
2024         client->streams = g_slist_delete_link (client->streams, streams);
2025         /* If this was the last stream belonging to this client,
2026          * clean up the client. */
2027         if (--client->nstreams == 0) {
2028           bin->clients = g_slist_delete_link (bin->clients, clients);
2029           free_client (client, bin);
2030           break;
2031         }
2032       }
2033     }
2034   }
2035   g_free (stream);
2036 }
2037
2038 /* GObject vmethods */
2039 static void gst_rtp_bin_dispose (GObject * object);
2040 static void gst_rtp_bin_finalize (GObject * object);
2041 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
2042     const GValue * value, GParamSpec * pspec);
2043 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
2044     GValue * value, GParamSpec * pspec);
2045
2046 /* GstElement vmethods */
2047 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
2048     GstStateChange transition);
2049 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
2050     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
2051 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
2052 static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message);
2053
2054 #define gst_rtp_bin_parent_class parent_class
2055 G_DEFINE_TYPE_WITH_PRIVATE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN);
2056 GST_ELEMENT_REGISTER_DEFINE (rtpbin, "rtpbin", GST_RANK_NONE, GST_TYPE_RTP_BIN);
2057
2058 static gboolean
2059 _gst_element_accumulator (GSignalInvocationHint * ihint,
2060     GValue * return_accu, const GValue * handler_return, gpointer dummy)
2061 {
2062   GstElement *element;
2063
2064   element = g_value_get_object (handler_return);
2065   GST_DEBUG ("got element %" GST_PTR_FORMAT, element);
2066
2067   g_value_set_object (return_accu, element);
2068
2069   /* stop emission if we have an element */
2070   return (element == NULL);
2071 }
2072
2073 static gboolean
2074 _gst_caps_accumulator (GSignalInvocationHint * ihint,
2075     GValue * return_accu, const GValue * handler_return, gpointer dummy)
2076 {
2077   GstCaps *caps;
2078
2079   caps = g_value_get_boxed (handler_return);
2080   GST_DEBUG ("got caps %" GST_PTR_FORMAT, caps);
2081
2082   g_value_set_boxed (return_accu, caps);
2083
2084   /* stop emission if we have a caps */
2085   return (caps == NULL);
2086 }
2087
2088 static void
2089 gst_rtp_bin_class_init (GstRtpBinClass * klass)
2090 {
2091   GObjectClass *gobject_class;
2092   GstElementClass *gstelement_class;
2093   GstBinClass *gstbin_class;
2094
2095   gobject_class = (GObjectClass *) klass;
2096   gstelement_class = (GstElementClass *) klass;
2097   gstbin_class = (GstBinClass *) klass;
2098
2099   gobject_class->dispose = gst_rtp_bin_dispose;
2100   gobject_class->finalize = gst_rtp_bin_finalize;
2101   gobject_class->set_property = gst_rtp_bin_set_property;
2102   gobject_class->get_property = gst_rtp_bin_get_property;
2103
2104   g_object_class_install_property (gobject_class, PROP_LATENCY,
2105       g_param_spec_uint ("latency", "Buffer latency in ms",
2106           "Default amount of ms to buffer in the jitterbuffers", 0,
2107           G_MAXUINT, DEFAULT_LATENCY_MS,
2108           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2109
2110   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
2111       g_param_spec_boolean ("drop-on-latency",
2112           "Drop buffers when maximum latency is reached",
2113           "Tells the jitterbuffer to never exceed the given latency in size",
2114           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2115
2116   /**
2117    * GstRtpBin::request-pt-map:
2118    * @rtpbin: the object which received the signal
2119    * @session: the session
2120    * @pt: the pt
2121    *
2122    * Request the payload type as #GstCaps for @pt in @session.
2123    */
2124   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
2125       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
2126       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
2127       _gst_caps_accumulator, NULL, NULL, GST_TYPE_CAPS, 2, G_TYPE_UINT,
2128       G_TYPE_UINT);
2129
2130     /**
2131    * GstRtpBin::payload-type-change:
2132    * @rtpbin: the object which received the signal
2133    * @session: the session
2134    * @pt: the pt
2135    *
2136    * Signal that the current payload type changed to @pt in @session.
2137    */
2138   gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] =
2139       g_signal_new ("payload-type-change", G_TYPE_FROM_CLASS (klass),
2140       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, payload_type_change),
2141       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2142
2143   /**
2144    * GstRtpBin::clear-pt-map:
2145    * @rtpbin: the object which received the signal
2146    *
2147    * Clear all previously cached pt-mapping obtained with
2148    * #GstRtpBin::request-pt-map.
2149    */
2150   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
2151       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
2152       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2153           clear_pt_map), NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
2154
2155   /**
2156    * GstRtpBin::reset-sync:
2157    * @rtpbin: the object which received the signal
2158    *
2159    * Reset all currently configured lip-sync parameters and require new SR
2160    * packets for all streams before lip-sync is attempted again.
2161    */
2162   gst_rtp_bin_signals[SIGNAL_RESET_SYNC] =
2163       g_signal_new ("reset-sync", G_TYPE_FROM_CLASS (klass),
2164       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2165           reset_sync), NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
2166
2167   /**
2168    * GstRtpBin::get-session:
2169    * @rtpbin: the object which received the signal
2170    * @id: the session id
2171    *
2172    * Request the related GstRtpSession as #GstElement related with session @id.
2173    *
2174    * Since: 1.8
2175    */
2176   gst_rtp_bin_signals[SIGNAL_GET_SESSION] =
2177       g_signal_new ("get-session", G_TYPE_FROM_CLASS (klass),
2178       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2179           get_session), NULL, NULL, NULL, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2180
2181   /**
2182    * GstRtpBin::get-internal-session:
2183    * @rtpbin: the object which received the signal
2184    * @id: the session id
2185    *
2186    * Request the internal RTPSession object as #GObject in session @id.
2187    */
2188   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_SESSION] =
2189       g_signal_new ("get-internal-session", G_TYPE_FROM_CLASS (klass),
2190       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2191           get_internal_session), NULL, NULL, NULL, RTP_TYPE_SESSION, 1,
2192       G_TYPE_UINT);
2193
2194   /**
2195    * GstRtpBin::get-internal-storage:
2196    * @rtpbin: the object which received the signal
2197    * @id: the session id
2198    *
2199    * Request the internal RTPStorage object as #GObject in session @id. This
2200    * is the internal storage used by the RTPStorage element, which is used to
2201    * keep a backlog of received RTP packets for the session @id.
2202    *
2203    * Since: 1.14
2204    */
2205   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_STORAGE] =
2206       g_signal_new ("get-internal-storage", G_TYPE_FROM_CLASS (klass),
2207       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2208           get_internal_storage), NULL, NULL, NULL, G_TYPE_OBJECT, 1,
2209       G_TYPE_UINT);
2210
2211   /**
2212    * GstRtpBin::get-storage:
2213    * @rtpbin: the object which received the signal
2214    * @id: the session id
2215    *
2216    * Request the RTPStorage element as #GObject in session @id. This element
2217    * is used to keep a backlog of received RTP packets for the session @id.
2218    *
2219    * Since: 1.16
2220    */
2221   gst_rtp_bin_signals[SIGNAL_GET_STORAGE] =
2222       g_signal_new ("get-storage", G_TYPE_FROM_CLASS (klass),
2223       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2224           get_storage), NULL, NULL, NULL, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2225
2226   /**
2227    * GstRtpBin::clear-ssrc:
2228    * @rtpbin: the object which received the signal
2229    * @id: the session id
2230    * @ssrc: the ssrc
2231    *
2232    * Remove all pads from rtpssrcdemux element associated with the specified
2233    * ssrc. This delegate the action signal to the rtpssrcdemux element
2234    * associated with the specified session.
2235    *
2236    * Since: 1.20
2237    */
2238   gst_rtp_bin_signals[SIGNAL_CLEAR_SSRC] =
2239       g_signal_new ("clear-ssrc", G_TYPE_FROM_CLASS (klass),
2240       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2241           clear_ssrc), NULL, NULL, NULL, G_TYPE_NONE, 2,
2242       G_TYPE_UINT, G_TYPE_UINT);
2243
2244   /**
2245    * GstRtpBin::on-new-ssrc:
2246    * @rtpbin: the object which received the signal
2247    * @session: the session
2248    * @ssrc: the SSRC
2249    *
2250    * Notify of a new SSRC that entered @session.
2251    */
2252   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
2253       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
2254       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
2255       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2256   /**
2257    * GstRtpBin::on-ssrc-collision:
2258    * @rtpbin: the object which received the signal
2259    * @session: the session
2260    * @ssrc: the SSRC
2261    *
2262    * Notify when we have an SSRC collision
2263    */
2264   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
2265       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
2266       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
2267       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2268   /**
2269    * GstRtpBin::on-ssrc-validated:
2270    * @rtpbin: the object which received the signal
2271    * @session: the session
2272    * @ssrc: the SSRC
2273    *
2274    * Notify of a new SSRC that became validated.
2275    */
2276   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
2277       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
2278       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
2279       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2280   /**
2281    * GstRtpBin::on-ssrc-active:
2282    * @rtpbin: the object which received the signal
2283    * @session: the session
2284    * @ssrc: the SSRC
2285    *
2286    * Notify of a SSRC that is active, i.e., sending RTCP.
2287    */
2288   gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE] =
2289       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
2290       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_active),
2291       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2292   /**
2293    * GstRtpBin::on-ssrc-sdes:
2294    * @rtpbin: the object which received the signal
2295    * @session: the session
2296    * @ssrc: the SSRC
2297    *
2298    * Notify of a SSRC that is active, i.e., sending RTCP.
2299    */
2300   gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES] =
2301       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
2302       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_sdes),
2303       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2304
2305   /**
2306    * GstRtpBin::on-bye-ssrc:
2307    * @rtpbin: the object which received the signal
2308    * @session: the session
2309    * @ssrc: the SSRC
2310    *
2311    * Notify of an SSRC that became inactive because of a BYE packet.
2312    */
2313   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
2314       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
2315       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
2316       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2317   /**
2318    * GstRtpBin::on-bye-timeout:
2319    * @rtpbin: the object which received the signal
2320    * @session: the session
2321    * @ssrc: the SSRC
2322    *
2323    * Notify of an SSRC that has timed out because of BYE
2324    */
2325   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
2326       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
2327       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
2328       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2329   /**
2330    * GstRtpBin::on-timeout:
2331    * @rtpbin: the object which received the signal
2332    * @session: the session
2333    * @ssrc: the SSRC
2334    *
2335    * Notify of an SSRC that has timed out
2336    */
2337   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
2338       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
2339       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
2340       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2341   /**
2342    * GstRtpBin::on-sender-timeout:
2343    * @rtpbin: the object which received the signal
2344    * @session: the session
2345    * @ssrc: the SSRC
2346    *
2347    * Notify of a sender SSRC that has timed out and became a receiver
2348    */
2349   gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] =
2350       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
2351       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout),
2352       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2353
2354   /**
2355    * GstRtpBin::on-npt-stop:
2356    * @rtpbin: the object which received the signal
2357    * @session: the session
2358    * @ssrc: the SSRC
2359    *
2360    * Notify that SSRC sender has sent data up to the configured NPT stop time.
2361    */
2362   gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP] =
2363       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
2364       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_npt_stop),
2365       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2366
2367   /**
2368    * GstRtpBin::request-rtp-encoder:
2369    * @rtpbin: the object which received the signal
2370    * @session: the session
2371    *
2372    * Request an RTP encoder element for the given @session. The encoder
2373    * element will be added to the bin if not previously added.
2374    *
2375    * If no handler is connected, no encoder will be used.
2376    *
2377    * Since: 1.4
2378    */
2379   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_ENCODER] =
2380       g_signal_new ("request-rtp-encoder", G_TYPE_FROM_CLASS (klass),
2381       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2382           request_rtp_encoder), _gst_element_accumulator, NULL, NULL,
2383       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2384
2385   /**
2386    * GstRtpBin::request-rtp-decoder:
2387    * @rtpbin: the object which received the signal
2388    * @session: the session
2389    *
2390    * Request an RTP decoder element for the given @session. The decoder
2391    * element will be added to the bin if not previously added.
2392    *
2393    * If no handler is connected, no encoder will be used.
2394    *
2395    * Since: 1.4
2396    */
2397   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_DECODER] =
2398       g_signal_new ("request-rtp-decoder", G_TYPE_FROM_CLASS (klass),
2399       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2400           request_rtp_decoder), _gst_element_accumulator, NULL,
2401       NULL, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2402
2403   /**
2404    * GstRtpBin::request-rtcp-encoder:
2405    * @rtpbin: the object which received the signal
2406    * @session: the session
2407    *
2408    * Request an RTCP encoder element for the given @session. The encoder
2409    * element will be added to the bin if not previously added.
2410    *
2411    * If no handler is connected, no encoder will be used.
2412    *
2413    * Since: 1.4
2414    */
2415   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_ENCODER] =
2416       g_signal_new ("request-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
2417       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2418           request_rtcp_encoder), _gst_element_accumulator, NULL, NULL,
2419       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2420
2421   /**
2422    * GstRtpBin::request-rtcp-decoder:
2423    * @rtpbin: the object which received the signal
2424    * @session: the session
2425    *
2426    * Request an RTCP decoder element for the given @session. The decoder
2427    * element will be added to the bin if not previously added.
2428    *
2429    * If no handler is connected, no encoder will be used.
2430    *
2431    * Since: 1.4
2432    */
2433   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_DECODER] =
2434       g_signal_new ("request-rtcp-decoder", G_TYPE_FROM_CLASS (klass),
2435       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2436           request_rtcp_decoder), _gst_element_accumulator, NULL, NULL,
2437       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2438
2439   /**
2440    * GstRtpBin::request-jitterbuffer:
2441    * @rtpbin: the object which received the signal
2442    * @session: the session
2443    *
2444    * Request a jitterbuffer element for the given @session.
2445    *
2446    * If no handler is connected, the default jitterbuffer will be used.
2447    *
2448    * Note: The provided element is expected to conform to the API exposed
2449    * by the standard #GstRtpJitterBuffer. Runtime checks will be made to
2450    * determine whether it exposes properties and signals before attempting
2451    * to set, call or connect to them, and some functionalities of #GstRtpBin
2452    * may not be available when that is not the case.
2453    *
2454    * This should be considered experimental API, as the standard jitterbuffer
2455    * API is susceptible to change, provided elements will have to update their
2456    * custom jitterbuffer's API to match the API of #GstRtpJitterBuffer if and
2457    * when it changes.
2458    *
2459    * Since: 1.18
2460    */
2461   gst_rtp_bin_signals[SIGNAL_REQUEST_JITTERBUFFER] =
2462       g_signal_new ("request-jitterbuffer", G_TYPE_FROM_CLASS (klass),
2463       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2464           request_jitterbuffer), _gst_element_accumulator, NULL,
2465       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2466
2467   /**
2468    * GstRtpBin::new-jitterbuffer:
2469    * @rtpbin: the object which received the signal
2470    * @jitterbuffer: the new jitterbuffer
2471    * @session: the session
2472    * @ssrc: the SSRC
2473    *
2474    * Notify that a new @jitterbuffer was created for @session and @ssrc.
2475    * This signal can, for example, be used to configure @jitterbuffer.
2476    *
2477    * Since: 1.4
2478    */
2479   gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER] =
2480       g_signal_new ("new-jitterbuffer", G_TYPE_FROM_CLASS (klass),
2481       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2482           new_jitterbuffer), NULL, NULL, NULL,
2483       G_TYPE_NONE, 3, GST_TYPE_ELEMENT, G_TYPE_UINT, G_TYPE_UINT);
2484
2485   /**
2486    * GstRtpBin::new-storage:
2487    * @rtpbin: the object which received the signal
2488    * @storage: the new storage
2489    * @session: the session
2490    *
2491    * Notify that a new @storage was created for @session.
2492    * This signal can, for example, be used to configure @storage.
2493    *
2494    * Since: 1.14
2495    */
2496   gst_rtp_bin_signals[SIGNAL_NEW_STORAGE] =
2497       g_signal_new ("new-storage", G_TYPE_FROM_CLASS (klass),
2498       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2499           new_storage), NULL, NULL, NULL,
2500       G_TYPE_NONE, 2, GST_TYPE_ELEMENT, G_TYPE_UINT);
2501
2502   /**
2503    * GstRtpBin::request-aux-sender:
2504    * @rtpbin: the object which received the signal
2505    * @session: the session
2506    *
2507    * Request an AUX sender element for the given @session. The AUX
2508    * element will be added to the bin.
2509    *
2510    * If no handler is connected, no AUX element will be used.
2511    *
2512    * Since: 1.4
2513    */
2514   gst_rtp_bin_signals[SIGNAL_REQUEST_AUX_SENDER] =
2515       g_signal_new ("request-aux-sender", G_TYPE_FROM_CLASS (klass),
2516       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2517           request_aux_sender), _gst_element_accumulator, NULL, NULL,
2518       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2519
2520   /**
2521    * GstRtpBin::request-aux-receiver:
2522    * @rtpbin: the object which received the signal
2523    * @session: the session
2524    *
2525    * Request an AUX receiver element for the given @session. The AUX
2526    * element will be added to the bin.
2527    *
2528    * If no handler is connected, no AUX element will be used.
2529    *
2530    * Since: 1.4
2531    */
2532   gst_rtp_bin_signals[SIGNAL_REQUEST_AUX_RECEIVER] =
2533       g_signal_new ("request-aux-receiver", G_TYPE_FROM_CLASS (klass),
2534       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2535           request_aux_receiver), _gst_element_accumulator, NULL, NULL,
2536       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2537
2538   /**
2539    * GstRtpBin::request-fec-decoder:
2540    * @rtpbin: the object which received the signal
2541    * @session: the session index
2542    *
2543    * Request a FEC decoder element for the given @session. The element
2544    * will be added to the bin after the pt demuxer.  If there are multiple
2545    * ssrc's and pt's in @session, this signal may be called multiple times for
2546    * the same @session each corresponding to a newly discovered ssrc.
2547    *
2548    * If no handler is connected, no FEC decoder will be used.
2549    *
2550    * Warning: usage of this signal is not appropriate for the BUNDLE case,
2551    * connect to #GstRtpBin::request-fec-decoder-full instead.
2552    *
2553    * Since: 1.14
2554    */
2555   gst_rtp_bin_signals[SIGNAL_REQUEST_FEC_DECODER] =
2556       g_signal_new ("request-fec-decoder", G_TYPE_FROM_CLASS (klass),
2557       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2558           request_fec_decoder), _gst_element_accumulator, NULL, NULL,
2559       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2560
2561   /**
2562    * GstRtpBin::request-fec-decoder-full:
2563    * @rtpbin: the object which received the signal
2564    * @session: the session index
2565    * @ssrc: the ssrc of the stream
2566    * @pt: the payload type
2567    *
2568    * Request a FEC decoder element for the given @session. The element
2569    * will be added to the bin after the pt demuxer.  If there are multiple
2570    * ssrc's and pt's in @session, this signal may be called multiple times for
2571    * the same @session each corresponding to a newly discovered ssrc and payload
2572    * type, those are provided as parameters.
2573    *
2574    * If no handler is connected, no FEC decoder will be used.
2575    *
2576    * Since: 1.20
2577    */
2578   gst_rtp_bin_signals[SIGNAL_REQUEST_FEC_DECODER_FULL] =
2579       g_signal_new ("request-fec-decoder-full", G_TYPE_FROM_CLASS (klass),
2580       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2581           request_fec_decoder), _gst_element_accumulator, NULL, NULL,
2582       GST_TYPE_ELEMENT, 3, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT);
2583
2584   /**
2585    * GstRtpBin::request-fec-encoder:
2586    * @rtpbin: the object which received the signal
2587    * @session: the session index
2588    *
2589    * Request a FEC encoder element for the given @session. The element
2590    * will be added to the bin after the RTPSession.
2591    *
2592    * If no handler is connected, no FEC encoder will be used.
2593    *
2594    * Since: 1.14
2595    */
2596   gst_rtp_bin_signals[SIGNAL_REQUEST_FEC_ENCODER] =
2597       g_signal_new ("request-fec-encoder", G_TYPE_FROM_CLASS (klass),
2598       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2599           request_fec_encoder), _gst_element_accumulator, NULL, NULL,
2600       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2601
2602   /**
2603    * GstRtpBin::on-new-sender-ssrc:
2604    * @rtpbin: the object which received the signal
2605    * @session: the session
2606    * @ssrc: the sender SSRC
2607    *
2608    * Notify of a new sender SSRC that entered @session.
2609    *
2610    * Since: 1.8
2611    */
2612   gst_rtp_bin_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
2613       g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
2614       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_sender_ssrc),
2615       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2616   /**
2617    * GstRtpBin::on-sender-ssrc-active:
2618    * @rtpbin: the object which received the signal
2619    * @session: the session
2620    * @ssrc: the sender SSRC
2621    *
2622    * Notify of a sender SSRC that is active, i.e., sending RTCP.
2623    *
2624    * Since: 1.8
2625    */
2626   gst_rtp_bin_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
2627       g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
2628       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2629           on_sender_ssrc_active), NULL, NULL, NULL,
2630       G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2631
2632   g_object_class_install_property (gobject_class, PROP_SDES,
2633       g_param_spec_boxed ("sdes", "SDES",
2634           "The SDES items of this session",
2635           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
2636           | GST_PARAM_DOC_SHOW_DEFAULT));
2637
2638   g_object_class_install_property (gobject_class, PROP_DO_LOST,
2639       g_param_spec_boolean ("do-lost", "Do Lost",
2640           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
2641           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2642
2643   g_object_class_install_property (gobject_class, PROP_AUTOREMOVE,
2644       g_param_spec_boolean ("autoremove", "Auto Remove",
2645           "Automatically remove timed out sources", DEFAULT_AUTOREMOVE,
2646           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2647
2648   g_object_class_install_property (gobject_class, PROP_IGNORE_PT,
2649       g_param_spec_boolean ("ignore-pt", "Ignore PT",
2650           "Do not demultiplex based on PT values", DEFAULT_IGNORE_PT,
2651           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2652
2653   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
2654       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
2655           "Use the pipeline running-time to set the NTP time in the RTCP SR messages "
2656           "(DEPRECATED: Use ntp-time-source property)",
2657           DEFAULT_USE_PIPELINE_CLOCK,
2658           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
2659   /**
2660    * GstRtpBin:buffer-mode:
2661    *
2662    * Control the buffering and timestamping mode used by the jitterbuffer.
2663    */
2664   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
2665       g_param_spec_enum ("buffer-mode", "Buffer Mode",
2666           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
2667           DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2668   /**
2669    * GstRtpBin:ntp-sync:
2670    *
2671    * Set the NTP time from the sender reports as the running-time on the
2672    * buffers. When both the sender and receiver have sychronized
2673    * running-time, i.e. when the clock and base-time is shared
2674    * between the receivers and the and the senders, this option can be
2675    * used to synchronize receivers on multiple machines.
2676    */
2677   g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
2678       g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
2679           "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
2680           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2681
2682   /**
2683    * GstRtpBin:rtcp-sync:
2684    *
2685    * If not synchronizing (directly) to the NTP clock, determines how to sync
2686    * the various streams.
2687    */
2688   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC,
2689       g_param_spec_enum ("rtcp-sync", "RTCP Sync",
2690           "Use of RTCP SR in synchronization", GST_RTP_BIN_RTCP_SYNC_TYPE,
2691           DEFAULT_RTCP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2692
2693   /**
2694    * GstRtpBin:rtcp-sync-interval:
2695    *
2696    * Determines how often to sync streams using RTCP data.
2697    */
2698   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_INTERVAL,
2699       g_param_spec_uint ("rtcp-sync-interval", "RTCP Sync Interval",
2700           "RTCP SR interval synchronization (ms) (0 = always)",
2701           0, G_MAXUINT, DEFAULT_RTCP_SYNC_INTERVAL,
2702           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2703
2704   g_object_class_install_property (gobject_class, PROP_DO_SYNC_EVENT,
2705       g_param_spec_boolean ("do-sync-event", "Do Sync Event",
2706           "Send event downstream when a stream is synchronized to the sender",
2707           DEFAULT_DO_SYNC_EVENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2708
2709   /**
2710    * GstRtpBin:do-retransmission:
2711    *
2712    * Enables RTP retransmission on all streams. To control retransmission on
2713    * a per-SSRC basis, connect to the #GstRtpBin::new-jitterbuffer signal and
2714    * set the #GstRtpJitterBuffer:do-retransmission property on the
2715    * #GstRtpJitterBuffer object instead.
2716    */
2717   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
2718       g_param_spec_boolean ("do-retransmission", "Do retransmission",
2719           "Enable retransmission on all streams",
2720           DEFAULT_DO_RETRANSMISSION,
2721           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2722
2723   /**
2724    * GstRtpBin:rtp-profile:
2725    *
2726    * Sets the default RTP profile of newly created RTP sessions. The
2727    * profile can be changed afterwards on a per-session basis.
2728    */
2729   g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
2730       g_param_spec_enum ("rtp-profile", "RTP Profile",
2731           "Default RTP profile of newly created sessions",
2732           GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE,
2733           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2734
2735   g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
2736       g_param_spec_enum ("ntp-time-source", "NTP Time Source",
2737           "NTP time source for RTCP packets",
2738           gst_rtp_ntp_time_source_get_type (), DEFAULT_NTP_TIME_SOURCE,
2739           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2740
2741   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_SEND_TIME,
2742       g_param_spec_boolean ("rtcp-sync-send-time", "RTCP Sync Send Time",
2743           "Use send time or capture time for RTCP sync "
2744           "(TRUE = send time, FALSE = capture time)",
2745           DEFAULT_RTCP_SYNC_SEND_TIME,
2746           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2747
2748   g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF,
2749       g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff",
2750           "Maximum amount of time in ms that the RTP time in RTCP SRs "
2751           "is allowed to be ahead (-1 disabled)", -1, G_MAXINT,
2752           DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
2753           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2754
2755   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
2756       g_param_spec_uint ("max-dropout-time", "Max dropout time",
2757           "The maximum time (milliseconds) of missing packets tolerated.",
2758           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
2759           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2760
2761   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
2762       g_param_spec_uint ("max-misorder-time", "Max misorder time",
2763           "The maximum time (milliseconds) of misordered packets tolerated.",
2764           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
2765           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2766
2767   g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
2768       g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
2769           "Synchronize received streams to the RFC7273 clock "
2770           "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
2771           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2772
2773   /**
2774    * GstRtpBin:add-reference-timestamp-meta:
2775    *
2776    * When syncing to a RFC7273 clock, add #GstReferenceTimestampMeta
2777    * to buffers with the original reconstructed reference clock timestamp.
2778    *
2779    * Since: 1.22
2780    */
2781   g_object_class_install_property (gobject_class,
2782       PROP_ADD_REFERENCE_TIMESTAMP_META,
2783       g_param_spec_boolean ("add-reference-timestamp-meta",
2784           "Add Reference Timestamp Meta",
2785           "Add Reference Timestamp Meta to buffers with the original clock timestamp "
2786           "before any adjustments when syncing to an RFC7273 clock.",
2787           DEFAULT_ADD_REFERENCE_TIMESTAMP_META,
2788           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2789
2790   g_object_class_install_property (gobject_class, PROP_MAX_STREAMS,
2791       g_param_spec_uint ("max-streams", "Max Streams",
2792           "The maximum number of streams to create for one session",
2793           0, G_MAXUINT, DEFAULT_MAX_STREAMS,
2794           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2795
2796   /**
2797    * GstRtpBin:max-ts-offset-adjustment:
2798    *
2799    * Syncing time stamps to NTP time adds a time offset. This parameter
2800    * specifies the maximum number of nanoseconds per frame that this time offset
2801    * may be adjusted with. This is used to avoid sudden large changes to time
2802    * stamps.
2803    *
2804    * Since: 1.14
2805    */
2806   g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET_ADJUSTMENT,
2807       g_param_spec_uint64 ("max-ts-offset-adjustment",
2808           "Max Timestamp Offset Adjustment",
2809           "The maximum number of nanoseconds per frame that time stamp offsets "
2810           "may be adjusted (0 = no limit).", 0, G_MAXUINT64,
2811           DEFAULT_MAX_TS_OFFSET_ADJUSTMENT, G_PARAM_READWRITE |
2812           G_PARAM_STATIC_STRINGS));
2813
2814   /**
2815    * GstRtpBin:max-ts-offset:
2816    *
2817    * Used to set an upper limit of how large a time offset may be. This
2818    * is used to protect against unrealistic values as a result of either
2819    * client,server or clock issues.
2820    *
2821    * Since: 1.14
2822    */
2823   g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET,
2824       g_param_spec_int64 ("max-ts-offset", "Max TS Offset",
2825           "The maximum absolute value of the time offset in (nanoseconds). "
2826           "Note, if the ntp-sync parameter is set the default value is "
2827           "changed to 0 (no limit)", 0, G_MAXINT64, DEFAULT_MAX_TS_OFFSET,
2828           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2829
2830   /**
2831    * GstRtpBin:min-ts-offset:
2832    *
2833    * Used to set an lower limit for when a time offset is deemed large enough
2834    * to be useful for sync corrections.
2835    *
2836    * When streaming for instance audio, even very small ts_offsets cause
2837    * audible glitches. This property is used for controlling how sensitive the
2838    * adjustments should be to small deviations in ts_offset, occurring for
2839    * instance due to jittery network conditions or system load.
2840    *
2841    * Since: 1.22
2842    */
2843   g_object_class_install_property (gobject_class, PROP_MIN_TS_OFFSET,
2844       g_param_spec_uint64 ("min-ts-offset", "Min TS Offset",
2845           "The minimum absolute value of the time offset in (nanoseconds). "
2846           "Used to set an lower limit for when a time offset is deemed large "
2847           "enough to be useful for sync corrections."
2848           "Note, if the ntp-sync parameter is set the default value is "
2849           "changed to 0 (no limit)", 0, G_MAXUINT64, DEFAULT_MIN_TS_OFFSET,
2850           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2851
2852   /**
2853    * GstRtpBin:ts-offset-smoothing-factor:
2854    *
2855    * Controls the weighting between previous and current timestamp offsets in
2856    * a running moving average (RMA):
2857    * ts_offset_average(n) =
2858    *   ((ts-offset-smoothing-factor - 1) * ts_offset_average(n - 1) + ts_offset(n)) /
2859    *   ts-offset-smoothing-factor
2860    *
2861    * This can stabilize the timestamp offset and prevent unnecessary skew
2862    * corrections due to jitter introduced by network or system load.
2863    *
2864    * Since: 1.22
2865    */
2866   g_object_class_install_property (gobject_class,
2867       PROP_TS_OFFSET_SMOOTHING_FACTOR,
2868       g_param_spec_uint ("ts-offset-smoothing-factor",
2869           "Timestamp Offset Smoothing Factor",
2870           "Sets a smoothing factor for the timestamp offset in number of "
2871           "values for a calculated running moving average. "
2872           "(0 = no smoothing factor)", 0, G_MAXUINT,
2873           DEFAULT_TS_OFFSET_SMOOTHING_FACTOR,
2874           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2875
2876   /**
2877    * GstRtpBin:fec-decoders:
2878    *
2879    * Used to provide a factory used to build the FEC decoder for a
2880    * given session, as a command line alternative to
2881    * #GstRtpBin::request-fec-decoder.
2882    *
2883    * Expects a GstStructure in the form session_id (gint) -> factory (string)
2884    *
2885    * Since: 1.20
2886    */
2887   g_object_class_install_property (gobject_class, PROP_FEC_DECODERS,
2888       g_param_spec_boxed ("fec-decoders", "Fec Decoders",
2889           "GstStructure mapping from session index to FEC decoder "
2890           "factory, eg "
2891           "fec-decoders='fec,0=\"rtpst2022-1-fecdec\\ size-time\\=1000000000\";'",
2892           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2893
2894   /**
2895    * GstRtpBin:fec-encoders:
2896    *
2897    * Used to provide a factory used to build the FEC encoder for a
2898    * given session, as a command line alternative to
2899    * #GstRtpBin::request-fec-encoder.
2900    *
2901    * Expects a GstStructure in the form session_id (gint) -> factory (string)
2902    *
2903    * Since: 1.20
2904    */
2905   g_object_class_install_property (gobject_class, PROP_FEC_ENCODERS,
2906       g_param_spec_boxed ("fec-encoders", "Fec Encoders",
2907           "GstStructure mapping from session index to FEC encoder "
2908           "factory, eg "
2909           "fec-encoders='fec,0=\"rtpst2022-1-fecenc\\ rows\\=5\\ columns\\=5\";'",
2910           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2911
2912   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
2913   gstelement_class->request_new_pad =
2914       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
2915   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
2916
2917   /* sink pads */
2918   gst_element_class_add_static_pad_template (gstelement_class,
2919       &rtpbin_recv_rtp_sink_template);
2920   gst_element_class_add_static_pad_template (gstelement_class,
2921       &rtpbin_recv_fec_sink_template);
2922   gst_element_class_add_static_pad_template (gstelement_class,
2923       &rtpbin_recv_rtcp_sink_template);
2924   gst_element_class_add_static_pad_template (gstelement_class,
2925       &rtpbin_send_rtp_sink_template);
2926
2927   /* src pads */
2928   gst_element_class_add_static_pad_template (gstelement_class,
2929       &rtpbin_recv_rtp_src_template);
2930   gst_element_class_add_static_pad_template (gstelement_class,
2931       &rtpbin_send_rtcp_src_template);
2932   gst_element_class_add_static_pad_template (gstelement_class,
2933       &rtpbin_send_rtp_src_template);
2934   gst_element_class_add_static_pad_template (gstelement_class,
2935       &rtpbin_send_fec_src_template);
2936
2937   gst_element_class_set_static_metadata (gstelement_class, "RTP Bin",
2938       "Filter/Network/RTP",
2939       "Real-Time Transport Protocol bin",
2940       "Wim Taymans <wim.taymans@gmail.com>");
2941
2942   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (gst_rtp_bin_handle_message);
2943
2944   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
2945   klass->reset_sync = GST_DEBUG_FUNCPTR (gst_rtp_bin_reset_sync);
2946   klass->get_session = GST_DEBUG_FUNCPTR (gst_rtp_bin_get_session);
2947   klass->get_internal_session =
2948       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_session);
2949   klass->get_storage = GST_DEBUG_FUNCPTR (gst_rtp_bin_get_storage);
2950   klass->get_internal_storage =
2951       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_storage);
2952   klass->clear_ssrc = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_ssrc);
2953   klass->request_rtp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2954   klass->request_rtp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2955   klass->request_rtcp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2956   klass->request_rtcp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2957   klass->request_jitterbuffer =
2958       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_jitterbuffer);
2959
2960   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
2961
2962   gst_type_mark_as_plugin_api (GST_RTP_BIN_RTCP_SYNC_TYPE, 0);
2963 }
2964
2965 static void
2966 gst_rtp_bin_init (GstRtpBin * rtpbin)
2967 {
2968   gchar *cname;
2969
2970   rtpbin->priv = gst_rtp_bin_get_instance_private (rtpbin);
2971   g_mutex_init (&rtpbin->priv->bin_lock);
2972   g_mutex_init (&rtpbin->priv->dyn_lock);
2973
2974   rtpbin->latency_ms = DEFAULT_LATENCY_MS;
2975   rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND;
2976   rtpbin->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
2977   rtpbin->do_lost = DEFAULT_DO_LOST;
2978   rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
2979   rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
2980   rtpbin->rtcp_sync = DEFAULT_RTCP_SYNC;
2981   rtpbin->rtcp_sync_interval = DEFAULT_RTCP_SYNC_INTERVAL;
2982   rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
2983   rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
2984   rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
2985   rtpbin->send_sync_event = DEFAULT_DO_SYNC_EVENT;
2986   rtpbin->do_retransmission = DEFAULT_DO_RETRANSMISSION;
2987   rtpbin->rtp_profile = DEFAULT_RTP_PROFILE;
2988   rtpbin->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
2989   rtpbin->rtcp_sync_send_time = DEFAULT_RTCP_SYNC_SEND_TIME;
2990   rtpbin->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
2991   rtpbin->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
2992   rtpbin->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
2993   rtpbin->rfc7273_sync = DEFAULT_RFC7273_SYNC;
2994   rtpbin->add_reference_timestamp_meta = DEFAULT_ADD_REFERENCE_TIMESTAMP_META;
2995   rtpbin->max_streams = DEFAULT_MAX_STREAMS;
2996   rtpbin->max_ts_offset_adjustment = DEFAULT_MAX_TS_OFFSET_ADJUSTMENT;
2997   rtpbin->max_ts_offset = DEFAULT_MAX_TS_OFFSET;
2998   rtpbin->max_ts_offset_is_set = FALSE;
2999   rtpbin->min_ts_offset = DEFAULT_MIN_TS_OFFSET;
3000   rtpbin->min_ts_offset_is_set = FALSE;
3001   rtpbin->ts_offset_smoothing_factor = DEFAULT_TS_OFFSET_SMOOTHING_FACTOR;
3002
3003   /* some default SDES entries */
3004   cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
3005   rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes",
3006       "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL);
3007   rtpbin->fec_decoders =
3008       gst_structure_new_empty ("application/x-rtp-fec-decoders");
3009   rtpbin->fec_encoders =
3010       gst_structure_new_empty ("application/x-rtp-fec-encoders");
3011   g_free (cname);
3012 }
3013
3014 static void
3015 gst_rtp_bin_dispose (GObject * object)
3016 {
3017   GstRtpBin *rtpbin;
3018
3019   rtpbin = GST_RTP_BIN (object);
3020
3021   GST_RTP_BIN_LOCK (rtpbin);
3022   GST_DEBUG_OBJECT (object, "freeing sessions");
3023   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, rtpbin);
3024   g_slist_free (rtpbin->sessions);
3025   rtpbin->sessions = NULL;
3026   GST_RTP_BIN_UNLOCK (rtpbin);
3027
3028   G_OBJECT_CLASS (parent_class)->dispose (object);
3029 }
3030
3031 static void
3032 gst_rtp_bin_finalize (GObject * object)
3033 {
3034   GstRtpBin *rtpbin;
3035
3036   rtpbin = GST_RTP_BIN (object);
3037
3038   if (rtpbin->sdes)
3039     gst_structure_free (rtpbin->sdes);
3040
3041   if (rtpbin->fec_decoders)
3042     gst_structure_free (rtpbin->fec_decoders);
3043
3044   if (rtpbin->fec_encoders)
3045     gst_structure_free (rtpbin->fec_encoders);
3046
3047   g_mutex_clear (&rtpbin->priv->bin_lock);
3048   g_mutex_clear (&rtpbin->priv->dyn_lock);
3049
3050   G_OBJECT_CLASS (parent_class)->finalize (object);
3051 }
3052
3053
3054 static void
3055 gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes)
3056 {
3057   GSList *item;
3058
3059   if (sdes == NULL)
3060     return;
3061
3062   GST_RTP_BIN_LOCK (bin);
3063
3064   GST_OBJECT_LOCK (bin);
3065   if (bin->sdes)
3066     gst_structure_free (bin->sdes);
3067   bin->sdes = gst_structure_copy (sdes);
3068   GST_OBJECT_UNLOCK (bin);
3069
3070   /* store in all sessions */
3071   for (item = bin->sessions; item; item = g_slist_next (item)) {
3072     GstRtpBinSession *session = item->data;
3073     g_object_set (session->session, "sdes", sdes, NULL);
3074   }
3075
3076   GST_RTP_BIN_UNLOCK (bin);
3077 }
3078
3079 static void
3080 gst_rtp_bin_set_fec_decoders_struct (GstRtpBin * bin,
3081     const GstStructure * decoders)
3082 {
3083   if (decoders == NULL)
3084     return;
3085
3086   GST_RTP_BIN_LOCK (bin);
3087
3088   GST_OBJECT_LOCK (bin);
3089   if (bin->fec_decoders)
3090     gst_structure_free (bin->fec_decoders);
3091   bin->fec_decoders = gst_structure_copy (decoders);
3092
3093   GST_OBJECT_UNLOCK (bin);
3094
3095   GST_RTP_BIN_UNLOCK (bin);
3096 }
3097
3098 static void
3099 gst_rtp_bin_set_fec_encoders_struct (GstRtpBin * bin,
3100     const GstStructure * encoders)
3101 {
3102   if (encoders == NULL)
3103     return;
3104
3105   GST_RTP_BIN_LOCK (bin);
3106
3107   GST_OBJECT_LOCK (bin);
3108   if (bin->fec_encoders)
3109     gst_structure_free (bin->fec_encoders);
3110   bin->fec_encoders = gst_structure_copy (encoders);
3111
3112   GST_OBJECT_UNLOCK (bin);
3113
3114   GST_RTP_BIN_UNLOCK (bin);
3115 }
3116
3117 static GstStructure *
3118 gst_rtp_bin_get_sdes_struct (GstRtpBin * bin)
3119 {
3120   GstStructure *result;
3121
3122   GST_OBJECT_LOCK (bin);
3123   result = gst_structure_copy (bin->sdes);
3124   GST_OBJECT_UNLOCK (bin);
3125
3126   return result;
3127 }
3128
3129 static GstStructure *
3130 gst_rtp_bin_get_fec_decoders_struct (GstRtpBin * bin)
3131 {
3132   GstStructure *result;
3133
3134   GST_OBJECT_LOCK (bin);
3135   result = gst_structure_copy (bin->fec_decoders);
3136   GST_OBJECT_UNLOCK (bin);
3137
3138   return result;
3139 }
3140
3141 static GstStructure *
3142 gst_rtp_bin_get_fec_encoders_struct (GstRtpBin * bin)
3143 {
3144   GstStructure *result;
3145
3146   GST_OBJECT_LOCK (bin);
3147   result = gst_structure_copy (bin->fec_encoders);
3148   GST_OBJECT_UNLOCK (bin);
3149
3150   return result;
3151 }
3152
3153 static void
3154 gst_rtp_bin_set_property (GObject * object, guint prop_id,
3155     const GValue * value, GParamSpec * pspec)
3156 {
3157   GstRtpBin *rtpbin;
3158
3159   rtpbin = GST_RTP_BIN (object);
3160
3161   switch (prop_id) {
3162     case PROP_LATENCY:
3163       GST_RTP_BIN_LOCK (rtpbin);
3164       rtpbin->latency_ms = g_value_get_uint (value);
3165       rtpbin->latency_ns = rtpbin->latency_ms * GST_MSECOND;
3166       GST_RTP_BIN_UNLOCK (rtpbin);
3167       /* propagate the property down to the jitterbuffer */
3168       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "latency", value);
3169       break;
3170     case PROP_DROP_ON_LATENCY:
3171       GST_RTP_BIN_LOCK (rtpbin);
3172       rtpbin->drop_on_latency = g_value_get_boolean (value);
3173       GST_RTP_BIN_UNLOCK (rtpbin);
3174       /* propagate the property down to the jitterbuffer */
3175       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3176           "drop-on-latency", value);
3177       break;
3178     case PROP_SDES:
3179       gst_rtp_bin_set_sdes_struct (rtpbin, g_value_get_boxed (value));
3180       break;
3181     case PROP_DO_LOST:
3182       GST_RTP_BIN_LOCK (rtpbin);
3183       rtpbin->do_lost = g_value_get_boolean (value);
3184       GST_RTP_BIN_UNLOCK (rtpbin);
3185       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
3186       break;
3187     case PROP_NTP_SYNC:
3188       rtpbin->ntp_sync = g_value_get_boolean (value);
3189       /* The default value of max_ts_offset depends on ntp_sync. If user
3190        * hasn't set it then change default value */
3191       if (!rtpbin->max_ts_offset_is_set) {
3192         if (rtpbin->ntp_sync) {
3193           rtpbin->max_ts_offset = 0;
3194         } else {
3195           rtpbin->max_ts_offset = DEFAULT_MAX_TS_OFFSET;
3196         }
3197       }
3198       if (!rtpbin->min_ts_offset_is_set) {
3199         if (rtpbin->ntp_sync) {
3200           rtpbin->min_ts_offset = 0;
3201         } else {
3202           rtpbin->min_ts_offset = DEFAULT_MIN_TS_OFFSET;
3203         }
3204       }
3205       break;
3206     case PROP_RTCP_SYNC:
3207       g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value));
3208       break;
3209     case PROP_RTCP_SYNC_INTERVAL:
3210       rtpbin->rtcp_sync_interval = g_value_get_uint (value);
3211       break;
3212     case PROP_IGNORE_PT:
3213       rtpbin->ignore_pt = g_value_get_boolean (value);
3214       break;
3215     case PROP_AUTOREMOVE:
3216       rtpbin->priv->autoremove = g_value_get_boolean (value);
3217       break;
3218     case PROP_USE_PIPELINE_CLOCK:
3219     {
3220       GSList *sessions;
3221       GST_RTP_BIN_LOCK (rtpbin);
3222       rtpbin->use_pipeline_clock = g_value_get_boolean (value);
3223       for (sessions = rtpbin->sessions; sessions;
3224           sessions = g_slist_next (sessions)) {
3225         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3226
3227         g_object_set (G_OBJECT (session->session),
3228             "use-pipeline-clock", rtpbin->use_pipeline_clock, NULL);
3229       }
3230       GST_RTP_BIN_UNLOCK (rtpbin);
3231     }
3232       break;
3233     case PROP_DO_SYNC_EVENT:
3234       rtpbin->send_sync_event = g_value_get_boolean (value);
3235       break;
3236     case PROP_BUFFER_MODE:
3237       GST_RTP_BIN_LOCK (rtpbin);
3238       rtpbin->buffer_mode = g_value_get_enum (value);
3239       GST_RTP_BIN_UNLOCK (rtpbin);
3240       /* propagate the property down to the jitterbuffer */
3241       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "mode", value);
3242       break;
3243     case PROP_DO_RETRANSMISSION:
3244       GST_RTP_BIN_LOCK (rtpbin);
3245       rtpbin->do_retransmission = g_value_get_boolean (value);
3246       GST_RTP_BIN_UNLOCK (rtpbin);
3247       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3248           "do-retransmission", value);
3249       break;
3250     case PROP_RTP_PROFILE:
3251       rtpbin->rtp_profile = g_value_get_enum (value);
3252       break;
3253     case PROP_NTP_TIME_SOURCE:{
3254       GSList *sessions;
3255       GST_RTP_BIN_LOCK (rtpbin);
3256       rtpbin->ntp_time_source = g_value_get_enum (value);
3257       for (sessions = rtpbin->sessions; sessions;
3258           sessions = g_slist_next (sessions)) {
3259         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3260
3261         g_object_set (G_OBJECT (session->session),
3262             "ntp-time-source", rtpbin->ntp_time_source, NULL);
3263       }
3264       GST_RTP_BIN_UNLOCK (rtpbin);
3265       break;
3266     }
3267     case PROP_RTCP_SYNC_SEND_TIME:{
3268       GSList *sessions;
3269       GST_RTP_BIN_LOCK (rtpbin);
3270       rtpbin->rtcp_sync_send_time = g_value_get_boolean (value);
3271       for (sessions = rtpbin->sessions; sessions;
3272           sessions = g_slist_next (sessions)) {
3273         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3274
3275         g_object_set (G_OBJECT (session->session),
3276             "rtcp-sync-send-time", rtpbin->rtcp_sync_send_time, NULL);
3277       }
3278       GST_RTP_BIN_UNLOCK (rtpbin);
3279       break;
3280     }
3281     case PROP_MAX_RTCP_RTP_TIME_DIFF:
3282       GST_RTP_BIN_LOCK (rtpbin);
3283       rtpbin->max_rtcp_rtp_time_diff = g_value_get_int (value);
3284       GST_RTP_BIN_UNLOCK (rtpbin);
3285       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3286           "max-rtcp-rtp-time-diff", value);
3287       break;
3288     case PROP_MAX_DROPOUT_TIME:
3289       GST_RTP_BIN_LOCK (rtpbin);
3290       rtpbin->max_dropout_time = g_value_get_uint (value);
3291       GST_RTP_BIN_UNLOCK (rtpbin);
3292       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3293           "max-dropout-time", value);
3294       gst_rtp_bin_propagate_property_to_session (rtpbin, "max-dropout-time",
3295           value);
3296       break;
3297     case PROP_MAX_MISORDER_TIME:
3298       GST_RTP_BIN_LOCK (rtpbin);
3299       rtpbin->max_misorder_time = g_value_get_uint (value);
3300       GST_RTP_BIN_UNLOCK (rtpbin);
3301       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3302           "max-misorder-time", value);
3303       gst_rtp_bin_propagate_property_to_session (rtpbin, "max-misorder-time",
3304           value);
3305       break;
3306     case PROP_RFC7273_SYNC:
3307       rtpbin->rfc7273_sync = g_value_get_boolean (value);
3308       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3309           "rfc7273-sync", value);
3310       break;
3311     case PROP_ADD_REFERENCE_TIMESTAMP_META:
3312       rtpbin->add_reference_timestamp_meta = g_value_get_boolean (value);
3313       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3314           "add-reference-timestamp-meta", value);
3315       break;
3316     case PROP_MAX_STREAMS:
3317       rtpbin->max_streams = g_value_get_uint (value);
3318       break;
3319     case PROP_MAX_TS_OFFSET_ADJUSTMENT:
3320       rtpbin->max_ts_offset_adjustment = g_value_get_uint64 (value);
3321       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3322           "max-ts-offset-adjustment", value);
3323       break;
3324     case PROP_MAX_TS_OFFSET:
3325       rtpbin->max_ts_offset = g_value_get_int64 (value);
3326       rtpbin->max_ts_offset_is_set = TRUE;
3327       break;
3328     case PROP_MIN_TS_OFFSET:
3329       rtpbin->min_ts_offset = g_value_get_uint64 (value);
3330       rtpbin->min_ts_offset_is_set = TRUE;
3331       break;
3332     case PROP_TS_OFFSET_SMOOTHING_FACTOR:
3333       rtpbin->ts_offset_smoothing_factor = g_value_get_uint (value);
3334       break;
3335     case PROP_FEC_DECODERS:
3336       gst_rtp_bin_set_fec_decoders_struct (rtpbin, g_value_get_boxed (value));
3337       break;
3338     case PROP_FEC_ENCODERS:
3339       gst_rtp_bin_set_fec_encoders_struct (rtpbin, g_value_get_boxed (value));
3340       break;
3341     default:
3342       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3343       break;
3344   }
3345 }
3346
3347 static void
3348 gst_rtp_bin_get_property (GObject * object, guint prop_id,
3349     GValue * value, GParamSpec * pspec)
3350 {
3351   GstRtpBin *rtpbin;
3352
3353   rtpbin = GST_RTP_BIN (object);
3354
3355   switch (prop_id) {
3356     case PROP_LATENCY:
3357       GST_RTP_BIN_LOCK (rtpbin);
3358       g_value_set_uint (value, rtpbin->latency_ms);
3359       GST_RTP_BIN_UNLOCK (rtpbin);
3360       break;
3361     case PROP_DROP_ON_LATENCY:
3362       GST_RTP_BIN_LOCK (rtpbin);
3363       g_value_set_boolean (value, rtpbin->drop_on_latency);
3364       GST_RTP_BIN_UNLOCK (rtpbin);
3365       break;
3366     case PROP_SDES:
3367       g_value_take_boxed (value, gst_rtp_bin_get_sdes_struct (rtpbin));
3368       break;
3369     case PROP_DO_LOST:
3370       GST_RTP_BIN_LOCK (rtpbin);
3371       g_value_set_boolean (value, rtpbin->do_lost);
3372       GST_RTP_BIN_UNLOCK (rtpbin);
3373       break;
3374     case PROP_IGNORE_PT:
3375       g_value_set_boolean (value, rtpbin->ignore_pt);
3376       break;
3377     case PROP_NTP_SYNC:
3378       g_value_set_boolean (value, rtpbin->ntp_sync);
3379       break;
3380     case PROP_RTCP_SYNC:
3381       g_value_set_enum (value, g_atomic_int_get (&rtpbin->rtcp_sync));
3382       break;
3383     case PROP_RTCP_SYNC_INTERVAL:
3384       g_value_set_uint (value, rtpbin->rtcp_sync_interval);
3385       break;
3386     case PROP_AUTOREMOVE:
3387       g_value_set_boolean (value, rtpbin->priv->autoremove);
3388       break;
3389     case PROP_BUFFER_MODE:
3390       g_value_set_enum (value, rtpbin->buffer_mode);
3391       break;
3392     case PROP_USE_PIPELINE_CLOCK:
3393       g_value_set_boolean (value, rtpbin->use_pipeline_clock);
3394       break;
3395     case PROP_DO_SYNC_EVENT:
3396       g_value_set_boolean (value, rtpbin->send_sync_event);
3397       break;
3398     case PROP_DO_RETRANSMISSION:
3399       GST_RTP_BIN_LOCK (rtpbin);
3400       g_value_set_boolean (value, rtpbin->do_retransmission);
3401       GST_RTP_BIN_UNLOCK (rtpbin);
3402       break;
3403     case PROP_RTP_PROFILE:
3404       g_value_set_enum (value, rtpbin->rtp_profile);
3405       break;
3406     case PROP_NTP_TIME_SOURCE:
3407       g_value_set_enum (value, rtpbin->ntp_time_source);
3408       break;
3409     case PROP_RTCP_SYNC_SEND_TIME:
3410       g_value_set_boolean (value, rtpbin->rtcp_sync_send_time);
3411       break;
3412     case PROP_MAX_RTCP_RTP_TIME_DIFF:
3413       GST_RTP_BIN_LOCK (rtpbin);
3414       g_value_set_int (value, rtpbin->max_rtcp_rtp_time_diff);
3415       GST_RTP_BIN_UNLOCK (rtpbin);
3416       break;
3417     case PROP_MAX_DROPOUT_TIME:
3418       g_value_set_uint (value, rtpbin->max_dropout_time);
3419       break;
3420     case PROP_MAX_MISORDER_TIME:
3421       g_value_set_uint (value, rtpbin->max_misorder_time);
3422       break;
3423     case PROP_RFC7273_SYNC:
3424       g_value_set_boolean (value, rtpbin->rfc7273_sync);
3425       break;
3426     case PROP_ADD_REFERENCE_TIMESTAMP_META:
3427       g_value_set_boolean (value, rtpbin->add_reference_timestamp_meta);
3428       break;
3429     case PROP_MAX_STREAMS:
3430       g_value_set_uint (value, rtpbin->max_streams);
3431       break;
3432     case PROP_MAX_TS_OFFSET_ADJUSTMENT:
3433       g_value_set_uint64 (value, rtpbin->max_ts_offset_adjustment);
3434       break;
3435     case PROP_MAX_TS_OFFSET:
3436       g_value_set_int64 (value, rtpbin->max_ts_offset);
3437       break;
3438     case PROP_MIN_TS_OFFSET:
3439       g_value_set_uint64 (value, rtpbin->min_ts_offset);
3440       break;
3441     case PROP_TS_OFFSET_SMOOTHING_FACTOR:
3442       g_value_set_uint (value, rtpbin->ts_offset_smoothing_factor);
3443       break;
3444     case PROP_FEC_DECODERS:
3445       g_value_take_boxed (value, gst_rtp_bin_get_fec_decoders_struct (rtpbin));
3446       break;
3447     case PROP_FEC_ENCODERS:
3448       g_value_take_boxed (value, gst_rtp_bin_get_fec_encoders_struct (rtpbin));
3449       break;
3450     default:
3451       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3452       break;
3453   }
3454 }
3455
3456 static void
3457 gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
3458 {
3459   GstRtpBin *rtpbin;
3460
3461   rtpbin = GST_RTP_BIN (bin);
3462
3463   switch (GST_MESSAGE_TYPE (message)) {
3464     case GST_MESSAGE_ELEMENT:
3465     {
3466       const GstStructure *s = gst_message_get_structure (message);
3467
3468       /* we change the structure name and add the session ID to it */
3469       if (gst_structure_has_name (s, "application/x-rtp-source-sdes")) {
3470         GstRtpBinSession *sess;
3471
3472         /* find the session we set it as object data */
3473         sess = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
3474             "GstRTPBin.session");
3475
3476         if (G_LIKELY (sess)) {
3477           message = gst_message_make_writable (message);
3478           s = gst_message_get_structure (message);
3479           gst_structure_set ((GstStructure *) s, "session", G_TYPE_UINT,
3480               sess->id, NULL);
3481         }
3482       }
3483       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
3484       break;
3485     }
3486     case GST_MESSAGE_BUFFERING:
3487     {
3488       gint percent;
3489       gint min_percent = 100;
3490       GSList *sessions, *streams;
3491       GstRtpBinStream *stream;
3492       gboolean change = FALSE, active = FALSE;
3493       GstClockTime min_out_time;
3494       GstBufferingMode mode;
3495       gint avg_in, avg_out;
3496       gint64 buffering_left;
3497
3498       gst_message_parse_buffering (message, &percent);
3499       gst_message_parse_buffering_stats (message, &mode, &avg_in, &avg_out,
3500           &buffering_left);
3501
3502       stream =
3503           g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
3504           "GstRTPBin.stream");
3505
3506       GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
3507
3508       /* get the stream */
3509       if (G_LIKELY (stream)) {
3510         GST_RTP_BIN_LOCK (rtpbin);
3511         /* fill in the percent */
3512         stream->percent = percent;
3513
3514         /* calculate the min value for all streams */
3515         for (sessions = rtpbin->sessions; sessions;
3516             sessions = g_slist_next (sessions)) {
3517           GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3518
3519           GST_RTP_SESSION_LOCK (session);
3520           if (session->streams) {
3521             for (streams = session->streams; streams;
3522                 streams = g_slist_next (streams)) {
3523               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
3524
3525               GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
3526                   stream->percent);
3527
3528               /* find min percent */
3529               if (min_percent > stream->percent)
3530                 min_percent = stream->percent;
3531             }
3532           } else {
3533             GST_INFO_OBJECT (bin,
3534                 "session has no streams, setting min_percent to 0");
3535             min_percent = 0;
3536           }
3537           GST_RTP_SESSION_UNLOCK (session);
3538         }
3539         GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
3540
3541         if (rtpbin->buffering) {
3542           if (min_percent == 100) {
3543             rtpbin->buffering = FALSE;
3544             active = TRUE;
3545             change = TRUE;
3546           }
3547         } else {
3548           if (min_percent < 100) {
3549             /* pause the streams */
3550             rtpbin->buffering = TRUE;
3551             active = FALSE;
3552             change = TRUE;
3553           }
3554         }
3555         GST_RTP_BIN_UNLOCK (rtpbin);
3556
3557         gst_message_unref (message);
3558
3559         /* make a new buffering message with the min value */
3560         message =
3561             gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
3562         gst_message_set_buffering_stats (message, mode, avg_in, avg_out,
3563             buffering_left);
3564
3565         if (G_UNLIKELY (change)) {
3566           GstClock *clock;
3567           guint64 running_time = 0;
3568           guint64 offset = 0;
3569
3570           /* figure out the running time when we have a clock */
3571           if (G_LIKELY ((clock =
3572                       gst_element_get_clock (GST_ELEMENT_CAST (bin))))) {
3573             guint64 now, base_time;
3574
3575             now = gst_clock_get_time (clock);
3576             base_time = gst_element_get_base_time (GST_ELEMENT_CAST (bin));
3577             running_time = now - base_time;
3578             gst_object_unref (clock);
3579           }
3580           GST_DEBUG_OBJECT (bin,
3581               "running time now %" GST_TIME_FORMAT,
3582               GST_TIME_ARGS (running_time));
3583
3584           GST_RTP_BIN_LOCK (rtpbin);
3585
3586           /* when we reactivate, calculate the offsets so that all streams have
3587            * an output time that is at least as big as the running_time */
3588           offset = 0;
3589           if (active) {
3590             if (running_time > rtpbin->buffer_start) {
3591               offset = running_time - rtpbin->buffer_start;
3592               if (offset >= rtpbin->latency_ns)
3593                 offset -= rtpbin->latency_ns;
3594               else
3595                 offset = 0;
3596             }
3597           }
3598
3599           /* pause all streams */
3600           min_out_time = -1;
3601           for (sessions = rtpbin->sessions; sessions;
3602               sessions = g_slist_next (sessions)) {
3603             GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3604
3605             GST_RTP_SESSION_LOCK (session);
3606             for (streams = session->streams; streams;
3607                 streams = g_slist_next (streams)) {
3608               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
3609               GstElement *element = stream->buffer;
3610               guint64 last_out = -1;
3611
3612               if (g_signal_lookup ("set-active", G_OBJECT_TYPE (element)) != 0) {
3613                 g_signal_emit_by_name (element, "set-active", active, offset,
3614                     &last_out);
3615               }
3616
3617               if (!active) {
3618                 g_object_get (element, "percent", &stream->percent, NULL);
3619
3620                 if (last_out == -1)
3621                   last_out = 0;
3622                 if (min_out_time == -1 || last_out < min_out_time)
3623                   min_out_time = last_out;
3624               }
3625
3626               GST_DEBUG_OBJECT (bin,
3627                   "setting %p to %d, offset %" GST_TIME_FORMAT ", last %"
3628                   GST_TIME_FORMAT ", percent %d", element, active,
3629                   GST_TIME_ARGS (offset), GST_TIME_ARGS (last_out),
3630                   stream->percent);
3631             }
3632             GST_RTP_SESSION_UNLOCK (session);
3633           }
3634           GST_DEBUG_OBJECT (bin,
3635               "min out time %" GST_TIME_FORMAT, GST_TIME_ARGS (min_out_time));
3636
3637           /* the buffer_start is the min out time of all paused jitterbuffers */
3638           if (!active)
3639             rtpbin->buffer_start = min_out_time;
3640
3641           GST_RTP_BIN_UNLOCK (rtpbin);
3642         }
3643       }
3644       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
3645       break;
3646     }
3647     default:
3648     {
3649       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
3650       break;
3651     }
3652   }
3653 }
3654
3655 static GstStateChangeReturn
3656 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
3657 {
3658   GstStateChangeReturn res;
3659   GstRtpBin *rtpbin;
3660   GstRtpBinPrivate *priv;
3661
3662   rtpbin = GST_RTP_BIN (element);
3663   priv = rtpbin->priv;
3664
3665   switch (transition) {
3666     case GST_STATE_CHANGE_NULL_TO_READY:
3667       break;
3668     case GST_STATE_CHANGE_READY_TO_PAUSED:
3669       priv->last_ntpnstime = 0;
3670       GST_LOG_OBJECT (rtpbin, "clearing shutdown flag");
3671       g_atomic_int_set (&priv->shutdown, 0);
3672       break;
3673     case GST_STATE_CHANGE_PAUSED_TO_READY:
3674       GST_LOG_OBJECT (rtpbin, "setting shutdown flag");
3675       g_atomic_int_set (&priv->shutdown, 1);
3676       /* wait for all callbacks to end by taking the lock. No new callbacks will
3677        * be able to happen as we set the shutdown flag. */
3678       GST_RTP_BIN_DYN_LOCK (rtpbin);
3679       GST_LOG_OBJECT (rtpbin, "dynamic lock taken, we can continue shutdown");
3680       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
3681       break;
3682     default:
3683       break;
3684   }
3685
3686   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3687
3688   switch (transition) {
3689     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3690       break;
3691     case GST_STATE_CHANGE_PAUSED_TO_READY:
3692       break;
3693     case GST_STATE_CHANGE_READY_TO_NULL:
3694       break;
3695     default:
3696       break;
3697   }
3698   return res;
3699 }
3700
3701 static GstElement *
3702 session_request_element_full (GstRtpBinSession * session, guint signal,
3703     guint ssrc, guint8 pt)
3704 {
3705   GstElement *element = NULL;
3706   GstRtpBin *bin = session->bin;
3707
3708   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, ssrc, pt,
3709       &element);
3710
3711   if (element) {
3712     if (!bin_manage_element (bin, element))
3713       goto manage_failed;
3714     session->elements = g_slist_prepend (session->elements, element);
3715   }
3716   return element;
3717
3718   /* ERRORS */
3719 manage_failed:
3720   {
3721     GST_WARNING_OBJECT (bin, "unable to manage element");
3722     gst_object_unref (element);
3723     return NULL;
3724   }
3725 }
3726
3727 static GstElement *
3728 session_request_element (GstRtpBinSession * session, guint signal)
3729 {
3730   GstElement *element = NULL;
3731   GstRtpBin *bin = session->bin;
3732
3733   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, &element);
3734
3735   if (element) {
3736     if (!bin_manage_element (bin, element))
3737       goto manage_failed;
3738     session->elements = g_slist_prepend (session->elements, element);
3739   }
3740   return element;
3741
3742   /* ERRORS */
3743 manage_failed:
3744   {
3745     GST_WARNING_OBJECT (bin, "unable to manage element");
3746     gst_object_unref (element);
3747     return NULL;
3748   }
3749 }
3750
3751 static gboolean
3752 copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
3753 {
3754   GstPad *gpad = GST_PAD_CAST (user_data);
3755
3756   GST_DEBUG_OBJECT (gpad, "store sticky event %" GST_PTR_FORMAT, *event);
3757   gst_pad_store_sticky_event (gpad, *event);
3758
3759   return TRUE;
3760 }
3761
3762 static gboolean
3763 ensure_early_fec_decoder (GstRtpBin * rtpbin, GstRtpBinSession * session)
3764 {
3765   const gchar *factory;
3766   gchar *sess_id_str;
3767
3768   if (session->early_fec_decoder)
3769     goto done;
3770
3771   sess_id_str = g_strdup_printf ("%u", session->id);
3772   factory = gst_structure_get_string (rtpbin->fec_decoders, sess_id_str);
3773   g_free (sess_id_str);
3774
3775   /* First try the property */
3776   if (factory) {
3777     GError *err = NULL;
3778
3779     session->early_fec_decoder =
3780         gst_parse_bin_from_description_full (factory, TRUE, NULL,
3781         GST_PARSE_FLAG_NO_SINGLE_ELEMENT_BINS | GST_PARSE_FLAG_FATAL_ERRORS,
3782         &err);
3783     if (!session->early_fec_decoder) {
3784       GST_ERROR_OBJECT (rtpbin, "Failed to build decoder from factory: %s",
3785           err->message);
3786     }
3787
3788     bin_manage_element (session->bin, session->early_fec_decoder);
3789     session->elements =
3790         g_slist_prepend (session->elements, session->early_fec_decoder);
3791     GST_INFO_OBJECT (rtpbin, "Built FEC decoder: %" GST_PTR_FORMAT
3792         " for session %u", session->early_fec_decoder, session->id);
3793   }
3794
3795   /* Do not fallback to the signal as the signal expects a fec decoder to
3796    * be placed at a different place in the pipeline */
3797
3798 done:
3799   return session->early_fec_decoder != NULL;
3800 }
3801
3802 static void
3803 expose_recv_src_pad (GstRtpBin * rtpbin, GstPad * pad, GstRtpBinStream * stream,
3804     guint8 pt)
3805 {
3806   GstElementClass *klass;
3807   GstPadTemplate *templ;
3808   gchar *padname;
3809   GstPad *gpad;
3810
3811   gst_object_ref (pad);
3812
3813   if (stream->session->storage) {
3814     /* First try the legacy signal, with no ssrc and pt as parameters.
3815      * This will likely cause issues for the BUNDLE case. */
3816     GstElement *fec_decoder =
3817         session_request_element (stream->session, SIGNAL_REQUEST_FEC_DECODER);
3818
3819     /* Now try the new signal, where the application can provide a FEC
3820      * decoder according to ssrc and pt. */
3821     if (!fec_decoder) {
3822       fec_decoder =
3823           session_request_element_full (stream->session,
3824           SIGNAL_REQUEST_FEC_DECODER_FULL, stream->ssrc, pt);
3825     }
3826
3827     if (fec_decoder) {
3828       GstPad *sinkpad, *srcpad;
3829       GstPadLinkReturn ret;
3830
3831       sinkpad = gst_element_get_static_pad (fec_decoder, "sink");
3832
3833       if (!sinkpad)
3834         goto fec_decoder_sink_failed;
3835
3836       ret = gst_pad_link (pad, sinkpad);
3837       gst_object_unref (sinkpad);
3838
3839       if (ret != GST_PAD_LINK_OK)
3840         goto fec_decoder_link_failed;
3841
3842       srcpad = gst_element_get_static_pad (fec_decoder, "src");
3843
3844       if (!srcpad)
3845         goto fec_decoder_src_failed;
3846
3847       gst_pad_sticky_events_foreach (pad, copy_sticky_events, srcpad);
3848       gst_object_unref (pad);
3849       pad = srcpad;
3850     }
3851   }
3852
3853   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
3854
3855   /* ghost the pad to the parent */
3856   klass = GST_ELEMENT_GET_CLASS (rtpbin);
3857   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
3858   padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
3859       stream->session->id, stream->ssrc, pt);
3860   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
3861   g_free (padname);
3862   g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", gpad);
3863
3864   gst_pad_set_active (gpad, TRUE);
3865   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
3866
3867   gst_pad_sticky_events_foreach (pad, copy_sticky_events, gpad);
3868   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
3869
3870 done:
3871   gst_object_unref (pad);
3872
3873   return;
3874
3875 shutdown:
3876   {
3877     GST_DEBUG ("ignoring, we are shutting down");
3878     goto done;
3879   }
3880 fec_decoder_sink_failed:
3881   {
3882     g_warning ("rtpbin: failed to get fec encoder sink pad for session %u",
3883         stream->session->id);
3884     goto done;
3885   }
3886 fec_decoder_src_failed:
3887   {
3888     g_warning ("rtpbin: failed to get fec encoder src pad for session %u",
3889         stream->session->id);
3890     goto done;
3891   }
3892 fec_decoder_link_failed:
3893   {
3894     g_warning ("rtpbin: failed to link fec decoder for session %u",
3895         stream->session->id);
3896     goto done;
3897   }
3898 }
3899
3900 /* a new pad (SSRC) was created in @session. This signal is emitted from the
3901  * payload demuxer. */
3902 static void
3903 new_payload_found (GstElement * element, guint pt, GstPad * pad,
3904     GstRtpBinStream * stream)
3905 {
3906   GstRtpBin *rtpbin;
3907
3908   rtpbin = stream->bin;
3909
3910   GST_DEBUG_OBJECT (rtpbin, "new payload pad %u", pt);
3911
3912   expose_recv_src_pad (rtpbin, pad, stream, pt);
3913 }
3914
3915 static void
3916 payload_pad_removed (GstElement * element, GstPad * pad,
3917     GstRtpBinStream * stream)
3918 {
3919   GstRtpBin *rtpbin;
3920   GstPad *gpad;
3921
3922   rtpbin = stream->bin;
3923
3924   GST_DEBUG ("payload pad removed");
3925
3926   GST_RTP_BIN_DYN_LOCK (rtpbin);
3927   if ((gpad = g_object_get_data (G_OBJECT (pad), "GstRTPBin.ghostpad"))) {
3928     g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", NULL);
3929
3930     gst_pad_set_active (gpad, FALSE);
3931     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), gpad);
3932   }
3933   GST_RTP_BIN_DYN_UNLOCK (rtpbin);
3934 }
3935
3936 static GstCaps *
3937 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
3938 {
3939   GstRtpBin *rtpbin;
3940   GstCaps *caps;
3941
3942   rtpbin = session->bin;
3943
3944   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %u in session %u", pt,
3945       session->id);
3946
3947   caps = get_pt_map (session, pt);
3948   if (!caps)
3949     goto no_caps;
3950
3951   return caps;
3952
3953   /* ERRORS */
3954 no_caps:
3955   {
3956     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
3957     return NULL;
3958   }
3959 }
3960
3961 static GstCaps *
3962 ptdemux_pt_map_requested (GstElement * element, guint pt,
3963     GstRtpBinSession * session)
3964 {
3965   GstCaps *ret = pt_map_requested (element, pt, session);
3966
3967   if (ret && gst_caps_get_size (ret) == 1) {
3968     const GstStructure *s = gst_caps_get_structure (ret, 0);
3969     gboolean is_fec;
3970
3971     if (gst_structure_get_boolean (s, "is-fec", &is_fec) && is_fec) {
3972       GValue v = G_VALUE_INIT;
3973       GValue v2 = G_VALUE_INIT;
3974
3975       GST_INFO_OBJECT (session->bin, "Will ignore FEC pt %u in session %u", pt,
3976           session->id);
3977       g_value_init (&v, GST_TYPE_ARRAY);
3978       g_value_init (&v2, G_TYPE_INT);
3979       g_object_get_property (G_OBJECT (element), "ignored-payload-types", &v);
3980       g_value_set_int (&v2, pt);
3981       gst_value_array_append_value (&v, &v2);
3982       g_value_unset (&v2);
3983       g_object_set_property (G_OBJECT (element), "ignored-payload-types", &v);
3984       g_value_unset (&v);
3985     }
3986   }
3987
3988   return ret;
3989 }
3990
3991 static void
3992 payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session)
3993 {
3994   GST_DEBUG_OBJECT (session->bin,
3995       "emitting signal for pt type changed to %u in session %u", pt,
3996       session->id);
3997
3998   g_signal_emit (session->bin, gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE],
3999       0, session->id, pt);
4000 }
4001
4002 /* emitted when caps changed for the session */
4003 static void
4004 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
4005 {
4006   GstRtpBin *bin;
4007   GstCaps *caps;
4008   gint payload;
4009   const GstStructure *s;
4010
4011   bin = session->bin;
4012
4013   g_object_get (pad, "caps", &caps, NULL);
4014
4015   if (caps == NULL)
4016     return;
4017
4018   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
4019
4020   s = gst_caps_get_structure (caps, 0);
4021
4022   /* get payload, finish when it's not there */
4023   if (!gst_structure_get_int (s, "payload", &payload)) {
4024     gst_caps_unref (caps);
4025     return;
4026   }
4027
4028   GST_RTP_SESSION_LOCK (session);
4029   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
4030   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
4031   GST_RTP_SESSION_UNLOCK (session);
4032 }
4033
4034 /* a new pad (SSRC) was created in @session */
4035 static void
4036 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
4037     GstRtpBinSession * session)
4038 {
4039   GstRtpBin *rtpbin;
4040   GstRtpBinStream *stream;
4041   GstPad *sinkpad, *srcpad;
4042   gchar *padname;
4043
4044   rtpbin = session->bin;
4045
4046   GST_DEBUG_OBJECT (rtpbin, "new SSRC pad %08x, %s:%s", ssrc,
4047       GST_DEBUG_PAD_NAME (pad));
4048
4049   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
4050
4051   GST_RTP_SESSION_LOCK (session);
4052
4053   /* create new stream */
4054   stream = create_stream (session, ssrc);
4055   if (!stream)
4056     goto no_stream;
4057
4058   /* get pad and link */
4059   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP");
4060   padname = g_strdup_printf ("src_%u", ssrc);
4061   srcpad = gst_element_get_static_pad (element, padname);
4062   g_free (padname);
4063
4064   if (session->early_fec_decoder) {
4065     GST_DEBUG_OBJECT (rtpbin, "linking fec decoder");
4066     sinkpad = gst_element_get_static_pad (session->early_fec_decoder, "sink");
4067     gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
4068     gst_object_unref (sinkpad);
4069     gst_object_unref (srcpad);
4070     srcpad = gst_element_get_static_pad (session->early_fec_decoder, "src");
4071   }
4072
4073   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
4074   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
4075   gst_object_unref (sinkpad);
4076   gst_object_unref (srcpad);
4077
4078   sinkpad = gst_element_request_pad_simple (stream->buffer, "sink_rtcp");
4079   if (sinkpad) {
4080     GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
4081     padname = g_strdup_printf ("rtcp_src_%u", ssrc);
4082     srcpad = gst_element_get_static_pad (element, padname);
4083     g_free (padname);
4084     gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
4085     gst_object_unref (sinkpad);
4086     gst_object_unref (srcpad);
4087   }
4088
4089   if (g_signal_lookup ("handle-sync", G_OBJECT_TYPE (stream->buffer)) != 0) {
4090     /* connect to the RTCP sync signal from the jitterbuffer */
4091     GST_DEBUG_OBJECT (rtpbin, "connecting sync signal");
4092     stream->buffer_handlesync_sig = g_signal_connect (stream->buffer,
4093         "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream);
4094   }
4095
4096   if (stream->demux) {
4097     /* connect to the new-pad signal of the payload demuxer, this will expose the
4098      * new pad by ghosting it. */
4099     stream->demux_newpad_sig = g_signal_connect (stream->demux,
4100         "new-payload-type", (GCallback) new_payload_found, stream);
4101     stream->demux_padremoved_sig = g_signal_connect (stream->demux,
4102         "pad-removed", (GCallback) payload_pad_removed, stream);
4103
4104     /* connect to the request-pt-map signal. This signal will be emitted by the
4105      * demuxer so that it can apply a proper caps on the buffers for the
4106      * depayloaders. */
4107     stream->demux_ptreq_sig = g_signal_connect (stream->demux,
4108         "request-pt-map", (GCallback) ptdemux_pt_map_requested, session);
4109     /* connect to the  signal so it can be forwarded. */
4110     stream->demux_ptchange_sig = g_signal_connect (stream->demux,
4111         "payload-type-change", (GCallback) payload_type_change, session);
4112
4113     GST_RTP_SESSION_UNLOCK (session);
4114     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
4115   } else {
4116     /* add rtpjitterbuffer src pad to pads */
4117     GstPad *pad;
4118
4119     pad = gst_element_get_static_pad (stream->buffer, "src");
4120
4121     GST_RTP_SESSION_UNLOCK (session);
4122     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
4123
4124     expose_recv_src_pad (rtpbin, pad, stream, 255);
4125
4126     gst_object_unref (pad);
4127   }
4128
4129   return;
4130
4131   /* ERRORS */
4132 shutdown:
4133   {
4134     GST_DEBUG_OBJECT (rtpbin, "we are shutting down");
4135     return;
4136   }
4137 no_stream:
4138   {
4139     GST_RTP_SESSION_UNLOCK (session);
4140     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
4141     GST_DEBUG_OBJECT (rtpbin, "could not create stream");
4142     return;
4143   }
4144 }
4145
4146 static GstPad *
4147 complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session)
4148 {
4149   guint sessid = session->id;
4150   GstPad *recv_rtp_sink;
4151   GstElement *decoder;
4152
4153   g_assert (!session->recv_rtp_sink);
4154
4155   /* get recv_rtp pad and store */
4156   session->recv_rtp_sink =
4157       gst_element_request_pad_simple (session->session, "recv_rtp_sink");
4158   if (session->recv_rtp_sink == NULL)
4159     goto pad_failed;
4160
4161   g_signal_connect (session->recv_rtp_sink, "notify::caps",
4162       (GCallback) caps_changed, session);
4163
4164   GST_DEBUG_OBJECT (rtpbin, "requesting RTP decoder");
4165   decoder = session_request_element (session, SIGNAL_REQUEST_RTP_DECODER);
4166   if (decoder) {
4167     GstPad *decsrc, *decsink;
4168     GstPadLinkReturn ret;
4169
4170     GST_DEBUG_OBJECT (rtpbin, "linking RTP decoder");
4171     decsink = gst_element_get_static_pad (decoder, "rtp_sink");
4172     if (decsink == NULL)
4173       goto dec_sink_failed;
4174
4175     recv_rtp_sink = decsink;
4176
4177     decsrc = gst_element_get_static_pad (decoder, "rtp_src");
4178     if (decsrc == NULL)
4179       goto dec_src_failed;
4180
4181     ret = gst_pad_link (decsrc, session->recv_rtp_sink);
4182
4183     gst_object_unref (decsrc);
4184
4185     if (ret != GST_PAD_LINK_OK)
4186       goto dec_link_failed;
4187
4188   } else {
4189     GST_DEBUG_OBJECT (rtpbin, "no RTP decoder given");
4190     recv_rtp_sink = gst_object_ref (session->recv_rtp_sink);
4191   }
4192
4193   return recv_rtp_sink;
4194
4195   /* ERRORS */
4196 pad_failed:
4197   {
4198     g_warning ("rtpbin: failed to get session recv_rtp_sink pad");
4199     return NULL;
4200   }
4201 dec_sink_failed:
4202   {
4203     g_warning ("rtpbin: failed to get decoder sink pad for session %u", sessid);
4204     return NULL;
4205   }
4206 dec_src_failed:
4207   {
4208     g_warning ("rtpbin: failed to get decoder src pad for session %u", sessid);
4209     gst_object_unref (recv_rtp_sink);
4210     return NULL;
4211   }
4212 dec_link_failed:
4213   {
4214     g_warning ("rtpbin: failed to link rtp decoder for session %u", sessid);
4215     gst_object_unref (recv_rtp_sink);
4216     return NULL;
4217   }
4218 }
4219
4220 static void
4221 complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session,
4222     guint sessid)
4223 {
4224   GstElement *aux;
4225   GstPad *recv_rtp_src;
4226
4227   g_assert (!session->recv_rtp_src);
4228
4229   session->recv_rtp_src =
4230       gst_element_get_static_pad (session->session, "recv_rtp_src");
4231   if (session->recv_rtp_src == NULL)
4232     goto pad_failed;
4233
4234   /* find out if we need AUX elements */
4235   aux = session_request_element (session, SIGNAL_REQUEST_AUX_RECEIVER);
4236   if (aux) {
4237     gchar *pname;
4238     GstPad *auxsink;
4239     GstPadLinkReturn ret;
4240
4241     GST_DEBUG_OBJECT (rtpbin, "linking AUX receiver");
4242
4243     pname = g_strdup_printf ("sink_%u", sessid);
4244     auxsink = gst_element_get_static_pad (aux, pname);
4245     g_free (pname);
4246     if (auxsink == NULL)
4247       goto aux_sink_failed;
4248
4249     ret = gst_pad_link (session->recv_rtp_src, auxsink);
4250     gst_object_unref (auxsink);
4251     if (ret != GST_PAD_LINK_OK)
4252       goto aux_link_failed;
4253
4254     /* this can be NULL when this AUX element is not to be linked any further */
4255     pname = g_strdup_printf ("src_%u", sessid);
4256     recv_rtp_src = gst_element_get_static_pad (aux, pname);
4257     g_free (pname);
4258   } else {
4259     recv_rtp_src = gst_object_ref (session->recv_rtp_src);
4260   }
4261
4262   /* Add a storage element if needed */
4263   if (recv_rtp_src && session->storage) {
4264     GstPadLinkReturn ret;
4265     GstPad *sinkpad = gst_element_get_static_pad (session->storage, "sink");
4266
4267     ret = gst_pad_link (recv_rtp_src, sinkpad);
4268
4269     gst_object_unref (sinkpad);
4270     gst_object_unref (recv_rtp_src);
4271
4272     if (ret != GST_PAD_LINK_OK)
4273       goto storage_link_failed;
4274
4275     recv_rtp_src = gst_element_get_static_pad (session->storage, "src");
4276   }
4277
4278   if (recv_rtp_src) {
4279     GstPad *sinkdpad;
4280
4281     GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
4282     sinkdpad = gst_element_get_static_pad (session->demux, "sink");
4283     GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
4284     gst_pad_link_full (recv_rtp_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
4285     gst_object_unref (sinkdpad);
4286     gst_object_unref (recv_rtp_src);
4287
4288     /* connect to the new-ssrc-pad signal of the SSRC demuxer */
4289     session->demux_newpad_sig = g_signal_connect (session->demux,
4290         "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
4291     session->demux_padremoved_sig = g_signal_connect (session->demux,
4292         "removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session);
4293   }
4294
4295   return;
4296
4297 pad_failed:
4298   {
4299     g_warning ("rtpbin: failed to get session recv_rtp_src pad");
4300     return;
4301   }
4302 aux_sink_failed:
4303   {
4304     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
4305     return;
4306   }
4307 aux_link_failed:
4308   {
4309     g_warning ("rtpbin: failed to link AUX pad to session %u", sessid);
4310     return;
4311   }
4312 storage_link_failed:
4313   {
4314     g_warning ("rtpbin: failed to link storage");
4315     return;
4316   }
4317 }
4318
4319 /* Create a pad for receiving RTP for the session in @name. Must be called with
4320  * RTP_BIN_LOCK.
4321  */
4322 static GstPad *
4323 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
4324 {
4325   guint sessid;
4326   GstRtpBinSession *session;
4327   GstPad *recv_rtp_sink;
4328
4329   /* first get the session number */
4330   if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1)
4331     goto no_name;
4332
4333   GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
4334
4335   /* get or create session */
4336   session = find_session_by_id (rtpbin, sessid);
4337   if (!session) {
4338     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
4339     /* create session now */
4340     session = create_session (rtpbin, sessid);
4341     if (session == NULL)
4342       goto create_error;
4343   }
4344
4345   /* check if pad was requested */
4346   if (session->recv_rtp_sink_ghost != NULL)
4347     return session->recv_rtp_sink_ghost;
4348
4349   /* setup the session sink pad */
4350   recv_rtp_sink = complete_session_sink (rtpbin, session);
4351   if (!recv_rtp_sink)
4352     goto session_sink_failed;
4353
4354   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
4355   session->recv_rtp_sink_ghost =
4356       gst_ghost_pad_new_from_template (name, recv_rtp_sink, templ);
4357   gst_object_unref (recv_rtp_sink);
4358   gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE);
4359   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost);
4360
4361   complete_session_receiver (rtpbin, session, sessid);
4362
4363   return session->recv_rtp_sink_ghost;
4364
4365   /* ERRORS */
4366 no_name:
4367   {
4368     g_warning ("rtpbin: cannot find session id for pad: %s",
4369         GST_STR_NULL (name));
4370     return NULL;
4371   }
4372 create_error:
4373   {
4374     /* create_session already warned */
4375     return NULL;
4376   }
4377 session_sink_failed:
4378   {
4379     /* warning already done */
4380     return NULL;
4381   }
4382 }
4383
4384 static void
4385 remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
4386 {
4387   if (session->demux_newpad_sig) {
4388     g_signal_handler_disconnect (session->demux, session->demux_newpad_sig);
4389     session->demux_newpad_sig = 0;
4390   }
4391   if (session->demux_padremoved_sig) {
4392     g_signal_handler_disconnect (session->demux, session->demux_padremoved_sig);
4393     session->demux_padremoved_sig = 0;
4394   }
4395   if (session->recv_rtp_src) {
4396     gst_object_unref (session->recv_rtp_src);
4397     session->recv_rtp_src = NULL;
4398   }
4399   if (session->recv_rtp_sink) {
4400     gst_element_release_request_pad (session->session, session->recv_rtp_sink);
4401     gst_object_unref (session->recv_rtp_sink);
4402     session->recv_rtp_sink = NULL;
4403   }
4404   if (session->recv_rtp_sink_ghost) {
4405     gst_pad_set_active (session->recv_rtp_sink_ghost, FALSE);
4406     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
4407         session->recv_rtp_sink_ghost);
4408     session->recv_rtp_sink_ghost = NULL;
4409   }
4410 }
4411
4412 static gint
4413 fec_sinkpad_find (const GValue * item, gchar * padname)
4414 {
4415   GstPad *pad = g_value_get_object (item);
4416   return g_strcmp0 (GST_PAD_NAME (pad), padname);
4417 }
4418
4419 static GstPad *
4420 complete_session_fec (GstRtpBin * rtpbin, GstRtpBinSession * session,
4421     guint fec_idx)
4422 {
4423   gboolean have_static_pad;
4424   gchar *padname;
4425
4426   GstPad *ret;
4427   GstIterator *it;
4428   GValue item = { 0, };
4429
4430   if (!ensure_early_fec_decoder (rtpbin, session))
4431     goto no_decoder;
4432
4433   padname = g_strdup_printf ("fec_%u", fec_idx);
4434
4435   GST_DEBUG_OBJECT (rtpbin, "getting FEC sink pad %s", padname);
4436
4437   /* First try to find the decoder static pad that matches the padname */
4438   it = gst_element_iterate_sink_pads (session->early_fec_decoder);
4439   have_static_pad =
4440       gst_iterator_find_custom (it, (GCompareFunc) fec_sinkpad_find, &item,
4441       padname);
4442
4443   if (have_static_pad) {
4444     ret = g_value_get_object (&item);
4445     gst_object_ref (ret);
4446     g_value_unset (&item);
4447   } else {
4448     ret = gst_element_request_pad_simple (session->early_fec_decoder, padname);
4449   }
4450
4451   g_free (padname);
4452   gst_iterator_free (it);
4453
4454   if (ret == NULL)
4455     goto pad_failed;
4456
4457   session->recv_fec_sinks = g_slist_prepend (session->recv_fec_sinks, ret);
4458
4459   return ret;
4460
4461 pad_failed:
4462   {
4463     g_warning ("rtpbin: failed to get decoder fec pad");
4464     return NULL;
4465   }
4466 no_decoder:
4467   {
4468     g_warning ("rtpbin: failed to build FEC decoder for session %u",
4469         session->id);
4470     return NULL;
4471   }
4472 }
4473
4474 static GstPad *
4475 complete_session_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session,
4476     guint sessid)
4477 {
4478   GstElement *decoder;
4479   GstPad *sinkdpad;
4480   GstPad *decsink = NULL;
4481
4482   /* get recv_rtp pad and store */
4483   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
4484   session->recv_rtcp_sink =
4485       gst_element_request_pad_simple (session->session, "recv_rtcp_sink");
4486   if (session->recv_rtcp_sink == NULL)
4487     goto pad_failed;
4488
4489   GST_DEBUG_OBJECT (rtpbin, "getting RTCP decoder");
4490   decoder = session_request_element (session, SIGNAL_REQUEST_RTCP_DECODER);
4491   if (decoder) {
4492     GstPad *decsrc;
4493     GstPadLinkReturn ret;
4494
4495     GST_DEBUG_OBJECT (rtpbin, "linking RTCP decoder");
4496     decsink = gst_element_get_static_pad (decoder, "rtcp_sink");
4497     decsrc = gst_element_get_static_pad (decoder, "rtcp_src");
4498
4499     if (decsink == NULL)
4500       goto dec_sink_failed;
4501
4502     if (decsrc == NULL)
4503       goto dec_src_failed;
4504
4505     ret = gst_pad_link (decsrc, session->recv_rtcp_sink);
4506
4507     gst_object_unref (decsrc);
4508
4509     if (ret != GST_PAD_LINK_OK)
4510       goto dec_link_failed;
4511   } else {
4512     GST_DEBUG_OBJECT (rtpbin, "no RTCP decoder given");
4513     decsink = gst_object_ref (session->recv_rtcp_sink);
4514   }
4515
4516   /* get srcpad, link to SSRCDemux */
4517   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
4518   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
4519   if (session->sync_src == NULL)
4520     goto src_pad_failed;
4521
4522   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
4523   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
4524   gst_pad_link_full (session->sync_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
4525   gst_object_unref (sinkdpad);
4526
4527   return decsink;
4528
4529 pad_failed:
4530   {
4531     g_warning ("rtpbin: failed to get session rtcp_sink pad");
4532     return NULL;
4533   }
4534 dec_sink_failed:
4535   {
4536     g_warning ("rtpbin: failed to get decoder sink pad for session %u", sessid);
4537     return NULL;
4538   }
4539 dec_src_failed:
4540   {
4541     g_warning ("rtpbin: failed to get decoder src pad for session %u", sessid);
4542     goto cleanup;
4543   }
4544 dec_link_failed:
4545   {
4546     g_warning ("rtpbin: failed to link rtcp decoder for session %u", sessid);
4547     goto cleanup;
4548   }
4549 src_pad_failed:
4550   {
4551     g_warning ("rtpbin: failed to get session sync_src pad");
4552   }
4553
4554 cleanup:
4555   gst_object_unref (decsink);
4556   return NULL;
4557 }
4558
4559 /* Create a pad for receiving RTCP for the session in @name. Must be called with
4560  * RTP_BIN_LOCK.
4561  */
4562 static GstPad *
4563 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
4564     const gchar * name)
4565 {
4566   guint sessid;
4567   GstRtpBinSession *session;
4568   GstPad *decsink = NULL;
4569
4570   /* first get the session number */
4571   if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1)
4572     goto no_name;
4573
4574   GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
4575
4576   /* get or create the session */
4577   session = find_session_by_id (rtpbin, sessid);
4578   if (!session) {
4579     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
4580     /* create session now */
4581     session = create_session (rtpbin, sessid);
4582     if (session == NULL)
4583       goto create_error;
4584   }
4585
4586   /* check if pad was requested */
4587   if (session->recv_rtcp_sink_ghost != NULL)
4588     return session->recv_rtcp_sink_ghost;
4589
4590   decsink = complete_session_rtcp (rtpbin, session, sessid);
4591   if (!decsink)
4592     goto create_error;
4593
4594   session->recv_rtcp_sink_ghost =
4595       gst_ghost_pad_new_from_template (name, decsink, templ);
4596   gst_object_unref (decsink);
4597   gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE);
4598   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin),
4599       session->recv_rtcp_sink_ghost);
4600
4601   return session->recv_rtcp_sink_ghost;
4602
4603   /* ERRORS */
4604 no_name:
4605   {
4606     g_warning ("rtpbin: cannot find session id for pad: %s",
4607         GST_STR_NULL (name));
4608     return NULL;
4609   }
4610 create_error:
4611   {
4612     /* create_session already warned */
4613     return NULL;
4614   }
4615 }
4616
4617 static GstPad *
4618 create_recv_fec (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
4619 {
4620   guint sessid, fec_idx;
4621   GstRtpBinSession *session;
4622   GstPad *decsink = NULL;
4623   GstPad *ghost;
4624
4625   /* first get the session number */
4626   if (name == NULL
4627       || sscanf (name, "recv_fec_sink_%u_%u", &sessid, &fec_idx) != 2)
4628     goto no_name;
4629
4630   if (fec_idx > 1)
4631     goto invalid_idx;
4632
4633   GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
4634
4635   /* get or create the session */
4636   session = find_session_by_id (rtpbin, sessid);
4637   if (!session) {
4638     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
4639     /* create session now */
4640     session = create_session (rtpbin, sessid);
4641     if (session == NULL)
4642       goto create_error;
4643   }
4644
4645   decsink = complete_session_fec (rtpbin, session, fec_idx);
4646   if (!decsink)
4647     goto create_error;
4648
4649   ghost = gst_ghost_pad_new_from_template (name, decsink, templ);
4650   session->recv_fec_sink_ghosts =
4651       g_slist_prepend (session->recv_fec_sink_ghosts, ghost);
4652   gst_object_unref (decsink);
4653   gst_pad_set_active (ghost, TRUE);
4654   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), ghost);
4655
4656   return ghost;
4657
4658   /* ERRORS */
4659 no_name:
4660   {
4661     g_warning ("rtpbin: cannot find session id for pad: %s",
4662         GST_STR_NULL (name));
4663     return NULL;
4664   }
4665 invalid_idx:
4666   {
4667     g_warning ("rtpbin: invalid FEC index: %s", GST_STR_NULL (name));
4668     return NULL;
4669   }
4670 create_error:
4671   {
4672     /* create_session already warned */
4673     return NULL;
4674   }
4675 }
4676
4677 static void
4678 remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
4679 {
4680   if (session->recv_rtcp_sink_ghost) {
4681     gst_pad_set_active (session->recv_rtcp_sink_ghost, FALSE);
4682     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
4683         session->recv_rtcp_sink_ghost);
4684     session->recv_rtcp_sink_ghost = NULL;
4685   }
4686   if (session->sync_src) {
4687     /* releasing the request pad should also unref the sync pad */
4688     gst_object_unref (session->sync_src);
4689     session->sync_src = NULL;
4690   }
4691   if (session->recv_rtcp_sink) {
4692     gst_element_release_request_pad (session->session, session->recv_rtcp_sink);
4693     gst_object_unref (session->recv_rtcp_sink);
4694     session->recv_rtcp_sink = NULL;
4695   }
4696 }
4697
4698 static void
4699 remove_recv_fec_for_pad (GstRtpBin * rtpbin, GstRtpBinSession * session,
4700     GstPad * ghost)
4701 {
4702   GSList *item;
4703   GstPad *target;
4704
4705   target = gst_ghost_pad_get_target (GST_GHOST_PAD (ghost));
4706
4707   if (target) {
4708     item = g_slist_find (session->recv_fec_sinks, target);
4709     if (item) {
4710       GstPadTemplate *templ;
4711       GstPad *pad;
4712
4713       pad = item->data;
4714       templ = gst_pad_get_pad_template (pad);
4715
4716       if (GST_PAD_TEMPLATE_PRESENCE (templ) == GST_PAD_REQUEST) {
4717         GST_DEBUG_OBJECT (rtpbin,
4718             "Releasing FEC decoder pad %" GST_PTR_FORMAT, pad);
4719         gst_element_release_request_pad (session->early_fec_decoder, pad);
4720       } else {
4721         gst_object_unref (pad);
4722       }
4723
4724       session->recv_fec_sinks =
4725           g_slist_delete_link (session->recv_fec_sinks, item);
4726
4727       gst_object_unref (templ);
4728     }
4729     gst_object_unref (target);
4730   }
4731
4732   item = g_slist_find (session->recv_fec_sink_ghosts, ghost);
4733   if (item)
4734     session->recv_fec_sink_ghosts =
4735         g_slist_delete_link (session->recv_fec_sink_ghosts, item);
4736
4737   gst_pad_set_active (ghost, FALSE);
4738   gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), ghost);
4739 }
4740
4741 static void
4742 remove_recv_fec (GstRtpBin * rtpbin, GstRtpBinSession * session)
4743 {
4744   GSList *copy;
4745   GSList *tmp;
4746
4747   copy = g_slist_copy (session->recv_fec_sink_ghosts);
4748
4749   for (tmp = copy; tmp; tmp = tmp->next) {
4750     remove_recv_fec_for_pad (rtpbin, session, (GstPad *) tmp->data);
4751   }
4752
4753   g_slist_free (copy);
4754 }
4755
4756 static gboolean
4757 complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session)
4758 {
4759   gchar *gname;
4760   guint sessid = session->id;
4761   GstPad *send_rtp_src;
4762   GstElement *encoder;
4763   GstElementClass *klass;
4764   GstPadTemplate *templ;
4765   gboolean ret = FALSE;
4766
4767   /* get srcpad */
4768   send_rtp_src = gst_element_get_static_pad (session->session, "send_rtp_src");
4769
4770   if (send_rtp_src == NULL)
4771     goto no_srcpad;
4772
4773   GST_DEBUG_OBJECT (rtpbin, "getting RTP encoder");
4774   encoder = session_request_element (session, SIGNAL_REQUEST_RTP_ENCODER);
4775   if (encoder) {
4776     gchar *ename;
4777     GstPad *encsrc, *encsink;
4778     GstPadLinkReturn ret;
4779
4780     GST_DEBUG_OBJECT (rtpbin, "linking RTP encoder");
4781     ename = g_strdup_printf ("rtp_src_%u", sessid);
4782     encsrc = gst_element_get_static_pad (encoder, ename);
4783     g_free (ename);
4784
4785     if (encsrc == NULL)
4786       goto enc_src_failed;
4787
4788     ename = g_strdup_printf ("rtp_sink_%u", sessid);
4789     encsink = gst_element_get_static_pad (encoder, ename);
4790     g_free (ename);
4791     if (encsink == NULL)
4792       goto enc_sink_failed;
4793
4794     ret = gst_pad_link (send_rtp_src, encsink);
4795     gst_object_unref (encsink);
4796     gst_object_unref (send_rtp_src);
4797
4798     send_rtp_src = encsrc;
4799
4800     if (ret != GST_PAD_LINK_OK)
4801       goto enc_link_failed;
4802   } else {
4803     GST_DEBUG_OBJECT (rtpbin, "no RTP encoder given");
4804   }
4805
4806   /* ghost the new source pad */
4807   klass = GST_ELEMENT_GET_CLASS (rtpbin);
4808   gname = g_strdup_printf ("send_rtp_src_%u", sessid);
4809   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u");
4810   session->send_rtp_src_ghost =
4811       gst_ghost_pad_new_from_template (gname, send_rtp_src, templ);
4812   gst_pad_set_active (session->send_rtp_src_ghost, TRUE);
4813   gst_pad_sticky_events_foreach (send_rtp_src, copy_sticky_events,
4814       session->send_rtp_src_ghost);
4815   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost);
4816   g_free (gname);
4817
4818   ret = TRUE;
4819
4820 done:
4821   if (send_rtp_src)
4822     gst_object_unref (send_rtp_src);
4823
4824   return ret;
4825
4826   /* ERRORS */
4827 no_srcpad:
4828   {
4829     g_warning ("rtpbin: failed to get rtp source pad for session %u", sessid);
4830     goto done;
4831   }
4832 enc_src_failed:
4833   {
4834     g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT
4835         " src pad for session %u", encoder, sessid);
4836     goto done;
4837   }
4838 enc_sink_failed:
4839   {
4840     g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT
4841         " sink pad for session %u", encoder, sessid);
4842     goto done;
4843   }
4844 enc_link_failed:
4845   {
4846     g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u",
4847         encoder, sessid);
4848     goto done;
4849   }
4850 }
4851
4852 static gboolean
4853 setup_aux_sender_fold (const GValue * item, GValue * result, gpointer user_data)
4854 {
4855   GstPad *pad;
4856   gchar *name;
4857   guint sessid;
4858   GstRtpBinSession *session = user_data, *newsess;
4859   GstRtpBin *rtpbin = session->bin;
4860   GstPadLinkReturn ret;
4861
4862   pad = g_value_get_object (item);
4863   name = gst_pad_get_name (pad);
4864
4865   if (name == NULL || sscanf (name, "src_%u", &sessid) != 1)
4866     goto no_name;
4867
4868   g_free (name);
4869
4870   newsess = find_session_by_id (rtpbin, sessid);
4871   if (newsess == NULL) {
4872     /* create new session */
4873     newsess = create_session (rtpbin, sessid);
4874     if (newsess == NULL)
4875       goto create_error;
4876   } else if (newsess->send_rtp_sink != NULL)
4877     goto existing_session;
4878
4879   /* get send_rtp pad and store */
4880   newsess->send_rtp_sink =
4881       gst_element_request_pad_simple (newsess->session, "send_rtp_sink");
4882   if (newsess->send_rtp_sink == NULL)
4883     goto pad_failed;
4884
4885   ret = gst_pad_link (pad, newsess->send_rtp_sink);
4886   if (ret != GST_PAD_LINK_OK)
4887     goto aux_link_failed;
4888
4889   if (!complete_session_src (rtpbin, newsess))
4890     goto session_src_failed;
4891
4892   return TRUE;
4893
4894   /* ERRORS */
4895 no_name:
4896   {
4897     GST_WARNING ("ignoring invalid pad name %s", GST_STR_NULL (name));
4898     g_free (name);
4899     return TRUE;
4900   }
4901 create_error:
4902   {
4903     /* create_session already warned */
4904     return FALSE;
4905   }
4906 existing_session:
4907   {
4908     GST_DEBUG_OBJECT (rtpbin,
4909         "skipping src_%i setup, since it is already configured.", sessid);
4910     return TRUE;
4911   }
4912 pad_failed:
4913   {
4914     g_warning ("rtpbin: failed to get session pad for session %u", sessid);
4915     return FALSE;
4916   }
4917 aux_link_failed:
4918   {
4919     g_warning ("rtpbin: failed to link AUX for session %u", sessid);
4920     return FALSE;
4921   }
4922 session_src_failed:
4923   {
4924     g_warning ("rtpbin: failed to complete AUX for session %u", sessid);
4925     return FALSE;
4926   }
4927 }
4928
4929 static gboolean
4930 setup_aux_sender (GstRtpBin * rtpbin, GstRtpBinSession * session,
4931     GstElement * aux)
4932 {
4933   GstIterator *it;
4934   GValue result = { 0, };
4935   GstIteratorResult res;
4936
4937   it = gst_element_iterate_src_pads (aux);
4938   res = gst_iterator_fold (it, setup_aux_sender_fold, &result, session);
4939   gst_iterator_free (it);
4940
4941   return res == GST_ITERATOR_DONE;
4942 }
4943
4944 static void
4945 fec_encoder_add_pad_unlocked (GstPad * pad, GstRtpBinSession * session)
4946 {
4947   GstElementClass *klass;
4948   gchar *gname;
4949   GstPadTemplate *templ;
4950   guint fec_idx;
4951   GstPad *ghost;
4952
4953   if (sscanf (GST_PAD_NAME (pad), "fec_%u", &fec_idx) != 1) {
4954     GST_WARNING_OBJECT (session->bin,
4955         "FEC encoder added pad with name not matching fec_%%u (%s)",
4956         GST_PAD_NAME (pad));
4957     goto done;
4958   }
4959
4960   GST_INFO_OBJECT (session->bin, "FEC encoder for session %u exposed new pad",
4961       session->id);
4962
4963   klass = GST_ELEMENT_GET_CLASS (session->bin);
4964   gname = g_strdup_printf ("send_fec_src_%u_%u", session->id, fec_idx);
4965   templ = gst_element_class_get_pad_template (klass, "send_fec_src_%u_%u");
4966   ghost = gst_ghost_pad_new_from_template (gname, pad, templ);
4967   session->send_fec_src_ghosts =
4968       g_slist_prepend (session->send_fec_src_ghosts, ghost);
4969   gst_pad_set_active (ghost, TRUE);
4970   gst_pad_sticky_events_foreach (pad, copy_sticky_events, ghost);
4971   gst_element_add_pad (GST_ELEMENT (session->bin), ghost);
4972   g_free (gname);
4973
4974 done:
4975   return;
4976 }
4977
4978 static void
4979 fec_encoder_add_pad (GstPad * pad, GstRtpBinSession * session)
4980 {
4981   GST_RTP_BIN_LOCK (session->bin);
4982   fec_encoder_add_pad_unlocked (pad, session);
4983   GST_RTP_BIN_UNLOCK (session->bin);
4984 }
4985
4986 static gint
4987 fec_srcpad_iterator_filter (const GValue * item, GValue * unused)
4988 {
4989   guint fec_idx;
4990   GstPad *pad = g_value_get_object (item);
4991   GstPadTemplate *templ = gst_pad_get_pad_template (pad);
4992
4993   gint have_static_pad =
4994       (GST_PAD_TEMPLATE_PRESENCE (templ) == GST_PAD_ALWAYS) &&
4995       (sscanf (GST_PAD_NAME (pad), "fec_%u", &fec_idx) == 1);
4996
4997   gst_object_unref (templ);
4998
4999   /* return 0 to retain pad in filtered iterator */
5000   return !have_static_pad;
5001 }
5002
5003 static void
5004 fec_srcpad_iterator_foreach (const GValue * item, GstRtpBinSession * session)
5005 {
5006   GstPad *pad = g_value_get_object (item);
5007   fec_encoder_add_pad_unlocked (pad, session);
5008 }
5009
5010 static void
5011 fec_encoder_pad_added_cb (GstElement * encoder, GstPad * pad,
5012     GstRtpBinSession * session)
5013 {
5014   fec_encoder_add_pad (pad, session);
5015 }
5016
5017 static GstElement *
5018 request_fec_encoder (GstRtpBin * rtpbin, GstRtpBinSession * session,
5019     guint sessid)
5020 {
5021   GstElement *ret = NULL;
5022   const gchar *factory;
5023   gchar *sess_id_str;
5024
5025   sess_id_str = g_strdup_printf ("%u", sessid);
5026   factory = gst_structure_get_string (rtpbin->fec_encoders, sess_id_str);
5027   g_free (sess_id_str);
5028
5029   /* First try the property */
5030   if (factory) {
5031     GError *err = NULL;
5032
5033     ret =
5034         gst_parse_bin_from_description_full (factory, TRUE, NULL,
5035         GST_PARSE_FLAG_NO_SINGLE_ELEMENT_BINS | GST_PARSE_FLAG_FATAL_ERRORS,
5036         &err);
5037     if (!ret) {
5038       GST_ERROR_OBJECT (rtpbin, "Failed to build encoder from factory: %s",
5039           err->message);
5040       goto done;
5041     }
5042
5043     bin_manage_element (session->bin, ret);
5044     session->elements = g_slist_prepend (session->elements, ret);
5045     GST_INFO_OBJECT (rtpbin, "Built FEC encoder: %" GST_PTR_FORMAT
5046         " for session %u", ret, sessid);
5047   }
5048
5049   /* Fallback to the signal */
5050   if (!ret)
5051     ret = session_request_element (session, SIGNAL_REQUEST_FEC_ENCODER);
5052
5053   if (ret) {
5054     /* First, add encoder pads that match fec_% template and are already present */
5055     GstIterator *it, *filter;
5056     GstIteratorResult it_ret = GST_ITERATOR_OK;
5057
5058     it = gst_element_iterate_src_pads (ret);
5059     filter =
5060         gst_iterator_filter (it, (GCompareFunc) fec_srcpad_iterator_filter,
5061         NULL);
5062
5063     while (it_ret == GST_ITERATOR_OK || it_ret == GST_ITERATOR_RESYNC) {
5064       it_ret =
5065           gst_iterator_foreach (filter,
5066           (GstIteratorForeachFunction) fec_srcpad_iterator_foreach, session);
5067
5068       if (it_ret == GST_ITERATOR_RESYNC)
5069         gst_iterator_resync (filter);
5070     }
5071
5072     gst_iterator_free (filter);
5073
5074     /* Finally, connect to pad-added signal if any of the encoder pads are
5075      * added later */
5076     g_signal_connect (ret, "pad-added", G_CALLBACK (fec_encoder_pad_added_cb),
5077         session);
5078   }
5079
5080 done:
5081   return ret;
5082 }
5083
5084 /* Create a pad for sending RTP for the session in @name. Must be called with
5085  * RTP_BIN_LOCK.
5086  */
5087 static GstPad *
5088 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
5089 {
5090   gchar *pname;
5091   guint sessid;
5092   GstPad *send_rtp_sink;
5093   GstElement *aux;
5094   GstElement *encoder;
5095   GstElement *prev = NULL;
5096   GstRtpBinSession *session;
5097
5098   /* first get the session number */
5099   if (name == NULL || sscanf (name, "send_rtp_sink_%u", &sessid) != 1)
5100     goto no_name;
5101
5102   /* get or create session */
5103   session = find_session_by_id (rtpbin, sessid);
5104   if (!session) {
5105     /* create session now */
5106     session = create_session (rtpbin, sessid);
5107     if (session == NULL)
5108       goto create_error;
5109   }
5110
5111   /* check if pad was requested */
5112   if (session->send_rtp_sink_ghost != NULL)
5113     return session->send_rtp_sink_ghost;
5114
5115   /* check if we are already using this session as a sender */
5116   if (session->send_rtp_sink != NULL)
5117     goto existing_session;
5118
5119   encoder = request_fec_encoder (rtpbin, session, sessid);
5120
5121   if (encoder) {
5122     GST_DEBUG_OBJECT (rtpbin, "Linking FEC encoder");
5123
5124     send_rtp_sink = gst_element_get_static_pad (encoder, "sink");
5125
5126     if (!send_rtp_sink)
5127       goto enc_sink_failed;
5128
5129     prev = encoder;
5130   }
5131
5132   GST_DEBUG_OBJECT (rtpbin, "getting RTP AUX sender");
5133   aux = session_request_element (session, SIGNAL_REQUEST_AUX_SENDER);
5134   if (aux) {
5135     GstPad *sinkpad;
5136     GST_DEBUG_OBJECT (rtpbin, "linking AUX sender");
5137     if (!setup_aux_sender (rtpbin, session, aux))
5138       goto aux_session_failed;
5139
5140     pname = g_strdup_printf ("sink_%u", sessid);
5141     sinkpad = gst_element_get_static_pad (aux, pname);
5142     g_free (pname);
5143
5144     if (sinkpad == NULL)
5145       goto aux_sink_failed;
5146
5147     if (!prev) {
5148       send_rtp_sink = sinkpad;
5149     } else {
5150       GstPad *srcpad = gst_element_get_static_pad (prev, "src");
5151       GstPadLinkReturn ret;
5152
5153       ret = gst_pad_link (srcpad, sinkpad);
5154       gst_object_unref (srcpad);
5155       if (ret != GST_PAD_LINK_OK) {
5156         goto aux_link_failed;
5157       }
5158       gst_object_unref (sinkpad);
5159     }
5160     prev = aux;
5161   } else {
5162     /* get send_rtp pad and store */
5163     session->send_rtp_sink =
5164         gst_element_request_pad_simple (session->session, "send_rtp_sink");
5165     if (session->send_rtp_sink == NULL)
5166       goto pad_failed;
5167
5168     if (!complete_session_src (rtpbin, session))
5169       goto session_src_failed;
5170
5171     if (!prev) {
5172       send_rtp_sink = gst_object_ref (session->send_rtp_sink);
5173     } else {
5174       GstPad *srcpad = gst_element_get_static_pad (prev, "src");
5175       GstPadLinkReturn ret;
5176
5177       ret = gst_pad_link (srcpad, session->send_rtp_sink);
5178       gst_object_unref (srcpad);
5179       if (ret != GST_PAD_LINK_OK)
5180         goto session_link_failed;
5181     }
5182   }
5183
5184   session->send_rtp_sink_ghost =
5185       gst_ghost_pad_new_from_template (name, send_rtp_sink, templ);
5186   gst_object_unref (send_rtp_sink);
5187   gst_pad_set_active (session->send_rtp_sink_ghost, TRUE);
5188   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_sink_ghost);
5189
5190   return session->send_rtp_sink_ghost;
5191
5192   /* ERRORS */
5193 no_name:
5194   {
5195     g_warning ("rtpbin: cannot find session id for pad: %s",
5196         GST_STR_NULL (name));
5197     return NULL;
5198   }
5199 create_error:
5200   {
5201     /* create_session already warned */
5202     return NULL;
5203   }
5204 existing_session:
5205   {
5206     g_warning ("rtpbin: session %u is already in use", sessid);
5207     return NULL;
5208   }
5209 aux_session_failed:
5210   {
5211     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
5212     return NULL;
5213   }
5214 aux_sink_failed:
5215   {
5216     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
5217     return NULL;
5218   }
5219 aux_link_failed:
5220   {
5221     g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u",
5222         aux, sessid);
5223     return NULL;
5224   }
5225 pad_failed:
5226   {
5227     g_warning ("rtpbin: failed to get session pad for session %u", sessid);
5228     return NULL;
5229   }
5230 session_src_failed:
5231   {
5232     g_warning ("rtpbin: failed to setup source pads for session %u", sessid);
5233     return NULL;
5234   }
5235 session_link_failed:
5236   {
5237     g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u",
5238         session, sessid);
5239     return NULL;
5240   }
5241 enc_sink_failed:
5242   {
5243     g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT
5244         " sink pad for session %u", encoder, sessid);
5245     return NULL;
5246   }
5247 }
5248
5249 static void
5250 remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
5251 {
5252   if (session->send_rtp_src_ghost) {
5253     gst_pad_set_active (session->send_rtp_src_ghost, FALSE);
5254     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
5255         session->send_rtp_src_ghost);
5256     session->send_rtp_src_ghost = NULL;
5257   }
5258   if (session->send_rtp_sink) {
5259     gst_element_release_request_pad (GST_ELEMENT_CAST (session->session),
5260         session->send_rtp_sink);
5261     gst_object_unref (session->send_rtp_sink);
5262     session->send_rtp_sink = NULL;
5263   }
5264   if (session->send_rtp_sink_ghost) {
5265     gst_pad_set_active (session->send_rtp_sink_ghost, FALSE);
5266     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
5267         session->send_rtp_sink_ghost);
5268     session->send_rtp_sink_ghost = NULL;
5269   }
5270 }
5271
5272 static void
5273 remove_send_fec (GstRtpBin * rtpbin, GstRtpBinSession * session)
5274 {
5275   GSList *tmp;
5276
5277   for (tmp = session->send_fec_src_ghosts; tmp; tmp = tmp->next) {
5278     GstPad *ghost = GST_PAD (tmp->data);
5279     gst_pad_set_active (ghost, FALSE);
5280     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), ghost);
5281   }
5282
5283   g_slist_free (session->send_fec_src_ghosts);
5284   session->send_fec_src_ghosts = NULL;
5285 }
5286
5287 /* Create a pad for sending RTCP for the session in @name. Must be called with
5288  * RTP_BIN_LOCK.
5289  */
5290 static GstPad *
5291 create_send_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
5292     const gchar * name)
5293 {
5294   guint sessid;
5295   GstPad *encsrc;
5296   GstElement *encoder;
5297   GstRtpBinSession *session;
5298
5299   /* first get the session number */
5300   if (name == NULL || sscanf (name, "send_rtcp_src_%u", &sessid) != 1)
5301     goto no_name;
5302
5303   /* get or create session */
5304   session = find_session_by_id (rtpbin, sessid);
5305   if (!session) {
5306     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
5307     /* create session now */
5308     session = create_session (rtpbin, sessid);
5309     if (session == NULL)
5310       goto create_error;
5311   }
5312
5313   /* check if pad was requested */
5314   if (session->send_rtcp_src_ghost != NULL)
5315     return session->send_rtcp_src_ghost;
5316
5317   /* get rtcp_src pad and store */
5318   session->send_rtcp_src =
5319       gst_element_request_pad_simple (session->session, "send_rtcp_src");
5320   if (session->send_rtcp_src == NULL)
5321     goto pad_failed;
5322
5323   GST_DEBUG_OBJECT (rtpbin, "getting RTCP encoder");
5324   encoder = session_request_element (session, SIGNAL_REQUEST_RTCP_ENCODER);
5325   if (encoder) {
5326     gchar *ename;
5327     GstPad *encsink;
5328     GstPadLinkReturn ret;
5329
5330     GST_DEBUG_OBJECT (rtpbin, "linking RTCP encoder");
5331
5332     ename = g_strdup_printf ("rtcp_src_%u", sessid);
5333     encsrc = gst_element_get_static_pad (encoder, ename);
5334     g_free (ename);
5335     if (encsrc == NULL)
5336       goto enc_src_failed;
5337
5338     ename = g_strdup_printf ("rtcp_sink_%u", sessid);
5339     encsink = gst_element_get_static_pad (encoder, ename);
5340     g_free (ename);
5341     if (encsink == NULL)
5342       goto enc_sink_failed;
5343
5344     ret = gst_pad_link (session->send_rtcp_src, encsink);
5345     gst_object_unref (encsink);
5346
5347     if (ret != GST_PAD_LINK_OK)
5348       goto enc_link_failed;
5349   } else {
5350     GST_DEBUG_OBJECT (rtpbin, "no RTCP encoder given");
5351     encsrc = gst_object_ref (session->send_rtcp_src);
5352   }
5353
5354   session->send_rtcp_src_ghost =
5355       gst_ghost_pad_new_from_template (name, encsrc, templ);
5356   gst_object_unref (encsrc);
5357   gst_pad_set_active (session->send_rtcp_src_ghost, TRUE);
5358   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtcp_src_ghost);
5359
5360   return session->send_rtcp_src_ghost;
5361
5362   /* ERRORS */
5363 no_name:
5364   {
5365     g_warning ("rtpbin: cannot find session id for pad: %s",
5366         GST_STR_NULL (name));
5367     return NULL;
5368   }
5369 create_error:
5370   {
5371     /* create_session already warned */
5372     return NULL;
5373   }
5374 pad_failed:
5375   {
5376     g_warning ("rtpbin: failed to get rtcp pad for session %u", sessid);
5377     return NULL;
5378   }
5379 enc_src_failed:
5380   {
5381     g_warning ("rtpbin: failed to get encoder src pad for session %u", sessid);
5382     return NULL;
5383   }
5384 enc_sink_failed:
5385   {
5386     g_warning ("rtpbin: failed to get encoder sink pad for session %u", sessid);
5387     gst_object_unref (encsrc);
5388     return NULL;
5389   }
5390 enc_link_failed:
5391   {
5392     g_warning ("rtpbin: failed to link rtcp encoder for session %u", sessid);
5393     gst_object_unref (encsrc);
5394     return NULL;
5395   }
5396 }
5397
5398 static void
5399 remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
5400 {
5401   if (session->send_rtcp_src_ghost) {
5402     gst_pad_set_active (session->send_rtcp_src_ghost, FALSE);
5403     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
5404         session->send_rtcp_src_ghost);
5405     session->send_rtcp_src_ghost = NULL;
5406   }
5407   if (session->send_rtcp_src) {
5408     gst_element_release_request_pad (session->session, session->send_rtcp_src);
5409     gst_object_unref (session->send_rtcp_src);
5410     session->send_rtcp_src = NULL;
5411   }
5412 }
5413
5414 /* If the requested name is NULL we should create a name with
5415  * the session number assuming we want the lowest possible session
5416  * with a free pad like the template */
5417 static gchar *
5418 gst_rtp_bin_get_free_pad_name (GstElement * element, GstPadTemplate * templ)
5419 {
5420   gboolean name_found = FALSE;
5421   gint session = 0;
5422   GstIterator *pad_it = NULL;
5423   gchar *pad_name = NULL;
5424   GValue data = { 0, };
5425
5426   GST_DEBUG_OBJECT (element, "find a free pad name for template");
5427   while (!name_found) {
5428     gboolean done = FALSE;
5429
5430     g_free (pad_name);
5431     pad_name = g_strdup_printf (templ->name_template, session++);
5432     pad_it = gst_element_iterate_pads (GST_ELEMENT (element));
5433     name_found = TRUE;
5434     while (!done) {
5435       switch (gst_iterator_next (pad_it, &data)) {
5436         case GST_ITERATOR_OK:
5437         {
5438           GstPad *pad;
5439           gchar *name;
5440
5441           pad = g_value_get_object (&data);
5442           name = gst_pad_get_name (pad);
5443
5444           if (strcmp (name, pad_name) == 0) {
5445             done = TRUE;
5446             name_found = FALSE;
5447           }
5448           g_free (name);
5449           g_value_reset (&data);
5450           break;
5451         }
5452         case GST_ITERATOR_ERROR:
5453         case GST_ITERATOR_RESYNC:
5454           /* restart iteration */
5455           done = TRUE;
5456           name_found = FALSE;
5457           session = 0;
5458           break;
5459         case GST_ITERATOR_DONE:
5460           done = TRUE;
5461           break;
5462       }
5463     }
5464     g_value_unset (&data);
5465     gst_iterator_free (pad_it);
5466   }
5467
5468   GST_DEBUG_OBJECT (element, "free pad name found: '%s'", pad_name);
5469   return pad_name;
5470 }
5471
5472 /*
5473  */
5474 static GstPad *
5475 gst_rtp_bin_request_new_pad (GstElement * element,
5476     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
5477 {
5478   GstRtpBin *rtpbin;
5479   GstElementClass *klass;
5480   GstPad *result;
5481
5482   gchar *pad_name = NULL;
5483
5484   g_return_val_if_fail (templ != NULL, NULL);
5485   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
5486
5487   rtpbin = GST_RTP_BIN (element);
5488   klass = GST_ELEMENT_GET_CLASS (element);
5489
5490   GST_RTP_BIN_LOCK (rtpbin);
5491
5492   if (name == NULL) {
5493     /* use a free pad name */
5494     pad_name = gst_rtp_bin_get_free_pad_name (element, templ);
5495   } else {
5496     /* use the provided name */
5497     pad_name = g_strdup (name);
5498   }
5499
5500   GST_DEBUG_OBJECT (rtpbin, "Trying to request a pad with name %s", pad_name);
5501
5502   /* figure out the template */
5503   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
5504     result = create_recv_rtp (rtpbin, templ, pad_name);
5505   } else if (templ == gst_element_class_get_pad_template (klass,
5506           "recv_rtcp_sink_%u")) {
5507     result = create_recv_rtcp (rtpbin, templ, pad_name);
5508   } else if (templ == gst_element_class_get_pad_template (klass,
5509           "send_rtp_sink_%u")) {
5510     result = create_send_rtp (rtpbin, templ, pad_name);
5511   } else if (templ == gst_element_class_get_pad_template (klass,
5512           "send_rtcp_src_%u")) {
5513     result = create_send_rtcp (rtpbin, templ, pad_name);
5514   } else if (templ == gst_element_class_get_pad_template (klass,
5515           "recv_fec_sink_%u_%u")) {
5516     result = create_recv_fec (rtpbin, templ, pad_name);
5517   } else
5518     goto wrong_template;
5519
5520   g_free (pad_name);
5521   GST_RTP_BIN_UNLOCK (rtpbin);
5522
5523   return result;
5524
5525   /* ERRORS */
5526 wrong_template:
5527   {
5528     g_free (pad_name);
5529     GST_RTP_BIN_UNLOCK (rtpbin);
5530     g_warning ("rtpbin: this is not our template");
5531     return NULL;
5532   }
5533 }
5534
5535 static void
5536 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
5537 {
5538   GstRtpBinSession *session;
5539   GstRtpBin *rtpbin;
5540
5541   g_return_if_fail (GST_IS_GHOST_PAD (pad));
5542   g_return_if_fail (GST_IS_RTP_BIN (element));
5543
5544   rtpbin = GST_RTP_BIN (element);
5545
5546   GST_RTP_BIN_LOCK (rtpbin);
5547   GST_DEBUG_OBJECT (rtpbin, "Trying to release pad %s:%s",
5548       GST_DEBUG_PAD_NAME (pad));
5549
5550   if (!(session = find_session_by_pad (rtpbin, pad)))
5551     goto unknown_pad;
5552
5553   if (session->recv_rtp_sink_ghost == pad) {
5554     remove_recv_rtp (rtpbin, session);
5555   } else if (session->recv_rtcp_sink_ghost == pad) {
5556     remove_recv_rtcp (rtpbin, session);
5557   } else if (session->send_rtp_sink_ghost == pad) {
5558     remove_send_rtp (rtpbin, session);
5559   } else if (session->send_rtcp_src_ghost == pad) {
5560     remove_rtcp (rtpbin, session);
5561   } else if (pad_is_recv_fec (session, pad)) {
5562     remove_recv_fec_for_pad (rtpbin, session, pad);
5563   }
5564
5565   /* no more request pads, free the complete session */
5566   if (session->recv_rtp_sink_ghost == NULL
5567       && session->recv_rtcp_sink_ghost == NULL
5568       && session->send_rtp_sink_ghost == NULL
5569       && session->send_rtcp_src_ghost == NULL
5570       && session->recv_fec_sink_ghosts == NULL) {
5571     GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session);
5572     rtpbin->sessions = g_slist_remove (rtpbin->sessions, session);
5573     free_session (session, rtpbin);
5574   }
5575   GST_RTP_BIN_UNLOCK (rtpbin);
5576
5577   return;
5578
5579   /* ERROR */
5580 unknown_pad:
5581   {
5582     GST_RTP_BIN_UNLOCK (rtpbin);
5583     g_warning ("rtpbin: %s:%s is not one of our request pads",
5584         GST_DEBUG_PAD_NAME (pad));
5585     return;
5586   }
5587 }