core/base: Only post latency messages if the latency values have actually changed
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-base / gst-libs / gst / app / gstappsrc.c
1 /* GStreamer
2  * Copyright (C) 2007 David Schleef <ds@schleef.org>
3  *           (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 /**
21  * SECTION:gstappsrc
22  * @title: GstAppSrc
23  * @short_description: Easy way for applications to inject buffers into a
24  *     pipeline
25  * @see_also: #GstBaseSrc, appsink
26  *
27  * The appsrc element can be used by applications to insert data into a
28  * GStreamer pipeline. Unlike most GStreamer elements, appsrc provides
29  * external API functions.
30  *
31  * appsrc can be used by linking with the libgstapp library to access the
32  * methods directly or by using the appsrc action signals.
33  *
34  * Before operating appsrc, the caps property must be set to fixed caps
35  * describing the format of the data that will be pushed with appsrc. An
36  * exception to this is when pushing buffers with unknown caps, in which case no
37  * caps should be set. This is typically true of file-like sources that push raw
38  * byte buffers. If you don't want to explicitly set the caps, you can use
39  * gst_app_src_push_sample. This method gets the caps associated with the
40  * sample and sets them on the appsrc replacing any previously set caps (if
41  * different from sample's caps).
42  *
43  * The main way of handing data to the appsrc element is by calling the
44  * gst_app_src_push_buffer() method or by emitting the push-buffer action signal.
45  * This will put the buffer onto a queue from which appsrc will read from in its
46  * streaming thread. It is important to note that data transport will not happen
47  * from the thread that performed the push-buffer call.
48  *
49  * The "max-bytes", "max-buffers" and "max-time" properties control how much
50  * data can be queued in appsrc before appsrc considers the queue full. A
51  * filled internal queue will always signal the "enough-data" signal, which
52  * signals the application that it should stop pushing data into appsrc. The
53  * "block" property will cause appsrc to block the push-buffer method until
54  * free data becomes available again.
55  *
56  * When the internal queue is running out of data, the "need-data" signal is
57  * emitted, which signals the application that it should start pushing more data
58  * into appsrc.
59  *
60  * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
61  * "seek-data" signal when the "stream-mode" property is set to "seekable" or
62  * "random-access". The signal argument will contain the new desired position in
63  * the stream expressed in the unit set with the "format" property. After
64  * receiving the seek-data signal, the application should push-buffers from the
65  * new position.
66  *
67  * These signals allow the application to operate the appsrc in two different
68  * ways:
69  *
70  * The push mode, in which the application repeatedly calls the push-buffer/push-sample
71  * method with a new buffer/sample. Optionally, the queue size in the appsrc
72  * can be controlled with the enough-data and need-data signals by respectively
73  * stopping/starting the push-buffer/push-sample calls. This is a typical
74  * mode of operation for the stream-type "stream" and "seekable". Use this
75  * mode when implementing various network protocols or hardware devices.
76  *
77  * The pull mode, in which the need-data signal triggers the next push-buffer call.
78  * This mode is typically used in the "random-access" stream-type. Use this
79  * mode for file access or other randomly accessible sources. In this mode, a
80  * buffer of exactly the amount of bytes given by the need-data signal should be
81  * pushed into appsrc.
82  *
83  * In all modes, the size property on appsrc should contain the total stream
84  * size in bytes. Setting this property is mandatory in the random-access mode.
85  * For the stream and seekable modes, setting this property is optional but
86  * recommended.
87  *
88  * When the application has finished pushing data into appsrc, it should call
89  * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
90  * this call, no more buffers can be pushed into appsrc until a flushing seek
91  * occurs or the state of the appsrc has gone through READY.
92  */
93
94 #ifdef HAVE_CONFIG_H
95 #include "config.h"
96 #endif
97
98 #include <gst/gst.h>
99 #include <gst/base/base.h>
100
101 #include <string.h>
102
103 #include "gstappsrc.h"
104
105 typedef enum
106 {
107   NOONE_WAITING = 0,
108   STREAM_WAITING = 1 << 0,      /* streaming thread is waiting for application thread */
109   APP_WAITING = 1 << 1,         /* application thread is waiting for streaming thread */
110 } GstAppSrcWaitStatus;
111
112 typedef struct
113 {
114   GstAppSrcCallbacks callbacks;
115   gpointer user_data;
116   GDestroyNotify destroy_notify;
117   gint ref_count;
118 } Callbacks;
119
120 static Callbacks *
121 callbacks_ref (Callbacks * callbacks)
122 {
123   g_atomic_int_inc (&callbacks->ref_count);
124
125   return callbacks;
126 }
127
128 static void
129 callbacks_unref (Callbacks * callbacks)
130 {
131   if (!g_atomic_int_dec_and_test (&callbacks->ref_count))
132     return;
133
134   if (callbacks->destroy_notify)
135     callbacks->destroy_notify (callbacks->user_data);
136
137   g_free (callbacks);
138 }
139
140
141 struct _GstAppSrcPrivate
142 {
143   GCond cond;
144   GMutex mutex;
145   GstQueueArray *queue;
146   GstAppSrcWaitStatus wait_status;
147
148   GstCaps *last_caps;
149   GstCaps *current_caps;
150   /* last segment received on the input */
151   GstSegment last_segment;
152   /* currently configured segment for the output */
153   GstSegment current_segment;
154   /* queue up a segment event based on last_segment before
155    * the next buffer of buffer list */
156   gboolean pending_custom_segment;
157
158   /* the next buffer that will be queued needs a discont flag
159    * because the previous one was dropped - GST_APP_LEAKY_TYPE_UPSTREAM */
160   gboolean need_discont_upstream;
161   /* the next buffer that will be dequeued needs a discont flag
162    * because the previous one was dropped - GST_APP_LEAKY_TYPE_DOWNSTREAM */
163   gboolean need_discont_downstream;
164
165   gint64 size;
166   GstClockTime duration;
167   GstAppStreamType stream_type;
168   guint64 max_bytes, max_buffers, max_time;
169   GstFormat format;
170   gboolean block;
171   gchar *uri;
172
173   gboolean flushing;
174   gboolean started;
175   gboolean is_eos;
176   guint64 queued_bytes, queued_buffers;
177   /* Used to calculate the current time level */
178   GstClockTime last_in_running_time, last_out_running_time;
179   /* Updated based on the above whenever they change */
180   GstClockTime queued_time;
181   guint64 offset;
182   GstAppStreamType current_type;
183
184   guint64 min_latency;
185   guint64 max_latency;
186   /* Tracks whether the latency message was posted at least once */
187   gboolean posted_latency_msg;
188
189   gboolean emit_signals;
190   guint min_percent;
191   gboolean handle_segment_change;
192
193   GstAppLeakyType leaky_type;
194
195   Callbacks *callbacks;
196 };
197
198 GST_DEBUG_CATEGORY_STATIC (app_src_debug);
199 #define GST_CAT_DEFAULT app_src_debug
200
201 enum
202 {
203   /* signals */
204   SIGNAL_NEED_DATA,
205   SIGNAL_ENOUGH_DATA,
206   SIGNAL_SEEK_DATA,
207
208   /* actions */
209   SIGNAL_PUSH_BUFFER,
210   SIGNAL_END_OF_STREAM,
211   SIGNAL_PUSH_SAMPLE,
212   SIGNAL_PUSH_BUFFER_LIST,
213
214   LAST_SIGNAL
215 };
216
217 #define DEFAULT_PROP_SIZE          -1
218 #define DEFAULT_PROP_STREAM_TYPE   GST_APP_STREAM_TYPE_STREAM
219 #define DEFAULT_PROP_MAX_BYTES     200000
220 #define DEFAULT_PROP_MAX_BUFFERS   0
221 #define DEFAULT_PROP_MAX_TIME      (0 * GST_SECOND)
222 #define DEFAULT_PROP_FORMAT        GST_FORMAT_BYTES
223 #define DEFAULT_PROP_BLOCK         FALSE
224 #define DEFAULT_PROP_IS_LIVE       FALSE
225 #define DEFAULT_PROP_MIN_LATENCY   -1
226 #define DEFAULT_PROP_MAX_LATENCY   -1
227 #define DEFAULT_PROP_EMIT_SIGNALS  TRUE
228 #define DEFAULT_PROP_MIN_PERCENT   0
229 #define DEFAULT_PROP_CURRENT_LEVEL_BYTES   0
230 #define DEFAULT_PROP_CURRENT_LEVEL_BUFFERS 0
231 #define DEFAULT_PROP_CURRENT_LEVEL_TIME    0
232 #define DEFAULT_PROP_DURATION      GST_CLOCK_TIME_NONE
233 #define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE FALSE
234 #define DEFAULT_PROP_LEAKY_TYPE    GST_APP_LEAKY_TYPE_NONE
235
236 enum
237 {
238   PROP_0,
239   PROP_CAPS,
240   PROP_SIZE,
241   PROP_STREAM_TYPE,
242   PROP_MAX_BYTES,
243   PROP_MAX_BUFFERS,
244   PROP_MAX_TIME,
245   PROP_FORMAT,
246   PROP_BLOCK,
247   PROP_IS_LIVE,
248   PROP_MIN_LATENCY,
249   PROP_MAX_LATENCY,
250   PROP_EMIT_SIGNALS,
251   PROP_MIN_PERCENT,
252   PROP_CURRENT_LEVEL_BYTES,
253   PROP_CURRENT_LEVEL_BUFFERS,
254   PROP_CURRENT_LEVEL_TIME,
255   PROP_DURATION,
256   PROP_HANDLE_SEGMENT_CHANGE,
257   PROP_LEAKY_TYPE,
258   PROP_LAST
259 };
260
261 static GstStaticPadTemplate gst_app_src_template =
262 GST_STATIC_PAD_TEMPLATE ("src",
263     GST_PAD_SRC,
264     GST_PAD_ALWAYS,
265     GST_STATIC_CAPS_ANY);
266
267 static void gst_app_src_uri_handler_init (gpointer g_iface,
268     gpointer iface_data);
269
270 static void gst_app_src_dispose (GObject * object);
271 static void gst_app_src_finalize (GObject * object);
272
273 static void gst_app_src_set_property (GObject * object, guint prop_id,
274     const GValue * value, GParamSpec * pspec);
275 static void gst_app_src_get_property (GObject * object, guint prop_id,
276     GValue * value, GParamSpec * pspec);
277
278 static gboolean gst_app_src_send_event (GstElement * element, GstEvent * event);
279
280 static void gst_app_src_set_latencies (GstAppSrc * appsrc,
281     gboolean do_min, guint64 min, gboolean do_max, guint64 max);
282
283 static gboolean gst_app_src_negotiate (GstBaseSrc * basesrc);
284 static GstCaps *gst_app_src_internal_get_caps (GstBaseSrc * bsrc,
285     GstCaps * filter);
286 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc, guint64 offset,
287     guint size, GstBuffer ** buf);
288 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
289 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
290 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
291 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
292 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
293 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
294 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
295 static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
296 static gboolean gst_app_src_event (GstBaseSrc * src, GstEvent * event);
297
298 static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
299     GstBuffer * buffer);
300 static GstFlowReturn gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
301     GstBufferList * buffer_list);
302 static GstFlowReturn gst_app_src_push_sample_action (GstAppSrc * appsrc,
303     GstSample * sample);
304
305 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
306
307 #define gst_app_src_parent_class parent_class
308 G_DEFINE_TYPE_WITH_CODE (GstAppSrc, gst_app_src, GST_TYPE_BASE_SRC,
309     G_ADD_PRIVATE (GstAppSrc)
310     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_app_src_uri_handler_init));
311
312 static void
313 gst_app_src_class_init (GstAppSrcClass * klass)
314 {
315   GObjectClass *gobject_class = (GObjectClass *) klass;
316   GstElementClass *element_class = (GstElementClass *) klass;
317   GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
318
319   GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
320
321   gobject_class->dispose = gst_app_src_dispose;
322   gobject_class->finalize = gst_app_src_finalize;
323
324   gobject_class->set_property = gst_app_src_set_property;
325   gobject_class->get_property = gst_app_src_get_property;
326
327   /**
328    * GstAppSrc:caps:
329    *
330    * The GstCaps that will negotiated downstream and will be put
331    * on outgoing buffers.
332    */
333   g_object_class_install_property (gobject_class, PROP_CAPS,
334       g_param_spec_boxed ("caps", "Caps",
335           "The allowed caps for the src pad", GST_TYPE_CAPS,
336           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
337   /**
338    * GstAppSrc:format:
339    *
340    * The format to use for segment events. When the source is producing
341    * timestamped buffers this property should be set to GST_FORMAT_TIME.
342    */
343   g_object_class_install_property (gobject_class, PROP_FORMAT,
344       g_param_spec_enum ("format", "Format",
345           "The format of the segment events and seek", GST_TYPE_FORMAT,
346           DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
347   /**
348    * GstAppSrc:size:
349    *
350    * The total size in bytes of the data stream. If the total size is known, it
351    * is recommended to configure it with this property.
352    */
353   g_object_class_install_property (gobject_class, PROP_SIZE,
354       g_param_spec_int64 ("size", "Size",
355           "The size of the data stream in bytes (-1 if unknown)",
356           -1, G_MAXINT64, DEFAULT_PROP_SIZE,
357           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
358   /**
359    * GstAppSrc:stream-type:
360    *
361    * The type of stream that this source is producing.  For seekable streams the
362    * application should connect to the seek-data signal.
363    */
364   g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
365       g_param_spec_enum ("stream-type", "Stream Type",
366           "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
367           DEFAULT_PROP_STREAM_TYPE,
368           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
369   /**
370    * GstAppSrc:max-bytes:
371    *
372    * The maximum amount of bytes that can be queued internally.
373    * After the maximum amount of bytes are queued, appsrc will emit the
374    * "enough-data" signal.
375    */
376   g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
377       g_param_spec_uint64 ("max-bytes", "Max bytes",
378           "The maximum number of bytes to queue internally (0 = unlimited)",
379           0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
380           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
381
382   /**
383    * GstAppSrc:max-buffers:
384    *
385    * The maximum amount of buffers that can be queued internally.
386    * After the maximum amount of buffers are queued, appsrc will emit the
387    * "enough-data" signal.
388    *
389    * Since: 1.20
390    */
391   g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS,
392       g_param_spec_uint64 ("max-buffers", "Max buffers",
393           "The maximum number of buffers to queue internally (0 = unlimited)",
394           0, G_MAXUINT64, DEFAULT_PROP_MAX_BUFFERS,
395           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
396
397   /**
398    * GstAppSrc:max-time:
399    *
400    * The maximum amount of time that can be queued internally.
401    * After the maximum amount of time are queued, appsrc will emit the
402    * "enough-data" signal.
403    *
404    * Since: 1.20
405    */
406   g_object_class_install_property (gobject_class, PROP_MAX_TIME,
407       g_param_spec_uint64 ("max-time", "Max time",
408           "The maximum amount of time to queue internally (0 = unlimited)",
409           0, G_MAXUINT64, DEFAULT_PROP_MAX_TIME,
410           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
411
412   /**
413    * GstAppSrc:block:
414    *
415    * When max-bytes are queued and after the enough-data signal has been emitted,
416    * block any further push-buffer calls until the amount of queued bytes drops
417    * below the max-bytes limit.
418    */
419   g_object_class_install_property (gobject_class, PROP_BLOCK,
420       g_param_spec_boolean ("block", "Block",
421           "Block push-buffer when max-bytes are queued",
422           DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
423
424   /**
425    * GstAppSrc:is-live:
426    *
427    * Instruct the source to behave like a live source. This includes that it
428    * will only push out buffers in the PLAYING state.
429    */
430   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
431       g_param_spec_boolean ("is-live", "Is Live",
432           "Whether to act as a live source",
433           DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
434   /**
435    * GstAppSrc:min-latency:
436    *
437    * The minimum latency of the source. A value of -1 will use the default
438    * latency calculations of #GstBaseSrc.
439    */
440   g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
441       g_param_spec_int64 ("min-latency", "Min Latency",
442           "The minimum latency (-1 = default)",
443           -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
444           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
445   /**
446    * GstAppSrc::max-latency:
447    *
448    * The maximum latency of the source. A value of -1 means an unlimited amount
449    * of latency.
450    */
451   g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
452       g_param_spec_int64 ("max-latency", "Max Latency",
453           "The maximum latency (-1 = unlimited)",
454           -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
455           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
456
457   /**
458    * GstAppSrc:emit-signals:
459    *
460    * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
461    * This option is by default enabled for backwards compatibility reasons but
462    * can disabled when needed because signal emission is expensive.
463    */
464   g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
465       g_param_spec_boolean ("emit-signals", "Emit signals",
466           "Emit need-data, enough-data and seek-data signals",
467           DEFAULT_PROP_EMIT_SIGNALS,
468           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
469
470   /**
471    * GstAppSrc:min-percent:
472    *
473    * Make appsrc emit the "need-data" signal when the amount of bytes in the
474    * queue drops below this percentage of max-bytes.
475    */
476   g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
477       g_param_spec_uint ("min-percent", "Min Percent",
478           "Emit need-data when queued bytes drops below this percent of max-bytes",
479           0, 100, DEFAULT_PROP_MIN_PERCENT,
480           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
481
482   /**
483    * GstAppSrc:current-level-bytes:
484    *
485    * The number of currently queued bytes inside appsrc.
486    *
487    * Since: 1.2
488    */
489   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BYTES,
490       g_param_spec_uint64 ("current-level-bytes", "Current Level Bytes",
491           "The number of currently queued bytes",
492           0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BYTES,
493           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
494
495   /**
496    * GstAppSrc:current-level-buffers:
497    *
498    * The number of currently queued buffers inside appsrc.
499    *
500    * Since: 1.20
501    */
502   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BUFFERS,
503       g_param_spec_uint64 ("current-level-buffers", "Current Level Buffers",
504           "The number of currently queued buffers",
505           0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BUFFERS,
506           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
507
508   /**
509    * GstAppSrc:current-level-time:
510    *
511    * The amount of currently queued time inside appsrc.
512    *
513    * Since: 1.20
514    */
515   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME,
516       g_param_spec_uint64 ("current-level-time", "Current Level Time",
517           "The amount of currently queued time",
518           0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_TIME,
519           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
520
521   /**
522    * GstAppSrc:duration:
523    *
524    * The total duration in nanoseconds of the data stream. If the total duration is known, it
525    * is recommended to configure it with this property.
526    *
527    * Since: 1.10
528    */
529   g_object_class_install_property (gobject_class, PROP_DURATION,
530       g_param_spec_uint64 ("duration", "Duration",
531           "The duration of the data stream in nanoseconds (GST_CLOCK_TIME_NONE if unknown)",
532           0, G_MAXUINT64, DEFAULT_PROP_DURATION,
533           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
534
535   /**
536    * GstAppSrc:handle-segment-change:
537    *
538    * When enabled, appsrc will check GstSegment in GstSample which was
539    * pushed via gst_app_src_push_sample() or "push-sample" signal action.
540    * If a GstSegment is changed, corresponding segment event will be followed
541    * by next data flow.
542    *
543    * FIXME: currently only GST_FORMAT_TIME format is supported and therefore
544    * GstAppSrc::format should be time. However, possibly #GstAppSrc can support
545    * other formats.
546    *
547    * Since: 1.18
548    */
549   g_object_class_install_property (gobject_class, PROP_HANDLE_SEGMENT_CHANGE,
550       g_param_spec_boolean ("handle-segment-change", "Handle Segment Change",
551           "Whether to detect and handle changed time format GstSegment in "
552           "GstSample. User should set valid GstSegment in GstSample. "
553           "Must set format property as \"time\" to enable this property",
554           DEFAULT_PROP_HANDLE_SEGMENT_CHANGE,
555           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
556           G_PARAM_STATIC_STRINGS));
557
558   /**
559    * GstAppSrc:leaky-type:
560    *
561    * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
562    * will drop any buffers that are pushed into it once its internal queue is
563    * full. The selected type defines whether to drop the oldest or new
564    * buffers.
565    *
566    * Since: 1.20
567    */
568   g_object_class_install_property (gobject_class, PROP_LEAKY_TYPE,
569       g_param_spec_enum ("leaky-type", "Leaky Type",
570           "Whether to drop buffers once the internal queue is full",
571           GST_TYPE_APP_LEAKY_TYPE,
572           DEFAULT_PROP_LEAKY_TYPE,
573           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
574           G_PARAM_STATIC_STRINGS));
575
576   /**
577    * GstAppSrc::need-data:
578    * @appsrc: the appsrc element that emitted the signal
579    * @length: the amount of bytes needed.
580    *
581    * Signal that the source needs more data. In the callback or from another
582    * thread you should call push-buffer or end-of-stream.
583    *
584    * @length is just a hint and when it is set to -1, any number of bytes can be
585    * pushed into @appsrc.
586    *
587    * You can call push-buffer multiple times until the enough-data signal is
588    * fired.
589    */
590   gst_app_src_signals[SIGNAL_NEED_DATA] =
591       g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
592       G_STRUCT_OFFSET (GstAppSrcClass, need_data),
593       NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
594
595   /**
596    * GstAppSrc::enough-data:
597    * @appsrc: the appsrc element that emitted the signal
598    *
599    * Signal that the source has enough data. It is recommended that the
600    * application stops calling push-buffer until the need-data signal is
601    * emitted again to avoid excessive buffer queueing.
602    */
603   gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
604       g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
605       G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
606       NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
607
608   /**
609    * GstAppSrc::seek-data:
610    * @appsrc: the appsrc element that emitted the signal
611    * @offset: the offset to seek to
612    *
613    * Seek to the given offset. The next push-buffer should produce buffers from
614    * the new @offset.
615    * This callback is only called for seekable stream types.
616    *
617    * Returns: %TRUE if the seek succeeded.
618    */
619   gst_app_src_signals[SIGNAL_SEEK_DATA] =
620       g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
621       G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
622       NULL, NULL, NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
623
624    /**
625     * GstAppSrc::push-buffer:
626     * @appsrc: the appsrc
627     * @buffer: (transfer none): a buffer to push
628     *
629     * Adds a buffer to the queue of buffers that the appsrc element will
630     * push to its source pad.
631     *
632     * This function does not take ownership of the buffer, but it takes a
633     * reference so the buffer can be unreffed at any time after calling this
634     * function.
635     *
636     * When the block property is TRUE, this function can block until free space
637     * becomes available in the queue.
638     */
639   gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
640       g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
641       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
642           push_buffer), NULL, NULL, NULL,
643       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
644
645    /**
646     * GstAppSrc::push-buffer-list:
647     * @appsrc: the appsrc
648     * @buffer_list: (transfer none): a buffer list to push
649     *
650     * Adds a buffer list to the queue of buffers and buffer lists that the
651     * appsrc element will push to its source pad.
652     *
653     * This function does not take ownership of the buffer list, but it takes a
654     * reference so the buffer list can be unreffed at any time after calling
655     * this function.
656     *
657     * When the block property is TRUE, this function can block until free space
658     * becomes available in the queue.
659     *
660     * Since: 1.14
661     */
662   gst_app_src_signals[SIGNAL_PUSH_BUFFER_LIST] =
663       g_signal_new ("push-buffer-list", G_TYPE_FROM_CLASS (klass),
664       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
665           push_buffer_list), NULL, NULL, NULL,
666       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER_LIST);
667
668   /**
669     * GstAppSrc::push-sample:
670     * @appsrc: the appsrc
671     * @sample: (transfer none): a sample from which extract buffer to push
672     *
673     * Extract a buffer from the provided sample and adds the extracted buffer
674     * to the queue of buffers that the appsrc element will
675     * push to its source pad. This function set the appsrc caps based on the caps
676     * in the sample and reset the caps if they change.
677     * Only the caps and the buffer of the provided sample are used and not
678     * for example the segment in the sample.
679     *
680     * This function does not take ownership of the sample, but it takes a
681     * reference so the sample can be unreffed at any time after calling this
682     * function.
683     *
684     * When the block property is TRUE, this function can block until free space
685     * becomes available in the queue.
686     *
687     * Since: 1.6
688     */
689   gst_app_src_signals[SIGNAL_PUSH_SAMPLE] =
690       g_signal_new ("push-sample", G_TYPE_FROM_CLASS (klass),
691       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
692           push_sample), NULL, NULL, NULL,
693       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_SAMPLE);
694
695
696    /**
697     * GstAppSrc::end-of-stream:
698     * @appsrc: the appsrc
699     *
700     * Notify @appsrc that no more buffer are available.
701     */
702   gst_app_src_signals[SIGNAL_END_OF_STREAM] =
703       g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
704       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
705           end_of_stream), NULL, NULL, NULL,
706       GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
707
708   gst_element_class_set_static_metadata (element_class, "AppSrc",
709       "Generic/Source", "Allow the application to feed buffers to a pipeline",
710       "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
711
712   gst_element_class_add_static_pad_template (element_class,
713       &gst_app_src_template);
714
715   element_class->send_event = gst_app_src_send_event;
716
717   basesrc_class->negotiate = gst_app_src_negotiate;
718   basesrc_class->get_caps = gst_app_src_internal_get_caps;
719   basesrc_class->create = gst_app_src_create;
720   basesrc_class->start = gst_app_src_start;
721   basesrc_class->stop = gst_app_src_stop;
722   basesrc_class->unlock = gst_app_src_unlock;
723   basesrc_class->unlock_stop = gst_app_src_unlock_stop;
724   basesrc_class->do_seek = gst_app_src_do_seek;
725   basesrc_class->is_seekable = gst_app_src_is_seekable;
726   basesrc_class->get_size = gst_app_src_do_get_size;
727   basesrc_class->query = gst_app_src_query;
728   basesrc_class->event = gst_app_src_event;
729
730   klass->push_buffer = gst_app_src_push_buffer_action;
731   klass->push_buffer_list = gst_app_src_push_buffer_list_action;
732   klass->push_sample = gst_app_src_push_sample_action;
733   klass->end_of_stream = gst_app_src_end_of_stream;
734 }
735
736 static void
737 gst_app_src_init (GstAppSrc * appsrc)
738 {
739   GstAppSrcPrivate *priv;
740
741   priv = appsrc->priv = gst_app_src_get_instance_private (appsrc);
742
743   g_mutex_init (&priv->mutex);
744   g_cond_init (&priv->cond);
745   priv->queue = gst_queue_array_new (16);
746   priv->wait_status = NOONE_WAITING;
747
748   priv->size = DEFAULT_PROP_SIZE;
749   priv->duration = DEFAULT_PROP_DURATION;
750   priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
751   priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
752   priv->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
753   priv->max_time = DEFAULT_PROP_MAX_TIME;
754   priv->format = DEFAULT_PROP_FORMAT;
755   priv->block = DEFAULT_PROP_BLOCK;
756   priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
757   priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
758   priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
759   priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
760   priv->handle_segment_change = DEFAULT_PROP_HANDLE_SEGMENT_CHANGE;
761   priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE;
762
763   gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
764 }
765
766 /* Must be called with priv->mutex */
767 static void
768 gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
769 {
770   GstMiniObject *obj;
771   GstAppSrcPrivate *priv = src->priv;
772   GstCaps *requeue_caps = NULL;
773
774   while (!gst_queue_array_is_empty (priv->queue)) {
775     obj = gst_queue_array_pop_head (priv->queue);
776     if (obj) {
777       if (GST_IS_CAPS (obj) && retain_last_caps) {
778         gst_caps_replace (&requeue_caps, GST_CAPS_CAST (obj));
779       }
780       gst_mini_object_unref (obj);
781     }
782   }
783
784   if (requeue_caps) {
785     gst_queue_array_push_tail (priv->queue, requeue_caps);
786   }
787
788   priv->queued_bytes = 0;
789   priv->queued_buffers = 0;
790   priv->queued_time = 0;
791   priv->last_in_running_time = GST_CLOCK_TIME_NONE;
792   priv->last_out_running_time = GST_CLOCK_TIME_NONE;
793   priv->need_discont_upstream = FALSE;
794   priv->need_discont_downstream = FALSE;
795 }
796
797 static void
798 gst_app_src_dispose (GObject * obj)
799 {
800   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
801   GstAppSrcPrivate *priv = appsrc->priv;
802   Callbacks *callbacks = NULL;
803
804   GST_OBJECT_LOCK (appsrc);
805   if (priv->current_caps) {
806     gst_caps_unref (priv->current_caps);
807     priv->current_caps = NULL;
808   }
809   if (priv->last_caps) {
810     gst_caps_unref (priv->last_caps);
811     priv->last_caps = NULL;
812   }
813   GST_OBJECT_UNLOCK (appsrc);
814
815   g_mutex_lock (&priv->mutex);
816   if (priv->callbacks)
817     callbacks = g_steal_pointer (&priv->callbacks);
818   gst_app_src_flush_queued (appsrc, FALSE);
819   g_mutex_unlock (&priv->mutex);
820
821   g_clear_pointer (&callbacks, callbacks_unref);
822
823   G_OBJECT_CLASS (parent_class)->dispose (obj);
824 }
825
826 static void
827 gst_app_src_finalize (GObject * obj)
828 {
829   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
830   GstAppSrcPrivate *priv = appsrc->priv;
831
832   g_mutex_clear (&priv->mutex);
833   g_cond_clear (&priv->cond);
834   gst_queue_array_free (priv->queue);
835
836   g_free (priv->uri);
837
838   G_OBJECT_CLASS (parent_class)->finalize (obj);
839 }
840
841 static GstCaps *
842 gst_app_src_internal_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
843 {
844   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
845   GstCaps *caps;
846
847   GST_OBJECT_LOCK (appsrc);
848   if ((caps = appsrc->priv->current_caps))
849     gst_caps_ref (caps);
850   GST_OBJECT_UNLOCK (appsrc);
851
852   if (filter) {
853     if (caps) {
854       GstCaps *intersection =
855           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
856       gst_caps_unref (caps);
857       caps = intersection;
858     } else {
859       caps = gst_caps_ref (filter);
860     }
861   }
862
863   GST_DEBUG_OBJECT (appsrc, "caps: %" GST_PTR_FORMAT, caps);
864   return caps;
865 }
866
867 static void
868 gst_app_src_set_property (GObject * object, guint prop_id,
869     const GValue * value, GParamSpec * pspec)
870 {
871   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
872   GstAppSrcPrivate *priv = appsrc->priv;
873
874   switch (prop_id) {
875     case PROP_CAPS:
876       gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
877       break;
878     case PROP_SIZE:
879       gst_app_src_set_size (appsrc, g_value_get_int64 (value));
880       break;
881     case PROP_STREAM_TYPE:
882       gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
883       break;
884     case PROP_MAX_BYTES:
885       gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
886       break;
887     case PROP_MAX_BUFFERS:
888       gst_app_src_set_max_buffers (appsrc, g_value_get_uint64 (value));
889       break;
890     case PROP_MAX_TIME:
891       gst_app_src_set_max_time (appsrc, g_value_get_uint64 (value));
892       break;
893     case PROP_FORMAT:
894       priv->format = g_value_get_enum (value);
895       break;
896     case PROP_BLOCK:
897       priv->block = g_value_get_boolean (value);
898       break;
899     case PROP_IS_LIVE:
900       gst_base_src_set_live (GST_BASE_SRC (appsrc),
901           g_value_get_boolean (value));
902       break;
903     case PROP_MIN_LATENCY:
904       gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
905           FALSE, -1);
906       break;
907     case PROP_MAX_LATENCY:
908       gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
909           g_value_get_int64 (value));
910       break;
911     case PROP_EMIT_SIGNALS:
912       gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
913       break;
914     case PROP_MIN_PERCENT:
915       priv->min_percent = g_value_get_uint (value);
916       break;
917     case PROP_DURATION:
918       gst_app_src_set_duration (appsrc, g_value_get_uint64 (value));
919       break;
920     case PROP_HANDLE_SEGMENT_CHANGE:
921       priv->handle_segment_change = g_value_get_boolean (value);
922       break;
923     case PROP_LEAKY_TYPE:
924       priv->leaky_type = g_value_get_enum (value);
925       break;
926     default:
927       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
928       break;
929   }
930 }
931
932 static void
933 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
934     GParamSpec * pspec)
935 {
936   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
937   GstAppSrcPrivate *priv = appsrc->priv;
938
939   switch (prop_id) {
940     case PROP_CAPS:
941       g_value_take_boxed (value, gst_app_src_get_caps (appsrc));
942       break;
943     case PROP_SIZE:
944       g_value_set_int64 (value, gst_app_src_get_size (appsrc));
945       break;
946     case PROP_STREAM_TYPE:
947       g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
948       break;
949     case PROP_MAX_BYTES:
950       g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
951       break;
952     case PROP_MAX_BUFFERS:
953       g_value_set_uint64 (value, gst_app_src_get_max_buffers (appsrc));
954       break;
955     case PROP_MAX_TIME:
956       g_value_set_uint64 (value, gst_app_src_get_max_time (appsrc));
957       break;
958     case PROP_FORMAT:
959       g_value_set_enum (value, priv->format);
960       break;
961     case PROP_BLOCK:
962       g_value_set_boolean (value, priv->block);
963       break;
964     case PROP_IS_LIVE:
965       g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
966       break;
967     case PROP_MIN_LATENCY:
968     {
969       guint64 min = 0;
970
971       gst_app_src_get_latency (appsrc, &min, NULL);
972       g_value_set_int64 (value, min);
973       break;
974     }
975     case PROP_MAX_LATENCY:
976     {
977       guint64 max = 0;
978
979       gst_app_src_get_latency (appsrc, NULL, &max);
980       g_value_set_int64 (value, max);
981       break;
982     }
983     case PROP_EMIT_SIGNALS:
984       g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
985       break;
986     case PROP_MIN_PERCENT:
987       g_value_set_uint (value, priv->min_percent);
988       break;
989     case PROP_CURRENT_LEVEL_BYTES:
990       g_value_set_uint64 (value, gst_app_src_get_current_level_bytes (appsrc));
991       break;
992     case PROP_CURRENT_LEVEL_BUFFERS:
993       g_value_set_uint64 (value,
994           gst_app_src_get_current_level_buffers (appsrc));
995       break;
996     case PROP_CURRENT_LEVEL_TIME:
997       g_value_set_uint64 (value, gst_app_src_get_current_level_time (appsrc));
998       break;
999     case PROP_DURATION:
1000       g_value_set_uint64 (value, gst_app_src_get_duration (appsrc));
1001       break;
1002     case PROP_HANDLE_SEGMENT_CHANGE:
1003       g_value_set_boolean (value, priv->handle_segment_change);
1004       break;
1005     case PROP_LEAKY_TYPE:
1006       g_value_set_enum (value, priv->leaky_type);
1007       break;
1008     default:
1009       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1010       break;
1011   }
1012 }
1013
1014 static gboolean
1015 gst_app_src_send_event (GstElement * element, GstEvent * event)
1016 {
1017   GstAppSrc *appsrc = GST_APP_SRC_CAST (element);
1018   GstAppSrcPrivate *priv = appsrc->priv;
1019
1020   switch (GST_EVENT_TYPE (event)) {
1021     case GST_EVENT_FLUSH_STOP:
1022       g_mutex_lock (&priv->mutex);
1023       gst_app_src_flush_queued (appsrc, TRUE);
1024       g_mutex_unlock (&priv->mutex);
1025       break;
1026     default:
1027       if (GST_EVENT_IS_SERIALIZED (event)) {
1028         GST_DEBUG_OBJECT (appsrc, "queue event: %" GST_PTR_FORMAT, event);
1029         g_mutex_lock (&priv->mutex);
1030         gst_queue_array_push_tail (priv->queue, event);
1031
1032         if ((priv->wait_status & STREAM_WAITING))
1033           g_cond_broadcast (&priv->cond);
1034
1035         g_mutex_unlock (&priv->mutex);
1036         return TRUE;
1037       }
1038       break;
1039   }
1040
1041   return GST_CALL_PARENT_WITH_DEFAULT (GST_ELEMENT_CLASS, send_event, (element,
1042           event), FALSE);
1043 }
1044
1045 static gboolean
1046 gst_app_src_unlock (GstBaseSrc * bsrc)
1047 {
1048   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1049   GstAppSrcPrivate *priv = appsrc->priv;
1050
1051   g_mutex_lock (&priv->mutex);
1052   GST_DEBUG_OBJECT (appsrc, "unlock start");
1053   priv->flushing = TRUE;
1054   g_cond_broadcast (&priv->cond);
1055   g_mutex_unlock (&priv->mutex);
1056
1057   return TRUE;
1058 }
1059
1060 static gboolean
1061 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
1062 {
1063   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1064   GstAppSrcPrivate *priv = appsrc->priv;
1065
1066   g_mutex_lock (&priv->mutex);
1067   GST_DEBUG_OBJECT (appsrc, "unlock stop");
1068   priv->flushing = FALSE;
1069   g_cond_broadcast (&priv->cond);
1070   g_mutex_unlock (&priv->mutex);
1071
1072   return TRUE;
1073 }
1074
1075 static gboolean
1076 gst_app_src_start (GstBaseSrc * bsrc)
1077 {
1078   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1079   GstAppSrcPrivate *priv = appsrc->priv;
1080
1081   g_mutex_lock (&priv->mutex);
1082   GST_DEBUG_OBJECT (appsrc, "starting");
1083   priv->started = TRUE;
1084   /* set the offset to -1 so that we always do a first seek. This is only used
1085    * in random-access mode. */
1086   priv->offset = -1;
1087   priv->flushing = FALSE;
1088   g_mutex_unlock (&priv->mutex);
1089
1090   gst_base_src_set_format (bsrc, priv->format);
1091   gst_segment_init (&priv->last_segment, priv->format);
1092   gst_segment_init (&priv->current_segment, priv->format);
1093   priv->pending_custom_segment = FALSE;
1094
1095   return TRUE;
1096 }
1097
1098 static gboolean
1099 gst_app_src_stop (GstBaseSrc * bsrc)
1100 {
1101   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1102   GstAppSrcPrivate *priv = appsrc->priv;
1103
1104   g_mutex_lock (&priv->mutex);
1105   GST_DEBUG_OBJECT (appsrc, "stopping");
1106   priv->is_eos = FALSE;
1107   priv->flushing = TRUE;
1108   priv->started = FALSE;
1109   priv->posted_latency_msg = FALSE;
1110   gst_app_src_flush_queued (appsrc, TRUE);
1111   g_cond_broadcast (&priv->cond);
1112   g_mutex_unlock (&priv->mutex);
1113
1114   return TRUE;
1115 }
1116
1117 static gboolean
1118 gst_app_src_is_seekable (GstBaseSrc * src)
1119 {
1120   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1121   GstAppSrcPrivate *priv = appsrc->priv;
1122   gboolean res = FALSE;
1123
1124   switch (priv->stream_type) {
1125     case GST_APP_STREAM_TYPE_STREAM:
1126       break;
1127     case GST_APP_STREAM_TYPE_SEEKABLE:
1128     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
1129       res = TRUE;
1130       break;
1131   }
1132   return res;
1133 }
1134
1135 static gboolean
1136 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
1137 {
1138   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1139
1140   *size = gst_app_src_get_size (appsrc);
1141
1142   return TRUE;
1143 }
1144
1145 static gboolean
1146 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
1147 {
1148   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1149   GstAppSrcPrivate *priv = appsrc->priv;
1150   gboolean res;
1151
1152   switch (GST_QUERY_TYPE (query)) {
1153     case GST_QUERY_LATENCY:
1154     {
1155       GstClockTime min, max;
1156       gboolean live;
1157
1158       /* Query the parent class for the defaults */
1159       res = gst_base_src_query_latency (src, &live, &min, &max);
1160
1161       /* overwrite with our values when we need to */
1162       g_mutex_lock (&priv->mutex);
1163       if (priv->min_latency != -1) {
1164         min = priv->min_latency;
1165         max = priv->max_latency;
1166       }
1167       g_mutex_unlock (&priv->mutex);
1168
1169       gst_query_set_latency (query, live, min, max);
1170       break;
1171     }
1172     case GST_QUERY_SCHEDULING:
1173     {
1174       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1175       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1176
1177       switch (priv->stream_type) {
1178         case GST_APP_STREAM_TYPE_STREAM:
1179         case GST_APP_STREAM_TYPE_SEEKABLE:
1180           break;
1181         case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
1182           gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1183           break;
1184       }
1185       res = TRUE;
1186       break;
1187     }
1188     case GST_QUERY_DURATION:
1189     {
1190       GstFormat format;
1191       gst_query_parse_duration (query, &format, NULL);
1192       if (format == GST_FORMAT_BYTES) {
1193         gst_query_set_duration (query, format, priv->size);
1194         res = TRUE;
1195       } else if (format == GST_FORMAT_TIME) {
1196         if (priv->duration != GST_CLOCK_TIME_NONE) {
1197           gst_query_set_duration (query, format, priv->duration);
1198           res = TRUE;
1199         } else {
1200           res = FALSE;
1201         }
1202       } else {
1203         res = FALSE;
1204       }
1205       break;
1206     }
1207     default:
1208       res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
1209       break;
1210   }
1211
1212   return res;
1213 }
1214
1215 /* will be called in push mode */
1216 static gboolean
1217 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1218 {
1219   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1220   GstAppSrcPrivate *priv = appsrc->priv;
1221   gint64 desired_position;
1222   gboolean res = FALSE;
1223   gboolean emit;
1224   Callbacks *callbacks = NULL;
1225
1226   desired_position = segment->position;
1227
1228   /* no need to try to seek in streaming mode */
1229   if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
1230     return TRUE;
1231
1232   GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
1233       desired_position, gst_format_get_name (segment->format));
1234
1235   g_mutex_lock (&priv->mutex);
1236   emit = priv->emit_signals;
1237   if (priv->callbacks)
1238     callbacks = callbacks_ref (priv->callbacks);
1239   g_mutex_unlock (&priv->mutex);
1240
1241   if (callbacks && callbacks->callbacks.seek_data) {
1242     res =
1243         callbacks->callbacks.seek_data (appsrc, desired_position,
1244         callbacks->user_data);
1245   } else if (emit) {
1246     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
1247         desired_position, &res);
1248   }
1249
1250   g_clear_pointer (&callbacks, callbacks_unref);
1251
1252   if (res) {
1253     GST_DEBUG_OBJECT (appsrc, "flushing queue");
1254     g_mutex_lock (&priv->mutex);
1255     gst_app_src_flush_queued (appsrc, TRUE);
1256     gst_segment_copy_into (segment, &priv->last_segment);
1257     gst_segment_copy_into (segment, &priv->current_segment);
1258     priv->pending_custom_segment = FALSE;
1259     g_mutex_unlock (&priv->mutex);
1260     priv->is_eos = FALSE;
1261   } else {
1262     GST_WARNING_OBJECT (appsrc, "seek failed");
1263   }
1264
1265   return res;
1266 }
1267
1268 /* must be called with the appsrc mutex */
1269 static gboolean
1270 gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
1271 {
1272   gboolean res = FALSE;
1273   gboolean emit;
1274   GstAppSrcPrivate *priv = appsrc->priv;
1275   Callbacks *callbacks = NULL;
1276
1277   emit = priv->emit_signals;
1278   if (priv->callbacks)
1279     callbacks = callbacks_ref (priv->callbacks);
1280   g_mutex_unlock (&priv->mutex);
1281
1282   GST_DEBUG_OBJECT (appsrc,
1283       "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
1284       priv->offset, offset);
1285
1286   if (callbacks && callbacks->callbacks.seek_data)
1287     res = callbacks->callbacks.seek_data (appsrc, offset, callbacks->user_data);
1288   else if (emit)
1289     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
1290         offset, &res);
1291
1292   g_clear_pointer (&callbacks, callbacks_unref);
1293
1294   g_mutex_lock (&priv->mutex);
1295
1296   return res;
1297 }
1298
1299 /* must be called with the appsrc mutex. After this call things can be
1300  * flushing */
1301 static void
1302 gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
1303 {
1304   gboolean emit;
1305   GstAppSrcPrivate *priv = appsrc->priv;
1306   Callbacks *callbacks = NULL;
1307
1308   emit = priv->emit_signals;
1309   if (priv->callbacks)
1310     callbacks = callbacks_ref (priv->callbacks);
1311   g_mutex_unlock (&priv->mutex);
1312
1313   /* we have no data, we need some. We fire the signal with the size hint. */
1314   if (callbacks && callbacks->callbacks.need_data)
1315     callbacks->callbacks.need_data (appsrc, size, callbacks->user_data);
1316   else if (emit)
1317     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
1318         NULL);
1319
1320   g_clear_pointer (&callbacks, callbacks_unref);
1321
1322   g_mutex_lock (&priv->mutex);
1323   /* we can be flushing now because we released the lock */
1324 }
1325
1326 /* must be called with the appsrc mutex */
1327 static gboolean
1328 gst_app_src_do_negotiate (GstBaseSrc * basesrc)
1329 {
1330   GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
1331   GstAppSrcPrivate *priv = appsrc->priv;
1332   gboolean result;
1333   GstCaps *caps;
1334
1335   GST_OBJECT_LOCK (basesrc);
1336   caps = priv->current_caps ? gst_caps_ref (priv->current_caps) : NULL;
1337   GST_OBJECT_UNLOCK (basesrc);
1338
1339   /* Avoid deadlock by unlocking mutex
1340    * otherwise we get deadlock between this and stream lock */
1341   g_mutex_unlock (&priv->mutex);
1342   if (caps) {
1343     result = gst_base_src_set_caps (basesrc, caps);
1344     gst_caps_unref (caps);
1345   } else {
1346     result = GST_BASE_SRC_CLASS (parent_class)->negotiate (basesrc);
1347   }
1348   g_mutex_lock (&priv->mutex);
1349
1350   return result;
1351 }
1352
1353 static gboolean
1354 gst_app_src_negotiate (GstBaseSrc * basesrc)
1355 {
1356   GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
1357   GstAppSrcPrivate *priv = appsrc->priv;
1358   gboolean result;
1359
1360   g_mutex_lock (&priv->mutex);
1361   result = gst_app_src_do_negotiate (basesrc);
1362   g_mutex_unlock (&priv->mutex);
1363   return result;
1364 }
1365
1366 /* Update the currently queued bytes/buffers/time information for the item
1367  * that was just removed from the queue.
1368  *
1369  * If update_offset is set, additionally the offset of the source will be
1370  * moved forward accordingly as if that many bytes were output.
1371  */
1372 static void
1373 gst_app_src_update_queued_pop (GstAppSrc * appsrc, GstMiniObject * item,
1374     gboolean update_offset)
1375 {
1376   GstAppSrcPrivate *priv = appsrc->priv;
1377   guint buf_size = 0;
1378   guint n_buffers = 0;
1379   GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
1380
1381   if (GST_IS_BUFFER (item)) {
1382     GstBuffer *buf = GST_BUFFER_CAST (item);
1383     buf_size = gst_buffer_get_size (buf);
1384     n_buffers = 1;
1385
1386     end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
1387     if (end_buffer_ts != GST_CLOCK_TIME_NONE
1388         && GST_BUFFER_DURATION_IS_VALID (buf))
1389       end_buffer_ts += GST_BUFFER_DURATION (buf);
1390
1391     GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", buf, buf_size);
1392   } else if (GST_IS_BUFFER_LIST (item)) {
1393     GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
1394     guint i;
1395
1396     n_buffers = gst_buffer_list_length (buffer_list);
1397
1398     for (i = 0; i < n_buffers; i++) {
1399       GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
1400       GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
1401
1402       buf_size += gst_buffer_get_size (tmp);
1403       /* Update to the last buffer's timestamp that is known */
1404       if (ts != GST_CLOCK_TIME_NONE) {
1405         end_buffer_ts = ts;
1406         if (GST_BUFFER_DURATION_IS_VALID (tmp))
1407           end_buffer_ts += GST_BUFFER_DURATION (tmp);
1408       }
1409     }
1410   }
1411
1412   priv->queued_bytes -= buf_size;
1413   priv->queued_buffers -= n_buffers;
1414
1415   /* Update time level if working on a TIME segment */
1416   if ((priv->current_segment.format == GST_FORMAT_TIME
1417           || (priv->current_segment.format == GST_FORMAT_UNDEFINED
1418               && priv->last_segment.format == GST_FORMAT_TIME))
1419       && end_buffer_ts != GST_CLOCK_TIME_NONE) {
1420     const GstSegment *segment =
1421         priv->current_segment.format ==
1422         GST_FORMAT_TIME ? &priv->current_segment : &priv->last_segment;
1423
1424     /* Clip to the current segment boundaries */
1425     if (segment->stop != -1 && end_buffer_ts > segment->stop)
1426       end_buffer_ts = segment->stop;
1427     else if (segment->start > end_buffer_ts)
1428       end_buffer_ts = segment->start;
1429
1430     priv->last_out_running_time =
1431         gst_segment_to_running_time (segment, GST_FORMAT_TIME, end_buffer_ts);
1432
1433     GST_TRACE_OBJECT (appsrc,
1434         "Last in running time %" GST_TIME_FORMAT ", last out running time %"
1435         GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
1436         GST_TIME_ARGS (priv->last_out_running_time));
1437
1438     /* If timestamps on both sides are known, calculate the current
1439      * fill level in time and consider the queue empty if the output
1440      * running time is lower than the input one (i.e. some kind of reset
1441      * has happened).
1442      */
1443     if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
1444         && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
1445       if (priv->last_out_running_time > priv->last_in_running_time) {
1446         priv->queued_time = 0;
1447       } else {
1448         priv->queued_time =
1449             priv->last_in_running_time - priv->last_out_running_time;
1450       }
1451     }
1452   }
1453
1454   GST_DEBUG_OBJECT (appsrc,
1455       "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
1456       " buffers, %" GST_TIME_FORMAT, priv->queued_bytes,
1457       priv->queued_buffers, GST_TIME_ARGS (priv->queued_time));
1458
1459   /* only update the offset when in random_access mode and when requested by
1460    * the caller, i.e. not when just dropping the item */
1461   if (update_offset && priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
1462     priv->offset += buf_size;
1463 }
1464
1465 /* Update the currently queued bytes/buffers/time information for the item
1466  * that was just added to the queue.
1467  */
1468 static void
1469 gst_app_src_update_queued_push (GstAppSrc * appsrc, GstMiniObject * item)
1470 {
1471   GstAppSrcPrivate *priv = appsrc->priv;
1472   GstClockTime start_buffer_ts = GST_CLOCK_TIME_NONE;
1473   GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
1474   guint buf_size = 0;
1475   guint n_buffers = 0;
1476
1477   if (GST_IS_BUFFER (item)) {
1478     GstBuffer *buf = GST_BUFFER_CAST (item);
1479
1480     buf_size = gst_buffer_get_size (buf);
1481     n_buffers = 1;
1482
1483     start_buffer_ts = end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
1484     if (end_buffer_ts != GST_CLOCK_TIME_NONE
1485         && GST_BUFFER_DURATION_IS_VALID (buf))
1486       end_buffer_ts += GST_BUFFER_DURATION (buf);
1487   } else if (GST_IS_BUFFER_LIST (item)) {
1488     GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
1489     guint i;
1490
1491     n_buffers = gst_buffer_list_length (buffer_list);
1492
1493     for (i = 0; i < n_buffers; i++) {
1494       GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
1495       GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
1496
1497       buf_size += gst_buffer_get_size (tmp);
1498
1499       if (ts != GST_CLOCK_TIME_NONE) {
1500         if (start_buffer_ts == GST_CLOCK_TIME_NONE)
1501           start_buffer_ts = ts;
1502         end_buffer_ts = ts;
1503         if (GST_BUFFER_DURATION_IS_VALID (tmp))
1504           end_buffer_ts += GST_BUFFER_DURATION (tmp);
1505       }
1506     }
1507   }
1508
1509   priv->queued_bytes += buf_size;
1510   priv->queued_buffers += n_buffers;
1511
1512   /* Update time level if working on a TIME segment */
1513   if (priv->last_segment.format == GST_FORMAT_TIME
1514       && end_buffer_ts != GST_CLOCK_TIME_NONE) {
1515     /* Clip to the last segment boundaries */
1516     if (priv->last_segment.stop != -1
1517         && end_buffer_ts > priv->last_segment.stop)
1518       end_buffer_ts = priv->last_segment.stop;
1519     else if (priv->last_segment.start > end_buffer_ts)
1520       end_buffer_ts = priv->last_segment.start;
1521
1522     priv->last_in_running_time =
1523         gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
1524         end_buffer_ts);
1525
1526     /* If this is the only buffer then we can directly update the queued time
1527      * here. This is especially useful if this was the first buffer because
1528      * otherwise we would have to wait until it is actually unqueued to know
1529      * the queued duration */
1530     if (priv->queued_buffers == 1) {
1531       if (priv->last_segment.stop != -1
1532           && start_buffer_ts > priv->last_segment.stop)
1533         start_buffer_ts = priv->last_segment.stop;
1534       else if (priv->last_segment.start > start_buffer_ts)
1535         start_buffer_ts = priv->last_segment.start;
1536
1537       priv->last_out_running_time =
1538           gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
1539           start_buffer_ts);
1540     }
1541
1542     GST_TRACE_OBJECT (appsrc,
1543         "Last in running time %" GST_TIME_FORMAT ", last out running time %"
1544         GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
1545         GST_TIME_ARGS (priv->last_out_running_time));
1546
1547     if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
1548         && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
1549       if (priv->last_out_running_time > priv->last_in_running_time) {
1550         priv->queued_time = 0;
1551       } else {
1552         priv->queued_time =
1553             priv->last_in_running_time - priv->last_out_running_time;
1554       }
1555     }
1556   }
1557
1558   GST_DEBUG_OBJECT (appsrc,
1559       "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
1560       " buffers, %" GST_TIME_FORMAT, priv->queued_bytes, priv->queued_buffers,
1561       GST_TIME_ARGS (priv->queued_time));
1562 }
1563
1564 static GstFlowReturn
1565 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
1566     GstBuffer ** buf)
1567 {
1568   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1569   GstAppSrcPrivate *priv = appsrc->priv;
1570   GstFlowReturn ret;
1571
1572   GST_OBJECT_LOCK (appsrc);
1573   if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
1574           bsrc->segment.format == GST_FORMAT_BYTES)) {
1575     GST_DEBUG_OBJECT (appsrc,
1576         "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
1577         bsrc->segment.duration, priv->size);
1578     bsrc->segment.duration = priv->size;
1579     GST_OBJECT_UNLOCK (appsrc);
1580
1581     gst_element_post_message (GST_ELEMENT (appsrc),
1582         gst_message_new_duration_changed (GST_OBJECT (appsrc)));
1583   } else if (G_UNLIKELY (priv->duration != bsrc->segment.duration &&
1584           bsrc->segment.format == GST_FORMAT_TIME)) {
1585     GST_DEBUG_OBJECT (appsrc,
1586         "Duration changed from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
1587         GST_TIME_ARGS (bsrc->segment.duration), GST_TIME_ARGS (priv->duration));
1588     bsrc->segment.duration = priv->duration;
1589     GST_OBJECT_UNLOCK (appsrc);
1590
1591     gst_element_post_message (GST_ELEMENT (appsrc),
1592         gst_message_new_duration_changed (GST_OBJECT (appsrc)));
1593   } else {
1594     GST_OBJECT_UNLOCK (appsrc);
1595   }
1596
1597   g_mutex_lock (&priv->mutex);
1598   /* check flushing first */
1599   if (G_UNLIKELY (priv->flushing))
1600     goto flushing;
1601
1602   if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
1603     /* if we are dealing with a random-access stream, issue a seek if the offset
1604      * changed. */
1605     if (G_UNLIKELY (priv->offset != offset)) {
1606       gboolean res;
1607
1608       /* do the seek */
1609       res = gst_app_src_emit_seek (appsrc, offset);
1610
1611       if (G_UNLIKELY (!res))
1612         /* failing to seek is fatal */
1613         goto seek_error;
1614
1615       priv->offset = offset;
1616       priv->is_eos = FALSE;
1617     }
1618   }
1619
1620   while (TRUE) {
1621     /* Our lock may have been release to push events or caps, check out
1622      * state in case we are now flushing. */
1623     if (G_UNLIKELY (priv->flushing))
1624       goto flushing;
1625
1626     /* return data as long as we have some */
1627     if (!gst_queue_array_is_empty (priv->queue)) {
1628       GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);
1629
1630       if (GST_IS_CAPS (obj)) {
1631         GstCaps *next_caps = GST_CAPS (obj);
1632         gboolean caps_changed = TRUE;
1633
1634         if (next_caps && priv->current_caps)
1635           caps_changed = !gst_caps_is_equal (next_caps, priv->current_caps);
1636         else
1637           caps_changed = (next_caps != priv->current_caps);
1638
1639         gst_caps_replace (&priv->current_caps, next_caps);
1640
1641         if (next_caps) {
1642           gst_caps_unref (next_caps);
1643         }
1644
1645         if (caps_changed)
1646           gst_app_src_do_negotiate (bsrc);
1647
1648         /* Continue checks caps and queue */
1649         continue;
1650       }
1651
1652       if (GST_IS_BUFFER (obj)) {
1653         GstBuffer *buffer = GST_BUFFER (obj);
1654
1655         /* Mark the buffer as DISCONT if we previously dropped a buffer
1656          * instead of outputting it */
1657         if (priv->need_discont_downstream) {
1658           buffer = gst_buffer_make_writable (buffer);
1659           GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1660           priv->need_discont_downstream = FALSE;
1661         }
1662
1663         *buf = buffer;
1664       } else if (GST_IS_BUFFER_LIST (obj)) {
1665         GstBufferList *buffer_list;
1666
1667         buffer_list = GST_BUFFER_LIST (obj);
1668
1669         /* Mark the first buffer of the buffer list as DISCONT if we
1670          * previously dropped a buffer instead of outputting it */
1671         if (priv->need_discont_downstream) {
1672           GstBuffer *buffer;
1673
1674           buffer_list = gst_buffer_list_make_writable (buffer_list);
1675           buffer = gst_buffer_list_get_writable (buffer_list, 0);
1676           GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1677           priv->need_discont_downstream = FALSE;
1678         }
1679
1680         gst_base_src_submit_buffer_list (bsrc, buffer_list);
1681         *buf = NULL;
1682       } else if (GST_IS_EVENT (obj)) {
1683         GstEvent *event = GST_EVENT (obj);
1684
1685         GST_DEBUG_OBJECT (appsrc, "pop event %" GST_PTR_FORMAT, event);
1686
1687         if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
1688           const GstSegment *segment = NULL;
1689
1690           gst_event_parse_segment (event, &segment);
1691           g_assert (segment != NULL);
1692
1693           if (!gst_segment_is_equal (&priv->current_segment, segment)) {
1694             GST_DEBUG_OBJECT (appsrc,
1695                 "Update new segment %" GST_PTR_FORMAT, event);
1696             if (!gst_base_src_new_segment (bsrc, segment)) {
1697               GST_ERROR_OBJECT (appsrc,
1698                   "Couldn't set new segment %" GST_PTR_FORMAT, event);
1699               gst_event_unref (event);
1700               goto invalid_segment;
1701             }
1702             gst_segment_copy_into (segment, &priv->current_segment);
1703           }
1704
1705           gst_event_unref (event);
1706         } else {
1707           GstEvent *seg_event;
1708           GstSegment last_segment = priv->last_segment;
1709
1710           /* event is serialized with the buffers flow */
1711
1712           /* We are about to push an event, release out lock */
1713           g_mutex_unlock (&priv->mutex);
1714
1715           seg_event =
1716               gst_pad_get_sticky_event (GST_BASE_SRC_PAD (appsrc),
1717               GST_EVENT_SEGMENT, 0);
1718           if (!seg_event) {
1719             seg_event = gst_event_new_segment (&last_segment);
1720
1721             GST_DEBUG_OBJECT (appsrc,
1722                 "received serialized event before first buffer, push default segment %"
1723                 GST_PTR_FORMAT, seg_event);
1724
1725             gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), seg_event);
1726           } else {
1727             gst_event_unref (seg_event);
1728           }
1729
1730           gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), event);
1731
1732           g_mutex_lock (&priv->mutex);
1733         }
1734         continue;
1735       } else {
1736         g_assert_not_reached ();
1737       }
1738
1739       gst_app_src_update_queued_pop (appsrc, obj, TRUE);
1740
1741       /* signal that we removed an item */
1742       if ((priv->wait_status & APP_WAITING))
1743         g_cond_broadcast (&priv->cond);
1744
1745       /* see if we go lower than the min-percent */
1746       if (priv->min_percent) {
1747         if ((priv->max_bytes
1748                 && priv->queued_bytes * 100 / priv->max_bytes <=
1749                 priv->min_percent) || (priv->max_buffers
1750                 && priv->queued_buffers * 100 / priv->max_buffers <=
1751                 priv->min_percent) || (priv->max_time
1752                 && priv->queued_time * 100 / priv->max_time <=
1753                 priv->min_percent)) {
1754           /* ignore flushing state, we got a buffer and we will return it now.
1755            * Errors will be handled in the next round */
1756           gst_app_src_emit_need_data (appsrc, size);
1757         }
1758       }
1759       ret = GST_FLOW_OK;
1760       break;
1761     } else {
1762       gst_app_src_emit_need_data (appsrc, size);
1763
1764       /* we can be flushing now because we released the lock above */
1765       if (G_UNLIKELY (priv->flushing))
1766         goto flushing;
1767
1768       /* if we have a buffer now, continue the loop and try to return it. In
1769        * random-access mode (where a buffer is normally pushed in the above
1770        * signal) we can still be empty because the pushed buffer got flushed or
1771        * when the application pushes the requested buffer later, we support both
1772        * possibilities. */
1773       if (!gst_queue_array_is_empty (priv->queue))
1774         continue;
1775
1776       /* no buffer yet, maybe we are EOS, if not, block for more data. */
1777     }
1778
1779     /* check EOS */
1780     if (G_UNLIKELY (priv->is_eos))
1781       goto eos;
1782
1783     /* nothing to return, wait a while for new data or flushing. */
1784     priv->wait_status |= STREAM_WAITING;
1785     g_cond_wait (&priv->cond, &priv->mutex);
1786     priv->wait_status &= ~STREAM_WAITING;
1787   }
1788   g_mutex_unlock (&priv->mutex);
1789   return ret;
1790
1791   /* ERRORS */
1792 flushing:
1793   {
1794     GST_DEBUG_OBJECT (appsrc, "we are flushing");
1795     g_mutex_unlock (&priv->mutex);
1796     return GST_FLOW_FLUSHING;
1797   }
1798 eos:
1799   {
1800     GST_DEBUG_OBJECT (appsrc, "we are EOS");
1801     g_mutex_unlock (&priv->mutex);
1802     return GST_FLOW_EOS;
1803   }
1804 seek_error:
1805   {
1806     g_mutex_unlock (&priv->mutex);
1807     GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
1808         GST_ERROR_SYSTEM);
1809     return GST_FLOW_ERROR;
1810   }
1811
1812 invalid_segment:
1813   {
1814     g_mutex_unlock (&priv->mutex);
1815     GST_ELEMENT_ERROR (appsrc, LIBRARY, SETTINGS,
1816         (NULL), ("Failed to configure the provided input segment."));
1817     return GST_FLOW_ERROR;
1818   }
1819 }
1820
1821 /* external API */
1822
1823 /**
1824  * gst_app_src_set_caps:
1825  * @appsrc: a #GstAppSrc
1826  * @caps: (nullable): caps to set
1827  *
1828  * Set the capabilities on the appsrc element.  This function takes
1829  * a copy of the caps structure. After calling this method, the source will
1830  * only produce caps that match @caps. @caps must be fixed and the caps on the
1831  * buffers must match the caps or left NULL.
1832  */
1833 void
1834 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
1835 {
1836   GstAppSrcPrivate *priv;
1837   gboolean caps_changed;
1838
1839   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1840
1841   priv = appsrc->priv;
1842
1843   g_mutex_lock (&priv->mutex);
1844
1845   GST_OBJECT_LOCK (appsrc);
1846   if (caps && priv->last_caps)
1847     caps_changed = !gst_caps_is_equal (caps, priv->last_caps);
1848   else
1849     caps_changed = (caps != priv->last_caps);
1850
1851   if (caps_changed) {
1852     GstCaps *new_caps;
1853     gpointer t;
1854
1855     new_caps = caps ? gst_caps_copy (caps) : NULL;
1856     GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
1857
1858     while ((t = gst_queue_array_peek_tail (priv->queue)) && GST_IS_CAPS (t)) {
1859       gst_caps_unref (gst_queue_array_pop_tail (priv->queue));
1860     }
1861     gst_queue_array_push_tail (priv->queue, new_caps);
1862     gst_caps_replace (&priv->last_caps, new_caps);
1863
1864     if ((priv->wait_status & STREAM_WAITING))
1865       g_cond_broadcast (&priv->cond);
1866   }
1867
1868   GST_OBJECT_UNLOCK (appsrc);
1869
1870   g_mutex_unlock (&priv->mutex);
1871 }
1872
1873 /**
1874  * gst_app_src_get_caps:
1875  * @appsrc: a #GstAppSrc
1876  *
1877  * Get the configured caps on @appsrc.
1878  *
1879  * Returns: (nullable) (transfer full): the #GstCaps produced by the source. gst_caps_unref() after usage.
1880  */
1881 GstCaps *
1882 gst_app_src_get_caps (GstAppSrc * appsrc)
1883 {
1884
1885   GstCaps *caps;
1886
1887   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
1888
1889   GST_OBJECT_LOCK (appsrc);
1890   if ((caps = appsrc->priv->last_caps))
1891     gst_caps_ref (caps);
1892   GST_OBJECT_UNLOCK (appsrc);
1893
1894   return caps;
1895
1896 }
1897
1898 /**
1899  * gst_app_src_set_size:
1900  * @appsrc: a #GstAppSrc
1901  * @size: the size to set
1902  *
1903  * Set the size of the stream in bytes. A value of -1 means that the size is
1904  * not known.
1905  */
1906 void
1907 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
1908 {
1909   GstAppSrcPrivate *priv;
1910
1911   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1912
1913   priv = appsrc->priv;
1914
1915   GST_OBJECT_LOCK (appsrc);
1916   GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
1917   priv->size = size;
1918   GST_OBJECT_UNLOCK (appsrc);
1919 }
1920
1921 /**
1922  * gst_app_src_get_size:
1923  * @appsrc: a #GstAppSrc
1924  *
1925  * Get the size of the stream in bytes. A value of -1 means that the size is
1926  * not known.
1927  *
1928  * Returns: the size of the stream previously set with gst_app_src_set_size();
1929  */
1930 gint64
1931 gst_app_src_get_size (GstAppSrc * appsrc)
1932 {
1933   gint64 size;
1934   GstAppSrcPrivate *priv;
1935
1936   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1937
1938   priv = appsrc->priv;
1939
1940   GST_OBJECT_LOCK (appsrc);
1941   size = priv->size;
1942   GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
1943   GST_OBJECT_UNLOCK (appsrc);
1944
1945   return size;
1946 }
1947
1948 /**
1949  * gst_app_src_set_duration:
1950  * @appsrc: a #GstAppSrc
1951  * @duration: the duration to set
1952  *
1953  * Set the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
1954  * not known.
1955  *
1956  * Since: 1.10
1957  */
1958 void
1959 gst_app_src_set_duration (GstAppSrc * appsrc, GstClockTime duration)
1960 {
1961   GstAppSrcPrivate *priv;
1962
1963   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1964
1965   priv = appsrc->priv;
1966
1967   GST_OBJECT_LOCK (appsrc);
1968   GST_DEBUG_OBJECT (appsrc, "setting duration of %" GST_TIME_FORMAT,
1969       GST_TIME_ARGS (duration));
1970   priv->duration = duration;
1971   GST_OBJECT_UNLOCK (appsrc);
1972 }
1973
1974 /**
1975  * gst_app_src_get_duration:
1976  * @appsrc: a #GstAppSrc
1977  *
1978  * Get the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
1979  * not known.
1980  *
1981  * Returns: the duration of the stream previously set with gst_app_src_set_duration();
1982  *
1983  * Since: 1.10
1984  */
1985 GstClockTime
1986 gst_app_src_get_duration (GstAppSrc * appsrc)
1987 {
1988   GstClockTime duration;
1989   GstAppSrcPrivate *priv;
1990
1991   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
1992
1993   priv = appsrc->priv;
1994
1995   GST_OBJECT_LOCK (appsrc);
1996   duration = priv->duration;
1997   GST_DEBUG_OBJECT (appsrc, "getting duration of %" GST_TIME_FORMAT,
1998       GST_TIME_ARGS (duration));
1999   GST_OBJECT_UNLOCK (appsrc);
2000
2001   return duration;
2002 }
2003
2004 /**
2005  * gst_app_src_set_stream_type:
2006  * @appsrc: a #GstAppSrc
2007  * @type: the new state
2008  *
2009  * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
2010  * be connected to.
2011  *
2012  * A stream_type stream
2013  */
2014 void
2015 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
2016 {
2017   GstAppSrcPrivate *priv;
2018
2019   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2020
2021   priv = appsrc->priv;
2022
2023   GST_OBJECT_LOCK (appsrc);
2024   GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
2025   priv->stream_type = type;
2026   GST_OBJECT_UNLOCK (appsrc);
2027 }
2028
2029 /**
2030  * gst_app_src_get_stream_type:
2031  * @appsrc: a #GstAppSrc
2032  *
2033  * Get the stream type. Control the stream type of @appsrc
2034  * with gst_app_src_set_stream_type().
2035  *
2036  * Returns: the stream type.
2037  */
2038 GstAppStreamType
2039 gst_app_src_get_stream_type (GstAppSrc * appsrc)
2040 {
2041   gboolean stream_type;
2042   GstAppSrcPrivate *priv;
2043
2044   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
2045
2046   priv = appsrc->priv;
2047
2048   GST_OBJECT_LOCK (appsrc);
2049   stream_type = priv->stream_type;
2050   GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
2051   GST_OBJECT_UNLOCK (appsrc);
2052
2053   return stream_type;
2054 }
2055
2056 /**
2057  * gst_app_src_set_max_bytes:
2058  * @appsrc: a #GstAppSrc
2059  * @max: the maximum number of bytes to queue
2060  *
2061  * Set the maximum amount of bytes that can be queued in @appsrc.
2062  * After the maximum amount of bytes are queued, @appsrc will emit the
2063  * "enough-data" signal.
2064  */
2065 void
2066 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
2067 {
2068   GstAppSrcPrivate *priv;
2069
2070   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2071
2072   priv = appsrc->priv;
2073
2074   g_mutex_lock (&priv->mutex);
2075   if (max != priv->max_bytes) {
2076     GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
2077     priv->max_bytes = max;
2078     /* signal the change */
2079     g_cond_broadcast (&priv->cond);
2080   }
2081   g_mutex_unlock (&priv->mutex);
2082 }
2083
2084 /**
2085  * gst_app_src_get_max_bytes:
2086  * @appsrc: a #GstAppSrc
2087  *
2088  * Get the maximum amount of bytes that can be queued in @appsrc.
2089  *
2090  * Returns: The maximum amount of bytes that can be queued.
2091  */
2092 guint64
2093 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
2094 {
2095   guint64 result;
2096   GstAppSrcPrivate *priv;
2097
2098   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2099
2100   priv = appsrc->priv;
2101
2102   g_mutex_lock (&priv->mutex);
2103   result = priv->max_bytes;
2104   GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
2105   g_mutex_unlock (&priv->mutex);
2106
2107   return result;
2108 }
2109
2110 /**
2111  * gst_app_src_get_current_level_bytes:
2112  * @appsrc: a #GstAppSrc
2113  *
2114  * Get the number of currently queued bytes inside @appsrc.
2115  *
2116  * Returns: The number of currently queued bytes.
2117  *
2118  * Since: 1.2
2119  */
2120 guint64
2121 gst_app_src_get_current_level_bytes (GstAppSrc * appsrc)
2122 {
2123   guint64 queued;
2124   GstAppSrcPrivate *priv;
2125
2126   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
2127
2128   priv = appsrc->priv;
2129
2130   GST_OBJECT_LOCK (appsrc);
2131   queued = priv->queued_bytes;
2132   GST_DEBUG_OBJECT (appsrc, "current level bytes is %" G_GUINT64_FORMAT,
2133       queued);
2134   GST_OBJECT_UNLOCK (appsrc);
2135
2136   return queued;
2137 }
2138
2139 /**
2140  * gst_app_src_set_max_buffers:
2141  * @appsrc: a #GstAppSrc
2142  * @max: the maximum number of buffers to queue
2143  *
2144  * Set the maximum amount of buffers that can be queued in @appsrc.
2145  * After the maximum amount of buffers are queued, @appsrc will emit the
2146  * "enough-data" signal.
2147  *
2148  * Since: 1.20
2149  */
2150 void
2151 gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint64 max)
2152 {
2153   GstAppSrcPrivate *priv;
2154
2155   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2156
2157   priv = appsrc->priv;
2158
2159   g_mutex_lock (&priv->mutex);
2160   if (max != priv->max_buffers) {
2161     GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %" G_GUINT64_FORMAT, max);
2162     priv->max_buffers = max;
2163     /* signal the change */
2164     g_cond_broadcast (&priv->cond);
2165   }
2166   g_mutex_unlock (&priv->mutex);
2167 }
2168
2169 /**
2170  * gst_app_src_get_max_buffers:
2171  * @appsrc: a #GstAppSrc
2172  *
2173  * Get the maximum amount of buffers that can be queued in @appsrc.
2174  *
2175  * Returns: The maximum amount of buffers that can be queued.
2176  *
2177  * Since: 1.20
2178  */
2179 guint64
2180 gst_app_src_get_max_buffers (GstAppSrc * appsrc)
2181 {
2182   guint64 result;
2183   GstAppSrcPrivate *priv;
2184
2185   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2186
2187   priv = appsrc->priv;
2188
2189   g_mutex_lock (&priv->mutex);
2190   result = priv->max_buffers;
2191   GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %" G_GUINT64_FORMAT,
2192       result);
2193   g_mutex_unlock (&priv->mutex);
2194
2195   return result;
2196 }
2197
2198 /**
2199  * gst_app_src_get_current_level_buffers:
2200  * @appsrc: a #GstAppSrc
2201  *
2202  * Get the number of currently queued buffers inside @appsrc.
2203  *
2204  * Returns: The number of currently queued buffers.
2205  *
2206  * Since: 1.20
2207  */
2208 guint64
2209 gst_app_src_get_current_level_buffers (GstAppSrc * appsrc)
2210 {
2211   guint64 queued;
2212   GstAppSrcPrivate *priv;
2213
2214   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
2215
2216   priv = appsrc->priv;
2217
2218   GST_OBJECT_LOCK (appsrc);
2219   queued = priv->queued_buffers;
2220   GST_DEBUG_OBJECT (appsrc, "current level buffers is %" G_GUINT64_FORMAT,
2221       queued);
2222   GST_OBJECT_UNLOCK (appsrc);
2223
2224   return queued;
2225 }
2226
2227 /**
2228  * gst_app_src_set_max_time:
2229  * @appsrc: a #GstAppSrc
2230  * @max: the maximum amonut of time to queue
2231  *
2232  * Set the maximum amount of time that can be queued in @appsrc.
2233  * After the maximum amount of time are queued, @appsrc will emit the
2234  * "enough-data" signal.
2235  *
2236  * Since: 1.20
2237  */
2238 void
2239 gst_app_src_set_max_time (GstAppSrc * appsrc, GstClockTime max)
2240 {
2241   GstAppSrcPrivate *priv;
2242
2243   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2244
2245   priv = appsrc->priv;
2246
2247   g_mutex_lock (&priv->mutex);
2248   if (max != priv->max_time) {
2249     GST_DEBUG_OBJECT (appsrc, "setting max-time to %" GST_TIME_FORMAT,
2250         GST_TIME_ARGS (max));
2251     priv->max_time = max;
2252     /* signal the change */
2253     g_cond_broadcast (&priv->cond);
2254   }
2255   g_mutex_unlock (&priv->mutex);
2256 }
2257
2258 /**
2259  * gst_app_src_get_max_time:
2260  * @appsrc: a #GstAppSrc
2261  *
2262  * Get the maximum amount of time that can be queued in @appsrc.
2263  *
2264  * Returns: The maximum amount of time that can be queued.
2265  *
2266  * Since: 1.20
2267  */
2268 GstClockTime
2269 gst_app_src_get_max_time (GstAppSrc * appsrc)
2270 {
2271   GstClockTime result;
2272   GstAppSrcPrivate *priv;
2273
2274   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2275
2276   priv = appsrc->priv;
2277
2278   g_mutex_lock (&priv->mutex);
2279   result = priv->max_time;
2280   GST_DEBUG_OBJECT (appsrc, "getting max-time of %" GST_TIME_FORMAT,
2281       GST_TIME_ARGS (result));
2282   g_mutex_unlock (&priv->mutex);
2283
2284   return result;
2285 }
2286
2287 /**
2288  * gst_app_src_get_current_level_time:
2289  * @appsrc: a #GstAppSrc
2290  *
2291  * Get the amount of currently queued time inside @appsrc.
2292  *
2293  * Returns: The amount of currently queued time.
2294  *
2295  * Since: 1.20
2296  */
2297 GstClockTime
2298 gst_app_src_get_current_level_time (GstAppSrc * appsrc)
2299 {
2300   gint64 queued;
2301   GstAppSrcPrivate *priv;
2302
2303   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
2304
2305   priv = appsrc->priv;
2306
2307   GST_OBJECT_LOCK (appsrc);
2308   queued = priv->queued_time;
2309   GST_DEBUG_OBJECT (appsrc, "current level time is %" GST_TIME_FORMAT,
2310       GST_TIME_ARGS (queued));
2311   GST_OBJECT_UNLOCK (appsrc);
2312
2313   return queued;
2314 }
2315
2316 static void
2317 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
2318     gboolean do_max, guint64 max)
2319 {
2320   GstAppSrcPrivate *priv = appsrc->priv;
2321   gboolean changed = FALSE;
2322
2323   g_mutex_lock (&priv->mutex);
2324   if (do_min && priv->min_latency != min) {
2325     priv->min_latency = min;
2326     changed = TRUE;
2327   }
2328   if (do_max && priv->max_latency != max) {
2329     priv->max_latency = max;
2330     changed = TRUE;
2331   }
2332   if (!priv->posted_latency_msg) {
2333     priv->posted_latency_msg = TRUE;
2334     changed = TRUE;
2335   }
2336   g_mutex_unlock (&priv->mutex);
2337
2338   if (changed) {
2339     GST_DEBUG_OBJECT (appsrc, "posting latency changed");
2340     gst_element_post_message (GST_ELEMENT_CAST (appsrc),
2341         gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
2342   }
2343 }
2344
2345 /**
2346  * gst_app_src_set_leaky_type:
2347  * @appsrc: a #GstAppSrc
2348  * @leaky: the #GstAppLeakyType
2349  *
2350  * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
2351  * will drop any buffers that are pushed into it once its internal queue is
2352  * full. The selected type defines whether to drop the oldest or new
2353  * buffers.
2354  *
2355  * Since: 1.20
2356  */
2357 void
2358 gst_app_src_set_leaky_type (GstAppSrc * appsrc, GstAppLeakyType leaky)
2359 {
2360   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2361
2362   appsrc->priv->leaky_type = leaky;
2363 }
2364
2365 /**
2366  * gst_app_src_get_leaky_type:
2367  * @appsrc: a #GstAppSrc
2368  *
2369  * Returns the currently set #GstAppLeakyType. See gst_app_src_set_leaky_type()
2370  * for more details.
2371  *
2372  * Returns: The currently set #GstAppLeakyType.
2373  *
2374  * Since: 1.20
2375  */
2376 GstAppLeakyType
2377 gst_app_src_get_leaky_type (GstAppSrc * appsrc)
2378 {
2379   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_APP_LEAKY_TYPE_NONE);
2380
2381   return appsrc->priv->leaky_type;
2382 }
2383
2384 /**
2385  * gst_app_src_set_latency:
2386  * @appsrc: a #GstAppSrc
2387  * @min: the min latency
2388  * @max: the max latency
2389  *
2390  * Configure the @min and @max latency in @src. If @min is set to -1, the
2391  * default latency calculations for pseudo-live sources will be used.
2392  */
2393 void
2394 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
2395 {
2396   gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
2397 }
2398
2399 /**
2400  * gst_app_src_get_latency:
2401  * @appsrc: a #GstAppSrc
2402  * @min: (out): the min latency
2403  * @max: (out): the max latency
2404  *
2405  * Retrieve the min and max latencies in @min and @max respectively.
2406  */
2407 void
2408 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
2409 {
2410   GstAppSrcPrivate *priv;
2411
2412   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2413
2414   priv = appsrc->priv;
2415
2416   g_mutex_lock (&priv->mutex);
2417   if (min)
2418     *min = priv->min_latency;
2419   if (max)
2420     *max = priv->max_latency;
2421   g_mutex_unlock (&priv->mutex);
2422 }
2423
2424 /**
2425  * gst_app_src_set_emit_signals:
2426  * @appsrc: a #GstAppSrc
2427  * @emit: the new state
2428  *
2429  * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
2430  * by default disabled because signal emission is expensive and unneeded when
2431  * the application prefers to operate in pull mode.
2432  */
2433 void
2434 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
2435 {
2436   GstAppSrcPrivate *priv;
2437
2438   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2439
2440   priv = appsrc->priv;
2441
2442   g_mutex_lock (&priv->mutex);
2443   priv->emit_signals = emit;
2444   g_mutex_unlock (&priv->mutex);
2445 }
2446
2447 /**
2448  * gst_app_src_get_emit_signals:
2449  * @appsrc: a #GstAppSrc
2450  *
2451  * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
2452  *
2453  * Returns: %TRUE if @appsrc is emitting the "new-preroll" and "new-buffer"
2454  * signals.
2455  */
2456 gboolean
2457 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
2458 {
2459   gboolean result;
2460   GstAppSrcPrivate *priv;
2461
2462   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
2463
2464   priv = appsrc->priv;
2465
2466   g_mutex_lock (&priv->mutex);
2467   result = priv->emit_signals;
2468   g_mutex_unlock (&priv->mutex);
2469
2470   return result;
2471 }
2472
2473 static GstFlowReturn
2474 gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
2475     GstBufferList * buflist, gboolean steal_ref)
2476 {
2477   gboolean first = TRUE;
2478   GstAppSrcPrivate *priv;
2479
2480   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
2481
2482   priv = appsrc->priv;
2483
2484   if (buffer != NULL)
2485     g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2486   else
2487     g_return_val_if_fail (GST_IS_BUFFER_LIST (buflist), GST_FLOW_ERROR);
2488
2489   if (buflist != NULL) {
2490     if (gst_buffer_list_length (buflist) == 0)
2491       return GST_FLOW_OK;
2492
2493     buffer = gst_buffer_list_get (buflist, 0);
2494   }
2495
2496   if (GST_BUFFER_DTS (buffer) == GST_CLOCK_TIME_NONE &&
2497       GST_BUFFER_PTS (buffer) == GST_CLOCK_TIME_NONE &&
2498       gst_base_src_get_do_timestamp (GST_BASE_SRC_CAST (appsrc))) {
2499     GstClock *clock;
2500
2501     clock = gst_element_get_clock (GST_ELEMENT_CAST (appsrc));
2502     if (clock) {
2503       GstClockTime now;
2504       GstClockTime base_time =
2505           gst_element_get_base_time (GST_ELEMENT_CAST (appsrc));
2506
2507       now = gst_clock_get_time (clock);
2508       if (now > base_time)
2509         now -= base_time;
2510       else
2511         now = 0;
2512       gst_object_unref (clock);
2513
2514       if (buflist == NULL) {
2515         if (!steal_ref) {
2516           buffer = gst_buffer_copy (buffer);
2517           steal_ref = TRUE;
2518         } else {
2519           buffer = gst_buffer_make_writable (buffer);
2520         }
2521       } else {
2522         if (!steal_ref) {
2523           buflist = gst_buffer_list_copy (buflist);
2524           steal_ref = TRUE;
2525         } else {
2526           buflist = gst_buffer_list_make_writable (buflist);
2527         }
2528         buffer = gst_buffer_list_get_writable (buflist, 0);
2529       }
2530
2531       GST_BUFFER_PTS (buffer) = now;
2532       GST_BUFFER_DTS (buffer) = now;
2533     } else {
2534       GST_WARNING_OBJECT (appsrc,
2535           "do-timestamp=TRUE but buffers are provided before "
2536           "reaching the PLAYING state and having a clock. Timestamps will "
2537           "not be accurate!");
2538     }
2539   }
2540
2541   g_mutex_lock (&priv->mutex);
2542
2543   while (TRUE) {
2544     /* can't accept buffers when we are flushing or EOS */
2545     if (priv->flushing)
2546       goto flushing;
2547
2548     if (priv->is_eos)
2549       goto eos;
2550
2551     if ((priv->max_bytes && priv->queued_bytes >= priv->max_bytes) ||
2552         (priv->max_buffers && priv->queued_buffers >= priv->max_buffers) ||
2553         (priv->max_time && priv->queued_time >= priv->max_time)) {
2554       GST_DEBUG_OBJECT (appsrc,
2555           "queue filled (queued %" G_GUINT64_FORMAT " bytes, max %"
2556           G_GUINT64_FORMAT " bytes, " "queued %" G_GUINT64_FORMAT
2557           " buffers, max %" G_GUINT64_FORMAT " buffers, " "queued %"
2558           GST_TIME_FORMAT " time, max %" GST_TIME_FORMAT " time)",
2559           priv->queued_bytes, priv->max_bytes, priv->queued_buffers,
2560           priv->max_buffers, GST_TIME_ARGS (priv->queued_time),
2561           GST_TIME_ARGS (priv->max_time));
2562
2563       if (first) {
2564         Callbacks *callbacks = NULL;
2565         gboolean emit;
2566
2567         emit = priv->emit_signals;
2568         if (priv->callbacks)
2569           callbacks = callbacks_ref (priv->callbacks);
2570         /* only signal on the first push */
2571         g_mutex_unlock (&priv->mutex);
2572
2573         if (callbacks && callbacks->callbacks.enough_data)
2574           callbacks->callbacks.enough_data (appsrc, callbacks->user_data);
2575         else if (emit)
2576           g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
2577               NULL);
2578
2579         g_clear_pointer (&callbacks, callbacks_unref);
2580
2581         g_mutex_lock (&priv->mutex);
2582       }
2583
2584       if (priv->leaky_type == GST_APP_LEAKY_TYPE_UPSTREAM) {
2585         priv->need_discont_upstream = TRUE;
2586         goto dropped;
2587       } else if (priv->leaky_type == GST_APP_LEAKY_TYPE_DOWNSTREAM) {
2588         guint i, length = gst_queue_array_get_length (priv->queue);
2589         GstMiniObject *item = NULL;
2590
2591         /* Find the oldest buffer or buffer list and drop it, then update the
2592          * limits. Dropping one is sufficient to go below the limits again.
2593          */
2594         for (i = 0; i < length; i++) {
2595           item = gst_queue_array_peek_nth (priv->queue, i);
2596           if (GST_IS_BUFFER (item) || GST_IS_BUFFER_LIST (item)) {
2597             gst_queue_array_drop_element (priv->queue, i);
2598             break;
2599           }
2600           /* To not accidentally have an event after the loop */
2601           item = NULL;
2602         }
2603
2604         if (!item) {
2605           GST_FIXME_OBJECT (appsrc,
2606               "No buffer or buffer list queued but queue is full");
2607           /* This shouldn't really happen but in this case we can't really do
2608            * anything apart from accepting the buffer / bufferlist */
2609           break;
2610         }
2611
2612         GST_WARNING_OBJECT (appsrc, "Dropping old item %" GST_PTR_FORMAT, item);
2613
2614         gst_app_src_update_queued_pop (appsrc, item, FALSE);
2615         gst_mini_object_unref (item);
2616
2617         priv->need_discont_downstream = TRUE;
2618         continue;
2619       }
2620
2621       if (first) {
2622         /* continue to check for flushing/eos after releasing the lock */
2623         first = FALSE;
2624         continue;
2625       }
2626       if (priv->block) {
2627         GST_DEBUG_OBJECT (appsrc, "waiting for free space");
2628         /* we are filled, wait until a buffer gets popped or when we
2629          * flush. */
2630         priv->wait_status |= APP_WAITING;
2631         g_cond_wait (&priv->cond, &priv->mutex);
2632         priv->wait_status &= ~APP_WAITING;
2633       } else {
2634         /* no need to wait for free space, we just pump more data into the
2635          * queue hoping that the caller reacts to the enough-data signal and
2636          * stops pushing buffers. */
2637         break;
2638       }
2639     } else {
2640       break;
2641     }
2642   }
2643
2644   if (priv->pending_custom_segment) {
2645     GstEvent *event = gst_event_new_segment (&priv->last_segment);
2646
2647     GST_DEBUG_OBJECT (appsrc, "enqueue new segment %" GST_PTR_FORMAT, event);
2648     gst_queue_array_push_tail (priv->queue, event);
2649     priv->pending_custom_segment = FALSE;
2650   }
2651
2652   if (buflist != NULL) {
2653     /* Mark the first buffer of the buffer list as DISCONT if we previously
2654      * dropped a buffer instead of queueing it */
2655     if (priv->need_discont_upstream) {
2656       if (!steal_ref) {
2657         buflist = gst_buffer_list_copy (buflist);
2658         steal_ref = TRUE;
2659       } else {
2660         buflist = gst_buffer_list_make_writable (buflist);
2661       }
2662       buffer = gst_buffer_list_get_writable (buflist, 0);
2663       GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2664       priv->need_discont_upstream = FALSE;
2665     }
2666
2667     GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist);
2668
2669     if (!steal_ref)
2670       gst_buffer_list_ref (buflist);
2671     gst_queue_array_push_tail (priv->queue, buflist);
2672   } else {
2673     /* Mark the buffer as DISCONT if we previously dropped a buffer instead of
2674      * queueing it */
2675     if (priv->need_discont_upstream) {
2676       if (!steal_ref) {
2677         buffer = gst_buffer_copy (buffer);
2678         steal_ref = TRUE;
2679       } else {
2680         buffer = gst_buffer_make_writable (buffer);
2681       }
2682       GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2683       priv->need_discont_upstream = FALSE;
2684     }
2685
2686     GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
2687     if (!steal_ref)
2688       gst_buffer_ref (buffer);
2689     gst_queue_array_push_tail (priv->queue, buffer);
2690   }
2691
2692   gst_app_src_update_queued_push (appsrc,
2693       buflist ? GST_MINI_OBJECT_CAST (buflist) : GST_MINI_OBJECT_CAST (buffer));
2694
2695   if ((priv->wait_status & STREAM_WAITING))
2696     g_cond_broadcast (&priv->cond);
2697
2698   g_mutex_unlock (&priv->mutex);
2699
2700   return GST_FLOW_OK;
2701
2702   /* ERRORS */
2703 flushing:
2704   {
2705     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
2706     if (steal_ref) {
2707       if (buflist)
2708         gst_buffer_list_unref (buflist);
2709       else
2710         gst_buffer_unref (buffer);
2711     }
2712     g_mutex_unlock (&priv->mutex);
2713     return GST_FLOW_FLUSHING;
2714   }
2715 eos:
2716   {
2717     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
2718     if (steal_ref) {
2719       if (buflist)
2720         gst_buffer_list_unref (buflist);
2721       else
2722         gst_buffer_unref (buffer);
2723     }
2724     g_mutex_unlock (&priv->mutex);
2725     return GST_FLOW_EOS;
2726   }
2727 dropped:
2728   {
2729     GST_DEBUG_OBJECT (appsrc, "dropped new buffer %p, we are full", buffer);
2730     if (steal_ref) {
2731       if (buflist)
2732         gst_buffer_list_unref (buflist);
2733       else
2734         gst_buffer_unref (buffer);
2735     }
2736     g_mutex_unlock (&priv->mutex);
2737     return GST_FLOW_EOS;
2738   }
2739 }
2740
2741 static GstFlowReturn
2742 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
2743     gboolean steal_ref)
2744 {
2745   return gst_app_src_push_internal (appsrc, buffer, NULL, steal_ref);
2746 }
2747
2748 static GstFlowReturn
2749 gst_app_src_push_sample_internal (GstAppSrc * appsrc, GstSample * sample)
2750 {
2751   GstAppSrcPrivate *priv = appsrc->priv;
2752   GstBufferList *buffer_list;
2753   GstBuffer *buffer;
2754   GstCaps *caps;
2755
2756   g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);
2757
2758   caps = gst_sample_get_caps (sample);
2759   if (caps != NULL) {
2760     gst_app_src_set_caps (appsrc, caps);
2761   } else {
2762     GST_WARNING_OBJECT (appsrc, "received sample without caps");
2763   }
2764
2765   if (priv->handle_segment_change && priv->format == GST_FORMAT_TIME) {
2766     GstSegment *segment = gst_sample_get_segment (sample);
2767
2768     if (segment->format != GST_FORMAT_TIME) {
2769       GST_LOG_OBJECT (appsrc, "format %s is not supported",
2770           gst_format_get_name (segment->format));
2771       goto handle_buffer;
2772     }
2773
2774     g_mutex_lock (&priv->mutex);
2775     if (gst_segment_is_equal (&priv->last_segment, segment)) {
2776       GST_LOG_OBJECT (appsrc, "segment wasn't changed");
2777       g_mutex_unlock (&priv->mutex);
2778       goto handle_buffer;
2779     } else {
2780       GST_LOG_OBJECT (appsrc,
2781           "segment changed %" GST_SEGMENT_FORMAT " -> %" GST_SEGMENT_FORMAT,
2782           &priv->last_segment, segment);
2783     }
2784
2785     /* will be pushed to queue with next buffer/buffer-list */
2786     gst_segment_copy_into (segment, &priv->last_segment);
2787     priv->pending_custom_segment = TRUE;
2788     g_mutex_unlock (&priv->mutex);
2789   }
2790
2791 handle_buffer:
2792
2793   buffer = gst_sample_get_buffer (sample);
2794   if (buffer != NULL)
2795     return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
2796
2797   buffer_list = gst_sample_get_buffer_list (sample);
2798   if (buffer_list != NULL)
2799     return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
2800
2801   GST_WARNING_OBJECT (appsrc, "received sample without buffer or buffer list");
2802   return GST_FLOW_OK;
2803 }
2804
2805 /**
2806  * gst_app_src_push_buffer:
2807  * @appsrc: a #GstAppSrc
2808  * @buffer: (transfer full): a #GstBuffer to push
2809  *
2810  * Adds a buffer to the queue of buffers that the appsrc element will
2811  * push to its source pad.  This function takes ownership of the buffer.
2812  *
2813  * When the block property is TRUE, this function can block until free
2814  * space becomes available in the queue.
2815  *
2816  * Returns: #GST_FLOW_OK when the buffer was successfully queued.
2817  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2818  * #GST_FLOW_EOS when EOS occurred.
2819  */
2820 GstFlowReturn
2821 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
2822 {
2823   return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
2824 }
2825
2826 /**
2827  * gst_app_src_push_buffer_list:
2828  * @appsrc: a #GstAppSrc
2829  * @buffer_list: (transfer full): a #GstBufferList to push
2830  *
2831  * Adds a buffer list to the queue of buffers and buffer lists that the
2832  * appsrc element will push to its source pad.  This function takes ownership
2833  * of @buffer_list.
2834  *
2835  * When the block property is TRUE, this function can block until free
2836  * space becomes available in the queue.
2837  *
2838  * Returns: #GST_FLOW_OK when the buffer list was successfully queued.
2839  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2840  * #GST_FLOW_EOS when EOS occurred.
2841  *
2842  * Since: 1.14
2843  */
2844 GstFlowReturn
2845 gst_app_src_push_buffer_list (GstAppSrc * appsrc, GstBufferList * buffer_list)
2846 {
2847   return gst_app_src_push_internal (appsrc, NULL, buffer_list, TRUE);
2848 }
2849
2850 /**
2851  * gst_app_src_push_sample:
2852  * @appsrc: a #GstAppSrc
2853  * @sample: (transfer none): a #GstSample from which buffer and caps may be
2854  * extracted
2855  *
2856  * Extract a buffer from the provided sample and adds it to the queue of
2857  * buffers that the appsrc element will push to its source pad. Any
2858  * previous caps that were set on appsrc will be replaced by the caps
2859  * associated with the sample if not equal.
2860  *
2861  * This function does not take ownership of the
2862  * sample so the sample needs to be unreffed after calling this function.
2863  *
2864  * When the block property is TRUE, this function can block until free
2865  * space becomes available in the queue.
2866  *
2867  * Returns: #GST_FLOW_OK when the buffer was successfully queued.
2868  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2869  * #GST_FLOW_EOS when EOS occurred.
2870  *
2871  * Since: 1.6
2872  *
2873  */
2874 GstFlowReturn
2875 gst_app_src_push_sample (GstAppSrc * appsrc, GstSample * sample)
2876 {
2877   return gst_app_src_push_sample_internal (appsrc, sample);
2878 }
2879
2880 /* push a buffer without stealing the ref of the buffer. This is used for the
2881  * action signal. */
2882 static GstFlowReturn
2883 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
2884 {
2885   return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
2886 }
2887
2888 /* push a buffer list without stealing the ref of the buffer list. This is
2889  * used for the action signal. */
2890 static GstFlowReturn
2891 gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
2892     GstBufferList * buffer_list)
2893 {
2894   return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
2895 }
2896
2897 /* push a sample without stealing the ref. This is used for the
2898  * action signal. */
2899 static GstFlowReturn
2900 gst_app_src_push_sample_action (GstAppSrc * appsrc, GstSample * sample)
2901 {
2902   return gst_app_src_push_sample_internal (appsrc, sample);
2903 }
2904
2905 /**
2906  * gst_app_src_end_of_stream:
2907  * @appsrc: a #GstAppSrc
2908  *
2909  * Indicates to the appsrc element that the last buffer queued in the
2910  * element is the last buffer of the stream.
2911  *
2912  * Returns: #GST_FLOW_OK when the EOS was successfully queued.
2913  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2914  */
2915 GstFlowReturn
2916 gst_app_src_end_of_stream (GstAppSrc * appsrc)
2917 {
2918   GstAppSrcPrivate *priv;
2919
2920   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
2921
2922   priv = appsrc->priv;
2923
2924   g_mutex_lock (&priv->mutex);
2925   /* can't accept buffers when we are flushing. We can accept them when we are
2926    * EOS although it will not do anything. */
2927   if (priv->flushing)
2928     goto flushing;
2929
2930   GST_DEBUG_OBJECT (appsrc, "sending EOS");
2931   priv->is_eos = TRUE;
2932   g_cond_broadcast (&priv->cond);
2933   g_mutex_unlock (&priv->mutex);
2934
2935   return GST_FLOW_OK;
2936
2937   /* ERRORS */
2938 flushing:
2939   {
2940     g_mutex_unlock (&priv->mutex);
2941     GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
2942     return GST_FLOW_FLUSHING;
2943   }
2944 }
2945
2946 /**
2947  * gst_app_src_set_callbacks: (skip)
2948  * @appsrc: a #GstAppSrc
2949  * @callbacks: the callbacks
2950  * @user_data: a user_data argument for the callbacks
2951  * @notify: a destroy notify function
2952  *
2953  * Set callbacks which will be executed when data is needed, enough data has
2954  * been collected or when a seek should be performed.
2955  * This is an alternative to using the signals, it has lower overhead and is thus
2956  * less expensive, but also less flexible.
2957  *
2958  * If callbacks are installed, no signals will be emitted for performance
2959  * reasons.
2960  *
2961  * Before 1.16.3 it was not possible to change the callbacks in a thread-safe
2962  * way.
2963  */
2964 void
2965 gst_app_src_set_callbacks (GstAppSrc * appsrc,
2966     GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
2967 {
2968   Callbacks *old_callbacks, *new_callbacks = NULL;
2969   GstAppSrcPrivate *priv;
2970
2971   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2972   g_return_if_fail (callbacks != NULL);
2973
2974   priv = appsrc->priv;
2975
2976   if (callbacks) {
2977     new_callbacks = g_new0 (Callbacks, 1);
2978     new_callbacks->callbacks = *callbacks;
2979     new_callbacks->user_data = user_data;
2980     new_callbacks->destroy_notify = notify;
2981     new_callbacks->ref_count = 1;
2982   }
2983
2984   g_mutex_lock (&priv->mutex);
2985   old_callbacks = g_steal_pointer (&priv->callbacks);
2986   priv->callbacks = g_steal_pointer (&new_callbacks);
2987   g_mutex_unlock (&priv->mutex);
2988
2989   g_clear_pointer (&old_callbacks, callbacks_unref);
2990 }
2991
2992 /*** GSTURIHANDLER INTERFACE *************************************************/
2993
2994 static GstURIType
2995 gst_app_src_uri_get_type (GType type)
2996 {
2997   return GST_URI_SRC;
2998 }
2999
3000 static const gchar *const *
3001 gst_app_src_uri_get_protocols (GType type)
3002 {
3003   static const gchar *protocols[] = { "appsrc", NULL };
3004
3005   return protocols;
3006 }
3007
3008 static gchar *
3009 gst_app_src_uri_get_uri (GstURIHandler * handler)
3010 {
3011   GstAppSrc *appsrc = GST_APP_SRC (handler);
3012
3013   return appsrc->priv->uri ? g_strdup (appsrc->priv->uri) : NULL;
3014 }
3015
3016 static gboolean
3017 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
3018     GError ** error)
3019 {
3020   GstAppSrc *appsrc = GST_APP_SRC (handler);
3021
3022   g_free (appsrc->priv->uri);
3023   appsrc->priv->uri = uri ? g_strdup (uri) : NULL;
3024
3025   return TRUE;
3026 }
3027
3028 static void
3029 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
3030 {
3031   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
3032
3033   iface->get_type = gst_app_src_uri_get_type;
3034   iface->get_protocols = gst_app_src_uri_get_protocols;
3035   iface->get_uri = gst_app_src_uri_get_uri;
3036   iface->set_uri = gst_app_src_uri_set_uri;
3037 }
3038
3039 static gboolean
3040 gst_app_src_event (GstBaseSrc * src, GstEvent * event)
3041 {
3042   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
3043   GstAppSrcPrivate *priv = appsrc->priv;
3044
3045   switch (GST_EVENT_TYPE (event)) {
3046     case GST_EVENT_FLUSH_STOP:
3047       g_mutex_lock (&priv->mutex);
3048       priv->is_eos = FALSE;
3049       g_mutex_unlock (&priv->mutex);
3050       break;
3051     default:
3052       break;
3053   }
3054
3055   return GST_BASE_SRC_CLASS (parent_class)->event (src, event);
3056 }