appsrc: fix deadlock setting pipeline in NULL state with block=true
[platform/upstream/gstreamer.git] / gst-libs / gst / app / gstappsrc.c
1 /* GStreamer
2  * Copyright (C) 2007 David Schleef <ds@schleef.org>
3  *           (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 /**
21  * SECTION:gstappsrc
22  * @short_description: Easy way for applications to inject buffers into a
23  *     pipeline
24  * @see_also: #GstBaseSrc, appsink
25  *
26  * The appsrc element can be used by applications to insert data into a
27  * GStreamer pipeline. Unlike most GStreamer elements, Appsrc provides
28  * external API functions.
29  *
30  * appsrc can be used by linking with the libgstapp library to access the
31  * methods directly or by using the appsrc action signals.
32  *
33  * Before operating appsrc, the caps property must be set to a fixed caps
34  * describing the format of the data that will be pushed with appsrc. An
35  * exception to this is when pushing buffers with unknown caps, in which case no
36  * caps should be set. This is typically true of file-like sources that push raw
37  * byte buffers.
38  *
39  * The main way of handing data to the appsrc element is by calling the
40  * gst_app_src_push_buffer() method or by emitting the push-buffer action signal.
41  * This will put the buffer onto a queue from which appsrc will read from in its
42  * streaming thread. It is important to note that data transport will not happen
43  * from the thread that performed the push-buffer call.
44  *
45  * The "max-bytes" property controls how much data can be queued in appsrc
46  * before appsrc considers the queue full. A filled internal queue will always
47  * signal the "enough-data" signal, which signals the application that it should
48  * stop pushing data into appsrc. The "block" property will cause appsrc to
49  * block the push-buffer method until free data becomes available again.
50  *
51  * When the internal queue is running out of data, the "need-data" signal is
52  * emitted, which signals the application that it should start pushing more data
53  * into appsrc.
54  *
55  * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
56  * "seek-data" signal when the "stream-mode" property is set to "seekable" or
57  * "random-access". The signal argument will contain the new desired position in
58  * the stream expressed in the unit set with the "format" property. After
59  * receiving the seek-data signal, the application should push-buffers from the
60  * new position.
61  *
62  * These signals allow the application to operate the appsrc in two different
63  * ways:
64  *
65  * The push model, in which the application repeatedly calls the push-buffer method
66  * with a new buffer. Optionally, the queue size in the appsrc can be controlled
67  * with the enough-data and need-data signals by respectively stopping/starting
68  * the push-buffer calls. This is a typical mode of operation for the
69  * stream-type "stream" and "seekable". Use this model when implementing various
70  * network protocols or hardware devices.
71  *
72  * The pull model where the need-data signal triggers the next push-buffer call.
73  * This mode is typically used in the "random-access" stream-type. Use this
74  * model for file access or other randomly accessable sources. In this mode, a
75  * buffer of exactly the amount of bytes given by the need-data signal should be
76  * pushed into appsrc.
77  *
78  * In all modes, the size property on appsrc should contain the total stream
79  * size in bytes. Setting this property is mandatory in the random-access mode.
80  * For the stream and seekable modes, setting this property is optional but
81  * recommended.
82  *
83  * When the application is finished pushing data into appsrc, it should call
84  * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
85  * this call, no more buffers can be pushed into appsrc until a flushing seek
86  * happened or the state of the appsrc has gone through READY.
87  *
88  * Last reviewed on 2008-12-17 (0.10.10)
89  */
90
91 #ifdef HAVE_CONFIG_H
92 #include "config.h"
93 #endif
94
95 #include <gst/gst.h>
96 #include <gst/base/gstbasesrc.h>
97
98 #include <string.h>
99
100 #include "gstapp-marshal.h"
101 #include "gstappsrc.h"
102
103 struct _GstAppSrcPrivate
104 {
105   GCond cond;
106   GMutex mutex;
107   GQueue *queue;
108
109   GstCaps *caps;
110   gint64 size;
111   GstAppStreamType stream_type;
112   guint64 max_bytes;
113   GstFormat format;
114   gboolean block;
115   gchar *uri;
116   gboolean new_caps;
117
118   gboolean flushing;
119   gboolean started;
120   gboolean is_eos;
121   guint64 queued_bytes;
122   guint64 offset;
123   GstAppStreamType current_type;
124
125   guint64 min_latency;
126   guint64 max_latency;
127   gboolean emit_signals;
128   guint min_percent;
129
130   GstAppSrcCallbacks callbacks;
131   gpointer user_data;
132   GDestroyNotify notify;
133 };
134
135 GST_DEBUG_CATEGORY_STATIC (app_src_debug);
136 #define GST_CAT_DEFAULT app_src_debug
137
138 enum
139 {
140   /* signals */
141   SIGNAL_NEED_DATA,
142   SIGNAL_ENOUGH_DATA,
143   SIGNAL_SEEK_DATA,
144
145   /* actions */
146   SIGNAL_PUSH_BUFFER,
147   SIGNAL_END_OF_STREAM,
148
149   LAST_SIGNAL
150 };
151
152 #define DEFAULT_PROP_SIZE          -1
153 #define DEFAULT_PROP_STREAM_TYPE   GST_APP_STREAM_TYPE_STREAM
154 #define DEFAULT_PROP_MAX_BYTES     200000
155 #define DEFAULT_PROP_FORMAT        GST_FORMAT_BYTES
156 #define DEFAULT_PROP_BLOCK         FALSE
157 #define DEFAULT_PROP_IS_LIVE       FALSE
158 #define DEFAULT_PROP_MIN_LATENCY   -1
159 #define DEFAULT_PROP_MAX_LATENCY   -1
160 #define DEFAULT_PROP_EMIT_SIGNALS  TRUE
161 #define DEFAULT_PROP_MIN_PERCENT   0
162
163 enum
164 {
165   PROP_0,
166   PROP_CAPS,
167   PROP_SIZE,
168   PROP_STREAM_TYPE,
169   PROP_MAX_BYTES,
170   PROP_FORMAT,
171   PROP_BLOCK,
172   PROP_IS_LIVE,
173   PROP_MIN_LATENCY,
174   PROP_MAX_LATENCY,
175   PROP_EMIT_SIGNALS,
176   PROP_MIN_PERCENT,
177   PROP_LAST
178 };
179
180 static GstStaticPadTemplate gst_app_src_template =
181 GST_STATIC_PAD_TEMPLATE ("src",
182     GST_PAD_SRC,
183     GST_PAD_ALWAYS,
184     GST_STATIC_CAPS_ANY);
185
186 GType
187 gst_app_stream_type_get_type (void)
188 {
189   static volatile gsize stream_type_type = 0;
190   static const GEnumValue stream_type[] = {
191     {GST_APP_STREAM_TYPE_STREAM, "GST_APP_STREAM_TYPE_STREAM", "stream"},
192     {GST_APP_STREAM_TYPE_SEEKABLE, "GST_APP_STREAM_TYPE_SEEKABLE", "seekable"},
193     {GST_APP_STREAM_TYPE_RANDOM_ACCESS, "GST_APP_STREAM_TYPE_RANDOM_ACCESS",
194         "random-access"},
195     {0, NULL, NULL}
196   };
197
198   if (g_once_init_enter (&stream_type_type)) {
199     GType tmp = g_enum_register_static ("GstAppStreamType", stream_type);
200     g_once_init_leave (&stream_type_type, tmp);
201   }
202
203   return (GType) stream_type_type;
204 }
205
206 static void gst_app_src_uri_handler_init (gpointer g_iface,
207     gpointer iface_data);
208
209 static void gst_app_src_dispose (GObject * object);
210 static void gst_app_src_finalize (GObject * object);
211
212 static void gst_app_src_set_property (GObject * object, guint prop_id,
213     const GValue * value, GParamSpec * pspec);
214 static void gst_app_src_get_property (GObject * object, guint prop_id,
215     GValue * value, GParamSpec * pspec);
216
217 static void gst_app_src_set_latencies (GstAppSrc * appsrc,
218     gboolean do_min, guint64 min, gboolean do_max, guint64 max);
219
220 static gboolean gst_app_src_negotiate (GstBaseSrc * basesrc);
221 static GstCaps *gst_app_src_internal_get_caps (GstBaseSrc * bsrc,
222     GstCaps * filter);
223 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc, guint64 offset,
224     guint size, GstBuffer ** buf);
225 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
226 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
227 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
228 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
229 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
230 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
231 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
232 static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
233
234 static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
235     GstBuffer * buffer);
236
237 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
238
239 #define gst_app_src_parent_class parent_class
240 G_DEFINE_TYPE_WITH_CODE (GstAppSrc, gst_app_src, GST_TYPE_BASE_SRC,
241     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_app_src_uri_handler_init));
242
243 static void
244 gst_app_src_class_init (GstAppSrcClass * klass)
245 {
246   GObjectClass *gobject_class = (GObjectClass *) klass;
247   GstElementClass *element_class = (GstElementClass *) klass;
248   GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
249
250   GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
251
252   gobject_class->dispose = gst_app_src_dispose;
253   gobject_class->finalize = gst_app_src_finalize;
254
255   gobject_class->set_property = gst_app_src_set_property;
256   gobject_class->get_property = gst_app_src_get_property;
257
258   /**
259    * GstAppSrc::caps:
260    *
261    * The GstCaps that will negotiated downstream and will be put
262    * on outgoing buffers.
263    */
264   g_object_class_install_property (gobject_class, PROP_CAPS,
265       g_param_spec_boxed ("caps", "Caps",
266           "The allowed caps for the src pad", GST_TYPE_CAPS,
267           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
268   /**
269    * GstAppSrc::format:
270    *
271    * The format to use for segment events. When the source is producing
272    * timestamped buffers this property should be set to GST_FORMAT_TIME.
273    */
274   g_object_class_install_property (gobject_class, PROP_FORMAT,
275       g_param_spec_enum ("format", "Format",
276           "The format of the segment events and seek", GST_TYPE_FORMAT,
277           DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
278   /**
279    * GstAppSrc::size:
280    *
281    * The total size in bytes of the data stream. If the total size is known, it
282    * is recommended to configure it with this property.
283    */
284   g_object_class_install_property (gobject_class, PROP_SIZE,
285       g_param_spec_int64 ("size", "Size",
286           "The size of the data stream in bytes (-1 if unknown)",
287           -1, G_MAXINT64, DEFAULT_PROP_SIZE,
288           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
289   /**
290    * GstAppSrc::stream-type:
291    *
292    * The type of stream that this source is producing.  For seekable streams the
293    * application should connect to the seek-data signal.
294    */
295   g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
296       g_param_spec_enum ("stream-type", "Stream Type",
297           "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
298           DEFAULT_PROP_STREAM_TYPE,
299           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
300   /**
301    * GstAppSrc::max-bytes:
302    *
303    * The maximum amount of bytes that can be queued internally.
304    * After the maximum amount of bytes are queued, appsrc will emit the
305    * "enough-data" signal.
306    */
307   g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
308       g_param_spec_uint64 ("max-bytes", "Max bytes",
309           "The maximum number of bytes to queue internally (0 = unlimited)",
310           0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
311           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
312   /**
313    * GstAppSrc::block:
314    *
315    * When max-bytes are queued and after the enough-data signal has been emitted,
316    * block any further push-buffer calls until the amount of queued bytes drops
317    * below the max-bytes limit.
318    */
319   g_object_class_install_property (gobject_class, PROP_BLOCK,
320       g_param_spec_boolean ("block", "Block",
321           "Block push-buffer when max-bytes are queued",
322           DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
323
324   /**
325    * GstAppSrc::is-live:
326    *
327    * Instruct the source to behave like a live source. This includes that it
328    * will only push out buffers in the PLAYING state.
329    */
330   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
331       g_param_spec_boolean ("is-live", "Is Live",
332           "Whether to act as a live source",
333           DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
334   /**
335    * GstAppSrc::min-latency:
336    *
337    * The minimum latency of the source. A value of -1 will use the default
338    * latency calculations of #GstBaseSrc.
339    */
340   g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
341       g_param_spec_int64 ("min-latency", "Min Latency",
342           "The minimum latency (-1 = default)",
343           -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
344           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
345   /**
346    * GstAppSrc::max-latency:
347    *
348    * The maximum latency of the source. A value of -1 means an unlimited amout
349    * of latency.
350    */
351   g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
352       g_param_spec_int64 ("max-latency", "Max Latency",
353           "The maximum latency (-1 = unlimited)",
354           -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
355           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356
357   /**
358    * GstAppSrc::emit-signals:
359    *
360    * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
361    * This option is by default enabled for backwards compatibility reasons but
362    * can disabled when needed because signal emission is expensive.
363    */
364   g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
365       g_param_spec_boolean ("emit-signals", "Emit signals",
366           "Emit need-data, enough-data and seek-data signals",
367           DEFAULT_PROP_EMIT_SIGNALS,
368           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
369
370   /**
371    * GstAppSrc::empty-percent:
372    *
373    * Make appsrc emit the "need-data" signal when the amount of bytes in the
374    * queue drops below this percentage of max-bytes.
375    */
376   g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
377       g_param_spec_uint ("min-percent", "Min Percent",
378           "Emit need-data when queued bytes drops below this percent of max-bytes",
379           0, 100, DEFAULT_PROP_MIN_PERCENT,
380           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
381
382   /**
383    * GstAppSrc::need-data:
384    * @appsrc: the appsrc element that emitted the signal
385    * @length: the amount of bytes needed.
386    *
387    * Signal that the source needs more data. In the callback or from another
388    * thread you should call push-buffer or end-of-stream.
389    *
390    * @length is just a hint and when it is set to -1, any number of bytes can be
391    * pushed into @appsrc.
392    *
393    * You can call push-buffer multiple times until the enough-data signal is
394    * fired.
395    */
396   gst_app_src_signals[SIGNAL_NEED_DATA] =
397       g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
398       G_STRUCT_OFFSET (GstAppSrcClass, need_data),
399       NULL, NULL, __gst_app_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
400
401   /**
402    * GstAppSrc::enough-data:
403    * @appsrc: the appsrc element that emitted the signal
404    *
405    * Signal that the source has enough data. It is recommended that the
406    * application stops calling push-buffer until the need-data signal is
407    * emitted again to avoid excessive buffer queueing.
408    */
409   gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
410       g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
411       G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
412       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
413
414   /**
415    * GstAppSrc::seek-data:
416    * @appsrc: the appsrc element that emitted the signal
417    * @offset: the offset to seek to
418    *
419    * Seek to the given offset. The next push-buffer should produce buffers from
420    * the new @offset.
421    * This callback is only called for seekable stream types.
422    *
423    * Returns: %TRUE if the seek succeeded.
424    */
425   gst_app_src_signals[SIGNAL_SEEK_DATA] =
426       g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
427       G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
428       NULL, NULL, __gst_app_marshal_BOOLEAN__UINT64, G_TYPE_BOOLEAN, 1,
429       G_TYPE_UINT64);
430
431    /**
432     * GstAppSrc::push-buffer:
433     * @appsrc: the appsrc
434     * @buffer: a buffer to push
435     *
436     * Adds a buffer to the queue of buffers that the appsrc element will
437     * push to its source pad. This function does not take ownership of the
438     * buffer so the buffer needs to be unreffed after calling this function.
439     *
440     * When the block property is TRUE, this function can block until free space
441     * becomes available in the queue.
442     */
443   gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
444       g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
445       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
446           push_buffer), NULL, NULL, __gst_app_marshal_ENUM__BOXED,
447       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
448
449    /**
450     * GstAppSrc::end-of-stream:
451     * @appsrc: the appsrc
452     *
453     * Notify @appsrc that no more buffer are available.
454     */
455   gst_app_src_signals[SIGNAL_END_OF_STREAM] =
456       g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
457       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
458           end_of_stream), NULL, NULL, __gst_app_marshal_ENUM__VOID,
459       GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
460
461   gst_element_class_set_static_metadata (element_class, "AppSrc",
462       "Generic/Source", "Allow the application to feed buffers to a pipeline",
463       "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
464
465   gst_element_class_add_pad_template (element_class,
466       gst_static_pad_template_get (&gst_app_src_template));
467
468   basesrc_class->negotiate = gst_app_src_negotiate;
469   basesrc_class->get_caps = gst_app_src_internal_get_caps;
470   basesrc_class->create = gst_app_src_create;
471   basesrc_class->start = gst_app_src_start;
472   basesrc_class->stop = gst_app_src_stop;
473   basesrc_class->unlock = gst_app_src_unlock;
474   basesrc_class->unlock_stop = gst_app_src_unlock_stop;
475   basesrc_class->do_seek = gst_app_src_do_seek;
476   basesrc_class->is_seekable = gst_app_src_is_seekable;
477   basesrc_class->get_size = gst_app_src_do_get_size;
478   basesrc_class->get_size = gst_app_src_do_get_size;
479   basesrc_class->query = gst_app_src_query;
480
481   klass->push_buffer = gst_app_src_push_buffer_action;
482   klass->end_of_stream = gst_app_src_end_of_stream;
483
484   g_type_class_add_private (klass, sizeof (GstAppSrcPrivate));
485 }
486
487 static void
488 gst_app_src_init (GstAppSrc * appsrc)
489 {
490   GstAppSrcPrivate *priv;
491
492   priv = appsrc->priv = G_TYPE_INSTANCE_GET_PRIVATE (appsrc, GST_TYPE_APP_SRC,
493       GstAppSrcPrivate);
494
495   g_mutex_init (&priv->mutex);
496   g_cond_init (&priv->cond);
497   priv->queue = g_queue_new ();
498
499   priv->size = DEFAULT_PROP_SIZE;
500   priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
501   priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
502   priv->format = DEFAULT_PROP_FORMAT;
503   priv->block = DEFAULT_PROP_BLOCK;
504   priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
505   priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
506   priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
507   priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
508
509   gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
510 }
511
512 static void
513 gst_app_src_flush_queued (GstAppSrc * src)
514 {
515   GstBuffer *buf;
516   GstAppSrcPrivate *priv = src->priv;
517
518   while ((buf = g_queue_pop_head (priv->queue)))
519     gst_buffer_unref (buf);
520   priv->queued_bytes = 0;
521 }
522
523 static void
524 gst_app_src_dispose (GObject * obj)
525 {
526   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
527   GstAppSrcPrivate *priv = appsrc->priv;
528
529   if (priv->caps) {
530     gst_caps_unref (priv->caps);
531     priv->caps = NULL;
532   }
533   gst_app_src_flush_queued (appsrc);
534
535   G_OBJECT_CLASS (parent_class)->dispose (obj);
536 }
537
538 static void
539 gst_app_src_finalize (GObject * obj)
540 {
541   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
542   GstAppSrcPrivate *priv = appsrc->priv;
543
544   g_mutex_clear (&priv->mutex);
545   g_cond_clear (&priv->cond);
546   g_queue_free (priv->queue);
547
548   g_free (priv->uri);
549
550   G_OBJECT_CLASS (parent_class)->finalize (obj);
551 }
552
553 static GstCaps *
554 gst_app_src_internal_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
555 {
556   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
557   GstCaps *caps;
558
559   GST_OBJECT_LOCK (appsrc);
560   if ((caps = appsrc->priv->caps))
561     gst_caps_ref (caps);
562   GST_OBJECT_UNLOCK (appsrc);
563
564   if (filter) {
565     if (caps) {
566       GstCaps *intersection =
567           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
568       gst_caps_unref (caps);
569       caps = intersection;
570     } else {
571       caps = gst_caps_ref (filter);
572     }
573   }
574
575   GST_DEBUG_OBJECT (appsrc, "caps: %" GST_PTR_FORMAT, caps);
576   return caps;
577 }
578
579 static void
580 gst_app_src_set_property (GObject * object, guint prop_id,
581     const GValue * value, GParamSpec * pspec)
582 {
583   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
584   GstAppSrcPrivate *priv = appsrc->priv;
585
586   switch (prop_id) {
587     case PROP_CAPS:
588       gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
589       break;
590     case PROP_SIZE:
591       gst_app_src_set_size (appsrc, g_value_get_int64 (value));
592       break;
593     case PROP_STREAM_TYPE:
594       gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
595       break;
596     case PROP_MAX_BYTES:
597       gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
598       break;
599     case PROP_FORMAT:
600       priv->format = g_value_get_enum (value);
601       break;
602     case PROP_BLOCK:
603       priv->block = g_value_get_boolean (value);
604       break;
605     case PROP_IS_LIVE:
606       gst_base_src_set_live (GST_BASE_SRC (appsrc),
607           g_value_get_boolean (value));
608       break;
609     case PROP_MIN_LATENCY:
610       gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
611           FALSE, -1);
612       break;
613     case PROP_MAX_LATENCY:
614       gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
615           g_value_get_int64 (value));
616       break;
617     case PROP_EMIT_SIGNALS:
618       gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
619       break;
620     case PROP_MIN_PERCENT:
621       priv->min_percent = g_value_get_uint (value);
622       break;
623     default:
624       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
625       break;
626   }
627 }
628
629 static void
630 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
631     GParamSpec * pspec)
632 {
633   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
634   GstAppSrcPrivate *priv = appsrc->priv;
635
636   switch (prop_id) {
637     case PROP_CAPS:
638       g_value_take_boxed (value, gst_app_src_get_caps (appsrc));
639       break;
640     case PROP_SIZE:
641       g_value_set_int64 (value, gst_app_src_get_size (appsrc));
642       break;
643     case PROP_STREAM_TYPE:
644       g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
645       break;
646     case PROP_MAX_BYTES:
647       g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
648       break;
649     case PROP_FORMAT:
650       g_value_set_enum (value, priv->format);
651       break;
652     case PROP_BLOCK:
653       g_value_set_boolean (value, priv->block);
654       break;
655     case PROP_IS_LIVE:
656       g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
657       break;
658     case PROP_MIN_LATENCY:
659     {
660       guint64 min;
661
662       gst_app_src_get_latency (appsrc, &min, NULL);
663       g_value_set_int64 (value, min);
664       break;
665     }
666     case PROP_MAX_LATENCY:
667     {
668       guint64 max;
669
670       gst_app_src_get_latency (appsrc, NULL, &max);
671       g_value_set_int64 (value, max);
672       break;
673     }
674     case PROP_EMIT_SIGNALS:
675       g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
676       break;
677     case PROP_MIN_PERCENT:
678       g_value_set_uint (value, priv->min_percent);
679       break;
680     default:
681       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
682       break;
683   }
684 }
685
686 static gboolean
687 gst_app_src_unlock (GstBaseSrc * bsrc)
688 {
689   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
690   GstAppSrcPrivate *priv = appsrc->priv;
691
692   g_mutex_lock (&priv->mutex);
693   GST_DEBUG_OBJECT (appsrc, "unlock start");
694   priv->flushing = TRUE;
695   g_cond_broadcast (&priv->cond);
696   g_mutex_unlock (&priv->mutex);
697
698   return TRUE;
699 }
700
701 static gboolean
702 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
703 {
704   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
705   GstAppSrcPrivate *priv = appsrc->priv;
706
707   g_mutex_lock (&priv->mutex);
708   GST_DEBUG_OBJECT (appsrc, "unlock stop");
709   priv->flushing = FALSE;
710   g_cond_broadcast (&priv->cond);
711   g_mutex_unlock (&priv->mutex);
712
713   return TRUE;
714 }
715
716 static gboolean
717 gst_app_src_start (GstBaseSrc * bsrc)
718 {
719   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
720   GstAppSrcPrivate *priv = appsrc->priv;
721
722   g_mutex_lock (&priv->mutex);
723   GST_DEBUG_OBJECT (appsrc, "starting");
724   priv->new_caps = FALSE;
725   priv->started = TRUE;
726   /* set the offset to -1 so that we always do a first seek. This is only used
727    * in random-access mode. */
728   priv->offset = -1;
729   priv->flushing = FALSE;
730   g_mutex_unlock (&priv->mutex);
731
732   gst_base_src_set_format (bsrc, priv->format);
733
734   return TRUE;
735 }
736
737 static gboolean
738 gst_app_src_stop (GstBaseSrc * bsrc)
739 {
740   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
741   GstAppSrcPrivate *priv = appsrc->priv;
742
743   g_mutex_lock (&priv->mutex);
744   GST_DEBUG_OBJECT (appsrc, "stopping");
745   priv->is_eos = FALSE;
746   priv->flushing = TRUE;
747   priv->started = FALSE;
748   gst_app_src_flush_queued (appsrc);
749   g_cond_broadcast (&priv->cond);
750   g_mutex_unlock (&priv->mutex);
751
752   return TRUE;
753 }
754
755 static gboolean
756 gst_app_src_is_seekable (GstBaseSrc * src)
757 {
758   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
759   GstAppSrcPrivate *priv = appsrc->priv;
760   gboolean res = FALSE;
761
762   switch (priv->stream_type) {
763     case GST_APP_STREAM_TYPE_STREAM:
764       break;
765     case GST_APP_STREAM_TYPE_SEEKABLE:
766     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
767       res = TRUE;
768       break;
769   }
770   return res;
771 }
772
773 static gboolean
774 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
775 {
776   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
777
778   *size = gst_app_src_get_size (appsrc);
779
780   return TRUE;
781 }
782
783 static gboolean
784 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
785 {
786   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
787   GstAppSrcPrivate *priv = appsrc->priv;
788   gboolean res;
789
790   switch (GST_QUERY_TYPE (query)) {
791     case GST_QUERY_LATENCY:
792     {
793       GstClockTime min, max;
794       gboolean live;
795
796       /* Query the parent class for the defaults */
797       res = gst_base_src_query_latency (src, &live, &min, &max);
798
799       /* overwrite with our values when we need to */
800       g_mutex_lock (&priv->mutex);
801       if (priv->min_latency != -1)
802         min = priv->min_latency;
803       if (priv->max_latency != -1)
804         max = priv->max_latency;
805       g_mutex_unlock (&priv->mutex);
806
807       gst_query_set_latency (query, live, min, max);
808       break;
809     }
810     case GST_QUERY_SCHEDULING:
811     {
812       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
813       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
814
815       switch (priv->stream_type) {
816         case GST_APP_STREAM_TYPE_STREAM:
817         case GST_APP_STREAM_TYPE_SEEKABLE:
818           break;
819         case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
820           gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
821           break;
822       }
823       res = TRUE;
824       break;
825     }
826     default:
827       res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
828       break;
829   }
830
831   return res;
832 }
833
834 /* will be called in push mode */
835 static gboolean
836 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
837 {
838   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
839   GstAppSrcPrivate *priv = appsrc->priv;
840   gint64 desired_position;
841   gboolean res = FALSE;
842
843   desired_position = segment->position;
844
845   GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
846       desired_position, gst_format_get_name (segment->format));
847
848   /* no need to try to seek in streaming mode */
849   if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
850     return TRUE;
851
852   if (priv->callbacks.seek_data)
853     res = priv->callbacks.seek_data (appsrc, desired_position, priv->user_data);
854   else {
855     gboolean emit;
856
857     g_mutex_lock (&priv->mutex);
858     emit = priv->emit_signals;
859     g_mutex_unlock (&priv->mutex);
860
861     if (emit)
862       g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
863           desired_position, &res);
864   }
865
866   if (res) {
867     GST_DEBUG_OBJECT (appsrc, "flushing queue");
868     gst_app_src_flush_queued (appsrc);
869     priv->is_eos = FALSE;
870   } else {
871     GST_WARNING_OBJECT (appsrc, "seek failed");
872   }
873
874   return res;
875 }
876
877 /* must be called with the appsrc mutex */
878 static gboolean
879 gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
880 {
881   gboolean res = FALSE;
882   gboolean emit;
883   GstAppSrcPrivate *priv = appsrc->priv;
884
885   emit = priv->emit_signals;
886   g_mutex_unlock (&priv->mutex);
887
888   GST_DEBUG_OBJECT (appsrc,
889       "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
890       priv->offset, offset);
891
892   if (priv->callbacks.seek_data)
893     res = priv->callbacks.seek_data (appsrc, offset, priv->user_data);
894   else if (emit)
895     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
896         offset, &res);
897
898   g_mutex_lock (&priv->mutex);
899
900   return res;
901 }
902
903 /* must be called with the appsrc mutex. After this call things can be
904  * flushing */
905 static void
906 gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
907 {
908   gboolean emit;
909   GstAppSrcPrivate *priv = appsrc->priv;
910
911   emit = priv->emit_signals;
912   g_mutex_unlock (&priv->mutex);
913
914   /* we have no data, we need some. We fire the signal with the size hint. */
915   if (priv->callbacks.need_data)
916     priv->callbacks.need_data (appsrc, size, priv->user_data);
917   else if (emit)
918     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
919         NULL);
920
921   g_mutex_lock (&priv->mutex);
922   /* we can be flushing now because we released the lock */
923 }
924
925 static gboolean
926 gst_app_src_do_negotiate (GstBaseSrc * basesrc)
927 {
928   GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
929   GstAppSrcPrivate *priv = appsrc->priv;
930   gboolean result;
931   GstCaps *caps;
932
933   GST_OBJECT_LOCK (basesrc);
934   caps = priv->caps ? gst_caps_ref (priv->caps) : NULL;
935   GST_OBJECT_UNLOCK (basesrc);
936
937   if (caps) {
938     result = gst_base_src_set_caps (basesrc, caps);
939     gst_caps_unref (caps);
940   } else {
941     result = GST_BASE_SRC_CLASS (parent_class)->negotiate (basesrc);
942   }
943
944   return result;
945 }
946
947 static gboolean
948 gst_app_src_negotiate (GstBaseSrc * basesrc)
949 {
950   GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
951   GstAppSrcPrivate *priv = appsrc->priv;
952   gboolean result;
953
954   g_mutex_lock (&priv->mutex);
955   result = gst_app_src_do_negotiate (basesrc);
956   priv->new_caps = FALSE;
957   g_mutex_unlock (&priv->mutex);
958   return result;
959 }
960
961 static GstFlowReturn
962 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
963     GstBuffer ** buf)
964 {
965   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
966   GstAppSrcPrivate *priv = appsrc->priv;
967   GstFlowReturn ret;
968
969   GST_OBJECT_LOCK (appsrc);
970   if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
971           bsrc->segment.format == GST_FORMAT_BYTES)) {
972     GST_DEBUG_OBJECT (appsrc,
973         "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
974         bsrc->segment.duration, priv->size);
975     bsrc->segment.duration = priv->size;
976     GST_OBJECT_UNLOCK (appsrc);
977
978     gst_element_post_message (GST_ELEMENT (appsrc),
979         gst_message_new_duration_changed (GST_OBJECT (appsrc)));
980   } else {
981     GST_OBJECT_UNLOCK (appsrc);
982   }
983
984   g_mutex_lock (&priv->mutex);
985   /* check flushing first */
986   if (G_UNLIKELY (priv->flushing))
987     goto flushing;
988
989   if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
990     /* if we are dealing with a random-access stream, issue a seek if the offset
991      * changed. */
992     if (G_UNLIKELY (priv->offset != offset)) {
993       gboolean res;
994
995       /* do the seek */
996       res = gst_app_src_emit_seek (appsrc, offset);
997
998       if (G_UNLIKELY (!res))
999         /* failing to seek is fatal */
1000         goto seek_error;
1001
1002       priv->offset = offset;
1003       priv->is_eos = FALSE;
1004     }
1005   }
1006
1007   while (TRUE) {
1008     /* return data as long as we have some */
1009     if (!g_queue_is_empty (priv->queue)) {
1010       guint buf_size;
1011
1012       if (priv->new_caps) {
1013         gst_app_src_do_negotiate (bsrc);
1014         priv->new_caps = FALSE;
1015       }
1016
1017       *buf = g_queue_pop_head (priv->queue);
1018       buf_size = gst_buffer_get_size (*buf);
1019
1020       GST_DEBUG_OBJECT (appsrc, "we have buffer %p of size %u", *buf, buf_size);
1021
1022       priv->queued_bytes -= buf_size;
1023
1024       /* only update the offset when in random_access mode */
1025       if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
1026         priv->offset += buf_size;
1027
1028       /* signal that we removed an item */
1029       g_cond_broadcast (&priv->cond);
1030
1031       /* see if we go lower than the empty-percent */
1032       if (priv->min_percent && priv->max_bytes) {
1033         if (priv->queued_bytes * 100 / priv->max_bytes <= priv->min_percent)
1034           /* ignore flushing state, we got a buffer and we will return it now.
1035            * Errors will be handled in the next round */
1036           gst_app_src_emit_need_data (appsrc, size);
1037       }
1038       ret = GST_FLOW_OK;
1039       break;
1040     } else {
1041       gst_app_src_emit_need_data (appsrc, size);
1042
1043       /* we can be flushing now because we released the lock above */
1044       if (G_UNLIKELY (priv->flushing))
1045         goto flushing;
1046
1047       /* if we have a buffer now, continue the loop and try to return it. In
1048        * random-access mode (where a buffer is normally pushed in the above
1049        * signal) we can still be empty because the pushed buffer got flushed or
1050        * when the application pushes the requested buffer later, we support both
1051        * possibilities. */
1052       if (!g_queue_is_empty (priv->queue))
1053         continue;
1054
1055       /* no buffer yet, maybe we are EOS, if not, block for more data. */
1056     }
1057
1058     /* check EOS */
1059     if (G_UNLIKELY (priv->is_eos))
1060       goto eos;
1061
1062     /* nothing to return, wait a while for new data or flushing. */
1063     g_cond_wait (&priv->cond, &priv->mutex);
1064   }
1065   g_mutex_unlock (&priv->mutex);
1066   return ret;
1067
1068   /* ERRORS */
1069 flushing:
1070   {
1071     GST_DEBUG_OBJECT (appsrc, "we are flushing");
1072     g_mutex_unlock (&priv->mutex);
1073     return GST_FLOW_FLUSHING;
1074   }
1075 eos:
1076   {
1077     GST_DEBUG_OBJECT (appsrc, "we are EOS");
1078     g_mutex_unlock (&priv->mutex);
1079     return GST_FLOW_EOS;
1080   }
1081 seek_error:
1082   {
1083     g_mutex_unlock (&priv->mutex);
1084     GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
1085         GST_ERROR_SYSTEM);
1086     return GST_FLOW_ERROR;
1087   }
1088 }
1089
1090 /* external API */
1091
1092 /**
1093  * gst_app_src_set_caps:
1094  * @appsrc: a #GstAppSrc
1095  * @caps: caps to set
1096  *
1097  * Set the capabilities on the appsrc element.  This function takes
1098  * a copy of the caps structure. After calling this method, the source will
1099  * only produce caps that match @caps. @caps must be fixed and the caps on the
1100  * buffers must match the caps or left NULL.
1101  */
1102 void
1103 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
1104 {
1105   GstCaps *old;
1106   GstAppSrcPrivate *priv;
1107
1108   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1109
1110   priv = appsrc->priv;
1111
1112   g_mutex_lock (&priv->mutex);
1113
1114   GST_OBJECT_LOCK (appsrc);
1115   GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
1116   if ((old = priv->caps) != caps) {
1117     if (caps)
1118       priv->caps = gst_caps_copy (caps);
1119     else
1120       priv->caps = NULL;
1121     if (old)
1122       gst_caps_unref (old);
1123     priv->new_caps = TRUE;
1124   }
1125   GST_OBJECT_UNLOCK (appsrc);
1126
1127   g_mutex_unlock (&priv->mutex);
1128 }
1129
1130 /**
1131  * gst_app_src_get_caps:
1132  * @appsrc: a #GstAppSrc
1133  *
1134  * Get the configured caps on @appsrc.
1135  *
1136  * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
1137  */
1138 GstCaps *
1139 gst_app_src_get_caps (GstAppSrc * appsrc)
1140 {
1141   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
1142
1143   return gst_app_src_internal_get_caps (GST_BASE_SRC_CAST (appsrc), NULL);
1144 }
1145
1146 /**
1147  * gst_app_src_set_size:
1148  * @appsrc: a #GstAppSrc
1149  * @size: the size to set
1150  *
1151  * Set the size of the stream in bytes. A value of -1 means that the size is
1152  * not known.
1153  */
1154 void
1155 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
1156 {
1157   GstAppSrcPrivate *priv;
1158
1159   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1160
1161   priv = appsrc->priv;
1162
1163   GST_OBJECT_LOCK (appsrc);
1164   GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
1165   priv->size = size;
1166   GST_OBJECT_UNLOCK (appsrc);
1167 }
1168
1169 /**
1170  * gst_app_src_get_size:
1171  * @appsrc: a #GstAppSrc
1172  *
1173  * Get the size of the stream in bytes. A value of -1 means that the size is
1174  * not known.
1175  *
1176  * Returns: the size of the stream previously set with gst_app_src_set_size();
1177  */
1178 gint64
1179 gst_app_src_get_size (GstAppSrc * appsrc)
1180 {
1181   gint64 size;
1182   GstAppSrcPrivate *priv;
1183
1184   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1185
1186   priv = appsrc->priv;
1187
1188   GST_OBJECT_LOCK (appsrc);
1189   size = priv->size;
1190   GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
1191   GST_OBJECT_UNLOCK (appsrc);
1192
1193   return size;
1194 }
1195
1196 /**
1197  * gst_app_src_set_stream_type:
1198  * @appsrc: a #GstAppSrc
1199  * @type: the new state
1200  *
1201  * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
1202  * be connected to.
1203  *
1204  * A stream_type stream
1205  */
1206 void
1207 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
1208 {
1209   GstAppSrcPrivate *priv;
1210
1211   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1212
1213   priv = appsrc->priv;
1214
1215   GST_OBJECT_LOCK (appsrc);
1216   GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
1217   priv->stream_type = type;
1218   GST_OBJECT_UNLOCK (appsrc);
1219 }
1220
1221 /**
1222  * gst_app_src_get_stream_type:
1223  * @appsrc: a #GstAppSrc
1224  *
1225  * Get the stream type. Control the stream type of @appsrc
1226  * with gst_app_src_set_stream_type().
1227  *
1228  * Returns: the stream type.
1229  */
1230 GstAppStreamType
1231 gst_app_src_get_stream_type (GstAppSrc * appsrc)
1232 {
1233   gboolean stream_type;
1234   GstAppSrcPrivate *priv;
1235
1236   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1237
1238   priv = appsrc->priv;
1239
1240   GST_OBJECT_LOCK (appsrc);
1241   stream_type = priv->stream_type;
1242   GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
1243   GST_OBJECT_UNLOCK (appsrc);
1244
1245   return stream_type;
1246 }
1247
1248 /**
1249  * gst_app_src_set_max_bytes:
1250  * @appsrc: a #GstAppSrc
1251  * @max: the maximum number of bytes to queue
1252  *
1253  * Set the maximum amount of bytes that can be queued in @appsrc.
1254  * After the maximum amount of bytes are queued, @appsrc will emit the
1255  * "enough-data" signal.
1256  */
1257 void
1258 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
1259 {
1260   GstAppSrcPrivate *priv;
1261
1262   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1263
1264   priv = appsrc->priv;
1265
1266   g_mutex_lock (&priv->mutex);
1267   if (max != priv->max_bytes) {
1268     GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
1269     priv->max_bytes = max;
1270     /* signal the change */
1271     g_cond_broadcast (&priv->cond);
1272   }
1273   g_mutex_unlock (&priv->mutex);
1274 }
1275
1276 /**
1277  * gst_app_src_get_max_bytes:
1278  * @appsrc: a #GstAppSrc
1279  *
1280  * Get the maximum amount of bytes that can be queued in @appsrc.
1281  *
1282  * Returns: The maximum amount of bytes that can be queued.
1283  */
1284 guint64
1285 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
1286 {
1287   guint64 result;
1288   GstAppSrcPrivate *priv;
1289
1290   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
1291
1292   priv = appsrc->priv;
1293
1294   g_mutex_lock (&priv->mutex);
1295   result = priv->max_bytes;
1296   GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
1297   g_mutex_unlock (&priv->mutex);
1298
1299   return result;
1300 }
1301
1302 static void
1303 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
1304     gboolean do_max, guint64 max)
1305 {
1306   GstAppSrcPrivate *priv = appsrc->priv;
1307   gboolean changed = FALSE;
1308
1309   g_mutex_lock (&priv->mutex);
1310   if (do_min && priv->min_latency != min) {
1311     priv->min_latency = min;
1312     changed = TRUE;
1313   }
1314   if (do_max && priv->max_latency != max) {
1315     priv->max_latency = max;
1316     changed = TRUE;
1317   }
1318   g_mutex_unlock (&priv->mutex);
1319
1320   if (changed) {
1321     GST_DEBUG_OBJECT (appsrc, "posting latency changed");
1322     gst_element_post_message (GST_ELEMENT_CAST (appsrc),
1323         gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
1324   }
1325 }
1326
1327 /**
1328  * gst_app_src_set_latency:
1329  * @appsrc: a #GstAppSrc
1330  * @min: the min latency
1331  * @max: the min latency
1332  *
1333  * Configure the @min and @max latency in @src. If @min is set to -1, the
1334  * default latency calculations for pseudo-live sources will be used.
1335  */
1336 void
1337 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
1338 {
1339   gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
1340 }
1341
1342 /**
1343  * gst_app_src_get_latency:
1344  * @appsrc: a #GstAppSrc
1345  * @min: the min latency
1346  * @max: the min latency
1347  *
1348  * Retrieve the min and max latencies in @min and @max respectively.
1349  */
1350 void
1351 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
1352 {
1353   GstAppSrcPrivate *priv;
1354
1355   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1356
1357   priv = appsrc->priv;
1358
1359   g_mutex_lock (&priv->mutex);
1360   if (min)
1361     *min = priv->min_latency;
1362   if (max)
1363     *max = priv->max_latency;
1364   g_mutex_unlock (&priv->mutex);
1365 }
1366
1367 /**
1368  * gst_app_src_set_emit_signals:
1369  * @appsrc: a #GstAppSrc
1370  * @emit: the new state
1371  *
1372  * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
1373  * by default disabled because signal emission is expensive and unneeded when
1374  * the application prefers to operate in pull mode.
1375  */
1376 void
1377 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
1378 {
1379   GstAppSrcPrivate *priv;
1380
1381   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1382
1383   priv = appsrc->priv;
1384
1385   g_mutex_lock (&priv->mutex);
1386   priv->emit_signals = emit;
1387   g_mutex_unlock (&priv->mutex);
1388 }
1389
1390 /**
1391  * gst_app_src_get_emit_signals:
1392  * @appsrc: a #GstAppSrc
1393  *
1394  * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
1395  *
1396  * Returns: %TRUE if @appsrc is emitting the "new-preroll" and "new-buffer"
1397  * signals.
1398  */
1399 gboolean
1400 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
1401 {
1402   gboolean result;
1403   GstAppSrcPrivate *priv;
1404
1405   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1406
1407   priv = appsrc->priv;
1408
1409   g_mutex_lock (&priv->mutex);
1410   result = priv->emit_signals;
1411   g_mutex_unlock (&priv->mutex);
1412
1413   return result;
1414 }
1415
1416 static GstFlowReturn
1417 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
1418     gboolean steal_ref)
1419 {
1420   gboolean first = TRUE;
1421   GstAppSrcPrivate *priv;
1422
1423   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1424   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1425
1426   priv = appsrc->priv;
1427
1428   g_mutex_lock (&priv->mutex);
1429
1430   while (TRUE) {
1431     /* can't accept buffers when we are flushing or EOS */
1432     if (priv->flushing)
1433       goto flushing;
1434
1435     if (priv->is_eos)
1436       goto eos;
1437
1438     if (priv->max_bytes && priv->queued_bytes >= priv->max_bytes) {
1439       GST_DEBUG_OBJECT (appsrc,
1440           "queue filled (%" G_GUINT64_FORMAT " >= %" G_GUINT64_FORMAT ")",
1441           priv->queued_bytes, priv->max_bytes);
1442
1443       if (first) {
1444         gboolean emit;
1445
1446         emit = priv->emit_signals;
1447         /* only signal on the first push */
1448         g_mutex_unlock (&priv->mutex);
1449
1450         if (priv->callbacks.enough_data)
1451           priv->callbacks.enough_data (appsrc, priv->user_data);
1452         else if (emit)
1453           g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
1454               NULL);
1455
1456         g_mutex_lock (&priv->mutex);
1457         /* continue to check for flushing/eos after releasing the lock */
1458         first = FALSE;
1459         continue;
1460       }
1461       if (priv->block) {
1462         GST_DEBUG_OBJECT (appsrc, "waiting for free space");
1463         /* we are filled, wait until a buffer gets popped or when we
1464          * flush. */
1465         g_cond_wait (&priv->cond, &priv->mutex);
1466       } else {
1467         /* no need to wait for free space, we just pump more data into the
1468          * queue hoping that the caller reacts to the enough-data signal and
1469          * stops pushing buffers. */
1470         break;
1471       }
1472     } else
1473       break;
1474   }
1475
1476   GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
1477   if (!steal_ref)
1478     gst_buffer_ref (buffer);
1479   g_queue_push_tail (priv->queue, buffer);
1480   priv->queued_bytes += gst_buffer_get_size (buffer);
1481   g_cond_broadcast (&priv->cond);
1482   g_mutex_unlock (&priv->mutex);
1483
1484   return GST_FLOW_OK;
1485
1486   /* ERRORS */
1487 flushing:
1488   {
1489     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
1490     if (steal_ref)
1491       gst_buffer_unref (buffer);
1492     g_mutex_unlock (&priv->mutex);
1493     return GST_FLOW_FLUSHING;
1494   }
1495 eos:
1496   {
1497     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
1498     if (steal_ref)
1499       gst_buffer_unref (buffer);
1500     g_mutex_unlock (&priv->mutex);
1501     return GST_FLOW_EOS;
1502   }
1503 }
1504
1505 /**
1506  * gst_app_src_push_buffer:
1507  * @appsrc: a #GstAppSrc
1508  * @buffer: (transfer full): a #GstBuffer to push
1509  *
1510  * Adds a buffer to the queue of buffers that the appsrc element will
1511  * push to its source pad.  This function takes ownership of the buffer.
1512  *
1513  * When the block property is TRUE, this function can block until free
1514  * space becomes available in the queue.
1515  *
1516  * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
1517  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
1518  * #GST_FLOW_EOS when EOS occured.
1519  */
1520 GstFlowReturn
1521 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
1522 {
1523   return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
1524 }
1525
1526 /* push a buffer without stealing the ref of the buffer. This is used for the
1527  * action signal. */
1528 static GstFlowReturn
1529 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
1530 {
1531   return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
1532 }
1533
1534 /**
1535  * gst_app_src_end_of_stream:
1536  * @appsrc: a #GstAppSrc
1537  *
1538  * Indicates to the appsrc element that the last buffer queued in the
1539  * element is the last buffer of the stream.
1540  *
1541  * Returns: #GST_FLOW_OK when the EOS was successfuly queued.
1542  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
1543  */
1544 GstFlowReturn
1545 gst_app_src_end_of_stream (GstAppSrc * appsrc)
1546 {
1547   GstAppSrcPrivate *priv;
1548
1549   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1550
1551   priv = appsrc->priv;
1552
1553   g_mutex_lock (&priv->mutex);
1554   /* can't accept buffers when we are flushing. We can accept them when we are
1555    * EOS although it will not do anything. */
1556   if (priv->flushing)
1557     goto flushing;
1558
1559   GST_DEBUG_OBJECT (appsrc, "sending EOS");
1560   priv->is_eos = TRUE;
1561   g_cond_broadcast (&priv->cond);
1562   g_mutex_unlock (&priv->mutex);
1563
1564   return GST_FLOW_OK;
1565
1566   /* ERRORS */
1567 flushing:
1568   {
1569     g_mutex_unlock (&priv->mutex);
1570     GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
1571     return GST_FLOW_FLUSHING;
1572   }
1573 }
1574
1575 /**
1576  * gst_app_src_set_callbacks: (skip)
1577  * @appsrc: a #GstAppSrc
1578  * @callbacks: the callbacks
1579  * @user_data: a user_data argument for the callbacks
1580  * @notify: a destroy notify function
1581  *
1582  * Set callbacks which will be executed when data is needed, enough data has
1583  * been collected or when a seek should be performed.
1584  * This is an alternative to using the signals, it has lower overhead and is thus
1585  * less expensive, but also less flexible.
1586  *
1587  * If callbacks are installed, no signals will be emitted for performance
1588  * reasons.
1589  */
1590 void
1591 gst_app_src_set_callbacks (GstAppSrc * appsrc,
1592     GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
1593 {
1594   GDestroyNotify old_notify;
1595   GstAppSrcPrivate *priv;
1596
1597   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1598   g_return_if_fail (callbacks != NULL);
1599
1600   priv = appsrc->priv;
1601
1602   GST_OBJECT_LOCK (appsrc);
1603   old_notify = priv->notify;
1604
1605   if (old_notify) {
1606     gpointer old_data;
1607
1608     old_data = priv->user_data;
1609
1610     priv->user_data = NULL;
1611     priv->notify = NULL;
1612     GST_OBJECT_UNLOCK (appsrc);
1613
1614     old_notify (old_data);
1615
1616     GST_OBJECT_LOCK (appsrc);
1617   }
1618   priv->callbacks = *callbacks;
1619   priv->user_data = user_data;
1620   priv->notify = notify;
1621   GST_OBJECT_UNLOCK (appsrc);
1622 }
1623
1624 /*** GSTURIHANDLER INTERFACE *************************************************/
1625
1626 static GstURIType
1627 gst_app_src_uri_get_type (GType type)
1628 {
1629   return GST_URI_SRC;
1630 }
1631
1632 static const gchar *const *
1633 gst_app_src_uri_get_protocols (GType type)
1634 {
1635   static const gchar *protocols[] = { "appsrc", NULL };
1636
1637   return protocols;
1638 }
1639
1640 static gchar *
1641 gst_app_src_uri_get_uri (GstURIHandler * handler)
1642 {
1643   GstAppSrc *appsrc = GST_APP_SRC (handler);
1644
1645   return appsrc->priv->uri ? g_strdup (appsrc->priv->uri) : NULL;
1646 }
1647
1648 static gboolean
1649 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
1650     GError ** error)
1651 {
1652   GstAppSrc *appsrc = GST_APP_SRC (handler);
1653
1654   g_free (appsrc->priv->uri);
1655   appsrc->priv->uri = uri ? g_strdup (uri) : NULL;
1656
1657   return TRUE;
1658 }
1659
1660 static void
1661 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1662 {
1663   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1664
1665   iface->get_type = gst_app_src_uri_get_type;
1666   iface->get_protocols = gst_app_src_uri_get_protocols;
1667   iface->get_uri = gst_app_src_uri_get_uri;
1668   iface->set_uri = gst_app_src_uri_set_uri;
1669 }