docs: get rid of 'Since: 0.10.x' markers
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-rtpbin
22  * @see_also: rtpjitterbuffer, rtpsession, rtpptdemux, rtpssrcdemux
23  *
24  * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
25  * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
26  * RTP sessions that will be synchronized together using RTCP SR packets.
27  *
28  * #GstRtpBin is configured with a number of request pads that define the
29  * functionality that is activated, similar to the #GstRtpSession element.
30  *
31  * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_\%u pad. The session
32  * number must be specified in the pad name.
33  * Data received on the recv_rtp_sink_\%u pad will be processed in the #GstRtpSession
34  * manager and after being validated forwarded on #GstRtpSsrcDemux element. Each
35  * RTP stream is demuxed based on the SSRC and send to a #GstRtpJitterBuffer. After
36  * the packets are released from the jitterbuffer, they will be forwarded to a
37  * #GstRtpPtDemux element. The #GstRtpPtDemux element will demux the packets based
38  * on the payload type and will create a unique pad recv_rtp_src_\%u_\%u_\%u on
39  * rtpbin with the session number, SSRC and payload type respectively as the pad
40  * name.
41  *
42  * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_\%u pad. The
43  * session number must be specified in the pad name.
44  *
45  * If you want the session manager to generate and send RTCP packets, request
46  * the send_rtcp_src_\%u pad with the session number in the pad name. Packet pushed
47  * on this pad contain SR/RR RTCP reports that should be sent to all participants
48  * in the session.
49  *
50  * To use #GstRtpBin as a sender, request a send_rtp_sink_\%u pad, which will
51  * automatically create a send_rtp_src_\%u pad. If the session number is not provided,
52  * the pad from the lowest available session will be returned. The session manager will modify the
53  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
54  * send_rtp_src_\%u pad after updating its internal state.
55  *
56  * The session manager needs the clock-rate of the payload types it is handling
57  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
58  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
59  * signal.
60  *
61  * Access to the internal statistics of rtpbin is provided with the
62  * get-internal-session property. This action signal gives access to the
63  * RTPSession object which further provides action signals to retrieve the
64  * internal source and other sources.
65  *
66  * <refsect2>
67  * <title>Example pipelines</title>
68  * |[
69  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
70  *     rtpbin ! rtptheoradepay ! theoradec ! xvimagesink
71  * ]| Receive RTP data from port 5000 and send to the session 0 in rtpbin.
72  * |[
73  * gst-launch-1.0 rtpbin name=rtpbin \
74  *         v4l2src ! videoconvert ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
75  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
76  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
77  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
78  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
79  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
80  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
81  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
82  * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
83  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
84  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
85  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
86  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
87  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
88  * is received on port 5007. Since RTCP packets from the sender should be sent
89  * as soon as possible and do not participate in preroll, sync=false and
90  * async=false is configured on udpsink
91  * |[
92  * gst-launch-1.0 -v rtpbin name=rtpbin                                          \
93  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
94  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
95  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
96  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
97  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
98  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
99  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
100  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
101  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
102  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
103  * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
104  * decode and display the video.
105  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
106  * decode and play the audio.
107  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
108  * session 1 on port 5003. These packets will be used for session management and
109  * synchronisation.
110  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
111  * on port 5007.
112  * </refsect2>
113  *
114  * Last reviewed on 2007-08-30 (0.10.6)
115  */
116
117 #ifdef HAVE_CONFIG_H
118 #include "config.h"
119 #endif
120 #include <stdio.h>
121 #include <string.h>
122
123 #include <gst/rtp/gstrtpbuffer.h>
124 #include <gst/rtp/gstrtcpbuffer.h>
125
126 #include "gstrtpbin.h"
127 #include "rtpsession.h"
128 #include "gstrtpsession.h"
129 #include "gstrtpjitterbuffer.h"
130
131 #include <gst/glib-compat-private.h>
132
133 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
134 #define GST_CAT_DEFAULT gst_rtp_bin_debug
135
136 /* sink pads */
137 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
138 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
139     GST_PAD_SINK,
140     GST_PAD_REQUEST,
141     GST_STATIC_CAPS ("application/x-rtp")
142     );
143
144 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
145 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
146     GST_PAD_SINK,
147     GST_PAD_REQUEST,
148     GST_STATIC_CAPS ("application/x-rtcp")
149     );
150
151 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
152 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%u",
153     GST_PAD_SINK,
154     GST_PAD_REQUEST,
155     GST_STATIC_CAPS ("application/x-rtp")
156     );
157
158 /* src pads */
159 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
160 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
161     GST_PAD_SRC,
162     GST_PAD_SOMETIMES,
163     GST_STATIC_CAPS ("application/x-rtp")
164     );
165
166 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
167 GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%u",
168     GST_PAD_SRC,
169     GST_PAD_REQUEST,
170     GST_STATIC_CAPS ("application/x-rtcp")
171     );
172
173 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
174 GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%u",
175     GST_PAD_SRC,
176     GST_PAD_SOMETIMES,
177     GST_STATIC_CAPS ("application/x-rtp")
178     );
179
180 #define GST_RTP_BIN_GET_PRIVATE(obj)  \
181    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
182
183 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock (&(bin)->priv->bin_lock)
184 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock (&(bin)->priv->bin_lock)
185
186 /* lock to protect dynamic callbacks, like pad-added and new ssrc. */
187 #define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock (&(bin)->priv->dyn_lock)
188 #define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock (&(bin)->priv->dyn_lock)
189
190 /* lock for shutdown */
191 #define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
192 G_STMT_START {                                   \
193   if (g_atomic_int_get (&bin->priv->shutdown))   \
194     goto label;                                  \
195   GST_RTP_BIN_DYN_LOCK (bin);                    \
196   if (g_atomic_int_get (&bin->priv->shutdown)) { \
197     GST_RTP_BIN_DYN_UNLOCK (bin);                \
198     goto label;                                  \
199   }                                              \
200 } G_STMT_END
201
202 /* unlock for shutdown */
203 #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
204   GST_RTP_BIN_DYN_UNLOCK (bin);                  \
205
206 struct _GstRtpBinPrivate
207 {
208   GMutex bin_lock;
209
210   /* lock protecting dynamic adding/removing */
211   GMutex dyn_lock;
212
213   /* if we are shutting down or not */
214   gint shutdown;
215
216   gboolean autoremove;
217
218   /* UNIX (ntp) time of last SR sync used */
219   guint64 last_unix;
220 };
221
222 /* signals and args */
223 enum
224 {
225   SIGNAL_REQUEST_PT_MAP,
226   SIGNAL_PAYLOAD_TYPE_CHANGE,
227   SIGNAL_CLEAR_PT_MAP,
228   SIGNAL_RESET_SYNC,
229   SIGNAL_GET_INTERNAL_SESSION,
230
231   SIGNAL_ON_NEW_SSRC,
232   SIGNAL_ON_SSRC_COLLISION,
233   SIGNAL_ON_SSRC_VALIDATED,
234   SIGNAL_ON_SSRC_ACTIVE,
235   SIGNAL_ON_SSRC_SDES,
236   SIGNAL_ON_BYE_SSRC,
237   SIGNAL_ON_BYE_TIMEOUT,
238   SIGNAL_ON_TIMEOUT,
239   SIGNAL_ON_SENDER_TIMEOUT,
240   SIGNAL_ON_NPT_STOP,
241   LAST_SIGNAL
242 };
243
244 #define DEFAULT_LATENCY_MS           200
245 #define DEFAULT_DROP_ON_LATENCY      FALSE
246 #define DEFAULT_SDES                 NULL
247 #define DEFAULT_DO_LOST              FALSE
248 #define DEFAULT_IGNORE_PT            FALSE
249 #define DEFAULT_NTP_SYNC             FALSE
250 #define DEFAULT_AUTOREMOVE           FALSE
251 #define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
252 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
253 #define DEFAULT_RTCP_SYNC            GST_RTP_BIN_RTCP_SYNC_ALWAYS
254 #define DEFAULT_RTCP_SYNC_INTERVAL   0
255 #define DEFAULT_DO_SYNC_EVENT        FALSE
256 #define DEFAULT_DO_RETRANSMISSION    FALSE
257
258 enum
259 {
260   PROP_0,
261   PROP_LATENCY,
262   PROP_DROP_ON_LATENCY,
263   PROP_SDES,
264   PROP_DO_LOST,
265   PROP_IGNORE_PT,
266   PROP_NTP_SYNC,
267   PROP_RTCP_SYNC,
268   PROP_RTCP_SYNC_INTERVAL,
269   PROP_AUTOREMOVE,
270   PROP_BUFFER_MODE,
271   PROP_USE_PIPELINE_CLOCK,
272   PROP_DO_SYNC_EVENT,
273   PROP_DO_RETRANSMISSION,
274   PROP_LAST
275 };
276
277 enum
278 {
279   GST_RTP_BIN_RTCP_SYNC_ALWAYS,
280   GST_RTP_BIN_RTCP_SYNC_INITIAL,
281   GST_RTP_BIN_RTCP_SYNC_RTP
282 };
283
284 #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
285 static GType
286 gst_rtp_bin_rtcp_sync_get_type (void)
287 {
288   static GType rtcp_sync_type = 0;
289   static const GEnumValue rtcp_sync_types[] = {
290     {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
291     {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
292     {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
293     {0, NULL, NULL},
294   };
295
296   if (!rtcp_sync_type) {
297     rtcp_sync_type = g_enum_register_static ("GstRTCPSync", rtcp_sync_types);
298   }
299   return rtcp_sync_type;
300 }
301
302 /* helper objects */
303 typedef struct _GstRtpBinSession GstRtpBinSession;
304 typedef struct _GstRtpBinStream GstRtpBinStream;
305 typedef struct _GstRtpBinClient GstRtpBinClient;
306
307 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
308
309 static GstCaps *pt_map_requested (GstElement * element, guint pt,
310     GstRtpBinSession * session);
311 static void payload_type_change (GstElement * element, guint pt,
312     GstRtpBinSession * session);
313 static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
314 static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
315 static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
316 static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
317 static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
318 static void free_stream (GstRtpBinStream * stream, GstRtpBin * bin);
319
320 /* Manages the RTP stream for one SSRC.
321  *
322  * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer.
323  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
324  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
325  * together (see below).
326  */
327 struct _GstRtpBinStream
328 {
329   /* the SSRC of this stream */
330   guint32 ssrc;
331
332   /* parent bin */
333   GstRtpBin *bin;
334
335   /* the session this SSRC belongs to */
336   GstRtpBinSession *session;
337
338   /* the jitterbuffer of the SSRC */
339   GstElement *buffer;
340   gulong buffer_handlesync_sig;
341   gulong buffer_ptreq_sig;
342   gulong buffer_ntpstop_sig;
343   gint percent;
344
345   /* the PT demuxer of the SSRC */
346   GstElement *demux;
347   gulong demux_newpad_sig;
348   gulong demux_padremoved_sig;
349   gulong demux_ptreq_sig;
350   gulong demux_ptchange_sig;
351
352   /* if we have calculated a valid rt_delta for this stream */
353   gboolean have_sync;
354   /* mapping to local RTP and NTP time */
355   gint64 rt_delta;
356   gint64 rtp_delta;
357   /* base rtptime in gst time */
358   gint64 clock_base;
359 };
360
361 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->lock)
362 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->lock)
363
364 /* Manages the receiving end of the packets.
365  *
366  * There is one such structure for each RTP session (audio/video/...).
367  * We get the RTP/RTCP packets and stuff them into the session manager. From
368  * there they are pushed into an SSRC demuxer that splits the stream based on
369  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
370  * the GstRtpBinStream above).
371  */
372 struct _GstRtpBinSession
373 {
374   /* session id */
375   gint id;
376   /* the parent bin */
377   GstRtpBin *bin;
378   /* the session element */
379   GstElement *session;
380   /* the SSRC demuxer */
381   GstElement *demux;
382   gulong demux_newpad_sig;
383   gulong demux_padremoved_sig;
384
385   GMutex lock;
386
387   /* list of GstRtpBinStream */
388   GSList *streams;
389
390   /* mapping of payload type to caps */
391   GHashTable *ptmap;
392
393   /* the pads of the session */
394   GstPad *recv_rtp_sink;
395   GstPad *recv_rtp_sink_ghost;
396   GstPad *recv_rtp_src;
397   GstPad *recv_rtcp_sink;
398   GstPad *recv_rtcp_sink_ghost;
399   GstPad *sync_src;
400   GstPad *send_rtp_sink;
401   GstPad *send_rtp_sink_ghost;
402   GstPad *send_rtp_src;
403   GstPad *send_rtp_src_ghost;
404   GstPad *send_rtcp_src;
405   GstPad *send_rtcp_src_ghost;
406 };
407
408 /* Manages the RTP streams that come from one client and should therefore be
409  * synchronized.
410  */
411 struct _GstRtpBinClient
412 {
413   /* the common CNAME for the streams */
414   gchar *cname;
415   guint cname_len;
416
417   /* the streams */
418   guint nstreams;
419   GSList *streams;
420 };
421
422 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
423 static GstRtpBinSession *
424 find_session_by_id (GstRtpBin * rtpbin, gint id)
425 {
426   GSList *walk;
427
428   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
429     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
430
431     if (sess->id == id)
432       return sess;
433   }
434   return NULL;
435 }
436
437 /* find a session with the given request pad. Must be called with RTP_BIN_LOCK */
438 static GstRtpBinSession *
439 find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
440 {
441   GSList *walk;
442
443   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
444     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
445
446     if ((sess->recv_rtp_sink_ghost == pad) ||
447         (sess->recv_rtcp_sink_ghost == pad) ||
448         (sess->send_rtp_sink_ghost == pad)
449         || (sess->send_rtcp_src_ghost == pad))
450       return sess;
451   }
452   return NULL;
453 }
454
455 static void
456 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
457 {
458   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
459       sess->id, ssrc);
460 }
461
462 static void
463 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
464 {
465   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
466       sess->id, ssrc);
467 }
468
469 static void
470 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
471 {
472   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
473       sess->id, ssrc);
474 }
475
476 static void
477 on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
478 {
479   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
480       sess->id, ssrc);
481 }
482
483 static void
484 on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
485 {
486   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
487       sess->id, ssrc);
488 }
489
490 static void
491 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
492 {
493   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
494       sess->id, ssrc);
495 }
496
497 static void
498 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
499 {
500   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
501       sess->id, ssrc);
502
503   if (sess->bin->priv->autoremove)
504     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
505 }
506
507 static void
508 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
509 {
510   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
511       sess->id, ssrc);
512
513   if (sess->bin->priv->autoremove)
514     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
515 }
516
517 static void
518 on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
519 {
520   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
521       sess->id, ssrc);
522 }
523
524 static void
525 on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
526 {
527   g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0,
528       stream->session->id, stream->ssrc);
529 }
530
531 /* must be called with the SESSION lock */
532 static GstRtpBinStream *
533 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
534 {
535   GSList *walk;
536
537   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
538     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
539
540     if (stream->ssrc == ssrc)
541       return stream;
542   }
543   return NULL;
544 }
545
546 static void
547 ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad,
548     GstRtpBinSession * session)
549 {
550   GstRtpBinStream *stream = NULL;
551   GstRtpBin *rtpbin;
552
553   rtpbin = session->bin;
554
555   GST_RTP_BIN_LOCK (rtpbin);
556
557   GST_RTP_SESSION_LOCK (session);
558   if ((stream = find_stream_by_ssrc (session, ssrc)))
559     session->streams = g_slist_remove (session->streams, stream);
560   GST_RTP_SESSION_UNLOCK (session);
561
562   if (stream)
563     free_stream (stream, rtpbin);
564
565   GST_RTP_BIN_UNLOCK (rtpbin);
566 }
567
568 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
569 static GstRtpBinSession *
570 create_session (GstRtpBin * rtpbin, gint id)
571 {
572   GstRtpBinSession *sess;
573   GstElement *session, *demux;
574   GstState target;
575
576   if (!(session = gst_element_factory_make ("rtpsession", NULL)))
577     goto no_session;
578
579   if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
580     goto no_demux;
581
582   sess = g_new0 (GstRtpBinSession, 1);
583   g_mutex_init (&sess->lock);
584   sess->id = id;
585   sess->bin = rtpbin;
586   sess->session = session;
587   sess->demux = demux;
588   sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
589       (GDestroyNotify) gst_caps_unref);
590   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
591
592   /* configure SDES items */
593   GST_OBJECT_LOCK (rtpbin);
594   g_object_set (session, "sdes", rtpbin->sdes, "use-pipeline-clock",
595       rtpbin->use_pipeline_clock, NULL);
596   GST_OBJECT_UNLOCK (rtpbin);
597
598   /* provide clock_rate to the session manager when needed */
599   g_signal_connect (session, "request-pt-map",
600       (GCallback) pt_map_requested, sess);
601
602   g_signal_connect (sess->session, "on-new-ssrc",
603       (GCallback) on_new_ssrc, sess);
604   g_signal_connect (sess->session, "on-ssrc-collision",
605       (GCallback) on_ssrc_collision, sess);
606   g_signal_connect (sess->session, "on-ssrc-validated",
607       (GCallback) on_ssrc_validated, sess);
608   g_signal_connect (sess->session, "on-ssrc-active",
609       (GCallback) on_ssrc_active, sess);
610   g_signal_connect (sess->session, "on-ssrc-sdes",
611       (GCallback) on_ssrc_sdes, sess);
612   g_signal_connect (sess->session, "on-bye-ssrc",
613       (GCallback) on_bye_ssrc, sess);
614   g_signal_connect (sess->session, "on-bye-timeout",
615       (GCallback) on_bye_timeout, sess);
616   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
617   g_signal_connect (sess->session, "on-sender-timeout",
618       (GCallback) on_sender_timeout, sess);
619
620   gst_bin_add (GST_BIN_CAST (rtpbin), session);
621   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
622
623   GST_OBJECT_LOCK (rtpbin);
624   target = GST_STATE_TARGET (rtpbin);
625   GST_OBJECT_UNLOCK (rtpbin);
626
627   /* change state only to what's needed */
628   gst_element_set_state (demux, target);
629   gst_element_set_state (session, target);
630
631   return sess;
632
633   /* ERRORS */
634 no_session:
635   {
636     g_warning ("rtpbin: could not create rtpsession element");
637     return NULL;
638   }
639 no_demux:
640   {
641     gst_object_unref (session);
642     g_warning ("rtpbin: could not create rtpssrcdemux element");
643     return NULL;
644   }
645 }
646
647 /* called with RTP_BIN_LOCK */
648 static void
649 free_session (GstRtpBinSession * sess, GstRtpBin * bin)
650 {
651   GST_DEBUG_OBJECT (bin, "freeing session %p", sess);
652
653   gst_element_set_locked_state (sess->demux, TRUE);
654   gst_element_set_locked_state (sess->session, TRUE);
655
656   gst_element_set_state (sess->demux, GST_STATE_NULL);
657   gst_element_set_state (sess->session, GST_STATE_NULL);
658
659   remove_recv_rtp (bin, sess);
660   remove_recv_rtcp (bin, sess);
661   remove_send_rtp (bin, sess);
662   remove_rtcp (bin, sess);
663
664   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
665   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
666
667   g_slist_foreach (sess->streams, (GFunc) free_stream, bin);
668   g_slist_free (sess->streams);
669
670   g_mutex_clear (&sess->lock);
671   g_hash_table_destroy (sess->ptmap);
672
673   g_free (sess);
674 }
675
676 /* get the payload type caps for the specific payload @pt in @session */
677 static GstCaps *
678 get_pt_map (GstRtpBinSession * session, guint pt)
679 {
680   GstCaps *caps = NULL;
681   GstRtpBin *bin;
682   GValue ret = { 0 };
683   GValue args[3] = { {0}, {0}, {0} };
684
685   GST_DEBUG ("searching pt %d in cache", pt);
686
687   GST_RTP_SESSION_LOCK (session);
688
689   /* first look in the cache */
690   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
691   if (caps) {
692     gst_caps_ref (caps);
693     goto done;
694   }
695
696   bin = session->bin;
697
698   GST_DEBUG ("emiting signal for pt %d in session %d", pt, session->id);
699
700   /* not in cache, send signal to request caps */
701   g_value_init (&args[0], GST_TYPE_ELEMENT);
702   g_value_set_object (&args[0], bin);
703   g_value_init (&args[1], G_TYPE_UINT);
704   g_value_set_uint (&args[1], session->id);
705   g_value_init (&args[2], G_TYPE_UINT);
706   g_value_set_uint (&args[2], pt);
707
708   g_value_init (&ret, GST_TYPE_CAPS);
709   g_value_set_boxed (&ret, NULL);
710
711   GST_RTP_SESSION_UNLOCK (session);
712
713   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
714
715   GST_RTP_SESSION_LOCK (session);
716
717   g_value_unset (&args[0]);
718   g_value_unset (&args[1]);
719   g_value_unset (&args[2]);
720
721   /* look in the cache again because we let the lock go */
722   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
723   if (caps) {
724     gst_caps_ref (caps);
725     g_value_unset (&ret);
726     goto done;
727   }
728
729   caps = (GstCaps *) g_value_dup_boxed (&ret);
730   g_value_unset (&ret);
731   if (!caps)
732     goto no_caps;
733
734   GST_DEBUG ("caching pt %d as %" GST_PTR_FORMAT, pt, caps);
735
736   /* store in cache, take additional ref */
737   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
738       gst_caps_ref (caps));
739
740 done:
741   GST_RTP_SESSION_UNLOCK (session);
742
743   return caps;
744
745   /* ERRORS */
746 no_caps:
747   {
748     GST_RTP_SESSION_UNLOCK (session);
749     GST_DEBUG ("no pt map could be obtained");
750     return NULL;
751   }
752 }
753
754 static gboolean
755 return_true (gpointer key, gpointer value, gpointer user_data)
756 {
757   return TRUE;
758 }
759
760 static void
761 gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
762 {
763   GSList *clients, *streams;
764
765   GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");
766
767   GST_RTP_BIN_LOCK (rtpbin);
768   for (clients = rtpbin->clients; clients; clients = g_slist_next (clients)) {
769     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
770
771     /* reset sync on all streams for this client */
772     for (streams = client->streams; streams; streams = g_slist_next (streams)) {
773       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
774
775       /* make use require a new SR packet for this stream before we attempt new
776        * lip-sync */
777       stream->have_sync = FALSE;
778       stream->rt_delta = 0;
779       stream->rtp_delta = 0;
780       stream->clock_base = -100 * GST_SECOND;
781     }
782   }
783   GST_RTP_BIN_UNLOCK (rtpbin);
784 }
785
786 static void
787 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
788 {
789   GSList *sessions, *streams;
790
791   GST_RTP_BIN_LOCK (bin);
792   GST_DEBUG_OBJECT (bin, "clearing pt map");
793   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
794     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
795
796     GST_DEBUG_OBJECT (bin, "clearing session %p", session);
797     g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
798
799     GST_RTP_SESSION_LOCK (session);
800     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
801
802     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
803       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
804
805       GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
806       g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
807       if (stream->demux)
808         g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
809     }
810     GST_RTP_SESSION_UNLOCK (session);
811   }
812   GST_RTP_BIN_UNLOCK (bin);
813
814   /* reset sync too */
815   gst_rtp_bin_reset_sync (bin);
816 }
817
818 static RTPSession *
819 gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
820 {
821   RTPSession *internal_session = NULL;
822   GstRtpBinSession *session;
823
824   GST_RTP_BIN_LOCK (bin);
825   GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %d",
826       session_id);
827   session = find_session_by_id (bin, (gint) session_id);
828   if (session) {
829     g_object_get (session->session, "internal-session", &internal_session,
830         NULL);
831   }
832   GST_RTP_BIN_UNLOCK (bin);
833
834   return internal_session;
835 }
836
837 static void
838 gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
839     const gchar * name, const GValue * value)
840 {
841   GSList *sessions, *streams;
842
843   GST_RTP_BIN_LOCK (bin);
844   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
845     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
846
847     GST_RTP_SESSION_LOCK (session);
848     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
849       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
850
851       g_object_set_property (G_OBJECT (stream->buffer), name, value);
852     }
853     GST_RTP_SESSION_UNLOCK (session);
854   }
855   GST_RTP_BIN_UNLOCK (bin);
856 }
857
858 /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
859 static GstRtpBinClient *
860 get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
861 {
862   GstRtpBinClient *result = NULL;
863   GSList *walk;
864
865   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
866     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
867
868     if (len != client->cname_len)
869       continue;
870
871     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
872       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
873           client->cname);
874       result = client;
875       break;
876     }
877   }
878
879   /* nothing found, create one */
880   if (result == NULL) {
881     result = g_new0 (GstRtpBinClient, 1);
882     result->cname = g_strndup ((gchar *) data, len);
883     result->cname_len = len;
884     bin->clients = g_slist_prepend (bin->clients, result);
885     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
886         result->cname);
887   }
888   return result;
889 }
890
891 static void
892 free_client (GstRtpBinClient * client, GstRtpBin * bin)
893 {
894   GST_DEBUG_OBJECT (bin, "freeing client %p", client);
895   g_slist_free (client->streams);
896   g_free (client->cname);
897   g_free (client);
898 }
899
900 static void
901 get_current_times (GstRtpBin * bin, GstClockTime * running_time,
902     guint64 * ntpnstime)
903 {
904   guint64 ntpns;
905   GstClock *clock;
906   GstClockTime base_time, rt, clock_time;
907
908   GST_OBJECT_LOCK (bin);
909   if ((clock = GST_ELEMENT_CLOCK (bin))) {
910     base_time = GST_ELEMENT_CAST (bin)->base_time;
911     gst_object_ref (clock);
912     GST_OBJECT_UNLOCK (bin);
913
914     clock_time = gst_clock_get_time (clock);
915
916     if (bin->use_pipeline_clock) {
917       ntpns = clock_time - base_time;
918     } else {
919       GTimeVal current;
920
921       /* get current NTP time */
922       g_get_current_time (&current);
923       ntpns = GST_TIMEVAL_TO_TIME (current);
924     }
925
926     /* add constant to convert from 1970 based time to 1900 based time */
927     ntpns += (2208988800LL * GST_SECOND);
928
929     /* get current clock time and convert to running time */
930     rt = clock_time - base_time;
931
932     gst_object_unref (clock);
933   } else {
934     GST_OBJECT_UNLOCK (bin);
935     rt = -1;
936     ntpns = -1;
937   }
938   if (running_time)
939     *running_time = rt;
940   if (ntpnstime)
941     *ntpnstime = ntpns;
942 }
943
944 static void
945 stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
946     gint64 ts_offset, gboolean check)
947 {
948   gint64 prev_ts_offset;
949
950   g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
951
952   /* delta changed, see how much */
953   if (prev_ts_offset != ts_offset) {
954     gint64 diff;
955
956     diff = prev_ts_offset - ts_offset;
957
958     GST_DEBUG_OBJECT (bin,
959         "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
960         ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff);
961
962     if (check) {
963       /* only change diff when it changed more than 4 milliseconds. This
964        * compensates for rounding errors in NTP to RTP timestamp
965        * conversions */
966       if (ABS (diff) < 4 * GST_MSECOND) {
967         GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
968         return;
969       }
970       if (ABS (diff) > (3 * GST_SECOND)) {
971         GST_WARNING_OBJECT (bin, "offset unusually large, ignoring");
972         return;
973       }
974     }
975     g_object_set (stream->buffer, "ts-offset", ts_offset, NULL);
976   }
977   GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
978       stream->ssrc, ts_offset);
979 }
980
981 static void
982 gst_rtp_bin_send_sync_event (GstRtpBinStream * stream)
983 {
984   if (stream->bin->send_sync_event) {
985     GstEvent *event;
986     GstPad *srcpad;
987
988     GST_DEBUG_OBJECT (stream->bin,
989         "sending GstRTCPSRReceived event downstream");
990
991     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
992         gst_structure_new_empty ("GstRTCPSRReceived"));
993
994     srcpad = gst_element_get_static_pad (stream->buffer, "src");
995     gst_pad_push_event (srcpad, event);
996     gst_object_unref (srcpad);
997   }
998 }
999
1000 /* associate a stream to the given CNAME. This will make sure all streams for
1001  * that CNAME are synchronized together.
1002  * Must be called with GST_RTP_BIN_LOCK */
1003 static void
1004 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
1005     guint8 * data, guint64 ntptime, guint64 last_extrtptime,
1006     guint64 base_rtptime, guint64 base_time, guint clock_rate,
1007     gint64 rtp_clock_base)
1008 {
1009   GstRtpBinClient *client;
1010   gboolean created;
1011   GSList *walk;
1012   guint64 local_rt;
1013   guint64 local_rtp;
1014   GstClockTime running_time;
1015   guint64 ntpnstime;
1016   gint64 ntpdiff, rtdiff;
1017   guint64 last_unix;
1018
1019   /* first find or create the CNAME */
1020   client = get_client (bin, len, data, &created);
1021
1022   /* find stream in the client */
1023   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1024     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1025
1026     if (ostream == stream)
1027       break;
1028   }
1029   /* not found, add it to the list */
1030   if (walk == NULL) {
1031     GST_DEBUG_OBJECT (bin,
1032         "new association of SSRC %08x with client %p with CNAME %s",
1033         stream->ssrc, client, client->cname);
1034     client->streams = g_slist_prepend (client->streams, stream);
1035     client->nstreams++;
1036   } else {
1037     GST_DEBUG_OBJECT (bin,
1038         "found association of SSRC %08x with client %p with CNAME %s",
1039         stream->ssrc, client, client->cname);
1040   }
1041
1042   if (!GST_CLOCK_TIME_IS_VALID (last_extrtptime)) {
1043     GST_DEBUG_OBJECT (bin, "invalidated sync data");
1044     if (bin->rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1045       /* we don't need that data, so carry on,
1046        * but make some values look saner */
1047       last_extrtptime = base_rtptime;
1048     } else {
1049       /* nothing we can do with this data in this case */
1050       GST_DEBUG_OBJECT (bin, "bailing out");
1051       return;
1052     }
1053   }
1054
1055   /* Take the extended rtptime we found in the SR packet and map it to the
1056    * local rtptime. The local rtp time is used to construct timestamps on the
1057    * buffers so we will calculate what running_time corresponds to the RTP
1058    * timestamp in the SR packet. */
1059   local_rtp = last_extrtptime - base_rtptime;
1060
1061   GST_DEBUG_OBJECT (bin,
1062       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
1063       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, "
1064       "clock-base %" G_GINT64_FORMAT, base_rtptime,
1065       last_extrtptime, local_rtp, clock_rate, rtp_clock_base);
1066
1067   /* calculate local RTP time in gstreamer timestamp, we essentially perform the
1068    * same conversion that a jitterbuffer would use to convert an rtp timestamp
1069    * into a corresponding gstreamer timestamp. Note that the base_time also
1070    * contains the drift between sender and receiver. */
1071   local_rt = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate);
1072   local_rt += base_time;
1073
1074   /* convert ntptime to unix time since 1900 */
1075   last_unix = gst_util_uint64_scale (ntptime, GST_SECOND,
1076       (G_GINT64_CONSTANT (1) << 32));
1077
1078   stream->have_sync = TRUE;
1079
1080   GST_DEBUG_OBJECT (bin,
1081       "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT,
1082       local_rt, last_unix);
1083
1084   /* recalc inter stream playout offset, but only if there is more than one
1085    * stream or we're doing NTP sync. */
1086   if (bin->ntp_sync) {
1087     /* For NTP sync we need to first get a snapshot of running_time and NTP
1088      * time. We know at what running_time we play a certain RTP time, we also
1089      * calculated when we would play the RTP time in the SR packet. Now we need
1090      * to know how the running_time and the NTP time relate to eachother. */
1091     get_current_times (bin, &running_time, &ntpnstime);
1092
1093     /* see how far away the NTP time is. This is the difference between the
1094      * current NTP time and the NTP time in the last SR packet. */
1095     ntpdiff = ntpnstime - last_unix;
1096     /* see how far away the running_time is. This is the difference between the
1097      * current running_time and the running_time of the RTP timestamp in the
1098      * last SR packet. */
1099     rtdiff = running_time - local_rt;
1100
1101     GST_DEBUG_OBJECT (bin,
1102         "NTP time %" G_GUINT64_FORMAT ", last unix %" G_GUINT64_FORMAT,
1103         ntpnstime, last_unix);
1104     GST_DEBUG_OBJECT (bin,
1105         "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
1106         rtdiff);
1107
1108     /* combine to get the final diff to apply to the running_time */
1109     stream->rt_delta = rtdiff - ntpdiff;
1110
1111     stream_set_ts_offset (bin, stream, stream->rt_delta, FALSE);
1112   } else {
1113     gint64 min, rtp_min, clock_base = stream->clock_base;
1114     gboolean all_sync, use_rtp;
1115     gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
1116
1117     /* calculate delta between server and receiver. last_unix is created by
1118      * converting the ntptime in the last SR packet to a gstreamer timestamp. This
1119      * delta expresses the difference to our timeline and the server timeline. The
1120      * difference in itself doesn't mean much but we can combine the delta of
1121      * multiple streams to create a stream specific offset. */
1122     stream->rt_delta = last_unix - local_rt;
1123
1124     /* calculate the min of all deltas, ignoring streams that did not yet have a
1125      * valid rt_delta because we did not yet receive an SR packet for those
1126      * streams.
1127      * We calculate the mininum because we would like to only apply positive
1128      * offsets to streams, delaying their playback instead of trying to speed up
1129      * other streams (which might be imposible when we have to create negative
1130      * latencies).
1131      * The stream that has the smallest diff is selected as the reference stream,
1132      * all other streams will have a positive offset to this difference. */
1133
1134     /* some alternative setting allow ignoring RTCP as much as possible,
1135      * for servers generating bogus ntp timeline */
1136     min = rtp_min = G_MAXINT64;
1137     use_rtp = FALSE;
1138     if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1139       guint64 ext_base;
1140
1141       use_rtp = TRUE;
1142       /* signed version for convienience */
1143       clock_base = base_rtptime;
1144       /* deal with possible wrap-around */
1145       ext_base = base_rtptime;
1146       rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
1147       /* sanity check; base rtp and provided clock_base should be close */
1148       if (rtp_clock_base >= clock_base) {
1149         if (rtp_clock_base - clock_base < 10 * clock_rate) {
1150           rtp_clock_base = base_time +
1151               gst_util_uint64_scale_int (rtp_clock_base - clock_base,
1152               GST_SECOND, clock_rate);
1153         } else {
1154           use_rtp = FALSE;
1155         }
1156       } else {
1157         if (clock_base - rtp_clock_base < 10 * clock_rate) {
1158           rtp_clock_base = base_time -
1159               gst_util_uint64_scale_int (clock_base - rtp_clock_base,
1160               GST_SECOND, clock_rate);
1161         } else {
1162           use_rtp = FALSE;
1163         }
1164       }
1165       /* warn and bail for clarity out if no sane values */
1166       if (!use_rtp) {
1167         GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
1168         return;
1169       }
1170       /* store to track changes */
1171       clock_base = rtp_clock_base;
1172       /* generate a fake as before,
1173        * now equating rtptime obtained from RTP-Info,
1174        * where the large time represent the otherwise irrelevant npt/ntp time */
1175       stream->rtp_delta = (GST_SECOND << 28) - rtp_clock_base;
1176     } else {
1177       clock_base = rtp_clock_base;
1178     }
1179
1180     all_sync = TRUE;
1181     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1182       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1183
1184       if (!ostream->have_sync) {
1185         all_sync = FALSE;
1186         continue;
1187       }
1188
1189       /* change in current stream's base from previously init'ed value
1190        * leads to reset of all stream's base */
1191       if (stream != ostream && stream->clock_base >= 0 &&
1192           (stream->clock_base != clock_base)) {
1193         GST_DEBUG_OBJECT (bin, "reset upon clock base change");
1194         ostream->clock_base = -100 * GST_SECOND;
1195         ostream->rtp_delta = 0;
1196       }
1197
1198       if (ostream->rt_delta < min)
1199         min = ostream->rt_delta;
1200       if (ostream->rtp_delta < rtp_min)
1201         rtp_min = ostream->rtp_delta;
1202     }
1203
1204     /* arrange to re-sync for each stream upon significant change,
1205      * e.g. post-seek */
1206     all_sync = all_sync && (stream->clock_base == clock_base);
1207     stream->clock_base = clock_base;
1208
1209     /* may need init performed above later on, but nothing more to do now */
1210     if (client->nstreams <= 1)
1211       return;
1212
1213     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
1214         " all sync %d", client, min, all_sync);
1215     GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
1216
1217     switch (rtcp_sync) {
1218       case GST_RTP_BIN_RTCP_SYNC_RTP:
1219         if (!use_rtp)
1220           break;
1221         GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
1222             "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
1223         /* fall-through */
1224       case GST_RTP_BIN_RTCP_SYNC_INITIAL:
1225         /* if all have been synced already, do not bother further */
1226         if (all_sync) {
1227           GST_DEBUG_OBJECT (bin, "all streams already synced; done");
1228           return;
1229         }
1230         break;
1231       default:
1232         break;
1233     }
1234
1235     /* bail out if we adjusted recently enough */
1236     if (all_sync && (last_unix - bin->priv->last_unix) <
1237         bin->rtcp_sync_interval * GST_MSECOND) {
1238       GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
1239           "previous sender info too recent "
1240           "(previous UNIX %" G_GUINT64_FORMAT ")", bin->priv->last_unix);
1241       return;
1242     }
1243     bin->priv->last_unix = last_unix;
1244
1245     /* calculate offsets for each stream */
1246     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1247       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1248       gint64 ts_offset;
1249
1250       /* ignore streams for which we didn't receive an SR packet yet, we
1251        * can't synchronize them yet. We can however sync other streams just
1252        * fine. */
1253       if (!ostream->have_sync)
1254         continue;
1255
1256       /* calculate offset to our reference stream, this should always give a
1257        * positive number. */
1258       if (use_rtp)
1259         ts_offset = ostream->rtp_delta - rtp_min;
1260       else
1261         ts_offset = ostream->rt_delta - min;
1262
1263       stream_set_ts_offset (bin, ostream, ts_offset, TRUE);
1264     }
1265   }
1266   gst_rtp_bin_send_sync_event (stream);
1267
1268   return;
1269 }
1270
1271 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
1272   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
1273           (b) = gst_rtcp_packet_move_to_next ((packet)))
1274
1275 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
1276   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
1277           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
1278
1279 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
1280   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
1281           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
1282
1283 static void
1284 gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
1285     GstRtpBinStream * stream)
1286 {
1287   GstRtpBin *bin;
1288   GstRTCPPacket packet;
1289   guint32 ssrc;
1290   guint64 ntptime;
1291   gboolean have_sr, have_sdes;
1292   gboolean more;
1293   guint64 base_rtptime;
1294   guint64 base_time;
1295   guint clock_rate;
1296   guint64 clock_base;
1297   guint64 extrtptime;
1298   GstBuffer *buffer;
1299   GstRTCPBuffer rtcp = { NULL, };
1300
1301   bin = stream->bin;
1302
1303   GST_DEBUG_OBJECT (bin, "sync handler called");
1304
1305   /* get the last relation between the rtp timestamps and the gstreamer
1306    * timestamps. We get this info directly from the jitterbuffer which
1307    * constructs gstreamer timestamps from rtp timestamps and so it know exactly
1308    * what the current situation is. */
1309   base_rtptime =
1310       g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
1311   base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
1312   clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
1313   clock_base = g_value_get_uint64 (gst_structure_get_value (s, "clock-base"));
1314   extrtptime =
1315       g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
1316   buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
1317
1318   have_sr = FALSE;
1319   have_sdes = FALSE;
1320
1321   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
1322
1323   GST_RTCP_BUFFER_FOR_PACKETS (more, &rtcp, &packet) {
1324     /* first packet must be SR or RR or else the validate would have failed */
1325     switch (gst_rtcp_packet_get_type (&packet)) {
1326       case GST_RTCP_TYPE_SR:
1327         /* only parse first. There is only supposed to be one SR in the packet
1328          * but we will deal with malformed packets gracefully */
1329         if (have_sr)
1330           break;
1331         /* get NTP and RTP times */
1332         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL,
1333             NULL, NULL);
1334
1335         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
1336         /* ignore SR that is not ours */
1337         if (ssrc != stream->ssrc)
1338           continue;
1339
1340         have_sr = TRUE;
1341         break;
1342       case GST_RTCP_TYPE_SDES:
1343       {
1344         gboolean more_items, more_entries;
1345
1346         /* only deal with first SDES, there is only supposed to be one SDES in
1347          * the RTCP packet but we deal with bad packets gracefully. Also bail
1348          * out if we have not seen an SR item yet. */
1349         if (have_sdes || !have_sr)
1350           break;
1351
1352         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
1353           /* skip items that are not about the SSRC of the sender */
1354           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
1355             continue;
1356
1357           /* find the CNAME entry */
1358           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
1359             GstRTCPSDESType type;
1360             guint8 len;
1361             guint8 *data;
1362
1363             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
1364
1365             if (type == GST_RTCP_SDES_CNAME) {
1366               GST_RTP_BIN_LOCK (bin);
1367               /* associate the stream to CNAME */
1368               gst_rtp_bin_associate (bin, stream, len, data,
1369                   ntptime, extrtptime, base_rtptime, base_time, clock_rate,
1370                   clock_base);
1371               GST_RTP_BIN_UNLOCK (bin);
1372             }
1373           }
1374         }
1375         have_sdes = TRUE;
1376         break;
1377       }
1378       default:
1379         /* we can ignore these packets */
1380         break;
1381     }
1382   }
1383   gst_rtcp_buffer_unmap (&rtcp);
1384 }
1385
1386 /* create a new stream with @ssrc in @session. Must be called with
1387  * RTP_SESSION_LOCK. */
1388 static GstRtpBinStream *
1389 create_stream (GstRtpBinSession * session, guint32 ssrc)
1390 {
1391   GstElement *buffer, *demux = NULL;
1392   GstRtpBinStream *stream;
1393   GstRtpBin *rtpbin;
1394   GstState target;
1395
1396   rtpbin = session->bin;
1397
1398   if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL)))
1399     goto no_jitterbuffer;
1400
1401   if (!rtpbin->ignore_pt)
1402     if (!(demux = gst_element_factory_make ("rtpptdemux", NULL)))
1403       goto no_demux;
1404
1405
1406   stream = g_new0 (GstRtpBinStream, 1);
1407   stream->ssrc = ssrc;
1408   stream->bin = rtpbin;
1409   stream->session = session;
1410   stream->buffer = buffer;
1411   stream->demux = demux;
1412
1413   stream->have_sync = FALSE;
1414   stream->rt_delta = 0;
1415   stream->rtp_delta = 0;
1416   stream->percent = 100;
1417   stream->clock_base = -100 * GST_SECOND;
1418   session->streams = g_slist_prepend (session->streams, stream);
1419
1420   /* provide clock_rate to the jitterbuffer when needed */
1421   stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
1422       (GCallback) pt_map_requested, session);
1423   stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
1424       (GCallback) on_npt_stop, stream);
1425
1426   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.session", session);
1427   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.stream", stream);
1428
1429   /* configure latency and packet lost */
1430   g_object_set (buffer, "latency", rtpbin->latency_ms, NULL);
1431   g_object_set (buffer, "drop-on-latency", rtpbin->drop_on_latency, NULL);
1432   g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
1433   g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL);
1434   g_object_set (buffer, "do-retransmission", rtpbin->do_retransmission, NULL);
1435
1436   if (!rtpbin->ignore_pt)
1437     gst_bin_add (GST_BIN_CAST (rtpbin), demux);
1438   gst_bin_add (GST_BIN_CAST (rtpbin), buffer);
1439
1440   /* link stuff */
1441   if (demux)
1442     gst_element_link_pads_full (buffer, "src", demux, "sink",
1443         GST_PAD_LINK_CHECK_NOTHING);
1444
1445   if (rtpbin->buffering) {
1446     guint64 last_out;
1447
1448     GST_INFO_OBJECT (rtpbin,
1449         "bin is buffering, set jitterbuffer as not active");
1450     g_signal_emit_by_name (buffer, "set-active", FALSE, (gint64) 0, &last_out);
1451   }
1452
1453
1454   GST_OBJECT_LOCK (rtpbin);
1455   target = GST_STATE_TARGET (rtpbin);
1456   GST_OBJECT_UNLOCK (rtpbin);
1457
1458   /* from sink to source */
1459   if (demux)
1460     gst_element_set_state (demux, target);
1461
1462   gst_element_set_state (buffer, target);
1463
1464   return stream;
1465
1466   /* ERRORS */
1467 no_jitterbuffer:
1468   {
1469     g_warning ("rtpbin: could not create rtpjitterbuffer element");
1470     return NULL;
1471   }
1472 no_demux:
1473   {
1474     gst_object_unref (buffer);
1475     g_warning ("rtpbin: could not create rtpptdemux element");
1476     return NULL;
1477   }
1478 }
1479
1480 /* called with RTP_BIN_LOCK */
1481 static void
1482 free_stream (GstRtpBinStream * stream, GstRtpBin * bin)
1483 {
1484   GSList *clients, *next_client;
1485
1486   GST_DEBUG_OBJECT (bin, "freeing stream %p", stream);
1487
1488   if (stream->demux) {
1489     g_signal_handler_disconnect (stream->demux, stream->demux_newpad_sig);
1490     g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig);
1491     g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig);
1492   }
1493   g_signal_handler_disconnect (stream->buffer, stream->buffer_handlesync_sig);
1494   g_signal_handler_disconnect (stream->buffer, stream->buffer_ptreq_sig);
1495   g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig);
1496
1497   if (stream->demux)
1498     gst_element_set_locked_state (stream->demux, TRUE);
1499   gst_element_set_locked_state (stream->buffer, TRUE);
1500
1501   if (stream->demux)
1502     gst_element_set_state (stream->demux, GST_STATE_NULL);
1503   gst_element_set_state (stream->buffer, GST_STATE_NULL);
1504
1505   /* now remove this signal, we need this while going to NULL because it to
1506    * do some cleanups */
1507   if (stream->demux)
1508     g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
1509
1510   gst_bin_remove (GST_BIN_CAST (bin), stream->buffer);
1511   if (stream->demux)
1512     gst_bin_remove (GST_BIN_CAST (bin), stream->demux);
1513
1514   for (clients = bin->clients; clients; clients = next_client) {
1515     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
1516     GSList *streams, *next_stream;
1517
1518     next_client = g_slist_next (clients);
1519
1520     for (streams = client->streams; streams; streams = next_stream) {
1521       GstRtpBinStream *ostream = (GstRtpBinStream *) streams->data;
1522
1523       next_stream = g_slist_next (streams);
1524
1525       if (ostream == stream) {
1526         client->streams = g_slist_delete_link (client->streams, streams);
1527         /* If this was the last stream belonging to this client,
1528          * clean up the client. */
1529         if (--client->nstreams == 0) {
1530           bin->clients = g_slist_delete_link (bin->clients, clients);
1531           free_client (client, bin);
1532           break;
1533         }
1534       }
1535     }
1536   }
1537   g_free (stream);
1538 }
1539
1540 /* GObject vmethods */
1541 static void gst_rtp_bin_dispose (GObject * object);
1542 static void gst_rtp_bin_finalize (GObject * object);
1543 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
1544     const GValue * value, GParamSpec * pspec);
1545 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
1546     GValue * value, GParamSpec * pspec);
1547
1548 /* GstElement vmethods */
1549 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
1550     GstStateChange transition);
1551 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
1552     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
1553 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
1554 static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message);
1555
1556 #define gst_rtp_bin_parent_class parent_class
1557 G_DEFINE_TYPE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN);
1558
1559 static void
1560 gst_rtp_bin_class_init (GstRtpBinClass * klass)
1561 {
1562   GObjectClass *gobject_class;
1563   GstElementClass *gstelement_class;
1564   GstBinClass *gstbin_class;
1565
1566   gobject_class = (GObjectClass *) klass;
1567   gstelement_class = (GstElementClass *) klass;
1568   gstbin_class = (GstBinClass *) klass;
1569
1570   g_type_class_add_private (klass, sizeof (GstRtpBinPrivate));
1571
1572   gobject_class->dispose = gst_rtp_bin_dispose;
1573   gobject_class->finalize = gst_rtp_bin_finalize;
1574   gobject_class->set_property = gst_rtp_bin_set_property;
1575   gobject_class->get_property = gst_rtp_bin_get_property;
1576
1577   g_object_class_install_property (gobject_class, PROP_LATENCY,
1578       g_param_spec_uint ("latency", "Buffer latency in ms",
1579           "Default amount of ms to buffer in the jitterbuffers", 0,
1580           G_MAXUINT, DEFAULT_LATENCY_MS,
1581           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1582
1583   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
1584       g_param_spec_boolean ("drop-on-latency",
1585           "Drop buffers when maximum latency is reached",
1586           "Tells the jitterbuffer to never exceed the given latency in size",
1587           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1588
1589   /**
1590    * GstRtpBin::request-pt-map:
1591    * @rtpbin: the object which received the signal
1592    * @session: the session
1593    * @pt: the pt
1594    *
1595    * Request the payload type as #GstCaps for @pt in @session.
1596    */
1597   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
1598       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
1599       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
1600       NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_CAPS, 2, G_TYPE_UINT,
1601       G_TYPE_UINT);
1602
1603     /**
1604    * GstRtpBin::payload-type-change:
1605    * @rtpbin: the object which received the signal
1606    * @session: the session
1607    * @pt: the pt
1608    *
1609    * Signal that the current payload type changed to @pt in @session.
1610    */
1611   gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] =
1612       g_signal_new ("payload-type-change", G_TYPE_FROM_CLASS (klass),
1613       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, payload_type_change),
1614       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1615       G_TYPE_UINT);
1616
1617   /**
1618    * GstRtpBin::clear-pt-map:
1619    * @rtpbin: the object which received the signal
1620    *
1621    * Clear all previously cached pt-mapping obtained with
1622    * #GstRtpBin::request-pt-map.
1623    */
1624   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
1625       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
1626       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1627           clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1628       0, G_TYPE_NONE);
1629
1630   /**
1631    * GstRtpBin::reset-sync:
1632    * @rtpbin: the object which received the signal
1633    *
1634    * Reset all currently configured lip-sync parameters and require new SR
1635    * packets for all streams before lip-sync is attempted again.
1636    */
1637   gst_rtp_bin_signals[SIGNAL_RESET_SYNC] =
1638       g_signal_new ("reset-sync", G_TYPE_FROM_CLASS (klass),
1639       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1640           reset_sync), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1641       0, G_TYPE_NONE);
1642
1643   /**
1644    * GstRtpBin::get-internal-session:
1645    * @rtpbin: the object which received the signal
1646    * @id: the session id
1647    *
1648    * Request the internal RTPSession object as #GObject in session @id.
1649    */
1650   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_SESSION] =
1651       g_signal_new ("get-internal-session", G_TYPE_FROM_CLASS (klass),
1652       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1653           get_internal_session), NULL, NULL, g_cclosure_marshal_generic,
1654       RTP_TYPE_SESSION, 1, G_TYPE_UINT);
1655
1656   /**
1657    * GstRtpBin::on-new-ssrc:
1658    * @rtpbin: the object which received the signal
1659    * @session: the session
1660    * @ssrc: the SSRC
1661    *
1662    * Notify of a new SSRC that entered @session.
1663    */
1664   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
1665       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
1666       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
1667       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1668       G_TYPE_UINT);
1669   /**
1670    * GstRtpBin::on-ssrc-collision:
1671    * @rtpbin: the object which received the signal
1672    * @session: the session
1673    * @ssrc: the SSRC
1674    *
1675    * Notify when we have an SSRC collision
1676    */
1677   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
1678       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
1679       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
1680       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1681       G_TYPE_UINT);
1682   /**
1683    * GstRtpBin::on-ssrc-validated:
1684    * @rtpbin: the object which received the signal
1685    * @session: the session
1686    * @ssrc: the SSRC
1687    *
1688    * Notify of a new SSRC that became validated.
1689    */
1690   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
1691       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
1692       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
1693       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1694       G_TYPE_UINT);
1695   /**
1696    * GstRtpBin::on-ssrc-active:
1697    * @rtpbin: the object which received the signal
1698    * @session: the session
1699    * @ssrc: the SSRC
1700    *
1701    * Notify of a SSRC that is active, i.e., sending RTCP.
1702    */
1703   gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE] =
1704       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
1705       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_active),
1706       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1707       G_TYPE_UINT);
1708   /**
1709    * GstRtpBin::on-ssrc-sdes:
1710    * @rtpbin: the object which received the signal
1711    * @session: the session
1712    * @ssrc: the SSRC
1713    *
1714    * Notify of a SSRC that is active, i.e., sending RTCP.
1715    */
1716   gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES] =
1717       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
1718       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_sdes),
1719       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1720       G_TYPE_UINT);
1721
1722   /**
1723    * GstRtpBin::on-bye-ssrc:
1724    * @rtpbin: the object which received the signal
1725    * @session: the session
1726    * @ssrc: the SSRC
1727    *
1728    * Notify of an SSRC that became inactive because of a BYE packet.
1729    */
1730   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
1731       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
1732       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
1733       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1734       G_TYPE_UINT);
1735   /**
1736    * GstRtpBin::on-bye-timeout:
1737    * @rtpbin: the object which received the signal
1738    * @session: the session
1739    * @ssrc: the SSRC
1740    *
1741    * Notify of an SSRC that has timed out because of BYE
1742    */
1743   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
1744       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
1745       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
1746       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1747       G_TYPE_UINT);
1748   /**
1749    * GstRtpBin::on-timeout:
1750    * @rtpbin: the object which received the signal
1751    * @session: the session
1752    * @ssrc: the SSRC
1753    *
1754    * Notify of an SSRC that has timed out
1755    */
1756   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
1757       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
1758       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
1759       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1760       G_TYPE_UINT);
1761   /**
1762    * GstRtpBin::on-sender-timeout:
1763    * @rtpbin: the object which received the signal
1764    * @session: the session
1765    * @ssrc: the SSRC
1766    *
1767    * Notify of a sender SSRC that has timed out and became a receiver
1768    */
1769   gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] =
1770       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
1771       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout),
1772       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1773       G_TYPE_UINT);
1774
1775   /**
1776    * GstRtpBin::on-npt-stop:
1777    * @rtpbin: the object which received the signal
1778    * @session: the session
1779    * @ssrc: the SSRC
1780    *
1781    * Notify that SSRC sender has sent data up to the configured NPT stop time.
1782    */
1783   gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP] =
1784       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
1785       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_npt_stop),
1786       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1787       G_TYPE_UINT);
1788
1789   g_object_class_install_property (gobject_class, PROP_SDES,
1790       g_param_spec_boxed ("sdes", "SDES",
1791           "The SDES items of this session",
1792           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1793
1794   g_object_class_install_property (gobject_class, PROP_DO_LOST,
1795       g_param_spec_boolean ("do-lost", "Do Lost",
1796           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
1797           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1798
1799   g_object_class_install_property (gobject_class, PROP_AUTOREMOVE,
1800       g_param_spec_boolean ("autoremove", "Auto Remove",
1801           "Automatically remove timed out sources", DEFAULT_AUTOREMOVE,
1802           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1803
1804   g_object_class_install_property (gobject_class, PROP_IGNORE_PT,
1805       g_param_spec_boolean ("ignore-pt", "Ignore PT",
1806           "Do not demultiplex based on PT values", DEFAULT_IGNORE_PT,
1807           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1808
1809   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
1810       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
1811           "Use the pipeline running-time to set the NTP time in the RTCP SR messages",
1812           DEFAULT_USE_PIPELINE_CLOCK,
1813           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1814   /**
1815    * GstRtpBin:buffer-mode:
1816    *
1817    * Control the buffering and timestamping mode used by the jitterbuffer.
1818    */
1819   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
1820       g_param_spec_enum ("buffer-mode", "Buffer Mode",
1821           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
1822           DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1823   /**
1824    * GstRtpBin:ntp-sync:
1825    *
1826    * Set the NTP time from the sender reports as the running-time on the
1827    * buffers. When both the sender and receiver have sychronized
1828    * running-time, i.e. when the clock and base-time is shared
1829    * between the receivers and the and the senders, this option can be
1830    * used to synchronize receivers on multiple machines.
1831    */
1832   g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
1833       g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
1834           "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
1835           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1836
1837   /**
1838    * GstRtpBin:rtcp-sync:
1839    *
1840    * If not synchronizing (directly) to the NTP clock, determines how to sync
1841    * the various streams.
1842    */
1843   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC,
1844       g_param_spec_enum ("rtcp-sync", "RTCP Sync",
1845           "Use of RTCP SR in synchronization", GST_RTP_BIN_RTCP_SYNC_TYPE,
1846           DEFAULT_RTCP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1847
1848   /**
1849    * GstRtpBin:rtcp-sync-interval:
1850    *
1851    * Determines how often to sync streams using RTCP data.
1852    */
1853   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_INTERVAL,
1854       g_param_spec_uint ("rtcp-sync-interval", "RTCP Sync Interval",
1855           "RTCP SR interval synchronization (ms) (0 = always)",
1856           0, G_MAXUINT, DEFAULT_RTCP_SYNC_INTERVAL,
1857           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1858
1859   g_object_class_install_property (gobject_class, PROP_DO_SYNC_EVENT,
1860       g_param_spec_boolean ("do-sync-event", "Do Sync Event",
1861           "Send event downstream when a stream is synchronized to the sender",
1862           DEFAULT_DO_SYNC_EVENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1863
1864   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
1865       g_param_spec_boolean ("do-retransmission", "Do retransmission",
1866           "Send an event downstream to request packet retransmission",
1867           DEFAULT_DO_RETRANSMISSION,
1868           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1869
1870   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
1871   gstelement_class->request_new_pad =
1872       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
1873   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
1874
1875   /* sink pads */
1876   gst_element_class_add_pad_template (gstelement_class,
1877       gst_static_pad_template_get (&rtpbin_recv_rtp_sink_template));
1878   gst_element_class_add_pad_template (gstelement_class,
1879       gst_static_pad_template_get (&rtpbin_recv_rtcp_sink_template));
1880   gst_element_class_add_pad_template (gstelement_class,
1881       gst_static_pad_template_get (&rtpbin_send_rtp_sink_template));
1882
1883   /* src pads */
1884   gst_element_class_add_pad_template (gstelement_class,
1885       gst_static_pad_template_get (&rtpbin_recv_rtp_src_template));
1886   gst_element_class_add_pad_template (gstelement_class,
1887       gst_static_pad_template_get (&rtpbin_send_rtcp_src_template));
1888   gst_element_class_add_pad_template (gstelement_class,
1889       gst_static_pad_template_get (&rtpbin_send_rtp_src_template));
1890
1891   gst_element_class_set_static_metadata (gstelement_class, "RTP Bin",
1892       "Filter/Network/RTP",
1893       "Real-Time Transport Protocol bin",
1894       "Wim Taymans <wim.taymans@gmail.com>");
1895
1896   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (gst_rtp_bin_handle_message);
1897
1898   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
1899   klass->reset_sync = GST_DEBUG_FUNCPTR (gst_rtp_bin_reset_sync);
1900   klass->get_internal_session =
1901       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_session);
1902
1903   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
1904 }
1905
1906 static void
1907 gst_rtp_bin_init (GstRtpBin * rtpbin)
1908 {
1909   gchar *cname;
1910
1911   rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
1912   g_mutex_init (&rtpbin->priv->bin_lock);
1913   g_mutex_init (&rtpbin->priv->dyn_lock);
1914
1915   rtpbin->latency_ms = DEFAULT_LATENCY_MS;
1916   rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND;
1917   rtpbin->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
1918   rtpbin->do_lost = DEFAULT_DO_LOST;
1919   rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
1920   rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
1921   rtpbin->rtcp_sync = DEFAULT_RTCP_SYNC;
1922   rtpbin->rtcp_sync_interval = DEFAULT_RTCP_SYNC_INTERVAL;
1923   rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
1924   rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
1925   rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
1926   rtpbin->send_sync_event = DEFAULT_DO_SYNC_EVENT;
1927   rtpbin->do_retransmission = DEFAULT_DO_RETRANSMISSION;
1928
1929   /* some default SDES entries */
1930   cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
1931   rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes",
1932       "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL);
1933   g_free (cname);
1934 }
1935
1936 static void
1937 gst_rtp_bin_dispose (GObject * object)
1938 {
1939   GstRtpBin *rtpbin;
1940
1941   rtpbin = GST_RTP_BIN (object);
1942
1943   GST_RTP_BIN_LOCK (rtpbin);
1944   GST_DEBUG_OBJECT (object, "freeing sessions");
1945   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, rtpbin);
1946   g_slist_free (rtpbin->sessions);
1947   rtpbin->sessions = NULL;
1948   GST_RTP_BIN_UNLOCK (rtpbin);
1949
1950   G_OBJECT_CLASS (parent_class)->dispose (object);
1951 }
1952
1953 static void
1954 gst_rtp_bin_finalize (GObject * object)
1955 {
1956   GstRtpBin *rtpbin;
1957
1958   rtpbin = GST_RTP_BIN (object);
1959
1960   if (rtpbin->sdes)
1961     gst_structure_free (rtpbin->sdes);
1962
1963   g_mutex_clear (&rtpbin->priv->bin_lock);
1964   g_mutex_clear (&rtpbin->priv->dyn_lock);
1965
1966   G_OBJECT_CLASS (parent_class)->finalize (object);
1967 }
1968
1969
1970 static void
1971 gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes)
1972 {
1973   GSList *item;
1974
1975   if (sdes == NULL)
1976     return;
1977
1978   GST_RTP_BIN_LOCK (bin);
1979
1980   GST_OBJECT_LOCK (bin);
1981   if (bin->sdes)
1982     gst_structure_free (bin->sdes);
1983   bin->sdes = gst_structure_copy (sdes);
1984   GST_OBJECT_UNLOCK (bin);
1985
1986   /* store in all sessions */
1987   for (item = bin->sessions; item; item = g_slist_next (item)) {
1988     GstRtpBinSession *session = item->data;
1989     g_object_set (session->session, "sdes", sdes, NULL);
1990   }
1991
1992   GST_RTP_BIN_UNLOCK (bin);
1993 }
1994
1995 static GstStructure *
1996 gst_rtp_bin_get_sdes_struct (GstRtpBin * bin)
1997 {
1998   GstStructure *result;
1999
2000   GST_OBJECT_LOCK (bin);
2001   result = gst_structure_copy (bin->sdes);
2002   GST_OBJECT_UNLOCK (bin);
2003
2004   return result;
2005 }
2006
2007 static void
2008 gst_rtp_bin_set_property (GObject * object, guint prop_id,
2009     const GValue * value, GParamSpec * pspec)
2010 {
2011   GstRtpBin *rtpbin;
2012
2013   rtpbin = GST_RTP_BIN (object);
2014
2015   switch (prop_id) {
2016     case PROP_LATENCY:
2017       GST_RTP_BIN_LOCK (rtpbin);
2018       rtpbin->latency_ms = g_value_get_uint (value);
2019       rtpbin->latency_ns = rtpbin->latency_ms * GST_MSECOND;
2020       GST_RTP_BIN_UNLOCK (rtpbin);
2021       /* propagate the property down to the jitterbuffer */
2022       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "latency", value);
2023       break;
2024     case PROP_DROP_ON_LATENCY:
2025       GST_RTP_BIN_LOCK (rtpbin);
2026       rtpbin->drop_on_latency = g_value_get_boolean (value);
2027       GST_RTP_BIN_UNLOCK (rtpbin);
2028       /* propagate the property down to the jitterbuffer */
2029       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2030           "drop-on-latency", value);
2031       break;
2032     case PROP_SDES:
2033       gst_rtp_bin_set_sdes_struct (rtpbin, g_value_get_boxed (value));
2034       break;
2035     case PROP_DO_LOST:
2036       GST_RTP_BIN_LOCK (rtpbin);
2037       rtpbin->do_lost = g_value_get_boolean (value);
2038       GST_RTP_BIN_UNLOCK (rtpbin);
2039       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
2040       break;
2041     case PROP_NTP_SYNC:
2042       rtpbin->ntp_sync = g_value_get_boolean (value);
2043       break;
2044     case PROP_RTCP_SYNC:
2045       g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value));
2046       break;
2047     case PROP_RTCP_SYNC_INTERVAL:
2048       rtpbin->rtcp_sync_interval = g_value_get_uint (value);
2049       break;
2050     case PROP_IGNORE_PT:
2051       rtpbin->ignore_pt = g_value_get_boolean (value);
2052       break;
2053     case PROP_AUTOREMOVE:
2054       rtpbin->priv->autoremove = g_value_get_boolean (value);
2055       break;
2056     case PROP_USE_PIPELINE_CLOCK:
2057     {
2058       GSList *sessions;
2059       GST_RTP_BIN_LOCK (rtpbin);
2060       rtpbin->use_pipeline_clock = g_value_get_boolean (value);
2061       for (sessions = rtpbin->sessions; sessions;
2062           sessions = g_slist_next (sessions)) {
2063         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2064
2065         g_object_set (G_OBJECT (session->session),
2066             "use-pipeline-clock", rtpbin->use_pipeline_clock, NULL);
2067       }
2068       GST_RTP_BIN_UNLOCK (rtpbin);
2069     }
2070       break;
2071     case PROP_DO_SYNC_EVENT:
2072       rtpbin->send_sync_event = g_value_get_boolean (value);
2073       break;
2074     case PROP_BUFFER_MODE:
2075       GST_RTP_BIN_LOCK (rtpbin);
2076       rtpbin->buffer_mode = g_value_get_enum (value);
2077       GST_RTP_BIN_UNLOCK (rtpbin);
2078       /* propagate the property down to the jitterbuffer */
2079       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "mode", value);
2080       break;
2081     case PROP_DO_RETRANSMISSION:
2082       GST_RTP_BIN_LOCK (rtpbin);
2083       rtpbin->do_retransmission = g_value_get_boolean (value);
2084       GST_RTP_BIN_UNLOCK (rtpbin);
2085       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2086           "do-retransmission", value);
2087       break;
2088     default:
2089       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2090       break;
2091   }
2092 }
2093
2094 static void
2095 gst_rtp_bin_get_property (GObject * object, guint prop_id,
2096     GValue * value, GParamSpec * pspec)
2097 {
2098   GstRtpBin *rtpbin;
2099
2100   rtpbin = GST_RTP_BIN (object);
2101
2102   switch (prop_id) {
2103     case PROP_LATENCY:
2104       GST_RTP_BIN_LOCK (rtpbin);
2105       g_value_set_uint (value, rtpbin->latency_ms);
2106       GST_RTP_BIN_UNLOCK (rtpbin);
2107       break;
2108     case PROP_DROP_ON_LATENCY:
2109       GST_RTP_BIN_LOCK (rtpbin);
2110       g_value_set_boolean (value, rtpbin->drop_on_latency);
2111       GST_RTP_BIN_UNLOCK (rtpbin);
2112       break;
2113     case PROP_SDES:
2114       g_value_take_boxed (value, gst_rtp_bin_get_sdes_struct (rtpbin));
2115       break;
2116     case PROP_DO_LOST:
2117       GST_RTP_BIN_LOCK (rtpbin);
2118       g_value_set_boolean (value, rtpbin->do_lost);
2119       GST_RTP_BIN_UNLOCK (rtpbin);
2120       break;
2121     case PROP_IGNORE_PT:
2122       g_value_set_boolean (value, rtpbin->ignore_pt);
2123       break;
2124     case PROP_NTP_SYNC:
2125       g_value_set_boolean (value, rtpbin->ntp_sync);
2126       break;
2127     case PROP_RTCP_SYNC:
2128       g_value_set_enum (value, g_atomic_int_get (&rtpbin->rtcp_sync));
2129       break;
2130     case PROP_RTCP_SYNC_INTERVAL:
2131       g_value_set_uint (value, rtpbin->rtcp_sync_interval);
2132       break;
2133     case PROP_AUTOREMOVE:
2134       g_value_set_boolean (value, rtpbin->priv->autoremove);
2135       break;
2136     case PROP_BUFFER_MODE:
2137       g_value_set_enum (value, rtpbin->buffer_mode);
2138       break;
2139     case PROP_USE_PIPELINE_CLOCK:
2140       g_value_set_boolean (value, rtpbin->use_pipeline_clock);
2141       break;
2142     case PROP_DO_SYNC_EVENT:
2143       g_value_set_boolean (value, rtpbin->send_sync_event);
2144       break;
2145     case PROP_DO_RETRANSMISSION:
2146       GST_RTP_BIN_LOCK (rtpbin);
2147       g_value_set_boolean (value, rtpbin->do_retransmission);
2148       GST_RTP_BIN_UNLOCK (rtpbin);
2149       break;
2150     default:
2151       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2152       break;
2153   }
2154 }
2155
2156 static void
2157 gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
2158 {
2159   GstRtpBin *rtpbin;
2160
2161   rtpbin = GST_RTP_BIN (bin);
2162
2163   switch (GST_MESSAGE_TYPE (message)) {
2164     case GST_MESSAGE_ELEMENT:
2165     {
2166       const GstStructure *s = gst_message_get_structure (message);
2167
2168       /* we change the structure name and add the session ID to it */
2169       if (gst_structure_has_name (s, "application/x-rtp-source-sdes")) {
2170         GstRtpBinSession *sess;
2171
2172         /* find the session we set it as object data */
2173         sess = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2174             "GstRTPBin.session");
2175
2176         if (G_LIKELY (sess)) {
2177           message = gst_message_make_writable (message);
2178           s = gst_message_get_structure (message);
2179           gst_structure_set ((GstStructure *) s, "session", G_TYPE_UINT,
2180               sess->id, NULL);
2181         }
2182       }
2183       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2184       break;
2185     }
2186     case GST_MESSAGE_BUFFERING:
2187     {
2188       gint percent;
2189       gint min_percent = 100;
2190       GSList *sessions, *streams;
2191       GstRtpBinStream *stream;
2192       gboolean change = FALSE, active = FALSE;
2193       GstClockTime min_out_time;
2194       GstBufferingMode mode;
2195       gint avg_in, avg_out;
2196       gint64 buffering_left;
2197
2198       gst_message_parse_buffering (message, &percent);
2199       gst_message_parse_buffering_stats (message, &mode, &avg_in, &avg_out,
2200           &buffering_left);
2201
2202       stream =
2203           g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2204           "GstRTPBin.stream");
2205
2206       GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
2207
2208       /* get the stream */
2209       if (G_LIKELY (stream)) {
2210         GST_RTP_BIN_LOCK (rtpbin);
2211         /* fill in the percent */
2212         stream->percent = percent;
2213
2214         /* calculate the min value for all streams */
2215         for (sessions = rtpbin->sessions; sessions;
2216             sessions = g_slist_next (sessions)) {
2217           GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2218
2219           GST_RTP_SESSION_LOCK (session);
2220           if (session->streams) {
2221             for (streams = session->streams; streams;
2222                 streams = g_slist_next (streams)) {
2223               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2224
2225               GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
2226                   stream->percent);
2227
2228               /* find min percent */
2229               if (min_percent > stream->percent)
2230                 min_percent = stream->percent;
2231             }
2232           } else {
2233             GST_INFO_OBJECT (bin,
2234                 "session has no streams, setting min_percent to 0");
2235             min_percent = 0;
2236           }
2237           GST_RTP_SESSION_UNLOCK (session);
2238         }
2239         GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
2240
2241         if (rtpbin->buffering) {
2242           if (min_percent == 100) {
2243             rtpbin->buffering = FALSE;
2244             active = TRUE;
2245             change = TRUE;
2246           }
2247         } else {
2248           if (min_percent < 100) {
2249             /* pause the streams */
2250             rtpbin->buffering = TRUE;
2251             active = FALSE;
2252             change = TRUE;
2253           }
2254         }
2255         GST_RTP_BIN_UNLOCK (rtpbin);
2256
2257         gst_message_unref (message);
2258
2259         /* make a new buffering message with the min value */
2260         message =
2261             gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
2262         gst_message_set_buffering_stats (message, mode, avg_in, avg_out,
2263             buffering_left);
2264
2265         if (G_UNLIKELY (change)) {
2266           GstClock *clock;
2267           guint64 running_time = 0;
2268           guint64 offset = 0;
2269
2270           /* figure out the running time when we have a clock */
2271           if (G_LIKELY ((clock =
2272                       gst_element_get_clock (GST_ELEMENT_CAST (bin))))) {
2273             guint64 now, base_time;
2274
2275             now = gst_clock_get_time (clock);
2276             base_time = gst_element_get_base_time (GST_ELEMENT_CAST (bin));
2277             running_time = now - base_time;
2278             gst_object_unref (clock);
2279           }
2280           GST_DEBUG_OBJECT (bin,
2281               "running time now %" GST_TIME_FORMAT,
2282               GST_TIME_ARGS (running_time));
2283
2284           GST_RTP_BIN_LOCK (rtpbin);
2285
2286           /* when we reactivate, calculate the offsets so that all streams have
2287            * an output time that is at least as big as the running_time */
2288           offset = 0;
2289           if (active) {
2290             if (running_time > rtpbin->buffer_start) {
2291               offset = running_time - rtpbin->buffer_start;
2292               if (offset >= rtpbin->latency_ns)
2293                 offset -= rtpbin->latency_ns;
2294               else
2295                 offset = 0;
2296             }
2297           }
2298
2299           /* pause all streams */
2300           min_out_time = -1;
2301           for (sessions = rtpbin->sessions; sessions;
2302               sessions = g_slist_next (sessions)) {
2303             GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2304
2305             GST_RTP_SESSION_LOCK (session);
2306             for (streams = session->streams; streams;
2307                 streams = g_slist_next (streams)) {
2308               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2309               GstElement *element = stream->buffer;
2310               guint64 last_out;
2311
2312               g_signal_emit_by_name (element, "set-active", active, offset,
2313                   &last_out);
2314
2315               if (!active) {
2316                 g_object_get (element, "percent", &stream->percent, NULL);
2317
2318                 if (last_out == -1)
2319                   last_out = 0;
2320                 if (min_out_time == -1 || last_out < min_out_time)
2321                   min_out_time = last_out;
2322               }
2323
2324               GST_DEBUG_OBJECT (bin,
2325                   "setting %p to %d, offset %" GST_TIME_FORMAT ", last %"
2326                   GST_TIME_FORMAT ", percent %d", element, active,
2327                   GST_TIME_ARGS (offset), GST_TIME_ARGS (last_out),
2328                   stream->percent);
2329             }
2330             GST_RTP_SESSION_UNLOCK (session);
2331           }
2332           GST_DEBUG_OBJECT (bin,
2333               "min out time %" GST_TIME_FORMAT, GST_TIME_ARGS (min_out_time));
2334
2335           /* the buffer_start is the min out time of all paused jitterbuffers */
2336           if (!active)
2337             rtpbin->buffer_start = min_out_time;
2338
2339           GST_RTP_BIN_UNLOCK (rtpbin);
2340         }
2341       }
2342       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2343       break;
2344     }
2345     default:
2346     {
2347       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2348       break;
2349     }
2350   }
2351 }
2352
2353 static GstStateChangeReturn
2354 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
2355 {
2356   GstStateChangeReturn res;
2357   GstRtpBin *rtpbin;
2358   GstRtpBinPrivate *priv;
2359
2360   rtpbin = GST_RTP_BIN (element);
2361   priv = rtpbin->priv;
2362
2363   switch (transition) {
2364     case GST_STATE_CHANGE_NULL_TO_READY:
2365       break;
2366     case GST_STATE_CHANGE_READY_TO_PAUSED:
2367       priv->last_unix = 0;
2368       GST_LOG_OBJECT (rtpbin, "clearing shutdown flag");
2369       g_atomic_int_set (&priv->shutdown, 0);
2370       break;
2371     case GST_STATE_CHANGE_PAUSED_TO_READY:
2372       GST_LOG_OBJECT (rtpbin, "setting shutdown flag");
2373       g_atomic_int_set (&priv->shutdown, 1);
2374       /* wait for all callbacks to end by taking the lock. No new callbacks will
2375        * be able to happen as we set the shutdown flag. */
2376       GST_RTP_BIN_DYN_LOCK (rtpbin);
2377       GST_LOG_OBJECT (rtpbin, "dynamic lock taken, we can continue shutdown");
2378       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
2379       break;
2380     default:
2381       break;
2382   }
2383
2384   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2385
2386   switch (transition) {
2387     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2388       break;
2389     case GST_STATE_CHANGE_PAUSED_TO_READY:
2390       break;
2391     case GST_STATE_CHANGE_READY_TO_NULL:
2392       break;
2393     default:
2394       break;
2395   }
2396   return res;
2397 }
2398
2399 /* a new pad (SSRC) was created in @session. This signal is emited from the
2400  * payload demuxer. */
2401 static void
2402 new_payload_found (GstElement * element, guint pt, GstPad * pad,
2403     GstRtpBinStream * stream)
2404 {
2405   GstRtpBin *rtpbin;
2406   GstElementClass *klass;
2407   GstPadTemplate *templ;
2408   gchar *padname;
2409   GstPad *gpad;
2410
2411   rtpbin = stream->bin;
2412
2413   GST_DEBUG ("new payload pad %d", pt);
2414
2415   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
2416
2417   /* ghost the pad to the parent */
2418   klass = GST_ELEMENT_GET_CLASS (rtpbin);
2419   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
2420   padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
2421       stream->session->id, stream->ssrc, pt);
2422   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
2423   g_free (padname);
2424   g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", gpad);
2425
2426   gst_pad_set_active (gpad, TRUE);
2427   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2428
2429   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2430
2431   return;
2432
2433 shutdown:
2434   {
2435     GST_DEBUG ("ignoring, we are shutting down");
2436     return;
2437   }
2438 }
2439
2440 static void
2441 payload_pad_removed (GstElement * element, GstPad * pad,
2442     GstRtpBinStream * stream)
2443 {
2444   GstRtpBin *rtpbin;
2445   GstPad *gpad;
2446
2447   rtpbin = stream->bin;
2448
2449   GST_DEBUG ("payload pad removed");
2450
2451   GST_RTP_BIN_DYN_LOCK (rtpbin);
2452   if ((gpad = g_object_get_data (G_OBJECT (pad), "GstRTPBin.ghostpad"))) {
2453     g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", NULL);
2454
2455     gst_pad_set_active (gpad, FALSE);
2456     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2457   }
2458   GST_RTP_BIN_DYN_UNLOCK (rtpbin);
2459 }
2460
2461 static GstCaps *
2462 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
2463 {
2464   GstRtpBin *rtpbin;
2465   GstCaps *caps;
2466
2467   rtpbin = session->bin;
2468
2469   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt,
2470       session->id);
2471
2472   caps = get_pt_map (session, pt);
2473   if (!caps)
2474     goto no_caps;
2475
2476   return caps;
2477
2478   /* ERRORS */
2479 no_caps:
2480   {
2481     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
2482     return NULL;
2483   }
2484 }
2485
2486 static void
2487 payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session)
2488 {
2489   GST_DEBUG_OBJECT (session->bin,
2490       "emiting signal for pt type changed to %d in session %d", pt,
2491       session->id);
2492
2493   g_signal_emit (session->bin, gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE],
2494       0, session->id, pt);
2495 }
2496
2497 /* emited when caps changed for the session */
2498 static void
2499 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
2500 {
2501   GstRtpBin *bin;
2502   GstCaps *caps;
2503   gint payload;
2504   const GstStructure *s;
2505
2506   bin = session->bin;
2507
2508   g_object_get (pad, "caps", &caps, NULL);
2509
2510   if (caps == NULL)
2511     return;
2512
2513   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
2514
2515   s = gst_caps_get_structure (caps, 0);
2516
2517   /* get payload, finish when it's not there */
2518   if (!gst_structure_get_int (s, "payload", &payload))
2519     return;
2520
2521   GST_RTP_SESSION_LOCK (session);
2522   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
2523   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
2524   GST_RTP_SESSION_UNLOCK (session);
2525 }
2526
2527 /* a new pad (SSRC) was created in @session */
2528 static void
2529 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
2530     GstRtpBinSession * session)
2531 {
2532   GstRtpBin *rtpbin;
2533   GstRtpBinStream *stream;
2534   GstPad *sinkpad, *srcpad;
2535   gchar *padname;
2536
2537   rtpbin = session->bin;
2538
2539   GST_DEBUG_OBJECT (rtpbin, "new SSRC pad %08x, %s:%s", ssrc,
2540       GST_DEBUG_PAD_NAME (pad));
2541
2542   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
2543
2544   GST_RTP_SESSION_LOCK (session);
2545
2546   /* create new stream */
2547   stream = create_stream (session, ssrc);
2548   if (!stream)
2549     goto no_stream;
2550
2551   /* get pad and link */
2552   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP");
2553   padname = g_strdup_printf ("src_%u", ssrc);
2554   srcpad = gst_element_get_static_pad (element, padname);
2555   g_free (padname);
2556   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
2557   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
2558   gst_object_unref (sinkpad);
2559   gst_object_unref (srcpad);
2560
2561   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
2562   padname = g_strdup_printf ("rtcp_src_%u", ssrc);
2563   srcpad = gst_element_get_static_pad (element, padname);
2564   g_free (padname);
2565   sinkpad = gst_element_get_request_pad (stream->buffer, "sink_rtcp");
2566   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
2567   gst_object_unref (sinkpad);
2568   gst_object_unref (srcpad);
2569
2570   /* connect to the RTCP sync signal from the jitterbuffer */
2571   GST_DEBUG_OBJECT (rtpbin, "connecting sync signal");
2572   stream->buffer_handlesync_sig = g_signal_connect (stream->buffer,
2573       "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream);
2574
2575   if (stream->demux) {
2576     /* connect to the new-pad signal of the payload demuxer, this will expose the
2577      * new pad by ghosting it. */
2578     stream->demux_newpad_sig = g_signal_connect (stream->demux,
2579         "new-payload-type", (GCallback) new_payload_found, stream);
2580     stream->demux_padremoved_sig = g_signal_connect (stream->demux,
2581         "pad-removed", (GCallback) payload_pad_removed, stream);
2582
2583     /* connect to the request-pt-map signal. This signal will be emited by the
2584      * demuxer so that it can apply a proper caps on the buffers for the
2585      * depayloaders. */
2586     stream->demux_ptreq_sig = g_signal_connect (stream->demux,
2587         "request-pt-map", (GCallback) pt_map_requested, session);
2588     /* connect to the  signal so it can be forwarded. */
2589     stream->demux_ptchange_sig = g_signal_connect (stream->demux,
2590         "payload-type-change", (GCallback) payload_type_change, session);
2591   } else {
2592     /* add rtpjitterbuffer src pad to pads */
2593     GstElementClass *klass;
2594     GstPadTemplate *templ;
2595     gchar *padname;
2596     GstPad *gpad, *pad;
2597
2598     pad = gst_element_get_static_pad (stream->buffer, "src");
2599
2600     /* ghost the pad to the parent */
2601     klass = GST_ELEMENT_GET_CLASS (rtpbin);
2602     templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
2603     padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
2604         stream->session->id, stream->ssrc, 255);
2605     gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
2606     g_free (padname);
2607
2608     gst_pad_set_active (gpad, TRUE);
2609     gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2610
2611     gst_object_unref (pad);
2612   }
2613
2614   GST_RTP_SESSION_UNLOCK (session);
2615   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2616
2617   return;
2618
2619   /* ERRORS */
2620 shutdown:
2621   {
2622     GST_DEBUG_OBJECT (rtpbin, "we are shutting down");
2623     return;
2624   }
2625 no_stream:
2626   {
2627     GST_RTP_SESSION_UNLOCK (session);
2628     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2629     GST_DEBUG_OBJECT (rtpbin, "could not create stream");
2630     return;
2631   }
2632 }
2633
2634 /* Create a pad for receiving RTP for the session in @name. Must be called with
2635  * RTP_BIN_LOCK.
2636  */
2637 static GstPad *
2638 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2639 {
2640   GstPad *sinkdpad;
2641   guint sessid;
2642   GstRtpBinSession *session;
2643
2644   /* first get the session number */
2645   if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1)
2646     goto no_name;
2647
2648   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
2649
2650   /* get or create session */
2651   session = find_session_by_id (rtpbin, sessid);
2652   if (!session) {
2653     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
2654     /* create session now */
2655     session = create_session (rtpbin, sessid);
2656     if (session == NULL)
2657       goto create_error;
2658   }
2659
2660   /* check if pad was requested */
2661   if (session->recv_rtp_sink_ghost != NULL)
2662     return session->recv_rtp_sink_ghost;
2663
2664   GST_DEBUG_OBJECT (rtpbin, "getting RTP sink pad");
2665   /* get recv_rtp pad and store */
2666   session->recv_rtp_sink =
2667       gst_element_get_request_pad (session->session, "recv_rtp_sink");
2668   if (session->recv_rtp_sink == NULL)
2669     goto pad_failed;
2670
2671   g_signal_connect (session->recv_rtp_sink, "notify::caps",
2672       (GCallback) caps_changed, session);
2673
2674   GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad");
2675   /* get srcpad, link to SSRCDemux */
2676   session->recv_rtp_src =
2677       gst_element_get_static_pad (session->session, "recv_rtp_src");
2678   if (session->recv_rtp_src == NULL)
2679     goto pad_failed;
2680
2681   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
2682   sinkdpad = gst_element_get_static_pad (session->demux, "sink");
2683   GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
2684   gst_pad_link_full (session->recv_rtp_src, sinkdpad,
2685       GST_PAD_LINK_CHECK_NOTHING);
2686   gst_object_unref (sinkdpad);
2687
2688   /* connect to the new-ssrc-pad signal of the SSRC demuxer */
2689   session->demux_newpad_sig = g_signal_connect (session->demux,
2690       "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
2691   session->demux_padremoved_sig = g_signal_connect (session->demux,
2692       "removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session);
2693
2694   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
2695   session->recv_rtp_sink_ghost =
2696       gst_ghost_pad_new_from_template (name, session->recv_rtp_sink, templ);
2697   gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE);
2698   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost);
2699
2700   return session->recv_rtp_sink_ghost;
2701
2702   /* ERRORS */
2703 no_name:
2704   {
2705     g_warning ("rtpbin: invalid name given");
2706     return NULL;
2707   }
2708 create_error:
2709   {
2710     /* create_session already warned */
2711     return NULL;
2712   }
2713 pad_failed:
2714   {
2715     g_warning ("rtpbin: failed to get session pad");
2716     return NULL;
2717   }
2718 }
2719
2720 static void
2721 remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
2722 {
2723   if (session->demux_newpad_sig) {
2724     g_signal_handler_disconnect (session->demux, session->demux_newpad_sig);
2725     session->demux_newpad_sig = 0;
2726   }
2727   if (session->demux_padremoved_sig) {
2728     g_signal_handler_disconnect (session->demux, session->demux_padremoved_sig);
2729     session->demux_padremoved_sig = 0;
2730   }
2731   if (session->recv_rtp_src) {
2732     gst_object_unref (session->recv_rtp_src);
2733     session->recv_rtp_src = NULL;
2734   }
2735   if (session->recv_rtp_sink) {
2736     gst_element_release_request_pad (session->session, session->recv_rtp_sink);
2737     gst_object_unref (session->recv_rtp_sink);
2738     session->recv_rtp_sink = NULL;
2739   }
2740   if (session->recv_rtp_sink_ghost) {
2741     gst_pad_set_active (session->recv_rtp_sink_ghost, FALSE);
2742     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2743         session->recv_rtp_sink_ghost);
2744     session->recv_rtp_sink_ghost = NULL;
2745   }
2746 }
2747
2748 /* Create a pad for receiving RTCP for the session in @name. Must be called with
2749  * RTP_BIN_LOCK.
2750  */
2751 static GstPad *
2752 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
2753     const gchar * name)
2754 {
2755   guint sessid;
2756   GstRtpBinSession *session;
2757   GstPad *sinkdpad;
2758
2759   /* first get the session number */
2760   if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1)
2761     goto no_name;
2762
2763   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
2764
2765   /* get or create the session */
2766   session = find_session_by_id (rtpbin, sessid);
2767   if (!session) {
2768     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
2769     /* create session now */
2770     session = create_session (rtpbin, sessid);
2771     if (session == NULL)
2772       goto create_error;
2773   }
2774
2775   /* check if pad was requested */
2776   if (session->recv_rtcp_sink_ghost != NULL)
2777     return session->recv_rtcp_sink_ghost;
2778
2779   /* get recv_rtp pad and store */
2780   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
2781   session->recv_rtcp_sink =
2782       gst_element_get_request_pad (session->session, "recv_rtcp_sink");
2783   if (session->recv_rtcp_sink == NULL)
2784     goto pad_failed;
2785
2786   /* get srcpad, link to SSRCDemux */
2787   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
2788   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
2789   if (session->sync_src == NULL)
2790     goto pad_failed;
2791
2792   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
2793   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
2794   gst_pad_link_full (session->sync_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
2795   gst_object_unref (sinkdpad);
2796
2797   session->recv_rtcp_sink_ghost =
2798       gst_ghost_pad_new_from_template (name, session->recv_rtcp_sink, templ);
2799   gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE);
2800   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin),
2801       session->recv_rtcp_sink_ghost);
2802
2803   return session->recv_rtcp_sink_ghost;
2804
2805   /* ERRORS */
2806 no_name:
2807   {
2808     g_warning ("rtpbin: invalid name given");
2809     return NULL;
2810   }
2811 create_error:
2812   {
2813     /* create_session already warned */
2814     return NULL;
2815   }
2816 pad_failed:
2817   {
2818     g_warning ("rtpbin: failed to get session pad");
2819     return NULL;
2820   }
2821 }
2822
2823 static void
2824 remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
2825 {
2826   if (session->recv_rtcp_sink_ghost) {
2827     gst_pad_set_active (session->recv_rtcp_sink_ghost, FALSE);
2828     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2829         session->recv_rtcp_sink_ghost);
2830     session->recv_rtcp_sink_ghost = NULL;
2831   }
2832   if (session->sync_src) {
2833     /* releasing the request pad should also unref the sync pad */
2834     gst_object_unref (session->sync_src);
2835     session->sync_src = NULL;
2836   }
2837   if (session->recv_rtcp_sink) {
2838     gst_element_release_request_pad (session->session, session->recv_rtcp_sink);
2839     gst_object_unref (session->recv_rtcp_sink);
2840     session->recv_rtcp_sink = NULL;
2841   }
2842 }
2843
2844 /* Create a pad for sending RTP for the session in @name. Must be called with
2845  * RTP_BIN_LOCK.
2846  */
2847 static GstPad *
2848 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2849 {
2850   gchar *gname;
2851   guint sessid;
2852   GstRtpBinSession *session;
2853   GstElementClass *klass;
2854
2855   /* first get the session number */
2856   if (name == NULL || sscanf (name, "send_rtp_sink_%u", &sessid) != 1)
2857     goto no_name;
2858
2859   /* get or create session */
2860   session = find_session_by_id (rtpbin, sessid);
2861   if (!session) {
2862     /* create session now */
2863     session = create_session (rtpbin, sessid);
2864     if (session == NULL)
2865       goto create_error;
2866   }
2867
2868   /* check if pad was requested */
2869   if (session->send_rtp_sink_ghost != NULL)
2870     return session->send_rtp_sink_ghost;
2871
2872   /* get send_rtp pad and store */
2873   session->send_rtp_sink =
2874       gst_element_get_request_pad (session->session, "send_rtp_sink");
2875   if (session->send_rtp_sink == NULL)
2876     goto pad_failed;
2877
2878   session->send_rtp_sink_ghost =
2879       gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ);
2880   gst_pad_set_active (session->send_rtp_sink_ghost, TRUE);
2881   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_sink_ghost);
2882
2883   /* get srcpad */
2884   session->send_rtp_src =
2885       gst_element_get_static_pad (session->session, "send_rtp_src");
2886   if (session->send_rtp_src == NULL)
2887     goto no_srcpad;
2888
2889   /* ghost the new source pad */
2890   klass = GST_ELEMENT_GET_CLASS (rtpbin);
2891   gname = g_strdup_printf ("send_rtp_src_%u", sessid);
2892   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u");
2893   session->send_rtp_src_ghost =
2894       gst_ghost_pad_new_from_template (gname, session->send_rtp_src, templ);
2895   gst_pad_set_active (session->send_rtp_src_ghost, TRUE);
2896   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost);
2897   g_free (gname);
2898
2899   return session->send_rtp_sink_ghost;
2900
2901   /* ERRORS */
2902 no_name:
2903   {
2904     g_warning ("rtpbin: invalid name given");
2905     return NULL;
2906   }
2907 create_error:
2908   {
2909     /* create_session already warned */
2910     return NULL;
2911   }
2912 pad_failed:
2913   {
2914     g_warning ("rtpbin: failed to get session pad for session %d", sessid);
2915     return NULL;
2916   }
2917 no_srcpad:
2918   {
2919     g_warning ("rtpbin: failed to get rtp source pad for session %d", sessid);
2920     return NULL;
2921   }
2922 }
2923
2924 static void
2925 remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
2926 {
2927   if (session->send_rtp_src_ghost) {
2928     gst_pad_set_active (session->send_rtp_src_ghost, FALSE);
2929     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2930         session->send_rtp_src_ghost);
2931     session->send_rtp_src_ghost = NULL;
2932   }
2933   if (session->send_rtp_src) {
2934     gst_object_unref (session->send_rtp_src);
2935     session->send_rtp_src = NULL;
2936   }
2937   if (session->send_rtp_sink) {
2938     gst_element_release_request_pad (GST_ELEMENT_CAST (session->session),
2939         session->send_rtp_sink);
2940     gst_object_unref (session->send_rtp_sink);
2941     session->send_rtp_sink = NULL;
2942   }
2943   if (session->send_rtp_sink_ghost) {
2944     gst_pad_set_active (session->send_rtp_sink_ghost, FALSE);
2945     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2946         session->send_rtp_sink_ghost);
2947     session->send_rtp_sink_ghost = NULL;
2948   }
2949 }
2950
2951 /* Create a pad for sending RTCP for the session in @name. Must be called with
2952  * RTP_BIN_LOCK.
2953  */
2954 static GstPad *
2955 create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2956 {
2957   guint sessid;
2958   GstRtpBinSession *session;
2959
2960   /* first get the session number */
2961   if (name == NULL || sscanf (name, "send_rtcp_src_%u", &sessid) != 1)
2962     goto no_name;
2963
2964   /* get or create session */
2965   session = find_session_by_id (rtpbin, sessid);
2966   if (!session)
2967     goto no_session;
2968
2969   /* check if pad was requested */
2970   if (session->send_rtcp_src_ghost != NULL)
2971     return session->send_rtcp_src_ghost;
2972
2973   /* get rtcp_src pad and store */
2974   session->send_rtcp_src =
2975       gst_element_get_request_pad (session->session, "send_rtcp_src");
2976   if (session->send_rtcp_src == NULL)
2977     goto pad_failed;
2978
2979   session->send_rtcp_src_ghost =
2980       gst_ghost_pad_new_from_template (name, session->send_rtcp_src, templ);
2981   gst_pad_set_active (session->send_rtcp_src_ghost, TRUE);
2982   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtcp_src_ghost);
2983
2984   return session->send_rtcp_src_ghost;
2985
2986   /* ERRORS */
2987 no_name:
2988   {
2989     g_warning ("rtpbin: invalid name given");
2990     return NULL;
2991   }
2992 no_session:
2993   {
2994     g_warning ("rtpbin: session with id %d does not exist", sessid);
2995     return NULL;
2996   }
2997 pad_failed:
2998   {
2999     g_warning ("rtpbin: failed to get rtcp pad for session %d", sessid);
3000     return NULL;
3001   }
3002 }
3003
3004 static void
3005 remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3006 {
3007   if (session->send_rtcp_src_ghost) {
3008     gst_pad_set_active (session->send_rtcp_src_ghost, FALSE);
3009     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3010         session->send_rtcp_src_ghost);
3011     session->send_rtcp_src_ghost = NULL;
3012   }
3013   if (session->send_rtcp_src) {
3014     gst_element_release_request_pad (session->session, session->send_rtcp_src);
3015     gst_object_unref (session->send_rtcp_src);
3016     session->send_rtcp_src = NULL;
3017   }
3018 }
3019
3020 /* If the requested name is NULL we should create a name with
3021  * the session number assuming we want the lowest posible session
3022  * with a free pad like the template */
3023 static gchar *
3024 gst_rtp_bin_get_free_pad_name (GstElement * element, GstPadTemplate * templ)
3025 {
3026   gboolean name_found = FALSE;
3027   gint session = 0;
3028   GstIterator *pad_it = NULL;
3029   gchar *pad_name = NULL;
3030   GValue data = { 0, };
3031
3032   GST_DEBUG_OBJECT (element, "find a free pad name for template");
3033   while (!name_found) {
3034     gboolean done = FALSE;
3035
3036     g_free (pad_name);
3037     pad_name = g_strdup_printf (templ->name_template, session++);
3038     pad_it = gst_element_iterate_pads (GST_ELEMENT (element));
3039     name_found = TRUE;
3040     while (!done) {
3041       switch (gst_iterator_next (pad_it, &data)) {
3042         case GST_ITERATOR_OK:
3043         {
3044           GstPad *pad;
3045           gchar *name;
3046
3047           pad = g_value_get_object (&data);
3048           name = gst_pad_get_name (pad);
3049
3050           if (strcmp (name, pad_name) == 0) {
3051             done = TRUE;
3052             name_found = FALSE;
3053           }
3054           g_free (name);
3055           g_value_reset (&data);
3056           break;
3057         }
3058         case GST_ITERATOR_ERROR:
3059         case GST_ITERATOR_RESYNC:
3060           /* restart iteration */
3061           done = TRUE;
3062           name_found = FALSE;
3063           session = 0;
3064           break;
3065         case GST_ITERATOR_DONE:
3066           done = TRUE;
3067           break;
3068       }
3069     }
3070     g_value_unset (&data);
3071     gst_iterator_free (pad_it);
3072   }
3073
3074   GST_DEBUG_OBJECT (element, "free pad name found: '%s'", pad_name);
3075   return pad_name;
3076 }
3077
3078 /*
3079  */
3080 static GstPad *
3081 gst_rtp_bin_request_new_pad (GstElement * element,
3082     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3083 {
3084   GstRtpBin *rtpbin;
3085   GstElementClass *klass;
3086   GstPad *result;
3087
3088   gchar *pad_name = NULL;
3089
3090   g_return_val_if_fail (templ != NULL, NULL);
3091   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
3092
3093   rtpbin = GST_RTP_BIN (element);
3094   klass = GST_ELEMENT_GET_CLASS (element);
3095
3096   GST_RTP_BIN_LOCK (rtpbin);
3097
3098   if (name == NULL) {
3099     /* use a free pad name */
3100     pad_name = gst_rtp_bin_get_free_pad_name (element, templ);
3101   } else {
3102     /* use the provided name */
3103     pad_name = g_strdup (name);
3104   }
3105
3106   GST_DEBUG_OBJECT (rtpbin, "Trying to request a pad with name %s", pad_name);
3107
3108   /* figure out the template */
3109   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
3110     result = create_recv_rtp (rtpbin, templ, pad_name);
3111   } else if (templ == gst_element_class_get_pad_template (klass,
3112           "recv_rtcp_sink_%u")) {
3113     result = create_recv_rtcp (rtpbin, templ, pad_name);
3114   } else if (templ == gst_element_class_get_pad_template (klass,
3115           "send_rtp_sink_%u")) {
3116     result = create_send_rtp (rtpbin, templ, pad_name);
3117   } else if (templ == gst_element_class_get_pad_template (klass,
3118           "send_rtcp_src_%u")) {
3119     result = create_rtcp (rtpbin, templ, pad_name);
3120   } else
3121     goto wrong_template;
3122
3123   g_free (pad_name);
3124   GST_RTP_BIN_UNLOCK (rtpbin);
3125
3126   return result;
3127
3128   /* ERRORS */
3129 wrong_template:
3130   {
3131     g_free (pad_name);
3132     GST_RTP_BIN_UNLOCK (rtpbin);
3133     g_warning ("rtpbin: this is not our template");
3134     return NULL;
3135   }
3136 }
3137
3138 static void
3139 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
3140 {
3141   GstRtpBinSession *session;
3142   GstRtpBin *rtpbin;
3143
3144   g_return_if_fail (GST_IS_GHOST_PAD (pad));
3145   g_return_if_fail (GST_IS_RTP_BIN (element));
3146
3147   rtpbin = GST_RTP_BIN (element);
3148
3149   GST_RTP_BIN_LOCK (rtpbin);
3150   GST_DEBUG_OBJECT (rtpbin, "Trying to release pad %s:%s",
3151       GST_DEBUG_PAD_NAME (pad));
3152
3153   if (!(session = find_session_by_pad (rtpbin, pad)))
3154     goto unknown_pad;
3155
3156   if (session->recv_rtp_sink_ghost == pad) {
3157     remove_recv_rtp (rtpbin, session);
3158   } else if (session->recv_rtcp_sink_ghost == pad) {
3159     remove_recv_rtcp (rtpbin, session);
3160   } else if (session->send_rtp_sink_ghost == pad) {
3161     remove_send_rtp (rtpbin, session);
3162   } else if (session->send_rtcp_src_ghost == pad) {
3163     remove_rtcp (rtpbin, session);
3164   }
3165
3166   /* no more request pads, free the complete session */
3167   if (session->recv_rtp_sink_ghost == NULL
3168       && session->recv_rtcp_sink_ghost == NULL
3169       && session->send_rtp_sink_ghost == NULL
3170       && session->send_rtcp_src_ghost == NULL) {
3171     GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session);
3172     rtpbin->sessions = g_slist_remove (rtpbin->sessions, session);
3173     free_session (session, rtpbin);
3174   }
3175   GST_RTP_BIN_UNLOCK (rtpbin);
3176
3177   return;
3178
3179   /* ERROR */
3180 unknown_pad:
3181   {
3182     GST_RTP_BIN_UNLOCK (rtpbin);
3183     g_warning ("rtpbin: %s:%s is not one of our request pads",
3184         GST_DEBUG_PAD_NAME (pad));
3185     return;
3186   }
3187 }