gst/rtpmanager/: Various leak fixes.
[platform/upstream/gstreamer.git] / gst / rtpmanager / gstrtpbin.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 /**
21  * SECTION:element-gstrtpbin
22  * @short_description: handle media from one RTP bin
23  * @see_also: gstrtpjitterbuffer, gstrtpsession, gstrtpptdemux, gstrtpssrcdemux
24  *
25  * <refsect2>
26  * <para>
27  * RTP bin combines the functions of gstrtpsession, gstrtpssrcdemux, gstrtpjitterbuffer
28  * and gstrtpptdemux in one element. It allows for multiple RTP sessions that will
29  * be synchronized together using RTCP SR packets.
30  * </para>
31  * <para>
32  * gstrtpbin is configured with a number of request pads that define the
33  * functionality that is activated, similar to the gstrtpsession element.
34  * </para>
35  * <para>
36  * To use gstrtpbin as an RTP receiver, request a recv_rtp_sink_%%d pad. The session
37  * number must be specified in the pad name. 
38  * Data received on the recv_rtp_sink_%%d pad will be processed in the gstrtpsession
39  * manager and after being validated forwarded on gstrtpssrcdemuxer element. Each
40  * RTP stream is demuxed based on the SSRC and send to a gstrtpjitterbuffer. After
41  * the packets are released from the jitterbuffer, they will be forwarded to a
42  * gstrtpptdemuxer element. The gstrtpptdemuxer element will demux the packets based
43  * on the payload type and will create a unique pad recv_rtp_src_%%d_%%d_%%d on
44  * gstrtpbin with the session number, SSRC and payload type respectively as the pad
45  * name.
46  * </para>
47  * <para>
48  * To also use gstrtpbin as an RTCP receiver, request a recv_rtcp_sink_%%d pad. The
49  * session number must be specified in the pad name.
50  * </para>
51  * <para>
52  * If you want the session manager to generate and send RTCP packets, request
53  * the send_rtcp_src_%%d pad with the session number in the pad name. Packet pushed
54  * on this pad contain SR/RR RTCP reports that should be sent to all participants
55  * in the session.
56  * </para>
57  * <para>
58  * To use gstrtpbin as a sender, request a send_rtp_sink_%%d pad, which will
59  * automatically create a send_rtp_src_%%d pad. The session number must be specified when
60  * requesting the sink pad. The session manager will modify the
61  * SSRC in the RTP packets to its own SSRC and wil forward the packets on the
62  * send_rtp_src_%%d pad after updating its internal state.
63  * </para>
64  * <para>
65  * The session manager needs the clock-rate of the payload types it is handling
66  * and will signal the GstRtpSession::request-pt-map signal when it needs such a
67  * mapping. One can clear the cached values with the GstRtpSession::clear-pt-map
68  * signal.
69  * </para>
70  * <title>Example pipelines</title>
71  * <para>
72  * <programlisting>
73  * gst-launch udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
74  *     gstrtpbin ! rtptheoradepay ! theoradec ! xvimagesink
75  * </programlisting>
76  * Receive RTP data from port 5000 and send to the session 0 in gstrtpbin.
77  * </para>
78  * <para>
79  * <programlisting>
80  * gst-launch gstrtpbin name=rtpbin \
81  *         v4l2src ! ffmpegcolorspace ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
82  *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
83  *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
84  *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
85  *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
86  *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
87  *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
88  *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
89  * </programlisting>
90  * Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
91  * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
92  * and the audio is sent to session 1. Video packets are sent on UDP port 5000
93  * and audio packets on port 5002. The video RTCP packets for session 0 are sent
94  * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
95  * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
96  * is received on port 5007. Since RTCP packets from the sender should be sent
97  * as soon as possible and do not participate in preroll, sync=false and 
98  * async=false is configured on udpsink
99  * </para>
100  * <para>
101  * <programlisting>
102  *  gst-launch -v gstrtpbin name=rtpbin                                          \
103  *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
104  *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
105  *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
106  *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
107  *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
108  *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
109  *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
110  *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
111  *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
112  *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
113  * </programlisting>
114  * Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
115  * decode and display the video.
116  * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
117  * decode and play the audio.
118  * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
119  * session 1 on port 5003. These packets will be used for session management and
120  * synchronisation.
121  * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
122  * on port 5007.
123  * </para>
124  * </refsect2>
125  *
126  * Last reviewed on 2007-08-30 (0.10.6)
127  */
128
129 #ifdef HAVE_CONFIG_H
130 #include "config.h"
131 #endif
132 #include <string.h>
133
134 #include <gst/rtp/gstrtpbuffer.h>
135 #include <gst/rtp/gstrtcpbuffer.h>
136
137 #include "gstrtpbin-marshal.h"
138 #include "gstrtpbin.h"
139
140 GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
141 #define GST_CAT_DEFAULT gst_rtp_bin_debug
142
143
144 /* elementfactory information */
145 static const GstElementDetails rtpbin_details = GST_ELEMENT_DETAILS ("RTP Bin",
146     "Filter/Network/RTP",
147     "Implement an RTP bin",
148     "Wim Taymans <wim@fluendo.com>");
149
150 /* sink pads */
151 static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
152 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%d",
153     GST_PAD_SINK,
154     GST_PAD_REQUEST,
155     GST_STATIC_CAPS ("application/x-rtp")
156     );
157
158 static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%d",
160     GST_PAD_SINK,
161     GST_PAD_REQUEST,
162     GST_STATIC_CAPS ("application/x-rtcp")
163     );
164
165 static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
166 GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%d",
167     GST_PAD_SINK,
168     GST_PAD_REQUEST,
169     GST_STATIC_CAPS ("application/x-rtp")
170     );
171
172 /* src pads */
173 static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
174 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%d_%d_%d",
175     GST_PAD_SRC,
176     GST_PAD_SOMETIMES,
177     GST_STATIC_CAPS ("application/x-rtp")
178     );
179
180 static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
181 GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%d",
182     GST_PAD_SRC,
183     GST_PAD_REQUEST,
184     GST_STATIC_CAPS ("application/x-rtcp")
185     );
186
187 static GstStaticPadTemplate rtpbin_send_rtp_src_template =
188 GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d",
189     GST_PAD_SRC,
190     GST_PAD_SOMETIMES,
191     GST_STATIC_CAPS ("application/x-rtp")
192     );
193
194 /* padtemplate for the internal pad */
195 static GstStaticPadTemplate rtpbin_sync_sink_template =
196 GST_STATIC_PAD_TEMPLATE ("sink_%d",
197     GST_PAD_SINK,
198     GST_PAD_SOMETIMES,
199     GST_STATIC_CAPS ("application/x-rtcp")
200     );
201
202 #define GST_RTP_BIN_GET_PRIVATE(obj)  \
203    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
204
205 #define GST_RTP_BIN_LOCK(bin)   g_mutex_lock ((bin)->priv->bin_lock)
206 #define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock ((bin)->priv->bin_lock)
207
208 struct _GstRtpBinPrivate
209 {
210   GMutex *bin_lock;
211
212   GstClockTime ntp_ns_base;
213 };
214
215 /* signals and args */
216 enum
217 {
218   SIGNAL_REQUEST_PT_MAP,
219   SIGNAL_CLEAR_PT_MAP,
220
221   SIGNAL_ON_NEW_SSRC,
222   SIGNAL_ON_SSRC_COLLISION,
223   SIGNAL_ON_SSRC_VALIDATED,
224   SIGNAL_ON_BYE_SSRC,
225   SIGNAL_ON_BYE_TIMEOUT,
226   SIGNAL_ON_TIMEOUT,
227   LAST_SIGNAL
228 };
229
230 #define DEFAULT_LATENCY_MS      200
231
232 enum
233 {
234   PROP_0,
235   PROP_LATENCY
236 };
237
238 /* helper objects */
239 typedef struct _GstRtpBinSession GstRtpBinSession;
240 typedef struct _GstRtpBinStream GstRtpBinStream;
241 typedef struct _GstRtpBinClient GstRtpBinClient;
242
243 static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };
244
245 static GstCaps *pt_map_requested (GstElement * element, guint pt,
246     GstRtpBinSession * session);
247
248 static void free_stream (GstRtpBinStream * stream);
249
250 /* Manages the RTP stream for one SSRC.
251  *
252  * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer.
253  * If we see an SDES RTCP packet that links multiple SSRCs together based on a
254  * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
255  * together (see below).
256  */
257 struct _GstRtpBinStream
258 {
259   /* the SSRC of this stream */
260   guint32 ssrc;
261
262   /* parent bin */
263   GstRtpBin *bin;
264
265   /* the session this SSRC belongs to */
266   GstRtpBinSession *session;
267
268   /* the jitterbuffer of the SSRC */
269   GstElement *buffer;
270
271   /* the PT demuxer of the SSRC */
272   GstElement *demux;
273   gulong demux_newpad_sig;
274   gulong demux_ptreq_sig;
275
276   /* the internal pad we use to get RTCP sync messages */
277   GstPad *sync_pad;
278   gboolean have_sync;
279   guint64 last_unix;
280   guint64 last_extrtptime;
281
282   /* mapping to local RTP and NTP time */
283   guint64 local_rtp;
284   guint64 local_unix;
285   gint64 unix_delta;
286
287   /* for lip-sync */
288   guint64 clock_base;
289   gint clock_rate;
290   gint64 ts_offset;
291   gint64 prev_ts_offset;
292 };
293
294 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock ((sess)->lock)
295 #define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock ((sess)->lock)
296
297 /* Manages the receiving end of the packets.
298  *
299  * There is one such structure for each RTP session (audio/video/...).
300  * We get the RTP/RTCP packets and stuff them into the session manager. From
301  * there they are pushed into an SSRC demuxer that splits the stream based on
302  * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
303  * the GstRtpBinStream above).
304  */
305 struct _GstRtpBinSession
306 {
307   /* session id */
308   gint id;
309   /* the parent bin */
310   GstRtpBin *bin;
311   /* the session element */
312   GstElement *session;
313   /* the SSRC demuxer */
314   GstElement *demux;
315   gulong demux_newpad_sig;
316
317   GMutex *lock;
318
319   /* list of GstRtpBinStream */
320   GSList *streams;
321
322   /* mapping of payload type to caps */
323   GHashTable *ptmap;
324
325   /* the pads of the session */
326   GstPad *recv_rtp_sink;
327   GstPad *recv_rtp_src;
328   GstPad *recv_rtcp_sink;
329   GstPad *sync_src;
330   GstPad *send_rtp_sink;
331   GstPad *send_rtp_src;
332   GstPad *send_rtcp_src;
333 };
334
335 /* Manages the RTP streams that come from one client and should therefore be
336  * synchronized.
337  */
338 struct _GstRtpBinClient
339 {
340   /* the common CNAME for the streams */
341   gchar *cname;
342   guint cname_len;
343
344   /* the streams */
345   guint nstreams;
346   GSList *streams;
347
348   gint64 min_delta;
349 };
350
351 /* find a session with the given id. Must be called with RTP_BIN_LOCK */
352 static GstRtpBinSession *
353 find_session_by_id (GstRtpBin * rtpbin, gint id)
354 {
355   GSList *walk;
356
357   for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
358     GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
359
360     if (sess->id == id)
361       return sess;
362   }
363   return NULL;
364 }
365
366 static void
367 on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
368 {
369   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
370       sess->id, ssrc);
371 }
372
373 static void
374 on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
375 {
376   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
377       sess->id, ssrc);
378 }
379
380 static void
381 on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
382 {
383   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
384       sess->id, ssrc);
385 }
386
387 static void
388 on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
389 {
390   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
391       sess->id, ssrc);
392 }
393
394 static void
395 on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
396 {
397   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
398       sess->id, ssrc);
399 }
400
401 static void
402 on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
403 {
404   g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
405       sess->id, ssrc);
406 }
407
408 /* create a session with the given id.  Must be called with RTP_BIN_LOCK */
409 static GstRtpBinSession *
410 create_session (GstRtpBin * rtpbin, gint id)
411 {
412   GstRtpBinSession *sess;
413   GstElement *session, *demux;
414
415   if (!(session = gst_element_factory_make ("gstrtpsession", NULL)))
416     goto no_session;
417
418   if (!(demux = gst_element_factory_make ("gstrtpssrcdemux", NULL)))
419     goto no_demux;
420
421   sess = g_new0 (GstRtpBinSession, 1);
422   sess->lock = g_mutex_new ();
423   sess->id = id;
424   sess->bin = rtpbin;
425   sess->session = session;
426   sess->demux = demux;
427   sess->ptmap = g_hash_table_new (NULL, NULL);
428   rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
429
430   /* provide clock_rate to the session manager when needed */
431   g_signal_connect (session, "request-pt-map",
432       (GCallback) pt_map_requested, sess);
433
434   g_signal_connect (sess->session, "on-new-ssrc",
435       (GCallback) on_new_ssrc, sess);
436   g_signal_connect (sess->session, "on-ssrc-collision",
437       (GCallback) on_ssrc_collision, sess);
438   g_signal_connect (sess->session, "on-ssrc-validated",
439       (GCallback) on_ssrc_validated, sess);
440   g_signal_connect (sess->session, "on-bye-ssrc",
441       (GCallback) on_bye_ssrc, sess);
442   g_signal_connect (sess->session, "on-bye-timeout",
443       (GCallback) on_bye_timeout, sess);
444   g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
445
446   gst_bin_add (GST_BIN_CAST (rtpbin), session);
447   gst_element_set_state (session, GST_STATE_PLAYING);
448   gst_bin_add (GST_BIN_CAST (rtpbin), demux);
449   gst_element_set_state (demux, GST_STATE_PLAYING);
450
451   return sess;
452
453   /* ERRORS */
454 no_session:
455   {
456     g_warning ("gstrtpbin: could not create gstrtpsession element");
457     return NULL;
458   }
459 no_demux:
460   {
461     gst_object_unref (session);
462     g_warning ("gstrtpbin: could not create gstrtpssrcdemux element");
463     return NULL;
464   }
465 }
466
467 static void
468 free_session (GstRtpBinSession * sess)
469 {
470   GstRtpBin *bin;
471
472   bin = sess->bin;
473
474   gst_element_set_state (sess->session, GST_STATE_NULL);
475   gst_element_set_state (sess->demux, GST_STATE_NULL);
476
477   gst_bin_remove (GST_BIN_CAST (bin), sess->session);
478   gst_bin_remove (GST_BIN_CAST (bin), sess->demux);
479
480   g_slist_foreach (sess->streams, (GFunc) free_stream, NULL);
481   g_slist_free (sess->streams);
482
483   g_mutex_free (sess->lock);
484   g_hash_table_destroy (sess->ptmap);
485
486   bin->sessions = g_slist_remove (bin->sessions, sess);
487
488   g_free (sess);
489 }
490
491 #if 0
492 static GstRtpBinStream *
493 find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
494 {
495   GSList *walk;
496
497   for (walk = session->streams; walk; walk = g_slist_next (walk)) {
498     GstRtpBinStream *stream = (GstRtpBinStream *) walk->data;
499
500     if (stream->ssrc == ssrc)
501       return stream;
502   }
503   return NULL;
504 }
505 #endif
506
507 /* get the payload type caps for the specific payload @pt in @session */
508 static GstCaps *
509 get_pt_map (GstRtpBinSession * session, guint pt)
510 {
511   GstCaps *caps = NULL;
512   GstRtpBin *bin;
513   GValue ret = { 0 };
514   GValue args[3] = { {0}, {0}, {0} };
515
516   GST_DEBUG ("searching pt %d in cache", pt);
517
518   GST_RTP_SESSION_LOCK (session);
519
520   /* first look in the cache */
521   caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
522   if (caps)
523     goto done;
524
525   bin = session->bin;
526
527   GST_DEBUG ("emiting signal for pt %d in session %d", pt, session->id);
528
529   /* not in cache, send signal to request caps */
530   g_value_init (&args[0], GST_TYPE_ELEMENT);
531   g_value_set_object (&args[0], bin);
532   g_value_init (&args[1], G_TYPE_UINT);
533   g_value_set_uint (&args[1], session->id);
534   g_value_init (&args[2], G_TYPE_UINT);
535   g_value_set_uint (&args[2], pt);
536
537   g_value_init (&ret, GST_TYPE_CAPS);
538   g_value_set_boxed (&ret, NULL);
539
540   g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);
541
542   caps = (GstCaps *) g_value_get_boxed (&ret);
543   if (!caps)
544     goto no_caps;
545
546   GST_DEBUG ("caching pt %d as %" GST_PTR_FORMAT, pt, caps);
547
548   /* store in cache */
549   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt), caps);
550
551 done:
552   GST_RTP_SESSION_UNLOCK (session);
553
554   return caps;
555
556   /* ERRORS */
557 no_caps:
558   {
559     GST_RTP_SESSION_UNLOCK (session);
560     GST_DEBUG ("no pt map could be obtained");
561     return NULL;
562   }
563 }
564
565 static gboolean
566 return_true (gpointer key, gpointer value, gpointer user_data)
567 {
568   return TRUE;
569 }
570
571 static void
572 gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
573 {
574   GSList *walk;
575
576   GST_RTP_BIN_LOCK (bin);
577   GST_DEBUG_OBJECT (bin, "clearing pt map");
578   for (walk = bin->sessions; walk; walk = g_slist_next (walk)) {
579     GstRtpBinSession *session = (GstRtpBinSession *) walk->data;
580
581     GST_RTP_SESSION_LOCK (session);
582 #if 0
583     /* This requires GLib 2.12 */
584     g_hash_table_remove_all (session->ptmap);
585 #else
586     g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
587 #endif
588     GST_RTP_SESSION_UNLOCK (session);
589   }
590   GST_RTP_BIN_UNLOCK (bin);
591 }
592
593 static GstRtpBinClient *
594 get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
595 {
596   GstRtpBinClient *result = NULL;
597   GSList *walk;
598
599   for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
600     GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
601
602     if (len != client->cname_len)
603       continue;
604
605     if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
606       GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
607           client->cname);
608       result = client;
609       break;
610     }
611   }
612
613   /* nothing found, create one */
614   if (result == NULL) {
615     result = g_new0 (GstRtpBinClient, 1);
616     result->cname = g_strndup ((gchar *) data, len);
617     result->cname_len = len;
618     result->min_delta = G_MAXINT64;
619     bin->clients = g_slist_prepend (bin->clients, result);
620     GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
621         result->cname);
622   }
623   return result;
624 }
625
626 static void
627 free_client (GstRtpBinClient * client, GstRtpBin * bin)
628 {
629   bin->clients = g_slist_remove (bin->clients, client);
630   g_free (client->cname);
631   g_free (client);
632 }
633
634 /* associate a stream to the given CNAME. This will make sure all streams for
635  * that CNAME are synchronized together. */
636 static void
637 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
638     guint8 * data)
639 {
640   GstRtpBinClient *client;
641   gboolean created;
642   GSList *walk;
643
644   /* first find or create the CNAME */
645   client = get_client (bin, len, data, &created);
646
647   /* find stream in the client */
648   for (walk = client->streams; walk; walk = g_slist_next (walk)) {
649     GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
650
651     if (ostream == stream)
652       break;
653   }
654   /* not found, add it to the list */
655   if (walk == NULL) {
656     GST_DEBUG_OBJECT (bin,
657         "new association of SSRC %08x with client %p with CNAME %s",
658         stream->ssrc, client, client->cname);
659     client->streams = g_slist_prepend (client->streams, stream);
660     client->nstreams++;
661   } else {
662     GST_DEBUG_OBJECT (bin,
663         "found association of SSRC %08x with client %p with CNAME %s",
664         stream->ssrc, client, client->cname);
665   }
666
667   /* we can only continue if we know the local clock-base and clock-rate */
668   if (stream->clock_base == -1)
669     goto no_clock_base;
670   if (stream->clock_rate <= 0)
671     goto no_clock_rate;
672
673   /* map last RTP time to local timeline using our clock-base */
674   stream->local_rtp = stream->last_extrtptime - stream->clock_base;
675
676   GST_DEBUG_OBJECT (bin,
677       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
678       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d", stream->clock_base,
679       stream->last_extrtptime, stream->local_rtp, stream->clock_rate);
680
681   /* calculate local NTP time in gstreamer timestamp */
682   stream->local_unix =
683       gst_util_uint64_scale_int (stream->local_rtp, GST_SECOND,
684       stream->clock_rate);
685   /* calculate delta between server and receiver */
686   stream->unix_delta = stream->last_unix - stream->local_unix;
687
688   GST_DEBUG_OBJECT (bin,
689       "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT
690       ", delta %" G_GINT64_FORMAT, stream->local_unix, stream->last_unix,
691       stream->unix_delta);
692
693   /* recalc inter stream playout offset, but only if there are more than one
694    * stream. */
695   if (client->nstreams > 1) {
696     gint64 min;
697
698     /* calculate the min of all deltas */
699     min = G_MAXINT64;
700     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
701       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
702
703       if (ostream->unix_delta < min)
704         min = ostream->unix_delta;
705     }
706
707     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT, client,
708         min);
709
710     /* calculate offsets for each stream */
711     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
712       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
713
714       ostream->ts_offset = ostream->unix_delta - min;
715
716       /* delta changed, see how much */
717       if (ostream->prev_ts_offset != ostream->ts_offset) {
718         gint64 diff;
719
720         if (ostream->prev_ts_offset > ostream->ts_offset)
721           diff = ostream->prev_ts_offset - ostream->ts_offset;
722         else
723           diff = ostream->ts_offset - ostream->prev_ts_offset;
724
725         /* only change diff when it changed more than 1 millisecond. This
726          * compensates for rounding errors in NTP to RTP timestamp
727          * conversions */
728         if (diff > GST_MSECOND)
729           g_object_set (ostream->buffer, "ts-offset", ostream->ts_offset, NULL);
730
731         ostream->prev_ts_offset = ostream->ts_offset;
732       }
733       GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
734           ostream->ssrc, ostream->ts_offset);
735     }
736   }
737   return;
738
739 no_clock_base:
740   {
741     GST_WARNING_OBJECT (bin, "we have no clock-base");
742     return;
743   }
744 no_clock_rate:
745   {
746     GST_WARNING_OBJECT (bin, "we have no clock-rate");
747     return;
748   }
749 }
750
751 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
752   for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
753           (b) = gst_rtcp_packet_move_to_next ((packet)))
754
755 #define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
756   for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
757           (b) = gst_rtcp_packet_sdes_next_item ((packet)))
758
759 #define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
760   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
761           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
762
763 static GstFlowReturn
764 gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer)
765 {
766   GstFlowReturn ret = GST_FLOW_OK;
767   GstRtpBinStream *stream;
768   GstRtpBin *bin;
769   GstRTCPPacket packet;
770   guint32 ssrc;
771   guint64 ntptime;
772   guint32 rtptime;
773   gboolean have_sr, have_sdes;
774   gboolean more;
775
776   stream = gst_pad_get_element_private (pad);
777   bin = stream->bin;
778
779   GST_DEBUG_OBJECT (bin, "received sync packet");
780
781   if (!gst_rtcp_buffer_validate (buffer))
782     goto invalid_rtcp;
783
784   have_sr = FALSE;
785   have_sdes = FALSE;
786   GST_RTCP_BUFFER_FOR_PACKETS (more, buffer, &packet) {
787     /* first packet must be SR or RR or else the validate would have failed */
788     switch (gst_rtcp_packet_get_type (&packet)) {
789       case GST_RTCP_TYPE_SR:
790         /* only parse first. There is only supposed to be one SR in the packet
791          * but we will deal with malformed packets gracefully */
792         if (have_sr)
793           break;
794         /* get NTP and RTP times */
795         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, &rtptime,
796             NULL, NULL);
797
798         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
799         /* ignore SR that is not ours */
800         if (ssrc != stream->ssrc)
801           continue;
802
803         have_sr = TRUE;
804
805         /* store values in the stream */
806         stream->have_sync = TRUE;
807         stream->last_unix = gst_rtcp_ntp_to_unix (ntptime);
808         /* use extended timestamp */
809         gst_rtp_buffer_ext_timestamp (&stream->last_extrtptime, rtptime);
810         break;
811       case GST_RTCP_TYPE_SDES:
812       {
813         gboolean more_items, more_entries;
814
815         /* only deal with first SDES, there is only supposed to be one SDES in
816          * the RTCP packet but we deal with bad packets gracefully. Also bail
817          * out if we have not seen an SR item yet. */
818         if (have_sdes || !have_sr)
819           break;
820
821         GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
822           /* skip items that are not about the SSRC of the sender */
823           if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
824             continue;
825
826           /* find the CNAME entry */
827           GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
828             GstRTCPSDESType type;
829             guint8 len;
830             guint8 *data;
831
832             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
833
834             if (type == GST_RTCP_SDES_CNAME) {
835               stream->clock_base = GST_BUFFER_OFFSET (buffer);
836               /* associate the stream to CNAME */
837               gst_rtp_bin_associate (bin, stream, len, data);
838             }
839           }
840         }
841         have_sdes = TRUE;
842         break;
843       }
844       default:
845         /* we can ignore these packets */
846         break;
847     }
848   }
849
850   gst_buffer_unref (buffer);
851
852   return ret;
853
854   /* ERRORS */
855 invalid_rtcp:
856   {
857     /* this is fatal and should be filtered earlier */
858     GST_ELEMENT_ERROR (bin, STREAM, DECODE, (NULL),
859         ("invalid RTCP packet received"));
860     gst_buffer_unref (buffer);
861     return GST_FLOW_ERROR;
862   }
863 }
864
865 /* create a new stream with @ssrc in @session. Must be called with
866  * RTP_SESSION_LOCK. */
867 static GstRtpBinStream *
868 create_stream (GstRtpBinSession * session, guint32 ssrc)
869 {
870   GstElement *buffer, *demux;
871   GstRtpBinStream *stream;
872   GstPadTemplate *templ;
873   gchar *padname;
874
875   if (!(buffer = gst_element_factory_make ("gstrtpjitterbuffer", NULL)))
876     goto no_jitterbuffer;
877
878   if (!(demux = gst_element_factory_make ("gstrtpptdemux", NULL)))
879     goto no_demux;
880
881   stream = g_new0 (GstRtpBinStream, 1);
882   stream->ssrc = ssrc;
883   stream->bin = session->bin;
884   stream->session = session;
885   stream->buffer = buffer;
886   stream->demux = demux;
887   stream->last_extrtptime = -1;
888   stream->have_sync = FALSE;
889   session->streams = g_slist_prepend (session->streams, stream);
890
891   /* make an internal sinkpad for RTCP sync packets. Take ownership of the
892    * pad. We will link this pad later. */
893   padname = g_strdup_printf ("sync_%d", ssrc);
894   templ = gst_static_pad_template_get (&rtpbin_sync_sink_template);
895   stream->sync_pad = gst_pad_new_from_template (templ, padname);
896   gst_object_unref (templ);
897   gst_object_ref (stream->sync_pad);
898   gst_object_sink (stream->sync_pad);
899   gst_pad_set_element_private (stream->sync_pad, stream);
900   gst_pad_set_chain_function (stream->sync_pad, gst_rtp_bin_sync_chain);
901   gst_pad_set_active (stream->sync_pad, TRUE);
902
903   /* provide clock_rate to the jitterbuffer when needed */
904   g_signal_connect (buffer, "request-pt-map",
905       (GCallback) pt_map_requested, session);
906
907   /* configure latency */
908   g_object_set (buffer, "latency", session->bin->latency, NULL);
909
910   gst_bin_add (GST_BIN_CAST (session->bin), buffer);
911   gst_element_set_state (buffer, GST_STATE_PLAYING);
912   gst_bin_add (GST_BIN_CAST (session->bin), demux);
913   gst_element_set_state (demux, GST_STATE_PLAYING);
914
915   /* link stuff */
916   gst_element_link (buffer, demux);
917
918   return stream;
919
920   /* ERRORS */
921 no_jitterbuffer:
922   {
923     g_warning ("gstrtpbin: could not create gstrtpjitterbuffer element");
924     return NULL;
925   }
926 no_demux:
927   {
928     gst_object_unref (buffer);
929     g_warning ("gstrtpbin: could not create gstrtpptdemux element");
930     return NULL;
931   }
932 }
933
934 static void
935 free_stream (GstRtpBinStream * stream)
936 {
937   GstRtpBinSession *session;
938
939   session = stream->session;
940
941   gst_element_set_state (stream->buffer, GST_STATE_NULL);
942   gst_element_set_state (stream->demux, GST_STATE_NULL);
943
944   gst_bin_remove (GST_BIN_CAST (session->bin), stream->buffer);
945   gst_bin_remove (GST_BIN_CAST (session->bin), stream->demux);
946
947   gst_object_unref (stream->sync_pad);
948
949   session->streams = g_slist_remove (session->streams, stream);
950
951   g_free (stream);
952 }
953
954 /* GObject vmethods */
955 static void gst_rtp_bin_dispose (GObject * object);
956 static void gst_rtp_bin_finalize (GObject * object);
957 static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
958     const GValue * value, GParamSpec * pspec);
959 static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
960     GValue * value, GParamSpec * pspec);
961
962 /* GstElement vmethods */
963 static GstClock *gst_rtp_bin_provide_clock (GstElement * element);
964 static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
965     GstStateChange transition);
966 static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
967     GstPadTemplate * templ, const gchar * name);
968 static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
969 static void gst_rtp_bin_clear_pt_map (GstRtpBin * bin);
970
971 GST_BOILERPLATE (GstRtpBin, gst_rtp_bin, GstBin, GST_TYPE_BIN);
972
973 static void
974 gst_rtp_bin_base_init (gpointer klass)
975 {
976   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
977
978   /* sink pads */
979   gst_element_class_add_pad_template (element_class,
980       gst_static_pad_template_get (&rtpbin_recv_rtp_sink_template));
981   gst_element_class_add_pad_template (element_class,
982       gst_static_pad_template_get (&rtpbin_recv_rtcp_sink_template));
983   gst_element_class_add_pad_template (element_class,
984       gst_static_pad_template_get (&rtpbin_send_rtp_sink_template));
985
986   /* src pads */
987   gst_element_class_add_pad_template (element_class,
988       gst_static_pad_template_get (&rtpbin_recv_rtp_src_template));
989   gst_element_class_add_pad_template (element_class,
990       gst_static_pad_template_get (&rtpbin_send_rtcp_src_template));
991   gst_element_class_add_pad_template (element_class,
992       gst_static_pad_template_get (&rtpbin_send_rtp_src_template));
993
994   gst_element_class_set_details (element_class, &rtpbin_details);
995 }
996
997 static void
998 gst_rtp_bin_class_init (GstRtpBinClass * klass)
999 {
1000   GObjectClass *gobject_class;
1001   GstElementClass *gstelement_class;
1002
1003   gobject_class = (GObjectClass *) klass;
1004   gstelement_class = (GstElementClass *) klass;
1005
1006   g_type_class_add_private (klass, sizeof (GstRtpBinPrivate));
1007
1008   gobject_class->dispose = gst_rtp_bin_dispose;
1009   gobject_class->finalize = gst_rtp_bin_finalize;
1010   gobject_class->set_property = gst_rtp_bin_set_property;
1011   gobject_class->get_property = gst_rtp_bin_get_property;
1012
1013   g_object_class_install_property (gobject_class, PROP_LATENCY,
1014       g_param_spec_uint ("latency", "Buffer latency in ms",
1015           "Default amount of ms to buffer in the jitterbuffers", 0,
1016           G_MAXUINT, DEFAULT_LATENCY_MS, G_PARAM_READWRITE));
1017
1018   /**
1019    * GstRtpBin::request-pt-map:
1020    * @rtpbin: the object which received the signal
1021    * @session: the session
1022    * @pt: the pt
1023    *
1024    * Request the payload type as #GstCaps for @pt in @session.
1025    */
1026   gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP] =
1027       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
1028       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, request_pt_map),
1029       NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT_UINT, GST_TYPE_CAPS, 2,
1030       G_TYPE_UINT, G_TYPE_UINT);
1031   /**
1032    * GstRtpBin::clear-pt-map:
1033    * @rtpbin: the object which received the signal
1034    *
1035    * Clear all previously cached pt-mapping obtained with
1036    * GstRtpBin::request-pt-map.
1037    */
1038   gst_rtp_bin_signals[SIGNAL_CLEAR_PT_MAP] =
1039       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
1040       G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpBinClass, clear_pt_map),
1041       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
1042
1043   /**
1044    * GstRtpBin::on-new-ssrc:
1045    * @rtpbin: the object which received the signal
1046    * @session: the session
1047    * @ssrc: the SSRC 
1048    *
1049    * Notify of a new SSRC that entered @session.
1050    */
1051   gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
1052       g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
1053       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_new_ssrc),
1054       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1055       G_TYPE_UINT, G_TYPE_UINT);
1056   /**
1057    * GstRtpBin::on-ssrc_collision:
1058    * @rtpbin: the object which received the signal
1059    * @session: the session
1060    * @ssrc: the SSRC 
1061    *
1062    * Notify when we have an SSRC collision
1063    */
1064   gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
1065       g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
1066       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_collision),
1067       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1068       G_TYPE_UINT, G_TYPE_UINT);
1069   /**
1070    * GstRtpBin::on-ssrc_validated:
1071    * @rtpbin: the object which received the signal
1072    * @session: the session
1073    * @ssrc: the SSRC 
1074    *
1075    * Notify of a new SSRC that became validated.
1076    */
1077   gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
1078       g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
1079       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_ssrc_validated),
1080       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1081       G_TYPE_UINT, G_TYPE_UINT);
1082
1083   /**
1084    * GstRtpBin::on-bye-ssrc:
1085    * @rtpbin: the object which received the signal
1086    * @session: the session
1087    * @ssrc: the SSRC 
1088    *
1089    * Notify of an SSRC that became inactive because of a BYE packet.
1090    */
1091   gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
1092       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
1093       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_ssrc),
1094       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1095       G_TYPE_UINT, G_TYPE_UINT);
1096   /**
1097    * GstRtpBin::on-bye-timeout:
1098    * @rtpbin: the object which received the signal
1099    * @session: the session
1100    * @ssrc: the SSRC 
1101    *
1102    * Notify of an SSRC that has timed out because of BYE
1103    */
1104   gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
1105       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
1106       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_bye_timeout),
1107       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1108       G_TYPE_UINT, G_TYPE_UINT);
1109   /**
1110    * GstRtpBin::on-timeout:
1111    * @rtpbin: the object which received the signal
1112    * @session: the session
1113    * @ssrc: the SSRC 
1114    *
1115    * Notify of an SSRC that has timed out
1116    */
1117   gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
1118       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
1119       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_timeout),
1120       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
1121       G_TYPE_UINT, G_TYPE_UINT);
1122
1123   gstelement_class->provide_clock =
1124       GST_DEBUG_FUNCPTR (gst_rtp_bin_provide_clock);
1125   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
1126   gstelement_class->request_new_pad =
1127       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
1128   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
1129
1130   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_bin_clear_pt_map);
1131
1132   GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
1133 }
1134
1135 static void
1136 gst_rtp_bin_init (GstRtpBin * rtpbin, GstRtpBinClass * klass)
1137 {
1138   rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
1139   rtpbin->priv->bin_lock = g_mutex_new ();
1140   rtpbin->provided_clock = gst_system_clock_obtain ();
1141   rtpbin->latency = DEFAULT_LATENCY_MS;
1142 }
1143
1144 static void
1145 gst_rtp_bin_dispose (GObject * object)
1146 {
1147   GstRtpBin *rtpbin;
1148
1149   rtpbin = GST_RTP_BIN (object);
1150
1151   g_slist_foreach (rtpbin->sessions, (GFunc) free_session, NULL);
1152   g_slist_foreach (rtpbin->clients, (GFunc) free_client, NULL);
1153   g_slist_free (rtpbin->sessions);
1154   rtpbin->sessions = NULL;
1155
1156   G_OBJECT_CLASS (parent_class)->dispose (object);
1157 }
1158
1159 static void
1160 gst_rtp_bin_finalize (GObject * object)
1161 {
1162   GstRtpBin *rtpbin;
1163
1164   rtpbin = GST_RTP_BIN (object);
1165
1166   g_mutex_free (rtpbin->priv->bin_lock);
1167   gst_object_unref (rtpbin->provided_clock);
1168   g_slist_free (rtpbin->sessions);
1169
1170   G_OBJECT_CLASS (parent_class)->finalize (object);
1171 }
1172
1173 static void
1174 gst_rtp_bin_set_property (GObject * object, guint prop_id,
1175     const GValue * value, GParamSpec * pspec)
1176 {
1177   GstRtpBin *rtpbin;
1178
1179   rtpbin = GST_RTP_BIN (object);
1180
1181   switch (prop_id) {
1182     case PROP_LATENCY:
1183       rtpbin->latency = g_value_get_uint (value);
1184       break;
1185     default:
1186       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1187       break;
1188   }
1189 }
1190
1191 static void
1192 gst_rtp_bin_get_property (GObject * object, guint prop_id,
1193     GValue * value, GParamSpec * pspec)
1194 {
1195   GstRtpBin *rtpbin;
1196
1197   rtpbin = GST_RTP_BIN (object);
1198
1199   switch (prop_id) {
1200     case PROP_LATENCY:
1201       g_value_set_uint (value, rtpbin->latency);
1202       break;
1203     default:
1204       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1205       break;
1206   }
1207 }
1208
1209 static GstClock *
1210 gst_rtp_bin_provide_clock (GstElement * element)
1211 {
1212   GstRtpBin *rtpbin;
1213
1214   rtpbin = GST_RTP_BIN (element);
1215
1216   return GST_CLOCK_CAST (gst_object_ref (rtpbin->provided_clock));
1217 }
1218
1219 static void
1220 calc_ntp_ns_base (GstRtpBin * bin)
1221 {
1222   GstClockTime now;
1223   GTimeVal current;
1224   GSList *walk;
1225
1226   /* get the current time and convert it to NTP time in nanoseconds */
1227   g_get_current_time (&current);
1228   now = GST_TIMEVAL_TO_TIME (current);
1229   now += (2208988800LL * GST_SECOND);
1230
1231   GST_RTP_BIN_LOCK (bin);
1232   bin->priv->ntp_ns_base = now;
1233   for (walk = bin->sessions; walk; walk = g_slist_next (walk)) {
1234     GstRtpBinSession *session = (GstRtpBinSession *) walk->data;
1235
1236     g_object_set (session->session, "ntp-ns-base", now, NULL);
1237   }
1238   GST_RTP_BIN_UNLOCK (bin);
1239
1240   return;
1241 }
1242
1243 static GstStateChangeReturn
1244 gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
1245 {
1246   GstStateChangeReturn res;
1247   GstRtpBin *rtpbin;
1248
1249   rtpbin = GST_RTP_BIN (element);
1250
1251   switch (transition) {
1252     case GST_STATE_CHANGE_NULL_TO_READY:
1253       break;
1254     case GST_STATE_CHANGE_READY_TO_PAUSED:
1255       break;
1256     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1257       calc_ntp_ns_base (rtpbin);
1258       break;
1259     default:
1260       break;
1261   }
1262
1263   res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1264
1265   switch (transition) {
1266     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1267       break;
1268     case GST_STATE_CHANGE_PAUSED_TO_READY:
1269       break;
1270     case GST_STATE_CHANGE_READY_TO_NULL:
1271       break;
1272     default:
1273       break;
1274   }
1275   return res;
1276 }
1277
1278 /* a new pad (SSRC) was created in @session */
1279 static void
1280 new_payload_found (GstElement * element, guint pt, GstPad * pad,
1281     GstRtpBinStream * stream)
1282 {
1283   GstRtpBin *rtpbin;
1284   GstElementClass *klass;
1285   GstPadTemplate *templ;
1286   gchar *padname;
1287   GstPad *gpad;
1288
1289   rtpbin = stream->bin;
1290
1291   GST_DEBUG ("new payload pad %d", pt);
1292
1293   /* ghost the pad to the parent */
1294   klass = GST_ELEMENT_GET_CLASS (rtpbin);
1295   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%d_%d_%d");
1296   padname = g_strdup_printf ("recv_rtp_src_%d_%u_%d",
1297       stream->session->id, stream->ssrc, pt);
1298   gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
1299   g_free (padname);
1300
1301   gst_pad_set_caps (gpad, GST_PAD_CAPS (pad));
1302   gst_pad_set_active (gpad, TRUE);
1303   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
1304 }
1305
1306 static GstCaps *
1307 pt_map_requested (GstElement * element, guint pt, GstRtpBinSession * session)
1308 {
1309   GstRtpBin *rtpbin;
1310   GstCaps *caps;
1311
1312   rtpbin = session->bin;
1313
1314   GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt,
1315       session->id);
1316
1317   caps = get_pt_map (session, pt);
1318   if (!caps)
1319     goto no_caps;
1320
1321   return caps;
1322
1323   /* ERRORS */
1324 no_caps:
1325   {
1326     GST_DEBUG_OBJECT (rtpbin, "could not get caps");
1327     return NULL;
1328   }
1329 }
1330
1331 /* emited when caps changed for the session */
1332 static void
1333 caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
1334 {
1335   GstRtpBin *bin;
1336   GstCaps *caps;
1337   gint payload;
1338   const GstStructure *s;
1339
1340   bin = session->bin;
1341
1342   g_object_get (pad, "caps", &caps, NULL);
1343
1344   if (caps == NULL)
1345     return;
1346
1347   GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
1348
1349   s = gst_caps_get_structure (caps, 0);
1350
1351   /* get payload, finish when it's not there */
1352   if (!gst_structure_get_int (s, "payload", &payload))
1353     return;
1354
1355   GST_RTP_SESSION_LOCK (session);
1356   GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
1357   g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
1358   GST_RTP_SESSION_UNLOCK (session);
1359 }
1360
1361 /* a new pad (SSRC) was created in @session */
1362 static void
1363 new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
1364     GstRtpBinSession * session)
1365 {
1366   GstRtpBinStream *stream;
1367   GstPad *sinkpad, *srcpad;
1368   gchar *padname;
1369   GstCaps *caps;
1370
1371   GST_DEBUG_OBJECT (session->bin, "new SSRC pad %08x", ssrc);
1372
1373   GST_RTP_SESSION_LOCK (session);
1374
1375   /* create new stream */
1376   stream = create_stream (session, ssrc);
1377   if (!stream)
1378     goto no_stream;
1379
1380   /* get the caps of the pad, we need the clock-rate and base_time if any. */
1381   if ((caps = gst_pad_get_caps (pad))) {
1382     const GstStructure *s;
1383     guint val;
1384
1385     GST_DEBUG_OBJECT (session->bin, "pad has caps %" GST_PTR_FORMAT, caps);
1386
1387     s = gst_caps_get_structure (caps, 0);
1388
1389     if (!gst_structure_get_int (s, "clock-rate", &stream->clock_rate))
1390       stream->clock_rate = -1;
1391
1392     if (gst_structure_get_uint (s, "clock-base", &val))
1393       stream->clock_base = val;
1394     else
1395       stream->clock_base = -1;
1396   }
1397
1398   /* get pad and link */
1399   GST_DEBUG_OBJECT (session->bin, "linking jitterbuffer");
1400   sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
1401   gst_pad_link (pad, sinkpad);
1402   gst_object_unref (sinkpad);
1403
1404   /* get the RTCP sync pad */
1405   GST_DEBUG_OBJECT (session->bin, "linking sync pad");
1406   padname = g_strdup_printf ("rtcp_src_%d", ssrc);
1407   srcpad = gst_element_get_pad (element, padname);
1408   g_free (padname);
1409   gst_pad_link (srcpad, stream->sync_pad);
1410   gst_object_unref (srcpad);
1411
1412   /* connect to the new-pad signal of the payload demuxer, this will expose the
1413    * new pad by ghosting it. */
1414   stream->demux_newpad_sig = g_signal_connect (stream->demux,
1415       "new-payload-type", (GCallback) new_payload_found, stream);
1416   /* connect to the request-pt-map signal. This signal will be emited by the
1417    * demuxer so that it can apply a proper caps on the buffers for the
1418    * depayloaders. */
1419   stream->demux_ptreq_sig = g_signal_connect (stream->demux,
1420       "request-pt-map", (GCallback) pt_map_requested, session);
1421
1422   GST_RTP_SESSION_UNLOCK (session);
1423
1424   return;
1425
1426   /* ERRORS */
1427 no_stream:
1428   {
1429     GST_RTP_SESSION_UNLOCK (session);
1430     GST_DEBUG ("could not create stream");
1431     return;
1432   }
1433 }
1434
1435 /* Create a pad for receiving RTP for the session in @name. Must be called with
1436  * RTP_BIN_LOCK.
1437  */
1438 static GstPad *
1439 create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
1440 {
1441   GstPad *result, *sinkdpad;
1442   guint sessid;
1443   GstRtpBinSession *session;
1444   GstPadLinkReturn lres;
1445
1446   /* first get the session number */
1447   if (name == NULL || sscanf (name, "recv_rtp_sink_%d", &sessid) != 1)
1448     goto no_name;
1449
1450   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
1451
1452   /* get or create session */
1453   session = find_session_by_id (rtpbin, sessid);
1454   if (!session) {
1455     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
1456     /* create session now */
1457     session = create_session (rtpbin, sessid);
1458     if (session == NULL)
1459       goto create_error;
1460   }
1461
1462   /* check if pad was requested */
1463   if (session->recv_rtp_sink != NULL)
1464     goto existed;
1465
1466   GST_DEBUG_OBJECT (rtpbin, "getting RTP sink pad");
1467   /* get recv_rtp pad and store */
1468   session->recv_rtp_sink =
1469       gst_element_get_request_pad (session->session, "recv_rtp_sink");
1470   if (session->recv_rtp_sink == NULL)
1471     goto pad_failed;
1472
1473   g_signal_connect (session->recv_rtp_sink, "notify::caps",
1474       (GCallback) caps_changed, session);
1475
1476   GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad");
1477   /* get srcpad, link to SSRCDemux */
1478   session->recv_rtp_src =
1479       gst_element_get_static_pad (session->session, "recv_rtp_src");
1480   if (session->recv_rtp_src == NULL)
1481     goto pad_failed;
1482
1483   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
1484   sinkdpad = gst_element_get_static_pad (session->demux, "sink");
1485   GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
1486   lres = gst_pad_link (session->recv_rtp_src, sinkdpad);
1487   gst_object_unref (sinkdpad);
1488   if (lres != GST_PAD_LINK_OK)
1489     goto link_failed;
1490
1491   /* connect to the new-ssrc-pad signal of the SSRC demuxer */
1492   session->demux_newpad_sig = g_signal_connect (session->demux,
1493       "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
1494
1495   GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
1496   result =
1497       gst_ghost_pad_new_from_template (name, session->recv_rtp_sink, templ);
1498   gst_pad_set_active (result, TRUE);
1499   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
1500
1501   return result;
1502
1503   /* ERRORS */
1504 no_name:
1505   {
1506     g_warning ("gstrtpbin: invalid name given");
1507     return NULL;
1508   }
1509 create_error:
1510   {
1511     /* create_session already warned */
1512     return NULL;
1513   }
1514 existed:
1515   {
1516     g_warning ("gstrtpbin: recv_rtp pad already requested for session %d",
1517         sessid);
1518     return NULL;
1519   }
1520 pad_failed:
1521   {
1522     g_warning ("gstrtpbin: failed to get session pad");
1523     return NULL;
1524   }
1525 link_failed:
1526   {
1527     g_warning ("gstrtpbin: failed to link pads");
1528     return NULL;
1529   }
1530 }
1531
1532 /* Create a pad for receiving RTCP for the session in @name. Must be called with
1533  * RTP_BIN_LOCK.
1534  */
1535 static GstPad *
1536 create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
1537     const gchar * name)
1538 {
1539   GstPad *result;
1540   guint sessid;
1541   GstRtpBinSession *session;
1542   GstPad *sinkdpad;
1543   GstPadLinkReturn lres;
1544
1545   /* first get the session number */
1546   if (name == NULL || sscanf (name, "recv_rtcp_sink_%d", &sessid) != 1)
1547     goto no_name;
1548
1549   GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
1550
1551   /* get or create the session */
1552   session = find_session_by_id (rtpbin, sessid);
1553   if (!session) {
1554     GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
1555     /* create session now */
1556     session = create_session (rtpbin, sessid);
1557     if (session == NULL)
1558       goto create_error;
1559   }
1560
1561   /* check if pad was requested */
1562   if (session->recv_rtcp_sink != NULL)
1563     goto existed;
1564
1565   /* get recv_rtp pad and store */
1566   GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
1567   session->recv_rtcp_sink =
1568       gst_element_get_request_pad (session->session, "recv_rtcp_sink");
1569   if (session->recv_rtcp_sink == NULL)
1570     goto pad_failed;
1571
1572   /* get srcpad, link to SSRCDemux */
1573   GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
1574   session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
1575   if (session->sync_src == NULL)
1576     goto pad_failed;
1577
1578   GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
1579   sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
1580   lres = gst_pad_link (session->sync_src, sinkdpad);
1581   gst_object_unref (sinkdpad);
1582   if (lres != GST_PAD_LINK_OK)
1583     goto link_failed;
1584
1585   result =
1586       gst_ghost_pad_new_from_template (name, session->recv_rtcp_sink, templ);
1587   gst_pad_set_active (result, TRUE);
1588   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
1589
1590   return result;
1591
1592   /* ERRORS */
1593 no_name:
1594   {
1595     g_warning ("gstrtpbin: invalid name given");
1596     return NULL;
1597   }
1598 create_error:
1599   {
1600     /* create_session already warned */
1601     return NULL;
1602   }
1603 existed:
1604   {
1605     g_warning ("gstrtpbin: recv_rtcp pad already requested for session %d",
1606         sessid);
1607     return NULL;
1608   }
1609 pad_failed:
1610   {
1611     g_warning ("gstrtpbin: failed to get session pad");
1612     return NULL;
1613   }
1614 link_failed:
1615   {
1616     g_warning ("gstrtpbin: failed to link pads");
1617     return NULL;
1618   }
1619 }
1620
1621 /* Create a pad for sending RTP for the session in @name. Must be called with
1622  * RTP_BIN_LOCK.
1623  */
1624 static GstPad *
1625 create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
1626 {
1627   GstPad *result, *srcghost;
1628   gchar *gname;
1629   guint sessid;
1630   GstRtpBinSession *session;
1631   GstElementClass *klass;
1632
1633   /* first get the session number */
1634   if (name == NULL || sscanf (name, "send_rtp_sink_%d", &sessid) != 1)
1635     goto no_name;
1636
1637   /* get or create session */
1638   session = find_session_by_id (rtpbin, sessid);
1639   if (!session) {
1640     /* create session now */
1641     session = create_session (rtpbin, sessid);
1642     if (session == NULL)
1643       goto create_error;
1644   }
1645
1646   /* check if pad was requested */
1647   if (session->send_rtp_sink != NULL)
1648     goto existed;
1649
1650   /* get send_rtp pad and store */
1651   session->send_rtp_sink =
1652       gst_element_get_request_pad (session->session, "send_rtp_sink");
1653   if (session->send_rtp_sink == NULL)
1654     goto pad_failed;
1655
1656   result =
1657       gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ);
1658   gst_pad_set_active (result, TRUE);
1659   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
1660
1661   /* get srcpad */
1662   session->send_rtp_src =
1663       gst_element_get_static_pad (session->session, "send_rtp_src");
1664   if (session->send_rtp_src == NULL)
1665     goto no_srcpad;
1666
1667   /* ghost the new source pad */
1668   klass = GST_ELEMENT_GET_CLASS (rtpbin);
1669   gname = g_strdup_printf ("send_rtp_src_%d", sessid);
1670   templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%d");
1671   srcghost =
1672       gst_ghost_pad_new_from_template (gname, session->send_rtp_src, templ);
1673   gst_pad_set_active (srcghost, TRUE);
1674   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), srcghost);
1675   g_free (gname);
1676
1677   return result;
1678
1679   /* ERRORS */
1680 no_name:
1681   {
1682     g_warning ("gstrtpbin: invalid name given");
1683     return NULL;
1684   }
1685 create_error:
1686   {
1687     /* create_session already warned */
1688     return NULL;
1689   }
1690 existed:
1691   {
1692     g_warning ("gstrtpbin: send_rtp pad already requested for session %d",
1693         sessid);
1694     return NULL;
1695   }
1696 pad_failed:
1697   {
1698     g_warning ("gstrtpbin: failed to get session pad for session %d", sessid);
1699     return NULL;
1700   }
1701 no_srcpad:
1702   {
1703     g_warning ("gstrtpbin: failed to get rtp source pad for session %d",
1704         sessid);
1705     return NULL;
1706   }
1707 }
1708
1709 /* Create a pad for sending RTCP for the session in @name. Must be called with
1710  * RTP_BIN_LOCK.
1711  */
1712 static GstPad *
1713 create_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
1714 {
1715   GstPad *result;
1716   guint sessid;
1717   GstRtpBinSession *session;
1718
1719   /* first get the session number */
1720   if (name == NULL || sscanf (name, "send_rtcp_src_%d", &sessid) != 1)
1721     goto no_name;
1722
1723   /* get or create session */
1724   session = find_session_by_id (rtpbin, sessid);
1725   if (!session)
1726     goto no_session;
1727
1728   /* check if pad was requested */
1729   if (session->send_rtcp_src != NULL)
1730     goto existed;
1731
1732   /* get rtcp_src pad and store */
1733   session->send_rtcp_src =
1734       gst_element_get_request_pad (session->session, "send_rtcp_src");
1735   if (session->send_rtcp_src == NULL)
1736     goto pad_failed;
1737
1738   result =
1739       gst_ghost_pad_new_from_template (name, session->send_rtcp_src, templ);
1740   gst_pad_set_active (result, TRUE);
1741   gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
1742
1743   return result;
1744
1745   /* ERRORS */
1746 no_name:
1747   {
1748     g_warning ("gstrtpbin: invalid name given");
1749     return NULL;
1750   }
1751 no_session:
1752   {
1753     g_warning ("gstrtpbin: session with id %d does not exist", sessid);
1754     return NULL;
1755   }
1756 existed:
1757   {
1758     g_warning ("gstrtpbin: send_rtcp_src pad already requested for session %d",
1759         sessid);
1760     return NULL;
1761   }
1762 pad_failed:
1763   {
1764     g_warning ("gstrtpbin: failed to get rtcp pad for session %d", sessid);
1765     return NULL;
1766   }
1767 }
1768
1769 /* 
1770  */
1771 static GstPad *
1772 gst_rtp_bin_request_new_pad (GstElement * element,
1773     GstPadTemplate * templ, const gchar * name)
1774 {
1775   GstRtpBin *rtpbin;
1776   GstElementClass *klass;
1777   GstPad *result;
1778
1779   g_return_val_if_fail (templ != NULL, NULL);
1780   g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
1781
1782   rtpbin = GST_RTP_BIN (element);
1783   klass = GST_ELEMENT_GET_CLASS (element);
1784
1785   GST_RTP_BIN_LOCK (rtpbin);
1786
1787   /* figure out the template */
1788   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%d")) {
1789     result = create_recv_rtp (rtpbin, templ, name);
1790   } else if (templ == gst_element_class_get_pad_template (klass,
1791           "recv_rtcp_sink_%d")) {
1792     result = create_recv_rtcp (rtpbin, templ, name);
1793   } else if (templ == gst_element_class_get_pad_template (klass,
1794           "send_rtp_sink_%d")) {
1795     result = create_send_rtp (rtpbin, templ, name);
1796   } else if (templ == gst_element_class_get_pad_template (klass,
1797           "send_rtcp_src_%d")) {
1798     result = create_rtcp (rtpbin, templ, name);
1799   } else
1800     goto wrong_template;
1801
1802   GST_RTP_BIN_UNLOCK (rtpbin);
1803
1804   return result;
1805
1806   /* ERRORS */
1807 wrong_template:
1808   {
1809     GST_RTP_BIN_UNLOCK (rtpbin);
1810     g_warning ("gstrtpbin: this is not our template");
1811     return NULL;
1812   }
1813 }
1814
1815 static void
1816 gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
1817 {
1818 }