rtpbin: handle multiple encoder instances
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-rtpbin
22  * @see_also: rtpjitterbuffer, rtpsession, rtpptdemux, rtpssrcdemux
23  *
24  * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
25  * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
26  * RTP sessions that will be synchronized together using RTCP SR packets.
27  *
28  * #GstRtpBin is configured with a number of request pads that define the
29  * functionality that is activated, similar to the #GstRtpSession element.
30  *
31  * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_\%u pad. The session
32  * number must be specified in the pad name.
33  * Data received on the recv_rtp_sink_\%u pad will be processed in the #GstRtpSession
34  * manager and after being validated forwarded on #GstRtpSsrcDemux element. Each
35  * RTP stream is demuxed based on the SSRC and send to a #GstRtpJitterBuffer. After
36  * the packets are released from the jitterbuffer, they will be forwarded to a
37  * #GstRtpPtDemux element. The #GstRtpPtDemux element will demux the packets based
38  * on the payload type and will create a unique pad recv_rtp_src_\%u_\%u_\%u on
39  * rtpbin with the session number, SSRC and payload type respectively as the pad
40  * name.
41  *
42  * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_\%u pad. The
43  * session number must be specified in the pad name.
44  *
45  * If you want the session manager to generate and send RTCP packets, request
46  * the send_rtcp_src_\%u pad with the session number in the pad name. Packet pushed
47  * on this pad contain SR/RR RTCP reports that should be sent to all participants
48  * in the session.
49  *
50  * To use #GstRtpBin as a sender, request a send_rtp_sink_\%u pad, which will
51  * automatically create a send_rtp_src_\%u pad. If the session number is not provided,
52  * the pad from the lowest available session will be returned. The session manager will modify the
53  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
54  * send_rtp_src_\%u pad after updating its internal state.
55  *
56  * The session manager needs the clock-rate of the payload types it is handling
57  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
58  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
59  * signal.
60  *
61  * Access to the internal statistics of rtpbin is provided with the
62  * get-internal-session property. This action signal gives access to the
63  * RTPSession object which further provides action signals to retrieve the
64  * internal source and other sources.
65  *
66  * #GstRtpBin also has signals (#GstRtpBin::request-rtp-encoder,
67  * #GstRtpBin::request-rtp-decoder, #GstRtpBin::request-rtcp-encoder and
68  * #GstRtpBin::request-rtp-decoder) to dynamically request for RTP and RTCP encoders
69  * and decoders in order to support SRTP. The encoders must provide the pads
70  * rtp_sink_\%d and rtp_src_\%d for RTP and rtcp_sink_\%d and rtcp_src_\%d for
71  * RTCP. The session number will be used in the pad name. The decoders must provide
72  * rtp_sink and rtp_src for RTP and rtcp_sink and rtcp_src for RTCP. The decoders will
73  * be placed before the #GstRtpSession element, thus they must support SSRC demuxing
74  * internally.
75  *
76  * <refsect2>
77  * <title>Example pipelines</title>
78  * |[
79  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
80  *     rtpbin ! rtptheoradepay ! theoradec ! xvimagesink
81  * ]| Receive RTP data from port 5000 and send to the session 0 in rtpbin.
82  * |[
83  * gst-launch-1.0 rtpbin name=rtpbin \
84  *         v4l2src ! videoconvert ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
85  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
86  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
87  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
88  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
89  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
90  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
91  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
92  * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
93  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
94  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
95  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
96  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
97  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
98  * is received on port 5007. Since RTCP packets from the sender should be sent
99  * as soon as possible and do not participate in preroll, sync=false and
100  * async=false is configured on udpsink
101  * |[
102  * gst-launch-1.0 -v rtpbin name=rtpbin                                          \
103  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
104  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
105  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
106  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
107  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
108  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
109  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
110  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
111  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
112  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
113  * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
114  * decode and display the video.
115  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
116  * decode and play the audio.
117  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
118  * session 1 on port 5003. These packets will be used for session management and
119  * synchronisation.
120  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
121  * on port 5007.
122  * </refsect2>
123  *
124  * Last reviewed on 2007-08-30 (0.10.6)
125  */
126
127 #ifdef HAVE_CONFIG_H
128 #include "config.h"
129 #endif
130 #include <stdio.h>
131 #include <string.h>
132
133 #include <gst/rtp/gstrtpbuffer.h>
134 #include <gst/rtp/gstrtcpbuffer.h>
135
136 #include "gstrtpbin.h"
137 #include "rtpsession.h"
138 #include "gstrtpsession.h"
139 #include "gstrtpjitterbuffer.h"
140
141 #include <gst/glib-compat-private.h>
142
143 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
144 #define GST_CAT_DEFAULT gst_rtp_bin_debug
145
146 /* sink pads */
147 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
148     GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
149     GST_PAD_SINK,
150     GST_PAD_REQUEST,
151     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
152     );
153
154 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
155     GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
156     GST_PAD_SINK,
157     GST_PAD_REQUEST,
158     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
159     );
160
161 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
162 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%u",
163     GST_PAD_SINK,
164     GST_PAD_REQUEST,
165     GST_STATIC_CAPS ("application/x-rtp")
166     );
167
168 /* src pads */
169 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
170 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
171     GST_PAD_SRC,
172     GST_PAD_SOMETIMES,
173     GST_STATIC_CAPS ("application/x-rtp")
174     );
175
176 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
177     GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%u",
178     GST_PAD_SRC,
179     GST_PAD_REQUEST,
180     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
181     );
182
183 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
184     GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%u",
185     GST_PAD_SRC,
186     GST_PAD_SOMETIMES,
187     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
188     );
189
190 #define GST_RTP_BIN_GET_PRIVATE(obj)  \
191    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
192
193 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock (&(bin)->priv->bin_lock)
194 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock (&(bin)->priv->bin_lock)
195
196 /* lock to protect dynamic callbacks, like pad-added and new ssrc. */
197 #define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock (&(bin)->priv->dyn_lock)
198 #define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock (&(bin)->priv->dyn_lock)
199
200 /* lock for shutdown */
201 #define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
202 G_STMT_START {                                   \
203   if (g_atomic_int_get (&bin->priv->shutdown))   \
204     goto label;                                  \
205   GST_RTP_BIN_DYN_LOCK (bin);                    \
206   if (g_atomic_int_get (&bin->priv->shutdown)) { \
207     GST_RTP_BIN_DYN_UNLOCK (bin);                \
208     goto label;                                  \
209   }                                              \
210 } G_STMT_END
211
212 /* unlock for shutdown */
213 #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
214   GST_RTP_BIN_DYN_UNLOCK (bin);                  \
215
216 struct _GstRtpBinPrivate
217 {
218   GMutex bin_lock;
219
220   /* lock protecting dynamic adding/removing */
221   GMutex dyn_lock;
222
223   /* if we are shutting down or not */
224   gint shutdown;
225
226   gboolean autoremove;
227
228   /* UNIX (ntp) time of last SR sync used */
229   guint64 last_unix;
230
231   /* list of extra elements */
232   GList *elements;
233 };
234
235 /* signals and args */
236 enum
237 {
238   SIGNAL_REQUEST_PT_MAP,
239   SIGNAL_PAYLOAD_TYPE_CHANGE,
240   SIGNAL_CLEAR_PT_MAP,
241   SIGNAL_RESET_SYNC,
242   SIGNAL_GET_INTERNAL_SESSION,
243
244   SIGNAL_ON_NEW_SSRC,
245   SIGNAL_ON_SSRC_COLLISION,
246   SIGNAL_ON_SSRC_VALIDATED,
247   SIGNAL_ON_SSRC_ACTIVE,
248   SIGNAL_ON_SSRC_SDES,
249   SIGNAL_ON_BYE_SSRC,
250   SIGNAL_ON_BYE_TIMEOUT,
251   SIGNAL_ON_TIMEOUT,
252   SIGNAL_ON_SENDER_TIMEOUT,
253   SIGNAL_ON_NPT_STOP,
254
255   SIGNAL_REQUEST_RTP_ENCODER,
256   SIGNAL_REQUEST_RTP_DECODER,
257   SIGNAL_REQUEST_RTCP_ENCODER,
258   SIGNAL_REQUEST_RTCP_DECODER,
259
260   LAST_SIGNAL
261 };
262
263 #define DEFAULT_LATENCY_MS           200
264 #define DEFAULT_DROP_ON_LATENCY      FALSE
265 #define DEFAULT_SDES                 NULL
266 #define DEFAULT_DO_LOST              FALSE
267 #define DEFAULT_IGNORE_PT            FALSE
268 #define DEFAULT_NTP_SYNC             FALSE
269 #define DEFAULT_AUTOREMOVE           FALSE
270 #define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
271 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
272 #define DEFAULT_RTCP_SYNC            GST_RTP_BIN_RTCP_SYNC_ALWAYS
273 #define DEFAULT_RTCP_SYNC_INTERVAL   0
274 #define DEFAULT_DO_SYNC_EVENT        FALSE
275 #define DEFAULT_DO_RETRANSMISSION    FALSE
276
277 enum
278 {
279   PROP_0,
280   PROP_LATENCY,
281   PROP_DROP_ON_LATENCY,
282   PROP_SDES,
283   PROP_DO_LOST,
284   PROP_IGNORE_PT,
285   PROP_NTP_SYNC,
286   PROP_RTCP_SYNC,
287   PROP_RTCP_SYNC_INTERVAL,
288   PROP_AUTOREMOVE,
289   PROP_BUFFER_MODE,
290   PROP_USE_PIPELINE_CLOCK,
291   PROP_DO_SYNC_EVENT,
292   PROP_DO_RETRANSMISSION,
293   PROP_LAST
294 };
295
296 enum
297 {
298   GST_RTP_BIN_RTCP_SYNC_ALWAYS,
299   GST_RTP_BIN_RTCP_SYNC_INITIAL,
300   GST_RTP_BIN_RTCP_SYNC_RTP
301 };
302
303 #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
304 static GType
305 gst_rtp_bin_rtcp_sync_get_type (void)
306 {
307   static GType rtcp_sync_type = 0;
308   static const GEnumValue rtcp_sync_types[] = {
309     {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
310     {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
311     {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
312     {0, NULL, NULL},
313   };
314
315   if (!rtcp_sync_type) {
316     rtcp_sync_type = g_enum_register_static ("GstRTCPSync", rtcp_sync_types);
317   }
318   return rtcp_sync_type;
319 }
320
321 /* helper objects */
322 typedef struct _GstRtpBinSession GstRtpBinSession;
323 typedef struct _GstRtpBinStream GstRtpBinStream;
324 typedef struct _GstRtpBinClient GstRtpBinClient;
325
326 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
327
328 static GstCaps *pt_map_requested (GstElement * element, guint pt,
329     GstRtpBinSession * session);
330 static void payload_type_change (GstElement * element, guint pt,
331     GstRtpBinSession * session);
332 static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
333 static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
334 static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
335 static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
336 static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
337 static void free_stream (GstRtpBinStream * stream, GstRtpBin * bin);
338
339 /* Manages the RTP stream for one SSRC.
340  *
341  * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer.
342  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
343  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
344  * together (see below).
345  */
346 struct _GstRtpBinStream
347 {
348   /* the SSRC of this stream */
349   guint32 ssrc;
350
351   /* parent bin */
352   GstRtpBin *bin;
353
354   /* the session this SSRC belongs to */
355   GstRtpBinSession *session;
356
357   /* the jitterbuffer of the SSRC */
358   GstElement *buffer;
359   gulong buffer_handlesync_sig;
360   gulong buffer_ptreq_sig;
361   gulong buffer_ntpstop_sig;
362   gint percent;
363
364   /* the PT demuxer of the SSRC */
365   GstElement *demux;
366   gulong demux_newpad_sig;
367   gulong demux_padremoved_sig;
368   gulong demux_ptreq_sig;
369   gulong demux_ptchange_sig;
370
371   /* if we have calculated a valid rt_delta for this stream */
372   gboolean have_sync;
373   /* mapping to local RTP and NTP time */
374   gint64 rt_delta;
375   gint64 rtp_delta;
376   /* base rtptime in gst time */
377   gint64 clock_base;
378 };
379
380 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->lock)
381 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->lock)
382
383 /* Manages the receiving end of the packets.
384  *
385  * There is one such structure for each RTP session (audio/video/...).
386  * We get the RTP/RTCP packets and stuff them into the session manager. From
387  * there they are pushed into an SSRC demuxer that splits the stream based on
388  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
389  * the GstRtpBinStream above).
390  */
391 struct _GstRtpBinSession
392 {
393   /* session id */
394   gint id;
395   /* the parent bin */
396   GstRtpBin *bin;
397   /* the session element */
398   GstElement *session;
399   /* the SSRC demuxer */
400   GstElement *demux;
401   gulong demux_newpad_sig;
402   gulong demux_padremoved_sig;
403
404   GMutex lock;
405
406   /* list of GstRtpBinStream */
407   GSList *streams;
408
409   /* list of encoders */
410   GSList *encoders;
411
412   /* list of decoders */
413   GSList *decoders;
414
415   /* mapping of payload type to caps */
416   GHashTable *ptmap;
417
418   /* the pads of the session */
419   GstPad *recv_rtp_sink;
420   GstPad *recv_rtp_sink_ghost;
421   GstPad *recv_rtp_src;
422   GstPad *recv_rtcp_sink;
423   GstPad *recv_rtcp_sink_ghost;
424   GstPad *sync_src;
425   GstPad *send_rtp_sink;
426   GstPad *send_rtp_sink_ghost;
427   GstPad *send_rtp_src;
428   GstPad *send_rtp_src_ghost;
429   GstPad *send_rtcp_src;
430   GstPad *send_rtcp_src_ghost;
431 };
432
433 /* Manages the RTP streams that come from one client and should therefore be
434  * synchronized.
435  */
436 struct _GstRtpBinClient
437 {
438   /* the common CNAME for the streams */
439   gchar *cname;
440   guint cname_len;
441
442   /* the streams */
443   guint nstreams;
444   GSList *streams;
445 };
446
447 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
448 static GstRtpBinSession *
449 find_session_by_id (GstRtpBin * rtpbin, gint id)
450 {
451   GSList *walk;
452
453   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
454     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
455
456     if (sess->id == id)
457       return sess;
458   }
459   return NULL;
460 }
461
462 /* find a session with the given request pad. Must be called with RTP_BIN_LOCK */
463 static GstRtpBinSession *
464 find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
465 {
466   GSList *walk;
467
468   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
469     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
470
471     if ((sess->recv_rtp_sink_ghost == pad) ||
472         (sess->recv_rtcp_sink_ghost == pad) ||
473         (sess->send_rtp_sink_ghost == pad)
474         || (sess->send_rtcp_src_ghost == pad))
475       return sess;
476   }
477   return NULL;
478 }
479
480 static void
481 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
482 {
483   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
484       sess->id, ssrc);
485 }
486
487 static void
488 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
489 {
490   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
491       sess->id, ssrc);
492 }
493
494 static void
495 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
496 {
497   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
498       sess->id, ssrc);
499 }
500
501 static void
502 on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
503 {
504   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
505       sess->id, ssrc);
506 }
507
508 static void
509 on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
510 {
511   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
512       sess->id, ssrc);
513 }
514
515 static void
516 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
517 {
518   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
519       sess->id, ssrc);
520 }
521
522 static void
523 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
524 {
525   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
526       sess->id, ssrc);
527
528   if (sess->bin->priv->autoremove)
529     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
530 }
531
532 static void
533 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
534 {
535   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
536       sess->id, ssrc);
537
538   if (sess->bin->priv->autoremove)
539     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
540 }
541
542 static void
543 on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
544 {
545   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
546       sess->id, ssrc);
547 }
548
549 static void
550 on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
551 {
552   g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0,
553       stream->session->id, stream->ssrc);
554 }
555
556 /* must be called with the SESSION lock */
557 static GstRtpBinStream *
558 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
559 {
560   GSList *walk;
561
562   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
563     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
564
565     if (stream->ssrc == ssrc)
566       return stream;
567   }
568   return NULL;
569 }
570
571 static void
572 ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad,
573     GstRtpBinSession * session)
574 {
575   GstRtpBinStream *stream = NULL;
576   GstRtpBin *rtpbin;
577
578   rtpbin = session->bin;
579
580   GST_RTP_BIN_LOCK (rtpbin);
581
582   GST_RTP_SESSION_LOCK (session);
583   if ((stream = find_stream_by_ssrc (session, ssrc)))
584     session->streams = g_slist_remove (session->streams, stream);
585   GST_RTP_SESSION_UNLOCK (session);
586
587   if (stream)
588     free_stream (stream, rtpbin);
589
590   GST_RTP_BIN_UNLOCK (rtpbin);
591 }
592
593 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
594 static GstRtpBinSession *
595 create_session (GstRtpBin * rtpbin, gint id)
596 {
597   GstRtpBinSession *sess;
598   GstElement *session, *demux;
599   GstState target;
600
601   if (!(session = gst_element_factory_make ("rtpsession", NULL)))
602     goto no_session;
603
604   if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
605     goto no_demux;
606
607   sess = g_new0 (GstRtpBinSession, 1);
608   g_mutex_init (&sess->lock);
609   sess->id = id;
610   sess->bin = rtpbin;
611   sess->session = session;
612   sess->demux = demux;
613   sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
614       (GDestroyNotify) gst_caps_unref);
615   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
616
617   /* configure SDES items */
618   GST_OBJECT_LOCK (rtpbin);
619   g_object_set (session, "sdes", rtpbin->sdes, "use-pipeline-clock",
620       rtpbin->use_pipeline_clock, NULL);
621   GST_OBJECT_UNLOCK (rtpbin);
622
623   /* provide clock_rate to the session manager when needed */
624   g_signal_connect (session, "request-pt-map",
625       (GCallback) pt_map_requested, sess);
626
627   g_signal_connect (sess->session, "on-new-ssrc",
628       (GCallback) on_new_ssrc, sess);
629   g_signal_connect (sess->session, "on-ssrc-collision",
630       (GCallback) on_ssrc_collision, sess);
631   g_signal_connect (sess->session, "on-ssrc-validated",
632       (GCallback) on_ssrc_validated, sess);
633   g_signal_connect (sess->session, "on-ssrc-active",
634       (GCallback) on_ssrc_active, sess);
635   g_signal_connect (sess->session, "on-ssrc-sdes",
636       (GCallback) on_ssrc_sdes, sess);
637   g_signal_connect (sess->session, "on-bye-ssrc",
638       (GCallback) on_bye_ssrc, sess);
639   g_signal_connect (sess->session, "on-bye-timeout",
640       (GCallback) on_bye_timeout, sess);
641   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
642   g_signal_connect (sess->session, "on-sender-timeout",
643       (GCallback) on_sender_timeout, sess);
644
645   gst_bin_add (GST_BIN_CAST (rtpbin), session);
646   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
647
648   GST_OBJECT_LOCK (rtpbin);
649   target = GST_STATE_TARGET (rtpbin);
650   GST_OBJECT_UNLOCK (rtpbin);
651
652   /* change state only to what's needed */
653   gst_element_set_state (demux, target);
654   gst_element_set_state (session, target);
655
656   return sess;
657
658   /* ERRORS */
659 no_session:
660   {
661     g_warning ("rtpbin: could not create rtpsession element");
662     return NULL;
663   }
664 no_demux:
665   {
666     gst_object_unref (session);
667     g_warning ("rtpbin: could not create rtpssrcdemux element");
668     return NULL;
669   }
670 }
671
672 static gboolean
673 bin_manage_element (GstRtpBin * bin, GstElement * element)
674 {
675   GstRtpBinPrivate *priv = bin->priv;
676
677   if (g_list_find (priv->elements, element)) {
678     GST_DEBUG_OBJECT (bin, "requested element %p already in bin", element);
679   } else {
680     GST_DEBUG_OBJECT (bin, "adding requested element %p", element);
681     if (!gst_bin_add (GST_BIN_CAST (bin), element))
682       goto add_failed;
683     if (!gst_element_sync_state_with_parent (element))
684       GST_WARNING_OBJECT (bin, "unable to sync element state with rtpbin");
685   }
686   /* we add the element multiple times, each we need an equal number of
687    * removes to really remove the element from the bin */
688   priv->elements = g_list_prepend (priv->elements, element);
689
690   return TRUE;
691
692   /* ERRORS */
693 add_failed:
694   {
695     GST_WARNING_OBJECT (bin, "unable to add element");
696     return FALSE;
697   }
698 }
699
700 static void
701 remove_bin_element (GstElement * element, GstRtpBin * bin)
702 {
703   GstRtpBinPrivate *priv = bin->priv;
704   GList *find;
705
706   find = g_list_find (priv->elements, element);
707   if (find) {
708     priv->elements = g_list_delete_link (priv->elements, find);
709
710     if (!g_list_find (priv->elements, element))
711       gst_bin_remove (GST_BIN_CAST (bin), element);
712     else
713       gst_object_unref (element);
714   }
715 }
716
717 /* called with RTP_BIN_LOCK */
718 static void
719 free_session (GstRtpBinSession * sess, GstRtpBin * bin)
720 {
721   GST_DEBUG_OBJECT (bin, "freeing session %p", sess);
722
723   gst_element_set_locked_state (sess->demux, TRUE);
724   gst_element_set_locked_state (sess->session, TRUE);
725
726   gst_element_set_state (sess->demux, GST_STATE_NULL);
727   gst_element_set_state (sess->session, GST_STATE_NULL);
728
729   remove_recv_rtp (bin, sess);
730   remove_recv_rtcp (bin, sess);
731   remove_send_rtp (bin, sess);
732   remove_rtcp (bin, sess);
733
734   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
735   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
736
737   g_slist_foreach (sess->encoders, (GFunc) remove_bin_element, bin);
738   g_slist_free (sess->encoders);
739
740   g_slist_foreach (sess->decoders, (GFunc) remove_bin_element, bin);
741   g_slist_free (sess->decoders);
742
743   g_slist_foreach (sess->streams, (GFunc) free_stream, bin);
744   g_slist_free (sess->streams);
745
746   g_mutex_clear (&sess->lock);
747   g_hash_table_destroy (sess->ptmap);
748
749   g_free (sess);
750 }
751
752 /* get the payload type caps for the specific payload @pt in @session */
753 static GstCaps *
754 get_pt_map (GstRtpBinSession * session, guint pt)
755 {
756   GstCaps *caps = NULL;
757   GstRtpBin *bin;
758   GValue ret = { 0 };
759   GValue args[3] = { {0}, {0}, {0} };
760
761   GST_DEBUG ("searching pt %d in cache", pt);
762
763   GST_RTP_SESSION_LOCK (session);
764
765   /* first look in the cache */
766   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
767   if (caps) {
768     gst_caps_ref (caps);
769     goto done;
770   }
771
772   bin = session->bin;
773
774   GST_DEBUG ("emiting signal for pt %d in session %d", pt, session->id);
775
776   /* not in cache, send signal to request caps */
777   g_value_init (&args[0], GST_TYPE_ELEMENT);
778   g_value_set_object (&args[0], bin);
779   g_value_init (&args[1], G_TYPE_UINT);
780   g_value_set_uint (&args[1], session->id);
781   g_value_init (&args[2], G_TYPE_UINT);
782   g_value_set_uint (&args[2], pt);
783
784   g_value_init (&ret, GST_TYPE_CAPS);
785   g_value_set_boxed (&ret, NULL);
786
787   GST_RTP_SESSION_UNLOCK (session);
788
789   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
790
791   GST_RTP_SESSION_LOCK (session);
792
793   g_value_unset (&args[0]);
794   g_value_unset (&args[1]);
795   g_value_unset (&args[2]);
796
797   /* look in the cache again because we let the lock go */
798   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
799   if (caps) {
800     gst_caps_ref (caps);
801     g_value_unset (&ret);
802     goto done;
803   }
804
805   caps = (GstCaps *) g_value_dup_boxed (&ret);
806   g_value_unset (&ret);
807   if (!caps)
808     goto no_caps;
809
810   GST_DEBUG ("caching pt %d as %" GST_PTR_FORMAT, pt, caps);
811
812   /* store in cache, take additional ref */
813   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
814       gst_caps_ref (caps));
815
816 done:
817   GST_RTP_SESSION_UNLOCK (session);
818
819   return caps;
820
821   /* ERRORS */
822 no_caps:
823   {
824     GST_RTP_SESSION_UNLOCK (session);
825     GST_DEBUG ("no pt map could be obtained");
826     return NULL;
827   }
828 }
829
830 static gboolean
831 return_true (gpointer key, gpointer value, gpointer user_data)
832 {
833   return TRUE;
834 }
835
836 static void
837 gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
838 {
839   GSList *clients, *streams;
840
841   GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");
842
843   GST_RTP_BIN_LOCK (rtpbin);
844   for (clients = rtpbin->clients; clients; clients = g_slist_next (clients)) {
845     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
846
847     /* reset sync on all streams for this client */
848     for (streams = client->streams; streams; streams = g_slist_next (streams)) {
849       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
850
851       /* make use require a new SR packet for this stream before we attempt new
852        * lip-sync */
853       stream->have_sync = FALSE;
854       stream->rt_delta = 0;
855       stream->rtp_delta = 0;
856       stream->clock_base = -100 * GST_SECOND;
857     }
858   }
859   GST_RTP_BIN_UNLOCK (rtpbin);
860 }
861
862 static void
863 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
864 {
865   GSList *sessions, *streams;
866
867   GST_RTP_BIN_LOCK (bin);
868   GST_DEBUG_OBJECT (bin, "clearing pt map");
869   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
870     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
871
872     GST_DEBUG_OBJECT (bin, "clearing session %p", session);
873     g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
874
875     GST_RTP_SESSION_LOCK (session);
876     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
877
878     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
879       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
880
881       GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
882       g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
883       if (stream->demux)
884         g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
885     }
886     GST_RTP_SESSION_UNLOCK (session);
887   }
888   GST_RTP_BIN_UNLOCK (bin);
889
890   /* reset sync too */
891   gst_rtp_bin_reset_sync (bin);
892 }
893
894 static RTPSession *
895 gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
896 {
897   RTPSession *internal_session = NULL;
898   GstRtpBinSession *session;
899
900   GST_RTP_BIN_LOCK (bin);
901   GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %d",
902       session_id);
903   session = find_session_by_id (bin, (gint) session_id);
904   if (session) {
905     g_object_get (session->session, "internal-session", &internal_session,
906         NULL);
907   }
908   GST_RTP_BIN_UNLOCK (bin);
909
910   return internal_session;
911 }
912
913 static GstElement *
914 gst_rtp_bin_request_encoder (GstRtpBin * bin, guint session_id)
915 {
916   GST_DEBUG_OBJECT (bin, "return NULL encoder");
917   return NULL;
918 }
919
920 static GstElement *
921 gst_rtp_bin_request_decoder (GstRtpBin * bin, guint session_id)
922 {
923   GST_DEBUG_OBJECT (bin, "return NULL decoder");
924   return NULL;
925 }
926
927 static void
928 gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
929     const gchar * name, const GValue * value)
930 {
931   GSList *sessions, *streams;
932
933   GST_RTP_BIN_LOCK (bin);
934   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
935     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
936
937     GST_RTP_SESSION_LOCK (session);
938     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
939       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
940
941       g_object_set_property (G_OBJECT (stream->buffer), name, value);
942     }
943     GST_RTP_SESSION_UNLOCK (session);
944   }
945   GST_RTP_BIN_UNLOCK (bin);
946 }
947
948 /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
949 static GstRtpBinClient *
950 get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
951 {
952   GstRtpBinClient *result = NULL;
953   GSList *walk;
954
955   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
956     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
957
958     if (len != client->cname_len)
959       continue;
960
961     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
962       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
963           client->cname);
964       result = client;
965       break;
966     }
967   }
968
969   /* nothing found, create one */
970   if (result == NULL) {
971     result = g_new0 (GstRtpBinClient, 1);
972     result->cname = g_strndup ((gchar *) data, len);
973     result->cname_len = len;
974     bin->clients = g_slist_prepend (bin->clients, result);
975     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
976         result->cname);
977   }
978   return result;
979 }
980
981 static void
982 free_client (GstRtpBinClient * client, GstRtpBin * bin)
983 {
984   GST_DEBUG_OBJECT (bin, "freeing client %p", client);
985   g_slist_free (client->streams);
986   g_free (client->cname);
987   g_free (client);
988 }
989
990 static void
991 get_current_times (GstRtpBin * bin, GstClockTime * running_time,
992     guint64 * ntpnstime)
993 {
994   guint64 ntpns;
995   GstClock *clock;
996   GstClockTime base_time, rt, clock_time;
997
998   GST_OBJECT_LOCK (bin);
999   if ((clock = GST_ELEMENT_CLOCK (bin))) {
1000     base_time = GST_ELEMENT_CAST (bin)->base_time;
1001     gst_object_ref (clock);
1002     GST_OBJECT_UNLOCK (bin);
1003
1004     clock_time = gst_clock_get_time (clock);
1005
1006     if (bin->use_pipeline_clock) {
1007       ntpns = clock_time - base_time;
1008     } else {
1009       GTimeVal current;
1010
1011       /* get current NTP time */
1012       g_get_current_time (&current);
1013       ntpns = GST_TIMEVAL_TO_TIME (current);
1014     }
1015
1016     /* add constant to convert from 1970 based time to 1900 based time */
1017     ntpns += (2208988800LL * GST_SECOND);
1018
1019     /* get current clock time and convert to running time */
1020     rt = clock_time - base_time;
1021
1022     gst_object_unref (clock);
1023   } else {
1024     GST_OBJECT_UNLOCK (bin);
1025     rt = -1;
1026     ntpns = -1;
1027   }
1028   if (running_time)
1029     *running_time = rt;
1030   if (ntpnstime)
1031     *ntpnstime = ntpns;
1032 }
1033
1034 static void
1035 stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
1036     gint64 ts_offset, gboolean check)
1037 {
1038   gint64 prev_ts_offset;
1039
1040   g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
1041
1042   /* delta changed, see how much */
1043   if (prev_ts_offset != ts_offset) {
1044     gint64 diff;
1045
1046     diff = prev_ts_offset - ts_offset;
1047
1048     GST_DEBUG_OBJECT (bin,
1049         "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
1050         ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff);
1051
1052     if (check) {
1053       /* only change diff when it changed more than 4 milliseconds. This
1054        * compensates for rounding errors in NTP to RTP timestamp
1055        * conversions */
1056       if (ABS (diff) < 4 * GST_MSECOND) {
1057         GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
1058         return;
1059       }
1060       if (ABS (diff) > (3 * GST_SECOND)) {
1061         GST_WARNING_OBJECT (bin, "offset unusually large, ignoring");
1062         return;
1063       }
1064     }
1065     g_object_set (stream->buffer, "ts-offset", ts_offset, NULL);
1066   }
1067   GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
1068       stream->ssrc, ts_offset);
1069 }
1070
1071 static void
1072 gst_rtp_bin_send_sync_event (GstRtpBinStream * stream)
1073 {
1074   if (stream->bin->send_sync_event) {
1075     GstEvent *event;
1076     GstPad *srcpad;
1077
1078     GST_DEBUG_OBJECT (stream->bin,
1079         "sending GstRTCPSRReceived event downstream");
1080
1081     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1082         gst_structure_new_empty ("GstRTCPSRReceived"));
1083
1084     srcpad = gst_element_get_static_pad (stream->buffer, "src");
1085     gst_pad_push_event (srcpad, event);
1086     gst_object_unref (srcpad);
1087   }
1088 }
1089
1090 /* associate a stream to the given CNAME. This will make sure all streams for
1091  * that CNAME are synchronized together.
1092  * Must be called with GST_RTP_BIN_LOCK */
1093 static void
1094 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
1095     guint8 * data, guint64 ntptime, guint64 last_extrtptime,
1096     guint64 base_rtptime, guint64 base_time, guint clock_rate,
1097     gint64 rtp_clock_base)
1098 {
1099   GstRtpBinClient *client;
1100   gboolean created;
1101   GSList *walk;
1102   guint64 local_rt;
1103   guint64 local_rtp;
1104   GstClockTime running_time;
1105   guint64 ntpnstime;
1106   gint64 ntpdiff, rtdiff;
1107   guint64 last_unix;
1108
1109   /* first find or create the CNAME */
1110   client = get_client (bin, len, data, &created);
1111
1112   /* find stream in the client */
1113   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1114     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1115
1116     if (ostream == stream)
1117       break;
1118   }
1119   /* not found, add it to the list */
1120   if (walk == NULL) {
1121     GST_DEBUG_OBJECT (bin,
1122         "new association of SSRC %08x with client %p with CNAME %s",
1123         stream->ssrc, client, client->cname);
1124     client->streams = g_slist_prepend (client->streams, stream);
1125     client->nstreams++;
1126   } else {
1127     GST_DEBUG_OBJECT (bin,
1128         "found association of SSRC %08x with client %p with CNAME %s",
1129         stream->ssrc, client, client->cname);
1130   }
1131
1132   if (!GST_CLOCK_TIME_IS_VALID (last_extrtptime)) {
1133     GST_DEBUG_OBJECT (bin, "invalidated sync data");
1134     if (bin->rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1135       /* we don't need that data, so carry on,
1136        * but make some values look saner */
1137       last_extrtptime = base_rtptime;
1138     } else {
1139       /* nothing we can do with this data in this case */
1140       GST_DEBUG_OBJECT (bin, "bailing out");
1141       return;
1142     }
1143   }
1144
1145   /* Take the extended rtptime we found in the SR packet and map it to the
1146    * local rtptime. The local rtp time is used to construct timestamps on the
1147    * buffers so we will calculate what running_time corresponds to the RTP
1148    * timestamp in the SR packet. */
1149   local_rtp = last_extrtptime - base_rtptime;
1150
1151   GST_DEBUG_OBJECT (bin,
1152       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
1153       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, "
1154       "clock-base %" G_GINT64_FORMAT, base_rtptime,
1155       last_extrtptime, local_rtp, clock_rate, rtp_clock_base);
1156
1157   /* calculate local RTP time in gstreamer timestamp, we essentially perform the
1158    * same conversion that a jitterbuffer would use to convert an rtp timestamp
1159    * into a corresponding gstreamer timestamp. Note that the base_time also
1160    * contains the drift between sender and receiver. */
1161   local_rt = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate);
1162   local_rt += base_time;
1163
1164   /* convert ntptime to unix time since 1900 */
1165   last_unix = gst_util_uint64_scale (ntptime, GST_SECOND,
1166       (G_GINT64_CONSTANT (1) << 32));
1167
1168   stream->have_sync = TRUE;
1169
1170   GST_DEBUG_OBJECT (bin,
1171       "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT,
1172       local_rt, last_unix);
1173
1174   /* recalc inter stream playout offset, but only if there is more than one
1175    * stream or we're doing NTP sync. */
1176   if (bin->ntp_sync) {
1177     /* For NTP sync we need to first get a snapshot of running_time and NTP
1178      * time. We know at what running_time we play a certain RTP time, we also
1179      * calculated when we would play the RTP time in the SR packet. Now we need
1180      * to know how the running_time and the NTP time relate to eachother. */
1181     get_current_times (bin, &running_time, &ntpnstime);
1182
1183     /* see how far away the NTP time is. This is the difference between the
1184      * current NTP time and the NTP time in the last SR packet. */
1185     ntpdiff = ntpnstime - last_unix;
1186     /* see how far away the running_time is. This is the difference between the
1187      * current running_time and the running_time of the RTP timestamp in the
1188      * last SR packet. */
1189     rtdiff = running_time - local_rt;
1190
1191     GST_DEBUG_OBJECT (bin,
1192         "NTP time %" G_GUINT64_FORMAT ", last unix %" G_GUINT64_FORMAT,
1193         ntpnstime, last_unix);
1194     GST_DEBUG_OBJECT (bin,
1195         "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
1196         rtdiff);
1197
1198     /* combine to get the final diff to apply to the running_time */
1199     stream->rt_delta = rtdiff - ntpdiff;
1200
1201     stream_set_ts_offset (bin, stream, stream->rt_delta, FALSE);
1202   } else {
1203     gint64 min, rtp_min, clock_base = stream->clock_base;
1204     gboolean all_sync, use_rtp;
1205     gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
1206
1207     /* calculate delta between server and receiver. last_unix is created by
1208      * converting the ntptime in the last SR packet to a gstreamer timestamp. This
1209      * delta expresses the difference to our timeline and the server timeline. The
1210      * difference in itself doesn't mean much but we can combine the delta of
1211      * multiple streams to create a stream specific offset. */
1212     stream->rt_delta = last_unix - local_rt;
1213
1214     /* calculate the min of all deltas, ignoring streams that did not yet have a
1215      * valid rt_delta because we did not yet receive an SR packet for those
1216      * streams.
1217      * We calculate the mininum because we would like to only apply positive
1218      * offsets to streams, delaying their playback instead of trying to speed up
1219      * other streams (which might be imposible when we have to create negative
1220      * latencies).
1221      * The stream that has the smallest diff is selected as the reference stream,
1222      * all other streams will have a positive offset to this difference. */
1223
1224     /* some alternative setting allow ignoring RTCP as much as possible,
1225      * for servers generating bogus ntp timeline */
1226     min = rtp_min = G_MAXINT64;
1227     use_rtp = FALSE;
1228     if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1229       guint64 ext_base;
1230
1231       use_rtp = TRUE;
1232       /* signed version for convienience */
1233       clock_base = base_rtptime;
1234       /* deal with possible wrap-around */
1235       ext_base = base_rtptime;
1236       rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
1237       /* sanity check; base rtp and provided clock_base should be close */
1238       if (rtp_clock_base >= clock_base) {
1239         if (rtp_clock_base - clock_base < 10 * clock_rate) {
1240           rtp_clock_base = base_time +
1241               gst_util_uint64_scale_int (rtp_clock_base - clock_base,
1242               GST_SECOND, clock_rate);
1243         } else {
1244           use_rtp = FALSE;
1245         }
1246       } else {
1247         if (clock_base - rtp_clock_base < 10 * clock_rate) {
1248           rtp_clock_base = base_time -
1249               gst_util_uint64_scale_int (clock_base - rtp_clock_base,
1250               GST_SECOND, clock_rate);
1251         } else {
1252           use_rtp = FALSE;
1253         }
1254       }
1255       /* warn and bail for clarity out if no sane values */
1256       if (!use_rtp) {
1257         GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
1258         return;
1259       }
1260       /* store to track changes */
1261       clock_base = rtp_clock_base;
1262       /* generate a fake as before,
1263        * now equating rtptime obtained from RTP-Info,
1264        * where the large time represent the otherwise irrelevant npt/ntp time */
1265       stream->rtp_delta = (GST_SECOND << 28) - rtp_clock_base;
1266     } else {
1267       clock_base = rtp_clock_base;
1268     }
1269
1270     all_sync = TRUE;
1271     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1272       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1273
1274       if (!ostream->have_sync) {
1275         all_sync = FALSE;
1276         continue;
1277       }
1278
1279       /* change in current stream's base from previously init'ed value
1280        * leads to reset of all stream's base */
1281       if (stream != ostream && stream->clock_base >= 0 &&
1282           (stream->clock_base != clock_base)) {
1283         GST_DEBUG_OBJECT (bin, "reset upon clock base change");
1284         ostream->clock_base = -100 * GST_SECOND;
1285         ostream->rtp_delta = 0;
1286       }
1287
1288       if (ostream->rt_delta < min)
1289         min = ostream->rt_delta;
1290       if (ostream->rtp_delta < rtp_min)
1291         rtp_min = ostream->rtp_delta;
1292     }
1293
1294     /* arrange to re-sync for each stream upon significant change,
1295      * e.g. post-seek */
1296     all_sync = all_sync && (stream->clock_base == clock_base);
1297     stream->clock_base = clock_base;
1298
1299     /* may need init performed above later on, but nothing more to do now */
1300     if (client->nstreams <= 1)
1301       return;
1302
1303     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
1304         " all sync %d", client, min, all_sync);
1305     GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
1306
1307     switch (rtcp_sync) {
1308       case GST_RTP_BIN_RTCP_SYNC_RTP:
1309         if (!use_rtp)
1310           break;
1311         GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
1312             "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
1313         /* fall-through */
1314       case GST_RTP_BIN_RTCP_SYNC_INITIAL:
1315         /* if all have been synced already, do not bother further */
1316         if (all_sync) {
1317           GST_DEBUG_OBJECT (bin, "all streams already synced; done");
1318           return;
1319         }
1320         break;
1321       default:
1322         break;
1323     }
1324
1325     /* bail out if we adjusted recently enough */
1326     if (all_sync && (last_unix - bin->priv->last_unix) <
1327         bin->rtcp_sync_interval * GST_MSECOND) {
1328       GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
1329           "previous sender info too recent "
1330           "(previous UNIX %" G_GUINT64_FORMAT ")", bin->priv->last_unix);
1331       return;
1332     }
1333     bin->priv->last_unix = last_unix;
1334
1335     /* calculate offsets for each stream */
1336     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1337       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1338       gint64 ts_offset;
1339
1340       /* ignore streams for which we didn't receive an SR packet yet, we
1341        * can't synchronize them yet. We can however sync other streams just
1342        * fine. */
1343       if (!ostream->have_sync)
1344         continue;
1345
1346       /* calculate offset to our reference stream, this should always give a
1347        * positive number. */
1348       if (use_rtp)
1349         ts_offset = ostream->rtp_delta - rtp_min;
1350       else
1351         ts_offset = ostream->rt_delta - min;
1352
1353       stream_set_ts_offset (bin, ostream, ts_offset, TRUE);
1354     }
1355   }
1356   gst_rtp_bin_send_sync_event (stream);
1357
1358   return;
1359 }
1360
1361 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
1362   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
1363           (b) = gst_rtcp_packet_move_to_next ((packet)))
1364
1365 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
1366   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
1367           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
1368
1369 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
1370   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
1371           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
1372
1373 static void
1374 gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
1375     GstRtpBinStream * stream)
1376 {
1377   GstRtpBin *bin;
1378   GstRTCPPacket packet;
1379   guint32 ssrc;
1380   guint64 ntptime;
1381   gboolean have_sr, have_sdes;
1382   gboolean more;
1383   guint64 base_rtptime;
1384   guint64 base_time;
1385   guint clock_rate;
1386   guint64 clock_base;
1387   guint64 extrtptime;
1388   GstBuffer *buffer;
1389   GstRTCPBuffer rtcp = { NULL, };
1390
1391   bin = stream->bin;
1392
1393   GST_DEBUG_OBJECT (bin, "sync handler called");
1394
1395   /* get the last relation between the rtp timestamps and the gstreamer
1396    * timestamps. We get this info directly from the jitterbuffer which
1397    * constructs gstreamer timestamps from rtp timestamps and so it know exactly
1398    * what the current situation is. */
1399   base_rtptime =
1400       g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
1401   base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
1402   clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
1403   clock_base = g_value_get_uint64 (gst_structure_get_value (s, "clock-base"));
1404   extrtptime =
1405       g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
1406   buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
1407
1408   have_sr = FALSE;
1409   have_sdes = FALSE;
1410
1411   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
1412
1413   GST_RTCP_BUFFER_FOR_PACKETS (more, &rtcp, &packet) {
1414     /* first packet must be SR or RR or else the validate would have failed */
1415     switch (gst_rtcp_packet_get_type (&packet)) {
1416       case GST_RTCP_TYPE_SR:
1417         /* only parse first. There is only supposed to be one SR in the packet
1418          * but we will deal with malformed packets gracefully */
1419         if (have_sr)
1420           break;
1421         /* get NTP and RTP times */
1422         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL,
1423             NULL, NULL);
1424
1425         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
1426         /* ignore SR that is not ours */
1427         if (ssrc != stream->ssrc)
1428           continue;
1429
1430         have_sr = TRUE;
1431         break;
1432       case GST_RTCP_TYPE_SDES:
1433       {
1434         gboolean more_items, more_entries;
1435
1436         /* only deal with first SDES, there is only supposed to be one SDES in
1437          * the RTCP packet but we deal with bad packets gracefully. Also bail
1438          * out if we have not seen an SR item yet. */
1439         if (have_sdes || !have_sr)
1440           break;
1441
1442         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
1443           /* skip items that are not about the SSRC of the sender */
1444           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
1445             continue;
1446
1447           /* find the CNAME entry */
1448           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
1449             GstRTCPSDESType type;
1450             guint8 len;
1451             guint8 *data;
1452
1453             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
1454
1455             if (type == GST_RTCP_SDES_CNAME) {
1456               GST_RTP_BIN_LOCK (bin);
1457               /* associate the stream to CNAME */
1458               gst_rtp_bin_associate (bin, stream, len, data,
1459                   ntptime, extrtptime, base_rtptime, base_time, clock_rate,
1460                   clock_base);
1461               GST_RTP_BIN_UNLOCK (bin);
1462             }
1463           }
1464         }
1465         have_sdes = TRUE;
1466         break;
1467       }
1468       default:
1469         /* we can ignore these packets */
1470         break;
1471     }
1472   }
1473   gst_rtcp_buffer_unmap (&rtcp);
1474 }
1475
1476 /* create a new stream with @ssrc in @session. Must be called with
1477  * RTP_SESSION_LOCK. */
1478 static GstRtpBinStream *
1479 create_stream (GstRtpBinSession * session, guint32 ssrc)
1480 {
1481   GstElement *buffer, *demux = NULL;
1482   GstRtpBinStream *stream;
1483   GstRtpBin *rtpbin;
1484   GstState target;
1485
1486   rtpbin = session->bin;
1487
1488   if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL)))
1489     goto no_jitterbuffer;
1490
1491   if (!rtpbin->ignore_pt)
1492     if (!(demux = gst_element_factory_make ("rtpptdemux", NULL)))
1493       goto no_demux;
1494
1495
1496   stream = g_new0 (GstRtpBinStream, 1);
1497   stream->ssrc = ssrc;
1498   stream->bin = rtpbin;
1499   stream->session = session;
1500   stream->buffer = buffer;
1501   stream->demux = demux;
1502
1503   stream->have_sync = FALSE;
1504   stream->rt_delta = 0;
1505   stream->rtp_delta = 0;
1506   stream->percent = 100;
1507   stream->clock_base = -100 * GST_SECOND;
1508   session->streams = g_slist_prepend (session->streams, stream);
1509
1510   /* provide clock_rate to the jitterbuffer when needed */
1511   stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
1512       (GCallback) pt_map_requested, session);
1513   stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
1514       (GCallback) on_npt_stop, stream);
1515
1516   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.session", session);
1517   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.stream", stream);
1518
1519   /* configure latency and packet lost */
1520   g_object_set (buffer, "latency", rtpbin->latency_ms, NULL);
1521   g_object_set (buffer, "drop-on-latency", rtpbin->drop_on_latency, NULL);
1522   g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
1523   g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL);
1524   g_object_set (buffer, "do-retransmission", rtpbin->do_retransmission, NULL);
1525
1526   if (!rtpbin->ignore_pt)
1527     gst_bin_add (GST_BIN_CAST (rtpbin), demux);
1528   gst_bin_add (GST_BIN_CAST (rtpbin), buffer);
1529
1530   /* link stuff */
1531   if (demux)
1532     gst_element_link_pads_full (buffer, "src", demux, "sink",
1533         GST_PAD_LINK_CHECK_NOTHING);
1534
1535   if (rtpbin->buffering) {
1536     guint64 last_out;
1537
1538     GST_INFO_OBJECT (rtpbin,
1539         "bin is buffering, set jitterbuffer as not active");
1540     g_signal_emit_by_name (buffer, "set-active", FALSE, (gint64) 0, &last_out);
1541   }
1542
1543
1544   GST_OBJECT_LOCK (rtpbin);
1545   target = GST_STATE_TARGET (rtpbin);
1546   GST_OBJECT_UNLOCK (rtpbin);
1547
1548   /* from sink to source */
1549   if (demux)
1550     gst_element_set_state (demux, target);
1551
1552   gst_element_set_state (buffer, target);
1553
1554   return stream;
1555
1556   /* ERRORS */
1557 no_jitterbuffer:
1558   {
1559     g_warning ("rtpbin: could not create rtpjitterbuffer element");
1560     return NULL;
1561   }
1562 no_demux:
1563   {
1564     gst_object_unref (buffer);
1565     g_warning ("rtpbin: could not create rtpptdemux element");
1566     return NULL;
1567   }
1568 }
1569
1570 /* called with RTP_BIN_LOCK */
1571 static void
1572 free_stream (GstRtpBinStream * stream, GstRtpBin * bin)
1573 {
1574   GSList *clients, *next_client;
1575
1576   GST_DEBUG_OBJECT (bin, "freeing stream %p", stream);
1577
1578   if (stream->demux) {
1579     g_signal_handler_disconnect (stream->demux, stream->demux_newpad_sig);
1580     g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig);
1581     g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig);
1582   }
1583   g_signal_handler_disconnect (stream->buffer, stream->buffer_handlesync_sig);
1584   g_signal_handler_disconnect (stream->buffer, stream->buffer_ptreq_sig);
1585   g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig);
1586
1587   if (stream->demux)
1588     gst_element_set_locked_state (stream->demux, TRUE);
1589   gst_element_set_locked_state (stream->buffer, TRUE);
1590
1591   if (stream->demux)
1592     gst_element_set_state (stream->demux, GST_STATE_NULL);
1593   gst_element_set_state (stream->buffer, GST_STATE_NULL);
1594
1595   /* now remove this signal, we need this while going to NULL because it to
1596    * do some cleanups */
1597   if (stream->demux)
1598     g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
1599
1600   gst_bin_remove (GST_BIN_CAST (bin), stream->buffer);
1601   if (stream->demux)
1602     gst_bin_remove (GST_BIN_CAST (bin), stream->demux);
1603
1604   for (clients = bin->clients; clients; clients = next_client) {
1605     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
1606     GSList *streams, *next_stream;
1607
1608     next_client = g_slist_next (clients);
1609
1610     for (streams = client->streams; streams; streams = next_stream) {
1611       GstRtpBinStream *ostream = (GstRtpBinStream *) streams->data;
1612
1613       next_stream = g_slist_next (streams);
1614
1615       if (ostream == stream) {
1616         client->streams = g_slist_delete_link (client->streams, streams);
1617         /* If this was the last stream belonging to this client,
1618          * clean up the client. */
1619         if (--client->nstreams == 0) {
1620           bin->clients = g_slist_delete_link (bin->clients, clients);
1621           free_client (client, bin);
1622           break;
1623         }
1624       }
1625     }
1626   }
1627   g_free (stream);
1628 }
1629
1630 /* GObject vmethods */
1631 static void gst_rtp_bin_dispose (GObject * object);
1632 static void gst_rtp_bin_finalize (GObject * object);
1633 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
1634     const GValue * value, GParamSpec * pspec);
1635 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
1636     GValue * value, GParamSpec * pspec);
1637
1638 /* GstElement vmethods */
1639 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
1640     GstStateChange transition);
1641 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
1642     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
1643 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
1644 static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message);
1645
1646 #define gst_rtp_bin_parent_class parent_class
1647 G_DEFINE_TYPE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN);
1648
1649 static gboolean
1650 _gst_element_accumulator (GSignalInvocationHint * ihint,
1651     GValue * return_accu, const GValue * handler_return, gpointer dummy)
1652 {
1653   GstElement *element;
1654
1655   element = g_value_get_object (handler_return);
1656   GST_DEBUG ("got element %" GST_PTR_FORMAT, element);
1657
1658   if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
1659     g_value_set_object (return_accu, element);
1660
1661   /* stop emission if we have an element */
1662   return (element == NULL);
1663 }
1664
1665 static void
1666 gst_rtp_bin_class_init (GstRtpBinClass * klass)
1667 {
1668   GObjectClass *gobject_class;
1669   GstElementClass *gstelement_class;
1670   GstBinClass *gstbin_class;
1671
1672   gobject_class = (GObjectClass *) klass;
1673   gstelement_class = (GstElementClass *) klass;
1674   gstbin_class = (GstBinClass *) klass;
1675
1676   g_type_class_add_private (klass, sizeof (GstRtpBinPrivate));
1677
1678   gobject_class->dispose = gst_rtp_bin_dispose;
1679   gobject_class->finalize = gst_rtp_bin_finalize;
1680   gobject_class->set_property = gst_rtp_bin_set_property;
1681   gobject_class->get_property = gst_rtp_bin_get_property;
1682
1683   g_object_class_install_property (gobject_class, PROP_LATENCY,
1684       g_param_spec_uint ("latency", "Buffer latency in ms",
1685           "Default amount of ms to buffer in the jitterbuffers", 0,
1686           G_MAXUINT, DEFAULT_LATENCY_MS,
1687           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1688
1689   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
1690       g_param_spec_boolean ("drop-on-latency",
1691           "Drop buffers when maximum latency is reached",
1692           "Tells the jitterbuffer to never exceed the given latency in size",
1693           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1694
1695   /**
1696    * GstRtpBin::request-pt-map:
1697    * @rtpbin: the object which received the signal
1698    * @session: the session
1699    * @pt: the pt
1700    *
1701    * Request the payload type as #GstCaps for @pt in @session.
1702    */
1703   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
1704       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
1705       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
1706       NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_CAPS, 2, G_TYPE_UINT,
1707       G_TYPE_UINT);
1708
1709     /**
1710    * GstRtpBin::payload-type-change:
1711    * @rtpbin: the object which received the signal
1712    * @session: the session
1713    * @pt: the pt
1714    *
1715    * Signal that the current payload type changed to @pt in @session.
1716    */
1717   gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] =
1718       g_signal_new ("payload-type-change", G_TYPE_FROM_CLASS (klass),
1719       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, payload_type_change),
1720       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1721       G_TYPE_UINT);
1722
1723   /**
1724    * GstRtpBin::clear-pt-map:
1725    * @rtpbin: the object which received the signal
1726    *
1727    * Clear all previously cached pt-mapping obtained with
1728    * #GstRtpBin::request-pt-map.
1729    */
1730   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
1731       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
1732       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1733           clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1734       0, G_TYPE_NONE);
1735
1736   /**
1737    * GstRtpBin::reset-sync:
1738    * @rtpbin: the object which received the signal
1739    *
1740    * Reset all currently configured lip-sync parameters and require new SR
1741    * packets for all streams before lip-sync is attempted again.
1742    */
1743   gst_rtp_bin_signals[SIGNAL_RESET_SYNC] =
1744       g_signal_new ("reset-sync", G_TYPE_FROM_CLASS (klass),
1745       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1746           reset_sync), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1747       0, G_TYPE_NONE);
1748
1749   /**
1750    * GstRtpBin::get-internal-session:
1751    * @rtpbin: the object which received the signal
1752    * @id: the session id
1753    *
1754    * Request the internal RTPSession object as #GObject in session @id.
1755    */
1756   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_SESSION] =
1757       g_signal_new ("get-internal-session", G_TYPE_FROM_CLASS (klass),
1758       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1759           get_internal_session), NULL, NULL, g_cclosure_marshal_generic,
1760       RTP_TYPE_SESSION, 1, G_TYPE_UINT);
1761
1762   /**
1763    * GstRtpBin::on-new-ssrc:
1764    * @rtpbin: the object which received the signal
1765    * @session: the session
1766    * @ssrc: the SSRC
1767    *
1768    * Notify of a new SSRC that entered @session.
1769    */
1770   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
1771       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
1772       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
1773       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1774       G_TYPE_UINT);
1775   /**
1776    * GstRtpBin::on-ssrc-collision:
1777    * @rtpbin: the object which received the signal
1778    * @session: the session
1779    * @ssrc: the SSRC
1780    *
1781    * Notify when we have an SSRC collision
1782    */
1783   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
1784       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
1785       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
1786       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1787       G_TYPE_UINT);
1788   /**
1789    * GstRtpBin::on-ssrc-validated:
1790    * @rtpbin: the object which received the signal
1791    * @session: the session
1792    * @ssrc: the SSRC
1793    *
1794    * Notify of a new SSRC that became validated.
1795    */
1796   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
1797       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
1798       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
1799       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1800       G_TYPE_UINT);
1801   /**
1802    * GstRtpBin::on-ssrc-active:
1803    * @rtpbin: the object which received the signal
1804    * @session: the session
1805    * @ssrc: the SSRC
1806    *
1807    * Notify of a SSRC that is active, i.e., sending RTCP.
1808    */
1809   gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE] =
1810       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
1811       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_active),
1812       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1813       G_TYPE_UINT);
1814   /**
1815    * GstRtpBin::on-ssrc-sdes:
1816    * @rtpbin: the object which received the signal
1817    * @session: the session
1818    * @ssrc: the SSRC
1819    *
1820    * Notify of a SSRC that is active, i.e., sending RTCP.
1821    */
1822   gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES] =
1823       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
1824       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_sdes),
1825       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1826       G_TYPE_UINT);
1827
1828   /**
1829    * GstRtpBin::on-bye-ssrc:
1830    * @rtpbin: the object which received the signal
1831    * @session: the session
1832    * @ssrc: the SSRC
1833    *
1834    * Notify of an SSRC that became inactive because of a BYE packet.
1835    */
1836   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
1837       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
1838       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
1839       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1840       G_TYPE_UINT);
1841   /**
1842    * GstRtpBin::on-bye-timeout:
1843    * @rtpbin: the object which received the signal
1844    * @session: the session
1845    * @ssrc: the SSRC
1846    *
1847    * Notify of an SSRC that has timed out because of BYE
1848    */
1849   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
1850       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
1851       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
1852       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1853       G_TYPE_UINT);
1854   /**
1855    * GstRtpBin::on-timeout:
1856    * @rtpbin: the object which received the signal
1857    * @session: the session
1858    * @ssrc: the SSRC
1859    *
1860    * Notify of an SSRC that has timed out
1861    */
1862   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
1863       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
1864       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
1865       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1866       G_TYPE_UINT);
1867   /**
1868    * GstRtpBin::on-sender-timeout:
1869    * @rtpbin: the object which received the signal
1870    * @session: the session
1871    * @ssrc: the SSRC
1872    *
1873    * Notify of a sender SSRC that has timed out and became a receiver
1874    */
1875   gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] =
1876       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
1877       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout),
1878       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1879       G_TYPE_UINT);
1880
1881   /**
1882    * GstRtpBin::on-npt-stop:
1883    * @rtpbin: the object which received the signal
1884    * @session: the session
1885    * @ssrc: the SSRC
1886    *
1887    * Notify that SSRC sender has sent data up to the configured NPT stop time.
1888    */
1889   gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP] =
1890       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
1891       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_npt_stop),
1892       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1893       G_TYPE_UINT);
1894
1895   /**
1896    * GstRtpBin::request-rtp-encoder:
1897    * @rtpbin: the object which received the signal
1898    * @session: the session
1899    *
1900    * Request an RTP encoder element for the given @session. The encoder
1901    * element will be added to the bin if not previously added.
1902    *
1903    * If no handler is connected, no encoder will be used.
1904    */
1905   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_ENCODER] =
1906       g_signal_new ("request-rtp-encoder", G_TYPE_FROM_CLASS (klass),
1907       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1908           request_rtp_encoder), _gst_element_accumulator, NULL,
1909       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
1910
1911   /**
1912    * GstRtpBin::request-rtp-decoder:
1913    * @rtpbin: the object which received the signal
1914    * @session: the session
1915    *
1916    * Request an RTP decoder element for the given @session. The decoder
1917    * element will be added to the bin if not previously added.
1918    *
1919    * If no handler is connected, no encoder will be used.
1920    */
1921   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_DECODER] =
1922       g_signal_new ("request-rtp-decoder", G_TYPE_FROM_CLASS (klass),
1923       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1924           request_rtp_decoder), _gst_element_accumulator, NULL,
1925       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
1926
1927   /**
1928    * GstRtpBin::request-rtcp-encoder:
1929    * @rtpbin: the object which received the signal
1930    * @session: the session
1931    *
1932    * Request an RTCP encoder element for the given @session. The encoder
1933    * element will be added to the bin if not previously added.
1934    *
1935    * If no handler is connected, no encoder will be used.
1936    */
1937   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_ENCODER] =
1938       g_signal_new ("request-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
1939       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1940           request_rtcp_encoder), _gst_element_accumulator, NULL,
1941       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
1942
1943   /**
1944    * GstRtpBin::request-rtcp-decoder:
1945    * @rtpbin: the object which received the signal
1946    * @session: the session
1947    *
1948    * Request an RTCP decoder element for the given @session. The decoder
1949    * element will be added to the bin if not previously added.
1950    *
1951    * If no handler is connected, no encoder will be used.
1952    */
1953   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_DECODER] =
1954       g_signal_new ("request-rtcp-decoder", G_TYPE_FROM_CLASS (klass),
1955       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1956           request_rtcp_decoder), _gst_element_accumulator, NULL,
1957       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
1958
1959   g_object_class_install_property (gobject_class, PROP_SDES,
1960       g_param_spec_boxed ("sdes", "SDES",
1961           "The SDES items of this session",
1962           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1963
1964   g_object_class_install_property (gobject_class, PROP_DO_LOST,
1965       g_param_spec_boolean ("do-lost", "Do Lost",
1966           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
1967           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1968
1969   g_object_class_install_property (gobject_class, PROP_AUTOREMOVE,
1970       g_param_spec_boolean ("autoremove", "Auto Remove",
1971           "Automatically remove timed out sources", DEFAULT_AUTOREMOVE,
1972           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1973
1974   g_object_class_install_property (gobject_class, PROP_IGNORE_PT,
1975       g_param_spec_boolean ("ignore-pt", "Ignore PT",
1976           "Do not demultiplex based on PT values", DEFAULT_IGNORE_PT,
1977           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1978
1979   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
1980       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
1981           "Use the pipeline running-time to set the NTP time in the RTCP SR messages",
1982           DEFAULT_USE_PIPELINE_CLOCK,
1983           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1984   /**
1985    * GstRtpBin:buffer-mode:
1986    *
1987    * Control the buffering and timestamping mode used by the jitterbuffer.
1988    */
1989   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
1990       g_param_spec_enum ("buffer-mode", "Buffer Mode",
1991           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
1992           DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1993   /**
1994    * GstRtpBin:ntp-sync:
1995    *
1996    * Set the NTP time from the sender reports as the running-time on the
1997    * buffers. When both the sender and receiver have sychronized
1998    * running-time, i.e. when the clock and base-time is shared
1999    * between the receivers and the and the senders, this option can be
2000    * used to synchronize receivers on multiple machines.
2001    */
2002   g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
2003       g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
2004           "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
2005           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2006
2007   /**
2008    * GstRtpBin:rtcp-sync:
2009    *
2010    * If not synchronizing (directly) to the NTP clock, determines how to sync
2011    * the various streams.
2012    */
2013   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC,
2014       g_param_spec_enum ("rtcp-sync", "RTCP Sync",
2015           "Use of RTCP SR in synchronization", GST_RTP_BIN_RTCP_SYNC_TYPE,
2016           DEFAULT_RTCP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2017
2018   /**
2019    * GstRtpBin:rtcp-sync-interval:
2020    *
2021    * Determines how often to sync streams using RTCP data.
2022    */
2023   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_INTERVAL,
2024       g_param_spec_uint ("rtcp-sync-interval", "RTCP Sync Interval",
2025           "RTCP SR interval synchronization (ms) (0 = always)",
2026           0, G_MAXUINT, DEFAULT_RTCP_SYNC_INTERVAL,
2027           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2028
2029   g_object_class_install_property (gobject_class, PROP_DO_SYNC_EVENT,
2030       g_param_spec_boolean ("do-sync-event", "Do Sync Event",
2031           "Send event downstream when a stream is synchronized to the sender",
2032           DEFAULT_DO_SYNC_EVENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2033
2034   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
2035       g_param_spec_boolean ("do-retransmission", "Do retransmission",
2036           "Send an event downstream to request packet retransmission",
2037           DEFAULT_DO_RETRANSMISSION,
2038           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2039
2040   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
2041   gstelement_class->request_new_pad =
2042       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
2043   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
2044
2045   /* sink pads */
2046   gst_element_class_add_pad_template (gstelement_class,
2047       gst_static_pad_template_get (&rtpbin_recv_rtp_sink_template));
2048   gst_element_class_add_pad_template (gstelement_class,
2049       gst_static_pad_template_get (&rtpbin_recv_rtcp_sink_template));
2050   gst_element_class_add_pad_template (gstelement_class,
2051       gst_static_pad_template_get (&rtpbin_send_rtp_sink_template));
2052
2053   /* src pads */
2054   gst_element_class_add_pad_template (gstelement_class,
2055       gst_static_pad_template_get (&rtpbin_recv_rtp_src_template));
2056   gst_element_class_add_pad_template (gstelement_class,
2057       gst_static_pad_template_get (&rtpbin_send_rtcp_src_template));
2058   gst_element_class_add_pad_template (gstelement_class,
2059       gst_static_pad_template_get (&rtpbin_send_rtp_src_template));
2060
2061   gst_element_class_set_static_metadata (gstelement_class, "RTP Bin",
2062       "Filter/Network/RTP",
2063       "Real-Time Transport Protocol bin",
2064       "Wim Taymans <wim.taymans@gmail.com>");
2065
2066   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (gst_rtp_bin_handle_message);
2067
2068   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
2069   klass->reset_sync = GST_DEBUG_FUNCPTR (gst_rtp_bin_reset_sync);
2070   klass->get_internal_session =
2071       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_session);
2072   klass->request_rtp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2073   klass->request_rtp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2074   klass->request_rtcp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2075   klass->request_rtcp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2076
2077   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
2078 }
2079
2080 static void
2081 gst_rtp_bin_init (GstRtpBin * rtpbin)
2082 {
2083   gchar *cname;
2084
2085   rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
2086   g_mutex_init (&rtpbin->priv->bin_lock);
2087   g_mutex_init (&rtpbin->priv->dyn_lock);
2088
2089   rtpbin->latency_ms = DEFAULT_LATENCY_MS;
2090   rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND;
2091   rtpbin->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
2092   rtpbin->do_lost = DEFAULT_DO_LOST;
2093   rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
2094   rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
2095   rtpbin->rtcp_sync = DEFAULT_RTCP_SYNC;
2096   rtpbin->rtcp_sync_interval = DEFAULT_RTCP_SYNC_INTERVAL;
2097   rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
2098   rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
2099   rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
2100   rtpbin->send_sync_event = DEFAULT_DO_SYNC_EVENT;
2101   rtpbin->do_retransmission = DEFAULT_DO_RETRANSMISSION;
2102
2103   /* some default SDES entries */
2104   cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
2105   rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes",
2106       "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL);
2107   g_free (cname);
2108 }
2109
2110 static void
2111 gst_rtp_bin_dispose (GObject * object)
2112 {
2113   GstRtpBin *rtpbin;
2114
2115   rtpbin = GST_RTP_BIN (object);
2116
2117   GST_RTP_BIN_LOCK (rtpbin);
2118   GST_DEBUG_OBJECT (object, "freeing sessions");
2119   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, rtpbin);
2120   g_slist_free (rtpbin->sessions);
2121   rtpbin->sessions = NULL;
2122   GST_RTP_BIN_UNLOCK (rtpbin);
2123
2124   G_OBJECT_CLASS (parent_class)->dispose (object);
2125 }
2126
2127 static void
2128 gst_rtp_bin_finalize (GObject * object)
2129 {
2130   GstRtpBin *rtpbin;
2131
2132   rtpbin = GST_RTP_BIN (object);
2133
2134   if (rtpbin->sdes)
2135     gst_structure_free (rtpbin->sdes);
2136
2137   g_mutex_clear (&rtpbin->priv->bin_lock);
2138   g_mutex_clear (&rtpbin->priv->dyn_lock);
2139
2140   G_OBJECT_CLASS (parent_class)->finalize (object);
2141 }
2142
2143
2144 static void
2145 gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes)
2146 {
2147   GSList *item;
2148
2149   if (sdes == NULL)
2150     return;
2151
2152   GST_RTP_BIN_LOCK (bin);
2153
2154   GST_OBJECT_LOCK (bin);
2155   if (bin->sdes)
2156     gst_structure_free (bin->sdes);
2157   bin->sdes = gst_structure_copy (sdes);
2158   GST_OBJECT_UNLOCK (bin);
2159
2160   /* store in all sessions */
2161   for (item = bin->sessions; item; item = g_slist_next (item)) {
2162     GstRtpBinSession *session = item->data;
2163     g_object_set (session->session, "sdes", sdes, NULL);
2164   }
2165
2166   GST_RTP_BIN_UNLOCK (bin);
2167 }
2168
2169 static GstStructure *
2170 gst_rtp_bin_get_sdes_struct (GstRtpBin * bin)
2171 {
2172   GstStructure *result;
2173
2174   GST_OBJECT_LOCK (bin);
2175   result = gst_structure_copy (bin->sdes);
2176   GST_OBJECT_UNLOCK (bin);
2177
2178   return result;
2179 }
2180
2181 static void
2182 gst_rtp_bin_set_property (GObject * object, guint prop_id,
2183     const GValue * value, GParamSpec * pspec)
2184 {
2185   GstRtpBin *rtpbin;
2186
2187   rtpbin = GST_RTP_BIN (object);
2188
2189   switch (prop_id) {
2190     case PROP_LATENCY:
2191       GST_RTP_BIN_LOCK (rtpbin);
2192       rtpbin->latency_ms = g_value_get_uint (value);
2193       rtpbin->latency_ns = rtpbin->latency_ms * GST_MSECOND;
2194       GST_RTP_BIN_UNLOCK (rtpbin);
2195       /* propagate the property down to the jitterbuffer */
2196       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "latency", value);
2197       break;
2198     case PROP_DROP_ON_LATENCY:
2199       GST_RTP_BIN_LOCK (rtpbin);
2200       rtpbin->drop_on_latency = g_value_get_boolean (value);
2201       GST_RTP_BIN_UNLOCK (rtpbin);
2202       /* propagate the property down to the jitterbuffer */
2203       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2204           "drop-on-latency", value);
2205       break;
2206     case PROP_SDES:
2207       gst_rtp_bin_set_sdes_struct (rtpbin, g_value_get_boxed (value));
2208       break;
2209     case PROP_DO_LOST:
2210       GST_RTP_BIN_LOCK (rtpbin);
2211       rtpbin->do_lost = g_value_get_boolean (value);
2212       GST_RTP_BIN_UNLOCK (rtpbin);
2213       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
2214       break;
2215     case PROP_NTP_SYNC:
2216       rtpbin->ntp_sync = g_value_get_boolean (value);
2217       break;
2218     case PROP_RTCP_SYNC:
2219       g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value));
2220       break;
2221     case PROP_RTCP_SYNC_INTERVAL:
2222       rtpbin->rtcp_sync_interval = g_value_get_uint (value);
2223       break;
2224     case PROP_IGNORE_PT:
2225       rtpbin->ignore_pt = g_value_get_boolean (value);
2226       break;
2227     case PROP_AUTOREMOVE:
2228       rtpbin->priv->autoremove = g_value_get_boolean (value);
2229       break;
2230     case PROP_USE_PIPELINE_CLOCK:
2231     {
2232       GSList *sessions;
2233       GST_RTP_BIN_LOCK (rtpbin);
2234       rtpbin->use_pipeline_clock = g_value_get_boolean (value);
2235       for (sessions = rtpbin->sessions; sessions;
2236           sessions = g_slist_next (sessions)) {
2237         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2238
2239         g_object_set (G_OBJECT (session->session),
2240             "use-pipeline-clock", rtpbin->use_pipeline_clock, NULL);
2241       }
2242       GST_RTP_BIN_UNLOCK (rtpbin);
2243     }
2244       break;
2245     case PROP_DO_SYNC_EVENT:
2246       rtpbin->send_sync_event = g_value_get_boolean (value);
2247       break;
2248     case PROP_BUFFER_MODE:
2249       GST_RTP_BIN_LOCK (rtpbin);
2250       rtpbin->buffer_mode = g_value_get_enum (value);
2251       GST_RTP_BIN_UNLOCK (rtpbin);
2252       /* propagate the property down to the jitterbuffer */
2253       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "mode", value);
2254       break;
2255     case PROP_DO_RETRANSMISSION:
2256       GST_RTP_BIN_LOCK (rtpbin);
2257       rtpbin->do_retransmission = g_value_get_boolean (value);
2258       GST_RTP_BIN_UNLOCK (rtpbin);
2259       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2260           "do-retransmission", value);
2261       break;
2262     default:
2263       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2264       break;
2265   }
2266 }
2267
2268 static void
2269 gst_rtp_bin_get_property (GObject * object, guint prop_id,
2270     GValue * value, GParamSpec * pspec)
2271 {
2272   GstRtpBin *rtpbin;
2273
2274   rtpbin = GST_RTP_BIN (object);
2275
2276   switch (prop_id) {
2277     case PROP_LATENCY:
2278       GST_RTP_BIN_LOCK (rtpbin);
2279       g_value_set_uint (value, rtpbin->latency_ms);
2280       GST_RTP_BIN_UNLOCK (rtpbin);
2281       break;
2282     case PROP_DROP_ON_LATENCY:
2283       GST_RTP_BIN_LOCK (rtpbin);
2284       g_value_set_boolean (value, rtpbin->drop_on_latency);
2285       GST_RTP_BIN_UNLOCK (rtpbin);
2286       break;
2287     case PROP_SDES:
2288       g_value_take_boxed (value, gst_rtp_bin_get_sdes_struct (rtpbin));
2289       break;
2290     case PROP_DO_LOST:
2291       GST_RTP_BIN_LOCK (rtpbin);
2292       g_value_set_boolean (value, rtpbin->do_lost);
2293       GST_RTP_BIN_UNLOCK (rtpbin);
2294       break;
2295     case PROP_IGNORE_PT:
2296       g_value_set_boolean (value, rtpbin->ignore_pt);
2297       break;
2298     case PROP_NTP_SYNC:
2299       g_value_set_boolean (value, rtpbin->ntp_sync);
2300       break;
2301     case PROP_RTCP_SYNC:
2302       g_value_set_enum (value, g_atomic_int_get (&rtpbin->rtcp_sync));
2303       break;
2304     case PROP_RTCP_SYNC_INTERVAL:
2305       g_value_set_uint (value, rtpbin->rtcp_sync_interval);
2306       break;
2307     case PROP_AUTOREMOVE:
2308       g_value_set_boolean (value, rtpbin->priv->autoremove);
2309       break;
2310     case PROP_BUFFER_MODE:
2311       g_value_set_enum (value, rtpbin->buffer_mode);
2312       break;
2313     case PROP_USE_PIPELINE_CLOCK:
2314       g_value_set_boolean (value, rtpbin->use_pipeline_clock);
2315       break;
2316     case PROP_DO_SYNC_EVENT:
2317       g_value_set_boolean (value, rtpbin->send_sync_event);
2318       break;
2319     case PROP_DO_RETRANSMISSION:
2320       GST_RTP_BIN_LOCK (rtpbin);
2321       g_value_set_boolean (value, rtpbin->do_retransmission);
2322       GST_RTP_BIN_UNLOCK (rtpbin);
2323       break;
2324     default:
2325       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2326       break;
2327   }
2328 }
2329
2330 static void
2331 gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
2332 {
2333   GstRtpBin *rtpbin;
2334
2335   rtpbin = GST_RTP_BIN (bin);
2336
2337   switch (GST_MESSAGE_TYPE (message)) {
2338     case GST_MESSAGE_ELEMENT:
2339     {
2340       const GstStructure *s = gst_message_get_structure (message);
2341
2342       /* we change the structure name and add the session ID to it */
2343       if (gst_structure_has_name (s, "application/x-rtp-source-sdes")) {
2344         GstRtpBinSession *sess;
2345
2346         /* find the session we set it as object data */
2347         sess = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2348             "GstRTPBin.session");
2349
2350         if (G_LIKELY (sess)) {
2351           message = gst_message_make_writable (message);
2352           s = gst_message_get_structure (message);
2353           gst_structure_set ((GstStructure *) s, "session", G_TYPE_UINT,
2354               sess->id, NULL);
2355         }
2356       }
2357       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2358       break;
2359     }
2360     case GST_MESSAGE_BUFFERING:
2361     {
2362       gint percent;
2363       gint min_percent = 100;
2364       GSList *sessions, *streams;
2365       GstRtpBinStream *stream;
2366       gboolean change = FALSE, active = FALSE;
2367       GstClockTime min_out_time;
2368       GstBufferingMode mode;
2369       gint avg_in, avg_out;
2370       gint64 buffering_left;
2371
2372       gst_message_parse_buffering (message, &percent);
2373       gst_message_parse_buffering_stats (message, &mode, &avg_in, &avg_out,
2374           &buffering_left);
2375
2376       stream =
2377           g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2378           "GstRTPBin.stream");
2379
2380       GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
2381
2382       /* get the stream */
2383       if (G_LIKELY (stream)) {
2384         GST_RTP_BIN_LOCK (rtpbin);
2385         /* fill in the percent */
2386         stream->percent = percent;
2387
2388         /* calculate the min value for all streams */
2389         for (sessions = rtpbin->sessions; sessions;
2390             sessions = g_slist_next (sessions)) {
2391           GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2392
2393           GST_RTP_SESSION_LOCK (session);
2394           if (session->streams) {
2395             for (streams = session->streams; streams;
2396                 streams = g_slist_next (streams)) {
2397               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2398
2399               GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
2400                   stream->percent);
2401
2402               /* find min percent */
2403               if (min_percent > stream->percent)
2404                 min_percent = stream->percent;
2405             }
2406           } else {
2407             GST_INFO_OBJECT (bin,
2408                 "session has no streams, setting min_percent to 0");
2409             min_percent = 0;
2410           }
2411           GST_RTP_SESSION_UNLOCK (session);
2412         }
2413         GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
2414
2415         if (rtpbin->buffering) {
2416           if (min_percent == 100) {
2417             rtpbin->buffering = FALSE;
2418             active = TRUE;
2419             change = TRUE;
2420           }
2421         } else {
2422           if (min_percent < 100) {
2423             /* pause the streams */
2424             rtpbin->buffering = TRUE;
2425             active = FALSE;
2426             change = TRUE;
2427           }
2428         }
2429         GST_RTP_BIN_UNLOCK (rtpbin);
2430
2431         gst_message_unref (message);
2432
2433         /* make a new buffering message with the min value */
2434         message =
2435             gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
2436         gst_message_set_buffering_stats (message, mode, avg_in, avg_out,
2437             buffering_left);
2438
2439         if (G_UNLIKELY (change)) {
2440           GstClock *clock;
2441           guint64 running_time = 0;
2442           guint64 offset = 0;
2443
2444           /* figure out the running time when we have a clock */
2445           if (G_LIKELY ((clock =
2446                       gst_element_get_clock (GST_ELEMENT_CAST (bin))))) {
2447             guint64 now, base_time;
2448
2449             now = gst_clock_get_time (clock);
2450             base_time = gst_element_get_base_time (GST_ELEMENT_CAST (bin));
2451             running_time = now - base_time;
2452             gst_object_unref (clock);
2453           }
2454           GST_DEBUG_OBJECT (bin,
2455               "running time now %" GST_TIME_FORMAT,
2456               GST_TIME_ARGS (running_time));
2457
2458           GST_RTP_BIN_LOCK (rtpbin);
2459
2460           /* when we reactivate, calculate the offsets so that all streams have
2461            * an output time that is at least as big as the running_time */
2462           offset = 0;
2463           if (active) {
2464             if (running_time > rtpbin->buffer_start) {
2465               offset = running_time - rtpbin->buffer_start;
2466               if (offset >= rtpbin->latency_ns)
2467                 offset -= rtpbin->latency_ns;
2468               else
2469                 offset = 0;
2470             }
2471           }
2472
2473           /* pause all streams */
2474           min_out_time = -1;
2475           for (sessions = rtpbin->sessions; sessions;
2476               sessions = g_slist_next (sessions)) {
2477             GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2478
2479             GST_RTP_SESSION_LOCK (session);
2480             for (streams = session->streams; streams;
2481                 streams = g_slist_next (streams)) {
2482               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2483               GstElement *element = stream->buffer;
2484               guint64 last_out;
2485
2486               g_signal_emit_by_name (element, "set-active", active, offset,
2487                   &last_out);
2488
2489               if (!active) {
2490                 g_object_get (element, "percent", &stream->percent, NULL);
2491
2492                 if (last_out == -1)
2493                   last_out = 0;
2494                 if (min_out_time == -1 || last_out < min_out_time)
2495                   min_out_time = last_out;
2496               }
2497
2498               GST_DEBUG_OBJECT (bin,
2499                   "setting %p to %d, offset %" GST_TIME_FORMAT ", last %"
2500                   GST_TIME_FORMAT ", percent %d", element, active,
2501                   GST_TIME_ARGS (offset), GST_TIME_ARGS (last_out),
2502                   stream->percent);
2503             }
2504             GST_RTP_SESSION_UNLOCK (session);
2505           }
2506           GST_DEBUG_OBJECT (bin,
2507               "min out time %" GST_TIME_FORMAT, GST_TIME_ARGS (min_out_time));
2508
2509           /* the buffer_start is the min out time of all paused jitterbuffers */
2510           if (!active)
2511             rtpbin->buffer_start = min_out_time;
2512
2513           GST_RTP_BIN_UNLOCK (rtpbin);
2514         }
2515       }
2516       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2517       break;
2518     }
2519     default:
2520     {
2521       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2522       break;
2523     }
2524   }
2525 }
2526
2527 static GstStateChangeReturn
2528 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
2529 {
2530   GstStateChangeReturn res;
2531   GstRtpBin *rtpbin;
2532   GstRtpBinPrivate *priv;
2533
2534   rtpbin = GST_RTP_BIN (element);
2535   priv = rtpbin->priv;
2536
2537   switch (transition) {
2538     case GST_STATE_CHANGE_NULL_TO_READY:
2539       break;
2540     case GST_STATE_CHANGE_READY_TO_PAUSED:
2541       priv->last_unix = 0;
2542       GST_LOG_OBJECT (rtpbin, "clearing shutdown flag");
2543       g_atomic_int_set (&priv->shutdown, 0);
2544       break;
2545     case GST_STATE_CHANGE_PAUSED_TO_READY:
2546       GST_LOG_OBJECT (rtpbin, "setting shutdown flag");
2547       g_atomic_int_set (&priv->shutdown, 1);
2548       /* wait for all callbacks to end by taking the lock. No new callbacks will
2549        * be able to happen as we set the shutdown flag. */
2550       GST_RTP_BIN_DYN_LOCK (rtpbin);
2551       GST_LOG_OBJECT (rtpbin, "dynamic lock taken, we can continue shutdown");
2552       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
2553       break;
2554     default:
2555       break;
2556   }
2557
2558   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2559
2560   switch (transition) {
2561     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2562       break;
2563     case GST_STATE_CHANGE_PAUSED_TO_READY:
2564       break;
2565     case GST_STATE_CHANGE_READY_TO_NULL:
2566       break;
2567     default:
2568       break;
2569   }
2570   return res;
2571 }
2572
2573 static GstElement *
2574 session_request_encoder (GstRtpBinSession * session, guint signal)
2575 {
2576   GstElement *encoder = NULL;
2577   GstRtpBin *bin = session->bin;
2578
2579   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, &encoder);
2580
2581   if (encoder) {
2582     if (!bin_manage_element (bin, encoder))
2583       goto manage_failed;
2584     session->encoders = g_slist_prepend (session->encoders, encoder);
2585   }
2586   return encoder;
2587
2588   /* ERRORS */
2589 manage_failed:
2590   {
2591     GST_WARNING_OBJECT (bin, "unable to manage encoder");
2592     gst_object_unref (encoder);
2593     return NULL;
2594   }
2595 }
2596
2597 static GstElement *
2598 session_request_decoder (GstRtpBinSession * session, guint signal)
2599 {
2600   GstElement *decoder = NULL;
2601   GstRtpBin *bin = session->bin;
2602
2603   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, &decoder);
2604
2605   if (decoder) {
2606     if (!bin_manage_element (bin, decoder))
2607       goto manage_failed;
2608     session->decoders = g_slist_prepend (session->decoders, decoder);
2609   }
2610   return decoder;
2611
2612   /* ERRORS */
2613 manage_failed:
2614   {
2615     GST_WARNING_OBJECT (bin, "unable to manage decoder");
2616     gst_object_unref (decoder);
2617     return NULL;
2618   }
2619 }
2620
2621 /* a new pad (SSRC) was created in @session. This signal is emited from the
2622  * payload demuxer. */
2623 static void
2624 new_payload_found (GstElement * element, guint pt, GstPad * pad,
2625     GstRtpBinStream * stream)
2626 {
2627   GstRtpBin *rtpbin;
2628   GstElementClass *klass;
2629   GstPadTemplate *templ;
2630   gchar *padname;
2631   GstPad *gpad;
2632
2633   rtpbin = stream->bin;
2634
2635   GST_DEBUG ("new payload pad %d", pt);
2636
2637   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
2638
2639   /* ghost the pad to the parent */
2640   klass = GST_ELEMENT_GET_CLASS (rtpbin);
2641   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
2642   padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
2643       stream->session->id, stream->ssrc, pt);
2644   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
2645   g_free (padname);
2646   g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", gpad);
2647
2648   gst_pad_set_active (gpad, TRUE);
2649   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2650
2651   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2652
2653   return;
2654
2655 shutdown:
2656   {
2657     GST_DEBUG ("ignoring, we are shutting down");
2658     return;
2659   }
2660 }
2661
2662 static void
2663 payload_pad_removed (GstElement * element, GstPad * pad,
2664     GstRtpBinStream * stream)
2665 {
2666   GstRtpBin *rtpbin;
2667   GstPad *gpad;
2668
2669   rtpbin = stream->bin;
2670
2671   GST_DEBUG ("payload pad removed");
2672
2673   GST_RTP_BIN_DYN_LOCK (rtpbin);
2674   if ((gpad = g_object_get_data (G_OBJECT (pad), "GstRTPBin.ghostpad"))) {
2675     g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", NULL);
2676
2677     gst_pad_set_active (gpad, FALSE);
2678     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2679   }
2680   GST_RTP_BIN_DYN_UNLOCK (rtpbin);
2681 }
2682
2683 static GstCaps *
2684 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
2685 {
2686   GstRtpBin *rtpbin;
2687   GstCaps *caps;
2688
2689   rtpbin = session->bin;
2690
2691   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt,
2692       session->id);
2693
2694   caps = get_pt_map (session, pt);
2695   if (!caps)
2696     goto no_caps;
2697
2698   return caps;
2699
2700   /* ERRORS */
2701 no_caps:
2702   {
2703     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
2704     return NULL;
2705   }
2706 }
2707
2708 static void
2709 payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session)
2710 {
2711   GST_DEBUG_OBJECT (session->bin,
2712       "emiting signal for pt type changed to %d in session %d", pt,
2713       session->id);
2714
2715   g_signal_emit (session->bin, gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE],
2716       0, session->id, pt);
2717 }
2718
2719 /* emited when caps changed for the session */
2720 static void
2721 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
2722 {
2723   GstRtpBin *bin;
2724   GstCaps *caps;
2725   gint payload;
2726   const GstStructure *s;
2727
2728   bin = session->bin;
2729
2730   g_object_get (pad, "caps", &caps, NULL);
2731
2732   if (caps == NULL)
2733     return;
2734
2735   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
2736
2737   s = gst_caps_get_structure (caps, 0);
2738
2739   /* get payload, finish when it's not there */
2740   if (!gst_structure_get_int (s, "payload", &payload))
2741     return;
2742
2743   GST_RTP_SESSION_LOCK (session);
2744   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
2745   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
2746   GST_RTP_SESSION_UNLOCK (session);
2747 }
2748
2749 /* a new pad (SSRC) was created in @session */
2750 static void
2751 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
2752     GstRtpBinSession * session)
2753 {
2754   GstRtpBin *rtpbin;
2755   GstRtpBinStream *stream;
2756   GstPad *sinkpad, *srcpad;
2757   gchar *padname;
2758
2759   rtpbin = session->bin;
2760
2761   GST_DEBUG_OBJECT (rtpbin, "new SSRC pad %08x, %s:%s", ssrc,
2762       GST_DEBUG_PAD_NAME (pad));
2763
2764   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
2765
2766   GST_RTP_SESSION_LOCK (session);
2767
2768   /* create new stream */
2769   stream = create_stream (session, ssrc);
2770   if (!stream)
2771     goto no_stream;
2772
2773   /* get pad and link */
2774   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP");
2775   padname = g_strdup_printf ("src_%u", ssrc);
2776   srcpad = gst_element_get_static_pad (element, padname);
2777   g_free (padname);
2778   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
2779   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
2780   gst_object_unref (sinkpad);
2781   gst_object_unref (srcpad);
2782
2783   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
2784   padname = g_strdup_printf ("rtcp_src_%u", ssrc);
2785   srcpad = gst_element_get_static_pad (element, padname);
2786   g_free (padname);
2787   sinkpad = gst_element_get_request_pad (stream->buffer, "sink_rtcp");
2788   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
2789   gst_object_unref (sinkpad);
2790   gst_object_unref (srcpad);
2791
2792   /* connect to the RTCP sync signal from the jitterbuffer */
2793   GST_DEBUG_OBJECT (rtpbin, "connecting sync signal");
2794   stream->buffer_handlesync_sig = g_signal_connect (stream->buffer,
2795       "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream);
2796
2797   if (stream->demux) {
2798     /* connect to the new-pad signal of the payload demuxer, this will expose the
2799      * new pad by ghosting it. */
2800     stream->demux_newpad_sig = g_signal_connect (stream->demux,
2801         "new-payload-type", (GCallback) new_payload_found, stream);
2802     stream->demux_padremoved_sig = g_signal_connect (stream->demux,
2803         "pad-removed", (GCallback) payload_pad_removed, stream);
2804
2805     /* connect to the request-pt-map signal. This signal will be emited by the
2806      * demuxer so that it can apply a proper caps on the buffers for the
2807      * depayloaders. */
2808     stream->demux_ptreq_sig = g_signal_connect (stream->demux,
2809         "request-pt-map", (GCallback) pt_map_requested, session);
2810     /* connect to the  signal so it can be forwarded. */
2811     stream->demux_ptchange_sig = g_signal_connect (stream->demux,
2812         "payload-type-change", (GCallback) payload_type_change, session);
2813   } else {
2814     /* add rtpjitterbuffer src pad to pads */
2815     GstElementClass *klass;
2816     GstPadTemplate *templ;
2817     gchar *padname;
2818     GstPad *gpad, *pad;
2819
2820     pad = gst_element_get_static_pad (stream->buffer, "src");
2821
2822     /* ghost the pad to the parent */
2823     klass = GST_ELEMENT_GET_CLASS (rtpbin);
2824     templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
2825     padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
2826         stream->session->id, stream->ssrc, 255);
2827     gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
2828     g_free (padname);
2829
2830     gst_pad_set_active (gpad, TRUE);
2831     gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2832
2833     gst_object_unref (pad);
2834   }
2835
2836   GST_RTP_SESSION_UNLOCK (session);
2837   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2838
2839   return;
2840
2841   /* ERRORS */
2842 shutdown:
2843   {
2844     GST_DEBUG_OBJECT (rtpbin, "we are shutting down");
2845     return;
2846   }
2847 no_stream:
2848   {
2849     GST_RTP_SESSION_UNLOCK (session);
2850     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2851     GST_DEBUG_OBJECT (rtpbin, "could not create stream");
2852     return;
2853   }
2854 }
2855
2856 /* Create a pad for receiving RTP for the session in @name. Must be called with
2857  * RTP_BIN_LOCK.
2858  */
2859 static GstPad *
2860 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2861 {
2862   guint sessid;
2863   GstElement *decoder;
2864   GstPad *sinkdpad, *decsink;
2865   GstRtpBinSession *session;
2866
2867   /* first get the session number */
2868   if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1)
2869     goto no_name;
2870
2871   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
2872
2873   /* get or create session */
2874   session = find_session_by_id (rtpbin, sessid);
2875   if (!session) {
2876     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
2877     /* create session now */
2878     session = create_session (rtpbin, sessid);
2879     if (session == NULL)
2880       goto create_error;
2881   }
2882
2883   /* check if pad was requested */
2884   if (session->recv_rtp_sink_ghost != NULL)
2885     return session->recv_rtp_sink_ghost;
2886
2887   GST_DEBUG_OBJECT (rtpbin, "getting RTP sink pad");
2888   /* get recv_rtp pad and store */
2889   session->recv_rtp_sink =
2890       gst_element_get_request_pad (session->session, "recv_rtp_sink");
2891   if (session->recv_rtp_sink == NULL)
2892     goto pad_failed;
2893
2894   g_signal_connect (session->recv_rtp_sink, "notify::caps",
2895       (GCallback) caps_changed, session);
2896
2897   GST_DEBUG_OBJECT (rtpbin, "requesting RTP decoder");
2898   decoder = session_request_decoder (session, SIGNAL_REQUEST_RTP_DECODER);
2899   if (decoder) {
2900     GstPad *decsrc;
2901     GstPadLinkReturn ret;
2902
2903     GST_DEBUG_OBJECT (rtpbin, "linking RTP decoder");
2904     decsink = gst_element_get_static_pad (decoder, "rtp_sink");
2905     decsrc = gst_element_get_static_pad (decoder, "rtp_src");
2906
2907     if (decsink == NULL)
2908       goto dec_sink_failed;
2909
2910     if (decsrc == NULL)
2911       goto dec_src_failed;
2912
2913     ret = gst_pad_link (decsrc, session->recv_rtp_sink);
2914     gst_object_unref (decsrc);
2915
2916     if (ret != GST_PAD_LINK_OK)
2917       goto dec_link_failed;
2918   } else {
2919     GST_DEBUG_OBJECT (rtpbin, "no RTP decoder given");
2920     decsink = gst_object_ref (session->recv_rtp_sink);
2921   }
2922
2923   GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad");
2924   /* get srcpad, link to SSRCDemux */
2925   session->recv_rtp_src =
2926       gst_element_get_static_pad (session->session, "recv_rtp_src");
2927   if (session->recv_rtp_src == NULL)
2928     goto src_pad_failed;
2929
2930   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
2931   sinkdpad = gst_element_get_static_pad (session->demux, "sink");
2932   GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
2933   gst_pad_link_full (session->recv_rtp_src, sinkdpad,
2934       GST_PAD_LINK_CHECK_NOTHING);
2935   gst_object_unref (sinkdpad);
2936
2937   /* connect to the new-ssrc-pad signal of the SSRC demuxer */
2938   session->demux_newpad_sig = g_signal_connect (session->demux,
2939       "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
2940   session->demux_padremoved_sig = g_signal_connect (session->demux,
2941       "removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session);
2942
2943   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
2944   session->recv_rtp_sink_ghost =
2945       gst_ghost_pad_new_from_template (name, decsink, templ);
2946   gst_object_unref (decsink);
2947   gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE);
2948   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost);
2949
2950   return session->recv_rtp_sink_ghost;
2951
2952   /* ERRORS */
2953 no_name:
2954   {
2955     g_warning ("rtpbin: invalid name given");
2956     return NULL;
2957   }
2958 create_error:
2959   {
2960     /* create_session already warned */
2961     return NULL;
2962   }
2963 pad_failed:
2964   {
2965     g_warning ("rtpbin: failed to get session rtp_sink pad");
2966     return NULL;
2967   }
2968 dec_sink_failed:
2969   {
2970     g_warning ("rtpbin: failed to get decoder sink pad for session %d", sessid);
2971     return NULL;
2972   }
2973 dec_src_failed:
2974   {
2975     g_warning ("rtpbin: failed to get decoder src pad for session %d", sessid);
2976     gst_object_unref (decsink);
2977     return NULL;
2978   }
2979 dec_link_failed:
2980   {
2981     g_warning ("rtpbin: failed to link rtp decoder for session %d", sessid);
2982     gst_object_unref (decsink);
2983     return NULL;
2984   }
2985 src_pad_failed:
2986   {
2987     g_warning ("rtpbin: failed to get session rtp_src pad");
2988     gst_object_unref (decsink);
2989     return NULL;
2990   }
2991 }
2992
2993 static void
2994 remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
2995 {
2996   if (session->demux_newpad_sig) {
2997     g_signal_handler_disconnect (session->demux, session->demux_newpad_sig);
2998     session->demux_newpad_sig = 0;
2999   }
3000   if (session->demux_padremoved_sig) {
3001     g_signal_handler_disconnect (session->demux, session->demux_padremoved_sig);
3002     session->demux_padremoved_sig = 0;
3003   }
3004   if (session->recv_rtp_src) {
3005     gst_object_unref (session->recv_rtp_src);
3006     session->recv_rtp_src = NULL;
3007   }
3008   if (session->recv_rtp_sink) {
3009     gst_element_release_request_pad (session->session, session->recv_rtp_sink);
3010     gst_object_unref (session->recv_rtp_sink);
3011     session->recv_rtp_sink = NULL;
3012   }
3013   if (session->recv_rtp_sink_ghost) {
3014     gst_pad_set_active (session->recv_rtp_sink_ghost, FALSE);
3015     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3016         session->recv_rtp_sink_ghost);
3017     session->recv_rtp_sink_ghost = NULL;
3018   }
3019 }
3020
3021 /* Create a pad for receiving RTCP for the session in @name. Must be called with
3022  * RTP_BIN_LOCK.
3023  */
3024 static GstPad *
3025 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
3026     const gchar * name)
3027 {
3028   guint sessid;
3029   GstElement *decoder;
3030   GstRtpBinSession *session;
3031   GstPad *sinkdpad, *decsink;
3032
3033   /* first get the session number */
3034   if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1)
3035     goto no_name;
3036
3037   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
3038
3039   /* get or create the session */
3040   session = find_session_by_id (rtpbin, sessid);
3041   if (!session) {
3042     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
3043     /* create session now */
3044     session = create_session (rtpbin, sessid);
3045     if (session == NULL)
3046       goto create_error;
3047   }
3048
3049   /* check if pad was requested */
3050   if (session->recv_rtcp_sink_ghost != NULL)
3051     return session->recv_rtcp_sink_ghost;
3052
3053   /* get recv_rtp pad and store */
3054   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
3055   session->recv_rtcp_sink =
3056       gst_element_get_request_pad (session->session, "recv_rtcp_sink");
3057   if (session->recv_rtcp_sink == NULL)
3058     goto pad_failed;
3059
3060   GST_DEBUG_OBJECT (rtpbin, "getting RTCP decoder");
3061   decoder = session_request_decoder (session, SIGNAL_REQUEST_RTCP_DECODER);
3062   if (decoder) {
3063     GstPad *decsrc;
3064     GstPadLinkReturn ret;
3065
3066     GST_DEBUG_OBJECT (rtpbin, "linking RTCP decoder");
3067     decsink = gst_element_get_static_pad (decoder, "rtcp_sink");
3068     decsrc = gst_element_get_static_pad (decoder, "rtcp_src");
3069
3070     if (decsink == NULL)
3071       goto dec_sink_failed;
3072
3073     if (decsrc == NULL)
3074       goto dec_src_failed;
3075
3076     ret = gst_pad_link (decsrc, session->recv_rtcp_sink);
3077     gst_object_unref (decsrc);
3078
3079     if (ret != GST_PAD_LINK_OK)
3080       goto dec_link_failed;
3081   } else {
3082     GST_DEBUG_OBJECT (rtpbin, "no RTCP decoder given");
3083     decsink = gst_object_ref (session->recv_rtcp_sink);
3084   }
3085
3086   /* get srcpad, link to SSRCDemux */
3087   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
3088   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
3089   if (session->sync_src == NULL)
3090     goto src_pad_failed;
3091
3092   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
3093   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
3094   gst_pad_link_full (session->sync_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
3095   gst_object_unref (sinkdpad);
3096
3097   session->recv_rtcp_sink_ghost =
3098       gst_ghost_pad_new_from_template (name, decsink, templ);
3099   gst_object_unref (decsink);
3100   gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE);
3101   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin),
3102       session->recv_rtcp_sink_ghost);
3103
3104   return session->recv_rtcp_sink_ghost;
3105
3106   /* ERRORS */
3107 no_name:
3108   {
3109     g_warning ("rtpbin: invalid name given");
3110     return NULL;
3111   }
3112 create_error:
3113   {
3114     /* create_session already warned */
3115     return NULL;
3116   }
3117 pad_failed:
3118   {
3119     g_warning ("rtpbin: failed to get session rtcp_sink pad");
3120     return NULL;
3121   }
3122 dec_sink_failed:
3123   {
3124     g_warning ("rtpbin: failed to get decoder sink pad for session %d", sessid);
3125     return NULL;
3126   }
3127 dec_src_failed:
3128   {
3129     g_warning ("rtpbin: failed to get decoder src pad for session %d", sessid);
3130     gst_object_unref (decsink);
3131     return NULL;
3132   }
3133 dec_link_failed:
3134   {
3135     g_warning ("rtpbin: failed to link rtcp decoder for session %d", sessid);
3136     gst_object_unref (decsink);
3137     return NULL;
3138   }
3139 src_pad_failed:
3140   {
3141     g_warning ("rtpbin: failed to get session sync_src pad");
3142     gst_object_unref (decsink);
3143     return NULL;
3144   }
3145 }
3146
3147 static void
3148 remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3149 {
3150   if (session->recv_rtcp_sink_ghost) {
3151     gst_pad_set_active (session->recv_rtcp_sink_ghost, FALSE);
3152     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3153         session->recv_rtcp_sink_ghost);
3154     session->recv_rtcp_sink_ghost = NULL;
3155   }
3156   if (session->sync_src) {
3157     /* releasing the request pad should also unref the sync pad */
3158     gst_object_unref (session->sync_src);
3159     session->sync_src = NULL;
3160   }
3161   if (session->recv_rtcp_sink) {
3162     gst_element_release_request_pad (session->session, session->recv_rtcp_sink);
3163     gst_object_unref (session->recv_rtcp_sink);
3164     session->recv_rtcp_sink = NULL;
3165   }
3166 }
3167
3168 /* Create a pad for sending RTP for the session in @name. Must be called with
3169  * RTP_BIN_LOCK.
3170  */
3171 static GstPad *
3172 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
3173 {
3174   gchar *gname;
3175   guint sessid;
3176   GstPad *encsrc;
3177   GstElement *encoder;
3178   GstRtpBinSession *session;
3179   GstElementClass *klass;
3180
3181   /* first get the session number */
3182   if (name == NULL || sscanf (name, "send_rtp_sink_%u", &sessid) != 1)
3183     goto no_name;
3184
3185   /* get or create session */
3186   session = find_session_by_id (rtpbin, sessid);
3187   if (!session) {
3188     /* create session now */
3189     session = create_session (rtpbin, sessid);
3190     if (session == NULL)
3191       goto create_error;
3192   }
3193
3194   /* check if pad was requested */
3195   if (session->send_rtp_sink_ghost != NULL)
3196     return session->send_rtp_sink_ghost;
3197
3198   /* get send_rtp pad and store */
3199   session->send_rtp_sink =
3200       gst_element_get_request_pad (session->session, "send_rtp_sink");
3201   if (session->send_rtp_sink == NULL)
3202     goto pad_failed;
3203
3204   session->send_rtp_sink_ghost =
3205       gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ);
3206   gst_pad_set_active (session->send_rtp_sink_ghost, TRUE);
3207   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_sink_ghost);
3208
3209   /* get srcpad */
3210   session->send_rtp_src =
3211       gst_element_get_static_pad (session->session, "send_rtp_src");
3212   if (session->send_rtp_src == NULL)
3213     goto no_srcpad;
3214
3215   GST_DEBUG_OBJECT (rtpbin, "getting RTP encoder");
3216   encoder = session_request_encoder (session, SIGNAL_REQUEST_RTP_ENCODER);
3217   if (encoder) {
3218     gchar *ename;
3219     GstPad *encsink;
3220     GstPadLinkReturn ret;
3221
3222     GST_DEBUG_OBJECT (rtpbin, "linking RTP encoder");
3223     ename = g_strdup_printf ("rtp_sink_%d", sessid);
3224     encsink = gst_element_get_static_pad (encoder, ename);
3225     g_free (ename);
3226     ename = g_strdup_printf ("rtp_src_%d", sessid);
3227     encsrc = gst_element_get_static_pad (encoder, ename);
3228     g_free (ename);
3229
3230     if (encsrc == NULL)
3231       goto enc_src_failed;
3232
3233     if (encsink == NULL)
3234       goto enc_sink_failed;
3235
3236     ret = gst_pad_link (session->send_rtp_src, encsink);
3237     gst_object_unref (encsink);
3238
3239     if (ret != GST_PAD_LINK_OK)
3240       goto enc_link_failed;
3241   } else {
3242     GST_DEBUG_OBJECT (rtpbin, "no RTP encoder given");
3243     encsrc = gst_object_ref (session->send_rtp_src);
3244   }
3245
3246   /* ghost the new source pad */
3247   klass = GST_ELEMENT_GET_CLASS (rtpbin);
3248   gname = g_strdup_printf ("send_rtp_src_%u", sessid);
3249   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u");
3250   session->send_rtp_src_ghost =
3251       gst_ghost_pad_new_from_template (gname, encsrc, templ);
3252   gst_object_unref (encsrc);
3253   gst_pad_set_active (session->send_rtp_src_ghost, TRUE);
3254   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost);
3255   g_free (gname);
3256
3257   return session->send_rtp_sink_ghost;
3258
3259   /* ERRORS */
3260 no_name:
3261   {
3262     g_warning ("rtpbin: invalid name given");
3263     return NULL;
3264   }
3265 create_error:
3266   {
3267     /* create_session already warned */
3268     return NULL;
3269   }
3270 pad_failed:
3271   {
3272     g_warning ("rtpbin: failed to get session pad for session %d", sessid);
3273     return NULL;
3274   }
3275 no_srcpad:
3276   {
3277     g_warning ("rtpbin: failed to get rtp source pad for session %d", sessid);
3278     return NULL;
3279   }
3280 enc_src_failed:
3281   {
3282     g_warning ("rtpbin: failed to get encoder src pad for session %d", sessid);
3283     return NULL;
3284   }
3285 enc_sink_failed:
3286   {
3287     g_warning ("rtpbin: failed to get encoder sink pad for session %d", sessid);
3288     gst_object_unref (encsrc);
3289     return NULL;
3290   }
3291 enc_link_failed:
3292   {
3293     g_warning ("rtpbin: failed to link rtp encoder for session %d", sessid);
3294     gst_object_unref (encsrc);
3295     return NULL;
3296   }
3297 }
3298
3299 static void
3300 remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3301 {
3302   if (session->send_rtp_src_ghost) {
3303     gst_pad_set_active (session->send_rtp_src_ghost, FALSE);
3304     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3305         session->send_rtp_src_ghost);
3306     session->send_rtp_src_ghost = NULL;
3307   }
3308   if (session->send_rtp_src) {
3309     gst_object_unref (session->send_rtp_src);
3310     session->send_rtp_src = NULL;
3311   }
3312   if (session->send_rtp_sink) {
3313     gst_element_release_request_pad (GST_ELEMENT_CAST (session->session),
3314         session->send_rtp_sink);
3315     gst_object_unref (session->send_rtp_sink);
3316     session->send_rtp_sink = NULL;
3317   }
3318   if (session->send_rtp_sink_ghost) {
3319     gst_pad_set_active (session->send_rtp_sink_ghost, FALSE);
3320     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3321         session->send_rtp_sink_ghost);
3322     session->send_rtp_sink_ghost = NULL;
3323   }
3324 }
3325
3326 /* Create a pad for sending RTCP for the session in @name. Must be called with
3327  * RTP_BIN_LOCK.
3328  */
3329 static GstPad *
3330 create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
3331 {
3332   guint sessid;
3333   GstPad *encsrc;
3334   GstElement *encoder;
3335   GstRtpBinSession *session;
3336
3337   /* first get the session number */
3338   if (name == NULL || sscanf (name, "send_rtcp_src_%u", &sessid) != 1)
3339     goto no_name;
3340
3341   /* get or create session */
3342   session = find_session_by_id (rtpbin, sessid);
3343   if (!session)
3344     goto no_session;
3345
3346   /* check if pad was requested */
3347   if (session->send_rtcp_src_ghost != NULL)
3348     return session->send_rtcp_src_ghost;
3349
3350   /* get rtcp_src pad and store */
3351   session->send_rtcp_src =
3352       gst_element_get_request_pad (session->session, "send_rtcp_src");
3353   if (session->send_rtcp_src == NULL)
3354     goto pad_failed;
3355
3356   GST_DEBUG_OBJECT (rtpbin, "getting RTCP encoder");
3357   encoder = session_request_encoder (session, SIGNAL_REQUEST_RTCP_ENCODER);
3358   if (encoder) {
3359     gchar *ename;
3360     GstPad *encsink;
3361     GstPadLinkReturn ret;
3362
3363     GST_DEBUG_OBJECT (rtpbin, "linking RTCP encoder");
3364     ename = g_strdup_printf ("rtcp_sink_%d", sessid);
3365     encsink = gst_element_get_static_pad (encoder, ename);
3366     g_free (ename);
3367     ename = g_strdup_printf ("rtcp_src_%d", sessid);
3368     encsrc = gst_element_get_static_pad (encoder, ename);
3369     g_free (ename);
3370
3371     if (encsrc == NULL)
3372       goto enc_src_failed;
3373
3374     if (encsink == NULL)
3375       goto enc_sink_failed;
3376
3377     ret = gst_pad_link (session->send_rtcp_src, encsink);
3378     gst_object_unref (encsink);
3379
3380     if (ret != GST_PAD_LINK_OK)
3381       goto enc_link_failed;
3382   } else {
3383     GST_DEBUG_OBJECT (rtpbin, "no RTCP encoder given");
3384     encsrc = gst_object_ref (session->send_rtcp_src);
3385   }
3386
3387   session->send_rtcp_src_ghost =
3388       gst_ghost_pad_new_from_template (name, encsrc, templ);
3389   gst_object_unref (encsrc);
3390   gst_pad_set_active (session->send_rtcp_src_ghost, TRUE);
3391   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtcp_src_ghost);
3392
3393   return session->send_rtcp_src_ghost;
3394
3395   /* ERRORS */
3396 no_name:
3397   {
3398     g_warning ("rtpbin: invalid name given");
3399     return NULL;
3400   }
3401 no_session:
3402   {
3403     g_warning ("rtpbin: session with id %d does not exist", sessid);
3404     return NULL;
3405   }
3406 pad_failed:
3407   {
3408     g_warning ("rtpbin: failed to get rtcp pad for session %d", sessid);
3409     return NULL;
3410   }
3411 enc_src_failed:
3412   {
3413     g_warning ("rtpbin: failed to get encoder src pad for session %d", sessid);
3414     return NULL;
3415   }
3416 enc_sink_failed:
3417   {
3418     g_warning ("rtpbin: failed to get encoder sink pad for session %d", sessid);
3419     gst_object_unref (encsrc);
3420     return NULL;
3421   }
3422 enc_link_failed:
3423   {
3424     g_warning ("rtpbin: failed to link rtcp encoder for session %d", sessid);
3425     gst_object_unref (encsrc);
3426     return NULL;
3427   }
3428 }
3429
3430 static void
3431 remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3432 {
3433   if (session->send_rtcp_src_ghost) {
3434     gst_pad_set_active (session->send_rtcp_src_ghost, FALSE);
3435     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3436         session->send_rtcp_src_ghost);
3437     session->send_rtcp_src_ghost = NULL;
3438   }
3439   if (session->send_rtcp_src) {
3440     gst_element_release_request_pad (session->session, session->send_rtcp_src);
3441     gst_object_unref (session->send_rtcp_src);
3442     session->send_rtcp_src = NULL;
3443   }
3444 }
3445
3446 /* If the requested name is NULL we should create a name with
3447  * the session number assuming we want the lowest posible session
3448  * with a free pad like the template */
3449 static gchar *
3450 gst_rtp_bin_get_free_pad_name (GstElement * element, GstPadTemplate * templ)
3451 {
3452   gboolean name_found = FALSE;
3453   gint session = 0;
3454   GstIterator *pad_it = NULL;
3455   gchar *pad_name = NULL;
3456   GValue data = { 0, };
3457
3458   GST_DEBUG_OBJECT (element, "find a free pad name for template");
3459   while (!name_found) {
3460     gboolean done = FALSE;
3461
3462     g_free (pad_name);
3463     pad_name = g_strdup_printf (templ->name_template, session++);
3464     pad_it = gst_element_iterate_pads (GST_ELEMENT (element));
3465     name_found = TRUE;
3466     while (!done) {
3467       switch (gst_iterator_next (pad_it, &data)) {
3468         case GST_ITERATOR_OK:
3469         {
3470           GstPad *pad;
3471           gchar *name;
3472
3473           pad = g_value_get_object (&data);
3474           name = gst_pad_get_name (pad);
3475
3476           if (strcmp (name, pad_name) == 0) {
3477             done = TRUE;
3478             name_found = FALSE;
3479           }
3480           g_free (name);
3481           g_value_reset (&data);
3482           break;
3483         }
3484         case GST_ITERATOR_ERROR:
3485         case GST_ITERATOR_RESYNC:
3486           /* restart iteration */
3487           done = TRUE;
3488           name_found = FALSE;
3489           session = 0;
3490           break;
3491         case GST_ITERATOR_DONE:
3492           done = TRUE;
3493           break;
3494       }
3495     }
3496     g_value_unset (&data);
3497     gst_iterator_free (pad_it);
3498   }
3499
3500   GST_DEBUG_OBJECT (element, "free pad name found: '%s'", pad_name);
3501   return pad_name;
3502 }
3503
3504 /*
3505  */
3506 static GstPad *
3507 gst_rtp_bin_request_new_pad (GstElement * element,
3508     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3509 {
3510   GstRtpBin *rtpbin;
3511   GstElementClass *klass;
3512   GstPad *result;
3513
3514   gchar *pad_name = NULL;
3515
3516   g_return_val_if_fail (templ != NULL, NULL);
3517   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
3518
3519   rtpbin = GST_RTP_BIN (element);
3520   klass = GST_ELEMENT_GET_CLASS (element);
3521
3522   GST_RTP_BIN_LOCK (rtpbin);
3523
3524   if (name == NULL) {
3525     /* use a free pad name */
3526     pad_name = gst_rtp_bin_get_free_pad_name (element, templ);
3527   } else {
3528     /* use the provided name */
3529     pad_name = g_strdup (name);
3530   }
3531
3532   GST_DEBUG_OBJECT (rtpbin, "Trying to request a pad with name %s", pad_name);
3533
3534   /* figure out the template */
3535   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
3536     result = create_recv_rtp (rtpbin, templ, pad_name);
3537   } else if (templ == gst_element_class_get_pad_template (klass,
3538           "recv_rtcp_sink_%u")) {
3539     result = create_recv_rtcp (rtpbin, templ, pad_name);
3540   } else if (templ == gst_element_class_get_pad_template (klass,
3541           "send_rtp_sink_%u")) {
3542     result = create_send_rtp (rtpbin, templ, pad_name);
3543   } else if (templ == gst_element_class_get_pad_template (klass,
3544           "send_rtcp_src_%u")) {
3545     result = create_rtcp (rtpbin, templ, pad_name);
3546   } else
3547     goto wrong_template;
3548
3549   g_free (pad_name);
3550   GST_RTP_BIN_UNLOCK (rtpbin);
3551
3552   return result;
3553
3554   /* ERRORS */
3555 wrong_template:
3556   {
3557     g_free (pad_name);
3558     GST_RTP_BIN_UNLOCK (rtpbin);
3559     g_warning ("rtpbin: this is not our template");
3560     return NULL;
3561   }
3562 }
3563
3564 static void
3565 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
3566 {
3567   GstRtpBinSession *session;
3568   GstRtpBin *rtpbin;
3569
3570   g_return_if_fail (GST_IS_GHOST_PAD (pad));
3571   g_return_if_fail (GST_IS_RTP_BIN (element));
3572
3573   rtpbin = GST_RTP_BIN (element);
3574
3575   GST_RTP_BIN_LOCK (rtpbin);
3576   GST_DEBUG_OBJECT (rtpbin, "Trying to release pad %s:%s",
3577       GST_DEBUG_PAD_NAME (pad));
3578
3579   if (!(session = find_session_by_pad (rtpbin, pad)))
3580     goto unknown_pad;
3581
3582   if (session->recv_rtp_sink_ghost == pad) {
3583     remove_recv_rtp (rtpbin, session);
3584   } else if (session->recv_rtcp_sink_ghost == pad) {
3585     remove_recv_rtcp (rtpbin, session);
3586   } else if (session->send_rtp_sink_ghost == pad) {
3587     remove_send_rtp (rtpbin, session);
3588   } else if (session->send_rtcp_src_ghost == pad) {
3589     remove_rtcp (rtpbin, session);
3590   }
3591
3592   /* no more request pads, free the complete session */
3593   if (session->recv_rtp_sink_ghost == NULL
3594       && session->recv_rtcp_sink_ghost == NULL
3595       && session->send_rtp_sink_ghost == NULL
3596       && session->send_rtcp_src_ghost == NULL) {
3597     GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session);
3598     rtpbin->sessions = g_slist_remove (rtpbin->sessions, session);
3599     free_session (session, rtpbin);
3600   }
3601   GST_RTP_BIN_UNLOCK (rtpbin);
3602
3603   return;
3604
3605   /* ERROR */
3606 unknown_pad:
3607   {
3608     GST_RTP_BIN_UNLOCK (rtpbin);
3609     g_warning ("rtpbin: %s:%s is not one of our request pads",
3610         GST_DEBUG_PAD_NAME (pad));
3611     return;
3612   }
3613 }