Merge remote-tracking branch 'origin/0.10'
[platform/upstream/gstreamer.git] / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 /**
21  * SECTION:element-gstrtpbin
22  * @see_also: gstrtpjitterbuffer, gstrtpsession, gstrtpptdemux, gstrtpssrcdemux
23  *
24  * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
25  * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
26  * RTP sessions that will be synchronized together using RTCP SR packets.
27  *
28  * #GstRtpBin is configured with a number of request pads that define the
29  * functionality that is activated, similar to the #GstRtpSession element.
30  *
31  * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_\%u pad. The session
32  * number must be specified in the pad name.
33  * Data received on the recv_rtp_sink_\%u pad will be processed in the #GstRtpSession
34  * manager and after being validated forwarded on #GstRtpSsrcDemux element. Each
35  * RTP stream is demuxed based on the SSRC and send to a #GstRtpJitterBuffer. After
36  * the packets are released from the jitterbuffer, they will be forwarded to a
37  * #GstRtpPtDemux element. The #GstRtpPtDemux element will demux the packets based
38  * on the payload type and will create a unique pad recv_rtp_src_\%u_\%u_\%u on
39  * gstrtpbin with the session number, SSRC and payload type respectively as the pad
40  * name.
41  *
42  * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_\%u pad. The
43  * session number must be specified in the pad name.
44  *
45  * If you want the session manager to generate and send RTCP packets, request
46  * the send_rtcp_src_\%u pad with the session number in the pad name. Packet pushed
47  * on this pad contain SR/RR RTCP reports that should be sent to all participants
48  * in the session.
49  *
50  * To use #GstRtpBin as a sender, request a send_rtp_sink_\%u pad, which will
51  * automatically create a send_rtp_src_\%u pad. If the session number is not provided,
52  * the pad from the lowest available session will be returned. The session manager will modify the
53  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
54  * send_rtp_src_\%u pad after updating its internal state.
55  *
56  * The session manager needs the clock-rate of the payload types it is handling
57  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
58  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
59  * signal.
60  *
61  * Access to the internal statistics of gstrtpbin is provided with the
62  * get-internal-session property. This action signal gives access to the
63  * RTPSession object which further provides action signals to retrieve the
64  * internal source and other sources.
65  *
66  * <refsect2>
67  * <title>Example pipelines</title>
68  * |[
69  * gst-launch udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
70  *     gstrtpbin ! rtptheoradepay ! theoradec ! xvimagesink
71  * ]| Receive RTP data from port 5000 and send to the session 0 in gstrtpbin.
72  * |[
73  * gst-launch gstrtpbin name=rtpbin \
74  *         v4l2src ! ffmpegcolorspace ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
75  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
76  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
77  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
78  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
79  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
80  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
81  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
82  * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
83  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
84  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
85  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
86  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
87  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
88  * is received on port 5007. Since RTCP packets from the sender should be sent
89  * as soon as possible and do not participate in preroll, sync=false and
90  * async=false is configured on udpsink
91  * |[
92  * gst-launch -v gstrtpbin name=rtpbin                                          \
93  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
94  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
95  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
96  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
97  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
98  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
99  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
100  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
101  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
102  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
103  * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
104  * decode and display the video.
105  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
106  * decode and play the audio.
107  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
108  * session 1 on port 5003. These packets will be used for session management and
109  * synchronisation.
110  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
111  * on port 5007.
112  * </refsect2>
113  *
114  * Last reviewed on 2007-08-30 (0.10.6)
115  */
116
117 #ifdef HAVE_CONFIG_H
118 #include "config.h"
119 #endif
120 #include <stdio.h>
121 #include <string.h>
122
123 #include <gst/rtp/gstrtpbuffer.h>
124 #include <gst/rtp/gstrtcpbuffer.h>
125
126 #include "gstrtpbin-marshal.h"
127 #include "gstrtpbin.h"
128 #include "rtpsession.h"
129 #include "gstrtpsession.h"
130 #include "gstrtpjitterbuffer.h"
131
132 #include <gst/glib-compat-private.h>
133
134 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
135 #define GST_CAT_DEFAULT gst_rtp_bin_debug
136
137 /* sink pads */
138 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
139 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
140     GST_PAD_SINK,
141     GST_PAD_REQUEST,
142     GST_STATIC_CAPS ("application/x-rtp")
143     );
144
145 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
146 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
147     GST_PAD_SINK,
148     GST_PAD_REQUEST,
149     GST_STATIC_CAPS ("application/x-rtcp")
150     );
151
152 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
153 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%u",
154     GST_PAD_SINK,
155     GST_PAD_REQUEST,
156     GST_STATIC_CAPS ("application/x-rtp")
157     );
158
159 /* src pads */
160 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
161 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
162     GST_PAD_SRC,
163     GST_PAD_SOMETIMES,
164     GST_STATIC_CAPS ("application/x-rtp")
165     );
166
167 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
168 GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%u",
169     GST_PAD_SRC,
170     GST_PAD_REQUEST,
171     GST_STATIC_CAPS ("application/x-rtcp")
172     );
173
174 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
175 GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%u",
176     GST_PAD_SRC,
177     GST_PAD_SOMETIMES,
178     GST_STATIC_CAPS ("application/x-rtp")
179     );
180
181 #define GST_RTP_BIN_GET_PRIVATE(obj)  \
182    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
183
184 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock (&(bin)->priv->bin_lock)
185 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock (&(bin)->priv->bin_lock)
186
187 /* lock to protect dynamic callbacks, like pad-added and new ssrc. */
188 #define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock (&(bin)->priv->dyn_lock)
189 #define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock (&(bin)->priv->dyn_lock)
190
191 /* lock for shutdown */
192 #define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
193 G_STMT_START {                                   \
194   if (g_atomic_int_get (&bin->priv->shutdown))   \
195     goto label;                                  \
196   GST_RTP_BIN_DYN_LOCK (bin);                    \
197   if (g_atomic_int_get (&bin->priv->shutdown)) { \
198     GST_RTP_BIN_DYN_UNLOCK (bin);                \
199     goto label;                                  \
200   }                                              \
201 } G_STMT_END
202
203 /* unlock for shutdown */
204 #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
205   GST_RTP_BIN_DYN_UNLOCK (bin);                  \
206
207 struct _GstRtpBinPrivate
208 {
209   GMutex bin_lock;
210
211   /* lock protecting dynamic adding/removing */
212   GMutex dyn_lock;
213
214   /* if we are shutting down or not */
215   gint shutdown;
216
217   gboolean autoremove;
218
219   /* UNIX (ntp) time of last SR sync used */
220   guint64 last_unix;
221 };
222
223 /* signals and args */
224 enum
225 {
226   SIGNAL_REQUEST_PT_MAP,
227   SIGNAL_PAYLOAD_TYPE_CHANGE,
228   SIGNAL_CLEAR_PT_MAP,
229   SIGNAL_RESET_SYNC,
230   SIGNAL_GET_INTERNAL_SESSION,
231
232   SIGNAL_ON_NEW_SSRC,
233   SIGNAL_ON_SSRC_COLLISION,
234   SIGNAL_ON_SSRC_VALIDATED,
235   SIGNAL_ON_SSRC_ACTIVE,
236   SIGNAL_ON_SSRC_SDES,
237   SIGNAL_ON_BYE_SSRC,
238   SIGNAL_ON_BYE_TIMEOUT,
239   SIGNAL_ON_TIMEOUT,
240   SIGNAL_ON_SENDER_TIMEOUT,
241   SIGNAL_ON_NPT_STOP,
242   LAST_SIGNAL
243 };
244
245 #define DEFAULT_LATENCY_MS           200
246 #define DEFAULT_SDES                 NULL
247 #define DEFAULT_DO_LOST              FALSE
248 #define DEFAULT_IGNORE_PT            FALSE
249 #define DEFAULT_NTP_SYNC             FALSE
250 #define DEFAULT_AUTOREMOVE           FALSE
251 #define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
252 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
253 #define DEFAULT_RTCP_SYNC            GST_RTP_BIN_RTCP_SYNC_ALWAYS
254 #define DEFAULT_RTCP_SYNC_INTERVAL   0
255
256 enum
257 {
258   PROP_0,
259   PROP_LATENCY,
260   PROP_SDES,
261   PROP_DO_LOST,
262   PROP_IGNORE_PT,
263   PROP_NTP_SYNC,
264   PROP_RTCP_SYNC,
265   PROP_RTCP_SYNC_INTERVAL,
266   PROP_AUTOREMOVE,
267   PROP_BUFFER_MODE,
268   PROP_USE_PIPELINE_CLOCK,
269   PROP_LAST
270 };
271
272 enum
273 {
274   GST_RTP_BIN_RTCP_SYNC_ALWAYS,
275   GST_RTP_BIN_RTCP_SYNC_INITIAL,
276   GST_RTP_BIN_RTCP_SYNC_RTP
277 };
278
279 #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
280 static GType
281 gst_rtp_bin_rtcp_sync_get_type (void)
282 {
283   static GType rtcp_sync_type = 0;
284   static const GEnumValue rtcp_sync_types[] = {
285     {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
286     {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
287     {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
288     {0, NULL, NULL},
289   };
290
291   if (!rtcp_sync_type) {
292     rtcp_sync_type = g_enum_register_static ("GstRTCPSync", rtcp_sync_types);
293   }
294   return rtcp_sync_type;
295 }
296
297 /* helper objects */
298 typedef struct _GstRtpBinSession GstRtpBinSession;
299 typedef struct _GstRtpBinStream GstRtpBinStream;
300 typedef struct _GstRtpBinClient GstRtpBinClient;
301
302 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
303
304 static GstCaps *pt_map_requested (GstElement * element, guint pt,
305     GstRtpBinSession * session);
306 static void payload_type_change (GstElement * element, guint pt,
307     GstRtpBinSession * session);
308 static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
309 static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
310 static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
311 static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
312 static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
313 static void free_stream (GstRtpBinStream * stream);
314
315 /* Manages the RTP stream for one SSRC.
316  *
317  * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer.
318  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
319  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
320  * together (see below).
321  */
322 struct _GstRtpBinStream
323 {
324   /* the SSRC of this stream */
325   guint32 ssrc;
326
327   /* parent bin */
328   GstRtpBin *bin;
329
330   /* the session this SSRC belongs to */
331   GstRtpBinSession *session;
332
333   /* the jitterbuffer of the SSRC */
334   GstElement *buffer;
335   gulong buffer_handlesync_sig;
336   gulong buffer_ptreq_sig;
337   gulong buffer_ntpstop_sig;
338   gint percent;
339
340   /* the PT demuxer of the SSRC */
341   GstElement *demux;
342   gulong demux_newpad_sig;
343   gulong demux_padremoved_sig;
344   gulong demux_ptreq_sig;
345   gulong demux_ptchange_sig;
346
347   /* if we have calculated a valid rt_delta for this stream */
348   gboolean have_sync;
349   /* mapping to local RTP and NTP time */
350   gint64 rt_delta;
351   gint64 rtp_delta;
352   /* base rtptime in gst time */
353   gint64 clock_base;
354 };
355
356 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->lock)
357 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->lock)
358
359 /* Manages the receiving end of the packets.
360  *
361  * There is one such structure for each RTP session (audio/video/...).
362  * We get the RTP/RTCP packets and stuff them into the session manager. From
363  * there they are pushed into an SSRC demuxer that splits the stream based on
364  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
365  * the GstRtpBinStream above).
366  */
367 struct _GstRtpBinSession
368 {
369   /* session id */
370   gint id;
371   /* the parent bin */
372   GstRtpBin *bin;
373   /* the session element */
374   GstElement *session;
375   /* the SSRC demuxer */
376   GstElement *demux;
377   gulong demux_newpad_sig;
378   gulong demux_padremoved_sig;
379
380   GMutex lock;
381
382   /* list of GstRtpBinStream */
383   GSList *streams;
384
385   /* mapping of payload type to caps */
386   GHashTable *ptmap;
387
388   /* the pads of the session */
389   GstPad *recv_rtp_sink;
390   GstPad *recv_rtp_sink_ghost;
391   GstPad *recv_rtp_src;
392   GstPad *recv_rtcp_sink;
393   GstPad *recv_rtcp_sink_ghost;
394   GstPad *sync_src;
395   GstPad *send_rtp_sink;
396   GstPad *send_rtp_sink_ghost;
397   GstPad *send_rtp_src;
398   GstPad *send_rtp_src_ghost;
399   GstPad *send_rtcp_src;
400   GstPad *send_rtcp_src_ghost;
401 };
402
403 /* Manages the RTP streams that come from one client and should therefore be
404  * synchronized.
405  */
406 struct _GstRtpBinClient
407 {
408   /* the common CNAME for the streams */
409   gchar *cname;
410   guint cname_len;
411
412   /* the streams */
413   guint nstreams;
414   GSList *streams;
415 };
416
417 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
418 static GstRtpBinSession *
419 find_session_by_id (GstRtpBin * rtpbin, gint id)
420 {
421   GSList *walk;
422
423   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
424     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
425
426     if (sess->id == id)
427       return sess;
428   }
429   return NULL;
430 }
431
432 /* find a session with the given request pad. Must be called with RTP_BIN_LOCK */
433 static GstRtpBinSession *
434 find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
435 {
436   GSList *walk;
437
438   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
439     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
440
441     if ((sess->recv_rtp_sink_ghost == pad) ||
442         (sess->recv_rtcp_sink_ghost == pad) ||
443         (sess->send_rtp_sink_ghost == pad)
444         || (sess->send_rtcp_src_ghost == pad))
445       return sess;
446   }
447   return NULL;
448 }
449
450 static void
451 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
452 {
453   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
454       sess->id, ssrc);
455 }
456
457 static void
458 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
459 {
460   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
461       sess->id, ssrc);
462 }
463
464 static void
465 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
466 {
467   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
468       sess->id, ssrc);
469 }
470
471 static void
472 on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
473 {
474   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
475       sess->id, ssrc);
476 }
477
478 static void
479 on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
480 {
481   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
482       sess->id, ssrc);
483 }
484
485 static void
486 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
487 {
488   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
489       sess->id, ssrc);
490 }
491
492 static void
493 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
494 {
495   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
496       sess->id, ssrc);
497
498   if (sess->bin->priv->autoremove)
499     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
500 }
501
502 static void
503 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
504 {
505   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
506       sess->id, ssrc);
507
508   if (sess->bin->priv->autoremove)
509     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
510 }
511
512 static void
513 on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
514 {
515   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
516       sess->id, ssrc);
517 }
518
519 static void
520 on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
521 {
522   g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0,
523       stream->session->id, stream->ssrc);
524 }
525
526 /* must be called with the SESSION lock */
527 static GstRtpBinStream *
528 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
529 {
530   GSList *walk;
531
532   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
533     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
534
535     if (stream->ssrc == ssrc)
536       return stream;
537   }
538   return NULL;
539 }
540
541 static void
542 ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad,
543     GstRtpBinSession * session)
544 {
545   GstRtpBinStream *stream = NULL;
546
547   GST_RTP_SESSION_LOCK (session);
548   if ((stream = find_stream_by_ssrc (session, ssrc)))
549     session->streams = g_slist_remove (session->streams, stream);
550   GST_RTP_SESSION_UNLOCK (session);
551
552   if (stream)
553     free_stream (stream);
554 }
555
556 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
557 static GstRtpBinSession *
558 create_session (GstRtpBin * rtpbin, gint id)
559 {
560   GstRtpBinSession *sess;
561   GstElement *session, *demux;
562   GstState target;
563
564   if (!(session = gst_element_factory_make ("rtpsession", NULL)))
565     goto no_session;
566
567   if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
568     goto no_demux;
569
570   sess = g_new0 (GstRtpBinSession, 1);
571   g_mutex_init (&sess->lock);
572   sess->id = id;
573   sess->bin = rtpbin;
574   sess->session = session;
575   sess->demux = demux;
576   sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
577       (GDestroyNotify) gst_caps_unref);
578   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
579
580   /* configure SDES items */
581   GST_OBJECT_LOCK (rtpbin);
582   g_object_set (session, "sdes", rtpbin->sdes, "use-pipeline-clock",
583       rtpbin->use_pipeline_clock, NULL);
584   GST_OBJECT_UNLOCK (rtpbin);
585
586   /* provide clock_rate to the session manager when needed */
587   g_signal_connect (session, "request-pt-map",
588       (GCallback) pt_map_requested, sess);
589
590   g_signal_connect (sess->session, "on-new-ssrc",
591       (GCallback) on_new_ssrc, sess);
592   g_signal_connect (sess->session, "on-ssrc-collision",
593       (GCallback) on_ssrc_collision, sess);
594   g_signal_connect (sess->session, "on-ssrc-validated",
595       (GCallback) on_ssrc_validated, sess);
596   g_signal_connect (sess->session, "on-ssrc-active",
597       (GCallback) on_ssrc_active, sess);
598   g_signal_connect (sess->session, "on-ssrc-sdes",
599       (GCallback) on_ssrc_sdes, sess);
600   g_signal_connect (sess->session, "on-bye-ssrc",
601       (GCallback) on_bye_ssrc, sess);
602   g_signal_connect (sess->session, "on-bye-timeout",
603       (GCallback) on_bye_timeout, sess);
604   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
605   g_signal_connect (sess->session, "on-sender-timeout",
606       (GCallback) on_sender_timeout, sess);
607
608   gst_bin_add (GST_BIN_CAST (rtpbin), session);
609   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
610
611   GST_OBJECT_LOCK (rtpbin);
612   target = GST_STATE_TARGET (rtpbin);
613   GST_OBJECT_UNLOCK (rtpbin);
614
615   /* change state only to what's needed */
616   gst_element_set_state (demux, target);
617   gst_element_set_state (session, target);
618
619   return sess;
620
621   /* ERRORS */
622 no_session:
623   {
624     g_warning ("rtpbin: could not create gstrtpsession element");
625     return NULL;
626   }
627 no_demux:
628   {
629     gst_object_unref (session);
630     g_warning ("rtpbin: could not create gstrtpssrcdemux element");
631     return NULL;
632   }
633 }
634
635 /* called with RTP_BIN_LOCK */
636 static void
637 free_session (GstRtpBinSession * sess, GstRtpBin * bin)
638 {
639   GSList *client_walk;
640
641   GST_DEBUG_OBJECT (bin, "freeing session %p", sess);
642
643   gst_element_set_locked_state (sess->demux, TRUE);
644   gst_element_set_locked_state (sess->session, TRUE);
645
646   gst_element_set_state (sess->demux, GST_STATE_NULL);
647   gst_element_set_state (sess->session, GST_STATE_NULL);
648
649   remove_recv_rtp (bin, sess);
650   remove_recv_rtcp (bin, sess);
651   remove_send_rtp (bin, sess);
652   remove_rtcp (bin, sess);
653
654   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
655   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
656
657   /* remove any references in bin->clients to the streams in sess->streams */
658   client_walk = bin->clients;
659   while (client_walk) {
660     GSList *client_node = client_walk;
661     GstRtpBinClient *client = (GstRtpBinClient *) client_node->data;
662     GSList *stream_walk = client->streams;
663
664     while (stream_walk) {
665       GSList *stream_node = stream_walk;
666       GstRtpBinStream *stream = (GstRtpBinStream *) stream_node->data;
667       GSList *inner_walk;
668
669       stream_walk = g_slist_next (stream_walk);
670
671       for (inner_walk = sess->streams; inner_walk;
672           inner_walk = g_slist_next (inner_walk)) {
673         if ((GstRtpBinStream *) inner_walk->data == stream) {
674           client->streams = g_slist_delete_link (client->streams, stream_node);
675           --client->nstreams;
676           break;
677         }
678       }
679     }
680     client_walk = g_slist_next (client_walk);
681
682     g_assert ((client->streams && client->nstreams > 0) || (!client->streams
683             && client->streams == 0));
684     if (client->nstreams == 0) {
685       free_client (client, bin);
686       bin->clients = g_slist_delete_link (bin->clients, client_node);
687     }
688   }
689
690   g_slist_foreach (sess->streams, (GFunc) free_stream, NULL);
691   g_slist_free (sess->streams);
692
693   g_mutex_clear (&sess->lock);
694   g_hash_table_destroy (sess->ptmap);
695
696   g_free (sess);
697 }
698
699 /* get the payload type caps for the specific payload @pt in @session */
700 static GstCaps *
701 get_pt_map (GstRtpBinSession * session, guint pt)
702 {
703   GstCaps *caps = NULL;
704   GstRtpBin *bin;
705   GValue ret = { 0 };
706   GValue args[3] = { {0}, {0}, {0} };
707
708   GST_DEBUG ("searching pt %d in cache", pt);
709
710   GST_RTP_SESSION_LOCK (session);
711
712   /* first look in the cache */
713   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
714   if (caps) {
715     gst_caps_ref (caps);
716     goto done;
717   }
718
719   bin = session->bin;
720
721   GST_DEBUG ("emiting signal for pt %d in session %d", pt, session->id);
722
723   /* not in cache, send signal to request caps */
724   g_value_init (&args[0], GST_TYPE_ELEMENT);
725   g_value_set_object (&args[0], bin);
726   g_value_init (&args[1], G_TYPE_UINT);
727   g_value_set_uint (&args[1], session->id);
728   g_value_init (&args[2], G_TYPE_UINT);
729   g_value_set_uint (&args[2], pt);
730
731   g_value_init (&ret, GST_TYPE_CAPS);
732   g_value_set_boxed (&ret, NULL);
733
734   GST_RTP_SESSION_UNLOCK (session);
735
736   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
737
738   GST_RTP_SESSION_LOCK (session);
739
740   g_value_unset (&args[0]);
741   g_value_unset (&args[1]);
742   g_value_unset (&args[2]);
743
744   /* look in the cache again because we let the lock go */
745   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
746   if (caps) {
747     gst_caps_ref (caps);
748     g_value_unset (&ret);
749     goto done;
750   }
751
752   caps = (GstCaps *) g_value_dup_boxed (&ret);
753   g_value_unset (&ret);
754   if (!caps)
755     goto no_caps;
756
757   GST_DEBUG ("caching pt %d as %" GST_PTR_FORMAT, pt, caps);
758
759   /* store in cache, take additional ref */
760   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
761       gst_caps_ref (caps));
762
763 done:
764   GST_RTP_SESSION_UNLOCK (session);
765
766   return caps;
767
768   /* ERRORS */
769 no_caps:
770   {
771     GST_RTP_SESSION_UNLOCK (session);
772     GST_DEBUG ("no pt map could be obtained");
773     return NULL;
774   }
775 }
776
777 static gboolean
778 return_true (gpointer key, gpointer value, gpointer user_data)
779 {
780   return TRUE;
781 }
782
783 static void
784 gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
785 {
786   GSList *clients, *streams;
787
788   GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");
789
790   GST_RTP_BIN_LOCK (rtpbin);
791   for (clients = rtpbin->clients; clients; clients = g_slist_next (clients)) {
792     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
793
794     /* reset sync on all streams for this client */
795     for (streams = client->streams; streams; streams = g_slist_next (streams)) {
796       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
797
798       /* make use require a new SR packet for this stream before we attempt new
799        * lip-sync */
800       stream->have_sync = FALSE;
801       stream->rt_delta = 0;
802       stream->rtp_delta = 0;
803       stream->clock_base = -100 * GST_SECOND;
804     }
805   }
806   GST_RTP_BIN_UNLOCK (rtpbin);
807 }
808
809 static void
810 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
811 {
812   GSList *sessions, *streams;
813
814   GST_RTP_BIN_LOCK (bin);
815   GST_DEBUG_OBJECT (bin, "clearing pt map");
816   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
817     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
818
819     GST_DEBUG_OBJECT (bin, "clearing session %p", session);
820     g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
821
822     GST_RTP_SESSION_LOCK (session);
823     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
824
825     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
826       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
827
828       GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
829       g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
830       if (stream->demux)
831         g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
832     }
833     GST_RTP_SESSION_UNLOCK (session);
834   }
835   GST_RTP_BIN_UNLOCK (bin);
836
837   /* reset sync too */
838   gst_rtp_bin_reset_sync (bin);
839 }
840
841 static RTPSession *
842 gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
843 {
844   RTPSession *internal_session = NULL;
845   GstRtpBinSession *session;
846
847   GST_RTP_BIN_LOCK (bin);
848   GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %d",
849       session_id);
850   session = find_session_by_id (bin, (gint) session_id);
851   if (session) {
852     g_object_get (session->session, "internal-session", &internal_session,
853         NULL);
854   }
855   GST_RTP_BIN_UNLOCK (bin);
856
857   return internal_session;
858 }
859
860 static void
861 gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
862     const gchar * name, const GValue * value)
863 {
864   GSList *sessions, *streams;
865
866   GST_RTP_BIN_LOCK (bin);
867   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
868     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
869
870     GST_RTP_SESSION_LOCK (session);
871     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
872       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
873
874       g_object_set_property (G_OBJECT (stream->buffer), name, value);
875     }
876     GST_RTP_SESSION_UNLOCK (session);
877   }
878   GST_RTP_BIN_UNLOCK (bin);
879 }
880
881 /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
882 static GstRtpBinClient *
883 get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
884 {
885   GstRtpBinClient *result = NULL;
886   GSList *walk;
887
888   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
889     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
890
891     if (len != client->cname_len)
892       continue;
893
894     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
895       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
896           client->cname);
897       result = client;
898       break;
899     }
900   }
901
902   /* nothing found, create one */
903   if (result == NULL) {
904     result = g_new0 (GstRtpBinClient, 1);
905     result->cname = g_strndup ((gchar *) data, len);
906     result->cname_len = len;
907     bin->clients = g_slist_prepend (bin->clients, result);
908     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
909         result->cname);
910   }
911   return result;
912 }
913
914 static void
915 free_client (GstRtpBinClient * client, GstRtpBin * bin)
916 {
917   GST_DEBUG_OBJECT (bin, "freeing client %p", client);
918   g_slist_free (client->streams);
919   g_free (client->cname);
920   g_free (client);
921 }
922
923 static void
924 get_current_times (GstRtpBin * bin, GstClockTime * running_time,
925     guint64 * ntpnstime)
926 {
927   guint64 ntpns;
928   GstClock *clock;
929   GstClockTime base_time, rt, clock_time;
930
931   GST_OBJECT_LOCK (bin);
932   if ((clock = GST_ELEMENT_CLOCK (bin))) {
933     base_time = GST_ELEMENT_CAST (bin)->base_time;
934     gst_object_ref (clock);
935     GST_OBJECT_UNLOCK (bin);
936
937     clock_time = gst_clock_get_time (clock);
938
939     if (bin->use_pipeline_clock) {
940       ntpns = clock_time;
941     } else {
942       GTimeVal current;
943
944       /* get current NTP time */
945       g_get_current_time (&current);
946       ntpns = GST_TIMEVAL_TO_TIME (current);
947     }
948
949     /* add constant to convert from 1970 based time to 1900 based time */
950     ntpns += (2208988800LL * GST_SECOND);
951
952     /* get current clock time and convert to running time */
953     rt = clock_time - base_time;
954
955     gst_object_unref (clock);
956   } else {
957     GST_OBJECT_UNLOCK (bin);
958     rt = -1;
959     ntpns = -1;
960   }
961   if (running_time)
962     *running_time = rt;
963   if (ntpnstime)
964     *ntpnstime = ntpns;
965 }
966
967 static void
968 stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
969     gint64 ts_offset)
970 {
971   gint64 prev_ts_offset;
972
973   g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
974
975   /* delta changed, see how much */
976   if (prev_ts_offset != ts_offset) {
977     gint64 diff;
978
979     diff = prev_ts_offset - ts_offset;
980
981     GST_DEBUG_OBJECT (bin,
982         "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
983         ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff);
984
985     /* only change diff when it changed more than 4 milliseconds. This
986      * compensates for rounding errors in NTP to RTP timestamp
987      * conversions */
988     if (ABS (diff) > 4 * GST_MSECOND) {
989       if (ABS (diff) < (3 * GST_SECOND)) {
990         g_object_set (stream->buffer, "ts-offset", ts_offset, NULL);
991       } else {
992         GST_WARNING_OBJECT (bin, "offset unusually large, ignoring");
993       }
994     } else {
995       GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
996     }
997   }
998   GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
999       stream->ssrc, ts_offset);
1000 }
1001
1002 /* associate a stream to the given CNAME. This will make sure all streams for
1003  * that CNAME are synchronized together.
1004  * Must be called with GST_RTP_BIN_LOCK */
1005 static void
1006 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
1007     guint8 * data, guint64 ntptime, guint64 last_extrtptime,
1008     guint64 base_rtptime, guint64 base_time, guint clock_rate,
1009     gint64 rtp_clock_base)
1010 {
1011   GstRtpBinClient *client;
1012   gboolean created;
1013   GSList *walk;
1014   guint64 local_rt;
1015   guint64 local_rtp;
1016   GstClockTime running_time;
1017   guint64 ntpnstime;
1018   gint64 ntpdiff, rtdiff;
1019   guint64 last_unix;
1020
1021   /* first find or create the CNAME */
1022   client = get_client (bin, len, data, &created);
1023
1024   /* find stream in the client */
1025   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1026     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1027
1028     if (ostream == stream)
1029       break;
1030   }
1031   /* not found, add it to the list */
1032   if (walk == NULL) {
1033     GST_DEBUG_OBJECT (bin,
1034         "new association of SSRC %08x with client %p with CNAME %s",
1035         stream->ssrc, client, client->cname);
1036     client->streams = g_slist_prepend (client->streams, stream);
1037     client->nstreams++;
1038   } else {
1039     GST_DEBUG_OBJECT (bin,
1040         "found association of SSRC %08x with client %p with CNAME %s",
1041         stream->ssrc, client, client->cname);
1042   }
1043
1044   if (!GST_CLOCK_TIME_IS_VALID (last_extrtptime)) {
1045     GST_DEBUG_OBJECT (bin, "invalidated sync data");
1046     if (bin->rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1047       /* we don't need that data, so carry on,
1048        * but make some values look saner */
1049       last_extrtptime = base_rtptime;
1050     } else {
1051       /* nothing we can do with this data in this case */
1052       GST_DEBUG_OBJECT (bin, "bailing out");
1053       return;
1054     }
1055   }
1056
1057   /* Take the extended rtptime we found in the SR packet and map it to the
1058    * local rtptime. The local rtp time is used to construct timestamps on the
1059    * buffers so we will calculate what running_time corresponds to the RTP
1060    * timestamp in the SR packet. */
1061   local_rtp = last_extrtptime - base_rtptime;
1062
1063   GST_DEBUG_OBJECT (bin,
1064       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
1065       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, "
1066       "clock-base %" G_GINT64_FORMAT, base_rtptime,
1067       last_extrtptime, local_rtp, clock_rate, rtp_clock_base);
1068
1069   /* calculate local RTP time in gstreamer timestamp, we essentially perform the
1070    * same conversion that a jitterbuffer would use to convert an rtp timestamp
1071    * into a corresponding gstreamer timestamp. Note that the base_time also
1072    * contains the drift between sender and receiver. */
1073   local_rt = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate);
1074   local_rt += base_time;
1075
1076   /* convert ntptime to unix time since 1900 */
1077   last_unix = gst_util_uint64_scale (ntptime, GST_SECOND,
1078       (G_GINT64_CONSTANT (1) << 32));
1079
1080   stream->have_sync = TRUE;
1081
1082   GST_DEBUG_OBJECT (bin,
1083       "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT,
1084       local_rt, last_unix);
1085
1086   /* recalc inter stream playout offset, but only if there is more than one
1087    * stream or we're doing NTP sync. */
1088   if (bin->ntp_sync) {
1089     /* For NTP sync we need to first get a snapshot of running_time and NTP
1090      * time. We know at what running_time we play a certain RTP time, we also
1091      * calculated when we would play the RTP time in the SR packet. Now we need
1092      * to know how the running_time and the NTP time relate to eachother. */
1093     get_current_times (bin, &running_time, &ntpnstime);
1094
1095     /* see how far away the NTP time is. This is the difference between the
1096      * current NTP time and the NTP time in the last SR packet. */
1097     ntpdiff = ntpnstime - last_unix;
1098     /* see how far away the running_time is. This is the difference between the
1099      * current running_time and the running_time of the RTP timestamp in the
1100      * last SR packet. */
1101     rtdiff = running_time - local_rt;
1102
1103     GST_DEBUG_OBJECT (bin,
1104         "NTP time %" G_GUINT64_FORMAT ", last unix %" G_GUINT64_FORMAT,
1105         ntpnstime, last_unix);
1106     GST_DEBUG_OBJECT (bin,
1107         "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
1108         rtdiff);
1109
1110     /* combine to get the final diff to apply to the running_time */
1111     stream->rt_delta = rtdiff - ntpdiff;
1112
1113     stream_set_ts_offset (bin, stream, stream->rt_delta);
1114   } else {
1115     gint64 min, rtp_min, clock_base = stream->clock_base;
1116     gboolean all_sync, use_rtp;
1117     gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
1118
1119     /* calculate delta between server and receiver. last_unix is created by
1120      * converting the ntptime in the last SR packet to a gstreamer timestamp. This
1121      * delta expresses the difference to our timeline and the server timeline. The
1122      * difference in itself doesn't mean much but we can combine the delta of
1123      * multiple streams to create a stream specific offset. */
1124     stream->rt_delta = last_unix - local_rt;
1125
1126     /* calculate the min of all deltas, ignoring streams that did not yet have a
1127      * valid rt_delta because we did not yet receive an SR packet for those
1128      * streams.
1129      * We calculate the mininum because we would like to only apply positive
1130      * offsets to streams, delaying their playback instead of trying to speed up
1131      * other streams (which might be imposible when we have to create negative
1132      * latencies).
1133      * The stream that has the smallest diff is selected as the reference stream,
1134      * all other streams will have a positive offset to this difference. */
1135
1136     /* some alternative setting allow ignoring RTCP as much as possible,
1137      * for servers generating bogus ntp timeline */
1138     min = rtp_min = G_MAXINT64;
1139     use_rtp = FALSE;
1140     if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1141       guint64 ext_base;
1142
1143       use_rtp = TRUE;
1144       /* signed version for convienience */
1145       clock_base = base_rtptime;
1146       /* deal with possible wrap-around */
1147       ext_base = base_rtptime;
1148       rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
1149       /* sanity check; base rtp and provided clock_base should be close */
1150       if (rtp_clock_base >= clock_base) {
1151         if (rtp_clock_base - clock_base < 10 * clock_rate) {
1152           rtp_clock_base = base_time +
1153               gst_util_uint64_scale_int (rtp_clock_base - clock_base,
1154               GST_SECOND, clock_rate);
1155         } else {
1156           use_rtp = FALSE;
1157         }
1158       } else {
1159         if (clock_base - rtp_clock_base < 10 * clock_rate) {
1160           rtp_clock_base = base_time -
1161               gst_util_uint64_scale_int (clock_base - rtp_clock_base,
1162               GST_SECOND, clock_rate);
1163         } else {
1164           use_rtp = FALSE;
1165         }
1166       }
1167       /* warn and bail for clarity out if no sane values */
1168       if (!use_rtp) {
1169         GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
1170         return;
1171       }
1172       /* store to track changes */
1173       clock_base = rtp_clock_base;
1174       /* generate a fake as before,
1175        * now equating rtptime obtained from RTP-Info,
1176        * where the large time represent the otherwise irrelevant npt/ntp time */
1177       stream->rtp_delta = (GST_SECOND << 28) - rtp_clock_base;
1178     } else {
1179       clock_base = rtp_clock_base;
1180     }
1181
1182     all_sync = TRUE;
1183     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1184       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1185
1186       if (!ostream->have_sync) {
1187         all_sync = FALSE;
1188         continue;
1189       }
1190
1191       /* change in current stream's base from previously init'ed value
1192        * leads to reset of all stream's base */
1193       if (stream != ostream && stream->clock_base >= 0 &&
1194           (stream->clock_base != clock_base)) {
1195         GST_DEBUG_OBJECT (bin, "reset upon clock base change");
1196         ostream->clock_base = -100 * GST_SECOND;
1197         ostream->rtp_delta = 0;
1198       }
1199
1200       if (ostream->rt_delta < min)
1201         min = ostream->rt_delta;
1202       if (ostream->rtp_delta < rtp_min)
1203         rtp_min = ostream->rtp_delta;
1204     }
1205
1206     /* arrange to re-sync for each stream upon significant change,
1207      * e.g. post-seek */
1208     all_sync = all_sync && (stream->clock_base == clock_base);
1209     stream->clock_base = clock_base;
1210
1211     /* may need init performed above later on, but nothing more to do now */
1212     if (client->nstreams <= 1)
1213       return;
1214
1215     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
1216         " all sync %d", client, min, all_sync);
1217     GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
1218
1219     switch (rtcp_sync) {
1220       case GST_RTP_BIN_RTCP_SYNC_RTP:
1221         if (!use_rtp)
1222           break;
1223         GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
1224             "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
1225         /* fall-through */
1226       case GST_RTP_BIN_RTCP_SYNC_INITIAL:
1227         /* if all have been synced already, do not bother further */
1228         if (all_sync) {
1229           GST_DEBUG_OBJECT (bin, "all streams already synced; done");
1230           return;
1231         }
1232         break;
1233       default:
1234         break;
1235     }
1236
1237     /* bail out if we adjusted recently enough */
1238     if (all_sync && (last_unix - bin->priv->last_unix) <
1239         bin->rtcp_sync_interval * GST_MSECOND) {
1240       GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
1241           "previous sender info too recent "
1242           "(previous UNIX %" G_GUINT64_FORMAT ")", bin->priv->last_unix);
1243       return;
1244     }
1245     bin->priv->last_unix = last_unix;
1246
1247     /* calculate offsets for each stream */
1248     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1249       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1250       gint64 ts_offset;
1251
1252       /* ignore streams for which we didn't receive an SR packet yet, we
1253        * can't synchronize them yet. We can however sync other streams just
1254        * fine. */
1255       if (!ostream->have_sync)
1256         continue;
1257
1258       /* calculate offset to our reference stream, this should always give a
1259        * positive number. */
1260       if (use_rtp)
1261         ts_offset = ostream->rtp_delta - rtp_min;
1262       else
1263         ts_offset = ostream->rt_delta - min;
1264
1265       stream_set_ts_offset (bin, ostream, ts_offset);
1266     }
1267   }
1268   return;
1269 }
1270
1271 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
1272   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
1273           (b) = gst_rtcp_packet_move_to_next ((packet)))
1274
1275 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
1276   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
1277           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
1278
1279 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
1280   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
1281           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
1282
1283 static void
1284 gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
1285     GstRtpBinStream * stream)
1286 {
1287   GstRtpBin *bin;
1288   GstRTCPPacket packet;
1289   guint32 ssrc;
1290   guint64 ntptime;
1291   gboolean have_sr, have_sdes;
1292   gboolean more;
1293   guint64 base_rtptime;
1294   guint64 base_time;
1295   guint clock_rate;
1296   guint64 clock_base;
1297   guint64 extrtptime;
1298   GstBuffer *buffer;
1299   GstRTCPBuffer rtcp = { NULL, };
1300
1301   bin = stream->bin;
1302
1303   GST_DEBUG_OBJECT (bin, "sync handler called");
1304
1305   /* get the last relation between the rtp timestamps and the gstreamer
1306    * timestamps. We get this info directly from the jitterbuffer which
1307    * constructs gstreamer timestamps from rtp timestamps and so it know exactly
1308    * what the current situation is. */
1309   base_rtptime =
1310       g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
1311   base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
1312   clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
1313   clock_base = g_value_get_uint64 (gst_structure_get_value (s, "clock-base"));
1314   extrtptime =
1315       g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
1316   buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
1317
1318   have_sr = FALSE;
1319   have_sdes = FALSE;
1320
1321   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
1322
1323   GST_RTCP_BUFFER_FOR_PACKETS (more, &rtcp, &packet) {
1324     /* first packet must be SR or RR or else the validate would have failed */
1325     switch (gst_rtcp_packet_get_type (&packet)) {
1326       case GST_RTCP_TYPE_SR:
1327         /* only parse first. There is only supposed to be one SR in the packet
1328          * but we will deal with malformed packets gracefully */
1329         if (have_sr)
1330           break;
1331         /* get NTP and RTP times */
1332         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL,
1333             NULL, NULL);
1334
1335         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
1336         /* ignore SR that is not ours */
1337         if (ssrc != stream->ssrc)
1338           continue;
1339
1340         have_sr = TRUE;
1341         break;
1342       case GST_RTCP_TYPE_SDES:
1343       {
1344         gboolean more_items, more_entries;
1345
1346         /* only deal with first SDES, there is only supposed to be one SDES in
1347          * the RTCP packet but we deal with bad packets gracefully. Also bail
1348          * out if we have not seen an SR item yet. */
1349         if (have_sdes || !have_sr)
1350           break;
1351
1352         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
1353           /* skip items that are not about the SSRC of the sender */
1354           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
1355             continue;
1356
1357           /* find the CNAME entry */
1358           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
1359             GstRTCPSDESType type;
1360             guint8 len;
1361             guint8 *data;
1362
1363             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
1364
1365             if (type == GST_RTCP_SDES_CNAME) {
1366               GST_RTP_BIN_LOCK (bin);
1367               /* associate the stream to CNAME */
1368               gst_rtp_bin_associate (bin, stream, len, data,
1369                   ntptime, extrtptime, base_rtptime, base_time, clock_rate,
1370                   clock_base);
1371               GST_RTP_BIN_UNLOCK (bin);
1372             }
1373           }
1374         }
1375         have_sdes = TRUE;
1376         break;
1377       }
1378       default:
1379         /* we can ignore these packets */
1380         break;
1381     }
1382   }
1383   gst_rtcp_buffer_unmap (&rtcp);
1384 }
1385
1386 /* create a new stream with @ssrc in @session. Must be called with
1387  * RTP_SESSION_LOCK. */
1388 static GstRtpBinStream *
1389 create_stream (GstRtpBinSession * session, guint32 ssrc)
1390 {
1391   GstElement *buffer, *demux = NULL;
1392   GstRtpBinStream *stream;
1393   GstRtpBin *rtpbin;
1394   GstState target;
1395
1396   rtpbin = session->bin;
1397
1398   if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL)))
1399     goto no_jitterbuffer;
1400
1401   if (!rtpbin->ignore_pt)
1402     if (!(demux = gst_element_factory_make ("rtpptdemux", NULL)))
1403       goto no_demux;
1404
1405
1406   stream = g_new0 (GstRtpBinStream, 1);
1407   stream->ssrc = ssrc;
1408   stream->bin = rtpbin;
1409   stream->session = session;
1410   stream->buffer = buffer;
1411   stream->demux = demux;
1412
1413   stream->have_sync = FALSE;
1414   stream->rt_delta = 0;
1415   stream->rtp_delta = 0;
1416   stream->percent = 100;
1417   stream->clock_base = -100 * GST_SECOND;
1418   session->streams = g_slist_prepend (session->streams, stream);
1419
1420   /* provide clock_rate to the jitterbuffer when needed */
1421   stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
1422       (GCallback) pt_map_requested, session);
1423   stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
1424       (GCallback) on_npt_stop, stream);
1425
1426   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.session", session);
1427   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.stream", stream);
1428
1429   /* configure latency and packet lost */
1430   g_object_set (buffer, "latency", rtpbin->latency_ms, NULL);
1431   g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
1432   g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL);
1433
1434   if (!rtpbin->ignore_pt)
1435     gst_bin_add (GST_BIN_CAST (rtpbin), demux);
1436   gst_bin_add (GST_BIN_CAST (rtpbin), buffer);
1437
1438   /* link stuff */
1439   if (demux)
1440     gst_element_link (buffer, demux);
1441
1442   if (rtpbin->buffering) {
1443     guint64 last_out;
1444
1445     GST_INFO_OBJECT (rtpbin,
1446         "bin is buffering, set jitterbuffer as not active");
1447     g_signal_emit_by_name (buffer, "set-active", FALSE, (gint64) 0, &last_out);
1448   }
1449
1450
1451   GST_OBJECT_LOCK (rtpbin);
1452   target = GST_STATE_TARGET (rtpbin);
1453   GST_OBJECT_UNLOCK (rtpbin);
1454
1455   /* from sink to source */
1456   if (demux)
1457     gst_element_set_state (demux, target);
1458
1459   gst_element_set_state (buffer, target);
1460
1461   return stream;
1462
1463   /* ERRORS */
1464 no_jitterbuffer:
1465   {
1466     g_warning ("rtpbin: could not create gstrtpjitterbuffer element");
1467     return NULL;
1468   }
1469 no_demux:
1470   {
1471     gst_object_unref (buffer);
1472     g_warning ("rtpbin: could not create gstrtpptdemux element");
1473     return NULL;
1474   }
1475 }
1476
1477 static void
1478 free_stream (GstRtpBinStream * stream)
1479 {
1480   GstRtpBinSession *session;
1481
1482   session = stream->session;
1483
1484   if (stream->demux) {
1485     g_signal_handler_disconnect (stream->demux, stream->demux_newpad_sig);
1486     g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig);
1487     g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig);
1488   }
1489   g_signal_handler_disconnect (stream->buffer, stream->buffer_handlesync_sig);
1490   g_signal_handler_disconnect (stream->buffer, stream->buffer_ptreq_sig);
1491   g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig);
1492
1493   if (stream->demux)
1494     gst_element_set_locked_state (stream->demux, TRUE);
1495   gst_element_set_locked_state (stream->buffer, TRUE);
1496
1497   if (stream->demux)
1498     gst_element_set_state (stream->demux, GST_STATE_NULL);
1499   gst_element_set_state (stream->buffer, GST_STATE_NULL);
1500
1501   /* now remove this signal, we need this while going to NULL because it to
1502    * do some cleanups */
1503   if (stream->demux)
1504     g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
1505
1506   gst_bin_remove (GST_BIN_CAST (session->bin), stream->buffer);
1507   if (stream->demux)
1508     gst_bin_remove (GST_BIN_CAST (session->bin), stream->demux);
1509
1510   g_free (stream);
1511 }
1512
1513 /* GObject vmethods */
1514 static void gst_rtp_bin_dispose (GObject * object);
1515 static void gst_rtp_bin_finalize (GObject * object);
1516 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
1517     const GValue * value, GParamSpec * pspec);
1518 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
1519     GValue * value, GParamSpec * pspec);
1520
1521 /* GstElement vmethods */
1522 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
1523     GstStateChange transition);
1524 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
1525     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
1526 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
1527 static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message);
1528
1529 #define gst_rtp_bin_parent_class parent_class
1530 G_DEFINE_TYPE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN);
1531
1532 static void
1533 gst_rtp_bin_class_init (GstRtpBinClass * klass)
1534 {
1535   GObjectClass *gobject_class;
1536   GstElementClass *gstelement_class;
1537   GstBinClass *gstbin_class;
1538
1539   gobject_class = (GObjectClass *) klass;
1540   gstelement_class = (GstElementClass *) klass;
1541   gstbin_class = (GstBinClass *) klass;
1542
1543   g_type_class_add_private (klass, sizeof (GstRtpBinPrivate));
1544
1545   gobject_class->dispose = gst_rtp_bin_dispose;
1546   gobject_class->finalize = gst_rtp_bin_finalize;
1547   gobject_class->set_property = gst_rtp_bin_set_property;
1548   gobject_class->get_property = gst_rtp_bin_get_property;
1549
1550   g_object_class_install_property (gobject_class, PROP_LATENCY,
1551       g_param_spec_uint ("latency", "Buffer latency in ms",
1552           "Default amount of ms to buffer in the jitterbuffers", 0,
1553           G_MAXUINT, DEFAULT_LATENCY_MS,
1554           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1555
1556   /**
1557    * GstRtpBin::request-pt-map:
1558    * @rtpbin: the object which received the signal
1559    * @session: the session
1560    * @pt: the pt
1561    *
1562    * Request the payload type as #GstCaps for @pt in @session.
1563    */
1564   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
1565       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
1566       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
1567       NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT_UINT, GST_TYPE_CAPS, 2,
1568       G_TYPE_UINT, G_TYPE_UINT);
1569
1570     /**
1571    * GstRtpBin::payload-type-change:
1572    * @rtpbin: the object which received the signal
1573    * @session: the session
1574    * @pt: the pt
1575    *
1576    * Signal that the current payload type changed to @pt in @session.
1577    *
1578    * Since: 0.10.17
1579    */
1580   gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] =
1581       g_signal_new ("payload-type-change", G_TYPE_FROM_CLASS (klass),
1582       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, payload_type_change),
1583       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1584       G_TYPE_UINT, G_TYPE_UINT);
1585
1586   /**
1587    * GstRtpBin::clear-pt-map:
1588    * @rtpbin: the object which received the signal
1589    *
1590    * Clear all previously cached pt-mapping obtained with
1591    * #GstRtpBin::request-pt-map.
1592    */
1593   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
1594       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
1595       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1596           clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1597       0, G_TYPE_NONE);
1598
1599   /**
1600    * GstRtpBin::reset-sync:
1601    * @rtpbin: the object which received the signal
1602    *
1603    * Reset all currently configured lip-sync parameters and require new SR
1604    * packets for all streams before lip-sync is attempted again.
1605    */
1606   gst_rtp_bin_signals[SIGNAL_RESET_SYNC] =
1607       g_signal_new ("reset-sync", G_TYPE_FROM_CLASS (klass),
1608       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1609           reset_sync), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1610       0, G_TYPE_NONE);
1611
1612   /**
1613    * GstRtpBin::get-internal-session:
1614    * @rtpbin: the object which received the signal
1615    * @id: the session id
1616    *
1617    * Request the internal RTPSession object as #GObject in session @id.
1618    */
1619   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_SESSION] =
1620       g_signal_new ("get-internal-session", G_TYPE_FROM_CLASS (klass),
1621       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1622           get_internal_session), NULL, NULL, gst_rtp_bin_marshal_OBJECT__UINT,
1623       RTP_TYPE_SESSION, 1, G_TYPE_UINT);
1624
1625   /**
1626    * GstRtpBin::on-new-ssrc:
1627    * @rtpbin: the object which received the signal
1628    * @session: the session
1629    * @ssrc: the SSRC
1630    *
1631    * Notify of a new SSRC that entered @session.
1632    */
1633   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
1634       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
1635       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
1636       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1637       G_TYPE_UINT, G_TYPE_UINT);
1638   /**
1639    * GstRtpBin::on-ssrc-collision:
1640    * @rtpbin: the object which received the signal
1641    * @session: the session
1642    * @ssrc: the SSRC
1643    *
1644    * Notify when we have an SSRC collision
1645    */
1646   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
1647       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
1648       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
1649       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1650       G_TYPE_UINT, G_TYPE_UINT);
1651   /**
1652    * GstRtpBin::on-ssrc-validated:
1653    * @rtpbin: the object which received the signal
1654    * @session: the session
1655    * @ssrc: the SSRC
1656    *
1657    * Notify of a new SSRC that became validated.
1658    */
1659   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
1660       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
1661       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
1662       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1663       G_TYPE_UINT, G_TYPE_UINT);
1664   /**
1665    * GstRtpBin::on-ssrc-active:
1666    * @rtpbin: the object which received the signal
1667    * @session: the session
1668    * @ssrc: the SSRC
1669    *
1670    * Notify of a SSRC that is active, i.e., sending RTCP.
1671    */
1672   gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE] =
1673       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
1674       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_active),
1675       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1676       G_TYPE_UINT, G_TYPE_UINT);
1677   /**
1678    * GstRtpBin::on-ssrc-sdes:
1679    * @rtpbin: the object which received the signal
1680    * @session: the session
1681    * @ssrc: the SSRC
1682    *
1683    * Notify of a SSRC that is active, i.e., sending RTCP.
1684    */
1685   gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES] =
1686       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
1687       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_sdes),
1688       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1689       G_TYPE_UINT, G_TYPE_UINT);
1690
1691   /**
1692    * GstRtpBin::on-bye-ssrc:
1693    * @rtpbin: the object which received the signal
1694    * @session: the session
1695    * @ssrc: the SSRC
1696    *
1697    * Notify of an SSRC that became inactive because of a BYE packet.
1698    */
1699   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
1700       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
1701       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
1702       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1703       G_TYPE_UINT, G_TYPE_UINT);
1704   /**
1705    * GstRtpBin::on-bye-timeout:
1706    * @rtpbin: the object which received the signal
1707    * @session: the session
1708    * @ssrc: the SSRC
1709    *
1710    * Notify of an SSRC that has timed out because of BYE
1711    */
1712   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
1713       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
1714       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
1715       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1716       G_TYPE_UINT, G_TYPE_UINT);
1717   /**
1718    * GstRtpBin::on-timeout:
1719    * @rtpbin: the object which received the signal
1720    * @session: the session
1721    * @ssrc: the SSRC
1722    *
1723    * Notify of an SSRC that has timed out
1724    */
1725   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
1726       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
1727       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
1728       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1729       G_TYPE_UINT, G_TYPE_UINT);
1730   /**
1731    * GstRtpBin::on-sender-timeout:
1732    * @rtpbin: the object which received the signal
1733    * @session: the session
1734    * @ssrc: the SSRC
1735    *
1736    * Notify of a sender SSRC that has timed out and became a receiver
1737    */
1738   gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] =
1739       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
1740       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout),
1741       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1742       G_TYPE_UINT, G_TYPE_UINT);
1743
1744   /**
1745    * GstRtpBin::on-npt-stop:
1746    * @rtpbin: the object which received the signal
1747    * @session: the session
1748    * @ssrc: the SSRC
1749    *
1750    * Notify that SSRC sender has sent data up to the configured NPT stop time.
1751    */
1752   gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP] =
1753       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
1754       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_npt_stop),
1755       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1756       G_TYPE_UINT, G_TYPE_UINT);
1757
1758   g_object_class_install_property (gobject_class, PROP_SDES,
1759       g_param_spec_boxed ("sdes", "SDES",
1760           "The SDES items of this session",
1761           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1762
1763   g_object_class_install_property (gobject_class, PROP_DO_LOST,
1764       g_param_spec_boolean ("do-lost", "Do Lost",
1765           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
1766           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1767
1768   g_object_class_install_property (gobject_class, PROP_AUTOREMOVE,
1769       g_param_spec_boolean ("autoremove", "Auto Remove",
1770           "Automatically remove timed out sources", DEFAULT_AUTOREMOVE,
1771           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1772
1773   g_object_class_install_property (gobject_class, PROP_IGNORE_PT,
1774       g_param_spec_boolean ("ignore-pt", "Ignore PT",
1775           "Do not demultiplex based on PT values", DEFAULT_IGNORE_PT,
1776           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1777
1778   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
1779       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
1780           "Use the pipeline clock to set the NTP time in the RTCP SR messages",
1781           DEFAULT_USE_PIPELINE_CLOCK,
1782           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1783   /**
1784    * GstRtpBin::buffer-mode:
1785    *
1786    * Control the buffering and timestamping mode used by the jitterbuffer.
1787    *
1788    * Since: 0.10.17
1789    */
1790   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
1791       g_param_spec_enum ("buffer-mode", "Buffer Mode",
1792           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
1793           DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1794   /**
1795    * GstRtpBin::ntp-sync:
1796    *
1797    * Synchronize received streams to the NTP clock. When the NTP clock is shared
1798    * between the receivers and the senders (such as when using ntpd) this option
1799    * can be used to synchronize receivers on multiple machines.
1800    *
1801    * Since: 0.10.21
1802    */
1803   g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
1804       g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
1805           "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
1806           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1807
1808   /**
1809    * GstRtpBin::rtcp-sync:
1810    *
1811    * If not synchronizing (directly) to the NTP clock, determines how to sync
1812    * the various streams.
1813    *
1814    * Since: 0.10.31
1815    */
1816   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC,
1817       g_param_spec_enum ("rtcp-sync", "RTCP Sync",
1818           "Use of RTCP SR in synchronization", GST_RTP_BIN_RTCP_SYNC_TYPE,
1819           DEFAULT_RTCP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1820
1821   /**
1822    * GstRtpBin::rtcp-sync-interval:
1823    *
1824    * Determines how often to sync streams using RTCP data.
1825    *
1826    * Since: 0.10.31
1827    */
1828   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_INTERVAL,
1829       g_param_spec_uint ("rtcp-sync-interval", "RTCP Sync Interval",
1830           "RTCP SR interval synchronization (ms) (0 = always)",
1831           0, G_MAXUINT, DEFAULT_RTCP_SYNC_INTERVAL,
1832           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1833
1834   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
1835   gstelement_class->request_new_pad =
1836       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
1837   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
1838
1839   /* sink pads */
1840   gst_element_class_add_pad_template (gstelement_class,
1841       gst_static_pad_template_get (&rtpbin_recv_rtp_sink_template));
1842   gst_element_class_add_pad_template (gstelement_class,
1843       gst_static_pad_template_get (&rtpbin_recv_rtcp_sink_template));
1844   gst_element_class_add_pad_template (gstelement_class,
1845       gst_static_pad_template_get (&rtpbin_send_rtp_sink_template));
1846
1847   /* src pads */
1848   gst_element_class_add_pad_template (gstelement_class,
1849       gst_static_pad_template_get (&rtpbin_recv_rtp_src_template));
1850   gst_element_class_add_pad_template (gstelement_class,
1851       gst_static_pad_template_get (&rtpbin_send_rtcp_src_template));
1852   gst_element_class_add_pad_template (gstelement_class,
1853       gst_static_pad_template_get (&rtpbin_send_rtp_src_template));
1854
1855   gst_element_class_set_details_simple (gstelement_class, "RTP Bin",
1856       "Filter/Network/RTP",
1857       "Real-Time Transport Protocol bin",
1858       "Wim Taymans <wim.taymans@gmail.com>");
1859
1860   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (gst_rtp_bin_handle_message);
1861
1862   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
1863   klass->reset_sync = GST_DEBUG_FUNCPTR (gst_rtp_bin_reset_sync);
1864   klass->get_internal_session =
1865       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_session);
1866
1867   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
1868 }
1869
1870 static void
1871 gst_rtp_bin_init (GstRtpBin * rtpbin)
1872 {
1873   gchar *cname;
1874
1875   rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
1876   g_mutex_init (&rtpbin->priv->bin_lock);
1877   g_mutex_init (&rtpbin->priv->dyn_lock);
1878
1879   rtpbin->latency_ms = DEFAULT_LATENCY_MS;
1880   rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND;
1881   rtpbin->do_lost = DEFAULT_DO_LOST;
1882   rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
1883   rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
1884   rtpbin->rtcp_sync = DEFAULT_RTCP_SYNC;
1885   rtpbin->rtcp_sync_interval = DEFAULT_RTCP_SYNC_INTERVAL;
1886   rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
1887   rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
1888   rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
1889
1890   /* some default SDES entries */
1891   cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
1892   rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes",
1893       "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL);
1894   g_free (cname);
1895 }
1896
1897 static void
1898 gst_rtp_bin_dispose (GObject * object)
1899 {
1900   GstRtpBin *rtpbin;
1901
1902   rtpbin = GST_RTP_BIN (object);
1903
1904   GST_RTP_BIN_LOCK (rtpbin);
1905   GST_DEBUG_OBJECT (object, "freeing sessions");
1906   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, rtpbin);
1907   g_slist_free (rtpbin->sessions);
1908   rtpbin->sessions = NULL;
1909   GST_DEBUG_OBJECT (object, "freeing clients");
1910   g_slist_foreach (rtpbin->clients, (GFunc) free_client, rtpbin);
1911   g_slist_free (rtpbin->clients);
1912   rtpbin->clients = NULL;
1913   GST_RTP_BIN_UNLOCK (rtpbin);
1914
1915   G_OBJECT_CLASS (parent_class)->dispose (object);
1916 }
1917
1918 static void
1919 gst_rtp_bin_finalize (GObject * object)
1920 {
1921   GstRtpBin *rtpbin;
1922
1923   rtpbin = GST_RTP_BIN (object);
1924
1925   if (rtpbin->sdes)
1926     gst_structure_free (rtpbin->sdes);
1927
1928   g_mutex_clear (&rtpbin->priv->bin_lock);
1929   g_mutex_clear (&rtpbin->priv->dyn_lock);
1930
1931   G_OBJECT_CLASS (parent_class)->finalize (object);
1932 }
1933
1934
1935 static void
1936 gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes)
1937 {
1938   GSList *item;
1939
1940   if (sdes == NULL)
1941     return;
1942
1943   GST_RTP_BIN_LOCK (bin);
1944
1945   GST_OBJECT_LOCK (bin);
1946   if (bin->sdes)
1947     gst_structure_free (bin->sdes);
1948   bin->sdes = gst_structure_copy (sdes);
1949   GST_OBJECT_UNLOCK (bin);
1950
1951   /* store in all sessions */
1952   for (item = bin->sessions; item; item = g_slist_next (item)) {
1953     GstRtpBinSession *session = item->data;
1954     g_object_set (session->session, "sdes", sdes, NULL);
1955   }
1956
1957   GST_RTP_BIN_UNLOCK (bin);
1958 }
1959
1960 static GstStructure *
1961 gst_rtp_bin_get_sdes_struct (GstRtpBin * bin)
1962 {
1963   GstStructure *result;
1964
1965   GST_OBJECT_LOCK (bin);
1966   result = gst_structure_copy (bin->sdes);
1967   GST_OBJECT_UNLOCK (bin);
1968
1969   return result;
1970 }
1971
1972 static void
1973 gst_rtp_bin_set_property (GObject * object, guint prop_id,
1974     const GValue * value, GParamSpec * pspec)
1975 {
1976   GstRtpBin *rtpbin;
1977
1978   rtpbin = GST_RTP_BIN (object);
1979
1980   switch (prop_id) {
1981     case PROP_LATENCY:
1982       GST_RTP_BIN_LOCK (rtpbin);
1983       rtpbin->latency_ms = g_value_get_uint (value);
1984       rtpbin->latency_ns = rtpbin->latency_ms * GST_MSECOND;
1985       GST_RTP_BIN_UNLOCK (rtpbin);
1986       /* propagate the property down to the jitterbuffer */
1987       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "latency", value);
1988       break;
1989     case PROP_SDES:
1990       gst_rtp_bin_set_sdes_struct (rtpbin, g_value_get_boxed (value));
1991       break;
1992     case PROP_DO_LOST:
1993       GST_RTP_BIN_LOCK (rtpbin);
1994       rtpbin->do_lost = g_value_get_boolean (value);
1995       GST_RTP_BIN_UNLOCK (rtpbin);
1996       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
1997       break;
1998     case PROP_NTP_SYNC:
1999       rtpbin->ntp_sync = g_value_get_boolean (value);
2000       break;
2001     case PROP_RTCP_SYNC:
2002       g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value));
2003       break;
2004     case PROP_RTCP_SYNC_INTERVAL:
2005       rtpbin->rtcp_sync_interval = g_value_get_uint (value);
2006       break;
2007     case PROP_IGNORE_PT:
2008       rtpbin->ignore_pt = g_value_get_boolean (value);
2009       break;
2010     case PROP_AUTOREMOVE:
2011       rtpbin->priv->autoremove = g_value_get_boolean (value);
2012       break;
2013     case PROP_USE_PIPELINE_CLOCK:
2014     {
2015       GSList *sessions;
2016       GST_RTP_BIN_LOCK (rtpbin);
2017       rtpbin->use_pipeline_clock = g_value_get_boolean (value);
2018       for (sessions = rtpbin->sessions; sessions;
2019           sessions = g_slist_next (sessions)) {
2020         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2021
2022         g_object_set (G_OBJECT (session->session),
2023             "use-pipeline-clock", rtpbin->use_pipeline_clock, NULL);
2024       }
2025       GST_RTP_BIN_UNLOCK (rtpbin);
2026     }
2027       break;
2028     case PROP_BUFFER_MODE:
2029       GST_RTP_BIN_LOCK (rtpbin);
2030       rtpbin->buffer_mode = g_value_get_enum (value);
2031       GST_RTP_BIN_UNLOCK (rtpbin);
2032       /* propagate the property down to the jitterbuffer */
2033       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "mode", value);
2034       break;
2035     default:
2036       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2037       break;
2038   }
2039 }
2040
2041 static void
2042 gst_rtp_bin_get_property (GObject * object, guint prop_id,
2043     GValue * value, GParamSpec * pspec)
2044 {
2045   GstRtpBin *rtpbin;
2046
2047   rtpbin = GST_RTP_BIN (object);
2048
2049   switch (prop_id) {
2050     case PROP_LATENCY:
2051       GST_RTP_BIN_LOCK (rtpbin);
2052       g_value_set_uint (value, rtpbin->latency_ms);
2053       GST_RTP_BIN_UNLOCK (rtpbin);
2054       break;
2055     case PROP_SDES:
2056       g_value_take_boxed (value, gst_rtp_bin_get_sdes_struct (rtpbin));
2057       break;
2058     case PROP_DO_LOST:
2059       GST_RTP_BIN_LOCK (rtpbin);
2060       g_value_set_boolean (value, rtpbin->do_lost);
2061       GST_RTP_BIN_UNLOCK (rtpbin);
2062       break;
2063     case PROP_IGNORE_PT:
2064       g_value_set_boolean (value, rtpbin->ignore_pt);
2065       break;
2066     case PROP_NTP_SYNC:
2067       g_value_set_boolean (value, rtpbin->ntp_sync);
2068       break;
2069     case PROP_RTCP_SYNC:
2070       g_value_set_enum (value, g_atomic_int_get (&rtpbin->rtcp_sync));
2071       break;
2072     case PROP_RTCP_SYNC_INTERVAL:
2073       g_value_set_uint (value, rtpbin->rtcp_sync_interval);
2074       break;
2075     case PROP_AUTOREMOVE:
2076       g_value_set_boolean (value, rtpbin->priv->autoremove);
2077       break;
2078     case PROP_BUFFER_MODE:
2079       g_value_set_enum (value, rtpbin->buffer_mode);
2080       break;
2081     case PROP_USE_PIPELINE_CLOCK:
2082       g_value_set_boolean (value, rtpbin->use_pipeline_clock);
2083       break;
2084     default:
2085       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2086       break;
2087   }
2088 }
2089
2090 static void
2091 gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
2092 {
2093   GstRtpBin *rtpbin;
2094
2095   rtpbin = GST_RTP_BIN (bin);
2096
2097   switch (GST_MESSAGE_TYPE (message)) {
2098     case GST_MESSAGE_ELEMENT:
2099     {
2100       const GstStructure *s = gst_message_get_structure (message);
2101
2102       /* we change the structure name and add the session ID to it */
2103       if (gst_structure_has_name (s, "application/x-rtp-source-sdes")) {
2104         GstRtpBinSession *sess;
2105
2106         /* find the session we set it as object data */
2107         sess = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2108             "GstRTPBin.session");
2109
2110         if (G_LIKELY (sess)) {
2111           message = gst_message_make_writable (message);
2112           s = gst_message_get_structure (message);
2113           gst_structure_set ((GstStructure *) s, "session", G_TYPE_UINT,
2114               sess->id, NULL);
2115         }
2116       }
2117       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2118       break;
2119     }
2120     case GST_MESSAGE_BUFFERING:
2121     {
2122       gint percent;
2123       gint min_percent = 100;
2124       GSList *sessions, *streams;
2125       GstRtpBinStream *stream;
2126       gboolean change = FALSE, active = FALSE;
2127       GstClockTime min_out_time;
2128       GstBufferingMode mode;
2129       gint avg_in, avg_out;
2130       gint64 buffering_left;
2131
2132       gst_message_parse_buffering (message, &percent);
2133       gst_message_parse_buffering_stats (message, &mode, &avg_in, &avg_out,
2134           &buffering_left);
2135
2136       stream =
2137           g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2138           "GstRTPBin.stream");
2139
2140       GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
2141
2142       /* get the stream */
2143       if (G_LIKELY (stream)) {
2144         GST_RTP_BIN_LOCK (rtpbin);
2145         /* fill in the percent */
2146         stream->percent = percent;
2147
2148         /* calculate the min value for all streams */
2149         for (sessions = rtpbin->sessions; sessions;
2150             sessions = g_slist_next (sessions)) {
2151           GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2152
2153           GST_RTP_SESSION_LOCK (session);
2154           if (session->streams) {
2155             for (streams = session->streams; streams;
2156                 streams = g_slist_next (streams)) {
2157               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2158
2159               GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
2160                   stream->percent);
2161
2162               /* find min percent */
2163               if (min_percent > stream->percent)
2164                 min_percent = stream->percent;
2165             }
2166           } else {
2167             GST_INFO_OBJECT (bin,
2168                 "session has no streams, setting min_percent to 0");
2169             min_percent = 0;
2170           }
2171           GST_RTP_SESSION_UNLOCK (session);
2172         }
2173         GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
2174
2175         if (rtpbin->buffering) {
2176           if (min_percent == 100) {
2177             rtpbin->buffering = FALSE;
2178             active = TRUE;
2179             change = TRUE;
2180           }
2181         } else {
2182           if (min_percent < 100) {
2183             /* pause the streams */
2184             rtpbin->buffering = TRUE;
2185             active = FALSE;
2186             change = TRUE;
2187           }
2188         }
2189         GST_RTP_BIN_UNLOCK (rtpbin);
2190
2191         gst_message_unref (message);
2192
2193         /* make a new buffering message with the min value */
2194         message =
2195             gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
2196         gst_message_set_buffering_stats (message, mode, avg_in, avg_out,
2197             buffering_left);
2198
2199         if (G_UNLIKELY (change)) {
2200           GstClock *clock;
2201           guint64 running_time = 0;
2202           guint64 offset = 0;
2203
2204           /* figure out the running time when we have a clock */
2205           if (G_LIKELY ((clock =
2206                       gst_element_get_clock (GST_ELEMENT_CAST (bin))))) {
2207             guint64 now, base_time;
2208
2209             now = gst_clock_get_time (clock);
2210             base_time = gst_element_get_base_time (GST_ELEMENT_CAST (bin));
2211             running_time = now - base_time;
2212             gst_object_unref (clock);
2213           }
2214           GST_DEBUG_OBJECT (bin,
2215               "running time now %" GST_TIME_FORMAT,
2216               GST_TIME_ARGS (running_time));
2217
2218           GST_RTP_BIN_LOCK (rtpbin);
2219
2220           /* when we reactivate, calculate the offsets so that all streams have
2221            * an output time that is at least as big as the running_time */
2222           offset = 0;
2223           if (active) {
2224             if (running_time > rtpbin->buffer_start) {
2225               offset = running_time - rtpbin->buffer_start;
2226               if (offset >= rtpbin->latency_ns)
2227                 offset -= rtpbin->latency_ns;
2228               else
2229                 offset = 0;
2230             }
2231           }
2232
2233           /* pause all streams */
2234           min_out_time = -1;
2235           for (sessions = rtpbin->sessions; sessions;
2236               sessions = g_slist_next (sessions)) {
2237             GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2238
2239             GST_RTP_SESSION_LOCK (session);
2240             for (streams = session->streams; streams;
2241                 streams = g_slist_next (streams)) {
2242               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2243               GstElement *element = stream->buffer;
2244               guint64 last_out;
2245
2246               g_signal_emit_by_name (element, "set-active", active, offset,
2247                   &last_out);
2248
2249               if (!active) {
2250                 g_object_get (element, "percent", &stream->percent, NULL);
2251
2252                 if (last_out == -1)
2253                   last_out = 0;
2254                 if (min_out_time == -1 || last_out < min_out_time)
2255                   min_out_time = last_out;
2256               }
2257
2258               GST_DEBUG_OBJECT (bin,
2259                   "setting %p to %d, offset %" GST_TIME_FORMAT ", last %"
2260                   GST_TIME_FORMAT ", percent %d", element, active,
2261                   GST_TIME_ARGS (offset), GST_TIME_ARGS (last_out),
2262                   stream->percent);
2263             }
2264             GST_RTP_SESSION_UNLOCK (session);
2265           }
2266           GST_DEBUG_OBJECT (bin,
2267               "min out time %" GST_TIME_FORMAT, GST_TIME_ARGS (min_out_time));
2268
2269           /* the buffer_start is the min out time of all paused jitterbuffers */
2270           if (!active)
2271             rtpbin->buffer_start = min_out_time;
2272
2273           GST_RTP_BIN_UNLOCK (rtpbin);
2274         }
2275       }
2276       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2277       break;
2278     }
2279     default:
2280     {
2281       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2282       break;
2283     }
2284   }
2285 }
2286
2287 static GstStateChangeReturn
2288 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
2289 {
2290   GstStateChangeReturn res;
2291   GstRtpBin *rtpbin;
2292   GstRtpBinPrivate *priv;
2293
2294   rtpbin = GST_RTP_BIN (element);
2295   priv = rtpbin->priv;
2296
2297   switch (transition) {
2298     case GST_STATE_CHANGE_NULL_TO_READY:
2299       break;
2300     case GST_STATE_CHANGE_READY_TO_PAUSED:
2301       priv->last_unix = 0;
2302       GST_LOG_OBJECT (rtpbin, "clearing shutdown flag");
2303       g_atomic_int_set (&priv->shutdown, 0);
2304       break;
2305     case GST_STATE_CHANGE_PAUSED_TO_READY:
2306       GST_LOG_OBJECT (rtpbin, "setting shutdown flag");
2307       g_atomic_int_set (&priv->shutdown, 1);
2308       /* wait for all callbacks to end by taking the lock. No new callbacks will
2309        * be able to happen as we set the shutdown flag. */
2310       GST_RTP_BIN_DYN_LOCK (rtpbin);
2311       GST_LOG_OBJECT (rtpbin, "dynamic lock taken, we can continue shutdown");
2312       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
2313       break;
2314     default:
2315       break;
2316   }
2317
2318   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2319
2320   switch (transition) {
2321     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2322       break;
2323     case GST_STATE_CHANGE_PAUSED_TO_READY:
2324       break;
2325     case GST_STATE_CHANGE_READY_TO_NULL:
2326       break;
2327     default:
2328       break;
2329   }
2330   return res;
2331 }
2332
2333 /* a new pad (SSRC) was created in @session. This signal is emited from the
2334  * payload demuxer. */
2335 static void
2336 new_payload_found (GstElement * element, guint pt, GstPad * pad,
2337     GstRtpBinStream * stream)
2338 {
2339   GstRtpBin *rtpbin;
2340   GstElementClass *klass;
2341   GstPadTemplate *templ;
2342   gchar *padname;
2343   GstPad *gpad;
2344
2345   rtpbin = stream->bin;
2346
2347   GST_DEBUG ("new payload pad %d", pt);
2348
2349   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
2350
2351   /* ghost the pad to the parent */
2352   klass = GST_ELEMENT_GET_CLASS (rtpbin);
2353   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
2354   padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
2355       stream->session->id, stream->ssrc, pt);
2356   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
2357   g_free (padname);
2358   g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", gpad);
2359
2360   gst_pad_set_active (gpad, TRUE);
2361   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2362
2363   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2364
2365   return;
2366
2367 shutdown:
2368   {
2369     GST_DEBUG ("ignoring, we are shutting down");
2370     return;
2371   }
2372 }
2373
2374 static void
2375 payload_pad_removed (GstElement * element, GstPad * pad,
2376     GstRtpBinStream * stream)
2377 {
2378   GstRtpBin *rtpbin;
2379   GstPad *gpad;
2380
2381   rtpbin = stream->bin;
2382
2383   GST_DEBUG ("payload pad removed");
2384
2385   GST_RTP_BIN_DYN_LOCK (rtpbin);
2386   if ((gpad = g_object_get_data (G_OBJECT (pad), "GstRTPBin.ghostpad"))) {
2387     g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", NULL);
2388
2389     gst_pad_set_active (gpad, FALSE);
2390     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2391   }
2392   GST_RTP_BIN_DYN_UNLOCK (rtpbin);
2393 }
2394
2395 static GstCaps *
2396 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
2397 {
2398   GstRtpBin *rtpbin;
2399   GstCaps *caps;
2400
2401   rtpbin = session->bin;
2402
2403   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt,
2404       session->id);
2405
2406   caps = get_pt_map (session, pt);
2407   if (!caps)
2408     goto no_caps;
2409
2410   return caps;
2411
2412   /* ERRORS */
2413 no_caps:
2414   {
2415     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
2416     return NULL;
2417   }
2418 }
2419
2420 static void
2421 payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session)
2422 {
2423   GST_DEBUG_OBJECT (session->bin,
2424       "emiting signal for pt type changed to %d in session %d", pt,
2425       session->id);
2426
2427   g_signal_emit (session->bin, gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE],
2428       0, session->id, pt);
2429 }
2430
2431 /* emited when caps changed for the session */
2432 static void
2433 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
2434 {
2435   GstRtpBin *bin;
2436   GstCaps *caps;
2437   gint payload;
2438   const GstStructure *s;
2439
2440   bin = session->bin;
2441
2442   g_object_get (pad, "caps", &caps, NULL);
2443
2444   if (caps == NULL)
2445     return;
2446
2447   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
2448
2449   s = gst_caps_get_structure (caps, 0);
2450
2451   /* get payload, finish when it's not there */
2452   if (!gst_structure_get_int (s, "payload", &payload))
2453     return;
2454
2455   GST_RTP_SESSION_LOCK (session);
2456   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
2457   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
2458   GST_RTP_SESSION_UNLOCK (session);
2459 }
2460
2461 /* a new pad (SSRC) was created in @session */
2462 static void
2463 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
2464     GstRtpBinSession * session)
2465 {
2466   GstRtpBin *rtpbin;
2467   GstRtpBinStream *stream;
2468   GstPad *sinkpad, *srcpad;
2469   gchar *padname;
2470
2471   rtpbin = session->bin;
2472
2473   GST_DEBUG_OBJECT (rtpbin, "new SSRC pad %08x, %s:%s", ssrc,
2474       GST_DEBUG_PAD_NAME (pad));
2475
2476   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
2477
2478   GST_RTP_SESSION_LOCK (session);
2479
2480   /* create new stream */
2481   stream = create_stream (session, ssrc);
2482   if (!stream)
2483     goto no_stream;
2484
2485   /* get pad and link */
2486   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP");
2487   padname = g_strdup_printf ("src_%u", ssrc);
2488   srcpad = gst_element_get_static_pad (element, padname);
2489   g_free (padname);
2490   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
2491   gst_pad_link (srcpad, sinkpad);
2492   gst_object_unref (sinkpad);
2493   gst_object_unref (srcpad);
2494
2495   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
2496   padname = g_strdup_printf ("rtcp_src_%u", ssrc);
2497   srcpad = gst_element_get_static_pad (element, padname);
2498   g_free (padname);
2499   sinkpad = gst_element_get_request_pad (stream->buffer, "sink_rtcp");
2500   gst_pad_link (srcpad, sinkpad);
2501   gst_object_unref (sinkpad);
2502   gst_object_unref (srcpad);
2503
2504   /* connect to the RTCP sync signal from the jitterbuffer */
2505   GST_DEBUG_OBJECT (rtpbin, "connecting sync signal");
2506   stream->buffer_handlesync_sig = g_signal_connect (stream->buffer,
2507       "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream);
2508
2509   if (stream->demux) {
2510     /* connect to the new-pad signal of the payload demuxer, this will expose the
2511      * new pad by ghosting it. */
2512     stream->demux_newpad_sig = g_signal_connect (stream->demux,
2513         "new-payload-type", (GCallback) new_payload_found, stream);
2514     stream->demux_padremoved_sig = g_signal_connect (stream->demux,
2515         "pad-removed", (GCallback) payload_pad_removed, stream);
2516
2517     /* connect to the request-pt-map signal. This signal will be emited by the
2518      * demuxer so that it can apply a proper caps on the buffers for the
2519      * depayloaders. */
2520     stream->demux_ptreq_sig = g_signal_connect (stream->demux,
2521         "request-pt-map", (GCallback) pt_map_requested, session);
2522     /* connect to the  signal so it can be forwarded. */
2523     stream->demux_ptchange_sig = g_signal_connect (stream->demux,
2524         "payload-type-change", (GCallback) payload_type_change, session);
2525   } else {
2526     /* add gstrtpjitterbuffer src pad to pads */
2527     GstElementClass *klass;
2528     GstPadTemplate *templ;
2529     gchar *padname;
2530     GstPad *gpad, *pad;
2531
2532     pad = gst_element_get_static_pad (stream->buffer, "src");
2533
2534     /* ghost the pad to the parent */
2535     klass = GST_ELEMENT_GET_CLASS (rtpbin);
2536     templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
2537     padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
2538         stream->session->id, stream->ssrc, 255);
2539     gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
2540     g_free (padname);
2541
2542     gst_pad_set_active (gpad, TRUE);
2543     gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2544
2545     gst_object_unref (pad);
2546   }
2547
2548   GST_RTP_SESSION_UNLOCK (session);
2549   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2550
2551   return;
2552
2553   /* ERRORS */
2554 shutdown:
2555   {
2556     GST_DEBUG_OBJECT (rtpbin, "we are shutting down");
2557     return;
2558   }
2559 no_stream:
2560   {
2561     GST_RTP_SESSION_UNLOCK (session);
2562     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2563     GST_DEBUG_OBJECT (rtpbin, "could not create stream");
2564     return;
2565   }
2566 }
2567
2568 /* Create a pad for receiving RTP for the session in @name. Must be called with
2569  * RTP_BIN_LOCK.
2570  */
2571 static GstPad *
2572 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2573 {
2574   GstPad *sinkdpad;
2575   guint sessid;
2576   GstRtpBinSession *session;
2577   GstPadLinkReturn lres;
2578
2579   /* first get the session number */
2580   if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1)
2581     goto no_name;
2582
2583   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
2584
2585   /* get or create session */
2586   session = find_session_by_id (rtpbin, sessid);
2587   if (!session) {
2588     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
2589     /* create session now */
2590     session = create_session (rtpbin, sessid);
2591     if (session == NULL)
2592       goto create_error;
2593   }
2594
2595   /* check if pad was requested */
2596   if (session->recv_rtp_sink_ghost != NULL)
2597     return session->recv_rtp_sink_ghost;
2598
2599   GST_DEBUG_OBJECT (rtpbin, "getting RTP sink pad");
2600   /* get recv_rtp pad and store */
2601   session->recv_rtp_sink =
2602       gst_element_get_request_pad (session->session, "recv_rtp_sink");
2603   if (session->recv_rtp_sink == NULL)
2604     goto pad_failed;
2605
2606   g_signal_connect (session->recv_rtp_sink, "notify::caps",
2607       (GCallback) caps_changed, session);
2608
2609   GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad");
2610   /* get srcpad, link to SSRCDemux */
2611   session->recv_rtp_src =
2612       gst_element_get_static_pad (session->session, "recv_rtp_src");
2613   if (session->recv_rtp_src == NULL)
2614     goto pad_failed;
2615
2616   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
2617   sinkdpad = gst_element_get_static_pad (session->demux, "sink");
2618   GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
2619   lres = gst_pad_link (session->recv_rtp_src, sinkdpad);
2620   gst_object_unref (sinkdpad);
2621   if (lres != GST_PAD_LINK_OK)
2622     goto link_failed;
2623
2624   /* connect to the new-ssrc-pad signal of the SSRC demuxer */
2625   session->demux_newpad_sig = g_signal_connect (session->demux,
2626       "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
2627   session->demux_padremoved_sig = g_signal_connect (session->demux,
2628       "removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session);
2629
2630   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
2631   session->recv_rtp_sink_ghost =
2632       gst_ghost_pad_new_from_template (name, session->recv_rtp_sink, templ);
2633   gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE);
2634   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost);
2635
2636   return session->recv_rtp_sink_ghost;
2637
2638   /* ERRORS */
2639 no_name:
2640   {
2641     g_warning ("rtpbin: invalid name given");
2642     return NULL;
2643   }
2644 create_error:
2645   {
2646     /* create_session already warned */
2647     return NULL;
2648   }
2649 pad_failed:
2650   {
2651     g_warning ("rtpbin: failed to get session pad");
2652     return NULL;
2653   }
2654 link_failed:
2655   {
2656     g_warning ("rtpbin: failed to link pads");
2657     return NULL;
2658   }
2659 }
2660
2661 static void
2662 remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
2663 {
2664   if (session->demux_newpad_sig) {
2665     g_signal_handler_disconnect (session->demux, session->demux_newpad_sig);
2666     session->demux_newpad_sig = 0;
2667   }
2668   if (session->demux_padremoved_sig) {
2669     g_signal_handler_disconnect (session->demux, session->demux_padremoved_sig);
2670     session->demux_padremoved_sig = 0;
2671   }
2672   if (session->recv_rtp_src) {
2673     gst_object_unref (session->recv_rtp_src);
2674     session->recv_rtp_src = NULL;
2675   }
2676   if (session->recv_rtp_sink) {
2677     gst_element_release_request_pad (session->session, session->recv_rtp_sink);
2678     gst_object_unref (session->recv_rtp_sink);
2679     session->recv_rtp_sink = NULL;
2680   }
2681   if (session->recv_rtp_sink_ghost) {
2682     gst_pad_set_active (session->recv_rtp_sink_ghost, FALSE);
2683     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2684         session->recv_rtp_sink_ghost);
2685     session->recv_rtp_sink_ghost = NULL;
2686   }
2687 }
2688
2689 /* Create a pad for receiving RTCP for the session in @name. Must be called with
2690  * RTP_BIN_LOCK.
2691  */
2692 static GstPad *
2693 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
2694     const gchar * name)
2695 {
2696   guint sessid;
2697   GstRtpBinSession *session;
2698   GstPad *sinkdpad;
2699   GstPadLinkReturn lres;
2700
2701   /* first get the session number */
2702   if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1)
2703     goto no_name;
2704
2705   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
2706
2707   /* get or create the session */
2708   session = find_session_by_id (rtpbin, sessid);
2709   if (!session) {
2710     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
2711     /* create session now */
2712     session = create_session (rtpbin, sessid);
2713     if (session == NULL)
2714       goto create_error;
2715   }
2716
2717   /* check if pad was requested */
2718   if (session->recv_rtcp_sink_ghost != NULL)
2719     return session->recv_rtcp_sink_ghost;
2720
2721   /* get recv_rtp pad and store */
2722   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
2723   session->recv_rtcp_sink =
2724       gst_element_get_request_pad (session->session, "recv_rtcp_sink");
2725   if (session->recv_rtcp_sink == NULL)
2726     goto pad_failed;
2727
2728   /* get srcpad, link to SSRCDemux */
2729   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
2730   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
2731   if (session->sync_src == NULL)
2732     goto pad_failed;
2733
2734   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
2735   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
2736   lres = gst_pad_link (session->sync_src, sinkdpad);
2737   gst_object_unref (sinkdpad);
2738   if (lres != GST_PAD_LINK_OK)
2739     goto link_failed;
2740
2741   session->recv_rtcp_sink_ghost =
2742       gst_ghost_pad_new_from_template (name, session->recv_rtcp_sink, templ);
2743   gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE);
2744   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin),
2745       session->recv_rtcp_sink_ghost);
2746
2747   return session->recv_rtcp_sink_ghost;
2748
2749   /* ERRORS */
2750 no_name:
2751   {
2752     g_warning ("rtpbin: invalid name given");
2753     return NULL;
2754   }
2755 create_error:
2756   {
2757     /* create_session already warned */
2758     return NULL;
2759   }
2760 pad_failed:
2761   {
2762     g_warning ("rtpbin: failed to get session pad");
2763     return NULL;
2764   }
2765 link_failed:
2766   {
2767     g_warning ("rtpbin: failed to link pads");
2768     return NULL;
2769   }
2770 }
2771
2772 static void
2773 remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
2774 {
2775   if (session->recv_rtcp_sink_ghost) {
2776     gst_pad_set_active (session->recv_rtcp_sink_ghost, FALSE);
2777     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2778         session->recv_rtcp_sink_ghost);
2779     session->recv_rtcp_sink_ghost = NULL;
2780   }
2781   if (session->sync_src) {
2782     /* releasing the request pad should also unref the sync pad */
2783     gst_object_unref (session->sync_src);
2784     session->sync_src = NULL;
2785   }
2786   if (session->recv_rtcp_sink) {
2787     gst_element_release_request_pad (session->session, session->recv_rtcp_sink);
2788     gst_object_unref (session->recv_rtcp_sink);
2789     session->recv_rtcp_sink = NULL;
2790   }
2791 }
2792
2793 /* Create a pad for sending RTP for the session in @name. Must be called with
2794  * RTP_BIN_LOCK.
2795  */
2796 static GstPad *
2797 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2798 {
2799   gchar *gname;
2800   guint sessid;
2801   GstRtpBinSession *session;
2802   GstElementClass *klass;
2803
2804   /* first get the session number */
2805   if (name == NULL || sscanf (name, "send_rtp_sink_%u", &sessid) != 1)
2806     goto no_name;
2807
2808   /* get or create session */
2809   session = find_session_by_id (rtpbin, sessid);
2810   if (!session) {
2811     /* create session now */
2812     session = create_session (rtpbin, sessid);
2813     if (session == NULL)
2814       goto create_error;
2815   }
2816
2817   /* check if pad was requested */
2818   if (session->send_rtp_sink_ghost != NULL)
2819     return session->send_rtp_sink_ghost;
2820
2821   /* get send_rtp pad and store */
2822   session->send_rtp_sink =
2823       gst_element_get_request_pad (session->session, "send_rtp_sink");
2824   if (session->send_rtp_sink == NULL)
2825     goto pad_failed;
2826
2827   session->send_rtp_sink_ghost =
2828       gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ);
2829   gst_pad_set_active (session->send_rtp_sink_ghost, TRUE);
2830   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_sink_ghost);
2831
2832   /* get srcpad */
2833   session->send_rtp_src =
2834       gst_element_get_static_pad (session->session, "send_rtp_src");
2835   if (session->send_rtp_src == NULL)
2836     goto no_srcpad;
2837
2838   /* ghost the new source pad */
2839   klass = GST_ELEMENT_GET_CLASS (rtpbin);
2840   gname = g_strdup_printf ("send_rtp_src_%u", sessid);
2841   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u");
2842   session->send_rtp_src_ghost =
2843       gst_ghost_pad_new_from_template (gname, session->send_rtp_src, templ);
2844   gst_pad_set_active (session->send_rtp_src_ghost, TRUE);
2845   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost);
2846   g_free (gname);
2847
2848   return session->send_rtp_sink_ghost;
2849
2850   /* ERRORS */
2851 no_name:
2852   {
2853     g_warning ("rtpbin: invalid name given");
2854     return NULL;
2855   }
2856 create_error:
2857   {
2858     /* create_session already warned */
2859     return NULL;
2860   }
2861 pad_failed:
2862   {
2863     g_warning ("rtpbin: failed to get session pad for session %d", sessid);
2864     return NULL;
2865   }
2866 no_srcpad:
2867   {
2868     g_warning ("rtpbin: failed to get rtp source pad for session %d", sessid);
2869     return NULL;
2870   }
2871 }
2872
2873 static void
2874 remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
2875 {
2876   if (session->send_rtp_src_ghost) {
2877     gst_pad_set_active (session->send_rtp_src_ghost, FALSE);
2878     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2879         session->send_rtp_src_ghost);
2880     session->send_rtp_src_ghost = NULL;
2881   }
2882   if (session->send_rtp_src) {
2883     gst_object_unref (session->send_rtp_src);
2884     session->send_rtp_src = NULL;
2885   }
2886   if (session->send_rtp_sink) {
2887     gst_element_release_request_pad (GST_ELEMENT_CAST (session->session),
2888         session->send_rtp_sink);
2889     gst_object_unref (session->send_rtp_sink);
2890     session->send_rtp_sink = NULL;
2891   }
2892   if (session->send_rtp_sink_ghost) {
2893     gst_pad_set_active (session->send_rtp_sink_ghost, FALSE);
2894     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2895         session->send_rtp_sink_ghost);
2896     session->send_rtp_sink_ghost = NULL;
2897   }
2898 }
2899
2900 /* Create a pad for sending RTCP for the session in @name. Must be called with
2901  * RTP_BIN_LOCK.
2902  */
2903 static GstPad *
2904 create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2905 {
2906   guint sessid;
2907   GstRtpBinSession *session;
2908
2909   /* first get the session number */
2910   if (name == NULL || sscanf (name, "send_rtcp_src_%u", &sessid) != 1)
2911     goto no_name;
2912
2913   /* get or create session */
2914   session = find_session_by_id (rtpbin, sessid);
2915   if (!session)
2916     goto no_session;
2917
2918   /* check if pad was requested */
2919   if (session->send_rtcp_src_ghost != NULL)
2920     return session->send_rtcp_src_ghost;
2921
2922   /* get rtcp_src pad and store */
2923   session->send_rtcp_src =
2924       gst_element_get_request_pad (session->session, "send_rtcp_src");
2925   if (session->send_rtcp_src == NULL)
2926     goto pad_failed;
2927
2928   session->send_rtcp_src_ghost =
2929       gst_ghost_pad_new_from_template (name, session->send_rtcp_src, templ);
2930   gst_pad_set_active (session->send_rtcp_src_ghost, TRUE);
2931   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtcp_src_ghost);
2932
2933   return session->send_rtcp_src_ghost;
2934
2935   /* ERRORS */
2936 no_name:
2937   {
2938     g_warning ("rtpbin: invalid name given");
2939     return NULL;
2940   }
2941 no_session:
2942   {
2943     g_warning ("rtpbin: session with id %d does not exist", sessid);
2944     return NULL;
2945   }
2946 pad_failed:
2947   {
2948     g_warning ("rtpbin: failed to get rtcp pad for session %d", sessid);
2949     return NULL;
2950   }
2951 }
2952
2953 static void
2954 remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
2955 {
2956   if (session->send_rtcp_src_ghost) {
2957     gst_pad_set_active (session->send_rtcp_src_ghost, FALSE);
2958     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2959         session->send_rtcp_src_ghost);
2960     session->send_rtcp_src_ghost = NULL;
2961   }
2962   if (session->send_rtcp_src) {
2963     gst_element_release_request_pad (session->session, session->send_rtcp_src);
2964     gst_object_unref (session->send_rtcp_src);
2965     session->send_rtcp_src = NULL;
2966   }
2967 }
2968
2969 /* If the requested name is NULL we should create a name with
2970  * the session number assuming we want the lowest posible session
2971  * with a free pad like the template */
2972 static gchar *
2973 gst_rtp_bin_get_free_pad_name (GstElement * element, GstPadTemplate * templ)
2974 {
2975   gboolean name_found = FALSE;
2976   gint session = 0;
2977   GstIterator *pad_it = NULL;
2978   gchar *pad_name = NULL;
2979   GValue data = { 0, };
2980
2981   GST_DEBUG_OBJECT (element, "find a free pad name for template");
2982   while (!name_found) {
2983     gboolean done = FALSE;
2984
2985     g_free (pad_name);
2986     pad_name = g_strdup_printf (templ->name_template, session++);
2987     pad_it = gst_element_iterate_pads (GST_ELEMENT (element));
2988     name_found = TRUE;
2989     while (!done) {
2990       switch (gst_iterator_next (pad_it, &data)) {
2991         case GST_ITERATOR_OK:
2992         {
2993           GstPad *pad;
2994           gchar *name;
2995
2996           pad = g_value_get_object (&data);
2997           name = gst_pad_get_name (pad);
2998
2999           if (strcmp (name, pad_name) == 0) {
3000             done = TRUE;
3001             name_found = FALSE;
3002           }
3003           g_free (name);
3004           g_value_reset (&data);
3005           break;
3006         }
3007         case GST_ITERATOR_ERROR:
3008         case GST_ITERATOR_RESYNC:
3009           /* restart iteration */
3010           done = TRUE;
3011           name_found = FALSE;
3012           session = 0;
3013           break;
3014         case GST_ITERATOR_DONE:
3015           done = TRUE;
3016           break;
3017       }
3018     }
3019     g_value_unset (&data);
3020     gst_iterator_free (pad_it);
3021   }
3022
3023   GST_DEBUG_OBJECT (element, "free pad name found: '%s'", pad_name);
3024   return pad_name;
3025 }
3026
3027 /*
3028  */
3029 static GstPad *
3030 gst_rtp_bin_request_new_pad (GstElement * element,
3031     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3032 {
3033   GstRtpBin *rtpbin;
3034   GstElementClass *klass;
3035   GstPad *result;
3036
3037   gchar *pad_name = NULL;
3038
3039   g_return_val_if_fail (templ != NULL, NULL);
3040   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
3041
3042   rtpbin = GST_RTP_BIN (element);
3043   klass = GST_ELEMENT_GET_CLASS (element);
3044
3045   GST_RTP_BIN_LOCK (rtpbin);
3046
3047   if (name == NULL) {
3048     /* use a free pad name */
3049     pad_name = gst_rtp_bin_get_free_pad_name (element, templ);
3050   } else {
3051     /* use the provided name */
3052     pad_name = g_strdup (name);
3053   }
3054
3055   GST_DEBUG_OBJECT (rtpbin, "Trying to request a pad with name %s", pad_name);
3056
3057   /* figure out the template */
3058   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
3059     result = create_recv_rtp (rtpbin, templ, pad_name);
3060   } else if (templ == gst_element_class_get_pad_template (klass,
3061           "recv_rtcp_sink_%u")) {
3062     result = create_recv_rtcp (rtpbin, templ, pad_name);
3063   } else if (templ == gst_element_class_get_pad_template (klass,
3064           "send_rtp_sink_%u")) {
3065     result = create_send_rtp (rtpbin, templ, pad_name);
3066   } else if (templ == gst_element_class_get_pad_template (klass,
3067           "send_rtcp_src_%u")) {
3068     result = create_rtcp (rtpbin, templ, pad_name);
3069   } else
3070     goto wrong_template;
3071
3072   g_free (pad_name);
3073   GST_RTP_BIN_UNLOCK (rtpbin);
3074
3075   return result;
3076
3077   /* ERRORS */
3078 wrong_template:
3079   {
3080     g_free (pad_name);
3081     GST_RTP_BIN_UNLOCK (rtpbin);
3082     g_warning ("rtpbin: this is not our template");
3083     return NULL;
3084   }
3085 }
3086
3087 static void
3088 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
3089 {
3090   GstRtpBinSession *session;
3091   GstRtpBin *rtpbin;
3092
3093   g_return_if_fail (GST_IS_GHOST_PAD (pad));
3094   g_return_if_fail (GST_IS_RTP_BIN (element));
3095
3096   rtpbin = GST_RTP_BIN (element);
3097
3098   GST_RTP_BIN_LOCK (rtpbin);
3099   GST_DEBUG_OBJECT (rtpbin, "Trying to release pad %s:%s",
3100       GST_DEBUG_PAD_NAME (pad));
3101
3102   if (!(session = find_session_by_pad (rtpbin, pad)))
3103     goto unknown_pad;
3104
3105   if (session->recv_rtp_sink_ghost == pad) {
3106     remove_recv_rtp (rtpbin, session);
3107   } else if (session->recv_rtcp_sink_ghost == pad) {
3108     remove_recv_rtcp (rtpbin, session);
3109   } else if (session->send_rtp_sink_ghost == pad) {
3110     remove_send_rtp (rtpbin, session);
3111   } else if (session->send_rtcp_src_ghost == pad) {
3112     remove_rtcp (rtpbin, session);
3113   }
3114
3115   /* no more request pads, free the complete session */
3116   if (session->recv_rtp_sink_ghost == NULL
3117       && session->recv_rtcp_sink_ghost == NULL
3118       && session->send_rtp_sink_ghost == NULL
3119       && session->send_rtcp_src_ghost == NULL) {
3120     GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session);
3121     rtpbin->sessions = g_slist_remove (rtpbin->sessions, session);
3122     free_session (session, rtpbin);
3123   }
3124   GST_RTP_BIN_UNLOCK (rtpbin);
3125
3126   return;
3127
3128   /* ERROR */
3129 unknown_pad:
3130   {
3131     GST_RTP_BIN_UNLOCK (rtpbin);
3132     g_warning ("rtpbin: %s:%s is not one of our request pads",
3133         GST_DEBUG_PAD_NAME (pad));
3134     return;
3135   }
3136 }