basesrc: add buffer list support
authorTim-Philipp Müller <tim@centricular.com>
Wed, 30 Aug 2017 12:03:28 +0000 (13:03 +0100)
committerTim-Philipp Müller <tim@centricular.com>
Thu, 7 Dec 2017 12:17:09 +0000 (12:17 +0000)
Add a gst_base_src_submit_buffer_list() function that allows subclasses
to produce a bufferlist containing multiple buffers in the ::create()
function. The buffers in the buffer list will then also be pushed out
in one go as a GstBufferList. This can reduce push overhead
significantly for sources with packetised inputs (such as udpsrc)
in high-throughput scenarios.

The _submit_buffer_list() approach was chosen because it is fairly
straight-forward, backwards-compatible, bindings-friendly (as opposed
to e.g. making the create function return a mini object instead),
and it allows the subclass maximum control: the subclass can decide
dynamically at runtime whether to return a list or a single buffer
(which would be messier if we added a create_list virtual method).

https://bugzilla.gnome.org/show_bug.cgi?id=750241

docs/libs/gstreamer-libs-sections.txt
libs/gst/base/gstbasesrc.c
libs/gst/base/gstbasesrc.h
tests/check/libs/basesrc.c
win32/common/libgstbase.def

index 003210c..957f3a8 100644 (file)
@@ -335,6 +335,7 @@ gst_base_src_get_allocator
 gst_base_src_get_buffer_pool
 gst_base_src_is_async
 gst_base_src_set_async
+gst_base_src_submit_buffer_list
 
 GST_BASE_SRC_PAD
 GST_BASE_SRC_IS_STARTED
index 30411b0..da73c8c 100644 (file)
@@ -261,8 +261,14 @@ struct _GstBaseSrcPrivate
   GstAllocationParams params;   /* OBJECT_LOCK */
 
   GCond async_cond;             /* OBJECT_LOCK */
+
+  /* for _submit_buffer_list() */
+  GstBufferList *pending_bufferlist;
 };
 
+#define BASE_SRC_HAS_PENDING_BUFFER_LIST(src) \
+    ((src)->priv->pending_bufferlist != NULL)
+
 static GstElementClass *parent_class = NULL;
 
 static void gst_base_src_class_init (GstBaseSrcClass * klass);
@@ -2556,6 +2562,16 @@ again:
     res_buf = in_buf;
   }
 
+  if (res_buf == NULL) {
+    GstBufferList *pending_list = src->priv->pending_bufferlist;
+
+    if (pending_list == NULL || gst_buffer_list_length (pending_list) == 0)
+      goto null_buffer;
+
+    res_buf = gst_buffer_list_get_writable (pending_list, 0);
+    own_res_buf = FALSE;
+  }
+
   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
   if (offset == 0 && src->segment.time == 0
       && GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
@@ -2671,6 +2687,14 @@ eos:
     GST_DEBUG_OBJECT (src, "we are EOS");
     return GST_FLOW_EOS;
   }
+null_buffer:
+  {
+    GST_ELEMENT_ERROR (src, STREAM, FAILED,
+        (_("Internal data flow error.")),
+        ("Subclass %s neither returned a buffer nor submitted a buffer list "
+            "from its create function", G_OBJECT_TYPE_NAME (src)));
+    return GST_FLOW_ERROR;
+  }
 }
 
 static GstFlowReturn
@@ -2803,6 +2827,12 @@ gst_base_src_loop (GstPad * pad)
   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
       GST_TIME_ARGS (position), blocksize);
 
+  /* clean up just in case we got interrupted or so last time round */
+  if (src->priv->pending_bufferlist != NULL) {
+    gst_buffer_list_unref (src->priv->pending_bufferlist);
+    src->priv->pending_bufferlist = NULL;
+  }
+
   ret = gst_base_src_get_range (src, position, blocksize, &buf);
   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
@@ -2810,9 +2840,11 @@ gst_base_src_loop (GstPad * pad)
     GST_LIVE_UNLOCK (src);
     goto pause;
   }
-  /* this should not happen */
-  if (G_UNLIKELY (buf == NULL))
-    goto null_buffer;
+
+  /* Note: at this point buf might be a single buf returned which we own or
+   * the first buf of a pending buffer list submitted via submit_buffer_list(),
+   * in which case the buffer is owned by the pending buffer list and not us. */
+  g_assert (buf != NULL);
 
   /* push events to close/start our segment before we push the buffer. */
   if (G_UNLIKELY (src->priv->segment_pending)) {
@@ -2917,7 +2949,14 @@ gst_base_src_loop (GstPad * pad)
   }
   GST_LIVE_UNLOCK (src);
 
-  ret = gst_pad_push (pad, buf);
+  /* push buffer or buffer list */
+  if (src->priv->pending_bufferlist != NULL) {
+    ret = gst_pad_push_list (pad, src->priv->pending_bufferlist);
+    src->priv->pending_bufferlist = NULL;
+  } else {
+    ret = gst_pad_push (pad, buf);
+  }
+
   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
     if (ret == GST_FLOW_NOT_NEGOTIATED) {
       goto not_negotiated;
@@ -3018,13 +3057,6 @@ pause:
     }
     goto done;
   }
-null_buffer:
-  {
-    GST_ELEMENT_ERROR (src, STREAM, FAILED,
-        (_("Internal data flow error.")), ("element returned NULL buffer"));
-    GST_LIVE_UNLOCK (src);
-    goto done;
-  }
 }
 
 static gboolean
@@ -3619,6 +3651,11 @@ gst_base_src_stop (GstBaseSrc * basesrc)
   if (bclass->stop)
     result = bclass->stop (basesrc);
 
+  if (basesrc->priv->pending_bufferlist != NULL) {
+    gst_buffer_list_unref (basesrc->priv->pending_bufferlist);
+    basesrc->priv->pending_bufferlist = NULL;
+  }
+
   gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
 
   return result;
@@ -3959,3 +3996,40 @@ gst_base_src_get_allocator (GstBaseSrc * src,
     *params = src->priv->params;
   GST_OBJECT_UNLOCK (src);
 }
+
+/**
+ * gst_base_src_submit_buffer_list:
+ * @src: a #GstBaseSrc
+ * @buffer_list: (transfer full): a #GstBufferList
+ *
+ * Subclasses can call this from their create virtual method implementation
+ * to submit a buffer list to be pushed out later. This is useful in
+ * cases where the create function wants to produce multiple buffers to be
+ * pushed out in one go in form of a #GstBufferList, which can reduce overhead
+ * drastically, especially for packetised inputs (for data streams where
+ * the packetisation/chunking is not important it is usually more efficient
+ * to return larger buffers instead).
+ *
+ * Subclasses that use this function from their create function must return
+ * %GST_FLOW_OK and no buffer from their create virtual method implementation.
+ * If a buffer is returned after a buffer list has also been submitted via this
+ * function the behaviour is undefined.
+ *
+ * Subclasses must only call this function once per create function call and
+ * subclasses must only call this function when the source operates in push
+ * mode.
+ *
+ * Since: 1.14
+ */
+void
+gst_base_src_submit_buffer_list (GstBaseSrc * src, GstBufferList * buffer_list)
+{
+  g_return_if_fail (GST_IS_BASE_SRC (src));
+  g_return_if_fail (GST_IS_BUFFER_LIST (buffer_list));
+  g_return_if_fail (BASE_SRC_HAS_PENDING_BUFFER_LIST (src) == FALSE);
+
+  src->priv->pending_bufferlist = buffer_list;
+
+  GST_LOG_OBJECT (src, "%u buffers submitted in buffer list",
+      gst_buffer_list_length (buffer_list));
+}
index 757a471..f23799a 100644 (file)
@@ -299,6 +299,9 @@ void            gst_base_src_get_allocator    (GstBaseSrc *src,
                                                GstAllocator **allocator,
                                                GstAllocationParams *params);
 
+GST_EXPORT
+void            gst_base_src_submit_buffer_list (GstBaseSrc    * src,
+                                                 GstBufferList * buffer_list);
 
 #ifdef G_DEFINE_AUTOPTR_CLEANUP_FUNC
 G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstBaseSrc, gst_object_unref)
index 82c6589..ee360ce 100644 (file)
@@ -2,7 +2,7 @@
  *
  * some unit tests for GstBaseSrc
  *
- * Copyright (C) 2006 Tim-Philipp Müller <tim centricular net>
+ * Copyright (C) 2006-2017 Tim-Philipp Müller <tim centricular net>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
@@ -768,6 +768,142 @@ GST_START_TEST (basesrc_seek_on_last_buffer)
 
 GST_END_TEST;
 
+typedef GstBaseSrc TestSrc;
+typedef GstBaseSrcClass TestSrcClass;
+
+static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
+    GST_PAD_SRC,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS_ANY);
+
+static GType test_src_get_type (void);
+
+G_DEFINE_TYPE (TestSrc, test_src, GST_TYPE_BASE_SRC);
+
+static void
+test_src_init (TestSrc * src)
+{
+}
+
+static GstFlowReturn
+test_src_create (GstBaseSrc * src, guint64 offset, guint size,
+    GstBuffer ** p_buf)
+{
+  GstBuffer *buf;
+  static int num = 0;
+
+  fail_if (*p_buf != NULL);
+
+  buf = gst_buffer_new ();
+  GST_BUFFER_OFFSET (buf) = num++;
+
+  if (num == 1 || g_random_boolean ()) {
+    GstBufferList *buflist = gst_buffer_list_new ();
+
+    gst_buffer_list_add (buflist, buf);
+
+    buf = gst_buffer_new ();
+    GST_BUFFER_OFFSET (buf) = num++;
+    gst_buffer_list_add (buflist, buf);
+    gst_base_src_submit_buffer_list (src, buflist);
+  } else {
+    *p_buf = buf;
+  }
+
+  return GST_FLOW_OK;
+}
+
+static void
+test_src_class_init (TestSrcClass * klass)
+{
+  GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
+
+  gst_element_class_add_static_pad_template (GST_ELEMENT_CLASS (klass),
+      &src_template);
+
+  gstbasesrc_class->create = test_src_create;
+}
+
+static GstPad *mysinkpad;
+
+static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
+    GST_PAD_SINK,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS_ANY);
+
+#define NUM_BUFFERS 100
+static gboolean done;
+static guint expect_offset;
+
+static GstFlowReturn
+chain_____func (GstPad * pad, GstObject * parent, GstBuffer * buf)
+{
+  GST_LOG ("  buffer # %3u", (guint) GST_BUFFER_OFFSET (buf));
+
+  fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset);
+  ++expect_offset;
+
+  if (GST_BUFFER_OFFSET (buf) > NUM_BUFFERS) {
+    g_mutex_lock (&check_mutex);
+    done = TRUE;
+    g_cond_signal (&check_cond);
+    g_mutex_unlock (&check_mutex);
+  }
+  gst_buffer_unref (buf);
+
+  return GST_FLOW_OK;
+}
+
+static GstFlowReturn
+chainlist_func (GstPad * pad, GstObject * parent, GstBufferList * list)
+{
+  guint i, len;
+
+  len = gst_buffer_list_length (list);
+
+  GST_DEBUG ("buffer list with %u buffers", len);
+  for (i = 0; i < len; ++i) {
+    GstBuffer *buf = gst_buffer_list_get (list, i);
+    GST_LOG ("  buffer # %3u", (guint) GST_BUFFER_OFFSET (buf));
+
+    fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset);
+    ++expect_offset;
+  }
+
+  gst_buffer_list_unref (list);
+  return GST_FLOW_OK;
+}
+
+GST_START_TEST (basesrc_create_bufferlist)
+{
+  GstElement *src;
+
+  src = g_object_new (test_src_get_type (), NULL);
+
+  mysinkpad = gst_check_setup_sink_pad (src, &sinktemplate);
+  gst_pad_set_chain_function (mysinkpad, chain_____func);
+  gst_pad_set_chain_list_function (mysinkpad, chainlist_func);
+  gst_pad_set_active (mysinkpad, TRUE);
+
+  done = FALSE;
+  expect_offset = 0;
+
+  gst_element_set_state (src, GST_STATE_PLAYING);
+
+  g_mutex_lock (&check_mutex);
+  while (!done)
+    g_cond_wait (&check_cond, &check_mutex);
+  g_mutex_unlock (&check_mutex);
+
+  gst_element_set_state (src, GST_STATE_NULL);
+
+  gst_check_teardown_sink_pad (src);
+
+  gst_object_unref (src);
+}
+
+GST_END_TEST;
+
 static Suite *
 gst_basesrc_suite (void)
 {
@@ -783,6 +919,7 @@ gst_basesrc_suite (void)
   tcase_add_test (tc, basesrc_eos_events_pull_live_eos);
   tcase_add_test (tc, basesrc_seek_events_rate_update);
   tcase_add_test (tc, basesrc_seek_on_last_buffer);
+  tcase_add_test (tc, basesrc_create_bufferlist);
 
   return s;
 }
index a65c611..ff2b844 100644 (file)
@@ -114,6 +114,7 @@ EXPORTS
        gst_base_src_set_live
        gst_base_src_start_complete
        gst_base_src_start_wait
+       gst_base_src_submit_buffer_list
        gst_base_src_wait_playing
        gst_base_transform_get_allocator
        gst_base_transform_get_buffer_pool