streamsynchronizer: Handle stream switching
authorEdward Hervey <edward.hervey@collabora.co.uk>
Tue, 14 Aug 2012 16:53:52 +0000 (18:53 +0200)
committerEdward Hervey <edward.hervey@collabora.co.uk>
Tue, 14 Aug 2012 16:56:30 +0000 (18:56 +0200)
* Update outgoing segment.base with accumulated time, ensuring all
  streams are synchronized.
* Only consider streams as "new" is they have a STREAM_START event
  with a different seqnum.
* Use GstStream segment.base instead of separate variable to store
  the past running time.
* Disable passthrough
* Switch to glib 2.32 GMutex/GCond
* Avoid getting pad parent the expensive way
* Minor other fixes

gst/playback/gststreamsynchronizer.c
gst/playback/gststreamsynchronizer.h

index fd78697..d12a15b 100644 (file)
@@ -22,7 +22,6 @@
 #endif
 
 #include "gststreamsynchronizer.h"
-#include "gst/glib-compat-private.h"
 
 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
 #define GST_CAT_DEFAULT stream_synchronizer_debug
@@ -31,7 +30,7 @@ GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
     GST_LOG_OBJECT (obj,                                                \
                     "locking from thread %p",                           \
                     g_thread_self ());                                  \
-    g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);                \
+    g_mutex_lock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);                \
     GST_LOG_OBJECT (obj,                                                \
                     "locked from thread %p",                            \
                     g_thread_self ());                                  \
@@ -41,7 +40,7 @@ GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
     GST_LOG_OBJECT (obj,                                                \
                     "unlocking from thread %p",                         \
                     g_thread_self ());                                  \
-    g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);              \
+    g_mutex_unlock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);              \
 } G_STMT_END
 
 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
@@ -53,8 +52,6 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
     GST_PAD_REQUEST,
     GST_STATIC_CAPS_ANY);
 
-static const gboolean passthrough = FALSE;
-
 #define gst_stream_synchronizer_parent_class parent_class
 G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
     GST_TYPE_ELEMENT);
@@ -67,42 +64,38 @@ typedef struct
   GstPad *sinkpad;
   GstSegment segment;
 
-  gboolean wait;
+  gboolean wait;                /* TRUE if waiting/blocking */
   gboolean new_stream;
   gboolean drop_discont;
-  gboolean is_eos;
+  gboolean is_eos;              /* TRUE if EOS was received */
   gboolean seen_data;
 
-  gint64 running_time_diff;
+  GCond stream_finish_cond;
 
-  GCond *stream_finish_cond;
+  /* seqnum of the previously received STREAM_START
+   * default: G_MAXUINT32 */
+  guint32 stream_start_seqnum;
+  guint32 segment_seqnum;
 } GstStream;
 
 /* Must be called with lock! */
-static GstPad *
+static inline GstPad *
 gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
 {
   if (stream->sinkpad == pad)
     return gst_object_ref (stream->srcpad);
-  else if (stream->srcpad == pad)
+  if (stream->srcpad == pad)
     return gst_object_ref (stream->sinkpad);
 
   return NULL;
 }
 
 static GstPad *
-gst_stream_get_other_pad_from_pad (GstPad * pad)
+gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad)
 {
-  GstObject *parent = gst_pad_get_parent (pad);
-  GstStreamSynchronizer *self;
   GstStream *stream;
   GstPad *opad = NULL;
 
-  /* released pad does not have parent anymore */
-  if (!G_LIKELY (parent))
-    goto exit;
-
-  self = GST_STREAM_SYNCHRONIZER (parent);
   GST_STREAM_SYNCHRONIZER_LOCK (self);
   stream = gst_pad_get_element_private (pad);
   if (!stream)
@@ -112,9 +105,7 @@ gst_stream_get_other_pad_from_pad (GstPad * pad)
 
 out:
   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
-  gst_object_unref (self);
 
-exit:
   if (!opad)
     GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
 
@@ -129,7 +120,8 @@ gst_stream_synchronizer_iterate_internal_links (GstPad * pad,
   GstIterator *it = NULL;
   GstPad *opad;
 
-  opad = gst_stream_get_other_pad_from_pad (pad);
+  opad =
+      gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
   if (opad) {
     GValue value = { 0, };
 
@@ -152,7 +144,8 @@ gst_stream_synchronizer_query (GstPad * pad, GstObject * parent,
 
   GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
 
-  opad = gst_stream_get_other_pad_from_pad (pad);
+  opad =
+      gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
   if (opad) {
     ret = gst_pad_peer_query (opad, query);
     gst_object_unref (opad);
@@ -170,9 +163,6 @@ gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
   GstPad *opad;
   gboolean ret = FALSE;
 
-  if (passthrough)
-    goto skip_adjustments;
-
   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
       GST_EVENT_TYPE_NAME (event), event);
 
@@ -181,7 +171,7 @@ gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
       gdouble proportion;
       GstClockTimeDiff diff;
       GstClockTime timestamp;
-      gint64 running_time_diff;
+      gint64 running_time_diff = -1;
       GstStream *stream;
 
       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
@@ -190,15 +180,14 @@ gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
       GST_STREAM_SYNCHRONIZER_LOCK (self);
       stream = gst_pad_get_element_private (pad);
       if (stream)
-        running_time_diff = stream->running_time_diff;
-      else
-        running_time_diff = -1;
+        running_time_diff = stream->segment.base;
       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
 
       if (running_time_diff == -1) {
         GST_WARNING_OBJECT (pad, "QOS event before group start");
         goto out;
-      } else if (timestamp < running_time_diff) {
+      }
+      if (timestamp < running_time_diff) {
         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
         goto out;
       }
@@ -227,9 +216,7 @@ gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
       break;
   }
 
-skip_adjustments:
-
-  opad = gst_stream_get_other_pad_from_pad (pad);
+  opad = gst_stream_get_other_pad_from_pad (self, pad);
   if (opad) {
     ret = gst_pad_push_event (opad, event);
     gst_object_unref (opad);
@@ -248,9 +235,6 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
   GstPad *opad;
   gboolean ret = FALSE;
 
-  if (passthrough)
-    goto skip_adjustments;
-
   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
       GST_EVENT_TYPE_NAME (event), event);
 
@@ -258,12 +242,13 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
     case GST_EVENT_STREAM_START:
     {
       GstStream *stream;
+      guint32 seqnum = gst_event_get_seqnum (event);
+      GList *l;
+      gboolean all_wait = TRUE;
 
       GST_STREAM_SYNCHRONIZER_LOCK (self);
       stream = gst_pad_get_element_private (pad);
-      if (stream) {
-        GList *l;
-        gboolean all_wait = TRUE;
+      if (stream && stream->stream_start_seqnum != seqnum) {
 
         GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
 
@@ -310,10 +295,11 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
 
           for (l = self->streams; l; l = l->next) {
             GstStream *ostream = l->data;
-            g_cond_broadcast (ostream->stream_finish_cond);
+            g_cond_broadcast (&ostream->stream_finish_cond);
           }
         }
-      }
+      } else
+        GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
     }
       break;
@@ -328,7 +314,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
       if (stream) {
         if (stream->wait) {
           GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
-          g_cond_wait (stream->stream_finish_cond, self->lock);
+          g_cond_wait (&stream->stream_finish_cond, &self->lock);
           stream = gst_pad_get_element_private (pad);
           if (stream)
             stream->wait = FALSE;
@@ -343,42 +329,9 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
 
       if (stream && segment.format == GST_FORMAT_TIME) {
         if (stream->new_stream) {
-          gint64 position_running_time = 0;
-          gint64 stop_running_time = 0;
-
-          if (stream->segment.format == GST_FORMAT_TIME) {
-            position_running_time =
-                gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
-                stream->segment.position);
-            position_running_time = MAX (position_running_time, 0);
-            stop_running_time =
-                gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
-                stream->segment.stop);
-            stop_running_time = MAX (position_running_time, 0);
-
-            if (stop_running_time != position_running_time) {
-              GST_WARNING_OBJECT (pad,
-                  "Gap between position and segment stop: %" GST_TIME_FORMAT
-                  " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
-                  GST_TIME_ARGS (position_running_time));
-            }
-
-            if (stop_running_time < position_running_time) {
-              GST_DEBUG_OBJECT (pad, "Updating stop position");
-              stream->segment.stop = stream->segment.position;
-              gst_pad_push_event (stream->srcpad,
-                  gst_event_new_segment (&stream->segment));
-            }
-            stop_running_time = MAX (stop_running_time, position_running_time);
-            GST_DEBUG_OBJECT (pad,
-                "Stop running time of last group: %" GST_TIME_FORMAT,
-                GST_TIME_ARGS (stop_running_time));
-          }
           stream->new_stream = FALSE;
           stream->drop_discont = TRUE;
-
-
-          segment.base = stop_running_time;
+          segment.base = self->group_start_time;
         }
 
         GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
@@ -386,14 +339,15 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
         gst_segment_copy_into (&segment, &stream->segment);
         GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
             &stream->segment);
+        stream->segment_seqnum = gst_event_get_seqnum (event);
 
         GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
             GST_TIME_ARGS (stream->segment.base));
-        stream->running_time_diff = stream->segment.base;
         {
           GstEvent *tmpev;
 
           tmpev = gst_event_new_segment (&stream->segment);
+          gst_event_set_seqnum (tmpev, stream->segment_seqnum);
           gst_event_unref (event);
           event = tmpev;
         }
@@ -413,7 +367,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
       stream = gst_pad_get_element_private (pad);
       if (stream) {
         GST_DEBUG_OBJECT (pad, "Flushing streams");
-        g_cond_broadcast (stream->stream_finish_cond);
+        g_cond_broadcast (&stream->stream_finish_cond);
       }
       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
       break;
@@ -509,9 +463,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
       break;
   }
 
-skip_adjustments:
-
-  opad = gst_stream_get_other_pad_from_pad (pad);
+  opad = gst_stream_get_other_pad_from_pad (self, pad);
   if (opad) {
     ret = gst_pad_push_event (opad, event);
     gst_object_unref (opad);
@@ -533,15 +485,6 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
   GstClockTime timestamp = GST_CLOCK_TIME_NONE;
   GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
 
-  if (passthrough) {
-    opad = gst_stream_get_other_pad_from_pad (pad);
-    if (opad) {
-      ret = gst_pad_push (opad, buffer);
-      gst_object_unref (opad);
-    }
-    goto done;
-  }
-
   GST_LOG_OBJECT (pad, "Handling buffer %p: size=%" G_GSIZE_FORMAT
       ", timestamp=%" GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
       " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
@@ -558,24 +501,25 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
   GST_STREAM_SYNCHRONIZER_LOCK (self);
   stream = gst_pad_get_element_private (pad);
 
-  if (stream)
+  if (stream) {
     stream->seen_data = TRUE;
-  if (stream && stream->drop_discont) {
-    buffer = gst_buffer_make_writable (buffer);
-    GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
-    stream->drop_discont = FALSE;
-  }
+    if (stream->drop_discont) {
+      buffer = gst_buffer_make_writable (buffer);
+      GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
+      stream->drop_discont = FALSE;
+    }
 
-  if (stream && stream->segment.format == GST_FORMAT_TIME
-      && GST_CLOCK_TIME_IS_VALID (timestamp)) {
-    GST_LOG_OBJECT (pad,
-        "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
-    stream->segment.position = timestamp;
+    if (stream->segment.format == GST_FORMAT_TIME
+        && GST_CLOCK_TIME_IS_VALID (timestamp)) {
+      GST_LOG_OBJECT (pad,
+          "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
+      stream->segment.position = timestamp;
+    }
   }
   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
 
-  opad = gst_stream_get_other_pad_from_pad (pad);
+  opad = gst_stream_get_other_pad_from_pad (self, pad);
   if (opad) {
     ret = gst_pad_push (opad, buffer);
     gst_object_unref (opad);
@@ -590,7 +534,7 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
     if (stream && stream->segment.format == GST_FORMAT_TIME
         && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
       GST_LOG_OBJECT (pad,
-          "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
+          "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
           GST_TIME_ARGS (stream->segment.position),
           GST_TIME_ARGS (timestamp_end));
       stream->segment.position = timestamp_end;
@@ -637,7 +581,6 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
     GST_STREAM_SYNCHRONIZER_UNLOCK (self);
   }
 
-done:
   return ret;
 }
 
@@ -657,7 +600,9 @@ gst_stream_synchronizer_request_new_pad (GstElement * element,
   stream = g_slice_new0 (GstStream);
   stream->transform = self;
   stream->stream_number = self->current_stream_number;
-  stream->stream_finish_cond = g_cond_new ();
+  g_cond_init (&stream->stream_finish_cond);
+  stream->stream_start_seqnum = G_MAXUINT32;
+  stream->segment_seqnum = G_MAXUINT32;
 
   tmp = g_strdup_printf ("sink_%u", self->current_stream_number);
   stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
@@ -743,14 +688,17 @@ gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
         stream->segment.position);
     stop_running_time = MAX (stop_running_time, position_running_time);
 
-    GST_DEBUG_OBJECT (stream->sinkpad,
-        "Stop running time was: %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (stop_running_time));
+    if (stop_running_time > self->group_start_time) {
+      GST_DEBUG_OBJECT (stream->sinkpad,
+          "Updating global start running time from %" GST_TIME_FORMAT " to %"
+          GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time),
+          GST_TIME_ARGS (stop_running_time));
 
-    self->group_start_time = MAX (self->group_start_time, stop_running_time);
+      self->group_start_time = stop_running_time;
+    }
   }
 
-  g_cond_free (stream->stream_finish_cond);
+  g_cond_clear (&stream->stream_finish_cond);
   g_slice_free (GstStream, stream);
 
   /* NOTE: In theory we have to check here if all streams
@@ -787,7 +735,7 @@ gst_stream_synchronizer_change_state (GstElement * element,
     GstStateChange transition)
 {
   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
-  GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
+  GstStateChangeReturn ret;
 
   switch (transition) {
     case GST_STATE_CHANGE_NULL_TO_READY:
@@ -799,9 +747,6 @@ gst_stream_synchronizer_change_state (GstElement * element,
       self->group_start_time = 0;
       self->shutdown = FALSE;
       break;
-    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
-      GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
-      break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:{
       GList *l;
 
@@ -810,7 +755,7 @@ gst_stream_synchronizer_change_state (GstElement * element,
       GST_STREAM_SYNCHRONIZER_LOCK (self);
       for (l = self->streams; l; l = l->next) {
         GstStream *ostream = l->data;
-        g_cond_broadcast (ostream->stream_finish_cond);
+        g_cond_broadcast (&ostream->stream_finish_cond);
       }
       self->shutdown = TRUE;
       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
@@ -819,19 +764,12 @@ gst_stream_synchronizer_change_state (GstElement * element,
       break;
   }
 
-  {
-    GstStateChangeReturn bret;
-
-    bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
-    GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
-    if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
-      return ret;
-  }
+  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+  GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", ret);
+  if (G_UNLIKELY (ret != GST_STATE_CHANGE_SUCCESS))
+    return ret;
 
   switch (transition) {
-    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
-      GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
-      break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:{
       GList *l;
 
@@ -855,8 +793,6 @@ gst_stream_synchronizer_change_state (GstElement * element,
       GST_DEBUG_OBJECT (self, "State change READY->NULL");
 
       GST_STREAM_SYNCHRONIZER_LOCK (self);
-      while (self->streams)
-        gst_stream_synchronizer_release_stream (self, self->streams->data);
       self->current_stream_number = 0;
       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
       break;
@@ -874,10 +810,7 @@ gst_stream_synchronizer_finalize (GObject * object)
 {
   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
 
-  if (self->lock) {
-    g_mutex_free (self->lock);
-    self->lock = NULL;
-  }
+  g_mutex_clear (&self->lock);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -886,7 +819,7 @@ gst_stream_synchronizer_finalize (GObject * object)
 static void
 gst_stream_synchronizer_init (GstStreamSynchronizer * self)
 {
-  self->lock = g_mutex_new ();
+  g_mutex_init (&self->lock);
 }
 
 static void
index 32b6c69..f7011cd 100644 (file)
@@ -45,7 +45,7 @@ struct _GstStreamSynchronizer
   GstElement parent;
 
   /* < private > */
-  GMutex *lock;
+  GMutex lock;
   gboolean shutdown;
 
   GList *streams;