Merging gst-plugins-ugly
[platform/upstream/gstreamer.git] / gst / realmedia / rdtmanager.c
1 /* GStreamer
2  * Copyright (C) <2005,2006> Wim Taymans <wim@fluendo.com>
3  *               <2013> Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 /*
21  * Unless otherwise indicated, Source Code is licensed under MIT license.
22  * See further explanation attached in License Statement (distributed in the file
23  * LICENSE).
24  *
25  * Permission is hereby granted, free of charge, to any person obtaining a copy of
26  * this software and associated documentation files (the "Software"), to deal in
27  * the Software without restriction, including without limitation the rights to
28  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
29  * of the Software, and to permit persons to whom the Software is furnished to do
30  * so, subject to the following conditions:
31  *
32  * The above copyright notice and this permission notice shall be included in all
33  * copies or substantial portions of the Software.
34  *
35  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
36  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
37  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
38  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
39  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
40  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
41  * SOFTWARE.
42  */
43 /* Element-Checklist-Version: 5 */
44
45 /**
46  * SECTION:element-rdtmanager
47  * @title: rdtmanager
48  * @see_also: GstRtspSrc
49  *
50  * A simple RTP session manager used internally by rtspsrc.
51  */
52
53 /* #define HAVE_RTCP */
54
55 #include "gstrdtbuffer.h"
56 #include "rdtmanager.h"
57 #include "rdtjitterbuffer.h"
58
59 #include <gst/glib-compat-private.h>
60
61 #include <stdio.h>
62
63 GST_DEBUG_CATEGORY_STATIC (rdtmanager_debug);
64 #define GST_CAT_DEFAULT (rdtmanager_debug)
65
66 /* GstRDTManager signals and args */
67 enum
68 {
69   SIGNAL_REQUEST_PT_MAP,
70   SIGNAL_CLEAR_PT_MAP,
71
72   SIGNAL_ON_NEW_SSRC,
73   SIGNAL_ON_SSRC_COLLISION,
74   SIGNAL_ON_SSRC_VALIDATED,
75   SIGNAL_ON_SSRC_ACTIVE,
76   SIGNAL_ON_SSRC_SDES,
77   SIGNAL_ON_BYE_SSRC,
78   SIGNAL_ON_BYE_TIMEOUT,
79   SIGNAL_ON_TIMEOUT,
80   SIGNAL_ON_NPT_STOP,
81   LAST_SIGNAL
82 };
83
84 #define DEFAULT_LATENCY_MS      200
85
86 enum
87 {
88   PROP_0,
89   PROP_LATENCY
90 };
91
92 static GstStaticPadTemplate gst_rdt_manager_recv_rtp_sink_template =
93 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%u",
94     GST_PAD_SINK,
95     GST_PAD_REQUEST,
96     GST_STATIC_CAPS ("application/x-rdt")
97     );
98
99 static GstStaticPadTemplate gst_rdt_manager_recv_rtcp_sink_template =
100 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u",
101     GST_PAD_SINK,
102     GST_PAD_REQUEST,
103     GST_STATIC_CAPS ("application/x-rtcp")
104     );
105
106 static GstStaticPadTemplate gst_rdt_manager_recv_rtp_src_template =
107 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%u_%u_%u",
108     GST_PAD_SRC,
109     GST_PAD_SOMETIMES,
110     GST_STATIC_CAPS ("application/x-rdt")
111     );
112
113 static GstStaticPadTemplate gst_rdt_manager_rtcp_src_template =
114 GST_STATIC_PAD_TEMPLATE ("rtcp_src_%u",
115     GST_PAD_SRC,
116     GST_PAD_REQUEST,
117     GST_STATIC_CAPS ("application/x-rtcp")
118     );
119
120 static void gst_rdt_manager_finalize (GObject * object);
121 static void gst_rdt_manager_set_property (GObject * object,
122     guint prop_id, const GValue * value, GParamSpec * pspec);
123 static void gst_rdt_manager_get_property (GObject * object,
124     guint prop_id, GValue * value, GParamSpec * pspec);
125
126 static gboolean gst_rdt_manager_query_src (GstPad * pad, GstObject * parent,
127     GstQuery * query);
128 static gboolean gst_rdt_manager_src_activate_mode (GstPad * pad,
129     GstObject * parent, GstPadMode mode, gboolean active);
130
131 static GstClock *gst_rdt_manager_provide_clock (GstElement * element);
132 static GstStateChangeReturn gst_rdt_manager_change_state (GstElement * element,
133     GstStateChange transition);
134 static GstPad *gst_rdt_manager_request_new_pad (GstElement * element,
135     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
136 static void gst_rdt_manager_release_pad (GstElement * element, GstPad * pad);
137
138 static gboolean gst_rdt_manager_parse_caps (GstRDTManager * rdtmanager,
139     GstRDTManagerSession * session, GstCaps * caps);
140 static gboolean gst_rdt_manager_event_rdt (GstPad * pad, GstObject * parent,
141     GstEvent * event);
142
143 static GstFlowReturn gst_rdt_manager_chain_rdt (GstPad * pad,
144     GstObject * parent, GstBuffer * buffer);
145 static GstFlowReturn gst_rdt_manager_chain_rtcp (GstPad * pad,
146     GstObject * parent, GstBuffer * buffer);
147 static void gst_rdt_manager_loop (GstPad * pad);
148
149 static guint gst_rdt_manager_signals[LAST_SIGNAL] = { 0 };
150
151 #define JBUF_LOCK(sess)   (g_mutex_lock (&(sess)->jbuf_lock))
152
153 #define JBUF_LOCK_CHECK(sess,label) G_STMT_START {    \
154   JBUF_LOCK (sess);                                   \
155   if (sess->srcresult != GST_FLOW_OK)                 \
156     goto label;                                       \
157 } G_STMT_END
158
159 #define JBUF_UNLOCK(sess) (g_mutex_unlock (&(sess)->jbuf_lock))
160 #define JBUF_WAIT(sess)   (g_cond_wait (&(sess)->jbuf_cond, &(sess)->jbuf_lock))
161
162 #define JBUF_WAIT_CHECK(sess,label) G_STMT_START {    \
163   JBUF_WAIT(sess);                                    \
164   if (sess->srcresult != GST_FLOW_OK)                 \
165     goto label;                                       \
166 } G_STMT_END
167
168 #define JBUF_SIGNAL(sess) (g_cond_signal (&(sess)->jbuf_cond))
169
170 /* Manages the receiving end of the packets.
171  *
172  * There is one such structure for each RTP session (audio/video/...).
173  * We get the RTP/RTCP packets and stuff them into the session manager. 
174  */
175 struct _GstRDTManagerSession
176 {
177   /* session id */
178   gint id;
179   /* the parent bin */
180   GstRDTManager *dec;
181
182   gboolean active;
183   /* we only support one ssrc and one pt */
184   guint32 ssrc;
185   guint8 pt;
186   gint clock_rate;
187   GstCaps *caps;
188   gint64 clock_base;
189
190   GstSegment segment;
191
192   /* the last seqnum we pushed out */
193   guint32 last_popped_seqnum;
194   /* the next expected seqnum */
195   guint32 next_seqnum;
196   /* last output time */
197   GstClockTime last_out_time;
198
199   /* the pads of the session */
200   GstPad *recv_rtp_sink;
201   GstPad *recv_rtp_src;
202   GstPad *recv_rtcp_sink;
203   GstPad *rtcp_src;
204
205   GstFlowReturn srcresult;
206   gboolean blocked;
207   gboolean eos;
208   gboolean waiting;
209   gboolean discont;
210   GstClockID clock_id;
211
212   /* jitterbuffer, lock and cond */
213   RDTJitterBuffer *jbuf;
214   GMutex jbuf_lock;
215   GCond jbuf_cond;
216
217   /* some accounting */
218   guint64 num_late;
219   guint64 num_duplicates;
220 };
221
222 /* find a session with the given id */
223 static GstRDTManagerSession *
224 find_session_by_id (GstRDTManager * rdtmanager, gint id)
225 {
226   GSList *walk;
227
228   for (walk = rdtmanager->sessions; walk; walk = g_slist_next (walk)) {
229     GstRDTManagerSession *sess = (GstRDTManagerSession *) walk->data;
230
231     if (sess->id == id)
232       return sess;
233   }
234   return NULL;
235 }
236
237 /* create a session with the given id */
238 static GstRDTManagerSession *
239 create_session (GstRDTManager * rdtmanager, gint id)
240 {
241   GstRDTManagerSession *sess;
242
243   sess = g_new0 (GstRDTManagerSession, 1);
244   sess->id = id;
245   sess->dec = rdtmanager;
246   sess->jbuf = rdt_jitter_buffer_new ();
247   g_mutex_init (&sess->jbuf_lock);
248   g_cond_init (&sess->jbuf_cond);
249   rdtmanager->sessions = g_slist_prepend (rdtmanager->sessions, sess);
250
251   return sess;
252 }
253
254 static gboolean
255 forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
256 {
257   GstPad *srcpad = GST_PAD_CAST (user_data);
258
259   gst_pad_push_event (srcpad, gst_event_ref (*event));
260
261   return TRUE;
262 }
263
264 static gboolean
265 activate_session (GstRDTManager * rdtmanager, GstRDTManagerSession * session,
266     guint32 ssrc, guint8 pt)
267 {
268   GstPadTemplate *templ;
269   GstElementClass *klass;
270   gchar *name;
271   GstCaps *caps;
272   GValue ret = { 0 };
273   GValue args[3] = { {0}
274   , {0}
275   , {0}
276   };
277
278   GST_DEBUG_OBJECT (rdtmanager, "creating stream");
279
280   session->ssrc = ssrc;
281   session->pt = pt;
282
283   /* get pt map */
284   g_value_init (&args[0], GST_TYPE_ELEMENT);
285   g_value_set_object (&args[0], rdtmanager);
286   g_value_init (&args[1], G_TYPE_UINT);
287   g_value_set_uint (&args[1], session->id);
288   g_value_init (&args[2], G_TYPE_UINT);
289   g_value_set_uint (&args[2], pt);
290
291   g_value_init (&ret, GST_TYPE_CAPS);
292   g_value_set_boxed (&ret, NULL);
293
294   g_signal_emitv (args, gst_rdt_manager_signals[SIGNAL_REQUEST_PT_MAP], 0,
295       &ret);
296
297   g_value_unset (&args[0]);
298   g_value_unset (&args[1]);
299   g_value_unset (&args[2]);
300   caps = (GstCaps *) g_value_dup_boxed (&ret);
301   g_value_unset (&ret);
302
303   if (caps)
304     gst_rdt_manager_parse_caps (rdtmanager, session, caps);
305
306   name = g_strdup_printf ("recv_rtp_src_%u_%u_%u", session->id, ssrc, pt);
307   klass = GST_ELEMENT_GET_CLASS (rdtmanager);
308   templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%u_%u_%u");
309   session->recv_rtp_src = gst_pad_new_from_template (templ, name);
310   g_free (name);
311
312   gst_pad_set_element_private (session->recv_rtp_src, session);
313   gst_pad_set_query_function (session->recv_rtp_src, gst_rdt_manager_query_src);
314   gst_pad_set_activatemode_function (session->recv_rtp_src,
315       gst_rdt_manager_src_activate_mode);
316
317   gst_pad_set_active (session->recv_rtp_src, TRUE);
318
319   gst_pad_sticky_events_foreach (session->recv_rtp_sink, forward_sticky_events,
320       session->recv_rtp_src);
321   gst_pad_set_caps (session->recv_rtp_src, caps);
322   gst_caps_unref (caps);
323
324   gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_src);
325
326   return TRUE;
327 }
328
329 static void
330 free_session (GstRDTManagerSession * session)
331 {
332   g_object_unref (session->jbuf);
333   g_cond_clear (&session->jbuf_cond);
334   g_mutex_clear (&session->jbuf_lock);
335   g_free (session);
336 }
337
338 #define gst_rdt_manager_parent_class parent_class
339 G_DEFINE_TYPE (GstRDTManager, gst_rdt_manager, GST_TYPE_ELEMENT);
340 GST_ELEMENT_REGISTER_DEFINE (rdtmanager, "rdtmanager",
341     GST_RANK_NONE, GST_TYPE_RDT_MANAGER);
342
343 /* BOXED:UINT,UINT */
344 #define g_marshal_value_peek_uint(v)     g_value_get_uint (v)
345
346 static void
347 gst_rdt_manager_marshal_BOXED__UINT_UINT (GClosure * closure,
348     GValue * return_value,
349     guint n_param_values,
350     const GValue * param_values,
351     gpointer invocation_hint, gpointer marshal_data)
352 {
353   typedef gpointer (*GMarshalFunc_BOXED__UINT_UINT) (gpointer data1,
354       guint arg_1, guint arg_2, gpointer data2);
355   register GMarshalFunc_BOXED__UINT_UINT callback;
356   register GCClosure *cc = (GCClosure *) closure;
357   register gpointer data1, data2;
358   gpointer v_return;
359
360   g_return_if_fail (return_value != NULL);
361   g_return_if_fail (n_param_values == 3);
362
363   if (G_CCLOSURE_SWAP_DATA (closure)) {
364     data1 = closure->data;
365     data2 = g_value_peek_pointer (param_values + 0);
366   } else {
367     data1 = g_value_peek_pointer (param_values + 0);
368     data2 = closure->data;
369   }
370   callback =
371       (GMarshalFunc_BOXED__UINT_UINT) (marshal_data ? marshal_data :
372       cc->callback);
373
374   v_return = callback (data1,
375       g_marshal_value_peek_uint (param_values + 1),
376       g_marshal_value_peek_uint (param_values + 2), data2);
377
378   g_value_take_boxed (return_value, v_return);
379 }
380
381 static void
382 gst_rdt_manager_marshal_VOID__UINT_UINT (GClosure * closure,
383     GValue * return_value,
384     guint n_param_values,
385     const GValue * param_values,
386     gpointer invocation_hint, gpointer marshal_data)
387 {
388   typedef void (*GMarshalFunc_VOID__UINT_UINT) (gpointer data1,
389       guint arg_1, guint arg_2, gpointer data2);
390   register GMarshalFunc_VOID__UINT_UINT callback;
391   register GCClosure *cc = (GCClosure *) closure;
392   register gpointer data1, data2;
393
394   g_return_if_fail (n_param_values == 3);
395
396   if (G_CCLOSURE_SWAP_DATA (closure)) {
397     data1 = closure->data;
398     data2 = g_value_peek_pointer (param_values + 0);
399   } else {
400     data1 = g_value_peek_pointer (param_values + 0);
401     data2 = closure->data;
402   }
403   callback =
404       (GMarshalFunc_VOID__UINT_UINT) (marshal_data ? marshal_data :
405       cc->callback);
406
407   callback (data1,
408       g_marshal_value_peek_uint (param_values + 1),
409       g_marshal_value_peek_uint (param_values + 2), data2);
410 }
411
412 static void
413 gst_rdt_manager_class_init (GstRDTManagerClass * g_class)
414 {
415   GObjectClass *gobject_class;
416   GstElementClass *gstelement_class;
417   GstRDTManagerClass *klass;
418
419   klass = (GstRDTManagerClass *) g_class;
420   gobject_class = (GObjectClass *) klass;
421   gstelement_class = (GstElementClass *) klass;
422
423   gobject_class->finalize = gst_rdt_manager_finalize;
424   gobject_class->set_property = gst_rdt_manager_set_property;
425   gobject_class->get_property = gst_rdt_manager_get_property;
426
427   g_object_class_install_property (gobject_class, PROP_LATENCY,
428       g_param_spec_uint ("latency", "Buffer latency in ms",
429           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
430           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
431
432   /**
433    * GstRDTManager::request-pt-map:
434    * @rdtmanager: the object which received the signal
435    * @session: the session
436    * @pt: the pt
437    *
438    * Request the payload type as #GstCaps for @pt in @session.
439    */
440   gst_rdt_manager_signals[SIGNAL_REQUEST_PT_MAP] =
441       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
442       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, request_pt_map),
443       NULL, NULL, gst_rdt_manager_marshal_BOXED__UINT_UINT, GST_TYPE_CAPS, 2,
444       G_TYPE_UINT, G_TYPE_UINT);
445
446   /**
447    * GstRDTManager::clear-pt-map:
448    * @rtpbin: the object which received the signal
449    *
450    * Clear all previously cached pt-mapping obtained with
451    * GstRDTManager::request-pt-map.
452    */
453   gst_rdt_manager_signals[SIGNAL_CLEAR_PT_MAP] =
454       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
455       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, clear_pt_map),
456       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
457
458   /**
459    * GstRDTManager::on-bye-ssrc:
460    * @rtpbin: the object which received the signal
461    * @session: the session
462    * @ssrc: the SSRC
463    *
464    * Notify of an SSRC that became inactive because of a BYE packet.
465    */
466   gst_rdt_manager_signals[SIGNAL_ON_BYE_SSRC] =
467       g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
468       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, on_bye_ssrc),
469       NULL, NULL, gst_rdt_manager_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
470       G_TYPE_UINT, G_TYPE_UINT);
471   /**
472    * GstRDTManager::on-bye-timeout:
473    * @rtpbin: the object which received the signal
474    * @session: the session
475    * @ssrc: the SSRC
476    *
477    * Notify of an SSRC that has timed out because of BYE
478    */
479   gst_rdt_manager_signals[SIGNAL_ON_BYE_TIMEOUT] =
480       g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
481       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, on_bye_timeout),
482       NULL, NULL, gst_rdt_manager_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
483       G_TYPE_UINT, G_TYPE_UINT);
484   /**
485    * GstRDTManager::on-timeout:
486    * @rtpbin: the object which received the signal
487    * @session: the session
488    * @ssrc: the SSRC
489    *
490    * Notify of an SSRC that has timed out
491    */
492   gst_rdt_manager_signals[SIGNAL_ON_TIMEOUT] =
493       g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
494       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, on_timeout),
495       NULL, NULL, gst_rdt_manager_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
496       G_TYPE_UINT, G_TYPE_UINT);
497
498   /**
499    * GstRDTManager::on-npt-stop:
500    * @rtpbin: the object which received the signal
501    * @session: the session
502    * @ssrc: the SSRC
503    *
504    * Notify that SSRC sender has sent data up to the configured NPT stop time.
505    */
506   gst_rdt_manager_signals[SIGNAL_ON_NPT_STOP] =
507       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
508       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, on_npt_stop),
509       NULL, NULL, gst_rdt_manager_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
510       G_TYPE_UINT, G_TYPE_UINT);
511
512
513   gstelement_class->provide_clock =
514       GST_DEBUG_FUNCPTR (gst_rdt_manager_provide_clock);
515   gstelement_class->change_state =
516       GST_DEBUG_FUNCPTR (gst_rdt_manager_change_state);
517   gstelement_class->request_new_pad =
518       GST_DEBUG_FUNCPTR (gst_rdt_manager_request_new_pad);
519   gstelement_class->release_pad =
520       GST_DEBUG_FUNCPTR (gst_rdt_manager_release_pad);
521
522   /* sink pads */
523   gst_element_class_add_static_pad_template (gstelement_class,
524       &gst_rdt_manager_recv_rtp_sink_template);
525   gst_element_class_add_static_pad_template (gstelement_class,
526       &gst_rdt_manager_recv_rtcp_sink_template);
527   /* src pads */
528   gst_element_class_add_static_pad_template (gstelement_class,
529       &gst_rdt_manager_recv_rtp_src_template);
530   gst_element_class_add_static_pad_template (gstelement_class,
531       &gst_rdt_manager_rtcp_src_template);
532
533   gst_element_class_set_static_metadata (gstelement_class, "RTP Decoder",
534       "Codec/Parser/Network",
535       "Accepts raw RTP and RTCP packets and sends them forward",
536       "Wim Taymans <wim.taymans@gmail.com>");
537
538   GST_DEBUG_CATEGORY_INIT (rdtmanager_debug, "rdtmanager", 0, "RTP decoder");
539 }
540
541 static void
542 gst_rdt_manager_init (GstRDTManager * rdtmanager)
543 {
544   rdtmanager->provided_clock = gst_system_clock_obtain ();
545   rdtmanager->latency = DEFAULT_LATENCY_MS;
546   GST_OBJECT_FLAG_SET (rdtmanager, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
547 }
548
549 static void
550 gst_rdt_manager_finalize (GObject * object)
551 {
552   GstRDTManager *rdtmanager;
553
554   rdtmanager = GST_RDT_MANAGER (object);
555
556   g_slist_foreach (rdtmanager->sessions, (GFunc) free_session, NULL);
557   g_slist_free (rdtmanager->sessions);
558   g_clear_object (&rdtmanager->provided_clock);
559
560   G_OBJECT_CLASS (parent_class)->finalize (object);
561 }
562
563 static gboolean
564 gst_rdt_manager_query_src (GstPad * pad, GstObject * parent, GstQuery * query)
565 {
566   GstRDTManager *rdtmanager;
567   gboolean res;
568
569   rdtmanager = GST_RDT_MANAGER (parent);
570
571   switch (GST_QUERY_TYPE (query)) {
572     case GST_QUERY_LATENCY:
573     {
574       GstClockTime latency;
575
576       latency = rdtmanager->latency * GST_MSECOND;
577
578       /* we pretend to be live with a 3 second latency */
579       gst_query_set_latency (query, TRUE, latency, -1);
580
581       GST_DEBUG_OBJECT (rdtmanager, "reporting %" GST_TIME_FORMAT " of latency",
582           GST_TIME_ARGS (latency));
583       res = TRUE;
584       break;
585     }
586     default:
587       res = gst_pad_query_default (pad, parent, query);
588       break;
589   }
590   return res;
591 }
592
593 static gboolean
594 gst_rdt_manager_src_activate_mode (GstPad * pad, GstObject * parent,
595     GstPadMode mode, gboolean active)
596 {
597   gboolean result;
598   GstRDTManager *rdtmanager;
599   GstRDTManagerSession *session;
600
601   session = gst_pad_get_element_private (pad);
602   rdtmanager = session->dec;
603
604   switch (mode) {
605     case GST_PAD_MODE_PUSH:
606       if (active) {
607         /* allow data processing */
608         JBUF_LOCK (session);
609         GST_DEBUG_OBJECT (rdtmanager, "Enabling pop on queue");
610         /* Mark as non flushing */
611         session->srcresult = GST_FLOW_OK;
612         gst_segment_init (&session->segment, GST_FORMAT_TIME);
613         session->last_popped_seqnum = -1;
614         session->last_out_time = -1;
615         session->next_seqnum = -1;
616         session->eos = FALSE;
617         JBUF_UNLOCK (session);
618
619         /* start pushing out buffers */
620         GST_DEBUG_OBJECT (rdtmanager, "Starting task on srcpad");
621         result =
622             gst_pad_start_task (pad, (GstTaskFunction) gst_rdt_manager_loop,
623             pad, NULL);
624       } else {
625         /* make sure all data processing stops ASAP */
626         JBUF_LOCK (session);
627         /* mark ourselves as flushing */
628         session->srcresult = GST_FLOW_FLUSHING;
629         GST_DEBUG_OBJECT (rdtmanager, "Disabling pop on queue");
630         /* this unblocks any waiting pops on the src pad task */
631         JBUF_SIGNAL (session);
632         /* unlock clock, we just unschedule, the entry will be released by
633          * the locking streaming thread. */
634         if (session->clock_id)
635           gst_clock_id_unschedule (session->clock_id);
636         JBUF_UNLOCK (session);
637
638         /* NOTE this will hardlock if the state change is called from the src pad
639          * task thread because we will _join() the thread. */
640         GST_DEBUG_OBJECT (rdtmanager, "Stopping task on srcpad");
641         result = gst_pad_stop_task (pad);
642       }
643       break;
644     default:
645       result = FALSE;
646       break;
647   }
648   return result;
649 }
650
651 static GstFlowReturn
652 gst_rdt_manager_handle_data_packet (GstRDTManagerSession * session,
653     GstClockTime timestamp, GstRDTPacket * packet)
654 {
655   GstRDTManager *rdtmanager;
656   guint16 seqnum;
657   gboolean tail;
658   GstFlowReturn res;
659   GstBuffer *buffer;
660
661   rdtmanager = session->dec;
662
663   res = GST_FLOW_OK;
664
665   seqnum = 0;
666   GST_DEBUG_OBJECT (rdtmanager,
667       "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
668       GST_TIME_ARGS (timestamp));
669
670   buffer = gst_rdt_packet_to_buffer (packet);
671
672   JBUF_LOCK_CHECK (session, out_flushing);
673
674   /* insert the packet into the queue now, FIXME, use seqnum */
675   if (!rdt_jitter_buffer_insert (session->jbuf, buffer, timestamp,
676           session->clock_rate, &tail))
677     goto duplicate;
678
679   /* signal addition of new buffer when the _loop is waiting. */
680   if (session->waiting)
681     JBUF_SIGNAL (session);
682
683 finished:
684   JBUF_UNLOCK (session);
685
686   return res;
687
688   /* ERRORS */
689 out_flushing:
690   {
691     res = session->srcresult;
692     GST_DEBUG_OBJECT (rdtmanager, "flushing %s", gst_flow_get_name (res));
693     gst_buffer_unref (buffer);
694     goto finished;
695   }
696 duplicate:
697   {
698     GST_WARNING_OBJECT (rdtmanager, "Duplicate packet #%d detected, dropping",
699         seqnum);
700     session->num_duplicates++;
701     gst_buffer_unref (buffer);
702     goto finished;
703   }
704 }
705
706 static gboolean
707 gst_rdt_manager_parse_caps (GstRDTManager * rdtmanager,
708     GstRDTManagerSession * session, GstCaps * caps)
709 {
710   GstStructure *caps_struct;
711   guint val;
712
713   /* first parse the caps */
714   caps_struct = gst_caps_get_structure (caps, 0);
715
716   GST_DEBUG_OBJECT (rdtmanager, "got caps");
717
718   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
719    * measure the amount of data in the buffer */
720   if (!gst_structure_get_int (caps_struct, "clock-rate", &session->clock_rate))
721     session->clock_rate = 1000;
722
723   if (session->clock_rate <= 0)
724     goto wrong_rate;
725
726   GST_DEBUG_OBJECT (rdtmanager, "got clock-rate %d", session->clock_rate);
727
728   /* gah, clock-base is uint. If we don't have a base, we will use the first
729    * buffer timestamp as the base time. This will screw up sync but it's better
730    * than nothing. */
731   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
732     session->clock_base = val;
733   else
734     session->clock_base = -1;
735
736   GST_DEBUG_OBJECT (rdtmanager, "got clock-base %" G_GINT64_FORMAT,
737       session->clock_base);
738
739   /* first expected seqnum */
740   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val))
741     session->next_seqnum = val;
742   else
743     session->next_seqnum = -1;
744
745   GST_DEBUG_OBJECT (rdtmanager, "got seqnum-base %d", session->next_seqnum);
746
747   return TRUE;
748
749   /* ERRORS */
750 wrong_rate:
751   {
752     GST_DEBUG_OBJECT (rdtmanager, "Invalid clock-rate %d", session->clock_rate);
753     return FALSE;
754   }
755 }
756
757 static gboolean
758 gst_rdt_manager_event_rdt (GstPad * pad, GstObject * parent, GstEvent * event)
759 {
760   GstRDTManager *rdtmanager;
761   GstRDTManagerSession *session;
762   gboolean res;
763
764   rdtmanager = GST_RDT_MANAGER (parent);
765   /* find session */
766   session = gst_pad_get_element_private (pad);
767
768   switch (GST_EVENT_TYPE (event)) {
769     case GST_EVENT_CAPS:
770     {
771       GstCaps *caps;
772
773       gst_event_parse_caps (event, &caps);
774       res = gst_rdt_manager_parse_caps (rdtmanager, session, caps);
775       gst_event_unref (event);
776       break;
777     }
778     default:
779       res = gst_pad_event_default (pad, parent, event);
780       break;
781   }
782   return res;
783 }
784
785 static GstFlowReturn
786 gst_rdt_manager_chain_rdt (GstPad * pad, GstObject * parent, GstBuffer * buffer)
787 {
788   GstFlowReturn res;
789   GstRDTManager *rdtmanager;
790   GstRDTManagerSession *session;
791   GstClockTime timestamp;
792   GstRDTPacket packet;
793   guint32 ssrc;
794   guint8 pt;
795   gboolean more;
796
797   rdtmanager = GST_RDT_MANAGER (parent);
798
799   GST_DEBUG_OBJECT (rdtmanager, "got RDT packet");
800
801   ssrc = 0;
802   pt = 0;
803
804   GST_DEBUG_OBJECT (rdtmanager, "SSRC %08x, PT %d", ssrc, pt);
805
806   /* find session */
807   session = gst_pad_get_element_private (pad);
808
809   /* see if we have the pad */
810   if (!session->active) {
811     activate_session (rdtmanager, session, ssrc, pt);
812     session->active = TRUE;
813   }
814
815   if (GST_BUFFER_IS_DISCONT (buffer)) {
816     GST_DEBUG_OBJECT (rdtmanager, "received discont");
817     session->discont = TRUE;
818   }
819
820   res = GST_FLOW_OK;
821
822   /* take the timestamp of the buffer. This is the time when the packet was
823    * received and is used to calculate jitter and clock skew. We will adjust
824    * this timestamp with the smoothed value after processing it in the
825    * jitterbuffer. */
826   timestamp = GST_BUFFER_TIMESTAMP (buffer);
827   /* bring to running time */
828   timestamp = gst_segment_to_running_time (&session->segment, GST_FORMAT_TIME,
829       timestamp);
830
831   more = gst_rdt_buffer_get_first_packet (buffer, &packet);
832   while (more) {
833     GstRDTType type;
834
835     type = gst_rdt_packet_get_type (&packet);
836     GST_DEBUG_OBJECT (rdtmanager, "Have packet of type %04x", type);
837
838     if (GST_RDT_IS_DATA_TYPE (type)) {
839       GST_DEBUG_OBJECT (rdtmanager, "We have a data packet");
840       res = gst_rdt_manager_handle_data_packet (session, timestamp, &packet);
841     } else {
842       switch (type) {
843         default:
844           GST_DEBUG_OBJECT (rdtmanager, "Ignoring packet");
845           break;
846       }
847     }
848     if (res != GST_FLOW_OK)
849       break;
850
851     more = gst_rdt_packet_move_to_next (&packet);
852   }
853
854   gst_buffer_unref (buffer);
855
856   return res;
857 }
858
859 /* push packets from the queue to the downstream demuxer */
860 static void
861 gst_rdt_manager_loop (GstPad * pad)
862 {
863   GstRDTManager *rdtmanager;
864   GstRDTManagerSession *session;
865   GstBuffer *buffer;
866   GstFlowReturn result;
867
868   rdtmanager = GST_RDT_MANAGER (GST_PAD_PARENT (pad));
869
870   session = gst_pad_get_element_private (pad);
871
872   JBUF_LOCK_CHECK (session, flushing);
873   GST_DEBUG_OBJECT (rdtmanager, "Peeking item");
874   while (TRUE) {
875     /* always wait if we are blocked */
876     if (!session->blocked) {
877       /* if we have a packet, we can exit the loop and grab it */
878       if (rdt_jitter_buffer_num_packets (session->jbuf) > 0)
879         break;
880       /* no packets but we are EOS, do eos logic */
881       if (session->eos)
882         goto do_eos;
883     }
884     /* underrun, wait for packets or flushing now */
885     session->waiting = TRUE;
886     JBUF_WAIT_CHECK (session, flushing);
887     session->waiting = FALSE;
888   }
889
890   buffer = rdt_jitter_buffer_pop (session->jbuf);
891
892   GST_DEBUG_OBJECT (rdtmanager, "Got item %p", buffer);
893
894   if (session->discont) {
895     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
896     session->discont = FALSE;
897   }
898
899   JBUF_UNLOCK (session);
900
901   result = gst_pad_push (session->recv_rtp_src, buffer);
902   if (result != GST_FLOW_OK)
903     goto pause;
904
905   return;
906
907   /* ERRORS */
908 flushing:
909   {
910     GST_DEBUG_OBJECT (rdtmanager, "we are flushing");
911     gst_pad_pause_task (session->recv_rtp_src);
912     JBUF_UNLOCK (session);
913     return;
914   }
915 do_eos:
916   {
917     /* store result, we are flushing now */
918     GST_DEBUG_OBJECT (rdtmanager, "We are EOS, pushing EOS downstream");
919     session->srcresult = GST_FLOW_EOS;
920     gst_pad_pause_task (session->recv_rtp_src);
921     gst_pad_push_event (session->recv_rtp_src, gst_event_new_eos ());
922     JBUF_UNLOCK (session);
923     return;
924   }
925 pause:
926   {
927     GST_DEBUG_OBJECT (rdtmanager, "pausing task, reason %s",
928         gst_flow_get_name (result));
929
930     JBUF_LOCK (session);
931     /* store result */
932     session->srcresult = result;
933     /* we don't post errors or anything because upstream will do that for us
934      * when we pass the return value upstream. */
935     gst_pad_pause_task (session->recv_rtp_src);
936     JBUF_UNLOCK (session);
937     return;
938   }
939 }
940
941 static GstFlowReturn
942 gst_rdt_manager_chain_rtcp (GstPad * pad, GstObject * parent,
943     GstBuffer * buffer)
944 {
945   GstRDTManager *src;
946
947 #ifdef HAVE_RTCP
948   gboolean valid;
949   GstRTCPPacket packet;
950   gboolean more;
951 #endif
952
953   src = GST_RDT_MANAGER (parent);
954
955   GST_DEBUG_OBJECT (src, "got rtcp packet");
956
957 #ifdef HAVE_RTCP
958   valid = gst_rtcp_buffer_validate (buffer);
959   if (!valid)
960     goto bad_packet;
961
962   /* position on first packet */
963   more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
964   while (more) {
965     switch (gst_rtcp_packet_get_type (&packet)) {
966       case GST_RTCP_TYPE_SR:
967       {
968         guint32 ssrc, rtptime, packet_count, octet_count;
969         guint64 ntptime;
970         guint count, i;
971
972         gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, &rtptime,
973             &packet_count, &octet_count);
974
975         GST_DEBUG_OBJECT (src,
976             "got SR packet: SSRC %08x, NTP %" G_GUINT64_FORMAT
977             ", RTP %u, PC %u, OC %u", ssrc, ntptime, rtptime, packet_count,
978             octet_count);
979
980         count = gst_rtcp_packet_get_rb_count (&packet);
981         for (i = 0; i < count; i++) {
982           guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
983           guint8 fractionlost;
984           gint32 packetslost;
985
986           gst_rtcp_packet_get_rb (&packet, i, &ssrc, &fractionlost,
987               &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
988
989           GST_DEBUG_OBJECT (src, "got RB packet %d: SSRC %08x, FL %u"
990               ", PL %u, HS %u, JITTER %u, LSR %u, DLSR %u", ssrc, fractionlost,
991               packetslost, exthighestseq, jitter, lsr, dlsr);
992         }
993         break;
994       }
995       case GST_RTCP_TYPE_RR:
996       {
997         guint32 ssrc;
998         guint count, i;
999
1000         ssrc = gst_rtcp_packet_rr_get_ssrc (&packet);
1001
1002         GST_DEBUG_OBJECT (src, "got RR packet: SSRC %08x", ssrc);
1003
1004         count = gst_rtcp_packet_get_rb_count (&packet);
1005         for (i = 0; i < count; i++) {
1006           guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
1007           guint8 fractionlost;
1008           gint32 packetslost;
1009
1010           gst_rtcp_packet_get_rb (&packet, i, &ssrc, &fractionlost,
1011               &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
1012
1013           GST_DEBUG_OBJECT (src, "got RB packet %d: SSRC %08x, FL %u"
1014               ", PL %u, HS %u, JITTER %u, LSR %u, DLSR %u", ssrc, fractionlost,
1015               packetslost, exthighestseq, jitter, lsr, dlsr);
1016         }
1017         break;
1018       }
1019       case GST_RTCP_TYPE_SDES:
1020       {
1021         guint chunks, i, j;
1022         gboolean more_chunks, more_items;
1023
1024         chunks = gst_rtcp_packet_sdes_get_chunk_count (&packet);
1025         GST_DEBUG_OBJECT (src, "got SDES packet with %d chunks", chunks);
1026
1027         more_chunks = gst_rtcp_packet_sdes_first_chunk (&packet);
1028         i = 0;
1029         while (more_chunks) {
1030           guint32 ssrc;
1031
1032           ssrc = gst_rtcp_packet_sdes_get_ssrc (&packet);
1033
1034           GST_DEBUG_OBJECT (src, "chunk %d, SSRC %08x", i, ssrc);
1035
1036           more_items = gst_rtcp_packet_sdes_first_item (&packet);
1037           j = 0;
1038           while (more_items) {
1039             GstRTCPSDESType type;
1040             guint8 len;
1041             gchar *data;
1042
1043             gst_rtcp_packet_sdes_get_item (&packet, &type, &len, &data);
1044
1045             GST_DEBUG_OBJECT (src, "item %d, type %d, len %d, data %s", j,
1046                 type, len, data);
1047
1048             more_items = gst_rtcp_packet_sdes_next_item (&packet);
1049             j++;
1050           }
1051           more_chunks = gst_rtcp_packet_sdes_next_chunk (&packet);
1052           i++;
1053         }
1054         break;
1055       }
1056       case GST_RTCP_TYPE_BYE:
1057       {
1058         guint count, i;
1059         gchar *reason;
1060
1061         reason = gst_rtcp_packet_bye_get_reason (&packet);
1062         GST_DEBUG_OBJECT (src, "got BYE packet (reason: %s)",
1063             GST_STR_NULL (reason));
1064         g_free (reason);
1065
1066         count = gst_rtcp_packet_bye_get_ssrc_count (&packet);
1067         for (i = 0; i < count; i++) {
1068           guint32 ssrc;
1069
1070
1071           ssrc = gst_rtcp_packet_bye_get_nth_ssrc (&packet, i);
1072
1073           GST_DEBUG_OBJECT (src, "SSRC: %08x", ssrc);
1074         }
1075         break;
1076       }
1077       case GST_RTCP_TYPE_APP:
1078         GST_DEBUG_OBJECT (src, "got APP packet");
1079         break;
1080       default:
1081         GST_WARNING_OBJECT (src, "got unknown RTCP packet");
1082         break;
1083     }
1084     more = gst_rtcp_packet_move_to_next (&packet);
1085   }
1086   gst_buffer_unref (buffer);
1087   return GST_FLOW_OK;
1088
1089 bad_packet:
1090   {
1091     GST_WARNING_OBJECT (src, "got invalid RTCP packet");
1092     return GST_FLOW_OK;
1093   }
1094 #else
1095   return GST_FLOW_OK;
1096 #endif
1097 }
1098
1099 static void
1100 gst_rdt_manager_set_property (GObject * object, guint prop_id,
1101     const GValue * value, GParamSpec * pspec)
1102 {
1103   GstRDTManager *src;
1104
1105   src = GST_RDT_MANAGER (object);
1106
1107   switch (prop_id) {
1108     case PROP_LATENCY:
1109       src->latency = g_value_get_uint (value);
1110       break;
1111     default:
1112       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1113       break;
1114   }
1115 }
1116
1117 static void
1118 gst_rdt_manager_get_property (GObject * object, guint prop_id, GValue * value,
1119     GParamSpec * pspec)
1120 {
1121   GstRDTManager *src;
1122
1123   src = GST_RDT_MANAGER (object);
1124
1125   switch (prop_id) {
1126     case PROP_LATENCY:
1127       g_value_set_uint (value, src->latency);
1128       break;
1129     default:
1130       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1131       break;
1132   }
1133 }
1134
1135 static GstClock *
1136 gst_rdt_manager_provide_clock (GstElement * element)
1137 {
1138   GstRDTManager *rdtmanager;
1139
1140   rdtmanager = GST_RDT_MANAGER (element);
1141
1142   return GST_CLOCK_CAST (gst_object_ref (rdtmanager->provided_clock));
1143 }
1144
1145 static GstStateChangeReturn
1146 gst_rdt_manager_change_state (GstElement * element, GstStateChange transition)
1147 {
1148   GstStateChangeReturn ret;
1149
1150   switch (transition) {
1151     default:
1152       break;
1153   }
1154
1155   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1156
1157   switch (transition) {
1158     case GST_STATE_CHANGE_READY_TO_PAUSED:
1159     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1160       /* we're NO_PREROLL when going to PAUSED */
1161       ret = GST_STATE_CHANGE_NO_PREROLL;
1162       break;
1163     default:
1164       break;
1165   }
1166
1167   return ret;
1168 }
1169
1170 /* Create a pad for receiving RTP for the session in @name
1171  */
1172 static GstPad *
1173 create_recv_rtp (GstRDTManager * rdtmanager, GstPadTemplate * templ,
1174     const gchar * name)
1175 {
1176   guint sessid;
1177   GstRDTManagerSession *session;
1178
1179   /* first get the session number */
1180   if (name == NULL || sscanf (name, "recv_rtp_sink_%u", &sessid) != 1)
1181     goto no_name;
1182
1183   GST_DEBUG_OBJECT (rdtmanager, "finding session %d", sessid);
1184
1185   /* get or create session */
1186   session = find_session_by_id (rdtmanager, sessid);
1187   if (!session) {
1188     GST_DEBUG_OBJECT (rdtmanager, "creating session %d", sessid);
1189     /* create session now */
1190     session = create_session (rdtmanager, sessid);
1191     if (session == NULL)
1192       goto create_error;
1193   }
1194   /* check if pad was requested */
1195   if (session->recv_rtp_sink != NULL)
1196     goto existed;
1197
1198   GST_DEBUG_OBJECT (rdtmanager, "getting RTP sink pad");
1199
1200   session->recv_rtp_sink = gst_pad_new_from_template (templ, name);
1201   gst_pad_set_element_private (session->recv_rtp_sink, session);
1202   gst_pad_set_event_function (session->recv_rtp_sink,
1203       gst_rdt_manager_event_rdt);
1204   gst_pad_set_chain_function (session->recv_rtp_sink,
1205       gst_rdt_manager_chain_rdt);
1206   gst_pad_set_active (session->recv_rtp_sink, TRUE);
1207   gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_sink);
1208
1209   return session->recv_rtp_sink;
1210
1211   /* ERRORS */
1212 no_name:
1213   {
1214     g_warning ("rdtmanager: invalid name given");
1215     return NULL;
1216   }
1217 create_error:
1218   {
1219     /* create_session already warned */
1220     return NULL;
1221   }
1222 existed:
1223   {
1224     g_warning ("rdtmanager: recv_rtp pad already requested for session %d",
1225         sessid);
1226     return NULL;
1227   }
1228 }
1229
1230 /* Create a pad for receiving RTCP for the session in @name
1231  */
1232 static GstPad *
1233 create_recv_rtcp (GstRDTManager * rdtmanager, GstPadTemplate * templ,
1234     const gchar * name)
1235 {
1236   guint sessid;
1237   GstRDTManagerSession *session;
1238
1239   /* first get the session number */
1240   if (name == NULL || sscanf (name, "recv_rtcp_sink_%u", &sessid) != 1)
1241     goto no_name;
1242
1243   GST_DEBUG_OBJECT (rdtmanager, "finding session %d", sessid);
1244
1245   /* get the session, it must exist or we error */
1246   session = find_session_by_id (rdtmanager, sessid);
1247   if (!session)
1248     goto no_session;
1249
1250   /* check if pad was requested */
1251   if (session->recv_rtcp_sink != NULL)
1252     goto existed;
1253
1254   GST_DEBUG_OBJECT (rdtmanager, "getting RTCP sink pad");
1255
1256   session->recv_rtcp_sink = gst_pad_new_from_template (templ, name);
1257   gst_pad_set_element_private (session->recv_rtp_sink, session);
1258   gst_pad_set_chain_function (session->recv_rtcp_sink,
1259       gst_rdt_manager_chain_rtcp);
1260   gst_pad_set_active (session->recv_rtcp_sink, TRUE);
1261   gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtcp_sink);
1262
1263   return session->recv_rtcp_sink;
1264
1265   /* ERRORS */
1266 no_name:
1267   {
1268     g_warning ("rdtmanager: invalid name given");
1269     return NULL;
1270   }
1271 no_session:
1272   {
1273     g_warning ("rdtmanager: no session with id %d", sessid);
1274     return NULL;
1275   }
1276 existed:
1277   {
1278     g_warning ("rdtmanager: recv_rtcp pad already requested for session %d",
1279         sessid);
1280     return NULL;
1281   }
1282 }
1283
1284 /* Create a pad for sending RTCP for the session in @name
1285  */
1286 static GstPad *
1287 create_rtcp (GstRDTManager * rdtmanager, GstPadTemplate * templ,
1288     const gchar * name)
1289 {
1290   guint sessid;
1291   GstRDTManagerSession *session;
1292
1293   /* first get the session number */
1294   if (name == NULL || sscanf (name, "rtcp_src_%u", &sessid) != 1)
1295     goto no_name;
1296
1297   /* get or create session */
1298   session = find_session_by_id (rdtmanager, sessid);
1299   if (!session)
1300     goto no_session;
1301
1302   /* check if pad was requested */
1303   if (session->rtcp_src != NULL)
1304     goto existed;
1305
1306   session->rtcp_src = gst_pad_new_from_template (templ, name);
1307   gst_pad_set_active (session->rtcp_src, TRUE);
1308   gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->rtcp_src);
1309
1310   return session->rtcp_src;
1311
1312   /* ERRORS */
1313 no_name:
1314   {
1315     g_warning ("rdtmanager: invalid name given");
1316     return NULL;
1317   }
1318 no_session:
1319   {
1320     g_warning ("rdtmanager: session with id %d does not exist", sessid);
1321     return NULL;
1322   }
1323 existed:
1324   {
1325     g_warning ("rdtmanager: rtcp_src pad already requested for session %d",
1326         sessid);
1327     return NULL;
1328   }
1329 }
1330
1331 /* 
1332  */
1333 static GstPad *
1334 gst_rdt_manager_request_new_pad (GstElement * element,
1335     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1336 {
1337   GstRDTManager *rdtmanager;
1338   GstElementClass *klass;
1339   GstPad *result;
1340
1341   g_return_val_if_fail (templ != NULL, NULL);
1342   g_return_val_if_fail (GST_IS_RDT_MANAGER (element), NULL);
1343
1344   rdtmanager = GST_RDT_MANAGER (element);
1345   klass = GST_ELEMENT_GET_CLASS (element);
1346
1347   /* figure out the template */
1348   if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%u")) {
1349     result = create_recv_rtp (rdtmanager, templ, name);
1350   } else if (templ == gst_element_class_get_pad_template (klass,
1351           "recv_rtcp_sink_%u")) {
1352     result = create_recv_rtcp (rdtmanager, templ, name);
1353   } else if (templ == gst_element_class_get_pad_template (klass, "rtcp_src_%u")) {
1354     result = create_rtcp (rdtmanager, templ, name);
1355   } else
1356     goto wrong_template;
1357
1358   return result;
1359
1360   /* ERRORS */
1361 wrong_template:
1362   {
1363     g_warning ("rdtmanager: this is not our template");
1364     return NULL;
1365   }
1366 }
1367
1368 static void
1369 gst_rdt_manager_release_pad (GstElement * element, GstPad * pad)
1370 {
1371 }