rtpssrcdemux: Release lock before signalling new pad
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpssrcdemux.c
index 25ab9c3..314cddf 100644 (file)
@@ -70,21 +70,27 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_sink",
     );
 
 static GstStaticPadTemplate rtp_ssrc_demux_src_template =
-GST_STATIC_PAD_TEMPLATE ("src_%d",
+GST_STATIC_PAD_TEMPLATE ("src_%u",
     GST_PAD_SRC,
     GST_PAD_SOMETIMES,
     GST_STATIC_CAPS ("application/x-rtp")
     );
 
 static GstStaticPadTemplate rtp_ssrc_demux_rtcp_src_template =
-GST_STATIC_PAD_TEMPLATE ("rtcp_src_%d",
+GST_STATIC_PAD_TEMPLATE ("rtcp_src_%u",
     GST_PAD_SRC,
     GST_PAD_SOMETIMES,
     GST_STATIC_CAPS ("application/x-rtcp")
     );
 
-#define GST_PAD_LOCK(obj)   (g_mutex_lock ((obj)->padlock))
-#define GST_PAD_UNLOCK(obj) (g_mutex_unlock ((obj)->padlock))
+#define GST_PAD_LOCK(obj)   (g_rec_mutex_lock (&(obj)->padlock))
+#define GST_PAD_UNLOCK(obj) (g_rec_mutex_unlock (&(obj)->padlock))
+
+typedef enum
+{
+  RTP_PAD,
+  RTCP_PAD
+} PadType;
 
 /* signals */
 enum
@@ -95,9 +101,8 @@ enum
   LAST_SIGNAL
 };
 
-GST_BOILERPLATE (GstRtpSsrcDemux, gst_rtp_ssrc_demux, GstElement,
-    GST_TYPE_ELEMENT);
-
+#define gst_rtp_ssrc_demux_parent_class parent_class
+G_DEFINE_TYPE (GstRtpSsrcDemux, gst_rtp_ssrc_demux, GST_TYPE_ELEMENT);
 
 /* GObject vmethods */
 static void gst_rtp_ssrc_demux_dispose (GObject * object);
@@ -111,21 +116,25 @@ static void gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux,
     guint32 ssrc);
 
 /* sinkpad stuff */
-static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf);
-static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event);
+static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent,
+    GstBuffer * buf);
+static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
+    GstEvent * event);
 
 static GstFlowReturn gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad,
-    GstBuffer * buf);
+    GstObject * parent, GstBuffer * buf);
 static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad,
-    GstEvent * event);
+    GstObject * parent, GstEvent * event);
 static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad *
-    pad);
+    pad, GstObject * parent);
 
 /* srcpad stuff */
-static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event);
-static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad *
-    pad);
-static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query);
+static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstObject * parent,
+    GstEvent * event);
+static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad,
+    GstObject * parent);
+static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
+    GstQuery * query);
 
 static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 };
 
@@ -156,33 +165,95 @@ find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
   return NULL;
 }
 
-static GstRtpSsrcDemuxPad *
-find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
+static GstEvent *
+add_ssrc_and_ref (GstEvent * event, guint32 ssrc)
+{
+  /* Set the ssrc on the output caps */
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_CAPS:
+    {
+      GstCaps *caps;
+      GstCaps *newcaps;
+      GstStructure *s;
+
+      gst_event_parse_caps (event, &caps);
+      newcaps = gst_caps_copy (caps);
+
+      s = gst_caps_get_structure (newcaps, 0);
+      gst_structure_set (s, "ssrc", G_TYPE_UINT, ssrc, NULL);
+      event = gst_event_new_caps (newcaps);
+      gst_caps_unref (newcaps);
+      break;
+    }
+    default:
+      gst_event_ref (event);
+      break;
+  }
+
+  return event;
+}
+
+struct ForwardEventData
+{
+  GstPad *pad;
+  guint32 ssrc;
+};
+
+static gboolean
+forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
+{
+  struct ForwardEventData *data = user_data;
+  GstEvent *newevent;
+
+  newevent = add_ssrc_and_ref (*event, data->ssrc);
+
+  gst_pad_push_event (data->pad, newevent);
+
+  return TRUE;
+}
+
+
+static GstPad *
+find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
+    PadType padtype)
 {
   GstPad *rtp_pad, *rtcp_pad;
   GstElementClass *klass;
   GstPadTemplate *templ;
   gchar *padname;
   GstRtpSsrcDemuxPad *demuxpad;
+  GstCaps *caps;
+  struct ForwardEventData fdata;
+  GstPad *retpad;
 
   GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
 
-  GST_OBJECT_LOCK (demux);
+  GST_PAD_LOCK (demux);
 
   demuxpad = find_demux_pad_for_ssrc (demux, ssrc);
   if (demuxpad != NULL) {
-    GST_OBJECT_UNLOCK (demux);
-    return demuxpad;
+    switch (padtype) {
+      case RTP_PAD:
+        retpad = gst_object_ref (demuxpad->rtp_pad);
+        break;
+      case RTCP_PAD:
+        retpad = gst_object_ref (demuxpad->rtcp_pad);
+        break;
+      default:
+        g_assert_not_reached ();
+    }
+    GST_PAD_UNLOCK (demux);
+    return retpad;
   }
 
   klass = GST_ELEMENT_GET_CLASS (demux);
-  templ = gst_element_class_get_pad_template (klass, "src_%d");
-  padname = g_strdup_printf ("src_%d", ssrc);
+  templ = gst_element_class_get_pad_template (klass, "src_%u");
+  padname = g_strdup_printf ("src_%u", ssrc);
   rtp_pad = gst_pad_new_from_template (templ, padname);
   g_free (padname);
 
-  templ = gst_element_class_get_pad_template (klass, "rtcp_src_%d");
-  padname = g_strdup_printf ("rtcp_src_%d", ssrc);
+  templ = gst_element_class_get_pad_template (klass, "rtcp_src_%u");
+  padname = g_strdup_printf ("rtcp_src_%u", ssrc);
   rtcp_pad = gst_pad_new_from_template (templ, padname);
   g_free (padname);
 
@@ -192,57 +263,65 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
   demuxpad->rtp_pad = rtp_pad;
   demuxpad->rtcp_pad = rtcp_pad;
 
+  fdata.ssrc = ssrc;
+
   gst_pad_set_element_private (rtp_pad, demuxpad);
   gst_pad_set_element_private (rtcp_pad, demuxpad);
 
   demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
 
-  /* copy caps from input */
-  gst_pad_set_caps (rtp_pad, GST_PAD_CAPS (demux->rtp_sink));
-  gst_pad_use_fixed_caps (rtp_pad);
-  gst_pad_set_caps (rtcp_pad, GST_PAD_CAPS (demux->rtcp_sink));
-  gst_pad_use_fixed_caps (rtcp_pad);
-
-  gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
   gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query);
   gst_pad_set_iterate_internal_links_function (rtp_pad,
       gst_rtp_ssrc_demux_iterate_internal_links_src);
+  gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
+  gst_pad_use_fixed_caps (rtp_pad);
   gst_pad_set_active (rtp_pad, TRUE);
+  fdata.pad = rtp_pad;
+  gst_pad_sticky_events_foreach (demux->rtp_sink, forward_sticky_events,
+      &fdata);
 
   gst_pad_set_event_function (rtcp_pad, gst_rtp_ssrc_demux_src_event);
   gst_pad_set_iterate_internal_links_function (rtcp_pad,
       gst_rtp_ssrc_demux_iterate_internal_links_src);
+  gst_pad_use_fixed_caps (rtcp_pad);
   gst_pad_set_active (rtcp_pad, TRUE);
+  fdata.pad = rtcp_pad;
+  gst_pad_sticky_events_foreach (demux->rtcp_sink, forward_sticky_events,
+      &fdata);
 
-  GST_OBJECT_UNLOCK (demux);
-
+  /* copy caps from input */
+  if ((caps = gst_pad_get_current_caps (demux->rtp_sink))) {
+    gst_pad_set_caps (rtp_pad, caps);
+    gst_caps_unref (caps);
+  }
+  if ((caps = gst_pad_get_current_caps (demux->rtcp_sink))) {
+    gst_pad_set_caps (rtcp_pad, caps);
+    gst_caps_unref (caps);
+  }
   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad);
 
-  g_signal_emit (G_OBJECT (demux),
-      gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad);
+  switch (padtype) {
+    case RTP_PAD:
+      retpad = gst_object_ref (demuxpad->rtp_pad);
+      break;
+    case RTCP_PAD:
+      retpad = gst_object_ref (demuxpad->rtcp_pad);
+      break;
+    default:
+      g_assert_not_reached ();
+  }
 
-  return demuxpad;
-}
+  gst_object_ref (rtp_pad);
 
-static void
-gst_rtp_ssrc_demux_base_init (gpointer g_class)
-{
-  GstElementClass *gstelement_klass = GST_ELEMENT_CLASS (g_class);
+  GST_PAD_UNLOCK (demux);
 
-  gst_element_class_add_pad_template (gstelement_klass,
-      gst_static_pad_template_get (&rtp_ssrc_demux_sink_template));
-  gst_element_class_add_pad_template (gstelement_klass,
-      gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_sink_template));
-  gst_element_class_add_pad_template (gstelement_klass,
-      gst_static_pad_template_get (&rtp_ssrc_demux_src_template));
-  gst_element_class_add_pad_template (gstelement_klass,
-      gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_src_template));
+  g_signal_emit (G_OBJECT (demux),
+      gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad);
 
-  gst_element_class_set_details_simple (gstelement_klass, "RTP SSRC Demux",
-      "Demux/Network/RTP",
-      "Splits RTP streams based on the SSRC",
-      "Wim Taymans <wim.taymans@gmail.com>");
+  gst_object_unref (rtp_pad);
+
+  return retpad;
 }
 
 static void
@@ -307,13 +386,26 @@ gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass)
   gstrtpssrcdemux_klass->clear_ssrc =
       GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_clear_ssrc);
 
+  gst_element_class_add_pad_template (gstelement_klass,
+      gst_static_pad_template_get (&rtp_ssrc_demux_sink_template));
+  gst_element_class_add_pad_template (gstelement_klass,
+      gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_sink_template));
+  gst_element_class_add_pad_template (gstelement_klass,
+      gst_static_pad_template_get (&rtp_ssrc_demux_src_template));
+  gst_element_class_add_pad_template (gstelement_klass,
+      gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_src_template));
+
+  gst_element_class_set_static_metadata (gstelement_klass, "RTP SSRC Demux",
+      "Demux/Network/RTP",
+      "Splits RTP streams based on the SSRC",
+      "Wim Taymans <wim.taymans@gmail.com>");
+
   GST_DEBUG_CATEGORY_INIT (gst_rtp_ssrc_demux_debug,
       "rtpssrcdemux", 0, "RTP SSRC demuxer");
 }
 
 static void
-gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux,
-    GstRtpSsrcDemuxClass * g_class)
+gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux)
 {
   GstElementClass *klass = GST_ELEMENT_GET_CLASS (demux);
 
@@ -336,7 +428,7 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux,
       gst_rtp_ssrc_demux_iterate_internal_links_sink);
   gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
 
-  demux->padlock = g_mutex_new ();
+  g_rec_mutex_init (&demux->padlock);
 
   gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
 }
@@ -378,7 +470,7 @@ gst_rtp_ssrc_demux_finalize (GObject * object)
   GstRtpSsrcDemux *demux;
 
   demux = GST_RTP_SSRC_DEMUX (object);
-  g_mutex_free (demux->padlock);
+  g_rec_mutex_clear (&demux->padlock);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -423,21 +515,18 @@ unknown_pad:
 }
 
 static gboolean
-gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
+gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
+    GstEvent * event)
 {
   GstRtpSsrcDemux *demux;
   gboolean res = FALSE;
 
-  demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
-  if (G_UNLIKELY (demux == NULL)) {
-    gst_event_unref (event);
-    return FALSE;
-  }
+  demux = GST_RTP_SSRC_DEMUX (parent);
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_STOP:
       gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
-    case GST_EVENT_NEWSEGMENT:
+      /* fallthrough */
     default:
     {
       GSList *walk;
@@ -452,15 +541,22 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
       for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
         GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
 
-        pads = g_slist_prepend (pads, gst_object_ref (pad->rtp_pad));
+        pad = g_slice_dup (GstRtpSsrcDemuxPad, pad);
+        gst_object_ref (pad->rtp_pad);
+
+        pads = g_slist_prepend (pads, pad);
       }
       GST_PAD_UNLOCK (demux);
+
       for (walk = pads; walk; walk = g_slist_next (walk)) {
-        GstPad *pad = (GstPad *) walk->data;
+        GstRtpSsrcDemuxPad *dpad = walk->data;
+        GstEvent *newevent;
 
-        gst_event_ref (event);
-        res &= gst_pad_push_event (pad, event);
-        gst_object_unref (pad);
+        newevent = add_ssrc_and_ref (event, dpad->ssrc);
+
+        res &= gst_pad_push_event (dpad->rtp_pad, newevent);
+        gst_object_unref (dpad->rtp_pad);
+        g_slice_free (GstRtpSsrcDemuxPad, dpad);
       }
       g_slist_free (pads);
       gst_event_unref (event);
@@ -468,72 +564,74 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
     }
   }
 
-  gst_object_unref (demux);
   return res;
 }
 
 static gboolean
-gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstEvent * event)
+gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstObject * parent,
+    GstEvent * event)
 {
   GstRtpSsrcDemux *demux;
-  gboolean res = FALSE;
+  gboolean res = TRUE;
+  GSList *walk;
+  GSList *pads = NULL;
 
-  demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
+  demux = GST_RTP_SSRC_DEMUX (parent);
 
-  switch (GST_EVENT_TYPE (event)) {
-    case GST_EVENT_NEWSEGMENT:
-    default:
-    {
-      GSList *walk;
-      GSList *pads = NULL;
+  GST_PAD_LOCK (demux);
+  for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
+    GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
 
-      res = TRUE;
-      GST_PAD_LOCK (demux);
-      for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
-        GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
+    pad = g_slice_dup (GstRtpSsrcDemuxPad, pad);
+    gst_object_ref (pad->rtcp_pad);
 
-        pads = g_slist_prepend (pads, gst_object_ref (pad->rtcp_pad));
-      }
-      GST_PAD_UNLOCK (demux);
-      for (walk = pads; walk; walk = g_slist_next (walk)) {
-        GstPad *pad = (GstPad *) walk->data;
+    pads = g_slist_prepend (pads, pad);
+  }
+  GST_PAD_UNLOCK (demux);
 
-        gst_event_ref (event);
-        res &= gst_pad_push_event (pad, event);
-        gst_object_unref (pad);
-      }
-      g_slist_free (pads);
-      gst_event_unref (event);
-      break;
-    }
+  for (walk = pads; walk; walk = g_slist_next (walk)) {
+    GstRtpSsrcDemuxPad *dpad = walk->data;
+    GstEvent *newevent;
+
+    newevent = add_ssrc_and_ref (event, dpad->ssrc);
+
+    res &= gst_pad_push_event (dpad->rtcp_pad, newevent);
+    gst_object_unref (dpad->rtcp_pad);
+    g_slice_free (GstRtpSsrcDemuxPad, dpad);
   }
-  gst_object_unref (demux);
+  g_slist_free (pads);
+  gst_event_unref (event);
+
   return res;
 }
 
 static GstFlowReturn
-gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf)
+gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
 {
   GstFlowReturn ret;
   GstRtpSsrcDemux *demux;
   guint32 ssrc;
-  GstRtpSsrcDemuxPad *dpad;
+  GstRTPBuffer rtp = { NULL };
+  GstPad *srcpad;
 
-  demux = GST_RTP_SSRC_DEMUX (GST_OBJECT_PARENT (pad));
+  demux = GST_RTP_SSRC_DEMUX (parent);
 
-  if (!gst_rtp_buffer_validate (buf))
+  if (!gst_rtp_buffer_map (buf, GST_MAP_READ, &rtp))
     goto invalid_payload;
 
-  ssrc = gst_rtp_buffer_get_ssrc (buf);
+  ssrc = gst_rtp_buffer_get_ssrc (&rtp);
+  gst_rtp_buffer_unmap (&rtp);
 
   GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc);
 
-  dpad = find_or_create_demux_pad_for_ssrc (demux, ssrc);
-  if (dpad == NULL)
+  srcpad = find_or_create_demux_pad_for_ssrc (demux, ssrc, RTP_PAD);
+  if (srcpad == NULL)
     goto create_failed;
 
   /* push to srcpad */
-  ret = gst_pad_push (dpad->rtp_pad, buf);
+  ret = gst_pad_push (srcpad, buf);
+
+  gst_object_unref (srcpad);
 
   return ret;
 
@@ -556,21 +654,26 @@ create_failed:
 }
 
 static GstFlowReturn
-gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf)
+gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
+    GstBuffer * buf)
 {
   GstFlowReturn ret;
   GstRtpSsrcDemux *demux;
   guint32 ssrc;
-  GstRtpSsrcDemuxPad *dpad;
   GstRTCPPacket packet;
+  GstRTCPBuffer rtcp = { NULL, };
+  GstPad *srcpad;
 
-  demux = GST_RTP_SSRC_DEMUX (GST_OBJECT_PARENT (pad));
+  demux = GST_RTP_SSRC_DEMUX (parent);
 
   if (!gst_rtcp_buffer_validate (buf))
     goto invalid_rtcp;
 
-  if (!gst_rtcp_buffer_get_first_packet (buf, &packet))
+  gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
+  if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
+    gst_rtcp_buffer_unmap (&rtcp);
     goto invalid_rtcp;
+  }
 
   /* first packet must be SR or RR or else the validate would have failed */
   switch (gst_rtcp_packet_get_type (&packet)) {
@@ -582,16 +685,18 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf)
     default:
       goto unexpected_rtcp;
   }
+  gst_rtcp_buffer_unmap (&rtcp);
 
   GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc);
 
-  dpad = find_or_create_demux_pad_for_ssrc (demux, ssrc);
-  if (dpad == NULL)
+  srcpad = find_or_create_demux_pad_for_ssrc (demux, ssrc, RTCP_PAD);
+  if (srcpad == NULL)
     goto create_failed;
 
-
   /* push to srcpad */
-  ret = gst_pad_push (dpad->rtcp_pad, buf);
+  ret = gst_pad_push (srcpad, buf);
+
+  gst_object_unref (srcpad);
 
   return ret;
 
@@ -619,13 +724,30 @@ create_failed:
   }
 }
 
+static GstRtpSsrcDemuxPad *
+find_demux_pad_for_pad (GstRtpSsrcDemux * demux, GstPad * pad)
+{
+  GSList *walk;
+
+  for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
+    GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
+    if (dpad->rtp_pad == pad || dpad->rtcp_pad == pad) {
+      return dpad;
+    }
+  }
+
+  return NULL;
+}
+
+
 static gboolean
-gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event)
+gst_rtp_ssrc_demux_src_event (GstPad * pad, GstObject * parent,
+    GstEvent * event)
 {
   GstRtpSsrcDemux *demux;
   const GstStructure *s;
 
-  demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
+  demux = GST_RTP_SSRC_DEMUX (parent);
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_CUSTOM_UPSTREAM:
@@ -633,19 +755,14 @@ gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event)
     case GST_EVENT_CUSTOM_BOTH_OOB:
       s = gst_event_get_structure (event);
       if (s && !gst_structure_has_field (s, "ssrc")) {
-        GSList *walk;
-
-        for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
-          GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
-
-          if (dpad->rtp_pad == pad || dpad->rtcp_pad == pad) {
-            event =
-                GST_EVENT_CAST (gst_mini_object_make_writable
-                (GST_MINI_OBJECT_CAST (event)));
-            gst_structure_set (event->structure, "ssrc", G_TYPE_UINT,
-                dpad->ssrc, NULL);
-            break;
-          }
+        GstRtpSsrcDemuxPad *dpad = find_demux_pad_for_pad (demux, pad);
+
+        if (dpad) {
+          GstStructure *ws;
+
+          event = gst_event_make_writable (event);
+          ws = gst_event_writable_structure (event);
+          gst_structure_set (ws, "ssrc", G_TYPE_UINT, dpad->ssrc, NULL);
         }
       }
       break;
@@ -653,23 +770,18 @@ gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event)
       break;
   }
 
-  gst_object_unref (demux);
-
-  return gst_pad_event_default (pad, event);
+  return gst_pad_event_default (pad, parent, event);
 }
 
 static GstIterator *
-gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad)
+gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent)
 {
   GstRtpSsrcDemux *demux;
   GstPad *otherpad = NULL;
   GstIterator *it = NULL;
   GSList *current;
 
-  demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
-
-  if (!demux)
-    return NULL;
+  demux = GST_RTP_SSRC_DEMUX (parent);
 
   GST_PAD_LOCK (demux);
   for (current = demux->srcpads; current; current = g_slist_next (current)) {
@@ -683,11 +795,17 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad)
       break;
     }
   }
-  it = gst_iterator_new_single (GST_TYPE_PAD, otherpad,
-      (GstCopyFunction) gst_object_ref, (GFreeFunc) gst_object_unref);
+  if (otherpad) {
+    GValue val = { 0, };
+
+    g_value_init (&val, GST_TYPE_PAD);
+    g_value_set_object (&val, otherpad);
+    it = gst_iterator_new_single (GST_TYPE_PAD, &val);
+    g_value_unset (&val);
+
+  }
   GST_PAD_UNLOCK (demux);
 
-  gst_object_unref (demux);
   return it;
 }
 
@@ -695,8 +813,8 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad)
 static gint
 src_pad_compare_func (gconstpointer a, gconstpointer b)
 {
-  GstPad *pad = GST_PAD (a);
-  const gchar *prefix = b;
+  GstPad *pad = GST_PAD (g_value_get_object (a));
+  const gchar *prefix = g_value_get_string (b);
   gint res = 1;
 
   GST_OBJECT_LOCK (pad);
@@ -707,39 +825,38 @@ src_pad_compare_func (gconstpointer a, gconstpointer b)
 }
 
 static GstIterator *
-gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * pad)
+gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * pad,
+    GstObject * parent)
 {
   GstRtpSsrcDemux *demux;
   GstIterator *it = NULL;
-  const gchar *prefix = NULL;
+  GValue gval = { 0, };
 
-  demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
-
-  if (!demux)
-    return NULL;
+  demux = GST_RTP_SSRC_DEMUX (parent);
 
+  g_value_init (&gval, G_TYPE_STRING);
   if (pad == demux->rtp_sink)
-    prefix = "src_";
+    g_value_set_static_string (&gval, "src_");
   else if (pad == demux->rtcp_sink)
-    prefix = "rtcp_src_";
+    g_value_set_static_string (&gval, "rtcp_src_");
   else
     g_assert_not_reached ();
 
-  it = gst_element_iterate_src_pads (GST_ELEMENT (demux));
+  it = gst_element_iterate_src_pads (GST_ELEMENT_CAST (demux));
+  it = gst_iterator_filter (it, src_pad_compare_func, &gval);
 
-  return gst_iterator_filter (it, src_pad_compare_func, (gpointer) prefix);
+  return it;
 }
 
 
 static gboolean
-gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query)
+gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
+    GstQuery * query)
 {
   GstRtpSsrcDemux *demux;
   gboolean res = FALSE;
 
-  demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
-  if (G_UNLIKELY (demux == NULL))
-    return FALSE;
+  demux = GST_RTP_SSRC_DEMUX (parent);
 
   switch (GST_QUERY_TYPE (query)) {
     case GST_QUERY_LATENCY:
@@ -764,10 +881,9 @@ gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query)
       break;
     }
     default:
-      res = gst_pad_query_default (pad, query);
+      res = gst_pad_query_default (pad, parent, query);
       break;
   }
-  gst_object_unref (demux);
 
   return res;
 }