jitterbuffer: signal timestamp discont
[platform/upstream/gstreamer.git] / gst / rtpmanager / gstrtpssrcdemux.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * RTP SSRC demuxer
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 /**
23  * SECTION:element-gstrtpssrcdemux
24  *
25  * gstrtpssrcdemux acts as a demuxer for RTP packets based on the SSRC of the
26  * packets. Its main purpose is to allow an application to easily receive and
27  * decode an RTP stream with multiple SSRCs.
28  * 
29  * For each SSRC that is detected, a new pad will be created and the
30  * #GstRtpSsrcDemux::new-ssrc-pad signal will be emitted. 
31  * 
32  * <refsect2>
33  * <title>Example pipelines</title>
34  * |[
35  * gst-launch-1.0 udpsrc caps="application/x-rtp" ! gstrtpssrcdemux ! fakesink
36  * ]| Takes an RTP stream and send the RTP packets with the first detected SSRC
37  * to fakesink, discarding the other SSRCs.
38  * </refsect2>
39  *
40  * Last reviewed on 2007-05-28 (0.10.5)
41  */
42
43 #ifdef HAVE_CONFIG_H
44 #include "config.h"
45 #endif
46
47 #include <string.h>
48 #include <gst/rtp/gstrtpbuffer.h>
49 #include <gst/rtp/gstrtcpbuffer.h>
50
51 #include "gstrtpbin-marshal.h"
52 #include "gstrtpssrcdemux.h"
53
54 GST_DEBUG_CATEGORY_STATIC (gst_rtp_ssrc_demux_debug);
55 #define GST_CAT_DEFAULT gst_rtp_ssrc_demux_debug
56
57 /* generic templates */
58 static GstStaticPadTemplate rtp_ssrc_demux_sink_template =
59 GST_STATIC_PAD_TEMPLATE ("sink",
60     GST_PAD_SINK,
61     GST_PAD_ALWAYS,
62     GST_STATIC_CAPS ("application/x-rtp")
63     );
64
65 static GstStaticPadTemplate rtp_ssrc_demux_rtcp_sink_template =
66 GST_STATIC_PAD_TEMPLATE ("rtcp_sink",
67     GST_PAD_SINK,
68     GST_PAD_ALWAYS,
69     GST_STATIC_CAPS ("application/x-rtcp")
70     );
71
72 static GstStaticPadTemplate rtp_ssrc_demux_src_template =
73 GST_STATIC_PAD_TEMPLATE ("src_%u",
74     GST_PAD_SRC,
75     GST_PAD_SOMETIMES,
76     GST_STATIC_CAPS ("application/x-rtp")
77     );
78
79 static GstStaticPadTemplate rtp_ssrc_demux_rtcp_src_template =
80 GST_STATIC_PAD_TEMPLATE ("rtcp_src_%u",
81     GST_PAD_SRC,
82     GST_PAD_SOMETIMES,
83     GST_STATIC_CAPS ("application/x-rtcp")
84     );
85
86 #define GST_PAD_LOCK(obj)   (g_rec_mutex_lock (&(obj)->padlock))
87 #define GST_PAD_UNLOCK(obj) (g_rec_mutex_unlock (&(obj)->padlock))
88
89 typedef enum
90 {
91   RTP_PAD,
92   RTCP_PAD
93 } PadType;
94
95 /* signals */
96 enum
97 {
98   SIGNAL_NEW_SSRC_PAD,
99   SIGNAL_REMOVED_SSRC_PAD,
100   SIGNAL_CLEAR_SSRC,
101   LAST_SIGNAL
102 };
103
104 #define gst_rtp_ssrc_demux_parent_class parent_class
105 G_DEFINE_TYPE (GstRtpSsrcDemux, gst_rtp_ssrc_demux, GST_TYPE_ELEMENT);
106
107 /* GObject vmethods */
108 static void gst_rtp_ssrc_demux_dispose (GObject * object);
109 static void gst_rtp_ssrc_demux_finalize (GObject * object);
110
111 /* GstElement vmethods */
112 static GstStateChangeReturn gst_rtp_ssrc_demux_change_state (GstElement *
113     element, GstStateChange transition);
114
115 static void gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux,
116     guint32 ssrc);
117
118 /* sinkpad stuff */
119 static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent,
120     GstBuffer * buf);
121 static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
122     GstEvent * event);
123
124 static GstFlowReturn gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad,
125     GstObject * parent, GstBuffer * buf);
126 static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad *
127     pad, GstObject * parent);
128
129 /* srcpad stuff */
130 static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstObject * parent,
131     GstEvent * event);
132 static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad,
133     GstObject * parent);
134 static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
135     GstQuery * query);
136
137 static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 };
138
139 /*
140  * Item for storing GstPad <-> SSRC pairs.
141  */
142 struct _GstRtpSsrcDemuxPad
143 {
144   guint32 ssrc;
145   GstPad *rtp_pad;
146   GstCaps *caps;
147   GstPad *rtcp_pad;
148
149   gboolean pushed_initial_rtp_events;
150   gboolean pushed_initial_rtcp_events;
151 };
152
153 /* find a src pad for a given SSRC, returns NULL if the SSRC was not found
154  */
155 static GstRtpSsrcDemuxPad *
156 find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
157 {
158   GSList *walk;
159
160   for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
161     GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
162
163     if (pad->ssrc == ssrc)
164       return pad;
165   }
166   return NULL;
167 }
168
169 static GstEvent *
170 add_ssrc_and_ref (GstEvent * event, guint32 ssrc)
171 {
172   /* Set the ssrc on the output caps */
173   switch (GST_EVENT_TYPE (event)) {
174     case GST_EVENT_CAPS:
175     {
176       GstCaps *caps;
177       GstCaps *newcaps;
178       GstStructure *s;
179
180       gst_event_parse_caps (event, &caps);
181       newcaps = gst_caps_copy (caps);
182
183       s = gst_caps_get_structure (newcaps, 0);
184       gst_structure_set (s, "ssrc", G_TYPE_UINT, ssrc, NULL);
185       event = gst_event_new_caps (newcaps);
186       gst_caps_unref (newcaps);
187       break;
188     }
189     default:
190       gst_event_ref (event);
191       break;
192   }
193
194   return event;
195 }
196
197 struct ForwardStickyEventData
198 {
199   GstPad *pad;
200   guint32 ssrc;
201 };
202
203 static gboolean
204 forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
205 {
206   struct ForwardStickyEventData *data = user_data;
207   GstEvent *newevent;
208
209   newevent = add_ssrc_and_ref (*event, data->ssrc);
210
211   gst_pad_push_event (data->pad, newevent);
212
213   return TRUE;
214 }
215
216 static void
217 forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad,
218     PadType padtype)
219 {
220   struct ForwardStickyEventData fdata;
221   GstPad *sinkpad;
222
223   if (padtype == RTP_PAD)
224     sinkpad = demux->rtp_sink;
225   else if (padtype == RTCP_PAD)
226     sinkpad = demux->rtcp_sink;
227   else
228     g_assert_not_reached ();
229
230   fdata.ssrc = ssrc;
231   fdata.pad = pad;
232
233   gst_pad_sticky_events_foreach (sinkpad, forward_sticky_events, &fdata);
234 }
235
236 static GstPad *
237 find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
238     PadType padtype)
239 {
240   GstPad *rtp_pad, *rtcp_pad;
241   GstElementClass *klass;
242   GstPadTemplate *templ;
243   gchar *padname;
244   GstRtpSsrcDemuxPad *demuxpad;
245   GstPad *retpad;
246   gulong rtp_block, rtcp_block;
247
248   GST_PAD_LOCK (demux);
249
250   demuxpad = find_demux_pad_for_ssrc (demux, ssrc);
251   if (demuxpad != NULL) {
252     gboolean forward = FALSE;
253
254     switch (padtype) {
255       case RTP_PAD:
256         retpad = gst_object_ref (demuxpad->rtp_pad);
257         if (!demuxpad->pushed_initial_rtp_events) {
258           forward = TRUE;
259           demuxpad->pushed_initial_rtp_events = TRUE;
260         }
261         break;
262       case RTCP_PAD:
263         retpad = gst_object_ref (demuxpad->rtcp_pad);
264         if (!demuxpad->pushed_initial_rtcp_events) {
265           forward = TRUE;
266           demuxpad->pushed_initial_rtcp_events = TRUE;
267         }
268         break;
269       default:
270         retpad = NULL;
271         g_assert_not_reached ();
272     }
273
274     GST_PAD_UNLOCK (demux);
275
276     if (forward)
277       forward_initial_events (demux, ssrc, retpad, padtype);
278     return retpad;
279   }
280
281   GST_DEBUG_OBJECT (demux, "creating new pad for SSRC %08x", ssrc);
282
283   klass = GST_ELEMENT_GET_CLASS (demux);
284   templ = gst_element_class_get_pad_template (klass, "src_%u");
285   padname = g_strdup_printf ("src_%u", ssrc);
286   rtp_pad = gst_pad_new_from_template (templ, padname);
287   g_free (padname);
288
289   templ = gst_element_class_get_pad_template (klass, "rtcp_src_%u");
290   padname = g_strdup_printf ("rtcp_src_%u", ssrc);
291   rtcp_pad = gst_pad_new_from_template (templ, padname);
292   g_free (padname);
293
294   /* wrap in structure and add to list */
295   demuxpad = g_new0 (GstRtpSsrcDemuxPad, 1);
296   demuxpad->ssrc = ssrc;
297   demuxpad->rtp_pad = rtp_pad;
298   demuxpad->rtcp_pad = rtcp_pad;
299
300   gst_pad_set_element_private (rtp_pad, demuxpad);
301   gst_pad_set_element_private (rtcp_pad, demuxpad);
302
303   demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
304
305   gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query);
306   gst_pad_set_iterate_internal_links_function (rtp_pad,
307       gst_rtp_ssrc_demux_iterate_internal_links_src);
308   gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
309   gst_pad_use_fixed_caps (rtp_pad);
310   gst_pad_set_active (rtp_pad, TRUE);
311
312   gst_pad_set_event_function (rtcp_pad, gst_rtp_ssrc_demux_src_event);
313   gst_pad_set_iterate_internal_links_function (rtcp_pad,
314       gst_rtp_ssrc_demux_iterate_internal_links_src);
315   gst_pad_use_fixed_caps (rtcp_pad);
316   gst_pad_set_active (rtcp_pad, TRUE);
317
318   if (padtype == RTP_PAD) {
319     demuxpad->pushed_initial_rtp_events = TRUE;
320     forward_initial_events (demux, ssrc, rtp_pad, padtype);
321   } else if (padtype == RTCP_PAD) {
322     demuxpad->pushed_initial_rtcp_events = TRUE;
323     forward_initial_events (demux, ssrc, rtcp_pad, padtype);
324   } else {
325     g_assert_not_reached ();
326   }
327
328   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
329   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad);
330
331   switch (padtype) {
332     case RTP_PAD:
333       retpad = gst_object_ref (demuxpad->rtp_pad);
334       break;
335     case RTCP_PAD:
336       retpad = gst_object_ref (demuxpad->rtcp_pad);
337       break;
338     default:
339       retpad = NULL;
340       g_assert_not_reached ();
341   }
342
343   gst_object_ref (rtp_pad);
344   gst_object_ref (rtcp_pad);
345
346   rtp_block = gst_pad_add_probe (rtp_pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
347       NULL, NULL, NULL);
348   rtcp_block = gst_pad_add_probe (rtcp_pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
349       NULL, NULL, NULL);
350
351   GST_PAD_UNLOCK (demux);
352
353   g_signal_emit (G_OBJECT (demux),
354       gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad);
355
356   gst_pad_remove_probe (rtp_pad, rtp_block);
357   gst_pad_remove_probe (rtcp_pad, rtcp_block);
358
359   gst_object_unref (rtp_pad);
360   gst_object_unref (rtcp_pad);
361
362   return retpad;
363 }
364
365 static void
366 gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass)
367 {
368   GObjectClass *gobject_klass;
369   GstElementClass *gstelement_klass;
370   GstRtpSsrcDemuxClass *gstrtpssrcdemux_klass;
371
372   gobject_klass = (GObjectClass *) klass;
373   gstelement_klass = (GstElementClass *) klass;
374   gstrtpssrcdemux_klass = (GstRtpSsrcDemuxClass *) klass;
375
376   gobject_klass->dispose = gst_rtp_ssrc_demux_dispose;
377   gobject_klass->finalize = gst_rtp_ssrc_demux_finalize;
378
379   /**
380    * GstRtpSsrcDemux::new-ssrc-pad:
381    * @demux: the object which received the signal
382    * @ssrc: the SSRC of the pad
383    * @pad: the new pad.
384    *
385    * Emited when a new SSRC pad has been created.
386    */
387   gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD] =
388       g_signal_new ("new-ssrc-pad",
389       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
390       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, new_ssrc_pad),
391       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_OBJECT,
392       G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
393
394   /**
395    * GstRtpSsrcDemux::removed-ssrc-pad:
396    * @demux: the object which received the signal
397    * @ssrc: the SSRC of the pad
398    * @pad: the removed pad.
399    *
400    * Emited when a SSRC pad has been removed.
401    */
402   gst_rtp_ssrc_demux_signals[SIGNAL_REMOVED_SSRC_PAD] =
403       g_signal_new ("removed-ssrc-pad",
404       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
405       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, removed_ssrc_pad),
406       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_OBJECT,
407       G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
408
409   /**
410    * GstRtpSsrcDemux::clear-ssrc:
411    * @demux: the object which received the signal
412    * @ssrc: the SSRC of the pad
413    *
414    * Action signal to remove the pad for SSRC.
415    */
416   gst_rtp_ssrc_demux_signals[SIGNAL_CLEAR_SSRC] =
417       g_signal_new ("clear-ssrc",
418       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
419       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, clear_ssrc),
420       NULL, NULL, gst_rtp_bin_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
421
422   gstelement_klass->change_state =
423       GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_change_state);
424   gstrtpssrcdemux_klass->clear_ssrc =
425       GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_clear_ssrc);
426
427   gst_element_class_add_pad_template (gstelement_klass,
428       gst_static_pad_template_get (&rtp_ssrc_demux_sink_template));
429   gst_element_class_add_pad_template (gstelement_klass,
430       gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_sink_template));
431   gst_element_class_add_pad_template (gstelement_klass,
432       gst_static_pad_template_get (&rtp_ssrc_demux_src_template));
433   gst_element_class_add_pad_template (gstelement_klass,
434       gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_src_template));
435
436   gst_element_class_set_static_metadata (gstelement_klass, "RTP SSRC Demux",
437       "Demux/Network/RTP",
438       "Splits RTP streams based on the SSRC",
439       "Wim Taymans <wim.taymans@gmail.com>");
440
441   GST_DEBUG_CATEGORY_INIT (gst_rtp_ssrc_demux_debug,
442       "rtpssrcdemux", 0, "RTP SSRC demuxer");
443 }
444
445 static void
446 gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux)
447 {
448   GstElementClass *klass = GST_ELEMENT_GET_CLASS (demux);
449
450   demux->rtp_sink =
451       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
452           "sink"), "sink");
453   gst_pad_set_chain_function (demux->rtp_sink, gst_rtp_ssrc_demux_chain);
454   gst_pad_set_event_function (demux->rtp_sink, gst_rtp_ssrc_demux_sink_event);
455   gst_pad_set_iterate_internal_links_function (demux->rtp_sink,
456       gst_rtp_ssrc_demux_iterate_internal_links_sink);
457   gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtp_sink);
458
459   demux->rtcp_sink =
460       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
461           "rtcp_sink"), "rtcp_sink");
462   gst_pad_set_chain_function (demux->rtcp_sink, gst_rtp_ssrc_demux_rtcp_chain);
463   gst_pad_set_event_function (demux->rtcp_sink, gst_rtp_ssrc_demux_sink_event);
464   gst_pad_set_iterate_internal_links_function (demux->rtcp_sink,
465       gst_rtp_ssrc_demux_iterate_internal_links_sink);
466   gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
467
468   g_rec_mutex_init (&demux->padlock);
469 }
470
471 static void
472 gst_rtp_ssrc_demux_reset (GstRtpSsrcDemux * demux)
473 {
474   GSList *walk;
475
476   for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
477     GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
478
479     gst_pad_set_active (dpad->rtp_pad, FALSE);
480     gst_pad_set_active (dpad->rtcp_pad, FALSE);
481
482     gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtp_pad);
483     gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtcp_pad);
484     g_free (dpad);
485   }
486   g_slist_free (demux->srcpads);
487   demux->srcpads = NULL;
488 }
489
490 static void
491 gst_rtp_ssrc_demux_dispose (GObject * object)
492 {
493   GstRtpSsrcDemux *demux;
494
495   demux = GST_RTP_SSRC_DEMUX (object);
496
497   gst_rtp_ssrc_demux_reset (demux);
498
499   G_OBJECT_CLASS (parent_class)->dispose (object);
500 }
501
502 static void
503 gst_rtp_ssrc_demux_finalize (GObject * object)
504 {
505   GstRtpSsrcDemux *demux;
506
507   demux = GST_RTP_SSRC_DEMUX (object);
508   g_rec_mutex_clear (&demux->padlock);
509
510   G_OBJECT_CLASS (parent_class)->finalize (object);
511 }
512
513 static void
514 gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
515 {
516   GstRtpSsrcDemuxPad *dpad;
517
518   GST_PAD_LOCK (demux);
519   dpad = find_demux_pad_for_ssrc (demux, ssrc);
520   if (dpad == NULL) {
521     GST_PAD_UNLOCK (demux);
522     goto unknown_pad;
523   }
524
525   GST_DEBUG_OBJECT (demux, "clearing pad for SSRC %08x", ssrc);
526
527   demux->srcpads = g_slist_remove (demux->srcpads, dpad);
528   GST_PAD_UNLOCK (demux);
529
530   gst_pad_set_active (dpad->rtp_pad, FALSE);
531   gst_pad_set_active (dpad->rtcp_pad, FALSE);
532
533   g_signal_emit (G_OBJECT (demux),
534       gst_rtp_ssrc_demux_signals[SIGNAL_REMOVED_SSRC_PAD], 0, ssrc,
535       dpad->rtp_pad);
536
537   gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtp_pad);
538   gst_element_remove_pad (GST_ELEMENT_CAST (demux), dpad->rtcp_pad);
539
540   g_free (dpad);
541
542   return;
543
544   /* ERRORS */
545 unknown_pad:
546   {
547     GST_WARNING_OBJECT (demux, "unknown SSRC %08x", ssrc);
548     return;
549   }
550 }
551
552 struct ForwardEventData
553 {
554   GstRtpSsrcDemux *demux;
555   GstEvent *event;
556   gboolean res;
557   GstPad *pad;
558 };
559
560 static gboolean
561 forward_event (GstPad * pad, gpointer user_data)
562 {
563   struct ForwardEventData *fdata = user_data;
564   GSList *walk = NULL;
565   GstEvent *newevent = NULL;
566
567   GST_PAD_LOCK (fdata->demux);
568   for (walk = fdata->demux->srcpads; walk; walk = walk->next) {
569     GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
570
571     /* Only forward the event if the initial events have been through first,
572      * the initial events should be forwarded before any other event
573      * or buffer is pushed */
574     if ((pad == dpad->rtp_pad && dpad->pushed_initial_rtp_events) ||
575         (pad == dpad->rtcp_pad && dpad->pushed_initial_rtcp_events)) {
576       newevent = add_ssrc_and_ref (fdata->event, dpad->ssrc);
577       break;
578     }
579   }
580   GST_PAD_UNLOCK (fdata->demux);
581
582   if (newevent)
583     fdata->res &= gst_pad_push_event (pad, newevent);
584
585   return TRUE;
586 }
587
588
589 static gboolean
590 gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
591     GstEvent * event)
592 {
593   GstRtpSsrcDemux *demux;
594   struct ForwardEventData fdata;
595
596   demux = GST_RTP_SSRC_DEMUX (parent);
597
598   fdata.demux = demux;
599   fdata.pad = pad;
600   fdata.event = event;
601   fdata.res = TRUE;
602
603   gst_pad_forward (pad, forward_event, &fdata);
604
605   gst_event_unref (event);
606
607   return fdata.res;
608 }
609
610 static GstFlowReturn
611 gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
612 {
613   GstFlowReturn ret;
614   GstRtpSsrcDemux *demux;
615   guint32 ssrc;
616   GstRTPBuffer rtp = { NULL };
617   GstPad *srcpad;
618   GstRtpSsrcDemuxPad *dpad;
619
620   demux = GST_RTP_SSRC_DEMUX (parent);
621
622   if (!gst_rtp_buffer_map (buf, GST_MAP_READ, &rtp))
623     goto invalid_payload;
624
625   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
626   gst_rtp_buffer_unmap (&rtp);
627
628   GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc);
629
630   srcpad = find_or_create_demux_pad_for_ssrc (demux, ssrc, RTP_PAD);
631   if (srcpad == NULL)
632     goto create_failed;
633
634   /* push to srcpad */
635   ret = gst_pad_push (srcpad, buf);
636
637   if (ret != GST_FLOW_OK) {
638     /* check if the ssrc still there, may have been removed */
639     GST_PAD_LOCK (demux);
640     dpad = find_demux_pad_for_ssrc (demux, ssrc);
641     if (dpad == NULL || dpad->rtp_pad != srcpad) {
642       /* SSRC was removed during the push ... ignore the error */
643       ret = GST_FLOW_OK;
644     }
645     GST_PAD_UNLOCK (demux);
646   }
647
648   gst_object_unref (srcpad);
649
650   return ret;
651
652   /* ERRORS */
653 invalid_payload:
654   {
655     /* this is fatal and should be filtered earlier */
656     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
657         ("Dropping invalid RTP payload"));
658     gst_buffer_unref (buf);
659     return GST_FLOW_ERROR;
660   }
661 create_failed:
662   {
663     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
664         ("Could not create new pad"));
665     gst_buffer_unref (buf);
666     return GST_FLOW_ERROR;
667   }
668 }
669
670 static GstFlowReturn
671 gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
672     GstBuffer * buf)
673 {
674   GstFlowReturn ret;
675   GstRtpSsrcDemux *demux;
676   guint32 ssrc;
677   GstRTCPPacket packet;
678   GstRTCPBuffer rtcp = { NULL, };
679   GstPad *srcpad;
680   GstRtpSsrcDemuxPad *dpad;
681
682   demux = GST_RTP_SSRC_DEMUX (parent);
683
684   if (!gst_rtcp_buffer_validate (buf))
685     goto invalid_rtcp;
686
687   gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
688   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
689     gst_rtcp_buffer_unmap (&rtcp);
690     goto invalid_rtcp;
691   }
692
693   /* first packet must be SR or RR or else the validate would have failed */
694   switch (gst_rtcp_packet_get_type (&packet)) {
695     case GST_RTCP_TYPE_SR:
696       /* get the ssrc so that we can route it to the right source pad */
697       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL,
698           NULL);
699       break;
700     default:
701       goto unexpected_rtcp;
702   }
703   gst_rtcp_buffer_unmap (&rtcp);
704
705   GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc);
706
707   srcpad = find_or_create_demux_pad_for_ssrc (demux, ssrc, RTCP_PAD);
708   if (srcpad == NULL)
709     goto create_failed;
710
711   /* push to srcpad */
712   ret = gst_pad_push (srcpad, buf);
713
714   if (ret != GST_FLOW_OK) {
715     /* check if the ssrc still there, may have been removed */
716     GST_PAD_LOCK (demux);
717     dpad = find_demux_pad_for_ssrc (demux, ssrc);
718     if (dpad == NULL || dpad->rtcp_pad != srcpad) {
719       /* SSRC was removed during the push ... ignore the error */
720       ret = GST_FLOW_OK;
721     }
722     GST_PAD_UNLOCK (demux);
723   }
724
725   gst_object_unref (srcpad);
726
727   return ret;
728
729   /* ERRORS */
730 invalid_rtcp:
731   {
732     /* this is fatal and should be filtered earlier */
733     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
734         ("Dropping invalid RTCP packet"));
735     gst_buffer_unref (buf);
736     return GST_FLOW_ERROR;
737   }
738 unexpected_rtcp:
739   {
740     GST_DEBUG_OBJECT (demux, "dropping unexpected RTCP packet");
741     gst_buffer_unref (buf);
742     return GST_FLOW_OK;
743   }
744 create_failed:
745   {
746     GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
747         ("Could not create new pad"));
748     gst_buffer_unref (buf);
749     return GST_FLOW_ERROR;
750   }
751 }
752
753 static GstRtpSsrcDemuxPad *
754 find_demux_pad_for_pad (GstRtpSsrcDemux * demux, GstPad * pad)
755 {
756   GSList *walk;
757
758   for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
759     GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
760     if (dpad->rtp_pad == pad || dpad->rtcp_pad == pad) {
761       return dpad;
762     }
763   }
764
765   return NULL;
766 }
767
768
769 static gboolean
770 gst_rtp_ssrc_demux_src_event (GstPad * pad, GstObject * parent,
771     GstEvent * event)
772 {
773   GstRtpSsrcDemux *demux;
774   const GstStructure *s;
775
776   demux = GST_RTP_SSRC_DEMUX (parent);
777
778   switch (GST_EVENT_TYPE (event)) {
779     case GST_EVENT_CUSTOM_UPSTREAM:
780     case GST_EVENT_CUSTOM_BOTH:
781     case GST_EVENT_CUSTOM_BOTH_OOB:
782       s = gst_event_get_structure (event);
783       if (s && !gst_structure_has_field (s, "ssrc")) {
784         GstRtpSsrcDemuxPad *dpad = find_demux_pad_for_pad (demux, pad);
785
786         if (dpad) {
787           GstStructure *ws;
788
789           event = gst_event_make_writable (event);
790           ws = gst_event_writable_structure (event);
791           gst_structure_set (ws, "ssrc", G_TYPE_UINT, dpad->ssrc, NULL);
792         }
793       }
794       break;
795     default:
796       break;
797   }
798
799   return gst_pad_event_default (pad, parent, event);
800 }
801
802 static GstIterator *
803 gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent)
804 {
805   GstRtpSsrcDemux *demux;
806   GstPad *otherpad = NULL;
807   GstIterator *it = NULL;
808   GSList *current;
809
810   demux = GST_RTP_SSRC_DEMUX (parent);
811
812   GST_PAD_LOCK (demux);
813   for (current = demux->srcpads; current; current = g_slist_next (current)) {
814     GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) current->data;
815
816     if (pad == dpad->rtp_pad) {
817       otherpad = demux->rtp_sink;
818       break;
819     } else if (pad == dpad->rtcp_pad) {
820       otherpad = demux->rtcp_sink;
821       break;
822     }
823   }
824   if (otherpad) {
825     GValue val = { 0, };
826
827     g_value_init (&val, GST_TYPE_PAD);
828     g_value_set_object (&val, otherpad);
829     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
830     g_value_unset (&val);
831
832   }
833   GST_PAD_UNLOCK (demux);
834
835   return it;
836 }
837
838 /* Should return 0 for elements to be included */
839 static gint
840 src_pad_compare_func (gconstpointer a, gconstpointer b)
841 {
842   GstPad *pad = GST_PAD (g_value_get_object (a));
843   const gchar *prefix = g_value_get_string (b);
844   gint res;
845
846   /* 0 means equal means we accept the pad, accepted if there is a name
847    * and it starts with the prefix */
848   GST_OBJECT_LOCK (pad);
849   res = !GST_PAD_NAME (pad) || !g_str_has_prefix (GST_PAD_NAME (pad), prefix);
850   GST_OBJECT_UNLOCK (pad);
851
852   return res;
853 }
854
855 static GstIterator *
856 gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * pad,
857     GstObject * parent)
858 {
859   GstRtpSsrcDemux *demux;
860   GstIterator *it = NULL;
861   GValue gval = { 0, };
862
863   demux = GST_RTP_SSRC_DEMUX (parent);
864
865   g_value_init (&gval, G_TYPE_STRING);
866   if (pad == demux->rtp_sink)
867     g_value_set_static_string (&gval, "src_");
868   else if (pad == demux->rtcp_sink)
869     g_value_set_static_string (&gval, "rtcp_src_");
870   else
871     g_assert_not_reached ();
872
873   it = gst_element_iterate_src_pads (GST_ELEMENT_CAST (demux));
874   it = gst_iterator_filter (it, src_pad_compare_func, &gval);
875
876   return it;
877 }
878
879
880 static gboolean
881 gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
882     GstQuery * query)
883 {
884   GstRtpSsrcDemux *demux;
885   gboolean res = FALSE;
886
887   demux = GST_RTP_SSRC_DEMUX (parent);
888
889   switch (GST_QUERY_TYPE (query)) {
890     case GST_QUERY_LATENCY:
891     {
892
893       if ((res = gst_pad_peer_query (demux->rtp_sink, query))) {
894         gboolean live;
895         GstClockTime min_latency, max_latency;
896         GstRtpSsrcDemuxPad *demuxpad;
897
898         demuxpad = gst_pad_get_element_private (pad);
899
900         gst_query_parse_latency (query, &live, &min_latency, &max_latency);
901
902         GST_DEBUG_OBJECT (demux, "peer min latency %" GST_TIME_FORMAT,
903             GST_TIME_ARGS (min_latency));
904
905         GST_DEBUG_OBJECT (demux, "latency for SSRC %08x", demuxpad->ssrc);
906
907         gst_query_set_latency (query, live, min_latency, max_latency);
908       }
909       break;
910     }
911     default:
912       res = gst_pad_query_default (pad, parent, query);
913       break;
914   }
915
916   return res;
917 }
918
919 static GstStateChangeReturn
920 gst_rtp_ssrc_demux_change_state (GstElement * element,
921     GstStateChange transition)
922 {
923   GstStateChangeReturn ret;
924   GstRtpSsrcDemux *demux;
925
926   demux = GST_RTP_SSRC_DEMUX (element);
927
928   switch (transition) {
929     case GST_STATE_CHANGE_NULL_TO_READY:
930     case GST_STATE_CHANGE_READY_TO_PAUSED:
931     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
932     default:
933       break;
934   }
935
936   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
937
938   switch (transition) {
939     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
940       break;
941     case GST_STATE_CHANGE_PAUSED_TO_READY:
942       gst_rtp_ssrc_demux_reset (demux);
943       break;
944     case GST_STATE_CHANGE_READY_TO_NULL:
945     default:
946       break;
947   }
948   return ret;
949 }