/**
* SECTION:gstbasesrc
+ * @title: GstBaseSrc
* @short_description: Base class for getrange based source elements
* @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
*
* This is a generic base class for source elements. The following
* types of sources are supported:
- * <itemizedlist>
- * <listitem><para>random access sources like files</para></listitem>
- * <listitem><para>seekable sources</para></listitem>
- * <listitem><para>live sources</para></listitem>
- * </itemizedlist>
+ *
+ * * random access sources like files
+ * * seekable sources
+ * * live sources
*
* The source can be configured to operate in any #GstFormat with the
* gst_base_src_set_format() method. The currently set format determines
*
* #GstBaseSrc always supports push mode scheduling. If the following
* conditions are met, it also supports pull mode scheduling:
- * <itemizedlist>
- * <listitem><para>The format is set to %GST_FORMAT_BYTES (default).</para>
- * </listitem>
- * <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
- * </listitem>
- * </itemizedlist>
+ *
+ * * The format is set to %GST_FORMAT_BYTES (default).
+ * * #GstBaseSrcClass.is_seekable() returns %TRUE.
*
* If all the conditions are met for operating in pull mode, #GstBaseSrc is
* automatically seekable in push mode as well. The following conditions must
* be met to make the element seekable in push mode when the format is not
* %GST_FORMAT_BYTES:
- * <itemizedlist>
- * <listitem><para>
- * #GstBaseSrcClass.is_seekable() returns %TRUE.
- * </para></listitem>
- * <listitem><para>
- * #GstBaseSrcClass.query() can convert all supported seek formats to the
- * internal format as set with gst_base_src_set_format().
- * </para></listitem>
- * <listitem><para>
- * #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
- * %TRUE.
- * </para></listitem>
- * </itemizedlist>
+ *
+ * * #GstBaseSrcClass.is_seekable() returns %TRUE.
+ * * #GstBaseSrcClass.query() can convert all supported seek formats to the
+ * internal format as set with gst_base_src_set_format().
+ * * #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
+ * %TRUE.
*
* When the element does not meet the requirements to operate in pull mode, the
* offset and length in the #GstBaseSrcClass.create() method should be ignored.
* There is only support in #GstBaseSrc for exactly one source pad, which
* should be named "src". A source implementation (subclass of #GstBaseSrc)
* should install a pad template in its class_init function, like so:
- * |[
+ * |[<!-- language="C" -->
* static void
* my_element_class_init (GstMyElementClass *klass)
* {
* GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
* // srctemplate should be a #GstStaticPadTemplate with direction
* // %GST_PAD_SRC and name "src"
- * gst_element_class_add_pad_template (gstelement_class,
- * gst_static_pad_template_get (&srctemplate));
+ * gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
*
* gst_element_class_set_static_metadata (gstelement_class,
* "Source name",
* "Source",
* "My Source element",
- * "The author <my.sink@my.email>");
+ * "The author <my.sink@my.email>");
* }
* ]|
*
- * <refsect2>
- * <title>Controlled shutdown of live sources in applications</title>
- * <para>
+ * ## Controlled shutdown of live sources in applications
+ *
* Applications that record from a live source may want to stop recording
* in a controlled way, so that the recording is stopped, but the data
* already in the pipeline is processed to the end (remember that many live
* After the EOS has been sent to the element, the application should wait for
* an EOS message to be posted on the pipeline's bus. Once this EOS message is
* received, it may safely shut down the entire pipeline.
- * </para>
- * </refsect2>
+ *
*/
#ifdef HAVE_CONFIG_H
#include <gst/glib-compat-private.h>
#include "gstbasesrc.h"
-#include "gsttypefindhelper.h"
#include <gst/gst-i18n-lib.h>
GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
#define DEFAULT_BLOCKSIZE 4096
#define DEFAULT_NUM_BUFFERS -1
-#define DEFAULT_TYPEFIND FALSE
#define DEFAULT_DO_TIMESTAMP FALSE
enum
PROP_0,
PROP_BLOCKSIZE,
PROP_NUM_BUFFERS,
+#ifndef GST_REMOVE_DEPRECATED
PROP_TYPEFIND,
+#endif
PROP_DO_TIMESTAMP
};
-#define GST_BASE_SRC_GET_PRIVATE(obj) \
- (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
-
+/* The basesrc implementation need to respect the following locking order:
+ * 1. STREAM_LOCK
+ * 2. LIVE_LOCK
+ * 3. OBJECT_LOCK
+ */
struct _GstBaseSrcPrivate
{
- gboolean discont;
- gboolean flushing;
+ gboolean discont; /* STREAM_LOCK */
+ gboolean flushing; /* LIVE_LOCK */
- GstFlowReturn start_result;
- gboolean async;
+ GstFlowReturn start_result; /* OBJECT_LOCK */
+ gboolean async; /* OBJECT_LOCK */
/* if a stream-start event should be sent */
- gboolean stream_start_pending;
+ gboolean stream_start_pending; /* STREAM_LOCK */
/* if segment should be sent and a
* seqnum if it was originated by a seek */
- gboolean segment_pending;
- guint32 segment_seqnum;
+ gboolean segment_pending; /* OBJECT_LOCK */
+ guint32 segment_seqnum; /* OBJECT_LOCK */
/* if EOS is pending (atomic) */
- GstEvent *pending_eos;
- gint has_pending_eos;
+ GstEvent *pending_eos; /* OBJECT_LOCK */
+ gint has_pending_eos; /* atomic */
/* if the eos was caused by a forced eos from the application */
- gboolean forced_eos;
+ gboolean forced_eos; /* LIVE_LOCK */
/* startup latency is the time it takes between going to PLAYING and producing
* the first BUFFER with running_time 0. This value is included in the latency
* reporting. */
- GstClockTime latency;
+ GstClockTime latency; /* OBJECT_LOCK */
/* timestamp offset, this is the offset add to the values of gst_times for
* pseudo live sources */
- GstClockTimeDiff ts_offset;
+ GstClockTimeDiff ts_offset; /* OBJECT_LOCK */
- gboolean do_timestamp;
- volatile gint dynamic_size;
- volatile gint automatic_eos;
+ gboolean do_timestamp; /* OBJECT_LOCK */
+ volatile gint dynamic_size; /* atomic */
+ volatile gint automatic_eos; /* atomic */
/* stream sequence number */
- guint32 seqnum;
+ guint32 seqnum; /* STREAM_LOCK */
/* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
* pushed in the data stream */
- GList *pending_events;
- volatile gint have_events;
+ GList *pending_events; /* OBJECT_LOCK */
+ volatile gint have_events; /* OBJECT_LOCK */
/* QoS *//* with LOCK */
- gboolean qos_enabled;
- gdouble proportion;
- GstClockTime earliest_time;
+ gdouble proportion; /* OBJECT_LOCK */
+ GstClockTime earliest_time; /* OBJECT_LOCK */
- GstBufferPool *pool;
- GstAllocator *allocator;
- GstAllocationParams params;
+ GstBufferPool *pool; /* OBJECT_LOCK */
+ GstAllocator *allocator; /* OBJECT_LOCK */
+ GstAllocationParams params; /* OBJECT_LOCK */
- GCond async_cond;
+ GCond async_cond; /* OBJECT_LOCK */
+
+ /* for _submit_buffer_list() */
+ GstBufferList *pending_bufferlist;
};
+#define BASE_SRC_HAS_PENDING_BUFFER_LIST(src) \
+ ((src)->priv->pending_bufferlist != NULL)
+
static GstElementClass *parent_class = NULL;
+static gint private_offset = 0;
static void gst_base_src_class_init (GstBaseSrcClass * klass);
static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
_type = g_type_register_static (GST_TYPE_ELEMENT,
"GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
+
+ private_offset =
+ g_type_add_instance_private (_type, sizeof (GstBaseSrcPrivate));
+
g_once_init_leave (&base_src_type, _type);
}
return base_src_type;
}
+static inline GstBaseSrcPrivate *
+gst_base_src_get_instance_private (GstBaseSrc * self)
+{
+ return (G_STRUCT_MEMBER_P (self, private_offset));
+}
+
static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
GstCaps * filter);
static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
GstQuery * query);
-static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
- gboolean active);
+static void gst_base_src_set_pool_flushing (GstBaseSrc * basesrc,
+ gboolean flushing);
static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
GstSegment * segment);
GstQuery * query);
static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
- gboolean flushing, gboolean live_play, gboolean * playing);
+ gboolean flushing);
static gboolean gst_base_src_start (GstBaseSrc * basesrc);
static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
gobject_class = G_OBJECT_CLASS (klass);
gstelement_class = GST_ELEMENT_CLASS (klass);
- GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
+ if (private_offset != 0)
+ g_type_class_adjust_private_offset (klass, &private_offset);
- g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
+ GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
parent_class = g_type_class_peek_parent (klass);
"Number of buffers to output before sending EOS (-1 = unlimited)",
-1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS));
+#ifndef GST_REMOVE_DEPRECATED
g_object_class_install_property (gobject_class, PROP_TYPEFIND,
g_param_spec_boolean ("typefind", "Typefind",
- "Run typefind before negotiating", DEFAULT_TYPEFIND,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ "Run typefind before negotiating (deprecated, non-functional)", FALSE,
+ G_PARAM_READWRITE | G_PARAM_DEPRECATED | G_PARAM_STATIC_STRINGS));
+#endif
g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
g_param_spec_boolean ("do-timestamp", "Do timestamp",
"Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
GstPad *pad;
GstPadTemplate *pad_template;
- basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
+ basesrc->priv = gst_base_src_get_instance_private (basesrc);
basesrc->is_live = FALSE;
g_mutex_init (&basesrc->live_lock);
basesrc->clock_id = NULL;
/* we operate in BYTES by default */
gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
- basesrc->typefind = DEFAULT_TYPEFIND;
basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
g_atomic_int_set (&basesrc->priv->have_events, FALSE);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
+/* Call with LIVE_LOCK held */
+static GstFlowReturn
+gst_base_src_wait_playing_unlocked (GstBaseSrc * src)
+{
+ while (G_UNLIKELY (!src->live_running && !src->priv->flushing)) {
+ /* block until the state changes, or we get a flush, or something */
+ GST_DEBUG_OBJECT (src, "live source waiting for running state");
+ GST_LIVE_WAIT (src);
+ GST_DEBUG_OBJECT (src, "live source unlocked");
+ }
+
+ if (src->priv->flushing)
+ goto flushing;
+
+ return GST_FLOW_OK;
+
+ /* ERRORS */
+flushing:
+ {
+ GST_DEBUG_OBJECT (src, "we are flushing");
+ return GST_FLOW_FLUSHING;
+ }
+}
+
+
/**
* gst_base_src_wait_playing:
* @src: the src
GstFlowReturn
gst_base_src_wait_playing (GstBaseSrc * src)
{
- g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
+ GstFlowReturn ret;
- do {
- /* block until the state changes, or we get a flush, or something */
- GST_DEBUG_OBJECT (src, "live source waiting for running state");
- GST_LIVE_WAIT (src);
- GST_DEBUG_OBJECT (src, "live source unlocked");
- if (src->priv->flushing)
- goto flushing;
- } while (G_UNLIKELY (!src->live_running));
+ g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
- return GST_FLOW_OK;
+ GST_LIVE_LOCK (src);
+ ret = gst_base_src_wait_playing_unlocked (src);
+ GST_LIVE_UNLOCK (src);
- /* ERRORS */
-flushing:
- {
- GST_DEBUG_OBJECT (src, "we are flushing");
- return GST_FLOW_FLUSHING;
- }
+ return ret;
}
/**
* that can't return an authoritative size and only know that they're EOS
* when trying to read more should set this to %FALSE.
*
+ * When @src operates in %GST_FORMAT_TIME, #GstBaseSrc will send an EOS
+ * when a buffer outside of the currently configured segment is pushed if
+ * @automatic_eos is %TRUE. Since 1.16, if @automatic_eos is %FALSE an
+ * EOS will be pushed only when the #GstBaseSrc.create implementation
+ * returns %GST_FLOW_EOS.
+ *
* Since: 1.4
*/
void
* @max_latency: (out) (allow-none): the max latency of the source
*
* Query the source for the latency parameters. @live will be %TRUE when @src is
- * configured as a live source. @min_latency will be set to the difference
- * between the running time and the timestamp of the first buffer.
- * @max_latency is always the undefined value of -1.
+ * configured as a live source. @min_latency and @max_latency will be set
+ * to the difference between the running time and the timestamp of the first
+ * buffer.
*
* This function is mostly used by subclasses.
*
if (min_latency)
*min_latency = min;
if (max_latency)
- *max_latency = -1;
+ *max_latency = min;
GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
- GST_TIME_ARGS (-1));
+ GST_TIME_ARGS (min));
GST_OBJECT_UNLOCK (src);
return TRUE;
return res;
}
+/* called with STREAM_LOCK */
static gboolean
gst_base_src_send_stream_start (GstBaseSrc * src)
{
{
GstBaseSrcClass *bclass;
gboolean res = TRUE;
+ GstCaps *current_caps;
bclass = GST_BASE_SRC_GET_CLASS (src);
gst_base_src_send_stream_start (src);
- if (bclass->set_caps)
- res = bclass->set_caps (src, caps);
+ current_caps = gst_pad_get_current_caps (GST_BASE_SRC_PAD (src));
+ if (current_caps && gst_caps_is_equal (current_caps, caps)) {
+ GST_DEBUG_OBJECT (src, "New caps equal to old ones: %" GST_PTR_FORMAT,
+ caps);
+ res = TRUE;
+ } else {
+ if (bclass->set_caps)
+ res = bclass->set_caps (src, caps);
+
+ if (res)
+ res = gst_pad_push_event (src->srcpad, gst_event_new_caps (caps));
+ }
- if (res)
- res = gst_pad_push_event (src->srcpad, gst_event_new_caps (caps));
+ if (current_caps)
+ gst_caps_unref (current_caps);
return res;
}
} else
res = TRUE;
- gst_query_set_position (query, format, position);
+ if (res)
+ gst_query_set_position (query, format, position);
+
break;
}
}
* means that we cannot report the duration at all. */
res = TRUE;
}
- gst_query_set_duration (query, format, duration);
+
+ if (res)
+ gst_query_set_duration (query, format, duration);
+
break;
}
}
gst_base_src_seekable (src), 0, duration);
res = TRUE;
} else {
- /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
+ /* FIXME 2.0: return TRUE + seekable=FALSE for SEEKING query here */
/* Don't reply to the query to make up for demuxers which don't
* handle the SEEKING query yet. Players like Totem will fall back
* to the duration when the SEEKING query isn't answered. */
{
GstFlowReturn ret;
GstBaseSrcPrivate *priv = src->priv;
+ GstBufferPool *pool = NULL;
+ GstAllocator *allocator = NULL;
+ GstAllocationParams params;
+ GST_OBJECT_LOCK (src);
if (priv->pool) {
- ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL);
+ pool = gst_object_ref (priv->pool);
+ } else if (priv->allocator) {
+ allocator = gst_object_ref (priv->allocator);
+ }
+ params = priv->params;
+ GST_OBJECT_UNLOCK (src);
+
+ if (pool) {
+ ret = gst_buffer_pool_acquire_buffer (pool, buffer, NULL);
} else if (size != -1) {
- *buffer = gst_buffer_new_allocate (priv->allocator, size, &priv->params);
+ *buffer = gst_buffer_new_allocate (allocator, size, ¶ms);
if (G_UNLIKELY (*buffer == NULL))
goto alloc_failed;
size);
goto alloc_failed;
}
+
+done:
+ if (pool)
+ gst_object_unref (pool);
+ if (allocator)
+ gst_object_unref (allocator);
+
return ret;
/* ERRORS */
alloc_failed:
{
GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
- return GST_FLOW_ERROR;
+ ret = GST_FLOW_ERROR;
+ goto done;
}
}
GstSeekFlags flags;
GstSeekType start_type, stop_type;
gint64 start, stop;
- gboolean flush, playing;
+ gboolean flush;
gboolean update;
gboolean relative_seek = FALSE;
gboolean seekseg_configured = FALSE;
/* unblock streaming thread. */
if (unlock)
- gst_base_src_set_flushing (src, TRUE, FALSE, &playing);
+ gst_base_src_set_flushing (src, TRUE);
/* grab streaming lock, this should eventually be possible, either
* because the task is paused, our streaming thread stopped
}
if (unlock)
- gst_base_src_set_flushing (src, FALSE, playing, NULL);
+ gst_base_src_set_flushing (src, FALSE);
/* If we configured the seeksegment above, don't overwrite it now. Otherwise
* copy the current segment info into the temp segment that we can actually
gst_element_post_message (GST_ELEMENT (src), message);
}
- /* for deriving a stop position for the playback segment from the seek
- * segment, we must take the duration when the stop is not set */
- /* FIXME: This is never used below */
- if ((stop = seeksegment.stop) == -1)
- stop = seeksegment.duration;
-
src->priv->segment_pending = TRUE;
src->priv->segment_seqnum = seqnum;
}
/* bidirectional events */
case GST_EVENT_FLUSH_START:
GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
+
result = gst_pad_push_event (src->srcpad, event);
- /* also unblock the create function */
- gst_base_src_activate_pool (src, FALSE);
- /* unlock any subclasses, we need to do this before grabbing the
- * LIVE_LOCK since we hold this lock before going into ::create. We pass an
- * unlock to the params because of backwards compat (see seek handler)*/
- if (bclass->unlock)
- bclass->unlock (src);
-
- /* the live lock is released when we are blocked, waiting for playing or
- * when we sync to the clock. */
- GST_LIVE_LOCK (src);
- src->priv->flushing = TRUE;
- /* clear pending EOS if any */
- if (g_atomic_int_get (&src->priv->has_pending_eos)) {
- GST_OBJECT_LOCK (src);
- CLEAR_PENDING_EOS (src);
- src->priv->forced_eos = FALSE;
- GST_OBJECT_UNLOCK (src);
- }
- if (bclass->unlock_stop)
- bclass->unlock_stop (src);
- if (src->clock_id)
- gst_clock_id_unschedule (src->clock_id);
- GST_DEBUG_OBJECT (src, "signal");
- GST_LIVE_SIGNAL (src);
- GST_LIVE_UNLOCK (src);
+ gst_base_src_set_flushing (src, TRUE);
event = NULL;
break;
case GST_EVENT_FLUSH_STOP:
{
gboolean start;
- GST_LIVE_LOCK (src);
- src->priv->segment_pending = TRUE;
- src->priv->flushing = FALSE;
+ GST_PAD_STREAM_LOCK (src->srcpad);
+ gst_base_src_set_flushing (src, FALSE);
+
GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
result = gst_pad_push_event (src->srcpad, event);
- gst_base_src_activate_pool (src, TRUE);
+ /* For external flush, restart the task .. */
+ GST_LIVE_LOCK (src);
+ src->priv->segment_pending = TRUE;
GST_OBJECT_LOCK (src->srcpad);
start = (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH);
GST_OBJECT_UNLOCK (src->srcpad);
+
+ /* ... and for live sources, only if in playing state */
+ if (src->is_live) {
+ if (!src->live_running)
+ start = FALSE;
+ }
+
if (start)
gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
src->srcpad, NULL);
+
GST_LIVE_UNLOCK (src);
+ GST_PAD_STREAM_UNLOCK (src->srcpad);
+
event = NULL;
break;
}
/* downstream serialized events */
case GST_EVENT_EOS:
{
+ gboolean push_mode;
+
/* queue EOS and make sure the task or pull function performs the EOS
* actions.
*
- * We have two possibilities:
+ * For push mode, This will be done in 3 steps. It is required to not
+ * block here as gst_element_send_event() will hold the STATE_LOCK, hence
+ * blocking would prevent asynchronous state change to complete.
*
- * - Before we are to enter the _create function, we check the has_pending_eos
- * first and do EOS instead of entering it.
- * - If we are in the _create function or we did not manage to set the
- * flag fast enough and we are about to enter the _create function,
- * we unlock it so that we exit with FLUSHING immediately. We then
- * check the EOS flag and do the EOS logic.
+ * 1. We stop the streaming thread
+ * 2. We set the pending eos
+ * 3. We start the streaming thread again, so it is performed
+ * asynchronously.
+ *
+ * For pull mode, we simply mark the pending EOS without flushing.
*/
- GST_OBJECT_LOCK (src);
- g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
- if (src->priv->pending_eos)
- gst_event_unref (src->priv->pending_eos);
- src->priv->pending_eos = event;
- event = NULL;
- GST_OBJECT_UNLOCK (src);
- GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
+ GST_OBJECT_LOCK (src->srcpad);
+ push_mode = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
+ GST_OBJECT_UNLOCK (src->srcpad);
- /* unlock the _create function so that we can check the has_pending_eos flag
- * and we can do EOS. This will eventually release the LIVE_LOCK again so
- * that we can grab it and stop the unlock again. We don't take the stream
- * lock so that this operation is guaranteed to never block. */
- gst_base_src_activate_pool (src, FALSE);
- if (bclass->unlock)
- bclass->unlock (src);
+ if (push_mode) {
+ gst_base_src_set_flushing (src, TRUE);
- GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
+ GST_PAD_STREAM_LOCK (src->srcpad);
+ gst_base_src_set_flushing (src, FALSE);
- GST_LIVE_LOCK (src);
- GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
- /* now stop the unlock of the streaming thread again. Grabbing the live
- * lock is enough because that protects the create function. */
- if (bclass->unlock_stop)
- bclass->unlock_stop (src);
- gst_base_src_activate_pool (src, TRUE);
- GST_LIVE_UNLOCK (src);
+ GST_OBJECT_LOCK (src);
+ g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
+ if (src->priv->pending_eos)
+ gst_event_unref (src->priv->pending_eos);
+ src->priv->pending_eos = event;
+ GST_OBJECT_UNLOCK (src);
+ GST_DEBUG_OBJECT (src,
+ "EOS marked, start task for asynchronous handling");
+ gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
+ src->srcpad, NULL);
+
+ GST_PAD_STREAM_UNLOCK (src->srcpad);
+ } else {
+ /* In pull mode, we need not to return flushing to downstream, though
+ * the stream lock is not kept after getrange was unblocked */
+ GST_OBJECT_LOCK (src);
+ g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
+ if (src->priv->pending_eos)
+ gst_event_unref (src->priv->pending_eos);
+ src->priv->pending_eos = event;
+ GST_OBJECT_UNLOCK (src);
+
+ gst_base_src_set_pool_flushing (src, TRUE);
+ if (bclass->unlock)
+ bclass->unlock (src);
+
+ GST_PAD_STREAM_LOCK (src->srcpad);
+ if (bclass->unlock_stop)
+ bclass->unlock_stop (src);
+ gst_base_src_set_pool_flushing (src, TRUE);
+ GST_PAD_STREAM_UNLOCK (src->srcpad);
+ }
+
+
+ event = NULL;
result = TRUE;
break;
}
/* sending random SEGMENT downstream can break sync. */
break;
case GST_EVENT_TAG:
+ case GST_EVENT_SINK_MESSAGE:
case GST_EVENT_CUSTOM_DOWNSTREAM:
case GST_EVENT_CUSTOM_BOTH:
- /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
+ case GST_EVENT_PROTECTION:
+ /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH, PROTECTION in the dataflow */
GST_OBJECT_LOCK (src);
src->priv->pending_events =
g_list_append (src->priv->pending_events, event);
case GST_EVENT_FLUSH_START:
/* cancel any blocking getrange, is normally called
* when in pull mode. */
- result = gst_base_src_set_flushing (src, TRUE, FALSE, NULL);
+ result = gst_base_src_set_flushing (src, TRUE);
break;
case GST_EVENT_FLUSH_STOP:
- result = gst_base_src_set_flushing (src, FALSE, TRUE, NULL);
+ result = gst_base_src_set_flushing (src, FALSE);
break;
case GST_EVENT_QOS:
{
case PROP_NUM_BUFFERS:
src->num_buffers = g_value_get_int (value);
break;
+#ifndef GST_REMOVE_DEPRECATED
case PROP_TYPEFIND:
src->typefind = g_value_get_boolean (value);
break;
+#endif
case PROP_DO_TIMESTAMP:
gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
break;
case PROP_NUM_BUFFERS:
g_value_set_int (value, src->num_buffers);
break;
+#ifndef GST_REMOVE_DEPRECATED
case PROP_TYPEFIND:
g_value_set_boolean (value, src->typefind);
break;
+#endif
case PROP_DO_TIMESTAMP:
g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
break;
if (!GST_CLOCK_TIME_IS_VALID (dts)) {
if (do_timestamp) {
dts = running_time;
- } else {
+ } else if (!GST_CLOCK_TIME_IS_VALID (pts)) {
if (GST_CLOCK_TIME_IS_VALID (basesrc->segment.start)) {
dts = basesrc->segment.start;
} else {
{
guint64 size, maxsize;
GstBaseSrcClass *bclass;
- GstFormat format;
gint64 stop;
+ /* only operate if we are working with bytes */
+ if (src->segment.format != GST_FORMAT_BYTES)
+ return TRUE;
+
bclass = GST_BASE_SRC_GET_CLASS (src);
- format = src->segment.format;
stop = src->segment.stop;
/* get total file size */
size = src->segment.duration;
- /* only operate if we are working with bytes */
- if (format != GST_FORMAT_BYTES)
- return TRUE;
-
/* when not doing automatic EOS, just use the stop position. We don't use
* the size to check for EOS */
if (!g_atomic_int_get (&src->priv->automatic_eos))
if (!bclass->get_size (src, &size))
size = -1;
- /* make sure we don't exceed the configured segment stop
- * if it was set */
- if (stop != -1)
- maxsize = MIN (size, stop);
+ /* when not doing automatic EOS, just use the stop position. We don't use
+ * the size to check for EOS */
+ if (!g_atomic_int_get (&src->priv->automatic_eos))
+ maxsize = stop;
+ /* Otherwise, the max amount of bytes to read is the total
+ * size or up to the segment.stop if present. */
+ else if (stop != -1)
+ maxsize = size != -1 ? MIN (size, stop) : stop;
else
maxsize = size;
- /* if we are at or past the end, EOS */
- if (G_UNLIKELY (offset >= maxsize))
- goto unexpected_length;
-
- /* else we can clip to the end */
- if (G_UNLIKELY (offset + *length >= maxsize))
- *length = maxsize - offset;
+ if (maxsize != -1) {
+ /* if we are at or past the end, EOS */
+ if (G_UNLIKELY (offset >= maxsize))
+ goto unexpected_length;
+ /* else we can clip to the end */
+ if (G_UNLIKELY (offset + *length >= maxsize))
+ *length = maxsize - offset;
+ }
}
}
/* ERRORS */
unexpected_length:
{
+ GST_DEBUG_OBJECT (src, "processing at or past EOS");
return FALSE;
}
}
GstClockReturn status;
GstBuffer *res_buf;
GstBuffer *in_buf;
+ gboolean own_res_buf;
bclass = GST_BASE_SRC_GET_CLASS (src);
again:
if (src->is_live) {
if (G_UNLIKELY (!src->live_running)) {
- ret = gst_base_src_wait_playing (src);
+ ret = gst_base_src_wait_playing_unlocked (src);
if (ret != GST_FLOW_OK)
goto stopped;
}
G_GINT64_FORMAT, offset, length, src->segment.time);
res_buf = in_buf = *buf;
+ own_res_buf = (*buf == NULL);
+ GST_LIVE_UNLOCK (src);
ret = bclass->create (src, offset, length, &res_buf);
+ GST_LIVE_LOCK (src);
+
+ /* As we released the LIVE_LOCK, the state may have changed */
+ if (src->is_live) {
+ if (G_UNLIKELY (!src->live_running)) {
+ GstFlowReturn wait_ret;
+ wait_ret = gst_base_src_wait_playing_unlocked (src);
+ if (wait_ret != GST_FLOW_OK) {
+ if (ret == GST_FLOW_OK && own_res_buf)
+ gst_buffer_unref (res_buf);
+ ret = wait_ret;
+ goto stopped;
+ }
+ }
+ }
/* The create function could be unlocked because we have a pending EOS. It's
* possible that we have a valid buffer from create that we need to
* discard when the create function returned _OK. */
if (G_UNLIKELY (g_atomic_int_get (&src->priv->has_pending_eos))) {
if (ret == GST_FLOW_OK) {
- if (*buf == NULL)
+ if (own_res_buf)
gst_buffer_unref (res_buf);
}
src->priv->forced_eos = TRUE;
res_buf = in_buf;
}
+ if (res_buf == NULL) {
+ GstBufferList *pending_list = src->priv->pending_bufferlist;
+
+ if (pending_list == NULL || gst_buffer_list_length (pending_list) == 0)
+ goto null_buffer;
+
+ res_buf = gst_buffer_list_get_writable (pending_list, 0);
+ own_res_buf = FALSE;
+ }
+
/* no timestamp set and we are at offset 0, we can timestamp with 0 */
if (offset == 0 && src->segment.time == 0
&& GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
/* this case is triggered when we were waiting for the clock and
* it got unlocked because we did a state change. In any case, get rid of
* the buffer. */
- if (*buf == NULL)
+ if (own_res_buf)
gst_buffer_unref (res_buf);
if (!src->live_running) {
GST_ELEMENT_ERROR (src, CORE, CLOCK,
(_("Internal clock error.")),
("clock returned unexpected return value %d", status));
- if (*buf == NULL)
+ if (own_res_buf)
gst_buffer_unref (res_buf);
ret = GST_FLOW_ERROR;
break;
GST_ELEMENT_ERROR (src, RESOURCE, BUSY,
(_("Failed to map buffer.")),
("failed to map result buffer in WRITE mode"));
- if (*buf == NULL)
+ if (own_res_buf)
gst_buffer_unref (res_buf);
return GST_FLOW_ERROR;
}
flushing:
{
GST_DEBUG_OBJECT (src, "we are flushing");
- if (*buf == NULL)
+ if (own_res_buf)
gst_buffer_unref (res_buf);
return GST_FLOW_FLUSHING;
}
GST_DEBUG_OBJECT (src, "we are EOS");
return GST_FLOW_EOS;
}
+null_buffer:
+ {
+ GST_ELEMENT_ERROR (src, STREAM, FAILED,
+ (_("Internal data flow error.")),
+ ("Subclass %s neither returned a buffer nor submitted a buffer list "
+ "from its create function", G_OBJECT_TYPE_NAME (src)));
+ return GST_FLOW_ERROR;
+ }
}
static GstFlowReturn
}
}
+/* Called with STREAM_LOCK */
static void
gst_base_src_loop (GstPad * pad)
{
goto flushing;
GST_LIVE_UNLOCK (src);
+ /* Just return if EOS is pushed again, as the app might be unaware that an
+ * EOS have been sent already */
+ if (GST_PAD_IS_EOS (pad)) {
+ GST_DEBUG_OBJECT (src, "Pad is marked as EOS, pause the task");
+ gst_pad_pause_task (pad);
+ goto done;
+ }
+
gst_base_src_send_stream_start (src);
/* The stream-start event could've caused something to flush us */
GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
GST_TIME_ARGS (position), blocksize);
+ /* clean up just in case we got interrupted or so last time round */
+ if (src->priv->pending_bufferlist != NULL) {
+ gst_buffer_list_unref (src->priv->pending_bufferlist);
+ src->priv->pending_bufferlist = NULL;
+ }
+
ret = gst_base_src_get_range (src, position, blocksize, &buf);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
GST_LIVE_UNLOCK (src);
goto pause;
}
- /* this should not happen */
- if (G_UNLIKELY (buf == NULL))
- goto null_buffer;
+
+ /* Note: at this point buf might be a single buf returned which we own or
+ * the first buf of a pending buffer list submitted via submit_buffer_list(),
+ * in which case the buffer is owned by the pending buffer list and not us. */
+ g_assert (buf != NULL);
/* push events to close/start our segment before we push the buffer. */
if (G_UNLIKELY (src->priv->segment_pending)) {
/* positive rate, check if we reached the stop */
if (src->segment.stop != -1) {
if (position >= src->segment.stop) {
- eos = TRUE;
+ if (g_atomic_int_get (&src->priv->automatic_eos))
+ eos = TRUE;
position = src->segment.stop;
}
}
/* negative rate, check if we reached the start. start is always set to
* something different from -1 */
if (position <= src->segment.start) {
- eos = TRUE;
+ if (g_atomic_int_get (&src->priv->automatic_eos))
+ eos = TRUE;
position = src->segment.start;
}
/* when going reverse, all buffers are DISCONT */
}
GST_LIVE_UNLOCK (src);
- ret = gst_pad_push (pad, buf);
+ /* push buffer or buffer list */
+ if (src->priv->pending_bufferlist != NULL) {
+ ret = gst_pad_push_list (pad, src->priv->pending_bufferlist);
+ src->priv->pending_bufferlist = NULL;
+ } else {
+ ret = gst_pad_push (pad, buf);
+ }
+
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
if (ret == GST_FLOW_NOT_NEGOTIATED) {
goto not_negotiated;
* due to flushing and posting an error message because of
* that is the wrong thing to do, e.g. when we're doing
* a flushing seek. */
- GST_ELEMENT_ERROR (src, STREAM, FAILED,
- (_("Internal data flow error.")),
- ("streaming task paused, reason %s (%d)", reason, ret));
+ GST_ELEMENT_FLOW_ERROR (src, ret);
gst_pad_push_event (pad, event);
}
goto done;
}
-null_buffer:
- {
- GST_ELEMENT_ERROR (src, STREAM, FAILED,
- (_("Internal data flow error.")), ("element returned NULL buffer"));
- GST_LIVE_UNLOCK (src);
- goto done;
- }
}
static gboolean
}
}
-static gboolean
-gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
+static void
+gst_base_src_set_pool_flushing (GstBaseSrc * basesrc, gboolean flushing)
{
GstBaseSrcPrivate *priv = basesrc->priv;
GstBufferPool *pool;
- gboolean res = TRUE;
GST_OBJECT_LOCK (basesrc);
if ((pool = priv->pool))
GST_OBJECT_UNLOCK (basesrc);
if (pool) {
- res = gst_buffer_pool_set_active (pool, active);
+ gst_buffer_pool_set_flushing (pool, flushing);
gst_object_unref (pool);
}
- return res;
}
/* If change are not acceptable, fallback to generic pool */
if (!gst_buffer_pool_config_validate_params (config, outcaps, size, min,
max)) {
- GST_DEBUG_OBJECT (basesrc, "unsuported pool, making new pool");
+ GST_DEBUG_OBJECT (basesrc, "unsupported pool, making new pool");
gst_object_unref (pool);
pool = gst_buffer_pool_new ();
GST_ELEMENT_ERROR (basesrc, RESOURCE, SETTINGS,
("Failed to configure the buffer pool"),
("Configuration is most likely invalid, please report this issue."));
+ gst_object_unref (pool);
return FALSE;
}
could_not_start:
{
GST_DEBUG_OBJECT (basesrc, "could not start");
- /* subclass is supposed to post a message. We don't have to call _stop. */
+ /* subclass is supposed to post a message but we post one as a fallback
+ * just in case. We don't have to call _stop. */
+ GST_ELEMENT_ERROR (basesrc, CORE, STATE_CHANGE, (NULL),
+ ("Failed to start"));
gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
return FALSE;
}
GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
- /* stop flushing now but for live sources, still block in the LIVE lock when
- * we are not yet PLAYING */
- gst_base_src_set_flushing (basesrc, FALSE, FALSE, NULL);
-
gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
GST_OBJECT_LOCK (basesrc->srcpad);
GST_DEBUG_OBJECT (basesrc, "stopping source");
/* flush all */
- gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
+ gst_base_src_set_flushing (basesrc, TRUE);
+
/* stop the task */
gst_pad_stop_task (basesrc->srcpad);
+ /* stop flushing, this will balance unlock/unlock_stop calls */
+ gst_base_src_set_flushing (basesrc, FALSE);
GST_OBJECT_LOCK (basesrc);
if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
if (bclass->stop)
result = bclass->stop (basesrc);
+ if (basesrc->priv->pending_bufferlist != NULL) {
+ gst_buffer_list_unref (basesrc->priv->pending_bufferlist);
+ basesrc->priv->pending_bufferlist = NULL;
+ }
+
gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
return result;
/* start or stop flushing dataprocessing
*/
static gboolean
-gst_base_src_set_flushing (GstBaseSrc * basesrc,
- gboolean flushing, gboolean live_play, gboolean * playing)
+gst_base_src_set_flushing (GstBaseSrc * basesrc, gboolean flushing)
{
GstBaseSrcClass *bclass;
bclass = GST_BASE_SRC_GET_CLASS (basesrc);
- GST_DEBUG_OBJECT (basesrc, "flushing %d, live_play %d", flushing, live_play);
+ GST_DEBUG_OBJECT (basesrc, "flushing %d", flushing);
if (flushing) {
- gst_base_src_activate_pool (basesrc, FALSE);
- /* unlock any subclasses, we need to do this before grabbing the
- * LIVE_LOCK since we hold this lock before going into ::create. We pass an
- * unlock to the params because of backwards compat (see seek handler)*/
+ gst_base_src_set_pool_flushing (basesrc, TRUE);
+ /* unlock any subclasses to allow turning off the streaming thread */
if (bclass->unlock)
bclass->unlock (basesrc);
}
- /* the live lock is released when we are blocked, waiting for playing or
- * when we sync to the clock. */
+ /* the live lock is released when we are blocked, waiting for playing,
+ * when we sync to the clock or creating a buffer */
GST_LIVE_LOCK (basesrc);
- if (playing)
- *playing = basesrc->live_running;
basesrc->priv->flushing = flushing;
if (flushing) {
- /* if we are locked in the live lock, signal it to make it flush */
- basesrc->live_running = TRUE;
-
/* clear pending EOS if any */
if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) {
GST_OBJECT_LOCK (basesrc);
GST_OBJECT_UNLOCK (basesrc);
}
- /* step 1, now that we have the LIVE lock, clear our unlock request */
- if (bclass->unlock_stop)
- bclass->unlock_stop (basesrc);
-
- /* step 2, unblock clock sync (if any) or any other blocking thing */
+ /* unblock clock sync (if any) or any other blocking thing */
if (basesrc->clock_id)
gst_clock_id_unschedule (basesrc->clock_id);
} else {
- /* signal the live source that it can start playing */
- basesrc->live_running = live_play;
-
- gst_base_src_activate_pool (basesrc, TRUE);
+ gst_base_src_set_pool_flushing (basesrc, FALSE);
/* Drop all delayed events */
GST_OBJECT_LOCK (basesrc);
}
GST_OBJECT_UNLOCK (basesrc);
}
+
GST_LIVE_SIGNAL (basesrc);
GST_LIVE_UNLOCK (basesrc);
+ if (!flushing) {
+ /* Now wait for the stream lock to be released and clear our unlock request */
+ GST_PAD_STREAM_LOCK (basesrc->srcpad);
+ if (bclass->unlock_stop)
+ bclass->unlock_stop (basesrc);
+ GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
+ }
+
return TRUE;
}
static gboolean
gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
{
- GstBaseSrcClass *bclass;
-
- bclass = GST_BASE_SRC_GET_CLASS (basesrc);
-
- /* unlock subclasses locked in ::create, we only do this when we stop playing. */
- if (!live_play) {
- GST_DEBUG_OBJECT (basesrc, "unlock");
- if (bclass->unlock)
- bclass->unlock (basesrc);
- }
-
/* we are now able to grab the LIVE lock, when we get it, we can be
* waiting for PLAYING while blocked in the LIVE cond or we can be waiting
* for the clock. */
if (live_play) {
gboolean start;
- /* clear our unlock request when going to PLAYING */
- GST_DEBUG_OBJECT (basesrc, "unlock stop");
- if (bclass->unlock_stop)
- bclass->unlock_stop (basesrc);
-
/* for live sources we restart the timestamp correction */
+ GST_OBJECT_LOCK (basesrc);
basesrc->priv->latency = -1;
+ GST_OBJECT_UNLOCK (basesrc);
/* have to restart the task in case it stopped because of the unlock when
* we went to PAUSED. Only do this if we operating in push mode. */
GST_OBJECT_LOCK (basesrc->srcpad);
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
if (gst_base_src_is_live (basesrc)) {
- /* make sure we block in the live lock in PAUSED */
+ /* make sure we block in the live cond in PAUSED */
gst_base_src_set_playing (basesrc, FALSE);
no_preroll = TRUE;
}
* @src: a #GstBaseSrc
*
* Returns: (transfer full): the instance of the #GstBufferPool used
- * by the src; free it after use it
+ * by the src; unref it after usage.
*/
GstBufferPool *
gst_base_src_get_buffer_pool (GstBaseSrc * src)
{
+ GstBufferPool *ret = NULL;
+
g_return_val_if_fail (GST_IS_BASE_SRC (src), NULL);
+ GST_OBJECT_LOCK (src);
if (src->priv->pool)
- return gst_object_ref (src->priv->pool);
+ ret = gst_object_ref (src->priv->pool);
+ GST_OBJECT_UNLOCK (src);
- return NULL;
+ return ret;
}
/**
* Lets #GstBaseSrc sub-classes to know the memory @allocator
* used by the base class and its @params.
*
- * Unref the @allocator after use it.
+ * Unref the @allocator after usage.
*/
void
gst_base_src_get_allocator (GstBaseSrc * src,
{
g_return_if_fail (GST_IS_BASE_SRC (src));
+ GST_OBJECT_LOCK (src);
if (allocator)
*allocator = src->priv->allocator ?
gst_object_ref (src->priv->allocator) : NULL;
if (params)
*params = src->priv->params;
+ GST_OBJECT_UNLOCK (src);
+}
+
+/**
+ * gst_base_src_submit_buffer_list:
+ * @src: a #GstBaseSrc
+ * @buffer_list: (transfer full): a #GstBufferList
+ *
+ * Subclasses can call this from their create virtual method implementation
+ * to submit a buffer list to be pushed out later. This is useful in
+ * cases where the create function wants to produce multiple buffers to be
+ * pushed out in one go in form of a #GstBufferList, which can reduce overhead
+ * drastically, especially for packetised inputs (for data streams where
+ * the packetisation/chunking is not important it is usually more efficient
+ * to return larger buffers instead).
+ *
+ * Subclasses that use this function from their create function must return
+ * %GST_FLOW_OK and no buffer from their create virtual method implementation.
+ * If a buffer is returned after a buffer list has also been submitted via this
+ * function the behaviour is undefined.
+ *
+ * Subclasses must only call this function once per create function call and
+ * subclasses must only call this function when the source operates in push
+ * mode.
+ *
+ * Since: 1.14
+ */
+void
+gst_base_src_submit_buffer_list (GstBaseSrc * src, GstBufferList * buffer_list)
+{
+ g_return_if_fail (GST_IS_BASE_SRC (src));
+ g_return_if_fail (GST_IS_BUFFER_LIST (buffer_list));
+ g_return_if_fail (BASE_SRC_HAS_PENDING_BUFFER_LIST (src) == FALSE);
+
+ /* we need it to be writable later in get_range() where we use get_writable */
+ src->priv->pending_bufferlist = gst_buffer_list_make_writable (buffer_list);
+
+ GST_LOG_OBJECT (src, "%u buffers submitted in buffer list",
+ gst_buffer_list_length (buffer_list));
}