Merge branch 'dtmf-moved-from-bad'
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpssrcdemux.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * RTP SSRC demuxer
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 /**
23  * SECTION:element-gstrtpssrcdemux
24  *
25  * gstrtpssrcdemux acts as a demuxer for RTP packets based on the SSRC of the
26  * packets. Its main purpose is to allow an application to easily receive and
27  * decode an RTP stream with multiple SSRCs.
28  * 
29  * For each SSRC that is detected, a new pad will be created and the
30  * #GstRtpSsrcDemux::new-ssrc-pad signal will be emitted. 
31  * 
32  * <refsect2>
33  * <title>Example pipelines</title>
34  * |[
35  * gst-launch-1.0 udpsrc caps="application/x-rtp" ! gstrtpssrcdemux ! fakesink
36  * ]| Takes an RTP stream and send the RTP packets with the first detected SSRC
37  * to fakesink, discarding the other SSRCs.
38  * </refsect2>
39  *
40  * Last reviewed on 2007-05-28 (0.10.5)
41  */
42
43 #ifdef HAVE_CONFIG_H
44 #include "config.h"
45 #endif
46
47 #include <string.h>
48 #include <gst/rtp/gstrtpbuffer.h>
49 #include <gst/rtp/gstrtcpbuffer.h>
50
51 #include "gstrtpbin-marshal.h"
52 #include "gstrtpssrcdemux.h"
53
54 GST_DEBUG_CATEGORY_STATIC (gst_rtp_ssrc_demux_debug);
55 #define GST_CAT_DEFAULT gst_rtp_ssrc_demux_debug
56
57 /* generic templates */
58 static GstStaticPadTemplate rtp_ssrc_demux_sink_template =
59 GST_STATIC_PAD_TEMPLATE ("sink",
60     GST_PAD_SINK,
61     GST_PAD_ALWAYS,
62     GST_STATIC_CAPS ("application/x-rtp")
63     );
64
65 static GstStaticPadTemplate rtp_ssrc_demux_rtcp_sink_template =
66 GST_STATIC_PAD_TEMPLATE ("rtcp_sink",
67     GST_PAD_SINK,
68     GST_PAD_ALWAYS,
69     GST_STATIC_CAPS ("application/x-rtcp")
70     );
71
72 static GstStaticPadTemplate rtp_ssrc_demux_src_template =
73 GST_STATIC_PAD_TEMPLATE ("src_%u",
74     GST_PAD_SRC,
75     GST_PAD_SOMETIMES,
76     GST_STATIC_CAPS ("application/x-rtp")
77     );
78
79 static GstStaticPadTemplate rtp_ssrc_demux_rtcp_src_template =
80 GST_STATIC_PAD_TEMPLATE ("rtcp_src_%u",
81     GST_PAD_SRC,
82     GST_PAD_SOMETIMES,
83     GST_STATIC_CAPS ("application/x-rtcp")
84     );
85
86 #define GST_PAD_LOCK(obj)   (g_rec_mutex_lock (&(obj)->padlock))
87 #define GST_PAD_UNLOCK(obj) (g_rec_mutex_unlock (&(obj)->padlock))
88
89 typedef enum
90 {
91   RTP_PAD,
92   RTCP_PAD
93 } PadType;
94
95 /* signals */
96 enum
97 {
98   SIGNAL_NEW_SSRC_PAD,
99   SIGNAL_REMOVED_SSRC_PAD,
100   SIGNAL_CLEAR_SSRC,
101   LAST_SIGNAL
102 };
103
104 #define gst_rtp_ssrc_demux_parent_class parent_class
105 G_DEFINE_TYPE (GstRtpSsrcDemux, gst_rtp_ssrc_demux, GST_TYPE_ELEMENT);
106
107 /* GObject vmethods */
108 static void gst_rtp_ssrc_demux_dispose (GObject * object);
109 static void gst_rtp_ssrc_demux_finalize (GObject * object);
110
111 /* GstElement vmethods */
112 static GstStateChangeReturn gst_rtp_ssrc_demux_change_state (GstElement *
113     element, GstStateChange transition);
114
115 static void gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux,
116     guint32 ssrc);
117
118 /* sinkpad stuff */
119 static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent,
120     GstBuffer * buf);
121 static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
122     GstEvent * event);
123
124 static GstFlowReturn gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad,
125     GstObject * parent, GstBuffer * buf);
126 static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad,
127     GstObject * parent, GstEvent * event);
128 static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad *
129     pad, GstObject * parent);
130
131 /* srcpad stuff */
132 static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstObject * parent,
133     GstEvent * event);
134 static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad,
135     GstObject * parent);
136 static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
137     GstQuery * query);
138
139 static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 };
140
141 /*
142  * Item for storing GstPad <-> SSRC pairs.
143  */
144 struct _GstRtpSsrcDemuxPad
145 {
146   guint32 ssrc;
147   GstPad *rtp_pad;
148   GstCaps *caps;
149   GstPad *rtcp_pad;
150 };
151
152 /* find a src pad for a given SSRC, returns NULL if the SSRC was not found
153  */
154 static GstRtpSsrcDemuxPad *
155 find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
156 {
157   GSList *walk;
158
159   for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
160     GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
161
162     if (pad->ssrc == ssrc)
163       return pad;
164   }
165   return NULL;
166 }
167
168 static GstEvent *
169 add_ssrc_and_ref (GstEvent * event, guint32 ssrc)
170 {
171   /* Set the ssrc on the output caps */
172   switch (GST_EVENT_TYPE (event)) {
173     case GST_EVENT_CAPS:
174     {
175       GstCaps *caps;
176       GstCaps *newcaps;
177       GstStructure *s;
178
179       gst_event_parse_caps (event, &caps);
180       newcaps = gst_caps_copy (caps);
181
182       s = gst_caps_get_structure (newcaps, 0);
183       gst_structure_set (s, "ssrc", G_TYPE_UINT, ssrc, NULL);
184       event = gst_event_new_caps (newcaps);
185       gst_caps_unref (newcaps);
186       break;
187     }
188     default:
189       gst_event_ref (event);
190       break;
191   }
192
193   return event;
194 }
195
196 struct ForwardEventData
197 {
198   GstPad *pad;
199   guint32 ssrc;
200 };
201
202 static gboolean
203 forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
204 {
205   struct ForwardEventData *data = user_data;
206   GstEvent *newevent;
207
208   newevent = add_ssrc_and_ref (*event, data->ssrc);
209
210   gst_pad_push_event (data->pad, newevent);
211
212   return TRUE;
213 }
214
215
216 static GstPad *
217 find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
218     PadType padtype)
219 {
220   GstPad *rtp_pad, *rtcp_pad;
221   GstElementClass *klass;
222   GstPadTemplate *templ;
223   gchar *padname;
224   GstRtpSsrcDemuxPad *demuxpad;
225   GstCaps *caps;
226   struct ForwardEventData fdata;
227   GstPad *retpad;
228   gulong rtp_block, rtcp_block;
229
230   GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
231
232   GST_PAD_LOCK (demux);
233
234   demuxpad = find_demux_pad_for_ssrc (demux, ssrc);
235   if (demuxpad != NULL) {
236     switch (padtype) {
237       case RTP_PAD:
238         retpad = gst_object_ref (demuxpad->rtp_pad);
239         break;
240       case RTCP_PAD:
241         retpad = gst_object_ref (demuxpad->rtcp_pad);
242         break;
243       default:
244         retpad = NULL;
245         g_assert_not_reached ();
246     }
247     GST_PAD_UNLOCK (demux);
248     return retpad;
249   }
250
251   klass = GST_ELEMENT_GET_CLASS (demux);
252   templ = gst_element_class_get_pad_template (klass, "src_%u");
253   padname = g_strdup_printf ("src_%u", ssrc);
254   rtp_pad = gst_pad_new_from_template (templ, padname);
255   g_free (padname);
256
257   templ = gst_element_class_get_pad_template (klass, "rtcp_src_%u");
258   padname = g_strdup_printf ("rtcp_src_%u", ssrc);
259   rtcp_pad = gst_pad_new_from_template (templ, padname);
260   g_free (padname);
261
262   /* wrap in structure and add to list */
263   demuxpad = g_new0 (GstRtpSsrcDemuxPad, 1);
264   demuxpad->ssrc = ssrc;
265   demuxpad->rtp_pad = rtp_pad;
266   demuxpad->rtcp_pad = rtcp_pad;
267
268   fdata.ssrc = ssrc;
269
270   gst_pad_set_element_private (rtp_pad, demuxpad);
271   gst_pad_set_element_private (rtcp_pad, demuxpad);
272
273   demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
274
275   gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query);
276   gst_pad_set_iterate_internal_links_function (rtp_pad,
277       gst_rtp_ssrc_demux_iterate_internal_links_src);
278   gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
279   gst_pad_use_fixed_caps (rtp_pad);
280   gst_pad_set_active (rtp_pad, TRUE);
281   fdata.pad = rtp_pad;
282   gst_pad_sticky_events_foreach (demux->rtp_sink, forward_sticky_events,
283       &fdata);
284
285   gst_pad_set_event_function (rtcp_pad, gst_rtp_ssrc_demux_src_event);
286   gst_pad_set_iterate_internal_links_function (rtcp_pad,
287       gst_rtp_ssrc_demux_iterate_internal_links_src);
288   gst_pad_use_fixed_caps (rtcp_pad);
289   gst_pad_set_active (rtcp_pad, TRUE);
290   fdata.pad = rtcp_pad;
291   gst_pad_sticky_events_foreach (demux->rtcp_sink, forward_sticky_events,
292       &fdata);
293
294   /* copy caps from input */
295   if ((caps = gst_pad_get_current_caps (demux->rtp_sink))) {
296     gst_pad_set_caps (rtp_pad, caps);
297     gst_caps_unref (caps);
298   }
299   if ((caps = gst_pad_get_current_caps (demux->rtcp_sink))) {
300     gst_pad_set_caps (rtcp_pad, caps);
301     gst_caps_unref (caps);
302   }
303   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
304   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad);
305
306   switch (padtype) {
307     case RTP_PAD:
308       retpad = gst_object_ref (demuxpad->rtp_pad);
309       break;
310     case RTCP_PAD:
311       retpad = gst_object_ref (demuxpad->rtcp_pad);
312       break;
313     default:
314       retpad = NULL;
315       g_assert_not_reached ();
316   }
317
318   gst_object_ref (rtp_pad);
319   gst_object_ref (rtcp_pad);
320
321   rtp_block = gst_pad_add_probe (rtp_pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
322       NULL, NULL, NULL);
323   rtcp_block = gst_pad_add_probe (rtcp_pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
324       NULL, NULL, NULL);
325
326   GST_PAD_UNLOCK (demux);
327
328   g_signal_emit (G_OBJECT (demux),
329       gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad);
330
331   gst_pad_remove_probe (rtp_pad, rtp_block);
332   gst_pad_remove_probe (rtcp_pad, rtcp_block);
333
334   gst_object_unref (rtp_pad);
335   gst_object_unref (rtcp_pad);
336
337   return retpad;
338 }
339
340 static void
341 gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass)
342 {
343   GObjectClass *gobject_klass;
344   GstElementClass *gstelement_klass;
345   GstRtpSsrcDemuxClass *gstrtpssrcdemux_klass;
346
347   gobject_klass = (GObjectClass *) klass;
348   gstelement_klass = (GstElementClass *) klass;
349   gstrtpssrcdemux_klass = (GstRtpSsrcDemuxClass *) klass;
350
351   gobject_klass->dispose = gst_rtp_ssrc_demux_dispose;
352   gobject_klass->finalize = gst_rtp_ssrc_demux_finalize;
353
354   /**
355    * GstRtpSsrcDemux::new-ssrc-pad:
356    * @demux: the object which received the signal
357    * @ssrc: the SSRC of the pad
358    * @pad: the new pad.
359    *
360    * Emited when a new SSRC pad has been created.
361    */
362   gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD] =
363       g_signal_new ("new-ssrc-pad",
364       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
365       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, new_ssrc_pad),
366       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_OBJECT,
367       G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
368
369   /**
370    * GstRtpSsrcDemux::removed-ssrc-pad:
371    * @demux: the object which received the signal
372    * @ssrc: the SSRC of the pad
373    * @pad: the removed pad.
374    *
375    * Emited when a SSRC pad has been removed.
376    */
377   gst_rtp_ssrc_demux_signals[SIGNAL_REMOVED_SSRC_PAD] =
378       g_signal_new ("removed-ssrc-pad",
379       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
380       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, removed_ssrc_pad),
381       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_OBJECT,
382       G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
383
384   /**
385    * GstRtpSsrcDemux::clear-ssrc:
386    * @demux: the object which received the signal
387    * @ssrc: the SSRC of the pad
388    *
389    * Action signal to remove the pad for SSRC.
390    */
391   gst_rtp_ssrc_demux_signals[SIGNAL_CLEAR_SSRC] =
392       g_signal_new ("clear-ssrc",
393       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
394       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, clear_ssrc),
395       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
396
397   gstelement_klass->change_state =
398       GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_change_state);
399   gstrtpssrcdemux_klass->clear_ssrc =
400       GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_clear_ssrc);
401
402   gst_element_class_add_pad_template (gstelement_klass,
403       gst_static_pad_template_get (&rtp_ssrc_demux_sink_template));
404   gst_element_class_add_pad_template (gstelement_klass,
405       gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_sink_template));
406   gst_element_class_add_pad_template (gstelement_klass,
407       gst_static_pad_template_get (&rtp_ssrc_demux_src_template));
408   gst_element_class_add_pad_template (gstelement_klass,
409       gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_src_template));
410
411   gst_element_class_set_static_metadata (gstelement_klass, "RTP SSRC Demux",
412       "Demux/Network/RTP",
413       "Splits RTP streams based on the SSRC",
414       "Wim Taymans <wim.taymans@gmail.com>");
415
416   GST_DEBUG_CATEGORY_INIT (gst_rtp_ssrc_demux_debug,
417       "rtpssrcdemux", 0, "RTP SSRC demuxer");
418 }
419
420 static void
421 gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux)
422 {
423   GstElementClass *klass = GST_ELEMENT_GET_CLASS (demux);
424
425   demux->rtp_sink =
426       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
427           "sink"), "sink");
428   gst_pad_set_chain_function (demux->rtp_sink, gst_rtp_ssrc_demux_chain);
429   gst_pad_set_event_function (demux->rtp_sink, gst_rtp_ssrc_demux_sink_event);
430   gst_pad_set_iterate_internal_links_function (demux->rtp_sink,
431       gst_rtp_ssrc_demux_iterate_internal_links_sink);
432   gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtp_sink);
433
434   demux->rtcp_sink =
435       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
436           "rtcp_sink"), "rtcp_sink");
437   gst_pad_set_chain_function (demux->rtcp_sink, gst_rtp_ssrc_demux_rtcp_chain);
438   gst_pad_set_event_function (demux->rtcp_sink,
439       gst_rtp_ssrc_demux_rtcp_sink_event);
440   gst_pad_set_iterate_internal_links_function (demux->rtcp_sink,
441       gst_rtp_ssrc_demux_iterate_internal_links_sink);
442   gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
443
444   g_rec_mutex_init (&demux->padlock);
445
446   gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
447 }
448
449 static void
450 gst_rtp_ssrc_demux_reset (GstRtpSsrcDemux * demux)
451 {
452   GSList *walk;
453
454   for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
455     GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
456
457     gst_pad_set_active (dpad->rtp_pad, FALSE);
458     gst_pad_set_active (dpad->rtcp_pad, FALSE);
459
460     gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtp_pad);
461     gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtcp_pad);
462     g_free (dpad);
463   }
464   g_slist_free (demux->srcpads);
465   demux->srcpads = NULL;
466 }
467
468 static void
469 gst_rtp_ssrc_demux_dispose (GObject * object)
470 {
471   GstRtpSsrcDemux *demux;
472
473   demux = GST_RTP_SSRC_DEMUX (object);
474
475   gst_rtp_ssrc_demux_reset (demux);
476
477   G_OBJECT_CLASS (parent_class)->dispose (object);
478 }
479
480 static void
481 gst_rtp_ssrc_demux_finalize (GObject * object)
482 {
483   GstRtpSsrcDemux *demux;
484
485   demux = GST_RTP_SSRC_DEMUX (object);
486   g_rec_mutex_clear (&demux->padlock);
487
488   G_OBJECT_CLASS (parent_class)->finalize (object);
489 }
490
491 static void
492 gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
493 {
494   GstRtpSsrcDemuxPad *dpad;
495
496   GST_PAD_LOCK (demux);
497   dpad = find_demux_pad_for_ssrc (demux, ssrc);
498   if (dpad == NULL) {
499     GST_PAD_UNLOCK (demux);
500     goto unknown_pad;
501   }
502
503   GST_DEBUG_OBJECT (demux, "clearing pad for SSRC %08x", ssrc);
504
505   demux->srcpads = g_slist_remove (demux->srcpads, dpad);
506   GST_PAD_UNLOCK (demux);
507
508   gst_pad_set_active (dpad->rtp_pad, FALSE);
509   gst_pad_set_active (dpad->rtcp_pad, FALSE);
510
511   g_signal_emit (G_OBJECT (demux),
512       gst_rtp_ssrc_demux_signals[SIGNAL_REMOVED_SSRC_PAD], 0, ssrc,
513       dpad->rtp_pad);
514
515   gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtp_pad);
516   gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtcp_pad);
517
518   g_free (dpad);
519
520   return;
521
522   /* ERRORS */
523 unknown_pad:
524   {
525     GST_WARNING_OBJECT (demux, "unknown SSRC %08x", ssrc);
526     return;
527   }
528 }
529
530 static gboolean
531 gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
532     GstEvent * event)
533 {
534   GstRtpSsrcDemux *demux;
535   gboolean res = FALSE;
536
537   demux = GST_RTP_SSRC_DEMUX (parent);
538
539   switch (GST_EVENT_TYPE (event)) {
540     case GST_EVENT_FLUSH_STOP:
541       gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
542       /* fallthrough */
543     default:
544     {
545       GSList *walk;
546       GSList *pads = NULL;
547
548       res = TRUE;
549       /* need local snapshot of pads;
550        * should not push downstream while holding lock as that might deadlock
551        * with stuff traveling upstream tyring to get this lock while holding
552        * other (stream)lock */
553       GST_PAD_LOCK (demux);
554       for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
555         GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
556
557         pad = g_slice_dup (GstRtpSsrcDemuxPad, pad);
558         gst_object_ref (pad->rtp_pad);
559
560         pads = g_slist_prepend (pads, pad);
561       }
562       GST_PAD_UNLOCK (demux);
563
564       for (walk = pads; walk; walk = g_slist_next (walk)) {
565         GstRtpSsrcDemuxPad *dpad = walk->data;
566         GstEvent *newevent;
567
568         newevent = add_ssrc_and_ref (event, dpad->ssrc);
569
570         res &= gst_pad_push_event (dpad->rtp_pad, newevent);
571         gst_object_unref (dpad->rtp_pad);
572         g_slice_free (GstRtpSsrcDemuxPad, dpad);
573       }
574       g_slist_free (pads);
575       gst_event_unref (event);
576       break;
577     }
578   }
579
580   return res;
581 }
582
583 static gboolean
584 gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstObject * parent,
585     GstEvent * event)
586 {
587   GstRtpSsrcDemux *demux;
588   gboolean res = TRUE;
589   GSList *walk;
590   GSList *pads = NULL;
591
592   demux = GST_RTP_SSRC_DEMUX (parent);
593
594   GST_PAD_LOCK (demux);
595   for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
596     GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
597
598     pad = g_slice_dup (GstRtpSsrcDemuxPad, pad);
599     gst_object_ref (pad->rtcp_pad);
600
601     pads = g_slist_prepend (pads, pad);
602   }
603   GST_PAD_UNLOCK (demux);
604
605   for (walk = pads; walk; walk = g_slist_next (walk)) {
606     GstRtpSsrcDemuxPad *dpad = walk->data;
607     GstEvent *newevent;
608
609     newevent = add_ssrc_and_ref (event, dpad->ssrc);
610
611     res &= gst_pad_push_event (dpad->rtcp_pad, newevent);
612     gst_object_unref (dpad->rtcp_pad);
613     g_slice_free (GstRtpSsrcDemuxPad, dpad);
614   }
615   g_slist_free (pads);
616   gst_event_unref (event);
617
618   return res;
619 }
620
621 static GstFlowReturn
622 gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
623 {
624   GstFlowReturn ret;
625   GstRtpSsrcDemux *demux;
626   guint32 ssrc;
627   GstRTPBuffer rtp = { NULL };
628   GstPad *srcpad;
629   GstRtpSsrcDemuxPad *dpad;
630
631   demux = GST_RTP_SSRC_DEMUX (parent);
632
633   if (!gst_rtp_buffer_map (buf, GST_MAP_READ, &rtp))
634     goto invalid_payload;
635
636   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
637   gst_rtp_buffer_unmap (&rtp);
638
639   GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc);
640
641   srcpad = find_or_create_demux_pad_for_ssrc (demux, ssrc, RTP_PAD);
642   if (srcpad == NULL)
643     goto create_failed;
644
645   /* push to srcpad */
646   ret = gst_pad_push (srcpad, buf);
647
648   if (ret != GST_FLOW_OK) {
649     /* check if the ssrc still there, may have been removed */
650     GST_PAD_LOCK (demux);
651     dpad = find_demux_pad_for_ssrc (demux, ssrc);
652     if (dpad == NULL || dpad->rtp_pad != srcpad) {
653       /* SSRC was removed during the push ... ignore the error */
654       ret = GST_FLOW_OK;
655     }
656     GST_PAD_UNLOCK (demux);
657   }
658
659   gst_object_unref (srcpad);
660
661   return ret;
662
663   /* ERRORS */
664 invalid_payload:
665   {
666     /* this is fatal and should be filtered earlier */
667     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
668         ("Dropping invalid RTP payload"));
669     gst_buffer_unref (buf);
670     return GST_FLOW_ERROR;
671   }
672 create_failed:
673   {
674     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
675         ("Could not create new pad"));
676     gst_buffer_unref (buf);
677     return GST_FLOW_ERROR;
678   }
679 }
680
681 static GstFlowReturn
682 gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
683     GstBuffer * buf)
684 {
685   GstFlowReturn ret;
686   GstRtpSsrcDemux *demux;
687   guint32 ssrc;
688   GstRTCPPacket packet;
689   GstRTCPBuffer rtcp = { NULL, };
690   GstPad *srcpad;
691   GstRtpSsrcDemuxPad *dpad;
692
693   demux = GST_RTP_SSRC_DEMUX (parent);
694
695   if (!gst_rtcp_buffer_validate (buf))
696     goto invalid_rtcp;
697
698   gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
699   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
700     gst_rtcp_buffer_unmap (&rtcp);
701     goto invalid_rtcp;
702   }
703
704   /* first packet must be SR or RR or else the validate would have failed */
705   switch (gst_rtcp_packet_get_type (&packet)) {
706     case GST_RTCP_TYPE_SR:
707       /* get the ssrc so that we can route it to the right source pad */
708       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL,
709           NULL);
710       break;
711     default:
712       goto unexpected_rtcp;
713   }
714   gst_rtcp_buffer_unmap (&rtcp);
715
716   GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc);
717
718   srcpad = find_or_create_demux_pad_for_ssrc (demux, ssrc, RTCP_PAD);
719   if (srcpad == NULL)
720     goto create_failed;
721
722   /* push to srcpad */
723   ret = gst_pad_push (srcpad, buf);
724
725   if (ret != GST_FLOW_OK) {
726     /* check if the ssrc still there, may have been removed */
727     GST_PAD_LOCK (demux);
728     dpad = find_demux_pad_for_ssrc (demux, ssrc);
729     if (dpad == NULL || dpad->rtcp_pad != srcpad) {
730       /* SSRC was removed during the push ... ignore the error */
731       ret = GST_FLOW_OK;
732     }
733     GST_PAD_UNLOCK (demux);
734   }
735
736   gst_object_unref (srcpad);
737
738   return ret;
739
740   /* ERRORS */
741 invalid_rtcp:
742   {
743     /* this is fatal and should be filtered earlier */
744     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
745         ("Dropping invalid RTCP packet"));
746     gst_buffer_unref (buf);
747     return GST_FLOW_ERROR;
748   }
749 unexpected_rtcp:
750   {
751     GST_DEBUG_OBJECT (demux, "dropping unexpected RTCP packet");
752     gst_buffer_unref (buf);
753     return GST_FLOW_OK;
754   }
755 create_failed:
756   {
757     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
758         ("Could not create new pad"));
759     gst_buffer_unref (buf);
760     return GST_FLOW_ERROR;
761   }
762 }
763
764 static GstRtpSsrcDemuxPad *
765 find_demux_pad_for_pad (GstRtpSsrcDemux * demux, GstPad * pad)
766 {
767   GSList *walk;
768
769   for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
770     GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
771     if (dpad->rtp_pad == pad || dpad->rtcp_pad == pad) {
772       return dpad;
773     }
774   }
775
776   return NULL;
777 }
778
779
780 static gboolean
781 gst_rtp_ssrc_demux_src_event (GstPad * pad, GstObject * parent,
782     GstEvent * event)
783 {
784   GstRtpSsrcDemux *demux;
785   const GstStructure *s;
786
787   demux = GST_RTP_SSRC_DEMUX (parent);
788
789   switch (GST_EVENT_TYPE (event)) {
790     case GST_EVENT_CUSTOM_UPSTREAM:
791     case GST_EVENT_CUSTOM_BOTH:
792     case GST_EVENT_CUSTOM_BOTH_OOB:
793       s = gst_event_get_structure (event);
794       if (s && !gst_structure_has_field (s, "ssrc")) {
795         GstRtpSsrcDemuxPad *dpad = find_demux_pad_for_pad (demux, pad);
796
797         if (dpad) {
798           GstStructure *ws;
799
800           event = gst_event_make_writable (event);
801           ws = gst_event_writable_structure (event);
802           gst_structure_set (ws, "ssrc", G_TYPE_UINT, dpad->ssrc, NULL);
803         }
804       }
805       break;
806     default:
807       break;
808   }
809
810   return gst_pad_event_default (pad, parent, event);
811 }
812
813 static GstIterator *
814 gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent)
815 {
816   GstRtpSsrcDemux *demux;
817   GstPad *otherpad = NULL;
818   GstIterator *it = NULL;
819   GSList *current;
820
821   demux = GST_RTP_SSRC_DEMUX (parent);
822
823   GST_PAD_LOCK (demux);
824   for (current = demux->srcpads; current; current = g_slist_next (current)) {
825     GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) current->data;
826
827     if (pad == dpad->rtp_pad) {
828       otherpad = demux->rtp_sink;
829       break;
830     } else if (pad == dpad->rtcp_pad) {
831       otherpad = demux->rtcp_sink;
832       break;
833     }
834   }
835   if (otherpad) {
836     GValue val = { 0, };
837
838     g_value_init (&val, GST_TYPE_PAD);
839     g_value_set_object (&val, otherpad);
840     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
841     g_value_unset (&val);
842
843   }
844   GST_PAD_UNLOCK (demux);
845
846   return it;
847 }
848
849 /* Should return 0 for elements to be included */
850 static gint
851 src_pad_compare_func (gconstpointer a, gconstpointer b)
852 {
853   GstPad *pad = GST_PAD (g_value_get_object (a));
854   const gchar *prefix = g_value_get_string (b);
855   gint res = 1;
856
857   GST_OBJECT_LOCK (pad);
858   res = !GST_PAD_NAME (pad) || g_str_has_prefix (GST_PAD_NAME (pad), prefix);
859   GST_OBJECT_UNLOCK (pad);
860
861   return res;
862 }
863
864 static GstIterator *
865 gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * pad,
866     GstObject * parent)
867 {
868   GstRtpSsrcDemux *demux;
869   GstIterator *it = NULL;
870   GValue gval = { 0, };
871
872   demux = GST_RTP_SSRC_DEMUX (parent);
873
874   g_value_init (&gval, G_TYPE_STRING);
875   if (pad == demux->rtp_sink)
876     g_value_set_static_string (&gval, "src_");
877   else if (pad == demux->rtcp_sink)
878     g_value_set_static_string (&gval, "rtcp_src_");
879   else
880     g_assert_not_reached ();
881
882   it = gst_element_iterate_src_pads (GST_ELEMENT_CAST (demux));
883   it = gst_iterator_filter (it, src_pad_compare_func, &gval);
884
885   return it;
886 }
887
888
889 static gboolean
890 gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
891     GstQuery * query)
892 {
893   GstRtpSsrcDemux *demux;
894   gboolean res = FALSE;
895
896   demux = GST_RTP_SSRC_DEMUX (parent);
897
898   switch (GST_QUERY_TYPE (query)) {
899     case GST_QUERY_LATENCY:
900     {
901
902       if ((res = gst_pad_peer_query (demux->rtp_sink, query))) {
903         gboolean live;
904         GstClockTime min_latency, max_latency;
905         GstRtpSsrcDemuxPad *demuxpad;
906
907         demuxpad = gst_pad_get_element_private (pad);
908
909         gst_query_parse_latency (query, &live, &min_latency, &max_latency);
910
911         GST_DEBUG_OBJECT (demux, "peer min latency %" GST_TIME_FORMAT,
912             GST_TIME_ARGS (min_latency));
913
914         GST_DEBUG_OBJECT (demux, "latency for SSRC %08x", demuxpad->ssrc);
915
916         gst_query_set_latency (query, live, min_latency, max_latency);
917       }
918       break;
919     }
920     default:
921       res = gst_pad_query_default (pad, parent, query);
922       break;
923   }
924
925   return res;
926 }
927
928 static GstStateChangeReturn
929 gst_rtp_ssrc_demux_change_state (GstElement * element,
930     GstStateChange transition)
931 {
932   GstStateChangeReturn ret;
933   GstRtpSsrcDemux *demux;
934
935   demux = GST_RTP_SSRC_DEMUX (element);
936
937   switch (transition) {
938     case GST_STATE_CHANGE_NULL_TO_READY:
939     case GST_STATE_CHANGE_READY_TO_PAUSED:
940     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
941     default:
942       break;
943   }
944
945   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
946
947   switch (transition) {
948     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
949       break;
950     case GST_STATE_CHANGE_PAUSED_TO_READY:
951       gst_rtp_ssrc_demux_reset (demux);
952       break;
953     case GST_STATE_CHANGE_READY_TO_NULL:
954     default:
955       break;
956   }
957   return ret;
958 }