2 * Copyright (C) 2007 David Schleef <ds@schleef.org>
3 * (C) 2008 Wim Taymans <wim.taymans@gmail.com>
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
22 * SECTION:element-appsrc
23 * @see_also: #GstBaseSrc, appsink
25 * The appsrc element can be used by applications to insert data into a
28 * appsrc can be used by linking to the gstappsrc.h header file to access the
29 * methods or by using the appsrc action signals.
31 * Before operating appsrc, the caps property must be set to a fixed caps
32 * describing the format of the data that will be pushed with appsrc.
34 * The main way of handing data to the appsrc element is by calling the
35 * gst_app_src_push_buffer() method or by emiting the push-buffer action signal.
36 * This will put the buffer onto a queue from which appsrc will read from in its
37 * streaming thread. It is important to note that data transport will not happen
38 * from the thread that performed the push-buffer call.
40 * The "max-bytes" property controls how much data can be queued in appsrc
41 * before appsrc considers the queue full. A filled internal queue will always
42 * signal the "enough-data" signal, which signals the application that it should
43 * stop pushing data into appsrc. The "block" property will cause appsrc to
44 * block the push-buffer method until free data becomes available again.
46 * When the internal queue is running out of data, the "need-data" signal is
47 * emited, which signals the application that it should start pushing more data
50 * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
51 * "seek-data" signal when the "stream-mode" property is set to "seekable" or
52 * "random-access". The signal argument will contain the new desired position in
53 * the stream expressed in the unit set with the "format" property. After
54 * receiving the seek-data signal, the application should push-buffers from the
57 * These signals allow the application to operate the appsrc in two different
60 * The push model, in which the application repeadedly calls the push-buffer method
61 * with a new buffer. Optionally, the queue size in the appsrc can be controlled
62 * with the enough-data and need-data signals by respectively stopping/starting
63 * the push-buffer calls. This is a typical mode of operation for the
64 * stream-type "stream" and "seekable". Use this model when implementing various
65 * network protocols or hardware devices.
67 * The pull model where the need-data signal triggers the next push-buffer call.
68 * This mode is typically used in the "random-access" stream-type. Use this
69 * model for file access or other randomly accessable sources. In this mode, a
70 * buffer of exactly the amount of bytes given by the need-data signal should be
73 * In all modes, the size property on appsrc should contain the total stream
74 * size in bytes. Setting this property is mandatory in the random-access mode.
75 * For the stream and seekable modes, setting this property is optional but
78 * When the application is finished pushing data into appsrc, it should call
79 * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
80 * this call, no more buffers can be pushed into appsrc until a flushing seek
81 * happened or the state of the appsrc has gone through READY.
83 * Last reviewed on 2008-12-17 (0.10.10)
91 #include <gst/base/gstbasesrc.h>
95 #include "gstapp-marshal.h"
96 #include "gstappsrc.h"
99 GST_DEBUG_CATEGORY (app_src_debug);
100 #define GST_CAT_DEFAULT app_src_debug
102 static const GstElementDetails app_src_details = GST_ELEMENT_DETAILS ("AppSrc",
104 "Allow the application to feed buffers to a pipeline",
105 "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
116 SIGNAL_END_OF_STREAM,
121 #define DEFAULT_PROP_SIZE -1
122 #define DEFAULT_PROP_STREAM_TYPE GST_APP_STREAM_TYPE_STREAM
123 #define DEFAULT_PROP_MAX_BYTES 200000
124 #define DEFAULT_PROP_FORMAT GST_FORMAT_BYTES
125 #define DEFAULT_PROP_BLOCK FALSE
126 #define DEFAULT_PROP_IS_LIVE FALSE
127 #define DEFAULT_PROP_MIN_LATENCY -1
128 #define DEFAULT_PROP_MAX_LATENCY -1
145 static GstStaticPadTemplate gst_app_src_template =
146 GST_STATIC_PAD_TEMPLATE ("src",
149 GST_STATIC_CAPS_ANY);
152 #define GST_TYPE_APP_STREAM_TYPE (stream_type_get_type ())
154 stream_type_get_type (void)
156 static GType stream_type_type = 0;
157 static const GEnumValue stream_type[] = {
158 {GST_APP_STREAM_TYPE_STREAM, "Stream", "stream"},
159 {GST_APP_STREAM_TYPE_SEEKABLE, "Seekable", "seekable"},
160 {GST_APP_STREAM_TYPE_RANDOM_ACCESS, "Random Access", "random-access"},
164 if (!stream_type_type) {
165 stream_type_type = g_enum_register_static ("GstAppStreamType", stream_type);
167 return stream_type_type;
170 static void gst_app_src_uri_handler_init (gpointer g_iface,
171 gpointer iface_data);
173 static void gst_app_src_dispose (GObject * object);
174 static void gst_app_src_finalize (GObject * object);
176 static void gst_app_src_set_property (GObject * object, guint prop_id,
177 const GValue * value, GParamSpec * pspec);
178 static void gst_app_src_get_property (GObject * object, guint prop_id,
179 GValue * value, GParamSpec * pspec);
181 static void gst_app_src_set_latencies (GstAppSrc * appsrc,
182 gboolean do_min, guint64 min, gboolean do_max, guint64 max);
184 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc,
185 guint64 offset, guint size, GstBuffer ** buf);
186 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
187 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
188 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
189 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
190 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
191 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
192 static gboolean gst_app_src_check_get_range (GstBaseSrc * src);
193 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
194 static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
196 static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
199 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
202 _do_init (GType filesrc_type)
204 static const GInterfaceInfo urihandler_info = {
205 gst_app_src_uri_handler_init,
209 g_type_add_interface_static (filesrc_type, GST_TYPE_URI_HANDLER,
213 GST_BOILERPLATE_FULL (GstAppSrc, gst_app_src, GstBaseSrc, GST_TYPE_BASE_SRC,
217 gst_app_src_base_init (gpointer g_class)
219 GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
221 GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
223 gst_element_class_set_details (element_class, &app_src_details);
225 gst_element_class_add_pad_template (element_class,
226 gst_static_pad_template_get (&gst_app_src_template));
230 gst_app_src_class_init (GstAppSrcClass * klass)
232 GObjectClass *gobject_class = (GObjectClass *) klass;
233 GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
235 gobject_class->dispose = gst_app_src_dispose;
236 gobject_class->finalize = gst_app_src_finalize;
238 gobject_class->set_property = gst_app_src_set_property;
239 gobject_class->get_property = gst_app_src_get_property;
244 * The GstCaps that will negotiated downstream and will be put
245 * on outgoing buffers.
247 g_object_class_install_property (gobject_class, PROP_CAPS,
248 g_param_spec_boxed ("caps", "Caps",
249 "The allowed caps for the src pad", GST_TYPE_CAPS,
250 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
254 * The format to use for segment events. When the source is producing
255 * timestamped buffers this property should be set to GST_FORMAT_TIME.
257 g_object_class_install_property (gobject_class, PROP_FORMAT,
258 g_param_spec_enum ("format", "Format",
259 "The format of the segment events and seek", GST_TYPE_FORMAT,
260 DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
264 * The total size in bytes of the data stream. If the total size is known, it
265 * is recommended to configure it with this property.
267 g_object_class_install_property (gobject_class, PROP_SIZE,
268 g_param_spec_int64 ("size", "Size",
269 "The size of the data stream in bytes (-1 if unknown)",
270 -1, G_MAXINT64, DEFAULT_PROP_SIZE,
271 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
273 * GstAppSrc::stream-type
275 * The type of stream that this source is producing. For seekable streams the
276 * application should connect to the seek-data signal.
278 g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
279 g_param_spec_enum ("stream-type", "Stream Type",
280 "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
281 DEFAULT_PROP_STREAM_TYPE,
282 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
284 * GstAppSrc::max-bytes
286 * The maximum amount of bytes that can be queued internally.
287 * After the maximum amount of bytes are queued, appsrc will emit the
288 * "enough-data" signal.
290 g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
291 g_param_spec_uint64 ("max-bytes", "Max bytes",
292 "The maximum number of bytes to queue internally (0 = unlimited)",
293 0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
294 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
298 * When max-bytes are queued and after the enough-data signal has been emited,
299 * block any further push-buffer calls until the amount of queued bytes drops
300 * below the max-bytes limit.
302 g_object_class_install_property (gobject_class, PROP_BLOCK,
303 g_param_spec_boolean ("block", "Block",
304 "Block push-buffer when max-bytes are queued",
305 DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
310 * Instruct the source to behave like a live source. This includes that it
311 * will only push out buffers in the PLAYING state.
313 g_object_class_install_property (gobject_class, PROP_IS_LIVE,
314 g_param_spec_boolean ("is-live", "Is Live",
315 "Whether to act as a live source",
316 DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
318 * GstAppSrc::min-latency
320 * The minimum latency of the source. A value of -1 will use the default
321 * latency calculations of #GstBaseSrc.
323 g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
324 g_param_spec_int64 ("min-latency", "Min Latency",
325 "The minimum latency (-1 = default)",
326 -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
327 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
329 * GstAppSrc::max-latency
331 * The maximum latency of the source. A value of -1 means an unlimited amout
334 g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
335 g_param_spec_int64 ("max-latency", "Max Latency",
336 "The maximum latency (-1 = unlimited)",
337 -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
338 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
341 * GstAppSrc::need-data:
342 * @appsrc: the appsrc element that emited the signal
343 * @length: the amount of bytes needed.
345 * Signal that the source needs more data. In the callback or from another
346 * thread you should call push-buffer or end-of-stream.
348 * @length is just a hint and when it is set to -1, any number of bytes can be
349 * pushed into @appsrc.
351 * You can call push-buffer multiple times until the enough-data signal is
354 gst_app_src_signals[SIGNAL_NEED_DATA] =
355 g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
356 G_STRUCT_OFFSET (GstAppSrcClass, need_data),
357 NULL, NULL, gst_app_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
360 * GstAppSrc::enough-data:
361 * @appsrc: the appsrc element that emited the signal
363 * Signal that the source has enough data. It is recommended that the
364 * application stops calling push-buffer until the need-data signal is
365 * emited again to avoid excessive buffer queueing.
367 gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
368 g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
369 G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
370 NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
373 * GstAppSrc::seek-data:
374 * @appsrc: the appsrc element that emited the signal
375 * @offset: the offset to seek to
377 * Seek to the given offset. The next push-buffer should produce buffers from
379 * This callback is only called for seekable stream types.
381 * Returns: %TRUE if the seek succeeded.
383 gst_app_src_signals[SIGNAL_SEEK_DATA] =
384 g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
385 G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
386 NULL, NULL, gst_app_marshal_BOOLEAN__UINT64, G_TYPE_BOOLEAN, 1,
390 * GstAppSrc::push-buffer:
391 * @appsrc: the appsrc
392 * @buffer: a buffer to push
394 * Adds a buffer to the queue of buffers that the appsrc element will
395 * push to its source pad. This function does not take ownership of the
396 * buffer so the buffer needs to be unreffed after calling this function.
398 * When the block property is TRUE, this function can block until free space
399 * becomes available in the queue.
401 gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
402 g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
403 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
404 push_buffer), NULL, NULL, gst_app_marshal_ENUM__OBJECT,
405 GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
408 * GstAppSrc::end-of-stream:
409 * @appsrc: the appsrc
411 * Notify @appsrc that no more buffer are available.
413 gst_app_src_signals[SIGNAL_END_OF_STREAM] =
414 g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
415 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
416 end_of_stream), NULL, NULL, gst_app_marshal_ENUM__VOID,
417 GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
419 basesrc_class->create = gst_app_src_create;
420 basesrc_class->start = gst_app_src_start;
421 basesrc_class->stop = gst_app_src_stop;
422 basesrc_class->unlock = gst_app_src_unlock;
423 basesrc_class->unlock_stop = gst_app_src_unlock_stop;
424 basesrc_class->do_seek = gst_app_src_do_seek;
425 basesrc_class->is_seekable = gst_app_src_is_seekable;
426 basesrc_class->check_get_range = gst_app_src_check_get_range;
427 basesrc_class->get_size = gst_app_src_do_get_size;
428 basesrc_class->get_size = gst_app_src_do_get_size;
429 basesrc_class->query = gst_app_src_query;
431 klass->push_buffer = gst_app_src_push_buffer_action;
432 klass->end_of_stream = gst_app_src_end_of_stream;
436 gst_app_src_init (GstAppSrc * appsrc, GstAppSrcClass * klass)
438 appsrc->mutex = g_mutex_new ();
439 appsrc->cond = g_cond_new ();
440 appsrc->queue = g_queue_new ();
442 appsrc->size = DEFAULT_PROP_SIZE;
443 appsrc->stream_type = DEFAULT_PROP_STREAM_TYPE;
444 appsrc->max_bytes = DEFAULT_PROP_MAX_BYTES;
445 appsrc->format = DEFAULT_PROP_FORMAT;
446 appsrc->block = DEFAULT_PROP_BLOCK;
447 appsrc->min_latency = DEFAULT_PROP_MIN_LATENCY;
448 appsrc->max_latency = DEFAULT_PROP_MAX_LATENCY;
450 gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
454 gst_app_src_flush_queued (GstAppSrc * src)
458 while ((buf = g_queue_pop_head (src->queue)))
459 gst_buffer_unref (buf);
463 gst_app_src_dispose (GObject * obj)
465 GstAppSrc *appsrc = GST_APP_SRC (obj);
468 gst_caps_unref (appsrc->caps);
471 gst_app_src_flush_queued (appsrc);
473 G_OBJECT_CLASS (parent_class)->dispose (obj);
477 gst_app_src_finalize (GObject * obj)
479 GstAppSrc *appsrc = GST_APP_SRC (obj);
481 g_mutex_free (appsrc->mutex);
482 g_cond_free (appsrc->cond);
483 g_queue_free (appsrc->queue);
485 G_OBJECT_CLASS (parent_class)->finalize (obj);
489 gst_app_src_set_property (GObject * object, guint prop_id,
490 const GValue * value, GParamSpec * pspec)
492 GstAppSrc *appsrc = GST_APP_SRC (object);
496 gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
499 gst_app_src_set_size (appsrc, g_value_get_int64 (value));
501 case PROP_STREAM_TYPE:
502 gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
505 gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
508 appsrc->format = g_value_get_enum (value);
511 appsrc->block = g_value_get_boolean (value);
514 gst_base_src_set_live (GST_BASE_SRC (appsrc),
515 g_value_get_boolean (value));
517 case PROP_MIN_LATENCY:
518 gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
521 case PROP_MAX_LATENCY:
522 gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
523 g_value_get_int64 (value));
526 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
532 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
535 GstAppSrc *appsrc = GST_APP_SRC (object);
542 /* we're missing a _take_caps() function to transfer ownership */
543 caps = gst_app_src_get_caps (appsrc);
544 gst_value_set_caps (value, caps);
546 gst_caps_unref (caps);
550 g_value_set_int64 (value, gst_app_src_get_size (appsrc));
552 case PROP_STREAM_TYPE:
553 g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
556 g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
559 g_value_set_enum (value, appsrc->format);
562 g_value_set_boolean (value, appsrc->block);
565 g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
567 case PROP_MIN_LATENCY:
571 gst_app_src_get_latency (appsrc, &min, NULL);
572 g_value_set_int64 (value, min);
575 case PROP_MAX_LATENCY:
579 gst_app_src_get_latency (appsrc, &max, NULL);
580 g_value_set_int64 (value, max);
584 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
590 gst_app_src_unlock (GstBaseSrc * bsrc)
592 GstAppSrc *appsrc = GST_APP_SRC (bsrc);
594 g_mutex_lock (appsrc->mutex);
595 GST_DEBUG_OBJECT (appsrc, "unlock start");
596 appsrc->flushing = TRUE;
597 g_cond_broadcast (appsrc->cond);
598 g_mutex_unlock (appsrc->mutex);
604 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
606 GstAppSrc *appsrc = GST_APP_SRC (bsrc);
608 g_mutex_lock (appsrc->mutex);
609 GST_DEBUG_OBJECT (appsrc, "unlock stop");
610 appsrc->flushing = FALSE;
611 g_cond_broadcast (appsrc->cond);
612 g_mutex_unlock (appsrc->mutex);
618 gst_app_src_start (GstBaseSrc * bsrc)
620 GstAppSrc *appsrc = GST_APP_SRC (bsrc);
622 g_mutex_lock (appsrc->mutex);
623 GST_DEBUG_OBJECT (appsrc, "starting");
624 appsrc->started = TRUE;
625 /* set the offset to -1 so that we always do a first seek. This is only used
626 * in random-access mode. */
628 appsrc->flushing = FALSE;
629 g_mutex_unlock (appsrc->mutex);
631 gst_base_src_set_format (bsrc, appsrc->format);
637 gst_app_src_stop (GstBaseSrc * bsrc)
639 GstAppSrc *appsrc = GST_APP_SRC (bsrc);
641 g_mutex_lock (appsrc->mutex);
642 GST_DEBUG_OBJECT (appsrc, "stopping");
643 appsrc->is_eos = FALSE;
644 appsrc->flushing = TRUE;
645 appsrc->started = FALSE;
646 gst_app_src_flush_queued (appsrc);
647 g_mutex_unlock (appsrc->mutex);
653 gst_app_src_is_seekable (GstBaseSrc * src)
655 GstAppSrc *appsrc = GST_APP_SRC (src);
656 gboolean res = FALSE;
658 switch (appsrc->stream_type) {
659 case GST_APP_STREAM_TYPE_STREAM:
661 case GST_APP_STREAM_TYPE_SEEKABLE:
662 case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
670 gst_app_src_check_get_range (GstBaseSrc * src)
672 GstAppSrc *appsrc = GST_APP_SRC (src);
673 gboolean res = FALSE;
675 switch (appsrc->stream_type) {
676 case GST_APP_STREAM_TYPE_STREAM:
677 case GST_APP_STREAM_TYPE_SEEKABLE:
679 case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
687 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
689 GstAppSrc *appsrc = GST_APP_SRC (src);
691 *size = gst_app_src_get_size (appsrc);
697 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
699 GstAppSrc *appsrc = GST_APP_SRC (src);
702 switch (GST_QUERY_TYPE (query)) {
703 case GST_QUERY_LATENCY:
705 GstClockTime min, max;
708 /* Query the parent class for the defaults */
709 res = gst_base_src_query_latency (src, &live, &min, &max);
711 /* overwrite with our values when we need to */
712 g_mutex_lock (appsrc->mutex);
713 if (appsrc->min_latency != -1)
714 min = appsrc->min_latency;
715 if (appsrc->max_latency != -1)
716 max = appsrc->max_latency;
717 g_mutex_unlock (appsrc->mutex);
719 gst_query_set_latency (query, live, min, max);
723 res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
730 /* will be called in push mode */
732 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
734 GstAppSrc *appsrc = GST_APP_SRC (src);
735 gint64 desired_position;
736 gboolean res = FALSE;
738 desired_position = segment->last_stop;
740 GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
741 desired_position, gst_format_get_name (segment->format));
743 /* no need to try to seek in streaming mode */
744 if (appsrc->stream_type == GST_APP_STREAM_TYPE_STREAM)
747 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
748 desired_position, &res);
751 GST_DEBUG_OBJECT (appsrc, "flushing queue");
752 gst_app_src_flush_queued (appsrc);
754 GST_WARNING_OBJECT (appsrc, "seek failed");
761 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
764 GstAppSrc *appsrc = GST_APP_SRC (bsrc);
767 g_mutex_lock (appsrc->mutex);
768 /* check flushing first */
769 if (G_UNLIKELY (appsrc->flushing))
772 if (appsrc->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
773 /* if we are dealing with a random-access stream, issue a seek if the offset
775 if (G_UNLIKELY (appsrc->offset != offset)) {
778 g_mutex_unlock (appsrc->mutex);
780 GST_DEBUG_OBJECT (appsrc,
781 "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
782 appsrc->offset, offset);
784 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
787 if (G_UNLIKELY (!res))
788 /* failing to seek is fatal */
791 g_mutex_lock (appsrc->mutex);
793 appsrc->offset = offset;
798 /* return data as long as we have some */
799 if (!g_queue_is_empty (appsrc->queue)) {
802 *buf = g_queue_pop_head (appsrc->queue);
803 buf_size = GST_BUFFER_SIZE (*buf);
805 GST_DEBUG_OBJECT (appsrc, "we have buffer %p of size %u", *buf, buf_size);
807 appsrc->queued_bytes -= buf_size;
809 /* only update the offset when in random_access mode */
810 if (appsrc->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
811 appsrc->offset += buf_size;
814 gst_buffer_set_caps (*buf, appsrc->caps);
816 /* signal that we removed an item */
817 g_cond_broadcast (appsrc->cond);
822 g_mutex_unlock (appsrc->mutex);
824 /* we have no data, we need some. We fire the signal with the size hint. */
825 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
828 g_mutex_lock (appsrc->mutex);
829 /* we can be flushing now because we released the lock */
830 if (G_UNLIKELY (appsrc->flushing))
833 /* if we have a buffer now, continue the loop and try to return it. In
834 * random-access mode (where a buffer is normally pushed in the above
835 * signal) we can still be empty because the pushed buffer got flushed or
836 * when the application pushes the requested buffer later, we support both
838 if (!g_queue_is_empty (appsrc->queue))
841 /* no buffer yet, maybe we are EOS, if not, block for more data. */
845 if (G_UNLIKELY (appsrc->is_eos))
848 /* nothing to return, wait a while for new data or flushing. */
849 g_cond_wait (appsrc->cond, appsrc->mutex);
851 g_mutex_unlock (appsrc->mutex);
858 GST_DEBUG_OBJECT (appsrc, "we are flushing");
859 g_mutex_unlock (appsrc->mutex);
860 return GST_FLOW_WRONG_STATE;
864 GST_DEBUG_OBJECT (appsrc, "we are EOS");
865 g_mutex_unlock (appsrc->mutex);
866 return GST_FLOW_UNEXPECTED;
870 GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
872 return GST_FLOW_ERROR;
879 * gst_app_src_set_caps:
880 * @appsrc: a #GstAppSrc
883 * Set the capabilities on the appsrc element. This function takes
884 * a copy of the caps structure. After calling this method, the source will
885 * only produce caps that match @caps. @caps must be fixed and the caps on the
886 * buffers must match the caps or left NULL.
889 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
893 g_return_if_fail (GST_IS_APP_SRC (appsrc));
895 GST_OBJECT_LOCK (appsrc);
896 GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
897 if ((old = appsrc->caps) != caps) {
899 appsrc->caps = gst_caps_copy (caps);
903 gst_caps_unref (old);
905 GST_OBJECT_UNLOCK (appsrc);
909 * gst_app_src_get_caps:
910 * @appsrc: a #GstAppSrc
912 * Get the configured caps on @appsrc.
914 * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
917 gst_app_src_get_caps (GstAppSrc * appsrc)
921 g_return_val_if_fail (appsrc != NULL, NULL);
922 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
924 GST_OBJECT_LOCK (appsrc);
925 if ((caps = appsrc->caps))
927 GST_DEBUG_OBJECT (appsrc, "getting caps of %" GST_PTR_FORMAT, caps);
928 GST_OBJECT_UNLOCK (appsrc);
934 * gst_app_src_set_size:
935 * @appsrc: a #GstAppSrc
936 * @size: the size to set
938 * Set the size of the stream in bytes. A value of -1 means that the size is
942 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
944 g_return_if_fail (appsrc != NULL);
945 g_return_if_fail (GST_IS_APP_SRC (appsrc));
947 GST_OBJECT_LOCK (appsrc);
948 GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
950 GST_OBJECT_UNLOCK (appsrc);
954 * gst_app_src_get_size:
955 * @appsrc: a #GstAppSrc
957 * Get the size of the stream in bytes. A value of -1 means that the size is
960 * Returns: the size of the stream previously set with gst_app_src_set_size();
963 gst_app_src_get_size (GstAppSrc * appsrc)
967 g_return_val_if_fail (appsrc != NULL, -1);
968 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
970 GST_OBJECT_LOCK (appsrc);
972 GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
973 GST_OBJECT_UNLOCK (appsrc);
979 * gst_app_src_set_stream_type:
980 * @appsrc: a #GstAppSrc
981 * @type: the new state
983 * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
986 * A stream_type stream
989 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
991 g_return_if_fail (appsrc != NULL);
992 g_return_if_fail (GST_IS_APP_SRC (appsrc));
994 GST_OBJECT_LOCK (appsrc);
995 GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
996 appsrc->stream_type = type;
997 GST_OBJECT_UNLOCK (appsrc);
1001 * gst_app_src_get_stream_type:
1002 * @appsrc: a #GstAppSrc
1004 * Get the stream type. Control the stream type of @appsrc
1005 * with gst_app_src_set_stream_type().
1007 * Returns: the stream type.
1010 gst_app_src_get_stream_type (GstAppSrc * appsrc)
1012 gboolean stream_type;
1014 g_return_val_if_fail (appsrc != NULL, FALSE);
1015 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1017 GST_OBJECT_LOCK (appsrc);
1018 stream_type = appsrc->stream_type;
1019 GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
1020 GST_OBJECT_UNLOCK (appsrc);
1026 * gst_app_src_set_max_bytes:
1027 * @appsrc: a #GstAppSrc
1028 * @max: the maximum number of bytes to queue
1030 * Set the maximum amount of bytes that can be queued in @appsrc.
1031 * After the maximum amount of bytes are queued, @appsrc will emit the
1032 * "enough-data" signal.
1035 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
1037 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1039 g_mutex_lock (appsrc->mutex);
1040 if (max != appsrc->max_bytes) {
1041 GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
1042 appsrc->max_bytes = max;
1043 /* signal the change */
1044 g_cond_broadcast (appsrc->cond);
1046 g_mutex_unlock (appsrc->mutex);
1050 * gst_app_src_get_max_bytes:
1051 * @appsrc: a #GstAppSrc
1053 * Get the maximum amount of bytes that can be queued in @appsrc.
1055 * Returns: The maximum amount of bytes that can be queued.
1058 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
1062 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
1064 g_mutex_lock (appsrc->mutex);
1065 result = appsrc->max_bytes;
1066 GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
1067 g_mutex_unlock (appsrc->mutex);
1073 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
1074 gboolean do_max, guint64 max)
1076 gboolean changed = FALSE;
1078 g_mutex_lock (appsrc->mutex);
1079 if (do_min && appsrc->min_latency != min) {
1080 appsrc->min_latency = min;
1083 if (do_max && appsrc->max_latency != max) {
1084 appsrc->max_latency = max;
1087 g_mutex_unlock (appsrc->mutex);
1090 GST_DEBUG_OBJECT (appsrc, "posting latency changed");
1091 gst_element_post_message (GST_ELEMENT_CAST (appsrc),
1092 gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
1097 * gst_app_src_set_latency:
1098 * @appsrc: a #GstAppSrc
1099 * @min: the min latency
1100 * @max: the min latency
1102 * Configure the @min and @max latency in @src. If @min is set to -1, the
1103 * default latency calculations for pseudo-live sources will be used.
1106 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
1108 gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
1112 * gst_app_src_get_latency:
1113 * @appsrc: a #GstAppSrc
1114 * @min: the min latency
1115 * @max: the min latency
1117 * Retrieve the min and max latencies in @min and @max respectively.
1120 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
1122 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1124 g_mutex_lock (appsrc->mutex);
1126 *min = appsrc->min_latency;
1128 *max = appsrc->max_latency;
1129 g_mutex_unlock (appsrc->mutex);
1132 static GstFlowReturn
1133 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
1136 gboolean first = TRUE;
1138 g_return_val_if_fail (appsrc, GST_FLOW_ERROR);
1139 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1140 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1142 g_mutex_lock (appsrc->mutex);
1145 /* can't accept buffers when we are flushing or EOS */
1146 if (appsrc->flushing)
1152 if (appsrc->max_bytes && appsrc->queued_bytes >= appsrc->max_bytes) {
1153 GST_DEBUG_OBJECT (appsrc, "queue filled (%" G_GUINT64_FORMAT " >= %"
1154 G_GUINT64_FORMAT ")", appsrc->queued_bytes, appsrc->max_bytes);
1157 /* only signal on the first push */
1158 g_mutex_unlock (appsrc->mutex);
1160 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
1163 g_mutex_lock (appsrc->mutex);
1164 /* continue to check for flushing/eos after releasing the lock */
1168 if (appsrc->block) {
1169 GST_DEBUG_OBJECT (appsrc, "waiting for free space");
1170 /* we are filled, wait until a buffer gets popped or when we
1172 g_cond_wait (appsrc->cond, appsrc->mutex);
1174 /* no need to wait for free space, we just pump more data into the
1175 * queue hoping that the caller reacts to the enough-data signal and
1176 * stops pushing buffers. */
1183 GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
1185 gst_buffer_ref (buffer);
1186 g_queue_push_tail (appsrc->queue, buffer);
1187 appsrc->queued_bytes += GST_BUFFER_SIZE (buffer);
1188 g_cond_broadcast (appsrc->cond);
1189 g_mutex_unlock (appsrc->mutex);
1196 GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
1198 gst_buffer_unref (buffer);
1199 g_mutex_unlock (appsrc->mutex);
1200 return GST_FLOW_WRONG_STATE;
1204 GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
1206 gst_buffer_unref (buffer);
1207 g_mutex_unlock (appsrc->mutex);
1208 return GST_FLOW_UNEXPECTED;
1213 * gst_app_src_push_buffer:
1214 * @appsrc: a #GstAppSrc
1215 * @buffer: a #GstBuffer to push
1217 * Adds a buffer to the queue of buffers that the appsrc element will
1218 * push to its source pad. This function takes ownership of the buffer.
1220 * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
1221 * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
1222 * #GST_FLOW_UNEXPECTED when EOS occured.
1225 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
1227 return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
1230 /* push a buffer without stealing the ref of the buffer. This is used for the
1232 static GstFlowReturn
1233 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
1235 return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
1239 * gst_app_src_end_of_stream:
1240 * @appsrc: a #GstAppSrc
1242 * Indicates to the appsrc element that the last buffer queued in the
1243 * element is the last buffer of the stream.
1245 * Returns: #GST_FLOW_OK when the EOS was successfuly queued.
1246 * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
1249 gst_app_src_end_of_stream (GstAppSrc * appsrc)
1251 g_return_val_if_fail (appsrc, GST_FLOW_ERROR);
1252 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1254 g_mutex_lock (appsrc->mutex);
1255 /* can't accept buffers when we are flushing. We can accept them when we are
1256 * EOS although it will not do anything. */
1257 if (appsrc->flushing)
1260 GST_DEBUG_OBJECT (appsrc, "sending EOS");
1261 appsrc->is_eos = TRUE;
1262 g_cond_broadcast (appsrc->cond);
1263 g_mutex_unlock (appsrc->mutex);
1270 GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
1271 return GST_FLOW_WRONG_STATE;
1275 /*** GSTURIHANDLER INTERFACE *************************************************/
1278 gst_app_src_uri_get_type (void)
1284 gst_app_src_uri_get_protocols (void)
1286 static gchar *protocols[] = { "appsrc", NULL };
1290 static const gchar *
1291 gst_app_src_uri_get_uri (GstURIHandler * handler)
1297 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
1302 protocol = gst_uri_get_protocol (uri);
1303 ret = !strcmp (protocol, "appsrc");
1310 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1312 GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1314 iface->get_type = gst_app_src_uri_get_type;
1315 iface->get_protocols = gst_app_src_uri_get_protocols;
1316 iface->get_uri = gst_app_src_uri_get_uri;
1317 iface->set_uri = gst_app_src_uri_set_uri;