rtpssrcdemux: Release lock before signalling new pad
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpssrcdemux.c
index e298299..314cddf 100644 (file)
@@ -70,21 +70,27 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_sink",
     );
 
 static GstStaticPadTemplate rtp_ssrc_demux_src_template =
-GST_STATIC_PAD_TEMPLATE ("src_%d",
+GST_STATIC_PAD_TEMPLATE ("src_%u",
     GST_PAD_SRC,
     GST_PAD_SOMETIMES,
     GST_STATIC_CAPS ("application/x-rtp")
     );
 
 static GstStaticPadTemplate rtp_ssrc_demux_rtcp_src_template =
-GST_STATIC_PAD_TEMPLATE ("rtcp_src_%d",
+GST_STATIC_PAD_TEMPLATE ("rtcp_src_%u",
     GST_PAD_SRC,
     GST_PAD_SOMETIMES,
     GST_STATIC_CAPS ("application/x-rtcp")
     );
 
-#define GST_PAD_LOCK(obj)   (g_static_rec_mutex_lock (&(obj)->padlock))
-#define GST_PAD_UNLOCK(obj) (g_static_rec_mutex_unlock (&(obj)->padlock))
+#define GST_PAD_LOCK(obj)   (g_rec_mutex_lock (&(obj)->padlock))
+#define GST_PAD_UNLOCK(obj) (g_rec_mutex_unlock (&(obj)->padlock))
+
+typedef enum
+{
+  RTP_PAD,
+  RTCP_PAD
+} PadType;
 
 /* signals */
 enum
@@ -110,21 +116,25 @@ static void gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux,
     guint32 ssrc);
 
 /* sinkpad stuff */
-static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf);
-static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event);
+static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent,
+    GstBuffer * buf);
+static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
+    GstEvent * event);
 
 static GstFlowReturn gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad,
-    GstBuffer * buf);
+    GstObject * parent, GstBuffer * buf);
 static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad,
-    GstEvent * event);
+    GstObject * parent, GstEvent * event);
 static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad *
-    pad);
+    pad, GstObject * parent);
 
 /* srcpad stuff */
-static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event);
-static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad *
-    pad);
-static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query);
+static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstObject * parent,
+    GstEvent * event);
+static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad,
+    GstObject * parent);
+static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
+    GstQuery * query);
 
 static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 };
 
@@ -155,9 +165,57 @@ find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
   return NULL;
 }
 
-/* with PAD_LOCK */
-static GstRtpSsrcDemuxPad *
-find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
+static GstEvent *
+add_ssrc_and_ref (GstEvent * event, guint32 ssrc)
+{
+  /* Set the ssrc on the output caps */
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_CAPS:
+    {
+      GstCaps *caps;
+      GstCaps *newcaps;
+      GstStructure *s;
+
+      gst_event_parse_caps (event, &caps);
+      newcaps = gst_caps_copy (caps);
+
+      s = gst_caps_get_structure (newcaps, 0);
+      gst_structure_set (s, "ssrc", G_TYPE_UINT, ssrc, NULL);
+      event = gst_event_new_caps (newcaps);
+      gst_caps_unref (newcaps);
+      break;
+    }
+    default:
+      gst_event_ref (event);
+      break;
+  }
+
+  return event;
+}
+
+struct ForwardEventData
+{
+  GstPad *pad;
+  guint32 ssrc;
+};
+
+static gboolean
+forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
+{
+  struct ForwardEventData *data = user_data;
+  GstEvent *newevent;
+
+  newevent = add_ssrc_and_ref (*event, data->ssrc);
+
+  gst_pad_push_event (data->pad, newevent);
+
+  return TRUE;
+}
+
+
+static GstPad *
+find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
+    PadType padtype)
 {
   GstPad *rtp_pad, *rtcp_pad;
   GstElementClass *klass;
@@ -165,22 +223,37 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
   gchar *padname;
   GstRtpSsrcDemuxPad *demuxpad;
   GstCaps *caps;
+  struct ForwardEventData fdata;
+  GstPad *retpad;
 
   GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
 
+  GST_PAD_LOCK (demux);
+
   demuxpad = find_demux_pad_for_ssrc (demux, ssrc);
   if (demuxpad != NULL) {
-    return demuxpad;
+    switch (padtype) {
+      case RTP_PAD:
+        retpad = gst_object_ref (demuxpad->rtp_pad);
+        break;
+      case RTCP_PAD:
+        retpad = gst_object_ref (demuxpad->rtcp_pad);
+        break;
+      default:
+        g_assert_not_reached ();
+    }
+    GST_PAD_UNLOCK (demux);
+    return retpad;
   }
 
   klass = GST_ELEMENT_GET_CLASS (demux);
-  templ = gst_element_class_get_pad_template (klass, "src_%d");
-  padname = g_strdup_printf ("src_%d", ssrc);
+  templ = gst_element_class_get_pad_template (klass, "src_%u");
+  padname = g_strdup_printf ("src_%u", ssrc);
   rtp_pad = gst_pad_new_from_template (templ, padname);
   g_free (padname);
 
-  templ = gst_element_class_get_pad_template (klass, "rtcp_src_%d");
-  padname = g_strdup_printf ("rtcp_src_%d", ssrc);
+  templ = gst_element_class_get_pad_template (klass, "rtcp_src_%u");
+  padname = g_strdup_printf ("rtcp_src_%u", ssrc);
   rtcp_pad = gst_pad_new_from_template (templ, padname);
   g_free (padname);
 
@@ -190,39 +263,65 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
   demuxpad->rtp_pad = rtp_pad;
   demuxpad->rtcp_pad = rtcp_pad;
 
+  fdata.ssrc = ssrc;
+
   gst_pad_set_element_private (rtp_pad, demuxpad);
   gst_pad_set_element_private (rtcp_pad, demuxpad);
 
   demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
 
-  /* copy caps from input */
-  caps = gst_pad_get_current_caps (demux->rtp_sink);
-  gst_pad_set_caps (rtp_pad, caps);
-  gst_caps_unref (caps);
-  gst_pad_use_fixed_caps (rtp_pad);
-  caps = gst_pad_get_current_caps (demux->rtcp_sink);
-  gst_pad_set_caps (rtcp_pad, caps);
-  gst_caps_unref (caps);
-  gst_pad_use_fixed_caps (rtcp_pad);
-
-  gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
   gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query);
   gst_pad_set_iterate_internal_links_function (rtp_pad,
       gst_rtp_ssrc_demux_iterate_internal_links_src);
+  gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
+  gst_pad_use_fixed_caps (rtp_pad);
   gst_pad_set_active (rtp_pad, TRUE);
+  fdata.pad = rtp_pad;
+  gst_pad_sticky_events_foreach (demux->rtp_sink, forward_sticky_events,
+      &fdata);
 
   gst_pad_set_event_function (rtcp_pad, gst_rtp_ssrc_demux_src_event);
   gst_pad_set_iterate_internal_links_function (rtcp_pad,
       gst_rtp_ssrc_demux_iterate_internal_links_src);
+  gst_pad_use_fixed_caps (rtcp_pad);
   gst_pad_set_active (rtcp_pad, TRUE);
+  fdata.pad = rtcp_pad;
+  gst_pad_sticky_events_foreach (demux->rtcp_sink, forward_sticky_events,
+      &fdata);
 
+  /* copy caps from input */
+  if ((caps = gst_pad_get_current_caps (demux->rtp_sink))) {
+    gst_pad_set_caps (rtp_pad, caps);
+    gst_caps_unref (caps);
+  }
+  if ((caps = gst_pad_get_current_caps (demux->rtcp_sink))) {
+    gst_pad_set_caps (rtcp_pad, caps);
+    gst_caps_unref (caps);
+  }
   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad);
 
+  switch (padtype) {
+    case RTP_PAD:
+      retpad = gst_object_ref (demuxpad->rtp_pad);
+      break;
+    case RTCP_PAD:
+      retpad = gst_object_ref (demuxpad->rtcp_pad);
+      break;
+    default:
+      g_assert_not_reached ();
+  }
+
+  gst_object_ref (rtp_pad);
+
+  GST_PAD_UNLOCK (demux);
+
   g_signal_emit (G_OBJECT (demux),
       gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad);
 
-  return demuxpad;
+  gst_object_unref (rtp_pad);
+
+  return retpad;
 }
 
 static void
@@ -296,7 +395,7 @@ gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass)
   gst_element_class_add_pad_template (gstelement_klass,
       gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_src_template));
 
-  gst_element_class_set_details_simple (gstelement_klass, "RTP SSRC Demux",
+  gst_element_class_set_static_metadata (gstelement_klass, "RTP SSRC Demux",
       "Demux/Network/RTP",
       "Splits RTP streams based on the SSRC",
       "Wim Taymans <wim.taymans@gmail.com>");
@@ -329,7 +428,7 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux)
       gst_rtp_ssrc_demux_iterate_internal_links_sink);
   gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
 
-  g_static_rec_mutex_init (&demux->padlock);
+  g_rec_mutex_init (&demux->padlock);
 
   gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
 }
@@ -371,7 +470,7 @@ gst_rtp_ssrc_demux_finalize (GObject * object)
   GstRtpSsrcDemux *demux;
 
   demux = GST_RTP_SSRC_DEMUX (object);
-  g_static_rec_mutex_free (&demux->padlock);
+  g_rec_mutex_clear (&demux->padlock);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -416,20 +515,18 @@ unknown_pad:
 }
 
 static gboolean
-gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
+gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
+    GstEvent * event)
 {
   GstRtpSsrcDemux *demux;
   gboolean res = FALSE;
 
-  demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
-  if (G_UNLIKELY (demux == NULL)) {
-    gst_event_unref (event);
-    return FALSE;
-  }
+  demux = GST_RTP_SSRC_DEMUX (parent);
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_STOP:
       gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
+      /* fallthrough */
     default:
     {
       GSList *walk;
@@ -444,15 +541,22 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
       for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
         GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
 
-        pads = g_slist_prepend (pads, gst_object_ref (pad->rtp_pad));
+        pad = g_slice_dup (GstRtpSsrcDemuxPad, pad);
+        gst_object_ref (pad->rtp_pad);
+
+        pads = g_slist_prepend (pads, pad);
       }
       GST_PAD_UNLOCK (demux);
+
       for (walk = pads; walk; walk = g_slist_next (walk)) {
-        GstPad *pad = (GstPad *) walk->data;
+        GstRtpSsrcDemuxPad *dpad = walk->data;
+        GstEvent *newevent;
 
-        gst_event_ref (event);
-        res &= gst_pad_push_event (pad, event);
-        gst_object_unref (pad);
+        newevent = add_ssrc_and_ref (event, dpad->ssrc);
+
+        res &= gst_pad_push_event (dpad->rtp_pad, newevent);
+        gst_object_unref (dpad->rtp_pad);
+        g_slice_free (GstRtpSsrcDemuxPad, dpad);
       }
       g_slist_free (pads);
       gst_event_unref (event);
@@ -460,77 +564,69 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
     }
   }
 
-  gst_object_unref (demux);
   return res;
 }
 
 static gboolean
-gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstEvent * event)
+gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstObject * parent,
+    GstEvent * event)
 {
   GstRtpSsrcDemux *demux;
-  gboolean res = FALSE;
+  gboolean res = TRUE;
+  GSList *walk;
+  GSList *pads = NULL;
 
-  demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
+  demux = GST_RTP_SSRC_DEMUX (parent);
 
-  switch (GST_EVENT_TYPE (event)) {
-    default:
-    {
-      GSList *walk;
-      GSList *pads = NULL;
+  GST_PAD_LOCK (demux);
+  for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
+    GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
 
-      res = TRUE;
-      GST_PAD_LOCK (demux);
-      for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
-        GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
+    pad = g_slice_dup (GstRtpSsrcDemuxPad, pad);
+    gst_object_ref (pad->rtcp_pad);
 
-        pads = g_slist_prepend (pads, gst_object_ref (pad->rtcp_pad));
-      }
-      GST_PAD_UNLOCK (demux);
-      for (walk = pads; walk; walk = g_slist_next (walk)) {
-        GstPad *pad = (GstPad *) walk->data;
+    pads = g_slist_prepend (pads, pad);
+  }
+  GST_PAD_UNLOCK (demux);
 
-        gst_event_ref (event);
-        res &= gst_pad_push_event (pad, event);
-        gst_object_unref (pad);
-      }
-      g_slist_free (pads);
-      gst_event_unref (event);
-      break;
-    }
+  for (walk = pads; walk; walk = g_slist_next (walk)) {
+    GstRtpSsrcDemuxPad *dpad = walk->data;
+    GstEvent *newevent;
+
+    newevent = add_ssrc_and_ref (event, dpad->ssrc);
+
+    res &= gst_pad_push_event (dpad->rtcp_pad, newevent);
+    gst_object_unref (dpad->rtcp_pad);
+    g_slice_free (GstRtpSsrcDemuxPad, dpad);
   }
-  gst_object_unref (demux);
+  g_slist_free (pads);
+  gst_event_unref (event);
+
   return res;
 }
 
 static GstFlowReturn
-gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf)
+gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
 {
   GstFlowReturn ret;
   GstRtpSsrcDemux *demux;
   guint32 ssrc;
-  GstRtpSsrcDemuxPad *dpad;
-  GstRTPBuffer rtp;
+  GstRTPBuffer rtp = { NULL };
   GstPad *srcpad;
 
-  demux = GST_RTP_SSRC_DEMUX (GST_OBJECT_PARENT (pad));
+  demux = GST_RTP_SSRC_DEMUX (parent);
 
-  if (!gst_rtp_buffer_validate (buf))
+  if (!gst_rtp_buffer_map (buf, GST_MAP_READ, &rtp))
     goto invalid_payload;
 
-  gst_rtp_buffer_map (buf, GST_MAP_READ, &rtp);
   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
   gst_rtp_buffer_unmap (&rtp);
 
   GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc);
 
-  GST_PAD_LOCK (demux);
-  dpad = find_or_create_demux_pad_for_ssrc (demux, ssrc);
-  if (dpad == NULL) {
-    GST_PAD_UNLOCK (demux);
+  srcpad = find_or_create_demux_pad_for_ssrc (demux, ssrc, RTP_PAD);
+  if (srcpad == NULL)
     goto create_failed;
-  }
-  srcpad = gst_object_ref (dpad->rtp_pad);
-  GST_PAD_UNLOCK (demux);
 
   /* push to srcpad */
   ret = gst_pad_push (srcpad, buf);
@@ -558,17 +654,17 @@ create_failed:
 }
 
 static GstFlowReturn
-gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf)
+gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
+    GstBuffer * buf)
 {
   GstFlowReturn ret;
   GstRtpSsrcDemux *demux;
   guint32 ssrc;
-  GstRtpSsrcDemuxPad *dpad;
   GstRTCPPacket packet;
-  GstRTCPBuffer rtcp;
+  GstRTCPBuffer rtcp = { NULL, };
   GstPad *srcpad;
 
-  demux = GST_RTP_SSRC_DEMUX (GST_OBJECT_PARENT (pad));
+  demux = GST_RTP_SSRC_DEMUX (parent);
 
   if (!gst_rtcp_buffer_validate (buf))
     goto invalid_rtcp;
@@ -593,14 +689,9 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf)
 
   GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc);
 
-  GST_PAD_LOCK (demux);
-  dpad = find_or_create_demux_pad_for_ssrc (demux, ssrc);
-  if (dpad == NULL) {
-    GST_PAD_UNLOCK (demux);
+  srcpad = find_or_create_demux_pad_for_ssrc (demux, ssrc, RTCP_PAD);
+  if (srcpad == NULL)
     goto create_failed;
-  }
-  srcpad = gst_object_ref (dpad->rtcp_pad);
-  GST_PAD_UNLOCK (demux);
 
   /* push to srcpad */
   ret = gst_pad_push (srcpad, buf);
@@ -633,13 +724,30 @@ create_failed:
   }
 }
 
+static GstRtpSsrcDemuxPad *
+find_demux_pad_for_pad (GstRtpSsrcDemux * demux, GstPad * pad)
+{
+  GSList *walk;
+
+  for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
+    GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
+    if (dpad->rtp_pad == pad || dpad->rtcp_pad == pad) {
+      return dpad;
+    }
+  }
+
+  return NULL;
+}
+
+
 static gboolean
-gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event)
+gst_rtp_ssrc_demux_src_event (GstPad * pad, GstObject * parent,
+    GstEvent * event)
 {
   GstRtpSsrcDemux *demux;
   const GstStructure *s;
 
-  demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
+  demux = GST_RTP_SSRC_DEMUX (parent);
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_CUSTOM_UPSTREAM:
@@ -647,21 +755,14 @@ gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event)
     case GST_EVENT_CUSTOM_BOTH_OOB:
       s = gst_event_get_structure (event);
       if (s && !gst_structure_has_field (s, "ssrc")) {
-        GSList *walk;
+        GstRtpSsrcDemuxPad *dpad = find_demux_pad_for_pad (demux, pad);
 
-        for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
-          GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
+        if (dpad) {
+          GstStructure *ws;
 
-          if (dpad->rtp_pad == pad || dpad->rtcp_pad == pad) {
-            GstStructure *ws;
-
-            event =
-                GST_EVENT_CAST (gst_mini_object_make_writable
-                (GST_MINI_OBJECT_CAST (event)));
-            ws = gst_event_writable_structure (event);
-            gst_structure_set (ws, "ssrc", G_TYPE_UINT, dpad->ssrc, NULL);
-            break;
-          }
+          event = gst_event_make_writable (event);
+          ws = gst_event_writable_structure (event);
+          gst_structure_set (ws, "ssrc", G_TYPE_UINT, dpad->ssrc, NULL);
         }
       }
       break;
@@ -669,23 +770,18 @@ gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event)
       break;
   }
 
-  gst_object_unref (demux);
-
-  return gst_pad_event_default (pad, event);
+  return gst_pad_event_default (pad, parent, event);
 }
 
 static GstIterator *
-gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad)
+gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent)
 {
   GstRtpSsrcDemux *demux;
   GstPad *otherpad = NULL;
   GstIterator *it = NULL;
   GSList *current;
 
-  demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
-
-  if (!demux)
-    return NULL;
+  demux = GST_RTP_SSRC_DEMUX (parent);
 
   GST_PAD_LOCK (demux);
   for (current = demux->srcpads; current; current = g_slist_next (current)) {
@@ -706,11 +802,10 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad)
     g_value_set_object (&val, otherpad);
     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
     g_value_unset (&val);
-    gst_object_unref (otherpad);
+
   }
   GST_PAD_UNLOCK (demux);
 
-  gst_object_unref (demux);
   return it;
 }
 
@@ -718,8 +813,8 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad)
 static gint
 src_pad_compare_func (gconstpointer a, gconstpointer b)
 {
-  GstPad *pad = GST_PAD (a);
-  const gchar *prefix = b;
+  GstPad *pad = GST_PAD (g_value_get_object (a));
+  const gchar *prefix = g_value_get_string (b);
   gint res = 1;
 
   GST_OBJECT_LOCK (pad);
@@ -730,16 +825,14 @@ src_pad_compare_func (gconstpointer a, gconstpointer b)
 }
 
 static GstIterator *
-gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * pad)
+gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * pad,
+    GstObject * parent)
 {
   GstRtpSsrcDemux *demux;
   GstIterator *it = NULL;
   GValue gval = { 0, };
 
-  demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
-
-  if (!demux)
-    return NULL;
+  demux = GST_RTP_SSRC_DEMUX (parent);
 
   g_value_init (&gval, G_TYPE_STRING);
   if (pad == demux->rtp_sink)
@@ -749,25 +842,21 @@ gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * pad)
   else
     g_assert_not_reached ();
 
-  it = gst_element_iterate_src_pads (GST_ELEMENT (demux));
-
+  it = gst_element_iterate_src_pads (GST_ELEMENT_CAST (demux));
   it = gst_iterator_filter (it, src_pad_compare_func, &gval);
 
-  gst_object_unref (demux);
-
   return it;
 }
 
 
 static gboolean
-gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query)
+gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
+    GstQuery * query)
 {
   GstRtpSsrcDemux *demux;
   gboolean res = FALSE;
 
-  demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
-  if (G_UNLIKELY (demux == NULL))
-    return FALSE;
+  demux = GST_RTP_SSRC_DEMUX (parent);
 
   switch (GST_QUERY_TYPE (query)) {
     case GST_QUERY_LATENCY:
@@ -792,10 +881,9 @@ gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query)
       break;
     }
     default:
-      res = gst_pad_query_default (pad, query);
+      res = gst_pad_query_default (pad, parent, query);
       break;
   }
-  gst_object_unref (demux);
 
   return res;
 }