rtpbin: add signal for new jitterbuffer
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-rtpbin
22  * @see_also: rtpjitterbuffer, rtpsession, rtpptdemux, rtpssrcdemux
23  *
24  * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
25  * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
26  * RTP sessions that will be synchronized together using RTCP SR packets.
27  *
28  * #GstRtpBin is configured with a number of request pads that define the
29  * functionality that is activated, similar to the #GstRtpSession element.
30  *
31  * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_\%u pad. The session
32  * number must be specified in the pad name.
33  * Data received on the recv_rtp_sink_\%u pad will be processed in the #GstRtpSession
34  * manager and after being validated forwarded on #GstRtpSsrcDemux element. Each
35  * RTP stream is demuxed based on the SSRC and send to a #GstRtpJitterBuffer. After
36  * the packets are released from the jitterbuffer, they will be forwarded to a
37  * #GstRtpPtDemux element. The #GstRtpPtDemux element will demux the packets based
38  * on the payload type and will create a unique pad recv_rtp_src_\%u_\%u_\%u on
39  * rtpbin with the session number, SSRC and payload type respectively as the pad
40  * name.
41  *
42  * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_\%u pad. The
43  * session number must be specified in the pad name.
44  *
45  * If you want the session manager to generate and send RTCP packets, request
46  * the send_rtcp_src_\%u pad with the session number in the pad name. Packet pushed
47  * on this pad contain SR/RR RTCP reports that should be sent to all participants
48  * in the session.
49  *
50  * To use #GstRtpBin as a sender, request a send_rtp_sink_\%u pad, which will
51  * automatically create a send_rtp_src_\%u pad. If the session number is not provided,
52  * the pad from the lowest available session will be returned. The session manager will modify the
53  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
54  * send_rtp_src_\%u pad after updating its internal state.
55  *
56  * The session manager needs the clock-rate of the payload types it is handling
57  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
58  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
59  * signal.
60  *
61  * Access to the internal statistics of rtpbin is provided with the
62  * get-internal-session property. This action signal gives access to the
63  * RTPSession object which further provides action signals to retrieve the
64  * internal source and other sources.
65  *
66  * #GstRtpBin also has signals (#GstRtpBin::request-rtp-encoder,
67  * #GstRtpBin::request-rtp-decoder, #GstRtpBin::request-rtcp-encoder and
68  * #GstRtpBin::request-rtp-decoder) to dynamically request for RTP and RTCP encoders
69  * and decoders in order to support SRTP. The encoders must provide the pads
70  * rtp_sink_\%d and rtp_src_\%d for RTP and rtcp_sink_\%d and rtcp_src_\%d for
71  * RTCP. The session number will be used in the pad name. The decoders must provide
72  * rtp_sink and rtp_src for RTP and rtcp_sink and rtcp_src for RTCP. The decoders will
73  * be placed before the #GstRtpSession element, thus they must support SSRC demuxing
74  * internally.
75  *
76  * <refsect2>
77  * <title>Example pipelines</title>
78  * |[
79  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
80  *     rtpbin ! rtptheoradepay ! theoradec ! xvimagesink
81  * ]| Receive RTP data from port 5000 and send to the session 0 in rtpbin.
82  * |[
83  * gst-launch-1.0 rtpbin name=rtpbin \
84  *         v4l2src ! videoconvert ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
85  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
86  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
87  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
88  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
89  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
90  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
91  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
92  * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
93  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
94  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
95  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
96  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
97  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
98  * is received on port 5007. Since RTCP packets from the sender should be sent
99  * as soon as possible and do not participate in preroll, sync=false and
100  * async=false is configured on udpsink
101  * |[
102  * gst-launch-1.0 -v rtpbin name=rtpbin                                          \
103  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
104  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
105  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
106  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
107  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
108  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
109  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
110  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
111  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
112  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
113  * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
114  * decode and display the video.
115  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
116  * decode and play the audio.
117  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
118  * session 1 on port 5003. These packets will be used for session management and
119  * synchronisation.
120  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
121  * on port 5007.
122  * </refsect2>
123  *
124  * Last reviewed on 2007-08-30 (0.10.6)
125  */
126
127 #ifdef HAVE_CONFIG_H
128 #include "config.h"
129 #endif
130 #include <stdio.h>
131 #include <string.h>
132
133 #include <gst/rtp/gstrtpbuffer.h>
134 #include <gst/rtp/gstrtcpbuffer.h>
135
136 #include "gstrtpbin.h"
137 #include "rtpsession.h"
138 #include "gstrtpsession.h"
139 #include "gstrtpjitterbuffer.h"
140
141 #include <gst/glib-compat-private.h>
142
143 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
144 #define GST_CAT_DEFAULT gst_rtp_bin_debug
145
146 /* sink pads */
147 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
148     GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
149     GST_PAD_SINK,
150     GST_PAD_REQUEST,
151     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
152     );
153
154 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
155     GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
156     GST_PAD_SINK,
157     GST_PAD_REQUEST,
158     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
159     );
160
161 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
162 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%u",
163     GST_PAD_SINK,
164     GST_PAD_REQUEST,
165     GST_STATIC_CAPS ("application/x-rtp")
166     );
167
168 /* src pads */
169 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
170 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
171     GST_PAD_SRC,
172     GST_PAD_SOMETIMES,
173     GST_STATIC_CAPS ("application/x-rtp")
174     );
175
176 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
177     GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%u",
178     GST_PAD_SRC,
179     GST_PAD_REQUEST,
180     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
181     );
182
183 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
184     GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%u",
185     GST_PAD_SRC,
186     GST_PAD_SOMETIMES,
187     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
188     );
189
190 #define GST_RTP_BIN_GET_PRIVATE(obj)  \
191    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
192
193 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock (&(bin)->priv->bin_lock)
194 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock (&(bin)->priv->bin_lock)
195
196 /* lock to protect dynamic callbacks, like pad-added and new ssrc. */
197 #define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock (&(bin)->priv->dyn_lock)
198 #define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock (&(bin)->priv->dyn_lock)
199
200 /* lock for shutdown */
201 #define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
202 G_STMT_START {                                   \
203   if (g_atomic_int_get (&bin->priv->shutdown))   \
204     goto label;                                  \
205   GST_RTP_BIN_DYN_LOCK (bin);                    \
206   if (g_atomic_int_get (&bin->priv->shutdown)) { \
207     GST_RTP_BIN_DYN_UNLOCK (bin);                \
208     goto label;                                  \
209   }                                              \
210 } G_STMT_END
211
212 /* unlock for shutdown */
213 #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
214   GST_RTP_BIN_DYN_UNLOCK (bin);                  \
215
216 struct _GstRtpBinPrivate
217 {
218   GMutex bin_lock;
219
220   /* lock protecting dynamic adding/removing */
221   GMutex dyn_lock;
222
223   /* if we are shutting down or not */
224   gint shutdown;
225
226   gboolean autoremove;
227
228   /* UNIX (ntp) time of last SR sync used */
229   guint64 last_unix;
230
231   /* list of extra elements */
232   GList *elements;
233 };
234
235 /* signals and args */
236 enum
237 {
238   SIGNAL_REQUEST_PT_MAP,
239   SIGNAL_PAYLOAD_TYPE_CHANGE,
240   SIGNAL_CLEAR_PT_MAP,
241   SIGNAL_RESET_SYNC,
242   SIGNAL_GET_INTERNAL_SESSION,
243
244   SIGNAL_ON_NEW_SSRC,
245   SIGNAL_ON_SSRC_COLLISION,
246   SIGNAL_ON_SSRC_VALIDATED,
247   SIGNAL_ON_SSRC_ACTIVE,
248   SIGNAL_ON_SSRC_SDES,
249   SIGNAL_ON_BYE_SSRC,
250   SIGNAL_ON_BYE_TIMEOUT,
251   SIGNAL_ON_TIMEOUT,
252   SIGNAL_ON_SENDER_TIMEOUT,
253   SIGNAL_ON_NPT_STOP,
254
255   SIGNAL_REQUEST_RTP_ENCODER,
256   SIGNAL_REQUEST_RTP_DECODER,
257   SIGNAL_REQUEST_RTCP_ENCODER,
258   SIGNAL_REQUEST_RTCP_DECODER,
259
260   SIGNAL_NEW_JITTERBUFFER,
261
262   LAST_SIGNAL
263 };
264
265 #define DEFAULT_LATENCY_MS           200
266 #define DEFAULT_DROP_ON_LATENCY      FALSE
267 #define DEFAULT_SDES                 NULL
268 #define DEFAULT_DO_LOST              FALSE
269 #define DEFAULT_IGNORE_PT            FALSE
270 #define DEFAULT_NTP_SYNC             FALSE
271 #define DEFAULT_AUTOREMOVE           FALSE
272 #define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
273 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
274 #define DEFAULT_RTCP_SYNC            GST_RTP_BIN_RTCP_SYNC_ALWAYS
275 #define DEFAULT_RTCP_SYNC_INTERVAL   0
276 #define DEFAULT_DO_SYNC_EVENT        FALSE
277 #define DEFAULT_DO_RETRANSMISSION    FALSE
278
279 enum
280 {
281   PROP_0,
282   PROP_LATENCY,
283   PROP_DROP_ON_LATENCY,
284   PROP_SDES,
285   PROP_DO_LOST,
286   PROP_IGNORE_PT,
287   PROP_NTP_SYNC,
288   PROP_RTCP_SYNC,
289   PROP_RTCP_SYNC_INTERVAL,
290   PROP_AUTOREMOVE,
291   PROP_BUFFER_MODE,
292   PROP_USE_PIPELINE_CLOCK,
293   PROP_DO_SYNC_EVENT,
294   PROP_DO_RETRANSMISSION,
295   PROP_LAST
296 };
297
298 enum
299 {
300   GST_RTP_BIN_RTCP_SYNC_ALWAYS,
301   GST_RTP_BIN_RTCP_SYNC_INITIAL,
302   GST_RTP_BIN_RTCP_SYNC_RTP
303 };
304
305 #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
306 static GType
307 gst_rtp_bin_rtcp_sync_get_type (void)
308 {
309   static GType rtcp_sync_type = 0;
310   static const GEnumValue rtcp_sync_types[] = {
311     {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
312     {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
313     {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
314     {0, NULL, NULL},
315   };
316
317   if (!rtcp_sync_type) {
318     rtcp_sync_type = g_enum_register_static ("GstRTCPSync", rtcp_sync_types);
319   }
320   return rtcp_sync_type;
321 }
322
323 /* helper objects */
324 typedef struct _GstRtpBinSession GstRtpBinSession;
325 typedef struct _GstRtpBinStream GstRtpBinStream;
326 typedef struct _GstRtpBinClient GstRtpBinClient;
327
328 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
329
330 static GstCaps *pt_map_requested (GstElement * element, guint pt,
331     GstRtpBinSession * session);
332 static void payload_type_change (GstElement * element, guint pt,
333     GstRtpBinSession * session);
334 static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
335 static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
336 static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
337 static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
338 static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
339 static void free_stream (GstRtpBinStream * stream, GstRtpBin * bin);
340
341 /* Manages the RTP stream for one SSRC.
342  *
343  * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer.
344  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
345  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
346  * together (see below).
347  */
348 struct _GstRtpBinStream
349 {
350   /* the SSRC of this stream */
351   guint32 ssrc;
352
353   /* parent bin */
354   GstRtpBin *bin;
355
356   /* the session this SSRC belongs to */
357   GstRtpBinSession *session;
358
359   /* the jitterbuffer of the SSRC */
360   GstElement *buffer;
361   gulong buffer_handlesync_sig;
362   gulong buffer_ptreq_sig;
363   gulong buffer_ntpstop_sig;
364   gint percent;
365
366   /* the PT demuxer of the SSRC */
367   GstElement *demux;
368   gulong demux_newpad_sig;
369   gulong demux_padremoved_sig;
370   gulong demux_ptreq_sig;
371   gulong demux_ptchange_sig;
372
373   /* if we have calculated a valid rt_delta for this stream */
374   gboolean have_sync;
375   /* mapping to local RTP and NTP time */
376   gint64 rt_delta;
377   gint64 rtp_delta;
378   /* base rtptime in gst time */
379   gint64 clock_base;
380 };
381
382 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->lock)
383 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->lock)
384
385 /* Manages the receiving end of the packets.
386  *
387  * There is one such structure for each RTP session (audio/video/...).
388  * We get the RTP/RTCP packets and stuff them into the session manager. From
389  * there they are pushed into an SSRC demuxer that splits the stream based on
390  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
391  * the GstRtpBinStream above).
392  */
393 struct _GstRtpBinSession
394 {
395   /* session id */
396   gint id;
397   /* the parent bin */
398   GstRtpBin *bin;
399   /* the session element */
400   GstElement *session;
401   /* the SSRC demuxer */
402   GstElement *demux;
403   gulong demux_newpad_sig;
404   gulong demux_padremoved_sig;
405
406   GMutex lock;
407
408   /* list of GstRtpBinStream */
409   GSList *streams;
410
411   /* list of encoders */
412   GSList *encoders;
413
414   /* list of decoders */
415   GSList *decoders;
416
417   /* mapping of payload type to caps */
418   GHashTable *ptmap;
419
420   /* the pads of the session */
421   GstPad *recv_rtp_sink;
422   GstPad *recv_rtp_sink_ghost;
423   GstPad *recv_rtp_src;
424   GstPad *recv_rtcp_sink;
425   GstPad *recv_rtcp_sink_ghost;
426   GstPad *sync_src;
427   GstPad *send_rtp_sink;
428   GstPad *send_rtp_sink_ghost;
429   GstPad *send_rtp_src;
430   GstPad *send_rtp_src_ghost;
431   GstPad *send_rtcp_src;
432   GstPad *send_rtcp_src_ghost;
433 };
434
435 /* Manages the RTP streams that come from one client and should therefore be
436  * synchronized.
437  */
438 struct _GstRtpBinClient
439 {
440   /* the common CNAME for the streams */
441   gchar *cname;
442   guint cname_len;
443
444   /* the streams */
445   guint nstreams;
446   GSList *streams;
447 };
448
449 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
450 static GstRtpBinSession *
451 find_session_by_id (GstRtpBin * rtpbin, gint id)
452 {
453   GSList *walk;
454
455   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
456     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
457
458     if (sess->id == id)
459       return sess;
460   }
461   return NULL;
462 }
463
464 /* find a session with the given request pad. Must be called with RTP_BIN_LOCK */
465 static GstRtpBinSession *
466 find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
467 {
468   GSList *walk;
469
470   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
471     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
472
473     if ((sess->recv_rtp_sink_ghost == pad) ||
474         (sess->recv_rtcp_sink_ghost == pad) ||
475         (sess->send_rtp_sink_ghost == pad)
476         || (sess->send_rtcp_src_ghost == pad))
477       return sess;
478   }
479   return NULL;
480 }
481
482 static void
483 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
484 {
485   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
486       sess->id, ssrc);
487 }
488
489 static void
490 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
491 {
492   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
493       sess->id, ssrc);
494 }
495
496 static void
497 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
498 {
499   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
500       sess->id, ssrc);
501 }
502
503 static void
504 on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
505 {
506   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
507       sess->id, ssrc);
508 }
509
510 static void
511 on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
512 {
513   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
514       sess->id, ssrc);
515 }
516
517 static void
518 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
519 {
520   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
521       sess->id, ssrc);
522 }
523
524 static void
525 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
526 {
527   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
528       sess->id, ssrc);
529
530   if (sess->bin->priv->autoremove)
531     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
532 }
533
534 static void
535 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
536 {
537   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
538       sess->id, ssrc);
539
540   if (sess->bin->priv->autoremove)
541     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
542 }
543
544 static void
545 on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
546 {
547   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
548       sess->id, ssrc);
549 }
550
551 static void
552 on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
553 {
554   g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0,
555       stream->session->id, stream->ssrc);
556 }
557
558 /* must be called with the SESSION lock */
559 static GstRtpBinStream *
560 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
561 {
562   GSList *walk;
563
564   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
565     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
566
567     if (stream->ssrc == ssrc)
568       return stream;
569   }
570   return NULL;
571 }
572
573 static void
574 ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad,
575     GstRtpBinSession * session)
576 {
577   GstRtpBinStream *stream = NULL;
578   GstRtpBin *rtpbin;
579
580   rtpbin = session->bin;
581
582   GST_RTP_BIN_LOCK (rtpbin);
583
584   GST_RTP_SESSION_LOCK (session);
585   if ((stream = find_stream_by_ssrc (session, ssrc)))
586     session->streams = g_slist_remove (session->streams, stream);
587   GST_RTP_SESSION_UNLOCK (session);
588
589   if (stream)
590     free_stream (stream, rtpbin);
591
592   GST_RTP_BIN_UNLOCK (rtpbin);
593 }
594
595 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
596 static GstRtpBinSession *
597 create_session (GstRtpBin * rtpbin, gint id)
598 {
599   GstRtpBinSession *sess;
600   GstElement *session, *demux;
601   GstState target;
602
603   if (!(session = gst_element_factory_make ("rtpsession", NULL)))
604     goto no_session;
605
606   if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
607     goto no_demux;
608
609   sess = g_new0 (GstRtpBinSession, 1);
610   g_mutex_init (&sess->lock);
611   sess->id = id;
612   sess->bin = rtpbin;
613   sess->session = session;
614   sess->demux = demux;
615   sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
616       (GDestroyNotify) gst_caps_unref);
617   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
618
619   /* configure SDES items */
620   GST_OBJECT_LOCK (rtpbin);
621   g_object_set (session, "sdes", rtpbin->sdes, "use-pipeline-clock",
622       rtpbin->use_pipeline_clock, NULL);
623   GST_OBJECT_UNLOCK (rtpbin);
624
625   /* provide clock_rate to the session manager when needed */
626   g_signal_connect (session, "request-pt-map",
627       (GCallback) pt_map_requested, sess);
628
629   g_signal_connect (sess->session, "on-new-ssrc",
630       (GCallback) on_new_ssrc, sess);
631   g_signal_connect (sess->session, "on-ssrc-collision",
632       (GCallback) on_ssrc_collision, sess);
633   g_signal_connect (sess->session, "on-ssrc-validated",
634       (GCallback) on_ssrc_validated, sess);
635   g_signal_connect (sess->session, "on-ssrc-active",
636       (GCallback) on_ssrc_active, sess);
637   g_signal_connect (sess->session, "on-ssrc-sdes",
638       (GCallback) on_ssrc_sdes, sess);
639   g_signal_connect (sess->session, "on-bye-ssrc",
640       (GCallback) on_bye_ssrc, sess);
641   g_signal_connect (sess->session, "on-bye-timeout",
642       (GCallback) on_bye_timeout, sess);
643   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
644   g_signal_connect (sess->session, "on-sender-timeout",
645       (GCallback) on_sender_timeout, sess);
646
647   gst_bin_add (GST_BIN_CAST (rtpbin), session);
648   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
649
650   GST_OBJECT_LOCK (rtpbin);
651   target = GST_STATE_TARGET (rtpbin);
652   GST_OBJECT_UNLOCK (rtpbin);
653
654   /* change state only to what's needed */
655   gst_element_set_state (demux, target);
656   gst_element_set_state (session, target);
657
658   return sess;
659
660   /* ERRORS */
661 no_session:
662   {
663     g_warning ("rtpbin: could not create rtpsession element");
664     return NULL;
665   }
666 no_demux:
667   {
668     gst_object_unref (session);
669     g_warning ("rtpbin: could not create rtpssrcdemux element");
670     return NULL;
671   }
672 }
673
674 static gboolean
675 bin_manage_element (GstRtpBin * bin, GstElement * element)
676 {
677   GstRtpBinPrivate *priv = bin->priv;
678
679   if (g_list_find (priv->elements, element)) {
680     GST_DEBUG_OBJECT (bin, "requested element %p already in bin", element);
681   } else {
682     GST_DEBUG_OBJECT (bin, "adding requested element %p", element);
683     if (!gst_bin_add (GST_BIN_CAST (bin), element))
684       goto add_failed;
685     if (!gst_element_sync_state_with_parent (element))
686       GST_WARNING_OBJECT (bin, "unable to sync element state with rtpbin");
687   }
688   /* we add the element multiple times, each we need an equal number of
689    * removes to really remove the element from the bin */
690   priv->elements = g_list_prepend (priv->elements, element);
691
692   return TRUE;
693
694   /* ERRORS */
695 add_failed:
696   {
697     GST_WARNING_OBJECT (bin, "unable to add element");
698     return FALSE;
699   }
700 }
701
702 static void
703 remove_bin_element (GstElement * element, GstRtpBin * bin)
704 {
705   GstRtpBinPrivate *priv = bin->priv;
706   GList *find;
707
708   find = g_list_find (priv->elements, element);
709   if (find) {
710     priv->elements = g_list_delete_link (priv->elements, find);
711
712     if (!g_list_find (priv->elements, element))
713       gst_bin_remove (GST_BIN_CAST (bin), element);
714     else
715       gst_object_unref (element);
716   }
717 }
718
719 /* called with RTP_BIN_LOCK */
720 static void
721 free_session (GstRtpBinSession * sess, GstRtpBin * bin)
722 {
723   GST_DEBUG_OBJECT (bin, "freeing session %p", sess);
724
725   gst_element_set_locked_state (sess->demux, TRUE);
726   gst_element_set_locked_state (sess->session, TRUE);
727
728   gst_element_set_state (sess->demux, GST_STATE_NULL);
729   gst_element_set_state (sess->session, GST_STATE_NULL);
730
731   remove_recv_rtp (bin, sess);
732   remove_recv_rtcp (bin, sess);
733   remove_send_rtp (bin, sess);
734   remove_rtcp (bin, sess);
735
736   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
737   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
738
739   g_slist_foreach (sess->encoders, (GFunc) remove_bin_element, bin);
740   g_slist_free (sess->encoders);
741
742   g_slist_foreach (sess->decoders, (GFunc) remove_bin_element, bin);
743   g_slist_free (sess->decoders);
744
745   g_slist_foreach (sess->streams, (GFunc) free_stream, bin);
746   g_slist_free (sess->streams);
747
748   g_mutex_clear (&sess->lock);
749   g_hash_table_destroy (sess->ptmap);
750
751   g_free (sess);
752 }
753
754 /* get the payload type caps for the specific payload @pt in @session */
755 static GstCaps *
756 get_pt_map (GstRtpBinSession * session, guint pt)
757 {
758   GstCaps *caps = NULL;
759   GstRtpBin *bin;
760   GValue ret = { 0 };
761   GValue args[3] = { {0}, {0}, {0} };
762
763   GST_DEBUG ("searching pt %d in cache", pt);
764
765   GST_RTP_SESSION_LOCK (session);
766
767   /* first look in the cache */
768   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
769   if (caps) {
770     gst_caps_ref (caps);
771     goto done;
772   }
773
774   bin = session->bin;
775
776   GST_DEBUG ("emiting signal for pt %d in session %d", pt, session->id);
777
778   /* not in cache, send signal to request caps */
779   g_value_init (&args[0], GST_TYPE_ELEMENT);
780   g_value_set_object (&args[0], bin);
781   g_value_init (&args[1], G_TYPE_UINT);
782   g_value_set_uint (&args[1], session->id);
783   g_value_init (&args[2], G_TYPE_UINT);
784   g_value_set_uint (&args[2], pt);
785
786   g_value_init (&ret, GST_TYPE_CAPS);
787   g_value_set_boxed (&ret, NULL);
788
789   GST_RTP_SESSION_UNLOCK (session);
790
791   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
792
793   GST_RTP_SESSION_LOCK (session);
794
795   g_value_unset (&args[0]);
796   g_value_unset (&args[1]);
797   g_value_unset (&args[2]);
798
799   /* look in the cache again because we let the lock go */
800   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
801   if (caps) {
802     gst_caps_ref (caps);
803     g_value_unset (&ret);
804     goto done;
805   }
806
807   caps = (GstCaps *) g_value_dup_boxed (&ret);
808   g_value_unset (&ret);
809   if (!caps)
810     goto no_caps;
811
812   GST_DEBUG ("caching pt %d as %" GST_PTR_FORMAT, pt, caps);
813
814   /* store in cache, take additional ref */
815   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
816       gst_caps_ref (caps));
817
818 done:
819   GST_RTP_SESSION_UNLOCK (session);
820
821   return caps;
822
823   /* ERRORS */
824 no_caps:
825   {
826     GST_RTP_SESSION_UNLOCK (session);
827     GST_DEBUG ("no pt map could be obtained");
828     return NULL;
829   }
830 }
831
832 static gboolean
833 return_true (gpointer key, gpointer value, gpointer user_data)
834 {
835   return TRUE;
836 }
837
838 static void
839 gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
840 {
841   GSList *clients, *streams;
842
843   GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");
844
845   GST_RTP_BIN_LOCK (rtpbin);
846   for (clients = rtpbin->clients; clients; clients = g_slist_next (clients)) {
847     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
848
849     /* reset sync on all streams for this client */
850     for (streams = client->streams; streams; streams = g_slist_next (streams)) {
851       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
852
853       /* make use require a new SR packet for this stream before we attempt new
854        * lip-sync */
855       stream->have_sync = FALSE;
856       stream->rt_delta = 0;
857       stream->rtp_delta = 0;
858       stream->clock_base = -100 * GST_SECOND;
859     }
860   }
861   GST_RTP_BIN_UNLOCK (rtpbin);
862 }
863
864 static void
865 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
866 {
867   GSList *sessions, *streams;
868
869   GST_RTP_BIN_LOCK (bin);
870   GST_DEBUG_OBJECT (bin, "clearing pt map");
871   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
872     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
873
874     GST_DEBUG_OBJECT (bin, "clearing session %p", session);
875     g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
876
877     GST_RTP_SESSION_LOCK (session);
878     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
879
880     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
881       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
882
883       GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
884       g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
885       if (stream->demux)
886         g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
887     }
888     GST_RTP_SESSION_UNLOCK (session);
889   }
890   GST_RTP_BIN_UNLOCK (bin);
891
892   /* reset sync too */
893   gst_rtp_bin_reset_sync (bin);
894 }
895
896 static RTPSession *
897 gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
898 {
899   RTPSession *internal_session = NULL;
900   GstRtpBinSession *session;
901
902   GST_RTP_BIN_LOCK (bin);
903   GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %d",
904       session_id);
905   session = find_session_by_id (bin, (gint) session_id);
906   if (session) {
907     g_object_get (session->session, "internal-session", &internal_session,
908         NULL);
909   }
910   GST_RTP_BIN_UNLOCK (bin);
911
912   return internal_session;
913 }
914
915 static GstElement *
916 gst_rtp_bin_request_encoder (GstRtpBin * bin, guint session_id)
917 {
918   GST_DEBUG_OBJECT (bin, "return NULL encoder");
919   return NULL;
920 }
921
922 static GstElement *
923 gst_rtp_bin_request_decoder (GstRtpBin * bin, guint session_id)
924 {
925   GST_DEBUG_OBJECT (bin, "return NULL decoder");
926   return NULL;
927 }
928
929 static void
930 gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
931     const gchar * name, const GValue * value)
932 {
933   GSList *sessions, *streams;
934
935   GST_RTP_BIN_LOCK (bin);
936   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
937     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
938
939     GST_RTP_SESSION_LOCK (session);
940     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
941       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
942
943       g_object_set_property (G_OBJECT (stream->buffer), name, value);
944     }
945     GST_RTP_SESSION_UNLOCK (session);
946   }
947   GST_RTP_BIN_UNLOCK (bin);
948 }
949
950 /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
951 static GstRtpBinClient *
952 get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
953 {
954   GstRtpBinClient *result = NULL;
955   GSList *walk;
956
957   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
958     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
959
960     if (len != client->cname_len)
961       continue;
962
963     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
964       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
965           client->cname);
966       result = client;
967       break;
968     }
969   }
970
971   /* nothing found, create one */
972   if (result == NULL) {
973     result = g_new0 (GstRtpBinClient, 1);
974     result->cname = g_strndup ((gchar *) data, len);
975     result->cname_len = len;
976     bin->clients = g_slist_prepend (bin->clients, result);
977     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
978         result->cname);
979   }
980   return result;
981 }
982
983 static void
984 free_client (GstRtpBinClient * client, GstRtpBin * bin)
985 {
986   GST_DEBUG_OBJECT (bin, "freeing client %p", client);
987   g_slist_free (client->streams);
988   g_free (client->cname);
989   g_free (client);
990 }
991
992 static void
993 get_current_times (GstRtpBin * bin, GstClockTime * running_time,
994     guint64 * ntpnstime)
995 {
996   guint64 ntpns;
997   GstClock *clock;
998   GstClockTime base_time, rt, clock_time;
999
1000   GST_OBJECT_LOCK (bin);
1001   if ((clock = GST_ELEMENT_CLOCK (bin))) {
1002     base_time = GST_ELEMENT_CAST (bin)->base_time;
1003     gst_object_ref (clock);
1004     GST_OBJECT_UNLOCK (bin);
1005
1006     clock_time = gst_clock_get_time (clock);
1007
1008     if (bin->use_pipeline_clock) {
1009       ntpns = clock_time - base_time;
1010     } else {
1011       GTimeVal current;
1012
1013       /* get current NTP time */
1014       g_get_current_time (&current);
1015       ntpns = GST_TIMEVAL_TO_TIME (current);
1016     }
1017
1018     /* add constant to convert from 1970 based time to 1900 based time */
1019     ntpns += (2208988800LL * GST_SECOND);
1020
1021     /* get current clock time and convert to running time */
1022     rt = clock_time - base_time;
1023
1024     gst_object_unref (clock);
1025   } else {
1026     GST_OBJECT_UNLOCK (bin);
1027     rt = -1;
1028     ntpns = -1;
1029   }
1030   if (running_time)
1031     *running_time = rt;
1032   if (ntpnstime)
1033     *ntpnstime = ntpns;
1034 }
1035
1036 static void
1037 stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
1038     gint64 ts_offset, gboolean check)
1039 {
1040   gint64 prev_ts_offset;
1041
1042   g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
1043
1044   /* delta changed, see how much */
1045   if (prev_ts_offset != ts_offset) {
1046     gint64 diff;
1047
1048     diff = prev_ts_offset - ts_offset;
1049
1050     GST_DEBUG_OBJECT (bin,
1051         "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
1052         ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff);
1053
1054     if (check) {
1055       /* only change diff when it changed more than 4 milliseconds. This
1056        * compensates for rounding errors in NTP to RTP timestamp
1057        * conversions */
1058       if (ABS (diff) < 4 * GST_MSECOND) {
1059         GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
1060         return;
1061       }
1062       if (ABS (diff) > (3 * GST_SECOND)) {
1063         GST_WARNING_OBJECT (bin, "offset unusually large, ignoring");
1064         return;
1065       }
1066     }
1067     g_object_set (stream->buffer, "ts-offset", ts_offset, NULL);
1068   }
1069   GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
1070       stream->ssrc, ts_offset);
1071 }
1072
1073 static void
1074 gst_rtp_bin_send_sync_event (GstRtpBinStream * stream)
1075 {
1076   if (stream->bin->send_sync_event) {
1077     GstEvent *event;
1078     GstPad *srcpad;
1079
1080     GST_DEBUG_OBJECT (stream->bin,
1081         "sending GstRTCPSRReceived event downstream");
1082
1083     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1084         gst_structure_new_empty ("GstRTCPSRReceived"));
1085
1086     srcpad = gst_element_get_static_pad (stream->buffer, "src");
1087     gst_pad_push_event (srcpad, event);
1088     gst_object_unref (srcpad);
1089   }
1090 }
1091
1092 /* associate a stream to the given CNAME. This will make sure all streams for
1093  * that CNAME are synchronized together.
1094  * Must be called with GST_RTP_BIN_LOCK */
1095 static void
1096 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
1097     guint8 * data, guint64 ntptime, guint64 last_extrtptime,
1098     guint64 base_rtptime, guint64 base_time, guint clock_rate,
1099     gint64 rtp_clock_base)
1100 {
1101   GstRtpBinClient *client;
1102   gboolean created;
1103   GSList *walk;
1104   guint64 local_rt;
1105   guint64 local_rtp;
1106   GstClockTime running_time;
1107   guint64 ntpnstime;
1108   gint64 ntpdiff, rtdiff;
1109   guint64 last_unix;
1110
1111   /* first find or create the CNAME */
1112   client = get_client (bin, len, data, &created);
1113
1114   /* find stream in the client */
1115   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1116     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1117
1118     if (ostream == stream)
1119       break;
1120   }
1121   /* not found, add it to the list */
1122   if (walk == NULL) {
1123     GST_DEBUG_OBJECT (bin,
1124         "new association of SSRC %08x with client %p with CNAME %s",
1125         stream->ssrc, client, client->cname);
1126     client->streams = g_slist_prepend (client->streams, stream);
1127     client->nstreams++;
1128   } else {
1129     GST_DEBUG_OBJECT (bin,
1130         "found association of SSRC %08x with client %p with CNAME %s",
1131         stream->ssrc, client, client->cname);
1132   }
1133
1134   if (!GST_CLOCK_TIME_IS_VALID (last_extrtptime)) {
1135     GST_DEBUG_OBJECT (bin, "invalidated sync data");
1136     if (bin->rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1137       /* we don't need that data, so carry on,
1138        * but make some values look saner */
1139       last_extrtptime = base_rtptime;
1140     } else {
1141       /* nothing we can do with this data in this case */
1142       GST_DEBUG_OBJECT (bin, "bailing out");
1143       return;
1144     }
1145   }
1146
1147   /* Take the extended rtptime we found in the SR packet and map it to the
1148    * local rtptime. The local rtp time is used to construct timestamps on the
1149    * buffers so we will calculate what running_time corresponds to the RTP
1150    * timestamp in the SR packet. */
1151   local_rtp = last_extrtptime - base_rtptime;
1152
1153   GST_DEBUG_OBJECT (bin,
1154       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
1155       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, "
1156       "clock-base %" G_GINT64_FORMAT, base_rtptime,
1157       last_extrtptime, local_rtp, clock_rate, rtp_clock_base);
1158
1159   /* calculate local RTP time in gstreamer timestamp, we essentially perform the
1160    * same conversion that a jitterbuffer would use to convert an rtp timestamp
1161    * into a corresponding gstreamer timestamp. Note that the base_time also
1162    * contains the drift between sender and receiver. */
1163   local_rt = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate);
1164   local_rt += base_time;
1165
1166   /* convert ntptime to unix time since 1900 */
1167   last_unix = gst_util_uint64_scale (ntptime, GST_SECOND,
1168       (G_GINT64_CONSTANT (1) << 32));
1169
1170   stream->have_sync = TRUE;
1171
1172   GST_DEBUG_OBJECT (bin,
1173       "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT,
1174       local_rt, last_unix);
1175
1176   /* recalc inter stream playout offset, but only if there is more than one
1177    * stream or we're doing NTP sync. */
1178   if (bin->ntp_sync) {
1179     /* For NTP sync we need to first get a snapshot of running_time and NTP
1180      * time. We know at what running_time we play a certain RTP time, we also
1181      * calculated when we would play the RTP time in the SR packet. Now we need
1182      * to know how the running_time and the NTP time relate to eachother. */
1183     get_current_times (bin, &running_time, &ntpnstime);
1184
1185     /* see how far away the NTP time is. This is the difference between the
1186      * current NTP time and the NTP time in the last SR packet. */
1187     ntpdiff = ntpnstime - last_unix;
1188     /* see how far away the running_time is. This is the difference between the
1189      * current running_time and the running_time of the RTP timestamp in the
1190      * last SR packet. */
1191     rtdiff = running_time - local_rt;
1192
1193     GST_DEBUG_OBJECT (bin,
1194         "NTP time %" G_GUINT64_FORMAT ", last unix %" G_GUINT64_FORMAT,
1195         ntpnstime, last_unix);
1196     GST_DEBUG_OBJECT (bin,
1197         "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
1198         rtdiff);
1199
1200     /* combine to get the final diff to apply to the running_time */
1201     stream->rt_delta = rtdiff - ntpdiff;
1202
1203     stream_set_ts_offset (bin, stream, stream->rt_delta, FALSE);
1204   } else {
1205     gint64 min, rtp_min, clock_base = stream->clock_base;
1206     gboolean all_sync, use_rtp;
1207     gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
1208
1209     /* calculate delta between server and receiver. last_unix is created by
1210      * converting the ntptime in the last SR packet to a gstreamer timestamp. This
1211      * delta expresses the difference to our timeline and the server timeline. The
1212      * difference in itself doesn't mean much but we can combine the delta of
1213      * multiple streams to create a stream specific offset. */
1214     stream->rt_delta = last_unix - local_rt;
1215
1216     /* calculate the min of all deltas, ignoring streams that did not yet have a
1217      * valid rt_delta because we did not yet receive an SR packet for those
1218      * streams.
1219      * We calculate the mininum because we would like to only apply positive
1220      * offsets to streams, delaying their playback instead of trying to speed up
1221      * other streams (which might be imposible when we have to create negative
1222      * latencies).
1223      * The stream that has the smallest diff is selected as the reference stream,
1224      * all other streams will have a positive offset to this difference. */
1225
1226     /* some alternative setting allow ignoring RTCP as much as possible,
1227      * for servers generating bogus ntp timeline */
1228     min = rtp_min = G_MAXINT64;
1229     use_rtp = FALSE;
1230     if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1231       guint64 ext_base;
1232
1233       use_rtp = TRUE;
1234       /* signed version for convienience */
1235       clock_base = base_rtptime;
1236       /* deal with possible wrap-around */
1237       ext_base = base_rtptime;
1238       rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
1239       /* sanity check; base rtp and provided clock_base should be close */
1240       if (rtp_clock_base >= clock_base) {
1241         if (rtp_clock_base - clock_base < 10 * clock_rate) {
1242           rtp_clock_base = base_time +
1243               gst_util_uint64_scale_int (rtp_clock_base - clock_base,
1244               GST_SECOND, clock_rate);
1245         } else {
1246           use_rtp = FALSE;
1247         }
1248       } else {
1249         if (clock_base - rtp_clock_base < 10 * clock_rate) {
1250           rtp_clock_base = base_time -
1251               gst_util_uint64_scale_int (clock_base - rtp_clock_base,
1252               GST_SECOND, clock_rate);
1253         } else {
1254           use_rtp = FALSE;
1255         }
1256       }
1257       /* warn and bail for clarity out if no sane values */
1258       if (!use_rtp) {
1259         GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
1260         return;
1261       }
1262       /* store to track changes */
1263       clock_base = rtp_clock_base;
1264       /* generate a fake as before,
1265        * now equating rtptime obtained from RTP-Info,
1266        * where the large time represent the otherwise irrelevant npt/ntp time */
1267       stream->rtp_delta = (GST_SECOND << 28) - rtp_clock_base;
1268     } else {
1269       clock_base = rtp_clock_base;
1270     }
1271
1272     all_sync = TRUE;
1273     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1274       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1275
1276       if (!ostream->have_sync) {
1277         all_sync = FALSE;
1278         continue;
1279       }
1280
1281       /* change in current stream's base from previously init'ed value
1282        * leads to reset of all stream's base */
1283       if (stream != ostream && stream->clock_base >= 0 &&
1284           (stream->clock_base != clock_base)) {
1285         GST_DEBUG_OBJECT (bin, "reset upon clock base change");
1286         ostream->clock_base = -100 * GST_SECOND;
1287         ostream->rtp_delta = 0;
1288       }
1289
1290       if (ostream->rt_delta < min)
1291         min = ostream->rt_delta;
1292       if (ostream->rtp_delta < rtp_min)
1293         rtp_min = ostream->rtp_delta;
1294     }
1295
1296     /* arrange to re-sync for each stream upon significant change,
1297      * e.g. post-seek */
1298     all_sync = all_sync && (stream->clock_base == clock_base);
1299     stream->clock_base = clock_base;
1300
1301     /* may need init performed above later on, but nothing more to do now */
1302     if (client->nstreams <= 1)
1303       return;
1304
1305     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
1306         " all sync %d", client, min, all_sync);
1307     GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
1308
1309     switch (rtcp_sync) {
1310       case GST_RTP_BIN_RTCP_SYNC_RTP:
1311         if (!use_rtp)
1312           break;
1313         GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
1314             "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
1315         /* fall-through */
1316       case GST_RTP_BIN_RTCP_SYNC_INITIAL:
1317         /* if all have been synced already, do not bother further */
1318         if (all_sync) {
1319           GST_DEBUG_OBJECT (bin, "all streams already synced; done");
1320           return;
1321         }
1322         break;
1323       default:
1324         break;
1325     }
1326
1327     /* bail out if we adjusted recently enough */
1328     if (all_sync && (last_unix - bin->priv->last_unix) <
1329         bin->rtcp_sync_interval * GST_MSECOND) {
1330       GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
1331           "previous sender info too recent "
1332           "(previous UNIX %" G_GUINT64_FORMAT ")", bin->priv->last_unix);
1333       return;
1334     }
1335     bin->priv->last_unix = last_unix;
1336
1337     /* calculate offsets for each stream */
1338     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1339       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1340       gint64 ts_offset;
1341
1342       /* ignore streams for which we didn't receive an SR packet yet, we
1343        * can't synchronize them yet. We can however sync other streams just
1344        * fine. */
1345       if (!ostream->have_sync)
1346         continue;
1347
1348       /* calculate offset to our reference stream, this should always give a
1349        * positive number. */
1350       if (use_rtp)
1351         ts_offset = ostream->rtp_delta - rtp_min;
1352       else
1353         ts_offset = ostream->rt_delta - min;
1354
1355       stream_set_ts_offset (bin, ostream, ts_offset, TRUE);
1356     }
1357   }
1358   gst_rtp_bin_send_sync_event (stream);
1359
1360   return;
1361 }
1362
1363 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
1364   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
1365           (b) = gst_rtcp_packet_move_to_next ((packet)))
1366
1367 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
1368   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
1369           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
1370
1371 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
1372   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
1373           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
1374
1375 static void
1376 gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
1377     GstRtpBinStream * stream)
1378 {
1379   GstRtpBin *bin;
1380   GstRTCPPacket packet;
1381   guint32 ssrc;
1382   guint64 ntptime;
1383   gboolean have_sr, have_sdes;
1384   gboolean more;
1385   guint64 base_rtptime;
1386   guint64 base_time;
1387   guint clock_rate;
1388   guint64 clock_base;
1389   guint64 extrtptime;
1390   GstBuffer *buffer;
1391   GstRTCPBuffer rtcp = { NULL, };
1392
1393   bin = stream->bin;
1394
1395   GST_DEBUG_OBJECT (bin, "sync handler called");
1396
1397   /* get the last relation between the rtp timestamps and the gstreamer
1398    * timestamps. We get this info directly from the jitterbuffer which
1399    * constructs gstreamer timestamps from rtp timestamps and so it know exactly
1400    * what the current situation is. */
1401   base_rtptime =
1402       g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
1403   base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
1404   clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
1405   clock_base = g_value_get_uint64 (gst_structure_get_value (s, "clock-base"));
1406   extrtptime =
1407       g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
1408   buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
1409
1410   have_sr = FALSE;
1411   have_sdes = FALSE;
1412
1413   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
1414
1415   GST_RTCP_BUFFER_FOR_PACKETS (more, &rtcp, &packet) {
1416     /* first packet must be SR or RR or else the validate would have failed */
1417     switch (gst_rtcp_packet_get_type (&packet)) {
1418       case GST_RTCP_TYPE_SR:
1419         /* only parse first. There is only supposed to be one SR in the packet
1420          * but we will deal with malformed packets gracefully */
1421         if (have_sr)
1422           break;
1423         /* get NTP and RTP times */
1424         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL,
1425             NULL, NULL);
1426
1427         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
1428         /* ignore SR that is not ours */
1429         if (ssrc != stream->ssrc)
1430           continue;
1431
1432         have_sr = TRUE;
1433         break;
1434       case GST_RTCP_TYPE_SDES:
1435       {
1436         gboolean more_items, more_entries;
1437
1438         /* only deal with first SDES, there is only supposed to be one SDES in
1439          * the RTCP packet but we deal with bad packets gracefully. Also bail
1440          * out if we have not seen an SR item yet. */
1441         if (have_sdes || !have_sr)
1442           break;
1443
1444         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
1445           /* skip items that are not about the SSRC of the sender */
1446           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
1447             continue;
1448
1449           /* find the CNAME entry */
1450           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
1451             GstRTCPSDESType type;
1452             guint8 len;
1453             guint8 *data;
1454
1455             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
1456
1457             if (type == GST_RTCP_SDES_CNAME) {
1458               GST_RTP_BIN_LOCK (bin);
1459               /* associate the stream to CNAME */
1460               gst_rtp_bin_associate (bin, stream, len, data,
1461                   ntptime, extrtptime, base_rtptime, base_time, clock_rate,
1462                   clock_base);
1463               GST_RTP_BIN_UNLOCK (bin);
1464             }
1465           }
1466         }
1467         have_sdes = TRUE;
1468         break;
1469       }
1470       default:
1471         /* we can ignore these packets */
1472         break;
1473     }
1474   }
1475   gst_rtcp_buffer_unmap (&rtcp);
1476 }
1477
1478 /* create a new stream with @ssrc in @session. Must be called with
1479  * RTP_SESSION_LOCK. */
1480 static GstRtpBinStream *
1481 create_stream (GstRtpBinSession * session, guint32 ssrc)
1482 {
1483   GstElement *buffer, *demux = NULL;
1484   GstRtpBinStream *stream;
1485   GstRtpBin *rtpbin;
1486   GstState target;
1487
1488   rtpbin = session->bin;
1489
1490   if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL)))
1491     goto no_jitterbuffer;
1492
1493   if (!rtpbin->ignore_pt)
1494     if (!(demux = gst_element_factory_make ("rtpptdemux", NULL)))
1495       goto no_demux;
1496
1497   stream = g_new0 (GstRtpBinStream, 1);
1498   stream->ssrc = ssrc;
1499   stream->bin = rtpbin;
1500   stream->session = session;
1501   stream->buffer = buffer;
1502   stream->demux = demux;
1503
1504   stream->have_sync = FALSE;
1505   stream->rt_delta = 0;
1506   stream->rtp_delta = 0;
1507   stream->percent = 100;
1508   stream->clock_base = -100 * GST_SECOND;
1509   session->streams = g_slist_prepend (session->streams, stream);
1510
1511   /* provide clock_rate to the jitterbuffer when needed */
1512   stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
1513       (GCallback) pt_map_requested, session);
1514   stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
1515       (GCallback) on_npt_stop, stream);
1516
1517   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.session", session);
1518   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.stream", stream);
1519
1520   /* configure latency and packet lost */
1521   g_object_set (buffer, "latency", rtpbin->latency_ms, NULL);
1522   g_object_set (buffer, "drop-on-latency", rtpbin->drop_on_latency, NULL);
1523   g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
1524   g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL);
1525   g_object_set (buffer, "do-retransmission", rtpbin->do_retransmission, NULL);
1526
1527   g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER], 0,
1528       buffer, session->id, ssrc);
1529
1530   if (!rtpbin->ignore_pt)
1531     gst_bin_add (GST_BIN_CAST (rtpbin), demux);
1532   gst_bin_add (GST_BIN_CAST (rtpbin), buffer);
1533
1534   /* link stuff */
1535   if (demux)
1536     gst_element_link_pads_full (buffer, "src", demux, "sink",
1537         GST_PAD_LINK_CHECK_NOTHING);
1538
1539   if (rtpbin->buffering) {
1540     guint64 last_out;
1541
1542     GST_INFO_OBJECT (rtpbin,
1543         "bin is buffering, set jitterbuffer as not active");
1544     g_signal_emit_by_name (buffer, "set-active", FALSE, (gint64) 0, &last_out);
1545   }
1546
1547
1548   GST_OBJECT_LOCK (rtpbin);
1549   target = GST_STATE_TARGET (rtpbin);
1550   GST_OBJECT_UNLOCK (rtpbin);
1551
1552   /* from sink to source */
1553   if (demux)
1554     gst_element_set_state (demux, target);
1555
1556   gst_element_set_state (buffer, target);
1557
1558   return stream;
1559
1560   /* ERRORS */
1561 no_jitterbuffer:
1562   {
1563     g_warning ("rtpbin: could not create rtpjitterbuffer element");
1564     return NULL;
1565   }
1566 no_demux:
1567   {
1568     gst_object_unref (buffer);
1569     g_warning ("rtpbin: could not create rtpptdemux element");
1570     return NULL;
1571   }
1572 }
1573
1574 /* called with RTP_BIN_LOCK */
1575 static void
1576 free_stream (GstRtpBinStream * stream, GstRtpBin * bin)
1577 {
1578   GSList *clients, *next_client;
1579
1580   GST_DEBUG_OBJECT (bin, "freeing stream %p", stream);
1581
1582   if (stream->demux) {
1583     g_signal_handler_disconnect (stream->demux, stream->demux_newpad_sig);
1584     g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig);
1585     g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig);
1586   }
1587   g_signal_handler_disconnect (stream->buffer, stream->buffer_handlesync_sig);
1588   g_signal_handler_disconnect (stream->buffer, stream->buffer_ptreq_sig);
1589   g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig);
1590
1591   if (stream->demux)
1592     gst_element_set_locked_state (stream->demux, TRUE);
1593   gst_element_set_locked_state (stream->buffer, TRUE);
1594
1595   if (stream->demux)
1596     gst_element_set_state (stream->demux, GST_STATE_NULL);
1597   gst_element_set_state (stream->buffer, GST_STATE_NULL);
1598
1599   /* now remove this signal, we need this while going to NULL because it to
1600    * do some cleanups */
1601   if (stream->demux)
1602     g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
1603
1604   gst_bin_remove (GST_BIN_CAST (bin), stream->buffer);
1605   if (stream->demux)
1606     gst_bin_remove (GST_BIN_CAST (bin), stream->demux);
1607
1608   for (clients = bin->clients; clients; clients = next_client) {
1609     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
1610     GSList *streams, *next_stream;
1611
1612     next_client = g_slist_next (clients);
1613
1614     for (streams = client->streams; streams; streams = next_stream) {
1615       GstRtpBinStream *ostream = (GstRtpBinStream *) streams->data;
1616
1617       next_stream = g_slist_next (streams);
1618
1619       if (ostream == stream) {
1620         client->streams = g_slist_delete_link (client->streams, streams);
1621         /* If this was the last stream belonging to this client,
1622          * clean up the client. */
1623         if (--client->nstreams == 0) {
1624           bin->clients = g_slist_delete_link (bin->clients, clients);
1625           free_client (client, bin);
1626           break;
1627         }
1628       }
1629     }
1630   }
1631   g_free (stream);
1632 }
1633
1634 /* GObject vmethods */
1635 static void gst_rtp_bin_dispose (GObject * object);
1636 static void gst_rtp_bin_finalize (GObject * object);
1637 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
1638     const GValue * value, GParamSpec * pspec);
1639 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
1640     GValue * value, GParamSpec * pspec);
1641
1642 /* GstElement vmethods */
1643 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
1644     GstStateChange transition);
1645 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
1646     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
1647 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
1648 static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message);
1649
1650 #define gst_rtp_bin_parent_class parent_class
1651 G_DEFINE_TYPE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN);
1652
1653 static gboolean
1654 _gst_element_accumulator (GSignalInvocationHint * ihint,
1655     GValue * return_accu, const GValue * handler_return, gpointer dummy)
1656 {
1657   GstElement *element;
1658
1659   element = g_value_get_object (handler_return);
1660   GST_DEBUG ("got element %" GST_PTR_FORMAT, element);
1661
1662   if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
1663     g_value_set_object (return_accu, element);
1664
1665   /* stop emission if we have an element */
1666   return (element == NULL);
1667 }
1668
1669 static void
1670 gst_rtp_bin_class_init (GstRtpBinClass * klass)
1671 {
1672   GObjectClass *gobject_class;
1673   GstElementClass *gstelement_class;
1674   GstBinClass *gstbin_class;
1675
1676   gobject_class = (GObjectClass *) klass;
1677   gstelement_class = (GstElementClass *) klass;
1678   gstbin_class = (GstBinClass *) klass;
1679
1680   g_type_class_add_private (klass, sizeof (GstRtpBinPrivate));
1681
1682   gobject_class->dispose = gst_rtp_bin_dispose;
1683   gobject_class->finalize = gst_rtp_bin_finalize;
1684   gobject_class->set_property = gst_rtp_bin_set_property;
1685   gobject_class->get_property = gst_rtp_bin_get_property;
1686
1687   g_object_class_install_property (gobject_class, PROP_LATENCY,
1688       g_param_spec_uint ("latency", "Buffer latency in ms",
1689           "Default amount of ms to buffer in the jitterbuffers", 0,
1690           G_MAXUINT, DEFAULT_LATENCY_MS,
1691           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1692
1693   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
1694       g_param_spec_boolean ("drop-on-latency",
1695           "Drop buffers when maximum latency is reached",
1696           "Tells the jitterbuffer to never exceed the given latency in size",
1697           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1698
1699   /**
1700    * GstRtpBin::request-pt-map:
1701    * @rtpbin: the object which received the signal
1702    * @session: the session
1703    * @pt: the pt
1704    *
1705    * Request the payload type as #GstCaps for @pt in @session.
1706    */
1707   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
1708       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
1709       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
1710       NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_CAPS, 2, G_TYPE_UINT,
1711       G_TYPE_UINT);
1712
1713     /**
1714    * GstRtpBin::payload-type-change:
1715    * @rtpbin: the object which received the signal
1716    * @session: the session
1717    * @pt: the pt
1718    *
1719    * Signal that the current payload type changed to @pt in @session.
1720    */
1721   gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] =
1722       g_signal_new ("payload-type-change", G_TYPE_FROM_CLASS (klass),
1723       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, payload_type_change),
1724       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1725       G_TYPE_UINT);
1726
1727   /**
1728    * GstRtpBin::clear-pt-map:
1729    * @rtpbin: the object which received the signal
1730    *
1731    * Clear all previously cached pt-mapping obtained with
1732    * #GstRtpBin::request-pt-map.
1733    */
1734   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
1735       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
1736       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1737           clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1738       0, G_TYPE_NONE);
1739
1740   /**
1741    * GstRtpBin::reset-sync:
1742    * @rtpbin: the object which received the signal
1743    *
1744    * Reset all currently configured lip-sync parameters and require new SR
1745    * packets for all streams before lip-sync is attempted again.
1746    */
1747   gst_rtp_bin_signals[SIGNAL_RESET_SYNC] =
1748       g_signal_new ("reset-sync", G_TYPE_FROM_CLASS (klass),
1749       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1750           reset_sync), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1751       0, G_TYPE_NONE);
1752
1753   /**
1754    * GstRtpBin::get-internal-session:
1755    * @rtpbin: the object which received the signal
1756    * @id: the session id
1757    *
1758    * Request the internal RTPSession object as #GObject in session @id.
1759    */
1760   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_SESSION] =
1761       g_signal_new ("get-internal-session", G_TYPE_FROM_CLASS (klass),
1762       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1763           get_internal_session), NULL, NULL, g_cclosure_marshal_generic,
1764       RTP_TYPE_SESSION, 1, G_TYPE_UINT);
1765
1766   /**
1767    * GstRtpBin::on-new-ssrc:
1768    * @rtpbin: the object which received the signal
1769    * @session: the session
1770    * @ssrc: the SSRC
1771    *
1772    * Notify of a new SSRC that entered @session.
1773    */
1774   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
1775       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
1776       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
1777       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1778       G_TYPE_UINT);
1779   /**
1780    * GstRtpBin::on-ssrc-collision:
1781    * @rtpbin: the object which received the signal
1782    * @session: the session
1783    * @ssrc: the SSRC
1784    *
1785    * Notify when we have an SSRC collision
1786    */
1787   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
1788       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
1789       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
1790       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1791       G_TYPE_UINT);
1792   /**
1793    * GstRtpBin::on-ssrc-validated:
1794    * @rtpbin: the object which received the signal
1795    * @session: the session
1796    * @ssrc: the SSRC
1797    *
1798    * Notify of a new SSRC that became validated.
1799    */
1800   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
1801       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
1802       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
1803       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1804       G_TYPE_UINT);
1805   /**
1806    * GstRtpBin::on-ssrc-active:
1807    * @rtpbin: the object which received the signal
1808    * @session: the session
1809    * @ssrc: the SSRC
1810    *
1811    * Notify of a SSRC that is active, i.e., sending RTCP.
1812    */
1813   gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE] =
1814       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
1815       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_active),
1816       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1817       G_TYPE_UINT);
1818   /**
1819    * GstRtpBin::on-ssrc-sdes:
1820    * @rtpbin: the object which received the signal
1821    * @session: the session
1822    * @ssrc: the SSRC
1823    *
1824    * Notify of a SSRC that is active, i.e., sending RTCP.
1825    */
1826   gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES] =
1827       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
1828       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_sdes),
1829       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1830       G_TYPE_UINT);
1831
1832   /**
1833    * GstRtpBin::on-bye-ssrc:
1834    * @rtpbin: the object which received the signal
1835    * @session: the session
1836    * @ssrc: the SSRC
1837    *
1838    * Notify of an SSRC that became inactive because of a BYE packet.
1839    */
1840   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
1841       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
1842       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
1843       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1844       G_TYPE_UINT);
1845   /**
1846    * GstRtpBin::on-bye-timeout:
1847    * @rtpbin: the object which received the signal
1848    * @session: the session
1849    * @ssrc: the SSRC
1850    *
1851    * Notify of an SSRC that has timed out because of BYE
1852    */
1853   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
1854       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
1855       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
1856       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1857       G_TYPE_UINT);
1858   /**
1859    * GstRtpBin::on-timeout:
1860    * @rtpbin: the object which received the signal
1861    * @session: the session
1862    * @ssrc: the SSRC
1863    *
1864    * Notify of an SSRC that has timed out
1865    */
1866   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
1867       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
1868       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
1869       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1870       G_TYPE_UINT);
1871   /**
1872    * GstRtpBin::on-sender-timeout:
1873    * @rtpbin: the object which received the signal
1874    * @session: the session
1875    * @ssrc: the SSRC
1876    *
1877    * Notify of a sender SSRC that has timed out and became a receiver
1878    */
1879   gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] =
1880       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
1881       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout),
1882       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1883       G_TYPE_UINT);
1884
1885   /**
1886    * GstRtpBin::on-npt-stop:
1887    * @rtpbin: the object which received the signal
1888    * @session: the session
1889    * @ssrc: the SSRC
1890    *
1891    * Notify that SSRC sender has sent data up to the configured NPT stop time.
1892    */
1893   gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP] =
1894       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
1895       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_npt_stop),
1896       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1897       G_TYPE_UINT);
1898
1899   /**
1900    * GstRtpBin::request-rtp-encoder:
1901    * @rtpbin: the object which received the signal
1902    * @session: the session
1903    *
1904    * Request an RTP encoder element for the given @session. The encoder
1905    * element will be added to the bin if not previously added.
1906    *
1907    * If no handler is connected, no encoder will be used.
1908    */
1909   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_ENCODER] =
1910       g_signal_new ("request-rtp-encoder", G_TYPE_FROM_CLASS (klass),
1911       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1912           request_rtp_encoder), _gst_element_accumulator, NULL,
1913       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
1914
1915   /**
1916    * GstRtpBin::request-rtp-decoder:
1917    * @rtpbin: the object which received the signal
1918    * @session: the session
1919    *
1920    * Request an RTP decoder element for the given @session. The decoder
1921    * element will be added to the bin if not previously added.
1922    *
1923    * If no handler is connected, no encoder will be used.
1924    */
1925   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_DECODER] =
1926       g_signal_new ("request-rtp-decoder", G_TYPE_FROM_CLASS (klass),
1927       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1928           request_rtp_decoder), _gst_element_accumulator, NULL,
1929       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
1930
1931   /**
1932    * GstRtpBin::request-rtcp-encoder:
1933    * @rtpbin: the object which received the signal
1934    * @session: the session
1935    *
1936    * Request an RTCP encoder element for the given @session. The encoder
1937    * element will be added to the bin if not previously added.
1938    *
1939    * If no handler is connected, no encoder will be used.
1940    */
1941   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_ENCODER] =
1942       g_signal_new ("request-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
1943       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1944           request_rtcp_encoder), _gst_element_accumulator, NULL,
1945       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
1946
1947   /**
1948    * GstRtpBin::request-rtcp-decoder:
1949    * @rtpbin: the object which received the signal
1950    * @session: the session
1951    *
1952    * Request an RTCP decoder element for the given @session. The decoder
1953    * element will be added to the bin if not previously added.
1954    *
1955    * If no handler is connected, no encoder will be used.
1956    */
1957   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_DECODER] =
1958       g_signal_new ("request-rtcp-decoder", G_TYPE_FROM_CLASS (klass),
1959       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1960           request_rtcp_decoder), _gst_element_accumulator, NULL,
1961       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
1962
1963   /**
1964    * GstRtpBin::new-jitterbuffer:
1965    * @rtpbin: the object which received the signal
1966    * @jitterbuffer: the new jitterbuffer
1967    * @session: the session
1968    * @ssrc: the SSRC
1969    *
1970    * Notify that a new @jitterbuffer was created for @session and @ssrc.
1971    * This signal can, for example, be used to configure @jitterbuffer.
1972    */
1973   gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER] =
1974       g_signal_new ("new-jitterbuffer", G_TYPE_FROM_CLASS (klass),
1975       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1976           new_jitterbuffer), NULL, NULL, g_cclosure_marshal_generic,
1977       G_TYPE_NONE, 3, GST_TYPE_ELEMENT, G_TYPE_UINT, G_TYPE_UINT);
1978
1979   g_object_class_install_property (gobject_class, PROP_SDES,
1980       g_param_spec_boxed ("sdes", "SDES",
1981           "The SDES items of this session",
1982           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1983
1984   g_object_class_install_property (gobject_class, PROP_DO_LOST,
1985       g_param_spec_boolean ("do-lost", "Do Lost",
1986           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
1987           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1988
1989   g_object_class_install_property (gobject_class, PROP_AUTOREMOVE,
1990       g_param_spec_boolean ("autoremove", "Auto Remove",
1991           "Automatically remove timed out sources", DEFAULT_AUTOREMOVE,
1992           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1993
1994   g_object_class_install_property (gobject_class, PROP_IGNORE_PT,
1995       g_param_spec_boolean ("ignore-pt", "Ignore PT",
1996           "Do not demultiplex based on PT values", DEFAULT_IGNORE_PT,
1997           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1998
1999   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
2000       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
2001           "Use the pipeline running-time to set the NTP time in the RTCP SR messages",
2002           DEFAULT_USE_PIPELINE_CLOCK,
2003           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2004   /**
2005    * GstRtpBin:buffer-mode:
2006    *
2007    * Control the buffering and timestamping mode used by the jitterbuffer.
2008    */
2009   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
2010       g_param_spec_enum ("buffer-mode", "Buffer Mode",
2011           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
2012           DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2013   /**
2014    * GstRtpBin:ntp-sync:
2015    *
2016    * Set the NTP time from the sender reports as the running-time on the
2017    * buffers. When both the sender and receiver have sychronized
2018    * running-time, i.e. when the clock and base-time is shared
2019    * between the receivers and the and the senders, this option can be
2020    * used to synchronize receivers on multiple machines.
2021    */
2022   g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
2023       g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
2024           "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
2025           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2026
2027   /**
2028    * GstRtpBin:rtcp-sync:
2029    *
2030    * If not synchronizing (directly) to the NTP clock, determines how to sync
2031    * the various streams.
2032    */
2033   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC,
2034       g_param_spec_enum ("rtcp-sync", "RTCP Sync",
2035           "Use of RTCP SR in synchronization", GST_RTP_BIN_RTCP_SYNC_TYPE,
2036           DEFAULT_RTCP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2037
2038   /**
2039    * GstRtpBin:rtcp-sync-interval:
2040    *
2041    * Determines how often to sync streams using RTCP data.
2042    */
2043   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_INTERVAL,
2044       g_param_spec_uint ("rtcp-sync-interval", "RTCP Sync Interval",
2045           "RTCP SR interval synchronization (ms) (0 = always)",
2046           0, G_MAXUINT, DEFAULT_RTCP_SYNC_INTERVAL,
2047           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2048
2049   g_object_class_install_property (gobject_class, PROP_DO_SYNC_EVENT,
2050       g_param_spec_boolean ("do-sync-event", "Do Sync Event",
2051           "Send event downstream when a stream is synchronized to the sender",
2052           DEFAULT_DO_SYNC_EVENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2053
2054   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
2055       g_param_spec_boolean ("do-retransmission", "Do retransmission",
2056           "Send an event downstream to request packet retransmission",
2057           DEFAULT_DO_RETRANSMISSION,
2058           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2059
2060   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
2061   gstelement_class->request_new_pad =
2062       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
2063   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
2064
2065   /* sink pads */
2066   gst_element_class_add_pad_template (gstelement_class,
2067       gst_static_pad_template_get (&rtpbin_recv_rtp_sink_template));
2068   gst_element_class_add_pad_template (gstelement_class,
2069       gst_static_pad_template_get (&rtpbin_recv_rtcp_sink_template));
2070   gst_element_class_add_pad_template (gstelement_class,
2071       gst_static_pad_template_get (&rtpbin_send_rtp_sink_template));
2072
2073   /* src pads */
2074   gst_element_class_add_pad_template (gstelement_class,
2075       gst_static_pad_template_get (&rtpbin_recv_rtp_src_template));
2076   gst_element_class_add_pad_template (gstelement_class,
2077       gst_static_pad_template_get (&rtpbin_send_rtcp_src_template));
2078   gst_element_class_add_pad_template (gstelement_class,
2079       gst_static_pad_template_get (&rtpbin_send_rtp_src_template));
2080
2081   gst_element_class_set_static_metadata (gstelement_class, "RTP Bin",
2082       "Filter/Network/RTP",
2083       "Real-Time Transport Protocol bin",
2084       "Wim Taymans <wim.taymans@gmail.com>");
2085
2086   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (gst_rtp_bin_handle_message);
2087
2088   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
2089   klass->reset_sync = GST_DEBUG_FUNCPTR (gst_rtp_bin_reset_sync);
2090   klass->get_internal_session =
2091       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_session);
2092   klass->request_rtp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2093   klass->request_rtp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2094   klass->request_rtcp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2095   klass->request_rtcp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2096
2097   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
2098 }
2099
2100 static void
2101 gst_rtp_bin_init (GstRtpBin * rtpbin)
2102 {
2103   gchar *cname;
2104
2105   rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
2106   g_mutex_init (&rtpbin->priv->bin_lock);
2107   g_mutex_init (&rtpbin->priv->dyn_lock);
2108
2109   rtpbin->latency_ms = DEFAULT_LATENCY_MS;
2110   rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND;
2111   rtpbin->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
2112   rtpbin->do_lost = DEFAULT_DO_LOST;
2113   rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
2114   rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
2115   rtpbin->rtcp_sync = DEFAULT_RTCP_SYNC;
2116   rtpbin->rtcp_sync_interval = DEFAULT_RTCP_SYNC_INTERVAL;
2117   rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
2118   rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
2119   rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
2120   rtpbin->send_sync_event = DEFAULT_DO_SYNC_EVENT;
2121   rtpbin->do_retransmission = DEFAULT_DO_RETRANSMISSION;
2122
2123   /* some default SDES entries */
2124   cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
2125   rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes",
2126       "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL);
2127   g_free (cname);
2128 }
2129
2130 static void
2131 gst_rtp_bin_dispose (GObject * object)
2132 {
2133   GstRtpBin *rtpbin;
2134
2135   rtpbin = GST_RTP_BIN (object);
2136
2137   GST_RTP_BIN_LOCK (rtpbin);
2138   GST_DEBUG_OBJECT (object, "freeing sessions");
2139   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, rtpbin);
2140   g_slist_free (rtpbin->sessions);
2141   rtpbin->sessions = NULL;
2142   GST_RTP_BIN_UNLOCK (rtpbin);
2143
2144   G_OBJECT_CLASS (parent_class)->dispose (object);
2145 }
2146
2147 static void
2148 gst_rtp_bin_finalize (GObject * object)
2149 {
2150   GstRtpBin *rtpbin;
2151
2152   rtpbin = GST_RTP_BIN (object);
2153
2154   if (rtpbin->sdes)
2155     gst_structure_free (rtpbin->sdes);
2156
2157   g_mutex_clear (&rtpbin->priv->bin_lock);
2158   g_mutex_clear (&rtpbin->priv->dyn_lock);
2159
2160   G_OBJECT_CLASS (parent_class)->finalize (object);
2161 }
2162
2163
2164 static void
2165 gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes)
2166 {
2167   GSList *item;
2168
2169   if (sdes == NULL)
2170     return;
2171
2172   GST_RTP_BIN_LOCK (bin);
2173
2174   GST_OBJECT_LOCK (bin);
2175   if (bin->sdes)
2176     gst_structure_free (bin->sdes);
2177   bin->sdes = gst_structure_copy (sdes);
2178   GST_OBJECT_UNLOCK (bin);
2179
2180   /* store in all sessions */
2181   for (item = bin->sessions; item; item = g_slist_next (item)) {
2182     GstRtpBinSession *session = item->data;
2183     g_object_set (session->session, "sdes", sdes, NULL);
2184   }
2185
2186   GST_RTP_BIN_UNLOCK (bin);
2187 }
2188
2189 static GstStructure *
2190 gst_rtp_bin_get_sdes_struct (GstRtpBin * bin)
2191 {
2192   GstStructure *result;
2193
2194   GST_OBJECT_LOCK (bin);
2195   result = gst_structure_copy (bin->sdes);
2196   GST_OBJECT_UNLOCK (bin);
2197
2198   return result;
2199 }
2200
2201 static void
2202 gst_rtp_bin_set_property (GObject * object, guint prop_id,
2203     const GValue * value, GParamSpec * pspec)
2204 {
2205   GstRtpBin *rtpbin;
2206
2207   rtpbin = GST_RTP_BIN (object);
2208
2209   switch (prop_id) {
2210     case PROP_LATENCY:
2211       GST_RTP_BIN_LOCK (rtpbin);
2212       rtpbin->latency_ms = g_value_get_uint (value);
2213       rtpbin->latency_ns = rtpbin->latency_ms * GST_MSECOND;
2214       GST_RTP_BIN_UNLOCK (rtpbin);
2215       /* propagate the property down to the jitterbuffer */
2216       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "latency", value);
2217       break;
2218     case PROP_DROP_ON_LATENCY:
2219       GST_RTP_BIN_LOCK (rtpbin);
2220       rtpbin->drop_on_latency = g_value_get_boolean (value);
2221       GST_RTP_BIN_UNLOCK (rtpbin);
2222       /* propagate the property down to the jitterbuffer */
2223       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2224           "drop-on-latency", value);
2225       break;
2226     case PROP_SDES:
2227       gst_rtp_bin_set_sdes_struct (rtpbin, g_value_get_boxed (value));
2228       break;
2229     case PROP_DO_LOST:
2230       GST_RTP_BIN_LOCK (rtpbin);
2231       rtpbin->do_lost = g_value_get_boolean (value);
2232       GST_RTP_BIN_UNLOCK (rtpbin);
2233       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
2234       break;
2235     case PROP_NTP_SYNC:
2236       rtpbin->ntp_sync = g_value_get_boolean (value);
2237       break;
2238     case PROP_RTCP_SYNC:
2239       g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value));
2240       break;
2241     case PROP_RTCP_SYNC_INTERVAL:
2242       rtpbin->rtcp_sync_interval = g_value_get_uint (value);
2243       break;
2244     case PROP_IGNORE_PT:
2245       rtpbin->ignore_pt = g_value_get_boolean (value);
2246       break;
2247     case PROP_AUTOREMOVE:
2248       rtpbin->priv->autoremove = g_value_get_boolean (value);
2249       break;
2250     case PROP_USE_PIPELINE_CLOCK:
2251     {
2252       GSList *sessions;
2253       GST_RTP_BIN_LOCK (rtpbin);
2254       rtpbin->use_pipeline_clock = g_value_get_boolean (value);
2255       for (sessions = rtpbin->sessions; sessions;
2256           sessions = g_slist_next (sessions)) {
2257         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2258
2259         g_object_set (G_OBJECT (session->session),
2260             "use-pipeline-clock", rtpbin->use_pipeline_clock, NULL);
2261       }
2262       GST_RTP_BIN_UNLOCK (rtpbin);
2263     }
2264       break;
2265     case PROP_DO_SYNC_EVENT:
2266       rtpbin->send_sync_event = g_value_get_boolean (value);
2267       break;
2268     case PROP_BUFFER_MODE:
2269       GST_RTP_BIN_LOCK (rtpbin);
2270       rtpbin->buffer_mode = g_value_get_enum (value);
2271       GST_RTP_BIN_UNLOCK (rtpbin);
2272       /* propagate the property down to the jitterbuffer */
2273       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "mode", value);
2274       break;
2275     case PROP_DO_RETRANSMISSION:
2276       GST_RTP_BIN_LOCK (rtpbin);
2277       rtpbin->do_retransmission = g_value_get_boolean (value);
2278       GST_RTP_BIN_UNLOCK (rtpbin);
2279       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2280           "do-retransmission", value);
2281       break;
2282     default:
2283       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2284       break;
2285   }
2286 }
2287
2288 static void
2289 gst_rtp_bin_get_property (GObject * object, guint prop_id,
2290     GValue * value, GParamSpec * pspec)
2291 {
2292   GstRtpBin *rtpbin;
2293
2294   rtpbin = GST_RTP_BIN (object);
2295
2296   switch (prop_id) {
2297     case PROP_LATENCY:
2298       GST_RTP_BIN_LOCK (rtpbin);
2299       g_value_set_uint (value, rtpbin->latency_ms);
2300       GST_RTP_BIN_UNLOCK (rtpbin);
2301       break;
2302     case PROP_DROP_ON_LATENCY:
2303       GST_RTP_BIN_LOCK (rtpbin);
2304       g_value_set_boolean (value, rtpbin->drop_on_latency);
2305       GST_RTP_BIN_UNLOCK (rtpbin);
2306       break;
2307     case PROP_SDES:
2308       g_value_take_boxed (value, gst_rtp_bin_get_sdes_struct (rtpbin));
2309       break;
2310     case PROP_DO_LOST:
2311       GST_RTP_BIN_LOCK (rtpbin);
2312       g_value_set_boolean (value, rtpbin->do_lost);
2313       GST_RTP_BIN_UNLOCK (rtpbin);
2314       break;
2315     case PROP_IGNORE_PT:
2316       g_value_set_boolean (value, rtpbin->ignore_pt);
2317       break;
2318     case PROP_NTP_SYNC:
2319       g_value_set_boolean (value, rtpbin->ntp_sync);
2320       break;
2321     case PROP_RTCP_SYNC:
2322       g_value_set_enum (value, g_atomic_int_get (&rtpbin->rtcp_sync));
2323       break;
2324     case PROP_RTCP_SYNC_INTERVAL:
2325       g_value_set_uint (value, rtpbin->rtcp_sync_interval);
2326       break;
2327     case PROP_AUTOREMOVE:
2328       g_value_set_boolean (value, rtpbin->priv->autoremove);
2329       break;
2330     case PROP_BUFFER_MODE:
2331       g_value_set_enum (value, rtpbin->buffer_mode);
2332       break;
2333     case PROP_USE_PIPELINE_CLOCK:
2334       g_value_set_boolean (value, rtpbin->use_pipeline_clock);
2335       break;
2336     case PROP_DO_SYNC_EVENT:
2337       g_value_set_boolean (value, rtpbin->send_sync_event);
2338       break;
2339     case PROP_DO_RETRANSMISSION:
2340       GST_RTP_BIN_LOCK (rtpbin);
2341       g_value_set_boolean (value, rtpbin->do_retransmission);
2342       GST_RTP_BIN_UNLOCK (rtpbin);
2343       break;
2344     default:
2345       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2346       break;
2347   }
2348 }
2349
2350 static void
2351 gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
2352 {
2353   GstRtpBin *rtpbin;
2354
2355   rtpbin = GST_RTP_BIN (bin);
2356
2357   switch (GST_MESSAGE_TYPE (message)) {
2358     case GST_MESSAGE_ELEMENT:
2359     {
2360       const GstStructure *s = gst_message_get_structure (message);
2361
2362       /* we change the structure name and add the session ID to it */
2363       if (gst_structure_has_name (s, "application/x-rtp-source-sdes")) {
2364         GstRtpBinSession *sess;
2365
2366         /* find the session we set it as object data */
2367         sess = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2368             "GstRTPBin.session");
2369
2370         if (G_LIKELY (sess)) {
2371           message = gst_message_make_writable (message);
2372           s = gst_message_get_structure (message);
2373           gst_structure_set ((GstStructure *) s, "session", G_TYPE_UINT,
2374               sess->id, NULL);
2375         }
2376       }
2377       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2378       break;
2379     }
2380     case GST_MESSAGE_BUFFERING:
2381     {
2382       gint percent;
2383       gint min_percent = 100;
2384       GSList *sessions, *streams;
2385       GstRtpBinStream *stream;
2386       gboolean change = FALSE, active = FALSE;
2387       GstClockTime min_out_time;
2388       GstBufferingMode mode;
2389       gint avg_in, avg_out;
2390       gint64 buffering_left;
2391
2392       gst_message_parse_buffering (message, &percent);
2393       gst_message_parse_buffering_stats (message, &mode, &avg_in, &avg_out,
2394           &buffering_left);
2395
2396       stream =
2397           g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2398           "GstRTPBin.stream");
2399
2400       GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
2401
2402       /* get the stream */
2403       if (G_LIKELY (stream)) {
2404         GST_RTP_BIN_LOCK (rtpbin);
2405         /* fill in the percent */
2406         stream->percent = percent;
2407
2408         /* calculate the min value for all streams */
2409         for (sessions = rtpbin->sessions; sessions;
2410             sessions = g_slist_next (sessions)) {
2411           GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2412
2413           GST_RTP_SESSION_LOCK (session);
2414           if (session->streams) {
2415             for (streams = session->streams; streams;
2416                 streams = g_slist_next (streams)) {
2417               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2418
2419               GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
2420                   stream->percent);
2421
2422               /* find min percent */
2423               if (min_percent > stream->percent)
2424                 min_percent = stream->percent;
2425             }
2426           } else {
2427             GST_INFO_OBJECT (bin,
2428                 "session has no streams, setting min_percent to 0");
2429             min_percent = 0;
2430           }
2431           GST_RTP_SESSION_UNLOCK (session);
2432         }
2433         GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
2434
2435         if (rtpbin->buffering) {
2436           if (min_percent == 100) {
2437             rtpbin->buffering = FALSE;
2438             active = TRUE;
2439             change = TRUE;
2440           }
2441         } else {
2442           if (min_percent < 100) {
2443             /* pause the streams */
2444             rtpbin->buffering = TRUE;
2445             active = FALSE;
2446             change = TRUE;
2447           }
2448         }
2449         GST_RTP_BIN_UNLOCK (rtpbin);
2450
2451         gst_message_unref (message);
2452
2453         /* make a new buffering message with the min value */
2454         message =
2455             gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
2456         gst_message_set_buffering_stats (message, mode, avg_in, avg_out,
2457             buffering_left);
2458
2459         if (G_UNLIKELY (change)) {
2460           GstClock *clock;
2461           guint64 running_time = 0;
2462           guint64 offset = 0;
2463
2464           /* figure out the running time when we have a clock */
2465           if (G_LIKELY ((clock =
2466                       gst_element_get_clock (GST_ELEMENT_CAST (bin))))) {
2467             guint64 now, base_time;
2468
2469             now = gst_clock_get_time (clock);
2470             base_time = gst_element_get_base_time (GST_ELEMENT_CAST (bin));
2471             running_time = now - base_time;
2472             gst_object_unref (clock);
2473           }
2474           GST_DEBUG_OBJECT (bin,
2475               "running time now %" GST_TIME_FORMAT,
2476               GST_TIME_ARGS (running_time));
2477
2478           GST_RTP_BIN_LOCK (rtpbin);
2479
2480           /* when we reactivate, calculate the offsets so that all streams have
2481            * an output time that is at least as big as the running_time */
2482           offset = 0;
2483           if (active) {
2484             if (running_time > rtpbin->buffer_start) {
2485               offset = running_time - rtpbin->buffer_start;
2486               if (offset >= rtpbin->latency_ns)
2487                 offset -= rtpbin->latency_ns;
2488               else
2489                 offset = 0;
2490             }
2491           }
2492
2493           /* pause all streams */
2494           min_out_time = -1;
2495           for (sessions = rtpbin->sessions; sessions;
2496               sessions = g_slist_next (sessions)) {
2497             GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2498
2499             GST_RTP_SESSION_LOCK (session);
2500             for (streams = session->streams; streams;
2501                 streams = g_slist_next (streams)) {
2502               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2503               GstElement *element = stream->buffer;
2504               guint64 last_out;
2505
2506               g_signal_emit_by_name (element, "set-active", active, offset,
2507                   &last_out);
2508
2509               if (!active) {
2510                 g_object_get (element, "percent", &stream->percent, NULL);
2511
2512                 if (last_out == -1)
2513                   last_out = 0;
2514                 if (min_out_time == -1 || last_out < min_out_time)
2515                   min_out_time = last_out;
2516               }
2517
2518               GST_DEBUG_OBJECT (bin,
2519                   "setting %p to %d, offset %" GST_TIME_FORMAT ", last %"
2520                   GST_TIME_FORMAT ", percent %d", element, active,
2521                   GST_TIME_ARGS (offset), GST_TIME_ARGS (last_out),
2522                   stream->percent);
2523             }
2524             GST_RTP_SESSION_UNLOCK (session);
2525           }
2526           GST_DEBUG_OBJECT (bin,
2527               "min out time %" GST_TIME_FORMAT, GST_TIME_ARGS (min_out_time));
2528
2529           /* the buffer_start is the min out time of all paused jitterbuffers */
2530           if (!active)
2531             rtpbin->buffer_start = min_out_time;
2532
2533           GST_RTP_BIN_UNLOCK (rtpbin);
2534         }
2535       }
2536       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2537       break;
2538     }
2539     default:
2540     {
2541       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2542       break;
2543     }
2544   }
2545 }
2546
2547 static GstStateChangeReturn
2548 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
2549 {
2550   GstStateChangeReturn res;
2551   GstRtpBin *rtpbin;
2552   GstRtpBinPrivate *priv;
2553
2554   rtpbin = GST_RTP_BIN (element);
2555   priv = rtpbin->priv;
2556
2557   switch (transition) {
2558     case GST_STATE_CHANGE_NULL_TO_READY:
2559       break;
2560     case GST_STATE_CHANGE_READY_TO_PAUSED:
2561       priv->last_unix = 0;
2562       GST_LOG_OBJECT (rtpbin, "clearing shutdown flag");
2563       g_atomic_int_set (&priv->shutdown, 0);
2564       break;
2565     case GST_STATE_CHANGE_PAUSED_TO_READY:
2566       GST_LOG_OBJECT (rtpbin, "setting shutdown flag");
2567       g_atomic_int_set (&priv->shutdown, 1);
2568       /* wait for all callbacks to end by taking the lock. No new callbacks will
2569        * be able to happen as we set the shutdown flag. */
2570       GST_RTP_BIN_DYN_LOCK (rtpbin);
2571       GST_LOG_OBJECT (rtpbin, "dynamic lock taken, we can continue shutdown");
2572       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
2573       break;
2574     default:
2575       break;
2576   }
2577
2578   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2579
2580   switch (transition) {
2581     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2582       break;
2583     case GST_STATE_CHANGE_PAUSED_TO_READY:
2584       break;
2585     case GST_STATE_CHANGE_READY_TO_NULL:
2586       break;
2587     default:
2588       break;
2589   }
2590   return res;
2591 }
2592
2593 static GstElement *
2594 session_request_encoder (GstRtpBinSession * session, guint signal)
2595 {
2596   GstElement *encoder = NULL;
2597   GstRtpBin *bin = session->bin;
2598
2599   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, &encoder);
2600
2601   if (encoder) {
2602     if (!bin_manage_element (bin, encoder))
2603       goto manage_failed;
2604     session->encoders = g_slist_prepend (session->encoders, encoder);
2605   }
2606   return encoder;
2607
2608   /* ERRORS */
2609 manage_failed:
2610   {
2611     GST_WARNING_OBJECT (bin, "unable to manage encoder");
2612     gst_object_unref (encoder);
2613     return NULL;
2614   }
2615 }
2616
2617 static GstElement *
2618 session_request_decoder (GstRtpBinSession * session, guint signal)
2619 {
2620   GstElement *decoder = NULL;
2621   GstRtpBin *bin = session->bin;
2622
2623   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, &decoder);
2624
2625   if (decoder) {
2626     if (!bin_manage_element (bin, decoder))
2627       goto manage_failed;
2628     session->decoders = g_slist_prepend (session->decoders, decoder);
2629   }
2630   return decoder;
2631
2632   /* ERRORS */
2633 manage_failed:
2634   {
2635     GST_WARNING_OBJECT (bin, "unable to manage decoder");
2636     gst_object_unref (decoder);
2637     return NULL;
2638   }
2639 }
2640
2641 /* a new pad (SSRC) was created in @session. This signal is emited from the
2642  * payload demuxer. */
2643 static void
2644 new_payload_found (GstElement * element, guint pt, GstPad * pad,
2645     GstRtpBinStream * stream)
2646 {
2647   GstRtpBin *rtpbin;
2648   GstElementClass *klass;
2649   GstPadTemplate *templ;
2650   gchar *padname;
2651   GstPad *gpad;
2652
2653   rtpbin = stream->bin;
2654
2655   GST_DEBUG ("new payload pad %d", pt);
2656
2657   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
2658
2659   /* ghost the pad to the parent */
2660   klass = GST_ELEMENT_GET_CLASS (rtpbin);
2661   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
2662   padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
2663       stream->session->id, stream->ssrc, pt);
2664   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
2665   g_free (padname);
2666   g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", gpad);
2667
2668   gst_pad_set_active (gpad, TRUE);
2669   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2670
2671   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2672
2673   return;
2674
2675 shutdown:
2676   {
2677     GST_DEBUG ("ignoring, we are shutting down");
2678     return;
2679   }
2680 }
2681
2682 static void
2683 payload_pad_removed (GstElement * element, GstPad * pad,
2684     GstRtpBinStream * stream)
2685 {
2686   GstRtpBin *rtpbin;
2687   GstPad *gpad;
2688
2689   rtpbin = stream->bin;
2690
2691   GST_DEBUG ("payload pad removed");
2692
2693   GST_RTP_BIN_DYN_LOCK (rtpbin);
2694   if ((gpad = g_object_get_data (G_OBJECT (pad), "GstRTPBin.ghostpad"))) {
2695     g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", NULL);
2696
2697     gst_pad_set_active (gpad, FALSE);
2698     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2699   }
2700   GST_RTP_BIN_DYN_UNLOCK (rtpbin);
2701 }
2702
2703 static GstCaps *
2704 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
2705 {
2706   GstRtpBin *rtpbin;
2707   GstCaps *caps;
2708
2709   rtpbin = session->bin;
2710
2711   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt,
2712       session->id);
2713
2714   caps = get_pt_map (session, pt);
2715   if (!caps)
2716     goto no_caps;
2717
2718   return caps;
2719
2720   /* ERRORS */
2721 no_caps:
2722   {
2723     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
2724     return NULL;
2725   }
2726 }
2727
2728 static void
2729 payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session)
2730 {
2731   GST_DEBUG_OBJECT (session->bin,
2732       "emiting signal for pt type changed to %d in session %d", pt,
2733       session->id);
2734
2735   g_signal_emit (session->bin, gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE],
2736       0, session->id, pt);
2737 }
2738
2739 /* emited when caps changed for the session */
2740 static void
2741 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
2742 {
2743   GstRtpBin *bin;
2744   GstCaps *caps;
2745   gint payload;
2746   const GstStructure *s;
2747
2748   bin = session->bin;
2749
2750   g_object_get (pad, "caps", &caps, NULL);
2751
2752   if (caps == NULL)
2753     return;
2754
2755   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
2756
2757   s = gst_caps_get_structure (caps, 0);
2758
2759   /* get payload, finish when it's not there */
2760   if (!gst_structure_get_int (s, "payload", &payload))
2761     return;
2762
2763   GST_RTP_SESSION_LOCK (session);
2764   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
2765   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
2766   GST_RTP_SESSION_UNLOCK (session);
2767 }
2768
2769 /* a new pad (SSRC) was created in @session */
2770 static void
2771 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
2772     GstRtpBinSession * session)
2773 {
2774   GstRtpBin *rtpbin;
2775   GstRtpBinStream *stream;
2776   GstPad *sinkpad, *srcpad;
2777   gchar *padname;
2778
2779   rtpbin = session->bin;
2780
2781   GST_DEBUG_OBJECT (rtpbin, "new SSRC pad %08x, %s:%s", ssrc,
2782       GST_DEBUG_PAD_NAME (pad));
2783
2784   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
2785
2786   GST_RTP_SESSION_LOCK (session);
2787
2788   /* create new stream */
2789   stream = create_stream (session, ssrc);
2790   if (!stream)
2791     goto no_stream;
2792
2793   /* get pad and link */
2794   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP");
2795   padname = g_strdup_printf ("src_%u", ssrc);
2796   srcpad = gst_element_get_static_pad (element, padname);
2797   g_free (padname);
2798   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
2799   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
2800   gst_object_unref (sinkpad);
2801   gst_object_unref (srcpad);
2802
2803   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
2804   padname = g_strdup_printf ("rtcp_src_%u", ssrc);
2805   srcpad = gst_element_get_static_pad (element, padname);
2806   g_free (padname);
2807   sinkpad = gst_element_get_request_pad (stream->buffer, "sink_rtcp");
2808   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
2809   gst_object_unref (sinkpad);
2810   gst_object_unref (srcpad);
2811
2812   /* connect to the RTCP sync signal from the jitterbuffer */
2813   GST_DEBUG_OBJECT (rtpbin, "connecting sync signal");
2814   stream->buffer_handlesync_sig = g_signal_connect (stream->buffer,
2815       "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream);
2816
2817   if (stream->demux) {
2818     /* connect to the new-pad signal of the payload demuxer, this will expose the
2819      * new pad by ghosting it. */
2820     stream->demux_newpad_sig = g_signal_connect (stream->demux,
2821         "new-payload-type", (GCallback) new_payload_found, stream);
2822     stream->demux_padremoved_sig = g_signal_connect (stream->demux,
2823         "pad-removed", (GCallback) payload_pad_removed, stream);
2824
2825     /* connect to the request-pt-map signal. This signal will be emited by the
2826      * demuxer so that it can apply a proper caps on the buffers for the
2827      * depayloaders. */
2828     stream->demux_ptreq_sig = g_signal_connect (stream->demux,
2829         "request-pt-map", (GCallback) pt_map_requested, session);
2830     /* connect to the  signal so it can be forwarded. */
2831     stream->demux_ptchange_sig = g_signal_connect (stream->demux,
2832         "payload-type-change", (GCallback) payload_type_change, session);
2833   } else {
2834     /* add rtpjitterbuffer src pad to pads */
2835     GstElementClass *klass;
2836     GstPadTemplate *templ;
2837     gchar *padname;
2838     GstPad *gpad, *pad;
2839
2840     pad = gst_element_get_static_pad (stream->buffer, "src");
2841
2842     /* ghost the pad to the parent */
2843     klass = GST_ELEMENT_GET_CLASS (rtpbin);
2844     templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
2845     padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
2846         stream->session->id, stream->ssrc, 255);
2847     gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
2848     g_free (padname);
2849
2850     gst_pad_set_active (gpad, TRUE);
2851     gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2852
2853     gst_object_unref (pad);
2854   }
2855
2856   GST_RTP_SESSION_UNLOCK (session);
2857   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2858
2859   return;
2860
2861   /* ERRORS */
2862 shutdown:
2863   {
2864     GST_DEBUG_OBJECT (rtpbin, "we are shutting down");
2865     return;
2866   }
2867 no_stream:
2868   {
2869     GST_RTP_SESSION_UNLOCK (session);
2870     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2871     GST_DEBUG_OBJECT (rtpbin, "could not create stream");
2872     return;
2873   }
2874 }
2875
2876 /* Create a pad for receiving RTP for the session in @name. Must be called with
2877  * RTP_BIN_LOCK.
2878  */
2879 static GstPad *
2880 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2881 {
2882   guint sessid;
2883   GstElement *decoder;
2884   GstPad *sinkdpad, *decsink;
2885   GstRtpBinSession *session;
2886
2887   /* first get the session number */
2888   if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1)
2889     goto no_name;
2890
2891   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
2892
2893   /* get or create session */
2894   session = find_session_by_id (rtpbin, sessid);
2895   if (!session) {
2896     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
2897     /* create session now */
2898     session = create_session (rtpbin, sessid);
2899     if (session == NULL)
2900       goto create_error;
2901   }
2902
2903   /* check if pad was requested */
2904   if (session->recv_rtp_sink_ghost != NULL)
2905     return session->recv_rtp_sink_ghost;
2906
2907   GST_DEBUG_OBJECT (rtpbin, "getting RTP sink pad");
2908   /* get recv_rtp pad and store */
2909   session->recv_rtp_sink =
2910       gst_element_get_request_pad (session->session, "recv_rtp_sink");
2911   if (session->recv_rtp_sink == NULL)
2912     goto pad_failed;
2913
2914   g_signal_connect (session->recv_rtp_sink, "notify::caps",
2915       (GCallback) caps_changed, session);
2916
2917   GST_DEBUG_OBJECT (rtpbin, "requesting RTP decoder");
2918   decoder = session_request_decoder (session, SIGNAL_REQUEST_RTP_DECODER);
2919   if (decoder) {
2920     GstPad *decsrc;
2921     GstPadLinkReturn ret;
2922
2923     GST_DEBUG_OBJECT (rtpbin, "linking RTP decoder");
2924     decsink = gst_element_get_static_pad (decoder, "rtp_sink");
2925     decsrc = gst_element_get_static_pad (decoder, "rtp_src");
2926
2927     if (decsink == NULL)
2928       goto dec_sink_failed;
2929
2930     if (decsrc == NULL)
2931       goto dec_src_failed;
2932
2933     ret = gst_pad_link (decsrc, session->recv_rtp_sink);
2934     gst_object_unref (decsrc);
2935
2936     if (ret != GST_PAD_LINK_OK)
2937       goto dec_link_failed;
2938   } else {
2939     GST_DEBUG_OBJECT (rtpbin, "no RTP decoder given");
2940     decsink = gst_object_ref (session->recv_rtp_sink);
2941   }
2942
2943   GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad");
2944   /* get srcpad, link to SSRCDemux */
2945   session->recv_rtp_src =
2946       gst_element_get_static_pad (session->session, "recv_rtp_src");
2947   if (session->recv_rtp_src == NULL)
2948     goto src_pad_failed;
2949
2950   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
2951   sinkdpad = gst_element_get_static_pad (session->demux, "sink");
2952   GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
2953   gst_pad_link_full (session->recv_rtp_src, sinkdpad,
2954       GST_PAD_LINK_CHECK_NOTHING);
2955   gst_object_unref (sinkdpad);
2956
2957   /* connect to the new-ssrc-pad signal of the SSRC demuxer */
2958   session->demux_newpad_sig = g_signal_connect (session->demux,
2959       "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
2960   session->demux_padremoved_sig = g_signal_connect (session->demux,
2961       "removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session);
2962
2963   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
2964   session->recv_rtp_sink_ghost =
2965       gst_ghost_pad_new_from_template (name, decsink, templ);
2966   gst_object_unref (decsink);
2967   gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE);
2968   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost);
2969
2970   return session->recv_rtp_sink_ghost;
2971
2972   /* ERRORS */
2973 no_name:
2974   {
2975     g_warning ("rtpbin: invalid name given");
2976     return NULL;
2977   }
2978 create_error:
2979   {
2980     /* create_session already warned */
2981     return NULL;
2982   }
2983 pad_failed:
2984   {
2985     g_warning ("rtpbin: failed to get session rtp_sink pad");
2986     return NULL;
2987   }
2988 dec_sink_failed:
2989   {
2990     g_warning ("rtpbin: failed to get decoder sink pad for session %d", sessid);
2991     return NULL;
2992   }
2993 dec_src_failed:
2994   {
2995     g_warning ("rtpbin: failed to get decoder src pad for session %d", sessid);
2996     gst_object_unref (decsink);
2997     return NULL;
2998   }
2999 dec_link_failed:
3000   {
3001     g_warning ("rtpbin: failed to link rtp decoder for session %d", sessid);
3002     gst_object_unref (decsink);
3003     return NULL;
3004   }
3005 src_pad_failed:
3006   {
3007     g_warning ("rtpbin: failed to get session rtp_src pad");
3008     gst_object_unref (decsink);
3009     return NULL;
3010   }
3011 }
3012
3013 static void
3014 remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3015 {
3016   if (session->demux_newpad_sig) {
3017     g_signal_handler_disconnect (session->demux, session->demux_newpad_sig);
3018     session->demux_newpad_sig = 0;
3019   }
3020   if (session->demux_padremoved_sig) {
3021     g_signal_handler_disconnect (session->demux, session->demux_padremoved_sig);
3022     session->demux_padremoved_sig = 0;
3023   }
3024   if (session->recv_rtp_src) {
3025     gst_object_unref (session->recv_rtp_src);
3026     session->recv_rtp_src = NULL;
3027   }
3028   if (session->recv_rtp_sink) {
3029     gst_element_release_request_pad (session->session, session->recv_rtp_sink);
3030     gst_object_unref (session->recv_rtp_sink);
3031     session->recv_rtp_sink = NULL;
3032   }
3033   if (session->recv_rtp_sink_ghost) {
3034     gst_pad_set_active (session->recv_rtp_sink_ghost, FALSE);
3035     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3036         session->recv_rtp_sink_ghost);
3037     session->recv_rtp_sink_ghost = NULL;
3038   }
3039 }
3040
3041 /* Create a pad for receiving RTCP for the session in @name. Must be called with
3042  * RTP_BIN_LOCK.
3043  */
3044 static GstPad *
3045 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
3046     const gchar * name)
3047 {
3048   guint sessid;
3049   GstElement *decoder;
3050   GstRtpBinSession *session;
3051   GstPad *sinkdpad, *decsink;
3052
3053   /* first get the session number */
3054   if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1)
3055     goto no_name;
3056
3057   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
3058
3059   /* get or create the session */
3060   session = find_session_by_id (rtpbin, sessid);
3061   if (!session) {
3062     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
3063     /* create session now */
3064     session = create_session (rtpbin, sessid);
3065     if (session == NULL)
3066       goto create_error;
3067   }
3068
3069   /* check if pad was requested */
3070   if (session->recv_rtcp_sink_ghost != NULL)
3071     return session->recv_rtcp_sink_ghost;
3072
3073   /* get recv_rtp pad and store */
3074   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
3075   session->recv_rtcp_sink =
3076       gst_element_get_request_pad (session->session, "recv_rtcp_sink");
3077   if (session->recv_rtcp_sink == NULL)
3078     goto pad_failed;
3079
3080   GST_DEBUG_OBJECT (rtpbin, "getting RTCP decoder");
3081   decoder = session_request_decoder (session, SIGNAL_REQUEST_RTCP_DECODER);
3082   if (decoder) {
3083     GstPad *decsrc;
3084     GstPadLinkReturn ret;
3085
3086     GST_DEBUG_OBJECT (rtpbin, "linking RTCP decoder");
3087     decsink = gst_element_get_static_pad (decoder, "rtcp_sink");
3088     decsrc = gst_element_get_static_pad (decoder, "rtcp_src");
3089
3090     if (decsink == NULL)
3091       goto dec_sink_failed;
3092
3093     if (decsrc == NULL)
3094       goto dec_src_failed;
3095
3096     ret = gst_pad_link (decsrc, session->recv_rtcp_sink);
3097     gst_object_unref (decsrc);
3098
3099     if (ret != GST_PAD_LINK_OK)
3100       goto dec_link_failed;
3101   } else {
3102     GST_DEBUG_OBJECT (rtpbin, "no RTCP decoder given");
3103     decsink = gst_object_ref (session->recv_rtcp_sink);
3104   }
3105
3106   /* get srcpad, link to SSRCDemux */
3107   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
3108   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
3109   if (session->sync_src == NULL)
3110     goto src_pad_failed;
3111
3112   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
3113   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
3114   gst_pad_link_full (session->sync_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
3115   gst_object_unref (sinkdpad);
3116
3117   session->recv_rtcp_sink_ghost =
3118       gst_ghost_pad_new_from_template (name, decsink, templ);
3119   gst_object_unref (decsink);
3120   gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE);
3121   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin),
3122       session->recv_rtcp_sink_ghost);
3123
3124   return session->recv_rtcp_sink_ghost;
3125
3126   /* ERRORS */
3127 no_name:
3128   {
3129     g_warning ("rtpbin: invalid name given");
3130     return NULL;
3131   }
3132 create_error:
3133   {
3134     /* create_session already warned */
3135     return NULL;
3136   }
3137 pad_failed:
3138   {
3139     g_warning ("rtpbin: failed to get session rtcp_sink pad");
3140     return NULL;
3141   }
3142 dec_sink_failed:
3143   {
3144     g_warning ("rtpbin: failed to get decoder sink pad for session %d", sessid);
3145     return NULL;
3146   }
3147 dec_src_failed:
3148   {
3149     g_warning ("rtpbin: failed to get decoder src pad for session %d", sessid);
3150     gst_object_unref (decsink);
3151     return NULL;
3152   }
3153 dec_link_failed:
3154   {
3155     g_warning ("rtpbin: failed to link rtcp decoder for session %d", sessid);
3156     gst_object_unref (decsink);
3157     return NULL;
3158   }
3159 src_pad_failed:
3160   {
3161     g_warning ("rtpbin: failed to get session sync_src pad");
3162     gst_object_unref (decsink);
3163     return NULL;
3164   }
3165 }
3166
3167 static void
3168 remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3169 {
3170   if (session->recv_rtcp_sink_ghost) {
3171     gst_pad_set_active (session->recv_rtcp_sink_ghost, FALSE);
3172     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3173         session->recv_rtcp_sink_ghost);
3174     session->recv_rtcp_sink_ghost = NULL;
3175   }
3176   if (session->sync_src) {
3177     /* releasing the request pad should also unref the sync pad */
3178     gst_object_unref (session->sync_src);
3179     session->sync_src = NULL;
3180   }
3181   if (session->recv_rtcp_sink) {
3182     gst_element_release_request_pad (session->session, session->recv_rtcp_sink);
3183     gst_object_unref (session->recv_rtcp_sink);
3184     session->recv_rtcp_sink = NULL;
3185   }
3186 }
3187
3188 /* Create a pad for sending RTP for the session in @name. Must be called with
3189  * RTP_BIN_LOCK.
3190  */
3191 static GstPad *
3192 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
3193 {
3194   gchar *gname;
3195   guint sessid;
3196   GstPad *encsrc;
3197   GstElement *encoder;
3198   GstRtpBinSession *session;
3199   GstElementClass *klass;
3200
3201   /* first get the session number */
3202   if (name == NULL || sscanf (name, "send_rtp_sink_%u", &sessid) != 1)
3203     goto no_name;
3204
3205   /* get or create session */
3206   session = find_session_by_id (rtpbin, sessid);
3207   if (!session) {
3208     /* create session now */
3209     session = create_session (rtpbin, sessid);
3210     if (session == NULL)
3211       goto create_error;
3212   }
3213
3214   /* check if pad was requested */
3215   if (session->send_rtp_sink_ghost != NULL)
3216     return session->send_rtp_sink_ghost;
3217
3218   /* get send_rtp pad and store */
3219   session->send_rtp_sink =
3220       gst_element_get_request_pad (session->session, "send_rtp_sink");
3221   if (session->send_rtp_sink == NULL)
3222     goto pad_failed;
3223
3224   session->send_rtp_sink_ghost =
3225       gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ);
3226   gst_pad_set_active (session->send_rtp_sink_ghost, TRUE);
3227   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_sink_ghost);
3228
3229   /* get srcpad */
3230   session->send_rtp_src =
3231       gst_element_get_static_pad (session->session, "send_rtp_src");
3232   if (session->send_rtp_src == NULL)
3233     goto no_srcpad;
3234
3235   GST_DEBUG_OBJECT (rtpbin, "getting RTP encoder");
3236   encoder = session_request_encoder (session, SIGNAL_REQUEST_RTP_ENCODER);
3237   if (encoder) {
3238     gchar *ename;
3239     GstPad *encsink;
3240     GstPadLinkReturn ret;
3241
3242     GST_DEBUG_OBJECT (rtpbin, "linking RTP encoder");
3243     ename = g_strdup_printf ("rtp_sink_%d", sessid);
3244     encsink = gst_element_get_static_pad (encoder, ename);
3245     g_free (ename);
3246     ename = g_strdup_printf ("rtp_src_%d", sessid);
3247     encsrc = gst_element_get_static_pad (encoder, ename);
3248     g_free (ename);
3249
3250     if (encsrc == NULL)
3251       goto enc_src_failed;
3252
3253     if (encsink == NULL)
3254       goto enc_sink_failed;
3255
3256     ret = gst_pad_link (session->send_rtp_src, encsink);
3257     gst_object_unref (encsink);
3258
3259     if (ret != GST_PAD_LINK_OK)
3260       goto enc_link_failed;
3261   } else {
3262     GST_DEBUG_OBJECT (rtpbin, "no RTP encoder given");
3263     encsrc = gst_object_ref (session->send_rtp_src);
3264   }
3265
3266   /* ghost the new source pad */
3267   klass = GST_ELEMENT_GET_CLASS (rtpbin);
3268   gname = g_strdup_printf ("send_rtp_src_%u", sessid);
3269   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u");
3270   session->send_rtp_src_ghost =
3271       gst_ghost_pad_new_from_template (gname, encsrc, templ);
3272   gst_object_unref (encsrc);
3273   gst_pad_set_active (session->send_rtp_src_ghost, TRUE);
3274   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost);
3275   g_free (gname);
3276
3277   return session->send_rtp_sink_ghost;
3278
3279   /* ERRORS */
3280 no_name:
3281   {
3282     g_warning ("rtpbin: invalid name given");
3283     return NULL;
3284   }
3285 create_error:
3286   {
3287     /* create_session already warned */
3288     return NULL;
3289   }
3290 pad_failed:
3291   {
3292     g_warning ("rtpbin: failed to get session pad for session %d", sessid);
3293     return NULL;
3294   }
3295 no_srcpad:
3296   {
3297     g_warning ("rtpbin: failed to get rtp source pad for session %d", sessid);
3298     return NULL;
3299   }
3300 enc_src_failed:
3301   {
3302     g_warning ("rtpbin: failed to get encoder src pad for session %d", sessid);
3303     return NULL;
3304   }
3305 enc_sink_failed:
3306   {
3307     g_warning ("rtpbin: failed to get encoder sink pad for session %d", sessid);
3308     gst_object_unref (encsrc);
3309     return NULL;
3310   }
3311 enc_link_failed:
3312   {
3313     g_warning ("rtpbin: failed to link rtp encoder for session %d", sessid);
3314     gst_object_unref (encsrc);
3315     return NULL;
3316   }
3317 }
3318
3319 static void
3320 remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3321 {
3322   if (session->send_rtp_src_ghost) {
3323     gst_pad_set_active (session->send_rtp_src_ghost, FALSE);
3324     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3325         session->send_rtp_src_ghost);
3326     session->send_rtp_src_ghost = NULL;
3327   }
3328   if (session->send_rtp_src) {
3329     gst_object_unref (session->send_rtp_src);
3330     session->send_rtp_src = NULL;
3331   }
3332   if (session->send_rtp_sink) {
3333     gst_element_release_request_pad (GST_ELEMENT_CAST (session->session),
3334         session->send_rtp_sink);
3335     gst_object_unref (session->send_rtp_sink);
3336     session->send_rtp_sink = NULL;
3337   }
3338   if (session->send_rtp_sink_ghost) {
3339     gst_pad_set_active (session->send_rtp_sink_ghost, FALSE);
3340     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3341         session->send_rtp_sink_ghost);
3342     session->send_rtp_sink_ghost = NULL;
3343   }
3344 }
3345
3346 /* Create a pad for sending RTCP for the session in @name. Must be called with
3347  * RTP_BIN_LOCK.
3348  */
3349 static GstPad *
3350 create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
3351 {
3352   guint sessid;
3353   GstPad *encsrc;
3354   GstElement *encoder;
3355   GstRtpBinSession *session;
3356
3357   /* first get the session number */
3358   if (name == NULL || sscanf (name, "send_rtcp_src_%u", &sessid) != 1)
3359     goto no_name;
3360
3361   /* get or create session */
3362   session = find_session_by_id (rtpbin, sessid);
3363   if (!session)
3364     goto no_session;
3365
3366   /* check if pad was requested */
3367   if (session->send_rtcp_src_ghost != NULL)
3368     return session->send_rtcp_src_ghost;
3369
3370   /* get rtcp_src pad and store */
3371   session->send_rtcp_src =
3372       gst_element_get_request_pad (session->session, "send_rtcp_src");
3373   if (session->send_rtcp_src == NULL)
3374     goto pad_failed;
3375
3376   GST_DEBUG_OBJECT (rtpbin, "getting RTCP encoder");
3377   encoder = session_request_encoder (session, SIGNAL_REQUEST_RTCP_ENCODER);
3378   if (encoder) {
3379     gchar *ename;
3380     GstPad *encsink;
3381     GstPadLinkReturn ret;
3382
3383     GST_DEBUG_OBJECT (rtpbin, "linking RTCP encoder");
3384     ename = g_strdup_printf ("rtcp_sink_%d", sessid);
3385     encsink = gst_element_get_static_pad (encoder, ename);
3386     g_free (ename);
3387     ename = g_strdup_printf ("rtcp_src_%d", sessid);
3388     encsrc = gst_element_get_static_pad (encoder, ename);
3389     g_free (ename);
3390
3391     if (encsrc == NULL)
3392       goto enc_src_failed;
3393
3394     if (encsink == NULL)
3395       goto enc_sink_failed;
3396
3397     ret = gst_pad_link (session->send_rtcp_src, encsink);
3398     gst_object_unref (encsink);
3399
3400     if (ret != GST_PAD_LINK_OK)
3401       goto enc_link_failed;
3402   } else {
3403     GST_DEBUG_OBJECT (rtpbin, "no RTCP encoder given");
3404     encsrc = gst_object_ref (session->send_rtcp_src);
3405   }
3406
3407   session->send_rtcp_src_ghost =
3408       gst_ghost_pad_new_from_template (name, encsrc, templ);
3409   gst_object_unref (encsrc);
3410   gst_pad_set_active (session->send_rtcp_src_ghost, TRUE);
3411   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtcp_src_ghost);
3412
3413   return session->send_rtcp_src_ghost;
3414
3415   /* ERRORS */
3416 no_name:
3417   {
3418     g_warning ("rtpbin: invalid name given");
3419     return NULL;
3420   }
3421 no_session:
3422   {
3423     g_warning ("rtpbin: session with id %d does not exist", sessid);
3424     return NULL;
3425   }
3426 pad_failed:
3427   {
3428     g_warning ("rtpbin: failed to get rtcp pad for session %d", sessid);
3429     return NULL;
3430   }
3431 enc_src_failed:
3432   {
3433     g_warning ("rtpbin: failed to get encoder src pad for session %d", sessid);
3434     return NULL;
3435   }
3436 enc_sink_failed:
3437   {
3438     g_warning ("rtpbin: failed to get encoder sink pad for session %d", sessid);
3439     gst_object_unref (encsrc);
3440     return NULL;
3441   }
3442 enc_link_failed:
3443   {
3444     g_warning ("rtpbin: failed to link rtcp encoder for session %d", sessid);
3445     gst_object_unref (encsrc);
3446     return NULL;
3447   }
3448 }
3449
3450 static void
3451 remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3452 {
3453   if (session->send_rtcp_src_ghost) {
3454     gst_pad_set_active (session->send_rtcp_src_ghost, FALSE);
3455     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3456         session->send_rtcp_src_ghost);
3457     session->send_rtcp_src_ghost = NULL;
3458   }
3459   if (session->send_rtcp_src) {
3460     gst_element_release_request_pad (session->session, session->send_rtcp_src);
3461     gst_object_unref (session->send_rtcp_src);
3462     session->send_rtcp_src = NULL;
3463   }
3464 }
3465
3466 /* If the requested name is NULL we should create a name with
3467  * the session number assuming we want the lowest posible session
3468  * with a free pad like the template */
3469 static gchar *
3470 gst_rtp_bin_get_free_pad_name (GstElement * element, GstPadTemplate * templ)
3471 {
3472   gboolean name_found = FALSE;
3473   gint session = 0;
3474   GstIterator *pad_it = NULL;
3475   gchar *pad_name = NULL;
3476   GValue data = { 0, };
3477
3478   GST_DEBUG_OBJECT (element, "find a free pad name for template");
3479   while (!name_found) {
3480     gboolean done = FALSE;
3481
3482     g_free (pad_name);
3483     pad_name = g_strdup_printf (templ->name_template, session++);
3484     pad_it = gst_element_iterate_pads (GST_ELEMENT (element));
3485     name_found = TRUE;
3486     while (!done) {
3487       switch (gst_iterator_next (pad_it, &data)) {
3488         case GST_ITERATOR_OK:
3489         {
3490           GstPad *pad;
3491           gchar *name;
3492
3493           pad = g_value_get_object (&data);
3494           name = gst_pad_get_name (pad);
3495
3496           if (strcmp (name, pad_name) == 0) {
3497             done = TRUE;
3498             name_found = FALSE;
3499           }
3500           g_free (name);
3501           g_value_reset (&data);
3502           break;
3503         }
3504         case GST_ITERATOR_ERROR:
3505         case GST_ITERATOR_RESYNC:
3506           /* restart iteration */
3507           done = TRUE;
3508           name_found = FALSE;
3509           session = 0;
3510           break;
3511         case GST_ITERATOR_DONE:
3512           done = TRUE;
3513           break;
3514       }
3515     }
3516     g_value_unset (&data);
3517     gst_iterator_free (pad_it);
3518   }
3519
3520   GST_DEBUG_OBJECT (element, "free pad name found: '%s'", pad_name);
3521   return pad_name;
3522 }
3523
3524 /*
3525  */
3526 static GstPad *
3527 gst_rtp_bin_request_new_pad (GstElement * element,
3528     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3529 {
3530   GstRtpBin *rtpbin;
3531   GstElementClass *klass;
3532   GstPad *result;
3533
3534   gchar *pad_name = NULL;
3535
3536   g_return_val_if_fail (templ != NULL, NULL);
3537   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
3538
3539   rtpbin = GST_RTP_BIN (element);
3540   klass = GST_ELEMENT_GET_CLASS (element);
3541
3542   GST_RTP_BIN_LOCK (rtpbin);
3543
3544   if (name == NULL) {
3545     /* use a free pad name */
3546     pad_name = gst_rtp_bin_get_free_pad_name (element, templ);
3547   } else {
3548     /* use the provided name */
3549     pad_name = g_strdup (name);
3550   }
3551
3552   GST_DEBUG_OBJECT (rtpbin, "Trying to request a pad with name %s", pad_name);
3553
3554   /* figure out the template */
3555   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
3556     result = create_recv_rtp (rtpbin, templ, pad_name);
3557   } else if (templ == gst_element_class_get_pad_template (klass,
3558           "recv_rtcp_sink_%u")) {
3559     result = create_recv_rtcp (rtpbin, templ, pad_name);
3560   } else if (templ == gst_element_class_get_pad_template (klass,
3561           "send_rtp_sink_%u")) {
3562     result = create_send_rtp (rtpbin, templ, pad_name);
3563   } else if (templ == gst_element_class_get_pad_template (klass,
3564           "send_rtcp_src_%u")) {
3565     result = create_rtcp (rtpbin, templ, pad_name);
3566   } else
3567     goto wrong_template;
3568
3569   g_free (pad_name);
3570   GST_RTP_BIN_UNLOCK (rtpbin);
3571
3572   return result;
3573
3574   /* ERRORS */
3575 wrong_template:
3576   {
3577     g_free (pad_name);
3578     GST_RTP_BIN_UNLOCK (rtpbin);
3579     g_warning ("rtpbin: this is not our template");
3580     return NULL;
3581   }
3582 }
3583
3584 static void
3585 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
3586 {
3587   GstRtpBinSession *session;
3588   GstRtpBin *rtpbin;
3589
3590   g_return_if_fail (GST_IS_GHOST_PAD (pad));
3591   g_return_if_fail (GST_IS_RTP_BIN (element));
3592
3593   rtpbin = GST_RTP_BIN (element);
3594
3595   GST_RTP_BIN_LOCK (rtpbin);
3596   GST_DEBUG_OBJECT (rtpbin, "Trying to release pad %s:%s",
3597       GST_DEBUG_PAD_NAME (pad));
3598
3599   if (!(session = find_session_by_pad (rtpbin, pad)))
3600     goto unknown_pad;
3601
3602   if (session->recv_rtp_sink_ghost == pad) {
3603     remove_recv_rtp (rtpbin, session);
3604   } else if (session->recv_rtcp_sink_ghost == pad) {
3605     remove_recv_rtcp (rtpbin, session);
3606   } else if (session->send_rtp_sink_ghost == pad) {
3607     remove_send_rtp (rtpbin, session);
3608   } else if (session->send_rtcp_src_ghost == pad) {
3609     remove_rtcp (rtpbin, session);
3610   }
3611
3612   /* no more request pads, free the complete session */
3613   if (session->recv_rtp_sink_ghost == NULL
3614       && session->recv_rtcp_sink_ghost == NULL
3615       && session->send_rtp_sink_ghost == NULL
3616       && session->send_rtcp_src_ghost == NULL) {
3617     GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session);
3618     rtpbin->sessions = g_slist_remove (rtpbin->sessions, session);
3619     free_session (session, rtpbin);
3620   }
3621   GST_RTP_BIN_UNLOCK (rtpbin);
3622
3623   return;
3624
3625   /* ERROR */
3626 unknown_pad:
3627   {
3628     GST_RTP_BIN_UNLOCK (rtpbin);
3629     g_warning ("rtpbin: %s:%s is not one of our request pads",
3630         GST_DEBUG_PAD_NAME (pad));
3631     return;
3632   }
3633 }