Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 /**
21  * SECTION:element-gstrtpbin
22  * @see_also: gstrtpjitterbuffer, gstrtpsession, gstrtpptdemux, gstrtpssrcdemux
23  *
24  * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
25  * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
26  * RTP sessions that will be synchronized together using RTCP SR packets.
27  *
28  * #GstRtpBin is configured with a number of request pads that define the
29  * functionality that is activated, similar to the #GstRtpSession element.
30  *
31  * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_\%u pad. The session
32  * number must be specified in the pad name.
33  * Data received on the recv_rtp_sink_\%u pad will be processed in the #GstRtpSession
34  * manager and after being validated forwarded on #GstRtpSsrcDemux element. Each
35  * RTP stream is demuxed based on the SSRC and send to a #GstRtpJitterBuffer. After
36  * the packets are released from the jitterbuffer, they will be forwarded to a
37  * #GstRtpPtDemux element. The #GstRtpPtDemux element will demux the packets based
38  * on the payload type and will create a unique pad recv_rtp_src_\%u_\%u_\%u on
39  * gstrtpbin with the session number, SSRC and payload type respectively as the pad
40  * name.
41  *
42  * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_\%u pad. The
43  * session number must be specified in the pad name.
44  *
45  * If you want the session manager to generate and send RTCP packets, request
46  * the send_rtcp_src_\%u pad with the session number in the pad name. Packet pushed
47  * on this pad contain SR/RR RTCP reports that should be sent to all participants
48  * in the session.
49  *
50  * To use #GstRtpBin as a sender, request a send_rtp_sink_\%u pad, which will
51  * automatically create a send_rtp_src_\%u pad. If the session number is not provided,
52  * the pad from the lowest available session will be returned. The session manager will modify the
53  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
54  * send_rtp_src_\%u pad after updating its internal state.
55  *
56  * The session manager needs the clock-rate of the payload types it is handling
57  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
58  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
59  * signal.
60  *
61  * Access to the internal statistics of gstrtpbin is provided with the
62  * get-internal-session property. This action signal gives access to the
63  * RTPSession object which further provides action signals to retrieve the
64  * internal source and other sources.
65  *
66  * <refsect2>
67  * <title>Example pipelines</title>
68  * |[
69  * gst-launch udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
70  *     gstrtpbin ! rtptheoradepay ! theoradec ! xvimagesink
71  * ]| Receive RTP data from port 5000 and send to the session 0 in gstrtpbin.
72  * |[
73  * gst-launch gstrtpbin name=rtpbin \
74  *         v4l2src ! ffmpegcolorspace ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
75  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
76  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
77  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
78  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
79  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
80  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
81  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
82  * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
83  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
84  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
85  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
86  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
87  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
88  * is received on port 5007. Since RTCP packets from the sender should be sent
89  * as soon as possible and do not participate in preroll, sync=false and
90  * async=false is configured on udpsink
91  * |[
92  * gst-launch -v gstrtpbin name=rtpbin                                          \
93  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
94  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
95  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
96  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
97  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
98  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
99  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
100  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
101  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
102  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
103  * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
104  * decode and display the video.
105  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
106  * decode and play the audio.
107  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
108  * session 1 on port 5003. These packets will be used for session management and
109  * synchronisation.
110  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
111  * on port 5007.
112  * </refsect2>
113  *
114  * Last reviewed on 2007-08-30 (0.10.6)
115  */
116
117 #ifdef HAVE_CONFIG_H
118 #include "config.h"
119 #endif
120 #include <stdio.h>
121 #include <string.h>
122
123 #include <gst/rtp/gstrtpbuffer.h>
124 #include <gst/rtp/gstrtcpbuffer.h>
125
126 #include "gstrtpbin-marshal.h"
127 #include "gstrtpbin.h"
128 #include "rtpsession.h"
129 #include "gstrtpsession.h"
130 #include "gstrtpjitterbuffer.h"
131
132 #include <gst/glib-compat-private.h>
133
134 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
135 #define GST_CAT_DEFAULT gst_rtp_bin_debug
136
137 /* sink pads */
138 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
139 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
140     GST_PAD_SINK,
141     GST_PAD_REQUEST,
142     GST_STATIC_CAPS ("application/x-rtp")
143     );
144
145 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
146 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
147     GST_PAD_SINK,
148     GST_PAD_REQUEST,
149     GST_STATIC_CAPS ("application/x-rtcp")
150     );
151
152 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
153 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%u",
154     GST_PAD_SINK,
155     GST_PAD_REQUEST,
156     GST_STATIC_CAPS ("application/x-rtp")
157     );
158
159 /* src pads */
160 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
161 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
162     GST_PAD_SRC,
163     GST_PAD_SOMETIMES,
164     GST_STATIC_CAPS ("application/x-rtp")
165     );
166
167 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
168 GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%u",
169     GST_PAD_SRC,
170     GST_PAD_REQUEST,
171     GST_STATIC_CAPS ("application/x-rtcp")
172     );
173
174 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
175 GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%u",
176     GST_PAD_SRC,
177     GST_PAD_SOMETIMES,
178     GST_STATIC_CAPS ("application/x-rtp")
179     );
180
181 #define GST_RTP_BIN_GET_PRIVATE(obj)  \
182    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
183
184 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock (&(bin)->priv->bin_lock)
185 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock (&(bin)->priv->bin_lock)
186
187 /* lock to protect dynamic callbacks, like pad-added and new ssrc. */
188 #define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock (&(bin)->priv->dyn_lock)
189 #define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock (&(bin)->priv->dyn_lock)
190
191 /* lock for shutdown */
192 #define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
193 G_STMT_START {                                   \
194   if (g_atomic_int_get (&bin->priv->shutdown))   \
195     goto label;                                  \
196   GST_RTP_BIN_DYN_LOCK (bin);                    \
197   if (g_atomic_int_get (&bin->priv->shutdown)) { \
198     GST_RTP_BIN_DYN_UNLOCK (bin);                \
199     goto label;                                  \
200   }                                              \
201 } G_STMT_END
202
203 /* unlock for shutdown */
204 #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
205   GST_RTP_BIN_DYN_UNLOCK (bin);                  \
206
207 struct _GstRtpBinPrivate
208 {
209   GMutex bin_lock;
210
211   /* lock protecting dynamic adding/removing */
212   GMutex dyn_lock;
213
214   /* if we are shutting down or not */
215   gint shutdown;
216
217   gboolean autoremove;
218
219   /* UNIX (ntp) time of last SR sync used */
220   guint64 last_unix;
221 };
222
223 /* signals and args */
224 enum
225 {
226   SIGNAL_REQUEST_PT_MAP,
227   SIGNAL_PAYLOAD_TYPE_CHANGE,
228   SIGNAL_CLEAR_PT_MAP,
229   SIGNAL_RESET_SYNC,
230   SIGNAL_GET_INTERNAL_SESSION,
231
232   SIGNAL_ON_NEW_SSRC,
233   SIGNAL_ON_SSRC_COLLISION,
234   SIGNAL_ON_SSRC_VALIDATED,
235   SIGNAL_ON_SSRC_ACTIVE,
236   SIGNAL_ON_SSRC_SDES,
237   SIGNAL_ON_BYE_SSRC,
238   SIGNAL_ON_BYE_TIMEOUT,
239   SIGNAL_ON_TIMEOUT,
240   SIGNAL_ON_SENDER_TIMEOUT,
241   SIGNAL_ON_NPT_STOP,
242   LAST_SIGNAL
243 };
244
245 #define DEFAULT_LATENCY_MS           200
246 #define DEFAULT_SDES                 NULL
247 #define DEFAULT_DO_LOST              FALSE
248 #define DEFAULT_IGNORE_PT            FALSE
249 #define DEFAULT_NTP_SYNC             FALSE
250 #define DEFAULT_AUTOREMOVE           FALSE
251 #define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
252 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
253 #define DEFAULT_RTCP_SYNC            GST_RTP_BIN_RTCP_SYNC_ALWAYS
254 #define DEFAULT_RTCP_SYNC_INTERVAL   0
255
256 enum
257 {
258   PROP_0,
259   PROP_LATENCY,
260   PROP_SDES,
261   PROP_DO_LOST,
262   PROP_IGNORE_PT,
263   PROP_NTP_SYNC,
264   PROP_RTCP_SYNC,
265   PROP_RTCP_SYNC_INTERVAL,
266   PROP_AUTOREMOVE,
267   PROP_BUFFER_MODE,
268   PROP_USE_PIPELINE_CLOCK,
269   PROP_LAST
270 };
271
272 enum
273 {
274   GST_RTP_BIN_RTCP_SYNC_ALWAYS,
275   GST_RTP_BIN_RTCP_SYNC_INITIAL,
276   GST_RTP_BIN_RTCP_SYNC_RTP
277 };
278
279 #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
280 static GType
281 gst_rtp_bin_rtcp_sync_get_type (void)
282 {
283   static GType rtcp_sync_type = 0;
284   static const GEnumValue rtcp_sync_types[] = {
285     {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
286     {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
287     {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
288     {0, NULL, NULL},
289   };
290
291   if (!rtcp_sync_type) {
292     rtcp_sync_type = g_enum_register_static ("GstRTCPSync", rtcp_sync_types);
293   }
294   return rtcp_sync_type;
295 }
296
297 /* helper objects */
298 typedef struct _GstRtpBinSession GstRtpBinSession;
299 typedef struct _GstRtpBinStream GstRtpBinStream;
300 typedef struct _GstRtpBinClient GstRtpBinClient;
301
302 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
303
304 static GstCaps *pt_map_requested (GstElement * element, guint pt,
305     GstRtpBinSession * session);
306 static void payload_type_change (GstElement * element, guint pt,
307     GstRtpBinSession * session);
308 static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
309 static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
310 static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
311 static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
312 static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
313 static void free_stream (GstRtpBinStream * stream);
314
315 /* Manages the RTP stream for one SSRC.
316  *
317  * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer.
318  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
319  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
320  * together (see below).
321  */
322 struct _GstRtpBinStream
323 {
324   /* the SSRC of this stream */
325   guint32 ssrc;
326
327   /* parent bin */
328   GstRtpBin *bin;
329
330   /* the session this SSRC belongs to */
331   GstRtpBinSession *session;
332
333   /* the jitterbuffer of the SSRC */
334   GstElement *buffer;
335   gulong buffer_handlesync_sig;
336   gulong buffer_ptreq_sig;
337   gulong buffer_ntpstop_sig;
338   gint percent;
339
340   /* the PT demuxer of the SSRC */
341   GstElement *demux;
342   gulong demux_newpad_sig;
343   gulong demux_padremoved_sig;
344   gulong demux_ptreq_sig;
345   gulong demux_ptchange_sig;
346
347   /* if we have calculated a valid rt_delta for this stream */
348   gboolean have_sync;
349   /* mapping to local RTP and NTP time */
350   gint64 rt_delta;
351   gint64 rtp_delta;
352   /* base rtptime in gst time */
353   gint64 clock_base;
354 };
355
356 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->lock)
357 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->lock)
358
359 /* Manages the receiving end of the packets.
360  *
361  * There is one such structure for each RTP session (audio/video/...).
362  * We get the RTP/RTCP packets and stuff them into the session manager. From
363  * there they are pushed into an SSRC demuxer that splits the stream based on
364  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
365  * the GstRtpBinStream above).
366  */
367 struct _GstRtpBinSession
368 {
369   /* session id */
370   gint id;
371   /* the parent bin */
372   GstRtpBin *bin;
373   /* the session element */
374   GstElement *session;
375   /* the SSRC demuxer */
376   GstElement *demux;
377   gulong demux_newpad_sig;
378   gulong demux_padremoved_sig;
379
380   GMutex lock;
381
382   /* list of GstRtpBinStream */
383   GSList *streams;
384
385   /* mapping of payload type to caps */
386   GHashTable *ptmap;
387
388   /* the pads of the session */
389   GstPad *recv_rtp_sink;
390   GstPad *recv_rtp_sink_ghost;
391   GstPad *recv_rtp_src;
392   GstPad *recv_rtcp_sink;
393   GstPad *recv_rtcp_sink_ghost;
394   GstPad *sync_src;
395   GstPad *send_rtp_sink;
396   GstPad *send_rtp_sink_ghost;
397   GstPad *send_rtp_src;
398   GstPad *send_rtp_src_ghost;
399   GstPad *send_rtcp_src;
400   GstPad *send_rtcp_src_ghost;
401 };
402
403 /* Manages the RTP streams that come from one client and should therefore be
404  * synchronized.
405  */
406 struct _GstRtpBinClient
407 {
408   /* the common CNAME for the streams */
409   gchar *cname;
410   guint cname_len;
411
412   /* the streams */
413   guint nstreams;
414   GSList *streams;
415 };
416
417 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
418 static GstRtpBinSession *
419 find_session_by_id (GstRtpBin * rtpbin, gint id)
420 {
421   GSList *walk;
422
423   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
424     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
425
426     if (sess->id == id)
427       return sess;
428   }
429   return NULL;
430 }
431
432 /* find a session with the given request pad. Must be called with RTP_BIN_LOCK */
433 static GstRtpBinSession *
434 find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
435 {
436   GSList *walk;
437
438   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
439     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
440
441     if ((sess->recv_rtp_sink_ghost == pad) ||
442         (sess->recv_rtcp_sink_ghost == pad) ||
443         (sess->send_rtp_sink_ghost == pad)
444         || (sess->send_rtcp_src_ghost == pad))
445       return sess;
446   }
447   return NULL;
448 }
449
450 static void
451 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
452 {
453   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
454       sess->id, ssrc);
455 }
456
457 static void
458 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
459 {
460   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
461       sess->id, ssrc);
462 }
463
464 static void
465 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
466 {
467   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
468       sess->id, ssrc);
469 }
470
471 static void
472 on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
473 {
474   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
475       sess->id, ssrc);
476 }
477
478 static void
479 on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
480 {
481   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
482       sess->id, ssrc);
483 }
484
485 static void
486 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
487 {
488   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
489       sess->id, ssrc);
490 }
491
492 static void
493 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
494 {
495   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
496       sess->id, ssrc);
497
498   if (sess->bin->priv->autoremove)
499     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
500 }
501
502 static void
503 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
504 {
505   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
506       sess->id, ssrc);
507
508   if (sess->bin->priv->autoremove)
509     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
510 }
511
512 static void
513 on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
514 {
515   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
516       sess->id, ssrc);
517 }
518
519 static void
520 on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
521 {
522   g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0,
523       stream->session->id, stream->ssrc);
524 }
525
526 /* must be called with the SESSION lock */
527 static GstRtpBinStream *
528 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
529 {
530   GSList *walk;
531
532   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
533     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
534
535     if (stream->ssrc == ssrc)
536       return stream;
537   }
538   return NULL;
539 }
540
541 static void
542 ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad,
543     GstRtpBinSession * session)
544 {
545   GstRtpBinStream *stream = NULL;
546
547   GST_RTP_SESSION_LOCK (session);
548   if ((stream = find_stream_by_ssrc (session, ssrc)))
549     session->streams = g_slist_remove (session->streams, stream);
550   GST_RTP_SESSION_UNLOCK (session);
551
552   if (stream)
553     free_stream (stream);
554 }
555
556 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
557 static GstRtpBinSession *
558 create_session (GstRtpBin * rtpbin, gint id)
559 {
560   GstRtpBinSession *sess;
561   GstElement *session, *demux;
562   GstState target;
563
564   if (!(session = gst_element_factory_make ("rtpsession", NULL)))
565     goto no_session;
566
567   if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
568     goto no_demux;
569
570   sess = g_new0 (GstRtpBinSession, 1);
571   g_mutex_init (&sess->lock);
572   sess->id = id;
573   sess->bin = rtpbin;
574   sess->session = session;
575   sess->demux = demux;
576   sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
577       (GDestroyNotify) gst_caps_unref);
578   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
579
580   /* configure SDES items */
581   GST_OBJECT_LOCK (rtpbin);
582   g_object_set (session, "sdes", rtpbin->sdes, "use-pipeline-clock",
583       rtpbin->use_pipeline_clock, NULL);
584   GST_OBJECT_UNLOCK (rtpbin);
585
586   /* provide clock_rate to the session manager when needed */
587   g_signal_connect (session, "request-pt-map",
588       (GCallback) pt_map_requested, sess);
589
590   g_signal_connect (sess->session, "on-new-ssrc",
591       (GCallback) on_new_ssrc, sess);
592   g_signal_connect (sess->session, "on-ssrc-collision",
593       (GCallback) on_ssrc_collision, sess);
594   g_signal_connect (sess->session, "on-ssrc-validated",
595       (GCallback) on_ssrc_validated, sess);
596   g_signal_connect (sess->session, "on-ssrc-active",
597       (GCallback) on_ssrc_active, sess);
598   g_signal_connect (sess->session, "on-ssrc-sdes",
599       (GCallback) on_ssrc_sdes, sess);
600   g_signal_connect (sess->session, "on-bye-ssrc",
601       (GCallback) on_bye_ssrc, sess);
602   g_signal_connect (sess->session, "on-bye-timeout",
603       (GCallback) on_bye_timeout, sess);
604   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
605   g_signal_connect (sess->session, "on-sender-timeout",
606       (GCallback) on_sender_timeout, sess);
607
608   gst_bin_add (GST_BIN_CAST (rtpbin), session);
609   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
610
611   GST_OBJECT_LOCK (rtpbin);
612   target = GST_STATE_TARGET (rtpbin);
613   GST_OBJECT_UNLOCK (rtpbin);
614
615   /* change state only to what's needed */
616   gst_element_set_state (demux, target);
617   gst_element_set_state (session, target);
618
619   return sess;
620
621   /* ERRORS */
622 no_session:
623   {
624     g_warning ("rtpbin: could not create gstrtpsession element");
625     return NULL;
626   }
627 no_demux:
628   {
629     gst_object_unref (session);
630     g_warning ("rtpbin: could not create gstrtpssrcdemux element");
631     return NULL;
632   }
633 }
634
635 static void
636 free_session (GstRtpBinSession * sess, GstRtpBin * bin)
637 {
638   GSList *client_walk;
639
640   GST_DEBUG_OBJECT (bin, "freeing session %p", sess);
641
642   gst_element_set_locked_state (sess->demux, TRUE);
643   gst_element_set_locked_state (sess->session, TRUE);
644
645   gst_element_set_state (sess->demux, GST_STATE_NULL);
646   gst_element_set_state (sess->session, GST_STATE_NULL);
647
648   GST_RTP_BIN_LOCK (bin);
649   remove_recv_rtp (bin, sess);
650   remove_recv_rtcp (bin, sess);
651   remove_send_rtp (bin, sess);
652   remove_rtcp (bin, sess);
653   GST_RTP_BIN_UNLOCK (bin);
654
655   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
656   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
657
658   /* remove any references in bin->clients to the streams in sess->streams */
659   client_walk = bin->clients;
660   while (client_walk) {
661     GSList *client_node = client_walk;
662     GstRtpBinClient *client = (GstRtpBinClient *) client_node->data;
663     GSList *stream_walk = client->streams;
664
665     while (stream_walk) {
666       GSList *stream_node = stream_walk;
667       GstRtpBinStream *stream = (GstRtpBinStream *) stream_node->data;
668       GSList *inner_walk;
669
670       stream_walk = g_slist_next (stream_walk);
671
672       for (inner_walk = sess->streams; inner_walk;
673           inner_walk = g_slist_next (inner_walk)) {
674         if ((GstRtpBinStream *) inner_walk->data == stream) {
675           client->streams = g_slist_delete_link (client->streams, stream_node);
676           --client->nstreams;
677           break;
678         }
679       }
680     }
681     client_walk = g_slist_next (client_walk);
682
683     g_assert ((client->streams && client->nstreams > 0) || (!client->streams
684             && client->streams == 0));
685     if (client->nstreams == 0) {
686       free_client (client, bin);
687       bin->clients = g_slist_delete_link (bin->clients, client_node);
688     }
689   }
690
691   g_slist_foreach (sess->streams, (GFunc) free_stream, NULL);
692   g_slist_free (sess->streams);
693
694   g_mutex_clear (&sess->lock);
695   g_hash_table_destroy (sess->ptmap);
696
697   g_free (sess);
698 }
699
700 /* get the payload type caps for the specific payload @pt in @session */
701 static GstCaps *
702 get_pt_map (GstRtpBinSession * session, guint pt)
703 {
704   GstCaps *caps = NULL;
705   GstRtpBin *bin;
706   GValue ret = { 0 };
707   GValue args[3] = { {0}, {0}, {0} };
708
709   GST_DEBUG ("searching pt %d in cache", pt);
710
711   GST_RTP_SESSION_LOCK (session);
712
713   /* first look in the cache */
714   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
715   if (caps) {
716     gst_caps_ref (caps);
717     goto done;
718   }
719
720   bin = session->bin;
721
722   GST_DEBUG ("emiting signal for pt %d in session %d", pt, session->id);
723
724   /* not in cache, send signal to request caps */
725   g_value_init (&args[0], GST_TYPE_ELEMENT);
726   g_value_set_object (&args[0], bin);
727   g_value_init (&args[1], G_TYPE_UINT);
728   g_value_set_uint (&args[1], session->id);
729   g_value_init (&args[2], G_TYPE_UINT);
730   g_value_set_uint (&args[2], pt);
731
732   g_value_init (&ret, GST_TYPE_CAPS);
733   g_value_set_boxed (&ret, NULL);
734
735   GST_RTP_SESSION_UNLOCK (session);
736
737   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
738
739   GST_RTP_SESSION_LOCK (session);
740
741   g_value_unset (&args[0]);
742   g_value_unset (&args[1]);
743   g_value_unset (&args[2]);
744
745   /* look in the cache again because we let the lock go */
746   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
747   if (caps) {
748     gst_caps_ref (caps);
749     g_value_unset (&ret);
750     goto done;
751   }
752
753   caps = (GstCaps *) g_value_dup_boxed (&ret);
754   g_value_unset (&ret);
755   if (!caps)
756     goto no_caps;
757
758   GST_DEBUG ("caching pt %d as %" GST_PTR_FORMAT, pt, caps);
759
760   /* store in cache, take additional ref */
761   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
762       gst_caps_ref (caps));
763
764 done:
765   GST_RTP_SESSION_UNLOCK (session);
766
767   return caps;
768
769   /* ERRORS */
770 no_caps:
771   {
772     GST_RTP_SESSION_UNLOCK (session);
773     GST_DEBUG ("no pt map could be obtained");
774     return NULL;
775   }
776 }
777
778 static gboolean
779 return_true (gpointer key, gpointer value, gpointer user_data)
780 {
781   return TRUE;
782 }
783
784 static void
785 gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
786 {
787   GSList *clients, *streams;
788
789   GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");
790
791   GST_RTP_BIN_LOCK (rtpbin);
792   for (clients = rtpbin->clients; clients; clients = g_slist_next (clients)) {
793     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
794
795     /* reset sync on all streams for this client */
796     for (streams = client->streams; streams; streams = g_slist_next (streams)) {
797       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
798
799       /* make use require a new SR packet for this stream before we attempt new
800        * lip-sync */
801       stream->have_sync = FALSE;
802       stream->rt_delta = 0;
803       stream->rtp_delta = 0;
804       stream->clock_base = -100 * GST_SECOND;
805     }
806   }
807   GST_RTP_BIN_UNLOCK (rtpbin);
808 }
809
810 static void
811 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
812 {
813   GSList *sessions, *streams;
814
815   GST_RTP_BIN_LOCK (bin);
816   GST_DEBUG_OBJECT (bin, "clearing pt map");
817   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
818     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
819
820     GST_DEBUG_OBJECT (bin, "clearing session %p", session);
821     g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
822
823     GST_RTP_SESSION_LOCK (session);
824     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
825
826     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
827       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
828
829       GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
830       g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
831       if (stream->demux)
832         g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
833     }
834     GST_RTP_SESSION_UNLOCK (session);
835   }
836   GST_RTP_BIN_UNLOCK (bin);
837
838   /* reset sync too */
839   gst_rtp_bin_reset_sync (bin);
840 }
841
842 static RTPSession *
843 gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
844 {
845   RTPSession *internal_session = NULL;
846   GstRtpBinSession *session;
847
848   GST_RTP_BIN_LOCK (bin);
849   GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %d",
850       session_id);
851   session = find_session_by_id (bin, (gint) session_id);
852   if (session) {
853     g_object_get (session->session, "internal-session", &internal_session,
854         NULL);
855   }
856   GST_RTP_BIN_UNLOCK (bin);
857
858   return internal_session;
859 }
860
861 static void
862 gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
863     const gchar * name, const GValue * value)
864 {
865   GSList *sessions, *streams;
866
867   GST_RTP_BIN_LOCK (bin);
868   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
869     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
870
871     GST_RTP_SESSION_LOCK (session);
872     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
873       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
874
875       g_object_set_property (G_OBJECT (stream->buffer), name, value);
876     }
877     GST_RTP_SESSION_UNLOCK (session);
878   }
879   GST_RTP_BIN_UNLOCK (bin);
880 }
881
882 /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
883 static GstRtpBinClient *
884 get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
885 {
886   GstRtpBinClient *result = NULL;
887   GSList *walk;
888
889   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
890     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
891
892     if (len != client->cname_len)
893       continue;
894
895     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
896       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
897           client->cname);
898       result = client;
899       break;
900     }
901   }
902
903   /* nothing found, create one */
904   if (result == NULL) {
905     result = g_new0 (GstRtpBinClient, 1);
906     result->cname = g_strndup ((gchar *) data, len);
907     result->cname_len = len;
908     bin->clients = g_slist_prepend (bin->clients, result);
909     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
910         result->cname);
911   }
912   return result;
913 }
914
915 static void
916 free_client (GstRtpBinClient * client, GstRtpBin * bin)
917 {
918   GST_DEBUG_OBJECT (bin, "freeing client %p", client);
919   g_slist_free (client->streams);
920   g_free (client->cname);
921   g_free (client);
922 }
923
924 static void
925 get_current_times (GstRtpBin * bin, GstClockTime * running_time,
926     guint64 * ntpnstime)
927 {
928   guint64 ntpns;
929   GstClock *clock;
930   GstClockTime base_time, rt, clock_time;
931
932   GST_OBJECT_LOCK (bin);
933   if ((clock = GST_ELEMENT_CLOCK (bin))) {
934     base_time = GST_ELEMENT_CAST (bin)->base_time;
935     gst_object_ref (clock);
936     GST_OBJECT_UNLOCK (bin);
937
938     clock_time = gst_clock_get_time (clock);
939
940     if (bin->use_pipeline_clock) {
941       ntpns = clock_time;
942     } else {
943       GTimeVal current;
944
945       /* get current NTP time */
946       g_get_current_time (&current);
947       ntpns = GST_TIMEVAL_TO_TIME (current);
948     }
949
950     /* add constant to convert from 1970 based time to 1900 based time */
951     ntpns += (2208988800LL * GST_SECOND);
952
953     /* get current clock time and convert to running time */
954     rt = clock_time - base_time;
955
956     gst_object_unref (clock);
957   } else {
958     GST_OBJECT_UNLOCK (bin);
959     rt = -1;
960     ntpns = -1;
961   }
962   if (running_time)
963     *running_time = rt;
964   if (ntpnstime)
965     *ntpnstime = ntpns;
966 }
967
968 static void
969 stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
970     gint64 ts_offset)
971 {
972   gint64 prev_ts_offset;
973
974   g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
975
976   /* delta changed, see how much */
977   if (prev_ts_offset != ts_offset) {
978     gint64 diff;
979
980     diff = prev_ts_offset - ts_offset;
981
982     GST_DEBUG_OBJECT (bin,
983         "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
984         ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff);
985
986     /* only change diff when it changed more than 4 milliseconds. This
987      * compensates for rounding errors in NTP to RTP timestamp
988      * conversions */
989     if (ABS (diff) > 4 * GST_MSECOND) {
990       if (ABS (diff) < (3 * GST_SECOND)) {
991         g_object_set (stream->buffer, "ts-offset", ts_offset, NULL);
992       } else {
993         GST_WARNING_OBJECT (bin, "offset unusually large, ignoring");
994       }
995     } else {
996       GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
997     }
998   }
999   GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
1000       stream->ssrc, ts_offset);
1001 }
1002
1003 /* associate a stream to the given CNAME. This will make sure all streams for
1004  * that CNAME are synchronized together.
1005  * Must be called with GST_RTP_BIN_LOCK */
1006 static void
1007 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
1008     guint8 * data, guint64 ntptime, guint64 last_extrtptime,
1009     guint64 base_rtptime, guint64 base_time, guint clock_rate,
1010     gint64 rtp_clock_base)
1011 {
1012   GstRtpBinClient *client;
1013   gboolean created;
1014   GSList *walk;
1015   guint64 local_rt;
1016   guint64 local_rtp;
1017   GstClockTime running_time;
1018   guint64 ntpnstime;
1019   gint64 ntpdiff, rtdiff;
1020   guint64 last_unix;
1021
1022   /* first find or create the CNAME */
1023   client = get_client (bin, len, data, &created);
1024
1025   /* find stream in the client */
1026   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1027     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1028
1029     if (ostream == stream)
1030       break;
1031   }
1032   /* not found, add it to the list */
1033   if (walk == NULL) {
1034     GST_DEBUG_OBJECT (bin,
1035         "new association of SSRC %08x with client %p with CNAME %s",
1036         stream->ssrc, client, client->cname);
1037     client->streams = g_slist_prepend (client->streams, stream);
1038     client->nstreams++;
1039   } else {
1040     GST_DEBUG_OBJECT (bin,
1041         "found association of SSRC %08x with client %p with CNAME %s",
1042         stream->ssrc, client, client->cname);
1043   }
1044
1045   if (!GST_CLOCK_TIME_IS_VALID (last_extrtptime)) {
1046     GST_DEBUG_OBJECT (bin, "invalidated sync data");
1047     if (bin->rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1048       /* we don't need that data, so carry on,
1049        * but make some values look saner */
1050       last_extrtptime = base_rtptime;
1051     } else {
1052       /* nothing we can do with this data in this case */
1053       GST_DEBUG_OBJECT (bin, "bailing out");
1054       return;
1055     }
1056   }
1057
1058   /* Take the extended rtptime we found in the SR packet and map it to the
1059    * local rtptime. The local rtp time is used to construct timestamps on the
1060    * buffers so we will calculate what running_time corresponds to the RTP
1061    * timestamp in the SR packet. */
1062   local_rtp = last_extrtptime - base_rtptime;
1063
1064   GST_DEBUG_OBJECT (bin,
1065       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
1066       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, "
1067       "clock-base %" G_GINT64_FORMAT, base_rtptime,
1068       last_extrtptime, local_rtp, clock_rate, rtp_clock_base);
1069
1070   /* calculate local RTP time in gstreamer timestamp, we essentially perform the
1071    * same conversion that a jitterbuffer would use to convert an rtp timestamp
1072    * into a corresponding gstreamer timestamp. Note that the base_time also
1073    * contains the drift between sender and receiver. */
1074   local_rt = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate);
1075   local_rt += base_time;
1076
1077   /* convert ntptime to unix time since 1900 */
1078   last_unix = gst_util_uint64_scale (ntptime, GST_SECOND,
1079       (G_GINT64_CONSTANT (1) << 32));
1080
1081   stream->have_sync = TRUE;
1082
1083   GST_DEBUG_OBJECT (bin,
1084       "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT,
1085       local_rt, last_unix);
1086
1087   /* recalc inter stream playout offset, but only if there is more than one
1088    * stream or we're doing NTP sync. */
1089   if (bin->ntp_sync) {
1090     /* For NTP sync we need to first get a snapshot of running_time and NTP
1091      * time. We know at what running_time we play a certain RTP time, we also
1092      * calculated when we would play the RTP time in the SR packet. Now we need
1093      * to know how the running_time and the NTP time relate to eachother. */
1094     get_current_times (bin, &running_time, &ntpnstime);
1095
1096     /* see how far away the NTP time is. This is the difference between the
1097      * current NTP time and the NTP time in the last SR packet. */
1098     ntpdiff = ntpnstime - last_unix;
1099     /* see how far away the running_time is. This is the difference between the
1100      * current running_time and the running_time of the RTP timestamp in the
1101      * last SR packet. */
1102     rtdiff = running_time - local_rt;
1103
1104     GST_DEBUG_OBJECT (bin,
1105         "NTP time %" G_GUINT64_FORMAT ", last unix %" G_GUINT64_FORMAT,
1106         ntpnstime, last_unix);
1107     GST_DEBUG_OBJECT (bin,
1108         "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
1109         rtdiff);
1110
1111     /* combine to get the final diff to apply to the running_time */
1112     stream->rt_delta = rtdiff - ntpdiff;
1113
1114     stream_set_ts_offset (bin, stream, stream->rt_delta);
1115   } else {
1116     gint64 min, rtp_min, clock_base = stream->clock_base;
1117     gboolean all_sync, use_rtp;
1118     gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
1119
1120     /* calculate delta between server and receiver. last_unix is created by
1121      * converting the ntptime in the last SR packet to a gstreamer timestamp. This
1122      * delta expresses the difference to our timeline and the server timeline. The
1123      * difference in itself doesn't mean much but we can combine the delta of
1124      * multiple streams to create a stream specific offset. */
1125     stream->rt_delta = last_unix - local_rt;
1126
1127     /* calculate the min of all deltas, ignoring streams that did not yet have a
1128      * valid rt_delta because we did not yet receive an SR packet for those
1129      * streams.
1130      * We calculate the mininum because we would like to only apply positive
1131      * offsets to streams, delaying their playback instead of trying to speed up
1132      * other streams (which might be imposible when we have to create negative
1133      * latencies).
1134      * The stream that has the smallest diff is selected as the reference stream,
1135      * all other streams will have a positive offset to this difference. */
1136
1137     /* some alternative setting allow ignoring RTCP as much as possible,
1138      * for servers generating bogus ntp timeline */
1139     min = rtp_min = G_MAXINT64;
1140     use_rtp = FALSE;
1141     if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1142       guint64 ext_base;
1143
1144       use_rtp = TRUE;
1145       /* signed version for convienience */
1146       clock_base = base_rtptime;
1147       /* deal with possible wrap-around */
1148       ext_base = base_rtptime;
1149       rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
1150       /* sanity check; base rtp and provided clock_base should be close */
1151       if (rtp_clock_base >= clock_base) {
1152         if (rtp_clock_base - clock_base < 10 * clock_rate) {
1153           rtp_clock_base = base_time +
1154               gst_util_uint64_scale_int (rtp_clock_base - clock_base,
1155               GST_SECOND, clock_rate);
1156         } else {
1157           use_rtp = FALSE;
1158         }
1159       } else {
1160         if (clock_base - rtp_clock_base < 10 * clock_rate) {
1161           rtp_clock_base = base_time -
1162               gst_util_uint64_scale_int (clock_base - rtp_clock_base,
1163               GST_SECOND, clock_rate);
1164         } else {
1165           use_rtp = FALSE;
1166         }
1167       }
1168       /* warn and bail for clarity out if no sane values */
1169       if (!use_rtp) {
1170         GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
1171         return;
1172       }
1173       /* store to track changes */
1174       clock_base = rtp_clock_base;
1175       /* generate a fake as before,
1176        * now equating rtptime obtained from RTP-Info,
1177        * where the large time represent the otherwise irrelevant npt/ntp time */
1178       stream->rtp_delta = (GST_SECOND << 28) - rtp_clock_base;
1179     } else {
1180       clock_base = rtp_clock_base;
1181     }
1182
1183     all_sync = TRUE;
1184     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1185       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1186
1187       if (!ostream->have_sync) {
1188         all_sync = FALSE;
1189         continue;
1190       }
1191
1192       /* change in current stream's base from previously init'ed value
1193        * leads to reset of all stream's base */
1194       if (stream != ostream && stream->clock_base >= 0 &&
1195           (stream->clock_base != clock_base)) {
1196         GST_DEBUG_OBJECT (bin, "reset upon clock base change");
1197         ostream->clock_base = -100 * GST_SECOND;
1198         ostream->rtp_delta = 0;
1199       }
1200
1201       if (ostream->rt_delta < min)
1202         min = ostream->rt_delta;
1203       if (ostream->rtp_delta < rtp_min)
1204         rtp_min = ostream->rtp_delta;
1205     }
1206
1207     /* arrange to re-sync for each stream upon significant change,
1208      * e.g. post-seek */
1209     all_sync = all_sync && (stream->clock_base == clock_base);
1210     stream->clock_base = clock_base;
1211
1212     /* may need init performed above later on, but nothing more to do now */
1213     if (client->nstreams <= 1)
1214       return;
1215
1216     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
1217         " all sync %d", client, min, all_sync);
1218     GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
1219
1220     switch (rtcp_sync) {
1221       case GST_RTP_BIN_RTCP_SYNC_RTP:
1222         if (!use_rtp)
1223           break;
1224         GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
1225             "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
1226         /* fall-through */
1227       case GST_RTP_BIN_RTCP_SYNC_INITIAL:
1228         /* if all have been synced already, do not bother further */
1229         if (all_sync) {
1230           GST_DEBUG_OBJECT (bin, "all streams already synced; done");
1231           return;
1232         }
1233         break;
1234       default:
1235         break;
1236     }
1237
1238     /* bail out if we adjusted recently enough */
1239     if (all_sync && (last_unix - bin->priv->last_unix) <
1240         bin->rtcp_sync_interval * GST_MSECOND) {
1241       GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
1242           "previous sender info too recent "
1243           "(previous UNIX %" G_GUINT64_FORMAT ")", bin->priv->last_unix);
1244       return;
1245     }
1246     bin->priv->last_unix = last_unix;
1247
1248     /* calculate offsets for each stream */
1249     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1250       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1251       gint64 ts_offset;
1252
1253       /* ignore streams for which we didn't receive an SR packet yet, we
1254        * can't synchronize them yet. We can however sync other streams just
1255        * fine. */
1256       if (!ostream->have_sync)
1257         continue;
1258
1259       /* calculate offset to our reference stream, this should always give a
1260        * positive number. */
1261       if (use_rtp)
1262         ts_offset = ostream->rtp_delta - rtp_min;
1263       else
1264         ts_offset = ostream->rt_delta - min;
1265
1266       stream_set_ts_offset (bin, ostream, ts_offset);
1267     }
1268   }
1269   return;
1270 }
1271
1272 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
1273   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
1274           (b) = gst_rtcp_packet_move_to_next ((packet)))
1275
1276 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
1277   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
1278           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
1279
1280 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
1281   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
1282           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
1283
1284 static void
1285 gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
1286     GstRtpBinStream * stream)
1287 {
1288   GstRtpBin *bin;
1289   GstRTCPPacket packet;
1290   guint32 ssrc;
1291   guint64 ntptime;
1292   gboolean have_sr, have_sdes;
1293   gboolean more;
1294   guint64 base_rtptime;
1295   guint64 base_time;
1296   guint clock_rate;
1297   guint64 clock_base;
1298   guint64 extrtptime;
1299   GstBuffer *buffer;
1300   GstRTCPBuffer rtcp = { NULL, };
1301
1302   bin = stream->bin;
1303
1304   GST_DEBUG_OBJECT (bin, "sync handler called");
1305
1306   /* get the last relation between the rtp timestamps and the gstreamer
1307    * timestamps. We get this info directly from the jitterbuffer which
1308    * constructs gstreamer timestamps from rtp timestamps and so it know exactly
1309    * what the current situation is. */
1310   base_rtptime =
1311       g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
1312   base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
1313   clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
1314   clock_base = g_value_get_uint64 (gst_structure_get_value (s, "clock-base"));
1315   extrtptime =
1316       g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
1317   buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
1318
1319   have_sr = FALSE;
1320   have_sdes = FALSE;
1321
1322   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
1323
1324   GST_RTCP_BUFFER_FOR_PACKETS (more, &rtcp, &packet) {
1325     /* first packet must be SR or RR or else the validate would have failed */
1326     switch (gst_rtcp_packet_get_type (&packet)) {
1327       case GST_RTCP_TYPE_SR:
1328         /* only parse first. There is only supposed to be one SR in the packet
1329          * but we will deal with malformed packets gracefully */
1330         if (have_sr)
1331           break;
1332         /* get NTP and RTP times */
1333         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL,
1334             NULL, NULL);
1335
1336         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
1337         /* ignore SR that is not ours */
1338         if (ssrc != stream->ssrc)
1339           continue;
1340
1341         have_sr = TRUE;
1342         break;
1343       case GST_RTCP_TYPE_SDES:
1344       {
1345         gboolean more_items, more_entries;
1346
1347         /* only deal with first SDES, there is only supposed to be one SDES in
1348          * the RTCP packet but we deal with bad packets gracefully. Also bail
1349          * out if we have not seen an SR item yet. */
1350         if (have_sdes || !have_sr)
1351           break;
1352
1353         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
1354           /* skip items that are not about the SSRC of the sender */
1355           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
1356             continue;
1357
1358           /* find the CNAME entry */
1359           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
1360             GstRTCPSDESType type;
1361             guint8 len;
1362             guint8 *data;
1363
1364             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
1365
1366             if (type == GST_RTCP_SDES_CNAME) {
1367               GST_RTP_BIN_LOCK (bin);
1368               /* associate the stream to CNAME */
1369               gst_rtp_bin_associate (bin, stream, len, data,
1370                   ntptime, extrtptime, base_rtptime, base_time, clock_rate,
1371                   clock_base);
1372               GST_RTP_BIN_UNLOCK (bin);
1373             }
1374           }
1375         }
1376         have_sdes = TRUE;
1377         break;
1378       }
1379       default:
1380         /* we can ignore these packets */
1381         break;
1382     }
1383   }
1384   gst_rtcp_buffer_unmap (&rtcp);
1385 }
1386
1387 /* create a new stream with @ssrc in @session. Must be called with
1388  * RTP_SESSION_LOCK. */
1389 static GstRtpBinStream *
1390 create_stream (GstRtpBinSession * session, guint32 ssrc)
1391 {
1392   GstElement *buffer, *demux = NULL;
1393   GstRtpBinStream *stream;
1394   GstRtpBin *rtpbin;
1395   GstState target;
1396
1397   rtpbin = session->bin;
1398
1399   if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL)))
1400     goto no_jitterbuffer;
1401
1402   if (!rtpbin->ignore_pt)
1403     if (!(demux = gst_element_factory_make ("rtpptdemux", NULL)))
1404       goto no_demux;
1405
1406
1407   stream = g_new0 (GstRtpBinStream, 1);
1408   stream->ssrc = ssrc;
1409   stream->bin = rtpbin;
1410   stream->session = session;
1411   stream->buffer = buffer;
1412   stream->demux = demux;
1413
1414   stream->have_sync = FALSE;
1415   stream->rt_delta = 0;
1416   stream->rtp_delta = 0;
1417   stream->percent = 100;
1418   stream->clock_base = -100 * GST_SECOND;
1419   session->streams = g_slist_prepend (session->streams, stream);
1420
1421   /* provide clock_rate to the jitterbuffer when needed */
1422   stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
1423       (GCallback) pt_map_requested, session);
1424   stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
1425       (GCallback) on_npt_stop, stream);
1426
1427   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.session", session);
1428   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.stream", stream);
1429
1430   /* configure latency and packet lost */
1431   g_object_set (buffer, "latency", rtpbin->latency_ms, NULL);
1432   g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
1433   g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL);
1434
1435   if (!rtpbin->ignore_pt)
1436     gst_bin_add (GST_BIN_CAST (rtpbin), demux);
1437   gst_bin_add (GST_BIN_CAST (rtpbin), buffer);
1438
1439   /* link stuff */
1440   if (demux)
1441     gst_element_link (buffer, demux);
1442
1443   if (rtpbin->buffering) {
1444     guint64 last_out;
1445
1446     GST_INFO_OBJECT (rtpbin,
1447         "bin is buffering, set jitterbuffer as not active");
1448     g_signal_emit_by_name (buffer, "set-active", FALSE, (gint64) 0, &last_out);
1449   }
1450
1451
1452   GST_OBJECT_LOCK (rtpbin);
1453   target = GST_STATE_TARGET (rtpbin);
1454   GST_OBJECT_UNLOCK (rtpbin);
1455
1456   /* from sink to source */
1457   if (demux)
1458     gst_element_set_state (demux, target);
1459
1460   gst_element_set_state (buffer, target);
1461
1462   return stream;
1463
1464   /* ERRORS */
1465 no_jitterbuffer:
1466   {
1467     g_warning ("rtpbin: could not create gstrtpjitterbuffer element");
1468     return NULL;
1469   }
1470 no_demux:
1471   {
1472     gst_object_unref (buffer);
1473     g_warning ("rtpbin: could not create gstrtpptdemux element");
1474     return NULL;
1475   }
1476 }
1477
1478 static void
1479 free_stream (GstRtpBinStream * stream)
1480 {
1481   GstRtpBinSession *session;
1482
1483   session = stream->session;
1484
1485   if (stream->demux) {
1486     g_signal_handler_disconnect (stream->demux, stream->demux_newpad_sig);
1487     g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig);
1488     g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig);
1489   }
1490   g_signal_handler_disconnect (stream->buffer, stream->buffer_handlesync_sig);
1491   g_signal_handler_disconnect (stream->buffer, stream->buffer_ptreq_sig);
1492   g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig);
1493
1494   if (stream->demux)
1495     gst_element_set_locked_state (stream->demux, TRUE);
1496   gst_element_set_locked_state (stream->buffer, TRUE);
1497
1498   if (stream->demux)
1499     gst_element_set_state (stream->demux, GST_STATE_NULL);
1500   gst_element_set_state (stream->buffer, GST_STATE_NULL);
1501
1502   /* now remove this signal, we need this while going to NULL because it to
1503    * do some cleanups */
1504   if (stream->demux)
1505     g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
1506
1507   gst_bin_remove (GST_BIN_CAST (session->bin), stream->buffer);
1508   if (stream->demux)
1509     gst_bin_remove (GST_BIN_CAST (session->bin), stream->demux);
1510
1511   g_free (stream);
1512 }
1513
1514 /* GObject vmethods */
1515 static void gst_rtp_bin_dispose (GObject * object);
1516 static void gst_rtp_bin_finalize (GObject * object);
1517 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
1518     const GValue * value, GParamSpec * pspec);
1519 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
1520     GValue * value, GParamSpec * pspec);
1521
1522 /* GstElement vmethods */
1523 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
1524     GstStateChange transition);
1525 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
1526     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
1527 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
1528 static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message);
1529
1530 #define gst_rtp_bin_parent_class parent_class
1531 G_DEFINE_TYPE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN);
1532
1533 static void
1534 gst_rtp_bin_class_init (GstRtpBinClass * klass)
1535 {
1536   GObjectClass *gobject_class;
1537   GstElementClass *gstelement_class;
1538   GstBinClass *gstbin_class;
1539
1540   gobject_class = (GObjectClass *) klass;
1541   gstelement_class = (GstElementClass *) klass;
1542   gstbin_class = (GstBinClass *) klass;
1543
1544   g_type_class_add_private (klass, sizeof (GstRtpBinPrivate));
1545
1546   gobject_class->dispose = gst_rtp_bin_dispose;
1547   gobject_class->finalize = gst_rtp_bin_finalize;
1548   gobject_class->set_property = gst_rtp_bin_set_property;
1549   gobject_class->get_property = gst_rtp_bin_get_property;
1550
1551   g_object_class_install_property (gobject_class, PROP_LATENCY,
1552       g_param_spec_uint ("latency", "Buffer latency in ms",
1553           "Default amount of ms to buffer in the jitterbuffers", 0,
1554           G_MAXUINT, DEFAULT_LATENCY_MS,
1555           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1556
1557   /**
1558    * GstRtpBin::request-pt-map:
1559    * @rtpbin: the object which received the signal
1560    * @session: the session
1561    * @pt: the pt
1562    *
1563    * Request the payload type as #GstCaps for @pt in @session.
1564    */
1565   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
1566       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
1567       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
1568       NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT_UINT, GST_TYPE_CAPS, 2,
1569       G_TYPE_UINT, G_TYPE_UINT);
1570
1571     /**
1572    * GstRtpBin::payload-type-change:
1573    * @rtpbin: the object which received the signal
1574    * @session: the session
1575    * @pt: the pt
1576    *
1577    * Signal that the current payload type changed to @pt in @session.
1578    *
1579    * Since: 0.10.17
1580    */
1581   gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] =
1582       g_signal_new ("payload-type-change", G_TYPE_FROM_CLASS (klass),
1583       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, payload_type_change),
1584       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1585       G_TYPE_UINT, G_TYPE_UINT);
1586
1587   /**
1588    * GstRtpBin::clear-pt-map:
1589    * @rtpbin: the object which received the signal
1590    *
1591    * Clear all previously cached pt-mapping obtained with
1592    * #GstRtpBin::request-pt-map.
1593    */
1594   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
1595       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
1596       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1597           clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1598       0, G_TYPE_NONE);
1599
1600   /**
1601    * GstRtpBin::reset-sync:
1602    * @rtpbin: the object which received the signal
1603    *
1604    * Reset all currently configured lip-sync parameters and require new SR
1605    * packets for all streams before lip-sync is attempted again.
1606    */
1607   gst_rtp_bin_signals[SIGNAL_RESET_SYNC] =
1608       g_signal_new ("reset-sync", G_TYPE_FROM_CLASS (klass),
1609       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1610           reset_sync), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1611       0, G_TYPE_NONE);
1612
1613   /**
1614    * GstRtpBin::get-internal-session:
1615    * @rtpbin: the object which received the signal
1616    * @id: the session id
1617    *
1618    * Request the internal RTPSession object as #GObject in session @id.
1619    */
1620   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_SESSION] =
1621       g_signal_new ("get-internal-session", G_TYPE_FROM_CLASS (klass),
1622       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1623           get_internal_session), NULL, NULL, gst_rtp_bin_marshal_OBJECT__UINT,
1624       RTP_TYPE_SESSION, 1, G_TYPE_UINT);
1625
1626   /**
1627    * GstRtpBin::on-new-ssrc:
1628    * @rtpbin: the object which received the signal
1629    * @session: the session
1630    * @ssrc: the SSRC
1631    *
1632    * Notify of a new SSRC that entered @session.
1633    */
1634   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
1635       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
1636       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
1637       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1638       G_TYPE_UINT, G_TYPE_UINT);
1639   /**
1640    * GstRtpBin::on-ssrc-collision:
1641    * @rtpbin: the object which received the signal
1642    * @session: the session
1643    * @ssrc: the SSRC
1644    *
1645    * Notify when we have an SSRC collision
1646    */
1647   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
1648       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
1649       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
1650       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1651       G_TYPE_UINT, G_TYPE_UINT);
1652   /**
1653    * GstRtpBin::on-ssrc-validated:
1654    * @rtpbin: the object which received the signal
1655    * @session: the session
1656    * @ssrc: the SSRC
1657    *
1658    * Notify of a new SSRC that became validated.
1659    */
1660   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
1661       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
1662       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
1663       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1664       G_TYPE_UINT, G_TYPE_UINT);
1665   /**
1666    * GstRtpBin::on-ssrc-active:
1667    * @rtpbin: the object which received the signal
1668    * @session: the session
1669    * @ssrc: the SSRC
1670    *
1671    * Notify of a SSRC that is active, i.e., sending RTCP.
1672    */
1673   gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE] =
1674       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
1675       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_active),
1676       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1677       G_TYPE_UINT, G_TYPE_UINT);
1678   /**
1679    * GstRtpBin::on-ssrc-sdes:
1680    * @rtpbin: the object which received the signal
1681    * @session: the session
1682    * @ssrc: the SSRC
1683    *
1684    * Notify of a SSRC that is active, i.e., sending RTCP.
1685    */
1686   gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES] =
1687       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
1688       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_sdes),
1689       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1690       G_TYPE_UINT, G_TYPE_UINT);
1691
1692   /**
1693    * GstRtpBin::on-bye-ssrc:
1694    * @rtpbin: the object which received the signal
1695    * @session: the session
1696    * @ssrc: the SSRC
1697    *
1698    * Notify of an SSRC that became inactive because of a BYE packet.
1699    */
1700   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
1701       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
1702       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
1703       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1704       G_TYPE_UINT, G_TYPE_UINT);
1705   /**
1706    * GstRtpBin::on-bye-timeout:
1707    * @rtpbin: the object which received the signal
1708    * @session: the session
1709    * @ssrc: the SSRC
1710    *
1711    * Notify of an SSRC that has timed out because of BYE
1712    */
1713   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
1714       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
1715       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
1716       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1717       G_TYPE_UINT, G_TYPE_UINT);
1718   /**
1719    * GstRtpBin::on-timeout:
1720    * @rtpbin: the object which received the signal
1721    * @session: the session
1722    * @ssrc: the SSRC
1723    *
1724    * Notify of an SSRC that has timed out
1725    */
1726   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
1727       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
1728       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
1729       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1730       G_TYPE_UINT, G_TYPE_UINT);
1731   /**
1732    * GstRtpBin::on-sender-timeout:
1733    * @rtpbin: the object which received the signal
1734    * @session: the session
1735    * @ssrc: the SSRC
1736    *
1737    * Notify of a sender SSRC that has timed out and became a receiver
1738    */
1739   gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] =
1740       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
1741       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout),
1742       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1743       G_TYPE_UINT, G_TYPE_UINT);
1744
1745   /**
1746    * GstRtpBin::on-npt-stop:
1747    * @rtpbin: the object which received the signal
1748    * @session: the session
1749    * @ssrc: the SSRC
1750    *
1751    * Notify that SSRC sender has sent data up to the configured NPT stop time.
1752    */
1753   gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP] =
1754       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
1755       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_npt_stop),
1756       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1757       G_TYPE_UINT, G_TYPE_UINT);
1758
1759   g_object_class_install_property (gobject_class, PROP_SDES,
1760       g_param_spec_boxed ("sdes", "SDES",
1761           "The SDES items of this session",
1762           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1763
1764   g_object_class_install_property (gobject_class, PROP_DO_LOST,
1765       g_param_spec_boolean ("do-lost", "Do Lost",
1766           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
1767           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1768
1769   g_object_class_install_property (gobject_class, PROP_AUTOREMOVE,
1770       g_param_spec_boolean ("autoremove", "Auto Remove",
1771           "Automatically remove timed out sources", DEFAULT_AUTOREMOVE,
1772           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1773
1774   g_object_class_install_property (gobject_class, PROP_IGNORE_PT,
1775       g_param_spec_boolean ("ignore-pt", "Ignore PT",
1776           "Do not demultiplex based on PT values", DEFAULT_IGNORE_PT,
1777           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1778
1779   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
1780       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
1781           "Use the pipeline clock to set the NTP time in the RTCP SR messages",
1782           DEFAULT_USE_PIPELINE_CLOCK,
1783           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1784   /**
1785    * GstRtpBin::buffer-mode:
1786    *
1787    * Control the buffering and timestamping mode used by the jitterbuffer.
1788    *
1789    * Since: 0.10.17
1790    */
1791   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
1792       g_param_spec_enum ("buffer-mode", "Buffer Mode",
1793           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
1794           DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1795   /**
1796    * GstRtpBin::ntp-sync:
1797    *
1798    * Synchronize received streams to the NTP clock. When the NTP clock is shared
1799    * between the receivers and the senders (such as when using ntpd) this option
1800    * can be used to synchronize receivers on multiple machines.
1801    *
1802    * Since: 0.10.21
1803    */
1804   g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
1805       g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
1806           "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
1807           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1808
1809   /**
1810    * GstRtpBin::rtcp-sync:
1811    *
1812    * If not synchronizing (directly) to the NTP clock, determines how to sync
1813    * the various streams.
1814    *
1815    * Since: 0.10.31
1816    */
1817   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC,
1818       g_param_spec_enum ("rtcp-sync", "RTCP Sync",
1819           "Use of RTCP SR in synchronization", GST_RTP_BIN_RTCP_SYNC_TYPE,
1820           DEFAULT_RTCP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1821
1822   /**
1823    * GstRtpBin::rtcp-sync-interval:
1824    *
1825    * Determines how often to sync streams using RTCP data.
1826    *
1827    * Since: 0.10.31
1828    */
1829   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_INTERVAL,
1830       g_param_spec_uint ("rtcp-sync-interval", "RTCP Sync Interval",
1831           "RTCP SR interval synchronization (ms) (0 = always)",
1832           0, G_MAXUINT, DEFAULT_RTCP_SYNC_INTERVAL,
1833           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1834
1835   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
1836   gstelement_class->request_new_pad =
1837       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
1838   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
1839
1840   /* sink pads */
1841   gst_element_class_add_pad_template (gstelement_class,
1842       gst_static_pad_template_get (&rtpbin_recv_rtp_sink_template));
1843   gst_element_class_add_pad_template (gstelement_class,
1844       gst_static_pad_template_get (&rtpbin_recv_rtcp_sink_template));
1845   gst_element_class_add_pad_template (gstelement_class,
1846       gst_static_pad_template_get (&rtpbin_send_rtp_sink_template));
1847
1848   /* src pads */
1849   gst_element_class_add_pad_template (gstelement_class,
1850       gst_static_pad_template_get (&rtpbin_recv_rtp_src_template));
1851   gst_element_class_add_pad_template (gstelement_class,
1852       gst_static_pad_template_get (&rtpbin_send_rtcp_src_template));
1853   gst_element_class_add_pad_template (gstelement_class,
1854       gst_static_pad_template_get (&rtpbin_send_rtp_src_template));
1855
1856   gst_element_class_set_details_simple (gstelement_class, "RTP Bin",
1857       "Filter/Network/RTP",
1858       "Real-Time Transport Protocol bin",
1859       "Wim Taymans <wim.taymans@gmail.com>");
1860
1861   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (gst_rtp_bin_handle_message);
1862
1863   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
1864   klass->reset_sync = GST_DEBUG_FUNCPTR (gst_rtp_bin_reset_sync);
1865   klass->get_internal_session =
1866       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_session);
1867
1868   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
1869 }
1870
1871 static void
1872 gst_rtp_bin_init (GstRtpBin * rtpbin)
1873 {
1874   gchar *cname;
1875
1876   rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
1877   g_mutex_init (&rtpbin->priv->bin_lock);
1878   g_mutex_init (&rtpbin->priv->dyn_lock);
1879
1880   rtpbin->latency_ms = DEFAULT_LATENCY_MS;
1881   rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND;
1882   rtpbin->do_lost = DEFAULT_DO_LOST;
1883   rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
1884   rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
1885   rtpbin->rtcp_sync = DEFAULT_RTCP_SYNC;
1886   rtpbin->rtcp_sync_interval = DEFAULT_RTCP_SYNC_INTERVAL;
1887   rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
1888   rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
1889   rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
1890
1891   /* some default SDES entries */
1892   cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
1893   rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes",
1894       "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL);
1895   g_free (cname);
1896 }
1897
1898 static void
1899 gst_rtp_bin_dispose (GObject * object)
1900 {
1901   GstRtpBin *rtpbin;
1902
1903   rtpbin = GST_RTP_BIN (object);
1904
1905   GST_DEBUG_OBJECT (object, "freeing sessions");
1906   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, rtpbin);
1907   g_slist_free (rtpbin->sessions);
1908   rtpbin->sessions = NULL;
1909   GST_DEBUG_OBJECT (object, "freeing clients");
1910   g_slist_foreach (rtpbin->clients, (GFunc) free_client, rtpbin);
1911   g_slist_free (rtpbin->clients);
1912   rtpbin->clients = NULL;
1913
1914   G_OBJECT_CLASS (parent_class)->dispose (object);
1915 }
1916
1917 static void
1918 gst_rtp_bin_finalize (GObject * object)
1919 {
1920   GstRtpBin *rtpbin;
1921
1922   rtpbin = GST_RTP_BIN (object);
1923
1924   if (rtpbin->sdes)
1925     gst_structure_free (rtpbin->sdes);
1926
1927   g_mutex_clear (&rtpbin->priv->bin_lock);
1928   g_mutex_clear (&rtpbin->priv->dyn_lock);
1929
1930   G_OBJECT_CLASS (parent_class)->finalize (object);
1931 }
1932
1933
1934 static void
1935 gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes)
1936 {
1937   GSList *item;
1938
1939   if (sdes == NULL)
1940     return;
1941
1942   GST_RTP_BIN_LOCK (bin);
1943
1944   GST_OBJECT_LOCK (bin);
1945   if (bin->sdes)
1946     gst_structure_free (bin->sdes);
1947   bin->sdes = gst_structure_copy (sdes);
1948   GST_OBJECT_UNLOCK (bin);
1949
1950   /* store in all sessions */
1951   for (item = bin->sessions; item; item = g_slist_next (item)) {
1952     GstRtpBinSession *session = item->data;
1953     g_object_set (session->session, "sdes", sdes, NULL);
1954   }
1955
1956   GST_RTP_BIN_UNLOCK (bin);
1957 }
1958
1959 static GstStructure *
1960 gst_rtp_bin_get_sdes_struct (GstRtpBin * bin)
1961 {
1962   GstStructure *result;
1963
1964   GST_OBJECT_LOCK (bin);
1965   result = gst_structure_copy (bin->sdes);
1966   GST_OBJECT_UNLOCK (bin);
1967
1968   return result;
1969 }
1970
1971 static void
1972 gst_rtp_bin_set_property (GObject * object, guint prop_id,
1973     const GValue * value, GParamSpec * pspec)
1974 {
1975   GstRtpBin *rtpbin;
1976
1977   rtpbin = GST_RTP_BIN (object);
1978
1979   switch (prop_id) {
1980     case PROP_LATENCY:
1981       GST_RTP_BIN_LOCK (rtpbin);
1982       rtpbin->latency_ms = g_value_get_uint (value);
1983       rtpbin->latency_ns = rtpbin->latency_ms * GST_MSECOND;
1984       GST_RTP_BIN_UNLOCK (rtpbin);
1985       /* propagate the property down to the jitterbuffer */
1986       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "latency", value);
1987       break;
1988     case PROP_SDES:
1989       gst_rtp_bin_set_sdes_struct (rtpbin, g_value_get_boxed (value));
1990       break;
1991     case PROP_DO_LOST:
1992       GST_RTP_BIN_LOCK (rtpbin);
1993       rtpbin->do_lost = g_value_get_boolean (value);
1994       GST_RTP_BIN_UNLOCK (rtpbin);
1995       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
1996       break;
1997     case PROP_NTP_SYNC:
1998       rtpbin->ntp_sync = g_value_get_boolean (value);
1999       break;
2000     case PROP_RTCP_SYNC:
2001       g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value));
2002       break;
2003     case PROP_RTCP_SYNC_INTERVAL:
2004       rtpbin->rtcp_sync_interval = g_value_get_uint (value);
2005       break;
2006     case PROP_IGNORE_PT:
2007       rtpbin->ignore_pt = g_value_get_boolean (value);
2008       break;
2009     case PROP_AUTOREMOVE:
2010       rtpbin->priv->autoremove = g_value_get_boolean (value);
2011       break;
2012     case PROP_USE_PIPELINE_CLOCK:
2013     {
2014       GSList *sessions;
2015       GST_RTP_BIN_LOCK (rtpbin);
2016       rtpbin->use_pipeline_clock = g_value_get_boolean (value);
2017       for (sessions = rtpbin->sessions; sessions;
2018           sessions = g_slist_next (sessions)) {
2019         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2020
2021         g_object_set (G_OBJECT (session->session),
2022             "use-pipeline-clock", rtpbin->use_pipeline_clock, NULL);
2023       }
2024       GST_RTP_BIN_UNLOCK (rtpbin);
2025     }
2026       break;
2027     case PROP_BUFFER_MODE:
2028       GST_RTP_BIN_LOCK (rtpbin);
2029       rtpbin->buffer_mode = g_value_get_enum (value);
2030       GST_RTP_BIN_UNLOCK (rtpbin);
2031       /* propagate the property down to the jitterbuffer */
2032       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "mode", value);
2033       break;
2034     default:
2035       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2036       break;
2037   }
2038 }
2039
2040 static void
2041 gst_rtp_bin_get_property (GObject * object, guint prop_id,
2042     GValue * value, GParamSpec * pspec)
2043 {
2044   GstRtpBin *rtpbin;
2045
2046   rtpbin = GST_RTP_BIN (object);
2047
2048   switch (prop_id) {
2049     case PROP_LATENCY:
2050       GST_RTP_BIN_LOCK (rtpbin);
2051       g_value_set_uint (value, rtpbin->latency_ms);
2052       GST_RTP_BIN_UNLOCK (rtpbin);
2053       break;
2054     case PROP_SDES:
2055       g_value_take_boxed (value, gst_rtp_bin_get_sdes_struct (rtpbin));
2056       break;
2057     case PROP_DO_LOST:
2058       GST_RTP_BIN_LOCK (rtpbin);
2059       g_value_set_boolean (value, rtpbin->do_lost);
2060       GST_RTP_BIN_UNLOCK (rtpbin);
2061       break;
2062     case PROP_IGNORE_PT:
2063       g_value_set_boolean (value, rtpbin->ignore_pt);
2064       break;
2065     case PROP_NTP_SYNC:
2066       g_value_set_boolean (value, rtpbin->ntp_sync);
2067       break;
2068     case PROP_RTCP_SYNC:
2069       g_value_set_enum (value, g_atomic_int_get (&rtpbin->rtcp_sync));
2070       break;
2071     case PROP_RTCP_SYNC_INTERVAL:
2072       g_value_set_uint (value, rtpbin->rtcp_sync_interval);
2073       break;
2074     case PROP_AUTOREMOVE:
2075       g_value_set_boolean (value, rtpbin->priv->autoremove);
2076       break;
2077     case PROP_BUFFER_MODE:
2078       g_value_set_enum (value, rtpbin->buffer_mode);
2079       break;
2080     case PROP_USE_PIPELINE_CLOCK:
2081       g_value_set_boolean (value, rtpbin->use_pipeline_clock);
2082       break;
2083     default:
2084       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2085       break;
2086   }
2087 }
2088
2089 static void
2090 gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
2091 {
2092   GstRtpBin *rtpbin;
2093
2094   rtpbin = GST_RTP_BIN (bin);
2095
2096   switch (GST_MESSAGE_TYPE (message)) {
2097     case GST_MESSAGE_ELEMENT:
2098     {
2099       const GstStructure *s = gst_message_get_structure (message);
2100
2101       /* we change the structure name and add the session ID to it */
2102       if (gst_structure_has_name (s, "application/x-rtp-source-sdes")) {
2103         GstRtpBinSession *sess;
2104
2105         /* find the session we set it as object data */
2106         sess = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2107             "GstRTPBin.session");
2108
2109         if (G_LIKELY (sess)) {
2110           message = gst_message_make_writable (message);
2111           s = gst_message_get_structure (message);
2112           gst_structure_set ((GstStructure *) s, "session", G_TYPE_UINT,
2113               sess->id, NULL);
2114         }
2115       }
2116       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2117       break;
2118     }
2119     case GST_MESSAGE_BUFFERING:
2120     {
2121       gint percent;
2122       gint min_percent = 100;
2123       GSList *sessions, *streams;
2124       GstRtpBinStream *stream;
2125       gboolean change = FALSE, active = FALSE;
2126       GstClockTime min_out_time;
2127       GstBufferingMode mode;
2128       gint avg_in, avg_out;
2129       gint64 buffering_left;
2130
2131       gst_message_parse_buffering (message, &percent);
2132       gst_message_parse_buffering_stats (message, &mode, &avg_in, &avg_out,
2133           &buffering_left);
2134
2135       stream =
2136           g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2137           "GstRTPBin.stream");
2138
2139       GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
2140
2141       /* get the stream */
2142       if (G_LIKELY (stream)) {
2143         GST_RTP_BIN_LOCK (rtpbin);
2144         /* fill in the percent */
2145         stream->percent = percent;
2146
2147         /* calculate the min value for all streams */
2148         for (sessions = rtpbin->sessions; sessions;
2149             sessions = g_slist_next (sessions)) {
2150           GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2151
2152           GST_RTP_SESSION_LOCK (session);
2153           if (session->streams) {
2154             for (streams = session->streams; streams;
2155                 streams = g_slist_next (streams)) {
2156               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2157
2158               GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
2159                   stream->percent);
2160
2161               /* find min percent */
2162               if (min_percent > stream->percent)
2163                 min_percent = stream->percent;
2164             }
2165           } else {
2166             GST_INFO_OBJECT (bin,
2167                 "session has no streams, setting min_percent to 0");
2168             min_percent = 0;
2169           }
2170           GST_RTP_SESSION_UNLOCK (session);
2171         }
2172         GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
2173
2174         if (rtpbin->buffering) {
2175           if (min_percent == 100) {
2176             rtpbin->buffering = FALSE;
2177             active = TRUE;
2178             change = TRUE;
2179           }
2180         } else {
2181           if (min_percent < 100) {
2182             /* pause the streams */
2183             rtpbin->buffering = TRUE;
2184             active = FALSE;
2185             change = TRUE;
2186           }
2187         }
2188         GST_RTP_BIN_UNLOCK (rtpbin);
2189
2190         gst_message_unref (message);
2191
2192         /* make a new buffering message with the min value */
2193         message =
2194             gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
2195         gst_message_set_buffering_stats (message, mode, avg_in, avg_out,
2196             buffering_left);
2197
2198         if (G_UNLIKELY (change)) {
2199           GstClock *clock;
2200           guint64 running_time = 0;
2201           guint64 offset = 0;
2202
2203           /* figure out the running time when we have a clock */
2204           if (G_LIKELY ((clock =
2205                       gst_element_get_clock (GST_ELEMENT_CAST (bin))))) {
2206             guint64 now, base_time;
2207
2208             now = gst_clock_get_time (clock);
2209             base_time = gst_element_get_base_time (GST_ELEMENT_CAST (bin));
2210             running_time = now - base_time;
2211             gst_object_unref (clock);
2212           }
2213           GST_DEBUG_OBJECT (bin,
2214               "running time now %" GST_TIME_FORMAT,
2215               GST_TIME_ARGS (running_time));
2216
2217           GST_RTP_BIN_LOCK (rtpbin);
2218
2219           /* when we reactivate, calculate the offsets so that all streams have
2220            * an output time that is at least as big as the running_time */
2221           offset = 0;
2222           if (active) {
2223             if (running_time > rtpbin->buffer_start) {
2224               offset = running_time - rtpbin->buffer_start;
2225               if (offset >= rtpbin->latency_ns)
2226                 offset -= rtpbin->latency_ns;
2227               else
2228                 offset = 0;
2229             }
2230           }
2231
2232           /* pause all streams */
2233           min_out_time = -1;
2234           for (sessions = rtpbin->sessions; sessions;
2235               sessions = g_slist_next (sessions)) {
2236             GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2237
2238             GST_RTP_SESSION_LOCK (session);
2239             for (streams = session->streams; streams;
2240                 streams = g_slist_next (streams)) {
2241               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2242               GstElement *element = stream->buffer;
2243               guint64 last_out;
2244
2245               g_signal_emit_by_name (element, "set-active", active, offset,
2246                   &last_out);
2247
2248               if (!active) {
2249                 g_object_get (element, "percent", &stream->percent, NULL);
2250
2251                 if (last_out == -1)
2252                   last_out = 0;
2253                 if (min_out_time == -1 || last_out < min_out_time)
2254                   min_out_time = last_out;
2255               }
2256
2257               GST_DEBUG_OBJECT (bin,
2258                   "setting %p to %d, offset %" GST_TIME_FORMAT ", last %"
2259                   GST_TIME_FORMAT ", percent %d", element, active,
2260                   GST_TIME_ARGS (offset), GST_TIME_ARGS (last_out),
2261                   stream->percent);
2262             }
2263             GST_RTP_SESSION_UNLOCK (session);
2264           }
2265           GST_DEBUG_OBJECT (bin,
2266               "min out time %" GST_TIME_FORMAT, GST_TIME_ARGS (min_out_time));
2267
2268           /* the buffer_start is the min out time of all paused jitterbuffers */
2269           if (!active)
2270             rtpbin->buffer_start = min_out_time;
2271
2272           GST_RTP_BIN_UNLOCK (rtpbin);
2273         }
2274       }
2275       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2276       break;
2277     }
2278     default:
2279     {
2280       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2281       break;
2282     }
2283   }
2284 }
2285
2286 static GstStateChangeReturn
2287 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
2288 {
2289   GstStateChangeReturn res;
2290   GstRtpBin *rtpbin;
2291   GstRtpBinPrivate *priv;
2292
2293   rtpbin = GST_RTP_BIN (element);
2294   priv = rtpbin->priv;
2295
2296   switch (transition) {
2297     case GST_STATE_CHANGE_NULL_TO_READY:
2298       break;
2299     case GST_STATE_CHANGE_READY_TO_PAUSED:
2300       priv->last_unix = 0;
2301       GST_LOG_OBJECT (rtpbin, "clearing shutdown flag");
2302       g_atomic_int_set (&priv->shutdown, 0);
2303       break;
2304     case GST_STATE_CHANGE_PAUSED_TO_READY:
2305       GST_LOG_OBJECT (rtpbin, "setting shutdown flag");
2306       g_atomic_int_set (&priv->shutdown, 1);
2307       /* wait for all callbacks to end by taking the lock. No new callbacks will
2308        * be able to happen as we set the shutdown flag. */
2309       GST_RTP_BIN_DYN_LOCK (rtpbin);
2310       GST_LOG_OBJECT (rtpbin, "dynamic lock taken, we can continue shutdown");
2311       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
2312       break;
2313     default:
2314       break;
2315   }
2316
2317   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2318
2319   switch (transition) {
2320     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2321       break;
2322     case GST_STATE_CHANGE_PAUSED_TO_READY:
2323       break;
2324     case GST_STATE_CHANGE_READY_TO_NULL:
2325       break;
2326     default:
2327       break;
2328   }
2329   return res;
2330 }
2331
2332 /* a new pad (SSRC) was created in @session. This signal is emited from the
2333  * payload demuxer. */
2334 static void
2335 new_payload_found (GstElement * element, guint pt, GstPad * pad,
2336     GstRtpBinStream * stream)
2337 {
2338   GstRtpBin *rtpbin;
2339   GstElementClass *klass;
2340   GstPadTemplate *templ;
2341   gchar *padname;
2342   GstPad *gpad;
2343
2344   rtpbin = stream->bin;
2345
2346   GST_DEBUG ("new payload pad %d", pt);
2347
2348   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
2349
2350   /* ghost the pad to the parent */
2351   klass = GST_ELEMENT_GET_CLASS (rtpbin);
2352   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
2353   padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
2354       stream->session->id, stream->ssrc, pt);
2355   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
2356   g_free (padname);
2357   g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", gpad);
2358
2359   gst_pad_set_active (gpad, TRUE);
2360   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2361
2362   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2363
2364   return;
2365
2366 shutdown:
2367   {
2368     GST_DEBUG ("ignoring, we are shutting down");
2369     return;
2370   }
2371 }
2372
2373 static void
2374 payload_pad_removed (GstElement * element, GstPad * pad,
2375     GstRtpBinStream * stream)
2376 {
2377   GstRtpBin *rtpbin;
2378   GstPad *gpad;
2379
2380   rtpbin = stream->bin;
2381
2382   GST_DEBUG ("payload pad removed");
2383
2384   GST_RTP_BIN_DYN_LOCK (rtpbin);
2385   if ((gpad = g_object_get_data (G_OBJECT (pad), "GstRTPBin.ghostpad"))) {
2386     g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", NULL);
2387
2388     gst_pad_set_active (gpad, FALSE);
2389     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2390   }
2391   GST_RTP_BIN_DYN_UNLOCK (rtpbin);
2392 }
2393
2394 static GstCaps *
2395 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
2396 {
2397   GstRtpBin *rtpbin;
2398   GstCaps *caps;
2399
2400   rtpbin = session->bin;
2401
2402   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt,
2403       session->id);
2404
2405   caps = get_pt_map (session, pt);
2406   if (!caps)
2407     goto no_caps;
2408
2409   return caps;
2410
2411   /* ERRORS */
2412 no_caps:
2413   {
2414     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
2415     return NULL;
2416   }
2417 }
2418
2419 static void
2420 payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session)
2421 {
2422   GST_DEBUG_OBJECT (session->bin,
2423       "emiting signal for pt type changed to %d in session %d", pt,
2424       session->id);
2425
2426   g_signal_emit (session->bin, gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE],
2427       0, session->id, pt);
2428 }
2429
2430 /* emited when caps changed for the session */
2431 static void
2432 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
2433 {
2434   GstRtpBin *bin;
2435   GstCaps *caps;
2436   gint payload;
2437   const GstStructure *s;
2438
2439   bin = session->bin;
2440
2441   g_object_get (pad, "caps", &caps, NULL);
2442
2443   if (caps == NULL)
2444     return;
2445
2446   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
2447
2448   s = gst_caps_get_structure (caps, 0);
2449
2450   /* get payload, finish when it's not there */
2451   if (!gst_structure_get_int (s, "payload", &payload))
2452     return;
2453
2454   GST_RTP_SESSION_LOCK (session);
2455   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
2456   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
2457   GST_RTP_SESSION_UNLOCK (session);
2458 }
2459
2460 /* a new pad (SSRC) was created in @session */
2461 static void
2462 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
2463     GstRtpBinSession * session)
2464 {
2465   GstRtpBin *rtpbin;
2466   GstRtpBinStream *stream;
2467   GstPad *sinkpad, *srcpad;
2468   gchar *padname;
2469
2470   rtpbin = session->bin;
2471
2472   GST_DEBUG_OBJECT (rtpbin, "new SSRC pad %08x, %s:%s", ssrc,
2473       GST_DEBUG_PAD_NAME (pad));
2474
2475   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
2476
2477   GST_RTP_SESSION_LOCK (session);
2478
2479   /* create new stream */
2480   stream = create_stream (session, ssrc);
2481   if (!stream)
2482     goto no_stream;
2483
2484   /* get pad and link */
2485   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP");
2486   padname = g_strdup_printf ("src_%u", ssrc);
2487   srcpad = gst_element_get_static_pad (element, padname);
2488   g_free (padname);
2489   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
2490   gst_pad_link (srcpad, sinkpad);
2491   gst_object_unref (sinkpad);
2492   gst_object_unref (srcpad);
2493
2494   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
2495   padname = g_strdup_printf ("rtcp_src_%u", ssrc);
2496   srcpad = gst_element_get_static_pad (element, padname);
2497   g_free (padname);
2498   sinkpad = gst_element_get_request_pad (stream->buffer, "sink_rtcp");
2499   gst_pad_link (srcpad, sinkpad);
2500   gst_object_unref (sinkpad);
2501   gst_object_unref (srcpad);
2502
2503   /* connect to the RTCP sync signal from the jitterbuffer */
2504   GST_DEBUG_OBJECT (rtpbin, "connecting sync signal");
2505   stream->buffer_handlesync_sig = g_signal_connect (stream->buffer,
2506       "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream);
2507
2508   if (stream->demux) {
2509     /* connect to the new-pad signal of the payload demuxer, this will expose the
2510      * new pad by ghosting it. */
2511     stream->demux_newpad_sig = g_signal_connect (stream->demux,
2512         "new-payload-type", (GCallback) new_payload_found, stream);
2513     stream->demux_padremoved_sig = g_signal_connect (stream->demux,
2514         "pad-removed", (GCallback) payload_pad_removed, stream);
2515
2516     /* connect to the request-pt-map signal. This signal will be emited by the
2517      * demuxer so that it can apply a proper caps on the buffers for the
2518      * depayloaders. */
2519     stream->demux_ptreq_sig = g_signal_connect (stream->demux,
2520         "request-pt-map", (GCallback) pt_map_requested, session);
2521     /* connect to the  signal so it can be forwarded. */
2522     stream->demux_ptchange_sig = g_signal_connect (stream->demux,
2523         "payload-type-change", (GCallback) payload_type_change, session);
2524   } else {
2525     /* add gstrtpjitterbuffer src pad to pads */
2526     GstElementClass *klass;
2527     GstPadTemplate *templ;
2528     gchar *padname;
2529     GstPad *gpad, *pad;
2530
2531     pad = gst_element_get_static_pad (stream->buffer, "src");
2532
2533     /* ghost the pad to the parent */
2534     klass = GST_ELEMENT_GET_CLASS (rtpbin);
2535     templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
2536     padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
2537         stream->session->id, stream->ssrc, 255);
2538     gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
2539     g_free (padname);
2540
2541     gst_pad_set_active (gpad, TRUE);
2542     gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2543
2544     gst_object_unref (pad);
2545   }
2546
2547   GST_RTP_SESSION_UNLOCK (session);
2548   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2549
2550   return;
2551
2552   /* ERRORS */
2553 shutdown:
2554   {
2555     GST_DEBUG_OBJECT (rtpbin, "we are shutting down");
2556     return;
2557   }
2558 no_stream:
2559   {
2560     GST_RTP_SESSION_UNLOCK (session);
2561     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2562     GST_DEBUG_OBJECT (rtpbin, "could not create stream");
2563     return;
2564   }
2565 }
2566
2567 /* Create a pad for receiving RTP for the session in @name. Must be called with
2568  * RTP_BIN_LOCK.
2569  */
2570 static GstPad *
2571 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2572 {
2573   GstPad *sinkdpad;
2574   guint sessid;
2575   GstRtpBinSession *session;
2576   GstPadLinkReturn lres;
2577
2578   /* first get the session number */
2579   if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1)
2580     goto no_name;
2581
2582   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
2583
2584   /* get or create session */
2585   session = find_session_by_id (rtpbin, sessid);
2586   if (!session) {
2587     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
2588     /* create session now */
2589     session = create_session (rtpbin, sessid);
2590     if (session == NULL)
2591       goto create_error;
2592   }
2593
2594   /* check if pad was requested */
2595   if (session->recv_rtp_sink_ghost != NULL)
2596     return session->recv_rtp_sink_ghost;
2597
2598   GST_DEBUG_OBJECT (rtpbin, "getting RTP sink pad");
2599   /* get recv_rtp pad and store */
2600   session->recv_rtp_sink =
2601       gst_element_get_request_pad (session->session, "recv_rtp_sink");
2602   if (session->recv_rtp_sink == NULL)
2603     goto pad_failed;
2604
2605   g_signal_connect (session->recv_rtp_sink, "notify::caps",
2606       (GCallback) caps_changed, session);
2607
2608   GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad");
2609   /* get srcpad, link to SSRCDemux */
2610   session->recv_rtp_src =
2611       gst_element_get_static_pad (session->session, "recv_rtp_src");
2612   if (session->recv_rtp_src == NULL)
2613     goto pad_failed;
2614
2615   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
2616   sinkdpad = gst_element_get_static_pad (session->demux, "sink");
2617   GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
2618   lres = gst_pad_link (session->recv_rtp_src, sinkdpad);
2619   gst_object_unref (sinkdpad);
2620   if (lres != GST_PAD_LINK_OK)
2621     goto link_failed;
2622
2623   /* connect to the new-ssrc-pad signal of the SSRC demuxer */
2624   session->demux_newpad_sig = g_signal_connect (session->demux,
2625       "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
2626   session->demux_padremoved_sig = g_signal_connect (session->demux,
2627       "removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session);
2628
2629   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
2630   session->recv_rtp_sink_ghost =
2631       gst_ghost_pad_new_from_template (name, session->recv_rtp_sink, templ);
2632   gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE);
2633   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost);
2634
2635   return session->recv_rtp_sink_ghost;
2636
2637   /* ERRORS */
2638 no_name:
2639   {
2640     g_warning ("rtpbin: invalid name given");
2641     return NULL;
2642   }
2643 create_error:
2644   {
2645     /* create_session already warned */
2646     return NULL;
2647   }
2648 pad_failed:
2649   {
2650     g_warning ("rtpbin: failed to get session pad");
2651     return NULL;
2652   }
2653 link_failed:
2654   {
2655     g_warning ("rtpbin: failed to link pads");
2656     return NULL;
2657   }
2658 }
2659
2660 static void
2661 remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
2662 {
2663   if (session->demux_newpad_sig) {
2664     g_signal_handler_disconnect (session->demux, session->demux_newpad_sig);
2665     session->demux_newpad_sig = 0;
2666   }
2667   if (session->demux_padremoved_sig) {
2668     g_signal_handler_disconnect (session->demux, session->demux_padremoved_sig);
2669     session->demux_padremoved_sig = 0;
2670   }
2671   if (session->recv_rtp_src) {
2672     gst_object_unref (session->recv_rtp_src);
2673     session->recv_rtp_src = NULL;
2674   }
2675   if (session->recv_rtp_sink) {
2676     gst_element_release_request_pad (session->session, session->recv_rtp_sink);
2677     gst_object_unref (session->recv_rtp_sink);
2678     session->recv_rtp_sink = NULL;
2679   }
2680   if (session->recv_rtp_sink_ghost) {
2681     gst_pad_set_active (session->recv_rtp_sink_ghost, FALSE);
2682     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2683         session->recv_rtp_sink_ghost);
2684     session->recv_rtp_sink_ghost = NULL;
2685   }
2686 }
2687
2688 /* Create a pad for receiving RTCP for the session in @name. Must be called with
2689  * RTP_BIN_LOCK.
2690  */
2691 static GstPad *
2692 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
2693     const gchar * name)
2694 {
2695   guint sessid;
2696   GstRtpBinSession *session;
2697   GstPad *sinkdpad;
2698   GstPadLinkReturn lres;
2699
2700   /* first get the session number */
2701   if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1)
2702     goto no_name;
2703
2704   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
2705
2706   /* get or create the session */
2707   session = find_session_by_id (rtpbin, sessid);
2708   if (!session) {
2709     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
2710     /* create session now */
2711     session = create_session (rtpbin, sessid);
2712     if (session == NULL)
2713       goto create_error;
2714   }
2715
2716   /* check if pad was requested */
2717   if (session->recv_rtcp_sink_ghost != NULL)
2718     return session->recv_rtcp_sink_ghost;
2719
2720   /* get recv_rtp pad and store */
2721   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
2722   session->recv_rtcp_sink =
2723       gst_element_get_request_pad (session->session, "recv_rtcp_sink");
2724   if (session->recv_rtcp_sink == NULL)
2725     goto pad_failed;
2726
2727   /* get srcpad, link to SSRCDemux */
2728   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
2729   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
2730   if (session->sync_src == NULL)
2731     goto pad_failed;
2732
2733   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
2734   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
2735   lres = gst_pad_link (session->sync_src, sinkdpad);
2736   gst_object_unref (sinkdpad);
2737   if (lres != GST_PAD_LINK_OK)
2738     goto link_failed;
2739
2740   session->recv_rtcp_sink_ghost =
2741       gst_ghost_pad_new_from_template (name, session->recv_rtcp_sink, templ);
2742   gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE);
2743   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin),
2744       session->recv_rtcp_sink_ghost);
2745
2746   return session->recv_rtcp_sink_ghost;
2747
2748   /* ERRORS */
2749 no_name:
2750   {
2751     g_warning ("rtpbin: invalid name given");
2752     return NULL;
2753   }
2754 create_error:
2755   {
2756     /* create_session already warned */
2757     return NULL;
2758   }
2759 pad_failed:
2760   {
2761     g_warning ("rtpbin: failed to get session pad");
2762     return NULL;
2763   }
2764 link_failed:
2765   {
2766     g_warning ("rtpbin: failed to link pads");
2767     return NULL;
2768   }
2769 }
2770
2771 static void
2772 remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
2773 {
2774   if (session->recv_rtcp_sink_ghost) {
2775     gst_pad_set_active (session->recv_rtcp_sink_ghost, FALSE);
2776     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2777         session->recv_rtcp_sink_ghost);
2778     session->recv_rtcp_sink_ghost = NULL;
2779   }
2780   if (session->sync_src) {
2781     /* releasing the request pad should also unref the sync pad */
2782     gst_object_unref (session->sync_src);
2783     session->sync_src = NULL;
2784   }
2785   if (session->recv_rtcp_sink) {
2786     gst_element_release_request_pad (session->session, session->recv_rtcp_sink);
2787     gst_object_unref (session->recv_rtcp_sink);
2788     session->recv_rtcp_sink = NULL;
2789   }
2790 }
2791
2792 /* Create a pad for sending RTP for the session in @name. Must be called with
2793  * RTP_BIN_LOCK.
2794  */
2795 static GstPad *
2796 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2797 {
2798   gchar *gname;
2799   guint sessid;
2800   GstRtpBinSession *session;
2801   GstElementClass *klass;
2802
2803   /* first get the session number */
2804   if (name == NULL || sscanf (name, "send_rtp_sink_%u", &sessid) != 1)
2805     goto no_name;
2806
2807   /* get or create session */
2808   session = find_session_by_id (rtpbin, sessid);
2809   if (!session) {
2810     /* create session now */
2811     session = create_session (rtpbin, sessid);
2812     if (session == NULL)
2813       goto create_error;
2814   }
2815
2816   /* check if pad was requested */
2817   if (session->send_rtp_sink_ghost != NULL)
2818     return session->send_rtp_sink_ghost;
2819
2820   /* get send_rtp pad and store */
2821   session->send_rtp_sink =
2822       gst_element_get_request_pad (session->session, "send_rtp_sink");
2823   if (session->send_rtp_sink == NULL)
2824     goto pad_failed;
2825
2826   session->send_rtp_sink_ghost =
2827       gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ);
2828   gst_pad_set_active (session->send_rtp_sink_ghost, TRUE);
2829   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_sink_ghost);
2830
2831   /* get srcpad */
2832   session->send_rtp_src =
2833       gst_element_get_static_pad (session->session, "send_rtp_src");
2834   if (session->send_rtp_src == NULL)
2835     goto no_srcpad;
2836
2837   /* ghost the new source pad */
2838   klass = GST_ELEMENT_GET_CLASS (rtpbin);
2839   gname = g_strdup_printf ("send_rtp_src_%u", sessid);
2840   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u");
2841   session->send_rtp_src_ghost =
2842       gst_ghost_pad_new_from_template (gname, session->send_rtp_src, templ);
2843   gst_pad_set_active (session->send_rtp_src_ghost, TRUE);
2844   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost);
2845   g_free (gname);
2846
2847   return session->send_rtp_sink_ghost;
2848
2849   /* ERRORS */
2850 no_name:
2851   {
2852     g_warning ("rtpbin: invalid name given");
2853     return NULL;
2854   }
2855 create_error:
2856   {
2857     /* create_session already warned */
2858     return NULL;
2859   }
2860 pad_failed:
2861   {
2862     g_warning ("rtpbin: failed to get session pad for session %d", sessid);
2863     return NULL;
2864   }
2865 no_srcpad:
2866   {
2867     g_warning ("rtpbin: failed to get rtp source pad for session %d", sessid);
2868     return NULL;
2869   }
2870 }
2871
2872 static void
2873 remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
2874 {
2875   if (session->send_rtp_src_ghost) {
2876     gst_pad_set_active (session->send_rtp_src_ghost, FALSE);
2877     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2878         session->send_rtp_src_ghost);
2879     session->send_rtp_src_ghost = NULL;
2880   }
2881   if (session->send_rtp_src) {
2882     gst_object_unref (session->send_rtp_src);
2883     session->send_rtp_src = NULL;
2884   }
2885   if (session->send_rtp_sink) {
2886     gst_element_release_request_pad (GST_ELEMENT_CAST (session->session),
2887         session->send_rtp_sink);
2888     gst_object_unref (session->send_rtp_sink);
2889     session->send_rtp_sink = NULL;
2890   }
2891   if (session->send_rtp_sink_ghost) {
2892     gst_pad_set_active (session->send_rtp_sink_ghost, FALSE);
2893     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2894         session->send_rtp_sink_ghost);
2895     session->send_rtp_sink_ghost = NULL;
2896   }
2897 }
2898
2899 /* Create a pad for sending RTCP for the session in @name. Must be called with
2900  * RTP_BIN_LOCK.
2901  */
2902 static GstPad *
2903 create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2904 {
2905   guint sessid;
2906   GstRtpBinSession *session;
2907
2908   /* first get the session number */
2909   if (name == NULL || sscanf (name, "send_rtcp_src_%u", &sessid) != 1)
2910     goto no_name;
2911
2912   /* get or create session */
2913   session = find_session_by_id (rtpbin, sessid);
2914   if (!session)
2915     goto no_session;
2916
2917   /* check if pad was requested */
2918   if (session->send_rtcp_src_ghost != NULL)
2919     return session->send_rtcp_src_ghost;
2920
2921   /* get rtcp_src pad and store */
2922   session->send_rtcp_src =
2923       gst_element_get_request_pad (session->session, "send_rtcp_src");
2924   if (session->send_rtcp_src == NULL)
2925     goto pad_failed;
2926
2927   session->send_rtcp_src_ghost =
2928       gst_ghost_pad_new_from_template (name, session->send_rtcp_src, templ);
2929   gst_pad_set_active (session->send_rtcp_src_ghost, TRUE);
2930   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtcp_src_ghost);
2931
2932   return session->send_rtcp_src_ghost;
2933
2934   /* ERRORS */
2935 no_name:
2936   {
2937     g_warning ("rtpbin: invalid name given");
2938     return NULL;
2939   }
2940 no_session:
2941   {
2942     g_warning ("rtpbin: session with id %d does not exist", sessid);
2943     return NULL;
2944   }
2945 pad_failed:
2946   {
2947     g_warning ("rtpbin: failed to get rtcp pad for session %d", sessid);
2948     return NULL;
2949   }
2950 }
2951
2952 static void
2953 remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
2954 {
2955   if (session->send_rtcp_src_ghost) {
2956     gst_pad_set_active (session->send_rtcp_src_ghost, FALSE);
2957     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2958         session->send_rtcp_src_ghost);
2959     session->send_rtcp_src_ghost = NULL;
2960   }
2961   if (session->send_rtcp_src) {
2962     gst_element_release_request_pad (session->session, session->send_rtcp_src);
2963     gst_object_unref (session->send_rtcp_src);
2964     session->send_rtcp_src = NULL;
2965   }
2966 }
2967
2968 /* If the requested name is NULL we should create a name with
2969  * the session number assuming we want the lowest posible session
2970  * with a free pad like the template */
2971 static gchar *
2972 gst_rtp_bin_get_free_pad_name (GstElement * element, GstPadTemplate * templ)
2973 {
2974   gboolean name_found = FALSE;
2975   gint session = 0;
2976   GstIterator *pad_it = NULL;
2977   gchar *pad_name = NULL;
2978   GValue data = { 0, };
2979
2980   GST_DEBUG_OBJECT (element, "find a free pad name for template");
2981   while (!name_found) {
2982     gboolean done = FALSE;
2983
2984     g_free (pad_name);
2985     pad_name = g_strdup_printf (templ->name_template, session++);
2986     pad_it = gst_element_iterate_pads (GST_ELEMENT (element));
2987     name_found = TRUE;
2988     while (!done) {
2989       switch (gst_iterator_next (pad_it, &data)) {
2990         case GST_ITERATOR_OK:
2991         {
2992           GstPad *pad;
2993           gchar *name;
2994
2995           pad = g_value_get_object (&data);
2996           name = gst_pad_get_name (pad);
2997
2998           if (strcmp (name, pad_name) == 0) {
2999             done = TRUE;
3000             name_found = FALSE;
3001           }
3002           g_free (name);
3003           g_value_reset (&data);
3004           break;
3005         }
3006         case GST_ITERATOR_ERROR:
3007         case GST_ITERATOR_RESYNC:
3008           /* restart iteration */
3009           done = TRUE;
3010           name_found = FALSE;
3011           session = 0;
3012           break;
3013         case GST_ITERATOR_DONE:
3014           done = TRUE;
3015           break;
3016       }
3017     }
3018     g_value_unset (&data);
3019     gst_iterator_free (pad_it);
3020   }
3021
3022   GST_DEBUG_OBJECT (element, "free pad name found: '%s'", pad_name);
3023   return pad_name;
3024 }
3025
3026 /*
3027  */
3028 static GstPad *
3029 gst_rtp_bin_request_new_pad (GstElement * element,
3030     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3031 {
3032   GstRtpBin *rtpbin;
3033   GstElementClass *klass;
3034   GstPad *result;
3035
3036   gchar *pad_name = NULL;
3037
3038   g_return_val_if_fail (templ != NULL, NULL);
3039   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
3040
3041   rtpbin = GST_RTP_BIN (element);
3042   klass = GST_ELEMENT_GET_CLASS (element);
3043
3044   GST_RTP_BIN_LOCK (rtpbin);
3045
3046   if (name == NULL) {
3047     /* use a free pad name */
3048     pad_name = gst_rtp_bin_get_free_pad_name (element, templ);
3049   } else {
3050     /* use the provided name */
3051     pad_name = g_strdup (name);
3052   }
3053
3054   GST_DEBUG_OBJECT (rtpbin, "Trying to request a pad with name %s", pad_name);
3055
3056   /* figure out the template */
3057   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
3058     result = create_recv_rtp (rtpbin, templ, pad_name);
3059   } else if (templ == gst_element_class_get_pad_template (klass,
3060           "recv_rtcp_sink_%u")) {
3061     result = create_recv_rtcp (rtpbin, templ, pad_name);
3062   } else if (templ == gst_element_class_get_pad_template (klass,
3063           "send_rtp_sink_%u")) {
3064     result = create_send_rtp (rtpbin, templ, pad_name);
3065   } else if (templ == gst_element_class_get_pad_template (klass,
3066           "send_rtcp_src_%u")) {
3067     result = create_rtcp (rtpbin, templ, pad_name);
3068   } else
3069     goto wrong_template;
3070
3071   g_free (pad_name);
3072   GST_RTP_BIN_UNLOCK (rtpbin);
3073
3074   return result;
3075
3076   /* ERRORS */
3077 wrong_template:
3078   {
3079     g_free (pad_name);
3080     GST_RTP_BIN_UNLOCK (rtpbin);
3081     g_warning ("rtpbin: this is not our template");
3082     return NULL;
3083   }
3084 }
3085
3086 static void
3087 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
3088 {
3089   GstRtpBinSession *session;
3090   GstRtpBin *rtpbin;
3091
3092   g_return_if_fail (GST_IS_GHOST_PAD (pad));
3093   g_return_if_fail (GST_IS_RTP_BIN (element));
3094
3095   rtpbin = GST_RTP_BIN (element);
3096
3097   GST_RTP_BIN_LOCK (rtpbin);
3098   GST_DEBUG_OBJECT (rtpbin, "Trying to release pad %s:%s",
3099       GST_DEBUG_PAD_NAME (pad));
3100
3101   if (!(session = find_session_by_pad (rtpbin, pad)))
3102     goto unknown_pad;
3103
3104   if (session->recv_rtp_sink_ghost == pad) {
3105     remove_recv_rtp (rtpbin, session);
3106   } else if (session->recv_rtcp_sink_ghost == pad) {
3107     remove_recv_rtcp (rtpbin, session);
3108   } else if (session->send_rtp_sink_ghost == pad) {
3109     remove_send_rtp (rtpbin, session);
3110   } else if (session->send_rtcp_src_ghost == pad) {
3111     remove_rtcp (rtpbin, session);
3112   }
3113
3114   /* no more request pads, free the complete session */
3115   if (session->recv_rtp_sink_ghost == NULL
3116       && session->recv_rtcp_sink_ghost == NULL
3117       && session->send_rtp_sink_ghost == NULL
3118       && session->send_rtcp_src_ghost == NULL) {
3119     GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session);
3120     rtpbin->sessions = g_slist_remove (rtpbin->sessions, session);
3121     free_session (session, rtpbin);
3122   }
3123   GST_RTP_BIN_UNLOCK (rtpbin);
3124
3125   return;
3126
3127   /* ERROR */
3128 unknown_pad:
3129   {
3130     GST_RTP_BIN_UNLOCK (rtpbin);
3131     g_warning ("rtpbin: %s:%s is not one of our request pads",
3132         GST_DEBUG_PAD_NAME (pad));
3133     return;
3134   }
3135 }