guint32 stream_start_seqnum;
guint32 segment_seqnum;
guint group_id;
+
+ gint refcount;
} GstSyncStream;
-/* Must be called with lock! */
-static inline GstPad *
-gst_stream_get_other_pad (GstSyncStream * stream, GstPad * pad)
+static GstSyncStream *
+gst_syncstream_ref (GstSyncStream * stream)
+{
+ g_return_val_if_fail (stream != NULL, NULL);
+ g_atomic_int_add (&stream->refcount, 1);
+ return stream;
+}
+
+static void
+gst_syncstream_unref (GstSyncStream * stream)
{
- if (stream->sinkpad == pad)
- return gst_object_ref (stream->srcpad);
- if (stream->srcpad == pad)
- return gst_object_ref (stream->sinkpad);
+ g_return_if_fail (stream != NULL);
+ g_return_if_fail (stream->refcount > 0);
- return NULL;
+ if (g_atomic_int_dec_and_test (&stream->refcount))
+ g_slice_free (GstSyncStream, stream);
}
-static GstPad *
-gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad)
+G_BEGIN_DECLS
+#define GST_TYPE_STREAMSYNC_PAD (gst_streamsync_pad_get_type ())
+#define GST_IS_STREAMSYNC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_STREAMSYNC_PAD))
+#define GST_IS_STREAMSYNC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_STREAMSYNC_PAD))
+#define GST_STREAMSYNC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_STREAMSYNC_PAD, GstStreamSyncPad))
+#define GST_STREAMSYNC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_STREAMSYNC_PAD, GstStreamSyncPadClass))
+typedef struct _GstStreamSyncPad GstStreamSyncPad;
+typedef struct _GstStreamSyncPadClass GstStreamSyncPadClass;
+
+struct _GstStreamSyncPad
{
+ GstPad parent;
+
GstSyncStream *stream;
- GstPad *opad = NULL;
- GST_STREAM_SYNCHRONIZER_LOCK (self);
- stream = gst_pad_get_element_private (pad);
- if (!stream)
- goto out;
+ /* Since we need to access data associated with a pad in this
+ * element, it's important to manage the respective lifetimes of the
+ * stored pad data and the pads themselves. Pad deactivation happens
+ * without mutual exclusion to the use of pad data in this element.
+ *
+ * The approach here is to have the sinkpad (the request pad) hold a
+ * strong reference onto the srcpad (so that it stays alive until
+ * the last pad is destroyed). Similarly the srcpad has a weak
+ * reference to the sinkpad (request pad) to ensure it knows when
+ * the pads are destroyed, since the pad data may be requested from
+ * either the srcpad or the sinkpad. This avoids a nasty set of
+ * potential race conditions.
+ *
+ * The code is arranged so that in the srcpad, the pad pointer is
+ * always NULL (not used) and in the sinkpad, the otherpad is always
+ * NULL. */
+ GstPad *pad;
+ GWeakRef otherpad;
+};
+
+struct _GstStreamSyncPadClass
+{
+ GstPadClass parent_class;
+};
- opad = gst_stream_get_other_pad (stream, pad);
+static GType gst_streamsync_pad_get_type (void);
+static GstSyncStream *gst_streamsync_pad_get_stream (GstPad * pad);
-out:
- GST_STREAM_SYNCHRONIZER_UNLOCK (self);
+G_END_DECLS
+#define GST_STREAMSYNC_PAD_CAST(obj) ((GstStreamSyncPad *)obj)
+ G_DEFINE_TYPE (GstStreamSyncPad, gst_streamsync_pad, GST_TYPE_PAD);
+
+static void gst_streamsync_pad_dispose (GObject * object);
+
+static void
+gst_streamsync_pad_class_init (GstStreamSyncPadClass * klass)
+{
+ GObjectClass *gobject_class;
+ gobject_class = G_OBJECT_CLASS (klass);
+ gobject_class->dispose = gst_streamsync_pad_dispose;
+}
+
+static void
+gst_streamsync_pad_init (GstStreamSyncPad * ppad)
+{
+}
+
+static void
+gst_streamsync_pad_dispose (GObject * object)
+{
+ GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (object);
+
+ if (GST_PAD_DIRECTION (spad) == GST_PAD_SINK)
+ gst_clear_object (&spad->pad);
+ else
+ g_weak_ref_clear (&spad->otherpad);
+
+ g_clear_pointer (&spad->stream, gst_syncstream_unref);
+
+ G_OBJECT_CLASS (gst_streamsync_pad_parent_class)->dispose (object);
+}
+
+static GstPad *
+gst_streamsync_pad_new_from_template (GstPadTemplate * templ,
+ const gchar * name)
+{
+ g_return_val_if_fail (GST_IS_PAD_TEMPLATE (templ), NULL);
+
+ return GST_PAD_CAST (g_object_new (GST_TYPE_STREAMSYNC_PAD,
+ "name", name, "direction", templ->direction, "template", templ,
+ NULL));
+}
+
+static GstPad *
+gst_streamsync_pad_new_from_static_template (GstStaticPadTemplate * templ,
+ const gchar * name)
+{
+ GstPad *pad;
+ GstPadTemplate *template;
+
+ template = gst_static_pad_template_get (templ);
+ pad = gst_streamsync_pad_new_from_template (template, name);
+ gst_object_unref (template);
+
+ return pad;
+}
+
+static GstSyncStream *
+gst_streamsync_pad_get_stream (GstPad * pad)
+{
+ GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (pad);
+ return gst_syncstream_ref (spad->stream);
+}
+
+static GstPad *
+gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad)
+{
+ GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (pad);
+ GstPad *opad = NULL;
+
+ if (GST_PAD_DIRECTION (pad) == GST_PAD_SINK)
+ opad = gst_object_ref (spad->pad);
+ else
+ opad = g_weak_ref_get (&spad->otherpad);
if (!opad)
GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
gst_event_unref (event);
GST_STREAM_SYNCHRONIZER_LOCK (self);
- stream = gst_pad_get_element_private (pad);
- if (stream)
- running_time_diff = stream->segment.base;
+ stream = gst_streamsync_pad_get_stream (pad);
+ running_time_diff = stream->segment.base;
+ gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
if (running_time_diff == -1) {
gboolean ret = FALSE;
GstSyncStream *stream;
+ stream = gst_streamsync_pad_get_stream (pad);
+
while (!self->eos && !self->flushing) {
- stream = gst_pad_get_element_private (pad);
- if (!stream) {
- GST_WARNING_OBJECT (pad, "unknown stream");
- return ret;
- }
if (stream->flushing) {
GST_DEBUG_OBJECT (pad, "Flushing");
break;
ret = gst_pad_push_event (pad, event);
GST_STREAM_SYNCHRONIZER_LOCK (self);
if (!ret) {
+ gst_syncstream_unref (stream);
return ret;
}
stream->send_gap_event = FALSE;
g_cond_wait (&stream->stream_finish_cond, &self->lock);
}
+ gst_syncstream_unref (stream);
return TRUE;
}
self->have_group_id &= have_group_id;
have_group_id = self->have_group_id;
- stream = gst_pad_get_element_private (pad);
-
- if (!stream) {
- GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
- GST_STREAM_SYNCHRONIZER_UNLOCK (self);
- break;
- }
+ stream = gst_streamsync_pad_get_stream (pad);
gst_event_parse_stream_flags (event, &stream->flags);
}
}
+ gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break;
}
goto done;
}
- stream = gst_pad_get_element_private (pad);
- if (stream && segment.format == GST_FORMAT_TIME) {
+ stream = gst_streamsync_pad_get_stream (pad);
+ if (segment.format == GST_FORMAT_TIME) {
GST_DEBUG_OBJECT (pad,
"New stream, updating base from %" GST_TIME_FORMAT " to %"
GST_TIME_FORMAT, GST_TIME_ARGS (segment.base),
gst_format_get_name (segment.format));
gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
}
+ gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break;
}
GstSyncStream *stream;
GST_STREAM_SYNCHRONIZER_LOCK (self);
- stream = gst_pad_get_element_private (pad);
+ stream = gst_streamsync_pad_get_stream (pad);
self->eos = FALSE;
- if (stream) {
- GST_DEBUG_OBJECT (pad, "Flushing streams");
- stream->flushing = TRUE;
- g_cond_broadcast (&stream->stream_finish_cond);
- }
+ GST_DEBUG_OBJECT (pad, "Flushing streams");
+ stream->flushing = TRUE;
+ g_cond_broadcast (&stream->stream_finish_cond);
+ gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break;
}
GstClockTime new_group_start_time = 0;
GST_STREAM_SYNCHRONIZER_LOCK (self);
- stream = gst_pad_get_element_private (pad);
- if (stream) {
- GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
- stream->stream_number);
- gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
+ stream = gst_streamsync_pad_get_stream (pad);
+ GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
+ stream->stream_number);
+ gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
- stream->is_eos = FALSE;
- stream->eos_sent = FALSE;
- stream->flushing = FALSE;
- stream->wait = FALSE;
- g_cond_broadcast (&stream->stream_finish_cond);
- }
+ stream->is_eos = FALSE;
+ stream->eos_sent = FALSE;
+ stream->flushing = FALSE;
+ stream->wait = FALSE;
+ g_cond_broadcast (&stream->stream_finish_cond);
for (l = self->streams; l; l = l->next) {
GstSyncStream *ostream = l->data;
GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time),
GST_TIME_ARGS (new_group_start_time));
self->group_start_time = new_group_start_time;
+
+ gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break;
}
GstSyncStream *stream;
GST_STREAM_SYNCHRONIZER_LOCK (self);
- stream = gst_pad_get_element_private (pad);
- if (stream) {
- stream->is_eos = FALSE;
- stream->eos_sent = FALSE;
- stream->wait = FALSE;
- g_cond_broadcast (&stream->stream_finish_cond);
- }
+ stream = gst_streamsync_pad_get_stream (pad);
+ stream->is_eos = FALSE;
+ stream->eos_sent = FALSE;
+ stream->wait = FALSE;
+ g_cond_broadcast (&stream->stream_finish_cond);
+ gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
}
break;
guint32 seqnum;
GST_STREAM_SYNCHRONIZER_LOCK (self);
- stream = gst_pad_get_element_private (pad);
- if (!stream) {
- GST_STREAM_SYNCHRONIZER_UNLOCK (self);
- GST_WARNING_OBJECT (pad, "EOS for unknown stream");
- break;
- }
+ stream = gst_streamsync_pad_get_stream (pad);
GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
stream->is_eos = TRUE;
epad = pads;
while (epad) {
pad = epad->data;
- ostream = gst_pad_get_element_private (pad);
- if (ostream) {
- g_cond_broadcast (&ostream->stream_finish_cond);
- }
-
+ ostream = gst_streamsync_pad_get_stream (pad);
+ g_cond_broadcast (&ostream->stream_finish_cond);
+ gst_syncstream_unref (ostream);
gst_object_unref (pad);
epad = g_slist_next (epad);
}
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
ret = gst_pad_push_event (srcpad, topush);
GST_STREAM_SYNCHRONIZER_LOCK (self);
- stream = gst_pad_get_element_private (pad);
- if (stream) {
- stream->eos_sent = TRUE;
- }
+ stream = gst_streamsync_pad_get_stream (pad);
+ stream->eos_sent = TRUE;
+ gst_syncstream_unref (stream);
}
gst_object_unref (srcpad);
gst_event_unref (event);
+ gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
goto done;
}
timestamp_end = timestamp + duration;
GST_STREAM_SYNCHRONIZER_LOCK (self);
- stream = gst_pad_get_element_private (pad);
-
- if (stream) {
- stream->seen_data = TRUE;
- if (stream->segment.format == GST_FORMAT_TIME
- && GST_CLOCK_TIME_IS_VALID (timestamp)) {
- GST_LOG_OBJECT (pad,
- "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
- GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
- if (stream->segment.rate > 0.0)
- stream->segment.position = timestamp;
- else
- stream->segment.position = timestamp_end;
- }
+ stream = gst_streamsync_pad_get_stream (pad);
+
+ stream->seen_data = TRUE;
+ if (stream->segment.format == GST_FORMAT_TIME
+ && GST_CLOCK_TIME_IS_VALID (timestamp)) {
+ GST_LOG_OBJECT (pad,
+ "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
+ if (stream->segment.rate > 0.0)
+ stream->segment.position = timestamp;
+ else
+ stream->segment.position = timestamp_end;
}
+
+ gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
opad = gst_stream_get_other_pad_from_pad (self, pad);
GList *l;
GST_STREAM_SYNCHRONIZER_LOCK (self);
- stream = gst_pad_get_element_private (pad);
- if (stream && stream->segment.format == GST_FORMAT_TIME) {
+ stream = gst_streamsync_pad_get_stream (pad);
+ if (stream->segment.format == GST_FORMAT_TIME) {
GstClockTime position;
if (stream->segment.rate > 0.0)
g_cond_broadcast (&ostream->stream_finish_cond);
}
}
+ gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
}
return ret;
}
-/* GstElement vfuncs */
+/* Must be called with lock! */
static GstPad *
-gst_stream_synchronizer_request_new_pad (GstElement * element,
- GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
+gst_stream_synchronizer_new_pad (GstStreamSynchronizer * sync)
{
- GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
- GstSyncStream *stream;
+ GstSyncStream *stream = NULL;
+ GstStreamSyncPad *sinkpad, *srcpad;
gchar *tmp;
- GST_STREAM_SYNCHRONIZER_LOCK (self);
- GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
- self->current_stream_number);
-
stream = g_slice_new0 (GstSyncStream);
- stream->transform = self;
- stream->stream_number = self->current_stream_number;
+ stream->transform = sync;
+ stream->stream_number = sync->current_stream_number;
g_cond_init (&stream->stream_finish_cond);
stream->stream_start_seqnum = G_MAXUINT32;
stream->segment_seqnum = G_MAXUINT32;
stream->group_id = G_MAXUINT;
stream->seen_data = FALSE;
stream->send_gap_event = FALSE;
+ stream->refcount = 1;
- tmp = g_strdup_printf ("sink_%u", self->current_stream_number);
- stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
+ tmp = g_strdup_printf ("sink_%u", sync->current_stream_number);
+ stream->sinkpad =
+ gst_streamsync_pad_new_from_static_template (&sinktemplate, tmp);
g_free (tmp);
- gst_pad_set_element_private (stream->sinkpad, stream);
+
+ GST_STREAMSYNC_PAD_CAST (stream->sinkpad)->stream =
+ gst_syncstream_ref (stream);
+
gst_pad_set_iterate_internal_links_function (stream->sinkpad,
GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
gst_pad_set_event_function (stream->sinkpad,
GST_PAD_SET_PROXY_ALLOCATION (stream->sinkpad);
GST_PAD_SET_PROXY_SCHEDULING (stream->sinkpad);
- tmp = g_strdup_printf ("src_%u", self->current_stream_number);
- stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
+ tmp = g_strdup_printf ("src_%u", sync->current_stream_number);
+ stream->srcpad =
+ gst_streamsync_pad_new_from_static_template (&srctemplate, tmp);
g_free (tmp);
- gst_pad_set_element_private (stream->srcpad, stream);
+
+ GST_STREAMSYNC_PAD_CAST (stream->srcpad)->stream =
+ gst_syncstream_ref (stream);
+
+ sinkpad = GST_STREAMSYNC_PAD_CAST (stream->sinkpad);
+ srcpad = GST_STREAMSYNC_PAD_CAST (stream->srcpad);
+ /* Hold a strong reference from the sink (request pad) to the src to
+ * ensure a predicatable destruction order */
+ sinkpad->pad = gst_object_ref (srcpad);
+ /* And a weak reference from the src to the sink, to know when pad
+ * release is occuring, and to ensure we do not try and take
+ * references to inactive / destructing streams. */
+ g_weak_ref_init (&srcpad->otherpad, stream->sinkpad);
+
gst_pad_set_iterate_internal_links_function (stream->srcpad,
GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
gst_pad_set_event_function (stream->srcpad,
gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
- self->streams = g_list_prepend (self->streams, stream);
- self->current_stream_number++;
- GST_STREAM_SYNCHRONIZER_UNLOCK (self);
+ GST_STREAM_SYNCHRONIZER_UNLOCK (sync);
/* Add pads and activate unless we're going to NULL */
- g_rec_mutex_lock (GST_STATE_GET_LOCK (self));
- if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
+ g_rec_mutex_lock (GST_STATE_GET_LOCK (sync));
+ if (GST_STATE_TARGET (sync) != GST_STATE_NULL) {
gst_pad_set_active (stream->srcpad, TRUE);
gst_pad_set_active (stream->sinkpad, TRUE);
}
- gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
- gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
- g_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
+ gst_element_add_pad (GST_ELEMENT_CAST (sync), stream->srcpad);
+ gst_element_add_pad (GST_ELEMENT_CAST (sync), stream->sinkpad);
+ g_rec_mutex_unlock (GST_STATE_GET_LOCK (sync));
+
+ GST_STREAM_SYNCHRONIZER_LOCK (sync);
+
+ sync->streams = g_list_prepend (sync->streams, g_steal_pointer (&stream));
+ sync->current_stream_number++;
+
+ return GST_PAD_CAST (sinkpad);
+}
+
+/* GstElement vfuncs */
+static GstPad *
+gst_stream_synchronizer_request_new_pad (GstElement * element,
+ GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
+{
+ GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
+ GstPad *request_pad;
- return stream->sinkpad;
+ GST_STREAM_SYNCHRONIZER_LOCK (self);
+ GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
+ self->current_stream_number);
+
+ request_pad = gst_stream_synchronizer_new_pad (self);
+
+ GST_STREAM_SYNCHRONIZER_UNLOCK (self);
+
+ return request_pad;
}
/* Must be called with lock! */
* (due to reverse lock order) when deactivating pads */
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
- gst_pad_set_element_private (stream->srcpad, NULL);
- gst_pad_set_element_private (stream->sinkpad, NULL);
gst_pad_set_active (stream->srcpad, FALSE);
gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
gst_pad_set_active (stream->sinkpad, FALSE);
gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
g_cond_clear (&stream->stream_finish_cond);
- g_slice_free (GstSyncStream, stream);
+
+ /* Release the ref maintaining validity in the streams list */
+ gst_syncstream_unref (stream);
/* NOTE: In theory we have to check here if all streams
* are EOS but the one that was removed wasn't and then
GstSyncStream *stream;
GST_STREAM_SYNCHRONIZER_LOCK (self);
- stream = gst_pad_get_element_private (pad);
- if (stream) {
- g_assert (stream->sinkpad == pad);
+ stream = gst_streamsync_pad_get_stream (pad);
+ g_assert (stream->sinkpad == pad);
- gst_stream_synchronizer_release_stream (self, stream);
- }
+ gst_stream_synchronizer_release_stream (self, stream);
+
+ gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
}