app: add callbacks to appsrc, cleanups
[platform/upstream/gstreamer.git] / gst-libs / gst / app / gstappsrc.c
1 /* GStreamer
2  * Copyright (C) 2007 David Schleef <ds@schleef.org>
3  *           (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * SECTION:element-appsrc
23  *
24  * The appsrc element can be used by applications to insert data into a
25  * GStreamer pipeline. Unlike most GStreamer elements, Appsrc provides
26  * external API functions.
27  *
28  * For the documentation of the API, please see the
29  * <link linkend="gst-plugins-base-libs-appsrc">libgstapp</link> section in the
30  * GStreamer Plugins Base Libraries documentation.
31  * 
32  * Since: 0.10.22
33  */
34
35 /**
36  * SECTION:gstappsrc
37  * @see_also: #GstBaseSrc, appsink
38  *
39  * The appsrc element can be used by applications to insert data into a
40  * GStreamer pipeline. Unlike most GStreamer elements, Appsrc provides
41  * external API functions.
42  *
43  * appsrc can be used by linking to the gstappsrc.h header file to access the
44  * methods or by using the appsrc action signals. For the API
45  * documentation, see the documentation for libgstapp in the
46  * GStreamer Base Plugins Library reference...
47  *
48  * Before operating appsrc, the caps property must be set to a fixed caps
49  * describing the format of the data that will be pushed with appsrc.
50  *
51  * The main way of handing data to the appsrc element is by calling the
52  * gst_app_src_push_buffer() method or by emiting the push-buffer action signal.
53  * This will put the buffer onto a queue from which appsrc will read from in its
54  * streaming thread. It is important to note that data transport will not happen
55  * from the thread that performed the push-buffer call.
56  *
57  * The "max-bytes" property controls how much data can be queued in appsrc
58  * before appsrc considers the queue full. A filled internal queue will always
59  * signal the "enough-data" signal, which signals the application that it should
60  * stop pushing data into appsrc. The "block" property will cause appsrc to
61  * block the push-buffer method until free data becomes available again.
62  *
63  * When the internal queue is running out of data, the "need-data" signal is
64  * emited, which signals the application that it should start pushing more data
65  * into appsrc.
66  *
67  * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
68  * "seek-data" signal when the "stream-mode" property is set to "seekable" or
69  * "random-access". The signal argument will contain the new desired position in
70  * the stream expressed in the unit set with the "format" property. After
71  * receiving the seek-data signal, the application should push-buffers from the
72  * new position.
73  *
74  * These signals allow the application to operate the appsrc in two different
75  * ways:
76  *
77  * The push model, in which the application repeadedly calls the push-buffer method
78  * with a new buffer. Optionally, the queue size in the appsrc can be controlled
79  * with the enough-data and need-data signals by respectively stopping/starting
80  * the push-buffer calls. This is a typical mode of operation for the
81  * stream-type "stream" and "seekable". Use this model when implementing various
82  * network protocols or hardware devices.
83  *
84  * The pull model where the need-data signal triggers the next push-buffer call.
85  * This mode is typically used in the "random-access" stream-type. Use this
86  * model for file access or other randomly accessable sources. In this mode, a
87  * buffer of exactly the amount of bytes given by the need-data signal should be
88  * pushed into appsrc.
89  *
90  * In all modes, the size property on appsrc should contain the total stream
91  * size in bytes. Setting this property is mandatory in the random-access mode.
92  * For the stream and seekable modes, setting this property is optional but
93  * recommended.
94  *
95  * When the application is finished pushing data into appsrc, it should call 
96  * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
97  * this call, no more buffers can be pushed into appsrc until a flushing seek
98  * happened or the state of the appsrc has gone through READY.
99  *
100  * Last reviewed on 2008-12-17 (0.10.10)
101  *
102  * Since: 0.10.22
103  */
104
105 #ifdef HAVE_CONFIG_H
106 #include "config.h"
107 #endif
108
109 #include <gst/gst.h>
110 #include <gst/base/gstbasesrc.h>
111
112 #include <string.h>
113
114 #include "gstapp-marshal.h"
115 #include "gstappsrc.h"
116
117 struct _GstAppSrcPrivate
118 {
119   GCond *cond;
120   GMutex *mutex;
121   GQueue *queue;
122
123   GstCaps *caps;
124   gint64 size;
125   GstAppStreamType stream_type;
126   guint64 max_bytes;
127   GstFormat format;
128   gboolean block;
129
130   gboolean flushing;
131   gboolean started;
132   gboolean is_eos;
133   guint64 queued_bytes;
134   guint64 offset;
135   GstAppStreamType current_type;
136
137   guint64 min_latency;
138   guint64 max_latency;
139   gboolean emit_signals;
140
141   GstAppSrcCallbacks callbacks;
142   gpointer user_data;
143   GDestroyNotify notify;
144 };
145
146 GST_DEBUG_CATEGORY_STATIC (app_src_debug);
147 #define GST_CAT_DEFAULT app_src_debug
148
149 enum
150 {
151   /* signals */
152   SIGNAL_NEED_DATA,
153   SIGNAL_ENOUGH_DATA,
154   SIGNAL_SEEK_DATA,
155
156   /* actions */
157   SIGNAL_PUSH_BUFFER,
158   SIGNAL_END_OF_STREAM,
159
160   LAST_SIGNAL
161 };
162
163 #define DEFAULT_PROP_SIZE          -1
164 #define DEFAULT_PROP_STREAM_TYPE   GST_APP_STREAM_TYPE_STREAM
165 #define DEFAULT_PROP_MAX_BYTES     200000
166 #define DEFAULT_PROP_FORMAT        GST_FORMAT_BYTES
167 #define DEFAULT_PROP_BLOCK         FALSE
168 #define DEFAULT_PROP_IS_LIVE       FALSE
169 #define DEFAULT_PROP_MIN_LATENCY   -1
170 #define DEFAULT_PROP_MAX_LATENCY   -1
171 #define DEFAULT_PROP_EMIT_SIGNALS  TRUE
172
173 enum
174 {
175   PROP_0,
176   PROP_CAPS,
177   PROP_SIZE,
178   PROP_STREAM_TYPE,
179   PROP_MAX_BYTES,
180   PROP_FORMAT,
181   PROP_BLOCK,
182   PROP_IS_LIVE,
183   PROP_MIN_LATENCY,
184   PROP_MAX_LATENCY,
185   PROP_EMIT_SIGNALS,
186   PROP_LAST
187 };
188
189 static GstStaticPadTemplate gst_app_src_template =
190 GST_STATIC_PAD_TEMPLATE ("src",
191     GST_PAD_SRC,
192     GST_PAD_ALWAYS,
193     GST_STATIC_CAPS_ANY);
194
195
196 #define GST_TYPE_APP_STREAM_TYPE (stream_type_get_type ())
197 static GType
198 stream_type_get_type (void)
199 {
200   static GType stream_type_type = 0;
201   static const GEnumValue stream_type[] = {
202     {GST_APP_STREAM_TYPE_STREAM, "Stream", "stream"},
203     {GST_APP_STREAM_TYPE_SEEKABLE, "Seekable", "seekable"},
204     {GST_APP_STREAM_TYPE_RANDOM_ACCESS, "Random Access", "random-access"},
205     {0, NULL, NULL},
206   };
207
208   if (!stream_type_type) {
209     stream_type_type = g_enum_register_static ("GstAppStreamType", stream_type);
210   }
211   return stream_type_type;
212 }
213
214 static void gst_app_src_uri_handler_init (gpointer g_iface,
215     gpointer iface_data);
216
217 static void gst_app_src_dispose (GObject * object);
218 static void gst_app_src_finalize (GObject * object);
219
220 static void gst_app_src_set_property (GObject * object, guint prop_id,
221     const GValue * value, GParamSpec * pspec);
222 static void gst_app_src_get_property (GObject * object, guint prop_id,
223     GValue * value, GParamSpec * pspec);
224
225 static void gst_app_src_set_latencies (GstAppSrc * appsrc,
226     gboolean do_min, guint64 min, gboolean do_max, guint64 max);
227
228 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc,
229     guint64 offset, guint size, GstBuffer ** buf);
230 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
231 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
232 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
233 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
234 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
235 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
236 static gboolean gst_app_src_check_get_range (GstBaseSrc * src);
237 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
238 static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
239
240 static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
241     GstBuffer * buffer);
242
243 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
244
245 static void
246 _do_init (GType filesrc_type)
247 {
248   static const GInterfaceInfo urihandler_info = {
249     gst_app_src_uri_handler_init,
250     NULL,
251     NULL
252   };
253   g_type_add_interface_static (filesrc_type, GST_TYPE_URI_HANDLER,
254       &urihandler_info);
255 }
256
257 GST_BOILERPLATE_FULL (GstAppSrc, gst_app_src, GstBaseSrc, GST_TYPE_BASE_SRC,
258     _do_init);
259
260 static void
261 gst_app_src_base_init (gpointer g_class)
262 {
263   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
264
265   GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
266
267   gst_element_class_set_details_simple (element_class, "AppSrc",
268       "Generic/Src", "Allow the application to feed buffers to a pipeline",
269       "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
270
271   gst_element_class_add_pad_template (element_class,
272       gst_static_pad_template_get (&gst_app_src_template));
273 }
274
275 static void
276 gst_app_src_class_init (GstAppSrcClass * klass)
277 {
278   GObjectClass *gobject_class = (GObjectClass *) klass;
279   GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
280
281   gobject_class->dispose = gst_app_src_dispose;
282   gobject_class->finalize = gst_app_src_finalize;
283
284   gobject_class->set_property = gst_app_src_set_property;
285   gobject_class->get_property = gst_app_src_get_property;
286
287   /**
288    * GstAppSrc::caps
289    *
290    * The GstCaps that will negotiated downstream and will be put
291    * on outgoing buffers.
292    */
293   g_object_class_install_property (gobject_class, PROP_CAPS,
294       g_param_spec_boxed ("caps", "Caps",
295           "The allowed caps for the src pad", GST_TYPE_CAPS,
296           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
297   /**
298    * GstAppSrc::format
299    *
300    * The format to use for segment events. When the source is producing
301    * timestamped buffers this property should be set to GST_FORMAT_TIME.
302    */
303   g_object_class_install_property (gobject_class, PROP_FORMAT,
304       g_param_spec_enum ("format", "Format",
305           "The format of the segment events and seek", GST_TYPE_FORMAT,
306           DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
307   /**
308    * GstAppSrc::size
309    *
310    * The total size in bytes of the data stream. If the total size is known, it
311    * is recommended to configure it with this property.
312    */
313   g_object_class_install_property (gobject_class, PROP_SIZE,
314       g_param_spec_int64 ("size", "Size",
315           "The size of the data stream in bytes (-1 if unknown)",
316           -1, G_MAXINT64, DEFAULT_PROP_SIZE,
317           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
318   /**
319    * GstAppSrc::stream-type
320    *
321    * The type of stream that this source is producing.  For seekable streams the
322    * application should connect to the seek-data signal.
323    */
324   g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
325       g_param_spec_enum ("stream-type", "Stream Type",
326           "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
327           DEFAULT_PROP_STREAM_TYPE,
328           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
329   /**
330    * GstAppSrc::max-bytes
331    *
332    * The maximum amount of bytes that can be queued internally.
333    * After the maximum amount of bytes are queued, appsrc will emit the
334    * "enough-data" signal.
335    */
336   g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
337       g_param_spec_uint64 ("max-bytes", "Max bytes",
338           "The maximum number of bytes to queue internally (0 = unlimited)",
339           0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
340           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
341   /**
342    * GstAppSrc::block
343    *
344    * When max-bytes are queued and after the enough-data signal has been emited,
345    * block any further push-buffer calls until the amount of queued bytes drops
346    * below the max-bytes limit.
347    */
348   g_object_class_install_property (gobject_class, PROP_BLOCK,
349       g_param_spec_boolean ("block", "Block",
350           "Block push-buffer when max-bytes are queued",
351           DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
352
353   /**
354    * GstAppSrc::is-live
355    *
356    * Instruct the source to behave like a live source. This includes that it
357    * will only push out buffers in the PLAYING state.
358    */
359   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
360       g_param_spec_boolean ("is-live", "Is Live",
361           "Whether to act as a live source",
362           DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363   /**
364    * GstAppSrc::min-latency
365    *
366    * The minimum latency of the source. A value of -1 will use the default
367    * latency calculations of #GstBaseSrc.
368    */
369   g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
370       g_param_spec_int64 ("min-latency", "Min Latency",
371           "The minimum latency (-1 = default)",
372           -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
373           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
374   /**
375    * GstAppSrc::max-latency
376    *
377    * The maximum latency of the source. A value of -1 means an unlimited amout
378    * of latency.
379    */
380   g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
381       g_param_spec_int64 ("max-latency", "Max Latency",
382           "The maximum latency (-1 = unlimited)",
383           -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
384           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
385
386   /**
387    * GstAppSrc::emit-signals
388    *
389    * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
390    * This option is by default enabled for backwards compatibility reasons but
391    * can disabled when needed because signal emission is expensive.
392    *
393    * Since: 0.10.23
394    */
395   g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
396       g_param_spec_boolean ("emit-signals", "Emit signals",
397           "Emit new-preroll and new-buffer signals", DEFAULT_PROP_EMIT_SIGNALS,
398           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399
400   /**
401    * GstAppSrc::need-data:
402    * @appsrc: the appsrc element that emited the signal
403    * @length: the amount of bytes needed.
404    *
405    * Signal that the source needs more data. In the callback or from another
406    * thread you should call push-buffer or end-of-stream.
407    *
408    * @length is just a hint and when it is set to -1, any number of bytes can be
409    * pushed into @appsrc.
410    *
411    * You can call push-buffer multiple times until the enough-data signal is
412    * fired.
413    */
414   gst_app_src_signals[SIGNAL_NEED_DATA] =
415       g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
416       G_STRUCT_OFFSET (GstAppSrcClass, need_data),
417       NULL, NULL, __gst_app_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
418
419   /**
420    * GstAppSrc::enough-data:
421    * @appsrc: the appsrc element that emited the signal
422    *
423    * Signal that the source has enough data. It is recommended that the
424    * application stops calling push-buffer until the need-data signal is
425    * emited again to avoid excessive buffer queueing.
426    */
427   gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
428       g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
429       G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
430       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
431
432   /**
433    * GstAppSrc::seek-data:
434    * @appsrc: the appsrc element that emited the signal
435    * @offset: the offset to seek to
436    *
437    * Seek to the given offset. The next push-buffer should produce buffers from
438    * the new @offset.
439    * This callback is only called for seekable stream types.
440    *
441    * Returns: %TRUE if the seek succeeded.
442    */
443   gst_app_src_signals[SIGNAL_SEEK_DATA] =
444       g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
445       G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
446       NULL, NULL, __gst_app_marshal_BOOLEAN__UINT64, G_TYPE_BOOLEAN, 1,
447       G_TYPE_UINT64);
448
449    /**
450     * GstAppSrc::push-buffer:
451     * @appsrc: the appsrc
452     * @buffer: a buffer to push
453     *
454     * Adds a buffer to the queue of buffers that the appsrc element will
455     * push to its source pad. This function does not take ownership of the
456     * buffer so the buffer needs to be unreffed after calling this function.
457     *
458     * When the block property is TRUE, this function can block until free space
459     * becomes available in the queue.
460     */
461   gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
462       g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
463       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
464           push_buffer), NULL, NULL, __gst_app_marshal_ENUM__OBJECT,
465       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
466
467    /**
468     * GstAppSrc::end-of-stream:
469     * @appsrc: the appsrc
470     *
471     * Notify @appsrc that no more buffer are available. 
472     */
473   gst_app_src_signals[SIGNAL_END_OF_STREAM] =
474       g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
475       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
476           end_of_stream), NULL, NULL, __gst_app_marshal_ENUM__VOID,
477       GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
478
479   basesrc_class->create = gst_app_src_create;
480   basesrc_class->start = gst_app_src_start;
481   basesrc_class->stop = gst_app_src_stop;
482   basesrc_class->unlock = gst_app_src_unlock;
483   basesrc_class->unlock_stop = gst_app_src_unlock_stop;
484   basesrc_class->do_seek = gst_app_src_do_seek;
485   basesrc_class->is_seekable = gst_app_src_is_seekable;
486   basesrc_class->check_get_range = gst_app_src_check_get_range;
487   basesrc_class->get_size = gst_app_src_do_get_size;
488   basesrc_class->get_size = gst_app_src_do_get_size;
489   basesrc_class->query = gst_app_src_query;
490
491   klass->push_buffer = gst_app_src_push_buffer_action;
492   klass->end_of_stream = gst_app_src_end_of_stream;
493
494   g_type_class_add_private (klass, sizeof (GstAppSrcPrivate));
495 }
496
497 static void
498 gst_app_src_init (GstAppSrc * appsrc, GstAppSrcClass * klass)
499 {
500   appsrc->priv = G_TYPE_INSTANCE_GET_PRIVATE (appsrc, GST_TYPE_APP_SRC,
501       GstAppSrcPrivate);
502
503   appsrc->priv->mutex = g_mutex_new ();
504   appsrc->priv->cond = g_cond_new ();
505   appsrc->priv->queue = g_queue_new ();
506
507   appsrc->priv->size = DEFAULT_PROP_SIZE;
508   appsrc->priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
509   appsrc->priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
510   appsrc->priv->format = DEFAULT_PROP_FORMAT;
511   appsrc->priv->block = DEFAULT_PROP_BLOCK;
512   appsrc->priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
513   appsrc->priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
514   appsrc->priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
515
516   gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
517 }
518
519 static void
520 gst_app_src_flush_queued (GstAppSrc * src)
521 {
522   GstBuffer *buf;
523
524   while ((buf = g_queue_pop_head (src->priv->queue)))
525     gst_buffer_unref (buf);
526   src->priv->queued_bytes = 0;
527 }
528
529 static void
530 gst_app_src_dispose (GObject * obj)
531 {
532   GstAppSrc *appsrc = GST_APP_SRC (obj);
533
534   if (appsrc->priv->caps) {
535     gst_caps_unref (appsrc->priv->caps);
536     appsrc->priv->caps = NULL;
537   }
538   gst_app_src_flush_queued (appsrc);
539
540   G_OBJECT_CLASS (parent_class)->dispose (obj);
541 }
542
543 static void
544 gst_app_src_finalize (GObject * obj)
545 {
546   GstAppSrc *appsrc = GST_APP_SRC (obj);
547
548   g_mutex_free (appsrc->priv->mutex);
549   g_cond_free (appsrc->priv->cond);
550   g_queue_free (appsrc->priv->queue);
551
552   G_OBJECT_CLASS (parent_class)->finalize (obj);
553 }
554
555 static void
556 gst_app_src_set_property (GObject * object, guint prop_id,
557     const GValue * value, GParamSpec * pspec)
558 {
559   GstAppSrc *appsrc = GST_APP_SRC (object);
560
561   switch (prop_id) {
562     case PROP_CAPS:
563       gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
564       break;
565     case PROP_SIZE:
566       gst_app_src_set_size (appsrc, g_value_get_int64 (value));
567       break;
568     case PROP_STREAM_TYPE:
569       gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
570       break;
571     case PROP_MAX_BYTES:
572       gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
573       break;
574     case PROP_FORMAT:
575       appsrc->priv->format = g_value_get_enum (value);
576       break;
577     case PROP_BLOCK:
578       appsrc->priv->block = g_value_get_boolean (value);
579       break;
580     case PROP_IS_LIVE:
581       gst_base_src_set_live (GST_BASE_SRC (appsrc),
582           g_value_get_boolean (value));
583       break;
584     case PROP_MIN_LATENCY:
585       gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
586           FALSE, -1);
587       break;
588     case PROP_MAX_LATENCY:
589       gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
590           g_value_get_int64 (value));
591       break;
592     case PROP_EMIT_SIGNALS:
593       gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
594       break;
595     default:
596       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
597       break;
598   }
599 }
600
601 static void
602 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
603     GParamSpec * pspec)
604 {
605   GstAppSrc *appsrc = GST_APP_SRC (object);
606
607   switch (prop_id) {
608     case PROP_CAPS:
609     {
610       GstCaps *caps;
611
612       /* we're missing a _take_caps() function to transfer ownership */
613       caps = gst_app_src_get_caps (appsrc);
614       gst_value_set_caps (value, caps);
615       if (caps)
616         gst_caps_unref (caps);
617       break;
618     }
619     case PROP_SIZE:
620       g_value_set_int64 (value, gst_app_src_get_size (appsrc));
621       break;
622     case PROP_STREAM_TYPE:
623       g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
624       break;
625     case PROP_MAX_BYTES:
626       g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
627       break;
628     case PROP_FORMAT:
629       g_value_set_enum (value, appsrc->priv->format);
630       break;
631     case PROP_BLOCK:
632       g_value_set_boolean (value, appsrc->priv->block);
633       break;
634     case PROP_IS_LIVE:
635       g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
636       break;
637     case PROP_MIN_LATENCY:
638     {
639       guint64 min;
640
641       gst_app_src_get_latency (appsrc, &min, NULL);
642       g_value_set_int64 (value, min);
643       break;
644     }
645     case PROP_MAX_LATENCY:
646     {
647       guint64 max;
648
649       gst_app_src_get_latency (appsrc, &max, NULL);
650       g_value_set_int64 (value, max);
651       break;
652     }
653     case PROP_EMIT_SIGNALS:
654       g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
655       break;
656     default:
657       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
658       break;
659   }
660 }
661
662 static gboolean
663 gst_app_src_unlock (GstBaseSrc * bsrc)
664 {
665   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
666
667   g_mutex_lock (appsrc->priv->mutex);
668   GST_DEBUG_OBJECT (appsrc, "unlock start");
669   appsrc->priv->flushing = TRUE;
670   g_cond_broadcast (appsrc->priv->cond);
671   g_mutex_unlock (appsrc->priv->mutex);
672
673   return TRUE;
674 }
675
676 static gboolean
677 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
678 {
679   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
680
681   g_mutex_lock (appsrc->priv->mutex);
682   GST_DEBUG_OBJECT (appsrc, "unlock stop");
683   appsrc->priv->flushing = FALSE;
684   g_cond_broadcast (appsrc->priv->cond);
685   g_mutex_unlock (appsrc->priv->mutex);
686
687   return TRUE;
688 }
689
690 static gboolean
691 gst_app_src_start (GstBaseSrc * bsrc)
692 {
693   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
694
695   g_mutex_lock (appsrc->priv->mutex);
696   GST_DEBUG_OBJECT (appsrc, "starting");
697   appsrc->priv->started = TRUE;
698   /* set the offset to -1 so that we always do a first seek. This is only used
699    * in random-access mode. */
700   appsrc->priv->offset = -1;
701   appsrc->priv->flushing = FALSE;
702   g_mutex_unlock (appsrc->priv->mutex);
703
704   gst_base_src_set_format (bsrc, appsrc->priv->format);
705
706   return TRUE;
707 }
708
709 static gboolean
710 gst_app_src_stop (GstBaseSrc * bsrc)
711 {
712   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
713
714   g_mutex_lock (appsrc->priv->mutex);
715   GST_DEBUG_OBJECT (appsrc, "stopping");
716   appsrc->priv->is_eos = FALSE;
717   appsrc->priv->flushing = TRUE;
718   appsrc->priv->started = FALSE;
719   gst_app_src_flush_queued (appsrc);
720   g_mutex_unlock (appsrc->priv->mutex);
721
722   return TRUE;
723 }
724
725 static gboolean
726 gst_app_src_is_seekable (GstBaseSrc * src)
727 {
728   GstAppSrc *appsrc = GST_APP_SRC (src);
729   gboolean res = FALSE;
730
731   switch (appsrc->priv->stream_type) {
732     case GST_APP_STREAM_TYPE_STREAM:
733       break;
734     case GST_APP_STREAM_TYPE_SEEKABLE:
735     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
736       res = TRUE;
737       break;
738   }
739   return res;
740 }
741
742 static gboolean
743 gst_app_src_check_get_range (GstBaseSrc * src)
744 {
745   GstAppSrc *appsrc = GST_APP_SRC (src);
746   gboolean res = FALSE;
747
748   switch (appsrc->priv->stream_type) {
749     case GST_APP_STREAM_TYPE_STREAM:
750     case GST_APP_STREAM_TYPE_SEEKABLE:
751       break;
752     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
753       res = TRUE;
754       break;
755   }
756   return res;
757 }
758
759 static gboolean
760 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
761 {
762   GstAppSrc *appsrc = GST_APP_SRC (src);
763
764   *size = gst_app_src_get_size (appsrc);
765
766   return TRUE;
767 }
768
769 static gboolean
770 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
771 {
772   GstAppSrc *appsrc = GST_APP_SRC (src);
773   gboolean res;
774
775   switch (GST_QUERY_TYPE (query)) {
776     case GST_QUERY_LATENCY:
777     {
778       GstClockTime min, max;
779       gboolean live;
780
781       /* Query the parent class for the defaults */
782       res = gst_base_src_query_latency (src, &live, &min, &max);
783
784       /* overwrite with our values when we need to */
785       g_mutex_lock (appsrc->priv->mutex);
786       if (appsrc->priv->min_latency != -1)
787         min = appsrc->priv->min_latency;
788       if (appsrc->priv->max_latency != -1)
789         max = appsrc->priv->max_latency;
790       g_mutex_unlock (appsrc->priv->mutex);
791
792       gst_query_set_latency (query, live, min, max);
793       break;
794     }
795     default:
796       res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
797       break;
798   }
799
800   return res;
801 }
802
803 /* will be called in push mode */
804 static gboolean
805 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
806 {
807   GstAppSrc *appsrc = GST_APP_SRC (src);
808   gint64 desired_position;
809   gboolean res = FALSE;
810
811   desired_position = segment->last_stop;
812
813   GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
814       desired_position, gst_format_get_name (segment->format));
815
816   /* no need to try to seek in streaming mode */
817   if (appsrc->priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
818     return TRUE;
819
820   if (appsrc->priv->callbacks.seek_data)
821     res = appsrc->priv->callbacks.seek_data (appsrc, desired_position,
822         appsrc->priv->user_data);
823   else {
824     gboolean emit;
825
826     g_mutex_lock (appsrc->priv->mutex);
827     emit = appsrc->priv->emit_signals;
828     g_mutex_unlock (appsrc->priv->mutex);
829
830     if (emit)
831       g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
832           desired_position, &res);
833   }
834
835   if (res) {
836     GST_DEBUG_OBJECT (appsrc, "flushing queue");
837     gst_app_src_flush_queued (appsrc);
838   } else {
839     GST_WARNING_OBJECT (appsrc, "seek failed");
840   }
841
842   return res;
843 }
844
845 static GstFlowReturn
846 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
847     GstBuffer ** buf)
848 {
849   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
850   GstFlowReturn ret;
851
852   g_mutex_lock (appsrc->priv->mutex);
853   /* check flushing first */
854   if (G_UNLIKELY (appsrc->priv->flushing))
855     goto flushing;
856
857   if (appsrc->priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
858     /* if we are dealing with a random-access stream, issue a seek if the offset
859      * changed. */
860     if (G_UNLIKELY (appsrc->priv->offset != offset)) {
861       gboolean res;
862       gboolean emit;
863
864       emit = appsrc->priv->emit_signals;
865       g_mutex_unlock (appsrc->priv->mutex);
866
867       GST_DEBUG_OBJECT (appsrc,
868           "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
869           appsrc->priv->offset, offset);
870
871       if (appsrc->priv->callbacks.seek_data)
872         res = appsrc->priv->callbacks.seek_data (appsrc, offset,
873             appsrc->priv->user_data);
874       else if (emit)
875         g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
876             offset, &res);
877
878       if (G_UNLIKELY (!res))
879         /* failing to seek is fatal */
880         goto seek_error;
881
882       g_mutex_lock (appsrc->priv->mutex);
883
884       appsrc->priv->offset = offset;
885     }
886   }
887
888   while (TRUE) {
889     /* return data as long as we have some */
890     if (!g_queue_is_empty (appsrc->priv->queue)) {
891       guint buf_size;
892
893       *buf = g_queue_pop_head (appsrc->priv->queue);
894       buf_size = GST_BUFFER_SIZE (*buf);
895
896       GST_DEBUG_OBJECT (appsrc, "we have buffer %p of size %u", *buf, buf_size);
897
898       appsrc->priv->queued_bytes -= buf_size;
899
900       /* only update the offset when in random_access mode */
901       if (appsrc->priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
902         appsrc->priv->offset += buf_size;
903       }
904
905       gst_buffer_set_caps (*buf, appsrc->priv->caps);
906
907       /* signal that we removed an item */
908       g_cond_broadcast (appsrc->priv->cond);
909
910       ret = GST_FLOW_OK;
911       break;
912     } else {
913       gboolean emit;
914
915       emit = appsrc->priv->emit_signals;
916       g_mutex_unlock (appsrc->priv->mutex);
917
918       /* we have no data, we need some. We fire the signal with the size hint. */
919       if (appsrc->priv->callbacks.need_data)
920         appsrc->priv->callbacks.need_data (appsrc, size,
921             appsrc->priv->user_data);
922       else if (emit)
923         g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
924             NULL);
925
926       g_mutex_lock (appsrc->priv->mutex);
927       /* we can be flushing now because we released the lock */
928       if (G_UNLIKELY (appsrc->priv->flushing))
929         goto flushing;
930
931       /* if we have a buffer now, continue the loop and try to return it. In
932        * random-access mode (where a buffer is normally pushed in the above
933        * signal) we can still be empty because the pushed buffer got flushed or
934        * when the application pushes the requested buffer later, we support both
935        * possiblities. */
936       if (!g_queue_is_empty (appsrc->priv->queue))
937         continue;
938
939       /* no buffer yet, maybe we are EOS, if not, block for more data. */
940     }
941
942     /* check EOS */
943     if (G_UNLIKELY (appsrc->priv->is_eos))
944       goto eos;
945
946     /* nothing to return, wait a while for new data or flushing. */
947     g_cond_wait (appsrc->priv->cond, appsrc->priv->mutex);
948   }
949   g_mutex_unlock (appsrc->priv->mutex);
950
951   return ret;
952
953   /* ERRORS */
954 flushing:
955   {
956     GST_DEBUG_OBJECT (appsrc, "we are flushing");
957     g_mutex_unlock (appsrc->priv->mutex);
958     return GST_FLOW_WRONG_STATE;
959   }
960 eos:
961   {
962     GST_DEBUG_OBJECT (appsrc, "we are EOS");
963     g_mutex_unlock (appsrc->priv->mutex);
964     return GST_FLOW_UNEXPECTED;
965   }
966 seek_error:
967   {
968     GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
969         GST_ERROR_SYSTEM);
970     return GST_FLOW_ERROR;
971   }
972 }
973
974 /* external API */
975
976 /**
977  * gst_app_src_set_caps:
978  * @appsrc: a #GstAppSrc
979  * @caps: caps to set
980  *
981  * Set the capabilities on the appsrc element.  This function takes
982  * a copy of the caps structure. After calling this method, the source will
983  * only produce caps that match @caps. @caps must be fixed and the caps on the
984  * buffers must match the caps or left NULL.
985  * 
986  * Since: 0.10.22
987  */
988 void
989 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
990 {
991   GstCaps *old;
992
993   g_return_if_fail (GST_IS_APP_SRC (appsrc));
994
995   GST_OBJECT_LOCK (appsrc);
996   GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
997   if ((old = appsrc->priv->caps) != caps) {
998     if (caps)
999       appsrc->priv->caps = gst_caps_copy (caps);
1000     else
1001       appsrc->priv->caps = NULL;
1002     if (old)
1003       gst_caps_unref (old);
1004   }
1005   GST_OBJECT_UNLOCK (appsrc);
1006 }
1007
1008 /**
1009  * gst_app_src_get_caps:
1010  * @appsrc: a #GstAppSrc
1011  *
1012  * Get the configured caps on @appsrc.
1013  *
1014  * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
1015  * 
1016  * Since: 0.10.22
1017  */
1018 GstCaps *
1019 gst_app_src_get_caps (GstAppSrc * appsrc)
1020 {
1021   GstCaps *caps;
1022
1023   g_return_val_if_fail (appsrc != NULL, NULL);
1024   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
1025
1026   GST_OBJECT_LOCK (appsrc);
1027   if ((caps = appsrc->priv->caps))
1028     gst_caps_ref (caps);
1029   GST_DEBUG_OBJECT (appsrc, "getting caps of %" GST_PTR_FORMAT, caps);
1030   GST_OBJECT_UNLOCK (appsrc);
1031
1032   return caps;
1033 }
1034
1035 /**
1036  * gst_app_src_set_size:
1037  * @appsrc: a #GstAppSrc
1038  * @size: the size to set
1039  *
1040  * Set the size of the stream in bytes. A value of -1 means that the size is
1041  * not known. 
1042  * 
1043  * Since: 0.10.22
1044  */
1045 void
1046 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
1047 {
1048   g_return_if_fail (appsrc != NULL);
1049   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1050
1051   GST_OBJECT_LOCK (appsrc);
1052   GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
1053   appsrc->priv->size = size;
1054   GST_OBJECT_UNLOCK (appsrc);
1055 }
1056
1057 /**
1058  * gst_app_src_get_size:
1059  * @appsrc: a #GstAppSrc
1060  *
1061  * Get the size of the stream in bytes. A value of -1 means that the size is
1062  * not known. 
1063  *
1064  * Returns: the size of the stream previously set with gst_app_src_set_size();
1065  * 
1066  * Since: 0.10.22
1067  */
1068 gint64
1069 gst_app_src_get_size (GstAppSrc * appsrc)
1070 {
1071   gint64 size;
1072
1073   g_return_val_if_fail (appsrc != NULL, -1);
1074   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1075
1076   GST_OBJECT_LOCK (appsrc);
1077   size = appsrc->priv->size;
1078   GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
1079   GST_OBJECT_UNLOCK (appsrc);
1080
1081   return size;
1082 }
1083
1084 /**
1085  * gst_app_src_set_stream_type:
1086  * @appsrc: a #GstAppSrc
1087  * @type: the new state
1088  *
1089  * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
1090  * be connected to.
1091  *
1092  * A stream_type stream 
1093  * 
1094  * Since: 0.10.22
1095  */
1096 void
1097 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
1098 {
1099   g_return_if_fail (appsrc != NULL);
1100   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1101
1102   GST_OBJECT_LOCK (appsrc);
1103   GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
1104   appsrc->priv->stream_type = type;
1105   GST_OBJECT_UNLOCK (appsrc);
1106 }
1107
1108 /**
1109  * gst_app_src_get_stream_type:
1110  * @appsrc: a #GstAppSrc
1111  *
1112  * Get the stream type. Control the stream type of @appsrc
1113  * with gst_app_src_set_stream_type().
1114  *
1115  * Returns: the stream type.
1116  * 
1117  * Since: 0.10.22
1118  */
1119 GstAppStreamType
1120 gst_app_src_get_stream_type (GstAppSrc * appsrc)
1121 {
1122   gboolean stream_type;
1123
1124   g_return_val_if_fail (appsrc != NULL, FALSE);
1125   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1126
1127   GST_OBJECT_LOCK (appsrc);
1128   stream_type = appsrc->priv->stream_type;
1129   GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
1130   GST_OBJECT_UNLOCK (appsrc);
1131
1132   return stream_type;
1133 }
1134
1135 /**
1136  * gst_app_src_set_max_bytes:
1137  * @appsrc: a #GstAppSrc
1138  * @max: the maximum number of bytes to queue
1139  *
1140  * Set the maximum amount of bytes that can be queued in @appsrc.
1141  * After the maximum amount of bytes are queued, @appsrc will emit the
1142  * "enough-data" signal.
1143  * 
1144  * Since: 0.10.22
1145  */
1146 void
1147 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
1148 {
1149   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1150
1151   g_mutex_lock (appsrc->priv->mutex);
1152   if (max != appsrc->priv->max_bytes) {
1153     GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
1154     appsrc->priv->max_bytes = max;
1155     /* signal the change */
1156     g_cond_broadcast (appsrc->priv->cond);
1157   }
1158   g_mutex_unlock (appsrc->priv->mutex);
1159 }
1160
1161 /**
1162  * gst_app_src_get_max_bytes:
1163  * @appsrc: a #GstAppSrc
1164  *
1165  * Get the maximum amount of bytes that can be queued in @appsrc.
1166  *
1167  * Returns: The maximum amount of bytes that can be queued.
1168  * 
1169  * Since: 0.10.22
1170  */
1171 guint64
1172 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
1173 {
1174   guint64 result;
1175
1176   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
1177
1178   g_mutex_lock (appsrc->priv->mutex);
1179   result = appsrc->priv->max_bytes;
1180   GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
1181   g_mutex_unlock (appsrc->priv->mutex);
1182
1183   return result;
1184 }
1185
1186 static void
1187 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
1188     gboolean do_max, guint64 max)
1189 {
1190   gboolean changed = FALSE;
1191
1192   g_mutex_lock (appsrc->priv->mutex);
1193   if (do_min && appsrc->priv->min_latency != min) {
1194     appsrc->priv->min_latency = min;
1195     changed = TRUE;
1196   }
1197   if (do_max && appsrc->priv->max_latency != max) {
1198     appsrc->priv->max_latency = max;
1199     changed = TRUE;
1200   }
1201   g_mutex_unlock (appsrc->priv->mutex);
1202
1203   if (changed) {
1204     GST_DEBUG_OBJECT (appsrc, "posting latency changed");
1205     gst_element_post_message (GST_ELEMENT_CAST (appsrc),
1206         gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
1207   }
1208 }
1209
1210 /**
1211  * gst_app_src_set_latency:
1212  * @appsrc: a #GstAppSrc
1213  * @min: the min latency
1214  * @max: the min latency
1215  *
1216  * Configure the @min and @max latency in @src. If @min is set to -1, the
1217  * default latency calculations for pseudo-live sources will be used.
1218  * 
1219  * Since: 0.10.22
1220  */
1221 void
1222 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
1223 {
1224   gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
1225 }
1226
1227 /**
1228  * gst_app_src_get_latency:
1229  * @appsrc: a #GstAppSrc
1230  * @min: the min latency
1231  * @max: the min latency
1232  *
1233  * Retrieve the min and max latencies in @min and @max respectively.
1234  * 
1235  * Since: 0.10.22
1236  */
1237 void
1238 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
1239 {
1240   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1241
1242   g_mutex_lock (appsrc->priv->mutex);
1243   if (min)
1244     *min = appsrc->priv->min_latency;
1245   if (max)
1246     *max = appsrc->priv->max_latency;
1247   g_mutex_unlock (appsrc->priv->mutex);
1248 }
1249
1250 /**
1251  * gst_app_src_set_emit_signals:
1252  * @appsrc: a #GstAppSrc
1253  * @emit: the new state
1254  *
1255  * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
1256  * by default disabled because signal emission is expensive and unneeded when
1257  * the application prefers to operate in pull mode.
1258  *
1259  * Since: 0.10.23
1260  */
1261 void
1262 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
1263 {
1264   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1265
1266   g_mutex_lock (appsrc->priv->mutex);
1267   appsrc->priv->emit_signals = emit;
1268   g_mutex_unlock (appsrc->priv->mutex);
1269 }
1270
1271 /**
1272  * gst_app_src_get_emit_signals:
1273  * @appsrc: a #GstAppSrc
1274  *
1275  * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
1276  *
1277  * Returns: %TRUE if @appsrc is emiting the "new-preroll" and "new-buffer"
1278  * signals.
1279  *
1280  * Since: 0.10.23
1281  */
1282 gboolean
1283 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
1284 {
1285   gboolean result;
1286
1287   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1288
1289   g_mutex_lock (appsrc->priv->mutex);
1290   result = appsrc->priv->emit_signals;
1291   g_mutex_unlock (appsrc->priv->mutex);
1292
1293   return result;
1294 }
1295
1296 static GstFlowReturn
1297 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
1298     gboolean steal_ref)
1299 {
1300   gboolean first = TRUE;
1301
1302   g_return_val_if_fail (appsrc, GST_FLOW_ERROR);
1303   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1304   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1305
1306   g_mutex_lock (appsrc->priv->mutex);
1307
1308   while (TRUE) {
1309     /* can't accept buffers when we are flushing or EOS */
1310     if (appsrc->priv->flushing)
1311       goto flushing;
1312
1313     if (appsrc->priv->is_eos)
1314       goto eos;
1315
1316     if (appsrc->priv->max_bytes
1317         && appsrc->priv->queued_bytes >= appsrc->priv->max_bytes) {
1318       GST_DEBUG_OBJECT (appsrc,
1319           "queue filled (%" G_GUINT64_FORMAT " >= %" G_GUINT64_FORMAT ")",
1320           appsrc->priv->queued_bytes, appsrc->priv->max_bytes);
1321
1322       if (first) {
1323         gboolean emit;
1324
1325         emit = appsrc->priv->emit_signals;
1326         /* only signal on the first push */
1327         g_mutex_unlock (appsrc->priv->mutex);
1328
1329         if (appsrc->priv->callbacks.enough_data)
1330           appsrc->priv->callbacks.enough_data (appsrc, appsrc->priv->user_data);
1331         else if (emit)
1332           g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
1333               NULL);
1334
1335         g_mutex_lock (appsrc->priv->mutex);
1336         /* continue to check for flushing/eos after releasing the lock */
1337         first = FALSE;
1338         continue;
1339       }
1340       if (appsrc->priv->block) {
1341         GST_DEBUG_OBJECT (appsrc, "waiting for free space");
1342         /* we are filled, wait until a buffer gets popped or when we
1343          * flush. */
1344         g_cond_wait (appsrc->priv->cond, appsrc->priv->mutex);
1345       } else {
1346         /* no need to wait for free space, we just pump more data into the
1347          * queue hoping that the caller reacts to the enough-data signal and
1348          * stops pushing buffers. */
1349         break;
1350       }
1351     } else
1352       break;
1353   }
1354
1355   GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
1356   if (!steal_ref)
1357     gst_buffer_ref (buffer);
1358   g_queue_push_tail (appsrc->priv->queue, buffer);
1359   appsrc->priv->queued_bytes += GST_BUFFER_SIZE (buffer);
1360   g_cond_broadcast (appsrc->priv->cond);
1361   g_mutex_unlock (appsrc->priv->mutex);
1362
1363   return GST_FLOW_OK;
1364
1365   /* ERRORS */
1366 flushing:
1367   {
1368     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
1369     if (steal_ref)
1370       gst_buffer_unref (buffer);
1371     g_mutex_unlock (appsrc->priv->mutex);
1372     return GST_FLOW_WRONG_STATE;
1373   }
1374 eos:
1375   {
1376     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
1377     if (steal_ref)
1378       gst_buffer_unref (buffer);
1379     g_mutex_unlock (appsrc->priv->mutex);
1380     return GST_FLOW_UNEXPECTED;
1381   }
1382 }
1383
1384 /**
1385  * gst_app_src_push_buffer:
1386  * @appsrc: a #GstAppSrc
1387  * @buffer: a #GstBuffer to push
1388  *
1389  * Adds a buffer to the queue of buffers that the appsrc element will
1390  * push to its source pad.  This function takes ownership of the buffer.
1391  *
1392  * When the block property is TRUE, this function can block until free
1393  * space becomes available in the queue.
1394  *
1395  * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
1396  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
1397  * #GST_FLOW_UNEXPECTED when EOS occured.
1398  * 
1399  * Since: 0.10.22
1400  */
1401 GstFlowReturn
1402 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
1403 {
1404   return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
1405 }
1406
1407 /* push a buffer without stealing the ref of the buffer. This is used for the
1408  * action signal. */
1409 static GstFlowReturn
1410 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
1411 {
1412   return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
1413 }
1414
1415 /**
1416  * gst_app_src_end_of_stream:
1417  * @appsrc: a #GstAppSrc
1418  *
1419  * Indicates to the appsrc element that the last buffer queued in the
1420  * element is the last buffer of the stream.
1421  *
1422  * Returns: #GST_FLOW_OK when the EOS was successfuly queued.
1423  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
1424  * 
1425  * Since: 0.10.22
1426  */
1427 GstFlowReturn
1428 gst_app_src_end_of_stream (GstAppSrc * appsrc)
1429 {
1430   g_return_val_if_fail (appsrc, GST_FLOW_ERROR);
1431   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1432
1433   g_mutex_lock (appsrc->priv->mutex);
1434   /* can't accept buffers when we are flushing. We can accept them when we are 
1435    * EOS although it will not do anything. */
1436   if (appsrc->priv->flushing)
1437     goto flushing;
1438
1439   GST_DEBUG_OBJECT (appsrc, "sending EOS");
1440   appsrc->priv->is_eos = TRUE;
1441   g_cond_broadcast (appsrc->priv->cond);
1442   g_mutex_unlock (appsrc->priv->mutex);
1443
1444   return GST_FLOW_OK;
1445
1446   /* ERRORS */
1447 flushing:
1448   {
1449     GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
1450     return GST_FLOW_WRONG_STATE;
1451   }
1452 }
1453
1454 /**
1455  * gst_app_src_set_callbacks:
1456  * @appsrc: a #GstAppSrc
1457  * @callbacks: the callbacks
1458  * @user_data: a user_data argument for the callbacks
1459  * @notify: a destroy notify function
1460  *
1461  * Set callbacks which will be executed when data is needed, enough data has
1462  * been collected or when a seek should be performed.
1463  * This is an alternative to using the signals, it has lower overhead and is thus
1464  * less expensive, but also less flexible.
1465  *
1466  * If callbacks are installed, no signals will be emited for performance
1467  * reasons.
1468  *
1469  * Since: 0.10.23
1470  */
1471 void
1472 gst_app_src_set_callbacks (GstAppSrc * appsrc,
1473     GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
1474 {
1475   GDestroyNotify old_notify;
1476
1477   g_return_if_fail (appsrc != NULL);
1478   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1479   g_return_if_fail (callbacks != NULL);
1480
1481   GST_OBJECT_LOCK (appsrc);
1482   old_notify = appsrc->priv->notify;
1483
1484   if (old_notify) {
1485     gpointer old_data;
1486
1487     old_data = appsrc->priv->user_data;
1488
1489     appsrc->priv->user_data = NULL;
1490     appsrc->priv->notify = NULL;
1491     GST_OBJECT_UNLOCK (appsrc);
1492
1493     old_notify (old_data);
1494
1495     GST_OBJECT_LOCK (appsrc);
1496   }
1497   appsrc->priv->callbacks = *callbacks;
1498   appsrc->priv->user_data = user_data;
1499   appsrc->priv->notify = notify;
1500   GST_OBJECT_UNLOCK (appsrc);
1501 }
1502
1503 /*** GSTURIHANDLER INTERFACE *************************************************/
1504
1505 static GstURIType
1506 gst_app_src_uri_get_type (void)
1507 {
1508   return GST_URI_SRC;
1509 }
1510
1511 static gchar **
1512 gst_app_src_uri_get_protocols (void)
1513 {
1514   static gchar *protocols[] = { "appsrc", NULL };
1515
1516   return protocols;
1517 }
1518
1519 static const gchar *
1520 gst_app_src_uri_get_uri (GstURIHandler * handler)
1521 {
1522   return "appsrc";
1523 }
1524
1525 static gboolean
1526 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
1527 {
1528   gchar *protocol;
1529   gboolean ret;
1530
1531   protocol = gst_uri_get_protocol (uri);
1532   ret = !strcmp (protocol, "appsrc");
1533   g_free (protocol);
1534
1535   return ret;
1536 }
1537
1538 static void
1539 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1540 {
1541   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1542
1543   iface->get_type = gst_app_src_uri_get_type;
1544   iface->get_protocols = gst_app_src_uri_get_protocols;
1545   iface->get_uri = gst_app_src_uri_get_uri;
1546   iface->set_uri = gst_app_src_uri_set_uri;
1547 }