* #GstBaseSrcClass.is_seekable() returns %TRUE.
* </para></listitem>
* <listitem><para>
- * #GstBaseSrc:Class.query() can convert all supported seek formats to the
+ * #GstBaseSrcClass.query() can convert all supported seek formats to the
* internal format as set with gst_base_src_set_format().
* </para></listitem>
* <listitem><para>
GstClockTimeDiff ts_offset;
gboolean do_timestamp;
+ volatile gint dynamic_size;
/* stream sequence number */
guint32 seqnum;
- /* pending tags to be pushed in the data stream */
- GList *pending_tags;
+ /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
+ * pushed in the data stream */
+ GList *pending_events;
+ volatile gint have_events;
/* QoS *//* with LOCK */
gboolean qos_enabled;
static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
guint length, GstBuffer ** buf);
static gboolean gst_base_src_seekable (GstBaseSrc * src);
+static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset,
+ guint * length);
static void
gst_base_src_base_init (gpointer g_class)
gobject_class->set_property = gst_base_src_set_property;
gobject_class->get_property = gst_base_src_get_property;
+/* FIXME 0.11: blocksize property should be int, not ulong (min is >max here) */
g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
g_param_spec_ulong ("blocksize", "Block size",
"Size in bytes to read per buffer (-1 = default)", 0, G_MAXULONG,
gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
basesrc->data.ABI.typefind = DEFAULT_TYPEFIND;
basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
+ g_atomic_int_set (&basesrc->priv->have_events, FALSE);
GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
+ GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_IS_SOURCE);
GST_DEBUG_OBJECT (basesrc, "init done");
}
event_p = &basesrc->data.ABI.pending_seek;
gst_event_replace (event_p, NULL);
- if (basesrc->priv->pending_tags) {
- g_list_foreach (basesrc->priv->pending_tags, (GFunc) gst_event_unref, NULL);
- g_list_free (basesrc->priv->pending_tags);
+ if (basesrc->priv->pending_events) {
+ g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
+ NULL);
+ g_list_free (basesrc->priv->pending_events);
}
G_OBJECT_CLASS (parent_class)->finalize (object);
{
g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
- /* block until the state changes, or we get a flush, or something */
- GST_DEBUG_OBJECT (src, "live source waiting for running state");
- GST_LIVE_WAIT (src);
- if (src->priv->flushing)
- goto flushing;
- GST_DEBUG_OBJECT (src, "live source unlocked");
+ do {
+ /* block until the state changes, or we get a flush, or something */
+ GST_DEBUG_OBJECT (src, "live source waiting for running state");
+ GST_LIVE_WAIT (src);
+ GST_DEBUG_OBJECT (src, "live source unlocked");
+ if (src->priv->flushing)
+ goto flushing;
+ } while (G_UNLIKELY (!src->live_running));
return GST_FLOW_OK;
* for sending NEW_SEGMENT events and for performing seeks.
*
* If a format of GST_FORMAT_BYTES is set, the element will be able to
- * operate in pull mode if the #GstBaseSrc.is_seekable() returns TRUE.
+ * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
*
* This function must only be called in states < %GST_STATE_PAUSED.
*
}
/**
+ * gst_base_src_set_dynamic_size:
+ * @src: base source instance
+ * @dynamic: new dynamic size mode
+ *
+ * If not @dynamic, size is only updated when needed, such as when trying to
+ * read past current tracked size. Otherwise, size is checked for upon each
+ * read.
+ *
+ * Since: 0.10.35
+ */
+void
+gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
+{
+ g_return_if_fail (GST_IS_BASE_SRC (src));
+
+ g_atomic_int_set (&src->priv->dynamic_size, dynamic);
+}
+
+/**
* gst_base_src_query_latency:
* @src: the source
- * @live: if the source is live
- * @min_latency: the min latency of the source
- * @max_latency: the max latency of the source
+ * @live: (out) (allow-none): if the source is live
+ * @min_latency: (out) (allow-none): the min latency of the source
+ * @max_latency: (out) (allow-none): the max latency of the source
*
* Query the source for the latency parameters. @live will be TRUE when @src is
* configured as a live source. @min_latency will be set to the difference
*
* Since: 0.10.22
*/
+/* FIXME 0.11: blocksize property should be int, not ulong */
void
gst_base_src_set_blocksize (GstBaseSrc * src, gulong blocksize)
{
*
* Since: 0.10.22
*/
+/* FIXME 0.11: blocksize property should be int, not ulong */
gulong
gst_base_src_get_blocksize (GstBaseSrc * src)
{
GstFormat seg_format;
GST_OBJECT_LOCK (src);
- position = src->segment.last_stop;
+ position =
+ gst_segment_to_stream_time (&src->segment, src->segment.format,
+ src->segment.last_stop);
seg_format = src->segment.format;
GST_OBJECT_UNLOCK (src);
{
gint64 duration;
GstFormat seg_format;
+ guint length = 0;
+
+ /* may have to refresh duration */
+ if (g_atomic_int_get (&src->priv->dynamic_size))
+ gst_base_src_update_length (src, 0, &length);
- GST_OBJECT_LOCK (src);
/* this is the duration as configured by the subclass. */
+ GST_OBJECT_LOCK (src);
duration = src->segment.duration;
seg_format = src->segment.format;
GST_OBJECT_UNLOCK (src);
if (stop != -1)
stop -= src->segment.time;
}
+
gst_query_set_segment (query, src->segment.rate, src->segment.format,
start, stop);
GST_OBJECT_UNLOCK (src);
GstClockTime min, max;
gboolean live;
- /* Subclasses should override and implement something usefull */
+ /* Subclasses should override and implement something useful */
res = gst_base_src_query_latency (src, &live, &min, &max);
GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
gboolean result = FALSE;
src = GST_BASE_SRC (gst_pad_get_parent (pad));
+ if (G_UNLIKELY (src == NULL))
+ return FALSE;
bclass = GST_BASE_SRC_GET_CLASS (src);
guint32 seqnum;
GstEvent *tevent;
- GST_DEBUG_OBJECT (src, "doing seek");
+ GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
GST_OBJECT_LOCK (src);
dest_format = src->segment.format;
/* If we configured the seeksegment above, don't overwrite it now. Otherwise
* copy the current segment info into the temp segment that we can actually
- * attempt the seek with. We only update the real segment if the seek suceeds. */
+ * attempt the seek with. We only update the real segment if the seek succeeds. */
if (!seekseg_configured) {
memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
src = GST_BASE_SRC (element);
- GST_DEBUG_OBJECT (src, "reveived %s event", GST_EVENT_TYPE_NAME (event));
+ GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
switch (GST_EVENT_TYPE (event)) {
/* bidirectional events */
* first and do EOS instead of entering it.
* - If we are in the _create function or we did not manage to set the
* flag fast enough and we are about to enter the _create function,
- * we unlock it so that we exit with WRONG_STATE immediatly. We then
+ * we unlock it so that we exit with WRONG_STATE immediately. We then
* check the EOS flag and do the EOS logic.
*/
g_atomic_int_set (&src->priv->pending_eos, TRUE);
/* sending random NEWSEGMENT downstream can break sync. */
break;
case GST_EVENT_TAG:
- /* Insert tag in the dataflow */
+ case GST_EVENT_CUSTOM_DOWNSTREAM:
+ case GST_EVENT_CUSTOM_BOTH:
+ /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
GST_OBJECT_LOCK (src);
- src->priv->pending_tags = g_list_append (src->priv->pending_tags, event);
+ src->priv->pending_events =
+ g_list_append (src->priv->pending_events, event);
+ g_atomic_int_set (&src->priv->have_events, TRUE);
GST_OBJECT_UNLOCK (src);
event = NULL;
result = TRUE;
case GST_EVENT_CUSTOM_UPSTREAM:
/* override send_event if you want this */
break;
- case GST_EVENT_CUSTOM_DOWNSTREAM:
- case GST_EVENT_CUSTOM_BOTH:
- /* FIXME, insert event in the dataflow */
- break;
case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
case GST_EVENT_CUSTOM_BOTH_OOB:
/* insert a random custom event into the pipeline */
{
gboolean result;
+ GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
+
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
/* is normally called when in push mode */
break;
}
default:
- result = TRUE;
+ result = FALSE;
break;
}
return result;
gboolean result = FALSE;
src = GST_BASE_SRC (gst_pad_get_parent (pad));
+ if (G_UNLIKELY (src == NULL)) {
+ gst_event_unref (event);
+ return FALSE;
+ }
+
bclass = GST_BASE_SRC_GET_CLASS (src);
if (bclass->event) {
GstBaseSrcClass *bclass;
GstFormat format;
gint64 stop;
+ gboolean dynamic;
bclass = GST_BASE_SRC_GET_CLASS (src);
", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
*length, size, stop, maxsize);
+ dynamic = g_atomic_int_get (&src->priv->dynamic_size);
+ GST_DEBUG_OBJECT (src, "dynamic size: %d", dynamic);
+
/* check size if we have one */
if (maxsize != -1) {
/* if we run past the end, check if the file became bigger and
* retry. */
- if (G_UNLIKELY (offset + *length >= maxsize)) {
+ if (G_UNLIKELY (offset + *length >= maxsize || dynamic)) {
/* see if length of the file changed */
if (bclass->get_size)
if (!bclass->get_size (src, &size))
* segment is in bytes, we checked that above. */
GST_OBJECT_LOCK (src);
gst_segment_set_duration (&src->segment, GST_FORMAT_BYTES, size);
- gst_segment_set_last_stop (&src->segment, GST_FORMAT_BYTES, offset);
GST_OBJECT_UNLOCK (src);
return TRUE;
again:
if (src->is_live) {
- while (G_UNLIKELY (!src->live_running)) {
+ if (G_UNLIKELY (!src->live_running)) {
ret = gst_base_src_wait_playing (src);
if (ret != GST_FLOW_OK)
goto stopped;
if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
goto unexpected_length;
+ /* track position */
+ GST_OBJECT_LOCK (src);
+ if (src->segment.format == GST_FORMAT_BYTES)
+ gst_segment_set_last_stop (&src->segment, GST_FORMAT_BYTES, offset);
+ GST_OBJECT_UNLOCK (src);
+
/* normally we don't count buffers */
if (G_UNLIKELY (src->num_buffers_left >= 0)) {
if (src->num_buffers_left == 0)
/* no timestamp set and we are at offset 0, we can timestamp with 0 */
if (offset == 0 && src->segment.time == 0
- && GST_BUFFER_TIMESTAMP (*buf) == -1)
+ && GST_BUFFER_TIMESTAMP (*buf) == -1 && !src->is_live) {
+ *buf = gst_buffer_make_metadata_writable (*buf);
GST_BUFFER_TIMESTAMP (*buf) = 0;
+ }
/* set pad caps on the buffer if the buffer had no caps */
- if (GST_BUFFER_CAPS (*buf) == NULL)
+ if (GST_BUFFER_CAPS (*buf) == NULL) {
+ *buf = gst_buffer_make_metadata_writable (*buf);
gst_buffer_set_caps (*buf, GST_PAD_CAPS (src->srcpad));
+ }
/* now sync before pushing the buffer */
status = gst_base_src_do_sync (src, *buf);
gint64 position;
gboolean eos;
gulong blocksize;
- GList *tags, *tmp;
+ GList *pending_events = NULL, *tmp;
eos = FALSE;
}
src->priv->newsegment_pending = FALSE;
- GST_OBJECT_LOCK (src);
- /* take the tags */
- tags = src->priv->pending_tags;
- src->priv->pending_tags = NULL;
- GST_OBJECT_UNLOCK (src);
+ if (g_atomic_int_get (&src->priv->have_events)) {
+ GST_OBJECT_LOCK (src);
+ /* take the events */
+ pending_events = src->priv->pending_events;
+ src->priv->pending_events = NULL;
+ g_atomic_int_set (&src->priv->have_events, FALSE);
+ GST_OBJECT_UNLOCK (src);
+ }
- /* Push out pending tags if any */
- if (G_UNLIKELY (tags != NULL)) {
- for (tmp = tags; tmp; tmp = g_list_next (tmp)) {
+ /* Push out pending events if any */
+ if (G_UNLIKELY (pending_events != NULL)) {
+ for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
GstEvent *ev = (GstEvent *) tmp->data;
gst_pad_push_event (pad, ev);
}
- g_list_free (tags);
+ g_list_free (pending_events);
}
/* figure out the new position */
GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
src->data.ABI.running = FALSE;
gst_pad_pause_task (pad);
- if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
- if (ret == GST_FLOW_UNEXPECTED) {
- gboolean flag_segment;
- GstFormat format;
- gint64 last_stop;
-
- /* perform EOS logic */
- flag_segment = (src->segment.flags & GST_SEEK_FLAG_SEGMENT) != 0;
- format = src->segment.format;
- last_stop = src->segment.last_stop;
-
- if (flag_segment) {
- GstMessage *message;
-
- message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
- format, last_stop);
- gst_message_set_seqnum (message, src->priv->seqnum);
- gst_element_post_message (GST_ELEMENT_CAST (src), message);
- } else {
- event = gst_event_new_eos ();
- gst_event_set_seqnum (event, src->priv->seqnum);
- gst_pad_push_event (pad, event);
- src->priv->last_sent_eos = TRUE;
- }
+ if (ret == GST_FLOW_UNEXPECTED) {
+ gboolean flag_segment;
+ GstFormat format;
+ gint64 last_stop;
+
+ /* perform EOS logic */
+ flag_segment = (src->segment.flags & GST_SEEK_FLAG_SEGMENT) != 0;
+ format = src->segment.format;
+ last_stop = src->segment.last_stop;
+
+ if (flag_segment) {
+ GstMessage *message;
+
+ message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
+ format, last_stop);
+ gst_message_set_seqnum (message, src->priv->seqnum);
+ gst_element_post_message (GST_ELEMENT_CAST (src), message);
} else {
event = gst_event_new_eos ();
gst_event_set_seqnum (event, src->priv->seqnum);
- /* for fatal errors we post an error message, post the error
- * first so the app knows about the error first. */
- GST_ELEMENT_ERROR (src, STREAM, FAILED,
- (_("Internal data flow error.")),
- ("streaming task paused, reason %s (%d)", reason, ret));
gst_pad_push_event (pad, event);
src->priv->last_sent_eos = TRUE;
}
+ } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_UNEXPECTED) {
+ event = gst_event_new_eos ();
+ gst_event_set_seqnum (event, src->priv->seqnum);
+ /* for fatal errors we post an error message, post the error
+ * first so the app knows about the error first.
+ * Also don't do this for WRONG_STATE because it happens
+ * due to flushing and posting an error message because of
+ * that is the wrong thing to do, e.g. when we're doing
+ * a flushing seek. */
+ GST_ELEMENT_ERROR (src, STREAM, FAILED,
+ (_("Internal data flow error.")),
+ ("streaming task paused, reason %s (%d)", reason, ret));
+ gst_pad_push_event (pad, event);
+ src->priv->last_sent_eos = TRUE;
}
goto done;
}
GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
if (peercaps) {
/* get intersection */
- caps = gst_caps_intersect (thiscaps, peercaps);
+ caps =
+ gst_caps_intersect_full (peercaps, thiscaps, GST_CAPS_INTERSECT_FIRST);
GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, caps);
- gst_caps_unref (thiscaps);
gst_caps_unref (peercaps);
} else {
/* no peer, work with our own caps then */
- caps = thiscaps;
+ caps = gst_caps_copy (thiscaps);
}
+ gst_caps_unref (thiscaps);
if (caps) {
/* take first (and best, since they are sorted) possibility */
- caps = gst_caps_make_writable (caps);
gst_caps_truncate (caps);
/* now fixate */
}
GST_DEBUG_OBJECT (basesrc,
- "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
- G_GINT64_FORMAT, format, result, size, basesrc->segment.duration);
+ "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
+ G_GINT64_FORMAT, gst_format_get_name (format), result, size,
+ basesrc->segment.duration);
seekable = gst_base_src_seekable (basesrc);
GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
} else {
/* signal the live source that it can start playing */
basesrc->live_running = live_play;
+
+ /* When unlocking drop all delayed events */
+ if (unlock) {
+ GST_OBJECT_LOCK (basesrc);
+ if (basesrc->priv->pending_events) {
+ g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
+ NULL);
+ g_list_free (basesrc->priv->pending_events);
+ basesrc->priv->pending_events = NULL;
+ g_atomic_int_set (&basesrc->priv->have_events, FALSE);
+ }
+ GST_OBJECT_UNLOCK (basesrc);
+ }
}
GST_LIVE_SIGNAL (basesrc);
GST_LIVE_UNLOCK (basesrc);
* already did this */
/* FIXME, deprecate this behaviour, it is very dangerous.
- * the prefered way of sending EOS downstream is by sending
+ * the preferred way of sending EOS downstream is by sending
* the EOS event to the element */
if (!basesrc->priv->last_sent_eos) {
GST_DEBUG_OBJECT (basesrc, "Sending EOS event");