Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / gst-libs / gst / app / gstappsrc.c
1 /* GStreamer
2  * Copyright (C) 2007 David Schleef <ds@schleef.org>
3  *           (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20 /**
21  * SECTION:gstappsrc
22  * @short_description: Easy way for applications to inject buffers into a
23  *     pipeline
24  * @see_also: #GstBaseSrc, appsink
25  *
26  * The appsrc element can be used by applications to insert data into a
27  * GStreamer pipeline. Unlike most GStreamer elements, Appsrc provides
28  * external API functions.
29  *
30  * appsrc can be used by linking with the libgstapp library to access the
31  * methods directly or by using the appsrc action signals.
32  *
33  * Before operating appsrc, the caps property must be set to a fixed caps
34  * describing the format of the data that will be pushed with appsrc. An
35  * exception to this is when pushing buffers with unknown caps, in which case no
36  * caps should be set. This is typically true of file-like sources that push raw
37  * byte buffers.
38  *
39  * The main way of handing data to the appsrc element is by calling the
40  * gst_app_src_push_buffer() method or by emiting the push-buffer action signal.
41  * This will put the buffer onto a queue from which appsrc will read from in its
42  * streaming thread. It is important to note that data transport will not happen
43  * from the thread that performed the push-buffer call.
44  *
45  * The "max-bytes" property controls how much data can be queued in appsrc
46  * before appsrc considers the queue full. A filled internal queue will always
47  * signal the "enough-data" signal, which signals the application that it should
48  * stop pushing data into appsrc. The "block" property will cause appsrc to
49  * block the push-buffer method until free data becomes available again.
50  *
51  * When the internal queue is running out of data, the "need-data" signal is
52  * emited, which signals the application that it should start pushing more data
53  * into appsrc.
54  *
55  * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
56  * "seek-data" signal when the "stream-mode" property is set to "seekable" or
57  * "random-access". The signal argument will contain the new desired position in
58  * the stream expressed in the unit set with the "format" property. After
59  * receiving the seek-data signal, the application should push-buffers from the
60  * new position.
61  *
62  * These signals allow the application to operate the appsrc in two different
63  * ways:
64  *
65  * The push model, in which the application repeadedly calls the push-buffer method
66  * with a new buffer. Optionally, the queue size in the appsrc can be controlled
67  * with the enough-data and need-data signals by respectively stopping/starting
68  * the push-buffer calls. This is a typical mode of operation for the
69  * stream-type "stream" and "seekable". Use this model when implementing various
70  * network protocols or hardware devices.
71  *
72  * The pull model where the need-data signal triggers the next push-buffer call.
73  * This mode is typically used in the "random-access" stream-type. Use this
74  * model for file access or other randomly accessable sources. In this mode, a
75  * buffer of exactly the amount of bytes given by the need-data signal should be
76  * pushed into appsrc.
77  *
78  * In all modes, the size property on appsrc should contain the total stream
79  * size in bytes. Setting this property is mandatory in the random-access mode.
80  * For the stream and seekable modes, setting this property is optional but
81  * recommended.
82  *
83  * When the application is finished pushing data into appsrc, it should call
84  * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
85  * this call, no more buffers can be pushed into appsrc until a flushing seek
86  * happened or the state of the appsrc has gone through READY.
87  *
88  * Last reviewed on 2008-12-17 (0.10.10)
89  *
90  * Since: 0.10.22
91  */
92
93 #ifdef HAVE_CONFIG_H
94 #include "config.h"
95 #endif
96
97 #include <gst/gst.h>
98 #include <gst/base/gstbasesrc.h>
99
100 #include <string.h>
101
102 #include "gstapp-marshal.h"
103 #include "gstappsrc.h"
104
105 struct _GstAppSrcPrivate
106 {
107   GCond *cond;
108   GMutex *mutex;
109   GQueue *queue;
110
111   GstCaps *caps;
112   gint64 size;
113   GstAppStreamType stream_type;
114   guint64 max_bytes;
115   GstFormat format;
116   gboolean block;
117
118   gboolean flushing;
119   gboolean started;
120   gboolean is_eos;
121   guint64 queued_bytes;
122   guint64 offset;
123   GstAppStreamType current_type;
124
125   guint64 min_latency;
126   guint64 max_latency;
127   gboolean emit_signals;
128   guint min_percent;
129
130   GstAppSrcCallbacks callbacks;
131   gpointer user_data;
132   GDestroyNotify notify;
133 };
134
135 GST_DEBUG_CATEGORY_STATIC (app_src_debug);
136 #define GST_CAT_DEFAULT app_src_debug
137
138 enum
139 {
140   /* signals */
141   SIGNAL_NEED_DATA,
142   SIGNAL_ENOUGH_DATA,
143   SIGNAL_SEEK_DATA,
144
145   /* actions */
146   SIGNAL_PUSH_BUFFER,
147   SIGNAL_END_OF_STREAM,
148
149   LAST_SIGNAL
150 };
151
152 #define DEFAULT_PROP_SIZE          -1
153 #define DEFAULT_PROP_STREAM_TYPE   GST_APP_STREAM_TYPE_STREAM
154 #define DEFAULT_PROP_MAX_BYTES     200000
155 #define DEFAULT_PROP_FORMAT        GST_FORMAT_BYTES
156 #define DEFAULT_PROP_BLOCK         FALSE
157 #define DEFAULT_PROP_IS_LIVE       FALSE
158 #define DEFAULT_PROP_MIN_LATENCY   -1
159 #define DEFAULT_PROP_MAX_LATENCY   -1
160 #define DEFAULT_PROP_EMIT_SIGNALS  TRUE
161 #define DEFAULT_PROP_MIN_PERCENT   0
162
163 enum
164 {
165   PROP_0,
166   PROP_CAPS,
167   PROP_SIZE,
168   PROP_STREAM_TYPE,
169   PROP_MAX_BYTES,
170   PROP_FORMAT,
171   PROP_BLOCK,
172   PROP_IS_LIVE,
173   PROP_MIN_LATENCY,
174   PROP_MAX_LATENCY,
175   PROP_EMIT_SIGNALS,
176   PROP_MIN_PERCENT,
177   PROP_LAST
178 };
179
180 static GstStaticPadTemplate gst_app_src_template =
181 GST_STATIC_PAD_TEMPLATE ("src",
182     GST_PAD_SRC,
183     GST_PAD_ALWAYS,
184     GST_STATIC_CAPS_ANY);
185
186 GType
187 gst_app_stream_type_get_type (void)
188 {
189   static volatile gsize stream_type_type = 0;
190   static const GEnumValue stream_type[] = {
191     {GST_APP_STREAM_TYPE_STREAM, "GST_APP_STREAM_TYPE_STREAM", "stream"},
192     {GST_APP_STREAM_TYPE_SEEKABLE, "GST_APP_STREAM_TYPE_SEEKABLE", "seekable"},
193     {GST_APP_STREAM_TYPE_RANDOM_ACCESS, "GST_APP_STREAM_TYPE_RANDOM_ACCESS",
194         "random-access"},
195     {0, NULL, NULL}
196   };
197
198   if (g_once_init_enter (&stream_type_type)) {
199     GType tmp = g_enum_register_static ("GstAppStreamType", stream_type);
200     g_once_init_leave (&stream_type_type, tmp);
201   }
202
203   return (GType) stream_type_type;
204 }
205
206 static void gst_app_src_uri_handler_init (gpointer g_iface,
207     gpointer iface_data);
208
209 static void gst_app_src_dispose (GObject * object);
210 static void gst_app_src_finalize (GObject * object);
211
212 static void gst_app_src_set_property (GObject * object, guint prop_id,
213     const GValue * value, GParamSpec * pspec);
214 static void gst_app_src_get_property (GObject * object, guint prop_id,
215     GValue * value, GParamSpec * pspec);
216
217 static void gst_app_src_set_latencies (GstAppSrc * appsrc,
218     gboolean do_min, guint64 min, gboolean do_max, guint64 max);
219
220 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc,
221     guint64 offset, guint size, GstBuffer ** buf);
222 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
223 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
224 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
225 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
226 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
227 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
228 static gboolean gst_app_src_check_get_range (GstBaseSrc * src);
229 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
230 static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
231
232 static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
233     GstBuffer * buffer);
234
235 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
236
237 #define gst_app_src_parent_class parent_class
238 G_DEFINE_TYPE_WITH_CODE (GstAppSrc, gst_app_src, GST_TYPE_BASE_SRC,
239     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_app_src_uri_handler_init));
240
241 static void
242 gst_app_src_class_init (GstAppSrcClass * klass)
243 {
244   GObjectClass *gobject_class = (GObjectClass *) klass;
245   GstElementClass *element_class = (GstElementClass *) klass;
246   GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
247
248   GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
249
250   gobject_class->dispose = gst_app_src_dispose;
251   gobject_class->finalize = gst_app_src_finalize;
252
253   gobject_class->set_property = gst_app_src_set_property;
254   gobject_class->get_property = gst_app_src_get_property;
255
256   /**
257    * GstAppSrc::caps
258    *
259    * The GstCaps that will negotiated downstream and will be put
260    * on outgoing buffers.
261    */
262   g_object_class_install_property (gobject_class, PROP_CAPS,
263       g_param_spec_boxed ("caps", "Caps",
264           "The allowed caps for the src pad", GST_TYPE_CAPS,
265           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266   /**
267    * GstAppSrc::format
268    *
269    * The format to use for segment events. When the source is producing
270    * timestamped buffers this property should be set to GST_FORMAT_TIME.
271    */
272   g_object_class_install_property (gobject_class, PROP_FORMAT,
273       g_param_spec_enum ("format", "Format",
274           "The format of the segment events and seek", GST_TYPE_FORMAT,
275           DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
276   /**
277    * GstAppSrc::size
278    *
279    * The total size in bytes of the data stream. If the total size is known, it
280    * is recommended to configure it with this property.
281    */
282   g_object_class_install_property (gobject_class, PROP_SIZE,
283       g_param_spec_int64 ("size", "Size",
284           "The size of the data stream in bytes (-1 if unknown)",
285           -1, G_MAXINT64, DEFAULT_PROP_SIZE,
286           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
287   /**
288    * GstAppSrc::stream-type
289    *
290    * The type of stream that this source is producing.  For seekable streams the
291    * application should connect to the seek-data signal.
292    */
293   g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
294       g_param_spec_enum ("stream-type", "Stream Type",
295           "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
296           DEFAULT_PROP_STREAM_TYPE,
297           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
298   /**
299    * GstAppSrc::max-bytes
300    *
301    * The maximum amount of bytes that can be queued internally.
302    * After the maximum amount of bytes are queued, appsrc will emit the
303    * "enough-data" signal.
304    */
305   g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
306       g_param_spec_uint64 ("max-bytes", "Max bytes",
307           "The maximum number of bytes to queue internally (0 = unlimited)",
308           0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
309           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
310   /**
311    * GstAppSrc::block
312    *
313    * When max-bytes are queued and after the enough-data signal has been emited,
314    * block any further push-buffer calls until the amount of queued bytes drops
315    * below the max-bytes limit.
316    */
317   g_object_class_install_property (gobject_class, PROP_BLOCK,
318       g_param_spec_boolean ("block", "Block",
319           "Block push-buffer when max-bytes are queued",
320           DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
321
322   /**
323    * GstAppSrc::is-live
324    *
325    * Instruct the source to behave like a live source. This includes that it
326    * will only push out buffers in the PLAYING state.
327    */
328   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
329       g_param_spec_boolean ("is-live", "Is Live",
330           "Whether to act as a live source",
331           DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
332   /**
333    * GstAppSrc::min-latency
334    *
335    * The minimum latency of the source. A value of -1 will use the default
336    * latency calculations of #GstBaseSrc.
337    */
338   g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
339       g_param_spec_int64 ("min-latency", "Min Latency",
340           "The minimum latency (-1 = default)",
341           -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
342           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
343   /**
344    * GstAppSrc::max-latency
345    *
346    * The maximum latency of the source. A value of -1 means an unlimited amout
347    * of latency.
348    */
349   g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
350       g_param_spec_int64 ("max-latency", "Max Latency",
351           "The maximum latency (-1 = unlimited)",
352           -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
353           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
354
355   /**
356    * GstAppSrc::emit-signals
357    *
358    * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
359    * This option is by default enabled for backwards compatibility reasons but
360    * can disabled when needed because signal emission is expensive.
361    *
362    * Since: 0.10.23
363    */
364   g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
365       g_param_spec_boolean ("emit-signals", "Emit signals",
366           "Emit need-data, enough-data and seek-data signals",
367           DEFAULT_PROP_EMIT_SIGNALS,
368           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
369
370   /**
371    * GstAppSrc::empty-percent
372    *
373    * Make appsrc emit the "need-data" signal when the amount of bytes in the
374    * queue drops below this percentage of max-bytes.
375    *
376    * Since: 0.10.27
377    */
378   g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
379       g_param_spec_uint ("min-percent", "Min Percent",
380           "Emit need-data when queued bytes drops below this percent of max-bytes",
381           0, 100, DEFAULT_PROP_MIN_PERCENT,
382           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
383
384   /**
385    * GstAppSrc::need-data:
386    * @appsrc: the appsrc element that emited the signal
387    * @length: the amount of bytes needed.
388    *
389    * Signal that the source needs more data. In the callback or from another
390    * thread you should call push-buffer or end-of-stream.
391    *
392    * @length is just a hint and when it is set to -1, any number of bytes can be
393    * pushed into @appsrc.
394    *
395    * You can call push-buffer multiple times until the enough-data signal is
396    * fired.
397    */
398   gst_app_src_signals[SIGNAL_NEED_DATA] =
399       g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
400       G_STRUCT_OFFSET (GstAppSrcClass, need_data),
401       NULL, NULL, __gst_app_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
402
403   /**
404    * GstAppSrc::enough-data:
405    * @appsrc: the appsrc element that emited the signal
406    *
407    * Signal that the source has enough data. It is recommended that the
408    * application stops calling push-buffer until the need-data signal is
409    * emited again to avoid excessive buffer queueing.
410    */
411   gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
412       g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
413       G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
414       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
415
416   /**
417    * GstAppSrc::seek-data:
418    * @appsrc: the appsrc element that emited the signal
419    * @offset: the offset to seek to
420    *
421    * Seek to the given offset. The next push-buffer should produce buffers from
422    * the new @offset.
423    * This callback is only called for seekable stream types.
424    *
425    * Returns: %TRUE if the seek succeeded.
426    */
427   gst_app_src_signals[SIGNAL_SEEK_DATA] =
428       g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
429       G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
430       NULL, NULL, __gst_app_marshal_BOOLEAN__UINT64, G_TYPE_BOOLEAN, 1,
431       G_TYPE_UINT64);
432
433    /**
434     * GstAppSrc::push-buffer:
435     * @appsrc: the appsrc
436     * @buffer: a buffer to push
437     *
438     * Adds a buffer to the queue of buffers that the appsrc element will
439     * push to its source pad. This function does not take ownership of the
440     * buffer so the buffer needs to be unreffed after calling this function.
441     *
442     * When the block property is TRUE, this function can block until free space
443     * becomes available in the queue.
444     */
445   gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
446       g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
447       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
448           push_buffer), NULL, NULL, __gst_app_marshal_ENUM__OBJECT,
449       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
450
451    /**
452     * GstAppSrc::end-of-stream:
453     * @appsrc: the appsrc
454     *
455     * Notify @appsrc that no more buffer are available.
456     */
457   gst_app_src_signals[SIGNAL_END_OF_STREAM] =
458       g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
459       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
460           end_of_stream), NULL, NULL, __gst_app_marshal_ENUM__VOID,
461       GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
462
463   gst_element_class_set_details_simple (element_class, "AppSrc",
464       "Generic/Source", "Allow the application to feed buffers to a pipeline",
465       "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
466
467   gst_element_class_add_pad_template (element_class,
468       gst_static_pad_template_get (&gst_app_src_template));
469
470   basesrc_class->create = gst_app_src_create;
471   basesrc_class->start = gst_app_src_start;
472   basesrc_class->stop = gst_app_src_stop;
473   basesrc_class->unlock = gst_app_src_unlock;
474   basesrc_class->unlock_stop = gst_app_src_unlock_stop;
475   basesrc_class->do_seek = gst_app_src_do_seek;
476   basesrc_class->is_seekable = gst_app_src_is_seekable;
477   basesrc_class->check_get_range = gst_app_src_check_get_range;
478   basesrc_class->get_size = gst_app_src_do_get_size;
479   basesrc_class->get_size = gst_app_src_do_get_size;
480   basesrc_class->query = gst_app_src_query;
481
482   klass->push_buffer = gst_app_src_push_buffer_action;
483   klass->end_of_stream = gst_app_src_end_of_stream;
484
485   g_type_class_add_private (klass, sizeof (GstAppSrcPrivate));
486 }
487
488 static void
489 gst_app_src_init (GstAppSrc * appsrc)
490 {
491   GstAppSrcPrivate *priv;
492
493   priv = appsrc->priv = G_TYPE_INSTANCE_GET_PRIVATE (appsrc, GST_TYPE_APP_SRC,
494       GstAppSrcPrivate);
495
496   priv->mutex = g_mutex_new ();
497   priv->cond = g_cond_new ();
498   priv->queue = g_queue_new ();
499
500   priv->size = DEFAULT_PROP_SIZE;
501   priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
502   priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
503   priv->format = DEFAULT_PROP_FORMAT;
504   priv->block = DEFAULT_PROP_BLOCK;
505   priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
506   priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
507   priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
508   priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
509
510   gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
511 }
512
513 static void
514 gst_app_src_flush_queued (GstAppSrc * src)
515 {
516   GstBuffer *buf;
517   GstAppSrcPrivate *priv = src->priv;
518
519   while ((buf = g_queue_pop_head (priv->queue)))
520     gst_buffer_unref (buf);
521   priv->queued_bytes = 0;
522 }
523
524 static void
525 gst_app_src_dispose (GObject * obj)
526 {
527   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
528   GstAppSrcPrivate *priv = appsrc->priv;
529
530   if (priv->caps) {
531     gst_caps_unref (priv->caps);
532     priv->caps = NULL;
533   }
534   gst_app_src_flush_queued (appsrc);
535
536   G_OBJECT_CLASS (parent_class)->dispose (obj);
537 }
538
539 static void
540 gst_app_src_finalize (GObject * obj)
541 {
542   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
543   GstAppSrcPrivate *priv = appsrc->priv;
544
545   g_mutex_free (priv->mutex);
546   g_cond_free (priv->cond);
547   g_queue_free (priv->queue);
548
549   G_OBJECT_CLASS (parent_class)->finalize (obj);
550 }
551
552 static void
553 gst_app_src_set_property (GObject * object, guint prop_id,
554     const GValue * value, GParamSpec * pspec)
555 {
556   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
557   GstAppSrcPrivate *priv = appsrc->priv;
558
559   switch (prop_id) {
560     case PROP_CAPS:
561       gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
562       break;
563     case PROP_SIZE:
564       gst_app_src_set_size (appsrc, g_value_get_int64 (value));
565       break;
566     case PROP_STREAM_TYPE:
567       gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
568       break;
569     case PROP_MAX_BYTES:
570       gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
571       break;
572     case PROP_FORMAT:
573       priv->format = g_value_get_enum (value);
574       break;
575     case PROP_BLOCK:
576       priv->block = g_value_get_boolean (value);
577       break;
578     case PROP_IS_LIVE:
579       gst_base_src_set_live (GST_BASE_SRC (appsrc),
580           g_value_get_boolean (value));
581       break;
582     case PROP_MIN_LATENCY:
583       gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
584           FALSE, -1);
585       break;
586     case PROP_MAX_LATENCY:
587       gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
588           g_value_get_int64 (value));
589       break;
590     case PROP_EMIT_SIGNALS:
591       gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
592       break;
593     case PROP_MIN_PERCENT:
594       priv->min_percent = g_value_get_uint (value);
595       break;
596     default:
597       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
598       break;
599   }
600 }
601
602 static void
603 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
604     GParamSpec * pspec)
605 {
606   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
607   GstAppSrcPrivate *priv = appsrc->priv;
608
609   switch (prop_id) {
610     case PROP_CAPS:
611     {
612       GstCaps *caps;
613
614       /* we're missing a _take_caps() function to transfer ownership */
615       caps = gst_app_src_get_caps (appsrc);
616       gst_value_set_caps (value, caps);
617       if (caps)
618         gst_caps_unref (caps);
619       break;
620     }
621     case PROP_SIZE:
622       g_value_set_int64 (value, gst_app_src_get_size (appsrc));
623       break;
624     case PROP_STREAM_TYPE:
625       g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
626       break;
627     case PROP_MAX_BYTES:
628       g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
629       break;
630     case PROP_FORMAT:
631       g_value_set_enum (value, priv->format);
632       break;
633     case PROP_BLOCK:
634       g_value_set_boolean (value, priv->block);
635       break;
636     case PROP_IS_LIVE:
637       g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
638       break;
639     case PROP_MIN_LATENCY:
640     {
641       guint64 min;
642
643       gst_app_src_get_latency (appsrc, &min, NULL);
644       g_value_set_int64 (value, min);
645       break;
646     }
647     case PROP_MAX_LATENCY:
648     {
649       guint64 max;
650
651       gst_app_src_get_latency (appsrc, &max, NULL);
652       g_value_set_int64 (value, max);
653       break;
654     }
655     case PROP_EMIT_SIGNALS:
656       g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
657       break;
658     case PROP_MIN_PERCENT:
659       g_value_set_uint (value, priv->min_percent);
660       break;
661     default:
662       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
663       break;
664   }
665 }
666
667 static gboolean
668 gst_app_src_unlock (GstBaseSrc * bsrc)
669 {
670   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
671   GstAppSrcPrivate *priv = appsrc->priv;
672
673   g_mutex_lock (priv->mutex);
674   GST_DEBUG_OBJECT (appsrc, "unlock start");
675   priv->flushing = TRUE;
676   g_cond_broadcast (priv->cond);
677   g_mutex_unlock (priv->mutex);
678
679   return TRUE;
680 }
681
682 static gboolean
683 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
684 {
685   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
686   GstAppSrcPrivate *priv = appsrc->priv;
687
688   g_mutex_lock (priv->mutex);
689   GST_DEBUG_OBJECT (appsrc, "unlock stop");
690   priv->flushing = FALSE;
691   g_cond_broadcast (priv->cond);
692   g_mutex_unlock (priv->mutex);
693
694   return TRUE;
695 }
696
697 static gboolean
698 gst_app_src_start (GstBaseSrc * bsrc)
699 {
700   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
701   GstAppSrcPrivate *priv = appsrc->priv;
702
703   g_mutex_lock (priv->mutex);
704   GST_DEBUG_OBJECT (appsrc, "starting");
705   priv->started = TRUE;
706   /* set the offset to -1 so that we always do a first seek. This is only used
707    * in random-access mode. */
708   priv->offset = -1;
709   priv->flushing = FALSE;
710   g_mutex_unlock (priv->mutex);
711
712   gst_base_src_set_format (bsrc, priv->format);
713
714   return TRUE;
715 }
716
717 static gboolean
718 gst_app_src_stop (GstBaseSrc * bsrc)
719 {
720   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
721   GstAppSrcPrivate *priv = appsrc->priv;
722
723   g_mutex_lock (priv->mutex);
724   GST_DEBUG_OBJECT (appsrc, "stopping");
725   priv->is_eos = FALSE;
726   priv->flushing = TRUE;
727   priv->started = FALSE;
728   gst_app_src_flush_queued (appsrc);
729   g_mutex_unlock (priv->mutex);
730
731   return TRUE;
732 }
733
734 static gboolean
735 gst_app_src_is_seekable (GstBaseSrc * src)
736 {
737   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
738   GstAppSrcPrivate *priv = appsrc->priv;
739   gboolean res = FALSE;
740
741   switch (priv->stream_type) {
742     case GST_APP_STREAM_TYPE_STREAM:
743       break;
744     case GST_APP_STREAM_TYPE_SEEKABLE:
745     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
746       res = TRUE;
747       break;
748   }
749   return res;
750 }
751
752 static gboolean
753 gst_app_src_check_get_range (GstBaseSrc * src)
754 {
755   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
756   GstAppSrcPrivate *priv = appsrc->priv;
757   gboolean res = FALSE;
758
759   switch (priv->stream_type) {
760     case GST_APP_STREAM_TYPE_STREAM:
761     case GST_APP_STREAM_TYPE_SEEKABLE:
762       break;
763     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
764       res = TRUE;
765       break;
766   }
767   return res;
768 }
769
770 static gboolean
771 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
772 {
773   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
774
775   *size = gst_app_src_get_size (appsrc);
776
777   return TRUE;
778 }
779
780 static gboolean
781 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
782 {
783   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
784   GstAppSrcPrivate *priv = appsrc->priv;
785   gboolean res;
786
787   switch (GST_QUERY_TYPE (query)) {
788     case GST_QUERY_LATENCY:
789     {
790       GstClockTime min, max;
791       gboolean live;
792
793       /* Query the parent class for the defaults */
794       res = gst_base_src_query_latency (src, &live, &min, &max);
795
796       /* overwrite with our values when we need to */
797       g_mutex_lock (priv->mutex);
798       if (priv->min_latency != -1)
799         min = priv->min_latency;
800       if (priv->max_latency != -1)
801         max = priv->max_latency;
802       g_mutex_unlock (priv->mutex);
803
804       gst_query_set_latency (query, live, min, max);
805       break;
806     }
807     default:
808       res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
809       break;
810   }
811
812   return res;
813 }
814
815 /* will be called in push mode */
816 static gboolean
817 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
818 {
819   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
820   GstAppSrcPrivate *priv = appsrc->priv;
821   gint64 desired_position;
822   gboolean res = FALSE;
823
824   desired_position = segment->position;
825
826   GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
827       desired_position, gst_format_get_name (segment->format));
828
829   /* no need to try to seek in streaming mode */
830   if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
831     return TRUE;
832
833   if (priv->callbacks.seek_data)
834     res = priv->callbacks.seek_data (appsrc, desired_position, priv->user_data);
835   else {
836     gboolean emit;
837
838     g_mutex_lock (priv->mutex);
839     emit = priv->emit_signals;
840     g_mutex_unlock (priv->mutex);
841
842     if (emit)
843       g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
844           desired_position, &res);
845   }
846
847   if (res) {
848     GST_DEBUG_OBJECT (appsrc, "flushing queue");
849     gst_app_src_flush_queued (appsrc);
850     priv->is_eos = FALSE;
851   } else {
852     GST_WARNING_OBJECT (appsrc, "seek failed");
853   }
854
855   return res;
856 }
857
858 /* must be called with the appsrc mutex */
859 static gboolean
860 gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
861 {
862   gboolean res = FALSE;
863   gboolean emit;
864   GstAppSrcPrivate *priv = appsrc->priv;
865
866   emit = priv->emit_signals;
867   g_mutex_unlock (priv->mutex);
868
869   GST_DEBUG_OBJECT (appsrc,
870       "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
871       priv->offset, offset);
872
873   if (priv->callbacks.seek_data)
874     res = priv->callbacks.seek_data (appsrc, offset, priv->user_data);
875   else if (emit)
876     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
877         offset, &res);
878
879   g_mutex_lock (priv->mutex);
880
881   return res;
882 }
883
884 /* must be called with the appsrc mutex. After this call things can be
885  * flushing */
886 static void
887 gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
888 {
889   gboolean emit;
890   GstAppSrcPrivate *priv = appsrc->priv;
891
892   emit = priv->emit_signals;
893   g_mutex_unlock (priv->mutex);
894
895   /* we have no data, we need some. We fire the signal with the size hint. */
896   if (priv->callbacks.need_data)
897     priv->callbacks.need_data (appsrc, size, priv->user_data);
898   else if (emit)
899     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
900         NULL);
901
902   g_mutex_lock (priv->mutex);
903   /* we can be flushing now because we released the lock */
904 }
905
906 static GstFlowReturn
907 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
908     GstBuffer ** buf)
909 {
910   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
911   GstAppSrcPrivate *priv = appsrc->priv;
912   GstFlowReturn ret;
913   GstCaps *caps;
914
915   GST_OBJECT_LOCK (appsrc);
916   caps = priv->caps ? gst_caps_ref (priv->caps) : NULL;
917   if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
918           bsrc->segment.format == GST_FORMAT_BYTES)) {
919     GST_DEBUG_OBJECT (appsrc,
920         "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
921         bsrc->segment.duration, priv->size);
922     bsrc->segment.duration = priv->size;
923     GST_OBJECT_UNLOCK (appsrc);
924
925     gst_element_post_message (GST_ELEMENT (appsrc),
926         gst_message_new_duration (GST_OBJECT (appsrc), GST_FORMAT_BYTES,
927             priv->size));
928   } else {
929     GST_OBJECT_UNLOCK (appsrc);
930   }
931
932   g_mutex_lock (priv->mutex);
933   /* check flushing first */
934   if (G_UNLIKELY (priv->flushing))
935     goto flushing;
936
937   if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
938     /* if we are dealing with a random-access stream, issue a seek if the offset
939      * changed. */
940     if (G_UNLIKELY (priv->offset != offset)) {
941       gboolean res;
942
943       /* do the seek */
944       res = gst_app_src_emit_seek (appsrc, offset);
945
946       if (G_UNLIKELY (!res))
947         /* failing to seek is fatal */
948         goto seek_error;
949
950       priv->offset = offset;
951     }
952   }
953
954   while (TRUE) {
955     /* return data as long as we have some */
956     if (!g_queue_is_empty (priv->queue)) {
957       guint buf_size;
958
959       *buf = g_queue_pop_head (priv->queue);
960       buf_size = gst_buffer_get_size (*buf);
961
962       GST_DEBUG_OBJECT (appsrc, "we have buffer %p of size %u", *buf, buf_size);
963
964       priv->queued_bytes -= buf_size;
965
966       /* only update the offset when in random_access mode */
967       if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
968         priv->offset += buf_size;
969
970       /* signal that we removed an item */
971       g_cond_broadcast (priv->cond);
972
973       /* see if we go lower than the empty-percent */
974       if (priv->min_percent && priv->max_bytes) {
975         if (priv->queued_bytes * 100 / priv->max_bytes <= priv->min_percent)
976           /* ignore flushing state, we got a buffer and we will return it now.
977            * Errors will be handled in the next round */
978           gst_app_src_emit_need_data (appsrc, size);
979       }
980       ret = GST_FLOW_OK;
981       break;
982     } else {
983       gst_app_src_emit_need_data (appsrc, size);
984
985       /* we can be flushing now because we released the lock above */
986       if (G_UNLIKELY (priv->flushing))
987         goto flushing;
988
989       /* if we have a buffer now, continue the loop and try to return it. In
990        * random-access mode (where a buffer is normally pushed in the above
991        * signal) we can still be empty because the pushed buffer got flushed or
992        * when the application pushes the requested buffer later, we support both
993        * possiblities. */
994       if (!g_queue_is_empty (priv->queue))
995         continue;
996
997       /* no buffer yet, maybe we are EOS, if not, block for more data. */
998     }
999
1000     /* check EOS */
1001     if (G_UNLIKELY (priv->is_eos))
1002       goto eos;
1003
1004     /* nothing to return, wait a while for new data or flushing. */
1005     g_cond_wait (priv->cond, priv->mutex);
1006   }
1007   g_mutex_unlock (priv->mutex);
1008   if (caps)
1009     gst_caps_unref (caps);
1010   return ret;
1011
1012   /* ERRORS */
1013 flushing:
1014   {
1015     GST_DEBUG_OBJECT (appsrc, "we are flushing");
1016     g_mutex_unlock (priv->mutex);
1017     if (caps)
1018       gst_caps_unref (caps);
1019     return GST_FLOW_WRONG_STATE;
1020   }
1021 eos:
1022   {
1023     GST_DEBUG_OBJECT (appsrc, "we are EOS");
1024     g_mutex_unlock (priv->mutex);
1025     if (caps)
1026       gst_caps_unref (caps);
1027     return GST_FLOW_UNEXPECTED;
1028   }
1029 seek_error:
1030   {
1031     g_mutex_unlock (priv->mutex);
1032     if (caps)
1033       gst_caps_unref (caps);
1034     GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
1035         GST_ERROR_SYSTEM);
1036     return GST_FLOW_ERROR;
1037   }
1038 }
1039
1040 /* external API */
1041
1042 /**
1043  * gst_app_src_set_caps:
1044  * @appsrc: a #GstAppSrc
1045  * @caps: caps to set
1046  *
1047  * Set the capabilities on the appsrc element.  This function takes
1048  * a copy of the caps structure. After calling this method, the source will
1049  * only produce caps that match @caps. @caps must be fixed and the caps on the
1050  * buffers must match the caps or left NULL.
1051  *
1052  * Since: 0.10.22
1053  */
1054 void
1055 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
1056 {
1057   GstCaps *old;
1058   GstAppSrcPrivate *priv;
1059
1060   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1061
1062   priv = appsrc->priv;
1063
1064   GST_OBJECT_LOCK (appsrc);
1065   GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
1066   if ((old = priv->caps) != caps) {
1067     if (caps)
1068       priv->caps = gst_caps_copy (caps);
1069     else
1070       priv->caps = NULL;
1071     if (old)
1072       gst_caps_unref (old);
1073   }
1074   GST_OBJECT_UNLOCK (appsrc);
1075 }
1076
1077 /**
1078  * gst_app_src_get_caps:
1079  * @appsrc: a #GstAppSrc
1080  *
1081  * Get the configured caps on @appsrc.
1082  *
1083  * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
1084  *
1085  * Since: 0.10.22
1086  */
1087 GstCaps *
1088 gst_app_src_get_caps (GstAppSrc * appsrc)
1089 {
1090   GstCaps *caps;
1091   GstAppSrcPrivate *priv;
1092
1093   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
1094
1095   priv = appsrc->priv;
1096
1097   GST_OBJECT_LOCK (appsrc);
1098   if ((caps = priv->caps))
1099     gst_caps_ref (caps);
1100   GST_DEBUG_OBJECT (appsrc, "getting caps of %" GST_PTR_FORMAT, caps);
1101   GST_OBJECT_UNLOCK (appsrc);
1102
1103   return caps;
1104 }
1105
1106 /**
1107  * gst_app_src_set_size:
1108  * @appsrc: a #GstAppSrc
1109  * @size: the size to set
1110  *
1111  * Set the size of the stream in bytes. A value of -1 means that the size is
1112  * not known.
1113  *
1114  * Since: 0.10.22
1115  */
1116 void
1117 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
1118 {
1119   GstAppSrcPrivate *priv;
1120
1121   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1122
1123   priv = appsrc->priv;
1124
1125   GST_OBJECT_LOCK (appsrc);
1126   GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
1127   priv->size = size;
1128   GST_OBJECT_UNLOCK (appsrc);
1129 }
1130
1131 /**
1132  * gst_app_src_get_size:
1133  * @appsrc: a #GstAppSrc
1134  *
1135  * Get the size of the stream in bytes. A value of -1 means that the size is
1136  * not known.
1137  *
1138  * Returns: the size of the stream previously set with gst_app_src_set_size();
1139  *
1140  * Since: 0.10.22
1141  */
1142 gint64
1143 gst_app_src_get_size (GstAppSrc * appsrc)
1144 {
1145   gint64 size;
1146   GstAppSrcPrivate *priv;
1147
1148   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1149
1150   priv = appsrc->priv;
1151
1152   GST_OBJECT_LOCK (appsrc);
1153   size = priv->size;
1154   GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
1155   GST_OBJECT_UNLOCK (appsrc);
1156
1157   return size;
1158 }
1159
1160 /**
1161  * gst_app_src_set_stream_type:
1162  * @appsrc: a #GstAppSrc
1163  * @type: the new state
1164  *
1165  * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
1166  * be connected to.
1167  *
1168  * A stream_type stream
1169  *
1170  * Since: 0.10.22
1171  */
1172 void
1173 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
1174 {
1175   GstAppSrcPrivate *priv;
1176
1177   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1178
1179   priv = appsrc->priv;
1180
1181   GST_OBJECT_LOCK (appsrc);
1182   GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
1183   priv->stream_type = type;
1184   GST_OBJECT_UNLOCK (appsrc);
1185 }
1186
1187 /**
1188  * gst_app_src_get_stream_type:
1189  * @appsrc: a #GstAppSrc
1190  *
1191  * Get the stream type. Control the stream type of @appsrc
1192  * with gst_app_src_set_stream_type().
1193  *
1194  * Returns: the stream type.
1195  *
1196  * Since: 0.10.22
1197  */
1198 GstAppStreamType
1199 gst_app_src_get_stream_type (GstAppSrc * appsrc)
1200 {
1201   gboolean stream_type;
1202   GstAppSrcPrivate *priv;
1203
1204   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1205
1206   priv = appsrc->priv;
1207
1208   GST_OBJECT_LOCK (appsrc);
1209   stream_type = priv->stream_type;
1210   GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
1211   GST_OBJECT_UNLOCK (appsrc);
1212
1213   return stream_type;
1214 }
1215
1216 /**
1217  * gst_app_src_set_max_bytes:
1218  * @appsrc: a #GstAppSrc
1219  * @max: the maximum number of bytes to queue
1220  *
1221  * Set the maximum amount of bytes that can be queued in @appsrc.
1222  * After the maximum amount of bytes are queued, @appsrc will emit the
1223  * "enough-data" signal.
1224  *
1225  * Since: 0.10.22
1226  */
1227 void
1228 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
1229 {
1230   GstAppSrcPrivate *priv;
1231
1232   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1233
1234   priv = appsrc->priv;
1235
1236   g_mutex_lock (priv->mutex);
1237   if (max != priv->max_bytes) {
1238     GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
1239     priv->max_bytes = max;
1240     /* signal the change */
1241     g_cond_broadcast (priv->cond);
1242   }
1243   g_mutex_unlock (priv->mutex);
1244 }
1245
1246 /**
1247  * gst_app_src_get_max_bytes:
1248  * @appsrc: a #GstAppSrc
1249  *
1250  * Get the maximum amount of bytes that can be queued in @appsrc.
1251  *
1252  * Returns: The maximum amount of bytes that can be queued.
1253  *
1254  * Since: 0.10.22
1255  */
1256 guint64
1257 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
1258 {
1259   guint64 result;
1260   GstAppSrcPrivate *priv;
1261
1262   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
1263
1264   priv = appsrc->priv;
1265
1266   g_mutex_lock (priv->mutex);
1267   result = priv->max_bytes;
1268   GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
1269   g_mutex_unlock (priv->mutex);
1270
1271   return result;
1272 }
1273
1274 static void
1275 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
1276     gboolean do_max, guint64 max)
1277 {
1278   GstAppSrcPrivate *priv = appsrc->priv;
1279   gboolean changed = FALSE;
1280
1281   g_mutex_lock (priv->mutex);
1282   if (do_min && priv->min_latency != min) {
1283     priv->min_latency = min;
1284     changed = TRUE;
1285   }
1286   if (do_max && priv->max_latency != max) {
1287     priv->max_latency = max;
1288     changed = TRUE;
1289   }
1290   g_mutex_unlock (priv->mutex);
1291
1292   if (changed) {
1293     GST_DEBUG_OBJECT (appsrc, "posting latency changed");
1294     gst_element_post_message (GST_ELEMENT_CAST (appsrc),
1295         gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
1296   }
1297 }
1298
1299 /**
1300  * gst_app_src_set_latency:
1301  * @appsrc: a #GstAppSrc
1302  * @min: the min latency
1303  * @max: the min latency
1304  *
1305  * Configure the @min and @max latency in @src. If @min is set to -1, the
1306  * default latency calculations for pseudo-live sources will be used.
1307  *
1308  * Since: 0.10.22
1309  */
1310 void
1311 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
1312 {
1313   gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
1314 }
1315
1316 /**
1317  * gst_app_src_get_latency:
1318  * @appsrc: a #GstAppSrc
1319  * @min: the min latency
1320  * @max: the min latency
1321  *
1322  * Retrieve the min and max latencies in @min and @max respectively.
1323  *
1324  * Since: 0.10.22
1325  */
1326 void
1327 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
1328 {
1329   GstAppSrcPrivate *priv;
1330
1331   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1332
1333   priv = appsrc->priv;
1334
1335   g_mutex_lock (priv->mutex);
1336   if (min)
1337     *min = priv->min_latency;
1338   if (max)
1339     *max = priv->max_latency;
1340   g_mutex_unlock (priv->mutex);
1341 }
1342
1343 /**
1344  * gst_app_src_set_emit_signals:
1345  * @appsrc: a #GstAppSrc
1346  * @emit: the new state
1347  *
1348  * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
1349  * by default disabled because signal emission is expensive and unneeded when
1350  * the application prefers to operate in pull mode.
1351  *
1352  * Since: 0.10.23
1353  */
1354 void
1355 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
1356 {
1357   GstAppSrcPrivate *priv;
1358
1359   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1360
1361   priv = appsrc->priv;
1362
1363   g_mutex_lock (priv->mutex);
1364   priv->emit_signals = emit;
1365   g_mutex_unlock (priv->mutex);
1366 }
1367
1368 /**
1369  * gst_app_src_get_emit_signals:
1370  * @appsrc: a #GstAppSrc
1371  *
1372  * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
1373  *
1374  * Returns: %TRUE if @appsrc is emiting the "new-preroll" and "new-buffer"
1375  * signals.
1376  *
1377  * Since: 0.10.23
1378  */
1379 gboolean
1380 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
1381 {
1382   gboolean result;
1383   GstAppSrcPrivate *priv;
1384
1385   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1386
1387   priv = appsrc->priv;
1388
1389   g_mutex_lock (priv->mutex);
1390   result = priv->emit_signals;
1391   g_mutex_unlock (priv->mutex);
1392
1393   return result;
1394 }
1395
1396 static GstFlowReturn
1397 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
1398     gboolean steal_ref)
1399 {
1400   gboolean first = TRUE;
1401   GstAppSrcPrivate *priv;
1402
1403   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1404   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1405
1406   priv = appsrc->priv;
1407
1408   g_mutex_lock (priv->mutex);
1409
1410   while (TRUE) {
1411     /* can't accept buffers when we are flushing or EOS */
1412     if (priv->flushing)
1413       goto flushing;
1414
1415     if (priv->is_eos)
1416       goto eos;
1417
1418     if (priv->max_bytes && priv->queued_bytes >= priv->max_bytes) {
1419       GST_DEBUG_OBJECT (appsrc,
1420           "queue filled (%" G_GUINT64_FORMAT " >= %" G_GUINT64_FORMAT ")",
1421           priv->queued_bytes, priv->max_bytes);
1422
1423       if (first) {
1424         gboolean emit;
1425
1426         emit = priv->emit_signals;
1427         /* only signal on the first push */
1428         g_mutex_unlock (priv->mutex);
1429
1430         if (priv->callbacks.enough_data)
1431           priv->callbacks.enough_data (appsrc, priv->user_data);
1432         else if (emit)
1433           g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
1434               NULL);
1435
1436         g_mutex_lock (priv->mutex);
1437         /* continue to check for flushing/eos after releasing the lock */
1438         first = FALSE;
1439         continue;
1440       }
1441       if (priv->block) {
1442         GST_DEBUG_OBJECT (appsrc, "waiting for free space");
1443         /* we are filled, wait until a buffer gets popped or when we
1444          * flush. */
1445         g_cond_wait (priv->cond, priv->mutex);
1446       } else {
1447         /* no need to wait for free space, we just pump more data into the
1448          * queue hoping that the caller reacts to the enough-data signal and
1449          * stops pushing buffers. */
1450         break;
1451       }
1452     } else
1453       break;
1454   }
1455
1456   GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
1457   if (!steal_ref)
1458     gst_buffer_ref (buffer);
1459   g_queue_push_tail (priv->queue, buffer);
1460   priv->queued_bytes += gst_buffer_get_size (buffer);
1461   g_cond_broadcast (priv->cond);
1462   g_mutex_unlock (priv->mutex);
1463
1464   return GST_FLOW_OK;
1465
1466   /* ERRORS */
1467 flushing:
1468   {
1469     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
1470     if (steal_ref)
1471       gst_buffer_unref (buffer);
1472     g_mutex_unlock (priv->mutex);
1473     return GST_FLOW_WRONG_STATE;
1474   }
1475 eos:
1476   {
1477     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
1478     if (steal_ref)
1479       gst_buffer_unref (buffer);
1480     g_mutex_unlock (priv->mutex);
1481     return GST_FLOW_UNEXPECTED;
1482   }
1483 }
1484
1485 /**
1486  * gst_app_src_push_buffer:
1487  * @appsrc: a #GstAppSrc
1488  * @buffer: a #GstBuffer to push
1489  *
1490  * Adds a buffer to the queue of buffers that the appsrc element will
1491  * push to its source pad.  This function takes ownership of the buffer.
1492  *
1493  * When the block property is TRUE, this function can block until free
1494  * space becomes available in the queue.
1495  *
1496  * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
1497  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
1498  * #GST_FLOW_UNEXPECTED when EOS occured.
1499  *
1500  * Since: 0.10.22
1501  */
1502 GstFlowReturn
1503 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
1504 {
1505   return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
1506 }
1507
1508 /* push a buffer without stealing the ref of the buffer. This is used for the
1509  * action signal. */
1510 static GstFlowReturn
1511 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
1512 {
1513   return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
1514 }
1515
1516 /**
1517  * gst_app_src_end_of_stream:
1518  * @appsrc: a #GstAppSrc
1519  *
1520  * Indicates to the appsrc element that the last buffer queued in the
1521  * element is the last buffer of the stream.
1522  *
1523  * Returns: #GST_FLOW_OK when the EOS was successfuly queued.
1524  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
1525  *
1526  * Since: 0.10.22
1527  */
1528 GstFlowReturn
1529 gst_app_src_end_of_stream (GstAppSrc * appsrc)
1530 {
1531   GstAppSrcPrivate *priv;
1532
1533   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1534
1535   priv = appsrc->priv;
1536
1537   g_mutex_lock (priv->mutex);
1538   /* can't accept buffers when we are flushing. We can accept them when we are
1539    * EOS although it will not do anything. */
1540   if (priv->flushing)
1541     goto flushing;
1542
1543   GST_DEBUG_OBJECT (appsrc, "sending EOS");
1544   priv->is_eos = TRUE;
1545   g_cond_broadcast (priv->cond);
1546   g_mutex_unlock (priv->mutex);
1547
1548   return GST_FLOW_OK;
1549
1550   /* ERRORS */
1551 flushing:
1552   {
1553     g_mutex_unlock (priv->mutex);
1554     GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
1555     return GST_FLOW_WRONG_STATE;
1556   }
1557 }
1558
1559 /**
1560  * gst_app_src_set_callbacks:
1561  * @appsrc: a #GstAppSrc
1562  * @callbacks: the callbacks
1563  * @user_data: a user_data argument for the callbacks
1564  * @notify: a destroy notify function
1565  *
1566  * Set callbacks which will be executed when data is needed, enough data has
1567  * been collected or when a seek should be performed.
1568  * This is an alternative to using the signals, it has lower overhead and is thus
1569  * less expensive, but also less flexible.
1570  *
1571  * If callbacks are installed, no signals will be emited for performance
1572  * reasons.
1573  *
1574  * Since: 0.10.23
1575  */
1576 void
1577 gst_app_src_set_callbacks (GstAppSrc * appsrc,
1578     GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
1579 {
1580   GDestroyNotify old_notify;
1581   GstAppSrcPrivate *priv;
1582
1583   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1584   g_return_if_fail (callbacks != NULL);
1585
1586   priv = appsrc->priv;
1587
1588   GST_OBJECT_LOCK (appsrc);
1589   old_notify = priv->notify;
1590
1591   if (old_notify) {
1592     gpointer old_data;
1593
1594     old_data = priv->user_data;
1595
1596     priv->user_data = NULL;
1597     priv->notify = NULL;
1598     GST_OBJECT_UNLOCK (appsrc);
1599
1600     old_notify (old_data);
1601
1602     GST_OBJECT_LOCK (appsrc);
1603   }
1604   priv->callbacks = *callbacks;
1605   priv->user_data = user_data;
1606   priv->notify = notify;
1607   GST_OBJECT_UNLOCK (appsrc);
1608 }
1609
1610 /*** GSTURIHANDLER INTERFACE *************************************************/
1611
1612 static GstURIType
1613 gst_app_src_uri_get_type (void)
1614 {
1615   return GST_URI_SRC;
1616 }
1617
1618 static gchar **
1619 gst_app_src_uri_get_protocols (void)
1620 {
1621   static gchar *protocols[] = { (char *) "appsrc", NULL };
1622
1623   return protocols;
1624 }
1625
1626 static const gchar *
1627 gst_app_src_uri_get_uri (GstURIHandler * handler)
1628 {
1629   return "appsrc";
1630 }
1631
1632 static gboolean
1633 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
1634 {
1635   gchar *protocol;
1636   gboolean ret;
1637
1638   protocol = gst_uri_get_protocol (uri);
1639   ret = !strcmp (protocol, "appsrc");
1640   g_free (protocol);
1641
1642   return ret;
1643 }
1644
1645 static void
1646 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1647 {
1648   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1649
1650   iface->get_type = gst_app_src_uri_get_type;
1651   iface->get_protocols = gst_app_src_uri_get_protocols;
1652   iface->get_uri = gst_app_src_uri_get_uri;
1653   iface->set_uri = gst_app_src_uri_set_uri;
1654 }