tizen 2.3.1 release
[framework/multimedia/gst-plugins-base0.10.git] / gst-libs / gst / app / gstappsrc.c
1 /* GStreamer
2  * Copyright (C) 2007 David Schleef <ds@schleef.org>
3  *           (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20 /**
21  * SECTION:gstappsrc
22  * @short_description: Easy way for applications to inject buffers into a
23  *     pipeline
24  * @see_also: #GstBaseSrc, appsink
25  *
26  * The appsrc element can be used by applications to insert data into a
27  * GStreamer pipeline. Unlike most GStreamer elements, Appsrc provides
28  * external API functions.
29  *
30  * appsrc can be used by linking with the libgstapp library to access the
31  * methods directly or by using the appsrc action signals.
32  *
33  * Before operating appsrc, the caps property must be set to a fixed caps
34  * describing the format of the data that will be pushed with appsrc. An
35  * exception to this is when pushing buffers with unknown caps, in which case no
36  * caps should be set. This is typically true of file-like sources that push raw
37  * byte buffers.
38  *
39  * The main way of handing data to the appsrc element is by calling the
40  * gst_app_src_push_buffer() method or by emitting the push-buffer action signal.
41  * This will put the buffer onto a queue from which appsrc will read from in its
42  * streaming thread. It is important to note that data transport will not happen
43  * from the thread that performed the push-buffer call.
44  *
45  * The "max-bytes" property controls how much data can be queued in appsrc
46  * before appsrc considers the queue full. A filled internal queue will always
47  * signal the "enough-data" signal, which signals the application that it should
48  * stop pushing data into appsrc. The "block" property will cause appsrc to
49  * block the push-buffer method until free data becomes available again.
50  *
51  * When the internal queue is running out of data, the "need-data" signal is
52  * emitted, which signals the application that it should start pushing more data
53  * into appsrc.
54  *
55  * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
56  * "seek-data" signal when the "stream-mode" property is set to "seekable" or
57  * "random-access". The signal argument will contain the new desired position in
58  * the stream expressed in the unit set with the "format" property. After
59  * receiving the seek-data signal, the application should push-buffers from the
60  * new position.
61  *
62  * These signals allow the application to operate the appsrc in two different
63  * ways:
64  *
65  * The push model, in which the application repeatedly calls the push-buffer method
66  * with a new buffer. Optionally, the queue size in the appsrc can be controlled
67  * with the enough-data and need-data signals by respectively stopping/starting
68  * the push-buffer calls. This is a typical mode of operation for the
69  * stream-type "stream" and "seekable". Use this model when implementing various
70  * network protocols or hardware devices.
71  *
72  * The pull model where the need-data signal triggers the next push-buffer call.
73  * This mode is typically used in the "random-access" stream-type. Use this
74  * model for file access or other randomly accessable sources. In this mode, a
75  * buffer of exactly the amount of bytes given by the need-data signal should be
76  * pushed into appsrc.
77  *
78  * In all modes, the size property on appsrc should contain the total stream
79  * size in bytes. Setting this property is mandatory in the random-access mode.
80  * For the stream and seekable modes, setting this property is optional but
81  * recommended.
82  *
83  * When the application is finished pushing data into appsrc, it should call
84  * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
85  * this call, no more buffers can be pushed into appsrc until a flushing seek
86  * happened or the state of the appsrc has gone through READY.
87  *
88  * Last reviewed on 2008-12-17 (0.10.10)
89  *
90  * Since: 0.10.22
91  */
92
93 #ifdef HAVE_CONFIG_H
94 #include "config.h"
95 #endif
96
97 #include <gst/gst.h>
98 #include <gst/base/gstbasesrc.h>
99
100 #include <string.h>
101
102 #include "gstapp-marshal.h"
103 #include "gstappsrc.h"
104
105 #include "gst/glib-compat-private.h"
106
107 struct _GstAppSrcPrivate
108 {
109   GCond *cond;
110   GMutex *mutex;
111   GQueue *queue;
112
113   GstCaps *caps;
114   gint64 size;
115   GstAppStreamType stream_type;
116   guint64 max_bytes;
117   GstFormat format;
118   gboolean block;
119
120   gboolean flushing;
121   gboolean started;
122   gboolean is_eos;
123   guint64 queued_bytes;
124   guint64 offset;
125   GstAppStreamType current_type;
126
127   guint64 min_latency;
128   guint64 max_latency;
129   gboolean emit_signals;
130   guint min_percent;
131
132   GstAppSrcCallbacks callbacks;
133   gpointer user_data;
134   GDestroyNotify notify;
135 };
136
137 GST_DEBUG_CATEGORY_STATIC (app_src_debug);
138 #define GST_CAT_DEFAULT app_src_debug
139
140 enum
141 {
142   /* signals */
143   SIGNAL_NEED_DATA,
144   SIGNAL_ENOUGH_DATA,
145   SIGNAL_SEEK_DATA,
146
147   /* actions */
148   SIGNAL_PUSH_BUFFER,
149   SIGNAL_END_OF_STREAM,
150
151   LAST_SIGNAL
152 };
153
154 #define DEFAULT_PROP_SIZE          -1
155 #define DEFAULT_PROP_STREAM_TYPE   GST_APP_STREAM_TYPE_STREAM
156 #define DEFAULT_PROP_MAX_BYTES     200000
157 #define DEFAULT_PROP_FORMAT        GST_FORMAT_BYTES
158 #define DEFAULT_PROP_BLOCK         FALSE
159 #define DEFAULT_PROP_IS_LIVE       FALSE
160 #define DEFAULT_PROP_MIN_LATENCY   -1
161 #define DEFAULT_PROP_MAX_LATENCY   -1
162 #define DEFAULT_PROP_EMIT_SIGNALS  TRUE
163 #define DEFAULT_PROP_MIN_PERCENT   0
164
165 enum
166 {
167   PROP_0,
168   PROP_CAPS,
169   PROP_SIZE,
170   PROP_STREAM_TYPE,
171   PROP_MAX_BYTES,
172   PROP_FORMAT,
173   PROP_BLOCK,
174   PROP_IS_LIVE,
175   PROP_MIN_LATENCY,
176   PROP_MAX_LATENCY,
177   PROP_EMIT_SIGNALS,
178   PROP_MIN_PERCENT,
179   PROP_LAST
180 };
181
182 static GstStaticPadTemplate gst_app_src_template =
183 GST_STATIC_PAD_TEMPLATE ("src",
184     GST_PAD_SRC,
185     GST_PAD_ALWAYS,
186     GST_STATIC_CAPS_ANY);
187
188 GType
189 gst_app_stream_type_get_type (void)
190 {
191   static volatile gsize stream_type_type = 0;
192   static const GEnumValue stream_type[] = {
193     {GST_APP_STREAM_TYPE_STREAM, "GST_APP_STREAM_TYPE_STREAM", "stream"},
194     {GST_APP_STREAM_TYPE_SEEKABLE, "GST_APP_STREAM_TYPE_SEEKABLE", "seekable"},
195     {GST_APP_STREAM_TYPE_RANDOM_ACCESS, "GST_APP_STREAM_TYPE_RANDOM_ACCESS",
196         "random-access"},
197     {0, NULL, NULL}
198   };
199
200   if (g_once_init_enter (&stream_type_type)) {
201     GType tmp = g_enum_register_static ("GstAppStreamType", stream_type);
202     g_once_init_leave (&stream_type_type, tmp);
203   }
204
205   return (GType) stream_type_type;
206 }
207
208 static void gst_app_src_uri_handler_init (gpointer g_iface,
209     gpointer iface_data);
210
211 static void gst_app_src_dispose (GObject * object);
212 static void gst_app_src_finalize (GObject * object);
213
214 static void gst_app_src_set_property (GObject * object, guint prop_id,
215     const GValue * value, GParamSpec * pspec);
216 static void gst_app_src_get_property (GObject * object, guint prop_id,
217     GValue * value, GParamSpec * pspec);
218
219 static void gst_app_src_set_latencies (GstAppSrc * appsrc,
220     gboolean do_min, guint64 min, gboolean do_max, guint64 max);
221
222 static GstCaps *gst_app_src_internal_get_caps (GstBaseSrc * bsrc);
223 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc,
224     guint64 offset, guint size, GstBuffer ** buf);
225 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
226 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
227 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
228 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
229 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
230 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
231 static gboolean gst_app_src_check_get_range (GstBaseSrc * src);
232 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
233 static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
234
235 static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
236     GstBuffer * buffer);
237 static gboolean gst_app_src_send_event (GstElement * element, GstEvent * event);
238
239 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
240
241 static void
242 _do_init (GType filesrc_type)
243 {
244   static const GInterfaceInfo urihandler_info = {
245     gst_app_src_uri_handler_init,
246     NULL,
247     NULL
248   };
249   g_type_add_interface_static (filesrc_type, GST_TYPE_URI_HANDLER,
250       &urihandler_info);
251 }
252
253 GST_BOILERPLATE_FULL (GstAppSrc, gst_app_src, GstBaseSrc, GST_TYPE_BASE_SRC,
254     _do_init);
255
256 static void
257 gst_app_src_base_init (gpointer g_class)
258 {
259   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
260
261   GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
262
263   gst_element_class_set_details_simple (element_class, "AppSrc",
264       "Generic/Source", "Allow the application to feed buffers to a pipeline",
265       "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
266
267   gst_element_class_add_static_pad_template (element_class,
268       &gst_app_src_template);
269 }
270
271 static void
272 gst_app_src_class_init (GstAppSrcClass * klass)
273 {
274   GObjectClass *gobject_class = (GObjectClass *) klass;
275   GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
276   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);;
277
278   gobject_class->dispose = gst_app_src_dispose;
279   gobject_class->finalize = gst_app_src_finalize;
280
281   gobject_class->set_property = gst_app_src_set_property;
282   gobject_class->get_property = gst_app_src_get_property;
283   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_app_src_send_event);
284
285   /**
286    * GstAppSrc::caps
287    *
288    * The GstCaps that will negotiated downstream and will be put
289    * on outgoing buffers.
290    */
291   g_object_class_install_property (gobject_class, PROP_CAPS,
292       g_param_spec_boxed ("caps", "Caps",
293           "The allowed caps for the src pad", GST_TYPE_CAPS,
294           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
295   /**
296    * GstAppSrc::format
297    *
298    * The format to use for segment events. When the source is producing
299    * timestamped buffers this property should be set to GST_FORMAT_TIME.
300    */
301   g_object_class_install_property (gobject_class, PROP_FORMAT,
302       g_param_spec_enum ("format", "Format",
303           "The format of the segment events and seek", GST_TYPE_FORMAT,
304           DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
305   /**
306    * GstAppSrc::size
307    *
308    * The total size in bytes of the data stream. If the total size is known, it
309    * is recommended to configure it with this property.
310    */
311   g_object_class_install_property (gobject_class, PROP_SIZE,
312       g_param_spec_int64 ("size", "Size",
313           "The size of the data stream in bytes (-1 if unknown)",
314           -1, G_MAXINT64, DEFAULT_PROP_SIZE,
315           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
316   /**
317    * GstAppSrc::stream-type
318    *
319    * The type of stream that this source is producing.  For seekable streams the
320    * application should connect to the seek-data signal.
321    */
322   g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
323       g_param_spec_enum ("stream-type", "Stream Type",
324           "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
325           DEFAULT_PROP_STREAM_TYPE,
326           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
327   /**
328    * GstAppSrc::max-bytes
329    *
330    * The maximum amount of bytes that can be queued internally.
331    * After the maximum amount of bytes are queued, appsrc will emit the
332    * "enough-data" signal.
333    */
334   g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
335       g_param_spec_uint64 ("max-bytes", "Max bytes",
336           "The maximum number of bytes to queue internally (0 = unlimited)",
337           0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
338           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
339   /**
340    * GstAppSrc::block
341    *
342    * When max-bytes are queued and after the enough-data signal has been emitted,
343    * block any further push-buffer calls until the amount of queued bytes drops
344    * below the max-bytes limit.
345    */
346   g_object_class_install_property (gobject_class, PROP_BLOCK,
347       g_param_spec_boolean ("block", "Block",
348           "Block push-buffer when max-bytes are queued",
349           DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350
351   /**
352    * GstAppSrc::is-live
353    *
354    * Instruct the source to behave like a live source. This includes that it
355    * will only push out buffers in the PLAYING state.
356    */
357   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
358       g_param_spec_boolean ("is-live", "Is Live",
359           "Whether to act as a live source",
360           DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
361   /**
362    * GstAppSrc::min-latency
363    *
364    * The minimum latency of the source. A value of -1 will use the default
365    * latency calculations of #GstBaseSrc.
366    */
367   g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
368       g_param_spec_int64 ("min-latency", "Min Latency",
369           "The minimum latency (-1 = default)",
370           -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
371           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
372   /**
373    * GstAppSrc::max-latency
374    *
375    * The maximum latency of the source. A value of -1 means an unlimited amout
376    * of latency.
377    */
378   g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
379       g_param_spec_int64 ("max-latency", "Max Latency",
380           "The maximum latency (-1 = unlimited)",
381           -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
382           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
383
384   /**
385    * GstAppSrc::emit-signals
386    *
387    * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
388    * This option is by default enabled for backwards compatibility reasons but
389    * can disabled when needed because signal emission is expensive.
390    *
391    * Since: 0.10.23
392    */
393   g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
394       g_param_spec_boolean ("emit-signals", "Emit signals",
395           "Emit need-data, enough-data and seek-data signals",
396           DEFAULT_PROP_EMIT_SIGNALS,
397           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
398
399   /**
400    * GstAppSrc::empty-percent
401    *
402    * Make appsrc emit the "need-data" signal when the amount of bytes in the
403    * queue drops below this percentage of max-bytes.
404    *
405    * Since: 0.10.27
406    */
407   g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
408       g_param_spec_uint ("min-percent", "Min Percent",
409           "Emit need-data when queued bytes drops below this percent of max-bytes",
410           0, 100, DEFAULT_PROP_MIN_PERCENT,
411           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
412
413   /**
414    * GstAppSrc::need-data:
415    * @appsrc: the appsrc element that emitted the signal
416    * @length: the amount of bytes needed.
417    *
418    * Signal that the source needs more data. In the callback or from another
419    * thread you should call push-buffer or end-of-stream.
420    *
421    * @length is just a hint and when it is set to -1, any number of bytes can be
422    * pushed into @appsrc.
423    *
424    * You can call push-buffer multiple times until the enough-data signal is
425    * fired.
426    */
427   gst_app_src_signals[SIGNAL_NEED_DATA] =
428       g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
429       G_STRUCT_OFFSET (GstAppSrcClass, need_data),
430       NULL, NULL, __gst_app_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
431
432   /**
433    * GstAppSrc::enough-data:
434    * @appsrc: the appsrc element that emitted the signal
435    *
436    * Signal that the source has enough data. It is recommended that the
437    * application stops calling push-buffer until the need-data signal is
438    * emitted again to avoid excessive buffer queueing.
439    */
440   gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
441       g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
442       G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
443       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
444
445   /**
446    * GstAppSrc::seek-data:
447    * @appsrc: the appsrc element that emitted the signal
448    * @offset: the offset to seek to
449    *
450    * Seek to the given offset. The next push-buffer should produce buffers from
451    * the new @offset.
452    * This callback is only called for seekable stream types.
453    *
454    * Returns: %TRUE if the seek succeeded.
455    */
456   gst_app_src_signals[SIGNAL_SEEK_DATA] =
457       g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
458       G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
459       NULL, NULL, __gst_app_marshal_BOOLEAN__UINT64, G_TYPE_BOOLEAN, 1,
460       G_TYPE_UINT64);
461
462    /**
463     * GstAppSrc::push-buffer:
464     * @appsrc: the appsrc
465     * @buffer: a buffer to push
466     *
467     * Adds a buffer to the queue of buffers that the appsrc element will
468     * push to its source pad. This function does not take ownership of the
469     * buffer so the buffer needs to be unreffed after calling this function.
470     *
471     * When the block property is TRUE, this function can block until free space
472     * becomes available in the queue.
473     */
474   gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
475       g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
476       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
477           push_buffer), NULL, NULL, __gst_app_marshal_ENUM__OBJECT,
478       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
479
480    /**
481     * GstAppSrc::end-of-stream:
482     * @appsrc: the appsrc
483     *
484     * Notify @appsrc that no more buffer are available.
485     */
486   gst_app_src_signals[SIGNAL_END_OF_STREAM] =
487       g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
488       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
489           end_of_stream), NULL, NULL, __gst_app_marshal_ENUM__VOID,
490       GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
491
492   basesrc_class->get_caps = gst_app_src_internal_get_caps;
493   basesrc_class->create = gst_app_src_create;
494   basesrc_class->start = gst_app_src_start;
495   basesrc_class->stop = gst_app_src_stop;
496   basesrc_class->unlock = gst_app_src_unlock;
497   basesrc_class->unlock_stop = gst_app_src_unlock_stop;
498   basesrc_class->do_seek = gst_app_src_do_seek;
499   basesrc_class->is_seekable = gst_app_src_is_seekable;
500   basesrc_class->check_get_range = gst_app_src_check_get_range;
501   basesrc_class->get_size = gst_app_src_do_get_size;
502   basesrc_class->get_size = gst_app_src_do_get_size;
503   basesrc_class->query = gst_app_src_query;
504
505   klass->push_buffer = gst_app_src_push_buffer_action;
506   klass->end_of_stream = gst_app_src_end_of_stream;
507
508   g_type_class_add_private (klass, sizeof (GstAppSrcPrivate));
509 }
510
511 static void
512 gst_app_src_init (GstAppSrc * appsrc, GstAppSrcClass * klass)
513 {
514   GstAppSrcPrivate *priv;
515
516   priv = appsrc->priv = G_TYPE_INSTANCE_GET_PRIVATE (appsrc, GST_TYPE_APP_SRC,
517       GstAppSrcPrivate);
518
519   priv->mutex = g_mutex_new ();
520   priv->cond = g_cond_new ();
521   priv->queue = g_queue_new ();
522
523   priv->size = DEFAULT_PROP_SIZE;
524   priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
525   priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
526   priv->format = DEFAULT_PROP_FORMAT;
527   priv->block = DEFAULT_PROP_BLOCK;
528   priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
529   priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
530   priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
531   priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
532
533   gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
534 }
535
536 static void
537 gst_app_src_flush_queued (GstAppSrc * src)
538 {
539   GstBuffer *buf;
540   GstAppSrcPrivate *priv = src->priv;
541
542   while ((buf = g_queue_pop_head (priv->queue)))
543     gst_buffer_unref (buf);
544   priv->queued_bytes = 0;
545 }
546
547 static void
548 gst_app_src_dispose (GObject * obj)
549 {
550   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
551   GstAppSrcPrivate *priv = appsrc->priv;
552
553   if (priv->caps) {
554     gst_caps_unref (priv->caps);
555     priv->caps = NULL;
556   }
557   gst_app_src_flush_queued (appsrc);
558
559   G_OBJECT_CLASS (parent_class)->dispose (obj);
560 }
561
562 static void
563 gst_app_src_finalize (GObject * obj)
564 {
565   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
566   GstAppSrcPrivate *priv = appsrc->priv;
567
568   g_mutex_free (priv->mutex);
569   g_cond_free (priv->cond);
570   g_queue_free (priv->queue);
571
572   G_OBJECT_CLASS (parent_class)->finalize (obj);
573 }
574
575 static GstCaps *
576 gst_app_src_internal_get_caps (GstBaseSrc * bsrc)
577 {
578   return gst_app_src_get_caps (GST_APP_SRC_CAST (bsrc));
579 }
580
581 static void
582 gst_app_src_set_property (GObject * object, guint prop_id,
583     const GValue * value, GParamSpec * pspec)
584 {
585   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
586   GstAppSrcPrivate *priv = appsrc->priv;
587
588   switch (prop_id) {
589     case PROP_CAPS:
590       gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
591       break;
592     case PROP_SIZE:
593       gst_app_src_set_size (appsrc, g_value_get_int64 (value));
594       break;
595     case PROP_STREAM_TYPE:
596       gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
597       break;
598     case PROP_MAX_BYTES:
599       gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
600       break;
601     case PROP_FORMAT:
602       priv->format = g_value_get_enum (value);
603       break;
604     case PROP_BLOCK:
605       priv->block = g_value_get_boolean (value);
606       break;
607     case PROP_IS_LIVE:
608       gst_base_src_set_live (GST_BASE_SRC (appsrc),
609           g_value_get_boolean (value));
610       break;
611     case PROP_MIN_LATENCY:
612       gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
613           FALSE, -1);
614       break;
615     case PROP_MAX_LATENCY:
616       gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
617           g_value_get_int64 (value));
618       break;
619     case PROP_EMIT_SIGNALS:
620       gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
621       break;
622     case PROP_MIN_PERCENT:
623       priv->min_percent = g_value_get_uint (value);
624       break;
625     default:
626       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
627       break;
628   }
629 }
630
631 static void
632 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
633     GParamSpec * pspec)
634 {
635   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
636   GstAppSrcPrivate *priv = appsrc->priv;
637
638   switch (prop_id) {
639     case PROP_CAPS:
640     {
641       GstCaps *caps;
642
643       /* we're missing a _take_caps() function to transfer ownership */
644       caps = gst_app_src_get_caps (appsrc);
645       gst_value_set_caps (value, caps);
646       if (caps)
647         gst_caps_unref (caps);
648       break;
649     }
650     case PROP_SIZE:
651       g_value_set_int64 (value, gst_app_src_get_size (appsrc));
652       break;
653     case PROP_STREAM_TYPE:
654       g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
655       break;
656     case PROP_MAX_BYTES:
657       g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
658       break;
659     case PROP_FORMAT:
660       g_value_set_enum (value, priv->format);
661       break;
662     case PROP_BLOCK:
663       g_value_set_boolean (value, priv->block);
664       break;
665     case PROP_IS_LIVE:
666       g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
667       break;
668     case PROP_MIN_LATENCY:
669     {
670       guint64 min;
671
672       gst_app_src_get_latency (appsrc, &min, NULL);
673       g_value_set_int64 (value, min);
674       break;
675     }
676     case PROP_MAX_LATENCY:
677     {
678       guint64 max;
679
680       gst_app_src_get_latency (appsrc, &max, NULL);
681       g_value_set_int64 (value, max);
682       break;
683     }
684     case PROP_EMIT_SIGNALS:
685       g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
686       break;
687     case PROP_MIN_PERCENT:
688       g_value_set_uint (value, priv->min_percent);
689       break;
690     default:
691       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
692       break;
693   }
694 }
695
696 static gboolean
697 gst_app_src_unlock (GstBaseSrc * bsrc)
698 {
699   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
700   GstAppSrcPrivate *priv = appsrc->priv;
701
702   g_mutex_lock (priv->mutex);
703   GST_DEBUG_OBJECT (appsrc, "unlock start");
704   priv->flushing = TRUE;
705   g_cond_broadcast (priv->cond);
706   g_mutex_unlock (priv->mutex);
707
708   return TRUE;
709 }
710
711 static gboolean
712 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
713 {
714   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
715   GstAppSrcPrivate *priv = appsrc->priv;
716
717   g_mutex_lock (priv->mutex);
718   GST_DEBUG_OBJECT (appsrc, "unlock stop");
719   priv->flushing = FALSE;
720   g_cond_broadcast (priv->cond);
721   g_mutex_unlock (priv->mutex);
722
723   return TRUE;
724 }
725
726 static gboolean
727 gst_app_src_start (GstBaseSrc * bsrc)
728 {
729   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
730   GstAppSrcPrivate *priv = appsrc->priv;
731
732   g_mutex_lock (priv->mutex);
733   GST_DEBUG_OBJECT (appsrc, "starting");
734   priv->started = TRUE;
735   /* set the offset to -1 so that we always do a first seek. This is only used
736    * in random-access mode. */
737   priv->offset = -1;
738   priv->flushing = FALSE;
739   g_mutex_unlock (priv->mutex);
740
741   gst_base_src_set_format (bsrc, priv->format);
742
743   return TRUE;
744 }
745
746 static gboolean
747 gst_app_src_stop (GstBaseSrc * bsrc)
748 {
749   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
750   GstAppSrcPrivate *priv = appsrc->priv;
751
752   g_mutex_lock (priv->mutex);
753   GST_DEBUG_OBJECT (appsrc, "stopping");
754   priv->is_eos = FALSE;
755   priv->flushing = TRUE;
756   priv->started = FALSE;
757   gst_app_src_flush_queued (appsrc);
758   g_mutex_unlock (priv->mutex);
759
760   return TRUE;
761 }
762
763 static gboolean
764 gst_app_src_is_seekable (GstBaseSrc * src)
765 {
766   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
767   GstAppSrcPrivate *priv = appsrc->priv;
768   gboolean res = FALSE;
769
770   switch (priv->stream_type) {
771     case GST_APP_STREAM_TYPE_STREAM:
772       break;
773     case GST_APP_STREAM_TYPE_SEEKABLE:
774     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
775       res = TRUE;
776       break;
777   }
778   return res;
779 }
780
781 static gboolean
782 gst_app_src_check_get_range (GstBaseSrc * src)
783 {
784   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
785   GstAppSrcPrivate *priv = appsrc->priv;
786   gboolean res = FALSE;
787
788   switch (priv->stream_type) {
789     case GST_APP_STREAM_TYPE_STREAM:
790     case GST_APP_STREAM_TYPE_SEEKABLE:
791       break;
792     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
793       res = TRUE;
794       break;
795   }
796   return res;
797 }
798
799 static gboolean
800 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
801 {
802   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
803
804   *size = gst_app_src_get_size (appsrc);
805
806   return TRUE;
807 }
808
809 static gboolean
810 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
811 {
812   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
813   GstAppSrcPrivate *priv = appsrc->priv;
814   gboolean res;
815
816   switch (GST_QUERY_TYPE (query)) {
817     case GST_QUERY_LATENCY:
818     {
819       GstClockTime min, max;
820       gboolean live;
821
822       /* Query the parent class for the defaults */
823       res = gst_base_src_query_latency (src, &live, &min, &max);
824
825       /* overwrite with our values when we need to */
826       g_mutex_lock (priv->mutex);
827       if (priv->min_latency != -1)
828         min = priv->min_latency;
829       if (priv->max_latency != -1)
830         max = priv->max_latency;
831       g_mutex_unlock (priv->mutex);
832
833       gst_query_set_latency (query, live, min, max);
834       break;
835     }
836     default:
837       res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
838       break;
839   }
840
841   return res;
842 }
843
844 /* will be called in push mode */
845 static gboolean
846 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
847 {
848   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
849   GstAppSrcPrivate *priv = appsrc->priv;
850   gint64 desired_position;
851   gboolean res = FALSE;
852
853   desired_position = segment->last_stop;
854
855   GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
856       desired_position, gst_format_get_name (segment->format));
857
858   /* no need to try to seek in streaming mode */
859   if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
860     return TRUE;
861
862   if (priv->callbacks.seek_data)
863     res = priv->callbacks.seek_data (appsrc, desired_position, priv->user_data);
864   else {
865     gboolean emit;
866
867     g_mutex_lock (priv->mutex);
868     emit = priv->emit_signals;
869     g_mutex_unlock (priv->mutex);
870
871     if (emit)
872       g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
873           desired_position, &res);
874   }
875
876   if (res) {
877     GST_DEBUG_OBJECT (appsrc, "flushing queue");
878     gst_app_src_flush_queued (appsrc);
879     priv->is_eos = FALSE;
880   } else {
881     GST_WARNING_OBJECT (appsrc, "seek failed");
882   }
883
884   return res;
885 }
886
887 /* must be called with the appsrc mutex */
888 static gboolean
889 gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
890 {
891   gboolean res = FALSE;
892   gboolean emit;
893   GstAppSrcPrivate *priv = appsrc->priv;
894
895   emit = priv->emit_signals;
896   g_mutex_unlock (priv->mutex);
897
898   GST_DEBUG_OBJECT (appsrc,
899       "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
900       priv->offset, offset);
901
902   if (priv->callbacks.seek_data)
903     res = priv->callbacks.seek_data (appsrc, offset, priv->user_data);
904   else if (emit)
905     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
906         offset, &res);
907
908   g_mutex_lock (priv->mutex);
909
910   return res;
911 }
912
913 /* must be called with the appsrc mutex. After this call things can be
914  * flushing */
915 static void
916 gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
917 {
918   gboolean emit;
919   GstAppSrcPrivate *priv = appsrc->priv;
920
921   emit = priv->emit_signals;
922   g_mutex_unlock (priv->mutex);
923
924   /* we have no data, we need some. We fire the signal with the size hint. */
925   if (priv->callbacks.need_data)
926     priv->callbacks.need_data (appsrc, size, priv->user_data);
927   else if (emit)
928     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
929         NULL);
930
931   g_mutex_lock (priv->mutex);
932   /* we can be flushing now because we released the lock */
933 }
934
935 static GstFlowReturn
936 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
937     GstBuffer ** buf)
938 {
939   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
940   GstAppSrcPrivate *priv = appsrc->priv;
941   GstFlowReturn ret;
942   GstCaps *caps;
943
944   GST_OBJECT_LOCK (appsrc);
945   caps = priv->caps ? gst_caps_ref (priv->caps) : NULL;
946   if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
947           bsrc->segment.format == GST_FORMAT_BYTES)) {
948     GST_DEBUG_OBJECT (appsrc,
949         "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
950         bsrc->segment.duration, priv->size);
951     gst_segment_set_duration (&bsrc->segment, GST_FORMAT_BYTES, priv->size);
952     GST_OBJECT_UNLOCK (appsrc);
953
954     gst_element_post_message (GST_ELEMENT (appsrc),
955         gst_message_new_duration (GST_OBJECT (appsrc), GST_FORMAT_BYTES,
956             priv->size));
957   } else {
958     GST_OBJECT_UNLOCK (appsrc);
959   }
960
961   g_mutex_lock (priv->mutex);
962   /* check flushing first */
963   if (G_UNLIKELY (priv->flushing))
964     goto flushing;
965
966   if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
967     /* if we are dealing with a random-access stream, issue a seek if the offset
968      * changed. */
969     if (G_UNLIKELY (priv->offset != offset)) {
970       gboolean res;
971
972       /* do the seek */
973       res = gst_app_src_emit_seek (appsrc, offset);
974
975       if (G_UNLIKELY (!res))
976         /* failing to seek is fatal */
977         goto seek_error;
978
979       priv->offset = offset;
980     }
981   }
982
983   while (TRUE) {
984     /* return data as long as we have some */
985     if (!g_queue_is_empty (priv->queue)) {
986       guint buf_size;
987
988       *buf = g_queue_pop_head (priv->queue);
989       buf_size = GST_BUFFER_SIZE (*buf);
990
991       GST_DEBUG_OBJECT (appsrc, "we have buffer %p of size %u", *buf, buf_size);
992
993       priv->queued_bytes -= buf_size;
994
995       /* only update the offset when in random_access mode */
996       if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
997         priv->offset += buf_size;
998       if (caps) {
999         *buf = gst_buffer_make_metadata_writable (*buf);
1000         gst_buffer_set_caps (*buf, caps);
1001       }
1002
1003       /* signal that we removed an item */
1004       g_cond_broadcast (priv->cond);
1005
1006       /* see if we go lower than the empty-percent */
1007       if (priv->min_percent && priv->max_bytes) {
1008         if (priv->queued_bytes * 100 / priv->max_bytes <= priv->min_percent)
1009           /* ignore flushing state, we got a buffer and we will return it now.
1010            * Errors will be handled in the next round */
1011           gst_app_src_emit_need_data (appsrc, size);
1012       }
1013       ret = GST_FLOW_OK;
1014       break;
1015     } else {
1016       gst_app_src_emit_need_data (appsrc, size);
1017
1018       /* we can be flushing now because we released the lock above */
1019       if (G_UNLIKELY (priv->flushing))
1020         goto flushing;
1021
1022       /* if we have a buffer now, continue the loop and try to return it. In
1023        * random-access mode (where a buffer is normally pushed in the above
1024        * signal) we can still be empty because the pushed buffer got flushed or
1025        * when the application pushes the requested buffer later, we support both
1026        * possibilities. */
1027       if (!g_queue_is_empty (priv->queue))
1028         continue;
1029
1030       /* no buffer yet, maybe we are EOS, if not, block for more data. */
1031     }
1032
1033     /* check EOS */
1034     if (G_UNLIKELY (priv->is_eos))
1035       goto eos;
1036
1037     /* nothing to return, wait a while for new data or flushing. */
1038     g_cond_wait (priv->cond, priv->mutex);
1039   }
1040   g_mutex_unlock (priv->mutex);
1041   if (caps)
1042     gst_caps_unref (caps);
1043   return ret;
1044
1045   /* ERRORS */
1046 flushing:
1047   {
1048     GST_DEBUG_OBJECT (appsrc, "we are flushing");
1049     g_mutex_unlock (priv->mutex);
1050     if (caps)
1051       gst_caps_unref (caps);
1052     return GST_FLOW_WRONG_STATE;
1053   }
1054 eos:
1055   {
1056     GST_DEBUG_OBJECT (appsrc, "we are EOS");
1057     g_mutex_unlock (priv->mutex);
1058     if (caps)
1059       gst_caps_unref (caps);
1060     return GST_FLOW_UNEXPECTED;
1061   }
1062 seek_error:
1063   {
1064     g_mutex_unlock (priv->mutex);
1065     if (caps)
1066       gst_caps_unref (caps);
1067     GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
1068         GST_ERROR_SYSTEM);
1069     return GST_FLOW_ERROR;
1070   }
1071 }
1072
1073 /* external API */
1074
1075 /**
1076  * gst_app_src_set_caps:
1077  * @appsrc: a #GstAppSrc
1078  * @caps: caps to set
1079  *
1080  * Set the capabilities on the appsrc element.  This function takes
1081  * a copy of the caps structure. After calling this method, the source will
1082  * only produce caps that match @caps. @caps must be fixed and the caps on the
1083  * buffers must match the caps or left NULL.
1084  *
1085  * Since: 0.10.22
1086  */
1087 void
1088 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
1089 {
1090   GstCaps *old;
1091   GstAppSrcPrivate *priv;
1092
1093   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1094
1095   priv = appsrc->priv;
1096
1097   GST_OBJECT_LOCK (appsrc);
1098   GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
1099   if ((old = priv->caps) != caps) {
1100     if (caps)
1101       priv->caps = gst_caps_copy (caps);
1102     else
1103       priv->caps = NULL;
1104     if (old)
1105       gst_caps_unref (old);
1106   }
1107   GST_OBJECT_UNLOCK (appsrc);
1108 }
1109
1110 /**
1111  * gst_app_src_get_caps:
1112  * @appsrc: a #GstAppSrc
1113  *
1114  * Get the configured caps on @appsrc.
1115  *
1116  * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
1117  *
1118  * Since: 0.10.22
1119  */
1120 GstCaps *
1121 gst_app_src_get_caps (GstAppSrc * appsrc)
1122 {
1123   GstCaps *caps;
1124   GstAppSrcPrivate *priv;
1125
1126   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
1127
1128   priv = appsrc->priv;
1129
1130   GST_OBJECT_LOCK (appsrc);
1131   if ((caps = priv->caps))
1132     gst_caps_ref (caps);
1133   GST_DEBUG_OBJECT (appsrc, "getting caps of %" GST_PTR_FORMAT, caps);
1134   GST_OBJECT_UNLOCK (appsrc);
1135
1136   return caps;
1137 }
1138
1139 /**
1140  * gst_app_src_set_size:
1141  * @appsrc: a #GstAppSrc
1142  * @size: the size to set
1143  *
1144  * Set the size of the stream in bytes. A value of -1 means that the size is
1145  * not known.
1146  *
1147  * Since: 0.10.22
1148  */
1149 void
1150 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
1151 {
1152   GstAppSrcPrivate *priv;
1153
1154   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1155
1156   priv = appsrc->priv;
1157
1158   GST_OBJECT_LOCK (appsrc);
1159   GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
1160   priv->size = size;
1161   GST_OBJECT_UNLOCK (appsrc);
1162 }
1163
1164 /**
1165  * gst_app_src_get_size:
1166  * @appsrc: a #GstAppSrc
1167  *
1168  * Get the size of the stream in bytes. A value of -1 means that the size is
1169  * not known.
1170  *
1171  * Returns: the size of the stream previously set with gst_app_src_set_size();
1172  *
1173  * Since: 0.10.22
1174  */
1175 gint64
1176 gst_app_src_get_size (GstAppSrc * appsrc)
1177 {
1178   gint64 size;
1179   GstAppSrcPrivate *priv;
1180
1181   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1182
1183   priv = appsrc->priv;
1184
1185   GST_OBJECT_LOCK (appsrc);
1186   size = priv->size;
1187   GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
1188   GST_OBJECT_UNLOCK (appsrc);
1189
1190   return size;
1191 }
1192
1193 /**
1194  * gst_app_src_set_stream_type:
1195  * @appsrc: a #GstAppSrc
1196  * @type: the new state
1197  *
1198  * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
1199  * be connected to.
1200  *
1201  * A stream_type stream
1202  *
1203  * Since: 0.10.22
1204  */
1205 void
1206 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
1207 {
1208   GstAppSrcPrivate *priv;
1209
1210   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1211
1212   priv = appsrc->priv;
1213
1214   GST_OBJECT_LOCK (appsrc);
1215   GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
1216   priv->stream_type = type;
1217   GST_OBJECT_UNLOCK (appsrc);
1218 }
1219
1220 /**
1221  * gst_app_src_get_stream_type:
1222  * @appsrc: a #GstAppSrc
1223  *
1224  * Get the stream type. Control the stream type of @appsrc
1225  * with gst_app_src_set_stream_type().
1226  *
1227  * Returns: the stream type.
1228  *
1229  * Since: 0.10.22
1230  */
1231 GstAppStreamType
1232 gst_app_src_get_stream_type (GstAppSrc * appsrc)
1233 {
1234   gboolean stream_type;
1235   GstAppSrcPrivate *priv;
1236
1237   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1238
1239   priv = appsrc->priv;
1240
1241   GST_OBJECT_LOCK (appsrc);
1242   stream_type = priv->stream_type;
1243   GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
1244   GST_OBJECT_UNLOCK (appsrc);
1245
1246   return stream_type;
1247 }
1248
1249 /**
1250  * gst_app_src_set_max_bytes:
1251  * @appsrc: a #GstAppSrc
1252  * @max: the maximum number of bytes to queue
1253  *
1254  * Set the maximum amount of bytes that can be queued in @appsrc.
1255  * After the maximum amount of bytes are queued, @appsrc will emit the
1256  * "enough-data" signal.
1257  *
1258  * Since: 0.10.22
1259  */
1260 void
1261 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
1262 {
1263   GstAppSrcPrivate *priv;
1264
1265   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1266
1267   priv = appsrc->priv;
1268
1269   g_mutex_lock (priv->mutex);
1270   if (max != priv->max_bytes) {
1271     GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
1272     priv->max_bytes = max;
1273     /* signal the change */
1274     g_cond_broadcast (priv->cond);
1275   }
1276   g_mutex_unlock (priv->mutex);
1277 }
1278
1279 /**
1280  * gst_app_src_get_max_bytes:
1281  * @appsrc: a #GstAppSrc
1282  *
1283  * Get the maximum amount of bytes that can be queued in @appsrc.
1284  *
1285  * Returns: The maximum amount of bytes that can be queued.
1286  *
1287  * Since: 0.10.22
1288  */
1289 guint64
1290 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
1291 {
1292   guint64 result;
1293   GstAppSrcPrivate *priv;
1294
1295   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
1296
1297   priv = appsrc->priv;
1298
1299   g_mutex_lock (priv->mutex);
1300   result = priv->max_bytes;
1301   GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
1302   g_mutex_unlock (priv->mutex);
1303
1304   return result;
1305 }
1306
1307 static void
1308 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
1309     gboolean do_max, guint64 max)
1310 {
1311   GstAppSrcPrivate *priv = appsrc->priv;
1312   gboolean changed = FALSE;
1313
1314   g_mutex_lock (priv->mutex);
1315   if (do_min && priv->min_latency != min) {
1316     priv->min_latency = min;
1317     changed = TRUE;
1318   }
1319   if (do_max && priv->max_latency != max) {
1320     priv->max_latency = max;
1321     changed = TRUE;
1322   }
1323   g_mutex_unlock (priv->mutex);
1324
1325   if (changed) {
1326     GST_DEBUG_OBJECT (appsrc, "posting latency changed");
1327     gst_element_post_message (GST_ELEMENT_CAST (appsrc),
1328         gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
1329   }
1330 }
1331
1332 /**
1333  * gst_app_src_set_latency:
1334  * @appsrc: a #GstAppSrc
1335  * @min: the min latency
1336  * @max: the min latency
1337  *
1338  * Configure the @min and @max latency in @src. If @min is set to -1, the
1339  * default latency calculations for pseudo-live sources will be used.
1340  *
1341  * Since: 0.10.22
1342  */
1343 void
1344 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
1345 {
1346   gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
1347 }
1348
1349 /**
1350  * gst_app_src_get_latency:
1351  * @appsrc: a #GstAppSrc
1352  * @min: the min latency
1353  * @max: the min latency
1354  *
1355  * Retrieve the min and max latencies in @min and @max respectively.
1356  *
1357  * Since: 0.10.22
1358  */
1359 void
1360 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
1361 {
1362   GstAppSrcPrivate *priv;
1363
1364   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1365
1366   priv = appsrc->priv;
1367
1368   g_mutex_lock (priv->mutex);
1369   if (min)
1370     *min = priv->min_latency;
1371   if (max)
1372     *max = priv->max_latency;
1373   g_mutex_unlock (priv->mutex);
1374 }
1375
1376 /**
1377  * gst_app_src_set_emit_signals:
1378  * @appsrc: a #GstAppSrc
1379  * @emit: the new state
1380  *
1381  * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
1382  * by default disabled because signal emission is expensive and unneeded when
1383  * the application prefers to operate in pull mode.
1384  *
1385  * Since: 0.10.23
1386  */
1387 void
1388 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
1389 {
1390   GstAppSrcPrivate *priv;
1391
1392   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1393
1394   priv = appsrc->priv;
1395
1396   g_mutex_lock (priv->mutex);
1397   priv->emit_signals = emit;
1398   g_mutex_unlock (priv->mutex);
1399 }
1400
1401 /**
1402  * gst_app_src_get_emit_signals:
1403  * @appsrc: a #GstAppSrc
1404  *
1405  * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
1406  *
1407  * Returns: %TRUE if @appsrc is emitting the "new-preroll" and "new-buffer"
1408  * signals.
1409  *
1410  * Since: 0.10.23
1411  */
1412 gboolean
1413 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
1414 {
1415   gboolean result;
1416   GstAppSrcPrivate *priv;
1417
1418   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1419
1420   priv = appsrc->priv;
1421
1422   g_mutex_lock (priv->mutex);
1423   result = priv->emit_signals;
1424   g_mutex_unlock (priv->mutex);
1425
1426   return result;
1427 }
1428
1429 static GstFlowReturn
1430 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
1431     gboolean steal_ref)
1432 {
1433   gboolean first = TRUE;
1434   GstAppSrcPrivate *priv;
1435
1436   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1437   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1438
1439   priv = appsrc->priv;
1440
1441   g_mutex_lock (priv->mutex);
1442
1443   while (TRUE) {
1444     /* can't accept buffers when we are flushing or EOS */
1445     if (priv->flushing)
1446       goto flushing;
1447
1448     if (priv->is_eos)
1449       goto eos;
1450
1451     if (priv->max_bytes && priv->queued_bytes >= priv->max_bytes) {
1452       GST_DEBUG_OBJECT (appsrc,
1453           "queue filled (%" G_GUINT64_FORMAT " >= %" G_GUINT64_FORMAT ")",
1454           priv->queued_bytes, priv->max_bytes);
1455
1456       if (first) {
1457         gboolean emit;
1458
1459         emit = priv->emit_signals;
1460         /* only signal on the first push */
1461         g_mutex_unlock (priv->mutex);
1462
1463         if (priv->callbacks.enough_data)
1464           priv->callbacks.enough_data (appsrc, priv->user_data);
1465         else if (emit)
1466           g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
1467               NULL);
1468
1469         g_mutex_lock (priv->mutex);
1470         /* continue to check for flushing/eos after releasing the lock */
1471         first = FALSE;
1472         continue;
1473       }
1474       if (priv->block) {
1475         GST_DEBUG_OBJECT (appsrc, "waiting for free space");
1476         /* we are filled, wait until a buffer gets popped or when we
1477          * flush. */
1478         g_cond_wait (priv->cond, priv->mutex);
1479       } else {
1480         /* no need to wait for free space, we just pump more data into the
1481          * queue hoping that the caller reacts to the enough-data signal and
1482          * stops pushing buffers. */
1483         break;
1484       }
1485     } else
1486       break;
1487   }
1488
1489   GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
1490   if (!steal_ref)
1491     gst_buffer_ref (buffer);
1492   g_queue_push_tail (priv->queue, buffer);
1493   priv->queued_bytes += GST_BUFFER_SIZE (buffer);
1494   g_cond_broadcast (priv->cond);
1495   g_mutex_unlock (priv->mutex);
1496
1497   return GST_FLOW_OK;
1498
1499   /* ERRORS */
1500 flushing:
1501   {
1502     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
1503     if (steal_ref)
1504       gst_buffer_unref (buffer);
1505     g_mutex_unlock (priv->mutex);
1506     return GST_FLOW_WRONG_STATE;
1507   }
1508 eos:
1509   {
1510     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
1511     if (steal_ref)
1512       gst_buffer_unref (buffer);
1513     g_mutex_unlock (priv->mutex);
1514     return GST_FLOW_UNEXPECTED;
1515   }
1516 }
1517
1518 /**
1519  * gst_app_src_push_buffer:
1520  * @appsrc: a #GstAppSrc
1521  * @buffer: a #GstBuffer to push
1522  *
1523  * Adds a buffer to the queue of buffers that the appsrc element will
1524  * push to its source pad.  This function takes ownership of the buffer.
1525  *
1526  * When the block property is TRUE, this function can block until free
1527  * space becomes available in the queue.
1528  *
1529  * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
1530  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
1531  * #GST_FLOW_UNEXPECTED when EOS occured.
1532  *
1533  * Since: 0.10.22
1534  */
1535 GstFlowReturn
1536 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
1537 {
1538   return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
1539 }
1540
1541 /* push a buffer without stealing the ref of the buffer. This is used for the
1542  * action signal. */
1543 static GstFlowReturn
1544 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
1545 {
1546   return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
1547 }
1548
1549 /**
1550  * gst_app_src_end_of_stream:
1551  * @appsrc: a #GstAppSrc
1552  *
1553  * Indicates to the appsrc element that the last buffer queued in the
1554  * element is the last buffer of the stream.
1555  *
1556  * Returns: #GST_FLOW_OK when the EOS was successfuly queued.
1557  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
1558  *
1559  * Since: 0.10.22
1560  */
1561 GstFlowReturn
1562 gst_app_src_end_of_stream (GstAppSrc * appsrc)
1563 {
1564   GstAppSrcPrivate *priv;
1565
1566   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1567
1568   priv = appsrc->priv;
1569
1570   g_mutex_lock (priv->mutex);
1571   /* can't accept buffers when we are flushing. We can accept them when we are
1572    * EOS although it will not do anything. */
1573   if (priv->flushing)
1574     goto flushing;
1575
1576   GST_DEBUG_OBJECT (appsrc, "sending EOS");
1577   priv->is_eos = TRUE;
1578   g_cond_broadcast (priv->cond);
1579   g_mutex_unlock (priv->mutex);
1580
1581   return GST_FLOW_OK;
1582
1583   /* ERRORS */
1584 flushing:
1585   {
1586     g_mutex_unlock (priv->mutex);
1587     GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
1588     return GST_FLOW_WRONG_STATE;
1589   }
1590 }
1591
1592 /**
1593  * gst_app_src_set_callbacks:
1594  * @appsrc: a #GstAppSrc
1595  * @callbacks: the callbacks
1596  * @user_data: a user_data argument for the callbacks
1597  * @notify: a destroy notify function
1598  *
1599  * Set callbacks which will be executed when data is needed, enough data has
1600  * been collected or when a seek should be performed.
1601  * This is an alternative to using the signals, it has lower overhead and is thus
1602  * less expensive, but also less flexible.
1603  *
1604  * If callbacks are installed, no signals will be emitted for performance
1605  * reasons.
1606  *
1607  * Since: 0.10.23
1608  */
1609 void
1610 gst_app_src_set_callbacks (GstAppSrc * appsrc,
1611     GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
1612 {
1613   GDestroyNotify old_notify;
1614   GstAppSrcPrivate *priv;
1615
1616   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1617   g_return_if_fail (callbacks != NULL);
1618
1619   priv = appsrc->priv;
1620
1621   GST_OBJECT_LOCK (appsrc);
1622   old_notify = priv->notify;
1623
1624   if (old_notify) {
1625     gpointer old_data;
1626
1627     old_data = priv->user_data;
1628
1629     priv->user_data = NULL;
1630     priv->notify = NULL;
1631     GST_OBJECT_UNLOCK (appsrc);
1632
1633     old_notify (old_data);
1634
1635     GST_OBJECT_LOCK (appsrc);
1636   }
1637   priv->callbacks = *callbacks;
1638   priv->user_data = user_data;
1639   priv->notify = notify;
1640   GST_OBJECT_UNLOCK (appsrc);
1641 }
1642
1643 static gboolean
1644 gst_app_src_send_event (GstElement * element, GstEvent * event)
1645 {
1646   GstAppSrc *appsrc;
1647   gboolean result = FALSE;
1648   GstBaseSrc *src;
1649
1650   src = GST_BASE_SRC (element);
1651   appsrc = GST_APP_SRC (element);
1652   GstAppSrcPrivate *priv = appsrc->priv;
1653
1654   GST_DEBUG_OBJECT (appsrc, "handling event %p %" GST_PTR_FORMAT, event, event);
1655
1656   switch (GST_EVENT_TYPE (event)) {
1657       /* bidirectional events */
1658     case GST_EVENT_FLUSH_STOP:
1659       g_mutex_lock (priv->mutex);
1660       GST_INFO_OBJECT (appsrc, "flushing internal queue");
1661       gst_app_src_flush_queued (appsrc);
1662       priv->flushing = FALSE;
1663       g_mutex_unlock (priv->mutex);
1664       result = gst_pad_push_event (src ->srcpad, event);
1665       break;
1666       /* fallback */
1667     case GST_EVENT_FLUSH_START:
1668       g_mutex_lock (priv->mutex);
1669       priv->flushing = TRUE;
1670       g_mutex_unlock (priv->mutex);
1671       result = gst_pad_push_event (src ->srcpad, event);
1672       break;
1673     case GST_EVENT_NEWSEGMENT:
1674       result = gst_pad_push_event (src ->srcpad, event);
1675       break;
1676     default:
1677       result = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
1678       break;
1679   }
1680
1681   return result;
1682 }
1683
1684 /*** GSTURIHANDLER INTERFACE *************************************************/
1685
1686 static GstURIType
1687 gst_app_src_uri_get_type (void)
1688 {
1689   return GST_URI_SRC;
1690 }
1691
1692 static gchar **
1693 gst_app_src_uri_get_protocols (void)
1694 {
1695   static gchar *protocols[] = { (char *) "appsrc", NULL };
1696
1697   return protocols;
1698 }
1699
1700 static const gchar *
1701 gst_app_src_uri_get_uri (GstURIHandler * handler)
1702 {
1703   return "appsrc";
1704 }
1705
1706 static gboolean
1707 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
1708 {
1709   gchar *protocol;
1710   gboolean ret;
1711
1712   protocol = gst_uri_get_protocol (uri);
1713   ret = !strcmp (protocol, "appsrc");
1714   g_free (protocol);
1715
1716   return ret;
1717 }
1718
1719 static void
1720 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1721 {
1722   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1723
1724   iface->get_type = gst_app_src_uri_get_type;
1725   iface->get_protocols = gst_app_src_uri_get_protocols;
1726   iface->get_uri = gst_app_src_uri_get_uri;
1727   iface->set_uri = gst_app_src_uri_set_uri;
1728 }