rtpssrcdemux: Add ssrc to forwarded CAPS events
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpssrcdemux.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * RTP SSRC demuxer
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:element-gstrtpssrcdemux
24  *
25  * gstrtpssrcdemux acts as a demuxer for RTP packets based on the SSRC of the
26  * packets. Its main purpose is to allow an application to easily receive and
27  * decode an RTP stream with multiple SSRCs.
28  * 
29  * For each SSRC that is detected, a new pad will be created and the
30  * #GstRtpSsrcDemux::new-ssrc-pad signal will be emitted. 
31  * 
32  * <refsect2>
33  * <title>Example pipelines</title>
34  * |[
35  * gst-launch udpsrc caps="application/x-rtp" ! gstrtpssrcdemux ! fakesink
36  * ]| Takes an RTP stream and send the RTP packets with the first detected SSRC
37  * to fakesink, discarding the other SSRCs.
38  * </refsect2>
39  *
40  * Last reviewed on 2007-05-28 (0.10.5)
41  */
42
43 #ifdef HAVE_CONFIG_H
44 #include "config.h"
45 #endif
46
47 #include <string.h>
48 #include <gst/rtp/gstrtpbuffer.h>
49 #include <gst/rtp/gstrtcpbuffer.h>
50
51 #include "gstrtpbin-marshal.h"
52 #include "gstrtpssrcdemux.h"
53
54 GST_DEBUG_CATEGORY_STATIC (gst_rtp_ssrc_demux_debug);
55 #define GST_CAT_DEFAULT gst_rtp_ssrc_demux_debug
56
57 /* generic templates */
58 static GstStaticPadTemplate rtp_ssrc_demux_sink_template =
59 GST_STATIC_PAD_TEMPLATE ("sink",
60     GST_PAD_SINK,
61     GST_PAD_ALWAYS,
62     GST_STATIC_CAPS ("application/x-rtp")
63     );
64
65 static GstStaticPadTemplate rtp_ssrc_demux_rtcp_sink_template =
66 GST_STATIC_PAD_TEMPLATE ("rtcp_sink",
67     GST_PAD_SINK,
68     GST_PAD_ALWAYS,
69     GST_STATIC_CAPS ("application/x-rtcp")
70     );
71
72 static GstStaticPadTemplate rtp_ssrc_demux_src_template =
73 GST_STATIC_PAD_TEMPLATE ("src_%u",
74     GST_PAD_SRC,
75     GST_PAD_SOMETIMES,
76     GST_STATIC_CAPS ("application/x-rtp")
77     );
78
79 static GstStaticPadTemplate rtp_ssrc_demux_rtcp_src_template =
80 GST_STATIC_PAD_TEMPLATE ("rtcp_src_%u",
81     GST_PAD_SRC,
82     GST_PAD_SOMETIMES,
83     GST_STATIC_CAPS ("application/x-rtcp")
84     );
85
86 #define GST_PAD_LOCK(obj)   (g_rec_mutex_lock (&(obj)->padlock))
87 #define GST_PAD_UNLOCK(obj) (g_rec_mutex_unlock (&(obj)->padlock))
88
89 /* signals */
90 enum
91 {
92   SIGNAL_NEW_SSRC_PAD,
93   SIGNAL_REMOVED_SSRC_PAD,
94   SIGNAL_CLEAR_SSRC,
95   LAST_SIGNAL
96 };
97
98 #define gst_rtp_ssrc_demux_parent_class parent_class
99 G_DEFINE_TYPE (GstRtpSsrcDemux, gst_rtp_ssrc_demux, GST_TYPE_ELEMENT);
100
101 /* GObject vmethods */
102 static void gst_rtp_ssrc_demux_dispose (GObject * object);
103 static void gst_rtp_ssrc_demux_finalize (GObject * object);
104
105 /* GstElement vmethods */
106 static GstStateChangeReturn gst_rtp_ssrc_demux_change_state (GstElement *
107     element, GstStateChange transition);
108
109 static void gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux,
110     guint32 ssrc);
111
112 /* sinkpad stuff */
113 static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent,
114     GstBuffer * buf);
115 static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
116     GstEvent * event);
117
118 static GstFlowReturn gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad,
119     GstObject * parent, GstBuffer * buf);
120 static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad,
121     GstObject * parent, GstEvent * event);
122 static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad *
123     pad, GstObject * parent);
124
125 /* srcpad stuff */
126 static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstObject * parent,
127     GstEvent * event);
128 static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad,
129     GstObject * parent);
130 static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
131     GstQuery * query);
132
133 static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 };
134
135 /*
136  * Item for storing GstPad <-> SSRC pairs.
137  */
138 struct _GstRtpSsrcDemuxPad
139 {
140   guint32 ssrc;
141   GstPad *rtp_pad;
142   GstCaps *caps;
143   GstPad *rtcp_pad;
144 };
145
146 /* find a src pad for a given SSRC, returns NULL if the SSRC was not found
147  */
148 static GstRtpSsrcDemuxPad *
149 find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
150 {
151   GSList *walk;
152
153   for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
154     GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
155
156     if (pad->ssrc == ssrc)
157       return pad;
158   }
159   return NULL;
160 }
161
162 static GstEvent *
163 add_ssrc_and_ref (GstEvent * event, guint32 ssrc)
164 {
165   /* Set the ssrc on the output caps */
166   switch (GST_EVENT_TYPE (event)) {
167     case GST_EVENT_CAPS:
168     {
169       GstCaps *caps;
170       GstCaps *newcaps;
171       GstStructure *s;
172
173       gst_event_parse_caps (event, &caps);
174       newcaps = gst_caps_copy (caps);
175
176       s = gst_caps_get_structure (newcaps, 0);
177       gst_structure_set (s, "ssrc", G_TYPE_UINT, ssrc, NULL);
178       event = gst_event_new_caps (newcaps);
179       gst_caps_unref (newcaps);
180       break;
181     }
182     default:
183       gst_event_ref (event);
184       break;
185   }
186
187   return event;
188 }
189
190 /* with PAD_LOCK */
191 static GstRtpSsrcDemuxPad *
192 find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
193 {
194   GstPad *rtp_pad, *rtcp_pad;
195   GstElementClass *klass;
196   GstPadTemplate *templ;
197   gchar *padname;
198   GstRtpSsrcDemuxPad *demuxpad;
199   GstCaps *caps;
200
201   GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
202
203   demuxpad = find_demux_pad_for_ssrc (demux, ssrc);
204   if (demuxpad != NULL) {
205     return demuxpad;
206   }
207
208   klass = GST_ELEMENT_GET_CLASS (demux);
209   templ = gst_element_class_get_pad_template (klass, "src_%u");
210   padname = g_strdup_printf ("src_%u", ssrc);
211   rtp_pad = gst_pad_new_from_template (templ, padname);
212   g_free (padname);
213
214   templ = gst_element_class_get_pad_template (klass, "rtcp_src_%u");
215   padname = g_strdup_printf ("rtcp_src_%u", ssrc);
216   rtcp_pad = gst_pad_new_from_template (templ, padname);
217   g_free (padname);
218
219   /* wrap in structure and add to list */
220   demuxpad = g_new0 (GstRtpSsrcDemuxPad, 1);
221   demuxpad->ssrc = ssrc;
222   demuxpad->rtp_pad = rtp_pad;
223   demuxpad->rtcp_pad = rtcp_pad;
224
225   gst_pad_set_element_private (rtp_pad, demuxpad);
226   gst_pad_set_element_private (rtcp_pad, demuxpad);
227
228   demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
229
230   gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query);
231   gst_pad_set_iterate_internal_links_function (rtp_pad,
232       gst_rtp_ssrc_demux_iterate_internal_links_src);
233   gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
234   gst_pad_use_fixed_caps (rtp_pad);
235   gst_pad_set_active (rtp_pad, TRUE);
236
237   gst_pad_set_event_function (rtcp_pad, gst_rtp_ssrc_demux_src_event);
238   gst_pad_set_iterate_internal_links_function (rtcp_pad,
239       gst_rtp_ssrc_demux_iterate_internal_links_src);
240   gst_pad_use_fixed_caps (rtcp_pad);
241   gst_pad_set_active (rtcp_pad, TRUE);
242
243   /* copy caps from input */
244   if ((caps = gst_pad_get_current_caps (demux->rtp_sink))) {
245     gst_pad_set_caps (rtp_pad, caps);
246     gst_caps_unref (caps);
247   }
248   if ((caps = gst_pad_get_current_caps (demux->rtcp_sink))) {
249     gst_pad_set_caps (rtcp_pad, caps);
250     gst_caps_unref (caps);
251   }
252   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
253   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad);
254
255   g_signal_emit (G_OBJECT (demux),
256       gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad);
257
258   return demuxpad;
259 }
260
261 static void
262 gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass)
263 {
264   GObjectClass *gobject_klass;
265   GstElementClass *gstelement_klass;
266   GstRtpSsrcDemuxClass *gstrtpssrcdemux_klass;
267
268   gobject_klass = (GObjectClass *) klass;
269   gstelement_klass = (GstElementClass *) klass;
270   gstrtpssrcdemux_klass = (GstRtpSsrcDemuxClass *) klass;
271
272   gobject_klass->dispose = gst_rtp_ssrc_demux_dispose;
273   gobject_klass->finalize = gst_rtp_ssrc_demux_finalize;
274
275   /**
276    * GstRtpSsrcDemux::new-ssrc-pad:
277    * @demux: the object which received the signal
278    * @ssrc: the SSRC of the pad
279    * @pad: the new pad.
280    *
281    * Emited when a new SSRC pad has been created.
282    */
283   gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD] =
284       g_signal_new ("new-ssrc-pad",
285       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
286       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, new_ssrc_pad),
287       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_OBJECT,
288       G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
289
290   /**
291    * GstRtpSsrcDemux::removed-ssrc-pad:
292    * @demux: the object which received the signal
293    * @ssrc: the SSRC of the pad
294    * @pad: the removed pad.
295    *
296    * Emited when a SSRC pad has been removed.
297    */
298   gst_rtp_ssrc_demux_signals[SIGNAL_REMOVED_SSRC_PAD] =
299       g_signal_new ("removed-ssrc-pad",
300       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
301       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, removed_ssrc_pad),
302       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_OBJECT,
303       G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
304
305   /**
306    * GstRtpSsrcDemux::clear-ssrc:
307    * @demux: the object which received the signal
308    * @ssrc: the SSRC of the pad
309    *
310    * Action signal to remove the pad for SSRC.
311    */
312   gst_rtp_ssrc_demux_signals[SIGNAL_CLEAR_SSRC] =
313       g_signal_new ("clear-ssrc",
314       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
315       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, clear_ssrc),
316       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
317
318   gstelement_klass->change_state =
319       GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_change_state);
320   gstrtpssrcdemux_klass->clear_ssrc =
321       GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_clear_ssrc);
322
323   gst_element_class_add_pad_template (gstelement_klass,
324       gst_static_pad_template_get (&rtp_ssrc_demux_sink_template));
325   gst_element_class_add_pad_template (gstelement_klass,
326       gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_sink_template));
327   gst_element_class_add_pad_template (gstelement_klass,
328       gst_static_pad_template_get (&rtp_ssrc_demux_src_template));
329   gst_element_class_add_pad_template (gstelement_klass,
330       gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_src_template));
331
332   gst_element_class_set_details_simple (gstelement_klass, "RTP SSRC Demux",
333       "Demux/Network/RTP",
334       "Splits RTP streams based on the SSRC",
335       "Wim Taymans <wim.taymans@gmail.com>");
336
337   GST_DEBUG_CATEGORY_INIT (gst_rtp_ssrc_demux_debug,
338       "rtpssrcdemux", 0, "RTP SSRC demuxer");
339 }
340
341 static void
342 gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux)
343 {
344   GstElementClass *klass = GST_ELEMENT_GET_CLASS (demux);
345
346   demux->rtp_sink =
347       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
348           "sink"), "sink");
349   gst_pad_set_chain_function (demux->rtp_sink, gst_rtp_ssrc_demux_chain);
350   gst_pad_set_event_function (demux->rtp_sink, gst_rtp_ssrc_demux_sink_event);
351   gst_pad_set_iterate_internal_links_function (demux->rtp_sink,
352       gst_rtp_ssrc_demux_iterate_internal_links_sink);
353   gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtp_sink);
354
355   demux->rtcp_sink =
356       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
357           "rtcp_sink"), "rtcp_sink");
358   gst_pad_set_chain_function (demux->rtcp_sink, gst_rtp_ssrc_demux_rtcp_chain);
359   gst_pad_set_event_function (demux->rtcp_sink,
360       gst_rtp_ssrc_demux_rtcp_sink_event);
361   gst_pad_set_iterate_internal_links_function (demux->rtcp_sink,
362       gst_rtp_ssrc_demux_iterate_internal_links_sink);
363   gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
364
365   g_rec_mutex_init (&demux->padlock);
366
367   gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
368 }
369
370 static void
371 gst_rtp_ssrc_demux_reset (GstRtpSsrcDemux * demux)
372 {
373   GSList *walk;
374
375   for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
376     GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
377
378     gst_pad_set_active (dpad->rtp_pad, FALSE);
379     gst_pad_set_active (dpad->rtcp_pad, FALSE);
380
381     gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtp_pad);
382     gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtcp_pad);
383     g_free (dpad);
384   }
385   g_slist_free (demux->srcpads);
386   demux->srcpads = NULL;
387 }
388
389 static void
390 gst_rtp_ssrc_demux_dispose (GObject * object)
391 {
392   GstRtpSsrcDemux *demux;
393
394   demux = GST_RTP_SSRC_DEMUX (object);
395
396   gst_rtp_ssrc_demux_reset (demux);
397
398   G_OBJECT_CLASS (parent_class)->dispose (object);
399 }
400
401 static void
402 gst_rtp_ssrc_demux_finalize (GObject * object)
403 {
404   GstRtpSsrcDemux *demux;
405
406   demux = GST_RTP_SSRC_DEMUX (object);
407   g_rec_mutex_clear (&demux->padlock);
408
409   G_OBJECT_CLASS (parent_class)->finalize (object);
410 }
411
412 static void
413 gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
414 {
415   GstRtpSsrcDemuxPad *dpad;
416
417   GST_PAD_LOCK (demux);
418   dpad = find_demux_pad_for_ssrc (demux, ssrc);
419   if (dpad == NULL) {
420     GST_PAD_UNLOCK (demux);
421     goto unknown_pad;
422   }
423
424   GST_DEBUG_OBJECT (demux, "clearing pad for SSRC %08x", ssrc);
425
426   demux->srcpads = g_slist_remove (demux->srcpads, dpad);
427   GST_PAD_UNLOCK (demux);
428
429   gst_pad_set_active (dpad->rtp_pad, FALSE);
430   gst_pad_set_active (dpad->rtcp_pad, FALSE);
431
432   g_signal_emit (G_OBJECT (demux),
433       gst_rtp_ssrc_demux_signals[SIGNAL_REMOVED_SSRC_PAD], 0, ssrc,
434       dpad->rtp_pad);
435
436   gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtp_pad);
437   gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtcp_pad);
438
439   g_free (dpad);
440
441   return;
442
443   /* ERRORS */
444 unknown_pad:
445   {
446     GST_WARNING_OBJECT (demux, "unknown SSRC %08x", ssrc);
447     return;
448   }
449 }
450
451 static gboolean
452 gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
453     GstEvent * event)
454 {
455   GstRtpSsrcDemux *demux;
456   gboolean res = FALSE;
457
458   demux = GST_RTP_SSRC_DEMUX (parent);
459
460   switch (GST_EVENT_TYPE (event)) {
461     case GST_EVENT_FLUSH_STOP:
462       gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
463       /* fallthrough */
464     default:
465     {
466       GSList *walk;
467       GSList *pads = NULL;
468
469       res = TRUE;
470       /* need local snapshot of pads;
471        * should not push downstream while holding lock as that might deadlock
472        * with stuff traveling upstream tyring to get this lock while holding
473        * other (stream)lock */
474       GST_PAD_LOCK (demux);
475       for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
476         GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
477
478         pad = g_slice_dup (GstRtpSsrcDemuxPad, pad);
479         gst_object_ref (pad->rtp_pad);
480
481         pads = g_slist_prepend (pads, pad);
482       }
483       GST_PAD_UNLOCK (demux);
484
485       for (walk = pads; walk; walk = g_slist_next (walk)) {
486         GstRtpSsrcDemuxPad *dpad = walk->data;
487         GstEvent *newevent;
488
489         newevent = add_ssrc_and_ref (event, dpad->ssrc);
490
491         res &= gst_pad_push_event (dpad->rtp_pad, newevent);
492         gst_object_unref (dpad->rtp_pad);
493         g_slice_free (GstRtpSsrcDemuxPad, dpad);
494       }
495       g_slist_free (pads);
496       gst_event_unref (event);
497       break;
498     }
499   }
500
501   return res;
502 }
503
504 static gboolean
505 gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstObject * parent,
506     GstEvent * event)
507 {
508   GstRtpSsrcDemux *demux;
509   gboolean res = TRUE;
510   GSList *walk;
511   GSList *pads = NULL;
512
513   demux = GST_RTP_SSRC_DEMUX (parent);
514
515   GST_PAD_LOCK (demux);
516   for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
517     GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
518
519     pad = g_slice_dup (GstRtpSsrcDemuxPad, pad);
520     gst_object_ref (pad->rtcp_pad);
521
522     pads = g_slist_prepend (pads, pad);
523   }
524   GST_PAD_UNLOCK (demux);
525
526   for (walk = pads; walk; walk = g_slist_next (walk)) {
527     GstRtpSsrcDemuxPad *dpad = walk->data;
528     GstEvent *newevent;
529
530     newevent = add_ssrc_and_ref (event, dpad->ssrc);
531
532     res &= gst_pad_push_event (dpad->rtcp_pad, newevent);
533     gst_object_unref (dpad->rtcp_pad);
534     g_slice_free (GstRtpSsrcDemuxPad, dpad);
535   }
536   g_slist_free (pads);
537   gst_event_unref (event);
538
539   return res;
540 }
541
542 static GstFlowReturn
543 gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
544 {
545   GstFlowReturn ret;
546   GstRtpSsrcDemux *demux;
547   guint32 ssrc;
548   GstRtpSsrcDemuxPad *dpad;
549   GstRTPBuffer rtp = { NULL };
550   GstPad *srcpad;
551
552   demux = GST_RTP_SSRC_DEMUX (parent);
553
554   if (!gst_rtp_buffer_validate (buf))
555     goto invalid_payload;
556
557   gst_rtp_buffer_map (buf, GST_MAP_READ, &rtp);
558   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
559   gst_rtp_buffer_unmap (&rtp);
560
561   GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc);
562
563   GST_PAD_LOCK (demux);
564   dpad = find_or_create_demux_pad_for_ssrc (demux, ssrc);
565   if (dpad == NULL) {
566     GST_PAD_UNLOCK (demux);
567     goto create_failed;
568   }
569   srcpad = gst_object_ref (dpad->rtp_pad);
570   GST_PAD_UNLOCK (demux);
571
572   /* push to srcpad */
573   ret = gst_pad_push (srcpad, buf);
574
575   gst_object_unref (srcpad);
576
577   return ret;
578
579   /* ERRORS */
580 invalid_payload:
581   {
582     /* this is fatal and should be filtered earlier */
583     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
584         ("Dropping invalid RTP payload"));
585     gst_buffer_unref (buf);
586     return GST_FLOW_ERROR;
587   }
588 create_failed:
589   {
590     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
591         ("Could not create new pad"));
592     gst_buffer_unref (buf);
593     return GST_FLOW_ERROR;
594   }
595 }
596
597 static GstFlowReturn
598 gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
599     GstBuffer * buf)
600 {
601   GstFlowReturn ret;
602   GstRtpSsrcDemux *demux;
603   guint32 ssrc;
604   GstRtpSsrcDemuxPad *dpad;
605   GstRTCPPacket packet;
606   GstRTCPBuffer rtcp = { NULL, };
607   GstPad *srcpad;
608
609   demux = GST_RTP_SSRC_DEMUX (parent);
610
611   if (!gst_rtcp_buffer_validate (buf))
612     goto invalid_rtcp;
613
614   gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
615   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
616     gst_rtcp_buffer_unmap (&rtcp);
617     goto invalid_rtcp;
618   }
619
620   /* first packet must be SR or RR or else the validate would have failed */
621   switch (gst_rtcp_packet_get_type (&packet)) {
622     case GST_RTCP_TYPE_SR:
623       /* get the ssrc so that we can route it to the right source pad */
624       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL,
625           NULL);
626       break;
627     default:
628       goto unexpected_rtcp;
629   }
630   gst_rtcp_buffer_unmap (&rtcp);
631
632   GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc);
633
634   GST_PAD_LOCK (demux);
635   dpad = find_or_create_demux_pad_for_ssrc (demux, ssrc);
636   if (dpad == NULL) {
637     GST_PAD_UNLOCK (demux);
638     goto create_failed;
639   }
640   srcpad = gst_object_ref (dpad->rtcp_pad);
641   GST_PAD_UNLOCK (demux);
642
643   /* push to srcpad */
644   ret = gst_pad_push (srcpad, buf);
645
646   gst_object_unref (srcpad);
647
648   return ret;
649
650   /* ERRORS */
651 invalid_rtcp:
652   {
653     /* this is fatal and should be filtered earlier */
654     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
655         ("Dropping invalid RTCP packet"));
656     gst_buffer_unref (buf);
657     return GST_FLOW_ERROR;
658   }
659 unexpected_rtcp:
660   {
661     GST_DEBUG_OBJECT (demux, "dropping unexpected RTCP packet");
662     gst_buffer_unref (buf);
663     return GST_FLOW_OK;
664   }
665 create_failed:
666   {
667     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
668         ("Could not create new pad"));
669     gst_buffer_unref (buf);
670     return GST_FLOW_ERROR;
671   }
672 }
673
674 static GstRtpSsrcDemuxPad *
675 find_demux_pad_for_pad (GstRtpSsrcDemux * demux, GstPad * pad)
676 {
677   GSList *walk;
678
679   for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
680     GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
681     if (dpad->rtp_pad == pad || dpad->rtcp_pad == pad) {
682       return dpad;
683     }
684   }
685
686   return NULL;
687 }
688
689
690 static gboolean
691 gst_rtp_ssrc_demux_src_event (GstPad * pad, GstObject * parent,
692     GstEvent * event)
693 {
694   GstRtpSsrcDemux *demux;
695   const GstStructure *s;
696
697   demux = GST_RTP_SSRC_DEMUX (parent);
698
699   switch (GST_EVENT_TYPE (event)) {
700     case GST_EVENT_CUSTOM_UPSTREAM:
701     case GST_EVENT_CUSTOM_BOTH:
702     case GST_EVENT_CUSTOM_BOTH_OOB:
703       s = gst_event_get_structure (event);
704       if (s && !gst_structure_has_field (s, "ssrc")) {
705         GstRtpSsrcDemuxPad *dpad = find_demux_pad_for_pad (demux, pad);
706
707         if (dpad) {
708           GstStructure *ws;
709
710           event = gst_event_make_writable (event);
711           ws = gst_event_writable_structure (event);
712           gst_structure_set (ws, "ssrc", G_TYPE_UINT, dpad->ssrc, NULL);
713         }
714       }
715       break;
716     default:
717       break;
718   }
719
720   return gst_pad_event_default (pad, parent, event);
721 }
722
723 static GstIterator *
724 gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent)
725 {
726   GstRtpSsrcDemux *demux;
727   GstPad *otherpad = NULL;
728   GstIterator *it = NULL;
729   GSList *current;
730
731   demux = GST_RTP_SSRC_DEMUX (parent);
732
733   GST_PAD_LOCK (demux);
734   for (current = demux->srcpads; current; current = g_slist_next (current)) {
735     GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) current->data;
736
737     if (pad == dpad->rtp_pad) {
738       otherpad = demux->rtp_sink;
739       break;
740     } else if (pad == dpad->rtcp_pad) {
741       otherpad = demux->rtcp_sink;
742       break;
743     }
744   }
745   if (otherpad) {
746     GValue val = { 0, };
747
748     g_value_init (&val, GST_TYPE_PAD);
749     g_value_set_object (&val, otherpad);
750     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
751     g_value_unset (&val);
752
753   }
754   GST_PAD_UNLOCK (demux);
755
756   return it;
757 }
758
759 /* Should return 0 for elements to be included */
760 static gint
761 src_pad_compare_func (gconstpointer a, gconstpointer b)
762 {
763   GstPad *pad = GST_PAD (g_value_get_object (a));
764   const gchar *prefix = g_value_get_string (b);
765   gint res = 1;
766
767   GST_OBJECT_LOCK (pad);
768   res = !GST_PAD_NAME (pad) || g_str_has_prefix (GST_PAD_NAME (pad), prefix);
769   GST_OBJECT_UNLOCK (pad);
770
771   return res;
772 }
773
774 static GstIterator *
775 gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * pad,
776     GstObject * parent)
777 {
778   GstRtpSsrcDemux *demux;
779   GstIterator *it = NULL;
780   GValue gval = { 0, };
781
782   demux = GST_RTP_SSRC_DEMUX (parent);
783
784   g_value_init (&gval, G_TYPE_STRING);
785   if (pad == demux->rtp_sink)
786     g_value_set_static_string (&gval, "src_");
787   else if (pad == demux->rtcp_sink)
788     g_value_set_static_string (&gval, "rtcp_src_");
789   else
790     g_assert_not_reached ();
791
792   it = gst_element_iterate_src_pads (GST_ELEMENT_CAST (demux));
793   it = gst_iterator_filter (it, src_pad_compare_func, &gval);
794
795   return it;
796 }
797
798
799 static gboolean
800 gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
801     GstQuery * query)
802 {
803   GstRtpSsrcDemux *demux;
804   gboolean res = FALSE;
805
806   demux = GST_RTP_SSRC_DEMUX (parent);
807
808   switch (GST_QUERY_TYPE (query)) {
809     case GST_QUERY_LATENCY:
810     {
811
812       if ((res = gst_pad_peer_query (demux->rtp_sink, query))) {
813         gboolean live;
814         GstClockTime min_latency, max_latency;
815         GstRtpSsrcDemuxPad *demuxpad;
816
817         demuxpad = gst_pad_get_element_private (pad);
818
819         gst_query_parse_latency (query, &live, &min_latency, &max_latency);
820
821         GST_DEBUG_OBJECT (demux, "peer min latency %" GST_TIME_FORMAT,
822             GST_TIME_ARGS (min_latency));
823
824         GST_DEBUG_OBJECT (demux, "latency for SSRC %08x", demuxpad->ssrc);
825
826         gst_query_set_latency (query, live, min_latency, max_latency);
827       }
828       break;
829     }
830     default:
831       res = gst_pad_query_default (pad, parent, query);
832       break;
833   }
834
835   return res;
836 }
837
838 static GstStateChangeReturn
839 gst_rtp_ssrc_demux_change_state (GstElement * element,
840     GstStateChange transition)
841 {
842   GstStateChangeReturn ret;
843   GstRtpSsrcDemux *demux;
844
845   demux = GST_RTP_SSRC_DEMUX (element);
846
847   switch (transition) {
848     case GST_STATE_CHANGE_NULL_TO_READY:
849     case GST_STATE_CHANGE_READY_TO_PAUSED:
850     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
851     default:
852       break;
853   }
854
855   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
856
857   switch (transition) {
858     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
859       break;
860     case GST_STATE_CHANGE_PAUSED_TO_READY:
861       gst_rtp_ssrc_demux_reset (demux);
862       break;
863     case GST_STATE_CHANGE_READY_TO_NULL:
864     default:
865       break;
866   }
867   return ret;
868 }