2fdba14e223b37f9966243a2585ff4de23ddeeec
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-rtpbin
22  * @title: rtpbin
23  * @see_also: rtpjitterbuffer, rtpsession, rtpptdemux, rtpssrcdemux
24  *
25  * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
26  * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
27  * RTP sessions that will be synchronized together using RTCP SR packets.
28  *
29  * #GstRtpBin is configured with a number of request pads that define the
30  * functionality that is activated, similar to the #GstRtpSession element.
31  *
32  * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_\%u pad. The session
33  * number must be specified in the pad name.
34  * Data received on the recv_rtp_sink_\%u pad will be processed in the #GstRtpSession
35  * manager and after being validated forwarded on #GstRtpSsrcDemux element. Each
36  * RTP stream is demuxed based on the SSRC and send to a #GstRtpJitterBuffer. After
37  * the packets are released from the jitterbuffer, they will be forwarded to a
38  * #GstRtpPtDemux element. The #GstRtpPtDemux element will demux the packets based
39  * on the payload type and will create a unique pad recv_rtp_src_\%u_\%u_\%u on
40  * rtpbin with the session number, SSRC and payload type respectively as the pad
41  * name.
42  *
43  * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_\%u pad. The
44  * session number must be specified in the pad name.
45  *
46  * If you want the session manager to generate and send RTCP packets, request
47  * the send_rtcp_src_\%u pad with the session number in the pad name. Packet pushed
48  * on this pad contain SR/RR RTCP reports that should be sent to all participants
49  * in the session.
50  *
51  * To use #GstRtpBin as a sender, request a send_rtp_sink_\%u pad, which will
52  * automatically create a send_rtp_src_\%u pad. If the session number is not provided,
53  * the pad from the lowest available session will be returned. The session manager will modify the
54  * SSRC in the RTP packets to its own SSRC and will forward the packets on the
55  * send_rtp_src_\%u pad after updating its internal state.
56  *
57  * The session manager needs the clock-rate of the payload types it is handling
58  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
59  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
60  * signal.
61  *
62  * Access to the internal statistics of rtpbin is provided with the
63  * get-internal-session property. This action signal gives access to the
64  * RTPSession object which further provides action signals to retrieve the
65  * internal source and other sources.
66  *
67  * #GstRtpBin also has signals (#GstRtpBin::request-rtp-encoder,
68  * #GstRtpBin::request-rtp-decoder, #GstRtpBin::request-rtcp-encoder and
69  * #GstRtpBin::request-rtp-decoder) to dynamically request for RTP and RTCP encoders
70  * and decoders in order to support SRTP. The encoders must provide the pads
71  * rtp_sink_\%u and rtp_src_\%u for RTP and rtcp_sink_\%u and rtcp_src_\%u for
72  * RTCP. The session number will be used in the pad name. The decoders must provide
73  * rtp_sink and rtp_src for RTP and rtcp_sink and rtcp_src for RTCP. The decoders will
74  * be placed before the #GstRtpSession element, thus they must support SSRC demuxing
75  * internally.
76  *
77  * #GstRtpBin has signals (#GstRtpBin::request-aux-sender and
78  * #GstRtpBin::request-aux-receiver to dynamically request an element that can be
79  * used to create or merge additional RTP streams. AUX elements are needed to
80  * implement FEC or retransmission (such as RFC 4588). An AUX sender must have one
81  * sink_\%u pad that matches the sessionid in the signal and it should have 1 or
82  * more src_\%u pads. For each src_%\u pad, a session will be made (if needed)
83  * and the pad will be linked to the session send_rtp_sink pad. Each session will
84  * then expose its source pad as send_rtp_src_\%u on #GstRtpBin.
85  * An AUX receiver has 1 src_\%u pad that much match the sessionid in the signal
86  * and 1 or more sink_\%u pads. A session will be made for each sink_\%u pad
87  * when the corresponding recv_rtp_sink_\%u pad is requested on #GstRtpBin.
88  * The #GstRtpBin::request-jitterbuffer signal can be used to provide a custom
89  * element to perform arrival time smoothing, reordering and optionally packet
90  * loss detection and retransmission requests.
91  *
92  * ## Example pipelines
93  *
94  * |[
95  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
96  *     rtpbin ! rtptheoradepay ! theoradec ! xvimagesink
97  * ]| Receive RTP data from port 5000 and send to the session 0 in rtpbin.
98  * |[
99  * gst-launch-1.0 rtpbin name=rtpbin \
100  *         v4l2src ! videoconvert ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
101  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
102  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
103  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
104  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
105  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
106  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
107  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
108  * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
109  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
110  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
111  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
112  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
113  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
114  * is received on port 5007. Since RTCP packets from the sender should be sent
115  * as soon as possible and do not participate in preroll, sync=false and
116  * async=false is configured on udpsink
117  * |[
118  * gst-launch-1.0 -v rtpbin name=rtpbin                                          \
119  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
120  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
121  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
122  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
123  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
124  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
125  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
126  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
127  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
128  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
129  * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
130  * decode and display the video.
131  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
132  * decode and play the audio.
133  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
134  * session 1 on port 5003. These packets will be used for session management and
135  * synchronisation.
136  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
137  * on port 5007.
138  *
139  */
140
141 #ifdef HAVE_CONFIG_H
142 #include "config.h"
143 #endif
144 #include <stdio.h>
145 #include <string.h>
146
147 #include <gst/rtp/gstrtpbuffer.h>
148 #include <gst/rtp/gstrtcpbuffer.h>
149
150 #include "gstrtpbin.h"
151 #include "rtpsession.h"
152 #include "gstrtpsession.h"
153 #include "gstrtpjitterbuffer.h"
154
155 #include <gst/glib-compat-private.h>
156
157 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
158 #define GST_CAT_DEFAULT gst_rtp_bin_debug
159
160 /* sink pads */
161 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
162     GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
163     GST_PAD_SINK,
164     GST_PAD_REQUEST,
165     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
166     );
167
168 /**
169  * GstRtpBin!recv_fec_sink_%u_%u:
170  *
171  * Sink template for receiving Forward Error Correction packets,
172  * in the form recv_fec_sink_<session_idx>_<fec_stream_idx>
173  *
174  * See #GstRTPST_2022_1_FecDec for example usage
175  *
176  * Since: 1.20
177  */
178 static GstStaticPadTemplate rtpbin_recv_fec_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("recv_fec_sink_%u_%u",
180     GST_PAD_SINK,
181     GST_PAD_REQUEST,
182     GST_STATIC_CAPS ("application/x-rtp")
183     );
184
185 /**
186  * GstRtpBin!send_fec_src_%u_%u:
187  *
188  * Src template for sending Forward Error Correction packets,
189  * in the form send_fec_src_<session_idx>_<fec_stream_idx>
190  *
191  * See #GstRTPST_2022_1_FecEnc for example usage
192  *
193  * Since: 1.20
194  */
195 static GstStaticPadTemplate rtpbin_send_fec_src_template =
196 GST_STATIC_PAD_TEMPLATE ("send_fec_src_%u_%u",
197     GST_PAD_SRC,
198     GST_PAD_SOMETIMES,
199     GST_STATIC_CAPS ("application/x-rtp")
200     );
201
202 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
203     GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
204     GST_PAD_SINK,
205     GST_PAD_REQUEST,
206     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
207     );
208
209 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
210 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%u",
211     GST_PAD_SINK,
212     GST_PAD_REQUEST,
213     GST_STATIC_CAPS ("application/x-rtp")
214     );
215
216 /* src pads */
217 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
218 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
219     GST_PAD_SRC,
220     GST_PAD_SOMETIMES,
221     GST_STATIC_CAPS ("application/x-rtp")
222     );
223
224 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
225     GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%u",
226     GST_PAD_SRC,
227     GST_PAD_REQUEST,
228     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
229     );
230
231 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
232     GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%u",
233     GST_PAD_SRC,
234     GST_PAD_SOMETIMES,
235     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
236     );
237
238 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock (&(bin)->priv->bin_lock)
239 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock (&(bin)->priv->bin_lock)
240
241 /* lock to protect dynamic callbacks, like pad-added and new ssrc. */
242 #define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock (&(bin)->priv->dyn_lock)
243 #define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock (&(bin)->priv->dyn_lock)
244
245 /* lock for shutdown */
246 #define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
247 G_STMT_START {                                   \
248   if (g_atomic_int_get (&bin->priv->shutdown))   \
249     goto label;                                  \
250   GST_RTP_BIN_DYN_LOCK (bin);                    \
251   if (g_atomic_int_get (&bin->priv->shutdown)) { \
252     GST_RTP_BIN_DYN_UNLOCK (bin);                \
253     goto label;                                  \
254   }                                              \
255 } G_STMT_END
256
257 /* unlock for shutdown */
258 #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
259   GST_RTP_BIN_DYN_UNLOCK (bin);                  \
260
261 /* Minimum time offset to apply. This compensates for rounding errors in NTP to
262  * RTP timestamp conversions */
263 #define MIN_TS_OFFSET_ROUND_OFF_COMP (4 * GST_MSECOND)
264
265 struct _GstRtpBinPrivate
266 {
267   GMutex bin_lock;
268
269   /* lock protecting dynamic adding/removing */
270   GMutex dyn_lock;
271
272   /* if we are shutting down or not */
273   gint shutdown;
274
275   gboolean autoremove;
276
277   /* NTP time in ns of last SR sync used */
278   guint64 last_ntpnstime;
279
280   /* list of extra elements */
281   GList *elements;
282 };
283
284 /* signals and args */
285 enum
286 {
287   SIGNAL_REQUEST_PT_MAP,
288   SIGNAL_PAYLOAD_TYPE_CHANGE,
289   SIGNAL_CLEAR_PT_MAP,
290   SIGNAL_RESET_SYNC,
291   SIGNAL_GET_SESSION,
292   SIGNAL_GET_INTERNAL_SESSION,
293   SIGNAL_GET_STORAGE,
294   SIGNAL_GET_INTERNAL_STORAGE,
295   SIGNAL_CLEAR_SSRC,
296
297   SIGNAL_ON_NEW_SSRC,
298   SIGNAL_ON_SSRC_COLLISION,
299   SIGNAL_ON_SSRC_VALIDATED,
300   SIGNAL_ON_SSRC_ACTIVE,
301   SIGNAL_ON_SSRC_SDES,
302   SIGNAL_ON_BYE_SSRC,
303   SIGNAL_ON_BYE_TIMEOUT,
304   SIGNAL_ON_TIMEOUT,
305   SIGNAL_ON_SENDER_TIMEOUT,
306   SIGNAL_ON_NPT_STOP,
307
308   SIGNAL_REQUEST_RTP_ENCODER,
309   SIGNAL_REQUEST_RTP_DECODER,
310   SIGNAL_REQUEST_RTCP_ENCODER,
311   SIGNAL_REQUEST_RTCP_DECODER,
312
313   SIGNAL_REQUEST_FEC_DECODER,
314   SIGNAL_REQUEST_FEC_DECODER_FULL,
315   SIGNAL_REQUEST_FEC_ENCODER,
316
317   SIGNAL_REQUEST_JITTERBUFFER,
318
319   SIGNAL_NEW_JITTERBUFFER,
320   SIGNAL_NEW_STORAGE,
321
322   SIGNAL_REQUEST_AUX_SENDER,
323   SIGNAL_REQUEST_AUX_RECEIVER,
324
325   SIGNAL_ON_NEW_SENDER_SSRC,
326   SIGNAL_ON_SENDER_SSRC_ACTIVE,
327
328   SIGNAL_ON_BUNDLED_SSRC,
329
330   LAST_SIGNAL
331 };
332
333 #define DEFAULT_LATENCY_MS           200
334 #define DEFAULT_DROP_ON_LATENCY      FALSE
335 #define DEFAULT_SDES                 NULL
336 #define DEFAULT_DO_LOST              FALSE
337 #define DEFAULT_IGNORE_PT            FALSE
338 #define DEFAULT_NTP_SYNC             FALSE
339 #define DEFAULT_AUTOREMOVE           FALSE
340 #define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
341 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
342 #define DEFAULT_RTCP_SYNC            GST_RTP_BIN_RTCP_SYNC_ALWAYS
343 #define DEFAULT_RTCP_SYNC_INTERVAL   0
344 #define DEFAULT_DO_SYNC_EVENT        FALSE
345 #define DEFAULT_DO_RETRANSMISSION    FALSE
346 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
347 #define DEFAULT_NTP_TIME_SOURCE      GST_RTP_NTP_TIME_SOURCE_NTP
348 #define DEFAULT_RTCP_SYNC_SEND_TIME  TRUE
349 #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
350 #define DEFAULT_MAX_DROPOUT_TIME     60000
351 #define DEFAULT_MAX_MISORDER_TIME    2000
352 #define DEFAULT_RFC7273_SYNC         FALSE
353 #define DEFAULT_MAX_STREAMS          G_MAXUINT
354 #define DEFAULT_MAX_TS_OFFSET_ADJUSTMENT G_GUINT64_CONSTANT(0)
355 #define DEFAULT_MAX_TS_OFFSET        G_GINT64_CONSTANT(3000000000)
356 #define DEFAULT_MIN_TS_OFFSET        MIN_TS_OFFSET_ROUND_OFF_COMP
357 #define DEFAULT_TS_OFFSET_SMOOTHING_FACTOR  0
358
359 enum
360 {
361   PROP_0,
362   PROP_LATENCY,
363   PROP_DROP_ON_LATENCY,
364   PROP_SDES,
365   PROP_DO_LOST,
366   PROP_IGNORE_PT,
367   PROP_NTP_SYNC,
368   PROP_RTCP_SYNC,
369   PROP_RTCP_SYNC_INTERVAL,
370   PROP_AUTOREMOVE,
371   PROP_BUFFER_MODE,
372   PROP_USE_PIPELINE_CLOCK,
373   PROP_DO_SYNC_EVENT,
374   PROP_DO_RETRANSMISSION,
375   PROP_RTP_PROFILE,
376   PROP_NTP_TIME_SOURCE,
377   PROP_RTCP_SYNC_SEND_TIME,
378   PROP_MAX_RTCP_RTP_TIME_DIFF,
379   PROP_MAX_DROPOUT_TIME,
380   PROP_MAX_MISORDER_TIME,
381   PROP_RFC7273_SYNC,
382   PROP_MAX_STREAMS,
383   PROP_MAX_TS_OFFSET_ADJUSTMENT,
384   PROP_MAX_TS_OFFSET,
385   PROP_MIN_TS_OFFSET,
386   PROP_TS_OFFSET_SMOOTHING_FACTOR,
387   PROP_FEC_DECODERS,
388   PROP_FEC_ENCODERS,
389 };
390
391 #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
392 static GType
393 gst_rtp_bin_rtcp_sync_get_type (void)
394 {
395   static GType rtcp_sync_type = 0;
396   static const GEnumValue rtcp_sync_types[] = {
397     {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
398     {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
399     {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
400     {0, NULL, NULL},
401   };
402
403   if (!rtcp_sync_type) {
404     rtcp_sync_type = g_enum_register_static ("GstRTCPSync", rtcp_sync_types);
405   }
406   return rtcp_sync_type;
407 }
408
409 /* helper objects */
410 typedef struct _GstRtpBinSession GstRtpBinSession;
411 typedef struct _GstRtpBinStream GstRtpBinStream;
412 typedef struct _GstRtpBinClient GstRtpBinClient;
413
414 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
415
416 static GstCaps *pt_map_requested (GstElement * element, guint pt,
417     GstRtpBinSession * session);
418 static void payload_type_change (GstElement * element, guint pt,
419     GstRtpBinSession * session);
420 static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
421 static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
422 static void remove_recv_fec (GstRtpBin * rtpbin, GstRtpBinSession * session);
423 static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
424 static void remove_send_fec (GstRtpBin * rtpbin, GstRtpBinSession * session);
425 static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
426 static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
427 static void free_stream (GstRtpBinStream * stream, GstRtpBin * bin);
428 static GstRtpBinSession *create_session (GstRtpBin * rtpbin, gint id);
429 static GstPad *complete_session_sink (GstRtpBin * rtpbin,
430     GstRtpBinSession * session);
431 static void
432 complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session,
433     guint sessid);
434 static GstPad *complete_session_rtcp (GstRtpBin * rtpbin,
435     GstRtpBinSession * session, guint sessid);
436 static GstElement *session_request_element (GstRtpBinSession * session,
437     guint signal);
438
439 /* Manages the RTP stream for one SSRC.
440  *
441  * We pipe the stream (coming from the SSRC demuxer) into a jitterbuffer.
442  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
443  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
444  * together (see below).
445  */
446 struct _GstRtpBinStream
447 {
448   /* the SSRC of this stream */
449   guint32 ssrc;
450
451   /* parent bin */
452   GstRtpBin *bin;
453
454   /* the session this SSRC belongs to */
455   GstRtpBinSession *session;
456
457   /* the jitterbuffer of the SSRC */
458   GstElement *buffer;
459   gulong buffer_handlesync_sig;
460   gulong buffer_ptreq_sig;
461   gulong buffer_ntpstop_sig;
462   gint percent;
463
464   /* the PT demuxer of the SSRC */
465   GstElement *demux;
466   gulong demux_newpad_sig;
467   gulong demux_padremoved_sig;
468   gulong demux_ptreq_sig;
469   gulong demux_ptchange_sig;
470
471   /* if we have calculated a valid rt_delta for this stream */
472   gboolean have_sync;
473   /* mapping to local RTP and NTP time */
474   gint64 rt_delta;
475   gint64 rtp_delta;
476   gint64 avg_ts_offset;
477   gboolean is_initialized;
478   /* base rtptime in gst time */
479   gint64 clock_base;
480 };
481
482 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->lock)
483 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->lock)
484
485 /* Manages the receiving end of the packets.
486  *
487  * There is one such structure for each RTP session (audio/video/...).
488  * We get the RTP/RTCP packets and stuff them into the session manager. From
489  * there they are pushed into an SSRC demuxer that splits the stream based on
490  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
491  * the GstRtpBinStream above).
492  *
493  * Before the SSRC demuxer, a storage element may be inserted for the purpose
494  * of Forward Error Correction.
495  */
496 struct _GstRtpBinSession
497 {
498   /* session id */
499   gint id;
500   /* the parent bin */
501   GstRtpBin *bin;
502   /* the session element */
503   GstElement *session;
504   /* the SSRC demuxer */
505   GstElement *demux;
506   gulong demux_newpad_sig;
507   gulong demux_padremoved_sig;
508
509   /* Fec support */
510   GstElement *storage;
511
512   GMutex lock;
513
514   /* list of GstRtpBinStream */
515   GSList *streams;
516
517   /* list of elements */
518   GSList *elements;
519
520   /* mapping of payload type to caps */
521   GHashTable *ptmap;
522
523   /* the pads of the session */
524   GstPad *recv_rtp_sink;
525   GstPad *recv_rtp_sink_ghost;
526   GstPad *recv_rtp_src;
527   GstPad *recv_rtcp_sink;
528   GstPad *recv_rtcp_sink_ghost;
529   GstPad *sync_src;
530   GstPad *send_rtp_sink;
531   GstPad *send_rtp_sink_ghost;
532   GstPad *send_rtp_src_ghost;
533   GstPad *send_rtcp_src;
534   GstPad *send_rtcp_src_ghost;
535
536   GSList *recv_fec_sinks;
537   GSList *recv_fec_sink_ghosts;
538   /* fec decoder placed before the rtpjitterbuffer but after the rtpssrcdemux.
539    * XXX: This does not yet support multiple ssrc's in the same rtp session
540    */
541   GstElement *early_fec_decoder;
542
543   GSList *send_fec_src_ghosts;
544 };
545
546 /* Manages the RTP streams that come from one client and should therefore be
547  * synchronized.
548  */
549 struct _GstRtpBinClient
550 {
551   /* the common CNAME for the streams */
552   gchar *cname;
553   guint cname_len;
554
555   /* the streams */
556   guint nstreams;
557   GSList *streams;
558 };
559
560 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
561 static GstRtpBinSession *
562 find_session_by_id (GstRtpBin * rtpbin, gint id)
563 {
564   GSList *walk;
565
566   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
567     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
568
569     if (sess->id == id)
570       return sess;
571   }
572   return NULL;
573 }
574
575 static gboolean
576 pad_is_recv_fec (GstRtpBinSession * session, GstPad * pad)
577 {
578   return g_slist_find (session->recv_fec_sink_ghosts, pad) != NULL;
579 }
580
581 /* find a session with the given request pad. Must be called with RTP_BIN_LOCK */
582 static GstRtpBinSession *
583 find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
584 {
585   GSList *walk;
586
587   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
588     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
589
590     if ((sess->recv_rtp_sink_ghost == pad) ||
591         (sess->recv_rtcp_sink_ghost == pad) ||
592         (sess->send_rtp_sink_ghost == pad) ||
593         (sess->send_rtcp_src_ghost == pad) || pad_is_recv_fec (sess, pad))
594       return sess;
595   }
596   return NULL;
597 }
598
599 static void
600 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
601 {
602   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
603       sess->id, ssrc);
604 }
605
606 static void
607 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
608 {
609   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
610       sess->id, ssrc);
611 }
612
613 static void
614 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
615 {
616   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
617       sess->id, ssrc);
618 }
619
620 static void
621 on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
622 {
623   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
624       sess->id, ssrc);
625 }
626
627 static void
628 on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
629 {
630   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
631       sess->id, ssrc);
632 }
633
634 static void
635 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
636 {
637   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
638       sess->id, ssrc);
639 }
640
641 static void
642 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
643 {
644   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
645       sess->id, ssrc);
646
647   if (sess->bin->priv->autoremove)
648     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
649 }
650
651 static void
652 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
653 {
654   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
655       sess->id, ssrc);
656
657   if (sess->bin->priv->autoremove)
658     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
659 }
660
661 static void
662 on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
663 {
664   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
665       sess->id, ssrc);
666 }
667
668 static void
669 on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
670 {
671   g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0,
672       stream->session->id, stream->ssrc);
673 }
674
675 static void
676 on_new_sender_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
677 {
678   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
679       sess->id, ssrc);
680 }
681
682 static void
683 on_sender_ssrc_active (GstElement * session, guint32 ssrc,
684     GstRtpBinSession * sess)
685 {
686   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE],
687       0, sess->id, ssrc);
688 }
689
690 /* must be called with the SESSION lock */
691 static GstRtpBinStream *
692 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
693 {
694   GSList *walk;
695
696   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
697     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
698
699     if (stream->ssrc == ssrc)
700       return stream;
701   }
702   return NULL;
703 }
704
705 static void
706 ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad,
707     GstRtpBinSession * session)
708 {
709   GstRtpBinStream *stream = NULL;
710   GstRtpBin *rtpbin;
711
712   rtpbin = session->bin;
713
714   GST_RTP_BIN_LOCK (rtpbin);
715
716   GST_RTP_SESSION_LOCK (session);
717   if ((stream = find_stream_by_ssrc (session, ssrc)))
718     session->streams = g_slist_remove (session->streams, stream);
719   GST_RTP_SESSION_UNLOCK (session);
720
721   if (stream)
722     free_stream (stream, rtpbin);
723
724   GST_RTP_BIN_UNLOCK (rtpbin);
725 }
726
727 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
728 static GstRtpBinSession *
729 create_session (GstRtpBin * rtpbin, gint id)
730 {
731   GstRtpBinSession *sess;
732   GstElement *session, *demux;
733   GstElement *storage = NULL;
734   GstState target;
735
736   if (!(session = gst_element_factory_make ("rtpsession", NULL)))
737     goto no_session;
738
739   if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
740     goto no_demux;
741
742   if (!(storage = gst_element_factory_make ("rtpstorage", NULL)))
743     goto no_storage;
744
745   /* need to sink the storage or otherwise signal handlers from bindings will
746    * take ownership of it and we don't own it anymore */
747   gst_object_ref_sink (storage);
748   g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_STORAGE], 0, storage,
749       id);
750
751   sess = g_new0 (GstRtpBinSession, 1);
752   g_mutex_init (&sess->lock);
753   sess->id = id;
754   sess->bin = rtpbin;
755   sess->session = session;
756   sess->demux = demux;
757   sess->storage = storage;
758
759   sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
760       (GDestroyNotify) gst_caps_unref);
761   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
762
763   /* configure SDES items */
764   GST_OBJECT_LOCK (rtpbin);
765   g_object_set (demux, "max-streams", rtpbin->max_streams, NULL);
766   g_object_set (session, "sdes", rtpbin->sdes, "rtp-profile",
767       rtpbin->rtp_profile, "rtcp-sync-send-time", rtpbin->rtcp_sync_send_time,
768       NULL);
769   if (rtpbin->use_pipeline_clock)
770     g_object_set (session, "use-pipeline-clock", rtpbin->use_pipeline_clock,
771         NULL);
772   else
773     g_object_set (session, "ntp-time-source", rtpbin->ntp_time_source, NULL);
774
775   g_object_set (session, "max-dropout-time", rtpbin->max_dropout_time,
776       "max-misorder-time", rtpbin->max_misorder_time, NULL);
777   GST_OBJECT_UNLOCK (rtpbin);
778
779   /* provide clock_rate to the session manager when needed */
780   g_signal_connect (session, "request-pt-map",
781       (GCallback) pt_map_requested, sess);
782
783   g_signal_connect (sess->session, "on-new-ssrc",
784       (GCallback) on_new_ssrc, sess);
785   g_signal_connect (sess->session, "on-ssrc-collision",
786       (GCallback) on_ssrc_collision, sess);
787   g_signal_connect (sess->session, "on-ssrc-validated",
788       (GCallback) on_ssrc_validated, sess);
789   g_signal_connect (sess->session, "on-ssrc-active",
790       (GCallback) on_ssrc_active, sess);
791   g_signal_connect (sess->session, "on-ssrc-sdes",
792       (GCallback) on_ssrc_sdes, sess);
793   g_signal_connect (sess->session, "on-bye-ssrc",
794       (GCallback) on_bye_ssrc, sess);
795   g_signal_connect (sess->session, "on-bye-timeout",
796       (GCallback) on_bye_timeout, sess);
797   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
798   g_signal_connect (sess->session, "on-sender-timeout",
799       (GCallback) on_sender_timeout, sess);
800   g_signal_connect (sess->session, "on-new-sender-ssrc",
801       (GCallback) on_new_sender_ssrc, sess);
802   g_signal_connect (sess->session, "on-sender-ssrc-active",
803       (GCallback) on_sender_ssrc_active, sess);
804
805   gst_bin_add (GST_BIN_CAST (rtpbin), session);
806   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
807   gst_bin_add (GST_BIN_CAST (rtpbin), storage);
808
809   /* unref the storage again, the bin has a reference now and
810    * we don't need it anymore */
811   gst_object_unref (storage);
812
813   GST_OBJECT_LOCK (rtpbin);
814   target = GST_STATE_TARGET (rtpbin);
815   GST_OBJECT_UNLOCK (rtpbin);
816
817   /* change state only to what's needed */
818   gst_element_set_state (demux, target);
819   gst_element_set_state (session, target);
820   gst_element_set_state (storage, target);
821
822   return sess;
823
824   /* ERRORS */
825 no_session:
826   {
827     g_warning ("rtpbin: could not create rtpsession element");
828     return NULL;
829   }
830 no_demux:
831   {
832     gst_object_unref (session);
833     g_warning ("rtpbin: could not create rtpssrcdemux element");
834     return NULL;
835   }
836 no_storage:
837   {
838     gst_object_unref (session);
839     gst_object_unref (demux);
840     g_warning ("rtpbin: could not create rtpstorage element");
841     return NULL;
842   }
843 }
844
845 static gboolean
846 bin_manage_element (GstRtpBin * bin, GstElement * element)
847 {
848   GstRtpBinPrivate *priv = bin->priv;
849
850   if (g_list_find (priv->elements, element)) {
851     GST_DEBUG_OBJECT (bin, "requested element %p already in bin", element);
852   } else {
853     GST_DEBUG_OBJECT (bin, "adding requested element %p", element);
854
855     if (g_object_is_floating (element))
856       element = gst_object_ref_sink (element);
857
858     if (!gst_bin_add (GST_BIN_CAST (bin), element))
859       goto add_failed;
860     if (!gst_element_sync_state_with_parent (element))
861       GST_WARNING_OBJECT (bin, "unable to sync element state with rtpbin");
862   }
863   /* we add the element multiple times, each we need an equal number of
864    * removes to really remove the element from the bin */
865   priv->elements = g_list_prepend (priv->elements, element);
866
867   return TRUE;
868
869   /* ERRORS */
870 add_failed:
871   {
872     GST_WARNING_OBJECT (bin, "unable to add element");
873     gst_object_unref (element);
874     return FALSE;
875   }
876 }
877
878 static void
879 remove_bin_element (GstElement * element, GstRtpBin * bin)
880 {
881   GstRtpBinPrivate *priv = bin->priv;
882   GList *find;
883
884   find = g_list_find (priv->elements, element);
885   if (find) {
886     priv->elements = g_list_delete_link (priv->elements, find);
887
888     if (!g_list_find (priv->elements, element)) {
889       gst_element_set_locked_state (element, TRUE);
890       gst_bin_remove (GST_BIN_CAST (bin), element);
891       gst_element_set_state (element, GST_STATE_NULL);
892     }
893
894     gst_object_unref (element);
895   }
896 }
897
898 /* called with RTP_BIN_LOCK */
899 static void
900 free_session (GstRtpBinSession * sess, GstRtpBin * bin)
901 {
902   GST_DEBUG_OBJECT (bin, "freeing session %p", sess);
903
904   gst_element_set_locked_state (sess->demux, TRUE);
905   gst_element_set_locked_state (sess->session, TRUE);
906   gst_element_set_locked_state (sess->storage, TRUE);
907
908   gst_element_set_state (sess->demux, GST_STATE_NULL);
909   gst_element_set_state (sess->session, GST_STATE_NULL);
910   gst_element_set_state (sess->storage, GST_STATE_NULL);
911
912   remove_recv_rtp (bin, sess);
913   remove_recv_rtcp (bin, sess);
914   remove_recv_fec (bin, sess);
915   remove_send_rtp (bin, sess);
916   remove_send_fec (bin, sess);
917   remove_rtcp (bin, sess);
918
919   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
920   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
921   gst_bin_remove (GST_BIN_CAST (bin), sess->storage);
922
923   g_slist_foreach (sess->elements, (GFunc) remove_bin_element, bin);
924   g_slist_free (sess->elements);
925   sess->elements = NULL;
926
927   g_slist_foreach (sess->streams, (GFunc) free_stream, bin);
928   g_slist_free (sess->streams);
929
930   g_mutex_clear (&sess->lock);
931   g_hash_table_destroy (sess->ptmap);
932
933   g_free (sess);
934 }
935
936 /* get the payload type caps for the specific payload @pt in @session */
937 static GstCaps *
938 get_pt_map (GstRtpBinSession * session, guint pt)
939 {
940   GstCaps *caps = NULL;
941   GstRtpBin *bin;
942   GValue ret = { 0 };
943   GValue args[3] = { {0}, {0}, {0} };
944
945   GST_DEBUG ("searching pt %u in cache", pt);
946
947   GST_RTP_SESSION_LOCK (session);
948
949   /* first look in the cache */
950   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
951   if (caps) {
952     gst_caps_ref (caps);
953     goto done;
954   }
955
956   bin = session->bin;
957
958   GST_DEBUG ("emitting signal for pt %u in session %u", pt, session->id);
959
960   /* not in cache, send signal to request caps */
961   g_value_init (&args[0], GST_TYPE_ELEMENT);
962   g_value_set_object (&args[0], bin);
963   g_value_init (&args[1], G_TYPE_UINT);
964   g_value_set_uint (&args[1], session->id);
965   g_value_init (&args[2], G_TYPE_UINT);
966   g_value_set_uint (&args[2], pt);
967
968   g_value_init (&ret, GST_TYPE_CAPS);
969   g_value_set_boxed (&ret, NULL);
970
971   GST_RTP_SESSION_UNLOCK (session);
972
973   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
974
975   GST_RTP_SESSION_LOCK (session);
976
977   g_value_unset (&args[0]);
978   g_value_unset (&args[1]);
979   g_value_unset (&args[2]);
980
981   /* look in the cache again because we let the lock go */
982   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
983   if (caps) {
984     gst_caps_ref (caps);
985     g_value_unset (&ret);
986     goto done;
987   }
988
989   caps = (GstCaps *) g_value_dup_boxed (&ret);
990   g_value_unset (&ret);
991   if (!caps)
992     goto no_caps;
993
994   GST_DEBUG ("caching pt %u as %" GST_PTR_FORMAT, pt, caps);
995
996   /* store in cache, take additional ref */
997   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
998       gst_caps_ref (caps));
999
1000 done:
1001   GST_RTP_SESSION_UNLOCK (session);
1002
1003   return caps;
1004
1005   /* ERRORS */
1006 no_caps:
1007   {
1008     GST_RTP_SESSION_UNLOCK (session);
1009     GST_DEBUG ("no pt map could be obtained");
1010     return NULL;
1011   }
1012 }
1013
1014 static gboolean
1015 return_true (gpointer key, gpointer value, gpointer user_data)
1016 {
1017   return TRUE;
1018 }
1019
1020 static void
1021 gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
1022 {
1023   GSList *clients, *streams;
1024
1025   GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");
1026
1027   GST_RTP_BIN_LOCK (rtpbin);
1028   for (clients = rtpbin->clients; clients; clients = g_slist_next (clients)) {
1029     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
1030
1031     /* reset sync on all streams for this client */
1032     for (streams = client->streams; streams; streams = g_slist_next (streams)) {
1033       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1034
1035       /* make use require a new SR packet for this stream before we attempt new
1036        * lip-sync */
1037       stream->have_sync = FALSE;
1038       stream->rt_delta = 0;
1039       stream->avg_ts_offset = 0;
1040       stream->is_initialized = FALSE;
1041       stream->rtp_delta = 0;
1042       stream->clock_base = -100 * GST_SECOND;
1043     }
1044   }
1045   GST_RTP_BIN_UNLOCK (rtpbin);
1046 }
1047
1048 static void
1049 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
1050 {
1051   GSList *sessions, *streams;
1052
1053   GST_RTP_BIN_LOCK (bin);
1054   GST_DEBUG_OBJECT (bin, "clearing pt map");
1055   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1056     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
1057
1058     GST_DEBUG_OBJECT (bin, "clearing session %p", session);
1059     g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
1060
1061     GST_RTP_SESSION_LOCK (session);
1062     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
1063
1064     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
1065       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1066
1067       GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
1068       if (g_signal_lookup ("clear-pt-map", G_OBJECT_TYPE (stream->buffer)) != 0)
1069         g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
1070       if (stream->demux)
1071         g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
1072     }
1073     GST_RTP_SESSION_UNLOCK (session);
1074   }
1075   GST_RTP_BIN_UNLOCK (bin);
1076
1077   /* reset sync too */
1078   gst_rtp_bin_reset_sync (bin);
1079 }
1080
1081 static GstElement *
1082 gst_rtp_bin_get_session (GstRtpBin * bin, guint session_id)
1083 {
1084   GstRtpBinSession *session;
1085   GstElement *ret = NULL;
1086
1087   GST_RTP_BIN_LOCK (bin);
1088   GST_DEBUG_OBJECT (bin, "retrieving GstRtpSession, index: %u", session_id);
1089   session = find_session_by_id (bin, (gint) session_id);
1090   if (session) {
1091     ret = gst_object_ref (session->session);
1092   }
1093   GST_RTP_BIN_UNLOCK (bin);
1094
1095   return ret;
1096 }
1097
1098 static RTPSession *
1099 gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
1100 {
1101   RTPSession *internal_session = NULL;
1102   GstRtpBinSession *session;
1103
1104   GST_RTP_BIN_LOCK (bin);
1105   GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %u",
1106       session_id);
1107   session = find_session_by_id (bin, (gint) session_id);
1108   if (session) {
1109     g_object_get (session->session, "internal-session", &internal_session,
1110         NULL);
1111   }
1112   GST_RTP_BIN_UNLOCK (bin);
1113
1114   return internal_session;
1115 }
1116
1117 static GstElement *
1118 gst_rtp_bin_get_storage (GstRtpBin * bin, guint session_id)
1119 {
1120   GstRtpBinSession *session;
1121   GstElement *res = NULL;
1122
1123   GST_RTP_BIN_LOCK (bin);
1124   GST_DEBUG_OBJECT (bin, "retrieving internal storage object, index: %u",
1125       session_id);
1126   session = find_session_by_id (bin, (gint) session_id);
1127   if (session && session->storage) {
1128     res = gst_object_ref (session->storage);
1129   }
1130   GST_RTP_BIN_UNLOCK (bin);
1131
1132   return res;
1133 }
1134
1135 static GObject *
1136 gst_rtp_bin_get_internal_storage (GstRtpBin * bin, guint session_id)
1137 {
1138   GObject *internal_storage = NULL;
1139   GstRtpBinSession *session;
1140
1141   GST_RTP_BIN_LOCK (bin);
1142   GST_DEBUG_OBJECT (bin, "retrieving internal storage object, index: %u",
1143       session_id);
1144   session = find_session_by_id (bin, (gint) session_id);
1145   if (session && session->storage) {
1146     g_object_get (session->storage, "internal-storage", &internal_storage,
1147         NULL);
1148   }
1149   GST_RTP_BIN_UNLOCK (bin);
1150
1151   return internal_storage;
1152 }
1153
1154 static void
1155 gst_rtp_bin_clear_ssrc (GstRtpBin * bin, guint session_id, guint32 ssrc)
1156 {
1157   GstRtpBinSession *session;
1158   GstElement *demux = NULL;
1159
1160   GST_RTP_BIN_LOCK (bin);
1161   GST_DEBUG_OBJECT (bin, "clearing ssrc %u for session %u", ssrc, session_id);
1162   session = find_session_by_id (bin, (gint) session_id);
1163   if (session)
1164     demux = gst_object_ref (session->demux);
1165   GST_RTP_BIN_UNLOCK (bin);
1166
1167   if (demux) {
1168     g_signal_emit_by_name (demux, "clear-ssrc", ssrc, NULL);
1169     gst_object_unref (demux);
1170   }
1171 }
1172
1173 static GstElement *
1174 gst_rtp_bin_request_encoder (GstRtpBin * bin, guint session_id)
1175 {
1176   GST_DEBUG_OBJECT (bin, "return NULL encoder");
1177   return NULL;
1178 }
1179
1180 static GstElement *
1181 gst_rtp_bin_request_decoder (GstRtpBin * bin, guint session_id)
1182 {
1183   GST_DEBUG_OBJECT (bin, "return NULL decoder");
1184   return NULL;
1185 }
1186
1187 static GstElement *
1188 gst_rtp_bin_request_jitterbuffer (GstRtpBin * bin, guint session_id)
1189 {
1190   return gst_element_factory_make ("rtpjitterbuffer", NULL);
1191 }
1192
1193 static void
1194 gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
1195     const gchar * name, const GValue * value)
1196 {
1197   GSList *sessions, *streams;
1198
1199   GST_RTP_BIN_LOCK (bin);
1200   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1201     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
1202
1203     GST_RTP_SESSION_LOCK (session);
1204     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
1205       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1206       GObjectClass *jb_class;
1207
1208       jb_class = G_OBJECT_GET_CLASS (G_OBJECT (stream->buffer));
1209       if (g_object_class_find_property (jb_class, name))
1210         g_object_set_property (G_OBJECT (stream->buffer), name, value);
1211       else
1212         GST_WARNING_OBJECT (bin,
1213             "Stream jitterbuffer does not expose property %s", name);
1214     }
1215     GST_RTP_SESSION_UNLOCK (session);
1216   }
1217   GST_RTP_BIN_UNLOCK (bin);
1218 }
1219
1220 static void
1221 gst_rtp_bin_propagate_property_to_session (GstRtpBin * bin,
1222     const gchar * name, const GValue * value)
1223 {
1224   GSList *sessions;
1225
1226   GST_RTP_BIN_LOCK (bin);
1227   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1228     GstRtpBinSession *sess = (GstRtpBinSession *) sessions->data;
1229
1230     g_object_set_property (G_OBJECT (sess->session), name, value);
1231   }
1232   GST_RTP_BIN_UNLOCK (bin);
1233 }
1234
1235 /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
1236 static GstRtpBinClient *
1237 get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
1238 {
1239   GstRtpBinClient *result = NULL;
1240   GSList *walk;
1241
1242   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
1243     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
1244
1245     if (len != client->cname_len)
1246       continue;
1247
1248     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
1249       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
1250           client->cname);
1251       result = client;
1252       break;
1253     }
1254   }
1255
1256   /* nothing found, create one */
1257   if (result == NULL) {
1258     result = g_new0 (GstRtpBinClient, 1);
1259     result->cname = g_strndup ((gchar *) data, len);
1260     result->cname_len = len;
1261     bin->clients = g_slist_prepend (bin->clients, result);
1262     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
1263         result->cname);
1264   }
1265   return result;
1266 }
1267
1268 static void
1269 free_client (GstRtpBinClient * client, GstRtpBin * bin)
1270 {
1271   GST_DEBUG_OBJECT (bin, "freeing client %p", client);
1272   g_slist_free (client->streams);
1273   g_free (client->cname);
1274   g_free (client);
1275 }
1276
1277 static void
1278 get_current_times (GstRtpBin * bin, GstClockTime * running_time,
1279     guint64 * ntpnstime)
1280 {
1281   guint64 ntpns = -1;
1282   GstClock *clock;
1283   GstClockTime base_time, rt, clock_time;
1284
1285   GST_OBJECT_LOCK (bin);
1286   if ((clock = GST_ELEMENT_CLOCK (bin))) {
1287     base_time = GST_ELEMENT_CAST (bin)->base_time;
1288     gst_object_ref (clock);
1289     GST_OBJECT_UNLOCK (bin);
1290
1291     /* get current clock time and convert to running time */
1292     clock_time = gst_clock_get_time (clock);
1293     rt = clock_time - base_time;
1294
1295     if (bin->use_pipeline_clock) {
1296       ntpns = rt;
1297       /* add constant to convert from 1970 based time to 1900 based time */
1298       ntpns += (2208988800LL * GST_SECOND);
1299     } else {
1300       switch (bin->ntp_time_source) {
1301         case GST_RTP_NTP_TIME_SOURCE_NTP:
1302         case GST_RTP_NTP_TIME_SOURCE_UNIX:{
1303           /* get current NTP time */
1304           ntpns = g_get_real_time () * GST_USECOND;
1305
1306           /* add constant to convert from 1970 based time to 1900 based time */
1307           if (bin->ntp_time_source == GST_RTP_NTP_TIME_SOURCE_NTP)
1308             ntpns += (2208988800LL * GST_SECOND);
1309           break;
1310         }
1311         case GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME:
1312           ntpns = rt;
1313           break;
1314         case GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME:
1315           ntpns = clock_time;
1316           break;
1317         default:
1318           ntpns = -1;           /* Fix uninited compiler warning */
1319           g_assert_not_reached ();
1320           break;
1321       }
1322     }
1323
1324     gst_object_unref (clock);
1325   } else {
1326     GST_OBJECT_UNLOCK (bin);
1327     rt = -1;
1328     ntpns = -1;
1329   }
1330   if (running_time)
1331     *running_time = rt;
1332   if (ntpnstime)
1333     *ntpnstime = ntpns;
1334 }
1335
1336 static void
1337 stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
1338     gint64 ts_offset, gint64 max_ts_offset, guint64 min_ts_offset,
1339     gboolean allow_positive_ts_offset)
1340 {
1341   gint64 prev_ts_offset;
1342   GObjectClass *jb_class;
1343
1344   jb_class = G_OBJECT_GET_CLASS (G_OBJECT (stream->buffer));
1345
1346   if (!g_object_class_find_property (jb_class, "ts-offset")) {
1347     GST_LOG_OBJECT (bin,
1348         "stream's jitterbuffer does not expose ts-offset property");
1349     return;
1350   }
1351
1352   if (bin->ts_offset_smoothing_factor > 0) {
1353     if (!stream->is_initialized) {
1354       stream->avg_ts_offset = ts_offset;
1355       stream->is_initialized = TRUE;
1356     } else {
1357       /* RMA algorithm using smoothing factor is following, but split into
1358        * parts to check for overflows:
1359        * stream->avg_ts_offset =
1360        *   ((bin->ts_offset_smoothing_factor - 1) * stream->avg_ts_offset
1361        *    + ts_offset) / bin->ts_offset_smoothing_factor
1362        */
1363       guint64 max_possible_smoothing_factor =
1364           G_MAXINT64 / ABS (stream->avg_ts_offset);
1365       gint64 cur_avg_product =
1366           (bin->ts_offset_smoothing_factor - 1) * stream->avg_ts_offset;
1367
1368       if ((max_possible_smoothing_factor < bin->ts_offset_smoothing_factor) ||
1369           (cur_avg_product > 0 && G_MAXINT64 - cur_avg_product < ts_offset) ||
1370           (cur_avg_product < 0 && G_MININT64 - cur_avg_product > ts_offset)) {
1371         GST_WARNING_OBJECT (bin,
1372             "ts-offset-smoothing-factor calculation overflow, fallback to using ts-offset directly");
1373         stream->avg_ts_offset = ts_offset;
1374       } else {
1375         stream->avg_ts_offset =
1376             (cur_avg_product + ts_offset) / bin->ts_offset_smoothing_factor;
1377       }
1378     }
1379   } else {
1380     stream->avg_ts_offset = ts_offset;
1381   }
1382
1383   g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
1384
1385   /* delta changed, see how much */
1386   if (prev_ts_offset != stream->avg_ts_offset) {
1387     gint64 diff;
1388
1389     diff = prev_ts_offset - stream->avg_ts_offset;
1390
1391     GST_DEBUG_OBJECT (bin,
1392         "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
1393         ", diff: %" G_GINT64_FORMAT, stream->avg_ts_offset, prev_ts_offset,
1394         diff);
1395
1396     /* ignore minor offsets */
1397     if (ABS (diff) < min_ts_offset) {
1398       GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
1399       return;
1400     }
1401
1402     /* sanity check offset */
1403     if (max_ts_offset > 0) {
1404       if (stream->avg_ts_offset > 0 && !allow_positive_ts_offset) {
1405         GST_DEBUG_OBJECT (bin,
1406             "offset is positive (clocks are out of sync), ignoring");
1407         return;
1408       }
1409       if (ABS (stream->avg_ts_offset) > max_ts_offset) {
1410         GST_DEBUG_OBJECT (bin, "offset too large, ignoring");
1411         return;
1412       }
1413     }
1414
1415     g_object_set (stream->buffer, "ts-offset", stream->avg_ts_offset, NULL);
1416   }
1417   GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
1418       stream->ssrc, stream->avg_ts_offset);
1419 }
1420
1421 static void
1422 gst_rtp_bin_send_sync_event (GstRtpBinStream * stream)
1423 {
1424   if (stream->bin->send_sync_event) {
1425     GstEvent *event;
1426     GstPad *srcpad;
1427
1428     GST_DEBUG_OBJECT (stream->bin,
1429         "sending GstRTCPSRReceived event downstream");
1430
1431     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1432         gst_structure_new_empty ("GstRTCPSRReceived"));
1433
1434     srcpad = gst_element_get_static_pad (stream->buffer, "src");
1435     gst_pad_push_event (srcpad, event);
1436     gst_object_unref (srcpad);
1437   }
1438 }
1439
1440 /* associate a stream to the given CNAME. This will make sure all streams for
1441  * that CNAME are synchronized together.
1442  * Must be called with GST_RTP_BIN_LOCK */
1443 static void
1444 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
1445     guint8 * data, guint64 ntptime, guint64 last_extrtptime,
1446     guint64 base_rtptime, guint64 base_time, guint clock_rate,
1447     gint64 rtp_clock_base)
1448 {
1449   GstRtpBinClient *client;
1450   gboolean created;
1451   GSList *walk;
1452   GstClockTime running_time, running_time_rtp;
1453   guint64 ntpnstime;
1454
1455   /* first find or create the CNAME */
1456   client = get_client (bin, len, data, &created);
1457
1458   /* find stream in the client */
1459   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1460     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1461
1462     if (ostream == stream)
1463       break;
1464   }
1465   /* not found, add it to the list */
1466   if (walk == NULL) {
1467     GST_DEBUG_OBJECT (bin,
1468         "new association of SSRC %08x with client %p with CNAME %s",
1469         stream->ssrc, client, client->cname);
1470     client->streams = g_slist_prepend (client->streams, stream);
1471     client->nstreams++;
1472   } else {
1473     GST_DEBUG_OBJECT (bin,
1474         "found association of SSRC %08x with client %p with CNAME %s",
1475         stream->ssrc, client, client->cname);
1476   }
1477
1478   if (!GST_CLOCK_TIME_IS_VALID (last_extrtptime)) {
1479     GST_DEBUG_OBJECT (bin, "invalidated sync data");
1480     if (bin->rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1481       /* we don't need that data, so carry on,
1482        * but make some values look saner */
1483       last_extrtptime = base_rtptime;
1484     } else {
1485       /* nothing we can do with this data in this case */
1486       GST_DEBUG_OBJECT (bin, "bailing out");
1487       return;
1488     }
1489   }
1490
1491   /* Take the extended rtptime we found in the SR packet and map it to the
1492    * local rtptime. The local rtp time is used to construct timestamps on the
1493    * buffers so we will calculate what running_time corresponds to the RTP
1494    * timestamp in the SR packet. */
1495   running_time_rtp = last_extrtptime - base_rtptime;
1496
1497   GST_DEBUG_OBJECT (bin,
1498       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
1499       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, "
1500       "clock-base %" G_GINT64_FORMAT, base_rtptime,
1501       last_extrtptime, running_time_rtp, clock_rate, rtp_clock_base);
1502
1503   /* calculate local RTP time in gstreamer timestamp, we essentially perform the
1504    * same conversion that a jitterbuffer would use to convert an rtp timestamp
1505    * into a corresponding gstreamer timestamp. Note that the base_time also
1506    * contains the drift between sender and receiver. */
1507   running_time =
1508       gst_util_uint64_scale_int (running_time_rtp, GST_SECOND, clock_rate);
1509   running_time += base_time;
1510
1511   /* convert ntptime to nanoseconds */
1512   ntpnstime = gst_util_uint64_scale (ntptime, GST_SECOND,
1513       (G_GINT64_CONSTANT (1) << 32));
1514
1515   stream->have_sync = TRUE;
1516
1517   GST_DEBUG_OBJECT (bin,
1518       "SR RTP running time %" G_GUINT64_FORMAT ", SR NTP %" G_GUINT64_FORMAT,
1519       running_time, ntpnstime);
1520
1521   /* recalc inter stream playout offset, but only if there is more than one
1522    * stream or we're doing NTP sync. */
1523   if (bin->ntp_sync) {
1524     gint64 ntpdiff, rtdiff;
1525     guint64 local_ntpnstime;
1526     GstClockTime local_running_time;
1527
1528     /* For NTP sync we need to first get a snapshot of running_time and NTP
1529      * time. We know at what running_time we play a certain RTP time, we also
1530      * calculated when we would play the RTP time in the SR packet. Now we need
1531      * to know how the running_time and the NTP time relate to each other. */
1532     get_current_times (bin, &local_running_time, &local_ntpnstime);
1533
1534     /* see how far away the NTP time is. This is the difference between the
1535      * current NTP time and the NTP time in the last SR packet. */
1536     ntpdiff = local_ntpnstime - ntpnstime;
1537     /* see how far away the running_time is. This is the difference between the
1538      * current running_time and the running_time of the RTP timestamp in the
1539      * last SR packet. */
1540     rtdiff = local_running_time - running_time;
1541
1542     GST_DEBUG_OBJECT (bin,
1543         "local NTP time %" G_GUINT64_FORMAT ", SR NTP time %" G_GUINT64_FORMAT,
1544         local_ntpnstime, ntpnstime);
1545     GST_DEBUG_OBJECT (bin,
1546         "local running time %" G_GUINT64_FORMAT ", SR RTP running time %"
1547         G_GUINT64_FORMAT, local_running_time, running_time);
1548     GST_DEBUG_OBJECT (bin,
1549         "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
1550         rtdiff);
1551
1552     /* combine to get the final diff to apply to the running_time */
1553     stream->rt_delta = rtdiff - ntpdiff;
1554
1555     stream_set_ts_offset (bin, stream, stream->rt_delta, bin->max_ts_offset,
1556         bin->min_ts_offset, FALSE);
1557   } else {
1558     gint64 min, rtp_min, clock_base = stream->clock_base;
1559     gboolean all_sync, use_rtp;
1560     gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
1561
1562     /* calculate delta between server and receiver. ntpnstime is created by
1563      * converting the ntptime in the last SR packet to a gstreamer timestamp. This
1564      * delta expresses the difference to our timeline and the server timeline. The
1565      * difference in itself doesn't mean much but we can combine the delta of
1566      * multiple streams to create a stream specific offset. */
1567     stream->rt_delta = ntpnstime - running_time;
1568
1569     /* calculate the min of all deltas, ignoring streams that did not yet have a
1570      * valid rt_delta because we did not yet receive an SR packet for those
1571      * streams.
1572      * We calculate the minimum because we would like to only apply positive
1573      * offsets to streams, delaying their playback instead of trying to speed up
1574      * other streams (which might be impossible when we have to create negative
1575      * latencies).
1576      * The stream that has the smallest diff is selected as the reference stream,
1577      * all other streams will have a positive offset to this difference. */
1578
1579     /* some alternative setting allow ignoring RTCP as much as possible,
1580      * for servers generating bogus ntp timeline */
1581     min = rtp_min = G_MAXINT64;
1582     use_rtp = FALSE;
1583     if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1584       guint64 ext_base;
1585
1586       use_rtp = TRUE;
1587       /* signed version for convenience */
1588       clock_base = base_rtptime;
1589       /* deal with possible wrap-around */
1590       ext_base = base_rtptime;
1591       rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
1592       /* sanity check; base rtp and provided clock_base should be close */
1593       if (rtp_clock_base >= clock_base) {
1594         if (rtp_clock_base - clock_base < 10 * clock_rate) {
1595           rtp_clock_base = base_time +
1596               gst_util_uint64_scale_int (rtp_clock_base - clock_base,
1597               GST_SECOND, clock_rate);
1598         } else {
1599           use_rtp = FALSE;
1600         }
1601       } else {
1602         if (clock_base - rtp_clock_base < 10 * clock_rate) {
1603           rtp_clock_base = base_time -
1604               gst_util_uint64_scale_int (clock_base - rtp_clock_base,
1605               GST_SECOND, clock_rate);
1606         } else {
1607           use_rtp = FALSE;
1608         }
1609       }
1610       /* warn and bail for clarity out if no sane values */
1611       if (!use_rtp) {
1612         GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
1613         return;
1614       }
1615       /* store to track changes */
1616       clock_base = rtp_clock_base;
1617       /* generate a fake as before,
1618        * now equating rtptime obtained from RTP-Info,
1619        * where the large time represent the otherwise irrelevant npt/ntp time */
1620       stream->rtp_delta = (GST_SECOND << 28) - rtp_clock_base;
1621     } else {
1622       clock_base = rtp_clock_base;
1623     }
1624
1625     all_sync = TRUE;
1626     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1627       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1628
1629       if (!ostream->have_sync) {
1630         all_sync = FALSE;
1631         continue;
1632       }
1633
1634       /* change in current stream's base from previously init'ed value
1635        * leads to reset of all stream's base */
1636       if (stream != ostream && stream->clock_base >= 0 &&
1637           (stream->clock_base != clock_base)) {
1638         GST_DEBUG_OBJECT (bin, "reset upon clock base change");
1639         ostream->clock_base = -100 * GST_SECOND;
1640         ostream->rtp_delta = 0;
1641       }
1642
1643       if (ostream->rt_delta < min)
1644         min = ostream->rt_delta;
1645       if (ostream->rtp_delta < rtp_min)
1646         rtp_min = ostream->rtp_delta;
1647     }
1648
1649     /* arrange to re-sync for each stream upon significant change,
1650      * e.g. post-seek */
1651     all_sync = all_sync && (stream->clock_base == clock_base);
1652     stream->clock_base = clock_base;
1653
1654     /* may need init performed above later on, but nothing more to do now */
1655     if (client->nstreams <= 1)
1656       return;
1657
1658     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
1659         " all sync %d", client, min, all_sync);
1660     GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
1661
1662     switch (rtcp_sync) {
1663       case GST_RTP_BIN_RTCP_SYNC_RTP:
1664         if (!use_rtp)
1665           break;
1666         GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
1667             "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
1668         /* fall-through */
1669       case GST_RTP_BIN_RTCP_SYNC_INITIAL:
1670         /* if all have been synced already, do not bother further */
1671         if (all_sync) {
1672           GST_DEBUG_OBJECT (bin, "all streams already synced; done");
1673           return;
1674         }
1675         break;
1676       default:
1677         break;
1678     }
1679
1680     /* bail out if we adjusted recently enough */
1681     if (all_sync && (ntpnstime - bin->priv->last_ntpnstime) <
1682         bin->rtcp_sync_interval * GST_MSECOND) {
1683       GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
1684           "previous sender info too recent "
1685           "(previous NTP %" G_GUINT64_FORMAT ")", bin->priv->last_ntpnstime);
1686       return;
1687     }
1688     bin->priv->last_ntpnstime = ntpnstime;
1689
1690     /* calculate offsets for each stream */
1691     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1692       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1693       gint64 ts_offset;
1694
1695       /* ignore streams for which we didn't receive an SR packet yet, we
1696        * can't synchronize them yet. We can however sync other streams just
1697        * fine. */
1698       if (!ostream->have_sync)
1699         continue;
1700
1701       /* calculate offset to our reference stream, this should always give a
1702        * positive number. */
1703       if (use_rtp)
1704         ts_offset = ostream->rtp_delta - rtp_min;
1705       else
1706         ts_offset = ostream->rt_delta - min;
1707
1708       stream_set_ts_offset (bin, ostream, ts_offset, bin->max_ts_offset,
1709           bin->min_ts_offset, TRUE);
1710     }
1711   }
1712   gst_rtp_bin_send_sync_event (stream);
1713
1714   return;
1715 }
1716
1717 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
1718   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
1719           (b) = gst_rtcp_packet_move_to_next ((packet)))
1720
1721 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
1722   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
1723           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
1724
1725 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
1726   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
1727           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
1728
1729 static void
1730 gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
1731     GstRtpBinStream * stream)
1732 {
1733   GstRtpBin *bin;
1734   GstRTCPPacket packet;
1735   guint32 ssrc;
1736   guint64 ntptime;
1737   gboolean have_sr, have_sdes;
1738   gboolean more;
1739   guint64 base_rtptime;
1740   guint64 base_time;
1741   guint clock_rate;
1742   guint64 clock_base;
1743   guint64 extrtptime;
1744   GstBuffer *buffer;
1745   GstRTCPBuffer rtcp = { NULL, };
1746
1747   bin = stream->bin;
1748
1749   GST_DEBUG_OBJECT (bin, "sync handler called");
1750
1751   /* get the last relation between the rtp timestamps and the gstreamer
1752    * timestamps. We get this info directly from the jitterbuffer which
1753    * constructs gstreamer timestamps from rtp timestamps and so it know exactly
1754    * what the current situation is. */
1755   base_rtptime =
1756       g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
1757   base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
1758   clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
1759   clock_base = g_value_get_uint64 (gst_structure_get_value (s, "clock-base"));
1760   extrtptime =
1761       g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
1762   buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
1763
1764   have_sr = FALSE;
1765   have_sdes = FALSE;
1766
1767   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
1768
1769   GST_RTCP_BUFFER_FOR_PACKETS (more, &rtcp, &packet) {
1770     /* first packet must be SR or RR or else the validate would have failed */
1771     switch (gst_rtcp_packet_get_type (&packet)) {
1772       case GST_RTCP_TYPE_SR:
1773         /* only parse first. There is only supposed to be one SR in the packet
1774          * but we will deal with malformed packets gracefully */
1775         if (have_sr)
1776           break;
1777         /* get NTP and RTP times */
1778         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL,
1779             NULL, NULL);
1780
1781         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
1782         /* ignore SR that is not ours */
1783         if (ssrc != stream->ssrc)
1784           continue;
1785
1786         have_sr = TRUE;
1787         break;
1788       case GST_RTCP_TYPE_SDES:
1789       {
1790         gboolean more_items, more_entries;
1791
1792         /* only deal with first SDES, there is only supposed to be one SDES in
1793          * the RTCP packet but we deal with bad packets gracefully. Also bail
1794          * out if we have not seen an SR item yet. */
1795         if (have_sdes || !have_sr)
1796           break;
1797
1798         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
1799           /* skip items that are not about the SSRC of the sender */
1800           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
1801             continue;
1802
1803           /* find the CNAME entry */
1804           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
1805             GstRTCPSDESType type;
1806             guint8 len;
1807             guint8 *data;
1808
1809             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
1810
1811             if (type == GST_RTCP_SDES_CNAME) {
1812               GST_RTP_BIN_LOCK (bin);
1813               /* associate the stream to CNAME */
1814               gst_rtp_bin_associate (bin, stream, len, data,
1815                   ntptime, extrtptime, base_rtptime, base_time, clock_rate,
1816                   clock_base);
1817               GST_RTP_BIN_UNLOCK (bin);
1818             }
1819           }
1820         }
1821         have_sdes = TRUE;
1822         break;
1823       }
1824       default:
1825         /* we can ignore these packets */
1826         break;
1827     }
1828   }
1829   gst_rtcp_buffer_unmap (&rtcp);
1830 }
1831
1832 /* create a new stream with @ssrc in @session. Must be called with
1833  * RTP_SESSION_LOCK. */
1834 static GstRtpBinStream *
1835 create_stream (GstRtpBinSession * session, guint32 ssrc)
1836 {
1837   GstElement *buffer, *demux = NULL;
1838   GstRtpBinStream *stream;
1839   GstRtpBin *rtpbin;
1840   GstState target;
1841   GObjectClass *jb_class;
1842
1843   rtpbin = session->bin;
1844
1845   if (g_slist_length (session->streams) >= rtpbin->max_streams)
1846     goto max_streams;
1847
1848   if (!(buffer =
1849           session_request_element (session, SIGNAL_REQUEST_JITTERBUFFER)))
1850     goto no_jitterbuffer;
1851
1852   if (!rtpbin->ignore_pt) {
1853     if (!(demux = gst_element_factory_make ("rtpptdemux", NULL)))
1854       goto no_demux;
1855   }
1856
1857   stream = g_new0 (GstRtpBinStream, 1);
1858   stream->ssrc = ssrc;
1859   stream->bin = rtpbin;
1860   stream->session = session;
1861   stream->buffer = gst_object_ref (buffer);
1862   stream->demux = demux;
1863
1864   stream->have_sync = FALSE;
1865   stream->rt_delta = 0;
1866   stream->avg_ts_offset = 0;
1867   stream->is_initialized = FALSE;
1868   stream->rtp_delta = 0;
1869   stream->percent = 100;
1870   stream->clock_base = -100 * GST_SECOND;
1871   session->streams = g_slist_prepend (session->streams, stream);
1872
1873   jb_class = G_OBJECT_GET_CLASS (G_OBJECT (buffer));
1874
1875   if (g_signal_lookup ("request-pt-map", G_OBJECT_TYPE (buffer)) != 0) {
1876     /* provide clock_rate to the jitterbuffer when needed */
1877     stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
1878         (GCallback) pt_map_requested, session);
1879   }
1880   if (g_signal_lookup ("on-npt-stop", G_OBJECT_TYPE (buffer)) != 0) {
1881     stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
1882         (GCallback) on_npt_stop, stream);
1883   }
1884
1885   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.session", session);
1886   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.stream", stream);
1887
1888   /* configure latency and packet lost */
1889   g_object_set (buffer, "latency", rtpbin->latency_ms, NULL);
1890
1891   if (g_object_class_find_property (jb_class, "drop-on-latency"))
1892     g_object_set (buffer, "drop-on-latency", rtpbin->drop_on_latency, NULL);
1893   if (g_object_class_find_property (jb_class, "do-lost"))
1894     g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
1895   if (g_object_class_find_property (jb_class, "mode"))
1896     g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL);
1897   if (g_object_class_find_property (jb_class, "do-retransmission"))
1898     g_object_set (buffer, "do-retransmission", rtpbin->do_retransmission, NULL);
1899   if (g_object_class_find_property (jb_class, "max-rtcp-rtp-time-diff"))
1900     g_object_set (buffer, "max-rtcp-rtp-time-diff",
1901         rtpbin->max_rtcp_rtp_time_diff, NULL);
1902   if (g_object_class_find_property (jb_class, "max-dropout-time"))
1903     g_object_set (buffer, "max-dropout-time", rtpbin->max_dropout_time, NULL);
1904   if (g_object_class_find_property (jb_class, "max-misorder-time"))
1905     g_object_set (buffer, "max-misorder-time", rtpbin->max_misorder_time, NULL);
1906   if (g_object_class_find_property (jb_class, "rfc7273-sync"))
1907     g_object_set (buffer, "rfc7273-sync", rtpbin->rfc7273_sync, NULL);
1908   if (g_object_class_find_property (jb_class, "max-ts-offset-adjustment"))
1909     g_object_set (buffer, "max-ts-offset-adjustment",
1910         rtpbin->max_ts_offset_adjustment, NULL);
1911
1912   g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER], 0,
1913       buffer, session->id, ssrc);
1914
1915   if (!rtpbin->ignore_pt)
1916     gst_bin_add (GST_BIN_CAST (rtpbin), demux);
1917
1918   /* link stuff */
1919   if (demux)
1920     gst_element_link_pads_full (buffer, "src", demux, "sink",
1921         GST_PAD_LINK_CHECK_NOTHING);
1922
1923   if (rtpbin->buffering) {
1924     guint64 last_out;
1925
1926     if (g_signal_lookup ("set-active", G_OBJECT_TYPE (buffer)) != 0) {
1927       GST_INFO_OBJECT (rtpbin,
1928           "bin is buffering, set jitterbuffer as not active");
1929       g_signal_emit_by_name (buffer, "set-active", FALSE, (gint64) 0,
1930           &last_out);
1931     }
1932   }
1933
1934
1935   GST_OBJECT_LOCK (rtpbin);
1936   target = GST_STATE_TARGET (rtpbin);
1937   GST_OBJECT_UNLOCK (rtpbin);
1938
1939   /* from sink to source */
1940   if (demux)
1941     gst_element_set_state (demux, target);
1942
1943   gst_element_set_state (buffer, target);
1944
1945   return stream;
1946
1947   /* ERRORS */
1948 max_streams:
1949   {
1950     GST_WARNING_OBJECT (rtpbin, "stream exceeds maximum (%d)",
1951         rtpbin->max_streams);
1952     return NULL;
1953   }
1954 no_jitterbuffer:
1955   {
1956     g_warning ("rtpbin: could not create rtpjitterbuffer element");
1957     return NULL;
1958   }
1959 no_demux:
1960   {
1961     gst_object_unref (buffer);
1962     g_warning ("rtpbin: could not create rtpptdemux element");
1963     return NULL;
1964   }
1965 }
1966
1967 /* called with RTP_BIN_LOCK */
1968 static void
1969 free_stream (GstRtpBinStream * stream, GstRtpBin * bin)
1970 {
1971   GstRtpBinSession *sess = stream->session;
1972   GSList *clients, *next_client;
1973
1974   GST_DEBUG_OBJECT (bin, "freeing stream %p", stream);
1975
1976   gst_element_set_locked_state (stream->buffer, TRUE);
1977   if (stream->demux)
1978     gst_element_set_locked_state (stream->demux, TRUE);
1979
1980   gst_element_set_state (stream->buffer, GST_STATE_NULL);
1981   if (stream->demux)
1982     gst_element_set_state (stream->demux, GST_STATE_NULL);
1983
1984   if (stream->demux) {
1985     g_signal_handler_disconnect (stream->demux, stream->demux_newpad_sig);
1986     g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig);
1987     g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig);
1988     g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
1989   }
1990
1991   if (stream->buffer_handlesync_sig)
1992     g_signal_handler_disconnect (stream->buffer, stream->buffer_handlesync_sig);
1993   if (stream->buffer_ptreq_sig)
1994     g_signal_handler_disconnect (stream->buffer, stream->buffer_ptreq_sig);
1995   if (stream->buffer_ntpstop_sig)
1996     g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig);
1997
1998   sess->elements = g_slist_remove (sess->elements, stream->buffer);
1999   remove_bin_element (stream->buffer, bin);
2000   gst_object_unref (stream->buffer);
2001
2002   if (stream->demux)
2003     gst_bin_remove (GST_BIN_CAST (bin), stream->demux);
2004
2005   for (clients = bin->clients; clients; clients = next_client) {
2006     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
2007     GSList *streams, *next_stream;
2008
2009     next_client = g_slist_next (clients);
2010
2011     for (streams = client->streams; streams; streams = next_stream) {
2012       GstRtpBinStream *ostream = (GstRtpBinStream *) streams->data;
2013
2014       next_stream = g_slist_next (streams);
2015
2016       if (ostream == stream) {
2017         client->streams = g_slist_delete_link (client->streams, streams);
2018         /* If this was the last stream belonging to this client,
2019          * clean up the client. */
2020         if (--client->nstreams == 0) {
2021           bin->clients = g_slist_delete_link (bin->clients, clients);
2022           free_client (client, bin);
2023           break;
2024         }
2025       }
2026     }
2027   }
2028   g_free (stream);
2029 }
2030
2031 /* GObject vmethods */
2032 static void gst_rtp_bin_dispose (GObject * object);
2033 static void gst_rtp_bin_finalize (GObject * object);
2034 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
2035     const GValue * value, GParamSpec * pspec);
2036 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
2037     GValue * value, GParamSpec * pspec);
2038
2039 /* GstElement vmethods */
2040 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
2041     GstStateChange transition);
2042 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
2043     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
2044 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
2045 static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message);
2046
2047 #define gst_rtp_bin_parent_class parent_class
2048 G_DEFINE_TYPE_WITH_PRIVATE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN);
2049 GST_ELEMENT_REGISTER_DEFINE (rtpbin, "rtpbin", GST_RANK_NONE, GST_TYPE_RTP_BIN);
2050
2051 static gboolean
2052 _gst_element_accumulator (GSignalInvocationHint * ihint,
2053     GValue * return_accu, const GValue * handler_return, gpointer dummy)
2054 {
2055   GstElement *element;
2056
2057   element = g_value_get_object (handler_return);
2058   GST_DEBUG ("got element %" GST_PTR_FORMAT, element);
2059
2060   g_value_set_object (return_accu, element);
2061
2062   /* stop emission if we have an element */
2063   return (element == NULL);
2064 }
2065
2066 static gboolean
2067 _gst_caps_accumulator (GSignalInvocationHint * ihint,
2068     GValue * return_accu, const GValue * handler_return, gpointer dummy)
2069 {
2070   GstCaps *caps;
2071
2072   caps = g_value_get_boxed (handler_return);
2073   GST_DEBUG ("got caps %" GST_PTR_FORMAT, caps);
2074
2075   g_value_set_boxed (return_accu, caps);
2076
2077   /* stop emission if we have a caps */
2078   return (caps == NULL);
2079 }
2080
2081 static void
2082 gst_rtp_bin_class_init (GstRtpBinClass * klass)
2083 {
2084   GObjectClass *gobject_class;
2085   GstElementClass *gstelement_class;
2086   GstBinClass *gstbin_class;
2087
2088   gobject_class = (GObjectClass *) klass;
2089   gstelement_class = (GstElementClass *) klass;
2090   gstbin_class = (GstBinClass *) klass;
2091
2092   gobject_class->dispose = gst_rtp_bin_dispose;
2093   gobject_class->finalize = gst_rtp_bin_finalize;
2094   gobject_class->set_property = gst_rtp_bin_set_property;
2095   gobject_class->get_property = gst_rtp_bin_get_property;
2096
2097   g_object_class_install_property (gobject_class, PROP_LATENCY,
2098       g_param_spec_uint ("latency", "Buffer latency in ms",
2099           "Default amount of ms to buffer in the jitterbuffers", 0,
2100           G_MAXUINT, DEFAULT_LATENCY_MS,
2101           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2102
2103   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
2104       g_param_spec_boolean ("drop-on-latency",
2105           "Drop buffers when maximum latency is reached",
2106           "Tells the jitterbuffer to never exceed the given latency in size",
2107           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2108
2109   /**
2110    * GstRtpBin::request-pt-map:
2111    * @rtpbin: the object which received the signal
2112    * @session: the session
2113    * @pt: the pt
2114    *
2115    * Request the payload type as #GstCaps for @pt in @session.
2116    */
2117   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
2118       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
2119       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
2120       _gst_caps_accumulator, NULL, NULL, GST_TYPE_CAPS, 2, G_TYPE_UINT,
2121       G_TYPE_UINT);
2122
2123     /**
2124    * GstRtpBin::payload-type-change:
2125    * @rtpbin: the object which received the signal
2126    * @session: the session
2127    * @pt: the pt
2128    *
2129    * Signal that the current payload type changed to @pt in @session.
2130    */
2131   gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] =
2132       g_signal_new ("payload-type-change", G_TYPE_FROM_CLASS (klass),
2133       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, payload_type_change),
2134       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2135
2136   /**
2137    * GstRtpBin::clear-pt-map:
2138    * @rtpbin: the object which received the signal
2139    *
2140    * Clear all previously cached pt-mapping obtained with
2141    * #GstRtpBin::request-pt-map.
2142    */
2143   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
2144       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
2145       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2146           clear_pt_map), NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
2147
2148   /**
2149    * GstRtpBin::reset-sync:
2150    * @rtpbin: the object which received the signal
2151    *
2152    * Reset all currently configured lip-sync parameters and require new SR
2153    * packets for all streams before lip-sync is attempted again.
2154    */
2155   gst_rtp_bin_signals[SIGNAL_RESET_SYNC] =
2156       g_signal_new ("reset-sync", G_TYPE_FROM_CLASS (klass),
2157       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2158           reset_sync), NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
2159
2160   /**
2161    * GstRtpBin::get-session:
2162    * @rtpbin: the object which received the signal
2163    * @id: the session id
2164    *
2165    * Request the related GstRtpSession as #GstElement related with session @id.
2166    *
2167    * Since: 1.8
2168    */
2169   gst_rtp_bin_signals[SIGNAL_GET_SESSION] =
2170       g_signal_new ("get-session", G_TYPE_FROM_CLASS (klass),
2171       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2172           get_session), NULL, NULL, NULL, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2173
2174   /**
2175    * GstRtpBin::get-internal-session:
2176    * @rtpbin: the object which received the signal
2177    * @id: the session id
2178    *
2179    * Request the internal RTPSession object as #GObject in session @id.
2180    */
2181   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_SESSION] =
2182       g_signal_new ("get-internal-session", G_TYPE_FROM_CLASS (klass),
2183       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2184           get_internal_session), NULL, NULL, NULL, RTP_TYPE_SESSION, 1,
2185       G_TYPE_UINT);
2186
2187   /**
2188    * GstRtpBin::get-internal-storage:
2189    * @rtpbin: the object which received the signal
2190    * @id: the session id
2191    *
2192    * Request the internal RTPStorage object as #GObject in session @id. This
2193    * is the internal storage used by the RTPStorage element, which is used to
2194    * keep a backlog of received RTP packets for the session @id.
2195    *
2196    * Since: 1.14
2197    */
2198   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_STORAGE] =
2199       g_signal_new ("get-internal-storage", G_TYPE_FROM_CLASS (klass),
2200       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2201           get_internal_storage), NULL, NULL, NULL, G_TYPE_OBJECT, 1,
2202       G_TYPE_UINT);
2203
2204   /**
2205    * GstRtpBin::get-storage:
2206    * @rtpbin: the object which received the signal
2207    * @id: the session id
2208    *
2209    * Request the RTPStorage element as #GObject in session @id. This element
2210    * is used to keep a backlog of received RTP packets for the session @id.
2211    *
2212    * Since: 1.16
2213    */
2214   gst_rtp_bin_signals[SIGNAL_GET_STORAGE] =
2215       g_signal_new ("get-storage", G_TYPE_FROM_CLASS (klass),
2216       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2217           get_storage), NULL, NULL, NULL, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2218
2219   /**
2220    * GstRtpBin::clear-ssrc:
2221    * @rtpbin: the object which received the signal
2222    * @id: the session id
2223    * @ssrc: the ssrc
2224    *
2225    * Remove all pads from rtpssrcdemux element associated with the specified
2226    * ssrc. This delegate the action signal to the rtpssrcdemux element
2227    * associated with the specified session.
2228    *
2229    * Since: 1.20
2230    */
2231   gst_rtp_bin_signals[SIGNAL_CLEAR_SSRC] =
2232       g_signal_new ("clear-ssrc", G_TYPE_FROM_CLASS (klass),
2233       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2234           clear_ssrc), NULL, NULL, NULL, G_TYPE_NONE, 2,
2235       G_TYPE_UINT, G_TYPE_UINT);
2236
2237   /**
2238    * GstRtpBin::on-new-ssrc:
2239    * @rtpbin: the object which received the signal
2240    * @session: the session
2241    * @ssrc: the SSRC
2242    *
2243    * Notify of a new SSRC that entered @session.
2244    */
2245   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
2246       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
2247       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
2248       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2249   /**
2250    * GstRtpBin::on-ssrc-collision:
2251    * @rtpbin: the object which received the signal
2252    * @session: the session
2253    * @ssrc: the SSRC
2254    *
2255    * Notify when we have an SSRC collision
2256    */
2257   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
2258       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
2259       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
2260       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2261   /**
2262    * GstRtpBin::on-ssrc-validated:
2263    * @rtpbin: the object which received the signal
2264    * @session: the session
2265    * @ssrc: the SSRC
2266    *
2267    * Notify of a new SSRC that became validated.
2268    */
2269   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
2270       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
2271       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
2272       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2273   /**
2274    * GstRtpBin::on-ssrc-active:
2275    * @rtpbin: the object which received the signal
2276    * @session: the session
2277    * @ssrc: the SSRC
2278    *
2279    * Notify of a SSRC that is active, i.e., sending RTCP.
2280    */
2281   gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE] =
2282       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
2283       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_active),
2284       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2285   /**
2286    * GstRtpBin::on-ssrc-sdes:
2287    * @rtpbin: the object which received the signal
2288    * @session: the session
2289    * @ssrc: the SSRC
2290    *
2291    * Notify of a SSRC that is active, i.e., sending RTCP.
2292    */
2293   gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES] =
2294       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
2295       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_sdes),
2296       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2297
2298   /**
2299    * GstRtpBin::on-bye-ssrc:
2300    * @rtpbin: the object which received the signal
2301    * @session: the session
2302    * @ssrc: the SSRC
2303    *
2304    * Notify of an SSRC that became inactive because of a BYE packet.
2305    */
2306   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
2307       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
2308       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
2309       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2310   /**
2311    * GstRtpBin::on-bye-timeout:
2312    * @rtpbin: the object which received the signal
2313    * @session: the session
2314    * @ssrc: the SSRC
2315    *
2316    * Notify of an SSRC that has timed out because of BYE
2317    */
2318   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
2319       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
2320       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
2321       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2322   /**
2323    * GstRtpBin::on-timeout:
2324    * @rtpbin: the object which received the signal
2325    * @session: the session
2326    * @ssrc: the SSRC
2327    *
2328    * Notify of an SSRC that has timed out
2329    */
2330   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
2331       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
2332       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
2333       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2334   /**
2335    * GstRtpBin::on-sender-timeout:
2336    * @rtpbin: the object which received the signal
2337    * @session: the session
2338    * @ssrc: the SSRC
2339    *
2340    * Notify of a sender SSRC that has timed out and became a receiver
2341    */
2342   gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] =
2343       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
2344       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout),
2345       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2346
2347   /**
2348    * GstRtpBin::on-npt-stop:
2349    * @rtpbin: the object which received the signal
2350    * @session: the session
2351    * @ssrc: the SSRC
2352    *
2353    * Notify that SSRC sender has sent data up to the configured NPT stop time.
2354    */
2355   gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP] =
2356       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
2357       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_npt_stop),
2358       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2359
2360   /**
2361    * GstRtpBin::request-rtp-encoder:
2362    * @rtpbin: the object which received the signal
2363    * @session: the session
2364    *
2365    * Request an RTP encoder element for the given @session. The encoder
2366    * element will be added to the bin if not previously added.
2367    *
2368    * If no handler is connected, no encoder will be used.
2369    *
2370    * Since: 1.4
2371    */
2372   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_ENCODER] =
2373       g_signal_new ("request-rtp-encoder", G_TYPE_FROM_CLASS (klass),
2374       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2375           request_rtp_encoder), _gst_element_accumulator, NULL, NULL,
2376       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2377
2378   /**
2379    * GstRtpBin::request-rtp-decoder:
2380    * @rtpbin: the object which received the signal
2381    * @session: the session
2382    *
2383    * Request an RTP decoder element for the given @session. The decoder
2384    * element will be added to the bin if not previously added.
2385    *
2386    * If no handler is connected, no encoder will be used.
2387    *
2388    * Since: 1.4
2389    */
2390   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_DECODER] =
2391       g_signal_new ("request-rtp-decoder", G_TYPE_FROM_CLASS (klass),
2392       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2393           request_rtp_decoder), _gst_element_accumulator, NULL,
2394       NULL, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2395
2396   /**
2397    * GstRtpBin::request-rtcp-encoder:
2398    * @rtpbin: the object which received the signal
2399    * @session: the session
2400    *
2401    * Request an RTCP encoder element for the given @session. The encoder
2402    * element will be added to the bin if not previously added.
2403    *
2404    * If no handler is connected, no encoder will be used.
2405    *
2406    * Since: 1.4
2407    */
2408   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_ENCODER] =
2409       g_signal_new ("request-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
2410       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2411           request_rtcp_encoder), _gst_element_accumulator, NULL, NULL,
2412       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2413
2414   /**
2415    * GstRtpBin::request-rtcp-decoder:
2416    * @rtpbin: the object which received the signal
2417    * @session: the session
2418    *
2419    * Request an RTCP decoder element for the given @session. The decoder
2420    * element will be added to the bin if not previously added.
2421    *
2422    * If no handler is connected, no encoder will be used.
2423    *
2424    * Since: 1.4
2425    */
2426   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_DECODER] =
2427       g_signal_new ("request-rtcp-decoder", G_TYPE_FROM_CLASS (klass),
2428       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2429           request_rtcp_decoder), _gst_element_accumulator, NULL, NULL,
2430       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2431
2432   /**
2433    * GstRtpBin::request-jitterbuffer:
2434    * @rtpbin: the object which received the signal
2435    * @session: the session
2436    *
2437    * Request a jitterbuffer element for the given @session.
2438    *
2439    * If no handler is connected, the default jitterbuffer will be used.
2440    *
2441    * Note: The provided element is expected to conform to the API exposed
2442    * by the standard #GstRtpJitterBuffer. Runtime checks will be made to
2443    * determine whether it exposes properties and signals before attempting
2444    * to set, call or connect to them, and some functionalities of #GstRtpBin
2445    * may not be available when that is not the case.
2446    *
2447    * This should be considered experimental API, as the standard jitterbuffer
2448    * API is susceptible to change, provided elements will have to update their
2449    * custom jitterbuffer's API to match the API of #GstRtpJitterBuffer if and
2450    * when it changes.
2451    *
2452    * Since: 1.18
2453    */
2454   gst_rtp_bin_signals[SIGNAL_REQUEST_JITTERBUFFER] =
2455       g_signal_new ("request-jitterbuffer", G_TYPE_FROM_CLASS (klass),
2456       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2457           request_jitterbuffer), _gst_element_accumulator, NULL,
2458       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2459
2460   /**
2461    * GstRtpBin::new-jitterbuffer:
2462    * @rtpbin: the object which received the signal
2463    * @jitterbuffer: the new jitterbuffer
2464    * @session: the session
2465    * @ssrc: the SSRC
2466    *
2467    * Notify that a new @jitterbuffer was created for @session and @ssrc.
2468    * This signal can, for example, be used to configure @jitterbuffer.
2469    *
2470    * Since: 1.4
2471    */
2472   gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER] =
2473       g_signal_new ("new-jitterbuffer", G_TYPE_FROM_CLASS (klass),
2474       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2475           new_jitterbuffer), NULL, NULL, NULL,
2476       G_TYPE_NONE, 3, GST_TYPE_ELEMENT, G_TYPE_UINT, G_TYPE_UINT);
2477
2478   /**
2479    * GstRtpBin::new-storage:
2480    * @rtpbin: the object which received the signal
2481    * @storage: the new storage
2482    * @session: the session
2483    *
2484    * Notify that a new @storage was created for @session.
2485    * This signal can, for example, be used to configure @storage.
2486    *
2487    * Since: 1.14
2488    */
2489   gst_rtp_bin_signals[SIGNAL_NEW_STORAGE] =
2490       g_signal_new ("new-storage", G_TYPE_FROM_CLASS (klass),
2491       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2492           new_storage), NULL, NULL, NULL,
2493       G_TYPE_NONE, 2, GST_TYPE_ELEMENT, G_TYPE_UINT);
2494
2495   /**
2496    * GstRtpBin::request-aux-sender:
2497    * @rtpbin: the object which received the signal
2498    * @session: the session
2499    *
2500    * Request an AUX sender element for the given @session. The AUX
2501    * element will be added to the bin.
2502    *
2503    * If no handler is connected, no AUX element will be used.
2504    *
2505    * Since: 1.4
2506    */
2507   gst_rtp_bin_signals[SIGNAL_REQUEST_AUX_SENDER] =
2508       g_signal_new ("request-aux-sender", G_TYPE_FROM_CLASS (klass),
2509       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2510           request_aux_sender), _gst_element_accumulator, NULL, NULL,
2511       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2512
2513   /**
2514    * GstRtpBin::request-aux-receiver:
2515    * @rtpbin: the object which received the signal
2516    * @session: the session
2517    *
2518    * Request an AUX receiver element for the given @session. The AUX
2519    * element will be added to the bin.
2520    *
2521    * If no handler is connected, no AUX element will be used.
2522    *
2523    * Since: 1.4
2524    */
2525   gst_rtp_bin_signals[SIGNAL_REQUEST_AUX_RECEIVER] =
2526       g_signal_new ("request-aux-receiver", G_TYPE_FROM_CLASS (klass),
2527       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2528           request_aux_receiver), _gst_element_accumulator, NULL, NULL,
2529       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2530
2531   /**
2532    * GstRtpBin::request-fec-decoder:
2533    * @rtpbin: the object which received the signal
2534    * @session: the session index
2535    *
2536    * Request a FEC decoder element for the given @session. The element
2537    * will be added to the bin after the pt demuxer.  If there are multiple
2538    * ssrc's and pt's in @session, this signal may be called multiple times for
2539    * the same @session each corresponding to a newly discovered ssrc.
2540    *
2541    * If no handler is connected, no FEC decoder will be used.
2542    *
2543    * Warning: usage of this signal is not appropriate for the BUNDLE case,
2544    * connect to #GstRtpBin::request-fec-decoder-full instead.
2545    *
2546    * Since: 1.14
2547    */
2548   gst_rtp_bin_signals[SIGNAL_REQUEST_FEC_DECODER] =
2549       g_signal_new ("request-fec-decoder", G_TYPE_FROM_CLASS (klass),
2550       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2551           request_fec_decoder), _gst_element_accumulator, NULL, NULL,
2552       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2553
2554   /**
2555    * GstRtpBin::request-fec-decoder-full:
2556    * @rtpbin: the object which received the signal
2557    * @session: the session index
2558    * @ssrc: the ssrc of the stream
2559    * @pt: the payload type
2560    *
2561    * Request a FEC decoder element for the given @session. The element
2562    * will be added to the bin after the pt demuxer.  If there are multiple
2563    * ssrc's and pt's in @session, this signal may be called multiple times for
2564    * the same @session each corresponding to a newly discovered ssrc and payload
2565    * type, those are provided as parameters.
2566    *
2567    * If no handler is connected, no FEC decoder will be used.
2568    *
2569    * Since: 1.20
2570    */
2571   gst_rtp_bin_signals[SIGNAL_REQUEST_FEC_DECODER_FULL] =
2572       g_signal_new ("request-fec-decoder-full", G_TYPE_FROM_CLASS (klass),
2573       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2574           request_fec_decoder), _gst_element_accumulator, NULL, NULL,
2575       GST_TYPE_ELEMENT, 3, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT);
2576
2577   /**
2578    * GstRtpBin::request-fec-encoder:
2579    * @rtpbin: the object which received the signal
2580    * @session: the session index
2581    *
2582    * Request a FEC encoder element for the given @session. The element
2583    * will be added to the bin after the RTPSession.
2584    *
2585    * If no handler is connected, no FEC encoder will be used.
2586    *
2587    * Since: 1.14
2588    */
2589   gst_rtp_bin_signals[SIGNAL_REQUEST_FEC_ENCODER] =
2590       g_signal_new ("request-fec-encoder", G_TYPE_FROM_CLASS (klass),
2591       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2592           request_fec_encoder), _gst_element_accumulator, NULL, NULL,
2593       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2594
2595   /**
2596    * GstRtpBin::on-new-sender-ssrc:
2597    * @rtpbin: the object which received the signal
2598    * @session: the session
2599    * @ssrc: the sender SSRC
2600    *
2601    * Notify of a new sender SSRC that entered @session.
2602    *
2603    * Since: 1.8
2604    */
2605   gst_rtp_bin_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
2606       g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
2607       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_sender_ssrc),
2608       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2609   /**
2610    * GstRtpBin::on-sender-ssrc-active:
2611    * @rtpbin: the object which received the signal
2612    * @session: the session
2613    * @ssrc: the sender SSRC
2614    *
2615    * Notify of a sender SSRC that is active, i.e., sending RTCP.
2616    *
2617    * Since: 1.8
2618    */
2619   gst_rtp_bin_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
2620       g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
2621       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2622           on_sender_ssrc_active), NULL, NULL, NULL,
2623       G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2624
2625   g_object_class_install_property (gobject_class, PROP_SDES,
2626       g_param_spec_boxed ("sdes", "SDES",
2627           "The SDES items of this session",
2628           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
2629           | GST_PARAM_DOC_SHOW_DEFAULT));
2630
2631   g_object_class_install_property (gobject_class, PROP_DO_LOST,
2632       g_param_spec_boolean ("do-lost", "Do Lost",
2633           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
2634           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2635
2636   g_object_class_install_property (gobject_class, PROP_AUTOREMOVE,
2637       g_param_spec_boolean ("autoremove", "Auto Remove",
2638           "Automatically remove timed out sources", DEFAULT_AUTOREMOVE,
2639           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2640
2641   g_object_class_install_property (gobject_class, PROP_IGNORE_PT,
2642       g_param_spec_boolean ("ignore-pt", "Ignore PT",
2643           "Do not demultiplex based on PT values", DEFAULT_IGNORE_PT,
2644           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2645
2646   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
2647       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
2648           "Use the pipeline running-time to set the NTP time in the RTCP SR messages "
2649           "(DEPRECATED: Use ntp-time-source property)",
2650           DEFAULT_USE_PIPELINE_CLOCK,
2651           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
2652   /**
2653    * GstRtpBin:buffer-mode:
2654    *
2655    * Control the buffering and timestamping mode used by the jitterbuffer.
2656    */
2657   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
2658       g_param_spec_enum ("buffer-mode", "Buffer Mode",
2659           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
2660           DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2661   /**
2662    * GstRtpBin:ntp-sync:
2663    *
2664    * Set the NTP time from the sender reports as the running-time on the
2665    * buffers. When both the sender and receiver have sychronized
2666    * running-time, i.e. when the clock and base-time is shared
2667    * between the receivers and the and the senders, this option can be
2668    * used to synchronize receivers on multiple machines.
2669    */
2670   g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
2671       g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
2672           "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
2673           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2674
2675   /**
2676    * GstRtpBin:rtcp-sync:
2677    *
2678    * If not synchronizing (directly) to the NTP clock, determines how to sync
2679    * the various streams.
2680    */
2681   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC,
2682       g_param_spec_enum ("rtcp-sync", "RTCP Sync",
2683           "Use of RTCP SR in synchronization", GST_RTP_BIN_RTCP_SYNC_TYPE,
2684           DEFAULT_RTCP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2685
2686   /**
2687    * GstRtpBin:rtcp-sync-interval:
2688    *
2689    * Determines how often to sync streams using RTCP data.
2690    */
2691   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_INTERVAL,
2692       g_param_spec_uint ("rtcp-sync-interval", "RTCP Sync Interval",
2693           "RTCP SR interval synchronization (ms) (0 = always)",
2694           0, G_MAXUINT, DEFAULT_RTCP_SYNC_INTERVAL,
2695           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2696
2697   g_object_class_install_property (gobject_class, PROP_DO_SYNC_EVENT,
2698       g_param_spec_boolean ("do-sync-event", "Do Sync Event",
2699           "Send event downstream when a stream is synchronized to the sender",
2700           DEFAULT_DO_SYNC_EVENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2701
2702   /**
2703    * GstRtpBin:do-retransmission:
2704    *
2705    * Enables RTP retransmission on all streams. To control retransmission on
2706    * a per-SSRC basis, connect to the #GstRtpBin::new-jitterbuffer signal and
2707    * set the #GstRtpJitterBuffer:do-retransmission property on the
2708    * #GstRtpJitterBuffer object instead.
2709    */
2710   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
2711       g_param_spec_boolean ("do-retransmission", "Do retransmission",
2712           "Enable retransmission on all streams",
2713           DEFAULT_DO_RETRANSMISSION,
2714           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2715
2716   /**
2717    * GstRtpBin:rtp-profile:
2718    *
2719    * Sets the default RTP profile of newly created RTP sessions. The
2720    * profile can be changed afterwards on a per-session basis.
2721    */
2722   g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
2723       g_param_spec_enum ("rtp-profile", "RTP Profile",
2724           "Default RTP profile of newly created sessions",
2725           GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE,
2726           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2727
2728   g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
2729       g_param_spec_enum ("ntp-time-source", "NTP Time Source",
2730           "NTP time source for RTCP packets",
2731           gst_rtp_ntp_time_source_get_type (), DEFAULT_NTP_TIME_SOURCE,
2732           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2733
2734   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_SEND_TIME,
2735       g_param_spec_boolean ("rtcp-sync-send-time", "RTCP Sync Send Time",
2736           "Use send time or capture time for RTCP sync "
2737           "(TRUE = send time, FALSE = capture time)",
2738           DEFAULT_RTCP_SYNC_SEND_TIME,
2739           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2740
2741   g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF,
2742       g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff",
2743           "Maximum amount of time in ms that the RTP time in RTCP SRs "
2744           "is allowed to be ahead (-1 disabled)", -1, G_MAXINT,
2745           DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
2746           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2747
2748   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
2749       g_param_spec_uint ("max-dropout-time", "Max dropout time",
2750           "The maximum time (milliseconds) of missing packets tolerated.",
2751           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
2752           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2753
2754   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
2755       g_param_spec_uint ("max-misorder-time", "Max misorder time",
2756           "The maximum time (milliseconds) of misordered packets tolerated.",
2757           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
2758           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2759
2760   g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
2761       g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
2762           "Synchronize received streams to the RFC7273 clock "
2763           "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
2764           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2765
2766   g_object_class_install_property (gobject_class, PROP_MAX_STREAMS,
2767       g_param_spec_uint ("max-streams", "Max Streams",
2768           "The maximum number of streams to create for one session",
2769           0, G_MAXUINT, DEFAULT_MAX_STREAMS,
2770           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2771
2772   /**
2773    * GstRtpBin:max-ts-offset-adjustment:
2774    *
2775    * Syncing time stamps to NTP time adds a time offset. This parameter
2776    * specifies the maximum number of nanoseconds per frame that this time offset
2777    * may be adjusted with. This is used to avoid sudden large changes to time
2778    * stamps.
2779    *
2780    * Since: 1.14
2781    */
2782   g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET_ADJUSTMENT,
2783       g_param_spec_uint64 ("max-ts-offset-adjustment",
2784           "Max Timestamp Offset Adjustment",
2785           "The maximum number of nanoseconds per frame that time stamp offsets "
2786           "may be adjusted (0 = no limit).", 0, G_MAXUINT64,
2787           DEFAULT_MAX_TS_OFFSET_ADJUSTMENT, G_PARAM_READWRITE |
2788           G_PARAM_STATIC_STRINGS));
2789
2790   /**
2791    * GstRtpBin:max-ts-offset:
2792    *
2793    * Used to set an upper limit of how large a time offset may be. This
2794    * is used to protect against unrealistic values as a result of either
2795    * client,server or clock issues.
2796    *
2797    * Since: 1.14
2798    */
2799   g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET,
2800       g_param_spec_int64 ("max-ts-offset", "Max TS Offset",
2801           "The maximum absolute value of the time offset in (nanoseconds). "
2802           "Note, if the ntp-sync parameter is set the default value is "
2803           "changed to 0 (no limit)", 0, G_MAXINT64, DEFAULT_MAX_TS_OFFSET,
2804           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2805
2806   /**
2807    * GstRtpBin:min-ts-offset:
2808    *
2809    * Used to set an lower limit for when a time offset is deemed large enough
2810    * to be useful for sync corrections.
2811    *
2812    * When streaming for instance audio, even very small ts_offsets cause
2813    * audible glitches. This property is used for controlling how sensitive the
2814    * adjustments should be to small deviations in ts_offset, occurring for
2815    * instance due to jittery network conditions or system load.
2816    *
2817    * Since: 1.22
2818    */
2819   g_object_class_install_property (gobject_class, PROP_MIN_TS_OFFSET,
2820       g_param_spec_uint64 ("min-ts-offset", "Min TS Offset",
2821           "The minimum absolute value of the time offset in (nanoseconds). "
2822           "Used to set an lower limit for when a time offset is deemed large "
2823           "enough to be useful for sync corrections."
2824           "Note, if the ntp-sync parameter is set the default value is "
2825           "changed to 0 (no limit)", 0, G_MAXUINT64, DEFAULT_MIN_TS_OFFSET,
2826           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2827
2828   /**
2829    * GstRtpBin:ts-offset-smoothing-factor:
2830    *
2831    * Controls the weighting between previous and current timestamp offsets in
2832    * a running moving average (RMA):
2833    * ts_offset_average(n) =
2834    *   ((ts-offset-smoothing-factor - 1) * ts_offset_average(n - 1) + ts_offset(n)) /
2835    *   ts-offset-smoothing-factor
2836    *
2837    * This can stabilize the timestamp offset and prevent unnecessary skew
2838    * corrections due to jitter introduced by network or system load.
2839    *
2840    * Since: 1.22
2841    */
2842   g_object_class_install_property (gobject_class,
2843       PROP_TS_OFFSET_SMOOTHING_FACTOR,
2844       g_param_spec_uint ("ts-offset-smoothing-factor",
2845           "Timestamp Offset Smoothing Factor",
2846           "Sets a smoothing factor for the timestamp offset in number of "
2847           "values for a calculated running moving average. "
2848           "(0 = no smoothing factor)", 0, G_MAXUINT,
2849           DEFAULT_TS_OFFSET_SMOOTHING_FACTOR,
2850           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2851
2852   /**
2853    * GstRtpBin:fec-decoders:
2854    *
2855    * Used to provide a factory used to build the FEC decoder for a
2856    * given session, as a command line alternative to
2857    * #GstRtpBin::request-fec-decoder.
2858    *
2859    * Expects a GstStructure in the form session_id (gint) -> factory (string)
2860    *
2861    * Since: 1.20
2862    */
2863   g_object_class_install_property (gobject_class, PROP_FEC_DECODERS,
2864       g_param_spec_boxed ("fec-decoders", "Fec Decoders",
2865           "GstStructure mapping from session index to FEC decoder "
2866           "factory, eg "
2867           "fec-decoders='fec,0=\"rtpst2022-1-fecdec\\ size-time\\=1000000000\";'",
2868           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2869
2870   /**
2871    * GstRtpBin:fec-encoders:
2872    *
2873    * Used to provide a factory used to build the FEC encoder for a
2874    * given session, as a command line alternative to
2875    * #GstRtpBin::request-fec-encoder.
2876    *
2877    * Expects a GstStructure in the form session_id (gint) -> factory (string)
2878    *
2879    * Since: 1.20
2880    */
2881   g_object_class_install_property (gobject_class, PROP_FEC_ENCODERS,
2882       g_param_spec_boxed ("fec-encoders", "Fec Encoders",
2883           "GstStructure mapping from session index to FEC encoder "
2884           "factory, eg "
2885           "fec-encoders='fec,0=\"rtpst2022-1-fecenc\\ rows\\=5\\ columns\\=5\";'",
2886           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2887
2888   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
2889   gstelement_class->request_new_pad =
2890       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
2891   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
2892
2893   /* sink pads */
2894   gst_element_class_add_static_pad_template (gstelement_class,
2895       &rtpbin_recv_rtp_sink_template);
2896   gst_element_class_add_static_pad_template (gstelement_class,
2897       &rtpbin_recv_fec_sink_template);
2898   gst_element_class_add_static_pad_template (gstelement_class,
2899       &rtpbin_recv_rtcp_sink_template);
2900   gst_element_class_add_static_pad_template (gstelement_class,
2901       &rtpbin_send_rtp_sink_template);
2902
2903   /* src pads */
2904   gst_element_class_add_static_pad_template (gstelement_class,
2905       &rtpbin_recv_rtp_src_template);
2906   gst_element_class_add_static_pad_template (gstelement_class,
2907       &rtpbin_send_rtcp_src_template);
2908   gst_element_class_add_static_pad_template (gstelement_class,
2909       &rtpbin_send_rtp_src_template);
2910   gst_element_class_add_static_pad_template (gstelement_class,
2911       &rtpbin_send_fec_src_template);
2912
2913   gst_element_class_set_static_metadata (gstelement_class, "RTP Bin",
2914       "Filter/Network/RTP",
2915       "Real-Time Transport Protocol bin",
2916       "Wim Taymans <wim.taymans@gmail.com>");
2917
2918   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (gst_rtp_bin_handle_message);
2919
2920   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
2921   klass->reset_sync = GST_DEBUG_FUNCPTR (gst_rtp_bin_reset_sync);
2922   klass->get_session = GST_DEBUG_FUNCPTR (gst_rtp_bin_get_session);
2923   klass->get_internal_session =
2924       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_session);
2925   klass->get_storage = GST_DEBUG_FUNCPTR (gst_rtp_bin_get_storage);
2926   klass->get_internal_storage =
2927       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_storage);
2928   klass->clear_ssrc = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_ssrc);
2929   klass->request_rtp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2930   klass->request_rtp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2931   klass->request_rtcp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2932   klass->request_rtcp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2933   klass->request_jitterbuffer =
2934       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_jitterbuffer);
2935
2936   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
2937
2938   gst_type_mark_as_plugin_api (GST_RTP_BIN_RTCP_SYNC_TYPE, 0);
2939 }
2940
2941 static void
2942 gst_rtp_bin_init (GstRtpBin * rtpbin)
2943 {
2944   gchar *cname;
2945
2946   rtpbin->priv = gst_rtp_bin_get_instance_private (rtpbin);
2947   g_mutex_init (&rtpbin->priv->bin_lock);
2948   g_mutex_init (&rtpbin->priv->dyn_lock);
2949
2950   rtpbin->latency_ms = DEFAULT_LATENCY_MS;
2951   rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND;
2952   rtpbin->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
2953   rtpbin->do_lost = DEFAULT_DO_LOST;
2954   rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
2955   rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
2956   rtpbin->rtcp_sync = DEFAULT_RTCP_SYNC;
2957   rtpbin->rtcp_sync_interval = DEFAULT_RTCP_SYNC_INTERVAL;
2958   rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
2959   rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
2960   rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
2961   rtpbin->send_sync_event = DEFAULT_DO_SYNC_EVENT;
2962   rtpbin->do_retransmission = DEFAULT_DO_RETRANSMISSION;
2963   rtpbin->rtp_profile = DEFAULT_RTP_PROFILE;
2964   rtpbin->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
2965   rtpbin->rtcp_sync_send_time = DEFAULT_RTCP_SYNC_SEND_TIME;
2966   rtpbin->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
2967   rtpbin->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
2968   rtpbin->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
2969   rtpbin->rfc7273_sync = DEFAULT_RFC7273_SYNC;
2970   rtpbin->max_streams = DEFAULT_MAX_STREAMS;
2971   rtpbin->max_ts_offset_adjustment = DEFAULT_MAX_TS_OFFSET_ADJUSTMENT;
2972   rtpbin->max_ts_offset = DEFAULT_MAX_TS_OFFSET;
2973   rtpbin->max_ts_offset_is_set = FALSE;
2974   rtpbin->min_ts_offset = DEFAULT_MIN_TS_OFFSET;
2975   rtpbin->min_ts_offset_is_set = FALSE;
2976   rtpbin->ts_offset_smoothing_factor = DEFAULT_TS_OFFSET_SMOOTHING_FACTOR;
2977
2978   /* some default SDES entries */
2979   cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
2980   rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes",
2981       "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL);
2982   rtpbin->fec_decoders =
2983       gst_structure_new_empty ("application/x-rtp-fec-decoders");
2984   rtpbin->fec_encoders =
2985       gst_structure_new_empty ("application/x-rtp-fec-encoders");
2986   g_free (cname);
2987 }
2988
2989 static void
2990 gst_rtp_bin_dispose (GObject * object)
2991 {
2992   GstRtpBin *rtpbin;
2993
2994   rtpbin = GST_RTP_BIN (object);
2995
2996   GST_RTP_BIN_LOCK (rtpbin);
2997   GST_DEBUG_OBJECT (object, "freeing sessions");
2998   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, rtpbin);
2999   g_slist_free (rtpbin->sessions);
3000   rtpbin->sessions = NULL;
3001   GST_RTP_BIN_UNLOCK (rtpbin);
3002
3003   G_OBJECT_CLASS (parent_class)->dispose (object);
3004 }
3005
3006 static void
3007 gst_rtp_bin_finalize (GObject * object)
3008 {
3009   GstRtpBin *rtpbin;
3010
3011   rtpbin = GST_RTP_BIN (object);
3012
3013   if (rtpbin->sdes)
3014     gst_structure_free (rtpbin->sdes);
3015
3016   if (rtpbin->fec_decoders)
3017     gst_structure_free (rtpbin->fec_decoders);
3018
3019   if (rtpbin->fec_encoders)
3020     gst_structure_free (rtpbin->fec_encoders);
3021
3022   g_mutex_clear (&rtpbin->priv->bin_lock);
3023   g_mutex_clear (&rtpbin->priv->dyn_lock);
3024
3025   G_OBJECT_CLASS (parent_class)->finalize (object);
3026 }
3027
3028
3029 static void
3030 gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes)
3031 {
3032   GSList *item;
3033
3034   if (sdes == NULL)
3035     return;
3036
3037   GST_RTP_BIN_LOCK (bin);
3038
3039   GST_OBJECT_LOCK (bin);
3040   if (bin->sdes)
3041     gst_structure_free (bin->sdes);
3042   bin->sdes = gst_structure_copy (sdes);
3043   GST_OBJECT_UNLOCK (bin);
3044
3045   /* store in all sessions */
3046   for (item = bin->sessions; item; item = g_slist_next (item)) {
3047     GstRtpBinSession *session = item->data;
3048     g_object_set (session->session, "sdes", sdes, NULL);
3049   }
3050
3051   GST_RTP_BIN_UNLOCK (bin);
3052 }
3053
3054 static void
3055 gst_rtp_bin_set_fec_decoders_struct (GstRtpBin * bin,
3056     const GstStructure * decoders)
3057 {
3058   if (decoders == NULL)
3059     return;
3060
3061   GST_RTP_BIN_LOCK (bin);
3062
3063   GST_OBJECT_LOCK (bin);
3064   if (bin->fec_decoders)
3065     gst_structure_free (bin->fec_decoders);
3066   bin->fec_decoders = gst_structure_copy (decoders);
3067
3068   GST_OBJECT_UNLOCK (bin);
3069
3070   GST_RTP_BIN_UNLOCK (bin);
3071 }
3072
3073 static void
3074 gst_rtp_bin_set_fec_encoders_struct (GstRtpBin * bin,
3075     const GstStructure * encoders)
3076 {
3077   if (encoders == NULL)
3078     return;
3079
3080   GST_RTP_BIN_LOCK (bin);
3081
3082   GST_OBJECT_LOCK (bin);
3083   if (bin->fec_encoders)
3084     gst_structure_free (bin->fec_encoders);
3085   bin->fec_encoders = gst_structure_copy (encoders);
3086
3087   GST_OBJECT_UNLOCK (bin);
3088
3089   GST_RTP_BIN_UNLOCK (bin);
3090 }
3091
3092 static GstStructure *
3093 gst_rtp_bin_get_sdes_struct (GstRtpBin * bin)
3094 {
3095   GstStructure *result;
3096
3097   GST_OBJECT_LOCK (bin);
3098   result = gst_structure_copy (bin->sdes);
3099   GST_OBJECT_UNLOCK (bin);
3100
3101   return result;
3102 }
3103
3104 static GstStructure *
3105 gst_rtp_bin_get_fec_decoders_struct (GstRtpBin * bin)
3106 {
3107   GstStructure *result;
3108
3109   GST_OBJECT_LOCK (bin);
3110   result = gst_structure_copy (bin->fec_decoders);
3111   GST_OBJECT_UNLOCK (bin);
3112
3113   return result;
3114 }
3115
3116 static GstStructure *
3117 gst_rtp_bin_get_fec_encoders_struct (GstRtpBin * bin)
3118 {
3119   GstStructure *result;
3120
3121   GST_OBJECT_LOCK (bin);
3122   result = gst_structure_copy (bin->fec_encoders);
3123   GST_OBJECT_UNLOCK (bin);
3124
3125   return result;
3126 }
3127
3128 static void
3129 gst_rtp_bin_set_property (GObject * object, guint prop_id,
3130     const GValue * value, GParamSpec * pspec)
3131 {
3132   GstRtpBin *rtpbin;
3133
3134   rtpbin = GST_RTP_BIN (object);
3135
3136   switch (prop_id) {
3137     case PROP_LATENCY:
3138       GST_RTP_BIN_LOCK (rtpbin);
3139       rtpbin->latency_ms = g_value_get_uint (value);
3140       rtpbin->latency_ns = rtpbin->latency_ms * GST_MSECOND;
3141       GST_RTP_BIN_UNLOCK (rtpbin);
3142       /* propagate the property down to the jitterbuffer */
3143       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "latency", value);
3144       break;
3145     case PROP_DROP_ON_LATENCY:
3146       GST_RTP_BIN_LOCK (rtpbin);
3147       rtpbin->drop_on_latency = g_value_get_boolean (value);
3148       GST_RTP_BIN_UNLOCK (rtpbin);
3149       /* propagate the property down to the jitterbuffer */
3150       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3151           "drop-on-latency", value);
3152       break;
3153     case PROP_SDES:
3154       gst_rtp_bin_set_sdes_struct (rtpbin, g_value_get_boxed (value));
3155       break;
3156     case PROP_DO_LOST:
3157       GST_RTP_BIN_LOCK (rtpbin);
3158       rtpbin->do_lost = g_value_get_boolean (value);
3159       GST_RTP_BIN_UNLOCK (rtpbin);
3160       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
3161       break;
3162     case PROP_NTP_SYNC:
3163       rtpbin->ntp_sync = g_value_get_boolean (value);
3164       /* The default value of max_ts_offset depends on ntp_sync. If user
3165        * hasn't set it then change default value */
3166       if (!rtpbin->max_ts_offset_is_set) {
3167         if (rtpbin->ntp_sync) {
3168           rtpbin->max_ts_offset = 0;
3169         } else {
3170           rtpbin->max_ts_offset = DEFAULT_MAX_TS_OFFSET;
3171         }
3172       }
3173       if (!rtpbin->min_ts_offset_is_set) {
3174         if (rtpbin->ntp_sync) {
3175           rtpbin->min_ts_offset = 0;
3176         } else {
3177           rtpbin->min_ts_offset = DEFAULT_MIN_TS_OFFSET;
3178         }
3179       }
3180       break;
3181     case PROP_RTCP_SYNC:
3182       g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value));
3183       break;
3184     case PROP_RTCP_SYNC_INTERVAL:
3185       rtpbin->rtcp_sync_interval = g_value_get_uint (value);
3186       break;
3187     case PROP_IGNORE_PT:
3188       rtpbin->ignore_pt = g_value_get_boolean (value);
3189       break;
3190     case PROP_AUTOREMOVE:
3191       rtpbin->priv->autoremove = g_value_get_boolean (value);
3192       break;
3193     case PROP_USE_PIPELINE_CLOCK:
3194     {
3195       GSList *sessions;
3196       GST_RTP_BIN_LOCK (rtpbin);
3197       rtpbin->use_pipeline_clock = g_value_get_boolean (value);
3198       for (sessions = rtpbin->sessions; sessions;
3199           sessions = g_slist_next (sessions)) {
3200         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3201
3202         g_object_set (G_OBJECT (session->session),
3203             "use-pipeline-clock", rtpbin->use_pipeline_clock, NULL);
3204       }
3205       GST_RTP_BIN_UNLOCK (rtpbin);
3206     }
3207       break;
3208     case PROP_DO_SYNC_EVENT:
3209       rtpbin->send_sync_event = g_value_get_boolean (value);
3210       break;
3211     case PROP_BUFFER_MODE:
3212       GST_RTP_BIN_LOCK (rtpbin);
3213       rtpbin->buffer_mode = g_value_get_enum (value);
3214       GST_RTP_BIN_UNLOCK (rtpbin);
3215       /* propagate the property down to the jitterbuffer */
3216       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "mode", value);
3217       break;
3218     case PROP_DO_RETRANSMISSION:
3219       GST_RTP_BIN_LOCK (rtpbin);
3220       rtpbin->do_retransmission = g_value_get_boolean (value);
3221       GST_RTP_BIN_UNLOCK (rtpbin);
3222       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3223           "do-retransmission", value);
3224       break;
3225     case PROP_RTP_PROFILE:
3226       rtpbin->rtp_profile = g_value_get_enum (value);
3227       break;
3228     case PROP_NTP_TIME_SOURCE:{
3229       GSList *sessions;
3230       GST_RTP_BIN_LOCK (rtpbin);
3231       rtpbin->ntp_time_source = g_value_get_enum (value);
3232       for (sessions = rtpbin->sessions; sessions;
3233           sessions = g_slist_next (sessions)) {
3234         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3235
3236         g_object_set (G_OBJECT (session->session),
3237             "ntp-time-source", rtpbin->ntp_time_source, NULL);
3238       }
3239       GST_RTP_BIN_UNLOCK (rtpbin);
3240       break;
3241     }
3242     case PROP_RTCP_SYNC_SEND_TIME:{
3243       GSList *sessions;
3244       GST_RTP_BIN_LOCK (rtpbin);
3245       rtpbin->rtcp_sync_send_time = g_value_get_boolean (value);
3246       for (sessions = rtpbin->sessions; sessions;
3247           sessions = g_slist_next (sessions)) {
3248         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3249
3250         g_object_set (G_OBJECT (session->session),
3251             "rtcp-sync-send-time", rtpbin->rtcp_sync_send_time, NULL);
3252       }
3253       GST_RTP_BIN_UNLOCK (rtpbin);
3254       break;
3255     }
3256     case PROP_MAX_RTCP_RTP_TIME_DIFF:
3257       GST_RTP_BIN_LOCK (rtpbin);
3258       rtpbin->max_rtcp_rtp_time_diff = g_value_get_int (value);
3259       GST_RTP_BIN_UNLOCK (rtpbin);
3260       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3261           "max-rtcp-rtp-time-diff", value);
3262       break;
3263     case PROP_MAX_DROPOUT_TIME:
3264       GST_RTP_BIN_LOCK (rtpbin);
3265       rtpbin->max_dropout_time = g_value_get_uint (value);
3266       GST_RTP_BIN_UNLOCK (rtpbin);
3267       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3268           "max-dropout-time", value);
3269       gst_rtp_bin_propagate_property_to_session (rtpbin, "max-dropout-time",
3270           value);
3271       break;
3272     case PROP_MAX_MISORDER_TIME:
3273       GST_RTP_BIN_LOCK (rtpbin);
3274       rtpbin->max_misorder_time = g_value_get_uint (value);
3275       GST_RTP_BIN_UNLOCK (rtpbin);
3276       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3277           "max-misorder-time", value);
3278       gst_rtp_bin_propagate_property_to_session (rtpbin, "max-misorder-time",
3279           value);
3280       break;
3281     case PROP_RFC7273_SYNC:
3282       rtpbin->rfc7273_sync = g_value_get_boolean (value);
3283       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3284           "rfc7273-sync", value);
3285       break;
3286     case PROP_MAX_STREAMS:
3287       rtpbin->max_streams = g_value_get_uint (value);
3288       break;
3289     case PROP_MAX_TS_OFFSET_ADJUSTMENT:
3290       rtpbin->max_ts_offset_adjustment = g_value_get_uint64 (value);
3291       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3292           "max-ts-offset-adjustment", value);
3293       break;
3294     case PROP_MAX_TS_OFFSET:
3295       rtpbin->max_ts_offset = g_value_get_int64 (value);
3296       rtpbin->max_ts_offset_is_set = TRUE;
3297       break;
3298     case PROP_MIN_TS_OFFSET:
3299       rtpbin->min_ts_offset = g_value_get_uint64 (value);
3300       rtpbin->min_ts_offset_is_set = TRUE;
3301       break;
3302     case PROP_TS_OFFSET_SMOOTHING_FACTOR:
3303       rtpbin->ts_offset_smoothing_factor = g_value_get_uint (value);
3304       break;
3305     case PROP_FEC_DECODERS:
3306       gst_rtp_bin_set_fec_decoders_struct (rtpbin, g_value_get_boxed (value));
3307       break;
3308     case PROP_FEC_ENCODERS:
3309       gst_rtp_bin_set_fec_encoders_struct (rtpbin, g_value_get_boxed (value));
3310       break;
3311     default:
3312       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3313       break;
3314   }
3315 }
3316
3317 static void
3318 gst_rtp_bin_get_property (GObject * object, guint prop_id,
3319     GValue * value, GParamSpec * pspec)
3320 {
3321   GstRtpBin *rtpbin;
3322
3323   rtpbin = GST_RTP_BIN (object);
3324
3325   switch (prop_id) {
3326     case PROP_LATENCY:
3327       GST_RTP_BIN_LOCK (rtpbin);
3328       g_value_set_uint (value, rtpbin->latency_ms);
3329       GST_RTP_BIN_UNLOCK (rtpbin);
3330       break;
3331     case PROP_DROP_ON_LATENCY:
3332       GST_RTP_BIN_LOCK (rtpbin);
3333       g_value_set_boolean (value, rtpbin->drop_on_latency);
3334       GST_RTP_BIN_UNLOCK (rtpbin);
3335       break;
3336     case PROP_SDES:
3337       g_value_take_boxed (value, gst_rtp_bin_get_sdes_struct (rtpbin));
3338       break;
3339     case PROP_DO_LOST:
3340       GST_RTP_BIN_LOCK (rtpbin);
3341       g_value_set_boolean (value, rtpbin->do_lost);
3342       GST_RTP_BIN_UNLOCK (rtpbin);
3343       break;
3344     case PROP_IGNORE_PT:
3345       g_value_set_boolean (value, rtpbin->ignore_pt);
3346       break;
3347     case PROP_NTP_SYNC:
3348       g_value_set_boolean (value, rtpbin->ntp_sync);
3349       break;
3350     case PROP_RTCP_SYNC:
3351       g_value_set_enum (value, g_atomic_int_get (&rtpbin->rtcp_sync));
3352       break;
3353     case PROP_RTCP_SYNC_INTERVAL:
3354       g_value_set_uint (value, rtpbin->rtcp_sync_interval);
3355       break;
3356     case PROP_AUTOREMOVE:
3357       g_value_set_boolean (value, rtpbin->priv->autoremove);
3358       break;
3359     case PROP_BUFFER_MODE:
3360       g_value_set_enum (value, rtpbin->buffer_mode);
3361       break;
3362     case PROP_USE_PIPELINE_CLOCK:
3363       g_value_set_boolean (value, rtpbin->use_pipeline_clock);
3364       break;
3365     case PROP_DO_SYNC_EVENT:
3366       g_value_set_boolean (value, rtpbin->send_sync_event);
3367       break;
3368     case PROP_DO_RETRANSMISSION:
3369       GST_RTP_BIN_LOCK (rtpbin);
3370       g_value_set_boolean (value, rtpbin->do_retransmission);
3371       GST_RTP_BIN_UNLOCK (rtpbin);
3372       break;
3373     case PROP_RTP_PROFILE:
3374       g_value_set_enum (value, rtpbin->rtp_profile);
3375       break;
3376     case PROP_NTP_TIME_SOURCE:
3377       g_value_set_enum (value, rtpbin->ntp_time_source);
3378       break;
3379     case PROP_RTCP_SYNC_SEND_TIME:
3380       g_value_set_boolean (value, rtpbin->rtcp_sync_send_time);
3381       break;
3382     case PROP_MAX_RTCP_RTP_TIME_DIFF:
3383       GST_RTP_BIN_LOCK (rtpbin);
3384       g_value_set_int (value, rtpbin->max_rtcp_rtp_time_diff);
3385       GST_RTP_BIN_UNLOCK (rtpbin);
3386       break;
3387     case PROP_MAX_DROPOUT_TIME:
3388       g_value_set_uint (value, rtpbin->max_dropout_time);
3389       break;
3390     case PROP_MAX_MISORDER_TIME:
3391       g_value_set_uint (value, rtpbin->max_misorder_time);
3392       break;
3393     case PROP_RFC7273_SYNC:
3394       g_value_set_boolean (value, rtpbin->rfc7273_sync);
3395       break;
3396     case PROP_MAX_STREAMS:
3397       g_value_set_uint (value, rtpbin->max_streams);
3398       break;
3399     case PROP_MAX_TS_OFFSET_ADJUSTMENT:
3400       g_value_set_uint64 (value, rtpbin->max_ts_offset_adjustment);
3401       break;
3402     case PROP_MAX_TS_OFFSET:
3403       g_value_set_int64 (value, rtpbin->max_ts_offset);
3404       break;
3405     case PROP_MIN_TS_OFFSET:
3406       g_value_set_uint64 (value, rtpbin->min_ts_offset);
3407       break;
3408     case PROP_TS_OFFSET_SMOOTHING_FACTOR:
3409       g_value_set_uint (value, rtpbin->ts_offset_smoothing_factor);
3410       break;
3411     case PROP_FEC_DECODERS:
3412       g_value_take_boxed (value, gst_rtp_bin_get_fec_decoders_struct (rtpbin));
3413       break;
3414     case PROP_FEC_ENCODERS:
3415       g_value_take_boxed (value, gst_rtp_bin_get_fec_encoders_struct (rtpbin));
3416       break;
3417     default:
3418       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3419       break;
3420   }
3421 }
3422
3423 static void
3424 gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
3425 {
3426   GstRtpBin *rtpbin;
3427
3428   rtpbin = GST_RTP_BIN (bin);
3429
3430   switch (GST_MESSAGE_TYPE (message)) {
3431     case GST_MESSAGE_ELEMENT:
3432     {
3433       const GstStructure *s = gst_message_get_structure (message);
3434
3435       /* we change the structure name and add the session ID to it */
3436       if (gst_structure_has_name (s, "application/x-rtp-source-sdes")) {
3437         GstRtpBinSession *sess;
3438
3439         /* find the session we set it as object data */
3440         sess = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
3441             "GstRTPBin.session");
3442
3443         if (G_LIKELY (sess)) {
3444           message = gst_message_make_writable (message);
3445           s = gst_message_get_structure (message);
3446           gst_structure_set ((GstStructure *) s, "session", G_TYPE_UINT,
3447               sess->id, NULL);
3448         }
3449       }
3450       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
3451       break;
3452     }
3453     case GST_MESSAGE_BUFFERING:
3454     {
3455       gint percent;
3456       gint min_percent = 100;
3457       GSList *sessions, *streams;
3458       GstRtpBinStream *stream;
3459       gboolean change = FALSE, active = FALSE;
3460       GstClockTime min_out_time;
3461       GstBufferingMode mode;
3462       gint avg_in, avg_out;
3463       gint64 buffering_left;
3464
3465       gst_message_parse_buffering (message, &percent);
3466       gst_message_parse_buffering_stats (message, &mode, &avg_in, &avg_out,
3467           &buffering_left);
3468
3469       stream =
3470           g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
3471           "GstRTPBin.stream");
3472
3473       GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
3474
3475       /* get the stream */
3476       if (G_LIKELY (stream)) {
3477         GST_RTP_BIN_LOCK (rtpbin);
3478         /* fill in the percent */
3479         stream->percent = percent;
3480
3481         /* calculate the min value for all streams */
3482         for (sessions = rtpbin->sessions; sessions;
3483             sessions = g_slist_next (sessions)) {
3484           GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3485
3486           GST_RTP_SESSION_LOCK (session);
3487           if (session->streams) {
3488             for (streams = session->streams; streams;
3489                 streams = g_slist_next (streams)) {
3490               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
3491
3492               GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
3493                   stream->percent);
3494
3495               /* find min percent */
3496               if (min_percent > stream->percent)
3497                 min_percent = stream->percent;
3498             }
3499           } else {
3500             GST_INFO_OBJECT (bin,
3501                 "session has no streams, setting min_percent to 0");
3502             min_percent = 0;
3503           }
3504           GST_RTP_SESSION_UNLOCK (session);
3505         }
3506         GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
3507
3508         if (rtpbin->buffering) {
3509           if (min_percent == 100) {
3510             rtpbin->buffering = FALSE;
3511             active = TRUE;
3512             change = TRUE;
3513           }
3514         } else {
3515           if (min_percent < 100) {
3516             /* pause the streams */
3517             rtpbin->buffering = TRUE;
3518             active = FALSE;
3519             change = TRUE;
3520           }
3521         }
3522         GST_RTP_BIN_UNLOCK (rtpbin);
3523
3524         gst_message_unref (message);
3525
3526         /* make a new buffering message with the min value */
3527         message =
3528             gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
3529         gst_message_set_buffering_stats (message, mode, avg_in, avg_out,
3530             buffering_left);
3531
3532         if (G_UNLIKELY (change)) {
3533           GstClock *clock;
3534           guint64 running_time = 0;
3535           guint64 offset = 0;
3536
3537           /* figure out the running time when we have a clock */
3538           if (G_LIKELY ((clock =
3539                       gst_element_get_clock (GST_ELEMENT_CAST (bin))))) {
3540             guint64 now, base_time;
3541
3542             now = gst_clock_get_time (clock);
3543             base_time = gst_element_get_base_time (GST_ELEMENT_CAST (bin));
3544             running_time = now - base_time;
3545             gst_object_unref (clock);
3546           }
3547           GST_DEBUG_OBJECT (bin,
3548               "running time now %" GST_TIME_FORMAT,
3549               GST_TIME_ARGS (running_time));
3550
3551           GST_RTP_BIN_LOCK (rtpbin);
3552
3553           /* when we reactivate, calculate the offsets so that all streams have
3554            * an output time that is at least as big as the running_time */
3555           offset = 0;
3556           if (active) {
3557             if (running_time > rtpbin->buffer_start) {
3558               offset = running_time - rtpbin->buffer_start;
3559               if (offset >= rtpbin->latency_ns)
3560                 offset -= rtpbin->latency_ns;
3561               else
3562                 offset = 0;
3563             }
3564           }
3565
3566           /* pause all streams */
3567           min_out_time = -1;
3568           for (sessions = rtpbin->sessions; sessions;
3569               sessions = g_slist_next (sessions)) {
3570             GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3571
3572             GST_RTP_SESSION_LOCK (session);
3573             for (streams = session->streams; streams;
3574                 streams = g_slist_next (streams)) {
3575               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
3576               GstElement *element = stream->buffer;
3577               guint64 last_out = -1;
3578
3579               if (g_signal_lookup ("set-active", G_OBJECT_TYPE (element)) != 0) {
3580                 g_signal_emit_by_name (element, "set-active", active, offset,
3581                     &last_out);
3582               }
3583
3584               if (!active) {
3585                 g_object_get (element, "percent", &stream->percent, NULL);
3586
3587                 if (last_out == -1)
3588                   last_out = 0;
3589                 if (min_out_time == -1 || last_out < min_out_time)
3590                   min_out_time = last_out;
3591               }
3592
3593               GST_DEBUG_OBJECT (bin,
3594                   "setting %p to %d, offset %" GST_TIME_FORMAT ", last %"
3595                   GST_TIME_FORMAT ", percent %d", element, active,
3596                   GST_TIME_ARGS (offset), GST_TIME_ARGS (last_out),
3597                   stream->percent);
3598             }
3599             GST_RTP_SESSION_UNLOCK (session);
3600           }
3601           GST_DEBUG_OBJECT (bin,
3602               "min out time %" GST_TIME_FORMAT, GST_TIME_ARGS (min_out_time));
3603
3604           /* the buffer_start is the min out time of all paused jitterbuffers */
3605           if (!active)
3606             rtpbin->buffer_start = min_out_time;
3607
3608           GST_RTP_BIN_UNLOCK (rtpbin);
3609         }
3610       }
3611       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
3612       break;
3613     }
3614     default:
3615     {
3616       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
3617       break;
3618     }
3619   }
3620 }
3621
3622 static GstStateChangeReturn
3623 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
3624 {
3625   GstStateChangeReturn res;
3626   GstRtpBin *rtpbin;
3627   GstRtpBinPrivate *priv;
3628
3629   rtpbin = GST_RTP_BIN (element);
3630   priv = rtpbin->priv;
3631
3632   switch (transition) {
3633     case GST_STATE_CHANGE_NULL_TO_READY:
3634       break;
3635     case GST_STATE_CHANGE_READY_TO_PAUSED:
3636       priv->last_ntpnstime = 0;
3637       GST_LOG_OBJECT (rtpbin, "clearing shutdown flag");
3638       g_atomic_int_set (&priv->shutdown, 0);
3639       break;
3640     case GST_STATE_CHANGE_PAUSED_TO_READY:
3641       GST_LOG_OBJECT (rtpbin, "setting shutdown flag");
3642       g_atomic_int_set (&priv->shutdown, 1);
3643       /* wait for all callbacks to end by taking the lock. No new callbacks will
3644        * be able to happen as we set the shutdown flag. */
3645       GST_RTP_BIN_DYN_LOCK (rtpbin);
3646       GST_LOG_OBJECT (rtpbin, "dynamic lock taken, we can continue shutdown");
3647       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
3648       break;
3649     default:
3650       break;
3651   }
3652
3653   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3654
3655   switch (transition) {
3656     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3657       break;
3658     case GST_STATE_CHANGE_PAUSED_TO_READY:
3659       break;
3660     case GST_STATE_CHANGE_READY_TO_NULL:
3661       break;
3662     default:
3663       break;
3664   }
3665   return res;
3666 }
3667
3668 static GstElement *
3669 session_request_element_full (GstRtpBinSession * session, guint signal,
3670     guint ssrc, guint8 pt)
3671 {
3672   GstElement *element = NULL;
3673   GstRtpBin *bin = session->bin;
3674
3675   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, ssrc, pt,
3676       &element);
3677
3678   if (element) {
3679     if (!bin_manage_element (bin, element))
3680       goto manage_failed;
3681     session->elements = g_slist_prepend (session->elements, element);
3682   }
3683   return element;
3684
3685   /* ERRORS */
3686 manage_failed:
3687   {
3688     GST_WARNING_OBJECT (bin, "unable to manage element");
3689     gst_object_unref (element);
3690     return NULL;
3691   }
3692 }
3693
3694 static GstElement *
3695 session_request_element (GstRtpBinSession * session, guint signal)
3696 {
3697   GstElement *element = NULL;
3698   GstRtpBin *bin = session->bin;
3699
3700   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, &element);
3701
3702   if (element) {
3703     if (!bin_manage_element (bin, element))
3704       goto manage_failed;
3705     session->elements = g_slist_prepend (session->elements, element);
3706   }
3707   return element;
3708
3709   /* ERRORS */
3710 manage_failed:
3711   {
3712     GST_WARNING_OBJECT (bin, "unable to manage element");
3713     gst_object_unref (element);
3714     return NULL;
3715   }
3716 }
3717
3718 static gboolean
3719 copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
3720 {
3721   GstPad *gpad = GST_PAD_CAST (user_data);
3722
3723   GST_DEBUG_OBJECT (gpad, "store sticky event %" GST_PTR_FORMAT, *event);
3724   gst_pad_store_sticky_event (gpad, *event);
3725
3726   return TRUE;
3727 }
3728
3729 static gboolean
3730 ensure_early_fec_decoder (GstRtpBin * rtpbin, GstRtpBinSession * session)
3731 {
3732   const gchar *factory;
3733   gchar *sess_id_str;
3734
3735   if (session->early_fec_decoder)
3736     goto done;
3737
3738   sess_id_str = g_strdup_printf ("%u", session->id);
3739   factory = gst_structure_get_string (rtpbin->fec_decoders, sess_id_str);
3740   g_free (sess_id_str);
3741
3742   /* First try the property */
3743   if (factory) {
3744     GError *err = NULL;
3745
3746     session->early_fec_decoder =
3747         gst_parse_bin_from_description_full (factory, TRUE, NULL,
3748         GST_PARSE_FLAG_NO_SINGLE_ELEMENT_BINS | GST_PARSE_FLAG_FATAL_ERRORS,
3749         &err);
3750     if (!session->early_fec_decoder) {
3751       GST_ERROR_OBJECT (rtpbin, "Failed to build decoder from factory: %s",
3752           err->message);
3753     }
3754
3755     bin_manage_element (session->bin, session->early_fec_decoder);
3756     session->elements =
3757         g_slist_prepend (session->elements, session->early_fec_decoder);
3758     GST_INFO_OBJECT (rtpbin, "Built FEC decoder: %" GST_PTR_FORMAT
3759         " for session %u", session->early_fec_decoder, session->id);
3760   }
3761
3762   /* Do not fallback to the signal as the signal expects a fec decoder to
3763    * be placed at a different place in the pipeline */
3764
3765 done:
3766   return session->early_fec_decoder != NULL;
3767 }
3768
3769 static void
3770 expose_recv_src_pad (GstRtpBin * rtpbin, GstPad * pad, GstRtpBinStream * stream,
3771     guint8 pt)
3772 {
3773   GstElementClass *klass;
3774   GstPadTemplate *templ;
3775   gchar *padname;
3776   GstPad *gpad;
3777
3778   gst_object_ref (pad);
3779
3780   if (stream->session->storage) {
3781     /* First try the legacy signal, with no ssrc and pt as parameters.
3782      * This will likely cause issues for the BUNDLE case. */
3783     GstElement *fec_decoder =
3784         session_request_element (stream->session, SIGNAL_REQUEST_FEC_DECODER);
3785
3786     /* Now try the new signal, where the application can provide a FEC
3787      * decoder according to ssrc and pt. */
3788     if (!fec_decoder) {
3789       fec_decoder =
3790           session_request_element_full (stream->session,
3791           SIGNAL_REQUEST_FEC_DECODER_FULL, stream->ssrc, pt);
3792     }
3793
3794     if (fec_decoder) {
3795       GstPad *sinkpad, *srcpad;
3796       GstPadLinkReturn ret;
3797
3798       sinkpad = gst_element_get_static_pad (fec_decoder, "sink");
3799
3800       if (!sinkpad)
3801         goto fec_decoder_sink_failed;
3802
3803       ret = gst_pad_link (pad, sinkpad);
3804       gst_object_unref (sinkpad);
3805
3806       if (ret != GST_PAD_LINK_OK)
3807         goto fec_decoder_link_failed;
3808
3809       srcpad = gst_element_get_static_pad (fec_decoder, "src");
3810
3811       if (!srcpad)
3812         goto fec_decoder_src_failed;
3813
3814       gst_pad_sticky_events_foreach (pad, copy_sticky_events, srcpad);
3815       gst_object_unref (pad);
3816       pad = srcpad;
3817     }
3818   }
3819
3820   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
3821
3822   /* ghost the pad to the parent */
3823   klass = GST_ELEMENT_GET_CLASS (rtpbin);
3824   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
3825   padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
3826       stream->session->id, stream->ssrc, pt);
3827   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
3828   g_free (padname);
3829   g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", gpad);
3830
3831   gst_pad_set_active (gpad, TRUE);
3832   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
3833
3834   gst_pad_sticky_events_foreach (pad, copy_sticky_events, gpad);
3835   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
3836
3837 done:
3838   gst_object_unref (pad);
3839
3840   return;
3841
3842 shutdown:
3843   {
3844     GST_DEBUG ("ignoring, we are shutting down");
3845     goto done;
3846   }
3847 fec_decoder_sink_failed:
3848   {
3849     g_warning ("rtpbin: failed to get fec encoder sink pad for session %u",
3850         stream->session->id);
3851     goto done;
3852   }
3853 fec_decoder_src_failed:
3854   {
3855     g_warning ("rtpbin: failed to get fec encoder src pad for session %u",
3856         stream->session->id);
3857     goto done;
3858   }
3859 fec_decoder_link_failed:
3860   {
3861     g_warning ("rtpbin: failed to link fec decoder for session %u",
3862         stream->session->id);
3863     goto done;
3864   }
3865 }
3866
3867 /* a new pad (SSRC) was created in @session. This signal is emitted from the
3868  * payload demuxer. */
3869 static void
3870 new_payload_found (GstElement * element, guint pt, GstPad * pad,
3871     GstRtpBinStream * stream)
3872 {
3873   GstRtpBin *rtpbin;
3874
3875   rtpbin = stream->bin;
3876
3877   GST_DEBUG_OBJECT (rtpbin, "new payload pad %u", pt);
3878
3879   expose_recv_src_pad (rtpbin, pad, stream, pt);
3880 }
3881
3882 static void
3883 payload_pad_removed (GstElement * element, GstPad * pad,
3884     GstRtpBinStream * stream)
3885 {
3886   GstRtpBin *rtpbin;
3887   GstPad *gpad;
3888
3889   rtpbin = stream->bin;
3890
3891   GST_DEBUG ("payload pad removed");
3892
3893   GST_RTP_BIN_DYN_LOCK (rtpbin);
3894   if ((gpad = g_object_get_data (G_OBJECT (pad), "GstRTPBin.ghostpad"))) {
3895     g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", NULL);
3896
3897     gst_pad_set_active (gpad, FALSE);
3898     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), gpad);
3899   }
3900   GST_RTP_BIN_DYN_UNLOCK (rtpbin);
3901 }
3902
3903 static GstCaps *
3904 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
3905 {
3906   GstRtpBin *rtpbin;
3907   GstCaps *caps;
3908
3909   rtpbin = session->bin;
3910
3911   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %u in session %u", pt,
3912       session->id);
3913
3914   caps = get_pt_map (session, pt);
3915   if (!caps)
3916     goto no_caps;
3917
3918   return caps;
3919
3920   /* ERRORS */
3921 no_caps:
3922   {
3923     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
3924     return NULL;
3925   }
3926 }
3927
3928 static GstCaps *
3929 ptdemux_pt_map_requested (GstElement * element, guint pt,
3930     GstRtpBinSession * session)
3931 {
3932   GstCaps *ret = pt_map_requested (element, pt, session);
3933
3934   if (ret && gst_caps_get_size (ret) == 1) {
3935     const GstStructure *s = gst_caps_get_structure (ret, 0);
3936     gboolean is_fec;
3937
3938     if (gst_structure_get_boolean (s, "is-fec", &is_fec) && is_fec) {
3939       GValue v = G_VALUE_INIT;
3940       GValue v2 = G_VALUE_INIT;
3941
3942       GST_INFO_OBJECT (session->bin, "Will ignore FEC pt %u in session %u", pt,
3943           session->id);
3944       g_value_init (&v, GST_TYPE_ARRAY);
3945       g_value_init (&v2, G_TYPE_INT);
3946       g_object_get_property (G_OBJECT (element), "ignored-payload-types", &v);
3947       g_value_set_int (&v2, pt);
3948       gst_value_array_append_value (&v, &v2);
3949       g_value_unset (&v2);
3950       g_object_set_property (G_OBJECT (element), "ignored-payload-types", &v);
3951       g_value_unset (&v);
3952     }
3953   }
3954
3955   return ret;
3956 }
3957
3958 static void
3959 payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session)
3960 {
3961   GST_DEBUG_OBJECT (session->bin,
3962       "emitting signal for pt type changed to %u in session %u", pt,
3963       session->id);
3964
3965   g_signal_emit (session->bin, gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE],
3966       0, session->id, pt);
3967 }
3968
3969 /* emitted when caps changed for the session */
3970 static void
3971 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
3972 {
3973   GstRtpBin *bin;
3974   GstCaps *caps;
3975   gint payload;
3976   const GstStructure *s;
3977
3978   bin = session->bin;
3979
3980   g_object_get (pad, "caps", &caps, NULL);
3981
3982   if (caps == NULL)
3983     return;
3984
3985   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
3986
3987   s = gst_caps_get_structure (caps, 0);
3988
3989   /* get payload, finish when it's not there */
3990   if (!gst_structure_get_int (s, "payload", &payload)) {
3991     gst_caps_unref (caps);
3992     return;
3993   }
3994
3995   GST_RTP_SESSION_LOCK (session);
3996   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
3997   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
3998   GST_RTP_SESSION_UNLOCK (session);
3999 }
4000
4001 /* a new pad (SSRC) was created in @session */
4002 static void
4003 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
4004     GstRtpBinSession * session)
4005 {
4006   GstRtpBin *rtpbin;
4007   GstRtpBinStream *stream;
4008   GstPad *sinkpad, *srcpad;
4009   gchar *padname;
4010
4011   rtpbin = session->bin;
4012
4013   GST_DEBUG_OBJECT (rtpbin, "new SSRC pad %08x, %s:%s", ssrc,
4014       GST_DEBUG_PAD_NAME (pad));
4015
4016   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
4017
4018   GST_RTP_SESSION_LOCK (session);
4019
4020   /* create new stream */
4021   stream = create_stream (session, ssrc);
4022   if (!stream)
4023     goto no_stream;
4024
4025   /* get pad and link */
4026   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP");
4027   padname = g_strdup_printf ("src_%u", ssrc);
4028   srcpad = gst_element_get_static_pad (element, padname);
4029   g_free (padname);
4030
4031   if (session->early_fec_decoder) {
4032     GST_DEBUG_OBJECT (rtpbin, "linking fec decoder");
4033     sinkpad = gst_element_get_static_pad (session->early_fec_decoder, "sink");
4034     gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
4035     gst_object_unref (sinkpad);
4036     gst_object_unref (srcpad);
4037     srcpad = gst_element_get_static_pad (session->early_fec_decoder, "src");
4038   }
4039
4040   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
4041   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
4042   gst_object_unref (sinkpad);
4043   gst_object_unref (srcpad);
4044
4045   sinkpad = gst_element_request_pad_simple (stream->buffer, "sink_rtcp");
4046   if (sinkpad) {
4047     GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
4048     padname = g_strdup_printf ("rtcp_src_%u", ssrc);
4049     srcpad = gst_element_get_static_pad (element, padname);
4050     g_free (padname);
4051     gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
4052     gst_object_unref (sinkpad);
4053     gst_object_unref (srcpad);
4054   }
4055
4056   if (g_signal_lookup ("handle-sync", G_OBJECT_TYPE (stream->buffer)) != 0) {
4057     /* connect to the RTCP sync signal from the jitterbuffer */
4058     GST_DEBUG_OBJECT (rtpbin, "connecting sync signal");
4059     stream->buffer_handlesync_sig = g_signal_connect (stream->buffer,
4060         "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream);
4061   }
4062
4063   if (stream->demux) {
4064     /* connect to the new-pad signal of the payload demuxer, this will expose the
4065      * new pad by ghosting it. */
4066     stream->demux_newpad_sig = g_signal_connect (stream->demux,
4067         "new-payload-type", (GCallback) new_payload_found, stream);
4068     stream->demux_padremoved_sig = g_signal_connect (stream->demux,
4069         "pad-removed", (GCallback) payload_pad_removed, stream);
4070
4071     /* connect to the request-pt-map signal. This signal will be emitted by the
4072      * demuxer so that it can apply a proper caps on the buffers for the
4073      * depayloaders. */
4074     stream->demux_ptreq_sig = g_signal_connect (stream->demux,
4075         "request-pt-map", (GCallback) ptdemux_pt_map_requested, session);
4076     /* connect to the  signal so it can be forwarded. */
4077     stream->demux_ptchange_sig = g_signal_connect (stream->demux,
4078         "payload-type-change", (GCallback) payload_type_change, session);
4079
4080     GST_RTP_SESSION_UNLOCK (session);
4081     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
4082   } else {
4083     /* add rtpjitterbuffer src pad to pads */
4084     GstPad *pad;
4085
4086     pad = gst_element_get_static_pad (stream->buffer, "src");
4087
4088     GST_RTP_SESSION_UNLOCK (session);
4089     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
4090
4091     expose_recv_src_pad (rtpbin, pad, stream, 255);
4092
4093     gst_object_unref (pad);
4094   }
4095
4096   return;
4097
4098   /* ERRORS */
4099 shutdown:
4100   {
4101     GST_DEBUG_OBJECT (rtpbin, "we are shutting down");
4102     return;
4103   }
4104 no_stream:
4105   {
4106     GST_RTP_SESSION_UNLOCK (session);
4107     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
4108     GST_DEBUG_OBJECT (rtpbin, "could not create stream");
4109     return;
4110   }
4111 }
4112
4113 static GstPad *
4114 complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session)
4115 {
4116   guint sessid = session->id;
4117   GstPad *recv_rtp_sink;
4118   GstElement *decoder;
4119
4120   g_assert (!session->recv_rtp_sink);
4121
4122   /* get recv_rtp pad and store */
4123   session->recv_rtp_sink =
4124       gst_element_request_pad_simple (session->session, "recv_rtp_sink");
4125   if (session->recv_rtp_sink == NULL)
4126     goto pad_failed;
4127
4128   g_signal_connect (session->recv_rtp_sink, "notify::caps",
4129       (GCallback) caps_changed, session);
4130
4131   GST_DEBUG_OBJECT (rtpbin, "requesting RTP decoder");
4132   decoder = session_request_element (session, SIGNAL_REQUEST_RTP_DECODER);
4133   if (decoder) {
4134     GstPad *decsrc, *decsink;
4135     GstPadLinkReturn ret;
4136
4137     GST_DEBUG_OBJECT (rtpbin, "linking RTP decoder");
4138     decsink = gst_element_get_static_pad (decoder, "rtp_sink");
4139     if (decsink == NULL)
4140       goto dec_sink_failed;
4141
4142     recv_rtp_sink = decsink;
4143
4144     decsrc = gst_element_get_static_pad (decoder, "rtp_src");
4145     if (decsrc == NULL)
4146       goto dec_src_failed;
4147
4148     ret = gst_pad_link (decsrc, session->recv_rtp_sink);
4149
4150     gst_object_unref (decsrc);
4151
4152     if (ret != GST_PAD_LINK_OK)
4153       goto dec_link_failed;
4154
4155   } else {
4156     GST_DEBUG_OBJECT (rtpbin, "no RTP decoder given");
4157     recv_rtp_sink = gst_object_ref (session->recv_rtp_sink);
4158   }
4159
4160   return recv_rtp_sink;
4161
4162   /* ERRORS */
4163 pad_failed:
4164   {
4165     g_warning ("rtpbin: failed to get session recv_rtp_sink pad");
4166     return NULL;
4167   }
4168 dec_sink_failed:
4169   {
4170     g_warning ("rtpbin: failed to get decoder sink pad for session %u", sessid);
4171     return NULL;
4172   }
4173 dec_src_failed:
4174   {
4175     g_warning ("rtpbin: failed to get decoder src pad for session %u", sessid);
4176     gst_object_unref (recv_rtp_sink);
4177     return NULL;
4178   }
4179 dec_link_failed:
4180   {
4181     g_warning ("rtpbin: failed to link rtp decoder for session %u", sessid);
4182     gst_object_unref (recv_rtp_sink);
4183     return NULL;
4184   }
4185 }
4186
4187 static void
4188 complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session,
4189     guint sessid)
4190 {
4191   GstElement *aux;
4192   GstPad *recv_rtp_src;
4193
4194   g_assert (!session->recv_rtp_src);
4195
4196   session->recv_rtp_src =
4197       gst_element_get_static_pad (session->session, "recv_rtp_src");
4198   if (session->recv_rtp_src == NULL)
4199     goto pad_failed;
4200
4201   /* find out if we need AUX elements */
4202   aux = session_request_element (session, SIGNAL_REQUEST_AUX_RECEIVER);
4203   if (aux) {
4204     gchar *pname;
4205     GstPad *auxsink;
4206     GstPadLinkReturn ret;
4207
4208     GST_DEBUG_OBJECT (rtpbin, "linking AUX receiver");
4209
4210     pname = g_strdup_printf ("sink_%u", sessid);
4211     auxsink = gst_element_get_static_pad (aux, pname);
4212     g_free (pname);
4213     if (auxsink == NULL)
4214       goto aux_sink_failed;
4215
4216     ret = gst_pad_link (session->recv_rtp_src, auxsink);
4217     gst_object_unref (auxsink);
4218     if (ret != GST_PAD_LINK_OK)
4219       goto aux_link_failed;
4220
4221     /* this can be NULL when this AUX element is not to be linked any further */
4222     pname = g_strdup_printf ("src_%u", sessid);
4223     recv_rtp_src = gst_element_get_static_pad (aux, pname);
4224     g_free (pname);
4225   } else {
4226     recv_rtp_src = gst_object_ref (session->recv_rtp_src);
4227   }
4228
4229   /* Add a storage element if needed */
4230   if (recv_rtp_src && session->storage) {
4231     GstPadLinkReturn ret;
4232     GstPad *sinkpad = gst_element_get_static_pad (session->storage, "sink");
4233
4234     ret = gst_pad_link (recv_rtp_src, sinkpad);
4235
4236     gst_object_unref (sinkpad);
4237     gst_object_unref (recv_rtp_src);
4238
4239     if (ret != GST_PAD_LINK_OK)
4240       goto storage_link_failed;
4241
4242     recv_rtp_src = gst_element_get_static_pad (session->storage, "src");
4243   }
4244
4245   if (recv_rtp_src) {
4246     GstPad *sinkdpad;
4247
4248     GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
4249     sinkdpad = gst_element_get_static_pad (session->demux, "sink");
4250     GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
4251     gst_pad_link_full (recv_rtp_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
4252     gst_object_unref (sinkdpad);
4253     gst_object_unref (recv_rtp_src);
4254
4255     /* connect to the new-ssrc-pad signal of the SSRC demuxer */
4256     session->demux_newpad_sig = g_signal_connect (session->demux,
4257         "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
4258     session->demux_padremoved_sig = g_signal_connect (session->demux,
4259         "removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session);
4260   }
4261
4262   return;
4263
4264 pad_failed:
4265   {
4266     g_warning ("rtpbin: failed to get session recv_rtp_src pad");
4267     return;
4268   }
4269 aux_sink_failed:
4270   {
4271     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
4272     return;
4273   }
4274 aux_link_failed:
4275   {
4276     g_warning ("rtpbin: failed to link AUX pad to session %u", sessid);
4277     return;
4278   }
4279 storage_link_failed:
4280   {
4281     g_warning ("rtpbin: failed to link storage");
4282     return;
4283   }
4284 }
4285
4286 /* Create a pad for receiving RTP for the session in @name. Must be called with
4287  * RTP_BIN_LOCK.
4288  */
4289 static GstPad *
4290 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
4291 {
4292   guint sessid;
4293   GstRtpBinSession *session;
4294   GstPad *recv_rtp_sink;
4295
4296   /* first get the session number */
4297   if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1)
4298     goto no_name;
4299
4300   GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
4301
4302   /* get or create session */
4303   session = find_session_by_id (rtpbin, sessid);
4304   if (!session) {
4305     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
4306     /* create session now */
4307     session = create_session (rtpbin, sessid);
4308     if (session == NULL)
4309       goto create_error;
4310   }
4311
4312   /* check if pad was requested */
4313   if (session->recv_rtp_sink_ghost != NULL)
4314     return session->recv_rtp_sink_ghost;
4315
4316   /* setup the session sink pad */
4317   recv_rtp_sink = complete_session_sink (rtpbin, session);
4318   if (!recv_rtp_sink)
4319     goto session_sink_failed;
4320
4321   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
4322   session->recv_rtp_sink_ghost =
4323       gst_ghost_pad_new_from_template (name, recv_rtp_sink, templ);
4324   gst_object_unref (recv_rtp_sink);
4325   gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE);
4326   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost);
4327
4328   complete_session_receiver (rtpbin, session, sessid);
4329
4330   return session->recv_rtp_sink_ghost;
4331
4332   /* ERRORS */
4333 no_name:
4334   {
4335     g_warning ("rtpbin: cannot find session id for pad: %s",
4336         GST_STR_NULL (name));
4337     return NULL;
4338   }
4339 create_error:
4340   {
4341     /* create_session already warned */
4342     return NULL;
4343   }
4344 session_sink_failed:
4345   {
4346     /* warning already done */
4347     return NULL;
4348   }
4349 }
4350
4351 static void
4352 remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
4353 {
4354   if (session->demux_newpad_sig) {
4355     g_signal_handler_disconnect (session->demux, session->demux_newpad_sig);
4356     session->demux_newpad_sig = 0;
4357   }
4358   if (session->demux_padremoved_sig) {
4359     g_signal_handler_disconnect (session->demux, session->demux_padremoved_sig);
4360     session->demux_padremoved_sig = 0;
4361   }
4362   if (session->recv_rtp_src) {
4363     gst_object_unref (session->recv_rtp_src);
4364     session->recv_rtp_src = NULL;
4365   }
4366   if (session->recv_rtp_sink) {
4367     gst_element_release_request_pad (session->session, session->recv_rtp_sink);
4368     gst_object_unref (session->recv_rtp_sink);
4369     session->recv_rtp_sink = NULL;
4370   }
4371   if (session->recv_rtp_sink_ghost) {
4372     gst_pad_set_active (session->recv_rtp_sink_ghost, FALSE);
4373     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
4374         session->recv_rtp_sink_ghost);
4375     session->recv_rtp_sink_ghost = NULL;
4376   }
4377 }
4378
4379 static gint
4380 fec_sinkpad_find (const GValue * item, gchar * padname)
4381 {
4382   GstPad *pad = g_value_get_object (item);
4383   return g_strcmp0 (GST_PAD_NAME (pad), padname);
4384 }
4385
4386 static GstPad *
4387 complete_session_fec (GstRtpBin * rtpbin, GstRtpBinSession * session,
4388     guint fec_idx)
4389 {
4390   gboolean have_static_pad;
4391   gchar *padname;
4392
4393   GstPad *ret;
4394   GstIterator *it;
4395   GValue item = { 0, };
4396
4397   if (!ensure_early_fec_decoder (rtpbin, session))
4398     goto no_decoder;
4399
4400   padname = g_strdup_printf ("fec_%u", fec_idx);
4401
4402   GST_DEBUG_OBJECT (rtpbin, "getting FEC sink pad %s", padname);
4403
4404   /* First try to find the decoder static pad that matches the padname */
4405   it = gst_element_iterate_sink_pads (session->early_fec_decoder);
4406   have_static_pad =
4407       gst_iterator_find_custom (it, (GCompareFunc) fec_sinkpad_find, &item,
4408       padname);
4409
4410   if (have_static_pad) {
4411     ret = g_value_get_object (&item);
4412     gst_object_ref (ret);
4413     g_value_unset (&item);
4414   } else {
4415     ret = gst_element_request_pad_simple (session->early_fec_decoder, padname);
4416   }
4417
4418   g_free (padname);
4419   gst_iterator_free (it);
4420
4421   if (ret == NULL)
4422     goto pad_failed;
4423
4424   session->recv_fec_sinks = g_slist_prepend (session->recv_fec_sinks, ret);
4425
4426   return ret;
4427
4428 pad_failed:
4429   {
4430     g_warning ("rtpbin: failed to get decoder fec pad");
4431     return NULL;
4432   }
4433 no_decoder:
4434   {
4435     g_warning ("rtpbin: failed to build FEC decoder for session %u",
4436         session->id);
4437     return NULL;
4438   }
4439 }
4440
4441 static GstPad *
4442 complete_session_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session,
4443     guint sessid)
4444 {
4445   GstElement *decoder;
4446   GstPad *sinkdpad;
4447   GstPad *decsink = NULL;
4448
4449   /* get recv_rtp pad and store */
4450   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
4451   session->recv_rtcp_sink =
4452       gst_element_request_pad_simple (session->session, "recv_rtcp_sink");
4453   if (session->recv_rtcp_sink == NULL)
4454     goto pad_failed;
4455
4456   GST_DEBUG_OBJECT (rtpbin, "getting RTCP decoder");
4457   decoder = session_request_element (session, SIGNAL_REQUEST_RTCP_DECODER);
4458   if (decoder) {
4459     GstPad *decsrc;
4460     GstPadLinkReturn ret;
4461
4462     GST_DEBUG_OBJECT (rtpbin, "linking RTCP decoder");
4463     decsink = gst_element_get_static_pad (decoder, "rtcp_sink");
4464     decsrc = gst_element_get_static_pad (decoder, "rtcp_src");
4465
4466     if (decsink == NULL)
4467       goto dec_sink_failed;
4468
4469     if (decsrc == NULL)
4470       goto dec_src_failed;
4471
4472     ret = gst_pad_link (decsrc, session->recv_rtcp_sink);
4473
4474     gst_object_unref (decsrc);
4475
4476     if (ret != GST_PAD_LINK_OK)
4477       goto dec_link_failed;
4478   } else {
4479     GST_DEBUG_OBJECT (rtpbin, "no RTCP decoder given");
4480     decsink = gst_object_ref (session->recv_rtcp_sink);
4481   }
4482
4483   /* get srcpad, link to SSRCDemux */
4484   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
4485   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
4486   if (session->sync_src == NULL)
4487     goto src_pad_failed;
4488
4489   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
4490   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
4491   gst_pad_link_full (session->sync_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
4492   gst_object_unref (sinkdpad);
4493
4494   return decsink;
4495
4496 pad_failed:
4497   {
4498     g_warning ("rtpbin: failed to get session rtcp_sink pad");
4499     return NULL;
4500   }
4501 dec_sink_failed:
4502   {
4503     g_warning ("rtpbin: failed to get decoder sink pad for session %u", sessid);
4504     return NULL;
4505   }
4506 dec_src_failed:
4507   {
4508     g_warning ("rtpbin: failed to get decoder src pad for session %u", sessid);
4509     goto cleanup;
4510   }
4511 dec_link_failed:
4512   {
4513     g_warning ("rtpbin: failed to link rtcp decoder for session %u", sessid);
4514     goto cleanup;
4515   }
4516 src_pad_failed:
4517   {
4518     g_warning ("rtpbin: failed to get session sync_src pad");
4519   }
4520
4521 cleanup:
4522   gst_object_unref (decsink);
4523   return NULL;
4524 }
4525
4526 /* Create a pad for receiving RTCP for the session in @name. Must be called with
4527  * RTP_BIN_LOCK.
4528  */
4529 static GstPad *
4530 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
4531     const gchar * name)
4532 {
4533   guint sessid;
4534   GstRtpBinSession *session;
4535   GstPad *decsink = NULL;
4536
4537   /* first get the session number */
4538   if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1)
4539     goto no_name;
4540
4541   GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
4542
4543   /* get or create the session */
4544   session = find_session_by_id (rtpbin, sessid);
4545   if (!session) {
4546     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
4547     /* create session now */
4548     session = create_session (rtpbin, sessid);
4549     if (session == NULL)
4550       goto create_error;
4551   }
4552
4553   /* check if pad was requested */
4554   if (session->recv_rtcp_sink_ghost != NULL)
4555     return session->recv_rtcp_sink_ghost;
4556
4557   decsink = complete_session_rtcp (rtpbin, session, sessid);
4558   if (!decsink)
4559     goto create_error;
4560
4561   session->recv_rtcp_sink_ghost =
4562       gst_ghost_pad_new_from_template (name, decsink, templ);
4563   gst_object_unref (decsink);
4564   gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE);
4565   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin),
4566       session->recv_rtcp_sink_ghost);
4567
4568   return session->recv_rtcp_sink_ghost;
4569
4570   /* ERRORS */
4571 no_name:
4572   {
4573     g_warning ("rtpbin: cannot find session id for pad: %s",
4574         GST_STR_NULL (name));
4575     return NULL;
4576   }
4577 create_error:
4578   {
4579     /* create_session already warned */
4580     return NULL;
4581   }
4582 }
4583
4584 static GstPad *
4585 create_recv_fec (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
4586 {
4587   guint sessid, fec_idx;
4588   GstRtpBinSession *session;
4589   GstPad *decsink = NULL;
4590   GstPad *ghost;
4591
4592   /* first get the session number */
4593   if (name == NULL
4594       || sscanf (name, "recv_fec_sink_%u_%u", &sessid, &fec_idx) != 2)
4595     goto no_name;
4596
4597   if (fec_idx > 1)
4598     goto invalid_idx;
4599
4600   GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
4601
4602   /* get or create the session */
4603   session = find_session_by_id (rtpbin, sessid);
4604   if (!session) {
4605     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
4606     /* create session now */
4607     session = create_session (rtpbin, sessid);
4608     if (session == NULL)
4609       goto create_error;
4610   }
4611
4612   decsink = complete_session_fec (rtpbin, session, fec_idx);
4613   if (!decsink)
4614     goto create_error;
4615
4616   ghost = gst_ghost_pad_new_from_template (name, decsink, templ);
4617   session->recv_fec_sink_ghosts =
4618       g_slist_prepend (session->recv_fec_sink_ghosts, ghost);
4619   gst_object_unref (decsink);
4620   gst_pad_set_active (ghost, TRUE);
4621   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), ghost);
4622
4623   return ghost;
4624
4625   /* ERRORS */
4626 no_name:
4627   {
4628     g_warning ("rtpbin: cannot find session id for pad: %s",
4629         GST_STR_NULL (name));
4630     return NULL;
4631   }
4632 invalid_idx:
4633   {
4634     g_warning ("rtpbin: invalid FEC index: %s", GST_STR_NULL (name));
4635     return NULL;
4636   }
4637 create_error:
4638   {
4639     /* create_session already warned */
4640     return NULL;
4641   }
4642 }
4643
4644 static void
4645 remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
4646 {
4647   if (session->recv_rtcp_sink_ghost) {
4648     gst_pad_set_active (session->recv_rtcp_sink_ghost, FALSE);
4649     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
4650         session->recv_rtcp_sink_ghost);
4651     session->recv_rtcp_sink_ghost = NULL;
4652   }
4653   if (session->sync_src) {
4654     /* releasing the request pad should also unref the sync pad */
4655     gst_object_unref (session->sync_src);
4656     session->sync_src = NULL;
4657   }
4658   if (session->recv_rtcp_sink) {
4659     gst_element_release_request_pad (session->session, session->recv_rtcp_sink);
4660     gst_object_unref (session->recv_rtcp_sink);
4661     session->recv_rtcp_sink = NULL;
4662   }
4663 }
4664
4665 static void
4666 remove_recv_fec_for_pad (GstRtpBin * rtpbin, GstRtpBinSession * session,
4667     GstPad * ghost)
4668 {
4669   GSList *item;
4670   GstPad *target;
4671
4672   target = gst_ghost_pad_get_target (GST_GHOST_PAD (ghost));
4673
4674   if (target) {
4675     item = g_slist_find (session->recv_fec_sinks, target);
4676     if (item) {
4677       GstPadTemplate *templ;
4678       GstPad *pad;
4679
4680       pad = item->data;
4681       templ = gst_pad_get_pad_template (pad);
4682
4683       if (GST_PAD_TEMPLATE_PRESENCE (templ) == GST_PAD_REQUEST) {
4684         GST_DEBUG_OBJECT (rtpbin,
4685             "Releasing FEC decoder pad %" GST_PTR_FORMAT, pad);
4686         gst_element_release_request_pad (session->early_fec_decoder, pad);
4687       } else {
4688         gst_object_unref (pad);
4689       }
4690
4691       session->recv_fec_sinks =
4692           g_slist_delete_link (session->recv_fec_sinks, item);
4693
4694       gst_object_unref (templ);
4695     }
4696     gst_object_unref (target);
4697   }
4698
4699   item = g_slist_find (session->recv_fec_sink_ghosts, ghost);
4700   if (item)
4701     session->recv_fec_sink_ghosts =
4702         g_slist_delete_link (session->recv_fec_sink_ghosts, item);
4703
4704   gst_pad_set_active (ghost, FALSE);
4705   gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), ghost);
4706 }
4707
4708 static void
4709 remove_recv_fec (GstRtpBin * rtpbin, GstRtpBinSession * session)
4710 {
4711   GSList *copy;
4712   GSList *tmp;
4713
4714   copy = g_slist_copy (session->recv_fec_sink_ghosts);
4715
4716   for (tmp = copy; tmp; tmp = tmp->next) {
4717     remove_recv_fec_for_pad (rtpbin, session, (GstPad *) tmp->data);
4718   }
4719
4720   g_slist_free (copy);
4721 }
4722
4723 static gboolean
4724 complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session)
4725 {
4726   gchar *gname;
4727   guint sessid = session->id;
4728   GstPad *send_rtp_src;
4729   GstElement *encoder;
4730   GstElementClass *klass;
4731   GstPadTemplate *templ;
4732   gboolean ret = FALSE;
4733
4734   /* get srcpad */
4735   send_rtp_src = gst_element_get_static_pad (session->session, "send_rtp_src");
4736
4737   if (send_rtp_src == NULL)
4738     goto no_srcpad;
4739
4740   GST_DEBUG_OBJECT (rtpbin, "getting RTP encoder");
4741   encoder = session_request_element (session, SIGNAL_REQUEST_RTP_ENCODER);
4742   if (encoder) {
4743     gchar *ename;
4744     GstPad *encsrc, *encsink;
4745     GstPadLinkReturn ret;
4746
4747     GST_DEBUG_OBJECT (rtpbin, "linking RTP encoder");
4748     ename = g_strdup_printf ("rtp_src_%u", sessid);
4749     encsrc = gst_element_get_static_pad (encoder, ename);
4750     g_free (ename);
4751
4752     if (encsrc == NULL)
4753       goto enc_src_failed;
4754
4755     ename = g_strdup_printf ("rtp_sink_%u", sessid);
4756     encsink = gst_element_get_static_pad (encoder, ename);
4757     g_free (ename);
4758     if (encsink == NULL)
4759       goto enc_sink_failed;
4760
4761     ret = gst_pad_link (send_rtp_src, encsink);
4762     gst_object_unref (encsink);
4763     gst_object_unref (send_rtp_src);
4764
4765     send_rtp_src = encsrc;
4766
4767     if (ret != GST_PAD_LINK_OK)
4768       goto enc_link_failed;
4769   } else {
4770     GST_DEBUG_OBJECT (rtpbin, "no RTP encoder given");
4771   }
4772
4773   /* ghost the new source pad */
4774   klass = GST_ELEMENT_GET_CLASS (rtpbin);
4775   gname = g_strdup_printf ("send_rtp_src_%u", sessid);
4776   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u");
4777   session->send_rtp_src_ghost =
4778       gst_ghost_pad_new_from_template (gname, send_rtp_src, templ);
4779   gst_pad_set_active (session->send_rtp_src_ghost, TRUE);
4780   gst_pad_sticky_events_foreach (send_rtp_src, copy_sticky_events,
4781       session->send_rtp_src_ghost);
4782   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost);
4783   g_free (gname);
4784
4785   ret = TRUE;
4786
4787 done:
4788   if (send_rtp_src)
4789     gst_object_unref (send_rtp_src);
4790
4791   return ret;
4792
4793   /* ERRORS */
4794 no_srcpad:
4795   {
4796     g_warning ("rtpbin: failed to get rtp source pad for session %u", sessid);
4797     goto done;
4798   }
4799 enc_src_failed:
4800   {
4801     g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT
4802         " src pad for session %u", encoder, sessid);
4803     goto done;
4804   }
4805 enc_sink_failed:
4806   {
4807     g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT
4808         " sink pad for session %u", encoder, sessid);
4809     goto done;
4810   }
4811 enc_link_failed:
4812   {
4813     g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u",
4814         encoder, sessid);
4815     goto done;
4816   }
4817 }
4818
4819 static gboolean
4820 setup_aux_sender_fold (const GValue * item, GValue * result, gpointer user_data)
4821 {
4822   GstPad *pad;
4823   gchar *name;
4824   guint sessid;
4825   GstRtpBinSession *session = user_data, *newsess;
4826   GstRtpBin *rtpbin = session->bin;
4827   GstPadLinkReturn ret;
4828
4829   pad = g_value_get_object (item);
4830   name = gst_pad_get_name (pad);
4831
4832   if (name == NULL || sscanf (name, "src_%u", &sessid) != 1)
4833     goto no_name;
4834
4835   g_free (name);
4836
4837   newsess = find_session_by_id (rtpbin, sessid);
4838   if (newsess == NULL) {
4839     /* create new session */
4840     newsess = create_session (rtpbin, sessid);
4841     if (newsess == NULL)
4842       goto create_error;
4843   } else if (newsess->send_rtp_sink != NULL)
4844     goto existing_session;
4845
4846   /* get send_rtp pad and store */
4847   newsess->send_rtp_sink =
4848       gst_element_request_pad_simple (newsess->session, "send_rtp_sink");
4849   if (newsess->send_rtp_sink == NULL)
4850     goto pad_failed;
4851
4852   ret = gst_pad_link (pad, newsess->send_rtp_sink);
4853   if (ret != GST_PAD_LINK_OK)
4854     goto aux_link_failed;
4855
4856   if (!complete_session_src (rtpbin, newsess))
4857     goto session_src_failed;
4858
4859   return TRUE;
4860
4861   /* ERRORS */
4862 no_name:
4863   {
4864     GST_WARNING ("ignoring invalid pad name %s", GST_STR_NULL (name));
4865     g_free (name);
4866     return TRUE;
4867   }
4868 create_error:
4869   {
4870     /* create_session already warned */
4871     return FALSE;
4872   }
4873 existing_session:
4874   {
4875     GST_DEBUG_OBJECT (rtpbin,
4876         "skipping src_%i setup, since it is already configured.", sessid);
4877     return TRUE;
4878   }
4879 pad_failed:
4880   {
4881     g_warning ("rtpbin: failed to get session pad for session %u", sessid);
4882     return FALSE;
4883   }
4884 aux_link_failed:
4885   {
4886     g_warning ("rtpbin: failed to link AUX for session %u", sessid);
4887     return FALSE;
4888   }
4889 session_src_failed:
4890   {
4891     g_warning ("rtpbin: failed to complete AUX for session %u", sessid);
4892     return FALSE;
4893   }
4894 }
4895
4896 static gboolean
4897 setup_aux_sender (GstRtpBin * rtpbin, GstRtpBinSession * session,
4898     GstElement * aux)
4899 {
4900   GstIterator *it;
4901   GValue result = { 0, };
4902   GstIteratorResult res;
4903
4904   it = gst_element_iterate_src_pads (aux);
4905   res = gst_iterator_fold (it, setup_aux_sender_fold, &result, session);
4906   gst_iterator_free (it);
4907
4908   return res == GST_ITERATOR_DONE;
4909 }
4910
4911 static void
4912 fec_encoder_add_pad_unlocked (GstPad * pad, GstRtpBinSession * session)
4913 {
4914   GstElementClass *klass;
4915   gchar *gname;
4916   GstPadTemplate *templ;
4917   guint fec_idx;
4918   GstPad *ghost;
4919
4920   if (sscanf (GST_PAD_NAME (pad), "fec_%u", &fec_idx) != 1) {
4921     GST_WARNING_OBJECT (session->bin,
4922         "FEC encoder added pad with name not matching fec_%%u (%s)",
4923         GST_PAD_NAME (pad));
4924     goto done;
4925   }
4926
4927   GST_INFO_OBJECT (session->bin, "FEC encoder for session %u exposed new pad",
4928       session->id);
4929
4930   klass = GST_ELEMENT_GET_CLASS (session->bin);
4931   gname = g_strdup_printf ("send_fec_src_%u_%u", session->id, fec_idx);
4932   templ = gst_element_class_get_pad_template (klass, "send_fec_src_%u_%u");
4933   ghost = gst_ghost_pad_new_from_template (gname, pad, templ);
4934   session->send_fec_src_ghosts =
4935       g_slist_prepend (session->send_fec_src_ghosts, ghost);
4936   gst_pad_set_active (ghost, TRUE);
4937   gst_pad_sticky_events_foreach (pad, copy_sticky_events, ghost);
4938   gst_element_add_pad (GST_ELEMENT (session->bin), ghost);
4939   g_free (gname);
4940
4941 done:
4942   return;
4943 }
4944
4945 static void
4946 fec_encoder_add_pad (GstPad * pad, GstRtpBinSession * session)
4947 {
4948   GST_RTP_BIN_LOCK (session->bin);
4949   fec_encoder_add_pad_unlocked (pad, session);
4950   GST_RTP_BIN_UNLOCK (session->bin);
4951 }
4952
4953 static gint
4954 fec_srcpad_iterator_filter (const GValue * item, GValue * unused)
4955 {
4956   guint fec_idx;
4957   GstPad *pad = g_value_get_object (item);
4958   GstPadTemplate *templ = gst_pad_get_pad_template (pad);
4959
4960   gint have_static_pad =
4961       (GST_PAD_TEMPLATE_PRESENCE (templ) == GST_PAD_ALWAYS) &&
4962       (sscanf (GST_PAD_NAME (pad), "fec_%u", &fec_idx) == 1);
4963
4964   gst_object_unref (templ);
4965
4966   /* return 0 to retain pad in filtered iterator */
4967   return !have_static_pad;
4968 }
4969
4970 static void
4971 fec_srcpad_iterator_foreach (const GValue * item, GstRtpBinSession * session)
4972 {
4973   GstPad *pad = g_value_get_object (item);
4974   fec_encoder_add_pad_unlocked (pad, session);
4975 }
4976
4977 static void
4978 fec_encoder_pad_added_cb (GstElement * encoder, GstPad * pad,
4979     GstRtpBinSession * session)
4980 {
4981   fec_encoder_add_pad (pad, session);
4982 }
4983
4984 static GstElement *
4985 request_fec_encoder (GstRtpBin * rtpbin, GstRtpBinSession * session,
4986     guint sessid)
4987 {
4988   GstElement *ret = NULL;
4989   const gchar *factory;
4990   gchar *sess_id_str;
4991
4992   sess_id_str = g_strdup_printf ("%u", sessid);
4993   factory = gst_structure_get_string (rtpbin->fec_encoders, sess_id_str);
4994   g_free (sess_id_str);
4995
4996   /* First try the property */
4997   if (factory) {
4998     GError *err = NULL;
4999
5000     ret =
5001         gst_parse_bin_from_description_full (factory, TRUE, NULL,
5002         GST_PARSE_FLAG_NO_SINGLE_ELEMENT_BINS | GST_PARSE_FLAG_FATAL_ERRORS,
5003         &err);
5004     if (!ret) {
5005       GST_ERROR_OBJECT (rtpbin, "Failed to build encoder from factory: %s",
5006           err->message);
5007       goto done;
5008     }
5009
5010     bin_manage_element (session->bin, ret);
5011     session->elements = g_slist_prepend (session->elements, ret);
5012     GST_INFO_OBJECT (rtpbin, "Built FEC encoder: %" GST_PTR_FORMAT
5013         " for session %u", ret, sessid);
5014   }
5015
5016   /* Fallback to the signal */
5017   if (!ret)
5018     ret = session_request_element (session, SIGNAL_REQUEST_FEC_ENCODER);
5019
5020   if (ret) {
5021     /* First, add encoder pads that match fec_% template and are already present */
5022     GstIterator *it, *filter;
5023     GstIteratorResult it_ret = GST_ITERATOR_OK;
5024
5025     it = gst_element_iterate_src_pads (ret);
5026     filter =
5027         gst_iterator_filter (it, (GCompareFunc) fec_srcpad_iterator_filter,
5028         NULL);
5029
5030     while (it_ret == GST_ITERATOR_OK || it_ret == GST_ITERATOR_RESYNC) {
5031       it_ret =
5032           gst_iterator_foreach (filter,
5033           (GstIteratorForeachFunction) fec_srcpad_iterator_foreach, session);
5034
5035       if (it_ret == GST_ITERATOR_RESYNC)
5036         gst_iterator_resync (filter);
5037     }
5038
5039     gst_iterator_free (filter);
5040
5041     /* Finally, connect to pad-added signal if any of the encoder pads are
5042      * added later */
5043     g_signal_connect (ret, "pad-added", G_CALLBACK (fec_encoder_pad_added_cb),
5044         session);
5045   }
5046
5047 done:
5048   return ret;
5049 }
5050
5051 /* Create a pad for sending RTP for the session in @name. Must be called with
5052  * RTP_BIN_LOCK.
5053  */
5054 static GstPad *
5055 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
5056 {
5057   gchar *pname;
5058   guint sessid;
5059   GstPad *send_rtp_sink;
5060   GstElement *aux;
5061   GstElement *encoder;
5062   GstElement *prev = NULL;
5063   GstRtpBinSession *session;
5064
5065   /* first get the session number */
5066   if (name == NULL || sscanf (name, "send_rtp_sink_%u", &sessid) != 1)
5067     goto no_name;
5068
5069   /* get or create session */
5070   session = find_session_by_id (rtpbin, sessid);
5071   if (!session) {
5072     /* create session now */
5073     session = create_session (rtpbin, sessid);
5074     if (session == NULL)
5075       goto create_error;
5076   }
5077
5078   /* check if pad was requested */
5079   if (session->send_rtp_sink_ghost != NULL)
5080     return session->send_rtp_sink_ghost;
5081
5082   /* check if we are already using this session as a sender */
5083   if (session->send_rtp_sink != NULL)
5084     goto existing_session;
5085
5086   encoder = request_fec_encoder (rtpbin, session, sessid);
5087
5088   if (encoder) {
5089     GST_DEBUG_OBJECT (rtpbin, "Linking FEC encoder");
5090
5091     send_rtp_sink = gst_element_get_static_pad (encoder, "sink");
5092
5093     if (!send_rtp_sink)
5094       goto enc_sink_failed;
5095
5096     prev = encoder;
5097   }
5098
5099   GST_DEBUG_OBJECT (rtpbin, "getting RTP AUX sender");
5100   aux = session_request_element (session, SIGNAL_REQUEST_AUX_SENDER);
5101   if (aux) {
5102     GstPad *sinkpad;
5103     GST_DEBUG_OBJECT (rtpbin, "linking AUX sender");
5104     if (!setup_aux_sender (rtpbin, session, aux))
5105       goto aux_session_failed;
5106
5107     pname = g_strdup_printf ("sink_%u", sessid);
5108     sinkpad = gst_element_get_static_pad (aux, pname);
5109     g_free (pname);
5110
5111     if (sinkpad == NULL)
5112       goto aux_sink_failed;
5113
5114     if (!prev) {
5115       send_rtp_sink = sinkpad;
5116     } else {
5117       GstPad *srcpad = gst_element_get_static_pad (prev, "src");
5118       GstPadLinkReturn ret;
5119
5120       ret = gst_pad_link (srcpad, sinkpad);
5121       gst_object_unref (srcpad);
5122       if (ret != GST_PAD_LINK_OK) {
5123         goto aux_link_failed;
5124       }
5125       gst_object_unref (sinkpad);
5126     }
5127     prev = aux;
5128   } else {
5129     /* get send_rtp pad and store */
5130     session->send_rtp_sink =
5131         gst_element_request_pad_simple (session->session, "send_rtp_sink");
5132     if (session->send_rtp_sink == NULL)
5133       goto pad_failed;
5134
5135     if (!complete_session_src (rtpbin, session))
5136       goto session_src_failed;
5137
5138     if (!prev) {
5139       send_rtp_sink = gst_object_ref (session->send_rtp_sink);
5140     } else {
5141       GstPad *srcpad = gst_element_get_static_pad (prev, "src");
5142       GstPadLinkReturn ret;
5143
5144       ret = gst_pad_link (srcpad, session->send_rtp_sink);
5145       gst_object_unref (srcpad);
5146       if (ret != GST_PAD_LINK_OK)
5147         goto session_link_failed;
5148     }
5149   }
5150
5151   session->send_rtp_sink_ghost =
5152       gst_ghost_pad_new_from_template (name, send_rtp_sink, templ);
5153   gst_object_unref (send_rtp_sink);
5154   gst_pad_set_active (session->send_rtp_sink_ghost, TRUE);
5155   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_sink_ghost);
5156
5157   return session->send_rtp_sink_ghost;
5158
5159   /* ERRORS */
5160 no_name:
5161   {
5162     g_warning ("rtpbin: cannot find session id for pad: %s",
5163         GST_STR_NULL (name));
5164     return NULL;
5165   }
5166 create_error:
5167   {
5168     /* create_session already warned */
5169     return NULL;
5170   }
5171 existing_session:
5172   {
5173     g_warning ("rtpbin: session %u is already in use", sessid);
5174     return NULL;
5175   }
5176 aux_session_failed:
5177   {
5178     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
5179     return NULL;
5180   }
5181 aux_sink_failed:
5182   {
5183     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
5184     return NULL;
5185   }
5186 aux_link_failed:
5187   {
5188     g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u",
5189         aux, sessid);
5190     return NULL;
5191   }
5192 pad_failed:
5193   {
5194     g_warning ("rtpbin: failed to get session pad for session %u", sessid);
5195     return NULL;
5196   }
5197 session_src_failed:
5198   {
5199     g_warning ("rtpbin: failed to setup source pads for session %u", sessid);
5200     return NULL;
5201   }
5202 session_link_failed:
5203   {
5204     g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u",
5205         session, sessid);
5206     return NULL;
5207   }
5208 enc_sink_failed:
5209   {
5210     g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT
5211         " sink pad for session %u", encoder, sessid);
5212     return NULL;
5213   }
5214 }
5215
5216 static void
5217 remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
5218 {
5219   if (session->send_rtp_src_ghost) {
5220     gst_pad_set_active (session->send_rtp_src_ghost, FALSE);
5221     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
5222         session->send_rtp_src_ghost);
5223     session->send_rtp_src_ghost = NULL;
5224   }
5225   if (session->send_rtp_sink) {
5226     gst_element_release_request_pad (GST_ELEMENT_CAST (session->session),
5227         session->send_rtp_sink);
5228     gst_object_unref (session->send_rtp_sink);
5229     session->send_rtp_sink = NULL;
5230   }
5231   if (session->send_rtp_sink_ghost) {
5232     gst_pad_set_active (session->send_rtp_sink_ghost, FALSE);
5233     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
5234         session->send_rtp_sink_ghost);
5235     session->send_rtp_sink_ghost = NULL;
5236   }
5237 }
5238
5239 static void
5240 remove_send_fec (GstRtpBin * rtpbin, GstRtpBinSession * session)
5241 {
5242   GSList *tmp;
5243
5244   for (tmp = session->send_fec_src_ghosts; tmp; tmp = tmp->next) {
5245     GstPad *ghost = GST_PAD (tmp->data);
5246     gst_pad_set_active (ghost, FALSE);
5247     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), ghost);
5248   }
5249
5250   g_slist_free (session->send_fec_src_ghosts);
5251   session->send_fec_src_ghosts = NULL;
5252 }
5253
5254 /* Create a pad for sending RTCP for the session in @name. Must be called with
5255  * RTP_BIN_LOCK.
5256  */
5257 static GstPad *
5258 create_send_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
5259     const gchar * name)
5260 {
5261   guint sessid;
5262   GstPad *encsrc;
5263   GstElement *encoder;
5264   GstRtpBinSession *session;
5265
5266   /* first get the session number */
5267   if (name == NULL || sscanf (name, "send_rtcp_src_%u", &sessid) != 1)
5268     goto no_name;
5269
5270   /* get or create session */
5271   session = find_session_by_id (rtpbin, sessid);
5272   if (!session) {
5273     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
5274     /* create session now */
5275     session = create_session (rtpbin, sessid);
5276     if (session == NULL)
5277       goto create_error;
5278   }
5279
5280   /* check if pad was requested */
5281   if (session->send_rtcp_src_ghost != NULL)
5282     return session->send_rtcp_src_ghost;
5283
5284   /* get rtcp_src pad and store */
5285   session->send_rtcp_src =
5286       gst_element_request_pad_simple (session->session, "send_rtcp_src");
5287   if (session->send_rtcp_src == NULL)
5288     goto pad_failed;
5289
5290   GST_DEBUG_OBJECT (rtpbin, "getting RTCP encoder");
5291   encoder = session_request_element (session, SIGNAL_REQUEST_RTCP_ENCODER);
5292   if (encoder) {
5293     gchar *ename;
5294     GstPad *encsink;
5295     GstPadLinkReturn ret;
5296
5297     GST_DEBUG_OBJECT (rtpbin, "linking RTCP encoder");
5298
5299     ename = g_strdup_printf ("rtcp_src_%u", sessid);
5300     encsrc = gst_element_get_static_pad (encoder, ename);
5301     g_free (ename);
5302     if (encsrc == NULL)
5303       goto enc_src_failed;
5304
5305     ename = g_strdup_printf ("rtcp_sink_%u", sessid);
5306     encsink = gst_element_get_static_pad (encoder, ename);
5307     g_free (ename);
5308     if (encsink == NULL)
5309       goto enc_sink_failed;
5310
5311     ret = gst_pad_link (session->send_rtcp_src, encsink);
5312     gst_object_unref (encsink);
5313
5314     if (ret != GST_PAD_LINK_OK)
5315       goto enc_link_failed;
5316   } else {
5317     GST_DEBUG_OBJECT (rtpbin, "no RTCP encoder given");
5318     encsrc = gst_object_ref (session->send_rtcp_src);
5319   }
5320
5321   session->send_rtcp_src_ghost =
5322       gst_ghost_pad_new_from_template (name, encsrc, templ);
5323   gst_object_unref (encsrc);
5324   gst_pad_set_active (session->send_rtcp_src_ghost, TRUE);
5325   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtcp_src_ghost);
5326
5327   return session->send_rtcp_src_ghost;
5328
5329   /* ERRORS */
5330 no_name:
5331   {
5332     g_warning ("rtpbin: cannot find session id for pad: %s",
5333         GST_STR_NULL (name));
5334     return NULL;
5335   }
5336 create_error:
5337   {
5338     /* create_session already warned */
5339     return NULL;
5340   }
5341 pad_failed:
5342   {
5343     g_warning ("rtpbin: failed to get rtcp pad for session %u", sessid);
5344     return NULL;
5345   }
5346 enc_src_failed:
5347   {
5348     g_warning ("rtpbin: failed to get encoder src pad for session %u", sessid);
5349     return NULL;
5350   }
5351 enc_sink_failed:
5352   {
5353     g_warning ("rtpbin: failed to get encoder sink pad for session %u", sessid);
5354     gst_object_unref (encsrc);
5355     return NULL;
5356   }
5357 enc_link_failed:
5358   {
5359     g_warning ("rtpbin: failed to link rtcp encoder for session %u", sessid);
5360     gst_object_unref (encsrc);
5361     return NULL;
5362   }
5363 }
5364
5365 static void
5366 remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
5367 {
5368   if (session->send_rtcp_src_ghost) {
5369     gst_pad_set_active (session->send_rtcp_src_ghost, FALSE);
5370     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
5371         session->send_rtcp_src_ghost);
5372     session->send_rtcp_src_ghost = NULL;
5373   }
5374   if (session->send_rtcp_src) {
5375     gst_element_release_request_pad (session->session, session->send_rtcp_src);
5376     gst_object_unref (session->send_rtcp_src);
5377     session->send_rtcp_src = NULL;
5378   }
5379 }
5380
5381 /* If the requested name is NULL we should create a name with
5382  * the session number assuming we want the lowest possible session
5383  * with a free pad like the template */
5384 static gchar *
5385 gst_rtp_bin_get_free_pad_name (GstElement * element, GstPadTemplate * templ)
5386 {
5387   gboolean name_found = FALSE;
5388   gint session = 0;
5389   GstIterator *pad_it = NULL;
5390   gchar *pad_name = NULL;
5391   GValue data = { 0, };
5392
5393   GST_DEBUG_OBJECT (element, "find a free pad name for template");
5394   while (!name_found) {
5395     gboolean done = FALSE;
5396
5397     g_free (pad_name);
5398     pad_name = g_strdup_printf (templ->name_template, session++);
5399     pad_it = gst_element_iterate_pads (GST_ELEMENT (element));
5400     name_found = TRUE;
5401     while (!done) {
5402       switch (gst_iterator_next (pad_it, &data)) {
5403         case GST_ITERATOR_OK:
5404         {
5405           GstPad *pad;
5406           gchar *name;
5407
5408           pad = g_value_get_object (&data);
5409           name = gst_pad_get_name (pad);
5410
5411           if (strcmp (name, pad_name) == 0) {
5412             done = TRUE;
5413             name_found = FALSE;
5414           }
5415           g_free (name);
5416           g_value_reset (&data);
5417           break;
5418         }
5419         case GST_ITERATOR_ERROR:
5420         case GST_ITERATOR_RESYNC:
5421           /* restart iteration */
5422           done = TRUE;
5423           name_found = FALSE;
5424           session = 0;
5425           break;
5426         case GST_ITERATOR_DONE:
5427           done = TRUE;
5428           break;
5429       }
5430     }
5431     g_value_unset (&data);
5432     gst_iterator_free (pad_it);
5433   }
5434
5435   GST_DEBUG_OBJECT (element, "free pad name found: '%s'", pad_name);
5436   return pad_name;
5437 }
5438
5439 /*
5440  */
5441 static GstPad *
5442 gst_rtp_bin_request_new_pad (GstElement * element,
5443     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
5444 {
5445   GstRtpBin *rtpbin;
5446   GstElementClass *klass;
5447   GstPad *result;
5448
5449   gchar *pad_name = NULL;
5450
5451   g_return_val_if_fail (templ != NULL, NULL);
5452   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
5453
5454   rtpbin = GST_RTP_BIN (element);
5455   klass = GST_ELEMENT_GET_CLASS (element);
5456
5457   GST_RTP_BIN_LOCK (rtpbin);
5458
5459   if (name == NULL) {
5460     /* use a free pad name */
5461     pad_name = gst_rtp_bin_get_free_pad_name (element, templ);
5462   } else {
5463     /* use the provided name */
5464     pad_name = g_strdup (name);
5465   }
5466
5467   GST_DEBUG_OBJECT (rtpbin, "Trying to request a pad with name %s", pad_name);
5468
5469   /* figure out the template */
5470   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
5471     result = create_recv_rtp (rtpbin, templ, pad_name);
5472   } else if (templ == gst_element_class_get_pad_template (klass,
5473           "recv_rtcp_sink_%u")) {
5474     result = create_recv_rtcp (rtpbin, templ, pad_name);
5475   } else if (templ == gst_element_class_get_pad_template (klass,
5476           "send_rtp_sink_%u")) {
5477     result = create_send_rtp (rtpbin, templ, pad_name);
5478   } else if (templ == gst_element_class_get_pad_template (klass,
5479           "send_rtcp_src_%u")) {
5480     result = create_send_rtcp (rtpbin, templ, pad_name);
5481   } else if (templ == gst_element_class_get_pad_template (klass,
5482           "recv_fec_sink_%u_%u")) {
5483     result = create_recv_fec (rtpbin, templ, pad_name);
5484   } else
5485     goto wrong_template;
5486
5487   g_free (pad_name);
5488   GST_RTP_BIN_UNLOCK (rtpbin);
5489
5490   return result;
5491
5492   /* ERRORS */
5493 wrong_template:
5494   {
5495     g_free (pad_name);
5496     GST_RTP_BIN_UNLOCK (rtpbin);
5497     g_warning ("rtpbin: this is not our template");
5498     return NULL;
5499   }
5500 }
5501
5502 static void
5503 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
5504 {
5505   GstRtpBinSession *session;
5506   GstRtpBin *rtpbin;
5507
5508   g_return_if_fail (GST_IS_GHOST_PAD (pad));
5509   g_return_if_fail (GST_IS_RTP_BIN (element));
5510
5511   rtpbin = GST_RTP_BIN (element);
5512
5513   GST_RTP_BIN_LOCK (rtpbin);
5514   GST_DEBUG_OBJECT (rtpbin, "Trying to release pad %s:%s",
5515       GST_DEBUG_PAD_NAME (pad));
5516
5517   if (!(session = find_session_by_pad (rtpbin, pad)))
5518     goto unknown_pad;
5519
5520   if (session->recv_rtp_sink_ghost == pad) {
5521     remove_recv_rtp (rtpbin, session);
5522   } else if (session->recv_rtcp_sink_ghost == pad) {
5523     remove_recv_rtcp (rtpbin, session);
5524   } else if (session->send_rtp_sink_ghost == pad) {
5525     remove_send_rtp (rtpbin, session);
5526   } else if (session->send_rtcp_src_ghost == pad) {
5527     remove_rtcp (rtpbin, session);
5528   } else if (pad_is_recv_fec (session, pad)) {
5529     remove_recv_fec_for_pad (rtpbin, session, pad);
5530   }
5531
5532   /* no more request pads, free the complete session */
5533   if (session->recv_rtp_sink_ghost == NULL
5534       && session->recv_rtcp_sink_ghost == NULL
5535       && session->send_rtp_sink_ghost == NULL
5536       && session->send_rtcp_src_ghost == NULL
5537       && session->recv_fec_sink_ghosts == NULL) {
5538     GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session);
5539     rtpbin->sessions = g_slist_remove (rtpbin->sessions, session);
5540     free_session (session, rtpbin);
5541   }
5542   GST_RTP_BIN_UNLOCK (rtpbin);
5543
5544   return;
5545
5546   /* ERROR */
5547 unknown_pad:
5548   {
5549     GST_RTP_BIN_UNLOCK (rtpbin);
5550     g_warning ("rtpbin: %s:%s is not one of our request pads",
5551         GST_DEBUG_PAD_NAME (pad));
5552     return;
5553   }
5554 }