#define GST_BASE_SRC_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
+/* The basesrc implementation need to respect the following locking order:
+ * 1. STREAM_LOCK
+ * 2. LIVE_LOCK
+ * 3. OBJECT_LOCK
+ */
struct _GstBaseSrcPrivate
{
- gboolean discont;
- gboolean flushing;
+ gboolean discont; /* STREAM_LOCK */
+ gboolean flushing; /* LIVE_LOCK */
- GstFlowReturn start_result;
- gboolean async;
+ GstFlowReturn start_result; /* OBJECT_LOCK */
+ gboolean async; /* OBJECT_LOCK */
/* if a stream-start event should be sent */
- gboolean stream_start_pending;
+ gboolean stream_start_pending; /* STREAM_LOCK */
/* if segment should be sent and a
* seqnum if it was originated by a seek */
- gboolean segment_pending;
- guint32 segment_seqnum;
+ gboolean segment_pending; /* OBJECT_LOCK */
+ guint32 segment_seqnum; /* OBJECT_LOCK */
/* if EOS is pending (atomic) */
- GstEvent *pending_eos;
- gint has_pending_eos;
+ GstEvent *pending_eos; /* OBJECT_LOCK */
+ gint has_pending_eos; /* atomic */
/* if the eos was caused by a forced eos from the application */
- gboolean forced_eos;
+ gboolean forced_eos; /* LIVE_LOCK */
/* startup latency is the time it takes between going to PLAYING and producing
* the first BUFFER with running_time 0. This value is included in the latency
* reporting. */
- GstClockTime latency;
+ GstClockTime latency; /* OBJECT_LOCK */
/* timestamp offset, this is the offset add to the values of gst_times for
* pseudo live sources */
- GstClockTimeDiff ts_offset;
+ GstClockTimeDiff ts_offset; /* OBJECT_LOCK */
- gboolean do_timestamp;
- volatile gint dynamic_size;
- volatile gint automatic_eos;
+ gboolean do_timestamp; /* OBJECT_LOCK */
+ volatile gint dynamic_size; /* atomic */
+ volatile gint automatic_eos; /* atomic */
/* stream sequence number */
- guint32 seqnum;
+ guint32 seqnum; /* STREAM_LOCK */
/* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
* pushed in the data stream */
- GList *pending_events;
- volatile gint have_events;
+ GList *pending_events; /* OBJECT_LOCK */
+ volatile gint have_events; /* OBJECT_LOCK */
/* QoS *//* with LOCK */
- gboolean qos_enabled;
- gdouble proportion;
- GstClockTime earliest_time;
+ gboolean qos_enabled; /* unused */
+ gdouble proportion; /* OBJECT_LOCK */
+ GstClockTime earliest_time; /* OBJECT_LOCK */
- GstBufferPool *pool;
- GstAllocator *allocator;
- GstAllocationParams params;
+ GstBufferPool *pool; /* OBJECT_LOCK */
+ GstAllocator *allocator; /* OBJECT_LOCK */
+ GstAllocationParams params; /* OBJECT_LOCK */
- GCond async_cond;
+ GCond async_cond; /* OBJECT_LOCK */
};
static GstElementClass *parent_class = NULL;
GstQuery * query);
static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
- gboolean flushing, gboolean live_play, gboolean * playing);
+ gboolean flushing);
static gboolean gst_base_src_start (GstBaseSrc * basesrc);
static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
+/* Call with LIVE_LOCK held */
+static GstFlowReturn
+gst_base_src_wait_playing_unlocked (GstBaseSrc * src)
+{
+ while (G_UNLIKELY (!src->live_running && !src->priv->flushing)) {
+ /* block until the state changes, or we get a flush, or something */
+ GST_DEBUG_OBJECT (src, "live source waiting for running state");
+ GST_LIVE_WAIT (src);
+ GST_DEBUG_OBJECT (src, "live source unlocked");
+ }
+
+ if (src->priv->flushing)
+ goto flushing;
+
+ return GST_FLOW_OK;
+
+ /* ERRORS */
+flushing:
+ {
+ GST_DEBUG_OBJECT (src, "we are flushing");
+ return GST_FLOW_FLUSHING;
+ }
+}
+
+
/**
* gst_base_src_wait_playing:
* @src: the src
GstFlowReturn
gst_base_src_wait_playing (GstBaseSrc * src)
{
- g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
+ GstFlowReturn ret;
- do {
- /* block until the state changes, or we get a flush, or something */
- GST_DEBUG_OBJECT (src, "live source waiting for running state");
- GST_LIVE_WAIT (src);
- GST_DEBUG_OBJECT (src, "live source unlocked");
- if (src->priv->flushing)
- goto flushing;
- } while (G_UNLIKELY (!src->live_running));
+ g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
- return GST_FLOW_OK;
+ GST_LIVE_LOCK (src);
+ ret = gst_base_src_wait_playing_unlocked (src);
+ GST_LIVE_UNLOCK (src);
- /* ERRORS */
-flushing:
- {
- GST_DEBUG_OBJECT (src, "we are flushing");
- return GST_FLOW_FLUSHING;
- }
+ return ret;
}
/**
return res;
}
+/* called with STREAM_LOCK */
static gboolean
gst_base_src_send_stream_start (GstBaseSrc * src)
{
GstSeekFlags flags;
GstSeekType start_type, stop_type;
gint64 start, stop;
- gboolean flush, playing;
+ gboolean flush;
gboolean update;
gboolean relative_seek = FALSE;
gboolean seekseg_configured = FALSE;
/* unblock streaming thread. */
if (unlock)
- gst_base_src_set_flushing (src, TRUE, FALSE, &playing);
+ gst_base_src_set_flushing (src, TRUE);
/* grab streaming lock, this should eventually be possible, either
* because the task is paused, our streaming thread stopped
}
if (unlock)
- gst_base_src_set_flushing (src, FALSE, playing, NULL);
+ gst_base_src_set_flushing (src, FALSE);
/* If we configured the seeksegment above, don't overwrite it now. Otherwise
* copy the current segment info into the temp segment that we can actually
/* bidirectional events */
case GST_EVENT_FLUSH_START:
GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
+
result = gst_pad_push_event (src->srcpad, event);
- /* also unblock the create function */
- gst_base_src_activate_pool (src, FALSE);
- /* unlock any subclasses, we need to do this before grabbing the
- * LIVE_LOCK since we hold this lock before going into ::create. We pass an
- * unlock to the params because of backwards compat (see seek handler)*/
- if (bclass->unlock)
- bclass->unlock (src);
-
- /* the live lock is released when we are blocked, waiting for playing or
- * when we sync to the clock. */
- GST_LIVE_LOCK (src);
- src->priv->flushing = TRUE;
- /* clear pending EOS if any */
- if (g_atomic_int_get (&src->priv->has_pending_eos)) {
- GST_OBJECT_LOCK (src);
- CLEAR_PENDING_EOS (src);
- src->priv->forced_eos = FALSE;
- GST_OBJECT_UNLOCK (src);
- }
- if (bclass->unlock_stop)
- bclass->unlock_stop (src);
- if (src->clock_id)
- gst_clock_id_unschedule (src->clock_id);
- GST_DEBUG_OBJECT (src, "signal");
- GST_LIVE_SIGNAL (src);
- GST_LIVE_UNLOCK (src);
+ gst_base_src_set_flushing (src, TRUE);
event = NULL;
break;
case GST_EVENT_FLUSH_STOP:
{
gboolean start;
- GST_LIVE_LOCK (src);
- src->priv->segment_pending = TRUE;
- src->priv->flushing = FALSE;
+ GST_PAD_STREAM_LOCK (src->srcpad);
+ gst_base_src_set_flushing (src, FALSE);
+
GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
result = gst_pad_push_event (src->srcpad, event);
- gst_base_src_activate_pool (src, TRUE);
+ /* For external flush, restart the task .. */
+ GST_LIVE_LOCK (src);
+ src->priv->segment_pending = TRUE;
GST_OBJECT_LOCK (src->srcpad);
start = (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH);
GST_OBJECT_UNLOCK (src->srcpad);
+ /* ... and for live sources, only if in playing state */
if (src->is_live) {
if (!src->live_running)
start = FALSE;
if (start)
gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
src->srcpad, NULL);
+
GST_LIVE_UNLOCK (src);
+ GST_PAD_STREAM_UNLOCK (src->srcpad);
+
event = NULL;
break;
}
/* downstream serialized events */
case GST_EVENT_EOS:
{
+ gboolean push_mode;
+
/* queue EOS and make sure the task or pull function performs the EOS
* actions.
*
- * We have two possibilities:
+ * For push mode, This will be done in 3 steps. It is required to not
+ * block here as gst_element_send_event() will hold the STATE_LOCK, hence
+ * blocking would prevent asynchronous state change to complete.
+ *
+ * 1. We stop the streaming thread
+ * 2. We set the pending eos
+ * 3. We start the streaming thread again, so it is performed
+ * asynchronously.
*
- * - Before we are to enter the _create function, we check the has_pending_eos
- * first and do EOS instead of entering it.
- * - If we are in the _create function or we did not manage to set the
- * flag fast enough and we are about to enter the _create function,
- * we unlock it so that we exit with FLUSHING immediately. We then
- * check the EOS flag and do the EOS logic.
+ * For pull mode, we simply mark the pending EOS without flushing.
*/
- GST_OBJECT_LOCK (src);
- g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
- if (src->priv->pending_eos)
- gst_event_unref (src->priv->pending_eos);
- src->priv->pending_eos = event;
- event = NULL;
- GST_OBJECT_UNLOCK (src);
- GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
+ GST_OBJECT_LOCK (src->srcpad);
+ push_mode = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
+ GST_OBJECT_UNLOCK (src->srcpad);
- /* unlock the _create function so that we can check the has_pending_eos flag
- * and we can do EOS. This will eventually release the LIVE_LOCK again so
- * that we can grab it and stop the unlock again. We don't take the stream
- * lock so that this operation is guaranteed to never block. */
- gst_base_src_activate_pool (src, FALSE);
- if (bclass->unlock)
- bclass->unlock (src);
+ if (push_mode) {
+ gst_base_src_set_flushing (src, TRUE);
- GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
+ GST_PAD_STREAM_LOCK (src->srcpad);
+ gst_base_src_set_flushing (src, FALSE);
- GST_LIVE_LOCK (src);
- GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
- /* now stop the unlock of the streaming thread again. Grabbing the live
- * lock is enough because that protects the create function. */
- if (bclass->unlock_stop)
- bclass->unlock_stop (src);
- gst_base_src_activate_pool (src, TRUE);
- GST_LIVE_UNLOCK (src);
+ GST_OBJECT_LOCK (src);
+ g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
+ if (src->priv->pending_eos)
+ gst_event_unref (src->priv->pending_eos);
+ src->priv->pending_eos = event;
+ GST_OBJECT_UNLOCK (src);
+ GST_DEBUG_OBJECT (src,
+ "EOS marked, start task for asynchronous handling");
+ gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
+ src->srcpad, NULL);
+
+ GST_PAD_STREAM_UNLOCK (src->srcpad);
+ } else {
+ /* In pull mode, we need not to return flushing to downstream, though
+ * the stream lock is not kept after getrange was unblocked */
+ GST_OBJECT_LOCK (src);
+ g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
+ if (src->priv->pending_eos)
+ gst_event_unref (src->priv->pending_eos);
+ src->priv->pending_eos = event;
+ GST_OBJECT_UNLOCK (src);
+
+ gst_base_src_activate_pool (src, FALSE);
+ if (bclass->unlock)
+ bclass->unlock (src);
+
+ GST_PAD_STREAM_LOCK (src->srcpad);
+ if (bclass->unlock_stop)
+ bclass->unlock_stop (src);
+ GST_PAD_STREAM_UNLOCK (src->srcpad);
+ }
+
+
+ event = NULL;
result = TRUE;
break;
}
case GST_EVENT_FLUSH_START:
/* cancel any blocking getrange, is normally called
* when in pull mode. */
- result = gst_base_src_set_flushing (src, TRUE, FALSE, NULL);
+ result = gst_base_src_set_flushing (src, TRUE);
break;
case GST_EVENT_FLUSH_STOP:
- result = gst_base_src_set_flushing (src, FALSE, TRUE, NULL);
+ result = gst_base_src_set_flushing (src, FALSE);
break;
case GST_EVENT_QOS:
{
again:
if (src->is_live) {
if (G_UNLIKELY (!src->live_running)) {
- ret = gst_base_src_wait_playing (src);
+ ret = gst_base_src_wait_playing_unlocked (src);
if (ret != GST_FLOW_OK)
goto stopped;
}
res_buf = in_buf = *buf;
+ GST_LIVE_UNLOCK (src);
ret = bclass->create (src, offset, length, &res_buf);
+ GST_LIVE_LOCK (src);
+
+ /* As we released the LIVE_LOCK, the state may have changed */
+ if (src->is_live) {
+ if (G_UNLIKELY (!src->live_running)) {
+ GstFlowReturn wait_ret;
+ wait_ret = gst_base_src_wait_playing_unlocked (src);
+ if (wait_ret != GST_FLOW_OK) {
+ if (ret == GST_FLOW_OK && *buf == NULL)
+ gst_buffer_unref (res_buf);
+ ret = wait_ret;
+ goto stopped;
+ }
+ }
+ }
/* The create function could be unlocked because we have a pending EOS. It's
* possible that we have a valid buffer from create that we need to
}
}
+/* Called with STREAM_LOCK */
static void
gst_base_src_loop (GstPad * pad)
{
goto flushing;
GST_LIVE_UNLOCK (src);
+ /* Just return if EOS is pushed again, as the app might be unaware that an
+ * EOS have been sent already */
+ if (GST_PAD_IS_EOS (pad)) {
+ GST_DEBUG_OBJECT (src, "Pad is marked as EOS, pause the task");
+ gst_pad_pause_task (pad);
+ goto done;
+ }
+
gst_base_src_send_stream_start (src);
/* The stream-start event could've caused something to flush us */
/* stop flushing now but for live sources, still block in the LIVE lock when
* we are not yet PLAYING */
- gst_base_src_set_flushing (basesrc, FALSE, FALSE, NULL);
+ gst_base_src_set_flushing (basesrc, FALSE);
gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
GST_DEBUG_OBJECT (basesrc, "stopping source");
/* flush all */
- gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
+ gst_base_src_set_flushing (basesrc, TRUE);
/* stop the task */
gst_pad_stop_task (basesrc->srcpad);
/* start or stop flushing dataprocessing
*/
static gboolean
-gst_base_src_set_flushing (GstBaseSrc * basesrc,
- gboolean flushing, gboolean live_play, gboolean * playing)
+gst_base_src_set_flushing (GstBaseSrc * basesrc, gboolean flushing)
{
GstBaseSrcClass *bclass;
bclass = GST_BASE_SRC_GET_CLASS (basesrc);
- GST_DEBUG_OBJECT (basesrc, "flushing %d, live_play %d", flushing, live_play);
+ GST_DEBUG_OBJECT (basesrc, "flushing %d", flushing);
if (flushing) {
gst_base_src_activate_pool (basesrc, FALSE);
- /* unlock any subclasses, we need to do this before grabbing the
- * LIVE_LOCK since we hold this lock before going into ::create. We pass an
- * unlock to the params because of backwards compat (see seek handler)*/
+ /* unlock any subclasses to allow turning off the streaming thread */
if (bclass->unlock)
bclass->unlock (basesrc);
}
- /* the live lock is released when we are blocked, waiting for playing or
- * when we sync to the clock. */
+ /* the live lock is released when we are blocked, waiting for playing,
+ * when we sync to the clock or creating a buffer */
GST_LIVE_LOCK (basesrc);
- if (playing)
- *playing = basesrc->live_running;
basesrc->priv->flushing = flushing;
if (flushing) {
- /* if we are locked in the live lock, signal it to make it flush */
- basesrc->live_running = TRUE;
-
/* clear pending EOS if any */
if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) {
GST_OBJECT_LOCK (basesrc);
GST_OBJECT_UNLOCK (basesrc);
}
- /* step 1, now that we have the LIVE lock, clear our unlock request */
- if (bclass->unlock_stop)
- bclass->unlock_stop (basesrc);
-
- /* step 2, unblock clock sync (if any) or any other blocking thing */
+ /* unblock clock sync (if any) or any other blocking thing */
if (basesrc->clock_id)
gst_clock_id_unschedule (basesrc->clock_id);
} else {
- /* signal the live source that it can start playing */
- basesrc->live_running = live_play;
-
gst_base_src_activate_pool (basesrc, TRUE);
/* Drop all delayed events */
}
GST_OBJECT_UNLOCK (basesrc);
}
+
GST_LIVE_SIGNAL (basesrc);
GST_LIVE_UNLOCK (basesrc);
+ if (!flushing) {
+ /* Now wait for the stream lock to be released and clear our unlock request */
+ GST_PAD_STREAM_LOCK (basesrc->srcpad);
+ if (bclass->unlock_stop)
+ bclass->unlock_stop (basesrc);
+ GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
+ }
+
return TRUE;
}
static gboolean
gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
{
- GstBaseSrcClass *bclass;
-
- bclass = GST_BASE_SRC_GET_CLASS (basesrc);
-
- /* unlock subclasses locked in ::create, we only do this when we stop playing. */
- if (!live_play) {
- GST_DEBUG_OBJECT (basesrc, "unlock");
- if (bclass->unlock)
- bclass->unlock (basesrc);
- }
-
/* we are now able to grab the LIVE lock, when we get it, we can be
* waiting for PLAYING while blocked in the LIVE cond or we can be waiting
* for the clock. */
if (live_play) {
gboolean start;
- /* clear our unlock request when going to PLAYING */
- GST_DEBUG_OBJECT (basesrc, "unlock stop");
- if (bclass->unlock_stop)
- bclass->unlock_stop (basesrc);
-
/* for live sources we restart the timestamp correction */
GST_OBJECT_LOCK (basesrc);
basesrc->priv->latency = -1;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
if (gst_base_src_is_live (basesrc)) {
- /* make sure we block in the live lock in PAUSED */
+ /* make sure we block in the live cond in PAUSED */
gst_base_src_set_playing (basesrc, FALSE);
no_preroll = TRUE;
}