add parent to query function
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpssrcdemux.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * RTP SSRC demuxer
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:element-gstrtpssrcdemux
24  *
25  * gstrtpssrcdemux acts as a demuxer for RTP packets based on the SSRC of the
26  * packets. Its main purpose is to allow an application to easily receive and
27  * decode an RTP stream with multiple SSRCs.
28  * 
29  * For each SSRC that is detected, a new pad will be created and the
30  * #GstRtpSsrcDemux::new-ssrc-pad signal will be emitted. 
31  * 
32  * <refsect2>
33  * <title>Example pipelines</title>
34  * |[
35  * gst-launch udpsrc caps="application/x-rtp" ! gstrtpssrcdemux ! fakesink
36  * ]| Takes an RTP stream and send the RTP packets with the first detected SSRC
37  * to fakesink, discarding the other SSRCs.
38  * </refsect2>
39  *
40  * Last reviewed on 2007-05-28 (0.10.5)
41  */
42
43 #ifdef HAVE_CONFIG_H
44 #include "config.h"
45 #endif
46
47 #include <string.h>
48 #include <gst/rtp/gstrtpbuffer.h>
49 #include <gst/rtp/gstrtcpbuffer.h>
50
51 #include "gstrtpbin-marshal.h"
52 #include "gstrtpssrcdemux.h"
53
54 GST_DEBUG_CATEGORY_STATIC (gst_rtp_ssrc_demux_debug);
55 #define GST_CAT_DEFAULT gst_rtp_ssrc_demux_debug
56
57 /* generic templates */
58 static GstStaticPadTemplate rtp_ssrc_demux_sink_template =
59 GST_STATIC_PAD_TEMPLATE ("sink",
60     GST_PAD_SINK,
61     GST_PAD_ALWAYS,
62     GST_STATIC_CAPS ("application/x-rtp")
63     );
64
65 static GstStaticPadTemplate rtp_ssrc_demux_rtcp_sink_template =
66 GST_STATIC_PAD_TEMPLATE ("rtcp_sink",
67     GST_PAD_SINK,
68     GST_PAD_ALWAYS,
69     GST_STATIC_CAPS ("application/x-rtcp")
70     );
71
72 static GstStaticPadTemplate rtp_ssrc_demux_src_template =
73 GST_STATIC_PAD_TEMPLATE ("src_%u",
74     GST_PAD_SRC,
75     GST_PAD_SOMETIMES,
76     GST_STATIC_CAPS ("application/x-rtp")
77     );
78
79 static GstStaticPadTemplate rtp_ssrc_demux_rtcp_src_template =
80 GST_STATIC_PAD_TEMPLATE ("rtcp_src_%u",
81     GST_PAD_SRC,
82     GST_PAD_SOMETIMES,
83     GST_STATIC_CAPS ("application/x-rtcp")
84     );
85
86 #define GST_PAD_LOCK(obj)   (g_static_rec_mutex_lock (&(obj)->padlock))
87 #define GST_PAD_UNLOCK(obj) (g_static_rec_mutex_unlock (&(obj)->padlock))
88
89 /* signals */
90 enum
91 {
92   SIGNAL_NEW_SSRC_PAD,
93   SIGNAL_REMOVED_SSRC_PAD,
94   SIGNAL_CLEAR_SSRC,
95   LAST_SIGNAL
96 };
97
98 #define gst_rtp_ssrc_demux_parent_class parent_class
99 G_DEFINE_TYPE (GstRtpSsrcDemux, gst_rtp_ssrc_demux, GST_TYPE_ELEMENT);
100
101 /* GObject vmethods */
102 static void gst_rtp_ssrc_demux_dispose (GObject * object);
103 static void gst_rtp_ssrc_demux_finalize (GObject * object);
104
105 /* GstElement vmethods */
106 static GstStateChangeReturn gst_rtp_ssrc_demux_change_state (GstElement *
107     element, GstStateChange transition);
108
109 static void gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux,
110     guint32 ssrc);
111
112 /* sinkpad stuff */
113 static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf);
114 static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event);
115
116 static GstFlowReturn gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad,
117     GstBuffer * buf);
118 static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad,
119     GstEvent * event);
120 static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad *
121     pad);
122
123 /* srcpad stuff */
124 static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event);
125 static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad *
126     pad);
127 static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
128     GstQuery * query);
129
130 static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 };
131
132 /*
133  * Item for storing GstPad <-> SSRC pairs.
134  */
135 struct _GstRtpSsrcDemuxPad
136 {
137   guint32 ssrc;
138   GstPad *rtp_pad;
139   GstCaps *caps;
140   GstPad *rtcp_pad;
141 };
142
143 /* find a src pad for a given SSRC, returns NULL if the SSRC was not found
144  */
145 static GstRtpSsrcDemuxPad *
146 find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
147 {
148   GSList *walk;
149
150   for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
151     GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
152
153     if (pad->ssrc == ssrc)
154       return pad;
155   }
156   return NULL;
157 }
158
159 /* with PAD_LOCK */
160 static GstRtpSsrcDemuxPad *
161 find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
162 {
163   GstPad *rtp_pad, *rtcp_pad;
164   GstElementClass *klass;
165   GstPadTemplate *templ;
166   gchar *padname;
167   GstRtpSsrcDemuxPad *demuxpad;
168   GstCaps *caps;
169
170   GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
171
172   demuxpad = find_demux_pad_for_ssrc (demux, ssrc);
173   if (demuxpad != NULL) {
174     return demuxpad;
175   }
176
177   klass = GST_ELEMENT_GET_CLASS (demux);
178   templ = gst_element_class_get_pad_template (klass, "src_%u");
179   padname = g_strdup_printf ("src_%u", ssrc);
180   rtp_pad = gst_pad_new_from_template (templ, padname);
181   g_free (padname);
182
183   templ = gst_element_class_get_pad_template (klass, "rtcp_src_%u");
184   padname = g_strdup_printf ("rtcp_src_%u", ssrc);
185   rtcp_pad = gst_pad_new_from_template (templ, padname);
186   g_free (padname);
187
188   /* wrap in structure and add to list */
189   demuxpad = g_new0 (GstRtpSsrcDemuxPad, 1);
190   demuxpad->ssrc = ssrc;
191   demuxpad->rtp_pad = rtp_pad;
192   demuxpad->rtcp_pad = rtcp_pad;
193
194   gst_pad_set_element_private (rtp_pad, demuxpad);
195   gst_pad_set_element_private (rtcp_pad, demuxpad);
196
197   demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
198
199   /* copy caps from input */
200   caps = gst_pad_get_current_caps (demux->rtp_sink);
201   gst_pad_set_caps (rtp_pad, caps);
202   gst_caps_unref (caps);
203   gst_pad_use_fixed_caps (rtp_pad);
204   caps = gst_pad_get_current_caps (demux->rtcp_sink);
205   gst_pad_set_caps (rtcp_pad, caps);
206   gst_caps_unref (caps);
207   gst_pad_use_fixed_caps (rtcp_pad);
208
209   gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
210   gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query);
211   gst_pad_set_iterate_internal_links_function (rtp_pad,
212       gst_rtp_ssrc_demux_iterate_internal_links_src);
213   gst_pad_set_active (rtp_pad, TRUE);
214
215   gst_pad_set_event_function (rtcp_pad, gst_rtp_ssrc_demux_src_event);
216   gst_pad_set_iterate_internal_links_function (rtcp_pad,
217       gst_rtp_ssrc_demux_iterate_internal_links_src);
218   gst_pad_set_active (rtcp_pad, TRUE);
219
220   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
221   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad);
222
223   g_signal_emit (G_OBJECT (demux),
224       gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad);
225
226   return demuxpad;
227 }
228
229 static void
230 gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass)
231 {
232   GObjectClass *gobject_klass;
233   GstElementClass *gstelement_klass;
234   GstRtpSsrcDemuxClass *gstrtpssrcdemux_klass;
235
236   gobject_klass = (GObjectClass *) klass;
237   gstelement_klass = (GstElementClass *) klass;
238   gstrtpssrcdemux_klass = (GstRtpSsrcDemuxClass *) klass;
239
240   gobject_klass->dispose = gst_rtp_ssrc_demux_dispose;
241   gobject_klass->finalize = gst_rtp_ssrc_demux_finalize;
242
243   /**
244    * GstRtpSsrcDemux::new-ssrc-pad:
245    * @demux: the object which received the signal
246    * @ssrc: the SSRC of the pad
247    * @pad: the new pad.
248    *
249    * Emited when a new SSRC pad has been created.
250    */
251   gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD] =
252       g_signal_new ("new-ssrc-pad",
253       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
254       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, new_ssrc_pad),
255       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_OBJECT,
256       G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
257
258   /**
259    * GstRtpSsrcDemux::removed-ssrc-pad:
260    * @demux: the object which received the signal
261    * @ssrc: the SSRC of the pad
262    * @pad: the removed pad.
263    *
264    * Emited when a SSRC pad has been removed.
265    */
266   gst_rtp_ssrc_demux_signals[SIGNAL_REMOVED_SSRC_PAD] =
267       g_signal_new ("removed-ssrc-pad",
268       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
269       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, removed_ssrc_pad),
270       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_OBJECT,
271       G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
272
273   /**
274    * GstRtpSsrcDemux::clear-ssrc:
275    * @demux: the object which received the signal
276    * @ssrc: the SSRC of the pad
277    *
278    * Action signal to remove the pad for SSRC.
279    */
280   gst_rtp_ssrc_demux_signals[SIGNAL_CLEAR_SSRC] =
281       g_signal_new ("clear-ssrc",
282       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
283       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, clear_ssrc),
284       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
285
286   gstelement_klass->change_state =
287       GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_change_state);
288   gstrtpssrcdemux_klass->clear_ssrc =
289       GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_clear_ssrc);
290
291   gst_element_class_add_pad_template (gstelement_klass,
292       gst_static_pad_template_get (&rtp_ssrc_demux_sink_template));
293   gst_element_class_add_pad_template (gstelement_klass,
294       gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_sink_template));
295   gst_element_class_add_pad_template (gstelement_klass,
296       gst_static_pad_template_get (&rtp_ssrc_demux_src_template));
297   gst_element_class_add_pad_template (gstelement_klass,
298       gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_src_template));
299
300   gst_element_class_set_details_simple (gstelement_klass, "RTP SSRC Demux",
301       "Demux/Network/RTP",
302       "Splits RTP streams based on the SSRC",
303       "Wim Taymans <wim.taymans@gmail.com>");
304
305   GST_DEBUG_CATEGORY_INIT (gst_rtp_ssrc_demux_debug,
306       "rtpssrcdemux", 0, "RTP SSRC demuxer");
307 }
308
309 static void
310 gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux)
311 {
312   GstElementClass *klass = GST_ELEMENT_GET_CLASS (demux);
313
314   demux->rtp_sink =
315       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
316           "sink"), "sink");
317   gst_pad_set_chain_function (demux->rtp_sink, gst_rtp_ssrc_demux_chain);
318   gst_pad_set_event_function (demux->rtp_sink, gst_rtp_ssrc_demux_sink_event);
319   gst_pad_set_iterate_internal_links_function (demux->rtp_sink,
320       gst_rtp_ssrc_demux_iterate_internal_links_sink);
321   gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtp_sink);
322
323   demux->rtcp_sink =
324       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
325           "rtcp_sink"), "rtcp_sink");
326   gst_pad_set_chain_function (demux->rtcp_sink, gst_rtp_ssrc_demux_rtcp_chain);
327   gst_pad_set_event_function (demux->rtcp_sink,
328       gst_rtp_ssrc_demux_rtcp_sink_event);
329   gst_pad_set_iterate_internal_links_function (demux->rtcp_sink,
330       gst_rtp_ssrc_demux_iterate_internal_links_sink);
331   gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
332
333   g_static_rec_mutex_init (&demux->padlock);
334
335   gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
336 }
337
338 static void
339 gst_rtp_ssrc_demux_reset (GstRtpSsrcDemux * demux)
340 {
341   GSList *walk;
342
343   for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
344     GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
345
346     gst_pad_set_active (dpad->rtp_pad, FALSE);
347     gst_pad_set_active (dpad->rtcp_pad, FALSE);
348
349     gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtp_pad);
350     gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtcp_pad);
351     g_free (dpad);
352   }
353   g_slist_free (demux->srcpads);
354   demux->srcpads = NULL;
355 }
356
357 static void
358 gst_rtp_ssrc_demux_dispose (GObject * object)
359 {
360   GstRtpSsrcDemux *demux;
361
362   demux = GST_RTP_SSRC_DEMUX (object);
363
364   gst_rtp_ssrc_demux_reset (demux);
365
366   G_OBJECT_CLASS (parent_class)->dispose (object);
367 }
368
369 static void
370 gst_rtp_ssrc_demux_finalize (GObject * object)
371 {
372   GstRtpSsrcDemux *demux;
373
374   demux = GST_RTP_SSRC_DEMUX (object);
375   g_static_rec_mutex_free (&demux->padlock);
376
377   G_OBJECT_CLASS (parent_class)->finalize (object);
378 }
379
380 static void
381 gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
382 {
383   GstRtpSsrcDemuxPad *dpad;
384
385   GST_PAD_LOCK (demux);
386   dpad = find_demux_pad_for_ssrc (demux, ssrc);
387   if (dpad == NULL) {
388     GST_PAD_UNLOCK (demux);
389     goto unknown_pad;
390   }
391
392   GST_DEBUG_OBJECT (demux, "clearing pad for SSRC %08x", ssrc);
393
394   demux->srcpads = g_slist_remove (demux->srcpads, dpad);
395   GST_PAD_UNLOCK (demux);
396
397   gst_pad_set_active (dpad->rtp_pad, FALSE);
398   gst_pad_set_active (dpad->rtcp_pad, FALSE);
399
400   g_signal_emit (G_OBJECT (demux),
401       gst_rtp_ssrc_demux_signals[SIGNAL_REMOVED_SSRC_PAD], 0, ssrc,
402       dpad->rtp_pad);
403
404   gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtp_pad);
405   gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtcp_pad);
406
407   g_free (dpad);
408
409   return;
410
411   /* ERRORS */
412 unknown_pad:
413   {
414     GST_WARNING_OBJECT (demux, "unknown SSRC %08x", ssrc);
415     return;
416   }
417 }
418
419 static gboolean
420 gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
421 {
422   GstRtpSsrcDemux *demux;
423   gboolean res = FALSE;
424
425   demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
426   if (G_UNLIKELY (demux == NULL)) {
427     gst_event_unref (event);
428     return FALSE;
429   }
430
431   switch (GST_EVENT_TYPE (event)) {
432     case GST_EVENT_FLUSH_STOP:
433       gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
434     default:
435     {
436       GSList *walk;
437       GSList *pads = NULL;
438
439       res = TRUE;
440       /* need local snapshot of pads;
441        * should not push downstream while holding lock as that might deadlock
442        * with stuff traveling upstream tyring to get this lock while holding
443        * other (stream)lock */
444       GST_PAD_LOCK (demux);
445       for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
446         GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
447
448         pads = g_slist_prepend (pads, gst_object_ref (pad->rtp_pad));
449       }
450       GST_PAD_UNLOCK (demux);
451       for (walk = pads; walk; walk = g_slist_next (walk)) {
452         GstPad *pad = (GstPad *) walk->data;
453
454         gst_event_ref (event);
455         res &= gst_pad_push_event (pad, event);
456         gst_object_unref (pad);
457       }
458       g_slist_free (pads);
459       gst_event_unref (event);
460       break;
461     }
462   }
463
464   gst_object_unref (demux);
465   return res;
466 }
467
468 static gboolean
469 gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstEvent * event)
470 {
471   GstRtpSsrcDemux *demux;
472   gboolean res = FALSE;
473
474   demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
475
476   switch (GST_EVENT_TYPE (event)) {
477     default:
478     {
479       GSList *walk;
480       GSList *pads = NULL;
481
482       res = TRUE;
483       GST_PAD_LOCK (demux);
484       for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
485         GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
486
487         pads = g_slist_prepend (pads, gst_object_ref (pad->rtcp_pad));
488       }
489       GST_PAD_UNLOCK (demux);
490       for (walk = pads; walk; walk = g_slist_next (walk)) {
491         GstPad *pad = (GstPad *) walk->data;
492
493         gst_event_ref (event);
494         res &= gst_pad_push_event (pad, event);
495         gst_object_unref (pad);
496       }
497       g_slist_free (pads);
498       gst_event_unref (event);
499       break;
500     }
501   }
502   gst_object_unref (demux);
503   return res;
504 }
505
506 static GstFlowReturn
507 gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf)
508 {
509   GstFlowReturn ret;
510   GstRtpSsrcDemux *demux;
511   guint32 ssrc;
512   GstRtpSsrcDemuxPad *dpad;
513   GstRTPBuffer rtp;
514   GstPad *srcpad;
515
516   demux = GST_RTP_SSRC_DEMUX (GST_OBJECT_PARENT (pad));
517
518   if (!gst_rtp_buffer_validate (buf))
519     goto invalid_payload;
520
521   gst_rtp_buffer_map (buf, GST_MAP_READ, &rtp);
522   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
523   gst_rtp_buffer_unmap (&rtp);
524
525   GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc);
526
527   GST_PAD_LOCK (demux);
528   dpad = find_or_create_demux_pad_for_ssrc (demux, ssrc);
529   if (dpad == NULL) {
530     GST_PAD_UNLOCK (demux);
531     goto create_failed;
532   }
533   srcpad = gst_object_ref (dpad->rtp_pad);
534   GST_PAD_UNLOCK (demux);
535
536   /* push to srcpad */
537   ret = gst_pad_push (srcpad, buf);
538
539   gst_object_unref (srcpad);
540
541   return ret;
542
543   /* ERRORS */
544 invalid_payload:
545   {
546     /* this is fatal and should be filtered earlier */
547     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
548         ("Dropping invalid RTP payload"));
549     gst_buffer_unref (buf);
550     return GST_FLOW_ERROR;
551   }
552 create_failed:
553   {
554     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
555         ("Could not create new pad"));
556     gst_buffer_unref (buf);
557     return GST_FLOW_ERROR;
558   }
559 }
560
561 static GstFlowReturn
562 gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf)
563 {
564   GstFlowReturn ret;
565   GstRtpSsrcDemux *demux;
566   guint32 ssrc;
567   GstRtpSsrcDemuxPad *dpad;
568   GstRTCPPacket packet;
569   GstRTCPBuffer rtcp;
570   GstPad *srcpad;
571
572   demux = GST_RTP_SSRC_DEMUX (GST_OBJECT_PARENT (pad));
573
574   if (!gst_rtcp_buffer_validate (buf))
575     goto invalid_rtcp;
576
577   gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
578   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
579     gst_rtcp_buffer_unmap (&rtcp);
580     goto invalid_rtcp;
581   }
582
583   /* first packet must be SR or RR or else the validate would have failed */
584   switch (gst_rtcp_packet_get_type (&packet)) {
585     case GST_RTCP_TYPE_SR:
586       /* get the ssrc so that we can route it to the right source pad */
587       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL,
588           NULL);
589       break;
590     default:
591       goto unexpected_rtcp;
592   }
593   gst_rtcp_buffer_unmap (&rtcp);
594
595   GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc);
596
597   GST_PAD_LOCK (demux);
598   dpad = find_or_create_demux_pad_for_ssrc (demux, ssrc);
599   if (dpad == NULL) {
600     GST_PAD_UNLOCK (demux);
601     goto create_failed;
602   }
603   srcpad = gst_object_ref (dpad->rtcp_pad);
604   GST_PAD_UNLOCK (demux);
605
606   /* push to srcpad */
607   ret = gst_pad_push (srcpad, buf);
608
609   gst_object_unref (srcpad);
610
611   return ret;
612
613   /* ERRORS */
614 invalid_rtcp:
615   {
616     /* this is fatal and should be filtered earlier */
617     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
618         ("Dropping invalid RTCP packet"));
619     gst_buffer_unref (buf);
620     return GST_FLOW_ERROR;
621   }
622 unexpected_rtcp:
623   {
624     GST_DEBUG_OBJECT (demux, "dropping unexpected RTCP packet");
625     gst_buffer_unref (buf);
626     return GST_FLOW_OK;
627   }
628 create_failed:
629   {
630     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
631         ("Could not create new pad"));
632     gst_buffer_unref (buf);
633     return GST_FLOW_ERROR;
634   }
635 }
636
637 static gboolean
638 gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event)
639 {
640   GstRtpSsrcDemux *demux;
641   const GstStructure *s;
642
643   demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
644
645   switch (GST_EVENT_TYPE (event)) {
646     case GST_EVENT_CUSTOM_UPSTREAM:
647     case GST_EVENT_CUSTOM_BOTH:
648     case GST_EVENT_CUSTOM_BOTH_OOB:
649       s = gst_event_get_structure (event);
650       if (s && !gst_structure_has_field (s, "ssrc")) {
651         GSList *walk;
652
653         for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
654           GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
655
656           if (dpad->rtp_pad == pad || dpad->rtcp_pad == pad) {
657             GstStructure *ws;
658
659             event =
660                 GST_EVENT_CAST (gst_mini_object_make_writable
661                 (GST_MINI_OBJECT_CAST (event)));
662             ws = gst_event_writable_structure (event);
663             gst_structure_set (ws, "ssrc", G_TYPE_UINT, dpad->ssrc, NULL);
664             break;
665           }
666         }
667       }
668       break;
669     default:
670       break;
671   }
672
673   gst_object_unref (demux);
674
675   return gst_pad_event_default (pad, event);
676 }
677
678 static GstIterator *
679 gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad)
680 {
681   GstRtpSsrcDemux *demux;
682   GstPad *otherpad = NULL;
683   GstIterator *it = NULL;
684   GSList *current;
685
686   demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
687
688   if (!demux)
689     return NULL;
690
691   GST_PAD_LOCK (demux);
692   for (current = demux->srcpads; current; current = g_slist_next (current)) {
693     GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) current->data;
694
695     if (pad == dpad->rtp_pad) {
696       otherpad = demux->rtp_sink;
697       break;
698     } else if (pad == dpad->rtcp_pad) {
699       otherpad = demux->rtcp_sink;
700       break;
701     }
702   }
703   if (otherpad) {
704     GValue val = { 0, };
705
706     g_value_init (&val, GST_TYPE_PAD);
707     g_value_set_object (&val, otherpad);
708     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
709     g_value_unset (&val);
710     gst_object_unref (otherpad);
711   }
712   GST_PAD_UNLOCK (demux);
713
714   gst_object_unref (demux);
715   return it;
716 }
717
718 /* Should return 0 for elements to be included */
719 static gint
720 src_pad_compare_func (gconstpointer a, gconstpointer b)
721 {
722   GstPad *pad = GST_PAD (a);
723   const gchar *prefix = b;
724   gint res = 1;
725
726   GST_OBJECT_LOCK (pad);
727   res = !GST_PAD_NAME (pad) || g_str_has_prefix (GST_PAD_NAME (pad), prefix);
728   GST_OBJECT_UNLOCK (pad);
729
730   return res;
731 }
732
733 static GstIterator *
734 gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * pad)
735 {
736   GstRtpSsrcDemux *demux;
737   GstIterator *it = NULL;
738   GValue gval = { 0, };
739
740   demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
741
742   if (!demux)
743     return NULL;
744
745   g_value_init (&gval, G_TYPE_STRING);
746   if (pad == demux->rtp_sink)
747     g_value_set_static_string (&gval, "src_");
748   else if (pad == demux->rtcp_sink)
749     g_value_set_static_string (&gval, "rtcp_src_");
750   else
751     g_assert_not_reached ();
752
753   it = gst_element_iterate_src_pads (GST_ELEMENT (demux));
754
755   it = gst_iterator_filter (it, src_pad_compare_func, &gval);
756
757   gst_object_unref (demux);
758
759   return it;
760 }
761
762
763 static gboolean
764 gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
765     GstQuery * query)
766 {
767   GstRtpSsrcDemux *demux;
768   gboolean res = FALSE;
769
770   demux = GST_RTP_SSRC_DEMUX (parent);
771
772   switch (GST_QUERY_TYPE (query)) {
773     case GST_QUERY_LATENCY:
774     {
775
776       if ((res = gst_pad_peer_query (demux->rtp_sink, query))) {
777         gboolean live;
778         GstClockTime min_latency, max_latency;
779         GstRtpSsrcDemuxPad *demuxpad;
780
781         demuxpad = gst_pad_get_element_private (pad);
782
783         gst_query_parse_latency (query, &live, &min_latency, &max_latency);
784
785         GST_DEBUG_OBJECT (demux, "peer min latency %" GST_TIME_FORMAT,
786             GST_TIME_ARGS (min_latency));
787
788         GST_DEBUG_OBJECT (demux, "latency for SSRC %08x", demuxpad->ssrc);
789
790         gst_query_set_latency (query, live, min_latency, max_latency);
791       }
792       break;
793     }
794     default:
795       res = gst_pad_query_default (pad, parent, query);
796       break;
797   }
798
799   return res;
800 }
801
802 static GstStateChangeReturn
803 gst_rtp_ssrc_demux_change_state (GstElement * element,
804     GstStateChange transition)
805 {
806   GstStateChangeReturn ret;
807   GstRtpSsrcDemux *demux;
808
809   demux = GST_RTP_SSRC_DEMUX (element);
810
811   switch (transition) {
812     case GST_STATE_CHANGE_NULL_TO_READY:
813     case GST_STATE_CHANGE_READY_TO_PAUSED:
814     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
815     default:
816       break;
817   }
818
819   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
820
821   switch (transition) {
822     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
823       break;
824     case GST_STATE_CHANGE_PAUSED_TO_READY:
825       gst_rtp_ssrc_demux_reset (demux);
826       break;
827     case GST_STATE_CHANGE_READY_TO_NULL:
828     default:
829       break;
830   }
831   return ret;
832 }