rtpbin: proxy new "add-reference-timestamp-meta" property from rtpjitterbuffer
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-rtpbin
22  * @title: rtpbin
23  * @see_also: rtpjitterbuffer, rtpsession, rtpptdemux, rtpssrcdemux
24  *
25  * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
26  * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
27  * RTP sessions that will be synchronized together using RTCP SR packets.
28  *
29  * #GstRtpBin is configured with a number of request pads that define the
30  * functionality that is activated, similar to the #GstRtpSession element.
31  *
32  * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_\%u pad. The session
33  * number must be specified in the pad name.
34  * Data received on the recv_rtp_sink_\%u pad will be processed in the #GstRtpSession
35  * manager and after being validated forwarded on #GstRtpSsrcDemux element. Each
36  * RTP stream is demuxed based on the SSRC and send to a #GstRtpJitterBuffer. After
37  * the packets are released from the jitterbuffer, they will be forwarded to a
38  * #GstRtpPtDemux element. The #GstRtpPtDemux element will demux the packets based
39  * on the payload type and will create a unique pad recv_rtp_src_\%u_\%u_\%u on
40  * rtpbin with the session number, SSRC and payload type respectively as the pad
41  * name.
42  *
43  * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_\%u pad. The
44  * session number must be specified in the pad name.
45  *
46  * If you want the session manager to generate and send RTCP packets, request
47  * the send_rtcp_src_\%u pad with the session number in the pad name. Packet pushed
48  * on this pad contain SR/RR RTCP reports that should be sent to all participants
49  * in the session.
50  *
51  * To use #GstRtpBin as a sender, request a send_rtp_sink_\%u pad, which will
52  * automatically create a send_rtp_src_\%u pad. If the session number is not provided,
53  * the pad from the lowest available session will be returned. The session manager will modify the
54  * SSRC in the RTP packets to its own SSRC and will forward the packets on the
55  * send_rtp_src_\%u pad after updating its internal state.
56  *
57  * The session manager needs the clock-rate of the payload types it is handling
58  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
59  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
60  * signal.
61  *
62  * Access to the internal statistics of rtpbin is provided with the
63  * get-internal-session property. This action signal gives access to the
64  * RTPSession object which further provides action signals to retrieve the
65  * internal source and other sources.
66  *
67  * #GstRtpBin also has signals (#GstRtpBin::request-rtp-encoder,
68  * #GstRtpBin::request-rtp-decoder, #GstRtpBin::request-rtcp-encoder and
69  * #GstRtpBin::request-rtp-decoder) to dynamically request for RTP and RTCP encoders
70  * and decoders in order to support SRTP. The encoders must provide the pads
71  * rtp_sink_\%u and rtp_src_\%u for RTP and rtcp_sink_\%u and rtcp_src_\%u for
72  * RTCP. The session number will be used in the pad name. The decoders must provide
73  * rtp_sink and rtp_src for RTP and rtcp_sink and rtcp_src for RTCP. The decoders will
74  * be placed before the #GstRtpSession element, thus they must support SSRC demuxing
75  * internally.
76  *
77  * #GstRtpBin has signals (#GstRtpBin::request-aux-sender and
78  * #GstRtpBin::request-aux-receiver to dynamically request an element that can be
79  * used to create or merge additional RTP streams. AUX elements are needed to
80  * implement FEC or retransmission (such as RFC 4588). An AUX sender must have one
81  * sink_\%u pad that matches the sessionid in the signal and it should have 1 or
82  * more src_\%u pads. For each src_%\u pad, a session will be made (if needed)
83  * and the pad will be linked to the session send_rtp_sink pad. Each session will
84  * then expose its source pad as send_rtp_src_\%u on #GstRtpBin.
85  * An AUX receiver has 1 src_\%u pad that much match the sessionid in the signal
86  * and 1 or more sink_\%u pads. A session will be made for each sink_\%u pad
87  * when the corresponding recv_rtp_sink_\%u pad is requested on #GstRtpBin.
88  * The #GstRtpBin::request-jitterbuffer signal can be used to provide a custom
89  * element to perform arrival time smoothing, reordering and optionally packet
90  * loss detection and retransmission requests.
91  *
92  * ## Example pipelines
93  *
94  * |[
95  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
96  *     rtpbin ! rtptheoradepay ! theoradec ! xvimagesink
97  * ]| Receive RTP data from port 5000 and send to the session 0 in rtpbin.
98  * |[
99  * gst-launch-1.0 rtpbin name=rtpbin \
100  *         v4l2src ! videoconvert ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
101  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
102  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
103  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
104  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
105  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
106  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
107  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
108  * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
109  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
110  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
111  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
112  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
113  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
114  * is received on port 5007. Since RTCP packets from the sender should be sent
115  * as soon as possible and do not participate in preroll, sync=false and
116  * async=false is configured on udpsink
117  * |[
118  * gst-launch-1.0 -v rtpbin name=rtpbin                                          \
119  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
120  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
121  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
122  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
123  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
124  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
125  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
126  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
127  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
128  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
129  * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
130  * decode and display the video.
131  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
132  * decode and play the audio.
133  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
134  * session 1 on port 5003. These packets will be used for session management and
135  * synchronisation.
136  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
137  * on port 5007.
138  *
139  */
140
141 #ifdef HAVE_CONFIG_H
142 #include "config.h"
143 #endif
144 #include <stdio.h>
145 #include <string.h>
146
147 #include <gst/rtp/gstrtpbuffer.h>
148 #include <gst/rtp/gstrtcpbuffer.h>
149
150 #include "gstrtpbin.h"
151 #include "rtpsession.h"
152 #include "gstrtpsession.h"
153 #include "gstrtpjitterbuffer.h"
154
155 #include <gst/glib-compat-private.h>
156
157 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
158 #define GST_CAT_DEFAULT gst_rtp_bin_debug
159
160 /* sink pads */
161 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
162     GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
163     GST_PAD_SINK,
164     GST_PAD_REQUEST,
165     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
166     );
167
168 /**
169  * GstRtpBin!recv_fec_sink_%u_%u:
170  *
171  * Sink template for receiving Forward Error Correction packets,
172  * in the form recv_fec_sink_<session_idx>_<fec_stream_idx>
173  *
174  * See #GstRTPST_2022_1_FecDec for example usage
175  *
176  * Since: 1.20
177  */
178 static GstStaticPadTemplate rtpbin_recv_fec_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("recv_fec_sink_%u_%u",
180     GST_PAD_SINK,
181     GST_PAD_REQUEST,
182     GST_STATIC_CAPS ("application/x-rtp")
183     );
184
185 /**
186  * GstRtpBin!send_fec_src_%u_%u:
187  *
188  * Src template for sending Forward Error Correction packets,
189  * in the form send_fec_src_<session_idx>_<fec_stream_idx>
190  *
191  * See #GstRTPST_2022_1_FecEnc for example usage
192  *
193  * Since: 1.20
194  */
195 static GstStaticPadTemplate rtpbin_send_fec_src_template =
196 GST_STATIC_PAD_TEMPLATE ("send_fec_src_%u_%u",
197     GST_PAD_SRC,
198     GST_PAD_SOMETIMES,
199     GST_STATIC_CAPS ("application/x-rtp")
200     );
201
202 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
203     GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
204     GST_PAD_SINK,
205     GST_PAD_REQUEST,
206     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
207     );
208
209 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
210 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%u",
211     GST_PAD_SINK,
212     GST_PAD_REQUEST,
213     GST_STATIC_CAPS ("application/x-rtp")
214     );
215
216 /* src pads */
217 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
218 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
219     GST_PAD_SRC,
220     GST_PAD_SOMETIMES,
221     GST_STATIC_CAPS ("application/x-rtp")
222     );
223
224 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
225     GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%u",
226     GST_PAD_SRC,
227     GST_PAD_REQUEST,
228     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
229     );
230
231 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
232     GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%u",
233     GST_PAD_SRC,
234     GST_PAD_SOMETIMES,
235     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
236     );
237
238 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock (&(bin)->priv->bin_lock)
239 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock (&(bin)->priv->bin_lock)
240
241 /* lock to protect dynamic callbacks, like pad-added and new ssrc. */
242 #define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock (&(bin)->priv->dyn_lock)
243 #define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock (&(bin)->priv->dyn_lock)
244
245 /* lock for shutdown */
246 #define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
247 G_STMT_START {                                   \
248   if (g_atomic_int_get (&bin->priv->shutdown))   \
249     goto label;                                  \
250   GST_RTP_BIN_DYN_LOCK (bin);                    \
251   if (g_atomic_int_get (&bin->priv->shutdown)) { \
252     GST_RTP_BIN_DYN_UNLOCK (bin);                \
253     goto label;                                  \
254   }                                              \
255 } G_STMT_END
256
257 /* unlock for shutdown */
258 #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
259   GST_RTP_BIN_DYN_UNLOCK (bin);                  \
260
261 /* Minimum time offset to apply. This compensates for rounding errors in NTP to
262  * RTP timestamp conversions */
263 #define MIN_TS_OFFSET_ROUND_OFF_COMP (4 * GST_MSECOND)
264
265 struct _GstRtpBinPrivate
266 {
267   GMutex bin_lock;
268
269   /* lock protecting dynamic adding/removing */
270   GMutex dyn_lock;
271
272   /* if we are shutting down or not */
273   gint shutdown;
274
275   gboolean autoremove;
276
277   /* NTP time in ns of last SR sync used */
278   guint64 last_ntpnstime;
279
280   /* list of extra elements */
281   GList *elements;
282 };
283
284 /* signals and args */
285 enum
286 {
287   SIGNAL_REQUEST_PT_MAP,
288   SIGNAL_PAYLOAD_TYPE_CHANGE,
289   SIGNAL_CLEAR_PT_MAP,
290   SIGNAL_RESET_SYNC,
291   SIGNAL_GET_SESSION,
292   SIGNAL_GET_INTERNAL_SESSION,
293   SIGNAL_GET_STORAGE,
294   SIGNAL_GET_INTERNAL_STORAGE,
295   SIGNAL_CLEAR_SSRC,
296
297   SIGNAL_ON_NEW_SSRC,
298   SIGNAL_ON_SSRC_COLLISION,
299   SIGNAL_ON_SSRC_VALIDATED,
300   SIGNAL_ON_SSRC_ACTIVE,
301   SIGNAL_ON_SSRC_SDES,
302   SIGNAL_ON_BYE_SSRC,
303   SIGNAL_ON_BYE_TIMEOUT,
304   SIGNAL_ON_TIMEOUT,
305   SIGNAL_ON_SENDER_TIMEOUT,
306   SIGNAL_ON_NPT_STOP,
307
308   SIGNAL_REQUEST_RTP_ENCODER,
309   SIGNAL_REQUEST_RTP_DECODER,
310   SIGNAL_REQUEST_RTCP_ENCODER,
311   SIGNAL_REQUEST_RTCP_DECODER,
312
313   SIGNAL_REQUEST_FEC_DECODER,
314   SIGNAL_REQUEST_FEC_DECODER_FULL,
315   SIGNAL_REQUEST_FEC_ENCODER,
316
317   SIGNAL_REQUEST_JITTERBUFFER,
318
319   SIGNAL_NEW_JITTERBUFFER,
320   SIGNAL_NEW_STORAGE,
321
322   SIGNAL_REQUEST_AUX_SENDER,
323   SIGNAL_REQUEST_AUX_RECEIVER,
324
325   SIGNAL_ON_NEW_SENDER_SSRC,
326   SIGNAL_ON_SENDER_SSRC_ACTIVE,
327
328   SIGNAL_ON_BUNDLED_SSRC,
329
330   LAST_SIGNAL
331 };
332
333 #define DEFAULT_LATENCY_MS           200
334 #define DEFAULT_DROP_ON_LATENCY      FALSE
335 #define DEFAULT_SDES                 NULL
336 #define DEFAULT_DO_LOST              FALSE
337 #define DEFAULT_IGNORE_PT            FALSE
338 #define DEFAULT_NTP_SYNC             FALSE
339 #define DEFAULT_AUTOREMOVE           FALSE
340 #define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
341 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
342 #define DEFAULT_RTCP_SYNC            GST_RTP_BIN_RTCP_SYNC_ALWAYS
343 #define DEFAULT_RTCP_SYNC_INTERVAL   0
344 #define DEFAULT_DO_SYNC_EVENT        FALSE
345 #define DEFAULT_DO_RETRANSMISSION    FALSE
346 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
347 #define DEFAULT_NTP_TIME_SOURCE      GST_RTP_NTP_TIME_SOURCE_NTP
348 #define DEFAULT_RTCP_SYNC_SEND_TIME  TRUE
349 #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
350 #define DEFAULT_MAX_DROPOUT_TIME     60000
351 #define DEFAULT_MAX_MISORDER_TIME    2000
352 #define DEFAULT_RFC7273_SYNC         FALSE
353 #define DEFAULT_ADD_REFERENCE_TIMESTAMP_META FALSE
354 #define DEFAULT_MAX_STREAMS          G_MAXUINT
355 #define DEFAULT_MAX_TS_OFFSET_ADJUSTMENT G_GUINT64_CONSTANT(0)
356 #define DEFAULT_MAX_TS_OFFSET        G_GINT64_CONSTANT(3000000000)
357 #define DEFAULT_MIN_TS_OFFSET        MIN_TS_OFFSET_ROUND_OFF_COMP
358 #define DEFAULT_TS_OFFSET_SMOOTHING_FACTOR  0
359
360 enum
361 {
362   PROP_0,
363   PROP_LATENCY,
364   PROP_DROP_ON_LATENCY,
365   PROP_SDES,
366   PROP_DO_LOST,
367   PROP_IGNORE_PT,
368   PROP_NTP_SYNC,
369   PROP_RTCP_SYNC,
370   PROP_RTCP_SYNC_INTERVAL,
371   PROP_AUTOREMOVE,
372   PROP_BUFFER_MODE,
373   PROP_USE_PIPELINE_CLOCK,
374   PROP_DO_SYNC_EVENT,
375   PROP_DO_RETRANSMISSION,
376   PROP_RTP_PROFILE,
377   PROP_NTP_TIME_SOURCE,
378   PROP_RTCP_SYNC_SEND_TIME,
379   PROP_MAX_RTCP_RTP_TIME_DIFF,
380   PROP_MAX_DROPOUT_TIME,
381   PROP_MAX_MISORDER_TIME,
382   PROP_RFC7273_SYNC,
383   PROP_ADD_REFERENCE_TIMESTAMP_META,
384   PROP_MAX_STREAMS,
385   PROP_MAX_TS_OFFSET_ADJUSTMENT,
386   PROP_MAX_TS_OFFSET,
387   PROP_MIN_TS_OFFSET,
388   PROP_TS_OFFSET_SMOOTHING_FACTOR,
389   PROP_FEC_DECODERS,
390   PROP_FEC_ENCODERS,
391 };
392
393 #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
394 static GType
395 gst_rtp_bin_rtcp_sync_get_type (void)
396 {
397   static GType rtcp_sync_type = 0;
398   static const GEnumValue rtcp_sync_types[] = {
399     {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
400     {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
401     {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
402     {0, NULL, NULL},
403   };
404
405   if (!rtcp_sync_type) {
406     rtcp_sync_type = g_enum_register_static ("GstRTCPSync", rtcp_sync_types);
407   }
408   return rtcp_sync_type;
409 }
410
411 /* helper objects */
412 typedef struct _GstRtpBinSession GstRtpBinSession;
413 typedef struct _GstRtpBinStream GstRtpBinStream;
414 typedef struct _GstRtpBinClient GstRtpBinClient;
415
416 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
417
418 static GstCaps *pt_map_requested (GstElement * element, guint pt,
419     GstRtpBinSession * session);
420 static void payload_type_change (GstElement * element, guint pt,
421     GstRtpBinSession * session);
422 static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
423 static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
424 static void remove_recv_fec (GstRtpBin * rtpbin, GstRtpBinSession * session);
425 static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
426 static void remove_send_fec (GstRtpBin * rtpbin, GstRtpBinSession * session);
427 static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
428 static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
429 static void free_stream (GstRtpBinStream * stream, GstRtpBin * bin);
430 static GstRtpBinSession *create_session (GstRtpBin * rtpbin, gint id);
431 static GstPad *complete_session_sink (GstRtpBin * rtpbin,
432     GstRtpBinSession * session);
433 static void
434 complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session,
435     guint sessid);
436 static GstPad *complete_session_rtcp (GstRtpBin * rtpbin,
437     GstRtpBinSession * session, guint sessid);
438 static GstElement *session_request_element (GstRtpBinSession * session,
439     guint signal);
440
441 /* Manages the RTP stream for one SSRC.
442  *
443  * We pipe the stream (coming from the SSRC demuxer) into a jitterbuffer.
444  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
445  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
446  * together (see below).
447  */
448 struct _GstRtpBinStream
449 {
450   /* the SSRC of this stream */
451   guint32 ssrc;
452
453   /* parent bin */
454   GstRtpBin *bin;
455
456   /* the session this SSRC belongs to */
457   GstRtpBinSession *session;
458
459   /* the jitterbuffer of the SSRC */
460   GstElement *buffer;
461   gulong buffer_handlesync_sig;
462   gulong buffer_ptreq_sig;
463   gulong buffer_ntpstop_sig;
464   gint percent;
465
466   /* the PT demuxer of the SSRC */
467   GstElement *demux;
468   gulong demux_newpad_sig;
469   gulong demux_padremoved_sig;
470   gulong demux_ptreq_sig;
471   gulong demux_ptchange_sig;
472
473   /* if we have calculated a valid rt_delta for this stream */
474   gboolean have_sync;
475   /* mapping to local RTP and NTP time */
476   gint64 rt_delta;
477   gint64 rtp_delta;
478   gint64 avg_ts_offset;
479   gboolean is_initialized;
480   /* base rtptime in gst time */
481   gint64 clock_base;
482 };
483
484 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->lock)
485 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->lock)
486
487 /* Manages the receiving end of the packets.
488  *
489  * There is one such structure for each RTP session (audio/video/...).
490  * We get the RTP/RTCP packets and stuff them into the session manager. From
491  * there they are pushed into an SSRC demuxer that splits the stream based on
492  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
493  * the GstRtpBinStream above).
494  *
495  * Before the SSRC demuxer, a storage element may be inserted for the purpose
496  * of Forward Error Correction.
497  */
498 struct _GstRtpBinSession
499 {
500   /* session id */
501   gint id;
502   /* the parent bin */
503   GstRtpBin *bin;
504   /* the session element */
505   GstElement *session;
506   /* the SSRC demuxer */
507   GstElement *demux;
508   gulong demux_newpad_sig;
509   gulong demux_padremoved_sig;
510
511   /* Fec support */
512   GstElement *storage;
513
514   GMutex lock;
515
516   /* list of GstRtpBinStream */
517   GSList *streams;
518
519   /* list of elements */
520   GSList *elements;
521
522   /* mapping of payload type to caps */
523   GHashTable *ptmap;
524
525   /* the pads of the session */
526   GstPad *recv_rtp_sink;
527   GstPad *recv_rtp_sink_ghost;
528   GstPad *recv_rtp_src;
529   GstPad *recv_rtcp_sink;
530   GstPad *recv_rtcp_sink_ghost;
531   GstPad *sync_src;
532   GstPad *send_rtp_sink;
533   GstPad *send_rtp_sink_ghost;
534   GstPad *send_rtp_src_ghost;
535   GstPad *send_rtcp_src;
536   GstPad *send_rtcp_src_ghost;
537
538   GSList *recv_fec_sinks;
539   GSList *recv_fec_sink_ghosts;
540   /* fec decoder placed before the rtpjitterbuffer but after the rtpssrcdemux.
541    * XXX: This does not yet support multiple ssrc's in the same rtp session
542    */
543   GstElement *early_fec_decoder;
544
545   GSList *send_fec_src_ghosts;
546 };
547
548 /* Manages the RTP streams that come from one client and should therefore be
549  * synchronized.
550  */
551 struct _GstRtpBinClient
552 {
553   /* the common CNAME for the streams */
554   gchar *cname;
555   guint cname_len;
556
557   /* the streams */
558   guint nstreams;
559   GSList *streams;
560 };
561
562 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
563 static GstRtpBinSession *
564 find_session_by_id (GstRtpBin * rtpbin, gint id)
565 {
566   GSList *walk;
567
568   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
569     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
570
571     if (sess->id == id)
572       return sess;
573   }
574   return NULL;
575 }
576
577 static gboolean
578 pad_is_recv_fec (GstRtpBinSession * session, GstPad * pad)
579 {
580   return g_slist_find (session->recv_fec_sink_ghosts, pad) != NULL;
581 }
582
583 /* find a session with the given request pad. Must be called with RTP_BIN_LOCK */
584 static GstRtpBinSession *
585 find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
586 {
587   GSList *walk;
588
589   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
590     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
591
592     if ((sess->recv_rtp_sink_ghost == pad) ||
593         (sess->recv_rtcp_sink_ghost == pad) ||
594         (sess->send_rtp_sink_ghost == pad) ||
595         (sess->send_rtcp_src_ghost == pad) || pad_is_recv_fec (sess, pad))
596       return sess;
597   }
598   return NULL;
599 }
600
601 static void
602 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
603 {
604   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
605       sess->id, ssrc);
606 }
607
608 static void
609 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
610 {
611   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
612       sess->id, ssrc);
613 }
614
615 static void
616 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
617 {
618   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
619       sess->id, ssrc);
620 }
621
622 static void
623 on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
624 {
625   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
626       sess->id, ssrc);
627 }
628
629 static void
630 on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
631 {
632   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
633       sess->id, ssrc);
634 }
635
636 static void
637 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
638 {
639   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
640       sess->id, ssrc);
641 }
642
643 static void
644 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
645 {
646   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
647       sess->id, ssrc);
648
649   if (sess->bin->priv->autoremove)
650     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
651 }
652
653 static void
654 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
655 {
656   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
657       sess->id, ssrc);
658
659   if (sess->bin->priv->autoremove)
660     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
661 }
662
663 static void
664 on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
665 {
666   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
667       sess->id, ssrc);
668 }
669
670 static void
671 on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
672 {
673   g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0,
674       stream->session->id, stream->ssrc);
675 }
676
677 static void
678 on_new_sender_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
679 {
680   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
681       sess->id, ssrc);
682 }
683
684 static void
685 on_sender_ssrc_active (GstElement * session, guint32 ssrc,
686     GstRtpBinSession * sess)
687 {
688   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE],
689       0, sess->id, ssrc);
690 }
691
692 /* must be called with the SESSION lock */
693 static GstRtpBinStream *
694 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
695 {
696   GSList *walk;
697
698   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
699     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
700
701     if (stream->ssrc == ssrc)
702       return stream;
703   }
704   return NULL;
705 }
706
707 static void
708 ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad,
709     GstRtpBinSession * session)
710 {
711   GstRtpBinStream *stream = NULL;
712   GstRtpBin *rtpbin;
713
714   rtpbin = session->bin;
715
716   GST_RTP_BIN_LOCK (rtpbin);
717
718   GST_RTP_SESSION_LOCK (session);
719   if ((stream = find_stream_by_ssrc (session, ssrc)))
720     session->streams = g_slist_remove (session->streams, stream);
721   GST_RTP_SESSION_UNLOCK (session);
722
723   if (stream)
724     free_stream (stream, rtpbin);
725
726   GST_RTP_BIN_UNLOCK (rtpbin);
727 }
728
729 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
730 static GstRtpBinSession *
731 create_session (GstRtpBin * rtpbin, gint id)
732 {
733   GstRtpBinSession *sess;
734   GstElement *session, *demux;
735   GstElement *storage = NULL;
736   GstState target;
737
738   if (!(session = gst_element_factory_make ("rtpsession", NULL)))
739     goto no_session;
740
741   if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
742     goto no_demux;
743
744   if (!(storage = gst_element_factory_make ("rtpstorage", NULL)))
745     goto no_storage;
746
747   /* need to sink the storage or otherwise signal handlers from bindings will
748    * take ownership of it and we don't own it anymore */
749   gst_object_ref_sink (storage);
750   g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_STORAGE], 0, storage,
751       id);
752
753   sess = g_new0 (GstRtpBinSession, 1);
754   g_mutex_init (&sess->lock);
755   sess->id = id;
756   sess->bin = rtpbin;
757   sess->session = session;
758   sess->demux = demux;
759   sess->storage = storage;
760
761   sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
762       (GDestroyNotify) gst_caps_unref);
763   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
764
765   /* configure SDES items */
766   GST_OBJECT_LOCK (rtpbin);
767   g_object_set (demux, "max-streams", rtpbin->max_streams, NULL);
768   g_object_set (session, "sdes", rtpbin->sdes, "rtp-profile",
769       rtpbin->rtp_profile, "rtcp-sync-send-time", rtpbin->rtcp_sync_send_time,
770       NULL);
771   if (rtpbin->use_pipeline_clock)
772     g_object_set (session, "use-pipeline-clock", rtpbin->use_pipeline_clock,
773         NULL);
774   else
775     g_object_set (session, "ntp-time-source", rtpbin->ntp_time_source, NULL);
776
777   g_object_set (session, "max-dropout-time", rtpbin->max_dropout_time,
778       "max-misorder-time", rtpbin->max_misorder_time, NULL);
779   GST_OBJECT_UNLOCK (rtpbin);
780
781   /* provide clock_rate to the session manager when needed */
782   g_signal_connect (session, "request-pt-map",
783       (GCallback) pt_map_requested, sess);
784
785   g_signal_connect (sess->session, "on-new-ssrc",
786       (GCallback) on_new_ssrc, sess);
787   g_signal_connect (sess->session, "on-ssrc-collision",
788       (GCallback) on_ssrc_collision, sess);
789   g_signal_connect (sess->session, "on-ssrc-validated",
790       (GCallback) on_ssrc_validated, sess);
791   g_signal_connect (sess->session, "on-ssrc-active",
792       (GCallback) on_ssrc_active, sess);
793   g_signal_connect (sess->session, "on-ssrc-sdes",
794       (GCallback) on_ssrc_sdes, sess);
795   g_signal_connect (sess->session, "on-bye-ssrc",
796       (GCallback) on_bye_ssrc, sess);
797   g_signal_connect (sess->session, "on-bye-timeout",
798       (GCallback) on_bye_timeout, sess);
799   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
800   g_signal_connect (sess->session, "on-sender-timeout",
801       (GCallback) on_sender_timeout, sess);
802   g_signal_connect (sess->session, "on-new-sender-ssrc",
803       (GCallback) on_new_sender_ssrc, sess);
804   g_signal_connect (sess->session, "on-sender-ssrc-active",
805       (GCallback) on_sender_ssrc_active, sess);
806
807   gst_bin_add (GST_BIN_CAST (rtpbin), session);
808   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
809   gst_bin_add (GST_BIN_CAST (rtpbin), storage);
810
811   /* unref the storage again, the bin has a reference now and
812    * we don't need it anymore */
813   gst_object_unref (storage);
814
815   GST_OBJECT_LOCK (rtpbin);
816   target = GST_STATE_TARGET (rtpbin);
817   GST_OBJECT_UNLOCK (rtpbin);
818
819   /* change state only to what's needed */
820   gst_element_set_state (demux, target);
821   gst_element_set_state (session, target);
822   gst_element_set_state (storage, target);
823
824   return sess;
825
826   /* ERRORS */
827 no_session:
828   {
829     g_warning ("rtpbin: could not create rtpsession element");
830     return NULL;
831   }
832 no_demux:
833   {
834     gst_object_unref (session);
835     g_warning ("rtpbin: could not create rtpssrcdemux element");
836     return NULL;
837   }
838 no_storage:
839   {
840     gst_object_unref (session);
841     gst_object_unref (demux);
842     g_warning ("rtpbin: could not create rtpstorage element");
843     return NULL;
844   }
845 }
846
847 static gboolean
848 bin_manage_element (GstRtpBin * bin, GstElement * element)
849 {
850   GstRtpBinPrivate *priv = bin->priv;
851
852   if (g_list_find (priv->elements, element)) {
853     GST_DEBUG_OBJECT (bin, "requested element %p already in bin", element);
854   } else {
855     GST_DEBUG_OBJECT (bin, "adding requested element %p", element);
856
857     if (g_object_is_floating (element))
858       element = gst_object_ref_sink (element);
859
860     if (!gst_bin_add (GST_BIN_CAST (bin), element))
861       goto add_failed;
862     if (!gst_element_sync_state_with_parent (element))
863       GST_WARNING_OBJECT (bin, "unable to sync element state with rtpbin");
864   }
865   /* we add the element multiple times, each we need an equal number of
866    * removes to really remove the element from the bin */
867   priv->elements = g_list_prepend (priv->elements, element);
868
869   return TRUE;
870
871   /* ERRORS */
872 add_failed:
873   {
874     GST_WARNING_OBJECT (bin, "unable to add element");
875     gst_object_unref (element);
876     return FALSE;
877   }
878 }
879
880 static void
881 remove_bin_element (GstElement * element, GstRtpBin * bin)
882 {
883   GstRtpBinPrivate *priv = bin->priv;
884   GList *find;
885
886   find = g_list_find (priv->elements, element);
887   if (find) {
888     priv->elements = g_list_delete_link (priv->elements, find);
889
890     if (!g_list_find (priv->elements, element)) {
891       gst_element_set_locked_state (element, TRUE);
892       gst_bin_remove (GST_BIN_CAST (bin), element);
893       gst_element_set_state (element, GST_STATE_NULL);
894     }
895
896     gst_object_unref (element);
897   }
898 }
899
900 /* called with RTP_BIN_LOCK */
901 static void
902 free_session (GstRtpBinSession * sess, GstRtpBin * bin)
903 {
904   GST_DEBUG_OBJECT (bin, "freeing session %p", sess);
905
906   gst_element_set_locked_state (sess->demux, TRUE);
907   gst_element_set_locked_state (sess->session, TRUE);
908   gst_element_set_locked_state (sess->storage, TRUE);
909
910   gst_element_set_state (sess->demux, GST_STATE_NULL);
911   gst_element_set_state (sess->session, GST_STATE_NULL);
912   gst_element_set_state (sess->storage, GST_STATE_NULL);
913
914   remove_recv_rtp (bin, sess);
915   remove_recv_rtcp (bin, sess);
916   remove_recv_fec (bin, sess);
917   remove_send_rtp (bin, sess);
918   remove_send_fec (bin, sess);
919   remove_rtcp (bin, sess);
920
921   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
922   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
923   gst_bin_remove (GST_BIN_CAST (bin), sess->storage);
924
925   g_slist_foreach (sess->elements, (GFunc) remove_bin_element, bin);
926   g_slist_free (sess->elements);
927   sess->elements = NULL;
928
929   g_slist_foreach (sess->streams, (GFunc) free_stream, bin);
930   g_slist_free (sess->streams);
931
932   g_mutex_clear (&sess->lock);
933   g_hash_table_destroy (sess->ptmap);
934
935   g_free (sess);
936 }
937
938 /* get the payload type caps for the specific payload @pt in @session */
939 static GstCaps *
940 get_pt_map (GstRtpBinSession * session, guint pt)
941 {
942   GstCaps *caps = NULL;
943   GstRtpBin *bin;
944   GValue ret = { 0 };
945   GValue args[3] = { {0}, {0}, {0} };
946
947   GST_DEBUG ("searching pt %u in cache", pt);
948
949   GST_RTP_SESSION_LOCK (session);
950
951   /* first look in the cache */
952   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
953   if (caps) {
954     gst_caps_ref (caps);
955     goto done;
956   }
957
958   bin = session->bin;
959
960   GST_DEBUG ("emitting signal for pt %u in session %u", pt, session->id);
961
962   /* not in cache, send signal to request caps */
963   g_value_init (&args[0], GST_TYPE_ELEMENT);
964   g_value_set_object (&args[0], bin);
965   g_value_init (&args[1], G_TYPE_UINT);
966   g_value_set_uint (&args[1], session->id);
967   g_value_init (&args[2], G_TYPE_UINT);
968   g_value_set_uint (&args[2], pt);
969
970   g_value_init (&ret, GST_TYPE_CAPS);
971   g_value_set_boxed (&ret, NULL);
972
973   GST_RTP_SESSION_UNLOCK (session);
974
975   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
976
977   GST_RTP_SESSION_LOCK (session);
978
979   g_value_unset (&args[0]);
980   g_value_unset (&args[1]);
981   g_value_unset (&args[2]);
982
983   /* look in the cache again because we let the lock go */
984   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
985   if (caps) {
986     gst_caps_ref (caps);
987     g_value_unset (&ret);
988     goto done;
989   }
990
991   caps = (GstCaps *) g_value_dup_boxed (&ret);
992   g_value_unset (&ret);
993   if (!caps)
994     goto no_caps;
995
996   GST_DEBUG ("caching pt %u as %" GST_PTR_FORMAT, pt, caps);
997
998   /* store in cache, take additional ref */
999   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
1000       gst_caps_ref (caps));
1001
1002 done:
1003   GST_RTP_SESSION_UNLOCK (session);
1004
1005   return caps;
1006
1007   /* ERRORS */
1008 no_caps:
1009   {
1010     GST_RTP_SESSION_UNLOCK (session);
1011     GST_DEBUG ("no pt map could be obtained");
1012     return NULL;
1013   }
1014 }
1015
1016 static gboolean
1017 return_true (gpointer key, gpointer value, gpointer user_data)
1018 {
1019   return TRUE;
1020 }
1021
1022 static void
1023 gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
1024 {
1025   GSList *clients, *streams;
1026
1027   GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");
1028
1029   GST_RTP_BIN_LOCK (rtpbin);
1030   for (clients = rtpbin->clients; clients; clients = g_slist_next (clients)) {
1031     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
1032
1033     /* reset sync on all streams for this client */
1034     for (streams = client->streams; streams; streams = g_slist_next (streams)) {
1035       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1036
1037       /* make use require a new SR packet for this stream before we attempt new
1038        * lip-sync */
1039       stream->have_sync = FALSE;
1040       stream->rt_delta = 0;
1041       stream->avg_ts_offset = 0;
1042       stream->is_initialized = FALSE;
1043       stream->rtp_delta = 0;
1044       stream->clock_base = -100 * GST_SECOND;
1045     }
1046   }
1047   GST_RTP_BIN_UNLOCK (rtpbin);
1048 }
1049
1050 static void
1051 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
1052 {
1053   GSList *sessions, *streams;
1054
1055   GST_RTP_BIN_LOCK (bin);
1056   GST_DEBUG_OBJECT (bin, "clearing pt map");
1057   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1058     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
1059
1060     GST_DEBUG_OBJECT (bin, "clearing session %p", session);
1061     g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
1062
1063     GST_RTP_SESSION_LOCK (session);
1064     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
1065
1066     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
1067       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1068
1069       GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
1070       if (g_signal_lookup ("clear-pt-map", G_OBJECT_TYPE (stream->buffer)) != 0)
1071         g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
1072       if (stream->demux)
1073         g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
1074     }
1075     GST_RTP_SESSION_UNLOCK (session);
1076   }
1077   GST_RTP_BIN_UNLOCK (bin);
1078
1079   /* reset sync too */
1080   gst_rtp_bin_reset_sync (bin);
1081 }
1082
1083 static GstElement *
1084 gst_rtp_bin_get_session (GstRtpBin * bin, guint session_id)
1085 {
1086   GstRtpBinSession *session;
1087   GstElement *ret = NULL;
1088
1089   GST_RTP_BIN_LOCK (bin);
1090   GST_DEBUG_OBJECT (bin, "retrieving GstRtpSession, index: %u", session_id);
1091   session = find_session_by_id (bin, (gint) session_id);
1092   if (session) {
1093     ret = gst_object_ref (session->session);
1094   }
1095   GST_RTP_BIN_UNLOCK (bin);
1096
1097   return ret;
1098 }
1099
1100 static RTPSession *
1101 gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
1102 {
1103   RTPSession *internal_session = NULL;
1104   GstRtpBinSession *session;
1105
1106   GST_RTP_BIN_LOCK (bin);
1107   GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %u",
1108       session_id);
1109   session = find_session_by_id (bin, (gint) session_id);
1110   if (session) {
1111     g_object_get (session->session, "internal-session", &internal_session,
1112         NULL);
1113   }
1114   GST_RTP_BIN_UNLOCK (bin);
1115
1116   return internal_session;
1117 }
1118
1119 static GstElement *
1120 gst_rtp_bin_get_storage (GstRtpBin * bin, guint session_id)
1121 {
1122   GstRtpBinSession *session;
1123   GstElement *res = NULL;
1124
1125   GST_RTP_BIN_LOCK (bin);
1126   GST_DEBUG_OBJECT (bin, "retrieving internal storage object, index: %u",
1127       session_id);
1128   session = find_session_by_id (bin, (gint) session_id);
1129   if (session && session->storage) {
1130     res = gst_object_ref (session->storage);
1131   }
1132   GST_RTP_BIN_UNLOCK (bin);
1133
1134   return res;
1135 }
1136
1137 static GObject *
1138 gst_rtp_bin_get_internal_storage (GstRtpBin * bin, guint session_id)
1139 {
1140   GObject *internal_storage = NULL;
1141   GstRtpBinSession *session;
1142
1143   GST_RTP_BIN_LOCK (bin);
1144   GST_DEBUG_OBJECT (bin, "retrieving internal storage object, index: %u",
1145       session_id);
1146   session = find_session_by_id (bin, (gint) session_id);
1147   if (session && session->storage) {
1148     g_object_get (session->storage, "internal-storage", &internal_storage,
1149         NULL);
1150   }
1151   GST_RTP_BIN_UNLOCK (bin);
1152
1153   return internal_storage;
1154 }
1155
1156 static void
1157 gst_rtp_bin_clear_ssrc (GstRtpBin * bin, guint session_id, guint32 ssrc)
1158 {
1159   GstRtpBinSession *session;
1160   GstElement *demux = NULL;
1161
1162   GST_RTP_BIN_LOCK (bin);
1163   GST_DEBUG_OBJECT (bin, "clearing ssrc %u for session %u", ssrc, session_id);
1164   session = find_session_by_id (bin, (gint) session_id);
1165   if (session)
1166     demux = gst_object_ref (session->demux);
1167   GST_RTP_BIN_UNLOCK (bin);
1168
1169   if (demux) {
1170     g_signal_emit_by_name (demux, "clear-ssrc", ssrc, NULL);
1171     gst_object_unref (demux);
1172   }
1173 }
1174
1175 static GstElement *
1176 gst_rtp_bin_request_encoder (GstRtpBin * bin, guint session_id)
1177 {
1178   GST_DEBUG_OBJECT (bin, "return NULL encoder");
1179   return NULL;
1180 }
1181
1182 static GstElement *
1183 gst_rtp_bin_request_decoder (GstRtpBin * bin, guint session_id)
1184 {
1185   GST_DEBUG_OBJECT (bin, "return NULL decoder");
1186   return NULL;
1187 }
1188
1189 static GstElement *
1190 gst_rtp_bin_request_jitterbuffer (GstRtpBin * bin, guint session_id)
1191 {
1192   return gst_element_factory_make ("rtpjitterbuffer", NULL);
1193 }
1194
1195 static void
1196 gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
1197     const gchar * name, const GValue * value)
1198 {
1199   GSList *sessions, *streams;
1200
1201   GST_RTP_BIN_LOCK (bin);
1202   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1203     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
1204
1205     GST_RTP_SESSION_LOCK (session);
1206     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
1207       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1208       GObjectClass *jb_class;
1209
1210       jb_class = G_OBJECT_GET_CLASS (G_OBJECT (stream->buffer));
1211       if (g_object_class_find_property (jb_class, name))
1212         g_object_set_property (G_OBJECT (stream->buffer), name, value);
1213       else
1214         GST_WARNING_OBJECT (bin,
1215             "Stream jitterbuffer does not expose property %s", name);
1216     }
1217     GST_RTP_SESSION_UNLOCK (session);
1218   }
1219   GST_RTP_BIN_UNLOCK (bin);
1220 }
1221
1222 static void
1223 gst_rtp_bin_propagate_property_to_session (GstRtpBin * bin,
1224     const gchar * name, const GValue * value)
1225 {
1226   GSList *sessions;
1227
1228   GST_RTP_BIN_LOCK (bin);
1229   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1230     GstRtpBinSession *sess = (GstRtpBinSession *) sessions->data;
1231
1232     g_object_set_property (G_OBJECT (sess->session), name, value);
1233   }
1234   GST_RTP_BIN_UNLOCK (bin);
1235 }
1236
1237 /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
1238 static GstRtpBinClient *
1239 get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
1240 {
1241   GstRtpBinClient *result = NULL;
1242   GSList *walk;
1243
1244   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
1245     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
1246
1247     if (len != client->cname_len)
1248       continue;
1249
1250     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
1251       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
1252           client->cname);
1253       result = client;
1254       break;
1255     }
1256   }
1257
1258   /* nothing found, create one */
1259   if (result == NULL) {
1260     result = g_new0 (GstRtpBinClient, 1);
1261     result->cname = g_strndup ((gchar *) data, len);
1262     result->cname_len = len;
1263     bin->clients = g_slist_prepend (bin->clients, result);
1264     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
1265         result->cname);
1266   }
1267   return result;
1268 }
1269
1270 static void
1271 free_client (GstRtpBinClient * client, GstRtpBin * bin)
1272 {
1273   GST_DEBUG_OBJECT (bin, "freeing client %p", client);
1274   g_slist_free (client->streams);
1275   g_free (client->cname);
1276   g_free (client);
1277 }
1278
1279 static void
1280 get_current_times (GstRtpBin * bin, GstClockTime * running_time,
1281     guint64 * ntpnstime)
1282 {
1283   guint64 ntpns = -1;
1284   GstClock *clock;
1285   GstClockTime base_time, rt, clock_time;
1286
1287   GST_OBJECT_LOCK (bin);
1288   if ((clock = GST_ELEMENT_CLOCK (bin))) {
1289     base_time = GST_ELEMENT_CAST (bin)->base_time;
1290     gst_object_ref (clock);
1291     GST_OBJECT_UNLOCK (bin);
1292
1293     /* get current clock time and convert to running time */
1294     clock_time = gst_clock_get_time (clock);
1295     rt = clock_time - base_time;
1296
1297     if (bin->use_pipeline_clock) {
1298       ntpns = rt;
1299       /* add constant to convert from 1970 based time to 1900 based time */
1300       ntpns += (2208988800LL * GST_SECOND);
1301     } else {
1302       switch (bin->ntp_time_source) {
1303         case GST_RTP_NTP_TIME_SOURCE_NTP:
1304         case GST_RTP_NTP_TIME_SOURCE_UNIX:{
1305           /* get current NTP time */
1306           ntpns = g_get_real_time () * GST_USECOND;
1307
1308           /* add constant to convert from 1970 based time to 1900 based time */
1309           if (bin->ntp_time_source == GST_RTP_NTP_TIME_SOURCE_NTP)
1310             ntpns += (2208988800LL * GST_SECOND);
1311           break;
1312         }
1313         case GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME:
1314           ntpns = rt;
1315           break;
1316         case GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME:
1317           ntpns = clock_time;
1318           break;
1319         default:
1320           ntpns = -1;           /* Fix uninited compiler warning */
1321           g_assert_not_reached ();
1322           break;
1323       }
1324     }
1325
1326     gst_object_unref (clock);
1327   } else {
1328     GST_OBJECT_UNLOCK (bin);
1329     rt = -1;
1330     ntpns = -1;
1331   }
1332   if (running_time)
1333     *running_time = rt;
1334   if (ntpnstime)
1335     *ntpnstime = ntpns;
1336 }
1337
1338 static void
1339 stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
1340     gint64 ts_offset, gint64 max_ts_offset, guint64 min_ts_offset,
1341     gboolean allow_positive_ts_offset)
1342 {
1343   gint64 prev_ts_offset;
1344   GObjectClass *jb_class;
1345
1346   jb_class = G_OBJECT_GET_CLASS (G_OBJECT (stream->buffer));
1347
1348   if (!g_object_class_find_property (jb_class, "ts-offset")) {
1349     GST_LOG_OBJECT (bin,
1350         "stream's jitterbuffer does not expose ts-offset property");
1351     return;
1352   }
1353
1354   if (bin->ts_offset_smoothing_factor > 0) {
1355     if (!stream->is_initialized) {
1356       stream->avg_ts_offset = ts_offset;
1357       stream->is_initialized = TRUE;
1358     } else {
1359       /* RMA algorithm using smoothing factor is following, but split into
1360        * parts to check for overflows:
1361        * stream->avg_ts_offset =
1362        *   ((bin->ts_offset_smoothing_factor - 1) * stream->avg_ts_offset
1363        *    + ts_offset) / bin->ts_offset_smoothing_factor
1364        */
1365       guint64 max_possible_smoothing_factor =
1366           G_MAXINT64 / ABS (stream->avg_ts_offset);
1367       gint64 cur_avg_product =
1368           (bin->ts_offset_smoothing_factor - 1) * stream->avg_ts_offset;
1369
1370       if ((max_possible_smoothing_factor < bin->ts_offset_smoothing_factor) ||
1371           (cur_avg_product > 0 && G_MAXINT64 - cur_avg_product < ts_offset) ||
1372           (cur_avg_product < 0 && G_MININT64 - cur_avg_product > ts_offset)) {
1373         GST_WARNING_OBJECT (bin,
1374             "ts-offset-smoothing-factor calculation overflow, fallback to using ts-offset directly");
1375         stream->avg_ts_offset = ts_offset;
1376       } else {
1377         stream->avg_ts_offset =
1378             (cur_avg_product + ts_offset) / bin->ts_offset_smoothing_factor;
1379       }
1380     }
1381   } else {
1382     stream->avg_ts_offset = ts_offset;
1383   }
1384
1385   g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
1386
1387   /* delta changed, see how much */
1388   if (prev_ts_offset != stream->avg_ts_offset) {
1389     gint64 diff;
1390
1391     diff = prev_ts_offset - stream->avg_ts_offset;
1392
1393     GST_DEBUG_OBJECT (bin,
1394         "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
1395         ", diff: %" G_GINT64_FORMAT, stream->avg_ts_offset, prev_ts_offset,
1396         diff);
1397
1398     /* ignore minor offsets */
1399     if (ABS (diff) < min_ts_offset) {
1400       GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
1401       return;
1402     }
1403
1404     /* sanity check offset */
1405     if (max_ts_offset > 0) {
1406       if (stream->avg_ts_offset > 0 && !allow_positive_ts_offset) {
1407         GST_DEBUG_OBJECT (bin,
1408             "offset is positive (clocks are out of sync), ignoring");
1409         return;
1410       }
1411       if (ABS (stream->avg_ts_offset) > max_ts_offset) {
1412         GST_DEBUG_OBJECT (bin, "offset too large, ignoring");
1413         return;
1414       }
1415     }
1416
1417     g_object_set (stream->buffer, "ts-offset", stream->avg_ts_offset, NULL);
1418   }
1419   GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
1420       stream->ssrc, stream->avg_ts_offset);
1421 }
1422
1423 static void
1424 gst_rtp_bin_send_sync_event (GstRtpBinStream * stream)
1425 {
1426   if (stream->bin->send_sync_event) {
1427     GstEvent *event;
1428     GstPad *srcpad;
1429
1430     GST_DEBUG_OBJECT (stream->bin,
1431         "sending GstRTCPSRReceived event downstream");
1432
1433     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1434         gst_structure_new_empty ("GstRTCPSRReceived"));
1435
1436     srcpad = gst_element_get_static_pad (stream->buffer, "src");
1437     gst_pad_push_event (srcpad, event);
1438     gst_object_unref (srcpad);
1439   }
1440 }
1441
1442 /* associate a stream to the given CNAME. This will make sure all streams for
1443  * that CNAME are synchronized together.
1444  * Must be called with GST_RTP_BIN_LOCK */
1445 static void
1446 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
1447     guint8 * data, guint64 ntptime, guint64 last_extrtptime,
1448     guint64 base_rtptime, guint64 base_time, guint clock_rate,
1449     gint64 rtp_clock_base)
1450 {
1451   GstRtpBinClient *client;
1452   gboolean created;
1453   GSList *walk;
1454   GstClockTime running_time, running_time_rtp;
1455   guint64 ntpnstime;
1456
1457   /* first find or create the CNAME */
1458   client = get_client (bin, len, data, &created);
1459
1460   /* find stream in the client */
1461   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1462     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1463
1464     if (ostream == stream)
1465       break;
1466   }
1467   /* not found, add it to the list */
1468   if (walk == NULL) {
1469     GST_DEBUG_OBJECT (bin,
1470         "new association of SSRC %08x with client %p with CNAME %s",
1471         stream->ssrc, client, client->cname);
1472     client->streams = g_slist_prepend (client->streams, stream);
1473     client->nstreams++;
1474   } else {
1475     GST_DEBUG_OBJECT (bin,
1476         "found association of SSRC %08x with client %p with CNAME %s",
1477         stream->ssrc, client, client->cname);
1478   }
1479
1480   if (!GST_CLOCK_TIME_IS_VALID (last_extrtptime)) {
1481     GST_DEBUG_OBJECT (bin, "invalidated sync data");
1482     if (bin->rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1483       /* we don't need that data, so carry on,
1484        * but make some values look saner */
1485       last_extrtptime = base_rtptime;
1486     } else {
1487       /* nothing we can do with this data in this case */
1488       GST_DEBUG_OBJECT (bin, "bailing out");
1489       return;
1490     }
1491   }
1492
1493   /* Take the extended rtptime we found in the SR packet and map it to the
1494    * local rtptime. The local rtp time is used to construct timestamps on the
1495    * buffers so we will calculate what running_time corresponds to the RTP
1496    * timestamp in the SR packet. */
1497   running_time_rtp = last_extrtptime - base_rtptime;
1498
1499   GST_DEBUG_OBJECT (bin,
1500       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
1501       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, "
1502       "clock-base %" G_GINT64_FORMAT, base_rtptime,
1503       last_extrtptime, running_time_rtp, clock_rate, rtp_clock_base);
1504
1505   /* calculate local RTP time in gstreamer timestamp, we essentially perform the
1506    * same conversion that a jitterbuffer would use to convert an rtp timestamp
1507    * into a corresponding gstreamer timestamp. Note that the base_time also
1508    * contains the drift between sender and receiver. */
1509   running_time =
1510       gst_util_uint64_scale_int (running_time_rtp, GST_SECOND, clock_rate);
1511   running_time += base_time;
1512
1513   /* convert ntptime to nanoseconds */
1514   ntpnstime = gst_util_uint64_scale (ntptime, GST_SECOND,
1515       (G_GINT64_CONSTANT (1) << 32));
1516
1517   stream->have_sync = TRUE;
1518
1519   GST_DEBUG_OBJECT (bin,
1520       "SR RTP running time %" G_GUINT64_FORMAT ", SR NTP %" G_GUINT64_FORMAT,
1521       running_time, ntpnstime);
1522
1523   /* recalc inter stream playout offset, but only if there is more than one
1524    * stream or we're doing NTP sync. */
1525   if (bin->ntp_sync) {
1526     gint64 ntpdiff, rtdiff;
1527     guint64 local_ntpnstime;
1528     GstClockTime local_running_time;
1529
1530     /* For NTP sync we need to first get a snapshot of running_time and NTP
1531      * time. We know at what running_time we play a certain RTP time, we also
1532      * calculated when we would play the RTP time in the SR packet. Now we need
1533      * to know how the running_time and the NTP time relate to each other. */
1534     get_current_times (bin, &local_running_time, &local_ntpnstime);
1535
1536     /* see how far away the NTP time is. This is the difference between the
1537      * current NTP time and the NTP time in the last SR packet. */
1538     ntpdiff = local_ntpnstime - ntpnstime;
1539     /* see how far away the running_time is. This is the difference between the
1540      * current running_time and the running_time of the RTP timestamp in the
1541      * last SR packet. */
1542     rtdiff = local_running_time - running_time;
1543
1544     GST_DEBUG_OBJECT (bin,
1545         "local NTP time %" G_GUINT64_FORMAT ", SR NTP time %" G_GUINT64_FORMAT,
1546         local_ntpnstime, ntpnstime);
1547     GST_DEBUG_OBJECT (bin,
1548         "local running time %" G_GUINT64_FORMAT ", SR RTP running time %"
1549         G_GUINT64_FORMAT, local_running_time, running_time);
1550     GST_DEBUG_OBJECT (bin,
1551         "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
1552         rtdiff);
1553
1554     /* combine to get the final diff to apply to the running_time */
1555     stream->rt_delta = rtdiff - ntpdiff;
1556
1557     stream_set_ts_offset (bin, stream, stream->rt_delta, bin->max_ts_offset,
1558         bin->min_ts_offset, FALSE);
1559   } else {
1560     gint64 min, rtp_min, clock_base = stream->clock_base;
1561     gboolean all_sync, use_rtp;
1562     gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
1563
1564     /* calculate delta between server and receiver. ntpnstime is created by
1565      * converting the ntptime in the last SR packet to a gstreamer timestamp. This
1566      * delta expresses the difference to our timeline and the server timeline. The
1567      * difference in itself doesn't mean much but we can combine the delta of
1568      * multiple streams to create a stream specific offset. */
1569     stream->rt_delta = ntpnstime - running_time;
1570
1571     /* calculate the min of all deltas, ignoring streams that did not yet have a
1572      * valid rt_delta because we did not yet receive an SR packet for those
1573      * streams.
1574      * We calculate the minimum because we would like to only apply positive
1575      * offsets to streams, delaying their playback instead of trying to speed up
1576      * other streams (which might be impossible when we have to create negative
1577      * latencies).
1578      * The stream that has the smallest diff is selected as the reference stream,
1579      * all other streams will have a positive offset to this difference. */
1580
1581     /* some alternative setting allow ignoring RTCP as much as possible,
1582      * for servers generating bogus ntp timeline */
1583     min = rtp_min = G_MAXINT64;
1584     use_rtp = FALSE;
1585     if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1586       guint64 ext_base;
1587
1588       use_rtp = TRUE;
1589       /* signed version for convenience */
1590       clock_base = base_rtptime;
1591       /* deal with possible wrap-around */
1592       ext_base = base_rtptime;
1593       rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
1594       /* sanity check; base rtp and provided clock_base should be close */
1595       if (rtp_clock_base >= clock_base) {
1596         if (rtp_clock_base - clock_base < 10 * clock_rate) {
1597           rtp_clock_base = base_time +
1598               gst_util_uint64_scale_int (rtp_clock_base - clock_base,
1599               GST_SECOND, clock_rate);
1600         } else {
1601           use_rtp = FALSE;
1602         }
1603       } else {
1604         if (clock_base - rtp_clock_base < 10 * clock_rate) {
1605           rtp_clock_base = base_time -
1606               gst_util_uint64_scale_int (clock_base - rtp_clock_base,
1607               GST_SECOND, clock_rate);
1608         } else {
1609           use_rtp = FALSE;
1610         }
1611       }
1612       /* warn and bail for clarity out if no sane values */
1613       if (!use_rtp) {
1614         GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
1615         return;
1616       }
1617       /* store to track changes */
1618       clock_base = rtp_clock_base;
1619       /* generate a fake as before,
1620        * now equating rtptime obtained from RTP-Info,
1621        * where the large time represent the otherwise irrelevant npt/ntp time */
1622       stream->rtp_delta = (GST_SECOND << 28) - rtp_clock_base;
1623     } else {
1624       clock_base = rtp_clock_base;
1625     }
1626
1627     all_sync = TRUE;
1628     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1629       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1630
1631       if (!ostream->have_sync) {
1632         all_sync = FALSE;
1633         continue;
1634       }
1635
1636       /* change in current stream's base from previously init'ed value
1637        * leads to reset of all stream's base */
1638       if (stream != ostream && stream->clock_base >= 0 &&
1639           (stream->clock_base != clock_base)) {
1640         GST_DEBUG_OBJECT (bin, "reset upon clock base change");
1641         ostream->clock_base = -100 * GST_SECOND;
1642         ostream->rtp_delta = 0;
1643       }
1644
1645       if (ostream->rt_delta < min)
1646         min = ostream->rt_delta;
1647       if (ostream->rtp_delta < rtp_min)
1648         rtp_min = ostream->rtp_delta;
1649     }
1650
1651     /* arrange to re-sync for each stream upon significant change,
1652      * e.g. post-seek */
1653     all_sync = all_sync && (stream->clock_base == clock_base);
1654     stream->clock_base = clock_base;
1655
1656     /* may need init performed above later on, but nothing more to do now */
1657     if (client->nstreams <= 1)
1658       return;
1659
1660     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
1661         " all sync %d", client, min, all_sync);
1662     GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
1663
1664     switch (rtcp_sync) {
1665       case GST_RTP_BIN_RTCP_SYNC_RTP:
1666         if (!use_rtp)
1667           break;
1668         GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
1669             "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
1670         /* fall-through */
1671       case GST_RTP_BIN_RTCP_SYNC_INITIAL:
1672         /* if all have been synced already, do not bother further */
1673         if (all_sync) {
1674           GST_DEBUG_OBJECT (bin, "all streams already synced; done");
1675           return;
1676         }
1677         break;
1678       default:
1679         break;
1680     }
1681
1682     /* bail out if we adjusted recently enough */
1683     if (all_sync && (ntpnstime - bin->priv->last_ntpnstime) <
1684         bin->rtcp_sync_interval * GST_MSECOND) {
1685       GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
1686           "previous sender info too recent "
1687           "(previous NTP %" G_GUINT64_FORMAT ")", bin->priv->last_ntpnstime);
1688       return;
1689     }
1690     bin->priv->last_ntpnstime = ntpnstime;
1691
1692     /* calculate offsets for each stream */
1693     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1694       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1695       gint64 ts_offset;
1696
1697       /* ignore streams for which we didn't receive an SR packet yet, we
1698        * can't synchronize them yet. We can however sync other streams just
1699        * fine. */
1700       if (!ostream->have_sync)
1701         continue;
1702
1703       /* calculate offset to our reference stream, this should always give a
1704        * positive number. */
1705       if (use_rtp)
1706         ts_offset = ostream->rtp_delta - rtp_min;
1707       else
1708         ts_offset = ostream->rt_delta - min;
1709
1710       stream_set_ts_offset (bin, ostream, ts_offset, bin->max_ts_offset,
1711           bin->min_ts_offset, TRUE);
1712     }
1713   }
1714   gst_rtp_bin_send_sync_event (stream);
1715
1716   return;
1717 }
1718
1719 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
1720   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
1721           (b) = gst_rtcp_packet_move_to_next ((packet)))
1722
1723 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
1724   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
1725           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
1726
1727 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
1728   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
1729           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
1730
1731 static void
1732 gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
1733     GstRtpBinStream * stream)
1734 {
1735   GstRtpBin *bin;
1736   GstRTCPPacket packet;
1737   guint32 ssrc;
1738   guint64 ntptime;
1739   gboolean have_sr, have_sdes;
1740   gboolean more;
1741   guint64 base_rtptime;
1742   guint64 base_time;
1743   guint clock_rate;
1744   guint64 clock_base;
1745   guint64 extrtptime;
1746   GstBuffer *buffer;
1747   GstRTCPBuffer rtcp = { NULL, };
1748
1749   bin = stream->bin;
1750
1751   GST_DEBUG_OBJECT (bin, "sync handler called");
1752
1753   /* get the last relation between the rtp timestamps and the gstreamer
1754    * timestamps. We get this info directly from the jitterbuffer which
1755    * constructs gstreamer timestamps from rtp timestamps and so it know exactly
1756    * what the current situation is. */
1757   base_rtptime =
1758       g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
1759   base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
1760   clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
1761   clock_base = g_value_get_uint64 (gst_structure_get_value (s, "clock-base"));
1762   extrtptime =
1763       g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
1764   buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
1765
1766   have_sr = FALSE;
1767   have_sdes = FALSE;
1768
1769   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
1770
1771   GST_RTCP_BUFFER_FOR_PACKETS (more, &rtcp, &packet) {
1772     /* first packet must be SR or RR or else the validate would have failed */
1773     switch (gst_rtcp_packet_get_type (&packet)) {
1774       case GST_RTCP_TYPE_SR:
1775         /* only parse first. There is only supposed to be one SR in the packet
1776          * but we will deal with malformed packets gracefully */
1777         if (have_sr)
1778           break;
1779         /* get NTP and RTP times */
1780         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL,
1781             NULL, NULL);
1782
1783         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
1784         /* ignore SR that is not ours */
1785         if (ssrc != stream->ssrc)
1786           continue;
1787
1788         have_sr = TRUE;
1789         break;
1790       case GST_RTCP_TYPE_SDES:
1791       {
1792         gboolean more_items, more_entries;
1793
1794         /* only deal with first SDES, there is only supposed to be one SDES in
1795          * the RTCP packet but we deal with bad packets gracefully. Also bail
1796          * out if we have not seen an SR item yet. */
1797         if (have_sdes || !have_sr)
1798           break;
1799
1800         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
1801           /* skip items that are not about the SSRC of the sender */
1802           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
1803             continue;
1804
1805           /* find the CNAME entry */
1806           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
1807             GstRTCPSDESType type;
1808             guint8 len;
1809             guint8 *data;
1810
1811             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
1812
1813             if (type == GST_RTCP_SDES_CNAME) {
1814               GST_RTP_BIN_LOCK (bin);
1815               /* associate the stream to CNAME */
1816               gst_rtp_bin_associate (bin, stream, len, data,
1817                   ntptime, extrtptime, base_rtptime, base_time, clock_rate,
1818                   clock_base);
1819               GST_RTP_BIN_UNLOCK (bin);
1820             }
1821           }
1822         }
1823         have_sdes = TRUE;
1824         break;
1825       }
1826       default:
1827         /* we can ignore these packets */
1828         break;
1829     }
1830   }
1831   gst_rtcp_buffer_unmap (&rtcp);
1832 }
1833
1834 /* create a new stream with @ssrc in @session. Must be called with
1835  * RTP_SESSION_LOCK. */
1836 static GstRtpBinStream *
1837 create_stream (GstRtpBinSession * session, guint32 ssrc)
1838 {
1839   GstElement *buffer, *demux = NULL;
1840   GstRtpBinStream *stream;
1841   GstRtpBin *rtpbin;
1842   GstState target;
1843   GObjectClass *jb_class;
1844
1845   rtpbin = session->bin;
1846
1847   if (g_slist_length (session->streams) >= rtpbin->max_streams)
1848     goto max_streams;
1849
1850   if (!(buffer =
1851           session_request_element (session, SIGNAL_REQUEST_JITTERBUFFER)))
1852     goto no_jitterbuffer;
1853
1854   if (!rtpbin->ignore_pt) {
1855     if (!(demux = gst_element_factory_make ("rtpptdemux", NULL)))
1856       goto no_demux;
1857   }
1858
1859   stream = g_new0 (GstRtpBinStream, 1);
1860   stream->ssrc = ssrc;
1861   stream->bin = rtpbin;
1862   stream->session = session;
1863   stream->buffer = gst_object_ref (buffer);
1864   stream->demux = demux;
1865
1866   stream->have_sync = FALSE;
1867   stream->rt_delta = 0;
1868   stream->avg_ts_offset = 0;
1869   stream->is_initialized = FALSE;
1870   stream->rtp_delta = 0;
1871   stream->percent = 100;
1872   stream->clock_base = -100 * GST_SECOND;
1873   session->streams = g_slist_prepend (session->streams, stream);
1874
1875   jb_class = G_OBJECT_GET_CLASS (G_OBJECT (buffer));
1876
1877   if (g_signal_lookup ("request-pt-map", G_OBJECT_TYPE (buffer)) != 0) {
1878     /* provide clock_rate to the jitterbuffer when needed */
1879     stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
1880         (GCallback) pt_map_requested, session);
1881   }
1882   if (g_signal_lookup ("on-npt-stop", G_OBJECT_TYPE (buffer)) != 0) {
1883     stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
1884         (GCallback) on_npt_stop, stream);
1885   }
1886
1887   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.session", session);
1888   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.stream", stream);
1889
1890   /* configure latency and packet lost */
1891   g_object_set (buffer, "latency", rtpbin->latency_ms, NULL);
1892
1893   if (g_object_class_find_property (jb_class, "drop-on-latency"))
1894     g_object_set (buffer, "drop-on-latency", rtpbin->drop_on_latency, NULL);
1895   if (g_object_class_find_property (jb_class, "do-lost"))
1896     g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
1897   if (g_object_class_find_property (jb_class, "mode"))
1898     g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL);
1899   if (g_object_class_find_property (jb_class, "do-retransmission"))
1900     g_object_set (buffer, "do-retransmission", rtpbin->do_retransmission, NULL);
1901   if (g_object_class_find_property (jb_class, "max-rtcp-rtp-time-diff"))
1902     g_object_set (buffer, "max-rtcp-rtp-time-diff",
1903         rtpbin->max_rtcp_rtp_time_diff, NULL);
1904   if (g_object_class_find_property (jb_class, "max-dropout-time"))
1905     g_object_set (buffer, "max-dropout-time", rtpbin->max_dropout_time, NULL);
1906   if (g_object_class_find_property (jb_class, "max-misorder-time"))
1907     g_object_set (buffer, "max-misorder-time", rtpbin->max_misorder_time, NULL);
1908   if (g_object_class_find_property (jb_class, "rfc7273-sync"))
1909     g_object_set (buffer, "rfc7273-sync", rtpbin->rfc7273_sync, NULL);
1910   if (g_object_class_find_property (jb_class, "add-reference-timestamp-meta"))
1911     g_object_set (buffer, "add-reference-timestamp-meta",
1912         rtpbin->add_reference_timestamp_meta, NULL);
1913   if (g_object_class_find_property (jb_class, "max-ts-offset-adjustment"))
1914     g_object_set (buffer, "max-ts-offset-adjustment",
1915         rtpbin->max_ts_offset_adjustment, NULL);
1916
1917   g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER], 0,
1918       buffer, session->id, ssrc);
1919
1920   if (!rtpbin->ignore_pt)
1921     gst_bin_add (GST_BIN_CAST (rtpbin), demux);
1922
1923   /* link stuff */
1924   if (demux)
1925     gst_element_link_pads_full (buffer, "src", demux, "sink",
1926         GST_PAD_LINK_CHECK_NOTHING);
1927
1928   if (rtpbin->buffering) {
1929     guint64 last_out;
1930
1931     if (g_signal_lookup ("set-active", G_OBJECT_TYPE (buffer)) != 0) {
1932       GST_INFO_OBJECT (rtpbin,
1933           "bin is buffering, set jitterbuffer as not active");
1934       g_signal_emit_by_name (buffer, "set-active", FALSE, (gint64) 0,
1935           &last_out);
1936     }
1937   }
1938
1939
1940   GST_OBJECT_LOCK (rtpbin);
1941   target = GST_STATE_TARGET (rtpbin);
1942   GST_OBJECT_UNLOCK (rtpbin);
1943
1944   /* from sink to source */
1945   if (demux)
1946     gst_element_set_state (demux, target);
1947
1948   gst_element_set_state (buffer, target);
1949
1950   return stream;
1951
1952   /* ERRORS */
1953 max_streams:
1954   {
1955     GST_WARNING_OBJECT (rtpbin, "stream exceeds maximum (%d)",
1956         rtpbin->max_streams);
1957     return NULL;
1958   }
1959 no_jitterbuffer:
1960   {
1961     g_warning ("rtpbin: could not create rtpjitterbuffer element");
1962     return NULL;
1963   }
1964 no_demux:
1965   {
1966     gst_object_unref (buffer);
1967     g_warning ("rtpbin: could not create rtpptdemux element");
1968     return NULL;
1969   }
1970 }
1971
1972 /* called with RTP_BIN_LOCK */
1973 static void
1974 free_stream (GstRtpBinStream * stream, GstRtpBin * bin)
1975 {
1976   GstRtpBinSession *sess = stream->session;
1977   GSList *clients, *next_client;
1978
1979   GST_DEBUG_OBJECT (bin, "freeing stream %p", stream);
1980
1981   gst_element_set_locked_state (stream->buffer, TRUE);
1982   if (stream->demux)
1983     gst_element_set_locked_state (stream->demux, TRUE);
1984
1985   gst_element_set_state (stream->buffer, GST_STATE_NULL);
1986   if (stream->demux)
1987     gst_element_set_state (stream->demux, GST_STATE_NULL);
1988
1989   if (stream->demux) {
1990     g_signal_handler_disconnect (stream->demux, stream->demux_newpad_sig);
1991     g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig);
1992     g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig);
1993     g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
1994   }
1995
1996   if (stream->buffer_handlesync_sig)
1997     g_signal_handler_disconnect (stream->buffer, stream->buffer_handlesync_sig);
1998   if (stream->buffer_ptreq_sig)
1999     g_signal_handler_disconnect (stream->buffer, stream->buffer_ptreq_sig);
2000   if (stream->buffer_ntpstop_sig)
2001     g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig);
2002
2003   sess->elements = g_slist_remove (sess->elements, stream->buffer);
2004   remove_bin_element (stream->buffer, bin);
2005   gst_object_unref (stream->buffer);
2006
2007   if (stream->demux)
2008     gst_bin_remove (GST_BIN_CAST (bin), stream->demux);
2009
2010   for (clients = bin->clients; clients; clients = next_client) {
2011     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
2012     GSList *streams, *next_stream;
2013
2014     next_client = g_slist_next (clients);
2015
2016     for (streams = client->streams; streams; streams = next_stream) {
2017       GstRtpBinStream *ostream = (GstRtpBinStream *) streams->data;
2018
2019       next_stream = g_slist_next (streams);
2020
2021       if (ostream == stream) {
2022         client->streams = g_slist_delete_link (client->streams, streams);
2023         /* If this was the last stream belonging to this client,
2024          * clean up the client. */
2025         if (--client->nstreams == 0) {
2026           bin->clients = g_slist_delete_link (bin->clients, clients);
2027           free_client (client, bin);
2028           break;
2029         }
2030       }
2031     }
2032   }
2033   g_free (stream);
2034 }
2035
2036 /* GObject vmethods */
2037 static void gst_rtp_bin_dispose (GObject * object);
2038 static void gst_rtp_bin_finalize (GObject * object);
2039 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
2040     const GValue * value, GParamSpec * pspec);
2041 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
2042     GValue * value, GParamSpec * pspec);
2043
2044 /* GstElement vmethods */
2045 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
2046     GstStateChange transition);
2047 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
2048     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
2049 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
2050 static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message);
2051
2052 #define gst_rtp_bin_parent_class parent_class
2053 G_DEFINE_TYPE_WITH_PRIVATE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN);
2054 GST_ELEMENT_REGISTER_DEFINE (rtpbin, "rtpbin", GST_RANK_NONE, GST_TYPE_RTP_BIN);
2055
2056 static gboolean
2057 _gst_element_accumulator (GSignalInvocationHint * ihint,
2058     GValue * return_accu, const GValue * handler_return, gpointer dummy)
2059 {
2060   GstElement *element;
2061
2062   element = g_value_get_object (handler_return);
2063   GST_DEBUG ("got element %" GST_PTR_FORMAT, element);
2064
2065   g_value_set_object (return_accu, element);
2066
2067   /* stop emission if we have an element */
2068   return (element == NULL);
2069 }
2070
2071 static gboolean
2072 _gst_caps_accumulator (GSignalInvocationHint * ihint,
2073     GValue * return_accu, const GValue * handler_return, gpointer dummy)
2074 {
2075   GstCaps *caps;
2076
2077   caps = g_value_get_boxed (handler_return);
2078   GST_DEBUG ("got caps %" GST_PTR_FORMAT, caps);
2079
2080   g_value_set_boxed (return_accu, caps);
2081
2082   /* stop emission if we have a caps */
2083   return (caps == NULL);
2084 }
2085
2086 static void
2087 gst_rtp_bin_class_init (GstRtpBinClass * klass)
2088 {
2089   GObjectClass *gobject_class;
2090   GstElementClass *gstelement_class;
2091   GstBinClass *gstbin_class;
2092
2093   gobject_class = (GObjectClass *) klass;
2094   gstelement_class = (GstElementClass *) klass;
2095   gstbin_class = (GstBinClass *) klass;
2096
2097   gobject_class->dispose = gst_rtp_bin_dispose;
2098   gobject_class->finalize = gst_rtp_bin_finalize;
2099   gobject_class->set_property = gst_rtp_bin_set_property;
2100   gobject_class->get_property = gst_rtp_bin_get_property;
2101
2102   g_object_class_install_property (gobject_class, PROP_LATENCY,
2103       g_param_spec_uint ("latency", "Buffer latency in ms",
2104           "Default amount of ms to buffer in the jitterbuffers", 0,
2105           G_MAXUINT, DEFAULT_LATENCY_MS,
2106           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2107
2108   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
2109       g_param_spec_boolean ("drop-on-latency",
2110           "Drop buffers when maximum latency is reached",
2111           "Tells the jitterbuffer to never exceed the given latency in size",
2112           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2113
2114   /**
2115    * GstRtpBin::request-pt-map:
2116    * @rtpbin: the object which received the signal
2117    * @session: the session
2118    * @pt: the pt
2119    *
2120    * Request the payload type as #GstCaps for @pt in @session.
2121    */
2122   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
2123       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
2124       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
2125       _gst_caps_accumulator, NULL, NULL, GST_TYPE_CAPS, 2, G_TYPE_UINT,
2126       G_TYPE_UINT);
2127
2128     /**
2129    * GstRtpBin::payload-type-change:
2130    * @rtpbin: the object which received the signal
2131    * @session: the session
2132    * @pt: the pt
2133    *
2134    * Signal that the current payload type changed to @pt in @session.
2135    */
2136   gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] =
2137       g_signal_new ("payload-type-change", G_TYPE_FROM_CLASS (klass),
2138       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, payload_type_change),
2139       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2140
2141   /**
2142    * GstRtpBin::clear-pt-map:
2143    * @rtpbin: the object which received the signal
2144    *
2145    * Clear all previously cached pt-mapping obtained with
2146    * #GstRtpBin::request-pt-map.
2147    */
2148   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
2149       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
2150       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2151           clear_pt_map), NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
2152
2153   /**
2154    * GstRtpBin::reset-sync:
2155    * @rtpbin: the object which received the signal
2156    *
2157    * Reset all currently configured lip-sync parameters and require new SR
2158    * packets for all streams before lip-sync is attempted again.
2159    */
2160   gst_rtp_bin_signals[SIGNAL_RESET_SYNC] =
2161       g_signal_new ("reset-sync", G_TYPE_FROM_CLASS (klass),
2162       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2163           reset_sync), NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
2164
2165   /**
2166    * GstRtpBin::get-session:
2167    * @rtpbin: the object which received the signal
2168    * @id: the session id
2169    *
2170    * Request the related GstRtpSession as #GstElement related with session @id.
2171    *
2172    * Since: 1.8
2173    */
2174   gst_rtp_bin_signals[SIGNAL_GET_SESSION] =
2175       g_signal_new ("get-session", G_TYPE_FROM_CLASS (klass),
2176       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2177           get_session), NULL, NULL, NULL, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2178
2179   /**
2180    * GstRtpBin::get-internal-session:
2181    * @rtpbin: the object which received the signal
2182    * @id: the session id
2183    *
2184    * Request the internal RTPSession object as #GObject in session @id.
2185    */
2186   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_SESSION] =
2187       g_signal_new ("get-internal-session", G_TYPE_FROM_CLASS (klass),
2188       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2189           get_internal_session), NULL, NULL, NULL, RTP_TYPE_SESSION, 1,
2190       G_TYPE_UINT);
2191
2192   /**
2193    * GstRtpBin::get-internal-storage:
2194    * @rtpbin: the object which received the signal
2195    * @id: the session id
2196    *
2197    * Request the internal RTPStorage object as #GObject in session @id. This
2198    * is the internal storage used by the RTPStorage element, which is used to
2199    * keep a backlog of received RTP packets for the session @id.
2200    *
2201    * Since: 1.14
2202    */
2203   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_STORAGE] =
2204       g_signal_new ("get-internal-storage", G_TYPE_FROM_CLASS (klass),
2205       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2206           get_internal_storage), NULL, NULL, NULL, G_TYPE_OBJECT, 1,
2207       G_TYPE_UINT);
2208
2209   /**
2210    * GstRtpBin::get-storage:
2211    * @rtpbin: the object which received the signal
2212    * @id: the session id
2213    *
2214    * Request the RTPStorage element as #GObject in session @id. This element
2215    * is used to keep a backlog of received RTP packets for the session @id.
2216    *
2217    * Since: 1.16
2218    */
2219   gst_rtp_bin_signals[SIGNAL_GET_STORAGE] =
2220       g_signal_new ("get-storage", G_TYPE_FROM_CLASS (klass),
2221       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2222           get_storage), NULL, NULL, NULL, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2223
2224   /**
2225    * GstRtpBin::clear-ssrc:
2226    * @rtpbin: the object which received the signal
2227    * @id: the session id
2228    * @ssrc: the ssrc
2229    *
2230    * Remove all pads from rtpssrcdemux element associated with the specified
2231    * ssrc. This delegate the action signal to the rtpssrcdemux element
2232    * associated with the specified session.
2233    *
2234    * Since: 1.20
2235    */
2236   gst_rtp_bin_signals[SIGNAL_CLEAR_SSRC] =
2237       g_signal_new ("clear-ssrc", G_TYPE_FROM_CLASS (klass),
2238       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2239           clear_ssrc), NULL, NULL, NULL, G_TYPE_NONE, 2,
2240       G_TYPE_UINT, G_TYPE_UINT);
2241
2242   /**
2243    * GstRtpBin::on-new-ssrc:
2244    * @rtpbin: the object which received the signal
2245    * @session: the session
2246    * @ssrc: the SSRC
2247    *
2248    * Notify of a new SSRC that entered @session.
2249    */
2250   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
2251       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
2252       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
2253       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2254   /**
2255    * GstRtpBin::on-ssrc-collision:
2256    * @rtpbin: the object which received the signal
2257    * @session: the session
2258    * @ssrc: the SSRC
2259    *
2260    * Notify when we have an SSRC collision
2261    */
2262   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
2263       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
2264       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
2265       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2266   /**
2267    * GstRtpBin::on-ssrc-validated:
2268    * @rtpbin: the object which received the signal
2269    * @session: the session
2270    * @ssrc: the SSRC
2271    *
2272    * Notify of a new SSRC that became validated.
2273    */
2274   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
2275       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
2276       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
2277       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2278   /**
2279    * GstRtpBin::on-ssrc-active:
2280    * @rtpbin: the object which received the signal
2281    * @session: the session
2282    * @ssrc: the SSRC
2283    *
2284    * Notify of a SSRC that is active, i.e., sending RTCP.
2285    */
2286   gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE] =
2287       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
2288       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_active),
2289       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2290   /**
2291    * GstRtpBin::on-ssrc-sdes:
2292    * @rtpbin: the object which received the signal
2293    * @session: the session
2294    * @ssrc: the SSRC
2295    *
2296    * Notify of a SSRC that is active, i.e., sending RTCP.
2297    */
2298   gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES] =
2299       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
2300       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_sdes),
2301       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2302
2303   /**
2304    * GstRtpBin::on-bye-ssrc:
2305    * @rtpbin: the object which received the signal
2306    * @session: the session
2307    * @ssrc: the SSRC
2308    *
2309    * Notify of an SSRC that became inactive because of a BYE packet.
2310    */
2311   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
2312       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
2313       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
2314       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2315   /**
2316    * GstRtpBin::on-bye-timeout:
2317    * @rtpbin: the object which received the signal
2318    * @session: the session
2319    * @ssrc: the SSRC
2320    *
2321    * Notify of an SSRC that has timed out because of BYE
2322    */
2323   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
2324       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
2325       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
2326       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2327   /**
2328    * GstRtpBin::on-timeout:
2329    * @rtpbin: the object which received the signal
2330    * @session: the session
2331    * @ssrc: the SSRC
2332    *
2333    * Notify of an SSRC that has timed out
2334    */
2335   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
2336       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
2337       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
2338       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2339   /**
2340    * GstRtpBin::on-sender-timeout:
2341    * @rtpbin: the object which received the signal
2342    * @session: the session
2343    * @ssrc: the SSRC
2344    *
2345    * Notify of a sender SSRC that has timed out and became a receiver
2346    */
2347   gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] =
2348       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
2349       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout),
2350       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2351
2352   /**
2353    * GstRtpBin::on-npt-stop:
2354    * @rtpbin: the object which received the signal
2355    * @session: the session
2356    * @ssrc: the SSRC
2357    *
2358    * Notify that SSRC sender has sent data up to the configured NPT stop time.
2359    */
2360   gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP] =
2361       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
2362       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_npt_stop),
2363       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2364
2365   /**
2366    * GstRtpBin::request-rtp-encoder:
2367    * @rtpbin: the object which received the signal
2368    * @session: the session
2369    *
2370    * Request an RTP encoder element for the given @session. The encoder
2371    * element will be added to the bin if not previously added.
2372    *
2373    * If no handler is connected, no encoder will be used.
2374    *
2375    * Since: 1.4
2376    */
2377   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_ENCODER] =
2378       g_signal_new ("request-rtp-encoder", G_TYPE_FROM_CLASS (klass),
2379       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2380           request_rtp_encoder), _gst_element_accumulator, NULL, NULL,
2381       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2382
2383   /**
2384    * GstRtpBin::request-rtp-decoder:
2385    * @rtpbin: the object which received the signal
2386    * @session: the session
2387    *
2388    * Request an RTP decoder element for the given @session. The decoder
2389    * element will be added to the bin if not previously added.
2390    *
2391    * If no handler is connected, no encoder will be used.
2392    *
2393    * Since: 1.4
2394    */
2395   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_DECODER] =
2396       g_signal_new ("request-rtp-decoder", G_TYPE_FROM_CLASS (klass),
2397       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2398           request_rtp_decoder), _gst_element_accumulator, NULL,
2399       NULL, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2400
2401   /**
2402    * GstRtpBin::request-rtcp-encoder:
2403    * @rtpbin: the object which received the signal
2404    * @session: the session
2405    *
2406    * Request an RTCP encoder element for the given @session. The encoder
2407    * element will be added to the bin if not previously added.
2408    *
2409    * If no handler is connected, no encoder will be used.
2410    *
2411    * Since: 1.4
2412    */
2413   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_ENCODER] =
2414       g_signal_new ("request-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
2415       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2416           request_rtcp_encoder), _gst_element_accumulator, NULL, NULL,
2417       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2418
2419   /**
2420    * GstRtpBin::request-rtcp-decoder:
2421    * @rtpbin: the object which received the signal
2422    * @session: the session
2423    *
2424    * Request an RTCP decoder element for the given @session. The decoder
2425    * element will be added to the bin if not previously added.
2426    *
2427    * If no handler is connected, no encoder will be used.
2428    *
2429    * Since: 1.4
2430    */
2431   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_DECODER] =
2432       g_signal_new ("request-rtcp-decoder", G_TYPE_FROM_CLASS (klass),
2433       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2434           request_rtcp_decoder), _gst_element_accumulator, NULL, NULL,
2435       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2436
2437   /**
2438    * GstRtpBin::request-jitterbuffer:
2439    * @rtpbin: the object which received the signal
2440    * @session: the session
2441    *
2442    * Request a jitterbuffer element for the given @session.
2443    *
2444    * If no handler is connected, the default jitterbuffer will be used.
2445    *
2446    * Note: The provided element is expected to conform to the API exposed
2447    * by the standard #GstRtpJitterBuffer. Runtime checks will be made to
2448    * determine whether it exposes properties and signals before attempting
2449    * to set, call or connect to them, and some functionalities of #GstRtpBin
2450    * may not be available when that is not the case.
2451    *
2452    * This should be considered experimental API, as the standard jitterbuffer
2453    * API is susceptible to change, provided elements will have to update their
2454    * custom jitterbuffer's API to match the API of #GstRtpJitterBuffer if and
2455    * when it changes.
2456    *
2457    * Since: 1.18
2458    */
2459   gst_rtp_bin_signals[SIGNAL_REQUEST_JITTERBUFFER] =
2460       g_signal_new ("request-jitterbuffer", G_TYPE_FROM_CLASS (klass),
2461       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2462           request_jitterbuffer), _gst_element_accumulator, NULL,
2463       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2464
2465   /**
2466    * GstRtpBin::new-jitterbuffer:
2467    * @rtpbin: the object which received the signal
2468    * @jitterbuffer: the new jitterbuffer
2469    * @session: the session
2470    * @ssrc: the SSRC
2471    *
2472    * Notify that a new @jitterbuffer was created for @session and @ssrc.
2473    * This signal can, for example, be used to configure @jitterbuffer.
2474    *
2475    * Since: 1.4
2476    */
2477   gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER] =
2478       g_signal_new ("new-jitterbuffer", G_TYPE_FROM_CLASS (klass),
2479       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2480           new_jitterbuffer), NULL, NULL, NULL,
2481       G_TYPE_NONE, 3, GST_TYPE_ELEMENT, G_TYPE_UINT, G_TYPE_UINT);
2482
2483   /**
2484    * GstRtpBin::new-storage:
2485    * @rtpbin: the object which received the signal
2486    * @storage: the new storage
2487    * @session: the session
2488    *
2489    * Notify that a new @storage was created for @session.
2490    * This signal can, for example, be used to configure @storage.
2491    *
2492    * Since: 1.14
2493    */
2494   gst_rtp_bin_signals[SIGNAL_NEW_STORAGE] =
2495       g_signal_new ("new-storage", G_TYPE_FROM_CLASS (klass),
2496       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2497           new_storage), NULL, NULL, NULL,
2498       G_TYPE_NONE, 2, GST_TYPE_ELEMENT, G_TYPE_UINT);
2499
2500   /**
2501    * GstRtpBin::request-aux-sender:
2502    * @rtpbin: the object which received the signal
2503    * @session: the session
2504    *
2505    * Request an AUX sender element for the given @session. The AUX
2506    * element will be added to the bin.
2507    *
2508    * If no handler is connected, no AUX element will be used.
2509    *
2510    * Since: 1.4
2511    */
2512   gst_rtp_bin_signals[SIGNAL_REQUEST_AUX_SENDER] =
2513       g_signal_new ("request-aux-sender", G_TYPE_FROM_CLASS (klass),
2514       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2515           request_aux_sender), _gst_element_accumulator, NULL, NULL,
2516       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2517
2518   /**
2519    * GstRtpBin::request-aux-receiver:
2520    * @rtpbin: the object which received the signal
2521    * @session: the session
2522    *
2523    * Request an AUX receiver element for the given @session. The AUX
2524    * element will be added to the bin.
2525    *
2526    * If no handler is connected, no AUX element will be used.
2527    *
2528    * Since: 1.4
2529    */
2530   gst_rtp_bin_signals[SIGNAL_REQUEST_AUX_RECEIVER] =
2531       g_signal_new ("request-aux-receiver", G_TYPE_FROM_CLASS (klass),
2532       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2533           request_aux_receiver), _gst_element_accumulator, NULL, NULL,
2534       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2535
2536   /**
2537    * GstRtpBin::request-fec-decoder:
2538    * @rtpbin: the object which received the signal
2539    * @session: the session index
2540    *
2541    * Request a FEC decoder element for the given @session. The element
2542    * will be added to the bin after the pt demuxer.  If there are multiple
2543    * ssrc's and pt's in @session, this signal may be called multiple times for
2544    * the same @session each corresponding to a newly discovered ssrc.
2545    *
2546    * If no handler is connected, no FEC decoder will be used.
2547    *
2548    * Warning: usage of this signal is not appropriate for the BUNDLE case,
2549    * connect to #GstRtpBin::request-fec-decoder-full instead.
2550    *
2551    * Since: 1.14
2552    */
2553   gst_rtp_bin_signals[SIGNAL_REQUEST_FEC_DECODER] =
2554       g_signal_new ("request-fec-decoder", G_TYPE_FROM_CLASS (klass),
2555       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2556           request_fec_decoder), _gst_element_accumulator, NULL, NULL,
2557       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2558
2559   /**
2560    * GstRtpBin::request-fec-decoder-full:
2561    * @rtpbin: the object which received the signal
2562    * @session: the session index
2563    * @ssrc: the ssrc of the stream
2564    * @pt: the payload type
2565    *
2566    * Request a FEC decoder element for the given @session. The element
2567    * will be added to the bin after the pt demuxer.  If there are multiple
2568    * ssrc's and pt's in @session, this signal may be called multiple times for
2569    * the same @session each corresponding to a newly discovered ssrc and payload
2570    * type, those are provided as parameters.
2571    *
2572    * If no handler is connected, no FEC decoder will be used.
2573    *
2574    * Since: 1.20
2575    */
2576   gst_rtp_bin_signals[SIGNAL_REQUEST_FEC_DECODER_FULL] =
2577       g_signal_new ("request-fec-decoder-full", G_TYPE_FROM_CLASS (klass),
2578       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2579           request_fec_decoder), _gst_element_accumulator, NULL, NULL,
2580       GST_TYPE_ELEMENT, 3, G_TYPE_UINT, G_TYPE_UINT, G_TYPE_UINT);
2581
2582   /**
2583    * GstRtpBin::request-fec-encoder:
2584    * @rtpbin: the object which received the signal
2585    * @session: the session index
2586    *
2587    * Request a FEC encoder element for the given @session. The element
2588    * will be added to the bin after the RTPSession.
2589    *
2590    * If no handler is connected, no FEC encoder will be used.
2591    *
2592    * Since: 1.14
2593    */
2594   gst_rtp_bin_signals[SIGNAL_REQUEST_FEC_ENCODER] =
2595       g_signal_new ("request-fec-encoder", G_TYPE_FROM_CLASS (klass),
2596       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2597           request_fec_encoder), _gst_element_accumulator, NULL, NULL,
2598       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2599
2600   /**
2601    * GstRtpBin::on-new-sender-ssrc:
2602    * @rtpbin: the object which received the signal
2603    * @session: the session
2604    * @ssrc: the sender SSRC
2605    *
2606    * Notify of a new sender SSRC that entered @session.
2607    *
2608    * Since: 1.8
2609    */
2610   gst_rtp_bin_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
2611       g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
2612       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_sender_ssrc),
2613       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2614   /**
2615    * GstRtpBin::on-sender-ssrc-active:
2616    * @rtpbin: the object which received the signal
2617    * @session: the session
2618    * @ssrc: the sender SSRC
2619    *
2620    * Notify of a sender SSRC that is active, i.e., sending RTCP.
2621    *
2622    * Since: 1.8
2623    */
2624   gst_rtp_bin_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
2625       g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
2626       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2627           on_sender_ssrc_active), NULL, NULL, NULL,
2628       G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2629
2630   g_object_class_install_property (gobject_class, PROP_SDES,
2631       g_param_spec_boxed ("sdes", "SDES",
2632           "The SDES items of this session",
2633           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
2634           | GST_PARAM_DOC_SHOW_DEFAULT));
2635
2636   g_object_class_install_property (gobject_class, PROP_DO_LOST,
2637       g_param_spec_boolean ("do-lost", "Do Lost",
2638           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
2639           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2640
2641   g_object_class_install_property (gobject_class, PROP_AUTOREMOVE,
2642       g_param_spec_boolean ("autoremove", "Auto Remove",
2643           "Automatically remove timed out sources", DEFAULT_AUTOREMOVE,
2644           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2645
2646   g_object_class_install_property (gobject_class, PROP_IGNORE_PT,
2647       g_param_spec_boolean ("ignore-pt", "Ignore PT",
2648           "Do not demultiplex based on PT values", DEFAULT_IGNORE_PT,
2649           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2650
2651   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
2652       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
2653           "Use the pipeline running-time to set the NTP time in the RTCP SR messages "
2654           "(DEPRECATED: Use ntp-time-source property)",
2655           DEFAULT_USE_PIPELINE_CLOCK,
2656           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
2657   /**
2658    * GstRtpBin:buffer-mode:
2659    *
2660    * Control the buffering and timestamping mode used by the jitterbuffer.
2661    */
2662   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
2663       g_param_spec_enum ("buffer-mode", "Buffer Mode",
2664           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
2665           DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2666   /**
2667    * GstRtpBin:ntp-sync:
2668    *
2669    * Set the NTP time from the sender reports as the running-time on the
2670    * buffers. When both the sender and receiver have sychronized
2671    * running-time, i.e. when the clock and base-time is shared
2672    * between the receivers and the and the senders, this option can be
2673    * used to synchronize receivers on multiple machines.
2674    */
2675   g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
2676       g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
2677           "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
2678           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2679
2680   /**
2681    * GstRtpBin:rtcp-sync:
2682    *
2683    * If not synchronizing (directly) to the NTP clock, determines how to sync
2684    * the various streams.
2685    */
2686   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC,
2687       g_param_spec_enum ("rtcp-sync", "RTCP Sync",
2688           "Use of RTCP SR in synchronization", GST_RTP_BIN_RTCP_SYNC_TYPE,
2689           DEFAULT_RTCP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2690
2691   /**
2692    * GstRtpBin:rtcp-sync-interval:
2693    *
2694    * Determines how often to sync streams using RTCP data.
2695    */
2696   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_INTERVAL,
2697       g_param_spec_uint ("rtcp-sync-interval", "RTCP Sync Interval",
2698           "RTCP SR interval synchronization (ms) (0 = always)",
2699           0, G_MAXUINT, DEFAULT_RTCP_SYNC_INTERVAL,
2700           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2701
2702   g_object_class_install_property (gobject_class, PROP_DO_SYNC_EVENT,
2703       g_param_spec_boolean ("do-sync-event", "Do Sync Event",
2704           "Send event downstream when a stream is synchronized to the sender",
2705           DEFAULT_DO_SYNC_EVENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2706
2707   /**
2708    * GstRtpBin:do-retransmission:
2709    *
2710    * Enables RTP retransmission on all streams. To control retransmission on
2711    * a per-SSRC basis, connect to the #GstRtpBin::new-jitterbuffer signal and
2712    * set the #GstRtpJitterBuffer:do-retransmission property on the
2713    * #GstRtpJitterBuffer object instead.
2714    */
2715   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
2716       g_param_spec_boolean ("do-retransmission", "Do retransmission",
2717           "Enable retransmission on all streams",
2718           DEFAULT_DO_RETRANSMISSION,
2719           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2720
2721   /**
2722    * GstRtpBin:rtp-profile:
2723    *
2724    * Sets the default RTP profile of newly created RTP sessions. The
2725    * profile can be changed afterwards on a per-session basis.
2726    */
2727   g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
2728       g_param_spec_enum ("rtp-profile", "RTP Profile",
2729           "Default RTP profile of newly created sessions",
2730           GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE,
2731           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2732
2733   g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
2734       g_param_spec_enum ("ntp-time-source", "NTP Time Source",
2735           "NTP time source for RTCP packets",
2736           gst_rtp_ntp_time_source_get_type (), DEFAULT_NTP_TIME_SOURCE,
2737           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2738
2739   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_SEND_TIME,
2740       g_param_spec_boolean ("rtcp-sync-send-time", "RTCP Sync Send Time",
2741           "Use send time or capture time for RTCP sync "
2742           "(TRUE = send time, FALSE = capture time)",
2743           DEFAULT_RTCP_SYNC_SEND_TIME,
2744           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2745
2746   g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF,
2747       g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff",
2748           "Maximum amount of time in ms that the RTP time in RTCP SRs "
2749           "is allowed to be ahead (-1 disabled)", -1, G_MAXINT,
2750           DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
2751           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2752
2753   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
2754       g_param_spec_uint ("max-dropout-time", "Max dropout time",
2755           "The maximum time (milliseconds) of missing packets tolerated.",
2756           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
2757           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2758
2759   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
2760       g_param_spec_uint ("max-misorder-time", "Max misorder time",
2761           "The maximum time (milliseconds) of misordered packets tolerated.",
2762           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
2763           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2764
2765   g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
2766       g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
2767           "Synchronize received streams to the RFC7273 clock "
2768           "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
2769           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2770
2771   /**
2772    * GstRtpBin:add-reference-timestamp-meta:
2773    *
2774    * When syncing to a RFC7273 clock, add #GstReferenceTimestampMeta
2775    * to buffers with the original reconstructed reference clock timestamp.
2776    *
2777    * Since: 1.22
2778    */
2779   g_object_class_install_property (gobject_class,
2780       PROP_ADD_REFERENCE_TIMESTAMP_META,
2781       g_param_spec_boolean ("add-reference-timestamp-meta",
2782           "Add Reference Timestamp Meta",
2783           "Add Reference Timestamp Meta to buffers with the original clock timestamp "
2784           "before any adjustments when syncing to an RFC7273 clock.",
2785           DEFAULT_ADD_REFERENCE_TIMESTAMP_META,
2786           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2787
2788   g_object_class_install_property (gobject_class, PROP_MAX_STREAMS,
2789       g_param_spec_uint ("max-streams", "Max Streams",
2790           "The maximum number of streams to create for one session",
2791           0, G_MAXUINT, DEFAULT_MAX_STREAMS,
2792           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2793
2794   /**
2795    * GstRtpBin:max-ts-offset-adjustment:
2796    *
2797    * Syncing time stamps to NTP time adds a time offset. This parameter
2798    * specifies the maximum number of nanoseconds per frame that this time offset
2799    * may be adjusted with. This is used to avoid sudden large changes to time
2800    * stamps.
2801    *
2802    * Since: 1.14
2803    */
2804   g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET_ADJUSTMENT,
2805       g_param_spec_uint64 ("max-ts-offset-adjustment",
2806           "Max Timestamp Offset Adjustment",
2807           "The maximum number of nanoseconds per frame that time stamp offsets "
2808           "may be adjusted (0 = no limit).", 0, G_MAXUINT64,
2809           DEFAULT_MAX_TS_OFFSET_ADJUSTMENT, G_PARAM_READWRITE |
2810           G_PARAM_STATIC_STRINGS));
2811
2812   /**
2813    * GstRtpBin:max-ts-offset:
2814    *
2815    * Used to set an upper limit of how large a time offset may be. This
2816    * is used to protect against unrealistic values as a result of either
2817    * client,server or clock issues.
2818    *
2819    * Since: 1.14
2820    */
2821   g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET,
2822       g_param_spec_int64 ("max-ts-offset", "Max TS Offset",
2823           "The maximum absolute value of the time offset in (nanoseconds). "
2824           "Note, if the ntp-sync parameter is set the default value is "
2825           "changed to 0 (no limit)", 0, G_MAXINT64, DEFAULT_MAX_TS_OFFSET,
2826           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2827
2828   /**
2829    * GstRtpBin:min-ts-offset:
2830    *
2831    * Used to set an lower limit for when a time offset is deemed large enough
2832    * to be useful for sync corrections.
2833    *
2834    * When streaming for instance audio, even very small ts_offsets cause
2835    * audible glitches. This property is used for controlling how sensitive the
2836    * adjustments should be to small deviations in ts_offset, occurring for
2837    * instance due to jittery network conditions or system load.
2838    *
2839    * Since: 1.22
2840    */
2841   g_object_class_install_property (gobject_class, PROP_MIN_TS_OFFSET,
2842       g_param_spec_uint64 ("min-ts-offset", "Min TS Offset",
2843           "The minimum absolute value of the time offset in (nanoseconds). "
2844           "Used to set an lower limit for when a time offset is deemed large "
2845           "enough to be useful for sync corrections."
2846           "Note, if the ntp-sync parameter is set the default value is "
2847           "changed to 0 (no limit)", 0, G_MAXUINT64, DEFAULT_MIN_TS_OFFSET,
2848           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2849
2850   /**
2851    * GstRtpBin:ts-offset-smoothing-factor:
2852    *
2853    * Controls the weighting between previous and current timestamp offsets in
2854    * a running moving average (RMA):
2855    * ts_offset_average(n) =
2856    *   ((ts-offset-smoothing-factor - 1) * ts_offset_average(n - 1) + ts_offset(n)) /
2857    *   ts-offset-smoothing-factor
2858    *
2859    * This can stabilize the timestamp offset and prevent unnecessary skew
2860    * corrections due to jitter introduced by network or system load.
2861    *
2862    * Since: 1.22
2863    */
2864   g_object_class_install_property (gobject_class,
2865       PROP_TS_OFFSET_SMOOTHING_FACTOR,
2866       g_param_spec_uint ("ts-offset-smoothing-factor",
2867           "Timestamp Offset Smoothing Factor",
2868           "Sets a smoothing factor for the timestamp offset in number of "
2869           "values for a calculated running moving average. "
2870           "(0 = no smoothing factor)", 0, G_MAXUINT,
2871           DEFAULT_TS_OFFSET_SMOOTHING_FACTOR,
2872           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2873
2874   /**
2875    * GstRtpBin:fec-decoders:
2876    *
2877    * Used to provide a factory used to build the FEC decoder for a
2878    * given session, as a command line alternative to
2879    * #GstRtpBin::request-fec-decoder.
2880    *
2881    * Expects a GstStructure in the form session_id (gint) -> factory (string)
2882    *
2883    * Since: 1.20
2884    */
2885   g_object_class_install_property (gobject_class, PROP_FEC_DECODERS,
2886       g_param_spec_boxed ("fec-decoders", "Fec Decoders",
2887           "GstStructure mapping from session index to FEC decoder "
2888           "factory, eg "
2889           "fec-decoders='fec,0=\"rtpst2022-1-fecdec\\ size-time\\=1000000000\";'",
2890           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2891
2892   /**
2893    * GstRtpBin:fec-encoders:
2894    *
2895    * Used to provide a factory used to build the FEC encoder for a
2896    * given session, as a command line alternative to
2897    * #GstRtpBin::request-fec-encoder.
2898    *
2899    * Expects a GstStructure in the form session_id (gint) -> factory (string)
2900    *
2901    * Since: 1.20
2902    */
2903   g_object_class_install_property (gobject_class, PROP_FEC_ENCODERS,
2904       g_param_spec_boxed ("fec-encoders", "Fec Encoders",
2905           "GstStructure mapping from session index to FEC encoder "
2906           "factory, eg "
2907           "fec-encoders='fec,0=\"rtpst2022-1-fecenc\\ rows\\=5\\ columns\\=5\";'",
2908           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2909
2910   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
2911   gstelement_class->request_new_pad =
2912       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
2913   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
2914
2915   /* sink pads */
2916   gst_element_class_add_static_pad_template (gstelement_class,
2917       &rtpbin_recv_rtp_sink_template);
2918   gst_element_class_add_static_pad_template (gstelement_class,
2919       &rtpbin_recv_fec_sink_template);
2920   gst_element_class_add_static_pad_template (gstelement_class,
2921       &rtpbin_recv_rtcp_sink_template);
2922   gst_element_class_add_static_pad_template (gstelement_class,
2923       &rtpbin_send_rtp_sink_template);
2924
2925   /* src pads */
2926   gst_element_class_add_static_pad_template (gstelement_class,
2927       &rtpbin_recv_rtp_src_template);
2928   gst_element_class_add_static_pad_template (gstelement_class,
2929       &rtpbin_send_rtcp_src_template);
2930   gst_element_class_add_static_pad_template (gstelement_class,
2931       &rtpbin_send_rtp_src_template);
2932   gst_element_class_add_static_pad_template (gstelement_class,
2933       &rtpbin_send_fec_src_template);
2934
2935   gst_element_class_set_static_metadata (gstelement_class, "RTP Bin",
2936       "Filter/Network/RTP",
2937       "Real-Time Transport Protocol bin",
2938       "Wim Taymans <wim.taymans@gmail.com>");
2939
2940   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (gst_rtp_bin_handle_message);
2941
2942   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
2943   klass->reset_sync = GST_DEBUG_FUNCPTR (gst_rtp_bin_reset_sync);
2944   klass->get_session = GST_DEBUG_FUNCPTR (gst_rtp_bin_get_session);
2945   klass->get_internal_session =
2946       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_session);
2947   klass->get_storage = GST_DEBUG_FUNCPTR (gst_rtp_bin_get_storage);
2948   klass->get_internal_storage =
2949       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_storage);
2950   klass->clear_ssrc = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_ssrc);
2951   klass->request_rtp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2952   klass->request_rtp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2953   klass->request_rtcp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2954   klass->request_rtcp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2955   klass->request_jitterbuffer =
2956       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_jitterbuffer);
2957
2958   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
2959
2960   gst_type_mark_as_plugin_api (GST_RTP_BIN_RTCP_SYNC_TYPE, 0);
2961 }
2962
2963 static void
2964 gst_rtp_bin_init (GstRtpBin * rtpbin)
2965 {
2966   gchar *cname;
2967
2968   rtpbin->priv = gst_rtp_bin_get_instance_private (rtpbin);
2969   g_mutex_init (&rtpbin->priv->bin_lock);
2970   g_mutex_init (&rtpbin->priv->dyn_lock);
2971
2972   rtpbin->latency_ms = DEFAULT_LATENCY_MS;
2973   rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND;
2974   rtpbin->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
2975   rtpbin->do_lost = DEFAULT_DO_LOST;
2976   rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
2977   rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
2978   rtpbin->rtcp_sync = DEFAULT_RTCP_SYNC;
2979   rtpbin->rtcp_sync_interval = DEFAULT_RTCP_SYNC_INTERVAL;
2980   rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
2981   rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
2982   rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
2983   rtpbin->send_sync_event = DEFAULT_DO_SYNC_EVENT;
2984   rtpbin->do_retransmission = DEFAULT_DO_RETRANSMISSION;
2985   rtpbin->rtp_profile = DEFAULT_RTP_PROFILE;
2986   rtpbin->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
2987   rtpbin->rtcp_sync_send_time = DEFAULT_RTCP_SYNC_SEND_TIME;
2988   rtpbin->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
2989   rtpbin->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
2990   rtpbin->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
2991   rtpbin->rfc7273_sync = DEFAULT_RFC7273_SYNC;
2992   rtpbin->add_reference_timestamp_meta = DEFAULT_ADD_REFERENCE_TIMESTAMP_META;
2993   rtpbin->max_streams = DEFAULT_MAX_STREAMS;
2994   rtpbin->max_ts_offset_adjustment = DEFAULT_MAX_TS_OFFSET_ADJUSTMENT;
2995   rtpbin->max_ts_offset = DEFAULT_MAX_TS_OFFSET;
2996   rtpbin->max_ts_offset_is_set = FALSE;
2997   rtpbin->min_ts_offset = DEFAULT_MIN_TS_OFFSET;
2998   rtpbin->min_ts_offset_is_set = FALSE;
2999   rtpbin->ts_offset_smoothing_factor = DEFAULT_TS_OFFSET_SMOOTHING_FACTOR;
3000
3001   /* some default SDES entries */
3002   cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
3003   rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes",
3004       "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL);
3005   rtpbin->fec_decoders =
3006       gst_structure_new_empty ("application/x-rtp-fec-decoders");
3007   rtpbin->fec_encoders =
3008       gst_structure_new_empty ("application/x-rtp-fec-encoders");
3009   g_free (cname);
3010 }
3011
3012 static void
3013 gst_rtp_bin_dispose (GObject * object)
3014 {
3015   GstRtpBin *rtpbin;
3016
3017   rtpbin = GST_RTP_BIN (object);
3018
3019   GST_RTP_BIN_LOCK (rtpbin);
3020   GST_DEBUG_OBJECT (object, "freeing sessions");
3021   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, rtpbin);
3022   g_slist_free (rtpbin->sessions);
3023   rtpbin->sessions = NULL;
3024   GST_RTP_BIN_UNLOCK (rtpbin);
3025
3026   G_OBJECT_CLASS (parent_class)->dispose (object);
3027 }
3028
3029 static void
3030 gst_rtp_bin_finalize (GObject * object)
3031 {
3032   GstRtpBin *rtpbin;
3033
3034   rtpbin = GST_RTP_BIN (object);
3035
3036   if (rtpbin->sdes)
3037     gst_structure_free (rtpbin->sdes);
3038
3039   if (rtpbin->fec_decoders)
3040     gst_structure_free (rtpbin->fec_decoders);
3041
3042   if (rtpbin->fec_encoders)
3043     gst_structure_free (rtpbin->fec_encoders);
3044
3045   g_mutex_clear (&rtpbin->priv->bin_lock);
3046   g_mutex_clear (&rtpbin->priv->dyn_lock);
3047
3048   G_OBJECT_CLASS (parent_class)->finalize (object);
3049 }
3050
3051
3052 static void
3053 gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes)
3054 {
3055   GSList *item;
3056
3057   if (sdes == NULL)
3058     return;
3059
3060   GST_RTP_BIN_LOCK (bin);
3061
3062   GST_OBJECT_LOCK (bin);
3063   if (bin->sdes)
3064     gst_structure_free (bin->sdes);
3065   bin->sdes = gst_structure_copy (sdes);
3066   GST_OBJECT_UNLOCK (bin);
3067
3068   /* store in all sessions */
3069   for (item = bin->sessions; item; item = g_slist_next (item)) {
3070     GstRtpBinSession *session = item->data;
3071     g_object_set (session->session, "sdes", sdes, NULL);
3072   }
3073
3074   GST_RTP_BIN_UNLOCK (bin);
3075 }
3076
3077 static void
3078 gst_rtp_bin_set_fec_decoders_struct (GstRtpBin * bin,
3079     const GstStructure * decoders)
3080 {
3081   if (decoders == NULL)
3082     return;
3083
3084   GST_RTP_BIN_LOCK (bin);
3085
3086   GST_OBJECT_LOCK (bin);
3087   if (bin->fec_decoders)
3088     gst_structure_free (bin->fec_decoders);
3089   bin->fec_decoders = gst_structure_copy (decoders);
3090
3091   GST_OBJECT_UNLOCK (bin);
3092
3093   GST_RTP_BIN_UNLOCK (bin);
3094 }
3095
3096 static void
3097 gst_rtp_bin_set_fec_encoders_struct (GstRtpBin * bin,
3098     const GstStructure * encoders)
3099 {
3100   if (encoders == NULL)
3101     return;
3102
3103   GST_RTP_BIN_LOCK (bin);
3104
3105   GST_OBJECT_LOCK (bin);
3106   if (bin->fec_encoders)
3107     gst_structure_free (bin->fec_encoders);
3108   bin->fec_encoders = gst_structure_copy (encoders);
3109
3110   GST_OBJECT_UNLOCK (bin);
3111
3112   GST_RTP_BIN_UNLOCK (bin);
3113 }
3114
3115 static GstStructure *
3116 gst_rtp_bin_get_sdes_struct (GstRtpBin * bin)
3117 {
3118   GstStructure *result;
3119
3120   GST_OBJECT_LOCK (bin);
3121   result = gst_structure_copy (bin->sdes);
3122   GST_OBJECT_UNLOCK (bin);
3123
3124   return result;
3125 }
3126
3127 static GstStructure *
3128 gst_rtp_bin_get_fec_decoders_struct (GstRtpBin * bin)
3129 {
3130   GstStructure *result;
3131
3132   GST_OBJECT_LOCK (bin);
3133   result = gst_structure_copy (bin->fec_decoders);
3134   GST_OBJECT_UNLOCK (bin);
3135
3136   return result;
3137 }
3138
3139 static GstStructure *
3140 gst_rtp_bin_get_fec_encoders_struct (GstRtpBin * bin)
3141 {
3142   GstStructure *result;
3143
3144   GST_OBJECT_LOCK (bin);
3145   result = gst_structure_copy (bin->fec_encoders);
3146   GST_OBJECT_UNLOCK (bin);
3147
3148   return result;
3149 }
3150
3151 static void
3152 gst_rtp_bin_set_property (GObject * object, guint prop_id,
3153     const GValue * value, GParamSpec * pspec)
3154 {
3155   GstRtpBin *rtpbin;
3156
3157   rtpbin = GST_RTP_BIN (object);
3158
3159   switch (prop_id) {
3160     case PROP_LATENCY:
3161       GST_RTP_BIN_LOCK (rtpbin);
3162       rtpbin->latency_ms = g_value_get_uint (value);
3163       rtpbin->latency_ns = rtpbin->latency_ms * GST_MSECOND;
3164       GST_RTP_BIN_UNLOCK (rtpbin);
3165       /* propagate the property down to the jitterbuffer */
3166       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "latency", value);
3167       break;
3168     case PROP_DROP_ON_LATENCY:
3169       GST_RTP_BIN_LOCK (rtpbin);
3170       rtpbin->drop_on_latency = g_value_get_boolean (value);
3171       GST_RTP_BIN_UNLOCK (rtpbin);
3172       /* propagate the property down to the jitterbuffer */
3173       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3174           "drop-on-latency", value);
3175       break;
3176     case PROP_SDES:
3177       gst_rtp_bin_set_sdes_struct (rtpbin, g_value_get_boxed (value));
3178       break;
3179     case PROP_DO_LOST:
3180       GST_RTP_BIN_LOCK (rtpbin);
3181       rtpbin->do_lost = g_value_get_boolean (value);
3182       GST_RTP_BIN_UNLOCK (rtpbin);
3183       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
3184       break;
3185     case PROP_NTP_SYNC:
3186       rtpbin->ntp_sync = g_value_get_boolean (value);
3187       /* The default value of max_ts_offset depends on ntp_sync. If user
3188        * hasn't set it then change default value */
3189       if (!rtpbin->max_ts_offset_is_set) {
3190         if (rtpbin->ntp_sync) {
3191           rtpbin->max_ts_offset = 0;
3192         } else {
3193           rtpbin->max_ts_offset = DEFAULT_MAX_TS_OFFSET;
3194         }
3195       }
3196       if (!rtpbin->min_ts_offset_is_set) {
3197         if (rtpbin->ntp_sync) {
3198           rtpbin->min_ts_offset = 0;
3199         } else {
3200           rtpbin->min_ts_offset = DEFAULT_MIN_TS_OFFSET;
3201         }
3202       }
3203       break;
3204     case PROP_RTCP_SYNC:
3205       g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value));
3206       break;
3207     case PROP_RTCP_SYNC_INTERVAL:
3208       rtpbin->rtcp_sync_interval = g_value_get_uint (value);
3209       break;
3210     case PROP_IGNORE_PT:
3211       rtpbin->ignore_pt = g_value_get_boolean (value);
3212       break;
3213     case PROP_AUTOREMOVE:
3214       rtpbin->priv->autoremove = g_value_get_boolean (value);
3215       break;
3216     case PROP_USE_PIPELINE_CLOCK:
3217     {
3218       GSList *sessions;
3219       GST_RTP_BIN_LOCK (rtpbin);
3220       rtpbin->use_pipeline_clock = g_value_get_boolean (value);
3221       for (sessions = rtpbin->sessions; sessions;
3222           sessions = g_slist_next (sessions)) {
3223         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3224
3225         g_object_set (G_OBJECT (session->session),
3226             "use-pipeline-clock", rtpbin->use_pipeline_clock, NULL);
3227       }
3228       GST_RTP_BIN_UNLOCK (rtpbin);
3229     }
3230       break;
3231     case PROP_DO_SYNC_EVENT:
3232       rtpbin->send_sync_event = g_value_get_boolean (value);
3233       break;
3234     case PROP_BUFFER_MODE:
3235       GST_RTP_BIN_LOCK (rtpbin);
3236       rtpbin->buffer_mode = g_value_get_enum (value);
3237       GST_RTP_BIN_UNLOCK (rtpbin);
3238       /* propagate the property down to the jitterbuffer */
3239       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "mode", value);
3240       break;
3241     case PROP_DO_RETRANSMISSION:
3242       GST_RTP_BIN_LOCK (rtpbin);
3243       rtpbin->do_retransmission = g_value_get_boolean (value);
3244       GST_RTP_BIN_UNLOCK (rtpbin);
3245       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3246           "do-retransmission", value);
3247       break;
3248     case PROP_RTP_PROFILE:
3249       rtpbin->rtp_profile = g_value_get_enum (value);
3250       break;
3251     case PROP_NTP_TIME_SOURCE:{
3252       GSList *sessions;
3253       GST_RTP_BIN_LOCK (rtpbin);
3254       rtpbin->ntp_time_source = g_value_get_enum (value);
3255       for (sessions = rtpbin->sessions; sessions;
3256           sessions = g_slist_next (sessions)) {
3257         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3258
3259         g_object_set (G_OBJECT (session->session),
3260             "ntp-time-source", rtpbin->ntp_time_source, NULL);
3261       }
3262       GST_RTP_BIN_UNLOCK (rtpbin);
3263       break;
3264     }
3265     case PROP_RTCP_SYNC_SEND_TIME:{
3266       GSList *sessions;
3267       GST_RTP_BIN_LOCK (rtpbin);
3268       rtpbin->rtcp_sync_send_time = g_value_get_boolean (value);
3269       for (sessions = rtpbin->sessions; sessions;
3270           sessions = g_slist_next (sessions)) {
3271         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3272
3273         g_object_set (G_OBJECT (session->session),
3274             "rtcp-sync-send-time", rtpbin->rtcp_sync_send_time, NULL);
3275       }
3276       GST_RTP_BIN_UNLOCK (rtpbin);
3277       break;
3278     }
3279     case PROP_MAX_RTCP_RTP_TIME_DIFF:
3280       GST_RTP_BIN_LOCK (rtpbin);
3281       rtpbin->max_rtcp_rtp_time_diff = g_value_get_int (value);
3282       GST_RTP_BIN_UNLOCK (rtpbin);
3283       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3284           "max-rtcp-rtp-time-diff", value);
3285       break;
3286     case PROP_MAX_DROPOUT_TIME:
3287       GST_RTP_BIN_LOCK (rtpbin);
3288       rtpbin->max_dropout_time = g_value_get_uint (value);
3289       GST_RTP_BIN_UNLOCK (rtpbin);
3290       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3291           "max-dropout-time", value);
3292       gst_rtp_bin_propagate_property_to_session (rtpbin, "max-dropout-time",
3293           value);
3294       break;
3295     case PROP_MAX_MISORDER_TIME:
3296       GST_RTP_BIN_LOCK (rtpbin);
3297       rtpbin->max_misorder_time = g_value_get_uint (value);
3298       GST_RTP_BIN_UNLOCK (rtpbin);
3299       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3300           "max-misorder-time", value);
3301       gst_rtp_bin_propagate_property_to_session (rtpbin, "max-misorder-time",
3302           value);
3303       break;
3304     case PROP_RFC7273_SYNC:
3305       rtpbin->rfc7273_sync = g_value_get_boolean (value);
3306       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3307           "rfc7273-sync", value);
3308       break;
3309     case PROP_ADD_REFERENCE_TIMESTAMP_META:
3310       rtpbin->add_reference_timestamp_meta = g_value_get_boolean (value);
3311       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3312           "add-reference-timestamp-meta", value);
3313       break;
3314     case PROP_MAX_STREAMS:
3315       rtpbin->max_streams = g_value_get_uint (value);
3316       break;
3317     case PROP_MAX_TS_OFFSET_ADJUSTMENT:
3318       rtpbin->max_ts_offset_adjustment = g_value_get_uint64 (value);
3319       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
3320           "max-ts-offset-adjustment", value);
3321       break;
3322     case PROP_MAX_TS_OFFSET:
3323       rtpbin->max_ts_offset = g_value_get_int64 (value);
3324       rtpbin->max_ts_offset_is_set = TRUE;
3325       break;
3326     case PROP_MIN_TS_OFFSET:
3327       rtpbin->min_ts_offset = g_value_get_uint64 (value);
3328       rtpbin->min_ts_offset_is_set = TRUE;
3329       break;
3330     case PROP_TS_OFFSET_SMOOTHING_FACTOR:
3331       rtpbin->ts_offset_smoothing_factor = g_value_get_uint (value);
3332       break;
3333     case PROP_FEC_DECODERS:
3334       gst_rtp_bin_set_fec_decoders_struct (rtpbin, g_value_get_boxed (value));
3335       break;
3336     case PROP_FEC_ENCODERS:
3337       gst_rtp_bin_set_fec_encoders_struct (rtpbin, g_value_get_boxed (value));
3338       break;
3339     default:
3340       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3341       break;
3342   }
3343 }
3344
3345 static void
3346 gst_rtp_bin_get_property (GObject * object, guint prop_id,
3347     GValue * value, GParamSpec * pspec)
3348 {
3349   GstRtpBin *rtpbin;
3350
3351   rtpbin = GST_RTP_BIN (object);
3352
3353   switch (prop_id) {
3354     case PROP_LATENCY:
3355       GST_RTP_BIN_LOCK (rtpbin);
3356       g_value_set_uint (value, rtpbin->latency_ms);
3357       GST_RTP_BIN_UNLOCK (rtpbin);
3358       break;
3359     case PROP_DROP_ON_LATENCY:
3360       GST_RTP_BIN_LOCK (rtpbin);
3361       g_value_set_boolean (value, rtpbin->drop_on_latency);
3362       GST_RTP_BIN_UNLOCK (rtpbin);
3363       break;
3364     case PROP_SDES:
3365       g_value_take_boxed (value, gst_rtp_bin_get_sdes_struct (rtpbin));
3366       break;
3367     case PROP_DO_LOST:
3368       GST_RTP_BIN_LOCK (rtpbin);
3369       g_value_set_boolean (value, rtpbin->do_lost);
3370       GST_RTP_BIN_UNLOCK (rtpbin);
3371       break;
3372     case PROP_IGNORE_PT:
3373       g_value_set_boolean (value, rtpbin->ignore_pt);
3374       break;
3375     case PROP_NTP_SYNC:
3376       g_value_set_boolean (value, rtpbin->ntp_sync);
3377       break;
3378     case PROP_RTCP_SYNC:
3379       g_value_set_enum (value, g_atomic_int_get (&rtpbin->rtcp_sync));
3380       break;
3381     case PROP_RTCP_SYNC_INTERVAL:
3382       g_value_set_uint (value, rtpbin->rtcp_sync_interval);
3383       break;
3384     case PROP_AUTOREMOVE:
3385       g_value_set_boolean (value, rtpbin->priv->autoremove);
3386       break;
3387     case PROP_BUFFER_MODE:
3388       g_value_set_enum (value, rtpbin->buffer_mode);
3389       break;
3390     case PROP_USE_PIPELINE_CLOCK:
3391       g_value_set_boolean (value, rtpbin->use_pipeline_clock);
3392       break;
3393     case PROP_DO_SYNC_EVENT:
3394       g_value_set_boolean (value, rtpbin->send_sync_event);
3395       break;
3396     case PROP_DO_RETRANSMISSION:
3397       GST_RTP_BIN_LOCK (rtpbin);
3398       g_value_set_boolean (value, rtpbin->do_retransmission);
3399       GST_RTP_BIN_UNLOCK (rtpbin);
3400       break;
3401     case PROP_RTP_PROFILE:
3402       g_value_set_enum (value, rtpbin->rtp_profile);
3403       break;
3404     case PROP_NTP_TIME_SOURCE:
3405       g_value_set_enum (value, rtpbin->ntp_time_source);
3406       break;
3407     case PROP_RTCP_SYNC_SEND_TIME:
3408       g_value_set_boolean (value, rtpbin->rtcp_sync_send_time);
3409       break;
3410     case PROP_MAX_RTCP_RTP_TIME_DIFF:
3411       GST_RTP_BIN_LOCK (rtpbin);
3412       g_value_set_int (value, rtpbin->max_rtcp_rtp_time_diff);
3413       GST_RTP_BIN_UNLOCK (rtpbin);
3414       break;
3415     case PROP_MAX_DROPOUT_TIME:
3416       g_value_set_uint (value, rtpbin->max_dropout_time);
3417       break;
3418     case PROP_MAX_MISORDER_TIME:
3419       g_value_set_uint (value, rtpbin->max_misorder_time);
3420       break;
3421     case PROP_RFC7273_SYNC:
3422       g_value_set_boolean (value, rtpbin->rfc7273_sync);
3423       break;
3424     case PROP_ADD_REFERENCE_TIMESTAMP_META:
3425       g_value_set_boolean (value, rtpbin->add_reference_timestamp_meta);
3426       break;
3427     case PROP_MAX_STREAMS:
3428       g_value_set_uint (value, rtpbin->max_streams);
3429       break;
3430     case PROP_MAX_TS_OFFSET_ADJUSTMENT:
3431       g_value_set_uint64 (value, rtpbin->max_ts_offset_adjustment);
3432       break;
3433     case PROP_MAX_TS_OFFSET:
3434       g_value_set_int64 (value, rtpbin->max_ts_offset);
3435       break;
3436     case PROP_MIN_TS_OFFSET:
3437       g_value_set_uint64 (value, rtpbin->min_ts_offset);
3438       break;
3439     case PROP_TS_OFFSET_SMOOTHING_FACTOR:
3440       g_value_set_uint (value, rtpbin->ts_offset_smoothing_factor);
3441       break;
3442     case PROP_FEC_DECODERS:
3443       g_value_take_boxed (value, gst_rtp_bin_get_fec_decoders_struct (rtpbin));
3444       break;
3445     case PROP_FEC_ENCODERS:
3446       g_value_take_boxed (value, gst_rtp_bin_get_fec_encoders_struct (rtpbin));
3447       break;
3448     default:
3449       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3450       break;
3451   }
3452 }
3453
3454 static void
3455 gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
3456 {
3457   GstRtpBin *rtpbin;
3458
3459   rtpbin = GST_RTP_BIN (bin);
3460
3461   switch (GST_MESSAGE_TYPE (message)) {
3462     case GST_MESSAGE_ELEMENT:
3463     {
3464       const GstStructure *s = gst_message_get_structure (message);
3465
3466       /* we change the structure name and add the session ID to it */
3467       if (gst_structure_has_name (s, "application/x-rtp-source-sdes")) {
3468         GstRtpBinSession *sess;
3469
3470         /* find the session we set it as object data */
3471         sess = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
3472             "GstRTPBin.session");
3473
3474         if (G_LIKELY (sess)) {
3475           message = gst_message_make_writable (message);
3476           s = gst_message_get_structure (message);
3477           gst_structure_set ((GstStructure *) s, "session", G_TYPE_UINT,
3478               sess->id, NULL);
3479         }
3480       }
3481       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
3482       break;
3483     }
3484     case GST_MESSAGE_BUFFERING:
3485     {
3486       gint percent;
3487       gint min_percent = 100;
3488       GSList *sessions, *streams;
3489       GstRtpBinStream *stream;
3490       gboolean change = FALSE, active = FALSE;
3491       GstClockTime min_out_time;
3492       GstBufferingMode mode;
3493       gint avg_in, avg_out;
3494       gint64 buffering_left;
3495
3496       gst_message_parse_buffering (message, &percent);
3497       gst_message_parse_buffering_stats (message, &mode, &avg_in, &avg_out,
3498           &buffering_left);
3499
3500       stream =
3501           g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
3502           "GstRTPBin.stream");
3503
3504       GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
3505
3506       /* get the stream */
3507       if (G_LIKELY (stream)) {
3508         GST_RTP_BIN_LOCK (rtpbin);
3509         /* fill in the percent */
3510         stream->percent = percent;
3511
3512         /* calculate the min value for all streams */
3513         for (sessions = rtpbin->sessions; sessions;
3514             sessions = g_slist_next (sessions)) {
3515           GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3516
3517           GST_RTP_SESSION_LOCK (session);
3518           if (session->streams) {
3519             for (streams = session->streams; streams;
3520                 streams = g_slist_next (streams)) {
3521               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
3522
3523               GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
3524                   stream->percent);
3525
3526               /* find min percent */
3527               if (min_percent > stream->percent)
3528                 min_percent = stream->percent;
3529             }
3530           } else {
3531             GST_INFO_OBJECT (bin,
3532                 "session has no streams, setting min_percent to 0");
3533             min_percent = 0;
3534           }
3535           GST_RTP_SESSION_UNLOCK (session);
3536         }
3537         GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
3538
3539         if (rtpbin->buffering) {
3540           if (min_percent == 100) {
3541             rtpbin->buffering = FALSE;
3542             active = TRUE;
3543             change = TRUE;
3544           }
3545         } else {
3546           if (min_percent < 100) {
3547             /* pause the streams */
3548             rtpbin->buffering = TRUE;
3549             active = FALSE;
3550             change = TRUE;
3551           }
3552         }
3553         GST_RTP_BIN_UNLOCK (rtpbin);
3554
3555         gst_message_unref (message);
3556
3557         /* make a new buffering message with the min value */
3558         message =
3559             gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
3560         gst_message_set_buffering_stats (message, mode, avg_in, avg_out,
3561             buffering_left);
3562
3563         if (G_UNLIKELY (change)) {
3564           GstClock *clock;
3565           guint64 running_time = 0;
3566           guint64 offset = 0;
3567
3568           /* figure out the running time when we have a clock */
3569           if (G_LIKELY ((clock =
3570                       gst_element_get_clock (GST_ELEMENT_CAST (bin))))) {
3571             guint64 now, base_time;
3572
3573             now = gst_clock_get_time (clock);
3574             base_time = gst_element_get_base_time (GST_ELEMENT_CAST (bin));
3575             running_time = now - base_time;
3576             gst_object_unref (clock);
3577           }
3578           GST_DEBUG_OBJECT (bin,
3579               "running time now %" GST_TIME_FORMAT,
3580               GST_TIME_ARGS (running_time));
3581
3582           GST_RTP_BIN_LOCK (rtpbin);
3583
3584           /* when we reactivate, calculate the offsets so that all streams have
3585            * an output time that is at least as big as the running_time */
3586           offset = 0;
3587           if (active) {
3588             if (running_time > rtpbin->buffer_start) {
3589               offset = running_time - rtpbin->buffer_start;
3590               if (offset >= rtpbin->latency_ns)
3591                 offset -= rtpbin->latency_ns;
3592               else
3593                 offset = 0;
3594             }
3595           }
3596
3597           /* pause all streams */
3598           min_out_time = -1;
3599           for (sessions = rtpbin->sessions; sessions;
3600               sessions = g_slist_next (sessions)) {
3601             GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3602
3603             GST_RTP_SESSION_LOCK (session);
3604             for (streams = session->streams; streams;
3605                 streams = g_slist_next (streams)) {
3606               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
3607               GstElement *element = stream->buffer;
3608               guint64 last_out = -1;
3609
3610               if (g_signal_lookup ("set-active", G_OBJECT_TYPE (element)) != 0) {
3611                 g_signal_emit_by_name (element, "set-active", active, offset,
3612                     &last_out);
3613               }
3614
3615               if (!active) {
3616                 g_object_get (element, "percent", &stream->percent, NULL);
3617
3618                 if (last_out == -1)
3619                   last_out = 0;
3620                 if (min_out_time == -1 || last_out < min_out_time)
3621                   min_out_time = last_out;
3622               }
3623
3624               GST_DEBUG_OBJECT (bin,
3625                   "setting %p to %d, offset %" GST_TIME_FORMAT ", last %"
3626                   GST_TIME_FORMAT ", percent %d", element, active,
3627                   GST_TIME_ARGS (offset), GST_TIME_ARGS (last_out),
3628                   stream->percent);
3629             }
3630             GST_RTP_SESSION_UNLOCK (session);
3631           }
3632           GST_DEBUG_OBJECT (bin,
3633               "min out time %" GST_TIME_FORMAT, GST_TIME_ARGS (min_out_time));
3634
3635           /* the buffer_start is the min out time of all paused jitterbuffers */
3636           if (!active)
3637             rtpbin->buffer_start = min_out_time;
3638
3639           GST_RTP_BIN_UNLOCK (rtpbin);
3640         }
3641       }
3642       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
3643       break;
3644     }
3645     default:
3646     {
3647       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
3648       break;
3649     }
3650   }
3651 }
3652
3653 static GstStateChangeReturn
3654 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
3655 {
3656   GstStateChangeReturn res;
3657   GstRtpBin *rtpbin;
3658   GstRtpBinPrivate *priv;
3659
3660   rtpbin = GST_RTP_BIN (element);
3661   priv = rtpbin->priv;
3662
3663   switch (transition) {
3664     case GST_STATE_CHANGE_NULL_TO_READY:
3665       break;
3666     case GST_STATE_CHANGE_READY_TO_PAUSED:
3667       priv->last_ntpnstime = 0;
3668       GST_LOG_OBJECT (rtpbin, "clearing shutdown flag");
3669       g_atomic_int_set (&priv->shutdown, 0);
3670       break;
3671     case GST_STATE_CHANGE_PAUSED_TO_READY:
3672       GST_LOG_OBJECT (rtpbin, "setting shutdown flag");
3673       g_atomic_int_set (&priv->shutdown, 1);
3674       /* wait for all callbacks to end by taking the lock. No new callbacks will
3675        * be able to happen as we set the shutdown flag. */
3676       GST_RTP_BIN_DYN_LOCK (rtpbin);
3677       GST_LOG_OBJECT (rtpbin, "dynamic lock taken, we can continue shutdown");
3678       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
3679       break;
3680     default:
3681       break;
3682   }
3683
3684   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3685
3686   switch (transition) {
3687     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3688       break;
3689     case GST_STATE_CHANGE_PAUSED_TO_READY:
3690       break;
3691     case GST_STATE_CHANGE_READY_TO_NULL:
3692       break;
3693     default:
3694       break;
3695   }
3696   return res;
3697 }
3698
3699 static GstElement *
3700 session_request_element_full (GstRtpBinSession * session, guint signal,
3701     guint ssrc, guint8 pt)
3702 {
3703   GstElement *element = NULL;
3704   GstRtpBin *bin = session->bin;
3705
3706   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, ssrc, pt,
3707       &element);
3708
3709   if (element) {
3710     if (!bin_manage_element (bin, element))
3711       goto manage_failed;
3712     session->elements = g_slist_prepend (session->elements, element);
3713   }
3714   return element;
3715
3716   /* ERRORS */
3717 manage_failed:
3718   {
3719     GST_WARNING_OBJECT (bin, "unable to manage element");
3720     gst_object_unref (element);
3721     return NULL;
3722   }
3723 }
3724
3725 static GstElement *
3726 session_request_element (GstRtpBinSession * session, guint signal)
3727 {
3728   GstElement *element = NULL;
3729   GstRtpBin *bin = session->bin;
3730
3731   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, &element);
3732
3733   if (element) {
3734     if (!bin_manage_element (bin, element))
3735       goto manage_failed;
3736     session->elements = g_slist_prepend (session->elements, element);
3737   }
3738   return element;
3739
3740   /* ERRORS */
3741 manage_failed:
3742   {
3743     GST_WARNING_OBJECT (bin, "unable to manage element");
3744     gst_object_unref (element);
3745     return NULL;
3746   }
3747 }
3748
3749 static gboolean
3750 copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
3751 {
3752   GstPad *gpad = GST_PAD_CAST (user_data);
3753
3754   GST_DEBUG_OBJECT (gpad, "store sticky event %" GST_PTR_FORMAT, *event);
3755   gst_pad_store_sticky_event (gpad, *event);
3756
3757   return TRUE;
3758 }
3759
3760 static gboolean
3761 ensure_early_fec_decoder (GstRtpBin * rtpbin, GstRtpBinSession * session)
3762 {
3763   const gchar *factory;
3764   gchar *sess_id_str;
3765
3766   if (session->early_fec_decoder)
3767     goto done;
3768
3769   sess_id_str = g_strdup_printf ("%u", session->id);
3770   factory = gst_structure_get_string (rtpbin->fec_decoders, sess_id_str);
3771   g_free (sess_id_str);
3772
3773   /* First try the property */
3774   if (factory) {
3775     GError *err = NULL;
3776
3777     session->early_fec_decoder =
3778         gst_parse_bin_from_description_full (factory, TRUE, NULL,
3779         GST_PARSE_FLAG_NO_SINGLE_ELEMENT_BINS | GST_PARSE_FLAG_FATAL_ERRORS,
3780         &err);
3781     if (!session->early_fec_decoder) {
3782       GST_ERROR_OBJECT (rtpbin, "Failed to build decoder from factory: %s",
3783           err->message);
3784     }
3785
3786     bin_manage_element (session->bin, session->early_fec_decoder);
3787     session->elements =
3788         g_slist_prepend (session->elements, session->early_fec_decoder);
3789     GST_INFO_OBJECT (rtpbin, "Built FEC decoder: %" GST_PTR_FORMAT
3790         " for session %u", session->early_fec_decoder, session->id);
3791   }
3792
3793   /* Do not fallback to the signal as the signal expects a fec decoder to
3794    * be placed at a different place in the pipeline */
3795
3796 done:
3797   return session->early_fec_decoder != NULL;
3798 }
3799
3800 static void
3801 expose_recv_src_pad (GstRtpBin * rtpbin, GstPad * pad, GstRtpBinStream * stream,
3802     guint8 pt)
3803 {
3804   GstElementClass *klass;
3805   GstPadTemplate *templ;
3806   gchar *padname;
3807   GstPad *gpad;
3808
3809   gst_object_ref (pad);
3810
3811   if (stream->session->storage) {
3812     /* First try the legacy signal, with no ssrc and pt as parameters.
3813      * This will likely cause issues for the BUNDLE case. */
3814     GstElement *fec_decoder =
3815         session_request_element (stream->session, SIGNAL_REQUEST_FEC_DECODER);
3816
3817     /* Now try the new signal, where the application can provide a FEC
3818      * decoder according to ssrc and pt. */
3819     if (!fec_decoder) {
3820       fec_decoder =
3821           session_request_element_full (stream->session,
3822           SIGNAL_REQUEST_FEC_DECODER_FULL, stream->ssrc, pt);
3823     }
3824
3825     if (fec_decoder) {
3826       GstPad *sinkpad, *srcpad;
3827       GstPadLinkReturn ret;
3828
3829       sinkpad = gst_element_get_static_pad (fec_decoder, "sink");
3830
3831       if (!sinkpad)
3832         goto fec_decoder_sink_failed;
3833
3834       ret = gst_pad_link (pad, sinkpad);
3835       gst_object_unref (sinkpad);
3836
3837       if (ret != GST_PAD_LINK_OK)
3838         goto fec_decoder_link_failed;
3839
3840       srcpad = gst_element_get_static_pad (fec_decoder, "src");
3841
3842       if (!srcpad)
3843         goto fec_decoder_src_failed;
3844
3845       gst_pad_sticky_events_foreach (pad, copy_sticky_events, srcpad);
3846       gst_object_unref (pad);
3847       pad = srcpad;
3848     }
3849   }
3850
3851   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
3852
3853   /* ghost the pad to the parent */
3854   klass = GST_ELEMENT_GET_CLASS (rtpbin);
3855   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
3856   padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
3857       stream->session->id, stream->ssrc, pt);
3858   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
3859   g_free (padname);
3860   g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", gpad);
3861
3862   gst_pad_set_active (gpad, TRUE);
3863   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
3864
3865   gst_pad_sticky_events_foreach (pad, copy_sticky_events, gpad);
3866   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
3867
3868 done:
3869   gst_object_unref (pad);
3870
3871   return;
3872
3873 shutdown:
3874   {
3875     GST_DEBUG ("ignoring, we are shutting down");
3876     goto done;
3877   }
3878 fec_decoder_sink_failed:
3879   {
3880     g_warning ("rtpbin: failed to get fec encoder sink pad for session %u",
3881         stream->session->id);
3882     goto done;
3883   }
3884 fec_decoder_src_failed:
3885   {
3886     g_warning ("rtpbin: failed to get fec encoder src pad for session %u",
3887         stream->session->id);
3888     goto done;
3889   }
3890 fec_decoder_link_failed:
3891   {
3892     g_warning ("rtpbin: failed to link fec decoder for session %u",
3893         stream->session->id);
3894     goto done;
3895   }
3896 }
3897
3898 /* a new pad (SSRC) was created in @session. This signal is emitted from the
3899  * payload demuxer. */
3900 static void
3901 new_payload_found (GstElement * element, guint pt, GstPad * pad,
3902     GstRtpBinStream * stream)
3903 {
3904   GstRtpBin *rtpbin;
3905
3906   rtpbin = stream->bin;
3907
3908   GST_DEBUG_OBJECT (rtpbin, "new payload pad %u", pt);
3909
3910   expose_recv_src_pad (rtpbin, pad, stream, pt);
3911 }
3912
3913 static void
3914 payload_pad_removed (GstElement * element, GstPad * pad,
3915     GstRtpBinStream * stream)
3916 {
3917   GstRtpBin *rtpbin;
3918   GstPad *gpad;
3919
3920   rtpbin = stream->bin;
3921
3922   GST_DEBUG ("payload pad removed");
3923
3924   GST_RTP_BIN_DYN_LOCK (rtpbin);
3925   if ((gpad = g_object_get_data (G_OBJECT (pad), "GstRTPBin.ghostpad"))) {
3926     g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", NULL);
3927
3928     gst_pad_set_active (gpad, FALSE);
3929     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), gpad);
3930   }
3931   GST_RTP_BIN_DYN_UNLOCK (rtpbin);
3932 }
3933
3934 static GstCaps *
3935 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
3936 {
3937   GstRtpBin *rtpbin;
3938   GstCaps *caps;
3939
3940   rtpbin = session->bin;
3941
3942   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %u in session %u", pt,
3943       session->id);
3944
3945   caps = get_pt_map (session, pt);
3946   if (!caps)
3947     goto no_caps;
3948
3949   return caps;
3950
3951   /* ERRORS */
3952 no_caps:
3953   {
3954     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
3955     return NULL;
3956   }
3957 }
3958
3959 static GstCaps *
3960 ptdemux_pt_map_requested (GstElement * element, guint pt,
3961     GstRtpBinSession * session)
3962 {
3963   GstCaps *ret = pt_map_requested (element, pt, session);
3964
3965   if (ret && gst_caps_get_size (ret) == 1) {
3966     const GstStructure *s = gst_caps_get_structure (ret, 0);
3967     gboolean is_fec;
3968
3969     if (gst_structure_get_boolean (s, "is-fec", &is_fec) && is_fec) {
3970       GValue v = G_VALUE_INIT;
3971       GValue v2 = G_VALUE_INIT;
3972
3973       GST_INFO_OBJECT (session->bin, "Will ignore FEC pt %u in session %u", pt,
3974           session->id);
3975       g_value_init (&v, GST_TYPE_ARRAY);
3976       g_value_init (&v2, G_TYPE_INT);
3977       g_object_get_property (G_OBJECT (element), "ignored-payload-types", &v);
3978       g_value_set_int (&v2, pt);
3979       gst_value_array_append_value (&v, &v2);
3980       g_value_unset (&v2);
3981       g_object_set_property (G_OBJECT (element), "ignored-payload-types", &v);
3982       g_value_unset (&v);
3983     }
3984   }
3985
3986   return ret;
3987 }
3988
3989 static void
3990 payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session)
3991 {
3992   GST_DEBUG_OBJECT (session->bin,
3993       "emitting signal for pt type changed to %u in session %u", pt,
3994       session->id);
3995
3996   g_signal_emit (session->bin, gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE],
3997       0, session->id, pt);
3998 }
3999
4000 /* emitted when caps changed for the session */
4001 static void
4002 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
4003 {
4004   GstRtpBin *bin;
4005   GstCaps *caps;
4006   gint payload;
4007   const GstStructure *s;
4008
4009   bin = session->bin;
4010
4011   g_object_get (pad, "caps", &caps, NULL);
4012
4013   if (caps == NULL)
4014     return;
4015
4016   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
4017
4018   s = gst_caps_get_structure (caps, 0);
4019
4020   /* get payload, finish when it's not there */
4021   if (!gst_structure_get_int (s, "payload", &payload)) {
4022     gst_caps_unref (caps);
4023     return;
4024   }
4025
4026   GST_RTP_SESSION_LOCK (session);
4027   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
4028   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
4029   GST_RTP_SESSION_UNLOCK (session);
4030 }
4031
4032 /* a new pad (SSRC) was created in @session */
4033 static void
4034 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
4035     GstRtpBinSession * session)
4036 {
4037   GstRtpBin *rtpbin;
4038   GstRtpBinStream *stream;
4039   GstPad *sinkpad, *srcpad;
4040   gchar *padname;
4041
4042   rtpbin = session->bin;
4043
4044   GST_DEBUG_OBJECT (rtpbin, "new SSRC pad %08x, %s:%s", ssrc,
4045       GST_DEBUG_PAD_NAME (pad));
4046
4047   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
4048
4049   GST_RTP_SESSION_LOCK (session);
4050
4051   /* create new stream */
4052   stream = create_stream (session, ssrc);
4053   if (!stream)
4054     goto no_stream;
4055
4056   /* get pad and link */
4057   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP");
4058   padname = g_strdup_printf ("src_%u", ssrc);
4059   srcpad = gst_element_get_static_pad (element, padname);
4060   g_free (padname);
4061
4062   if (session->early_fec_decoder) {
4063     GST_DEBUG_OBJECT (rtpbin, "linking fec decoder");
4064     sinkpad = gst_element_get_static_pad (session->early_fec_decoder, "sink");
4065     gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
4066     gst_object_unref (sinkpad);
4067     gst_object_unref (srcpad);
4068     srcpad = gst_element_get_static_pad (session->early_fec_decoder, "src");
4069   }
4070
4071   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
4072   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
4073   gst_object_unref (sinkpad);
4074   gst_object_unref (srcpad);
4075
4076   sinkpad = gst_element_request_pad_simple (stream->buffer, "sink_rtcp");
4077   if (sinkpad) {
4078     GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
4079     padname = g_strdup_printf ("rtcp_src_%u", ssrc);
4080     srcpad = gst_element_get_static_pad (element, padname);
4081     g_free (padname);
4082     gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
4083     gst_object_unref (sinkpad);
4084     gst_object_unref (srcpad);
4085   }
4086
4087   if (g_signal_lookup ("handle-sync", G_OBJECT_TYPE (stream->buffer)) != 0) {
4088     /* connect to the RTCP sync signal from the jitterbuffer */
4089     GST_DEBUG_OBJECT (rtpbin, "connecting sync signal");
4090     stream->buffer_handlesync_sig = g_signal_connect (stream->buffer,
4091         "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream);
4092   }
4093
4094   if (stream->demux) {
4095     /* connect to the new-pad signal of the payload demuxer, this will expose the
4096      * new pad by ghosting it. */
4097     stream->demux_newpad_sig = g_signal_connect (stream->demux,
4098         "new-payload-type", (GCallback) new_payload_found, stream);
4099     stream->demux_padremoved_sig = g_signal_connect (stream->demux,
4100         "pad-removed", (GCallback) payload_pad_removed, stream);
4101
4102     /* connect to the request-pt-map signal. This signal will be emitted by the
4103      * demuxer so that it can apply a proper caps on the buffers for the
4104      * depayloaders. */
4105     stream->demux_ptreq_sig = g_signal_connect (stream->demux,
4106         "request-pt-map", (GCallback) ptdemux_pt_map_requested, session);
4107     /* connect to the  signal so it can be forwarded. */
4108     stream->demux_ptchange_sig = g_signal_connect (stream->demux,
4109         "payload-type-change", (GCallback) payload_type_change, session);
4110
4111     GST_RTP_SESSION_UNLOCK (session);
4112     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
4113   } else {
4114     /* add rtpjitterbuffer src pad to pads */
4115     GstPad *pad;
4116
4117     pad = gst_element_get_static_pad (stream->buffer, "src");
4118
4119     GST_RTP_SESSION_UNLOCK (session);
4120     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
4121
4122     expose_recv_src_pad (rtpbin, pad, stream, 255);
4123
4124     gst_object_unref (pad);
4125   }
4126
4127   return;
4128
4129   /* ERRORS */
4130 shutdown:
4131   {
4132     GST_DEBUG_OBJECT (rtpbin, "we are shutting down");
4133     return;
4134   }
4135 no_stream:
4136   {
4137     GST_RTP_SESSION_UNLOCK (session);
4138     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
4139     GST_DEBUG_OBJECT (rtpbin, "could not create stream");
4140     return;
4141   }
4142 }
4143
4144 static GstPad *
4145 complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session)
4146 {
4147   guint sessid = session->id;
4148   GstPad *recv_rtp_sink;
4149   GstElement *decoder;
4150
4151   g_assert (!session->recv_rtp_sink);
4152
4153   /* get recv_rtp pad and store */
4154   session->recv_rtp_sink =
4155       gst_element_request_pad_simple (session->session, "recv_rtp_sink");
4156   if (session->recv_rtp_sink == NULL)
4157     goto pad_failed;
4158
4159   g_signal_connect (session->recv_rtp_sink, "notify::caps",
4160       (GCallback) caps_changed, session);
4161
4162   GST_DEBUG_OBJECT (rtpbin, "requesting RTP decoder");
4163   decoder = session_request_element (session, SIGNAL_REQUEST_RTP_DECODER);
4164   if (decoder) {
4165     GstPad *decsrc, *decsink;
4166     GstPadLinkReturn ret;
4167
4168     GST_DEBUG_OBJECT (rtpbin, "linking RTP decoder");
4169     decsink = gst_element_get_static_pad (decoder, "rtp_sink");
4170     if (decsink == NULL)
4171       goto dec_sink_failed;
4172
4173     recv_rtp_sink = decsink;
4174
4175     decsrc = gst_element_get_static_pad (decoder, "rtp_src");
4176     if (decsrc == NULL)
4177       goto dec_src_failed;
4178
4179     ret = gst_pad_link (decsrc, session->recv_rtp_sink);
4180
4181     gst_object_unref (decsrc);
4182
4183     if (ret != GST_PAD_LINK_OK)
4184       goto dec_link_failed;
4185
4186   } else {
4187     GST_DEBUG_OBJECT (rtpbin, "no RTP decoder given");
4188     recv_rtp_sink = gst_object_ref (session->recv_rtp_sink);
4189   }
4190
4191   return recv_rtp_sink;
4192
4193   /* ERRORS */
4194 pad_failed:
4195   {
4196     g_warning ("rtpbin: failed to get session recv_rtp_sink pad");
4197     return NULL;
4198   }
4199 dec_sink_failed:
4200   {
4201     g_warning ("rtpbin: failed to get decoder sink pad for session %u", sessid);
4202     return NULL;
4203   }
4204 dec_src_failed:
4205   {
4206     g_warning ("rtpbin: failed to get decoder src pad for session %u", sessid);
4207     gst_object_unref (recv_rtp_sink);
4208     return NULL;
4209   }
4210 dec_link_failed:
4211   {
4212     g_warning ("rtpbin: failed to link rtp decoder for session %u", sessid);
4213     gst_object_unref (recv_rtp_sink);
4214     return NULL;
4215   }
4216 }
4217
4218 static void
4219 complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session,
4220     guint sessid)
4221 {
4222   GstElement *aux;
4223   GstPad *recv_rtp_src;
4224
4225   g_assert (!session->recv_rtp_src);
4226
4227   session->recv_rtp_src =
4228       gst_element_get_static_pad (session->session, "recv_rtp_src");
4229   if (session->recv_rtp_src == NULL)
4230     goto pad_failed;
4231
4232   /* find out if we need AUX elements */
4233   aux = session_request_element (session, SIGNAL_REQUEST_AUX_RECEIVER);
4234   if (aux) {
4235     gchar *pname;
4236     GstPad *auxsink;
4237     GstPadLinkReturn ret;
4238
4239     GST_DEBUG_OBJECT (rtpbin, "linking AUX receiver");
4240
4241     pname = g_strdup_printf ("sink_%u", sessid);
4242     auxsink = gst_element_get_static_pad (aux, pname);
4243     g_free (pname);
4244     if (auxsink == NULL)
4245       goto aux_sink_failed;
4246
4247     ret = gst_pad_link (session->recv_rtp_src, auxsink);
4248     gst_object_unref (auxsink);
4249     if (ret != GST_PAD_LINK_OK)
4250       goto aux_link_failed;
4251
4252     /* this can be NULL when this AUX element is not to be linked any further */
4253     pname = g_strdup_printf ("src_%u", sessid);
4254     recv_rtp_src = gst_element_get_static_pad (aux, pname);
4255     g_free (pname);
4256   } else {
4257     recv_rtp_src = gst_object_ref (session->recv_rtp_src);
4258   }
4259
4260   /* Add a storage element if needed */
4261   if (recv_rtp_src && session->storage) {
4262     GstPadLinkReturn ret;
4263     GstPad *sinkpad = gst_element_get_static_pad (session->storage, "sink");
4264
4265     ret = gst_pad_link (recv_rtp_src, sinkpad);
4266
4267     gst_object_unref (sinkpad);
4268     gst_object_unref (recv_rtp_src);
4269
4270     if (ret != GST_PAD_LINK_OK)
4271       goto storage_link_failed;
4272
4273     recv_rtp_src = gst_element_get_static_pad (session->storage, "src");
4274   }
4275
4276   if (recv_rtp_src) {
4277     GstPad *sinkdpad;
4278
4279     GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
4280     sinkdpad = gst_element_get_static_pad (session->demux, "sink");
4281     GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
4282     gst_pad_link_full (recv_rtp_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
4283     gst_object_unref (sinkdpad);
4284     gst_object_unref (recv_rtp_src);
4285
4286     /* connect to the new-ssrc-pad signal of the SSRC demuxer */
4287     session->demux_newpad_sig = g_signal_connect (session->demux,
4288         "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
4289     session->demux_padremoved_sig = g_signal_connect (session->demux,
4290         "removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session);
4291   }
4292
4293   return;
4294
4295 pad_failed:
4296   {
4297     g_warning ("rtpbin: failed to get session recv_rtp_src pad");
4298     return;
4299   }
4300 aux_sink_failed:
4301   {
4302     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
4303     return;
4304   }
4305 aux_link_failed:
4306   {
4307     g_warning ("rtpbin: failed to link AUX pad to session %u", sessid);
4308     return;
4309   }
4310 storage_link_failed:
4311   {
4312     g_warning ("rtpbin: failed to link storage");
4313     return;
4314   }
4315 }
4316
4317 /* Create a pad for receiving RTP for the session in @name. Must be called with
4318  * RTP_BIN_LOCK.
4319  */
4320 static GstPad *
4321 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
4322 {
4323   guint sessid;
4324   GstRtpBinSession *session;
4325   GstPad *recv_rtp_sink;
4326
4327   /* first get the session number */
4328   if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1)
4329     goto no_name;
4330
4331   GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
4332
4333   /* get or create session */
4334   session = find_session_by_id (rtpbin, sessid);
4335   if (!session) {
4336     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
4337     /* create session now */
4338     session = create_session (rtpbin, sessid);
4339     if (session == NULL)
4340       goto create_error;
4341   }
4342
4343   /* check if pad was requested */
4344   if (session->recv_rtp_sink_ghost != NULL)
4345     return session->recv_rtp_sink_ghost;
4346
4347   /* setup the session sink pad */
4348   recv_rtp_sink = complete_session_sink (rtpbin, session);
4349   if (!recv_rtp_sink)
4350     goto session_sink_failed;
4351
4352   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
4353   session->recv_rtp_sink_ghost =
4354       gst_ghost_pad_new_from_template (name, recv_rtp_sink, templ);
4355   gst_object_unref (recv_rtp_sink);
4356   gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE);
4357   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost);
4358
4359   complete_session_receiver (rtpbin, session, sessid);
4360
4361   return session->recv_rtp_sink_ghost;
4362
4363   /* ERRORS */
4364 no_name:
4365   {
4366     g_warning ("rtpbin: cannot find session id for pad: %s",
4367         GST_STR_NULL (name));
4368     return NULL;
4369   }
4370 create_error:
4371   {
4372     /* create_session already warned */
4373     return NULL;
4374   }
4375 session_sink_failed:
4376   {
4377     /* warning already done */
4378     return NULL;
4379   }
4380 }
4381
4382 static void
4383 remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
4384 {
4385   if (session->demux_newpad_sig) {
4386     g_signal_handler_disconnect (session->demux, session->demux_newpad_sig);
4387     session->demux_newpad_sig = 0;
4388   }
4389   if (session->demux_padremoved_sig) {
4390     g_signal_handler_disconnect (session->demux, session->demux_padremoved_sig);
4391     session->demux_padremoved_sig = 0;
4392   }
4393   if (session->recv_rtp_src) {
4394     gst_object_unref (session->recv_rtp_src);
4395     session->recv_rtp_src = NULL;
4396   }
4397   if (session->recv_rtp_sink) {
4398     gst_element_release_request_pad (session->session, session->recv_rtp_sink);
4399     gst_object_unref (session->recv_rtp_sink);
4400     session->recv_rtp_sink = NULL;
4401   }
4402   if (session->recv_rtp_sink_ghost) {
4403     gst_pad_set_active (session->recv_rtp_sink_ghost, FALSE);
4404     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
4405         session->recv_rtp_sink_ghost);
4406     session->recv_rtp_sink_ghost = NULL;
4407   }
4408 }
4409
4410 static gint
4411 fec_sinkpad_find (const GValue * item, gchar * padname)
4412 {
4413   GstPad *pad = g_value_get_object (item);
4414   return g_strcmp0 (GST_PAD_NAME (pad), padname);
4415 }
4416
4417 static GstPad *
4418 complete_session_fec (GstRtpBin * rtpbin, GstRtpBinSession * session,
4419     guint fec_idx)
4420 {
4421   gboolean have_static_pad;
4422   gchar *padname;
4423
4424   GstPad *ret;
4425   GstIterator *it;
4426   GValue item = { 0, };
4427
4428   if (!ensure_early_fec_decoder (rtpbin, session))
4429     goto no_decoder;
4430
4431   padname = g_strdup_printf ("fec_%u", fec_idx);
4432
4433   GST_DEBUG_OBJECT (rtpbin, "getting FEC sink pad %s", padname);
4434
4435   /* First try to find the decoder static pad that matches the padname */
4436   it = gst_element_iterate_sink_pads (session->early_fec_decoder);
4437   have_static_pad =
4438       gst_iterator_find_custom (it, (GCompareFunc) fec_sinkpad_find, &item,
4439       padname);
4440
4441   if (have_static_pad) {
4442     ret = g_value_get_object (&item);
4443     gst_object_ref (ret);
4444     g_value_unset (&item);
4445   } else {
4446     ret = gst_element_request_pad_simple (session->early_fec_decoder, padname);
4447   }
4448
4449   g_free (padname);
4450   gst_iterator_free (it);
4451
4452   if (ret == NULL)
4453     goto pad_failed;
4454
4455   session->recv_fec_sinks = g_slist_prepend (session->recv_fec_sinks, ret);
4456
4457   return ret;
4458
4459 pad_failed:
4460   {
4461     g_warning ("rtpbin: failed to get decoder fec pad");
4462     return NULL;
4463   }
4464 no_decoder:
4465   {
4466     g_warning ("rtpbin: failed to build FEC decoder for session %u",
4467         session->id);
4468     return NULL;
4469   }
4470 }
4471
4472 static GstPad *
4473 complete_session_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session,
4474     guint sessid)
4475 {
4476   GstElement *decoder;
4477   GstPad *sinkdpad;
4478   GstPad *decsink = NULL;
4479
4480   /* get recv_rtp pad and store */
4481   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
4482   session->recv_rtcp_sink =
4483       gst_element_request_pad_simple (session->session, "recv_rtcp_sink");
4484   if (session->recv_rtcp_sink == NULL)
4485     goto pad_failed;
4486
4487   GST_DEBUG_OBJECT (rtpbin, "getting RTCP decoder");
4488   decoder = session_request_element (session, SIGNAL_REQUEST_RTCP_DECODER);
4489   if (decoder) {
4490     GstPad *decsrc;
4491     GstPadLinkReturn ret;
4492
4493     GST_DEBUG_OBJECT (rtpbin, "linking RTCP decoder");
4494     decsink = gst_element_get_static_pad (decoder, "rtcp_sink");
4495     decsrc = gst_element_get_static_pad (decoder, "rtcp_src");
4496
4497     if (decsink == NULL)
4498       goto dec_sink_failed;
4499
4500     if (decsrc == NULL)
4501       goto dec_src_failed;
4502
4503     ret = gst_pad_link (decsrc, session->recv_rtcp_sink);
4504
4505     gst_object_unref (decsrc);
4506
4507     if (ret != GST_PAD_LINK_OK)
4508       goto dec_link_failed;
4509   } else {
4510     GST_DEBUG_OBJECT (rtpbin, "no RTCP decoder given");
4511     decsink = gst_object_ref (session->recv_rtcp_sink);
4512   }
4513
4514   /* get srcpad, link to SSRCDemux */
4515   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
4516   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
4517   if (session->sync_src == NULL)
4518     goto src_pad_failed;
4519
4520   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
4521   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
4522   gst_pad_link_full (session->sync_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
4523   gst_object_unref (sinkdpad);
4524
4525   return decsink;
4526
4527 pad_failed:
4528   {
4529     g_warning ("rtpbin: failed to get session rtcp_sink pad");
4530     return NULL;
4531   }
4532 dec_sink_failed:
4533   {
4534     g_warning ("rtpbin: failed to get decoder sink pad for session %u", sessid);
4535     return NULL;
4536   }
4537 dec_src_failed:
4538   {
4539     g_warning ("rtpbin: failed to get decoder src pad for session %u", sessid);
4540     goto cleanup;
4541   }
4542 dec_link_failed:
4543   {
4544     g_warning ("rtpbin: failed to link rtcp decoder for session %u", sessid);
4545     goto cleanup;
4546   }
4547 src_pad_failed:
4548   {
4549     g_warning ("rtpbin: failed to get session sync_src pad");
4550   }
4551
4552 cleanup:
4553   gst_object_unref (decsink);
4554   return NULL;
4555 }
4556
4557 /* Create a pad for receiving RTCP for the session in @name. Must be called with
4558  * RTP_BIN_LOCK.
4559  */
4560 static GstPad *
4561 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
4562     const gchar * name)
4563 {
4564   guint sessid;
4565   GstRtpBinSession *session;
4566   GstPad *decsink = NULL;
4567
4568   /* first get the session number */
4569   if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1)
4570     goto no_name;
4571
4572   GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
4573
4574   /* get or create the session */
4575   session = find_session_by_id (rtpbin, sessid);
4576   if (!session) {
4577     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
4578     /* create session now */
4579     session = create_session (rtpbin, sessid);
4580     if (session == NULL)
4581       goto create_error;
4582   }
4583
4584   /* check if pad was requested */
4585   if (session->recv_rtcp_sink_ghost != NULL)
4586     return session->recv_rtcp_sink_ghost;
4587
4588   decsink = complete_session_rtcp (rtpbin, session, sessid);
4589   if (!decsink)
4590     goto create_error;
4591
4592   session->recv_rtcp_sink_ghost =
4593       gst_ghost_pad_new_from_template (name, decsink, templ);
4594   gst_object_unref (decsink);
4595   gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE);
4596   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin),
4597       session->recv_rtcp_sink_ghost);
4598
4599   return session->recv_rtcp_sink_ghost;
4600
4601   /* ERRORS */
4602 no_name:
4603   {
4604     g_warning ("rtpbin: cannot find session id for pad: %s",
4605         GST_STR_NULL (name));
4606     return NULL;
4607   }
4608 create_error:
4609   {
4610     /* create_session already warned */
4611     return NULL;
4612   }
4613 }
4614
4615 static GstPad *
4616 create_recv_fec (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
4617 {
4618   guint sessid, fec_idx;
4619   GstRtpBinSession *session;
4620   GstPad *decsink = NULL;
4621   GstPad *ghost;
4622
4623   /* first get the session number */
4624   if (name == NULL
4625       || sscanf (name, "recv_fec_sink_%u_%u", &sessid, &fec_idx) != 2)
4626     goto no_name;
4627
4628   if (fec_idx > 1)
4629     goto invalid_idx;
4630
4631   GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
4632
4633   /* get or create the session */
4634   session = find_session_by_id (rtpbin, sessid);
4635   if (!session) {
4636     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
4637     /* create session now */
4638     session = create_session (rtpbin, sessid);
4639     if (session == NULL)
4640       goto create_error;
4641   }
4642
4643   decsink = complete_session_fec (rtpbin, session, fec_idx);
4644   if (!decsink)
4645     goto create_error;
4646
4647   ghost = gst_ghost_pad_new_from_template (name, decsink, templ);
4648   session->recv_fec_sink_ghosts =
4649       g_slist_prepend (session->recv_fec_sink_ghosts, ghost);
4650   gst_object_unref (decsink);
4651   gst_pad_set_active (ghost, TRUE);
4652   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), ghost);
4653
4654   return ghost;
4655
4656   /* ERRORS */
4657 no_name:
4658   {
4659     g_warning ("rtpbin: cannot find session id for pad: %s",
4660         GST_STR_NULL (name));
4661     return NULL;
4662   }
4663 invalid_idx:
4664   {
4665     g_warning ("rtpbin: invalid FEC index: %s", GST_STR_NULL (name));
4666     return NULL;
4667   }
4668 create_error:
4669   {
4670     /* create_session already warned */
4671     return NULL;
4672   }
4673 }
4674
4675 static void
4676 remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
4677 {
4678   if (session->recv_rtcp_sink_ghost) {
4679     gst_pad_set_active (session->recv_rtcp_sink_ghost, FALSE);
4680     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
4681         session->recv_rtcp_sink_ghost);
4682     session->recv_rtcp_sink_ghost = NULL;
4683   }
4684   if (session->sync_src) {
4685     /* releasing the request pad should also unref the sync pad */
4686     gst_object_unref (session->sync_src);
4687     session->sync_src = NULL;
4688   }
4689   if (session->recv_rtcp_sink) {
4690     gst_element_release_request_pad (session->session, session->recv_rtcp_sink);
4691     gst_object_unref (session->recv_rtcp_sink);
4692     session->recv_rtcp_sink = NULL;
4693   }
4694 }
4695
4696 static void
4697 remove_recv_fec_for_pad (GstRtpBin * rtpbin, GstRtpBinSession * session,
4698     GstPad * ghost)
4699 {
4700   GSList *item;
4701   GstPad *target;
4702
4703   target = gst_ghost_pad_get_target (GST_GHOST_PAD (ghost));
4704
4705   if (target) {
4706     item = g_slist_find (session->recv_fec_sinks, target);
4707     if (item) {
4708       GstPadTemplate *templ;
4709       GstPad *pad;
4710
4711       pad = item->data;
4712       templ = gst_pad_get_pad_template (pad);
4713
4714       if (GST_PAD_TEMPLATE_PRESENCE (templ) == GST_PAD_REQUEST) {
4715         GST_DEBUG_OBJECT (rtpbin,
4716             "Releasing FEC decoder pad %" GST_PTR_FORMAT, pad);
4717         gst_element_release_request_pad (session->early_fec_decoder, pad);
4718       } else {
4719         gst_object_unref (pad);
4720       }
4721
4722       session->recv_fec_sinks =
4723           g_slist_delete_link (session->recv_fec_sinks, item);
4724
4725       gst_object_unref (templ);
4726     }
4727     gst_object_unref (target);
4728   }
4729
4730   item = g_slist_find (session->recv_fec_sink_ghosts, ghost);
4731   if (item)
4732     session->recv_fec_sink_ghosts =
4733         g_slist_delete_link (session->recv_fec_sink_ghosts, item);
4734
4735   gst_pad_set_active (ghost, FALSE);
4736   gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), ghost);
4737 }
4738
4739 static void
4740 remove_recv_fec (GstRtpBin * rtpbin, GstRtpBinSession * session)
4741 {
4742   GSList *copy;
4743   GSList *tmp;
4744
4745   copy = g_slist_copy (session->recv_fec_sink_ghosts);
4746
4747   for (tmp = copy; tmp; tmp = tmp->next) {
4748     remove_recv_fec_for_pad (rtpbin, session, (GstPad *) tmp->data);
4749   }
4750
4751   g_slist_free (copy);
4752 }
4753
4754 static gboolean
4755 complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session)
4756 {
4757   gchar *gname;
4758   guint sessid = session->id;
4759   GstPad *send_rtp_src;
4760   GstElement *encoder;
4761   GstElementClass *klass;
4762   GstPadTemplate *templ;
4763   gboolean ret = FALSE;
4764
4765   /* get srcpad */
4766   send_rtp_src = gst_element_get_static_pad (session->session, "send_rtp_src");
4767
4768   if (send_rtp_src == NULL)
4769     goto no_srcpad;
4770
4771   GST_DEBUG_OBJECT (rtpbin, "getting RTP encoder");
4772   encoder = session_request_element (session, SIGNAL_REQUEST_RTP_ENCODER);
4773   if (encoder) {
4774     gchar *ename;
4775     GstPad *encsrc, *encsink;
4776     GstPadLinkReturn ret;
4777
4778     GST_DEBUG_OBJECT (rtpbin, "linking RTP encoder");
4779     ename = g_strdup_printf ("rtp_src_%u", sessid);
4780     encsrc = gst_element_get_static_pad (encoder, ename);
4781     g_free (ename);
4782
4783     if (encsrc == NULL)
4784       goto enc_src_failed;
4785
4786     ename = g_strdup_printf ("rtp_sink_%u", sessid);
4787     encsink = gst_element_get_static_pad (encoder, ename);
4788     g_free (ename);
4789     if (encsink == NULL)
4790       goto enc_sink_failed;
4791
4792     ret = gst_pad_link (send_rtp_src, encsink);
4793     gst_object_unref (encsink);
4794     gst_object_unref (send_rtp_src);
4795
4796     send_rtp_src = encsrc;
4797
4798     if (ret != GST_PAD_LINK_OK)
4799       goto enc_link_failed;
4800   } else {
4801     GST_DEBUG_OBJECT (rtpbin, "no RTP encoder given");
4802   }
4803
4804   /* ghost the new source pad */
4805   klass = GST_ELEMENT_GET_CLASS (rtpbin);
4806   gname = g_strdup_printf ("send_rtp_src_%u", sessid);
4807   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u");
4808   session->send_rtp_src_ghost =
4809       gst_ghost_pad_new_from_template (gname, send_rtp_src, templ);
4810   gst_pad_set_active (session->send_rtp_src_ghost, TRUE);
4811   gst_pad_sticky_events_foreach (send_rtp_src, copy_sticky_events,
4812       session->send_rtp_src_ghost);
4813   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost);
4814   g_free (gname);
4815
4816   ret = TRUE;
4817
4818 done:
4819   if (send_rtp_src)
4820     gst_object_unref (send_rtp_src);
4821
4822   return ret;
4823
4824   /* ERRORS */
4825 no_srcpad:
4826   {
4827     g_warning ("rtpbin: failed to get rtp source pad for session %u", sessid);
4828     goto done;
4829   }
4830 enc_src_failed:
4831   {
4832     g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT
4833         " src pad for session %u", encoder, sessid);
4834     goto done;
4835   }
4836 enc_sink_failed:
4837   {
4838     g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT
4839         " sink pad for session %u", encoder, sessid);
4840     goto done;
4841   }
4842 enc_link_failed:
4843   {
4844     g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u",
4845         encoder, sessid);
4846     goto done;
4847   }
4848 }
4849
4850 static gboolean
4851 setup_aux_sender_fold (const GValue * item, GValue * result, gpointer user_data)
4852 {
4853   GstPad *pad;
4854   gchar *name;
4855   guint sessid;
4856   GstRtpBinSession *session = user_data, *newsess;
4857   GstRtpBin *rtpbin = session->bin;
4858   GstPadLinkReturn ret;
4859
4860   pad = g_value_get_object (item);
4861   name = gst_pad_get_name (pad);
4862
4863   if (name == NULL || sscanf (name, "src_%u", &sessid) != 1)
4864     goto no_name;
4865
4866   g_free (name);
4867
4868   newsess = find_session_by_id (rtpbin, sessid);
4869   if (newsess == NULL) {
4870     /* create new session */
4871     newsess = create_session (rtpbin, sessid);
4872     if (newsess == NULL)
4873       goto create_error;
4874   } else if (newsess->send_rtp_sink != NULL)
4875     goto existing_session;
4876
4877   /* get send_rtp pad and store */
4878   newsess->send_rtp_sink =
4879       gst_element_request_pad_simple (newsess->session, "send_rtp_sink");
4880   if (newsess->send_rtp_sink == NULL)
4881     goto pad_failed;
4882
4883   ret = gst_pad_link (pad, newsess->send_rtp_sink);
4884   if (ret != GST_PAD_LINK_OK)
4885     goto aux_link_failed;
4886
4887   if (!complete_session_src (rtpbin, newsess))
4888     goto session_src_failed;
4889
4890   return TRUE;
4891
4892   /* ERRORS */
4893 no_name:
4894   {
4895     GST_WARNING ("ignoring invalid pad name %s", GST_STR_NULL (name));
4896     g_free (name);
4897     return TRUE;
4898   }
4899 create_error:
4900   {
4901     /* create_session already warned */
4902     return FALSE;
4903   }
4904 existing_session:
4905   {
4906     GST_DEBUG_OBJECT (rtpbin,
4907         "skipping src_%i setup, since it is already configured.", sessid);
4908     return TRUE;
4909   }
4910 pad_failed:
4911   {
4912     g_warning ("rtpbin: failed to get session pad for session %u", sessid);
4913     return FALSE;
4914   }
4915 aux_link_failed:
4916   {
4917     g_warning ("rtpbin: failed to link AUX for session %u", sessid);
4918     return FALSE;
4919   }
4920 session_src_failed:
4921   {
4922     g_warning ("rtpbin: failed to complete AUX for session %u", sessid);
4923     return FALSE;
4924   }
4925 }
4926
4927 static gboolean
4928 setup_aux_sender (GstRtpBin * rtpbin, GstRtpBinSession * session,
4929     GstElement * aux)
4930 {
4931   GstIterator *it;
4932   GValue result = { 0, };
4933   GstIteratorResult res;
4934
4935   it = gst_element_iterate_src_pads (aux);
4936   res = gst_iterator_fold (it, setup_aux_sender_fold, &result, session);
4937   gst_iterator_free (it);
4938
4939   return res == GST_ITERATOR_DONE;
4940 }
4941
4942 static void
4943 fec_encoder_add_pad_unlocked (GstPad * pad, GstRtpBinSession * session)
4944 {
4945   GstElementClass *klass;
4946   gchar *gname;
4947   GstPadTemplate *templ;
4948   guint fec_idx;
4949   GstPad *ghost;
4950
4951   if (sscanf (GST_PAD_NAME (pad), "fec_%u", &fec_idx) != 1) {
4952     GST_WARNING_OBJECT (session->bin,
4953         "FEC encoder added pad with name not matching fec_%%u (%s)",
4954         GST_PAD_NAME (pad));
4955     goto done;
4956   }
4957
4958   GST_INFO_OBJECT (session->bin, "FEC encoder for session %u exposed new pad",
4959       session->id);
4960
4961   klass = GST_ELEMENT_GET_CLASS (session->bin);
4962   gname = g_strdup_printf ("send_fec_src_%u_%u", session->id, fec_idx);
4963   templ = gst_element_class_get_pad_template (klass, "send_fec_src_%u_%u");
4964   ghost = gst_ghost_pad_new_from_template (gname, pad, templ);
4965   session->send_fec_src_ghosts =
4966       g_slist_prepend (session->send_fec_src_ghosts, ghost);
4967   gst_pad_set_active (ghost, TRUE);
4968   gst_pad_sticky_events_foreach (pad, copy_sticky_events, ghost);
4969   gst_element_add_pad (GST_ELEMENT (session->bin), ghost);
4970   g_free (gname);
4971
4972 done:
4973   return;
4974 }
4975
4976 static void
4977 fec_encoder_add_pad (GstPad * pad, GstRtpBinSession * session)
4978 {
4979   GST_RTP_BIN_LOCK (session->bin);
4980   fec_encoder_add_pad_unlocked (pad, session);
4981   GST_RTP_BIN_UNLOCK (session->bin);
4982 }
4983
4984 static gint
4985 fec_srcpad_iterator_filter (const GValue * item, GValue * unused)
4986 {
4987   guint fec_idx;
4988   GstPad *pad = g_value_get_object (item);
4989   GstPadTemplate *templ = gst_pad_get_pad_template (pad);
4990
4991   gint have_static_pad =
4992       (GST_PAD_TEMPLATE_PRESENCE (templ) == GST_PAD_ALWAYS) &&
4993       (sscanf (GST_PAD_NAME (pad), "fec_%u", &fec_idx) == 1);
4994
4995   gst_object_unref (templ);
4996
4997   /* return 0 to retain pad in filtered iterator */
4998   return !have_static_pad;
4999 }
5000
5001 static void
5002 fec_srcpad_iterator_foreach (const GValue * item, GstRtpBinSession * session)
5003 {
5004   GstPad *pad = g_value_get_object (item);
5005   fec_encoder_add_pad_unlocked (pad, session);
5006 }
5007
5008 static void
5009 fec_encoder_pad_added_cb (GstElement * encoder, GstPad * pad,
5010     GstRtpBinSession * session)
5011 {
5012   fec_encoder_add_pad (pad, session);
5013 }
5014
5015 static GstElement *
5016 request_fec_encoder (GstRtpBin * rtpbin, GstRtpBinSession * session,
5017     guint sessid)
5018 {
5019   GstElement *ret = NULL;
5020   const gchar *factory;
5021   gchar *sess_id_str;
5022
5023   sess_id_str = g_strdup_printf ("%u", sessid);
5024   factory = gst_structure_get_string (rtpbin->fec_encoders, sess_id_str);
5025   g_free (sess_id_str);
5026
5027   /* First try the property */
5028   if (factory) {
5029     GError *err = NULL;
5030
5031     ret =
5032         gst_parse_bin_from_description_full (factory, TRUE, NULL,
5033         GST_PARSE_FLAG_NO_SINGLE_ELEMENT_BINS | GST_PARSE_FLAG_FATAL_ERRORS,
5034         &err);
5035     if (!ret) {
5036       GST_ERROR_OBJECT (rtpbin, "Failed to build encoder from factory: %s",
5037           err->message);
5038       goto done;
5039     }
5040
5041     bin_manage_element (session->bin, ret);
5042     session->elements = g_slist_prepend (session->elements, ret);
5043     GST_INFO_OBJECT (rtpbin, "Built FEC encoder: %" GST_PTR_FORMAT
5044         " for session %u", ret, sessid);
5045   }
5046
5047   /* Fallback to the signal */
5048   if (!ret)
5049     ret = session_request_element (session, SIGNAL_REQUEST_FEC_ENCODER);
5050
5051   if (ret) {
5052     /* First, add encoder pads that match fec_% template and are already present */
5053     GstIterator *it, *filter;
5054     GstIteratorResult it_ret = GST_ITERATOR_OK;
5055
5056     it = gst_element_iterate_src_pads (ret);
5057     filter =
5058         gst_iterator_filter (it, (GCompareFunc) fec_srcpad_iterator_filter,
5059         NULL);
5060
5061     while (it_ret == GST_ITERATOR_OK || it_ret == GST_ITERATOR_RESYNC) {
5062       it_ret =
5063           gst_iterator_foreach (filter,
5064           (GstIteratorForeachFunction) fec_srcpad_iterator_foreach, session);
5065
5066       if (it_ret == GST_ITERATOR_RESYNC)
5067         gst_iterator_resync (filter);
5068     }
5069
5070     gst_iterator_free (filter);
5071
5072     /* Finally, connect to pad-added signal if any of the encoder pads are
5073      * added later */
5074     g_signal_connect (ret, "pad-added", G_CALLBACK (fec_encoder_pad_added_cb),
5075         session);
5076   }
5077
5078 done:
5079   return ret;
5080 }
5081
5082 /* Create a pad for sending RTP for the session in @name. Must be called with
5083  * RTP_BIN_LOCK.
5084  */
5085 static GstPad *
5086 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
5087 {
5088   gchar *pname;
5089   guint sessid;
5090   GstPad *send_rtp_sink;
5091   GstElement *aux;
5092   GstElement *encoder;
5093   GstElement *prev = NULL;
5094   GstRtpBinSession *session;
5095
5096   /* first get the session number */
5097   if (name == NULL || sscanf (name, "send_rtp_sink_%u", &sessid) != 1)
5098     goto no_name;
5099
5100   /* get or create session */
5101   session = find_session_by_id (rtpbin, sessid);
5102   if (!session) {
5103     /* create session now */
5104     session = create_session (rtpbin, sessid);
5105     if (session == NULL)
5106       goto create_error;
5107   }
5108
5109   /* check if pad was requested */
5110   if (session->send_rtp_sink_ghost != NULL)
5111     return session->send_rtp_sink_ghost;
5112
5113   /* check if we are already using this session as a sender */
5114   if (session->send_rtp_sink != NULL)
5115     goto existing_session;
5116
5117   encoder = request_fec_encoder (rtpbin, session, sessid);
5118
5119   if (encoder) {
5120     GST_DEBUG_OBJECT (rtpbin, "Linking FEC encoder");
5121
5122     send_rtp_sink = gst_element_get_static_pad (encoder, "sink");
5123
5124     if (!send_rtp_sink)
5125       goto enc_sink_failed;
5126
5127     prev = encoder;
5128   }
5129
5130   GST_DEBUG_OBJECT (rtpbin, "getting RTP AUX sender");
5131   aux = session_request_element (session, SIGNAL_REQUEST_AUX_SENDER);
5132   if (aux) {
5133     GstPad *sinkpad;
5134     GST_DEBUG_OBJECT (rtpbin, "linking AUX sender");
5135     if (!setup_aux_sender (rtpbin, session, aux))
5136       goto aux_session_failed;
5137
5138     pname = g_strdup_printf ("sink_%u", sessid);
5139     sinkpad = gst_element_get_static_pad (aux, pname);
5140     g_free (pname);
5141
5142     if (sinkpad == NULL)
5143       goto aux_sink_failed;
5144
5145     if (!prev) {
5146       send_rtp_sink = sinkpad;
5147     } else {
5148       GstPad *srcpad = gst_element_get_static_pad (prev, "src");
5149       GstPadLinkReturn ret;
5150
5151       ret = gst_pad_link (srcpad, sinkpad);
5152       gst_object_unref (srcpad);
5153       if (ret != GST_PAD_LINK_OK) {
5154         goto aux_link_failed;
5155       }
5156       gst_object_unref (sinkpad);
5157     }
5158     prev = aux;
5159   } else {
5160     /* get send_rtp pad and store */
5161     session->send_rtp_sink =
5162         gst_element_request_pad_simple (session->session, "send_rtp_sink");
5163     if (session->send_rtp_sink == NULL)
5164       goto pad_failed;
5165
5166     if (!complete_session_src (rtpbin, session))
5167       goto session_src_failed;
5168
5169     if (!prev) {
5170       send_rtp_sink = gst_object_ref (session->send_rtp_sink);
5171     } else {
5172       GstPad *srcpad = gst_element_get_static_pad (prev, "src");
5173       GstPadLinkReturn ret;
5174
5175       ret = gst_pad_link (srcpad, session->send_rtp_sink);
5176       gst_object_unref (srcpad);
5177       if (ret != GST_PAD_LINK_OK)
5178         goto session_link_failed;
5179     }
5180   }
5181
5182   session->send_rtp_sink_ghost =
5183       gst_ghost_pad_new_from_template (name, send_rtp_sink, templ);
5184   gst_object_unref (send_rtp_sink);
5185   gst_pad_set_active (session->send_rtp_sink_ghost, TRUE);
5186   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_sink_ghost);
5187
5188   return session->send_rtp_sink_ghost;
5189
5190   /* ERRORS */
5191 no_name:
5192   {
5193     g_warning ("rtpbin: cannot find session id for pad: %s",
5194         GST_STR_NULL (name));
5195     return NULL;
5196   }
5197 create_error:
5198   {
5199     /* create_session already warned */
5200     return NULL;
5201   }
5202 existing_session:
5203   {
5204     g_warning ("rtpbin: session %u is already in use", sessid);
5205     return NULL;
5206   }
5207 aux_session_failed:
5208   {
5209     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
5210     return NULL;
5211   }
5212 aux_sink_failed:
5213   {
5214     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
5215     return NULL;
5216   }
5217 aux_link_failed:
5218   {
5219     g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u",
5220         aux, sessid);
5221     return NULL;
5222   }
5223 pad_failed:
5224   {
5225     g_warning ("rtpbin: failed to get session pad for session %u", sessid);
5226     return NULL;
5227   }
5228 session_src_failed:
5229   {
5230     g_warning ("rtpbin: failed to setup source pads for session %u", sessid);
5231     return NULL;
5232   }
5233 session_link_failed:
5234   {
5235     g_warning ("rtpbin: failed to link %" GST_PTR_FORMAT " for session %u",
5236         session, sessid);
5237     return NULL;
5238   }
5239 enc_sink_failed:
5240   {
5241     g_warning ("rtpbin: failed to get %" GST_PTR_FORMAT
5242         " sink pad for session %u", encoder, sessid);
5243     return NULL;
5244   }
5245 }
5246
5247 static void
5248 remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
5249 {
5250   if (session->send_rtp_src_ghost) {
5251     gst_pad_set_active (session->send_rtp_src_ghost, FALSE);
5252     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
5253         session->send_rtp_src_ghost);
5254     session->send_rtp_src_ghost = NULL;
5255   }
5256   if (session->send_rtp_sink) {
5257     gst_element_release_request_pad (GST_ELEMENT_CAST (session->session),
5258         session->send_rtp_sink);
5259     gst_object_unref (session->send_rtp_sink);
5260     session->send_rtp_sink = NULL;
5261   }
5262   if (session->send_rtp_sink_ghost) {
5263     gst_pad_set_active (session->send_rtp_sink_ghost, FALSE);
5264     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
5265         session->send_rtp_sink_ghost);
5266     session->send_rtp_sink_ghost = NULL;
5267   }
5268 }
5269
5270 static void
5271 remove_send_fec (GstRtpBin * rtpbin, GstRtpBinSession * session)
5272 {
5273   GSList *tmp;
5274
5275   for (tmp = session->send_fec_src_ghosts; tmp; tmp = tmp->next) {
5276     GstPad *ghost = GST_PAD (tmp->data);
5277     gst_pad_set_active (ghost, FALSE);
5278     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), ghost);
5279   }
5280
5281   g_slist_free (session->send_fec_src_ghosts);
5282   session->send_fec_src_ghosts = NULL;
5283 }
5284
5285 /* Create a pad for sending RTCP for the session in @name. Must be called with
5286  * RTP_BIN_LOCK.
5287  */
5288 static GstPad *
5289 create_send_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
5290     const gchar * name)
5291 {
5292   guint sessid;
5293   GstPad *encsrc;
5294   GstElement *encoder;
5295   GstRtpBinSession *session;
5296
5297   /* first get the session number */
5298   if (name == NULL || sscanf (name, "send_rtcp_src_%u", &sessid) != 1)
5299     goto no_name;
5300
5301   /* get or create session */
5302   session = find_session_by_id (rtpbin, sessid);
5303   if (!session) {
5304     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
5305     /* create session now */
5306     session = create_session (rtpbin, sessid);
5307     if (session == NULL)
5308       goto create_error;
5309   }
5310
5311   /* check if pad was requested */
5312   if (session->send_rtcp_src_ghost != NULL)
5313     return session->send_rtcp_src_ghost;
5314
5315   /* get rtcp_src pad and store */
5316   session->send_rtcp_src =
5317       gst_element_request_pad_simple (session->session, "send_rtcp_src");
5318   if (session->send_rtcp_src == NULL)
5319     goto pad_failed;
5320
5321   GST_DEBUG_OBJECT (rtpbin, "getting RTCP encoder");
5322   encoder = session_request_element (session, SIGNAL_REQUEST_RTCP_ENCODER);
5323   if (encoder) {
5324     gchar *ename;
5325     GstPad *encsink;
5326     GstPadLinkReturn ret;
5327
5328     GST_DEBUG_OBJECT (rtpbin, "linking RTCP encoder");
5329
5330     ename = g_strdup_printf ("rtcp_src_%u", sessid);
5331     encsrc = gst_element_get_static_pad (encoder, ename);
5332     g_free (ename);
5333     if (encsrc == NULL)
5334       goto enc_src_failed;
5335
5336     ename = g_strdup_printf ("rtcp_sink_%u", sessid);
5337     encsink = gst_element_get_static_pad (encoder, ename);
5338     g_free (ename);
5339     if (encsink == NULL)
5340       goto enc_sink_failed;
5341
5342     ret = gst_pad_link (session->send_rtcp_src, encsink);
5343     gst_object_unref (encsink);
5344
5345     if (ret != GST_PAD_LINK_OK)
5346       goto enc_link_failed;
5347   } else {
5348     GST_DEBUG_OBJECT (rtpbin, "no RTCP encoder given");
5349     encsrc = gst_object_ref (session->send_rtcp_src);
5350   }
5351
5352   session->send_rtcp_src_ghost =
5353       gst_ghost_pad_new_from_template (name, encsrc, templ);
5354   gst_object_unref (encsrc);
5355   gst_pad_set_active (session->send_rtcp_src_ghost, TRUE);
5356   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtcp_src_ghost);
5357
5358   return session->send_rtcp_src_ghost;
5359
5360   /* ERRORS */
5361 no_name:
5362   {
5363     g_warning ("rtpbin: cannot find session id for pad: %s",
5364         GST_STR_NULL (name));
5365     return NULL;
5366   }
5367 create_error:
5368   {
5369     /* create_session already warned */
5370     return NULL;
5371   }
5372 pad_failed:
5373   {
5374     g_warning ("rtpbin: failed to get rtcp pad for session %u", sessid);
5375     return NULL;
5376   }
5377 enc_src_failed:
5378   {
5379     g_warning ("rtpbin: failed to get encoder src pad for session %u", sessid);
5380     return NULL;
5381   }
5382 enc_sink_failed:
5383   {
5384     g_warning ("rtpbin: failed to get encoder sink pad for session %u", sessid);
5385     gst_object_unref (encsrc);
5386     return NULL;
5387   }
5388 enc_link_failed:
5389   {
5390     g_warning ("rtpbin: failed to link rtcp encoder for session %u", sessid);
5391     gst_object_unref (encsrc);
5392     return NULL;
5393   }
5394 }
5395
5396 static void
5397 remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
5398 {
5399   if (session->send_rtcp_src_ghost) {
5400     gst_pad_set_active (session->send_rtcp_src_ghost, FALSE);
5401     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
5402         session->send_rtcp_src_ghost);
5403     session->send_rtcp_src_ghost = NULL;
5404   }
5405   if (session->send_rtcp_src) {
5406     gst_element_release_request_pad (session->session, session->send_rtcp_src);
5407     gst_object_unref (session->send_rtcp_src);
5408     session->send_rtcp_src = NULL;
5409   }
5410 }
5411
5412 /* If the requested name is NULL we should create a name with
5413  * the session number assuming we want the lowest possible session
5414  * with a free pad like the template */
5415 static gchar *
5416 gst_rtp_bin_get_free_pad_name (GstElement * element, GstPadTemplate * templ)
5417 {
5418   gboolean name_found = FALSE;
5419   gint session = 0;
5420   GstIterator *pad_it = NULL;
5421   gchar *pad_name = NULL;
5422   GValue data = { 0, };
5423
5424   GST_DEBUG_OBJECT (element, "find a free pad name for template");
5425   while (!name_found) {
5426     gboolean done = FALSE;
5427
5428     g_free (pad_name);
5429     pad_name = g_strdup_printf (templ->name_template, session++);
5430     pad_it = gst_element_iterate_pads (GST_ELEMENT (element));
5431     name_found = TRUE;
5432     while (!done) {
5433       switch (gst_iterator_next (pad_it, &data)) {
5434         case GST_ITERATOR_OK:
5435         {
5436           GstPad *pad;
5437           gchar *name;
5438
5439           pad = g_value_get_object (&data);
5440           name = gst_pad_get_name (pad);
5441
5442           if (strcmp (name, pad_name) == 0) {
5443             done = TRUE;
5444             name_found = FALSE;
5445           }
5446           g_free (name);
5447           g_value_reset (&data);
5448           break;
5449         }
5450         case GST_ITERATOR_ERROR:
5451         case GST_ITERATOR_RESYNC:
5452           /* restart iteration */
5453           done = TRUE;
5454           name_found = FALSE;
5455           session = 0;
5456           break;
5457         case GST_ITERATOR_DONE:
5458           done = TRUE;
5459           break;
5460       }
5461     }
5462     g_value_unset (&data);
5463     gst_iterator_free (pad_it);
5464   }
5465
5466   GST_DEBUG_OBJECT (element, "free pad name found: '%s'", pad_name);
5467   return pad_name;
5468 }
5469
5470 /*
5471  */
5472 static GstPad *
5473 gst_rtp_bin_request_new_pad (GstElement * element,
5474     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
5475 {
5476   GstRtpBin *rtpbin;
5477   GstElementClass *klass;
5478   GstPad *result;
5479
5480   gchar *pad_name = NULL;
5481
5482   g_return_val_if_fail (templ != NULL, NULL);
5483   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
5484
5485   rtpbin = GST_RTP_BIN (element);
5486   klass = GST_ELEMENT_GET_CLASS (element);
5487
5488   GST_RTP_BIN_LOCK (rtpbin);
5489
5490   if (name == NULL) {
5491     /* use a free pad name */
5492     pad_name = gst_rtp_bin_get_free_pad_name (element, templ);
5493   } else {
5494     /* use the provided name */
5495     pad_name = g_strdup (name);
5496   }
5497
5498   GST_DEBUG_OBJECT (rtpbin, "Trying to request a pad with name %s", pad_name);
5499
5500   /* figure out the template */
5501   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
5502     result = create_recv_rtp (rtpbin, templ, pad_name);
5503   } else if (templ == gst_element_class_get_pad_template (klass,
5504           "recv_rtcp_sink_%u")) {
5505     result = create_recv_rtcp (rtpbin, templ, pad_name);
5506   } else if (templ == gst_element_class_get_pad_template (klass,
5507           "send_rtp_sink_%u")) {
5508     result = create_send_rtp (rtpbin, templ, pad_name);
5509   } else if (templ == gst_element_class_get_pad_template (klass,
5510           "send_rtcp_src_%u")) {
5511     result = create_send_rtcp (rtpbin, templ, pad_name);
5512   } else if (templ == gst_element_class_get_pad_template (klass,
5513           "recv_fec_sink_%u_%u")) {
5514     result = create_recv_fec (rtpbin, templ, pad_name);
5515   } else
5516     goto wrong_template;
5517
5518   g_free (pad_name);
5519   GST_RTP_BIN_UNLOCK (rtpbin);
5520
5521   return result;
5522
5523   /* ERRORS */
5524 wrong_template:
5525   {
5526     g_free (pad_name);
5527     GST_RTP_BIN_UNLOCK (rtpbin);
5528     g_warning ("rtpbin: this is not our template");
5529     return NULL;
5530   }
5531 }
5532
5533 static void
5534 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
5535 {
5536   GstRtpBinSession *session;
5537   GstRtpBin *rtpbin;
5538
5539   g_return_if_fail (GST_IS_GHOST_PAD (pad));
5540   g_return_if_fail (GST_IS_RTP_BIN (element));
5541
5542   rtpbin = GST_RTP_BIN (element);
5543
5544   GST_RTP_BIN_LOCK (rtpbin);
5545   GST_DEBUG_OBJECT (rtpbin, "Trying to release pad %s:%s",
5546       GST_DEBUG_PAD_NAME (pad));
5547
5548   if (!(session = find_session_by_pad (rtpbin, pad)))
5549     goto unknown_pad;
5550
5551   if (session->recv_rtp_sink_ghost == pad) {
5552     remove_recv_rtp (rtpbin, session);
5553   } else if (session->recv_rtcp_sink_ghost == pad) {
5554     remove_recv_rtcp (rtpbin, session);
5555   } else if (session->send_rtp_sink_ghost == pad) {
5556     remove_send_rtp (rtpbin, session);
5557   } else if (session->send_rtcp_src_ghost == pad) {
5558     remove_rtcp (rtpbin, session);
5559   } else if (pad_is_recv_fec (session, pad)) {
5560     remove_recv_fec_for_pad (rtpbin, session, pad);
5561   }
5562
5563   /* no more request pads, free the complete session */
5564   if (session->recv_rtp_sink_ghost == NULL
5565       && session->recv_rtcp_sink_ghost == NULL
5566       && session->send_rtp_sink_ghost == NULL
5567       && session->send_rtcp_src_ghost == NULL
5568       && session->recv_fec_sink_ghosts == NULL) {
5569     GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session);
5570     rtpbin->sessions = g_slist_remove (rtpbin->sessions, session);
5571     free_session (session, rtpbin);
5572   }
5573   GST_RTP_BIN_UNLOCK (rtpbin);
5574
5575   return;
5576
5577   /* ERROR */
5578 unknown_pad:
5579   {
5580     GST_RTP_BIN_UNLOCK (rtpbin);
5581     g_warning ("rtpbin: %s:%s is not one of our request pads",
5582         GST_DEBUG_PAD_NAME (pad));
5583     return;
5584   }
5585 }