gst/rtpmanager/gstrtpbin.c: Fix deadlock when shutting down, use a new lock instead...
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 /**
21  * SECTION:element-gstrtpbin
22  * @short_description: handle media from one RTP bin
23  * @see_also: gstrtpjitterbuffer, gstrtpsession, gstrtpptdemux, gstrtpssrcdemux
24  *
25  * <refsect2>
26  * <para>
27  * RTP bin combines the functions of gstrtpsession, gstrtpssrcdemux, gstrtpjitterbuffer
28  * and gstrtpptdemux in one element. It allows for multiple RTP sessions that will
29  * be synchronized together using RTCP SR packets.
30  * </para>
31  * <para>
32  * gstrtpbin is configured with a number of request pads that define the
33  * functionality that is activated, similar to the gstrtpsession element.
34  * </para>
35  * <para>
36  * To use gstrtpbin as an RTP receiver, request a recv_rtp_sink_%%d pad. The session
37  * number must be specified in the pad name. 
38  * Data received on the recv_rtp_sink_%%d pad will be processed in the gstrtpsession
39  * manager and after being validated forwarded on gstrtpssrcdemuxer element. Each
40  * RTP stream is demuxed based on the SSRC and send to a gstrtpjitterbuffer. After
41  * the packets are released from the jitterbuffer, they will be forwarded to a
42  * gstrtpptdemuxer element. The gstrtpptdemuxer element will demux the packets based
43  * on the payload type and will create a unique pad recv_rtp_src_%%d_%%d_%%d on
44  * gstrtpbin with the session number, SSRC and payload type respectively as the pad
45  * name.
46  * </para>
47  * <para>
48  * To also use gstrtpbin as an RTCP receiver, request a recv_rtcp_sink_%%d pad. The
49  * session number must be specified in the pad name.
50  * </para>
51  * <para>
52  * If you want the session manager to generate and send RTCP packets, request
53  * the send_rtcp_src_%%d pad with the session number in the pad name. Packet pushed
54  * on this pad contain SR/RR RTCP reports that should be sent to all participants
55  * in the session.
56  * </para>
57  * <para>
58  * To use gstrtpbin as a sender, request a send_rtp_sink_%%d pad, which will
59  * automatically create a send_rtp_src_%%d pad. If the session number is not provided,
60  * the pad from the lowest available session will be returned. The session manager will modify the
61  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
62  * send_rtp_src_%%d pad after updating its internal state.
63  * </para>
64  * <para>
65  * The session manager needs the clock-rate of the payload types it is handling
66  * and will signal the GstRtpSession::request-pt-map signal when it needs such a
67  * mapping. One can clear the cached values with the GstRtpSession::clear-pt-map
68  * signal.
69  * </para>
70  * <title>Example pipelines</title>
71  * <para>
72  * <programlisting>
73  * gst-launch udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
74  *     gstrtpbin ! rtptheoradepay ! theoradec ! xvimagesink
75  * </programlisting>
76  * Receive RTP data from port 5000 and send to the session 0 in gstrtpbin.
77  * </para>
78  * <para>
79  * <programlisting>
80  * gst-launch gstrtpbin name=rtpbin \
81  *         v4l2src ! ffmpegcolorspace ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
82  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
83  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
84  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
85  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
86  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
87  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
88  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
89  * </programlisting>
90  * Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
91  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
92  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
93  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
94  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
95  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
96  * is received on port 5007. Since RTCP packets from the sender should be sent
97  * as soon as possible and do not participate in preroll, sync=false and 
98  * async=false is configured on udpsink
99  * </para>
100  * <para>
101  * <programlisting>
102  *  gst-launch -v gstrtpbin name=rtpbin                                          \
103  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
104  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
105  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
106  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
107  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
108  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
109  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
110  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
111  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
112  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
113  * </programlisting>
114  * Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
115  * decode and display the video.
116  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
117  * decode and play the audio.
118  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
119  * session 1 on port 5003. These packets will be used for session management and
120  * synchronisation.
121  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
122  * on port 5007.
123  * </para>
124  * </refsect2>
125  *
126  * Last reviewed on 2007-08-30 (0.10.6)
127  */
128
129 #ifdef HAVE_CONFIG_H
130 #include "config.h"
131 #endif
132 #include <string.h>
133
134 #include <gst/rtp/gstrtpbuffer.h>
135 #include <gst/rtp/gstrtcpbuffer.h>
136
137 #include "gstrtpbin-marshal.h"
138 #include "gstrtpbin.h"
139
140 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
141 #define GST_CAT_DEFAULT gst_rtp_bin_debug
142
143 /* elementfactory information */
144 static const GstElementDetails rtpbin_details = GST_ELEMENT_DETAILS ("RTP Bin",
145     "Filter/Network/RTP",
146     "Implement an RTP bin",
147     "Wim Taymans <wim.taymans@gmail.com>");
148
149 /* sink pads */
150 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
151 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%d",
152     GST_PAD_SINK,
153     GST_PAD_REQUEST,
154     GST_STATIC_CAPS ("application/x-rtp")
155     );
156
157 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
158 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%d",
159     GST_PAD_SINK,
160     GST_PAD_REQUEST,
161     GST_STATIC_CAPS ("application/x-rtcp")
162     );
163
164 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
165 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%d",
166     GST_PAD_SINK,
167     GST_PAD_REQUEST,
168     GST_STATIC_CAPS ("application/x-rtp")
169     );
170
171 /* src pads */
172 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
173 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%d_%d_%d",
174     GST_PAD_SRC,
175     GST_PAD_SOMETIMES,
176     GST_STATIC_CAPS ("application/x-rtp")
177     );
178
179 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
180 GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%d",
181     GST_PAD_SRC,
182     GST_PAD_REQUEST,
183     GST_STATIC_CAPS ("application/x-rtcp")
184     );
185
186 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
187 GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d",
188     GST_PAD_SRC,
189     GST_PAD_SOMETIMES,
190     GST_STATIC_CAPS ("application/x-rtp")
191     );
192
193 /* padtemplate for the internal pad */
194 static GstStaticPadTemplate rtpbin_sync_sink_template =
195 GST_STATIC_PAD_TEMPLATE ("sink_%d",
196     GST_PAD_SINK,
197     GST_PAD_SOMETIMES,
198     GST_STATIC_CAPS ("application/x-rtcp")
199     );
200
201 #define GST_RTP_BIN_GET_PRIVATE(obj)  \
202    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
203
204 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock ((bin)->priv->bin_lock)
205 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock ((bin)->priv->bin_lock)
206
207 /* lock to protect dynamic callbacks, like pad-added and new ssrc. */
208 #define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock ((bin)->priv->dyn_lock)
209 #define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock ((bin)->priv->dyn_lock)
210
211 /* lock for shutdown */
212 #define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
213 G_STMT_START {                                   \
214   if (g_atomic_int_get (&bin->priv->shutdown))   \
215     goto label;                                  \
216   GST_RTP_BIN_DYN_LOCK (bin);                    \
217   if (g_atomic_int_get (&bin->priv->shutdown)) { \
218     GST_RTP_BIN_DYN_UNLOCK (bin);                \
219     goto label;                                  \
220   }                                              \
221 } G_STMT_END
222
223 /* unlock for shutdown */
224 #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
225   GST_RTP_BIN_DYN_UNLOCK (bin);                  \
226
227 struct _GstRtpBinPrivate
228 {
229   GMutex *bin_lock;
230
231   /* lock protecting dynamic adding/removing */
232   GMutex *dyn_lock;
233
234   /* the time when we went to playing */
235   GstClockTime ntp_ns_base;
236
237   /* if we are shutting down or not */
238   gint shutdown;
239 };
240
241 /* signals and args */
242 enum
243 {
244   SIGNAL_REQUEST_PT_MAP,
245   SIGNAL_CLEAR_PT_MAP,
246
247   SIGNAL_ON_NEW_SSRC,
248   SIGNAL_ON_SSRC_COLLISION,
249   SIGNAL_ON_SSRC_VALIDATED,
250   SIGNAL_ON_SSRC_ACTIVE,
251   SIGNAL_ON_SSRC_SDES,
252   SIGNAL_ON_BYE_SSRC,
253   SIGNAL_ON_BYE_TIMEOUT,
254   SIGNAL_ON_TIMEOUT,
255   LAST_SIGNAL
256 };
257
258 #define DEFAULT_LATENCY_MS           200
259 #define DEFAULT_SDES_CNAME           NULL
260 #define DEFAULT_SDES_NAME            NULL
261 #define DEFAULT_SDES_EMAIL           NULL
262 #define DEFAULT_SDES_PHONE           NULL
263 #define DEFAULT_SDES_LOCATION        NULL
264 #define DEFAULT_SDES_TOOL            NULL
265 #define DEFAULT_SDES_NOTE            NULL
266 #define DEFAULT_DO_LOST              FALSE
267
268 enum
269 {
270   PROP_0,
271   PROP_LATENCY,
272   PROP_SDES_CNAME,
273   PROP_SDES_NAME,
274   PROP_SDES_EMAIL,
275   PROP_SDES_PHONE,
276   PROP_SDES_LOCATION,
277   PROP_SDES_TOOL,
278   PROP_SDES_NOTE,
279   PROP_DO_LOST,
280   PROP_LAST
281 };
282
283 /* helper objects */
284 typedef struct _GstRtpBinSession GstRtpBinSession;
285 typedef struct _GstRtpBinStream GstRtpBinStream;
286 typedef struct _GstRtpBinClient GstRtpBinClient;
287
288 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
289
290 static GstCaps *pt_map_requested (GstElement * element, guint pt,
291     GstRtpBinSession * session);
292 static const gchar *sdes_type_to_name (GstRTCPSDESType type);
293 static void gst_rtp_bin_set_sdes_string (GstRtpBin * bin,
294     GstRTCPSDESType type, const gchar * data);
295
296 static void free_stream (GstRtpBinStream * stream);
297
298 /* Manages the RTP stream for one SSRC.
299  *
300  * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer.
301  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
302  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
303  * together (see below).
304  */
305 struct _GstRtpBinStream
306 {
307   /* the SSRC of this stream */
308   guint32 ssrc;
309
310   /* parent bin */
311   GstRtpBin *bin;
312
313   /* the session this SSRC belongs to */
314   GstRtpBinSession *session;
315
316   /* the jitterbuffer of the SSRC */
317   GstElement *buffer;
318
319   /* the PT demuxer of the SSRC */
320   GstElement *demux;
321   gulong demux_newpad_sig;
322   gulong demux_ptreq_sig;
323   gulong demux_pt_change_sig;
324
325   /* the internal pad we use to get RTCP sync messages */
326   GstPad *sync_pad;
327   gboolean have_sync;
328   guint64 last_unix;
329   guint64 last_extrtptime;
330
331   /* mapping to local RTP and NTP time */
332   guint64 local_rtp;
333   guint64 local_unix;
334   gint64 unix_delta;
335
336   /* for lip-sync */
337   guint64 clock_base;
338   guint64 clock_base_time;
339   gint clock_rate;
340   gint64 ts_offset;
341   gint64 prev_ts_offset;
342   gint last_pt;
343 };
344
345 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock ((sess)->lock)
346 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock ((sess)->lock)
347
348 /* Manages the receiving end of the packets.
349  *
350  * There is one such structure for each RTP session (audio/video/...).
351  * We get the RTP/RTCP packets and stuff them into the session manager. From
352  * there they are pushed into an SSRC demuxer that splits the stream based on
353  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
354  * the GstRtpBinStream above).
355  */
356 struct _GstRtpBinSession
357 {
358   /* session id */
359   gint id;
360   /* the parent bin */
361   GstRtpBin *bin;
362   /* the session element */
363   GstElement *session;
364   /* the SSRC demuxer */
365   GstElement *demux;
366   gulong demux_newpad_sig;
367
368   GMutex *lock;
369
370   /* list of GstRtpBinStream */
371   GSList *streams;
372
373   /* mapping of payload type to caps */
374   GHashTable *ptmap;
375
376   /* the pads of the session */
377   GstPad *recv_rtp_sink;
378   GstPad *recv_rtp_src;
379   GstPad *recv_rtcp_sink;
380   GstPad *sync_src;
381   GstPad *send_rtp_sink;
382   GstPad *send_rtp_src;
383   GstPad *send_rtcp_src;
384 };
385
386 /* Manages the RTP streams that come from one client and should therefore be
387  * synchronized.
388  */
389 struct _GstRtpBinClient
390 {
391   /* the common CNAME for the streams */
392   gchar *cname;
393   guint cname_len;
394
395   /* the streams */
396   guint nstreams;
397   GSList *streams;
398
399   gint64 min_delta;
400 };
401
402 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
403 static GstRtpBinSession *
404 find_session_by_id (GstRtpBin * rtpbin, gint id)
405 {
406   GSList *walk;
407
408   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
409     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
410
411     if (sess->id == id)
412       return sess;
413   }
414   return NULL;
415 }
416
417 static void
418 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
419 {
420   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
421       sess->id, ssrc);
422 }
423
424 static void
425 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
426 {
427   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
428       sess->id, ssrc);
429 }
430
431 static void
432 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
433 {
434   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
435       sess->id, ssrc);
436 }
437
438 static void
439 on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
440 {
441   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
442       sess->id, ssrc);
443 }
444
445 static void
446 on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
447 {
448   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
449       sess->id, ssrc);
450 }
451
452 static void
453 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
454 {
455   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
456       sess->id, ssrc);
457 }
458
459 static void
460 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
461 {
462   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
463       sess->id, ssrc);
464 }
465
466 static void
467 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
468 {
469   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
470       sess->id, ssrc);
471 }
472
473 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
474 static GstRtpBinSession *
475 create_session (GstRtpBin * rtpbin, gint id)
476 {
477   GstRtpBinSession *sess;
478   GstElement *session, *demux;
479   gint i;
480
481   if (!(session = gst_element_factory_make ("gstrtpsession", NULL)))
482     goto no_session;
483
484   if (!(demux = gst_element_factory_make ("gstrtpssrcdemux", NULL)))
485     goto no_demux;
486
487   sess = g_new0 (GstRtpBinSession, 1);
488   sess->lock = g_mutex_new ();
489   sess->id = id;
490   sess->bin = rtpbin;
491   sess->session = session;
492   sess->demux = demux;
493   sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
494       (GDestroyNotify) gst_caps_unref);
495   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
496
497   /* set NTP base or new session */
498   g_object_set (session, "ntp-ns-base", rtpbin->priv->ntp_ns_base, NULL);
499   /* configure SDES items */
500   GST_OBJECT_LOCK (rtpbin);
501   for (i = GST_RTCP_SDES_CNAME; i < GST_RTCP_SDES_PRIV; i++) {
502     g_object_set (session, sdes_type_to_name (i), rtpbin->sdes[i], NULL);
503   }
504   GST_OBJECT_UNLOCK (rtpbin);
505
506   /* provide clock_rate to the session manager when needed */
507   g_signal_connect (session, "request-pt-map",
508       (GCallback) pt_map_requested, sess);
509
510   g_signal_connect (sess->session, "on-new-ssrc",
511       (GCallback) on_new_ssrc, sess);
512   g_signal_connect (sess->session, "on-ssrc-collision",
513       (GCallback) on_ssrc_collision, sess);
514   g_signal_connect (sess->session, "on-ssrc-validated",
515       (GCallback) on_ssrc_validated, sess);
516   g_signal_connect (sess->session, "on-ssrc-active",
517       (GCallback) on_ssrc_active, sess);
518   g_signal_connect (sess->session, "on-ssrc-sdes",
519       (GCallback) on_ssrc_sdes, sess);
520   g_signal_connect (sess->session, "on-bye-ssrc",
521       (GCallback) on_bye_ssrc, sess);
522   g_signal_connect (sess->session, "on-bye-timeout",
523       (GCallback) on_bye_timeout, sess);
524   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
525
526   /* FIXME, change state only to what's needed */
527   gst_bin_add (GST_BIN_CAST (rtpbin), session);
528   gst_element_set_state (session, GST_STATE_PLAYING);
529   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
530   gst_element_set_state (demux, GST_STATE_PLAYING);
531
532   return sess;
533
534   /* ERRORS */
535 no_session:
536   {
537     g_warning ("gstrtpbin: could not create gstrtpsession element");
538     return NULL;
539   }
540 no_demux:
541   {
542     gst_object_unref (session);
543     g_warning ("gstrtpbin: could not create gstrtpssrcdemux element");
544     return NULL;
545   }
546 }
547
548 static void
549 free_session (GstRtpBinSession * sess)
550 {
551   GstRtpBin *bin;
552
553   bin = sess->bin;
554
555   gst_element_set_state (sess->session, GST_STATE_NULL);
556   gst_element_set_state (sess->demux, GST_STATE_NULL);
557
558   if (sess->recv_rtp_sink != NULL)
559     gst_element_release_request_pad (sess->session, sess->recv_rtp_sink);
560   if (sess->recv_rtp_src != NULL)
561     gst_object_unref (sess->recv_rtp_src);
562   if (sess->recv_rtcp_sink != NULL)
563     gst_element_release_request_pad (sess->session, sess->recv_rtcp_sink);
564   if (sess->sync_src != NULL)
565     gst_object_unref (sess->sync_src);
566   if (sess->send_rtp_sink != NULL)
567     gst_element_release_request_pad (sess->session, sess->send_rtp_sink);
568   if (sess->send_rtp_src != NULL)
569     gst_object_unref (sess->send_rtp_src);
570   if (sess->send_rtcp_src != NULL)
571     gst_element_release_request_pad (sess->session, sess->send_rtcp_src);
572
573   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
574   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
575
576   g_slist_foreach (sess->streams, (GFunc) free_stream, NULL);
577   g_slist_free (sess->streams);
578
579   g_mutex_free (sess->lock);
580   g_hash_table_destroy (sess->ptmap);
581
582   bin->sessions = g_slist_remove (bin->sessions, sess);
583
584   g_free (sess);
585 }
586
587 #if 0
588 static GstRtpBinStream *
589 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
590 {
591   GSList *walk;
592
593   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
594     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
595
596     if (stream->ssrc == ssrc)
597       return stream;
598   }
599   return NULL;
600 }
601 #endif
602
603 /* get the payload type caps for the specific payload @pt in @session */
604 static GstCaps *
605 get_pt_map (GstRtpBinSession * session, guint pt)
606 {
607   GstCaps *caps = NULL;
608   GstRtpBin *bin;
609   GValue ret = { 0 };
610   GValue args[3] = { {0}, {0}, {0} };
611
612   GST_DEBUG ("searching pt %d in cache", pt);
613
614   GST_RTP_SESSION_LOCK (session);
615
616   /* first look in the cache */
617   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
618   if (caps) {
619     gst_caps_ref (caps);
620     goto done;
621   }
622
623   bin = session->bin;
624
625   GST_DEBUG ("emiting signal for pt %d in session %d", pt, session->id);
626
627   /* not in cache, send signal to request caps */
628   g_value_init (&args[0], GST_TYPE_ELEMENT);
629   g_value_set_object (&args[0], bin);
630   g_value_init (&args[1], G_TYPE_UINT);
631   g_value_set_uint (&args[1], session->id);
632   g_value_init (&args[2], G_TYPE_UINT);
633   g_value_set_uint (&args[2], pt);
634
635   g_value_init (&ret, GST_TYPE_CAPS);
636   g_value_set_boxed (&ret, NULL);
637
638   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
639
640   g_value_unset (&args[0]);
641   g_value_unset (&args[1]);
642   g_value_unset (&args[2]);
643   caps = (GstCaps *) g_value_dup_boxed (&ret);
644   g_value_unset (&ret);
645   if (!caps)
646     goto no_caps;
647
648   GST_DEBUG ("caching pt %d as %" GST_PTR_FORMAT, pt, caps);
649
650   /* store in cache, take additional ref */
651   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
652       gst_caps_ref (caps));
653
654 done:
655   GST_RTP_SESSION_UNLOCK (session);
656
657   return caps;
658
659   /* ERRORS */
660 no_caps:
661   {
662     GST_RTP_SESSION_UNLOCK (session);
663     GST_DEBUG ("no pt map could be obtained");
664     return NULL;
665   }
666 }
667
668 static gboolean
669 return_true (gpointer key, gpointer value, gpointer user_data)
670 {
671   return TRUE;
672 }
673
674 static void
675 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
676 {
677   GSList *sessions, *streams;
678
679   GST_RTP_BIN_LOCK (bin);
680   GST_DEBUG_OBJECT (bin, "clearing pt map");
681   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
682     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
683
684     GST_DEBUG_OBJECT (bin, "clearing session %p", session);
685     g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
686
687     GST_RTP_SESSION_LOCK (session);
688     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
689
690     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
691       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
692
693       GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
694       g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
695       g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
696     }
697     GST_RTP_SESSION_UNLOCK (session);
698   }
699   GST_RTP_BIN_UNLOCK (bin);
700 }
701
702 static void
703 gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
704     const gchar * name, const GValue * value)
705 {
706   GSList *sessions, *streams;
707
708   GST_RTP_BIN_LOCK (bin);
709   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
710     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
711
712     GST_RTP_SESSION_LOCK (session);
713     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
714       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
715
716       g_object_set_property (G_OBJECT (stream->buffer), name, value);
717     }
718     GST_RTP_SESSION_UNLOCK (session);
719   }
720   GST_RTP_BIN_UNLOCK (bin);
721 }
722
723 /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
724 static GstRtpBinClient *
725 get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
726 {
727   GstRtpBinClient *result = NULL;
728   GSList *walk;
729
730   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
731     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
732
733     if (len != client->cname_len)
734       continue;
735
736     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
737       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
738           client->cname);
739       result = client;
740       break;
741     }
742   }
743
744   /* nothing found, create one */
745   if (result == NULL) {
746     result = g_new0 (GstRtpBinClient, 1);
747     result->cname = g_strndup ((gchar *) data, len);
748     result->cname_len = len;
749     result->min_delta = G_MAXINT64;
750     bin->clients = g_slist_prepend (bin->clients, result);
751     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
752         result->cname);
753   }
754   return result;
755 }
756
757 static void
758 free_client (GstRtpBinClient * client)
759 {
760   g_slist_free (client->streams);
761   g_free (client->cname);
762   g_free (client);
763 }
764
765 /* associate a stream to the given CNAME. This will make sure all streams for
766  * that CNAME are synchronized together. */
767 static void
768 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
769     guint8 * data)
770 {
771   GstRtpBinClient *client;
772   gboolean created;
773   GSList *walk;
774
775   /* first find or create the CNAME */
776   GST_RTP_BIN_LOCK (bin);
777   client = get_client (bin, len, data, &created);
778
779   /* find stream in the client */
780   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
781     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
782
783     if (ostream == stream)
784       break;
785   }
786   /* not found, add it to the list */
787   if (walk == NULL) {
788     GST_DEBUG_OBJECT (bin,
789         "new association of SSRC %08x with client %p with CNAME %s",
790         stream->ssrc, client, client->cname);
791     client->streams = g_slist_prepend (client->streams, stream);
792     client->nstreams++;
793   } else {
794     GST_DEBUG_OBJECT (bin,
795         "found association of SSRC %08x with client %p with CNAME %s",
796         stream->ssrc, client, client->cname);
797   }
798
799   /* we can only continue if we know the local clock-base and clock-rate */
800   if (stream->clock_base == -1)
801     goto no_clock_base;
802
803   if (stream->clock_rate <= 0) {
804     gint pt = -1;
805     GstCaps *caps = NULL;
806     GstStructure *s = NULL;
807
808     GST_RTP_SESSION_LOCK (stream->session);
809     pt = stream->last_pt;
810     GST_RTP_SESSION_UNLOCK (stream->session);
811
812     if (pt < 0)
813       goto no_clock_rate;
814
815     caps = get_pt_map (stream->session, pt);
816     if (!caps)
817       goto no_clock_rate;
818
819     s = gst_caps_get_structure (caps, 0);
820     gst_structure_get_int (s, "clock-rate", &stream->clock_rate);
821     gst_caps_unref (caps);
822
823     if (stream->clock_rate <= 0)
824       goto no_clock_rate;
825   }
826
827   /* map last RTP time to local timeline using our clock-base */
828   stream->local_rtp = stream->last_extrtptime - stream->clock_base;
829
830   GST_DEBUG_OBJECT (bin,
831       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
832       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d", stream->clock_base,
833       stream->last_extrtptime, stream->local_rtp, stream->clock_rate);
834
835   /* calculate local NTP time in gstreamer timestamp */
836   stream->local_unix =
837       gst_util_uint64_scale_int (stream->local_rtp, GST_SECOND,
838       stream->clock_rate);
839   stream->local_unix += stream->clock_base_time;
840   /* calculate delta between server and receiver */
841   stream->unix_delta = stream->last_unix - stream->local_unix;
842
843   GST_DEBUG_OBJECT (bin,
844       "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT
845       ", delta %" G_GINT64_FORMAT, stream->local_unix, stream->last_unix,
846       stream->unix_delta);
847
848   /* recalc inter stream playout offset, but only if there are more than one
849    * stream. */
850   if (client->nstreams > 1) {
851     gint64 min;
852
853     /* calculate the min of all deltas */
854     min = G_MAXINT64;
855     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
856       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
857
858       if (ostream->unix_delta && ostream->unix_delta < min)
859         min = ostream->unix_delta;
860     }
861
862     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT, client,
863         min);
864
865     /* calculate offsets for each stream */
866     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
867       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
868
869       if (ostream->unix_delta == 0)
870         continue;
871
872       ostream->ts_offset = ostream->unix_delta - min;
873
874       /* delta changed, see how much */
875       if (ostream->prev_ts_offset != ostream->ts_offset) {
876         gint64 diff;
877
878         if (ostream->prev_ts_offset > ostream->ts_offset)
879           diff = ostream->prev_ts_offset - ostream->ts_offset;
880         else
881           diff = ostream->ts_offset - ostream->prev_ts_offset;
882
883         /* only change diff when it changed more than 1 millisecond. This
884          * compensates for rounding errors in NTP to RTP timestamp
885          * conversions */
886         if (diff > GST_MSECOND)
887           g_object_set (ostream->buffer, "ts-offset", ostream->ts_offset, NULL);
888
889         ostream->prev_ts_offset = ostream->ts_offset;
890       }
891       GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
892           ostream->ssrc, ostream->ts_offset);
893     }
894   }
895   GST_RTP_BIN_UNLOCK (bin);
896   return;
897
898 no_clock_base:
899   {
900     GST_WARNING_OBJECT (bin, "we have no clock-base");
901     GST_RTP_BIN_UNLOCK (bin);
902     return;
903   }
904 no_clock_rate:
905   {
906     GST_WARNING_OBJECT (bin, "we have no clock-rate");
907     GST_RTP_BIN_UNLOCK (bin);
908     return;
909   }
910 }
911
912 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
913   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
914           (b) = gst_rtcp_packet_move_to_next ((packet)))
915
916 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
917   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
918           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
919
920 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
921   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
922           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
923
924 static GstFlowReturn
925 gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer)
926 {
927   GstFlowReturn ret = GST_FLOW_OK;
928   GstRtpBinStream *stream;
929   GstRtpBin *bin;
930   GstRTCPPacket packet;
931   guint32 ssrc;
932   guint64 ntptime;
933   guint32 rtptime;
934   gboolean have_sr, have_sdes;
935   gboolean more;
936
937   stream = gst_pad_get_element_private (pad);
938   bin = stream->bin;
939
940   GST_DEBUG_OBJECT (bin, "received sync packet");
941
942   if (!gst_rtcp_buffer_validate (buffer))
943     goto invalid_rtcp;
944
945   have_sr = FALSE;
946   have_sdes = FALSE;
947   GST_RTCP_BUFFER_FOR_PACKETS (more, buffer, &packet) {
948     /* first packet must be SR or RR or else the validate would have failed */
949     switch (gst_rtcp_packet_get_type (&packet)) {
950       case GST_RTCP_TYPE_SR:
951         /* only parse first. There is only supposed to be one SR in the packet
952          * but we will deal with malformed packets gracefully */
953         if (have_sr)
954           break;
955         /* get NTP and RTP times */
956         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, &rtptime,
957             NULL, NULL);
958
959         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
960         /* ignore SR that is not ours */
961         if (ssrc != stream->ssrc)
962           continue;
963
964         have_sr = TRUE;
965
966         /* store values in the stream */
967         stream->have_sync = TRUE;
968         stream->last_unix = gst_rtcp_ntp_to_unix (ntptime);
969         /* use extended timestamp */
970         gst_rtp_buffer_ext_timestamp (&stream->last_extrtptime, rtptime);
971         break;
972       case GST_RTCP_TYPE_SDES:
973       {
974         gboolean more_items, more_entries;
975
976         /* only deal with first SDES, there is only supposed to be one SDES in
977          * the RTCP packet but we deal with bad packets gracefully. Also bail
978          * out if we have not seen an SR item yet. */
979         if (have_sdes || !have_sr)
980           break;
981
982         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
983           /* skip items that are not about the SSRC of the sender */
984           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
985             continue;
986
987           /* find the CNAME entry */
988           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
989             GstRTCPSDESType type;
990             guint8 len;
991             guint8 *data;
992
993             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
994
995             if (type == GST_RTCP_SDES_CNAME) {
996               stream->clock_base = GST_BUFFER_OFFSET (buffer);
997               stream->clock_base_time = GST_BUFFER_OFFSET_END (buffer);
998               /* associate the stream to CNAME */
999               gst_rtp_bin_associate (bin, stream, len, data);
1000             }
1001           }
1002         }
1003         have_sdes = TRUE;
1004         break;
1005       }
1006       default:
1007         /* we can ignore these packets */
1008         break;
1009     }
1010   }
1011
1012   gst_buffer_unref (buffer);
1013
1014   return ret;
1015
1016   /* ERRORS */
1017 invalid_rtcp:
1018   {
1019     /* this is fatal and should be filtered earlier */
1020     GST_ELEMENT_ERROR (bin, STREAM, DECODE, (NULL),
1021         ("invalid RTCP packet received"));
1022     gst_buffer_unref (buffer);
1023     return GST_FLOW_ERROR;
1024   }
1025 }
1026
1027 /* create a new stream with @ssrc in @session. Must be called with
1028  * RTP_SESSION_LOCK. */
1029 static GstRtpBinStream *
1030 create_stream (GstRtpBinSession * session, guint32 ssrc)
1031 {
1032   GstElement *buffer, *demux;
1033   GstRtpBinStream *stream;
1034   GstPadTemplate *templ;
1035   gchar *padname;
1036
1037   if (!(buffer = gst_element_factory_make ("gstrtpjitterbuffer", NULL)))
1038     goto no_jitterbuffer;
1039
1040   if (!(demux = gst_element_factory_make ("gstrtpptdemux", NULL)))
1041     goto no_demux;
1042
1043   stream = g_new0 (GstRtpBinStream, 1);
1044   stream->ssrc = ssrc;
1045   stream->bin = session->bin;
1046   stream->session = session;
1047   stream->buffer = buffer;
1048   stream->demux = demux;
1049   stream->last_extrtptime = -1;
1050   stream->last_pt = -1;
1051   stream->have_sync = FALSE;
1052   session->streams = g_slist_prepend (session->streams, stream);
1053
1054   /* make an internal sinkpad for RTCP sync packets. Take ownership of the
1055    * pad. We will link this pad later. */
1056   padname = g_strdup_printf ("sync_%d", ssrc);
1057   templ = gst_static_pad_template_get (&rtpbin_sync_sink_template);
1058   stream->sync_pad = gst_pad_new_from_template (templ, padname);
1059   gst_object_unref (templ);
1060   g_free (padname);
1061   gst_object_ref (stream->sync_pad);
1062   gst_object_sink (stream->sync_pad);
1063   gst_pad_set_element_private (stream->sync_pad, stream);
1064   gst_pad_set_chain_function (stream->sync_pad, gst_rtp_bin_sync_chain);
1065   gst_pad_set_active (stream->sync_pad, TRUE);
1066
1067   /* provide clock_rate to the jitterbuffer when needed */
1068   g_signal_connect (buffer, "request-pt-map",
1069       (GCallback) pt_map_requested, session);
1070
1071   /* configure latency and packet lost */
1072   g_object_set (buffer, "latency", session->bin->latency, NULL);
1073   g_object_set (buffer, "do-lost", session->bin->do_lost, NULL);
1074
1075   gst_bin_add (GST_BIN_CAST (session->bin), buffer);
1076   gst_element_set_state (buffer, GST_STATE_PLAYING);
1077   gst_bin_add (GST_BIN_CAST (session->bin), demux);
1078   gst_element_set_state (demux, GST_STATE_PLAYING);
1079
1080   /* link stuff */
1081   gst_element_link (buffer, demux);
1082
1083   return stream;
1084
1085   /* ERRORS */
1086 no_jitterbuffer:
1087   {
1088     g_warning ("gstrtpbin: could not create gstrtpjitterbuffer element");
1089     return NULL;
1090   }
1091 no_demux:
1092   {
1093     gst_object_unref (buffer);
1094     g_warning ("gstrtpbin: could not create gstrtpptdemux element");
1095     return NULL;
1096   }
1097 }
1098
1099 static void
1100 free_stream (GstRtpBinStream * stream)
1101 {
1102   GstRtpBinSession *session;
1103
1104   session = stream->session;
1105
1106   gst_element_set_state (stream->buffer, GST_STATE_NULL);
1107   gst_element_set_state (stream->demux, GST_STATE_NULL);
1108
1109   gst_bin_remove (GST_BIN_CAST (session->bin), stream->buffer);
1110   gst_bin_remove (GST_BIN_CAST (session->bin), stream->demux);
1111
1112   gst_object_unref (stream->sync_pad);
1113
1114   session->streams = g_slist_remove (session->streams, stream);
1115
1116   g_free (stream);
1117 }
1118
1119 /* GObject vmethods */
1120 static void gst_rtp_bin_dispose (GObject * object);
1121 static void gst_rtp_bin_finalize (GObject * object);
1122 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
1123     const GValue * value, GParamSpec * pspec);
1124 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
1125     GValue * value, GParamSpec * pspec);
1126
1127 /* GstElement vmethods */
1128 static GstClock *gst_rtp_bin_provide_clock (GstElement * element);
1129 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
1130     GstStateChange transition);
1131 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
1132     GstPadTemplate * templ, const gchar * name);
1133 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
1134 static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message);
1135 static void gst_rtp_bin_clear_pt_map (GstRtpBin * bin);
1136
1137 GST_BOILERPLATE (GstRtpBin, gst_rtp_bin, GstBin, GST_TYPE_BIN);
1138
1139 static void
1140 gst_rtp_bin_base_init (gpointer klass)
1141 {
1142   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
1143
1144   /* sink pads */
1145   gst_element_class_add_pad_template (element_class,
1146       gst_static_pad_template_get (&rtpbin_recv_rtp_sink_template));
1147   gst_element_class_add_pad_template (element_class,
1148       gst_static_pad_template_get (&rtpbin_recv_rtcp_sink_template));
1149   gst_element_class_add_pad_template (element_class,
1150       gst_static_pad_template_get (&rtpbin_send_rtp_sink_template));
1151
1152   /* src pads */
1153   gst_element_class_add_pad_template (element_class,
1154       gst_static_pad_template_get (&rtpbin_recv_rtp_src_template));
1155   gst_element_class_add_pad_template (element_class,
1156       gst_static_pad_template_get (&rtpbin_send_rtcp_src_template));
1157   gst_element_class_add_pad_template (element_class,
1158       gst_static_pad_template_get (&rtpbin_send_rtp_src_template));
1159
1160   gst_element_class_set_details (element_class, &rtpbin_details);
1161 }
1162
1163 static void
1164 gst_rtp_bin_class_init (GstRtpBinClass * klass)
1165 {
1166   GObjectClass *gobject_class;
1167   GstElementClass *gstelement_class;
1168   GstBinClass *gstbin_class;
1169
1170   gobject_class = (GObjectClass *) klass;
1171   gstelement_class = (GstElementClass *) klass;
1172   gstbin_class = (GstBinClass *) klass;
1173
1174   g_type_class_add_private (klass, sizeof (GstRtpBinPrivate));
1175
1176   gobject_class->dispose = gst_rtp_bin_dispose;
1177   gobject_class->finalize = gst_rtp_bin_finalize;
1178   gobject_class->set_property = gst_rtp_bin_set_property;
1179   gobject_class->get_property = gst_rtp_bin_get_property;
1180
1181   g_object_class_install_property (gobject_class, PROP_LATENCY,
1182       g_param_spec_uint ("latency", "Buffer latency in ms",
1183           "Default amount of ms to buffer in the jitterbuffers", 0,
1184           G_MAXUINT, DEFAULT_LATENCY_MS, G_PARAM_READWRITE));
1185
1186   /**
1187    * GstRtpBin::request-pt-map:
1188    * @rtpbin: the object which received the signal
1189    * @session: the session
1190    * @pt: the pt
1191    *
1192    * Request the payload type as #GstCaps for @pt in @session.
1193    */
1194   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
1195       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
1196       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
1197       NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT_UINT, GST_TYPE_CAPS, 2,
1198       G_TYPE_UINT, G_TYPE_UINT);
1199   /**
1200    * GstRtpBin::clear-pt-map:
1201    * @rtpbin: the object which received the signal
1202    *
1203    * Clear all previously cached pt-mapping obtained with
1204    * GstRtpBin::request-pt-map.
1205    */
1206   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
1207       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
1208       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1209           clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1210       0, G_TYPE_NONE);
1211
1212   /**
1213    * GstRtpBin::on-new-ssrc:
1214    * @rtpbin: the object which received the signal
1215    * @session: the session
1216    * @ssrc: the SSRC 
1217    *
1218    * Notify of a new SSRC that entered @session.
1219    */
1220   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
1221       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
1222       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
1223       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1224       G_TYPE_UINT, G_TYPE_UINT);
1225   /**
1226    * GstRtpBin::on-ssrc-collision:
1227    * @rtpbin: the object which received the signal
1228    * @session: the session
1229    * @ssrc: the SSRC 
1230    *
1231    * Notify when we have an SSRC collision
1232    */
1233   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
1234       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
1235       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
1236       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1237       G_TYPE_UINT, G_TYPE_UINT);
1238   /**
1239    * GstRtpBin::on-ssrc-validated:
1240    * @rtpbin: the object which received the signal
1241    * @session: the session
1242    * @ssrc: the SSRC 
1243    *
1244    * Notify of a new SSRC that became validated.
1245    */
1246   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
1247       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
1248       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
1249       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1250       G_TYPE_UINT, G_TYPE_UINT);
1251   /**
1252    * GstRtpBin::on-ssrc-active:
1253    * @rtpbin: the object which received the signal
1254    * @session: the session
1255    * @ssrc: the SSRC
1256    *
1257    * Notify of a SSRC that is active, i.e., sending RTCP.
1258    */
1259   gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE] =
1260       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
1261       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_active),
1262       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1263       G_TYPE_UINT, G_TYPE_UINT);
1264   /**
1265    * GstRtpBin::on-ssrc-sdes:
1266    * @rtpbin: the object which received the signal
1267    * @session: the session
1268    * @ssrc: the SSRC
1269    *
1270    * Notify of a SSRC that is active, i.e., sending RTCP.
1271    */
1272   gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES] =
1273       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
1274       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_sdes),
1275       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1276       G_TYPE_UINT, G_TYPE_UINT);
1277
1278   /**
1279    * GstRtpBin::on-bye-ssrc:
1280    * @rtpbin: the object which received the signal
1281    * @session: the session
1282    * @ssrc: the SSRC 
1283    *
1284    * Notify of an SSRC that became inactive because of a BYE packet.
1285    */
1286   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
1287       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
1288       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
1289       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1290       G_TYPE_UINT, G_TYPE_UINT);
1291   /**
1292    * GstRtpBin::on-bye-timeout:
1293    * @rtpbin: the object which received the signal
1294    * @session: the session
1295    * @ssrc: the SSRC 
1296    *
1297    * Notify of an SSRC that has timed out because of BYE
1298    */
1299   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
1300       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
1301       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
1302       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1303       G_TYPE_UINT, G_TYPE_UINT);
1304   /**
1305    * GstRtpBin::on-timeout:
1306    * @rtpbin: the object which received the signal
1307    * @session: the session
1308    * @ssrc: the SSRC 
1309    *
1310    * Notify of an SSRC that has timed out
1311    */
1312   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
1313       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
1314       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
1315       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1316       G_TYPE_UINT, G_TYPE_UINT);
1317
1318   g_object_class_install_property (gobject_class, PROP_SDES_CNAME,
1319       g_param_spec_string ("sdes-cname", "SDES CNAME",
1320           "The CNAME to put in SDES messages of this session",
1321           DEFAULT_SDES_CNAME, G_PARAM_READWRITE));
1322
1323   g_object_class_install_property (gobject_class, PROP_SDES_NAME,
1324       g_param_spec_string ("sdes-name", "SDES NAME",
1325           "The NAME to put in SDES messages of this session",
1326           DEFAULT_SDES_NAME, G_PARAM_READWRITE));
1327
1328   g_object_class_install_property (gobject_class, PROP_SDES_EMAIL,
1329       g_param_spec_string ("sdes-email", "SDES EMAIL",
1330           "The EMAIL to put in SDES messages of this session",
1331           DEFAULT_SDES_EMAIL, G_PARAM_READWRITE));
1332
1333   g_object_class_install_property (gobject_class, PROP_SDES_PHONE,
1334       g_param_spec_string ("sdes-phone", "SDES PHONE",
1335           "The PHONE to put in SDES messages of this session",
1336           DEFAULT_SDES_PHONE, G_PARAM_READWRITE));
1337
1338   g_object_class_install_property (gobject_class, PROP_SDES_LOCATION,
1339       g_param_spec_string ("sdes-location", "SDES LOCATION",
1340           "The LOCATION to put in SDES messages of this session",
1341           DEFAULT_SDES_LOCATION, G_PARAM_READWRITE));
1342
1343   g_object_class_install_property (gobject_class, PROP_SDES_TOOL,
1344       g_param_spec_string ("sdes-tool", "SDES TOOL",
1345           "The TOOL to put in SDES messages of this session",
1346           DEFAULT_SDES_TOOL, G_PARAM_READWRITE));
1347
1348   g_object_class_install_property (gobject_class, PROP_SDES_NOTE,
1349       g_param_spec_string ("sdes-note", "SDES NOTE",
1350           "The NOTE to put in SDES messages of this session",
1351           DEFAULT_SDES_NOTE, G_PARAM_READWRITE));
1352
1353   g_object_class_install_property (gobject_class, PROP_DO_LOST,
1354       g_param_spec_boolean ("do-lost", "Do Lost",
1355           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
1356           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1357
1358   gstelement_class->provide_clock =
1359       GST_DEBUG_FUNCPTR (gst_rtp_bin_provide_clock);
1360   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
1361   gstelement_class->request_new_pad =
1362       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
1363   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
1364
1365   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (gst_rtp_bin_handle_message);
1366
1367   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
1368
1369   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
1370 }
1371
1372 static void
1373 gst_rtp_bin_init (GstRtpBin * rtpbin, GstRtpBinClass * klass)
1374 {
1375   gchar *str;
1376
1377   rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
1378   rtpbin->priv->bin_lock = g_mutex_new ();
1379   rtpbin->priv->dyn_lock = g_mutex_new ();
1380   rtpbin->provided_clock = gst_system_clock_obtain ();
1381
1382   rtpbin->latency = DEFAULT_LATENCY_MS;
1383   rtpbin->do_lost = DEFAULT_DO_LOST;
1384
1385   /* some default SDES entries */
1386   str = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ());
1387   gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_CNAME, str);
1388   g_free (str);
1389
1390   gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_NAME, g_get_real_name ());
1391   gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_TOOL, "GStreamer");
1392 }
1393
1394 static void
1395 gst_rtp_bin_dispose (GObject * object)
1396 {
1397   GstRtpBin *rtpbin;
1398
1399   rtpbin = GST_RTP_BIN (object);
1400
1401   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, NULL);
1402   g_slist_free (rtpbin->sessions);
1403   rtpbin->sessions = NULL;
1404   g_slist_foreach (rtpbin->clients, (GFunc) free_client, NULL);
1405   g_slist_free (rtpbin->clients);
1406   rtpbin->clients = NULL;
1407
1408   G_OBJECT_CLASS (parent_class)->dispose (object);
1409 }
1410
1411 static void
1412 gst_rtp_bin_finalize (GObject * object)
1413 {
1414   GstRtpBin *rtpbin;
1415   gint i;
1416
1417   rtpbin = GST_RTP_BIN (object);
1418
1419   for (i = 0; i < 9; i++)
1420     g_free (rtpbin->sdes[i]);
1421
1422   g_mutex_free (rtpbin->priv->bin_lock);
1423   g_mutex_free (rtpbin->priv->dyn_lock);
1424   gst_object_unref (rtpbin->provided_clock);
1425
1426   G_OBJECT_CLASS (parent_class)->finalize (object);
1427 }
1428
1429 static const gchar *
1430 sdes_type_to_name (GstRTCPSDESType type)
1431 {
1432   const gchar *result;
1433
1434   switch (type) {
1435     case GST_RTCP_SDES_CNAME:
1436       result = "sdes-cname";
1437       break;
1438     case GST_RTCP_SDES_NAME:
1439       result = "sdes-name";
1440       break;
1441     case GST_RTCP_SDES_EMAIL:
1442       result = "sdes-email";
1443       break;
1444     case GST_RTCP_SDES_PHONE:
1445       result = "sdes-phone";
1446       break;
1447     case GST_RTCP_SDES_LOC:
1448       result = "sdes-location";
1449       break;
1450     case GST_RTCP_SDES_TOOL:
1451       result = "sdes-tool";
1452       break;
1453     case GST_RTCP_SDES_NOTE:
1454       result = "sdes-note";
1455       break;
1456     case GST_RTCP_SDES_PRIV:
1457       result = "sdes-priv";
1458       break;
1459     default:
1460       result = NULL;
1461       break;
1462   }
1463   return result;
1464 }
1465
1466 static void
1467 gst_rtp_bin_set_sdes_string (GstRtpBin * bin, GstRTCPSDESType type,
1468     const gchar * data)
1469 {
1470   GSList *item;
1471   const gchar *name;
1472
1473   if (type < 0 || type > 8)
1474     return;
1475
1476   GST_OBJECT_LOCK (bin);
1477   g_free (bin->sdes[type]);
1478   bin->sdes[type] = g_strdup (data);
1479   name = sdes_type_to_name (type);
1480   /* store in all sessions */
1481   for (item = bin->sessions; item; item = g_slist_next (item))
1482     g_object_set (item->data, name, bin->sdes[type], NULL);
1483   GST_OBJECT_UNLOCK (bin);
1484 }
1485
1486 static gchar *
1487 gst_rtp_bin_get_sdes_string (GstRtpBin * bin, GstRTCPSDESType type)
1488 {
1489   gchar *result;
1490
1491   if (type < 0 || type > 8)
1492     return NULL;
1493
1494   GST_OBJECT_LOCK (bin);
1495   result = g_strdup (bin->sdes[type]);
1496   GST_OBJECT_UNLOCK (bin);
1497
1498   return result;
1499 }
1500
1501 static void
1502 gst_rtp_bin_set_property (GObject * object, guint prop_id,
1503     const GValue * value, GParamSpec * pspec)
1504 {
1505   GstRtpBin *rtpbin;
1506
1507   rtpbin = GST_RTP_BIN (object);
1508
1509   switch (prop_id) {
1510     case PROP_LATENCY:
1511       GST_RTP_BIN_LOCK (rtpbin);
1512       rtpbin->latency = g_value_get_uint (value);
1513       GST_RTP_BIN_UNLOCK (rtpbin);
1514       /* propegate the property down to the jitterbuffer */
1515       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "latency", value);
1516       break;
1517     case PROP_SDES_CNAME:
1518       gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_CNAME,
1519           g_value_get_string (value));
1520       break;
1521     case PROP_SDES_NAME:
1522       gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_NAME,
1523           g_value_get_string (value));
1524       break;
1525     case PROP_SDES_EMAIL:
1526       gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_EMAIL,
1527           g_value_get_string (value));
1528       break;
1529     case PROP_SDES_PHONE:
1530       gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_PHONE,
1531           g_value_get_string (value));
1532       break;
1533     case PROP_SDES_LOCATION:
1534       gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_LOC,
1535           g_value_get_string (value));
1536       break;
1537     case PROP_SDES_TOOL:
1538       gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_TOOL,
1539           g_value_get_string (value));
1540       break;
1541     case PROP_SDES_NOTE:
1542       gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_NOTE,
1543           g_value_get_string (value));
1544       break;
1545     case PROP_DO_LOST:
1546       GST_RTP_BIN_LOCK (rtpbin);
1547       rtpbin->do_lost = g_value_get_boolean (value);
1548       GST_RTP_BIN_UNLOCK (rtpbin);
1549       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
1550       break;
1551     default:
1552       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1553       break;
1554   }
1555 }
1556
1557 static void
1558 gst_rtp_bin_get_property (GObject * object, guint prop_id,
1559     GValue * value, GParamSpec * pspec)
1560 {
1561   GstRtpBin *rtpbin;
1562
1563   rtpbin = GST_RTP_BIN (object);
1564
1565   switch (prop_id) {
1566     case PROP_LATENCY:
1567       GST_RTP_BIN_LOCK (rtpbin);
1568       g_value_set_uint (value, rtpbin->latency);
1569       GST_RTP_BIN_UNLOCK (rtpbin);
1570       break;
1571     case PROP_SDES_CNAME:
1572       g_value_take_string (value, gst_rtp_bin_get_sdes_string (rtpbin,
1573               GST_RTCP_SDES_CNAME));
1574       break;
1575     case PROP_SDES_NAME:
1576       g_value_take_string (value, gst_rtp_bin_get_sdes_string (rtpbin,
1577               GST_RTCP_SDES_NAME));
1578       break;
1579     case PROP_SDES_EMAIL:
1580       g_value_take_string (value, gst_rtp_bin_get_sdes_string (rtpbin,
1581               GST_RTCP_SDES_EMAIL));
1582       break;
1583     case PROP_SDES_PHONE:
1584       g_value_take_string (value, gst_rtp_bin_get_sdes_string (rtpbin,
1585               GST_RTCP_SDES_PHONE));
1586       break;
1587     case PROP_SDES_LOCATION:
1588       g_value_take_string (value, gst_rtp_bin_get_sdes_string (rtpbin,
1589               GST_RTCP_SDES_LOC));
1590       break;
1591     case PROP_SDES_TOOL:
1592       g_value_take_string (value, gst_rtp_bin_get_sdes_string (rtpbin,
1593               GST_RTCP_SDES_TOOL));
1594       break;
1595     case PROP_SDES_NOTE:
1596       g_value_take_string (value, gst_rtp_bin_get_sdes_string (rtpbin,
1597               GST_RTCP_SDES_NOTE));
1598       break;
1599     case PROP_DO_LOST:
1600       GST_RTP_BIN_LOCK (rtpbin);
1601       g_value_set_boolean (value, rtpbin->do_lost);
1602       GST_RTP_BIN_UNLOCK (rtpbin);
1603       break;
1604     default:
1605       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1606       break;
1607   }
1608 }
1609
1610 static GstClock *
1611 gst_rtp_bin_provide_clock (GstElement * element)
1612 {
1613   GstRtpBin *rtpbin;
1614
1615   rtpbin = GST_RTP_BIN (element);
1616
1617   return GST_CLOCK_CAST (gst_object_ref (rtpbin->provided_clock));
1618 }
1619
1620 static void
1621 gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
1622 {
1623   GstRtpBin *rtpbin;
1624
1625   rtpbin = GST_RTP_BIN (bin);
1626
1627   switch (GST_MESSAGE_TYPE (message)) {
1628     case GST_MESSAGE_ELEMENT:
1629     {
1630       const GstStructure *s = gst_message_get_structure (message);
1631
1632       /* we change the structure name and add the session ID to it */
1633       if (gst_structure_has_name (s, "GstRTPSessionSDES")) {
1634         GSList *walk;
1635
1636         /* find the session, the message source has it */
1637         for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
1638           GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
1639
1640           /* if we found the session, change message. else we exit the loop and
1641            * leave the message unchanged */
1642           if (GST_OBJECT_CAST (sess->session) == GST_MESSAGE_SRC (message)) {
1643             message = gst_message_make_writable (message);
1644             s = gst_message_get_structure (message);
1645
1646             gst_structure_set_name ((GstStructure *) s, "GstRTPBinSDES");
1647
1648             gst_structure_set ((GstStructure *) s, "session", G_TYPE_UINT,
1649                 sess->id, NULL);
1650             break;
1651           }
1652         }
1653       }
1654       /* fallthrough to forward the modified message to the parent */
1655     }
1656     default:
1657     {
1658       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1659       break;
1660     }
1661   }
1662 }
1663
1664 static void
1665 calc_ntp_ns_base (GstRtpBin * bin)
1666 {
1667   GstClockTime now;
1668   GTimeVal current;
1669   GSList *walk;
1670
1671   /* get the current time and convert it to NTP time in nanoseconds */
1672   g_get_current_time (&current);
1673   now = GST_TIMEVAL_TO_TIME (current);
1674   now += (2208988800LL * GST_SECOND);
1675
1676   GST_RTP_BIN_LOCK (bin);
1677   bin->priv->ntp_ns_base = now;
1678   for (walk = bin->sessions; walk; walk = g_slist_next (walk)) {
1679     GstRtpBinSession *session = (GstRtpBinSession *) walk->data;
1680
1681     g_object_set (session->session, "ntp-ns-base", now, NULL);
1682   }
1683   GST_RTP_BIN_UNLOCK (bin);
1684
1685   return;
1686 }
1687
1688 static GstStateChangeReturn
1689 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
1690 {
1691   GstStateChangeReturn res;
1692   GstRtpBin *rtpbin;
1693   GstRtpBinPrivate *priv;
1694
1695   rtpbin = GST_RTP_BIN (element);
1696   priv = rtpbin->priv;
1697
1698   switch (transition) {
1699     case GST_STATE_CHANGE_NULL_TO_READY:
1700       break;
1701     case GST_STATE_CHANGE_READY_TO_PAUSED:
1702       GST_LOG_OBJECT (rtpbin, "clearing shutdown flag");
1703       g_atomic_int_set (&priv->shutdown, 0);
1704       break;
1705     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1706       calc_ntp_ns_base (rtpbin);
1707       break;
1708     case GST_STATE_CHANGE_PAUSED_TO_READY:
1709       GST_LOG_OBJECT (rtpbin, "setting shutdown flag");
1710       g_atomic_int_set (&priv->shutdown, 1);
1711       /* wait for all callbacks to end by taking the lock. No new callbacks will
1712        * be able to happen as we set the shutdown flag. */
1713       GST_RTP_BIN_DYN_LOCK (rtpbin);
1714       GST_LOG_OBJECT (rtpbin, "dynamic lock taken, we can continue shutdown");
1715       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
1716       break;
1717     default:
1718       break;
1719   }
1720
1721   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1722
1723   switch (transition) {
1724     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1725       break;
1726     case GST_STATE_CHANGE_PAUSED_TO_READY:
1727       break;
1728     case GST_STATE_CHANGE_READY_TO_NULL:
1729       break;
1730     default:
1731       break;
1732   }
1733   return res;
1734 }
1735
1736 /* a new pad (SSRC) was created in @session. This signal is emited from the
1737  * payload demuxer. */
1738 static void
1739 new_payload_found (GstElement * element, guint pt, GstPad * pad,
1740     GstRtpBinStream * stream)
1741 {
1742   GstRtpBin *rtpbin;
1743   GstElementClass *klass;
1744   GstPadTemplate *templ;
1745   gchar *padname;
1746   GstPad *gpad;
1747
1748   rtpbin = stream->bin;
1749
1750   GST_DEBUG ("new payload pad %d", pt);
1751
1752   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
1753
1754   /* ghost the pad to the parent */
1755   klass = GST_ELEMENT_GET_CLASS (rtpbin);
1756   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%d_%d_%d");
1757   padname = g_strdup_printf ("recv_rtp_src_%d_%u_%d",
1758       stream->session->id, stream->ssrc, pt);
1759   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
1760   g_free (padname);
1761
1762   gst_pad_set_caps (gpad, GST_PAD_CAPS (pad));
1763   gst_pad_set_active (gpad, TRUE);
1764   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
1765
1766   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
1767
1768   return;
1769
1770 shutdown:
1771   {
1772     GST_DEBUG ("ignoring, we are shutting down");
1773     return;
1774   }
1775 }
1776
1777 static GstCaps *
1778 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
1779 {
1780   GstRtpBin *rtpbin;
1781   GstCaps *caps;
1782
1783   rtpbin = session->bin;
1784
1785   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt,
1786       session->id);
1787
1788   caps = get_pt_map (session, pt);
1789   if (!caps)
1790     goto no_caps;
1791
1792   return caps;
1793
1794   /* ERRORS */
1795 no_caps:
1796   {
1797     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
1798     return NULL;
1799   }
1800 }
1801
1802 /* emited when caps changed for the session */
1803 static void
1804 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
1805 {
1806   GstRtpBin *bin;
1807   GstCaps *caps;
1808   gint payload;
1809   const GstStructure *s;
1810
1811   bin = session->bin;
1812
1813   g_object_get (pad, "caps", &caps, NULL);
1814
1815   if (caps == NULL)
1816     return;
1817
1818   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
1819
1820   s = gst_caps_get_structure (caps, 0);
1821
1822   /* get payload, finish when it's not there */
1823   if (!gst_structure_get_int (s, "payload", &payload))
1824     return;
1825
1826   GST_RTP_SESSION_LOCK (session);
1827   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
1828   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
1829   GST_RTP_SESSION_UNLOCK (session);
1830 }
1831
1832 /* Stores the last payload type received on a particular stream */
1833 static void
1834 payload_type_change (GstElement * element, guint pt, GstRtpBinStream * stream)
1835 {
1836   GST_RTP_SESSION_LOCK (stream->session);
1837   stream->last_pt = pt;
1838   GST_RTP_SESSION_UNLOCK (stream->session);
1839 }
1840
1841 /* a new pad (SSRC) was created in @session */
1842 static void
1843 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
1844     GstRtpBinSession * session)
1845 {
1846   GstRtpBin *rtpbin;
1847   GstRtpBinStream *stream;
1848   GstPad *sinkpad, *srcpad;
1849   gchar *padname;
1850   GstCaps *caps;
1851
1852   rtpbin = session->bin;
1853
1854   GST_DEBUG_OBJECT (rtpbin, "new SSRC pad %08x", ssrc);
1855
1856   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
1857
1858   GST_RTP_SESSION_LOCK (session);
1859
1860   /* create new stream */
1861   stream = create_stream (session, ssrc);
1862   if (!stream)
1863     goto no_stream;
1864
1865   /* get the caps of the pad, we need the clock-rate and base_time if any. */
1866   if ((caps = gst_pad_get_caps (pad))) {
1867     const GstStructure *s;
1868     guint val;
1869
1870     GST_DEBUG_OBJECT (rtpbin, "pad has caps %" GST_PTR_FORMAT, caps);
1871
1872     s = gst_caps_get_structure (caps, 0);
1873
1874     if (!gst_structure_get_int (s, "clock-rate", &stream->clock_rate)) {
1875       stream->clock_rate = -1;
1876
1877       GST_WARNING_OBJECT (rtpbin,
1878           "Caps have no clock rate %s from pad %s:%s",
1879           gst_caps_to_string (caps), GST_DEBUG_PAD_NAME (pad));
1880     }
1881
1882     if (gst_structure_get_uint (s, "clock-base", &val))
1883       stream->clock_base = val;
1884     else
1885       stream->clock_base = -1;
1886
1887     gst_caps_unref (caps);
1888   }
1889
1890   /* get pad and link */
1891   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer");
1892   padname = g_strdup_printf ("src_%d", ssrc);
1893   srcpad = gst_element_get_static_pad (element, padname);
1894   g_free (padname);
1895   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
1896   gst_pad_link (srcpad, sinkpad);
1897   gst_object_unref (sinkpad);
1898   gst_object_unref (srcpad);
1899
1900   /* get the RTCP sync pad */
1901   GST_DEBUG_OBJECT (rtpbin, "linking sync pad");
1902   padname = g_strdup_printf ("rtcp_src_%d", ssrc);
1903   srcpad = gst_element_get_static_pad (element, padname);
1904   g_free (padname);
1905   gst_pad_link (srcpad, stream->sync_pad);
1906   gst_object_unref (srcpad);
1907
1908   /* connect to the new-pad signal of the payload demuxer, this will expose the
1909    * new pad by ghosting it. */
1910   stream->demux_newpad_sig = g_signal_connect (stream->demux,
1911       "new-payload-type", (GCallback) new_payload_found, stream);
1912   /* connect to the request-pt-map signal. This signal will be emited by the
1913    * demuxer so that it can apply a proper caps on the buffers for the
1914    * depayloaders. */
1915   stream->demux_ptreq_sig = g_signal_connect (stream->demux,
1916       "request-pt-map", (GCallback) pt_map_requested, session);
1917   /* connect to the payload-type-change signal so that we can know which
1918    * PT is the current PT so that the jitterbuffer can be matched to the right
1919    * offset. */
1920   stream->demux_pt_change_sig = g_signal_connect (stream->demux,
1921       "payload-type-change", (GCallback) payload_type_change, stream);
1922
1923   GST_RTP_SESSION_UNLOCK (session);
1924   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
1925
1926   return;
1927
1928   /* ERRORS */
1929 shutdown:
1930   {
1931     GST_DEBUG_OBJECT (rtpbin, "we are shutting down");
1932     return;
1933   }
1934 no_stream:
1935   {
1936     GST_RTP_SESSION_UNLOCK (session);
1937     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
1938     GST_DEBUG_OBJECT (rtpbin, "could not create stream");
1939     return;
1940   }
1941 }
1942
1943 /* Create a pad for receiving RTP for the session in @name. Must be called with
1944  * RTP_BIN_LOCK.
1945  */
1946 static GstPad *
1947 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
1948 {
1949   GstPad *result, *sinkdpad;
1950   guint sessid;
1951   GstRtpBinSession *session;
1952   GstPadLinkReturn lres;
1953
1954   /* first get the session number */
1955   if (name == NULL || sscanf (name, "recv_rtp_sink_%d", &sessid) != 1)
1956     goto no_name;
1957
1958   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
1959
1960   /* get or create session */
1961   session = find_session_by_id (rtpbin, sessid);
1962   if (!session) {
1963     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
1964     /* create session now */
1965     session = create_session (rtpbin, sessid);
1966     if (session == NULL)
1967       goto create_error;
1968   }
1969
1970   /* check if pad was requested */
1971   if (session->recv_rtp_sink != NULL)
1972     goto existed;
1973
1974   GST_DEBUG_OBJECT (rtpbin, "getting RTP sink pad");
1975   /* get recv_rtp pad and store */
1976   session->recv_rtp_sink =
1977       gst_element_get_request_pad (session->session, "recv_rtp_sink");
1978   if (session->recv_rtp_sink == NULL)
1979     goto pad_failed;
1980
1981   g_signal_connect (session->recv_rtp_sink, "notify::caps",
1982       (GCallback) caps_changed, session);
1983
1984   GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad");
1985   /* get srcpad, link to SSRCDemux */
1986   session->recv_rtp_src =
1987       gst_element_get_static_pad (session->session, "recv_rtp_src");
1988   if (session->recv_rtp_src == NULL)
1989     goto pad_failed;
1990
1991   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
1992   sinkdpad = gst_element_get_static_pad (session->demux, "sink");
1993   GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
1994   lres = gst_pad_link (session->recv_rtp_src, sinkdpad);
1995   gst_object_unref (sinkdpad);
1996   if (lres != GST_PAD_LINK_OK)
1997     goto link_failed;
1998
1999   /* connect to the new-ssrc-pad signal of the SSRC demuxer */
2000   session->demux_newpad_sig = g_signal_connect (session->demux,
2001       "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
2002
2003   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
2004   result =
2005       gst_ghost_pad_new_from_template (name, session->recv_rtp_sink, templ);
2006   gst_pad_set_active (result, TRUE);
2007   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
2008
2009   return result;
2010
2011   /* ERRORS */
2012 no_name:
2013   {
2014     g_warning ("gstrtpbin: invalid name given");
2015     return NULL;
2016   }
2017 create_error:
2018   {
2019     /* create_session already warned */
2020     return NULL;
2021   }
2022 existed:
2023   {
2024     g_warning ("gstrtpbin: recv_rtp pad already requested for session %d",
2025         sessid);
2026     return NULL;
2027   }
2028 pad_failed:
2029   {
2030     g_warning ("gstrtpbin: failed to get session pad");
2031     return NULL;
2032   }
2033 link_failed:
2034   {
2035     g_warning ("gstrtpbin: failed to link pads");
2036     return NULL;
2037   }
2038 }
2039
2040 /* Create a pad for receiving RTCP for the session in @name. Must be called with
2041  * RTP_BIN_LOCK.
2042  */
2043 static GstPad *
2044 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
2045     const gchar * name)
2046 {
2047   GstPad *result;
2048   guint sessid;
2049   GstRtpBinSession *session;
2050   GstPad *sinkdpad;
2051   GstPadLinkReturn lres;
2052
2053   /* first get the session number */
2054   if (name == NULL || sscanf (name, "recv_rtcp_sink_%d", &sessid) != 1)
2055     goto no_name;
2056
2057   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
2058
2059   /* get or create the session */
2060   session = find_session_by_id (rtpbin, sessid);
2061   if (!session) {
2062     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
2063     /* create session now */
2064     session = create_session (rtpbin, sessid);
2065     if (session == NULL)
2066       goto create_error;
2067   }
2068
2069   /* check if pad was requested */
2070   if (session->recv_rtcp_sink != NULL)
2071     goto existed;
2072
2073   /* get recv_rtp pad and store */
2074   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
2075   session->recv_rtcp_sink =
2076       gst_element_get_request_pad (session->session, "recv_rtcp_sink");
2077   if (session->recv_rtcp_sink == NULL)
2078     goto pad_failed;
2079
2080   /* get srcpad, link to SSRCDemux */
2081   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
2082   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
2083   if (session->sync_src == NULL)
2084     goto pad_failed;
2085
2086   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
2087   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
2088   lres = gst_pad_link (session->sync_src, sinkdpad);
2089   gst_object_unref (sinkdpad);
2090   if (lres != GST_PAD_LINK_OK)
2091     goto link_failed;
2092
2093   result =
2094       gst_ghost_pad_new_from_template (name, session->recv_rtcp_sink, templ);
2095   gst_pad_set_active (result, TRUE);
2096   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
2097
2098   return result;
2099
2100   /* ERRORS */
2101 no_name:
2102   {
2103     g_warning ("gstrtpbin: invalid name given");
2104     return NULL;
2105   }
2106 create_error:
2107   {
2108     /* create_session already warned */
2109     return NULL;
2110   }
2111 existed:
2112   {
2113     g_warning ("gstrtpbin: recv_rtcp pad already requested for session %d",
2114         sessid);
2115     return NULL;
2116   }
2117 pad_failed:
2118   {
2119     g_warning ("gstrtpbin: failed to get session pad");
2120     return NULL;
2121   }
2122 link_failed:
2123   {
2124     g_warning ("gstrtpbin: failed to link pads");
2125     return NULL;
2126   }
2127 }
2128
2129 /* Create a pad for sending RTP for the session in @name. Must be called with
2130  * RTP_BIN_LOCK.
2131  */
2132 static GstPad *
2133 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2134 {
2135   GstPad *result, *srcghost;
2136   gchar *gname;
2137   guint sessid;
2138   GstRtpBinSession *session;
2139   GstElementClass *klass;
2140
2141   /* first get the session number */
2142   if (name == NULL || sscanf (name, "send_rtp_sink_%d", &sessid) != 1)
2143     goto no_name;
2144
2145   /* get or create session */
2146   session = find_session_by_id (rtpbin, sessid);
2147   if (!session) {
2148     /* create session now */
2149     session = create_session (rtpbin, sessid);
2150     if (session == NULL)
2151       goto create_error;
2152   }
2153
2154   /* check if pad was requested */
2155   if (session->send_rtp_sink != NULL)
2156     goto existed;
2157
2158   /* get send_rtp pad and store */
2159   session->send_rtp_sink =
2160       gst_element_get_request_pad (session->session, "send_rtp_sink");
2161   if (session->send_rtp_sink == NULL)
2162     goto pad_failed;
2163
2164   result =
2165       gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ);
2166   gst_pad_set_active (result, TRUE);
2167   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
2168
2169   /* get srcpad */
2170   session->send_rtp_src =
2171       gst_element_get_static_pad (session->session, "send_rtp_src");
2172   if (session->send_rtp_src == NULL)
2173     goto no_srcpad;
2174
2175   /* ghost the new source pad */
2176   klass = GST_ELEMENT_GET_CLASS (rtpbin);
2177   gname = g_strdup_printf ("send_rtp_src_%d", sessid);
2178   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%d");
2179   srcghost =
2180       gst_ghost_pad_new_from_template (gname, session->send_rtp_src, templ);
2181   gst_pad_set_active (srcghost, TRUE);
2182   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), srcghost);
2183   g_free (gname);
2184
2185   return result;
2186
2187   /* ERRORS */
2188 no_name:
2189   {
2190     g_warning ("gstrtpbin: invalid name given");
2191     return NULL;
2192   }
2193 create_error:
2194   {
2195     /* create_session already warned */
2196     return NULL;
2197   }
2198 existed:
2199   {
2200     g_warning ("gstrtpbin: send_rtp pad already requested for session %d",
2201         sessid);
2202     return NULL;
2203   }
2204 pad_failed:
2205   {
2206     g_warning ("gstrtpbin: failed to get session pad for session %d", sessid);
2207     return NULL;
2208   }
2209 no_srcpad:
2210   {
2211     g_warning ("gstrtpbin: failed to get rtp source pad for session %d",
2212         sessid);
2213     return NULL;
2214   }
2215 }
2216
2217 /* Create a pad for sending RTCP for the session in @name. Must be called with
2218  * RTP_BIN_LOCK.
2219  */
2220 static GstPad *
2221 create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2222 {
2223   GstPad *result;
2224   guint sessid;
2225   GstRtpBinSession *session;
2226
2227   /* first get the session number */
2228   if (name == NULL || sscanf (name, "send_rtcp_src_%d", &sessid) != 1)
2229     goto no_name;
2230
2231   /* get or create session */
2232   session = find_session_by_id (rtpbin, sessid);
2233   if (!session)
2234     goto no_session;
2235
2236   /* check if pad was requested */
2237   if (session->send_rtcp_src != NULL)
2238     goto existed;
2239
2240   /* get rtcp_src pad and store */
2241   session->send_rtcp_src =
2242       gst_element_get_request_pad (session->session, "send_rtcp_src");
2243   if (session->send_rtcp_src == NULL)
2244     goto pad_failed;
2245
2246   result =
2247       gst_ghost_pad_new_from_template (name, session->send_rtcp_src, templ);
2248   gst_pad_set_active (result, TRUE);
2249   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
2250
2251   return result;
2252
2253   /* ERRORS */
2254 no_name:
2255   {
2256     g_warning ("gstrtpbin: invalid name given");
2257     return NULL;
2258   }
2259 no_session:
2260   {
2261     g_warning ("gstrtpbin: session with id %d does not exist", sessid);
2262     return NULL;
2263   }
2264 existed:
2265   {
2266     g_warning ("gstrtpbin: send_rtcp_src pad already requested for session %d",
2267         sessid);
2268     return NULL;
2269   }
2270 pad_failed:
2271   {
2272     g_warning ("gstrtpbin: failed to get rtcp pad for session %d", sessid);
2273     return NULL;
2274   }
2275 }
2276
2277 /* If the requested name is NULL we should create a name with
2278  * the session number assuming we want the lowest posible session
2279  * with a free pad like the template */
2280 static gchar *
2281 gst_rtp_bin_get_free_pad_name (GstElement * element, GstPadTemplate * templ)
2282 {
2283   gboolean name_found = FALSE;
2284   gint session = 0;
2285   GstPad *pad = NULL;
2286   GstIterator *pad_it = NULL;
2287   gchar *pad_name = NULL;
2288
2289   GST_DEBUG_OBJECT (element, "find a free pad name for template");
2290   while (!name_found) {
2291     g_free (pad_name);
2292     pad_name = g_strdup_printf (templ->name_template, session++);
2293     pad_it = gst_element_iterate_pads (GST_ELEMENT (element));
2294     name_found = TRUE;
2295     while (gst_iterator_next (pad_it, (gpointer) & pad) == GST_ITERATOR_OK) {
2296       gchar *name;
2297
2298       name = gst_pad_get_name (pad);
2299       if (strcmp (name, pad_name) == 0)
2300         name_found = FALSE;
2301       g_free (name);
2302     }
2303     gst_iterator_free (pad_it);
2304   }
2305
2306   GST_DEBUG_OBJECT (element, "free pad name found: '%s'", pad_name);
2307   return pad_name;
2308 }
2309
2310 /* 
2311  */
2312 static GstPad *
2313 gst_rtp_bin_request_new_pad (GstElement * element,
2314     GstPadTemplate * templ, const gchar * name)
2315 {
2316   GstRtpBin *rtpbin;
2317   GstElementClass *klass;
2318   GstPad *result;
2319   gchar *pad_name = NULL;
2320
2321   g_return_val_if_fail (templ != NULL, NULL);
2322   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
2323
2324   rtpbin = GST_RTP_BIN (element);
2325   klass = GST_ELEMENT_GET_CLASS (element);
2326
2327   GST_RTP_BIN_LOCK (rtpbin);
2328
2329   if (name == NULL) {
2330     /* use a free pad name */
2331     pad_name = gst_rtp_bin_get_free_pad_name (element, templ);
2332   } else {
2333     /* use the provided name */
2334     pad_name = g_strdup (name);
2335   }
2336
2337   GST_DEBUG ("Trying to request a pad with name %s", pad_name);
2338
2339   /* figure out the template */
2340   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%d")) {
2341     result = create_recv_rtp (rtpbin, templ, pad_name);
2342   } else if (templ == gst_element_class_get_pad_template (klass,
2343           "recv_rtcp_sink_%d")) {
2344     result = create_recv_rtcp (rtpbin, templ, pad_name);
2345   } else if (templ == gst_element_class_get_pad_template (klass,
2346           "send_rtp_sink_%d")) {
2347     result = create_send_rtp (rtpbin, templ, pad_name);
2348   } else if (templ == gst_element_class_get_pad_template (klass,
2349           "send_rtcp_src_%d")) {
2350     result = create_rtcp (rtpbin, templ, pad_name);
2351   } else
2352     goto wrong_template;
2353
2354   g_free (pad_name);
2355   GST_RTP_BIN_UNLOCK (rtpbin);
2356
2357   return result;
2358
2359   /* ERRORS */
2360 wrong_template:
2361   {
2362     g_free (pad_name);
2363     GST_RTP_BIN_UNLOCK (rtpbin);
2364     g_warning ("gstrtpbin: this is not our template");
2365     return NULL;
2366   }
2367 }
2368
2369 static void
2370 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
2371 {
2372 }