gst-libs/gst/app/gstappsrc.c (gst_app_src_set_max_bytes)
[platform/upstream/gstreamer.git] / gst-libs / gst / app / gstappsrc.c
1 /* GStreamer
2  * Copyright (C) 2007 David Schleef <ds@schleef.org>
3  *           (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <gst/gst.h>
26 #include <gst/base/gstbasesrc.h>
27
28 #include <string.h>
29
30 #include "gstapp-marshal.h"
31 #include "gstappsrc.h"
32
33
34 GST_DEBUG_CATEGORY (app_src_debug);
35 #define GST_CAT_DEFAULT app_src_debug
36
37 static const GstElementDetails app_src_details = GST_ELEMENT_DETAILS ("AppSrc",
38     "Generic/Src",
39     "Allow the application to feed buffers to a pipeline",
40     "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com");
41
42 enum
43 {
44   /* signals */
45   SIGNAL_NEED_DATA,
46   SIGNAL_ENOUGH_DATA,
47   SIGNAL_SEEK_DATA,
48
49   /* actions */
50   SIGNAL_PUSH_BUFFER,
51   SIGNAL_END_OF_STREAM,
52
53   LAST_SIGNAL
54 };
55
56 #define DEFAULT_PROP_SIZE          -1
57 #define DEFAULT_PROP_STREAM_TYPE   GST_APP_STREAM_TYPE_STREAM
58 #define DEFAULT_PROP_MAX_BYTES     200000
59 #define DEFAULT_PROP_FORMAT        GST_FORMAT_BYTES
60 #define DEFAULT_PROP_BLOCK         FALSE
61
62 enum
63 {
64   PROP_0,
65   PROP_CAPS,
66   PROP_SIZE,
67   PROP_STREAM_TYPE,
68   PROP_MAX_BYTES,
69   PROP_FORMAT,
70   PROP_BLOCK,
71
72   PROP_LAST
73 };
74
75 static GstStaticPadTemplate gst_app_src_template =
76 GST_STATIC_PAD_TEMPLATE ("src",
77     GST_PAD_SRC,
78     GST_PAD_ALWAYS,
79     GST_STATIC_CAPS_ANY);
80
81
82 #define GST_TYPE_APP_STREAM_TYPE (stream_type_get_type ())
83 static GType
84 stream_type_get_type (void)
85 {
86   static GType stream_type_type = 0;
87   static const GEnumValue stream_type[] = {
88     {GST_APP_STREAM_TYPE_STREAM, "Stream", "stream"},
89     {GST_APP_STREAM_TYPE_SEEKABLE, "Seekable", "seekable"},
90     {GST_APP_STREAM_TYPE_RANDOM_ACCESS, "Random Access", "random-access"},
91     {0, NULL, NULL},
92   };
93
94   if (!stream_type_type) {
95     stream_type_type = g_enum_register_static ("GstAppStreamType", stream_type);
96   }
97   return stream_type_type;
98 }
99
100 static void gst_app_src_uri_handler_init (gpointer g_iface,
101     gpointer iface_data);
102
103 static void gst_app_src_dispose (GObject * object);
104 static void gst_app_src_finalize (GObject * object);
105
106 static void gst_app_src_set_property (GObject * object, guint prop_id,
107     const GValue * value, GParamSpec * pspec);
108 static void gst_app_src_get_property (GObject * object, guint prop_id,
109     GValue * value, GParamSpec * pspec);
110
111 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc,
112     guint64 offset, guint size, GstBuffer ** buf);
113 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
114 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
115 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
116 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
117 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
118 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
119 static gboolean gst_app_src_check_get_range (GstBaseSrc * src);
120 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
121
122 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
123
124 static void
125 _do_init (GType filesrc_type)
126 {
127   static const GInterfaceInfo urihandler_info = {
128     gst_app_src_uri_handler_init,
129     NULL,
130     NULL
131   };
132   g_type_add_interface_static (filesrc_type, GST_TYPE_URI_HANDLER,
133       &urihandler_info);
134 }
135
136 GST_BOILERPLATE_FULL (GstAppSrc, gst_app_src, GstBaseSrc, GST_TYPE_BASE_SRC,
137     _do_init);
138
139 static void
140 gst_app_src_base_init (gpointer g_class)
141 {
142   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
143
144   GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
145
146   gst_element_class_set_details (element_class, &app_src_details);
147
148   gst_element_class_add_pad_template (element_class,
149       gst_static_pad_template_get (&gst_app_src_template));
150 }
151
152 static void
153 gst_app_src_class_init (GstAppSrcClass * klass)
154 {
155   GObjectClass *gobject_class = (GObjectClass *) klass;
156   GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
157
158   gobject_class->dispose = gst_app_src_dispose;
159   gobject_class->finalize = gst_app_src_finalize;
160
161   gobject_class->set_property = gst_app_src_set_property;
162   gobject_class->get_property = gst_app_src_get_property;
163
164   g_object_class_install_property (gobject_class, PROP_CAPS,
165       g_param_spec_boxed ("caps", "Caps",
166           "The allowed caps for the src pad", GST_TYPE_CAPS,
167           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
168
169   g_object_class_install_property (gobject_class, PROP_FORMAT,
170       g_param_spec_enum ("format", "Format",
171           "The format of the segment events and seek", GST_TYPE_FORMAT,
172           DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
173
174   g_object_class_install_property (gobject_class, PROP_SIZE,
175       g_param_spec_int64 ("size", "Size",
176           "The size of the data stream (-1 if unknown)",
177           -1, G_MAXINT64, DEFAULT_PROP_SIZE,
178           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
179
180   g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
181       g_param_spec_enum ("stream-type", "Stream Type",
182           "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
183           DEFAULT_PROP_STREAM_TYPE,
184           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
185
186   g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
187       g_param_spec_uint64 ("max-bytes", "Max bytes",
188           "The maximum number of bytes to queue internally (0 = unlimited)",
189           0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
190           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
191
192   g_object_class_install_property (gobject_class, PROP_BLOCK,
193       g_param_spec_boolean ("block", "Block",
194           "Block push-buffer when max-bytes are queued",
195           DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196
197   /**
198    * GstAppSrc::need-data:
199    * @appsrc: the appsrc element that emited the signal
200    * @length: the amount of bytes needed.
201    *
202    * Signal that the source needs more data. In the callback or from another
203    * thread you should call push-buffer or end-of-stream.
204    *
205    * @length is just a hint and when it is set to -1, any number of bytes can be
206    * pushed into @appsrc.
207    *
208    * You can call push-buffer multiple times until the enough-data signal is
209    * fired.
210    */
211   gst_app_src_signals[SIGNAL_NEED_DATA] =
212       g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
213       G_STRUCT_OFFSET (GstAppSrcClass, need_data),
214       NULL, NULL, gst_app_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
215
216   /**
217    * GstAppSrc::enough-data:
218    * @appsrc: the appsrc element that emited the signal
219    *
220    * Signal that the source has enough data. It is recommended that the
221    * application stops calling push-buffer until the need-data signal is
222    * emited again to avoid excessive buffer queueing.
223    */
224   gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
225       g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
226       G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
227       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
228
229   /**
230    * GstAppSrc::seek-data:
231    * @appsrc: the appsrc element that emited the signal
232    * @offset: the offset to seek to
233    *
234    * Seek to the given offset. The next push-buffer should produce buffers from
235    * the new @offset.
236    * This callback is only called for seekable stream types.
237    *
238    * Returns: %TRUE if the seek succeeded.
239    */
240   gst_app_src_signals[SIGNAL_SEEK_DATA] =
241       g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
242       G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
243       NULL, NULL, gst_app_marshal_BOOLEAN__UINT64, G_TYPE_BOOLEAN, 1,
244       G_TYPE_UINT64);
245
246    /**
247     * GstAppSrc::push-buffer:
248     * @appsrc: the appsrc
249     * @buffer: a buffer to push
250     *
251     * Adds a buffer to the queue of buffers that the appsrc element will
252     * push to its source pad. This function will take ownership of @buffer.
253     */
254   gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
255       g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
256       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
257           push_buffer), NULL, NULL, gst_app_marshal_ENUM__OBJECT,
258       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
259
260    /**
261     * GstAppSrc::end-of-stream:
262     * @appsrc: the appsrc
263     *
264     * Notify @appsrc that no more buffer are available. 
265     */
266   gst_app_src_signals[SIGNAL_END_OF_STREAM] =
267       g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
268       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
269           end_of_stream), NULL, NULL, gst_app_marshal_ENUM__VOID,
270       GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
271
272   basesrc_class->create = gst_app_src_create;
273   basesrc_class->start = gst_app_src_start;
274   basesrc_class->stop = gst_app_src_stop;
275   basesrc_class->unlock = gst_app_src_unlock;
276   basesrc_class->unlock_stop = gst_app_src_unlock_stop;
277   basesrc_class->do_seek = gst_app_src_do_seek;
278   basesrc_class->is_seekable = gst_app_src_is_seekable;
279   basesrc_class->check_get_range = gst_app_src_check_get_range;
280   basesrc_class->get_size = gst_app_src_do_get_size;
281
282   klass->push_buffer = gst_app_src_push_buffer;
283   klass->end_of_stream = gst_app_src_end_of_stream;
284 }
285
286 static void
287 gst_app_src_init (GstAppSrc * appsrc, GstAppSrcClass * klass)
288 {
289   appsrc->mutex = g_mutex_new ();
290   appsrc->cond = g_cond_new ();
291   appsrc->queue = g_queue_new ();
292
293   appsrc->size = DEFAULT_PROP_SIZE;
294   appsrc->stream_type = DEFAULT_PROP_STREAM_TYPE;
295   appsrc->max_bytes = DEFAULT_PROP_MAX_BYTES;
296   appsrc->format = DEFAULT_PROP_FORMAT;
297   appsrc->block = DEFAULT_PROP_BLOCK;
298 }
299
300 static void
301 gst_app_src_flush_queued (GstAppSrc * src)
302 {
303   GstBuffer *buf;
304
305   while ((buf = g_queue_pop_head (src->queue)))
306     gst_buffer_unref (buf);
307 }
308
309 static void
310 gst_app_src_dispose (GObject * obj)
311 {
312   GstAppSrc *appsrc = GST_APP_SRC (obj);
313
314   if (appsrc->caps) {
315     gst_caps_unref (appsrc->caps);
316     appsrc->caps = NULL;
317   }
318   gst_app_src_flush_queued (appsrc);
319
320   G_OBJECT_CLASS (parent_class)->dispose (obj);
321 }
322
323 static void
324 gst_app_src_finalize (GObject * obj)
325 {
326   GstAppSrc *appsrc = GST_APP_SRC (obj);
327
328   g_mutex_free (appsrc->mutex);
329   g_cond_free (appsrc->cond);
330   g_queue_free (appsrc->queue);
331
332   G_OBJECT_CLASS (parent_class)->finalize (obj);
333 }
334
335 static void
336 gst_app_src_set_property (GObject * object, guint prop_id,
337     const GValue * value, GParamSpec * pspec)
338 {
339   GstAppSrc *appsrc = GST_APP_SRC (object);
340
341   switch (prop_id) {
342     case PROP_CAPS:
343       gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
344       break;
345     case PROP_SIZE:
346       gst_app_src_set_size (appsrc, g_value_get_int64 (value));
347       break;
348     case PROP_STREAM_TYPE:
349       gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
350       break;
351     case PROP_MAX_BYTES:
352       gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
353       break;
354     case PROP_FORMAT:
355       appsrc->format = g_value_get_enum (value);
356       break;
357     case PROP_BLOCK:
358       appsrc->block = g_value_get_boolean (value);
359       break;
360     default:
361       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
362       break;
363   }
364 }
365
366 static void
367 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
368     GParamSpec * pspec)
369 {
370   GstAppSrc *appsrc = GST_APP_SRC (object);
371
372   switch (prop_id) {
373     case PROP_CAPS:
374     {
375       GstCaps *caps;
376
377       /* we're missing a _take_caps() function to transfer ownership */
378       caps = gst_app_src_get_caps (appsrc);
379       gst_value_set_caps (value, caps);
380       if (caps)
381         gst_caps_unref (caps);
382       break;
383     }
384     case PROP_SIZE:
385       g_value_set_int64 (value, gst_app_src_get_size (appsrc));
386       break;
387     case PROP_STREAM_TYPE:
388       g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
389       break;
390     case PROP_MAX_BYTES:
391       g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
392       break;
393     case PROP_FORMAT:
394       g_value_set_enum (value, appsrc->format);
395       break;
396     case PROP_BLOCK:
397       g_value_set_boolean (value, appsrc->block);
398       break;
399     default:
400       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
401       break;
402   }
403 }
404
405 static gboolean
406 gst_app_src_unlock (GstBaseSrc * bsrc)
407 {
408   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
409
410   g_mutex_lock (appsrc->mutex);
411   GST_DEBUG_OBJECT (appsrc, "unlock start");
412   appsrc->flushing = TRUE;
413   g_cond_broadcast (appsrc->cond);
414   g_mutex_unlock (appsrc->mutex);
415
416   return TRUE;
417 }
418
419 static gboolean
420 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
421 {
422   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
423
424   g_mutex_lock (appsrc->mutex);
425   GST_DEBUG_OBJECT (appsrc, "unlock stop");
426   appsrc->flushing = FALSE;
427   g_cond_broadcast (appsrc->cond);
428   g_mutex_unlock (appsrc->mutex);
429
430   return TRUE;
431 }
432
433 static gboolean
434 gst_app_src_start (GstBaseSrc * bsrc)
435 {
436   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
437
438   g_mutex_lock (appsrc->mutex);
439   GST_DEBUG_OBJECT (appsrc, "starting");
440   appsrc->started = TRUE;
441   /* set the offset to -1 so that we always do a first seek. This is only used
442    * in random-access mode. */
443   appsrc->offset = -1;
444   appsrc->flushing = FALSE;
445   g_mutex_unlock (appsrc->mutex);
446
447   gst_base_src_set_format (bsrc, appsrc->format);
448
449   return TRUE;
450 }
451
452 static gboolean
453 gst_app_src_stop (GstBaseSrc * bsrc)
454 {
455   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
456
457   g_mutex_lock (appsrc->mutex);
458   GST_DEBUG_OBJECT (appsrc, "stopping");
459   appsrc->is_eos = FALSE;
460   appsrc->flushing = TRUE;
461   appsrc->started = FALSE;
462   gst_app_src_flush_queued (appsrc);
463   g_mutex_unlock (appsrc->mutex);
464
465   return TRUE;
466 }
467
468 static gboolean
469 gst_app_src_is_seekable (GstBaseSrc * src)
470 {
471   GstAppSrc *appsrc = GST_APP_SRC (src);
472   gboolean res = FALSE;
473
474   switch (appsrc->stream_type) {
475     case GST_APP_STREAM_TYPE_STREAM:
476       break;
477     case GST_APP_STREAM_TYPE_SEEKABLE:
478     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
479       res = TRUE;
480       break;
481   }
482   return res;
483 }
484
485 static gboolean
486 gst_app_src_check_get_range (GstBaseSrc * src)
487 {
488   GstAppSrc *appsrc = GST_APP_SRC (src);
489   gboolean res = FALSE;
490
491   switch (appsrc->stream_type) {
492     case GST_APP_STREAM_TYPE_STREAM:
493     case GST_APP_STREAM_TYPE_SEEKABLE:
494       break;
495     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
496       res = TRUE;
497       break;
498   }
499   return res;
500 }
501
502 static gboolean
503 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
504 {
505   GstAppSrc *appsrc = GST_APP_SRC (src);
506
507   *size = gst_app_src_get_size (appsrc);
508
509   return TRUE;
510 }
511
512 /* will be called in push mode */
513 static gboolean
514 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
515 {
516   GstAppSrc *appsrc = GST_APP_SRC (src);
517   gint64 desired_position;
518   gboolean res = FALSE;
519
520   desired_position = segment->last_stop;
521
522   GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
523       desired_position, gst_format_get_name (segment->format));
524
525   /* no need to try to seek in streaming mode */
526   if (appsrc->stream_type == GST_APP_STREAM_TYPE_STREAM)
527     return TRUE;
528
529   g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
530       desired_position, &res);
531
532   if (res) {
533     GST_DEBUG_OBJECT (appsrc, "flushing queue");
534     gst_app_src_flush_queued (appsrc);
535   } else {
536     GST_WARNING_OBJECT (appsrc, "seek failed");
537   }
538
539   return res;
540 }
541
542 static GstFlowReturn
543 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
544     GstBuffer ** buf)
545 {
546   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
547   GstFlowReturn ret;
548
549   g_mutex_lock (appsrc->mutex);
550   /* check flushing first */
551   if (G_UNLIKELY (appsrc->flushing))
552     goto flushing;
553
554   if (appsrc->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
555     /* if we are dealing with a random-access stream, issue a seek if the offset
556      * changed. */
557     if (G_UNLIKELY (appsrc->offset != offset)) {
558       gboolean res;
559
560       g_mutex_unlock (appsrc->mutex);
561
562       GST_DEBUG_OBJECT (appsrc,
563           "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
564           appsrc->offset, offset);
565
566       g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
567           offset, &res);
568
569       if (G_UNLIKELY (!res))
570         /* failing to seek is fatal */
571         goto seek_error;
572
573       g_mutex_lock (appsrc->mutex);
574
575       appsrc->offset = offset;
576     }
577   }
578
579   while (TRUE) {
580     /* return data as long as we have some */
581     if (!g_queue_is_empty (appsrc->queue)) {
582       guint buf_size;
583
584       *buf = g_queue_pop_head (appsrc->queue);
585       buf_size = GST_BUFFER_SIZE (*buf);
586
587       GST_DEBUG_OBJECT (appsrc, "we have buffer %p of size %u", *buf, buf_size);
588
589       appsrc->queued_bytes -= buf_size;
590
591       /* only update the offset when in random_access mode */
592       if (appsrc->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
593         appsrc->offset += buf_size;
594       }
595
596       gst_buffer_set_caps (*buf, appsrc->caps);
597
598       /* signal that we removed an item */
599       g_cond_broadcast (appsrc->cond);
600
601       ret = GST_FLOW_OK;
602       break;
603     } else {
604       g_mutex_unlock (appsrc->mutex);
605
606       /* we have no data, we need some. We fire the signal with the size hint. */
607       g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
608           NULL);
609
610       g_mutex_lock (appsrc->mutex);
611       /* we can be flushing now because we released the lock */
612       if (G_UNLIKELY (appsrc->flushing))
613         goto flushing;
614
615       /* if we have a buffer now, continue the loop and try to return it. In
616        * random-access mode (where a buffer is normally pushed in the above
617        * signal) we can still be empty because the pushed buffer got flushed or
618        * when the application pushes the requested buffer later, we support both
619        * possiblities. */
620       if (!g_queue_is_empty (appsrc->queue))
621         continue;
622
623       /* no buffer yet, maybe we are EOS, if not, block for more data. */
624     }
625
626     /* check EOS */
627     if (G_UNLIKELY (appsrc->is_eos))
628       goto eos;
629
630     /* nothing to return, wait a while for new data or flushing. */
631     g_cond_wait (appsrc->cond, appsrc->mutex);
632   }
633   g_mutex_unlock (appsrc->mutex);
634
635   return ret;
636
637   /* ERRORS */
638 flushing:
639   {
640     GST_DEBUG_OBJECT (appsrc, "we are flushing");
641     g_mutex_unlock (appsrc->mutex);
642     return GST_FLOW_WRONG_STATE;
643   }
644 eos:
645   {
646     GST_DEBUG_OBJECT (appsrc, "we are EOS");
647     g_mutex_unlock (appsrc->mutex);
648     return GST_FLOW_UNEXPECTED;
649   }
650 seek_error:
651   {
652     GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
653         GST_ERROR_SYSTEM);
654     return GST_FLOW_ERROR;
655   }
656 }
657
658 /* external API */
659
660 /**
661  * gst_app_src_set_caps:
662  * @appsrc: a #GstAppSrc
663  * @caps: caps to set
664  *
665  * Set the capabilities on the appsrc element.  This function takes
666  * a copy of the caps structure. After calling this method, the source will
667  * only produce caps that match @caps. @caps must be fixed and the caps on the
668  * buffers must match the caps or left NULL.
669  */
670 void
671 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
672 {
673   GstCaps *old;
674
675   g_return_if_fail (GST_IS_APP_SRC (appsrc));
676
677   GST_OBJECT_LOCK (appsrc);
678   GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
679   if ((old = appsrc->caps) != caps) {
680     if (caps)
681       appsrc->caps = gst_caps_copy (caps);
682     else
683       appsrc->caps = NULL;
684     if (old)
685       gst_caps_unref (old);
686   }
687   GST_OBJECT_UNLOCK (appsrc);
688 }
689
690 /**
691  * gst_app_src_get_caps:
692  * @appsrc: a #GstAppSrc
693  *
694  * Get the configured caps on @appsrc.
695  *
696  * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
697  */
698 GstCaps *
699 gst_app_src_get_caps (GstAppSrc * appsrc)
700 {
701   GstCaps *caps;
702
703   g_return_val_if_fail (appsrc != NULL, NULL);
704   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
705
706   GST_OBJECT_LOCK (appsrc);
707   if ((caps = appsrc->caps))
708     gst_caps_ref (caps);
709   GST_DEBUG_OBJECT (appsrc, "getting caps of %" GST_PTR_FORMAT, caps);
710   GST_OBJECT_UNLOCK (appsrc);
711
712   return caps;
713 }
714
715 /**
716  * gst_app_src_set_size:
717  * @appsrc: a #GstAppSrc
718  * @size: the size to set
719  *
720  * Set the size of the stream in bytes. A value of -1 means that the size is
721  * not known. 
722  */
723 void
724 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
725 {
726   g_return_if_fail (appsrc != NULL);
727   g_return_if_fail (GST_IS_APP_SRC (appsrc));
728
729   GST_OBJECT_LOCK (appsrc);
730   GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
731   appsrc->size = size;
732   GST_OBJECT_UNLOCK (appsrc);
733 }
734
735 /**
736  * gst_app_src_get_size:
737  * @appsrc: a #GstAppSrc
738  *
739  * Get the size of the stream in bytes. A value of -1 means that the size is
740  * not known. 
741  *
742  * Returns: the size of the stream previously set with gst_app_src_set_size();
743  */
744 gint64
745 gst_app_src_get_size (GstAppSrc * appsrc)
746 {
747   gint64 size;
748
749   g_return_val_if_fail (appsrc != NULL, -1);
750   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
751
752   GST_OBJECT_LOCK (appsrc);
753   size = appsrc->size;
754   GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
755   GST_OBJECT_UNLOCK (appsrc);
756
757   return size;
758 }
759
760 /**
761  * gst_app_src_set_stream_type:
762  * @appsrc: a #GstAppSrc
763  * @type: the new state
764  *
765  * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
766  * be connected to.
767  *
768  * A stream_type stream 
769  */
770 void
771 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
772 {
773   g_return_if_fail (appsrc != NULL);
774   g_return_if_fail (GST_IS_APP_SRC (appsrc));
775
776   GST_OBJECT_LOCK (appsrc);
777   GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
778   appsrc->stream_type = type;
779   GST_OBJECT_UNLOCK (appsrc);
780 }
781
782 /**
783  * gst_app_src_get_stream_type:
784  * @appsrc: a #GstAppSrc
785  *
786  * Get the stream type. Control the stream type of @appsrc
787  * with gst_app_src_set_stream_type().
788  *
789  * Returns: the stream type.
790  */
791 GstAppStreamType
792 gst_app_src_get_stream_type (GstAppSrc * appsrc)
793 {
794   gboolean stream_type;
795
796   g_return_val_if_fail (appsrc != NULL, FALSE);
797   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
798
799   GST_OBJECT_LOCK (appsrc);
800   stream_type = appsrc->stream_type;
801   GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
802   GST_OBJECT_UNLOCK (appsrc);
803
804   return stream_type;
805 }
806
807 /**
808  * gst_app_src_set_max_bytes:
809  * @appsrc: a #GstAppSrc
810  * @max: the maximum number of bytes to queue
811  *
812  * Set the maximum amount of bytes that can be queued in @appsrc.
813  * After the maximum amount of bytes are queued, @appsrc will emit the
814  * "enough-data" signal.
815  */
816 void
817 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
818 {
819   g_return_if_fail (GST_IS_APP_SRC (appsrc));
820
821   g_mutex_lock (appsrc->mutex);
822   if (max != appsrc->max_bytes) {
823     GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
824     appsrc->max_bytes = max;
825     /* signal the change */
826     g_cond_broadcast (appsrc->cond);
827   }
828   g_mutex_unlock (appsrc->mutex);
829 }
830
831 /**
832  * gst_app_src_get_max_bytes:
833  * @appsrc: a #GstAppSrc
834  *
835  * Get the maximum amount of bytes that can be queued in @appsrc.
836  *
837  * Returns: The maximum amount of bytes that can be queued.
838  */
839 guint64
840 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
841 {
842   guint64 result;
843
844   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
845
846   g_mutex_lock (appsrc->mutex);
847   result = appsrc->max_bytes;
848   GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
849   g_mutex_unlock (appsrc->mutex);
850
851   return result;
852 }
853
854 /**
855  * gst_app_src_push_buffer:
856  * @appsrc: a #GstAppSrc
857  * @buffer: a #GstBuffer to push
858  *
859  * Adds a buffer to the queue of buffers that the appsrc element will
860  * push to its source pad.  This function takes ownership of the buffer.
861  *
862  * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
863  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
864  * #GST_FLOW_UNEXPECTED when EOS occured.
865  */
866 GstFlowReturn
867 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
868 {
869   gboolean first = TRUE;
870
871   g_return_val_if_fail (appsrc, GST_FLOW_ERROR);
872   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
873   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
874
875   g_mutex_lock (appsrc->mutex);
876
877   while (TRUE) {
878     /* can't accept buffers when we are flushing or EOS */
879     if (appsrc->flushing)
880       goto flushing;
881
882     if (appsrc->is_eos)
883       goto eos;
884
885     if (appsrc->queued_bytes >= appsrc->max_bytes) {
886       GST_DEBUG_OBJECT (appsrc, "queue filled (%" G_GUINT64_FORMAT " >= %"
887           G_GUINT64_FORMAT ")", appsrc->queued_bytes, appsrc->max_bytes);
888
889       if (first) {
890         /* only signal on the first push */
891         g_mutex_unlock (appsrc->mutex);
892
893         g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
894             NULL);
895
896         g_mutex_lock (appsrc->mutex);
897         /* continue to check for flushing/eos after releasing the lock */
898         first = FALSE;
899         continue;
900       }
901       if (appsrc->block) {
902         GST_DEBUG_OBJECT (appsrc, "waiting for free space");
903         /* we are filled, wait until a buffer gets popped or when we
904          * flush. */
905         g_cond_wait (appsrc->cond, appsrc->mutex);
906       } else {
907         /* no need to wait for free space, we just pump data into the queue */
908         break;
909       }
910     } else
911       break;
912   }
913
914   GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
915   g_queue_push_tail (appsrc->queue, buffer);
916   appsrc->queued_bytes += GST_BUFFER_SIZE (buffer);
917   g_cond_broadcast (appsrc->cond);
918   g_mutex_unlock (appsrc->mutex);
919
920   return GST_FLOW_OK;
921
922   /* ERRORS */
923 flushing:
924   {
925     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
926     gst_buffer_unref (buffer);
927     return GST_FLOW_WRONG_STATE;
928   }
929 eos:
930   {
931     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
932     gst_buffer_unref (buffer);
933     return GST_FLOW_UNEXPECTED;
934   }
935 }
936
937 /**
938  * gst_app_src_end_of_stream:
939  * @appsrc: a #GstAppSrc
940  *
941  * Indicates to the appsrc element that the last buffer queued in the
942  * element is the last buffer of the stream.
943  *
944  * Returns: #GST_FLOW_OK when the EOS was successfuly queued.
945  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
946  */
947 GstFlowReturn
948 gst_app_src_end_of_stream (GstAppSrc * appsrc)
949 {
950   g_return_val_if_fail (appsrc, GST_FLOW_ERROR);
951   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
952
953   g_mutex_lock (appsrc->mutex);
954   /* can't accept buffers when we are flushing. We can accept them when we are 
955    * EOS although it will not do anything. */
956   if (appsrc->flushing)
957     goto flushing;
958
959   GST_DEBUG_OBJECT (appsrc, "sending EOS");
960   appsrc->is_eos = TRUE;
961   g_cond_broadcast (appsrc->cond);
962   g_mutex_unlock (appsrc->mutex);
963
964   return GST_FLOW_OK;
965
966   /* ERRORS */
967 flushing:
968   {
969     GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
970     return GST_FLOW_WRONG_STATE;
971   }
972 }
973
974 /*** GSTURIHANDLER INTERFACE *************************************************/
975
976 static GstURIType
977 gst_app_src_uri_get_type (void)
978 {
979   return GST_URI_SRC;
980 }
981
982 static gchar **
983 gst_app_src_uri_get_protocols (void)
984 {
985   static gchar *protocols[] = { "appsrc", NULL };
986
987   return protocols;
988 }
989 static const gchar *
990 gst_app_src_uri_get_uri (GstURIHandler * handler)
991 {
992   return "appsrc";
993 }
994
995 static gboolean
996 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
997 {
998   gchar *protocol;
999   gboolean ret;
1000
1001   protocol = gst_uri_get_protocol (uri);
1002   ret = !strcmp (protocol, "appsrc");
1003   g_free (protocol);
1004
1005   return ret;
1006 }
1007
1008 static void
1009 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1010 {
1011   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1012
1013   iface->get_type = gst_app_src_uri_get_type;
1014   iface->get_protocols = gst_app_src_uri_get_protocols;
1015   iface->get_uri = gst_app_src_uri_get_uri;
1016   iface->set_uri = gst_app_src_uri_set_uri;
1017 }