appsink: add support for buffer lists
authorPatricia Muscalu <patricia@axis.com>
Mon, 4 Jul 2016 07:32:28 +0000 (09:32 +0200)
committerJan Schmidt <jan@centricular.com>
Tue, 15 Nov 2016 15:06:40 +0000 (02:06 +1100)
https://bugzilla.gnome.org/show_bug.cgi?id=752363

gst-libs/gst/app/gstappsink.c
gst-libs/gst/app/gstappsink.h
tests/check/elements/appsink.c
win32/common/libgstapp.def

index 7b8d481..46ab582 100644 (file)
@@ -68,6 +68,7 @@
 #include <gst/gst.h>
 #include <gst/base/gstbasesink.h>
 #include <gst/gstbuffer.h>
+#include <gst/gstbufferlist.h>
 
 #include <string.h>
 
@@ -94,6 +95,7 @@ struct _GstAppSinkPrivate
   gboolean unlock;
   gboolean started;
   gboolean is_eos;
+  gboolean buffer_lists_supported;
 
   GstAppSinkCallbacks callbacks;
   gpointer user_data;
@@ -124,6 +126,7 @@ enum
 #define DEFAULT_PROP_MAX_BUFFERS       0
 #define DEFAULT_PROP_DROP              FALSE
 #define DEFAULT_PROP_WAIT_ON_EOS       TRUE
+#define DEFAULT_PROP_BUFFER_LIST       FALSE
 
 enum
 {
@@ -134,6 +137,7 @@ enum
   PROP_MAX_BUFFERS,
   PROP_DROP,
   PROP_WAIT_ON_EOS,
+  PROP_BUFFER_LIST,
   PROP_LAST
 };
 
@@ -162,8 +166,12 @@ static gboolean gst_app_sink_event (GstBaseSink * sink, GstEvent * event);
 static gboolean gst_app_sink_query (GstBaseSink * bsink, GstQuery * query);
 static GstFlowReturn gst_app_sink_preroll (GstBaseSink * psink,
     GstBuffer * buffer);
+static GstFlowReturn gst_app_sink_render_common (GstBaseSink * psink,
+    GstMiniObject * data, gboolean is_list);
 static GstFlowReturn gst_app_sink_render (GstBaseSink * psink,
     GstBuffer * buffer);
+static GstFlowReturn gst_app_sink_render_list (GstBaseSink * psink,
+    GstBufferList * list);
 static gboolean gst_app_sink_setcaps (GstBaseSink * sink, GstCaps * caps);
 static GstCaps *gst_app_sink_getcaps (GstBaseSink * psink, GstCaps * filter);
 
@@ -216,6 +224,10 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
           "Drop old buffers when the buffer queue is filled", DEFAULT_PROP_DROP,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_BUFFER_LIST,
+      g_param_spec_boolean ("buffer-list", "Buffer List",
+          "Use buffer lists", DEFAULT_PROP_BUFFER_LIST,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   /**
    * GstAppSink::wait-on-eos:
    *
@@ -413,6 +425,7 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
   basesink_class->event = gst_app_sink_event;
   basesink_class->preroll = gst_app_sink_preroll;
   basesink_class->render = gst_app_sink_render;
+  basesink_class->render_list = gst_app_sink_render_list;
   basesink_class->get_caps = gst_app_sink_getcaps;
   basesink_class->set_caps = gst_app_sink_setcaps;
   basesink_class->query = gst_app_sink_query;
@@ -442,6 +455,7 @@ gst_app_sink_init (GstAppSink * appsink)
   priv->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
   priv->drop = DEFAULT_PROP_DROP;
   priv->wait_on_eos = DEFAULT_PROP_WAIT_ON_EOS;
+  priv->buffer_lists_supported = DEFAULT_PROP_BUFFER_LIST;
 }
 
 static void
@@ -507,6 +521,10 @@ gst_app_sink_set_property (GObject * object, guint prop_id,
     case PROP_DROP:
       gst_app_sink_set_drop (appsink, g_value_get_boolean (value));
       break;
+    case PROP_BUFFER_LIST:
+      gst_app_sink_set_buffer_list_support (appsink,
+          g_value_get_boolean (value));
+      break;
     case PROP_WAIT_ON_EOS:
       gst_app_sink_set_wait_on_eos (appsink, g_value_get_boolean (value));
       break;
@@ -545,6 +563,10 @@ gst_app_sink_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_DROP:
       g_value_set_boolean (value, gst_app_sink_get_drop (appsink));
       break;
+    case PROP_BUFFER_LIST:
+      g_value_set_boolean (value,
+          gst_app_sink_get_buffer_list_support (appsink));
+      break;
     case PROP_WAIT_ON_EOS:
       g_value_set_boolean (value, gst_app_sink_get_wait_on_eos (appsink));
       break;
@@ -753,20 +775,17 @@ flushing:
   }
 }
 
-static GstBuffer *
+static GstMiniObject *
 dequeue_buffer (GstAppSink * appsink)
 {
   GstAppSinkPrivate *priv = appsink->priv;
-  GstBuffer *buffer;
+  GstMiniObject *obj;
 
   do {
-    GstMiniObject *obj;
-
     obj = g_queue_pop_head (priv->queue);
 
-    if (GST_IS_BUFFER (obj)) {
-      buffer = GST_BUFFER_CAST (obj);
-      GST_DEBUG_OBJECT (appsink, "dequeued buffer %p", buffer);
+    if (GST_IS_BUFFER (obj) || GST_IS_BUFFER_LIST (obj)) {
+      GST_DEBUG_OBJECT (appsink, "dequeued buffer/list %p", obj);
       priv->num_buffers--;
       break;
     } else if (GST_IS_EVENT (obj)) {
@@ -794,11 +813,12 @@ dequeue_buffer (GstAppSink * appsink)
     }
   } while (TRUE);
 
-  return buffer;
+  return obj;
 }
 
 static GstFlowReturn
-gst_app_sink_render (GstBaseSink * psink, GstBuffer * buffer)
+gst_app_sink_render_common (GstBaseSink * psink, GstMiniObject * data,
+    gboolean is_list)
 {
   GstFlowReturn ret;
   GstAppSink *appsink = GST_APP_SINK_CAST (psink);
@@ -819,17 +839,17 @@ restart:
         priv->last_caps);
   }
 
-  GST_DEBUG_OBJECT (appsink, "pushing render buffer %p on queue (%d)",
-      buffer, priv->num_buffers);
+  GST_DEBUG_OBJECT (appsink, "pushing render buffer/list %p on queue (%d)",
+      data, priv->num_buffers);
 
   while (priv->max_buffers > 0 && priv->num_buffers >= priv->max_buffers) {
     if (priv->drop) {
-      GstBuffer *old;
+      GstMiniObject *old;
 
-      /* we need to drop the oldest buffer and try again */
+      /* we need to drop the oldest buffer/list and try again */
       if ((old = dequeue_buffer (appsink))) {
-        GST_DEBUG_OBJECT (appsink, "dropping old buffer %p", old);
-        gst_buffer_unref (old);
+        GST_DEBUG_OBJECT (appsink, "dropping old buffer/list %p", old);
+        gst_mini_object_unref (old);
       }
     } else {
       GST_DEBUG_OBJECT (appsink, "waiting for free space, length %d >= %d",
@@ -851,8 +871,8 @@ restart:
         goto flushing;
     }
   }
-  /* we need to ref the buffer when pushing it in the queue */
-  g_queue_push_tail (priv->queue, gst_buffer_ref (buffer));
+  /* we need to ref the buffer/list when pushing it in the queue */
+  g_queue_push_tail (priv->queue, gst_mini_object_ref (data));
   priv->num_buffers++;
   g_cond_signal (&priv->cond);
   emit = priv->emit_signals;
@@ -880,6 +900,43 @@ stopping:
   }
 }
 
+static GstFlowReturn
+gst_app_sink_render (GstBaseSink * psink, GstBuffer * buffer)
+{
+  return gst_app_sink_render_common (psink, GST_MINI_OBJECT_CAST (buffer),
+      FALSE);
+}
+
+static GstFlowReturn
+gst_app_sink_render_list (GstBaseSink * sink, GstBufferList * list)
+{
+  GstFlowReturn flow;
+  GstAppSink *appsink;
+  GstBuffer *buffer;
+  guint i, len;
+
+  appsink = GST_APP_SINK_CAST (sink);
+
+  if (appsink->priv->buffer_lists_supported)
+    return gst_app_sink_render_common (sink, GST_MINI_OBJECT_CAST (list), TRUE);
+
+  /* The application doesn't support buffer lists, extract individual buffers
+   * then and push them one-by-one */
+  GST_INFO_OBJECT (sink, "chaining each group in list as a merged buffer");
+
+  len = gst_buffer_list_length (list);
+
+  flow = GST_FLOW_OK;
+  for (i = 0; i < len; i++) {
+    buffer = gst_buffer_list_get (list, i);
+    flow = gst_app_sink_render (sink, buffer);
+    if (flow != GST_FLOW_OK)
+      break;
+  }
+
+  return flow;
+}
+
 static GstCaps *
 gst_app_sink_getcaps (GstBaseSink * psink, GstCaps * filter)
 {
@@ -1186,6 +1243,57 @@ gst_app_sink_get_drop (GstAppSink * appsink)
 }
 
 /**
+ * gst_app_sink_set_buffer_list_support:
+ * @appsink: a #GstAppSink
+ * @buffer_list: enable or disable buffer list support
+ *
+ * Instruct @appsink to enable or disable buffer list support.
+ *
+ */
+void
+gst_app_sink_set_buffer_list_support (GstAppSink * appsink,
+    gboolean buffer_list)
+{
+  GstAppSinkPrivate *priv;
+
+  g_return_if_fail (GST_IS_APP_SINK (appsink));
+
+  priv = appsink->priv;
+
+  g_mutex_lock (&priv->mutex);
+  if (priv->buffer_lists_supported != buffer_list) {
+    priv->buffer_lists_supported = buffer_list;
+  }
+  g_mutex_unlock (&priv->mutex);
+}
+
+/**
+ * gst_app_sink_get_buffer_list_support:
+ * @appsink: a #GstAppSink
+ *
+ * Check if @appsink supports buffer lists.
+ *
+ * Returns: %TRUE if @appsink supports buffer lists.
+ *
+ */
+gboolean
+gst_app_sink_get_buffer_list_support (GstAppSink * appsink)
+{
+  gboolean result;
+  GstAppSinkPrivate *priv;
+
+  g_return_val_if_fail (GST_IS_APP_SINK (appsink), FALSE);
+
+  priv = appsink->priv;
+
+  g_mutex_lock (&priv->mutex);
+  result = priv->buffer_lists_supported;
+  g_mutex_unlock (&priv->mutex);
+
+  return result;
+}
+
+/**
  * gst_app_sink_set_wait_on_eos:
  * @appsink: a #GstAppSink
  * @wait: the new state
@@ -1418,7 +1526,7 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout)
 {
   GstAppSinkPrivate *priv;
   GstSample *sample = NULL;
-  GstBuffer *buffer;
+  GstMiniObject *obj;
   gboolean timeout_valid;
   gint64 end_time;
 
@@ -1454,10 +1562,18 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout)
       g_cond_wait (&priv->cond, &priv->mutex);
     }
   }
-  buffer = dequeue_buffer (appsink);
-  GST_DEBUG_OBJECT (appsink, "we have a buffer %p", buffer);
-  sample = gst_sample_new (buffer, priv->last_caps, &priv->last_segment, NULL);
-  gst_buffer_unref (buffer);
+
+  obj = dequeue_buffer (appsink);
+  if (GST_IS_BUFFER (obj)) {
+    GST_DEBUG_OBJECT (appsink, "we have a buffer %p", obj);
+    sample = gst_sample_new (GST_BUFFER_CAST (obj), priv->last_caps,
+        &priv->last_segment, NULL);
+  } else {
+    GST_DEBUG_OBJECT (appsink, "we have a list %p", obj);
+    sample = gst_sample_new (NULL, priv->last_caps, &priv->last_segment, NULL);
+    gst_sample_set_buffer_list (sample, GST_BUFFER_LIST_CAST (obj));
+  }
+  gst_mini_object_unref (obj);
 
   g_cond_signal (&priv->cond);
   g_mutex_unlock (&priv->mutex);
index 8217d26..bd26399 100644 (file)
@@ -117,6 +117,9 @@ guint           gst_app_sink_get_max_buffers  (GstAppSink *appsink);
 void            gst_app_sink_set_drop         (GstAppSink *appsink, gboolean drop);
 gboolean        gst_app_sink_get_drop         (GstAppSink *appsink);
 
+void            gst_app_sink_set_buffer_list_support  (GstAppSink *appsink, gboolean drop);
+gboolean        gst_app_sink_get_buffer_list_support  (GstAppSink *appsink);
+
 void            gst_app_sink_set_wait_on_eos  (GstAppSink *appsink, gboolean wait);
 gboolean        gst_app_sink_get_wait_on_eos  (GstAppSink *appsink);
 
index aabeb00..a5b2c82 100644 (file)
@@ -224,7 +224,7 @@ create_buffer_list (void)
 }
 
 static GstFlowReturn
-callback_function_sample (GstAppSink * appsink, gpointer p_counter)
+callback_function_sample_fallback (GstAppSink * appsink, gpointer p_counter)
 {
   GstSample *sample;
   GstBuffer *buf;
@@ -260,16 +260,50 @@ callback_function_sample (GstAppSink * appsink, gpointer p_counter)
   return GST_FLOW_OK;
 }
 
+static GstFlowReturn
+callback_function_sample (GstAppSink * appsink, gpointer p_counter)
+{
+  GstSample *sample;
+  GstBufferList *list;
+  gint *p_int_counter = p_counter;
+  guint len;
+  gint i;
+
+  sample = gst_app_sink_pull_sample (appsink);
+  list = gst_sample_get_buffer_list (sample);
+  fail_unless (GST_IS_BUFFER_LIST (list));
+  len = gst_buffer_list_length (list);
+  fail_unless_equals_int (len, 3);
+
+  for (i = 0; i < len; i++) {
+    GstBuffer *buf = gst_buffer_list_get (list, i);
+    fail_unless_equals_int (gst_buffer_get_size (buf), sizeof (gint));
+    gst_check_buffer_data (buf, &values[i], sizeof (gint));
+  }
+
+  gst_sample_unref (sample);
+
+  *p_int_counter += 1;
+
+  return GST_FLOW_OK;
+}
+
 GST_START_TEST (test_buffer_list_fallback)
 {
   GstElement *sink;
   GstBufferList *list;
   GstAppSinkCallbacks callbacks = { NULL };
   gint counter = 0;
+  gboolean buffer_list_support;
 
   sink = setup_appsink ();
 
-  callbacks.new_sample = callback_function_sample;
+  /* verify that the buffer list support is disabled per default */
+  g_object_get (sink, "buffer-list", &buffer_list_support, NULL);
+  fail_unless (buffer_list_support == FALSE);
+
+
+  callbacks.new_sample = callback_function_sample_fallback;
 
   gst_app_sink_set_callbacks (GST_APP_SINK (sink), &callbacks, &counter, NULL);
 
@@ -286,6 +320,35 @@ GST_START_TEST (test_buffer_list_fallback)
 
 GST_END_TEST;
 
+GST_START_TEST (test_buffer_list_support)
+{
+  GstElement *sink;
+  GstBufferList *list;
+  GstAppSinkCallbacks callbacks = { NULL };
+  gint counter = 0;
+
+  sink = setup_appsink ();
+
+  /* enable buffer list support */
+  g_object_set (sink, "buffer-list", TRUE, NULL);
+
+  callbacks.new_sample = callback_function_sample;
+
+  gst_app_sink_set_callbacks (GST_APP_SINK (sink), &callbacks, &counter, NULL);
+
+  ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
+
+  list = create_buffer_list ();
+  fail_unless (gst_pad_push_list (mysrcpad, list) == GST_FLOW_OK);
+
+  fail_unless_equals_int (counter, 1);
+
+  ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+  cleanup_appsink (sink);
+}
+
+GST_END_TEST;
+
 GST_START_TEST (test_buffer_list_fallback_signal)
 {
   GstElement *sink;
@@ -295,6 +358,36 @@ GST_START_TEST (test_buffer_list_fallback_signal)
   sink = setup_appsink ();
 
   /* C calling convention to the rescue.. */
+  g_signal_connect (sink, "new-sample",
+      G_CALLBACK (callback_function_sample_fallback), &counter);
+
+  g_object_set (sink, "emit-signals", TRUE, NULL);
+
+  ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
+
+  list = create_buffer_list ();
+  fail_unless (gst_pad_push_list (mysrcpad, list) == GST_FLOW_OK);
+
+  fail_unless_equals_int (counter, 3);
+
+  ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+  cleanup_appsink (sink);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_buffer_list_signal)
+{
+  GstElement *sink;
+  GstBufferList *list;
+  gint counter = 0;
+
+  sink = setup_appsink ();
+
+  /* enable buffer list support */
+  g_object_set (sink, "buffer-list", TRUE, NULL);
+
+  /* C calling convention to the rescue.. */
   g_signal_connect (sink, "new-sample", G_CALLBACK (callback_function_sample),
       &counter);
 
@@ -305,7 +398,7 @@ GST_START_TEST (test_buffer_list_fallback_signal)
   list = create_buffer_list ();
   fail_unless (gst_pad_push_list (mysrcpad, list) == GST_FLOW_OK);
 
-  fail_unless_equals_int (counter, 3);
+  fail_unless_equals_int (counter, 1);
 
   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
   cleanup_appsink (sink);
@@ -422,7 +515,9 @@ appsink_suite (void)
   tcase_add_test (tc_chain, test_notify0);
   tcase_add_test (tc_chain, test_notify1);
   tcase_add_test (tc_chain, test_buffer_list_fallback);
+  tcase_add_test (tc_chain, test_buffer_list_support);
   tcase_add_test (tc_chain, test_buffer_list_fallback_signal);
+  tcase_add_test (tc_chain, test_buffer_list_signal);
   tcase_add_test (tc_chain, test_segment);
   tcase_add_test (tc_chain, test_pull_with_timeout);
 
index 1a7184e..e954c23 100644 (file)
@@ -1,4 +1,5 @@
 EXPORTS
+       gst_app_sink_get_buffer_list_support
        gst_app_sink_get_caps
        gst_app_sink_get_drop
        gst_app_sink_get_emit_signals
@@ -8,6 +9,7 @@ EXPORTS
        gst_app_sink_is_eos
        gst_app_sink_pull_preroll
        gst_app_sink_pull_sample
+       gst_app_sink_set_buffer_list_support
        gst_app_sink_set_callbacks
        gst_app_sink_set_caps
        gst_app_sink_set_drop