streamsynchronizer: avoid pad destruction races.
authorCharlie Turner <cturner@igalia.com>
Fri, 20 Sep 2019 08:48:30 +0000 (09:48 +0100)
committerSebastian Dröge <slomo@coaxion.net>
Tue, 24 Sep 2019 20:09:09 +0000 (20:09 +0000)
Due to the use of {set/get}-element_private methods being used to store
the GstSyncStream in the src and sink pads, and the racey nature of pad
destruction, there are numerous ways we can be bitten by race conditions
in the stream synchronizer. Fix that by tying the pads toghether with
references.

gst/playback/gststreamsynchronizer.c

index 8380fa418a8ddb39280eefe444bac6ca137451b3..41836f79b798680f678cff96f6e7bb6d712312c2 100644 (file)
@@ -81,35 +81,147 @@ typedef struct
   guint32 stream_start_seqnum;
   guint32 segment_seqnum;
   guint group_id;
+
+  gint refcount;
 } GstSyncStream;
 
-/* Must be called with lock! */
-static inline GstPad *
-gst_stream_get_other_pad (GstSyncStream * stream, GstPad * pad)
+static GstSyncStream *
+gst_syncstream_ref (GstSyncStream * stream)
+{
+  g_return_val_if_fail (stream != NULL, NULL);
+  g_atomic_int_add (&stream->refcount, 1);
+  return stream;
+}
+
+static void
+gst_syncstream_unref (GstSyncStream * stream)
 {
-  if (stream->sinkpad == pad)
-    return gst_object_ref (stream->srcpad);
-  if (stream->srcpad == pad)
-    return gst_object_ref (stream->sinkpad);
+  g_return_if_fail (stream != NULL);
+  g_return_if_fail (stream->refcount > 0);
 
-  return NULL;
+  if (g_atomic_int_dec_and_test (&stream->refcount))
+    g_slice_free (GstSyncStream, stream);
 }
 
-static GstPad *
-gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad)
+G_BEGIN_DECLS
+#define GST_TYPE_STREAMSYNC_PAD              (gst_streamsync_pad_get_type ())
+#define GST_IS_STREAMSYNC_PAD(obj)           (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_STREAMSYNC_PAD))
+#define GST_IS_STREAMSYNC_PAD_CLASS(klass)   (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_STREAMSYNC_PAD))
+#define GST_STREAMSYNC_PAD(obj)              (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_STREAMSYNC_PAD, GstStreamSyncPad))
+#define GST_STREAMSYNC_PAD_CLASS(klass)      (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_STREAMSYNC_PAD, GstStreamSyncPadClass))
+typedef struct _GstStreamSyncPad GstStreamSyncPad;
+typedef struct _GstStreamSyncPadClass GstStreamSyncPadClass;
+
+struct _GstStreamSyncPad
 {
+  GstPad parent;
+
   GstSyncStream *stream;
-  GstPad *opad = NULL;
 
-  GST_STREAM_SYNCHRONIZER_LOCK (self);
-  stream = gst_pad_get_element_private (pad);
-  if (!stream)
-    goto out;
+  /* Since we need to access data associated with a pad in this
+   * element, it's important to manage the respective lifetimes of the
+   * stored pad data and the pads themselves. Pad deactivation happens
+   * without mutual exclusion to the use of pad data in this element.
+   *
+   * The approach here is to have the sinkpad (the request pad) hold a
+   * strong reference onto the srcpad (so that it stays alive until
+   * the last pad is destroyed). Similarly the srcpad has a weak
+   * reference to the sinkpad (request pad) to ensure it knows when
+   * the pads are destroyed, since the pad data may be requested from
+   * either the srcpad or the sinkpad. This avoids a nasty set of
+   * potential race conditions.
+   *
+   * The code is arranged so that in the srcpad, the pad pointer is
+   * always NULL (not used) and in the sinkpad, the otherpad is always
+   * NULL. */
+  GstPad *pad;
+  GWeakRef otherpad;
+};
+
+struct _GstStreamSyncPadClass
+{
+  GstPadClass parent_class;
+};
 
-  opad = gst_stream_get_other_pad (stream, pad);
+static GType gst_streamsync_pad_get_type (void);
+static GstSyncStream *gst_streamsync_pad_get_stream (GstPad * pad);
 
-out:
-  GST_STREAM_SYNCHRONIZER_UNLOCK (self);
+G_END_DECLS
+#define GST_STREAMSYNC_PAD_CAST(obj)         ((GstStreamSyncPad *)obj)
+  G_DEFINE_TYPE (GstStreamSyncPad, gst_streamsync_pad, GST_TYPE_PAD);
+
+static void gst_streamsync_pad_dispose (GObject * object);
+
+static void
+gst_streamsync_pad_class_init (GstStreamSyncPadClass * klass)
+{
+  GObjectClass *gobject_class;
+  gobject_class = G_OBJECT_CLASS (klass);
+  gobject_class->dispose = gst_streamsync_pad_dispose;
+}
+
+static void
+gst_streamsync_pad_init (GstStreamSyncPad * ppad)
+{
+}
+
+static void
+gst_streamsync_pad_dispose (GObject * object)
+{
+  GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (object);
+
+  if (GST_PAD_DIRECTION (spad) == GST_PAD_SINK)
+    gst_clear_object (&spad->pad);
+  else
+    g_weak_ref_clear (&spad->otherpad);
+
+  g_clear_pointer (&spad->stream, gst_syncstream_unref);
+
+  G_OBJECT_CLASS (gst_streamsync_pad_parent_class)->dispose (object);
+}
+
+static GstPad *
+gst_streamsync_pad_new_from_template (GstPadTemplate * templ,
+    const gchar * name)
+{
+  g_return_val_if_fail (GST_IS_PAD_TEMPLATE (templ), NULL);
+
+  return GST_PAD_CAST (g_object_new (GST_TYPE_STREAMSYNC_PAD,
+          "name", name, "direction", templ->direction, "template", templ,
+          NULL));
+}
+
+static GstPad *
+gst_streamsync_pad_new_from_static_template (GstStaticPadTemplate * templ,
+    const gchar * name)
+{
+  GstPad *pad;
+  GstPadTemplate *template;
+
+  template = gst_static_pad_template_get (templ);
+  pad = gst_streamsync_pad_new_from_template (template, name);
+  gst_object_unref (template);
+
+  return pad;
+}
+
+static GstSyncStream *
+gst_streamsync_pad_get_stream (GstPad * pad)
+{
+  GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (pad);
+  return gst_syncstream_ref (spad->stream);
+}
+
+static GstPad *
+gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad)
+{
+  GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (pad);
+  GstPad *opad = NULL;
+
+  if (GST_PAD_DIRECTION (pad) == GST_PAD_SINK)
+    opad = gst_object_ref (spad->pad);
+  else
+    opad = g_weak_ref_get (&spad->otherpad);
 
   if (!opad)
     GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
@@ -163,9 +275,9 @@ gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
       gst_event_unref (event);
 
       GST_STREAM_SYNCHRONIZER_LOCK (self);
-      stream = gst_pad_get_element_private (pad);
-      if (stream)
-        running_time_diff = stream->segment.base;
+      stream = gst_streamsync_pad_get_stream (pad);
+      running_time_diff = stream->segment.base;
+      gst_syncstream_unref (stream);
       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
 
       if (running_time_diff == -1) {
@@ -214,12 +326,9 @@ gst_stream_synchronizer_wait (GstStreamSynchronizer * self, GstPad * pad)
   gboolean ret = FALSE;
   GstSyncStream *stream;
 
+  stream = gst_streamsync_pad_get_stream (pad);
+
   while (!self->eos && !self->flushing) {
-    stream = gst_pad_get_element_private (pad);
-    if (!stream) {
-      GST_WARNING_OBJECT (pad, "unknown stream");
-      return ret;
-    }
     if (stream->flushing) {
       GST_DEBUG_OBJECT (pad, "Flushing");
       break;
@@ -250,6 +359,7 @@ gst_stream_synchronizer_wait (GstStreamSynchronizer * self, GstPad * pad)
       ret = gst_pad_push_event (pad, event);
       GST_STREAM_SYNCHRONIZER_LOCK (self);
       if (!ret) {
+        gst_syncstream_unref (stream);
         return ret;
       }
       stream->send_gap_event = FALSE;
@@ -262,6 +372,7 @@ gst_stream_synchronizer_wait (GstStreamSynchronizer * self, GstPad * pad)
     g_cond_wait (&stream->stream_finish_cond, &self->lock);
   }
 
+  gst_syncstream_unref (stream);
   return TRUE;
 }
 
@@ -293,13 +404,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
       self->have_group_id &= have_group_id;
       have_group_id = self->have_group_id;
 
-      stream = gst_pad_get_element_private (pad);
-
-      if (!stream) {
-        GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
-        GST_STREAM_SYNCHRONIZER_UNLOCK (self);
-        break;
-      }
+      stream = gst_streamsync_pad_get_stream (pad);
 
       gst_event_parse_stream_flags (event, &stream->flags);
 
@@ -418,6 +523,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
         }
       }
 
+      gst_syncstream_unref (stream);
       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
       break;
     }
@@ -437,8 +543,8 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
         goto done;
       }
 
-      stream = gst_pad_get_element_private (pad);
-      if (stream && segment.format == GST_FORMAT_TIME) {
+      stream = gst_streamsync_pad_get_stream (pad);
+      if (segment.format == GST_FORMAT_TIME) {
         GST_DEBUG_OBJECT (pad,
             "New stream, updating base from %" GST_TIME_FORMAT " to %"
             GST_TIME_FORMAT, GST_TIME_ARGS (segment.base),
@@ -467,6 +573,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
             gst_format_get_name (segment.format));
         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
       }
+      gst_syncstream_unref (stream);
       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
       break;
     }
@@ -474,13 +581,12 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
       GstSyncStream *stream;
 
       GST_STREAM_SYNCHRONIZER_LOCK (self);
-      stream = gst_pad_get_element_private (pad);
+      stream = gst_streamsync_pad_get_stream (pad);
       self->eos = FALSE;
-      if (stream) {
-        GST_DEBUG_OBJECT (pad, "Flushing streams");
-        stream->flushing = TRUE;
-        g_cond_broadcast (&stream->stream_finish_cond);
-      }
+      GST_DEBUG_OBJECT (pad, "Flushing streams");
+      stream->flushing = TRUE;
+      g_cond_broadcast (&stream->stream_finish_cond);
+      gst_syncstream_unref (stream);
       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
       break;
     }
@@ -490,18 +596,16 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
       GstClockTime new_group_start_time = 0;
 
       GST_STREAM_SYNCHRONIZER_LOCK (self);
-      stream = gst_pad_get_element_private (pad);
-      if (stream) {
-        GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
-            stream->stream_number);
-        gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
+      stream = gst_streamsync_pad_get_stream (pad);
+      GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
+          stream->stream_number);
+      gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
 
-        stream->is_eos = FALSE;
-        stream->eos_sent = FALSE;
-        stream->flushing = FALSE;
-        stream->wait = FALSE;
-        g_cond_broadcast (&stream->stream_finish_cond);
-      }
+      stream->is_eos = FALSE;
+      stream->eos_sent = FALSE;
+      stream->flushing = FALSE;
+      stream->wait = FALSE;
+      g_cond_broadcast (&stream->stream_finish_cond);
 
       for (l = self->streams; l; l = l->next) {
         GstSyncStream *ostream = l->data;
@@ -529,6 +633,8 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
           GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time),
           GST_TIME_ARGS (new_group_start_time));
       self->group_start_time = new_group_start_time;
+
+      gst_syncstream_unref (stream);
       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
       break;
     }
@@ -540,13 +646,12 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
         GstSyncStream *stream;
 
         GST_STREAM_SYNCHRONIZER_LOCK (self);
-        stream = gst_pad_get_element_private (pad);
-        if (stream) {
-          stream->is_eos = FALSE;
-          stream->eos_sent = FALSE;
-          stream->wait = FALSE;
-          g_cond_broadcast (&stream->stream_finish_cond);
-        }
+        stream = gst_streamsync_pad_get_stream (pad);
+        stream->is_eos = FALSE;
+        stream->eos_sent = FALSE;
+        stream->wait = FALSE;
+        g_cond_broadcast (&stream->stream_finish_cond);
+        gst_syncstream_unref (stream);
         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
       }
       break;
@@ -562,12 +667,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
       guint32 seqnum;
 
       GST_STREAM_SYNCHRONIZER_LOCK (self);
-      stream = gst_pad_get_element_private (pad);
-      if (!stream) {
-        GST_STREAM_SYNCHRONIZER_UNLOCK (self);
-        GST_WARNING_OBJECT (pad, "EOS for unknown stream");
-        break;
-      }
+      stream = gst_streamsync_pad_get_stream (pad);
 
       GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
       stream->is_eos = TRUE;
@@ -612,11 +712,9 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
         epad = pads;
         while (epad) {
           pad = epad->data;
-          ostream = gst_pad_get_element_private (pad);
-          if (ostream) {
-            g_cond_broadcast (&ostream->stream_finish_cond);
-          }
-
+          ostream = gst_streamsync_pad_get_stream (pad);
+          g_cond_broadcast (&ostream->stream_finish_cond);
+          gst_syncstream_unref (ostream);
           gst_object_unref (pad);
           epad = g_slist_next (epad);
         }
@@ -645,14 +743,14 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
         ret = gst_pad_push_event (srcpad, topush);
         GST_STREAM_SYNCHRONIZER_LOCK (self);
-        stream = gst_pad_get_element_private (pad);
-        if (stream) {
-          stream->eos_sent = TRUE;
-        }
+        stream = gst_streamsync_pad_get_stream (pad);
+        stream->eos_sent = TRUE;
+        gst_syncstream_unref (stream);
       }
 
       gst_object_unref (srcpad);
       gst_event_unref (event);
+      gst_syncstream_unref (stream);
       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
       goto done;
     }
@@ -694,21 +792,21 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
     timestamp_end = timestamp + duration;
 
   GST_STREAM_SYNCHRONIZER_LOCK (self);
-  stream = gst_pad_get_element_private (pad);
-
-  if (stream) {
-    stream->seen_data = TRUE;
-    if (stream->segment.format == GST_FORMAT_TIME
-        && GST_CLOCK_TIME_IS_VALID (timestamp)) {
-      GST_LOG_OBJECT (pad,
-          "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
-          GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
-      if (stream->segment.rate > 0.0)
-        stream->segment.position = timestamp;
-      else
-        stream->segment.position = timestamp_end;
-    }
+  stream = gst_streamsync_pad_get_stream (pad);
+
+  stream->seen_data = TRUE;
+  if (stream->segment.format == GST_FORMAT_TIME
+      && GST_CLOCK_TIME_IS_VALID (timestamp)) {
+    GST_LOG_OBJECT (pad,
+        "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
+    if (stream->segment.rate > 0.0)
+      stream->segment.position = timestamp;
+    else
+      stream->segment.position = timestamp_end;
   }
+
+  gst_syncstream_unref (stream);
   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
 
   opad = gst_stream_get_other_pad_from_pad (self, pad);
@@ -722,8 +820,8 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
     GList *l;
 
     GST_STREAM_SYNCHRONIZER_LOCK (self);
-    stream = gst_pad_get_element_private (pad);
-    if (stream && stream->segment.format == GST_FORMAT_TIME) {
+    stream = gst_streamsync_pad_get_stream (pad);
+    if (stream->segment.format == GST_FORMAT_TIME) {
       GstClockTime position;
 
       if (stream->segment.rate > 0.0)
@@ -778,39 +876,40 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
         g_cond_broadcast (&ostream->stream_finish_cond);
       }
     }
+    gst_syncstream_unref (stream);
     GST_STREAM_SYNCHRONIZER_UNLOCK (self);
   }
 
   return ret;
 }
 
-/* GstElement vfuncs */
+/* Must be called with lock! */
 static GstPad *
-gst_stream_synchronizer_request_new_pad (GstElement * element,
-    GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
+gst_stream_synchronizer_new_pad (GstStreamSynchronizer * sync)
 {
-  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
-  GstSyncStream *stream;
+  GstSyncStream *stream = NULL;
+  GstStreamSyncPad *sinkpad, *srcpad;
   gchar *tmp;
 
-  GST_STREAM_SYNCHRONIZER_LOCK (self);
-  GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
-      self->current_stream_number);
-
   stream = g_slice_new0 (GstSyncStream);
-  stream->transform = self;
-  stream->stream_number = self->current_stream_number;
+  stream->transform = sync;
+  stream->stream_number = sync->current_stream_number;
   g_cond_init (&stream->stream_finish_cond);
   stream->stream_start_seqnum = G_MAXUINT32;
   stream->segment_seqnum = G_MAXUINT32;
   stream->group_id = G_MAXUINT;
   stream->seen_data = FALSE;
   stream->send_gap_event = FALSE;
+  stream->refcount = 1;
 
-  tmp = g_strdup_printf ("sink_%u", self->current_stream_number);
-  stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
+  tmp = g_strdup_printf ("sink_%u", sync->current_stream_number);
+  stream->sinkpad =
+      gst_streamsync_pad_new_from_static_template (&sinktemplate, tmp);
   g_free (tmp);
-  gst_pad_set_element_private (stream->sinkpad, stream);
+
+  GST_STREAMSYNC_PAD_CAST (stream->sinkpad)->stream =
+      gst_syncstream_ref (stream);
+
   gst_pad_set_iterate_internal_links_function (stream->sinkpad,
       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
   gst_pad_set_event_function (stream->sinkpad,
@@ -821,10 +920,24 @@ gst_stream_synchronizer_request_new_pad (GstElement * element,
   GST_PAD_SET_PROXY_ALLOCATION (stream->sinkpad);
   GST_PAD_SET_PROXY_SCHEDULING (stream->sinkpad);
 
-  tmp = g_strdup_printf ("src_%u", self->current_stream_number);
-  stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
+  tmp = g_strdup_printf ("src_%u", sync->current_stream_number);
+  stream->srcpad =
+      gst_streamsync_pad_new_from_static_template (&srctemplate, tmp);
   g_free (tmp);
-  gst_pad_set_element_private (stream->srcpad, stream);
+
+  GST_STREAMSYNC_PAD_CAST (stream->srcpad)->stream =
+      gst_syncstream_ref (stream);
+
+  sinkpad = GST_STREAMSYNC_PAD_CAST (stream->sinkpad);
+  srcpad = GST_STREAMSYNC_PAD_CAST (stream->srcpad);
+  /* Hold a strong reference from the sink (request pad) to the src to
+   * ensure a predicatable destruction order */
+  sinkpad->pad = gst_object_ref (srcpad);
+  /* And a weak reference from the src to the sink, to know when pad
+   * release is occuring, and to ensure we do not try and take
+   * references to inactive / destructing streams. */
+  g_weak_ref_init (&srcpad->otherpad, stream->sinkpad);
+
   gst_pad_set_iterate_internal_links_function (stream->srcpad,
       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
   gst_pad_set_event_function (stream->srcpad,
@@ -835,21 +948,43 @@ gst_stream_synchronizer_request_new_pad (GstElement * element,
 
   gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
 
-  self->streams = g_list_prepend (self->streams, stream);
-  self->current_stream_number++;
-  GST_STREAM_SYNCHRONIZER_UNLOCK (self);
+  GST_STREAM_SYNCHRONIZER_UNLOCK (sync);
 
   /* Add pads and activate unless we're going to NULL */
-  g_rec_mutex_lock (GST_STATE_GET_LOCK (self));
-  if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
+  g_rec_mutex_lock (GST_STATE_GET_LOCK (sync));
+  if (GST_STATE_TARGET (sync) != GST_STATE_NULL) {
     gst_pad_set_active (stream->srcpad, TRUE);
     gst_pad_set_active (stream->sinkpad, TRUE);
   }
-  gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
-  gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
-  g_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
+  gst_element_add_pad (GST_ELEMENT_CAST (sync), stream->srcpad);
+  gst_element_add_pad (GST_ELEMENT_CAST (sync), stream->sinkpad);
+  g_rec_mutex_unlock (GST_STATE_GET_LOCK (sync));
+
+  GST_STREAM_SYNCHRONIZER_LOCK (sync);
+
+  sync->streams = g_list_prepend (sync->streams, g_steal_pointer (&stream));
+  sync->current_stream_number++;
+
+  return GST_PAD_CAST (sinkpad);
+}
+
+/* GstElement vfuncs */
+static GstPad *
+gst_stream_synchronizer_request_new_pad (GstElement * element,
+    GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
+{
+  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
+  GstPad *request_pad;
 
-  return stream->sinkpad;
+  GST_STREAM_SYNCHRONIZER_LOCK (self);
+  GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
+      self->current_stream_number);
+
+  request_pad = gst_stream_synchronizer_new_pad (self);
+
+  GST_STREAM_SYNCHRONIZER_UNLOCK (self);
+
+  return request_pad;
 }
 
 /* Must be called with lock! */
@@ -878,15 +1013,15 @@ gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
    * (due to reverse lock order) when deactivating pads */
   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
 
-  gst_pad_set_element_private (stream->srcpad, NULL);
-  gst_pad_set_element_private (stream->sinkpad, NULL);
   gst_pad_set_active (stream->srcpad, FALSE);
   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
   gst_pad_set_active (stream->sinkpad, FALSE);
   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
 
   g_cond_clear (&stream->stream_finish_cond);
-  g_slice_free (GstSyncStream, stream);
+
+  /* Release the ref maintaining validity in the streams list */
+  gst_syncstream_unref (stream);
 
   /* NOTE: In theory we have to check here if all streams
    * are EOS but the one that was removed wasn't and then
@@ -908,12 +1043,12 @@ gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
   GstSyncStream *stream;
 
   GST_STREAM_SYNCHRONIZER_LOCK (self);
-  stream = gst_pad_get_element_private (pad);
-  if (stream) {
-    g_assert (stream->sinkpad == pad);
+  stream = gst_streamsync_pad_get_stream (pad);
+  g_assert (stream->sinkpad == pad);
 
-    gst_stream_synchronizer_release_stream (self, stream);
-  }
+  gst_stream_synchronizer_release_stream (self, stream);
+
+  gst_syncstream_unref (stream);
   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
 }