imagefreeze: Add a live mode
authorSebastian Dröge <sebastian@centricular.com>
Mon, 29 Jun 2020 07:10:09 +0000 (10:10 +0300)
committerSebastian Dröge <sebastian@centricular.com>
Mon, 29 Jun 2020 09:07:14 +0000 (12:07 +0300)
Previously imagefreeze would always operate as non-live element and
output frames as fast as possible according to the configured segment
(via SEEK events) and the negotiated framerate from start to stop or the
other way around.

With the new live mode (enabled via the is-live property) it would only
output frames in PLAYING. Frames would be output according to the
negotiated framerate unless it would be too late, in which case it would
jump ahead and skip over the requirement amount of frames.

This makes it possible to actually use imagefreeze in live pipelines
without having to manually ensure somehow that it would start outputting
at the current running time and without still risking to fall behind
without recovery.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/merge_requests/653>

docs/gst_plugins_cache.json
gst/imagefreeze/gstimagefreeze.c
gst/imagefreeze/gstimagefreeze.h

index 7a2de1f..c02e205 100644 (file)
                         "type": "gboolean",
                         "writable": true
                     },
+                    "is-live": {
+                        "blurb": "Whether to output a live video stream",
+                        "conditionally-available": false,
+                        "construct": false,
+                        "construct-only": false,
+                        "controllable": false,
+                        "default": "false",
+                        "mutable": "null",
+                        "readable": true,
+                        "type": "gboolean",
+                        "writable": true
+                    },
                     "num-buffers": {
                         "blurb": "Number of buffers to output before sending EOS (-1 = unlimited)",
                         "conditionally-available": false,
index 873ba4a..12d83b2 100644 (file)
@@ -1,6 +1,7 @@
 /* GStreamer
  * Copyright (c) 2005 Edward Hervey <bilboed@bilboed.com>
  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
+ * Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
 
 #define DEFAULT_NUM_BUFFERS     -1
 #define DEFAULT_ALLOW_REPLACE   FALSE
+#define DEFAULT_IS_LIVE         FALSE
 
 enum
 {
   PROP_0,
   PROP_NUM_BUFFERS,
   PROP_ALLOW_REPLACE,
+  PROP_IS_LIVE,
 };
 
 static void gst_image_freeze_finalize (GObject * object);
@@ -61,6 +64,7 @@ static void gst_image_freeze_reset (GstImageFreeze * self);
 
 static GstStateChangeReturn gst_image_freeze_change_state (GstElement * element,
     GstStateChange transition);
+static GstClock *gst_image_freeze_provide_clock (GstElement * element);
 
 static void gst_image_freeze_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
@@ -119,8 +123,24 @@ gst_image_freeze_class_init (GstImageFreezeClass * klass)
           "Allow replacing the input buffer and always output the latest",
           DEFAULT_ALLOW_REPLACE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  /**
+   * GstImageFreeze:is-live
+   *
+   * Selects whether the output stream should be a non-live stream based on
+   * the segment configured via a %GST_EVENT_SEEK, or whether the output
+   * stream should be a live stream with the negotiated framerate.
+   *
+   * Since: 1.18
+   */
+  g_object_class_install_property (gobject_class, PROP_IS_LIVE,
+      g_param_spec_boolean ("is-live", "Is Live",
+          "Whether to output a live video stream",
+          DEFAULT_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_image_freeze_change_state);
+  gstelement_class->provide_clock =
+      GST_DEBUG_FUNCPTR (gst_image_freeze_provide_clock);
 
   gst_element_class_set_static_metadata (gstelement_class,
       "Still frame stream generator",
@@ -156,9 +176,11 @@ gst_image_freeze_init (GstImageFreeze * self)
   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
 
   g_mutex_init (&self->lock);
+  g_cond_init (&self->blocked_cond);
 
   self->num_buffers = DEFAULT_NUM_BUFFERS;
   self->allow_replace = DEFAULT_ALLOW_REPLACE;
+  self->is_live = DEFAULT_IS_LIVE;
 
   gst_image_freeze_reset (self);
 }
@@ -173,6 +195,7 @@ gst_image_freeze_finalize (GObject * object)
   gst_image_freeze_reset (self);
 
   g_mutex_clear (&self->lock);
+  g_cond_clear (&self->blocked_cond);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -529,17 +552,30 @@ gst_image_freeze_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
       gboolean seekable;
 
       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
-      seekable = (format == GST_FORMAT_TIME || format == GST_FORMAT_DEFAULT);
+      seekable = !self->is_live && (format == GST_FORMAT_TIME
+          || format == GST_FORMAT_DEFAULT);
 
       gst_query_set_seeking (query, format, seekable, (seekable ? 0 : -1), -1);
       ret = TRUE;
       break;
     }
     case GST_QUERY_LATENCY:
-      /* We never run as a live element, even if upstream is live, and never
-       * output any buffers with latency but immediately generate buffers as
-       * fast as we can according to the negotiated framerate */
-      gst_query_set_latency (query, FALSE, 0, GST_CLOCK_TIME_NONE);
+      if (self->is_live) {
+        /* If we run live, we output the buffer without any latency but allow
+         * for at most one frame of latency. If downstream takes longer to
+         * consume out frame we would skip ahead */
+        if (self->fps_n > 0 && self->fps_d > 0)
+          gst_query_set_latency (query, TRUE, 0,
+              gst_util_uint64_scale_ceil (GST_SECOND, self->fps_d,
+                  self->fps_n));
+        else
+          gst_query_set_latency (query, TRUE, 0, GST_CLOCK_TIME_NONE);
+      } else {
+        /* If we don't run live, even if upstream is live, we never output any
+         * buffers with latency but immediately generate buffers as fast as we
+         * can according to the negotiated framerate */
+        gst_query_set_latency (query, FALSE, 0, GST_CLOCK_TIME_NONE);
+      }
       break;
     default:
       ret = FALSE;
@@ -626,6 +662,13 @@ gst_image_freeze_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
       gboolean flush;
       guint32 seqnum;
 
+      if (self->is_live) {
+        GST_ERROR_OBJECT (pad, "Can't seek in live mode");
+        ret = FALSE;
+        gst_event_unref (event);
+        break;
+      }
+
       seqnum = gst_event_get_seqnum (event);
       gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
           &stop_type, &stop);
@@ -751,6 +794,9 @@ gst_image_freeze_set_property (GObject * object, guint prop_id,
     case PROP_ALLOW_REPLACE:
       self->allow_replace = g_value_get_boolean (value);
       break;
+    case PROP_IS_LIVE:
+      self->is_live = g_value_get_boolean (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -772,6 +818,9 @@ gst_image_freeze_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_ALLOW_REPLACE:
       g_value_set_boolean (value, self->allow_replace);
       break;
+    case PROP_IS_LIVE:
+      g_value_set_boolean (value, self->is_live);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -904,16 +953,97 @@ gst_image_freeze_src_loop (GstPad * pad)
 
   g_mutex_lock (&self->lock);
   offset = self->offset;
+  if (self->is_live) {
+    GstClockTime base_time, clock_time;
+    GstClockTimeDiff jitter;
+    GstClockReturn clock_ret;
+    GstClock *clock;
+
+    /* Wait until the element went to PLAYING or flushing */
+    while (self->blocked && !self->flushing)
+      g_cond_wait (&self->blocked_cond, &self->lock);
+
+    if (self->flushing) {
+      g_mutex_unlock (&self->lock);
+      gst_buffer_unref (buffer);
+      flow_ret = GST_FLOW_FLUSHING;
+      goto pause_task;
+    }
+
+    /* Wait on the clock until the time for our current frame is reached */
+    clock = gst_element_get_clock (GST_ELEMENT (self));
+    base_time = gst_element_get_base_time (GST_ELEMENT (self));
+    if (self->fps_n != 0) {
+      clock_time =
+          base_time + gst_util_uint64_scale (offset, self->fps_d * GST_SECOND,
+          self->fps_n);
+    } else {
+      clock_time = base_time;
+    }
+
+    self->clock_id = gst_clock_new_single_shot_id (clock, clock_time);
+    g_mutex_unlock (&self->lock);
+    GST_TRACE_OBJECT (self,
+        "Waiting for %" GST_TIME_FORMAT ", now %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (clock_time), GST_TIME_ARGS (gst_clock_get_time (clock)));
+    clock_ret = gst_clock_id_wait (self->clock_id, &jitter);
+    GST_TRACE_OBJECT (self,
+        "Waited for %" GST_TIME_FORMAT ", clock ret %d, jitter %"
+        GST_STIME_FORMAT, GST_TIME_ARGS (clock_time), clock_ret,
+        GST_STIME_ARGS (jitter));
+    g_mutex_lock (&self->lock);
+    gst_clock_id_unref (self->clock_id);
+    self->clock_id = NULL;
+    gst_object_unref (clock);
+
+    if (self->flushing || clock_ret == GST_CLOCK_UNSCHEDULED) {
+      g_mutex_unlock (&self->lock);
+      gst_buffer_unref (buffer);
+      flow_ret = GST_FLOW_FLUSHING;
+      goto pause_task;
+    }
+
+    /* If we were late, adjust our offset and jump ahead if needed */
+    if (self->fps_n != 0) {
+      if (jitter > 0) {
+        guint64 new_offset =
+            gst_util_uint64_scale (clock_time + jitter - base_time, self->fps_n,
+            self->fps_d * GST_SECOND);
+
+        if (new_offset != offset) {
+          GST_INFO_OBJECT (self,
+              "Late by %" GST_TIME_FORMAT ", old offset %" G_GUINT64_FORMAT
+              ", new offset %" G_GUINT64_FORMAT, GST_TIME_ARGS (jitter), offset,
+              new_offset);
+          self->offset = offset = new_offset;
+        }
+      }
 
-  if (self->fps_n != 0) {
-    timestamp =
-        gst_util_uint64_scale (offset, self->fps_d * GST_SECOND, self->fps_n);
-    timestamp_end =
-        gst_util_uint64_scale (offset + 1, self->fps_d * GST_SECOND,
-        self->fps_n);
+      timestamp =
+          gst_util_uint64_scale (offset, self->fps_d * GST_SECOND, self->fps_n);
+      timestamp_end =
+          gst_util_uint64_scale (offset + 1, self->fps_d * GST_SECOND,
+          self->fps_n);
+    } else {
+      /* If we have no framerate then we output a single frame now */
+      if (jitter > 0)
+        timestamp = jitter;
+      else
+        timestamp = 0;
+
+      timestamp_end = GST_CLOCK_TIME_NONE;
+    }
   } else {
-    timestamp = self->segment.start;
-    timestamp_end = GST_CLOCK_TIME_NONE;
+    if (self->fps_n != 0) {
+      timestamp =
+          gst_util_uint64_scale (offset, self->fps_d * GST_SECOND, self->fps_n);
+      timestamp_end =
+          gst_util_uint64_scale (offset + 1, self->fps_d * GST_SECOND,
+          self->fps_n);
+    } else {
+      timestamp = self->segment.start;
+      timestamp_end = GST_CLOCK_TIME_NONE;
+    }
   }
 
   eos = (self->fps_n == 0 && offset > 0) ||
@@ -1022,17 +1152,36 @@ gst_image_freeze_change_state (GstElement * element, GstStateChange transition)
 {
   GstImageFreeze *self = GST_IMAGE_FREEZE (element);
   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
+  gboolean no_preroll = FALSE;
 
   switch (transition) {
     case GST_STATE_CHANGE_READY_TO_PAUSED:
       gst_image_freeze_reset (self);
       g_mutex_lock (&self->lock);
       self->flushing = FALSE;
+      self->blocked = TRUE;
+      g_mutex_unlock (&self->lock);
+      if (self->is_live)
+        no_preroll = TRUE;
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+      g_mutex_lock (&self->lock);
+      self->blocked = FALSE;
+      g_cond_signal (&self->blocked_cond);
       g_mutex_unlock (&self->lock);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
-      gst_pad_stop_task (self->srcpad);
+      g_mutex_lock (&self->lock);
+      self->flushing = TRUE;
+      if (self->clock_id) {
+        GST_DEBUG_OBJECT (self, "unlock clock wait");
+        gst_clock_id_unschedule (self->clock_id);
+      }
+      self->blocked = FALSE;
+      g_cond_signal (&self->blocked_cond);
+      g_mutex_unlock (&self->lock);
       gst_image_freeze_reset (self);
+      gst_pad_stop_task (self->srcpad);
       break;
     default:
       break;
@@ -1042,13 +1191,30 @@ gst_image_freeze_change_state (GstElement * element, GstStateChange transition)
     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
 
   switch (transition) {
+    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+      g_mutex_lock (&self->lock);
+      self->blocked = TRUE;
+      g_mutex_unlock (&self->lock);
+      if (self->is_live)
+        no_preroll = TRUE;
+      break;
     default:
       break;
   }
 
+  if (no_preroll && ret == GST_STATE_CHANGE_SUCCESS)
+    ret = GST_STATE_CHANGE_NO_PREROLL;
+
   return ret;
 }
 
+/* FIXME: GStreamer 2.0 */
+static GstClock *
+gst_image_freeze_provide_clock (GstElement * element)
+{
+  return gst_system_clock_obtain ();
+}
+
 static gboolean
 plugin_init (GstPlugin * plugin)
 {
index 08af41f..d23feaa 100644 (file)
@@ -63,6 +63,11 @@ struct _GstImageFreeze
 
   gboolean allow_replace;
 
+  gboolean is_live;
+  gboolean blocked;
+  GCond blocked_cond;
+  GstClockID clock_id;
+
   guint64 offset;
 
   gboolean flushing;