rtpmanager: Update codes based on 1.18.4
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpssrcdemux.c
index 7f9918f..a1fee7b 100644 (file)
  *
  * You should have received a copy of the GNU Library General Public
  * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
  */
 
 /**
- * SECTION:element-gstrtpssrcdemux
+ * SECTION:element-rtpssrcdemux
+ * @title: rtpssrcdemux
  *
- * gstrtpssrcdemux acts as a demuxer for RTP packets based on the SSRC of the
+ * rtpssrcdemux acts as a demuxer for RTP packets based on the SSRC of the
  * packets. Its main purpose is to allow an application to easily receive and
  * decode an RTP stream with multiple SSRCs.
- * 
+ *
  * For each SSRC that is detected, a new pad will be created and the
- * #GstRtpSsrcDemux::new-ssrc-pad signal will be emitted. 
- * 
- * <refsect2>
- * <title>Example pipelines</title>
+ * #GstRtpSsrcDemux::new-ssrc-pad signal will be emitted.
+ *
+ * ## Example pipelines
  * |[
- * gst-launch udpsrc caps="application/x-rtp" ! gstrtpssrcdemux ! fakesink
+ * gst-launch-1.0 udpsrc caps="application/x-rtp" ! rtpssrcdemux ! fakesink
  * ]| Takes an RTP stream and send the RTP packets with the first detected SSRC
  * to fakesink, discarding the other SSRCs.
- * </refsect2>
  *
- * Last reviewed on 2007-05-28 (0.10.5)
  */
 
 #ifdef HAVE_CONFIG_H
@@ -48,7 +46,6 @@
 #include <gst/rtp/gstrtpbuffer.h>
 #include <gst/rtp/gstrtcpbuffer.h>
 
-#include "gstrtpbin-marshal.h"
 #include "gstrtpssrcdemux.h"
 
 GST_DEBUG_CATEGORY_STATIC (gst_rtp_ssrc_demux_debug);
@@ -83,8 +80,21 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_src_%u",
     GST_STATIC_CAPS ("application/x-rtcp")
     );
 
-#define GST_PAD_LOCK(obj)   (g_static_rec_mutex_lock (&(obj)->padlock))
-#define GST_PAD_UNLOCK(obj) (g_static_rec_mutex_unlock (&(obj)->padlock))
+#define INTERNAL_STREAM_LOCK(obj)   (g_rec_mutex_lock (&(obj)->padlock))
+#define INTERNAL_STREAM_UNLOCK(obj) (g_rec_mutex_unlock (&(obj)->padlock))
+
+typedef enum
+{
+  RTP_PAD,
+  RTCP_PAD
+} PadType;
+
+#define DEFAULT_MAX_STREAMS G_MAXUINT
+enum
+{
+  PROP_0,
+  PROP_MAX_STREAMS
+};
 
 /* signals */
 enum
@@ -117,8 +127,6 @@ static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
 
 static GstFlowReturn gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad,
     GstObject * parent, GstBuffer * buf);
-static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad,
-    GstObject * parent, GstEvent * event);
 static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad *
     pad, GstObject * parent);
 
@@ -144,6 +152,7 @@ struct _GstRtpSsrcDemuxPad
 };
 
 /* find a src pad for a given SSRC, returns NULL if the SSRC was not found
+ * MUST be called with object lock
  */
 static GstRtpSsrcDemuxPad *
 find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
@@ -159,24 +168,137 @@ find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
   return NULL;
 }
 
-/* with PAD_LOCK */
-static GstRtpSsrcDemuxPad *
-find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
+/* returns a reference to the pad if found, %NULL otherwise */
+static GstPad *
+get_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, PadType padtype)
+{
+  GstRtpSsrcDemuxPad *demuxpad;
+  GstPad *retpad;
+
+  GST_OBJECT_LOCK (demux);
+
+  demuxpad = find_demux_pad_for_ssrc (demux, ssrc);
+  if (!demuxpad) {
+    GST_OBJECT_UNLOCK (demux);
+    return NULL;
+  }
+
+  switch (padtype) {
+    case RTP_PAD:
+      retpad = gst_object_ref (demuxpad->rtp_pad);
+      break;
+    case RTCP_PAD:
+      retpad = gst_object_ref (demuxpad->rtcp_pad);
+      break;
+    default:
+      retpad = NULL;
+      g_assert_not_reached ();
+  }
+
+  GST_OBJECT_UNLOCK (demux);
+
+  return retpad;
+}
+
+static GstEvent *
+add_ssrc_and_ref (GstEvent * event, guint32 ssrc)
+{
+  /* Set the ssrc on the output caps */
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_CAPS:
+    {
+      GstCaps *caps;
+      GstCaps *newcaps;
+      GstStructure *s;
+
+      gst_event_parse_caps (event, &caps);
+      newcaps = gst_caps_copy (caps);
+
+      s = gst_caps_get_structure (newcaps, 0);
+      gst_structure_set (s, "ssrc", G_TYPE_UINT, ssrc, NULL);
+      event = gst_event_new_caps (newcaps);
+      gst_caps_unref (newcaps);
+      break;
+    }
+    default:
+      gst_event_ref (event);
+      break;
+  }
+
+  return event;
+}
+
+struct ForwardStickyEventData
+{
+  GstPad *pad;
+  guint32 ssrc;
+};
+
+/* With internal stream lock held */
+static gboolean
+forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
+{
+  struct ForwardStickyEventData *data = user_data;
+  GstEvent *newevent;
+
+  newevent = add_ssrc_and_ref (*event, data->ssrc);
+
+  gst_pad_push_event (data->pad, newevent);
+
+  return TRUE;
+}
+
+/* With internal stream lock held */
+static void
+forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad,
+    PadType padtype)
+{
+  struct ForwardStickyEventData fdata;
+  GstPad *sinkpad = NULL;
+
+  if (padtype == RTP_PAD)
+    sinkpad = demux->rtp_sink;
+  else if (padtype == RTCP_PAD)
+    sinkpad = demux->rtcp_sink;
+  else
+    g_assert_not_reached ();
+
+  fdata.ssrc = ssrc;
+  fdata.pad = pad;
+
+  gst_pad_sticky_events_foreach (sinkpad, forward_sticky_events, &fdata);
+}
+
+/* MUST only be called from streaming thread */
+static GstPad *
+find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
+    PadType padtype)
 {
   GstPad *rtp_pad, *rtcp_pad;
   GstElementClass *klass;
   GstPadTemplate *templ;
   gchar *padname;
   GstRtpSsrcDemuxPad *demuxpad;
-  GstCaps *caps;
+  GstPad *retpad;
+  guint num_streams;
 
-  GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
+  INTERNAL_STREAM_LOCK (demux);
 
-  demuxpad = find_demux_pad_for_ssrc (demux, ssrc);
-  if (demuxpad != NULL) {
-    return demuxpad;
+  retpad = get_demux_pad_for_ssrc (demux, ssrc, padtype);
+  if (retpad != NULL) {
+    INTERNAL_STREAM_UNLOCK (demux);
+    return retpad;
+  }
+  /* We create 2 src pads per ssrc (RTP & RTCP). Checking if we are allowed
+     to create 2 more pads */
+  num_streams = (GST_ELEMENT_CAST (demux)->numsrcpads) >> 1;
+  if (num_streams >= demux->max_streams) {
+    INTERNAL_STREAM_UNLOCK (demux);
+    return NULL;
   }
 
+  GST_DEBUG_OBJECT (demux, "creating new pad for SSRC %08x", ssrc);
+
   klass = GST_ELEMENT_GET_CLASS (demux);
   templ = gst_element_class_get_pad_template (klass, "src_%u");
   padname = g_strdup_printf ("src_%u", ssrc);
@@ -197,36 +319,81 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
   gst_pad_set_element_private (rtp_pad, demuxpad);
   gst_pad_set_element_private (rtcp_pad, demuxpad);
 
+  GST_OBJECT_LOCK (demux);
   demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
+  GST_OBJECT_UNLOCK (demux);
 
-  /* copy caps from input */
-  caps = gst_pad_get_current_caps (demux->rtp_sink);
-  gst_pad_set_caps (rtp_pad, caps);
-  gst_caps_unref (caps);
-  gst_pad_use_fixed_caps (rtp_pad);
-  caps = gst_pad_get_current_caps (demux->rtcp_sink);
-  gst_pad_set_caps (rtcp_pad, caps);
-  gst_caps_unref (caps);
-  gst_pad_use_fixed_caps (rtcp_pad);
-
-  gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
   gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query);
   gst_pad_set_iterate_internal_links_function (rtp_pad,
       gst_rtp_ssrc_demux_iterate_internal_links_src);
+  gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
+  gst_pad_use_fixed_caps (rtp_pad);
   gst_pad_set_active (rtp_pad, TRUE);
 
   gst_pad_set_event_function (rtcp_pad, gst_rtp_ssrc_demux_src_event);
   gst_pad_set_iterate_internal_links_function (rtcp_pad,
       gst_rtp_ssrc_demux_iterate_internal_links_src);
+  gst_pad_use_fixed_caps (rtcp_pad);
   gst_pad_set_active (rtcp_pad, TRUE);
 
+  forward_initial_events (demux, ssrc, rtp_pad, RTP_PAD);
+  forward_initial_events (demux, ssrc, rtcp_pad, RTCP_PAD);
+
   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad);
 
+  switch (padtype) {
+    case RTP_PAD:
+      retpad = gst_object_ref (demuxpad->rtp_pad);
+      break;
+    case RTCP_PAD:
+      retpad = gst_object_ref (demuxpad->rtcp_pad);
+      break;
+    default:
+      retpad = NULL;
+      g_assert_not_reached ();
+  }
+
   g_signal_emit (G_OBJECT (demux),
       gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad);
 
-  return demuxpad;
+  INTERNAL_STREAM_UNLOCK (demux);
+
+  return retpad;
+}
+
+static void
+gst_rtp_ssrc_demux_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstRtpSsrcDemux *demux;
+
+  demux = GST_RTP_SSRC_DEMUX (object);
+  switch (prop_id) {
+    case PROP_MAX_STREAMS:
+      demux->max_streams = g_value_get_uint (value);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+gst_rtp_ssrc_demux_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  GstRtpSsrcDemux *demux;
+
+  demux = GST_RTP_SSRC_DEMUX (object);
+  switch (prop_id) {
+    case PROP_MAX_STREAMS:
+      g_value_set_uint (value, demux->max_streams);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
 }
 
 static void
@@ -242,6 +409,14 @@ gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass)
 
   gobject_klass->dispose = gst_rtp_ssrc_demux_dispose;
   gobject_klass->finalize = gst_rtp_ssrc_demux_finalize;
+  gobject_klass->set_property = gst_rtp_ssrc_demux_set_property;
+  gobject_klass->get_property = gst_rtp_ssrc_demux_get_property;
+
+  g_object_class_install_property (gobject_klass, PROP_MAX_STREAMS,
+      g_param_spec_uint ("max-streams", "Max Streams",
+          "The maximum number of streams allowed",
+          0, G_MAXUINT, DEFAULT_MAX_STREAMS,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   /**
    * GstRtpSsrcDemux::new-ssrc-pad:
@@ -249,14 +424,13 @@ gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass)
    * @ssrc: the SSRC of the pad
    * @pad: the new pad.
    *
-   * Emited when a new SSRC pad has been created.
+   * Emitted when a new SSRC pad has been created.
    */
   gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD] =
       g_signal_new ("new-ssrc-pad",
       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, new_ssrc_pad),
-      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_OBJECT,
-      G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
+      NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
 
   /**
    * GstRtpSsrcDemux::removed-ssrc-pad:
@@ -264,14 +438,13 @@ gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass)
    * @ssrc: the SSRC of the pad
    * @pad: the removed pad.
    *
-   * Emited when a SSRC pad has been removed.
+   * Emitted when a SSRC pad has been removed.
    */
   gst_rtp_ssrc_demux_signals[SIGNAL_REMOVED_SSRC_PAD] =
       g_signal_new ("removed-ssrc-pad",
       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, removed_ssrc_pad),
-      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_OBJECT,
-      G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
+      NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
 
   /**
    * GstRtpSsrcDemux::clear-ssrc:
@@ -284,29 +457,32 @@ gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass)
       g_signal_new ("clear-ssrc",
       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, clear_ssrc),
-      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
+      NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
 
   gstelement_klass->change_state =
       GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_change_state);
   gstrtpssrcdemux_klass->clear_ssrc =
       GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_clear_ssrc);
 
-  gst_element_class_add_pad_template (gstelement_klass,
-      gst_static_pad_template_get (&rtp_ssrc_demux_sink_template));
-  gst_element_class_add_pad_template (gstelement_klass,
-      gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_sink_template));
-  gst_element_class_add_pad_template (gstelement_klass,
-      gst_static_pad_template_get (&rtp_ssrc_demux_src_template));
-  gst_element_class_add_pad_template (gstelement_klass,
-      gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_src_template));
+  gst_element_class_add_static_pad_template (gstelement_klass,
+      &rtp_ssrc_demux_sink_template);
+  gst_element_class_add_static_pad_template (gstelement_klass,
+      &rtp_ssrc_demux_rtcp_sink_template);
+  gst_element_class_add_static_pad_template (gstelement_klass,
+      &rtp_ssrc_demux_src_template);
+  gst_element_class_add_static_pad_template (gstelement_klass,
+      &rtp_ssrc_demux_rtcp_src_template);
 
-  gst_element_class_set_details_simple (gstelement_klass, "RTP SSRC Demux",
+  gst_element_class_set_static_metadata (gstelement_klass, "RTP SSRC Demux",
       "Demux/Network/RTP",
       "Splits RTP streams based on the SSRC",
       "Wim Taymans <wim.taymans@gmail.com>");
 
   GST_DEBUG_CATEGORY_INIT (gst_rtp_ssrc_demux_debug,
       "rtpssrcdemux", 0, "RTP SSRC demuxer");
+
+  GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_ssrc_demux_chain);
+  GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_ssrc_demux_rtcp_chain);
 }
 
 static void
@@ -327,15 +503,14 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux)
       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
           "rtcp_sink"), "rtcp_sink");
   gst_pad_set_chain_function (demux->rtcp_sink, gst_rtp_ssrc_demux_rtcp_chain);
-  gst_pad_set_event_function (demux->rtcp_sink,
-      gst_rtp_ssrc_demux_rtcp_sink_event);
+  gst_pad_set_event_function (demux->rtcp_sink, gst_rtp_ssrc_demux_sink_event);
   gst_pad_set_iterate_internal_links_function (demux->rtcp_sink,
       gst_rtp_ssrc_demux_iterate_internal_links_sink);
   gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
 
-  g_static_rec_mutex_init (&demux->padlock);
+  demux->max_streams = DEFAULT_MAX_STREAMS;
 
-  gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
+  g_rec_mutex_init (&demux->padlock);
 }
 
 static void
@@ -375,7 +550,7 @@ gst_rtp_ssrc_demux_finalize (GObject * object)
   GstRtpSsrcDemux *demux;
 
   demux = GST_RTP_SSRC_DEMUX (object);
-  g_static_rec_mutex_free (&demux->padlock);
+  g_rec_mutex_clear (&demux->padlock);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -385,17 +560,17 @@ gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
 {
   GstRtpSsrcDemuxPad *dpad;
 
-  GST_PAD_LOCK (demux);
+  GST_OBJECT_LOCK (demux);
   dpad = find_demux_pad_for_ssrc (demux, ssrc);
   if (dpad == NULL) {
-    GST_PAD_UNLOCK (demux);
+    GST_OBJECT_UNLOCK (demux);
     goto unknown_pad;
   }
 
   GST_DEBUG_OBJECT (demux, "clearing pad for SSRC %08x", ssrc);
 
   demux->srcpads = g_slist_remove (demux->srcpads, dpad);
-  GST_PAD_UNLOCK (demux);
+  GST_OBJECT_UNLOCK (demux);
 
   gst_pad_set_active (dpad->rtp_pad, FALSE);
   gst_pad_set_active (dpad->rtcp_pad, FALSE);
@@ -419,87 +594,58 @@ unknown_pad:
   }
 }
 
-static gboolean
-gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
-    GstEvent * event)
+struct ForwardEventData
 {
   GstRtpSsrcDemux *demux;
-  gboolean res = FALSE;
+  GstEvent *event;
+  gboolean res;
+  GstPad *pad;
+};
 
-  demux = GST_RTP_SSRC_DEMUX (parent);
+static gboolean
+forward_event (GstPad * pad, gpointer user_data)
+{
+  struct ForwardEventData *fdata = user_data;
+  GSList *walk = NULL;
+  GstEvent *newevent = NULL;
 
-  switch (GST_EVENT_TYPE (event)) {
-    case GST_EVENT_FLUSH_STOP:
-      gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
-    default:
-    {
-      GSList *walk;
-      GSList *pads = NULL;
-
-      res = TRUE;
-      /* need local snapshot of pads;
-       * should not push downstream while holding lock as that might deadlock
-       * with stuff traveling upstream tyring to get this lock while holding
-       * other (stream)lock */
-      GST_PAD_LOCK (demux);
-      for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
-        GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
-
-        pads = g_slist_prepend (pads, gst_object_ref (pad->rtp_pad));
-      }
-      GST_PAD_UNLOCK (demux);
-      for (walk = pads; walk; walk = g_slist_next (walk)) {
-        GstPad *pad = (GstPad *) walk->data;
+  GST_OBJECT_LOCK (fdata->demux);
+  for (walk = fdata->demux->srcpads; walk; walk = walk->next) {
+    GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
 
-        gst_event_ref (event);
-        res &= gst_pad_push_event (pad, event);
-        gst_object_unref (pad);
-      }
-      g_slist_free (pads);
-      gst_event_unref (event);
+    if (pad == dpad->rtp_pad || pad == dpad->rtcp_pad) {
+      newevent = add_ssrc_and_ref (fdata->event, dpad->ssrc);
       break;
     }
   }
+  GST_OBJECT_UNLOCK (fdata->demux);
 
-  return res;
+  if (newevent)
+    fdata->res &= gst_pad_push_event (pad, newevent);
+
+  return FALSE;
 }
 
+
 static gboolean
-gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstObject * parent,
+gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
     GstEvent * event)
 {
   GstRtpSsrcDemux *demux;
-  gboolean res = FALSE;
+  struct ForwardEventData fdata;
 
   demux = GST_RTP_SSRC_DEMUX (parent);
 
-  switch (GST_EVENT_TYPE (event)) {
-    default:
-    {
-      GSList *walk;
-      GSList *pads = NULL;
+  fdata.demux = demux;
+  fdata.pad = pad;
+  fdata.event = event;
+  fdata.res = TRUE;
 
-      res = TRUE;
-      GST_PAD_LOCK (demux);
-      for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
-        GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
+  gst_pad_forward (pad, forward_event, &fdata);
 
-        pads = g_slist_prepend (pads, gst_object_ref (pad->rtcp_pad));
-      }
-      GST_PAD_UNLOCK (demux);
-      for (walk = pads; walk; walk = g_slist_next (walk)) {
-        GstPad *pad = (GstPad *) walk->data;
+  gst_event_unref (event);
 
-        gst_event_ref (event);
-        res &= gst_pad_push_event (pad, event);
-        gst_object_unref (pad);
-      }
-      g_slist_free (pads);
-      gst_event_unref (event);
-      break;
-    }
-  }
-  return res;
+  return fdata.res;
 }
 
 static GstFlowReturn
@@ -508,33 +654,40 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
   GstFlowReturn ret;
   GstRtpSsrcDemux *demux;
   guint32 ssrc;
-  GstRtpSsrcDemuxPad *dpad;
   GstRTPBuffer rtp = { NULL };
   GstPad *srcpad;
 
   demux = GST_RTP_SSRC_DEMUX (parent);
 
-  if (!gst_rtp_buffer_validate (buf))
+  if (!gst_rtp_buffer_map (buf, GST_MAP_READ, &rtp))
     goto invalid_payload;
 
-  gst_rtp_buffer_map (buf, GST_MAP_READ, &rtp);
   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
   gst_rtp_buffer_unmap (&rtp);
 
   GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc);
 
-  GST_PAD_LOCK (demux);
-  dpad = find_or_create_demux_pad_for_ssrc (demux, ssrc);
-  if (dpad == NULL) {
-    GST_PAD_UNLOCK (demux);
+  srcpad = find_or_create_demux_pad_for_ssrc (demux, ssrc, RTP_PAD);
+  if (srcpad == NULL)
     goto create_failed;
-  }
-  srcpad = gst_object_ref (dpad->rtp_pad);
-  GST_PAD_UNLOCK (demux);
 
   /* push to srcpad */
   ret = gst_pad_push (srcpad, buf);
 
+  if (ret != GST_FLOW_OK) {
+    GstPad *active_pad;
+
+    /* check if the ssrc still there, may have been removed */
+    active_pad = get_demux_pad_for_ssrc (demux, ssrc, RTP_PAD);
+
+    if (active_pad == NULL || active_pad != srcpad) {
+      /* SSRC was removed during the push ... ignore the error */
+      ret = GST_FLOW_OK;
+    }
+
+    g_clear_object (&active_pad);
+  }
+
   gst_object_unref (srcpad);
 
   return ret;
@@ -542,18 +695,17 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
   /* ERRORS */
 invalid_payload:
   {
-    /* this is fatal and should be filtered earlier */
-    GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
-        ("Dropping invalid RTP payload"));
+    GST_DEBUG_OBJECT (demux, "Dropping invalid RTP packet");
     gst_buffer_unref (buf);
-    return GST_FLOW_ERROR;
+    return GST_FLOW_OK;
   }
 create_failed:
   {
-    GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
-        ("Could not create new pad"));
     gst_buffer_unref (buf);
-    return GST_FLOW_ERROR;
+    GST_WARNING_OBJECT (demux,
+        "Dropping buffer SSRC %08x. "
+        "Max streams number reached (%u)", ssrc, demux->max_streams);
+    return GST_FLOW_OK;
   }
 }
 
@@ -564,14 +716,13 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
   GstFlowReturn ret;
   GstRtpSsrcDemux *demux;
   guint32 ssrc;
-  GstRtpSsrcDemuxPad *dpad;
   GstRTCPPacket packet;
   GstRTCPBuffer rtcp = { NULL, };
   GstPad *srcpad;
 
   demux = GST_RTP_SSRC_DEMUX (parent);
 
-  if (!gst_rtcp_buffer_validate (buf))
+  if (!gst_rtcp_buffer_validate_reduced (buf))
     goto invalid_rtcp;
 
   gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
@@ -580,13 +731,25 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
     goto invalid_rtcp;
   }
 
-  /* first packet must be SR or RR or else the validate would have failed */
+  /* first packet must be SR or RR, or in case of a reduced size RTCP packet
+   * it must be APP, RTPFB or PSFB feeadback, or else the validate would
+   * have failed */
   switch (gst_rtcp_packet_get_type (&packet)) {
     case GST_RTCP_TYPE_SR:
       /* get the ssrc so that we can route it to the right source pad */
       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL,
           NULL);
       break;
+    case GST_RTCP_TYPE_RR:
+      ssrc = gst_rtcp_packet_rr_get_ssrc (&packet);
+      break;
+    case GST_RTCP_TYPE_APP:
+      ssrc = gst_rtcp_packet_app_get_ssrc (&packet);
+      break;
+    case GST_RTCP_TYPE_RTPFB:
+    case GST_RTCP_TYPE_PSFB:
+      ssrc = gst_rtcp_packet_fb_get_sender_ssrc (&packet);
+      break;
     default:
       goto unexpected_rtcp;
   }
@@ -594,18 +757,26 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
 
   GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc);
 
-  GST_PAD_LOCK (demux);
-  dpad = find_or_create_demux_pad_for_ssrc (demux, ssrc);
-  if (dpad == NULL) {
-    GST_PAD_UNLOCK (demux);
+  srcpad = find_or_create_demux_pad_for_ssrc (demux, ssrc, RTCP_PAD);
+  if (srcpad == NULL)
     goto create_failed;
-  }
-  srcpad = gst_object_ref (dpad->rtcp_pad);
-  GST_PAD_UNLOCK (demux);
 
   /* push to srcpad */
   ret = gst_pad_push (srcpad, buf);
 
+  if (ret != GST_FLOW_OK) {
+    GstPad *active_pad;
+
+    /* check if the ssrc still there, may have been removed */
+    active_pad = get_demux_pad_for_ssrc (demux, ssrc, RTCP_PAD);
+    if (active_pad == NULL || active_pad != srcpad) {
+      /* SSRC was removed during the push ... ignore the error */
+      ret = GST_FLOW_OK;
+    }
+
+    g_clear_object (&active_pad);
+  }
+
   gst_object_unref (srcpad);
 
   return ret;
@@ -613,11 +784,9 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
   /* ERRORS */
 invalid_rtcp:
   {
-    /* this is fatal and should be filtered earlier */
-    GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
-        ("Dropping invalid RTCP packet"));
+    GST_DEBUG_OBJECT (demux, "Dropping invalid RTCP packet");
     gst_buffer_unref (buf);
-    return GST_FLOW_ERROR;
+    return GST_FLOW_OK;
   }
 unexpected_rtcp:
   {
@@ -627,13 +796,30 @@ unexpected_rtcp:
   }
 create_failed:
   {
-    GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
-        ("Could not create new pad"));
     gst_buffer_unref (buf);
-    return GST_FLOW_ERROR;
+    GST_WARNING_OBJECT (demux,
+        "Dropping buffer SSRC %08x. "
+        "Max streams number reached (%u)", ssrc, demux->max_streams);
+    return GST_FLOW_OK;
+  }
+}
+
+static GstRtpSsrcDemuxPad *
+find_demux_pad_for_pad (GstRtpSsrcDemux * demux, GstPad * pad)
+{
+  GSList *walk;
+
+  for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
+    GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
+    if (dpad->rtp_pad == pad || dpad->rtcp_pad == pad) {
+      return dpad;
+    }
   }
+
+  return NULL;
 }
 
+
 static gboolean
 gst_rtp_ssrc_demux_src_event (GstPad * pad, GstObject * parent,
     GstEvent * event)
@@ -649,21 +835,14 @@ gst_rtp_ssrc_demux_src_event (GstPad * pad, GstObject * parent,
     case GST_EVENT_CUSTOM_BOTH_OOB:
       s = gst_event_get_structure (event);
       if (s && !gst_structure_has_field (s, "ssrc")) {
-        GSList *walk;
-
-        for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
-          GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
+        GstRtpSsrcDemuxPad *dpad = find_demux_pad_for_pad (demux, pad);
 
-          if (dpad->rtp_pad == pad || dpad->rtcp_pad == pad) {
-            GstStructure *ws;
+        if (dpad) {
+          GstStructure *ws;
 
-            event =
-                GST_EVENT_CAST (gst_mini_object_make_writable
-                (GST_MINI_OBJECT_CAST (event)));
-            ws = gst_event_writable_structure (event);
-            gst_structure_set (ws, "ssrc", G_TYPE_UINT, dpad->ssrc, NULL);
-            break;
-          }
+          event = gst_event_make_writable (event);
+          ws = gst_event_writable_structure (event);
+          gst_structure_set (ws, "ssrc", G_TYPE_UINT, dpad->ssrc, NULL);
         }
       }
       break;
@@ -684,7 +863,7 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent)
 
   demux = GST_RTP_SSRC_DEMUX (parent);
 
-  GST_PAD_LOCK (demux);
+  GST_OBJECT_LOCK (demux);
   for (current = demux->srcpads; current; current = g_slist_next (current)) {
     GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) current->data;
 
@@ -703,9 +882,9 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent)
     g_value_set_object (&val, otherpad);
     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
     g_value_unset (&val);
-    gst_object_unref (otherpad);
+
   }
-  GST_PAD_UNLOCK (demux);
+  GST_OBJECT_UNLOCK (demux);
 
   return it;
 }
@@ -714,12 +893,14 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent)
 static gint
 src_pad_compare_func (gconstpointer a, gconstpointer b)
 {
-  GstPad *pad = GST_PAD (a);
-  const gchar *prefix = b;
-  gint res = 1;
+  GstPad *pad = GST_PAD (g_value_get_object (a));
+  const gchar *prefix = g_value_get_string (b);
+  gint res;
 
+  /* 0 means equal means we accept the pad, accepted if there is a name
+   * and it starts with the prefix */
   GST_OBJECT_LOCK (pad);
-  res = !GST_PAD_NAME (pad) || g_str_has_prefix (GST_PAD_NAME (pad), prefix);
+  res = !GST_PAD_NAME (pad) || !g_str_has_prefix (GST_PAD_NAME (pad), prefix);
   GST_OBJECT_UNLOCK (pad);
 
   return res;
@@ -744,7 +925,6 @@ gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * pad,
     g_assert_not_reached ();
 
   it = gst_element_iterate_src_pads (GST_ELEMENT_CAST (demux));
-
   it = gst_iterator_filter (it, src_pad_compare_func, &gval);
 
   return it;