#include <gst/glib-compat-private.h>
#include "gstbasesrc.h"
-#include "gsttypefindhelper.h"
#include <gst/gst-i18n-lib.h>
GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
#define DEFAULT_BLOCKSIZE 4096
#define DEFAULT_NUM_BUFFERS -1
-#define DEFAULT_TYPEFIND FALSE
#define DEFAULT_DO_TIMESTAMP FALSE
enum
PROP_0,
PROP_BLOCKSIZE,
PROP_NUM_BUFFERS,
+#ifndef GST_REMOVE_DEPRECATED
PROP_TYPEFIND,
+#endif
PROP_DO_TIMESTAMP
};
-#define GST_BASE_SRC_GET_PRIVATE(obj) \
- (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
-
/* The basesrc implementation need to respect the following locking order:
* 1. STREAM_LOCK
* 2. LIVE_LOCK
volatile gint have_events; /* OBJECT_LOCK */
/* QoS *//* with LOCK */
- gboolean qos_enabled; /* unused */
gdouble proportion; /* OBJECT_LOCK */
GstClockTime earliest_time; /* OBJECT_LOCK */
GstAllocationParams params; /* OBJECT_LOCK */
GCond async_cond; /* OBJECT_LOCK */
+
+ /* for _submit_buffer_list() */
+ GstBufferList *pending_bufferlist;
};
+#define BASE_SRC_HAS_PENDING_BUFFER_LIST(src) \
+ ((src)->priv->pending_bufferlist != NULL)
+
static GstElementClass *parent_class = NULL;
+static gint private_offset = 0;
static void gst_base_src_class_init (GstBaseSrcClass * klass);
static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
_type = g_type_register_static (GST_TYPE_ELEMENT,
"GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
+
+ private_offset =
+ g_type_add_instance_private (_type, sizeof (GstBaseSrcPrivate));
+
g_once_init_leave (&base_src_type, _type);
}
return base_src_type;
}
+static inline GstBaseSrcPrivate *
+gst_base_src_get_instance_private (GstBaseSrc * self)
+{
+ return (G_STRUCT_MEMBER_P (self, private_offset));
+}
+
static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
GstCaps * filter);
static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
GstQuery * query);
-static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
- gboolean active);
+static void gst_base_src_set_pool_flushing (GstBaseSrc * basesrc,
+ gboolean flushing);
static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
GstSegment * segment);
gobject_class = G_OBJECT_CLASS (klass);
gstelement_class = GST_ELEMENT_CLASS (klass);
- GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
+ if (private_offset != 0)
+ g_type_class_adjust_private_offset (klass, &private_offset);
- g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
+ GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
parent_class = g_type_class_peek_parent (klass);
"Number of buffers to output before sending EOS (-1 = unlimited)",
-1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS));
+#ifndef GST_REMOVE_DEPRECATED
g_object_class_install_property (gobject_class, PROP_TYPEFIND,
g_param_spec_boolean ("typefind", "Typefind",
- "Run typefind before negotiating", DEFAULT_TYPEFIND,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ "Run typefind before negotiating (deprecated, non-functional)", FALSE,
+ G_PARAM_READWRITE | G_PARAM_DEPRECATED | G_PARAM_STATIC_STRINGS));
+#endif
g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
g_param_spec_boolean ("do-timestamp", "Do timestamp",
"Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
GstPad *pad;
GstPadTemplate *pad_template;
- basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
+ basesrc->priv = gst_base_src_get_instance_private (basesrc);
basesrc->is_live = FALSE;
g_mutex_init (&basesrc->live_lock);
basesrc->clock_id = NULL;
/* we operate in BYTES by default */
gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
- basesrc->typefind = DEFAULT_TYPEFIND;
basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
g_atomic_int_set (&basesrc->priv->have_events, FALSE);
* that can't return an authoritative size and only know that they're EOS
* when trying to read more should set this to %FALSE.
*
+ * When @src operates in %GST_FORMAT_TIME, #GstBaseSrc will send an EOS
+ * when a buffer outside of the currently configured segment is pushed if
+ * @automatic_eos is %TRUE. Since 1.16, if @automatic_eos is %FALSE an
+ * EOS will be pushed only when the #GstBaseSrc.create implementation
+ * returns %GST_FLOW_EOS.
+ *
* Since: 1.4
*/
void
src->priv->pending_eos = event;
GST_OBJECT_UNLOCK (src);
- gst_base_src_activate_pool (src, FALSE);
+ gst_base_src_set_pool_flushing (src, TRUE);
if (bclass->unlock)
bclass->unlock (src);
GST_PAD_STREAM_LOCK (src->srcpad);
if (bclass->unlock_stop)
bclass->unlock_stop (src);
+ gst_base_src_set_pool_flushing (src, TRUE);
GST_PAD_STREAM_UNLOCK (src->srcpad);
}
/* sending random SEGMENT downstream can break sync. */
break;
case GST_EVENT_TAG:
+ case GST_EVENT_SINK_MESSAGE:
case GST_EVENT_CUSTOM_DOWNSTREAM:
case GST_EVENT_CUSTOM_BOTH:
case GST_EVENT_PROTECTION:
case PROP_NUM_BUFFERS:
src->num_buffers = g_value_get_int (value);
break;
+#ifndef GST_REMOVE_DEPRECATED
case PROP_TYPEFIND:
src->typefind = g_value_get_boolean (value);
break;
+#endif
case PROP_DO_TIMESTAMP:
gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
break;
case PROP_NUM_BUFFERS:
g_value_set_int (value, src->num_buffers);
break;
+#ifndef GST_REMOVE_DEPRECATED
case PROP_TYPEFIND:
g_value_set_boolean (value, src->typefind);
break;
+#endif
case PROP_DO_TIMESTAMP:
g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
break;
/* ERRORS */
unexpected_length:
{
- GST_WARNING_OBJECT (src, "processing at or past EOS");
+ GST_DEBUG_OBJECT (src, "processing at or past EOS");
return FALSE;
}
}
GstClockReturn status;
GstBuffer *res_buf;
GstBuffer *in_buf;
+ gboolean own_res_buf;
bclass = GST_BASE_SRC_GET_CLASS (src);
G_GINT64_FORMAT, offset, length, src->segment.time);
res_buf = in_buf = *buf;
+ own_res_buf = (*buf == NULL);
GST_LIVE_UNLOCK (src);
ret = bclass->create (src, offset, length, &res_buf);
GstFlowReturn wait_ret;
wait_ret = gst_base_src_wait_playing_unlocked (src);
if (wait_ret != GST_FLOW_OK) {
- if (ret == GST_FLOW_OK && *buf == NULL)
+ if (ret == GST_FLOW_OK && own_res_buf)
gst_buffer_unref (res_buf);
ret = wait_ret;
goto stopped;
* discard when the create function returned _OK. */
if (G_UNLIKELY (g_atomic_int_get (&src->priv->has_pending_eos))) {
if (ret == GST_FLOW_OK) {
- if (*buf == NULL)
+ if (own_res_buf)
gst_buffer_unref (res_buf);
}
src->priv->forced_eos = TRUE;
res_buf = in_buf;
}
+ if (res_buf == NULL) {
+ GstBufferList *pending_list = src->priv->pending_bufferlist;
+
+ if (pending_list == NULL || gst_buffer_list_length (pending_list) == 0)
+ goto null_buffer;
+
+ res_buf = gst_buffer_list_get_writable (pending_list, 0);
+ own_res_buf = FALSE;
+ }
+
/* no timestamp set and we are at offset 0, we can timestamp with 0 */
if (offset == 0 && src->segment.time == 0
&& GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
/* this case is triggered when we were waiting for the clock and
* it got unlocked because we did a state change. In any case, get rid of
* the buffer. */
- if (*buf == NULL)
+ if (own_res_buf)
gst_buffer_unref (res_buf);
if (!src->live_running) {
GST_ELEMENT_ERROR (src, CORE, CLOCK,
(_("Internal clock error.")),
("clock returned unexpected return value %d", status));
- if (*buf == NULL)
+ if (own_res_buf)
gst_buffer_unref (res_buf);
ret = GST_FLOW_ERROR;
break;
GST_ELEMENT_ERROR (src, RESOURCE, BUSY,
(_("Failed to map buffer.")),
("failed to map result buffer in WRITE mode"));
- if (*buf == NULL)
+ if (own_res_buf)
gst_buffer_unref (res_buf);
return GST_FLOW_ERROR;
}
flushing:
{
GST_DEBUG_OBJECT (src, "we are flushing");
- if (*buf == NULL)
+ if (own_res_buf)
gst_buffer_unref (res_buf);
return GST_FLOW_FLUSHING;
}
GST_DEBUG_OBJECT (src, "we are EOS");
return GST_FLOW_EOS;
}
+null_buffer:
+ {
+ GST_ELEMENT_ERROR (src, STREAM, FAILED,
+ (_("Internal data flow error.")),
+ ("Subclass %s neither returned a buffer nor submitted a buffer list "
+ "from its create function", G_OBJECT_TYPE_NAME (src)));
+ return GST_FLOW_ERROR;
+ }
}
static GstFlowReturn
GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
GST_TIME_ARGS (position), blocksize);
+ /* clean up just in case we got interrupted or so last time round */
+ if (src->priv->pending_bufferlist != NULL) {
+ gst_buffer_list_unref (src->priv->pending_bufferlist);
+ src->priv->pending_bufferlist = NULL;
+ }
+
ret = gst_base_src_get_range (src, position, blocksize, &buf);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
GST_LIVE_UNLOCK (src);
goto pause;
}
- /* this should not happen */
- if (G_UNLIKELY (buf == NULL))
- goto null_buffer;
+
+ /* Note: at this point buf might be a single buf returned which we own or
+ * the first buf of a pending buffer list submitted via submit_buffer_list(),
+ * in which case the buffer is owned by the pending buffer list and not us. */
+ g_assert (buf != NULL);
/* push events to close/start our segment before we push the buffer. */
if (G_UNLIKELY (src->priv->segment_pending)) {
/* positive rate, check if we reached the stop */
if (src->segment.stop != -1) {
if (position >= src->segment.stop) {
- eos = TRUE;
+ if (g_atomic_int_get (&src->priv->automatic_eos))
+ eos = TRUE;
position = src->segment.stop;
}
}
/* negative rate, check if we reached the start. start is always set to
* something different from -1 */
if (position <= src->segment.start) {
- eos = TRUE;
+ if (g_atomic_int_get (&src->priv->automatic_eos))
+ eos = TRUE;
position = src->segment.start;
}
/* when going reverse, all buffers are DISCONT */
}
GST_LIVE_UNLOCK (src);
- ret = gst_pad_push (pad, buf);
+ /* push buffer or buffer list */
+ if (src->priv->pending_bufferlist != NULL) {
+ ret = gst_pad_push_list (pad, src->priv->pending_bufferlist);
+ src->priv->pending_bufferlist = NULL;
+ } else {
+ ret = gst_pad_push (pad, buf);
+ }
+
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
if (ret == GST_FLOW_NOT_NEGOTIATED) {
goto not_negotiated;
}
goto done;
}
-null_buffer:
- {
- GST_ELEMENT_ERROR (src, STREAM, FAILED,
- (_("Internal data flow error.")), ("element returned NULL buffer"));
- GST_LIVE_UNLOCK (src);
- goto done;
- }
}
static gboolean
}
}
-static gboolean
-gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
+static void
+gst_base_src_set_pool_flushing (GstBaseSrc * basesrc, gboolean flushing)
{
GstBaseSrcPrivate *priv = basesrc->priv;
GstBufferPool *pool;
- gboolean res = TRUE;
GST_OBJECT_LOCK (basesrc);
if ((pool = priv->pool))
GST_OBJECT_UNLOCK (basesrc);
if (pool) {
- res = gst_buffer_pool_set_active (pool, active);
+ gst_buffer_pool_set_flushing (pool, flushing);
gst_object_unref (pool);
}
- return res;
}
GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
- /* stop flushing now but for live sources, still block in the LIVE lock when
- * we are not yet PLAYING */
- gst_base_src_set_flushing (basesrc, FALSE);
-
gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
GST_OBJECT_LOCK (basesrc->srcpad);
/* flush all */
gst_base_src_set_flushing (basesrc, TRUE);
+
/* stop the task */
gst_pad_stop_task (basesrc->srcpad);
+ /* stop flushing, this will balance unlock/unlock_stop calls */
+ gst_base_src_set_flushing (basesrc, FALSE);
GST_OBJECT_LOCK (basesrc);
if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
if (bclass->stop)
result = bclass->stop (basesrc);
+ if (basesrc->priv->pending_bufferlist != NULL) {
+ gst_buffer_list_unref (basesrc->priv->pending_bufferlist);
+ basesrc->priv->pending_bufferlist = NULL;
+ }
+
gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
return result;
GST_DEBUG_OBJECT (basesrc, "flushing %d", flushing);
if (flushing) {
- gst_base_src_activate_pool (basesrc, FALSE);
+ gst_base_src_set_pool_flushing (basesrc, TRUE);
/* unlock any subclasses to allow turning off the streaming thread */
if (bclass->unlock)
bclass->unlock (basesrc);
if (basesrc->clock_id)
gst_clock_id_unschedule (basesrc->clock_id);
} else {
- gst_base_src_activate_pool (basesrc, TRUE);
+ gst_base_src_set_pool_flushing (basesrc, FALSE);
/* Drop all delayed events */
GST_OBJECT_LOCK (basesrc);
*params = src->priv->params;
GST_OBJECT_UNLOCK (src);
}
+
+/**
+ * gst_base_src_submit_buffer_list:
+ * @src: a #GstBaseSrc
+ * @buffer_list: (transfer full): a #GstBufferList
+ *
+ * Subclasses can call this from their create virtual method implementation
+ * to submit a buffer list to be pushed out later. This is useful in
+ * cases where the create function wants to produce multiple buffers to be
+ * pushed out in one go in form of a #GstBufferList, which can reduce overhead
+ * drastically, especially for packetised inputs (for data streams where
+ * the packetisation/chunking is not important it is usually more efficient
+ * to return larger buffers instead).
+ *
+ * Subclasses that use this function from their create function must return
+ * %GST_FLOW_OK and no buffer from their create virtual method implementation.
+ * If a buffer is returned after a buffer list has also been submitted via this
+ * function the behaviour is undefined.
+ *
+ * Subclasses must only call this function once per create function call and
+ * subclasses must only call this function when the source operates in push
+ * mode.
+ *
+ * Since: 1.14
+ */
+void
+gst_base_src_submit_buffer_list (GstBaseSrc * src, GstBufferList * buffer_list)
+{
+ g_return_if_fail (GST_IS_BASE_SRC (src));
+ g_return_if_fail (GST_IS_BUFFER_LIST (buffer_list));
+ g_return_if_fail (BASE_SRC_HAS_PENDING_BUFFER_LIST (src) == FALSE);
+
+ /* we need it to be writable later in get_range() where we use get_writable */
+ src->priv->pending_bufferlist = gst_buffer_list_make_writable (buffer_list);
+
+ GST_LOG_OBJECT (src, "%u buffers submitted in buffer list",
+ gst_buffer_list_length (buffer_list));
+}