* </para>
* <para>
* Since GStreamer 0.10.16 an application may send an EOS event to a source
- * element to make it send an EOS event downstream. This can typically be done
+ * element to make it perform the EOS logic (send EOS event downstream or post a
+ * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
* with the gst_element_send_event() function on the element or its parent bin.
* </para>
* <para>
GstEvent *close_segment;
GstEvent *start_segment;
- /* if EOS is pending */
- gboolean pending_eos;
+ /* if EOS is pending (atomic) */
+ gint pending_eos;
/* startup latency is the time it takes between going to PLAYING and producing
* the first BUFFER with running_time 0. This value is included in the latency
src = GST_BASE_SRC (element);
+ GST_DEBUG_OBJECT (src, "reveived %s event", GST_EVENT_TYPE_NAME (event));
+
switch (GST_EVENT_TYPE (event)) {
/* bidirectional events */
case GST_EVENT_FLUSH_START:
/* downstream serialized events */
case GST_EVENT_EOS:
- /* queue EOS and make sure the task or pull function
- * performs the EOS actions. */
+ {
+ GstBaseSrcClass *bclass;
+
+ bclass = GST_BASE_SRC_GET_CLASS (src);
+
+ /* queue EOS and make sure the task or pull function performs the EOS
+ * actions.
+ *
+ * We have two possibilities:
+ *
+ * - Before we are to enter the _create function, we check the pending_eos
+ * first and do EOS instead of entering it.
+ * - If we are in the _create function or we did not manage to set the
+ * flag fast enough and we are about to enter the _create function,
+ * we unlock it so that we exit with WRONG_STATE immediatly. We then
+ * check the EOS flag and do the EOS logic.
+ */
+ g_atomic_int_set (&src->priv->pending_eos, TRUE);
+ GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
+
+ /* unlock the _create function so that we can check the pending_eos flag
+ * and we can do EOS. This will eventually release the LIVE_LOCK again so
+ * that we can grab it and stop the unlock again. We don't take the stream
+ * lock so that this operation is guaranteed to never block. */
+ if (bclass->unlock)
+ bclass->unlock (src);
+
+ GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
+
GST_LIVE_LOCK (src);
- src->priv->pending_eos = TRUE;
+ GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
+ /* now stop the unlock of the streaming thread again. Grabbing the live
+ * lock is enough because that protects the create function. */
+ if (bclass->unlock_stop)
+ bclass->unlock_stop (src);
GST_LIVE_UNLOCK (src);
+
result = TRUE;
break;
+ }
case GST_EVENT_NEWSEGMENT:
/* sending random NEWSEGMENT downstream can break sync. */
break;
src->num_buffers_left--;
}
+ /* don't enter the create function if a pending EOS event was set. For the
+ * logic of the pending_eos, check the event function of this class. */
+ if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
+ goto eos;
+
GST_DEBUG_OBJECT (src,
"calling create offset %" G_GUINT64_FORMAT " length %u, time %"
G_GINT64_FORMAT, offset, length, src->segment.time);
ret = bclass->create (src, offset, length, buf);
+
+ /* The create function could be unlocked because we have a pending EOS. It's
+ * possible that we have a valid buffer from create that we need to
+ * discard when the create function returned _OK. */
+ if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
+ if (ret == GST_FLOW_OK) {
+ gst_buffer_unref (*buf);
+ *buf = NULL;
+ }
+ goto eos;
+ }
+
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto not_ok;
if (G_UNLIKELY (src->priv->flushing))
goto flushing;
- if (G_UNLIKELY (src->priv->pending_eos))
- goto eos;
-
switch (status) {
case GST_CLOCK_EARLY:
/* the buffer is too late. We currently don't drop the buffer. */
eos:
{
GST_DEBUG_OBJECT (src, "we are EOS");
- gst_buffer_unref (*buf);
- *buf = NULL;
return GST_FLOW_UNEXPECTED;
}
}
if (G_UNLIKELY (src->priv->flushing))
goto flushing;
- /* if we're EOS, return right away */
- if (G_UNLIKELY (src->priv->pending_eos))
- goto eos;
-
res = gst_base_src_get_range (src, offset, length, buf);
done:
res = GST_FLOW_WRONG_STATE;
goto done;
}
-eos:
- {
- GST_DEBUG_OBJECT (src, "we are EOS");
- res = GST_FLOW_UNEXPECTED;
- goto done;
- }
}
static gboolean
src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
GST_LIVE_LOCK (src);
+
if (G_UNLIKELY (src->priv->flushing))
goto flushing;
- /* if we're EOS, return right away */
- if (G_UNLIKELY (src->priv->pending_eos))
- goto eos;
-
src->priv->last_sent_eos = FALSE;
blocksize = src->blocksize;
ret = GST_FLOW_WRONG_STATE;
goto pause;
}
-eos:
- {
- GST_DEBUG_OBJECT (src, "we are EOS");
- GST_LIVE_UNLOCK (src);
- ret = GST_FLOW_UNEXPECTED;
- goto pause;
- }
pause:
{
const gchar *reason = gst_flow_get_name (ret);
if (flushing) {
/* if we are locked in the live lock, signal it to make it flush */
basesrc->live_running = TRUE;
+
/* clear pending EOS if any */
- basesrc->priv->pending_eos = FALSE;
+ g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
/* step 1, now that we have the LIVE lock, clear our unlock request */
if (bclass->unlock_stop)
gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
basesrc->priv->last_sent_eos = TRUE;
}
- basesrc->priv->pending_eos = FALSE;
+ g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
event_p = &basesrc->data.ABI.pending_seek;
gst_event_replace (event_p, NULL);
event_p = &basesrc->priv->close_segment;