Merge remote-tracking branch 'origin/master' into 0.11
[platform/upstream/gstreamer.git] / gst-libs / gst / app / gstappsrc.c
1 /* GStreamer
2  * Copyright (C) 2007 David Schleef <ds@schleef.org>
3  *           (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20 /**
21  * SECTION:gstappsrc
22  * @short_description: Easy way for applications to inject buffers into a
23  *     pipeline
24  * @see_also: #GstBaseSrc, appsink
25  *
26  * The appsrc element can be used by applications to insert data into a
27  * GStreamer pipeline. Unlike most GStreamer elements, Appsrc provides
28  * external API functions.
29  *
30  * appsrc can be used by linking with the libgstapp library to access the
31  * methods directly or by using the appsrc action signals.
32  *
33  * Before operating appsrc, the caps property must be set to a fixed caps
34  * describing the format of the data that will be pushed with appsrc. An
35  * exception to this is when pushing buffers with unknown caps, in which case no
36  * caps should be set. This is typically true of file-like sources that push raw
37  * byte buffers.
38  *
39  * The main way of handing data to the appsrc element is by calling the
40  * gst_app_src_push_buffer() method or by emitting the push-buffer action signal.
41  * This will put the buffer onto a queue from which appsrc will read from in its
42  * streaming thread. It is important to note that data transport will not happen
43  * from the thread that performed the push-buffer call.
44  *
45  * The "max-bytes" property controls how much data can be queued in appsrc
46  * before appsrc considers the queue full. A filled internal queue will always
47  * signal the "enough-data" signal, which signals the application that it should
48  * stop pushing data into appsrc. The "block" property will cause appsrc to
49  * block the push-buffer method until free data becomes available again.
50  *
51  * When the internal queue is running out of data, the "need-data" signal is
52  * emitted, which signals the application that it should start pushing more data
53  * into appsrc.
54  *
55  * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
56  * "seek-data" signal when the "stream-mode" property is set to "seekable" or
57  * "random-access". The signal argument will contain the new desired position in
58  * the stream expressed in the unit set with the "format" property. After
59  * receiving the seek-data signal, the application should push-buffers from the
60  * new position.
61  *
62  * These signals allow the application to operate the appsrc in two different
63  * ways:
64  *
65  * The push model, in which the application repeatedly calls the push-buffer method
66  * with a new buffer. Optionally, the queue size in the appsrc can be controlled
67  * with the enough-data and need-data signals by respectively stopping/starting
68  * the push-buffer calls. This is a typical mode of operation for the
69  * stream-type "stream" and "seekable". Use this model when implementing various
70  * network protocols or hardware devices.
71  *
72  * The pull model where the need-data signal triggers the next push-buffer call.
73  * This mode is typically used in the "random-access" stream-type. Use this
74  * model for file access or other randomly accessable sources. In this mode, a
75  * buffer of exactly the amount of bytes given by the need-data signal should be
76  * pushed into appsrc.
77  *
78  * In all modes, the size property on appsrc should contain the total stream
79  * size in bytes. Setting this property is mandatory in the random-access mode.
80  * For the stream and seekable modes, setting this property is optional but
81  * recommended.
82  *
83  * When the application is finished pushing data into appsrc, it should call
84  * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
85  * this call, no more buffers can be pushed into appsrc until a flushing seek
86  * happened or the state of the appsrc has gone through READY.
87  *
88  * Last reviewed on 2008-12-17 (0.10.10)
89  *
90  * Since: 0.10.22
91  */
92
93 #ifdef HAVE_CONFIG_H
94 #include "config.h"
95 #endif
96
97 #include <gst/gst.h>
98 #include <gst/base/gstbasesrc.h>
99
100 #include <string.h>
101
102 #include "gstapp-marshal.h"
103 #include "gstappsrc.h"
104
105 #include "gst/glib-compat-private.h"
106
107 struct _GstAppSrcPrivate
108 {
109   GCond *cond;
110   GMutex *mutex;
111   GQueue *queue;
112
113   GstCaps *caps;
114   gint64 size;
115   GstAppStreamType stream_type;
116   guint64 max_bytes;
117   GstFormat format;
118   gboolean block;
119
120   gboolean flushing;
121   gboolean started;
122   gboolean is_eos;
123   guint64 queued_bytes;
124   guint64 offset;
125   GstAppStreamType current_type;
126
127   guint64 min_latency;
128   guint64 max_latency;
129   gboolean emit_signals;
130   guint min_percent;
131
132   GstAppSrcCallbacks callbacks;
133   gpointer user_data;
134   GDestroyNotify notify;
135 };
136
137 GST_DEBUG_CATEGORY_STATIC (app_src_debug);
138 #define GST_CAT_DEFAULT app_src_debug
139
140 enum
141 {
142   /* signals */
143   SIGNAL_NEED_DATA,
144   SIGNAL_ENOUGH_DATA,
145   SIGNAL_SEEK_DATA,
146
147   /* actions */
148   SIGNAL_PUSH_BUFFER,
149   SIGNAL_END_OF_STREAM,
150
151   LAST_SIGNAL
152 };
153
154 #define DEFAULT_PROP_SIZE          -1
155 #define DEFAULT_PROP_STREAM_TYPE   GST_APP_STREAM_TYPE_STREAM
156 #define DEFAULT_PROP_MAX_BYTES     200000
157 #define DEFAULT_PROP_FORMAT        GST_FORMAT_BYTES
158 #define DEFAULT_PROP_BLOCK         FALSE
159 #define DEFAULT_PROP_IS_LIVE       FALSE
160 #define DEFAULT_PROP_MIN_LATENCY   -1
161 #define DEFAULT_PROP_MAX_LATENCY   -1
162 #define DEFAULT_PROP_EMIT_SIGNALS  TRUE
163 #define DEFAULT_PROP_MIN_PERCENT   0
164
165 enum
166 {
167   PROP_0,
168   PROP_CAPS,
169   PROP_SIZE,
170   PROP_STREAM_TYPE,
171   PROP_MAX_BYTES,
172   PROP_FORMAT,
173   PROP_BLOCK,
174   PROP_IS_LIVE,
175   PROP_MIN_LATENCY,
176   PROP_MAX_LATENCY,
177   PROP_EMIT_SIGNALS,
178   PROP_MIN_PERCENT,
179   PROP_LAST
180 };
181
182 static GstStaticPadTemplate gst_app_src_template =
183 GST_STATIC_PAD_TEMPLATE ("src",
184     GST_PAD_SRC,
185     GST_PAD_ALWAYS,
186     GST_STATIC_CAPS_ANY);
187
188 GType
189 gst_app_stream_type_get_type (void)
190 {
191   static volatile gsize stream_type_type = 0;
192   static const GEnumValue stream_type[] = {
193     {GST_APP_STREAM_TYPE_STREAM, "GST_APP_STREAM_TYPE_STREAM", "stream"},
194     {GST_APP_STREAM_TYPE_SEEKABLE, "GST_APP_STREAM_TYPE_SEEKABLE", "seekable"},
195     {GST_APP_STREAM_TYPE_RANDOM_ACCESS, "GST_APP_STREAM_TYPE_RANDOM_ACCESS",
196         "random-access"},
197     {0, NULL, NULL}
198   };
199
200   if (g_once_init_enter (&stream_type_type)) {
201     GType tmp = g_enum_register_static ("GstAppStreamType", stream_type);
202     g_once_init_leave (&stream_type_type, tmp);
203   }
204
205   return (GType) stream_type_type;
206 }
207
208 static void gst_app_src_uri_handler_init (gpointer g_iface,
209     gpointer iface_data);
210
211 static void gst_app_src_dispose (GObject * object);
212 static void gst_app_src_finalize (GObject * object);
213
214 static void gst_app_src_set_property (GObject * object, guint prop_id,
215     const GValue * value, GParamSpec * pspec);
216 static void gst_app_src_get_property (GObject * object, guint prop_id,
217     GValue * value, GParamSpec * pspec);
218
219 static void gst_app_src_set_latencies (GstAppSrc * appsrc,
220     gboolean do_min, guint64 min, gboolean do_max, guint64 max);
221
222 static gboolean gst_app_src_negotiate (GstBaseSrc * basesrc);
223 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc,
224     guint64 offset, guint size, GstBuffer ** buf);
225 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
226 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
227 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
228 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
229 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
230 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
231 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
232 static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
233
234 static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
235     GstBuffer * buffer);
236
237 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
238
239 #define gst_app_src_parent_class parent_class
240 G_DEFINE_TYPE_WITH_CODE (GstAppSrc, gst_app_src, GST_TYPE_BASE_SRC,
241     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_app_src_uri_handler_init));
242
243 static void
244 gst_app_src_class_init (GstAppSrcClass * klass)
245 {
246   GObjectClass *gobject_class = (GObjectClass *) klass;
247   GstElementClass *element_class = (GstElementClass *) klass;
248   GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
249
250   GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
251
252   gobject_class->dispose = gst_app_src_dispose;
253   gobject_class->finalize = gst_app_src_finalize;
254
255   gobject_class->set_property = gst_app_src_set_property;
256   gobject_class->get_property = gst_app_src_get_property;
257
258   /**
259    * GstAppSrc::caps
260    *
261    * The GstCaps that will negotiated downstream and will be put
262    * on outgoing buffers.
263    */
264   g_object_class_install_property (gobject_class, PROP_CAPS,
265       g_param_spec_boxed ("caps", "Caps",
266           "The allowed caps for the src pad", GST_TYPE_CAPS,
267           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
268   /**
269    * GstAppSrc::format
270    *
271    * The format to use for segment events. When the source is producing
272    * timestamped buffers this property should be set to GST_FORMAT_TIME.
273    */
274   g_object_class_install_property (gobject_class, PROP_FORMAT,
275       g_param_spec_enum ("format", "Format",
276           "The format of the segment events and seek", GST_TYPE_FORMAT,
277           DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
278   /**
279    * GstAppSrc::size
280    *
281    * The total size in bytes of the data stream. If the total size is known, it
282    * is recommended to configure it with this property.
283    */
284   g_object_class_install_property (gobject_class, PROP_SIZE,
285       g_param_spec_int64 ("size", "Size",
286           "The size of the data stream in bytes (-1 if unknown)",
287           -1, G_MAXINT64, DEFAULT_PROP_SIZE,
288           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
289   /**
290    * GstAppSrc::stream-type
291    *
292    * The type of stream that this source is producing.  For seekable streams the
293    * application should connect to the seek-data signal.
294    */
295   g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
296       g_param_spec_enum ("stream-type", "Stream Type",
297           "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
298           DEFAULT_PROP_STREAM_TYPE,
299           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
300   /**
301    * GstAppSrc::max-bytes
302    *
303    * The maximum amount of bytes that can be queued internally.
304    * After the maximum amount of bytes are queued, appsrc will emit the
305    * "enough-data" signal.
306    */
307   g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
308       g_param_spec_uint64 ("max-bytes", "Max bytes",
309           "The maximum number of bytes to queue internally (0 = unlimited)",
310           0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
311           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
312   /**
313    * GstAppSrc::block
314    *
315    * When max-bytes are queued and after the enough-data signal has been emitted,
316    * block any further push-buffer calls until the amount of queued bytes drops
317    * below the max-bytes limit.
318    */
319   g_object_class_install_property (gobject_class, PROP_BLOCK,
320       g_param_spec_boolean ("block", "Block",
321           "Block push-buffer when max-bytes are queued",
322           DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
323
324   /**
325    * GstAppSrc::is-live
326    *
327    * Instruct the source to behave like a live source. This includes that it
328    * will only push out buffers in the PLAYING state.
329    */
330   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
331       g_param_spec_boolean ("is-live", "Is Live",
332           "Whether to act as a live source",
333           DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
334   /**
335    * GstAppSrc::min-latency
336    *
337    * The minimum latency of the source. A value of -1 will use the default
338    * latency calculations of #GstBaseSrc.
339    */
340   g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
341       g_param_spec_int64 ("min-latency", "Min Latency",
342           "The minimum latency (-1 = default)",
343           -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
344           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
345   /**
346    * GstAppSrc::max-latency
347    *
348    * The maximum latency of the source. A value of -1 means an unlimited amout
349    * of latency.
350    */
351   g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
352       g_param_spec_int64 ("max-latency", "Max Latency",
353           "The maximum latency (-1 = unlimited)",
354           -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
355           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356
357   /**
358    * GstAppSrc::emit-signals
359    *
360    * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
361    * This option is by default enabled for backwards compatibility reasons but
362    * can disabled when needed because signal emission is expensive.
363    *
364    * Since: 0.10.23
365    */
366   g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
367       g_param_spec_boolean ("emit-signals", "Emit signals",
368           "Emit need-data, enough-data and seek-data signals",
369           DEFAULT_PROP_EMIT_SIGNALS,
370           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
371
372   /**
373    * GstAppSrc::empty-percent
374    *
375    * Make appsrc emit the "need-data" signal when the amount of bytes in the
376    * queue drops below this percentage of max-bytes.
377    *
378    * Since: 0.10.27
379    */
380   g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
381       g_param_spec_uint ("min-percent", "Min Percent",
382           "Emit need-data when queued bytes drops below this percent of max-bytes",
383           0, 100, DEFAULT_PROP_MIN_PERCENT,
384           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
385
386   /**
387    * GstAppSrc::need-data:
388    * @appsrc: the appsrc element that emitted the signal
389    * @length: the amount of bytes needed.
390    *
391    * Signal that the source needs more data. In the callback or from another
392    * thread you should call push-buffer or end-of-stream.
393    *
394    * @length is just a hint and when it is set to -1, any number of bytes can be
395    * pushed into @appsrc.
396    *
397    * You can call push-buffer multiple times until the enough-data signal is
398    * fired.
399    */
400   gst_app_src_signals[SIGNAL_NEED_DATA] =
401       g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
402       G_STRUCT_OFFSET (GstAppSrcClass, need_data),
403       NULL, NULL, __gst_app_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
404
405   /**
406    * GstAppSrc::enough-data:
407    * @appsrc: the appsrc element that emitted the signal
408    *
409    * Signal that the source has enough data. It is recommended that the
410    * application stops calling push-buffer until the need-data signal is
411    * emitted again to avoid excessive buffer queueing.
412    */
413   gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
414       g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
415       G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
416       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
417
418   /**
419    * GstAppSrc::seek-data:
420    * @appsrc: the appsrc element that emitted the signal
421    * @offset: the offset to seek to
422    *
423    * Seek to the given offset. The next push-buffer should produce buffers from
424    * the new @offset.
425    * This callback is only called for seekable stream types.
426    *
427    * Returns: %TRUE if the seek succeeded.
428    */
429   gst_app_src_signals[SIGNAL_SEEK_DATA] =
430       g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
431       G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
432       NULL, NULL, __gst_app_marshal_BOOLEAN__UINT64, G_TYPE_BOOLEAN, 1,
433       G_TYPE_UINT64);
434
435    /**
436     * GstAppSrc::push-buffer:
437     * @appsrc: the appsrc
438     * @buffer: a buffer to push
439     *
440     * Adds a buffer to the queue of buffers that the appsrc element will
441     * push to its source pad. This function does not take ownership of the
442     * buffer so the buffer needs to be unreffed after calling this function.
443     *
444     * When the block property is TRUE, this function can block until free space
445     * becomes available in the queue.
446     */
447   gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
448       g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
449       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
450           push_buffer), NULL, NULL, __gst_app_marshal_ENUM__BOXED,
451       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
452
453    /**
454     * GstAppSrc::end-of-stream:
455     * @appsrc: the appsrc
456     *
457     * Notify @appsrc that no more buffer are available.
458     */
459   gst_app_src_signals[SIGNAL_END_OF_STREAM] =
460       g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
461       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
462           end_of_stream), NULL, NULL, __gst_app_marshal_ENUM__VOID,
463       GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
464
465   gst_element_class_set_details_simple (element_class, "AppSrc",
466       "Generic/Source", "Allow the application to feed buffers to a pipeline",
467       "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
468
469   gst_element_class_add_pad_template (element_class,
470       gst_static_pad_template_get (&gst_app_src_template));
471
472   basesrc_class->negotiate = gst_app_src_negotiate;
473   basesrc_class->create = gst_app_src_create;
474   basesrc_class->start = gst_app_src_start;
475   basesrc_class->stop = gst_app_src_stop;
476   basesrc_class->unlock = gst_app_src_unlock;
477   basesrc_class->unlock_stop = gst_app_src_unlock_stop;
478   basesrc_class->do_seek = gst_app_src_do_seek;
479   basesrc_class->is_seekable = gst_app_src_is_seekable;
480   basesrc_class->get_size = gst_app_src_do_get_size;
481   basesrc_class->get_size = gst_app_src_do_get_size;
482   basesrc_class->query = gst_app_src_query;
483
484   klass->push_buffer = gst_app_src_push_buffer_action;
485   klass->end_of_stream = gst_app_src_end_of_stream;
486
487   g_type_class_add_private (klass, sizeof (GstAppSrcPrivate));
488 }
489
490 static void
491 gst_app_src_init (GstAppSrc * appsrc)
492 {
493   GstAppSrcPrivate *priv;
494
495   priv = appsrc->priv = G_TYPE_INSTANCE_GET_PRIVATE (appsrc, GST_TYPE_APP_SRC,
496       GstAppSrcPrivate);
497
498   priv->mutex = g_mutex_new ();
499   priv->cond = g_cond_new ();
500   priv->queue = g_queue_new ();
501
502   priv->size = DEFAULT_PROP_SIZE;
503   priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
504   priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
505   priv->format = DEFAULT_PROP_FORMAT;
506   priv->block = DEFAULT_PROP_BLOCK;
507   priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
508   priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
509   priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
510   priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
511
512   gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
513 }
514
515 static void
516 gst_app_src_flush_queued (GstAppSrc * src)
517 {
518   GstBuffer *buf;
519   GstAppSrcPrivate *priv = src->priv;
520
521   while ((buf = g_queue_pop_head (priv->queue)))
522     gst_buffer_unref (buf);
523   priv->queued_bytes = 0;
524 }
525
526 static void
527 gst_app_src_dispose (GObject * obj)
528 {
529   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
530   GstAppSrcPrivate *priv = appsrc->priv;
531
532   if (priv->caps) {
533     gst_caps_unref (priv->caps);
534     priv->caps = NULL;
535   }
536   gst_app_src_flush_queued (appsrc);
537
538   G_OBJECT_CLASS (parent_class)->dispose (obj);
539 }
540
541 static void
542 gst_app_src_finalize (GObject * obj)
543 {
544   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
545   GstAppSrcPrivate *priv = appsrc->priv;
546
547   g_mutex_free (priv->mutex);
548   g_cond_free (priv->cond);
549   g_queue_free (priv->queue);
550
551   G_OBJECT_CLASS (parent_class)->finalize (obj);
552 }
553
554 static void
555 gst_app_src_set_property (GObject * object, guint prop_id,
556     const GValue * value, GParamSpec * pspec)
557 {
558   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
559   GstAppSrcPrivate *priv = appsrc->priv;
560
561   switch (prop_id) {
562     case PROP_CAPS:
563       gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
564       break;
565     case PROP_SIZE:
566       gst_app_src_set_size (appsrc, g_value_get_int64 (value));
567       break;
568     case PROP_STREAM_TYPE:
569       gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
570       break;
571     case PROP_MAX_BYTES:
572       gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
573       break;
574     case PROP_FORMAT:
575       priv->format = g_value_get_enum (value);
576       break;
577     case PROP_BLOCK:
578       priv->block = g_value_get_boolean (value);
579       break;
580     case PROP_IS_LIVE:
581       gst_base_src_set_live (GST_BASE_SRC (appsrc),
582           g_value_get_boolean (value));
583       break;
584     case PROP_MIN_LATENCY:
585       gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
586           FALSE, -1);
587       break;
588     case PROP_MAX_LATENCY:
589       gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
590           g_value_get_int64 (value));
591       break;
592     case PROP_EMIT_SIGNALS:
593       gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
594       break;
595     case PROP_MIN_PERCENT:
596       priv->min_percent = g_value_get_uint (value);
597       break;
598     default:
599       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
600       break;
601   }
602 }
603
604 static void
605 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
606     GParamSpec * pspec)
607 {
608   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
609   GstAppSrcPrivate *priv = appsrc->priv;
610
611   switch (prop_id) {
612     case PROP_CAPS:
613     {
614       GstCaps *caps;
615
616       /* we're missing a _take_caps() function to transfer ownership */
617       caps = gst_app_src_get_caps (appsrc);
618       gst_value_set_caps (value, caps);
619       if (caps)
620         gst_caps_unref (caps);
621       break;
622     }
623     case PROP_SIZE:
624       g_value_set_int64 (value, gst_app_src_get_size (appsrc));
625       break;
626     case PROP_STREAM_TYPE:
627       g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
628       break;
629     case PROP_MAX_BYTES:
630       g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
631       break;
632     case PROP_FORMAT:
633       g_value_set_enum (value, priv->format);
634       break;
635     case PROP_BLOCK:
636       g_value_set_boolean (value, priv->block);
637       break;
638     case PROP_IS_LIVE:
639       g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
640       break;
641     case PROP_MIN_LATENCY:
642     {
643       guint64 min;
644
645       gst_app_src_get_latency (appsrc, &min, NULL);
646       g_value_set_int64 (value, min);
647       break;
648     }
649     case PROP_MAX_LATENCY:
650     {
651       guint64 max;
652
653       gst_app_src_get_latency (appsrc, &max, NULL);
654       g_value_set_int64 (value, max);
655       break;
656     }
657     case PROP_EMIT_SIGNALS:
658       g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
659       break;
660     case PROP_MIN_PERCENT:
661       g_value_set_uint (value, priv->min_percent);
662       break;
663     default:
664       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
665       break;
666   }
667 }
668
669 static gboolean
670 gst_app_src_unlock (GstBaseSrc * bsrc)
671 {
672   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
673   GstAppSrcPrivate *priv = appsrc->priv;
674
675   g_mutex_lock (priv->mutex);
676   GST_DEBUG_OBJECT (appsrc, "unlock start");
677   priv->flushing = TRUE;
678   g_cond_broadcast (priv->cond);
679   g_mutex_unlock (priv->mutex);
680
681   return TRUE;
682 }
683
684 static gboolean
685 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
686 {
687   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
688   GstAppSrcPrivate *priv = appsrc->priv;
689
690   g_mutex_lock (priv->mutex);
691   GST_DEBUG_OBJECT (appsrc, "unlock stop");
692   priv->flushing = FALSE;
693   g_cond_broadcast (priv->cond);
694   g_mutex_unlock (priv->mutex);
695
696   return TRUE;
697 }
698
699 static gboolean
700 gst_app_src_start (GstBaseSrc * bsrc)
701 {
702   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
703   GstAppSrcPrivate *priv = appsrc->priv;
704
705   g_mutex_lock (priv->mutex);
706   GST_DEBUG_OBJECT (appsrc, "starting");
707   priv->started = TRUE;
708   /* set the offset to -1 so that we always do a first seek. This is only used
709    * in random-access mode. */
710   priv->offset = -1;
711   priv->flushing = FALSE;
712   g_mutex_unlock (priv->mutex);
713
714   gst_base_src_set_format (bsrc, priv->format);
715
716   return TRUE;
717 }
718
719 static gboolean
720 gst_app_src_stop (GstBaseSrc * bsrc)
721 {
722   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
723   GstAppSrcPrivate *priv = appsrc->priv;
724
725   g_mutex_lock (priv->mutex);
726   GST_DEBUG_OBJECT (appsrc, "stopping");
727   priv->is_eos = FALSE;
728   priv->flushing = TRUE;
729   priv->started = FALSE;
730   gst_app_src_flush_queued (appsrc);
731   g_mutex_unlock (priv->mutex);
732
733   return TRUE;
734 }
735
736 static gboolean
737 gst_app_src_is_seekable (GstBaseSrc * src)
738 {
739   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
740   GstAppSrcPrivate *priv = appsrc->priv;
741   gboolean res = FALSE;
742
743   switch (priv->stream_type) {
744     case GST_APP_STREAM_TYPE_STREAM:
745       break;
746     case GST_APP_STREAM_TYPE_SEEKABLE:
747     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
748       res = TRUE;
749       break;
750   }
751   return res;
752 }
753
754 static gboolean
755 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
756 {
757   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
758
759   *size = gst_app_src_get_size (appsrc);
760
761   return TRUE;
762 }
763
764 static gboolean
765 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
766 {
767   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
768   GstAppSrcPrivate *priv = appsrc->priv;
769   gboolean res;
770
771   switch (GST_QUERY_TYPE (query)) {
772     case GST_QUERY_LATENCY:
773     {
774       GstClockTime min, max;
775       gboolean live;
776
777       /* Query the parent class for the defaults */
778       res = gst_base_src_query_latency (src, &live, &min, &max);
779
780       /* overwrite with our values when we need to */
781       g_mutex_lock (priv->mutex);
782       if (priv->min_latency != -1)
783         min = priv->min_latency;
784       if (priv->max_latency != -1)
785         max = priv->max_latency;
786       g_mutex_unlock (priv->mutex);
787
788       gst_query_set_latency (query, live, min, max);
789       break;
790     }
791     case GST_QUERY_SCHEDULING:
792     {
793       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
794       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
795
796       switch (priv->stream_type) {
797         case GST_APP_STREAM_TYPE_STREAM:
798         case GST_APP_STREAM_TYPE_SEEKABLE:
799           break;
800         case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
801           gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
802           break;
803       }
804       res = TRUE;
805       break;
806     }
807     default:
808       res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
809       break;
810   }
811
812   return res;
813 }
814
815 /* will be called in push mode */
816 static gboolean
817 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
818 {
819   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
820   GstAppSrcPrivate *priv = appsrc->priv;
821   gint64 desired_position;
822   gboolean res = FALSE;
823
824   desired_position = segment->position;
825
826   GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
827       desired_position, gst_format_get_name (segment->format));
828
829   /* no need to try to seek in streaming mode */
830   if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
831     return TRUE;
832
833   if (priv->callbacks.seek_data)
834     res = priv->callbacks.seek_data (appsrc, desired_position, priv->user_data);
835   else {
836     gboolean emit;
837
838     g_mutex_lock (priv->mutex);
839     emit = priv->emit_signals;
840     g_mutex_unlock (priv->mutex);
841
842     if (emit)
843       g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
844           desired_position, &res);
845   }
846
847   if (res) {
848     GST_DEBUG_OBJECT (appsrc, "flushing queue");
849     gst_app_src_flush_queued (appsrc);
850     priv->is_eos = FALSE;
851   } else {
852     GST_WARNING_OBJECT (appsrc, "seek failed");
853   }
854
855   return res;
856 }
857
858 /* must be called with the appsrc mutex */
859 static gboolean
860 gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
861 {
862   gboolean res = FALSE;
863   gboolean emit;
864   GstAppSrcPrivate *priv = appsrc->priv;
865
866   emit = priv->emit_signals;
867   g_mutex_unlock (priv->mutex);
868
869   GST_DEBUG_OBJECT (appsrc,
870       "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
871       priv->offset, offset);
872
873   if (priv->callbacks.seek_data)
874     res = priv->callbacks.seek_data (appsrc, offset, priv->user_data);
875   else if (emit)
876     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
877         offset, &res);
878
879   g_mutex_lock (priv->mutex);
880
881   return res;
882 }
883
884 /* must be called with the appsrc mutex. After this call things can be
885  * flushing */
886 static void
887 gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
888 {
889   gboolean emit;
890   GstAppSrcPrivate *priv = appsrc->priv;
891
892   emit = priv->emit_signals;
893   g_mutex_unlock (priv->mutex);
894
895   /* we have no data, we need some. We fire the signal with the size hint. */
896   if (priv->callbacks.need_data)
897     priv->callbacks.need_data (appsrc, size, priv->user_data);
898   else if (emit)
899     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
900         NULL);
901
902   g_mutex_lock (priv->mutex);
903   /* we can be flushing now because we released the lock */
904 }
905
906 static gboolean
907 gst_app_src_negotiate (GstBaseSrc * basesrc)
908 {
909   GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
910   GstAppSrcPrivate *priv = appsrc->priv;
911   GstCaps *caps;
912   gboolean result;
913
914   GST_OBJECT_LOCK (appsrc);
915   if ((caps = priv->caps))
916     gst_caps_ref (caps);
917   GST_OBJECT_UNLOCK (appsrc);
918
919   if (caps) {
920     result = gst_base_src_set_caps (basesrc, caps);
921     gst_caps_unref (caps);
922   } else {
923     result = GST_BASE_SRC_CLASS (parent_class)->negotiate (basesrc);
924   }
925   return result;
926 }
927
928 static GstFlowReturn
929 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
930     GstBuffer ** buf)
931 {
932   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
933   GstAppSrcPrivate *priv = appsrc->priv;
934   GstFlowReturn ret;
935
936   GST_OBJECT_LOCK (appsrc);
937   if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
938           bsrc->segment.format == GST_FORMAT_BYTES)) {
939     GST_DEBUG_OBJECT (appsrc,
940         "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
941         bsrc->segment.duration, priv->size);
942     bsrc->segment.duration = priv->size;
943     GST_OBJECT_UNLOCK (appsrc);
944
945     gst_element_post_message (GST_ELEMENT (appsrc),
946         gst_message_new_duration (GST_OBJECT (appsrc), GST_FORMAT_BYTES,
947             priv->size));
948   } else {
949     GST_OBJECT_UNLOCK (appsrc);
950   }
951
952   g_mutex_lock (priv->mutex);
953   /* check flushing first */
954   if (G_UNLIKELY (priv->flushing))
955     goto flushing;
956
957   if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
958     /* if we are dealing with a random-access stream, issue a seek if the offset
959      * changed. */
960     if (G_UNLIKELY (priv->offset != offset)) {
961       gboolean res;
962
963       /* do the seek */
964       res = gst_app_src_emit_seek (appsrc, offset);
965
966       if (G_UNLIKELY (!res))
967         /* failing to seek is fatal */
968         goto seek_error;
969
970       priv->offset = offset;
971     }
972   }
973
974   while (TRUE) {
975     /* return data as long as we have some */
976     if (!g_queue_is_empty (priv->queue)) {
977       guint buf_size;
978
979       *buf = g_queue_pop_head (priv->queue);
980       buf_size = gst_buffer_get_size (*buf);
981
982       GST_DEBUG_OBJECT (appsrc, "we have buffer %p of size %u", *buf, buf_size);
983
984       priv->queued_bytes -= buf_size;
985
986       /* only update the offset when in random_access mode */
987       if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
988         priv->offset += buf_size;
989
990       /* signal that we removed an item */
991       g_cond_broadcast (priv->cond);
992
993       /* see if we go lower than the empty-percent */
994       if (priv->min_percent && priv->max_bytes) {
995         if (priv->queued_bytes * 100 / priv->max_bytes <= priv->min_percent)
996           /* ignore flushing state, we got a buffer and we will return it now.
997            * Errors will be handled in the next round */
998           gst_app_src_emit_need_data (appsrc, size);
999       }
1000       ret = GST_FLOW_OK;
1001       break;
1002     } else {
1003       gst_app_src_emit_need_data (appsrc, size);
1004
1005       /* we can be flushing now because we released the lock above */
1006       if (G_UNLIKELY (priv->flushing))
1007         goto flushing;
1008
1009       /* if we have a buffer now, continue the loop and try to return it. In
1010        * random-access mode (where a buffer is normally pushed in the above
1011        * signal) we can still be empty because the pushed buffer got flushed or
1012        * when the application pushes the requested buffer later, we support both
1013        * possibilities. */
1014       if (!g_queue_is_empty (priv->queue))
1015         continue;
1016
1017       /* no buffer yet, maybe we are EOS, if not, block for more data. */
1018     }
1019
1020     /* check EOS */
1021     if (G_UNLIKELY (priv->is_eos))
1022       goto eos;
1023
1024     /* nothing to return, wait a while for new data or flushing. */
1025     g_cond_wait (priv->cond, priv->mutex);
1026   }
1027   g_mutex_unlock (priv->mutex);
1028   return ret;
1029
1030   /* ERRORS */
1031 flushing:
1032   {
1033     GST_DEBUG_OBJECT (appsrc, "we are flushing");
1034     g_mutex_unlock (priv->mutex);
1035     return GST_FLOW_WRONG_STATE;
1036   }
1037 eos:
1038   {
1039     GST_DEBUG_OBJECT (appsrc, "we are EOS");
1040     g_mutex_unlock (priv->mutex);
1041     return GST_FLOW_EOS;
1042   }
1043 seek_error:
1044   {
1045     g_mutex_unlock (priv->mutex);
1046     GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
1047         GST_ERROR_SYSTEM);
1048     return GST_FLOW_ERROR;
1049   }
1050 }
1051
1052 /* external API */
1053
1054 /**
1055  * gst_app_src_set_caps:
1056  * @appsrc: a #GstAppSrc
1057  * @caps: caps to set
1058  *
1059  * Set the capabilities on the appsrc element.  This function takes
1060  * a copy of the caps structure. After calling this method, the source will
1061  * only produce caps that match @caps. @caps must be fixed and the caps on the
1062  * buffers must match the caps or left NULL.
1063  *
1064  * Since: 0.10.22
1065  */
1066 void
1067 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
1068 {
1069   GstCaps *old;
1070   GstAppSrcPrivate *priv;
1071
1072   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1073
1074   priv = appsrc->priv;
1075
1076   GST_OBJECT_LOCK (appsrc);
1077   GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
1078   if ((old = priv->caps) != caps) {
1079     if (caps)
1080       priv->caps = gst_caps_copy (caps);
1081     else
1082       priv->caps = NULL;
1083     if (old)
1084       gst_caps_unref (old);
1085   }
1086   GST_OBJECT_UNLOCK (appsrc);
1087 }
1088
1089 /**
1090  * gst_app_src_get_caps:
1091  * @appsrc: a #GstAppSrc
1092  *
1093  * Get the configured caps on @appsrc.
1094  *
1095  * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
1096  *
1097  * Since: 0.10.22
1098  */
1099 GstCaps *
1100 gst_app_src_get_caps (GstAppSrc * appsrc)
1101 {
1102   GstCaps *caps;
1103   GstAppSrcPrivate *priv;
1104
1105   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
1106
1107   priv = appsrc->priv;
1108
1109   GST_OBJECT_LOCK (appsrc);
1110   if ((caps = priv->caps))
1111     gst_caps_ref (caps);
1112   GST_DEBUG_OBJECT (appsrc, "getting caps of %" GST_PTR_FORMAT, caps);
1113   GST_OBJECT_UNLOCK (appsrc);
1114
1115   return caps;
1116 }
1117
1118 /**
1119  * gst_app_src_set_size:
1120  * @appsrc: a #GstAppSrc
1121  * @size: the size to set
1122  *
1123  * Set the size of the stream in bytes. A value of -1 means that the size is
1124  * not known.
1125  *
1126  * Since: 0.10.22
1127  */
1128 void
1129 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
1130 {
1131   GstAppSrcPrivate *priv;
1132
1133   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1134
1135   priv = appsrc->priv;
1136
1137   GST_OBJECT_LOCK (appsrc);
1138   GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
1139   priv->size = size;
1140   GST_OBJECT_UNLOCK (appsrc);
1141 }
1142
1143 /**
1144  * gst_app_src_get_size:
1145  * @appsrc: a #GstAppSrc
1146  *
1147  * Get the size of the stream in bytes. A value of -1 means that the size is
1148  * not known.
1149  *
1150  * Returns: the size of the stream previously set with gst_app_src_set_size();
1151  *
1152  * Since: 0.10.22
1153  */
1154 gint64
1155 gst_app_src_get_size (GstAppSrc * appsrc)
1156 {
1157   gint64 size;
1158   GstAppSrcPrivate *priv;
1159
1160   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1161
1162   priv = appsrc->priv;
1163
1164   GST_OBJECT_LOCK (appsrc);
1165   size = priv->size;
1166   GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
1167   GST_OBJECT_UNLOCK (appsrc);
1168
1169   return size;
1170 }
1171
1172 /**
1173  * gst_app_src_set_stream_type:
1174  * @appsrc: a #GstAppSrc
1175  * @type: the new state
1176  *
1177  * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
1178  * be connected to.
1179  *
1180  * A stream_type stream
1181  *
1182  * Since: 0.10.22
1183  */
1184 void
1185 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
1186 {
1187   GstAppSrcPrivate *priv;
1188
1189   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1190
1191   priv = appsrc->priv;
1192
1193   GST_OBJECT_LOCK (appsrc);
1194   GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
1195   priv->stream_type = type;
1196   GST_OBJECT_UNLOCK (appsrc);
1197 }
1198
1199 /**
1200  * gst_app_src_get_stream_type:
1201  * @appsrc: a #GstAppSrc
1202  *
1203  * Get the stream type. Control the stream type of @appsrc
1204  * with gst_app_src_set_stream_type().
1205  *
1206  * Returns: the stream type.
1207  *
1208  * Since: 0.10.22
1209  */
1210 GstAppStreamType
1211 gst_app_src_get_stream_type (GstAppSrc * appsrc)
1212 {
1213   gboolean stream_type;
1214   GstAppSrcPrivate *priv;
1215
1216   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1217
1218   priv = appsrc->priv;
1219
1220   GST_OBJECT_LOCK (appsrc);
1221   stream_type = priv->stream_type;
1222   GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
1223   GST_OBJECT_UNLOCK (appsrc);
1224
1225   return stream_type;
1226 }
1227
1228 /**
1229  * gst_app_src_set_max_bytes:
1230  * @appsrc: a #GstAppSrc
1231  * @max: the maximum number of bytes to queue
1232  *
1233  * Set the maximum amount of bytes that can be queued in @appsrc.
1234  * After the maximum amount of bytes are queued, @appsrc will emit the
1235  * "enough-data" signal.
1236  *
1237  * Since: 0.10.22
1238  */
1239 void
1240 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
1241 {
1242   GstAppSrcPrivate *priv;
1243
1244   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1245
1246   priv = appsrc->priv;
1247
1248   g_mutex_lock (priv->mutex);
1249   if (max != priv->max_bytes) {
1250     GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
1251     priv->max_bytes = max;
1252     /* signal the change */
1253     g_cond_broadcast (priv->cond);
1254   }
1255   g_mutex_unlock (priv->mutex);
1256 }
1257
1258 /**
1259  * gst_app_src_get_max_bytes:
1260  * @appsrc: a #GstAppSrc
1261  *
1262  * Get the maximum amount of bytes that can be queued in @appsrc.
1263  *
1264  * Returns: The maximum amount of bytes that can be queued.
1265  *
1266  * Since: 0.10.22
1267  */
1268 guint64
1269 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
1270 {
1271   guint64 result;
1272   GstAppSrcPrivate *priv;
1273
1274   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
1275
1276   priv = appsrc->priv;
1277
1278   g_mutex_lock (priv->mutex);
1279   result = priv->max_bytes;
1280   GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
1281   g_mutex_unlock (priv->mutex);
1282
1283   return result;
1284 }
1285
1286 static void
1287 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
1288     gboolean do_max, guint64 max)
1289 {
1290   GstAppSrcPrivate *priv = appsrc->priv;
1291   gboolean changed = FALSE;
1292
1293   g_mutex_lock (priv->mutex);
1294   if (do_min && priv->min_latency != min) {
1295     priv->min_latency = min;
1296     changed = TRUE;
1297   }
1298   if (do_max && priv->max_latency != max) {
1299     priv->max_latency = max;
1300     changed = TRUE;
1301   }
1302   g_mutex_unlock (priv->mutex);
1303
1304   if (changed) {
1305     GST_DEBUG_OBJECT (appsrc, "posting latency changed");
1306     gst_element_post_message (GST_ELEMENT_CAST (appsrc),
1307         gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
1308   }
1309 }
1310
1311 /**
1312  * gst_app_src_set_latency:
1313  * @appsrc: a #GstAppSrc
1314  * @min: the min latency
1315  * @max: the min latency
1316  *
1317  * Configure the @min and @max latency in @src. If @min is set to -1, the
1318  * default latency calculations for pseudo-live sources will be used.
1319  *
1320  * Since: 0.10.22
1321  */
1322 void
1323 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
1324 {
1325   gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
1326 }
1327
1328 /**
1329  * gst_app_src_get_latency:
1330  * @appsrc: a #GstAppSrc
1331  * @min: the min latency
1332  * @max: the min latency
1333  *
1334  * Retrieve the min and max latencies in @min and @max respectively.
1335  *
1336  * Since: 0.10.22
1337  */
1338 void
1339 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
1340 {
1341   GstAppSrcPrivate *priv;
1342
1343   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1344
1345   priv = appsrc->priv;
1346
1347   g_mutex_lock (priv->mutex);
1348   if (min)
1349     *min = priv->min_latency;
1350   if (max)
1351     *max = priv->max_latency;
1352   g_mutex_unlock (priv->mutex);
1353 }
1354
1355 /**
1356  * gst_app_src_set_emit_signals:
1357  * @appsrc: a #GstAppSrc
1358  * @emit: the new state
1359  *
1360  * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
1361  * by default disabled because signal emission is expensive and unneeded when
1362  * the application prefers to operate in pull mode.
1363  *
1364  * Since: 0.10.23
1365  */
1366 void
1367 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
1368 {
1369   GstAppSrcPrivate *priv;
1370
1371   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1372
1373   priv = appsrc->priv;
1374
1375   g_mutex_lock (priv->mutex);
1376   priv->emit_signals = emit;
1377   g_mutex_unlock (priv->mutex);
1378 }
1379
1380 /**
1381  * gst_app_src_get_emit_signals:
1382  * @appsrc: a #GstAppSrc
1383  *
1384  * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
1385  *
1386  * Returns: %TRUE if @appsrc is emitting the "new-preroll" and "new-buffer"
1387  * signals.
1388  *
1389  * Since: 0.10.23
1390  */
1391 gboolean
1392 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
1393 {
1394   gboolean result;
1395   GstAppSrcPrivate *priv;
1396
1397   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1398
1399   priv = appsrc->priv;
1400
1401   g_mutex_lock (priv->mutex);
1402   result = priv->emit_signals;
1403   g_mutex_unlock (priv->mutex);
1404
1405   return result;
1406 }
1407
1408 static GstFlowReturn
1409 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
1410     gboolean steal_ref)
1411 {
1412   gboolean first = TRUE;
1413   GstAppSrcPrivate *priv;
1414
1415   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1416   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1417
1418   priv = appsrc->priv;
1419
1420   g_mutex_lock (priv->mutex);
1421
1422   while (TRUE) {
1423     /* can't accept buffers when we are flushing or EOS */
1424     if (priv->flushing)
1425       goto flushing;
1426
1427     if (priv->is_eos)
1428       goto eos;
1429
1430     if (priv->max_bytes && priv->queued_bytes >= priv->max_bytes) {
1431       GST_DEBUG_OBJECT (appsrc,
1432           "queue filled (%" G_GUINT64_FORMAT " >= %" G_GUINT64_FORMAT ")",
1433           priv->queued_bytes, priv->max_bytes);
1434
1435       if (first) {
1436         gboolean emit;
1437
1438         emit = priv->emit_signals;
1439         /* only signal on the first push */
1440         g_mutex_unlock (priv->mutex);
1441
1442         if (priv->callbacks.enough_data)
1443           priv->callbacks.enough_data (appsrc, priv->user_data);
1444         else if (emit)
1445           g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
1446               NULL);
1447
1448         g_mutex_lock (priv->mutex);
1449         /* continue to check for flushing/eos after releasing the lock */
1450         first = FALSE;
1451         continue;
1452       }
1453       if (priv->block) {
1454         GST_DEBUG_OBJECT (appsrc, "waiting for free space");
1455         /* we are filled, wait until a buffer gets popped or when we
1456          * flush. */
1457         g_cond_wait (priv->cond, priv->mutex);
1458       } else {
1459         /* no need to wait for free space, we just pump more data into the
1460          * queue hoping that the caller reacts to the enough-data signal and
1461          * stops pushing buffers. */
1462         break;
1463       }
1464     } else
1465       break;
1466   }
1467
1468   GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
1469   if (!steal_ref)
1470     gst_buffer_ref (buffer);
1471   g_queue_push_tail (priv->queue, buffer);
1472   priv->queued_bytes += gst_buffer_get_size (buffer);
1473   g_cond_broadcast (priv->cond);
1474   g_mutex_unlock (priv->mutex);
1475
1476   return GST_FLOW_OK;
1477
1478   /* ERRORS */
1479 flushing:
1480   {
1481     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
1482     if (steal_ref)
1483       gst_buffer_unref (buffer);
1484     g_mutex_unlock (priv->mutex);
1485     return GST_FLOW_WRONG_STATE;
1486   }
1487 eos:
1488   {
1489     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
1490     if (steal_ref)
1491       gst_buffer_unref (buffer);
1492     g_mutex_unlock (priv->mutex);
1493     return GST_FLOW_EOS;
1494   }
1495 }
1496
1497 /**
1498  * gst_app_src_push_buffer:
1499  * @appsrc: a #GstAppSrc
1500  * @buffer: a #GstBuffer to push
1501  *
1502  * Adds a buffer to the queue of buffers that the appsrc element will
1503  * push to its source pad.  This function takes ownership of the buffer.
1504  *
1505  * When the block property is TRUE, this function can block until free
1506  * space becomes available in the queue.
1507  *
1508  * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
1509  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
1510  * #GST_FLOW_EOS when EOS occured.
1511  *
1512  * Since: 0.10.22
1513  */
1514 GstFlowReturn
1515 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
1516 {
1517   return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
1518 }
1519
1520 /* push a buffer without stealing the ref of the buffer. This is used for the
1521  * action signal. */
1522 static GstFlowReturn
1523 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
1524 {
1525   return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
1526 }
1527
1528 /**
1529  * gst_app_src_end_of_stream:
1530  * @appsrc: a #GstAppSrc
1531  *
1532  * Indicates to the appsrc element that the last buffer queued in the
1533  * element is the last buffer of the stream.
1534  *
1535  * Returns: #GST_FLOW_OK when the EOS was successfuly queued.
1536  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
1537  *
1538  * Since: 0.10.22
1539  */
1540 GstFlowReturn
1541 gst_app_src_end_of_stream (GstAppSrc * appsrc)
1542 {
1543   GstAppSrcPrivate *priv;
1544
1545   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1546
1547   priv = appsrc->priv;
1548
1549   g_mutex_lock (priv->mutex);
1550   /* can't accept buffers when we are flushing. We can accept them when we are
1551    * EOS although it will not do anything. */
1552   if (priv->flushing)
1553     goto flushing;
1554
1555   GST_DEBUG_OBJECT (appsrc, "sending EOS");
1556   priv->is_eos = TRUE;
1557   g_cond_broadcast (priv->cond);
1558   g_mutex_unlock (priv->mutex);
1559
1560   return GST_FLOW_OK;
1561
1562   /* ERRORS */
1563 flushing:
1564   {
1565     g_mutex_unlock (priv->mutex);
1566     GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
1567     return GST_FLOW_WRONG_STATE;
1568   }
1569 }
1570
1571 /**
1572  * gst_app_src_set_callbacks:
1573  * @appsrc: a #GstAppSrc
1574  * @callbacks: the callbacks
1575  * @user_data: a user_data argument for the callbacks
1576  * @notify: a destroy notify function
1577  *
1578  * Set callbacks which will be executed when data is needed, enough data has
1579  * been collected or when a seek should be performed.
1580  * This is an alternative to using the signals, it has lower overhead and is thus
1581  * less expensive, but also less flexible.
1582  *
1583  * If callbacks are installed, no signals will be emitted for performance
1584  * reasons.
1585  *
1586  * Since: 0.10.23
1587  */
1588 void
1589 gst_app_src_set_callbacks (GstAppSrc * appsrc,
1590     GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
1591 {
1592   GDestroyNotify old_notify;
1593   GstAppSrcPrivate *priv;
1594
1595   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1596   g_return_if_fail (callbacks != NULL);
1597
1598   priv = appsrc->priv;
1599
1600   GST_OBJECT_LOCK (appsrc);
1601   old_notify = priv->notify;
1602
1603   if (old_notify) {
1604     gpointer old_data;
1605
1606     old_data = priv->user_data;
1607
1608     priv->user_data = NULL;
1609     priv->notify = NULL;
1610     GST_OBJECT_UNLOCK (appsrc);
1611
1612     old_notify (old_data);
1613
1614     GST_OBJECT_LOCK (appsrc);
1615   }
1616   priv->callbacks = *callbacks;
1617   priv->user_data = user_data;
1618   priv->notify = notify;
1619   GST_OBJECT_UNLOCK (appsrc);
1620 }
1621
1622 /*** GSTURIHANDLER INTERFACE *************************************************/
1623
1624 static GstURIType
1625 gst_app_src_uri_get_type (GType type)
1626 {
1627   return GST_URI_SRC;
1628 }
1629
1630 static const gchar *const *
1631 gst_app_src_uri_get_protocols (GType type)
1632 {
1633   static const gchar *protocols[] = { "appsrc", NULL };
1634
1635   return protocols;
1636 }
1637
1638 static gchar *
1639 gst_app_src_uri_get_uri (GstURIHandler * handler)
1640 {
1641   return g_strdup ("appsrc");
1642 }
1643
1644 static gboolean
1645 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
1646     GError ** error)
1647 {
1648   /* GstURIHandler checks the protocol for us */
1649   return TRUE;
1650 }
1651
1652 static void
1653 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1654 {
1655   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1656
1657   iface->get_type = gst_app_src_uri_get_type;
1658   iface->get_protocols = gst_app_src_uri_get_protocols;
1659   iface->get_uri = gst_app_src_uri_get_uri;
1660   iface->set_uri = gst_app_src_uri_set_uri;
1661 }