rtpbin: receive bundle support
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-rtpbin
22  * @see_also: rtpjitterbuffer, rtpsession, rtpptdemux, rtpssrcdemux
23  *
24  * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
25  * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
26  * RTP sessions that will be synchronized together using RTCP SR packets.
27  *
28  * #GstRtpBin is configured with a number of request pads that define the
29  * functionality that is activated, similar to the #GstRtpSession element.
30  *
31  * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_\%u pad. The session
32  * number must be specified in the pad name.
33  * Data received on the recv_rtp_sink_\%u pad will be processed in the #GstRtpSession
34  * manager and after being validated forwarded on #GstRtpSsrcDemux element. Each
35  * RTP stream is demuxed based on the SSRC and send to a #GstRtpJitterBuffer. After
36  * the packets are released from the jitterbuffer, they will be forwarded to a
37  * #GstRtpPtDemux element. The #GstRtpPtDemux element will demux the packets based
38  * on the payload type and will create a unique pad recv_rtp_src_\%u_\%u_\%u on
39  * rtpbin with the session number, SSRC and payload type respectively as the pad
40  * name.
41  *
42  * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_\%u pad. The
43  * session number must be specified in the pad name.
44  *
45  * If you want the session manager to generate and send RTCP packets, request
46  * the send_rtcp_src_\%u pad with the session number in the pad name. Packet pushed
47  * on this pad contain SR/RR RTCP reports that should be sent to all participants
48  * in the session.
49  *
50  * To use #GstRtpBin as a sender, request a send_rtp_sink_\%u pad, which will
51  * automatically create a send_rtp_src_\%u pad. If the session number is not provided,
52  * the pad from the lowest available session will be returned. The session manager will modify the
53  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
54  * send_rtp_src_\%u pad after updating its internal state.
55  *
56  * #GstRtpBin can also demultiplex incoming bundled streams. The first
57  * #GstRtpSession will have a #GstRtpSsrcDemux element splitting the streams
58  * based on their SSRC and potentially dispatched to a different #GstRtpSession.
59  * Because retransmission SSRCs need to be merged with the corresponding media
60  * stream the #GstRtpBin::on-bundled-ssrc signal is emitted so that the
61  * application can find out to which session the SSRC belongs.
62  *
63  * The session manager needs the clock-rate of the payload types it is handling
64  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
65  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
66  * signal.
67  *
68  * Access to the internal statistics of rtpbin is provided with the
69  * get-internal-session property. This action signal gives access to the
70  * RTPSession object which further provides action signals to retrieve the
71  * internal source and other sources.
72  *
73  * #GstRtpBin also has signals (#GstRtpBin::request-rtp-encoder,
74  * #GstRtpBin::request-rtp-decoder, #GstRtpBin::request-rtcp-encoder and
75  * #GstRtpBin::request-rtp-decoder) to dynamically request for RTP and RTCP encoders
76  * and decoders in order to support SRTP. The encoders must provide the pads
77  * rtp_sink_\%u and rtp_src_\%u for RTP and rtcp_sink_\%u and rtcp_src_\%u for
78  * RTCP. The session number will be used in the pad name. The decoders must provide
79  * rtp_sink and rtp_src for RTP and rtcp_sink and rtcp_src for RTCP. The decoders will
80  * be placed before the #GstRtpSession element, thus they must support SSRC demuxing
81  * internally.
82  *
83  * #GstRtpBin has signals (#GstRtpBin::request-aux-sender and
84  * #GstRtpBin::request-aux-receiver to dynamically request an element that can be
85  * used to create or merge additional RTP streams. AUX elements are needed to
86  * implement FEC or retransmission (such as RFC 4588). An AUX sender must have one
87  * sink_\%u pad that matches the sessionid in the signal and it should have 1 or
88  * more src_\%u pads. For each src_%\u pad, a session will be made (if needed)
89  * and the pad will be linked to the session send_rtp_sink pad. Each session will
90  * then expose its source pad as send_rtp_src_\%u on #GstRtpBin.
91  * An AUX receiver has 1 src_\%u pad that much match the sessionid in the signal
92  * and 1 or more sink_\%u pads. A session will be made for each sink_\%u pad
93  * when the corresponding recv_rtp_sink_\%u pad is requested on #GstRtpBin.
94  *
95  * <refsect2>
96  * <title>Example pipelines</title>
97  * |[
98  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
99  *     rtpbin ! rtptheoradepay ! theoradec ! xvimagesink
100  * ]| Receive RTP data from port 5000 and send to the session 0 in rtpbin.
101  * |[
102  * gst-launch-1.0 rtpbin name=rtpbin \
103  *         v4l2src ! videoconvert ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
104  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
105  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
106  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
107  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
108  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
109  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
110  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
111  * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
112  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
113  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
114  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
115  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
116  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
117  * is received on port 5007. Since RTCP packets from the sender should be sent
118  * as soon as possible and do not participate in preroll, sync=false and
119  * async=false is configured on udpsink
120  * |[
121  * gst-launch-1.0 -v rtpbin name=rtpbin                                          \
122  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
123  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
124  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
125  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
126  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
127  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
128  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
129  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
130  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
131  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
132  * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
133  * decode and display the video.
134  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
135  * decode and play the audio.
136  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
137  * session 1 on port 5003. These packets will be used for session management and
138  * synchronisation.
139  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
140  * on port 5007.
141  * </refsect2>
142  */
143
144 #ifdef HAVE_CONFIG_H
145 #include "config.h"
146 #endif
147 #include <stdio.h>
148 #include <string.h>
149
150 #include <gst/rtp/gstrtpbuffer.h>
151 #include <gst/rtp/gstrtcpbuffer.h>
152
153 #include "gstrtpbin.h"
154 #include "rtpsession.h"
155 #include "gstrtpsession.h"
156 #include "gstrtpjitterbuffer.h"
157
158 #include <gst/glib-compat-private.h>
159
160 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
161 #define GST_CAT_DEFAULT gst_rtp_bin_debug
162
163 /* sink pads */
164 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
165     GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
166     GST_PAD_SINK,
167     GST_PAD_REQUEST,
168     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
169     );
170
171 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
172     GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
173     GST_PAD_SINK,
174     GST_PAD_REQUEST,
175     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
176     );
177
178 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%u",
180     GST_PAD_SINK,
181     GST_PAD_REQUEST,
182     GST_STATIC_CAPS ("application/x-rtp")
183     );
184
185 /* src pads */
186 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
187 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
188     GST_PAD_SRC,
189     GST_PAD_SOMETIMES,
190     GST_STATIC_CAPS ("application/x-rtp")
191     );
192
193 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
194     GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%u",
195     GST_PAD_SRC,
196     GST_PAD_REQUEST,
197     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
198     );
199
200 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
201     GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%u",
202     GST_PAD_SRC,
203     GST_PAD_SOMETIMES,
204     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
205     );
206
207 #define GST_RTP_BIN_GET_PRIVATE(obj)  \
208    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
209
210 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock (&(bin)->priv->bin_lock)
211 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock (&(bin)->priv->bin_lock)
212
213 /* lock to protect dynamic callbacks, like pad-added and new ssrc. */
214 #define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock (&(bin)->priv->dyn_lock)
215 #define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock (&(bin)->priv->dyn_lock)
216
217 /* lock for shutdown */
218 #define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
219 G_STMT_START {                                   \
220   if (g_atomic_int_get (&bin->priv->shutdown))   \
221     goto label;                                  \
222   GST_RTP_BIN_DYN_LOCK (bin);                    \
223   if (g_atomic_int_get (&bin->priv->shutdown)) { \
224     GST_RTP_BIN_DYN_UNLOCK (bin);                \
225     goto label;                                  \
226   }                                              \
227 } G_STMT_END
228
229 /* unlock for shutdown */
230 #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
231   GST_RTP_BIN_DYN_UNLOCK (bin);                  \
232
233 struct _GstRtpBinPrivate
234 {
235   GMutex bin_lock;
236
237   /* lock protecting dynamic adding/removing */
238   GMutex dyn_lock;
239
240   /* if we are shutting down or not */
241   gint shutdown;
242
243   gboolean autoremove;
244
245   /* NTP time in ns of last SR sync used */
246   guint64 last_ntpnstime;
247
248   /* list of extra elements */
249   GList *elements;
250 };
251
252 /* signals and args */
253 enum
254 {
255   SIGNAL_REQUEST_PT_MAP,
256   SIGNAL_PAYLOAD_TYPE_CHANGE,
257   SIGNAL_CLEAR_PT_MAP,
258   SIGNAL_RESET_SYNC,
259   SIGNAL_GET_SESSION,
260   SIGNAL_GET_INTERNAL_SESSION,
261
262   SIGNAL_ON_NEW_SSRC,
263   SIGNAL_ON_SSRC_COLLISION,
264   SIGNAL_ON_SSRC_VALIDATED,
265   SIGNAL_ON_SSRC_ACTIVE,
266   SIGNAL_ON_SSRC_SDES,
267   SIGNAL_ON_BYE_SSRC,
268   SIGNAL_ON_BYE_TIMEOUT,
269   SIGNAL_ON_TIMEOUT,
270   SIGNAL_ON_SENDER_TIMEOUT,
271   SIGNAL_ON_NPT_STOP,
272
273   SIGNAL_REQUEST_RTP_ENCODER,
274   SIGNAL_REQUEST_RTP_DECODER,
275   SIGNAL_REQUEST_RTCP_ENCODER,
276   SIGNAL_REQUEST_RTCP_DECODER,
277
278   SIGNAL_NEW_JITTERBUFFER,
279
280   SIGNAL_REQUEST_AUX_SENDER,
281   SIGNAL_REQUEST_AUX_RECEIVER,
282
283   SIGNAL_ON_NEW_SENDER_SSRC,
284   SIGNAL_ON_SENDER_SSRC_ACTIVE,
285
286   SIGNAL_ON_BUNDLED_SSRC,
287
288   LAST_SIGNAL
289 };
290
291 #define DEFAULT_LATENCY_MS           200
292 #define DEFAULT_DROP_ON_LATENCY      FALSE
293 #define DEFAULT_SDES                 NULL
294 #define DEFAULT_DO_LOST              FALSE
295 #define DEFAULT_IGNORE_PT            FALSE
296 #define DEFAULT_NTP_SYNC             FALSE
297 #define DEFAULT_AUTOREMOVE           FALSE
298 #define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
299 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
300 #define DEFAULT_RTCP_SYNC            GST_RTP_BIN_RTCP_SYNC_ALWAYS
301 #define DEFAULT_RTCP_SYNC_INTERVAL   0
302 #define DEFAULT_DO_SYNC_EVENT        FALSE
303 #define DEFAULT_DO_RETRANSMISSION    FALSE
304 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
305 #define DEFAULT_NTP_TIME_SOURCE      GST_RTP_NTP_TIME_SOURCE_NTP
306 #define DEFAULT_RTCP_SYNC_SEND_TIME  TRUE
307 #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
308 #define DEFAULT_MAX_DROPOUT_TIME     60000
309 #define DEFAULT_MAX_MISORDER_TIME    2000
310 #define DEFAULT_RFC7273_SYNC         FALSE
311 #define DEFAULT_MAX_STREAMS          G_MAXUINT
312
313 enum
314 {
315   PROP_0,
316   PROP_LATENCY,
317   PROP_DROP_ON_LATENCY,
318   PROP_SDES,
319   PROP_DO_LOST,
320   PROP_IGNORE_PT,
321   PROP_NTP_SYNC,
322   PROP_RTCP_SYNC,
323   PROP_RTCP_SYNC_INTERVAL,
324   PROP_AUTOREMOVE,
325   PROP_BUFFER_MODE,
326   PROP_USE_PIPELINE_CLOCK,
327   PROP_DO_SYNC_EVENT,
328   PROP_DO_RETRANSMISSION,
329   PROP_RTP_PROFILE,
330   PROP_NTP_TIME_SOURCE,
331   PROP_RTCP_SYNC_SEND_TIME,
332   PROP_MAX_RTCP_RTP_TIME_DIFF,
333   PROP_MAX_DROPOUT_TIME,
334   PROP_MAX_MISORDER_TIME,
335   PROP_RFC7273_SYNC,
336   PROP_MAX_STREAMS
337 };
338
339 #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
340 static GType
341 gst_rtp_bin_rtcp_sync_get_type (void)
342 {
343   static GType rtcp_sync_type = 0;
344   static const GEnumValue rtcp_sync_types[] = {
345     {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
346     {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
347     {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
348     {0, NULL, NULL},
349   };
350
351   if (!rtcp_sync_type) {
352     rtcp_sync_type = g_enum_register_static ("GstRTCPSync", rtcp_sync_types);
353   }
354   return rtcp_sync_type;
355 }
356
357 /* helper objects */
358 typedef struct _GstRtpBinSession GstRtpBinSession;
359 typedef struct _GstRtpBinStream GstRtpBinStream;
360 typedef struct _GstRtpBinClient GstRtpBinClient;
361
362 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
363
364 static GstCaps *pt_map_requested (GstElement * element, guint pt,
365     GstRtpBinSession * session);
366 static void payload_type_change (GstElement * element, guint pt,
367     GstRtpBinSession * session);
368 static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
369 static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
370 static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
371 static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
372 static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
373 static void free_stream (GstRtpBinStream * stream, GstRtpBin * bin);
374 static GstRtpBinSession *create_session (GstRtpBin * rtpbin, gint id);
375 static GstPad *complete_session_sink (GstRtpBin * rtpbin,
376     GstRtpBinSession * session, gboolean bundle_demuxer_needed);
377 static void
378 complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session,
379     guint sessid);
380 static GstPad *complete_session_rtcp (GstRtpBin * rtpbin,
381     GstRtpBinSession * session, guint sessid, gboolean bundle_demuxer_needed);
382
383 /* Manages the RTP stream for one SSRC.
384  *
385  * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer.
386  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
387  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
388  * together (see below).
389  */
390 struct _GstRtpBinStream
391 {
392   /* the SSRC of this stream */
393   guint32 ssrc;
394
395   /* parent bin */
396   GstRtpBin *bin;
397
398   /* the session this SSRC belongs to */
399   GstRtpBinSession *session;
400
401   /* the jitterbuffer of the SSRC */
402   GstElement *buffer;
403   gulong buffer_handlesync_sig;
404   gulong buffer_ptreq_sig;
405   gulong buffer_ntpstop_sig;
406   gint percent;
407
408   /* the PT demuxer of the SSRC */
409   GstElement *demux;
410   gulong demux_newpad_sig;
411   gulong demux_padremoved_sig;
412   gulong demux_ptreq_sig;
413   gulong demux_ptchange_sig;
414
415   /* if we have calculated a valid rt_delta for this stream */
416   gboolean have_sync;
417   /* mapping to local RTP and NTP time */
418   gint64 rt_delta;
419   gint64 rtp_delta;
420   /* base rtptime in gst time */
421   gint64 clock_base;
422 };
423
424 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->lock)
425 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->lock)
426
427 /* Manages the receiving end of the packets.
428  *
429  * There is one such structure for each RTP session (audio/video/...).
430  * We get the RTP/RTCP packets and stuff them into the session manager. From
431  * there they are pushed into an SSRC demuxer that splits the stream based on
432  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
433  * the GstRtpBinStream above).
434  */
435 struct _GstRtpBinSession
436 {
437   /* session id */
438   gint id;
439   /* the parent bin */
440   GstRtpBin *bin;
441   /* the session element */
442   GstElement *session;
443   /* the SSRC demuxer */
444   GstElement *demux;
445   gulong demux_newpad_sig;
446   gulong demux_padremoved_sig;
447
448   /* Bundling support */
449   GstElement *rtp_funnel;
450   GstElement *rtcp_funnel;
451   GstElement *bundle_demux;
452   gulong bundle_demux_newpad_sig;
453
454   GMutex lock;
455
456   /* list of GstRtpBinStream */
457   GSList *streams;
458
459   /* list of elements */
460   GSList *elements;
461
462   /* mapping of payload type to caps */
463   GHashTable *ptmap;
464
465   /* the pads of the session */
466   GstPad *recv_rtp_sink;
467   GstPad *recv_rtp_sink_ghost;
468   GstPad *recv_rtp_src;
469   GstPad *recv_rtcp_sink;
470   GstPad *recv_rtcp_sink_ghost;
471   GstPad *sync_src;
472   GstPad *send_rtp_sink;
473   GstPad *send_rtp_sink_ghost;
474   GstPad *send_rtp_src;
475   GstPad *send_rtp_src_ghost;
476   GstPad *send_rtcp_src;
477   GstPad *send_rtcp_src_ghost;
478 };
479
480 /* Manages the RTP streams that come from one client and should therefore be
481  * synchronized.
482  */
483 struct _GstRtpBinClient
484 {
485   /* the common CNAME for the streams */
486   gchar *cname;
487   guint cname_len;
488
489   /* the streams */
490   guint nstreams;
491   GSList *streams;
492 };
493
494 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
495 static GstRtpBinSession *
496 find_session_by_id (GstRtpBin * rtpbin, gint id)
497 {
498   GSList *walk;
499
500   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
501     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
502
503     if (sess->id == id)
504       return sess;
505   }
506   return NULL;
507 }
508
509 /* find a session with the given request pad. Must be called with RTP_BIN_LOCK */
510 static GstRtpBinSession *
511 find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
512 {
513   GSList *walk;
514
515   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
516     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
517
518     if ((sess->recv_rtp_sink_ghost == pad) ||
519         (sess->recv_rtcp_sink_ghost == pad) ||
520         (sess->send_rtp_sink_ghost == pad)
521         || (sess->send_rtcp_src_ghost == pad))
522       return sess;
523   }
524   return NULL;
525 }
526
527 static void
528 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
529 {
530   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
531       sess->id, ssrc);
532 }
533
534 static void
535 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
536 {
537   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
538       sess->id, ssrc);
539 }
540
541 static void
542 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
543 {
544   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
545       sess->id, ssrc);
546 }
547
548 static void
549 on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
550 {
551   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
552       sess->id, ssrc);
553 }
554
555 static void
556 on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
557 {
558   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
559       sess->id, ssrc);
560 }
561
562 static void
563 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
564 {
565   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
566       sess->id, ssrc);
567 }
568
569 static void
570 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
571 {
572   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
573       sess->id, ssrc);
574
575   if (sess->bin->priv->autoremove)
576     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
577 }
578
579 static void
580 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
581 {
582   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
583       sess->id, ssrc);
584
585   if (sess->bin->priv->autoremove)
586     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
587 }
588
589 static void
590 on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
591 {
592   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
593       sess->id, ssrc);
594 }
595
596 static void
597 on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
598 {
599   g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0,
600       stream->session->id, stream->ssrc);
601 }
602
603 static void
604 on_new_sender_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
605 {
606   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
607       sess->id, ssrc);
608 }
609
610 static void
611 on_sender_ssrc_active (GstElement * session, guint32 ssrc,
612     GstRtpBinSession * sess)
613 {
614   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE],
615       0, sess->id, ssrc);
616 }
617
618 /* must be called with the SESSION lock */
619 static GstRtpBinStream *
620 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
621 {
622   GSList *walk;
623
624   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
625     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
626
627     if (stream->ssrc == ssrc)
628       return stream;
629   }
630   return NULL;
631 }
632
633 static void
634 ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad,
635     GstRtpBinSession * session)
636 {
637   GstRtpBinStream *stream = NULL;
638   GstRtpBin *rtpbin;
639
640   rtpbin = session->bin;
641
642   GST_RTP_BIN_LOCK (rtpbin);
643
644   GST_RTP_SESSION_LOCK (session);
645   if ((stream = find_stream_by_ssrc (session, ssrc)))
646     session->streams = g_slist_remove (session->streams, stream);
647   GST_RTP_SESSION_UNLOCK (session);
648
649   if (stream)
650     free_stream (stream, rtpbin);
651
652   GST_RTP_BIN_UNLOCK (rtpbin);
653 }
654
655 static void
656 new_bundled_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
657     GstRtpBinSession * session)
658 {
659   GValue result = G_VALUE_INIT;
660   GValue params[2] = { G_VALUE_INIT, G_VALUE_INIT };
661   guint session_id = 0;
662   GstRtpBinSession *target_session = NULL;
663   GstRtpBin *rtpbin = session->bin;
664   gchar *name;
665   GstPad *src_pad;
666   GstPad *recv_rtp_sink = NULL;
667   GstPad *recv_rtcp_sink = NULL;
668   GstPadLinkReturn ret;
669
670   GST_RTP_BIN_DYN_LOCK (rtpbin);
671   GST_DEBUG_OBJECT (rtpbin, "new bundled SSRC pad %08x, %s:%s", ssrc,
672       GST_DEBUG_PAD_NAME (pad));
673
674   g_value_init (&result, G_TYPE_UINT);
675   g_value_init (&params[0], GST_TYPE_ELEMENT);
676   g_value_set_object (&params[0], rtpbin);
677   g_value_init (&params[1], G_TYPE_UINT);
678   g_value_set_uint (&params[1], ssrc);
679
680   g_signal_emitv (params,
681       gst_rtp_bin_signals[SIGNAL_ON_BUNDLED_SSRC], 0, &result);
682   g_value_unset (&params[0]);
683
684   session_id = g_value_get_uint (&result);
685   if (session_id == 0) {
686     target_session = session;
687   } else {
688     target_session = find_session_by_id (rtpbin, (gint) session_id);
689     if (!target_session) {
690       target_session = create_session (rtpbin, session_id);
691     }
692     if (!target_session->recv_rtp_sink) {
693       recv_rtp_sink = complete_session_sink (rtpbin, target_session, FALSE);
694     }
695
696     if (!target_session->recv_rtp_src)
697       complete_session_receiver (rtpbin, target_session, session_id);
698
699     if (!target_session->recv_rtcp_sink) {
700       recv_rtcp_sink =
701           complete_session_rtcp (rtpbin, target_session, session_id, FALSE);
702     }
703   }
704
705   GST_DEBUG_OBJECT (rtpbin, "Assigning bundled ssrc %u to session %u", ssrc,
706       session_id);
707
708   if (!recv_rtp_sink) {
709     recv_rtp_sink =
710         gst_element_get_request_pad (target_session->rtp_funnel, "sink_%u");
711   }
712
713   if (!recv_rtcp_sink) {
714     recv_rtcp_sink =
715         gst_element_get_request_pad (target_session->rtcp_funnel, "sink_%u");
716   }
717
718   name = g_strdup_printf ("src_%u", ssrc);
719   src_pad = gst_element_get_static_pad (element, name);
720   ret = gst_pad_link (src_pad, recv_rtp_sink);
721   g_free (name);
722   gst_object_unref (src_pad);
723   gst_object_unref (recv_rtp_sink);
724   if (ret != GST_PAD_LINK_OK) {
725     g_warning
726         ("rtpbin: failed to link bundle demuxer to receive rtp funnel for session %u",
727         session_id);
728   }
729
730   name = g_strdup_printf ("rtcp_src_%u", ssrc);
731   src_pad = gst_element_get_static_pad (element, name);
732   gst_pad_link (src_pad, recv_rtcp_sink);
733   g_free (name);
734   gst_object_unref (src_pad);
735   gst_object_unref (recv_rtcp_sink);
736   if (ret != GST_PAD_LINK_OK) {
737     g_warning
738         ("rtpbin: failed to link bundle demuxer to receive rtcp sink pad for session %u",
739         session_id);
740   }
741
742   GST_RTP_BIN_DYN_UNLOCK (rtpbin);
743 }
744
745 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
746 static GstRtpBinSession *
747 create_session (GstRtpBin * rtpbin, gint id)
748 {
749   GstRtpBinSession *sess;
750   GstElement *session, *demux;
751   GstState target;
752
753   if (!(session = gst_element_factory_make ("rtpsession", NULL)))
754     goto no_session;
755
756   if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
757     goto no_demux;
758
759   sess = g_new0 (GstRtpBinSession, 1);
760   g_mutex_init (&sess->lock);
761   sess->id = id;
762   sess->bin = rtpbin;
763   sess->session = session;
764   sess->demux = demux;
765
766   sess->rtp_funnel = gst_element_factory_make ("funnel", NULL);
767   sess->rtcp_funnel = gst_element_factory_make ("funnel", NULL);
768
769   sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
770       (GDestroyNotify) gst_caps_unref);
771   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
772
773   /* configure SDES items */
774   GST_OBJECT_LOCK (rtpbin);
775   g_object_set (session, "sdes", rtpbin->sdes, "rtp-profile",
776       rtpbin->rtp_profile, "rtcp-sync-send-time", rtpbin->rtcp_sync_send_time,
777       NULL);
778   if (rtpbin->use_pipeline_clock)
779     g_object_set (session, "use-pipeline-clock", rtpbin->use_pipeline_clock,
780         NULL);
781   else
782     g_object_set (session, "ntp-time-source", rtpbin->ntp_time_source, NULL);
783
784   g_object_set (session, "max-dropout-time", rtpbin->max_dropout_time,
785       "max-misorder-time", rtpbin->max_misorder_time, NULL);
786   GST_OBJECT_UNLOCK (rtpbin);
787
788   /* provide clock_rate to the session manager when needed */
789   g_signal_connect (session, "request-pt-map",
790       (GCallback) pt_map_requested, sess);
791
792   g_signal_connect (sess->session, "on-new-ssrc",
793       (GCallback) on_new_ssrc, sess);
794   g_signal_connect (sess->session, "on-ssrc-collision",
795       (GCallback) on_ssrc_collision, sess);
796   g_signal_connect (sess->session, "on-ssrc-validated",
797       (GCallback) on_ssrc_validated, sess);
798   g_signal_connect (sess->session, "on-ssrc-active",
799       (GCallback) on_ssrc_active, sess);
800   g_signal_connect (sess->session, "on-ssrc-sdes",
801       (GCallback) on_ssrc_sdes, sess);
802   g_signal_connect (sess->session, "on-bye-ssrc",
803       (GCallback) on_bye_ssrc, sess);
804   g_signal_connect (sess->session, "on-bye-timeout",
805       (GCallback) on_bye_timeout, sess);
806   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
807   g_signal_connect (sess->session, "on-sender-timeout",
808       (GCallback) on_sender_timeout, sess);
809   g_signal_connect (sess->session, "on-new-sender-ssrc",
810       (GCallback) on_new_sender_ssrc, sess);
811   g_signal_connect (sess->session, "on-sender-ssrc-active",
812       (GCallback) on_sender_ssrc_active, sess);
813
814   gst_bin_add (GST_BIN_CAST (rtpbin), session);
815   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
816   gst_bin_add (GST_BIN_CAST (rtpbin), sess->rtp_funnel);
817   gst_bin_add (GST_BIN_CAST (rtpbin), sess->rtcp_funnel);
818
819   GST_OBJECT_LOCK (rtpbin);
820   target = GST_STATE_TARGET (rtpbin);
821   GST_OBJECT_UNLOCK (rtpbin);
822
823   /* change state only to what's needed */
824   gst_element_set_state (demux, target);
825   gst_element_set_state (session, target);
826   gst_element_set_state (sess->rtp_funnel, target);
827   gst_element_set_state (sess->rtcp_funnel, target);
828
829   return sess;
830
831   /* ERRORS */
832 no_session:
833   {
834     g_warning ("rtpbin: could not create rtpsession element");
835     return NULL;
836   }
837 no_demux:
838   {
839     gst_object_unref (session);
840     g_warning ("rtpbin: could not create rtpssrcdemux element");
841     return NULL;
842   }
843 }
844
845 static gboolean
846 bin_manage_element (GstRtpBin * bin, GstElement * element)
847 {
848   GstRtpBinPrivate *priv = bin->priv;
849
850   if (g_list_find (priv->elements, element)) {
851     GST_DEBUG_OBJECT (bin, "requested element %p already in bin", element);
852   } else {
853     GST_DEBUG_OBJECT (bin, "adding requested element %p", element);
854     if (!gst_bin_add (GST_BIN_CAST (bin), element))
855       goto add_failed;
856     if (!gst_element_sync_state_with_parent (element))
857       GST_WARNING_OBJECT (bin, "unable to sync element state with rtpbin");
858   }
859   /* we add the element multiple times, each we need an equal number of
860    * removes to really remove the element from the bin */
861   priv->elements = g_list_prepend (priv->elements, element);
862
863   return TRUE;
864
865   /* ERRORS */
866 add_failed:
867   {
868     GST_WARNING_OBJECT (bin, "unable to add element");
869     return FALSE;
870   }
871 }
872
873 static void
874 remove_bin_element (GstElement * element, GstRtpBin * bin)
875 {
876   GstRtpBinPrivate *priv = bin->priv;
877   GList *find;
878
879   find = g_list_find (priv->elements, element);
880   if (find) {
881     priv->elements = g_list_delete_link (priv->elements, find);
882
883     if (!g_list_find (priv->elements, element))
884       gst_bin_remove (GST_BIN_CAST (bin), element);
885     else
886       gst_object_unref (element);
887   }
888 }
889
890 /* called with RTP_BIN_LOCK */
891 static void
892 free_session (GstRtpBinSession * sess, GstRtpBin * bin)
893 {
894   GST_DEBUG_OBJECT (bin, "freeing session %p", sess);
895
896   gst_element_set_locked_state (sess->demux, TRUE);
897   gst_element_set_locked_state (sess->session, TRUE);
898
899   gst_element_set_state (sess->demux, GST_STATE_NULL);
900   gst_element_set_state (sess->session, GST_STATE_NULL);
901
902   remove_recv_rtp (bin, sess);
903   remove_recv_rtcp (bin, sess);
904   remove_send_rtp (bin, sess);
905   remove_rtcp (bin, sess);
906
907   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
908   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
909
910   g_slist_foreach (sess->elements, (GFunc) remove_bin_element, bin);
911   g_slist_free (sess->elements);
912
913   g_slist_foreach (sess->streams, (GFunc) free_stream, bin);
914   g_slist_free (sess->streams);
915
916   g_mutex_clear (&sess->lock);
917   g_hash_table_destroy (sess->ptmap);
918
919   g_free (sess);
920 }
921
922 /* get the payload type caps for the specific payload @pt in @session */
923 static GstCaps *
924 get_pt_map (GstRtpBinSession * session, guint pt)
925 {
926   GstCaps *caps = NULL;
927   GstRtpBin *bin;
928   GValue ret = { 0 };
929   GValue args[3] = { {0}, {0}, {0} };
930
931   GST_DEBUG ("searching pt %u in cache", pt);
932
933   GST_RTP_SESSION_LOCK (session);
934
935   /* first look in the cache */
936   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
937   if (caps) {
938     gst_caps_ref (caps);
939     goto done;
940   }
941
942   bin = session->bin;
943
944   GST_DEBUG ("emiting signal for pt %u in session %u", pt, session->id);
945
946   /* not in cache, send signal to request caps */
947   g_value_init (&args[0], GST_TYPE_ELEMENT);
948   g_value_set_object (&args[0], bin);
949   g_value_init (&args[1], G_TYPE_UINT);
950   g_value_set_uint (&args[1], session->id);
951   g_value_init (&args[2], G_TYPE_UINT);
952   g_value_set_uint (&args[2], pt);
953
954   g_value_init (&ret, GST_TYPE_CAPS);
955   g_value_set_boxed (&ret, NULL);
956
957   GST_RTP_SESSION_UNLOCK (session);
958
959   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
960
961   GST_RTP_SESSION_LOCK (session);
962
963   g_value_unset (&args[0]);
964   g_value_unset (&args[1]);
965   g_value_unset (&args[2]);
966
967   /* look in the cache again because we let the lock go */
968   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
969   if (caps) {
970     gst_caps_ref (caps);
971     g_value_unset (&ret);
972     goto done;
973   }
974
975   caps = (GstCaps *) g_value_dup_boxed (&ret);
976   g_value_unset (&ret);
977   if (!caps)
978     goto no_caps;
979
980   GST_DEBUG ("caching pt %u as %" GST_PTR_FORMAT, pt, caps);
981
982   /* store in cache, take additional ref */
983   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
984       gst_caps_ref (caps));
985
986 done:
987   GST_RTP_SESSION_UNLOCK (session);
988
989   return caps;
990
991   /* ERRORS */
992 no_caps:
993   {
994     GST_RTP_SESSION_UNLOCK (session);
995     GST_DEBUG ("no pt map could be obtained");
996     return NULL;
997   }
998 }
999
1000 static gboolean
1001 return_true (gpointer key, gpointer value, gpointer user_data)
1002 {
1003   return TRUE;
1004 }
1005
1006 static void
1007 gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
1008 {
1009   GSList *clients, *streams;
1010
1011   GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");
1012
1013   GST_RTP_BIN_LOCK (rtpbin);
1014   for (clients = rtpbin->clients; clients; clients = g_slist_next (clients)) {
1015     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
1016
1017     /* reset sync on all streams for this client */
1018     for (streams = client->streams; streams; streams = g_slist_next (streams)) {
1019       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1020
1021       /* make use require a new SR packet for this stream before we attempt new
1022        * lip-sync */
1023       stream->have_sync = FALSE;
1024       stream->rt_delta = 0;
1025       stream->rtp_delta = 0;
1026       stream->clock_base = -100 * GST_SECOND;
1027     }
1028   }
1029   GST_RTP_BIN_UNLOCK (rtpbin);
1030 }
1031
1032 static void
1033 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
1034 {
1035   GSList *sessions, *streams;
1036
1037   GST_RTP_BIN_LOCK (bin);
1038   GST_DEBUG_OBJECT (bin, "clearing pt map");
1039   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1040     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
1041
1042     GST_DEBUG_OBJECT (bin, "clearing session %p", session);
1043     g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
1044
1045     GST_RTP_SESSION_LOCK (session);
1046     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
1047
1048     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
1049       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1050
1051       GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
1052       g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
1053       if (stream->demux)
1054         g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
1055     }
1056     GST_RTP_SESSION_UNLOCK (session);
1057   }
1058   GST_RTP_BIN_UNLOCK (bin);
1059
1060   /* reset sync too */
1061   gst_rtp_bin_reset_sync (bin);
1062 }
1063
1064 static GstElement *
1065 gst_rtp_bin_get_session (GstRtpBin * bin, guint session_id)
1066 {
1067   GstRtpBinSession *session;
1068   GstElement *ret = NULL;
1069
1070   GST_RTP_BIN_LOCK (bin);
1071   GST_DEBUG_OBJECT (bin, "retrieving GstRtpSession, index: %u", session_id);
1072   session = find_session_by_id (bin, (gint) session_id);
1073   if (session) {
1074     ret = gst_object_ref (session->session);
1075   }
1076   GST_RTP_BIN_UNLOCK (bin);
1077
1078   return ret;
1079 }
1080
1081 static RTPSession *
1082 gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
1083 {
1084   RTPSession *internal_session = NULL;
1085   GstRtpBinSession *session;
1086
1087   GST_RTP_BIN_LOCK (bin);
1088   GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %u",
1089       session_id);
1090   session = find_session_by_id (bin, (gint) session_id);
1091   if (session) {
1092     g_object_get (session->session, "internal-session", &internal_session,
1093         NULL);
1094   }
1095   GST_RTP_BIN_UNLOCK (bin);
1096
1097   return internal_session;
1098 }
1099
1100 static GstElement *
1101 gst_rtp_bin_request_encoder (GstRtpBin * bin, guint session_id)
1102 {
1103   GST_DEBUG_OBJECT (bin, "return NULL encoder");
1104   return NULL;
1105 }
1106
1107 static GstElement *
1108 gst_rtp_bin_request_decoder (GstRtpBin * bin, guint session_id)
1109 {
1110   GST_DEBUG_OBJECT (bin, "return NULL decoder");
1111   return NULL;
1112 }
1113
1114 static void
1115 gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
1116     const gchar * name, const GValue * value)
1117 {
1118   GSList *sessions, *streams;
1119
1120   GST_RTP_BIN_LOCK (bin);
1121   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1122     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
1123
1124     GST_RTP_SESSION_LOCK (session);
1125     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
1126       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1127
1128       g_object_set_property (G_OBJECT (stream->buffer), name, value);
1129     }
1130     GST_RTP_SESSION_UNLOCK (session);
1131   }
1132   GST_RTP_BIN_UNLOCK (bin);
1133 }
1134
1135 static void
1136 gst_rtp_bin_propagate_property_to_session (GstRtpBin * bin,
1137     const gchar * name, const GValue * value)
1138 {
1139   GSList *sessions;
1140
1141   GST_RTP_BIN_LOCK (bin);
1142   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1143     GstRtpBinSession *sess = (GstRtpBinSession *) sessions->data;
1144
1145     g_object_set_property (G_OBJECT (sess->session), name, value);
1146   }
1147   GST_RTP_BIN_UNLOCK (bin);
1148 }
1149
1150 /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
1151 static GstRtpBinClient *
1152 get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
1153 {
1154   GstRtpBinClient *result = NULL;
1155   GSList *walk;
1156
1157   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
1158     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
1159
1160     if (len != client->cname_len)
1161       continue;
1162
1163     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
1164       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
1165           client->cname);
1166       result = client;
1167       break;
1168     }
1169   }
1170
1171   /* nothing found, create one */
1172   if (result == NULL) {
1173     result = g_new0 (GstRtpBinClient, 1);
1174     result->cname = g_strndup ((gchar *) data, len);
1175     result->cname_len = len;
1176     bin->clients = g_slist_prepend (bin->clients, result);
1177     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
1178         result->cname);
1179   }
1180   return result;
1181 }
1182
1183 static void
1184 free_client (GstRtpBinClient * client, GstRtpBin * bin)
1185 {
1186   GST_DEBUG_OBJECT (bin, "freeing client %p", client);
1187   g_slist_free (client->streams);
1188   g_free (client->cname);
1189   g_free (client);
1190 }
1191
1192 static void
1193 get_current_times (GstRtpBin * bin, GstClockTime * running_time,
1194     guint64 * ntpnstime)
1195 {
1196   guint64 ntpns = -1;
1197   GstClock *clock;
1198   GstClockTime base_time, rt, clock_time;
1199
1200   GST_OBJECT_LOCK (bin);
1201   if ((clock = GST_ELEMENT_CLOCK (bin))) {
1202     base_time = GST_ELEMENT_CAST (bin)->base_time;
1203     gst_object_ref (clock);
1204     GST_OBJECT_UNLOCK (bin);
1205
1206     /* get current clock time and convert to running time */
1207     clock_time = gst_clock_get_time (clock);
1208     rt = clock_time - base_time;
1209
1210     if (bin->use_pipeline_clock) {
1211       ntpns = rt;
1212       /* add constant to convert from 1970 based time to 1900 based time */
1213       ntpns += (2208988800LL * GST_SECOND);
1214     } else {
1215       switch (bin->ntp_time_source) {
1216         case GST_RTP_NTP_TIME_SOURCE_NTP:
1217         case GST_RTP_NTP_TIME_SOURCE_UNIX:{
1218           GTimeVal current;
1219
1220           /* get current NTP time */
1221           g_get_current_time (&current);
1222           ntpns = GST_TIMEVAL_TO_TIME (current);
1223
1224           /* add constant to convert from 1970 based time to 1900 based time */
1225           if (bin->ntp_time_source == GST_RTP_NTP_TIME_SOURCE_NTP)
1226             ntpns += (2208988800LL * GST_SECOND);
1227           break;
1228         }
1229         case GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME:
1230           ntpns = rt;
1231           break;
1232         case GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME:
1233           ntpns = clock_time;
1234           break;
1235         default:
1236           ntpns = -1;           /* Fix uninited compiler warning */
1237           g_assert_not_reached ();
1238           break;
1239       }
1240     }
1241
1242     gst_object_unref (clock);
1243   } else {
1244     GST_OBJECT_UNLOCK (bin);
1245     rt = -1;
1246     ntpns = -1;
1247   }
1248   if (running_time)
1249     *running_time = rt;
1250   if (ntpnstime)
1251     *ntpnstime = ntpns;
1252 }
1253
1254 static void
1255 stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
1256     gint64 ts_offset, gboolean check)
1257 {
1258   gint64 prev_ts_offset;
1259
1260   g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
1261
1262   /* delta changed, see how much */
1263   if (prev_ts_offset != ts_offset) {
1264     gint64 diff;
1265
1266     diff = prev_ts_offset - ts_offset;
1267
1268     GST_DEBUG_OBJECT (bin,
1269         "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
1270         ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff);
1271
1272     if (check) {
1273       /* only change diff when it changed more than 4 milliseconds. This
1274        * compensates for rounding errors in NTP to RTP timestamp
1275        * conversions */
1276       if (ABS (diff) < 4 * GST_MSECOND) {
1277         GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
1278         return;
1279       }
1280       if (ABS (diff) > (3 * GST_SECOND)) {
1281         GST_WARNING_OBJECT (bin, "offset unusually large, ignoring");
1282         return;
1283       }
1284     }
1285     g_object_set (stream->buffer, "ts-offset", ts_offset, NULL);
1286   }
1287   GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
1288       stream->ssrc, ts_offset);
1289 }
1290
1291 static void
1292 gst_rtp_bin_send_sync_event (GstRtpBinStream * stream)
1293 {
1294   if (stream->bin->send_sync_event) {
1295     GstEvent *event;
1296     GstPad *srcpad;
1297
1298     GST_DEBUG_OBJECT (stream->bin,
1299         "sending GstRTCPSRReceived event downstream");
1300
1301     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1302         gst_structure_new_empty ("GstRTCPSRReceived"));
1303
1304     srcpad = gst_element_get_static_pad (stream->buffer, "src");
1305     gst_pad_push_event (srcpad, event);
1306     gst_object_unref (srcpad);
1307   }
1308 }
1309
1310 /* associate a stream to the given CNAME. This will make sure all streams for
1311  * that CNAME are synchronized together.
1312  * Must be called with GST_RTP_BIN_LOCK */
1313 static void
1314 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
1315     guint8 * data, guint64 ntptime, guint64 last_extrtptime,
1316     guint64 base_rtptime, guint64 base_time, guint clock_rate,
1317     gint64 rtp_clock_base)
1318 {
1319   GstRtpBinClient *client;
1320   gboolean created;
1321   GSList *walk;
1322   GstClockTime running_time, running_time_rtp;
1323   guint64 ntpnstime;
1324
1325   /* first find or create the CNAME */
1326   client = get_client (bin, len, data, &created);
1327
1328   /* find stream in the client */
1329   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1330     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1331
1332     if (ostream == stream)
1333       break;
1334   }
1335   /* not found, add it to the list */
1336   if (walk == NULL) {
1337     GST_DEBUG_OBJECT (bin,
1338         "new association of SSRC %08x with client %p with CNAME %s",
1339         stream->ssrc, client, client->cname);
1340     client->streams = g_slist_prepend (client->streams, stream);
1341     client->nstreams++;
1342   } else {
1343     GST_DEBUG_OBJECT (bin,
1344         "found association of SSRC %08x with client %p with CNAME %s",
1345         stream->ssrc, client, client->cname);
1346   }
1347
1348   if (!GST_CLOCK_TIME_IS_VALID (last_extrtptime)) {
1349     GST_DEBUG_OBJECT (bin, "invalidated sync data");
1350     if (bin->rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1351       /* we don't need that data, so carry on,
1352        * but make some values look saner */
1353       last_extrtptime = base_rtptime;
1354     } else {
1355       /* nothing we can do with this data in this case */
1356       GST_DEBUG_OBJECT (bin, "bailing out");
1357       return;
1358     }
1359   }
1360
1361   /* Take the extended rtptime we found in the SR packet and map it to the
1362    * local rtptime. The local rtp time is used to construct timestamps on the
1363    * buffers so we will calculate what running_time corresponds to the RTP
1364    * timestamp in the SR packet. */
1365   running_time_rtp = last_extrtptime - base_rtptime;
1366
1367   GST_DEBUG_OBJECT (bin,
1368       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
1369       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, "
1370       "clock-base %" G_GINT64_FORMAT, base_rtptime,
1371       last_extrtptime, running_time_rtp, clock_rate, rtp_clock_base);
1372
1373   /* calculate local RTP time in gstreamer timestamp, we essentially perform the
1374    * same conversion that a jitterbuffer would use to convert an rtp timestamp
1375    * into a corresponding gstreamer timestamp. Note that the base_time also
1376    * contains the drift between sender and receiver. */
1377   running_time =
1378       gst_util_uint64_scale_int (running_time_rtp, GST_SECOND, clock_rate);
1379   running_time += base_time;
1380
1381   /* convert ntptime to nanoseconds */
1382   ntpnstime = gst_util_uint64_scale (ntptime, GST_SECOND,
1383       (G_GINT64_CONSTANT (1) << 32));
1384
1385   stream->have_sync = TRUE;
1386
1387   GST_DEBUG_OBJECT (bin,
1388       "SR RTP running time %" G_GUINT64_FORMAT ", SR NTP %" G_GUINT64_FORMAT,
1389       running_time, ntpnstime);
1390
1391   /* recalc inter stream playout offset, but only if there is more than one
1392    * stream or we're doing NTP sync. */
1393   if (bin->ntp_sync) {
1394     gint64 ntpdiff, rtdiff;
1395     guint64 local_ntpnstime;
1396     GstClockTime local_running_time;
1397
1398     /* For NTP sync we need to first get a snapshot of running_time and NTP
1399      * time. We know at what running_time we play a certain RTP time, we also
1400      * calculated when we would play the RTP time in the SR packet. Now we need
1401      * to know how the running_time and the NTP time relate to eachother. */
1402     get_current_times (bin, &local_running_time, &local_ntpnstime);
1403
1404     /* see how far away the NTP time is. This is the difference between the
1405      * current NTP time and the NTP time in the last SR packet. */
1406     ntpdiff = local_ntpnstime - ntpnstime;
1407     /* see how far away the running_time is. This is the difference between the
1408      * current running_time and the running_time of the RTP timestamp in the
1409      * last SR packet. */
1410     rtdiff = local_running_time - running_time;
1411
1412     GST_DEBUG_OBJECT (bin,
1413         "local NTP time %" G_GUINT64_FORMAT ", SR NTP time %" G_GUINT64_FORMAT,
1414         local_ntpnstime, ntpnstime);
1415     GST_DEBUG_OBJECT (bin,
1416         "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
1417         rtdiff);
1418
1419     /* combine to get the final diff to apply to the running_time */
1420     stream->rt_delta = rtdiff - ntpdiff;
1421
1422     stream_set_ts_offset (bin, stream, stream->rt_delta, FALSE);
1423   } else {
1424     gint64 min, rtp_min, clock_base = stream->clock_base;
1425     gboolean all_sync, use_rtp;
1426     gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
1427
1428     /* calculate delta between server and receiver. ntpnstime is created by
1429      * converting the ntptime in the last SR packet to a gstreamer timestamp. This
1430      * delta expresses the difference to our timeline and the server timeline. The
1431      * difference in itself doesn't mean much but we can combine the delta of
1432      * multiple streams to create a stream specific offset. */
1433     stream->rt_delta = ntpnstime - running_time;
1434
1435     /* calculate the min of all deltas, ignoring streams that did not yet have a
1436      * valid rt_delta because we did not yet receive an SR packet for those
1437      * streams.
1438      * We calculate the mininum because we would like to only apply positive
1439      * offsets to streams, delaying their playback instead of trying to speed up
1440      * other streams (which might be imposible when we have to create negative
1441      * latencies).
1442      * The stream that has the smallest diff is selected as the reference stream,
1443      * all other streams will have a positive offset to this difference. */
1444
1445     /* some alternative setting allow ignoring RTCP as much as possible,
1446      * for servers generating bogus ntp timeline */
1447     min = rtp_min = G_MAXINT64;
1448     use_rtp = FALSE;
1449     if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1450       guint64 ext_base;
1451
1452       use_rtp = TRUE;
1453       /* signed version for convienience */
1454       clock_base = base_rtptime;
1455       /* deal with possible wrap-around */
1456       ext_base = base_rtptime;
1457       rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
1458       /* sanity check; base rtp and provided clock_base should be close */
1459       if (rtp_clock_base >= clock_base) {
1460         if (rtp_clock_base - clock_base < 10 * clock_rate) {
1461           rtp_clock_base = base_time +
1462               gst_util_uint64_scale_int (rtp_clock_base - clock_base,
1463               GST_SECOND, clock_rate);
1464         } else {
1465           use_rtp = FALSE;
1466         }
1467       } else {
1468         if (clock_base - rtp_clock_base < 10 * clock_rate) {
1469           rtp_clock_base = base_time -
1470               gst_util_uint64_scale_int (clock_base - rtp_clock_base,
1471               GST_SECOND, clock_rate);
1472         } else {
1473           use_rtp = FALSE;
1474         }
1475       }
1476       /* warn and bail for clarity out if no sane values */
1477       if (!use_rtp) {
1478         GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
1479         return;
1480       }
1481       /* store to track changes */
1482       clock_base = rtp_clock_base;
1483       /* generate a fake as before,
1484        * now equating rtptime obtained from RTP-Info,
1485        * where the large time represent the otherwise irrelevant npt/ntp time */
1486       stream->rtp_delta = (GST_SECOND << 28) - rtp_clock_base;
1487     } else {
1488       clock_base = rtp_clock_base;
1489     }
1490
1491     all_sync = TRUE;
1492     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1493       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1494
1495       if (!ostream->have_sync) {
1496         all_sync = FALSE;
1497         continue;
1498       }
1499
1500       /* change in current stream's base from previously init'ed value
1501        * leads to reset of all stream's base */
1502       if (stream != ostream && stream->clock_base >= 0 &&
1503           (stream->clock_base != clock_base)) {
1504         GST_DEBUG_OBJECT (bin, "reset upon clock base change");
1505         ostream->clock_base = -100 * GST_SECOND;
1506         ostream->rtp_delta = 0;
1507       }
1508
1509       if (ostream->rt_delta < min)
1510         min = ostream->rt_delta;
1511       if (ostream->rtp_delta < rtp_min)
1512         rtp_min = ostream->rtp_delta;
1513     }
1514
1515     /* arrange to re-sync for each stream upon significant change,
1516      * e.g. post-seek */
1517     all_sync = all_sync && (stream->clock_base == clock_base);
1518     stream->clock_base = clock_base;
1519
1520     /* may need init performed above later on, but nothing more to do now */
1521     if (client->nstreams <= 1)
1522       return;
1523
1524     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
1525         " all sync %d", client, min, all_sync);
1526     GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
1527
1528     switch (rtcp_sync) {
1529       case GST_RTP_BIN_RTCP_SYNC_RTP:
1530         if (!use_rtp)
1531           break;
1532         GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
1533             "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
1534         /* fall-through */
1535       case GST_RTP_BIN_RTCP_SYNC_INITIAL:
1536         /* if all have been synced already, do not bother further */
1537         if (all_sync) {
1538           GST_DEBUG_OBJECT (bin, "all streams already synced; done");
1539           return;
1540         }
1541         break;
1542       default:
1543         break;
1544     }
1545
1546     /* bail out if we adjusted recently enough */
1547     if (all_sync && (ntpnstime - bin->priv->last_ntpnstime) <
1548         bin->rtcp_sync_interval * GST_MSECOND) {
1549       GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
1550           "previous sender info too recent "
1551           "(previous NTP %" G_GUINT64_FORMAT ")", bin->priv->last_ntpnstime);
1552       return;
1553     }
1554     bin->priv->last_ntpnstime = ntpnstime;
1555
1556     /* calculate offsets for each stream */
1557     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1558       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1559       gint64 ts_offset;
1560
1561       /* ignore streams for which we didn't receive an SR packet yet, we
1562        * can't synchronize them yet. We can however sync other streams just
1563        * fine. */
1564       if (!ostream->have_sync)
1565         continue;
1566
1567       /* calculate offset to our reference stream, this should always give a
1568        * positive number. */
1569       if (use_rtp)
1570         ts_offset = ostream->rtp_delta - rtp_min;
1571       else
1572         ts_offset = ostream->rt_delta - min;
1573
1574       stream_set_ts_offset (bin, ostream, ts_offset, TRUE);
1575     }
1576   }
1577   gst_rtp_bin_send_sync_event (stream);
1578
1579   return;
1580 }
1581
1582 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
1583   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
1584           (b) = gst_rtcp_packet_move_to_next ((packet)))
1585
1586 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
1587   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
1588           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
1589
1590 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
1591   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
1592           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
1593
1594 static void
1595 gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
1596     GstRtpBinStream * stream)
1597 {
1598   GstRtpBin *bin;
1599   GstRTCPPacket packet;
1600   guint32 ssrc;
1601   guint64 ntptime;
1602   gboolean have_sr, have_sdes;
1603   gboolean more;
1604   guint64 base_rtptime;
1605   guint64 base_time;
1606   guint clock_rate;
1607   guint64 clock_base;
1608   guint64 extrtptime;
1609   GstBuffer *buffer;
1610   GstRTCPBuffer rtcp = { NULL, };
1611
1612   bin = stream->bin;
1613
1614   GST_DEBUG_OBJECT (bin, "sync handler called");
1615
1616   /* get the last relation between the rtp timestamps and the gstreamer
1617    * timestamps. We get this info directly from the jitterbuffer which
1618    * constructs gstreamer timestamps from rtp timestamps and so it know exactly
1619    * what the current situation is. */
1620   base_rtptime =
1621       g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
1622   base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
1623   clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
1624   clock_base = g_value_get_uint64 (gst_structure_get_value (s, "clock-base"));
1625   extrtptime =
1626       g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
1627   buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
1628
1629   have_sr = FALSE;
1630   have_sdes = FALSE;
1631
1632   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
1633
1634   GST_RTCP_BUFFER_FOR_PACKETS (more, &rtcp, &packet) {
1635     /* first packet must be SR or RR or else the validate would have failed */
1636     switch (gst_rtcp_packet_get_type (&packet)) {
1637       case GST_RTCP_TYPE_SR:
1638         /* only parse first. There is only supposed to be one SR in the packet
1639          * but we will deal with malformed packets gracefully */
1640         if (have_sr)
1641           break;
1642         /* get NTP and RTP times */
1643         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL,
1644             NULL, NULL);
1645
1646         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
1647         /* ignore SR that is not ours */
1648         if (ssrc != stream->ssrc)
1649           continue;
1650
1651         have_sr = TRUE;
1652         break;
1653       case GST_RTCP_TYPE_SDES:
1654       {
1655         gboolean more_items, more_entries;
1656
1657         /* only deal with first SDES, there is only supposed to be one SDES in
1658          * the RTCP packet but we deal with bad packets gracefully. Also bail
1659          * out if we have not seen an SR item yet. */
1660         if (have_sdes || !have_sr)
1661           break;
1662
1663         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
1664           /* skip items that are not about the SSRC of the sender */
1665           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
1666             continue;
1667
1668           /* find the CNAME entry */
1669           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
1670             GstRTCPSDESType type;
1671             guint8 len;
1672             guint8 *data;
1673
1674             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
1675
1676             if (type == GST_RTCP_SDES_CNAME) {
1677               GST_RTP_BIN_LOCK (bin);
1678               /* associate the stream to CNAME */
1679               gst_rtp_bin_associate (bin, stream, len, data,
1680                   ntptime, extrtptime, base_rtptime, base_time, clock_rate,
1681                   clock_base);
1682               GST_RTP_BIN_UNLOCK (bin);
1683             }
1684           }
1685         }
1686         have_sdes = TRUE;
1687         break;
1688       }
1689       default:
1690         /* we can ignore these packets */
1691         break;
1692     }
1693   }
1694   gst_rtcp_buffer_unmap (&rtcp);
1695 }
1696
1697 /* create a new stream with @ssrc in @session. Must be called with
1698  * RTP_SESSION_LOCK. */
1699 static GstRtpBinStream *
1700 create_stream (GstRtpBinSession * session, guint32 ssrc)
1701 {
1702   GstElement *buffer, *demux = NULL;
1703   GstRtpBinStream *stream;
1704   GstRtpBin *rtpbin;
1705   GstState target;
1706
1707   rtpbin = session->bin;
1708
1709   if (g_slist_length (session->streams) >= rtpbin->max_streams)
1710     goto max_streams;
1711
1712   if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL)))
1713     goto no_jitterbuffer;
1714
1715   if (!rtpbin->ignore_pt)
1716     if (!(demux = gst_element_factory_make ("rtpptdemux", NULL)))
1717       goto no_demux;
1718
1719   stream = g_new0 (GstRtpBinStream, 1);
1720   stream->ssrc = ssrc;
1721   stream->bin = rtpbin;
1722   stream->session = session;
1723   stream->buffer = buffer;
1724   stream->demux = demux;
1725
1726   stream->have_sync = FALSE;
1727   stream->rt_delta = 0;
1728   stream->rtp_delta = 0;
1729   stream->percent = 100;
1730   stream->clock_base = -100 * GST_SECOND;
1731   session->streams = g_slist_prepend (session->streams, stream);
1732
1733   /* provide clock_rate to the jitterbuffer when needed */
1734   stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
1735       (GCallback) pt_map_requested, session);
1736   stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
1737       (GCallback) on_npt_stop, stream);
1738
1739   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.session", session);
1740   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.stream", stream);
1741
1742   /* configure latency and packet lost */
1743   g_object_set (buffer, "latency", rtpbin->latency_ms, NULL);
1744   g_object_set (buffer, "drop-on-latency", rtpbin->drop_on_latency, NULL);
1745   g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
1746   g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL);
1747   g_object_set (buffer, "do-retransmission", rtpbin->do_retransmission, NULL);
1748   g_object_set (buffer, "max-rtcp-rtp-time-diff",
1749       rtpbin->max_rtcp_rtp_time_diff, NULL);
1750   g_object_set (buffer, "max-dropout-time", rtpbin->max_dropout_time,
1751       "max-misorder-time", rtpbin->max_misorder_time, NULL);
1752   g_object_set (buffer, "rfc7273-sync", rtpbin->rfc7273_sync, NULL);
1753
1754   g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER], 0,
1755       buffer, session->id, ssrc);
1756
1757   if (!rtpbin->ignore_pt)
1758     gst_bin_add (GST_BIN_CAST (rtpbin), demux);
1759   gst_bin_add (GST_BIN_CAST (rtpbin), buffer);
1760
1761   /* link stuff */
1762   if (demux)
1763     gst_element_link_pads_full (buffer, "src", demux, "sink",
1764         GST_PAD_LINK_CHECK_NOTHING);
1765
1766   if (rtpbin->buffering) {
1767     guint64 last_out;
1768
1769     GST_INFO_OBJECT (rtpbin,
1770         "bin is buffering, set jitterbuffer as not active");
1771     g_signal_emit_by_name (buffer, "set-active", FALSE, (gint64) 0, &last_out);
1772   }
1773
1774
1775   GST_OBJECT_LOCK (rtpbin);
1776   target = GST_STATE_TARGET (rtpbin);
1777   GST_OBJECT_UNLOCK (rtpbin);
1778
1779   /* from sink to source */
1780   if (demux)
1781     gst_element_set_state (demux, target);
1782
1783   gst_element_set_state (buffer, target);
1784
1785   return stream;
1786
1787   /* ERRORS */
1788 max_streams:
1789   {
1790     GST_WARNING_OBJECT (rtpbin, "stream exeeds maximum (%d)",
1791         rtpbin->max_streams);
1792     return NULL;
1793   }
1794 no_jitterbuffer:
1795   {
1796     g_warning ("rtpbin: could not create rtpjitterbuffer element");
1797     return NULL;
1798   }
1799 no_demux:
1800   {
1801     gst_object_unref (buffer);
1802     g_warning ("rtpbin: could not create rtpptdemux element");
1803     return NULL;
1804   }
1805 }
1806
1807 /* called with RTP_BIN_LOCK */
1808 static void
1809 free_stream (GstRtpBinStream * stream, GstRtpBin * bin)
1810 {
1811   GSList *clients, *next_client;
1812
1813   GST_DEBUG_OBJECT (bin, "freeing stream %p", stream);
1814
1815   if (stream->demux) {
1816     g_signal_handler_disconnect (stream->demux, stream->demux_newpad_sig);
1817     g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig);
1818     g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig);
1819   }
1820   g_signal_handler_disconnect (stream->buffer, stream->buffer_handlesync_sig);
1821   g_signal_handler_disconnect (stream->buffer, stream->buffer_ptreq_sig);
1822   g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig);
1823
1824   if (stream->demux)
1825     gst_element_set_locked_state (stream->demux, TRUE);
1826   gst_element_set_locked_state (stream->buffer, TRUE);
1827
1828   if (stream->demux)
1829     gst_element_set_state (stream->demux, GST_STATE_NULL);
1830   gst_element_set_state (stream->buffer, GST_STATE_NULL);
1831
1832   /* now remove this signal, we need this while going to NULL because it to
1833    * do some cleanups */
1834   if (stream->demux)
1835     g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
1836
1837   gst_bin_remove (GST_BIN_CAST (bin), stream->buffer);
1838   if (stream->demux)
1839     gst_bin_remove (GST_BIN_CAST (bin), stream->demux);
1840
1841   for (clients = bin->clients; clients; clients = next_client) {
1842     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
1843     GSList *streams, *next_stream;
1844
1845     next_client = g_slist_next (clients);
1846
1847     for (streams = client->streams; streams; streams = next_stream) {
1848       GstRtpBinStream *ostream = (GstRtpBinStream *) streams->data;
1849
1850       next_stream = g_slist_next (streams);
1851
1852       if (ostream == stream) {
1853         client->streams = g_slist_delete_link (client->streams, streams);
1854         /* If this was the last stream belonging to this client,
1855          * clean up the client. */
1856         if (--client->nstreams == 0) {
1857           bin->clients = g_slist_delete_link (bin->clients, clients);
1858           free_client (client, bin);
1859           break;
1860         }
1861       }
1862     }
1863   }
1864   g_free (stream);
1865 }
1866
1867 /* GObject vmethods */
1868 static void gst_rtp_bin_dispose (GObject * object);
1869 static void gst_rtp_bin_finalize (GObject * object);
1870 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
1871     const GValue * value, GParamSpec * pspec);
1872 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
1873     GValue * value, GParamSpec * pspec);
1874
1875 /* GstElement vmethods */
1876 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
1877     GstStateChange transition);
1878 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
1879     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
1880 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
1881 static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message);
1882
1883 #define gst_rtp_bin_parent_class parent_class
1884 G_DEFINE_TYPE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN);
1885
1886 static gboolean
1887 _gst_element_accumulator (GSignalInvocationHint * ihint,
1888     GValue * return_accu, const GValue * handler_return, gpointer dummy)
1889 {
1890   GstElement *element;
1891
1892   element = g_value_get_object (handler_return);
1893   GST_DEBUG ("got element %" GST_PTR_FORMAT, element);
1894
1895   if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
1896     g_value_set_object (return_accu, element);
1897
1898   /* stop emission if we have an element */
1899   return (element == NULL);
1900 }
1901
1902 static gboolean
1903 _gst_caps_accumulator (GSignalInvocationHint * ihint,
1904     GValue * return_accu, const GValue * handler_return, gpointer dummy)
1905 {
1906   GstCaps *caps;
1907
1908   caps = g_value_get_boxed (handler_return);
1909   GST_DEBUG ("got caps %" GST_PTR_FORMAT, caps);
1910
1911   if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
1912     g_value_set_boxed (return_accu, caps);
1913
1914   /* stop emission if we have a caps */
1915   return (caps == NULL);
1916 }
1917
1918 static void
1919 gst_rtp_bin_class_init (GstRtpBinClass * klass)
1920 {
1921   GObjectClass *gobject_class;
1922   GstElementClass *gstelement_class;
1923   GstBinClass *gstbin_class;
1924
1925   gobject_class = (GObjectClass *) klass;
1926   gstelement_class = (GstElementClass *) klass;
1927   gstbin_class = (GstBinClass *) klass;
1928
1929   g_type_class_add_private (klass, sizeof (GstRtpBinPrivate));
1930
1931   gobject_class->dispose = gst_rtp_bin_dispose;
1932   gobject_class->finalize = gst_rtp_bin_finalize;
1933   gobject_class->set_property = gst_rtp_bin_set_property;
1934   gobject_class->get_property = gst_rtp_bin_get_property;
1935
1936   g_object_class_install_property (gobject_class, PROP_LATENCY,
1937       g_param_spec_uint ("latency", "Buffer latency in ms",
1938           "Default amount of ms to buffer in the jitterbuffers", 0,
1939           G_MAXUINT, DEFAULT_LATENCY_MS,
1940           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1941
1942   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
1943       g_param_spec_boolean ("drop-on-latency",
1944           "Drop buffers when maximum latency is reached",
1945           "Tells the jitterbuffer to never exceed the given latency in size",
1946           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1947
1948   /**
1949    * GstRtpBin::request-pt-map:
1950    * @rtpbin: the object which received the signal
1951    * @session: the session
1952    * @pt: the pt
1953    *
1954    * Request the payload type as #GstCaps for @pt in @session.
1955    */
1956   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
1957       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
1958       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
1959       _gst_caps_accumulator, NULL, g_cclosure_marshal_generic, GST_TYPE_CAPS,
1960       2, G_TYPE_UINT, G_TYPE_UINT);
1961
1962     /**
1963    * GstRtpBin::payload-type-change:
1964    * @rtpbin: the object which received the signal
1965    * @session: the session
1966    * @pt: the pt
1967    *
1968    * Signal that the current payload type changed to @pt in @session.
1969    */
1970   gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] =
1971       g_signal_new ("payload-type-change", G_TYPE_FROM_CLASS (klass),
1972       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, payload_type_change),
1973       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1974       G_TYPE_UINT);
1975
1976   /**
1977    * GstRtpBin::clear-pt-map:
1978    * @rtpbin: the object which received the signal
1979    *
1980    * Clear all previously cached pt-mapping obtained with
1981    * #GstRtpBin::request-pt-map.
1982    */
1983   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
1984       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
1985       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1986           clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1987       0, G_TYPE_NONE);
1988
1989   /**
1990    * GstRtpBin::reset-sync:
1991    * @rtpbin: the object which received the signal
1992    *
1993    * Reset all currently configured lip-sync parameters and require new SR
1994    * packets for all streams before lip-sync is attempted again.
1995    */
1996   gst_rtp_bin_signals[SIGNAL_RESET_SYNC] =
1997       g_signal_new ("reset-sync", G_TYPE_FROM_CLASS (klass),
1998       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1999           reset_sync), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
2000       0, G_TYPE_NONE);
2001
2002   /**
2003    * GstRtpBin::get-session:
2004    * @rtpbin: the object which received the signal
2005    * @id: the session id
2006    *
2007    * Request the related GstRtpSession as #GstElement related with session @id.
2008    *
2009    * Since: 1.8
2010    */
2011   gst_rtp_bin_signals[SIGNAL_GET_SESSION] =
2012       g_signal_new ("get-session", G_TYPE_FROM_CLASS (klass),
2013       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2014           get_session), NULL, NULL, g_cclosure_marshal_generic,
2015       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2016
2017   /**
2018    * GstRtpBin::get-internal-session:
2019    * @rtpbin: the object which received the signal
2020    * @id: the session id
2021    *
2022    * Request the internal RTPSession object as #GObject in session @id.
2023    */
2024   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_SESSION] =
2025       g_signal_new ("get-internal-session", G_TYPE_FROM_CLASS (klass),
2026       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2027           get_internal_session), NULL, NULL, g_cclosure_marshal_generic,
2028       RTP_TYPE_SESSION, 1, G_TYPE_UINT);
2029
2030   /**
2031    * GstRtpBin::on-new-ssrc:
2032    * @rtpbin: the object which received the signal
2033    * @session: the session
2034    * @ssrc: the SSRC
2035    *
2036    * Notify of a new SSRC that entered @session.
2037    */
2038   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
2039       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
2040       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
2041       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2042       G_TYPE_UINT);
2043   /**
2044    * GstRtpBin::on-ssrc-collision:
2045    * @rtpbin: the object which received the signal
2046    * @session: the session
2047    * @ssrc: the SSRC
2048    *
2049    * Notify when we have an SSRC collision
2050    */
2051   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
2052       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
2053       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
2054       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2055       G_TYPE_UINT);
2056   /**
2057    * GstRtpBin::on-ssrc-validated:
2058    * @rtpbin: the object which received the signal
2059    * @session: the session
2060    * @ssrc: the SSRC
2061    *
2062    * Notify of a new SSRC that became validated.
2063    */
2064   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
2065       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
2066       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
2067       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2068       G_TYPE_UINT);
2069   /**
2070    * GstRtpBin::on-ssrc-active:
2071    * @rtpbin: the object which received the signal
2072    * @session: the session
2073    * @ssrc: the SSRC
2074    *
2075    * Notify of a SSRC that is active, i.e., sending RTCP.
2076    */
2077   gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE] =
2078       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
2079       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_active),
2080       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2081       G_TYPE_UINT);
2082   /**
2083    * GstRtpBin::on-ssrc-sdes:
2084    * @rtpbin: the object which received the signal
2085    * @session: the session
2086    * @ssrc: the SSRC
2087    *
2088    * Notify of a SSRC that is active, i.e., sending RTCP.
2089    */
2090   gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES] =
2091       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
2092       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_sdes),
2093       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2094       G_TYPE_UINT);
2095
2096   /**
2097    * GstRtpBin::on-bye-ssrc:
2098    * @rtpbin: the object which received the signal
2099    * @session: the session
2100    * @ssrc: the SSRC
2101    *
2102    * Notify of an SSRC that became inactive because of a BYE packet.
2103    */
2104   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
2105       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
2106       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
2107       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2108       G_TYPE_UINT);
2109   /**
2110    * GstRtpBin::on-bye-timeout:
2111    * @rtpbin: the object which received the signal
2112    * @session: the session
2113    * @ssrc: the SSRC
2114    *
2115    * Notify of an SSRC that has timed out because of BYE
2116    */
2117   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
2118       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
2119       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
2120       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2121       G_TYPE_UINT);
2122   /**
2123    * GstRtpBin::on-timeout:
2124    * @rtpbin: the object which received the signal
2125    * @session: the session
2126    * @ssrc: the SSRC
2127    *
2128    * Notify of an SSRC that has timed out
2129    */
2130   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
2131       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
2132       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
2133       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2134       G_TYPE_UINT);
2135   /**
2136    * GstRtpBin::on-sender-timeout:
2137    * @rtpbin: the object which received the signal
2138    * @session: the session
2139    * @ssrc: the SSRC
2140    *
2141    * Notify of a sender SSRC that has timed out and became a receiver
2142    */
2143   gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] =
2144       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
2145       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout),
2146       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2147       G_TYPE_UINT);
2148
2149   /**
2150    * GstRtpBin::on-npt-stop:
2151    * @rtpbin: the object which received the signal
2152    * @session: the session
2153    * @ssrc: the SSRC
2154    *
2155    * Notify that SSRC sender has sent data up to the configured NPT stop time.
2156    */
2157   gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP] =
2158       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
2159       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_npt_stop),
2160       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2161       G_TYPE_UINT);
2162
2163   /**
2164    * GstRtpBin::request-rtp-encoder:
2165    * @rtpbin: the object which received the signal
2166    * @session: the session
2167    *
2168    * Request an RTP encoder element for the given @session. The encoder
2169    * element will be added to the bin if not previously added.
2170    *
2171    * If no handler is connected, no encoder will be used.
2172    *
2173    * Since: 1.4
2174    */
2175   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_ENCODER] =
2176       g_signal_new ("request-rtp-encoder", G_TYPE_FROM_CLASS (klass),
2177       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2178           request_rtp_encoder), _gst_element_accumulator, NULL,
2179       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2180
2181   /**
2182    * GstRtpBin::request-rtp-decoder:
2183    * @rtpbin: the object which received the signal
2184    * @session: the session
2185    *
2186    * Request an RTP decoder element for the given @session. The decoder
2187    * element will be added to the bin if not previously added.
2188    *
2189    * If no handler is connected, no encoder will be used.
2190    *
2191    * Since: 1.4
2192    */
2193   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_DECODER] =
2194       g_signal_new ("request-rtp-decoder", G_TYPE_FROM_CLASS (klass),
2195       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2196           request_rtp_decoder), _gst_element_accumulator, NULL,
2197       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2198
2199   /**
2200    * GstRtpBin::request-rtcp-encoder:
2201    * @rtpbin: the object which received the signal
2202    * @session: the session
2203    *
2204    * Request an RTCP encoder element for the given @session. The encoder
2205    * element will be added to the bin if not previously added.
2206    *
2207    * If no handler is connected, no encoder will be used.
2208    *
2209    * Since: 1.4
2210    */
2211   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_ENCODER] =
2212       g_signal_new ("request-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
2213       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2214           request_rtcp_encoder), _gst_element_accumulator, NULL,
2215       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2216
2217   /**
2218    * GstRtpBin::request-rtcp-decoder:
2219    * @rtpbin: the object which received the signal
2220    * @session: the session
2221    *
2222    * Request an RTCP decoder element for the given @session. The decoder
2223    * element will be added to the bin if not previously added.
2224    *
2225    * If no handler is connected, no encoder will be used.
2226    *
2227    * Since: 1.4
2228    */
2229   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_DECODER] =
2230       g_signal_new ("request-rtcp-decoder", G_TYPE_FROM_CLASS (klass),
2231       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2232           request_rtcp_decoder), _gst_element_accumulator, NULL,
2233       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2234
2235   /**
2236    * GstRtpBin::new-jitterbuffer:
2237    * @rtpbin: the object which received the signal
2238    * @jitterbuffer: the new jitterbuffer
2239    * @session: the session
2240    * @ssrc: the SSRC
2241    *
2242    * Notify that a new @jitterbuffer was created for @session and @ssrc.
2243    * This signal can, for example, be used to configure @jitterbuffer.
2244    *
2245    * Since: 1.4
2246    */
2247   gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER] =
2248       g_signal_new ("new-jitterbuffer", G_TYPE_FROM_CLASS (klass),
2249       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2250           new_jitterbuffer), NULL, NULL, g_cclosure_marshal_generic,
2251       G_TYPE_NONE, 3, GST_TYPE_ELEMENT, G_TYPE_UINT, G_TYPE_UINT);
2252
2253   /**
2254    * GstRtpBin::request-aux-sender:
2255    * @rtpbin: the object which received the signal
2256    * @session: the session
2257    *
2258    * Request an AUX sender element for the given @session. The AUX
2259    * element will be added to the bin.
2260    *
2261    * If no handler is connected, no AUX element will be used.
2262    *
2263    * Since: 1.4
2264    */
2265   gst_rtp_bin_signals[SIGNAL_REQUEST_AUX_SENDER] =
2266       g_signal_new ("request-aux-sender", G_TYPE_FROM_CLASS (klass),
2267       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2268           request_aux_sender), _gst_element_accumulator, NULL,
2269       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2270   /**
2271    * GstRtpBin::request-aux-receiver:
2272    * @rtpbin: the object which received the signal
2273    * @session: the session
2274    *
2275    * Request an AUX receiver element for the given @session. The AUX
2276    * element will be added to the bin.
2277    *
2278    * If no handler is connected, no AUX element will be used.
2279    *
2280    * Since: 1.4
2281    */
2282   gst_rtp_bin_signals[SIGNAL_REQUEST_AUX_RECEIVER] =
2283       g_signal_new ("request-aux-receiver", G_TYPE_FROM_CLASS (klass),
2284       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2285           request_aux_receiver), _gst_element_accumulator, NULL,
2286       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2287   /**
2288    * GstRtpBin::on-new-sender-ssrc:
2289    * @rtpbin: the object which received the signal
2290    * @session: the session
2291    * @ssrc: the sender SSRC
2292    *
2293    * Since: 1.8
2294    *
2295    * Notify of a new sender SSRC that entered @session.
2296    */
2297   gst_rtp_bin_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
2298       g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
2299       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_sender_ssrc),
2300       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2301       G_TYPE_UINT);
2302   /**
2303    * GstRtpBin::on-sender-ssrc-active:
2304    * @rtpbin: the object which received the signal
2305    * @session: the session
2306    * @ssrc: the sender SSRC
2307    *
2308    * Since: 1.8
2309    *
2310    * Notify of a sender SSRC that is active, i.e., sending RTCP.
2311    */
2312   gst_rtp_bin_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
2313       g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
2314       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2315           on_sender_ssrc_active), NULL, NULL, g_cclosure_marshal_generic,
2316       G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2317
2318
2319   /**
2320    * GstRtpBin::on-bundled-ssrc:
2321    * @rtpbin: the object which received the signal
2322    * @ssrc: the bundled SSRC
2323    *
2324    * Notify of a new incoming bundled SSRC. If no handler is connected to the
2325    * signal then the #GstRtpSession created for the recv_rtp_sink_\%u
2326    * request pad will be managing this new SSRC. However if there is a handler
2327    * connected then the application can decided to dispatch this new stream to
2328    * another session by providing its ID as return value of the handler. This
2329    * can be particularly useful to keep retransmission SSRCs grouped with the
2330    * session for which they handle retransmission.
2331    *
2332    * Since: 1.12
2333    */
2334   gst_rtp_bin_signals[SIGNAL_ON_BUNDLED_SSRC] =
2335       g_signal_new ("on-bundled-ssrc", G_TYPE_FROM_CLASS (klass),
2336       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2337           on_bundled_ssrc), NULL, NULL,
2338       g_cclosure_marshal_generic, G_TYPE_UINT, 1, G_TYPE_UINT);
2339
2340
2341   g_object_class_install_property (gobject_class, PROP_SDES,
2342       g_param_spec_boxed ("sdes", "SDES",
2343           "The SDES items of this session",
2344           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2345
2346   g_object_class_install_property (gobject_class, PROP_DO_LOST,
2347       g_param_spec_boolean ("do-lost", "Do Lost",
2348           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
2349           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2350
2351   g_object_class_install_property (gobject_class, PROP_AUTOREMOVE,
2352       g_param_spec_boolean ("autoremove", "Auto Remove",
2353           "Automatically remove timed out sources", DEFAULT_AUTOREMOVE,
2354           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2355
2356   g_object_class_install_property (gobject_class, PROP_IGNORE_PT,
2357       g_param_spec_boolean ("ignore-pt", "Ignore PT",
2358           "Do not demultiplex based on PT values", DEFAULT_IGNORE_PT,
2359           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2360
2361   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
2362       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
2363           "Use the pipeline running-time to set the NTP time in the RTCP SR messages "
2364           "(DEPRECATED: Use ntp-time-source property)",
2365           DEFAULT_USE_PIPELINE_CLOCK,
2366           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
2367   /**
2368    * GstRtpBin:buffer-mode:
2369    *
2370    * Control the buffering and timestamping mode used by the jitterbuffer.
2371    */
2372   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
2373       g_param_spec_enum ("buffer-mode", "Buffer Mode",
2374           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
2375           DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2376   /**
2377    * GstRtpBin:ntp-sync:
2378    *
2379    * Set the NTP time from the sender reports as the running-time on the
2380    * buffers. When both the sender and receiver have sychronized
2381    * running-time, i.e. when the clock and base-time is shared
2382    * between the receivers and the and the senders, this option can be
2383    * used to synchronize receivers on multiple machines.
2384    */
2385   g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
2386       g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
2387           "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
2388           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2389
2390   /**
2391    * GstRtpBin:rtcp-sync:
2392    *
2393    * If not synchronizing (directly) to the NTP clock, determines how to sync
2394    * the various streams.
2395    */
2396   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC,
2397       g_param_spec_enum ("rtcp-sync", "RTCP Sync",
2398           "Use of RTCP SR in synchronization", GST_RTP_BIN_RTCP_SYNC_TYPE,
2399           DEFAULT_RTCP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2400
2401   /**
2402    * GstRtpBin:rtcp-sync-interval:
2403    *
2404    * Determines how often to sync streams using RTCP data.
2405    */
2406   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_INTERVAL,
2407       g_param_spec_uint ("rtcp-sync-interval", "RTCP Sync Interval",
2408           "RTCP SR interval synchronization (ms) (0 = always)",
2409           0, G_MAXUINT, DEFAULT_RTCP_SYNC_INTERVAL,
2410           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2411
2412   g_object_class_install_property (gobject_class, PROP_DO_SYNC_EVENT,
2413       g_param_spec_boolean ("do-sync-event", "Do Sync Event",
2414           "Send event downstream when a stream is synchronized to the sender",
2415           DEFAULT_DO_SYNC_EVENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2416
2417   /**
2418    * GstRtpBin:do-retransmission:
2419    *
2420    * Enables RTP retransmission on all streams. To control retransmission on
2421    * a per-SSRC basis, connect to the #GstRtpBin::new-jitterbuffer signal and
2422    * set the #GstRtpJitterBuffer::do-retransmission property on the
2423    * #GstRtpJitterBuffer object instead.
2424    */
2425   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
2426       g_param_spec_boolean ("do-retransmission", "Do retransmission",
2427           "Enable retransmission on all streams",
2428           DEFAULT_DO_RETRANSMISSION,
2429           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2430
2431   /**
2432    * GstRtpBin:rtp-profile:
2433    *
2434    * Sets the default RTP profile of newly created RTP sessions. The
2435    * profile can be changed afterwards on a per-session basis.
2436    */
2437   g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
2438       g_param_spec_enum ("rtp-profile", "RTP Profile",
2439           "Default RTP profile of newly created sessions",
2440           GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE,
2441           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2442
2443   g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
2444       g_param_spec_enum ("ntp-time-source", "NTP Time Source",
2445           "NTP time source for RTCP packets",
2446           gst_rtp_ntp_time_source_get_type (), DEFAULT_NTP_TIME_SOURCE,
2447           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2448
2449   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_SEND_TIME,
2450       g_param_spec_boolean ("rtcp-sync-send-time", "RTCP Sync Send Time",
2451           "Use send time or capture time for RTCP sync "
2452           "(TRUE = send time, FALSE = capture time)",
2453           DEFAULT_RTCP_SYNC_SEND_TIME,
2454           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2455
2456   g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF,
2457       g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff",
2458           "Maximum amount of time in ms that the RTP time in RTCP SRs "
2459           "is allowed to be ahead (-1 disabled)", -1, G_MAXINT,
2460           DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
2461           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2462
2463   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
2464       g_param_spec_uint ("max-dropout-time", "Max dropout time",
2465           "The maximum time (milliseconds) of missing packets tolerated.",
2466           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
2467           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2468
2469   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
2470       g_param_spec_uint ("max-misorder-time", "Max misorder time",
2471           "The maximum time (milliseconds) of misordered packets tolerated.",
2472           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
2473           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2474
2475   g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
2476       g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
2477           "Synchronize received streams to the RFC7273 clock "
2478           "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
2479           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2480
2481   g_object_class_install_property (gobject_class, PROP_MAX_STREAMS,
2482       g_param_spec_uint ("max-streams", "Max Streams",
2483           "The maximum number of streams to create for one session",
2484           0, G_MAXUINT, DEFAULT_MAX_STREAMS,
2485           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2486
2487   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
2488   gstelement_class->request_new_pad =
2489       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
2490   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
2491
2492   /* sink pads */
2493   gst_element_class_add_static_pad_template (gstelement_class,
2494       &rtpbin_recv_rtp_sink_template);
2495   gst_element_class_add_static_pad_template (gstelement_class,
2496       &rtpbin_recv_rtcp_sink_template);
2497   gst_element_class_add_static_pad_template (gstelement_class,
2498       &rtpbin_send_rtp_sink_template);
2499
2500   /* src pads */
2501   gst_element_class_add_static_pad_template (gstelement_class,
2502       &rtpbin_recv_rtp_src_template);
2503   gst_element_class_add_static_pad_template (gstelement_class,
2504       &rtpbin_send_rtcp_src_template);
2505   gst_element_class_add_static_pad_template (gstelement_class,
2506       &rtpbin_send_rtp_src_template);
2507
2508   gst_element_class_set_static_metadata (gstelement_class, "RTP Bin",
2509       "Filter/Network/RTP",
2510       "Real-Time Transport Protocol bin",
2511       "Wim Taymans <wim.taymans@gmail.com>");
2512
2513   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (gst_rtp_bin_handle_message);
2514
2515   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
2516   klass->reset_sync = GST_DEBUG_FUNCPTR (gst_rtp_bin_reset_sync);
2517   klass->get_session = GST_DEBUG_FUNCPTR (gst_rtp_bin_get_session);
2518   klass->get_internal_session =
2519       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_session);
2520   klass->request_rtp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2521   klass->request_rtp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2522   klass->request_rtcp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2523   klass->request_rtcp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2524
2525   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
2526 }
2527
2528 static void
2529 gst_rtp_bin_init (GstRtpBin * rtpbin)
2530 {
2531   gchar *cname;
2532
2533   rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
2534   g_mutex_init (&rtpbin->priv->bin_lock);
2535   g_mutex_init (&rtpbin->priv->dyn_lock);
2536
2537   rtpbin->latency_ms = DEFAULT_LATENCY_MS;
2538   rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND;
2539   rtpbin->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
2540   rtpbin->do_lost = DEFAULT_DO_LOST;
2541   rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
2542   rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
2543   rtpbin->rtcp_sync = DEFAULT_RTCP_SYNC;
2544   rtpbin->rtcp_sync_interval = DEFAULT_RTCP_SYNC_INTERVAL;
2545   rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
2546   rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
2547   rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
2548   rtpbin->send_sync_event = DEFAULT_DO_SYNC_EVENT;
2549   rtpbin->do_retransmission = DEFAULT_DO_RETRANSMISSION;
2550   rtpbin->rtp_profile = DEFAULT_RTP_PROFILE;
2551   rtpbin->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
2552   rtpbin->rtcp_sync_send_time = DEFAULT_RTCP_SYNC_SEND_TIME;
2553   rtpbin->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
2554   rtpbin->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
2555   rtpbin->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
2556   rtpbin->rfc7273_sync = DEFAULT_RFC7273_SYNC;
2557   rtpbin->max_streams = DEFAULT_MAX_STREAMS;
2558
2559   /* some default SDES entries */
2560   cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
2561   rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes",
2562       "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL);
2563   g_free (cname);
2564 }
2565
2566 static void
2567 gst_rtp_bin_dispose (GObject * object)
2568 {
2569   GstRtpBin *rtpbin;
2570
2571   rtpbin = GST_RTP_BIN (object);
2572
2573   GST_RTP_BIN_LOCK (rtpbin);
2574   GST_DEBUG_OBJECT (object, "freeing sessions");
2575   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, rtpbin);
2576   g_slist_free (rtpbin->sessions);
2577   rtpbin->sessions = NULL;
2578   GST_RTP_BIN_UNLOCK (rtpbin);
2579
2580   G_OBJECT_CLASS (parent_class)->dispose (object);
2581 }
2582
2583 static void
2584 gst_rtp_bin_finalize (GObject * object)
2585 {
2586   GstRtpBin *rtpbin;
2587
2588   rtpbin = GST_RTP_BIN (object);
2589
2590   if (rtpbin->sdes)
2591     gst_structure_free (rtpbin->sdes);
2592
2593   g_mutex_clear (&rtpbin->priv->bin_lock);
2594   g_mutex_clear (&rtpbin->priv->dyn_lock);
2595
2596   G_OBJECT_CLASS (parent_class)->finalize (object);
2597 }
2598
2599
2600 static void
2601 gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes)
2602 {
2603   GSList *item;
2604
2605   if (sdes == NULL)
2606     return;
2607
2608   GST_RTP_BIN_LOCK (bin);
2609
2610   GST_OBJECT_LOCK (bin);
2611   if (bin->sdes)
2612     gst_structure_free (bin->sdes);
2613   bin->sdes = gst_structure_copy (sdes);
2614   GST_OBJECT_UNLOCK (bin);
2615
2616   /* store in all sessions */
2617   for (item = bin->sessions; item; item = g_slist_next (item)) {
2618     GstRtpBinSession *session = item->data;
2619     g_object_set (session->session, "sdes", sdes, NULL);
2620   }
2621
2622   GST_RTP_BIN_UNLOCK (bin);
2623 }
2624
2625 static GstStructure *
2626 gst_rtp_bin_get_sdes_struct (GstRtpBin * bin)
2627 {
2628   GstStructure *result;
2629
2630   GST_OBJECT_LOCK (bin);
2631   result = gst_structure_copy (bin->sdes);
2632   GST_OBJECT_UNLOCK (bin);
2633
2634   return result;
2635 }
2636
2637 static void
2638 gst_rtp_bin_set_property (GObject * object, guint prop_id,
2639     const GValue * value, GParamSpec * pspec)
2640 {
2641   GstRtpBin *rtpbin;
2642
2643   rtpbin = GST_RTP_BIN (object);
2644
2645   switch (prop_id) {
2646     case PROP_LATENCY:
2647       GST_RTP_BIN_LOCK (rtpbin);
2648       rtpbin->latency_ms = g_value_get_uint (value);
2649       rtpbin->latency_ns = rtpbin->latency_ms * GST_MSECOND;
2650       GST_RTP_BIN_UNLOCK (rtpbin);
2651       /* propagate the property down to the jitterbuffer */
2652       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "latency", value);
2653       break;
2654     case PROP_DROP_ON_LATENCY:
2655       GST_RTP_BIN_LOCK (rtpbin);
2656       rtpbin->drop_on_latency = g_value_get_boolean (value);
2657       GST_RTP_BIN_UNLOCK (rtpbin);
2658       /* propagate the property down to the jitterbuffer */
2659       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2660           "drop-on-latency", value);
2661       break;
2662     case PROP_SDES:
2663       gst_rtp_bin_set_sdes_struct (rtpbin, g_value_get_boxed (value));
2664       break;
2665     case PROP_DO_LOST:
2666       GST_RTP_BIN_LOCK (rtpbin);
2667       rtpbin->do_lost = g_value_get_boolean (value);
2668       GST_RTP_BIN_UNLOCK (rtpbin);
2669       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
2670       break;
2671     case PROP_NTP_SYNC:
2672       rtpbin->ntp_sync = g_value_get_boolean (value);
2673       break;
2674     case PROP_RTCP_SYNC:
2675       g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value));
2676       break;
2677     case PROP_RTCP_SYNC_INTERVAL:
2678       rtpbin->rtcp_sync_interval = g_value_get_uint (value);
2679       break;
2680     case PROP_IGNORE_PT:
2681       rtpbin->ignore_pt = g_value_get_boolean (value);
2682       break;
2683     case PROP_AUTOREMOVE:
2684       rtpbin->priv->autoremove = g_value_get_boolean (value);
2685       break;
2686     case PROP_USE_PIPELINE_CLOCK:
2687     {
2688       GSList *sessions;
2689       GST_RTP_BIN_LOCK (rtpbin);
2690       rtpbin->use_pipeline_clock = g_value_get_boolean (value);
2691       for (sessions = rtpbin->sessions; sessions;
2692           sessions = g_slist_next (sessions)) {
2693         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2694
2695         g_object_set (G_OBJECT (session->session),
2696             "use-pipeline-clock", rtpbin->use_pipeline_clock, NULL);
2697       }
2698       GST_RTP_BIN_UNLOCK (rtpbin);
2699     }
2700       break;
2701     case PROP_DO_SYNC_EVENT:
2702       rtpbin->send_sync_event = g_value_get_boolean (value);
2703       break;
2704     case PROP_BUFFER_MODE:
2705       GST_RTP_BIN_LOCK (rtpbin);
2706       rtpbin->buffer_mode = g_value_get_enum (value);
2707       GST_RTP_BIN_UNLOCK (rtpbin);
2708       /* propagate the property down to the jitterbuffer */
2709       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "mode", value);
2710       break;
2711     case PROP_DO_RETRANSMISSION:
2712       GST_RTP_BIN_LOCK (rtpbin);
2713       rtpbin->do_retransmission = g_value_get_boolean (value);
2714       GST_RTP_BIN_UNLOCK (rtpbin);
2715       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2716           "do-retransmission", value);
2717       break;
2718     case PROP_RTP_PROFILE:
2719       rtpbin->rtp_profile = g_value_get_enum (value);
2720       break;
2721     case PROP_NTP_TIME_SOURCE:{
2722       GSList *sessions;
2723       GST_RTP_BIN_LOCK (rtpbin);
2724       rtpbin->ntp_time_source = g_value_get_enum (value);
2725       for (sessions = rtpbin->sessions; sessions;
2726           sessions = g_slist_next (sessions)) {
2727         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2728
2729         g_object_set (G_OBJECT (session->session),
2730             "ntp-time-source", rtpbin->ntp_time_source, NULL);
2731       }
2732       GST_RTP_BIN_UNLOCK (rtpbin);
2733       break;
2734     }
2735     case PROP_RTCP_SYNC_SEND_TIME:{
2736       GSList *sessions;
2737       GST_RTP_BIN_LOCK (rtpbin);
2738       rtpbin->rtcp_sync_send_time = g_value_get_boolean (value);
2739       for (sessions = rtpbin->sessions; sessions;
2740           sessions = g_slist_next (sessions)) {
2741         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2742
2743         g_object_set (G_OBJECT (session->session),
2744             "rtcp-sync-send-time", rtpbin->rtcp_sync_send_time, NULL);
2745       }
2746       GST_RTP_BIN_UNLOCK (rtpbin);
2747       break;
2748     }
2749     case PROP_MAX_RTCP_RTP_TIME_DIFF:
2750       GST_RTP_BIN_LOCK (rtpbin);
2751       rtpbin->max_rtcp_rtp_time_diff = g_value_get_int (value);
2752       GST_RTP_BIN_UNLOCK (rtpbin);
2753       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2754           "max-rtcp-rtp-time-diff", value);
2755       break;
2756     case PROP_MAX_DROPOUT_TIME:
2757       GST_RTP_BIN_LOCK (rtpbin);
2758       rtpbin->max_dropout_time = g_value_get_uint (value);
2759       GST_RTP_BIN_UNLOCK (rtpbin);
2760       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2761           "max-dropout-time", value);
2762       gst_rtp_bin_propagate_property_to_session (rtpbin, "max-dropout-time",
2763           value);
2764       break;
2765     case PROP_MAX_MISORDER_TIME:
2766       GST_RTP_BIN_LOCK (rtpbin);
2767       rtpbin->max_misorder_time = g_value_get_uint (value);
2768       GST_RTP_BIN_UNLOCK (rtpbin);
2769       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2770           "max-misorder-time", value);
2771       gst_rtp_bin_propagate_property_to_session (rtpbin, "max-misorder-time",
2772           value);
2773       break;
2774     case PROP_RFC7273_SYNC:
2775       rtpbin->rfc7273_sync = g_value_get_boolean (value);
2776       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2777           "rfc7273-sync", value);
2778       break;
2779     case PROP_MAX_STREAMS:
2780       rtpbin->max_streams = g_value_get_uint (value);
2781       break;
2782     default:
2783       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2784       break;
2785   }
2786 }
2787
2788 static void
2789 gst_rtp_bin_get_property (GObject * object, guint prop_id,
2790     GValue * value, GParamSpec * pspec)
2791 {
2792   GstRtpBin *rtpbin;
2793
2794   rtpbin = GST_RTP_BIN (object);
2795
2796   switch (prop_id) {
2797     case PROP_LATENCY:
2798       GST_RTP_BIN_LOCK (rtpbin);
2799       g_value_set_uint (value, rtpbin->latency_ms);
2800       GST_RTP_BIN_UNLOCK (rtpbin);
2801       break;
2802     case PROP_DROP_ON_LATENCY:
2803       GST_RTP_BIN_LOCK (rtpbin);
2804       g_value_set_boolean (value, rtpbin->drop_on_latency);
2805       GST_RTP_BIN_UNLOCK (rtpbin);
2806       break;
2807     case PROP_SDES:
2808       g_value_take_boxed (value, gst_rtp_bin_get_sdes_struct (rtpbin));
2809       break;
2810     case PROP_DO_LOST:
2811       GST_RTP_BIN_LOCK (rtpbin);
2812       g_value_set_boolean (value, rtpbin->do_lost);
2813       GST_RTP_BIN_UNLOCK (rtpbin);
2814       break;
2815     case PROP_IGNORE_PT:
2816       g_value_set_boolean (value, rtpbin->ignore_pt);
2817       break;
2818     case PROP_NTP_SYNC:
2819       g_value_set_boolean (value, rtpbin->ntp_sync);
2820       break;
2821     case PROP_RTCP_SYNC:
2822       g_value_set_enum (value, g_atomic_int_get (&rtpbin->rtcp_sync));
2823       break;
2824     case PROP_RTCP_SYNC_INTERVAL:
2825       g_value_set_uint (value, rtpbin->rtcp_sync_interval);
2826       break;
2827     case PROP_AUTOREMOVE:
2828       g_value_set_boolean (value, rtpbin->priv->autoremove);
2829       break;
2830     case PROP_BUFFER_MODE:
2831       g_value_set_enum (value, rtpbin->buffer_mode);
2832       break;
2833     case PROP_USE_PIPELINE_CLOCK:
2834       g_value_set_boolean (value, rtpbin->use_pipeline_clock);
2835       break;
2836     case PROP_DO_SYNC_EVENT:
2837       g_value_set_boolean (value, rtpbin->send_sync_event);
2838       break;
2839     case PROP_DO_RETRANSMISSION:
2840       GST_RTP_BIN_LOCK (rtpbin);
2841       g_value_set_boolean (value, rtpbin->do_retransmission);
2842       GST_RTP_BIN_UNLOCK (rtpbin);
2843       break;
2844     case PROP_RTP_PROFILE:
2845       g_value_set_enum (value, rtpbin->rtp_profile);
2846       break;
2847     case PROP_NTP_TIME_SOURCE:
2848       g_value_set_enum (value, rtpbin->ntp_time_source);
2849       break;
2850     case PROP_RTCP_SYNC_SEND_TIME:
2851       g_value_set_boolean (value, rtpbin->rtcp_sync_send_time);
2852       break;
2853     case PROP_MAX_RTCP_RTP_TIME_DIFF:
2854       GST_RTP_BIN_LOCK (rtpbin);
2855       g_value_set_int (value, rtpbin->max_rtcp_rtp_time_diff);
2856       GST_RTP_BIN_UNLOCK (rtpbin);
2857       break;
2858     case PROP_MAX_DROPOUT_TIME:
2859       g_value_set_uint (value, rtpbin->max_dropout_time);
2860       break;
2861     case PROP_MAX_MISORDER_TIME:
2862       g_value_set_uint (value, rtpbin->max_misorder_time);
2863       break;
2864     case PROP_RFC7273_SYNC:
2865       g_value_set_boolean (value, rtpbin->rfc7273_sync);
2866       break;
2867     case PROP_MAX_STREAMS:
2868       g_value_set_uint (value, rtpbin->max_streams);
2869       break;
2870     default:
2871       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2872       break;
2873   }
2874 }
2875
2876 static void
2877 gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
2878 {
2879   GstRtpBin *rtpbin;
2880
2881   rtpbin = GST_RTP_BIN (bin);
2882
2883   switch (GST_MESSAGE_TYPE (message)) {
2884     case GST_MESSAGE_ELEMENT:
2885     {
2886       const GstStructure *s = gst_message_get_structure (message);
2887
2888       /* we change the structure name and add the session ID to it */
2889       if (gst_structure_has_name (s, "application/x-rtp-source-sdes")) {
2890         GstRtpBinSession *sess;
2891
2892         /* find the session we set it as object data */
2893         sess = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2894             "GstRTPBin.session");
2895
2896         if (G_LIKELY (sess)) {
2897           message = gst_message_make_writable (message);
2898           s = gst_message_get_structure (message);
2899           gst_structure_set ((GstStructure *) s, "session", G_TYPE_UINT,
2900               sess->id, NULL);
2901         }
2902       }
2903       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2904       break;
2905     }
2906     case GST_MESSAGE_BUFFERING:
2907     {
2908       gint percent;
2909       gint min_percent = 100;
2910       GSList *sessions, *streams;
2911       GstRtpBinStream *stream;
2912       gboolean change = FALSE, active = FALSE;
2913       GstClockTime min_out_time;
2914       GstBufferingMode mode;
2915       gint avg_in, avg_out;
2916       gint64 buffering_left;
2917
2918       gst_message_parse_buffering (message, &percent);
2919       gst_message_parse_buffering_stats (message, &mode, &avg_in, &avg_out,
2920           &buffering_left);
2921
2922       stream =
2923           g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2924           "GstRTPBin.stream");
2925
2926       GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
2927
2928       /* get the stream */
2929       if (G_LIKELY (stream)) {
2930         GST_RTP_BIN_LOCK (rtpbin);
2931         /* fill in the percent */
2932         stream->percent = percent;
2933
2934         /* calculate the min value for all streams */
2935         for (sessions = rtpbin->sessions; sessions;
2936             sessions = g_slist_next (sessions)) {
2937           GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2938
2939           GST_RTP_SESSION_LOCK (session);
2940           if (session->streams) {
2941             for (streams = session->streams; streams;
2942                 streams = g_slist_next (streams)) {
2943               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2944
2945               GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
2946                   stream->percent);
2947
2948               /* find min percent */
2949               if (min_percent > stream->percent)
2950                 min_percent = stream->percent;
2951             }
2952           } else {
2953             GST_INFO_OBJECT (bin,
2954                 "session has no streams, setting min_percent to 0");
2955             min_percent = 0;
2956           }
2957           GST_RTP_SESSION_UNLOCK (session);
2958         }
2959         GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
2960
2961         if (rtpbin->buffering) {
2962           if (min_percent == 100) {
2963             rtpbin->buffering = FALSE;
2964             active = TRUE;
2965             change = TRUE;
2966           }
2967         } else {
2968           if (min_percent < 100) {
2969             /* pause the streams */
2970             rtpbin->buffering = TRUE;
2971             active = FALSE;
2972             change = TRUE;
2973           }
2974         }
2975         GST_RTP_BIN_UNLOCK (rtpbin);
2976
2977         gst_message_unref (message);
2978
2979         /* make a new buffering message with the min value */
2980         message =
2981             gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
2982         gst_message_set_buffering_stats (message, mode, avg_in, avg_out,
2983             buffering_left);
2984
2985         if (G_UNLIKELY (change)) {
2986           GstClock *clock;
2987           guint64 running_time = 0;
2988           guint64 offset = 0;
2989
2990           /* figure out the running time when we have a clock */
2991           if (G_LIKELY ((clock =
2992                       gst_element_get_clock (GST_ELEMENT_CAST (bin))))) {
2993             guint64 now, base_time;
2994
2995             now = gst_clock_get_time (clock);
2996             base_time = gst_element_get_base_time (GST_ELEMENT_CAST (bin));
2997             running_time = now - base_time;
2998             gst_object_unref (clock);
2999           }
3000           GST_DEBUG_OBJECT (bin,
3001               "running time now %" GST_TIME_FORMAT,
3002               GST_TIME_ARGS (running_time));
3003
3004           GST_RTP_BIN_LOCK (rtpbin);
3005
3006           /* when we reactivate, calculate the offsets so that all streams have
3007            * an output time that is at least as big as the running_time */
3008           offset = 0;
3009           if (active) {
3010             if (running_time > rtpbin->buffer_start) {
3011               offset = running_time - rtpbin->buffer_start;
3012               if (offset >= rtpbin->latency_ns)
3013                 offset -= rtpbin->latency_ns;
3014               else
3015                 offset = 0;
3016             }
3017           }
3018
3019           /* pause all streams */
3020           min_out_time = -1;
3021           for (sessions = rtpbin->sessions; sessions;
3022               sessions = g_slist_next (sessions)) {
3023             GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3024
3025             GST_RTP_SESSION_LOCK (session);
3026             for (streams = session->streams; streams;
3027                 streams = g_slist_next (streams)) {
3028               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
3029               GstElement *element = stream->buffer;
3030               guint64 last_out;
3031
3032               g_signal_emit_by_name (element, "set-active", active, offset,
3033                   &last_out);
3034
3035               if (!active) {
3036                 g_object_get (element, "percent", &stream->percent, NULL);
3037
3038                 if (last_out == -1)
3039                   last_out = 0;
3040                 if (min_out_time == -1 || last_out < min_out_time)
3041                   min_out_time = last_out;
3042               }
3043
3044               GST_DEBUG_OBJECT (bin,
3045                   "setting %p to %d, offset %" GST_TIME_FORMAT ", last %"
3046                   GST_TIME_FORMAT ", percent %d", element, active,
3047                   GST_TIME_ARGS (offset), GST_TIME_ARGS (last_out),
3048                   stream->percent);
3049             }
3050             GST_RTP_SESSION_UNLOCK (session);
3051           }
3052           GST_DEBUG_OBJECT (bin,
3053               "min out time %" GST_TIME_FORMAT, GST_TIME_ARGS (min_out_time));
3054
3055           /* the buffer_start is the min out time of all paused jitterbuffers */
3056           if (!active)
3057             rtpbin->buffer_start = min_out_time;
3058
3059           GST_RTP_BIN_UNLOCK (rtpbin);
3060         }
3061       }
3062       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
3063       break;
3064     }
3065     default:
3066     {
3067       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
3068       break;
3069     }
3070   }
3071 }
3072
3073 static GstStateChangeReturn
3074 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
3075 {
3076   GstStateChangeReturn res;
3077   GstRtpBin *rtpbin;
3078   GstRtpBinPrivate *priv;
3079
3080   rtpbin = GST_RTP_BIN (element);
3081   priv = rtpbin->priv;
3082
3083   switch (transition) {
3084     case GST_STATE_CHANGE_NULL_TO_READY:
3085       break;
3086     case GST_STATE_CHANGE_READY_TO_PAUSED:
3087       priv->last_ntpnstime = 0;
3088       GST_LOG_OBJECT (rtpbin, "clearing shutdown flag");
3089       g_atomic_int_set (&priv->shutdown, 0);
3090       break;
3091     case GST_STATE_CHANGE_PAUSED_TO_READY:
3092       GST_LOG_OBJECT (rtpbin, "setting shutdown flag");
3093       g_atomic_int_set (&priv->shutdown, 1);
3094       /* wait for all callbacks to end by taking the lock. No new callbacks will
3095        * be able to happen as we set the shutdown flag. */
3096       GST_RTP_BIN_DYN_LOCK (rtpbin);
3097       GST_LOG_OBJECT (rtpbin, "dynamic lock taken, we can continue shutdown");
3098       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
3099       break;
3100     default:
3101       break;
3102   }
3103
3104   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3105
3106   switch (transition) {
3107     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3108       break;
3109     case GST_STATE_CHANGE_PAUSED_TO_READY:
3110       break;
3111     case GST_STATE_CHANGE_READY_TO_NULL:
3112       break;
3113     default:
3114       break;
3115   }
3116   return res;
3117 }
3118
3119 static GstElement *
3120 session_request_element (GstRtpBinSession * session, guint signal)
3121 {
3122   GstElement *element = NULL;
3123   GstRtpBin *bin = session->bin;
3124
3125   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, &element);
3126
3127   if (element) {
3128     if (!bin_manage_element (bin, element))
3129       goto manage_failed;
3130     session->elements = g_slist_prepend (session->elements, element);
3131   }
3132   return element;
3133
3134   /* ERRORS */
3135 manage_failed:
3136   {
3137     GST_WARNING_OBJECT (bin, "unable to manage element");
3138     gst_object_unref (element);
3139     return NULL;
3140   }
3141 }
3142
3143 static gboolean
3144 copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
3145 {
3146   GstPad *gpad = GST_PAD_CAST (user_data);
3147
3148   GST_DEBUG_OBJECT (gpad, "store sticky event %" GST_PTR_FORMAT, *event);
3149   gst_pad_store_sticky_event (gpad, *event);
3150
3151   return TRUE;
3152 }
3153
3154 /* a new pad (SSRC) was created in @session. This signal is emited from the
3155  * payload demuxer. */
3156 static void
3157 new_payload_found (GstElement * element, guint pt, GstPad * pad,
3158     GstRtpBinStream * stream)
3159 {
3160   GstRtpBin *rtpbin;
3161   GstElementClass *klass;
3162   GstPadTemplate *templ;
3163   gchar *padname;
3164   GstPad *gpad;
3165
3166   rtpbin = stream->bin;
3167
3168   GST_DEBUG_OBJECT (rtpbin, "new payload pad %u", pt);
3169
3170   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
3171
3172   /* ghost the pad to the parent */
3173   klass = GST_ELEMENT_GET_CLASS (rtpbin);
3174   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
3175   padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
3176       stream->session->id, stream->ssrc, pt);
3177   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
3178   g_free (padname);
3179   g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", gpad);
3180
3181   gst_pad_set_active (gpad, TRUE);
3182   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
3183
3184   gst_pad_sticky_events_foreach (pad, copy_sticky_events, gpad);
3185   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
3186
3187   return;
3188
3189 shutdown:
3190   {
3191     GST_DEBUG ("ignoring, we are shutting down");
3192     return;
3193   }
3194 }
3195
3196 static void
3197 payload_pad_removed (GstElement * element, GstPad * pad,
3198     GstRtpBinStream * stream)
3199 {
3200   GstRtpBin *rtpbin;
3201   GstPad *gpad;
3202
3203   rtpbin = stream->bin;
3204
3205   GST_DEBUG ("payload pad removed");
3206
3207   GST_RTP_BIN_DYN_LOCK (rtpbin);
3208   if ((gpad = g_object_get_data (G_OBJECT (pad), "GstRTPBin.ghostpad"))) {
3209     g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", NULL);
3210
3211     gst_pad_set_active (gpad, FALSE);
3212     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), gpad);
3213   }
3214   GST_RTP_BIN_DYN_UNLOCK (rtpbin);
3215 }
3216
3217 static GstCaps *
3218 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
3219 {
3220   GstRtpBin *rtpbin;
3221   GstCaps *caps;
3222
3223   rtpbin = session->bin;
3224
3225   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %u in session %u", pt,
3226       session->id);
3227
3228   caps = get_pt_map (session, pt);
3229   if (!caps)
3230     goto no_caps;
3231
3232   return caps;
3233
3234   /* ERRORS */
3235 no_caps:
3236   {
3237     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
3238     return NULL;
3239   }
3240 }
3241
3242 static void
3243 payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session)
3244 {
3245   GST_DEBUG_OBJECT (session->bin,
3246       "emiting signal for pt type changed to %u in session %u", pt,
3247       session->id);
3248
3249   g_signal_emit (session->bin, gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE],
3250       0, session->id, pt);
3251 }
3252
3253 /* emited when caps changed for the session */
3254 static void
3255 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
3256 {
3257   GstRtpBin *bin;
3258   GstCaps *caps;
3259   gint payload;
3260   const GstStructure *s;
3261
3262   bin = session->bin;
3263
3264   g_object_get (pad, "caps", &caps, NULL);
3265
3266   if (caps == NULL)
3267     return;
3268
3269   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
3270
3271   s = gst_caps_get_structure (caps, 0);
3272
3273   /* get payload, finish when it's not there */
3274   if (!gst_structure_get_int (s, "payload", &payload)) {
3275     gst_caps_unref (caps);
3276     return;
3277   }
3278
3279   GST_RTP_SESSION_LOCK (session);
3280   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
3281   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
3282   GST_RTP_SESSION_UNLOCK (session);
3283 }
3284
3285 /* a new pad (SSRC) was created in @session */
3286 static void
3287 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
3288     GstRtpBinSession * session)
3289 {
3290   GstRtpBin *rtpbin;
3291   GstRtpBinStream *stream;
3292   GstPad *sinkpad, *srcpad;
3293   gchar *padname;
3294
3295   rtpbin = session->bin;
3296
3297   GST_DEBUG_OBJECT (rtpbin, "new SSRC pad %08x, %s:%s", ssrc,
3298       GST_DEBUG_PAD_NAME (pad));
3299
3300   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
3301
3302   GST_RTP_SESSION_LOCK (session);
3303
3304   /* create new stream */
3305   stream = create_stream (session, ssrc);
3306   if (!stream)
3307     goto no_stream;
3308
3309   /* get pad and link */
3310   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP");
3311   padname = g_strdup_printf ("src_%u", ssrc);
3312   srcpad = gst_element_get_static_pad (element, padname);
3313   g_free (padname);
3314   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
3315   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
3316   gst_object_unref (sinkpad);
3317   gst_object_unref (srcpad);
3318
3319   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
3320   padname = g_strdup_printf ("rtcp_src_%u", ssrc);
3321   srcpad = gst_element_get_static_pad (element, padname);
3322   g_free (padname);
3323   sinkpad = gst_element_get_request_pad (stream->buffer, "sink_rtcp");
3324   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
3325   gst_object_unref (sinkpad);
3326   gst_object_unref (srcpad);
3327
3328   /* connect to the RTCP sync signal from the jitterbuffer */
3329   GST_DEBUG_OBJECT (rtpbin, "connecting sync signal");
3330   stream->buffer_handlesync_sig = g_signal_connect (stream->buffer,
3331       "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream);
3332
3333   if (stream->demux) {
3334     /* connect to the new-pad signal of the payload demuxer, this will expose the
3335      * new pad by ghosting it. */
3336     stream->demux_newpad_sig = g_signal_connect (stream->demux,
3337         "new-payload-type", (GCallback) new_payload_found, stream);
3338     stream->demux_padremoved_sig = g_signal_connect (stream->demux,
3339         "pad-removed", (GCallback) payload_pad_removed, stream);
3340
3341     /* connect to the request-pt-map signal. This signal will be emited by the
3342      * demuxer so that it can apply a proper caps on the buffers for the
3343      * depayloaders. */
3344     stream->demux_ptreq_sig = g_signal_connect (stream->demux,
3345         "request-pt-map", (GCallback) pt_map_requested, session);
3346     /* connect to the  signal so it can be forwarded. */
3347     stream->demux_ptchange_sig = g_signal_connect (stream->demux,
3348         "payload-type-change", (GCallback) payload_type_change, session);
3349   } else {
3350     /* add rtpjitterbuffer src pad to pads */
3351     GstElementClass *klass;
3352     GstPadTemplate *templ;
3353     gchar *padname;
3354     GstPad *gpad, *pad;
3355
3356     pad = gst_element_get_static_pad (stream->buffer, "src");
3357
3358     /* ghost the pad to the parent */
3359     klass = GST_ELEMENT_GET_CLASS (rtpbin);
3360     templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
3361     padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
3362         stream->session->id, stream->ssrc, 255);
3363     gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
3364     g_free (padname);
3365
3366     gst_pad_set_active (gpad, TRUE);
3367     gst_pad_sticky_events_foreach (pad, copy_sticky_events, gpad);
3368     gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
3369
3370     gst_object_unref (pad);
3371   }
3372
3373   GST_RTP_SESSION_UNLOCK (session);
3374   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
3375
3376   return;
3377
3378   /* ERRORS */
3379 shutdown:
3380   {
3381     GST_DEBUG_OBJECT (rtpbin, "we are shutting down");
3382     return;
3383   }
3384 no_stream:
3385   {
3386     GST_RTP_SESSION_UNLOCK (session);
3387     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
3388     GST_DEBUG_OBJECT (rtpbin, "could not create stream");
3389     return;
3390   }
3391 }
3392
3393 static void
3394 session_maybe_create_bundle_demuxer (GstRtpBinSession * session)
3395 {
3396   GstRtpBin *rtpbin;
3397
3398   if (session->bundle_demux)
3399     return;
3400
3401   rtpbin = session->bin;
3402   if (g_signal_has_handler_pending (rtpbin,
3403           gst_rtp_bin_signals[SIGNAL_ON_BUNDLED_SSRC], 0, TRUE)) {
3404     GST_DEBUG_OBJECT (rtpbin, "Adding a bundle SSRC demuxer to session %u",
3405         session->id);
3406     session->bundle_demux = gst_element_factory_make ("rtpssrcdemux", NULL);
3407     session->bundle_demux_newpad_sig = g_signal_connect (session->bundle_demux,
3408         "new-ssrc-pad", (GCallback) new_bundled_ssrc_pad_found, session);
3409
3410     gst_bin_add (GST_BIN_CAST (rtpbin), session->bundle_demux);
3411     gst_element_sync_state_with_parent (session->bundle_demux);
3412   } else {
3413     GST_DEBUG_OBJECT (rtpbin,
3414         "No handler for the on-bundled-ssrc signal so no need for a bundle SSRC demuxer in session %u",
3415         session->id);
3416   }
3417 }
3418
3419 static GstPad *
3420 complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session,
3421     gboolean bundle_demuxer_needed)
3422 {
3423   guint sessid = session->id;
3424   GstPad *recv_rtp_sink;
3425   GstPad *funnel_src;
3426   GstElement *decoder;
3427
3428   g_assert (!session->recv_rtp_sink);
3429
3430   /* get recv_rtp pad and store */
3431   session->recv_rtp_sink =
3432       gst_element_get_request_pad (session->session, "recv_rtp_sink");
3433   if (session->recv_rtp_sink == NULL)
3434     goto pad_failed;
3435
3436   g_signal_connect (session->recv_rtp_sink, "notify::caps",
3437       (GCallback) caps_changed, session);
3438
3439   if (bundle_demuxer_needed)
3440     session_maybe_create_bundle_demuxer (session);
3441
3442   GST_DEBUG_OBJECT (rtpbin, "requesting RTP decoder");
3443   decoder = session_request_element (session, SIGNAL_REQUEST_RTP_DECODER);
3444   if (decoder) {
3445     GstPad *decsrc, *decsink;
3446     GstPadLinkReturn ret;
3447
3448     GST_DEBUG_OBJECT (rtpbin, "linking RTP decoder");
3449     decsink = gst_element_get_static_pad (decoder, "rtp_sink");
3450     if (decsink == NULL)
3451       goto dec_sink_failed;
3452
3453     recv_rtp_sink = decsink;
3454
3455     decsrc = gst_element_get_static_pad (decoder, "rtp_src");
3456     if (decsrc == NULL)
3457       goto dec_src_failed;
3458
3459     if (session->bundle_demux) {
3460       GstPad *demux_sink;
3461       demux_sink = gst_element_get_static_pad (session->bundle_demux, "sink");
3462       ret = gst_pad_link (decsrc, demux_sink);
3463       gst_object_unref (demux_sink);
3464     } else {
3465       ret = gst_pad_link (decsrc, session->recv_rtp_sink);
3466     }
3467     gst_object_unref (decsrc);
3468
3469     if (ret != GST_PAD_LINK_OK)
3470       goto dec_link_failed;
3471
3472   } else {
3473     GST_DEBUG_OBJECT (rtpbin, "no RTP decoder given");
3474     if (session->bundle_demux) {
3475       recv_rtp_sink =
3476           gst_element_get_static_pad (session->bundle_demux, "sink");
3477     } else {
3478       recv_rtp_sink =
3479           gst_element_get_request_pad (session->rtp_funnel, "sink_%u");
3480     }
3481   }
3482
3483   funnel_src = gst_element_get_static_pad (session->rtp_funnel, "src");
3484   gst_pad_link (funnel_src, session->recv_rtp_sink);
3485   gst_object_unref (funnel_src);
3486
3487   return recv_rtp_sink;
3488
3489   /* ERRORS */
3490 pad_failed:
3491   {
3492     g_warning ("rtpbin: failed to get session recv_rtp_sink pad");
3493     return NULL;
3494   }
3495 dec_sink_failed:
3496   {
3497     g_warning ("rtpbin: failed to get decoder sink pad for session %u", sessid);
3498     return NULL;
3499   }
3500 dec_src_failed:
3501   {
3502     g_warning ("rtpbin: failed to get decoder src pad for session %u", sessid);
3503     gst_object_unref (recv_rtp_sink);
3504     return NULL;
3505   }
3506 dec_link_failed:
3507   {
3508     g_warning ("rtpbin: failed to link rtp decoder for session %u", sessid);
3509     gst_object_unref (recv_rtp_sink);
3510     return NULL;
3511   }
3512 }
3513
3514 static void
3515 complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session,
3516     guint sessid)
3517 {
3518   GstElement *aux;
3519   GstPad *recv_rtp_src;
3520
3521   g_assert (!session->recv_rtp_src);
3522
3523   session->recv_rtp_src =
3524       gst_element_get_static_pad (session->session, "recv_rtp_src");
3525   if (session->recv_rtp_src == NULL)
3526     goto pad_failed;
3527
3528   /* find out if we need AUX elements or if we can go into the SSRC demuxer
3529    * directly */
3530   aux = session_request_element (session, SIGNAL_REQUEST_AUX_RECEIVER);
3531   if (aux) {
3532     gchar *pname;
3533     GstPad *auxsink;
3534     GstPadLinkReturn ret;
3535
3536     GST_DEBUG_OBJECT (rtpbin, "linking AUX receiver");
3537
3538     pname = g_strdup_printf ("sink_%u", sessid);
3539     auxsink = gst_element_get_static_pad (aux, pname);
3540     g_free (pname);
3541     if (auxsink == NULL)
3542       goto aux_sink_failed;
3543
3544     ret = gst_pad_link (session->recv_rtp_src, auxsink);
3545     gst_object_unref (auxsink);
3546     if (ret != GST_PAD_LINK_OK)
3547       goto aux_link_failed;
3548
3549     /* this can be NULL when this AUX element is not to be linked to
3550      * an SSRC demuxer */
3551     pname = g_strdup_printf ("src_%u", sessid);
3552     recv_rtp_src = gst_element_get_static_pad (aux, pname);
3553     g_free (pname);
3554   } else {
3555     recv_rtp_src = gst_object_ref (session->recv_rtp_src);
3556   }
3557
3558   if (recv_rtp_src) {
3559     GstPad *sinkdpad;
3560
3561     GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
3562     sinkdpad = gst_element_get_static_pad (session->demux, "sink");
3563     GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
3564     gst_pad_link_full (recv_rtp_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
3565     gst_object_unref (sinkdpad);
3566     gst_object_unref (recv_rtp_src);
3567
3568     /* connect to the new-ssrc-pad signal of the SSRC demuxer */
3569     session->demux_newpad_sig = g_signal_connect (session->demux,
3570         "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
3571     session->demux_padremoved_sig = g_signal_connect (session->demux,
3572         "removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session);
3573   }
3574
3575   return;
3576
3577 pad_failed:
3578   {
3579     g_warning ("rtpbin: failed to get session recv_rtp_src pad");
3580     return;
3581   }
3582 aux_sink_failed:
3583   {
3584     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
3585     return;
3586   }
3587 aux_link_failed:
3588   {
3589     g_warning ("rtpbin: failed to link AUX pad to session %u", sessid);
3590     return;
3591   }
3592 }
3593
3594 /* Create a pad for receiving RTP for the session in @name. Must be called with
3595  * RTP_BIN_LOCK.
3596  */
3597 static GstPad *
3598 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
3599 {
3600   guint sessid;
3601   GstRtpBinSession *session;
3602   GstPad *recv_rtp_sink;
3603
3604   /* first get the session number */
3605   if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1)
3606     goto no_name;
3607
3608   GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
3609
3610   /* get or create session */
3611   session = find_session_by_id (rtpbin, sessid);
3612   if (!session) {
3613     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
3614     /* create session now */
3615     session = create_session (rtpbin, sessid);
3616     if (session == NULL)
3617       goto create_error;
3618   }
3619
3620   /* check if pad was requested */
3621   if (session->recv_rtp_sink_ghost != NULL)
3622     return session->recv_rtp_sink_ghost;
3623
3624   /* setup the session sink pad */
3625   recv_rtp_sink = complete_session_sink (rtpbin, session, TRUE);
3626   if (!recv_rtp_sink)
3627     goto session_sink_failed;
3628
3629
3630   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
3631   session->recv_rtp_sink_ghost =
3632       gst_ghost_pad_new_from_template (name, recv_rtp_sink, templ);
3633   gst_object_unref (recv_rtp_sink);
3634   gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE);
3635   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost);
3636
3637   complete_session_receiver (rtpbin, session, sessid);
3638
3639   return session->recv_rtp_sink_ghost;
3640
3641   /* ERRORS */
3642 no_name:
3643   {
3644     g_warning ("rtpbin: invalid name given");
3645     return NULL;
3646   }
3647 create_error:
3648   {
3649     /* create_session already warned */
3650     return NULL;
3651   }
3652 session_sink_failed:
3653   {
3654     /* warning already done */
3655     return NULL;
3656   }
3657 }
3658
3659 static void
3660 remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3661 {
3662   if (session->demux_newpad_sig) {
3663     g_signal_handler_disconnect (session->demux, session->demux_newpad_sig);
3664     session->demux_newpad_sig = 0;
3665   }
3666   if (session->demux_padremoved_sig) {
3667     g_signal_handler_disconnect (session->demux, session->demux_padremoved_sig);
3668     session->demux_padremoved_sig = 0;
3669   }
3670   if (session->bundle_demux_newpad_sig) {
3671     g_signal_handler_disconnect (session->bundle_demux,
3672         session->bundle_demux_newpad_sig);
3673     session->bundle_demux_newpad_sig = 0;
3674   }
3675   if (session->recv_rtp_src) {
3676     gst_object_unref (session->recv_rtp_src);
3677     session->recv_rtp_src = NULL;
3678   }
3679   if (session->recv_rtp_sink) {
3680     gst_element_release_request_pad (session->session, session->recv_rtp_sink);
3681     gst_object_unref (session->recv_rtp_sink);
3682     session->recv_rtp_sink = NULL;
3683   }
3684   if (session->recv_rtp_sink_ghost) {
3685     gst_pad_set_active (session->recv_rtp_sink_ghost, FALSE);
3686     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3687         session->recv_rtp_sink_ghost);
3688     session->recv_rtp_sink_ghost = NULL;
3689   }
3690 }
3691
3692 static GstPad *
3693 complete_session_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session,
3694     guint sessid, gboolean bundle_demuxer_needed)
3695 {
3696   GstElement *decoder;
3697   GstPad *sinkdpad;
3698   GstPad *decsink = NULL;
3699   GstPad *funnel_src;
3700
3701   /* get recv_rtp pad and store */
3702   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
3703   session->recv_rtcp_sink =
3704       gst_element_get_request_pad (session->session, "recv_rtcp_sink");
3705   if (session->recv_rtcp_sink == NULL)
3706     goto pad_failed;
3707
3708   if (bundle_demuxer_needed)
3709     session_maybe_create_bundle_demuxer (session);
3710
3711   GST_DEBUG_OBJECT (rtpbin, "getting RTCP decoder");
3712   decoder = session_request_element (session, SIGNAL_REQUEST_RTCP_DECODER);
3713   if (decoder) {
3714     GstPad *decsrc;
3715     GstPadLinkReturn ret;
3716
3717     GST_DEBUG_OBJECT (rtpbin, "linking RTCP decoder");
3718     decsink = gst_element_get_static_pad (decoder, "rtcp_sink");
3719     decsrc = gst_element_get_static_pad (decoder, "rtcp_src");
3720
3721     if (decsink == NULL)
3722       goto dec_sink_failed;
3723
3724     if (decsrc == NULL)
3725       goto dec_src_failed;
3726
3727     if (session->bundle_demux) {
3728       GstPad *demux_sink;
3729       demux_sink =
3730           gst_element_get_static_pad (session->bundle_demux, "rtcp_sink");
3731       ret = gst_pad_link (decsrc, demux_sink);
3732       gst_object_unref (demux_sink);
3733     } else {
3734       ret = gst_pad_link (decsrc, session->recv_rtcp_sink);
3735     }
3736     gst_object_unref (decsrc);
3737
3738     if (ret != GST_PAD_LINK_OK)
3739       goto dec_link_failed;
3740   } else {
3741     GST_DEBUG_OBJECT (rtpbin, "no RTCP decoder given");
3742     if (session->bundle_demux) {
3743       decsink = gst_element_get_static_pad (session->bundle_demux, "rtcp_sink");
3744     } else {
3745       decsink = gst_element_get_request_pad (session->rtcp_funnel, "sink_%u");
3746     }
3747   }
3748
3749   /* get srcpad, link to SSRCDemux */
3750   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
3751   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
3752   if (session->sync_src == NULL)
3753     goto src_pad_failed;
3754
3755   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
3756   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
3757   gst_pad_link_full (session->sync_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
3758   gst_object_unref (sinkdpad);
3759
3760   funnel_src = gst_element_get_static_pad (session->rtcp_funnel, "src");
3761   gst_pad_link (funnel_src, session->recv_rtcp_sink);
3762   gst_object_unref (funnel_src);
3763
3764   return decsink;
3765
3766 pad_failed:
3767   {
3768     g_warning ("rtpbin: failed to get session rtcp_sink pad");
3769     return NULL;
3770   }
3771 dec_sink_failed:
3772   {
3773     g_warning ("rtpbin: failed to get decoder sink pad for session %u", sessid);
3774     return NULL;
3775   }
3776 dec_src_failed:
3777   {
3778     g_warning ("rtpbin: failed to get decoder src pad for session %u", sessid);
3779     goto cleanup;
3780   }
3781 dec_link_failed:
3782   {
3783     g_warning ("rtpbin: failed to link rtcp decoder for session %u", sessid);
3784     goto cleanup;
3785   }
3786 src_pad_failed:
3787   {
3788     g_warning ("rtpbin: failed to get session sync_src pad");
3789   }
3790
3791 cleanup:
3792   gst_object_unref (decsink);
3793   return NULL;
3794 }
3795
3796 /* Create a pad for receiving RTCP for the session in @name. Must be called with
3797  * RTP_BIN_LOCK.
3798  */
3799 static GstPad *
3800 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
3801     const gchar * name)
3802 {
3803   guint sessid;
3804   GstRtpBinSession *session;
3805   GstPad *decsink = NULL;
3806
3807   /* first get the session number */
3808   if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1)
3809     goto no_name;
3810
3811   GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
3812
3813   /* get or create the session */
3814   session = find_session_by_id (rtpbin, sessid);
3815   if (!session) {
3816     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
3817     /* create session now */
3818     session = create_session (rtpbin, sessid);
3819     if (session == NULL)
3820       goto create_error;
3821   }
3822
3823   /* check if pad was requested */
3824   if (session->recv_rtcp_sink_ghost != NULL)
3825     return session->recv_rtcp_sink_ghost;
3826
3827   decsink = complete_session_rtcp (rtpbin, session, sessid, TRUE);
3828   if (!decsink)
3829     goto create_error;
3830
3831   session->recv_rtcp_sink_ghost =
3832       gst_ghost_pad_new_from_template (name, decsink, templ);
3833   gst_object_unref (decsink);
3834   gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE);
3835   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin),
3836       session->recv_rtcp_sink_ghost);
3837
3838   return session->recv_rtcp_sink_ghost;
3839
3840   /* ERRORS */
3841 no_name:
3842   {
3843     g_warning ("rtpbin: invalid name given");
3844     return NULL;
3845   }
3846 create_error:
3847   {
3848     /* create_session already warned */
3849     return NULL;
3850   }
3851 }
3852
3853 static void
3854 remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3855 {
3856   if (session->recv_rtcp_sink_ghost) {
3857     gst_pad_set_active (session->recv_rtcp_sink_ghost, FALSE);
3858     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3859         session->recv_rtcp_sink_ghost);
3860     session->recv_rtcp_sink_ghost = NULL;
3861   }
3862   if (session->sync_src) {
3863     /* releasing the request pad should also unref the sync pad */
3864     gst_object_unref (session->sync_src);
3865     session->sync_src = NULL;
3866   }
3867   if (session->recv_rtcp_sink) {
3868     gst_element_release_request_pad (session->session, session->recv_rtcp_sink);
3869     gst_object_unref (session->recv_rtcp_sink);
3870     session->recv_rtcp_sink = NULL;
3871   }
3872 }
3873
3874 static gboolean
3875 complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session)
3876 {
3877   gchar *gname;
3878   guint sessid = session->id;
3879   GstPad *send_rtp_src;
3880   GstElement *encoder;
3881   GstElementClass *klass;
3882   GstPadTemplate *templ;
3883
3884   /* get srcpad */
3885   session->send_rtp_src =
3886       gst_element_get_static_pad (session->session, "send_rtp_src");
3887   if (session->send_rtp_src == NULL)
3888     goto no_srcpad;
3889
3890   GST_DEBUG_OBJECT (rtpbin, "getting RTP encoder");
3891   encoder = session_request_element (session, SIGNAL_REQUEST_RTP_ENCODER);
3892   if (encoder) {
3893     gchar *ename;
3894     GstPad *encsrc, *encsink;
3895     GstPadLinkReturn ret;
3896
3897     GST_DEBUG_OBJECT (rtpbin, "linking RTP encoder");
3898     ename = g_strdup_printf ("rtp_src_%u", sessid);
3899     encsrc = gst_element_get_static_pad (encoder, ename);
3900     g_free (ename);
3901
3902     if (encsrc == NULL)
3903       goto enc_src_failed;
3904
3905     send_rtp_src = encsrc;
3906
3907     ename = g_strdup_printf ("rtp_sink_%u", sessid);
3908     encsink = gst_element_get_static_pad (encoder, ename);
3909     g_free (ename);
3910     if (encsink == NULL)
3911       goto enc_sink_failed;
3912
3913     ret = gst_pad_link (session->send_rtp_src, encsink);
3914     gst_object_unref (encsink);
3915
3916     if (ret != GST_PAD_LINK_OK)
3917       goto enc_link_failed;
3918   } else {
3919     GST_DEBUG_OBJECT (rtpbin, "no RTP encoder given");
3920     send_rtp_src = gst_object_ref (session->send_rtp_src);
3921   }
3922
3923   /* ghost the new source pad */
3924   klass = GST_ELEMENT_GET_CLASS (rtpbin);
3925   gname = g_strdup_printf ("send_rtp_src_%u", sessid);
3926   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u");
3927   session->send_rtp_src_ghost =
3928       gst_ghost_pad_new_from_template (gname, send_rtp_src, templ);
3929   gst_object_unref (send_rtp_src);
3930   gst_pad_set_active (session->send_rtp_src_ghost, TRUE);
3931   gst_pad_sticky_events_foreach (send_rtp_src, copy_sticky_events,
3932       session->send_rtp_src_ghost);
3933   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost);
3934   g_free (gname);
3935
3936   return TRUE;
3937
3938   /* ERRORS */
3939 no_srcpad:
3940   {
3941     g_warning ("rtpbin: failed to get rtp source pad for session %u", sessid);
3942     return FALSE;
3943   }
3944 enc_src_failed:
3945   {
3946     g_warning ("rtpbin: failed to get encoder src pad for session %u", sessid);
3947     return FALSE;
3948   }
3949 enc_sink_failed:
3950   {
3951     g_warning ("rtpbin: failed to get encoder sink pad for session %u", sessid);
3952     gst_object_unref (send_rtp_src);
3953     return FALSE;
3954   }
3955 enc_link_failed:
3956   {
3957     g_warning ("rtpbin: failed to link rtp encoder for session %u", sessid);
3958     gst_object_unref (send_rtp_src);
3959     return FALSE;
3960   }
3961 }
3962
3963 static gboolean
3964 setup_aux_sender_fold (const GValue * item, GValue * result, gpointer user_data)
3965 {
3966   GstPad *pad;
3967   gchar *name;
3968   guint sessid;
3969   GstRtpBinSession *session = user_data, *newsess;
3970   GstRtpBin *rtpbin = session->bin;
3971   GstPadLinkReturn ret;
3972
3973   pad = g_value_get_object (item);
3974   name = gst_pad_get_name (pad);
3975
3976   if (name == NULL || sscanf (name, "src_%u", &sessid) != 1)
3977     goto no_name;
3978
3979   g_free (name);
3980
3981   newsess = find_session_by_id (rtpbin, sessid);
3982   if (newsess == NULL) {
3983     /* create new session */
3984     newsess = create_session (rtpbin, sessid);
3985     if (newsess == NULL)
3986       goto create_error;
3987   } else if (newsess->send_rtp_sink != NULL)
3988     goto existing_session;
3989
3990   /* get send_rtp pad and store */
3991   newsess->send_rtp_sink =
3992       gst_element_get_request_pad (newsess->session, "send_rtp_sink");
3993   if (newsess->send_rtp_sink == NULL)
3994     goto pad_failed;
3995
3996   ret = gst_pad_link (pad, newsess->send_rtp_sink);
3997   if (ret != GST_PAD_LINK_OK)
3998     goto aux_link_failed;
3999
4000   if (!complete_session_src (rtpbin, newsess))
4001     goto session_src_failed;
4002
4003   return TRUE;
4004
4005   /* ERRORS */
4006 no_name:
4007   {
4008     GST_WARNING ("ignoring invalid pad name %s", GST_STR_NULL (name));
4009     g_free (name);
4010     return TRUE;
4011   }
4012 create_error:
4013   {
4014     /* create_session already warned */
4015     return FALSE;
4016   }
4017 existing_session:
4018   {
4019     g_warning ("rtpbin: session %u is already a sender", sessid);
4020     return FALSE;
4021   }
4022 pad_failed:
4023   {
4024     g_warning ("rtpbin: failed to get session pad for session %u", sessid);
4025     return FALSE;
4026   }
4027 aux_link_failed:
4028   {
4029     g_warning ("rtpbin: failed to link AUX for session %u", sessid);
4030     return FALSE;
4031   }
4032 session_src_failed:
4033   {
4034     g_warning ("rtpbin: failed to complete AUX for session %u", sessid);
4035     return FALSE;
4036   }
4037 }
4038
4039 static gboolean
4040 setup_aux_sender (GstRtpBin * rtpbin, GstRtpBinSession * session,
4041     GstElement * aux)
4042 {
4043   GstIterator *it;
4044   GValue result = { 0, };
4045   GstIteratorResult res;
4046
4047   it = gst_element_iterate_src_pads (aux);
4048   res = gst_iterator_fold (it, setup_aux_sender_fold, &result, session);
4049   gst_iterator_free (it);
4050
4051   return res == GST_ITERATOR_DONE;
4052 }
4053
4054 /* Create a pad for sending RTP for the session in @name. Must be called with
4055  * RTP_BIN_LOCK.
4056  */
4057 static GstPad *
4058 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
4059 {
4060   gchar *pname;
4061   guint sessid;
4062   GstPad *send_rtp_sink;
4063   GstElement *aux;
4064   GstRtpBinSession *session;
4065
4066   /* first get the session number */
4067   if (name == NULL || sscanf (name, "send_rtp_sink_%u", &sessid) != 1)
4068     goto no_name;
4069
4070   /* get or create session */
4071   session = find_session_by_id (rtpbin, sessid);
4072   if (!session) {
4073     /* create session now */
4074     session = create_session (rtpbin, sessid);
4075     if (session == NULL)
4076       goto create_error;
4077   }
4078
4079   /* check if pad was requested */
4080   if (session->send_rtp_sink_ghost != NULL)
4081     return session->send_rtp_sink_ghost;
4082
4083   /* check if we are already using this session as a sender */
4084   if (session->send_rtp_sink != NULL)
4085     goto existing_session;
4086
4087   GST_DEBUG_OBJECT (rtpbin, "getting RTP AUX sender");
4088   aux = session_request_element (session, SIGNAL_REQUEST_AUX_SENDER);
4089   if (aux) {
4090     GST_DEBUG_OBJECT (rtpbin, "linking AUX sender");
4091     if (!setup_aux_sender (rtpbin, session, aux))
4092       goto aux_session_failed;
4093
4094     pname = g_strdup_printf ("sink_%u", sessid);
4095     send_rtp_sink = gst_element_get_static_pad (aux, pname);
4096     g_free (pname);
4097
4098     if (send_rtp_sink == NULL)
4099       goto aux_sink_failed;
4100   } else {
4101     /* get send_rtp pad and store */
4102     session->send_rtp_sink =
4103         gst_element_get_request_pad (session->session, "send_rtp_sink");
4104     if (session->send_rtp_sink == NULL)
4105       goto pad_failed;
4106
4107     if (!complete_session_src (rtpbin, session))
4108       goto session_src_failed;
4109
4110     send_rtp_sink = gst_object_ref (session->send_rtp_sink);
4111   }
4112
4113   session->send_rtp_sink_ghost =
4114       gst_ghost_pad_new_from_template (name, send_rtp_sink, templ);
4115   gst_object_unref (send_rtp_sink);
4116   gst_pad_set_active (session->send_rtp_sink_ghost, TRUE);
4117   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_sink_ghost);
4118
4119   return session->send_rtp_sink_ghost;
4120
4121   /* ERRORS */
4122 no_name:
4123   {
4124     g_warning ("rtpbin: invalid name given");
4125     return NULL;
4126   }
4127 create_error:
4128   {
4129     /* create_session already warned */
4130     return NULL;
4131   }
4132 existing_session:
4133   {
4134     g_warning ("rtpbin: session %u is already in use", sessid);
4135     return NULL;
4136   }
4137 aux_session_failed:
4138   {
4139     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
4140     return NULL;
4141   }
4142 aux_sink_failed:
4143   {
4144     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
4145     return NULL;
4146   }
4147 pad_failed:
4148   {
4149     g_warning ("rtpbin: failed to get session pad for session %u", sessid);
4150     return NULL;
4151   }
4152 session_src_failed:
4153   {
4154     g_warning ("rtpbin: failed to setup source pads for session %u", sessid);
4155     return NULL;
4156   }
4157 }
4158
4159 static void
4160 remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
4161 {
4162   if (session->send_rtp_src_ghost) {
4163     gst_pad_set_active (session->send_rtp_src_ghost, FALSE);
4164     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
4165         session->send_rtp_src_ghost);
4166     session->send_rtp_src_ghost = NULL;
4167   }
4168   if (session->send_rtp_src) {
4169     gst_object_unref (session->send_rtp_src);
4170     session->send_rtp_src = NULL;
4171   }
4172   if (session->send_rtp_sink) {
4173     gst_element_release_request_pad (GST_ELEMENT_CAST (session->session),
4174         session->send_rtp_sink);
4175     gst_object_unref (session->send_rtp_sink);
4176     session->send_rtp_sink = NULL;
4177   }
4178   if (session->send_rtp_sink_ghost) {
4179     gst_pad_set_active (session->send_rtp_sink_ghost, FALSE);
4180     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
4181         session->send_rtp_sink_ghost);
4182     session->send_rtp_sink_ghost = NULL;
4183   }
4184 }
4185
4186 /* Create a pad for sending RTCP for the session in @name. Must be called with
4187  * RTP_BIN_LOCK.
4188  */
4189 static GstPad *
4190 create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
4191 {
4192   guint sessid;
4193   GstPad *encsrc;
4194   GstElement *encoder;
4195   GstRtpBinSession *session;
4196
4197   /* first get the session number */
4198   if (name == NULL || sscanf (name, "send_rtcp_src_%u", &sessid) != 1)
4199     goto no_name;
4200
4201   /* get or create session */
4202   session = find_session_by_id (rtpbin, sessid);
4203   if (!session)
4204     goto no_session;
4205
4206   /* check if pad was requested */
4207   if (session->send_rtcp_src_ghost != NULL)
4208     return session->send_rtcp_src_ghost;
4209
4210   /* get rtcp_src pad and store */
4211   session->send_rtcp_src =
4212       gst_element_get_request_pad (session->session, "send_rtcp_src");
4213   if (session->send_rtcp_src == NULL)
4214     goto pad_failed;
4215
4216   GST_DEBUG_OBJECT (rtpbin, "getting RTCP encoder");
4217   encoder = session_request_element (session, SIGNAL_REQUEST_RTCP_ENCODER);
4218   if (encoder) {
4219     gchar *ename;
4220     GstPad *encsink;
4221     GstPadLinkReturn ret;
4222
4223     GST_DEBUG_OBJECT (rtpbin, "linking RTCP encoder");
4224
4225     ename = g_strdup_printf ("rtcp_src_%u", sessid);
4226     encsrc = gst_element_get_static_pad (encoder, ename);
4227     g_free (ename);
4228     if (encsrc == NULL)
4229       goto enc_src_failed;
4230
4231     ename = g_strdup_printf ("rtcp_sink_%u", sessid);
4232     encsink = gst_element_get_static_pad (encoder, ename);
4233     g_free (ename);
4234     if (encsink == NULL)
4235       goto enc_sink_failed;
4236
4237     ret = gst_pad_link (session->send_rtcp_src, encsink);
4238     gst_object_unref (encsink);
4239
4240     if (ret != GST_PAD_LINK_OK)
4241       goto enc_link_failed;
4242   } else {
4243     GST_DEBUG_OBJECT (rtpbin, "no RTCP encoder given");
4244     encsrc = gst_object_ref (session->send_rtcp_src);
4245   }
4246
4247   session->send_rtcp_src_ghost =
4248       gst_ghost_pad_new_from_template (name, encsrc, templ);
4249   gst_object_unref (encsrc);
4250   gst_pad_set_active (session->send_rtcp_src_ghost, TRUE);
4251   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtcp_src_ghost);
4252
4253   return session->send_rtcp_src_ghost;
4254
4255   /* ERRORS */
4256 no_name:
4257   {
4258     g_warning ("rtpbin: invalid name given");
4259     return NULL;
4260   }
4261 no_session:
4262   {
4263     g_warning ("rtpbin: session with id %d does not exist", sessid);
4264     return NULL;
4265   }
4266 pad_failed:
4267   {
4268     g_warning ("rtpbin: failed to get rtcp pad for session %u", sessid);
4269     return NULL;
4270   }
4271 enc_src_failed:
4272   {
4273     g_warning ("rtpbin: failed to get encoder src pad for session %u", sessid);
4274     return NULL;
4275   }
4276 enc_sink_failed:
4277   {
4278     g_warning ("rtpbin: failed to get encoder sink pad for session %u", sessid);
4279     gst_object_unref (encsrc);
4280     return NULL;
4281   }
4282 enc_link_failed:
4283   {
4284     g_warning ("rtpbin: failed to link rtcp encoder for session %u", sessid);
4285     gst_object_unref (encsrc);
4286     return NULL;
4287   }
4288 }
4289
4290 static void
4291 remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
4292 {
4293   if (session->send_rtcp_src_ghost) {
4294     gst_pad_set_active (session->send_rtcp_src_ghost, FALSE);
4295     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
4296         session->send_rtcp_src_ghost);
4297     session->send_rtcp_src_ghost = NULL;
4298   }
4299   if (session->send_rtcp_src) {
4300     gst_element_release_request_pad (session->session, session->send_rtcp_src);
4301     gst_object_unref (session->send_rtcp_src);
4302     session->send_rtcp_src = NULL;
4303   }
4304 }
4305
4306 /* If the requested name is NULL we should create a name with
4307  * the session number assuming we want the lowest posible session
4308  * with a free pad like the template */
4309 static gchar *
4310 gst_rtp_bin_get_free_pad_name (GstElement * element, GstPadTemplate * templ)
4311 {
4312   gboolean name_found = FALSE;
4313   gint session = 0;
4314   GstIterator *pad_it = NULL;
4315   gchar *pad_name = NULL;
4316   GValue data = { 0, };
4317
4318   GST_DEBUG_OBJECT (element, "find a free pad name for template");
4319   while (!name_found) {
4320     gboolean done = FALSE;
4321
4322     g_free (pad_name);
4323     pad_name = g_strdup_printf (templ->name_template, session++);
4324     pad_it = gst_element_iterate_pads (GST_ELEMENT (element));
4325     name_found = TRUE;
4326     while (!done) {
4327       switch (gst_iterator_next (pad_it, &data)) {
4328         case GST_ITERATOR_OK:
4329         {
4330           GstPad *pad;
4331           gchar *name;
4332
4333           pad = g_value_get_object (&data);
4334           name = gst_pad_get_name (pad);
4335
4336           if (strcmp (name, pad_name) == 0) {
4337             done = TRUE;
4338             name_found = FALSE;
4339           }
4340           g_free (name);
4341           g_value_reset (&data);
4342           break;
4343         }
4344         case GST_ITERATOR_ERROR:
4345         case GST_ITERATOR_RESYNC:
4346           /* restart iteration */
4347           done = TRUE;
4348           name_found = FALSE;
4349           session = 0;
4350           break;
4351         case GST_ITERATOR_DONE:
4352           done = TRUE;
4353           break;
4354       }
4355     }
4356     g_value_unset (&data);
4357     gst_iterator_free (pad_it);
4358   }
4359
4360   GST_DEBUG_OBJECT (element, "free pad name found: '%s'", pad_name);
4361   return pad_name;
4362 }
4363
4364 /*
4365  */
4366 static GstPad *
4367 gst_rtp_bin_request_new_pad (GstElement * element,
4368     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
4369 {
4370   GstRtpBin *rtpbin;
4371   GstElementClass *klass;
4372   GstPad *result;
4373
4374   gchar *pad_name = NULL;
4375
4376   g_return_val_if_fail (templ != NULL, NULL);
4377   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
4378
4379   rtpbin = GST_RTP_BIN (element);
4380   klass = GST_ELEMENT_GET_CLASS (element);
4381
4382   GST_RTP_BIN_LOCK (rtpbin);
4383
4384   if (name == NULL) {
4385     /* use a free pad name */
4386     pad_name = gst_rtp_bin_get_free_pad_name (element, templ);
4387   } else {
4388     /* use the provided name */
4389     pad_name = g_strdup (name);
4390   }
4391
4392   GST_DEBUG_OBJECT (rtpbin, "Trying to request a pad with name %s", pad_name);
4393
4394   /* figure out the template */
4395   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
4396     result = create_recv_rtp (rtpbin, templ, pad_name);
4397   } else if (templ == gst_element_class_get_pad_template (klass,
4398           "recv_rtcp_sink_%u")) {
4399     result = create_recv_rtcp (rtpbin, templ, pad_name);
4400   } else if (templ == gst_element_class_get_pad_template (klass,
4401           "send_rtp_sink_%u")) {
4402     result = create_send_rtp (rtpbin, templ, pad_name);
4403   } else if (templ == gst_element_class_get_pad_template (klass,
4404           "send_rtcp_src_%u")) {
4405     result = create_rtcp (rtpbin, templ, pad_name);
4406   } else
4407     goto wrong_template;
4408
4409   g_free (pad_name);
4410   GST_RTP_BIN_UNLOCK (rtpbin);
4411
4412   return result;
4413
4414   /* ERRORS */
4415 wrong_template:
4416   {
4417     g_free (pad_name);
4418     GST_RTP_BIN_UNLOCK (rtpbin);
4419     g_warning ("rtpbin: this is not our template");
4420     return NULL;
4421   }
4422 }
4423
4424 static void
4425 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
4426 {
4427   GstRtpBinSession *session;
4428   GstRtpBin *rtpbin;
4429
4430   g_return_if_fail (GST_IS_GHOST_PAD (pad));
4431   g_return_if_fail (GST_IS_RTP_BIN (element));
4432
4433   rtpbin = GST_RTP_BIN (element);
4434
4435   GST_RTP_BIN_LOCK (rtpbin);
4436   GST_DEBUG_OBJECT (rtpbin, "Trying to release pad %s:%s",
4437       GST_DEBUG_PAD_NAME (pad));
4438
4439   if (!(session = find_session_by_pad (rtpbin, pad)))
4440     goto unknown_pad;
4441
4442   if (session->recv_rtp_sink_ghost == pad) {
4443     remove_recv_rtp (rtpbin, session);
4444   } else if (session->recv_rtcp_sink_ghost == pad) {
4445     remove_recv_rtcp (rtpbin, session);
4446   } else if (session->send_rtp_sink_ghost == pad) {
4447     remove_send_rtp (rtpbin, session);
4448   } else if (session->send_rtcp_src_ghost == pad) {
4449     remove_rtcp (rtpbin, session);
4450   }
4451
4452   /* no more request pads, free the complete session */
4453   if (session->recv_rtp_sink_ghost == NULL
4454       && session->recv_rtcp_sink_ghost == NULL
4455       && session->send_rtp_sink_ghost == NULL
4456       && session->send_rtcp_src_ghost == NULL) {
4457     GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session);
4458     rtpbin->sessions = g_slist_remove (rtpbin->sessions, session);
4459     free_session (session, rtpbin);
4460   }
4461   GST_RTP_BIN_UNLOCK (rtpbin);
4462
4463   return;
4464
4465   /* ERROR */
4466 unknown_pad:
4467   {
4468     GST_RTP_BIN_UNLOCK (rtpbin);
4469     g_warning ("rtpbin: %s:%s is not one of our request pads",
4470         GST_DEBUG_PAD_NAME (pad));
4471     return;
4472   }
4473 }