* </itemizedlist>
* </para>
* <para>
+ * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any
+ * time by overriding #GstBaseSrc::check_get_range so that it returns TRUE.
+ * </para>
+ * <para>
* If all the conditions are met for operating in pull mode, #GstBaseSrc is
* automatically seekable in push mode as well. The following conditions must
* be met to make the element seekable in push mode when the format is not
* </itemizedlist>
* </para>
* <para>
- * When the default format is not GST_FORMAT_BYTES, the subclass should ignore
- * the offset and length in the #GstBaseSrc::create method. It is recommended
- * to subclass #GstPushSrc instead in this situation.
+ * When the element does not meet the requirements to operate in pull mode,
+ * the offset and length in the #GstBaseSrc::create method should be ignored.
+ * It is recommended to subclass #GstPushSrc instead, in this situation. If the
+ * element can operate in pull mode but only with specific offsets and
+ * lengths, it is allowed to generate an error when the wrong values are passed
+ * to the #GstBaseSrc::create function.
* </para>
* <para>
* #GstBaseSrc has support for live sources. Live sources are sources that
* thread might be blocked in PREROLL.
* </para>
* <para>
- * Last reviewed on 2006-04-14 (0.10.6)
+ * Last reviewed on 2006-07-06 (0.10.9)
* </para>
* </refsect2>
*/
g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
g_param_spec_ulong ("blocksize", "Block size",
- "Size in bytes to read per buffer", 1, G_MAXULONG, DEFAULT_BLOCKSIZE,
- G_PARAM_READWRITE));
+ "Size in bytes to read per buffer (0 = default)", 0, G_MAXULONG,
+ DEFAULT_BLOCKSIZE, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
g_param_spec_int ("num-buffers", "num-buffers",
"Number of buffers to output before sending EOS", -1, G_MAXINT,
}
}
-/* with STREAM_LOCK and LOCK*/
+/* with STREAM_LOCK and LOCK */
static GstClockReturn
gst_base_src_wait (GstBaseSrc * basesrc, GstClockTime time)
{
if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
return GST_CLOCK_OK;
- /* clock_id should be NULL outside of this function */
- g_assert (basesrc->clock_id == NULL);
- g_assert (GST_CLOCK_TIME_IS_VALID (time));
-
id = gst_clock_new_single_shot_id (clock, time);
basesrc->clock_id = id;
if (src->segment.format != GST_FORMAT_BYTES)
return TRUE;
+ /* get total file size */
size = (guint64) src->segment.duration;
/* the max amount of bytes to read is the total size or
", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
*length, size, src->segment.stop, maxsize);
- /* check size */
+ /* check size if we have one */
if (maxsize != -1) {
- if (offset > maxsize)
- goto unexpected_length;
-
- if (offset + *length > maxsize) {
+ /* if we run past the end, check if the file became bigger and
+ * retry. */
+ if (G_UNLIKELY (offset + *length >= maxsize)) {
/* see if length of the file changed */
if (bclass->get_size)
bclass->get_size (src, &size);
+ /* make sure we don't exceed the configured segment stop
+ * if it was set */
if (src->segment.stop != -1)
maxsize = MIN (size, src->segment.stop);
else
maxsize = size;
- if (offset + *length > maxsize) {
+ /* if we are at or past the end, EOS */
+ if (G_UNLIKELY (offset >= maxsize))
+ goto unexpected_length;
+
+ /* else we can clip to the end */
+ if (G_UNLIKELY (offset + *length >= maxsize))
*length = maxsize - offset;
- }
}
}
- if (*length == 0)
- goto unexpected_length;
- /* keep track f current position. segment is in bytes, we checked
+ /* keep track of current position. segment is in bytes, we checked
* that above. */
gst_segment_set_last_stop (&src->segment, GST_FORMAT_BYTES, offset);
GST_LIVE_LOCK (src);
if (src->is_live) {
- while (!src->live_running) {
+ while (G_UNLIKELY (!src->live_running)) {
GST_DEBUG ("live source signal waiting");
GST_LIVE_SIGNAL (src);
GST_DEBUG ("live source waiting for running state");
GST_LIVE_WAIT (src);
GST_DEBUG ("live source unlocked");
}
- /* FIXME, use another variable to signal stopping */
+ /* FIXME, use another variable to signal stopping so that we don't
+ * have to grab another lock. */
GST_OBJECT_LOCK (src->srcpad);
- if (GST_PAD_IS_FLUSHING (src->srcpad))
+ if (G_UNLIKELY (GST_PAD_IS_FLUSHING (src->srcpad)))
goto flushing;
GST_OBJECT_UNLOCK (src->srcpad);
}
GST_LIVE_UNLOCK (src);
- if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED))
+ if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
goto not_started;
if (G_UNLIKELY (!bclass->create))
goto no_function;
- if (!gst_base_src_update_length (src, offset, &length))
+ if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
goto unexpected_length;
- if (src->num_buffers_left == 0) {
- goto reached_num_buffers;
- } else {
- if (src->num_buffers_left > 0)
+ /* normally we don't count buffers */
+ if (G_UNLIKELY (src->num_buffers_left >= 0)) {
+ if (src->num_buffers_left == 0)
+ goto reached_num_buffers;
+ else
src->num_buffers_left--;
}
GST_DEBUG_OBJECT (src,
- "calling create offset %" G_GUINT64_FORMAT " %" G_GINT64_FORMAT, offset,
- src->segment.time);
+ "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
+ G_GINT64_FORMAT, offset, length, src->segment.time);
ret = bclass->create (src, offset, length, buf);
- if (ret != GST_FLOW_OK)
+ if (G_UNLIKELY (ret != GST_FLOW_OK))
goto done;
- /* no timestamp set and we are at offset 0 */
+ /* no timestamp set and we are at offset 0, we can timestamp with 0 */
if (offset == 0 && src->segment.time == 0
&& GST_BUFFER_TIMESTAMP (*buf) == -1)
GST_BUFFER_TIMESTAMP (*buf) = 0;
GST_DEBUG_OBJECT (src, "buffer ok");
break;
default:
+ /* this case is triggered when we were waiting for the clock and
+ * it got unlocked because we did a state change. We return
+ * WRONG_STATE in this case to stop the dataflow also get rid of the
+ * produced buffer. */
GST_DEBUG_OBJECT (src, "clock returned %d, not returning", status);
gst_buffer_unref (*buf);
*buf = NULL;
}
static gboolean
-gst_base_src_pad_check_get_range (GstPad * pad)
+gst_base_src_default_check_get_range (GstBaseSrc * src)
{
- GstBaseSrcClass *bclass;
- GstBaseSrc *src;
gboolean res;
- src = GST_BASE_SRC (gst_pad_get_parent (pad));
+ if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
+ GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
+ gst_base_src_start (src);
+ gst_base_src_stop (src);
+ }
+
+ /* we can operate in getrange mode if the native format is bytes
+ * and we are seekable, this condition is set in the random_access
+ * flag and is set in the _start() method. */
+ res = src->random_access;
+
+ return res;
+}
+
+static gboolean
+gst_base_src_check_get_range (GstBaseSrc * src)
+{
+ GstBaseSrcClass *bclass;
+ gboolean res;
bclass = GST_BASE_SRC_GET_CLASS (src);
- if (bclass->check_get_range == NULL) {
- GST_WARNING_OBJECT (src, "no check_get_range function set");
- return FALSE;
- }
+ if (bclass->check_get_range == NULL)
+ goto no_function;
res = bclass->check_get_range (src);
GST_LOG_OBJECT (src, "%s() returned %d",
GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
- gst_object_unref (src);
-
return res;
+
+ /* ERRORS */
+no_function:
+ {
+ GST_WARNING_OBJECT (src, "no check_get_range function set");
+ return FALSE;
+ }
}
static gboolean
-gst_base_src_default_check_get_range (GstBaseSrc * src)
+gst_base_src_pad_check_get_range (GstPad * pad)
{
+ GstBaseSrc *src;
gboolean res;
- if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
- GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
- gst_base_src_start (src);
- gst_base_src_stop (src);
- }
+ src = GST_BASE_SRC (gst_pad_get_parent (pad));
- /* we can operate in getrange mode if the native format is bytes
- * and we are seekable, this condition is set in the random_access
- * flag and is set in the _start() method. */
- res = src->random_access;
+ res = gst_base_src_check_get_range (src);
+
+ gst_object_unref (src);
return res;
}
ret = gst_base_src_get_range (src, position, src->blocksize, &buf);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
- GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %d", ret);
+ GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %d", ret);
goto pause;
}
if (G_UNLIKELY (buf == NULL))
ret = gst_pad_push (pad, buf);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
- GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %d", ret);
+ GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %d", ret);
goto pause;
}
* first so the app knows about the error first. */
GST_ELEMENT_ERROR (src, STREAM, FAILED,
(_("Internal data flow error.")),
- ("streaming task paused, reason %s", reason));
+ ("streaming task paused, reason %s (%d)", reason, ret));
gst_pad_push_event (pad, gst_event_new_eos ());
src->priv->last_sent_eos = TRUE;
}
could_not_start:
{
GST_DEBUG_OBJECT (basesrc, "could not start");
+ /* subclass is supposed to post a message */
return FALSE;
}
could_not_negotiate:
goto error_start;
/* if not random_access, we cannot operate in pull mode for now */
- if (!basesrc->random_access) {
- gst_base_src_stop (basesrc);
- return FALSE;
- }
+ if (!gst_base_src_check_get_range (basesrc))
+ goto no_get_range;
+
return TRUE;
} else {
GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
GST_DEBUG_OBJECT (basesrc, "Failed to start in pull mode");
return FALSE;
}
+no_get_range:
+ {
+ GST_DEBUG_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
+ gst_base_src_stop (basesrc);
+ return FALSE;
+ }
error_stop:
{
GST_DEBUG_OBJECT (basesrc, "Failed to stop in pull mode");
GST_END_TEST;
+GST_START_TEST (test_pull)
+{
+ GstElement *src;
+ GstQuery *seeking_query;
+ gboolean res, seekable;
+ gint64 start, stop;
+ GstPad *pad;
+ GstFlowReturn ret;
+ GstBuffer *buffer1, *buffer2;
+
+ src = setup_filesrc ();
+
+ g_object_set (G_OBJECT (src), "location", TESTFILE, NULL);
+ fail_unless (gst_element_set_state (src,
+ GST_STATE_READY) == GST_STATE_CHANGE_SUCCESS,
+ "could not set to ready");
+
+ /* get the source pad */
+ pad = gst_element_get_pad (src, "src");
+ fail_unless (pad != NULL);
+
+ /* activate the pad in pull mode */
+ res = gst_pad_activate_pull (pad, TRUE);
+ fail_unless (res == TRUE);
+
+ /* not start playing */
+ fail_unless (gst_element_set_state (src,
+ GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
+ "could not set to paused");
+
+ /* Test that filesrc is seekable with a file fd */
+ fail_unless ((seeking_query = gst_query_new_seeking (GST_FORMAT_BYTES))
+ != NULL);
+ fail_unless (gst_element_query (src, seeking_query) == TRUE);
+
+ /* get the seeking capabilities */
+ gst_query_parse_seeking (seeking_query, NULL, &seekable, &start, &stop);
+ fail_unless (seekable == TRUE);
+ fail_unless (start == 0);
+ fail_unless (start != -1);
+ gst_query_unref (seeking_query);
+
+ /* do some pulls */
+ ret = gst_pad_get_range (pad, 0, 100, &buffer1);
+ fail_unless (ret == GST_FLOW_OK);
+ fail_unless (buffer1 != NULL);
+ fail_unless (GST_BUFFER_SIZE (buffer1) == 100);
+
+ ret = gst_pad_get_range (pad, 0, 50, &buffer2);
+ fail_unless (ret == GST_FLOW_OK);
+ fail_unless (buffer2 != NULL);
+ fail_unless (GST_BUFFER_SIZE (buffer2) == 50);
+
+ /* this should be the same */
+ fail_unless (memcmp (GST_BUFFER_DATA (buffer1), GST_BUFFER_DATA (buffer2),
+ 50) == 0);
+
+ gst_buffer_unref (buffer2);
+
+ /* read next 50 bytes */
+ ret = gst_pad_get_range (pad, 50, 50, &buffer2);
+ fail_unless (ret == GST_FLOW_OK);
+ fail_unless (buffer2 != NULL);
+ fail_unless (GST_BUFFER_SIZE (buffer2) == 50);
+
+ /* compare with previously read data */
+ fail_unless (memcmp (GST_BUFFER_DATA (buffer1) + 50,
+ GST_BUFFER_DATA (buffer2), 50) == 0);
+
+ gst_buffer_unref (buffer1);
+ gst_buffer_unref (buffer2);
+
+ /* read 10 bytes at end-10 should give exactly 10 bytes */
+ ret = gst_pad_get_range (pad, stop - 10, 10, &buffer1);
+ fail_unless (ret == GST_FLOW_OK);
+ fail_unless (buffer1 != NULL);
+ fail_unless (GST_BUFFER_SIZE (buffer1) == 10);
+ gst_buffer_unref (buffer1);
+
+ /* read 20 bytes at end-10 should give exactly 10 bytes */
+ ret = gst_pad_get_range (pad, stop - 10, 20, &buffer1);
+ fail_unless (ret == GST_FLOW_OK);
+ fail_unless (buffer1 != NULL);
+ fail_unless (GST_BUFFER_SIZE (buffer1) == 10);
+ gst_buffer_unref (buffer1);
+
+ /* read 0 bytes at end-1 should return 0 bytes */
+ ret = gst_pad_get_range (pad, stop - 1, 0, &buffer1);
+ fail_unless (ret == GST_FLOW_OK);
+ fail_unless (buffer1 != NULL);
+ fail_unless (GST_BUFFER_SIZE (buffer1) == 0);
+ gst_buffer_unref (buffer1);
+
+ /* read 10 bytes at end-1 should return 1 byte */
+ ret = gst_pad_get_range (pad, stop - 1, 10, &buffer1);
+ fail_unless (ret == GST_FLOW_OK);
+ fail_unless (buffer1 != NULL);
+ fail_unless (GST_BUFFER_SIZE (buffer1) == 1);
+ gst_buffer_unref (buffer1);
+
+ /* read 0 bytes at end should EOS */
+ ret = gst_pad_get_range (pad, stop, 0, &buffer1);
+ fail_unless (ret == GST_FLOW_UNEXPECTED);
+
+ /* read 10 bytes before end should EOS */
+ ret = gst_pad_get_range (pad, stop, 10, &buffer1);
+ fail_unless (ret == GST_FLOW_UNEXPECTED);
+
+ /* read 0 bytes after end should EOS */
+ ret = gst_pad_get_range (pad, stop + 10, 0, &buffer1);
+ fail_unless (ret == GST_FLOW_UNEXPECTED);
+
+ /* read 10 bytes after end should EOS too */
+ ret = gst_pad_get_range (pad, stop + 10, 10, &buffer1);
+ fail_unless (ret == GST_FLOW_UNEXPECTED);
+
+ fail_unless (gst_element_set_state (src,
+ GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
+
+ /* cleanup */
+ gst_object_unref (pad);
+ cleanup_filesrc (src);
+}
+
+GST_END_TEST;
+
GST_START_TEST (test_coverage)
{
GstElement *src;
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_seeking);
+ tcase_add_test (tc_chain, test_pull);
tcase_add_test (tc_chain, test_coverage);
return s;