2 * Copyright (C) 2007 David Schleef <ds@schleef.org>
3 * (C) 2008 Wim Taymans <wim.taymans@gmail.com>
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
26 #include <gst/base/gstbasesrc.h>
30 #include "gstapp-marshal.h"
31 #include "gstappsrc.h"
34 GST_DEBUG_CATEGORY (app_src_debug);
35 #define GST_CAT_DEFAULT app_src_debug
37 static const GstElementDetails app_src_details = GST_ELEMENT_DETAILS ("AppSrc",
39 "Allow the application to feed buffers to a pipeline",
40 "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com");
56 #define DEFAULT_PROP_SIZE -1
57 #define DEFAULT_PROP_STREAM_TYPE GST_APP_STREAM_TYPE_STREAM
58 #define DEFAULT_PROP_MAX_BYTES 200000
59 #define DEFAULT_PROP_FORMAT GST_FORMAT_BYTES
60 #define DEFAULT_PROP_BLOCK FALSE
75 static GstStaticPadTemplate gst_app_src_template =
76 GST_STATIC_PAD_TEMPLATE ("src",
82 #define GST_TYPE_APP_STREAM_TYPE (stream_type_get_type ())
84 stream_type_get_type (void)
86 static GType stream_type_type = 0;
87 static const GEnumValue stream_type[] = {
88 {GST_APP_STREAM_TYPE_STREAM, "Stream", "stream"},
89 {GST_APP_STREAM_TYPE_SEEKABLE, "Seekable", "seekable"},
90 {GST_APP_STREAM_TYPE_RANDOM_ACCESS, "Random Access", "random-access"},
94 if (!stream_type_type) {
95 stream_type_type = g_enum_register_static ("GstAppStreamType", stream_type);
97 return stream_type_type;
100 static void gst_app_src_uri_handler_init (gpointer g_iface,
101 gpointer iface_data);
103 static void gst_app_src_dispose (GObject * object);
104 static void gst_app_src_finalize (GObject * object);
106 static void gst_app_src_set_property (GObject * object, guint prop_id,
107 const GValue * value, GParamSpec * pspec);
108 static void gst_app_src_get_property (GObject * object, guint prop_id,
109 GValue * value, GParamSpec * pspec);
111 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc,
112 guint64 offset, guint size, GstBuffer ** buf);
113 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
114 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
115 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
116 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
117 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
118 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
119 static gboolean gst_app_src_check_get_range (GstBaseSrc * src);
120 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
122 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
125 _do_init (GType filesrc_type)
127 static const GInterfaceInfo urihandler_info = {
128 gst_app_src_uri_handler_init,
132 g_type_add_interface_static (filesrc_type, GST_TYPE_URI_HANDLER,
136 GST_BOILERPLATE_FULL (GstAppSrc, gst_app_src, GstBaseSrc, GST_TYPE_BASE_SRC,
140 gst_app_src_base_init (gpointer g_class)
142 GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
144 GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
146 gst_element_class_set_details (element_class, &app_src_details);
148 gst_element_class_add_pad_template (element_class,
149 gst_static_pad_template_get (&gst_app_src_template));
153 gst_app_src_class_init (GstAppSrcClass * klass)
155 GObjectClass *gobject_class = (GObjectClass *) klass;
156 GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
158 gobject_class->dispose = gst_app_src_dispose;
159 gobject_class->finalize = gst_app_src_finalize;
161 gobject_class->set_property = gst_app_src_set_property;
162 gobject_class->get_property = gst_app_src_get_property;
164 g_object_class_install_property (gobject_class, PROP_CAPS,
165 g_param_spec_boxed ("caps", "Caps",
166 "The allowed caps for the src pad", GST_TYPE_CAPS,
167 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
169 g_object_class_install_property (gobject_class, PROP_FORMAT,
170 g_param_spec_enum ("format", "Format",
171 "The format of the segment events and seek", GST_TYPE_FORMAT,
172 DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
174 g_object_class_install_property (gobject_class, PROP_SIZE,
175 g_param_spec_int64 ("size", "Size",
176 "The size of the data stream (-1 if unknown)",
177 -1, G_MAXINT64, DEFAULT_PROP_SIZE,
178 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
180 g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
181 g_param_spec_enum ("stream-type", "Stream Type",
182 "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
183 DEFAULT_PROP_STREAM_TYPE,
184 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
186 g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
187 g_param_spec_uint64 ("max-bytes", "Max bytes",
188 "The maximum number of bytes to queue internally (0 = unlimited)",
189 0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
190 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
192 g_object_class_install_property (gobject_class, PROP_BLOCK,
193 g_param_spec_boolean ("block", "Block",
194 "Block push-buffer when max-bytes are queued",
195 DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
198 * GstAppSrc::need-data:
199 * @appsrc: the appsrc element that emitted the signal
200 * @length: the amount of bytes needed.
202 * Signal that the source needs more data. In the callback or from another
203 * thread you should call push-buffer or end-of-stream.
205 * @length is just a hint and when it is set to -1, any number of bytes can be
206 * pushed into @appsrc.
208 * You can call push-buffer multiple times until the enough-data signal is
211 gst_app_src_signals[SIGNAL_NEED_DATA] =
212 g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
213 G_STRUCT_OFFSET (GstAppSrcClass, need_data),
214 NULL, NULL, gst_app_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
217 * GstAppSrc::enough-data:
218 * @appsrc: the appsrc element that emitted the signal
220 * Signal that the source has enough data. It is recommended that the
221 * application stops calling push-buffer until the need-data signal is
222 * emitted again to avoid excessive buffer queueing.
224 gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
225 g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
226 G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
227 NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
230 * GstAppSrc::seek-data:
231 * @appsrc: the appsrc element that emitted the signal
232 * @offset: the offset to seek to
234 * Seek to the given offset. The next push-buffer should produce buffers from
236 * This callback is only called for seekable stream types.
238 * Returns: %TRUE if the seek succeeded.
240 gst_app_src_signals[SIGNAL_SEEK_DATA] =
241 g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
242 G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
243 NULL, NULL, gst_app_marshal_BOOLEAN__UINT64, G_TYPE_BOOLEAN, 1,
247 * GstAppSrc::push-buffer:
248 * @appsrc: the appsrc
249 * @buffer: a buffer to push
251 * Adds a buffer to the queue of buffers that the appsrc element will
252 * push to its source pad. This function will take ownership of @buffer.
254 gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
255 g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
256 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
257 push_buffer), NULL, NULL, gst_app_marshal_ENUM__OBJECT,
258 GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
261 * GstAppSrc::end-of-stream:
262 * @appsrc: the appsrc
264 * Notify @appsrc that no more buffers are available.
266 gst_app_src_signals[SIGNAL_END_OF_STREAM] =
267 g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
268 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
269 end_of_stream), NULL, NULL, gst_app_marshal_ENUM__VOID,
270 GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
272 basesrc_class->create = gst_app_src_create;
273 basesrc_class->start = gst_app_src_start;
274 basesrc_class->stop = gst_app_src_stop;
275 basesrc_class->unlock = gst_app_src_unlock;
276 basesrc_class->unlock_stop = gst_app_src_unlock_stop;
277 basesrc_class->do_seek = gst_app_src_do_seek;
278 basesrc_class->is_seekable = gst_app_src_is_seekable;
279 basesrc_class->check_get_range = gst_app_src_check_get_range;
280 basesrc_class->get_size = gst_app_src_do_get_size;
282 klass->push_buffer = gst_app_src_push_buffer;
283 klass->end_of_stream = gst_app_src_end_of_stream;
287 gst_app_src_init (GstAppSrc * appsrc, GstAppSrcClass * klass)
289 appsrc->mutex = g_mutex_new ();
290 appsrc->cond = g_cond_new ();
291 appsrc->queue = g_queue_new ();
293 appsrc->size = DEFAULT_PROP_SIZE;
294 appsrc->stream_type = DEFAULT_PROP_STREAM_TYPE;
295 appsrc->max_bytes = DEFAULT_PROP_MAX_BYTES;
296 appsrc->format = DEFAULT_PROP_FORMAT;
297 appsrc->block = DEFAULT_PROP_BLOCK;
301 gst_app_src_flush_queued (GstAppSrc * src)
305 while ((buf = g_queue_pop_head (src->queue)))
306 gst_buffer_unref (buf);
310 gst_app_src_dispose (GObject * obj)
312 GstAppSrc *appsrc = GST_APP_SRC (obj);
315 gst_caps_unref (appsrc->caps);
318 gst_app_src_flush_queued (appsrc);
320 G_OBJECT_CLASS (parent_class)->dispose (obj);
324 gst_app_src_finalize (GObject * obj)
326 GstAppSrc *appsrc = GST_APP_SRC (obj);
328 g_mutex_free (appsrc->mutex);
329 g_cond_free (appsrc->cond);
330 g_queue_free (appsrc->queue);
332 G_OBJECT_CLASS (parent_class)->finalize (obj);
336 gst_app_src_set_property (GObject * object, guint prop_id,
337 const GValue * value, GParamSpec * pspec)
339 GstAppSrc *appsrc = GST_APP_SRC (object);
343 gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
346 gst_app_src_set_size (appsrc, g_value_get_int64 (value));
348 case PROP_STREAM_TYPE:
349 gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
352 gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
355 appsrc->format = g_value_get_enum (value);
358 appsrc->block = g_value_get_boolean (value);
361 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
367 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
370 GstAppSrc *appsrc = GST_APP_SRC (object);
377 /* we're missing a _take_caps() function to transfer ownership */
378 caps = gst_app_src_get_caps (appsrc);
379 gst_value_set_caps (value, caps);
381 gst_caps_unref (caps);
385 g_value_set_int64 (value, gst_app_src_get_size (appsrc));
387 case PROP_STREAM_TYPE:
388 g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
391 g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
394 g_value_set_enum (value, appsrc->format);
397 g_value_set_boolean (value, appsrc->block);
400 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
406 gst_app_src_unlock (GstBaseSrc * bsrc)
408 GstAppSrc *appsrc = GST_APP_SRC (bsrc);
410 g_mutex_lock (appsrc->mutex);
411 GST_DEBUG_OBJECT (appsrc, "unlock start");
412 appsrc->flushing = TRUE;
413 g_cond_broadcast (appsrc->cond);
414 g_mutex_unlock (appsrc->mutex);
420 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
422 GstAppSrc *appsrc = GST_APP_SRC (bsrc);
424 g_mutex_lock (appsrc->mutex);
425 GST_DEBUG_OBJECT (appsrc, "unlock stop");
426 appsrc->flushing = FALSE;
427 g_cond_broadcast (appsrc->cond);
428 g_mutex_unlock (appsrc->mutex);
434 gst_app_src_start (GstBaseSrc * bsrc)
436 GstAppSrc *appsrc = GST_APP_SRC (bsrc);
438 g_mutex_lock (appsrc->mutex);
439 GST_DEBUG_OBJECT (appsrc, "starting");
440 appsrc->started = TRUE;
441 /* set the offset to -1 so that we always do a first seek. This is only used
442 * in random-access mode. */
444 appsrc->flushing = FALSE;
445 g_mutex_unlock (appsrc->mutex);
447 gst_base_src_set_format (bsrc, appsrc->format);
453 gst_app_src_stop (GstBaseSrc * bsrc)
455 GstAppSrc *appsrc = GST_APP_SRC (bsrc);
457 g_mutex_lock (appsrc->mutex);
458 GST_DEBUG_OBJECT (appsrc, "stopping");
459 appsrc->is_eos = FALSE;
460 appsrc->flushing = TRUE;
461 appsrc->started = FALSE;
462 gst_app_src_flush_queued (appsrc);
463 g_mutex_unlock (appsrc->mutex);
469 gst_app_src_is_seekable (GstBaseSrc * src)
471 GstAppSrc *appsrc = GST_APP_SRC (src);
472 gboolean res = FALSE;
474 switch (appsrc->stream_type) {
475 case GST_APP_STREAM_TYPE_STREAM:
477 case GST_APP_STREAM_TYPE_SEEKABLE:
478 case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
486 gst_app_src_check_get_range (GstBaseSrc * src)
488 GstAppSrc *appsrc = GST_APP_SRC (src);
489 gboolean res = FALSE;
491 switch (appsrc->stream_type) {
492 case GST_APP_STREAM_TYPE_STREAM:
493 case GST_APP_STREAM_TYPE_SEEKABLE:
495 case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
503 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
505 GstAppSrc *appsrc = GST_APP_SRC (src);
507 *size = gst_app_src_get_size (appsrc);
512 /* will be called in push mode */
514 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
516 GstAppSrc *appsrc = GST_APP_SRC (src);
517 gint64 desired_position;
518 gboolean res = FALSE;
520 desired_position = segment->last_stop;
522 GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
523 desired_position, gst_format_get_name (segment->format));
525 /* no need to try to seek in streaming mode */
526 if (appsrc->stream_type == GST_APP_STREAM_TYPE_STREAM)
529 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
530 desired_position, &res);
533 GST_DEBUG_OBJECT (appsrc, "flushing queue");
534 gst_app_src_flush_queued (appsrc);
536 GST_WARNING_OBJECT (appsrc, "seek failed");
543 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
546 GstAppSrc *appsrc = GST_APP_SRC (bsrc);
549 g_mutex_lock (appsrc->mutex);
550 /* check flushing first */
551 if (G_UNLIKELY (appsrc->flushing))
554 if (appsrc->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
555 /* if we are dealing with a random-access stream, issue a seek if the offset
557 if (G_UNLIKELY (appsrc->offset != offset)) {
560 g_mutex_unlock (appsrc->mutex);
562 GST_DEBUG_OBJECT (appsrc,
563 "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
564 appsrc->offset, offset);
566 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
569 if (G_UNLIKELY (!res))
570 /* failing to seek is fatal */
573 g_mutex_lock (appsrc->mutex);
575 appsrc->offset = offset;
580 /* return data as long as we have some */
581 if (!g_queue_is_empty (appsrc->queue)) {
584 *buf = g_queue_pop_head (appsrc->queue);
585 buf_size = GST_BUFFER_SIZE (*buf);
587 GST_DEBUG_OBJECT (appsrc, "we have buffer %p of size %u", *buf, buf_size);
589 appsrc->queued_bytes -= buf_size;
591 /* only update the offset when in random_access mode */
592 if (appsrc->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
593 appsrc->offset += buf_size;
596 gst_buffer_set_caps (*buf, appsrc->caps);
598 /* signal that we removed an item */
599 g_cond_broadcast (appsrc->cond);
604 g_mutex_unlock (appsrc->mutex);
606 /* we have no data, we need some. We fire the signal with the size hint. */
607 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
610 g_mutex_lock (appsrc->mutex);
611 /* we can be flushing now because we released the lock */
612 if (G_UNLIKELY (appsrc->flushing))
615 /* if we have a buffer now, continue the loop and try to return it. In
616 * random-access mode (where a buffer is normally pushed in the above
617 * signal) we can still be empty because the pushed buffer got flushed or
618 * when the application pushes the requested buffer later, we support both
620 if (!g_queue_is_empty (appsrc->queue))
623 /* no buffer yet, maybe we are EOS, if not, block for more data. */
627 if (G_UNLIKELY (appsrc->is_eos))
630 /* nothing to return, wait a while for new data or flushing. */
631 g_cond_wait (appsrc->cond, appsrc->mutex);
633 g_mutex_unlock (appsrc->mutex);
640 GST_DEBUG_OBJECT (appsrc, "we are flushing");
641 g_mutex_unlock (appsrc->mutex);
642 return GST_FLOW_WRONG_STATE;
646 GST_DEBUG_OBJECT (appsrc, "we are EOS");
647 g_mutex_unlock (appsrc->mutex);
648 return GST_FLOW_UNEXPECTED;
652 GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
654 return GST_FLOW_ERROR;
661 * gst_app_src_set_caps:
662 * @appsrc: a #GstAppSrc
665 * Set the capabilities on the appsrc element. This function takes
666 * a copy of the caps structure. After calling this method, the source will
667 * only produce caps that match @caps. @caps must be fixed and the caps on the
668 * buffers must match the caps or be left NULL.
671 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
675 g_return_if_fail (GST_IS_APP_SRC (appsrc));
677 GST_OBJECT_LOCK (appsrc);
678 GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
679 if ((old = appsrc->caps) != caps) {
681 appsrc->caps = gst_caps_copy (caps);
685 gst_caps_unref (old);
687 GST_OBJECT_UNLOCK (appsrc);
691 * gst_app_src_get_caps:
692 * @appsrc: a #GstAppSrc
694 * Get the configured caps on @appsrc.
696 * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
699 gst_app_src_get_caps (GstAppSrc * appsrc)
703 g_return_val_if_fail (appsrc != NULL, NULL);
704 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
706 GST_OBJECT_LOCK (appsrc);
707 if ((caps = appsrc->caps))
709 GST_DEBUG_OBJECT (appsrc, "getting caps of %" GST_PTR_FORMAT, caps);
710 GST_OBJECT_UNLOCK (appsrc);
716 * gst_app_src_set_size:
717 * @appsrc: a #GstAppSrc
718 * @size: the size to set
720 * Set the size of the stream in bytes. A value of -1 means that the size is
724 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
726 g_return_if_fail (appsrc != NULL);
727 g_return_if_fail (GST_IS_APP_SRC (appsrc));
729 GST_OBJECT_LOCK (appsrc);
730 GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
732 GST_OBJECT_UNLOCK (appsrc);
736 * gst_app_src_get_size:
737 * @appsrc: a #GstAppSrc
739 * Get the size of the stream in bytes. A value of -1 means that the size is
742 * Returns: the size of the stream previously set with gst_app_src_set_size();
745 gst_app_src_get_size (GstAppSrc * appsrc)
749 g_return_val_if_fail (appsrc != NULL, -1);
750 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
752 GST_OBJECT_LOCK (appsrc);
754 GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
755 GST_OBJECT_UNLOCK (appsrc);
761 * gst_app_src_set_stream_type:
762 * @appsrc: a #GstAppSrc
763 * @type: the new stream type
765 * Set the stream type on @appsrc. For seekable streams, the "seek-data" signal must
768 * A stream_type stream
771 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
773 g_return_if_fail (appsrc != NULL);
774 g_return_if_fail (GST_IS_APP_SRC (appsrc));
776 GST_OBJECT_LOCK (appsrc);
777 GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
778 appsrc->stream_type = type;
779 GST_OBJECT_UNLOCK (appsrc);
783 * gst_app_src_get_stream_type:
784 * @appsrc: a #GstAppSrc
786 * Get the stream type. Control the stream type of @appsrc
787 * with gst_app_src_set_stream_type().
789 * Returns: the stream type.
792 gst_app_src_get_stream_type (GstAppSrc * appsrc)
794 gboolean stream_type;
796 g_return_val_if_fail (appsrc != NULL, FALSE);
797 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
799 GST_OBJECT_LOCK (appsrc);
800 stream_type = appsrc->stream_type;
801 GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
802 GST_OBJECT_UNLOCK (appsrc);
808 * gst_app_src_set_max_bytes:
809 * @appsrc: a #GstAppSrc
810 * @max: the maximum number of bytes to queue
812 * Set the maximum amount of bytes that can be queued in @appsrc.
813 * After the maximum amount of bytes are queued, @appsrc will emit the
814 * "enough-data" signal.
817 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
819 g_return_if_fail (GST_IS_APP_SRC (appsrc));
821 g_mutex_lock (appsrc->mutex);
822 if (max != appsrc->max_bytes) {
823 GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
824 appsrc->max_bytes = max;
825 /* signal the change */
826 g_cond_broadcast (appsrc->cond);
828 g_mutex_unlock (appsrc->mutex);
832 * gst_app_src_get_max_bytes:
833 * @appsrc: a #GstAppSrc
835 * Get the maximum amount of bytes that can be queued in @appsrc.
837 * Returns: The maximum amount of bytes that can be queued.
840 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
844 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
846 g_mutex_lock (appsrc->mutex);
847 result = appsrc->max_bytes;
848 GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
849 g_mutex_unlock (appsrc->mutex);
855 * gst_app_src_push_buffer:
856 * @appsrc: a #GstAppSrc
857 * @buffer: a #GstBuffer to push
859 * Adds a buffer to the queue of buffers that the appsrc element will
860 * push to its source pad. This function takes ownership of the buffer.
862 * Returns: #GST_FLOW_OK when the buffer was successfully queued.
863 * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
864 * #GST_FLOW_UNEXPECTED when EOS occurred.
867 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
869 gboolean first = TRUE;
871 g_return_val_if_fail (appsrc, GST_FLOW_ERROR);
872 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
873 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
875 g_mutex_lock (appsrc->mutex);
878 /* can't accept buffers when we are flushing or EOS */
879 if (appsrc->flushing)
885 if (appsrc->queued_bytes >= appsrc->max_bytes) {
886 GST_DEBUG_OBJECT (appsrc, "queue filled (%" G_GUINT64_FORMAT " >= %"
887 G_GUINT64_FORMAT ")", appsrc->queued_bytes, appsrc->max_bytes);
890 /* only signal on the first push */
891 g_mutex_unlock (appsrc->mutex);
893 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
896 g_mutex_lock (appsrc->mutex);
897 /* continue to check for flushing/eos after releasing the lock */
902 GST_DEBUG_OBJECT (appsrc, "waiting for free space");
903 /* we are filled, wait until a buffer gets popped or when we
905 g_cond_wait (appsrc->cond, appsrc->mutex);
907 /* no need to wait for free space, we just pump data into the queue */
914 GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
915 g_queue_push_tail (appsrc->queue, buffer);
916 appsrc->queued_bytes += GST_BUFFER_SIZE (buffer);
917 g_cond_broadcast (appsrc->cond);
918 g_mutex_unlock (appsrc->mutex);
925 GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
926 gst_buffer_unref (buffer);
927 return GST_FLOW_WRONG_STATE;
931 GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
932 gst_buffer_unref (buffer);
933 return GST_FLOW_UNEXPECTED;
938 * gst_app_src_end_of_stream:
939 * @appsrc: a #GstAppSrc
941 * Indicates to the appsrc element that the last buffer queued in the
942 * element is the last buffer of the stream.
944 * Returns: #GST_FLOW_OK when the EOS was successfully queued.
945 * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
948 gst_app_src_end_of_stream (GstAppSrc * appsrc)
950 g_return_val_if_fail (appsrc, GST_FLOW_ERROR);
951 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
953 g_mutex_lock (appsrc->mutex);
954 /* can't accept buffers when we are flushing. We can accept them when we are
955 * EOS although it will not do anything. */
956 if (appsrc->flushing)
959 GST_DEBUG_OBJECT (appsrc, "sending EOS");
960 appsrc->is_eos = TRUE;
961 g_cond_broadcast (appsrc->cond);
962 g_mutex_unlock (appsrc->mutex);
969 GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
970 return GST_FLOW_WRONG_STATE;
974 /*** GSTURIHANDLER INTERFACE *************************************************/
977 gst_app_src_uri_get_type (void)
983 gst_app_src_uri_get_protocols (void)
985 static gchar *protocols[] = { "appsrc", NULL };
990 gst_app_src_uri_get_uri (GstURIHandler * handler)
996 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
1001 protocol = gst_uri_get_protocol (uri);
1002 ret = !strcmp (protocol, "appsrc");
1009 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1011 GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1013 iface->get_type = gst_app_src_uri_get_type;
1014 iface->get_protocols = gst_app_src_uri_get_protocols;
1015 iface->get_uri = gst_app_src_uri_get_uri;
1016 iface->set_uri = gst_app_src_uri_set_uri;