rtpbin: Avoid holding lock GST_RTP_BIN_LOCK when emitting pad-added
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-rtpbin
22  * @title: rtpbin
23  * @see_also: rtpjitterbuffer, rtpsession, rtpptdemux, rtpssrcdemux
24  *
25  * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
26  * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
27  * RTP sessions that will be synchronized together using RTCP SR packets.
28  *
29  * #GstRtpBin is configured with a number of request pads that define the
30  * functionality that is activated, similar to the #GstRtpSession element.
31  *
32  * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_\%u pad. The session
33  * number must be specified in the pad name.
34  * Data received on the recv_rtp_sink_\%u pad will be processed in the #GstRtpSession
35  * manager and after being validated forwarded on #GstRtpSsrcDemux element. Each
36  * RTP stream is demuxed based on the SSRC and send to a #GstRtpJitterBuffer. After
37  * the packets are released from the jitterbuffer, they will be forwarded to a
38  * #GstRtpPtDemux element. The #GstRtpPtDemux element will demux the packets based
39  * on the payload type and will create a unique pad recv_rtp_src_\%u_\%u_\%u on
40  * rtpbin with the session number, SSRC and payload type respectively as the pad
41  * name.
42  *
43  * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_\%u pad. The
44  * session number must be specified in the pad name.
45  *
46  * If you want the session manager to generate and send RTCP packets, request
47  * the send_rtcp_src_\%u pad with the session number in the pad name. Packet pushed
48  * on this pad contain SR/RR RTCP reports that should be sent to all participants
49  * in the session.
50  *
51  * To use #GstRtpBin as a sender, request a send_rtp_sink_\%u pad, which will
52  * automatically create a send_rtp_src_\%u pad. If the session number is not provided,
53  * the pad from the lowest available session will be returned. The session manager will modify the
54  * SSRC in the RTP packets to its own SSRC and will forward the packets on the
55  * send_rtp_src_\%u pad after updating its internal state.
56  *
57  * The session manager needs the clock-rate of the payload types it is handling
58  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
59  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
60  * signal.
61  *
62  * Access to the internal statistics of rtpbin is provided with the
63  * get-internal-session property. This action signal gives access to the
64  * RTPSession object which further provides action signals to retrieve the
65  * internal source and other sources.
66  *
67  * #GstRtpBin also has signals (#GstRtpBin::request-rtp-encoder,
68  * #GstRtpBin::request-rtp-decoder, #GstRtpBin::request-rtcp-encoder and
69  * #GstRtpBin::request-rtp-decoder) to dynamically request for RTP and RTCP encoders
70  * and decoders in order to support SRTP. The encoders must provide the pads
71  * rtp_sink_\%u and rtp_src_\%u for RTP and rtcp_sink_\%u and rtcp_src_\%u for
72  * RTCP. The session number will be used in the pad name. The decoders must provide
73  * rtp_sink and rtp_src for RTP and rtcp_sink and rtcp_src for RTCP. The decoders will
74  * be placed before the #GstRtpSession element, thus they must support SSRC demuxing
75  * internally.
76  *
77  * #GstRtpBin has signals (#GstRtpBin::request-aux-sender and
78  * #GstRtpBin::request-aux-receiver to dynamically request an element that can be
79  * used to create or merge additional RTP streams. AUX elements are needed to
80  * implement FEC or retransmission (such as RFC 4588). An AUX sender must have one
81  * sink_\%u pad that matches the sessionid in the signal and it should have 1 or
82  * more src_\%u pads. For each src_%\u pad, a session will be made (if needed)
83  * and the pad will be linked to the session send_rtp_sink pad. Each session will
84  * then expose its source pad as send_rtp_src_\%u on #GstRtpBin.
85  * An AUX receiver has 1 src_\%u pad that much match the sessionid in the signal
86  * and 1 or more sink_\%u pads. A session will be made for each sink_\%u pad
87  * when the corresponding recv_rtp_sink_\%u pad is requested on #GstRtpBin.
88  * The #GstRtpBin::request-jitterbuffer signal can be used to provide a custom
89  * element to perform arrival time smoothing, reordering and optionally packet
90  * loss detection and retransmission requests.
91  *
92  * ## Example pipelines
93  *
94  * |[
95  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
96  *     rtpbin ! rtptheoradepay ! theoradec ! xvimagesink
97  * ]| Receive RTP data from port 5000 and send to the session 0 in rtpbin.
98  * |[
99  * gst-launch-1.0 rtpbin name=rtpbin \
100  *         v4l2src ! videoconvert ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
101  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
102  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
103  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
104  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
105  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
106  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
107  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
108  * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
109  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
110  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
111  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
112  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
113  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
114  * is received on port 5007. Since RTCP packets from the sender should be sent
115  * as soon as possible and do not participate in preroll, sync=false and
116  * async=false is configured on udpsink
117  * |[
118  * gst-launch-1.0 -v rtpbin name=rtpbin                                          \
119  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
120  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
121  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
122  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
123  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
124  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
125  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
126  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
127  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
128  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
129  * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
130  * decode and display the video.
131  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
132  * decode and play the audio.
133  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
134  * session 1 on port 5003. These packets will be used for session management and
135  * synchronisation.
136  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
137  * on port 5007.
138  *
139  */
140
141 #ifdef HAVE_CONFIG_H
142 #include "config.h"
143 #endif
144 #include <stdio.h>
145 #include <string.h>
146
147 #include <gst/rtp/gstrtpbuffer.h>
148 #include <gst/rtp/gstrtcpbuffer.h>
149
150 #include "gstrtpbin.h"
151 #include "rtpsession.h"
152 #include "gstrtpsession.h"
153 #include "gstrtpjitterbuffer.h"
154 #include "gstrtputils.h"
155
156 #include <gst/glib-compat-private.h>
157
158 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
159 #define GST_CAT_DEFAULT gst_rtp_bin_debug
160
161 /* sink pads */
162 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
163     GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
164     GST_PAD_SINK,
165     GST_PAD_REQUEST,
166     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
167     );
168
169 /**
170  * GstRtpBin!recv_fec_sink_%u_%u:
171  *
172  * Sink template for receiving Forward Error Correction packets,
173  * in the form recv_fec_sink_<session_idx>_<fec_stream_idx>
174  *
175  * See #GstRTPST_2022_1_FecDec for example usage
176  *
177  * Since: 1.20
178  */
179 static GstStaticPadTemplate rtpbin_recv_fec_sink_template =
180 GST_STATIC_PAD_TEMPLATE ("recv_fec_sink_%u_%u",
181     GST_PAD_SINK,
182     GST_PAD_REQUEST,
183     GST_STATIC_CAPS ("application/x-rtp")
184     );
185
186 /**
187  * GstRtpBin!send_fec_src_%u_%u:
188  *
189  * Src template for sending Forward Error Correction packets,
190  * in the form send_fec_src_<session_idx>_<fec_stream_idx>
191  *
192  * See #GstRTPST_2022_1_FecEnc for example usage
193  *
194  * Since: 1.20
195  */
196 static GstStaticPadTemplate rtpbin_send_fec_src_template =
197 GST_STATIC_PAD_TEMPLATE ("send_fec_src_%u_%u",
198     GST_PAD_SRC,
199     GST_PAD_SOMETIMES,
200     GST_STATIC_CAPS ("application/x-rtp")
201     );
202
203 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
204     GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
205     GST_PAD_SINK,
206     GST_PAD_REQUEST,
207     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
208     );
209
210 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
211 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%u",
212     GST_PAD_SINK,
213     GST_PAD_REQUEST,
214     GST_STATIC_CAPS ("application/x-rtp")
215     );
216
217 /* src pads */
218 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
219 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
220     GST_PAD_SRC,
221     GST_PAD_SOMETIMES,
222     GST_STATIC_CAPS ("application/x-rtp")
223     );
224
225 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
226     GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%u",
227     GST_PAD_SRC,
228     GST_PAD_REQUEST,
229     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
230     );
231
232 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
233     GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%u",
234     GST_PAD_SRC,
235     GST_PAD_SOMETIMES,
236     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
237     );
238
239 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock (&(bin)->priv->bin_lock)
240 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock (&(bin)->priv->bin_lock)
241
242 /* lock to protect dynamic callbacks, like pad-added and new ssrc. */
243 #define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock (&(bin)->priv->dyn_lock)
244 #define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock (&(bin)->priv->dyn_lock)
245
246 /* lock for shutdown */
247 #define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
248 G_STMT_START {                                   \
249   if (g_atomic_int_get (&bin->priv->shutdown))   \
250     goto label;                                  \
251   GST_RTP_BIN_DYN_LOCK (bin);                    \
252   if (g_atomic_int_get (&bin->priv->shutdown)) { \
253     GST_RTP_BIN_DYN_UNLOCK (bin);                \
254     goto label;                                  \
255   }                                              \
256 } G_STMT_END
257
258 /* unlock for shutdown */
259 #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
260   GST_RTP_BIN_DYN_UNLOCK (bin);                  \
261
262 /* Minimum time offset to apply. This compensates for rounding errors in NTP to
263  * RTP timestamp conversions */
264 #define MIN_TS_OFFSET_ROUND_OFF_COMP (4 * GST_MSECOND)
265
266 struct _GstRtpBinPrivate
267 {
268   GMutex bin_lock;
269
270   /* lock protecting dynamic adding/removing */
271   GMutex dyn_lock;
272
273   /* if we are shutting down or not */
274   gint shutdown;
275
276   gboolean autoremove;
277
278   /* NTP time in ns of last SR sync used */
279   guint64 last_ntpnstime;
280
281   /* list of extra elements */
282   GList *elements;
283 };
284
285 /* signals and args */
286 enum
287 {
288   SIGNAL_REQUEST_PT_MAP,
289   SIGNAL_PAYLOAD_TYPE_CHANGE,
290   SIGNAL_CLEAR_PT_MAP,
291   SIGNAL_RESET_SYNC,
292   SIGNAL_GET_SESSION,
293   SIGNAL_GET_INTERNAL_SESSION,
294   SIGNAL_GET_STORAGE,
295   SIGNAL_GET_INTERNAL_STORAGE,
296   SIGNAL_CLEAR_SSRC,
297
298   SIGNAL_ON_NEW_SSRC,
299   SIGNAL_ON_SSRC_COLLISION,
300   SIGNAL_ON_SSRC_VALIDATED,
301   SIGNAL_ON_SSRC_ACTIVE,
302   SIGNAL_ON_SSRC_SDES,
303   SIGNAL_ON_BYE_SSRC,
304   SIGNAL_ON_BYE_TIMEOUT,
305   SIGNAL_ON_TIMEOUT,
306   SIGNAL_ON_SENDER_TIMEOUT,
307   SIGNAL_ON_NPT_STOP,
308
309   SIGNAL_REQUEST_RTP_ENCODER,
310   SIGNAL_REQUEST_RTP_DECODER,
311   SIGNAL_REQUEST_RTCP_ENCODER,
312   SIGNAL_REQUEST_RTCP_DECODER,
313
314   SIGNAL_REQUEST_FEC_DECODER,
315   SIGNAL_REQUEST_FEC_DECODER_FULL,
316   SIGNAL_REQUEST_FEC_ENCODER,
317
318   SIGNAL_REQUEST_JITTERBUFFER,
319
320   SIGNAL_NEW_JITTERBUFFER,
321   SIGNAL_NEW_STORAGE,
322
323   SIGNAL_REQUEST_AUX_SENDER,
324   SIGNAL_REQUEST_AUX_RECEIVER,
325
326   SIGNAL_ON_NEW_SENDER_SSRC,
327   SIGNAL_ON_SENDER_SSRC_ACTIVE,
328
329   SIGNAL_ON_BUNDLED_SSRC,
330
331   LAST_SIGNAL
332 };
333
334 #define DEFAULT_LATENCY_MS           200
335 #define DEFAULT_DROP_ON_LATENCY      FALSE
336 #define DEFAULT_SDES                 NULL
337 #define DEFAULT_DO_LOST              FALSE
338 #define DEFAULT_IGNORE_PT            FALSE
339 #define DEFAULT_NTP_SYNC             FALSE
340 #define DEFAULT_AUTOREMOVE           FALSE
341 #define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
342 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
343 #define DEFAULT_RTCP_SYNC            GST_RTP_BIN_RTCP_SYNC_ALWAYS
344 #define DEFAULT_RTCP_SYNC_INTERVAL   0
345 #define DEFAULT_DO_SYNC_EVENT        FALSE
346 #define DEFAULT_DO_RETRANSMISSION    FALSE
347 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
348 #define DEFAULT_NTP_TIME_SOURCE      GST_RTP_NTP_TIME_SOURCE_NTP
349 #define DEFAULT_RTCP_SYNC_SEND_TIME  TRUE
350 #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
351 #define DEFAULT_MAX_DROPOUT_TIME     60000
352 #define DEFAULT_MAX_MISORDER_TIME    2000
353 #define DEFAULT_RFC7273_SYNC         FALSE
354 #define DEFAULT_ADD_REFERENCE_TIMESTAMP_META FALSE
355 #define DEFAULT_MAX_STREAMS          G_MAXUINT
356 #define DEFAULT_MAX_TS_OFFSET_ADJUSTMENT G_GUINT64_CONSTANT(0)
357 #define DEFAULT_MAX_TS_OFFSET        G_GINT64_CONSTANT(3000000000)
358 #define DEFAULT_MIN_TS_OFFSET        MIN_TS_OFFSET_ROUND_OFF_COMP
359 #define DEFAULT_TS_OFFSET_SMOOTHING_FACTOR  0
360
361 enum
362 {
363   PROP_0,
364   PROP_LATENCY,
365   PROP_DROP_ON_LATENCY,
366   PROP_SDES,
367   PROP_DO_LOST,
368   PROP_IGNORE_PT,
369   PROP_NTP_SYNC,
370   PROP_RTCP_SYNC,
371   PROP_RTCP_SYNC_INTERVAL,
372   PROP_AUTOREMOVE,
373   PROP_BUFFER_MODE,
374   PROP_USE_PIPELINE_CLOCK,
375   PROP_DO_SYNC_EVENT,
376   PROP_DO_RETRANSMISSION,
377   PROP_RTP_PROFILE,
378   PROP_NTP_TIME_SOURCE,
379   PROP_RTCP_SYNC_SEND_TIME,
380   PROP_MAX_RTCP_RTP_TIME_DIFF,
381   PROP_MAX_DROPOUT_TIME,
382   PROP_MAX_MISORDER_TIME,
383   PROP_RFC7273_SYNC,
384   PROP_ADD_REFERENCE_TIMESTAMP_META,
385   PROP_MAX_STREAMS,
386   PROP_MAX_TS_OFFSET_ADJUSTMENT,
387   PROP_MAX_TS_OFFSET,
388   PROP_MIN_TS_OFFSET,
389   PROP_TS_OFFSET_SMOOTHING_FACTOR,
390   PROP_FEC_DECODERS,
391   PROP_FEC_ENCODERS,
392 };
393
394 #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
395 static GType
396 gst_rtp_bin_rtcp_sync_get_type (void)
397 {
398   static GType rtcp_sync_type = 0;
399   static const GEnumValue rtcp_sync_types[] = {
400     {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
401     {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
402     {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
403     {0, NULL, NULL},
404   };
405
406   if (!rtcp_sync_type) {
407     rtcp_sync_type = g_enum_register_static ("GstRTCPSync", rtcp_sync_types);
408   }
409   return rtcp_sync_type;
410 }
411
412 /* helper objects */
413 typedef struct _GstRtpBinSession GstRtpBinSession;
414 typedef struct _GstRtpBinStream GstRtpBinStream;
415 typedef struct _GstRtpBinClient GstRtpBinClient;
416
417 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
418
419 static GstCaps *pt_map_requested (GstElement * element, guint pt,
420     GstRtpBinSession * session);
421 static void payload_type_change (GstElement * element, guint pt,
422     GstRtpBinSession * session);
423 static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
424 static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
425 static void remove_recv_fec (GstRtpBin * rtpbin, GstRtpBinSession * session);
426 static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
427 static void remove_send_fec (GstRtpBin * rtpbin, GstRtpBinSession * session);
428 static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
429 static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
430 static void free_stream (GstRtpBinStream * stream, GstRtpBin * bin);
431 static GstRtpBinSession *create_session (GstRtpBin * rtpbin, gint id);
432 static GstPad *complete_session_sink (GstRtpBin * rtpbin,
433     GstRtpBinSession * session);
434 static void
435 complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session,
436     guint sessid);
437 static GstPad *complete_session_rtcp (GstRtpBin * rtpbin,
438     GstRtpBinSession * session, guint sessid);
439 static GstElement *session_request_element (GstRtpBinSession * session,
440     guint signal);
441
442 /* Manages the RTP stream for one SSRC.
443  *
444  * We pipe the stream (coming from the SSRC demuxer) into a jitterbuffer.
445  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
446  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
447  * together (see below).
448  */
449 struct _GstRtpBinStream
450 {
451   /* the SSRC of this stream */
452   guint32 ssrc;
453
454   /* parent bin */
455   GstRtpBin *bin;
456
457   /* the session this SSRC belongs to */
458   GstRtpBinSession *session;
459
460   /* the jitterbuffer of the SSRC */
461   GstElement *buffer;
462   gulong buffer_handlesync_sig;
463   gulong buffer_ptreq_sig;
464   gulong buffer_ntpstop_sig;
465   gint percent;
466
467   /* the PT demuxer of the SSRC */
468   GstElement *demux;
469   gulong demux_newpad_sig;
470   gulong demux_padremoved_sig;
471   gulong demux_ptreq_sig;
472   gulong demux_ptchange_sig;
473
474   /* if we have calculated a valid rt_delta for this stream */
475   gboolean have_sync;
476   /* mapping to local RTP and NTP time */
477   gint64 rt_delta;
478   gint64 rtp_delta;
479   gint64 avg_ts_offset;
480   gboolean is_initialized;
481   /* base rtptime in gst time */
482   gint64 clock_base;
483 };
484
485 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->lock)
486 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->lock)
487
488 /* Manages the receiving end of the packets.
489  *
490  * There is one such structure for each RTP session (audio/video/...).
491  * We get the RTP/RTCP packets and stuff them into the session manager. From
492  * there they are pushed into an SSRC demuxer that splits the stream based on
493  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
494  * the GstRtpBinStream above).
495  *
496  * Before the SSRC demuxer, a storage element may be inserted for the purpose
497  * of Forward Error Correction.
498  */
499 struct _GstRtpBinSession
500 {
501   /* session id */
502   gint id;
503   /* the parent bin */
504   GstRtpBin *bin;
505   /* the session element */
506   GstElement *session;
507   /* the SSRC demuxer */
508   GstElement *demux;
509   gulong demux_newpad_sig;
510   gulong demux_padremoved_sig;
511
512   /* Fec support */
513   GstElement *storage;
514
515   GMutex lock;
516
517   /* list of GstRtpBinStream */
518   GSList *streams;
519
520   /* list of elements */
521   GSList *elements;
522
523   /* mapping of payload type to caps */
524   GHashTable *ptmap;
525
526   /* the pads of the session */
527   GstPad *recv_rtp_sink;
528   GstPad *recv_rtp_sink_ghost;
529   GstPad *recv_rtp_src;
530   GstPad *recv_rtcp_sink;
531   GstPad *recv_rtcp_sink_ghost;
532   GstPad *sync_src;
533   GstPad *send_rtp_sink;
534   GstPad *send_rtp_sink_ghost;
535   GstPad *send_rtp_src_ghost;
536   GstPad *send_rtcp_src;
537   GstPad *send_rtcp_src_ghost;
538
539   GSList *recv_fec_sinks;
540   GSList *recv_fec_sink_ghosts;
541   /* fec decoder placed before the rtpjitterbuffer but after the rtpssrcdemux.
542    * XXX: This does not yet support multiple ssrc's in the same rtp session
543    */
544   GstElement *early_fec_decoder;
545
546   GSList *send_fec_src_ghosts;
547 };
548
549 /* Manages the RTP streams that come from one client and should therefore be
550  * synchronized.
551  */
552 struct _GstRtpBinClient
553 {
554   /* the common CNAME for the streams */
555   gchar *cname;
556   guint cname_len;
557
558   /* the streams */
559   guint nstreams;
560   GSList *streams;
561 };
562
563 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
564 static GstRtpBinSession *
565 find_session_by_id (GstRtpBin * rtpbin, gint id)
566 {
567   GSList *walk;
568
569   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
570     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
571
572     if (sess->id == id)
573       return sess;
574   }
575   return NULL;
576 }
577
578 static gboolean
579 pad_is_recv_fec (GstRtpBinSession * session, GstPad * pad)
580 {
581   return g_slist_find (session->recv_fec_sink_ghosts, pad) != NULL;
582 }
583
584 /* find a session with the given request pad. Must be called with RTP_BIN_LOCK */
585 static GstRtpBinSession *
586 find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
587 {
588   GSList *walk;
589
590   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
591     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
592
593     if ((sess->recv_rtp_sink_ghost == pad) ||
594         (sess->recv_rtcp_sink_ghost == pad) ||
595         (sess->send_rtp_sink_ghost == pad) ||
596         (sess->send_rtcp_src_ghost == pad) || pad_is_recv_fec (sess, pad))
597       return sess;
598   }
599   return NULL;
600 }
601
602 static void
603 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
604 {
605   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
606       sess->id, ssrc);
607 }
608
609 static void
610 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
611 {
612   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
613       sess->id, ssrc);
614 }
615
616 static void
617 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
618 {
619   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
620       sess->id, ssrc);
621 }
622
623 static void
624 on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
625 {
626   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
627       sess->id, ssrc);
628 }
629
630 static void
631 on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
632 {
633   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
634       sess->id, ssrc);
635 }
636
637 static void
638 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
639 {
640   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
641       sess->id, ssrc);
642 }
643
644 static void
645 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
646 {
647   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
648       sess->id, ssrc);
649
650   if (sess->bin->priv->autoremove)
651     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
652 }
653
654 static void
655 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
656 {
657   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
658       sess->id, ssrc);
659
660   if (sess->bin->priv->autoremove)
661     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
662 }
663
664 static void
665 on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
666 {
667   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
668       sess->id, ssrc);
669 }
670
671 static void
672 on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
673 {
674   g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0,
675       stream->session->id, stream->ssrc);
676 }
677
678 static void
679 on_new_sender_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
680 {
681   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
682       sess->id, ssrc);
683 }
684
685 static void
686 on_sender_ssrc_active (GstElement * session, guint32 ssrc,
687     GstRtpBinSession * sess)
688 {
689   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE],
690       0, sess->id, ssrc);
691 }
692
693 /* must be called with the SESSION lock */
694 static GstRtpBinStream *
695 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
696 {
697   GSList *walk;
698
699   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
700     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
701
702     if (stream->ssrc == ssrc)
703       return stream;
704   }
705   return NULL;
706 }
707
708 static void
709 ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad,
710     GstRtpBinSession * session)
711 {
712   GstRtpBinStream *stream = NULL;
713   GstRtpBin *rtpbin;
714
715   rtpbin = session->bin;
716
717   GST_RTP_BIN_LOCK (rtpbin);
718
719   GST_RTP_SESSION_LOCK (session);
720   if ((stream = find_stream_by_ssrc (session, ssrc)))
721     session->streams = g_slist_remove (session->streams, stream);
722   GST_RTP_SESSION_UNLOCK (session);
723
724   if (stream)
725     free_stream (stream, rtpbin);
726
727   GST_RTP_BIN_UNLOCK (rtpbin);
728 }
729
730 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
731 static GstRtpBinSession *
732 create_session (GstRtpBin * rtpbin, gint id)
733 {
734   GstRtpBinSession *sess;
735   GstElement *session, *demux;
736   GstElement *storage = NULL;
737   GstState target;
738
739   if (!(session = gst_element_factory_make ("rtpsession", NULL)))
740     goto no_session;
741
742   if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
743     goto no_demux;
744
745   if (!(storage = gst_element_factory_make ("rtpstorage", NULL)))
746     goto no_storage;
747
748   /* need to sink the storage or otherwise signal handlers from bindings will
749    * take ownership of it and we don't own it anymore */
750   gst_object_ref_sink (storage);
751   g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_STORAGE], 0, storage,
752       id);
753
754   sess = g_new0 (GstRtpBinSession, 1);
755   g_mutex_init (&sess->lock);
756   sess->id = id;
757   sess->bin = rtpbin;
758   sess->session = session;
759   sess->demux = demux;
760   sess->storage = storage;
761
762   sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
763       (GDestroyNotify) gst_caps_unref);
764   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
765
766   /* configure SDES items */
767   GST_OBJECT_LOCK (rtpbin);
768   g_object_set (demux, "max-streams", rtpbin->max_streams, NULL);
769   g_object_set (session, "sdes", rtpbin->sdes, "rtp-profile",
770       rtpbin->rtp_profile, "rtcp-sync-send-time", rtpbin->rtcp_sync_send_time,
771       NULL);
772   if (rtpbin->use_pipeline_clock)
773     g_object_set (session, "use-pipeline-clock", rtpbin->use_pipeline_clock,
774         NULL);
775   else
776     g_object_set (session, "ntp-time-source", rtpbin->ntp_time_source, NULL);
777
778   g_object_set (session, "max-dropout-time", rtpbin->max_dropout_time,
779       "max-misorder-time", rtpbin->max_misorder_time, NULL);
780   GST_OBJECT_UNLOCK (rtpbin);
781
782   /* provide clock_rate to the session manager when needed */
783   g_signal_connect (session, "request-pt-map",
784       (GCallback) pt_map_requested, sess);
785
786   g_signal_connect (sess->session, "on-new-ssrc",
787       (GCallback) on_new_ssrc, sess);
788   g_signal_connect (sess->session, "on-ssrc-collision",
789       (GCallback) on_ssrc_collision, sess);
790   g_signal_connect (sess->session, "on-ssrc-validated",
791       (GCallback) on_ssrc_validated, sess);
792   g_signal_connect (sess->session, "on-ssrc-active",
793       (GCallback) on_ssrc_active, sess);
794   g_signal_connect (sess->session, "on-ssrc-sdes",
795       (GCallback) on_ssrc_sdes, sess);
796   g_signal_connect (sess->session, "on-bye-ssrc",
797       (GCallback) on_bye_ssrc, sess);
798   g_signal_connect (sess->session, "on-bye-timeout",
799       (GCallback) on_bye_timeout, sess);
800   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
801   g_signal_connect (sess->session, "on-sender-timeout",
802       (GCallback) on_sender_timeout, sess);
803   g_signal_connect (sess->session, "on-new-sender-ssrc",
804       (GCallback) on_new_sender_ssrc, sess);
805   g_signal_connect (sess->session, "on-sender-ssrc-active",
806       (GCallback) on_sender_ssrc_active, sess);
807
808   gst_bin_add (GST_BIN_CAST (rtpbin), session);
809   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
810   gst_bin_add (GST_BIN_CAST (rtpbin), storage);
811
812   /* unref the storage again, the bin has a reference now and
813    * we don't need it anymore */
814   gst_object_unref (storage);
815
816   GST_OBJECT_LOCK (rtpbin);
817   target = GST_STATE_TARGET (rtpbin);
818   GST_OBJECT_UNLOCK (rtpbin);
819
820   /* change state only to what's needed */
821   gst_element_set_state (demux, target);
822   gst_element_set_state (session, target);
823   gst_element_set_state (storage, target);
824
825   return sess;
826
827   /* ERRORS */
828 no_session:
829   {
830     g_warning ("rtpbin: could not create rtpsession element");
831     return NULL;
832   }
833 no_demux:
834   {
835     gst_object_unref (session);
836     g_warning ("rtpbin: could not create rtpssrcdemux element");
837     return NULL;
838   }
839 no_storage:
840   {
841     gst_object_unref (session);
842     gst_object_unref (demux);
843     g_warning ("rtpbin: could not create rtpstorage element");
844     return NULL;
845   }
846 }
847
848 static gboolean
849 bin_manage_element (GstRtpBin * bin, GstElement * element)
850 {
851   GstRtpBinPrivate *priv = bin->priv;
852
853   if (g_list_find (priv->elements, element)) {
854     GST_DEBUG_OBJECT (bin, "requested element %p already in bin", element);
855   } else {
856     GST_DEBUG_OBJECT (bin, "adding requested element %p", element);
857
858     if (g_object_is_floating (element))
859       element = gst_object_ref_sink (element);
860
861     if (!gst_bin_add (GST_BIN_CAST (bin), element))
862       goto add_failed;
863     if (!gst_element_sync_state_with_parent (element))
864       GST_WARNING_OBJECT (bin, "unable to sync element state with rtpbin");
865   }
866   /* we add the element multiple times, each we need an equal number of
867    * removes to really remove the element from the bin */
868   priv->elements = g_list_prepend (priv->elements, element);
869
870   return TRUE;
871
872   /* ERRORS */
873 add_failed:
874   {
875     GST_WARNING_OBJECT (bin, "unable to add element");
876     gst_object_unref (element);
877     return FALSE;
878   }
879 }
880
881 static void
882 remove_bin_element (GstElement * element, GstRtpBin * bin)
883 {
884   GstRtpBinPrivate *priv = bin->priv;
885   GList *find;
886
887   find = g_list_find (priv->elements, element);
888   if (find) {
889     priv->elements = g_list_delete_link (priv->elements, find);
890
891     if (!g_list_find (priv->elements, element)) {
892       gst_element_set_locked_state (element, TRUE);
893       gst_bin_remove (GST_BIN_CAST (bin), element);
894       gst_element_set_state (element, GST_STATE_NULL);
895     }
896
897     gst_object_unref (element);
898   }
899 }
900
901 /* called with RTP_BIN_LOCK */
902 static void
903 free_session (GstRtpBinSession * sess, GstRtpBin * bin)
904 {
905   GST_DEBUG_OBJECT (bin, "freeing session %p", sess);
906
907   gst_element_set_locked_state (sess->demux, TRUE);
908   gst_element_set_locked_state (sess->session, TRUE);
909   gst_element_set_locked_state (sess->storage, TRUE);
910
911   gst_element_set_state (sess->demux, GST_STATE_NULL);
912   gst_element_set_state (sess->session, GST_STATE_NULL);
913   gst_element_set_state (sess->storage, GST_STATE_NULL);
914
915   remove_recv_rtp (bin, sess);
916   remove_recv_rtcp (bin, sess);
917   remove_recv_fec (bin, sess);
918   remove_send_rtp (bin, sess);
919   remove_send_fec (bin, sess);
920   remove_rtcp (bin, sess);
921
922   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
923   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
924   gst_bin_remove (GST_BIN_CAST (bin), sess->storage);
925
926   g_slist_foreach (sess->elements, (GFunc) remove_bin_element, bin);
927   g_slist_free (sess->elements);
928   sess->elements = NULL;
929
930   g_slist_foreach (sess->streams, (GFunc) free_stream, bin);
931   g_slist_free (sess->streams);
932
933   g_mutex_clear (&sess->lock);
934   g_hash_table_destroy (sess->ptmap);
935
936   g_free (sess);
937 }
938
939 /* get the payload type caps for the specific payload @pt in @session */
940 static GstCaps *
941 get_pt_map (GstRtpBinSession * session, guint pt)
942 {
943   GstCaps *caps = NULL;
944   GstRtpBin *bin;
945   GValue ret = { 0 };
946   GValue args[3] = { {0}, {0}, {0} };
947
948   GST_DEBUG ("searching pt %u in cache", pt);
949
950   GST_RTP_SESSION_LOCK (session);
951
952   /* first look in the cache */
953   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
954   if (caps) {
955     gst_caps_ref (caps);
956     goto done;
957   }
958
959   bin = session->bin;
960
961   GST_DEBUG ("emitting signal for pt %u in session %u", pt, session->id);
962
963   /* not in cache, send signal to request caps */
964   g_value_init (&args[0], GST_TYPE_ELEMENT);
965   g_value_set_object (&args[0], bin);
966   g_value_init (&args[1], G_TYPE_UINT);
967   g_value_set_uint (&args[1], session->id);
968   g_value_init (&args[2], G_TYPE_UINT);
969   g_value_set_uint (&args[2], pt);
970
971   g_value_init (&ret, GST_TYPE_CAPS);
972   g_value_set_boxed (&ret, NULL);
973
974   GST_RTP_SESSION_UNLOCK (session);
975
976   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
977
978   GST_RTP_SESSION_LOCK (session);
979
980   g_value_unset (&args[0]);
981   g_value_unset (&args[1]);
982   g_value_unset (&args[2]);
983
984   /* look in the cache again because we let the lock go */
985   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
986   if (caps) {
987     gst_caps_ref (caps);
988     g_value_unset (&ret);
989     goto done;
990   }
991
992   caps = (GstCaps *) g_value_dup_boxed (&ret);
993   g_value_unset (&ret);
994   if (!caps)
995     goto no_caps;
996
997   GST_DEBUG ("caching pt %u as %" GST_PTR_FORMAT, pt, caps);
998
999   /* store in cache, take additional ref */
1000   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
1001       gst_caps_ref (caps));
1002
1003 done:
1004   GST_RTP_SESSION_UNLOCK (session);
1005
1006   return caps;
1007
1008   /* ERRORS */
1009 no_caps:
1010   {
1011     GST_RTP_SESSION_UNLOCK (session);
1012     GST_DEBUG ("no pt map could be obtained");
1013     return NULL;
1014   }
1015 }
1016
1017 static gboolean
1018 return_true (gpointer key, gpointer value, gpointer user_data)
1019 {
1020   return TRUE;
1021 }
1022
1023 static void
1024 gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
1025 {
1026   GSList *clients, *streams;
1027
1028   GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");
1029
1030   GST_RTP_BIN_LOCK (rtpbin);
1031   for (clients = rtpbin->clients; clients; clients = g_slist_next (clients)) {
1032     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
1033
1034     /* reset sync on all streams for this client */
1035     for (streams = client->streams; streams; streams = g_slist_next (streams)) {
1036       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1037
1038       /* make use require a new SR packet for this stream before we attempt new
1039        * lip-sync */
1040       stream->have_sync = FALSE;
1041       stream->rt_delta = 0;
1042       stream->avg_ts_offset = 0;
1043       stream->is_initialized = FALSE;
1044       stream->rtp_delta = 0;
1045       stream->clock_base = -100 * GST_SECOND;
1046     }
1047   }
1048   GST_RTP_BIN_UNLOCK (rtpbin);
1049 }
1050
1051 static void
1052 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
1053 {
1054   GSList *sessions, *streams;
1055
1056   GST_RTP_BIN_LOCK (bin);
1057   GST_DEBUG_OBJECT (bin, "clearing pt map");
1058   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1059     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
1060
1061     GST_DEBUG_OBJECT (bin, "clearing session %p", session);
1062     g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
1063
1064     GST_RTP_SESSION_LOCK (session);
1065     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
1066
1067     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
1068       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1069
1070       GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
1071       if (g_signal_lookup ("clear-pt-map", G_OBJECT_TYPE (stream->buffer)) != 0)
1072         g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
1073       if (stream->demux)
1074         g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
1075     }
1076     GST_RTP_SESSION_UNLOCK (session);
1077   }
1078   GST_RTP_BIN_UNLOCK (bin);
1079
1080   /* reset sync too */
1081   gst_rtp_bin_reset_sync (bin);
1082 }
1083
1084 static GstElement *
1085 gst_rtp_bin_get_session (GstRtpBin * bin, guint session_id)
1086 {
1087   GstRtpBinSession *session;
1088   GstElement *ret = NULL;
1089
1090   GST_RTP_BIN_LOCK (bin);
1091   GST_DEBUG_OBJECT (bin, "retrieving GstRtpSession, index: %u", session_id);
1092   session = find_session_by_id (bin, (gint) session_id);
1093   if (session) {
1094     ret = gst_object_ref (session->session);
1095   }
1096   GST_RTP_BIN_UNLOCK (bin);
1097
1098   return ret;
1099 }
1100
1101 static RTPSession *
1102 gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
1103 {
1104   RTPSession *internal_session = NULL;
1105   GstRtpBinSession *session;
1106
1107   GST_RTP_BIN_LOCK (bin);
1108   GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %u",
1109       session_id);
1110   session = find_session_by_id (bin, (gint) session_id);
1111   if (session) {
1112     g_object_get (session->session, "internal-session", &internal_session,
1113         NULL);
1114   }
1115   GST_RTP_BIN_UNLOCK (bin);
1116
1117   return internal_session;
1118 }
1119
1120 static GstElement *
1121 gst_rtp_bin_get_storage (GstRtpBin * bin, guint session_id)
1122 {
1123   GstRtpBinSession *session;
1124   GstElement *res = NULL;
1125
1126   GST_RTP_BIN_LOCK (bin);
1127   GST_DEBUG_OBJECT (bin, "retrieving internal storage object, index: %u",
1128       session_id);
1129   session = find_session_by_id (bin, (gint) session_id);
1130   if (session && session->storage) {
1131     res = gst_object_ref (session->storage);
1132   }
1133   GST_RTP_BIN_UNLOCK (bin);
1134
1135   return res;
1136 }
1137
1138 static GObject *
1139 gst_rtp_bin_get_internal_storage (GstRtpBin * bin, guint session_id)
1140 {
1141   GObject *internal_storage = NULL;
1142   GstRtpBinSession *session;
1143
1144   GST_RTP_BIN_LOCK (bin);
1145   GST_DEBUG_OBJECT (bin, "retrieving internal storage object, index: %u",
1146       session_id);
1147   session = find_session_by_id (bin, (gint) session_id);
1148   if (session && session->storage) {
1149     g_object_get (session->storage, "internal-storage", &internal_storage,
1150         NULL);
1151   }
1152   GST_RTP_BIN_UNLOCK (bin);
1153
1154   return internal_storage;
1155 }
1156
1157 static void
1158 gst_rtp_bin_clear_ssrc (GstRtpBin * bin, guint session_id, guint32 ssrc)
1159 {
1160   GstRtpBinSession *session;
1161   GstElement *demux = NULL;
1162
1163   GST_RTP_BIN_LOCK (bin);
1164   GST_DEBUG_OBJECT (bin, "clearing ssrc %u for session %u", ssrc, session_id);
1165   session = find_session_by_id (bin, (gint) session_id);
1166   if (session)
1167     demux = gst_object_ref (session->demux);
1168   GST_RTP_BIN_UNLOCK (bin);
1169
1170   if (demux) {
1171     g_signal_emit_by_name (demux, "clear-ssrc", ssrc, NULL);
1172     gst_object_unref (demux);
1173   }
1174 }
1175
1176 static GstElement *
1177 gst_rtp_bin_request_encoder (GstRtpBin * bin, guint session_id)
1178 {
1179   GST_DEBUG_OBJECT (bin, "return NULL encoder");
1180   return NULL;
1181 }
1182
1183 static GstElement *
1184 gst_rtp_bin_request_decoder (GstRtpBin * bin, guint session_id)
1185 {
1186   GST_DEBUG_OBJECT (bin, "return NULL decoder");
1187   return NULL;
1188 }
1189
1190 static GstElement *
1191 gst_rtp_bin_request_jitterbuffer (GstRtpBin * bin, guint session_id)
1192 {
1193   return gst_element_factory_make ("rtpjitterbuffer", NULL);
1194 }
1195
1196 static void
1197 gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
1198     const gchar * name, const GValue * value)
1199 {
1200   GSList *sessions, *streams;
1201
1202   GST_RTP_BIN_LOCK (bin);
1203   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1204     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
1205
1206     GST_RTP_SESSION_LOCK (session);
1207     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
1208       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1209       GObjectClass *jb_class;
1210
1211       jb_class = G_OBJECT_GET_CLASS (G_OBJECT (stream->buffer));
1212       if (g_object_class_find_property (jb_class, name))
1213         g_object_set_property (G_OBJECT (stream->buffer), name, value);
1214       else
1215         GST_WARNING_OBJECT (bin,
1216             "Stream jitterbuffer does not expose property %s", name);
1217     }
1218     GST_RTP_SESSION_UNLOCK (session);
1219   }
1220   GST_RTP_BIN_UNLOCK (bin);
1221 }
1222
1223 static void
1224 gst_rtp_bin_propagate_property_to_session (GstRtpBin * bin,
1225     const gchar * name, const GValue * value)
1226 {
1227   GSList *sessions;
1228
1229   GST_RTP_BIN_LOCK (bin);
1230   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1231     GstRtpBinSession *sess = (GstRtpBinSession *) sessions->data;
1232
1233     g_object_set_property (G_OBJECT (sess->session), name, value);
1234   }
1235   GST_RTP_BIN_UNLOCK (bin);
1236 }
1237
1238 /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
1239 static GstRtpBinClient *
1240 get_client (GstRtpBin * bin, guint8 len, const guint8 * data,
1241     gboolean * created)
1242 {
1243   GstRtpBinClient *result = NULL;
1244   GSList *walk;
1245
1246   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
1247     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
1248
1249     if (len != client->cname_len)
1250       continue;
1251
1252     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
1253       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
1254           client->cname);
1255       result = client;
1256       break;
1257     }
1258   }
1259
1260   /* nothing found, create one */
1261   if (result == NULL) {
1262     result = g_new0 (GstRtpBinClient, 1);
1263     result->cname = g_strndup ((gchar *) data, len);
1264     result->cname_len = len;
1265     bin->clients = g_slist_prepend (bin->clients, result);
1266     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
1267         result->cname);
1268   }
1269   return result;
1270 }
1271
1272 static void
1273 free_client (GstRtpBinClient * client, GstRtpBin * bin)
1274 {
1275   GST_DEBUG_OBJECT (bin, "freeing client %p", client);
1276   g_slist_free (client->streams);
1277   g_free (client->cname);
1278   g_free (client);
1279 }
1280
1281 static void
1282 get_current_times (GstRtpBin * bin, GstClockTime * running_time,
1283     guint64 * ntpnstime)
1284 {
1285   guint64 ntpns = -1;
1286   GstClock *clock;
1287   GstClockTime base_time, rt, clock_time;
1288
1289   GST_OBJECT_LOCK (bin);
1290   if ((clock = GST_ELEMENT_CLOCK (bin))) {
1291     base_time = GST_ELEMENT_CAST (bin)->base_time;
1292     gst_object_ref (clock);
1293     GST_OBJECT_UNLOCK (bin);
1294
1295     /* get current clock time and convert to running time */
1296     clock_time = gst_clock_get_time (clock);
1297     rt = clock_time - base_time;
1298
1299     if (bin->use_pipeline_clock) {
1300       ntpns = rt;
1301       /* add constant to convert from 1970 based time to 1900 based time */
1302       ntpns += (GST_RTP_NTP_UNIX_OFFSET * GST_SECOND);
1303     } else {
1304       switch (bin->ntp_time_source) {
1305         case GST_RTP_NTP_TIME_SOURCE_NTP:
1306         case GST_RTP_NTP_TIME_SOURCE_UNIX:{
1307           /* get current NTP time */
1308           ntpns = g_get_real_time () * GST_USECOND;
1309
1310           /* add constant to convert from 1970 based time to 1900 based time */
1311           if (bin->ntp_time_source == GST_RTP_NTP_TIME_SOURCE_NTP)
1312             ntpns += (GST_RTP_NTP_UNIX_OFFSET * GST_SECOND);
1313           break;
1314         }
1315         case GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME:
1316           ntpns = rt;
1317           break;
1318         case GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME:
1319           ntpns = clock_time;
1320           break;
1321         default:
1322           ntpns = -1;           /* Fix uninited compiler warning */
1323           g_assert_not_reached ();
1324           break;
1325       }
1326     }
1327
1328     gst_object_unref (clock);
1329   } else {
1330     GST_OBJECT_UNLOCK (bin);
1331     rt = -1;
1332     ntpns = -1;
1333   }
1334   if (running_time)
1335     *running_time = rt;
1336   if (ntpnstime)
1337     *ntpnstime = ntpns;
1338 }
1339
1340 static void
1341 stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
1342     gint64 ts_offset, gint64 max_ts_offset, guint64 min_ts_offset,
1343     gboolean allow_positive_ts_offset)
1344 {
1345   gint64 prev_ts_offset;
1346   GObjectClass *jb_class;
1347
1348   jb_class = G_OBJECT_GET_CLASS (G_OBJECT (stream->buffer));
1349
1350   if (!g_object_class_find_property (jb_class, "ts-offset")) {
1351     GST_LOG_OBJECT (bin,
1352         "stream's jitterbuffer does not expose ts-offset property");
1353     return;
1354   }
1355
1356   if (bin->ts_offset_smoothing_factor > 0) {
1357     if (!stream->is_initialized) {
1358       stream->avg_ts_offset = ts_offset;
1359       stream->is_initialized = TRUE;
1360     } else {
1361       /* RMA algorithm using smoothing factor is following, but split into
1362        * parts to check for overflows:
1363        * stream->avg_ts_offset =
1364        *   ((bin->ts_offset_smoothing_factor - 1) * stream->avg_ts_offset
1365        *    + ts_offset) / bin->ts_offset_smoothing_factor
1366        */
1367       guint64 max_possible_smoothing_factor = G_MAXUINT64;
1368       gint64 cur_avg_product =
1369           (bin->ts_offset_smoothing_factor - 1) * stream->avg_ts_offset;
1370       if (stream->avg_ts_offset != 0)
1371         max_possible_smoothing_factor =
1372             G_MAXINT64 / ABS (stream->avg_ts_offset);
1373
1374       if ((max_possible_smoothing_factor < bin->ts_offset_smoothing_factor) ||
1375           (cur_avg_product > 0 && G_MAXINT64 - cur_avg_product < ts_offset) ||
1376           (cur_avg_product < 0 && G_MININT64 - cur_avg_product > ts_offset)) {
1377         GST_WARNING_OBJECT (bin,
1378             "ts-offset-smoothing-factor calculation overflow, fallback to using ts-offset directly");
1379         stream->avg_ts_offset = ts_offset;
1380       } else {
1381         stream->avg_ts_offset =
1382             (cur_avg_product + ts_offset) / bin->ts_offset_smoothing_factor;
1383       }
1384     }
1385   } else {
1386     stream->avg_ts_offset = ts_offset;
1387   }
1388
1389   g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
1390
1391   /* delta changed, see how much */
1392   if (prev_ts_offset != stream->avg_ts_offset) {
1393     gint64 diff;
1394
1395     diff = prev_ts_offset - stream->avg_ts_offset;
1396
1397     GST_DEBUG_OBJECT (bin,
1398         "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
1399         ", diff: %" G_GINT64_FORMAT, stream->avg_ts_offset, prev_ts_offset,
1400         diff);
1401
1402     /* ignore minor offsets */
1403     if (ABS (diff) < min_ts_offset) {
1404       GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
1405       return;
1406     }
1407
1408     /* sanity check offset */
1409     if (max_ts_offset > 0) {
1410       if (stream->avg_ts_offset > 0 && !allow_positive_ts_offset) {
1411         GST_DEBUG_OBJECT (bin,
1412             "offset is positive (clocks are out of sync), ignoring");
1413         return;
1414       }
1415       if (ABS (stream->avg_ts_offset) > max_ts_offset) {
1416         GST_DEBUG_OBJECT (bin, "offset too large, ignoring");
1417         return;
1418       }
1419     }
1420
1421     g_object_set (stream->buffer, "ts-offset", stream->avg_ts_offset, NULL);
1422   }
1423   GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
1424       stream->ssrc, stream->avg_ts_offset);
1425 }
1426
1427 static void
1428 gst_rtp_bin_send_sync_event (GstRtpBinStream * stream)
1429 {
1430   if (stream->bin->send_sync_event) {
1431     GstEvent *event;
1432     GstPad *srcpad;
1433
1434     GST_DEBUG_OBJECT (stream->bin,
1435         "sending GstRTCPSRReceived event downstream");
1436
1437     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1438         gst_structure_new_empty ("GstRTCPSRReceived"));
1439
1440     srcpad = gst_element_get_static_pad (stream->buffer, "src");
1441     gst_pad_push_event (srcpad, event);
1442     gst_object_unref (srcpad);
1443   }
1444 }
1445
1446 /* associate a stream to the given CNAME. This will make sure all streams for
1447  * that CNAME are synchronized together.
1448  * Must be called with GST_RTP_BIN_LOCK */
1449 static void
1450 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
1451     const guint8 * data, guint64 ntpnstime, guint64 last_extrtptime,
1452     guint64 base_rtptime, guint64 base_time, guint clock_rate,
1453     gint64 rtp_clock_base)
1454 {
1455   GstRtpBinClient *client;
1456   gboolean created;
1457   GSList *walk;
1458   GstClockTime running_time, running_time_rtp;
1459
1460   /* first find or create the CNAME */
1461   client = get_client (bin, len, data, &created);
1462
1463   /* find stream in the client */
1464   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1465     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1466
1467     if (ostream == stream)
1468       break;
1469   }
1470   /* not found, add it to the list */
1471   if (walk == NULL) {
1472     GST_DEBUG_OBJECT (bin,
1473         "new association of SSRC %08x with client %p with CNAME %s",
1474         stream->ssrc, client, client->cname);
1475     client->streams = g_slist_prepend (client->streams, stream);
1476     client->nstreams++;
1477   } else {
1478     GST_DEBUG_OBJECT (bin,
1479         "found association of SSRC %08x with client %p with CNAME %s",
1480         stream->ssrc, client, client->cname);
1481   }
1482
1483   if (!GST_CLOCK_TIME_IS_VALID (last_extrtptime)) {
1484     GST_DEBUG_OBJECT (bin, "invalidated sync data");
1485     if (bin->rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1486       /* we don't need that data, so carry on,
1487        * but make some values look saner */
1488       last_extrtptime = base_rtptime;
1489     } else {
1490       /* nothing we can do with this data in this case */
1491       GST_DEBUG_OBJECT (bin, "bailing out");
1492       return;
1493     }
1494   }
1495
1496   /* Take the extended rtptime we found in the SR packet and map it to the
1497    * local rtptime. The local rtp time is used to construct timestamps on the
1498    * buffers so we will calculate what running_time corresponds to the RTP
1499    * timestamp in the SR packet. */
1500   running_time_rtp = last_extrtptime - base_rtptime;
1501
1502   GST_DEBUG_OBJECT (bin,
1503       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
1504       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, "
1505       "clock-base %" G_GINT64_FORMAT, base_rtptime,
1506       last_extrtptime, running_time_rtp, clock_rate, rtp_clock_base);
1507
1508   /* calculate local RTP time in gstreamer timestamp, we essentially perform the
1509    * same conversion that a jitterbuffer would use to convert an rtp timestamp
1510    * into a corresponding gstreamer timestamp. Note that the base_time also
1511    * contains the drift between sender and receiver. */
1512   running_time =
1513       gst_util_uint64_scale_int (running_time_rtp, GST_SECOND, clock_rate);
1514   running_time += base_time;
1515
1516   stream->have_sync = TRUE;
1517
1518   GST_DEBUG_OBJECT (bin,
1519       "SR RTP running time %" G_GUINT64_FORMAT ", SR NTP %" G_GUINT64_FORMAT,
1520       running_time, ntpnstime);
1521
1522   /* recalc inter stream playout offset, but only if there is more than one
1523    * stream or we're doing NTP sync. */
1524   if (bin->ntp_sync) {
1525     gint64 ntpdiff, rtdiff;
1526     guint64 local_ntpnstime;
1527     GstClockTime local_running_time;
1528
1529     /* For NTP sync we need to first get a snapshot of running_time and NTP
1530      * time. We know at what running_time we play a certain RTP time, we also
1531      * calculated when we would play the RTP time in the SR packet. Now we need
1532      * to know how the running_time and the NTP time relate to each other. */
1533     get_current_times (bin, &local_running_time, &local_ntpnstime);
1534
1535     /* see how far away the NTP time is. This is the difference between the
1536      * current NTP time and the NTP time in the last SR packet. */
1537     ntpdiff = local_ntpnstime - ntpnstime;
1538     /* see how far away the running_time is. This is the difference between the
1539      * current running_time and the running_time of the RTP timestamp in the
1540      * last SR packet. */
1541     rtdiff = local_running_time - running_time;
1542
1543     GST_DEBUG_OBJECT (bin,
1544         "local NTP time %" G_GUINT64_FORMAT ", SR NTP time %" G_GUINT64_FORMAT,
1545         local_ntpnstime, ntpnstime);
1546     GST_DEBUG_OBJECT (bin,
1547         "local running time %" G_GUINT64_FORMAT ", SR RTP running time %"
1548         G_GUINT64_FORMAT, local_running_time, running_time);
1549     GST_DEBUG_OBJECT (bin,
1550         "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
1551         rtdiff);
1552
1553     /* combine to get the final diff to apply to the running_time */
1554     stream->rt_delta = rtdiff - ntpdiff;
1555
1556     stream_set_ts_offset (bin, stream, stream->rt_delta, bin->max_ts_offset,
1557         bin->min_ts_offset, FALSE);
1558   } else {
1559     gint64 min, rtp_min, clock_base = stream->clock_base;
1560     gboolean all_sync, use_rtp;
1561     gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
1562
1563     /* calculate delta between server and receiver. ntpnstime is created by
1564      * converting the ntptime in the last SR packet to a gstreamer timestamp. This
1565      * delta expresses the difference to our timeline and the server timeline. The
1566      * difference in itself doesn't mean much but we can combine the delta of
1567      * multiple streams to create a stream specific offset. */
1568     stream->rt_delta = ntpnstime - running_time;
1569
1570     /* calculate the min of all deltas, ignoring streams that did not yet have a
1571      * valid rt_delta because we did not yet receive an SR packet for those
1572      * streams.
1573      * We calculate the minimum because we would like to only apply positive
1574      * offsets to streams, delaying their playback instead of trying to speed up
1575      * other streams (which might be impossible when we have to create negative
1576      * latencies).
1577      * The stream that has the smallest diff is selected as the reference stream,
1578      * all other streams will have a positive offset to this difference. */
1579
1580     /* some alternative setting allow ignoring RTCP as much as possible,
1581      * for servers generating bogus ntp timeline */
1582     min = rtp_min = G_MAXINT64;
1583     use_rtp = FALSE;
1584     if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1585       guint64 ext_base;
1586
1587       use_rtp = TRUE;
1588       /* signed version for convenience */
1589       clock_base = base_rtptime;
1590       /* deal with possible wrap-around */
1591       ext_base = base_rtptime;
1592       rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
1593       /* sanity check; base rtp and provided clock_base should be close */
1594       if (rtp_clock_base >= clock_base) {
1595         if (rtp_clock_base - clock_base < 10 * clock_rate) {
1596           rtp_clock_base = base_time +
1597               gst_util_uint64_scale_int (rtp_clock_base - clock_base,
1598               GST_SECOND, clock_rate);
1599         } else {
1600           use_rtp = FALSE;
1601         }
1602       } else {
1603         if (clock_base - rtp_clock_base < 10 * clock_rate) {
1604           rtp_clock_base = base_time -
1605               gst_util_uint64_scale_int (clock_base - rtp_clock_base,
1606               GST_SECOND, clock_rate);
1607         } else {
1608           use_rtp = FALSE;
1609         }
1610       }
1611       /* warn and bail for clarity out if no sane values */
1612       if (!use_rtp) {
1613         GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
1614         return;
1615       }
1616       /* store to track changes */
1617       clock_base = rtp_clock_base;
1618       /* generate a fake as before,
1619        * now equating rtptime obtained from RTP-Info,
1620        * where the large time represent the otherwise irrelevant npt/ntp time */
1621       stream->rtp_delta = (GST_SECOND << 28) - rtp_clock_base;
1622     } else {
1623       clock_base = rtp_clock_base;
1624     }
1625
1626     all_sync = TRUE;
1627     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1628       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1629
1630       if (!ostream->have_sync) {
1631         all_sync = FALSE;
1632         continue;
1633       }
1634
1635       /* change in current stream's base from previously init'ed value
1636        * leads to reset of all stream's base */
1637       if (stream != ostream && stream->clock_base >= 0 &&
1638           (stream->clock_base != clock_base)) {
1639         GST_DEBUG_OBJECT (bin, "reset upon clock base change");
1640         ostream->clock_base = -100 * GST_SECOND;
1641         ostream->rtp_delta = 0;
1642       }
1643
1644       if (ostream->rt_delta < min)
1645         min = ostream->rt_delta;
1646       if (ostream->rtp_delta < rtp_min)
1647         rtp_min = ostream->rtp_delta;
1648     }
1649
1650     /* arrange to re-sync for each stream upon significant change,
1651      * e.g. post-seek */
1652     all_sync = all_sync && (stream->clock_base == clock_base);
1653     stream->clock_base = clock_base;
1654
1655     /* may need init performed above later on, but nothing more to do now */
1656     if (client->nstreams <= 1)
1657       return;
1658
1659     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
1660         " all sync %d", client, min, all_sync);
1661     GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
1662
1663     switch (rtcp_sync) {
1664       case GST_RTP_BIN_RTCP_SYNC_RTP:
1665         if (!use_rtp)
1666           break;
1667         GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
1668             "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
1669         /* fall-through */
1670       case GST_RTP_BIN_RTCP_SYNC_INITIAL:
1671         /* if all have been synced already, do not bother further */
1672         if (all_sync) {
1673           GST_DEBUG_OBJECT (bin, "all streams already synced; done");
1674           return;
1675         }
1676         break;
1677       default:
1678         break;
1679     }
1680
1681     /* bail out if we adjusted recently enough */
1682     if (all_sync && (ntpnstime - bin->priv->last_ntpnstime) <
1683         bin->rtcp_sync_interval * GST_MSECOND) {
1684       GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
1685           "previous sender info too recent "
1686           "(previous NTP %" G_GUINT64_FORMAT ")", bin->priv->last_ntpnstime);
1687       return;
1688     }
1689     bin->priv->last_ntpnstime = ntpnstime;
1690
1691     /* calculate offsets for each stream */
1692     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1693       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1694       gint64 ts_offset;
1695
1696       /* ignore streams for which we didn't receive an SR packet yet, we
1697        * can't synchronize them yet. We can however sync other streams just
1698        * fine. */
1699       if (!ostream->have_sync)
1700         continue;
1701
1702       /* calculate offset to our reference stream, this should always give a
1703        * positive number. */
1704       if (use_rtp)
1705         ts_offset = ostream->rtp_delta - rtp_min;
1706       else
1707         ts_offset = ostream->rt_delta - min;
1708
1709       stream_set_ts_offset (bin, ostream, ts_offset, bin->max_ts_offset,
1710           bin->min_ts_offset, TRUE);
1711     }
1712   }
1713   gst_rtp_bin_send_sync_event (stream);
1714
1715   return;
1716 }
1717
1718 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
1719   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
1720           (b) = gst_rtcp_packet_move_to_next ((packet)))
1721
1722 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
1723   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
1724           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
1725
1726 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
1727   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
1728           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
1729
1730 static void
1731 gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
1732     GstRtpBinStream * stream)
1733 {
1734   GstRtpBin *bin;
1735   GstRTCPPacket packet;
1736   guint32 ssrc;
1737   guint64 ntpnstime, inband_ntpnstime;
1738   gboolean have_sr;
1739   gboolean more;
1740   guint64 base_rtptime;
1741   guint64 base_time;
1742   guint clock_rate;
1743   guint64 clock_base;
1744   guint64 extrtptime, inband_ext_rtptime;
1745   GstBuffer *buffer;
1746   const gchar *cname;
1747   GstRTCPBuffer rtcp = { NULL, };
1748
1749   bin = stream->bin;
1750
1751   GST_DEBUG_OBJECT (bin, "sync handler called");
1752
1753   /* get the last relation between the rtp timestamps and the gstreamer
1754    * timestamps. We get this info directly from the jitterbuffer which
1755    * constructs gstreamer timestamps from rtp timestamps and so it know exactly
1756    * what the current situation is. */
1757   if (!gst_structure_get_uint64 (s, "base-rtptime", &base_rtptime) ||
1758       !gst_structure_get_uint64 (s, "base-time", &base_time) ||
1759       !gst_structure_get_uint (s, "clock-rate", &clock_rate) ||
1760       !gst_structure_get_uint64 (s, "clock-base", &clock_base)) {
1761     /* invalid structure */
1762     return;
1763   }
1764
1765   cname = gst_structure_get_string (s, "cname");
1766
1767   /* if the jitterbuffer directly got the NTP timestamp then don't work
1768    * through the RTCP SR, otherwise extract it from there */
1769   if (gst_structure_get_uint64 (s, "inband-ntpnstime", &inband_ntpnstime)
1770       && gst_structure_get_uint64 (s, "inband-ext-rtptime", &inband_ext_rtptime)
1771       && (cname = gst_structure_get_string (s, "cname"))
1772       && gst_structure_get_uint (s, "ssrc", &ssrc)) {
1773     GST_DEBUG_OBJECT (bin,
1774         "handle sync from inband NTP-64 information for SSRC %08x", ssrc);
1775
1776     if (ssrc != stream->ssrc)
1777       return;
1778
1779     GST_RTP_BIN_LOCK (bin);
1780     gst_rtp_bin_associate (bin, stream, strlen (cname), (const guint8 *) cname,
1781         inband_ntpnstime, inband_ext_rtptime, base_rtptime, base_time,
1782         clock_rate, clock_base);
1783     GST_RTP_BIN_UNLOCK (bin);
1784     return;
1785   }
1786
1787   if (!gst_structure_get_uint64 (s, "sr-ext-rtptime", &extrtptime)
1788       || !gst_structure_has_field_typed (s, "sr-buffer", GST_TYPE_BUFFER)) {
1789     /* invalid structure */
1790     return;
1791   }
1792
1793   GST_DEBUG_OBJECT (bin, "handle sync from RTCP SR information");
1794
1795   /* get RTCP SR ntpnstime if available */
1796   if (gst_structure_get_uint64 (s, "sr-ntpnstime", &ntpnstime) && cname) {
1797     GST_RTP_BIN_LOCK (bin);
1798     /* associate the stream to CNAME */
1799     gst_rtp_bin_associate (bin, stream, strlen (cname),
1800         (const guint8 *) cname, ntpnstime, extrtptime, base_rtptime,
1801         base_time, clock_rate, clock_base);
1802     GST_RTP_BIN_UNLOCK (bin);
1803     return;
1804   }
1805
1806   /* otherwise parse the RTCP packet */
1807   buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
1808
1809   have_sr = FALSE;
1810
1811   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
1812
1813   GST_RTCP_BUFFER_FOR_PACKETS (more, &rtcp, &packet) {
1814     /* first packet must be SR or RR or else the validate would have failed */
1815     switch (gst_rtcp_packet_get_type (&packet)) {
1816       case GST_RTCP_TYPE_SR:
1817         /* only parse first. There is only supposed to be one SR in the packet
1818          * but we will deal with malformed packets gracefully by trying the
1819          * next RTCP packet. */
1820         if (have_sr)
1821           continue;
1822
1823         /* get NTP time */
1824         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntpnstime, NULL,
1825             NULL, NULL);
1826
1827         /* convert ntptime to nanoseconds */
1828         ntpnstime = gst_util_uint64_scale (ntpnstime, GST_SECOND,
1829             (G_GINT64_CONSTANT (1) << 32));
1830
1831         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
1832
1833         /* ignore SR that is not ours and check the next RTCP packet */
1834         if (ssrc != stream->ssrc)
1835           continue;
1836
1837         have_sr = TRUE;
1838
1839         /* If we already have the CNAME don't require parsing SDES */
1840         if (cname) {
1841           GST_RTP_BIN_LOCK (bin);
1842           /* associate the stream to CNAME */
1843           gst_rtp_bin_associate (bin, stream, strlen (cname),
1844               (const guint8 *) cname, ntpnstime, extrtptime, base_rtptime,
1845               base_time, clock_rate, clock_base);
1846           GST_RTP_BIN_UNLOCK (bin);
1847
1848           goto out;
1849         }
1850
1851         break;
1852       case GST_RTCP_TYPE_SDES:
1853       {
1854         gboolean more_items;
1855
1856         /* Bail out if we have not seen an SR item yet. */
1857         if (!have_sr)
1858           goto out;
1859
1860         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
1861           gboolean more_entries;
1862
1863           /* skip items that are not about the SSRC of the sender */
1864           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
1865             continue;
1866
1867           /* find the CNAME entry */
1868           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
1869             GstRTCPSDESType type;
1870             guint8 len;
1871             const guint8 *data;
1872
1873             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len,
1874                 (guint8 **) & data);
1875
1876             if (type == GST_RTCP_SDES_CNAME) {
1877               GST_RTP_BIN_LOCK (bin);
1878               /* associate the stream to CNAME */
1879               gst_rtp_bin_associate (bin, stream, len, data,
1880                   ntpnstime, extrtptime, base_rtptime, base_time, clock_rate,
1881                   clock_base);
1882               GST_RTP_BIN_UNLOCK (bin);
1883
1884               goto out;
1885             }
1886           }
1887         }
1888
1889         /* only deal with first SDES, there is only supposed to be one SDES in
1890          * the RTCP packet but we deal with bad packets gracefully. */
1891         goto out;
1892       }
1893       default:
1894         /* we can ignore these packets */
1895         break;
1896     }
1897   }
1898 out:
1899   gst_rtcp_buffer_unmap (&rtcp);
1900 }
1901
1902 /* create a new stream with @ssrc in @session. Must be called with
1903  * RTP_SESSION_LOCK. */
1904 static GstRtpBinStream *
1905 create_stream (GstRtpBinSession * session, guint32 ssrc)
1906 {
1907   GstElement *buffer, *demux = NULL;
1908   GstRtpBinStream *stream;
1909   GstRtpBin *rtpbin;
1910   GstState target;
1911   GObjectClass *jb_class;
1912
1913   rtpbin = session->bin;
1914
1915   if (g_slist_length (session->streams) >= rtpbin->max_streams)
1916     goto max_streams;
1917
1918   if (!(buffer =
1919           session_request_element (session, SIGNAL_REQUEST_JITTERBUFFER)))
1920     goto no_jitterbuffer;
1921
1922   if (!rtpbin->ignore_pt) {
1923     if (!(demux = gst_element_factory_make ("rtpptdemux", NULL)))
1924       goto no_demux;
1925   }
1926
1927   stream = g_new0 (GstRtpBinStream, 1);
1928   stream->ssrc = ssrc;
1929   stream->bin = rtpbin;
1930   stream->session = session;
1931   stream->buffer = gst_object_ref (buffer);
1932   stream->demux = demux;
1933
1934   stream->have_sync = FALSE;
1935   stream->rt_delta = 0;
1936   stream->avg_ts_offset = 0;
1937   stream->is_initialized = FALSE;
1938   stream->rtp_delta = 0;
1939   stream->percent = 100;
1940   stream->clock_base = -100 * GST_SECOND;
1941   session->streams = g_slist_prepend (session->streams, stream);
1942
1943   jb_class = G_OBJECT_GET_CLASS (G_OBJECT (buffer));
1944
1945   if (g_signal_lookup ("request-pt-map", G_OBJECT_TYPE (buffer)) != 0) {
1946     /* provide clock_rate to the jitterbuffer when needed */
1947     stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
1948         (GCallback) pt_map_requested, session);
1949   }
1950   if (g_signal_lookup ("on-npt-stop", G_OBJECT_TYPE (buffer)) != 0) {
1951     stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
1952         (GCallback) on_npt_stop, stream);
1953   }
1954
1955   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.session", session);
1956   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.stream", stream);
1957
1958   /* configure latency and packet lost */
1959   g_object_set (buffer, "latency", rtpbin->latency_ms, NULL);
1960
1961   if (g_object_class_find_property (jb_class, "drop-on-latency"))
1962     g_object_set (buffer, "drop-on-latency", rtpbin->drop_on_latency, NULL);
1963   if (g_object_class_find_property (jb_class, "do-lost"))
1964     g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
1965   if (g_object_class_find_property (jb_class, "mode"))
1966     g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL);
1967   if (g_object_class_find_property (jb_class, "do-retransmission"))
1968     g_object_set (buffer, "do-retransmission", rtpbin->do_retransmission, NULL);
1969   if (g_object_class_find_property (jb_class, "max-rtcp-rtp-time-diff"))
1970     g_object_set (buffer, "max-rtcp-rtp-time-diff",
1971         rtpbin->max_rtcp_rtp_time_diff, NULL);
1972   if (g_object_class_find_property (jb_class, "max-dropout-time"))
1973     g_object_set (buffer, "max-dropout-time", rtpbin->max_dropout_time, NULL);
1974   if (g_object_class_find_property (jb_class, "max-misorder-time"))
1975     g_object_set (buffer, "max-misorder-time", rtpbin->max_misorder_time, NULL);
1976   if (g_object_class_find_property (jb_class, "rfc7273-sync"))
1977     g_object_set (buffer, "rfc7273-sync", rtpbin->rfc7273_sync, NULL);
1978   if (g_object_class_find_property (jb_class, "add-reference-timestamp-meta"))
1979     g_object_set (buffer, "add-reference-timestamp-meta",
1980         rtpbin->add_reference_timestamp_meta, NULL);
1981   if (g_object_class_find_property (jb_class, "max-ts-offset-adjustment"))
1982     g_object_set (buffer, "max-ts-offset-adjustment",
1983         rtpbin->max_ts_offset_adjustment, NULL);
1984   if (g_object_class_find_property (jb_class, "sync-interval"))
1985     g_object_set (buffer, "sync-interval", rtpbin->rtcp_sync_interval, NULL);
1986
1987   g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER], 0,
1988       buffer, session->id, ssrc);
1989
1990   if (!rtpbin->ignore_pt)
1991     gst_bin_add (GST_BIN_CAST (rtpbin), demux);
1992
1993   /* link stuff */
1994   if (demux)
1995     gst_element_link_pads_full (buffer, "src", demux, "sink",
1996         GST_PAD_LINK_CHECK_NOTHING);
1997
1998   if (rtpbin->buffering) {
1999     guint64 last_out;
2000
2001     if (g_signal_lookup ("set-active", G_OBJECT_TYPE (buffer)) != 0) {
2002       GST_INFO_OBJECT (rtpbin,
2003           "bin is buffering, set jitterbuffer as not active");
2004       g_signal_emit_by_name (buffer, "set-active", FALSE, (gint64) 0,
2005           &last_out);
2006     }
2007   }
2008
2009
2010   GST_OBJECT_LOCK (rtpbin);
2011   target = GST_STATE_TARGET (rtpbin);
2012   GST_OBJECT_UNLOCK (rtpbin);
2013
2014   /* from sink to source */
2015   if (demux)
2016     gst_element_set_state (demux, target);
2017
2018   gst_element_set_state (buffer, target);
2019
2020   return stream;
2021
2022   /* ERRORS */
2023 max_streams:
2024   {
2025     GST_WARNING_OBJECT (rtpbin, "stream exceeds maximum (%d)",
2026         rtpbin->max_streams);
2027     return NULL;
2028   }
2029 no_jitterbuffer:
2030   {
2031     g_warning ("rtpbin: could not create rtpjitterbuffer element");
2032     return NULL;
2033   }
2034 no_demux:
2035   {
2036     gst_object_unref (buffer);
2037     g_warning ("rtpbin: could not create rtpptdemux element");
2038     return NULL;
2039   }
2040 }
2041
2042 /* called with RTP_BIN_LOCK */
2043 static void
2044 free_stream (GstRtpBinStream * stream, GstRtpBin * bin)
2045 {
2046   GstRtpBinSession *sess = stream->session;
2047   GSList *clients, *next_client;
2048
2049   GST_DEBUG_OBJECT (bin, "freeing stream %p", stream);
2050
2051   gst_element_set_locked_state (stream->buffer, TRUE);
2052   if (stream->demux)
2053     gst_element_set_locked_state (stream->demux, TRUE);
2054
2055   gst_element_set_state (stream->buffer, GST_STATE_NULL);
2056   if (stream->demux)
2057     gst_element_set_state (stream->demux, GST_STATE_NULL);
2058
2059   if (stream->demux) {
2060     g_signal_handler_disconnect (stream->demux, stream->demux_newpad_sig);
2061     g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig);
2062     g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig);
2063     g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
2064   }
2065
2066   if (stream->buffer_handlesync_sig)
2067     g_signal_handler_disconnect (stream->buffer, stream->buffer_handlesync_sig);
2068   if (stream->buffer_ptreq_sig)
2069     g_signal_handler_disconnect (stream->buffer, stream->buffer_ptreq_sig);
2070   if (stream->buffer_ntpstop_sig)
2071     g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig);
2072
2073   sess->elements = g_slist_remove (sess->elements, stream->buffer);
2074   remove_bin_element (stream->buffer, bin);
2075   gst_object_unref (stream->buffer);
2076
2077   if (stream->demux)
2078     gst_bin_remove (GST_BIN_CAST (bin), stream->demux);
2079
2080   for (clients = bin->clients; clients; clients = next_client) {
2081     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
2082     GSList *streams, *next_stream;
2083
2084     next_client = g_slist_next (clients);
2085
2086     for (streams = client->streams; streams; streams = next_stream) {
2087       GstRtpBinStream *ostream = (GstRtpBinStream *) streams->data;
2088
2089       next_stream = g_slist_next (streams);
2090
2091       if (ostream == stream) {
2092         client->streams = g_slist_delete_link (client->streams, streams);
2093         /* If this was the last stream belonging to this client,
2094          * clean up the client. */
2095         if (--client->nstreams == 0) {
2096           bin->clients = g_slist_delete_link (bin->clients, clients);
2097           free_client (client, bin);
2098           break;
2099         }
2100       }
2101     }
2102   }
2103   g_free (stream);
2104 }
2105
2106 /* GObject vmethods */
2107 static void gst_rtp_bin_dispose (GObject * object);
2108 static void gst_rtp_bin_finalize (GObject * object);
2109 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
2110     const GValue * value, GParamSpec * pspec);
2111 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
2112     GValue * value, GParamSpec * pspec);
2113
2114 /* GstElement vmethods */
2115 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
2116     GstStateChange transition);
2117 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
2118     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
2119 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
2120 static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message);
2121
2122 #define gst_rtp_bin_parent_class parent_class
2123 G_DEFINE_TYPE_WITH_PRIVATE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN);
2124 GST_ELEMENT_REGISTER_DEFINE (rtpbin, "rtpbin", GST_RANK_NONE, GST_TYPE_RTP_BIN);
2125
2126 static gboolean
2127 _gst_element_accumulator (GSignalInvocationHint * ihint,
2128     GValue * return_accu, const GValue * handler_return, gpointer dummy)
2129 {
2130   GstElement *element;
2131
2132   element = g_value_get_object (handler_return);
2133   GST_DEBUG ("got element %" GST_PTR_FORMAT, element);
2134
2135   g_value_set_object (return_accu, element);
2136
2137   /* stop emission if we have an element */
2138   return (element == NULL);
2139 }
2140
2141 static gboolean
2142 _gst_caps_accumulator (GSignalInvocationHint * ihint,
2143     GValue * return_accu, const GValue * handler_return, gpointer dummy)
2144 {
2145   GstCaps *caps;
2146
2147   caps = g_value_get_boxed (handler_return);
2148   GST_DEBUG ("got caps %" GST_PTR_FORMAT, caps);
2149
2150   g_value_set_boxed (return_accu, caps);
2151
2152   /* stop emission if we have a caps */
2153   return (caps == NULL);
2154 }
2155
2156 static void
2157 gst_rtp_bin_class_init (GstRtpBinClass * klass)
2158 {
2159   GObjectClass *gobject_class;
2160   GstElementClass *gstelement_class;
2161   GstBinClass *gstbin_class;
2162
2163   gobject_class = (GObjectClass *) klass;
2164   gstelement_class = (GstElementClass *) klass;
2165   gstbin_class = (GstBinClass *) klass;
2166
2167   gobject_class->dispose = gst_rtp_bin_dispose;
2168   gobject_class->finalize = gst_rtp_bin_finalize;
2169   gobject_class->set_property = gst_rtp_bin_set_property;
2170   gobject_class->get_property = gst_rtp_bin_get_property;
2171
2172   g_object_class_install_property (gobject_class, PROP_LATENCY,
2173       g_param_spec_uint ("latency", "Buffer latency in ms",
2174           "Default amount of ms to buffer in the jitterbuffers", 0,
2175           G_MAXUINT, DEFAULT_LATENCY_MS,
2176           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2177
2178   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
2179       g_param_spec_boolean ("drop-on-latency",
2180           "Drop buffers when maximum latency is reached",
2181           "Tells the jitterbuffer to never exceed the given latency in size",
2182           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2183
2184   /**
2185    * GstRtpBin::request-pt-map:
2186    * @rtpbin: the object which received the signal
2187    * @session: the session
2188    * @pt: the pt
2189    *
2190    * Request the payload type as #GstCaps for @pt in @session.
2191    */
2192   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
2193       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
2194       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
2195       _gst_caps_accumulator, NULL, NULL, GST_TYPE_CAPS, 2, G_TYPE_UINT,
2196       G_TYPE_UINT);
2197
2198     /**
2199    * GstRtpBin::payload-type-change:
2200    * @rtpbin: the object which received the signal
2201    * @session: the session
2202    * @pt: the pt
2203    *
2204    * Signal that the current payload type changed to @pt in @session.
2205    */
2206   gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] =
2207       g_signal_new ("payload-type-change", G_TYPE_FROM_CLASS (klass),
2208       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, payload_type_change),
2209       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2210
2211   /**
2212    * GstRtpBin::clear-pt-map:
2213    * @rtpbin: the object which received the signal
2214    *
2215    * Clear all previously cached pt-mapping obtained with
2216    * #GstRtpBin::request-pt-map.
2217    */
2218   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
2219       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
2220       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2221           clear_pt_map), NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
2222
2223   /**
2224    * GstRtpBin::reset-sync:
2225    * @rtpbin: the object which received the signal
2226    *
2227    * Reset all currently configured lip-sync parameters and require new SR
2228    * packets for all streams before lip-sync is attempted again.
2229    */
2230   gst_rtp_bin_signals[SIGNAL_RESET_SYNC] =
2231       g_signal_new ("reset-sync", G_TYPE_FROM_CLASS (klass),
2232       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2233           reset_sync), NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
2234
2235   /**
2236    * GstRtpBin::get-session:
2237    * @rtpbin: the object which received the signal
2238    * @id: the session id
2239    *
2240    * Request the related GstRtpSession as #GstElement related with session @id.
2241    *
2242    * Since: 1.8
2243    */
2244   gst_rtp_bin_signals[SIGNAL_GET_SESSION] =
2245       g_signal_new ("get-session", G_TYPE_FROM_CLASS (klass),
2246       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2247           get_session), NULL, NULL, NULL, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2248
2249   /**
2250    * GstRtpBin::get-internal-session:
2251    * @rtpbin: the object which received the signal
2252    * @id: the session id
2253    *
2254    * Request the internal RTPSession object as #GObject in session @id.
2255    */
2256   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_SESSION] =
2257       g_signal_new ("get-internal-session", G_TYPE_FROM_CLASS (klass),
2258       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2259           get_internal_session), NULL, NULL, NULL, RTP_TYPE_SESSION, 1,
2260       G_TYPE_UINT);
2261
2262   /**
2263    * GstRtpBin::get-internal-storage:
2264    * @rtpbin: the object which received the signal
2265    * @id: the session id
2266    *
2267    * Request the internal RTPStorage object as #GObject in session @id. This
2268    * is the internal storage used by the RTPStorage element, which is used to
2269    * keep a backlog of received RTP packets for the session @id.
2270    *
2271    * Since: 1.14
2272    */
2273   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_STORAGE] =
2274       g_signal_new ("get-internal-storage", G_TYPE_FROM_CLASS (klass),
2275       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2276           get_internal_storage), NULL, NULL, NULL, G_TYPE_OBJECT, 1,
2277       G_TYPE_UINT);
2278
2279   /**
2280    * GstRtpBin::get-storage:
2281    * @rtpbin: the object which received the signal
2282    * @id: the session id
2283    *
2284    * Request the RTPStorage element as #GObject in session @id. This element
2285    * is used to keep a backlog of received RTP packets for the session @id.
2286    *
2287    * Since: 1.16
2288    */
2289   gst_rtp_bin_signals[SIGNAL_GET_STORAGE] =
2290       g_signal_new ("get-storage", G_TYPE_FROM_CLASS (klass),
2291       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2292           get_storage), NULL, NULL, NULL, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2293
2294   /**
2295    * GstRtpBin::clear-ssrc:
2296    * @rtpbin: the object which received the signal
2297    * @id: the session id
2298    * @ssrc: the ssrc
2299    *
2300    * Remove all pads from rtpssrcdemux element associated with the specified
2301    * ssrc. This delegate the action signal to the rtpssrcdemux element
2302    * associated with the specified session.
2303    *
2304    * Since: 1.20
2305    */
2306   gst_rtp_bin_signals[SIGNAL_CLEAR_SSRC] =
2307       g_signal_new ("clear-ssrc", G_TYPE_FROM_CLASS (klass),
2308       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2309           clear_ssrc), NULL, NULL, NULL, G_TYPE_NONE, 2,
2310       G_TYPE_UINT, G_TYPE_UINT);
2311
2312   /**
2313    * GstRtpBin::on-new-ssrc:
2314    * @rtpbin: the object which received the signal
2315    * @session: the session
2316    * @ssrc: the SSRC
2317    *
2318    * Notify of a new SSRC that entered @session.
2319    */
2320   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
2321       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
2322       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
2323       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2324   /**
2325    * GstRtpBin::on-ssrc-collision:
2326    * @rtpbin: the object which received the signal
2327    * @session: the session
2328    * @ssrc: the SSRC
2329    *
2330    * Notify when we have an SSRC collision
2331    */
2332   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
2333       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
2334       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
2335       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2336   /**
2337    * GstRtpBin::on-ssrc-validated:
2338    * @rtpbin: the object which received the signal
2339    * @session: the session
2340    * @ssrc: the SSRC
2341    *
2342    * Notify of a new SSRC that became validated.
2343    */
2344   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
2345       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
2346       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
2347       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2348   /**
2349    * GstRtpBin::on-ssrc-active:
2350    * @rtpbin: the object which received the signal
2351    * @session: the session
2352    * @ssrc: the SSRC
2353    *
2354    * Notify of a SSRC that is active, i.e., sending RTCP.
2355    */
2356   gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE] =
2357       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
2358       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_active),
2359       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2360   /**
2361    * GstRtpBin::on-ssrc-sdes:
2362    * @rtpbin: the object which received the signal
2363    * @session: the session
2364    * @ssrc: the SSRC
2365    *
2366    * Notify of a SSRC that is active, i.e., sending RTCP.
2367    */
2368   gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES] =
2369       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
2370       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_sdes),
2371       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2372
2373   /**
2374    * GstRtpBin::on-bye-ssrc:
2375    * @rtpbin: the object which received the signal
2376    * @session: the session
2377    * @ssrc: the SSRC
2378    *
2379    * Notify of an SSRC that became inactive because of a BYE packet.
2380    */
2381   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
2382       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
2383       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
2384       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2385   /**
2386    * GstRtpBin::on-bye-timeout:
2387    * @rtpbin: the object which received the signal
2388    * @session: the session
2389    * @ssrc: the SSRC
2390    *
2391    * Notify of an SSRC that has timed out because of BYE
2392    */
2393   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
2394       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
2395       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
2396       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2397   /**
2398    * GstRtpBin::on-timeout:
2399    * @rtpbin: the object which received the signal
2400    * @session: the session
2401    * @ssrc: the SSRC
2402    *
2403    * Notify of an SSRC that has timed out
2404    */
2405   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
2406       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
2407       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
2408       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2409   /**
2410    * GstRtpBin::on-sender-timeout:
2411    * @rtpbin: the object which received the signal
2412    * @session: the session
2413    * @ssrc: the SSRC
2414    *
2415    * Notify of a sender SSRC that has timed out and became a receiver
2416    */
2417   gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] =
2418       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
2419       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout),
2420       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2421
2422   /**
2423    * GstRtpBin::on-npt-stop:
2424    * @rtpbin: the object which received the signal
2425    * @session: the session
2426    * @ssrc: the SSRC
2427    *
2428    * Notify that SSRC sender has sent data up to the configured NPT stop time.
2429    */
2430   gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP] =
2431       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
2432       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_npt_stop),
2433       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2434
2435   /**
2436    * GstRtpBin::request-rtp-encoder:
2437    * @rtpbin: the object which received the signal
2438    * @session: the session
2439    *
2440    * Request an RTP encoder element for the given @session. The encoder
2441    * element will be added to the bin if not previously added.
2442    *
2443    * If no handler is connected, no encoder will be used.
2444    *
2445    * Since: 1.4
2446    */
2447   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_ENCODER] =
2448       g_signal_new ("request-rtp-encoder", G_TYPE_FROM_CLASS (klass),
2449       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2450           request_rtp_encoder), _gst_element_accumulator, NULL, NULL,
2451       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2452
2453   /**
2454    * GstRtpBin::request-rtp-decoder:
2455    * @rtpbin: the object which received the signal
2456    * @session: the session
2457    *
2458    * Request an RTP decoder element for the given @session. The decoder
2459    * element will be added to the bin if not previously added.
2460    *
2461    * If no handler is connected, no encoder will be used.
2462    *
2463    * Since: 1.4
2464    */
2465   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_DECODER] =
2466       g_signal_new ("request-rtp-decoder", G_TYPE_FROM_CLASS (klass),
2467       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2468           request_rtp_decoder), _gst_element_accumulator, NULL,
2469       NULL, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2470
2471   /**
2472    * GstRtpBin::request-rtcp-encoder:
2473    * @rtpbin: the object which received the signal
2474    * @session: the session
2475    *
2476    * Request an RTCP encoder element for the given @session. The encoder
2477    * element will be added to the bin if not previously added.
2478    *
2479    * If no handler is connected, no encoder will be used.
2480    *
2481    * Since: 1.4
2482    */
2483   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_ENCODER] =
2484       g_signal_new ("request-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
2485       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2486           request_rtcp_encoder), _gst_element_accumulator, NULL, NULL,
2487       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2488
2489   /**
2490    * GstRtpBin::request-rtcp-decoder:
2491    * @rtpbin: the object which received the signal
2492    * @session: the session
2493    *
2494    * Request an RTCP decoder element for the given @session. The decoder
2495    * element will be added to the bin if not previously added.
2496    *
2497    * If no handler is connected, no encoder will be used.
2498    *
2499    * Since: 1.4
2500    */
2501   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_DECODER] =
2502       g_signal_new ("request-rtcp-decoder", G_TYPE_FROM_CLASS (klass),
2503       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2504           request_rtcp_decoder), _gst_element_accumulator, NULL, NULL,
2505       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2506
2507   /**
2508    * GstRtpBin::request-jitterbuffer:
2509    * @rtpbin: the object which received the signal
2510    * @session: the session
2511    *
2512    * Request a jitterbuffer element for the given @session.
2513    *
2514    * If no handler is connected, the default jitterbuffer will be used.
2515    *
2516    * Note: The provided element is expected to conform to the API exposed
2517    * by the standard #GstRtpJitterBuffer. Runtime checks will be made to
2518    * determine whether it exposes properties and signals before attempting
2519    * to set, call or connect to them, and some functionalities of #GstRtpBin
2520    * may not be available when that is not the case.
2521    *
2522    * This should be considered experimental API, as the standard jitterbuffer
2523    * API is susceptible to change, provided elements will have to update their
2524    * custom jitterbuffer's API to match the API of #GstRtpJitterBuffer if and
2525    * when it changes.
2526    *
2527    * Since: 1.18
2528    */
2529   gst_rtp_bin_signals[SIGNAL_REQUEST_JITTERBUFFER] =
2530       g_signal_new ("request-jitterbuffer", G_TYPE_FROM_CLASS (klass),
2531       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2532           request_jitterbuffer), _gst_element_accumulator, NULL,
2533       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2534
2535   /**
2536    * GstRtpBin::new-jitterbuffer:
2537    * @rtpbin: the object which received the signal
2538    * @jitterbuffer: the new jitterbuffer
2539    * @session: the session
2540    * @ssrc: the SSRC
2541    *
2542    * Notify that a new @jitterbuffer was created for @session and @ssrc.
2543    * This signal can, for example, be used to configure @jitterbuffer.
2544    *
2545    * Since: 1.4
2546    */
2547   gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER] =
2548       g_signal_new ("new-jitterbuffer", G_TYPE_FROM_CLASS (klass),
2549       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2550           new_jitterbuffer), NULL, NULL, NULL,
2551       G_TYPE_NONE, 3, GST_TYPE_ELEMENT, G_TYPE_UINT, G_TYPE_UINT);
2552
2553   /**
2554    * GstRtpBin::new-storage:
2555    * @rtpbin: the object which received the signal
2556    * @storage: the new storage
2557    * @session: the session
2558    *
2559    * Notify that a new @storage was created for @session.
2560    * This signal can, for example, be used to configure @storage.
2561    *
2562    * Since: 1.14
2563    */
2564   gst_rtp_bin_signals[SIGNAL_NEW_STORAGE] =
2565       g_signal_new ("new-storage", G_TYPE_FROM_CLASS (klass),
2566       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2567           new_storage), NULL, NULL, NULL,
2568       G_TYPE_NONE, 2, GST_TYPE_ELEMENT, G_TYPE_UINT);
2569
2570   /**
2571    * GstRtpBin::request-aux-sender:
2572    * @rtpbin: the object which received the signal
2573    * @session: the session
2574    *
2575    * Request an AUX sender element for the given @session. The AUX
2576    * element will be added to the bin.
2577    *
2578    * If no handler is connected, no AUX element will be used.
2579    *
2580    * Since: 1.4
2581    */
2582   gst_rtp_bin_signals[SIGNAL_REQUEST_AUX_SENDER] =
2583       g_signal_new ("request-aux-sender", G_TYPE_FROM_CLASS (klass),
2584       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2585           request_aux_sender), _gst_element_accumulator, NULL, NULL,
2586       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2587
2588   /**
2589    * GstRtpBin::request-aux-receiver:
2590    * @rtpbin: the object which received the signal
2591    * @session: the session
2592    *
2593    * Request an AUX receiver element for the given @session. The AUX
2594    * element will be added to the bin.
2595    *
2596    * If no handler is connected, no AUX element will be used.
2597    *
2598    * Since: 1.4
2599    */
2600   gst_rtp_bin_signals[SIGNAL_REQUEST_AUX_RECEIVER] =
2601       g_signal_new ("request-aux-receiver", G_TYPE_FROM_CLASS (klass),
2602       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2603           request_aux_receiver), _gst_element_accumulator, NULL, NULL,
2604       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2605
2606   /**
2607    * GstRtpBin::request-fec-decoder:
2608    * @rtpbin: the object which received the signal
2609    * @session: the session index
2610    *
2611    * Request a FEC decoder element for the given @session. The element
2612    * will be added to the bin after the pt demuxer.  If there are multiple
2613    * ssrc's and pt's in @session, this signal may be called multiple times for
2614    * the same @session each corresponding to a newly discovered ssrc.
2615    *
2616    * If no handler is connected, no FEC decoder will be used.
2617    *
2618    * Warning: usage of this signal is not appropriate for the BUNDLE case,
2619    * connect to #GstRtpBin::request-fec-decoder-full instead.
2620    *
2621    * Since: 1.14
2622    */
2623   gst_rtp_bin_signals[SIGNAL_REQUEST_FEC_DECODER] =
2624       g_signal_new ("request-fec-decoder", G_TYPE_FROM_CLASS (klass),
2625       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2626           request_fec_decoder), _gst_element_accumulator, NULL, NULL,
2627       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2628
2629   /**
2630    * GstRtpBin::request-fec-decoder-full:
2631    * @rtpbin: the object which received the signal
2632    * @session: the session index
2633    * @ssrc: the ssrc of the stream
2634    * @pt: the payload type
2635    *
2636    * Request a FEC decoder element for the given @session. The element
2637    * will be added to the bin after the pt demuxer.  If there are multiple
2638    * ssrc's and pt's in @session, this signal may be called multiple times for
2639    * the same @session each corresponding to a newly discovered ssrc and payload
2640    * type, those are provided as parameters.
2641    *
2642    * If no handler is connected, no FEC decoder will be used.
2643    *
2644    * Since: 1.20
2645    */
2646   gst_rtp_bin_signals[SIGNAL_REQUEST_FEC_DECODER_FULL] =
2647       g_signal_new ("request-fec-decoder-full", G_TYPE_FROM_CLASS (klass),
2648       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2649           request_fec_decoder), _gst_element_accumulator, NULL, NULL,
2650       GST_TYPE_ELEMENT, 3, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT);
2651
2652   /**
2653    * GstRtpBin::request-fec-encoder:
2654    * @rtpbin: the object which received the signal
2655    * @session: the session index
2656    *
2657    * Request a FEC encoder element for the given @session. The element
2658    * will be added to the bin after the RTPSession.
2659    *
2660    * If no handler is connected, no FEC encoder will be used.
2661    *
2662    * Since: 1.14
2663    */
2664   gst_rtp_bin_signals[SIGNAL_REQUEST_FEC_ENCODER] =
2665       g_signal_new ("request-fec-encoder", G_TYPE_FROM_CLASS (klass),
2666       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2667           request_fec_encoder), _gst_element_accumulator, NULL, NULL,
2668       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2669
2670   /**
2671    * GstRtpBin::on-new-sender-ssrc:
2672    * @rtpbin: the object which received the signal
2673    * @session: the session
2674    * @ssrc: the sender SSRC
2675    *
2676    * Notify of a new sender SSRC that entered @session.
2677    *
2678    * Since: 1.8
2679    */
2680   gst_rtp_bin_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
2681       g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
2682       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_sender_ssrc),
2683       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2684   /**
2685    * GstRtpBin::on-sender-ssrc-active:
2686    * @rtpbin: the object which received the signal
2687    * @session: the session
2688    * @ssrc: the sender SSRC
2689    *
2690    * Notify of a sender SSRC that is active, i.e., sending RTCP.
2691    *
2692    * Since: 1.8
2693    */
2694   gst_rtp_bin_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
2695       g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
2696       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2697           on_sender_ssrc_active), NULL, NULL, NULL,
2698       G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2699
2700   g_object_class_install_property (gobject_class, PROP_SDES,
2701       g_param_spec_boxed ("sdes", "SDES",
2702           "The SDES items of this session",
2703           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
2704           | GST_PARAM_DOC_SHOW_DEFAULT));
2705
2706   g_object_class_install_property (gobject_class, PROP_DO_LOST,
2707       g_param_spec_boolean ("do-lost", "Do Lost",
2708           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
2709           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2710
2711   g_object_class_install_property (gobject_class, PROP_AUTOREMOVE,
2712       g_param_spec_boolean ("autoremove", "Auto Remove",
2713           "Automatically remove timed out sources", DEFAULT_AUTOREMOVE,
2714           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2715
2716   g_object_class_install_property (gobject_class, PROP_IGNORE_PT,
2717       g_param_spec_boolean ("ignore-pt", "Ignore PT",
2718           "Do not demultiplex based on PT values", DEFAULT_IGNORE_PT,
2719           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2720
2721   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
2722       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
2723           "Use the pipeline running-time to set the NTP time in the RTCP SR messages "
2724           "(DEPRECATED: Use ntp-time-source property)",
2725           DEFAULT_USE_PIPELINE_CLOCK,
2726           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
2727   /**
2728    * GstRtpBin:buffer-mode:
2729    *
2730    * Control the buffering and timestamping mode used by the jitterbuffer.
2731    */
2732   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
2733       g_param_spec_enum ("buffer-mode", "Buffer Mode",
2734           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
2735           DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2736   /**
2737    * GstRtpBin:ntp-sync:
2738    *
2739    * Set the NTP time from the sender reports as the running-time on the
2740    * buffers. When both the sender and receiver have sychronized
2741    * running-time, i.e. when the clock and base-time is shared
2742    * between the receivers and the and the senders, this option can be
2743    * used to synchronize receivers on multiple machines.
2744    */
2745   g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
2746       g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
2747           "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
2748           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2749
2750   /**
2751    * GstRtpBin:rtcp-sync:
2752    *
2753    * If not synchronizing (directly) to the NTP clock, determines how to sync
2754    * the various streams.
2755    */
2756   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC,
2757       g_param_spec_enum ("rtcp-sync", "RTCP Sync",
2758           "Use of RTCP SR in synchronization", GST_RTP_BIN_RTCP_SYNC_TYPE,
2759           DEFAULT_RTCP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2760
2761   /**
2762    * GstRtpBin:rtcp-sync-interval:
2763    *
2764    * Determines how often to sync streams using RTCP data or inband NTP-64
2765    * header extensions.
2766    */
2767   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_INTERVAL,
2768       g_param_spec_uint ("rtcp-sync-interval", "RTCP Sync Interval",
2769           "RTCP SR / NTP-64 interval synchronization (ms) (0 = always)",
2770           0, G_MAXUINT, DEFAULT_RTCP_SYNC_INTERVAL,
2771           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2772
2773   g_object_class_install_property (gobject_class, PROP_DO_SYNC_EVENT,
2774       g_param_spec_boolean ("do-sync-event", "Do Sync Event",
2775           "Send event downstream when a stream is synchronized to the sender",
2776           DEFAULT_DO_SYNC_EVENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2777
2778   /**
2779    * GstRtpBin:do-retransmission:
2780    *
2781    * Enables RTP retransmission on all streams. To control retransmission on
2782    * a per-SSRC basis, connect to the #GstRtpBin::new-jitterbuffer signal and
2783    * set the #GstRtpJitterBuffer:do-retransmission property on the
2784    * #GstRtpJitterBuffer object instead.
2785    */
2786   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
2787       g_param_spec_boolean ("do-retransmission", "Do retransmission",
2788           "Enable retransmission on all streams",
2789           DEFAULT_DO_RETRANSMISSION,
2790           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2791
2792   /**
2793    * GstRtpBin:rtp-profile:
2794    *
2795    * Sets the default RTP profile of newly created RTP sessions. The
2796    * profile can be changed afterwards on a per-session basis.
2797    */
2798   g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
2799       g_param_spec_enum ("rtp-profile", "RTP Profile",
2800           "Default RTP profile of newly created sessions",
2801           GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE,
2802           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2803
2804   g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
2805       g_param_spec_enum ("ntp-time-source", "NTP Time Source",
2806           "NTP time source for RTCP packets",
2807           gst_rtp_ntp_time_source_get_type (), DEFAULT_NTP_TIME_SOURCE,
2808           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2809
2810   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_SEND_TIME,
2811       g_param_spec_boolean ("rtcp-sync-send-time", "RTCP Sync Send Time",
2812           "Use send time or capture time for RTCP sync "
2813           "(TRUE = send time, FALSE = capture time)",
2814           DEFAULT_RTCP_SYNC_SEND_TIME,
2815           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2816
2817   g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF,
2818       g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff",
2819           "Maximum amount of time in ms that the RTP time in RTCP SRs "
2820           "is allowed to be ahead (-1 disabled)", -1, G_MAXINT,
2821           DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
2822           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2823
2824   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
2825       g_param_spec_uint ("max-dropout-time", "Max dropout time",
2826           "The maximum time (milliseconds) of missing packets tolerated.",
2827           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
2828           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2829
2830   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
2831       g_param_spec_uint ("max-misorder-time", "Max misorder time",
2832           "The maximum time (milliseconds) of misordered packets tolerated.",
2833           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
2834           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2835
2836   g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
2837       g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
2838           "Synchronize received streams to the RFC7273 clock "
2839           "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
2840           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2841
2842   /**
2843    * GstRtpBin:add-reference-timestamp-meta:
2844    *
2845    * When syncing to a RFC7273 clock or after clock synchronization via RTCP or
2846    * inband NTP-64 header extensions has happened, add #GstReferenceTimestampMeta
2847    * to buffers with the original reconstructed reference clock timestamp.
2848    *
2849    * Since: 1.22
2850    */
2851   g_object_class_install_property (gobject_class,
2852       PROP_ADD_REFERENCE_TIMESTAMP_META,
2853       g_param_spec_boolean ("add-reference-timestamp-meta",
2854           "Add Reference Timestamp Meta",
2855           "Add Reference Timestamp Meta to buffers with the original clock timestamp "
2856           "before any adjustments when syncing to an RFC7273 clock or after clock "
2857           "synchronization via RTCP or inband NTP-64 header extensions has happened.",
2858           DEFAULT_ADD_REFERENCE_TIMESTAMP_META,
2859           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2860
2861   g_object_class_install_property (gobject_class, PROP_MAX_STREAMS,
2862       g_param_spec_uint ("max-streams", "Max Streams",
2863           "The maximum number of streams to create for one session",
2864           0, G_MAXUINT, DEFAULT_MAX_STREAMS,
2865           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2866
2867   /**
2868    * GstRtpBin:max-ts-offset-adjustment:
2869    *
2870    * Syncing time stamps to NTP time adds a time offset. This parameter
2871    * specifies the maximum number of nanoseconds per frame that this time offset
2872    * may be adjusted with. This is used to avoid sudden large changes to time
2873    * stamps.
2874    *
2875    * Since: 1.14
2876    */
2877   g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET_ADJUSTMENT,
2878       g_param_spec_uint64 ("max-ts-offset-adjustment",
2879           "Max Timestamp Offset Adjustment",
2880           "The maximum number of nanoseconds per frame that time stamp offsets "
2881           "may be adjusted (0 = no limit).", 0, G_MAXUINT64,
2882           DEFAULT_MAX_TS_OFFSET_ADJUSTMENT, G_PARAM_READWRITE |
2883           G_PARAM_STATIC_STRINGS));
2884
2885   /**
2886    * GstRtpBin:max-ts-offset:
2887    *
2888    * Used to set an upper limit of how large a time offset may be. This
2889    * is used to protect against unrealistic values as a result of either
2890    * client,server or clock issues.
2891    *
2892    * Since: 1.14
2893    */
2894   g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET,
2895       g_param_spec_int64 ("max-ts-offset", "Max TS Offset",
2896           "The maximum absolute value of the time offset in (nanoseconds). "
2897           "Note, if the ntp-sync parameter is set the default value is "
2898           "changed to 0 (no limit)", 0, G_MAXINT64, DEFAULT_MAX_TS_OFFSET,
2899           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2900
2901   /**
2902    * GstRtpBin:min-ts-offset:
2903    *
2904    * Used to set an lower limit for when a time offset is deemed large enough
2905    * to be useful for sync corrections.
2906    *
2907    * When streaming for instance audio, even very small ts_offsets cause
2908    * audible glitches. This property is used for controlling how sensitive the
2909    * adjustments should be to small deviations in ts_offset, occurring for
2910    * instance due to jittery network conditions or system load.
2911    *
2912    * Since: 1.22
2913    */
2914   g_object_class_install_property (gobject_class, PROP_MIN_TS_OFFSET,
2915       g_param_spec_uint64 ("min-ts-offset", "Min TS Offset",
2916           "The minimum absolute value of the time offset in (nanoseconds). "
2917           "Used to set an lower limit for when a time offset is deemed large "
2918           "enough to be useful for sync corrections."
2919           "Note, if the ntp-sync parameter is set the default value is "
2920           "changed to 0 (no limit)", 0, G_MAXUINT64, DEFAULT_MIN_TS_OFFSET,
2921           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2922
2923   /**
2924    * GstRtpBin:ts-offset-smoothing-factor:
2925    *
2926    * Controls the weighting between previous and current timestamp offsets in
2927    * a running moving average (RMA):
2928    * ts_offset_average(n) =
2929    *   ((ts-offset-smoothing-factor - 1) * ts_offset_average(n - 1) + ts_offset(n)) /
2930    *   ts-offset-smoothing-factor
2931    *
2932    * This can stabilize the timestamp offset and prevent unnecessary skew
2933    * corrections due to jitter introduced by network or system load.
2934    *
2935    * Since: 1.22
2936    */
2937   g_object_class_install_property (gobject_class,
2938       PROP_TS_OFFSET_SMOOTHING_FACTOR,
2939       g_param_spec_uint ("ts-offset-smoothing-factor",
2940           "Timestamp Offset Smoothing Factor",
2941           "Sets a smoothing factor for the timestamp offset in number of "
2942           "values for a calculated running moving average. "
2943           "(0 = no smoothing factor)", 0, G_MAXUINT,
2944           DEFAULT_TS_OFFSET_SMOOTHING_FACTOR,
2945           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2946
2947   /**
2948    * GstRtpBin:fec-decoders:
2949    *
2950    * Used to provide a factory used to build the FEC decoder for a
2951    * given session, as a command line alternative to
2952    * #GstRtpBin::request-fec-decoder.
2953    *
2954    * Expects a GstStructure in the form session_id (gint) -> factory (string)
2955    *
2956    * Since: 1.20
2957    */
2958   g_object_class_install_property (gobject_class, PROP_FEC_DECODERS,
2959       g_param_spec_boxed ("fec-decoders", "Fec Decoders",
2960           "GstStructure mapping from session index to FEC decoder "
2961           "factory, eg "
2962           "fec-decoders='fec,0=\"rtpst2022-1-fecdec\\ size-time\\=1000000000\";'",
2963           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2964
2965   /**
2966    * GstRtpBin:fec-encoders:
2967    *
2968    * Used to provide a factory used to build the FEC encoder for a
2969    * given session, as a command line alternative to
2970    * #GstRtpBin::request-fec-encoder.
2971    *
2972    * Expects a GstStructure in the form session_id (gint) -> factory (string)
2973    *
2974    * Since: 1.20
2975    */
2976   g_object_class_install_property (gobject_class, PROP_FEC_ENCODERS,
2977       g_param_spec_boxed ("fec-encoders", "Fec Encoders",
2978           "GstStructure mapping from session index to FEC encoder "
2979           "factory, eg "
2980           "fec-encoders='fec,0=\"rtpst2022-1-fecenc\\ rows\\=5\\ columns\\=5\";'",
2981           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2982
2983   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
2984   gstelement_class->request_new_pad =
2985       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
2986   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
2987
2988   /* sink pads */
2989   gst_element_class_add_static_pad_template (gstelement_class,
2990       &rtpbin_recv_rtp_sink_template);
2991   gst_element_class_add_static_pad_template (gstelement_class,
2992       &rtpbin_recv_fec_sink_template);
2993   gst_element_class_add_static_pad_template (gstelement_class,
2994       &rtpbin_recv_rtcp_sink_template);
2995   gst_element_class_add_static_pad_template (gstelement_class,
2996       &rtpbin_send_rtp_sink_template);
2997
2998   /* src pads */
2999   gst_element_class_add_static_pad_template (gstelement_class,
3000       &rtpbin_recv_rtp_src_template);
3001   gst_element_class_add_static_pad_template (gstelement_class,
3002       &rtpbin_send_rtcp_src_template);
3003   gst_element_class_add_static_pad_template (gstelement_class,
3004       &rtpbin_send_rtp_src_template);
3005   gst_element_class_add_static_pad_template (gstelement_class,
3006       &rtpbin_send_fec_src_template);
3007
3008   gst_element_class_set_static_metadata (gstelement_class, "RTP Bin",
3009       "Filter/Network/RTP",
3010       "Real-Time Transport Protocol bin",
3011       "Wim Taymans <wim.taymans@gmail.com>");
3012
3013   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (gst_rtp_bin_handle_message);
3014
3015   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
3016   klass->reset_sync = GST_DEBUG_FUNCPTR (gst_rtp_bin_reset_sync);
3017   klass->get_session = GST_DEBUG_FUNCPTR (gst_rtp_bin_get_session);
3018   klass->get_internal_session =
3019       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_session);
3020   klass->get_storage = GST_DEBUG_FUNCPTR (gst_rtp_bin_get_storage);
3021   klass->get_internal_storage =
3022       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_storage);
3023   klass->clear_ssrc = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_ssrc);
3024   klass->request_rtp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
3025   klass->request_rtp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
3026   klass->request_rtcp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
3027   klass->request_rtcp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
3028   klass->request_jitterbuffer =
3029       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_jitterbuffer);
3030
3031   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
3032
3033   gst_type_mark_as_plugin_api (GST_RTP_BIN_RTCP_SYNC_TYPE, 0);
3034 }
3035
3036 static void
3037 gst_rtp_bin_init (GstRtpBin * rtpbin)
3038 {
3039   gchar *cname;
3040
3041   rtpbin->priv = gst_rtp_bin_get_instance_private (rtpbin);
3042   g_mutex_init (&rtpbin->priv->bin_lock);
3043   g_mutex_init (&rtpbin->priv->dyn_lock);
3044
3045   rtpbin->latency_ms = DEFAULT_LATENCY_MS;
3046   rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND;
3047   rtpbin->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
3048   rtpbin->do_lost = DEFAULT_DO_LOST;
3049   rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
3050   rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
3051   rtpbin->rtcp_sync = DEFAULT_RTCP_SYNC;
3052   rtpbin->rtcp_sync_interval = DEFAULT_RTCP_SYNC_INTERVAL;
3053   rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
3054   rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
3055   rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
3056   rtpbin->send_sync_event = DEFAULT_DO_SYNC_EVENT;
3057   rtpbin->do_retransmission = DEFAULT_DO_RETRANSMISSION;
3058   rtpbin->rtp_profile = DEFAULT_RTP_PROFILE;
3059   rtpbin->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
3060   rtpbin->rtcp_sync_send_time = DEFAULT_RTCP_SYNC_SEND_TIME;
3061   rtpbin->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
3062   rtpbin->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
3063   rtpbin->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
3064   rtpbin->rfc7273_sync = DEFAULT_RFC7273_SYNC;
3065   rtpbin->add_reference_timestamp_meta = DEFAULT_ADD_REFERENCE_TIMESTAMP_META;
3066   rtpbin->max_streams = DEFAULT_MAX_STREAMS;
3067   rtpbin->max_ts_offset_adjustment = DEFAULT_MAX_TS_OFFSET_ADJUSTMENT;
3068   rtpbin->max_ts_offset = DEFAULT_MAX_TS_OFFSET;
3069   rtpbin->max_ts_offset_is_set = FALSE;
3070   rtpbin->min_ts_offset = DEFAULT_MIN_TS_OFFSET;
3071   rtpbin->min_ts_offset_is_set = FALSE;
3072   rtpbin->ts_offset_smoothing_factor = DEFAULT_TS_OFFSET_SMOOTHING_FACTOR;
3073
3074   /* some default SDES entries */
3075   cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
3076   rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes",
3077       "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL);
3078   rtpbin->fec_decoders =
3079       gst_structure_new_empty ("application/x-rtp-fec-decoders");
3080   rtpbin->fec_encoders =
3081       gst_structure_new_empty ("application/x-rtp-fec-encoders");
3082   g_free (cname);
3083 }
3084
3085 static void
3086 gst_rtp_bin_dispose (GObject * object)
3087 {
3088   GstRtpBin *rtpbin;
3089
3090   rtpbin = GST_RTP_BIN (object);
3091
3092   GST_RTP_BIN_LOCK (rtpbin);
3093   GST_DEBUG_OBJECT (object, "freeing sessions");
3094   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, rtpbin);
3095   g_slist_free (rtpbin->sessions);
3096   rtpbin->sessions = NULL;
3097   GST_RTP_BIN_UNLOCK (rtpbin);
3098
3099   G_OBJECT_CLASS (parent_class)->dispose (object);
3100 }
3101
3102 static void
3103 gst_rtp_bin_finalize (GObject * object)
3104 {
3105   GstRtpBin *rtpbin;
3106
3107   rtpbin = GST_RTP_BIN (object);
3108
3109   if (rtpbin->sdes)
3110     gst_structure_free (rtpbin->sdes);
3111
3112   if (rtpbin->fec_decoders)
3113     gst_structure_free (rtpbin->fec_decoders);
3114
3115   if (rtpbin->fec_encoders)
3116     gst_structure_free (rtpbin->fec_encoders);
3117
3118   g_mutex_clear (&rtpbin->priv->bin_lock);
3119   g_mutex_clear (&rtpbin->priv->dyn_lock);
3120
3121   G_OBJECT_CLASS (parent_class)->finalize (object);
3122 }
3123
3124
3125 static void
3126 gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes)
3127 {
3128   GSList *item;
3129
3130   if (sdes == NULL)
3131     return;
3132
3133   GST_RTP_BIN_LOCK (bin);
3134
3135   GST_OBJECT_LOCK (bin);
3136   if (bin->sdes)
3137     gst_structure_free (bin->sdes);
3138   bin->sdes = gst_structure_copy (sdes);
3139   GST_OBJECT_UNLOCK (bin);
3140
3141   /* store in all sessions */
3142   for (item = bin->sessions; item; item = g_slist_next (item)) {
3143     GstRtpBinSession *session = item->data;
3144     g_object_set (session->session, "sdes", sdes, NULL);
3145   }
3146
3147   GST_RTP_BIN_UNLOCK (bin);
3148 }
3149
3150 static void
3151 gst_rtp_bin_set_fec_decoders_struct (GstRtpBin * bin,
3152     const GstStructure * decoders)
3153 {
3154   if (decoders == NULL)
3155     return;
3156
3157   GST_RTP_BIN_LOCK (bin);
3158
3159   GST_OBJECT_LOCK (bin);
3160   if (bin->fec_decoders)
3161     gst_structure_free (bin->fec_decoders);
3162   bin->fec_decoders = gst_structure_copy (decoders);
3163
3164   GST_OBJECT_UNLOCK (bin);
3165
3166   GST_RTP_BIN_UNLOCK (bin);
3167 }
3168
3169 static void
3170 gst_rtp_bin_set_fec_encoders_struct (GstRtpBin * bin,
3171     const GstStructure * encoders)
3172 {
3173   if (encoders == NULL)
3174     return;
3175
3176   GST_RTP_BIN_LOCK (bin);
3177
3178   GST_OBJECT_LOCK (bin);
3179   if (bin->fec_encoders)
3180     gst_structure_free (bin->fec_encoders);
3181   bin->fec_encoders = gst_structure_copy (encoders);
3182
3183   GST_OBJECT_UNLOCK (bin);
3184
3185   GST_RTP_BIN_UNLOCK (bin);
3186 }
3187
3188 static GstStructure *
3189 gst_rtp_bin_get_sdes_struct (GstRtpBin * bin)
3190 {
3191   GstStructure *result;
3192
3193   GST_OBJECT_LOCK (bin);
3194   result = gst_structure_copy (bin->sdes);
3195   GST_OBJECT_UNLOCK (bin);
3196
3197   return result;
3198 }
3199
3200 static GstStructure *
3201 gst_rtp_bin_get_fec_decoders_struct (GstRtpBin * bin)
3202 {
3203   GstStructure *result;
3204
3205   GST_OBJECT_LOCK (bin);
3206   result = gst_structure_copy (bin->fec_decoders);
3207   GST_OBJECT_UNLOCK (bin);
3208
3209   return result;
3210 }
3211
3212 static GstStructure *
3213 gst_rtp_bin_get_fec_encoders_struct (GstRtpBin * bin)
3214 {
3215   GstStructure *result;
3216
3217   GST_OBJECT_LOCK (bin);
3218   result = gst_structure_copy (bin->fec_encoders);
3219   GST_OBJECT_UNLOCK (bin);
3220
3221   return result;
3222 }
3223
3224 static void
3225 gst_rtp_bin_set_property (GObject * object, guint prop_id,
3226     const GValue * value, GParamSpec * pspec)
3227 {
3228   GstRtpBin *rtpbin;
3229
3230   rtpbin = GST_RTP_BIN (object);
3231
3232   switch (prop_id) {
3233     case PROP_LATENCY:
3234       GST_RTP_BIN_LOCK (rtpbin);
3235       rtpbin->latency_ms = g_value_get_uint (value);
3236       rtpbin->latency_ns = rtpbin->latency_ms * GST_MSECOND;
3237       GST_RTP_BIN_UNLOCK (rtpbin);
3238       /* propagate the property down to the jitterbuffer */
3239       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "latency", value);
3240       break;
3241     case PROP_DROP_ON_LATENCY:
3242       GST_RTP_BIN_LOCK (rtpbin);
3243       rtpbin->drop_on_latency = g_value_get_boolean (value);
3244       GST_RTP_BIN_UNLOCK (rtpbin);
3245       /* propagate the property down to the jitterbuffer */
3246       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3247           "drop-on-latency", value);
3248       break;
3249     case PROP_SDES:
3250       gst_rtp_bin_set_sdes_struct (rtpbin, g_value_get_boxed (value));
3251       break;
3252     case PROP_DO_LOST:
3253       GST_RTP_BIN_LOCK (rtpbin);
3254       rtpbin->do_lost = g_value_get_boolean (value);
3255       GST_RTP_BIN_UNLOCK (rtpbin);
3256       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
3257       break;
3258     case PROP_NTP_SYNC:
3259       rtpbin->ntp_sync = g_value_get_boolean (value);
3260       /* The default value of max_ts_offset depends on ntp_sync. If user
3261        * hasn't set it then change default value */
3262       if (!rtpbin->max_ts_offset_is_set) {
3263         if (rtpbin->ntp_sync) {
3264           rtpbin->max_ts_offset = 0;
3265         } else {
3266           rtpbin->max_ts_offset = DEFAULT_MAX_TS_OFFSET;
3267         }
3268       }
3269       if (!rtpbin->min_ts_offset_is_set) {
3270         if (rtpbin->ntp_sync) {
3271           rtpbin->min_ts_offset = 0;
3272         } else {
3273           rtpbin->min_ts_offset = DEFAULT_MIN_TS_OFFSET;
3274         }
3275       }
3276       break;
3277     case PROP_RTCP_SYNC:
3278       g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value));
3279       break;
3280     case PROP_RTCP_SYNC_INTERVAL:
3281       rtpbin->rtcp_sync_interval = g_value_get_uint (value);
3282       break;
3283     case PROP_IGNORE_PT:
3284       rtpbin->ignore_pt = g_value_get_boolean (value);
3285       break;
3286     case PROP_AUTOREMOVE:
3287       rtpbin->priv->autoremove = g_value_get_boolean (value);
3288       break;
3289     case PROP_USE_PIPELINE_CLOCK:
3290     {
3291       GSList *sessions;
3292       GST_RTP_BIN_LOCK (rtpbin);
3293       rtpbin->use_pipeline_clock = g_value_get_boolean (value);
3294       for (sessions = rtpbin->sessions; sessions;
3295           sessions = g_slist_next (sessions)) {
3296         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3297
3298         g_object_set (G_OBJECT (session->session),
3299             "use-pipeline-clock", rtpbin->use_pipeline_clock, NULL);
3300       }
3301       GST_RTP_BIN_UNLOCK (rtpbin);
3302     }
3303       break;
3304     case PROP_DO_SYNC_EVENT:
3305       rtpbin->send_sync_event = g_value_get_boolean (value);
3306       break;
3307     case PROP_BUFFER_MODE:
3308       GST_RTP_BIN_LOCK (rtpbin);
3309       rtpbin->buffer_mode = g_value_get_enum (value);
3310       GST_RTP_BIN_UNLOCK (rtpbin);
3311       /* propagate the property down to the jitterbuffer */
3312       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "mode", value);
3313       break;
3314     case PROP_DO_RETRANSMISSION:
3315       GST_RTP_BIN_LOCK (rtpbin);
3316       rtpbin->do_retransmission = g_value_get_boolean (value);
3317       GST_RTP_BIN_UNLOCK (rtpbin);
3318       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3319           "do-retransmission", value);
3320       break;
3321     case PROP_RTP_PROFILE:
3322       rtpbin->rtp_profile = g_value_get_enum (value);
3323       break;
3324     case PROP_NTP_TIME_SOURCE:{
3325       GSList *sessions;
3326       GST_RTP_BIN_LOCK (rtpbin);
3327       rtpbin->ntp_time_source = g_value_get_enum (value);
3328       for (sessions = rtpbin->sessions; sessions;
3329           sessions = g_slist_next (sessions)) {
3330         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3331
3332         g_object_set (G_OBJECT (session->session),
3333             "ntp-time-source", rtpbin->ntp_time_source, NULL);
3334       }
3335       GST_RTP_BIN_UNLOCK (rtpbin);
3336       break;
3337     }
3338     case PROP_RTCP_SYNC_SEND_TIME:{
3339       GSList *sessions;
3340       GST_RTP_BIN_LOCK (rtpbin);
3341       rtpbin->rtcp_sync_send_time = g_value_get_boolean (value);
3342       for (sessions = rtpbin->sessions; sessions;
3343           sessions = g_slist_next (sessions)) {
3344         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3345
3346         g_object_set (G_OBJECT (session->session),
3347             "rtcp-sync-send-time", rtpbin->rtcp_sync_send_time, NULL);
3348       }
3349       GST_RTP_BIN_UNLOCK (rtpbin);
3350       break;
3351     }
3352     case PROP_MAX_RTCP_RTP_TIME_DIFF:
3353       GST_RTP_BIN_LOCK (rtpbin);
3354       rtpbin->max_rtcp_rtp_time_diff = g_value_get_int (value);
3355       GST_RTP_BIN_UNLOCK (rtpbin);
3356       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3357           "max-rtcp-rtp-time-diff", value);
3358       break;
3359     case PROP_MAX_DROPOUT_TIME:
3360       GST_RTP_BIN_LOCK (rtpbin);
3361       rtpbin->max_dropout_time = g_value_get_uint (value);
3362       GST_RTP_BIN_UNLOCK (rtpbin);
3363       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3364           "max-dropout-time", value);
3365       gst_rtp_bin_propagate_property_to_session (rtpbin, "max-dropout-time",
3366           value);
3367       break;
3368     case PROP_MAX_MISORDER_TIME:
3369       GST_RTP_BIN_LOCK (rtpbin);
3370       rtpbin->max_misorder_time = g_value_get_uint (value);
3371       GST_RTP_BIN_UNLOCK (rtpbin);
3372       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3373           "max-misorder-time", value);
3374       gst_rtp_bin_propagate_property_to_session (rtpbin, "max-misorder-time",
3375           value);
3376       break;
3377     case PROP_RFC7273_SYNC:
3378       rtpbin->rfc7273_sync = g_value_get_boolean (value);
3379       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3380           "rfc7273-sync", value);
3381       break;
3382     case PROP_ADD_REFERENCE_TIMESTAMP_META:
3383       rtpbin->add_reference_timestamp_meta = g_value_get_boolean (value);
3384       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3385           "add-reference-timestamp-meta", value);
3386       break;
3387     case PROP_MAX_STREAMS:
3388       rtpbin->max_streams = g_value_get_uint (value);
3389       break;
3390     case PROP_MAX_TS_OFFSET_ADJUSTMENT:
3391       rtpbin->max_ts_offset_adjustment = g_value_get_uint64 (value);
3392       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3393           "max-ts-offset-adjustment", value);
3394       break;
3395     case PROP_MAX_TS_OFFSET:
3396       rtpbin->max_ts_offset = g_value_get_int64 (value);
3397       rtpbin->max_ts_offset_is_set = TRUE;
3398       break;
3399     case PROP_MIN_TS_OFFSET:
3400       rtpbin->min_ts_offset = g_value_get_uint64 (value);
3401       rtpbin->min_ts_offset_is_set = TRUE;
3402       break;
3403     case PROP_TS_OFFSET_SMOOTHING_FACTOR:
3404       rtpbin->ts_offset_smoothing_factor = g_value_get_uint (value);
3405       break;
3406     case PROP_FEC_DECODERS:
3407       gst_rtp_bin_set_fec_decoders_struct (rtpbin, g_value_get_boxed (value));
3408       break;
3409     case PROP_FEC_ENCODERS:
3410       gst_rtp_bin_set_fec_encoders_struct (rtpbin, g_value_get_boxed (value));
3411       break;
3412     default:
3413       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3414       break;
3415   }
3416 }
3417
3418 static void
3419 gst_rtp_bin_get_property (GObject * object, guint prop_id,
3420     GValue * value, GParamSpec * pspec)
3421 {
3422   GstRtpBin *rtpbin;
3423
3424   rtpbin = GST_RTP_BIN (object);
3425
3426   switch (prop_id) {
3427     case PROP_LATENCY:
3428       GST_RTP_BIN_LOCK (rtpbin);
3429       g_value_set_uint (value, rtpbin->latency_ms);
3430       GST_RTP_BIN_UNLOCK (rtpbin);
3431       break;
3432     case PROP_DROP_ON_LATENCY:
3433       GST_RTP_BIN_LOCK (rtpbin);
3434       g_value_set_boolean (value, rtpbin->drop_on_latency);
3435       GST_RTP_BIN_UNLOCK (rtpbin);
3436       break;
3437     case PROP_SDES:
3438       g_value_take_boxed (value, gst_rtp_bin_get_sdes_struct (rtpbin));
3439       break;
3440     case PROP_DO_LOST:
3441       GST_RTP_BIN_LOCK (rtpbin);
3442       g_value_set_boolean (value, rtpbin->do_lost);
3443       GST_RTP_BIN_UNLOCK (rtpbin);
3444       break;
3445     case PROP_IGNORE_PT:
3446       g_value_set_boolean (value, rtpbin->ignore_pt);
3447       break;
3448     case PROP_NTP_SYNC:
3449       g_value_set_boolean (value, rtpbin->ntp_sync);
3450       break;
3451     case PROP_RTCP_SYNC:
3452       g_value_set_enum (value, g_atomic_int_get (&rtpbin->rtcp_sync));
3453       break;
3454     case PROP_RTCP_SYNC_INTERVAL:
3455       g_value_set_uint (value, rtpbin->rtcp_sync_interval);
3456       break;
3457     case PROP_AUTOREMOVE:
3458       g_value_set_boolean (value, rtpbin->priv->autoremove);
3459       break;
3460     case PROP_BUFFER_MODE:
3461       g_value_set_enum (value, rtpbin->buffer_mode);
3462       break;
3463     case PROP_USE_PIPELINE_CLOCK:
3464       g_value_set_boolean (value, rtpbin->use_pipeline_clock);
3465       break;
3466     case PROP_DO_SYNC_EVENT:
3467       g_value_set_boolean (value, rtpbin->send_sync_event);
3468       break;
3469     case PROP_DO_RETRANSMISSION:
3470       GST_RTP_BIN_LOCK (rtpbin);
3471       g_value_set_boolean (value, rtpbin->do_retransmission);
3472       GST_RTP_BIN_UNLOCK (rtpbin);
3473       break;
3474     case PROP_RTP_PROFILE:
3475       g_value_set_enum (value, rtpbin->rtp_profile);
3476       break;
3477     case PROP_NTP_TIME_SOURCE:
3478       g_value_set_enum (value, rtpbin->ntp_time_source);
3479       break;
3480     case PROP_RTCP_SYNC_SEND_TIME:
3481       g_value_set_boolean (value, rtpbin->rtcp_sync_send_time);
3482       break;
3483     case PROP_MAX_RTCP_RTP_TIME_DIFF:
3484       GST_RTP_BIN_LOCK (rtpbin);
3485       g_value_set_int (value, rtpbin->max_rtcp_rtp_time_diff);
3486       GST_RTP_BIN_UNLOCK (rtpbin);
3487       break;
3488     case PROP_MAX_DROPOUT_TIME:
3489       g_value_set_uint (value, rtpbin->max_dropout_time);
3490       break;
3491     case PROP_MAX_MISORDER_TIME:
3492       g_value_set_uint (value, rtpbin->max_misorder_time);
3493       break;
3494     case PROP_RFC7273_SYNC:
3495       g_value_set_boolean (value, rtpbin->rfc7273_sync);
3496       break;
3497     case PROP_ADD_REFERENCE_TIMESTAMP_META:
3498       g_value_set_boolean (value, rtpbin->add_reference_timestamp_meta);
3499       break;
3500     case PROP_MAX_STREAMS:
3501       g_value_set_uint (value, rtpbin->max_streams);
3502       break;
3503     case PROP_MAX_TS_OFFSET_ADJUSTMENT:
3504       g_value_set_uint64 (value, rtpbin->max_ts_offset_adjustment);
3505       break;
3506     case PROP_MAX_TS_OFFSET:
3507       g_value_set_int64 (value, rtpbin->max_ts_offset);
3508       break;
3509     case PROP_MIN_TS_OFFSET:
3510       g_value_set_uint64 (value, rtpbin->min_ts_offset);
3511       break;
3512     case PROP_TS_OFFSET_SMOOTHING_FACTOR:
3513       g_value_set_uint (value, rtpbin->ts_offset_smoothing_factor);
3514       break;
3515     case PROP_FEC_DECODERS:
3516       g_value_take_boxed (value, gst_rtp_bin_get_fec_decoders_struct (rtpbin));
3517       break;
3518     case PROP_FEC_ENCODERS:
3519       g_value_take_boxed (value, gst_rtp_bin_get_fec_encoders_struct (rtpbin));
3520       break;
3521     default:
3522       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3523       break;
3524   }
3525 }
3526
3527 static void
3528 gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
3529 {
3530   GstRtpBin *rtpbin;
3531
3532   rtpbin = GST_RTP_BIN (bin);
3533
3534   switch (GST_MESSAGE_TYPE (message)) {
3535     case GST_MESSAGE_ELEMENT:
3536     {
3537       const GstStructure *s = gst_message_get_structure (message);
3538
3539       /* we change the structure name and add the session ID to it */
3540       if (gst_structure_has_name (s, "application/x-rtp-source-sdes")) {
3541         GstRtpBinSession *sess;
3542
3543         /* find the session we set it as object data */
3544         sess = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
3545             "GstRTPBin.session");
3546
3547         if (G_LIKELY (sess)) {
3548           message = gst_message_make_writable (message);
3549           s = gst_message_get_structure (message);
3550           gst_structure_set ((GstStructure *) s, "session", G_TYPE_UINT,
3551               sess->id, NULL);
3552         }
3553       }
3554       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
3555       break;
3556     }
3557     case GST_MESSAGE_BUFFERING:
3558     {
3559       gint percent;
3560       gint min_percent = 100;
3561       GSList *sessions, *streams;
3562       GstRtpBinStream *stream;
3563       gboolean change = FALSE, active = FALSE;
3564       GstClockTime min_out_time;
3565       GstBufferingMode mode;
3566       gint avg_in, avg_out;
3567       gint64 buffering_left;
3568
3569       gst_message_parse_buffering (message, &percent);
3570       gst_message_parse_buffering_stats (message, &mode, &avg_in, &avg_out,
3571           &buffering_left);
3572
3573       stream =
3574           g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
3575           "GstRTPBin.stream");
3576
3577       GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
3578
3579       /* get the stream */
3580       if (G_LIKELY (stream)) {
3581         GST_RTP_BIN_LOCK (rtpbin);
3582         /* fill in the percent */
3583         stream->percent = percent;
3584
3585         /* calculate the min value for all streams */
3586         for (sessions = rtpbin->sessions; sessions;
3587             sessions = g_slist_next (sessions)) {
3588           GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3589
3590           GST_RTP_SESSION_LOCK (session);
3591           if (session->streams) {
3592             for (streams = session->streams; streams;
3593                 streams = g_slist_next (streams)) {
3594               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
3595
3596               GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
3597                   stream->percent);
3598
3599               /* find min percent */
3600               if (min_percent > stream->percent)
3601                 min_percent = stream->percent;
3602             }
3603           } else {
3604             GST_INFO_OBJECT (bin,
3605                 "session has no streams, setting min_percent to 0");
3606             min_percent = 0;
3607           }
3608           GST_RTP_SESSION_UNLOCK (session);
3609         }
3610         GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
3611
3612         if (rtpbin->buffering) {
3613           if (min_percent == 100) {
3614             rtpbin->buffering = FALSE;
3615             active = TRUE;
3616             change = TRUE;
3617           }
3618         } else {
3619           if (min_percent < 100) {
3620             /* pause the streams */
3621             rtpbin->buffering = TRUE;
3622             active = FALSE;
3623             change = TRUE;
3624           }
3625         }
3626         GST_RTP_BIN_UNLOCK (rtpbin);
3627
3628         gst_message_unref (message);
3629
3630         /* make a new buffering message with the min value */
3631         message =
3632             gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
3633         gst_message_set_buffering_stats (message, mode, avg_in, avg_out,
3634             buffering_left);
3635
3636         if (G_UNLIKELY (change)) {
3637           GstClock *clock;
3638           guint64 running_time = 0;
3639           guint64 offset = 0;
3640
3641           /* figure out the running time when we have a clock */
3642           if (G_LIKELY ((clock =
3643                       gst_element_get_clock (GST_ELEMENT_CAST (bin))))) {
3644             guint64 now, base_time;
3645
3646             now = gst_clock_get_time (clock);
3647             base_time = gst_element_get_base_time (GST_ELEMENT_CAST (bin));
3648             running_time = now - base_time;
3649             gst_object_unref (clock);
3650           }
3651           GST_DEBUG_OBJECT (bin,
3652               "running time now %" GST_TIME_FORMAT,
3653               GST_TIME_ARGS (running_time));
3654
3655           GST_RTP_BIN_LOCK (rtpbin);
3656
3657           /* when we reactivate, calculate the offsets so that all streams have
3658            * an output time that is at least as big as the running_time */
3659           offset = 0;
3660           if (active) {
3661             if (running_time > rtpbin->buffer_start) {
3662               offset = running_time - rtpbin->buffer_start;
3663               if (offset >= rtpbin->latency_ns)
3664                 offset -= rtpbin->latency_ns;
3665               else
3666                 offset = 0;
3667             }
3668           }
3669
3670           /* pause all streams */
3671           min_out_time = -1;
3672           for (sessions = rtpbin->sessions; sessions;
3673               sessions = g_slist_next (sessions)) {
3674             GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3675
3676             GST_RTP_SESSION_LOCK (session);
3677             for (streams = session->streams; streams;
3678                 streams = g_slist_next (streams)) {
3679               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
3680               GstElement *element = stream->buffer;
3681               guint64 last_out = -1;
3682
3683               if (g_signal_lookup ("set-active", G_OBJECT_TYPE (element)) != 0) {
3684                 g_signal_emit_by_name (element, "set-active", active, offset,
3685                     &last_out);
3686               }
3687
3688               if (!active) {
3689                 g_object_get (element, "percent", &stream->percent, NULL);
3690
3691                 if (last_out == -1)
3692                   last_out = 0;
3693                 if (min_out_time == -1 || last_out < min_out_time)
3694                   min_out_time = last_out;
3695               }
3696
3697               GST_DEBUG_OBJECT (bin,
3698                   "setting %p to %d, offset %" GST_TIME_FORMAT ", last %"
3699                   GST_TIME_FORMAT ", percent %d", element, active,
3700                   GST_TIME_ARGS (offset), GST_TIME_ARGS (last_out),
3701                   stream->percent);
3702             }
3703             GST_RTP_SESSION_UNLOCK (session);
3704           }
3705           GST_DEBUG_OBJECT (bin,
3706               "min out time %" GST_TIME_FORMAT, GST_TIME_ARGS (min_out_time));
3707
3708           /* the buffer_start is the min out time of all paused jitterbuffers */
3709           if (!active)
3710             rtpbin->buffer_start = min_out_time;
3711
3712           GST_RTP_BIN_UNLOCK (rtpbin);
3713         }
3714       }
3715       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
3716       break;
3717     }
3718     default:
3719     {
3720       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
3721       break;
3722     }
3723   }
3724 }
3725
3726 static GstStateChangeReturn
3727 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
3728 {
3729   GstStateChangeReturn res;
3730   GstRtpBin *rtpbin;
3731   GstRtpBinPrivate *priv;
3732
3733   rtpbin = GST_RTP_BIN (element);
3734   priv = rtpbin->priv;
3735
3736   switch (transition) {
3737     case GST_STATE_CHANGE_NULL_TO_READY:
3738       break;
3739     case GST_STATE_CHANGE_READY_TO_PAUSED:
3740       priv->last_ntpnstime = 0;
3741       GST_LOG_OBJECT (rtpbin, "clearing shutdown flag");
3742       g_atomic_int_set (&priv->shutdown, 0);
3743       break;
3744     case GST_STATE_CHANGE_PAUSED_TO_READY:
3745       GST_LOG_OBJECT (rtpbin, "setting shutdown flag");
3746       g_atomic_int_set (&priv->shutdown, 1);
3747       /* wait for all callbacks to end by taking the lock. No new callbacks will
3748        * be able to happen as we set the shutdown flag. */
3749       GST_RTP_BIN_DYN_LOCK (rtpbin);
3750       GST_LOG_OBJECT (rtpbin, "dynamic lock taken, we can continue shutdown");
3751       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
3752       break;
3753     default:
3754       break;
3755   }
3756
3757   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3758
3759   switch (transition) {
3760     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3761       break;
3762     case GST_STATE_CHANGE_PAUSED_TO_READY:
3763       break;
3764     case GST_STATE_CHANGE_READY_TO_NULL:
3765       break;
3766     default:
3767       break;
3768   }
3769   return res;
3770 }
3771
3772 static GstElement *
3773 session_request_element_full (GstRtpBinSession * session, guint signal,
3774     guint ssrc, guint8 pt)
3775 {
3776   GstElement *element = NULL;
3777   GstRtpBin *bin = session->bin;
3778
3779   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, ssrc, pt,
3780       &element);
3781
3782   if (element) {
3783     if (!bin_manage_element (bin, element))
3784       goto manage_failed;
3785     session->elements = g_slist_prepend (session->elements, element);
3786   }
3787   return element;
3788
3789   /* ERRORS */
3790 manage_failed:
3791   {
3792     GST_WARNING_OBJECT (bin, "unable to manage element");
3793     gst_object_unref (element);
3794     return NULL;
3795   }
3796 }
3797
3798 static GstElement *
3799 session_request_element (GstRtpBinSession * session, guint signal)
3800 {
3801   GstElement *element = NULL;
3802   GstRtpBin *bin = session->bin;
3803
3804   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, &element);
3805
3806   if (element) {
3807     if (!bin_manage_element (bin, element))
3808       goto manage_failed;
3809     session->elements = g_slist_prepend (session->elements, element);
3810   }
3811   return element;
3812
3813   /* ERRORS */
3814 manage_failed:
3815   {
3816     GST_WARNING_OBJECT (bin, "unable to manage element");
3817     gst_object_unref (element);
3818     return NULL;
3819   }
3820 }
3821
3822 static gboolean
3823 copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
3824 {
3825   GstPad *gpad = GST_PAD_CAST (user_data);
3826
3827   GST_DEBUG_OBJECT (gpad, "store sticky event %" GST_PTR_FORMAT, *event);
3828   gst_pad_store_sticky_event (gpad, *event);
3829
3830   return TRUE;
3831 }
3832
3833 static gboolean
3834 ensure_early_fec_decoder (GstRtpBin * rtpbin, GstRtpBinSession * session)
3835 {
3836   const gchar *factory;
3837   gchar *sess_id_str;
3838
3839   if (session->early_fec_decoder)
3840     goto done;
3841
3842   sess_id_str = g_strdup_printf ("%u", session->id);
3843   factory = gst_structure_get_string (rtpbin->fec_decoders, sess_id_str);
3844   g_free (sess_id_str);
3845
3846   /* First try the property */
3847   if (factory) {
3848     GError *err = NULL;
3849
3850     session->early_fec_decoder =
3851         gst_parse_bin_from_description_full (factory, TRUE, NULL,
3852         GST_PARSE_FLAG_NO_SINGLE_ELEMENT_BINS | GST_PARSE_FLAG_FATAL_ERRORS,
3853         &err);
3854     if (!session->early_fec_decoder) {
3855       GST_ERROR_OBJECT (rtpbin, "Failed to build decoder from factory: %s",
3856           err->message);
3857     }
3858
3859     bin_manage_element (session->bin, session->early_fec_decoder);
3860     session->elements =
3861         g_slist_prepend (session->elements, session->early_fec_decoder);
3862     GST_INFO_OBJECT (rtpbin, "Built FEC decoder: %" GST_PTR_FORMAT
3863         " for session %u", session->early_fec_decoder, session->id);
3864   }
3865
3866   /* Do not fallback to the signal as the signal expects a fec decoder to
3867    * be placed at a different place in the pipeline */
3868
3869 done:
3870   return session->early_fec_decoder != NULL;
3871 }
3872
3873 static void
3874 expose_recv_src_pad (GstRtpBin * rtpbin, GstPad * pad, GstRtpBinStream * stream,
3875     guint8 pt)
3876 {
3877   GstElementClass *klass;
3878   GstPadTemplate *templ;
3879   gchar *padname;
3880   GstPad *gpad;
3881
3882   gst_object_ref (pad);
3883
3884   if (stream->session->storage) {
3885     /* First try the legacy signal, with no ssrc and pt as parameters.
3886      * This will likely cause issues for the BUNDLE case. */
3887     GstElement *fec_decoder =
3888         session_request_element (stream->session, SIGNAL_REQUEST_FEC_DECODER);
3889
3890     /* Now try the new signal, where the application can provide a FEC
3891      * decoder according to ssrc and pt. */
3892     if (!fec_decoder) {
3893       fec_decoder =
3894           session_request_element_full (stream->session,
3895           SIGNAL_REQUEST_FEC_DECODER_FULL, stream->ssrc, pt);
3896     }
3897
3898     if (fec_decoder) {
3899       GstPad *sinkpad, *srcpad;
3900       GstPadLinkReturn ret;
3901
3902       sinkpad = gst_element_get_static_pad (fec_decoder, "sink");
3903
3904       if (!sinkpad)
3905         goto fec_decoder_sink_failed;
3906
3907       ret = gst_pad_link (pad, sinkpad);
3908       gst_object_unref (sinkpad);
3909
3910       if (ret != GST_PAD_LINK_OK)
3911         goto fec_decoder_link_failed;
3912
3913       srcpad = gst_element_get_static_pad (fec_decoder, "src");
3914
3915       if (!srcpad)
3916         goto fec_decoder_src_failed;
3917
3918       gst_pad_sticky_events_foreach (pad, copy_sticky_events, srcpad);
3919       gst_object_unref (pad);
3920       pad = srcpad;
3921     }
3922   }
3923
3924   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
3925
3926   /* ghost the pad to the parent */
3927   klass = GST_ELEMENT_GET_CLASS (rtpbin);
3928   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
3929   padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
3930       stream->session->id, stream->ssrc, pt);
3931   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
3932   g_free (padname);
3933   g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", gpad);
3934
3935   gst_pad_set_active (gpad, TRUE);
3936   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
3937
3938   gst_pad_sticky_events_foreach (pad, copy_sticky_events, gpad);
3939   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
3940
3941 done:
3942   gst_object_unref (pad);
3943
3944   return;
3945
3946 shutdown:
3947   {
3948     GST_DEBUG ("ignoring, we are shutting down");
3949     goto done;
3950   }
3951 fec_decoder_sink_failed:
3952   {
3953     g_warning ("rtpbin: failed to get fec encoder sink pad for session %u",
3954         stream->session->id);
3955     goto done;
3956   }
3957 fec_decoder_src_failed:
3958   {
3959     g_warning ("rtpbin: failed to get fec encoder src pad for session %u",
3960         stream->session->id);
3961     goto done;
3962   }
3963 fec_decoder_link_failed:
3964   {
3965     g_warning ("rtpbin: failed to link fec decoder for session %u",
3966         stream->session->id);
3967     goto done;
3968   }
3969 }
3970
3971 /* a new pad (SSRC) was created in @session. This signal is emitted from the
3972  * payload demuxer. */
3973 static void
3974 new_payload_found (GstElement * element, guint pt, GstPad * pad,
3975     GstRtpBinStream * stream)
3976 {
3977   GstRtpBin *rtpbin;
3978
3979   rtpbin = stream->bin;
3980
3981   GST_DEBUG_OBJECT (rtpbin, "new payload pad %u", pt);
3982
3983   expose_recv_src_pad (rtpbin, pad, stream, pt);
3984 }
3985
3986 static void
3987 payload_pad_removed (GstElement * element, GstPad * pad,
3988     GstRtpBinStream * stream)
3989 {
3990   GstRtpBin *rtpbin;
3991   GstPad *gpad;
3992
3993   rtpbin = stream->bin;
3994
3995   GST_DEBUG ("payload pad removed");
3996
3997   GST_RTP_BIN_DYN_LOCK (rtpbin);
3998   if ((gpad = g_object_get_data (G_OBJECT (pad), "GstRTPBin.ghostpad"))) {
3999     g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", NULL);
4000
4001     gst_pad_set_active (gpad, FALSE);
4002     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), gpad);
4003   }
4004   GST_RTP_BIN_DYN_UNLOCK (rtpbin);
4005 }
4006
4007 static GstCaps *
4008 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
4009 {
4010   GstRtpBin *rtpbin;
4011   GstCaps *caps;
4012
4013   rtpbin = session->bin;
4014
4015   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %u in session %u", pt,
4016       session->id);
4017
4018   caps = get_pt_map (session, pt);
4019   if (!caps)
4020     goto no_caps;
4021
4022   return caps;
4023
4024   /* ERRORS */
4025 no_caps:
4026   {
4027     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
4028     return NULL;
4029   }
4030 }
4031
4032 static GstCaps *
4033 ptdemux_pt_map_requested (GstElement * element, guint pt,
4034     GstRtpBinSession * session)
4035 {
4036   GstCaps *ret = pt_map_requested (element, pt, session);
4037
4038   if (ret && gst_caps_get_size (ret) == 1) {
4039     const GstStructure *s = gst_caps_get_structure (ret, 0);
4040     gboolean is_fec;
4041
4042     if (gst_structure_get_boolean (s, "is-fec", &is_fec) && is_fec) {
4043       GValue v = G_VALUE_INIT;
4044       GValue v2 = G_VALUE_INIT;
4045
4046       GST_INFO_OBJECT (session->bin, "Will ignore FEC pt %u in session %u", pt,
4047           session->id);
4048       g_value_init (&v, GST_TYPE_ARRAY);
4049       g_value_init (&v2, G_TYPE_INT);
4050       g_object_get_property (G_OBJECT (element), "ignored-payload-types", &v);
4051       g_value_set_int (&v2, pt);
4052       gst_value_array_append_value (&v, &v2);
4053       g_value_unset (&v2);
4054       g_object_set_property (G_OBJECT (element), "ignored-payload-types", &v);
4055       g_value_unset (&v);
4056     }
4057   }
4058
4059   return ret;
4060 }
4061
4062 static void
4063 payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session)
4064 {
4065   GST_DEBUG_OBJECT (session->bin,
4066       "emitting signal for pt type changed to %u in session %u", pt,
4067       session->id);
4068
4069   g_signal_emit (session->bin, gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE],
4070       0, session->id, pt);
4071 }
4072
4073 /* emitted when caps changed for the session */
4074 static void
4075 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
4076 {
4077   GstRtpBin *bin;
4078   GstCaps *caps;
4079   gint payload;
4080   const GstStructure *s;
4081
4082   bin = session->bin;
4083
4084   g_object_get (pad, "caps", &caps, NULL);
4085
4086   if (caps == NULL)
4087     return;
4088
4089   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
4090
4091   s = gst_caps_get_structure (caps, 0);
4092
4093   /* get payload, finish when it's not there */
4094   if (!gst_structure_get_int (s, "payload", &payload)) {
4095     gst_caps_unref (caps);
4096     return;
4097   }
4098
4099   GST_RTP_SESSION_LOCK (session);
4100   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
4101   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
4102   GST_RTP_SESSION_UNLOCK (session);
4103 }
4104
4105 /* a new pad (SSRC) was created in @session */
4106 static void
4107 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
4108     GstRtpBinSession * session)
4109 {
4110   GstRtpBin *rtpbin;
4111   GstRtpBinStream *stream;
4112   GstPad *sinkpad, *srcpad;
4113   gchar *padname;
4114
4115   rtpbin = session->bin;
4116
4117   GST_DEBUG_OBJECT (rtpbin, "new SSRC pad %08x, %s:%s", ssrc,
4118       GST_DEBUG_PAD_NAME (pad));
4119
4120   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
4121
4122   GST_RTP_SESSION_LOCK (session);
4123
4124   /* create new stream */
4125   stream = create_stream (session, ssrc);
4126   if (!stream)
4127     goto no_stream;
4128
4129   /* get pad and link */
4130   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP");
4131   padname = g_strdup_printf ("src_%u", ssrc);
4132   srcpad = gst_element_get_static_pad (element, padname);
4133   g_free (padname);
4134
4135   if (session->early_fec_decoder) {
4136     GST_DEBUG_OBJECT (rtpbin, "linking fec decoder");
4137     sinkpad = gst_element_get_static_pad (session->early_fec_decoder, "sink");
4138     gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
4139     gst_object_unref (sinkpad);
4140     gst_object_unref (srcpad);
4141     srcpad = gst_element_get_static_pad (session->early_fec_decoder, "src");
4142   }
4143
4144   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
4145   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
4146   gst_object_unref (sinkpad);
4147   gst_object_unref (srcpad);
4148
4149   sinkpad = gst_element_request_pad_simple (stream->buffer, "sink_rtcp");
4150   if (sinkpad) {
4151     GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
4152     padname = g_strdup_printf ("rtcp_src_%u", ssrc);
4153     srcpad = gst_element_get_static_pad (element, padname);
4154     g_free (padname);
4155     gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
4156     gst_object_unref (sinkpad);
4157     gst_object_unref (srcpad);
4158   }
4159
4160   if (g_signal_lookup ("handle-sync", G_OBJECT_TYPE (stream->buffer)) != 0) {
4161     /* connect to the RTCP sync signal from the jitterbuffer */
4162     GST_DEBUG_OBJECT (rtpbin, "connecting sync signal");
4163     stream->buffer_handlesync_sig = g_signal_connect (stream->buffer,
4164         "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream);
4165   }
4166
4167   if (stream->demux) {
4168     /* connect to the new-pad signal of the payload demuxer, this will expose the
4169      * new pad by ghosting it. */
4170     stream->demux_newpad_sig = g_signal_connect (stream->demux,
4171         "new-payload-type", (GCallback) new_payload_found, stream);
4172     stream->demux_padremoved_sig = g_signal_connect (stream->demux,
4173         "pad-removed", (GCallback) payload_pad_removed, stream);
4174
4175     /* connect to the request-pt-map signal. This signal will be emitted by the
4176      * demuxer so that it can apply a proper caps on the buffers for the
4177      * depayloaders. */
4178     stream->demux_ptreq_sig = g_signal_connect (stream->demux,
4179         "request-pt-map", (GCallback) ptdemux_pt_map_requested, session);
4180     /* connect to the  signal so it can be forwarded. */
4181     stream->demux_ptchange_sig = g_signal_connect (stream->demux,
4182         "payload-type-change", (GCallback) payload_type_change, session);
4183
4184     GST_RTP_SESSION_UNLOCK (session);
4185     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
4186   } else {
4187     /* add rtpjitterbuffer src pad to pads */
4188     GstPad *pad;
4189
4190     pad = gst_element_get_static_pad (stream->buffer, "src");
4191
4192     GST_RTP_SESSION_UNLOCK (session);
4193     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
4194
4195     expose_recv_src_pad (rtpbin, pad, stream, 255);
4196
4197     gst_object_unref (pad);
4198   }
4199
4200   return;
4201
4202   /* ERRORS */
4203 shutdown:
4204   {
4205     GST_DEBUG_OBJECT (rtpbin, "we are shutting down");
4206     return;
4207   }
4208 no_stream:
4209   {
4210     GST_RTP_SESSION_UNLOCK (session);
4211     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
4212     GST_DEBUG_OBJECT (rtpbin, "could not create stream");
4213     return;
4214   }
4215 }
4216
4217 static GstPad *
4218 complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session)
4219 {
4220   guint sessid = session->id;
4221   GstPad *recv_rtp_sink;
4222   GstElement *decoder;
4223
4224   g_assert (!session->recv_rtp_sink);
4225
4226   /* get recv_rtp pad and store */
4227   session->recv_rtp_sink =
4228       gst_element_request_pad_simple (session->session, "recv_rtp_sink");
4229   if (session->recv_rtp_sink == NULL)
4230     goto pad_failed;
4231
4232   g_signal_connect (session->recv_rtp_sink, "notify::caps",
4233       (GCallback) caps_changed, session);
4234
4235   GST_DEBUG_OBJECT (rtpbin, "requesting RTP decoder");
4236   decoder = session_request_element (session, SIGNAL_REQUEST_RTP_DECODER);
4237   if (decoder) {
4238     GstPad *decsrc, *decsink;
4239     GstPadLinkReturn ret;
4240
4241     GST_DEBUG_OBJECT (rtpbin, "linking RTP decoder");
4242     decsink = gst_element_get_static_pad (decoder, "rtp_sink");
4243     if (decsink == NULL)
4244       goto dec_sink_failed;
4245
4246     recv_rtp_sink = decsink;
4247
4248     decsrc = gst_element_get_static_pad (decoder, "rtp_src");
4249     if (decsrc == NULL)
4250       goto dec_src_failed;
4251
4252     ret = gst_pad_link (decsrc, session->recv_rtp_sink);
4253
4254     gst_object_unref (decsrc);
4255
4256     if (ret != GST_PAD_LINK_OK)
4257       goto dec_link_failed;
4258
4259   } else {
4260     GST_DEBUG_OBJECT (rtpbin, "no RTP decoder given");
4261     recv_rtp_sink = gst_object_ref (session->recv_rtp_sink);
4262   }
4263
4264   return recv_rtp_sink;
4265
4266   /* ERRORS */
4267 pad_failed:
4268   {
4269     g_warning ("rtpbin: failed to get session recv_rtp_sink pad");
4270     return NULL;
4271   }
4272 dec_sink_failed:
4273   {
4274     g_warning ("rtpbin: failed to get decoder sink pad for session %u", sessid);
4275     return NULL;
4276   }
4277 dec_src_failed:
4278   {
4279     g_warning ("rtpbin: failed to get decoder src pad for session %u", sessid);
4280     gst_object_unref (recv_rtp_sink);
4281     return NULL;
4282   }
4283 dec_link_failed:
4284   {
4285     g_warning ("rtpbin: failed to link rtp decoder for session %u", sessid);
4286     gst_object_unref (recv_rtp_sink);
4287     return NULL;
4288   }
4289 }
4290
4291 static void
4292 complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session,
4293     guint sessid)
4294 {
4295   GstElement *aux;
4296   GstPad *recv_rtp_src;
4297
4298   g_assert (!session->recv_rtp_src);
4299
4300   session->recv_rtp_src =
4301       gst_element_get_static_pad (session->session, "recv_rtp_src");
4302   if (session->recv_rtp_src == NULL)
4303     goto pad_failed;
4304
4305   /* find out if we need AUX elements */
4306   aux = session_request_element (session, SIGNAL_REQUEST_AUX_RECEIVER);
4307   if (aux) {
4308     gchar *pname;
4309     GstPad *auxsink;
4310     GstPadLinkReturn ret;
4311
4312     GST_DEBUG_OBJECT (rtpbin, "linking AUX receiver");
4313
4314     pname = g_strdup_printf ("sink_%u", sessid);
4315     auxsink = gst_element_get_static_pad (aux, pname);
4316     g_free (pname);
4317     if (auxsink == NULL)
4318       goto aux_sink_failed;
4319
4320     ret = gst_pad_link (session->recv_rtp_src, auxsink);
4321     gst_object_unref (auxsink);
4322     if (ret != GST_PAD_LINK_OK)
4323       goto aux_link_failed;
4324
4325     /* this can be NULL when this AUX element is not to be linked any further */
4326     pname = g_strdup_printf ("src_%u", sessid);
4327     recv_rtp_src = gst_element_get_static_pad (aux, pname);
4328     g_free (pname);
4329   } else {
4330     recv_rtp_src = gst_object_ref (session->recv_rtp_src);
4331   }
4332
4333   /* Add a storage element if needed */
4334   if (recv_rtp_src && session->storage) {
4335     GstPadLinkReturn ret;
4336     GstPad *sinkpad = gst_element_get_static_pad (session->storage, "sink");
4337
4338     ret = gst_pad_link (recv_rtp_src, sinkpad);
4339
4340     gst_object_unref (sinkpad);
4341     gst_object_unref (recv_rtp_src);
4342
4343     if (ret != GST_PAD_LINK_OK)
4344       goto storage_link_failed;
4345
4346     recv_rtp_src = gst_element_get_static_pad (session->storage, "src");
4347   }
4348
4349   if (recv_rtp_src) {
4350     GstPad *sinkdpad;
4351
4352     GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
4353     sinkdpad = gst_element_get_static_pad (session->demux, "sink");
4354     GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
4355     gst_pad_link_full (recv_rtp_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
4356     gst_object_unref (sinkdpad);
4357     gst_object_unref (recv_rtp_src);
4358
4359     /* connect to the new-ssrc-pad signal of the SSRC demuxer */
4360     session->demux_newpad_sig = g_signal_connect (session->demux,
4361         "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
4362     session->demux_padremoved_sig = g_signal_connect (session->demux,
4363         "removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session);
4364   }
4365
4366   return;
4367
4368 pad_failed:
4369   {
4370     g_warning ("rtpbin: failed to get session recv_rtp_src pad");
4371     return;
4372   }
4373 aux_sink_failed:
4374   {
4375     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
4376     return;
4377   }
4378 aux_link_failed:
4379   {
4380     g_warning ("rtpbin: failed to link AUX pad to session %u", sessid);
4381     return;
4382   }
4383 storage_link_failed:
4384   {
4385     g_warning ("rtpbin: failed to link storage");
4386     return;
4387   }
4388 }
4389
4390 /* Create a pad for receiving RTP for the session in @name. Must be called with
4391  * RTP_BIN_LOCK.
4392  */
4393 static GstPad *
4394 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
4395 {
4396   guint sessid;
4397   GstRtpBinSession *session;
4398   GstPad *recv_rtp_sink;
4399
4400   /* first get the session number */
4401   if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1)
4402     goto no_name;
4403
4404   GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
4405
4406   /* get or create session */
4407   session = find_session_by_id (rtpbin, sessid);
4408   if (!session) {
4409     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
4410     /* create session now */
4411     session = create_session (rtpbin, sessid);
4412     if (session == NULL)
4413       goto create_error;
4414   }
4415
4416   /* check if pad was requested */
4417   if (session->recv_rtp_sink_ghost != NULL)
4418     return session->recv_rtp_sink_ghost;
4419
4420   /* setup the session sink pad */
4421   recv_rtp_sink = complete_session_sink (rtpbin, session);
4422   if (!recv_rtp_sink)
4423     goto session_sink_failed;
4424
4425   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
4426   session->recv_rtp_sink_ghost =
4427       gst_ghost_pad_new_from_template (name, recv_rtp_sink, templ);
4428   gst_object_unref (recv_rtp_sink);
4429
4430   complete_session_receiver (rtpbin, session, sessid);
4431
4432   return session->recv_rtp_sink_ghost;
4433
4434   /* ERRORS */
4435 no_name:
4436   {
4437     g_warning ("rtpbin: cannot find session id for pad: %s",
4438         GST_STR_NULL (name));
4439     return NULL;
4440   }
4441 create_error:
4442   {
4443     /* create_session already warned */
4444     return NULL;
4445   }
4446 session_sink_failed:
4447   {
4448     /* warning already done */
4449     return NULL;
4450   }
4451 }
4452
4453 static void
4454 remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
4455 {
4456   if (session->demux_newpad_sig) {
4457     g_signal_handler_disconnect (session->demux, session->demux_newpad_sig);
4458     session->demux_newpad_sig = 0;
4459   }
4460   if (session->demux_padremoved_sig) {
4461     g_signal_handler_disconnect (session->demux, session->demux_padremoved_sig);
4462     session->demux_padremoved_sig = 0;
4463   }
4464   if (session->recv_rtp_src) {
4465     gst_object_unref (session->recv_rtp_src);
4466     session->recv_rtp_src = NULL;
4467   }
4468   if (session->recv_rtp_sink) {
4469     gst_element_release_request_pad (session->session, session->recv_rtp_sink);
4470     gst_object_unref (session->recv_rtp_sink);
4471     session->recv_rtp_sink = NULL;
4472   }
4473   if (session->recv_rtp_sink_ghost) {
4474     gst_pad_set_active (session->recv_rtp_sink_ghost, FALSE);
4475     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
4476         session->recv_rtp_sink_ghost);
4477     session->recv_rtp_sink_ghost = NULL;
4478   }
4479 }
4480
4481 static gint
4482 fec_sinkpad_find (const GValue * item, gchar * padname)
4483 {
4484   GstPad *pad = g_value_get_object (item);
4485   return g_strcmp0 (GST_PAD_NAME (pad), padname);
4486 }
4487
4488 static GstPad *
4489 complete_session_fec (GstRtpBin * rtpbin, GstRtpBinSession * session,
4490     guint fec_idx)
4491 {
4492   gboolean have_static_pad;
4493   gchar *padname;
4494
4495   GstPad *ret;
4496   GstIterator *it;
4497   GValue item = { 0, };
4498
4499   if (!ensure_early_fec_decoder (rtpbin, session))
4500     goto no_decoder;
4501
4502   padname = g_strdup_printf ("fec_%u", fec_idx);
4503
4504   GST_DEBUG_OBJECT (rtpbin, "getting FEC sink pad %s", padname);
4505
4506   /* First try to find the decoder static pad that matches the padname */
4507   it = gst_element_iterate_sink_pads (session->early_fec_decoder);
4508   have_static_pad =
4509       gst_iterator_find_custom (it, (GCompareFunc) fec_sinkpad_find, &item,
4510       padname);
4511
4512   if (have_static_pad) {
4513     ret = g_value_get_object (&item);
4514     gst_object_ref (ret);
4515     g_value_unset (&item);
4516   } else {
4517     ret = gst_element_request_pad_simple (session->early_fec_decoder, padname);
4518   }
4519
4520   g_free (padname);
4521   gst_iterator_free (it);
4522
4523   if (ret == NULL)
4524     goto pad_failed;
4525
4526   session->recv_fec_sinks = g_slist_prepend (session->recv_fec_sinks, ret);
4527
4528   return ret;
4529
4530 pad_failed:
4531   {
4532     g_warning ("rtpbin: failed to get decoder fec pad");
4533     return NULL;
4534   }
4535 no_decoder:
4536   {
4537     g_warning ("rtpbin: failed to build FEC decoder for session %u",
4538         session->id);
4539     return NULL;
4540   }
4541 }
4542
4543 static GstPad *
4544 complete_session_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session,
4545     guint sessid)
4546 {
4547   GstElement *decoder;
4548   GstPad *sinkdpad;
4549   GstPad *decsink = NULL;
4550
4551   /* get recv_rtp pad and store */
4552   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
4553   session->recv_rtcp_sink =
4554       gst_element_request_pad_simple (session->session, "recv_rtcp_sink");
4555   if (session->recv_rtcp_sink == NULL)
4556     goto pad_failed;
4557
4558   GST_DEBUG_OBJECT (rtpbin, "getting RTCP decoder");
4559   decoder = session_request_element (session, SIGNAL_REQUEST_RTCP_DECODER);
4560   if (decoder) {
4561     GstPad *decsrc;
4562     GstPadLinkReturn ret;
4563
4564     GST_DEBUG_OBJECT (rtpbin, "linking RTCP decoder");
4565     decsink = gst_element_get_static_pad (decoder, "rtcp_sink");
4566     decsrc = gst_element_get_static_pad (decoder, "rtcp_src");
4567
4568     if (decsink == NULL)
4569       goto dec_sink_failed;
4570
4571     if (decsrc == NULL)
4572       goto dec_src_failed;
4573
4574     ret = gst_pad_link (decsrc, session->recv_rtcp_sink);
4575
4576     gst_object_unref (decsrc);
4577
4578     if (ret != GST_PAD_LINK_OK)
4579       goto dec_link_failed;
4580   } else {
4581     GST_DEBUG_OBJECT (rtpbin, "no RTCP decoder given");
4582     decsink = gst_object_ref (session->recv_rtcp_sink);
4583   }
4584
4585   /* get srcpad, link to SSRCDemux */
4586   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
4587   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
4588   if (session->sync_src == NULL)
4589     goto src_pad_failed;
4590
4591   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
4592   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
4593   gst_pad_link_full (session->sync_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
4594   gst_object_unref (sinkdpad);
4595
4596   return decsink;
4597
4598 pad_failed:
4599   {
4600     g_warning ("rtpbin: failed to get session rtcp_sink pad");
4601     return NULL;
4602   }
4603 dec_sink_failed:
4604   {
4605     g_warning ("rtpbin: failed to get decoder sink pad for session %u", sessid);
4606     return NULL;
4607   }
4608 dec_src_failed:
4609   {
4610     g_warning ("rtpbin: failed to get decoder src pad for session %u", sessid);
4611     goto cleanup;
4612   }
4613 dec_link_failed:
4614   {
4615     g_warning ("rtpbin: failed to link rtcp decoder for session %u", sessid);
4616     goto cleanup;
4617   }
4618 src_pad_failed:
4619   {
4620     g_warning ("rtpbin: failed to get session sync_src pad");
4621   }
4622
4623 cleanup:
4624   gst_object_unref (decsink);
4625   return NULL;
4626 }
4627
4628 /* Create a pad for receiving RTCP for the session in @name. Must be called with
4629  * RTP_BIN_LOCK.
4630  */
4631 static GstPad *
4632 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
4633     const gchar * name)
4634 {
4635   guint sessid;
4636   GstRtpBinSession *session;
4637   GstPad *decsink = NULL;
4638
4639   /* first get the session number */
4640   if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1)
4641     goto no_name;
4642
4643   GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
4644
4645   /* get or create the session */
4646   session = find_session_by_id (rtpbin, sessid);
4647   if (!session) {
4648     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
4649     /* create session now */
4650     session = create_session (rtpbin, sessid);
4651     if (session == NULL)
4652       goto create_error;
4653   }
4654
4655   /* check if pad was requested */
4656   if (session->recv_rtcp_sink_ghost != NULL)
4657     return session->recv_rtcp_sink_ghost;
4658
4659   decsink = complete_session_rtcp (rtpbin, session, sessid);
4660   if (!decsink)
4661     goto create_error;
4662
4663   session->recv_rtcp_sink_ghost =
4664       gst_ghost_pad_new_from_template (name, decsink, templ);
4665   gst_object_unref (decsink);
4666
4667   return session->recv_rtcp_sink_ghost;
4668
4669   /* ERRORS */
4670 no_name:
4671   {
4672     g_warning ("rtpbin: cannot find session id for pad: %s",
4673         GST_STR_NULL (name));
4674     return NULL;
4675   }
4676 create_error:
4677   {
4678     /* create_session already warned */
4679     return NULL;
4680   }
4681 }
4682
4683 static GstPad *
4684 create_recv_fec (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
4685 {
4686   guint sessid, fec_idx;
4687   GstRtpBinSession *session;
4688   GstPad *decsink = NULL;
4689   GstPad *ghost;
4690
4691   /* first get the session number */
4692   if (name == NULL
4693       || sscanf (name, "recv_fec_sink_%u_%u", &sessid, &fec_idx) != 2)
4694     goto no_name;
4695
4696   if (fec_idx > 1)
4697     goto invalid_idx;
4698
4699   GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
4700
4701   /* get or create the session */
4702   session = find_session_by_id (rtpbin, sessid);
4703   if (!session) {
4704     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
4705     /* create session now */
4706     session = create_session (rtpbin, sessid);
4707     if (session == NULL)
4708       goto create_error;
4709   }
4710
4711   decsink = complete_session_fec (rtpbin, session, fec_idx);
4712   if (!decsink)
4713     goto create_error;
4714
4715   ghost = gst_ghost_pad_new_from_template (name, decsink, templ);
4716   session->recv_fec_sink_ghosts =
4717       g_slist_prepend (session->recv_fec_sink_ghosts, ghost);
4718   gst_object_unref (decsink);
4719
4720   return ghost;
4721
4722   /* ERRORS */
4723 no_name:
4724   {
4725     g_warning ("rtpbin: cannot find session id for pad: %s",
4726         GST_STR_NULL (name));
4727     return NULL;
4728   }
4729 invalid_idx:
4730   {
4731     g_warning ("rtpbin: invalid FEC index: %s", GST_STR_NULL (name));
4732     return NULL;
4733   }
4734 create_error:
4735   {
4736     /* create_session already warned */
4737     return NULL;
4738   }
4739 }
4740
4741 static void
4742 remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
4743 {
4744   if (session->recv_rtcp_sink_ghost) {
4745     gst_pad_set_active (session->recv_rtcp_sink_ghost, FALSE);
4746     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
4747         session->recv_rtcp_sink_ghost);
4748     session->recv_rtcp_sink_ghost = NULL;
4749   }
4750   if (session->sync_src) {
4751     /* releasing the request pad should also unref the sync pad */
4752     gst_object_unref (session->sync_src);
4753     session->sync_src = NULL;
4754   }
4755   if (session->recv_rtcp_sink) {
4756     gst_element_release_request_pad (session->session, session->recv_rtcp_sink);
4757     gst_object_unref (session->recv_rtcp_sink);
4758     session->recv_rtcp_sink = NULL;
4759   }
4760 }
4761
4762 static void
4763 remove_recv_fec_for_pad (GstRtpBin * rtpbin, GstRtpBinSession * session,
4764     GstPad * ghost)
4765 {
4766   GSList *item;
4767   GstPad *target;
4768
4769   target = gst_ghost_pad_get_target (GST_GHOST_PAD (ghost));
4770
4771   if (target) {
4772     item = g_slist_find (session->recv_fec_sinks, target);
4773     if (item) {
4774       GstPadTemplate *templ;
4775       GstPad *pad;
4776
4777       pad = item->data;
4778       templ = gst_pad_get_pad_template (pad);
4779
4780       if (GST_PAD_TEMPLATE_PRESENCE (templ) == GST_PAD_REQUEST) {
4781         GST_DEBUG_OBJECT (rtpbin,
4782             "Releasing FEC decoder pad %" GST_PTR_FORMAT, pad);
4783         gst_element_release_request_pad (session->early_fec_decoder, pad);
4784       } else {
4785         gst_object_unref (pad);
4786       }
4787
4788       session->recv_fec_sinks =
4789           g_slist_delete_link (session->recv_fec_sinks, item);
4790
4791       gst_object_unref (templ);
4792     }
4793     gst_object_unref (target);
4794   }
4795
4796   item = g_slist_find (session->recv_fec_sink_ghosts, ghost);
4797   if (item)
4798     session->recv_fec_sink_ghosts =
4799         g_slist_delete_link (session->recv_fec_sink_ghosts, item);
4800
4801   gst_pad_set_active (ghost, FALSE);
4802   gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), ghost);
4803 }
4804
4805 static void
4806 remove_recv_fec (GstRtpBin * rtpbin, GstRtpBinSession * session)
4807 {
4808   GSList *copy;
4809   GSList *tmp;
4810
4811   copy = g_slist_copy (session->recv_fec_sink_ghosts);
4812
4813   for (tmp = copy; tmp; tmp = tmp->next) {
4814     remove_recv_fec_for_pad (rtpbin, session, (GstPad *) tmp->data);
4815   }
4816
4817   g_slist_free (copy);
4818 }
4819
4820 static gboolean
4821 complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session)
4822 {
4823   gchar *gname;
4824   guint sessid = session->id;
4825   GstPad *send_rtp_src;
4826   GstElement *encoder;
4827   GstElementClass *klass;
4828   GstPadTemplate *templ;
4829   gboolean ret = FALSE;
4830
4831   /* get srcpad */
4832   send_rtp_src = gst_element_get_static_pad (session->session, "send_rtp_src");
4833
4834   if (send_rtp_src == NULL)
4835     goto no_srcpad;
4836
4837   GST_DEBUG_OBJECT (rtpbin, "getting RTP encoder");
4838   encoder = session_request_element (session, SIGNAL_REQUEST_RTP_ENCODER);
4839   if (encoder) {
4840     gchar *ename;
4841     GstPad *encsrc, *encsink;
4842     GstPadLinkReturn ret;
4843
4844     GST_DEBUG_OBJECT (rtpbin, "linking RTP encoder");
4845     ename = g_strdup_printf ("rtp_src_%u", sessid);
4846     encsrc = gst_element_get_static_pad (encoder, ename);
4847     g_free (ename);
4848
4849     if (encsrc == NULL)
4850       goto enc_src_failed;
4851
4852     ename = g_strdup_printf ("rtp_sink_%u", sessid);
4853     encsink = gst_element_get_static_pad (encoder, ename);
4854     g_free (ename);
4855     if (encsink == NULL)
4856       goto enc_sink_failed;
4857
4858     ret = gst_pad_link (send_rtp_src, encsink);
4859     gst_object_unref (encsink);
4860     gst_object_unref (send_rtp_src);
4861
4862     send_rtp_src = encsrc;
4863
4864     if (ret != GST_PAD_LINK_OK)
4865       goto enc_link_failed;
4866   } else {
4867     GST_DEBUG_OBJECT (rtpbin, "no RTP encoder given");
4868   }
4869
4870   /* ghost the new source pad */
4871   klass = GST_ELEMENT_GET_CLASS (rtpbin);
4872   gname = g_strdup_printf ("send_rtp_src_%u", sessid);
4873   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u");
4874   session->send_rtp_src_ghost =
4875       gst_ghost_pad_new_from_template (gname, send_rtp_src, templ);
4876   gst_pad_set_active (session->send_rtp_src_ghost, TRUE);
4877   gst_pad_sticky_events_foreach (send_rtp_src, copy_sticky_events,
4878       session->send_rtp_src_ghost);
4879   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost);
4880   g_free (gname);
4881
4882   ret = TRUE;
4883
4884 done:
4885   if (send_rtp_src)
4886     gst_object_unref (send_rtp_src);
4887
4888   return ret;
4889
4890   /* ERRORS */
4891 no_srcpad:
4892   {
4893     g_warning ("rtpbin: failed to get rtp source pad for session %u", sessid);
4894     goto done;
4895   }
4896 enc_src_failed:
4897   {
4898     g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT
4899         " src pad for session %u", encoder, sessid);
4900     goto done;
4901   }
4902 enc_sink_failed:
4903   {
4904     g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT
4905         " sink pad for session %u", encoder, sessid);
4906     goto done;
4907   }
4908 enc_link_failed:
4909   {
4910     g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u",
4911         encoder, sessid);
4912     goto done;
4913   }
4914 }
4915
4916 static gboolean
4917 setup_aux_sender_fold (const GValue * item, GValue * result, gpointer user_data)
4918 {
4919   GstPad *pad;
4920   gchar *name;
4921   guint sessid;
4922   GstRtpBinSession *session = user_data, *newsess;
4923   GstRtpBin *rtpbin = session->bin;
4924   GstPadLinkReturn ret;
4925
4926   pad = g_value_get_object (item);
4927   name = gst_pad_get_name (pad);
4928
4929   if (name == NULL || sscanf (name, "src_%u", &sessid) != 1)
4930     goto no_name;
4931
4932   g_free (name);
4933
4934   newsess = find_session_by_id (rtpbin, sessid);
4935   if (newsess == NULL) {
4936     /* create new session */
4937     newsess = create_session (rtpbin, sessid);
4938     if (newsess == NULL)
4939       goto create_error;
4940   } else if (newsess->send_rtp_sink != NULL)
4941     goto existing_session;
4942
4943   /* get send_rtp pad and store */
4944   newsess->send_rtp_sink =
4945       gst_element_request_pad_simple (newsess->session, "send_rtp_sink");
4946   if (newsess->send_rtp_sink == NULL)
4947     goto pad_failed;
4948
4949   ret = gst_pad_link (pad, newsess->send_rtp_sink);
4950   if (ret != GST_PAD_LINK_OK)
4951     goto aux_link_failed;
4952
4953   if (!complete_session_src (rtpbin, newsess))
4954     goto session_src_failed;
4955
4956   return TRUE;
4957
4958   /* ERRORS */
4959 no_name:
4960   {
4961     GST_WARNING ("ignoring invalid pad name %s", GST_STR_NULL (name));
4962     g_free (name);
4963     return TRUE;
4964   }
4965 create_error:
4966   {
4967     /* create_session already warned */
4968     return FALSE;
4969   }
4970 existing_session:
4971   {
4972     GST_DEBUG_OBJECT (rtpbin,
4973         "skipping src_%i setup, since it is already configured.", sessid);
4974     return TRUE;
4975   }
4976 pad_failed:
4977   {
4978     g_warning ("rtpbin: failed to get session pad for session %u", sessid);
4979     return FALSE;
4980   }
4981 aux_link_failed:
4982   {
4983     g_warning ("rtpbin: failed to link AUX for session %u", sessid);
4984     return FALSE;
4985   }
4986 session_src_failed:
4987   {
4988     g_warning ("rtpbin: failed to complete AUX for session %u", sessid);
4989     return FALSE;
4990   }
4991 }
4992
4993 static gboolean
4994 setup_aux_sender (GstRtpBin * rtpbin, GstRtpBinSession * session,
4995     GstElement * aux)
4996 {
4997   GstIterator *it;
4998   GValue result = { 0, };
4999   GstIteratorResult res;
5000
5001   it = gst_element_iterate_src_pads (aux);
5002   res = gst_iterator_fold (it, setup_aux_sender_fold, &result, session);
5003   gst_iterator_free (it);
5004
5005   return res == GST_ITERATOR_DONE;
5006 }
5007
5008 static void
5009 fec_encoder_add_pad_unlocked (GstPad * pad, GstRtpBinSession * session)
5010 {
5011   GstElementClass *klass;
5012   gchar *gname;
5013   GstPadTemplate *templ;
5014   guint fec_idx;
5015   GstPad *ghost;
5016
5017   if (sscanf (GST_PAD_NAME (pad), "fec_%u", &fec_idx) != 1) {
5018     GST_WARNING_OBJECT (session->bin,
5019         "FEC encoder added pad with name not matching fec_%%u (%s)",
5020         GST_PAD_NAME (pad));
5021     goto done;
5022   }
5023
5024   GST_INFO_OBJECT (session->bin, "FEC encoder for session %u exposed new pad",
5025       session->id);
5026
5027   klass = GST_ELEMENT_GET_CLASS (session->bin);
5028   gname = g_strdup_printf ("send_fec_src_%u_%u", session->id, fec_idx);
5029   templ = gst_element_class_get_pad_template (klass, "send_fec_src_%u_%u");
5030   ghost = gst_ghost_pad_new_from_template (gname, pad, templ);
5031   session->send_fec_src_ghosts =
5032       g_slist_prepend (session->send_fec_src_ghosts, ghost);
5033   gst_pad_set_active (ghost, TRUE);
5034   gst_pad_sticky_events_foreach (pad, copy_sticky_events, ghost);
5035   gst_element_add_pad (GST_ELEMENT (session->bin), ghost);
5036   g_free (gname);
5037
5038 done:
5039   return;
5040 }
5041
5042 static void
5043 fec_encoder_add_pad (GstPad * pad, GstRtpBinSession * session)
5044 {
5045   GST_RTP_BIN_LOCK (session->bin);
5046   fec_encoder_add_pad_unlocked (pad, session);
5047   GST_RTP_BIN_UNLOCK (session->bin);
5048 }
5049
5050 static gint
5051 fec_srcpad_iterator_filter (const GValue * item, GValue * unused)
5052 {
5053   guint fec_idx;
5054   GstPad *pad = g_value_get_object (item);
5055   GstPadTemplate *templ = gst_pad_get_pad_template (pad);
5056
5057   gint have_static_pad =
5058       (GST_PAD_TEMPLATE_PRESENCE (templ) == GST_PAD_ALWAYS) &&
5059       (sscanf (GST_PAD_NAME (pad), "fec_%u", &fec_idx) == 1);
5060
5061   gst_object_unref (templ);
5062
5063   /* return 0 to retain pad in filtered iterator */
5064   return !have_static_pad;
5065 }
5066
5067 static void
5068 fec_srcpad_iterator_foreach (const GValue * item, GstRtpBinSession * session)
5069 {
5070   GstPad *pad = g_value_get_object (item);
5071   fec_encoder_add_pad_unlocked (pad, session);
5072 }
5073
5074 static void
5075 fec_encoder_pad_added_cb (GstElement * encoder, GstPad * pad,
5076     GstRtpBinSession * session)
5077 {
5078   fec_encoder_add_pad (pad, session);
5079 }
5080
5081 static GstElement *
5082 request_fec_encoder (GstRtpBin * rtpbin, GstRtpBinSession * session,
5083     guint sessid)
5084 {
5085   GstElement *ret = NULL;
5086   const gchar *factory;
5087   gchar *sess_id_str;
5088
5089   sess_id_str = g_strdup_printf ("%u", sessid);
5090   factory = gst_structure_get_string (rtpbin->fec_encoders, sess_id_str);
5091   g_free (sess_id_str);
5092
5093   /* First try the property */
5094   if (factory) {
5095     GError *err = NULL;
5096
5097     ret =
5098         gst_parse_bin_from_description_full (factory, TRUE, NULL,
5099         GST_PARSE_FLAG_NO_SINGLE_ELEMENT_BINS | GST_PARSE_FLAG_FATAL_ERRORS,
5100         &err);
5101     if (!ret) {
5102       GST_ERROR_OBJECT (rtpbin, "Failed to build encoder from factory: %s",
5103           err->message);
5104       goto done;
5105     }
5106
5107     bin_manage_element (session->bin, ret);
5108     session->elements = g_slist_prepend (session->elements, ret);
5109     GST_INFO_OBJECT (rtpbin, "Built FEC encoder: %" GST_PTR_FORMAT
5110         " for session %u", ret, sessid);
5111   }
5112
5113   /* Fallback to the signal */
5114   if (!ret)
5115     ret = session_request_element (session, SIGNAL_REQUEST_FEC_ENCODER);
5116
5117   if (ret) {
5118     /* First, add encoder pads that match fec_% template and are already present */
5119     GstIterator *it, *filter;
5120     GstIteratorResult it_ret = GST_ITERATOR_OK;
5121
5122     it = gst_element_iterate_src_pads (ret);
5123     filter =
5124         gst_iterator_filter (it, (GCompareFunc) fec_srcpad_iterator_filter,
5125         NULL);
5126
5127     while (it_ret == GST_ITERATOR_OK || it_ret == GST_ITERATOR_RESYNC) {
5128       it_ret =
5129           gst_iterator_foreach (filter,
5130           (GstIteratorForeachFunction) fec_srcpad_iterator_foreach, session);
5131
5132       if (it_ret == GST_ITERATOR_RESYNC)
5133         gst_iterator_resync (filter);
5134     }
5135
5136     gst_iterator_free (filter);
5137
5138     /* Finally, connect to pad-added signal if any of the encoder pads are
5139      * added later */
5140     g_signal_connect (ret, "pad-added", G_CALLBACK (fec_encoder_pad_added_cb),
5141         session);
5142   }
5143
5144 done:
5145   return ret;
5146 }
5147
5148 /* Create a pad for sending RTP for the session in @name. Must be called with
5149  * RTP_BIN_LOCK.
5150  */
5151 static GstPad *
5152 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
5153 {
5154   gchar *pname;
5155   guint sessid;
5156   GstPad *send_rtp_sink;
5157   GstElement *aux;
5158   GstElement *encoder;
5159   GstElement *prev = NULL;
5160   GstRtpBinSession *session;
5161
5162   /* first get the session number */
5163   if (name == NULL || sscanf (name, "send_rtp_sink_%u", &sessid) != 1)
5164     goto no_name;
5165
5166   /* get or create session */
5167   session = find_session_by_id (rtpbin, sessid);
5168   if (!session) {
5169     /* create session now */
5170     session = create_session (rtpbin, sessid);
5171     if (session == NULL)
5172       goto create_error;
5173   }
5174
5175   /* check if pad was requested */
5176   if (session->send_rtp_sink_ghost != NULL)
5177     return session->send_rtp_sink_ghost;
5178
5179   /* check if we are already using this session as a sender */
5180   if (session->send_rtp_sink != NULL)
5181     goto existing_session;
5182
5183   encoder = request_fec_encoder (rtpbin, session, sessid);
5184
5185   if (encoder) {
5186     GST_DEBUG_OBJECT (rtpbin, "Linking FEC encoder");
5187
5188     send_rtp_sink = gst_element_get_static_pad (encoder, "sink");
5189
5190     if (!send_rtp_sink)
5191       goto enc_sink_failed;
5192
5193     prev = encoder;
5194   }
5195
5196   GST_DEBUG_OBJECT (rtpbin, "getting RTP AUX sender");
5197   aux = session_request_element (session, SIGNAL_REQUEST_AUX_SENDER);
5198   if (aux) {
5199     GstPad *sinkpad;
5200     GST_DEBUG_OBJECT (rtpbin, "linking AUX sender");
5201     if (!setup_aux_sender (rtpbin, session, aux))
5202       goto aux_session_failed;
5203
5204     pname = g_strdup_printf ("sink_%u", sessid);
5205     sinkpad = gst_element_get_static_pad (aux, pname);
5206     g_free (pname);
5207
5208     if (sinkpad == NULL)
5209       goto aux_sink_failed;
5210
5211     if (!prev) {
5212       send_rtp_sink = sinkpad;
5213     } else {
5214       GstPad *srcpad = gst_element_get_static_pad (prev, "src");
5215       GstPadLinkReturn ret;
5216
5217       ret = gst_pad_link (srcpad, sinkpad);
5218       gst_object_unref (srcpad);
5219       if (ret != GST_PAD_LINK_OK) {
5220         goto aux_link_failed;
5221       }
5222       gst_object_unref (sinkpad);
5223     }
5224     prev = aux;
5225   } else {
5226     /* get send_rtp pad and store */
5227     session->send_rtp_sink =
5228         gst_element_request_pad_simple (session->session, "send_rtp_sink");
5229     if (session->send_rtp_sink == NULL)
5230       goto pad_failed;
5231
5232     if (!complete_session_src (rtpbin, session))
5233       goto session_src_failed;
5234
5235     if (!prev) {
5236       send_rtp_sink = gst_object_ref (session->send_rtp_sink);
5237     } else {
5238       GstPad *srcpad = gst_element_get_static_pad (prev, "src");
5239       GstPadLinkReturn ret;
5240
5241       ret = gst_pad_link (srcpad, session->send_rtp_sink);
5242       gst_object_unref (srcpad);
5243       if (ret != GST_PAD_LINK_OK)
5244         goto session_link_failed;
5245     }
5246   }
5247
5248   session->send_rtp_sink_ghost =
5249       gst_ghost_pad_new_from_template (name, send_rtp_sink, templ);
5250   gst_object_unref (send_rtp_sink);
5251
5252   return session->send_rtp_sink_ghost;
5253
5254   /* ERRORS */
5255 no_name:
5256   {
5257     g_warning ("rtpbin: cannot find session id for pad: %s",
5258         GST_STR_NULL (name));
5259     return NULL;
5260   }
5261 create_error:
5262   {
5263     /* create_session already warned */
5264     return NULL;
5265   }
5266 existing_session:
5267   {
5268     g_warning ("rtpbin: session %u is already in use", sessid);
5269     return NULL;
5270   }
5271 aux_session_failed:
5272   {
5273     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
5274     return NULL;
5275   }
5276 aux_sink_failed:
5277   {
5278     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
5279     return NULL;
5280   }
5281 aux_link_failed:
5282   {
5283     g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u",
5284         aux, sessid);
5285     return NULL;
5286   }
5287 pad_failed:
5288   {
5289     g_warning ("rtpbin: failed to get session pad for session %u", sessid);
5290     return NULL;
5291   }
5292 session_src_failed:
5293   {
5294     g_warning ("rtpbin: failed to setup source pads for session %u", sessid);
5295     return NULL;
5296   }
5297 session_link_failed:
5298   {
5299     g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u",
5300         session, sessid);
5301     return NULL;
5302   }
5303 enc_sink_failed:
5304   {
5305     g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT
5306         " sink pad for session %u", encoder, sessid);
5307     return NULL;
5308   }
5309 }
5310
5311 static void
5312 remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
5313 {
5314   if (session->send_rtp_src_ghost) {
5315     gst_pad_set_active (session->send_rtp_src_ghost, FALSE);
5316     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
5317         session->send_rtp_src_ghost);
5318     session->send_rtp_src_ghost = NULL;
5319   }
5320   if (session->send_rtp_sink) {
5321     gst_element_release_request_pad (GST_ELEMENT_CAST (session->session),
5322         session->send_rtp_sink);
5323     gst_object_unref (session->send_rtp_sink);
5324     session->send_rtp_sink = NULL;
5325   }
5326   if (session->send_rtp_sink_ghost) {
5327     gst_pad_set_active (session->send_rtp_sink_ghost, FALSE);
5328     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
5329         session->send_rtp_sink_ghost);
5330     session->send_rtp_sink_ghost = NULL;
5331   }
5332 }
5333
5334 static void
5335 remove_send_fec (GstRtpBin * rtpbin, GstRtpBinSession * session)
5336 {
5337   GSList *tmp;
5338
5339   for (tmp = session->send_fec_src_ghosts; tmp; tmp = tmp->next) {
5340     GstPad *ghost = GST_PAD (tmp->data);
5341     gst_pad_set_active (ghost, FALSE);
5342     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), ghost);
5343   }
5344
5345   g_slist_free (session->send_fec_src_ghosts);
5346   session->send_fec_src_ghosts = NULL;
5347 }
5348
5349 /* Create a pad for sending RTCP for the session in @name. Must be called with
5350  * RTP_BIN_LOCK.
5351  */
5352 static GstPad *
5353 create_send_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
5354     const gchar * name)
5355 {
5356   guint sessid;
5357   GstPad *encsrc;
5358   GstElement *encoder;
5359   GstRtpBinSession *session;
5360
5361   /* first get the session number */
5362   if (name == NULL || sscanf (name, "send_rtcp_src_%u", &sessid) != 1)
5363     goto no_name;
5364
5365   /* get or create session */
5366   session = find_session_by_id (rtpbin, sessid);
5367   if (!session) {
5368     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
5369     /* create session now */
5370     session = create_session (rtpbin, sessid);
5371     if (session == NULL)
5372       goto create_error;
5373   }
5374
5375   /* check if pad was requested */
5376   if (session->send_rtcp_src_ghost != NULL)
5377     return session->send_rtcp_src_ghost;
5378
5379   /* get rtcp_src pad and store */
5380   session->send_rtcp_src =
5381       gst_element_request_pad_simple (session->session, "send_rtcp_src");
5382   if (session->send_rtcp_src == NULL)
5383     goto pad_failed;
5384
5385   GST_DEBUG_OBJECT (rtpbin, "getting RTCP encoder");
5386   encoder = session_request_element (session, SIGNAL_REQUEST_RTCP_ENCODER);
5387   if (encoder) {
5388     gchar *ename;
5389     GstPad *encsink;
5390     GstPadLinkReturn ret;
5391
5392     GST_DEBUG_OBJECT (rtpbin, "linking RTCP encoder");
5393
5394     ename = g_strdup_printf ("rtcp_src_%u", sessid);
5395     encsrc = gst_element_get_static_pad (encoder, ename);
5396     g_free (ename);
5397     if (encsrc == NULL)
5398       goto enc_src_failed;
5399
5400     ename = g_strdup_printf ("rtcp_sink_%u", sessid);
5401     encsink = gst_element_get_static_pad (encoder, ename);
5402     g_free (ename);
5403     if (encsink == NULL)
5404       goto enc_sink_failed;
5405
5406     ret = gst_pad_link (session->send_rtcp_src, encsink);
5407     gst_object_unref (encsink);
5408
5409     if (ret != GST_PAD_LINK_OK)
5410       goto enc_link_failed;
5411   } else {
5412     GST_DEBUG_OBJECT (rtpbin, "no RTCP encoder given");
5413     encsrc = gst_object_ref (session->send_rtcp_src);
5414   }
5415
5416   session->send_rtcp_src_ghost =
5417       gst_ghost_pad_new_from_template (name, encsrc, templ);
5418   gst_object_unref (encsrc);
5419
5420   return session->send_rtcp_src_ghost;
5421
5422   /* ERRORS */
5423 no_name:
5424   {
5425     g_warning ("rtpbin: cannot find session id for pad: %s",
5426         GST_STR_NULL (name));
5427     return NULL;
5428   }
5429 create_error:
5430   {
5431     /* create_session already warned */
5432     return NULL;
5433   }
5434 pad_failed:
5435   {
5436     g_warning ("rtpbin: failed to get rtcp pad for session %u", sessid);
5437     return NULL;
5438   }
5439 enc_src_failed:
5440   {
5441     g_warning ("rtpbin: failed to get encoder src pad for session %u", sessid);
5442     return NULL;
5443   }
5444 enc_sink_failed:
5445   {
5446     g_warning ("rtpbin: failed to get encoder sink pad for session %u", sessid);
5447     gst_object_unref (encsrc);
5448     return NULL;
5449   }
5450 enc_link_failed:
5451   {
5452     g_warning ("rtpbin: failed to link rtcp encoder for session %u", sessid);
5453     gst_object_unref (encsrc);
5454     return NULL;
5455   }
5456 }
5457
5458 static void
5459 remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
5460 {
5461   if (session->send_rtcp_src_ghost) {
5462     gst_pad_set_active (session->send_rtcp_src_ghost, FALSE);
5463     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
5464         session->send_rtcp_src_ghost);
5465     session->send_rtcp_src_ghost = NULL;
5466   }
5467   if (session->send_rtcp_src) {
5468     gst_element_release_request_pad (session->session, session->send_rtcp_src);
5469     gst_object_unref (session->send_rtcp_src);
5470     session->send_rtcp_src = NULL;
5471   }
5472 }
5473
5474 /* If the requested name is NULL we should create a name with
5475  * the session number assuming we want the lowest possible session
5476  * with a free pad like the template */
5477 static gchar *
5478 gst_rtp_bin_get_free_pad_name (GstElement * element, GstPadTemplate * templ)
5479 {
5480   gboolean name_found = FALSE;
5481   gint session = 0;
5482   GstIterator *pad_it = NULL;
5483   gchar *pad_name = NULL;
5484   GValue data = { 0, };
5485
5486   GST_DEBUG_OBJECT (element, "find a free pad name for template");
5487   while (!name_found) {
5488     gboolean done = FALSE;
5489
5490     g_free (pad_name);
5491     pad_name = g_strdup_printf (templ->name_template, session++);
5492     pad_it = gst_element_iterate_pads (GST_ELEMENT (element));
5493     name_found = TRUE;
5494     while (!done) {
5495       switch (gst_iterator_next (pad_it, &data)) {
5496         case GST_ITERATOR_OK:
5497         {
5498           GstPad *pad;
5499           gchar *name;
5500
5501           pad = g_value_get_object (&data);
5502           name = gst_pad_get_name (pad);
5503
5504           if (strcmp (name, pad_name) == 0) {
5505             done = TRUE;
5506             name_found = FALSE;
5507           }
5508           g_free (name);
5509           g_value_reset (&data);
5510           break;
5511         }
5512         case GST_ITERATOR_ERROR:
5513         case GST_ITERATOR_RESYNC:
5514           /* restart iteration */
5515           done = TRUE;
5516           name_found = FALSE;
5517           session = 0;
5518           break;
5519         case GST_ITERATOR_DONE:
5520           done = TRUE;
5521           break;
5522       }
5523     }
5524     g_value_unset (&data);
5525     gst_iterator_free (pad_it);
5526   }
5527
5528   GST_DEBUG_OBJECT (element, "free pad name found: '%s'", pad_name);
5529   return pad_name;
5530 }
5531
5532 /*
5533  */
5534 static GstPad *
5535 gst_rtp_bin_request_new_pad (GstElement * element,
5536     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
5537 {
5538   GstRtpBin *rtpbin;
5539   GstElementClass *klass;
5540   GstPad *result;
5541
5542   gchar *pad_name = NULL;
5543
5544   g_return_val_if_fail (templ != NULL, NULL);
5545   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
5546
5547   rtpbin = GST_RTP_BIN (element);
5548   klass = GST_ELEMENT_GET_CLASS (element);
5549
5550   GST_RTP_BIN_LOCK (rtpbin);
5551
5552   if (name == NULL) {
5553     /* use a free pad name */
5554     pad_name = gst_rtp_bin_get_free_pad_name (element, templ);
5555   } else {
5556     /* use the provided name */
5557     pad_name = g_strdup (name);
5558   }
5559
5560   GST_DEBUG_OBJECT (rtpbin, "Trying to request a pad with name %s", pad_name);
5561
5562   /* figure out the template */
5563   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
5564     result = create_recv_rtp (rtpbin, templ, pad_name);
5565   } else if (templ == gst_element_class_get_pad_template (klass,
5566           "recv_rtcp_sink_%u")) {
5567     result = create_recv_rtcp (rtpbin, templ, pad_name);
5568   } else if (templ == gst_element_class_get_pad_template (klass,
5569           "send_rtp_sink_%u")) {
5570     result = create_send_rtp (rtpbin, templ, pad_name);
5571   } else if (templ == gst_element_class_get_pad_template (klass,
5572           "send_rtcp_src_%u")) {
5573     result = create_send_rtcp (rtpbin, templ, pad_name);
5574   } else if (templ == gst_element_class_get_pad_template (klass,
5575           "recv_fec_sink_%u_%u")) {
5576     result = create_recv_fec (rtpbin, templ, pad_name);
5577   } else
5578     goto wrong_template;
5579
5580   g_free (pad_name);
5581   GST_RTP_BIN_UNLOCK (rtpbin);
5582
5583   if (result) {
5584     gst_pad_set_active (result, TRUE);
5585     gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
5586   }
5587
5588   return result;
5589
5590   /* ERRORS */
5591 wrong_template:
5592   {
5593     g_free (pad_name);
5594     GST_RTP_BIN_UNLOCK (rtpbin);
5595     g_warning ("rtpbin: this is not our template");
5596     return NULL;
5597   }
5598 }
5599
5600 static void
5601 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
5602 {
5603   GstRtpBinSession *session;
5604   GstRtpBin *rtpbin;
5605
5606   g_return_if_fail (GST_IS_GHOST_PAD (pad));
5607   g_return_if_fail (GST_IS_RTP_BIN (element));
5608
5609   rtpbin = GST_RTP_BIN (element);
5610
5611   GST_RTP_BIN_LOCK (rtpbin);
5612   GST_DEBUG_OBJECT (rtpbin, "Trying to release pad %s:%s",
5613       GST_DEBUG_PAD_NAME (pad));
5614
5615   if (!(session = find_session_by_pad (rtpbin, pad)))
5616     goto unknown_pad;
5617
5618   if (session->recv_rtp_sink_ghost == pad) {
5619     remove_recv_rtp (rtpbin, session);
5620   } else if (session->recv_rtcp_sink_ghost == pad) {
5621     remove_recv_rtcp (rtpbin, session);
5622   } else if (session->send_rtp_sink_ghost == pad) {
5623     remove_send_rtp (rtpbin, session);
5624   } else if (session->send_rtcp_src_ghost == pad) {
5625     remove_rtcp (rtpbin, session);
5626   } else if (pad_is_recv_fec (session, pad)) {
5627     remove_recv_fec_for_pad (rtpbin, session, pad);
5628   }
5629
5630   /* no more request pads, free the complete session */
5631   if (session->recv_rtp_sink_ghost == NULL
5632       && session->recv_rtcp_sink_ghost == NULL
5633       && session->send_rtp_sink_ghost == NULL
5634       && session->send_rtcp_src_ghost == NULL
5635       && session->recv_fec_sink_ghosts == NULL) {
5636     GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session);
5637     rtpbin->sessions = g_slist_remove (rtpbin->sessions, session);
5638     free_session (session, rtpbin);
5639   }
5640   GST_RTP_BIN_UNLOCK (rtpbin);
5641
5642   return;
5643
5644   /* ERROR */
5645 unknown_pad:
5646   {
5647     GST_RTP_BIN_UNLOCK (rtpbin);
5648     g_warning ("rtpbin: %s:%s is not one of our request pads",
5649         GST_DEBUG_PAD_NAME (pad));
5650     return;
5651   }
5652 }