clocksync: Add new clocksync element
authorJan Schmidt <jan@centricular.com>
Mon, 24 Feb 2020 17:47:35 +0000 (04:47 +1100)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 26 Feb 2020 16:36:29 +0000 (16:36 +0000)
The clocksync element is a generic element that can be
placed in a pipeline to synchronise passing buffers to the
clock at that point. This is similar to 'identity sync=true',
but because it isn't GstBaseTransform-based, it can process
GstBufferLists without breaking them into separate GstBuffers

docs/plugins/gst_plugins_cache.json
plugins/elements/gstclocksync.c [new file with mode: 0644]
plugins/elements/gstclocksync.h [new file with mode: 0644]
plugins/elements/gstelements.c
plugins/elements/meson.build
tests/check/elements/clocksync.c [new file with mode: 0644]
tests/check/meson.build

index 716fbeb5c3f1e6d8825b51dc599ef79fc7eee59f..9ec2ca252c7619c952146df2ab9a62d8d8b3de39 100644 (file)
                 },
                 "rank": "none"
             },
+            "clocksync": {
+                "author": "Jan Schmidt <jan@centricular.com>",
+                "description": "Synchronise buffers to the clock",
+                "hierarchy": [
+                    "GstClockSync",
+                    "GstElement",
+                    "GstObject",
+                    "GInitiallyUnowned",
+                    "GObject"
+                ],
+                "klass": "Generic",
+                "long-name": "ClockSync",
+                "pad-templates": {
+                    "sink": {
+                        "caps": "ANY",
+                        "direction": "sink",
+                        "presence": "always"
+                    },
+                    "src": {
+                        "caps": "ANY",
+                        "direction": "src",
+                        "presence": "always"
+                    }
+                },
+                "properties": {
+                    "signal-handoffs": {
+                        "blurb": "Send a signal before pushing the buffer",
+                        "construct": false,
+                        "construct-only": false,
+                        "default": "false",
+                        "type-name": "gboolean",
+                        "writable": true
+                    },
+                    "sync": {
+                        "blurb": "Synchronize to pipeline clock",
+                        "construct": false,
+                        "construct-only": false,
+                        "default": "true",
+                        "type-name": "gboolean",
+                        "writable": true
+                    },
+                    "ts-offset": {
+                        "blurb": "Timestamp offset in nanoseconds for synchronisation, negative for earlier sync",
+                        "construct": false,
+                        "construct-only": false,
+                        "default": "0",
+                        "max": "9223372036854775807",
+                        "min": "-9223372036854775808",
+                        "type-name": "gint64",
+                        "writable": true
+                    }
+                },
+                "rank": "none",
+                "signals": {
+                    "handoff": {
+                        "args": [
+                            "GstBuffer"
+                        ],
+                        "retval": "void"
+                    },
+                    "handoff-list": {
+                        "args": [
+                            "GstBufferList"
+                        ],
+                        "retval": "void"
+                    }
+                }
+            },
             "concat": {
                 "author": "Sebastian Dr\u00f6ge <sebastian@centricular.com>",
                 "description": "Concatenate multiple streams",
diff --git a/plugins/elements/gstclocksync.c b/plugins/elements/gstclocksync.c
new file mode 100644 (file)
index 0000000..fb5efbe
--- /dev/null
@@ -0,0 +1,583 @@
+/*
+ * GStreamer
+ * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
+ *                    2000 Wim Taymans <wtay@chello.be>
+ *                    2005 Wim Taymans <wim@fluendo.com>
+ * Copyright (C) 2020 Jan Schmidt <jan@centricular.com>
+ *
+ * gstclocksync.c:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-clocksync
+ * @title: clocksync
+ *
+ * Simple element that passes all buffers and buffer-lists intact, but
+ * synchronising them to the clock before passing.
+ *
+ * Synchronisation to the clock is on by default, but can be turned
+ * off by setting the 'sync' property to FALSE
+ *
+ * <refsect2>
+ * <title>Example launch line</title>
+ * |[
+ * gst-launch -v -m videotestsrc ! clocksync ! fakesink silent=TRUE
+ * ]|
+ * </refsect2>
+ */
+
+#ifdef HAVE_CONFIG_H
+#  include <config.h>
+#endif
+
+#include <gst/gst.h>
+
+#include "gstclocksync.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_clock_sync_debug);
+#define GST_CAT_DEFAULT gst_clock_sync_debug
+
+/* ClockSync signals and args */
+enum
+{
+  SIGNAL_HANDOFF,
+  SIGNAL_HANDOFF_LIST,
+  /* FILL ME */
+  LAST_SIGNAL
+};
+
+#define DEFAULT_SYNC                    TRUE
+#define DEFAULT_TS_OFFSET               0
+#define DEFAULT_SIGNAL_HANDOFFS         FALSE
+
+enum
+{
+  PROP_0,
+  PROP_SYNC,
+  PROP_TS_OFFSET,
+  PROP_SIGNAL_HANDOFFS,
+};
+
+static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
+    GST_PAD_SINK,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS ("ANY")
+    );
+
+static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
+    GST_PAD_SRC,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS ("ANY")
+    );
+
+#define _do_init \
+    GST_DEBUG_CATEGORY_INIT (gst_clock_sync_debug, "clocksync", 0, "clocksync element");
+#define gst_clock_sync_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstClockSync, gst_clock_sync, GST_TYPE_ELEMENT,
+    _do_init);
+
+static void gst_clock_sync_finalize (GObject * object);
+
+static void gst_clock_sync_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec);
+static void gst_clock_sync_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec);
+
+static gboolean gst_clock_sync_sink_event (GstPad * pad, GstObject * parent,
+    GstEvent * event);
+static GstFlowReturn gst_clock_sync_chain (GstPad * pad, GstObject * parent,
+    GstBuffer * buf);
+static GstFlowReturn gst_clock_sync_chain_list (GstPad * pad,
+    GstObject * parent, GstBufferList * buflist);
+static gboolean gst_clock_sync_src_query (GstPad * pad, GstObject * parent,
+    GstQuery * query);
+static GstStateChangeReturn gst_clocksync_change_state (GstElement * element,
+    GstStateChange transition);
+
+static guint gst_clocksync_signals[LAST_SIGNAL] = { 0 };
+
+static void
+gst_clock_sync_class_init (GstClockSyncClass * klass)
+{
+  GObjectClass *gobject_class;
+  GstElementClass *gstelement_class;
+
+  gobject_class = (GObjectClass *) klass;
+  gstelement_class = (GstElementClass *) klass;
+
+  gobject_class->set_property = gst_clock_sync_set_property;
+  gobject_class->get_property = gst_clock_sync_get_property;
+  gobject_class->finalize = gst_clock_sync_finalize;
+
+  g_object_class_install_property (gobject_class, PROP_SYNC,
+      g_param_spec_boolean ("sync", "Synchronize",
+          "Synchronize to pipeline clock", DEFAULT_SYNC,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
+      g_param_spec_int64 ("ts-offset", "Timestamp offset for synchronisation",
+          "Timestamp offset in nanoseconds for synchronisation, negative for earlier sync",
+          G_MININT64, G_MAXINT64, DEFAULT_TS_OFFSET,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  /**
+   * GstClockSync::signal-handoffs
+   *
+   * If set to %TRUE, the clocksync will emit a handoff signal when handling a buffer.
+   * When set to %FALSE, no signal will be emitted, which might improve performance.
+   */
+  g_object_class_install_property (gobject_class, PROP_SIGNAL_HANDOFFS,
+      g_param_spec_boolean ("signal-handoffs",
+          "Signal handoffs", "Send a signal before pushing the buffer",
+          DEFAULT_SIGNAL_HANDOFFS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
+   * GstClockSync::handoff:
+   * @clocksync: the clocksync instance
+   * @buffer: the buffer that just has been received
+   * @pad: the pad that received it
+   *
+   * This signal gets emitted before passing the buffer downstream.
+   */
+  gst_clocksync_signals[SIGNAL_HANDOFF] =
+      g_signal_new ("handoff", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+      G_STRUCT_OFFSET (GstClockSyncClass, handoff), NULL, NULL,
+      NULL, G_TYPE_NONE, 1, GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
+
+  /**
+   * GstClockSync::handoff-list:
+   * @clocksync: the clocksync instance
+   * @buffer_list: the buffer list that just has been received
+   * @pad: the pad that received it
+   *
+   * This signal gets emitted before passing the buffer list downstream.
+   */
+  gst_clocksync_signals[SIGNAL_HANDOFF_LIST] =
+      g_signal_new ("handoff-list", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstClockSyncClass, handoff_list),
+      NULL, NULL, NULL, G_TYPE_NONE, 1,
+      GST_TYPE_BUFFER_LIST | G_SIGNAL_TYPE_STATIC_SCOPE);
+
+  gstelement_class->change_state =
+      GST_DEBUG_FUNCPTR (gst_clocksync_change_state);
+
+  gst_element_class_set_static_metadata (gstelement_class,
+      "ClockSync",
+      "Generic",
+      "Synchronise buffers to the clock", "Jan Schmidt <jan@centricular.com>");
+
+  gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
+  gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
+}
+
+static void
+gst_clock_sync_finalize (GObject * object)
+{
+  GstClockSync *clocksync = GST_CLOCKSYNC (object);
+
+  g_cond_clear (&clocksync->blocked_cond);
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+gst_clock_sync_init (GstClockSync * clocksync)
+{
+  clocksync->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
+  gst_pad_set_event_function (clocksync->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_clock_sync_sink_event));
+  gst_pad_set_chain_function (clocksync->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_clock_sync_chain));
+  gst_pad_set_chain_list_function (clocksync->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_clock_sync_chain_list));
+  GST_PAD_SET_PROXY_CAPS (clocksync->sinkpad);
+  gst_element_add_pad (GST_ELEMENT (clocksync), clocksync->sinkpad);
+
+  clocksync->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
+
+  gst_pad_set_query_function (clocksync->srcpad, gst_clock_sync_src_query);
+
+  GST_PAD_SET_PROXY_CAPS (clocksync->srcpad);
+  gst_element_add_pad (GST_ELEMENT (clocksync), clocksync->srcpad);
+
+  clocksync->signal_handoffs = DEFAULT_SIGNAL_HANDOFFS;
+  clocksync->ts_offset = DEFAULT_TS_OFFSET;
+  clocksync->sync = DEFAULT_SYNC;
+  g_cond_init (&clocksync->blocked_cond);
+}
+
+static void
+gst_clock_sync_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstClockSync *clocksync = GST_CLOCKSYNC (object);
+
+  switch (prop_id) {
+    case PROP_SYNC:
+      clocksync->sync = g_value_get_boolean (value);
+      break;
+    case PROP_TS_OFFSET:
+      clocksync->ts_offset = g_value_get_int64 (value);
+      break;
+    case PROP_SIGNAL_HANDOFFS:
+      clocksync->signal_handoffs = g_value_get_boolean (value);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+gst_clock_sync_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  GstClockSync *clocksync = GST_CLOCKSYNC (object);
+
+  switch (prop_id) {
+    case PROP_SYNC:
+      g_value_set_boolean (value, clocksync->sync);
+      break;
+    case PROP_TS_OFFSET:
+      g_value_set_int64 (value, clocksync->ts_offset);
+      break;
+    case PROP_SIGNAL_HANDOFFS:
+      g_value_set_boolean (value, clocksync->signal_handoffs);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static GstFlowReturn
+gst_clocksync_do_sync (GstClockSync * clocksync, GstClockTime running_time)
+{
+  GstFlowReturn ret = GST_FLOW_OK;
+  GstClock *clock;
+
+  if (!clocksync->sync)
+    return GST_FLOW_OK;
+
+  if (running_time == GST_CLOCK_TIME_NONE)
+    return GST_FLOW_OK;         /* Can't sync on an invalid time either way */
+
+  if (clocksync->segment.format != GST_FORMAT_TIME)
+    return GST_FLOW_OK;
+
+  GST_OBJECT_LOCK (clocksync);
+
+  if (clocksync->flushing) {
+    GST_OBJECT_UNLOCK (clocksync);
+    return GST_FLOW_FLUSHING;
+  }
+
+  while (clocksync->blocked && !clocksync->flushing)
+    g_cond_wait (&clocksync->blocked_cond, GST_OBJECT_GET_LOCK (clocksync));
+
+  if (clocksync->flushing) {
+    GST_OBJECT_UNLOCK (clocksync);
+    return GST_FLOW_FLUSHING;
+  }
+
+  if ((clock = GST_ELEMENT (clocksync)->clock)) {
+    GstClockReturn cret;
+    GstClockTime timestamp;
+    GstClockTimeDiff ts_offset = clocksync->ts_offset;
+
+    timestamp = running_time + GST_ELEMENT (clocksync)->base_time +
+        clocksync->upstream_latency;
+    if (ts_offset < 0) {
+      ts_offset = -ts_offset;
+      if (ts_offset < timestamp)
+        timestamp -= ts_offset;
+      else
+        timestamp = 0;
+    } else
+      timestamp += ts_offset;
+
+    /* save id if we need to unlock */
+    clocksync->clock_id = gst_clock_new_single_shot_id (clock, timestamp);
+    GST_OBJECT_UNLOCK (clocksync);
+
+    cret = gst_clock_id_wait (clocksync->clock_id, NULL);
+
+    GST_OBJECT_LOCK (clocksync);
+    if (clocksync->clock_id) {
+      gst_clock_id_unref (clocksync->clock_id);
+      clocksync->clock_id = NULL;
+    }
+    if (cret == GST_CLOCK_UNSCHEDULED || clocksync->flushing)
+      ret = GST_FLOW_FLUSHING;
+  }
+  GST_OBJECT_UNLOCK (clocksync);
+
+  return ret;
+}
+
+static gboolean
+gst_clock_sync_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+  GstClockSync *clocksync = GST_CLOCKSYNC (parent);
+  gboolean ret;
+
+  GST_LOG_OBJECT (clocksync, "Received %s event: %" GST_PTR_FORMAT,
+      GST_EVENT_TYPE_NAME (event), event);
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_SEGMENT:
+      /* store the event for synching */
+      gst_event_copy_segment (event, &clocksync->segment);
+      break;
+    case GST_EVENT_GAP:
+    {
+      GstClockTime start, dur;
+
+      if (clocksync->segment.format != GST_FORMAT_TIME)
+        break;
+
+      gst_event_parse_gap (event, &start, &dur);
+      if (GST_CLOCK_TIME_IS_VALID (start)) {
+        start = gst_segment_to_running_time (&clocksync->segment,
+            GST_FORMAT_TIME, start);
+
+        gst_clocksync_do_sync (clocksync, start);
+      }
+      break;
+    }
+    case GST_EVENT_FLUSH_START:
+      GST_OBJECT_LOCK (clocksync);
+      clocksync->flushing = TRUE;
+      g_cond_broadcast (&clocksync->blocked_cond);
+      if (clocksync->clock_id) {
+        GST_DEBUG_OBJECT (clocksync, "unlock clock wait");
+        gst_clock_id_unschedule (clocksync->clock_id);
+      }
+      GST_OBJECT_UNLOCK (clocksync);
+      break;
+    case GST_EVENT_FLUSH_STOP:
+      GST_OBJECT_LOCK (clocksync);
+      clocksync->flushing = FALSE;
+      gst_segment_init (&clocksync->segment, GST_FORMAT_UNDEFINED);
+      GST_OBJECT_UNLOCK (clocksync);
+    default:
+      break;
+  }
+
+  /* Always handle all events as normal: */
+  ret = gst_pad_event_default (pad, parent, event);
+  return ret;
+}
+
+static GstFlowReturn
+gst_clock_sync_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
+{
+  GstClockSync *clocksync = GST_CLOCKSYNC (parent);
+  GstFlowReturn ret = GST_FLOW_OK;
+
+  GST_LOG_OBJECT (clocksync, "Handling buffer %" GST_PTR_FORMAT, buf);
+
+  if (clocksync->segment.format == GST_FORMAT_TIME) {
+    GstClockTime runtimestamp = 0;
+    GstClockTime rundts, runpts;
+
+    rundts = gst_segment_to_running_time (&clocksync->segment,
+        GST_FORMAT_TIME, GST_BUFFER_DTS (buf));
+    runpts = gst_segment_to_running_time (&clocksync->segment,
+        GST_FORMAT_TIME, GST_BUFFER_PTS (buf));
+
+    if (GST_CLOCK_TIME_IS_VALID (rundts))
+      runtimestamp = rundts;
+    else if (GST_CLOCK_TIME_IS_VALID (runpts))
+      runtimestamp = runpts;
+
+    ret = gst_clocksync_do_sync (clocksync, runtimestamp);
+    if (ret != GST_FLOW_OK) {
+      GST_LOG_OBJECT (clocksync,
+          "Interrupted while waiting on the clock. Dropping buffer.");
+      gst_buffer_unref (buf);
+      return ret;
+    }
+  }
+
+  if (clocksync->signal_handoffs)
+    g_signal_emit (clocksync, gst_clocksync_signals[SIGNAL_HANDOFF], 0, buf);
+
+  /* Forward the buffer */
+  return gst_pad_push (clocksync->srcpad, buf);
+}
+
+static GstFlowReturn
+gst_clock_sync_chain_list (GstPad * pad, GstObject * parent,
+    GstBufferList * buffer_list)
+{
+  GstClockSync *clocksync = GST_CLOCKSYNC (parent);
+  GstFlowReturn ret = GST_FLOW_OK;
+  GstBuffer *buf;
+
+  GST_LOG_OBJECT (clocksync, "Handling buffer list %" GST_PTR_FORMAT,
+      buffer_list);
+
+  if (gst_buffer_list_length (buffer_list) == 0)
+    goto done;
+
+  buf = gst_buffer_list_get (buffer_list, 0);
+
+  if (clocksync->segment.format == GST_FORMAT_TIME) {
+    GstClockTime runtimestamp = 0;
+    GstClockTime rundts, runpts;
+
+    rundts = gst_segment_to_running_time (&clocksync->segment,
+        GST_FORMAT_TIME, GST_BUFFER_DTS (buf));
+    runpts = gst_segment_to_running_time (&clocksync->segment,
+        GST_FORMAT_TIME, GST_BUFFER_PTS (buf));
+
+    if (GST_CLOCK_TIME_IS_VALID (rundts))
+      runtimestamp = rundts;
+    else if (GST_CLOCK_TIME_IS_VALID (runpts))
+      runtimestamp = runpts;
+
+    ret = gst_clocksync_do_sync (clocksync, runtimestamp);
+    if (ret != GST_FLOW_OK) {
+      gst_buffer_list_unref (buffer_list);
+      return ret;
+    }
+  }
+
+  if (clocksync->signal_handoffs)
+    g_signal_emit (clocksync, gst_clocksync_signals[SIGNAL_HANDOFF_LIST], 0,
+        buffer_list);
+
+  /* Forward the buffer list */
+done:
+  return gst_pad_push_list (clocksync->srcpad, buffer_list);
+}
+
+static gboolean
+gst_clock_sync_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
+{
+  GstClockSync *clocksync = GST_CLOCKSYNC (parent);
+  gboolean res;
+
+  res = gst_pad_query_default (pad, parent, query);
+
+  switch (GST_QUERY_TYPE (query)) {
+    case GST_QUERY_LATENCY:{
+      gboolean live = FALSE;
+      GstClockTime min = 0, max = 0;
+
+      if (res) {
+        gst_query_parse_latency (query, &live, &min, &max);
+
+        if (clocksync->sync && max < min) {
+          GST_ELEMENT_WARNING (parent, CORE, CLOCK, (NULL),
+              ("Impossible to configure latency upstream of clocksync sync=true:"
+                  " max %" GST_TIME_FORMAT " < min %"
+                  GST_TIME_FORMAT ". Add queues or other buffering elements.",
+                  GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
+        }
+      }
+
+      /* Ignore the upstream latency if it is not live */
+      GST_OBJECT_LOCK (clocksync);
+      if (live)
+        clocksync->upstream_latency = min;
+      else {
+        clocksync->upstream_latency = 0;
+        /* if upstream is non-live source, then there is no
+         * limit on the maximum latency */
+        max = -1;
+      }
+
+      GST_OBJECT_UNLOCK (clocksync);
+
+      GST_DEBUG_OBJECT (clocksync,
+          "Configured upstream latency = %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (clocksync->upstream_latency));
+
+      gst_query_set_latency (query, live || clocksync->sync, min, max);
+      break;
+    }
+    default:
+      break;
+  }
+
+  return res;
+}
+
+static GstStateChangeReturn
+gst_clocksync_change_state (GstElement * element, GstStateChange transition)
+{
+  GstClockSync *clocksync = GST_CLOCKSYNC (element);
+  GstStateChangeReturn ret;
+  gboolean no_preroll = FALSE;
+
+  switch (transition) {
+    case GST_STATE_CHANGE_NULL_TO_READY:
+      break;
+    case GST_STATE_CHANGE_READY_TO_PAUSED:
+      GST_OBJECT_LOCK (clocksync);
+      clocksync->flushing = FALSE;
+      clocksync->blocked = TRUE;
+      GST_OBJECT_UNLOCK (clocksync);
+      if (clocksync->sync)
+        no_preroll = TRUE;
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+      GST_OBJECT_LOCK (clocksync);
+      clocksync->blocked = FALSE;
+      g_cond_broadcast (&clocksync->blocked_cond);
+      GST_OBJECT_UNLOCK (clocksync);
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
+      GST_OBJECT_LOCK (clocksync);
+      clocksync->flushing = TRUE;
+      if (clocksync->clock_id) {
+        GST_DEBUG_OBJECT (clocksync, "unlock clock wait");
+        gst_clock_id_unschedule (clocksync->clock_id);
+      }
+      clocksync->blocked = FALSE;
+      g_cond_broadcast (&clocksync->blocked_cond);
+      GST_OBJECT_UNLOCK (clocksync);
+      break;
+    default:
+      break;
+  }
+
+  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+  switch (transition) {
+    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+      GST_OBJECT_LOCK (clocksync);
+      clocksync->upstream_latency = 0;
+      clocksync->blocked = TRUE;
+      GST_OBJECT_UNLOCK (clocksync);
+      if (clocksync->sync)
+        no_preroll = TRUE;
+      break;
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
+      break;
+    case GST_STATE_CHANGE_READY_TO_NULL:
+      break;
+    default:
+      break;
+  }
+
+  if (no_preroll && ret == GST_STATE_CHANGE_SUCCESS)
+    ret = GST_STATE_CHANGE_NO_PREROLL;
+
+  return ret;
+}
diff --git a/plugins/elements/gstclocksync.h b/plugins/elements/gstclocksync.h
new file mode 100644 (file)
index 0000000..44ab48e
--- /dev/null
@@ -0,0 +1,81 @@
+/* GStreamer
+ * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
+ *                    2000 Wim Taymans <wtay@chello.be>
+ *                    2005 Wim Taymans <wim@fluendo.com>
+ * Copyright (C) 2020 Jan Schmidt <jan@centricular.com>
+ *
+ * gstclocksync.h:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_CLOCKSYNC_H__
+#define __GST_CLOCKSYNC_H__
+
+#include <gst/gst.h>
+
+G_BEGIN_DECLS
+
+/* #defines don't like whitespacey bits */
+#define GST_TYPE_CLOCKSYNC \
+  (gst_clock_sync_get_type())
+#define GST_CLOCKSYNC(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_CLOCKSYNC,GstClockSync))
+#define GST_CLOCKSYNC_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_CLOCKSYNC,GstClockSyncClass))
+#define GST_IS_CLOCKSYNC(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_CLOCKSYNC))
+#define GST_IS_CLOCKSYNC_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_CLOCKSYNC))
+
+typedef struct _GstClockSync      GstClockSync;
+typedef struct _GstClockSyncClass GstClockSyncClass;
+
+struct _GstClockSync
+{
+  GstElement element;
+
+  /*< private >*/
+  GstPad *sinkpad, *srcpad;
+
+  gboolean       signal_handoffs;
+
+  GstSegment     segment;
+  GstClockID     clock_id;
+  gboolean       flushing;
+  gboolean          sync;
+
+  GCond          blocked_cond;
+  gboolean       blocked;
+  GstClockTimeDiff  ts_offset;
+
+  GstClockTime   upstream_latency;
+};
+
+struct _GstClockSyncClass 
+{
+  GstElementClass parent_class;
+
+  /* signals */
+  void (*handoff) (GstElement *element, GstBuffer *buf);
+  void (*handoff_list) (GstElement *element, GstBufferList *buffer_list);
+};
+
+GType gst_clock_sync_get_type (void);
+
+G_END_DECLS
+
+#endif /* __GST_CLOCKSYNC_H__ */
index f0324392a005e575278ed9e5ed4002d959baa1d8..5ed328c2a5f1b3163dde4d4ef9b2f6b5b876af4e 100644 (file)
@@ -32,6 +32,7 @@
 #include <gst/gst.h>
 
 #include "gstcapsfilter.h"
+#include "gstclocksync.h"
 #include "gstconcat.h"
 #include "gstdataurisrc.h"
 #include "gstdownloadbuffer.h"
@@ -59,6 +60,9 @@ plugin_init (GstPlugin * plugin)
   if (!gst_element_register (plugin, "capsfilter", GST_RANK_NONE,
           gst_capsfilter_get_type ()))
     return FALSE;
+  if (!gst_element_register (plugin, "clocksync", GST_RANK_NONE,
+          gst_clock_sync_get_type ()))
+    return FALSE;
   if (!gst_element_register (plugin, "concat", GST_RANK_NONE,
           gst_concat_get_type ()))
     return FALSE;
index bb0bca35b6d4de31661bafb118ce61ee9d3957dc..c1d91d2f3406e40c4bd50ce07d76904b391406b9 100644 (file)
@@ -1,5 +1,6 @@
 gst_elements_sources = [
   'gstcapsfilter.c',
+  'gstclocksync.c',
   'gstconcat.c',
   'gstdataurisrc.c',
   'gstdownloadbuffer.c',
diff --git a/tests/check/elements/clocksync.c b/tests/check/elements/clocksync.c
new file mode 100644 (file)
index 0000000..c9d4fb6
--- /dev/null
@@ -0,0 +1,235 @@
+/* GStreamer
+ *
+ * unit test for clocksync
+ *
+ * Copyright (C) <2005> Thomas Vander Stichele <thomas at apestaart dot org>
+ * Copyright (C) <2015> Havard Graff           <havard@pexip.com>
+ * Copyright (C) <2020> Jan Schmidt            <jan@centricular.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/check/gstcheck.h>
+#include <gst/check/gstharness.h>
+
+GST_START_TEST (test_one_buffer)
+{
+  GstHarness *h = gst_harness_new_parse ("clocksync sync=false");
+  GstBuffer *buffer_in;
+  GstBuffer *buffer_out;
+
+  gst_harness_set_src_caps_str (h, "mycaps");
+
+  buffer_in = gst_buffer_new_and_alloc (4);
+  ASSERT_BUFFER_REFCOUNT (buffer_in, "buffer", 1);
+
+  gst_buffer_fill (buffer_in, 0, "data", 4);
+
+  /* pushing gives away my reference ... */
+  fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (h, buffer_in));
+
+  /* ... but it should end up being collected on GstHarness queue */
+  fail_unless_equals_int (1, gst_harness_buffers_in_queue (h));
+  buffer_out = gst_harness_pull (h);
+
+  fail_unless (buffer_in == buffer_out);
+  ASSERT_BUFFER_REFCOUNT (buffer_out, "buffer", 1);
+
+  /* cleanup */
+  gst_buffer_unref (buffer_out);
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+static void
+handoff_func (GstElement * clocksync, GstBuffer * buf, GstBuffer ** ret)
+{
+  (void) clocksync;
+  *ret = buf;
+}
+
+GST_START_TEST (test_signal_handoffs)
+{
+  GstHarness *h = gst_harness_new_parse ("clocksync sync=false name=c");
+  GstBuffer *buffer_in;
+  GstBuffer *buffer_signaled = NULL;
+  GstElement *c = gst_bin_get_by_name (GST_BIN (h->element), "c");
+
+  gst_harness_set_src_caps_str (h, "mycaps");
+
+  /* connect to the handoff signal */
+  g_signal_connect (c, "handoff", G_CALLBACK (handoff_func), &buffer_signaled);
+
+  /* first, turn off signal-handoffs */
+  g_object_set (c, "signal-handoffs", FALSE, NULL);
+
+  /* then push a buffer */
+  buffer_in = gst_buffer_new_and_alloc (4);
+  fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (h, buffer_in));
+
+  /* verify that we got no buffer signaled */
+  fail_unless (buffer_signaled == NULL);
+
+  /* now turn on signal-handoffs */
+  g_object_set (c, "signal-handoffs", TRUE, NULL);
+
+  /* then push another buffer */
+  buffer_in = gst_buffer_new_and_alloc (4);
+  fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (h, buffer_in));
+
+  /* verify the buffer signaled is equal to the one pushed in */
+  fail_unless (buffer_signaled == buffer_in);
+  ASSERT_BUFFER_REFCOUNT (buffer_signaled, "buffer", 1);
+
+  /* cleanup */
+  gst_object_unref (c);
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_sync_on_timestamp)
+{
+  /* the reason to use the queue in front of the clocksync element
+     is to effectively make gst_harness_push asynchronous, not locking
+     up the test, waiting for gst_clock_id_wait */
+  GstHarness *h = gst_harness_new_parse ("queue ! clocksync");
+  GstBuffer *buf;
+  GstClock *clock;
+  GstClockTime timestamp = 123456789;
+
+  /* use testclock */
+  gst_harness_use_testclock (h);
+  gst_harness_set_src_caps_str (h, "mycaps");
+
+  /* make a buffer and set the timestamp */
+  buf = gst_buffer_new ();
+  GST_BUFFER_PTS (buf) = timestamp;
+
+  /* push the buffer, and verify it does *not* make it through */
+  gst_harness_push (h, buf);
+  fail_unless_equals_int (0, gst_harness_buffers_in_queue (h));
+
+  /* verify the clocksync element has registered exactly one GstClockID */
+  fail_unless (gst_harness_wait_for_clock_id_waits (h, 1, 42));
+
+  /* crank the clock and pull the buffer */
+  gst_harness_crank_single_clock_wait (h);
+  buf = gst_harness_pull (h);
+
+  /* verify that the buffer has the right timestamp, and that the time on
+     the clock is equal to the timestamp */
+  fail_unless_equals_int64 (timestamp, GST_BUFFER_PTS (buf));
+  clock = gst_element_get_clock (h->element);
+  fail_unless_equals_int64 (timestamp, gst_clock_get_time (clock));
+
+  /* cleanup */
+  gst_object_unref (clock);
+  gst_buffer_unref (buf);
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_no_sync_on_timestamp)
+{
+  GstHarness *h = gst_harness_new_parse ("clocksync sync=false");
+  GstBuffer *buf;
+  GstClockTime timestamp = 123456789;
+
+  /* use testclock */
+  gst_harness_use_testclock (h);
+  gst_harness_set_src_caps_str (h, "mycaps");
+
+  /* make a buffer and set the timestamp */
+  buf = gst_buffer_new ();
+  GST_BUFFER_PTS (buf) = timestamp;
+
+  /* push the buffer, and verify it was forwarded immediately */
+  gst_harness_push (h, buf);
+  fail_unless_equals_int (1, gst_harness_buffers_in_queue (h));
+
+  buf = gst_harness_pull (h);
+  /* verify that the buffer has the right timestamp */
+  fail_unless_equals_int64 (timestamp, GST_BUFFER_PTS (buf));
+
+  /* cleanup */
+  gst_buffer_unref (buf);
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_stopping_element_unschedules_sync)
+{
+  /* the reason to use the queue in front of the clocksync element
+     is to effectively make gst_harness_push asynchronous, not locking
+     up the test, waiting for gst_clock_id_wait */
+  GstHarness *h = gst_harness_new_parse ("queue ! clocksync sync=true");
+  GstBuffer *buf;
+  GstClockTime timestamp = 123456789;
+
+  /* use testclock */
+  gst_harness_use_testclock (h);
+  gst_harness_set_src_caps_str (h, "mycaps");
+
+  /* make a buffer and set the timestamp */
+  buf = gst_buffer_new ();
+  GST_BUFFER_PTS (buf) = timestamp;
+
+  /* push the buffer, and verify it does *not* make it through */
+  gst_harness_push (h, buf);
+  fail_unless_equals_int (0, gst_harness_buffers_in_queue (h));
+
+  /* verify the clocksync element has registered exactly one GstClockID */
+  fail_unless (gst_harness_wait_for_clock_id_waits (h, 1, 42));
+
+  /* setting clocksync to READY should unschedule the sync */
+  gst_element_set_state (h->element, GST_STATE_READY);
+
+  /* verify the clocksync element no longer waits on the clock */
+  fail_unless (gst_harness_wait_for_clock_id_waits (h, 0, 42));
+
+  /* and that the waiting buffer was dropped */
+  fail_unless_equals_int (0, gst_harness_buffers_received (h));
+
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+static Suite *
+clocksync_suite (void)
+{
+  Suite *s = suite_create ("clocksync");
+  TCase *tc_chain = tcase_create ("general");
+
+  suite_add_tcase (s, tc_chain);
+  tcase_add_test (tc_chain, test_one_buffer);
+  tcase_add_test (tc_chain, test_signal_handoffs);
+  tcase_add_test (tc_chain, test_sync_on_timestamp);
+  tcase_add_test (tc_chain, test_stopping_element_unschedules_sync);
+  tcase_add_test (tc_chain, test_no_sync_on_timestamp);
+
+
+  return s;
+}
+
+GST_CHECK_MAIN (clocksync);
index 2372931cb59222249836931176d6dec55353c8bf..b47325bce4716752134ef7de3ea4690b27c697a6 100644 (file)
@@ -78,6 +78,7 @@ core_tests = [
   [ 'libs/typefindhelper.c' ],
   [ 'libs/queuearray.c' ],
   [ 'elements/capsfilter.c', not gst_registry ],
+  [ 'elements/clocksync.c', not gst_registry or not gst_parse ],
   [ 'elements/concat.c', not gst_registry ],
   [ 'elements/dataurisrc.c', not gst_registry ],
   [ 'elements/fakesrc.c', not gst_registry ],