Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / gst-libs / gst / app / gstappsrc.c
1 /* GStreamer
2  * Copyright (C) 2007 David Schleef <ds@schleef.org>
3  *           (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20 /**
21  * SECTION:gstappsrc
22  * @short_description: Easy way for applications to inject buffers into a
23  *     pipeline
24  * @see_also: #GstBaseSrc, appsink
25  *
26  * The appsrc element can be used by applications to insert data into a
27  * GStreamer pipeline. Unlike most GStreamer elements, Appsrc provides
28  * external API functions.
29  *
30  * appsrc can be used by linking with the libgstapp library to access the
31  * methods directly or by using the appsrc action signals.
32  *
33  * Before operating appsrc, the caps property must be set to a fixed caps
34  * describing the format of the data that will be pushed with appsrc. An
35  * exception to this is when pushing buffers with unknown caps, in which case no
36  * caps should be set. This is typically true of file-like sources that push raw
37  * byte buffers.
38  *
39  * The main way of handing data to the appsrc element is by calling the
40  * gst_app_src_push_buffer() method or by emiting the push-buffer action signal.
41  * This will put the buffer onto a queue from which appsrc will read from in its
42  * streaming thread. It is important to note that data transport will not happen
43  * from the thread that performed the push-buffer call.
44  *
45  * The "max-bytes" property controls how much data can be queued in appsrc
46  * before appsrc considers the queue full. A filled internal queue will always
47  * signal the "enough-data" signal, which signals the application that it should
48  * stop pushing data into appsrc. The "block" property will cause appsrc to
49  * block the push-buffer method until free data becomes available again.
50  *
51  * When the internal queue is running out of data, the "need-data" signal is
52  * emited, which signals the application that it should start pushing more data
53  * into appsrc.
54  *
55  * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
56  * "seek-data" signal when the "stream-mode" property is set to "seekable" or
57  * "random-access". The signal argument will contain the new desired position in
58  * the stream expressed in the unit set with the "format" property. After
59  * receiving the seek-data signal, the application should push-buffers from the
60  * new position.
61  *
62  * These signals allow the application to operate the appsrc in two different
63  * ways:
64  *
65  * The push model, in which the application repeadedly calls the push-buffer method
66  * with a new buffer. Optionally, the queue size in the appsrc can be controlled
67  * with the enough-data and need-data signals by respectively stopping/starting
68  * the push-buffer calls. This is a typical mode of operation for the
69  * stream-type "stream" and "seekable". Use this model when implementing various
70  * network protocols or hardware devices.
71  *
72  * The pull model where the need-data signal triggers the next push-buffer call.
73  * This mode is typically used in the "random-access" stream-type. Use this
74  * model for file access or other randomly accessable sources. In this mode, a
75  * buffer of exactly the amount of bytes given by the need-data signal should be
76  * pushed into appsrc.
77  *
78  * In all modes, the size property on appsrc should contain the total stream
79  * size in bytes. Setting this property is mandatory in the random-access mode.
80  * For the stream and seekable modes, setting this property is optional but
81  * recommended.
82  *
83  * When the application is finished pushing data into appsrc, it should call
84  * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
85  * this call, no more buffers can be pushed into appsrc until a flushing seek
86  * happened or the state of the appsrc has gone through READY.
87  *
88  * Last reviewed on 2008-12-17 (0.10.10)
89  *
90  * Since: 0.10.22
91  */
92
93 #ifdef HAVE_CONFIG_H
94 #include "config.h"
95 #endif
96
97 #include <gst/gst.h>
98 #include <gst/base/gstbasesrc.h>
99
100 #include <string.h>
101
102 #include "gstapp-marshal.h"
103 #include "gstappsrc.h"
104
105 struct _GstAppSrcPrivate
106 {
107   GCond *cond;
108   GMutex *mutex;
109   GQueue *queue;
110
111   GstCaps *caps;
112   gint64 size;
113   GstAppStreamType stream_type;
114   guint64 max_bytes;
115   GstFormat format;
116   gboolean block;
117
118   gboolean flushing;
119   gboolean started;
120   gboolean is_eos;
121   guint64 queued_bytes;
122   guint64 offset;
123   GstAppStreamType current_type;
124
125   guint64 min_latency;
126   guint64 max_latency;
127   gboolean emit_signals;
128   guint min_percent;
129
130   GstAppSrcCallbacks callbacks;
131   gpointer user_data;
132   GDestroyNotify notify;
133 };
134
135 GST_DEBUG_CATEGORY_STATIC (app_src_debug);
136 #define GST_CAT_DEFAULT app_src_debug
137
138 enum
139 {
140   /* signals */
141   SIGNAL_NEED_DATA,
142   SIGNAL_ENOUGH_DATA,
143   SIGNAL_SEEK_DATA,
144
145   /* actions */
146   SIGNAL_PUSH_BUFFER,
147   SIGNAL_END_OF_STREAM,
148
149   LAST_SIGNAL
150 };
151
152 #define DEFAULT_PROP_SIZE          -1
153 #define DEFAULT_PROP_STREAM_TYPE   GST_APP_STREAM_TYPE_STREAM
154 #define DEFAULT_PROP_MAX_BYTES     200000
155 #define DEFAULT_PROP_FORMAT        GST_FORMAT_BYTES
156 #define DEFAULT_PROP_BLOCK         FALSE
157 #define DEFAULT_PROP_IS_LIVE       FALSE
158 #define DEFAULT_PROP_MIN_LATENCY   -1
159 #define DEFAULT_PROP_MAX_LATENCY   -1
160 #define DEFAULT_PROP_EMIT_SIGNALS  TRUE
161 #define DEFAULT_PROP_MIN_PERCENT   0
162
163 enum
164 {
165   PROP_0,
166   PROP_CAPS,
167   PROP_SIZE,
168   PROP_STREAM_TYPE,
169   PROP_MAX_BYTES,
170   PROP_FORMAT,
171   PROP_BLOCK,
172   PROP_IS_LIVE,
173   PROP_MIN_LATENCY,
174   PROP_MAX_LATENCY,
175   PROP_EMIT_SIGNALS,
176   PROP_MIN_PERCENT,
177   PROP_LAST
178 };
179
180 static GstStaticPadTemplate gst_app_src_template =
181 GST_STATIC_PAD_TEMPLATE ("src",
182     GST_PAD_SRC,
183     GST_PAD_ALWAYS,
184     GST_STATIC_CAPS_ANY);
185
186 GType
187 gst_app_stream_type_get_type (void)
188 {
189   static volatile gsize stream_type_type = 0;
190   static const GEnumValue stream_type[] = {
191     {GST_APP_STREAM_TYPE_STREAM, "GST_APP_STREAM_TYPE_STREAM", "stream"},
192     {GST_APP_STREAM_TYPE_SEEKABLE, "GST_APP_STREAM_TYPE_SEEKABLE", "seekable"},
193     {GST_APP_STREAM_TYPE_RANDOM_ACCESS, "GST_APP_STREAM_TYPE_RANDOM_ACCESS",
194         "random-access"},
195     {0, NULL, NULL}
196   };
197
198   if (g_once_init_enter (&stream_type_type)) {
199     GType tmp = g_enum_register_static ("GstAppStreamType", stream_type);
200     g_once_init_leave (&stream_type_type, tmp);
201   }
202
203   return (GType) stream_type_type;
204 }
205
206 static void gst_app_src_uri_handler_init (gpointer g_iface,
207     gpointer iface_data);
208
209 static void gst_app_src_dispose (GObject * object);
210 static void gst_app_src_finalize (GObject * object);
211
212 static void gst_app_src_set_property (GObject * object, guint prop_id,
213     const GValue * value, GParamSpec * pspec);
214 static void gst_app_src_get_property (GObject * object, guint prop_id,
215     GValue * value, GParamSpec * pspec);
216
217 static void gst_app_src_set_latencies (GstAppSrc * appsrc,
218     gboolean do_min, guint64 min, gboolean do_max, guint64 max);
219
220 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc,
221     guint64 offset, guint size, GstBuffer ** buf);
222 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
223 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
224 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
225 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
226 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
227 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
228 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
229 static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
230
231 static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
232     GstBuffer * buffer);
233
234 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
235
236 #define gst_app_src_parent_class parent_class
237 G_DEFINE_TYPE_WITH_CODE (GstAppSrc, gst_app_src, GST_TYPE_BASE_SRC,
238     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_app_src_uri_handler_init));
239
240 static void
241 gst_app_src_class_init (GstAppSrcClass * klass)
242 {
243   GObjectClass *gobject_class = (GObjectClass *) klass;
244   GstElementClass *element_class = (GstElementClass *) klass;
245   GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
246
247   GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
248
249   gobject_class->dispose = gst_app_src_dispose;
250   gobject_class->finalize = gst_app_src_finalize;
251
252   gobject_class->set_property = gst_app_src_set_property;
253   gobject_class->get_property = gst_app_src_get_property;
254
255   /**
256    * GstAppSrc::caps
257    *
258    * The GstCaps that will negotiated downstream and will be put
259    * on outgoing buffers.
260    */
261   g_object_class_install_property (gobject_class, PROP_CAPS,
262       g_param_spec_boxed ("caps", "Caps",
263           "The allowed caps for the src pad", GST_TYPE_CAPS,
264           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
265   /**
266    * GstAppSrc::format
267    *
268    * The format to use for segment events. When the source is producing
269    * timestamped buffers this property should be set to GST_FORMAT_TIME.
270    */
271   g_object_class_install_property (gobject_class, PROP_FORMAT,
272       g_param_spec_enum ("format", "Format",
273           "The format of the segment events and seek", GST_TYPE_FORMAT,
274           DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
275   /**
276    * GstAppSrc::size
277    *
278    * The total size in bytes of the data stream. If the total size is known, it
279    * is recommended to configure it with this property.
280    */
281   g_object_class_install_property (gobject_class, PROP_SIZE,
282       g_param_spec_int64 ("size", "Size",
283           "The size of the data stream in bytes (-1 if unknown)",
284           -1, G_MAXINT64, DEFAULT_PROP_SIZE,
285           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
286   /**
287    * GstAppSrc::stream-type
288    *
289    * The type of stream that this source is producing.  For seekable streams the
290    * application should connect to the seek-data signal.
291    */
292   g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
293       g_param_spec_enum ("stream-type", "Stream Type",
294           "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
295           DEFAULT_PROP_STREAM_TYPE,
296           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
297   /**
298    * GstAppSrc::max-bytes
299    *
300    * The maximum amount of bytes that can be queued internally.
301    * After the maximum amount of bytes are queued, appsrc will emit the
302    * "enough-data" signal.
303    */
304   g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
305       g_param_spec_uint64 ("max-bytes", "Max bytes",
306           "The maximum number of bytes to queue internally (0 = unlimited)",
307           0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
308           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
309   /**
310    * GstAppSrc::block
311    *
312    * When max-bytes are queued and after the enough-data signal has been emited,
313    * block any further push-buffer calls until the amount of queued bytes drops
314    * below the max-bytes limit.
315    */
316   g_object_class_install_property (gobject_class, PROP_BLOCK,
317       g_param_spec_boolean ("block", "Block",
318           "Block push-buffer when max-bytes are queued",
319           DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
320
321   /**
322    * GstAppSrc::is-live
323    *
324    * Instruct the source to behave like a live source. This includes that it
325    * will only push out buffers in the PLAYING state.
326    */
327   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
328       g_param_spec_boolean ("is-live", "Is Live",
329           "Whether to act as a live source",
330           DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
331   /**
332    * GstAppSrc::min-latency
333    *
334    * The minimum latency of the source. A value of -1 will use the default
335    * latency calculations of #GstBaseSrc.
336    */
337   g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
338       g_param_spec_int64 ("min-latency", "Min Latency",
339           "The minimum latency (-1 = default)",
340           -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
341           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
342   /**
343    * GstAppSrc::max-latency
344    *
345    * The maximum latency of the source. A value of -1 means an unlimited amout
346    * of latency.
347    */
348   g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
349       g_param_spec_int64 ("max-latency", "Max Latency",
350           "The maximum latency (-1 = unlimited)",
351           -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
352           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
353
354   /**
355    * GstAppSrc::emit-signals
356    *
357    * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
358    * This option is by default enabled for backwards compatibility reasons but
359    * can disabled when needed because signal emission is expensive.
360    *
361    * Since: 0.10.23
362    */
363   g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
364       g_param_spec_boolean ("emit-signals", "Emit signals",
365           "Emit need-data, enough-data and seek-data signals",
366           DEFAULT_PROP_EMIT_SIGNALS,
367           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
368
369   /**
370    * GstAppSrc::empty-percent
371    *
372    * Make appsrc emit the "need-data" signal when the amount of bytes in the
373    * queue drops below this percentage of max-bytes.
374    *
375    * Since: 0.10.27
376    */
377   g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
378       g_param_spec_uint ("min-percent", "Min Percent",
379           "Emit need-data when queued bytes drops below this percent of max-bytes",
380           0, 100, DEFAULT_PROP_MIN_PERCENT,
381           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
382
383   /**
384    * GstAppSrc::need-data:
385    * @appsrc: the appsrc element that emited the signal
386    * @length: the amount of bytes needed.
387    *
388    * Signal that the source needs more data. In the callback or from another
389    * thread you should call push-buffer or end-of-stream.
390    *
391    * @length is just a hint and when it is set to -1, any number of bytes can be
392    * pushed into @appsrc.
393    *
394    * You can call push-buffer multiple times until the enough-data signal is
395    * fired.
396    */
397   gst_app_src_signals[SIGNAL_NEED_DATA] =
398       g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
399       G_STRUCT_OFFSET (GstAppSrcClass, need_data),
400       NULL, NULL, __gst_app_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
401
402   /**
403    * GstAppSrc::enough-data:
404    * @appsrc: the appsrc element that emited the signal
405    *
406    * Signal that the source has enough data. It is recommended that the
407    * application stops calling push-buffer until the need-data signal is
408    * emited again to avoid excessive buffer queueing.
409    */
410   gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
411       g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
412       G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
413       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
414
415   /**
416    * GstAppSrc::seek-data:
417    * @appsrc: the appsrc element that emited the signal
418    * @offset: the offset to seek to
419    *
420    * Seek to the given offset. The next push-buffer should produce buffers from
421    * the new @offset.
422    * This callback is only called for seekable stream types.
423    *
424    * Returns: %TRUE if the seek succeeded.
425    */
426   gst_app_src_signals[SIGNAL_SEEK_DATA] =
427       g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
428       G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
429       NULL, NULL, __gst_app_marshal_BOOLEAN__UINT64, G_TYPE_BOOLEAN, 1,
430       G_TYPE_UINT64);
431
432    /**
433     * GstAppSrc::push-buffer:
434     * @appsrc: the appsrc
435     * @buffer: a buffer to push
436     *
437     * Adds a buffer to the queue of buffers that the appsrc element will
438     * push to its source pad. This function does not take ownership of the
439     * buffer so the buffer needs to be unreffed after calling this function.
440     *
441     * When the block property is TRUE, this function can block until free space
442     * becomes available in the queue.
443     */
444   gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
445       g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
446       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
447           push_buffer), NULL, NULL, __gst_app_marshal_ENUM__OBJECT,
448       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
449
450    /**
451     * GstAppSrc::end-of-stream:
452     * @appsrc: the appsrc
453     *
454     * Notify @appsrc that no more buffer are available.
455     */
456   gst_app_src_signals[SIGNAL_END_OF_STREAM] =
457       g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
458       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
459           end_of_stream), NULL, NULL, __gst_app_marshal_ENUM__VOID,
460       GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
461
462   gst_element_class_set_details_simple (element_class, "AppSrc",
463       "Generic/Source", "Allow the application to feed buffers to a pipeline",
464       "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
465
466   gst_element_class_add_pad_template (element_class,
467       gst_static_pad_template_get (&gst_app_src_template));
468
469   basesrc_class->create = gst_app_src_create;
470   basesrc_class->start = gst_app_src_start;
471   basesrc_class->stop = gst_app_src_stop;
472   basesrc_class->unlock = gst_app_src_unlock;
473   basesrc_class->unlock_stop = gst_app_src_unlock_stop;
474   basesrc_class->do_seek = gst_app_src_do_seek;
475   basesrc_class->is_seekable = gst_app_src_is_seekable;
476   basesrc_class->get_size = gst_app_src_do_get_size;
477   basesrc_class->get_size = gst_app_src_do_get_size;
478   basesrc_class->query = gst_app_src_query;
479
480   klass->push_buffer = gst_app_src_push_buffer_action;
481   klass->end_of_stream = gst_app_src_end_of_stream;
482
483   g_type_class_add_private (klass, sizeof (GstAppSrcPrivate));
484 }
485
486 static void
487 gst_app_src_init (GstAppSrc * appsrc)
488 {
489   GstAppSrcPrivate *priv;
490
491   priv = appsrc->priv = G_TYPE_INSTANCE_GET_PRIVATE (appsrc, GST_TYPE_APP_SRC,
492       GstAppSrcPrivate);
493
494   priv->mutex = g_mutex_new ();
495   priv->cond = g_cond_new ();
496   priv->queue = g_queue_new ();
497
498   priv->size = DEFAULT_PROP_SIZE;
499   priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
500   priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
501   priv->format = DEFAULT_PROP_FORMAT;
502   priv->block = DEFAULT_PROP_BLOCK;
503   priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
504   priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
505   priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
506   priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
507
508   gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
509 }
510
511 static void
512 gst_app_src_flush_queued (GstAppSrc * src)
513 {
514   GstBuffer *buf;
515   GstAppSrcPrivate *priv = src->priv;
516
517   while ((buf = g_queue_pop_head (priv->queue)))
518     gst_buffer_unref (buf);
519   priv->queued_bytes = 0;
520 }
521
522 static void
523 gst_app_src_dispose (GObject * obj)
524 {
525   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
526   GstAppSrcPrivate *priv = appsrc->priv;
527
528   if (priv->caps) {
529     gst_caps_unref (priv->caps);
530     priv->caps = NULL;
531   }
532   gst_app_src_flush_queued (appsrc);
533
534   G_OBJECT_CLASS (parent_class)->dispose (obj);
535 }
536
537 static void
538 gst_app_src_finalize (GObject * obj)
539 {
540   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
541   GstAppSrcPrivate *priv = appsrc->priv;
542
543   g_mutex_free (priv->mutex);
544   g_cond_free (priv->cond);
545   g_queue_free (priv->queue);
546
547   G_OBJECT_CLASS (parent_class)->finalize (obj);
548 }
549
550 static void
551 gst_app_src_set_property (GObject * object, guint prop_id,
552     const GValue * value, GParamSpec * pspec)
553 {
554   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
555   GstAppSrcPrivate *priv = appsrc->priv;
556
557   switch (prop_id) {
558     case PROP_CAPS:
559       gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
560       break;
561     case PROP_SIZE:
562       gst_app_src_set_size (appsrc, g_value_get_int64 (value));
563       break;
564     case PROP_STREAM_TYPE:
565       gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
566       break;
567     case PROP_MAX_BYTES:
568       gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
569       break;
570     case PROP_FORMAT:
571       priv->format = g_value_get_enum (value);
572       break;
573     case PROP_BLOCK:
574       priv->block = g_value_get_boolean (value);
575       break;
576     case PROP_IS_LIVE:
577       gst_base_src_set_live (GST_BASE_SRC (appsrc),
578           g_value_get_boolean (value));
579       break;
580     case PROP_MIN_LATENCY:
581       gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
582           FALSE, -1);
583       break;
584     case PROP_MAX_LATENCY:
585       gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
586           g_value_get_int64 (value));
587       break;
588     case PROP_EMIT_SIGNALS:
589       gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
590       break;
591     case PROP_MIN_PERCENT:
592       priv->min_percent = g_value_get_uint (value);
593       break;
594     default:
595       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
596       break;
597   }
598 }
599
600 static void
601 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
602     GParamSpec * pspec)
603 {
604   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
605   GstAppSrcPrivate *priv = appsrc->priv;
606
607   switch (prop_id) {
608     case PROP_CAPS:
609     {
610       GstCaps *caps;
611
612       /* we're missing a _take_caps() function to transfer ownership */
613       caps = gst_app_src_get_caps (appsrc);
614       gst_value_set_caps (value, caps);
615       if (caps)
616         gst_caps_unref (caps);
617       break;
618     }
619     case PROP_SIZE:
620       g_value_set_int64 (value, gst_app_src_get_size (appsrc));
621       break;
622     case PROP_STREAM_TYPE:
623       g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
624       break;
625     case PROP_MAX_BYTES:
626       g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
627       break;
628     case PROP_FORMAT:
629       g_value_set_enum (value, priv->format);
630       break;
631     case PROP_BLOCK:
632       g_value_set_boolean (value, priv->block);
633       break;
634     case PROP_IS_LIVE:
635       g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
636       break;
637     case PROP_MIN_LATENCY:
638     {
639       guint64 min;
640
641       gst_app_src_get_latency (appsrc, &min, NULL);
642       g_value_set_int64 (value, min);
643       break;
644     }
645     case PROP_MAX_LATENCY:
646     {
647       guint64 max;
648
649       gst_app_src_get_latency (appsrc, &max, NULL);
650       g_value_set_int64 (value, max);
651       break;
652     }
653     case PROP_EMIT_SIGNALS:
654       g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
655       break;
656     case PROP_MIN_PERCENT:
657       g_value_set_uint (value, priv->min_percent);
658       break;
659     default:
660       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
661       break;
662   }
663 }
664
665 static gboolean
666 gst_app_src_unlock (GstBaseSrc * bsrc)
667 {
668   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
669   GstAppSrcPrivate *priv = appsrc->priv;
670
671   g_mutex_lock (priv->mutex);
672   GST_DEBUG_OBJECT (appsrc, "unlock start");
673   priv->flushing = TRUE;
674   g_cond_broadcast (priv->cond);
675   g_mutex_unlock (priv->mutex);
676
677   return TRUE;
678 }
679
680 static gboolean
681 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
682 {
683   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
684   GstAppSrcPrivate *priv = appsrc->priv;
685
686   g_mutex_lock (priv->mutex);
687   GST_DEBUG_OBJECT (appsrc, "unlock stop");
688   priv->flushing = FALSE;
689   g_cond_broadcast (priv->cond);
690   g_mutex_unlock (priv->mutex);
691
692   return TRUE;
693 }
694
695 static gboolean
696 gst_app_src_start (GstBaseSrc * bsrc)
697 {
698   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
699   GstAppSrcPrivate *priv = appsrc->priv;
700
701   g_mutex_lock (priv->mutex);
702   GST_DEBUG_OBJECT (appsrc, "starting");
703   priv->started = TRUE;
704   /* set the offset to -1 so that we always do a first seek. This is only used
705    * in random-access mode. */
706   priv->offset = -1;
707   priv->flushing = FALSE;
708   g_mutex_unlock (priv->mutex);
709
710   gst_base_src_set_format (bsrc, priv->format);
711
712   return TRUE;
713 }
714
715 static gboolean
716 gst_app_src_stop (GstBaseSrc * bsrc)
717 {
718   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
719   GstAppSrcPrivate *priv = appsrc->priv;
720
721   g_mutex_lock (priv->mutex);
722   GST_DEBUG_OBJECT (appsrc, "stopping");
723   priv->is_eos = FALSE;
724   priv->flushing = TRUE;
725   priv->started = FALSE;
726   gst_app_src_flush_queued (appsrc);
727   g_mutex_unlock (priv->mutex);
728
729   return TRUE;
730 }
731
732 static gboolean
733 gst_app_src_is_seekable (GstBaseSrc * src)
734 {
735   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
736   GstAppSrcPrivate *priv = appsrc->priv;
737   gboolean res = FALSE;
738
739   switch (priv->stream_type) {
740     case GST_APP_STREAM_TYPE_STREAM:
741       break;
742     case GST_APP_STREAM_TYPE_SEEKABLE:
743     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
744       res = TRUE;
745       break;
746   }
747   return res;
748 }
749
750 static gboolean
751 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
752 {
753   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
754
755   *size = gst_app_src_get_size (appsrc);
756
757   return TRUE;
758 }
759
760 static gboolean
761 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
762 {
763   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
764   GstAppSrcPrivate *priv = appsrc->priv;
765   gboolean res;
766
767   switch (GST_QUERY_TYPE (query)) {
768     case GST_QUERY_LATENCY:
769     {
770       GstClockTime min, max;
771       gboolean live;
772
773       /* Query the parent class for the defaults */
774       res = gst_base_src_query_latency (src, &live, &min, &max);
775
776       /* overwrite with our values when we need to */
777       g_mutex_lock (priv->mutex);
778       if (priv->min_latency != -1)
779         min = priv->min_latency;
780       if (priv->max_latency != -1)
781         max = priv->max_latency;
782       g_mutex_unlock (priv->mutex);
783
784       gst_query_set_latency (query, live, min, max);
785       break;
786     }
787     case GST_QUERY_SCHEDULING:
788     {
789       gboolean pull_mode = FALSE;
790
791       switch (priv->stream_type) {
792         case GST_APP_STREAM_TYPE_STREAM:
793         case GST_APP_STREAM_TYPE_SEEKABLE:
794           break;
795         case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
796           pull_mode = TRUE;
797           break;
798       }
799       gst_query_set_scheduling (query, pull_mode, TRUE, FALSE, 1, -1, 1);
800       res = TRUE;
801       break;
802     }
803     default:
804       res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
805       break;
806   }
807
808   return res;
809 }
810
811 /* will be called in push mode */
812 static gboolean
813 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
814 {
815   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
816   GstAppSrcPrivate *priv = appsrc->priv;
817   gint64 desired_position;
818   gboolean res = FALSE;
819
820   desired_position = segment->position;
821
822   GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
823       desired_position, gst_format_get_name (segment->format));
824
825   /* no need to try to seek in streaming mode */
826   if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
827     return TRUE;
828
829   if (priv->callbacks.seek_data)
830     res = priv->callbacks.seek_data (appsrc, desired_position, priv->user_data);
831   else {
832     gboolean emit;
833
834     g_mutex_lock (priv->mutex);
835     emit = priv->emit_signals;
836     g_mutex_unlock (priv->mutex);
837
838     if (emit)
839       g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
840           desired_position, &res);
841   }
842
843   if (res) {
844     GST_DEBUG_OBJECT (appsrc, "flushing queue");
845     gst_app_src_flush_queued (appsrc);
846     priv->is_eos = FALSE;
847   } else {
848     GST_WARNING_OBJECT (appsrc, "seek failed");
849   }
850
851   return res;
852 }
853
854 /* must be called with the appsrc mutex */
855 static gboolean
856 gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
857 {
858   gboolean res = FALSE;
859   gboolean emit;
860   GstAppSrcPrivate *priv = appsrc->priv;
861
862   emit = priv->emit_signals;
863   g_mutex_unlock (priv->mutex);
864
865   GST_DEBUG_OBJECT (appsrc,
866       "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
867       priv->offset, offset);
868
869   if (priv->callbacks.seek_data)
870     res = priv->callbacks.seek_data (appsrc, offset, priv->user_data);
871   else if (emit)
872     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
873         offset, &res);
874
875   g_mutex_lock (priv->mutex);
876
877   return res;
878 }
879
880 /* must be called with the appsrc mutex. After this call things can be
881  * flushing */
882 static void
883 gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
884 {
885   gboolean emit;
886   GstAppSrcPrivate *priv = appsrc->priv;
887
888   emit = priv->emit_signals;
889   g_mutex_unlock (priv->mutex);
890
891   /* we have no data, we need some. We fire the signal with the size hint. */
892   if (priv->callbacks.need_data)
893     priv->callbacks.need_data (appsrc, size, priv->user_data);
894   else if (emit)
895     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
896         NULL);
897
898   g_mutex_lock (priv->mutex);
899   /* we can be flushing now because we released the lock */
900 }
901
902 static GstFlowReturn
903 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
904     GstBuffer ** buf)
905 {
906   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
907   GstAppSrcPrivate *priv = appsrc->priv;
908   GstFlowReturn ret;
909   GstCaps *caps;
910
911   GST_OBJECT_LOCK (appsrc);
912   caps = priv->caps ? gst_caps_ref (priv->caps) : NULL;
913   if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
914           bsrc->segment.format == GST_FORMAT_BYTES)) {
915     GST_DEBUG_OBJECT (appsrc,
916         "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
917         bsrc->segment.duration, priv->size);
918     bsrc->segment.duration = priv->size;
919     GST_OBJECT_UNLOCK (appsrc);
920
921     gst_element_post_message (GST_ELEMENT (appsrc),
922         gst_message_new_duration (GST_OBJECT (appsrc), GST_FORMAT_BYTES,
923             priv->size));
924   } else {
925     GST_OBJECT_UNLOCK (appsrc);
926   }
927
928   g_mutex_lock (priv->mutex);
929   /* check flushing first */
930   if (G_UNLIKELY (priv->flushing))
931     goto flushing;
932
933   if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
934     /* if we are dealing with a random-access stream, issue a seek if the offset
935      * changed. */
936     if (G_UNLIKELY (priv->offset != offset)) {
937       gboolean res;
938
939       /* do the seek */
940       res = gst_app_src_emit_seek (appsrc, offset);
941
942       if (G_UNLIKELY (!res))
943         /* failing to seek is fatal */
944         goto seek_error;
945
946       priv->offset = offset;
947     }
948   }
949
950   while (TRUE) {
951     /* return data as long as we have some */
952     if (!g_queue_is_empty (priv->queue)) {
953       guint buf_size;
954
955       *buf = g_queue_pop_head (priv->queue);
956       buf_size = gst_buffer_get_size (*buf);
957
958       GST_DEBUG_OBJECT (appsrc, "we have buffer %p of size %u", *buf, buf_size);
959
960       priv->queued_bytes -= buf_size;
961
962       /* only update the offset when in random_access mode */
963       if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
964         priv->offset += buf_size;
965
966       /* signal that we removed an item */
967       g_cond_broadcast (priv->cond);
968
969       /* see if we go lower than the empty-percent */
970       if (priv->min_percent && priv->max_bytes) {
971         if (priv->queued_bytes * 100 / priv->max_bytes <= priv->min_percent)
972           /* ignore flushing state, we got a buffer and we will return it now.
973            * Errors will be handled in the next round */
974           gst_app_src_emit_need_data (appsrc, size);
975       }
976       ret = GST_FLOW_OK;
977       break;
978     } else {
979       gst_app_src_emit_need_data (appsrc, size);
980
981       /* we can be flushing now because we released the lock above */
982       if (G_UNLIKELY (priv->flushing))
983         goto flushing;
984
985       /* if we have a buffer now, continue the loop and try to return it. In
986        * random-access mode (where a buffer is normally pushed in the above
987        * signal) we can still be empty because the pushed buffer got flushed or
988        * when the application pushes the requested buffer later, we support both
989        * possiblities. */
990       if (!g_queue_is_empty (priv->queue))
991         continue;
992
993       /* no buffer yet, maybe we are EOS, if not, block for more data. */
994     }
995
996     /* check EOS */
997     if (G_UNLIKELY (priv->is_eos))
998       goto eos;
999
1000     /* nothing to return, wait a while for new data or flushing. */
1001     g_cond_wait (priv->cond, priv->mutex);
1002   }
1003   g_mutex_unlock (priv->mutex);
1004   if (caps)
1005     gst_caps_unref (caps);
1006   return ret;
1007
1008   /* ERRORS */
1009 flushing:
1010   {
1011     GST_DEBUG_OBJECT (appsrc, "we are flushing");
1012     g_mutex_unlock (priv->mutex);
1013     if (caps)
1014       gst_caps_unref (caps);
1015     return GST_FLOW_WRONG_STATE;
1016   }
1017 eos:
1018   {
1019     GST_DEBUG_OBJECT (appsrc, "we are EOS");
1020     g_mutex_unlock (priv->mutex);
1021     if (caps)
1022       gst_caps_unref (caps);
1023     return GST_FLOW_UNEXPECTED;
1024   }
1025 seek_error:
1026   {
1027     g_mutex_unlock (priv->mutex);
1028     if (caps)
1029       gst_caps_unref (caps);
1030     GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
1031         GST_ERROR_SYSTEM);
1032     return GST_FLOW_ERROR;
1033   }
1034 }
1035
1036 /* external API */
1037
1038 /**
1039  * gst_app_src_set_caps:
1040  * @appsrc: a #GstAppSrc
1041  * @caps: caps to set
1042  *
1043  * Set the capabilities on the appsrc element.  This function takes
1044  * a copy of the caps structure. After calling this method, the source will
1045  * only produce caps that match @caps. @caps must be fixed and the caps on the
1046  * buffers must match the caps or left NULL.
1047  *
1048  * Since: 0.10.22
1049  */
1050 void
1051 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
1052 {
1053   GstCaps *old;
1054   GstAppSrcPrivate *priv;
1055
1056   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1057
1058   priv = appsrc->priv;
1059
1060   GST_OBJECT_LOCK (appsrc);
1061   GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
1062   if ((old = priv->caps) != caps) {
1063     if (caps)
1064       priv->caps = gst_caps_copy (caps);
1065     else
1066       priv->caps = NULL;
1067     if (old)
1068       gst_caps_unref (old);
1069   }
1070   GST_OBJECT_UNLOCK (appsrc);
1071 }
1072
1073 /**
1074  * gst_app_src_get_caps:
1075  * @appsrc: a #GstAppSrc
1076  *
1077  * Get the configured caps on @appsrc.
1078  *
1079  * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
1080  *
1081  * Since: 0.10.22
1082  */
1083 GstCaps *
1084 gst_app_src_get_caps (GstAppSrc * appsrc)
1085 {
1086   GstCaps *caps;
1087   GstAppSrcPrivate *priv;
1088
1089   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
1090
1091   priv = appsrc->priv;
1092
1093   GST_OBJECT_LOCK (appsrc);
1094   if ((caps = priv->caps))
1095     gst_caps_ref (caps);
1096   GST_DEBUG_OBJECT (appsrc, "getting caps of %" GST_PTR_FORMAT, caps);
1097   GST_OBJECT_UNLOCK (appsrc);
1098
1099   return caps;
1100 }
1101
1102 /**
1103  * gst_app_src_set_size:
1104  * @appsrc: a #GstAppSrc
1105  * @size: the size to set
1106  *
1107  * Set the size of the stream in bytes. A value of -1 means that the size is
1108  * not known.
1109  *
1110  * Since: 0.10.22
1111  */
1112 void
1113 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
1114 {
1115   GstAppSrcPrivate *priv;
1116
1117   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1118
1119   priv = appsrc->priv;
1120
1121   GST_OBJECT_LOCK (appsrc);
1122   GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
1123   priv->size = size;
1124   GST_OBJECT_UNLOCK (appsrc);
1125 }
1126
1127 /**
1128  * gst_app_src_get_size:
1129  * @appsrc: a #GstAppSrc
1130  *
1131  * Get the size of the stream in bytes. A value of -1 means that the size is
1132  * not known.
1133  *
1134  * Returns: the size of the stream previously set with gst_app_src_set_size();
1135  *
1136  * Since: 0.10.22
1137  */
1138 gint64
1139 gst_app_src_get_size (GstAppSrc * appsrc)
1140 {
1141   gint64 size;
1142   GstAppSrcPrivate *priv;
1143
1144   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1145
1146   priv = appsrc->priv;
1147
1148   GST_OBJECT_LOCK (appsrc);
1149   size = priv->size;
1150   GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
1151   GST_OBJECT_UNLOCK (appsrc);
1152
1153   return size;
1154 }
1155
1156 /**
1157  * gst_app_src_set_stream_type:
1158  * @appsrc: a #GstAppSrc
1159  * @type: the new state
1160  *
1161  * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
1162  * be connected to.
1163  *
1164  * A stream_type stream
1165  *
1166  * Since: 0.10.22
1167  */
1168 void
1169 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
1170 {
1171   GstAppSrcPrivate *priv;
1172
1173   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1174
1175   priv = appsrc->priv;
1176
1177   GST_OBJECT_LOCK (appsrc);
1178   GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
1179   priv->stream_type = type;
1180   GST_OBJECT_UNLOCK (appsrc);
1181 }
1182
1183 /**
1184  * gst_app_src_get_stream_type:
1185  * @appsrc: a #GstAppSrc
1186  *
1187  * Get the stream type. Control the stream type of @appsrc
1188  * with gst_app_src_set_stream_type().
1189  *
1190  * Returns: the stream type.
1191  *
1192  * Since: 0.10.22
1193  */
1194 GstAppStreamType
1195 gst_app_src_get_stream_type (GstAppSrc * appsrc)
1196 {
1197   gboolean stream_type;
1198   GstAppSrcPrivate *priv;
1199
1200   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1201
1202   priv = appsrc->priv;
1203
1204   GST_OBJECT_LOCK (appsrc);
1205   stream_type = priv->stream_type;
1206   GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
1207   GST_OBJECT_UNLOCK (appsrc);
1208
1209   return stream_type;
1210 }
1211
1212 /**
1213  * gst_app_src_set_max_bytes:
1214  * @appsrc: a #GstAppSrc
1215  * @max: the maximum number of bytes to queue
1216  *
1217  * Set the maximum amount of bytes that can be queued in @appsrc.
1218  * After the maximum amount of bytes are queued, @appsrc will emit the
1219  * "enough-data" signal.
1220  *
1221  * Since: 0.10.22
1222  */
1223 void
1224 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
1225 {
1226   GstAppSrcPrivate *priv;
1227
1228   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1229
1230   priv = appsrc->priv;
1231
1232   g_mutex_lock (priv->mutex);
1233   if (max != priv->max_bytes) {
1234     GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
1235     priv->max_bytes = max;
1236     /* signal the change */
1237     g_cond_broadcast (priv->cond);
1238   }
1239   g_mutex_unlock (priv->mutex);
1240 }
1241
1242 /**
1243  * gst_app_src_get_max_bytes:
1244  * @appsrc: a #GstAppSrc
1245  *
1246  * Get the maximum amount of bytes that can be queued in @appsrc.
1247  *
1248  * Returns: The maximum amount of bytes that can be queued.
1249  *
1250  * Since: 0.10.22
1251  */
1252 guint64
1253 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
1254 {
1255   guint64 result;
1256   GstAppSrcPrivate *priv;
1257
1258   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
1259
1260   priv = appsrc->priv;
1261
1262   g_mutex_lock (priv->mutex);
1263   result = priv->max_bytes;
1264   GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
1265   g_mutex_unlock (priv->mutex);
1266
1267   return result;
1268 }
1269
1270 static void
1271 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
1272     gboolean do_max, guint64 max)
1273 {
1274   GstAppSrcPrivate *priv = appsrc->priv;
1275   gboolean changed = FALSE;
1276
1277   g_mutex_lock (priv->mutex);
1278   if (do_min && priv->min_latency != min) {
1279     priv->min_latency = min;
1280     changed = TRUE;
1281   }
1282   if (do_max && priv->max_latency != max) {
1283     priv->max_latency = max;
1284     changed = TRUE;
1285   }
1286   g_mutex_unlock (priv->mutex);
1287
1288   if (changed) {
1289     GST_DEBUG_OBJECT (appsrc, "posting latency changed");
1290     gst_element_post_message (GST_ELEMENT_CAST (appsrc),
1291         gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
1292   }
1293 }
1294
1295 /**
1296  * gst_app_src_set_latency:
1297  * @appsrc: a #GstAppSrc
1298  * @min: the min latency
1299  * @max: the min latency
1300  *
1301  * Configure the @min and @max latency in @src. If @min is set to -1, the
1302  * default latency calculations for pseudo-live sources will be used.
1303  *
1304  * Since: 0.10.22
1305  */
1306 void
1307 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
1308 {
1309   gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
1310 }
1311
1312 /**
1313  * gst_app_src_get_latency:
1314  * @appsrc: a #GstAppSrc
1315  * @min: the min latency
1316  * @max: the min latency
1317  *
1318  * Retrieve the min and max latencies in @min and @max respectively.
1319  *
1320  * Since: 0.10.22
1321  */
1322 void
1323 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
1324 {
1325   GstAppSrcPrivate *priv;
1326
1327   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1328
1329   priv = appsrc->priv;
1330
1331   g_mutex_lock (priv->mutex);
1332   if (min)
1333     *min = priv->min_latency;
1334   if (max)
1335     *max = priv->max_latency;
1336   g_mutex_unlock (priv->mutex);
1337 }
1338
1339 /**
1340  * gst_app_src_set_emit_signals:
1341  * @appsrc: a #GstAppSrc
1342  * @emit: the new state
1343  *
1344  * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
1345  * by default disabled because signal emission is expensive and unneeded when
1346  * the application prefers to operate in pull mode.
1347  *
1348  * Since: 0.10.23
1349  */
1350 void
1351 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
1352 {
1353   GstAppSrcPrivate *priv;
1354
1355   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1356
1357   priv = appsrc->priv;
1358
1359   g_mutex_lock (priv->mutex);
1360   priv->emit_signals = emit;
1361   g_mutex_unlock (priv->mutex);
1362 }
1363
1364 /**
1365  * gst_app_src_get_emit_signals:
1366  * @appsrc: a #GstAppSrc
1367  *
1368  * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
1369  *
1370  * Returns: %TRUE if @appsrc is emiting the "new-preroll" and "new-buffer"
1371  * signals.
1372  *
1373  * Since: 0.10.23
1374  */
1375 gboolean
1376 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
1377 {
1378   gboolean result;
1379   GstAppSrcPrivate *priv;
1380
1381   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1382
1383   priv = appsrc->priv;
1384
1385   g_mutex_lock (priv->mutex);
1386   result = priv->emit_signals;
1387   g_mutex_unlock (priv->mutex);
1388
1389   return result;
1390 }
1391
1392 static GstFlowReturn
1393 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
1394     gboolean steal_ref)
1395 {
1396   gboolean first = TRUE;
1397   GstAppSrcPrivate *priv;
1398
1399   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1400   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1401
1402   priv = appsrc->priv;
1403
1404   g_mutex_lock (priv->mutex);
1405
1406   while (TRUE) {
1407     /* can't accept buffers when we are flushing or EOS */
1408     if (priv->flushing)
1409       goto flushing;
1410
1411     if (priv->is_eos)
1412       goto eos;
1413
1414     if (priv->max_bytes && priv->queued_bytes >= priv->max_bytes) {
1415       GST_DEBUG_OBJECT (appsrc,
1416           "queue filled (%" G_GUINT64_FORMAT " >= %" G_GUINT64_FORMAT ")",
1417           priv->queued_bytes, priv->max_bytes);
1418
1419       if (first) {
1420         gboolean emit;
1421
1422         emit = priv->emit_signals;
1423         /* only signal on the first push */
1424         g_mutex_unlock (priv->mutex);
1425
1426         if (priv->callbacks.enough_data)
1427           priv->callbacks.enough_data (appsrc, priv->user_data);
1428         else if (emit)
1429           g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
1430               NULL);
1431
1432         g_mutex_lock (priv->mutex);
1433         /* continue to check for flushing/eos after releasing the lock */
1434         first = FALSE;
1435         continue;
1436       }
1437       if (priv->block) {
1438         GST_DEBUG_OBJECT (appsrc, "waiting for free space");
1439         /* we are filled, wait until a buffer gets popped or when we
1440          * flush. */
1441         g_cond_wait (priv->cond, priv->mutex);
1442       } else {
1443         /* no need to wait for free space, we just pump more data into the
1444          * queue hoping that the caller reacts to the enough-data signal and
1445          * stops pushing buffers. */
1446         break;
1447       }
1448     } else
1449       break;
1450   }
1451
1452   GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
1453   if (!steal_ref)
1454     gst_buffer_ref (buffer);
1455   g_queue_push_tail (priv->queue, buffer);
1456   priv->queued_bytes += gst_buffer_get_size (buffer);
1457   g_cond_broadcast (priv->cond);
1458   g_mutex_unlock (priv->mutex);
1459
1460   return GST_FLOW_OK;
1461
1462   /* ERRORS */
1463 flushing:
1464   {
1465     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
1466     if (steal_ref)
1467       gst_buffer_unref (buffer);
1468     g_mutex_unlock (priv->mutex);
1469     return GST_FLOW_WRONG_STATE;
1470   }
1471 eos:
1472   {
1473     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
1474     if (steal_ref)
1475       gst_buffer_unref (buffer);
1476     g_mutex_unlock (priv->mutex);
1477     return GST_FLOW_UNEXPECTED;
1478   }
1479 }
1480
1481 /**
1482  * gst_app_src_push_buffer:
1483  * @appsrc: a #GstAppSrc
1484  * @buffer: a #GstBuffer to push
1485  *
1486  * Adds a buffer to the queue of buffers that the appsrc element will
1487  * push to its source pad.  This function takes ownership of the buffer.
1488  *
1489  * When the block property is TRUE, this function can block until free
1490  * space becomes available in the queue.
1491  *
1492  * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
1493  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
1494  * #GST_FLOW_UNEXPECTED when EOS occured.
1495  *
1496  * Since: 0.10.22
1497  */
1498 GstFlowReturn
1499 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
1500 {
1501   return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
1502 }
1503
1504 /* push a buffer without stealing the ref of the buffer. This is used for the
1505  * action signal. */
1506 static GstFlowReturn
1507 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
1508 {
1509   return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
1510 }
1511
1512 /**
1513  * gst_app_src_end_of_stream:
1514  * @appsrc: a #GstAppSrc
1515  *
1516  * Indicates to the appsrc element that the last buffer queued in the
1517  * element is the last buffer of the stream.
1518  *
1519  * Returns: #GST_FLOW_OK when the EOS was successfuly queued.
1520  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
1521  *
1522  * Since: 0.10.22
1523  */
1524 GstFlowReturn
1525 gst_app_src_end_of_stream (GstAppSrc * appsrc)
1526 {
1527   GstAppSrcPrivate *priv;
1528
1529   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1530
1531   priv = appsrc->priv;
1532
1533   g_mutex_lock (priv->mutex);
1534   /* can't accept buffers when we are flushing. We can accept them when we are
1535    * EOS although it will not do anything. */
1536   if (priv->flushing)
1537     goto flushing;
1538
1539   GST_DEBUG_OBJECT (appsrc, "sending EOS");
1540   priv->is_eos = TRUE;
1541   g_cond_broadcast (priv->cond);
1542   g_mutex_unlock (priv->mutex);
1543
1544   return GST_FLOW_OK;
1545
1546   /* ERRORS */
1547 flushing:
1548   {
1549     g_mutex_unlock (priv->mutex);
1550     GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
1551     return GST_FLOW_WRONG_STATE;
1552   }
1553 }
1554
1555 /**
1556  * gst_app_src_set_callbacks:
1557  * @appsrc: a #GstAppSrc
1558  * @callbacks: the callbacks
1559  * @user_data: a user_data argument for the callbacks
1560  * @notify: a destroy notify function
1561  *
1562  * Set callbacks which will be executed when data is needed, enough data has
1563  * been collected or when a seek should be performed.
1564  * This is an alternative to using the signals, it has lower overhead and is thus
1565  * less expensive, but also less flexible.
1566  *
1567  * If callbacks are installed, no signals will be emited for performance
1568  * reasons.
1569  *
1570  * Since: 0.10.23
1571  */
1572 void
1573 gst_app_src_set_callbacks (GstAppSrc * appsrc,
1574     GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
1575 {
1576   GDestroyNotify old_notify;
1577   GstAppSrcPrivate *priv;
1578
1579   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1580   g_return_if_fail (callbacks != NULL);
1581
1582   priv = appsrc->priv;
1583
1584   GST_OBJECT_LOCK (appsrc);
1585   old_notify = priv->notify;
1586
1587   if (old_notify) {
1588     gpointer old_data;
1589
1590     old_data = priv->user_data;
1591
1592     priv->user_data = NULL;
1593     priv->notify = NULL;
1594     GST_OBJECT_UNLOCK (appsrc);
1595
1596     old_notify (old_data);
1597
1598     GST_OBJECT_LOCK (appsrc);
1599   }
1600   priv->callbacks = *callbacks;
1601   priv->user_data = user_data;
1602   priv->notify = notify;
1603   GST_OBJECT_UNLOCK (appsrc);
1604 }
1605
1606 /*** GSTURIHANDLER INTERFACE *************************************************/
1607
1608 static GstURIType
1609 gst_app_src_uri_get_type (GType type)
1610 {
1611   return GST_URI_SRC;
1612 }
1613
1614 static gchar **
1615 gst_app_src_uri_get_protocols (GType type)
1616 {
1617   static gchar *protocols[] = { (char *) "appsrc", NULL };
1618
1619   return protocols;
1620 }
1621
1622 static const gchar *
1623 gst_app_src_uri_get_uri (GstURIHandler * handler)
1624 {
1625   return "appsrc";
1626 }
1627
1628 static gboolean
1629 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
1630 {
1631   gchar *protocol;
1632   gboolean ret;
1633
1634   protocol = gst_uri_get_protocol (uri);
1635   ret = !strcmp (protocol, "appsrc");
1636   g_free (protocol);
1637
1638   return ret;
1639 }
1640
1641 static void
1642 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1643 {
1644   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1645
1646   iface->get_type = gst_app_src_uri_get_type;
1647   iface->get_protocols = gst_app_src_uri_get_protocols;
1648   iface->get_uri = gst_app_src_uri_get_uri;
1649   iface->set_uri = gst_app_src_uri_set_uri;
1650 }