rtpbin: Also log local and SR RTP running times when doing ntp-sync=true
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-rtpbin
22  * @see_also: rtpjitterbuffer, rtpsession, rtpptdemux, rtpssrcdemux
23  *
24  * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
25  * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
26  * RTP sessions that will be synchronized together using RTCP SR packets.
27  *
28  * #GstRtpBin is configured with a number of request pads that define the
29  * functionality that is activated, similar to the #GstRtpSession element.
30  *
31  * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_\%u pad. The session
32  * number must be specified in the pad name.
33  * Data received on the recv_rtp_sink_\%u pad will be processed in the #GstRtpSession
34  * manager and after being validated forwarded on #GstRtpSsrcDemux element. Each
35  * RTP stream is demuxed based on the SSRC and send to a #GstRtpJitterBuffer. After
36  * the packets are released from the jitterbuffer, they will be forwarded to a
37  * #GstRtpPtDemux element. The #GstRtpPtDemux element will demux the packets based
38  * on the payload type and will create a unique pad recv_rtp_src_\%u_\%u_\%u on
39  * rtpbin with the session number, SSRC and payload type respectively as the pad
40  * name.
41  *
42  * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_\%u pad. The
43  * session number must be specified in the pad name.
44  *
45  * If you want the session manager to generate and send RTCP packets, request
46  * the send_rtcp_src_\%u pad with the session number in the pad name. Packet pushed
47  * on this pad contain SR/RR RTCP reports that should be sent to all participants
48  * in the session.
49  *
50  * To use #GstRtpBin as a sender, request a send_rtp_sink_\%u pad, which will
51  * automatically create a send_rtp_src_\%u pad. If the session number is not provided,
52  * the pad from the lowest available session will be returned. The session manager will modify the
53  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
54  * send_rtp_src_\%u pad after updating its internal state.
55  *
56  * #GstRtpBin can also demultiplex incoming bundled streams. The first
57  * #GstRtpSession will have a #GstRtpSsrcDemux element splitting the streams
58  * based on their SSRC and potentially dispatched to a different #GstRtpSession.
59  * Because retransmission SSRCs need to be merged with the corresponding media
60  * stream the #GstRtpBin::on-bundled-ssrc signal is emitted so that the
61  * application can find out to which session the SSRC belongs.
62  *
63  * The session manager needs the clock-rate of the payload types it is handling
64  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
65  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
66  * signal.
67  *
68  * Access to the internal statistics of rtpbin is provided with the
69  * get-internal-session property. This action signal gives access to the
70  * RTPSession object which further provides action signals to retrieve the
71  * internal source and other sources.
72  *
73  * #GstRtpBin also has signals (#GstRtpBin::request-rtp-encoder,
74  * #GstRtpBin::request-rtp-decoder, #GstRtpBin::request-rtcp-encoder and
75  * #GstRtpBin::request-rtp-decoder) to dynamically request for RTP and RTCP encoders
76  * and decoders in order to support SRTP. The encoders must provide the pads
77  * rtp_sink_\%u and rtp_src_\%u for RTP and rtcp_sink_\%u and rtcp_src_\%u for
78  * RTCP. The session number will be used in the pad name. The decoders must provide
79  * rtp_sink and rtp_src for RTP and rtcp_sink and rtcp_src for RTCP. The decoders will
80  * be placed before the #GstRtpSession element, thus they must support SSRC demuxing
81  * internally.
82  *
83  * #GstRtpBin has signals (#GstRtpBin::request-aux-sender and
84  * #GstRtpBin::request-aux-receiver to dynamically request an element that can be
85  * used to create or merge additional RTP streams. AUX elements are needed to
86  * implement FEC or retransmission (such as RFC 4588). An AUX sender must have one
87  * sink_\%u pad that matches the sessionid in the signal and it should have 1 or
88  * more src_\%u pads. For each src_%\u pad, a session will be made (if needed)
89  * and the pad will be linked to the session send_rtp_sink pad. Each session will
90  * then expose its source pad as send_rtp_src_\%u on #GstRtpBin.
91  * An AUX receiver has 1 src_\%u pad that much match the sessionid in the signal
92  * and 1 or more sink_\%u pads. A session will be made for each sink_\%u pad
93  * when the corresponding recv_rtp_sink_\%u pad is requested on #GstRtpBin.
94  *
95  * <refsect2>
96  * <title>Example pipelines</title>
97  * |[
98  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
99  *     rtpbin ! rtptheoradepay ! theoradec ! xvimagesink
100  * ]| Receive RTP data from port 5000 and send to the session 0 in rtpbin.
101  * |[
102  * gst-launch-1.0 rtpbin name=rtpbin \
103  *         v4l2src ! videoconvert ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
104  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
105  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
106  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
107  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
108  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
109  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
110  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
111  * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
112  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
113  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
114  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
115  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
116  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
117  * is received on port 5007. Since RTCP packets from the sender should be sent
118  * as soon as possible and do not participate in preroll, sync=false and
119  * async=false is configured on udpsink
120  * |[
121  * gst-launch-1.0 -v rtpbin name=rtpbin                                          \
122  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
123  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
124  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
125  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
126  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
127  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
128  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
129  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
130  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
131  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
132  * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
133  * decode and display the video.
134  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
135  * decode and play the audio.
136  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
137  * session 1 on port 5003. These packets will be used for session management and
138  * synchronisation.
139  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
140  * on port 5007.
141  * </refsect2>
142  */
143
144 #ifdef HAVE_CONFIG_H
145 #include "config.h"
146 #endif
147 #include <stdio.h>
148 #include <string.h>
149
150 #include <gst/rtp/gstrtpbuffer.h>
151 #include <gst/rtp/gstrtcpbuffer.h>
152
153 #include "gstrtpbin.h"
154 #include "rtpsession.h"
155 #include "gstrtpsession.h"
156 #include "gstrtpjitterbuffer.h"
157
158 #include <gst/glib-compat-private.h>
159
160 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
161 #define GST_CAT_DEFAULT gst_rtp_bin_debug
162
163 /* sink pads */
164 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
165     GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
166     GST_PAD_SINK,
167     GST_PAD_REQUEST,
168     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
169     );
170
171 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
172     GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
173     GST_PAD_SINK,
174     GST_PAD_REQUEST,
175     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
176     );
177
178 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%u",
180     GST_PAD_SINK,
181     GST_PAD_REQUEST,
182     GST_STATIC_CAPS ("application/x-rtp")
183     );
184
185 /* src pads */
186 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
187 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
188     GST_PAD_SRC,
189     GST_PAD_SOMETIMES,
190     GST_STATIC_CAPS ("application/x-rtp")
191     );
192
193 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
194     GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%u",
195     GST_PAD_SRC,
196     GST_PAD_REQUEST,
197     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
198     );
199
200 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
201     GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%u",
202     GST_PAD_SRC,
203     GST_PAD_SOMETIMES,
204     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
205     );
206
207 #define GST_RTP_BIN_GET_PRIVATE(obj)  \
208    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
209
210 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock (&(bin)->priv->bin_lock)
211 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock (&(bin)->priv->bin_lock)
212
213 /* lock to protect dynamic callbacks, like pad-added and new ssrc. */
214 #define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock (&(bin)->priv->dyn_lock)
215 #define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock (&(bin)->priv->dyn_lock)
216
217 /* lock for shutdown */
218 #define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
219 G_STMT_START {                                   \
220   if (g_atomic_int_get (&bin->priv->shutdown))   \
221     goto label;                                  \
222   GST_RTP_BIN_DYN_LOCK (bin);                    \
223   if (g_atomic_int_get (&bin->priv->shutdown)) { \
224     GST_RTP_BIN_DYN_UNLOCK (bin);                \
225     goto label;                                  \
226   }                                              \
227 } G_STMT_END
228
229 /* unlock for shutdown */
230 #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
231   GST_RTP_BIN_DYN_UNLOCK (bin);                  \
232
233 struct _GstRtpBinPrivate
234 {
235   GMutex bin_lock;
236
237   /* lock protecting dynamic adding/removing */
238   GMutex dyn_lock;
239
240   /* if we are shutting down or not */
241   gint shutdown;
242
243   gboolean autoremove;
244
245   /* NTP time in ns of last SR sync used */
246   guint64 last_ntpnstime;
247
248   /* list of extra elements */
249   GList *elements;
250 };
251
252 /* signals and args */
253 enum
254 {
255   SIGNAL_REQUEST_PT_MAP,
256   SIGNAL_PAYLOAD_TYPE_CHANGE,
257   SIGNAL_CLEAR_PT_MAP,
258   SIGNAL_RESET_SYNC,
259   SIGNAL_GET_SESSION,
260   SIGNAL_GET_INTERNAL_SESSION,
261
262   SIGNAL_ON_NEW_SSRC,
263   SIGNAL_ON_SSRC_COLLISION,
264   SIGNAL_ON_SSRC_VALIDATED,
265   SIGNAL_ON_SSRC_ACTIVE,
266   SIGNAL_ON_SSRC_SDES,
267   SIGNAL_ON_BYE_SSRC,
268   SIGNAL_ON_BYE_TIMEOUT,
269   SIGNAL_ON_TIMEOUT,
270   SIGNAL_ON_SENDER_TIMEOUT,
271   SIGNAL_ON_NPT_STOP,
272
273   SIGNAL_REQUEST_RTP_ENCODER,
274   SIGNAL_REQUEST_RTP_DECODER,
275   SIGNAL_REQUEST_RTCP_ENCODER,
276   SIGNAL_REQUEST_RTCP_DECODER,
277
278   SIGNAL_NEW_JITTERBUFFER,
279
280   SIGNAL_REQUEST_AUX_SENDER,
281   SIGNAL_REQUEST_AUX_RECEIVER,
282
283   SIGNAL_ON_NEW_SENDER_SSRC,
284   SIGNAL_ON_SENDER_SSRC_ACTIVE,
285
286   SIGNAL_ON_BUNDLED_SSRC,
287
288   LAST_SIGNAL
289 };
290
291 #define DEFAULT_LATENCY_MS           200
292 #define DEFAULT_DROP_ON_LATENCY      FALSE
293 #define DEFAULT_SDES                 NULL
294 #define DEFAULT_DO_LOST              FALSE
295 #define DEFAULT_IGNORE_PT            FALSE
296 #define DEFAULT_NTP_SYNC             FALSE
297 #define DEFAULT_AUTOREMOVE           FALSE
298 #define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
299 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
300 #define DEFAULT_RTCP_SYNC            GST_RTP_BIN_RTCP_SYNC_ALWAYS
301 #define DEFAULT_RTCP_SYNC_INTERVAL   0
302 #define DEFAULT_DO_SYNC_EVENT        FALSE
303 #define DEFAULT_DO_RETRANSMISSION    FALSE
304 #define DEFAULT_RTP_PROFILE          GST_RTP_PROFILE_AVP
305 #define DEFAULT_NTP_TIME_SOURCE      GST_RTP_NTP_TIME_SOURCE_NTP
306 #define DEFAULT_RTCP_SYNC_SEND_TIME  TRUE
307 #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
308 #define DEFAULT_MAX_DROPOUT_TIME     60000
309 #define DEFAULT_MAX_MISORDER_TIME    2000
310 #define DEFAULT_RFC7273_SYNC         FALSE
311 #define DEFAULT_MAX_STREAMS          G_MAXUINT
312
313 enum
314 {
315   PROP_0,
316   PROP_LATENCY,
317   PROP_DROP_ON_LATENCY,
318   PROP_SDES,
319   PROP_DO_LOST,
320   PROP_IGNORE_PT,
321   PROP_NTP_SYNC,
322   PROP_RTCP_SYNC,
323   PROP_RTCP_SYNC_INTERVAL,
324   PROP_AUTOREMOVE,
325   PROP_BUFFER_MODE,
326   PROP_USE_PIPELINE_CLOCK,
327   PROP_DO_SYNC_EVENT,
328   PROP_DO_RETRANSMISSION,
329   PROP_RTP_PROFILE,
330   PROP_NTP_TIME_SOURCE,
331   PROP_RTCP_SYNC_SEND_TIME,
332   PROP_MAX_RTCP_RTP_TIME_DIFF,
333   PROP_MAX_DROPOUT_TIME,
334   PROP_MAX_MISORDER_TIME,
335   PROP_RFC7273_SYNC,
336   PROP_MAX_STREAMS
337 };
338
339 #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
340 static GType
341 gst_rtp_bin_rtcp_sync_get_type (void)
342 {
343   static GType rtcp_sync_type = 0;
344   static const GEnumValue rtcp_sync_types[] = {
345     {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
346     {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
347     {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
348     {0, NULL, NULL},
349   };
350
351   if (!rtcp_sync_type) {
352     rtcp_sync_type = g_enum_register_static ("GstRTCPSync", rtcp_sync_types);
353   }
354   return rtcp_sync_type;
355 }
356
357 /* helper objects */
358 typedef struct _GstRtpBinSession GstRtpBinSession;
359 typedef struct _GstRtpBinStream GstRtpBinStream;
360 typedef struct _GstRtpBinClient GstRtpBinClient;
361
362 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
363
364 static GstCaps *pt_map_requested (GstElement * element, guint pt,
365     GstRtpBinSession * session);
366 static void payload_type_change (GstElement * element, guint pt,
367     GstRtpBinSession * session);
368 static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
369 static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
370 static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
371 static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
372 static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
373 static void free_stream (GstRtpBinStream * stream, GstRtpBin * bin);
374 static GstRtpBinSession *create_session (GstRtpBin * rtpbin, gint id);
375 static GstPad *complete_session_sink (GstRtpBin * rtpbin,
376     GstRtpBinSession * session, gboolean bundle_demuxer_needed);
377 static void
378 complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session,
379     guint sessid);
380 static GstPad *complete_session_rtcp (GstRtpBin * rtpbin,
381     GstRtpBinSession * session, guint sessid, gboolean bundle_demuxer_needed);
382
383 /* Manages the RTP stream for one SSRC.
384  *
385  * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer.
386  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
387  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
388  * together (see below).
389  */
390 struct _GstRtpBinStream
391 {
392   /* the SSRC of this stream */
393   guint32 ssrc;
394
395   /* parent bin */
396   GstRtpBin *bin;
397
398   /* the session this SSRC belongs to */
399   GstRtpBinSession *session;
400
401   /* the jitterbuffer of the SSRC */
402   GstElement *buffer;
403   gulong buffer_handlesync_sig;
404   gulong buffer_ptreq_sig;
405   gulong buffer_ntpstop_sig;
406   gint percent;
407
408   /* the PT demuxer of the SSRC */
409   GstElement *demux;
410   gulong demux_newpad_sig;
411   gulong demux_padremoved_sig;
412   gulong demux_ptreq_sig;
413   gulong demux_ptchange_sig;
414
415   /* if we have calculated a valid rt_delta for this stream */
416   gboolean have_sync;
417   /* mapping to local RTP and NTP time */
418   gint64 rt_delta;
419   gint64 rtp_delta;
420   /* base rtptime in gst time */
421   gint64 clock_base;
422 };
423
424 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->lock)
425 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->lock)
426
427 /* Manages the receiving end of the packets.
428  *
429  * There is one such structure for each RTP session (audio/video/...).
430  * We get the RTP/RTCP packets and stuff them into the session manager. From
431  * there they are pushed into an SSRC demuxer that splits the stream based on
432  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
433  * the GstRtpBinStream above).
434  */
435 struct _GstRtpBinSession
436 {
437   /* session id */
438   gint id;
439   /* the parent bin */
440   GstRtpBin *bin;
441   /* the session element */
442   GstElement *session;
443   /* the SSRC demuxer */
444   GstElement *demux;
445   gulong demux_newpad_sig;
446   gulong demux_padremoved_sig;
447
448   /* Bundling support */
449   GstElement *rtp_funnel;
450   GstElement *rtcp_funnel;
451   GstElement *bundle_demux;
452   gulong bundle_demux_newpad_sig;
453
454   GMutex lock;
455
456   /* list of GstRtpBinStream */
457   GSList *streams;
458
459   /* list of elements */
460   GSList *elements;
461
462   /* mapping of payload type to caps */
463   GHashTable *ptmap;
464
465   /* the pads of the session */
466   GstPad *recv_rtp_sink;
467   GstPad *recv_rtp_sink_ghost;
468   GstPad *recv_rtp_src;
469   GstPad *recv_rtcp_sink;
470   GstPad *recv_rtcp_sink_ghost;
471   GstPad *sync_src;
472   GstPad *send_rtp_sink;
473   GstPad *send_rtp_sink_ghost;
474   GstPad *send_rtp_src;
475   GstPad *send_rtp_src_ghost;
476   GstPad *send_rtcp_src;
477   GstPad *send_rtcp_src_ghost;
478 };
479
480 /* Manages the RTP streams that come from one client and should therefore be
481  * synchronized.
482  */
483 struct _GstRtpBinClient
484 {
485   /* the common CNAME for the streams */
486   gchar *cname;
487   guint cname_len;
488
489   /* the streams */
490   guint nstreams;
491   GSList *streams;
492 };
493
494 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
495 static GstRtpBinSession *
496 find_session_by_id (GstRtpBin * rtpbin, gint id)
497 {
498   GSList *walk;
499
500   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
501     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
502
503     if (sess->id == id)
504       return sess;
505   }
506   return NULL;
507 }
508
509 /* find a session with the given request pad. Must be called with RTP_BIN_LOCK */
510 static GstRtpBinSession *
511 find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
512 {
513   GSList *walk;
514
515   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
516     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
517
518     if ((sess->recv_rtp_sink_ghost == pad) ||
519         (sess->recv_rtcp_sink_ghost == pad) ||
520         (sess->send_rtp_sink_ghost == pad)
521         || (sess->send_rtcp_src_ghost == pad))
522       return sess;
523   }
524   return NULL;
525 }
526
527 static void
528 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
529 {
530   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
531       sess->id, ssrc);
532 }
533
534 static void
535 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
536 {
537   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
538       sess->id, ssrc);
539 }
540
541 static void
542 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
543 {
544   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
545       sess->id, ssrc);
546 }
547
548 static void
549 on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
550 {
551   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
552       sess->id, ssrc);
553 }
554
555 static void
556 on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
557 {
558   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
559       sess->id, ssrc);
560 }
561
562 static void
563 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
564 {
565   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
566       sess->id, ssrc);
567 }
568
569 static void
570 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
571 {
572   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
573       sess->id, ssrc);
574
575   if (sess->bin->priv->autoremove)
576     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
577 }
578
579 static void
580 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
581 {
582   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
583       sess->id, ssrc);
584
585   if (sess->bin->priv->autoremove)
586     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
587 }
588
589 static void
590 on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
591 {
592   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
593       sess->id, ssrc);
594 }
595
596 static void
597 on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
598 {
599   g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0,
600       stream->session->id, stream->ssrc);
601 }
602
603 static void
604 on_new_sender_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
605 {
606   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SENDER_SSRC], 0,
607       sess->id, ssrc);
608 }
609
610 static void
611 on_sender_ssrc_active (GstElement * session, guint32 ssrc,
612     GstRtpBinSession * sess)
613 {
614   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE],
615       0, sess->id, ssrc);
616 }
617
618 /* must be called with the SESSION lock */
619 static GstRtpBinStream *
620 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
621 {
622   GSList *walk;
623
624   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
625     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
626
627     if (stream->ssrc == ssrc)
628       return stream;
629   }
630   return NULL;
631 }
632
633 static void
634 ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad,
635     GstRtpBinSession * session)
636 {
637   GstRtpBinStream *stream = NULL;
638   GstRtpBin *rtpbin;
639
640   rtpbin = session->bin;
641
642   GST_RTP_BIN_LOCK (rtpbin);
643
644   GST_RTP_SESSION_LOCK (session);
645   if ((stream = find_stream_by_ssrc (session, ssrc)))
646     session->streams = g_slist_remove (session->streams, stream);
647   GST_RTP_SESSION_UNLOCK (session);
648
649   if (stream)
650     free_stream (stream, rtpbin);
651
652   GST_RTP_BIN_UNLOCK (rtpbin);
653 }
654
655 static void
656 new_bundled_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
657     GstRtpBinSession * session)
658 {
659   GValue result = G_VALUE_INIT;
660   GValue params[2] = { G_VALUE_INIT, G_VALUE_INIT };
661   guint session_id = 0;
662   GstRtpBinSession *target_session = NULL;
663   GstRtpBin *rtpbin = session->bin;
664   gchar *name;
665   GstPad *src_pad;
666   GstPad *recv_rtp_sink = NULL;
667   GstPad *recv_rtcp_sink = NULL;
668   GstPadLinkReturn ret;
669
670   GST_RTP_BIN_DYN_LOCK (rtpbin);
671   GST_DEBUG_OBJECT (rtpbin, "new bundled SSRC pad %08x, %s:%s", ssrc,
672       GST_DEBUG_PAD_NAME (pad));
673
674   g_value_init (&result, G_TYPE_UINT);
675   g_value_init (&params[0], GST_TYPE_ELEMENT);
676   g_value_set_object (&params[0], rtpbin);
677   g_value_init (&params[1], G_TYPE_UINT);
678   g_value_set_uint (&params[1], ssrc);
679
680   g_signal_emitv (params,
681       gst_rtp_bin_signals[SIGNAL_ON_BUNDLED_SSRC], 0, &result);
682   g_value_unset (&params[0]);
683
684   session_id = g_value_get_uint (&result);
685   if (session_id == 0) {
686     target_session = session;
687   } else {
688     target_session = find_session_by_id (rtpbin, (gint) session_id);
689     if (!target_session) {
690       target_session = create_session (rtpbin, session_id);
691     }
692     if (!target_session) {
693       /* create_session() warned already */
694       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
695       return;
696     }
697
698     if (!target_session->recv_rtp_sink) {
699       recv_rtp_sink = complete_session_sink (rtpbin, target_session, FALSE);
700     }
701
702     if (!target_session->recv_rtp_src)
703       complete_session_receiver (rtpbin, target_session, session_id);
704
705     if (!target_session->recv_rtcp_sink) {
706       recv_rtcp_sink =
707           complete_session_rtcp (rtpbin, target_session, session_id, FALSE);
708     }
709   }
710
711   GST_DEBUG_OBJECT (rtpbin, "Assigning bundled ssrc %u to session %u", ssrc,
712       session_id);
713
714   if (!recv_rtp_sink) {
715     recv_rtp_sink =
716         gst_element_get_request_pad (target_session->rtp_funnel, "sink_%u");
717   }
718
719   if (!recv_rtcp_sink) {
720     recv_rtcp_sink =
721         gst_element_get_request_pad (target_session->rtcp_funnel, "sink_%u");
722   }
723
724   name = g_strdup_printf ("src_%u", ssrc);
725   src_pad = gst_element_get_static_pad (element, name);
726   ret = gst_pad_link (src_pad, recv_rtp_sink);
727   g_free (name);
728   gst_object_unref (src_pad);
729   gst_object_unref (recv_rtp_sink);
730   if (ret != GST_PAD_LINK_OK) {
731     g_warning
732         ("rtpbin: failed to link bundle demuxer to receive rtp funnel for session %u",
733         session_id);
734   }
735
736   name = g_strdup_printf ("rtcp_src_%u", ssrc);
737   src_pad = gst_element_get_static_pad (element, name);
738   gst_pad_link (src_pad, recv_rtcp_sink);
739   g_free (name);
740   gst_object_unref (src_pad);
741   gst_object_unref (recv_rtcp_sink);
742   if (ret != GST_PAD_LINK_OK) {
743     g_warning
744         ("rtpbin: failed to link bundle demuxer to receive rtcp sink pad for session %u",
745         session_id);
746   }
747
748   GST_RTP_BIN_DYN_UNLOCK (rtpbin);
749 }
750
751 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
752 static GstRtpBinSession *
753 create_session (GstRtpBin * rtpbin, gint id)
754 {
755   GstRtpBinSession *sess;
756   GstElement *session, *demux;
757   GstState target;
758
759   if (!(session = gst_element_factory_make ("rtpsession", NULL)))
760     goto no_session;
761
762   if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
763     goto no_demux;
764
765   sess = g_new0 (GstRtpBinSession, 1);
766   g_mutex_init (&sess->lock);
767   sess->id = id;
768   sess->bin = rtpbin;
769   sess->session = session;
770   sess->demux = demux;
771
772   sess->rtp_funnel = gst_element_factory_make ("funnel", NULL);
773   sess->rtcp_funnel = gst_element_factory_make ("funnel", NULL);
774
775   sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
776       (GDestroyNotify) gst_caps_unref);
777   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
778
779   /* configure SDES items */
780   GST_OBJECT_LOCK (rtpbin);
781   g_object_set (session, "sdes", rtpbin->sdes, "rtp-profile",
782       rtpbin->rtp_profile, "rtcp-sync-send-time", rtpbin->rtcp_sync_send_time,
783       NULL);
784   if (rtpbin->use_pipeline_clock)
785     g_object_set (session, "use-pipeline-clock", rtpbin->use_pipeline_clock,
786         NULL);
787   else
788     g_object_set (session, "ntp-time-source", rtpbin->ntp_time_source, NULL);
789
790   g_object_set (session, "max-dropout-time", rtpbin->max_dropout_time,
791       "max-misorder-time", rtpbin->max_misorder_time, NULL);
792   GST_OBJECT_UNLOCK (rtpbin);
793
794   /* provide clock_rate to the session manager when needed */
795   g_signal_connect (session, "request-pt-map",
796       (GCallback) pt_map_requested, sess);
797
798   g_signal_connect (sess->session, "on-new-ssrc",
799       (GCallback) on_new_ssrc, sess);
800   g_signal_connect (sess->session, "on-ssrc-collision",
801       (GCallback) on_ssrc_collision, sess);
802   g_signal_connect (sess->session, "on-ssrc-validated",
803       (GCallback) on_ssrc_validated, sess);
804   g_signal_connect (sess->session, "on-ssrc-active",
805       (GCallback) on_ssrc_active, sess);
806   g_signal_connect (sess->session, "on-ssrc-sdes",
807       (GCallback) on_ssrc_sdes, sess);
808   g_signal_connect (sess->session, "on-bye-ssrc",
809       (GCallback) on_bye_ssrc, sess);
810   g_signal_connect (sess->session, "on-bye-timeout",
811       (GCallback) on_bye_timeout, sess);
812   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
813   g_signal_connect (sess->session, "on-sender-timeout",
814       (GCallback) on_sender_timeout, sess);
815   g_signal_connect (sess->session, "on-new-sender-ssrc",
816       (GCallback) on_new_sender_ssrc, sess);
817   g_signal_connect (sess->session, "on-sender-ssrc-active",
818       (GCallback) on_sender_ssrc_active, sess);
819
820   gst_bin_add (GST_BIN_CAST (rtpbin), session);
821   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
822   gst_bin_add (GST_BIN_CAST (rtpbin), sess->rtp_funnel);
823   gst_bin_add (GST_BIN_CAST (rtpbin), sess->rtcp_funnel);
824
825   GST_OBJECT_LOCK (rtpbin);
826   target = GST_STATE_TARGET (rtpbin);
827   GST_OBJECT_UNLOCK (rtpbin);
828
829   /* change state only to what's needed */
830   gst_element_set_state (demux, target);
831   gst_element_set_state (session, target);
832   gst_element_set_state (sess->rtp_funnel, target);
833   gst_element_set_state (sess->rtcp_funnel, target);
834
835   return sess;
836
837   /* ERRORS */
838 no_session:
839   {
840     g_warning ("rtpbin: could not create rtpsession element");
841     return NULL;
842   }
843 no_demux:
844   {
845     gst_object_unref (session);
846     g_warning ("rtpbin: could not create rtpssrcdemux element");
847     return NULL;
848   }
849 }
850
851 static gboolean
852 bin_manage_element (GstRtpBin * bin, GstElement * element)
853 {
854   GstRtpBinPrivate *priv = bin->priv;
855
856   if (g_list_find (priv->elements, element)) {
857     GST_DEBUG_OBJECT (bin, "requested element %p already in bin", element);
858   } else {
859     GST_DEBUG_OBJECT (bin, "adding requested element %p", element);
860     if (!gst_bin_add (GST_BIN_CAST (bin), element))
861       goto add_failed;
862     if (!gst_element_sync_state_with_parent (element))
863       GST_WARNING_OBJECT (bin, "unable to sync element state with rtpbin");
864   }
865   /* we add the element multiple times, each we need an equal number of
866    * removes to really remove the element from the bin */
867   priv->elements = g_list_prepend (priv->elements, element);
868
869   return TRUE;
870
871   /* ERRORS */
872 add_failed:
873   {
874     GST_WARNING_OBJECT (bin, "unable to add element");
875     return FALSE;
876   }
877 }
878
879 static void
880 remove_bin_element (GstElement * element, GstRtpBin * bin)
881 {
882   GstRtpBinPrivate *priv = bin->priv;
883   GList *find;
884
885   find = g_list_find (priv->elements, element);
886   if (find) {
887     priv->elements = g_list_delete_link (priv->elements, find);
888
889     if (!g_list_find (priv->elements, element))
890       gst_bin_remove (GST_BIN_CAST (bin), element);
891     else
892       gst_object_unref (element);
893   }
894 }
895
896 /* called with RTP_BIN_LOCK */
897 static void
898 free_session (GstRtpBinSession * sess, GstRtpBin * bin)
899 {
900   GST_DEBUG_OBJECT (bin, "freeing session %p", sess);
901
902   gst_element_set_locked_state (sess->demux, TRUE);
903   gst_element_set_locked_state (sess->session, TRUE);
904
905   gst_element_set_state (sess->demux, GST_STATE_NULL);
906   gst_element_set_state (sess->session, GST_STATE_NULL);
907
908   remove_recv_rtp (bin, sess);
909   remove_recv_rtcp (bin, sess);
910   remove_send_rtp (bin, sess);
911   remove_rtcp (bin, sess);
912
913   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
914   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
915
916   g_slist_foreach (sess->elements, (GFunc) remove_bin_element, bin);
917   g_slist_free (sess->elements);
918
919   g_slist_foreach (sess->streams, (GFunc) free_stream, bin);
920   g_slist_free (sess->streams);
921
922   g_mutex_clear (&sess->lock);
923   g_hash_table_destroy (sess->ptmap);
924
925   g_free (sess);
926 }
927
928 /* get the payload type caps for the specific payload @pt in @session */
929 static GstCaps *
930 get_pt_map (GstRtpBinSession * session, guint pt)
931 {
932   GstCaps *caps = NULL;
933   GstRtpBin *bin;
934   GValue ret = { 0 };
935   GValue args[3] = { {0}, {0}, {0} };
936
937   GST_DEBUG ("searching pt %u in cache", pt);
938
939   GST_RTP_SESSION_LOCK (session);
940
941   /* first look in the cache */
942   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
943   if (caps) {
944     gst_caps_ref (caps);
945     goto done;
946   }
947
948   bin = session->bin;
949
950   GST_DEBUG ("emiting signal for pt %u in session %u", pt, session->id);
951
952   /* not in cache, send signal to request caps */
953   g_value_init (&args[0], GST_TYPE_ELEMENT);
954   g_value_set_object (&args[0], bin);
955   g_value_init (&args[1], G_TYPE_UINT);
956   g_value_set_uint (&args[1], session->id);
957   g_value_init (&args[2], G_TYPE_UINT);
958   g_value_set_uint (&args[2], pt);
959
960   g_value_init (&ret, GST_TYPE_CAPS);
961   g_value_set_boxed (&ret, NULL);
962
963   GST_RTP_SESSION_UNLOCK (session);
964
965   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
966
967   GST_RTP_SESSION_LOCK (session);
968
969   g_value_unset (&args[0]);
970   g_value_unset (&args[1]);
971   g_value_unset (&args[2]);
972
973   /* look in the cache again because we let the lock go */
974   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
975   if (caps) {
976     gst_caps_ref (caps);
977     g_value_unset (&ret);
978     goto done;
979   }
980
981   caps = (GstCaps *) g_value_dup_boxed (&ret);
982   g_value_unset (&ret);
983   if (!caps)
984     goto no_caps;
985
986   GST_DEBUG ("caching pt %u as %" GST_PTR_FORMAT, pt, caps);
987
988   /* store in cache, take additional ref */
989   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
990       gst_caps_ref (caps));
991
992 done:
993   GST_RTP_SESSION_UNLOCK (session);
994
995   return caps;
996
997   /* ERRORS */
998 no_caps:
999   {
1000     GST_RTP_SESSION_UNLOCK (session);
1001     GST_DEBUG ("no pt map could be obtained");
1002     return NULL;
1003   }
1004 }
1005
1006 static gboolean
1007 return_true (gpointer key, gpointer value, gpointer user_data)
1008 {
1009   return TRUE;
1010 }
1011
1012 static void
1013 gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
1014 {
1015   GSList *clients, *streams;
1016
1017   GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");
1018
1019   GST_RTP_BIN_LOCK (rtpbin);
1020   for (clients = rtpbin->clients; clients; clients = g_slist_next (clients)) {
1021     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
1022
1023     /* reset sync on all streams for this client */
1024     for (streams = client->streams; streams; streams = g_slist_next (streams)) {
1025       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1026
1027       /* make use require a new SR packet for this stream before we attempt new
1028        * lip-sync */
1029       stream->have_sync = FALSE;
1030       stream->rt_delta = 0;
1031       stream->rtp_delta = 0;
1032       stream->clock_base = -100 * GST_SECOND;
1033     }
1034   }
1035   GST_RTP_BIN_UNLOCK (rtpbin);
1036 }
1037
1038 static void
1039 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
1040 {
1041   GSList *sessions, *streams;
1042
1043   GST_RTP_BIN_LOCK (bin);
1044   GST_DEBUG_OBJECT (bin, "clearing pt map");
1045   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1046     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
1047
1048     GST_DEBUG_OBJECT (bin, "clearing session %p", session);
1049     g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
1050
1051     GST_RTP_SESSION_LOCK (session);
1052     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
1053
1054     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
1055       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1056
1057       GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
1058       g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
1059       if (stream->demux)
1060         g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
1061     }
1062     GST_RTP_SESSION_UNLOCK (session);
1063   }
1064   GST_RTP_BIN_UNLOCK (bin);
1065
1066   /* reset sync too */
1067   gst_rtp_bin_reset_sync (bin);
1068 }
1069
1070 static GstElement *
1071 gst_rtp_bin_get_session (GstRtpBin * bin, guint session_id)
1072 {
1073   GstRtpBinSession *session;
1074   GstElement *ret = NULL;
1075
1076   GST_RTP_BIN_LOCK (bin);
1077   GST_DEBUG_OBJECT (bin, "retrieving GstRtpSession, index: %u", session_id);
1078   session = find_session_by_id (bin, (gint) session_id);
1079   if (session) {
1080     ret = gst_object_ref (session->session);
1081   }
1082   GST_RTP_BIN_UNLOCK (bin);
1083
1084   return ret;
1085 }
1086
1087 static RTPSession *
1088 gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
1089 {
1090   RTPSession *internal_session = NULL;
1091   GstRtpBinSession *session;
1092
1093   GST_RTP_BIN_LOCK (bin);
1094   GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %u",
1095       session_id);
1096   session = find_session_by_id (bin, (gint) session_id);
1097   if (session) {
1098     g_object_get (session->session, "internal-session", &internal_session,
1099         NULL);
1100   }
1101   GST_RTP_BIN_UNLOCK (bin);
1102
1103   return internal_session;
1104 }
1105
1106 static GstElement *
1107 gst_rtp_bin_request_encoder (GstRtpBin * bin, guint session_id)
1108 {
1109   GST_DEBUG_OBJECT (bin, "return NULL encoder");
1110   return NULL;
1111 }
1112
1113 static GstElement *
1114 gst_rtp_bin_request_decoder (GstRtpBin * bin, guint session_id)
1115 {
1116   GST_DEBUG_OBJECT (bin, "return NULL decoder");
1117   return NULL;
1118 }
1119
1120 static void
1121 gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
1122     const gchar * name, const GValue * value)
1123 {
1124   GSList *sessions, *streams;
1125
1126   GST_RTP_BIN_LOCK (bin);
1127   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1128     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
1129
1130     GST_RTP_SESSION_LOCK (session);
1131     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
1132       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
1133
1134       g_object_set_property (G_OBJECT (stream->buffer), name, value);
1135     }
1136     GST_RTP_SESSION_UNLOCK (session);
1137   }
1138   GST_RTP_BIN_UNLOCK (bin);
1139 }
1140
1141 static void
1142 gst_rtp_bin_propagate_property_to_session (GstRtpBin * bin,
1143     const gchar * name, const GValue * value)
1144 {
1145   GSList *sessions;
1146
1147   GST_RTP_BIN_LOCK (bin);
1148   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
1149     GstRtpBinSession *sess = (GstRtpBinSession *) sessions->data;
1150
1151     g_object_set_property (G_OBJECT (sess->session), name, value);
1152   }
1153   GST_RTP_BIN_UNLOCK (bin);
1154 }
1155
1156 /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
1157 static GstRtpBinClient *
1158 get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
1159 {
1160   GstRtpBinClient *result = NULL;
1161   GSList *walk;
1162
1163   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
1164     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
1165
1166     if (len != client->cname_len)
1167       continue;
1168
1169     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
1170       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
1171           client->cname);
1172       result = client;
1173       break;
1174     }
1175   }
1176
1177   /* nothing found, create one */
1178   if (result == NULL) {
1179     result = g_new0 (GstRtpBinClient, 1);
1180     result->cname = g_strndup ((gchar *) data, len);
1181     result->cname_len = len;
1182     bin->clients = g_slist_prepend (bin->clients, result);
1183     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
1184         result->cname);
1185   }
1186   return result;
1187 }
1188
1189 static void
1190 free_client (GstRtpBinClient * client, GstRtpBin * bin)
1191 {
1192   GST_DEBUG_OBJECT (bin, "freeing client %p", client);
1193   g_slist_free (client->streams);
1194   g_free (client->cname);
1195   g_free (client);
1196 }
1197
1198 static void
1199 get_current_times (GstRtpBin * bin, GstClockTime * running_time,
1200     guint64 * ntpnstime)
1201 {
1202   guint64 ntpns = -1;
1203   GstClock *clock;
1204   GstClockTime base_time, rt, clock_time;
1205
1206   GST_OBJECT_LOCK (bin);
1207   if ((clock = GST_ELEMENT_CLOCK (bin))) {
1208     base_time = GST_ELEMENT_CAST (bin)->base_time;
1209     gst_object_ref (clock);
1210     GST_OBJECT_UNLOCK (bin);
1211
1212     /* get current clock time and convert to running time */
1213     clock_time = gst_clock_get_time (clock);
1214     rt = clock_time - base_time;
1215
1216     if (bin->use_pipeline_clock) {
1217       ntpns = rt;
1218       /* add constant to convert from 1970 based time to 1900 based time */
1219       ntpns += (2208988800LL * GST_SECOND);
1220     } else {
1221       switch (bin->ntp_time_source) {
1222         case GST_RTP_NTP_TIME_SOURCE_NTP:
1223         case GST_RTP_NTP_TIME_SOURCE_UNIX:{
1224           GTimeVal current;
1225
1226           /* get current NTP time */
1227           g_get_current_time (&current);
1228           ntpns = GST_TIMEVAL_TO_TIME (current);
1229
1230           /* add constant to convert from 1970 based time to 1900 based time */
1231           if (bin->ntp_time_source == GST_RTP_NTP_TIME_SOURCE_NTP)
1232             ntpns += (2208988800LL * GST_SECOND);
1233           break;
1234         }
1235         case GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME:
1236           ntpns = rt;
1237           break;
1238         case GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME:
1239           ntpns = clock_time;
1240           break;
1241         default:
1242           ntpns = -1;           /* Fix uninited compiler warning */
1243           g_assert_not_reached ();
1244           break;
1245       }
1246     }
1247
1248     gst_object_unref (clock);
1249   } else {
1250     GST_OBJECT_UNLOCK (bin);
1251     rt = -1;
1252     ntpns = -1;
1253   }
1254   if (running_time)
1255     *running_time = rt;
1256   if (ntpnstime)
1257     *ntpnstime = ntpns;
1258 }
1259
1260 static void
1261 stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
1262     gint64 ts_offset, gboolean check)
1263 {
1264   gint64 prev_ts_offset;
1265
1266   g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
1267
1268   /* delta changed, see how much */
1269   if (prev_ts_offset != ts_offset) {
1270     gint64 diff;
1271
1272     diff = prev_ts_offset - ts_offset;
1273
1274     GST_DEBUG_OBJECT (bin,
1275         "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
1276         ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff);
1277
1278     if (check) {
1279       /* only change diff when it changed more than 4 milliseconds. This
1280        * compensates for rounding errors in NTP to RTP timestamp
1281        * conversions */
1282       if (ABS (diff) < 4 * GST_MSECOND) {
1283         GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
1284         return;
1285       }
1286       if (ABS (diff) > (3 * GST_SECOND)) {
1287         GST_WARNING_OBJECT (bin, "offset unusually large, ignoring");
1288         return;
1289       }
1290     }
1291     g_object_set (stream->buffer, "ts-offset", ts_offset, NULL);
1292   }
1293   GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
1294       stream->ssrc, ts_offset);
1295 }
1296
1297 static void
1298 gst_rtp_bin_send_sync_event (GstRtpBinStream * stream)
1299 {
1300   if (stream->bin->send_sync_event) {
1301     GstEvent *event;
1302     GstPad *srcpad;
1303
1304     GST_DEBUG_OBJECT (stream->bin,
1305         "sending GstRTCPSRReceived event downstream");
1306
1307     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1308         gst_structure_new_empty ("GstRTCPSRReceived"));
1309
1310     srcpad = gst_element_get_static_pad (stream->buffer, "src");
1311     gst_pad_push_event (srcpad, event);
1312     gst_object_unref (srcpad);
1313   }
1314 }
1315
1316 /* associate a stream to the given CNAME. This will make sure all streams for
1317  * that CNAME are synchronized together.
1318  * Must be called with GST_RTP_BIN_LOCK */
1319 static void
1320 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
1321     guint8 * data, guint64 ntptime, guint64 last_extrtptime,
1322     guint64 base_rtptime, guint64 base_time, guint clock_rate,
1323     gint64 rtp_clock_base)
1324 {
1325   GstRtpBinClient *client;
1326   gboolean created;
1327   GSList *walk;
1328   GstClockTime running_time, running_time_rtp;
1329   guint64 ntpnstime;
1330
1331   /* first find or create the CNAME */
1332   client = get_client (bin, len, data, &created);
1333
1334   /* find stream in the client */
1335   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1336     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1337
1338     if (ostream == stream)
1339       break;
1340   }
1341   /* not found, add it to the list */
1342   if (walk == NULL) {
1343     GST_DEBUG_OBJECT (bin,
1344         "new association of SSRC %08x with client %p with CNAME %s",
1345         stream->ssrc, client, client->cname);
1346     client->streams = g_slist_prepend (client->streams, stream);
1347     client->nstreams++;
1348   } else {
1349     GST_DEBUG_OBJECT (bin,
1350         "found association of SSRC %08x with client %p with CNAME %s",
1351         stream->ssrc, client, client->cname);
1352   }
1353
1354   if (!GST_CLOCK_TIME_IS_VALID (last_extrtptime)) {
1355     GST_DEBUG_OBJECT (bin, "invalidated sync data");
1356     if (bin->rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1357       /* we don't need that data, so carry on,
1358        * but make some values look saner */
1359       last_extrtptime = base_rtptime;
1360     } else {
1361       /* nothing we can do with this data in this case */
1362       GST_DEBUG_OBJECT (bin, "bailing out");
1363       return;
1364     }
1365   }
1366
1367   /* Take the extended rtptime we found in the SR packet and map it to the
1368    * local rtptime. The local rtp time is used to construct timestamps on the
1369    * buffers so we will calculate what running_time corresponds to the RTP
1370    * timestamp in the SR packet. */
1371   running_time_rtp = last_extrtptime - base_rtptime;
1372
1373   GST_DEBUG_OBJECT (bin,
1374       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
1375       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, "
1376       "clock-base %" G_GINT64_FORMAT, base_rtptime,
1377       last_extrtptime, running_time_rtp, clock_rate, rtp_clock_base);
1378
1379   /* calculate local RTP time in gstreamer timestamp, we essentially perform the
1380    * same conversion that a jitterbuffer would use to convert an rtp timestamp
1381    * into a corresponding gstreamer timestamp. Note that the base_time also
1382    * contains the drift between sender and receiver. */
1383   running_time =
1384       gst_util_uint64_scale_int (running_time_rtp, GST_SECOND, clock_rate);
1385   running_time += base_time;
1386
1387   /* convert ntptime to nanoseconds */
1388   ntpnstime = gst_util_uint64_scale (ntptime, GST_SECOND,
1389       (G_GINT64_CONSTANT (1) << 32));
1390
1391   stream->have_sync = TRUE;
1392
1393   GST_DEBUG_OBJECT (bin,
1394       "SR RTP running time %" G_GUINT64_FORMAT ", SR NTP %" G_GUINT64_FORMAT,
1395       running_time, ntpnstime);
1396
1397   /* recalc inter stream playout offset, but only if there is more than one
1398    * stream or we're doing NTP sync. */
1399   if (bin->ntp_sync) {
1400     gint64 ntpdiff, rtdiff;
1401     guint64 local_ntpnstime;
1402     GstClockTime local_running_time;
1403
1404     /* For NTP sync we need to first get a snapshot of running_time and NTP
1405      * time. We know at what running_time we play a certain RTP time, we also
1406      * calculated when we would play the RTP time in the SR packet. Now we need
1407      * to know how the running_time and the NTP time relate to eachother. */
1408     get_current_times (bin, &local_running_time, &local_ntpnstime);
1409
1410     /* see how far away the NTP time is. This is the difference between the
1411      * current NTP time and the NTP time in the last SR packet. */
1412     ntpdiff = local_ntpnstime - ntpnstime;
1413     /* see how far away the running_time is. This is the difference between the
1414      * current running_time and the running_time of the RTP timestamp in the
1415      * last SR packet. */
1416     rtdiff = local_running_time - running_time;
1417
1418     GST_DEBUG_OBJECT (bin,
1419         "local NTP time %" G_GUINT64_FORMAT ", SR NTP time %" G_GUINT64_FORMAT,
1420         local_ntpnstime, ntpnstime);
1421     GST_DEBUG_OBJECT (bin,
1422         "local running time %" G_GUINT64_FORMAT ", SR RTP running time %"
1423         G_GUINT64_FORMAT, local_running_time, running_time);
1424     GST_DEBUG_OBJECT (bin,
1425         "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
1426         rtdiff);
1427
1428     /* combine to get the final diff to apply to the running_time */
1429     stream->rt_delta = rtdiff - ntpdiff;
1430
1431     stream_set_ts_offset (bin, stream, stream->rt_delta, FALSE);
1432   } else {
1433     gint64 min, rtp_min, clock_base = stream->clock_base;
1434     gboolean all_sync, use_rtp;
1435     gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
1436
1437     /* calculate delta between server and receiver. ntpnstime is created by
1438      * converting the ntptime in the last SR packet to a gstreamer timestamp. This
1439      * delta expresses the difference to our timeline and the server timeline. The
1440      * difference in itself doesn't mean much but we can combine the delta of
1441      * multiple streams to create a stream specific offset. */
1442     stream->rt_delta = ntpnstime - running_time;
1443
1444     /* calculate the min of all deltas, ignoring streams that did not yet have a
1445      * valid rt_delta because we did not yet receive an SR packet for those
1446      * streams.
1447      * We calculate the mininum because we would like to only apply positive
1448      * offsets to streams, delaying their playback instead of trying to speed up
1449      * other streams (which might be imposible when we have to create negative
1450      * latencies).
1451      * The stream that has the smallest diff is selected as the reference stream,
1452      * all other streams will have a positive offset to this difference. */
1453
1454     /* some alternative setting allow ignoring RTCP as much as possible,
1455      * for servers generating bogus ntp timeline */
1456     min = rtp_min = G_MAXINT64;
1457     use_rtp = FALSE;
1458     if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1459       guint64 ext_base;
1460
1461       use_rtp = TRUE;
1462       /* signed version for convienience */
1463       clock_base = base_rtptime;
1464       /* deal with possible wrap-around */
1465       ext_base = base_rtptime;
1466       rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
1467       /* sanity check; base rtp and provided clock_base should be close */
1468       if (rtp_clock_base >= clock_base) {
1469         if (rtp_clock_base - clock_base < 10 * clock_rate) {
1470           rtp_clock_base = base_time +
1471               gst_util_uint64_scale_int (rtp_clock_base - clock_base,
1472               GST_SECOND, clock_rate);
1473         } else {
1474           use_rtp = FALSE;
1475         }
1476       } else {
1477         if (clock_base - rtp_clock_base < 10 * clock_rate) {
1478           rtp_clock_base = base_time -
1479               gst_util_uint64_scale_int (clock_base - rtp_clock_base,
1480               GST_SECOND, clock_rate);
1481         } else {
1482           use_rtp = FALSE;
1483         }
1484       }
1485       /* warn and bail for clarity out if no sane values */
1486       if (!use_rtp) {
1487         GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
1488         return;
1489       }
1490       /* store to track changes */
1491       clock_base = rtp_clock_base;
1492       /* generate a fake as before,
1493        * now equating rtptime obtained from RTP-Info,
1494        * where the large time represent the otherwise irrelevant npt/ntp time */
1495       stream->rtp_delta = (GST_SECOND << 28) - rtp_clock_base;
1496     } else {
1497       clock_base = rtp_clock_base;
1498     }
1499
1500     all_sync = TRUE;
1501     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1502       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1503
1504       if (!ostream->have_sync) {
1505         all_sync = FALSE;
1506         continue;
1507       }
1508
1509       /* change in current stream's base from previously init'ed value
1510        * leads to reset of all stream's base */
1511       if (stream != ostream && stream->clock_base >= 0 &&
1512           (stream->clock_base != clock_base)) {
1513         GST_DEBUG_OBJECT (bin, "reset upon clock base change");
1514         ostream->clock_base = -100 * GST_SECOND;
1515         ostream->rtp_delta = 0;
1516       }
1517
1518       if (ostream->rt_delta < min)
1519         min = ostream->rt_delta;
1520       if (ostream->rtp_delta < rtp_min)
1521         rtp_min = ostream->rtp_delta;
1522     }
1523
1524     /* arrange to re-sync for each stream upon significant change,
1525      * e.g. post-seek */
1526     all_sync = all_sync && (stream->clock_base == clock_base);
1527     stream->clock_base = clock_base;
1528
1529     /* may need init performed above later on, but nothing more to do now */
1530     if (client->nstreams <= 1)
1531       return;
1532
1533     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
1534         " all sync %d", client, min, all_sync);
1535     GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
1536
1537     switch (rtcp_sync) {
1538       case GST_RTP_BIN_RTCP_SYNC_RTP:
1539         if (!use_rtp)
1540           break;
1541         GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
1542             "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
1543         /* fall-through */
1544       case GST_RTP_BIN_RTCP_SYNC_INITIAL:
1545         /* if all have been synced already, do not bother further */
1546         if (all_sync) {
1547           GST_DEBUG_OBJECT (bin, "all streams already synced; done");
1548           return;
1549         }
1550         break;
1551       default:
1552         break;
1553     }
1554
1555     /* bail out if we adjusted recently enough */
1556     if (all_sync && (ntpnstime - bin->priv->last_ntpnstime) <
1557         bin->rtcp_sync_interval * GST_MSECOND) {
1558       GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
1559           "previous sender info too recent "
1560           "(previous NTP %" G_GUINT64_FORMAT ")", bin->priv->last_ntpnstime);
1561       return;
1562     }
1563     bin->priv->last_ntpnstime = ntpnstime;
1564
1565     /* calculate offsets for each stream */
1566     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1567       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1568       gint64 ts_offset;
1569
1570       /* ignore streams for which we didn't receive an SR packet yet, we
1571        * can't synchronize them yet. We can however sync other streams just
1572        * fine. */
1573       if (!ostream->have_sync)
1574         continue;
1575
1576       /* calculate offset to our reference stream, this should always give a
1577        * positive number. */
1578       if (use_rtp)
1579         ts_offset = ostream->rtp_delta - rtp_min;
1580       else
1581         ts_offset = ostream->rt_delta - min;
1582
1583       stream_set_ts_offset (bin, ostream, ts_offset, TRUE);
1584     }
1585   }
1586   gst_rtp_bin_send_sync_event (stream);
1587
1588   return;
1589 }
1590
1591 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
1592   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
1593           (b) = gst_rtcp_packet_move_to_next ((packet)))
1594
1595 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
1596   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
1597           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
1598
1599 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
1600   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
1601           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
1602
1603 static void
1604 gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
1605     GstRtpBinStream * stream)
1606 {
1607   GstRtpBin *bin;
1608   GstRTCPPacket packet;
1609   guint32 ssrc;
1610   guint64 ntptime;
1611   gboolean have_sr, have_sdes;
1612   gboolean more;
1613   guint64 base_rtptime;
1614   guint64 base_time;
1615   guint clock_rate;
1616   guint64 clock_base;
1617   guint64 extrtptime;
1618   GstBuffer *buffer;
1619   GstRTCPBuffer rtcp = { NULL, };
1620
1621   bin = stream->bin;
1622
1623   GST_DEBUG_OBJECT (bin, "sync handler called");
1624
1625   /* get the last relation between the rtp timestamps and the gstreamer
1626    * timestamps. We get this info directly from the jitterbuffer which
1627    * constructs gstreamer timestamps from rtp timestamps and so it know exactly
1628    * what the current situation is. */
1629   base_rtptime =
1630       g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
1631   base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
1632   clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
1633   clock_base = g_value_get_uint64 (gst_structure_get_value (s, "clock-base"));
1634   extrtptime =
1635       g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
1636   buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
1637
1638   have_sr = FALSE;
1639   have_sdes = FALSE;
1640
1641   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
1642
1643   GST_RTCP_BUFFER_FOR_PACKETS (more, &rtcp, &packet) {
1644     /* first packet must be SR or RR or else the validate would have failed */
1645     switch (gst_rtcp_packet_get_type (&packet)) {
1646       case GST_RTCP_TYPE_SR:
1647         /* only parse first. There is only supposed to be one SR in the packet
1648          * but we will deal with malformed packets gracefully */
1649         if (have_sr)
1650           break;
1651         /* get NTP and RTP times */
1652         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL,
1653             NULL, NULL);
1654
1655         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
1656         /* ignore SR that is not ours */
1657         if (ssrc != stream->ssrc)
1658           continue;
1659
1660         have_sr = TRUE;
1661         break;
1662       case GST_RTCP_TYPE_SDES:
1663       {
1664         gboolean more_items, more_entries;
1665
1666         /* only deal with first SDES, there is only supposed to be one SDES in
1667          * the RTCP packet but we deal with bad packets gracefully. Also bail
1668          * out if we have not seen an SR item yet. */
1669         if (have_sdes || !have_sr)
1670           break;
1671
1672         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
1673           /* skip items that are not about the SSRC of the sender */
1674           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
1675             continue;
1676
1677           /* find the CNAME entry */
1678           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
1679             GstRTCPSDESType type;
1680             guint8 len;
1681             guint8 *data;
1682
1683             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
1684
1685             if (type == GST_RTCP_SDES_CNAME) {
1686               GST_RTP_BIN_LOCK (bin);
1687               /* associate the stream to CNAME */
1688               gst_rtp_bin_associate (bin, stream, len, data,
1689                   ntptime, extrtptime, base_rtptime, base_time, clock_rate,
1690                   clock_base);
1691               GST_RTP_BIN_UNLOCK (bin);
1692             }
1693           }
1694         }
1695         have_sdes = TRUE;
1696         break;
1697       }
1698       default:
1699         /* we can ignore these packets */
1700         break;
1701     }
1702   }
1703   gst_rtcp_buffer_unmap (&rtcp);
1704 }
1705
1706 /* create a new stream with @ssrc in @session. Must be called with
1707  * RTP_SESSION_LOCK. */
1708 static GstRtpBinStream *
1709 create_stream (GstRtpBinSession * session, guint32 ssrc)
1710 {
1711   GstElement *buffer, *demux = NULL;
1712   GstRtpBinStream *stream;
1713   GstRtpBin *rtpbin;
1714   GstState target;
1715
1716   rtpbin = session->bin;
1717
1718   if (g_slist_length (session->streams) >= rtpbin->max_streams)
1719     goto max_streams;
1720
1721   if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL)))
1722     goto no_jitterbuffer;
1723
1724   if (!rtpbin->ignore_pt)
1725     if (!(demux = gst_element_factory_make ("rtpptdemux", NULL)))
1726       goto no_demux;
1727
1728   stream = g_new0 (GstRtpBinStream, 1);
1729   stream->ssrc = ssrc;
1730   stream->bin = rtpbin;
1731   stream->session = session;
1732   stream->buffer = buffer;
1733   stream->demux = demux;
1734
1735   stream->have_sync = FALSE;
1736   stream->rt_delta = 0;
1737   stream->rtp_delta = 0;
1738   stream->percent = 100;
1739   stream->clock_base = -100 * GST_SECOND;
1740   session->streams = g_slist_prepend (session->streams, stream);
1741
1742   /* provide clock_rate to the jitterbuffer when needed */
1743   stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
1744       (GCallback) pt_map_requested, session);
1745   stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
1746       (GCallback) on_npt_stop, stream);
1747
1748   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.session", session);
1749   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.stream", stream);
1750
1751   /* configure latency and packet lost */
1752   g_object_set (buffer, "latency", rtpbin->latency_ms, NULL);
1753   g_object_set (buffer, "drop-on-latency", rtpbin->drop_on_latency, NULL);
1754   g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
1755   g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL);
1756   g_object_set (buffer, "do-retransmission", rtpbin->do_retransmission, NULL);
1757   g_object_set (buffer, "max-rtcp-rtp-time-diff",
1758       rtpbin->max_rtcp_rtp_time_diff, NULL);
1759   g_object_set (buffer, "max-dropout-time", rtpbin->max_dropout_time,
1760       "max-misorder-time", rtpbin->max_misorder_time, NULL);
1761   g_object_set (buffer, "rfc7273-sync", rtpbin->rfc7273_sync, NULL);
1762
1763   g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER], 0,
1764       buffer, session->id, ssrc);
1765
1766   if (!rtpbin->ignore_pt)
1767     gst_bin_add (GST_BIN_CAST (rtpbin), demux);
1768   gst_bin_add (GST_BIN_CAST (rtpbin), buffer);
1769
1770   /* link stuff */
1771   if (demux)
1772     gst_element_link_pads_full (buffer, "src", demux, "sink",
1773         GST_PAD_LINK_CHECK_NOTHING);
1774
1775   if (rtpbin->buffering) {
1776     guint64 last_out;
1777
1778     GST_INFO_OBJECT (rtpbin,
1779         "bin is buffering, set jitterbuffer as not active");
1780     g_signal_emit_by_name (buffer, "set-active", FALSE, (gint64) 0, &last_out);
1781   }
1782
1783
1784   GST_OBJECT_LOCK (rtpbin);
1785   target = GST_STATE_TARGET (rtpbin);
1786   GST_OBJECT_UNLOCK (rtpbin);
1787
1788   /* from sink to source */
1789   if (demux)
1790     gst_element_set_state (demux, target);
1791
1792   gst_element_set_state (buffer, target);
1793
1794   return stream;
1795
1796   /* ERRORS */
1797 max_streams:
1798   {
1799     GST_WARNING_OBJECT (rtpbin, "stream exeeds maximum (%d)",
1800         rtpbin->max_streams);
1801     return NULL;
1802   }
1803 no_jitterbuffer:
1804   {
1805     g_warning ("rtpbin: could not create rtpjitterbuffer element");
1806     return NULL;
1807   }
1808 no_demux:
1809   {
1810     gst_object_unref (buffer);
1811     g_warning ("rtpbin: could not create rtpptdemux element");
1812     return NULL;
1813   }
1814 }
1815
1816 /* called with RTP_BIN_LOCK */
1817 static void
1818 free_stream (GstRtpBinStream * stream, GstRtpBin * bin)
1819 {
1820   GSList *clients, *next_client;
1821
1822   GST_DEBUG_OBJECT (bin, "freeing stream %p", stream);
1823
1824   if (stream->demux) {
1825     g_signal_handler_disconnect (stream->demux, stream->demux_newpad_sig);
1826     g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig);
1827     g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig);
1828   }
1829   g_signal_handler_disconnect (stream->buffer, stream->buffer_handlesync_sig);
1830   g_signal_handler_disconnect (stream->buffer, stream->buffer_ptreq_sig);
1831   g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig);
1832
1833   if (stream->demux)
1834     gst_element_set_locked_state (stream->demux, TRUE);
1835   gst_element_set_locked_state (stream->buffer, TRUE);
1836
1837   if (stream->demux)
1838     gst_element_set_state (stream->demux, GST_STATE_NULL);
1839   gst_element_set_state (stream->buffer, GST_STATE_NULL);
1840
1841   /* now remove this signal, we need this while going to NULL because it to
1842    * do some cleanups */
1843   if (stream->demux)
1844     g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
1845
1846   gst_bin_remove (GST_BIN_CAST (bin), stream->buffer);
1847   if (stream->demux)
1848     gst_bin_remove (GST_BIN_CAST (bin), stream->demux);
1849
1850   for (clients = bin->clients; clients; clients = next_client) {
1851     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
1852     GSList *streams, *next_stream;
1853
1854     next_client = g_slist_next (clients);
1855
1856     for (streams = client->streams; streams; streams = next_stream) {
1857       GstRtpBinStream *ostream = (GstRtpBinStream *) streams->data;
1858
1859       next_stream = g_slist_next (streams);
1860
1861       if (ostream == stream) {
1862         client->streams = g_slist_delete_link (client->streams, streams);
1863         /* If this was the last stream belonging to this client,
1864          * clean up the client. */
1865         if (--client->nstreams == 0) {
1866           bin->clients = g_slist_delete_link (bin->clients, clients);
1867           free_client (client, bin);
1868           break;
1869         }
1870       }
1871     }
1872   }
1873   g_free (stream);
1874 }
1875
1876 /* GObject vmethods */
1877 static void gst_rtp_bin_dispose (GObject * object);
1878 static void gst_rtp_bin_finalize (GObject * object);
1879 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
1880     const GValue * value, GParamSpec * pspec);
1881 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
1882     GValue * value, GParamSpec * pspec);
1883
1884 /* GstElement vmethods */
1885 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
1886     GstStateChange transition);
1887 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
1888     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
1889 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
1890 static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message);
1891
1892 #define gst_rtp_bin_parent_class parent_class
1893 G_DEFINE_TYPE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN);
1894
1895 static gboolean
1896 _gst_element_accumulator (GSignalInvocationHint * ihint,
1897     GValue * return_accu, const GValue * handler_return, gpointer dummy)
1898 {
1899   GstElement *element;
1900
1901   element = g_value_get_object (handler_return);
1902   GST_DEBUG ("got element %" GST_PTR_FORMAT, element);
1903
1904   if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
1905     g_value_set_object (return_accu, element);
1906
1907   /* stop emission if we have an element */
1908   return (element == NULL);
1909 }
1910
1911 static gboolean
1912 _gst_caps_accumulator (GSignalInvocationHint * ihint,
1913     GValue * return_accu, const GValue * handler_return, gpointer dummy)
1914 {
1915   GstCaps *caps;
1916
1917   caps = g_value_get_boxed (handler_return);
1918   GST_DEBUG ("got caps %" GST_PTR_FORMAT, caps);
1919
1920   if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
1921     g_value_set_boxed (return_accu, caps);
1922
1923   /* stop emission if we have a caps */
1924   return (caps == NULL);
1925 }
1926
1927 static void
1928 gst_rtp_bin_class_init (GstRtpBinClass * klass)
1929 {
1930   GObjectClass *gobject_class;
1931   GstElementClass *gstelement_class;
1932   GstBinClass *gstbin_class;
1933
1934   gobject_class = (GObjectClass *) klass;
1935   gstelement_class = (GstElementClass *) klass;
1936   gstbin_class = (GstBinClass *) klass;
1937
1938   g_type_class_add_private (klass, sizeof (GstRtpBinPrivate));
1939
1940   gobject_class->dispose = gst_rtp_bin_dispose;
1941   gobject_class->finalize = gst_rtp_bin_finalize;
1942   gobject_class->set_property = gst_rtp_bin_set_property;
1943   gobject_class->get_property = gst_rtp_bin_get_property;
1944
1945   g_object_class_install_property (gobject_class, PROP_LATENCY,
1946       g_param_spec_uint ("latency", "Buffer latency in ms",
1947           "Default amount of ms to buffer in the jitterbuffers", 0,
1948           G_MAXUINT, DEFAULT_LATENCY_MS,
1949           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1950
1951   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
1952       g_param_spec_boolean ("drop-on-latency",
1953           "Drop buffers when maximum latency is reached",
1954           "Tells the jitterbuffer to never exceed the given latency in size",
1955           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1956
1957   /**
1958    * GstRtpBin::request-pt-map:
1959    * @rtpbin: the object which received the signal
1960    * @session: the session
1961    * @pt: the pt
1962    *
1963    * Request the payload type as #GstCaps for @pt in @session.
1964    */
1965   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
1966       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
1967       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
1968       _gst_caps_accumulator, NULL, g_cclosure_marshal_generic, GST_TYPE_CAPS,
1969       2, G_TYPE_UINT, G_TYPE_UINT);
1970
1971     /**
1972    * GstRtpBin::payload-type-change:
1973    * @rtpbin: the object which received the signal
1974    * @session: the session
1975    * @pt: the pt
1976    *
1977    * Signal that the current payload type changed to @pt in @session.
1978    */
1979   gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] =
1980       g_signal_new ("payload-type-change", G_TYPE_FROM_CLASS (klass),
1981       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, payload_type_change),
1982       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1983       G_TYPE_UINT);
1984
1985   /**
1986    * GstRtpBin::clear-pt-map:
1987    * @rtpbin: the object which received the signal
1988    *
1989    * Clear all previously cached pt-mapping obtained with
1990    * #GstRtpBin::request-pt-map.
1991    */
1992   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
1993       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
1994       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1995           clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1996       0, G_TYPE_NONE);
1997
1998   /**
1999    * GstRtpBin::reset-sync:
2000    * @rtpbin: the object which received the signal
2001    *
2002    * Reset all currently configured lip-sync parameters and require new SR
2003    * packets for all streams before lip-sync is attempted again.
2004    */
2005   gst_rtp_bin_signals[SIGNAL_RESET_SYNC] =
2006       g_signal_new ("reset-sync", G_TYPE_FROM_CLASS (klass),
2007       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2008           reset_sync), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
2009       0, G_TYPE_NONE);
2010
2011   /**
2012    * GstRtpBin::get-session:
2013    * @rtpbin: the object which received the signal
2014    * @id: the session id
2015    *
2016    * Request the related GstRtpSession as #GstElement related with session @id.
2017    *
2018    * Since: 1.8
2019    */
2020   gst_rtp_bin_signals[SIGNAL_GET_SESSION] =
2021       g_signal_new ("get-session", G_TYPE_FROM_CLASS (klass),
2022       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2023           get_session), NULL, NULL, g_cclosure_marshal_generic,
2024       GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2025
2026   /**
2027    * GstRtpBin::get-internal-session:
2028    * @rtpbin: the object which received the signal
2029    * @id: the session id
2030    *
2031    * Request the internal RTPSession object as #GObject in session @id.
2032    */
2033   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_SESSION] =
2034       g_signal_new ("get-internal-session", G_TYPE_FROM_CLASS (klass),
2035       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
2036           get_internal_session), NULL, NULL, g_cclosure_marshal_generic,
2037       RTP_TYPE_SESSION, 1, G_TYPE_UINT);
2038
2039   /**
2040    * GstRtpBin::on-new-ssrc:
2041    * @rtpbin: the object which received the signal
2042    * @session: the session
2043    * @ssrc: the SSRC
2044    *
2045    * Notify of a new SSRC that entered @session.
2046    */
2047   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
2048       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
2049       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
2050       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2051       G_TYPE_UINT);
2052   /**
2053    * GstRtpBin::on-ssrc-collision:
2054    * @rtpbin: the object which received the signal
2055    * @session: the session
2056    * @ssrc: the SSRC
2057    *
2058    * Notify when we have an SSRC collision
2059    */
2060   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
2061       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
2062       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
2063       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2064       G_TYPE_UINT);
2065   /**
2066    * GstRtpBin::on-ssrc-validated:
2067    * @rtpbin: the object which received the signal
2068    * @session: the session
2069    * @ssrc: the SSRC
2070    *
2071    * Notify of a new SSRC that became validated.
2072    */
2073   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
2074       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
2075       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
2076       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2077       G_TYPE_UINT);
2078   /**
2079    * GstRtpBin::on-ssrc-active:
2080    * @rtpbin: the object which received the signal
2081    * @session: the session
2082    * @ssrc: the SSRC
2083    *
2084    * Notify of a SSRC that is active, i.e., sending RTCP.
2085    */
2086   gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE] =
2087       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
2088       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_active),
2089       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2090       G_TYPE_UINT);
2091   /**
2092    * GstRtpBin::on-ssrc-sdes:
2093    * @rtpbin: the object which received the signal
2094    * @session: the session
2095    * @ssrc: the SSRC
2096    *
2097    * Notify of a SSRC that is active, i.e., sending RTCP.
2098    */
2099   gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES] =
2100       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
2101       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_sdes),
2102       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2103       G_TYPE_UINT);
2104
2105   /**
2106    * GstRtpBin::on-bye-ssrc:
2107    * @rtpbin: the object which received the signal
2108    * @session: the session
2109    * @ssrc: the SSRC
2110    *
2111    * Notify of an SSRC that became inactive because of a BYE packet.
2112    */
2113   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
2114       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
2115       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
2116       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2117       G_TYPE_UINT);
2118   /**
2119    * GstRtpBin::on-bye-timeout:
2120    * @rtpbin: the object which received the signal
2121    * @session: the session
2122    * @ssrc: the SSRC
2123    *
2124    * Notify of an SSRC that has timed out because of BYE
2125    */
2126   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
2127       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
2128       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
2129       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2130       G_TYPE_UINT);
2131   /**
2132    * GstRtpBin::on-timeout:
2133    * @rtpbin: the object which received the signal
2134    * @session: the session
2135    * @ssrc: the SSRC
2136    *
2137    * Notify of an SSRC that has timed out
2138    */
2139   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
2140       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
2141       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
2142       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2143       G_TYPE_UINT);
2144   /**
2145    * GstRtpBin::on-sender-timeout:
2146    * @rtpbin: the object which received the signal
2147    * @session: the session
2148    * @ssrc: the SSRC
2149    *
2150    * Notify of a sender SSRC that has timed out and became a receiver
2151    */
2152   gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] =
2153       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
2154       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout),
2155       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2156       G_TYPE_UINT);
2157
2158   /**
2159    * GstRtpBin::on-npt-stop:
2160    * @rtpbin: the object which received the signal
2161    * @session: the session
2162    * @ssrc: the SSRC
2163    *
2164    * Notify that SSRC sender has sent data up to the configured NPT stop time.
2165    */
2166   gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP] =
2167       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
2168       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_npt_stop),
2169       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2170       G_TYPE_UINT);
2171
2172   /**
2173    * GstRtpBin::request-rtp-encoder:
2174    * @rtpbin: the object which received the signal
2175    * @session: the session
2176    *
2177    * Request an RTP encoder element for the given @session. The encoder
2178    * element will be added to the bin if not previously added.
2179    *
2180    * If no handler is connected, no encoder will be used.
2181    *
2182    * Since: 1.4
2183    */
2184   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_ENCODER] =
2185       g_signal_new ("request-rtp-encoder", G_TYPE_FROM_CLASS (klass),
2186       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2187           request_rtp_encoder), _gst_element_accumulator, NULL,
2188       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2189
2190   /**
2191    * GstRtpBin::request-rtp-decoder:
2192    * @rtpbin: the object which received the signal
2193    * @session: the session
2194    *
2195    * Request an RTP decoder element for the given @session. The decoder
2196    * element will be added to the bin if not previously added.
2197    *
2198    * If no handler is connected, no encoder will be used.
2199    *
2200    * Since: 1.4
2201    */
2202   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_DECODER] =
2203       g_signal_new ("request-rtp-decoder", G_TYPE_FROM_CLASS (klass),
2204       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2205           request_rtp_decoder), _gst_element_accumulator, NULL,
2206       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2207
2208   /**
2209    * GstRtpBin::request-rtcp-encoder:
2210    * @rtpbin: the object which received the signal
2211    * @session: the session
2212    *
2213    * Request an RTCP encoder element for the given @session. The encoder
2214    * element will be added to the bin if not previously added.
2215    *
2216    * If no handler is connected, no encoder will be used.
2217    *
2218    * Since: 1.4
2219    */
2220   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_ENCODER] =
2221       g_signal_new ("request-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
2222       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2223           request_rtcp_encoder), _gst_element_accumulator, NULL,
2224       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2225
2226   /**
2227    * GstRtpBin::request-rtcp-decoder:
2228    * @rtpbin: the object which received the signal
2229    * @session: the session
2230    *
2231    * Request an RTCP decoder element for the given @session. The decoder
2232    * element will be added to the bin if not previously added.
2233    *
2234    * If no handler is connected, no encoder will be used.
2235    *
2236    * Since: 1.4
2237    */
2238   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_DECODER] =
2239       g_signal_new ("request-rtcp-decoder", G_TYPE_FROM_CLASS (klass),
2240       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2241           request_rtcp_decoder), _gst_element_accumulator, NULL,
2242       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2243
2244   /**
2245    * GstRtpBin::new-jitterbuffer:
2246    * @rtpbin: the object which received the signal
2247    * @jitterbuffer: the new jitterbuffer
2248    * @session: the session
2249    * @ssrc: the SSRC
2250    *
2251    * Notify that a new @jitterbuffer was created for @session and @ssrc.
2252    * This signal can, for example, be used to configure @jitterbuffer.
2253    *
2254    * Since: 1.4
2255    */
2256   gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER] =
2257       g_signal_new ("new-jitterbuffer", G_TYPE_FROM_CLASS (klass),
2258       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2259           new_jitterbuffer), NULL, NULL, g_cclosure_marshal_generic,
2260       G_TYPE_NONE, 3, GST_TYPE_ELEMENT, G_TYPE_UINT, G_TYPE_UINT);
2261
2262   /**
2263    * GstRtpBin::request-aux-sender:
2264    * @rtpbin: the object which received the signal
2265    * @session: the session
2266    *
2267    * Request an AUX sender element for the given @session. The AUX
2268    * element will be added to the bin.
2269    *
2270    * If no handler is connected, no AUX element will be used.
2271    *
2272    * Since: 1.4
2273    */
2274   gst_rtp_bin_signals[SIGNAL_REQUEST_AUX_SENDER] =
2275       g_signal_new ("request-aux-sender", G_TYPE_FROM_CLASS (klass),
2276       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2277           request_aux_sender), _gst_element_accumulator, NULL,
2278       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2279   /**
2280    * GstRtpBin::request-aux-receiver:
2281    * @rtpbin: the object which received the signal
2282    * @session: the session
2283    *
2284    * Request an AUX receiver element for the given @session. The AUX
2285    * element will be added to the bin.
2286    *
2287    * If no handler is connected, no AUX element will be used.
2288    *
2289    * Since: 1.4
2290    */
2291   gst_rtp_bin_signals[SIGNAL_REQUEST_AUX_RECEIVER] =
2292       g_signal_new ("request-aux-receiver", G_TYPE_FROM_CLASS (klass),
2293       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2294           request_aux_receiver), _gst_element_accumulator, NULL,
2295       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2296   /**
2297    * GstRtpBin::on-new-sender-ssrc:
2298    * @rtpbin: the object which received the signal
2299    * @session: the session
2300    * @ssrc: the sender SSRC
2301    *
2302    * Notify of a new sender SSRC that entered @session.
2303    *
2304    * Since: 1.8
2305    */
2306   gst_rtp_bin_signals[SIGNAL_ON_NEW_SENDER_SSRC] =
2307       g_signal_new ("on-new-sender-ssrc", G_TYPE_FROM_CLASS (klass),
2308       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_sender_ssrc),
2309       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
2310       G_TYPE_UINT);
2311   /**
2312    * GstRtpBin::on-sender-ssrc-active:
2313    * @rtpbin: the object which received the signal
2314    * @session: the session
2315    * @ssrc: the sender SSRC
2316    *
2317    * Notify of a sender SSRC that is active, i.e., sending RTCP.
2318    *
2319    * Since: 1.8
2320    */
2321   gst_rtp_bin_signals[SIGNAL_ON_SENDER_SSRC_ACTIVE] =
2322       g_signal_new ("on-sender-ssrc-active", G_TYPE_FROM_CLASS (klass),
2323       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2324           on_sender_ssrc_active), NULL, NULL, g_cclosure_marshal_generic,
2325       G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT);
2326
2327
2328   /**
2329    * GstRtpBin::on-bundled-ssrc:
2330    * @rtpbin: the object which received the signal
2331    * @ssrc: the bundled SSRC
2332    *
2333    * Notify of a new incoming bundled SSRC. If no handler is connected to the
2334    * signal then the #GstRtpSession created for the recv_rtp_sink_\%u
2335    * request pad will be managing this new SSRC. However if there is a handler
2336    * connected then the application can decided to dispatch this new stream to
2337    * another session by providing its ID as return value of the handler. This
2338    * can be particularly useful to keep retransmission SSRCs grouped with the
2339    * session for which they handle retransmission.
2340    *
2341    * Since: 1.12
2342    */
2343   gst_rtp_bin_signals[SIGNAL_ON_BUNDLED_SSRC] =
2344       g_signal_new ("on-bundled-ssrc", G_TYPE_FROM_CLASS (klass),
2345       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2346           on_bundled_ssrc), NULL, NULL,
2347       g_cclosure_marshal_generic, G_TYPE_UINT, 1, G_TYPE_UINT);
2348
2349
2350   g_object_class_install_property (gobject_class, PROP_SDES,
2351       g_param_spec_boxed ("sdes", "SDES",
2352           "The SDES items of this session",
2353           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2354
2355   g_object_class_install_property (gobject_class, PROP_DO_LOST,
2356       g_param_spec_boolean ("do-lost", "Do Lost",
2357           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
2358           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2359
2360   g_object_class_install_property (gobject_class, PROP_AUTOREMOVE,
2361       g_param_spec_boolean ("autoremove", "Auto Remove",
2362           "Automatically remove timed out sources", DEFAULT_AUTOREMOVE,
2363           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2364
2365   g_object_class_install_property (gobject_class, PROP_IGNORE_PT,
2366       g_param_spec_boolean ("ignore-pt", "Ignore PT",
2367           "Do not demultiplex based on PT values", DEFAULT_IGNORE_PT,
2368           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2369
2370   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
2371       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
2372           "Use the pipeline running-time to set the NTP time in the RTCP SR messages "
2373           "(DEPRECATED: Use ntp-time-source property)",
2374           DEFAULT_USE_PIPELINE_CLOCK,
2375           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
2376   /**
2377    * GstRtpBin:buffer-mode:
2378    *
2379    * Control the buffering and timestamping mode used by the jitterbuffer.
2380    */
2381   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
2382       g_param_spec_enum ("buffer-mode", "Buffer Mode",
2383           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
2384           DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2385   /**
2386    * GstRtpBin:ntp-sync:
2387    *
2388    * Set the NTP time from the sender reports as the running-time on the
2389    * buffers. When both the sender and receiver have sychronized
2390    * running-time, i.e. when the clock and base-time is shared
2391    * between the receivers and the and the senders, this option can be
2392    * used to synchronize receivers on multiple machines.
2393    */
2394   g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
2395       g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
2396           "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
2397           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2398
2399   /**
2400    * GstRtpBin:rtcp-sync:
2401    *
2402    * If not synchronizing (directly) to the NTP clock, determines how to sync
2403    * the various streams.
2404    */
2405   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC,
2406       g_param_spec_enum ("rtcp-sync", "RTCP Sync",
2407           "Use of RTCP SR in synchronization", GST_RTP_BIN_RTCP_SYNC_TYPE,
2408           DEFAULT_RTCP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2409
2410   /**
2411    * GstRtpBin:rtcp-sync-interval:
2412    *
2413    * Determines how often to sync streams using RTCP data.
2414    */
2415   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_INTERVAL,
2416       g_param_spec_uint ("rtcp-sync-interval", "RTCP Sync Interval",
2417           "RTCP SR interval synchronization (ms) (0 = always)",
2418           0, G_MAXUINT, DEFAULT_RTCP_SYNC_INTERVAL,
2419           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2420
2421   g_object_class_install_property (gobject_class, PROP_DO_SYNC_EVENT,
2422       g_param_spec_boolean ("do-sync-event", "Do Sync Event",
2423           "Send event downstream when a stream is synchronized to the sender",
2424           DEFAULT_DO_SYNC_EVENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2425
2426   /**
2427    * GstRtpBin:do-retransmission:
2428    *
2429    * Enables RTP retransmission on all streams. To control retransmission on
2430    * a per-SSRC basis, connect to the #GstRtpBin::new-jitterbuffer signal and
2431    * set the #GstRtpJitterBuffer::do-retransmission property on the
2432    * #GstRtpJitterBuffer object instead.
2433    */
2434   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
2435       g_param_spec_boolean ("do-retransmission", "Do retransmission",
2436           "Enable retransmission on all streams",
2437           DEFAULT_DO_RETRANSMISSION,
2438           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2439
2440   /**
2441    * GstRtpBin:rtp-profile:
2442    *
2443    * Sets the default RTP profile of newly created RTP sessions. The
2444    * profile can be changed afterwards on a per-session basis.
2445    */
2446   g_object_class_install_property (gobject_class, PROP_RTP_PROFILE,
2447       g_param_spec_enum ("rtp-profile", "RTP Profile",
2448           "Default RTP profile of newly created sessions",
2449           GST_TYPE_RTP_PROFILE, DEFAULT_RTP_PROFILE,
2450           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2451
2452   g_object_class_install_property (gobject_class, PROP_NTP_TIME_SOURCE,
2453       g_param_spec_enum ("ntp-time-source", "NTP Time Source",
2454           "NTP time source for RTCP packets",
2455           gst_rtp_ntp_time_source_get_type (), DEFAULT_NTP_TIME_SOURCE,
2456           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2457
2458   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_SEND_TIME,
2459       g_param_spec_boolean ("rtcp-sync-send-time", "RTCP Sync Send Time",
2460           "Use send time or capture time for RTCP sync "
2461           "(TRUE = send time, FALSE = capture time)",
2462           DEFAULT_RTCP_SYNC_SEND_TIME,
2463           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2464
2465   g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF,
2466       g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff",
2467           "Maximum amount of time in ms that the RTP time in RTCP SRs "
2468           "is allowed to be ahead (-1 disabled)", -1, G_MAXINT,
2469           DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
2470           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2471
2472   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
2473       g_param_spec_uint ("max-dropout-time", "Max dropout time",
2474           "The maximum time (milliseconds) of missing packets tolerated.",
2475           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
2476           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2477
2478   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
2479       g_param_spec_uint ("max-misorder-time", "Max misorder time",
2480           "The maximum time (milliseconds) of misordered packets tolerated.",
2481           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
2482           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2483
2484   g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
2485       g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
2486           "Synchronize received streams to the RFC7273 clock "
2487           "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
2488           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2489
2490   g_object_class_install_property (gobject_class, PROP_MAX_STREAMS,
2491       g_param_spec_uint ("max-streams", "Max Streams",
2492           "The maximum number of streams to create for one session",
2493           0, G_MAXUINT, DEFAULT_MAX_STREAMS,
2494           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2495
2496   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
2497   gstelement_class->request_new_pad =
2498       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
2499   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
2500
2501   /* sink pads */
2502   gst_element_class_add_static_pad_template (gstelement_class,
2503       &rtpbin_recv_rtp_sink_template);
2504   gst_element_class_add_static_pad_template (gstelement_class,
2505       &rtpbin_recv_rtcp_sink_template);
2506   gst_element_class_add_static_pad_template (gstelement_class,
2507       &rtpbin_send_rtp_sink_template);
2508
2509   /* src pads */
2510   gst_element_class_add_static_pad_template (gstelement_class,
2511       &rtpbin_recv_rtp_src_template);
2512   gst_element_class_add_static_pad_template (gstelement_class,
2513       &rtpbin_send_rtcp_src_template);
2514   gst_element_class_add_static_pad_template (gstelement_class,
2515       &rtpbin_send_rtp_src_template);
2516
2517   gst_element_class_set_static_metadata (gstelement_class, "RTP Bin",
2518       "Filter/Network/RTP",
2519       "Real-Time Transport Protocol bin",
2520       "Wim Taymans <wim.taymans@gmail.com>");
2521
2522   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (gst_rtp_bin_handle_message);
2523
2524   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
2525   klass->reset_sync = GST_DEBUG_FUNCPTR (gst_rtp_bin_reset_sync);
2526   klass->get_session = GST_DEBUG_FUNCPTR (gst_rtp_bin_get_session);
2527   klass->get_internal_session =
2528       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_session);
2529   klass->request_rtp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2530   klass->request_rtp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2531   klass->request_rtcp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2532   klass->request_rtcp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2533
2534   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
2535 }
2536
2537 static void
2538 gst_rtp_bin_init (GstRtpBin * rtpbin)
2539 {
2540   gchar *cname;
2541
2542   rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
2543   g_mutex_init (&rtpbin->priv->bin_lock);
2544   g_mutex_init (&rtpbin->priv->dyn_lock);
2545
2546   rtpbin->latency_ms = DEFAULT_LATENCY_MS;
2547   rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND;
2548   rtpbin->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
2549   rtpbin->do_lost = DEFAULT_DO_LOST;
2550   rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
2551   rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
2552   rtpbin->rtcp_sync = DEFAULT_RTCP_SYNC;
2553   rtpbin->rtcp_sync_interval = DEFAULT_RTCP_SYNC_INTERVAL;
2554   rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
2555   rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
2556   rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
2557   rtpbin->send_sync_event = DEFAULT_DO_SYNC_EVENT;
2558   rtpbin->do_retransmission = DEFAULT_DO_RETRANSMISSION;
2559   rtpbin->rtp_profile = DEFAULT_RTP_PROFILE;
2560   rtpbin->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
2561   rtpbin->rtcp_sync_send_time = DEFAULT_RTCP_SYNC_SEND_TIME;
2562   rtpbin->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
2563   rtpbin->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
2564   rtpbin->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
2565   rtpbin->rfc7273_sync = DEFAULT_RFC7273_SYNC;
2566   rtpbin->max_streams = DEFAULT_MAX_STREAMS;
2567
2568   /* some default SDES entries */
2569   cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
2570   rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes",
2571       "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL);
2572   g_free (cname);
2573 }
2574
2575 static void
2576 gst_rtp_bin_dispose (GObject * object)
2577 {
2578   GstRtpBin *rtpbin;
2579
2580   rtpbin = GST_RTP_BIN (object);
2581
2582   GST_RTP_BIN_LOCK (rtpbin);
2583   GST_DEBUG_OBJECT (object, "freeing sessions");
2584   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, rtpbin);
2585   g_slist_free (rtpbin->sessions);
2586   rtpbin->sessions = NULL;
2587   GST_RTP_BIN_UNLOCK (rtpbin);
2588
2589   G_OBJECT_CLASS (parent_class)->dispose (object);
2590 }
2591
2592 static void
2593 gst_rtp_bin_finalize (GObject * object)
2594 {
2595   GstRtpBin *rtpbin;
2596
2597   rtpbin = GST_RTP_BIN (object);
2598
2599   if (rtpbin->sdes)
2600     gst_structure_free (rtpbin->sdes);
2601
2602   g_mutex_clear (&rtpbin->priv->bin_lock);
2603   g_mutex_clear (&rtpbin->priv->dyn_lock);
2604
2605   G_OBJECT_CLASS (parent_class)->finalize (object);
2606 }
2607
2608
2609 static void
2610 gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes)
2611 {
2612   GSList *item;
2613
2614   if (sdes == NULL)
2615     return;
2616
2617   GST_RTP_BIN_LOCK (bin);
2618
2619   GST_OBJECT_LOCK (bin);
2620   if (bin->sdes)
2621     gst_structure_free (bin->sdes);
2622   bin->sdes = gst_structure_copy (sdes);
2623   GST_OBJECT_UNLOCK (bin);
2624
2625   /* store in all sessions */
2626   for (item = bin->sessions; item; item = g_slist_next (item)) {
2627     GstRtpBinSession *session = item->data;
2628     g_object_set (session->session, "sdes", sdes, NULL);
2629   }
2630
2631   GST_RTP_BIN_UNLOCK (bin);
2632 }
2633
2634 static GstStructure *
2635 gst_rtp_bin_get_sdes_struct (GstRtpBin * bin)
2636 {
2637   GstStructure *result;
2638
2639   GST_OBJECT_LOCK (bin);
2640   result = gst_structure_copy (bin->sdes);
2641   GST_OBJECT_UNLOCK (bin);
2642
2643   return result;
2644 }
2645
2646 static void
2647 gst_rtp_bin_set_property (GObject * object, guint prop_id,
2648     const GValue * value, GParamSpec * pspec)
2649 {
2650   GstRtpBin *rtpbin;
2651
2652   rtpbin = GST_RTP_BIN (object);
2653
2654   switch (prop_id) {
2655     case PROP_LATENCY:
2656       GST_RTP_BIN_LOCK (rtpbin);
2657       rtpbin->latency_ms = g_value_get_uint (value);
2658       rtpbin->latency_ns = rtpbin->latency_ms * GST_MSECOND;
2659       GST_RTP_BIN_UNLOCK (rtpbin);
2660       /* propagate the property down to the jitterbuffer */
2661       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "latency", value);
2662       break;
2663     case PROP_DROP_ON_LATENCY:
2664       GST_RTP_BIN_LOCK (rtpbin);
2665       rtpbin->drop_on_latency = g_value_get_boolean (value);
2666       GST_RTP_BIN_UNLOCK (rtpbin);
2667       /* propagate the property down to the jitterbuffer */
2668       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2669           "drop-on-latency", value);
2670       break;
2671     case PROP_SDES:
2672       gst_rtp_bin_set_sdes_struct (rtpbin, g_value_get_boxed (value));
2673       break;
2674     case PROP_DO_LOST:
2675       GST_RTP_BIN_LOCK (rtpbin);
2676       rtpbin->do_lost = g_value_get_boolean (value);
2677       GST_RTP_BIN_UNLOCK (rtpbin);
2678       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
2679       break;
2680     case PROP_NTP_SYNC:
2681       rtpbin->ntp_sync = g_value_get_boolean (value);
2682       break;
2683     case PROP_RTCP_SYNC:
2684       g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value));
2685       break;
2686     case PROP_RTCP_SYNC_INTERVAL:
2687       rtpbin->rtcp_sync_interval = g_value_get_uint (value);
2688       break;
2689     case PROP_IGNORE_PT:
2690       rtpbin->ignore_pt = g_value_get_boolean (value);
2691       break;
2692     case PROP_AUTOREMOVE:
2693       rtpbin->priv->autoremove = g_value_get_boolean (value);
2694       break;
2695     case PROP_USE_PIPELINE_CLOCK:
2696     {
2697       GSList *sessions;
2698       GST_RTP_BIN_LOCK (rtpbin);
2699       rtpbin->use_pipeline_clock = g_value_get_boolean (value);
2700       for (sessions = rtpbin->sessions; sessions;
2701           sessions = g_slist_next (sessions)) {
2702         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2703
2704         g_object_set (G_OBJECT (session->session),
2705             "use-pipeline-clock", rtpbin->use_pipeline_clock, NULL);
2706       }
2707       GST_RTP_BIN_UNLOCK (rtpbin);
2708     }
2709       break;
2710     case PROP_DO_SYNC_EVENT:
2711       rtpbin->send_sync_event = g_value_get_boolean (value);
2712       break;
2713     case PROP_BUFFER_MODE:
2714       GST_RTP_BIN_LOCK (rtpbin);
2715       rtpbin->buffer_mode = g_value_get_enum (value);
2716       GST_RTP_BIN_UNLOCK (rtpbin);
2717       /* propagate the property down to the jitterbuffer */
2718       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "mode", value);
2719       break;
2720     case PROP_DO_RETRANSMISSION:
2721       GST_RTP_BIN_LOCK (rtpbin);
2722       rtpbin->do_retransmission = g_value_get_boolean (value);
2723       GST_RTP_BIN_UNLOCK (rtpbin);
2724       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2725           "do-retransmission", value);
2726       break;
2727     case PROP_RTP_PROFILE:
2728       rtpbin->rtp_profile = g_value_get_enum (value);
2729       break;
2730     case PROP_NTP_TIME_SOURCE:{
2731       GSList *sessions;
2732       GST_RTP_BIN_LOCK (rtpbin);
2733       rtpbin->ntp_time_source = g_value_get_enum (value);
2734       for (sessions = rtpbin->sessions; sessions;
2735           sessions = g_slist_next (sessions)) {
2736         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2737
2738         g_object_set (G_OBJECT (session->session),
2739             "ntp-time-source", rtpbin->ntp_time_source, NULL);
2740       }
2741       GST_RTP_BIN_UNLOCK (rtpbin);
2742       break;
2743     }
2744     case PROP_RTCP_SYNC_SEND_TIME:{
2745       GSList *sessions;
2746       GST_RTP_BIN_LOCK (rtpbin);
2747       rtpbin->rtcp_sync_send_time = g_value_get_boolean (value);
2748       for (sessions = rtpbin->sessions; sessions;
2749           sessions = g_slist_next (sessions)) {
2750         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2751
2752         g_object_set (G_OBJECT (session->session),
2753             "rtcp-sync-send-time", rtpbin->rtcp_sync_send_time, NULL);
2754       }
2755       GST_RTP_BIN_UNLOCK (rtpbin);
2756       break;
2757     }
2758     case PROP_MAX_RTCP_RTP_TIME_DIFF:
2759       GST_RTP_BIN_LOCK (rtpbin);
2760       rtpbin->max_rtcp_rtp_time_diff = g_value_get_int (value);
2761       GST_RTP_BIN_UNLOCK (rtpbin);
2762       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2763           "max-rtcp-rtp-time-diff", value);
2764       break;
2765     case PROP_MAX_DROPOUT_TIME:
2766       GST_RTP_BIN_LOCK (rtpbin);
2767       rtpbin->max_dropout_time = g_value_get_uint (value);
2768       GST_RTP_BIN_UNLOCK (rtpbin);
2769       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2770           "max-dropout-time", value);
2771       gst_rtp_bin_propagate_property_to_session (rtpbin, "max-dropout-time",
2772           value);
2773       break;
2774     case PROP_MAX_MISORDER_TIME:
2775       GST_RTP_BIN_LOCK (rtpbin);
2776       rtpbin->max_misorder_time = g_value_get_uint (value);
2777       GST_RTP_BIN_UNLOCK (rtpbin);
2778       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2779           "max-misorder-time", value);
2780       gst_rtp_bin_propagate_property_to_session (rtpbin, "max-misorder-time",
2781           value);
2782       break;
2783     case PROP_RFC7273_SYNC:
2784       rtpbin->rfc7273_sync = g_value_get_boolean (value);
2785       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2786           "rfc7273-sync", value);
2787       break;
2788     case PROP_MAX_STREAMS:
2789       rtpbin->max_streams = g_value_get_uint (value);
2790       break;
2791     default:
2792       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2793       break;
2794   }
2795 }
2796
2797 static void
2798 gst_rtp_bin_get_property (GObject * object, guint prop_id,
2799     GValue * value, GParamSpec * pspec)
2800 {
2801   GstRtpBin *rtpbin;
2802
2803   rtpbin = GST_RTP_BIN (object);
2804
2805   switch (prop_id) {
2806     case PROP_LATENCY:
2807       GST_RTP_BIN_LOCK (rtpbin);
2808       g_value_set_uint (value, rtpbin->latency_ms);
2809       GST_RTP_BIN_UNLOCK (rtpbin);
2810       break;
2811     case PROP_DROP_ON_LATENCY:
2812       GST_RTP_BIN_LOCK (rtpbin);
2813       g_value_set_boolean (value, rtpbin->drop_on_latency);
2814       GST_RTP_BIN_UNLOCK (rtpbin);
2815       break;
2816     case PROP_SDES:
2817       g_value_take_boxed (value, gst_rtp_bin_get_sdes_struct (rtpbin));
2818       break;
2819     case PROP_DO_LOST:
2820       GST_RTP_BIN_LOCK (rtpbin);
2821       g_value_set_boolean (value, rtpbin->do_lost);
2822       GST_RTP_BIN_UNLOCK (rtpbin);
2823       break;
2824     case PROP_IGNORE_PT:
2825       g_value_set_boolean (value, rtpbin->ignore_pt);
2826       break;
2827     case PROP_NTP_SYNC:
2828       g_value_set_boolean (value, rtpbin->ntp_sync);
2829       break;
2830     case PROP_RTCP_SYNC:
2831       g_value_set_enum (value, g_atomic_int_get (&rtpbin->rtcp_sync));
2832       break;
2833     case PROP_RTCP_SYNC_INTERVAL:
2834       g_value_set_uint (value, rtpbin->rtcp_sync_interval);
2835       break;
2836     case PROP_AUTOREMOVE:
2837       g_value_set_boolean (value, rtpbin->priv->autoremove);
2838       break;
2839     case PROP_BUFFER_MODE:
2840       g_value_set_enum (value, rtpbin->buffer_mode);
2841       break;
2842     case PROP_USE_PIPELINE_CLOCK:
2843       g_value_set_boolean (value, rtpbin->use_pipeline_clock);
2844       break;
2845     case PROP_DO_SYNC_EVENT:
2846       g_value_set_boolean (value, rtpbin->send_sync_event);
2847       break;
2848     case PROP_DO_RETRANSMISSION:
2849       GST_RTP_BIN_LOCK (rtpbin);
2850       g_value_set_boolean (value, rtpbin->do_retransmission);
2851       GST_RTP_BIN_UNLOCK (rtpbin);
2852       break;
2853     case PROP_RTP_PROFILE:
2854       g_value_set_enum (value, rtpbin->rtp_profile);
2855       break;
2856     case PROP_NTP_TIME_SOURCE:
2857       g_value_set_enum (value, rtpbin->ntp_time_source);
2858       break;
2859     case PROP_RTCP_SYNC_SEND_TIME:
2860       g_value_set_boolean (value, rtpbin->rtcp_sync_send_time);
2861       break;
2862     case PROP_MAX_RTCP_RTP_TIME_DIFF:
2863       GST_RTP_BIN_LOCK (rtpbin);
2864       g_value_set_int (value, rtpbin->max_rtcp_rtp_time_diff);
2865       GST_RTP_BIN_UNLOCK (rtpbin);
2866       break;
2867     case PROP_MAX_DROPOUT_TIME:
2868       g_value_set_uint (value, rtpbin->max_dropout_time);
2869       break;
2870     case PROP_MAX_MISORDER_TIME:
2871       g_value_set_uint (value, rtpbin->max_misorder_time);
2872       break;
2873     case PROP_RFC7273_SYNC:
2874       g_value_set_boolean (value, rtpbin->rfc7273_sync);
2875       break;
2876     case PROP_MAX_STREAMS:
2877       g_value_set_uint (value, rtpbin->max_streams);
2878       break;
2879     default:
2880       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2881       break;
2882   }
2883 }
2884
2885 static void
2886 gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
2887 {
2888   GstRtpBin *rtpbin;
2889
2890   rtpbin = GST_RTP_BIN (bin);
2891
2892   switch (GST_MESSAGE_TYPE (message)) {
2893     case GST_MESSAGE_ELEMENT:
2894     {
2895       const GstStructure *s = gst_message_get_structure (message);
2896
2897       /* we change the structure name and add the session ID to it */
2898       if (gst_structure_has_name (s, "application/x-rtp-source-sdes")) {
2899         GstRtpBinSession *sess;
2900
2901         /* find the session we set it as object data */
2902         sess = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2903             "GstRTPBin.session");
2904
2905         if (G_LIKELY (sess)) {
2906           message = gst_message_make_writable (message);
2907           s = gst_message_get_structure (message);
2908           gst_structure_set ((GstStructure *) s, "session", G_TYPE_UINT,
2909               sess->id, NULL);
2910         }
2911       }
2912       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2913       break;
2914     }
2915     case GST_MESSAGE_BUFFERING:
2916     {
2917       gint percent;
2918       gint min_percent = 100;
2919       GSList *sessions, *streams;
2920       GstRtpBinStream *stream;
2921       gboolean change = FALSE, active = FALSE;
2922       GstClockTime min_out_time;
2923       GstBufferingMode mode;
2924       gint avg_in, avg_out;
2925       gint64 buffering_left;
2926
2927       gst_message_parse_buffering (message, &percent);
2928       gst_message_parse_buffering_stats (message, &mode, &avg_in, &avg_out,
2929           &buffering_left);
2930
2931       stream =
2932           g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2933           "GstRTPBin.stream");
2934
2935       GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
2936
2937       /* get the stream */
2938       if (G_LIKELY (stream)) {
2939         GST_RTP_BIN_LOCK (rtpbin);
2940         /* fill in the percent */
2941         stream->percent = percent;
2942
2943         /* calculate the min value for all streams */
2944         for (sessions = rtpbin->sessions; sessions;
2945             sessions = g_slist_next (sessions)) {
2946           GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2947
2948           GST_RTP_SESSION_LOCK (session);
2949           if (session->streams) {
2950             for (streams = session->streams; streams;
2951                 streams = g_slist_next (streams)) {
2952               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2953
2954               GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
2955                   stream->percent);
2956
2957               /* find min percent */
2958               if (min_percent > stream->percent)
2959                 min_percent = stream->percent;
2960             }
2961           } else {
2962             GST_INFO_OBJECT (bin,
2963                 "session has no streams, setting min_percent to 0");
2964             min_percent = 0;
2965           }
2966           GST_RTP_SESSION_UNLOCK (session);
2967         }
2968         GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
2969
2970         if (rtpbin->buffering) {
2971           if (min_percent == 100) {
2972             rtpbin->buffering = FALSE;
2973             active = TRUE;
2974             change = TRUE;
2975           }
2976         } else {
2977           if (min_percent < 100) {
2978             /* pause the streams */
2979             rtpbin->buffering = TRUE;
2980             active = FALSE;
2981             change = TRUE;
2982           }
2983         }
2984         GST_RTP_BIN_UNLOCK (rtpbin);
2985
2986         gst_message_unref (message);
2987
2988         /* make a new buffering message with the min value */
2989         message =
2990             gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
2991         gst_message_set_buffering_stats (message, mode, avg_in, avg_out,
2992             buffering_left);
2993
2994         if (G_UNLIKELY (change)) {
2995           GstClock *clock;
2996           guint64 running_time = 0;
2997           guint64 offset = 0;
2998
2999           /* figure out the running time when we have a clock */
3000           if (G_LIKELY ((clock =
3001                       gst_element_get_clock (GST_ELEMENT_CAST (bin))))) {
3002             guint64 now, base_time;
3003
3004             now = gst_clock_get_time (clock);
3005             base_time = gst_element_get_base_time (GST_ELEMENT_CAST (bin));
3006             running_time = now - base_time;
3007             gst_object_unref (clock);
3008           }
3009           GST_DEBUG_OBJECT (bin,
3010               "running time now %" GST_TIME_FORMAT,
3011               GST_TIME_ARGS (running_time));
3012
3013           GST_RTP_BIN_LOCK (rtpbin);
3014
3015           /* when we reactivate, calculate the offsets so that all streams have
3016            * an output time that is at least as big as the running_time */
3017           offset = 0;
3018           if (active) {
3019             if (running_time > rtpbin->buffer_start) {
3020               offset = running_time - rtpbin->buffer_start;
3021               if (offset >= rtpbin->latency_ns)
3022                 offset -= rtpbin->latency_ns;
3023               else
3024                 offset = 0;
3025             }
3026           }
3027
3028           /* pause all streams */
3029           min_out_time = -1;
3030           for (sessions = rtpbin->sessions; sessions;
3031               sessions = g_slist_next (sessions)) {
3032             GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
3033
3034             GST_RTP_SESSION_LOCK (session);
3035             for (streams = session->streams; streams;
3036                 streams = g_slist_next (streams)) {
3037               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
3038               GstElement *element = stream->buffer;
3039               guint64 last_out;
3040
3041               g_signal_emit_by_name (element, "set-active", active, offset,
3042                   &last_out);
3043
3044               if (!active) {
3045                 g_object_get (element, "percent", &stream->percent, NULL);
3046
3047                 if (last_out == -1)
3048                   last_out = 0;
3049                 if (min_out_time == -1 || last_out < min_out_time)
3050                   min_out_time = last_out;
3051               }
3052
3053               GST_DEBUG_OBJECT (bin,
3054                   "setting %p to %d, offset %" GST_TIME_FORMAT ", last %"
3055                   GST_TIME_FORMAT ", percent %d", element, active,
3056                   GST_TIME_ARGS (offset), GST_TIME_ARGS (last_out),
3057                   stream->percent);
3058             }
3059             GST_RTP_SESSION_UNLOCK (session);
3060           }
3061           GST_DEBUG_OBJECT (bin,
3062               "min out time %" GST_TIME_FORMAT, GST_TIME_ARGS (min_out_time));
3063
3064           /* the buffer_start is the min out time of all paused jitterbuffers */
3065           if (!active)
3066             rtpbin->buffer_start = min_out_time;
3067
3068           GST_RTP_BIN_UNLOCK (rtpbin);
3069         }
3070       }
3071       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
3072       break;
3073     }
3074     default:
3075     {
3076       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
3077       break;
3078     }
3079   }
3080 }
3081
3082 static GstStateChangeReturn
3083 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
3084 {
3085   GstStateChangeReturn res;
3086   GstRtpBin *rtpbin;
3087   GstRtpBinPrivate *priv;
3088
3089   rtpbin = GST_RTP_BIN (element);
3090   priv = rtpbin->priv;
3091
3092   switch (transition) {
3093     case GST_STATE_CHANGE_NULL_TO_READY:
3094       break;
3095     case GST_STATE_CHANGE_READY_TO_PAUSED:
3096       priv->last_ntpnstime = 0;
3097       GST_LOG_OBJECT (rtpbin, "clearing shutdown flag");
3098       g_atomic_int_set (&priv->shutdown, 0);
3099       break;
3100     case GST_STATE_CHANGE_PAUSED_TO_READY:
3101       GST_LOG_OBJECT (rtpbin, "setting shutdown flag");
3102       g_atomic_int_set (&priv->shutdown, 1);
3103       /* wait for all callbacks to end by taking the lock. No new callbacks will
3104        * be able to happen as we set the shutdown flag. */
3105       GST_RTP_BIN_DYN_LOCK (rtpbin);
3106       GST_LOG_OBJECT (rtpbin, "dynamic lock taken, we can continue shutdown");
3107       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
3108       break;
3109     default:
3110       break;
3111   }
3112
3113   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3114
3115   switch (transition) {
3116     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3117       break;
3118     case GST_STATE_CHANGE_PAUSED_TO_READY:
3119       break;
3120     case GST_STATE_CHANGE_READY_TO_NULL:
3121       break;
3122     default:
3123       break;
3124   }
3125   return res;
3126 }
3127
3128 static GstElement *
3129 session_request_element (GstRtpBinSession * session, guint signal)
3130 {
3131   GstElement *element = NULL;
3132   GstRtpBin *bin = session->bin;
3133
3134   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, &element);
3135
3136   if (element) {
3137     if (!bin_manage_element (bin, element))
3138       goto manage_failed;
3139     session->elements = g_slist_prepend (session->elements, element);
3140   }
3141   return element;
3142
3143   /* ERRORS */
3144 manage_failed:
3145   {
3146     GST_WARNING_OBJECT (bin, "unable to manage element");
3147     gst_object_unref (element);
3148     return NULL;
3149   }
3150 }
3151
3152 static gboolean
3153 copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
3154 {
3155   GstPad *gpad = GST_PAD_CAST (user_data);
3156
3157   GST_DEBUG_OBJECT (gpad, "store sticky event %" GST_PTR_FORMAT, *event);
3158   gst_pad_store_sticky_event (gpad, *event);
3159
3160   return TRUE;
3161 }
3162
3163 /* a new pad (SSRC) was created in @session. This signal is emited from the
3164  * payload demuxer. */
3165 static void
3166 new_payload_found (GstElement * element, guint pt, GstPad * pad,
3167     GstRtpBinStream * stream)
3168 {
3169   GstRtpBin *rtpbin;
3170   GstElementClass *klass;
3171   GstPadTemplate *templ;
3172   gchar *padname;
3173   GstPad *gpad;
3174
3175   rtpbin = stream->bin;
3176
3177   GST_DEBUG_OBJECT (rtpbin, "new payload pad %u", pt);
3178
3179   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
3180
3181   /* ghost the pad to the parent */
3182   klass = GST_ELEMENT_GET_CLASS (rtpbin);
3183   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
3184   padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
3185       stream->session->id, stream->ssrc, pt);
3186   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
3187   g_free (padname);
3188   g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", gpad);
3189
3190   gst_pad_set_active (gpad, TRUE);
3191   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
3192
3193   gst_pad_sticky_events_foreach (pad, copy_sticky_events, gpad);
3194   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
3195
3196   return;
3197
3198 shutdown:
3199   {
3200     GST_DEBUG ("ignoring, we are shutting down");
3201     return;
3202   }
3203 }
3204
3205 static void
3206 payload_pad_removed (GstElement * element, GstPad * pad,
3207     GstRtpBinStream * stream)
3208 {
3209   GstRtpBin *rtpbin;
3210   GstPad *gpad;
3211
3212   rtpbin = stream->bin;
3213
3214   GST_DEBUG ("payload pad removed");
3215
3216   GST_RTP_BIN_DYN_LOCK (rtpbin);
3217   if ((gpad = g_object_get_data (G_OBJECT (pad), "GstRTPBin.ghostpad"))) {
3218     g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", NULL);
3219
3220     gst_pad_set_active (gpad, FALSE);
3221     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), gpad);
3222   }
3223   GST_RTP_BIN_DYN_UNLOCK (rtpbin);
3224 }
3225
3226 static GstCaps *
3227 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
3228 {
3229   GstRtpBin *rtpbin;
3230   GstCaps *caps;
3231
3232   rtpbin = session->bin;
3233
3234   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %u in session %u", pt,
3235       session->id);
3236
3237   caps = get_pt_map (session, pt);
3238   if (!caps)
3239     goto no_caps;
3240
3241   return caps;
3242
3243   /* ERRORS */
3244 no_caps:
3245   {
3246     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
3247     return NULL;
3248   }
3249 }
3250
3251 static void
3252 payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session)
3253 {
3254   GST_DEBUG_OBJECT (session->bin,
3255       "emiting signal for pt type changed to %u in session %u", pt,
3256       session->id);
3257
3258   g_signal_emit (session->bin, gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE],
3259       0, session->id, pt);
3260 }
3261
3262 /* emited when caps changed for the session */
3263 static void
3264 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
3265 {
3266   GstRtpBin *bin;
3267   GstCaps *caps;
3268   gint payload;
3269   const GstStructure *s;
3270
3271   bin = session->bin;
3272
3273   g_object_get (pad, "caps", &caps, NULL);
3274
3275   if (caps == NULL)
3276     return;
3277
3278   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
3279
3280   s = gst_caps_get_structure (caps, 0);
3281
3282   /* get payload, finish when it's not there */
3283   if (!gst_structure_get_int (s, "payload", &payload)) {
3284     gst_caps_unref (caps);
3285     return;
3286   }
3287
3288   GST_RTP_SESSION_LOCK (session);
3289   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
3290   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
3291   GST_RTP_SESSION_UNLOCK (session);
3292 }
3293
3294 /* a new pad (SSRC) was created in @session */
3295 static void
3296 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
3297     GstRtpBinSession * session)
3298 {
3299   GstRtpBin *rtpbin;
3300   GstRtpBinStream *stream;
3301   GstPad *sinkpad, *srcpad;
3302   gchar *padname;
3303
3304   rtpbin = session->bin;
3305
3306   GST_DEBUG_OBJECT (rtpbin, "new SSRC pad %08x, %s:%s", ssrc,
3307       GST_DEBUG_PAD_NAME (pad));
3308
3309   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
3310
3311   GST_RTP_SESSION_LOCK (session);
3312
3313   /* create new stream */
3314   stream = create_stream (session, ssrc);
3315   if (!stream)
3316     goto no_stream;
3317
3318   /* get pad and link */
3319   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP");
3320   padname = g_strdup_printf ("src_%u", ssrc);
3321   srcpad = gst_element_get_static_pad (element, padname);
3322   g_free (padname);
3323   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
3324   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
3325   gst_object_unref (sinkpad);
3326   gst_object_unref (srcpad);
3327
3328   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
3329   padname = g_strdup_printf ("rtcp_src_%u", ssrc);
3330   srcpad = gst_element_get_static_pad (element, padname);
3331   g_free (padname);
3332   sinkpad = gst_element_get_request_pad (stream->buffer, "sink_rtcp");
3333   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
3334   gst_object_unref (sinkpad);
3335   gst_object_unref (srcpad);
3336
3337   /* connect to the RTCP sync signal from the jitterbuffer */
3338   GST_DEBUG_OBJECT (rtpbin, "connecting sync signal");
3339   stream->buffer_handlesync_sig = g_signal_connect (stream->buffer,
3340       "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream);
3341
3342   if (stream->demux) {
3343     /* connect to the new-pad signal of the payload demuxer, this will expose the
3344      * new pad by ghosting it. */
3345     stream->demux_newpad_sig = g_signal_connect (stream->demux,
3346         "new-payload-type", (GCallback) new_payload_found, stream);
3347     stream->demux_padremoved_sig = g_signal_connect (stream->demux,
3348         "pad-removed", (GCallback) payload_pad_removed, stream);
3349
3350     /* connect to the request-pt-map signal. This signal will be emited by the
3351      * demuxer so that it can apply a proper caps on the buffers for the
3352      * depayloaders. */
3353     stream->demux_ptreq_sig = g_signal_connect (stream->demux,
3354         "request-pt-map", (GCallback) pt_map_requested, session);
3355     /* connect to the  signal so it can be forwarded. */
3356     stream->demux_ptchange_sig = g_signal_connect (stream->demux,
3357         "payload-type-change", (GCallback) payload_type_change, session);
3358   } else {
3359     /* add rtpjitterbuffer src pad to pads */
3360     GstElementClass *klass;
3361     GstPadTemplate *templ;
3362     gchar *padname;
3363     GstPad *gpad, *pad;
3364
3365     pad = gst_element_get_static_pad (stream->buffer, "src");
3366
3367     /* ghost the pad to the parent */
3368     klass = GST_ELEMENT_GET_CLASS (rtpbin);
3369     templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
3370     padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
3371         stream->session->id, stream->ssrc, 255);
3372     gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
3373     g_free (padname);
3374
3375     gst_pad_set_active (gpad, TRUE);
3376     gst_pad_sticky_events_foreach (pad, copy_sticky_events, gpad);
3377     gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
3378
3379     gst_object_unref (pad);
3380   }
3381
3382   GST_RTP_SESSION_UNLOCK (session);
3383   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
3384
3385   return;
3386
3387   /* ERRORS */
3388 shutdown:
3389   {
3390     GST_DEBUG_OBJECT (rtpbin, "we are shutting down");
3391     return;
3392   }
3393 no_stream:
3394   {
3395     GST_RTP_SESSION_UNLOCK (session);
3396     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
3397     GST_DEBUG_OBJECT (rtpbin, "could not create stream");
3398     return;
3399   }
3400 }
3401
3402 static void
3403 session_maybe_create_bundle_demuxer (GstRtpBinSession * session)
3404 {
3405   GstRtpBin *rtpbin;
3406
3407   if (session->bundle_demux)
3408     return;
3409
3410   rtpbin = session->bin;
3411   if (g_signal_has_handler_pending (rtpbin,
3412           gst_rtp_bin_signals[SIGNAL_ON_BUNDLED_SSRC], 0, TRUE)) {
3413     GST_DEBUG_OBJECT (rtpbin, "Adding a bundle SSRC demuxer to session %u",
3414         session->id);
3415     session->bundle_demux = gst_element_factory_make ("rtpssrcdemux", NULL);
3416     session->bundle_demux_newpad_sig = g_signal_connect (session->bundle_demux,
3417         "new-ssrc-pad", (GCallback) new_bundled_ssrc_pad_found, session);
3418
3419     gst_bin_add (GST_BIN_CAST (rtpbin), session->bundle_demux);
3420     gst_element_sync_state_with_parent (session->bundle_demux);
3421   } else {
3422     GST_DEBUG_OBJECT (rtpbin,
3423         "No handler for the on-bundled-ssrc signal so no need for a bundle SSRC demuxer in session %u",
3424         session->id);
3425   }
3426 }
3427
3428 static GstPad *
3429 complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session,
3430     gboolean bundle_demuxer_needed)
3431 {
3432   guint sessid = session->id;
3433   GstPad *recv_rtp_sink;
3434   GstPad *funnel_src;
3435   GstElement *decoder;
3436
3437   g_assert (!session->recv_rtp_sink);
3438
3439   /* get recv_rtp pad and store */
3440   session->recv_rtp_sink =
3441       gst_element_get_request_pad (session->session, "recv_rtp_sink");
3442   if (session->recv_rtp_sink == NULL)
3443     goto pad_failed;
3444
3445   g_signal_connect (session->recv_rtp_sink, "notify::caps",
3446       (GCallback) caps_changed, session);
3447
3448   if (bundle_demuxer_needed)
3449     session_maybe_create_bundle_demuxer (session);
3450
3451   GST_DEBUG_OBJECT (rtpbin, "requesting RTP decoder");
3452   decoder = session_request_element (session, SIGNAL_REQUEST_RTP_DECODER);
3453   if (decoder) {
3454     GstPad *decsrc, *decsink;
3455     GstPadLinkReturn ret;
3456
3457     GST_DEBUG_OBJECT (rtpbin, "linking RTP decoder");
3458     decsink = gst_element_get_static_pad (decoder, "rtp_sink");
3459     if (decsink == NULL)
3460       goto dec_sink_failed;
3461
3462     recv_rtp_sink = decsink;
3463
3464     decsrc = gst_element_get_static_pad (decoder, "rtp_src");
3465     if (decsrc == NULL)
3466       goto dec_src_failed;
3467
3468     if (session->bundle_demux) {
3469       GstPad *demux_sink;
3470       demux_sink = gst_element_get_static_pad (session->bundle_demux, "sink");
3471       ret = gst_pad_link (decsrc, demux_sink);
3472       gst_object_unref (demux_sink);
3473     } else {
3474       ret = gst_pad_link (decsrc, session->recv_rtp_sink);
3475     }
3476     gst_object_unref (decsrc);
3477
3478     if (ret != GST_PAD_LINK_OK)
3479       goto dec_link_failed;
3480
3481   } else {
3482     GST_DEBUG_OBJECT (rtpbin, "no RTP decoder given");
3483     if (session->bundle_demux) {
3484       recv_rtp_sink =
3485           gst_element_get_static_pad (session->bundle_demux, "sink");
3486     } else {
3487       recv_rtp_sink =
3488           gst_element_get_request_pad (session->rtp_funnel, "sink_%u");
3489     }
3490   }
3491
3492   funnel_src = gst_element_get_static_pad (session->rtp_funnel, "src");
3493   gst_pad_link (funnel_src, session->recv_rtp_sink);
3494   gst_object_unref (funnel_src);
3495
3496   return recv_rtp_sink;
3497
3498   /* ERRORS */
3499 pad_failed:
3500   {
3501     g_warning ("rtpbin: failed to get session recv_rtp_sink pad");
3502     return NULL;
3503   }
3504 dec_sink_failed:
3505   {
3506     g_warning ("rtpbin: failed to get decoder sink pad for session %u", sessid);
3507     return NULL;
3508   }
3509 dec_src_failed:
3510   {
3511     g_warning ("rtpbin: failed to get decoder src pad for session %u", sessid);
3512     gst_object_unref (recv_rtp_sink);
3513     return NULL;
3514   }
3515 dec_link_failed:
3516   {
3517     g_warning ("rtpbin: failed to link rtp decoder for session %u", sessid);
3518     gst_object_unref (recv_rtp_sink);
3519     return NULL;
3520   }
3521 }
3522
3523 static void
3524 complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session,
3525     guint sessid)
3526 {
3527   GstElement *aux;
3528   GstPad *recv_rtp_src;
3529
3530   g_assert (!session->recv_rtp_src);
3531
3532   session->recv_rtp_src =
3533       gst_element_get_static_pad (session->session, "recv_rtp_src");
3534   if (session->recv_rtp_src == NULL)
3535     goto pad_failed;
3536
3537   /* find out if we need AUX elements or if we can go into the SSRC demuxer
3538    * directly */
3539   aux = session_request_element (session, SIGNAL_REQUEST_AUX_RECEIVER);
3540   if (aux) {
3541     gchar *pname;
3542     GstPad *auxsink;
3543     GstPadLinkReturn ret;
3544
3545     GST_DEBUG_OBJECT (rtpbin, "linking AUX receiver");
3546
3547     pname = g_strdup_printf ("sink_%u", sessid);
3548     auxsink = gst_element_get_static_pad (aux, pname);
3549     g_free (pname);
3550     if (auxsink == NULL)
3551       goto aux_sink_failed;
3552
3553     ret = gst_pad_link (session->recv_rtp_src, auxsink);
3554     gst_object_unref (auxsink);
3555     if (ret != GST_PAD_LINK_OK)
3556       goto aux_link_failed;
3557
3558     /* this can be NULL when this AUX element is not to be linked to
3559      * an SSRC demuxer */
3560     pname = g_strdup_printf ("src_%u", sessid);
3561     recv_rtp_src = gst_element_get_static_pad (aux, pname);
3562     g_free (pname);
3563   } else {
3564     recv_rtp_src = gst_object_ref (session->recv_rtp_src);
3565   }
3566
3567   if (recv_rtp_src) {
3568     GstPad *sinkdpad;
3569
3570     GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
3571     sinkdpad = gst_element_get_static_pad (session->demux, "sink");
3572     GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
3573     gst_pad_link_full (recv_rtp_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
3574     gst_object_unref (sinkdpad);
3575     gst_object_unref (recv_rtp_src);
3576
3577     /* connect to the new-ssrc-pad signal of the SSRC demuxer */
3578     session->demux_newpad_sig = g_signal_connect (session->demux,
3579         "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
3580     session->demux_padremoved_sig = g_signal_connect (session->demux,
3581         "removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session);
3582   }
3583
3584   return;
3585
3586 pad_failed:
3587   {
3588     g_warning ("rtpbin: failed to get session recv_rtp_src pad");
3589     return;
3590   }
3591 aux_sink_failed:
3592   {
3593     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
3594     return;
3595   }
3596 aux_link_failed:
3597   {
3598     g_warning ("rtpbin: failed to link AUX pad to session %u", sessid);
3599     return;
3600   }
3601 }
3602
3603 /* Create a pad for receiving RTP for the session in @name. Must be called with
3604  * RTP_BIN_LOCK.
3605  */
3606 static GstPad *
3607 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
3608 {
3609   guint sessid;
3610   GstRtpBinSession *session;
3611   GstPad *recv_rtp_sink;
3612
3613   /* first get the session number */
3614   if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1)
3615     goto no_name;
3616
3617   GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
3618
3619   /* get or create session */
3620   session = find_session_by_id (rtpbin, sessid);
3621   if (!session) {
3622     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
3623     /* create session now */
3624     session = create_session (rtpbin, sessid);
3625     if (session == NULL)
3626       goto create_error;
3627   }
3628
3629   /* check if pad was requested */
3630   if (session->recv_rtp_sink_ghost != NULL)
3631     return session->recv_rtp_sink_ghost;
3632
3633   /* setup the session sink pad */
3634   recv_rtp_sink = complete_session_sink (rtpbin, session, TRUE);
3635   if (!recv_rtp_sink)
3636     goto session_sink_failed;
3637
3638
3639   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
3640   session->recv_rtp_sink_ghost =
3641       gst_ghost_pad_new_from_template (name, recv_rtp_sink, templ);
3642   gst_object_unref (recv_rtp_sink);
3643   gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE);
3644   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost);
3645
3646   complete_session_receiver (rtpbin, session, sessid);
3647
3648   return session->recv_rtp_sink_ghost;
3649
3650   /* ERRORS */
3651 no_name:
3652   {
3653     g_warning ("rtpbin: invalid name given");
3654     return NULL;
3655   }
3656 create_error:
3657   {
3658     /* create_session already warned */
3659     return NULL;
3660   }
3661 session_sink_failed:
3662   {
3663     /* warning already done */
3664     return NULL;
3665   }
3666 }
3667
3668 static void
3669 remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3670 {
3671   if (session->demux_newpad_sig) {
3672     g_signal_handler_disconnect (session->demux, session->demux_newpad_sig);
3673     session->demux_newpad_sig = 0;
3674   }
3675   if (session->demux_padremoved_sig) {
3676     g_signal_handler_disconnect (session->demux, session->demux_padremoved_sig);
3677     session->demux_padremoved_sig = 0;
3678   }
3679   if (session->bundle_demux_newpad_sig) {
3680     g_signal_handler_disconnect (session->bundle_demux,
3681         session->bundle_demux_newpad_sig);
3682     session->bundle_demux_newpad_sig = 0;
3683   }
3684   if (session->recv_rtp_src) {
3685     gst_object_unref (session->recv_rtp_src);
3686     session->recv_rtp_src = NULL;
3687   }
3688   if (session->recv_rtp_sink) {
3689     gst_element_release_request_pad (session->session, session->recv_rtp_sink);
3690     gst_object_unref (session->recv_rtp_sink);
3691     session->recv_rtp_sink = NULL;
3692   }
3693   if (session->recv_rtp_sink_ghost) {
3694     gst_pad_set_active (session->recv_rtp_sink_ghost, FALSE);
3695     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3696         session->recv_rtp_sink_ghost);
3697     session->recv_rtp_sink_ghost = NULL;
3698   }
3699 }
3700
3701 static GstPad *
3702 complete_session_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session,
3703     guint sessid, gboolean bundle_demuxer_needed)
3704 {
3705   GstElement *decoder;
3706   GstPad *sinkdpad;
3707   GstPad *decsink = NULL;
3708   GstPad *funnel_src;
3709
3710   /* get recv_rtp pad and store */
3711   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
3712   session->recv_rtcp_sink =
3713       gst_element_get_request_pad (session->session, "recv_rtcp_sink");
3714   if (session->recv_rtcp_sink == NULL)
3715     goto pad_failed;
3716
3717   if (bundle_demuxer_needed)
3718     session_maybe_create_bundle_demuxer (session);
3719
3720   GST_DEBUG_OBJECT (rtpbin, "getting RTCP decoder");
3721   decoder = session_request_element (session, SIGNAL_REQUEST_RTCP_DECODER);
3722   if (decoder) {
3723     GstPad *decsrc;
3724     GstPadLinkReturn ret;
3725
3726     GST_DEBUG_OBJECT (rtpbin, "linking RTCP decoder");
3727     decsink = gst_element_get_static_pad (decoder, "rtcp_sink");
3728     decsrc = gst_element_get_static_pad (decoder, "rtcp_src");
3729
3730     if (decsink == NULL)
3731       goto dec_sink_failed;
3732
3733     if (decsrc == NULL)
3734       goto dec_src_failed;
3735
3736     if (session->bundle_demux) {
3737       GstPad *demux_sink;
3738       demux_sink =
3739           gst_element_get_static_pad (session->bundle_demux, "rtcp_sink");
3740       ret = gst_pad_link (decsrc, demux_sink);
3741       gst_object_unref (demux_sink);
3742     } else {
3743       ret = gst_pad_link (decsrc, session->recv_rtcp_sink);
3744     }
3745     gst_object_unref (decsrc);
3746
3747     if (ret != GST_PAD_LINK_OK)
3748       goto dec_link_failed;
3749   } else {
3750     GST_DEBUG_OBJECT (rtpbin, "no RTCP decoder given");
3751     if (session->bundle_demux) {
3752       decsink = gst_element_get_static_pad (session->bundle_demux, "rtcp_sink");
3753     } else {
3754       decsink = gst_element_get_request_pad (session->rtcp_funnel, "sink_%u");
3755     }
3756   }
3757
3758   /* get srcpad, link to SSRCDemux */
3759   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
3760   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
3761   if (session->sync_src == NULL)
3762     goto src_pad_failed;
3763
3764   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
3765   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
3766   gst_pad_link_full (session->sync_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
3767   gst_object_unref (sinkdpad);
3768
3769   funnel_src = gst_element_get_static_pad (session->rtcp_funnel, "src");
3770   gst_pad_link (funnel_src, session->recv_rtcp_sink);
3771   gst_object_unref (funnel_src);
3772
3773   return decsink;
3774
3775 pad_failed:
3776   {
3777     g_warning ("rtpbin: failed to get session rtcp_sink pad");
3778     return NULL;
3779   }
3780 dec_sink_failed:
3781   {
3782     g_warning ("rtpbin: failed to get decoder sink pad for session %u", sessid);
3783     return NULL;
3784   }
3785 dec_src_failed:
3786   {
3787     g_warning ("rtpbin: failed to get decoder src pad for session %u", sessid);
3788     goto cleanup;
3789   }
3790 dec_link_failed:
3791   {
3792     g_warning ("rtpbin: failed to link rtcp decoder for session %u", sessid);
3793     goto cleanup;
3794   }
3795 src_pad_failed:
3796   {
3797     g_warning ("rtpbin: failed to get session sync_src pad");
3798   }
3799
3800 cleanup:
3801   gst_object_unref (decsink);
3802   return NULL;
3803 }
3804
3805 /* Create a pad for receiving RTCP for the session in @name. Must be called with
3806  * RTP_BIN_LOCK.
3807  */
3808 static GstPad *
3809 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
3810     const gchar * name)
3811 {
3812   guint sessid;
3813   GstRtpBinSession *session;
3814   GstPad *decsink = NULL;
3815
3816   /* first get the session number */
3817   if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1)
3818     goto no_name;
3819
3820   GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid);
3821
3822   /* get or create the session */
3823   session = find_session_by_id (rtpbin, sessid);
3824   if (!session) {
3825     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
3826     /* create session now */
3827     session = create_session (rtpbin, sessid);
3828     if (session == NULL)
3829       goto create_error;
3830   }
3831
3832   /* check if pad was requested */
3833   if (session->recv_rtcp_sink_ghost != NULL)
3834     return session->recv_rtcp_sink_ghost;
3835
3836   decsink = complete_session_rtcp (rtpbin, session, sessid, TRUE);
3837   if (!decsink)
3838     goto create_error;
3839
3840   session->recv_rtcp_sink_ghost =
3841       gst_ghost_pad_new_from_template (name, decsink, templ);
3842   gst_object_unref (decsink);
3843   gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE);
3844   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin),
3845       session->recv_rtcp_sink_ghost);
3846
3847   return session->recv_rtcp_sink_ghost;
3848
3849   /* ERRORS */
3850 no_name:
3851   {
3852     g_warning ("rtpbin: invalid name given");
3853     return NULL;
3854   }
3855 create_error:
3856   {
3857     /* create_session already warned */
3858     return NULL;
3859   }
3860 }
3861
3862 static void
3863 remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3864 {
3865   if (session->recv_rtcp_sink_ghost) {
3866     gst_pad_set_active (session->recv_rtcp_sink_ghost, FALSE);
3867     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3868         session->recv_rtcp_sink_ghost);
3869     session->recv_rtcp_sink_ghost = NULL;
3870   }
3871   if (session->sync_src) {
3872     /* releasing the request pad should also unref the sync pad */
3873     gst_object_unref (session->sync_src);
3874     session->sync_src = NULL;
3875   }
3876   if (session->recv_rtcp_sink) {
3877     gst_element_release_request_pad (session->session, session->recv_rtcp_sink);
3878     gst_object_unref (session->recv_rtcp_sink);
3879     session->recv_rtcp_sink = NULL;
3880   }
3881 }
3882
3883 static gboolean
3884 complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session)
3885 {
3886   gchar *gname;
3887   guint sessid = session->id;
3888   GstPad *send_rtp_src;
3889   GstElement *encoder;
3890   GstElementClass *klass;
3891   GstPadTemplate *templ;
3892
3893   /* get srcpad */
3894   session->send_rtp_src =
3895       gst_element_get_static_pad (session->session, "send_rtp_src");
3896   if (session->send_rtp_src == NULL)
3897     goto no_srcpad;
3898
3899   GST_DEBUG_OBJECT (rtpbin, "getting RTP encoder");
3900   encoder = session_request_element (session, SIGNAL_REQUEST_RTP_ENCODER);
3901   if (encoder) {
3902     gchar *ename;
3903     GstPad *encsrc, *encsink;
3904     GstPadLinkReturn ret;
3905
3906     GST_DEBUG_OBJECT (rtpbin, "linking RTP encoder");
3907     ename = g_strdup_printf ("rtp_src_%u", sessid);
3908     encsrc = gst_element_get_static_pad (encoder, ename);
3909     g_free (ename);
3910
3911     if (encsrc == NULL)
3912       goto enc_src_failed;
3913
3914     send_rtp_src = encsrc;
3915
3916     ename = g_strdup_printf ("rtp_sink_%u", sessid);
3917     encsink = gst_element_get_static_pad (encoder, ename);
3918     g_free (ename);
3919     if (encsink == NULL)
3920       goto enc_sink_failed;
3921
3922     ret = gst_pad_link (session->send_rtp_src, encsink);
3923     gst_object_unref (encsink);
3924
3925     if (ret != GST_PAD_LINK_OK)
3926       goto enc_link_failed;
3927   } else {
3928     GST_DEBUG_OBJECT (rtpbin, "no RTP encoder given");
3929     send_rtp_src = gst_object_ref (session->send_rtp_src);
3930   }
3931
3932   /* ghost the new source pad */
3933   klass = GST_ELEMENT_GET_CLASS (rtpbin);
3934   gname = g_strdup_printf ("send_rtp_src_%u", sessid);
3935   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u");
3936   session->send_rtp_src_ghost =
3937       gst_ghost_pad_new_from_template (gname, send_rtp_src, templ);
3938   gst_object_unref (send_rtp_src);
3939   gst_pad_set_active (session->send_rtp_src_ghost, TRUE);
3940   gst_pad_sticky_events_foreach (send_rtp_src, copy_sticky_events,
3941       session->send_rtp_src_ghost);
3942   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost);
3943   g_free (gname);
3944
3945   return TRUE;
3946
3947   /* ERRORS */
3948 no_srcpad:
3949   {
3950     g_warning ("rtpbin: failed to get rtp source pad for session %u", sessid);
3951     return FALSE;
3952   }
3953 enc_src_failed:
3954   {
3955     g_warning ("rtpbin: failed to get encoder src pad for session %u", sessid);
3956     return FALSE;
3957   }
3958 enc_sink_failed:
3959   {
3960     g_warning ("rtpbin: failed to get encoder sink pad for session %u", sessid);
3961     gst_object_unref (send_rtp_src);
3962     return FALSE;
3963   }
3964 enc_link_failed:
3965   {
3966     g_warning ("rtpbin: failed to link rtp encoder for session %u", sessid);
3967     gst_object_unref (send_rtp_src);
3968     return FALSE;
3969   }
3970 }
3971
3972 static gboolean
3973 setup_aux_sender_fold (const GValue * item, GValue * result, gpointer user_data)
3974 {
3975   GstPad *pad;
3976   gchar *name;
3977   guint sessid;
3978   GstRtpBinSession *session = user_data, *newsess;
3979   GstRtpBin *rtpbin = session->bin;
3980   GstPadLinkReturn ret;
3981
3982   pad = g_value_get_object (item);
3983   name = gst_pad_get_name (pad);
3984
3985   if (name == NULL || sscanf (name, "src_%u", &sessid) != 1)
3986     goto no_name;
3987
3988   g_free (name);
3989
3990   newsess = find_session_by_id (rtpbin, sessid);
3991   if (newsess == NULL) {
3992     /* create new session */
3993     newsess = create_session (rtpbin, sessid);
3994     if (newsess == NULL)
3995       goto create_error;
3996   } else if (newsess->send_rtp_sink != NULL)
3997     goto existing_session;
3998
3999   /* get send_rtp pad and store */
4000   newsess->send_rtp_sink =
4001       gst_element_get_request_pad (newsess->session, "send_rtp_sink");
4002   if (newsess->send_rtp_sink == NULL)
4003     goto pad_failed;
4004
4005   ret = gst_pad_link (pad, newsess->send_rtp_sink);
4006   if (ret != GST_PAD_LINK_OK)
4007     goto aux_link_failed;
4008
4009   if (!complete_session_src (rtpbin, newsess))
4010     goto session_src_failed;
4011
4012   return TRUE;
4013
4014   /* ERRORS */
4015 no_name:
4016   {
4017     GST_WARNING ("ignoring invalid pad name %s", GST_STR_NULL (name));
4018     g_free (name);
4019     return TRUE;
4020   }
4021 create_error:
4022   {
4023     /* create_session already warned */
4024     return FALSE;
4025   }
4026 existing_session:
4027   {
4028     g_warning ("rtpbin: session %u is already a sender", sessid);
4029     return FALSE;
4030   }
4031 pad_failed:
4032   {
4033     g_warning ("rtpbin: failed to get session pad for session %u", sessid);
4034     return FALSE;
4035   }
4036 aux_link_failed:
4037   {
4038     g_warning ("rtpbin: failed to link AUX for session %u", sessid);
4039     return FALSE;
4040   }
4041 session_src_failed:
4042   {
4043     g_warning ("rtpbin: failed to complete AUX for session %u", sessid);
4044     return FALSE;
4045   }
4046 }
4047
4048 static gboolean
4049 setup_aux_sender (GstRtpBin * rtpbin, GstRtpBinSession * session,
4050     GstElement * aux)
4051 {
4052   GstIterator *it;
4053   GValue result = { 0, };
4054   GstIteratorResult res;
4055
4056   it = gst_element_iterate_src_pads (aux);
4057   res = gst_iterator_fold (it, setup_aux_sender_fold, &result, session);
4058   gst_iterator_free (it);
4059
4060   return res == GST_ITERATOR_DONE;
4061 }
4062
4063 /* Create a pad for sending RTP for the session in @name. Must be called with
4064  * RTP_BIN_LOCK.
4065  */
4066 static GstPad *
4067 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
4068 {
4069   gchar *pname;
4070   guint sessid;
4071   GstPad *send_rtp_sink;
4072   GstElement *aux;
4073   GstRtpBinSession *session;
4074
4075   /* first get the session number */
4076   if (name == NULL || sscanf (name, "send_rtp_sink_%u", &sessid) != 1)
4077     goto no_name;
4078
4079   /* get or create session */
4080   session = find_session_by_id (rtpbin, sessid);
4081   if (!session) {
4082     /* create session now */
4083     session = create_session (rtpbin, sessid);
4084     if (session == NULL)
4085       goto create_error;
4086   }
4087
4088   /* check if pad was requested */
4089   if (session->send_rtp_sink_ghost != NULL)
4090     return session->send_rtp_sink_ghost;
4091
4092   /* check if we are already using this session as a sender */
4093   if (session->send_rtp_sink != NULL)
4094     goto existing_session;
4095
4096   GST_DEBUG_OBJECT (rtpbin, "getting RTP AUX sender");
4097   aux = session_request_element (session, SIGNAL_REQUEST_AUX_SENDER);
4098   if (aux) {
4099     GST_DEBUG_OBJECT (rtpbin, "linking AUX sender");
4100     if (!setup_aux_sender (rtpbin, session, aux))
4101       goto aux_session_failed;
4102
4103     pname = g_strdup_printf ("sink_%u", sessid);
4104     send_rtp_sink = gst_element_get_static_pad (aux, pname);
4105     g_free (pname);
4106
4107     if (send_rtp_sink == NULL)
4108       goto aux_sink_failed;
4109   } else {
4110     /* get send_rtp pad and store */
4111     session->send_rtp_sink =
4112         gst_element_get_request_pad (session->session, "send_rtp_sink");
4113     if (session->send_rtp_sink == NULL)
4114       goto pad_failed;
4115
4116     if (!complete_session_src (rtpbin, session))
4117       goto session_src_failed;
4118
4119     send_rtp_sink = gst_object_ref (session->send_rtp_sink);
4120   }
4121
4122   session->send_rtp_sink_ghost =
4123       gst_ghost_pad_new_from_template (name, send_rtp_sink, templ);
4124   gst_object_unref (send_rtp_sink);
4125   gst_pad_set_active (session->send_rtp_sink_ghost, TRUE);
4126   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_sink_ghost);
4127
4128   return session->send_rtp_sink_ghost;
4129
4130   /* ERRORS */
4131 no_name:
4132   {
4133     g_warning ("rtpbin: invalid name given");
4134     return NULL;
4135   }
4136 create_error:
4137   {
4138     /* create_session already warned */
4139     return NULL;
4140   }
4141 existing_session:
4142   {
4143     g_warning ("rtpbin: session %u is already in use", sessid);
4144     return NULL;
4145   }
4146 aux_session_failed:
4147   {
4148     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
4149     return NULL;
4150   }
4151 aux_sink_failed:
4152   {
4153     g_warning ("rtpbin: failed to get AUX sink pad for session %u", sessid);
4154     return NULL;
4155   }
4156 pad_failed:
4157   {
4158     g_warning ("rtpbin: failed to get session pad for session %u", sessid);
4159     return NULL;
4160   }
4161 session_src_failed:
4162   {
4163     g_warning ("rtpbin: failed to setup source pads for session %u", sessid);
4164     return NULL;
4165   }
4166 }
4167
4168 static void
4169 remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
4170 {
4171   if (session->send_rtp_src_ghost) {
4172     gst_pad_set_active (session->send_rtp_src_ghost, FALSE);
4173     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
4174         session->send_rtp_src_ghost);
4175     session->send_rtp_src_ghost = NULL;
4176   }
4177   if (session->send_rtp_src) {
4178     gst_object_unref (session->send_rtp_src);
4179     session->send_rtp_src = NULL;
4180   }
4181   if (session->send_rtp_sink) {
4182     gst_element_release_request_pad (GST_ELEMENT_CAST (session->session),
4183         session->send_rtp_sink);
4184     gst_object_unref (session->send_rtp_sink);
4185     session->send_rtp_sink = NULL;
4186   }
4187   if (session->send_rtp_sink_ghost) {
4188     gst_pad_set_active (session->send_rtp_sink_ghost, FALSE);
4189     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
4190         session->send_rtp_sink_ghost);
4191     session->send_rtp_sink_ghost = NULL;
4192   }
4193 }
4194
4195 /* Create a pad for sending RTCP for the session in @name. Must be called with
4196  * RTP_BIN_LOCK.
4197  */
4198 static GstPad *
4199 create_send_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
4200     const gchar * name)
4201 {
4202   guint sessid;
4203   GstPad *encsrc;
4204   GstElement *encoder;
4205   GstRtpBinSession *session;
4206
4207   /* first get the session number */
4208   if (name == NULL || sscanf (name, "send_rtcp_src_%u", &sessid) != 1)
4209     goto no_name;
4210
4211   /* get or create session */
4212   session = find_session_by_id (rtpbin, sessid);
4213   if (!session) {
4214     GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid);
4215     /* create session now */
4216     session = create_session (rtpbin, sessid);
4217     if (session == NULL)
4218       goto create_error;
4219   }
4220
4221   /* check if pad was requested */
4222   if (session->send_rtcp_src_ghost != NULL)
4223     return session->send_rtcp_src_ghost;
4224
4225   /* get rtcp_src pad and store */
4226   session->send_rtcp_src =
4227       gst_element_get_request_pad (session->session, "send_rtcp_src");
4228   if (session->send_rtcp_src == NULL)
4229     goto pad_failed;
4230
4231   GST_DEBUG_OBJECT (rtpbin, "getting RTCP encoder");
4232   encoder = session_request_element (session, SIGNAL_REQUEST_RTCP_ENCODER);
4233   if (encoder) {
4234     gchar *ename;
4235     GstPad *encsink;
4236     GstPadLinkReturn ret;
4237
4238     GST_DEBUG_OBJECT (rtpbin, "linking RTCP encoder");
4239
4240     ename = g_strdup_printf ("rtcp_src_%u", sessid);
4241     encsrc = gst_element_get_static_pad (encoder, ename);
4242     g_free (ename);
4243     if (encsrc == NULL)
4244       goto enc_src_failed;
4245
4246     ename = g_strdup_printf ("rtcp_sink_%u", sessid);
4247     encsink = gst_element_get_static_pad (encoder, ename);
4248     g_free (ename);
4249     if (encsink == NULL)
4250       goto enc_sink_failed;
4251
4252     ret = gst_pad_link (session->send_rtcp_src, encsink);
4253     gst_object_unref (encsink);
4254
4255     if (ret != GST_PAD_LINK_OK)
4256       goto enc_link_failed;
4257   } else {
4258     GST_DEBUG_OBJECT (rtpbin, "no RTCP encoder given");
4259     encsrc = gst_object_ref (session->send_rtcp_src);
4260   }
4261
4262   session->send_rtcp_src_ghost =
4263       gst_ghost_pad_new_from_template (name, encsrc, templ);
4264   gst_object_unref (encsrc);
4265   gst_pad_set_active (session->send_rtcp_src_ghost, TRUE);
4266   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtcp_src_ghost);
4267
4268   return session->send_rtcp_src_ghost;
4269
4270   /* ERRORS */
4271 no_name:
4272   {
4273     g_warning ("rtpbin: invalid name given");
4274     return NULL;
4275   }
4276 create_error:
4277   {
4278     /* create_session already warned */
4279     return NULL;
4280   }
4281 pad_failed:
4282   {
4283     g_warning ("rtpbin: failed to get rtcp pad for session %u", sessid);
4284     return NULL;
4285   }
4286 enc_src_failed:
4287   {
4288     g_warning ("rtpbin: failed to get encoder src pad for session %u", sessid);
4289     return NULL;
4290   }
4291 enc_sink_failed:
4292   {
4293     g_warning ("rtpbin: failed to get encoder sink pad for session %u", sessid);
4294     gst_object_unref (encsrc);
4295     return NULL;
4296   }
4297 enc_link_failed:
4298   {
4299     g_warning ("rtpbin: failed to link rtcp encoder for session %u", sessid);
4300     gst_object_unref (encsrc);
4301     return NULL;
4302   }
4303 }
4304
4305 static void
4306 remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
4307 {
4308   if (session->send_rtcp_src_ghost) {
4309     gst_pad_set_active (session->send_rtcp_src_ghost, FALSE);
4310     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
4311         session->send_rtcp_src_ghost);
4312     session->send_rtcp_src_ghost = NULL;
4313   }
4314   if (session->send_rtcp_src) {
4315     gst_element_release_request_pad (session->session, session->send_rtcp_src);
4316     gst_object_unref (session->send_rtcp_src);
4317     session->send_rtcp_src = NULL;
4318   }
4319 }
4320
4321 /* If the requested name is NULL we should create a name with
4322  * the session number assuming we want the lowest posible session
4323  * with a free pad like the template */
4324 static gchar *
4325 gst_rtp_bin_get_free_pad_name (GstElement * element, GstPadTemplate * templ)
4326 {
4327   gboolean name_found = FALSE;
4328   gint session = 0;
4329   GstIterator *pad_it = NULL;
4330   gchar *pad_name = NULL;
4331   GValue data = { 0, };
4332
4333   GST_DEBUG_OBJECT (element, "find a free pad name for template");
4334   while (!name_found) {
4335     gboolean done = FALSE;
4336
4337     g_free (pad_name);
4338     pad_name = g_strdup_printf (templ->name_template, session++);
4339     pad_it = gst_element_iterate_pads (GST_ELEMENT (element));
4340     name_found = TRUE;
4341     while (!done) {
4342       switch (gst_iterator_next (pad_it, &data)) {
4343         case GST_ITERATOR_OK:
4344         {
4345           GstPad *pad;
4346           gchar *name;
4347
4348           pad = g_value_get_object (&data);
4349           name = gst_pad_get_name (pad);
4350
4351           if (strcmp (name, pad_name) == 0) {
4352             done = TRUE;
4353             name_found = FALSE;
4354           }
4355           g_free (name);
4356           g_value_reset (&data);
4357           break;
4358         }
4359         case GST_ITERATOR_ERROR:
4360         case GST_ITERATOR_RESYNC:
4361           /* restart iteration */
4362           done = TRUE;
4363           name_found = FALSE;
4364           session = 0;
4365           break;
4366         case GST_ITERATOR_DONE:
4367           done = TRUE;
4368           break;
4369       }
4370     }
4371     g_value_unset (&data);
4372     gst_iterator_free (pad_it);
4373   }
4374
4375   GST_DEBUG_OBJECT (element, "free pad name found: '%s'", pad_name);
4376   return pad_name;
4377 }
4378
4379 /*
4380  */
4381 static GstPad *
4382 gst_rtp_bin_request_new_pad (GstElement * element,
4383     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
4384 {
4385   GstRtpBin *rtpbin;
4386   GstElementClass *klass;
4387   GstPad *result;
4388
4389   gchar *pad_name = NULL;
4390
4391   g_return_val_if_fail (templ != NULL, NULL);
4392   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
4393
4394   rtpbin = GST_RTP_BIN (element);
4395   klass = GST_ELEMENT_GET_CLASS (element);
4396
4397   GST_RTP_BIN_LOCK (rtpbin);
4398
4399   if (name == NULL) {
4400     /* use a free pad name */
4401     pad_name = gst_rtp_bin_get_free_pad_name (element, templ);
4402   } else {
4403     /* use the provided name */
4404     pad_name = g_strdup (name);
4405   }
4406
4407   GST_DEBUG_OBJECT (rtpbin, "Trying to request a pad with name %s", pad_name);
4408
4409   /* figure out the template */
4410   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
4411     result = create_recv_rtp (rtpbin, templ, pad_name);
4412   } else if (templ == gst_element_class_get_pad_template (klass,
4413           "recv_rtcp_sink_%u")) {
4414     result = create_recv_rtcp (rtpbin, templ, pad_name);
4415   } else if (templ == gst_element_class_get_pad_template (klass,
4416           "send_rtp_sink_%u")) {
4417     result = create_send_rtp (rtpbin, templ, pad_name);
4418   } else if (templ == gst_element_class_get_pad_template (klass,
4419           "send_rtcp_src_%u")) {
4420     result = create_send_rtcp (rtpbin, templ, pad_name);
4421   } else
4422     goto wrong_template;
4423
4424   g_free (pad_name);
4425   GST_RTP_BIN_UNLOCK (rtpbin);
4426
4427   return result;
4428
4429   /* ERRORS */
4430 wrong_template:
4431   {
4432     g_free (pad_name);
4433     GST_RTP_BIN_UNLOCK (rtpbin);
4434     g_warning ("rtpbin: this is not our template");
4435     return NULL;
4436   }
4437 }
4438
4439 static void
4440 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
4441 {
4442   GstRtpBinSession *session;
4443   GstRtpBin *rtpbin;
4444
4445   g_return_if_fail (GST_IS_GHOST_PAD (pad));
4446   g_return_if_fail (GST_IS_RTP_BIN (element));
4447
4448   rtpbin = GST_RTP_BIN (element);
4449
4450   GST_RTP_BIN_LOCK (rtpbin);
4451   GST_DEBUG_OBJECT (rtpbin, "Trying to release pad %s:%s",
4452       GST_DEBUG_PAD_NAME (pad));
4453
4454   if (!(session = find_session_by_pad (rtpbin, pad)))
4455     goto unknown_pad;
4456
4457   if (session->recv_rtp_sink_ghost == pad) {
4458     remove_recv_rtp (rtpbin, session);
4459   } else if (session->recv_rtcp_sink_ghost == pad) {
4460     remove_recv_rtcp (rtpbin, session);
4461   } else if (session->send_rtp_sink_ghost == pad) {
4462     remove_send_rtp (rtpbin, session);
4463   } else if (session->send_rtcp_src_ghost == pad) {
4464     remove_rtcp (rtpbin, session);
4465   }
4466
4467   /* no more request pads, free the complete session */
4468   if (session->recv_rtp_sink_ghost == NULL
4469       && session->recv_rtcp_sink_ghost == NULL
4470       && session->send_rtp_sink_ghost == NULL
4471       && session->send_rtcp_src_ghost == NULL) {
4472     GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session);
4473     rtpbin->sessions = g_slist_remove (rtpbin->sessions, session);
4474     free_session (session, rtpbin);
4475   }
4476   GST_RTP_BIN_UNLOCK (rtpbin);
4477
4478   return;
4479
4480   /* ERROR */
4481 unknown_pad:
4482   {
4483     GST_RTP_BIN_UNLOCK (rtpbin);
4484     g_warning ("rtpbin: %s:%s is not one of our request pads",
4485         GST_DEBUG_PAD_NAME (pad));
4486     return;
4487   }
4488 }