From d86f6132ed606ad7e1f4d18221c0bb9b34472cca Mon Sep 17 00:00:00 2001 From: Edward Hervey Date: Tue, 14 Aug 2012 18:53:52 +0200 Subject: [PATCH] streamsynchronizer: Handle stream switching * Update outgoing segment.base with accumulated time, ensuring all streams are synchronized. * Only consider streams as "new" is they have a STREAM_START event with a different seqnum. * Use GstStream segment.base instead of separate variable to store the past running time. * Disable passthrough * Switch to glib 2.32 GMutex/GCond * Avoid getting pad parent the expensive way * Minor other fixes --- gst/playback/gststreamsynchronizer.c | 203 ++++++++++++----------------------- gst/playback/gststreamsynchronizer.h | 2 +- 2 files changed, 69 insertions(+), 136 deletions(-) diff --git a/gst/playback/gststreamsynchronizer.c b/gst/playback/gststreamsynchronizer.c index fd78697..d12a15b 100644 --- a/gst/playback/gststreamsynchronizer.c +++ b/gst/playback/gststreamsynchronizer.c @@ -22,7 +22,6 @@ #endif #include "gststreamsynchronizer.h" -#include "gst/glib-compat-private.h" GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug); #define GST_CAT_DEFAULT stream_synchronizer_debug @@ -31,7 +30,7 @@ GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug); GST_LOG_OBJECT (obj, \ "locking from thread %p", \ g_thread_self ()); \ - g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \ + g_mutex_lock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \ GST_LOG_OBJECT (obj, \ "locked from thread %p", \ g_thread_self ()); \ @@ -41,7 +40,7 @@ GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug); GST_LOG_OBJECT (obj, \ "unlocking from thread %p", \ g_thread_self ()); \ - g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \ + g_mutex_unlock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \ } G_STMT_END static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u", @@ -53,8 +52,6 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_REQUEST, GST_STATIC_CAPS_ANY); -static const gboolean passthrough = FALSE; - #define gst_stream_synchronizer_parent_class parent_class G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer, GST_TYPE_ELEMENT); @@ -67,42 +64,38 @@ typedef struct GstPad *sinkpad; GstSegment segment; - gboolean wait; + gboolean wait; /* TRUE if waiting/blocking */ gboolean new_stream; gboolean drop_discont; - gboolean is_eos; + gboolean is_eos; /* TRUE if EOS was received */ gboolean seen_data; - gint64 running_time_diff; + GCond stream_finish_cond; - GCond *stream_finish_cond; + /* seqnum of the previously received STREAM_START + * default: G_MAXUINT32 */ + guint32 stream_start_seqnum; + guint32 segment_seqnum; } GstStream; /* Must be called with lock! */ -static GstPad * +static inline GstPad * gst_stream_get_other_pad (GstStream * stream, GstPad * pad) { if (stream->sinkpad == pad) return gst_object_ref (stream->srcpad); - else if (stream->srcpad == pad) + if (stream->srcpad == pad) return gst_object_ref (stream->sinkpad); return NULL; } static GstPad * -gst_stream_get_other_pad_from_pad (GstPad * pad) +gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad) { - GstObject *parent = gst_pad_get_parent (pad); - GstStreamSynchronizer *self; GstStream *stream; GstPad *opad = NULL; - /* released pad does not have parent anymore */ - if (!G_LIKELY (parent)) - goto exit; - - self = GST_STREAM_SYNCHRONIZER (parent); GST_STREAM_SYNCHRONIZER_LOCK (self); stream = gst_pad_get_element_private (pad); if (!stream) @@ -112,9 +105,7 @@ gst_stream_get_other_pad_from_pad (GstPad * pad) out: GST_STREAM_SYNCHRONIZER_UNLOCK (self); - gst_object_unref (self); -exit: if (!opad) GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing"); @@ -129,7 +120,8 @@ gst_stream_synchronizer_iterate_internal_links (GstPad * pad, GstIterator *it = NULL; GstPad *opad; - opad = gst_stream_get_other_pad_from_pad (pad); + opad = + gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad); if (opad) { GValue value = { 0, }; @@ -152,7 +144,8 @@ gst_stream_synchronizer_query (GstPad * pad, GstObject * parent, GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query)); - opad = gst_stream_get_other_pad_from_pad (pad); + opad = + gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad); if (opad) { ret = gst_pad_peer_query (opad, query); gst_object_unref (opad); @@ -170,9 +163,6 @@ gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent, GstPad *opad; gboolean ret = FALSE; - if (passthrough) - goto skip_adjustments; - GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT, GST_EVENT_TYPE_NAME (event), event); @@ -181,7 +171,7 @@ gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent, gdouble proportion; GstClockTimeDiff diff; GstClockTime timestamp; - gint64 running_time_diff; + gint64 running_time_diff = -1; GstStream *stream; gst_event_parse_qos (event, NULL, &proportion, &diff, ×tamp); @@ -190,15 +180,14 @@ gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent, GST_STREAM_SYNCHRONIZER_LOCK (self); stream = gst_pad_get_element_private (pad); if (stream) - running_time_diff = stream->running_time_diff; - else - running_time_diff = -1; + running_time_diff = stream->segment.base; GST_STREAM_SYNCHRONIZER_UNLOCK (self); if (running_time_diff == -1) { GST_WARNING_OBJECT (pad, "QOS event before group start"); goto out; - } else if (timestamp < running_time_diff) { + } + if (timestamp < running_time_diff) { GST_DEBUG_OBJECT (pad, "QOS event from previous group"); goto out; } @@ -227,9 +216,7 @@ gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent, break; } -skip_adjustments: - - opad = gst_stream_get_other_pad_from_pad (pad); + opad = gst_stream_get_other_pad_from_pad (self, pad); if (opad) { ret = gst_pad_push_event (opad, event); gst_object_unref (opad); @@ -248,9 +235,6 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, GstPad *opad; gboolean ret = FALSE; - if (passthrough) - goto skip_adjustments; - GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT, GST_EVENT_TYPE_NAME (event), event); @@ -258,12 +242,13 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, case GST_EVENT_STREAM_START: { GstStream *stream; + guint32 seqnum = gst_event_get_seqnum (event); + GList *l; + gboolean all_wait = TRUE; GST_STREAM_SYNCHRONIZER_LOCK (self); stream = gst_pad_get_element_private (pad); - if (stream) { - GList *l; - gboolean all_wait = TRUE; + if (stream && stream->stream_start_seqnum != seqnum) { GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number); @@ -310,10 +295,11 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, for (l = self->streams; l; l = l->next) { GstStream *ostream = l->data; - g_cond_broadcast (ostream->stream_finish_cond); + g_cond_broadcast (&ostream->stream_finish_cond); } } - } + } else + GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source"); GST_STREAM_SYNCHRONIZER_UNLOCK (self); } break; @@ -328,7 +314,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, if (stream) { if (stream->wait) { GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number); - g_cond_wait (stream->stream_finish_cond, self->lock); + g_cond_wait (&stream->stream_finish_cond, &self->lock); stream = gst_pad_get_element_private (pad); if (stream) stream->wait = FALSE; @@ -343,42 +329,9 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, if (stream && segment.format == GST_FORMAT_TIME) { if (stream->new_stream) { - gint64 position_running_time = 0; - gint64 stop_running_time = 0; - - if (stream->segment.format == GST_FORMAT_TIME) { - position_running_time = - gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME, - stream->segment.position); - position_running_time = MAX (position_running_time, 0); - stop_running_time = - gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME, - stream->segment.stop); - stop_running_time = MAX (position_running_time, 0); - - if (stop_running_time != position_running_time) { - GST_WARNING_OBJECT (pad, - "Gap between position and segment stop: %" GST_TIME_FORMAT - " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time), - GST_TIME_ARGS (position_running_time)); - } - - if (stop_running_time < position_running_time) { - GST_DEBUG_OBJECT (pad, "Updating stop position"); - stream->segment.stop = stream->segment.position; - gst_pad_push_event (stream->srcpad, - gst_event_new_segment (&stream->segment)); - } - stop_running_time = MAX (stop_running_time, position_running_time); - GST_DEBUG_OBJECT (pad, - "Stop running time of last group: %" GST_TIME_FORMAT, - GST_TIME_ARGS (stop_running_time)); - } stream->new_stream = FALSE; stream->drop_discont = TRUE; - - - segment.base = stop_running_time; + segment.base = self->group_start_time; } GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT, @@ -386,14 +339,15 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, gst_segment_copy_into (&segment, &stream->segment); GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT, &stream->segment); + stream->segment_seqnum = gst_event_get_seqnum (event); GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT, GST_TIME_ARGS (stream->segment.base)); - stream->running_time_diff = stream->segment.base; { GstEvent *tmpev; tmpev = gst_event_new_segment (&stream->segment); + gst_event_set_seqnum (tmpev, stream->segment_seqnum); gst_event_unref (event); event = tmpev; } @@ -413,7 +367,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, stream = gst_pad_get_element_private (pad); if (stream) { GST_DEBUG_OBJECT (pad, "Flushing streams"); - g_cond_broadcast (stream->stream_finish_cond); + g_cond_broadcast (&stream->stream_finish_cond); } GST_STREAM_SYNCHRONIZER_UNLOCK (self); break; @@ -509,9 +463,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, break; } -skip_adjustments: - - opad = gst_stream_get_other_pad_from_pad (pad); + opad = gst_stream_get_other_pad_from_pad (self, pad); if (opad) { ret = gst_pad_push_event (opad, event); gst_object_unref (opad); @@ -533,15 +485,6 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent, GstClockTime timestamp = GST_CLOCK_TIME_NONE; GstClockTime timestamp_end = GST_CLOCK_TIME_NONE; - if (passthrough) { - opad = gst_stream_get_other_pad_from_pad (pad); - if (opad) { - ret = gst_pad_push (opad, buffer); - gst_object_unref (opad); - } - goto done; - } - GST_LOG_OBJECT (pad, "Handling buffer %p: size=%" G_GSIZE_FORMAT ", timestamp=%" GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT, @@ -558,24 +501,25 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent, GST_STREAM_SYNCHRONIZER_LOCK (self); stream = gst_pad_get_element_private (pad); - if (stream) + if (stream) { stream->seen_data = TRUE; - if (stream && stream->drop_discont) { - buffer = gst_buffer_make_writable (buffer); - GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT); - stream->drop_discont = FALSE; - } + if (stream->drop_discont) { + buffer = gst_buffer_make_writable (buffer); + GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT); + stream->drop_discont = FALSE; + } - if (stream && stream->segment.format == GST_FORMAT_TIME - && GST_CLOCK_TIME_IS_VALID (timestamp)) { - GST_LOG_OBJECT (pad, - "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT, - GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp)); - stream->segment.position = timestamp; + if (stream->segment.format == GST_FORMAT_TIME + && GST_CLOCK_TIME_IS_VALID (timestamp)) { + GST_LOG_OBJECT (pad, + "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT, + GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp)); + stream->segment.position = timestamp; + } } GST_STREAM_SYNCHRONIZER_UNLOCK (self); - opad = gst_stream_get_other_pad_from_pad (pad); + opad = gst_stream_get_other_pad_from_pad (self, pad); if (opad) { ret = gst_pad_push (opad, buffer); gst_object_unref (opad); @@ -590,7 +534,7 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent, if (stream && stream->segment.format == GST_FORMAT_TIME && GST_CLOCK_TIME_IS_VALID (timestamp_end)) { GST_LOG_OBJECT (pad, - "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT, + "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT, GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp_end)); stream->segment.position = timestamp_end; @@ -637,7 +581,6 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent, GST_STREAM_SYNCHRONIZER_UNLOCK (self); } -done: return ret; } @@ -657,7 +600,9 @@ gst_stream_synchronizer_request_new_pad (GstElement * element, stream = g_slice_new0 (GstStream); stream->transform = self; stream->stream_number = self->current_stream_number; - stream->stream_finish_cond = g_cond_new (); + g_cond_init (&stream->stream_finish_cond); + stream->stream_start_seqnum = G_MAXUINT32; + stream->segment_seqnum = G_MAXUINT32; tmp = g_strdup_printf ("sink_%u", self->current_stream_number); stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp); @@ -743,14 +688,17 @@ gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self, stream->segment.position); stop_running_time = MAX (stop_running_time, position_running_time); - GST_DEBUG_OBJECT (stream->sinkpad, - "Stop running time was: %" GST_TIME_FORMAT, - GST_TIME_ARGS (stop_running_time)); + if (stop_running_time > self->group_start_time) { + GST_DEBUG_OBJECT (stream->sinkpad, + "Updating global start running time from %" GST_TIME_FORMAT " to %" + GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time), + GST_TIME_ARGS (stop_running_time)); - self->group_start_time = MAX (self->group_start_time, stop_running_time); + self->group_start_time = stop_running_time; + } } - g_cond_free (stream->stream_finish_cond); + g_cond_clear (&stream->stream_finish_cond); g_slice_free (GstStream, stream); /* NOTE: In theory we have to check here if all streams @@ -787,7 +735,7 @@ gst_stream_synchronizer_change_state (GstElement * element, GstStateChange transition) { GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element); - GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS; + GstStateChangeReturn ret; switch (transition) { case GST_STATE_CHANGE_NULL_TO_READY: @@ -799,9 +747,6 @@ gst_stream_synchronizer_change_state (GstElement * element, self->group_start_time = 0; self->shutdown = FALSE; break; - case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING"); - break; case GST_STATE_CHANGE_PAUSED_TO_READY:{ GList *l; @@ -810,7 +755,7 @@ gst_stream_synchronizer_change_state (GstElement * element, GST_STREAM_SYNCHRONIZER_LOCK (self); for (l = self->streams; l; l = l->next) { GstStream *ostream = l->data; - g_cond_broadcast (ostream->stream_finish_cond); + g_cond_broadcast (&ostream->stream_finish_cond); } self->shutdown = TRUE; GST_STREAM_SYNCHRONIZER_UNLOCK (self); @@ -819,19 +764,12 @@ gst_stream_synchronizer_change_state (GstElement * element, break; } - { - GstStateChangeReturn bret; - - bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); - GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret); - if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE)) - return ret; - } + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", ret); + if (G_UNLIKELY (ret != GST_STATE_CHANGE_SUCCESS)) + return ret; switch (transition) { - case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED"); - break; case GST_STATE_CHANGE_PAUSED_TO_READY:{ GList *l; @@ -855,8 +793,6 @@ gst_stream_synchronizer_change_state (GstElement * element, GST_DEBUG_OBJECT (self, "State change READY->NULL"); GST_STREAM_SYNCHRONIZER_LOCK (self); - while (self->streams) - gst_stream_synchronizer_release_stream (self, self->streams->data); self->current_stream_number = 0; GST_STREAM_SYNCHRONIZER_UNLOCK (self); break; @@ -874,10 +810,7 @@ gst_stream_synchronizer_finalize (GObject * object) { GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object); - if (self->lock) { - g_mutex_free (self->lock); - self->lock = NULL; - } + g_mutex_clear (&self->lock); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -886,7 +819,7 @@ gst_stream_synchronizer_finalize (GObject * object) static void gst_stream_synchronizer_init (GstStreamSynchronizer * self) { - self->lock = g_mutex_new (); + g_mutex_init (&self->lock); } static void diff --git a/gst/playback/gststreamsynchronizer.h b/gst/playback/gststreamsynchronizer.h index 32b6c69..f7011cd 100644 --- a/gst/playback/gststreamsynchronizer.h +++ b/gst/playback/gststreamsynchronizer.h @@ -45,7 +45,7 @@ struct _GstStreamSynchronizer GstElement parent; /* < private > */ - GMutex *lock; + GMutex lock; gboolean shutdown; GList *streams; -- 2.7.4