tizen 2.0 init
[framework/multimedia/gst-plugins-base0.10.git] / gst-libs / gst / app / gstappsrc.c
1 /* GStreamer
2  * Copyright (C) 2007 David Schleef <ds@schleef.org>
3  *           (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20 /**
21  * SECTION:gstappsrc
22  * @short_description: Easy way for applications to inject buffers into a
23  *     pipeline
24  * @see_also: #GstBaseSrc, appsink
25  *
26  * The appsrc element can be used by applications to insert data into a
27  * GStreamer pipeline. Unlike most GStreamer elements, Appsrc provides
28  * external API functions.
29  *
30  * appsrc can be used by linking with the libgstapp library to access the
31  * methods directly or by using the appsrc action signals.
32  *
33  * Before operating appsrc, the caps property must be set to a fixed caps
34  * describing the format of the data that will be pushed with appsrc. An
35  * exception to this is when pushing buffers with unknown caps, in which case no
36  * caps should be set. This is typically true of file-like sources that push raw
37  * byte buffers.
38  *
39  * The main way of handing data to the appsrc element is by calling the
40  * gst_app_src_push_buffer() method or by emitting the push-buffer action signal.
41  * This will put the buffer onto a queue from which appsrc will read from in its
42  * streaming thread. It is important to note that data transport will not happen
43  * from the thread that performed the push-buffer call.
44  *
45  * The "max-bytes" property controls how much data can be queued in appsrc
46  * before appsrc considers the queue full. A filled internal queue will always
47  * signal the "enough-data" signal, which signals the application that it should
48  * stop pushing data into appsrc. The "block" property will cause appsrc to
49  * block the push-buffer method until free data becomes available again.
50  *
51  * When the internal queue is running out of data, the "need-data" signal is
52  * emitted, which signals the application that it should start pushing more data
53  * into appsrc.
54  *
55  * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
56  * "seek-data" signal when the "stream-mode" property is set to "seekable" or
57  * "random-access". The signal argument will contain the new desired position in
58  * the stream expressed in the unit set with the "format" property. After
59  * receiving the seek-data signal, the application should push-buffers from the
60  * new position.
61  *
62  * These signals allow the application to operate the appsrc in two different
63  * ways:
64  *
65  * The push model, in which the application repeatedly calls the push-buffer method
66  * with a new buffer. Optionally, the queue size in the appsrc can be controlled
67  * with the enough-data and need-data signals by respectively stopping/starting
68  * the push-buffer calls. This is a typical mode of operation for the
69  * stream-type "stream" and "seekable". Use this model when implementing various
70  * network protocols or hardware devices.
71  *
72  * The pull model where the need-data signal triggers the next push-buffer call.
73  * This mode is typically used in the "random-access" stream-type. Use this
74  * model for file access or other randomly accessable sources. In this mode, a
75  * buffer of exactly the amount of bytes given by the need-data signal should be
76  * pushed into appsrc.
77  *
78  * In all modes, the size property on appsrc should contain the total stream
79  * size in bytes. Setting this property is mandatory in the random-access mode.
80  * For the stream and seekable modes, setting this property is optional but
81  * recommended.
82  *
83  * When the application is finished pushing data into appsrc, it should call
84  * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
85  * this call, no more buffers can be pushed into appsrc until a flushing seek
86  * happened or the state of the appsrc has gone through READY.
87  *
88  * Last reviewed on 2008-12-17 (0.10.10)
89  *
90  * Since: 0.10.22
91  */
92
93 #ifdef HAVE_CONFIG_H
94 #include "config.h"
95 #endif
96
97 #include <gst/gst.h>
98 #include <gst/base/gstbasesrc.h>
99
100 #include <string.h>
101
102 #include "gstapp-marshal.h"
103 #include "gstappsrc.h"
104
105 #include "gst/glib-compat-private.h"
106
107 struct _GstAppSrcPrivate
108 {
109   GCond *cond;
110   GMutex *mutex;
111   GQueue *queue;
112
113   GstCaps *caps;
114   gint64 size;
115   GstAppStreamType stream_type;
116   guint64 max_bytes;
117   GstFormat format;
118   gboolean block;
119
120   gboolean flushing;
121   gboolean started;
122   gboolean is_eos;
123   guint64 queued_bytes;
124   guint64 offset;
125   GstAppStreamType current_type;
126
127   guint64 min_latency;
128   guint64 max_latency;
129   gboolean emit_signals;
130   guint min_percent;
131
132   GstAppSrcCallbacks callbacks;
133   gpointer user_data;
134   GDestroyNotify notify;
135 };
136
137 GST_DEBUG_CATEGORY_STATIC (app_src_debug);
138 #define GST_CAT_DEFAULT app_src_debug
139
140 enum
141 {
142   /* signals */
143   SIGNAL_NEED_DATA,
144   SIGNAL_ENOUGH_DATA,
145   SIGNAL_SEEK_DATA,
146
147   /* actions */
148   SIGNAL_PUSH_BUFFER,
149   SIGNAL_END_OF_STREAM,
150
151   LAST_SIGNAL
152 };
153
154 #define DEFAULT_PROP_SIZE          -1
155 #define DEFAULT_PROP_STREAM_TYPE   GST_APP_STREAM_TYPE_STREAM
156 #define DEFAULT_PROP_MAX_BYTES     200000
157 #define DEFAULT_PROP_FORMAT        GST_FORMAT_BYTES
158 #define DEFAULT_PROP_BLOCK         FALSE
159 #define DEFAULT_PROP_IS_LIVE       FALSE
160 #define DEFAULT_PROP_MIN_LATENCY   -1
161 #define DEFAULT_PROP_MAX_LATENCY   -1
162 #define DEFAULT_PROP_EMIT_SIGNALS  TRUE
163 #define DEFAULT_PROP_MIN_PERCENT   0
164
165 enum
166 {
167   PROP_0,
168   PROP_CAPS,
169   PROP_SIZE,
170   PROP_STREAM_TYPE,
171   PROP_MAX_BYTES,
172   PROP_FORMAT,
173   PROP_BLOCK,
174   PROP_IS_LIVE,
175   PROP_MIN_LATENCY,
176   PROP_MAX_LATENCY,
177   PROP_EMIT_SIGNALS,
178   PROP_MIN_PERCENT,
179   PROP_LAST
180 };
181
182 static GstStaticPadTemplate gst_app_src_template =
183 GST_STATIC_PAD_TEMPLATE ("src",
184     GST_PAD_SRC,
185     GST_PAD_ALWAYS,
186     GST_STATIC_CAPS_ANY);
187
188 GType
189 gst_app_stream_type_get_type (void)
190 {
191   static volatile gsize stream_type_type = 0;
192   static const GEnumValue stream_type[] = {
193     {GST_APP_STREAM_TYPE_STREAM, "GST_APP_STREAM_TYPE_STREAM", "stream"},
194     {GST_APP_STREAM_TYPE_SEEKABLE, "GST_APP_STREAM_TYPE_SEEKABLE", "seekable"},
195     {GST_APP_STREAM_TYPE_RANDOM_ACCESS, "GST_APP_STREAM_TYPE_RANDOM_ACCESS",
196         "random-access"},
197     {0, NULL, NULL}
198   };
199
200   if (g_once_init_enter (&stream_type_type)) {
201     GType tmp = g_enum_register_static ("GstAppStreamType", stream_type);
202     g_once_init_leave (&stream_type_type, tmp);
203   }
204
205   return (GType) stream_type_type;
206 }
207
208 static void gst_app_src_uri_handler_init (gpointer g_iface,
209     gpointer iface_data);
210
211 static void gst_app_src_dispose (GObject * object);
212 static void gst_app_src_finalize (GObject * object);
213
214 static void gst_app_src_set_property (GObject * object, guint prop_id,
215     const GValue * value, GParamSpec * pspec);
216 static void gst_app_src_get_property (GObject * object, guint prop_id,
217     GValue * value, GParamSpec * pspec);
218
219 static void gst_app_src_set_latencies (GstAppSrc * appsrc,
220     gboolean do_min, guint64 min, gboolean do_max, guint64 max);
221
222 static GstCaps *gst_app_src_internal_get_caps (GstBaseSrc * bsrc);
223 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc,
224     guint64 offset, guint size, GstBuffer ** buf);
225 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
226 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
227 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
228 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
229 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
230 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
231 static gboolean gst_app_src_check_get_range (GstBaseSrc * src);
232 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
233 static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
234
235 static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
236     GstBuffer * buffer);
237
238 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
239
240 static void
241 _do_init (GType filesrc_type)
242 {
243   static const GInterfaceInfo urihandler_info = {
244     gst_app_src_uri_handler_init,
245     NULL,
246     NULL
247   };
248   g_type_add_interface_static (filesrc_type, GST_TYPE_URI_HANDLER,
249       &urihandler_info);
250 }
251
252 GST_BOILERPLATE_FULL (GstAppSrc, gst_app_src, GstBaseSrc, GST_TYPE_BASE_SRC,
253     _do_init);
254
255 static void
256 gst_app_src_base_init (gpointer g_class)
257 {
258   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
259
260   GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
261
262   gst_element_class_set_details_simple (element_class, "AppSrc",
263       "Generic/Source", "Allow the application to feed buffers to a pipeline",
264       "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
265
266   gst_element_class_add_static_pad_template (element_class,
267       &gst_app_src_template);
268 }
269
270 static void
271 gst_app_src_class_init (GstAppSrcClass * klass)
272 {
273   GObjectClass *gobject_class = (GObjectClass *) klass;
274   GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
275
276   gobject_class->dispose = gst_app_src_dispose;
277   gobject_class->finalize = gst_app_src_finalize;
278
279   gobject_class->set_property = gst_app_src_set_property;
280   gobject_class->get_property = gst_app_src_get_property;
281
282   /**
283    * GstAppSrc::caps
284    *
285    * The GstCaps that will negotiated downstream and will be put
286    * on outgoing buffers.
287    */
288   g_object_class_install_property (gobject_class, PROP_CAPS,
289       g_param_spec_boxed ("caps", "Caps",
290           "The allowed caps for the src pad", GST_TYPE_CAPS,
291           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
292   /**
293    * GstAppSrc::format
294    *
295    * The format to use for segment events. When the source is producing
296    * timestamped buffers this property should be set to GST_FORMAT_TIME.
297    */
298   g_object_class_install_property (gobject_class, PROP_FORMAT,
299       g_param_spec_enum ("format", "Format",
300           "The format of the segment events and seek", GST_TYPE_FORMAT,
301           DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
302   /**
303    * GstAppSrc::size
304    *
305    * The total size in bytes of the data stream. If the total size is known, it
306    * is recommended to configure it with this property.
307    */
308   g_object_class_install_property (gobject_class, PROP_SIZE,
309       g_param_spec_int64 ("size", "Size",
310           "The size of the data stream in bytes (-1 if unknown)",
311           -1, G_MAXINT64, DEFAULT_PROP_SIZE,
312           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
313   /**
314    * GstAppSrc::stream-type
315    *
316    * The type of stream that this source is producing.  For seekable streams the
317    * application should connect to the seek-data signal.
318    */
319   g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
320       g_param_spec_enum ("stream-type", "Stream Type",
321           "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
322           DEFAULT_PROP_STREAM_TYPE,
323           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
324   /**
325    * GstAppSrc::max-bytes
326    *
327    * The maximum amount of bytes that can be queued internally.
328    * After the maximum amount of bytes are queued, appsrc will emit the
329    * "enough-data" signal.
330    */
331   g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
332       g_param_spec_uint64 ("max-bytes", "Max bytes",
333           "The maximum number of bytes to queue internally (0 = unlimited)",
334           0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
335           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
336   /**
337    * GstAppSrc::block
338    *
339    * When max-bytes are queued and after the enough-data signal has been emitted,
340    * block any further push-buffer calls until the amount of queued bytes drops
341    * below the max-bytes limit.
342    */
343   g_object_class_install_property (gobject_class, PROP_BLOCK,
344       g_param_spec_boolean ("block", "Block",
345           "Block push-buffer when max-bytes are queued",
346           DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
347
348   /**
349    * GstAppSrc::is-live
350    *
351    * Instruct the source to behave like a live source. This includes that it
352    * will only push out buffers in the PLAYING state.
353    */
354   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
355       g_param_spec_boolean ("is-live", "Is Live",
356           "Whether to act as a live source",
357           DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
358   /**
359    * GstAppSrc::min-latency
360    *
361    * The minimum latency of the source. A value of -1 will use the default
362    * latency calculations of #GstBaseSrc.
363    */
364   g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
365       g_param_spec_int64 ("min-latency", "Min Latency",
366           "The minimum latency (-1 = default)",
367           -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
368           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
369   /**
370    * GstAppSrc::max-latency
371    *
372    * The maximum latency of the source. A value of -1 means an unlimited amout
373    * of latency.
374    */
375   g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
376       g_param_spec_int64 ("max-latency", "Max Latency",
377           "The maximum latency (-1 = unlimited)",
378           -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
379           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
380
381   /**
382    * GstAppSrc::emit-signals
383    *
384    * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
385    * This option is by default enabled for backwards compatibility reasons but
386    * can disabled when needed because signal emission is expensive.
387    *
388    * Since: 0.10.23
389    */
390   g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
391       g_param_spec_boolean ("emit-signals", "Emit signals",
392           "Emit need-data, enough-data and seek-data signals",
393           DEFAULT_PROP_EMIT_SIGNALS,
394           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
395
396   /**
397    * GstAppSrc::empty-percent
398    *
399    * Make appsrc emit the "need-data" signal when the amount of bytes in the
400    * queue drops below this percentage of max-bytes.
401    *
402    * Since: 0.10.27
403    */
404   g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
405       g_param_spec_uint ("min-percent", "Min Percent",
406           "Emit need-data when queued bytes drops below this percent of max-bytes",
407           0, 100, DEFAULT_PROP_MIN_PERCENT,
408           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
409
410   /**
411    * GstAppSrc::need-data:
412    * @appsrc: the appsrc element that emitted the signal
413    * @length: the amount of bytes needed.
414    *
415    * Signal that the source needs more data. In the callback or from another
416    * thread you should call push-buffer or end-of-stream.
417    *
418    * @length is just a hint and when it is set to -1, any number of bytes can be
419    * pushed into @appsrc.
420    *
421    * You can call push-buffer multiple times until the enough-data signal is
422    * fired.
423    */
424   gst_app_src_signals[SIGNAL_NEED_DATA] =
425       g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
426       G_STRUCT_OFFSET (GstAppSrcClass, need_data),
427       NULL, NULL, __gst_app_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
428
429   /**
430    * GstAppSrc::enough-data:
431    * @appsrc: the appsrc element that emitted the signal
432    *
433    * Signal that the source has enough data. It is recommended that the
434    * application stops calling push-buffer until the need-data signal is
435    * emitted again to avoid excessive buffer queueing.
436    */
437   gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
438       g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
439       G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
440       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
441
442   /**
443    * GstAppSrc::seek-data:
444    * @appsrc: the appsrc element that emitted the signal
445    * @offset: the offset to seek to
446    *
447    * Seek to the given offset. The next push-buffer should produce buffers from
448    * the new @offset.
449    * This callback is only called for seekable stream types.
450    *
451    * Returns: %TRUE if the seek succeeded.
452    */
453   gst_app_src_signals[SIGNAL_SEEK_DATA] =
454       g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
455       G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
456       NULL, NULL, __gst_app_marshal_BOOLEAN__UINT64, G_TYPE_BOOLEAN, 1,
457       G_TYPE_UINT64);
458
459    /**
460     * GstAppSrc::push-buffer:
461     * @appsrc: the appsrc
462     * @buffer: a buffer to push
463     *
464     * Adds a buffer to the queue of buffers that the appsrc element will
465     * push to its source pad. This function does not take ownership of the
466     * buffer so the buffer needs to be unreffed after calling this function.
467     *
468     * When the block property is TRUE, this function can block until free space
469     * becomes available in the queue.
470     */
471   gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
472       g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
473       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
474           push_buffer), NULL, NULL, __gst_app_marshal_ENUM__OBJECT,
475       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
476
477    /**
478     * GstAppSrc::end-of-stream:
479     * @appsrc: the appsrc
480     *
481     * Notify @appsrc that no more buffer are available.
482     */
483   gst_app_src_signals[SIGNAL_END_OF_STREAM] =
484       g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
485       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
486           end_of_stream), NULL, NULL, __gst_app_marshal_ENUM__VOID,
487       GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
488
489   basesrc_class->get_caps = gst_app_src_internal_get_caps;
490   basesrc_class->create = gst_app_src_create;
491   basesrc_class->start = gst_app_src_start;
492   basesrc_class->stop = gst_app_src_stop;
493   basesrc_class->unlock = gst_app_src_unlock;
494   basesrc_class->unlock_stop = gst_app_src_unlock_stop;
495   basesrc_class->do_seek = gst_app_src_do_seek;
496   basesrc_class->is_seekable = gst_app_src_is_seekable;
497   basesrc_class->check_get_range = gst_app_src_check_get_range;
498   basesrc_class->get_size = gst_app_src_do_get_size;
499   basesrc_class->get_size = gst_app_src_do_get_size;
500   basesrc_class->query = gst_app_src_query;
501
502   klass->push_buffer = gst_app_src_push_buffer_action;
503   klass->end_of_stream = gst_app_src_end_of_stream;
504
505   g_type_class_add_private (klass, sizeof (GstAppSrcPrivate));
506 }
507
508 static void
509 gst_app_src_init (GstAppSrc * appsrc, GstAppSrcClass * klass)
510 {
511   GstAppSrcPrivate *priv;
512
513   priv = appsrc->priv = G_TYPE_INSTANCE_GET_PRIVATE (appsrc, GST_TYPE_APP_SRC,
514       GstAppSrcPrivate);
515
516   priv->mutex = g_mutex_new ();
517   priv->cond = g_cond_new ();
518   priv->queue = g_queue_new ();
519
520   priv->size = DEFAULT_PROP_SIZE;
521   priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
522   priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
523   priv->format = DEFAULT_PROP_FORMAT;
524   priv->block = DEFAULT_PROP_BLOCK;
525   priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
526   priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
527   priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
528   priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
529
530   gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
531 }
532
533 static void
534 gst_app_src_flush_queued (GstAppSrc * src)
535 {
536   GstBuffer *buf;
537   GstAppSrcPrivate *priv = src->priv;
538
539   while ((buf = g_queue_pop_head (priv->queue)))
540     gst_buffer_unref (buf);
541   priv->queued_bytes = 0;
542 }
543
544 static void
545 gst_app_src_dispose (GObject * obj)
546 {
547   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
548   GstAppSrcPrivate *priv = appsrc->priv;
549
550   if (priv->caps) {
551     gst_caps_unref (priv->caps);
552     priv->caps = NULL;
553   }
554   gst_app_src_flush_queued (appsrc);
555
556   G_OBJECT_CLASS (parent_class)->dispose (obj);
557 }
558
559 static void
560 gst_app_src_finalize (GObject * obj)
561 {
562   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
563   GstAppSrcPrivate *priv = appsrc->priv;
564
565   g_mutex_free (priv->mutex);
566   g_cond_free (priv->cond);
567   g_queue_free (priv->queue);
568
569   G_OBJECT_CLASS (parent_class)->finalize (obj);
570 }
571
572 static GstCaps *
573 gst_app_src_internal_get_caps (GstBaseSrc * bsrc)
574 {
575   return gst_app_src_get_caps (GST_APP_SRC_CAST (bsrc));
576 }
577
578 static void
579 gst_app_src_set_property (GObject * object, guint prop_id,
580     const GValue * value, GParamSpec * pspec)
581 {
582   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
583   GstAppSrcPrivate *priv = appsrc->priv;
584
585   switch (prop_id) {
586     case PROP_CAPS:
587       gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
588       break;
589     case PROP_SIZE:
590       gst_app_src_set_size (appsrc, g_value_get_int64 (value));
591       break;
592     case PROP_STREAM_TYPE:
593       gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
594       break;
595     case PROP_MAX_BYTES:
596       gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
597       break;
598     case PROP_FORMAT:
599       priv->format = g_value_get_enum (value);
600       break;
601     case PROP_BLOCK:
602       priv->block = g_value_get_boolean (value);
603       break;
604     case PROP_IS_LIVE:
605       gst_base_src_set_live (GST_BASE_SRC (appsrc),
606           g_value_get_boolean (value));
607       break;
608     case PROP_MIN_LATENCY:
609       gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
610           FALSE, -1);
611       break;
612     case PROP_MAX_LATENCY:
613       gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
614           g_value_get_int64 (value));
615       break;
616     case PROP_EMIT_SIGNALS:
617       gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
618       break;
619     case PROP_MIN_PERCENT:
620       priv->min_percent = g_value_get_uint (value);
621       break;
622     default:
623       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
624       break;
625   }
626 }
627
628 static void
629 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
630     GParamSpec * pspec)
631 {
632   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
633   GstAppSrcPrivate *priv = appsrc->priv;
634
635   switch (prop_id) {
636     case PROP_CAPS:
637     {
638       GstCaps *caps;
639
640       /* we're missing a _take_caps() function to transfer ownership */
641       caps = gst_app_src_get_caps (appsrc);
642       gst_value_set_caps (value, caps);
643       if (caps)
644         gst_caps_unref (caps);
645       break;
646     }
647     case PROP_SIZE:
648       g_value_set_int64 (value, gst_app_src_get_size (appsrc));
649       break;
650     case PROP_STREAM_TYPE:
651       g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
652       break;
653     case PROP_MAX_BYTES:
654       g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
655       break;
656     case PROP_FORMAT:
657       g_value_set_enum (value, priv->format);
658       break;
659     case PROP_BLOCK:
660       g_value_set_boolean (value, priv->block);
661       break;
662     case PROP_IS_LIVE:
663       g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
664       break;
665     case PROP_MIN_LATENCY:
666     {
667       guint64 min;
668
669       gst_app_src_get_latency (appsrc, &min, NULL);
670       g_value_set_int64 (value, min);
671       break;
672     }
673     case PROP_MAX_LATENCY:
674     {
675       guint64 max;
676
677       gst_app_src_get_latency (appsrc, &max, NULL);
678       g_value_set_int64 (value, max);
679       break;
680     }
681     case PROP_EMIT_SIGNALS:
682       g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
683       break;
684     case PROP_MIN_PERCENT:
685       g_value_set_uint (value, priv->min_percent);
686       break;
687     default:
688       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
689       break;
690   }
691 }
692
693 static gboolean
694 gst_app_src_unlock (GstBaseSrc * bsrc)
695 {
696   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
697   GstAppSrcPrivate *priv = appsrc->priv;
698
699   g_mutex_lock (priv->mutex);
700   GST_DEBUG_OBJECT (appsrc, "unlock start");
701   priv->flushing = TRUE;
702   g_cond_broadcast (priv->cond);
703   g_mutex_unlock (priv->mutex);
704
705   return TRUE;
706 }
707
708 static gboolean
709 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
710 {
711   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
712   GstAppSrcPrivate *priv = appsrc->priv;
713
714   g_mutex_lock (priv->mutex);
715   GST_DEBUG_OBJECT (appsrc, "unlock stop");
716   priv->flushing = FALSE;
717   g_cond_broadcast (priv->cond);
718   g_mutex_unlock (priv->mutex);
719
720   return TRUE;
721 }
722
723 static gboolean
724 gst_app_src_start (GstBaseSrc * bsrc)
725 {
726   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
727   GstAppSrcPrivate *priv = appsrc->priv;
728
729   g_mutex_lock (priv->mutex);
730   GST_DEBUG_OBJECT (appsrc, "starting");
731   priv->started = TRUE;
732   /* set the offset to -1 so that we always do a first seek. This is only used
733    * in random-access mode. */
734   priv->offset = -1;
735   priv->flushing = FALSE;
736   g_mutex_unlock (priv->mutex);
737
738   gst_base_src_set_format (bsrc, priv->format);
739
740   return TRUE;
741 }
742
743 static gboolean
744 gst_app_src_stop (GstBaseSrc * bsrc)
745 {
746   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
747   GstAppSrcPrivate *priv = appsrc->priv;
748
749   g_mutex_lock (priv->mutex);
750   GST_DEBUG_OBJECT (appsrc, "stopping");
751   priv->is_eos = FALSE;
752   priv->flushing = TRUE;
753   priv->started = FALSE;
754   gst_app_src_flush_queued (appsrc);
755   g_mutex_unlock (priv->mutex);
756
757   return TRUE;
758 }
759
760 static gboolean
761 gst_app_src_is_seekable (GstBaseSrc * src)
762 {
763   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
764   GstAppSrcPrivate *priv = appsrc->priv;
765   gboolean res = FALSE;
766
767   switch (priv->stream_type) {
768     case GST_APP_STREAM_TYPE_STREAM:
769       break;
770     case GST_APP_STREAM_TYPE_SEEKABLE:
771     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
772       res = TRUE;
773       break;
774   }
775   return res;
776 }
777
778 static gboolean
779 gst_app_src_check_get_range (GstBaseSrc * src)
780 {
781   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
782   GstAppSrcPrivate *priv = appsrc->priv;
783   gboolean res = FALSE;
784
785   switch (priv->stream_type) {
786     case GST_APP_STREAM_TYPE_STREAM:
787     case GST_APP_STREAM_TYPE_SEEKABLE:
788       break;
789     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
790       res = TRUE;
791       break;
792   }
793   return res;
794 }
795
796 static gboolean
797 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
798 {
799   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
800
801   *size = gst_app_src_get_size (appsrc);
802
803   return TRUE;
804 }
805
806 static gboolean
807 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
808 {
809   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
810   GstAppSrcPrivate *priv = appsrc->priv;
811   gboolean res;
812
813   switch (GST_QUERY_TYPE (query)) {
814     case GST_QUERY_LATENCY:
815     {
816       GstClockTime min, max;
817       gboolean live;
818
819       /* Query the parent class for the defaults */
820       res = gst_base_src_query_latency (src, &live, &min, &max);
821
822       /* overwrite with our values when we need to */
823       g_mutex_lock (priv->mutex);
824       if (priv->min_latency != -1)
825         min = priv->min_latency;
826       if (priv->max_latency != -1)
827         max = priv->max_latency;
828       g_mutex_unlock (priv->mutex);
829
830       gst_query_set_latency (query, live, min, max);
831       break;
832     }
833     default:
834       res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
835       break;
836   }
837
838   return res;
839 }
840
841 /* will be called in push mode */
842 static gboolean
843 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
844 {
845   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
846   GstAppSrcPrivate *priv = appsrc->priv;
847   gint64 desired_position;
848   gboolean res = FALSE;
849
850   desired_position = segment->last_stop;
851
852   GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
853       desired_position, gst_format_get_name (segment->format));
854
855   /* no need to try to seek in streaming mode */
856   if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
857     return TRUE;
858
859   if (priv->callbacks.seek_data)
860     res = priv->callbacks.seek_data (appsrc, desired_position, priv->user_data);
861   else {
862     gboolean emit;
863
864     g_mutex_lock (priv->mutex);
865     emit = priv->emit_signals;
866     g_mutex_unlock (priv->mutex);
867
868     if (emit)
869       g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
870           desired_position, &res);
871   }
872
873   if (res) {
874     GST_DEBUG_OBJECT (appsrc, "flushing queue");
875     gst_app_src_flush_queued (appsrc);
876     priv->is_eos = FALSE;
877   } else {
878     GST_WARNING_OBJECT (appsrc, "seek failed");
879   }
880
881   return res;
882 }
883
884 /* must be called with the appsrc mutex */
885 static gboolean
886 gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
887 {
888   gboolean res = FALSE;
889   gboolean emit;
890   GstAppSrcPrivate *priv = appsrc->priv;
891
892   emit = priv->emit_signals;
893   g_mutex_unlock (priv->mutex);
894
895   GST_DEBUG_OBJECT (appsrc,
896       "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
897       priv->offset, offset);
898
899   if (priv->callbacks.seek_data)
900     res = priv->callbacks.seek_data (appsrc, offset, priv->user_data);
901   else if (emit)
902     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
903         offset, &res);
904
905   g_mutex_lock (priv->mutex);
906
907   return res;
908 }
909
910 /* must be called with the appsrc mutex. After this call things can be
911  * flushing */
912 static void
913 gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
914 {
915   gboolean emit;
916   GstAppSrcPrivate *priv = appsrc->priv;
917
918   emit = priv->emit_signals;
919   g_mutex_unlock (priv->mutex);
920
921   /* we have no data, we need some. We fire the signal with the size hint. */
922   if (priv->callbacks.need_data)
923     priv->callbacks.need_data (appsrc, size, priv->user_data);
924   else if (emit)
925     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
926         NULL);
927
928   g_mutex_lock (priv->mutex);
929   /* we can be flushing now because we released the lock */
930 }
931
932 static GstFlowReturn
933 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
934     GstBuffer ** buf)
935 {
936   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
937   GstAppSrcPrivate *priv = appsrc->priv;
938   GstFlowReturn ret;
939   GstCaps *caps;
940
941   GST_OBJECT_LOCK (appsrc);
942   caps = priv->caps ? gst_caps_ref (priv->caps) : NULL;
943   if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
944           bsrc->segment.format == GST_FORMAT_BYTES)) {
945     GST_DEBUG_OBJECT (appsrc,
946         "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
947         bsrc->segment.duration, priv->size);
948     gst_segment_set_duration (&bsrc->segment, GST_FORMAT_BYTES, priv->size);
949     GST_OBJECT_UNLOCK (appsrc);
950
951     gst_element_post_message (GST_ELEMENT (appsrc),
952         gst_message_new_duration (GST_OBJECT (appsrc), GST_FORMAT_BYTES,
953             priv->size));
954   } else {
955     GST_OBJECT_UNLOCK (appsrc);
956   }
957
958   g_mutex_lock (priv->mutex);
959   /* check flushing first */
960   if (G_UNLIKELY (priv->flushing))
961     goto flushing;
962
963   if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
964     /* if we are dealing with a random-access stream, issue a seek if the offset
965      * changed. */
966     if (G_UNLIKELY (priv->offset != offset)) {
967       gboolean res;
968
969       /* do the seek */
970       res = gst_app_src_emit_seek (appsrc, offset);
971
972       if (G_UNLIKELY (!res))
973         /* failing to seek is fatal */
974         goto seek_error;
975
976       priv->offset = offset;
977     }
978   }
979
980   while (TRUE) {
981     /* return data as long as we have some */
982     if (!g_queue_is_empty (priv->queue)) {
983       guint buf_size;
984
985       *buf = g_queue_pop_head (priv->queue);
986       buf_size = GST_BUFFER_SIZE (*buf);
987
988       GST_DEBUG_OBJECT (appsrc, "we have buffer %p of size %u", *buf, buf_size);
989
990       priv->queued_bytes -= buf_size;
991
992       /* only update the offset when in random_access mode */
993       if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
994         priv->offset += buf_size;
995       if (caps) {
996         *buf = gst_buffer_make_metadata_writable (*buf);
997         gst_buffer_set_caps (*buf, caps);
998       }
999
1000       /* signal that we removed an item */
1001       g_cond_broadcast (priv->cond);
1002
1003       /* see if we go lower than the empty-percent */
1004       if (priv->min_percent && priv->max_bytes) {
1005         if (priv->queued_bytes * 100 / priv->max_bytes <= priv->min_percent)
1006           /* ignore flushing state, we got a buffer and we will return it now.
1007            * Errors will be handled in the next round */
1008           gst_app_src_emit_need_data (appsrc, size);
1009       }
1010       ret = GST_FLOW_OK;
1011       break;
1012     } else {
1013       gst_app_src_emit_need_data (appsrc, size);
1014
1015       /* we can be flushing now because we released the lock above */
1016       if (G_UNLIKELY (priv->flushing))
1017         goto flushing;
1018
1019       /* if we have a buffer now, continue the loop and try to return it. In
1020        * random-access mode (where a buffer is normally pushed in the above
1021        * signal) we can still be empty because the pushed buffer got flushed or
1022        * when the application pushes the requested buffer later, we support both
1023        * possibilities. */
1024       if (!g_queue_is_empty (priv->queue))
1025         continue;
1026
1027       /* no buffer yet, maybe we are EOS, if not, block for more data. */
1028     }
1029
1030     /* check EOS */
1031     if (G_UNLIKELY (priv->is_eos))
1032       goto eos;
1033
1034     /* nothing to return, wait a while for new data or flushing. */
1035     g_cond_wait (priv->cond, priv->mutex);
1036   }
1037   g_mutex_unlock (priv->mutex);
1038   if (caps)
1039     gst_caps_unref (caps);
1040   return ret;
1041
1042   /* ERRORS */
1043 flushing:
1044   {
1045     GST_DEBUG_OBJECT (appsrc, "we are flushing");
1046     g_mutex_unlock (priv->mutex);
1047     if (caps)
1048       gst_caps_unref (caps);
1049     return GST_FLOW_WRONG_STATE;
1050   }
1051 eos:
1052   {
1053     GST_DEBUG_OBJECT (appsrc, "we are EOS");
1054     g_mutex_unlock (priv->mutex);
1055     if (caps)
1056       gst_caps_unref (caps);
1057     return GST_FLOW_UNEXPECTED;
1058   }
1059 seek_error:
1060   {
1061     g_mutex_unlock (priv->mutex);
1062     if (caps)
1063       gst_caps_unref (caps);
1064     GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
1065         GST_ERROR_SYSTEM);
1066     return GST_FLOW_ERROR;
1067   }
1068 }
1069
1070 /* external API */
1071
1072 /**
1073  * gst_app_src_set_caps:
1074  * @appsrc: a #GstAppSrc
1075  * @caps: caps to set
1076  *
1077  * Set the capabilities on the appsrc element.  This function takes
1078  * a copy of the caps structure. After calling this method, the source will
1079  * only produce caps that match @caps. @caps must be fixed and the caps on the
1080  * buffers must match the caps or left NULL.
1081  *
1082  * Since: 0.10.22
1083  */
1084 void
1085 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
1086 {
1087   GstCaps *old;
1088   GstAppSrcPrivate *priv;
1089
1090   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1091
1092   priv = appsrc->priv;
1093
1094   GST_OBJECT_LOCK (appsrc);
1095   GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
1096   if ((old = priv->caps) != caps) {
1097     if (caps)
1098       priv->caps = gst_caps_copy (caps);
1099     else
1100       priv->caps = NULL;
1101     if (old)
1102       gst_caps_unref (old);
1103   }
1104   GST_OBJECT_UNLOCK (appsrc);
1105 }
1106
1107 /**
1108  * gst_app_src_get_caps:
1109  * @appsrc: a #GstAppSrc
1110  *
1111  * Get the configured caps on @appsrc.
1112  *
1113  * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
1114  *
1115  * Since: 0.10.22
1116  */
1117 GstCaps *
1118 gst_app_src_get_caps (GstAppSrc * appsrc)
1119 {
1120   GstCaps *caps;
1121   GstAppSrcPrivate *priv;
1122
1123   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
1124
1125   priv = appsrc->priv;
1126
1127   GST_OBJECT_LOCK (appsrc);
1128   if ((caps = priv->caps))
1129     gst_caps_ref (caps);
1130   GST_DEBUG_OBJECT (appsrc, "getting caps of %" GST_PTR_FORMAT, caps);
1131   GST_OBJECT_UNLOCK (appsrc);
1132
1133   return caps;
1134 }
1135
1136 /**
1137  * gst_app_src_set_size:
1138  * @appsrc: a #GstAppSrc
1139  * @size: the size to set
1140  *
1141  * Set the size of the stream in bytes. A value of -1 means that the size is
1142  * not known.
1143  *
1144  * Since: 0.10.22
1145  */
1146 void
1147 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
1148 {
1149   GstAppSrcPrivate *priv;
1150
1151   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1152
1153   priv = appsrc->priv;
1154
1155   GST_OBJECT_LOCK (appsrc);
1156   GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
1157   priv->size = size;
1158   GST_OBJECT_UNLOCK (appsrc);
1159 }
1160
1161 /**
1162  * gst_app_src_get_size:
1163  * @appsrc: a #GstAppSrc
1164  *
1165  * Get the size of the stream in bytes. A value of -1 means that the size is
1166  * not known.
1167  *
1168  * Returns: the size of the stream previously set with gst_app_src_set_size();
1169  *
1170  * Since: 0.10.22
1171  */
1172 gint64
1173 gst_app_src_get_size (GstAppSrc * appsrc)
1174 {
1175   gint64 size;
1176   GstAppSrcPrivate *priv;
1177
1178   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1179
1180   priv = appsrc->priv;
1181
1182   GST_OBJECT_LOCK (appsrc);
1183   size = priv->size;
1184   GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
1185   GST_OBJECT_UNLOCK (appsrc);
1186
1187   return size;
1188 }
1189
1190 /**
1191  * gst_app_src_set_stream_type:
1192  * @appsrc: a #GstAppSrc
1193  * @type: the new state
1194  *
1195  * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
1196  * be connected to.
1197  *
1198  * A stream_type stream
1199  *
1200  * Since: 0.10.22
1201  */
1202 void
1203 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
1204 {
1205   GstAppSrcPrivate *priv;
1206
1207   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1208
1209   priv = appsrc->priv;
1210
1211   GST_OBJECT_LOCK (appsrc);
1212   GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
1213   priv->stream_type = type;
1214   GST_OBJECT_UNLOCK (appsrc);
1215 }
1216
1217 /**
1218  * gst_app_src_get_stream_type:
1219  * @appsrc: a #GstAppSrc
1220  *
1221  * Get the stream type. Control the stream type of @appsrc
1222  * with gst_app_src_set_stream_type().
1223  *
1224  * Returns: the stream type.
1225  *
1226  * Since: 0.10.22
1227  */
1228 GstAppStreamType
1229 gst_app_src_get_stream_type (GstAppSrc * appsrc)
1230 {
1231   gboolean stream_type;
1232   GstAppSrcPrivate *priv;
1233
1234   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1235
1236   priv = appsrc->priv;
1237
1238   GST_OBJECT_LOCK (appsrc);
1239   stream_type = priv->stream_type;
1240   GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
1241   GST_OBJECT_UNLOCK (appsrc);
1242
1243   return stream_type;
1244 }
1245
1246 /**
1247  * gst_app_src_set_max_bytes:
1248  * @appsrc: a #GstAppSrc
1249  * @max: the maximum number of bytes to queue
1250  *
1251  * Set the maximum amount of bytes that can be queued in @appsrc.
1252  * After the maximum amount of bytes are queued, @appsrc will emit the
1253  * "enough-data" signal.
1254  *
1255  * Since: 0.10.22
1256  */
1257 void
1258 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
1259 {
1260   GstAppSrcPrivate *priv;
1261
1262   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1263
1264   priv = appsrc->priv;
1265
1266   g_mutex_lock (priv->mutex);
1267   if (max != priv->max_bytes) {
1268     GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
1269     priv->max_bytes = max;
1270     /* signal the change */
1271     g_cond_broadcast (priv->cond);
1272   }
1273   g_mutex_unlock (priv->mutex);
1274 }
1275
1276 /**
1277  * gst_app_src_get_max_bytes:
1278  * @appsrc: a #GstAppSrc
1279  *
1280  * Get the maximum amount of bytes that can be queued in @appsrc.
1281  *
1282  * Returns: The maximum amount of bytes that can be queued.
1283  *
1284  * Since: 0.10.22
1285  */
1286 guint64
1287 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
1288 {
1289   guint64 result;
1290   GstAppSrcPrivate *priv;
1291
1292   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
1293
1294   priv = appsrc->priv;
1295
1296   g_mutex_lock (priv->mutex);
1297   result = priv->max_bytes;
1298   GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
1299   g_mutex_unlock (priv->mutex);
1300
1301   return result;
1302 }
1303
1304 static void
1305 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
1306     gboolean do_max, guint64 max)
1307 {
1308   GstAppSrcPrivate *priv = appsrc->priv;
1309   gboolean changed = FALSE;
1310
1311   g_mutex_lock (priv->mutex);
1312   if (do_min && priv->min_latency != min) {
1313     priv->min_latency = min;
1314     changed = TRUE;
1315   }
1316   if (do_max && priv->max_latency != max) {
1317     priv->max_latency = max;
1318     changed = TRUE;
1319   }
1320   g_mutex_unlock (priv->mutex);
1321
1322   if (changed) {
1323     GST_DEBUG_OBJECT (appsrc, "posting latency changed");
1324     gst_element_post_message (GST_ELEMENT_CAST (appsrc),
1325         gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
1326   }
1327 }
1328
1329 /**
1330  * gst_app_src_set_latency:
1331  * @appsrc: a #GstAppSrc
1332  * @min: the min latency
1333  * @max: the min latency
1334  *
1335  * Configure the @min and @max latency in @src. If @min is set to -1, the
1336  * default latency calculations for pseudo-live sources will be used.
1337  *
1338  * Since: 0.10.22
1339  */
1340 void
1341 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
1342 {
1343   gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
1344 }
1345
1346 /**
1347  * gst_app_src_get_latency:
1348  * @appsrc: a #GstAppSrc
1349  * @min: the min latency
1350  * @max: the min latency
1351  *
1352  * Retrieve the min and max latencies in @min and @max respectively.
1353  *
1354  * Since: 0.10.22
1355  */
1356 void
1357 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
1358 {
1359   GstAppSrcPrivate *priv;
1360
1361   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1362
1363   priv = appsrc->priv;
1364
1365   g_mutex_lock (priv->mutex);
1366   if (min)
1367     *min = priv->min_latency;
1368   if (max)
1369     *max = priv->max_latency;
1370   g_mutex_unlock (priv->mutex);
1371 }
1372
1373 /**
1374  * gst_app_src_set_emit_signals:
1375  * @appsrc: a #GstAppSrc
1376  * @emit: the new state
1377  *
1378  * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
1379  * by default disabled because signal emission is expensive and unneeded when
1380  * the application prefers to operate in pull mode.
1381  *
1382  * Since: 0.10.23
1383  */
1384 void
1385 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
1386 {
1387   GstAppSrcPrivate *priv;
1388
1389   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1390
1391   priv = appsrc->priv;
1392
1393   g_mutex_lock (priv->mutex);
1394   priv->emit_signals = emit;
1395   g_mutex_unlock (priv->mutex);
1396 }
1397
1398 /**
1399  * gst_app_src_get_emit_signals:
1400  * @appsrc: a #GstAppSrc
1401  *
1402  * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
1403  *
1404  * Returns: %TRUE if @appsrc is emitting the "new-preroll" and "new-buffer"
1405  * signals.
1406  *
1407  * Since: 0.10.23
1408  */
1409 gboolean
1410 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
1411 {
1412   gboolean result;
1413   GstAppSrcPrivate *priv;
1414
1415   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1416
1417   priv = appsrc->priv;
1418
1419   g_mutex_lock (priv->mutex);
1420   result = priv->emit_signals;
1421   g_mutex_unlock (priv->mutex);
1422
1423   return result;
1424 }
1425
1426 static GstFlowReturn
1427 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
1428     gboolean steal_ref)
1429 {
1430   gboolean first = TRUE;
1431   GstAppSrcPrivate *priv;
1432
1433   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1434   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1435
1436   priv = appsrc->priv;
1437
1438   g_mutex_lock (priv->mutex);
1439
1440   while (TRUE) {
1441     /* can't accept buffers when we are flushing or EOS */
1442     if (priv->flushing)
1443       goto flushing;
1444
1445     if (priv->is_eos)
1446       goto eos;
1447
1448     if (priv->max_bytes && priv->queued_bytes >= priv->max_bytes) {
1449       GST_DEBUG_OBJECT (appsrc,
1450           "queue filled (%" G_GUINT64_FORMAT " >= %" G_GUINT64_FORMAT ")",
1451           priv->queued_bytes, priv->max_bytes);
1452
1453       if (first) {
1454         gboolean emit;
1455
1456         emit = priv->emit_signals;
1457         /* only signal on the first push */
1458         g_mutex_unlock (priv->mutex);
1459
1460         if (priv->callbacks.enough_data)
1461           priv->callbacks.enough_data (appsrc, priv->user_data);
1462         else if (emit)
1463           g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
1464               NULL);
1465
1466         g_mutex_lock (priv->mutex);
1467         /* continue to check for flushing/eos after releasing the lock */
1468         first = FALSE;
1469         continue;
1470       }
1471       if (priv->block) {
1472         GST_DEBUG_OBJECT (appsrc, "waiting for free space");
1473         /* we are filled, wait until a buffer gets popped or when we
1474          * flush. */
1475         g_cond_wait (priv->cond, priv->mutex);
1476       } else {
1477         /* no need to wait for free space, we just pump more data into the
1478          * queue hoping that the caller reacts to the enough-data signal and
1479          * stops pushing buffers. */
1480         break;
1481       }
1482     } else
1483       break;
1484   }
1485
1486   GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
1487   if (!steal_ref)
1488     gst_buffer_ref (buffer);
1489   g_queue_push_tail (priv->queue, buffer);
1490   priv->queued_bytes += GST_BUFFER_SIZE (buffer);
1491   g_cond_broadcast (priv->cond);
1492   g_mutex_unlock (priv->mutex);
1493
1494   return GST_FLOW_OK;
1495
1496   /* ERRORS */
1497 flushing:
1498   {
1499     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
1500     if (steal_ref)
1501       gst_buffer_unref (buffer);
1502     g_mutex_unlock (priv->mutex);
1503     return GST_FLOW_WRONG_STATE;
1504   }
1505 eos:
1506   {
1507     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
1508     if (steal_ref)
1509       gst_buffer_unref (buffer);
1510     g_mutex_unlock (priv->mutex);
1511     return GST_FLOW_UNEXPECTED;
1512   }
1513 }
1514
1515 /**
1516  * gst_app_src_push_buffer:
1517  * @appsrc: a #GstAppSrc
1518  * @buffer: a #GstBuffer to push
1519  *
1520  * Adds a buffer to the queue of buffers that the appsrc element will
1521  * push to its source pad.  This function takes ownership of the buffer.
1522  *
1523  * When the block property is TRUE, this function can block until free
1524  * space becomes available in the queue.
1525  *
1526  * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
1527  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
1528  * #GST_FLOW_UNEXPECTED when EOS occured.
1529  *
1530  * Since: 0.10.22
1531  */
1532 GstFlowReturn
1533 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
1534 {
1535   return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
1536 }
1537
1538 /* push a buffer without stealing the ref of the buffer. This is used for the
1539  * action signal. */
1540 static GstFlowReturn
1541 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
1542 {
1543   return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
1544 }
1545
1546 /**
1547  * gst_app_src_end_of_stream:
1548  * @appsrc: a #GstAppSrc
1549  *
1550  * Indicates to the appsrc element that the last buffer queued in the
1551  * element is the last buffer of the stream.
1552  *
1553  * Returns: #GST_FLOW_OK when the EOS was successfuly queued.
1554  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
1555  *
1556  * Since: 0.10.22
1557  */
1558 GstFlowReturn
1559 gst_app_src_end_of_stream (GstAppSrc * appsrc)
1560 {
1561   GstAppSrcPrivate *priv;
1562
1563   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1564
1565   priv = appsrc->priv;
1566
1567   g_mutex_lock (priv->mutex);
1568   /* can't accept buffers when we are flushing. We can accept them when we are
1569    * EOS although it will not do anything. */
1570   if (priv->flushing)
1571     goto flushing;
1572
1573   GST_DEBUG_OBJECT (appsrc, "sending EOS");
1574   priv->is_eos = TRUE;
1575   g_cond_broadcast (priv->cond);
1576   g_mutex_unlock (priv->mutex);
1577
1578   return GST_FLOW_OK;
1579
1580   /* ERRORS */
1581 flushing:
1582   {
1583     g_mutex_unlock (priv->mutex);
1584     GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
1585     return GST_FLOW_WRONG_STATE;
1586   }
1587 }
1588
1589 /**
1590  * gst_app_src_set_callbacks:
1591  * @appsrc: a #GstAppSrc
1592  * @callbacks: the callbacks
1593  * @user_data: a user_data argument for the callbacks
1594  * @notify: a destroy notify function
1595  *
1596  * Set callbacks which will be executed when data is needed, enough data has
1597  * been collected or when a seek should be performed.
1598  * This is an alternative to using the signals, it has lower overhead and is thus
1599  * less expensive, but also less flexible.
1600  *
1601  * If callbacks are installed, no signals will be emitted for performance
1602  * reasons.
1603  *
1604  * Since: 0.10.23
1605  */
1606 void
1607 gst_app_src_set_callbacks (GstAppSrc * appsrc,
1608     GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
1609 {
1610   GDestroyNotify old_notify;
1611   GstAppSrcPrivate *priv;
1612
1613   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1614   g_return_if_fail (callbacks != NULL);
1615
1616   priv = appsrc->priv;
1617
1618   GST_OBJECT_LOCK (appsrc);
1619   old_notify = priv->notify;
1620
1621   if (old_notify) {
1622     gpointer old_data;
1623
1624     old_data = priv->user_data;
1625
1626     priv->user_data = NULL;
1627     priv->notify = NULL;
1628     GST_OBJECT_UNLOCK (appsrc);
1629
1630     old_notify (old_data);
1631
1632     GST_OBJECT_LOCK (appsrc);
1633   }
1634   priv->callbacks = *callbacks;
1635   priv->user_data = user_data;
1636   priv->notify = notify;
1637   GST_OBJECT_UNLOCK (appsrc);
1638 }
1639
1640 /*** GSTURIHANDLER INTERFACE *************************************************/
1641
1642 static GstURIType
1643 gst_app_src_uri_get_type (void)
1644 {
1645   return GST_URI_SRC;
1646 }
1647
1648 static gchar **
1649 gst_app_src_uri_get_protocols (void)
1650 {
1651   static gchar *protocols[] = { (char *) "appsrc", NULL };
1652
1653   return protocols;
1654 }
1655
1656 static const gchar *
1657 gst_app_src_uri_get_uri (GstURIHandler * handler)
1658 {
1659   return "appsrc";
1660 }
1661
1662 static gboolean
1663 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
1664 {
1665   gchar *protocol;
1666   gboolean ret;
1667
1668   protocol = gst_uri_get_protocol (uri);
1669   ret = !strcmp (protocol, "appsrc");
1670   g_free (protocol);
1671
1672   return ret;
1673 }
1674
1675 static void
1676 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1677 {
1678   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1679
1680   iface->get_type = gst_app_src_uri_get_type;
1681   iface->get_protocols = gst_app_src_uri_get_protocols;
1682   iface->get_uri = gst_app_src_uri_get_uri;
1683   iface->set_uri = gst_app_src_uri_set_uri;
1684 }