2 * Copyright (C) 2007 David Schleef <ds@schleef.org>
3 * (C) 2008 Wim Taymans <wim.taymans@gmail.com>
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
23 * @short_description: Easy way for applications to inject buffers into a
25 * @see_also: #GstBaseSrc, appsink
27 * The appsrc element can be used by applications to insert data into a
28 * GStreamer pipeline. Unlike most GStreamer elements, appsrc provides
29 * external API functions.
31 * appsrc can be used by linking with the libgstapp library to access the
32 * methods directly or by using the appsrc action signals.
34 * Before operating appsrc, the caps property must be set to fixed caps
35 * describing the format of the data that will be pushed with appsrc. An
36 * exception to this is when pushing buffers with unknown caps, in which case no
37 * caps should be set. This is typically true of file-like sources that push raw
38 * byte buffers. If you don't want to explicitly set the caps, you can use
39 * gst_app_src_push_sample(). This method gets the caps associated with the
40 * sample and sets them on the appsrc replacing any previously set caps (if
41 * different from sample's caps).
43 * The main way of handing data to the appsrc element is by calling the
44 * gst_app_src_push_buffer() method or by emitting the push-buffer action signal.
45 * This will put the buffer onto a queue from which appsrc will read in its
46 * streaming thread. It is important to note that data transport will not happen
47 * from the thread that performed the push-buffer call.
49 * The "max-bytes", "max-buffers" and "max-time" properties control how much
50 * data can be queued in appsrc before appsrc considers the queue full. A
51 * filled internal queue will always emit the "enough-data" signal, which
52 * signals the application that it should stop pushing data into appsrc. The
53 * "block" property will cause appsrc to block the push-buffer method until
54 * free space becomes available again.
56 * When the internal queue is running out of data, the "need-data" signal is
57 * emitted, which signals the application that it should start pushing more data
60 * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
61 * "seek-data" signal when the "stream-mode" property is set to "seekable" or
62 * "random-access". The signal argument will contain the new desired position in
63 * the stream expressed in the unit set with the "format" property. After
64 * receiving the seek-data signal, the application should push buffers from the
67 * These signals allow the application to operate the appsrc in two different
70 * The push mode, in which the application repeatedly calls the push-buffer/push-sample
71 * method with a new buffer/sample. Optionally, the queue size in the appsrc
72 * can be controlled with the enough-data and need-data signals by respectively
73 * stopping/starting the push-buffer/push-sample calls. This is a typical
74 * mode of operation for the stream-type "stream" and "seekable". Use this
75 * mode when implementing various network protocols or hardware devices.
77 * The pull mode, in which the need-data signal triggers the next push-buffer call.
78 * This mode is typically used in the "random-access" stream-type. Use this
79 * mode for file access or other randomly accessible sources. In this mode, a
80 * buffer of exactly the amount of bytes given by the need-data signal should be
83 * In all modes, the size property on appsrc should contain the total stream
84 * size in bytes. Setting this property is mandatory in the random-access mode.
85 * For the stream and seekable modes, setting this property is optional but
88 * When the application has finished pushing data into appsrc, it should call
89 * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
90 * this call, no more buffers can be pushed into appsrc until a flushing seek
91 * occurs or the state of the appsrc has gone through READY.
99 #include <gst/base/base.h>
103 #include "gstappsrc.h"
108 STREAM_WAITING = 1 << 0, /* streaming thread is waiting for application thread */
109 APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */
110 } GstAppSrcWaitStatus;
114 GstAppSrcCallbacks callbacks;
116 GDestroyNotify destroy_notify;
121 callbacks_ref (Callbacks * callbacks)
123 g_atomic_int_inc (&callbacks->ref_count);
129 callbacks_unref (Callbacks * callbacks)
131 if (!g_atomic_int_dec_and_test (&callbacks->ref_count))
134 if (callbacks->destroy_notify)
135 callbacks->destroy_notify (callbacks->user_data);
141 struct _GstAppSrcPrivate
145 GstQueueArray *queue;
146 GstAppSrcWaitStatus wait_status;
149 GstCaps *current_caps;
150 /* last segment received on the input */
151 GstSegment last_segment;
152 /* currently configured segment for the output */
153 GstSegment current_segment;
154 /* queue up a segment event based on last_segment before
155 * the next buffer or buffer list */
156 gboolean pending_custom_segment;
158 /* the next buffer that will be queued needs a discont flag
159 * because the previous one was dropped - GST_APP_LEAKY_TYPE_UPSTREAM */
160 gboolean need_discont_upstream;
161 /* the next buffer that will be dequeued needs a discont flag
162 * because the previous one was dropped - GST_APP_LEAKY_TYPE_DOWNSTREAM */
163 gboolean need_discont_downstream;
166 GstClockTime duration;
167 GstAppStreamType stream_type;
168 guint64 max_bytes, max_buffers, max_time;
176 guint64 queued_bytes, queued_buffers;
177 /* Used to calculate the current time level */
178 GstClockTime last_in_running_time, last_out_running_time;
179 /* Updated based on the above whenever they change */
180 GstClockTime queued_time;
182 GstAppStreamType current_type;
186 /* Tracks whether the latency message was posted at least once */
187 gboolean posted_latency_msg;
189 gboolean emit_signals;
191 gboolean handle_segment_change;
193 GstAppLeakyType leaky_type;
195 Callbacks *callbacks;
198 GST_DEBUG_CATEGORY_STATIC (app_src_debug);
199 #define GST_CAT_DEFAULT app_src_debug
210 SIGNAL_END_OF_STREAM,
212 SIGNAL_PUSH_BUFFER_LIST,
217 #define DEFAULT_PROP_SIZE -1
218 #define DEFAULT_PROP_STREAM_TYPE GST_APP_STREAM_TYPE_STREAM
219 #define DEFAULT_PROP_MAX_BYTES 200000
220 #define DEFAULT_PROP_MAX_BUFFERS 0
221 #define DEFAULT_PROP_MAX_TIME (0 * GST_SECOND)
222 #define DEFAULT_PROP_FORMAT GST_FORMAT_BYTES
223 #define DEFAULT_PROP_BLOCK FALSE
224 #define DEFAULT_PROP_IS_LIVE FALSE
225 #define DEFAULT_PROP_MIN_LATENCY -1
226 #define DEFAULT_PROP_MAX_LATENCY -1
227 #define DEFAULT_PROP_EMIT_SIGNALS TRUE
228 #define DEFAULT_PROP_MIN_PERCENT 0
229 #define DEFAULT_PROP_CURRENT_LEVEL_BYTES 0
230 #define DEFAULT_PROP_CURRENT_LEVEL_BUFFERS 0
231 #define DEFAULT_PROP_CURRENT_LEVEL_TIME 0
232 #define DEFAULT_PROP_DURATION GST_CLOCK_TIME_NONE
233 #define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE FALSE
234 #define DEFAULT_PROP_LEAKY_TYPE GST_APP_LEAKY_TYPE_NONE
252 PROP_CURRENT_LEVEL_BYTES,
253 PROP_CURRENT_LEVEL_BUFFERS,
254 PROP_CURRENT_LEVEL_TIME,
256 PROP_HANDLE_SEGMENT_CHANGE,
261 static GstStaticPadTemplate gst_app_src_template =
262 GST_STATIC_PAD_TEMPLATE ("src",
265 GST_STATIC_CAPS_ANY);
267 static void gst_app_src_uri_handler_init (gpointer g_iface,
268 gpointer iface_data);
270 static void gst_app_src_dispose (GObject * object);
271 static void gst_app_src_finalize (GObject * object);
273 static void gst_app_src_set_property (GObject * object, guint prop_id,
274 const GValue * value, GParamSpec * pspec);
275 static void gst_app_src_get_property (GObject * object, guint prop_id,
276 GValue * value, GParamSpec * pspec);
278 static gboolean gst_app_src_send_event (GstElement * element, GstEvent * event);
280 static void gst_app_src_set_latencies (GstAppSrc * appsrc,
281 gboolean do_min, guint64 min, gboolean do_max, guint64 max);
283 static gboolean gst_app_src_negotiate (GstBaseSrc * basesrc);
284 static GstCaps *gst_app_src_internal_get_caps (GstBaseSrc * bsrc,
286 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc, guint64 offset,
287 guint size, GstBuffer ** buf);
288 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
289 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
290 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
291 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
292 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
293 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
294 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
295 static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
296 static gboolean gst_app_src_event (GstBaseSrc * src, GstEvent * event);
298 static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
300 static GstFlowReturn gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
301 GstBufferList * buffer_list);
302 static GstFlowReturn gst_app_src_push_sample_action (GstAppSrc * appsrc,
305 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
307 #define gst_app_src_parent_class parent_class
308 G_DEFINE_TYPE_WITH_CODE (GstAppSrc, gst_app_src, GST_TYPE_BASE_SRC,
309 G_ADD_PRIVATE (GstAppSrc)
310 G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_app_src_uri_handler_init));
313 gst_app_src_class_init (GstAppSrcClass * klass)
315 GObjectClass *gobject_class = (GObjectClass *) klass;
316 GstElementClass *element_class = (GstElementClass *) klass;
317 GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
319 GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
321 gobject_class->dispose = gst_app_src_dispose;
322 gobject_class->finalize = gst_app_src_finalize;
324 gobject_class->set_property = gst_app_src_set_property;
325 gobject_class->get_property = gst_app_src_get_property;
330 * The GstCaps that will be negotiated downstream and will be put
331 * on outgoing buffers.
333 g_object_class_install_property (gobject_class, PROP_CAPS,
334 g_param_spec_boxed ("caps", "Caps",
335 "The allowed caps for the src pad", GST_TYPE_CAPS,
336 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
340 * The format to use for segment events. When the source is producing
341 * timestamped buffers this property should be set to GST_FORMAT_TIME.
343 g_object_class_install_property (gobject_class, PROP_FORMAT,
344 g_param_spec_enum ("format", "Format",
345 "The format of the segment events and seek", GST_TYPE_FORMAT,
346 DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350 * The total size in bytes of the data stream. If the total size is known, it
351 * is recommended to configure it with this property.
353 g_object_class_install_property (gobject_class, PROP_SIZE,
354 g_param_spec_int64 ("size", "Size",
355 "The size of the data stream in bytes (-1 if unknown)",
356 -1, G_MAXINT64, DEFAULT_PROP_SIZE,
357 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
359 * GstAppSrc:stream-type:
361 * The type of stream that this source is producing. For seekable streams the
362 * application should connect to the seek-data signal.
364 g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
365 g_param_spec_enum ("stream-type", "Stream Type",
366 "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
367 DEFAULT_PROP_STREAM_TYPE,
368 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
370 * GstAppSrc:max-bytes:
372 * The maximum amount of bytes that can be queued internally.
373 * After the maximum amount of bytes are queued, appsrc will emit the
374 * "enough-data" signal.
376 g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
377 g_param_spec_uint64 ("max-bytes", "Max bytes",
378 "The maximum number of bytes to queue internally (0 = unlimited)",
379 0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
380 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
383 * GstAppSrc:max-buffers:
385 * The maximum amount of buffers that can be queued internally.
386 * After the maximum amount of buffers are queued, appsrc will emit the
387 * "enough-data" signal.
391 g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS,
392 g_param_spec_uint64 ("max-buffers", "Max buffers",
393 "The maximum number of buffers to queue internally (0 = unlimited)",
394 0, G_MAXUINT64, DEFAULT_PROP_MAX_BUFFERS,
395 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
398 * GstAppSrc:max-time:
400 * The maximum amount of time that can be queued internally.
401 * After the maximum amount of time is queued, appsrc will emit the
402 * "enough-data" signal.
406 g_object_class_install_property (gobject_class, PROP_MAX_TIME,
407 g_param_spec_uint64 ("max-time", "Max time",
408 "The maximum amount of time to queue internally (0 = unlimited)",
409 0, G_MAXUINT64, DEFAULT_PROP_MAX_TIME,
410 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
415 * When max-bytes are queued and after the enough-data signal has been emitted,
416 * block any further push-buffer calls until the amount of queued bytes drops
417 * below the max-bytes limit.
419 g_object_class_install_property (gobject_class, PROP_BLOCK,
420 g_param_spec_boolean ("block", "Block",
421 "Block push-buffer when max-bytes are queued",
422 DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
427 * Instruct the source to behave like a live source. This includes that it
428 * will only push out buffers in the PLAYING state.
430 g_object_class_install_property (gobject_class, PROP_IS_LIVE,
431 g_param_spec_boolean ("is-live", "Is Live",
432 "Whether to act as a live source",
433 DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
435 * GstAppSrc:min-latency:
437 * The minimum latency of the source. A value of -1 will use the default
438 * latency calculations of #GstBaseSrc.
440 g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
441 g_param_spec_int64 ("min-latency", "Min Latency",
442 "The minimum latency (-1 = default)",
443 -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
444 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446 * GstAppSrc:max-latency:
448 * The maximum latency of the source. A value of -1 means an unlimited amount
451 g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
452 g_param_spec_int64 ("max-latency", "Max Latency",
453 "The maximum latency (-1 = unlimited)",
454 -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
455 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
458 * GstAppSrc:emit-signals:
460 * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
461 * This option is by default enabled for backwards compatibility reasons but
462 * can be disabled when needed because signal emission is expensive.
464 g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
465 g_param_spec_boolean ("emit-signals", "Emit signals",
466 "Emit need-data, enough-data and seek-data signals",
467 DEFAULT_PROP_EMIT_SIGNALS,
468 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
471 * GstAppSrc:min-percent:
473 * Make appsrc emit the "need-data" signal when the amount of bytes in the
474 * queue drops below this percentage of max-bytes.
476 g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
477 g_param_spec_uint ("min-percent", "Min Percent",
478 "Emit need-data when queued bytes drops below this percent of max-bytes",
479 0, 100, DEFAULT_PROP_MIN_PERCENT,
480 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
483 * GstAppSrc:current-level-bytes:
485 * The number of currently queued bytes inside appsrc.
489 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BYTES,
490 g_param_spec_uint64 ("current-level-bytes", "Current Level Bytes",
491 "The number of currently queued bytes",
492 0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BYTES,
493 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
496 * GstAppSrc:current-level-buffers:
498 * The number of currently queued buffers inside appsrc.
502 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BUFFERS,
503 g_param_spec_uint64 ("current-level-buffers", "Current Level Buffers",
504 "The number of currently queued buffers",
505 0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BUFFERS,
506 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
509 * GstAppSrc:current-level-time:
511 * The amount of currently queued time inside appsrc.
515 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME,
516 g_param_spec_uint64 ("current-level-time", "Current Level Time",
517 "The amount of currently queued time",
518 0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_TIME,
519 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
522 * GstAppSrc:duration:
524 * The total duration in nanoseconds of the data stream. If the total duration is known, it
525 * is recommended to configure it with this property.
529 g_object_class_install_property (gobject_class, PROP_DURATION,
530 g_param_spec_uint64 ("duration", "Duration",
531 "The duration of the data stream in nanoseconds (GST_CLOCK_TIME_NONE if unknown)",
532 0, G_MAXUINT64, DEFAULT_PROP_DURATION,
533 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
536 * GstAppSrc:handle-segment-change:
538 * When enabled, appsrc will check GstSegment in GstSample which was
539 * pushed via gst_app_src_push_sample() or "push-sample" signal action.
540 * If a GstSegment is changed, corresponding segment event will be followed
543 * FIXME: currently only GST_FORMAT_TIME format is supported and therefore
544 * #GstAppSrc:format should be time. However, possibly #GstAppSrc can support
549 g_object_class_install_property (gobject_class, PROP_HANDLE_SEGMENT_CHANGE,
550 g_param_spec_boolean ("handle-segment-change", "Handle Segment Change",
551 "Whether to detect and handle changed time format GstSegment in "
552 "GstSample. User should set valid GstSegment in GstSample. "
553 "Must set format property as \"time\" to enable this property",
554 DEFAULT_PROP_HANDLE_SEGMENT_CHANGE,
555 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
556 G_PARAM_STATIC_STRINGS));
559 * GstAppSrc:leaky-type:
561 * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
562 * will drop any buffers that are pushed into it once its internal queue is
563 * full. The selected type defines whether to drop the oldest or newest
568 g_object_class_install_property (gobject_class, PROP_LEAKY_TYPE,
569 g_param_spec_enum ("leaky-type", "Leaky Type",
570 "Whether to drop buffers once the internal queue is full",
571 GST_TYPE_APP_LEAKY_TYPE,
572 DEFAULT_PROP_LEAKY_TYPE,
573 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
574 G_PARAM_STATIC_STRINGS));
577 * GstAppSrc::need-data:
578 * @appsrc: the appsrc element that emitted the signal
579 * @length: the amount of bytes needed.
581 * Signal that the source needs more data. In the callback or from another
582 * thread you should call push-buffer or end-of-stream.
584 * @length is just a hint and when it is set to -1, any number of bytes can be
585 * pushed into @appsrc.
587 * You can call push-buffer multiple times until the enough-data signal is
590 gst_app_src_signals[SIGNAL_NEED_DATA] =
591 g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
592 G_STRUCT_OFFSET (GstAppSrcClass, need_data),
593 NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
596 * GstAppSrc::enough-data:
597 * @appsrc: the appsrc element that emitted the signal
599 * Signal that the source has enough data. It is recommended that the
600 * application stops calling push-buffer until the need-data signal is
601 * emitted again to avoid excessive buffer queueing.
603 gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
604 g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
605 G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
606 NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
609 * GstAppSrc::seek-data:
610 * @appsrc: the appsrc element that emitted the signal
611 * @offset: the offset to seek to
613 * Seek to the given offset. The next push-buffer should produce buffers from
615 * This callback is only called for seekable stream types.
617 * Returns: %TRUE if the seek succeeded.
619 gst_app_src_signals[SIGNAL_SEEK_DATA] =
620 g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
621 G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
622 NULL, NULL, NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
625 * GstAppSrc::push-buffer:
626 * @appsrc: the appsrc
627 * @buffer: (transfer none): a buffer to push
629 * Adds a buffer to the queue of buffers that the appsrc element will
630 * push to its source pad.
632 * This function does not take ownership of the buffer, but it takes a
633 * reference so the buffer can be unreffed at any time after calling this
636 * When the block property is TRUE, this function can block until free space
637 * becomes available in the queue.
639 gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
640 g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
641 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
642 push_buffer), NULL, NULL, NULL,
643 GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
646 * GstAppSrc::push-buffer-list:
647 * @appsrc: the appsrc
648 * @buffer_list: (transfer none): a buffer list to push
650 * Adds a buffer list to the queue of buffers and buffer lists that the
651 * appsrc element will push to its source pad.
653 * This function does not take ownership of the buffer list, but it takes a
654 * reference so the buffer list can be unreffed at any time after calling
657 * When the block property is TRUE, this function can block until free space
658 * becomes available in the queue.
662 gst_app_src_signals[SIGNAL_PUSH_BUFFER_LIST] =
663 g_signal_new ("push-buffer-list", G_TYPE_FROM_CLASS (klass),
664 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
665 push_buffer_list), NULL, NULL, NULL,
666 GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER_LIST);
669 * GstAppSrc::push-sample:
670 * @appsrc: the appsrc
671 * @sample: (transfer none): a sample from which to extract the buffer to push
673 * Extracts a buffer from the provided sample and adds the extracted buffer
674 * to the queue of buffers that the appsrc element will
675 * push to its source pad. This function sets the appsrc caps based on the caps
676 * in the sample and resets the caps if they change.
677 * Only the caps and the buffer of the provided sample are used and not
678 * for example the segment in the sample.
680 * This function does not take ownership of the sample, but it takes a
681 * reference so the sample can be unreffed at any time after calling this
684 * When the block property is TRUE, this function can block until free space
685 * becomes available in the queue.
689 gst_app_src_signals[SIGNAL_PUSH_SAMPLE] =
690 g_signal_new ("push-sample", G_TYPE_FROM_CLASS (klass),
691 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
692 push_sample), NULL, NULL, NULL,
693 GST_TYPE_FLOW_RETURN, 1, GST_TYPE_SAMPLE);
697 * GstAppSrc::end-of-stream:
698 * @appsrc: the appsrc
700 * Notify @appsrc that no more buffers are available.
702 gst_app_src_signals[SIGNAL_END_OF_STREAM] =
703 g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
704 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
705 end_of_stream), NULL, NULL, NULL,
706 GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
708 gst_element_class_set_static_metadata (element_class, "AppSrc",
709 "Generic/Source", "Allow the application to feed buffers to a pipeline",
710 "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
712 gst_element_class_add_static_pad_template (element_class,
713 &gst_app_src_template);
715 element_class->send_event = gst_app_src_send_event;
717 basesrc_class->negotiate = gst_app_src_negotiate;
718 basesrc_class->get_caps = gst_app_src_internal_get_caps;
719 basesrc_class->create = gst_app_src_create;
720 basesrc_class->start = gst_app_src_start;
721 basesrc_class->stop = gst_app_src_stop;
722 basesrc_class->unlock = gst_app_src_unlock;
723 basesrc_class->unlock_stop = gst_app_src_unlock_stop;
724 basesrc_class->do_seek = gst_app_src_do_seek;
725 basesrc_class->is_seekable = gst_app_src_is_seekable;
726 basesrc_class->get_size = gst_app_src_do_get_size;
727 basesrc_class->query = gst_app_src_query;
728 basesrc_class->event = gst_app_src_event;
730 klass->push_buffer = gst_app_src_push_buffer_action;
731 klass->push_buffer_list = gst_app_src_push_buffer_list_action;
732 klass->push_sample = gst_app_src_push_sample_action;
733 klass->end_of_stream = gst_app_src_end_of_stream;
737 gst_app_src_init (GstAppSrc * appsrc)
739 GstAppSrcPrivate *priv;
741 priv = appsrc->priv = gst_app_src_get_instance_private (appsrc);
743 g_mutex_init (&priv->mutex);
744 g_cond_init (&priv->cond);
745 priv->queue = gst_queue_array_new (16);
746 priv->wait_status = NOONE_WAITING;
748 priv->size = DEFAULT_PROP_SIZE;
749 priv->duration = DEFAULT_PROP_DURATION;
750 priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
751 priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
752 priv->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
753 priv->max_time = DEFAULT_PROP_MAX_TIME;
754 priv->format = DEFAULT_PROP_FORMAT;
755 priv->block = DEFAULT_PROP_BLOCK;
756 priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
757 priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
758 priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
759 priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
760 priv->handle_segment_change = DEFAULT_PROP_HANDLE_SEGMENT_CHANGE;
761 priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE;
763 gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
766 /* Must be called with priv->mutex */
768 gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
771 GstAppSrcPrivate *priv = src->priv;
772 GstCaps *requeue_caps = NULL;
774 while (!gst_queue_array_is_empty (priv->queue)) {
775 obj = gst_queue_array_pop_head (priv->queue);
777 if (GST_IS_CAPS (obj) && retain_last_caps) {
778 gst_caps_replace (&requeue_caps, GST_CAPS_CAST (obj));
780 gst_mini_object_unref (obj);
785 gst_queue_array_push_tail (priv->queue, requeue_caps);
788 priv->queued_bytes = 0;
789 priv->queued_buffers = 0;
790 priv->queued_time = 0;
791 priv->last_in_running_time = GST_CLOCK_TIME_NONE;
792 priv->last_out_running_time = GST_CLOCK_TIME_NONE;
793 priv->need_discont_upstream = FALSE;
794 priv->need_discont_downstream = FALSE;
798 gst_app_src_dispose (GObject * obj)
800 GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
801 GstAppSrcPrivate *priv = appsrc->priv;
802 Callbacks *callbacks = NULL;
804 GST_OBJECT_LOCK (appsrc);
805 if (priv->current_caps) {
806 gst_caps_unref (priv->current_caps);
807 priv->current_caps = NULL;
809 if (priv->last_caps) {
810 gst_caps_unref (priv->last_caps);
811 priv->last_caps = NULL;
813 GST_OBJECT_UNLOCK (appsrc);
815 g_mutex_lock (&priv->mutex);
817 callbacks = g_steal_pointer (&priv->callbacks);
818 gst_app_src_flush_queued (appsrc, FALSE);
819 g_mutex_unlock (&priv->mutex);
821 g_clear_pointer (&callbacks, callbacks_unref);
823 G_OBJECT_CLASS (parent_class)->dispose (obj);
827 gst_app_src_finalize (GObject * obj)
829 GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
830 GstAppSrcPrivate *priv = appsrc->priv;
832 g_mutex_clear (&priv->mutex);
833 g_cond_clear (&priv->cond);
834 gst_queue_array_free (priv->queue);
838 G_OBJECT_CLASS (parent_class)->finalize (obj);
842 gst_app_src_internal_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
844 GstAppSrc *appsrc = GST_APP_SRC (bsrc);
847 GST_OBJECT_LOCK (appsrc);
848 if ((caps = appsrc->priv->current_caps))
850 GST_OBJECT_UNLOCK (appsrc);
854 GstCaps *intersection =
855 gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
856 gst_caps_unref (caps);
859 caps = gst_caps_ref (filter);
863 GST_DEBUG_OBJECT (appsrc, "caps: %" GST_PTR_FORMAT, caps);
868 gst_app_src_set_property (GObject * object, guint prop_id,
869 const GValue * value, GParamSpec * pspec)
871 GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
872 GstAppSrcPrivate *priv = appsrc->priv;
876 gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
879 gst_app_src_set_size (appsrc, g_value_get_int64 (value));
881 case PROP_STREAM_TYPE:
882 gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
885 gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
887 case PROP_MAX_BUFFERS:
888 gst_app_src_set_max_buffers (appsrc, g_value_get_uint64 (value));
891 gst_app_src_set_max_time (appsrc, g_value_get_uint64 (value));
894 priv->format = g_value_get_enum (value);
897 priv->block = g_value_get_boolean (value);
900 gst_base_src_set_live (GST_BASE_SRC (appsrc),
901 g_value_get_boolean (value));
903 case PROP_MIN_LATENCY:
904 gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
907 case PROP_MAX_LATENCY:
908 gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
909 g_value_get_int64 (value));
911 case PROP_EMIT_SIGNALS:
912 gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
914 case PROP_MIN_PERCENT:
915 priv->min_percent = g_value_get_uint (value);
918 gst_app_src_set_duration (appsrc, g_value_get_uint64 (value));
920 case PROP_HANDLE_SEGMENT_CHANGE:
921 priv->handle_segment_change = g_value_get_boolean (value);
923 case PROP_LEAKY_TYPE:
924 priv->leaky_type = g_value_get_enum (value);
927 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
933 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
936 GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
937 GstAppSrcPrivate *priv = appsrc->priv;
941 g_value_take_boxed (value, gst_app_src_get_caps (appsrc));
944 g_value_set_int64 (value, gst_app_src_get_size (appsrc));
946 case PROP_STREAM_TYPE:
947 g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
950 g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
952 case PROP_MAX_BUFFERS:
953 g_value_set_uint64 (value, gst_app_src_get_max_buffers (appsrc));
956 g_value_set_uint64 (value, gst_app_src_get_max_time (appsrc));
959 g_value_set_enum (value, priv->format);
962 g_value_set_boolean (value, priv->block);
965 g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
967 case PROP_MIN_LATENCY:
971 gst_app_src_get_latency (appsrc, &min, NULL);
972 g_value_set_int64 (value, min);
975 case PROP_MAX_LATENCY:
979 gst_app_src_get_latency (appsrc, NULL, &max);
980 g_value_set_int64 (value, max);
983 case PROP_EMIT_SIGNALS:
984 g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
986 case PROP_MIN_PERCENT:
987 g_value_set_uint (value, priv->min_percent);
989 case PROP_CURRENT_LEVEL_BYTES:
990 g_value_set_uint64 (value, gst_app_src_get_current_level_bytes (appsrc));
992 case PROP_CURRENT_LEVEL_BUFFERS:
993 g_value_set_uint64 (value,
994 gst_app_src_get_current_level_buffers (appsrc));
996 case PROP_CURRENT_LEVEL_TIME:
997 g_value_set_uint64 (value, gst_app_src_get_current_level_time (appsrc));
1000 g_value_set_uint64 (value, gst_app_src_get_duration (appsrc));
1002 case PROP_HANDLE_SEGMENT_CHANGE:
1003 g_value_set_boolean (value, priv->handle_segment_change);
1005 case PROP_LEAKY_TYPE:
1006 g_value_set_enum (value, priv->leaky_type);
1009 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1015 gst_app_src_send_event (GstElement * element, GstEvent * event)
1017 GstAppSrc *appsrc = GST_APP_SRC_CAST (element);
1018 GstAppSrcPrivate *priv = appsrc->priv;
1020 switch (GST_EVENT_TYPE (event)) {
1021 case GST_EVENT_FLUSH_STOP:
1022 g_mutex_lock (&priv->mutex);
1023 gst_app_src_flush_queued (appsrc, TRUE);
1024 g_mutex_unlock (&priv->mutex);
1027 if (GST_EVENT_IS_SERIALIZED (event)) {
1028 GST_DEBUG_OBJECT (appsrc, "queue event: %" GST_PTR_FORMAT, event);
1029 g_mutex_lock (&priv->mutex);
1030 gst_queue_array_push_tail (priv->queue, event);
1032 if ((priv->wait_status & STREAM_WAITING))
1033 g_cond_broadcast (&priv->cond);
1035 g_mutex_unlock (&priv->mutex);
1041 return GST_CALL_PARENT_WITH_DEFAULT (GST_ELEMENT_CLASS, send_event, (element,
1046 gst_app_src_unlock (GstBaseSrc * bsrc)
1048 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1049 GstAppSrcPrivate *priv = appsrc->priv;
1051 g_mutex_lock (&priv->mutex);
1052 GST_DEBUG_OBJECT (appsrc, "unlock start");
1053 priv->flushing = TRUE;
1054 g_cond_broadcast (&priv->cond);
1055 g_mutex_unlock (&priv->mutex);
1061 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
1063 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1064 GstAppSrcPrivate *priv = appsrc->priv;
1066 g_mutex_lock (&priv->mutex);
1067 GST_DEBUG_OBJECT (appsrc, "unlock stop");
1068 priv->flushing = FALSE;
1069 g_cond_broadcast (&priv->cond);
1070 g_mutex_unlock (&priv->mutex);
1076 gst_app_src_start (GstBaseSrc * bsrc)
1078 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1079 GstAppSrcPrivate *priv = appsrc->priv;
1081 g_mutex_lock (&priv->mutex);
1082 GST_DEBUG_OBJECT (appsrc, "starting");
1083 priv->started = TRUE;
1084 /* set the offset to -1 so that we always do a first seek. This is only used
1085 * in random-access mode. */
1087 priv->flushing = FALSE;
1088 g_mutex_unlock (&priv->mutex);
1090 gst_base_src_set_format (bsrc, priv->format);
1091 gst_segment_init (&priv->last_segment, priv->format);
1092 gst_segment_init (&priv->current_segment, priv->format);
1093 priv->pending_custom_segment = FALSE;
1099 gst_app_src_stop (GstBaseSrc * bsrc)
1101 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1102 GstAppSrcPrivate *priv = appsrc->priv;
1104 g_mutex_lock (&priv->mutex);
1105 GST_DEBUG_OBJECT (appsrc, "stopping");
1106 priv->is_eos = FALSE;
1107 priv->flushing = TRUE;
1108 priv->started = FALSE;
1109 priv->posted_latency_msg = FALSE;
1110 gst_app_src_flush_queued (appsrc, TRUE);
1111 g_cond_broadcast (&priv->cond);
1112 g_mutex_unlock (&priv->mutex);
1118 gst_app_src_is_seekable (GstBaseSrc * src)
1120 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1121 GstAppSrcPrivate *priv = appsrc->priv;
1122 gboolean res = FALSE;
1124 switch (priv->stream_type) {
1125 case GST_APP_STREAM_TYPE_STREAM:
1127 case GST_APP_STREAM_TYPE_SEEKABLE:
1128 case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
1136 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
1138 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1140 *size = gst_app_src_get_size (appsrc);
1146 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
1148 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1149 GstAppSrcPrivate *priv = appsrc->priv;
1152 switch (GST_QUERY_TYPE (query)) {
1153 case GST_QUERY_LATENCY:
1155 GstClockTime min, max;
1158 /* Query the parent class for the defaults */
1159 res = gst_base_src_query_latency (src, &live, &min, &max);
1161 /* overwrite with our values when we need to */
1162 g_mutex_lock (&priv->mutex);
1163 if (priv->min_latency != -1) {
1164 min = priv->min_latency;
1165 max = priv->max_latency;
1167 g_mutex_unlock (&priv->mutex);
1169 gst_query_set_latency (query, live, min, max);
1172 case GST_QUERY_SCHEDULING:
1174 gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1175 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1177 switch (priv->stream_type) {
1178 case GST_APP_STREAM_TYPE_STREAM:
1179 case GST_APP_STREAM_TYPE_SEEKABLE:
1181 case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
1182 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1188 case GST_QUERY_DURATION:
1191 gst_query_parse_duration (query, &format, NULL);
1192 if (format == GST_FORMAT_BYTES) {
1193 gst_query_set_duration (query, format, priv->size);
1195 } else if (format == GST_FORMAT_TIME) {
1196 if (priv->duration != GST_CLOCK_TIME_NONE) {
1197 gst_query_set_duration (query, format, priv->duration);
1208 res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
1215 /* will be called in push mode */
1217 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1219 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1220 GstAppSrcPrivate *priv = appsrc->priv;
1221 gint64 desired_position;
1222 gboolean res = FALSE;
1224 Callbacks *callbacks = NULL;
1226 desired_position = segment->position;
1228 /* no need to try to seek in streaming mode */
1229 if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
1232 GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
1233 desired_position, gst_format_get_name (segment->format));
1235 g_mutex_lock (&priv->mutex);
1236 emit = priv->emit_signals;
1237 if (priv->callbacks)
1238 callbacks = callbacks_ref (priv->callbacks);
1239 g_mutex_unlock (&priv->mutex);
1241 if (callbacks && callbacks->callbacks.seek_data) {
1243 callbacks->callbacks.seek_data (appsrc, desired_position,
1244 callbacks->user_data);
1246 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
1247 desired_position, &res);
1250 g_clear_pointer (&callbacks, callbacks_unref);
1253 GST_DEBUG_OBJECT (appsrc, "flushing queue");
1254 g_mutex_lock (&priv->mutex);
1255 gst_app_src_flush_queued (appsrc, TRUE);
1256 gst_segment_copy_into (segment, &priv->last_segment);
1257 gst_segment_copy_into (segment, &priv->current_segment);
1258 priv->pending_custom_segment = FALSE;
1259 g_mutex_unlock (&priv->mutex);
1260 priv->is_eos = FALSE;
1262 GST_WARNING_OBJECT (appsrc, "seek failed");
1268 /* must be called with the appsrc mutex */
1270 gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
1272 gboolean res = FALSE;
1274 GstAppSrcPrivate *priv = appsrc->priv;
1275 Callbacks *callbacks = NULL;
1277 emit = priv->emit_signals;
1278 if (priv->callbacks)
1279 callbacks = callbacks_ref (priv->callbacks);
1280 g_mutex_unlock (&priv->mutex);
1282 GST_DEBUG_OBJECT (appsrc,
1283 "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
1284 priv->offset, offset);
1286 if (callbacks && callbacks->callbacks.seek_data)
1287 res = callbacks->callbacks.seek_data (appsrc, offset, callbacks->user_data);
1289 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
1292 g_clear_pointer (&callbacks, callbacks_unref);
1294 g_mutex_lock (&priv->mutex);
1299 /* must be called with the appsrc mutex. After this call things can be
1302 gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
1305 GstAppSrcPrivate *priv = appsrc->priv;
1306 Callbacks *callbacks = NULL;
1308 emit = priv->emit_signals;
1309 if (priv->callbacks)
1310 callbacks = callbacks_ref (priv->callbacks);
1311 g_mutex_unlock (&priv->mutex);
1313 /* we have no data, we need some. We fire the signal with the size hint. */
1314 if (callbacks && callbacks->callbacks.need_data)
1315 callbacks->callbacks.need_data (appsrc, size, callbacks->user_data);
1317 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
1320 g_clear_pointer (&callbacks, callbacks_unref);
1322 g_mutex_lock (&priv->mutex);
1323 /* we can be flushing now because we released the lock */
1326 /* must be called with the appsrc mutex */
1328 gst_app_src_do_negotiate (GstBaseSrc * basesrc)
1330 GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
1331 GstAppSrcPrivate *priv = appsrc->priv;
1335 GST_OBJECT_LOCK (basesrc);
1336 caps = priv->current_caps ? gst_caps_ref (priv->current_caps) : NULL;
1337 GST_OBJECT_UNLOCK (basesrc);
1339 /* Avoid deadlock by unlocking mutex
1340 * otherwise we get deadlock between this and stream lock */
1341 g_mutex_unlock (&priv->mutex);
1343 result = gst_base_src_set_caps (basesrc, caps);
1344 gst_caps_unref (caps);
1346 result = GST_BASE_SRC_CLASS (parent_class)->negotiate (basesrc);
1348 g_mutex_lock (&priv->mutex);
1354 gst_app_src_negotiate (GstBaseSrc * basesrc)
1356 GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
1357 GstAppSrcPrivate *priv = appsrc->priv;
1360 g_mutex_lock (&priv->mutex);
1361 result = gst_app_src_do_negotiate (basesrc);
1362 g_mutex_unlock (&priv->mutex);
1366 /* Update the currently queued bytes/buffers/time information for the item
1367 * that was just removed from the queue.
1369 * If update_offset is set, additionally the offset of the source will be
1370 * moved forward accordingly as if that many bytes were output.
1373 gst_app_src_update_queued_pop (GstAppSrc * appsrc, GstMiniObject * item,
1374 gboolean update_offset)
1376 GstAppSrcPrivate *priv = appsrc->priv;
1378 guint n_buffers = 0;
1379 GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
1381 if (GST_IS_BUFFER (item)) {
1382 GstBuffer *buf = GST_BUFFER_CAST (item);
1383 buf_size = gst_buffer_get_size (buf);
1386 end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
1387 if (end_buffer_ts != GST_CLOCK_TIME_NONE
1388 && GST_BUFFER_DURATION_IS_VALID (buf))
1389 end_buffer_ts += GST_BUFFER_DURATION (buf);
1391 GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", buf, buf_size);
1392 } else if (GST_IS_BUFFER_LIST (item)) {
1393 GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
1396 n_buffers = gst_buffer_list_length (buffer_list);
1398 for (i = 0; i < n_buffers; i++) {
1399 GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
1400 GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
1402 buf_size += gst_buffer_get_size (tmp);
1403 /* Update to the last buffer's timestamp that is known */
1404 if (ts != GST_CLOCK_TIME_NONE) {
1406 if (GST_BUFFER_DURATION_IS_VALID (tmp))
1407 end_buffer_ts += GST_BUFFER_DURATION (tmp);
1412 priv->queued_bytes -= buf_size;
1413 priv->queued_buffers -= n_buffers;
1415 /* Update time level if working on a TIME segment */
1416 if ((priv->current_segment.format == GST_FORMAT_TIME
1417 || (priv->current_segment.format == GST_FORMAT_UNDEFINED
1418 && priv->last_segment.format == GST_FORMAT_TIME))
1419 && end_buffer_ts != GST_CLOCK_TIME_NONE) {
1420 const GstSegment *segment =
1421 priv->current_segment.format ==
1422 GST_FORMAT_TIME ? &priv->current_segment : &priv->last_segment;
1424 /* Clip to the current segment boundaries */
1425 if (segment->stop != -1 && end_buffer_ts > segment->stop)
1426 end_buffer_ts = segment->stop;
1427 else if (segment->start > end_buffer_ts)
1428 end_buffer_ts = segment->start;
1430 priv->last_out_running_time =
1431 gst_segment_to_running_time (segment, GST_FORMAT_TIME, end_buffer_ts);
1433 GST_TRACE_OBJECT (appsrc,
1434 "Last in running time %" GST_TIME_FORMAT ", last out running time %"
1435 GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
1436 GST_TIME_ARGS (priv->last_out_running_time));
1438 /* If timestamps on both sides are known, calculate the current
1439 * fill level in time and consider the queue empty if the output
1440 * running time is lower than the input one (i.e. some kind of reset
1443 if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
1444 && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
1445 if (priv->last_out_running_time > priv->last_in_running_time) {
1446 priv->queued_time = 0;
1449 priv->last_in_running_time - priv->last_out_running_time;
1454 GST_DEBUG_OBJECT (appsrc,
1455 "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
1456 " buffers, %" GST_TIME_FORMAT, priv->queued_bytes,
1457 priv->queued_buffers, GST_TIME_ARGS (priv->queued_time));
1459 /* only update the offset when in random_access mode and when requested by
1460 * the caller, i.e. not when just dropping the item */
1461 if (update_offset && priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
1462 priv->offset += buf_size;
1465 /* Update the currently queued bytes/buffers/time information for the item
1466 * that was just added to the queue.
1469 gst_app_src_update_queued_push (GstAppSrc * appsrc, GstMiniObject * item)
1471 GstAppSrcPrivate *priv = appsrc->priv;
1472 GstClockTime start_buffer_ts = GST_CLOCK_TIME_NONE;
1473 GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
1475 guint n_buffers = 0;
1477 if (GST_IS_BUFFER (item)) {
1478 GstBuffer *buf = GST_BUFFER_CAST (item);
1480 buf_size = gst_buffer_get_size (buf);
1483 start_buffer_ts = end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
1484 if (end_buffer_ts != GST_CLOCK_TIME_NONE
1485 && GST_BUFFER_DURATION_IS_VALID (buf))
1486 end_buffer_ts += GST_BUFFER_DURATION (buf);
1487 } else if (GST_IS_BUFFER_LIST (item)) {
1488 GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
1491 n_buffers = gst_buffer_list_length (buffer_list);
1493 for (i = 0; i < n_buffers; i++) {
1494 GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
1495 GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
1497 buf_size += gst_buffer_get_size (tmp);
1499 if (ts != GST_CLOCK_TIME_NONE) {
1500 if (start_buffer_ts == GST_CLOCK_TIME_NONE)
1501 start_buffer_ts = ts;
1503 if (GST_BUFFER_DURATION_IS_VALID (tmp))
1504 end_buffer_ts += GST_BUFFER_DURATION (tmp);
1509 priv->queued_bytes += buf_size;
1510 priv->queued_buffers += n_buffers;
1512 /* Update time level if working on a TIME segment */
1513 if (priv->last_segment.format == GST_FORMAT_TIME
1514 && end_buffer_ts != GST_CLOCK_TIME_NONE) {
1515 /* Clip to the last segment boundaries */
1516 if (priv->last_segment.stop != -1
1517 && end_buffer_ts > priv->last_segment.stop)
1518 end_buffer_ts = priv->last_segment.stop;
1519 else if (priv->last_segment.start > end_buffer_ts)
1520 end_buffer_ts = priv->last_segment.start;
1522 priv->last_in_running_time =
1523 gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
1526 /* If this is the only buffer then we can directly update the queued time
1527 * here. This is especially useful if this was the first buffer because
1528 * otherwise we would have to wait until it is actually unqueued to know
1529 * the queued duration */
1530 if (priv->queued_buffers == 1) {
1531 if (priv->last_segment.stop != -1
1532 && start_buffer_ts > priv->last_segment.stop)
1533 start_buffer_ts = priv->last_segment.stop;
1534 else if (priv->last_segment.start > start_buffer_ts)
1535 start_buffer_ts = priv->last_segment.start;
1537 priv->last_out_running_time =
1538 gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
1542 GST_TRACE_OBJECT (appsrc,
1543 "Last in running time %" GST_TIME_FORMAT ", last out running time %"
1544 GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
1545 GST_TIME_ARGS (priv->last_out_running_time));
1547 if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
1548 && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
1549 if (priv->last_out_running_time > priv->last_in_running_time) {
1550 priv->queued_time = 0;
1553 priv->last_in_running_time - priv->last_out_running_time;
1558 GST_DEBUG_OBJECT (appsrc,
1559 "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
1560 " buffers, %" GST_TIME_FORMAT, priv->queued_bytes, priv->queued_buffers,
1561 GST_TIME_ARGS (priv->queued_time));
1564 static GstFlowReturn
1565 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
1568 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1569 GstAppSrcPrivate *priv = appsrc->priv;
1572 GST_OBJECT_LOCK (appsrc);
1573 if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
1574 bsrc->segment.format == GST_FORMAT_BYTES)) {
1575 GST_DEBUG_OBJECT (appsrc,
1576 "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
1577 bsrc->segment.duration, priv->size);
1578 bsrc->segment.duration = priv->size;
1579 GST_OBJECT_UNLOCK (appsrc);
1581 gst_element_post_message (GST_ELEMENT (appsrc),
1582 gst_message_new_duration_changed (GST_OBJECT (appsrc)));
1583 } else if (G_UNLIKELY (priv->duration != bsrc->segment.duration &&
1584 bsrc->segment.format == GST_FORMAT_TIME)) {
1585 GST_DEBUG_OBJECT (appsrc,
1586 "Duration changed from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
1587 GST_TIME_ARGS (bsrc->segment.duration), GST_TIME_ARGS (priv->duration));
1588 bsrc->segment.duration = priv->duration;
1589 GST_OBJECT_UNLOCK (appsrc);
1591 gst_element_post_message (GST_ELEMENT (appsrc),
1592 gst_message_new_duration_changed (GST_OBJECT (appsrc)));
1594 GST_OBJECT_UNLOCK (appsrc);
1597 g_mutex_lock (&priv->mutex);
1598 /* check flushing first */
1599 if (G_UNLIKELY (priv->flushing))
1602 if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
1603 /* if we are dealing with a random-access stream, issue a seek if the offset
1605 if (G_UNLIKELY (priv->offset != offset)) {
1609 res = gst_app_src_emit_seek (appsrc, offset);
1611 if (G_UNLIKELY (!res))
1612 /* failing to seek is fatal */
1615 priv->offset = offset;
1616 priv->is_eos = FALSE;
1621 /* Our lock may have been release to push events or caps, check out
1622 * state in case we are now flushing. */
1623 if (G_UNLIKELY (priv->flushing))
1626 /* return data as long as we have some */
1627 if (!gst_queue_array_is_empty (priv->queue)) {
1628 GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);
1630 if (GST_IS_CAPS (obj)) {
1631 GstCaps *next_caps = GST_CAPS (obj);
1632 gboolean caps_changed = TRUE;
1634 if (next_caps && priv->current_caps)
1635 caps_changed = !gst_caps_is_equal (next_caps, priv->current_caps);
1637 caps_changed = (next_caps != priv->current_caps);
1639 gst_caps_replace (&priv->current_caps, next_caps);
1642 gst_caps_unref (next_caps);
1646 gst_app_src_do_negotiate (bsrc);
1648 /* Continue checks caps and queue */
1652 if (GST_IS_BUFFER (obj)) {
1653 GstBuffer *buffer = GST_BUFFER (obj);
1655 /* Mark the buffer as DISCONT if we previously dropped a buffer
1656 * instead of outputting it */
1657 if (priv->need_discont_downstream) {
1658 buffer = gst_buffer_make_writable (buffer);
1659 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1660 priv->need_discont_downstream = FALSE;
1664 } else if (GST_IS_BUFFER_LIST (obj)) {
1665 GstBufferList *buffer_list;
1667 buffer_list = GST_BUFFER_LIST (obj);
1669 /* Mark the first buffer of the buffer list as DISCONT if we
1670 * previously dropped a buffer instead of outputting it */
1671 if (priv->need_discont_downstream) {
1674 buffer_list = gst_buffer_list_make_writable (buffer_list);
1675 buffer = gst_buffer_list_get_writable (buffer_list, 0);
1676 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1677 priv->need_discont_downstream = FALSE;
1680 gst_base_src_submit_buffer_list (bsrc, buffer_list);
1682 } else if (GST_IS_EVENT (obj)) {
1683 GstEvent *event = GST_EVENT (obj);
1685 GST_DEBUG_OBJECT (appsrc, "pop event %" GST_PTR_FORMAT, event);
1687 if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
1688 const GstSegment *segment = NULL;
1690 gst_event_parse_segment (event, &segment);
1691 g_assert (segment != NULL);
1693 if (!gst_segment_is_equal (&priv->current_segment, segment)) {
1694 GST_DEBUG_OBJECT (appsrc,
1695 "Update new segment %" GST_PTR_FORMAT, event);
1696 if (!gst_base_src_new_segment (bsrc, segment)) {
1697 GST_ERROR_OBJECT (appsrc,
1698 "Couldn't set new segment %" GST_PTR_FORMAT, event);
1699 gst_event_unref (event);
1700 goto invalid_segment;
1702 gst_segment_copy_into (segment, &priv->current_segment);
1705 gst_event_unref (event);
1707 GstEvent *seg_event;
1708 GstSegment last_segment = priv->last_segment;
1710 /* event is serialized with the buffers flow */
1712 /* We are about to push an event, release out lock */
1713 g_mutex_unlock (&priv->mutex);
1716 gst_pad_get_sticky_event (GST_BASE_SRC_PAD (appsrc),
1717 GST_EVENT_SEGMENT, 0);
1719 seg_event = gst_event_new_segment (&last_segment);
1721 GST_DEBUG_OBJECT (appsrc,
1722 "received serialized event before first buffer, push default segment %"
1723 GST_PTR_FORMAT, seg_event);
1725 gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), seg_event);
1727 gst_event_unref (seg_event);
1730 gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), event);
1732 g_mutex_lock (&priv->mutex);
1736 g_assert_not_reached ();
1739 gst_app_src_update_queued_pop (appsrc, obj, TRUE);
1741 /* signal that we removed an item */
1742 if ((priv->wait_status & APP_WAITING))
1743 g_cond_broadcast (&priv->cond);
1745 /* see if we go lower than the min-percent */
1746 if (priv->min_percent) {
1747 if ((priv->max_bytes
1748 && priv->queued_bytes * 100 / priv->max_bytes <=
1749 priv->min_percent) || (priv->max_buffers
1750 && priv->queued_buffers * 100 / priv->max_buffers <=
1751 priv->min_percent) || (priv->max_time
1752 && priv->queued_time * 100 / priv->max_time <=
1753 priv->min_percent)) {
1754 /* ignore flushing state, we got a buffer and we will return it now.
1755 * Errors will be handled in the next round */
1756 gst_app_src_emit_need_data (appsrc, size);
1762 gst_app_src_emit_need_data (appsrc, size);
1764 /* we can be flushing now because we released the lock above */
1765 if (G_UNLIKELY (priv->flushing))
1768 /* if we have a buffer now, continue the loop and try to return it. In
1769 * random-access mode (where a buffer is normally pushed in the above
1770 * signal) we can still be empty because the pushed buffer got flushed or
1771 * when the application pushes the requested buffer later, we support both
1773 if (!gst_queue_array_is_empty (priv->queue))
1776 /* no buffer yet, maybe we are EOS, if not, block for more data. */
1780 if (G_UNLIKELY (priv->is_eos))
1783 /* nothing to return, wait a while for new data or flushing. */
1784 priv->wait_status |= STREAM_WAITING;
1785 g_cond_wait (&priv->cond, &priv->mutex);
1786 priv->wait_status &= ~STREAM_WAITING;
1788 g_mutex_unlock (&priv->mutex);
1794 GST_DEBUG_OBJECT (appsrc, "we are flushing");
1795 g_mutex_unlock (&priv->mutex);
1796 return GST_FLOW_FLUSHING;
1800 GST_DEBUG_OBJECT (appsrc, "we are EOS");
1801 g_mutex_unlock (&priv->mutex);
1802 return GST_FLOW_EOS;
1806 g_mutex_unlock (&priv->mutex);
1807 GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
1809 return GST_FLOW_ERROR;
1814 g_mutex_unlock (&priv->mutex);
1815 GST_ELEMENT_ERROR (appsrc, LIBRARY, SETTINGS,
1816 (NULL), ("Failed to configure the provided input segment."));
1817 return GST_FLOW_ERROR;
1824 * gst_app_src_set_caps:
1825 * @appsrc: a #GstAppSrc
1826 * @caps: (nullable): caps to set
1828 * Set the capabilities on the appsrc element. This function takes
1829 * a copy of the caps structure. After calling this method, the source will
1830 * only produce caps that match @caps. @caps must be fixed and the caps on the
1831 * buffers must match the caps or left NULL.
1834 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
1836 GstAppSrcPrivate *priv;
1837 gboolean caps_changed;
1839 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1841 priv = appsrc->priv;
1843 g_mutex_lock (&priv->mutex);
1845 GST_OBJECT_LOCK (appsrc);
1846 if (caps && priv->last_caps)
1847 caps_changed = !gst_caps_is_equal (caps, priv->last_caps);
1849 caps_changed = (caps != priv->last_caps);
1855 new_caps = caps ? gst_caps_copy (caps) : NULL;
1856 GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
1858 while ((t = gst_queue_array_peek_tail (priv->queue)) && GST_IS_CAPS (t)) {
1859 gst_caps_unref (gst_queue_array_pop_tail (priv->queue));
1861 gst_queue_array_push_tail (priv->queue, new_caps);
1862 gst_caps_replace (&priv->last_caps, new_caps);
1864 if ((priv->wait_status & STREAM_WAITING))
1865 g_cond_broadcast (&priv->cond);
1868 GST_OBJECT_UNLOCK (appsrc);
1870 g_mutex_unlock (&priv->mutex);
1874 * gst_app_src_get_caps:
1875 * @appsrc: a #GstAppSrc
1877 * Get the configured caps on @appsrc.
1879 * Returns: (nullable) (transfer full): the #GstCaps produced by the source. gst_caps_unref() after usage.
1882 gst_app_src_get_caps (GstAppSrc * appsrc)
1887 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
1889 GST_OBJECT_LOCK (appsrc);
1890 if ((caps = appsrc->priv->last_caps))
1891 gst_caps_ref (caps);
1892 GST_OBJECT_UNLOCK (appsrc);
1899 * gst_app_src_set_size:
1900 * @appsrc: a #GstAppSrc
1901 * @size: the size to set
1903 * Set the size of the stream in bytes. A value of -1 means that the size is
1907 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
1909 GstAppSrcPrivate *priv;
1911 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1913 priv = appsrc->priv;
1915 GST_OBJECT_LOCK (appsrc);
1916 GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
1918 GST_OBJECT_UNLOCK (appsrc);
1922 * gst_app_src_get_size:
1923 * @appsrc: a #GstAppSrc
1925 * Get the size of the stream in bytes. A value of -1 means that the size is
1928 * Returns: the size of the stream previously set with gst_app_src_set_size();
1931 gst_app_src_get_size (GstAppSrc * appsrc)
1934 GstAppSrcPrivate *priv;
1936 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1938 priv = appsrc->priv;
1940 GST_OBJECT_LOCK (appsrc);
1942 GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
1943 GST_OBJECT_UNLOCK (appsrc);
1949 * gst_app_src_set_duration:
1950 * @appsrc: a #GstAppSrc
1951 * @duration: the duration to set
1953 * Set the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
1959 gst_app_src_set_duration (GstAppSrc * appsrc, GstClockTime duration)
1961 GstAppSrcPrivate *priv;
1963 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1965 priv = appsrc->priv;
1967 GST_OBJECT_LOCK (appsrc);
1968 GST_DEBUG_OBJECT (appsrc, "setting duration of %" GST_TIME_FORMAT,
1969 GST_TIME_ARGS (duration));
1970 priv->duration = duration;
1971 GST_OBJECT_UNLOCK (appsrc);
1975 * gst_app_src_get_duration:
1976 * @appsrc: a #GstAppSrc
1978 * Get the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
1981 * Returns: the duration of the stream previously set with gst_app_src_set_duration();
1986 gst_app_src_get_duration (GstAppSrc * appsrc)
1988 GstClockTime duration;
1989 GstAppSrcPrivate *priv;
1991 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
1993 priv = appsrc->priv;
1995 GST_OBJECT_LOCK (appsrc);
1996 duration = priv->duration;
1997 GST_DEBUG_OBJECT (appsrc, "getting duration of %" GST_TIME_FORMAT,
1998 GST_TIME_ARGS (duration));
1999 GST_OBJECT_UNLOCK (appsrc);
2005 * gst_app_src_set_stream_type:
2006 * @appsrc: a #GstAppSrc
2007 * @type: the new state
2009 * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
2012 * A stream_type stream
2015 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
2017 GstAppSrcPrivate *priv;
2019 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2021 priv = appsrc->priv;
2023 GST_OBJECT_LOCK (appsrc);
2024 GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
2025 priv->stream_type = type;
2026 GST_OBJECT_UNLOCK (appsrc);
2030 * gst_app_src_get_stream_type:
2031 * @appsrc: a #GstAppSrc
2033 * Get the stream type. Control the stream type of @appsrc
2034 * with gst_app_src_set_stream_type().
2036 * Returns: the stream type.
2039 gst_app_src_get_stream_type (GstAppSrc * appsrc)
2041 gboolean stream_type;
2042 GstAppSrcPrivate *priv;
2044 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
2046 priv = appsrc->priv;
2048 GST_OBJECT_LOCK (appsrc);
2049 stream_type = priv->stream_type;
2050 GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
2051 GST_OBJECT_UNLOCK (appsrc);
2057 * gst_app_src_set_max_bytes:
2058 * @appsrc: a #GstAppSrc
2059 * @max: the maximum number of bytes to queue
2061 * Set the maximum amount of bytes that can be queued in @appsrc.
2062 * After the maximum amount of bytes are queued, @appsrc will emit the
2063 * "enough-data" signal.
2066 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
2068 GstAppSrcPrivate *priv;
2070 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2072 priv = appsrc->priv;
2074 g_mutex_lock (&priv->mutex);
2075 if (max != priv->max_bytes) {
2076 GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
2077 priv->max_bytes = max;
2078 /* signal the change */
2079 g_cond_broadcast (&priv->cond);
2081 g_mutex_unlock (&priv->mutex);
2085 * gst_app_src_get_max_bytes:
2086 * @appsrc: a #GstAppSrc
2088 * Get the maximum amount of bytes that can be queued in @appsrc.
2090 * Returns: The maximum amount of bytes that can be queued.
2093 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
2096 GstAppSrcPrivate *priv;
2098 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2100 priv = appsrc->priv;
2102 g_mutex_lock (&priv->mutex);
2103 result = priv->max_bytes;
2104 GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
2105 g_mutex_unlock (&priv->mutex);
2111 * gst_app_src_get_current_level_bytes:
2112 * @appsrc: a #GstAppSrc
2114 * Get the number of currently queued bytes inside @appsrc.
2116 * Returns: The number of currently queued bytes.
2121 gst_app_src_get_current_level_bytes (GstAppSrc * appsrc)
2124 GstAppSrcPrivate *priv;
2126 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
2128 priv = appsrc->priv;
2130 GST_OBJECT_LOCK (appsrc);
2131 queued = priv->queued_bytes;
2132 GST_DEBUG_OBJECT (appsrc, "current level bytes is %" G_GUINT64_FORMAT,
2134 GST_OBJECT_UNLOCK (appsrc);
2140 * gst_app_src_set_max_buffers:
2141 * @appsrc: a #GstAppSrc
2142 * @max: the maximum number of buffers to queue
2144 * Set the maximum amount of buffers that can be queued in @appsrc.
2145 * After the maximum amount of buffers are queued, @appsrc will emit the
2146 * "enough-data" signal.
2151 gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint64 max)
2153 GstAppSrcPrivate *priv;
2155 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2157 priv = appsrc->priv;
2159 g_mutex_lock (&priv->mutex);
2160 if (max != priv->max_buffers) {
2161 GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %" G_GUINT64_FORMAT, max);
2162 priv->max_buffers = max;
2163 /* signal the change */
2164 g_cond_broadcast (&priv->cond);
2166 g_mutex_unlock (&priv->mutex);
2170 * gst_app_src_get_max_buffers:
2171 * @appsrc: a #GstAppSrc
2173 * Get the maximum amount of buffers that can be queued in @appsrc.
2175 * Returns: The maximum amount of buffers that can be queued.
2180 gst_app_src_get_max_buffers (GstAppSrc * appsrc)
2183 GstAppSrcPrivate *priv;
2185 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2187 priv = appsrc->priv;
2189 g_mutex_lock (&priv->mutex);
2190 result = priv->max_buffers;
2191 GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %" G_GUINT64_FORMAT,
2193 g_mutex_unlock (&priv->mutex);
2199 * gst_app_src_get_current_level_buffers:
2200 * @appsrc: a #GstAppSrc
2202 * Get the number of currently queued buffers inside @appsrc.
2204 * Returns: The number of currently queued buffers.
2209 gst_app_src_get_current_level_buffers (GstAppSrc * appsrc)
2212 GstAppSrcPrivate *priv;
2214 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
2216 priv = appsrc->priv;
2218 GST_OBJECT_LOCK (appsrc);
2219 queued = priv->queued_buffers;
2220 GST_DEBUG_OBJECT (appsrc, "current level buffers is %" G_GUINT64_FORMAT,
2222 GST_OBJECT_UNLOCK (appsrc);
2228 * gst_app_src_set_max_time:
2229 * @appsrc: a #GstAppSrc
2230 * @max: the maximum amonut of time to queue
2232 * Set the maximum amount of time that can be queued in @appsrc.
2233 * After the maximum amount of time are queued, @appsrc will emit the
2234 * "enough-data" signal.
2239 gst_app_src_set_max_time (GstAppSrc * appsrc, GstClockTime max)
2241 GstAppSrcPrivate *priv;
2243 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2245 priv = appsrc->priv;
2247 g_mutex_lock (&priv->mutex);
2248 if (max != priv->max_time) {
2249 GST_DEBUG_OBJECT (appsrc, "setting max-time to %" GST_TIME_FORMAT,
2250 GST_TIME_ARGS (max));
2251 priv->max_time = max;
2252 /* signal the change */
2253 g_cond_broadcast (&priv->cond);
2255 g_mutex_unlock (&priv->mutex);
2259 * gst_app_src_get_max_time:
2260 * @appsrc: a #GstAppSrc
2262 * Get the maximum amount of time that can be queued in @appsrc.
2264 * Returns: The maximum amount of time that can be queued.
2269 gst_app_src_get_max_time (GstAppSrc * appsrc)
2271 GstClockTime result;
2272 GstAppSrcPrivate *priv;
2274 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2276 priv = appsrc->priv;
2278 g_mutex_lock (&priv->mutex);
2279 result = priv->max_time;
2280 GST_DEBUG_OBJECT (appsrc, "getting max-time of %" GST_TIME_FORMAT,
2281 GST_TIME_ARGS (result));
2282 g_mutex_unlock (&priv->mutex);
2288 * gst_app_src_get_current_level_time:
2289 * @appsrc: a #GstAppSrc
2291 * Get the amount of currently queued time inside @appsrc.
2293 * Returns: The amount of currently queued time.
/* Reads priv->queued_time while holding the object lock and logs the value.
 * A failed GST_IS_APP_SRC check yields GST_CLOCK_TIME_NONE.
 * NOTE(review): this getter takes GST_OBJECT_LOCK, while
 * gst_app_src_push_internal() compares priv->queued_time with its limit
 * under priv->mutex; confirm which lock is meant to guard the counter. */
2298 gst_app_src_get_current_level_time (GstAppSrc * appsrc)
2301 GstAppSrcPrivate *priv;
2303 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
2305 priv = appsrc->priv;
2307 GST_OBJECT_LOCK (appsrc);
2308 queued = priv->queued_time;
2309 GST_DEBUG_OBJECT (appsrc, "current level time is %" GST_TIME_FORMAT,
2310 GST_TIME_ARGS (queued));
2311 GST_OBJECT_UNLOCK (appsrc);
/* Helper used by gst_app_src_set_latency(). Under priv->mutex it stores @min
 * in priv->min_latency when @do_min is set and the value differs, and @max
 * in priv->max_latency when @do_max is set and the value differs; it also
 * sets priv->posted_latency_msg the first time it finds it unset.
 * Afterwards a latency message is posted on the element.
 * NOTE(review): @appsrc is dereferenced in the declaration of priv without a
 * GST_IS_APP_SRC guard.
 * NOTE(review): posting is presumably conditional on `changed`; confirm. */
2317 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
2318 gboolean do_max, guint64 max)
2320 GstAppSrcPrivate *priv = appsrc->priv;
2321 gboolean changed = FALSE;
2323 g_mutex_lock (&priv->mutex);
2324 if (do_min && priv->min_latency != min) {
2325 priv->min_latency = min;
2328 if (do_max && priv->max_latency != max) {
2329 priv->max_latency = max;
2332 if (!priv->posted_latency_msg) {
2333 priv->posted_latency_msg = TRUE;
2336 g_mutex_unlock (&priv->mutex);
/* The message is posted only after priv->mutex has been released. */
2339 GST_DEBUG_OBJECT (appsrc, "posting latency changed");
2340 gst_element_post_message (GST_ELEMENT_CAST (appsrc),
2341 gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
2346 * gst_app_src_set_leaky_type:
2347 * @appsrc: a #GstAppSrc
2348 * @leaky: the #GstAppLeakyType
2350 * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
2351 * will drop any buffers that are pushed into it once its internal queue is
2352 * full. The selected type defines whether to drop the oldest or new
/* Stores @leaky in priv->leaky_type after checking the instance type.
 * NOTE(review): the field is written without taking priv->mutex, although
 * gst_app_src_push_internal() reads it with that mutex held; confirm that
 * the unlocked write is intentional. */
2358 gst_app_src_set_leaky_type (GstAppSrc * appsrc, GstAppLeakyType leaky)
2360 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2362 appsrc->priv->leaky_type = leaky;
2366 * gst_app_src_get_leaky_type:
2367 * @appsrc: a #GstAppSrc
2369 * Returns the currently set #GstAppLeakyType. See gst_app_src_set_leaky_type()
2372 * Returns: The currently set #GstAppLeakyType.
/* Returns priv->leaky_type as currently stored; no lock is taken for the
 * read. A failed GST_IS_APP_SRC check yields GST_APP_LEAKY_TYPE_NONE. */
2377 gst_app_src_get_leaky_type (GstAppSrc * appsrc)
2379 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_APP_LEAKY_TYPE_NONE);
2381 return appsrc->priv->leaky_type;
2385 * gst_app_src_set_latency:
2386 * @appsrc: a #GstAppSrc
2387 * @min: the min latency
2388 * @max: the max latency
2390 * Configure the @min and @max latency in @appsrc. If @min is set to -1, the
2391 * default latency calculations for pseudo-live sources will be used.
/* Public setter: forwards to gst_app_src_set_latencies() with both the
 * do_min and do_max flags TRUE, so @min and @max are both applied. */
2394 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
2396 gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
2400 * gst_app_src_get_latency:
2401 * @appsrc: a #GstAppSrc
2402 * @min: (out): the min latency
2403 * @max: (out): the max latency
2405 * Retrieve the min and max latencies in @min and @max respectively.
/* Copies priv->min_latency into *@min and priv->max_latency into *@max while
 * holding priv->mutex, so both values come from the same locked section.
 * NOTE(review): confirm whether NULL is accepted for @min or @max. */
2408 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
2410 GstAppSrcPrivate *priv;
2412 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2414 priv = appsrc->priv;
2416 g_mutex_lock (&priv->mutex);
2418 *min = priv->min_latency;
2420 *max = priv->max_latency;
2421 g_mutex_unlock (&priv->mutex);
2425 * gst_app_src_set_emit_signals:
2426 * @appsrc: a #GstAppSrc
2427 * @emit: the new state
2429 * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
2430 * This option is by default disabled because signal emission is expensive and
2431 * unneeded when the application prefers to operate in pull mode.
/* Stores @emit in priv->emit_signals under priv->mutex.
 * gst_app_src_push_internal() samples this flag before reporting
 * enough-data. */
2434 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
2436 GstAppSrcPrivate *priv;
2438 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2440 priv = appsrc->priv;
2442 g_mutex_lock (&priv->mutex);
2443 priv->emit_signals = emit;
2444 g_mutex_unlock (&priv->mutex);
2448 * gst_app_src_get_emit_signals:
2449 * @appsrc: a #GstAppSrc
2451 * Check if appsrc will emit the "need-data", "enough-data" and "seek-data" signals.
2453 * Returns: %TRUE if @appsrc is emitting the "need-data", "enough-data" and "seek-data"
/* Reads priv->emit_signals under priv->mutex into `result`.
 * A failed GST_IS_APP_SRC check yields FALSE. */
2457 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
2460 GstAppSrcPrivate *priv;
2462 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
2464 priv = appsrc->priv;
2466 g_mutex_lock (&priv->mutex);
2467 result = priv->emit_signals;
2468 g_mutex_unlock (&priv->mutex);
/* Queues @buffer, or @buflist when it is non-NULL, on priv->queue. This is
 * the common path reached from the push-buffer, push-buffer-list and
 * push-sample helpers in this file.
 *
 * What the visible code does:
 *  - When the first buffer carries neither DTS nor PTS and the base source
 *    has do-timestamp enabled, the element clock is read and PTS and DTS of
 *    a copied or made-writable buffer are both set to `now`; a warning
 *    about inaccurate timestamps is logged on the alternative path.
 *  - With priv->mutex held, queued bytes, buffers and time are compared with
 *    their non-zero limits. On a full queue the enough-data callback or
 *    signal is invoked, then priv->leaky_type is consulted:
 *    GST_APP_LEAKY_TYPE_UPSTREAM flags need_discont_upstream,
 *    GST_APP_LEAKY_TYPE_DOWNSTREAM removes the oldest queued buffer or
 *    buffer list and flags need_discont_downstream; there is also a path
 *    that waits on priv->cond with APP_WAITING set.
 *  - A pending custom segment is queued as an event ahead of the data.
 *  - If need_discont_upstream is set, the (first) buffer is flagged DISCONT.
 *  - The data is pushed to the queue tail, the queued levels are updated and
 *    priv->cond is broadcast when STREAM_WAITING is set.
 *
 * Returns GST_FLOW_ERROR when an argument check fails, GST_FLOW_FLUSHING on
 * the "we are flushing" path and GST_FLOW_EOS on the "we are EOS" path.
 * NOTE(review): the "dropped new buffer ... we are full" path also returns
 * GST_FLOW_EOS; confirm that a leaky drop is meant to look like EOS to the
 * caller.
 * NOTE(review): @steal_ref presumably tells whether the caller's reference
 * is consumed; confirm against the ref/unref calls. */
2473 static GstFlowReturn
2474 gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
2475 GstBufferList * buflist, gboolean steal_ref)
2477 gboolean first = TRUE;
2478 GstAppSrcPrivate *priv;
2480 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
2482 priv = appsrc->priv;
2485 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2487 g_return_val_if_fail (GST_IS_BUFFER_LIST (buflist), GST_FLOW_ERROR);
2489 if (buflist != NULL) {
2490 if (gst_buffer_list_length (buflist) == 0)
2493 buffer = gst_buffer_list_get (buflist, 0);
2496 if (GST_BUFFER_DTS (buffer) == GST_CLOCK_TIME_NONE &&
2497 GST_BUFFER_PTS (buffer) == GST_CLOCK_TIME_NONE &&
2498 gst_base_src_get_do_timestamp (GST_BASE_SRC_CAST (appsrc))) {
2501 clock = gst_element_get_clock (GST_ELEMENT_CAST (appsrc));
2504 GstClockTime base_time =
2505 gst_element_get_base_time (GST_ELEMENT_CAST (appsrc));
2507 now = gst_clock_get_time (clock);
2508 if (now > base_time)
2512 gst_object_unref (clock);
2514 if (buflist == NULL) {
2516 buffer = gst_buffer_copy (buffer);
2519 buffer = gst_buffer_make_writable (buffer);
2523 buflist = gst_buffer_list_copy (buflist);
2526 buflist = gst_buffer_list_make_writable (buflist);
2528 buffer = gst_buffer_list_get_writable (buflist, 0);
2531 GST_BUFFER_PTS (buffer) = now;
2532 GST_BUFFER_DTS (buffer) = now;
2534 GST_WARNING_OBJECT (appsrc,
2535 "do-timestamp=TRUE but buffers are provided before "
2536 "reaching the PLAYING state and having a clock. Timestamps will "
2537 "not be accurate!");
/* From here on the queue state is inspected and modified under priv->mutex. */
2541 g_mutex_lock (&priv->mutex);
2544 /* can't accept buffers when we are flushing or EOS */
2551 if ((priv->max_bytes && priv->queued_bytes >= priv->max_bytes) ||
2552 (priv->max_buffers && priv->queued_buffers >= priv->max_buffers) ||
2553 (priv->max_time && priv->queued_time >= priv->max_time)) {
2554 GST_DEBUG_OBJECT (appsrc,
2555 "queue filled (queued %" G_GUINT64_FORMAT " bytes, max %"
2556 G_GUINT64_FORMAT " bytes, " "queued %" G_GUINT64_FORMAT
2557 " buffers, max %" G_GUINT64_FORMAT " buffers, " "queued %"
2558 GST_TIME_FORMAT " time, max %" GST_TIME_FORMAT " time)",
2559 priv->queued_bytes, priv->max_bytes, priv->queued_buffers,
2560 priv->max_buffers, GST_TIME_ARGS (priv->queued_time),
2561 GST_TIME_ARGS (priv->max_time));
2564 Callbacks *callbacks = NULL;
2567 emit = priv->emit_signals;
2568 if (priv->callbacks)
2569 callbacks = callbacks_ref (priv->callbacks);
2570 /* only signal on the first push */
/* The enough-data callback or signal below runs without priv->mutex held;
 * the mutex is taken again afterwards. A reference on the callbacks keeps
 * them alive while the lock is dropped. */
2571 g_mutex_unlock (&priv->mutex);
2573 if (callbacks && callbacks->callbacks.enough_data)
2574 callbacks->callbacks.enough_data (appsrc, callbacks->user_data);
2576 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
2579 g_clear_pointer (&callbacks, callbacks_unref);
2581 g_mutex_lock (&priv->mutex);
2584 if (priv->leaky_type == GST_APP_LEAKY_TYPE_UPSTREAM) {
2585 priv->need_discont_upstream = TRUE;
2587 } else if (priv->leaky_type == GST_APP_LEAKY_TYPE_DOWNSTREAM) {
2588 guint i, length = gst_queue_array_get_length (priv->queue);
2589 GstMiniObject *item = NULL;
2591 /* Find the oldest buffer or buffer list and drop it, then update the
2592 * limits. Dropping one is sufficient to go below the limits again.
2594 for (i = 0; i < length; i++) {
2595 item = gst_queue_array_peek_nth (priv->queue, i);
2596 if (GST_IS_BUFFER (item) || GST_IS_BUFFER_LIST (item)) {
2597 gst_queue_array_drop_element (priv->queue, i);
2600 /* To not accidentally have an event after the loop */
2605 GST_FIXME_OBJECT (appsrc,
2606 "No buffer or buffer list queued but queue is full");
2607 /* This shouldn't really happen but in this case we can't really do
2608 * anything apart from accepting the buffer / bufferlist */
2612 GST_WARNING_OBJECT (appsrc, "Dropping old item %" GST_PTR_FORMAT, item);
2614 gst_app_src_update_queued_pop (appsrc, item, FALSE);
2615 gst_mini_object_unref (item);
2617 priv->need_discont_downstream = TRUE;
2622 /* continue to check for flushing/eos after releasing the lock */
2627 GST_DEBUG_OBJECT (appsrc, "waiting for free space");
2628 /* we are filled, wait until a buffer gets popped or when we
2630 priv->wait_status |= APP_WAITING;
2631 g_cond_wait (&priv->cond, &priv->mutex);
2632 priv->wait_status &= ~APP_WAITING;
2634 /* no need to wait for free space, we just pump more data into the
2635 * queue hoping that the caller reacts to the enough-data signal and
2636 * stops pushing buffers. */
/* A segment recorded by gst_app_src_push_sample_internal() is turned into an
 * event and queued before the data it applies to. */
2644 if (priv->pending_custom_segment) {
2645 GstEvent *event = gst_event_new_segment (&priv->last_segment);
2647 GST_DEBUG_OBJECT (appsrc, "enqueue new segment %" GST_PTR_FORMAT, event);
2648 gst_queue_array_push_tail (priv->queue, event);
2649 priv->pending_custom_segment = FALSE;
2652 if (buflist != NULL) {
2653 /* Mark the first buffer of the buffer list as DISCONT if we previously
2654 * dropped a buffer instead of queueing it */
2655 if (priv->need_discont_upstream) {
2657 buflist = gst_buffer_list_copy (buflist);
2660 buflist = gst_buffer_list_make_writable (buflist);
2662 buffer = gst_buffer_list_get_writable (buflist, 0);
2663 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2664 priv->need_discont_upstream = FALSE;
2667 GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist);
2670 gst_buffer_list_ref (buflist);
2671 gst_queue_array_push_tail (priv->queue, buflist);
2673 /* Mark the buffer as DISCONT if we previously dropped a buffer instead of
2675 if (priv->need_discont_upstream) {
2677 buffer = gst_buffer_copy (buffer);
2680 buffer = gst_buffer_make_writable (buffer);
2682 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2683 priv->need_discont_upstream = FALSE;
2686 GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
2688 gst_buffer_ref (buffer);
2689 gst_queue_array_push_tail (priv->queue, buffer);
2692 gst_app_src_update_queued_push (appsrc,
2693 buflist ? GST_MINI_OBJECT_CAST (buflist) : GST_MINI_OBJECT_CAST (buffer));
2695 if ((priv->wait_status & STREAM_WAITING))
2696 g_cond_broadcast (&priv->cond);
2698 g_mutex_unlock (&priv->mutex);
2705 GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
2708 gst_buffer_list_unref (buflist);
2710 gst_buffer_unref (buffer);
2712 g_mutex_unlock (&priv->mutex);
2713 return GST_FLOW_FLUSHING;
2717 GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
2720 gst_buffer_list_unref (buflist);
2722 gst_buffer_unref (buffer);
2724 g_mutex_unlock (&priv->mutex);
2725 return GST_FLOW_EOS;
2729 GST_DEBUG_OBJECT (appsrc, "dropped new buffer %p, we are full", buffer);
2732 gst_buffer_list_unref (buflist);
2734 gst_buffer_unref (buffer);
2736 g_mutex_unlock (&priv->mutex);
2737 return GST_FLOW_EOS;
/* Single-buffer wrapper: forwards @buffer and steal_ref to
 * gst_app_src_push_internal() with a NULL buffer list and returns its
 * result. */
2741 static GstFlowReturn
2742 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
2745 return gst_app_src_push_internal (appsrc, buffer, NULL, steal_ref);
/* Pushes the payload of @sample without stealing a reference (steal_ref is
 * FALSE on both push calls).
 *
 * The sample's caps are handed to gst_app_src_set_caps(); a sample without
 * caps only produces a warning. When priv->handle_segment_change is set and
 * priv->format is GST_FORMAT_TIME, the sample's segment is examined: a
 * segment in another format is logged as unsupported, otherwise it is
 * compared with priv->last_segment under priv->mutex and, if different,
 * copied there with priv->pending_custom_segment set. The sample's buffer
 * is pushed if there is one, else its buffer list; a sample carrying
 * neither produces a warning.
 * A failed GST_IS_SAMPLE check yields GST_FLOW_ERROR.
 * NOTE(review): priv is read from @appsrc in the declaration, before any
 * type check on @appsrc in this function. */
2748 static GstFlowReturn
2749 gst_app_src_push_sample_internal (GstAppSrc * appsrc, GstSample * sample)
2751 GstAppSrcPrivate *priv = appsrc->priv;
2752 GstBufferList *buffer_list;
2756 g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);
2758 caps = gst_sample_get_caps (sample);
2760 gst_app_src_set_caps (appsrc, caps);
2762 GST_WARNING_OBJECT (appsrc, "received sample without caps");
2765 if (priv->handle_segment_change && priv->format == GST_FORMAT_TIME) {
2766 GstSegment *segment = gst_sample_get_segment (sample);
2768 if (segment->format != GST_FORMAT_TIME) {
2769 GST_LOG_OBJECT (appsrc, "format %s is not supported",
2770 gst_format_get_name (segment->format));
2774 g_mutex_lock (&priv->mutex);
2775 if (gst_segment_is_equal (&priv->last_segment, segment)) {
2776 GST_LOG_OBJECT (appsrc, "segment wasn't changed");
2777 g_mutex_unlock (&priv->mutex);
2780 GST_LOG_OBJECT (appsrc,
2781 "segment changed %" GST_SEGMENT_FORMAT " -> %" GST_SEGMENT_FORMAT,
2782 &priv->last_segment, segment);
2785 /* will be pushed to queue with next buffer/buffer-list */
2786 gst_segment_copy_into (segment, &priv->last_segment);
2787 priv->pending_custom_segment = TRUE;
2788 g_mutex_unlock (&priv->mutex);
2793 buffer = gst_sample_get_buffer (sample);
2795 return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
2797 buffer_list = gst_sample_get_buffer_list (sample);
2798 if (buffer_list != NULL)
2799 return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
2801 GST_WARNING_OBJECT (appsrc, "received sample without buffer or buffer list");
2806 * gst_app_src_push_buffer:
2807 * @appsrc: a #GstAppSrc
2808 * @buffer: (transfer full): a #GstBuffer to push
2810 * Adds a buffer to the queue of buffers that the appsrc element will
2811 * push to its source pad. This function takes ownership of the buffer.
2813 * When the block property is TRUE, this function can block until free
2814 * space becomes available in the queue.
2816 * Returns: #GST_FLOW_OK when the buffer was successfully queued.
2817 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2818 * #GST_FLOW_EOS when EOS occurred.
/* Public entry point: forwards to gst_app_src_push_buffer_full() with
 * steal_ref TRUE and returns its flow result unchanged. */
2821 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
2823 return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
2827 * gst_app_src_push_buffer_list:
2828 * @appsrc: a #GstAppSrc
2829 * @buffer_list: (transfer full): a #GstBufferList to push
2831 * Adds a buffer list to the queue of buffers and buffer lists that the
2832 * appsrc element will push to its source pad. This function takes ownership
2835 * When the block property is TRUE, this function can block until free
2836 * space becomes available in the queue.
2838 * Returns: #GST_FLOW_OK when the buffer list was successfully queued.
2839 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2840 * #GST_FLOW_EOS when EOS occurred.
/* Public entry point: forwards to gst_app_src_push_internal() with a NULL
 * single buffer, @buffer_list as the list and steal_ref TRUE. */
2845 gst_app_src_push_buffer_list (GstAppSrc * appsrc, GstBufferList * buffer_list)
2847 return gst_app_src_push_internal (appsrc, NULL, buffer_list, TRUE);
2851 * gst_app_src_push_sample:
2852 * @appsrc: a #GstAppSrc
2853 * @sample: (transfer none): a #GstSample from which buffer and caps may be
2856 * Extract a buffer from the provided sample and adds it to the queue of
2857 * buffers that the appsrc element will push to its source pad. Any
2858 * previous caps that were set on appsrc will be replaced by the caps
2859 * associated with the sample if not equal.
2861 * This function does not take ownership of the
2862 * sample so the sample needs to be unreffed after calling this function.
2864 * When the block property is TRUE, this function can block until free
2865 * space becomes available in the queue.
2867 * Returns: #GST_FLOW_OK when the buffer was successfully queued.
2868 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2869 * #GST_FLOW_EOS when EOS occurred.
/* Public entry point: thin wrapper that returns the result of
 * gst_app_src_push_sample_internal(). */
2875 gst_app_src_push_sample (GstAppSrc * appsrc, GstSample * sample)
2877 return gst_app_src_push_sample_internal (appsrc, sample);
/* Variant of gst_app_src_push_buffer() that passes steal_ref FALSE to
 * gst_app_src_push_buffer_full(). */
2880 /* push a buffer without stealing the ref of the buffer. This is used for the
2882 static GstFlowReturn
2883 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
2885 return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
2888 /* push a buffer list without stealing the ref of the buffer list. This is
2889 * used for the action signal. */
/* Variant of gst_app_src_push_buffer_list() that passes steal_ref FALSE to
 * gst_app_src_push_internal(). */
2890 static GstFlowReturn
2891 gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
2892 GstBufferList * buffer_list)
2894 return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
/* Makes the same call as gst_app_src_push_sample(): it returns the result
 * of gst_app_src_push_sample_internal(). */
2897 /* push a sample without stealing the ref. This is used for the
2899 static GstFlowReturn
2900 gst_app_src_push_sample_action (GstAppSrc * appsrc, GstSample * sample)
2902 return gst_app_src_push_sample_internal (appsrc, sample);
2906 * gst_app_src_end_of_stream:
2907 * @appsrc: a #GstAppSrc
2909 * Indicates to the appsrc element that the last buffer queued in the
2910 * element is the last buffer of the stream.
2912 * Returns: #GST_FLOW_OK when the EOS was successfully queued.
2913 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
/* Marks the stream as finished: under priv->mutex it sets priv->is_eos and
 * broadcasts priv->cond so that threads waiting on the condition notice the
 * change. On the flushing path the mutex is released, the refusal is logged
 * and GST_FLOW_FLUSHING is returned. A failed GST_IS_APP_SRC check yields
 * GST_FLOW_ERROR. */
2916 gst_app_src_end_of_stream (GstAppSrc * appsrc)
2918 GstAppSrcPrivate *priv;
2920 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
2922 priv = appsrc->priv;
2924 g_mutex_lock (&priv->mutex);
2925 /* can't accept buffers when we are flushing. We can accept them when we are
2926 * EOS although it will not do anything. */
2930 GST_DEBUG_OBJECT (appsrc, "sending EOS");
2931 priv->is_eos = TRUE;
2932 g_cond_broadcast (&priv->cond);
2933 g_mutex_unlock (&priv->mutex);
2940 g_mutex_unlock (&priv->mutex);
2941 GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
2942 return GST_FLOW_FLUSHING;
2947 * gst_app_src_set_callbacks: (skip)
2948 * @appsrc: a #GstAppSrc
2949 * @callbacks: the callbacks
2950 * @user_data: a user_data argument for the callbacks
2951 * @notify: a destroy notify function
2953 * Set callbacks which will be executed when data is needed, enough data has
2954 * been collected or when a seek should be performed.
2955 * This is an alternative to using the signals, it has lower overhead and is thus
2956 * less expensive, but also less flexible.
2958 * If callbacks are installed, no signals will be emitted for performance
2961 * Before 1.16.3 it was not possible to change the callbacks in a thread-safe
/* Installs a new callback set. A zero-initialised Callbacks record is
 * allocated with a reference count of 1 and filled with a copy of
 * *@callbacks, @user_data and @notify. Under priv->mutex the record is
 * swapped into priv->callbacks and the previous record is taken out.
 * @callbacks must not be NULL (checked with g_return_if_fail). */
2965 gst_app_src_set_callbacks (GstAppSrc * appsrc,
2966 GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
2968 Callbacks *old_callbacks, *new_callbacks = NULL;
2969 GstAppSrcPrivate *priv;
2971 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2972 g_return_if_fail (callbacks != NULL);
2974 priv = appsrc->priv;
2977 new_callbacks = g_new0 (Callbacks, 1);
2978 new_callbacks->callbacks = *callbacks;
2979 new_callbacks->user_data = user_data;
2980 new_callbacks->destroy_notify = notify;
2981 new_callbacks->ref_count = 1;
2984 g_mutex_lock (&priv->mutex);
2985 old_callbacks = g_steal_pointer (&priv->callbacks);
2986 priv->callbacks = g_steal_pointer (&new_callbacks);
2987 g_mutex_unlock (&priv->mutex);
/* The previous record is unreffed only after the mutex has been released. */
2989 g_clear_pointer (&old_callbacks, callbacks_unref);
2992 /*** GSTURIHANDLER INTERFACE *************************************************/
/* GstURIHandler get_type hook; installed by gst_app_src_uri_handler_init(). */
2995 gst_app_src_uri_get_type (GType type)
/* GstURIHandler get_protocols hook. The protocol table is a static,
 * NULL-terminated array whose only entry is "appsrc". */
3000 static const gchar *const *
3001 gst_app_src_uri_get_protocols (GType type)
3003 static const gchar *protocols[] = { "appsrc", NULL };
/* GstURIHandler get_uri hook: returns a newly allocated copy of priv->uri
 * made with g_strdup(), or NULL when no URI is stored. */
3009 gst_app_src_uri_get_uri (GstURIHandler * handler)
3011 GstAppSrc *appsrc = GST_APP_SRC (handler);
3013 return appsrc->priv->uri ? g_strdup (appsrc->priv->uri) : NULL;
/* GstURIHandler set_uri hook: frees the stored URI string and replaces it
 * with a copy of @uri, or with NULL when @uri is NULL. No lock is taken
 * around the update. */
3017 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
3020 GstAppSrc *appsrc = GST_APP_SRC (handler);
3022 g_free (appsrc->priv->uri);
3023 appsrc->priv->uri = uri ? g_strdup (uri) : NULL;
/* Interface initialiser: treats @g_iface as a GstURIHandlerInterface and
 * installs the get_type, get_protocols, get_uri and set_uri hooks defined
 * in this file. @iface_data is not used by the visible code. */
3029 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
3031 GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
3033 iface->get_type = gst_app_src_uri_get_type;
3034 iface->get_protocols = gst_app_src_uri_get_protocols;
3035 iface->get_uri = gst_app_src_uri_get_uri;
3036 iface->set_uri = gst_app_src_uri_set_uri;
/* GstBaseSrc event handler. For GST_EVENT_FLUSH_STOP it clears priv->is_eos
 * under priv->mutex. The return value is whatever the parent class's event
 * handler returns for @event. */
3040 gst_app_src_event (GstBaseSrc * src, GstEvent * event)
3042 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
3043 GstAppSrcPrivate *priv = appsrc->priv;
3045 switch (GST_EVENT_TYPE (event)) {
3046 case GST_EVENT_FLUSH_STOP:
3047 g_mutex_lock (&priv->mutex);
3048 priv->is_eos = FALSE;
3049 g_mutex_unlock (&priv->mutex);
3055 return GST_BASE_SRC_CLASS (parent_class)->event (src, event);