f0b8b4e33aca8f4354b68ad48d8d058f0241a4dc
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-rtpbin
22  * @see_also: rtpjitterbuffer, rtpsession, rtpptdemux, rtpssrcdemux
23  *
24  * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
25  * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
26  * RTP sessions that will be synchronized together using RTCP SR packets.
27  *
28  * #GstRtpBin is configured with a number of request pads that define the
29  * functionality that is activated, similar to the #GstRtpSession element.
30  *
31  * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_\%u pad. The session
32  * number must be specified in the pad name.
33  * Data received on the recv_rtp_sink_\%u pad will be processed in the #GstRtpSession
34  * manager and after being validated forwarded on #GstRtpSsrcDemux element. Each
35  * RTP stream is demuxed based on the SSRC and send to a #GstRtpJitterBuffer. After
36  * the packets are released from the jitterbuffer, they will be forwarded to a
37  * #GstRtpPtDemux element. The #GstRtpPtDemux element will demux the packets based
38  * on the payload type and will create a unique pad recv_rtp_src_\%u_\%u_\%u on
39  * rtpbin with the session number, SSRC and payload type respectively as the pad
40  * name.
41  *
42  * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_\%u pad. The
43  * session number must be specified in the pad name.
44  *
45  * If you want the session manager to generate and send RTCP packets, request
46  * the send_rtcp_src_\%u pad with the session number in the pad name. Packet pushed
47  * on this pad contain SR/RR RTCP reports that should be sent to all participants
48  * in the session.
49  *
50  * To use #GstRtpBin as a sender, request a send_rtp_sink_\%u pad, which will
51  * automatically create a send_rtp_src_\%u pad. If the session number is not provided,
52  * the pad from the lowest available session will be returned. The session manager will modify the
53  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
54  * send_rtp_src_\%u pad after updating its internal state.
55  *
56  * The session manager needs the clock-rate of the payload types it is handling
57  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
58  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
59  * signal.
60  *
61  * Access to the internal statistics of rtpbin is provided with the
62  * get-internal-session property. This action signal gives access to the
63  * RTPSession object which further provides action signals to retrieve the
64  * internal source and other sources.
65  *
66  * #GstRtpBin also has signals (#GstRtpBin::request-rtp-encoder,
67  * #GstRtpBin::request-rtp-decoder, #GstRtpBin::request-rtcp-encoder and
68  * #GstRtpBin::request-rtp-decoder) to dynamically request for RTP and RTCP encoders
69  * and decoders in order to support SRTP. The encoders must provide the pads
70  * rtp_sink_\%u and rtp_src_\%u for RTP and rtcp_sink_\%u and rtcp_src_\%u for
71  * RTCP. The session number will be used in the pad name. The decoders must provide
72  * rtp_sink and rtp_src for RTP and rtcp_sink and rtcp_src for RTCP. The decoders will
73  * be placed before the #GstRtpSession element, thus they must support SSRC demuxing
74  * internally.
75  *
76  * #GstRtpBin has signals (#GstRtpBin::request-aux-sender and
77  * #GstRtpBin::request-aux-receiver to dynamically request an element that can be
78  * used to create or merge additional RTP streams. AUX elements are needed to
79  * implement FEC or retransmission (such as RFC 4588). An AUX sender must have one
80  * sink_\%u pad that matches the sessionid in the signal and it should have 1 or
81  * more src_\%u pads. For each src_%\u pad, a session will be made (if needed)
82  * and the pad will be linked to the session send_rtp_sink pad. Each session will
83  * then expose its source pad as send_rtp_src_\%u on #GstRtpBin.
84  * An AUX receiver has 1 src_\%u pad that much match the sessionid in the signal
85  * and 1 or more sink_\%u pads. A session will be made for each sink_\%u pad
86  * when the corresponding recv_rtp_sink_\%u pad is requested on #GstRtpBin.
87  *
88  * <refsect2>
89  * <title>Example pipelines</title>
90  * |[
91  * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
92  *     rtpbin ! rtptheoradepay ! theoradec ! xvimagesink
93  * ]| Receive RTP data from port 5000 and send to the session 0 in rtpbin.
94  * |[
95  * gst-launch-1.0 rtpbin name=rtpbin \
96  *         v4l2src ! videoconvert ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
97  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
98  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
99  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
100  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
101  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
102  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
103  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
104  * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
105  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
106  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
107  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
108  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
109  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
110  * is received on port 5007. Since RTCP packets from the sender should be sent
111  * as soon as possible and do not participate in preroll, sync=false and
112  * async=false is configured on udpsink
113  * |[
114  * gst-launch-1.0 -v rtpbin name=rtpbin                                          \
115  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
116  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
117  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
118  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
119  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
120  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
121  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
122  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
123  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
124  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
125  * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
126  * decode and display the video.
127  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
128  * decode and play the audio.
129  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
130  * session 1 on port 5003. These packets will be used for session management and
131  * synchronisation.
132  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
133  * on port 5007.
134  * </refsect2>
135  */
136
137 #ifdef HAVE_CONFIG_H
138 #include "config.h"
139 #endif
140 #include <stdio.h>
141 #include <string.h>
142
143 #include <gst/rtp/gstrtpbuffer.h>
144 #include <gst/rtp/gstrtcpbuffer.h>
145
146 #include "gstrtpbin.h"
147 #include "rtpsession.h"
148 #include "gstrtpsession.h"
149 #include "gstrtpjitterbuffer.h"
150
151 #include <gst/glib-compat-private.h>
152
153 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
154 #define GST_CAT_DEFAULT gst_rtp_bin_debug
155
156 /* sink pads */
157 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
158     GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
159     GST_PAD_SINK,
160     GST_PAD_REQUEST,
161     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
162     );
163
164 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
165     GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
166     GST_PAD_SINK,
167     GST_PAD_REQUEST,
168     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
169     );
170
171 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
172 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%u",
173     GST_PAD_SINK,
174     GST_PAD_REQUEST,
175     GST_STATIC_CAPS ("application/x-rtp")
176     );
177
178 /* src pads */
179 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
180 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
181     GST_PAD_SRC,
182     GST_PAD_SOMETIMES,
183     GST_STATIC_CAPS ("application/x-rtp")
184     );
185
186 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
187     GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%u",
188     GST_PAD_SRC,
189     GST_PAD_REQUEST,
190     GST_STATIC_CAPS ("application/x-rtcp;application/x-srtcp")
191     );
192
193 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
194     GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%u",
195     GST_PAD_SRC,
196     GST_PAD_SOMETIMES,
197     GST_STATIC_CAPS ("application/x-rtp;application/x-srtp")
198     );
199
200 #define GST_RTP_BIN_GET_PRIVATE(obj)  \
201    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
202
203 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock (&(bin)->priv->bin_lock)
204 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock (&(bin)->priv->bin_lock)
205
206 /* lock to protect dynamic callbacks, like pad-added and new ssrc. */
207 #define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock (&(bin)->priv->dyn_lock)
208 #define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock (&(bin)->priv->dyn_lock)
209
210 /* lock for shutdown */
211 #define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
212 G_STMT_START {                                   \
213   if (g_atomic_int_get (&bin->priv->shutdown))   \
214     goto label;                                  \
215   GST_RTP_BIN_DYN_LOCK (bin);                    \
216   if (g_atomic_int_get (&bin->priv->shutdown)) { \
217     GST_RTP_BIN_DYN_UNLOCK (bin);                \
218     goto label;                                  \
219   }                                              \
220 } G_STMT_END
221
222 /* unlock for shutdown */
223 #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
224   GST_RTP_BIN_DYN_UNLOCK (bin);                  \
225
226 struct _GstRtpBinPrivate
227 {
228   GMutex bin_lock;
229
230   /* lock protecting dynamic adding/removing */
231   GMutex dyn_lock;
232
233   /* if we are shutting down or not */
234   gint shutdown;
235
236   gboolean autoremove;
237
238   /* UNIX (ntp) time of last SR sync used */
239   guint64 last_unix;
240
241   /* list of extra elements */
242   GList *elements;
243 };
244
245 /* signals and args */
246 enum
247 {
248   SIGNAL_REQUEST_PT_MAP,
249   SIGNAL_PAYLOAD_TYPE_CHANGE,
250   SIGNAL_CLEAR_PT_MAP,
251   SIGNAL_RESET_SYNC,
252   SIGNAL_GET_INTERNAL_SESSION,
253
254   SIGNAL_ON_NEW_SSRC,
255   SIGNAL_ON_SSRC_COLLISION,
256   SIGNAL_ON_SSRC_VALIDATED,
257   SIGNAL_ON_SSRC_ACTIVE,
258   SIGNAL_ON_SSRC_SDES,
259   SIGNAL_ON_BYE_SSRC,
260   SIGNAL_ON_BYE_TIMEOUT,
261   SIGNAL_ON_TIMEOUT,
262   SIGNAL_ON_SENDER_TIMEOUT,
263   SIGNAL_ON_NPT_STOP,
264
265   SIGNAL_REQUEST_RTP_ENCODER,
266   SIGNAL_REQUEST_RTP_DECODER,
267   SIGNAL_REQUEST_RTCP_ENCODER,
268   SIGNAL_REQUEST_RTCP_DECODER,
269
270   SIGNAL_NEW_JITTERBUFFER,
271
272   SIGNAL_REQUEST_AUX_SENDER,
273   SIGNAL_REQUEST_AUX_RECEIVER,
274
275   LAST_SIGNAL
276 };
277
278 #define DEFAULT_LATENCY_MS           200
279 #define DEFAULT_DROP_ON_LATENCY      FALSE
280 #define DEFAULT_SDES                 NULL
281 #define DEFAULT_DO_LOST              FALSE
282 #define DEFAULT_IGNORE_PT            FALSE
283 #define DEFAULT_NTP_SYNC             FALSE
284 #define DEFAULT_AUTOREMOVE           FALSE
285 #define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
286 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
287 #define DEFAULT_RTCP_SYNC            GST_RTP_BIN_RTCP_SYNC_ALWAYS
288 #define DEFAULT_RTCP_SYNC_INTERVAL   0
289 #define DEFAULT_DO_SYNC_EVENT        FALSE
290 #define DEFAULT_DO_RETRANSMISSION    FALSE
291
292 enum
293 {
294   PROP_0,
295   PROP_LATENCY,
296   PROP_DROP_ON_LATENCY,
297   PROP_SDES,
298   PROP_DO_LOST,
299   PROP_IGNORE_PT,
300   PROP_NTP_SYNC,
301   PROP_RTCP_SYNC,
302   PROP_RTCP_SYNC_INTERVAL,
303   PROP_AUTOREMOVE,
304   PROP_BUFFER_MODE,
305   PROP_USE_PIPELINE_CLOCK,
306   PROP_DO_SYNC_EVENT,
307   PROP_DO_RETRANSMISSION
308 };
309
310 #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
311 static GType
312 gst_rtp_bin_rtcp_sync_get_type (void)
313 {
314   static GType rtcp_sync_type = 0;
315   static const GEnumValue rtcp_sync_types[] = {
316     {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
317     {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
318     {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
319     {0, NULL, NULL},
320   };
321
322   if (!rtcp_sync_type) {
323     rtcp_sync_type = g_enum_register_static ("GstRTCPSync", rtcp_sync_types);
324   }
325   return rtcp_sync_type;
326 }
327
328 /* helper objects */
329 typedef struct _GstRtpBinSession GstRtpBinSession;
330 typedef struct _GstRtpBinStream GstRtpBinStream;
331 typedef struct _GstRtpBinClient GstRtpBinClient;
332
333 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
334
335 static GstCaps *pt_map_requested (GstElement * element, guint pt,
336     GstRtpBinSession * session);
337 static void payload_type_change (GstElement * element, guint pt,
338     GstRtpBinSession * session);
339 static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
340 static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
341 static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session);
342 static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session);
343 static void free_client (GstRtpBinClient * client, GstRtpBin * bin);
344 static void free_stream (GstRtpBinStream * stream, GstRtpBin * bin);
345
346 /* Manages the RTP stream for one SSRC.
347  *
348  * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer.
349  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
350  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
351  * together (see below).
352  */
353 struct _GstRtpBinStream
354 {
355   /* the SSRC of this stream */
356   guint32 ssrc;
357
358   /* parent bin */
359   GstRtpBin *bin;
360
361   /* the session this SSRC belongs to */
362   GstRtpBinSession *session;
363
364   /* the jitterbuffer of the SSRC */
365   GstElement *buffer;
366   gulong buffer_handlesync_sig;
367   gulong buffer_ptreq_sig;
368   gulong buffer_ntpstop_sig;
369   gint percent;
370
371   /* the PT demuxer of the SSRC */
372   GstElement *demux;
373   gulong demux_newpad_sig;
374   gulong demux_padremoved_sig;
375   gulong demux_ptreq_sig;
376   gulong demux_ptchange_sig;
377
378   /* if we have calculated a valid rt_delta for this stream */
379   gboolean have_sync;
380   /* mapping to local RTP and NTP time */
381   gint64 rt_delta;
382   gint64 rtp_delta;
383   /* base rtptime in gst time */
384   gint64 clock_base;
385 };
386
387 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock (&(sess)->lock)
388 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock (&(sess)->lock)
389
390 /* Manages the receiving end of the packets.
391  *
392  * There is one such structure for each RTP session (audio/video/...).
393  * We get the RTP/RTCP packets and stuff them into the session manager. From
394  * there they are pushed into an SSRC demuxer that splits the stream based on
395  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
396  * the GstRtpBinStream above).
397  */
398 struct _GstRtpBinSession
399 {
400   /* session id */
401   gint id;
402   /* the parent bin */
403   GstRtpBin *bin;
404   /* the session element */
405   GstElement *session;
406   /* the SSRC demuxer */
407   GstElement *demux;
408   gulong demux_newpad_sig;
409   gulong demux_padremoved_sig;
410
411   GMutex lock;
412
413   /* list of GstRtpBinStream */
414   GSList *streams;
415
416   /* list of elements */
417   GSList *elements;
418
419   /* mapping of payload type to caps */
420   GHashTable *ptmap;
421
422   /* the pads of the session */
423   GstPad *recv_rtp_sink;
424   GstPad *recv_rtp_sink_ghost;
425   GstPad *recv_rtp_src;
426   GstPad *recv_rtcp_sink;
427   GstPad *recv_rtcp_sink_ghost;
428   GstPad *sync_src;
429   GstPad *send_rtp_sink;
430   GstPad *send_rtp_sink_ghost;
431   GstPad *send_rtp_src;
432   GstPad *send_rtp_src_ghost;
433   GstPad *send_rtcp_src;
434   GstPad *send_rtcp_src_ghost;
435 };
436
437 /* Manages the RTP streams that come from one client and should therefore be
438  * synchronized.
439  */
440 struct _GstRtpBinClient
441 {
442   /* the common CNAME for the streams */
443   gchar *cname;
444   guint cname_len;
445
446   /* the streams */
447   guint nstreams;
448   GSList *streams;
449 };
450
451 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
452 static GstRtpBinSession *
453 find_session_by_id (GstRtpBin * rtpbin, gint id)
454 {
455   GSList *walk;
456
457   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
458     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
459
460     if (sess->id == id)
461       return sess;
462   }
463   return NULL;
464 }
465
466 /* find a session with the given request pad. Must be called with RTP_BIN_LOCK */
467 static GstRtpBinSession *
468 find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
469 {
470   GSList *walk;
471
472   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
473     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
474
475     if ((sess->recv_rtp_sink_ghost == pad) ||
476         (sess->recv_rtcp_sink_ghost == pad) ||
477         (sess->send_rtp_sink_ghost == pad)
478         || (sess->send_rtcp_src_ghost == pad))
479       return sess;
480   }
481   return NULL;
482 }
483
484 static void
485 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
486 {
487   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
488       sess->id, ssrc);
489 }
490
491 static void
492 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
493 {
494   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
495       sess->id, ssrc);
496 }
497
498 static void
499 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
500 {
501   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
502       sess->id, ssrc);
503 }
504
505 static void
506 on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
507 {
508   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
509       sess->id, ssrc);
510 }
511
512 static void
513 on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
514 {
515   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
516       sess->id, ssrc);
517 }
518
519 static void
520 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
521 {
522   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
523       sess->id, ssrc);
524 }
525
526 static void
527 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
528 {
529   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
530       sess->id, ssrc);
531
532   if (sess->bin->priv->autoremove)
533     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
534 }
535
536 static void
537 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
538 {
539   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
540       sess->id, ssrc);
541
542   if (sess->bin->priv->autoremove)
543     g_signal_emit_by_name (sess->demux, "clear-ssrc", ssrc, NULL);
544 }
545
546 static void
547 on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
548 {
549   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT], 0,
550       sess->id, ssrc);
551 }
552
553 static void
554 on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
555 {
556   g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0,
557       stream->session->id, stream->ssrc);
558 }
559
560 /* must be called with the SESSION lock */
561 static GstRtpBinStream *
562 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
563 {
564   GSList *walk;
565
566   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
567     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
568
569     if (stream->ssrc == ssrc)
570       return stream;
571   }
572   return NULL;
573 }
574
575 static void
576 ssrc_demux_pad_removed (GstElement * element, guint ssrc, GstPad * pad,
577     GstRtpBinSession * session)
578 {
579   GstRtpBinStream *stream = NULL;
580   GstRtpBin *rtpbin;
581
582   rtpbin = session->bin;
583
584   GST_RTP_BIN_LOCK (rtpbin);
585
586   GST_RTP_SESSION_LOCK (session);
587   if ((stream = find_stream_by_ssrc (session, ssrc)))
588     session->streams = g_slist_remove (session->streams, stream);
589   GST_RTP_SESSION_UNLOCK (session);
590
591   if (stream)
592     free_stream (stream, rtpbin);
593
594   GST_RTP_BIN_UNLOCK (rtpbin);
595 }
596
597 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
598 static GstRtpBinSession *
599 create_session (GstRtpBin * rtpbin, gint id)
600 {
601   GstRtpBinSession *sess;
602   GstElement *session, *demux;
603   GstState target;
604
605   if (!(session = gst_element_factory_make ("rtpsession", NULL)))
606     goto no_session;
607
608   if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
609     goto no_demux;
610
611   sess = g_new0 (GstRtpBinSession, 1);
612   g_mutex_init (&sess->lock);
613   sess->id = id;
614   sess->bin = rtpbin;
615   sess->session = session;
616   sess->demux = demux;
617   sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
618       (GDestroyNotify) gst_caps_unref);
619   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
620
621   /* configure SDES items */
622   GST_OBJECT_LOCK (rtpbin);
623   g_object_set (session, "sdes", rtpbin->sdes, "use-pipeline-clock",
624       rtpbin->use_pipeline_clock, NULL);
625   GST_OBJECT_UNLOCK (rtpbin);
626
627   /* provide clock_rate to the session manager when needed */
628   g_signal_connect (session, "request-pt-map",
629       (GCallback) pt_map_requested, sess);
630
631   g_signal_connect (sess->session, "on-new-ssrc",
632       (GCallback) on_new_ssrc, sess);
633   g_signal_connect (sess->session, "on-ssrc-collision",
634       (GCallback) on_ssrc_collision, sess);
635   g_signal_connect (sess->session, "on-ssrc-validated",
636       (GCallback) on_ssrc_validated, sess);
637   g_signal_connect (sess->session, "on-ssrc-active",
638       (GCallback) on_ssrc_active, sess);
639   g_signal_connect (sess->session, "on-ssrc-sdes",
640       (GCallback) on_ssrc_sdes, sess);
641   g_signal_connect (sess->session, "on-bye-ssrc",
642       (GCallback) on_bye_ssrc, sess);
643   g_signal_connect (sess->session, "on-bye-timeout",
644       (GCallback) on_bye_timeout, sess);
645   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
646   g_signal_connect (sess->session, "on-sender-timeout",
647       (GCallback) on_sender_timeout, sess);
648
649   gst_bin_add (GST_BIN_CAST (rtpbin), session);
650   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
651
652   GST_OBJECT_LOCK (rtpbin);
653   target = GST_STATE_TARGET (rtpbin);
654   GST_OBJECT_UNLOCK (rtpbin);
655
656   /* change state only to what's needed */
657   gst_element_set_state (demux, target);
658   gst_element_set_state (session, target);
659
660   return sess;
661
662   /* ERRORS */
663 no_session:
664   {
665     g_warning ("rtpbin: could not create rtpsession element");
666     return NULL;
667   }
668 no_demux:
669   {
670     gst_object_unref (session);
671     g_warning ("rtpbin: could not create rtpssrcdemux element");
672     return NULL;
673   }
674 }
675
676 static gboolean
677 bin_manage_element (GstRtpBin * bin, GstElement * element)
678 {
679   GstRtpBinPrivate *priv = bin->priv;
680
681   if (g_list_find (priv->elements, element)) {
682     GST_DEBUG_OBJECT (bin, "requested element %p already in bin", element);
683   } else {
684     GST_DEBUG_OBJECT (bin, "adding requested element %p", element);
685     if (!gst_bin_add (GST_BIN_CAST (bin), element))
686       goto add_failed;
687     if (!gst_element_sync_state_with_parent (element))
688       GST_WARNING_OBJECT (bin, "unable to sync element state with rtpbin");
689   }
690   /* we add the element multiple times, each we need an equal number of
691    * removes to really remove the element from the bin */
692   priv->elements = g_list_prepend (priv->elements, element);
693
694   return TRUE;
695
696   /* ERRORS */
697 add_failed:
698   {
699     GST_WARNING_OBJECT (bin, "unable to add element");
700     return FALSE;
701   }
702 }
703
704 static void
705 remove_bin_element (GstElement * element, GstRtpBin * bin)
706 {
707   GstRtpBinPrivate *priv = bin->priv;
708   GList *find;
709
710   find = g_list_find (priv->elements, element);
711   if (find) {
712     priv->elements = g_list_delete_link (priv->elements, find);
713
714     if (!g_list_find (priv->elements, element))
715       gst_bin_remove (GST_BIN_CAST (bin), element);
716     else
717       gst_object_unref (element);
718   }
719 }
720
721 /* called with RTP_BIN_LOCK */
722 static void
723 free_session (GstRtpBinSession * sess, GstRtpBin * bin)
724 {
725   GST_DEBUG_OBJECT (bin, "freeing session %p", sess);
726
727   gst_element_set_locked_state (sess->demux, TRUE);
728   gst_element_set_locked_state (sess->session, TRUE);
729
730   gst_element_set_state (sess->demux, GST_STATE_NULL);
731   gst_element_set_state (sess->session, GST_STATE_NULL);
732
733   remove_recv_rtp (bin, sess);
734   remove_recv_rtcp (bin, sess);
735   remove_send_rtp (bin, sess);
736   remove_rtcp (bin, sess);
737
738   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
739   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
740
741   g_slist_foreach (sess->elements, (GFunc) remove_bin_element, bin);
742   g_slist_free (sess->elements);
743
744   g_slist_foreach (sess->streams, (GFunc) free_stream, bin);
745   g_slist_free (sess->streams);
746
747   g_mutex_clear (&sess->lock);
748   g_hash_table_destroy (sess->ptmap);
749
750   g_free (sess);
751 }
752
753 /* get the payload type caps for the specific payload @pt in @session */
754 static GstCaps *
755 get_pt_map (GstRtpBinSession * session, guint pt)
756 {
757   GstCaps *caps = NULL;
758   GstRtpBin *bin;
759   GValue ret = { 0 };
760   GValue args[3] = { {0}, {0}, {0} };
761
762   GST_DEBUG ("searching pt %d in cache", pt);
763
764   GST_RTP_SESSION_LOCK (session);
765
766   /* first look in the cache */
767   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
768   if (caps) {
769     gst_caps_ref (caps);
770     goto done;
771   }
772
773   bin = session->bin;
774
775   GST_DEBUG ("emiting signal for pt %d in session %d", pt, session->id);
776
777   /* not in cache, send signal to request caps */
778   g_value_init (&args[0], GST_TYPE_ELEMENT);
779   g_value_set_object (&args[0], bin);
780   g_value_init (&args[1], G_TYPE_UINT);
781   g_value_set_uint (&args[1], session->id);
782   g_value_init (&args[2], G_TYPE_UINT);
783   g_value_set_uint (&args[2], pt);
784
785   g_value_init (&ret, GST_TYPE_CAPS);
786   g_value_set_boxed (&ret, NULL);
787
788   GST_RTP_SESSION_UNLOCK (session);
789
790   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
791
792   GST_RTP_SESSION_LOCK (session);
793
794   g_value_unset (&args[0]);
795   g_value_unset (&args[1]);
796   g_value_unset (&args[2]);
797
798   /* look in the cache again because we let the lock go */
799   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
800   if (caps) {
801     gst_caps_ref (caps);
802     g_value_unset (&ret);
803     goto done;
804   }
805
806   caps = (GstCaps *) g_value_dup_boxed (&ret);
807   g_value_unset (&ret);
808   if (!caps)
809     goto no_caps;
810
811   GST_DEBUG ("caching pt %d as %" GST_PTR_FORMAT, pt, caps);
812
813   /* store in cache, take additional ref */
814   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
815       gst_caps_ref (caps));
816
817 done:
818   GST_RTP_SESSION_UNLOCK (session);
819
820   return caps;
821
822   /* ERRORS */
823 no_caps:
824   {
825     GST_RTP_SESSION_UNLOCK (session);
826     GST_DEBUG ("no pt map could be obtained");
827     return NULL;
828   }
829 }
830
831 static gboolean
832 return_true (gpointer key, gpointer value, gpointer user_data)
833 {
834   return TRUE;
835 }
836
837 static void
838 gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
839 {
840   GSList *clients, *streams;
841
842   GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");
843
844   GST_RTP_BIN_LOCK (rtpbin);
845   for (clients = rtpbin->clients; clients; clients = g_slist_next (clients)) {
846     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
847
848     /* reset sync on all streams for this client */
849     for (streams = client->streams; streams; streams = g_slist_next (streams)) {
850       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
851
852       /* make use require a new SR packet for this stream before we attempt new
853        * lip-sync */
854       stream->have_sync = FALSE;
855       stream->rt_delta = 0;
856       stream->rtp_delta = 0;
857       stream->clock_base = -100 * GST_SECOND;
858     }
859   }
860   GST_RTP_BIN_UNLOCK (rtpbin);
861 }
862
863 static void
864 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
865 {
866   GSList *sessions, *streams;
867
868   GST_RTP_BIN_LOCK (bin);
869   GST_DEBUG_OBJECT (bin, "clearing pt map");
870   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
871     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
872
873     GST_DEBUG_OBJECT (bin, "clearing session %p", session);
874     g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
875
876     GST_RTP_SESSION_LOCK (session);
877     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
878
879     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
880       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
881
882       GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
883       g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
884       if (stream->demux)
885         g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
886     }
887     GST_RTP_SESSION_UNLOCK (session);
888   }
889   GST_RTP_BIN_UNLOCK (bin);
890
891   /* reset sync too */
892   gst_rtp_bin_reset_sync (bin);
893 }
894
895 static RTPSession *
896 gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
897 {
898   RTPSession *internal_session = NULL;
899   GstRtpBinSession *session;
900
901   GST_RTP_BIN_LOCK (bin);
902   GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %d",
903       session_id);
904   session = find_session_by_id (bin, (gint) session_id);
905   if (session) {
906     g_object_get (session->session, "internal-session", &internal_session,
907         NULL);
908   }
909   GST_RTP_BIN_UNLOCK (bin);
910
911   return internal_session;
912 }
913
914 static GstElement *
915 gst_rtp_bin_request_encoder (GstRtpBin * bin, guint session_id)
916 {
917   GST_DEBUG_OBJECT (bin, "return NULL encoder");
918   return NULL;
919 }
920
921 static GstElement *
922 gst_rtp_bin_request_decoder (GstRtpBin * bin, guint session_id)
923 {
924   GST_DEBUG_OBJECT (bin, "return NULL decoder");
925   return NULL;
926 }
927
928 static void
929 gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
930     const gchar * name, const GValue * value)
931 {
932   GSList *sessions, *streams;
933
934   GST_RTP_BIN_LOCK (bin);
935   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
936     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
937
938     GST_RTP_SESSION_LOCK (session);
939     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
940       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
941
942       g_object_set_property (G_OBJECT (stream->buffer), name, value);
943     }
944     GST_RTP_SESSION_UNLOCK (session);
945   }
946   GST_RTP_BIN_UNLOCK (bin);
947 }
948
949 /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
950 static GstRtpBinClient *
951 get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
952 {
953   GstRtpBinClient *result = NULL;
954   GSList *walk;
955
956   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
957     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
958
959     if (len != client->cname_len)
960       continue;
961
962     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
963       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
964           client->cname);
965       result = client;
966       break;
967     }
968   }
969
970   /* nothing found, create one */
971   if (result == NULL) {
972     result = g_new0 (GstRtpBinClient, 1);
973     result->cname = g_strndup ((gchar *) data, len);
974     result->cname_len = len;
975     bin->clients = g_slist_prepend (bin->clients, result);
976     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
977         result->cname);
978   }
979   return result;
980 }
981
982 static void
983 free_client (GstRtpBinClient * client, GstRtpBin * bin)
984 {
985   GST_DEBUG_OBJECT (bin, "freeing client %p", client);
986   g_slist_free (client->streams);
987   g_free (client->cname);
988   g_free (client);
989 }
990
991 static void
992 get_current_times (GstRtpBin * bin, GstClockTime * running_time,
993     guint64 * ntpnstime)
994 {
995   guint64 ntpns;
996   GstClock *clock;
997   GstClockTime base_time, rt, clock_time;
998
999   GST_OBJECT_LOCK (bin);
1000   if ((clock = GST_ELEMENT_CLOCK (bin))) {
1001     base_time = GST_ELEMENT_CAST (bin)->base_time;
1002     gst_object_ref (clock);
1003     GST_OBJECT_UNLOCK (bin);
1004
1005     clock_time = gst_clock_get_time (clock);
1006
1007     if (bin->use_pipeline_clock) {
1008       ntpns = clock_time - base_time;
1009     } else {
1010       GTimeVal current;
1011
1012       /* get current NTP time */
1013       g_get_current_time (&current);
1014       ntpns = GST_TIMEVAL_TO_TIME (current);
1015     }
1016
1017     /* add constant to convert from 1970 based time to 1900 based time */
1018     ntpns += (2208988800LL * GST_SECOND);
1019
1020     /* get current clock time and convert to running time */
1021     rt = clock_time - base_time;
1022
1023     gst_object_unref (clock);
1024   } else {
1025     GST_OBJECT_UNLOCK (bin);
1026     rt = -1;
1027     ntpns = -1;
1028   }
1029   if (running_time)
1030     *running_time = rt;
1031   if (ntpnstime)
1032     *ntpnstime = ntpns;
1033 }
1034
1035 static void
1036 stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
1037     gint64 ts_offset, gboolean check)
1038 {
1039   gint64 prev_ts_offset;
1040
1041   g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
1042
1043   /* delta changed, see how much */
1044   if (prev_ts_offset != ts_offset) {
1045     gint64 diff;
1046
1047     diff = prev_ts_offset - ts_offset;
1048
1049     GST_DEBUG_OBJECT (bin,
1050         "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
1051         ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff);
1052
1053     if (check) {
1054       /* only change diff when it changed more than 4 milliseconds. This
1055        * compensates for rounding errors in NTP to RTP timestamp
1056        * conversions */
1057       if (ABS (diff) < 4 * GST_MSECOND) {
1058         GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
1059         return;
1060       }
1061       if (ABS (diff) > (3 * GST_SECOND)) {
1062         GST_WARNING_OBJECT (bin, "offset unusually large, ignoring");
1063         return;
1064       }
1065     }
1066     g_object_set (stream->buffer, "ts-offset", ts_offset, NULL);
1067   }
1068   GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
1069       stream->ssrc, ts_offset);
1070 }
1071
1072 static void
1073 gst_rtp_bin_send_sync_event (GstRtpBinStream * stream)
1074 {
1075   if (stream->bin->send_sync_event) {
1076     GstEvent *event;
1077     GstPad *srcpad;
1078
1079     GST_DEBUG_OBJECT (stream->bin,
1080         "sending GstRTCPSRReceived event downstream");
1081
1082     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1083         gst_structure_new_empty ("GstRTCPSRReceived"));
1084
1085     srcpad = gst_element_get_static_pad (stream->buffer, "src");
1086     gst_pad_push_event (srcpad, event);
1087     gst_object_unref (srcpad);
1088   }
1089 }
1090
1091 /* associate a stream to the given CNAME. This will make sure all streams for
1092  * that CNAME are synchronized together.
1093  * Must be called with GST_RTP_BIN_LOCK */
1094 static void
1095 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
1096     guint8 * data, guint64 ntptime, guint64 last_extrtptime,
1097     guint64 base_rtptime, guint64 base_time, guint clock_rate,
1098     gint64 rtp_clock_base)
1099 {
1100   GstRtpBinClient *client;
1101   gboolean created;
1102   GSList *walk;
1103   guint64 local_rt;
1104   guint64 local_rtp;
1105   GstClockTime running_time;
1106   guint64 ntpnstime;
1107   gint64 ntpdiff, rtdiff;
1108   guint64 last_unix;
1109
1110   /* first find or create the CNAME */
1111   client = get_client (bin, len, data, &created);
1112
1113   /* find stream in the client */
1114   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1115     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1116
1117     if (ostream == stream)
1118       break;
1119   }
1120   /* not found, add it to the list */
1121   if (walk == NULL) {
1122     GST_DEBUG_OBJECT (bin,
1123         "new association of SSRC %08x with client %p with CNAME %s",
1124         stream->ssrc, client, client->cname);
1125     client->streams = g_slist_prepend (client->streams, stream);
1126     client->nstreams++;
1127   } else {
1128     GST_DEBUG_OBJECT (bin,
1129         "found association of SSRC %08x with client %p with CNAME %s",
1130         stream->ssrc, client, client->cname);
1131   }
1132
1133   if (!GST_CLOCK_TIME_IS_VALID (last_extrtptime)) {
1134     GST_DEBUG_OBJECT (bin, "invalidated sync data");
1135     if (bin->rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1136       /* we don't need that data, so carry on,
1137        * but make some values look saner */
1138       last_extrtptime = base_rtptime;
1139     } else {
1140       /* nothing we can do with this data in this case */
1141       GST_DEBUG_OBJECT (bin, "bailing out");
1142       return;
1143     }
1144   }
1145
1146   /* Take the extended rtptime we found in the SR packet and map it to the
1147    * local rtptime. The local rtp time is used to construct timestamps on the
1148    * buffers so we will calculate what running_time corresponds to the RTP
1149    * timestamp in the SR packet. */
1150   local_rtp = last_extrtptime - base_rtptime;
1151
1152   GST_DEBUG_OBJECT (bin,
1153       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
1154       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, "
1155       "clock-base %" G_GINT64_FORMAT, base_rtptime,
1156       last_extrtptime, local_rtp, clock_rate, rtp_clock_base);
1157
1158   /* calculate local RTP time in gstreamer timestamp, we essentially perform the
1159    * same conversion that a jitterbuffer would use to convert an rtp timestamp
1160    * into a corresponding gstreamer timestamp. Note that the base_time also
1161    * contains the drift between sender and receiver. */
1162   local_rt = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate);
1163   local_rt += base_time;
1164
1165   /* convert ntptime to unix time since 1900 */
1166   last_unix = gst_util_uint64_scale (ntptime, GST_SECOND,
1167       (G_GINT64_CONSTANT (1) << 32));
1168
1169   stream->have_sync = TRUE;
1170
1171   GST_DEBUG_OBJECT (bin,
1172       "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT,
1173       local_rt, last_unix);
1174
1175   /* recalc inter stream playout offset, but only if there is more than one
1176    * stream or we're doing NTP sync. */
1177   if (bin->ntp_sync) {
1178     /* For NTP sync we need to first get a snapshot of running_time and NTP
1179      * time. We know at what running_time we play a certain RTP time, we also
1180      * calculated when we would play the RTP time in the SR packet. Now we need
1181      * to know how the running_time and the NTP time relate to eachother. */
1182     get_current_times (bin, &running_time, &ntpnstime);
1183
1184     /* see how far away the NTP time is. This is the difference between the
1185      * current NTP time and the NTP time in the last SR packet. */
1186     ntpdiff = ntpnstime - last_unix;
1187     /* see how far away the running_time is. This is the difference between the
1188      * current running_time and the running_time of the RTP timestamp in the
1189      * last SR packet. */
1190     rtdiff = running_time - local_rt;
1191
1192     GST_DEBUG_OBJECT (bin,
1193         "NTP time %" G_GUINT64_FORMAT ", last unix %" G_GUINT64_FORMAT,
1194         ntpnstime, last_unix);
1195     GST_DEBUG_OBJECT (bin,
1196         "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
1197         rtdiff);
1198
1199     /* combine to get the final diff to apply to the running_time */
1200     stream->rt_delta = rtdiff - ntpdiff;
1201
1202     stream_set_ts_offset (bin, stream, stream->rt_delta, FALSE);
1203   } else {
1204     gint64 min, rtp_min, clock_base = stream->clock_base;
1205     gboolean all_sync, use_rtp;
1206     gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
1207
1208     /* calculate delta between server and receiver. last_unix is created by
1209      * converting the ntptime in the last SR packet to a gstreamer timestamp. This
1210      * delta expresses the difference to our timeline and the server timeline. The
1211      * difference in itself doesn't mean much but we can combine the delta of
1212      * multiple streams to create a stream specific offset. */
1213     stream->rt_delta = last_unix - local_rt;
1214
1215     /* calculate the min of all deltas, ignoring streams that did not yet have a
1216      * valid rt_delta because we did not yet receive an SR packet for those
1217      * streams.
1218      * We calculate the mininum because we would like to only apply positive
1219      * offsets to streams, delaying their playback instead of trying to speed up
1220      * other streams (which might be imposible when we have to create negative
1221      * latencies).
1222      * The stream that has the smallest diff is selected as the reference stream,
1223      * all other streams will have a positive offset to this difference. */
1224
1225     /* some alternative setting allow ignoring RTCP as much as possible,
1226      * for servers generating bogus ntp timeline */
1227     min = rtp_min = G_MAXINT64;
1228     use_rtp = FALSE;
1229     if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
1230       guint64 ext_base;
1231
1232       use_rtp = TRUE;
1233       /* signed version for convienience */
1234       clock_base = base_rtptime;
1235       /* deal with possible wrap-around */
1236       ext_base = base_rtptime;
1237       rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
1238       /* sanity check; base rtp and provided clock_base should be close */
1239       if (rtp_clock_base >= clock_base) {
1240         if (rtp_clock_base - clock_base < 10 * clock_rate) {
1241           rtp_clock_base = base_time +
1242               gst_util_uint64_scale_int (rtp_clock_base - clock_base,
1243               GST_SECOND, clock_rate);
1244         } else {
1245           use_rtp = FALSE;
1246         }
1247       } else {
1248         if (clock_base - rtp_clock_base < 10 * clock_rate) {
1249           rtp_clock_base = base_time -
1250               gst_util_uint64_scale_int (clock_base - rtp_clock_base,
1251               GST_SECOND, clock_rate);
1252         } else {
1253           use_rtp = FALSE;
1254         }
1255       }
1256       /* warn and bail for clarity out if no sane values */
1257       if (!use_rtp) {
1258         GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
1259         return;
1260       }
1261       /* store to track changes */
1262       clock_base = rtp_clock_base;
1263       /* generate a fake as before,
1264        * now equating rtptime obtained from RTP-Info,
1265        * where the large time represent the otherwise irrelevant npt/ntp time */
1266       stream->rtp_delta = (GST_SECOND << 28) - rtp_clock_base;
1267     } else {
1268       clock_base = rtp_clock_base;
1269     }
1270
1271     all_sync = TRUE;
1272     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1273       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1274
1275       if (!ostream->have_sync) {
1276         all_sync = FALSE;
1277         continue;
1278       }
1279
1280       /* change in current stream's base from previously init'ed value
1281        * leads to reset of all stream's base */
1282       if (stream != ostream && stream->clock_base >= 0 &&
1283           (stream->clock_base != clock_base)) {
1284         GST_DEBUG_OBJECT (bin, "reset upon clock base change");
1285         ostream->clock_base = -100 * GST_SECOND;
1286         ostream->rtp_delta = 0;
1287       }
1288
1289       if (ostream->rt_delta < min)
1290         min = ostream->rt_delta;
1291       if (ostream->rtp_delta < rtp_min)
1292         rtp_min = ostream->rtp_delta;
1293     }
1294
1295     /* arrange to re-sync for each stream upon significant change,
1296      * e.g. post-seek */
1297     all_sync = all_sync && (stream->clock_base == clock_base);
1298     stream->clock_base = clock_base;
1299
1300     /* may need init performed above later on, but nothing more to do now */
1301     if (client->nstreams <= 1)
1302       return;
1303
1304     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
1305         " all sync %d", client, min, all_sync);
1306     GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
1307
1308     switch (rtcp_sync) {
1309       case GST_RTP_BIN_RTCP_SYNC_RTP:
1310         if (!use_rtp)
1311           break;
1312         GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
1313             "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
1314         /* fall-through */
1315       case GST_RTP_BIN_RTCP_SYNC_INITIAL:
1316         /* if all have been synced already, do not bother further */
1317         if (all_sync) {
1318           GST_DEBUG_OBJECT (bin, "all streams already synced; done");
1319           return;
1320         }
1321         break;
1322       default:
1323         break;
1324     }
1325
1326     /* bail out if we adjusted recently enough */
1327     if (all_sync && (last_unix - bin->priv->last_unix) <
1328         bin->rtcp_sync_interval * GST_MSECOND) {
1329       GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; "
1330           "previous sender info too recent "
1331           "(previous UNIX %" G_GUINT64_FORMAT ")", bin->priv->last_unix);
1332       return;
1333     }
1334     bin->priv->last_unix = last_unix;
1335
1336     /* calculate offsets for each stream */
1337     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
1338       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
1339       gint64 ts_offset;
1340
1341       /* ignore streams for which we didn't receive an SR packet yet, we
1342        * can't synchronize them yet. We can however sync other streams just
1343        * fine. */
1344       if (!ostream->have_sync)
1345         continue;
1346
1347       /* calculate offset to our reference stream, this should always give a
1348        * positive number. */
1349       if (use_rtp)
1350         ts_offset = ostream->rtp_delta - rtp_min;
1351       else
1352         ts_offset = ostream->rt_delta - min;
1353
1354       stream_set_ts_offset (bin, ostream, ts_offset, TRUE);
1355     }
1356   }
1357   gst_rtp_bin_send_sync_event (stream);
1358
1359   return;
1360 }
1361
1362 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
1363   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
1364           (b) = gst_rtcp_packet_move_to_next ((packet)))
1365
1366 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
1367   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
1368           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
1369
1370 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
1371   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
1372           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
1373
1374 static void
1375 gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
1376     GstRtpBinStream * stream)
1377 {
1378   GstRtpBin *bin;
1379   GstRTCPPacket packet;
1380   guint32 ssrc;
1381   guint64 ntptime;
1382   gboolean have_sr, have_sdes;
1383   gboolean more;
1384   guint64 base_rtptime;
1385   guint64 base_time;
1386   guint clock_rate;
1387   guint64 clock_base;
1388   guint64 extrtptime;
1389   GstBuffer *buffer;
1390   GstRTCPBuffer rtcp = { NULL, };
1391
1392   bin = stream->bin;
1393
1394   GST_DEBUG_OBJECT (bin, "sync handler called");
1395
1396   /* get the last relation between the rtp timestamps and the gstreamer
1397    * timestamps. We get this info directly from the jitterbuffer which
1398    * constructs gstreamer timestamps from rtp timestamps and so it know exactly
1399    * what the current situation is. */
1400   base_rtptime =
1401       g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
1402   base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
1403   clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
1404   clock_base = g_value_get_uint64 (gst_structure_get_value (s, "clock-base"));
1405   extrtptime =
1406       g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
1407   buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
1408
1409   have_sr = FALSE;
1410   have_sdes = FALSE;
1411
1412   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
1413
1414   GST_RTCP_BUFFER_FOR_PACKETS (more, &rtcp, &packet) {
1415     /* first packet must be SR or RR or else the validate would have failed */
1416     switch (gst_rtcp_packet_get_type (&packet)) {
1417       case GST_RTCP_TYPE_SR:
1418         /* only parse first. There is only supposed to be one SR in the packet
1419          * but we will deal with malformed packets gracefully */
1420         if (have_sr)
1421           break;
1422         /* get NTP and RTP times */
1423         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL,
1424             NULL, NULL);
1425
1426         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
1427         /* ignore SR that is not ours */
1428         if (ssrc != stream->ssrc)
1429           continue;
1430
1431         have_sr = TRUE;
1432         break;
1433       case GST_RTCP_TYPE_SDES:
1434       {
1435         gboolean more_items, more_entries;
1436
1437         /* only deal with first SDES, there is only supposed to be one SDES in
1438          * the RTCP packet but we deal with bad packets gracefully. Also bail
1439          * out if we have not seen an SR item yet. */
1440         if (have_sdes || !have_sr)
1441           break;
1442
1443         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
1444           /* skip items that are not about the SSRC of the sender */
1445           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
1446             continue;
1447
1448           /* find the CNAME entry */
1449           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
1450             GstRTCPSDESType type;
1451             guint8 len;
1452             guint8 *data;
1453
1454             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
1455
1456             if (type == GST_RTCP_SDES_CNAME) {
1457               GST_RTP_BIN_LOCK (bin);
1458               /* associate the stream to CNAME */
1459               gst_rtp_bin_associate (bin, stream, len, data,
1460                   ntptime, extrtptime, base_rtptime, base_time, clock_rate,
1461                   clock_base);
1462               GST_RTP_BIN_UNLOCK (bin);
1463             }
1464           }
1465         }
1466         have_sdes = TRUE;
1467         break;
1468       }
1469       default:
1470         /* we can ignore these packets */
1471         break;
1472     }
1473   }
1474   gst_rtcp_buffer_unmap (&rtcp);
1475 }
1476
1477 /* create a new stream with @ssrc in @session. Must be called with
1478  * RTP_SESSION_LOCK. */
1479 static GstRtpBinStream *
1480 create_stream (GstRtpBinSession * session, guint32 ssrc)
1481 {
1482   GstElement *buffer, *demux = NULL;
1483   GstRtpBinStream *stream;
1484   GstRtpBin *rtpbin;
1485   GstState target;
1486
1487   rtpbin = session->bin;
1488
1489   if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL)))
1490     goto no_jitterbuffer;
1491
1492   if (!rtpbin->ignore_pt)
1493     if (!(demux = gst_element_factory_make ("rtpptdemux", NULL)))
1494       goto no_demux;
1495
1496   stream = g_new0 (GstRtpBinStream, 1);
1497   stream->ssrc = ssrc;
1498   stream->bin = rtpbin;
1499   stream->session = session;
1500   stream->buffer = buffer;
1501   stream->demux = demux;
1502
1503   stream->have_sync = FALSE;
1504   stream->rt_delta = 0;
1505   stream->rtp_delta = 0;
1506   stream->percent = 100;
1507   stream->clock_base = -100 * GST_SECOND;
1508   session->streams = g_slist_prepend (session->streams, stream);
1509
1510   /* provide clock_rate to the jitterbuffer when needed */
1511   stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
1512       (GCallback) pt_map_requested, session);
1513   stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
1514       (GCallback) on_npt_stop, stream);
1515
1516   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.session", session);
1517   g_object_set_data (G_OBJECT (buffer), "GstRTPBin.stream", stream);
1518
1519   /* configure latency and packet lost */
1520   g_object_set (buffer, "latency", rtpbin->latency_ms, NULL);
1521   g_object_set (buffer, "drop-on-latency", rtpbin->drop_on_latency, NULL);
1522   g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
1523   g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL);
1524   g_object_set (buffer, "do-retransmission", rtpbin->do_retransmission, NULL);
1525
1526   g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER], 0,
1527       buffer, session->id, ssrc);
1528
1529   if (!rtpbin->ignore_pt)
1530     gst_bin_add (GST_BIN_CAST (rtpbin), demux);
1531   gst_bin_add (GST_BIN_CAST (rtpbin), buffer);
1532
1533   /* link stuff */
1534   if (demux)
1535     gst_element_link_pads_full (buffer, "src", demux, "sink",
1536         GST_PAD_LINK_CHECK_NOTHING);
1537
1538   if (rtpbin->buffering) {
1539     guint64 last_out;
1540
1541     GST_INFO_OBJECT (rtpbin,
1542         "bin is buffering, set jitterbuffer as not active");
1543     g_signal_emit_by_name (buffer, "set-active", FALSE, (gint64) 0, &last_out);
1544   }
1545
1546
1547   GST_OBJECT_LOCK (rtpbin);
1548   target = GST_STATE_TARGET (rtpbin);
1549   GST_OBJECT_UNLOCK (rtpbin);
1550
1551   /* from sink to source */
1552   if (demux)
1553     gst_element_set_state (demux, target);
1554
1555   gst_element_set_state (buffer, target);
1556
1557   return stream;
1558
1559   /* ERRORS */
1560 no_jitterbuffer:
1561   {
1562     g_warning ("rtpbin: could not create rtpjitterbuffer element");
1563     return NULL;
1564   }
1565 no_demux:
1566   {
1567     gst_object_unref (buffer);
1568     g_warning ("rtpbin: could not create rtpptdemux element");
1569     return NULL;
1570   }
1571 }
1572
1573 /* called with RTP_BIN_LOCK */
1574 static void
1575 free_stream (GstRtpBinStream * stream, GstRtpBin * bin)
1576 {
1577   GSList *clients, *next_client;
1578
1579   GST_DEBUG_OBJECT (bin, "freeing stream %p", stream);
1580
1581   if (stream->demux) {
1582     g_signal_handler_disconnect (stream->demux, stream->demux_newpad_sig);
1583     g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig);
1584     g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig);
1585   }
1586   g_signal_handler_disconnect (stream->buffer, stream->buffer_handlesync_sig);
1587   g_signal_handler_disconnect (stream->buffer, stream->buffer_ptreq_sig);
1588   g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig);
1589
1590   if (stream->demux)
1591     gst_element_set_locked_state (stream->demux, TRUE);
1592   gst_element_set_locked_state (stream->buffer, TRUE);
1593
1594   if (stream->demux)
1595     gst_element_set_state (stream->demux, GST_STATE_NULL);
1596   gst_element_set_state (stream->buffer, GST_STATE_NULL);
1597
1598   /* now remove this signal, we need this while going to NULL because it to
1599    * do some cleanups */
1600   if (stream->demux)
1601     g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
1602
1603   gst_bin_remove (GST_BIN_CAST (bin), stream->buffer);
1604   if (stream->demux)
1605     gst_bin_remove (GST_BIN_CAST (bin), stream->demux);
1606
1607   for (clients = bin->clients; clients; clients = next_client) {
1608     GstRtpBinClient *client = (GstRtpBinClient *) clients->data;
1609     GSList *streams, *next_stream;
1610
1611     next_client = g_slist_next (clients);
1612
1613     for (streams = client->streams; streams; streams = next_stream) {
1614       GstRtpBinStream *ostream = (GstRtpBinStream *) streams->data;
1615
1616       next_stream = g_slist_next (streams);
1617
1618       if (ostream == stream) {
1619         client->streams = g_slist_delete_link (client->streams, streams);
1620         /* If this was the last stream belonging to this client,
1621          * clean up the client. */
1622         if (--client->nstreams == 0) {
1623           bin->clients = g_slist_delete_link (bin->clients, clients);
1624           free_client (client, bin);
1625           break;
1626         }
1627       }
1628     }
1629   }
1630   g_free (stream);
1631 }
1632
1633 /* GObject vmethods */
1634 static void gst_rtp_bin_dispose (GObject * object);
1635 static void gst_rtp_bin_finalize (GObject * object);
1636 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
1637     const GValue * value, GParamSpec * pspec);
1638 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
1639     GValue * value, GParamSpec * pspec);
1640
1641 /* GstElement vmethods */
1642 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
1643     GstStateChange transition);
1644 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
1645     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
1646 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
1647 static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message);
1648
1649 #define gst_rtp_bin_parent_class parent_class
1650 G_DEFINE_TYPE (GstRtpBin, gst_rtp_bin, GST_TYPE_BIN);
1651
1652 static gboolean
1653 _gst_element_accumulator (GSignalInvocationHint * ihint,
1654     GValue * return_accu, const GValue * handler_return, gpointer dummy)
1655 {
1656   GstElement *element;
1657
1658   element = g_value_get_object (handler_return);
1659   GST_DEBUG ("got element %" GST_PTR_FORMAT, element);
1660
1661   if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
1662     g_value_set_object (return_accu, element);
1663
1664   /* stop emission if we have an element */
1665   return (element == NULL);
1666 }
1667
1668 static gboolean
1669 _gst_caps_accumulator (GSignalInvocationHint * ihint,
1670     GValue * return_accu, const GValue * handler_return, gpointer dummy)
1671 {
1672   GstCaps *caps;
1673
1674   caps = g_value_get_boxed (handler_return);
1675   GST_DEBUG ("got caps %" GST_PTR_FORMAT, caps);
1676
1677   if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
1678     g_value_set_boxed (return_accu, caps);
1679
1680   /* stop emission if we have a caps */
1681   return (caps == NULL);
1682 }
1683
1684 static void
1685 gst_rtp_bin_class_init (GstRtpBinClass * klass)
1686 {
1687   GObjectClass *gobject_class;
1688   GstElementClass *gstelement_class;
1689   GstBinClass *gstbin_class;
1690
1691   gobject_class = (GObjectClass *) klass;
1692   gstelement_class = (GstElementClass *) klass;
1693   gstbin_class = (GstBinClass *) klass;
1694
1695   g_type_class_add_private (klass, sizeof (GstRtpBinPrivate));
1696
1697   gobject_class->dispose = gst_rtp_bin_dispose;
1698   gobject_class->finalize = gst_rtp_bin_finalize;
1699   gobject_class->set_property = gst_rtp_bin_set_property;
1700   gobject_class->get_property = gst_rtp_bin_get_property;
1701
1702   g_object_class_install_property (gobject_class, PROP_LATENCY,
1703       g_param_spec_uint ("latency", "Buffer latency in ms",
1704           "Default amount of ms to buffer in the jitterbuffers", 0,
1705           G_MAXUINT, DEFAULT_LATENCY_MS,
1706           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1707
1708   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
1709       g_param_spec_boolean ("drop-on-latency",
1710           "Drop buffers when maximum latency is reached",
1711           "Tells the jitterbuffer to never exceed the given latency in size",
1712           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1713
1714   /**
1715    * GstRtpBin::request-pt-map:
1716    * @rtpbin: the object which received the signal
1717    * @session: the session
1718    * @pt: the pt
1719    *
1720    * Request the payload type as #GstCaps for @pt in @session.
1721    */
1722   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
1723       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
1724       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
1725       _gst_caps_accumulator, NULL, g_cclosure_marshal_generic, GST_TYPE_CAPS,
1726       2, G_TYPE_UINT, G_TYPE_UINT);
1727
1728     /**
1729    * GstRtpBin::payload-type-change:
1730    * @rtpbin: the object which received the signal
1731    * @session: the session
1732    * @pt: the pt
1733    *
1734    * Signal that the current payload type changed to @pt in @session.
1735    */
1736   gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] =
1737       g_signal_new ("payload-type-change", G_TYPE_FROM_CLASS (klass),
1738       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, payload_type_change),
1739       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1740       G_TYPE_UINT);
1741
1742   /**
1743    * GstRtpBin::clear-pt-map:
1744    * @rtpbin: the object which received the signal
1745    *
1746    * Clear all previously cached pt-mapping obtained with
1747    * #GstRtpBin::request-pt-map.
1748    */
1749   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
1750       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
1751       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1752           clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1753       0, G_TYPE_NONE);
1754
1755   /**
1756    * GstRtpBin::reset-sync:
1757    * @rtpbin: the object which received the signal
1758    *
1759    * Reset all currently configured lip-sync parameters and require new SR
1760    * packets for all streams before lip-sync is attempted again.
1761    */
1762   gst_rtp_bin_signals[SIGNAL_RESET_SYNC] =
1763       g_signal_new ("reset-sync", G_TYPE_FROM_CLASS (klass),
1764       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1765           reset_sync), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1766       0, G_TYPE_NONE);
1767
1768   /**
1769    * GstRtpBin::get-internal-session:
1770    * @rtpbin: the object which received the signal
1771    * @id: the session id
1772    *
1773    * Request the internal RTPSession object as #GObject in session @id.
1774    */
1775   gst_rtp_bin_signals[SIGNAL_GET_INTERNAL_SESSION] =
1776       g_signal_new ("get-internal-session", G_TYPE_FROM_CLASS (klass),
1777       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1778           get_internal_session), NULL, NULL, g_cclosure_marshal_generic,
1779       RTP_TYPE_SESSION, 1, G_TYPE_UINT);
1780
1781   /**
1782    * GstRtpBin::on-new-ssrc:
1783    * @rtpbin: the object which received the signal
1784    * @session: the session
1785    * @ssrc: the SSRC
1786    *
1787    * Notify of a new SSRC that entered @session.
1788    */
1789   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
1790       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
1791       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
1792       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1793       G_TYPE_UINT);
1794   /**
1795    * GstRtpBin::on-ssrc-collision:
1796    * @rtpbin: the object which received the signal
1797    * @session: the session
1798    * @ssrc: the SSRC
1799    *
1800    * Notify when we have an SSRC collision
1801    */
1802   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
1803       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
1804       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
1805       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1806       G_TYPE_UINT);
1807   /**
1808    * GstRtpBin::on-ssrc-validated:
1809    * @rtpbin: the object which received the signal
1810    * @session: the session
1811    * @ssrc: the SSRC
1812    *
1813    * Notify of a new SSRC that became validated.
1814    */
1815   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
1816       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
1817       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
1818       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1819       G_TYPE_UINT);
1820   /**
1821    * GstRtpBin::on-ssrc-active:
1822    * @rtpbin: the object which received the signal
1823    * @session: the session
1824    * @ssrc: the SSRC
1825    *
1826    * Notify of a SSRC that is active, i.e., sending RTCP.
1827    */
1828   gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE] =
1829       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
1830       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_active),
1831       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1832       G_TYPE_UINT);
1833   /**
1834    * GstRtpBin::on-ssrc-sdes:
1835    * @rtpbin: the object which received the signal
1836    * @session: the session
1837    * @ssrc: the SSRC
1838    *
1839    * Notify of a SSRC that is active, i.e., sending RTCP.
1840    */
1841   gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES] =
1842       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
1843       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_sdes),
1844       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1845       G_TYPE_UINT);
1846
1847   /**
1848    * GstRtpBin::on-bye-ssrc:
1849    * @rtpbin: the object which received the signal
1850    * @session: the session
1851    * @ssrc: the SSRC
1852    *
1853    * Notify of an SSRC that became inactive because of a BYE packet.
1854    */
1855   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
1856       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
1857       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
1858       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1859       G_TYPE_UINT);
1860   /**
1861    * GstRtpBin::on-bye-timeout:
1862    * @rtpbin: the object which received the signal
1863    * @session: the session
1864    * @ssrc: the SSRC
1865    *
1866    * Notify of an SSRC that has timed out because of BYE
1867    */
1868   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
1869       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
1870       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
1871       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1872       G_TYPE_UINT);
1873   /**
1874    * GstRtpBin::on-timeout:
1875    * @rtpbin: the object which received the signal
1876    * @session: the session
1877    * @ssrc: the SSRC
1878    *
1879    * Notify of an SSRC that has timed out
1880    */
1881   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
1882       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
1883       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
1884       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1885       G_TYPE_UINT);
1886   /**
1887    * GstRtpBin::on-sender-timeout:
1888    * @rtpbin: the object which received the signal
1889    * @session: the session
1890    * @ssrc: the SSRC
1891    *
1892    * Notify of a sender SSRC that has timed out and became a receiver
1893    */
1894   gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT] =
1895       g_signal_new ("on-sender-timeout", G_TYPE_FROM_CLASS (klass),
1896       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_sender_timeout),
1897       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1898       G_TYPE_UINT);
1899
1900   /**
1901    * GstRtpBin::on-npt-stop:
1902    * @rtpbin: the object which received the signal
1903    * @session: the session
1904    * @ssrc: the SSRC
1905    *
1906    * Notify that SSRC sender has sent data up to the configured NPT stop time.
1907    */
1908   gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP] =
1909       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
1910       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_npt_stop),
1911       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
1912       G_TYPE_UINT);
1913
1914   /**
1915    * GstRtpBin::request-rtp-encoder:
1916    * @rtpbin: the object which received the signal
1917    * @session: the session
1918    *
1919    * Request an RTP encoder element for the given @session. The encoder
1920    * element will be added to the bin if not previously added.
1921    *
1922    * If no handler is connected, no encoder will be used.
1923    *
1924    * Since: 1.4
1925    */
1926   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_ENCODER] =
1927       g_signal_new ("request-rtp-encoder", G_TYPE_FROM_CLASS (klass),
1928       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1929           request_rtp_encoder), _gst_element_accumulator, NULL,
1930       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
1931
1932   /**
1933    * GstRtpBin::request-rtp-decoder:
1934    * @rtpbin: the object which received the signal
1935    * @session: the session
1936    *
1937    * Request an RTP decoder element for the given @session. The decoder
1938    * element will be added to the bin if not previously added.
1939    *
1940    * If no handler is connected, no encoder will be used.
1941    *
1942    * Since: 1.4
1943    */
1944   gst_rtp_bin_signals[SIGNAL_REQUEST_RTP_DECODER] =
1945       g_signal_new ("request-rtp-decoder", G_TYPE_FROM_CLASS (klass),
1946       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1947           request_rtp_decoder), _gst_element_accumulator, NULL,
1948       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
1949
1950   /**
1951    * GstRtpBin::request-rtcp-encoder:
1952    * @rtpbin: the object which received the signal
1953    * @session: the session
1954    *
1955    * Request an RTCP encoder element for the given @session. The encoder
1956    * element will be added to the bin if not previously added.
1957    *
1958    * If no handler is connected, no encoder will be used.
1959    *
1960    * Since: 1.4
1961    */
1962   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_ENCODER] =
1963       g_signal_new ("request-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
1964       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1965           request_rtcp_encoder), _gst_element_accumulator, NULL,
1966       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
1967
1968   /**
1969    * GstRtpBin::request-rtcp-decoder:
1970    * @rtpbin: the object which received the signal
1971    * @session: the session
1972    *
1973    * Request an RTCP decoder element for the given @session. The decoder
1974    * element will be added to the bin if not previously added.
1975    *
1976    * If no handler is connected, no encoder will be used.
1977    *
1978    * Since: 1.4
1979    */
1980   gst_rtp_bin_signals[SIGNAL_REQUEST_RTCP_DECODER] =
1981       g_signal_new ("request-rtcp-decoder", G_TYPE_FROM_CLASS (klass),
1982       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
1983           request_rtcp_decoder), _gst_element_accumulator, NULL,
1984       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
1985
1986   /**
1987    * GstRtpBin::new-jitterbuffer:
1988    * @rtpbin: the object which received the signal
1989    * @jitterbuffer: the new jitterbuffer
1990    * @session: the session
1991    * @ssrc: the SSRC
1992    *
1993    * Notify that a new @jitterbuffer was created for @session and @ssrc.
1994    * This signal can, for example, be used to configure @jitterbuffer.
1995    *
1996    * Since: 1.4
1997    */
1998   gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER] =
1999       g_signal_new ("new-jitterbuffer", G_TYPE_FROM_CLASS (klass),
2000       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2001           new_jitterbuffer), NULL, NULL, g_cclosure_marshal_generic,
2002       G_TYPE_NONE, 3, GST_TYPE_ELEMENT, G_TYPE_UINT, G_TYPE_UINT);
2003
2004   /**
2005    * GstRtpBin::request-aux-sender:
2006    * @rtpbin: the object which received the signal
2007    * @session: the session
2008    *
2009    * Request an AUX sender element for the given @session. The AUX
2010    * element will be added to the bin.
2011    *
2012    * If no handler is connected, no AUX element will be used.
2013    *
2014    * Since: 1.4
2015    */
2016   gst_rtp_bin_signals[SIGNAL_REQUEST_AUX_SENDER] =
2017       g_signal_new ("request-aux-sender", G_TYPE_FROM_CLASS (klass),
2018       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2019           request_aux_sender), _gst_element_accumulator, NULL,
2020       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2021   /**
2022    * GstRtpBin::request-aux-receiver:
2023    * @rtpbin: the object which received the signal
2024    * @session: the session
2025    *
2026    * Request an AUX receiver element for the given @session. The AUX
2027    * element will be added to the bin.
2028    *
2029    * If no handler is connected, no AUX element will be used.
2030    *
2031    * Since: 1.4
2032    */
2033   gst_rtp_bin_signals[SIGNAL_REQUEST_AUX_RECEIVER] =
2034       g_signal_new ("request-aux-receiver", G_TYPE_FROM_CLASS (klass),
2035       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
2036           request_aux_receiver), _gst_element_accumulator, NULL,
2037       g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
2038
2039   g_object_class_install_property (gobject_class, PROP_SDES,
2040       g_param_spec_boxed ("sdes", "SDES",
2041           "The SDES items of this session",
2042           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2043
2044   g_object_class_install_property (gobject_class, PROP_DO_LOST,
2045       g_param_spec_boolean ("do-lost", "Do Lost",
2046           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
2047           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2048
2049   g_object_class_install_property (gobject_class, PROP_AUTOREMOVE,
2050       g_param_spec_boolean ("autoremove", "Auto Remove",
2051           "Automatically remove timed out sources", DEFAULT_AUTOREMOVE,
2052           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2053
2054   g_object_class_install_property (gobject_class, PROP_IGNORE_PT,
2055       g_param_spec_boolean ("ignore-pt", "Ignore PT",
2056           "Do not demultiplex based on PT values", DEFAULT_IGNORE_PT,
2057           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2058
2059   g_object_class_install_property (gobject_class, PROP_USE_PIPELINE_CLOCK,
2060       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
2061           "Use the pipeline running-time to set the NTP time in the RTCP SR messages",
2062           DEFAULT_USE_PIPELINE_CLOCK,
2063           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2064   /**
2065    * GstRtpBin:buffer-mode:
2066    *
2067    * Control the buffering and timestamping mode used by the jitterbuffer.
2068    */
2069   g_object_class_install_property (gobject_class, PROP_BUFFER_MODE,
2070       g_param_spec_enum ("buffer-mode", "Buffer Mode",
2071           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
2072           DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2073   /**
2074    * GstRtpBin:ntp-sync:
2075    *
2076    * Set the NTP time from the sender reports as the running-time on the
2077    * buffers. When both the sender and receiver have sychronized
2078    * running-time, i.e. when the clock and base-time is shared
2079    * between the receivers and the and the senders, this option can be
2080    * used to synchronize receivers on multiple machines.
2081    */
2082   g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
2083       g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
2084           "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
2085           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2086
2087   /**
2088    * GstRtpBin:rtcp-sync:
2089    *
2090    * If not synchronizing (directly) to the NTP clock, determines how to sync
2091    * the various streams.
2092    */
2093   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC,
2094       g_param_spec_enum ("rtcp-sync", "RTCP Sync",
2095           "Use of RTCP SR in synchronization", GST_RTP_BIN_RTCP_SYNC_TYPE,
2096           DEFAULT_RTCP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2097
2098   /**
2099    * GstRtpBin:rtcp-sync-interval:
2100    *
2101    * Determines how often to sync streams using RTCP data.
2102    */
2103   g_object_class_install_property (gobject_class, PROP_RTCP_SYNC_INTERVAL,
2104       g_param_spec_uint ("rtcp-sync-interval", "RTCP Sync Interval",
2105           "RTCP SR interval synchronization (ms) (0 = always)",
2106           0, G_MAXUINT, DEFAULT_RTCP_SYNC_INTERVAL,
2107           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2108
2109   g_object_class_install_property (gobject_class, PROP_DO_SYNC_EVENT,
2110       g_param_spec_boolean ("do-sync-event", "Do Sync Event",
2111           "Send event downstream when a stream is synchronized to the sender",
2112           DEFAULT_DO_SYNC_EVENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2113
2114   /**
2115    * GstRtpBin:do-retransmission:
2116    *
2117    * Enables RTP retransmission on all streams. To control retransmission on
2118    * a per-SSRC basis, connect to the #GstRtpBin::new-jitterbuffer signal and
2119    * set the #GstRtpJitterBuffer::do-retransmission property on the
2120    * #GstRtpJitterBuffer object instead.
2121    */
2122   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
2123       g_param_spec_boolean ("do-retransmission", "Do retransmission",
2124           "Enable retransmission on all streams",
2125           DEFAULT_DO_RETRANSMISSION,
2126           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2127
2128   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
2129   gstelement_class->request_new_pad =
2130       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
2131   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
2132
2133   /* sink pads */
2134   gst_element_class_add_pad_template (gstelement_class,
2135       gst_static_pad_template_get (&rtpbin_recv_rtp_sink_template));
2136   gst_element_class_add_pad_template (gstelement_class,
2137       gst_static_pad_template_get (&rtpbin_recv_rtcp_sink_template));
2138   gst_element_class_add_pad_template (gstelement_class,
2139       gst_static_pad_template_get (&rtpbin_send_rtp_sink_template));
2140
2141   /* src pads */
2142   gst_element_class_add_pad_template (gstelement_class,
2143       gst_static_pad_template_get (&rtpbin_recv_rtp_src_template));
2144   gst_element_class_add_pad_template (gstelement_class,
2145       gst_static_pad_template_get (&rtpbin_send_rtcp_src_template));
2146   gst_element_class_add_pad_template (gstelement_class,
2147       gst_static_pad_template_get (&rtpbin_send_rtp_src_template));
2148
2149   gst_element_class_set_static_metadata (gstelement_class, "RTP Bin",
2150       "Filter/Network/RTP",
2151       "Real-Time Transport Protocol bin",
2152       "Wim Taymans <wim.taymans@gmail.com>");
2153
2154   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (gst_rtp_bin_handle_message);
2155
2156   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
2157   klass->reset_sync = GST_DEBUG_FUNCPTR (gst_rtp_bin_reset_sync);
2158   klass->get_internal_session =
2159       GST_DEBUG_FUNCPTR (gst_rtp_bin_get_internal_session);
2160   klass->request_rtp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2161   klass->request_rtp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2162   klass->request_rtcp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
2163   klass->request_rtcp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
2164
2165   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
2166 }
2167
2168 static void
2169 gst_rtp_bin_init (GstRtpBin * rtpbin)
2170 {
2171   gchar *cname;
2172
2173   rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
2174   g_mutex_init (&rtpbin->priv->bin_lock);
2175   g_mutex_init (&rtpbin->priv->dyn_lock);
2176
2177   rtpbin->latency_ms = DEFAULT_LATENCY_MS;
2178   rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND;
2179   rtpbin->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
2180   rtpbin->do_lost = DEFAULT_DO_LOST;
2181   rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
2182   rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
2183   rtpbin->rtcp_sync = DEFAULT_RTCP_SYNC;
2184   rtpbin->rtcp_sync_interval = DEFAULT_RTCP_SYNC_INTERVAL;
2185   rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
2186   rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
2187   rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
2188   rtpbin->send_sync_event = DEFAULT_DO_SYNC_EVENT;
2189   rtpbin->do_retransmission = DEFAULT_DO_RETRANSMISSION;
2190
2191   /* some default SDES entries */
2192   cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
2193   rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes",
2194       "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL);
2195   g_free (cname);
2196 }
2197
2198 static void
2199 gst_rtp_bin_dispose (GObject * object)
2200 {
2201   GstRtpBin *rtpbin;
2202
2203   rtpbin = GST_RTP_BIN (object);
2204
2205   GST_RTP_BIN_LOCK (rtpbin);
2206   GST_DEBUG_OBJECT (object, "freeing sessions");
2207   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, rtpbin);
2208   g_slist_free (rtpbin->sessions);
2209   rtpbin->sessions = NULL;
2210   GST_RTP_BIN_UNLOCK (rtpbin);
2211
2212   G_OBJECT_CLASS (parent_class)->dispose (object);
2213 }
2214
2215 static void
2216 gst_rtp_bin_finalize (GObject * object)
2217 {
2218   GstRtpBin *rtpbin;
2219
2220   rtpbin = GST_RTP_BIN (object);
2221
2222   if (rtpbin->sdes)
2223     gst_structure_free (rtpbin->sdes);
2224
2225   g_mutex_clear (&rtpbin->priv->bin_lock);
2226   g_mutex_clear (&rtpbin->priv->dyn_lock);
2227
2228   G_OBJECT_CLASS (parent_class)->finalize (object);
2229 }
2230
2231
2232 static void
2233 gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes)
2234 {
2235   GSList *item;
2236
2237   if (sdes == NULL)
2238     return;
2239
2240   GST_RTP_BIN_LOCK (bin);
2241
2242   GST_OBJECT_LOCK (bin);
2243   if (bin->sdes)
2244     gst_structure_free (bin->sdes);
2245   bin->sdes = gst_structure_copy (sdes);
2246   GST_OBJECT_UNLOCK (bin);
2247
2248   /* store in all sessions */
2249   for (item = bin->sessions; item; item = g_slist_next (item)) {
2250     GstRtpBinSession *session = item->data;
2251     g_object_set (session->session, "sdes", sdes, NULL);
2252   }
2253
2254   GST_RTP_BIN_UNLOCK (bin);
2255 }
2256
2257 static GstStructure *
2258 gst_rtp_bin_get_sdes_struct (GstRtpBin * bin)
2259 {
2260   GstStructure *result;
2261
2262   GST_OBJECT_LOCK (bin);
2263   result = gst_structure_copy (bin->sdes);
2264   GST_OBJECT_UNLOCK (bin);
2265
2266   return result;
2267 }
2268
2269 static void
2270 gst_rtp_bin_set_property (GObject * object, guint prop_id,
2271     const GValue * value, GParamSpec * pspec)
2272 {
2273   GstRtpBin *rtpbin;
2274
2275   rtpbin = GST_RTP_BIN (object);
2276
2277   switch (prop_id) {
2278     case PROP_LATENCY:
2279       GST_RTP_BIN_LOCK (rtpbin);
2280       rtpbin->latency_ms = g_value_get_uint (value);
2281       rtpbin->latency_ns = rtpbin->latency_ms * GST_MSECOND;
2282       GST_RTP_BIN_UNLOCK (rtpbin);
2283       /* propagate the property down to the jitterbuffer */
2284       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "latency", value);
2285       break;
2286     case PROP_DROP_ON_LATENCY:
2287       GST_RTP_BIN_LOCK (rtpbin);
2288       rtpbin->drop_on_latency = g_value_get_boolean (value);
2289       GST_RTP_BIN_UNLOCK (rtpbin);
2290       /* propagate the property down to the jitterbuffer */
2291       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2292           "drop-on-latency", value);
2293       break;
2294     case PROP_SDES:
2295       gst_rtp_bin_set_sdes_struct (rtpbin, g_value_get_boxed (value));
2296       break;
2297     case PROP_DO_LOST:
2298       GST_RTP_BIN_LOCK (rtpbin);
2299       rtpbin->do_lost = g_value_get_boolean (value);
2300       GST_RTP_BIN_UNLOCK (rtpbin);
2301       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
2302       break;
2303     case PROP_NTP_SYNC:
2304       rtpbin->ntp_sync = g_value_get_boolean (value);
2305       break;
2306     case PROP_RTCP_SYNC:
2307       g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value));
2308       break;
2309     case PROP_RTCP_SYNC_INTERVAL:
2310       rtpbin->rtcp_sync_interval = g_value_get_uint (value);
2311       break;
2312     case PROP_IGNORE_PT:
2313       rtpbin->ignore_pt = g_value_get_boolean (value);
2314       break;
2315     case PROP_AUTOREMOVE:
2316       rtpbin->priv->autoremove = g_value_get_boolean (value);
2317       break;
2318     case PROP_USE_PIPELINE_CLOCK:
2319     {
2320       GSList *sessions;
2321       GST_RTP_BIN_LOCK (rtpbin);
2322       rtpbin->use_pipeline_clock = g_value_get_boolean (value);
2323       for (sessions = rtpbin->sessions; sessions;
2324           sessions = g_slist_next (sessions)) {
2325         GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2326
2327         g_object_set (G_OBJECT (session->session),
2328             "use-pipeline-clock", rtpbin->use_pipeline_clock, NULL);
2329       }
2330       GST_RTP_BIN_UNLOCK (rtpbin);
2331     }
2332       break;
2333     case PROP_DO_SYNC_EVENT:
2334       rtpbin->send_sync_event = g_value_get_boolean (value);
2335       break;
2336     case PROP_BUFFER_MODE:
2337       GST_RTP_BIN_LOCK (rtpbin);
2338       rtpbin->buffer_mode = g_value_get_enum (value);
2339       GST_RTP_BIN_UNLOCK (rtpbin);
2340       /* propagate the property down to the jitterbuffer */
2341       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "mode", value);
2342       break;
2343     case PROP_DO_RETRANSMISSION:
2344       GST_RTP_BIN_LOCK (rtpbin);
2345       rtpbin->do_retransmission = g_value_get_boolean (value);
2346       GST_RTP_BIN_UNLOCK (rtpbin);
2347       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
2348           "do-retransmission", value);
2349       break;
2350     default:
2351       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2352       break;
2353   }
2354 }
2355
2356 static void
2357 gst_rtp_bin_get_property (GObject * object, guint prop_id,
2358     GValue * value, GParamSpec * pspec)
2359 {
2360   GstRtpBin *rtpbin;
2361
2362   rtpbin = GST_RTP_BIN (object);
2363
2364   switch (prop_id) {
2365     case PROP_LATENCY:
2366       GST_RTP_BIN_LOCK (rtpbin);
2367       g_value_set_uint (value, rtpbin->latency_ms);
2368       GST_RTP_BIN_UNLOCK (rtpbin);
2369       break;
2370     case PROP_DROP_ON_LATENCY:
2371       GST_RTP_BIN_LOCK (rtpbin);
2372       g_value_set_boolean (value, rtpbin->drop_on_latency);
2373       GST_RTP_BIN_UNLOCK (rtpbin);
2374       break;
2375     case PROP_SDES:
2376       g_value_take_boxed (value, gst_rtp_bin_get_sdes_struct (rtpbin));
2377       break;
2378     case PROP_DO_LOST:
2379       GST_RTP_BIN_LOCK (rtpbin);
2380       g_value_set_boolean (value, rtpbin->do_lost);
2381       GST_RTP_BIN_UNLOCK (rtpbin);
2382       break;
2383     case PROP_IGNORE_PT:
2384       g_value_set_boolean (value, rtpbin->ignore_pt);
2385       break;
2386     case PROP_NTP_SYNC:
2387       g_value_set_boolean (value, rtpbin->ntp_sync);
2388       break;
2389     case PROP_RTCP_SYNC:
2390       g_value_set_enum (value, g_atomic_int_get (&rtpbin->rtcp_sync));
2391       break;
2392     case PROP_RTCP_SYNC_INTERVAL:
2393       g_value_set_uint (value, rtpbin->rtcp_sync_interval);
2394       break;
2395     case PROP_AUTOREMOVE:
2396       g_value_set_boolean (value, rtpbin->priv->autoremove);
2397       break;
2398     case PROP_BUFFER_MODE:
2399       g_value_set_enum (value, rtpbin->buffer_mode);
2400       break;
2401     case PROP_USE_PIPELINE_CLOCK:
2402       g_value_set_boolean (value, rtpbin->use_pipeline_clock);
2403       break;
2404     case PROP_DO_SYNC_EVENT:
2405       g_value_set_boolean (value, rtpbin->send_sync_event);
2406       break;
2407     case PROP_DO_RETRANSMISSION:
2408       GST_RTP_BIN_LOCK (rtpbin);
2409       g_value_set_boolean (value, rtpbin->do_retransmission);
2410       GST_RTP_BIN_UNLOCK (rtpbin);
2411       break;
2412     default:
2413       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2414       break;
2415   }
2416 }
2417
2418 static void
2419 gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
2420 {
2421   GstRtpBin *rtpbin;
2422
2423   rtpbin = GST_RTP_BIN (bin);
2424
2425   switch (GST_MESSAGE_TYPE (message)) {
2426     case GST_MESSAGE_ELEMENT:
2427     {
2428       const GstStructure *s = gst_message_get_structure (message);
2429
2430       /* we change the structure name and add the session ID to it */
2431       if (gst_structure_has_name (s, "application/x-rtp-source-sdes")) {
2432         GstRtpBinSession *sess;
2433
2434         /* find the session we set it as object data */
2435         sess = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2436             "GstRTPBin.session");
2437
2438         if (G_LIKELY (sess)) {
2439           message = gst_message_make_writable (message);
2440           s = gst_message_get_structure (message);
2441           gst_structure_set ((GstStructure *) s, "session", G_TYPE_UINT,
2442               sess->id, NULL);
2443         }
2444       }
2445       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2446       break;
2447     }
2448     case GST_MESSAGE_BUFFERING:
2449     {
2450       gint percent;
2451       gint min_percent = 100;
2452       GSList *sessions, *streams;
2453       GstRtpBinStream *stream;
2454       gboolean change = FALSE, active = FALSE;
2455       GstClockTime min_out_time;
2456       GstBufferingMode mode;
2457       gint avg_in, avg_out;
2458       gint64 buffering_left;
2459
2460       gst_message_parse_buffering (message, &percent);
2461       gst_message_parse_buffering_stats (message, &mode, &avg_in, &avg_out,
2462           &buffering_left);
2463
2464       stream =
2465           g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
2466           "GstRTPBin.stream");
2467
2468       GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
2469
2470       /* get the stream */
2471       if (G_LIKELY (stream)) {
2472         GST_RTP_BIN_LOCK (rtpbin);
2473         /* fill in the percent */
2474         stream->percent = percent;
2475
2476         /* calculate the min value for all streams */
2477         for (sessions = rtpbin->sessions; sessions;
2478             sessions = g_slist_next (sessions)) {
2479           GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2480
2481           GST_RTP_SESSION_LOCK (session);
2482           if (session->streams) {
2483             for (streams = session->streams; streams;
2484                 streams = g_slist_next (streams)) {
2485               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2486
2487               GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
2488                   stream->percent);
2489
2490               /* find min percent */
2491               if (min_percent > stream->percent)
2492                 min_percent = stream->percent;
2493             }
2494           } else {
2495             GST_INFO_OBJECT (bin,
2496                 "session has no streams, setting min_percent to 0");
2497             min_percent = 0;
2498           }
2499           GST_RTP_SESSION_UNLOCK (session);
2500         }
2501         GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
2502
2503         if (rtpbin->buffering) {
2504           if (min_percent == 100) {
2505             rtpbin->buffering = FALSE;
2506             active = TRUE;
2507             change = TRUE;
2508           }
2509         } else {
2510           if (min_percent < 100) {
2511             /* pause the streams */
2512             rtpbin->buffering = TRUE;
2513             active = FALSE;
2514             change = TRUE;
2515           }
2516         }
2517         GST_RTP_BIN_UNLOCK (rtpbin);
2518
2519         gst_message_unref (message);
2520
2521         /* make a new buffering message with the min value */
2522         message =
2523             gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
2524         gst_message_set_buffering_stats (message, mode, avg_in, avg_out,
2525             buffering_left);
2526
2527         if (G_UNLIKELY (change)) {
2528           GstClock *clock;
2529           guint64 running_time = 0;
2530           guint64 offset = 0;
2531
2532           /* figure out the running time when we have a clock */
2533           if (G_LIKELY ((clock =
2534                       gst_element_get_clock (GST_ELEMENT_CAST (bin))))) {
2535             guint64 now, base_time;
2536
2537             now = gst_clock_get_time (clock);
2538             base_time = gst_element_get_base_time (GST_ELEMENT_CAST (bin));
2539             running_time = now - base_time;
2540             gst_object_unref (clock);
2541           }
2542           GST_DEBUG_OBJECT (bin,
2543               "running time now %" GST_TIME_FORMAT,
2544               GST_TIME_ARGS (running_time));
2545
2546           GST_RTP_BIN_LOCK (rtpbin);
2547
2548           /* when we reactivate, calculate the offsets so that all streams have
2549            * an output time that is at least as big as the running_time */
2550           offset = 0;
2551           if (active) {
2552             if (running_time > rtpbin->buffer_start) {
2553               offset = running_time - rtpbin->buffer_start;
2554               if (offset >= rtpbin->latency_ns)
2555                 offset -= rtpbin->latency_ns;
2556               else
2557                 offset = 0;
2558             }
2559           }
2560
2561           /* pause all streams */
2562           min_out_time = -1;
2563           for (sessions = rtpbin->sessions; sessions;
2564               sessions = g_slist_next (sessions)) {
2565             GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
2566
2567             GST_RTP_SESSION_LOCK (session);
2568             for (streams = session->streams; streams;
2569                 streams = g_slist_next (streams)) {
2570               GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
2571               GstElement *element = stream->buffer;
2572               guint64 last_out;
2573
2574               g_signal_emit_by_name (element, "set-active", active, offset,
2575                   &last_out);
2576
2577               if (!active) {
2578                 g_object_get (element, "percent", &stream->percent, NULL);
2579
2580                 if (last_out == -1)
2581                   last_out = 0;
2582                 if (min_out_time == -1 || last_out < min_out_time)
2583                   min_out_time = last_out;
2584               }
2585
2586               GST_DEBUG_OBJECT (bin,
2587                   "setting %p to %d, offset %" GST_TIME_FORMAT ", last %"
2588                   GST_TIME_FORMAT ", percent %d", element, active,
2589                   GST_TIME_ARGS (offset), GST_TIME_ARGS (last_out),
2590                   stream->percent);
2591             }
2592             GST_RTP_SESSION_UNLOCK (session);
2593           }
2594           GST_DEBUG_OBJECT (bin,
2595               "min out time %" GST_TIME_FORMAT, GST_TIME_ARGS (min_out_time));
2596
2597           /* the buffer_start is the min out time of all paused jitterbuffers */
2598           if (!active)
2599             rtpbin->buffer_start = min_out_time;
2600
2601           GST_RTP_BIN_UNLOCK (rtpbin);
2602         }
2603       }
2604       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2605       break;
2606     }
2607     default:
2608     {
2609       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2610       break;
2611     }
2612   }
2613 }
2614
2615 static GstStateChangeReturn
2616 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
2617 {
2618   GstStateChangeReturn res;
2619   GstRtpBin *rtpbin;
2620   GstRtpBinPrivate *priv;
2621
2622   rtpbin = GST_RTP_BIN (element);
2623   priv = rtpbin->priv;
2624
2625   switch (transition) {
2626     case GST_STATE_CHANGE_NULL_TO_READY:
2627       break;
2628     case GST_STATE_CHANGE_READY_TO_PAUSED:
2629       priv->last_unix = 0;
2630       GST_LOG_OBJECT (rtpbin, "clearing shutdown flag");
2631       g_atomic_int_set (&priv->shutdown, 0);
2632       break;
2633     case GST_STATE_CHANGE_PAUSED_TO_READY:
2634       GST_LOG_OBJECT (rtpbin, "setting shutdown flag");
2635       g_atomic_int_set (&priv->shutdown, 1);
2636       /* wait for all callbacks to end by taking the lock. No new callbacks will
2637        * be able to happen as we set the shutdown flag. */
2638       GST_RTP_BIN_DYN_LOCK (rtpbin);
2639       GST_LOG_OBJECT (rtpbin, "dynamic lock taken, we can continue shutdown");
2640       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
2641       break;
2642     default:
2643       break;
2644   }
2645
2646   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2647
2648   switch (transition) {
2649     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2650       break;
2651     case GST_STATE_CHANGE_PAUSED_TO_READY:
2652       break;
2653     case GST_STATE_CHANGE_READY_TO_NULL:
2654       break;
2655     default:
2656       break;
2657   }
2658   return res;
2659 }
2660
2661 static GstElement *
2662 session_request_element (GstRtpBinSession * session, guint signal)
2663 {
2664   GstElement *element = NULL;
2665   GstRtpBin *bin = session->bin;
2666
2667   g_signal_emit (bin, gst_rtp_bin_signals[signal], 0, session->id, &element);
2668
2669   if (element) {
2670     if (!bin_manage_element (bin, element))
2671       goto manage_failed;
2672     session->elements = g_slist_prepend (session->elements, element);
2673   }
2674   return element;
2675
2676   /* ERRORS */
2677 manage_failed:
2678   {
2679     GST_WARNING_OBJECT (bin, "unable to manage element");
2680     gst_object_unref (element);
2681     return NULL;
2682   }
2683 }
2684
2685 static gboolean
2686 copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
2687 {
2688   GstPad *gpad = GST_PAD_CAST (user_data);
2689
2690   GST_DEBUG_OBJECT (gpad, "store sticky event %" GST_PTR_FORMAT, *event);
2691   gst_pad_store_sticky_event (gpad, *event);
2692
2693   return TRUE;
2694 }
2695
2696 /* a new pad (SSRC) was created in @session. This signal is emited from the
2697  * payload demuxer. */
2698 static void
2699 new_payload_found (GstElement * element, guint pt, GstPad * pad,
2700     GstRtpBinStream * stream)
2701 {
2702   GstRtpBin *rtpbin;
2703   GstElementClass *klass;
2704   GstPadTemplate *templ;
2705   gchar *padname;
2706   GstPad *gpad;
2707
2708   rtpbin = stream->bin;
2709
2710   GST_DEBUG ("new payload pad %d", pt);
2711
2712   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
2713
2714   /* ghost the pad to the parent */
2715   klass = GST_ELEMENT_GET_CLASS (rtpbin);
2716   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
2717   padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
2718       stream->session->id, stream->ssrc, pt);
2719   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
2720   g_free (padname);
2721   g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", gpad);
2722
2723   gst_pad_set_active (gpad, TRUE);
2724   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2725
2726   gst_pad_sticky_events_foreach (pad, copy_sticky_events, gpad);
2727   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2728
2729   return;
2730
2731 shutdown:
2732   {
2733     GST_DEBUG ("ignoring, we are shutting down");
2734     return;
2735   }
2736 }
2737
2738 static void
2739 payload_pad_removed (GstElement * element, GstPad * pad,
2740     GstRtpBinStream * stream)
2741 {
2742   GstRtpBin *rtpbin;
2743   GstPad *gpad;
2744
2745   rtpbin = stream->bin;
2746
2747   GST_DEBUG ("payload pad removed");
2748
2749   GST_RTP_BIN_DYN_LOCK (rtpbin);
2750   if ((gpad = g_object_get_data (G_OBJECT (pad), "GstRTPBin.ghostpad"))) {
2751     g_object_set_data (G_OBJECT (pad), "GstRTPBin.ghostpad", NULL);
2752
2753     gst_pad_set_active (gpad, FALSE);
2754     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2755   }
2756   GST_RTP_BIN_DYN_UNLOCK (rtpbin);
2757 }
2758
2759 static GstCaps *
2760 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
2761 {
2762   GstRtpBin *rtpbin;
2763   GstCaps *caps;
2764
2765   rtpbin = session->bin;
2766
2767   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt,
2768       session->id);
2769
2770   caps = get_pt_map (session, pt);
2771   if (!caps)
2772     goto no_caps;
2773
2774   return caps;
2775
2776   /* ERRORS */
2777 no_caps:
2778   {
2779     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
2780     return NULL;
2781   }
2782 }
2783
2784 static void
2785 payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session)
2786 {
2787   GST_DEBUG_OBJECT (session->bin,
2788       "emiting signal for pt type changed to %d in session %d", pt,
2789       session->id);
2790
2791   g_signal_emit (session->bin, gst_rtp_bin_signals[SIGNAL_PAYLOAD_TYPE_CHANGE],
2792       0, session->id, pt);
2793 }
2794
2795 /* emited when caps changed for the session */
2796 static void
2797 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
2798 {
2799   GstRtpBin *bin;
2800   GstCaps *caps;
2801   gint payload;
2802   const GstStructure *s;
2803
2804   bin = session->bin;
2805
2806   g_object_get (pad, "caps", &caps, NULL);
2807
2808   if (caps == NULL)
2809     return;
2810
2811   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
2812
2813   s = gst_caps_get_structure (caps, 0);
2814
2815   /* get payload, finish when it's not there */
2816   if (!gst_structure_get_int (s, "payload", &payload)) {
2817     gst_caps_unref (caps);
2818     return;
2819   }
2820
2821   GST_RTP_SESSION_LOCK (session);
2822   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
2823   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
2824   GST_RTP_SESSION_UNLOCK (session);
2825 }
2826
2827 /* a new pad (SSRC) was created in @session */
2828 static void
2829 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
2830     GstRtpBinSession * session)
2831 {
2832   GstRtpBin *rtpbin;
2833   GstRtpBinStream *stream;
2834   GstPad *sinkpad, *srcpad;
2835   gchar *padname;
2836
2837   rtpbin = session->bin;
2838
2839   GST_DEBUG_OBJECT (rtpbin, "new SSRC pad %08x, %s:%s", ssrc,
2840       GST_DEBUG_PAD_NAME (pad));
2841
2842   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
2843
2844   GST_RTP_SESSION_LOCK (session);
2845
2846   /* create new stream */
2847   stream = create_stream (session, ssrc);
2848   if (!stream)
2849     goto no_stream;
2850
2851   /* get pad and link */
2852   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP");
2853   padname = g_strdup_printf ("src_%u", ssrc);
2854   srcpad = gst_element_get_static_pad (element, padname);
2855   g_free (padname);
2856   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
2857   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
2858   gst_object_unref (sinkpad);
2859   gst_object_unref (srcpad);
2860
2861   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
2862   padname = g_strdup_printf ("rtcp_src_%u", ssrc);
2863   srcpad = gst_element_get_static_pad (element, padname);
2864   g_free (padname);
2865   sinkpad = gst_element_get_request_pad (stream->buffer, "sink_rtcp");
2866   gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
2867   gst_object_unref (sinkpad);
2868   gst_object_unref (srcpad);
2869
2870   /* connect to the RTCP sync signal from the jitterbuffer */
2871   GST_DEBUG_OBJECT (rtpbin, "connecting sync signal");
2872   stream->buffer_handlesync_sig = g_signal_connect (stream->buffer,
2873       "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream);
2874
2875   if (stream->demux) {
2876     /* connect to the new-pad signal of the payload demuxer, this will expose the
2877      * new pad by ghosting it. */
2878     stream->demux_newpad_sig = g_signal_connect (stream->demux,
2879         "new-payload-type", (GCallback) new_payload_found, stream);
2880     stream->demux_padremoved_sig = g_signal_connect (stream->demux,
2881         "pad-removed", (GCallback) payload_pad_removed, stream);
2882
2883     /* connect to the request-pt-map signal. This signal will be emited by the
2884      * demuxer so that it can apply a proper caps on the buffers for the
2885      * depayloaders. */
2886     stream->demux_ptreq_sig = g_signal_connect (stream->demux,
2887         "request-pt-map", (GCallback) pt_map_requested, session);
2888     /* connect to the  signal so it can be forwarded. */
2889     stream->demux_ptchange_sig = g_signal_connect (stream->demux,
2890         "payload-type-change", (GCallback) payload_type_change, session);
2891   } else {
2892     /* add rtpjitterbuffer src pad to pads */
2893     GstElementClass *klass;
2894     GstPadTemplate *templ;
2895     gchar *padname;
2896     GstPad *gpad, *pad;
2897
2898     pad = gst_element_get_static_pad (stream->buffer, "src");
2899
2900     /* ghost the pad to the parent */
2901     klass = GST_ELEMENT_GET_CLASS (rtpbin);
2902     templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
2903     padname = g_strdup_printf ("recv_rtp_src_%u_%u_%u",
2904         stream->session->id, stream->ssrc, 255);
2905     gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
2906     g_free (padname);
2907
2908     gst_pad_set_active (gpad, TRUE);
2909     gst_pad_sticky_events_foreach (pad, copy_sticky_events, gpad);
2910     gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
2911
2912     gst_object_unref (pad);
2913   }
2914
2915   GST_RTP_SESSION_UNLOCK (session);
2916   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2917
2918   return;
2919
2920   /* ERRORS */
2921 shutdown:
2922   {
2923     GST_DEBUG_OBJECT (rtpbin, "we are shutting down");
2924     return;
2925   }
2926 no_stream:
2927   {
2928     GST_RTP_SESSION_UNLOCK (session);
2929     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
2930     GST_DEBUG_OBJECT (rtpbin, "could not create stream");
2931     return;
2932   }
2933 }
2934
2935 static gboolean
2936 complete_session_sink (GstRtpBin * rtpbin, GstRtpBinSession * session)
2937 {
2938   gchar *gname;
2939   guint sessid = session->id;
2940   GstPad *recv_rtp_sink;
2941   GstElement *decoder;
2942   GstElementClass *klass;
2943   GstPadTemplate *templ;
2944
2945   /* get recv_rtp pad and store */
2946   session->recv_rtp_sink =
2947       gst_element_get_request_pad (session->session, "recv_rtp_sink");
2948   if (session->recv_rtp_sink == NULL)
2949     goto pad_failed;
2950
2951   g_signal_connect (session->recv_rtp_sink, "notify::caps",
2952       (GCallback) caps_changed, session);
2953
2954   GST_DEBUG_OBJECT (rtpbin, "requesting RTP decoder");
2955   decoder = session_request_element (session, SIGNAL_REQUEST_RTP_DECODER);
2956   if (decoder) {
2957     GstPad *decsrc, *decsink;
2958     GstPadLinkReturn ret;
2959
2960     GST_DEBUG_OBJECT (rtpbin, "linking RTP decoder");
2961     decsink = gst_element_get_static_pad (decoder, "rtp_sink");
2962     if (decsink == NULL)
2963       goto dec_sink_failed;
2964
2965     recv_rtp_sink = decsink;
2966
2967     decsrc = gst_element_get_static_pad (decoder, "rtp_src");
2968     if (decsrc == NULL)
2969       goto dec_src_failed;
2970
2971     ret = gst_pad_link (decsrc, session->recv_rtp_sink);
2972     gst_object_unref (decsrc);
2973
2974     if (ret != GST_PAD_LINK_OK)
2975       goto dec_link_failed;
2976
2977   } else {
2978     GST_DEBUG_OBJECT (rtpbin, "no RTP decoder given");
2979     recv_rtp_sink = gst_object_ref (session->recv_rtp_sink);
2980   }
2981
2982   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
2983   klass = GST_ELEMENT_GET_CLASS (rtpbin);
2984   gname = g_strdup_printf ("recv_rtp_sink_%u", sessid);
2985   templ = gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u");
2986   session->recv_rtp_sink_ghost =
2987       gst_ghost_pad_new_from_template (gname, recv_rtp_sink, templ);
2988   gst_object_unref (recv_rtp_sink);
2989   gst_pad_set_active (session->recv_rtp_sink_ghost, TRUE);
2990   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->recv_rtp_sink_ghost);
2991   g_free (gname);
2992
2993   return TRUE;
2994
2995   /* ERRORS */
2996 pad_failed:
2997   {
2998     g_warning ("rtpbin: failed to get session recv_rtp_sink pad");
2999     return FALSE;
3000   }
3001 dec_sink_failed:
3002   {
3003     g_warning ("rtpbin: failed to get decoder sink pad for session %d", sessid);
3004     return FALSE;
3005   }
3006 dec_src_failed:
3007   {
3008     g_warning ("rtpbin: failed to get decoder src pad for session %d", sessid);
3009     gst_object_unref (recv_rtp_sink);
3010     return FALSE;
3011   }
3012 dec_link_failed:
3013   {
3014     g_warning ("rtpbin: failed to link rtp decoder for session %d", sessid);
3015     gst_object_unref (recv_rtp_sink);
3016     return FALSE;
3017   }
3018 }
3019
3020 /* Create a pad for receiving RTP for the session in @name. Must be called with
3021  * RTP_BIN_LOCK.
3022  */
3023 static GstPad *
3024 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
3025 {
3026   guint sessid;
3027   GstElement *aux;
3028   GstPad *recv_rtp_src;
3029   GstRtpBinSession *session;
3030
3031   /* first get the session number */
3032   if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1)
3033     goto no_name;
3034
3035   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
3036
3037   /* get or create session */
3038   session = find_session_by_id (rtpbin, sessid);
3039   if (!session) {
3040     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
3041     /* create session now */
3042     session = create_session (rtpbin, sessid);
3043     if (session == NULL)
3044       goto create_error;
3045   }
3046
3047   /* check if pad was requested */
3048   if (session->recv_rtp_sink_ghost != NULL)
3049     return session->recv_rtp_sink_ghost;
3050
3051   /* setup the session sink pad */
3052   if (!complete_session_sink (rtpbin, session))
3053     goto session_sink_failed;
3054
3055   session->recv_rtp_src =
3056       gst_element_get_static_pad (session->session, "recv_rtp_src");
3057   if (session->recv_rtp_src == NULL)
3058     goto pad_failed;
3059
3060   /* find out if we need AUX elements or if we can go into the SSRC demuxer
3061    * directly */
3062   aux = session_request_element (session, SIGNAL_REQUEST_AUX_RECEIVER);
3063   if (aux) {
3064     gchar *pname;
3065     GstPad *auxsink;
3066     GstPadLinkReturn ret;
3067
3068     GST_DEBUG_OBJECT (rtpbin, "linking AUX receiver");
3069
3070     pname = g_strdup_printf ("sink_%d", sessid);
3071     auxsink = gst_element_get_static_pad (aux, pname);
3072     g_free (pname);
3073     if (auxsink == NULL)
3074       goto aux_sink_failed;
3075
3076     ret = gst_pad_link (session->recv_rtp_src, auxsink);
3077     gst_object_unref (auxsink);
3078     if (ret != GST_PAD_LINK_OK)
3079       goto aux_link_failed;
3080
3081     /* this can be NULL when this AUX element is not to be linked to
3082      * an SSRC demuxer */
3083     pname = g_strdup_printf ("src_%d", sessid);
3084     recv_rtp_src = gst_element_get_static_pad (aux, pname);
3085     g_free (pname);
3086   } else {
3087     recv_rtp_src = gst_object_ref (session->recv_rtp_src);
3088   }
3089
3090   if (recv_rtp_src) {
3091     GstPad *sinkdpad;
3092
3093     GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
3094     sinkdpad = gst_element_get_static_pad (session->demux, "sink");
3095     GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
3096     gst_pad_link_full (recv_rtp_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
3097     gst_object_unref (recv_rtp_src);
3098     gst_object_unref (sinkdpad);
3099
3100     /* connect to the new-ssrc-pad signal of the SSRC demuxer */
3101     session->demux_newpad_sig = g_signal_connect (session->demux,
3102         "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
3103     session->demux_padremoved_sig = g_signal_connect (session->demux,
3104         "removed-ssrc-pad", (GCallback) ssrc_demux_pad_removed, session);
3105   }
3106   return session->recv_rtp_sink_ghost;
3107
3108   /* ERRORS */
3109 no_name:
3110   {
3111     g_warning ("rtpbin: invalid name given");
3112     return NULL;
3113   }
3114 create_error:
3115   {
3116     /* create_session already warned */
3117     return NULL;
3118   }
3119 session_sink_failed:
3120   {
3121     /* warning already done */
3122     return NULL;
3123   }
3124 pad_failed:
3125   {
3126     g_warning ("rtpbin: failed to get session recv_rtp_src pad");
3127     return NULL;
3128   }
3129 aux_sink_failed:
3130   {
3131     g_warning ("rtpbin: failed to get AUX sink pad for session %d", sessid);
3132     return NULL;
3133   }
3134 aux_link_failed:
3135   {
3136     g_warning ("rtpbin: failed to link AUX pad to session %d", sessid);
3137     return NULL;
3138   }
3139 }
3140
3141 static void
3142 remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3143 {
3144   if (session->demux_newpad_sig) {
3145     g_signal_handler_disconnect (session->demux, session->demux_newpad_sig);
3146     session->demux_newpad_sig = 0;
3147   }
3148   if (session->demux_padremoved_sig) {
3149     g_signal_handler_disconnect (session->demux, session->demux_padremoved_sig);
3150     session->demux_padremoved_sig = 0;
3151   }
3152   if (session->recv_rtp_src) {
3153     gst_object_unref (session->recv_rtp_src);
3154     session->recv_rtp_src = NULL;
3155   }
3156   if (session->recv_rtp_sink) {
3157     gst_element_release_request_pad (session->session, session->recv_rtp_sink);
3158     gst_object_unref (session->recv_rtp_sink);
3159     session->recv_rtp_sink = NULL;
3160   }
3161   if (session->recv_rtp_sink_ghost) {
3162     gst_pad_set_active (session->recv_rtp_sink_ghost, FALSE);
3163     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3164         session->recv_rtp_sink_ghost);
3165     session->recv_rtp_sink_ghost = NULL;
3166   }
3167 }
3168
3169 /* Create a pad for receiving RTCP for the session in @name. Must be called with
3170  * RTP_BIN_LOCK.
3171  */
3172 static GstPad *
3173 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
3174     const gchar * name)
3175 {
3176   guint sessid;
3177   GstElement *decoder;
3178   GstRtpBinSession *session;
3179   GstPad *sinkdpad, *decsink;
3180
3181   /* first get the session number */
3182   if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1)
3183     goto no_name;
3184
3185   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
3186
3187   /* get or create the session */
3188   session = find_session_by_id (rtpbin, sessid);
3189   if (!session) {
3190     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
3191     /* create session now */
3192     session = create_session (rtpbin, sessid);
3193     if (session == NULL)
3194       goto create_error;
3195   }
3196
3197   /* check if pad was requested */
3198   if (session->recv_rtcp_sink_ghost != NULL)
3199     return session->recv_rtcp_sink_ghost;
3200
3201   /* get recv_rtp pad and store */
3202   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
3203   session->recv_rtcp_sink =
3204       gst_element_get_request_pad (session->session, "recv_rtcp_sink");
3205   if (session->recv_rtcp_sink == NULL)
3206     goto pad_failed;
3207
3208   GST_DEBUG_OBJECT (rtpbin, "getting RTCP decoder");
3209   decoder = session_request_element (session, SIGNAL_REQUEST_RTCP_DECODER);
3210   if (decoder) {
3211     GstPad *decsrc;
3212     GstPadLinkReturn ret;
3213
3214     GST_DEBUG_OBJECT (rtpbin, "linking RTCP decoder");
3215     decsink = gst_element_get_static_pad (decoder, "rtcp_sink");
3216     decsrc = gst_element_get_static_pad (decoder, "rtcp_src");
3217
3218     if (decsink == NULL)
3219       goto dec_sink_failed;
3220
3221     if (decsrc == NULL)
3222       goto dec_src_failed;
3223
3224     ret = gst_pad_link (decsrc, session->recv_rtcp_sink);
3225     gst_object_unref (decsrc);
3226
3227     if (ret != GST_PAD_LINK_OK)
3228       goto dec_link_failed;
3229   } else {
3230     GST_DEBUG_OBJECT (rtpbin, "no RTCP decoder given");
3231     decsink = gst_object_ref (session->recv_rtcp_sink);
3232   }
3233
3234   /* get srcpad, link to SSRCDemux */
3235   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
3236   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
3237   if (session->sync_src == NULL)
3238     goto src_pad_failed;
3239
3240   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
3241   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
3242   gst_pad_link_full (session->sync_src, sinkdpad, GST_PAD_LINK_CHECK_NOTHING);
3243   gst_object_unref (sinkdpad);
3244
3245   session->recv_rtcp_sink_ghost =
3246       gst_ghost_pad_new_from_template (name, decsink, templ);
3247   gst_object_unref (decsink);
3248   gst_pad_set_active (session->recv_rtcp_sink_ghost, TRUE);
3249   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin),
3250       session->recv_rtcp_sink_ghost);
3251
3252   return session->recv_rtcp_sink_ghost;
3253
3254   /* ERRORS */
3255 no_name:
3256   {
3257     g_warning ("rtpbin: invalid name given");
3258     return NULL;
3259   }
3260 create_error:
3261   {
3262     /* create_session already warned */
3263     return NULL;
3264   }
3265 pad_failed:
3266   {
3267     g_warning ("rtpbin: failed to get session rtcp_sink pad");
3268     return NULL;
3269   }
3270 dec_sink_failed:
3271   {
3272     g_warning ("rtpbin: failed to get decoder sink pad for session %d", sessid);
3273     return NULL;
3274   }
3275 dec_src_failed:
3276   {
3277     g_warning ("rtpbin: failed to get decoder src pad for session %d", sessid);
3278     gst_object_unref (decsink);
3279     return NULL;
3280   }
3281 dec_link_failed:
3282   {
3283     g_warning ("rtpbin: failed to link rtcp decoder for session %d", sessid);
3284     gst_object_unref (decsink);
3285     return NULL;
3286   }
3287 src_pad_failed:
3288   {
3289     g_warning ("rtpbin: failed to get session sync_src pad");
3290     gst_object_unref (decsink);
3291     return NULL;
3292   }
3293 }
3294
3295 static void
3296 remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3297 {
3298   if (session->recv_rtcp_sink_ghost) {
3299     gst_pad_set_active (session->recv_rtcp_sink_ghost, FALSE);
3300     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3301         session->recv_rtcp_sink_ghost);
3302     session->recv_rtcp_sink_ghost = NULL;
3303   }
3304   if (session->sync_src) {
3305     /* releasing the request pad should also unref the sync pad */
3306     gst_object_unref (session->sync_src);
3307     session->sync_src = NULL;
3308   }
3309   if (session->recv_rtcp_sink) {
3310     gst_element_release_request_pad (session->session, session->recv_rtcp_sink);
3311     gst_object_unref (session->recv_rtcp_sink);
3312     session->recv_rtcp_sink = NULL;
3313   }
3314 }
3315
3316 static gboolean
3317 complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session)
3318 {
3319   gchar *gname;
3320   guint sessid = session->id;
3321   GstPad *send_rtp_src;
3322   GstElement *encoder;
3323   GstElementClass *klass;
3324   GstPadTemplate *templ;
3325
3326   /* get srcpad */
3327   session->send_rtp_src =
3328       gst_element_get_static_pad (session->session, "send_rtp_src");
3329   if (session->send_rtp_src == NULL)
3330     goto no_srcpad;
3331
3332   GST_DEBUG_OBJECT (rtpbin, "getting RTP encoder");
3333   encoder = session_request_element (session, SIGNAL_REQUEST_RTP_ENCODER);
3334   if (encoder) {
3335     gchar *ename;
3336     GstPad *encsrc, *encsink;
3337     GstPadLinkReturn ret;
3338
3339     GST_DEBUG_OBJECT (rtpbin, "linking RTP encoder");
3340     ename = g_strdup_printf ("rtp_src_%d", sessid);
3341     encsrc = gst_element_get_static_pad (encoder, ename);
3342     g_free (ename);
3343
3344     if (encsrc == NULL)
3345       goto enc_src_failed;
3346
3347     send_rtp_src = encsrc;
3348
3349     ename = g_strdup_printf ("rtp_sink_%d", sessid);
3350     encsink = gst_element_get_static_pad (encoder, ename);
3351     g_free (ename);
3352     if (encsink == NULL)
3353       goto enc_sink_failed;
3354
3355     ret = gst_pad_link (session->send_rtp_src, encsink);
3356     gst_object_unref (encsink);
3357
3358     if (ret != GST_PAD_LINK_OK)
3359       goto enc_link_failed;
3360   } else {
3361     GST_DEBUG_OBJECT (rtpbin, "no RTP encoder given");
3362     send_rtp_src = gst_object_ref (session->send_rtp_src);
3363   }
3364
3365   /* ghost the new source pad */
3366   klass = GST_ELEMENT_GET_CLASS (rtpbin);
3367   gname = g_strdup_printf ("send_rtp_src_%u", sessid);
3368   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%u");
3369   session->send_rtp_src_ghost =
3370       gst_ghost_pad_new_from_template (gname, send_rtp_src, templ);
3371   gst_object_unref (send_rtp_src);
3372   gst_pad_set_active (session->send_rtp_src_ghost, TRUE);
3373   gst_pad_sticky_events_foreach (send_rtp_src, copy_sticky_events,
3374       session->send_rtp_src_ghost);
3375   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_src_ghost);
3376   g_free (gname);
3377
3378   return TRUE;
3379
3380   /* ERRORS */
3381 no_srcpad:
3382   {
3383     g_warning ("rtpbin: failed to get rtp source pad for session %d", sessid);
3384     return FALSE;
3385   }
3386 enc_src_failed:
3387   {
3388     g_warning ("rtpbin: failed to get encoder src pad for session %d", sessid);
3389     return FALSE;
3390   }
3391 enc_sink_failed:
3392   {
3393     g_warning ("rtpbin: failed to get encoder sink pad for session %d", sessid);
3394     gst_object_unref (send_rtp_src);
3395     return FALSE;
3396   }
3397 enc_link_failed:
3398   {
3399     g_warning ("rtpbin: failed to link rtp encoder for session %d", sessid);
3400     gst_object_unref (send_rtp_src);
3401     return FALSE;
3402   }
3403 }
3404
3405 static gboolean
3406 setup_aux_sender_fold (const GValue * item, GValue * result, gpointer user_data)
3407 {
3408   GstPad *pad;
3409   gchar *name;
3410   guint sessid;
3411   GstRtpBinSession *session = user_data, *newsess;
3412   GstRtpBin *rtpbin = session->bin;
3413   GstPadLinkReturn ret;
3414
3415   pad = g_value_get_object (item);
3416   name = gst_pad_get_name (pad);
3417
3418   if (name == NULL || sscanf (name, "src_%u", &sessid) != 1)
3419     goto no_name;
3420
3421   g_free (name);
3422
3423   newsess = find_session_by_id (rtpbin, sessid);
3424   if (newsess == NULL) {
3425     /* create new session */
3426     newsess = create_session (rtpbin, sessid);
3427     if (newsess == NULL)
3428       goto create_error;
3429   } else if (newsess->send_rtp_sink != NULL)
3430     goto existing_session;
3431
3432   /* get send_rtp pad and store */
3433   newsess->send_rtp_sink =
3434       gst_element_get_request_pad (newsess->session, "send_rtp_sink");
3435   if (newsess->send_rtp_sink == NULL)
3436     goto pad_failed;
3437
3438   ret = gst_pad_link (pad, newsess->send_rtp_sink);
3439   if (ret != GST_PAD_LINK_OK)
3440     goto aux_link_failed;
3441
3442   if (!complete_session_src (rtpbin, newsess))
3443     goto session_src_failed;
3444
3445   return TRUE;
3446
3447   /* ERRORS */
3448 no_name:
3449   {
3450     GST_WARNING ("ignoring invalid pad name %s", GST_STR_NULL (name));
3451     g_free (name);
3452     return TRUE;
3453   }
3454 create_error:
3455   {
3456     /* create_session already warned */
3457     return FALSE;
3458   }
3459 existing_session:
3460   {
3461     g_warning ("rtpbin: session %d is already a sender", sessid);
3462     return FALSE;
3463   }
3464 pad_failed:
3465   {
3466     g_warning ("rtpbin: failed to get session pad for session %d", sessid);
3467     return FALSE;
3468   }
3469 aux_link_failed:
3470   {
3471     g_warning ("rtpbin: failed to link AUX for session %d", sessid);
3472     return FALSE;
3473   }
3474 session_src_failed:
3475   {
3476     g_warning ("rtpbin: failed to complete AUX for session %d", sessid);
3477     return FALSE;
3478   }
3479 }
3480
3481 static gboolean
3482 setup_aux_sender (GstRtpBin * rtpbin, GstRtpBinSession * session,
3483     GstElement * aux)
3484 {
3485   GstIterator *it;
3486   GValue result = { 0, };
3487   GstIteratorResult res;
3488
3489   it = gst_element_iterate_src_pads (aux);
3490   res = gst_iterator_fold (it, setup_aux_sender_fold, &result, session);
3491   gst_iterator_free (it);
3492
3493   return res == GST_ITERATOR_DONE;
3494 }
3495
3496 /* Create a pad for sending RTP for the session in @name. Must be called with
3497  * RTP_BIN_LOCK.
3498  */
3499 static GstPad *
3500 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
3501 {
3502   gchar *pname;
3503   guint sessid;
3504   GstPad *send_rtp_sink;
3505   GstElement *aux;
3506   GstRtpBinSession *session;
3507
3508   /* first get the session number */
3509   if (name == NULL || sscanf (name, "send_rtp_sink_%u", &sessid) != 1)
3510     goto no_name;
3511
3512   /* get or create session */
3513   session = find_session_by_id (rtpbin, sessid);
3514   if (!session) {
3515     /* create session now */
3516     session = create_session (rtpbin, sessid);
3517     if (session == NULL)
3518       goto create_error;
3519   }
3520
3521   /* check if pad was requested */
3522   if (session->send_rtp_sink_ghost != NULL)
3523     return session->send_rtp_sink_ghost;
3524
3525   /* check if we are already using this session as a sender */
3526   if (session->send_rtp_sink != NULL)
3527     goto existing_session;
3528
3529   GST_DEBUG_OBJECT (rtpbin, "getting RTP AUX sender");
3530   aux = session_request_element (session, SIGNAL_REQUEST_AUX_SENDER);
3531   if (aux) {
3532     GST_DEBUG_OBJECT (rtpbin, "linking AUX sender");
3533     if (!setup_aux_sender (rtpbin, session, aux))
3534       goto aux_session_failed;
3535
3536     pname = g_strdup_printf ("sink_%d", sessid);
3537     send_rtp_sink = gst_element_get_static_pad (aux, pname);
3538     g_free (pname);
3539
3540     if (send_rtp_sink == NULL)
3541       goto aux_sink_failed;
3542   } else {
3543     /* get send_rtp pad and store */
3544     session->send_rtp_sink =
3545         gst_element_get_request_pad (session->session, "send_rtp_sink");
3546     if (session->send_rtp_sink == NULL)
3547       goto pad_failed;
3548
3549     if (!complete_session_src (rtpbin, session))
3550       goto session_src_failed;
3551
3552     send_rtp_sink = gst_object_ref (session->send_rtp_sink);
3553   }
3554
3555   session->send_rtp_sink_ghost =
3556       gst_ghost_pad_new_from_template (name, send_rtp_sink, templ);
3557   gst_object_unref (send_rtp_sink);
3558   gst_pad_set_active (session->send_rtp_sink_ghost, TRUE);
3559   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtp_sink_ghost);
3560
3561   return session->send_rtp_sink_ghost;
3562
3563   /* ERRORS */
3564 no_name:
3565   {
3566     g_warning ("rtpbin: invalid name given");
3567     return NULL;
3568   }
3569 create_error:
3570   {
3571     /* create_session already warned */
3572     return NULL;
3573   }
3574 existing_session:
3575   {
3576     g_warning ("rtpbin: session %d is already in use", sessid);
3577     return NULL;
3578   }
3579 aux_session_failed:
3580   {
3581     g_warning ("rtpbin: failed to get AUX sink pad for session %d", sessid);
3582     return NULL;
3583   }
3584 aux_sink_failed:
3585   {
3586     g_warning ("rtpbin: failed to get AUX sink pad for session %d", sessid);
3587     return NULL;
3588   }
3589 pad_failed:
3590   {
3591     g_warning ("rtpbin: failed to get session pad for session %d", sessid);
3592     return NULL;
3593   }
3594 session_src_failed:
3595   {
3596     g_warning ("rtpbin: failed to setup source pads for session %d", sessid);
3597     return NULL;
3598   }
3599 }
3600
3601 static void
3602 remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3603 {
3604   if (session->send_rtp_src_ghost) {
3605     gst_pad_set_active (session->send_rtp_src_ghost, FALSE);
3606     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3607         session->send_rtp_src_ghost);
3608     session->send_rtp_src_ghost = NULL;
3609   }
3610   if (session->send_rtp_src) {
3611     gst_object_unref (session->send_rtp_src);
3612     session->send_rtp_src = NULL;
3613   }
3614   if (session->send_rtp_sink) {
3615     gst_element_release_request_pad (GST_ELEMENT_CAST (session->session),
3616         session->send_rtp_sink);
3617     gst_object_unref (session->send_rtp_sink);
3618     session->send_rtp_sink = NULL;
3619   }
3620   if (session->send_rtp_sink_ghost) {
3621     gst_pad_set_active (session->send_rtp_sink_ghost, FALSE);
3622     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3623         session->send_rtp_sink_ghost);
3624     session->send_rtp_sink_ghost = NULL;
3625   }
3626 }
3627
3628 /* Create a pad for sending RTCP for the session in @name. Must be called with
3629  * RTP_BIN_LOCK.
3630  */
3631 static GstPad *
3632 create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
3633 {
3634   guint sessid;
3635   GstPad *encsrc;
3636   GstElement *encoder;
3637   GstRtpBinSession *session;
3638
3639   /* first get the session number */
3640   if (name == NULL || sscanf (name, "send_rtcp_src_%u", &sessid) != 1)
3641     goto no_name;
3642
3643   /* get or create session */
3644   session = find_session_by_id (rtpbin, sessid);
3645   if (!session)
3646     goto no_session;
3647
3648   /* check if pad was requested */
3649   if (session->send_rtcp_src_ghost != NULL)
3650     return session->send_rtcp_src_ghost;
3651
3652   /* get rtcp_src pad and store */
3653   session->send_rtcp_src =
3654       gst_element_get_request_pad (session->session, "send_rtcp_src");
3655   if (session->send_rtcp_src == NULL)
3656     goto pad_failed;
3657
3658   GST_DEBUG_OBJECT (rtpbin, "getting RTCP encoder");
3659   encoder = session_request_element (session, SIGNAL_REQUEST_RTCP_ENCODER);
3660   if (encoder) {
3661     gchar *ename;
3662     GstPad *encsink;
3663     GstPadLinkReturn ret;
3664
3665     GST_DEBUG_OBJECT (rtpbin, "linking RTCP encoder");
3666
3667     ename = g_strdup_printf ("rtcp_src_%d", sessid);
3668     encsrc = gst_element_get_static_pad (encoder, ename);
3669     g_free (ename);
3670     if (encsrc == NULL)
3671       goto enc_src_failed;
3672
3673     ename = g_strdup_printf ("rtcp_sink_%d", sessid);
3674     encsink = gst_element_get_static_pad (encoder, ename);
3675     g_free (ename);
3676     if (encsink == NULL)
3677       goto enc_sink_failed;
3678
3679     ret = gst_pad_link (session->send_rtcp_src, encsink);
3680     gst_object_unref (encsink);
3681
3682     if (ret != GST_PAD_LINK_OK)
3683       goto enc_link_failed;
3684   } else {
3685     GST_DEBUG_OBJECT (rtpbin, "no RTCP encoder given");
3686     encsrc = gst_object_ref (session->send_rtcp_src);
3687   }
3688
3689   session->send_rtcp_src_ghost =
3690       gst_ghost_pad_new_from_template (name, encsrc, templ);
3691   gst_object_unref (encsrc);
3692   gst_pad_set_active (session->send_rtcp_src_ghost, TRUE);
3693   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), session->send_rtcp_src_ghost);
3694
3695   return session->send_rtcp_src_ghost;
3696
3697   /* ERRORS */
3698 no_name:
3699   {
3700     g_warning ("rtpbin: invalid name given");
3701     return NULL;
3702   }
3703 no_session:
3704   {
3705     g_warning ("rtpbin: session with id %d does not exist", sessid);
3706     return NULL;
3707   }
3708 pad_failed:
3709   {
3710     g_warning ("rtpbin: failed to get rtcp pad for session %d", sessid);
3711     return NULL;
3712   }
3713 enc_src_failed:
3714   {
3715     g_warning ("rtpbin: failed to get encoder src pad for session %d", sessid);
3716     return NULL;
3717   }
3718 enc_sink_failed:
3719   {
3720     g_warning ("rtpbin: failed to get encoder sink pad for session %d", sessid);
3721     gst_object_unref (encsrc);
3722     return NULL;
3723   }
3724 enc_link_failed:
3725   {
3726     g_warning ("rtpbin: failed to link rtcp encoder for session %d", sessid);
3727     gst_object_unref (encsrc);
3728     return NULL;
3729   }
3730 }
3731
3732 static void
3733 remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session)
3734 {
3735   if (session->send_rtcp_src_ghost) {
3736     gst_pad_set_active (session->send_rtcp_src_ghost, FALSE);
3737     gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin),
3738         session->send_rtcp_src_ghost);
3739     session->send_rtcp_src_ghost = NULL;
3740   }
3741   if (session->send_rtcp_src) {
3742     gst_element_release_request_pad (session->session, session->send_rtcp_src);
3743     gst_object_unref (session->send_rtcp_src);
3744     session->send_rtcp_src = NULL;
3745   }
3746 }
3747
3748 /* If the requested name is NULL we should create a name with
3749  * the session number assuming we want the lowest posible session
3750  * with a free pad like the template */
3751 static gchar *
3752 gst_rtp_bin_get_free_pad_name (GstElement * element, GstPadTemplate * templ)
3753 {
3754   gboolean name_found = FALSE;
3755   gint session = 0;
3756   GstIterator *pad_it = NULL;
3757   gchar *pad_name = NULL;
3758   GValue data = { 0, };
3759
3760   GST_DEBUG_OBJECT (element, "find a free pad name for template");
3761   while (!name_found) {
3762     gboolean done = FALSE;
3763
3764     g_free (pad_name);
3765     pad_name = g_strdup_printf (templ->name_template, session++);
3766     pad_it = gst_element_iterate_pads (GST_ELEMENT (element));
3767     name_found = TRUE;
3768     while (!done) {
3769       switch (gst_iterator_next (pad_it, &data)) {
3770         case GST_ITERATOR_OK:
3771         {
3772           GstPad *pad;
3773           gchar *name;
3774
3775           pad = g_value_get_object (&data);
3776           name = gst_pad_get_name (pad);
3777
3778           if (strcmp (name, pad_name) == 0) {
3779             done = TRUE;
3780             name_found = FALSE;
3781           }
3782           g_free (name);
3783           g_value_reset (&data);
3784           break;
3785         }
3786         case GST_ITERATOR_ERROR:
3787         case GST_ITERATOR_RESYNC:
3788           /* restart iteration */
3789           done = TRUE;
3790           name_found = FALSE;
3791           session = 0;
3792           break;
3793         case GST_ITERATOR_DONE:
3794           done = TRUE;
3795           break;
3796       }
3797     }
3798     g_value_unset (&data);
3799     gst_iterator_free (pad_it);
3800   }
3801
3802   GST_DEBUG_OBJECT (element, "free pad name found: '%s'", pad_name);
3803   return pad_name;
3804 }
3805
3806 /*
3807  */
3808 static GstPad *
3809 gst_rtp_bin_request_new_pad (GstElement * element,
3810     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3811 {
3812   GstRtpBin *rtpbin;
3813   GstElementClass *klass;
3814   GstPad *result;
3815
3816   gchar *pad_name = NULL;
3817
3818   g_return_val_if_fail (templ != NULL, NULL);
3819   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
3820
3821   rtpbin = GST_RTP_BIN (element);
3822   klass = GST_ELEMENT_GET_CLASS (element);
3823
3824   GST_RTP_BIN_LOCK (rtpbin);
3825
3826   if (name == NULL) {
3827     /* use a free pad name */
3828     pad_name = gst_rtp_bin_get_free_pad_name (element, templ);
3829   } else {
3830     /* use the provided name */
3831     pad_name = g_strdup (name);
3832   }
3833
3834   GST_DEBUG_OBJECT (rtpbin, "Trying to request a pad with name %s", pad_name);
3835
3836   /* figure out the template */
3837   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
3838     result = create_recv_rtp (rtpbin, templ, pad_name);
3839   } else if (templ == gst_element_class_get_pad_template (klass,
3840           "recv_rtcp_sink_%u")) {
3841     result = create_recv_rtcp (rtpbin, templ, pad_name);
3842   } else if (templ == gst_element_class_get_pad_template (klass,
3843           "send_rtp_sink_%u")) {
3844     result = create_send_rtp (rtpbin, templ, pad_name);
3845   } else if (templ == gst_element_class_get_pad_template (klass,
3846           "send_rtcp_src_%u")) {
3847     result = create_rtcp (rtpbin, templ, pad_name);
3848   } else
3849     goto wrong_template;
3850
3851   g_free (pad_name);
3852   GST_RTP_BIN_UNLOCK (rtpbin);
3853
3854   return result;
3855
3856   /* ERRORS */
3857 wrong_template:
3858   {
3859     g_free (pad_name);
3860     GST_RTP_BIN_UNLOCK (rtpbin);
3861     g_warning ("rtpbin: this is not our template");
3862     return NULL;
3863   }
3864 }
3865
3866 static void
3867 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
3868 {
3869   GstRtpBinSession *session;
3870   GstRtpBin *rtpbin;
3871
3872   g_return_if_fail (GST_IS_GHOST_PAD (pad));
3873   g_return_if_fail (GST_IS_RTP_BIN (element));
3874
3875   rtpbin = GST_RTP_BIN (element);
3876
3877   GST_RTP_BIN_LOCK (rtpbin);
3878   GST_DEBUG_OBJECT (rtpbin, "Trying to release pad %s:%s",
3879       GST_DEBUG_PAD_NAME (pad));
3880
3881   if (!(session = find_session_by_pad (rtpbin, pad)))
3882     goto unknown_pad;
3883
3884   if (session->recv_rtp_sink_ghost == pad) {
3885     remove_recv_rtp (rtpbin, session);
3886   } else if (session->recv_rtcp_sink_ghost == pad) {
3887     remove_recv_rtcp (rtpbin, session);
3888   } else if (session->send_rtp_sink_ghost == pad) {
3889     remove_send_rtp (rtpbin, session);
3890   } else if (session->send_rtcp_src_ghost == pad) {
3891     remove_rtcp (rtpbin, session);
3892   }
3893
3894   /* no more request pads, free the complete session */
3895   if (session->recv_rtp_sink_ghost == NULL
3896       && session->recv_rtcp_sink_ghost == NULL
3897       && session->send_rtp_sink_ghost == NULL
3898       && session->send_rtcp_src_ghost == NULL) {
3899     GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session);
3900     rtpbin->sessions = g_slist_remove (rtpbin->sessions, session);
3901     free_session (session, rtpbin);
3902   }
3903   GST_RTP_BIN_UNLOCK (rtpbin);
3904
3905   return;
3906
3907   /* ERROR */
3908 unknown_pad:
3909   {
3910     GST_RTP_BIN_UNLOCK (rtpbin);
3911     g_warning ("rtpbin: %s:%s is not one of our request pads",
3912         GST_DEBUG_PAD_NAME (pad));
3913     return;
3914   }
3915 }