rtpbin: add Since tags
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-rtpbin
22  * @see_also: rtpjitterbuffer, rtpsession, rtpptdemux, rtpssrcdemux
23  *
24  * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
25  * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
26  * RTP sessions that will be synchronized together using RTCP SR packets.
27  *
28  * #GstRtpBin is configured with a number of request pads that define the
29  * functionality that is activated, similar to the #GstRtpSession element.
30  *
31  * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_\%u pad. The session
32  * number must be specified in the pad name.
33  * Data received on the recv_rtp_sink_\%u pad will be processed in the #GstRtpSession
34  * manager and after being validated forwarded on #GstRtpSsrcDemux element. Each
35  * RTP stream is demuxed based on the SSRC and send to a #GstRtpJitterBuffer. After
36  * the packets are released from the jitterbuffer, they will be forwarded to a
37  * #GstRtpPtDemux element. The #GstRtpPtDemux element will demux the packets based
38  * on the payload type and will create a unique pad recv_rtp_src_\%u_\%u_\%u on
39  * rtpbin with the session number, SSRC and payload type respectively as the pad
40  * name.
41  *
42  * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_\%u pad. The
43  * session number must be specified in the pad name.
44  *
45  * If you want the session manager to generate and send RTCP packets, request
46  * the send_rtcp_src_\%u pad with the session number in the pad name. Packet pushed
47  * on this pad contain SR/RR RTCP reports that should be sent to all participants
48  * in the session.
49  *
50  * To use #GstRtpBin as a sender, request a send_rtp_sink_\%u pad, which will
51  * automatically create a send_rtp_src_\%u pad. If the session number is not provided,
52  * the pad from the lowest available session will be returned. The session manager will modify the
53  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
54  * send_rtp_src_\%u pad after updating its internal state.
55  *
56  * The session manager needs the clock-rate of the payload types it is handling
57  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
58  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
59  * signal.
60  *
61  * Access to the internal statistics of rtpbin is provided with the
62  * get-internal-session property. This action signal gives access to the
63  * RTPSession object which further provides action signals to retrieve the
64  * internal source and other sources.
65  *
66  * #GstRtpBin also has signals (#GstRtpBin::request-rtp-encoder,
67  * #GstRtpBin::request-rtp-decoder, #GstRtpBin::request-rtcp-encoder and
68  * #GstRtpBin::request-rtp-decoder) to dynamically request for RTP and RTCP encoders
69  * and decoders in order to support SRTP. The encoders must provide the pads
70  * rtp_sink_\%d and rtp_src_\%d for RTP and rtcp_sink_\%d and rtcp_src_\%d for
71  * RTCP. The session number will be used in the pad name. The decoders must provide
72  * rtp_sink and rtp_src for RTP and rtcp_sink and rtcp_src for RTCP. The decoders will
73  * be placed before the #GstRtpSession element, thus they must support SSRC demuxing
74  * internally.
75  *
76  * <refsect2>
77  * <title>Example pipelines</title>
78  * |[
79  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
80  *     rtpbin ! rtptheoradepay ! theoradec ! xvimagesink
81  * ]| Receive RTP data from port 5000 and send to the session 0 in rtpbin.
82  * |[
83  * gst-launch-1.0 rtpbin name=rtpbin \
84  *         v4l2src ! videoconvert ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
85  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
86  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
87  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
88  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
89  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
90  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
91  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
92  * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
93  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
94  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
95  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
96  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
97  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
98  * is received on port 5007. Since RTCP packets from the sender should be sent
99  * as soon as possible and do not participate in preroll, sync=false and
100  * async=false is configured on udpsink
101  * |[
102  * gst-launch-1.0 -v rtpbin name=rtpbin                                          \
103  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
104  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
105  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
106  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
107  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
108  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
109  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
110  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
111  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
112  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
113  * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
114  * decode and display the video.
115  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
116  * decode and play the audio.
117  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
118  * session 1 on port 5003. These packets will be used for session management and
119  * synchronisation.
120  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
121  * on port 5007.
122  * </refsect2>
123  *
124  * Last reviewed on 2007-08-30 (0.10.6)
125  */
126
127 #ifdef HAVE_CONFIG_H
128 #include "config.h"
129 #endif
130 #include <stdio.h>
131 #include <string.h>
132
133 #include <gst/rtp/gstrtpbuffer.h>
134 #include <gst/rtp/gstrtcpbuffer.h>
135
136 #include "gstrtpbin.h"
137 #include "rtpsession.h"
138 #include "gstrtpsession.h"
139 #include "gstrtpjitterbuffer.h"
140
141 #include <gst/glib-compat-private.h>
142
143 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
144 #define GST_CAT_DEFAULT gst_rtp_bin_debug
145
146 /* sink pads */
147 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
148     GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
149     GST_PAD_SINK,
150     GST_PAD_REQUEST,
151     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
152     );
153
154 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
155     GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
156     GST_PAD_SINK,
157     GST_PAD_REQUEST,
158     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
159     );
160
161 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
162 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%u",
163     GST_PAD_SINK,
164     GST_PAD_REQUEST,
165     GST_STATIC_CAPS ("application/x-rtp")
166     );
167
168 /* src pads */
169 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
170 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
171     GST_PAD_SRC,
172     GST_PAD_SOMETIMES,
173     GST_STATIC_CAPS ("application/x-rtp")
174     );
175
176 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
177     GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%u",
178     GST_PAD_SRC,
179     GST_PAD_REQUEST,
180     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
181     );
182
183 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
184     GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%u",
185     GST_PAD_SRC,
186     GST_PAD_SOMETIMES,
187     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
188     );
189
190 #define GST_RTP_BIN_GET_PRIVATE(obj)  \
191    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
192
193 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock (&(bin)->priv->bin_lock)
194 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock (&(bin)->priv->bin_lock)
195
196 /* lock to protect dynamic callbacks, like pad-added and new ssrc. */
197 #define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock (&(bin)->priv->dyn_lock)
198 #define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock (&(bin)->priv->dyn_lock)
199
200 /* lock for shutdown */
201 #define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
202 G_STMT_START {                                   \
203   if (g_atomic_int_get (&bin->priv->shutdown))   \
204     goto label;                                  \
205   GST_RTP_BIN_DYN_LOCK (bin);                    \
206   if (g_atomic_int_get (&bin->priv->shutdown)) { \
207     GST_RTP_BIN_DYN_UNLOCK (bin);                \
208     goto label;                                  \
209   }                                              \
210 } G_STMT_END
211
212 /* unlock for shutdown */
213 #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
214   GST_RTP_BIN_DYN_UNLOCK (bin);                  \
215
216 struct _GstRtpBinPrivate
217 {
218   GMutex bin_lock;
219
220   /* lock protecting dynamic adding/removing */
221   GMutex dyn_lock;
222
223   /* if we are shutting down or not */
224   gint shutdown;
225
226   gboolean autoremove;
227
228   /* UNIX (ntp) time of last SR sync used */
229   guint64 last_unix;
230
231   /* list of extra elements */
232   GList *elements;
233 };
234
235 /* signals and args */
236 enum
237 {
238   SIGNAL_REQUEST_PT_MAP,
239   SIGNAL_PAYLOAD_TYPE_CHANGE,
240   SIGNAL_CLEAR_PT_MAP,
241   SIGNAL_RESET_SYNC,
242   SIGNAL_GET_INTERNAL_SESSION,
243
244   SIGNAL_ON_NEW_SSRC,
245   SIGNAL_ON_SSRC_COLLISION,
246   SIGNAL_ON_SSRC_VALIDATED,
247   SIGNAL_ON_SSRC_ACTIVE,
248   SIGNAL_ON_SSRC_SDES,
249   SIGNAL_ON_BYE_SSRC,
250   SIGNAL_ON_BYE_TIMEOUT,
251   SIGNAL_ON_TIMEOUT,
252   SIGNAL_ON_SENDER_TIMEOUT,
253   SIGNAL_ON_NPT_STOP,
254
255   SIGNAL_REQUEST_RTP_ENCODER,
256   SIGNAL_REQUEST_RTP_DECODER,
257   SIGNAL_REQUEST_RTCP_ENCODER,
258   SIGNAL_REQUEST_RTCP_DECODER,
259
260   SIGNAL_NEW_JITTERBUFFER,
261
262   LAST_SIGNAL
263 };
264
265 #define DEFAULT_LATENCY_MS           200
266 #define DEFAULT_DROP_ON_LATENCY      FALSE
267 #define DEFAULT_SDES                 NULL
268 #define DEFAULT_DO_LOST              FALSE
269 #define DEFAULT_IGNORE_PT            FALSE
270 #define DEFAULT_NTP_SYNC             FALSE
271 #define DEFAULT_AUTOREMOVE           FALSE
272 #define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
273 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
274 #define DEFAULT_RTCP_SYNC            GST_RTP_BIN_RTCP_SYNC_ALWAYS
275 #define DEFAULT_RTCP_SYNC_INTERVAL   0
276 #define DEFAULT_DO_SYNC_EVENT        FALSE
277 #define DEFAULT_DO_RETRANSMISSION    FALSE
278
279 enum
280 {
281   PROP_0,
282   PROP_LATENCY,
283   PROP_DROP_ON_LATENCY,
284   PROP_SDES,
285   PROP_DO_LOST,
286   PROP_IGNORE_PT,
287   PROP_NTP_SYNC,
288   PROP_RTCP_SYNC,
289   PROP_RTCP_SYNC_INTERVAL,
290   PROP_AUTOREMOVE,
291   PROP_BUFFER_MODE,
292   PROP_USE_PIPELINE_CLOCK,
293   PROP_DO_SYNC_EVENT,
294   PROP_DO_RETRANSMISSION,
295   PROP_LAST
296 };
297
298 enum
299 {
300   GST_RTP_BIN_RTCP_SYNC_ALWAYS,
301   GST_RTP_BIN_RTCP_SYNC_INITIAL,
302   GST_RTP_BIN_RTCP_SYNC_RTP
303 };
304
305 #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
306 static GType
307 gst_rtp_bin_rtcp_sync_get_type (void)
308 {
309   static GType rtcp_sync_type = 0;
310   static const GEnumValue rtcp_sync_types[] = {
311     {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
312     {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
313     {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
314     {0, NULL, NULL},
315   };
316
317   if (!rtcp_sync_type) {
318     rtcp_sync_type = g_enum_register_static ("GstRTCPSync", rtcp_sync_types);
319   }
320   return rtcp_sync_type;
321 }
322
323 /* helper objects */
324 typedef struct _GstRtpBinSession GstRtpBinSession;
325 typedef struct _GstRtpBinStream GstRtpBinStream;
326 typedef struct _GstRtpBinClient GstRtpBinClient;
327
328 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
329
330 static GstCaps *pt_map_requested (GstElement * element, guint pt,
331     GstRtpBinSession * session);
332 static void payload_type_change (GstElement * element, guint pt,
333     GstRtpBinSession * session);
334 static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
335 static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
336 static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
337 static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
338 static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
339 static void free_stream (GstRtpBinStream * stream, GstRtpBin * bin);
340
341 /* Manages the RTP stream for one SSRC.
342  *
343  * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer.
344  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
345  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
346  * together (see below).
347  */
348 struct _GstRtpBinStream
349 {
350   /* the SSRC of this stream */
351   guint32 ssrc;
352
353   /* parent bin */
354   GstRtpBin *bin;
355
356   /* the session this SSRC belongs to */
357   GstRtpBinSession *session;
358
359   /* the jitterbuffer of the SSRC */
360   GstElement *buffer;
361   gulong buffer_handlesync_sig;
362   gulong buffer_ptreq_sig;
363   gulong buffer_ntpstop_sig;
364   gint percent;
365
366   /* the PT demuxer of the SSRC */
367   GstElement *demux;
368   gulong demux_newpad_sig;
369   gulong demux_padremoved_sig;
370   gulong demux_ptreq_sig;
371   gulong demux_ptchange_sig;
372
373   /* if we have calculated a valid rt_delta for this stream */
374   gboolean have_sync;
375   /* mapping to local RTP and NTP time */
376   gint64 rt_delta;
377   gint64 rtp_delta;
378   /* base rtptime in gst time */
379   gint64 clock_base;
380 };
381
382 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->lock)
383 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->lock)
384
385 /* Manages the receiving end of the packets.
386  *
387  * There is one such structure for each RTP session (audio/video/...).
388  * We get the RTP/RTCP packets and stuff them into the session manager. From
389  * there they are pushed into an SSRC demuxer that splits the stream based on
390  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
391  * the GstRtpBinStream above).
392  */
393 struct _GstRtpBinSession
394 {
395   /* session id */
396   gint id;
397   /* the parent bin */
398   GstRtpBin *bin;
399   /* the session element */
400   GstElement *session;
401   /* the SSRC demuxer */
402   GstElement *demux;
403   gulong demux_newpad_sig;
404   gulong demux_padremoved_sig;
405
406   GMutex lock;
407
408   /* list of GstRtpBinStream */
409   GSList *streams;
410
411   /* list of encoders */
412   GSList *encoders;
413
414   /* list of decoders */
415   GSList *decoders;
416
417   /* mapping of payload type to caps */
418   GHashTable *ptmap;
419
420   /* the pads of the session */
421   GstPad *recv_rtp_sink;
422   GstPad *recv_rtp_sink_ghost;
423   GstPad *recv_rtp_src;
424   GstPad *recv_rtcp_sink;
425   GstPad *recv_rtcp_sink_ghost;
426   GstPad *sync_src;
427   GstPad *send_rtp_sink;
428   GstPad *send_rtp_sink_ghost;
429   GstPad *send_rtp_src;
430   GstPad *send_rtp_src_ghost;
431   GstPad *send_rtcp_src;
432   GstPad *send_rtcp_src_ghost;
433 };
434
435 /* Manages the RTP streams that come from one client and should therefore be
436  * synchronized.
437  */
438 struct _GstRtpBinClient
439 {
440   /* the common CNAME for the streams */
441   gchar *cname;
442   guint cname_len;
443
444   /* the streams */
445   guint nstreams;
446   GSList *streams;
447 };
448
449 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
450 static GstRtpBinSession *
451 find_session_by_id (GstRtpBin * rtpbin, gint id)
452 {
453   GSList *walk;
454
455   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
456     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
457
458     if (sess->id == id)
459       return sess;
460   }
461   return NULL;
462 }
463
464 /* find a session with the given request pad. Must be called with RTP_BIN_LOCK */
465 static GstRtpBinSession *
466 find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
467 {
468   GSList *walk;
469
470   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
471     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
472
473     if ((sess->recv_rtp_sink_ghost == pad) ||
474         (sess->recv_rtcp_sink_ghost == pad) ||
475         (sess->send_rtp_sink_ghost == pad)
476         || (sess->send_rtcp_src_ghost == pad))
477       return sess;
478   }
479   return NULL;
480 }
481
482 static void
483 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
484 {
485   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
486       sess->id, ssrc);
487 }
488
489 static void
490 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
491 {
492   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
493       sess->id, ssrc);
494 }
495
496 static void
497 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
498 {
499   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
500       sess->id, ssrc);
501 }
502
503 static void
504 on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
505 {
506   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
507       sess->id, ssrc);
508 }
509
510 static void
511 on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
512 {
513   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
514       sess->id, ssrc);
515 }
516
517 static void
518 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
519 {
520   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
521       sess->id, ssrc);
522 }
523
524 static void
525 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
526 {
527   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
528       sess->id, ssrc);
529
530   if (sess->bin->priv->autoremove)
531     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
532 }
533
534 static void
535 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
536 {
537   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
538       sess->id, ssrc);
539
540   if (sess->bin->priv->autoremove)
541     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
542 }
543
544 static void
545 on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
546 {
547   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
548       sess->id, ssrc);
549 }
550
551 static void
552 on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
553 {
554   g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0,
555       stream->session->id, stream->ssrc);
556 }
557
558 /* must be called with the SESSION lock */
559 static GstRtpBinStream *
560 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
561 {
562   GSList *walk;
563
564   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
565     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
566
567     if (stream->ssrc == ssrc)
568       return stream;
569   }
570   return NULL;
571 }
572
573 static void
574 ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad,
575     GstRtpBinSession * session)
576 {
577   GstRtpBinStream *stream = NULL;
578   GstRtpBin *rtpbin;
579
580   rtpbin = session->bin;
581
582   GST_RTP_BIN_LOCK (rtpbin);
583
584   GST_RTP_SESSION_LOCK (session);
585   if ((stream = find_stream_by_ssrc (session, ssrc)))
586     session->streams = g_slist_remove (session->streams, stream);
587   GST_RTP_SESSION_UNLOCK (session);
588
589   if (stream)
590     free_stream (stream, rtpbin);
591
592   GST_RTP_BIN_UNLOCK (rtpbin);
593 }
594
595 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
596 static GstRtpBinSession *
597 create_session (GstRtpBin * rtpbin, gint id)
598 {
599   GstRtpBinSession *sess;
600   GstElement *session, *demux;
601   GstState target;
602
603   if (!(session = gst_element_factory_make ("rtpsession", NULL)))
604     goto no_session;
605
606   if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
607     goto no_demux;
608
609   sess = g_new0 (GstRtpBinSession, 1);
610   g_mutex_init (&sess->lock);
611   sess->id = id;
612   sess->bin = rtpbin;
613   sess->session = session;
614   sess->demux = demux;
615   sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
616       (GDestroyNotify) gst_caps_unref);
617   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
618
619   /* configure SDES items */
620   GST_OBJECT_LOCK (rtpbin);
621   g_object_set (session, "sdes", rtpbin->sdes, "use-pipeline-clock",
622       rtpbin->use_pipeline_clock, NULL);
623   GST_OBJECT_UNLOCK (rtpbin);
624
625   /* provide clock_rate to the session manager when needed */
626   g_signal_connect (session, "request-pt-map",
627       (GCallback) pt_map_requested, sess);
628
629   g_signal_connect (sess->session, "on-new-ssrc",
630       (GCallback) on_new_ssrc, sess);
631   g_signal_connect (sess->session, "on-ssrc-collision",
632       (GCallback) on_ssrc_collision, sess);
633   g_signal_connect (sess->session, "on-ssrc-validated",
634       (GCallback) on_ssrc_validated, sess);
635   g_signal_connect (sess->session, "on-ssrc-active",
636       (GCallback) on_ssrc_active, sess);
637   g_signal_connect (sess->session, "on-ssrc-sdes",
638       (GCallback) on_ssrc_sdes, sess);
639   g_signal_connect (sess->session, "on-bye-ssrc",
640       (GCallback) on_bye_ssrc, sess);
641   g_signal_connect (sess->session, "on-bye-timeout",
642       (GCallback) on_bye_timeout, sess);
643   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
644   g_signal_connect (sess->session, "on-sender-timeout",
645       (GCallback) on_sender_timeout, sess);
646
647   gst_bin_add (GST_BIN_CAST (rtpbin), session);
648   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
649
650   GST_OBJECT_LOCK (rtpbin);
651   target = GST_STATE_TARGET (rtpbin);
652   GST_OBJECT_UNLOCK (rtpbin);
653
654   /* change state only to what's needed */
655   gst_element_set_state (demux, target);
656   gst_element_set_state (session, target);
657
658   return sess;
659
660   /* ERRORS */
661 no_session:
662   {
663     g_warning ("rtpbin: could not create rtpsession element");
664     return NULL;
665   }
666 no_demux:
667   {
668     gst_object_unref (session);
669     g_warning ("rtpbin: could not create rtpssrcdemux element");
670     return NULL;
671   }
672 }
673
674 static gboolean
675 bin_manage_element (GstRtpBin * bin, GstElement * element)
676 {
677   GstRtpBinPrivate *priv = bin->priv;
678
679   if (g_list_find (priv->elements, element)) {
680     GST_DEBUG_OBJECT (bin, "requested element %p already in bin", element);
681   } else {
682     GST_DEBUG_OBJECT (bin, "adding requested element %p", element);
683     if (!gst_bin_add (GST_BIN_CAST (bin), element))
684       goto add_failed;
685     if (!gst_element_sync_state_with_parent (element))
686       GST_WARNING_OBJECT (bin, "unable to sync element state with rtpbin");
687   }
688   /* we add the element multiple times, each we need an equal number of
689    * removes to really remove the element from the bin */
690   priv->elements = g_list_prepend (priv->elements, element);
691
692   return TRUE;
693
694   /* ERRORS */
695 add_failed:
696   {
697     GST_WARNING_OBJECT (bin, "unable to add element");
698     return FALSE;
699   }
700 }
701
702 static void
703 remove_bin_element (GstElement * element, GstRtpBin * bin)
704 {
705   GstRtpBinPrivate *priv = bin->priv;
706   GList *find;
707
708   find = g_list_find (priv->elements, element);
709   if (find) {
710     priv->elements = g_list_delete_link (priv->elements, find);
711
712     if (!g_list_find (priv->elements, element))
713       gst_bin_remove (GST_BIN_CAST (bin), element);
714     else
715       gst_object_unref (element);
716   }
717 }
718
719 /* called with RTP_BIN_LOCK */
720 static void
721 free_session (GstRtpBinSession * sess, GstRtpBin * bin)
722 {
723   GST_DEBUG_OBJECT (bin, "freeing session %p", sess);
724
725   gst_element_set_locked_state (sess->demux, TRUE);
726   gst_element_set_locked_state (sess->session, TRUE);
727
728   gst_element_set_state (sess->demux, GST_STATE_NULL);
729   gst_element_set_state (sess->session, GST_STATE_NULL);
730
731   remove_recv_rtp (bin, sess);
732   remove_recv_rtcp (bin, sess);
733   remove_send_rtp (bin, sess);
734   remove_rtcp (bin, sess);
735
736   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
737   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
738
739   g_slist_foreach (sess->encoders, (GFunc) remove_bin_element, bin);
740   g_slist_free (sess->encoders);
741
742   g_slist_foreach (sess->decoders, (GFunc) remove_bin_element, bin);
743   g_slist_free (sess->decoders);
744
745   g_slist_foreach (sess->streams, (GFunc) free_stream, bin);
746   g_slist_free (sess->streams);
747
748   g_mutex_clear (&sess->lock);
749   g_hash_table_destroy (sess->ptmap);
750
751   g_free (sess);
752 }
753
754 /* get the payload type caps for the specific payload @pt in @session */
755 static GstCaps *
756 get_pt_map (GstRtpBinSession * session, guint pt)
757 {
758   GstCaps *caps = NULL;
759   GstRtpBin *bin;
760   GValue ret = { 0 };
761   GValue args[3] = { {0}, {0}, {0} };
762
763   GST_DEBUG ("searching pt %d in cache", pt);
764
765   GST_RTP_SESSION_LOCK (session);
766
767   /* first look in the cache */
768   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
769   if (caps) {
770     gst_caps_ref (caps);
771     goto done;
772   }
773
774   bin = session->bin;
775
776   GST_DEBUG ("emiting signal for pt %d in session %d", pt, session->id);
777
778   /* not in cache, send signal to request caps */
779   g_value_init (&args[0], GST_TYPE_ELEMENT);
780   g_value_set_object (&args[0], bin);
781   g_value_init (&args[1], G_TYPE_UINT);
782   g_value_set_uint (&args[1], session->id);
783   g_value_init (&args[2], G_TYPE_UINT);
784   g_value_set_uint (&args[2], pt);
785
786   g_value_init (&ret, GST_TYPE_CAPS);
787   g_value_set_boxed (&ret, NULL);
788
789   GST_RTP_SESSION_UNLOCK (session);
790
791   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
792
793   GST_RTP_SESSION_LOCK (session);
794
795   g_value_unset (&args[0]);
796   g_value_unset (&args[1]);
797   g_value_unset (&args[2]);
798
799   /* look in the cache again because we let the lock go */
800   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
801   if (caps) {
802     gst_caps_ref (caps);
803     g_value_unset (&ret);
804     goto done;
805   }
806
807   caps = (GstCaps *) g_value_dup_boxed (&ret);
808   g_value_unset (&ret);
809   if (!caps)
810     goto no_caps;
811
812   GST_DEBUG ("caching pt %d as %" GST_PTR_FORMAT, pt, caps);
813
814   /* store in cache, take additional ref */
815   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
816       gst_caps_ref (caps));
817
818 done:
819   GST_RTP_SESSION_UNLOCK (session);
820
821   return caps;
822
823   /* ERRORS */
824 no_caps:
825   {
826     GST_RTP_SESSION_UNLOCK (session);
827     GST_DEBUG ("no pt map could be obtained");
828     return NULL;
829   }
830 }
831
832 static gboolean
833 return_true (gpointer key, gpointer value, gpointer user_data)
834 {
835   return TRUE;
836 }
837
838 static void
839 gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
840 {
841   GSList *clients, *streams;
842
843   GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");
844
845   GST_RTP_BIN_LOCK (rtpbin);
846   for (clients = rtpbin->clients; clients; clients = g_slist_next (clients)) {
847     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
848
849     /* reset sync on all streams for this client */
850     for (streams = client->streams; streams; streams = g_slist_next (streams)) {
851       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
852
853       /* make use require a new SR packet for this stream before we attempt new
854        * lip-sync */
855       stream->have_sync = FALSE;
856       stream->rt_delta = 0;
857       stream->rtp_delta = 0;
858       stream->clock_base = -100 * GST_SECOND;
859     }
860   }
861   GST_RTP_BIN_UNLOCK (rtpbin);
862 }
863
864 static void
865 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
866 {
867   GSList *sessions, *streams;
868
869   GST_RTP_BIN_LOCK (bin);
870   GST_DEBUG_OBJECT (bin, "clearing pt map");
871   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
872     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
873
874     GST_DEBUG_OBJECT (bin, "clearing session %p", session);
875     g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
876
877     GST_RTP_SESSION_LOCK (session);
878     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
879
880     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
881       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
882
883       GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
884       g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
885       if (stream->demux)
886         g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
887     }
888     GST_RTP_SESSION_UNLOCK (session);
889   }
890   GST_RTP_BIN_UNLOCK (bin);
891
892   /* reset sync too */
893   gst_rtp_bin_reset_sync (bin);
894 }
895
896 static RTPSession *
897 gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
898 {
899   RTPSession *internal_session = NULL;
900   GstRtpBinSession *session;
901
902   GST_RTP_BIN_LOCK (bin);
903   GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %d",
904       session_id);
905   session = find_session_by_id (bin, (gint) session_id);
906   if (session) {
907     g_object_get (session->session, "internal-session", &internal_session,
908         NULL);
909   }
910   GST_RTP_BIN_UNLOCK (bin);
911
912   return internal_session;
913 }
914
915 static GstElement *
916 gst_rtp_bin_request_encoder (GstRtpBin * bin, guint session_id)
917 {
918   GST_DEBUG_OBJECT (bin, "return NULL encoder");
919   return NULL;
920 }
921
922 static GstElement *
923 gst_rtp_bin_request_decoder (GstRtpBin * bin, guint session_id)
924 {
925   GST_DEBUG_OBJECT (bin, "return NULL decoder");
926   return NULL;
927 }
928
929 static void
930 gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
931     const gchar * name, const GValue * value)
932 {
933   GSList *sessions, *streams;
934
935   GST_RTP_BIN_LOCK (bin);
936   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
937     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
938
939     GST_RTP_SESSION_LOCK (session);
940     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
941       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
942
943       g_object_set_property (G_OBJECT (stream->buffer), name, value);
944     }
945     GST_RTP_SESSION_UNLOCK (session);
946   }
947   GST_RTP_BIN_UNLOCK (bin);
948 }
949
950 /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
951 static GstRtpBinClient *
952 get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
953 {
954   GstRtpBinClient *result = NULL;
955   GSList *walk;
956
957   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
958     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
959
960     if (len != client->cname_len)
961       continue;
962
963     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
964       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
965           client->cname);
966       result = client;
967       break;
968     }
969   }
970
971   /* nothing found, create one */
972   if (result == NULL) {
973     result = g_new0 (GstRtpBinClient, 1);
974     result->cname = g_strndup ((gchar *) data, len);
975     result->cname_len = len;
976     bin->clients = g_slist_prepend (bin->clients, result);
977     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
978         result->cname);
979   }
980   return result;
981 }
982
983 static void
984 free_client (GstRtpBinClient * client, GstRtpBin * bin)
985 {
986   GST_DEBUG_OBJECT (bin, "freeing client %p", client);
987   g_slist_free (client->streams);
988   g_free (client->cname);
989   g_free (client);
990 }
991
992 static void
993 get_current_times (GstRtpBin * bin, GstClockTime * running_time,
994     guint64 * ntpnstime)
995 {
996   guint64 ntpns;
997   GstClock *clock;
998   GstClockTime base_time, rt, clock_time;
999
1000   GST_OBJECT_LOCK (bin);
1001   if ((clock = GST_ELEMENT_CLOCK (bin))) {
1002     base_time = GST_ELEMENT_CAST (bin)->base_time;
1003     gst_object_ref (clock);
1004     GST_OBJECT_UNLOCK (bin);
1005
1006     clock_time = gst_clock_get_time (clock);
1007
1008     if (bin->use_pipeline_clock) {
1009       ntpns = clock_time - base_time;
1010     } else {
1011       GTimeVal current;
1012
1013       /* get current NTP time */
1014       g_get_current_time (&current);
1015       ntpns = GST_TIMEVAL_TO_TIME (current);
1016     }
1017
1018     /* add constant to convert from 1970 based time to 1900 based time */
1019     ntpns += (2208988800LL * GST_SECOND);
1020
1021     /* get current clock time and convert to running time */
1022     rt = clock_time - base_time;
1023
1024     gst_object_unref (clock);
1025   } else {
1026     GST_OBJECT_UNLOCK (bin);
1027     rt = -1;
1028     ntpns = -1;
1029   }
1030   if (running_time)
1031     *running_time = rt;
1032   if (ntpnstime)
1033     *ntpnstime = ntpns;
1034 }
1035
1036 static void
1037 stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
1038     gint64 ts_offset, gboolean check)
1039 {
1040   gint64 prev_ts_offset;
1041
1042   g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
1043
1044   /* delta changed, see how much */
1045   if (prev_ts_offset != ts_offset) {
1046     gint64 diff;
1047
1048     diff = prev_ts_offset - ts_offset;
1049
1050     GST_DEBUG_OBJECT (bin,
1051         "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
1052         ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff);
1053
1054     if (check) {
1055       /* only change diff when it changed more than 4 milliseconds. This
1056        * compensates for rounding errors in NTP to RTP timestamp
1057        * conversions */
1058       if (ABS (diff) < 4 * GST_MSECOND) {
1059         GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
1060         return;
1061       }
1062       if (ABS (diff) > (3 * GST_SECOND)) {
1063         GST_WARNING_OBJECT (bin, "offset unusually large, ignoring");
1064         return;
1065       }
1066     }
1067     g_object_set (stream->buffer, "ts-offset", ts_offset, NULL);
1068   }
1069   GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
1070       stream->ssrc, ts_offset);
1071 }
1072
1073 static void
1074 gst_rtp_bin_send_sync_event (GstRtpBinStream * stream)
1075 {
1076   if (stream->bin->send_sync_event) {
1077     GstEvent *event;
1078     GstPad *srcpad;
1079
1080     GST_DEBUG_OBJECT (stream->bin,
1081         "sending GstRTCPSRReceived event downstream");
1082
1083     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1084         gst_structure_new_empty ("GstRTCPSRReceived"));
1085
1086     srcpad = gst_element_get_static_pad (stream->buffer, "src");
1087     gst_pad_push_event (srcpad, event);
1088     gst_object_unref (srcpad);
1089   }
1090 }
1091
1092 /* associate a stream to the given CNAME. This will make sure all streams for
1093  * that CNAME are synchronized together.
1094  * Must be called with GST_RTP_BIN_LOCK */
1095 static void
1096 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
1097     guint8 * data, guint64 ntptime, guint64 last_extrtptime,
1098     guint64 base_rtptime, guint64 base_time, guint clock_rate,
1099     gint64 rtp_clock_base)
1100 {
1101   GstRtpBinClient *client;
1102   gboolean created;
1103   GSList *walk;
1104   guint64 local_rt;
1105   guint64 local_rtp;
1106   GstClockTime running_time;
1107   guint64 ntpnstime;
1108   gint64 ntpdiff, rtdiff;
1109   guint64 last_unix;
1110
1111   /* first find or create the CNAME */
1112   client = get_client (bin, len, data, &created);
1113
1114   /* find stream in the client */
1115   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1116     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1117
1118     if (ostream == stream)
1119       break;
1120   }
1121   /* not found, add it to the list */
1122   if (walk == NULL) {
1123     GST_DEBUG_OBJECT (bin,
1124         "new association of SSRC %08x with client %p with CNAME %s",
1125         stream->ssrc, client, client->cname);
1126     client->streams = g_slist_prepend (client->streams, stream);
1127     client->nstreams++;
1128   } else {
1129     GST_DEBUG_OBJECT (bin,
1130         "found association of SSRC %08x with client %p with CNAME %s",
1131         stream->ssrc, client, client->cname);
1132   }
1133
1134   if (!GST_CLOCK_TIME_IS_VALID (last_extrtptime)) {
1135     GST_DEBUG_OBJECT (bin, "invalidated sync data");
1136     if (bin->rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1137       /* we don't need that data, so carry on,
1138        * but make some values look saner */
1139       last_extrtptime = base_rtptime;
1140     } else {
1141       /* nothing we can do with this data in this case */
1142       GST_DEBUG_OBJECT (bin, "bailing out");
1143       return;
1144     }
1145   }
1146
1147   /* Take the extended rtptime we found in the SR packet and map it to the
1148    * local rtptime. The local rtp time is used to construct timestamps on the
1149    * buffers so we will calculate what running_time corresponds to the RTP
1150    * timestamp in the SR packet. */
1151   local_rtp = last_extrtptime - base_rtptime;
1152
1153   GST_DEBUG_OBJECT (bin,
1154       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
1155       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, "
1156       "clock-base %" G_GINT64_FORMAT, base_rtptime,
1157       last_extrtptime, local_rtp, clock_rate, rtp_clock_base);
1158
1159   /* calculate local RTP time in gstreamer timestamp, we essentially perform the
1160    * same conversion that a jitterbuffer would use to convert an rtp timestamp
1161    * into a corresponding gstreamer timestamp. Note that the base_time also
1162    * contains the drift between sender and receiver. */
1163   local_rt = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate);
1164   local_rt += base_time;
1165
1166   /* convert ntptime to unix time since 1900 */
1167   last_unix = gst_util_uint64_scale (ntptime, GST_SECOND,
1168       (G_GINT64_CONSTANT (1) << 32));
1169
1170   stream->have_sync = TRUE;
1171
1172   GST_DEBUG_OBJECT (bin,
1173       "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT,
1174       local_rt, last_unix);
1175
1176   /* recalc inter stream playout offset, but only if there is more than one
1177    * stream or we're doing NTP sync. */
1178   if (bin->ntp_sync) {
1179     /* For NTP sync we need to first get a snapshot of running_time and NTP
1180      * time. We know at what running_time we play a certain RTP time, we also
1181      * calculated when we would play the RTP time in the SR packet. Now we need
1182      * to know how the running_time and the NTP time relate to eachother. */
1183     get_current_times (bin, &running_time, &ntpnstime);
1184
1185     /* see how far away the NTP time is. This is the difference between the
1186      * current NTP time and the NTP time in the last SR packet. */
1187     ntpdiff = ntpnstime - last_unix;
1188     /* see how far away the running_time is. This is the difference between the
1189      * current running_time and the running_time of the RTP timestamp in the
1190      * last SR packet. */
1191     rtdiff = running_time - local_rt;
1192
1193     GST_DEBUG_OBJECT (bin,
1194         "NTP time %" G_GUINT64_FORMAT ", last unix %" G_GUINT64_FORMAT,
1195         ntpnstime, last_unix);
1196     GST_DEBUG_OBJECT (bin,
1197         "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
1198         rtdiff);
1199
1200     /* combine to get the final diff to apply to the running_time */
1201     stream->rt_delta = rtdiff - ntpdiff;
1202
1203     stream_set_ts_offset (bin, stream, stream->rt_delta, FALSE);
1204   } else {
1205     gint64 min, rtp_min, clock_base = stream->clock_base;
1206     gboolean all_sync, use_rtp;
1207     gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
1208
1209     /* calculate delta between server and receiver. last_unix is created by
1210      * converting the ntptime in the last SR packet to a gstreamer timestamp. This
1211      * delta expresses the difference to our timeline and the server timeline. The
1212      * difference in itself doesn't mean much but we can combine the delta of
1213      * multiple streams to create a stream specific offset. */
1214     stream->rt_delta = last_unix - local_rt;
1215
1216     /* calculate the min of all deltas, ignoring streams that did not yet have a
1217      * valid rt_delta because we did not yet receive an SR packet for those
1218      * streams.
1219      * We calculate the mininum because we would like to only apply positive
1220      * offsets to streams, delaying their playback instead of trying to speed up
1221      * other streams (which might be imposible when we have to create negative
1222      * latencies).
1223      * The stream that has the smallest diff is selected as the reference stream,
1224      * all other streams will have a positive offset to this difference. */
1225
1226     /* some alternative setting allow ignoring RTCP as much as possible,
1227      * for servers generating bogus ntp timeline */
1228     min = rtp_min = G_MAXINT64;
1229     use_rtp = FALSE;
1230     if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1231       guint64 ext_base;
1232
1233       use_rtp = TRUE;
1234       /* signed version for convienience */
1235       clock_base = base_rtptime;
1236       /* deal with possible wrap-around */
1237       ext_base = base_rtptime;
1238       rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
1239       /* sanity check; base rtp and provided clock_base should be close */
1240       if (rtp_clock_base >= clock_base) {
1241         if (rtp_clock_base - clock_base < 10 * clock_rate) {
1242           rtp_clock_base = base_time +
1243               gst_util_uint64_scale_int (rtp_clock_base - clock_base,
1244               GST_SECOND, clock_rate);
1245         } else {
1246           use_rtp = FALSE;
1247         }
1248       } else {
1249         if (clock_base - rtp_clock_base < 10 * clock_rate) {
1250           rtp_clock_base = base_time -
1251               gst_util_uint64_scale_int (clock_base - rtp_clock_base,
1252               GST_SECOND, clock_rate);
1253         } else {
1254           use_rtp = FALSE;
1255         }
1256       }
1257       /* warn and bail for clarity out if no sane values */
1258       if (!use_rtp) {
1259         GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
1260         return;
1261       }
1262       /* store to track changes */
1263       clock_base = rtp_clock_base;
1264       /* generate a fake as before,
1265        * now equating rtptime obtained from RTP-Info,
1266        * where the large time represent the otherwise irrelevant npt/ntp time */
1267       stream->rtp_delta = (GST_SECOND << 28) - rtp_clock_base;
1268     } else {
1269       clock_base = rtp_clock_base;
1270     }
1271
1272     all_sync = TRUE;
1273     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1274       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1275
1276       if (!ostream->have_sync) {
1277         all_sync = FALSE;
1278         continue;
1279       }
1280
1281       /* change in current stream's base from previously init'ed value
1282        * leads to reset of all stream's base */
1283       if (stream != ostream && stream->clock_base >= 0 &&
1284           (stream->clock_base != clock_base)) {
1285         GST_DEBUG_OBJECT (bin, "reset upon clock base change");
1286         ostream->clock_base = -100 * GST_SECOND;
1287         ostream->rtp_delta = 0;
1288       }
1289
1290       if (ostream->rt_delta < min)
1291         min = ostream->rt_delta;
1292       if (ostream->rtp_delta < rtp_min)
1293         rtp_min = ostream->rtp_delta;
1294     }
1295
1296     /* arrange to re-sync for each stream upon significant change,
1297      * e.g. post-seek */
1298     all_sync = all_sync && (stream->clock_base == clock_base);
1299     stream->clock_base = clock_base;
1300
1301     /* may need init performed above later on, but nothing more to do now */
1302     if (client->nstreams <= 1)
1303       return;
1304
1305     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
1306         " all sync %d", client, min, all_sync);
1307     GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
1308
1309     switch (rtcp_sync) {
1310       case GST_RTP_BIN_RTCP_SYNC_RTP:
1311         if (!use_rtp)
1312           break;
1313         GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
1314             "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
1315         /* fall-through */
1316       case GST_RTP_BIN_RTCP_SYNC_INITIAL:
1317         /* if all have been synced already, do not bother further */
1318         if (all_sync) {
1319           GST_DEBUG_OBJECT (bin, "all streams already synced; done");
1320           return;
1321         }
1322         break;
1323       default:
1324         break;
1325     }
1326
1327     /* bail out if we adjusted recently enough */
1328     if (all_sync && (last_unix - bin->priv->last_unix) <
1329         bin->rtcp_sync_interval * GST_MSECOND) {
1330       GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
1331           "previous sender info too recent "
1332           "(previous UNIX %" G_GUINT64_FORMAT ")", bin->priv->last_unix);
1333       return;
1334     }
1335     bin->priv->last_unix = last_unix;
1336
1337     /* calculate offsets for each stream */
1338     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1339       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1340       gint64 ts_offset;
1341
1342       /* ignore streams for which we didn't receive an SR packet yet, we
1343        * can't synchronize them yet. We can however sync other streams just
1344        * fine. */
1345       if (!ostream->have_sync)
1346         continue;
1347
1348       /* calculate offset to our reference stream, this should always give a
1349        * positive number. */
1350       if (use_rtp)
1351         ts_offset = ostream->rtp_delta - rtp_min;
1352       else
1353         ts_offset = ostream->rt_delta - min;
1354
1355       stream_set_ts_offset (bin, ostream, ts_offset, TRUE);
1356     }
1357   }
1358   gst_rtp_bin_send_sync_event (stream);
1359
1360   return;
1361 }
1362
1363 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
1364   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
1365           (b) = gst_rtcp_packet_move_to_next ((packet)))
1366
1367 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
1368   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
1369           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
1370
1371 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
1372   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
1373           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
1374
1375 static void
1376 gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
1377     GstRtpBinStream * stream)
1378 {
1379   GstRtpBin *bin;
1380   GstRTCPPacket packet;
1381   guint32 ssrc;
1382   guint64 ntptime;
1383   gboolean have_sr, have_sdes;
1384   gboolean more;
1385   guint64 base_rtptime;
1386   guint64 base_time;
1387   guint clock_rate;
1388   guint64 clock_base;
1389   guint64 extrtptime;
1390   GstBuffer *buffer;
1391   GstRTCPBuffer rtcp = { NULL, };
1392
1393   bin = stream->bin;
1394
1395   GST_DEBUG_OBJECT (bin, "sync handler called");
1396
1397   /* get the last relation between the rtp timestamps and the gstreamer
1398    * timestamps. We get this info directly from the jitterbuffer which
1399    * constructs gstreamer timestamps from rtp timestamps and so it know exactly
1400    * what the current situation is. */
1401   base_rtptime =
1402       g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
1403   base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
1404   clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
1405   clock_base = g_value_get_uint64 (gst_structure_get_value (s, "clock-base"));
1406   extrtptime =
1407       g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
1408   buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
1409
1410   have_sr = FALSE;
1411   have_sdes = FALSE;
1412
1413   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
1414
1415   GST_RTCP_BUFFER_FOR_PACKETS (more, &rtcp, &packet) {
1416     /* first packet must be SR or RR or else the validate would have failed */
1417     switch (gst_rtcp_packet_get_type (&packet)) {
1418       case GST_RTCP_TYPE_SR:
1419         /* only parse first. There is only supposed to be one SR in the packet
1420          * but we will deal with malformed packets gracefully */
1421         if (have_sr)
1422           break;
1423         /* get NTP and RTP times */
1424         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL,
1425             NULL, NULL);
1426
1427         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
1428         /* ignore SR that is not ours */
1429         if (ssrc != stream->ssrc)
1430           continue;
1431
1432         have_sr = TRUE;
1433         break;
1434       case GST_RTCP_TYPE_SDES:
1435       {
1436         gboolean more_items, more_entries;
1437
1438         /* only deal with first SDES, there is only supposed to be one SDES in
1439          * the RTCP packet but we deal with bad packets gracefully. Also bail
1440          * out if we have not seen an SR item yet. */
1441         if (have_sdes || !have_sr)
1442           break;
1443
1444         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
1445           /* skip items that are not about the SSRC of the sender */
1446           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
1447             continue;
1448
1449           /* find the CNAME entry */
1450           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
1451             GstRTCPSDESType type;
1452             guint8 len;
1453             guint8 *data;
1454
1455             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
1456
1457             if (type == GST_RTCP_SDES_CNAME) {
1458               GST_RTP_BIN_LOCK (bin);
1459               /* associate the stream to CNAME */
1460               gst_rtp_bin_associate (bin, stream, len, data,
1461                   ntptime, extrtptime, base_rtptime, base_time, clock_rate,
1462                   clock_base);
1463               GST_RTP_BIN_UNLOCK (bin);
1464             }
1465           }
1466         }
1467         have_sdes = TRUE;
1468         break;
1469       }
1470       default:
1471         /* we can ignore these packets */
1472         break;
1473     }
1474   }
1475   gst_rtcp_buffer_unmap (&rtcp);
1476 }
1477
1478 /* create a new stream with @ssrc in @session. Must be called with
1479  * RTP_SESSION_LOCK. */
1480 static GstRtpBinStream *
1481 create_stream (GstRtpBinSession * session, guint32 ssrc)
1482 {
1483   GstElement *buffer, *demux = NULL;
1484   GstRtpBinStream *stream;
1485   GstRtpBin *rtpbin;
1486   GstState target;
1487
1488   rtpbin = session->bin;
1489
1490   if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL)))
1491     goto no_jitterbuffer;
1492
1493   if (!rtpbin->ignore_pt)
1494     if (!(demux = gst_element_factory_make ("rtpptdemux", NULL)))
1495       goto no_demux;
1496
1497   stream = g_new0 (GstRtpBinStream, 1);
1498   stream->ssrc = ssrc;
1499   stream->bin = rtpbin;
1500   stream->session = session;
1501   stream->buffer = buffer;
1502   stream->demux = demux;
1503
1504   stream->have_sync = FALSE;
1505   stream->rt_delta = 0;
1506   stream->rtp_delta = 0;
1507   stream->percent = 100;
1508   stream->clock_base = -100 * GST_SECOND;
1509   session->streams = g_slist_prepend (session->streams, stream);
1510
1511   /* provide clock_rate to the jitterbuffer when needed */
1512   stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
1513       (GCallback) pt_map_requested, session);
1514   stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
1515       (GCallback) on_npt_stop, stream);
1516
1517   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.session", session);
1518   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.stream", stream);
1519
1520   /* configure latency and packet lost */
1521   g_object_set (buffer, "latency", rtpbin->latency_ms, NULL);
1522   g_object_set (buffer, "drop-on-latency", rtpbin->drop_on_latency, NULL);
1523   g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
1524   g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL);
1525   g_object_set (buffer, "do-retransmission", rtpbin->do_retransmission, NULL);
1526
1527   g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER], 0,
1528       buffer, session->id, ssrc);
1529
1530   if (!rtpbin->ignore_pt)
1531     gst_bin_add (GST_BIN_CAST (rtpbin), demux);
1532   gst_bin_add (GST_BIN_CAST (rtpbin), buffer);
1533
1534   /* link stuff */
1535   if (demux)
1536     gst_element_link_pads_full (buffer, "src", demux, "sink",
1537         GST_PAD_LINK_CHECK_NOTHING);
1538
1539   if (rtpbin->buffering) {
1540     guint64 last_out;
1541
1542     GST_INFO_OBJECT (rtpbin,
1543         "bin is buffering, set jitterbuffer as not active");
1544     g_signal_emit_by_name (buffer, "set-active", FALSE, (gint64) 0, &last_out);
1545   }
1546
1547
1548   GST_OBJECT_LOCK (rtpbin);
1549   target = GST_STATE_TARGET (rtpbin);
1550   GST_OBJECT_UNLOCK (rtpbin);
1551
1552   /* from sink to source */
1553   if (demux)
1554     gst_element_set_state (demux, target);
1555
1556   gst_element_set_state (buffer, target);
1557
1558   return stream;
1559
1560   /* ERRORS */
1561 no_jitterbuffer:
1562   {
1563     g_warning ("rtpbin: could not create rtpjitterbuffer element");
1564     return NULL;
1565   }
1566 no_demux:
1567   {
1568     gst_object_unref (buffer);
1569     g_warning ("rtpbin: could not create rtpptdemux element");
1570     return NULL;
1571   }
1572 }
1573
1574 /* called with RTP_BIN_LOCK */
1575 static void
1576 free_stream (GstRtpBinStream * stream, GstRtpBin * bin)
1577 {
1578   GSList *clients, *next_client;
1579
1580   GST_DEBUG_OBJECT (bin, "freeing stream %p", stream);
1581
1582   if (stream->demux) {
1583     g_signal_handler_disconnect (stream->demux, stream->demux_newpad_sig);
1584     g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig);
1585     g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig);
1586   }
1587   g_signal_handler_disconnect (stream->buffer, stream->buffer_handlesync_sig);
1588   g_signal_handler_disconnect (stream->buffer, stream->buffer_ptreq_sig);
1589   g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig);
1590
1591   if (stream->demux)
1592     gst_element_set_locked_state (stream->demux, TRUE);
1593   gst_element_set_locked_state (stream->buffer, TRUE);
1594
1595   if (stream->demux)
1596     gst_element_set_state (stream->demux, GST_STATE_NULL);
1597   gst_element_set_state (stream->buffer, GST_STATE_NULL);
1598
1599   /* now remove this signal, we need this while going to NULL because it to
1600    * do some cleanups */
1601   if (stream->demux)
1602     g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
1603
1604   gst_bin_remove (GST_BIN_CAST (bin), stream->buffer);
1605   if (stream->demux)
1606     gst_bin_remove (GST_BIN_CAST (bin), stream->demux);
1607
1608   for (clients = bin->clients; clients; clients = next_client) {
1609     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
1610     GSList *streams, *next_stream;
1611
1612     next_client = g_slist_next (clients);
1613
1614     for (streams = client->streams; streams; streams = next_stream) {
1615       GstRtpBinStream *ostream = (GstRtpBinStream *) streams->data;
1616
1617       next_stream = g_slist_next (streams);
1618
1619       if (ostream == stream) {
1620         client->streams = g_slist_delete_link (client->streams, streams);
1621         /* If this was the last stream belonging to this client,
1622          * clean up the client. */
1623         if (--client->nstreams == 0) {
1624           bin->clients = g_slist_delete_link (bin->clients, clients);
1625           free_client (client, bin);
1626           break;
1627         }
1628       }
1629     }
1630   }
1631   g_free (stream);
1632 }
1633
1634 /* GObject vmethods */
1635 static void gst_rtp_bin_dispose (GObject * object);
1636 static void gst_rtp_bin_finalize (GObject * object);
1637 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
1638     const GValue * value, GParamSpec * pspec);
1639 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
1640     GValue * value, GParamSpec * pspec);
1641
1642 /* GstElement vmethods */
1643 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
1644     GstStateChange transition);
1645 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
1646     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
1647 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
1648 static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message);
1649
1650 #define gst_rtp_bin_parent_class parent_class
1651 G_DEFINE_TYPE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN);
1652
1653 static gboolean
1654 _gst_element_accumulator (GSignalInvocationHint * ihint,
1655     GValue * return_accu, const GValue * handler_return, gpointer dummy)
1656 {
1657   GstElement *element;
1658
1659   element = g_value_get_object (handler_return);
1660   GST_DEBUG ("got element %" GST_PTR_FORMAT, element);
1661
1662   if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
1663     g_value_set_object (return_accu, element);
1664
1665   /* stop emission if we have an element */
1666   return (element == NULL);
1667 }
1668
1669 static void
1670 gst_rtp_bin_class_init (GstRtpBinClass * klass)
1671 {
1672   GObjectClass *gobject_class;
1673   GstElementClass *gstelement_class;
1674   GstBinClass *gstbin_class;
1675
1676   gobject_class = (GObjectClass *) klass;
1677   gstelement_class = (GstElementClass *) klass;
1678   gstbin_class = (GstBinClass *) klass;
1679
1680   g_type_class_add_private (klass, sizeof (GstRtpBinPrivate));
1681
1682   gobject_class->dispose = gst_rtp_bin_dispose;
1683   gobject_class->finalize = gst_rtp_bin_finalize;
1684   gobject_class->set_property = gst_rtp_bin_set_property;
1685   gobject_class->get_property = gst_rtp_bin_get_property;
1686
1687   g_object_class_install_property (gobject_class, PROP_LATENCY,
1688       g_param_spec_uint ("latency", "Buffer latency in ms",
1689           "Default amount of ms to buffer in the jitterbuffers", 0,
1690           G_MAXUINT, DEFAULT_LATENCY_MS,
1691           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1692
1693   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
1694       g_param_spec_boolean ("drop-on-latency",
1695           "Drop buffers when maximum latency is reached",
1696           "Tells the jitterbuffer to never exceed the given latency in size",
1697           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1698
1699   /**
1700    * GstRtpBin::request-pt-map:
1701    * @rtpbin: the object which received the signal
1702    * @session: the session
1703    * @pt: the pt
1704    *
1705    * Request the payload type as #GstCaps for @pt in @session.
1706    */
1707   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
1708       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
1709       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
1710       NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_CAPS, 2, G_TYPE_UINT,
1711       G_TYPE_UINT);
1712
1713     /**
1714    * GstRtpBin::payload-type-change:
1715    * @rtpbin: the object which received the signal
1716    * @session: the session
1717    * @pt: the pt
1718    *
1719    * Signal that the current payload type changed to @pt in @session.
1720    */
1721   gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] =
1722       g_signal_new ("payload-type-change", G_TYPE_FROM_CLASS (klass),
1723       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, payload_type_change),
1724       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1725       G_TYPE_UINT);
1726
1727   /**
1728    * GstRtpBin::clear-pt-map:
1729    * @rtpbin: the object which received the signal
1730    *
1731    * Clear all previously cached pt-mapping obtained with
1732    * #GstRtpBin::request-pt-map.
1733    */
1734   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
1735       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
1736       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1737           clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1738       0, G_TYPE_NONE);
1739
1740   /**
1741    * GstRtpBin::reset-sync:
1742    * @rtpbin: the object which received the signal
1743    *
1744    * Reset all currently configured lip-sync parameters and require new SR
1745    * packets for all streams before lip-sync is attempted again.
1746    */
1747   gst_rtp_bin_signals[SIGNAL_RESET_SYNC] =
1748       g_signal_new ("reset-sync", G_TYPE_FROM_CLASS (klass),
1749       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1750           reset_sync), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1751       0, G_TYPE_NONE);
1752
1753   /**
1754    * GstRtpBin::get-internal-session:
1755    * @rtpbin: the object which received the signal
1756    * @id: the session id
1757    *
1758    * Request the internal RTPSession object as #GObject in session @id.
1759    */
1760   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_SESSION] =
1761       g_signal_new ("get-internal-session", G_TYPE_FROM_CLASS (klass),
1762       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1763           get_internal_session), NULL, NULL, g_cclosure_marshal_generic,
1764       RTP_TYPE_SESSION, 1, G_TYPE_UINT);
1765
1766   /**
1767    * GstRtpBin::on-new-ssrc:
1768    * @rtpbin: the object which received the signal
1769    * @session: the session
1770    * @ssrc: the SSRC
1771    *
1772    * Notify of a new SSRC that entered @session.
1773    */
1774   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
1775       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
1776       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
1777       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1778       G_TYPE_UINT);
1779   /**
1780    * GstRtpBin::on-ssrc-collision:
1781    * @rtpbin: the object which received the signal
1782    * @session: the session
1783    * @ssrc: the SSRC
1784    *
1785    * Notify when we have an SSRC collision
1786    */
1787   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
1788       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
1789       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
1790       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1791       G_TYPE_UINT);
1792   /**
1793    * GstRtpBin::on-ssrc-validated:
1794    * @rtpbin: the object which received the signal
1795    * @session: the session
1796    * @ssrc: the SSRC
1797    *
1798    * Notify of a new SSRC that became validated.
1799    */
1800   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
1801       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
1802       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
1803       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1804       G_TYPE_UINT);
1805   /**
1806    * GstRtpBin::on-ssrc-active:
1807    * @rtpbin: the object which received the signal
1808    * @session: the session
1809    * @ssrc: the SSRC
1810    *
1811    * Notify of a SSRC that is active, i.e., sending RTCP.
1812    */
1813   gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE] =
1814       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
1815       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_active),
1816       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1817       G_TYPE_UINT);
1818   /**
1819    * GstRtpBin::on-ssrc-sdes:
1820    * @rtpbin: the object which received the signal
1821    * @session: the session
1822    * @ssrc: the SSRC
1823    *
1824    * Notify of a SSRC that is active, i.e., sending RTCP.
1825    */
1826   gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES] =
1827       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
1828       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_sdes),
1829       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1830       G_TYPE_UINT);
1831
1832   /**
1833    * GstRtpBin::on-bye-ssrc:
1834    * @rtpbin: the object which received the signal
1835    * @session: the session
1836    * @ssrc: the SSRC
1837    *
1838    * Notify of an SSRC that became inactive because of a BYE packet.
1839    */
1840   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
1841       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
1842       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
1843       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1844       G_TYPE_UINT);
1845   /**
1846    * GstRtpBin::on-bye-timeout:
1847    * @rtpbin: the object which received the signal
1848    * @session: the session
1849    * @ssrc: the SSRC
1850    *
1851    * Notify of an SSRC that has timed out because of BYE
1852    */
1853   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
1854       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
1855       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
1856       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1857       G_TYPE_UINT);
1858   /**
1859    * GstRtpBin::on-timeout:
1860    * @rtpbin: the object which received the signal
1861    * @session: the session
1862    * @ssrc: the SSRC
1863    *
1864    * Notify of an SSRC that has timed out
1865    */
1866   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
1867       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
1868       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
1869       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1870       G_TYPE_UINT);
1871   /**
1872    * GstRtpBin::on-sender-timeout:
1873    * @rtpbin: the object which received the signal
1874    * @session: the session
1875    * @ssrc: the SSRC
1876    *
1877    * Notify of a sender SSRC that has timed out and became a receiver
1878    */
1879   gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] =
1880       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
1881       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout),
1882       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1883       G_TYPE_UINT);
1884
1885   /**
1886    * GstRtpBin::on-npt-stop:
1887    * @rtpbin: the object which received the signal
1888    * @session: the session
1889    * @ssrc: the SSRC
1890    *
1891    * Notify that SSRC sender has sent data up to the configured NPT stop time.
1892    */
1893   gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP] =
1894       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
1895       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_npt_stop),
1896       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1897       G_TYPE_UINT);
1898
1899   /**
1900    * GstRtpBin::request-rtp-encoder:
1901    * @rtpbin: the object which received the signal
1902    * @session: the session
1903    *
1904    * Request an RTP encoder element for the given @session. The encoder
1905    * element will be added to the bin if not previously added.
1906    *
1907    * If no handler is connected, no encoder will be used.
1908    *
1909    * Since: 1.4
1910    */
1911   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_ENCODER] =
1912       g_signal_new ("request-rtp-encoder", G_TYPE_FROM_CLASS (klass),
1913       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1914           request_rtp_encoder), _gst_element_accumulator, NULL,
1915       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
1916
1917   /**
1918    * GstRtpBin::request-rtp-decoder:
1919    * @rtpbin: the object which received the signal
1920    * @session: the session
1921    *
1922    * Request an RTP decoder element for the given @session. The decoder
1923    * element will be added to the bin if not previously added.
1924    *
1925    * If no handler is connected, no encoder will be used.
1926    *
1927    * Since: 1.4
1928    */
1929   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_DECODER] =
1930       g_signal_new ("request-rtp-decoder", G_TYPE_FROM_CLASS (klass),
1931       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1932           request_rtp_decoder), _gst_element_accumulator, NULL,
1933       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
1934
1935   /**
1936    * GstRtpBin::request-rtcp-encoder:
1937    * @rtpbin: the object which received the signal
1938    * @session: the session
1939    *
1940    * Request an RTCP encoder element for the given @session. The encoder
1941    * element will be added to the bin if not previously added.
1942    *
1943    * If no handler is connected, no encoder will be used.
1944    *
1945    * Since: 1.4
1946    */
1947   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_ENCODER] =
1948       g_signal_new ("request-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
1949       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1950           request_rtcp_encoder), _gst_element_accumulator, NULL,
1951       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
1952
1953   /**
1954    * GstRtpBin::request-rtcp-decoder:
1955    * @rtpbin: the object which received the signal
1956    * @session: the session
1957    *
1958    * Request an RTCP decoder element for the given @session. The decoder
1959    * element will be added to the bin if not previously added.
1960    *
1961    * If no handler is connected, no encoder will be used.
1962    *
1963    * Since: 1.4
1964    */
1965   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_DECODER] =
1966       g_signal_new ("request-rtcp-decoder", G_TYPE_FROM_CLASS (klass),
1967       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1968           request_rtcp_decoder), _gst_element_accumulator, NULL,
1969       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
1970
1971   /**
1972    * GstRtpBin::new-jitterbuffer:
1973    * @rtpbin: the object which received the signal
1974    * @jitterbuffer: the new jitterbuffer
1975    * @session: the session
1976    * @ssrc: the SSRC
1977    *
1978    * Notify that a new @jitterbuffer was created for @session and @ssrc.
1979    * This signal can, for example, be used to configure @jitterbuffer.
1980    *
1981    * Since: 1.4
1982    */
1983   gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER] =
1984       g_signal_new ("new-jitterbuffer", G_TYPE_FROM_CLASS (klass),
1985       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1986           new_jitterbuffer), NULL, NULL, g_cclosure_marshal_generic,
1987       G_TYPE_NONE, 3, GST_TYPE_ELEMENT, G_TYPE_UINT, G_TYPE_UINT);
1988
1989   g_object_class_install_property (gobject_class, PROP_SDES,
1990       g_param_spec_boxed ("sdes", "SDES",
1991           "The SDES items of this session",
1992           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1993
1994   g_object_class_install_property (gobject_class, PROP_DO_LOST,
1995       g_param_spec_boolean ("do-lost", "Do Lost",
1996           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
1997           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1998
1999   g_object_class_install_property (gobject_class, PROP_AUTOREMOVE,
2000       g_param_spec_boolean ("autoremove", "Auto Remove",
2001           "Automatically remove timed out sources", DEFAULT_AUTOREMOVE,
2002           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2003
2004   g_object_class_install_property (gobject_class, PROP_IGNORE_PT,
2005       g_param_spec_boolean ("ignore-pt", "Ignore PT",
2006           "Do not demultiplex based on PT values", DEFAULT_IGNORE_PT,
2007           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2008
2009   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
2010       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
2011           "Use the pipeline running-time to set the NTP time in the RTCP SR messages",
2012           DEFAULT_USE_PIPELINE_CLOCK,
2013           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2014   /**
2015    * GstRtpBin:buffer-mode:
2016    *
2017    * Control the buffering and timestamping mode used by the jitterbuffer.
2018    */
2019   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
2020       g_param_spec_enum ("buffer-mode", "Buffer Mode",
2021           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
2022           DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2023   /**
2024    * GstRtpBin:ntp-sync:
2025    *
2026    * Set the NTP time from the sender reports as the running-time on the
2027    * buffers. When both the sender and receiver have sychronized
2028    * running-time, i.e. when the clock and base-time is shared
2029    * between the receivers and the and the senders, this option can be
2030    * used to synchronize receivers on multiple machines.
2031    */
2032   g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
2033       g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
2034           "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
2035           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2036
2037   /**
2038    * GstRtpBin:rtcp-sync:
2039    *
2040    * If not synchronizing (directly) to the NTP clock, determines how to sync
2041    * the various streams.
2042    */
2043   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC,
2044       g_param_spec_enum ("rtcp-sync", "RTCP Sync",
2045           "Use of RTCP SR in synchronization", GST_RTP_BIN_RTCP_SYNC_TYPE,
2046           DEFAULT_RTCP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2047
2048   /**
2049    * GstRtpBin:rtcp-sync-interval:
2050    *
2051    * Determines how often to sync streams using RTCP data.
2052    */
2053   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_INTERVAL,
2054       g_param_spec_uint ("rtcp-sync-interval", "RTCP Sync Interval",
2055           "RTCP SR interval synchronization (ms) (0 = always)",
2056           0, G_MAXUINT, DEFAULT_RTCP_SYNC_INTERVAL,
2057           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2058
2059   g_object_class_install_property (gobject_class, PROP_DO_SYNC_EVENT,
2060       g_param_spec_boolean ("do-sync-event", "Do Sync Event",
2061           "Send event downstream when a stream is synchronized to the sender",
2062           DEFAULT_DO_SYNC_EVENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2063
2064   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
2065       g_param_spec_boolean ("do-retransmission", "Do retransmission",
2066           "Send an event downstream to request packet retransmission",
2067           DEFAULT_DO_RETRANSMISSION,
2068           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2069
2070   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
2071   gstelement_class->request_new_pad =
2072       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
2073   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
2074
2075   /* sink pads */
2076   gst_element_class_add_pad_template (gstelement_class,
2077       gst_static_pad_template_get (&rtpbin_recv_rtp_sink_template));
2078   gst_element_class_add_pad_template (gstelement_class,
2079       gst_static_pad_template_get (&rtpbin_recv_rtcp_sink_template));
2080   gst_element_class_add_pad_template (gstelement_class,
2081       gst_static_pad_template_get (&rtpbin_send_rtp_sink_template));
2082
2083   /* src pads */
2084   gst_element_class_add_pad_template (gstelement_class,
2085       gst_static_pad_template_get (&rtpbin_recv_rtp_src_template));
2086   gst_element_class_add_pad_template (gstelement_class,
2087       gst_static_pad_template_get (&rtpbin_send_rtcp_src_template));
2088   gst_element_class_add_pad_template (gstelement_class,
2089       gst_static_pad_template_get (&rtpbin_send_rtp_src_template));
2090
2091   gst_element_class_set_static_metadata (gstelement_class, "RTP Bin",
2092       "Filter/Network/RTP",
2093       "Real-Time Transport Protocol bin",
2094       "Wim Taymans <wim.taymans@gmail.com>");
2095
2096   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (gst_rtp_bin_handle_message);
2097
2098   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
2099   klass->reset_sync = GST_DEBUG_FUNCPTR (gst_rtp_bin_reset_sync);
2100   klass->get_internal_session =
2101       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_session);
2102   klass->request_rtp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2103   klass->request_rtp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2104   klass->request_rtcp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2105   klass->request_rtcp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2106
2107   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
2108 }
2109
2110 static void
2111 gst_rtp_bin_init (GstRtpBin * rtpbin)
2112 {
2113   gchar *cname;
2114
2115   rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
2116   g_mutex_init (&rtpbin->priv->bin_lock);
2117   g_mutex_init (&rtpbin->priv->dyn_lock);
2118
2119   rtpbin->latency_ms = DEFAULT_LATENCY_MS;
2120   rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND;
2121   rtpbin->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
2122   rtpbin->do_lost = DEFAULT_DO_LOST;
2123   rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
2124   rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
2125   rtpbin->rtcp_sync = DEFAULT_RTCP_SYNC;
2126   rtpbin->rtcp_sync_interval = DEFAULT_RTCP_SYNC_INTERVAL;
2127   rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
2128   rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
2129   rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
2130   rtpbin->send_sync_event = DEFAULT_DO_SYNC_EVENT;
2131   rtpbin->do_retransmission = DEFAULT_DO_RETRANSMISSION;
2132
2133   /* some default SDES entries */
2134   cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
2135   rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes",
2136       "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL);
2137   g_free (cname);
2138 }
2139
2140 static void
2141 gst_rtp_bin_dispose (GObject * object)
2142 {
2143   GstRtpBin *rtpbin;
2144
2145   rtpbin = GST_RTP_BIN (object);
2146
2147   GST_RTP_BIN_LOCK (rtpbin);
2148   GST_DEBUG_OBJECT (object, "freeing sessions");
2149   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, rtpbin);
2150   g_slist_free (rtpbin->sessions);
2151   rtpbin->sessions = NULL;
2152   GST_RTP_BIN_UNLOCK (rtpbin);
2153
2154   G_OBJECT_CLASS (parent_class)->dispose (object);
2155 }
2156
2157 static void
2158 gst_rtp_bin_finalize (GObject * object)
2159 {
2160   GstRtpBin *rtpbin;
2161
2162   rtpbin = GST_RTP_BIN (object);
2163
2164   if (rtpbin->sdes)
2165     gst_structure_free (rtpbin->sdes);
2166
2167   g_mutex_clear (&rtpbin->priv->bin_lock);
2168   g_mutex_clear (&rtpbin->priv->dyn_lock);
2169
2170   G_OBJECT_CLASS (parent_class)->finalize (object);
2171 }
2172
2173
2174 static void
2175 gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes)
2176 {
2177   GSList *item;
2178
2179   if (sdes == NULL)
2180     return;
2181
2182   GST_RTP_BIN_LOCK (bin);
2183
2184   GST_OBJECT_LOCK (bin);
2185   if (bin->sdes)
2186     gst_structure_free (bin->sdes);
2187   bin->sdes = gst_structure_copy (sdes);
2188   GST_OBJECT_UNLOCK (bin);
2189
2190   /* store in all sessions */
2191   for (item = bin->sessions; item; item = g_slist_next (item)) {
2192     GstRtpBinSession *session = item->data;
2193     g_object_set (session->session, "sdes", sdes, NULL);
2194   }
2195
2196   GST_RTP_BIN_UNLOCK (bin);
2197 }
2198
2199 static GstStructure *
2200 gst_rtp_bin_get_sdes_struct (GstRtpBin * bin)
2201 {
2202   GstStructure *result;
2203
2204   GST_OBJECT_LOCK (bin);
2205   result = gst_structure_copy (bin->sdes);
2206   GST_OBJECT_UNLOCK (bin);
2207
2208   return result;
2209 }
2210
2211 static void
2212 gst_rtp_bin_set_property (GObject * object, guint prop_id,
2213     const GValue * value, GParamSpec * pspec)
2214 {
2215   GstRtpBin *rtpbin;
2216
2217   rtpbin = GST_RTP_BIN (object);
2218
2219   switch (prop_id) {
2220     case PROP_LATENCY:
2221       GST_RTP_BIN_LOCK (rtpbin);
2222       rtpbin->latency_ms = g_value_get_uint (value);
2223       rtpbin->latency_ns = rtpbin->latency_ms * GST_MSECOND;
2224       GST_RTP_BIN_UNLOCK (rtpbin);
2225       /* propagate the property down to the jitterbuffer */
2226       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "latency", value);
2227       break;
2228     case PROP_DROP_ON_LATENCY:
2229       GST_RTP_BIN_LOCK (rtpbin);
2230       rtpbin->drop_on_latency = g_value_get_boolean (value);
2231       GST_RTP_BIN_UNLOCK (rtpbin);
2232       /* propagate the property down to the jitterbuffer */
2233       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2234           "drop-on-latency", value);
2235       break;
2236     case PROP_SDES:
2237       gst_rtp_bin_set_sdes_struct (rtpbin, g_value_get_boxed (value));
2238       break;
2239     case PROP_DO_LOST:
2240       GST_RTP_BIN_LOCK (rtpbin);
2241       rtpbin->do_lost = g_value_get_boolean (value);
2242       GST_RTP_BIN_UNLOCK (rtpbin);
2243       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
2244       break;
2245     case PROP_NTP_SYNC:
2246       rtpbin->ntp_sync = g_value_get_boolean (value);
2247       break;
2248     case PROP_RTCP_SYNC:
2249       g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value));
2250       break;
2251     case PROP_RTCP_SYNC_INTERVAL:
2252       rtpbin->rtcp_sync_interval = g_value_get_uint (value);
2253       break;
2254     case PROP_IGNORE_PT:
2255       rtpbin->ignore_pt = g_value_get_boolean (value);
2256       break;
2257     case PROP_AUTOREMOVE:
2258       rtpbin->priv->autoremove = g_value_get_boolean (value);
2259       break;
2260     case PROP_USE_PIPELINE_CLOCK:
2261     {
2262       GSList *sessions;
2263       GST_RTP_BIN_LOCK (rtpbin);
2264       rtpbin->use_pipeline_clock = g_value_get_boolean (value);
2265       for (sessions = rtpbin->sessions; sessions;
2266           sessions = g_slist_next (sessions)) {
2267         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2268
2269         g_object_set (G_OBJECT (session->session),
2270             "use-pipeline-clock", rtpbin->use_pipeline_clock, NULL);
2271       }
2272       GST_RTP_BIN_UNLOCK (rtpbin);
2273     }
2274       break;
2275     case PROP_DO_SYNC_EVENT:
2276       rtpbin->send_sync_event = g_value_get_boolean (value);
2277       break;
2278     case PROP_BUFFER_MODE:
2279       GST_RTP_BIN_LOCK (rtpbin);
2280       rtpbin->buffer_mode = g_value_get_enum (value);
2281       GST_RTP_BIN_UNLOCK (rtpbin);
2282       /* propagate the property down to the jitterbuffer */
2283       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "mode", value);
2284       break;
2285     case PROP_DO_RETRANSMISSION:
2286       GST_RTP_BIN_LOCK (rtpbin);
2287       rtpbin->do_retransmission = g_value_get_boolean (value);
2288       GST_RTP_BIN_UNLOCK (rtpbin);
2289       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2290           "do-retransmission", value);
2291       break;
2292     default:
2293       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2294       break;
2295   }
2296 }
2297
2298 static void
2299 gst_rtp_bin_get_property (GObject * object, guint prop_id,
2300     GValue * value, GParamSpec * pspec)
2301 {
2302   GstRtpBin *rtpbin;
2303
2304   rtpbin = GST_RTP_BIN (object);
2305
2306   switch (prop_id) {
2307     case PROP_LATENCY:
2308       GST_RTP_BIN_LOCK (rtpbin);
2309       g_value_set_uint (value, rtpbin->latency_ms);
2310       GST_RTP_BIN_UNLOCK (rtpbin);
2311       break;
2312     case PROP_DROP_ON_LATENCY:
2313       GST_RTP_BIN_LOCK (rtpbin);
2314       g_value_set_boolean (value, rtpbin->drop_on_latency);
2315       GST_RTP_BIN_UNLOCK (rtpbin);
2316       break;
2317     case PROP_SDES:
2318       g_value_take_boxed (value, gst_rtp_bin_get_sdes_struct (rtpbin));
2319       break;
2320     case PROP_DO_LOST:
2321       GST_RTP_BIN_LOCK (rtpbin);
2322       g_value_set_boolean (value, rtpbin->do_lost);
2323       GST_RTP_BIN_UNLOCK (rtpbin);
2324       break;
2325     case PROP_IGNORE_PT:
2326       g_value_set_boolean (value, rtpbin->ignore_pt);
2327       break;
2328     case PROP_NTP_SYNC:
2329       g_value_set_boolean (value, rtpbin->ntp_sync);
2330       break;
2331     case PROP_RTCP_SYNC:
2332       g_value_set_enum (value, g_atomic_int_get (&rtpbin->rtcp_sync));
2333       break;
2334     case PROP_RTCP_SYNC_INTERVAL:
2335       g_value_set_uint (value, rtpbin->rtcp_sync_interval);
2336       break;
2337     case PROP_AUTOREMOVE:
2338       g_value_set_boolean (value, rtpbin->priv->autoremove);
2339       break;
2340     case PROP_BUFFER_MODE:
2341       g_value_set_enum (value, rtpbin->buffer_mode);
2342       break;
2343     case PROP_USE_PIPELINE_CLOCK:
2344       g_value_set_boolean (value, rtpbin->use_pipeline_clock);
2345       break;
2346     case PROP_DO_SYNC_EVENT:
2347       g_value_set_boolean (value, rtpbin->send_sync_event);
2348       break;
2349     case PROP_DO_RETRANSMISSION:
2350       GST_RTP_BIN_LOCK (rtpbin);
2351       g_value_set_boolean (value, rtpbin->do_retransmission);
2352       GST_RTP_BIN_UNLOCK (rtpbin);
2353       break;
2354     default:
2355       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2356       break;
2357   }
2358 }
2359
2360 static void
2361 gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
2362 {
2363   GstRtpBin *rtpbin;
2364
2365   rtpbin = GST_RTP_BIN (bin);
2366
2367   switch (GST_MESSAGE_TYPE (message)) {
2368     case GST_MESSAGE_ELEMENT:
2369     {
2370       const GstStructure *s = gst_message_get_structure (message);
2371
2372       /* we change the structure name and add the session ID to it */
2373       if (gst_structure_has_name (s, "application/x-rtp-source-sdes")) {
2374         GstRtpBinSession *sess;
2375
2376         /* find the session we set it as object data */
2377         sess = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2378             "GstRTPBin.session");
2379
2380         if (G_LIKELY (sess)) {
2381           message = gst_message_make_writable (message);
2382           s = gst_message_get_structure (message);
2383           gst_structure_set ((GstStructure *) s, "session", G_TYPE_UINT,
2384               sess->id, NULL);
2385         }
2386       }
2387       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2388       break;
2389     }
2390     case GST_MESSAGE_BUFFERING:
2391     {
2392       gint percent;
2393       gint min_percent = 100;
2394       GSList *sessions, *streams;
2395       GstRtpBinStream *stream;
2396       gboolean change = FALSE, active = FALSE;
2397       GstClockTime min_out_time;
2398       GstBufferingMode mode;
2399       gint avg_in, avg_out;
2400       gint64 buffering_left;
2401
2402       gst_message_parse_buffering (message, &percent);
2403       gst_message_parse_buffering_stats (message, &mode, &avg_in, &avg_out,
2404           &buffering_left);
2405
2406       stream =
2407           g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2408           "GstRTPBin.stream");
2409
2410       GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
2411
2412       /* get the stream */
2413       if (G_LIKELY (stream)) {
2414         GST_RTP_BIN_LOCK (rtpbin);
2415         /* fill in the percent */
2416         stream->percent = percent;
2417
2418         /* calculate the min value for all streams */
2419         for (sessions = rtpbin->sessions; sessions;
2420             sessions = g_slist_next (sessions)) {
2421           GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2422
2423           GST_RTP_SESSION_LOCK (session);
2424           if (session->streams) {
2425             for (streams = session->streams; streams;
2426                 streams = g_slist_next (streams)) {
2427               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2428
2429               GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
2430                   stream->percent);
2431
2432               /* find min percent */
2433               if (min_percent > stream->percent)
2434                 min_percent = stream->percent;
2435             }
2436           } else {
2437             GST_INFO_OBJECT (bin,
2438                 "session has no streams, setting min_percent to 0");
2439             min_percent = 0;
2440           }
2441           GST_RTP_SESSION_UNLOCK (session);
2442         }
2443         GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
2444
2445         if (rtpbin->buffering) {
2446           if (min_percent == 100) {
2447             rtpbin->buffering = FALSE;
2448             active = TRUE;
2449             change = TRUE;
2450           }
2451         } else {
2452           if (min_percent < 100) {
2453             /* pause the streams */
2454             rtpbin->buffering = TRUE;
2455             active = FALSE;
2456             change = TRUE;
2457           }
2458         }
2459         GST_RTP_BIN_UNLOCK (rtpbin);
2460
2461         gst_message_unref (message);
2462
2463         /* make a new buffering message with the min value */
2464         message =
2465             gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
2466         gst_message_set_buffering_stats (message, mode, avg_in, avg_out,
2467             buffering_left);
2468
2469         if (G_UNLIKELY (change)) {
2470           GstClock *clock;
2471           guint64 running_time = 0;
2472           guint64 offset = 0;
2473
2474           /* figure out the running time when we have a clock */
2475           if (G_LIKELY ((clock =
2476                       gst_element_get_clock (GST_ELEMENT_CAST (bin))))) {
2477             guint64 now, base_time;
2478
2479             now = gst_clock_get_time (clock);
2480             base_time = gst_element_get_base_time (GST_ELEMENT_CAST (bin));
2481             running_time = now - base_time;
2482             gst_object_unref (clock);
2483           }
2484           GST_DEBUG_OBJECT (bin,
2485               "running time now %" GST_TIME_FORMAT,
2486               GST_TIME_ARGS (running_time));
2487
2488           GST_RTP_BIN_LOCK (rtpbin);
2489
2490           /* when we reactivate, calculate the offsets so that all streams have
2491            * an output time that is at least as big as the running_time */
2492           offset = 0;
2493           if (active) {
2494             if (running_time > rtpbin->buffer_start) {
2495               offset = running_time - rtpbin->buffer_start;
2496               if (offset >= rtpbin->latency_ns)
2497                 offset -= rtpbin->latency_ns;
2498               else
2499                 offset = 0;
2500             }
2501           }
2502
2503           /* pause all streams */
2504           min_out_time = -1;
2505           for (sessions = rtpbin->sessions; sessions;
2506               sessions = g_slist_next (sessions)) {
2507             GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2508
2509             GST_RTP_SESSION_LOCK (session);
2510             for (streams = session->streams; streams;
2511                 streams = g_slist_next (streams)) {
2512               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2513               GstElement *element = stream->buffer;
2514               guint64 last_out;
2515
2516               g_signal_emit_by_name (element, "set-active", active, offset,
2517                   &last_out);
2518
2519               if (!active) {
2520                 g_object_get (element, "percent", &stream->percent, NULL);
2521
2522                 if (last_out == -1)
2523                   last_out = 0;
2524                 if (min_out_time == -1 || last_out < min_out_time)
2525                   min_out_time = last_out;
2526               }
2527
2528               GST_DEBUG_OBJECT (bin,
2529                   "setting %p to %d, offset %" GST_TIME_FORMAT ", last %"
2530                   GST_TIME_FORMAT ", percent %d", element, active,
2531                   GST_TIME_ARGS (offset), GST_TIME_ARGS (last_out),
2532                   stream->percent);
2533             }
2534             GST_RTP_SESSION_UNLOCK (session);
2535           }
2536           GST_DEBUG_OBJECT (bin,
2537               "min out time %" GST_TIME_FORMAT, GST_TIME_ARGS (min_out_time));
2538
2539           /* the buffer_start is the min out time of all paused jitterbuffers */
2540           if (!active)
2541             rtpbin->buffer_start = min_out_time;
2542
2543           GST_RTP_BIN_UNLOCK (rtpbin);
2544         }
2545       }
2546       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2547       break;
2548     }
2549     default:
2550     {
2551       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2552       break;
2553     }
2554   }
2555 }
2556
2557 static GstStateChangeReturn
2558 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
2559 {
2560   GstStateChangeReturn res;
2561   GstRtpBin *rtpbin;
2562   GstRtpBinPrivate *priv;
2563
2564   rtpbin = GST_RTP_BIN (element);
2565   priv = rtpbin->priv;
2566
2567   switch (transition) {
2568     case GST_STATE_CHANGE_NULL_TO_READY:
2569       break;
2570     case GST_STATE_CHANGE_READY_TO_PAUSED:
2571       priv->last_unix = 0;
2572       GST_LOG_OBJECT (rtpbin, "clearing shutdown flag");
2573       g_atomic_int_set (&priv->shutdown, 0);
2574       break;
2575     case GST_STATE_CHANGE_PAUSED_TO_READY:
2576       GST_LOG_OBJECT (rtpbin, "setting shutdown flag");
2577       g_atomic_int_set (&priv->shutdown, 1);
2578       /* wait for all callbacks to end by taking the lock. No new callbacks will
2579        * be able to happen as we set the shutdown flag. */
2580       GST_RTP_BIN_DYN_LOCK (rtpbin);
2581       GST_LOG_OBJECT (rtpbin, "dynamic lock taken, we can continue shutdown");
2582       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
2583       break;
2584     default:
2585       break;
2586   }
2587
2588   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2589
2590   switch (transition) {
2591     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2592       break;
2593     case GST_STATE_CHANGE_PAUSED_TO_READY:
2594       break;
2595     case GST_STATE_CHANGE_READY_TO_NULL:
2596       break;
2597     default:
2598       break;
2599   }
2600   return res;
2601 }
2602
2603 static GstElement *
2604 session_request_encoder (GstRtpBinSession * session, guint signal)
2605 {
2606   GstElement *encoder = NULL;
2607   GstRtpBin *bin = session->bin;
2608
2609   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, &encoder);
2610
2611   if (encoder) {
2612     if (!bin_manage_element (bin, encoder))
2613       goto manage_failed;
2614     session->encoders = g_slist_prepend (session->encoders, encoder);
2615   }
2616   return encoder;
2617
2618   /* ERRORS */
2619 manage_failed:
2620   {
2621     GST_WARNING_OBJECT (bin, "unable to manage encoder");
2622     gst_object_unref (encoder);
2623     return NULL;
2624   }
2625 }
2626
2627 static GstElement *
2628 session_request_decoder (GstRtpBinSession * session, guint signal)
2629 {
2630   GstElement *decoder = NULL;
2631   GstRtpBin *bin = session->bin;
2632
2633   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, &decoder);
2634
2635   if (decoder) {
2636     if (!bin_manage_element (bin, decoder))
2637       goto manage_failed;
2638     session->decoders = g_slist_prepend (session->decoders, decoder);
2639   }
2640   return decoder;
2641
2642   /* ERRORS */
2643 manage_failed:
2644   {
2645     GST_WARNING_OBJECT (bin, "unable to manage decoder");
2646     gst_object_unref (decoder);
2647     return NULL;
2648   }
2649 }
2650
2651 /* a new pad (SSRC) was created in @session. This signal is emited from the
2652  * payload demuxer. */
2653 static void
2654 new_payload_found (GstElement * element, guint pt, GstPad * pad,
2655     GstRtpBinStream * stream)
2656 {
2657   GstRtpBin *rtpbin;
2658   GstElementClass *klass;
2659   GstPadTemplate *templ;
2660   gchar *padname;
2661   GstPad *gpad;
2662
2663   rtpbin = stream->bin;
2664
2665   GST_DEBUG ("new payload pad %d", pt);
2666
2667   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
2668
2669   /* ghost the pad to the parent */
2670   klass = GST_ELEMENT_GET_CLASS (rtpbin);
2671   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
2672   padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
2673       stream->session->id, stream->ssrc, pt);
2674   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
2675   g_free (padname);
2676   g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", gpad);
2677
2678   gst_pad_set_active (gpad, TRUE);
2679   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2680
2681   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2682
2683   return;
2684
2685 shutdown:
2686   {
2687     GST_DEBUG ("ignoring, we are shutting down");
2688     return;
2689   }
2690 }
2691
2692 static void
2693 payload_pad_removed (GstElement * element, GstPad * pad,
2694     GstRtpBinStream * stream)
2695 {
2696   GstRtpBin *rtpbin;
2697   GstPad *gpad;
2698
2699   rtpbin = stream->bin;
2700
2701   GST_DEBUG ("payload pad removed");
2702
2703   GST_RTP_BIN_DYN_LOCK (rtpbin);
2704   if ((gpad = g_object_get_data (G_OBJECT (pad), "GstRTPBin.ghostpad"))) {
2705     g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", NULL);
2706
2707     gst_pad_set_active (gpad, FALSE);
2708     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2709   }
2710   GST_RTP_BIN_DYN_UNLOCK (rtpbin);
2711 }
2712
2713 static GstCaps *
2714 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
2715 {
2716   GstRtpBin *rtpbin;
2717   GstCaps *caps;
2718
2719   rtpbin = session->bin;
2720
2721   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt,
2722       session->id);
2723
2724   caps = get_pt_map (session, pt);
2725   if (!caps)
2726     goto no_caps;
2727
2728   return caps;
2729
2730   /* ERRORS */
2731 no_caps:
2732   {
2733     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
2734     return NULL;
2735   }
2736 }
2737
2738 static void
2739 payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session)
2740 {
2741   GST_DEBUG_OBJECT (session->bin,
2742       "emiting signal for pt type changed to %d in session %d", pt,
2743       session->id);
2744
2745   g_signal_emit (session->bin, gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE],
2746       0, session->id, pt);
2747 }
2748
2749 /* emited when caps changed for the session */
2750 static void
2751 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
2752 {
2753   GstRtpBin *bin;
2754   GstCaps *caps;
2755   gint payload;
2756   const GstStructure *s;
2757
2758   bin = session->bin;
2759
2760   g_object_get (pad, "caps", &caps, NULL);
2761
2762   if (caps == NULL)
2763     return;
2764
2765   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
2766
2767   s = gst_caps_get_structure (caps, 0);
2768
2769   /* get payload, finish when it's not there */
2770   if (!gst_structure_get_int (s, "payload", &payload))
2771     return;
2772
2773   GST_RTP_SESSION_LOCK (session);
2774   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
2775   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
2776   GST_RTP_SESSION_UNLOCK (session);
2777 }
2778
2779 /* a new pad (SSRC) was created in @session */
2780 static void
2781 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
2782     GstRtpBinSession * session)
2783 {
2784   GstRtpBin *rtpbin;
2785   GstRtpBinStream *stream;
2786   GstPad *sinkpad, *srcpad;
2787   gchar *padname;
2788
2789   rtpbin = session->bin;
2790
2791   GST_DEBUG_OBJECT (rtpbin, "new SSRC pad %08x, %s:%s", ssrc,
2792       GST_DEBUG_PAD_NAME (pad));
2793
2794   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
2795
2796   GST_RTP_SESSION_LOCK (session);
2797
2798   /* create new stream */
2799   stream = create_stream (session, ssrc);
2800   if (!stream)
2801     goto no_stream;
2802
2803   /* get pad and link */
2804   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP");
2805   padname = g_strdup_printf ("src_%u", ssrc);
2806   srcpad = gst_element_get_static_pad (element, padname);
2807   g_free (padname);
2808   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
2809   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
2810   gst_object_unref (sinkpad);
2811   gst_object_unref (srcpad);
2812
2813   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
2814   padname = g_strdup_printf ("rtcp_src_%u", ssrc);
2815   srcpad = gst_element_get_static_pad (element, padname);
2816   g_free (padname);
2817   sinkpad = gst_element_get_request_pad (stream->buffer, "sink_rtcp");
2818   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
2819   gst_object_unref (sinkpad);
2820   gst_object_unref (srcpad);
2821
2822   /* connect to the RTCP sync signal from the jitterbuffer */
2823   GST_DEBUG_OBJECT (rtpbin, "connecting sync signal");
2824   stream->buffer_handlesync_sig = g_signal_connect (stream->buffer,
2825       "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream);
2826
2827   if (stream->demux) {
2828     /* connect to the new-pad signal of the payload demuxer, this will expose the
2829      * new pad by ghosting it. */
2830     stream->demux_newpad_sig = g_signal_connect (stream->demux,
2831         "new-payload-type", (GCallback) new_payload_found, stream);
2832     stream->demux_padremoved_sig = g_signal_connect (stream->demux,
2833         "pad-removed", (GCallback) payload_pad_removed, stream);
2834
2835     /* connect to the request-pt-map signal. This signal will be emited by the
2836      * demuxer so that it can apply a proper caps on the buffers for the
2837      * depayloaders. */
2838     stream->demux_ptreq_sig = g_signal_connect (stream->demux,
2839         "request-pt-map", (GCallback) pt_map_requested, session);
2840     /* connect to the  signal so it can be forwarded. */
2841     stream->demux_ptchange_sig = g_signal_connect (stream->demux,
2842         "payload-type-change", (GCallback) payload_type_change, session);
2843   } else {
2844     /* add rtpjitterbuffer src pad to pads */
2845     GstElementClass *klass;
2846     GstPadTemplate *templ;
2847     gchar *padname;
2848     GstPad *gpad, *pad;
2849
2850     pad = gst_element_get_static_pad (stream->buffer, "src");
2851
2852     /* ghost the pad to the parent */
2853     klass = GST_ELEMENT_GET_CLASS (rtpbin);
2854     templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
2855     padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
2856         stream->session->id, stream->ssrc, 255);
2857     gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
2858     g_free (padname);
2859
2860     gst_pad_set_active (gpad, TRUE);
2861     gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2862
2863     gst_object_unref (pad);
2864   }
2865
2866   GST_RTP_SESSION_UNLOCK (session);
2867   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2868
2869   return;
2870
2871   /* ERRORS */
2872 shutdown:
2873   {
2874     GST_DEBUG_OBJECT (rtpbin, "we are shutting down");
2875     return;
2876   }
2877 no_stream:
2878   {
2879     GST_RTP_SESSION_UNLOCK (session);
2880     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2881     GST_DEBUG_OBJECT (rtpbin, "could not create stream");
2882     return;
2883   }
2884 }
2885
2886 /* Create a pad for receiving RTP for the session in @name. Must be called with
2887  * RTP_BIN_LOCK.
2888  */
2889 static GstPad *
2890 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2891 {
2892   guint sessid;
2893   GstElement *decoder;
2894   GstPad *sinkdpad, *decsink;
2895   GstRtpBinSession *session;
2896
2897   /* first get the session number */
2898   if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1)
2899     goto no_name;
2900
2901   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
2902
2903   /* get or create session */
2904   session = find_session_by_id (rtpbin, sessid);
2905   if (!session) {
2906     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
2907     /* create session now */
2908     session = create_session (rtpbin, sessid);
2909     if (session == NULL)
2910       goto create_error;
2911   }
2912
2913   /* check if pad was requested */
2914   if (session->recv_rtp_sink_ghost != NULL)
2915     return session->recv_rtp_sink_ghost;
2916
2917   GST_DEBUG_OBJECT (rtpbin, "getting RTP sink pad");
2918   /* get recv_rtp pad and store */
2919   session->recv_rtp_sink =
2920       gst_element_get_request_pad (session->session, "recv_rtp_sink");
2921   if (session->recv_rtp_sink == NULL)
2922     goto pad_failed;
2923
2924   g_signal_connect (session->recv_rtp_sink, "notify::caps",
2925       (GCallback) caps_changed, session);
2926
2927   GST_DEBUG_OBJECT (rtpbin, "requesting RTP decoder");
2928   decoder = session_request_decoder (session, SIGNAL_REQUEST_RTP_DECODER);
2929   if (decoder) {
2930     GstPad *decsrc;
2931     GstPadLinkReturn ret;
2932
2933     GST_DEBUG_OBJECT (rtpbin, "linking RTP decoder");
2934     decsink = gst_element_get_static_pad (decoder, "rtp_sink");
2935     decsrc = gst_element_get_static_pad (decoder, "rtp_src");
2936
2937     if (decsink == NULL)
2938       goto dec_sink_failed;
2939
2940     if (decsrc == NULL)
2941       goto dec_src_failed;
2942
2943     ret = gst_pad_link (decsrc, session->recv_rtp_sink);
2944     gst_object_unref (decsrc);
2945
2946     if (ret != GST_PAD_LINK_OK)
2947       goto dec_link_failed;
2948   } else {
2949     GST_DEBUG_OBJECT (rtpbin, "no RTP decoder given");
2950     decsink = gst_object_ref (session->recv_rtp_sink);
2951   }
2952
2953   GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad");
2954   /* get srcpad, link to SSRCDemux */
2955   session->recv_rtp_src =
2956       gst_element_get_static_pad (session->session, "recv_rtp_src");
2957   if (session->recv_rtp_src == NULL)
2958     goto src_pad_failed;
2959
2960   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
2961   sinkdpad = gst_element_get_static_pad (session->demux, "sink");
2962   GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
2963   gst_pad_link_full (session->recv_rtp_src, sinkdpad,
2964       GST_PAD_LINK_CHECK_NOTHING);
2965   gst_object_unref (sinkdpad);
2966
2967   /* connect to the new-ssrc-pad signal of the SSRC demuxer */
2968   session->demux_newpad_sig = g_signal_connect (session->demux,
2969       "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
2970   session->demux_padremoved_sig = g_signal_connect (session->demux,
2971       "removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session);
2972
2973   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
2974   session->recv_rtp_sink_ghost =
2975       gst_ghost_pad_new_from_template (name, decsink, templ);
2976   gst_object_unref (decsink);
2977   gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE);
2978   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost);
2979
2980   return session->recv_rtp_sink_ghost;
2981
2982   /* ERRORS */
2983 no_name:
2984   {
2985     g_warning ("rtpbin: invalid name given");
2986     return NULL;
2987   }
2988 create_error:
2989   {
2990     /* create_session already warned */
2991     return NULL;
2992   }
2993 pad_failed:
2994   {
2995     g_warning ("rtpbin: failed to get session rtp_sink pad");
2996     return NULL;
2997   }
2998 dec_sink_failed:
2999   {
3000     g_warning ("rtpbin: failed to get decoder sink pad for session %d", sessid);
3001     return NULL;
3002   }
3003 dec_src_failed:
3004   {
3005     g_warning ("rtpbin: failed to get decoder src pad for session %d", sessid);
3006     gst_object_unref (decsink);
3007     return NULL;
3008   }
3009 dec_link_failed:
3010   {
3011     g_warning ("rtpbin: failed to link rtp decoder for session %d", sessid);
3012     gst_object_unref (decsink);
3013     return NULL;
3014   }
3015 src_pad_failed:
3016   {
3017     g_warning ("rtpbin: failed to get session rtp_src pad");
3018     gst_object_unref (decsink);
3019     return NULL;
3020   }
3021 }
3022
3023 static void
3024 remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3025 {
3026   if (session->demux_newpad_sig) {
3027     g_signal_handler_disconnect (session->demux, session->demux_newpad_sig);
3028     session->demux_newpad_sig = 0;
3029   }
3030   if (session->demux_padremoved_sig) {
3031     g_signal_handler_disconnect (session->demux, session->demux_padremoved_sig);
3032     session->demux_padremoved_sig = 0;
3033   }
3034   if (session->recv_rtp_src) {
3035     gst_object_unref (session->recv_rtp_src);
3036     session->recv_rtp_src = NULL;
3037   }
3038   if (session->recv_rtp_sink) {
3039     gst_element_release_request_pad (session->session, session->recv_rtp_sink);
3040     gst_object_unref (session->recv_rtp_sink);
3041     session->recv_rtp_sink = NULL;
3042   }
3043   if (session->recv_rtp_sink_ghost) {
3044     gst_pad_set_active (session->recv_rtp_sink_ghost, FALSE);
3045     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3046         session->recv_rtp_sink_ghost);
3047     session->recv_rtp_sink_ghost = NULL;
3048   }
3049 }
3050
3051 /* Create a pad for receiving RTCP for the session in @name. Must be called with
3052  * RTP_BIN_LOCK.
3053  */
3054 static GstPad *
3055 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
3056     const gchar * name)
3057 {
3058   guint sessid;
3059   GstElement *decoder;
3060   GstRtpBinSession *session;
3061   GstPad *sinkdpad, *decsink;
3062
3063   /* first get the session number */
3064   if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1)
3065     goto no_name;
3066
3067   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
3068
3069   /* get or create the session */
3070   session = find_session_by_id (rtpbin, sessid);
3071   if (!session) {
3072     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
3073     /* create session now */
3074     session = create_session (rtpbin, sessid);
3075     if (session == NULL)
3076       goto create_error;
3077   }
3078
3079   /* check if pad was requested */
3080   if (session->recv_rtcp_sink_ghost != NULL)
3081     return session->recv_rtcp_sink_ghost;
3082
3083   /* get recv_rtp pad and store */
3084   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
3085   session->recv_rtcp_sink =
3086       gst_element_get_request_pad (session->session, "recv_rtcp_sink");
3087   if (session->recv_rtcp_sink == NULL)
3088     goto pad_failed;
3089
3090   GST_DEBUG_OBJECT (rtpbin, "getting RTCP decoder");
3091   decoder = session_request_decoder (session, SIGNAL_REQUEST_RTCP_DECODER);
3092   if (decoder) {
3093     GstPad *decsrc;
3094     GstPadLinkReturn ret;
3095
3096     GST_DEBUG_OBJECT (rtpbin, "linking RTCP decoder");
3097     decsink = gst_element_get_static_pad (decoder, "rtcp_sink");
3098     decsrc = gst_element_get_static_pad (decoder, "rtcp_src");
3099
3100     if (decsink == NULL)
3101       goto dec_sink_failed;
3102
3103     if (decsrc == NULL)
3104       goto dec_src_failed;
3105
3106     ret = gst_pad_link (decsrc, session->recv_rtcp_sink);
3107     gst_object_unref (decsrc);
3108
3109     if (ret != GST_PAD_LINK_OK)
3110       goto dec_link_failed;
3111   } else {
3112     GST_DEBUG_OBJECT (rtpbin, "no RTCP decoder given");
3113     decsink = gst_object_ref (session->recv_rtcp_sink);
3114   }
3115
3116   /* get srcpad, link to SSRCDemux */
3117   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
3118   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
3119   if (session->sync_src == NULL)
3120     goto src_pad_failed;
3121
3122   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
3123   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
3124   gst_pad_link_full (session->sync_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
3125   gst_object_unref (sinkdpad);
3126
3127   session->recv_rtcp_sink_ghost =
3128       gst_ghost_pad_new_from_template (name, decsink, templ);
3129   gst_object_unref (decsink);
3130   gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE);
3131   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin),
3132       session->recv_rtcp_sink_ghost);
3133
3134   return session->recv_rtcp_sink_ghost;
3135
3136   /* ERRORS */
3137 no_name:
3138   {
3139     g_warning ("rtpbin: invalid name given");
3140     return NULL;
3141   }
3142 create_error:
3143   {
3144     /* create_session already warned */
3145     return NULL;
3146   }
3147 pad_failed:
3148   {
3149     g_warning ("rtpbin: failed to get session rtcp_sink pad");
3150     return NULL;
3151   }
3152 dec_sink_failed:
3153   {
3154     g_warning ("rtpbin: failed to get decoder sink pad for session %d", sessid);
3155     return NULL;
3156   }
3157 dec_src_failed:
3158   {
3159     g_warning ("rtpbin: failed to get decoder src pad for session %d", sessid);
3160     gst_object_unref (decsink);
3161     return NULL;
3162   }
3163 dec_link_failed:
3164   {
3165     g_warning ("rtpbin: failed to link rtcp decoder for session %d", sessid);
3166     gst_object_unref (decsink);
3167     return NULL;
3168   }
3169 src_pad_failed:
3170   {
3171     g_warning ("rtpbin: failed to get session sync_src pad");
3172     gst_object_unref (decsink);
3173     return NULL;
3174   }
3175 }
3176
3177 static void
3178 remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3179 {
3180   if (session->recv_rtcp_sink_ghost) {
3181     gst_pad_set_active (session->recv_rtcp_sink_ghost, FALSE);
3182     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3183         session->recv_rtcp_sink_ghost);
3184     session->recv_rtcp_sink_ghost = NULL;
3185   }
3186   if (session->sync_src) {
3187     /* releasing the request pad should also unref the sync pad */
3188     gst_object_unref (session->sync_src);
3189     session->sync_src = NULL;
3190   }
3191   if (session->recv_rtcp_sink) {
3192     gst_element_release_request_pad (session->session, session->recv_rtcp_sink);
3193     gst_object_unref (session->recv_rtcp_sink);
3194     session->recv_rtcp_sink = NULL;
3195   }
3196 }
3197
3198 /* Create a pad for sending RTP for the session in @name. Must be called with
3199  * RTP_BIN_LOCK.
3200  */
3201 static GstPad *
3202 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
3203 {
3204   gchar *gname;
3205   guint sessid;
3206   GstPad *encsrc;
3207   GstElement *encoder;
3208   GstRtpBinSession *session;
3209   GstElementClass *klass;
3210
3211   /* first get the session number */
3212   if (name == NULL || sscanf (name, "send_rtp_sink_%u", &sessid) != 1)
3213     goto no_name;
3214
3215   /* get or create session */
3216   session = find_session_by_id (rtpbin, sessid);
3217   if (!session) {
3218     /* create session now */
3219     session = create_session (rtpbin, sessid);
3220     if (session == NULL)
3221       goto create_error;
3222   }
3223
3224   /* check if pad was requested */
3225   if (session->send_rtp_sink_ghost != NULL)
3226     return session->send_rtp_sink_ghost;
3227
3228   /* get send_rtp pad and store */
3229   session->send_rtp_sink =
3230       gst_element_get_request_pad (session->session, "send_rtp_sink");
3231   if (session->send_rtp_sink == NULL)
3232     goto pad_failed;
3233
3234   session->send_rtp_sink_ghost =
3235       gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ);
3236   gst_pad_set_active (session->send_rtp_sink_ghost, TRUE);
3237   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_sink_ghost);
3238
3239   /* get srcpad */
3240   session->send_rtp_src =
3241       gst_element_get_static_pad (session->session, "send_rtp_src");
3242   if (session->send_rtp_src == NULL)
3243     goto no_srcpad;
3244
3245   GST_DEBUG_OBJECT (rtpbin, "getting RTP encoder");
3246   encoder = session_request_encoder (session, SIGNAL_REQUEST_RTP_ENCODER);
3247   if (encoder) {
3248     gchar *ename;
3249     GstPad *encsink;
3250     GstPadLinkReturn ret;
3251
3252     GST_DEBUG_OBJECT (rtpbin, "linking RTP encoder");
3253     ename = g_strdup_printf ("rtp_sink_%d", sessid);
3254     encsink = gst_element_get_static_pad (encoder, ename);
3255     g_free (ename);
3256     ename = g_strdup_printf ("rtp_src_%d", sessid);
3257     encsrc = gst_element_get_static_pad (encoder, ename);
3258     g_free (ename);
3259
3260     if (encsrc == NULL)
3261       goto enc_src_failed;
3262
3263     if (encsink == NULL)
3264       goto enc_sink_failed;
3265
3266     ret = gst_pad_link (session->send_rtp_src, encsink);
3267     gst_object_unref (encsink);
3268
3269     if (ret != GST_PAD_LINK_OK)
3270       goto enc_link_failed;
3271   } else {
3272     GST_DEBUG_OBJECT (rtpbin, "no RTP encoder given");
3273     encsrc = gst_object_ref (session->send_rtp_src);
3274   }
3275
3276   /* ghost the new source pad */
3277   klass = GST_ELEMENT_GET_CLASS (rtpbin);
3278   gname = g_strdup_printf ("send_rtp_src_%u", sessid);
3279   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u");
3280   session->send_rtp_src_ghost =
3281       gst_ghost_pad_new_from_template (gname, encsrc, templ);
3282   gst_object_unref (encsrc);
3283   gst_pad_set_active (session->send_rtp_src_ghost, TRUE);
3284   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost);
3285   g_free (gname);
3286
3287   return session->send_rtp_sink_ghost;
3288
3289   /* ERRORS */
3290 no_name:
3291   {
3292     g_warning ("rtpbin: invalid name given");
3293     return NULL;
3294   }
3295 create_error:
3296   {
3297     /* create_session already warned */
3298     return NULL;
3299   }
3300 pad_failed:
3301   {
3302     g_warning ("rtpbin: failed to get session pad for session %d", sessid);
3303     return NULL;
3304   }
3305 no_srcpad:
3306   {
3307     g_warning ("rtpbin: failed to get rtp source pad for session %d", sessid);
3308     return NULL;
3309   }
3310 enc_src_failed:
3311   {
3312     g_warning ("rtpbin: failed to get encoder src pad for session %d", sessid);
3313     return NULL;
3314   }
3315 enc_sink_failed:
3316   {
3317     g_warning ("rtpbin: failed to get encoder sink pad for session %d", sessid);
3318     gst_object_unref (encsrc);
3319     return NULL;
3320   }
3321 enc_link_failed:
3322   {
3323     g_warning ("rtpbin: failed to link rtp encoder for session %d", sessid);
3324     gst_object_unref (encsrc);
3325     return NULL;
3326   }
3327 }
3328
3329 static void
3330 remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3331 {
3332   if (session->send_rtp_src_ghost) {
3333     gst_pad_set_active (session->send_rtp_src_ghost, FALSE);
3334     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3335         session->send_rtp_src_ghost);
3336     session->send_rtp_src_ghost = NULL;
3337   }
3338   if (session->send_rtp_src) {
3339     gst_object_unref (session->send_rtp_src);
3340     session->send_rtp_src = NULL;
3341   }
3342   if (session->send_rtp_sink) {
3343     gst_element_release_request_pad (GST_ELEMENT_CAST (session->session),
3344         session->send_rtp_sink);
3345     gst_object_unref (session->send_rtp_sink);
3346     session->send_rtp_sink = NULL;
3347   }
3348   if (session->send_rtp_sink_ghost) {
3349     gst_pad_set_active (session->send_rtp_sink_ghost, FALSE);
3350     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3351         session->send_rtp_sink_ghost);
3352     session->send_rtp_sink_ghost = NULL;
3353   }
3354 }
3355
3356 /* Create a pad for sending RTCP for the session in @name. Must be called with
3357  * RTP_BIN_LOCK.
3358  */
3359 static GstPad *
3360 create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
3361 {
3362   guint sessid;
3363   GstPad *encsrc;
3364   GstElement *encoder;
3365   GstRtpBinSession *session;
3366
3367   /* first get the session number */
3368   if (name == NULL || sscanf (name, "send_rtcp_src_%u", &sessid) != 1)
3369     goto no_name;
3370
3371   /* get or create session */
3372   session = find_session_by_id (rtpbin, sessid);
3373   if (!session)
3374     goto no_session;
3375
3376   /* check if pad was requested */
3377   if (session->send_rtcp_src_ghost != NULL)
3378     return session->send_rtcp_src_ghost;
3379
3380   /* get rtcp_src pad and store */
3381   session->send_rtcp_src =
3382       gst_element_get_request_pad (session->session, "send_rtcp_src");
3383   if (session->send_rtcp_src == NULL)
3384     goto pad_failed;
3385
3386   GST_DEBUG_OBJECT (rtpbin, "getting RTCP encoder");
3387   encoder = session_request_encoder (session, SIGNAL_REQUEST_RTCP_ENCODER);
3388   if (encoder) {
3389     gchar *ename;
3390     GstPad *encsink;
3391     GstPadLinkReturn ret;
3392
3393     GST_DEBUG_OBJECT (rtpbin, "linking RTCP encoder");
3394     ename = g_strdup_printf ("rtcp_sink_%d", sessid);
3395     encsink = gst_element_get_static_pad (encoder, ename);
3396     g_free (ename);
3397     ename = g_strdup_printf ("rtcp_src_%d", sessid);
3398     encsrc = gst_element_get_static_pad (encoder, ename);
3399     g_free (ename);
3400
3401     if (encsrc == NULL)
3402       goto enc_src_failed;
3403
3404     if (encsink == NULL)
3405       goto enc_sink_failed;
3406
3407     ret = gst_pad_link (session->send_rtcp_src, encsink);
3408     gst_object_unref (encsink);
3409
3410     if (ret != GST_PAD_LINK_OK)
3411       goto enc_link_failed;
3412   } else {
3413     GST_DEBUG_OBJECT (rtpbin, "no RTCP encoder given");
3414     encsrc = gst_object_ref (session->send_rtcp_src);
3415   }
3416
3417   session->send_rtcp_src_ghost =
3418       gst_ghost_pad_new_from_template (name, encsrc, templ);
3419   gst_object_unref (encsrc);
3420   gst_pad_set_active (session->send_rtcp_src_ghost, TRUE);
3421   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtcp_src_ghost);
3422
3423   return session->send_rtcp_src_ghost;
3424
3425   /* ERRORS */
3426 no_name:
3427   {
3428     g_warning ("rtpbin: invalid name given");
3429     return NULL;
3430   }
3431 no_session:
3432   {
3433     g_warning ("rtpbin: session with id %d does not exist", sessid);
3434     return NULL;
3435   }
3436 pad_failed:
3437   {
3438     g_warning ("rtpbin: failed to get rtcp pad for session %d", sessid);
3439     return NULL;
3440   }
3441 enc_src_failed:
3442   {
3443     g_warning ("rtpbin: failed to get encoder src pad for session %d", sessid);
3444     return NULL;
3445   }
3446 enc_sink_failed:
3447   {
3448     g_warning ("rtpbin: failed to get encoder sink pad for session %d", sessid);
3449     gst_object_unref (encsrc);
3450     return NULL;
3451   }
3452 enc_link_failed:
3453   {
3454     g_warning ("rtpbin: failed to link rtcp encoder for session %d", sessid);
3455     gst_object_unref (encsrc);
3456     return NULL;
3457   }
3458 }
3459
3460 static void
3461 remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3462 {
3463   if (session->send_rtcp_src_ghost) {
3464     gst_pad_set_active (session->send_rtcp_src_ghost, FALSE);
3465     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3466         session->send_rtcp_src_ghost);
3467     session->send_rtcp_src_ghost = NULL;
3468   }
3469   if (session->send_rtcp_src) {
3470     gst_element_release_request_pad (session->session, session->send_rtcp_src);
3471     gst_object_unref (session->send_rtcp_src);
3472     session->send_rtcp_src = NULL;
3473   }
3474 }
3475
3476 /* If the requested name is NULL we should create a name with
3477  * the session number assuming we want the lowest posible session
3478  * with a free pad like the template */
3479 static gchar *
3480 gst_rtp_bin_get_free_pad_name (GstElement * element, GstPadTemplate * templ)
3481 {
3482   gboolean name_found = FALSE;
3483   gint session = 0;
3484   GstIterator *pad_it = NULL;
3485   gchar *pad_name = NULL;
3486   GValue data = { 0, };
3487
3488   GST_DEBUG_OBJECT (element, "find a free pad name for template");
3489   while (!name_found) {
3490     gboolean done = FALSE;
3491
3492     g_free (pad_name);
3493     pad_name = g_strdup_printf (templ->name_template, session++);
3494     pad_it = gst_element_iterate_pads (GST_ELEMENT (element));
3495     name_found = TRUE;
3496     while (!done) {
3497       switch (gst_iterator_next (pad_it, &data)) {
3498         case GST_ITERATOR_OK:
3499         {
3500           GstPad *pad;
3501           gchar *name;
3502
3503           pad = g_value_get_object (&data);
3504           name = gst_pad_get_name (pad);
3505
3506           if (strcmp (name, pad_name) == 0) {
3507             done = TRUE;
3508             name_found = FALSE;
3509           }
3510           g_free (name);
3511           g_value_reset (&data);
3512           break;
3513         }
3514         case GST_ITERATOR_ERROR:
3515         case GST_ITERATOR_RESYNC:
3516           /* restart iteration */
3517           done = TRUE;
3518           name_found = FALSE;
3519           session = 0;
3520           break;
3521         case GST_ITERATOR_DONE:
3522           done = TRUE;
3523           break;
3524       }
3525     }
3526     g_value_unset (&data);
3527     gst_iterator_free (pad_it);
3528   }
3529
3530   GST_DEBUG_OBJECT (element, "free pad name found: '%s'", pad_name);
3531   return pad_name;
3532 }
3533
3534 /*
3535  */
3536 static GstPad *
3537 gst_rtp_bin_request_new_pad (GstElement * element,
3538     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3539 {
3540   GstRtpBin *rtpbin;
3541   GstElementClass *klass;
3542   GstPad *result;
3543
3544   gchar *pad_name = NULL;
3545
3546   g_return_val_if_fail (templ != NULL, NULL);
3547   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
3548
3549   rtpbin = GST_RTP_BIN (element);
3550   klass = GST_ELEMENT_GET_CLASS (element);
3551
3552   GST_RTP_BIN_LOCK (rtpbin);
3553
3554   if (name == NULL) {
3555     /* use a free pad name */
3556     pad_name = gst_rtp_bin_get_free_pad_name (element, templ);
3557   } else {
3558     /* use the provided name */
3559     pad_name = g_strdup (name);
3560   }
3561
3562   GST_DEBUG_OBJECT (rtpbin, "Trying to request a pad with name %s", pad_name);
3563
3564   /* figure out the template */
3565   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
3566     result = create_recv_rtp (rtpbin, templ, pad_name);
3567   } else if (templ == gst_element_class_get_pad_template (klass,
3568           "recv_rtcp_sink_%u")) {
3569     result = create_recv_rtcp (rtpbin, templ, pad_name);
3570   } else if (templ == gst_element_class_get_pad_template (klass,
3571           "send_rtp_sink_%u")) {
3572     result = create_send_rtp (rtpbin, templ, pad_name);
3573   } else if (templ == gst_element_class_get_pad_template (klass,
3574           "send_rtcp_src_%u")) {
3575     result = create_rtcp (rtpbin, templ, pad_name);
3576   } else
3577     goto wrong_template;
3578
3579   g_free (pad_name);
3580   GST_RTP_BIN_UNLOCK (rtpbin);
3581
3582   return result;
3583
3584   /* ERRORS */
3585 wrong_template:
3586   {
3587     g_free (pad_name);
3588     GST_RTP_BIN_UNLOCK (rtpbin);
3589     g_warning ("rtpbin: this is not our template");
3590     return NULL;
3591   }
3592 }
3593
3594 static void
3595 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
3596 {
3597   GstRtpBinSession *session;
3598   GstRtpBin *rtpbin;
3599
3600   g_return_if_fail (GST_IS_GHOST_PAD (pad));
3601   g_return_if_fail (GST_IS_RTP_BIN (element));
3602
3603   rtpbin = GST_RTP_BIN (element);
3604
3605   GST_RTP_BIN_LOCK (rtpbin);
3606   GST_DEBUG_OBJECT (rtpbin, "Trying to release pad %s:%s",
3607       GST_DEBUG_PAD_NAME (pad));
3608
3609   if (!(session = find_session_by_pad (rtpbin, pad)))
3610     goto unknown_pad;
3611
3612   if (session->recv_rtp_sink_ghost == pad) {
3613     remove_recv_rtp (rtpbin, session);
3614   } else if (session->recv_rtcp_sink_ghost == pad) {
3615     remove_recv_rtcp (rtpbin, session);
3616   } else if (session->send_rtp_sink_ghost == pad) {
3617     remove_send_rtp (rtpbin, session);
3618   } else if (session->send_rtcp_src_ghost == pad) {
3619     remove_rtcp (rtpbin, session);
3620   }
3621
3622   /* no more request pads, free the complete session */
3623   if (session->recv_rtp_sink_ghost == NULL
3624       && session->recv_rtcp_sink_ghost == NULL
3625       && session->send_rtp_sink_ghost == NULL
3626       && session->send_rtcp_src_ghost == NULL) {
3627     GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session);
3628     rtpbin->sessions = g_slist_remove (rtpbin->sessions, session);
3629     free_session (session, rtpbin);
3630   }
3631   GST_RTP_BIN_UNLOCK (rtpbin);
3632
3633   return;
3634
3635   /* ERROR */
3636 unknown_pad:
3637   {
3638     GST_RTP_BIN_UNLOCK (rtpbin);
3639     g_warning ("rtpbin: %s:%s is not one of our request pads",
3640         GST_DEBUG_PAD_NAME (pad));
3641     return;
3642   }
3643 }