46ef4bb9121d680d300dc25184cbb7ddefb32d45
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 /**
21  * SECTION:element-gstrtpbin
22  * @see_also: gstrtpjitterbuffer, gstrtpsession, gstrtpptdemux, gstrtpssrcdemux
23  *
24  * RTP bin combines the functions of #GstRtpSession, #GstRtpsSrcDemux,
25  * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
26  * RTP sessions that will be synchronized together using RTCP SR packets.
27  * 
28  * #GstRtpBin is configured with a number of request pads that define the
29  * functionality that is activated, similar to the #GstRtpSession element.
30  * 
31  * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_%%d pad. The session
32  * number must be specified in the pad name. 
33  * Data received on the recv_rtp_sink_%%d pad will be processed in the gstrtpsession
34  * manager and after being validated forwarded on #GstRtpsSrcDemux element. Each
35  * RTP stream is demuxed based on the SSRC and send to a #GstRtpJitterBuffer. After
36  * the packets are released from the jitterbuffer, they will be forwarded to a
37  * #GstRtpsSrcDemux element. The #GstRtpsSrcDemux element will demux the packets based
38  * on the payload type and will create a unique pad recv_rtp_src_%%d_%%d_%%d on
39  * gstrtpbin with the session number, SSRC and payload type respectively as the pad
40  * name.
41  * 
42  * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_%%d pad. The
43  * session number must be specified in the pad name.
44  * 
45  * If you want the session manager to generate and send RTCP packets, request
46  * the send_rtcp_src_%%d pad with the session number in the pad name. Packet pushed
47  * on this pad contain SR/RR RTCP reports that should be sent to all participants
48  * in the session.
49  * 
50  * To use #GstRtpBin as a sender, request a send_rtp_sink_%%d pad, which will
51  * automatically create a send_rtp_src_%%d pad. If the session number is not provided,
52  * the pad from the lowest available session will be returned. The session manager will modify the
53  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
54  * send_rtp_src_%%d pad after updating its internal state.
55  * 
56  * The session manager needs the clock-rate of the payload types it is handling
57  * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
58  * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
59  * signal.
60  * 
61  * <refsect2>
62  * <title>Example pipelines</title>
63  * |[
64  * gst-launch udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
65  *     gstrtpbin ! rtptheoradepay ! theoradec ! xvimagesink
66  * ]| Receive RTP data from port 5000 and send to the session 0 in gstrtpbin.
67  * |[
68  * gst-launch gstrtpbin name=rtpbin \
69  *         v4l2src ! ffmpegcolorspace ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
70  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
71  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
72  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
73  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
74  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
75  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
76  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
77  * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
78  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
79  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
80  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
81  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
82  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
83  * is received on port 5007. Since RTCP packets from the sender should be sent
84  * as soon as possible and do not participate in preroll, sync=false and 
85  * async=false is configured on udpsink
86  * |[
87  * gst-launch -v gstrtpbin name=rtpbin                                          \
88  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
89  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
90  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
91  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
92  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
93  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
94  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
95  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
96  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
97  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
98  * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
99  * decode and display the video.
100  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
101  * decode and play the audio.
102  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
103  * session 1 on port 5003. These packets will be used for session management and
104  * synchronisation.
105  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
106  * on port 5007.
107  * </refsect2>
108  *
109  * Last reviewed on 2007-08-30 (0.10.6)
110  */
111
112 #ifdef HAVE_CONFIG_H
113 #include "config.h"
114 #endif
115 #include <string.h>
116
117 #include <gst/rtp/gstrtpbuffer.h>
118 #include <gst/rtp/gstrtcpbuffer.h>
119
120 #include "gstrtpbin-marshal.h"
121 #include "gstrtpbin.h"
122 #include "gstrtpsession.h"
123
124 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
125 #define GST_CAT_DEFAULT gst_rtp_bin_debug
126
127 /* elementfactory information */
128 static const GstElementDetails rtpbin_details = GST_ELEMENT_DETAILS ("RTP Bin",
129     "Filter/Network/RTP",
130     "Implement an RTP bin",
131     "Wim Taymans <wim.taymans@gmail.com>");
132
133 /* sink pads */
134 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
135 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%d",
136     GST_PAD_SINK,
137     GST_PAD_REQUEST,
138     GST_STATIC_CAPS ("application/x-rtp")
139     );
140
141 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
142 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%d",
143     GST_PAD_SINK,
144     GST_PAD_REQUEST,
145     GST_STATIC_CAPS ("application/x-rtcp")
146     );
147
148 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
149 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%d",
150     GST_PAD_SINK,
151     GST_PAD_REQUEST,
152     GST_STATIC_CAPS ("application/x-rtp")
153     );
154
155 /* src pads */
156 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
157 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%d_%d_%d",
158     GST_PAD_SRC,
159     GST_PAD_SOMETIMES,
160     GST_STATIC_CAPS ("application/x-rtp")
161     );
162
163 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
164 GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%d",
165     GST_PAD_SRC,
166     GST_PAD_REQUEST,
167     GST_STATIC_CAPS ("application/x-rtcp")
168     );
169
170 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
171 GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d",
172     GST_PAD_SRC,
173     GST_PAD_SOMETIMES,
174     GST_STATIC_CAPS ("application/x-rtp")
175     );
176
177 /* padtemplate for the internal pad */
178 static GstStaticPadTemplate rtpbin_sync_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("sink_%d",
180     GST_PAD_SINK,
181     GST_PAD_SOMETIMES,
182     GST_STATIC_CAPS ("application/x-rtcp")
183     );
184
185 #define GST_RTP_BIN_GET_PRIVATE(obj)  \
186    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
187
188 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock ((bin)->priv->bin_lock)
189 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock ((bin)->priv->bin_lock)
190
191 /* lock to protect dynamic callbacks, like pad-added and new ssrc. */
192 #define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock ((bin)->priv->dyn_lock)
193 #define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock ((bin)->priv->dyn_lock)
194
195 /* lock for shutdown */
196 #define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)     \
197 G_STMT_START {                                   \
198   if (g_atomic_int_get (&bin->priv->shutdown))   \
199     goto label;                                  \
200   GST_RTP_BIN_DYN_LOCK (bin);                    \
201   if (g_atomic_int_get (&bin->priv->shutdown)) { \
202     GST_RTP_BIN_DYN_UNLOCK (bin);                \
203     goto label;                                  \
204   }                                              \
205 } G_STMT_END
206
207 /* unlock for shutdown */
208 #define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
209   GST_RTP_BIN_DYN_UNLOCK (bin);                  \
210
211 struct _GstRtpBinPrivate
212 {
213   GMutex *bin_lock;
214
215   /* lock protecting dynamic adding/removing */
216   GMutex *dyn_lock;
217
218   /* the time when we went to playing */
219   GstClockTime ntp_ns_base;
220
221   /* if we are shutting down or not */
222   gint shutdown;
223 };
224
225 /* signals and args */
226 enum
227 {
228   SIGNAL_REQUEST_PT_MAP,
229   SIGNAL_CLEAR_PT_MAP,
230
231   SIGNAL_ON_NEW_SSRC,
232   SIGNAL_ON_SSRC_COLLISION,
233   SIGNAL_ON_SSRC_VALIDATED,
234   SIGNAL_ON_SSRC_ACTIVE,
235   SIGNAL_ON_SSRC_SDES,
236   SIGNAL_ON_BYE_SSRC,
237   SIGNAL_ON_BYE_TIMEOUT,
238   SIGNAL_ON_TIMEOUT,
239   LAST_SIGNAL
240 };
241
242 #define DEFAULT_LATENCY_MS           200
243 #define DEFAULT_SDES_CNAME           NULL
244 #define DEFAULT_SDES_NAME            NULL
245 #define DEFAULT_SDES_EMAIL           NULL
246 #define DEFAULT_SDES_PHONE           NULL
247 #define DEFAULT_SDES_LOCATION        NULL
248 #define DEFAULT_SDES_TOOL            NULL
249 #define DEFAULT_SDES_NOTE            NULL
250 #define DEFAULT_DO_LOST              FALSE
251
252 enum
253 {
254   PROP_0,
255   PROP_LATENCY,
256   PROP_SDES_CNAME,
257   PROP_SDES_NAME,
258   PROP_SDES_EMAIL,
259   PROP_SDES_PHONE,
260   PROP_SDES_LOCATION,
261   PROP_SDES_TOOL,
262   PROP_SDES_NOTE,
263   PROP_DO_LOST,
264   PROP_LAST
265 };
266
267 /* helper objects */
268 typedef struct _GstRtpBinSession GstRtpBinSession;
269 typedef struct _GstRtpBinStream GstRtpBinStream;
270 typedef struct _GstRtpBinClient GstRtpBinClient;
271
272 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
273
274 static GstCaps *pt_map_requested (GstElement * element, guint pt,
275     GstRtpBinSession * session);
276 static const gchar *sdes_type_to_name (GstRTCPSDESType type);
277 static void gst_rtp_bin_set_sdes_string (GstRtpBin * bin,
278     GstRTCPSDESType type, const gchar * data);
279
280 static void free_stream (GstRtpBinStream * stream);
281
282 /* Manages the RTP stream for one SSRC.
283  *
284  * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer.
285  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
286  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
287  * together (see below).
288  */
289 struct _GstRtpBinStream
290 {
291   /* the SSRC of this stream */
292   guint32 ssrc;
293
294   /* parent bin */
295   GstRtpBin *bin;
296
297   /* the session this SSRC belongs to */
298   GstRtpBinSession *session;
299
300   /* the jitterbuffer of the SSRC */
301   GstElement *buffer;
302
303   /* the PT demuxer of the SSRC */
304   GstElement *demux;
305   gulong demux_newpad_sig;
306   gulong demux_ptreq_sig;
307   gulong demux_pt_change_sig;
308
309   /* the internal pad we use to get RTCP sync messages */
310   GstPad *sync_pad;
311   gboolean have_sync;
312   guint64 last_unix;
313   guint64 last_extrtptime;
314
315   /* mapping to local RTP and NTP time */
316   guint64 local_rtp;
317   guint64 local_unix;
318   gint64 unix_delta;
319
320   /* for lip-sync */
321   guint64 last_clock_base;
322   guint64 clock_base;
323   guint64 clock_base_time;
324   gint clock_rate;
325   gint64 ts_offset;
326   gint64 prev_ts_offset;
327   gint last_pt;
328 };
329
330 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock ((sess)->lock)
331 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock ((sess)->lock)
332
333 /* Manages the receiving end of the packets.
334  *
335  * There is one such structure for each RTP session (audio/video/...).
336  * We get the RTP/RTCP packets and stuff them into the session manager. From
337  * there they are pushed into an SSRC demuxer that splits the stream based on
338  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
339  * the GstRtpBinStream above).
340  */
341 struct _GstRtpBinSession
342 {
343   /* session id */
344   gint id;
345   /* the parent bin */
346   GstRtpBin *bin;
347   /* the session element */
348   GstElement *session;
349   /* the SSRC demuxer */
350   GstElement *demux;
351   gulong demux_newpad_sig;
352
353   GMutex *lock;
354
355   /* list of GstRtpBinStream */
356   GSList *streams;
357
358   /* mapping of payload type to caps */
359   GHashTable *ptmap;
360
361   /* the pads of the session */
362   GstPad *recv_rtp_sink;
363   GstPad *recv_rtp_src;
364   GstPad *recv_rtcp_sink;
365   GstPad *sync_src;
366   GstPad *send_rtp_sink;
367   GstPad *send_rtp_src;
368   GstPad *send_rtcp_src;
369 };
370
371 /* Manages the RTP streams that come from one client and should therefore be
372  * synchronized.
373  */
374 struct _GstRtpBinClient
375 {
376   /* the common CNAME for the streams */
377   gchar *cname;
378   guint cname_len;
379
380   /* the streams */
381   guint nstreams;
382   GSList *streams;
383
384   gint64 min_delta;
385 };
386
387 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
388 static GstRtpBinSession *
389 find_session_by_id (GstRtpBin * rtpbin, gint id)
390 {
391   GSList *walk;
392
393   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
394     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
395
396     if (sess->id == id)
397       return sess;
398   }
399   return NULL;
400 }
401
402 static void
403 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
404 {
405   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
406       sess->id, ssrc);
407 }
408
409 static void
410 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
411 {
412   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
413       sess->id, ssrc);
414 }
415
416 static void
417 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
418 {
419   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
420       sess->id, ssrc);
421 }
422
423 static void
424 on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
425 {
426   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE], 0,
427       sess->id, ssrc);
428 }
429
430 static void
431 on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
432 {
433   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES], 0,
434       sess->id, ssrc);
435 }
436
437 static void
438 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
439 {
440   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
441       sess->id, ssrc);
442 }
443
444 static void
445 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
446 {
447   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
448       sess->id, ssrc);
449 }
450
451 static void
452 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
453 {
454   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
455       sess->id, ssrc);
456 }
457
458 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
459 static GstRtpBinSession *
460 create_session (GstRtpBin * rtpbin, gint id)
461 {
462   GstRtpBinSession *sess;
463   GstElement *session, *demux;
464   gint i;
465
466   if (!(session = gst_element_factory_make ("gstrtpsession", NULL)))
467     goto no_session;
468
469   if (!(demux = gst_element_factory_make ("gstrtpssrcdemux", NULL)))
470     goto no_demux;
471
472   sess = g_new0 (GstRtpBinSession, 1);
473   sess->lock = g_mutex_new ();
474   sess->id = id;
475   sess->bin = rtpbin;
476   sess->session = session;
477   sess->demux = demux;
478   sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
479       (GDestroyNotify) gst_caps_unref);
480   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
481
482   /* set NTP base or new session */
483   g_object_set (session, "ntp-ns-base", rtpbin->priv->ntp_ns_base, NULL);
484   /* configure SDES items */
485   GST_OBJECT_LOCK (rtpbin);
486   for (i = GST_RTCP_SDES_CNAME; i < GST_RTCP_SDES_PRIV; i++) {
487     g_object_set (session, sdes_type_to_name (i), rtpbin->sdes[i], NULL);
488   }
489   GST_OBJECT_UNLOCK (rtpbin);
490
491   /* provide clock_rate to the session manager when needed */
492   g_signal_connect (session, "request-pt-map",
493       (GCallback) pt_map_requested, sess);
494
495   g_signal_connect (sess->session, "on-new-ssrc",
496       (GCallback) on_new_ssrc, sess);
497   g_signal_connect (sess->session, "on-ssrc-collision",
498       (GCallback) on_ssrc_collision, sess);
499   g_signal_connect (sess->session, "on-ssrc-validated",
500       (GCallback) on_ssrc_validated, sess);
501   g_signal_connect (sess->session, "on-ssrc-active",
502       (GCallback) on_ssrc_active, sess);
503   g_signal_connect (sess->session, "on-ssrc-sdes",
504       (GCallback) on_ssrc_sdes, sess);
505   g_signal_connect (sess->session, "on-bye-ssrc",
506       (GCallback) on_bye_ssrc, sess);
507   g_signal_connect (sess->session, "on-bye-timeout",
508       (GCallback) on_bye_timeout, sess);
509   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
510
511   /* FIXME, change state only to what's needed */
512   gst_bin_add (GST_BIN_CAST (rtpbin), session);
513   gst_element_set_state (session, GST_STATE_PLAYING);
514   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
515   gst_element_set_state (demux, GST_STATE_PLAYING);
516
517   return sess;
518
519   /* ERRORS */
520 no_session:
521   {
522     g_warning ("gstrtpbin: could not create gstrtpsession element");
523     return NULL;
524   }
525 no_demux:
526   {
527     gst_object_unref (session);
528     g_warning ("gstrtpbin: could not create gstrtpssrcdemux element");
529     return NULL;
530   }
531 }
532
533 static void
534 free_session (GstRtpBinSession * sess)
535 {
536   GstRtpBin *bin;
537
538   bin = sess->bin;
539
540   gst_element_set_state (sess->session, GST_STATE_NULL);
541   gst_element_set_state (sess->demux, GST_STATE_NULL);
542
543   if (sess->recv_rtp_sink != NULL)
544     gst_element_release_request_pad (sess->session, sess->recv_rtp_sink);
545   if (sess->recv_rtp_src != NULL)
546     gst_object_unref (sess->recv_rtp_src);
547   if (sess->recv_rtcp_sink != NULL)
548     gst_element_release_request_pad (sess->session, sess->recv_rtcp_sink);
549   if (sess->sync_src != NULL)
550     gst_object_unref (sess->sync_src);
551   if (sess->send_rtp_sink != NULL)
552     gst_element_release_request_pad (sess->session, sess->send_rtp_sink);
553   if (sess->send_rtp_src != NULL)
554     gst_object_unref (sess->send_rtp_src);
555   if (sess->send_rtcp_src != NULL)
556     gst_element_release_request_pad (sess->session, sess->send_rtcp_src);
557
558   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
559   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
560
561   g_slist_foreach (sess->streams, (GFunc) free_stream, NULL);
562   g_slist_free (sess->streams);
563
564   g_mutex_free (sess->lock);
565   g_hash_table_destroy (sess->ptmap);
566
567   bin->sessions = g_slist_remove (bin->sessions, sess);
568
569   g_free (sess);
570 }
571
572 #if 0
573 static GstRtpBinStream *
574 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
575 {
576   GSList *walk;
577
578   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
579     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
580
581     if (stream->ssrc == ssrc)
582       return stream;
583   }
584   return NULL;
585 }
586 #endif
587
588 /* get the payload type caps for the specific payload @pt in @session */
589 static GstCaps *
590 get_pt_map (GstRtpBinSession * session, guint pt)
591 {
592   GstCaps *caps = NULL;
593   GstRtpBin *bin;
594   GValue ret = { 0 };
595   GValue args[3] = { {0}, {0}, {0} };
596
597   GST_DEBUG ("searching pt %d in cache", pt);
598
599   GST_RTP_SESSION_LOCK (session);
600
601   /* first look in the cache */
602   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
603   if (caps) {
604     gst_caps_ref (caps);
605     goto done;
606   }
607
608   bin = session->bin;
609
610   GST_DEBUG ("emiting signal for pt %d in session %d", pt, session->id);
611
612   /* not in cache, send signal to request caps */
613   g_value_init (&args[0], GST_TYPE_ELEMENT);
614   g_value_set_object (&args[0], bin);
615   g_value_init (&args[1], G_TYPE_UINT);
616   g_value_set_uint (&args[1], session->id);
617   g_value_init (&args[2], G_TYPE_UINT);
618   g_value_set_uint (&args[2], pt);
619
620   g_value_init (&ret, GST_TYPE_CAPS);
621   g_value_set_boxed (&ret, NULL);
622
623   GST_RTP_SESSION_UNLOCK (session);
624
625   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
626
627   GST_RTP_SESSION_LOCK (session);
628
629   g_value_unset (&args[0]);
630   g_value_unset (&args[1]);
631   g_value_unset (&args[2]);
632
633   /* look in the cache again because we let the lock go */
634   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
635   if (caps) {
636     gst_caps_ref (caps);
637     g_value_unset (&ret);
638     goto done;
639   }
640
641   caps = (GstCaps *) g_value_dup_boxed (&ret);
642   g_value_unset (&ret);
643   if (!caps)
644     goto no_caps;
645
646   GST_DEBUG ("caching pt %d as %" GST_PTR_FORMAT, pt, caps);
647
648   /* store in cache, take additional ref */
649   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
650       gst_caps_ref (caps));
651
652 done:
653   GST_RTP_SESSION_UNLOCK (session);
654
655   return caps;
656
657   /* ERRORS */
658 no_caps:
659   {
660     GST_RTP_SESSION_UNLOCK (session);
661     GST_DEBUG ("no pt map could be obtained");
662     return NULL;
663   }
664 }
665
666 static gboolean
667 return_true (gpointer key, gpointer value, gpointer user_data)
668 {
669   return TRUE;
670 }
671
672 static void
673 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
674 {
675   GSList *sessions, *streams;
676
677   GST_RTP_BIN_LOCK (bin);
678   GST_DEBUG_OBJECT (bin, "clearing pt map");
679   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
680     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
681
682     GST_DEBUG_OBJECT (bin, "clearing session %p", session);
683     g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
684
685     GST_RTP_SESSION_LOCK (session);
686     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
687
688     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
689       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
690
691       GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
692       g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
693       g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
694     }
695     GST_RTP_SESSION_UNLOCK (session);
696   }
697   GST_RTP_BIN_UNLOCK (bin);
698 }
699
700 static void
701 gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
702     const gchar * name, const GValue * value)
703 {
704   GSList *sessions, *streams;
705
706   GST_RTP_BIN_LOCK (bin);
707   for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
708     GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
709
710     GST_RTP_SESSION_LOCK (session);
711     for (streams = session->streams; streams; streams = g_slist_next (streams)) {
712       GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
713
714       g_object_set_property (G_OBJECT (stream->buffer), name, value);
715     }
716     GST_RTP_SESSION_UNLOCK (session);
717   }
718   GST_RTP_BIN_UNLOCK (bin);
719 }
720
721 /* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
722 static GstRtpBinClient *
723 get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
724 {
725   GstRtpBinClient *result = NULL;
726   GSList *walk;
727
728   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
729     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
730
731     if (len != client->cname_len)
732       continue;
733
734     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
735       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
736           client->cname);
737       result = client;
738       break;
739     }
740   }
741
742   /* nothing found, create one */
743   if (result == NULL) {
744     result = g_new0 (GstRtpBinClient, 1);
745     result->cname = g_strndup ((gchar *) data, len);
746     result->cname_len = len;
747     result->min_delta = G_MAXINT64;
748     bin->clients = g_slist_prepend (bin->clients, result);
749     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
750         result->cname);
751   }
752   return result;
753 }
754
755 static void
756 free_client (GstRtpBinClient * client)
757 {
758   g_slist_free (client->streams);
759   g_free (client->cname);
760   g_free (client);
761 }
762
763 /* associate a stream to the given CNAME. This will make sure all streams for
764  * that CNAME are synchronized together. */
765 static void
766 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
767     guint8 * data)
768 {
769   GstRtpBinClient *client;
770   gboolean created;
771   GSList *walk;
772
773   /* first find or create the CNAME */
774   GST_RTP_BIN_LOCK (bin);
775   client = get_client (bin, len, data, &created);
776
777   /* find stream in the client */
778   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
779     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
780
781     if (ostream == stream)
782       break;
783   }
784   /* not found, add it to the list */
785   if (walk == NULL) {
786     GST_DEBUG_OBJECT (bin,
787         "new association of SSRC %08x with client %p with CNAME %s",
788         stream->ssrc, client, client->cname);
789     client->streams = g_slist_prepend (client->streams, stream);
790     client->nstreams++;
791   } else {
792     GST_DEBUG_OBJECT (bin,
793         "found association of SSRC %08x with client %p with CNAME %s",
794         stream->ssrc, client, client->cname);
795   }
796
797   /* we can only continue if we know the local clock-base and clock-rate */
798   if (stream->clock_base == -1)
799     goto no_clock_base;
800
801   if (stream->clock_rate <= 0) {
802     gint pt = -1;
803     GstCaps *caps = NULL;
804     GstStructure *s = NULL;
805
806     GST_RTP_SESSION_LOCK (stream->session);
807     pt = stream->last_pt;
808     GST_RTP_SESSION_UNLOCK (stream->session);
809
810     if (pt < 0)
811       goto no_clock_rate;
812
813     caps = get_pt_map (stream->session, pt);
814     if (!caps)
815       goto no_clock_rate;
816
817     s = gst_caps_get_structure (caps, 0);
818     gst_structure_get_int (s, "clock-rate", &stream->clock_rate);
819     gst_caps_unref (caps);
820
821     if (stream->clock_rate <= 0)
822       goto no_clock_rate;
823   }
824
825   /* map last RTP time to local timeline using our clock-base */
826   stream->local_rtp = stream->last_extrtptime - stream->clock_base;
827
828   GST_DEBUG_OBJECT (bin,
829       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
830       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d", stream->clock_base,
831       stream->last_extrtptime, stream->local_rtp, stream->clock_rate);
832
833   /* calculate local NTP time in gstreamer timestamp */
834   stream->local_unix =
835       gst_util_uint64_scale_int (stream->local_rtp, GST_SECOND,
836       stream->clock_rate);
837   stream->local_unix += stream->clock_base_time;
838   /* calculate delta between server and receiver */
839   stream->unix_delta = stream->last_unix - stream->local_unix;
840
841   GST_DEBUG_OBJECT (bin,
842       "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT
843       ", delta %" G_GINT64_FORMAT, stream->local_unix, stream->last_unix,
844       stream->unix_delta);
845
846   /* recalc inter stream playout offset, but only if there are more than one
847    * stream. */
848   if (client->nstreams > 1) {
849     gint64 min;
850
851     /* calculate the min of all deltas */
852     min = G_MAXINT64;
853     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
854       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
855
856       if (ostream->unix_delta && ostream->unix_delta < min)
857         min = ostream->unix_delta;
858     }
859
860     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT, client,
861         min);
862
863     /* calculate offsets for each stream */
864     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
865       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
866
867       if (ostream->unix_delta == 0)
868         continue;
869
870       ostream->ts_offset = ostream->unix_delta - min;
871
872       /* delta changed, see how much */
873       if (ostream->prev_ts_offset != ostream->ts_offset) {
874         gint64 diff;
875
876         if (ostream->prev_ts_offset > ostream->ts_offset)
877           diff = ostream->prev_ts_offset - ostream->ts_offset;
878         else
879           diff = ostream->ts_offset - ostream->prev_ts_offset;
880
881         GST_DEBUG_OBJECT (bin,
882             "ts-offset %" G_GUINT64_FORMAT ", prev %" G_GUINT64_FORMAT
883             ", diff: %" G_GINT64_FORMAT, ostream->ts_offset,
884             ostream->prev_ts_offset, diff);
885
886         /* only change diff when it changed more than 1 millisecond. This
887          * compensates for rounding errors in NTP to RTP timestamp
888          * conversions */
889         if (diff > GST_MSECOND && diff < (3 * GST_SECOND)) {
890           g_object_set (ostream->buffer, "ts-offset", ostream->ts_offset, NULL);
891           ostream->prev_ts_offset = ostream->ts_offset;
892         }
893       }
894       GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
895           ostream->ssrc, ostream->ts_offset);
896     }
897   }
898   GST_RTP_BIN_UNLOCK (bin);
899   return;
900
901 no_clock_base:
902   {
903     GST_WARNING_OBJECT (bin, "we have no clock-base");
904     GST_RTP_BIN_UNLOCK (bin);
905     return;
906   }
907 no_clock_rate:
908   {
909     GST_WARNING_OBJECT (bin, "we have no clock-rate");
910     GST_RTP_BIN_UNLOCK (bin);
911     return;
912   }
913 }
914
915 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
916   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
917           (b) = gst_rtcp_packet_move_to_next ((packet)))
918
919 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
920   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
921           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
922
923 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
924   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
925           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
926
927 static GstFlowReturn
928 gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer)
929 {
930   GstFlowReturn ret = GST_FLOW_OK;
931   GstRtpBinStream *stream;
932   GstRtpBin *bin;
933   GstRTCPPacket packet;
934   guint32 ssrc;
935   guint64 ntptime;
936   guint32 rtptime;
937   gboolean have_sr, have_sdes;
938   gboolean more;
939   guint64 clock_base;
940
941   clock_base = GST_BUFFER_OFFSET (buffer);
942
943   stream = gst_pad_get_element_private (pad);
944   bin = stream->bin;
945
946   GST_DEBUG_OBJECT (bin, "received sync packet");
947
948   if (!gst_rtcp_buffer_validate (buffer))
949     goto invalid_rtcp;
950
951   /* clock base changes when there is a huge gap in the timestamps or seqnum.
952    * When this happens we don't want to calculate the extended timestamp based
953    * on the previous one but reset the calculation. */
954   if (stream->last_clock_base != clock_base) {
955     stream->last_extrtptime = -1;
956     stream->last_clock_base = clock_base;
957   }
958
959   have_sr = FALSE;
960   have_sdes = FALSE;
961   GST_RTCP_BUFFER_FOR_PACKETS (more, buffer, &packet) {
962     /* first packet must be SR or RR or else the validate would have failed */
963     switch (gst_rtcp_packet_get_type (&packet)) {
964       case GST_RTCP_TYPE_SR:
965         /* only parse first. There is only supposed to be one SR in the packet
966          * but we will deal with malformed packets gracefully */
967         if (have_sr)
968           break;
969         /* get NTP and RTP times */
970         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, &rtptime,
971             NULL, NULL);
972
973         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
974         /* ignore SR that is not ours */
975         if (ssrc != stream->ssrc)
976           continue;
977
978         have_sr = TRUE;
979
980         /* store values in the stream */
981         stream->have_sync = TRUE;
982         stream->last_unix = gst_rtcp_ntp_to_unix (ntptime);
983         /* use extended timestamp */
984         gst_rtp_buffer_ext_timestamp (&stream->last_extrtptime, rtptime);
985         break;
986       case GST_RTCP_TYPE_SDES:
987       {
988         gboolean more_items, more_entries;
989
990         /* only deal with first SDES, there is only supposed to be one SDES in
991          * the RTCP packet but we deal with bad packets gracefully. Also bail
992          * out if we have not seen an SR item yet. */
993         if (have_sdes || !have_sr)
994           break;
995
996         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
997           /* skip items that are not about the SSRC of the sender */
998           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
999             continue;
1000
1001           /* find the CNAME entry */
1002           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
1003             GstRTCPSDESType type;
1004             guint8 len;
1005             guint8 *data;
1006
1007             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
1008
1009             if (type == GST_RTCP_SDES_CNAME) {
1010               stream->clock_base = clock_base;
1011               stream->clock_base_time = GST_BUFFER_OFFSET_END (buffer);
1012               /* associate the stream to CNAME */
1013               gst_rtp_bin_associate (bin, stream, len, data);
1014             }
1015           }
1016         }
1017         have_sdes = TRUE;
1018         break;
1019       }
1020       default:
1021         /* we can ignore these packets */
1022         break;
1023     }
1024   }
1025
1026   gst_buffer_unref (buffer);
1027
1028   return ret;
1029
1030   /* ERRORS */
1031 invalid_rtcp:
1032   {
1033     /* this is fatal and should be filtered earlier */
1034     GST_ELEMENT_ERROR (bin, STREAM, DECODE, (NULL),
1035         ("invalid RTCP packet received"));
1036     gst_buffer_unref (buffer);
1037     return GST_FLOW_ERROR;
1038   }
1039 }
1040
1041 /* create a new stream with @ssrc in @session. Must be called with
1042  * RTP_SESSION_LOCK. */
1043 static GstRtpBinStream *
1044 create_stream (GstRtpBinSession * session, guint32 ssrc)
1045 {
1046   GstElement *buffer, *demux;
1047   GstRtpBinStream *stream;
1048   GstPadTemplate *templ;
1049   gchar *padname;
1050
1051   if (!(buffer = gst_element_factory_make ("gstrtpjitterbuffer", NULL)))
1052     goto no_jitterbuffer;
1053
1054   if (!(demux = gst_element_factory_make ("gstrtpptdemux", NULL)))
1055     goto no_demux;
1056
1057   stream = g_new0 (GstRtpBinStream, 1);
1058   stream->ssrc = ssrc;
1059   stream->bin = session->bin;
1060   stream->session = session;
1061   stream->buffer = buffer;
1062   stream->demux = demux;
1063   stream->last_extrtptime = -1;
1064   stream->last_pt = -1;
1065   stream->have_sync = FALSE;
1066   session->streams = g_slist_prepend (session->streams, stream);
1067
1068   /* make an internal sinkpad for RTCP sync packets. Take ownership of the
1069    * pad. We will link this pad later. */
1070   padname = g_strdup_printf ("sync_%d", ssrc);
1071   templ = gst_static_pad_template_get (&rtpbin_sync_sink_template);
1072   stream->sync_pad = gst_pad_new_from_template (templ, padname);
1073   gst_object_unref (templ);
1074   g_free (padname);
1075   gst_object_ref (stream->sync_pad);
1076   gst_object_sink (stream->sync_pad);
1077   gst_pad_set_element_private (stream->sync_pad, stream);
1078   gst_pad_set_chain_function (stream->sync_pad, gst_rtp_bin_sync_chain);
1079   gst_pad_set_active (stream->sync_pad, TRUE);
1080
1081   /* provide clock_rate to the jitterbuffer when needed */
1082   g_signal_connect (buffer, "request-pt-map",
1083       (GCallback) pt_map_requested, session);
1084
1085   /* configure latency and packet lost */
1086   g_object_set (buffer, "latency", session->bin->latency, NULL);
1087   g_object_set (buffer, "do-lost", session->bin->do_lost, NULL);
1088
1089   gst_bin_add (GST_BIN_CAST (session->bin), buffer);
1090   gst_element_set_state (buffer, GST_STATE_PLAYING);
1091   gst_bin_add (GST_BIN_CAST (session->bin), demux);
1092   gst_element_set_state (demux, GST_STATE_PLAYING);
1093
1094   /* link stuff */
1095   gst_element_link (buffer, demux);
1096
1097   return stream;
1098
1099   /* ERRORS */
1100 no_jitterbuffer:
1101   {
1102     g_warning ("gstrtpbin: could not create gstrtpjitterbuffer element");
1103     return NULL;
1104   }
1105 no_demux:
1106   {
1107     gst_object_unref (buffer);
1108     g_warning ("gstrtpbin: could not create gstrtpptdemux element");
1109     return NULL;
1110   }
1111 }
1112
1113 static void
1114 free_stream (GstRtpBinStream * stream)
1115 {
1116   GstRtpBinSession *session;
1117
1118   session = stream->session;
1119
1120   gst_element_set_state (stream->buffer, GST_STATE_NULL);
1121   gst_element_set_state (stream->demux, GST_STATE_NULL);
1122
1123   gst_bin_remove (GST_BIN_CAST (session->bin), stream->buffer);
1124   gst_bin_remove (GST_BIN_CAST (session->bin), stream->demux);
1125
1126   gst_object_unref (stream->sync_pad);
1127
1128   session->streams = g_slist_remove (session->streams, stream);
1129
1130   g_free (stream);
1131 }
1132
1133 /* GObject vmethods */
1134 static void gst_rtp_bin_dispose (GObject * object);
1135 static void gst_rtp_bin_finalize (GObject * object);
1136 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
1137     const GValue * value, GParamSpec * pspec);
1138 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
1139     GValue * value, GParamSpec * pspec);
1140
1141 /* GstElement vmethods */
1142 static GstClock *gst_rtp_bin_provide_clock (GstElement * element);
1143 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
1144     GstStateChange transition);
1145 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
1146     GstPadTemplate * templ, const gchar * name);
1147 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
1148 static void gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message);
1149 static void gst_rtp_bin_clear_pt_map (GstRtpBin * bin);
1150
1151 GST_BOILERPLATE (GstRtpBin, gst_rtp_bin, GstBin, GST_TYPE_BIN);
1152
1153 static void
1154 gst_rtp_bin_base_init (gpointer klass)
1155 {
1156   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
1157
1158   /* sink pads */
1159   gst_element_class_add_pad_template (element_class,
1160       gst_static_pad_template_get (&rtpbin_recv_rtp_sink_template));
1161   gst_element_class_add_pad_template (element_class,
1162       gst_static_pad_template_get (&rtpbin_recv_rtcp_sink_template));
1163   gst_element_class_add_pad_template (element_class,
1164       gst_static_pad_template_get (&rtpbin_send_rtp_sink_template));
1165
1166   /* src pads */
1167   gst_element_class_add_pad_template (element_class,
1168       gst_static_pad_template_get (&rtpbin_recv_rtp_src_template));
1169   gst_element_class_add_pad_template (element_class,
1170       gst_static_pad_template_get (&rtpbin_send_rtcp_src_template));
1171   gst_element_class_add_pad_template (element_class,
1172       gst_static_pad_template_get (&rtpbin_send_rtp_src_template));
1173
1174   gst_element_class_set_details (element_class, &rtpbin_details);
1175 }
1176
1177 static void
1178 gst_rtp_bin_class_init (GstRtpBinClass * klass)
1179 {
1180   GObjectClass *gobject_class;
1181   GstElementClass *gstelement_class;
1182   GstBinClass *gstbin_class;
1183
1184   gobject_class = (GObjectClass *) klass;
1185   gstelement_class = (GstElementClass *) klass;
1186   gstbin_class = (GstBinClass *) klass;
1187
1188   g_type_class_add_private (klass, sizeof (GstRtpBinPrivate));
1189
1190   gobject_class->dispose = gst_rtp_bin_dispose;
1191   gobject_class->finalize = gst_rtp_bin_finalize;
1192   gobject_class->set_property = gst_rtp_bin_set_property;
1193   gobject_class->get_property = gst_rtp_bin_get_property;
1194
1195   g_object_class_install_property (gobject_class, PROP_LATENCY,
1196       g_param_spec_uint ("latency", "Buffer latency in ms",
1197           "Default amount of ms to buffer in the jitterbuffers", 0,
1198           G_MAXUINT, DEFAULT_LATENCY_MS, G_PARAM_READWRITE));
1199
1200   /**
1201    * GstRtpBin::request-pt-map:
1202    * @rtpbin: the object which received the signal
1203    * @session: the session
1204    * @pt: the pt
1205    *
1206    * Request the payload type as #GstCaps for @pt in @session.
1207    */
1208   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
1209       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
1210       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
1211       NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT_UINT, GST_TYPE_CAPS, 2,
1212       G_TYPE_UINT, G_TYPE_UINT);
1213   /**
1214    * GstRtpBin::clear-pt-map:
1215    * @rtpbin: the object which received the signal
1216    *
1217    * Clear all previously cached pt-mapping obtained with
1218    * #GstRtpBin::request-pt-map.
1219    */
1220   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
1221       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
1222       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass,
1223           clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE,
1224       0, G_TYPE_NONE);
1225
1226   /**
1227    * GstRtpBin::on-new-ssrc:
1228    * @rtpbin: the object which received the signal
1229    * @session: the session
1230    * @ssrc: the SSRC 
1231    *
1232    * Notify of a new SSRC that entered @session.
1233    */
1234   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
1235       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
1236       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
1237       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1238       G_TYPE_UINT, G_TYPE_UINT);
1239   /**
1240    * GstRtpBin::on-ssrc-collision:
1241    * @rtpbin: the object which received the signal
1242    * @session: the session
1243    * @ssrc: the SSRC 
1244    *
1245    * Notify when we have an SSRC collision
1246    */
1247   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
1248       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
1249       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
1250       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1251       G_TYPE_UINT, G_TYPE_UINT);
1252   /**
1253    * GstRtpBin::on-ssrc-validated:
1254    * @rtpbin: the object which received the signal
1255    * @session: the session
1256    * @ssrc: the SSRC 
1257    *
1258    * Notify of a new SSRC that became validated.
1259    */
1260   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
1261       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
1262       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
1263       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1264       G_TYPE_UINT, G_TYPE_UINT);
1265   /**
1266    * GstRtpBin::on-ssrc-active:
1267    * @rtpbin: the object which received the signal
1268    * @session: the session
1269    * @ssrc: the SSRC
1270    *
1271    * Notify of a SSRC that is active, i.e., sending RTCP.
1272    */
1273   gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE] =
1274       g_signal_new ("on-ssrc-active", G_TYPE_FROM_CLASS (klass),
1275       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_active),
1276       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1277       G_TYPE_UINT, G_TYPE_UINT);
1278   /**
1279    * GstRtpBin::on-ssrc-sdes:
1280    * @rtpbin: the object which received the signal
1281    * @session: the session
1282    * @ssrc: the SSRC
1283    *
1284    * Notify of a SSRC that is active, i.e., sending RTCP.
1285    */
1286   gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES] =
1287       g_signal_new ("on-ssrc-sdes", G_TYPE_FROM_CLASS (klass),
1288       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_sdes),
1289       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1290       G_TYPE_UINT, G_TYPE_UINT);
1291
1292   /**
1293    * GstRtpBin::on-bye-ssrc:
1294    * @rtpbin: the object which received the signal
1295    * @session: the session
1296    * @ssrc: the SSRC 
1297    *
1298    * Notify of an SSRC that became inactive because of a BYE packet.
1299    */
1300   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
1301       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
1302       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
1303       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1304       G_TYPE_UINT, G_TYPE_UINT);
1305   /**
1306    * GstRtpBin::on-bye-timeout:
1307    * @rtpbin: the object which received the signal
1308    * @session: the session
1309    * @ssrc: the SSRC 
1310    *
1311    * Notify of an SSRC that has timed out because of BYE
1312    */
1313   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
1314       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
1315       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
1316       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1317       G_TYPE_UINT, G_TYPE_UINT);
1318   /**
1319    * GstRtpBin::on-timeout:
1320    * @rtpbin: the object which received the signal
1321    * @session: the session
1322    * @ssrc: the SSRC 
1323    *
1324    * Notify of an SSRC that has timed out
1325    */
1326   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
1327       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
1328       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
1329       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1330       G_TYPE_UINT, G_TYPE_UINT);
1331
1332   g_object_class_install_property (gobject_class, PROP_SDES_CNAME,
1333       g_param_spec_string ("sdes-cname", "SDES CNAME",
1334           "The CNAME to put in SDES messages of this session",
1335           DEFAULT_SDES_CNAME, G_PARAM_READWRITE));
1336
1337   g_object_class_install_property (gobject_class, PROP_SDES_NAME,
1338       g_param_spec_string ("sdes-name", "SDES NAME",
1339           "The NAME to put in SDES messages of this session",
1340           DEFAULT_SDES_NAME, G_PARAM_READWRITE));
1341
1342   g_object_class_install_property (gobject_class, PROP_SDES_EMAIL,
1343       g_param_spec_string ("sdes-email", "SDES EMAIL",
1344           "The EMAIL to put in SDES messages of this session",
1345           DEFAULT_SDES_EMAIL, G_PARAM_READWRITE));
1346
1347   g_object_class_install_property (gobject_class, PROP_SDES_PHONE,
1348       g_param_spec_string ("sdes-phone", "SDES PHONE",
1349           "The PHONE to put in SDES messages of this session",
1350           DEFAULT_SDES_PHONE, G_PARAM_READWRITE));
1351
1352   g_object_class_install_property (gobject_class, PROP_SDES_LOCATION,
1353       g_param_spec_string ("sdes-location", "SDES LOCATION",
1354           "The LOCATION to put in SDES messages of this session",
1355           DEFAULT_SDES_LOCATION, G_PARAM_READWRITE));
1356
1357   g_object_class_install_property (gobject_class, PROP_SDES_TOOL,
1358       g_param_spec_string ("sdes-tool", "SDES TOOL",
1359           "The TOOL to put in SDES messages of this session",
1360           DEFAULT_SDES_TOOL, G_PARAM_READWRITE));
1361
1362   g_object_class_install_property (gobject_class, PROP_SDES_NOTE,
1363       g_param_spec_string ("sdes-note", "SDES NOTE",
1364           "The NOTE to put in SDES messages of this session",
1365           DEFAULT_SDES_NOTE, G_PARAM_READWRITE));
1366
1367   g_object_class_install_property (gobject_class, PROP_DO_LOST,
1368       g_param_spec_boolean ("do-lost", "Do Lost",
1369           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
1370           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1371
1372   gstelement_class->provide_clock =
1373       GST_DEBUG_FUNCPTR (gst_rtp_bin_provide_clock);
1374   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
1375   gstelement_class->request_new_pad =
1376       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
1377   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
1378
1379   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (gst_rtp_bin_handle_message);
1380
1381   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
1382
1383   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
1384 }
1385
1386 static void
1387 gst_rtp_bin_init (GstRtpBin * rtpbin, GstRtpBinClass * klass)
1388 {
1389   gchar *str;
1390
1391   rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
1392   rtpbin->priv->bin_lock = g_mutex_new ();
1393   rtpbin->priv->dyn_lock = g_mutex_new ();
1394   rtpbin->provided_clock = gst_system_clock_obtain ();
1395
1396   rtpbin->latency = DEFAULT_LATENCY_MS;
1397   rtpbin->do_lost = DEFAULT_DO_LOST;
1398
1399   /* some default SDES entries */
1400   str = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ());
1401   gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_CNAME, str);
1402   g_free (str);
1403
1404   gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_NAME, g_get_real_name ());
1405   gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_TOOL, "GStreamer");
1406 }
1407
1408 static void
1409 gst_rtp_bin_dispose (GObject * object)
1410 {
1411   GstRtpBin *rtpbin;
1412
1413   rtpbin = GST_RTP_BIN (object);
1414
1415   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, NULL);
1416   g_slist_free (rtpbin->sessions);
1417   rtpbin->sessions = NULL;
1418   g_slist_foreach (rtpbin->clients, (GFunc) free_client, NULL);
1419   g_slist_free (rtpbin->clients);
1420   rtpbin->clients = NULL;
1421
1422   G_OBJECT_CLASS (parent_class)->dispose (object);
1423 }
1424
1425 static void
1426 gst_rtp_bin_finalize (GObject * object)
1427 {
1428   GstRtpBin *rtpbin;
1429   gint i;
1430
1431   rtpbin = GST_RTP_BIN (object);
1432
1433   for (i = 0; i < 9; i++)
1434     g_free (rtpbin->sdes[i]);
1435
1436   g_mutex_free (rtpbin->priv->bin_lock);
1437   g_mutex_free (rtpbin->priv->dyn_lock);
1438   gst_object_unref (rtpbin->provided_clock);
1439
1440   G_OBJECT_CLASS (parent_class)->finalize (object);
1441 }
1442
1443 static const gchar *
1444 sdes_type_to_name (GstRTCPSDESType type)
1445 {
1446   const gchar *result;
1447
1448   switch (type) {
1449     case GST_RTCP_SDES_CNAME:
1450       result = "sdes-cname";
1451       break;
1452     case GST_RTCP_SDES_NAME:
1453       result = "sdes-name";
1454       break;
1455     case GST_RTCP_SDES_EMAIL:
1456       result = "sdes-email";
1457       break;
1458     case GST_RTCP_SDES_PHONE:
1459       result = "sdes-phone";
1460       break;
1461     case GST_RTCP_SDES_LOC:
1462       result = "sdes-location";
1463       break;
1464     case GST_RTCP_SDES_TOOL:
1465       result = "sdes-tool";
1466       break;
1467     case GST_RTCP_SDES_NOTE:
1468       result = "sdes-note";
1469       break;
1470     case GST_RTCP_SDES_PRIV:
1471       result = "sdes-priv";
1472       break;
1473     default:
1474       result = NULL;
1475       break;
1476   }
1477   return result;
1478 }
1479
1480 static void
1481 gst_rtp_bin_set_sdes_string (GstRtpBin * bin, GstRTCPSDESType type,
1482     const gchar * data)
1483 {
1484   GSList *item;
1485   const gchar *name;
1486
1487   if (type < 0 || type > 8)
1488     return;
1489
1490   GST_OBJECT_LOCK (bin);
1491   g_free (bin->sdes[type]);
1492   bin->sdes[type] = g_strdup (data);
1493   name = sdes_type_to_name (type);
1494   /* store in all sessions */
1495   for (item = bin->sessions; item; item = g_slist_next (item))
1496     g_object_set (item->data, name, bin->sdes[type], NULL);
1497   GST_OBJECT_UNLOCK (bin);
1498 }
1499
1500 static gchar *
1501 gst_rtp_bin_get_sdes_string (GstRtpBin * bin, GstRTCPSDESType type)
1502 {
1503   gchar *result;
1504
1505   if (type < 0 || type > 8)
1506     return NULL;
1507
1508   GST_OBJECT_LOCK (bin);
1509   result = g_strdup (bin->sdes[type]);
1510   GST_OBJECT_UNLOCK (bin);
1511
1512   return result;
1513 }
1514
1515 static void
1516 gst_rtp_bin_set_property (GObject * object, guint prop_id,
1517     const GValue * value, GParamSpec * pspec)
1518 {
1519   GstRtpBin *rtpbin;
1520
1521   rtpbin = GST_RTP_BIN (object);
1522
1523   switch (prop_id) {
1524     case PROP_LATENCY:
1525       GST_RTP_BIN_LOCK (rtpbin);
1526       rtpbin->latency = g_value_get_uint (value);
1527       GST_RTP_BIN_UNLOCK (rtpbin);
1528       /* propegate the property down to the jitterbuffer */
1529       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "latency", value);
1530       break;
1531     case PROP_SDES_CNAME:
1532       gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_CNAME,
1533           g_value_get_string (value));
1534       break;
1535     case PROP_SDES_NAME:
1536       gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_NAME,
1537           g_value_get_string (value));
1538       break;
1539     case PROP_SDES_EMAIL:
1540       gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_EMAIL,
1541           g_value_get_string (value));
1542       break;
1543     case PROP_SDES_PHONE:
1544       gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_PHONE,
1545           g_value_get_string (value));
1546       break;
1547     case PROP_SDES_LOCATION:
1548       gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_LOC,
1549           g_value_get_string (value));
1550       break;
1551     case PROP_SDES_TOOL:
1552       gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_TOOL,
1553           g_value_get_string (value));
1554       break;
1555     case PROP_SDES_NOTE:
1556       gst_rtp_bin_set_sdes_string (rtpbin, GST_RTCP_SDES_NOTE,
1557           g_value_get_string (value));
1558       break;
1559     case PROP_DO_LOST:
1560       GST_RTP_BIN_LOCK (rtpbin);
1561       rtpbin->do_lost = g_value_get_boolean (value);
1562       GST_RTP_BIN_UNLOCK (rtpbin);
1563       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
1564       break;
1565     default:
1566       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1567       break;
1568   }
1569 }
1570
1571 static void
1572 gst_rtp_bin_get_property (GObject * object, guint prop_id,
1573     GValue * value, GParamSpec * pspec)
1574 {
1575   GstRtpBin *rtpbin;
1576
1577   rtpbin = GST_RTP_BIN (object);
1578
1579   switch (prop_id) {
1580     case PROP_LATENCY:
1581       GST_RTP_BIN_LOCK (rtpbin);
1582       g_value_set_uint (value, rtpbin->latency);
1583       GST_RTP_BIN_UNLOCK (rtpbin);
1584       break;
1585     case PROP_SDES_CNAME:
1586       g_value_take_string (value, gst_rtp_bin_get_sdes_string (rtpbin,
1587               GST_RTCP_SDES_CNAME));
1588       break;
1589     case PROP_SDES_NAME:
1590       g_value_take_string (value, gst_rtp_bin_get_sdes_string (rtpbin,
1591               GST_RTCP_SDES_NAME));
1592       break;
1593     case PROP_SDES_EMAIL:
1594       g_value_take_string (value, gst_rtp_bin_get_sdes_string (rtpbin,
1595               GST_RTCP_SDES_EMAIL));
1596       break;
1597     case PROP_SDES_PHONE:
1598       g_value_take_string (value, gst_rtp_bin_get_sdes_string (rtpbin,
1599               GST_RTCP_SDES_PHONE));
1600       break;
1601     case PROP_SDES_LOCATION:
1602       g_value_take_string (value, gst_rtp_bin_get_sdes_string (rtpbin,
1603               GST_RTCP_SDES_LOC));
1604       break;
1605     case PROP_SDES_TOOL:
1606       g_value_take_string (value, gst_rtp_bin_get_sdes_string (rtpbin,
1607               GST_RTCP_SDES_TOOL));
1608       break;
1609     case PROP_SDES_NOTE:
1610       g_value_take_string (value, gst_rtp_bin_get_sdes_string (rtpbin,
1611               GST_RTCP_SDES_NOTE));
1612       break;
1613     case PROP_DO_LOST:
1614       GST_RTP_BIN_LOCK (rtpbin);
1615       g_value_set_boolean (value, rtpbin->do_lost);
1616       GST_RTP_BIN_UNLOCK (rtpbin);
1617       break;
1618     default:
1619       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1620       break;
1621   }
1622 }
1623
1624 static GstClock *
1625 gst_rtp_bin_provide_clock (GstElement * element)
1626 {
1627   GstRtpBin *rtpbin;
1628
1629   rtpbin = GST_RTP_BIN (element);
1630
1631   return GST_CLOCK_CAST (gst_object_ref (rtpbin->provided_clock));
1632 }
1633
1634 static void
1635 gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
1636 {
1637   GstRtpBin *rtpbin;
1638
1639   rtpbin = GST_RTP_BIN (bin);
1640
1641   switch (GST_MESSAGE_TYPE (message)) {
1642     case GST_MESSAGE_ELEMENT:
1643     {
1644       const GstStructure *s = gst_message_get_structure (message);
1645
1646       /* we change the structure name and add the session ID to it */
1647       if (gst_structure_has_name (s, "GstRTPSessionSDES")) {
1648         GSList *walk;
1649
1650         /* find the session, the message source has it */
1651         for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
1652           GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
1653
1654           /* if we found the session, change message. else we exit the loop and
1655            * leave the message unchanged */
1656           if (GST_OBJECT_CAST (sess->session) == GST_MESSAGE_SRC (message)) {
1657             message = gst_message_make_writable (message);
1658             s = gst_message_get_structure (message);
1659
1660             gst_structure_set_name ((GstStructure *) s, "GstRTPBinSDES");
1661
1662             gst_structure_set ((GstStructure *) s, "session", G_TYPE_UINT,
1663                 sess->id, NULL);
1664             break;
1665           }
1666         }
1667       }
1668       /* fallthrough to forward the modified message to the parent */
1669     }
1670     default:
1671     {
1672       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1673       break;
1674     }
1675   }
1676 }
1677
1678 static void
1679 calc_ntp_ns_base (GstRtpBin * bin)
1680 {
1681   GstClockTime now;
1682   GTimeVal current;
1683   GSList *walk;
1684
1685   /* get the current time and convert it to NTP time in nanoseconds */
1686   g_get_current_time (&current);
1687   now = GST_TIMEVAL_TO_TIME (current);
1688   now += (2208988800LL * GST_SECOND);
1689
1690   GST_RTP_BIN_LOCK (bin);
1691   bin->priv->ntp_ns_base = now;
1692   for (walk = bin->sessions; walk; walk = g_slist_next (walk)) {
1693     GstRtpBinSession *session = (GstRtpBinSession *) walk->data;
1694
1695     g_object_set (session->session, "ntp-ns-base", now, NULL);
1696   }
1697   GST_RTP_BIN_UNLOCK (bin);
1698
1699   return;
1700 }
1701
1702 static GstStateChangeReturn
1703 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
1704 {
1705   GstStateChangeReturn res;
1706   GstRtpBin *rtpbin;
1707   GstRtpBinPrivate *priv;
1708
1709   rtpbin = GST_RTP_BIN (element);
1710   priv = rtpbin->priv;
1711
1712   switch (transition) {
1713     case GST_STATE_CHANGE_NULL_TO_READY:
1714       break;
1715     case GST_STATE_CHANGE_READY_TO_PAUSED:
1716       GST_LOG_OBJECT (rtpbin, "clearing shutdown flag");
1717       g_atomic_int_set (&priv->shutdown, 0);
1718       break;
1719     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1720       calc_ntp_ns_base (rtpbin);
1721       break;
1722     case GST_STATE_CHANGE_PAUSED_TO_READY:
1723       GST_LOG_OBJECT (rtpbin, "setting shutdown flag");
1724       g_atomic_int_set (&priv->shutdown, 1);
1725       /* wait for all callbacks to end by taking the lock. No new callbacks will
1726        * be able to happen as we set the shutdown flag. */
1727       GST_RTP_BIN_DYN_LOCK (rtpbin);
1728       GST_LOG_OBJECT (rtpbin, "dynamic lock taken, we can continue shutdown");
1729       GST_RTP_BIN_DYN_UNLOCK (rtpbin);
1730       break;
1731     default:
1732       break;
1733   }
1734
1735   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1736
1737   switch (transition) {
1738     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1739       break;
1740     case GST_STATE_CHANGE_PAUSED_TO_READY:
1741       break;
1742     case GST_STATE_CHANGE_READY_TO_NULL:
1743       break;
1744     default:
1745       break;
1746   }
1747   return res;
1748 }
1749
1750 /* a new pad (SSRC) was created in @session. This signal is emited from the
1751  * payload demuxer. */
1752 static void
1753 new_payload_found (GstElement * element, guint pt, GstPad * pad,
1754     GstRtpBinStream * stream)
1755 {
1756   GstRtpBin *rtpbin;
1757   GstElementClass *klass;
1758   GstPadTemplate *templ;
1759   gchar *padname;
1760   GstPad *gpad;
1761
1762   rtpbin = stream->bin;
1763
1764   GST_DEBUG ("new payload pad %d", pt);
1765
1766   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
1767
1768   /* ghost the pad to the parent */
1769   klass = GST_ELEMENT_GET_CLASS (rtpbin);
1770   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%d_%d_%d");
1771   padname = g_strdup_printf ("recv_rtp_src_%d_%u_%d",
1772       stream->session->id, stream->ssrc, pt);
1773   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
1774   g_free (padname);
1775
1776   gst_pad_set_caps (gpad, GST_PAD_CAPS (pad));
1777   gst_pad_set_active (gpad, TRUE);
1778   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
1779
1780   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
1781
1782   return;
1783
1784 shutdown:
1785   {
1786     GST_DEBUG ("ignoring, we are shutting down");
1787     return;
1788   }
1789 }
1790
1791 static GstCaps *
1792 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
1793 {
1794   GstRtpBin *rtpbin;
1795   GstCaps *caps;
1796
1797   rtpbin = session->bin;
1798
1799   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt,
1800       session->id);
1801
1802   caps = get_pt_map (session, pt);
1803   if (!caps)
1804     goto no_caps;
1805
1806   return caps;
1807
1808   /* ERRORS */
1809 no_caps:
1810   {
1811     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
1812     return NULL;
1813   }
1814 }
1815
1816 /* emited when caps changed for the session */
1817 static void
1818 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
1819 {
1820   GstRtpBin *bin;
1821   GstCaps *caps;
1822   gint payload;
1823   const GstStructure *s;
1824
1825   bin = session->bin;
1826
1827   g_object_get (pad, "caps", &caps, NULL);
1828
1829   if (caps == NULL)
1830     return;
1831
1832   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
1833
1834   s = gst_caps_get_structure (caps, 0);
1835
1836   /* get payload, finish when it's not there */
1837   if (!gst_structure_get_int (s, "payload", &payload))
1838     return;
1839
1840   GST_RTP_SESSION_LOCK (session);
1841   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
1842   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
1843   GST_RTP_SESSION_UNLOCK (session);
1844 }
1845
1846 /* Stores the last payload type received on a particular stream */
1847 static void
1848 payload_type_change (GstElement * element, guint pt, GstRtpBinStream * stream)
1849 {
1850   GST_RTP_SESSION_LOCK (stream->session);
1851   stream->last_pt = pt;
1852   GST_RTP_SESSION_UNLOCK (stream->session);
1853 }
1854
1855 /* a new pad (SSRC) was created in @session */
1856 static void
1857 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
1858     GstRtpBinSession * session)
1859 {
1860   GstRtpBin *rtpbin;
1861   GstRtpBinStream *stream;
1862   GstPad *sinkpad, *srcpad;
1863   gchar *padname;
1864   GstCaps *caps;
1865
1866   rtpbin = session->bin;
1867
1868   GST_DEBUG_OBJECT (rtpbin, "new SSRC pad %08x, %s:%s", ssrc,
1869       GST_DEBUG_PAD_NAME (pad));
1870
1871   GST_RTP_BIN_SHUTDOWN_LOCK (rtpbin, shutdown);
1872
1873   GST_RTP_SESSION_LOCK (session);
1874
1875   /* create new stream */
1876   stream = create_stream (session, ssrc);
1877   if (!stream)
1878     goto no_stream;
1879
1880   /* get the caps of the pad, we need the clock-rate and base_time if any. */
1881   if ((caps = gst_pad_get_caps (pad))) {
1882     const GstStructure *s;
1883     guint val;
1884
1885     GST_DEBUG_OBJECT (rtpbin, "pad has caps %" GST_PTR_FORMAT, caps);
1886
1887     s = gst_caps_get_structure (caps, 0);
1888
1889     if (!gst_structure_get_int (s, "clock-rate", &stream->clock_rate)) {
1890       stream->clock_rate = -1;
1891
1892       GST_WARNING_OBJECT (rtpbin,
1893           "Caps have no clock rate %s from pad %s:%s",
1894           gst_caps_to_string (caps), GST_DEBUG_PAD_NAME (pad));
1895     }
1896
1897     stream->last_clock_base = -1;
1898     if (gst_structure_get_uint (s, "clock-base", &val))
1899       stream->clock_base = val;
1900     else
1901       stream->clock_base = -1;
1902
1903     gst_caps_unref (caps);
1904   }
1905
1906   /* get pad and link */
1907   GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer");
1908   padname = g_strdup_printf ("src_%d", ssrc);
1909   srcpad = gst_element_get_static_pad (element, padname);
1910   g_free (padname);
1911   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
1912   gst_pad_link (srcpad, sinkpad);
1913   gst_object_unref (sinkpad);
1914   gst_object_unref (srcpad);
1915
1916   /* get the RTCP sync pad */
1917   GST_DEBUG_OBJECT (rtpbin, "linking sync pad");
1918   padname = g_strdup_printf ("rtcp_src_%d", ssrc);
1919   srcpad = gst_element_get_static_pad (element, padname);
1920   g_free (padname);
1921   gst_pad_link (srcpad, stream->sync_pad);
1922   gst_object_unref (srcpad);
1923
1924   /* connect to the new-pad signal of the payload demuxer, this will expose the
1925    * new pad by ghosting it. */
1926   stream->demux_newpad_sig = g_signal_connect (stream->demux,
1927       "new-payload-type", (GCallback) new_payload_found, stream);
1928   /* connect to the request-pt-map signal. This signal will be emited by the
1929    * demuxer so that it can apply a proper caps on the buffers for the
1930    * depayloaders. */
1931   stream->demux_ptreq_sig = g_signal_connect (stream->demux,
1932       "request-pt-map", (GCallback) pt_map_requested, session);
1933   /* connect to the payload-type-change signal so that we can know which
1934    * PT is the current PT so that the jitterbuffer can be matched to the right
1935    * offset. */
1936   stream->demux_pt_change_sig = g_signal_connect (stream->demux,
1937       "payload-type-change", (GCallback) payload_type_change, stream);
1938
1939   GST_RTP_SESSION_UNLOCK (session);
1940   GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
1941
1942   return;
1943
1944   /* ERRORS */
1945 shutdown:
1946   {
1947     GST_DEBUG_OBJECT (rtpbin, "we are shutting down");
1948     return;
1949   }
1950 no_stream:
1951   {
1952     GST_RTP_SESSION_UNLOCK (session);
1953     GST_RTP_BIN_SHUTDOWN_UNLOCK (rtpbin);
1954     GST_DEBUG_OBJECT (rtpbin, "could not create stream");
1955     return;
1956   }
1957 }
1958
1959 /* Create a pad for receiving RTP for the session in @name. Must be called with
1960  * RTP_BIN_LOCK.
1961  */
1962 static GstPad *
1963 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
1964 {
1965   GstPad *result, *sinkdpad;
1966   guint sessid;
1967   GstRtpBinSession *session;
1968   GstPadLinkReturn lres;
1969
1970   /* first get the session number */
1971   if (name == NULL || sscanf (name, "recv_rtp_sink_%d", &sessid) != 1)
1972     goto no_name;
1973
1974   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
1975
1976   /* get or create session */
1977   session = find_session_by_id (rtpbin, sessid);
1978   if (!session) {
1979     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
1980     /* create session now */
1981     session = create_session (rtpbin, sessid);
1982     if (session == NULL)
1983       goto create_error;
1984   }
1985
1986   /* check if pad was requested */
1987   if (session->recv_rtp_sink != NULL)
1988     goto existed;
1989
1990   GST_DEBUG_OBJECT (rtpbin, "getting RTP sink pad");
1991   /* get recv_rtp pad and store */
1992   session->recv_rtp_sink =
1993       gst_element_get_request_pad (session->session, "recv_rtp_sink");
1994   if (session->recv_rtp_sink == NULL)
1995     goto pad_failed;
1996
1997   g_signal_connect (session->recv_rtp_sink, "notify::caps",
1998       (GCallback) caps_changed, session);
1999
2000   GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad");
2001   /* get srcpad, link to SSRCDemux */
2002   session->recv_rtp_src =
2003       gst_element_get_static_pad (session->session, "recv_rtp_src");
2004   if (session->recv_rtp_src == NULL)
2005     goto pad_failed;
2006
2007   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
2008   sinkdpad = gst_element_get_static_pad (session->demux, "sink");
2009   GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
2010   lres = gst_pad_link (session->recv_rtp_src, sinkdpad);
2011   gst_object_unref (sinkdpad);
2012   if (lres != GST_PAD_LINK_OK)
2013     goto link_failed;
2014
2015   /* connect to the new-ssrc-pad signal of the SSRC demuxer */
2016   session->demux_newpad_sig = g_signal_connect (session->demux,
2017       "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
2018
2019   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
2020   result =
2021       gst_ghost_pad_new_from_template (name, session->recv_rtp_sink, templ);
2022   gst_pad_set_active (result, TRUE);
2023   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
2024
2025   return result;
2026
2027   /* ERRORS */
2028 no_name:
2029   {
2030     g_warning ("gstrtpbin: invalid name given");
2031     return NULL;
2032   }
2033 create_error:
2034   {
2035     /* create_session already warned */
2036     return NULL;
2037   }
2038 existed:
2039   {
2040     g_warning ("gstrtpbin: recv_rtp pad already requested for session %d",
2041         sessid);
2042     return NULL;
2043   }
2044 pad_failed:
2045   {
2046     g_warning ("gstrtpbin: failed to get session pad");
2047     return NULL;
2048   }
2049 link_failed:
2050   {
2051     g_warning ("gstrtpbin: failed to link pads");
2052     return NULL;
2053   }
2054 }
2055
2056 /* Create a pad for receiving RTCP for the session in @name. Must be called with
2057  * RTP_BIN_LOCK.
2058  */
2059 static GstPad *
2060 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
2061     const gchar * name)
2062 {
2063   GstPad *result;
2064   guint sessid;
2065   GstRtpBinSession *session;
2066   GstPad *sinkdpad;
2067   GstPadLinkReturn lres;
2068
2069   /* first get the session number */
2070   if (name == NULL || sscanf (name, "recv_rtcp_sink_%d", &sessid) != 1)
2071     goto no_name;
2072
2073   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
2074
2075   /* get or create the session */
2076   session = find_session_by_id (rtpbin, sessid);
2077   if (!session) {
2078     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
2079     /* create session now */
2080     session = create_session (rtpbin, sessid);
2081     if (session == NULL)
2082       goto create_error;
2083   }
2084
2085   /* check if pad was requested */
2086   if (session->recv_rtcp_sink != NULL)
2087     goto existed;
2088
2089   /* get recv_rtp pad and store */
2090   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
2091   session->recv_rtcp_sink =
2092       gst_element_get_request_pad (session->session, "recv_rtcp_sink");
2093   if (session->recv_rtcp_sink == NULL)
2094     goto pad_failed;
2095
2096   /* get srcpad, link to SSRCDemux */
2097   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
2098   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
2099   if (session->sync_src == NULL)
2100     goto pad_failed;
2101
2102   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
2103   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
2104   lres = gst_pad_link (session->sync_src, sinkdpad);
2105   gst_object_unref (sinkdpad);
2106   if (lres != GST_PAD_LINK_OK)
2107     goto link_failed;
2108
2109   result =
2110       gst_ghost_pad_new_from_template (name, session->recv_rtcp_sink, templ);
2111   gst_pad_set_active (result, TRUE);
2112   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
2113
2114   return result;
2115
2116   /* ERRORS */
2117 no_name:
2118   {
2119     g_warning ("gstrtpbin: invalid name given");
2120     return NULL;
2121   }
2122 create_error:
2123   {
2124     /* create_session already warned */
2125     return NULL;
2126   }
2127 existed:
2128   {
2129     g_warning ("gstrtpbin: recv_rtcp pad already requested for session %d",
2130         sessid);
2131     return NULL;
2132   }
2133 pad_failed:
2134   {
2135     g_warning ("gstrtpbin: failed to get session pad");
2136     return NULL;
2137   }
2138 link_failed:
2139   {
2140     g_warning ("gstrtpbin: failed to link pads");
2141     return NULL;
2142   }
2143 }
2144
2145 /* Create a pad for sending RTP for the session in @name. Must be called with
2146  * RTP_BIN_LOCK.
2147  */
2148 static GstPad *
2149 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2150 {
2151   GstPad *result, *srcghost;
2152   gchar *gname;
2153   guint sessid;
2154   GstRtpBinSession *session;
2155   GstElementClass *klass;
2156
2157   /* first get the session number */
2158   if (name == NULL || sscanf (name, "send_rtp_sink_%d", &sessid) != 1)
2159     goto no_name;
2160
2161   /* get or create session */
2162   session = find_session_by_id (rtpbin, sessid);
2163   if (!session) {
2164     /* create session now */
2165     session = create_session (rtpbin, sessid);
2166     if (session == NULL)
2167       goto create_error;
2168   }
2169
2170   /* check if pad was requested */
2171   if (session->send_rtp_sink != NULL)
2172     goto existed;
2173
2174   /* get send_rtp pad and store */
2175   session->send_rtp_sink =
2176       gst_element_get_request_pad (session->session, "send_rtp_sink");
2177   if (session->send_rtp_sink == NULL)
2178     goto pad_failed;
2179
2180   result =
2181       gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ);
2182   gst_pad_set_active (result, TRUE);
2183   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
2184
2185   /* get srcpad */
2186   session->send_rtp_src =
2187       gst_element_get_static_pad (session->session, "send_rtp_src");
2188   if (session->send_rtp_src == NULL)
2189     goto no_srcpad;
2190
2191   /* ghost the new source pad */
2192   klass = GST_ELEMENT_GET_CLASS (rtpbin);
2193   gname = g_strdup_printf ("send_rtp_src_%d", sessid);
2194   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%d");
2195   srcghost =
2196       gst_ghost_pad_new_from_template (gname, session->send_rtp_src, templ);
2197   gst_pad_set_active (srcghost, TRUE);
2198   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), srcghost);
2199   g_free (gname);
2200
2201   return result;
2202
2203   /* ERRORS */
2204 no_name:
2205   {
2206     g_warning ("gstrtpbin: invalid name given");
2207     return NULL;
2208   }
2209 create_error:
2210   {
2211     /* create_session already warned */
2212     return NULL;
2213   }
2214 existed:
2215   {
2216     g_warning ("gstrtpbin: send_rtp pad already requested for session %d",
2217         sessid);
2218     return NULL;
2219   }
2220 pad_failed:
2221   {
2222     g_warning ("gstrtpbin: failed to get session pad for session %d", sessid);
2223     return NULL;
2224   }
2225 no_srcpad:
2226   {
2227     g_warning ("gstrtpbin: failed to get rtp source pad for session %d",
2228         sessid);
2229     return NULL;
2230   }
2231 }
2232
2233 /* Create a pad for sending RTCP for the session in @name. Must be called with
2234  * RTP_BIN_LOCK.
2235  */
2236 static GstPad *
2237 create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
2238 {
2239   GstPad *result;
2240   guint sessid;
2241   GstRtpBinSession *session;
2242
2243   /* first get the session number */
2244   if (name == NULL || sscanf (name, "send_rtcp_src_%d", &sessid) != 1)
2245     goto no_name;
2246
2247   /* get or create session */
2248   session = find_session_by_id (rtpbin, sessid);
2249   if (!session)
2250     goto no_session;
2251
2252   /* check if pad was requested */
2253   if (session->send_rtcp_src != NULL)
2254     goto existed;
2255
2256   /* get rtcp_src pad and store */
2257   session->send_rtcp_src =
2258       gst_element_get_request_pad (session->session, "send_rtcp_src");
2259   if (session->send_rtcp_src == NULL)
2260     goto pad_failed;
2261
2262   result =
2263       gst_ghost_pad_new_from_template (name, session->send_rtcp_src, templ);
2264   gst_pad_set_active (result, TRUE);
2265   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
2266
2267   return result;
2268
2269   /* ERRORS */
2270 no_name:
2271   {
2272     g_warning ("gstrtpbin: invalid name given");
2273     return NULL;
2274   }
2275 no_session:
2276   {
2277     g_warning ("gstrtpbin: session with id %d does not exist", sessid);
2278     return NULL;
2279   }
2280 existed:
2281   {
2282     g_warning ("gstrtpbin: send_rtcp_src pad already requested for session %d",
2283         sessid);
2284     return NULL;
2285   }
2286 pad_failed:
2287   {
2288     g_warning ("gstrtpbin: failed to get rtcp pad for session %d", sessid);
2289     return NULL;
2290   }
2291 }
2292
2293 /* If the requested name is NULL we should create a name with
2294  * the session number assuming we want the lowest posible session
2295  * with a free pad like the template */
2296 static gchar *
2297 gst_rtp_bin_get_free_pad_name (GstElement * element, GstPadTemplate * templ)
2298 {
2299   gboolean name_found = FALSE;
2300   gint session = 0;
2301   GstPad *pad = NULL;
2302   GstIterator *pad_it = NULL;
2303   gchar *pad_name = NULL;
2304
2305   GST_DEBUG_OBJECT (element, "find a free pad name for template");
2306   while (!name_found) {
2307     g_free (pad_name);
2308     pad_name = g_strdup_printf (templ->name_template, session++);
2309     pad_it = gst_element_iterate_pads (GST_ELEMENT (element));
2310     name_found = TRUE;
2311     while (gst_iterator_next (pad_it, (gpointer) & pad) == GST_ITERATOR_OK) {
2312       gchar *name;
2313
2314       name = gst_pad_get_name (pad);
2315       if (strcmp (name, pad_name) == 0)
2316         name_found = FALSE;
2317       g_free (name);
2318     }
2319     gst_iterator_free (pad_it);
2320   }
2321
2322   GST_DEBUG_OBJECT (element, "free pad name found: '%s'", pad_name);
2323   return pad_name;
2324 }
2325
2326 /* 
2327  */
2328 static GstPad *
2329 gst_rtp_bin_request_new_pad (GstElement * element,
2330     GstPadTemplate * templ, const gchar * name)
2331 {
2332   GstRtpBin *rtpbin;
2333   GstElementClass *klass;
2334   GstPad *result;
2335   gchar *pad_name = NULL;
2336
2337   g_return_val_if_fail (templ != NULL, NULL);
2338   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
2339
2340   rtpbin = GST_RTP_BIN (element);
2341   klass = GST_ELEMENT_GET_CLASS (element);
2342
2343   GST_RTP_BIN_LOCK (rtpbin);
2344
2345   if (name == NULL) {
2346     /* use a free pad name */
2347     pad_name = gst_rtp_bin_get_free_pad_name (element, templ);
2348   } else {
2349     /* use the provided name */
2350     pad_name = g_strdup (name);
2351   }
2352
2353   GST_DEBUG ("Trying to request a pad with name %s", pad_name);
2354
2355   /* figure out the template */
2356   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%d")) {
2357     result = create_recv_rtp (rtpbin, templ, pad_name);
2358   } else if (templ == gst_element_class_get_pad_template (klass,
2359           "recv_rtcp_sink_%d")) {
2360     result = create_recv_rtcp (rtpbin, templ, pad_name);
2361   } else if (templ == gst_element_class_get_pad_template (klass,
2362           "send_rtp_sink_%d")) {
2363     result = create_send_rtp (rtpbin, templ, pad_name);
2364   } else if (templ == gst_element_class_get_pad_template (klass,
2365           "send_rtcp_src_%d")) {
2366     result = create_rtcp (rtpbin, templ, pad_name);
2367   } else
2368     goto wrong_template;
2369
2370   g_free (pad_name);
2371   GST_RTP_BIN_UNLOCK (rtpbin);
2372
2373   return result;
2374
2375   /* ERRORS */
2376 wrong_template:
2377   {
2378     g_free (pad_name);
2379     GST_RTP_BIN_UNLOCK (rtpbin);
2380     g_warning ("gstrtpbin: this is not our template");
2381     return NULL;
2382   }
2383 }
2384
2385 static void
2386 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
2387 {
2388 }