streamsynchronizer: Implement grouping of streams via the group-id
authorSebastian Dröge <slomo@circular-chaos.org>
Mon, 22 Jul 2013 11:15:09 +0000 (13:15 +0200)
committerSebastian Dröge <slomo@circular-chaos.org>
Mon, 22 Jul 2013 13:24:13 +0000 (15:24 +0200)
https://bugzilla.gnome.org/show_bug.cgi?id=704427
https://bugzilla.gnome.org/show_bug.cgi?id=704408

gst/playback/gststreamsynchronizer.c
gst/playback/gststreamsynchronizer.h

index a5eb53037f826c3887fcf21b788f5ea2fbab57d4..69f163f5e65db07fbef4cc500f67d9cac1bb1444 100644 (file)
@@ -76,6 +76,7 @@ typedef struct
    * default: G_MAXUINT32 */
   guint32 stream_start_seqnum;
   guint32 segment_seqnum;
+  guint group_id;
 } GstStream;
 
 /* Must be called with lock! */
@@ -243,86 +244,119 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
     {
       GstStream *stream, *ostream;
       guint32 seqnum = gst_event_get_seqnum (event);
+      guint group_id;
+      gboolean have_group_id;
       GList *l;
       gboolean all_wait = TRUE;
       gboolean new_stream = TRUE;
 
+      have_group_id = gst_event_parse_group_id (event, &group_id);
+
       GST_STREAM_SYNCHRONIZER_LOCK (self);
+      self->have_group_id &= have_group_id;
+      have_group_id = self->have_group_id;
+
       stream = gst_pad_get_element_private (pad);
-      if (stream && stream->stream_start_seqnum != seqnum) {
+
+      if (!stream) {
+        GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
+        GST_STREAM_SYNCHRONIZER_UNLOCK (self);
+        break;
+      }
+
+      if ((have_group_id && stream->group_id != group_id) || (!have_group_id
+              && stream->stream_start_seqnum != seqnum)) {
         stream->is_eos = FALSE;
         stream->stream_start_seqnum = seqnum;
+        stream->group_id = group_id;
         stream->drop_discont = TRUE;
 
-        /* Check if this belongs to a stream that is already there,
-         * e.g. we got the visualizations for an audio stream */
-        for (l = self->streams; l; l = l->next) {
-          ostream = l->data;
+        if (!have_group_id) {
+          /* Check if this belongs to a stream that is already there,
+           * e.g. we got the visualizations for an audio stream */
+          for (l = self->streams; l; l = l->next) {
+            ostream = l->data;
+
+            if (ostream != stream && ostream->stream_start_seqnum == seqnum
+                && !ostream->wait) {
+              new_stream = FALSE;
+              break;
+            }
+          }
 
-          if (ostream != stream && ostream->stream_start_seqnum == seqnum
-              && !ostream->wait) {
-            new_stream = FALSE;
+          if (!new_stream) {
+            GST_DEBUG_OBJECT (pad,
+                "Stream %d belongs to running stream %d, no waiting",
+                stream->stream_number, ostream->stream_number);
+            stream->wait = FALSE;
+            stream->new_stream = FALSE;
+
+            GST_STREAM_SYNCHRONIZER_UNLOCK (self);
             break;
           }
+        } else if (group_id == self->group_id) {
+          GST_DEBUG_OBJECT (pad, "Stream %d belongs to running group %d, "
+              "no waiting", stream->stream_number, group_id);
+          GST_STREAM_SYNCHRONIZER_UNLOCK (self);
+          break;
         }
 
-        if (!new_stream) {
-          GST_DEBUG_OBJECT (pad,
-              "Stream %d belongs to running stream %d, no waiting",
-              stream->stream_number, ostream->stream_number);
-          stream->wait = FALSE;
-          stream->new_stream = FALSE;
-        } else {
-          GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
+        GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
 
-          stream->wait = TRUE;
-          stream->new_stream = TRUE;
+        stream->wait = TRUE;
+        stream->new_stream = TRUE;
 
-          for (l = self->streams; l; l = l->next) {
-            GstStream *ostream = l->data;
+        for (l = self->streams; l; l = l->next) {
+          GstStream *ostream = l->data;
 
-            all_wait = all_wait && ostream->wait;
-            if (!all_wait)
-              break;
-          }
-          if (all_wait) {
-            gint64 position = 0;
+          all_wait = all_wait && ostream->wait && (!have_group_id
+              || ostream->group_id == group_id);
+          if (!all_wait)
+            break;
+        }
 
+        if (all_wait) {
+          gint64 position = 0;
+
+          if (have_group_id)
+            GST_DEBUG_OBJECT (self,
+                "All streams have changed to group id %u -- unblocking",
+                group_id);
+          else
             GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
 
-            for (l = self->streams; l; l = l->next) {
-              GstStream *ostream = l->data;
-              gint64 stop_running_time;
-              gint64 position_running_time;
-
-              ostream->wait = FALSE;
-
-              if (ostream->segment.format == GST_FORMAT_TIME) {
-                stop_running_time =
-                    gst_segment_to_running_time (&ostream->segment,
-                    GST_FORMAT_TIME, ostream->segment.stop);
-                position_running_time =
-                    gst_segment_to_running_time (&ostream->segment,
-                    GST_FORMAT_TIME, ostream->segment.position);
-                position =
-                    MAX (position, MAX (stop_running_time,
-                        position_running_time));
-              }
+          self->group_id = group_id;
+
+          for (l = self->streams; l; l = l->next) {
+            GstStream *ostream = l->data;
+            gint64 stop_running_time;
+            gint64 position_running_time;
+
+            ostream->wait = FALSE;
+
+            if (ostream->segment.format == GST_FORMAT_TIME) {
+              stop_running_time =
+                  gst_segment_to_running_time (&ostream->segment,
+                  GST_FORMAT_TIME, ostream->segment.stop);
+              position_running_time =
+                  gst_segment_to_running_time (&ostream->segment,
+                  GST_FORMAT_TIME, ostream->segment.position);
+              position =
+                  MAX (position, MAX (stop_running_time,
+                      position_running_time));
             }
-            position = MAX (0, position);
-            self->group_start_time = MAX (self->group_start_time, position);
+          }
+          position = MAX (0, position);
+          self->group_start_time = MAX (self->group_start_time, position);
 
-            GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
-                GST_TIME_ARGS (self->group_start_time));
+          GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
+              GST_TIME_ARGS (self->group_start_time));
 
-            for (l = self->streams; l; l = l->next) {
-              GstStream *ostream = l->data;
-              g_cond_broadcast (&ostream->stream_finish_cond);
-            }
+          for (l = self->streams; l; l = l->next) {
+            GstStream *ostream = l->data;
+            g_cond_broadcast (&ostream->stream_finish_cond);
           }
         }
-      } else {
-        GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
       }
 
       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
@@ -657,6 +691,7 @@ gst_stream_synchronizer_request_new_pad (GstElement * element,
   g_cond_init (&stream->stream_finish_cond);
   stream->stream_start_seqnum = G_MAXUINT32;
   stream->segment_seqnum = G_MAXUINT32;
+  stream->group_id = G_MAXUINT;
 
   tmp = g_strdup_printf ("sink_%u", self->current_stream_number);
   stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
@@ -717,6 +752,10 @@ gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
     }
   }
   g_assert (l != NULL);
+  if (self->streams == NULL) {
+    self->have_group_id = TRUE;
+    self->group_id = G_MAXUINT;
+  }
 
   /* we can drop the lock, since stream exists now only local.
    * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
@@ -799,6 +838,8 @@ gst_stream_synchronizer_change_state (GstElement * element,
     case GST_STATE_CHANGE_READY_TO_PAUSED:
       GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
       self->group_start_time = 0;
+      self->have_group_id = TRUE;
+      self->group_id = G_MAXUINT;
       self->shutdown = FALSE;
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:{
index b9993122b66f7b724ef22bfce431a43314ae0506..2f5a3ecded5c4985509f693721ed37b71cf1dd7d 100644 (file)
@@ -52,6 +52,9 @@ struct _GstStreamSynchronizer
   guint current_stream_number;
 
   GstClockTime group_start_time;
+
+  gboolean have_group_id;
+  guint group_id;
 };
 
 struct _GstStreamSynchronizerClass