"type": "GFile",
"writable": true
},
+ "is-growing": {
+ "blurb": "Whether the file is growing, ignoring its end",
+ "conditionally-available": false,
+ "construct": false,
+ "construct-only": false,
+ "controllable": false,
+ "default": "false",
+ "mutable": "null",
+ "readable": true,
+ "type": "gboolean",
+ "writable": true
+ },
"location": {
"blurb": "URI location to read from",
"conditionally-available": false,
"writable": true
}
},
- "rank": "secondary"
+ "rank": "secondary",
+ "signals": {
+ "done-waiting-data": {
+ "args": [],
+ "return-type": "void",
+ "when": "last"
+ },
+ "waiting-data": {
+ "args": [],
+ "return-type": "void",
+ "when": "last"
+ }
+ }
},
"giostreamsink": {
"author": "Sebastian Dröge <sebastian.droege@collabora.co.uk>",
*
* Copyright (C) 2007 Rene Stadler <mail@renestadler.de>
* Copyright (C) 2007 Sebastian Dröge <slomo@circular-chaos.org>
- *
+ *
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
*
* Copyright (C) 2007 Rene Stadler <mail@renestadler.de>
* Copyright (C) 2007-2009 Sebastian Dröge <sebastian.droege@collabora.co.uk>
- *
+ *
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
GError *err = NULL;
GstBuffer *newbuffer;
GstMemory *mem;
+ gboolean waited_for_data = FALSE;
+ GstGioBaseSrcClass *klass = GST_GIO_BASE_SRC_GET_CLASS (src);
newbuffer = gst_buffer_new ();
while (size - read > 0 && (res =
g_input_stream_read (G_INPUT_STREAM (src->stream),
map.data + streamread, cachesize - streamread, src->cancel,
- &err)) > 0) {
+ &err)) >= 0) {
+
read += res;
streamread += res;
src->position += res;
+
+ if (res != 0)
+ continue;
+
+ if (!klass->wait_for_data || !klass->wait_for_data (src))
+ break;
+
+ waited_for_data = TRUE;
}
+
+ if (waited_for_data && klass->waited_for_data)
+ klass->waited_for_data (src);
+
gst_memory_unmap (mem, &map);
gst_buffer_append_memory (src->cache, mem);
*
* Copyright (C) 2007 Rene Stadler <mail@renestadler.de>
* Copyright (C) 2007-2009 Sebastian Dröge <slomo@circular-chaos.org>
- *
+ *
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
struct _GstGioBaseSrc
{
GstBaseSrc src;
-
+
/* < protected > */
GCancellable *cancel;
guint64 position;
GstBuffer *cache;
};
-struct _GstGioBaseSrcClass
+struct _GstGioBaseSrcClass
{
GstBaseSrcClass parent_class;
GInputStream * (*get_stream) (GstGioBaseSrc *bsrc);
+
+ /* Returns TRUE if the files grew and we should try
+ reading again, FALSE otherwise */
+ gboolean (*wait_for_data) (GstGioBaseSrc *bsrc);
+ void (*waited_for_data) (GstGioBaseSrc *bsrc);
+
gboolean close_on_stop;
};
*
* Copyright (C) 2007 Rene Stadler <mail@renestadler.de>
* Copyright (C) 2007-2009 Sebastian Dröge <sebastian.droege@collabora.co.uk>
- *
+ *
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
{
PROP_0,
PROP_LOCATION,
- PROP_FILE
+ PROP_FILE,
+ PROP_GROWING_FILE,
};
+static gint waiting_data_signal = 0;
+static gint done_waiting_data_signal = 0;
+
#define gst_gio_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstGioSrc, gst_gio_src,
GST_TYPE_GIO_BASE_SRC, gst_gio_uri_handler_do_init (g_define_type_id));
static gboolean gst_gio_src_query (GstBaseSrc * base_src, GstQuery * query);
static void
+gst_gio_src_file_changed_cb (GstGioSrc * src)
+{
+ GST_DEBUG_OBJECT (src, "Underlying file changed.");
+ GST_OBJECT_LOCK (src);
+ src->changed = TRUE;
+ if (src->monitoring_mainloop)
+ g_main_loop_quit (src->monitoring_mainloop);
+ GST_OBJECT_UNLOCK (src);
+}
+
+static void
+gst_gio_src_waited_for_data (GstGioBaseSrc * bsrc)
+{
+ GstGioSrc *src = GST_GIO_SRC (bsrc);
+
+ src->waiting_for_data = FALSE;
+ g_signal_emit (bsrc, done_waiting_data_signal, 0, NULL);
+}
+
+static gboolean
+gst_gio_src_wait_for_data (GstGioBaseSrc * bsrc)
+{
+ GMainContext *ctx;
+ GstGioSrc *src = GST_GIO_SRC (bsrc);
+
+ g_return_val_if_fail (!src->monitor, FALSE);
+
+ GST_OBJECT_LOCK (src);
+ if (!src->is_growing) {
+ GST_OBJECT_UNLOCK (src);
+
+ return FALSE;
+ }
+
+ src->monitor = g_file_monitor (src->file, G_FILE_MONITOR_NONE,
+ bsrc->cancel, NULL);
+
+ if (!src->monitor) {
+ GST_OBJECT_UNLOCK (src);
+
+ GST_WARNING_OBJECT (bsrc, "Could not create a monitor");
+ return FALSE;
+ }
+
+ g_signal_connect_swapped (src->monitor, "changed",
+ G_CALLBACK (gst_gio_src_file_changed_cb), src);
+ GST_OBJECT_UNLOCK (src);
+
+ if (!src->waiting_for_data) {
+ g_signal_emit (src, waiting_data_signal, 0, NULL);
+ src->waiting_for_data = TRUE;
+ }
+
+ ctx = g_main_context_new ();
+ g_main_context_push_thread_default (ctx);
+ GST_OBJECT_LOCK (src);
+ src->changed = FALSE;
+ src->monitoring_mainloop = g_main_loop_new (ctx, FALSE);
+ GST_OBJECT_UNLOCK (src);
+
+ g_main_loop_run (src->monitoring_mainloop);
+
+ g_signal_handlers_disconnect_by_func (src->monitor,
+ gst_gio_src_file_changed_cb, src);
+
+ GST_OBJECT_LOCK (src);
+ gst_clear_object (&src->monitor);
+ g_main_loop_unref (src->monitoring_mainloop);
+ src->monitoring_mainloop = NULL;
+ GST_OBJECT_UNLOCK (src);
+
+ g_main_context_pop_thread_default (ctx);
+ g_main_context_unref (ctx);
+
+ return src->changed;
+}
+
+static gboolean
+gst_gio_src_unlock (GstBaseSrc * base_src)
+{
+ GstGioSrc *src = GST_GIO_SRC (base_src);
+
+ GST_LOG_OBJECT (src, "triggering cancellation");
+
+ GST_OBJECT_LOCK (src);
+ while (src->monitoring_mainloop) {
+ /* Ensure that we have already started the mainloop */
+ if (!g_main_loop_is_running (src->monitoring_mainloop)) {
+ GST_OBJECT_UNLOCK (src);
+
+ /* Letting a chance for the waiting for data function to cleanup the
+ * mainloop. */
+ g_thread_yield ();
+
+ GST_OBJECT_LOCK (src);
+ continue;
+ }
+ g_main_loop_quit (src->monitoring_mainloop);
+ break;
+ }
+ GST_OBJECT_UNLOCK (src);
+
+ return GST_CALL_PARENT_WITH_DEFAULT (GST_BASE_SRC_CLASS, unlock, (base_src),
+ TRUE);
+}
+
+static void
gst_gio_src_class_init (GstGioSrcClass * klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
/**
* GstGioSrc:file:
*
- * %GFile to read from.
+ * #GFile to read from.
*/
g_object_class_install_property (gobject_class, PROP_FILE,
g_param_spec_object ("file", "File", "GFile to read from",
G_TYPE_FILE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstGioSrc:is-growing:
+ *
+ * Whether the file is currently growing. When activated EOS is never pushed
+ * and the user needs to handle it himself. This modes allows to keep reading
+ * the file while it is being written on file.
+ *
+ * You can reset the property to %FALSE at any time and the file will start
+ * not being considered growing and EOS will be pushed when required.
+ *
+ * Since: 1.20
+ */
+ g_object_class_install_property (gobject_class, PROP_GROWING_FILE,
+ g_param_spec_boolean ("is-growing", "File is growing",
+ "Whether the file is growing, ignoring its end",
+ FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
gst_element_class_set_static_metadata (gstelement_class, "GIO source",
"Source/File",
"Read from any GIO-supported location",
"Sebastian Dröge <sebastian.droege@collabora.co.uk>");
gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_gio_src_query);
+ gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_gio_src_unlock);
gstgiobasesrc_class->get_stream = GST_DEBUG_FUNCPTR (gst_gio_src_get_stream);
gstgiobasesrc_class->close_on_stop = TRUE;
+ gstgiobasesrc_class->wait_for_data = gst_gio_src_wait_for_data;
+ gstgiobasesrc_class->waited_for_data = gst_gio_src_waited_for_data;
+
+ /**
+ * GstGioSrc::waiting-data:
+ *
+ * Signal notifying that we are stalled waiting for data
+ *
+ * Since: 1.20
+ */
+ waiting_data_signal = g_signal_new ("waiting-data",
+ G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL,
+ NULL, G_TYPE_NONE, 0);
+
+ /**
+ * GstGioSrc::done-waiting-data:
+ *
+ * Signal notifying that we are done waiting for data
+ *
+ * Since: 1.20
+ */
+ done_waiting_data_signal = g_signal_new ("done-waiting-data",
+ G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL,
+ NULL, G_TYPE_NONE, 0);
}
static void
GST_OBJECT_UNLOCK (GST_OBJECT (src));
break;
}
+ case PROP_GROWING_FILE:
+ {
+ gboolean was_growing;
+
+ GST_OBJECT_LOCK (src);
+ was_growing = src->is_growing;
+ src->is_growing = g_value_get_boolean (value);
+ gst_base_src_set_dynamic_size (GST_BASE_SRC (src), src->is_growing);
+ gst_base_src_set_automatic_eos (GST_BASE_SRC (src), !src->is_growing);
+
+ while (was_growing && !src->is_growing && src->monitoring_mainloop) {
+ /* Ensure that we have already started the mainloop */
+ if (!g_main_loop_is_running (src->monitoring_mainloop)) {
+ GST_OBJECT_UNLOCK (src);
+ /* Letting a chance for the waiting for data function to cleanup the
+ * mainloop. */
+ GST_OBJECT_LOCK (src);
+ continue;
+ }
+ g_main_loop_quit (src->monitoring_mainloop);
+ break;
+ }
+ GST_OBJECT_UNLOCK (src);
+
+ break;
+ }
case PROP_FILE:
if (GST_STATE (src) == GST_STATE_PLAYING ||
GST_STATE (src) == GST_STATE_PAUSED) {
g_value_set_object (value, src->file);
GST_OBJECT_UNLOCK (GST_OBJECT (src));
break;
+ case PROP_GROWING_FILE:
+ {
+ g_value_set_boolean (value, src->is_growing);
+ break;
+ }
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
gst_query_set_scheduling (query, flags, 1, -1, 0);
gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
- if (flags & GST_SCHEDULING_FLAG_SEEKABLE)
+ GST_OBJECT_LOCK (src);
+ if (flags & GST_SCHEDULING_FLAG_SEEKABLE && !src->is_growing)
gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
+ GST_OBJECT_UNLOCK (src);
res = TRUE;
break;
*
* Copyright (C) 2007 Rene Stadler <mail@renestadler.de>
* Copyright (C) 2007-2009 Sebastian Dröge <slomo@circular-chaos.org>
- *
+ *
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
struct _GstGioSrc
{
GstGioBaseSrc src;
-
+
/*< private >*/
GFile *file;
+
+ gboolean is_growing;
+ GFileMonitor *monitor;
+ GMainLoop *monitoring_mainloop;
+ gboolean changed;
+ gboolean waiting_for_data;
};
G_END_DECLS
--- /dev/null
+# Pipeline with 2 branches, the first one write random data into a file in $(growing_file_location).
+# That branch is synchronized on the test clock that we drive in the scenario.
+# The second branch reads it with giosrc and does some tests like waiting for the
+# `done-waiting-signal` signals on the source etc...
+#
+# The whole dataflow is checked and we ensure that the exact same buffer content
+# is read from the giosrc.
+set-globals, growing_file_location="$(logsdir)/$(test_name)-growing.rand"
+meta,
+ seek=false,
+ handles-states=false,
+ args = {
+ "fakesrc num-buffers=30 datarate=30 filltype=pattern-span sizetype=fixed filltype=random format=time ! filesink sync=true location=$(growing_file_location) name=filesink buffer-mode=unbuffered \
+ giosrc name=giosrc is-growing=true location=file://$(growing_file_location) ! fakesink name=growing-file-sink async=false" \
+ },
+ configs = {
+ "$(validateflow), pad=filesink:sink, record-buffers=true, ignored-fields=\"stream-start={stream-id,group-id,stream}\", buffers-checksum=as-id",
+ "$(validateflow), pad=growing-file-sink:sink, record-buffers=true, ignored-fields=\"stream-start={stream-id,group-id,stream}\", buffers-checksum=as-id",
+ }
+
+
+crank-clock, repeat=5
+wait, signal-name=waiting-data, target-element-name=giosrc
+
+checkpoint
+
+crank-clock, repeat=5
+wait, signal-name=waiting-data, target-element-name=giosrc
+
+wait, signal-name=done-waiting-data, target-element-name=giosrc, non-blocking=true
+crank-clock, repeat=21
+
+checkpoint
+
+wait, signal-name=waiting-data, target-element-name=giosrc
+
+checkpoint
+
+# Make sure EOS is outputted now.
+set-properties, giosrc::is_growing=false
+
+stop, on-message=eos
\ No newline at end of file
--- /dev/null
+event stream-start: GstEventStreamStart, flags=(GstStreamFlags)GST_STREAM_FLAG_NONE;
+event segment: format=TIME, start=0:00:00.000000000, offset=0:00:00.000000000, stop=none, time=0:00:00.000000000, base=0:00:00.000000000, position=0:00:00.000000000
+buffer: content-id=0, dts=0:00:00.000000000, pts=0:00:00.000000000, dur=0:02:16.533333333, flags=discont tag-memory
+buffer: content-id=1, dts=0:02:16.533333333, pts=0:02:16.533333333, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=2, dts=0:04:33.066666666, pts=0:04:33.066666666, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=3, dts=0:06:49.600000000, pts=0:06:49.600000000, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=4, dts=0:09:06.133333333, pts=0:09:06.133333333, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=5, dts=0:11:22.666666666, pts=0:11:22.666666666, dur=0:02:16.533333333, flags=tag-memory
+
+CHECKPOINT
+
+buffer: content-id=6, dts=0:13:39.200000000, pts=0:13:39.200000000, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=7, dts=0:15:55.733333333, pts=0:15:55.733333333, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=8, dts=0:18:12.266666666, pts=0:18:12.266666666, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=9, dts=0:20:28.800000000, pts=0:20:28.800000000, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=10, dts=0:22:45.333333333, pts=0:22:45.333333333, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=11, dts=0:25:01.866666666, pts=0:25:01.866666666, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=12, dts=0:27:18.400000000, pts=0:27:18.400000000, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=13, dts=0:29:34.933333333, pts=0:29:34.933333333, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=14, dts=0:31:51.466666666, pts=0:31:51.466666666, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=15, dts=0:34:08.000000000, pts=0:34:08.000000000, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=16, dts=0:36:24.533333333, pts=0:36:24.533333333, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=17, dts=0:38:41.066666666, pts=0:38:41.066666666, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=18, dts=0:40:57.600000000, pts=0:40:57.600000000, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=19, dts=0:43:14.133333333, pts=0:43:14.133333333, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=20, dts=0:45:30.666666666, pts=0:45:30.666666666, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=21, dts=0:47:47.200000000, pts=0:47:47.200000000, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=22, dts=0:50:03.733333333, pts=0:50:03.733333333, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=23, dts=0:52:20.266666666, pts=0:52:20.266666666, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=24, dts=0:54:36.800000000, pts=0:54:36.800000000, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=25, dts=0:56:53.333333333, pts=0:56:53.333333333, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=26, dts=0:59:09.866666666, pts=0:59:09.866666666, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=27, dts=1:01:26.400000000, pts=1:01:26.400000000, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=28, dts=1:03:42.933333333, pts=1:03:42.933333333, dur=0:02:16.533333333, flags=tag-memory
+buffer: content-id=29, dts=1:05:59.466666666, pts=1:05:59.466666666, dur=0:02:16.533333333, flags=tag-memory
+event eos: (no structure)
+
+CHECKPOINT
+
+
+CHECKPOINT
+
--- /dev/null
+event stream-start: GstEventStreamStart, flags=(GstStreamFlags)GST_STREAM_FLAG_NONE;
+event segment: format=BYTES, start=0, offset=0, stop=18446744073709551615, time=0, base=0, position=0, duration=0
+buffer: content-id=0, dts=0:00:00.000000000, pts=0:00:00.000000000, flags=discont tag-memory
+buffer: content-id=1, flags=tag-memory
+buffer: content-id=2, flags=tag-memory
+buffer: content-id=3, flags=tag-memory
+buffer: content-id=4, flags=tag-memory
+
+CHECKPOINT
+
+buffer: content-id=5, flags=tag-memory
+buffer: content-id=6, flags=tag-memory
+buffer: content-id=7, flags=tag-memory
+buffer: content-id=8, flags=tag-memory
+buffer: content-id=9, flags=tag-memory
+
+CHECKPOINT
+
+buffer: content-id=10, flags=tag-memory
+buffer: content-id=11, flags=tag-memory
+buffer: content-id=12, flags=tag-memory
+buffer: content-id=13, flags=tag-memory
+buffer: content-id=14, flags=tag-memory
+buffer: content-id=15, flags=tag-memory
+buffer: content-id=16, flags=tag-memory
+buffer: content-id=17, flags=tag-memory
+buffer: content-id=18, flags=tag-memory
+buffer: content-id=19, flags=tag-memory
+buffer: content-id=20, flags=tag-memory
+buffer: content-id=21, flags=tag-memory
+buffer: content-id=22, flags=tag-memory
+buffer: content-id=23, flags=tag-memory
+buffer: content-id=24, flags=tag-memory
+buffer: content-id=25, flags=tag-memory
+buffer: content-id=26, flags=tag-memory
+buffer: content-id=27, flags=tag-memory
+buffer: content-id=28, flags=tag-memory
+buffer: content-id=29, flags=tag-memory
+
+CHECKPOINT
+
+event eos: (no structure)
'videorate/rate_2_0',
'videorate/rate_2_0_with_decoder',
'compositor/renogotiate_failing_unsupported_src_format',
+ 'giosrc/read-growing-file',
]
env = environment()