--- /dev/null
+/*
+ * GStreamer
+ * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
+ * 2000 Wim Taymans <wtay@chello.be>
+ * 2005 Wim Taymans <wim@fluendo.com>
+ * Copyright (C) 2020 Jan Schmidt <jan@centricular.com>
+ *
+ * gstclocksync.c:
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-clocksync
+ * @title: clocksync
+ *
+ * Simple element that passes all buffers and buffer-lists intact, but
+ * synchronising them to the clock before passing.
+ *
+ * Synchronisation to the clock is on by default, but can be turned
+ * off by setting the 'sync' property to FALSE
+ *
+ * <refsect2>
+ * <title>Example launch line</title>
+ * |[
+ * gst-launch -v -m videotestsrc ! clocksync ! fakesink silent=TRUE
+ * ]|
+ * </refsect2>
+ */
+
+#ifdef HAVE_CONFIG_H
+# include <config.h>
+#endif
+
+#include <gst/gst.h>
+
+#include "gstclocksync.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_clock_sync_debug);
+#define GST_CAT_DEFAULT gst_clock_sync_debug
+
+/* ClockSync signals and args */
+enum
+{
+ SIGNAL_HANDOFF,
+ SIGNAL_HANDOFF_LIST,
+ /* FILL ME */
+ LAST_SIGNAL
+};
+
+#define DEFAULT_SYNC TRUE
+#define DEFAULT_TS_OFFSET 0
+#define DEFAULT_SIGNAL_HANDOFFS FALSE
+
+enum
+{
+ PROP_0,
+ PROP_SYNC,
+ PROP_TS_OFFSET,
+ PROP_SIGNAL_HANDOFFS,
+};
+
+static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS ("ANY")
+ );
+
+static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS ("ANY")
+ );
+
+#define _do_init \
+ GST_DEBUG_CATEGORY_INIT (gst_clock_sync_debug, "clocksync", 0, "clocksync element");
+#define gst_clock_sync_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstClockSync, gst_clock_sync, GST_TYPE_ELEMENT,
+ _do_init);
+
+static void gst_clock_sync_finalize (GObject * object);
+
+static void gst_clock_sync_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_clock_sync_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+
+static gboolean gst_clock_sync_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static GstFlowReturn gst_clock_sync_chain (GstPad * pad, GstObject * parent,
+ GstBuffer * buf);
+static GstFlowReturn gst_clock_sync_chain_list (GstPad * pad,
+ GstObject * parent, GstBufferList * buflist);
+static gboolean gst_clock_sync_src_query (GstPad * pad, GstObject * parent,
+ GstQuery * query);
+static GstStateChangeReturn gst_clocksync_change_state (GstElement * element,
+ GstStateChange transition);
+
+static guint gst_clocksync_signals[LAST_SIGNAL] = { 0 };
+
+static void
+gst_clock_sync_class_init (GstClockSyncClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstElementClass *gstelement_class;
+
+ gobject_class = (GObjectClass *) klass;
+ gstelement_class = (GstElementClass *) klass;
+
+ gobject_class->set_property = gst_clock_sync_set_property;
+ gobject_class->get_property = gst_clock_sync_get_property;
+ gobject_class->finalize = gst_clock_sync_finalize;
+
+ g_object_class_install_property (gobject_class, PROP_SYNC,
+ g_param_spec_boolean ("sync", "Synchronize",
+ "Synchronize to pipeline clock", DEFAULT_SYNC,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
+ g_param_spec_int64 ("ts-offset", "Timestamp offset for synchronisation",
+ "Timestamp offset in nanoseconds for synchronisation, negative for earlier sync",
+ G_MININT64, G_MAXINT64, DEFAULT_TS_OFFSET,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstClockSync::signal-handoffs
+ *
+ * If set to %TRUE, the clocksync will emit a handoff signal when handling a buffer.
+ * When set to %FALSE, no signal will be emitted, which might improve performance.
+ */
+ g_object_class_install_property (gobject_class, PROP_SIGNAL_HANDOFFS,
+ g_param_spec_boolean ("signal-handoffs",
+ "Signal handoffs", "Send a signal before pushing the buffer",
+ DEFAULT_SIGNAL_HANDOFFS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * GstClockSync::handoff:
+ * @clocksync: the clocksync instance
+ * @buffer: the buffer that just has been received
+ * @pad: the pad that received it
+ *
+ * This signal gets emitted before passing the buffer downstream.
+ */
+ gst_clocksync_signals[SIGNAL_HANDOFF] =
+ g_signal_new ("handoff", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+ G_STRUCT_OFFSET (GstClockSyncClass, handoff), NULL, NULL,
+ NULL, G_TYPE_NONE, 1, GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE);
+
+ /**
+ * GstClockSync::handoff-list:
+ * @clocksync: the clocksync instance
+ * @buffer_list: the buffer list that just has been received
+ * @pad: the pad that received it
+ *
+ * This signal gets emitted before passing the buffer list downstream.
+ */
+ gst_clocksync_signals[SIGNAL_HANDOFF_LIST] =
+ g_signal_new ("handoff-list", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstClockSyncClass, handoff_list),
+ NULL, NULL, NULL, G_TYPE_NONE, 1,
+ GST_TYPE_BUFFER_LIST | G_SIGNAL_TYPE_STATIC_SCOPE);
+
+ gstelement_class->change_state =
+ GST_DEBUG_FUNCPTR (gst_clocksync_change_state);
+
+ gst_element_class_set_static_metadata (gstelement_class,
+ "ClockSync",
+ "Generic",
+ "Synchronise buffers to the clock", "Jan Schmidt <jan@centricular.com>");
+
+ gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
+ gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
+}
+
+static void
+gst_clock_sync_finalize (GObject * object)
+{
+ GstClockSync *clocksync = GST_CLOCKSYNC (object);
+
+ g_cond_clear (&clocksync->blocked_cond);
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+gst_clock_sync_init (GstClockSync * clocksync)
+{
+ clocksync->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
+ gst_pad_set_event_function (clocksync->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_clock_sync_sink_event));
+ gst_pad_set_chain_function (clocksync->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_clock_sync_chain));
+ gst_pad_set_chain_list_function (clocksync->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_clock_sync_chain_list));
+ GST_PAD_SET_PROXY_CAPS (clocksync->sinkpad);
+ gst_element_add_pad (GST_ELEMENT (clocksync), clocksync->sinkpad);
+
+ clocksync->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
+
+ gst_pad_set_query_function (clocksync->srcpad, gst_clock_sync_src_query);
+
+ GST_PAD_SET_PROXY_CAPS (clocksync->srcpad);
+ gst_element_add_pad (GST_ELEMENT (clocksync), clocksync->srcpad);
+
+ clocksync->signal_handoffs = DEFAULT_SIGNAL_HANDOFFS;
+ clocksync->ts_offset = DEFAULT_TS_OFFSET;
+ clocksync->sync = DEFAULT_SYNC;
+ g_cond_init (&clocksync->blocked_cond);
+}
+
+static void
+gst_clock_sync_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstClockSync *clocksync = GST_CLOCKSYNC (object);
+
+ switch (prop_id) {
+ case PROP_SYNC:
+ clocksync->sync = g_value_get_boolean (value);
+ break;
+ case PROP_TS_OFFSET:
+ clocksync->ts_offset = g_value_get_int64 (value);
+ break;
+ case PROP_SIGNAL_HANDOFFS:
+ clocksync->signal_handoffs = g_value_get_boolean (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_clock_sync_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstClockSync *clocksync = GST_CLOCKSYNC (object);
+
+ switch (prop_id) {
+ case PROP_SYNC:
+ g_value_set_boolean (value, clocksync->sync);
+ break;
+ case PROP_TS_OFFSET:
+ g_value_set_int64 (value, clocksync->ts_offset);
+ break;
+ case PROP_SIGNAL_HANDOFFS:
+ g_value_set_boolean (value, clocksync->signal_handoffs);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static GstFlowReturn
+gst_clocksync_do_sync (GstClockSync * clocksync, GstClockTime running_time)
+{
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstClock *clock;
+
+ if (!clocksync->sync)
+ return GST_FLOW_OK;
+
+ if (running_time == GST_CLOCK_TIME_NONE)
+ return GST_FLOW_OK; /* Can't sync on an invalid time either way */
+
+ if (clocksync->segment.format != GST_FORMAT_TIME)
+ return GST_FLOW_OK;
+
+ GST_OBJECT_LOCK (clocksync);
+
+ if (clocksync->flushing) {
+ GST_OBJECT_UNLOCK (clocksync);
+ return GST_FLOW_FLUSHING;
+ }
+
+ while (clocksync->blocked && !clocksync->flushing)
+ g_cond_wait (&clocksync->blocked_cond, GST_OBJECT_GET_LOCK (clocksync));
+
+ if (clocksync->flushing) {
+ GST_OBJECT_UNLOCK (clocksync);
+ return GST_FLOW_FLUSHING;
+ }
+
+ if ((clock = GST_ELEMENT (clocksync)->clock)) {
+ GstClockReturn cret;
+ GstClockTime timestamp;
+ GstClockTimeDiff ts_offset = clocksync->ts_offset;
+
+ timestamp = running_time + GST_ELEMENT (clocksync)->base_time +
+ clocksync->upstream_latency;
+ if (ts_offset < 0) {
+ ts_offset = -ts_offset;
+ if (ts_offset < timestamp)
+ timestamp -= ts_offset;
+ else
+ timestamp = 0;
+ } else
+ timestamp += ts_offset;
+
+ /* save id if we need to unlock */
+ clocksync->clock_id = gst_clock_new_single_shot_id (clock, timestamp);
+ GST_OBJECT_UNLOCK (clocksync);
+
+ cret = gst_clock_id_wait (clocksync->clock_id, NULL);
+
+ GST_OBJECT_LOCK (clocksync);
+ if (clocksync->clock_id) {
+ gst_clock_id_unref (clocksync->clock_id);
+ clocksync->clock_id = NULL;
+ }
+ if (cret == GST_CLOCK_UNSCHEDULED || clocksync->flushing)
+ ret = GST_FLOW_FLUSHING;
+ }
+ GST_OBJECT_UNLOCK (clocksync);
+
+ return ret;
+}
+
+static gboolean
+gst_clock_sync_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ GstClockSync *clocksync = GST_CLOCKSYNC (parent);
+ gboolean ret;
+
+ GST_LOG_OBJECT (clocksync, "Received %s event: %" GST_PTR_FORMAT,
+ GST_EVENT_TYPE_NAME (event), event);
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_SEGMENT:
+ /* store the event for synching */
+ gst_event_copy_segment (event, &clocksync->segment);
+ break;
+ case GST_EVENT_GAP:
+ {
+ GstClockTime start, dur;
+
+ if (clocksync->segment.format != GST_FORMAT_TIME)
+ break;
+
+ gst_event_parse_gap (event, &start, &dur);
+ if (GST_CLOCK_TIME_IS_VALID (start)) {
+ start = gst_segment_to_running_time (&clocksync->segment,
+ GST_FORMAT_TIME, start);
+
+ gst_clocksync_do_sync (clocksync, start);
+ }
+ break;
+ }
+ case GST_EVENT_FLUSH_START:
+ GST_OBJECT_LOCK (clocksync);
+ clocksync->flushing = TRUE;
+ g_cond_broadcast (&clocksync->blocked_cond);
+ if (clocksync->clock_id) {
+ GST_DEBUG_OBJECT (clocksync, "unlock clock wait");
+ gst_clock_id_unschedule (clocksync->clock_id);
+ }
+ GST_OBJECT_UNLOCK (clocksync);
+ break;
+ case GST_EVENT_FLUSH_STOP:
+ GST_OBJECT_LOCK (clocksync);
+ clocksync->flushing = FALSE;
+ gst_segment_init (&clocksync->segment, GST_FORMAT_UNDEFINED);
+ GST_OBJECT_UNLOCK (clocksync);
+ default:
+ break;
+ }
+
+ /* Always handle all events as normal: */
+ ret = gst_pad_event_default (pad, parent, event);
+ return ret;
+}
+
+static GstFlowReturn
+gst_clock_sync_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
+{
+ GstClockSync *clocksync = GST_CLOCKSYNC (parent);
+ GstFlowReturn ret = GST_FLOW_OK;
+
+ GST_LOG_OBJECT (clocksync, "Handling buffer %" GST_PTR_FORMAT, buf);
+
+ if (clocksync->segment.format == GST_FORMAT_TIME) {
+ GstClockTime runtimestamp = 0;
+ GstClockTime rundts, runpts;
+
+ rundts = gst_segment_to_running_time (&clocksync->segment,
+ GST_FORMAT_TIME, GST_BUFFER_DTS (buf));
+ runpts = gst_segment_to_running_time (&clocksync->segment,
+ GST_FORMAT_TIME, GST_BUFFER_PTS (buf));
+
+ if (GST_CLOCK_TIME_IS_VALID (rundts))
+ runtimestamp = rundts;
+ else if (GST_CLOCK_TIME_IS_VALID (runpts))
+ runtimestamp = runpts;
+
+ ret = gst_clocksync_do_sync (clocksync, runtimestamp);
+ if (ret != GST_FLOW_OK) {
+ GST_LOG_OBJECT (clocksync,
+ "Interrupted while waiting on the clock. Dropping buffer.");
+ gst_buffer_unref (buf);
+ return ret;
+ }
+ }
+
+ if (clocksync->signal_handoffs)
+ g_signal_emit (clocksync, gst_clocksync_signals[SIGNAL_HANDOFF], 0, buf);
+
+ /* Forward the buffer */
+ return gst_pad_push (clocksync->srcpad, buf);
+}
+
+static GstFlowReturn
+gst_clock_sync_chain_list (GstPad * pad, GstObject * parent,
+ GstBufferList * buffer_list)
+{
+ GstClockSync *clocksync = GST_CLOCKSYNC (parent);
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstBuffer *buf;
+
+ GST_LOG_OBJECT (clocksync, "Handling buffer list %" GST_PTR_FORMAT,
+ buffer_list);
+
+ if (gst_buffer_list_length (buffer_list) == 0)
+ goto done;
+
+ buf = gst_buffer_list_get (buffer_list, 0);
+
+ if (clocksync->segment.format == GST_FORMAT_TIME) {
+ GstClockTime runtimestamp = 0;
+ GstClockTime rundts, runpts;
+
+ rundts = gst_segment_to_running_time (&clocksync->segment,
+ GST_FORMAT_TIME, GST_BUFFER_DTS (buf));
+ runpts = gst_segment_to_running_time (&clocksync->segment,
+ GST_FORMAT_TIME, GST_BUFFER_PTS (buf));
+
+ if (GST_CLOCK_TIME_IS_VALID (rundts))
+ runtimestamp = rundts;
+ else if (GST_CLOCK_TIME_IS_VALID (runpts))
+ runtimestamp = runpts;
+
+ ret = gst_clocksync_do_sync (clocksync, runtimestamp);
+ if (ret != GST_FLOW_OK) {
+ gst_buffer_list_unref (buffer_list);
+ return ret;
+ }
+ }
+
+ if (clocksync->signal_handoffs)
+ g_signal_emit (clocksync, gst_clocksync_signals[SIGNAL_HANDOFF_LIST], 0,
+ buffer_list);
+
+ /* Forward the buffer list */
+done:
+ return gst_pad_push_list (clocksync->srcpad, buffer_list);
+}
+
+static gboolean
+gst_clock_sync_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
+{
+ GstClockSync *clocksync = GST_CLOCKSYNC (parent);
+ gboolean res;
+
+ res = gst_pad_query_default (pad, parent, query);
+
+ switch (GST_QUERY_TYPE (query)) {
+ case GST_QUERY_LATENCY:{
+ gboolean live = FALSE;
+ GstClockTime min = 0, max = 0;
+
+ if (res) {
+ gst_query_parse_latency (query, &live, &min, &max);
+
+ if (clocksync->sync && max < min) {
+ GST_ELEMENT_WARNING (parent, CORE, CLOCK, (NULL),
+ ("Impossible to configure latency upstream of clocksync sync=true:"
+ " max %" GST_TIME_FORMAT " < min %"
+ GST_TIME_FORMAT ". Add queues or other buffering elements.",
+ GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
+ }
+ }
+
+ /* Ignore the upstream latency if it is not live */
+ GST_OBJECT_LOCK (clocksync);
+ if (live)
+ clocksync->upstream_latency = min;
+ else {
+ clocksync->upstream_latency = 0;
+ /* if upstream is non-live source, then there is no
+ * limit on the maximum latency */
+ max = -1;
+ }
+
+ GST_OBJECT_UNLOCK (clocksync);
+
+ GST_DEBUG_OBJECT (clocksync,
+ "Configured upstream latency = %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (clocksync->upstream_latency));
+
+ gst_query_set_latency (query, live || clocksync->sync, min, max);
+ break;
+ }
+ default:
+ break;
+ }
+
+ return res;
+}
+
+static GstStateChangeReturn
+gst_clocksync_change_state (GstElement * element, GstStateChange transition)
+{
+ GstClockSync *clocksync = GST_CLOCKSYNC (element);
+ GstStateChangeReturn ret;
+ gboolean no_preroll = FALSE;
+
+ switch (transition) {
+ case GST_STATE_CHANGE_NULL_TO_READY:
+ break;
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ GST_OBJECT_LOCK (clocksync);
+ clocksync->flushing = FALSE;
+ clocksync->blocked = TRUE;
+ GST_OBJECT_UNLOCK (clocksync);
+ if (clocksync->sync)
+ no_preroll = TRUE;
+ break;
+ case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+ GST_OBJECT_LOCK (clocksync);
+ clocksync->blocked = FALSE;
+ g_cond_broadcast (&clocksync->blocked_cond);
+ GST_OBJECT_UNLOCK (clocksync);
+ break;
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ GST_OBJECT_LOCK (clocksync);
+ clocksync->flushing = TRUE;
+ if (clocksync->clock_id) {
+ GST_DEBUG_OBJECT (clocksync, "unlock clock wait");
+ gst_clock_id_unschedule (clocksync->clock_id);
+ }
+ clocksync->blocked = FALSE;
+ g_cond_broadcast (&clocksync->blocked_cond);
+ GST_OBJECT_UNLOCK (clocksync);
+ break;
+ default:
+ break;
+ }
+
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+ GST_OBJECT_LOCK (clocksync);
+ clocksync->upstream_latency = 0;
+ clocksync->blocked = TRUE;
+ GST_OBJECT_UNLOCK (clocksync);
+ if (clocksync->sync)
+ no_preroll = TRUE;
+ break;
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ break;
+ case GST_STATE_CHANGE_READY_TO_NULL:
+ break;
+ default:
+ break;
+ }
+
+ if (no_preroll && ret == GST_STATE_CHANGE_SUCCESS)
+ ret = GST_STATE_CHANGE_NO_PREROLL;
+
+ return ret;
+}
--- /dev/null
+/* GStreamer
+ *
+ * unit test for clocksync
+ *
+ * Copyright (C) <2005> Thomas Vander Stichele <thomas at apestaart dot org>
+ * Copyright (C) <2015> Havard Graff <havard@pexip.com>
+ * Copyright (C) <2020> Jan Schmidt <jan@centricular.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/check/gstcheck.h>
+#include <gst/check/gstharness.h>
+
+GST_START_TEST (test_one_buffer)
+{
+ GstHarness *h = gst_harness_new_parse ("clocksync sync=false");
+ GstBuffer *buffer_in;
+ GstBuffer *buffer_out;
+
+ gst_harness_set_src_caps_str (h, "mycaps");
+
+ buffer_in = gst_buffer_new_and_alloc (4);
+ ASSERT_BUFFER_REFCOUNT (buffer_in, "buffer", 1);
+
+ gst_buffer_fill (buffer_in, 0, "data", 4);
+
+ /* pushing gives away my reference ... */
+ fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (h, buffer_in));
+
+ /* ... but it should end up being collected on GstHarness queue */
+ fail_unless_equals_int (1, gst_harness_buffers_in_queue (h));
+ buffer_out = gst_harness_pull (h);
+
+ fail_unless (buffer_in == buffer_out);
+ ASSERT_BUFFER_REFCOUNT (buffer_out, "buffer", 1);
+
+ /* cleanup */
+ gst_buffer_unref (buffer_out);
+ gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+static void
+handoff_func (GstElement * clocksync, GstBuffer * buf, GstBuffer ** ret)
+{
+ (void) clocksync;
+ *ret = buf;
+}
+
+GST_START_TEST (test_signal_handoffs)
+{
+ GstHarness *h = gst_harness_new_parse ("clocksync sync=false name=c");
+ GstBuffer *buffer_in;
+ GstBuffer *buffer_signaled = NULL;
+ GstElement *c = gst_bin_get_by_name (GST_BIN (h->element), "c");
+
+ gst_harness_set_src_caps_str (h, "mycaps");
+
+ /* connect to the handoff signal */
+ g_signal_connect (c, "handoff", G_CALLBACK (handoff_func), &buffer_signaled);
+
+ /* first, turn off signal-handoffs */
+ g_object_set (c, "signal-handoffs", FALSE, NULL);
+
+ /* then push a buffer */
+ buffer_in = gst_buffer_new_and_alloc (4);
+ fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (h, buffer_in));
+
+ /* verify that we got no buffer signaled */
+ fail_unless (buffer_signaled == NULL);
+
+ /* now turn on signal-handoffs */
+ g_object_set (c, "signal-handoffs", TRUE, NULL);
+
+ /* then push another buffer */
+ buffer_in = gst_buffer_new_and_alloc (4);
+ fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (h, buffer_in));
+
+ /* verify the buffer signaled is equal to the one pushed in */
+ fail_unless (buffer_signaled == buffer_in);
+ ASSERT_BUFFER_REFCOUNT (buffer_signaled, "buffer", 1);
+
+ /* cleanup */
+ gst_object_unref (c);
+ gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_sync_on_timestamp)
+{
+ /* the reason to use the queue in front of the clocksync element
+ is to effectively make gst_harness_push asynchronous, not locking
+ up the test, waiting for gst_clock_id_wait */
+ GstHarness *h = gst_harness_new_parse ("queue ! clocksync");
+ GstBuffer *buf;
+ GstClock *clock;
+ GstClockTime timestamp = 123456789;
+
+ /* use testclock */
+ gst_harness_use_testclock (h);
+ gst_harness_set_src_caps_str (h, "mycaps");
+
+ /* make a buffer and set the timestamp */
+ buf = gst_buffer_new ();
+ GST_BUFFER_PTS (buf) = timestamp;
+
+ /* push the buffer, and verify it does *not* make it through */
+ gst_harness_push (h, buf);
+ fail_unless_equals_int (0, gst_harness_buffers_in_queue (h));
+
+ /* verify the clocksync element has registered exactly one GstClockID */
+ fail_unless (gst_harness_wait_for_clock_id_waits (h, 1, 42));
+
+ /* crank the clock and pull the buffer */
+ gst_harness_crank_single_clock_wait (h);
+ buf = gst_harness_pull (h);
+
+ /* verify that the buffer has the right timestamp, and that the time on
+ the clock is equal to the timestamp */
+ fail_unless_equals_int64 (timestamp, GST_BUFFER_PTS (buf));
+ clock = gst_element_get_clock (h->element);
+ fail_unless_equals_int64 (timestamp, gst_clock_get_time (clock));
+
+ /* cleanup */
+ gst_object_unref (clock);
+ gst_buffer_unref (buf);
+ gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_no_sync_on_timestamp)
+{
+ GstHarness *h = gst_harness_new_parse ("clocksync sync=false");
+ GstBuffer *buf;
+ GstClockTime timestamp = 123456789;
+
+ /* use testclock */
+ gst_harness_use_testclock (h);
+ gst_harness_set_src_caps_str (h, "mycaps");
+
+ /* make a buffer and set the timestamp */
+ buf = gst_buffer_new ();
+ GST_BUFFER_PTS (buf) = timestamp;
+
+ /* push the buffer, and verify it was forwarded immediately */
+ gst_harness_push (h, buf);
+ fail_unless_equals_int (1, gst_harness_buffers_in_queue (h));
+
+ buf = gst_harness_pull (h);
+ /* verify that the buffer has the right timestamp */
+ fail_unless_equals_int64 (timestamp, GST_BUFFER_PTS (buf));
+
+ /* cleanup */
+ gst_buffer_unref (buf);
+ gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_stopping_element_unschedules_sync)
+{
+ /* the reason to use the queue in front of the clocksync element
+ is to effectively make gst_harness_push asynchronous, not locking
+ up the test, waiting for gst_clock_id_wait */
+ GstHarness *h = gst_harness_new_parse ("queue ! clocksync sync=true");
+ GstBuffer *buf;
+ GstClockTime timestamp = 123456789;
+
+ /* use testclock */
+ gst_harness_use_testclock (h);
+ gst_harness_set_src_caps_str (h, "mycaps");
+
+ /* make a buffer and set the timestamp */
+ buf = gst_buffer_new ();
+ GST_BUFFER_PTS (buf) = timestamp;
+
+ /* push the buffer, and verify it does *not* make it through */
+ gst_harness_push (h, buf);
+ fail_unless_equals_int (0, gst_harness_buffers_in_queue (h));
+
+ /* verify the clocksync element has registered exactly one GstClockID */
+ fail_unless (gst_harness_wait_for_clock_id_waits (h, 1, 42));
+
+ /* setting clocksync to READY should unschedule the sync */
+ gst_element_set_state (h->element, GST_STATE_READY);
+
+ /* verify the clocksync element no longer waits on the clock */
+ fail_unless (gst_harness_wait_for_clock_id_waits (h, 0, 42));
+
+ /* and that the waiting buffer was dropped */
+ fail_unless_equals_int (0, gst_harness_buffers_received (h));
+
+ gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+static Suite *
+clocksync_suite (void)
+{
+ Suite *s = suite_create ("clocksync");
+ TCase *tc_chain = tcase_create ("general");
+
+ suite_add_tcase (s, tc_chain);
+ tcase_add_test (tc_chain, test_one_buffer);
+ tcase_add_test (tc_chain, test_signal_handoffs);
+ tcase_add_test (tc_chain, test_sync_on_timestamp);
+ tcase_add_test (tc_chain, test_stopping_element_unschedules_sync);
+ tcase_add_test (tc_chain, test_no_sync_on_timestamp);
+
+
+ return s;
+}
+
+GST_CHECK_MAIN (clocksync);