GstAllocationParams params; /* OBJECT_LOCK */
GCond async_cond; /* OBJECT_LOCK */
+
+ /* for _submit_buffer_list() */
+ GstBufferList *pending_bufferlist;
};
+#define BASE_SRC_HAS_PENDING_BUFFER_LIST(src) \
+ ((src)->priv->pending_bufferlist != NULL)
+
static GstElementClass *parent_class = NULL;
static void gst_base_src_class_init (GstBaseSrcClass * klass);
res_buf = in_buf;
}
+ if (res_buf == NULL) {
+ GstBufferList *pending_list = src->priv->pending_bufferlist;
+
+ if (pending_list == NULL || gst_buffer_list_length (pending_list) == 0)
+ goto null_buffer;
+
+ res_buf = gst_buffer_list_get_writable (pending_list, 0);
+ own_res_buf = FALSE;
+ }
+
/* no timestamp set and we are at offset 0, we can timestamp with 0 */
if (offset == 0 && src->segment.time == 0
&& GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
GST_DEBUG_OBJECT (src, "we are EOS");
return GST_FLOW_EOS;
}
+null_buffer:
+ {
+ GST_ELEMENT_ERROR (src, STREAM, FAILED,
+ (_("Internal data flow error.")),
+ ("Subclass %s neither returned a buffer nor submitted a buffer list "
+ "from its create function", G_OBJECT_TYPE_NAME (src)));
+ return GST_FLOW_ERROR;
+ }
}
static GstFlowReturn
GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
GST_TIME_ARGS (position), blocksize);
+ /* clean up just in case we got interrupted or so last time round */
+ if (src->priv->pending_bufferlist != NULL) {
+ gst_buffer_list_unref (src->priv->pending_bufferlist);
+ src->priv->pending_bufferlist = NULL;
+ }
+
ret = gst_base_src_get_range (src, position, blocksize, &buf);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
GST_LIVE_UNLOCK (src);
goto pause;
}
- /* this should not happen */
- if (G_UNLIKELY (buf == NULL))
- goto null_buffer;
+
+ /* Note: at this point buf might be a single buf returned which we own or
+ * the first buf of a pending buffer list submitted via submit_buffer_list(),
+ * in which case the buffer is owned by the pending buffer list and not us. */
+ g_assert (buf != NULL);
/* push events to close/start our segment before we push the buffer. */
if (G_UNLIKELY (src->priv->segment_pending)) {
}
GST_LIVE_UNLOCK (src);
- ret = gst_pad_push (pad, buf);
+ /* push buffer or buffer list */
+ if (src->priv->pending_bufferlist != NULL) {
+ ret = gst_pad_push_list (pad, src->priv->pending_bufferlist);
+ src->priv->pending_bufferlist = NULL;
+ } else {
+ ret = gst_pad_push (pad, buf);
+ }
+
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
if (ret == GST_FLOW_NOT_NEGOTIATED) {
goto not_negotiated;
}
goto done;
}
-null_buffer:
- {
- GST_ELEMENT_ERROR (src, STREAM, FAILED,
- (_("Internal data flow error.")), ("element returned NULL buffer"));
- GST_LIVE_UNLOCK (src);
- goto done;
- }
}
static gboolean
if (bclass->stop)
result = bclass->stop (basesrc);
+ if (basesrc->priv->pending_bufferlist != NULL) {
+ gst_buffer_list_unref (basesrc->priv->pending_bufferlist);
+ basesrc->priv->pending_bufferlist = NULL;
+ }
+
gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
return result;
*params = src->priv->params;
GST_OBJECT_UNLOCK (src);
}
+
+/**
+ * gst_base_src_submit_buffer_list:
+ * @src: a #GstBaseSrc
+ * @buffer_list: (transfer full): a #GstBufferList
+ *
+ * Subclasses can call this from their create virtual method implementation
+ * to submit a buffer list to be pushed out later. This is useful in
+ * cases where the create function wants to produce multiple buffers to be
+ * pushed out in one go in form of a #GstBufferList, which can reduce overhead
+ * drastically, especially for packetised inputs (for data streams where
+ * the packetisation/chunking is not important it is usually more efficient
+ * to return larger buffers instead).
+ *
+ * Subclasses that use this function from their create function must return
+ * %GST_FLOW_OK and no buffer from their create virtual method implementation.
+ * If a buffer is returned after a buffer list has also been submitted via this
+ * function the behaviour is undefined.
+ *
+ * Subclasses must only call this function once per create function call and
+ * subclasses must only call this function when the source operates in push
+ * mode.
+ *
+ * Since: 1.14
+ */
+void
+gst_base_src_submit_buffer_list (GstBaseSrc * src, GstBufferList * buffer_list)
+{
+ g_return_if_fail (GST_IS_BASE_SRC (src));
+ g_return_if_fail (GST_IS_BUFFER_LIST (buffer_list));
+ g_return_if_fail (BASE_SRC_HAS_PENDING_BUFFER_LIST (src) == FALSE);
+
+ src->priv->pending_bufferlist = buffer_list;
+
+ GST_LOG_OBJECT (src, "%u buffers submitted in buffer list",
+ gst_buffer_list_length (buffer_list));
+}
*
* some unit tests for GstBaseSrc
*
- * Copyright (C) 2006 Tim-Philipp Müller <tim centricular net>
+ * Copyright (C) 2006-2017 Tim-Philipp Müller <tim centricular net>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
GST_END_TEST;
+typedef GstBaseSrc TestSrc;
+typedef GstBaseSrcClass TestSrcClass;
+
+static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
+static GType test_src_get_type (void);
+
+G_DEFINE_TYPE (TestSrc, test_src, GST_TYPE_BASE_SRC);
+
+static void
+test_src_init (TestSrc * src)
+{
+}
+
+static GstFlowReturn
+test_src_create (GstBaseSrc * src, guint64 offset, guint size,
+ GstBuffer ** p_buf)
+{
+ GstBuffer *buf;
+ static int num = 0;
+
+ fail_if (*p_buf != NULL);
+
+ buf = gst_buffer_new ();
+ GST_BUFFER_OFFSET (buf) = num++;
+
+ if (num == 1 || g_random_boolean ()) {
+ GstBufferList *buflist = gst_buffer_list_new ();
+
+ gst_buffer_list_add (buflist, buf);
+
+ buf = gst_buffer_new ();
+ GST_BUFFER_OFFSET (buf) = num++;
+ gst_buffer_list_add (buflist, buf);
+ gst_base_src_submit_buffer_list (src, buflist);
+ } else {
+ *p_buf = buf;
+ }
+
+ return GST_FLOW_OK;
+}
+
+static void
+test_src_class_init (TestSrcClass * klass)
+{
+ GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
+
+ gst_element_class_add_static_pad_template (GST_ELEMENT_CLASS (klass),
+ &src_template);
+
+ gstbasesrc_class->create = test_src_create;
+}
+
+static GstPad *mysinkpad;
+
+static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
+#define NUM_BUFFERS 100
+static gboolean done;
+static guint expect_offset;
+
+static GstFlowReturn
+chain_____func (GstPad * pad, GstObject * parent, GstBuffer * buf)
+{
+ GST_LOG (" buffer # %3u", (guint) GST_BUFFER_OFFSET (buf));
+
+ fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset);
+ ++expect_offset;
+
+ if (GST_BUFFER_OFFSET (buf) > NUM_BUFFERS) {
+ g_mutex_lock (&check_mutex);
+ done = TRUE;
+ g_cond_signal (&check_cond);
+ g_mutex_unlock (&check_mutex);
+ }
+ gst_buffer_unref (buf);
+
+ return GST_FLOW_OK;
+}
+
+static GstFlowReturn
+chainlist_func (GstPad * pad, GstObject * parent, GstBufferList * list)
+{
+ guint i, len;
+
+ len = gst_buffer_list_length (list);
+
+ GST_DEBUG ("buffer list with %u buffers", len);
+ for (i = 0; i < len; ++i) {
+ GstBuffer *buf = gst_buffer_list_get (list, i);
+ GST_LOG (" buffer # %3u", (guint) GST_BUFFER_OFFSET (buf));
+
+ fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset);
+ ++expect_offset;
+ }
+
+ gst_buffer_list_unref (list);
+ return GST_FLOW_OK;
+}
+
+GST_START_TEST (basesrc_create_bufferlist)
+{
+ GstElement *src;
+
+ src = g_object_new (test_src_get_type (), NULL);
+
+ mysinkpad = gst_check_setup_sink_pad (src, &sinktemplate);
+ gst_pad_set_chain_function (mysinkpad, chain_____func);
+ gst_pad_set_chain_list_function (mysinkpad, chainlist_func);
+ gst_pad_set_active (mysinkpad, TRUE);
+
+ done = FALSE;
+ expect_offset = 0;
+
+ gst_element_set_state (src, GST_STATE_PLAYING);
+
+ g_mutex_lock (&check_mutex);
+ while (!done)
+ g_cond_wait (&check_cond, &check_mutex);
+ g_mutex_unlock (&check_mutex);
+
+ gst_element_set_state (src, GST_STATE_NULL);
+
+ gst_check_teardown_sink_pad (src);
+
+ gst_object_unref (src);
+}
+
+GST_END_TEST;
+
static Suite *
gst_basesrc_suite (void)
{
tcase_add_test (tc, basesrc_eos_events_pull_live_eos);
tcase_add_test (tc, basesrc_seek_events_rate_update);
tcase_add_test (tc, basesrc_seek_on_last_buffer);
+ tcase_add_test (tc, basesrc_create_bufferlist);
return s;
}