4236cd160b1280e6c7b7d9cc54dc2102037688a3
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-rtpbin
22  * @see_also: rtpjitterbuffer, rtpsession, rtpptdemux, rtpssrcdemux
23  *
24  * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
25  * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
26  * RTP sessions that will be synchronized together using RTCP SR packets.
27  *
28  * #GstRtpBin is configured with a number of request pads that define the
29  * functionality that is activated, similar to the #GstRtpSession element.
30  *
31  * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_\%u pad. The session
32  * number must be specified in the pad name.
33  * Data received on the recv_rtp_sink_\%u pad will be processed in the #GstRtpSession
34  * manager and after being validated forwarded on #GstRtpSsrcDemux element. Each
35  * RTP stream is demuxed based on the SSRC and send to a #GstRtpJitterBuffer. After
36  * the packets are released from the jitterbuffer, they will be forwarded to a
37  * #GstRtpPtDemux element. The #GstRtpPtDemux element will demux the packets based
38  * on the payload type and will create a unique pad recv_rtp_src_\%u_\%u_\%u on
39  * rtpbin with the session number, SSRC and payload type respectively as the pad
40  * name.
41  *
42  * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_\%u pad. The
43  * session number must be specified in the pad name.
44  *
45  * If you want the session manager to generate and send RTCP packets, request
46  * the send_rtcp_src_\%u pad with the session number in the pad name. Packet pushed
47  * on this pad contain SR/RR RTCP reports that should be sent to all participants
48  * in the session.
49  *
50  * To use #GstRtpBin as a sender, request a send_rtp_sink_\%u pad, which will
51  * automatically create a send_rtp_src_\%u pad. If the session number is not provided,
52  * the pad from the lowest available session will be returned. The session manager will modify the
53  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
54  * send_rtp_src_\%u pad after updating its internal state.
55  *
56  * The session manager needs the clock-rate of the payload types it is handling
57  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
58  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
59  * signal.
60  *
61  * Access to the internal statistics of rtpbin is provided with the
62  * get-internal-session property. This action signal gives access to the
63  * RTPSession object which further provides action signals to retrieve the
64  * internal source and other sources.
65  *
66  * <refsect2>
67  * <title>Example pipelines</title>
68  * |[
69  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
70  *     rtpbin ! rtptheoradepay ! theoradec ! xvimagesink
71  * ]| Receive RTP data from port 5000 and send to the session 0 in rtpbin.
72  * |[
73  * gst-launch-1.0 rtpbin name=rtpbin \
74  *         v4l2src ! videoconvert ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
75  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
76  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
77  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
78  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
79  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
80  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
81  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
82  * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
83  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
84  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
85  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
86  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
87  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
88  * is received on port 5007. Since RTCP packets from the sender should be sent
89  * as soon as possible and do not participate in preroll, sync=false and
90  * async=false is configured on udpsink
91  * |[
92  * gst-launch-1.0 -v rtpbin name=rtpbin                                          \
93  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
94  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
95  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
96  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
97  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
98  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
99  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
100  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
101  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
102  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
103  * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
104  * decode and display the video.
105  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
106  * decode and play the audio.
107  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
108  * session 1 on port 5003. These packets will be used for session management and
109  * synchronisation.
110  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
111  * on port 5007.
112  * </refsect2>
113  *
114  * Last reviewed on 2007-08-30 (0.10.6)
115  */
116
117 #ifdef HAVE_CONFIG_H
118 #include "config.h"
119 #endif
120 #include <stdio.h>
121 #include <string.h>
122
123 #include <gst/rtp/gstrtpbuffer.h>
124 #include <gst/rtp/gstrtcpbuffer.h>
125
126 #include "gstrtpbin.h"
127 #include "rtpsession.h"
128 #include "gstrtpsession.h"
129 #include "gstrtpjitterbuffer.h"
130
131 #include <gst/glib-compat-private.h>
132
133 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
134 #define GST_CAT_DEFAULT gst_rtp_bin_debug
135
136 /* sink pads */
137 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
138 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
139     GST_PAD_SINK,
140     GST_PAD_REQUEST,
141     GST_STATIC_CAPS ("application/x-rtp")
142     );
143
144 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
145 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
146     GST_PAD_SINK,
147     GST_PAD_REQUEST,
148     GST_STATIC_CAPS ("application/x-rtcp")
149     );
150
151 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
152 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%u",
153     GST_PAD_SINK,
154     GST_PAD_REQUEST,
155     GST_STATIC_CAPS ("application/x-rtp")
156     );
157
158 /* src pads */
159 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
160 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
161     GST_PAD_SRC,
162     GST_PAD_SOMETIMES,
163     GST_STATIC_CAPS ("application/x-rtp")
164     );
165
166 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
167 GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%u",
168     GST_PAD_SRC,
169     GST_PAD_REQUEST,
170     GST_STATIC_CAPS ("application/x-rtcp")
171     );
172
173 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
174 GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%u",
175     GST_PAD_SRC,
176     GST_PAD_SOMETIMES,
177     GST_STATIC_CAPS ("application/x-rtp")
178     );
179
180 #define GST_RTP_BIN_GET_PRIVATE(obj)  \
181    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
182
183 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock (&(bin)->priv->bin_lock)
184 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock (&(bin)->priv->bin_lock)
185
186 /* lock to protect dynamic callbacks, like pad-added and new ssrc. */
187 #define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock (&(bin)->priv->dyn_lock)
188 #define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock (&(bin)->priv->dyn_lock)
189
190 /* lock for shutdown */
191 #define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
192 G_STMT_START {                                   \
193   if (g_atomic_int_get (&bin->priv->shutdown))   \
194     goto label;                                  \
195   GST_RTP_BIN_DYN_LOCK (bin);                    \
196   if (g_atomic_int_get (&bin->priv->shutdown)) { \
197     GST_RTP_BIN_DYN_UNLOCK (bin);                \
198     goto label;                                  \
199   }                                              \
200 } G_STMT_END
201
202 /* unlock for shutdown */
203 #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
204   GST_RTP_BIN_DYN_UNLOCK (bin);                  \
205
206 struct _GstRtpBinPrivate
207 {
208   GMutex bin_lock;
209
210   /* lock protecting dynamic adding/removing */
211   GMutex dyn_lock;
212
213   /* if we are shutting down or not */
214   gint shutdown;
215
216   gboolean autoremove;
217
218   /* UNIX (ntp) time of last SR sync used */
219   guint64 last_unix;
220 };
221
222 /* signals and args */
223 enum
224 {
225   SIGNAL_REQUEST_PT_MAP,
226   SIGNAL_PAYLOAD_TYPE_CHANGE,
227   SIGNAL_CLEAR_PT_MAP,
228   SIGNAL_RESET_SYNC,
229   SIGNAL_GET_INTERNAL_SESSION,
230
231   SIGNAL_ON_NEW_SSRC,
232   SIGNAL_ON_SSRC_COLLISION,
233   SIGNAL_ON_SSRC_VALIDATED,
234   SIGNAL_ON_SSRC_ACTIVE,
235   SIGNAL_ON_SSRC_SDES,
236   SIGNAL_ON_BYE_SSRC,
237   SIGNAL_ON_BYE_TIMEOUT,
238   SIGNAL_ON_TIMEOUT,
239   SIGNAL_ON_SENDER_TIMEOUT,
240   SIGNAL_ON_NPT_STOP,
241   LAST_SIGNAL
242 };
243
244 #define DEFAULT_LATENCY_MS           200
245 #define DEFAULT_DROP_ON_LATENCY      FALSE
246 #define DEFAULT_SDES                 NULL
247 #define DEFAULT_DO_LOST              FALSE
248 #define DEFAULT_IGNORE_PT            FALSE
249 #define DEFAULT_NTP_SYNC             FALSE
250 #define DEFAULT_AUTOREMOVE           FALSE
251 #define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
252 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
253 #define DEFAULT_RTCP_SYNC            GST_RTP_BIN_RTCP_SYNC_ALWAYS
254 #define DEFAULT_RTCP_SYNC_INTERVAL   0
255 #define DEFAULT_DO_SYNC_EVENT        FALSE
256 #define DEFAULT_DO_RETRANSMISSION    FALSE
257
258 enum
259 {
260   PROP_0,
261   PROP_LATENCY,
262   PROP_DROP_ON_LATENCY,
263   PROP_SDES,
264   PROP_DO_LOST,
265   PROP_IGNORE_PT,
266   PROP_NTP_SYNC,
267   PROP_RTCP_SYNC,
268   PROP_RTCP_SYNC_INTERVAL,
269   PROP_AUTOREMOVE,
270   PROP_BUFFER_MODE,
271   PROP_USE_PIPELINE_CLOCK,
272   PROP_DO_SYNC_EVENT,
273   PROP_DO_RETRANSMISSION,
274   PROP_LAST
275 };
276
277 enum
278 {
279   GST_RTP_BIN_RTCP_SYNC_ALWAYS,
280   GST_RTP_BIN_RTCP_SYNC_INITIAL,
281   GST_RTP_BIN_RTCP_SYNC_RTP
282 };
283
284 #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
285 static GType
286 gst_rtp_bin_rtcp_sync_get_type (void)
287 {
288   static GType rtcp_sync_type = 0;
289   static const GEnumValue rtcp_sync_types[] = {
290     {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
291     {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
292     {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
293     {0, NULL, NULL},
294   };
295
296   if (!rtcp_sync_type) {
297     rtcp_sync_type = g_enum_register_static ("GstRTCPSync", rtcp_sync_types);
298   }
299   return rtcp_sync_type;
300 }
301
302 /* helper objects */
303 typedef struct _GstRtpBinSession GstRtpBinSession;
304 typedef struct _GstRtpBinStream GstRtpBinStream;
305 typedef struct _GstRtpBinClient GstRtpBinClient;
306
307 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
308
309 static GstCaps *pt_map_requested (GstElement * element, guint pt,
310     GstRtpBinSession * session);
311 static void payload_type_change (GstElement * element, guint pt,
312     GstRtpBinSession * session);
313 static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
314 static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
315 static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
316 static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
317 static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
318 static void free_stream (GstRtpBinStream * stream, GstRtpBin * bin);
319
320 /* Manages the RTP stream for one SSRC.
321  *
322  * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer.
323  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
324  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
325  * together (see below).
326  */
327 struct _GstRtpBinStream
328 {
329   /* the SSRC of this stream */
330   guint32 ssrc;
331
332   /* parent bin */
333   GstRtpBin *bin;
334
335   /* the session this SSRC belongs to */
336   GstRtpBinSession *session;
337
338   /* the jitterbuffer of the SSRC */
339   GstElement *buffer;
340   gulong buffer_handlesync_sig;
341   gulong buffer_ptreq_sig;
342   gulong buffer_ntpstop_sig;
343   gint percent;
344
345   /* the PT demuxer of the SSRC */
346   GstElement *demux;
347   gulong demux_newpad_sig;
348   gulong demux_padremoved_sig;
349   gulong demux_ptreq_sig;
350   gulong demux_ptchange_sig;
351
352   /* if we have calculated a valid rt_delta for this stream */
353   gboolean have_sync;
354   /* mapping to local RTP and NTP time */
355   gint64 rt_delta;
356   gint64 rtp_delta;
357   /* base rtptime in gst time */
358   gint64 clock_base;
359 };
360
361 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->lock)
362 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->lock)
363
364 /* Manages the receiving end of the packets.
365  *
366  * There is one such structure for each RTP session (audio/video/...).
367  * We get the RTP/RTCP packets and stuff them into the session manager. From
368  * there they are pushed into an SSRC demuxer that splits the stream based on
369  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
370  * the GstRtpBinStream above).
371  */
372 struct _GstRtpBinSession
373 {
374   /* session id */
375   gint id;
376   /* the parent bin */
377   GstRtpBin *bin;
378   /* the session element */
379   GstElement *session;
380   /* the SSRC demuxer */
381   GstElement *demux;
382   gulong demux_newpad_sig;
383   gulong demux_padremoved_sig;
384
385   GMutex lock;
386
387   /* list of GstRtpBinStream */
388   GSList *streams;
389
390   /* mapping of payload type to caps */
391   GHashTable *ptmap;
392
393   /* the pads of the session */
394   GstPad *recv_rtp_sink;
395   GstPad *recv_rtp_sink_ghost;
396   GstPad *recv_rtp_src;
397   GstPad *recv_rtcp_sink;
398   GstPad *recv_rtcp_sink_ghost;
399   GstPad *sync_src;
400   GstPad *send_rtp_sink;
401   GstPad *send_rtp_sink_ghost;
402   GstPad *send_rtp_src;
403   GstPad *send_rtp_src_ghost;
404   GstPad *send_rtcp_src;
405   GstPad *send_rtcp_src_ghost;
406 };
407
408 /* Manages the RTP streams that come from one client and should therefore be
409  * synchronized.
410  */
411 struct _GstRtpBinClient
412 {
413   /* the common CNAME for the streams */
414   gchar *cname;
415   guint cname_len;
416
417   /* the streams */
418   guint nstreams;
419   GSList *streams;
420 };
421
422 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
423 static GstRtpBinSession *
424 find_session_by_id (GstRtpBin * rtpbin, gint id)
425 {
426   GSList *walk;
427
428   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
429     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
430
431     if (sess->id == id)
432       return sess;
433   }
434   return NULL;
435 }
436
437 /* find a session with the given request pad. Must be called with RTP_BIN_LOCK */
438 static GstRtpBinSession *
439 find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
440 {
441   GSList *walk;
442
443   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
444     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
445
446     if ((sess->recv_rtp_sink_ghost == pad) ||
447         (sess->recv_rtcp_sink_ghost == pad) ||
448         (sess->send_rtp_sink_ghost == pad)
449         || (sess->send_rtcp_src_ghost == pad))
450       return sess;
451   }
452   return NULL;
453 }
454
455 static void
456 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
457 {
458   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
459       sess->id, ssrc);
460 }
461
462 static void
463 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
464 {
465   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
466       sess->id, ssrc);
467 }
468
469 static void
470 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
471 {
472   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
473       sess->id, ssrc);
474 }
475
476 static void
477 on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
478 {
479   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
480       sess->id, ssrc);
481 }
482
483 static void
484 on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
485 {
486   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
487       sess->id, ssrc);
488 }
489
490 static void
491 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
492 {
493   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
494       sess->id, ssrc);
495 }
496
497 static void
498 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
499 {
500   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
501       sess->id, ssrc);
502
503   if (sess->bin->priv->autoremove)
504     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
505 }
506
507 static void
508 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
509 {
510   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
511       sess->id, ssrc);
512
513   if (sess->bin->priv->autoremove)
514     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
515 }
516
517 static void
518 on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
519 {
520   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
521       sess->id, ssrc);
522 }
523
524 static void
525 on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
526 {
527   g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0,
528       stream->session->id, stream->ssrc);
529 }
530
531 /* must be called with the SESSION lock */
532 static GstRtpBinStream *
533 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
534 {
535   GSList *walk;
536
537   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
538     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
539
540     if (stream->ssrc == ssrc)
541       return stream;
542   }
543   return NULL;
544 }
545
546 static void
547 ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad,
548     GstRtpBinSession * session)
549 {
550   GstRtpBinStream *stream = NULL;
551   GstRtpBin *rtpbin;
552
553   rtpbin = session->bin;
554
555   GST_RTP_BIN_LOCK (rtpbin);
556
557   GST_RTP_SESSION_LOCK (session);
558   if ((stream = find_stream_by_ssrc (session, ssrc)))
559     session->streams = g_slist_remove (session->streams, stream);
560   GST_RTP_SESSION_UNLOCK (session);
561
562   if (stream)
563     free_stream (stream, rtpbin);
564
565   GST_RTP_BIN_UNLOCK (rtpbin);
566 }
567
568 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
569 static GstRtpBinSession *
570 create_session (GstRtpBin * rtpbin, gint id)
571 {
572   GstRtpBinSession *sess;
573   GstElement *session, *demux;
574   GstState target;
575
576   if (!(session = gst_element_factory_make ("rtpsession", NULL)))
577     goto no_session;
578
579   if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
580     goto no_demux;
581
582   sess = g_new0 (GstRtpBinSession, 1);
583   g_mutex_init (&sess->lock);
584   sess->id = id;
585   sess->bin = rtpbin;
586   sess->session = session;
587   sess->demux = demux;
588   sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
589       (GDestroyNotify) gst_caps_unref);
590   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
591
592   /* configure SDES items */
593   GST_OBJECT_LOCK (rtpbin);
594   g_object_set (session, "sdes", rtpbin->sdes, "use-pipeline-clock",
595       rtpbin->use_pipeline_clock, NULL);
596   GST_OBJECT_UNLOCK (rtpbin);
597
598   /* provide clock_rate to the session manager when needed */
599   g_signal_connect (session, "request-pt-map",
600       (GCallback) pt_map_requested, sess);
601
602   g_signal_connect (sess->session, "on-new-ssrc",
603       (GCallback) on_new_ssrc, sess);
604   g_signal_connect (sess->session, "on-ssrc-collision",
605       (GCallback) on_ssrc_collision, sess);
606   g_signal_connect (sess->session, "on-ssrc-validated",
607       (GCallback) on_ssrc_validated, sess);
608   g_signal_connect (sess->session, "on-ssrc-active",
609       (GCallback) on_ssrc_active, sess);
610   g_signal_connect (sess->session, "on-ssrc-sdes",
611       (GCallback) on_ssrc_sdes, sess);
612   g_signal_connect (sess->session, "on-bye-ssrc",
613       (GCallback) on_bye_ssrc, sess);
614   g_signal_connect (sess->session, "on-bye-timeout",
615       (GCallback) on_bye_timeout, sess);
616   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
617   g_signal_connect (sess->session, "on-sender-timeout",
618       (GCallback) on_sender_timeout, sess);
619
620   gst_bin_add (GST_BIN_CAST (rtpbin), session);
621   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
622
623   GST_OBJECT_LOCK (rtpbin);
624   target = GST_STATE_TARGET (rtpbin);
625   GST_OBJECT_UNLOCK (rtpbin);
626
627   /* change state only to what's needed */
628   gst_element_set_state (demux, target);
629   gst_element_set_state (session, target);
630
631   return sess;
632
633   /* ERRORS */
634 no_session:
635   {
636     g_warning ("rtpbin: could not create gstrtpsession element");
637     return NULL;
638   }
639 no_demux:
640   {
641     gst_object_unref (session);
642     g_warning ("rtpbin: could not create gstrtpssrcdemux element");
643     return NULL;
644   }
645 }
646
647 /* called with RTP_BIN_LOCK */
648 static void
649 free_session (GstRtpBinSession * sess, GstRtpBin * bin)
650 {
651   GST_DEBUG_OBJECT (bin, "freeing session %p", sess);
652
653   gst_element_set_locked_state (sess->demux, TRUE);
654   gst_element_set_locked_state (sess->session, TRUE);
655
656   gst_element_set_state (sess->demux, GST_STATE_NULL);
657   gst_element_set_state (sess->session, GST_STATE_NULL);
658
659   remove_recv_rtp (bin, sess);
660   remove_recv_rtcp (bin, sess);
661   remove_send_rtp (bin, sess);
662   remove_rtcp (bin, sess);
663
664   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
665   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
666
667   g_slist_foreach (sess->streams, (GFunc) free_stream, bin);
668   g_slist_free (sess->streams);
669
670   g_mutex_clear (&sess->lock);
671   g_hash_table_destroy (sess->ptmap);
672
673   g_free (sess);
674 }
675
676 /* get the payload type caps for the specific payload @pt in @session */
677 static GstCaps *
678 get_pt_map (GstRtpBinSession * session, guint pt)
679 {
680   GstCaps *caps = NULL;
681   GstRtpBin *bin;
682   GValue ret = { 0 };
683   GValue args[3] = { {0}, {0}, {0} };
684
685   GST_DEBUG ("searching pt %d in cache", pt);
686
687   GST_RTP_SESSION_LOCK (session);
688
689   /* first look in the cache */
690   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
691   if (caps) {
692     gst_caps_ref (caps);
693     goto done;
694   }
695
696   bin = session->bin;
697
698   GST_DEBUG ("emiting signal for pt %d in session %d", pt, session->id);
699
700   /* not in cache, send signal to request caps */
701   g_value_init (&args[0], GST_TYPE_ELEMENT);
702   g_value_set_object (&args[0], bin);
703   g_value_init (&args[1], G_TYPE_UINT);
704   g_value_set_uint (&args[1], session->id);
705   g_value_init (&args[2], G_TYPE_UINT);
706   g_value_set_uint (&args[2], pt);
707
708   g_value_init (&ret, GST_TYPE_CAPS);
709   g_value_set_boxed (&ret, NULL);
710
711   GST_RTP_SESSION_UNLOCK (session);
712
713   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
714
715   GST_RTP_SESSION_LOCK (session);
716
717   g_value_unset (&args[0]);
718   g_value_unset (&args[1]);
719   g_value_unset (&args[2]);
720
721   /* look in the cache again because we let the lock go */
722   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
723   if (caps) {
724     gst_caps_ref (caps);
725     g_value_unset (&ret);
726     goto done;
727   }
728
729   caps = (GstCaps *) g_value_dup_boxed (&ret);
730   g_value_unset (&ret);
731   if (!caps)
732     goto no_caps;
733
734   GST_DEBUG ("caching pt %d as %" GST_PTR_FORMAT, pt, caps);
735
736   /* store in cache, take additional ref */
737   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
738       gst_caps_ref (caps));
739
740 done:
741   GST_RTP_SESSION_UNLOCK (session);
742
743   return caps;
744
745   /* ERRORS */
746 no_caps:
747   {
748     GST_RTP_SESSION_UNLOCK (session);
749     GST_DEBUG ("no pt map could be obtained");
750     return NULL;
751   }
752 }
753
754 static gboolean
755 return_true (gpointer key, gpointer value, gpointer user_data)
756 {
757   return TRUE;
758 }
759
760 static void
761 gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
762 {
763   GSList *clients, *streams;
764
765   GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");
766
767   GST_RTP_BIN_LOCK (rtpbin);
768   for (clients = rtpbin->clients; clients; clients = g_slist_next (clients)) {
769     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
770
771     /* reset sync on all streams for this client */
772     for (streams = client->streams; streams; streams = g_slist_next (streams)) {
773       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
774
775       /* make use require a new SR packet for this stream before we attempt new
776        * lip-sync */
777       stream->have_sync = FALSE;
778       stream->rt_delta = 0;
779       stream->rtp_delta = 0;
780       stream->clock_base = -100 * GST_SECOND;
781     }
782   }
783   GST_RTP_BIN_UNLOCK (rtpbin);
784 }
785
786 static void
787 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
788 {
789   GSList *sessions, *streams;
790
791   GST_RTP_BIN_LOCK (bin);
792   GST_DEBUG_OBJECT (bin, "clearing pt map");
793   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
794     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
795
796     GST_DEBUG_OBJECT (bin, "clearing session %p", session);
797     g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
798
799     GST_RTP_SESSION_LOCK (session);
800     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
801
802     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
803       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
804
805       GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
806       g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
807       if (stream->demux)
808         g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
809     }
810     GST_RTP_SESSION_UNLOCK (session);
811   }
812   GST_RTP_BIN_UNLOCK (bin);
813
814   /* reset sync too */
815   gst_rtp_bin_reset_sync (bin);
816 }
817
818 static RTPSession *
819 gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
820 {
821   RTPSession *internal_session = NULL;
822   GstRtpBinSession *session;
823
824   GST_RTP_BIN_LOCK (bin);
825   GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %d",
826       session_id);
827   session = find_session_by_id (bin, (gint) session_id);
828   if (session) {
829     g_object_get (session->session, "internal-session", &internal_session,
830         NULL);
831   }
832   GST_RTP_BIN_UNLOCK (bin);
833
834   return internal_session;
835 }
836
837 static void
838 gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
839     const gchar * name, const GValue * value)
840 {
841   GSList *sessions, *streams;
842
843   GST_RTP_BIN_LOCK (bin);
844   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
845     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
846
847     GST_RTP_SESSION_LOCK (session);
848     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
849       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
850
851       g_object_set_property (G_OBJECT (stream->buffer), name, value);
852     }
853     GST_RTP_SESSION_UNLOCK (session);
854   }
855   GST_RTP_BIN_UNLOCK (bin);
856 }
857
858 /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
859 static GstRtpBinClient *
860 get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
861 {
862   GstRtpBinClient *result = NULL;
863   GSList *walk;
864
865   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
866     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
867
868     if (len != client->cname_len)
869       continue;
870
871     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
872       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
873           client->cname);
874       result = client;
875       break;
876     }
877   }
878
879   /* nothing found, create one */
880   if (result == NULL) {
881     result = g_new0 (GstRtpBinClient, 1);
882     result->cname = g_strndup ((gchar *) data, len);
883     result->cname_len = len;
884     bin->clients = g_slist_prepend (bin->clients, result);
885     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
886         result->cname);
887   }
888   return result;
889 }
890
891 static void
892 free_client (GstRtpBinClient * client, GstRtpBin * bin)
893 {
894   GST_DEBUG_OBJECT (bin, "freeing client %p", client);
895   g_slist_free (client->streams);
896   g_free (client->cname);
897   g_free (client);
898 }
899
900 static void
901 get_current_times (GstRtpBin * bin, GstClockTime * running_time,
902     guint64 * ntpnstime)
903 {
904   guint64 ntpns;
905   GstClock *clock;
906   GstClockTime base_time, rt, clock_time;
907
908   GST_OBJECT_LOCK (bin);
909   if ((clock = GST_ELEMENT_CLOCK (bin))) {
910     base_time = GST_ELEMENT_CAST (bin)->base_time;
911     gst_object_ref (clock);
912     GST_OBJECT_UNLOCK (bin);
913
914     clock_time = gst_clock_get_time (clock);
915
916     if (bin->use_pipeline_clock) {
917       ntpns = clock_time - base_time;
918     } else {
919       GTimeVal current;
920
921       /* get current NTP time */
922       g_get_current_time (&current);
923       ntpns = GST_TIMEVAL_TO_TIME (current);
924     }
925
926     /* add constant to convert from 1970 based time to 1900 based time */
927     ntpns += (2208988800LL * GST_SECOND);
928
929     /* get current clock time and convert to running time */
930     rt = clock_time - base_time;
931
932     gst_object_unref (clock);
933   } else {
934     GST_OBJECT_UNLOCK (bin);
935     rt = -1;
936     ntpns = -1;
937   }
938   if (running_time)
939     *running_time = rt;
940   if (ntpnstime)
941     *ntpnstime = ntpns;
942 }
943
944 static void
945 stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
946     gint64 ts_offset, gboolean check)
947 {
948   gint64 prev_ts_offset;
949
950   g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
951
952   /* delta changed, see how much */
953   if (prev_ts_offset != ts_offset) {
954     gint64 diff;
955
956     diff = prev_ts_offset - ts_offset;
957
958     GST_DEBUG_OBJECT (bin,
959         "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
960         ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff);
961
962     if (check) {
963       /* only change diff when it changed more than 4 milliseconds. This
964        * compensates for rounding errors in NTP to RTP timestamp
965        * conversions */
966       if (ABS (diff) < 4 * GST_MSECOND) {
967         GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
968         return;
969       }
970       if (ABS (diff) > (3 * GST_SECOND)) {
971         GST_WARNING_OBJECT (bin, "offset unusually large, ignoring");
972         return;
973       }
974     }
975     g_object_set (stream->buffer, "ts-offset", ts_offset, NULL);
976   }
977   GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
978       stream->ssrc, ts_offset);
979 }
980
981 static void
982 gst_rtp_bin_send_sync_event (GstRtpBinStream * stream)
983 {
984   if (stream->bin->send_sync_event) {
985     GstEvent *event;
986     GstPad *srcpad;
987
988     GST_DEBUG_OBJECT (stream->bin,
989         "sending GstRTCPSRReceived event downstream");
990
991     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
992         gst_structure_new_empty ("GstRTCPSRReceived"));
993
994     srcpad = gst_element_get_static_pad (stream->buffer, "src");
995     gst_pad_push_event (srcpad, event);
996     gst_object_unref (srcpad);
997   }
998 }
999
1000 /* associate a stream to the given CNAME. This will make sure all streams for
1001  * that CNAME are synchronized together.
1002  * Must be called with GST_RTP_BIN_LOCK */
1003 static void
1004 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
1005     guint8 * data, guint64 ntptime, guint64 last_extrtptime,
1006     guint64 base_rtptime, guint64 base_time, guint clock_rate,
1007     gint64 rtp_clock_base)
1008 {
1009   GstRtpBinClient *client;
1010   gboolean created;
1011   GSList *walk;
1012   guint64 local_rt;
1013   guint64 local_rtp;
1014   GstClockTime running_time;
1015   guint64 ntpnstime;
1016   gint64 ntpdiff, rtdiff;
1017   guint64 last_unix;
1018
1019   /* first find or create the CNAME */
1020   client = get_client (bin, len, data, &created);
1021
1022   /* find stream in the client */
1023   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1024     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1025
1026     if (ostream == stream)
1027       break;
1028   }
1029   /* not found, add it to the list */
1030   if (walk == NULL) {
1031     GST_DEBUG_OBJECT (bin,
1032         "new association of SSRC %08x with client %p with CNAME %s",
1033         stream->ssrc, client, client->cname);
1034     client->streams = g_slist_prepend (client->streams, stream);
1035     client->nstreams++;
1036   } else {
1037     GST_DEBUG_OBJECT (bin,
1038         "found association of SSRC %08x with client %p with CNAME %s",
1039         stream->ssrc, client, client->cname);
1040   }
1041
1042   if (!GST_CLOCK_TIME_IS_VALID (last_extrtptime)) {
1043     GST_DEBUG_OBJECT (bin, "invalidated sync data");
1044     if (bin->rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1045       /* we don't need that data, so carry on,
1046        * but make some values look saner */
1047       last_extrtptime = base_rtptime;
1048     } else {
1049       /* nothing we can do with this data in this case */
1050       GST_DEBUG_OBJECT (bin, "bailing out");
1051       return;
1052     }
1053   }
1054
1055   /* Take the extended rtptime we found in the SR packet and map it to the
1056    * local rtptime. The local rtp time is used to construct timestamps on the
1057    * buffers so we will calculate what running_time corresponds to the RTP
1058    * timestamp in the SR packet. */
1059   local_rtp = last_extrtptime - base_rtptime;
1060
1061   GST_DEBUG_OBJECT (bin,
1062       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
1063       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, "
1064       "clock-base %" G_GINT64_FORMAT, base_rtptime,
1065       last_extrtptime, local_rtp, clock_rate, rtp_clock_base);
1066
1067   /* calculate local RTP time in gstreamer timestamp, we essentially perform the
1068    * same conversion that a jitterbuffer would use to convert an rtp timestamp
1069    * into a corresponding gstreamer timestamp. Note that the base_time also
1070    * contains the drift between sender and receiver. */
1071   local_rt = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate);
1072   local_rt += base_time;
1073
1074   /* convert ntptime to unix time since 1900 */
1075   last_unix = gst_util_uint64_scale (ntptime, GST_SECOND,
1076       (G_GINT64_CONSTANT (1) << 32));
1077
1078   stream->have_sync = TRUE;
1079
1080   GST_DEBUG_OBJECT (bin,
1081       "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT,
1082       local_rt, last_unix);
1083
1084   /* recalc inter stream playout offset, but only if there is more than one
1085    * stream or we're doing NTP sync. */
1086   if (bin->ntp_sync) {
1087     /* For NTP sync we need to first get a snapshot of running_time and NTP
1088      * time. We know at what running_time we play a certain RTP time, we also
1089      * calculated when we would play the RTP time in the SR packet. Now we need
1090      * to know how the running_time and the NTP time relate to eachother. */
1091     get_current_times (bin, &running_time, &ntpnstime);
1092
1093     /* see how far away the NTP time is. This is the difference between the
1094      * current NTP time and the NTP time in the last SR packet. */
1095     ntpdiff = ntpnstime - last_unix;
1096     /* see how far away the running_time is. This is the difference between the
1097      * current running_time and the running_time of the RTP timestamp in the
1098      * last SR packet. */
1099     rtdiff = running_time - local_rt;
1100
1101     GST_DEBUG_OBJECT (bin,
1102         "NTP time %" G_GUINT64_FORMAT ", last unix %" G_GUINT64_FORMAT,
1103         ntpnstime, last_unix);
1104     GST_DEBUG_OBJECT (bin,
1105         "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
1106         rtdiff);
1107
1108     /* combine to get the final diff to apply to the running_time */
1109     stream->rt_delta = rtdiff - ntpdiff;
1110
1111     stream_set_ts_offset (bin, stream, stream->rt_delta, FALSE);
1112   } else {
1113     gint64 min, rtp_min, clock_base = stream->clock_base;
1114     gboolean all_sync, use_rtp;
1115     gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
1116
1117     /* calculate delta between server and receiver. last_unix is created by
1118      * converting the ntptime in the last SR packet to a gstreamer timestamp. This
1119      * delta expresses the difference to our timeline and the server timeline. The
1120      * difference in itself doesn't mean much but we can combine the delta of
1121      * multiple streams to create a stream specific offset. */
1122     stream->rt_delta = last_unix - local_rt;
1123
1124     /* calculate the min of all deltas, ignoring streams that did not yet have a
1125      * valid rt_delta because we did not yet receive an SR packet for those
1126      * streams.
1127      * We calculate the mininum because we would like to only apply positive
1128      * offsets to streams, delaying their playback instead of trying to speed up
1129      * other streams (which might be imposible when we have to create negative
1130      * latencies).
1131      * The stream that has the smallest diff is selected as the reference stream,
1132      * all other streams will have a positive offset to this difference. */
1133
1134     /* some alternative setting allow ignoring RTCP as much as possible,
1135      * for servers generating bogus ntp timeline */
1136     min = rtp_min = G_MAXINT64;
1137     use_rtp = FALSE;
1138     if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1139       guint64 ext_base;
1140
1141       use_rtp = TRUE;
1142       /* signed version for convienience */
1143       clock_base = base_rtptime;
1144       /* deal with possible wrap-around */
1145       ext_base = base_rtptime;
1146       rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
1147       /* sanity check; base rtp and provided clock_base should be close */
1148       if (rtp_clock_base >= clock_base) {
1149         if (rtp_clock_base - clock_base < 10 * clock_rate) {
1150           rtp_clock_base = base_time +
1151               gst_util_uint64_scale_int (rtp_clock_base - clock_base,
1152               GST_SECOND, clock_rate);
1153         } else {
1154           use_rtp = FALSE;
1155         }
1156       } else {
1157         if (clock_base - rtp_clock_base < 10 * clock_rate) {
1158           rtp_clock_base = base_time -
1159               gst_util_uint64_scale_int (clock_base - rtp_clock_base,
1160               GST_SECOND, clock_rate);
1161         } else {
1162           use_rtp = FALSE;
1163         }
1164       }
1165       /* warn and bail for clarity out if no sane values */
1166       if (!use_rtp) {
1167         GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
1168         return;
1169       }
1170       /* store to track changes */
1171       clock_base = rtp_clock_base;
1172       /* generate a fake as before,
1173        * now equating rtptime obtained from RTP-Info,
1174        * where the large time represent the otherwise irrelevant npt/ntp time */
1175       stream->rtp_delta = (GST_SECOND << 28) - rtp_clock_base;
1176     } else {
1177       clock_base = rtp_clock_base;
1178     }
1179
1180     all_sync = TRUE;
1181     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1182       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1183
1184       if (!ostream->have_sync) {
1185         all_sync = FALSE;
1186         continue;
1187       }
1188
1189       /* change in current stream's base from previously init'ed value
1190        * leads to reset of all stream's base */
1191       if (stream != ostream && stream->clock_base >= 0 &&
1192           (stream->clock_base != clock_base)) {
1193         GST_DEBUG_OBJECT (bin, "reset upon clock base change");
1194         ostream->clock_base = -100 * GST_SECOND;
1195         ostream->rtp_delta = 0;
1196       }
1197
1198       if (ostream->rt_delta < min)
1199         min = ostream->rt_delta;
1200       if (ostream->rtp_delta < rtp_min)
1201         rtp_min = ostream->rtp_delta;
1202     }
1203
1204     /* arrange to re-sync for each stream upon significant change,
1205      * e.g. post-seek */
1206     all_sync = all_sync && (stream->clock_base == clock_base);
1207     stream->clock_base = clock_base;
1208
1209     /* may need init performed above later on, but nothing more to do now */
1210     if (client->nstreams <= 1)
1211       return;
1212
1213     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
1214         " all sync %d", client, min, all_sync);
1215     GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
1216
1217     switch (rtcp_sync) {
1218       case GST_RTP_BIN_RTCP_SYNC_RTP:
1219         if (!use_rtp)
1220           break;
1221         GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
1222             "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
1223         /* fall-through */
1224       case GST_RTP_BIN_RTCP_SYNC_INITIAL:
1225         /* if all have been synced already, do not bother further */
1226         if (all_sync) {
1227           GST_DEBUG_OBJECT (bin, "all streams already synced; done");
1228           return;
1229         }
1230         break;
1231       default:
1232         break;
1233     }
1234
1235     /* bail out if we adjusted recently enough */
1236     if (all_sync && (last_unix - bin->priv->last_unix) <
1237         bin->rtcp_sync_interval * GST_MSECOND) {
1238       GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
1239           "previous sender info too recent "
1240           "(previous UNIX %" G_GUINT64_FORMAT ")", bin->priv->last_unix);
1241       return;
1242     }
1243     bin->priv->last_unix = last_unix;
1244
1245     /* calculate offsets for each stream */
1246     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1247       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1248       gint64 ts_offset;
1249
1250       /* ignore streams for which we didn't receive an SR packet yet, we
1251        * can't synchronize them yet. We can however sync other streams just
1252        * fine. */
1253       if (!ostream->have_sync)
1254         continue;
1255
1256       /* calculate offset to our reference stream, this should always give a
1257        * positive number. */
1258       if (use_rtp)
1259         ts_offset = ostream->rtp_delta - rtp_min;
1260       else
1261         ts_offset = ostream->rt_delta - min;
1262
1263       stream_set_ts_offset (bin, ostream, ts_offset, TRUE);
1264     }
1265   }
1266   gst_rtp_bin_send_sync_event (stream);
1267
1268   return;
1269 }
1270
1271 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
1272   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
1273           (b) = gst_rtcp_packet_move_to_next ((packet)))
1274
1275 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
1276   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
1277           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
1278
1279 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
1280   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
1281           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
1282
1283 static void
1284 gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
1285     GstRtpBinStream * stream)
1286 {
1287   GstRtpBin *bin;
1288   GstRTCPPacket packet;
1289   guint32 ssrc;
1290   guint64 ntptime;
1291   gboolean have_sr, have_sdes;
1292   gboolean more;
1293   guint64 base_rtptime;
1294   guint64 base_time;
1295   guint clock_rate;
1296   guint64 clock_base;
1297   guint64 extrtptime;
1298   GstBuffer *buffer;
1299   GstRTCPBuffer rtcp = { NULL, };
1300
1301   bin = stream->bin;
1302
1303   GST_DEBUG_OBJECT (bin, "sync handler called");
1304
1305   /* get the last relation between the rtp timestamps and the gstreamer
1306    * timestamps. We get this info directly from the jitterbuffer which
1307    * constructs gstreamer timestamps from rtp timestamps and so it know exactly
1308    * what the current situation is. */
1309   base_rtptime =
1310       g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
1311   base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
1312   clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
1313   clock_base = g_value_get_uint64 (gst_structure_get_value (s, "clock-base"));
1314   extrtptime =
1315       g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
1316   buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
1317
1318   have_sr = FALSE;
1319   have_sdes = FALSE;
1320
1321   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
1322
1323   GST_RTCP_BUFFER_FOR_PACKETS (more, &rtcp, &packet) {
1324     /* first packet must be SR or RR or else the validate would have failed */
1325     switch (gst_rtcp_packet_get_type (&packet)) {
1326       case GST_RTCP_TYPE_SR:
1327         /* only parse first. There is only supposed to be one SR in the packet
1328          * but we will deal with malformed packets gracefully */
1329         if (have_sr)
1330           break;
1331         /* get NTP and RTP times */
1332         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL,
1333             NULL, NULL);
1334
1335         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
1336         /* ignore SR that is not ours */
1337         if (ssrc != stream->ssrc)
1338           continue;
1339
1340         have_sr = TRUE;
1341         break;
1342       case GST_RTCP_TYPE_SDES:
1343       {
1344         gboolean more_items, more_entries;
1345
1346         /* only deal with first SDES, there is only supposed to be one SDES in
1347          * the RTCP packet but we deal with bad packets gracefully. Also bail
1348          * out if we have not seen an SR item yet. */
1349         if (have_sdes || !have_sr)
1350           break;
1351
1352         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
1353           /* skip items that are not about the SSRC of the sender */
1354           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
1355             continue;
1356
1357           /* find the CNAME entry */
1358           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
1359             GstRTCPSDESType type;
1360             guint8 len;
1361             guint8 *data;
1362
1363             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
1364
1365             if (type == GST_RTCP_SDES_CNAME) {
1366               GST_RTP_BIN_LOCK (bin);
1367               /* associate the stream to CNAME */
1368               gst_rtp_bin_associate (bin, stream, len, data,
1369                   ntptime, extrtptime, base_rtptime, base_time, clock_rate,
1370                   clock_base);
1371               GST_RTP_BIN_UNLOCK (bin);
1372             }
1373           }
1374         }
1375         have_sdes = TRUE;
1376         break;
1377       }
1378       default:
1379         /* we can ignore these packets */
1380         break;
1381     }
1382   }
1383   gst_rtcp_buffer_unmap (&rtcp);
1384 }
1385
1386 /* create a new stream with @ssrc in @session. Must be called with
1387  * RTP_SESSION_LOCK. */
1388 static GstRtpBinStream *
1389 create_stream (GstRtpBinSession * session, guint32 ssrc)
1390 {
1391   GstElement *buffer, *demux = NULL;
1392   GstRtpBinStream *stream;
1393   GstRtpBin *rtpbin;
1394   GstState target;
1395
1396   rtpbin = session->bin;
1397
1398   if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL)))
1399     goto no_jitterbuffer;
1400
1401   if (!rtpbin->ignore_pt)
1402     if (!(demux = gst_element_factory_make ("rtpptdemux", NULL)))
1403       goto no_demux;
1404
1405
1406   stream = g_new0 (GstRtpBinStream, 1);
1407   stream->ssrc = ssrc;
1408   stream->bin = rtpbin;
1409   stream->session = session;
1410   stream->buffer = buffer;
1411   stream->demux = demux;
1412
1413   stream->have_sync = FALSE;
1414   stream->rt_delta = 0;
1415   stream->rtp_delta = 0;
1416   stream->percent = 100;
1417   stream->clock_base = -100 * GST_SECOND;
1418   session->streams = g_slist_prepend (session->streams, stream);
1419
1420   /* provide clock_rate to the jitterbuffer when needed */
1421   stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
1422       (GCallback) pt_map_requested, session);
1423   stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
1424       (GCallback) on_npt_stop, stream);
1425
1426   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.session", session);
1427   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.stream", stream);
1428
1429   /* configure latency and packet lost */
1430   g_object_set (buffer, "latency", rtpbin->latency_ms, NULL);
1431   g_object_set (buffer, "drop-on-latency", rtpbin->drop_on_latency, NULL);
1432   g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
1433   g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL);
1434   g_object_set (buffer, "do-retransmission", rtpbin->do_retransmission, NULL);
1435
1436   if (!rtpbin->ignore_pt)
1437     gst_bin_add (GST_BIN_CAST (rtpbin), demux);
1438   gst_bin_add (GST_BIN_CAST (rtpbin), buffer);
1439
1440   /* link stuff */
1441   if (demux)
1442     gst_element_link (buffer, demux);
1443
1444   if (rtpbin->buffering) {
1445     guint64 last_out;
1446
1447     GST_INFO_OBJECT (rtpbin,
1448         "bin is buffering, set jitterbuffer as not active");
1449     g_signal_emit_by_name (buffer, "set-active", FALSE, (gint64) 0, &last_out);
1450   }
1451
1452
1453   GST_OBJECT_LOCK (rtpbin);
1454   target = GST_STATE_TARGET (rtpbin);
1455   GST_OBJECT_UNLOCK (rtpbin);
1456
1457   /* from sink to source */
1458   if (demux)
1459     gst_element_set_state (demux, target);
1460
1461   gst_element_set_state (buffer, target);
1462
1463   return stream;
1464
1465   /* ERRORS */
1466 no_jitterbuffer:
1467   {
1468     g_warning ("rtpbin: could not create gstrtpjitterbuffer element");
1469     return NULL;
1470   }
1471 no_demux:
1472   {
1473     gst_object_unref (buffer);
1474     g_warning ("rtpbin: could not create gstrtpptdemux element");
1475     return NULL;
1476   }
1477 }
1478
1479 /* called with RTP_BIN_LOCK */
1480 static void
1481 free_stream (GstRtpBinStream * stream, GstRtpBin * bin)
1482 {
1483   GSList *clients, *next_client;
1484
1485   GST_DEBUG_OBJECT (bin, "freeing stream %p", stream);
1486
1487   if (stream->demux) {
1488     g_signal_handler_disconnect (stream->demux, stream->demux_newpad_sig);
1489     g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig);
1490     g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig);
1491   }
1492   g_signal_handler_disconnect (stream->buffer, stream->buffer_handlesync_sig);
1493   g_signal_handler_disconnect (stream->buffer, stream->buffer_ptreq_sig);
1494   g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig);
1495
1496   if (stream->demux)
1497     gst_element_set_locked_state (stream->demux, TRUE);
1498   gst_element_set_locked_state (stream->buffer, TRUE);
1499
1500   if (stream->demux)
1501     gst_element_set_state (stream->demux, GST_STATE_NULL);
1502   gst_element_set_state (stream->buffer, GST_STATE_NULL);
1503
1504   /* now remove this signal, we need this while going to NULL because it to
1505    * do some cleanups */
1506   if (stream->demux)
1507     g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
1508
1509   gst_bin_remove (GST_BIN_CAST (bin), stream->buffer);
1510   if (stream->demux)
1511     gst_bin_remove (GST_BIN_CAST (bin), stream->demux);
1512
1513   for (clients = bin->clients; clients; clients = next_client) {
1514     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
1515     GSList *streams, *next_stream;
1516
1517     next_client = g_slist_next (clients);
1518
1519     for (streams = client->streams; streams; streams = next_stream) {
1520       GstRtpBinStream *ostream = (GstRtpBinStream *) streams->data;
1521
1522       next_stream = g_slist_next (streams);
1523
1524       if (ostream == stream) {
1525         client->streams = g_slist_delete_link (client->streams, streams);
1526         /* If this was the last stream belonging to this client,
1527          * clean up the client. */
1528         if (--client->nstreams == 0) {
1529           bin->clients = g_slist_delete_link (bin->clients, clients);
1530           free_client (client, bin);
1531           break;
1532         }
1533       }
1534     }
1535   }
1536   g_free (stream);
1537 }
1538
1539 /* GObject vmethods */
1540 static void gst_rtp_bin_dispose (GObject * object);
1541 static void gst_rtp_bin_finalize (GObject * object);
1542 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
1543     const GValue * value, GParamSpec * pspec);
1544 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
1545     GValue * value, GParamSpec * pspec);
1546
1547 /* GstElement vmethods */
1548 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
1549     GstStateChange transition);
1550 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
1551     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
1552 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
1553 static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message);
1554
1555 #define gst_rtp_bin_parent_class parent_class
1556 G_DEFINE_TYPE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN);
1557
1558 static void
1559 gst_rtp_bin_class_init (GstRtpBinClass * klass)
1560 {
1561   GObjectClass *gobject_class;
1562   GstElementClass *gstelement_class;
1563   GstBinClass *gstbin_class;
1564
1565   gobject_class = (GObjectClass *) klass;
1566   gstelement_class = (GstElementClass *) klass;
1567   gstbin_class = (GstBinClass *) klass;
1568
1569   g_type_class_add_private (klass, sizeof (GstRtpBinPrivate));
1570
1571   gobject_class->dispose = gst_rtp_bin_dispose;
1572   gobject_class->finalize = gst_rtp_bin_finalize;
1573   gobject_class->set_property = gst_rtp_bin_set_property;
1574   gobject_class->get_property = gst_rtp_bin_get_property;
1575
1576   g_object_class_install_property (gobject_class, PROP_LATENCY,
1577       g_param_spec_uint ("latency", "Buffer latency in ms",
1578           "Default amount of ms to buffer in the jitterbuffers", 0,
1579           G_MAXUINT, DEFAULT_LATENCY_MS,
1580           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1581
1582   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
1583       g_param_spec_boolean ("drop-on-latency",
1584           "Drop buffers when maximum latency is reached",
1585           "Tells the jitterbuffer to never exceed the given latency in size",
1586           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1587
1588   /**
1589    * GstRtpBin::request-pt-map:
1590    * @rtpbin: the object which received the signal
1591    * @session: the session
1592    * @pt: the pt
1593    *
1594    * Request the payload type as #GstCaps for @pt in @session.
1595    */
1596   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
1597       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
1598       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
1599       NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_CAPS, 2, G_TYPE_UINT,
1600       G_TYPE_UINT);
1601
1602     /**
1603    * GstRtpBin::payload-type-change:
1604    * @rtpbin: the object which received the signal
1605    * @session: the session
1606    * @pt: the pt
1607    *
1608    * Signal that the current payload type changed to @pt in @session.
1609    *
1610    * Since: 0.10.17
1611    */
1612   gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] =
1613       g_signal_new ("payload-type-change", G_TYPE_FROM_CLASS (klass),
1614       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, payload_type_change),
1615       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1616       G_TYPE_UINT);
1617
1618   /**
1619    * GstRtpBin::clear-pt-map:
1620    * @rtpbin: the object which received the signal
1621    *
1622    * Clear all previously cached pt-mapping obtained with
1623    * #GstRtpBin::request-pt-map.
1624    */
1625   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
1626       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
1627       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1628           clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1629       0, G_TYPE_NONE);
1630
1631   /**
1632    * GstRtpBin::reset-sync:
1633    * @rtpbin: the object which received the signal
1634    *
1635    * Reset all currently configured lip-sync parameters and require new SR
1636    * packets for all streams before lip-sync is attempted again.
1637    */
1638   gst_rtp_bin_signals[SIGNAL_RESET_SYNC] =
1639       g_signal_new ("reset-sync", G_TYPE_FROM_CLASS (klass),
1640       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1641           reset_sync), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1642       0, G_TYPE_NONE);
1643
1644   /**
1645    * GstRtpBin::get-internal-session:
1646    * @rtpbin: the object which received the signal
1647    * @id: the session id
1648    *
1649    * Request the internal RTPSession object as #GObject in session @id.
1650    */
1651   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_SESSION] =
1652       g_signal_new ("get-internal-session", G_TYPE_FROM_CLASS (klass),
1653       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1654           get_internal_session), NULL, NULL, g_cclosure_marshal_generic,
1655       RTP_TYPE_SESSION, 1, G_TYPE_UINT);
1656
1657   /**
1658    * GstRtpBin::on-new-ssrc:
1659    * @rtpbin: the object which received the signal
1660    * @session: the session
1661    * @ssrc: the SSRC
1662    *
1663    * Notify of a new SSRC that entered @session.
1664    */
1665   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
1666       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
1667       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
1668       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1669       G_TYPE_UINT);
1670   /**
1671    * GstRtpBin::on-ssrc-collision:
1672    * @rtpbin: the object which received the signal
1673    * @session: the session
1674    * @ssrc: the SSRC
1675    *
1676    * Notify when we have an SSRC collision
1677    */
1678   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
1679       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
1680       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
1681       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1682       G_TYPE_UINT);
1683   /**
1684    * GstRtpBin::on-ssrc-validated:
1685    * @rtpbin: the object which received the signal
1686    * @session: the session
1687    * @ssrc: the SSRC
1688    *
1689    * Notify of a new SSRC that became validated.
1690    */
1691   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
1692       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
1693       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
1694       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1695       G_TYPE_UINT);
1696   /**
1697    * GstRtpBin::on-ssrc-active:
1698    * @rtpbin: the object which received the signal
1699    * @session: the session
1700    * @ssrc: the SSRC
1701    *
1702    * Notify of a SSRC that is active, i.e., sending RTCP.
1703    */
1704   gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE] =
1705       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
1706       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_active),
1707       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1708       G_TYPE_UINT);
1709   /**
1710    * GstRtpBin::on-ssrc-sdes:
1711    * @rtpbin: the object which received the signal
1712    * @session: the session
1713    * @ssrc: the SSRC
1714    *
1715    * Notify of a SSRC that is active, i.e., sending RTCP.
1716    */
1717   gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES] =
1718       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
1719       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_sdes),
1720       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1721       G_TYPE_UINT);
1722
1723   /**
1724    * GstRtpBin::on-bye-ssrc:
1725    * @rtpbin: the object which received the signal
1726    * @session: the session
1727    * @ssrc: the SSRC
1728    *
1729    * Notify of an SSRC that became inactive because of a BYE packet.
1730    */
1731   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
1732       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
1733       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
1734       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1735       G_TYPE_UINT);
1736   /**
1737    * GstRtpBin::on-bye-timeout:
1738    * @rtpbin: the object which received the signal
1739    * @session: the session
1740    * @ssrc: the SSRC
1741    *
1742    * Notify of an SSRC that has timed out because of BYE
1743    */
1744   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
1745       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
1746       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
1747       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1748       G_TYPE_UINT);
1749   /**
1750    * GstRtpBin::on-timeout:
1751    * @rtpbin: the object which received the signal
1752    * @session: the session
1753    * @ssrc: the SSRC
1754    *
1755    * Notify of an SSRC that has timed out
1756    */
1757   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
1758       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
1759       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
1760       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1761       G_TYPE_UINT);
1762   /**
1763    * GstRtpBin::on-sender-timeout:
1764    * @rtpbin: the object which received the signal
1765    * @session: the session
1766    * @ssrc: the SSRC
1767    *
1768    * Notify of a sender SSRC that has timed out and became a receiver
1769    */
1770   gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] =
1771       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
1772       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout),
1773       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1774       G_TYPE_UINT);
1775
1776   /**
1777    * GstRtpBin::on-npt-stop:
1778    * @rtpbin: the object which received the signal
1779    * @session: the session
1780    * @ssrc: the SSRC
1781    *
1782    * Notify that SSRC sender has sent data up to the configured NPT stop time.
1783    */
1784   gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP] =
1785       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
1786       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_npt_stop),
1787       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1788       G_TYPE_UINT);
1789
1790   g_object_class_install_property (gobject_class, PROP_SDES,
1791       g_param_spec_boxed ("sdes", "SDES",
1792           "The SDES items of this session",
1793           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1794
1795   g_object_class_install_property (gobject_class, PROP_DO_LOST,
1796       g_param_spec_boolean ("do-lost", "Do Lost",
1797           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
1798           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1799
1800   g_object_class_install_property (gobject_class, PROP_AUTOREMOVE,
1801       g_param_spec_boolean ("autoremove", "Auto Remove",
1802           "Automatically remove timed out sources", DEFAULT_AUTOREMOVE,
1803           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1804
1805   g_object_class_install_property (gobject_class, PROP_IGNORE_PT,
1806       g_param_spec_boolean ("ignore-pt", "Ignore PT",
1807           "Do not demultiplex based on PT values", DEFAULT_IGNORE_PT,
1808           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1809
1810   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
1811       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
1812           "Use the pipeline running-time to set the NTP time in the RTCP SR messages",
1813           DEFAULT_USE_PIPELINE_CLOCK,
1814           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1815   /**
1816    * GstRtpBin::buffer-mode:
1817    *
1818    * Control the buffering and timestamping mode used by the jitterbuffer.
1819    *
1820    * Since: 0.10.17
1821    */
1822   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
1823       g_param_spec_enum ("buffer-mode", "Buffer Mode",
1824           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
1825           DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1826   /**
1827    * GstRtpBin::ntp-sync:
1828    *
1829    * Set the NTP time from the sender reports as the running-time on the
1830    * buffers. When both the sender and receiver have sychronized
1831    * running-time, i.e. when the clock and base-time is shared
1832    * between the receivers and the and the senders, this option can be
1833    * used to synchronize receivers on multiple machines.
1834    *
1835    * Since: 0.10.21
1836    */
1837   g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
1838       g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
1839           "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
1840           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1841
1842   /**
1843    * GstRtpBin::rtcp-sync:
1844    *
1845    * If not synchronizing (directly) to the NTP clock, determines how to sync
1846    * the various streams.
1847    *
1848    * Since: 0.10.31
1849    */
1850   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC,
1851       g_param_spec_enum ("rtcp-sync", "RTCP Sync",
1852           "Use of RTCP SR in synchronization", GST_RTP_BIN_RTCP_SYNC_TYPE,
1853           DEFAULT_RTCP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1854
1855   /**
1856    * GstRtpBin::rtcp-sync-interval:
1857    *
1858    * Determines how often to sync streams using RTCP data.
1859    *
1860    * Since: 0.10.31
1861    */
1862   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_INTERVAL,
1863       g_param_spec_uint ("rtcp-sync-interval", "RTCP Sync Interval",
1864           "RTCP SR interval synchronization (ms) (0 = always)",
1865           0, G_MAXUINT, DEFAULT_RTCP_SYNC_INTERVAL,
1866           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1867
1868   g_object_class_install_property (gobject_class, PROP_DO_SYNC_EVENT,
1869       g_param_spec_boolean ("do-sync-event", "Do Sync Event",
1870           "Send event downstream when a stream is synchronized to the sender",
1871           DEFAULT_DO_SYNC_EVENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1872
1873   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
1874       g_param_spec_boolean ("do-retransmission", "Do retransmission",
1875           "Send an event downstream to request packet retransmission",
1876           DEFAULT_DO_RETRANSMISSION,
1877           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1878
1879   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
1880   gstelement_class->request_new_pad =
1881       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
1882   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
1883
1884   /* sink pads */
1885   gst_element_class_add_pad_template (gstelement_class,
1886       gst_static_pad_template_get (&rtpbin_recv_rtp_sink_template));
1887   gst_element_class_add_pad_template (gstelement_class,
1888       gst_static_pad_template_get (&rtpbin_recv_rtcp_sink_template));
1889   gst_element_class_add_pad_template (gstelement_class,
1890       gst_static_pad_template_get (&rtpbin_send_rtp_sink_template));
1891
1892   /* src pads */
1893   gst_element_class_add_pad_template (gstelement_class,
1894       gst_static_pad_template_get (&rtpbin_recv_rtp_src_template));
1895   gst_element_class_add_pad_template (gstelement_class,
1896       gst_static_pad_template_get (&rtpbin_send_rtcp_src_template));
1897   gst_element_class_add_pad_template (gstelement_class,
1898       gst_static_pad_template_get (&rtpbin_send_rtp_src_template));
1899
1900   gst_element_class_set_static_metadata (gstelement_class, "RTP Bin",
1901       "Filter/Network/RTP",
1902       "Real-Time Transport Protocol bin",
1903       "Wim Taymans <wim.taymans@gmail.com>");
1904
1905   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (gst_rtp_bin_handle_message);
1906
1907   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
1908   klass->reset_sync = GST_DEBUG_FUNCPTR (gst_rtp_bin_reset_sync);
1909   klass->get_internal_session =
1910       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_session);
1911
1912   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
1913 }
1914
1915 static void
1916 gst_rtp_bin_init (GstRtpBin * rtpbin)
1917 {
1918   gchar *cname;
1919
1920   rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
1921   g_mutex_init (&rtpbin->priv->bin_lock);
1922   g_mutex_init (&rtpbin->priv->dyn_lock);
1923
1924   rtpbin->latency_ms = DEFAULT_LATENCY_MS;
1925   rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND;
1926   rtpbin->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
1927   rtpbin->do_lost = DEFAULT_DO_LOST;
1928   rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
1929   rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
1930   rtpbin->rtcp_sync = DEFAULT_RTCP_SYNC;
1931   rtpbin->rtcp_sync_interval = DEFAULT_RTCP_SYNC_INTERVAL;
1932   rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
1933   rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
1934   rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
1935   rtpbin->send_sync_event = DEFAULT_DO_SYNC_EVENT;
1936   rtpbin->do_retransmission = DEFAULT_DO_RETRANSMISSION;
1937
1938   /* some default SDES entries */
1939   cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
1940   rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes",
1941       "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL);
1942   g_free (cname);
1943 }
1944
1945 static void
1946 gst_rtp_bin_dispose (GObject * object)
1947 {
1948   GstRtpBin *rtpbin;
1949
1950   rtpbin = GST_RTP_BIN (object);
1951
1952   GST_RTP_BIN_LOCK (rtpbin);
1953   GST_DEBUG_OBJECT (object, "freeing sessions");
1954   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, rtpbin);
1955   g_slist_free (rtpbin->sessions);
1956   rtpbin->sessions = NULL;
1957   GST_RTP_BIN_UNLOCK (rtpbin);
1958
1959   G_OBJECT_CLASS (parent_class)->dispose (object);
1960 }
1961
1962 static void
1963 gst_rtp_bin_finalize (GObject * object)
1964 {
1965   GstRtpBin *rtpbin;
1966
1967   rtpbin = GST_RTP_BIN (object);
1968
1969   if (rtpbin->sdes)
1970     gst_structure_free (rtpbin->sdes);
1971
1972   g_mutex_clear (&rtpbin->priv->bin_lock);
1973   g_mutex_clear (&rtpbin->priv->dyn_lock);
1974
1975   G_OBJECT_CLASS (parent_class)->finalize (object);
1976 }
1977
1978
1979 static void
1980 gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes)
1981 {
1982   GSList *item;
1983
1984   if (sdes == NULL)
1985     return;
1986
1987   GST_RTP_BIN_LOCK (bin);
1988
1989   GST_OBJECT_LOCK (bin);
1990   if (bin->sdes)
1991     gst_structure_free (bin->sdes);
1992   bin->sdes = gst_structure_copy (sdes);
1993   GST_OBJECT_UNLOCK (bin);
1994
1995   /* store in all sessions */
1996   for (item = bin->sessions; item; item = g_slist_next (item)) {
1997     GstRtpBinSession *session = item->data;
1998     g_object_set (session->session, "sdes", sdes, NULL);
1999   }
2000
2001   GST_RTP_BIN_UNLOCK (bin);
2002 }
2003
2004 static GstStructure *
2005 gst_rtp_bin_get_sdes_struct (GstRtpBin * bin)
2006 {
2007   GstStructure *result;
2008
2009   GST_OBJECT_LOCK (bin);
2010   result = gst_structure_copy (bin->sdes);
2011   GST_OBJECT_UNLOCK (bin);
2012
2013   return result;
2014 }
2015
2016 static void
2017 gst_rtp_bin_set_property (GObject * object, guint prop_id,
2018     const GValue * value, GParamSpec * pspec)
2019 {
2020   GstRtpBin *rtpbin;
2021
2022   rtpbin = GST_RTP_BIN (object);
2023
2024   switch (prop_id) {
2025     case PROP_LATENCY:
2026       GST_RTP_BIN_LOCK (rtpbin);
2027       rtpbin->latency_ms = g_value_get_uint (value);
2028       rtpbin->latency_ns = rtpbin->latency_ms * GST_MSECOND;
2029       GST_RTP_BIN_UNLOCK (rtpbin);
2030       /* propagate the property down to the jitterbuffer */
2031       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "latency", value);
2032       break;
2033     case PROP_DROP_ON_LATENCY:
2034       GST_RTP_BIN_LOCK (rtpbin);
2035       rtpbin->drop_on_latency = g_value_get_boolean (value);
2036       GST_RTP_BIN_UNLOCK (rtpbin);
2037       /* propagate the property down to the jitterbuffer */
2038       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2039           "drop-on-latency", value);
2040       break;
2041     case PROP_SDES:
2042       gst_rtp_bin_set_sdes_struct (rtpbin, g_value_get_boxed (value));
2043       break;
2044     case PROP_DO_LOST:
2045       GST_RTP_BIN_LOCK (rtpbin);
2046       rtpbin->do_lost = g_value_get_boolean (value);
2047       GST_RTP_BIN_UNLOCK (rtpbin);
2048       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
2049       break;
2050     case PROP_NTP_SYNC:
2051       rtpbin->ntp_sync = g_value_get_boolean (value);
2052       break;
2053     case PROP_RTCP_SYNC:
2054       g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value));
2055       break;
2056     case PROP_RTCP_SYNC_INTERVAL:
2057       rtpbin->rtcp_sync_interval = g_value_get_uint (value);
2058       break;
2059     case PROP_IGNORE_PT:
2060       rtpbin->ignore_pt = g_value_get_boolean (value);
2061       break;
2062     case PROP_AUTOREMOVE:
2063       rtpbin->priv->autoremove = g_value_get_boolean (value);
2064       break;
2065     case PROP_USE_PIPELINE_CLOCK:
2066     {
2067       GSList *sessions;
2068       GST_RTP_BIN_LOCK (rtpbin);
2069       rtpbin->use_pipeline_clock = g_value_get_boolean (value);
2070       for (sessions = rtpbin->sessions; sessions;
2071           sessions = g_slist_next (sessions)) {
2072         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2073
2074         g_object_set (G_OBJECT (session->session),
2075             "use-pipeline-clock", rtpbin->use_pipeline_clock, NULL);
2076       }
2077       GST_RTP_BIN_UNLOCK (rtpbin);
2078     }
2079       break;
2080     case PROP_DO_SYNC_EVENT:
2081       rtpbin->send_sync_event = g_value_get_boolean (value);
2082       break;
2083     case PROP_BUFFER_MODE:
2084       GST_RTP_BIN_LOCK (rtpbin);
2085       rtpbin->buffer_mode = g_value_get_enum (value);
2086       GST_RTP_BIN_UNLOCK (rtpbin);
2087       /* propagate the property down to the jitterbuffer */
2088       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "mode", value);
2089       break;
2090     case PROP_DO_RETRANSMISSION:
2091       GST_RTP_BIN_LOCK (rtpbin);
2092       rtpbin->do_retransmission = g_value_get_boolean (value);
2093       GST_RTP_BIN_UNLOCK (rtpbin);
2094       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2095           "do-retransmission", value);
2096       break;
2097     default:
2098       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2099       break;
2100   }
2101 }
2102
2103 static void
2104 gst_rtp_bin_get_property (GObject * object, guint prop_id,
2105     GValue * value, GParamSpec * pspec)
2106 {
2107   GstRtpBin *rtpbin;
2108
2109   rtpbin = GST_RTP_BIN (object);
2110
2111   switch (prop_id) {
2112     case PROP_LATENCY:
2113       GST_RTP_BIN_LOCK (rtpbin);
2114       g_value_set_uint (value, rtpbin->latency_ms);
2115       GST_RTP_BIN_UNLOCK (rtpbin);
2116       break;
2117     case PROP_DROP_ON_LATENCY:
2118       GST_RTP_BIN_LOCK (rtpbin);
2119       g_value_set_boolean (value, rtpbin->drop_on_latency);
2120       GST_RTP_BIN_UNLOCK (rtpbin);
2121       break;
2122     case PROP_SDES:
2123       g_value_take_boxed (value, gst_rtp_bin_get_sdes_struct (rtpbin));
2124       break;
2125     case PROP_DO_LOST:
2126       GST_RTP_BIN_LOCK (rtpbin);
2127       g_value_set_boolean (value, rtpbin->do_lost);
2128       GST_RTP_BIN_UNLOCK (rtpbin);
2129       break;
2130     case PROP_IGNORE_PT:
2131       g_value_set_boolean (value, rtpbin->ignore_pt);
2132       break;
2133     case PROP_NTP_SYNC:
2134       g_value_set_boolean (value, rtpbin->ntp_sync);
2135       break;
2136     case PROP_RTCP_SYNC:
2137       g_value_set_enum (value, g_atomic_int_get (&rtpbin->rtcp_sync));
2138       break;
2139     case PROP_RTCP_SYNC_INTERVAL:
2140       g_value_set_uint (value, rtpbin->rtcp_sync_interval);
2141       break;
2142     case PROP_AUTOREMOVE:
2143       g_value_set_boolean (value, rtpbin->priv->autoremove);
2144       break;
2145     case PROP_BUFFER_MODE:
2146       g_value_set_enum (value, rtpbin->buffer_mode);
2147       break;
2148     case PROP_USE_PIPELINE_CLOCK:
2149       g_value_set_boolean (value, rtpbin->use_pipeline_clock);
2150       break;
2151     case PROP_DO_SYNC_EVENT:
2152       g_value_set_boolean (value, rtpbin->send_sync_event);
2153       break;
2154     case PROP_DO_RETRANSMISSION:
2155       GST_RTP_BIN_LOCK (rtpbin);
2156       g_value_set_boolean (value, rtpbin->do_retransmission);
2157       GST_RTP_BIN_UNLOCK (rtpbin);
2158       break;
2159     default:
2160       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2161       break;
2162   }
2163 }
2164
2165 static void
2166 gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
2167 {
2168   GstRtpBin *rtpbin;
2169
2170   rtpbin = GST_RTP_BIN (bin);
2171
2172   switch (GST_MESSAGE_TYPE (message)) {
2173     case GST_MESSAGE_ELEMENT:
2174     {
2175       const GstStructure *s = gst_message_get_structure (message);
2176
2177       /* we change the structure name and add the session ID to it */
2178       if (gst_structure_has_name (s, "application/x-rtp-source-sdes")) {
2179         GstRtpBinSession *sess;
2180
2181         /* find the session we set it as object data */
2182         sess = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2183             "GstRTPBin.session");
2184
2185         if (G_LIKELY (sess)) {
2186           message = gst_message_make_writable (message);
2187           s = gst_message_get_structure (message);
2188           gst_structure_set ((GstStructure *) s, "session", G_TYPE_UINT,
2189               sess->id, NULL);
2190         }
2191       }
2192       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2193       break;
2194     }
2195     case GST_MESSAGE_BUFFERING:
2196     {
2197       gint percent;
2198       gint min_percent = 100;
2199       GSList *sessions, *streams;
2200       GstRtpBinStream *stream;
2201       gboolean change = FALSE, active = FALSE;
2202       GstClockTime min_out_time;
2203       GstBufferingMode mode;
2204       gint avg_in, avg_out;
2205       gint64 buffering_left;
2206
2207       gst_message_parse_buffering (message, &percent);
2208       gst_message_parse_buffering_stats (message, &mode, &avg_in, &avg_out,
2209           &buffering_left);
2210
2211       stream =
2212           g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2213           "GstRTPBin.stream");
2214
2215       GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
2216
2217       /* get the stream */
2218       if (G_LIKELY (stream)) {
2219         GST_RTP_BIN_LOCK (rtpbin);
2220         /* fill in the percent */
2221         stream->percent = percent;
2222
2223         /* calculate the min value for all streams */
2224         for (sessions = rtpbin->sessions; sessions;
2225             sessions = g_slist_next (sessions)) {
2226           GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2227
2228           GST_RTP_SESSION_LOCK (session);
2229           if (session->streams) {
2230             for (streams = session->streams; streams;
2231                 streams = g_slist_next (streams)) {
2232               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2233
2234               GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
2235                   stream->percent);
2236
2237               /* find min percent */
2238               if (min_percent > stream->percent)
2239                 min_percent = stream->percent;
2240             }
2241           } else {
2242             GST_INFO_OBJECT (bin,
2243                 "session has no streams, setting min_percent to 0");
2244             min_percent = 0;
2245           }
2246           GST_RTP_SESSION_UNLOCK (session);
2247         }
2248         GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
2249
2250         if (rtpbin->buffering) {
2251           if (min_percent == 100) {
2252             rtpbin->buffering = FALSE;
2253             active = TRUE;
2254             change = TRUE;
2255           }
2256         } else {
2257           if (min_percent < 100) {
2258             /* pause the streams */
2259             rtpbin->buffering = TRUE;
2260             active = FALSE;
2261             change = TRUE;
2262           }
2263         }
2264         GST_RTP_BIN_UNLOCK (rtpbin);
2265
2266         gst_message_unref (message);
2267
2268         /* make a new buffering message with the min value */
2269         message =
2270             gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
2271         gst_message_set_buffering_stats (message, mode, avg_in, avg_out,
2272             buffering_left);
2273
2274         if (G_UNLIKELY (change)) {
2275           GstClock *clock;
2276           guint64 running_time = 0;
2277           guint64 offset = 0;
2278
2279           /* figure out the running time when we have a clock */
2280           if (G_LIKELY ((clock =
2281                       gst_element_get_clock (GST_ELEMENT_CAST (bin))))) {
2282             guint64 now, base_time;
2283
2284             now = gst_clock_get_time (clock);
2285             base_time = gst_element_get_base_time (GST_ELEMENT_CAST (bin));
2286             running_time = now - base_time;
2287             gst_object_unref (clock);
2288           }
2289           GST_DEBUG_OBJECT (bin,
2290               "running time now %" GST_TIME_FORMAT,
2291               GST_TIME_ARGS (running_time));
2292
2293           GST_RTP_BIN_LOCK (rtpbin);
2294
2295           /* when we reactivate, calculate the offsets so that all streams have
2296            * an output time that is at least as big as the running_time */
2297           offset = 0;
2298           if (active) {
2299             if (running_time > rtpbin->buffer_start) {
2300               offset = running_time - rtpbin->buffer_start;
2301               if (offset >= rtpbin->latency_ns)
2302                 offset -= rtpbin->latency_ns;
2303               else
2304                 offset = 0;
2305             }
2306           }
2307
2308           /* pause all streams */
2309           min_out_time = -1;
2310           for (sessions = rtpbin->sessions; sessions;
2311               sessions = g_slist_next (sessions)) {
2312             GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2313
2314             GST_RTP_SESSION_LOCK (session);
2315             for (streams = session->streams; streams;
2316                 streams = g_slist_next (streams)) {
2317               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2318               GstElement *element = stream->buffer;
2319               guint64 last_out;
2320
2321               g_signal_emit_by_name (element, "set-active", active, offset,
2322                   &last_out);
2323
2324               if (!active) {
2325                 g_object_get (element, "percent", &stream->percent, NULL);
2326
2327                 if (last_out == -1)
2328                   last_out = 0;
2329                 if (min_out_time == -1 || last_out < min_out_time)
2330                   min_out_time = last_out;
2331               }
2332
2333               GST_DEBUG_OBJECT (bin,
2334                   "setting %p to %d, offset %" GST_TIME_FORMAT ", last %"
2335                   GST_TIME_FORMAT ", percent %d", element, active,
2336                   GST_TIME_ARGS (offset), GST_TIME_ARGS (last_out),
2337                   stream->percent);
2338             }
2339             GST_RTP_SESSION_UNLOCK (session);
2340           }
2341           GST_DEBUG_OBJECT (bin,
2342               "min out time %" GST_TIME_FORMAT, GST_TIME_ARGS (min_out_time));
2343
2344           /* the buffer_start is the min out time of all paused jitterbuffers */
2345           if (!active)
2346             rtpbin->buffer_start = min_out_time;
2347
2348           GST_RTP_BIN_UNLOCK (rtpbin);
2349         }
2350       }
2351       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2352       break;
2353     }
2354     default:
2355     {
2356       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2357       break;
2358     }
2359   }
2360 }
2361
2362 static GstStateChangeReturn
2363 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
2364 {
2365   GstStateChangeReturn res;
2366   GstRtpBin *rtpbin;
2367   GstRtpBinPrivate *priv;
2368
2369   rtpbin = GST_RTP_BIN (element);
2370   priv = rtpbin->priv;
2371
2372   switch (transition) {
2373     case GST_STATE_CHANGE_NULL_TO_READY:
2374       break;
2375     case GST_STATE_CHANGE_READY_TO_PAUSED:
2376       priv->last_unix = 0;
2377       GST_LOG_OBJECT (rtpbin, "clearing shutdown flag");
2378       g_atomic_int_set (&priv->shutdown, 0);
2379       break;
2380     case GST_STATE_CHANGE_PAUSED_TO_READY:
2381       GST_LOG_OBJECT (rtpbin, "setting shutdown flag");
2382       g_atomic_int_set (&priv->shutdown, 1);
2383       /* wait for all callbacks to end by taking the lock. No new callbacks will
2384        * be able to happen as we set the shutdown flag. */
2385       GST_RTP_BIN_DYN_LOCK (rtpbin);
2386       GST_LOG_OBJECT (rtpbin, "dynamic lock taken, we can continue shutdown");
2387       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
2388       break;
2389     default:
2390       break;
2391   }
2392
2393   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2394
2395   switch (transition) {
2396     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2397       break;
2398     case GST_STATE_CHANGE_PAUSED_TO_READY:
2399       break;
2400     case GST_STATE_CHANGE_READY_TO_NULL:
2401       break;
2402     default:
2403       break;
2404   }
2405   return res;
2406 }
2407
2408 /* a new pad (SSRC) was created in @session. This signal is emited from the
2409  * payload demuxer. */
2410 static void
2411 new_payload_found (GstElement * element, guint pt, GstPad * pad,
2412     GstRtpBinStream * stream)
2413 {
2414   GstRtpBin *rtpbin;
2415   GstElementClass *klass;
2416   GstPadTemplate *templ;
2417   gchar *padname;
2418   GstPad *gpad;
2419
2420   rtpbin = stream->bin;
2421
2422   GST_DEBUG ("new payload pad %d", pt);
2423
2424   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
2425
2426   /* ghost the pad to the parent */
2427   klass = GST_ELEMENT_GET_CLASS (rtpbin);
2428   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
2429   padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
2430       stream->session->id, stream->ssrc, pt);
2431   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
2432   g_free (padname);
2433   g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", gpad);
2434
2435   gst_pad_set_active (gpad, TRUE);
2436   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2437
2438   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2439
2440   return;
2441
2442 shutdown:
2443   {
2444     GST_DEBUG ("ignoring, we are shutting down");
2445     return;
2446   }
2447 }
2448
2449 static void
2450 payload_pad_removed (GstElement * element, GstPad * pad,
2451     GstRtpBinStream * stream)
2452 {
2453   GstRtpBin *rtpbin;
2454   GstPad *gpad;
2455
2456   rtpbin = stream->bin;
2457
2458   GST_DEBUG ("payload pad removed");
2459
2460   GST_RTP_BIN_DYN_LOCK (rtpbin);
2461   if ((gpad = g_object_get_data (G_OBJECT (pad), "GstRTPBin.ghostpad"))) {
2462     g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", NULL);
2463
2464     gst_pad_set_active (gpad, FALSE);
2465     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2466   }
2467   GST_RTP_BIN_DYN_UNLOCK (rtpbin);
2468 }
2469
2470 static GstCaps *
2471 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
2472 {
2473   GstRtpBin *rtpbin;
2474   GstCaps *caps;
2475
2476   rtpbin = session->bin;
2477
2478   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt,
2479       session->id);
2480
2481   caps = get_pt_map (session, pt);
2482   if (!caps)
2483     goto no_caps;
2484
2485   return caps;
2486
2487   /* ERRORS */
2488 no_caps:
2489   {
2490     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
2491     return NULL;
2492   }
2493 }
2494
2495 static void
2496 payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session)
2497 {
2498   GST_DEBUG_OBJECT (session->bin,
2499       "emiting signal for pt type changed to %d in session %d", pt,
2500       session->id);
2501
2502   g_signal_emit (session->bin, gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE],
2503       0, session->id, pt);
2504 }
2505
2506 /* emited when caps changed for the session */
2507 static void
2508 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
2509 {
2510   GstRtpBin *bin;
2511   GstCaps *caps;
2512   gint payload;
2513   const GstStructure *s;
2514
2515   bin = session->bin;
2516
2517   g_object_get (pad, "caps", &caps, NULL);
2518
2519   if (caps == NULL)
2520     return;
2521
2522   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
2523
2524   s = gst_caps_get_structure (caps, 0);
2525
2526   /* get payload, finish when it's not there */
2527   if (!gst_structure_get_int (s, "payload", &payload))
2528     return;
2529
2530   GST_RTP_SESSION_LOCK (session);
2531   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
2532   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
2533   GST_RTP_SESSION_UNLOCK (session);
2534 }
2535
2536 /* a new pad (SSRC) was created in @session */
2537 static void
2538 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
2539     GstRtpBinSession * session)
2540 {
2541   GstRtpBin *rtpbin;
2542   GstRtpBinStream *stream;
2543   GstPad *sinkpad, *srcpad;
2544   gchar *padname;
2545
2546   rtpbin = session->bin;
2547
2548   GST_DEBUG_OBJECT (rtpbin, "new SSRC pad %08x, %s:%s", ssrc,
2549       GST_DEBUG_PAD_NAME (pad));
2550
2551   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
2552
2553   GST_RTP_SESSION_LOCK (session);
2554
2555   /* create new stream */
2556   stream = create_stream (session, ssrc);
2557   if (!stream)
2558     goto no_stream;
2559
2560   /* get pad and link */
2561   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP");
2562   padname = g_strdup_printf ("src_%u", ssrc);
2563   srcpad = gst_element_get_static_pad (element, padname);
2564   g_free (padname);
2565   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
2566   gst_pad_link (srcpad, sinkpad);
2567   gst_object_unref (sinkpad);
2568   gst_object_unref (srcpad);
2569
2570   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
2571   padname = g_strdup_printf ("rtcp_src_%u", ssrc);
2572   srcpad = gst_element_get_static_pad (element, padname);
2573   g_free (padname);
2574   sinkpad = gst_element_get_request_pad (stream->buffer, "sink_rtcp");
2575   gst_pad_link (srcpad, sinkpad);
2576   gst_object_unref (sinkpad);
2577   gst_object_unref (srcpad);
2578
2579   /* connect to the RTCP sync signal from the jitterbuffer */
2580   GST_DEBUG_OBJECT (rtpbin, "connecting sync signal");
2581   stream->buffer_handlesync_sig = g_signal_connect (stream->buffer,
2582       "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream);
2583
2584   if (stream->demux) {
2585     /* connect to the new-pad signal of the payload demuxer, this will expose the
2586      * new pad by ghosting it. */
2587     stream->demux_newpad_sig = g_signal_connect (stream->demux,
2588         "new-payload-type", (GCallback) new_payload_found, stream);
2589     stream->demux_padremoved_sig = g_signal_connect (stream->demux,
2590         "pad-removed", (GCallback) payload_pad_removed, stream);
2591
2592     /* connect to the request-pt-map signal. This signal will be emited by the
2593      * demuxer so that it can apply a proper caps on the buffers for the
2594      * depayloaders. */
2595     stream->demux_ptreq_sig = g_signal_connect (stream->demux,
2596         "request-pt-map", (GCallback) pt_map_requested, session);
2597     /* connect to the  signal so it can be forwarded. */
2598     stream->demux_ptchange_sig = g_signal_connect (stream->demux,
2599         "payload-type-change", (GCallback) payload_type_change, session);
2600   } else {
2601     /* add gstrtpjitterbuffer src pad to pads */
2602     GstElementClass *klass;
2603     GstPadTemplate *templ;
2604     gchar *padname;
2605     GstPad *gpad, *pad;
2606
2607     pad = gst_element_get_static_pad (stream->buffer, "src");
2608
2609     /* ghost the pad to the parent */
2610     klass = GST_ELEMENT_GET_CLASS (rtpbin);
2611     templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
2612     padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
2613         stream->session->id, stream->ssrc, 255);
2614     gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
2615     g_free (padname);
2616
2617     gst_pad_set_active (gpad, TRUE);
2618     gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2619
2620     gst_object_unref (pad);
2621   }
2622
2623   GST_RTP_SESSION_UNLOCK (session);
2624   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2625
2626   return;
2627
2628   /* ERRORS */
2629 shutdown:
2630   {
2631     GST_DEBUG_OBJECT (rtpbin, "we are shutting down");
2632     return;
2633   }
2634 no_stream:
2635   {
2636     GST_RTP_SESSION_UNLOCK (session);
2637     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2638     GST_DEBUG_OBJECT (rtpbin, "could not create stream");
2639     return;
2640   }
2641 }
2642
2643 /* Create a pad for receiving RTP for the session in @name. Must be called with
2644  * RTP_BIN_LOCK.
2645  */
2646 static GstPad *
2647 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2648 {
2649   GstPad *sinkdpad;
2650   guint sessid;
2651   GstRtpBinSession *session;
2652   GstPadLinkReturn lres;
2653
2654   /* first get the session number */
2655   if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1)
2656     goto no_name;
2657
2658   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
2659
2660   /* get or create session */
2661   session = find_session_by_id (rtpbin, sessid);
2662   if (!session) {
2663     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
2664     /* create session now */
2665     session = create_session (rtpbin, sessid);
2666     if (session == NULL)
2667       goto create_error;
2668   }
2669
2670   /* check if pad was requested */
2671   if (session->recv_rtp_sink_ghost != NULL)
2672     return session->recv_rtp_sink_ghost;
2673
2674   GST_DEBUG_OBJECT (rtpbin, "getting RTP sink pad");
2675   /* get recv_rtp pad and store */
2676   session->recv_rtp_sink =
2677       gst_element_get_request_pad (session->session, "recv_rtp_sink");
2678   if (session->recv_rtp_sink == NULL)
2679     goto pad_failed;
2680
2681   g_signal_connect (session->recv_rtp_sink, "notify::caps",
2682       (GCallback) caps_changed, session);
2683
2684   GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad");
2685   /* get srcpad, link to SSRCDemux */
2686   session->recv_rtp_src =
2687       gst_element_get_static_pad (session->session, "recv_rtp_src");
2688   if (session->recv_rtp_src == NULL)
2689     goto pad_failed;
2690
2691   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
2692   sinkdpad = gst_element_get_static_pad (session->demux, "sink");
2693   GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
2694   lres = gst_pad_link (session->recv_rtp_src, sinkdpad);
2695   gst_object_unref (sinkdpad);
2696   if (lres != GST_PAD_LINK_OK)
2697     goto link_failed;
2698
2699   /* connect to the new-ssrc-pad signal of the SSRC demuxer */
2700   session->demux_newpad_sig = g_signal_connect (session->demux,
2701       "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
2702   session->demux_padremoved_sig = g_signal_connect (session->demux,
2703       "removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session);
2704
2705   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
2706   session->recv_rtp_sink_ghost =
2707       gst_ghost_pad_new_from_template (name, session->recv_rtp_sink, templ);
2708   gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE);
2709   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost);
2710
2711   return session->recv_rtp_sink_ghost;
2712
2713   /* ERRORS */
2714 no_name:
2715   {
2716     g_warning ("rtpbin: invalid name given");
2717     return NULL;
2718   }
2719 create_error:
2720   {
2721     /* create_session already warned */
2722     return NULL;
2723   }
2724 pad_failed:
2725   {
2726     g_warning ("rtpbin: failed to get session pad");
2727     return NULL;
2728   }
2729 link_failed:
2730   {
2731     g_warning ("rtpbin: failed to link pads");
2732     return NULL;
2733   }
2734 }
2735
2736 static void
2737 remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
2738 {
2739   if (session->demux_newpad_sig) {
2740     g_signal_handler_disconnect (session->demux, session->demux_newpad_sig);
2741     session->demux_newpad_sig = 0;
2742   }
2743   if (session->demux_padremoved_sig) {
2744     g_signal_handler_disconnect (session->demux, session->demux_padremoved_sig);
2745     session->demux_padremoved_sig = 0;
2746   }
2747   if (session->recv_rtp_src) {
2748     gst_object_unref (session->recv_rtp_src);
2749     session->recv_rtp_src = NULL;
2750   }
2751   if (session->recv_rtp_sink) {
2752     gst_element_release_request_pad (session->session, session->recv_rtp_sink);
2753     gst_object_unref (session->recv_rtp_sink);
2754     session->recv_rtp_sink = NULL;
2755   }
2756   if (session->recv_rtp_sink_ghost) {
2757     gst_pad_set_active (session->recv_rtp_sink_ghost, FALSE);
2758     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2759         session->recv_rtp_sink_ghost);
2760     session->recv_rtp_sink_ghost = NULL;
2761   }
2762 }
2763
2764 /* Create a pad for receiving RTCP for the session in @name. Must be called with
2765  * RTP_BIN_LOCK.
2766  */
2767 static GstPad *
2768 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
2769     const gchar * name)
2770 {
2771   guint sessid;
2772   GstRtpBinSession *session;
2773   GstPad *sinkdpad;
2774   GstPadLinkReturn lres;
2775
2776   /* first get the session number */
2777   if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1)
2778     goto no_name;
2779
2780   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
2781
2782   /* get or create the session */
2783   session = find_session_by_id (rtpbin, sessid);
2784   if (!session) {
2785     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
2786     /* create session now */
2787     session = create_session (rtpbin, sessid);
2788     if (session == NULL)
2789       goto create_error;
2790   }
2791
2792   /* check if pad was requested */
2793   if (session->recv_rtcp_sink_ghost != NULL)
2794     return session->recv_rtcp_sink_ghost;
2795
2796   /* get recv_rtp pad and store */
2797   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
2798   session->recv_rtcp_sink =
2799       gst_element_get_request_pad (session->session, "recv_rtcp_sink");
2800   if (session->recv_rtcp_sink == NULL)
2801     goto pad_failed;
2802
2803   /* get srcpad, link to SSRCDemux */
2804   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
2805   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
2806   if (session->sync_src == NULL)
2807     goto pad_failed;
2808
2809   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
2810   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
2811   lres = gst_pad_link (session->sync_src, sinkdpad);
2812   gst_object_unref (sinkdpad);
2813   if (lres != GST_PAD_LINK_OK)
2814     goto link_failed;
2815
2816   session->recv_rtcp_sink_ghost =
2817       gst_ghost_pad_new_from_template (name, session->recv_rtcp_sink, templ);
2818   gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE);
2819   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin),
2820       session->recv_rtcp_sink_ghost);
2821
2822   return session->recv_rtcp_sink_ghost;
2823
2824   /* ERRORS */
2825 no_name:
2826   {
2827     g_warning ("rtpbin: invalid name given");
2828     return NULL;
2829   }
2830 create_error:
2831   {
2832     /* create_session already warned */
2833     return NULL;
2834   }
2835 pad_failed:
2836   {
2837     g_warning ("rtpbin: failed to get session pad");
2838     return NULL;
2839   }
2840 link_failed:
2841   {
2842     g_warning ("rtpbin: failed to link pads");
2843     return NULL;
2844   }
2845 }
2846
2847 static void
2848 remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
2849 {
2850   if (session->recv_rtcp_sink_ghost) {
2851     gst_pad_set_active (session->recv_rtcp_sink_ghost, FALSE);
2852     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2853         session->recv_rtcp_sink_ghost);
2854     session->recv_rtcp_sink_ghost = NULL;
2855   }
2856   if (session->sync_src) {
2857     /* releasing the request pad should also unref the sync pad */
2858     gst_object_unref (session->sync_src);
2859     session->sync_src = NULL;
2860   }
2861   if (session->recv_rtcp_sink) {
2862     gst_element_release_request_pad (session->session, session->recv_rtcp_sink);
2863     gst_object_unref (session->recv_rtcp_sink);
2864     session->recv_rtcp_sink = NULL;
2865   }
2866 }
2867
2868 /* Create a pad for sending RTP for the session in @name. Must be called with
2869  * RTP_BIN_LOCK.
2870  */
2871 static GstPad *
2872 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2873 {
2874   gchar *gname;
2875   guint sessid;
2876   GstRtpBinSession *session;
2877   GstElementClass *klass;
2878
2879   /* first get the session number */
2880   if (name == NULL || sscanf (name, "send_rtp_sink_%u", &sessid) != 1)
2881     goto no_name;
2882
2883   /* get or create session */
2884   session = find_session_by_id (rtpbin, sessid);
2885   if (!session) {
2886     /* create session now */
2887     session = create_session (rtpbin, sessid);
2888     if (session == NULL)
2889       goto create_error;
2890   }
2891
2892   /* check if pad was requested */
2893   if (session->send_rtp_sink_ghost != NULL)
2894     return session->send_rtp_sink_ghost;
2895
2896   /* get send_rtp pad and store */
2897   session->send_rtp_sink =
2898       gst_element_get_request_pad (session->session, "send_rtp_sink");
2899   if (session->send_rtp_sink == NULL)
2900     goto pad_failed;
2901
2902   session->send_rtp_sink_ghost =
2903       gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ);
2904   gst_pad_set_active (session->send_rtp_sink_ghost, TRUE);
2905   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_sink_ghost);
2906
2907   /* get srcpad */
2908   session->send_rtp_src =
2909       gst_element_get_static_pad (session->session, "send_rtp_src");
2910   if (session->send_rtp_src == NULL)
2911     goto no_srcpad;
2912
2913   /* ghost the new source pad */
2914   klass = GST_ELEMENT_GET_CLASS (rtpbin);
2915   gname = g_strdup_printf ("send_rtp_src_%u", sessid);
2916   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u");
2917   session->send_rtp_src_ghost =
2918       gst_ghost_pad_new_from_template (gname, session->send_rtp_src, templ);
2919   gst_pad_set_active (session->send_rtp_src_ghost, TRUE);
2920   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost);
2921   g_free (gname);
2922
2923   return session->send_rtp_sink_ghost;
2924
2925   /* ERRORS */
2926 no_name:
2927   {
2928     g_warning ("rtpbin: invalid name given");
2929     return NULL;
2930   }
2931 create_error:
2932   {
2933     /* create_session already warned */
2934     return NULL;
2935   }
2936 pad_failed:
2937   {
2938     g_warning ("rtpbin: failed to get session pad for session %d", sessid);
2939     return NULL;
2940   }
2941 no_srcpad:
2942   {
2943     g_warning ("rtpbin: failed to get rtp source pad for session %d", sessid);
2944     return NULL;
2945   }
2946 }
2947
2948 static void
2949 remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
2950 {
2951   if (session->send_rtp_src_ghost) {
2952     gst_pad_set_active (session->send_rtp_src_ghost, FALSE);
2953     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2954         session->send_rtp_src_ghost);
2955     session->send_rtp_src_ghost = NULL;
2956   }
2957   if (session->send_rtp_src) {
2958     gst_object_unref (session->send_rtp_src);
2959     session->send_rtp_src = NULL;
2960   }
2961   if (session->send_rtp_sink) {
2962     gst_element_release_request_pad (GST_ELEMENT_CAST (session->session),
2963         session->send_rtp_sink);
2964     gst_object_unref (session->send_rtp_sink);
2965     session->send_rtp_sink = NULL;
2966   }
2967   if (session->send_rtp_sink_ghost) {
2968     gst_pad_set_active (session->send_rtp_sink_ghost, FALSE);
2969     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
2970         session->send_rtp_sink_ghost);
2971     session->send_rtp_sink_ghost = NULL;
2972   }
2973 }
2974
2975 /* Create a pad for sending RTCP for the session in @name. Must be called with
2976  * RTP_BIN_LOCK.
2977  */
2978 static GstPad *
2979 create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2980 {
2981   guint sessid;
2982   GstRtpBinSession *session;
2983
2984   /* first get the session number */
2985   if (name == NULL || sscanf (name, "send_rtcp_src_%u", &sessid) != 1)
2986     goto no_name;
2987
2988   /* get or create session */
2989   session = find_session_by_id (rtpbin, sessid);
2990   if (!session)
2991     goto no_session;
2992
2993   /* check if pad was requested */
2994   if (session->send_rtcp_src_ghost != NULL)
2995     return session->send_rtcp_src_ghost;
2996
2997   /* get rtcp_src pad and store */
2998   session->send_rtcp_src =
2999       gst_element_get_request_pad (session->session, "send_rtcp_src");
3000   if (session->send_rtcp_src == NULL)
3001     goto pad_failed;
3002
3003   session->send_rtcp_src_ghost =
3004       gst_ghost_pad_new_from_template (name, session->send_rtcp_src, templ);
3005   gst_pad_set_active (session->send_rtcp_src_ghost, TRUE);
3006   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtcp_src_ghost);
3007
3008   return session->send_rtcp_src_ghost;
3009
3010   /* ERRORS */
3011 no_name:
3012   {
3013     g_warning ("rtpbin: invalid name given");
3014     return NULL;
3015   }
3016 no_session:
3017   {
3018     g_warning ("rtpbin: session with id %d does not exist", sessid);
3019     return NULL;
3020   }
3021 pad_failed:
3022   {
3023     g_warning ("rtpbin: failed to get rtcp pad for session %d", sessid);
3024     return NULL;
3025   }
3026 }
3027
3028 static void
3029 remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3030 {
3031   if (session->send_rtcp_src_ghost) {
3032     gst_pad_set_active (session->send_rtcp_src_ghost, FALSE);
3033     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3034         session->send_rtcp_src_ghost);
3035     session->send_rtcp_src_ghost = NULL;
3036   }
3037   if (session->send_rtcp_src) {
3038     gst_element_release_request_pad (session->session, session->send_rtcp_src);
3039     gst_object_unref (session->send_rtcp_src);
3040     session->send_rtcp_src = NULL;
3041   }
3042 }
3043
3044 /* If the requested name is NULL we should create a name with
3045  * the session number assuming we want the lowest posible session
3046  * with a free pad like the template */
3047 static gchar *
3048 gst_rtp_bin_get_free_pad_name (GstElement * element, GstPadTemplate * templ)
3049 {
3050   gboolean name_found = FALSE;
3051   gint session = 0;
3052   GstIterator *pad_it = NULL;
3053   gchar *pad_name = NULL;
3054   GValue data = { 0, };
3055
3056   GST_DEBUG_OBJECT (element, "find a free pad name for template");
3057   while (!name_found) {
3058     gboolean done = FALSE;
3059
3060     g_free (pad_name);
3061     pad_name = g_strdup_printf (templ->name_template, session++);
3062     pad_it = gst_element_iterate_pads (GST_ELEMENT (element));
3063     name_found = TRUE;
3064     while (!done) {
3065       switch (gst_iterator_next (pad_it, &data)) {
3066         case GST_ITERATOR_OK:
3067         {
3068           GstPad *pad;
3069           gchar *name;
3070
3071           pad = g_value_get_object (&data);
3072           name = gst_pad_get_name (pad);
3073
3074           if (strcmp (name, pad_name) == 0) {
3075             done = TRUE;
3076             name_found = FALSE;
3077           }
3078           g_free (name);
3079           g_value_reset (&data);
3080           break;
3081         }
3082         case GST_ITERATOR_ERROR:
3083         case GST_ITERATOR_RESYNC:
3084           /* restart iteration */
3085           done = TRUE;
3086           name_found = FALSE;
3087           session = 0;
3088           break;
3089         case GST_ITERATOR_DONE:
3090           done = TRUE;
3091           break;
3092       }
3093     }
3094     g_value_unset (&data);
3095     gst_iterator_free (pad_it);
3096   }
3097
3098   GST_DEBUG_OBJECT (element, "free pad name found: '%s'", pad_name);
3099   return pad_name;
3100 }
3101
3102 /*
3103  */
3104 static GstPad *
3105 gst_rtp_bin_request_new_pad (GstElement * element,
3106     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3107 {
3108   GstRtpBin *rtpbin;
3109   GstElementClass *klass;
3110   GstPad *result;
3111
3112   gchar *pad_name = NULL;
3113
3114   g_return_val_if_fail (templ != NULL, NULL);
3115   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
3116
3117   rtpbin = GST_RTP_BIN (element);
3118   klass = GST_ELEMENT_GET_CLASS (element);
3119
3120   GST_RTP_BIN_LOCK (rtpbin);
3121
3122   if (name == NULL) {
3123     /* use a free pad name */
3124     pad_name = gst_rtp_bin_get_free_pad_name (element, templ);
3125   } else {
3126     /* use the provided name */
3127     pad_name = g_strdup (name);
3128   }
3129
3130   GST_DEBUG_OBJECT (rtpbin, "Trying to request a pad with name %s", pad_name);
3131
3132   /* figure out the template */
3133   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
3134     result = create_recv_rtp (rtpbin, templ, pad_name);
3135   } else if (templ == gst_element_class_get_pad_template (klass,
3136           "recv_rtcp_sink_%u")) {
3137     result = create_recv_rtcp (rtpbin, templ, pad_name);
3138   } else if (templ == gst_element_class_get_pad_template (klass,
3139           "send_rtp_sink_%u")) {
3140     result = create_send_rtp (rtpbin, templ, pad_name);
3141   } else if (templ == gst_element_class_get_pad_template (klass,
3142           "send_rtcp_src_%u")) {
3143     result = create_rtcp (rtpbin, templ, pad_name);
3144   } else
3145     goto wrong_template;
3146
3147   g_free (pad_name);
3148   GST_RTP_BIN_UNLOCK (rtpbin);
3149
3150   return result;
3151
3152   /* ERRORS */
3153 wrong_template:
3154   {
3155     g_free (pad_name);
3156     GST_RTP_BIN_UNLOCK (rtpbin);
3157     g_warning ("rtpbin: this is not our template");
3158     return NULL;
3159   }
3160 }
3161
3162 static void
3163 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
3164 {
3165   GstRtpBinSession *session;
3166   GstRtpBin *rtpbin;
3167
3168   g_return_if_fail (GST_IS_GHOST_PAD (pad));
3169   g_return_if_fail (GST_IS_RTP_BIN (element));
3170
3171   rtpbin = GST_RTP_BIN (element);
3172
3173   GST_RTP_BIN_LOCK (rtpbin);
3174   GST_DEBUG_OBJECT (rtpbin, "Trying to release pad %s:%s",
3175       GST_DEBUG_PAD_NAME (pad));
3176
3177   if (!(session = find_session_by_pad (rtpbin, pad)))
3178     goto unknown_pad;
3179
3180   if (session->recv_rtp_sink_ghost == pad) {
3181     remove_recv_rtp (rtpbin, session);
3182   } else if (session->recv_rtcp_sink_ghost == pad) {
3183     remove_recv_rtcp (rtpbin, session);
3184   } else if (session->send_rtp_sink_ghost == pad) {
3185     remove_send_rtp (rtpbin, session);
3186   } else if (session->send_rtcp_src_ghost == pad) {
3187     remove_rtcp (rtpbin, session);
3188   }
3189
3190   /* no more request pads, free the complete session */
3191   if (session->recv_rtp_sink_ghost == NULL
3192       && session->recv_rtcp_sink_ghost == NULL
3193       && session->send_rtp_sink_ghost == NULL
3194       && session->send_rtcp_src_ghost == NULL) {
3195     GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session);
3196     rtpbin->sessions = g_slist_remove (rtpbin->sessions, session);
3197     free_session (session, rtpbin);
3198   }
3199   GST_RTP_BIN_UNLOCK (rtpbin);
3200
3201   return;
3202
3203   /* ERROR */
3204 unknown_pad:
3205   {
3206     GST_RTP_BIN_UNLOCK (rtpbin);
3207     g_warning ("rtpbin: %s:%s is not one of our request pads",
3208         GST_DEBUG_PAD_NAME (pad));
3209     return;
3210   }
3211 }