25c95ebf08843ffda74d6ac7cb3bdd43152a404a
[platform/upstream/gst-plugins-base.git] / gst-libs / gst / app / gstappsink.c
1 /* GStreamer
2  * Copyright (C) 2007 David Schleef <ds@schleef.org>
3  *           (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20 /**
21  * SECTION:gstappsink
22  * @short_description: Easy way for applications to extract samples from a
23  *     pipeline
24  * @see_also: #GstSample, #GstBaseSink, appsrc
25  *
26  * Appsink is a sink plugin that supports many different methods for making
27  * the application get a handle on the GStreamer data in a pipeline. Unlike
28  * most GStreamer elements, Appsink provides external API functions.
29  *
30  * appsink can be used by linking to the gstappsink.h header file to access the
31  * methods or by using the appsink action signals and properties.
32  *
33  * The normal way of retrieving samples from appsink is by using the
34  * gst_app_sink_pull_sample() and gst_app_sink_pull_preroll() methods.
35  * These methods block until a sample becomes available in the sink or when the
36  * sink is shut down or reaches EOS.
37  *
38  * Appsink will internally use a queue to collect buffers from the streaming
39  * thread. If the application is not pulling samples fast enough, this queue
40  * will consume a lot of memory over time. The "max-buffers" property can be
41  * used to limit the queue size. The "drop" property controls whether the
42  * streaming thread blocks or if older buffers are dropped when the maximum
43  * queue size is reached. Note that blocking the streaming thread can negatively
44  * affect real-time performance and should be avoided.
45  *
46  * If a blocking behaviour is not desirable, setting the "emit-signals" property
47  * to %TRUE will make appsink emit the "new-sample" and "new-preroll" signals
48  * when a sample can be pulled without blocking.
49  *
50  * The "caps" property on appsink can be used to control the formats that
51  * appsink can receive. This property can contain non-fixed caps, the format of
52  * the pulled samples can be obtained by getting the sample caps.
53  *
54  * If one of the pull-preroll or pull-sample methods return %NULL, the appsink
55  * is stopped or in the EOS state. You can check for the EOS state with the
56  * "eos" property or with the gst_app_sink_is_eos() method.
57  *
58  * The eos signal can also be used to be informed when the EOS state is reached
59  * to avoid polling.
60  *
61  * Last reviewed on 2008-12-17 (0.10.22)
62  *
63  * Since: 0.10.22
64  */
65
66 #ifdef HAVE_CONFIG_H
67 #include "config.h"
68 #endif
69
70 #include <gst/gst.h>
71 #include <gst/base/gstbasesink.h>
72 #include <gst/gstbuffer.h>
73
74 #include <string.h>
75
76 #include "gstapp-marshal.h"
77 #include "gstappsink.h"
78
79 #include "gst/glib-compat-private.h"
80
81 struct _GstAppSinkPrivate
82 {
83   GstCaps *caps;
84   gboolean emit_signals;
85   guint num_buffers;
86   guint max_buffers;
87   gboolean drop;
88
89   GCond *cond;
90   GMutex *mutex;
91   GQueue *queue;
92   GstBuffer *preroll;
93   GstCaps *preroll_caps;
94   GstCaps *last_caps;
95   GstSegment last_segment;
96   gboolean flushing;
97   gboolean unlock;
98   gboolean started;
99   gboolean is_eos;
100
101   GstAppSinkCallbacks callbacks;
102   gpointer user_data;
103   GDestroyNotify notify;
104 };
105
106 GST_DEBUG_CATEGORY_STATIC (app_sink_debug);
107 #define GST_CAT_DEFAULT app_sink_debug
108
109 enum
110 {
111   /* signals */
112   SIGNAL_EOS,
113   SIGNAL_NEW_PREROLL,
114   SIGNAL_NEW_SAMPLE,
115
116   /* actions */
117   SIGNAL_PULL_PREROLL,
118   SIGNAL_PULL_SAMPLE,
119
120   LAST_SIGNAL
121 };
122
123 #define DEFAULT_PROP_EOS                TRUE
124 #define DEFAULT_PROP_EMIT_SIGNALS       FALSE
125 #define DEFAULT_PROP_MAX_BUFFERS        0
126 #define DEFAULT_PROP_DROP               FALSE
127
128 enum
129 {
130   PROP_0,
131   PROP_CAPS,
132   PROP_EOS,
133   PROP_EMIT_SIGNALS,
134   PROP_MAX_BUFFERS,
135   PROP_DROP,
136   PROP_LAST
137 };
138
139 static GstStaticPadTemplate gst_app_sink_template =
140 GST_STATIC_PAD_TEMPLATE ("sink",
141     GST_PAD_SINK,
142     GST_PAD_ALWAYS,
143     GST_STATIC_CAPS_ANY);
144
145 static void gst_app_sink_uri_handler_init (gpointer g_iface,
146     gpointer iface_data);
147
148 static void gst_app_sink_dispose (GObject * object);
149 static void gst_app_sink_finalize (GObject * object);
150
151 static void gst_app_sink_set_property (GObject * object, guint prop_id,
152     const GValue * value, GParamSpec * pspec);
153 static void gst_app_sink_get_property (GObject * object, guint prop_id,
154     GValue * value, GParamSpec * pspec);
155
156 static gboolean gst_app_sink_unlock_start (GstBaseSink * bsink);
157 static gboolean gst_app_sink_unlock_stop (GstBaseSink * bsink);
158 static gboolean gst_app_sink_start (GstBaseSink * psink);
159 static gboolean gst_app_sink_stop (GstBaseSink * psink);
160 static gboolean gst_app_sink_event (GstBaseSink * sink, GstEvent * event);
161 static gboolean gst_app_sink_query (GstBaseSink * bsink, GstQuery * query);
162 static GstFlowReturn gst_app_sink_preroll (GstBaseSink * psink,
163     GstBuffer * buffer);
164 static GstFlowReturn gst_app_sink_render (GstBaseSink * psink,
165     GstBuffer * buffer);
166 static gboolean gst_app_sink_setcaps (GstBaseSink * sink, GstCaps * caps);
167 static GstCaps *gst_app_sink_getcaps (GstBaseSink * psink, GstCaps * filter);
168
169 static guint gst_app_sink_signals[LAST_SIGNAL] = { 0 };
170
171 #define gst_app_sink_parent_class parent_class
172 G_DEFINE_TYPE_WITH_CODE (GstAppSink, gst_app_sink, GST_TYPE_BASE_SINK,
173     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
174         gst_app_sink_uri_handler_init));
175
176 static void
177 gst_app_sink_class_init (GstAppSinkClass * klass)
178 {
179   GObjectClass *gobject_class = (GObjectClass *) klass;
180   GstElementClass *element_class = (GstElementClass *) klass;
181   GstBaseSinkClass *basesink_class = (GstBaseSinkClass *) klass;
182
183   GST_DEBUG_CATEGORY_INIT (app_sink_debug, "appsink", 0, "appsink element");
184
185   gobject_class->dispose = gst_app_sink_dispose;
186   gobject_class->finalize = gst_app_sink_finalize;
187
188   gobject_class->set_property = gst_app_sink_set_property;
189   gobject_class->get_property = gst_app_sink_get_property;
190
191   g_object_class_install_property (gobject_class, PROP_CAPS,
192       g_param_spec_boxed ("caps", "Caps",
193           "The allowed caps for the sink pad", GST_TYPE_CAPS,
194           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
195
196   g_object_class_install_property (gobject_class, PROP_EOS,
197       g_param_spec_boolean ("eos", "EOS",
198           "Check if the sink is EOS or not started", DEFAULT_PROP_EOS,
199           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
200
201   g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
202       g_param_spec_boolean ("emit-signals", "Emit signals",
203           "Emit new-preroll, new-buffer and new-buffer-list signals",
204           DEFAULT_PROP_EMIT_SIGNALS,
205           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206
207   g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS,
208       g_param_spec_uint ("max-buffers", "Max Buffers",
209           "The maximum number of buffers to queue internally (0 = unlimited)",
210           0, G_MAXUINT, DEFAULT_PROP_MAX_BUFFERS,
211           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
212
213   g_object_class_install_property (gobject_class, PROP_DROP,
214       g_param_spec_boolean ("drop", "Drop",
215           "Drop old buffers when the buffer queue is filled", DEFAULT_PROP_DROP,
216           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
217
218   /**
219    * GstAppSink::eos:
220    * @appsink: the appsink element that emitted the signal
221    *
222    * Signal that the end-of-stream has been reached. This signal is emitted from
223    * the steaming thread.
224    */
225   gst_app_sink_signals[SIGNAL_EOS] =
226       g_signal_new ("eos", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
227       G_STRUCT_OFFSET (GstAppSinkClass, eos),
228       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
229   /**
230    * GstAppSink::new-preroll:
231    * @appsink: the appsink element that emitted the signal
232    *
233    * Signal that a new preroll sample is available.
234    *
235    * This signal is emitted from the steaming thread and only when the
236    * "emit-signals" property is %TRUE.
237    *
238    * The new preroll sample can be retrieved with the "pull-preroll" action
239    * signal or gst_app_sink_pull_preroll() either from this signal callback
240    * or from any other thread.
241    *
242    * Note that this signal is only emitted when the "emit-signals" property is
243    * set to %TRUE, which it is not by default for performance reasons.
244    */
245   gst_app_sink_signals[SIGNAL_NEW_PREROLL] =
246       g_signal_new ("new-preroll", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
247       G_STRUCT_OFFSET (GstAppSinkClass, new_preroll),
248       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
249   /**
250    * GstAppSink::new-sample:
251    * @appsink: the appsink element that emited the signal
252    *
253    * Signal that a new sample is available.
254    *
255    * This signal is emitted from the steaming thread and only when the
256    * "emit-signals" property is %TRUE.
257    *
258    * The new sample can be retrieved with the "pull-sample" action
259    * signal or gst_app_sink_pull_sample() either from this signal callback
260    * or from any other thread.
261    *
262    * Note that this signal is only emitted when the "emit-signals" property is
263    * set to %TRUE, which it is not by default for performance reasons.
264    */
265   gst_app_sink_signals[SIGNAL_NEW_SAMPLE] =
266       g_signal_new ("new-sample", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
267       G_STRUCT_OFFSET (GstAppSinkClass, new_sample),
268       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
269
270   /**
271    * GstAppSink::pull-preroll:
272    * @appsink: the appsink element to emit this signal on
273    *
274    * Get the last preroll sample in @appsink. This was the sample that caused the
275    * appsink to preroll in the PAUSED state. This sample can be pulled many times
276    * and remains available to the application even after EOS.
277    *
278    * This function is typically used when dealing with a pipeline in the PAUSED
279    * state. Calling this function after doing a seek will give the sample right
280    * after the seek position.
281    *
282    * Note that the preroll sample will also be returned as the first sample
283    * when calling gst_app_sink_pull_sample() or the "pull-sample" action signal.
284    *
285    * If an EOS event was received before any buffers, this function returns
286    * %NULL. Use gst_app_sink_is_eos () to check for the EOS condition.
287    *
288    * This function blocks until a preroll sample or EOS is received or the appsink
289    * element is set to the READY/NULL state.
290    *
291    * Returns: a #GstSample or NULL when the appsink is stopped or EOS.
292    */
293   gst_app_sink_signals[SIGNAL_PULL_PREROLL] =
294       g_signal_new ("pull-preroll", G_TYPE_FROM_CLASS (klass),
295       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSinkClass,
296           pull_preroll), NULL, NULL, __gst_app_marshal_BOXED__VOID,
297       GST_TYPE_SAMPLE, 0, G_TYPE_NONE);
298   /**
299    * GstAppSink::pull-sample:
300    * @appsink: the appsink element to emit this signal on
301    *
302    * This function blocks until a sample or EOS becomes available or the appsink
303    * element is set to the READY/NULL state.
304    *
305    * This function will only return samples when the appsink is in the PLAYING
306    * state. All rendered samples will be put in a queue so that the application
307    * can pull samples at its own rate.
308    *
309    * Note that when the application does not pull samples fast enough, the
310    * queued samples could consume a lot of memory, especially when dealing with
311    * raw video frames. It's possible to control the behaviour of the queue with
312    * the "drop" and "max-buffers" properties.
313    *
314    * If an EOS event was received before any buffers, this function returns
315    * %NULL. Use gst_app_sink_is_eos () to check for the EOS condition.
316    *
317    * Returns: a #GstSample or NULL when the appsink is stopped or EOS.
318    */
319   gst_app_sink_signals[SIGNAL_PULL_SAMPLE] =
320       g_signal_new ("pull-sample", G_TYPE_FROM_CLASS (klass),
321       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSinkClass,
322           pull_sample), NULL, NULL, __gst_app_marshal_BOXED__VOID,
323       GST_TYPE_SAMPLE, 0, G_TYPE_NONE);
324
325   gst_element_class_set_details_simple (element_class, "AppSink",
326       "Generic/Sink", "Allow the application to get access to raw buffer",
327       "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
328
329   gst_element_class_add_pad_template (element_class,
330       gst_static_pad_template_get (&gst_app_sink_template));
331
332   basesink_class->unlock = gst_app_sink_unlock_start;
333   basesink_class->unlock_stop = gst_app_sink_unlock_stop;
334   basesink_class->start = gst_app_sink_start;
335   basesink_class->stop = gst_app_sink_stop;
336   basesink_class->event = gst_app_sink_event;
337   basesink_class->preroll = gst_app_sink_preroll;
338   basesink_class->render = gst_app_sink_render;
339   basesink_class->get_caps = gst_app_sink_getcaps;
340   basesink_class->set_caps = gst_app_sink_setcaps;
341   basesink_class->query = gst_app_sink_query;
342
343   klass->pull_preroll = gst_app_sink_pull_preroll;
344   klass->pull_sample = gst_app_sink_pull_sample;
345
346   g_type_class_add_private (klass, sizeof (GstAppSinkPrivate));
347 }
348
349 static void
350 gst_app_sink_init (GstAppSink * appsink)
351 {
352   GstAppSinkPrivate *priv;
353
354   priv = appsink->priv =
355       G_TYPE_INSTANCE_GET_PRIVATE (appsink, GST_TYPE_APP_SINK,
356       GstAppSinkPrivate);
357
358   priv->mutex = g_mutex_new ();
359   priv->cond = g_cond_new ();
360   priv->queue = g_queue_new ();
361
362   priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
363   priv->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
364   priv->drop = DEFAULT_PROP_DROP;
365 }
366
367 static void
368 gst_app_sink_dispose (GObject * obj)
369 {
370   GstAppSink *appsink = GST_APP_SINK_CAST (obj);
371   GstAppSinkPrivate *priv = appsink->priv;
372   GstMiniObject *queue_obj;
373
374   GST_OBJECT_LOCK (appsink);
375   if (priv->caps) {
376     gst_caps_unref (priv->caps);
377     priv->caps = NULL;
378   }
379   if (priv->notify) {
380     priv->notify (priv->user_data);
381   }
382   priv->user_data = NULL;
383   priv->notify = NULL;
384
385   GST_OBJECT_UNLOCK (appsink);
386
387   g_mutex_lock (priv->mutex);
388   while ((queue_obj = g_queue_pop_head (priv->queue)))
389     gst_mini_object_unref (queue_obj);
390   gst_buffer_replace (&priv->preroll, NULL);
391   gst_caps_replace (&priv->preroll_caps, NULL);
392   gst_caps_replace (&priv->last_caps, NULL);
393   g_mutex_unlock (priv->mutex);
394
395   G_OBJECT_CLASS (parent_class)->dispose (obj);
396 }
397
398 static void
399 gst_app_sink_finalize (GObject * obj)
400 {
401   GstAppSink *appsink = GST_APP_SINK_CAST (obj);
402   GstAppSinkPrivate *priv = appsink->priv;
403
404   g_mutex_free (priv->mutex);
405   g_cond_free (priv->cond);
406   g_queue_free (priv->queue);
407
408   G_OBJECT_CLASS (parent_class)->finalize (obj);
409 }
410
411 static void
412 gst_app_sink_set_property (GObject * object, guint prop_id,
413     const GValue * value, GParamSpec * pspec)
414 {
415   GstAppSink *appsink = GST_APP_SINK_CAST (object);
416
417   switch (prop_id) {
418     case PROP_CAPS:
419       gst_app_sink_set_caps (appsink, gst_value_get_caps (value));
420       break;
421     case PROP_EMIT_SIGNALS:
422       gst_app_sink_set_emit_signals (appsink, g_value_get_boolean (value));
423       break;
424     case PROP_MAX_BUFFERS:
425       gst_app_sink_set_max_buffers (appsink, g_value_get_uint (value));
426       break;
427     case PROP_DROP:
428       gst_app_sink_set_drop (appsink, g_value_get_boolean (value));
429       break;
430     default:
431       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
432       break;
433   }
434 }
435
436 static void
437 gst_app_sink_get_property (GObject * object, guint prop_id, GValue * value,
438     GParamSpec * pspec)
439 {
440   GstAppSink *appsink = GST_APP_SINK_CAST (object);
441
442   switch (prop_id) {
443     case PROP_CAPS:
444     {
445       GstCaps *caps;
446
447       caps = gst_app_sink_get_caps (appsink);
448       gst_value_set_caps (value, caps);
449       if (caps)
450         gst_caps_unref (caps);
451       break;
452     }
453     case PROP_EOS:
454       g_value_set_boolean (value, gst_app_sink_is_eos (appsink));
455       break;
456     case PROP_EMIT_SIGNALS:
457       g_value_set_boolean (value, gst_app_sink_get_emit_signals (appsink));
458       break;
459     case PROP_MAX_BUFFERS:
460       g_value_set_uint (value, gst_app_sink_get_max_buffers (appsink));
461       break;
462     case PROP_DROP:
463       g_value_set_boolean (value, gst_app_sink_get_drop (appsink));
464       break;
465     default:
466       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
467       break;
468   }
469 }
470
471 static gboolean
472 gst_app_sink_unlock_start (GstBaseSink * bsink)
473 {
474   GstAppSink *appsink = GST_APP_SINK_CAST (bsink);
475   GstAppSinkPrivate *priv = appsink->priv;
476
477   g_mutex_lock (priv->mutex);
478   GST_DEBUG_OBJECT (appsink, "unlock start");
479   priv->unlock = TRUE;
480   g_cond_signal (priv->cond);
481   g_mutex_unlock (priv->mutex);
482
483   return TRUE;
484 }
485
486 static gboolean
487 gst_app_sink_unlock_stop (GstBaseSink * bsink)
488 {
489   GstAppSink *appsink = GST_APP_SINK_CAST (bsink);
490   GstAppSinkPrivate *priv = appsink->priv;
491
492   g_mutex_lock (priv->mutex);
493   GST_DEBUG_OBJECT (appsink, "unlock stop");
494   priv->unlock = FALSE;
495   g_cond_signal (priv->cond);
496   g_mutex_unlock (priv->mutex);
497
498   return TRUE;
499 }
500
501 static void
502 gst_app_sink_flush_unlocked (GstAppSink * appsink)
503 {
504   GstMiniObject *obj;
505   GstAppSinkPrivate *priv = appsink->priv;
506
507   GST_DEBUG_OBJECT (appsink, "flush stop appsink");
508   priv->is_eos = FALSE;
509   gst_buffer_replace (&priv->preroll, NULL);
510   while ((obj = g_queue_pop_head (priv->queue)))
511     gst_mini_object_unref (obj);
512   priv->num_buffers = 0;
513   g_cond_signal (priv->cond);
514 }
515
516 static gboolean
517 gst_app_sink_start (GstBaseSink * psink)
518 {
519   GstAppSink *appsink = GST_APP_SINK_CAST (psink);
520   GstAppSinkPrivate *priv = appsink->priv;
521
522   g_mutex_lock (priv->mutex);
523   GST_DEBUG_OBJECT (appsink, "starting");
524   priv->flushing = FALSE;
525   priv->started = TRUE;
526   gst_segment_init (&priv->last_segment, GST_FORMAT_TIME);
527   g_mutex_unlock (priv->mutex);
528
529   return TRUE;
530 }
531
532 static gboolean
533 gst_app_sink_stop (GstBaseSink * psink)
534 {
535   GstAppSink *appsink = GST_APP_SINK_CAST (psink);
536   GstAppSinkPrivate *priv = appsink->priv;
537
538   g_mutex_lock (priv->mutex);
539   GST_DEBUG_OBJECT (appsink, "stopping");
540   priv->flushing = TRUE;
541   priv->started = FALSE;
542   gst_app_sink_flush_unlocked (appsink);
543   gst_caps_replace (&priv->preroll_caps, NULL);
544   gst_caps_replace (&priv->last_caps, NULL);
545   g_mutex_unlock (priv->mutex);
546
547   return TRUE;
548 }
549
550 static gboolean
551 gst_app_sink_setcaps (GstBaseSink * sink, GstCaps * caps)
552 {
553   GstAppSink *appsink = GST_APP_SINK_CAST (sink);
554   GstAppSinkPrivate *priv = appsink->priv;
555
556   g_mutex_lock (priv->mutex);
557   GST_DEBUG_OBJECT (appsink, "receiving CAPS");
558   g_queue_push_tail (priv->queue, gst_event_new_caps (caps));
559   gst_caps_replace (&priv->preroll_caps, caps);
560   g_mutex_unlock (priv->mutex);
561
562   return TRUE;
563 }
564
565 static gboolean
566 gst_app_sink_event (GstBaseSink * sink, GstEvent * event)
567 {
568   GstAppSink *appsink = GST_APP_SINK_CAST (sink);
569   GstAppSinkPrivate *priv = appsink->priv;
570
571   switch (event->type) {
572     case GST_EVENT_SEGMENT:
573       g_mutex_lock (priv->mutex);
574       GST_DEBUG_OBJECT (appsink, "receiving SEGMENT");
575       g_queue_push_tail (priv->queue, gst_event_ref (event));
576       g_mutex_unlock (priv->mutex);
577       break;
578     case GST_EVENT_EOS:
579       g_mutex_lock (priv->mutex);
580       GST_DEBUG_OBJECT (appsink, "receiving EOS");
581       priv->is_eos = TRUE;
582       g_cond_signal (priv->cond);
583       g_mutex_unlock (priv->mutex);
584
585       /* emit EOS now */
586       if (priv->callbacks.eos)
587         priv->callbacks.eos (appsink, priv->user_data);
588       else
589         g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_EOS], 0);
590
591       break;
592     case GST_EVENT_FLUSH_START:
593       /* we don't have to do anything here, the base class will call unlock
594        * which will make sure we exit the _render method */
595       GST_DEBUG_OBJECT (appsink, "received FLUSH_START");
596       break;
597     case GST_EVENT_FLUSH_STOP:
598       g_mutex_lock (priv->mutex);
599       GST_DEBUG_OBJECT (appsink, "received FLUSH_STOP");
600       gst_app_sink_flush_unlocked (appsink);
601       g_mutex_unlock (priv->mutex);
602       break;
603     default:
604       break;
605   }
606   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
607 }
608
609 static GstFlowReturn
610 gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer)
611 {
612   GstFlowReturn res = GST_FLOW_OK;
613   GstAppSink *appsink = GST_APP_SINK_CAST (psink);
614   GstAppSinkPrivate *priv = appsink->priv;
615   gboolean emit;
616
617   g_mutex_lock (priv->mutex);
618   if (priv->flushing)
619     goto flushing;
620
621   GST_DEBUG_OBJECT (appsink, "setting preroll buffer %p", buffer);
622   gst_buffer_replace (&priv->preroll, buffer);
623
624   g_cond_signal (priv->cond);
625   emit = priv->emit_signals;
626   g_mutex_unlock (priv->mutex);
627
628   if (priv->callbacks.new_preroll)
629     res = priv->callbacks.new_preroll (appsink, priv->user_data);
630   else if (emit)
631     g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_NEW_PREROLL], 0);
632
633   return res;
634
635 flushing:
636   {
637     GST_DEBUG_OBJECT (appsink, "we are flushing");
638     g_mutex_unlock (priv->mutex);
639     return GST_FLOW_FLUSHING;
640   }
641 }
642
643 static GstBuffer *
644 dequeue_buffer (GstAppSink * appsink)
645 {
646   GstAppSinkPrivate *priv = appsink->priv;
647   GstBuffer *buffer;
648
649   do {
650     GstMiniObject *obj;
651
652     obj = g_queue_pop_head (priv->queue);
653
654     if (GST_IS_BUFFER (obj)) {
655       buffer = GST_BUFFER_CAST (obj);
656       GST_DEBUG_OBJECT (appsink, "dequeued buffer %p", buffer);
657       priv->num_buffers--;
658       break;
659     } else if (GST_IS_EVENT (obj)) {
660       GstEvent *event = GST_EVENT_CAST (obj);
661
662       switch (GST_EVENT_TYPE (obj)) {
663         case GST_EVENT_CAPS:
664         {
665           GstCaps *caps;
666
667           gst_event_parse_caps (event, &caps);
668           GST_DEBUG_OBJECT (appsink, "activating caps %" GST_PTR_FORMAT, caps);
669           gst_caps_replace (&priv->last_caps, caps);
670           break;
671         }
672         case GST_EVENT_SEGMENT:
673           gst_event_copy_segment (event, &priv->last_segment);
674           GST_DEBUG_OBJECT (appsink, "activated segment %" GST_SEGMENT_FORMAT,
675               &priv->last_segment);
676           break;
677         default:
678           break;
679       }
680       gst_mini_object_unref (obj);
681     }
682   } while (TRUE);
683
684   return buffer;
685 }
686
687 static GstFlowReturn
688 gst_app_sink_render (GstBaseSink * psink, GstBuffer * buffer)
689 {
690   GstFlowReturn ret;
691   GstAppSink *appsink = GST_APP_SINK_CAST (psink);
692   GstAppSinkPrivate *priv = appsink->priv;
693   gboolean emit;
694
695 restart:
696   g_mutex_lock (priv->mutex);
697   if (priv->flushing)
698     goto flushing;
699
700   GST_DEBUG_OBJECT (appsink, "pushing render buffer %p on queue (%d)",
701       buffer, priv->num_buffers);
702
703   while (priv->max_buffers > 0 && priv->num_buffers >= priv->max_buffers) {
704     if (priv->drop) {
705       GstBuffer *old;
706
707       /* we need to drop the oldest buffer and try again */
708       if ((old = dequeue_buffer (appsink))) {
709         GST_DEBUG_OBJECT (appsink, "dropping old buffer %p", old);
710         gst_buffer_unref (old);
711       }
712     } else {
713       GST_DEBUG_OBJECT (appsink, "waiting for free space, length %d >= %d",
714           priv->num_buffers, priv->max_buffers);
715
716       if (priv->unlock) {
717         /* we are asked to unlock, call the wait_preroll method */
718         g_mutex_unlock (priv->mutex);
719         if ((ret = gst_base_sink_wait_preroll (psink)) != GST_FLOW_OK)
720           goto stopping;
721
722         /* we are allowed to continue now */
723         goto restart;
724       }
725
726       /* wait for a buffer to be removed or flush */
727       g_cond_wait (priv->cond, priv->mutex);
728       if (priv->flushing)
729         goto flushing;
730     }
731   }
732   /* we need to ref the buffer when pushing it in the queue */
733   g_queue_push_tail (priv->queue, gst_buffer_ref (buffer));
734   priv->num_buffers++;
735   g_cond_signal (priv->cond);
736   emit = priv->emit_signals;
737   g_mutex_unlock (priv->mutex);
738
739   if (priv->callbacks.new_sample)
740     priv->callbacks.new_sample (appsink, priv->user_data);
741   else if (emit)
742     g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_NEW_SAMPLE], 0);
743
744   return GST_FLOW_OK;
745
746 flushing:
747   {
748     GST_DEBUG_OBJECT (appsink, "we are flushing");
749     g_mutex_unlock (priv->mutex);
750     return GST_FLOW_FLUSHING;
751   }
752 stopping:
753   {
754     GST_DEBUG_OBJECT (appsink, "we are stopping");
755     return ret;
756   }
757 }
758
759 static GstCaps *
760 gst_app_sink_getcaps (GstBaseSink * psink, GstCaps * filter)
761 {
762   GstCaps *caps;
763   GstAppSink *appsink = GST_APP_SINK_CAST (psink);
764   GstAppSinkPrivate *priv = appsink->priv;
765
766   GST_OBJECT_LOCK (appsink);
767   if ((caps = priv->caps)) {
768     if (filter)
769       caps = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
770     else
771       gst_caps_ref (caps);
772   }
773   GST_DEBUG_OBJECT (appsink, "got caps %" GST_PTR_FORMAT, caps);
774   GST_OBJECT_UNLOCK (appsink);
775
776   return caps;
777 }
778
779 static gboolean
780 gst_app_sink_query (GstBaseSink * bsink, GstQuery * query)
781 {
782   gboolean ret;
783
784   switch (GST_QUERY_TYPE (query)) {
785     case GST_QUERY_SEEKING:{
786       GstFormat fmt;
787
788       /* we don't supporting seeking */
789       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
790       gst_query_set_seeking (query, fmt, FALSE, 0, -1);
791       ret = TRUE;
792       break;
793     }
794
795     default:
796       ret = GST_BASE_SINK_CLASS (parent_class)->query (bsink, query);
797       break;
798   }
799
800   return ret;
801 }
802
803 /* external API */
804
805 /**
806  * gst_app_sink_set_caps:
807  * @appsink: a #GstAppSink
808  * @caps: caps to set
809  *
810  * Set the capabilities on the appsink element.  This function takes
811  * a copy of the caps structure. After calling this method, the sink will only
812  * accept caps that match @caps. If @caps is non-fixed, you must check the caps
813  * on the buffers to get the actual used caps.
814  *
815  * Since: 0.10.22
816  */
817 void
818 gst_app_sink_set_caps (GstAppSink * appsink, const GstCaps * caps)
819 {
820   GstCaps *old;
821   GstAppSinkPrivate *priv;
822
823   g_return_if_fail (GST_IS_APP_SINK (appsink));
824
825   priv = appsink->priv;
826
827   GST_OBJECT_LOCK (appsink);
828   GST_DEBUG_OBJECT (appsink, "setting caps to %" GST_PTR_FORMAT, caps);
829   if ((old = priv->caps) != caps) {
830     if (caps)
831       priv->caps = gst_caps_copy (caps);
832     else
833       priv->caps = NULL;
834     if (old)
835       gst_caps_unref (old);
836   }
837   GST_OBJECT_UNLOCK (appsink);
838 }
839
840 /**
841  * gst_app_sink_get_caps:
842  * @appsink: a #GstAppSink
843  *
844  * Get the configured caps on @appsink.
845  *
846  * Returns: the #GstCaps accepted by the sink. gst_caps_unref() after usage.
847  *
848  * Since: 0.10.22
849  */
850 GstCaps *
851 gst_app_sink_get_caps (GstAppSink * appsink)
852 {
853   GstCaps *caps;
854   GstAppSinkPrivate *priv;
855
856   g_return_val_if_fail (GST_IS_APP_SINK (appsink), NULL);
857
858   priv = appsink->priv;
859
860   GST_OBJECT_LOCK (appsink);
861   if ((caps = priv->caps))
862     gst_caps_ref (caps);
863   GST_DEBUG_OBJECT (appsink, "getting caps of %" GST_PTR_FORMAT, caps);
864   GST_OBJECT_UNLOCK (appsink);
865
866   return caps;
867 }
868
869 /**
870  * gst_app_sink_is_eos:
871  * @appsink: a #GstAppSink
872  *
873  * Check if @appsink is EOS, which is when no more samples can be pulled because
874  * an EOS event was received.
875  *
876  * This function also returns %TRUE when the appsink is not in the PAUSED or
877  * PLAYING state.
878  *
879  * Returns: %TRUE if no more samples can be pulled and the appsink is EOS.
880  *
881  * Since: 0.10.22
882  */
883 gboolean
884 gst_app_sink_is_eos (GstAppSink * appsink)
885 {
886   gboolean ret;
887   GstAppSinkPrivate *priv;
888
889   g_return_val_if_fail (GST_IS_APP_SINK (appsink), FALSE);
890
891   priv = appsink->priv;
892
893   g_mutex_lock (priv->mutex);
894   if (!priv->started)
895     goto not_started;
896
897   if (priv->is_eos && priv->num_buffers == 0) {
898     GST_DEBUG_OBJECT (appsink, "we are EOS and the queue is empty");
899     ret = TRUE;
900   } else {
901     GST_DEBUG_OBJECT (appsink, "we are not yet EOS");
902     ret = FALSE;
903   }
904   g_mutex_unlock (priv->mutex);
905
906   return ret;
907
908 not_started:
909   {
910     GST_DEBUG_OBJECT (appsink, "we are stopped, return TRUE");
911     g_mutex_unlock (priv->mutex);
912     return TRUE;
913   }
914 }
915
916 /**
917  * gst_app_sink_set_emit_signals:
918  * @appsink: a #GstAppSink
919  * @emit: the new state
920  *
921  * Make appsink emit the "new-preroll" and "new-sample" signals. This option is
922  * by default disabled because signal emission is expensive and unneeded when
923  * the application prefers to operate in pull mode.
924  *
925  * Since: 0.10.22
926  */
927 void
928 gst_app_sink_set_emit_signals (GstAppSink * appsink, gboolean emit)
929 {
930   GstAppSinkPrivate *priv;
931
932   g_return_if_fail (GST_IS_APP_SINK (appsink));
933
934   priv = appsink->priv;
935
936   g_mutex_lock (priv->mutex);
937   priv->emit_signals = emit;
938   g_mutex_unlock (priv->mutex);
939 }
940
941 /**
942  * gst_app_sink_get_emit_signals:
943  * @appsink: a #GstAppSink
944  *
945  * Check if appsink will emit the "new-preroll" and "new-sample" signals.
946  *
947  * Returns: %TRUE if @appsink is emiting the "new-preroll" and "new-sample"
948  * signals.
949  *
950  * Since: 0.10.22
951  */
952 gboolean
953 gst_app_sink_get_emit_signals (GstAppSink * appsink)
954 {
955   gboolean result;
956   GstAppSinkPrivate *priv;
957
958   g_return_val_if_fail (GST_IS_APP_SINK (appsink), FALSE);
959
960   priv = appsink->priv;
961
962   g_mutex_lock (priv->mutex);
963   result = priv->emit_signals;
964   g_mutex_unlock (priv->mutex);
965
966   return result;
967 }
968
969 /**
970  * gst_app_sink_set_max_buffers:
971  * @appsink: a #GstAppSink
972  * @max: the maximum number of buffers to queue
973  *
974  * Set the maximum amount of buffers that can be queued in @appsink. After this
975  * amount of buffers are queued in appsink, any more buffers will block upstream
976  * elements until a sample is pulled from @appsink.
977  *
978  * Since: 0.10.22
979  */
980 void
981 gst_app_sink_set_max_buffers (GstAppSink * appsink, guint max)
982 {
983   GstAppSinkPrivate *priv;
984
985   g_return_if_fail (GST_IS_APP_SINK (appsink));
986
987   priv = appsink->priv;
988
989   g_mutex_lock (priv->mutex);
990   if (max != priv->max_buffers) {
991     priv->max_buffers = max;
992     /* signal the change */
993     g_cond_signal (priv->cond);
994   }
995   g_mutex_unlock (priv->mutex);
996 }
997
998 /**
999  * gst_app_sink_get_max_buffers:
1000  * @appsink: a #GstAppSink
1001  *
1002  * Get the maximum amount of buffers that can be queued in @appsink.
1003  *
1004  * Returns: The maximum amount of buffers that can be queued.
1005  *
1006  * Since: 0.10.22
1007  */
1008 guint
1009 gst_app_sink_get_max_buffers (GstAppSink * appsink)
1010 {
1011   guint result;
1012   GstAppSinkPrivate *priv;
1013
1014   g_return_val_if_fail (GST_IS_APP_SINK (appsink), 0);
1015
1016   priv = appsink->priv;
1017
1018   g_mutex_lock (priv->mutex);
1019   result = priv->max_buffers;
1020   g_mutex_unlock (priv->mutex);
1021
1022   return result;
1023 }
1024
1025 /**
1026  * gst_app_sink_set_drop:
1027  * @appsink: a #GstAppSink
1028  * @drop: the new state
1029  *
1030  * Instruct @appsink to drop old buffers when the maximum amount of queued
1031  * buffers is reached.
1032  *
1033  * Since: 0.10.22
1034  */
1035 void
1036 gst_app_sink_set_drop (GstAppSink * appsink, gboolean drop)
1037 {
1038   GstAppSinkPrivate *priv;
1039
1040   g_return_if_fail (GST_IS_APP_SINK (appsink));
1041
1042   priv = appsink->priv;
1043
1044   g_mutex_lock (priv->mutex);
1045   if (priv->drop != drop) {
1046     priv->drop = drop;
1047     /* signal the change */
1048     g_cond_signal (priv->cond);
1049   }
1050   g_mutex_unlock (priv->mutex);
1051 }
1052
1053 /**
1054  * gst_app_sink_get_drop:
1055  * @appsink: a #GstAppSink
1056  *
1057  * Check if @appsink will drop old buffers when the maximum amount of queued
1058  * buffers is reached.
1059  *
1060  * Returns: %TRUE if @appsink is dropping old buffers when the queue is
1061  * filled.
1062  *
1063  * Since: 0.10.22
1064  */
1065 gboolean
1066 gst_app_sink_get_drop (GstAppSink * appsink)
1067 {
1068   gboolean result;
1069   GstAppSinkPrivate *priv;
1070
1071   g_return_val_if_fail (GST_IS_APP_SINK (appsink), FALSE);
1072
1073   priv = appsink->priv;
1074
1075   g_mutex_lock (priv->mutex);
1076   result = priv->drop;
1077   g_mutex_unlock (priv->mutex);
1078
1079   return result;
1080 }
1081
1082 /**
1083  * gst_app_sink_pull_preroll:
1084  * @appsink: a #GstAppSink
1085  *
1086  * Get the last preroll sample in @appsink. This was the sample that caused the
1087  * appsink to preroll in the PAUSED state. This sample can be pulled many times
1088  * and remains available to the application even after EOS.
1089  *
1090  * This function is typically used when dealing with a pipeline in the PAUSED
1091  * state. Calling this function after doing a seek will give the sample right
1092  * after the seek position.
1093  *
1094  * Note that the preroll sample will also be returned as the first sample
1095  * when calling gst_app_sink_pull_sample().
1096  *
1097  * If an EOS event was received before any buffers, this function returns
1098  * %NULL. Use gst_app_sink_is_eos () to check for the EOS condition.
1099  *
1100  * This function blocks until a preroll sample or EOS is received or the appsink
1101  * element is set to the READY/NULL state.
1102  *
1103  * Returns: a #GstBuffer or NULL when the appsink is stopped or EOS.
1104  *
1105  * Since: 0.10.22
1106  */
1107 GstSample *
1108 gst_app_sink_pull_preroll (GstAppSink * appsink)
1109 {
1110   GstSample *sample = NULL;
1111   GstAppSinkPrivate *priv;
1112
1113   g_return_val_if_fail (GST_IS_APP_SINK (appsink), NULL);
1114
1115   priv = appsink->priv;
1116
1117   g_mutex_lock (priv->mutex);
1118
1119   while (TRUE) {
1120     GST_DEBUG_OBJECT (appsink, "trying to grab a buffer");
1121     if (!priv->started)
1122       goto not_started;
1123
1124     if (priv->preroll != NULL)
1125       break;
1126
1127     if (priv->is_eos)
1128       goto eos;
1129
1130     /* nothing to return, wait */
1131     GST_DEBUG_OBJECT (appsink, "waiting for the preroll buffer");
1132     g_cond_wait (priv->cond, priv->mutex);
1133   }
1134   sample =
1135       gst_sample_new (priv->preroll, priv->preroll_caps, &priv->last_segment,
1136       NULL);
1137   GST_DEBUG_OBJECT (appsink, "we have the preroll sample %p", sample);
1138   g_mutex_unlock (priv->mutex);
1139
1140   return sample;
1141
1142   /* special conditions */
1143 eos:
1144   {
1145     GST_DEBUG_OBJECT (appsink, "we are EOS, return NULL");
1146     g_mutex_unlock (priv->mutex);
1147     return NULL;
1148   }
1149 not_started:
1150   {
1151     GST_DEBUG_OBJECT (appsink, "we are stopped, return NULL");
1152     g_mutex_unlock (priv->mutex);
1153     return NULL;
1154   }
1155 }
1156
1157 /**
1158  * gst_app_sink_pull_sample:
1159  * @appsink: a #GstAppSink
1160  *
1161  * This function blocks until a sample or EOS becomes available or the appsink
1162  * element is set to the READY/NULL state.
1163  *
1164  * This function will only return samples when the appsink is in the PLAYING
1165  * state. All rendered buffers will be put in a queue so that the application
1166  * can pull samples at its own rate. Note that when the application does not
1167  * pull samples fast enough, the queued buffers could consume a lot of memory,
1168  * especially when dealing with raw video frames.
1169  *
1170  * If an EOS event was received before any buffers, this function returns
1171  * %NULL. Use gst_app_sink_is_eos () to check for the EOS condition.
1172  *
1173  * Returns: a #GstBuffer or NULL when the appsink is stopped or EOS.
1174  *
1175  * Since: 0.10.22
1176  */
1177
1178 GstSample *
1179 gst_app_sink_pull_sample (GstAppSink * appsink)
1180 {
1181   GstSample *sample = NULL;
1182   GstBuffer *buffer;
1183   GstAppSinkPrivate *priv;
1184
1185   g_return_val_if_fail (GST_IS_APP_SINK (appsink), NULL);
1186
1187   priv = appsink->priv;
1188
1189   g_mutex_lock (priv->mutex);
1190
1191   while (TRUE) {
1192     GST_DEBUG_OBJECT (appsink, "trying to grab a buffer");
1193     if (!priv->started)
1194       goto not_started;
1195
1196     if (priv->num_buffers > 0)
1197       break;
1198
1199     if (priv->is_eos)
1200       goto eos;
1201
1202     /* nothing to return, wait */
1203     GST_DEBUG_OBJECT (appsink, "waiting for a buffer");
1204     g_cond_wait (priv->cond, priv->mutex);
1205   }
1206   buffer = dequeue_buffer (appsink);
1207   GST_DEBUG_OBJECT (appsink, "we have a buffer %p", buffer);
1208   sample = gst_sample_new (buffer, priv->last_caps, &priv->last_segment, NULL);
1209   gst_buffer_unref (buffer);
1210
1211   g_cond_signal (priv->cond);
1212   g_mutex_unlock (priv->mutex);
1213
1214   return sample;
1215
1216   /* special conditions */
1217 eos:
1218   {
1219     GST_DEBUG_OBJECT (appsink, "we are EOS, return NULL");
1220     g_mutex_unlock (priv->mutex);
1221     return NULL;
1222   }
1223 not_started:
1224   {
1225     GST_DEBUG_OBJECT (appsink, "we are stopped, return NULL");
1226     g_mutex_unlock (priv->mutex);
1227     return NULL;
1228   }
1229 }
1230
1231 /**
1232  * gst_app_sink_set_callbacks:
1233  * @appsink: a #GstAppSink
1234  * @callbacks: the callbacks
1235  * @user_data: a user_data argument for the callbacks
1236  * @notify: a destroy notify function
1237  *
1238  * Set callbacks which will be executed for each new preroll, new sample and eos.
1239  * This is an alternative to using the signals, it has lower overhead and is thus
1240  * less expensive, but also less flexible.
1241  *
1242  * If callbacks are installed, no signals will be emitted for performance
1243  * reasons.
1244  *
1245  * Since: 0.10.23
1246  */
1247 void
1248 gst_app_sink_set_callbacks (GstAppSink * appsink,
1249     GstAppSinkCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
1250 {
1251   GDestroyNotify old_notify;
1252   GstAppSinkPrivate *priv;
1253
1254   g_return_if_fail (GST_IS_APP_SINK (appsink));
1255   g_return_if_fail (callbacks != NULL);
1256
1257   priv = appsink->priv;
1258
1259   GST_OBJECT_LOCK (appsink);
1260   old_notify = priv->notify;
1261
1262   if (old_notify) {
1263     gpointer old_data;
1264
1265     old_data = priv->user_data;
1266
1267     priv->user_data = NULL;
1268     priv->notify = NULL;
1269     GST_OBJECT_UNLOCK (appsink);
1270
1271     old_notify (old_data);
1272
1273     GST_OBJECT_LOCK (appsink);
1274   }
1275   priv->callbacks = *callbacks;
1276   priv->user_data = user_data;
1277   priv->notify = notify;
1278   GST_OBJECT_UNLOCK (appsink);
1279 }
1280
1281 /*** GSTURIHANDLER INTERFACE *************************************************/
1282
1283 static GstURIType
1284 gst_app_sink_uri_get_type (GType type)
1285 {
1286   return GST_URI_SINK;
1287 }
1288
1289 static const gchar *const *
1290 gst_app_sink_uri_get_protocols (GType type)
1291 {
1292   static const gchar *protocols[] = { "appsink", NULL };
1293
1294   return protocols;
1295 }
1296
1297 static gchar *
1298 gst_app_sink_uri_get_uri (GstURIHandler * handler)
1299 {
1300   return g_strdup ("appsink");
1301 }
1302
1303 static gboolean
1304 gst_app_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri,
1305     GError ** error)
1306 {
1307   /* GstURIHandler checks the protocol for us */
1308   return TRUE;
1309 }
1310
1311 static void
1312 gst_app_sink_uri_handler_init (gpointer g_iface, gpointer iface_data)
1313 {
1314   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1315
1316   iface->get_type = gst_app_sink_uri_get_type;
1317   iface->get_protocols = gst_app_sink_uri_get_protocols;
1318   iface->get_uri = gst_app_sink_uri_get_uri;
1319   iface->set_uri = gst_app_sink_uri_set_uri;
1320
1321 }