gst/gstqueue.c: Reverted to 1.110 until this makes the testsuite and various apps...
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2003 Colin Walters <cwalters@gnome.org>
5  *
6  * gstqueue.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23
24
25 #include "gst_private.h"
26
27 #include "gstqueue.h"
28 #include "gstscheduler.h"
29 #include "gstevent.h"
30 #include "gstinfo.h"
31 #include "gsterror.h"
32
33 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
34     GST_PAD_SINK,
35     GST_PAD_ALWAYS,
36     GST_STATIC_CAPS_ANY);
37
38 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
39     GST_PAD_SRC,
40     GST_PAD_ALWAYS,
41     GST_STATIC_CAPS_ANY);
42
43 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
44
45 static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
46     "Generic",
47     "Simple data queue",
48     "Erik Walthinsen <omega@cse.ogi.edu>");
49
50
51 /* Queue signals and args */
52 enum
53 {
54   SIGNAL_UNDERRUN,
55   SIGNAL_RUNNING,
56   SIGNAL_OVERRUN,
57   LAST_SIGNAL
58 };
59
60 enum
61 {
62   ARG_0,
63   /* FIXME: don't we have another way of doing this
64    * "Gstreamer format" (frame/byte/time) queries? */
65   ARG_CUR_LEVEL_BUFFERS,
66   ARG_CUR_LEVEL_BYTES,
67   ARG_CUR_LEVEL_TIME,
68   ARG_MAX_SIZE_BUFFERS,
69   ARG_MAX_SIZE_BYTES,
70   ARG_MAX_SIZE_TIME,
71   ARG_MIN_THRESHOLD_BUFFERS,
72   ARG_MIN_THRESHOLD_BYTES,
73   ARG_MIN_THRESHOLD_TIME,
74   ARG_LEAKY,
75   ARG_MAY_DEADLOCK,
76   ARG_BLOCK_TIMEOUT
77       /* FILL ME */
78 };
79
80 #define GST_QUEUE_MUTEX_LOCK G_STMT_START {                             \
81   GST_CAT_LOG_OBJECT (queue_dataflow, queue,                            \
82       "locking qlock from thread %p",                                   \
83       g_thread_self ());                                                \
84   g_mutex_lock (queue->qlock);                                          \
85   GST_CAT_LOG_OBJECT (queue_dataflow, queue,                            \
86       "locked qlock from thread %p",                                    \
87       g_thread_self ());                                                \
88 } G_STMT_END
89
90 #define GST_QUEUE_MUTEX_UNLOCK G_STMT_START {                           \
91   GST_CAT_LOG_OBJECT (queue_dataflow, queue,                            \
92       "unlocking qlock from thread %p",                                 \
93       g_thread_self ());                                                \
94   g_mutex_unlock (queue->qlock);                                        \
95 } G_STMT_END
96
97
98 typedef struct _GstQueueEventResponse
99 {
100   GstEvent *event;
101   gboolean ret, handled;
102 }
103 GstQueueEventResponse;
104
105 static void gst_queue_base_init (GstQueueClass * klass);
106 static void gst_queue_class_init (GstQueueClass * klass);
107 static void gst_queue_init (GstQueue * queue);
108 static void gst_queue_finalize (GObject * object);
109
110 static void gst_queue_set_property (GObject * object,
111     guint prop_id, const GValue * value, GParamSpec * pspec);
112 static void gst_queue_get_property (GObject * object,
113     guint prop_id, GValue * value, GParamSpec * pspec);
114
115 static void gst_queue_chain (GstPad * pad, GstData * data);
116 static GstData *gst_queue_get (GstPad * pad);
117
118 static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event);
119 static gboolean gst_queue_handle_src_query (GstPad * pad,
120     GstQueryType type, GstFormat * fmt, gint64 * value);
121
122 static GstCaps *gst_queue_getcaps (GstPad * pad);
123 static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
124 static void gst_queue_locked_flush (GstQueue * queue);
125
126 static GstElementStateReturn gst_queue_change_state (GstElement * element);
127 static gboolean gst_queue_release_locks (GstElement * element);
128
129
130 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
131
132 static GType
133 queue_leaky_get_type (void)
134 {
135   static GType queue_leaky_type = 0;
136   static GEnumValue queue_leaky[] = {
137     {GST_QUEUE_NO_LEAK, "0", "Not Leaky"},
138     {GST_QUEUE_LEAK_UPSTREAM, "1", "Leaky on Upstream"},
139     {GST_QUEUE_LEAK_DOWNSTREAM, "2", "Leaky on Downstream"},
140     {0, NULL, NULL},
141   };
142
143   if (!queue_leaky_type) {
144     queue_leaky_type = g_enum_register_static ("GstQueueLeaky", queue_leaky);
145   }
146   return queue_leaky_type;
147 }
148
149 static GstElementClass *parent_class = NULL;
150 static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
151
152 GType
153 gst_queue_get_type (void)
154 {
155   static GType queue_type = 0;
156
157   if (!queue_type) {
158     static const GTypeInfo queue_info = {
159       sizeof (GstQueueClass),
160       (GBaseInitFunc) gst_queue_base_init,
161       NULL,
162       (GClassInitFunc) gst_queue_class_init,
163       NULL,
164       NULL,
165       sizeof (GstQueue),
166       0,
167       (GInstanceInitFunc) gst_queue_init,
168       NULL
169     };
170
171     queue_type = g_type_register_static (GST_TYPE_ELEMENT,
172         "GstQueue", &queue_info, 0);
173     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue_dataflow", 0,
174         "dataflow inside the queue element");
175   }
176
177   return queue_type;
178 }
179
180 static void
181 gst_queue_base_init (GstQueueClass * klass)
182 {
183   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
184
185   gst_element_class_add_pad_template (gstelement_class,
186       gst_static_pad_template_get (&srctemplate));
187   gst_element_class_add_pad_template (gstelement_class,
188       gst_static_pad_template_get (&sinktemplate));
189   gst_element_class_set_details (gstelement_class, &gst_queue_details);
190 }
191
192 static void
193 gst_queue_class_init (GstQueueClass * klass)
194 {
195   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
196   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
197
198   parent_class = g_type_class_peek_parent (klass);
199
200   /* signals */
201   gst_queue_signals[SIGNAL_UNDERRUN] =
202       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
203       G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
204       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
205   gst_queue_signals[SIGNAL_RUNNING] =
206       g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
207       G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
208       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
209   gst_queue_signals[SIGNAL_OVERRUN] =
210       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
211       G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
212       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
213
214   /* properties */
215   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
216       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
217           "Current amount of data in the queue (bytes)",
218           0, G_MAXUINT, 0, G_PARAM_READABLE));
219   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS,
220       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
221           "Current number of buffers in the queue",
222           0, G_MAXUINT, 0, G_PARAM_READABLE));
223   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
224       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
225           "Current amount of data in the queue (in ns)",
226           0, G_MAXUINT64, 0, G_PARAM_READABLE));
227
228   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
229       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
230           "Max. amount of data in the queue (bytes, 0=disable)",
231           0, G_MAXUINT, 0, G_PARAM_READWRITE));
232   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
233       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
234           "Max. number of buffers in the queue (0=disable)",
235           0, G_MAXUINT, 0, G_PARAM_READWRITE));
236   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
237       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
238           "Max. amount of data in the queue (in ns, 0=disable)",
239           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
240
241   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BYTES,
242       g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
243           "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
244           0, G_MAXUINT, 0, G_PARAM_READWRITE));
245   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BUFFERS,
246       g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
247           "Min. number of buffers in the queue to allow reading (0=disable)",
248           0, G_MAXUINT, 0, G_PARAM_READWRITE));
249   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_TIME,
250       g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
251           "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
252           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
253
254   g_object_class_install_property (gobject_class, ARG_LEAKY,
255       g_param_spec_enum ("leaky", "Leaky",
256           "Where the queue leaks, if at all",
257           GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
258   g_object_class_install_property (gobject_class, ARG_MAY_DEADLOCK,
259       g_param_spec_boolean ("may_deadlock", "May Deadlock",
260           "The queue may deadlock if it's full and not PLAYING",
261           TRUE, G_PARAM_READWRITE));
262   g_object_class_install_property (gobject_class, ARG_BLOCK_TIMEOUT,
263       g_param_spec_uint64 ("block_timeout", "Timeout for Block",
264           "Nanoseconds until blocked queue times out and returns filler event. "
265           "Value of -1 disables timeout",
266           0, G_MAXUINT64, -1, G_PARAM_READWRITE));
267
268   /* set several parent class virtual functions */
269   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_queue_finalize);
270   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
271   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
272
273   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state);
274   gstelement_class->release_locks = GST_DEBUG_FUNCPTR (gst_queue_release_locks);
275 }
276
277 static void
278 gst_queue_init (GstQueue * queue)
279 {
280   /* scheduling on this kind of element is, well, interesting */
281   GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
282   GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE);
283
284   queue->sinkpad =
285       gst_pad_new_from_template (gst_static_pad_template_get (&sinktemplate),
286       "sink");
287   gst_pad_set_chain_function (queue->sinkpad,
288       GST_DEBUG_FUNCPTR (gst_queue_chain));
289   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
290   gst_pad_set_link_function (queue->sinkpad,
291       GST_DEBUG_FUNCPTR (gst_queue_link));
292   gst_pad_set_getcaps_function (queue->sinkpad,
293       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
294   gst_pad_set_active (queue->sinkpad, TRUE);
295
296   queue->srcpad =
297       gst_pad_new_from_template (gst_static_pad_template_get (&srctemplate),
298       "src");
299   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
300   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
301   gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
302   gst_pad_set_getcaps_function (queue->srcpad,
303       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
304   gst_pad_set_event_function (queue->srcpad,
305       GST_DEBUG_FUNCPTR (gst_queue_handle_src_event));
306   gst_pad_set_query_function (queue->srcpad,
307       GST_DEBUG_FUNCPTR (gst_queue_handle_src_query));
308   gst_pad_set_active (queue->srcpad, TRUE);
309
310   queue->cur_level.buffers = 0; /* no content */
311   queue->cur_level.bytes = 0;   /* no content */
312   queue->cur_level.time = 0;    /* no content */
313   queue->max_size.buffers = 100;        /* 100 buffers */
314   queue->max_size.bytes = 10 * 1024 * 1024;     /* 10 MB */
315   queue->max_size.time = GST_SECOND;    /* 1 s. */
316   queue->min_threshold.buffers = 0;     /* no threshold */
317   queue->min_threshold.bytes = 0;       /* no threshold */
318   queue->min_threshold.time = 0;        /* no threshold */
319
320   queue->leaky = GST_QUEUE_NO_LEAK;
321   queue->may_deadlock = TRUE;
322   queue->block_timeout = GST_CLOCK_TIME_NONE;
323   queue->interrupt = FALSE;
324   queue->flush = FALSE;
325
326   queue->qlock = g_mutex_new ();
327   queue->item_add = g_cond_new ();
328   queue->item_del = g_cond_new ();
329   queue->event_done = g_cond_new ();
330   queue->events = g_queue_new ();
331   queue->event_lock = g_mutex_new ();
332   queue->queue = g_queue_new ();
333
334   GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
335       "initialized queue's not_empty & not_full conditions");
336 }
337
338 /* called only once, as opposed to dispose */
339 static void
340 gst_queue_finalize (GObject * object)
341 {
342   GstQueue *queue = GST_QUEUE (object);
343
344   GST_DEBUG_OBJECT (queue, "finalizing queue");
345
346   while (!g_queue_is_empty (queue->queue)) {
347     GstData *data = g_queue_pop_head (queue->queue);
348
349     gst_data_unref (data);
350   }
351   g_queue_free (queue->queue);
352   g_mutex_free (queue->qlock);
353   g_cond_free (queue->item_add);
354   g_cond_free (queue->item_del);
355   g_cond_free (queue->event_done);
356   g_mutex_lock (queue->event_lock);
357   while (!g_queue_is_empty (queue->events)) {
358     GstQueueEventResponse *er = g_queue_pop_head (queue->events);
359
360     gst_event_unref (er->event);
361   }
362   g_mutex_unlock (queue->event_lock);
363   g_mutex_free (queue->event_lock);
364   g_queue_free (queue->events);
365
366   if (G_OBJECT_CLASS (parent_class)->finalize)
367     G_OBJECT_CLASS (parent_class)->finalize (object);
368 }
369
370 static GstCaps *
371 gst_queue_getcaps (GstPad * pad)
372 {
373   GstQueue *queue;
374
375   queue = GST_QUEUE (gst_pad_get_parent (pad));
376
377   if (queue->cur_level.bytes > 0) {
378     return gst_caps_copy (queue->negotiated_caps);
379   }
380
381   return gst_pad_proxy_getcaps (pad);
382 }
383
384 static GstPadLinkReturn
385 gst_queue_link (GstPad * pad, const GstCaps * caps)
386 {
387   GstQueue *queue;
388   GstPadLinkReturn link_ret;
389
390   queue = GST_QUEUE (gst_pad_get_parent (pad));
391
392   if (queue->cur_level.bytes > 0) {
393     if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
394       return GST_PAD_LINK_OK;
395     }
396     return GST_PAD_LINK_REFUSED;
397   }
398
399   link_ret = gst_pad_proxy_pad_link (pad, caps);
400
401   if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
402     /* we store an extra copy of the negotiated caps, just in case
403      * the pads become unnegotiated while we have buffers */
404     gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
405   }
406
407   return link_ret;
408 }
409
410 static void
411 gst_queue_locked_flush (GstQueue * queue)
412 {
413   while (!g_queue_is_empty (queue->queue)) {
414     GstData *data = g_queue_pop_head (queue->queue);
415
416     /* First loose the reference we added when putting that data in the queue */
417     gst_data_unref (data);
418     /* Then loose another reference because we are supposed to destroy that
419        data when flushing */
420     gst_data_unref (data);
421   }
422   queue->timeval = NULL;
423   queue->cur_level.buffers = 0;
424   queue->cur_level.bytes = 0;
425   queue->cur_level.time = 0;
426
427   /* make sure any pending buffers to be added are flushed too */
428   queue->flush = TRUE;
429
430   /* we deleted something... */
431   g_cond_signal (queue->item_del);
432 }
433
434 static void
435 gst_queue_handle_pending_events (GstQueue * queue)
436 {
437   /* check for events to send upstream */
438   /* g_queue_get_length is glib 2.4, so don't depend on it yet, use ->length */
439   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
440       "handling pending events, events queue of size %d",
441       queue->events->length);
442   g_mutex_lock (queue->event_lock);
443   while (!g_queue_is_empty (queue->events)) {
444     GstQueueEventResponse *er;
445
446     er = g_queue_pop_head (queue->events);
447
448     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
449         "sending event %p (%d) from event response %p upstream",
450         er->event, GST_EVENT_TYPE (er->event), er);
451     if (er->handled) {
452       /* change this to an assert when this file gets reviewed properly. */
453       GST_ELEMENT_ERROR (queue, CORE, EVENT, (NULL),
454           ("already handled event %p (%d) from event response %p upstream",
455               er->event, GST_EVENT_TYPE (er->event), er));
456       break;
457     }
458     g_mutex_unlock (queue->event_lock);
459     er->ret = gst_pad_event_default (queue->srcpad, er->event);
460     er->handled = TRUE;
461     g_cond_signal (queue->event_done);
462     g_mutex_lock (queue->event_lock);
463     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent");
464   }
465   g_mutex_unlock (queue->event_lock);
466 }
467
468 #define STATUS(queue, msg) \
469   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
470                       "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
471                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
472                       "-%" G_GUINT64_FORMAT " ns, %u elements", \
473                       GST_DEBUG_PAD_NAME (pad), \
474                       queue->cur_level.buffers, \
475                       queue->min_threshold.buffers, \
476                       queue->max_size.buffers, \
477                       queue->cur_level.bytes, \
478                       queue->min_threshold.bytes, \
479                       queue->max_size.bytes, \
480                       queue->cur_level.time, \
481                       queue->min_threshold.time, \
482                       queue->max_size.time, \
483                       queue->queue->length)
484
485 static void
486 gst_queue_chain (GstPad * pad, GstData * data)
487 {
488   GstQueue *queue;
489
490   g_return_if_fail (pad != NULL);
491   g_return_if_fail (GST_IS_PAD (pad));
492   g_return_if_fail (data != NULL);
493
494   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
495
496 restart:
497   /* we have to lock the queue since we span threads */
498   GST_QUEUE_MUTEX_LOCK;
499
500   gst_queue_handle_pending_events (queue);
501
502   /* assume don't need to flush this buffer when the queue is filled */
503   queue->flush = FALSE;
504
505   if (GST_IS_EVENT (data)) {
506     switch (GST_EVENT_TYPE (data)) {
507       case GST_EVENT_FLUSH:
508         STATUS (queue, "received flush event");
509         gst_queue_locked_flush (queue);
510         STATUS (queue, "after flush");
511         break;
512       case GST_EVENT_EOS:
513         STATUS (queue, "received EOS");
514         break;
515       default:
516         /* we put the event in the queue, we don't have to act ourselves */
517         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
518             "adding event %p of type %d", data, GST_EVENT_TYPE (data));
519         break;
520     }
521   }
522
523   if (GST_IS_BUFFER (data))
524     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
525         "adding buffer %p of size %d", data, GST_BUFFER_SIZE (data));
526
527   /* We make space available if we're "full" according to whatever
528    * the user defined as "full". Note that this only applies to buffers.
529    * We always handle events and they don't count in our statistics. */
530   if (GST_IS_BUFFER (data) &&
531       ((queue->max_size.buffers > 0 &&
532               queue->cur_level.buffers >= queue->max_size.buffers) ||
533           (queue->max_size.bytes > 0 &&
534               queue->cur_level.bytes >= queue->max_size.bytes) ||
535           (queue->max_size.time > 0 &&
536               queue->cur_level.time >= queue->max_size.time))) {
537     GST_QUEUE_MUTEX_UNLOCK;
538     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
539     GST_QUEUE_MUTEX_LOCK;
540
541     /* how are we going to make space for this buffer? */
542     switch (queue->leaky) {
543         /* leak current buffer */
544       case GST_QUEUE_LEAK_UPSTREAM:
545         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
546             "queue is full, leaking buffer on upstream end");
547         /* now we can clean up and exit right away */
548         GST_QUEUE_MUTEX_UNLOCK;
549         goto out_unref;
550
551         /* leak first buffer in the queue */
552       case GST_QUEUE_LEAK_DOWNSTREAM:{
553         /* this is a bit hacky. We'll manually iterate the list
554          * and find the first buffer from the head on. We'll
555          * unref that and "fix up" the GQueue object... */
556         GList *item;
557         GstData *leak = NULL;
558
559         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
560             "queue is full, leaking buffer on downstream end");
561
562         for (item = queue->queue->head; item != NULL; item = item->next) {
563           if (GST_IS_BUFFER (item->data)) {
564             leak = item->data;
565             break;
566           }
567         }
568
569         /* if we didn't find anything, it means we have no buffers
570          * in here. That cannot happen, since we had >= 1 bufs */
571         g_assert (leak);
572
573         /* Now remove it from the list, fixing up the GQueue
574          * CHECKME: is a queue->head the first or the last item? */
575         item = g_list_delete_link (queue->queue->head, item);
576         queue->queue->head = g_list_first (item);
577         queue->queue->tail = g_list_last (item);
578         queue->queue->length--;
579
580         /* and unref the data at the end. Twice, because we keep a ref
581          * to make things read-only. Also keep our list uptodate. */
582         queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
583         queue->cur_level.buffers--;
584         if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
585           queue->cur_level.time -= GST_BUFFER_DURATION (data);
586
587         gst_data_unref (data);
588         gst_data_unref (data);
589         break;
590       }
591
592       default:
593         g_warning ("Unknown leaky type, using default");
594         /* fall-through */
595
596         /* don't leak. Instead, wait for space to be available */
597       case GST_QUEUE_NO_LEAK:
598         STATUS (queue, "pre-full wait");
599
600         while ((queue->max_size.buffers > 0 &&
601                 queue->cur_level.buffers >= queue->max_size.buffers) ||
602             (queue->max_size.bytes > 0 &&
603                 queue->cur_level.bytes >= queue->max_size.bytes) ||
604             (queue->max_size.time > 0 &&
605                 queue->cur_level.time >= queue->max_size.time)) {
606           /* if there's a pending state change for this queue
607            * or its manager, switch back to iterator so bottom
608            * half of state change executes */
609           if (queue->interrupt) {
610             GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "interrupted");
611             GST_QUEUE_MUTEX_UNLOCK;
612             if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad),
613                     GST_ELEMENT (queue))) {
614               goto out_unref;
615             }
616             /* if we got here because we were unlocked after a
617              * flush, we don't need to add the buffer to the
618              * queue again */
619             if (queue->flush) {
620               GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
621                   "not adding pending buffer after flush");
622               goto out_unref;
623             }
624             GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
625                 "adding pending buffer after interrupt");
626             goto restart;
627           }
628
629           if (GST_STATE (queue) != GST_STATE_PLAYING) {
630             /* this means the other end is shut down. Try to
631              * signal to resolve the error */
632             if (!queue->may_deadlock) {
633               GST_QUEUE_MUTEX_UNLOCK;
634               gst_data_unref (data);
635               GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL),
636                   ("deadlock found, shutting down source pad elements"));
637               /* we don't go to out_unref here, since we want to
638                * unref the buffer *before* calling GST_ELEMENT_ERROR */
639               return;
640             } else {
641               GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
642                   "%s: waiting for the app to restart "
643                   "source pad elements", GST_ELEMENT_NAME (queue));
644             }
645           }
646
647           /* OK, we've got a serious issue here. Imagine the situation
648            * where the puller (next element) is sending an event here,
649            * so it cannot pull events from the queue, and we cannot
650            * push data further because the queue is 'full' and therefore,
651            * we wait here (and do not handle events): deadlock! to solve
652            * that, we handle pending upstream events here, too. */
653           gst_queue_handle_pending_events (queue);
654
655           STATUS (queue, "waiting for item_del signal from thread using qlock");
656           g_cond_wait (queue->item_del, queue->qlock);
657           STATUS (queue, "received item_del signal from thread using qlock");
658         }
659
660         STATUS (queue, "post-full wait");
661         GST_QUEUE_MUTEX_UNLOCK;
662         g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
663         GST_QUEUE_MUTEX_LOCK;
664         break;
665     }
666   }
667
668   /* put the buffer on the tail of the list. We keep a reference,
669    * so that the data is read-only while in here. There's a good
670    * reason to do so: we have a size and time counter, and any
671    * modification to the content could change any of the two. */
672   gst_data_ref (data);
673   g_queue_push_tail (queue->queue, data);
674
675   /* Note that we only add buffers (not events) to the statistics */
676   if (GST_IS_BUFFER (data)) {
677     queue->cur_level.buffers++;
678     queue->cur_level.bytes += GST_BUFFER_SIZE (data);
679     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
680       queue->cur_level.time += GST_BUFFER_DURATION (data);
681   }
682
683   STATUS (queue, "+ level");
684
685   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
686   g_cond_signal (queue->item_add);
687   GST_QUEUE_MUTEX_UNLOCK;
688
689   return;
690
691 out_unref:
692   gst_data_unref (data);
693   return;
694 }
695
696 static GstData *
697 gst_queue_get (GstPad * pad)
698 {
699   GstQueue *queue;
700   GstData *data;
701
702   g_return_val_if_fail (pad != NULL, NULL);
703   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
704
705   queue = GST_QUEUE (gst_pad_get_parent (pad));
706
707 restart:
708   /* have to lock for thread-safety */
709   GST_QUEUE_MUTEX_LOCK;
710
711   if (queue->queue->length == 0 ||
712       (queue->min_threshold.buffers > 0 &&
713           queue->cur_level.buffers < queue->min_threshold.buffers) ||
714       (queue->min_threshold.bytes > 0 &&
715           queue->cur_level.bytes < queue->min_threshold.bytes) ||
716       (queue->min_threshold.time > 0 &&
717           queue->cur_level.time < queue->min_threshold.time)) {
718     GST_QUEUE_MUTEX_UNLOCK;
719     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
720     GST_QUEUE_MUTEX_LOCK;
721
722     STATUS (queue, "pre-empty wait");
723     while (queue->queue->length == 0 ||
724         (queue->min_threshold.buffers > 0 &&
725             queue->cur_level.buffers < queue->min_threshold.buffers) ||
726         (queue->min_threshold.bytes > 0 &&
727             queue->cur_level.bytes < queue->min_threshold.bytes) ||
728         (queue->min_threshold.time > 0 &&
729             queue->cur_level.time < queue->min_threshold.time)) {
730       /* if there's a pending state change for this queue or its
731        * manager, switch back to iterator so bottom half of state
732        * change executes. */
733       if (queue->interrupt) {
734         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "interrupted");
735         GST_QUEUE_MUTEX_UNLOCK;
736         if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad),
737                 GST_ELEMENT (queue)))
738           return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
739         goto restart;
740       }
741       if (GST_STATE (queue) != GST_STATE_PLAYING) {
742         /* this means the other end is shut down */
743         if (!queue->may_deadlock) {
744           GST_QUEUE_MUTEX_UNLOCK;
745           GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL),
746               ("deadlock found, shutting down sink pad elements"));
747           goto restart;
748         } else {
749           GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
750               "%s: waiting for the app to restart "
751               "source pad elements", GST_ELEMENT_NAME (queue));
752         }
753       }
754
755       STATUS (queue, "waiting for item_add");
756
757       if (queue->block_timeout != GST_CLOCK_TIME_NONE) {
758         GTimeVal timeout;
759
760         g_get_current_time (&timeout);
761         g_time_val_add (&timeout, queue->block_timeout / 1000);
762         GST_LOG_OBJECT (queue, "g_cond_time_wait using qlock from thread %p",
763             g_thread_self ());
764         if (!g_cond_timed_wait (queue->item_add, queue->qlock, &timeout)) {
765           GST_QUEUE_MUTEX_UNLOCK;
766           GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
767               "Sending filler event");
768           return GST_DATA (gst_event_new_filler ());
769         }
770       } else {
771         GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
772             g_thread_self ());
773         g_cond_wait (queue->item_add, queue->qlock);
774         GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
775             g_thread_self ());
776       }
777       STATUS (queue, "got item_add signal");
778     }
779
780     STATUS (queue, "post-empty wait");
781     GST_QUEUE_MUTEX_UNLOCK;
782     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
783     GST_QUEUE_MUTEX_LOCK;
784   }
785
786   /* There's something in the list now, whatever it is */
787   data = g_queue_pop_head (queue->queue);
788   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
789       "retrieved data %p from queue", data);
790
791   if (data == NULL)
792     return NULL;
793
794   if (GST_IS_BUFFER (data)) {
795     /* Update statistics */
796     queue->cur_level.buffers--;
797     queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
798     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
799       queue->cur_level.time -= GST_BUFFER_DURATION (data);
800   }
801
802   /* Now that we're done, we can lose our own reference to
803    * the item, since we're no longer in danger. */
804   gst_data_unref (data);
805
806   STATUS (queue, "after _get()");
807
808   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
809   g_cond_signal (queue->item_del);
810   GST_QUEUE_MUTEX_UNLOCK;
811
812   /* FIXME: I suppose this needs to be locked, since the EOS
813    * bit affects the pipeline state. However, that bit is
814    * locked too so it'd cause a deadlock. */
815   if (GST_IS_EVENT (data)) {
816     GstEvent *event = GST_EVENT (data);
817
818     switch (GST_EVENT_TYPE (event)) {
819       case GST_EVENT_EOS:
820         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
821             "queue \"%s\" eos", GST_ELEMENT_NAME (queue));
822         gst_element_set_eos (GST_ELEMENT (queue));
823         break;
824       default:
825         break;
826     }
827   }
828
829   return data;
830 }
831
832
833 static gboolean
834 gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
835 {
836   GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
837   gboolean res;
838
839   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
840       event, GST_EVENT_TYPE (event));
841   GST_QUEUE_MUTEX_LOCK;
842
843   if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
844     GstQueueEventResponse er;
845
846     /* push the event to the queue and wait for upstream consumption */
847     er.event = event;
848     er.handled = FALSE;
849     g_mutex_lock (queue->event_lock);
850     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
851         "putting event %p (%d) on internal queue", event,
852         GST_EVENT_TYPE (event));
853     g_queue_push_tail (queue->events, &er);
854     g_mutex_unlock (queue->event_lock);
855     GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
856         "Preparing for loop for event handler");
857     /* see the chain function on why this is here - it prevents a deadlock */
858     g_cond_signal (queue->item_del);
859     while (!er.handled) {
860       GTimeVal timeout;
861
862       g_get_current_time (&timeout);
863       g_time_val_add (&timeout, 500 * 1000);    /* half a second */
864       GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
865           g_thread_self ());
866       if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
867           !er.handled) {
868         GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
869             "timeout in upstream event handling, dropping event %p (%d)",
870             er.event, GST_EVENT_TYPE (er.event));
871         g_mutex_lock (queue->event_lock);
872         /* since this queue is for src events (ie upstream), this thread is
873          * the only one that is pushing stuff on it, so we're sure that
874          * it's still the tail element.  FIXME: But in practice, we should use
875          * GList instead of GQueue for this so we can remove any element in
876          * the list. */
877         g_queue_pop_tail (queue->events);
878         g_mutex_unlock (queue->event_lock);
879         gst_event_unref (er.event);
880         res = FALSE;
881         goto handled;
882       }
883     }
884     GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Event handled");
885     res = er.ret;
886   } else {
887     res = gst_pad_event_default (pad, event);
888
889     switch (GST_EVENT_TYPE (event)) {
890       case GST_EVENT_FLUSH:
891         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
892             "FLUSH event, flushing queue\n");
893         gst_queue_locked_flush (queue);
894         break;
895       case GST_EVENT_SEEK:
896         if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
897           gst_queue_locked_flush (queue);
898         }
899       default:
900         break;
901     }
902   }
903 handled:
904   GST_QUEUE_MUTEX_UNLOCK;
905
906   return res;
907 }
908
909 static gboolean
910 gst_queue_handle_src_query (GstPad * pad,
911     GstQueryType type, GstFormat * fmt, gint64 * value)
912 {
913   GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
914   gboolean res;
915
916   res = gst_pad_query (GST_PAD_PEER (queue->sinkpad), type, fmt, value);
917   if (!res)
918     return FALSE;
919
920   if (type == GST_QUERY_POSITION) {
921     /* FIXME: this code assumes that there's no discont in the queue */
922     switch (*fmt) {
923       case GST_FORMAT_BYTES:
924         *value -= queue->cur_level.bytes;
925         break;
926       case GST_FORMAT_TIME:
927         *value -= queue->cur_level.time;
928         break;
929       default:
930         /* FIXME */
931         break;
932     }
933   }
934
935   return TRUE;
936 }
937
938 static gboolean
939 gst_queue_release_locks (GstElement * element)
940 {
941   GstQueue *queue;
942
943   queue = GST_QUEUE (element);
944
945   GST_QUEUE_MUTEX_LOCK;
946   queue->interrupt = TRUE;
947   g_cond_signal (queue->item_add);
948   g_cond_signal (queue->item_del);
949   GST_QUEUE_MUTEX_UNLOCK;
950
951   return TRUE;
952 }
953
954 static GstElementStateReturn
955 gst_queue_change_state (GstElement * element)
956 {
957   GstQueue *queue;
958   GstElementStateReturn ret = GST_STATE_SUCCESS;
959
960   queue = GST_QUEUE (element);
961
962   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
963
964   /* lock the queue so another thread (not in sync with this thread's state)
965    * can't call this queue's _get (or whatever)
966    */
967   GST_QUEUE_MUTEX_LOCK;
968
969   switch (GST_STATE_TRANSITION (element)) {
970     case GST_STATE_NULL_TO_READY:
971       gst_queue_locked_flush (queue);
972       break;
973     case GST_STATE_PAUSED_TO_PLAYING:
974       if (!GST_PAD_IS_LINKED (queue->sinkpad)) {
975         GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
976             "queue %s is not linked", GST_ELEMENT_NAME (queue));
977         /* FIXME can this be? */
978         g_cond_signal (queue->item_add);
979
980         ret = GST_STATE_FAILURE;
981         goto unlock;
982       } else {
983         GstScheduler *src_sched, *sink_sched;
984
985         src_sched = gst_pad_get_scheduler (GST_PAD (queue->srcpad));
986         sink_sched = gst_pad_get_scheduler (GST_PAD (queue->sinkpad));
987
988         if (src_sched == sink_sched) {
989           GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
990               "queue %s does not connect different schedulers",
991               GST_ELEMENT_NAME (queue));
992
993           g_warning ("queue %s does not connect different schedulers",
994               GST_ELEMENT_NAME (queue));
995
996           ret = GST_STATE_FAILURE;
997           goto unlock;
998         }
999       }
1000       queue->interrupt = FALSE;
1001       break;
1002     case GST_STATE_PAUSED_TO_READY:
1003       gst_queue_locked_flush (queue);
1004       gst_caps_replace (&queue->negotiated_caps, NULL);
1005       break;
1006     default:
1007       break;
1008   }
1009
1010   if (GST_ELEMENT_CLASS (parent_class)->change_state)
1011     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
1012
1013   /* this is an ugly hack to make sure our pads are always active.
1014    * Reason for this is that pad activation for the queue element
1015    * depends on 2 schedulers (ugh) */
1016   gst_pad_set_active (queue->sinkpad, TRUE);
1017   gst_pad_set_active (queue->srcpad, TRUE);
1018
1019 unlock:
1020   GST_QUEUE_MUTEX_UNLOCK;
1021
1022   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
1023
1024   return ret;
1025 }
1026
1027
1028 static void
1029 gst_queue_set_property (GObject * object,
1030     guint prop_id, const GValue * value, GParamSpec * pspec)
1031 {
1032   GstQueue *queue = GST_QUEUE (object);
1033
1034   /* someone could change levels here, and since this
1035    * affects the get/put funcs, we need to lock for safety. */
1036   GST_QUEUE_MUTEX_LOCK;
1037
1038   switch (prop_id) {
1039     case ARG_MAX_SIZE_BYTES:
1040       queue->max_size.bytes = g_value_get_uint (value);
1041       break;
1042     case ARG_MAX_SIZE_BUFFERS:
1043       queue->max_size.buffers = g_value_get_uint (value);
1044       break;
1045     case ARG_MAX_SIZE_TIME:
1046       queue->max_size.time = g_value_get_uint64 (value);
1047       break;
1048     case ARG_MIN_THRESHOLD_BYTES:
1049       queue->min_threshold.bytes = g_value_get_uint (value);
1050       break;
1051     case ARG_MIN_THRESHOLD_BUFFERS:
1052       queue->min_threshold.buffers = g_value_get_uint (value);
1053       break;
1054     case ARG_MIN_THRESHOLD_TIME:
1055       queue->min_threshold.time = g_value_get_uint64 (value);
1056       break;
1057     case ARG_LEAKY:
1058       queue->leaky = g_value_get_enum (value);
1059       break;
1060     case ARG_MAY_DEADLOCK:
1061       queue->may_deadlock = g_value_get_boolean (value);
1062       break;
1063     case ARG_BLOCK_TIMEOUT:
1064       queue->block_timeout = g_value_get_uint64 (value);
1065       break;
1066     default:
1067       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1068       break;
1069   }
1070
1071   GST_QUEUE_MUTEX_UNLOCK;
1072 }
1073
1074 static void
1075 gst_queue_get_property (GObject * object,
1076     guint prop_id, GValue * value, GParamSpec * pspec)
1077 {
1078   GstQueue *queue = GST_QUEUE (object);
1079
1080   switch (prop_id) {
1081     case ARG_CUR_LEVEL_BYTES:
1082       g_value_set_uint (value, queue->cur_level.bytes);
1083       break;
1084     case ARG_CUR_LEVEL_BUFFERS:
1085       g_value_set_uint (value, queue->cur_level.buffers);
1086       break;
1087     case ARG_CUR_LEVEL_TIME:
1088       g_value_set_uint64 (value, queue->cur_level.time);
1089       break;
1090     case ARG_MAX_SIZE_BYTES:
1091       g_value_set_uint (value, queue->max_size.bytes);
1092       break;
1093     case ARG_MAX_SIZE_BUFFERS:
1094       g_value_set_uint (value, queue->max_size.buffers);
1095       break;
1096     case ARG_MAX_SIZE_TIME:
1097       g_value_set_uint64 (value, queue->max_size.time);
1098       break;
1099     case ARG_MIN_THRESHOLD_BYTES:
1100       g_value_set_uint (value, queue->min_threshold.bytes);
1101       break;
1102     case ARG_MIN_THRESHOLD_BUFFERS:
1103       g_value_set_uint (value, queue->min_threshold.buffers);
1104       break;
1105     case ARG_MIN_THRESHOLD_TIME:
1106       g_value_set_uint64 (value, queue->min_threshold.time);
1107       break;
1108     case ARG_LEAKY:
1109       g_value_set_enum (value, queue->leaky);
1110       break;
1111     case ARG_MAY_DEADLOCK:
1112       g_value_set_boolean (value, queue->may_deadlock);
1113       break;
1114     case ARG_BLOCK_TIMEOUT:
1115       g_value_set_uint64 (value, queue->block_timeout);
1116       break;
1117     default:
1118       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1119       break;
1120   }
1121 }