use ->length
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2003 Colin Walters <cwalters@gnome.org>
5  *
6  * gstqueue.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23
24
25 #include "gst_private.h"
26
27 #include "gstqueue.h"
28 #include "gstscheduler.h"
29 #include "gstevent.h"
30 #include "gstinfo.h"
31 #include "gsterror.h"
32
33 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
34
35 static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
36     "Generic",
37     "Simple data queue",
38     "Erik Walthinsen <omega@cse.ogi.edu>");
39
40
41 /* Queue signals and args */
42 enum
43 {
44   SIGNAL_UNDERRUN,
45   SIGNAL_RUNNING,
46   SIGNAL_OVERRUN,
47   LAST_SIGNAL
48 };
49
50 enum
51 {
52   ARG_0,
53   /* FIXME: don't we have another way of doing this
54    * "Gstreamer format" (frame/byte/time) queries? */
55   ARG_CUR_LEVEL_BUFFERS,
56   ARG_CUR_LEVEL_BYTES,
57   ARG_CUR_LEVEL_TIME,
58   ARG_MAX_SIZE_BUFFERS,
59   ARG_MAX_SIZE_BYTES,
60   ARG_MAX_SIZE_TIME,
61   ARG_MIN_THRESHOLD_BUFFERS,
62   ARG_MIN_THRESHOLD_BYTES,
63   ARG_MIN_THRESHOLD_TIME,
64   ARG_LEAKY,
65   ARG_MAY_DEADLOCK,
66   ARG_BLOCK_TIMEOUT
67       /* FILL ME */
68 };
69
70 typedef struct _GstQueueEventResponse
71 {
72   GstEvent *event;
73   gboolean ret, handled;
74 }
75 GstQueueEventResponse;
76
77 static void gst_queue_base_init (GstQueueClass * klass);
78 static void gst_queue_class_init (GstQueueClass * klass);
79 static void gst_queue_init (GstQueue * queue);
80 static void gst_queue_dispose (GObject * object);
81
82 static void gst_queue_set_property (GObject * object,
83     guint prop_id, const GValue * value, GParamSpec * pspec);
84 static void gst_queue_get_property (GObject * object,
85     guint prop_id, GValue * value, GParamSpec * pspec);
86
87 static void gst_queue_chain (GstPad * pad, GstData * data);
88 static GstData *gst_queue_get (GstPad * pad);
89
90 static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event);
91
92 static GstCaps *gst_queue_getcaps (GstPad * pad);
93 static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
94 static void gst_queue_locked_flush (GstQueue * queue);
95
96 static GstElementStateReturn gst_queue_change_state (GstElement * element);
97 static gboolean gst_queue_release_locks (GstElement * element);
98
99
100 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
101
102 static GType
103 queue_leaky_get_type (void)
104 {
105   static GType queue_leaky_type = 0;
106   static GEnumValue queue_leaky[] = {
107     {GST_QUEUE_NO_LEAK, "0", "Not Leaky"},
108     {GST_QUEUE_LEAK_UPSTREAM, "1", "Leaky on Upstream"},
109     {GST_QUEUE_LEAK_DOWNSTREAM, "2", "Leaky on Downstream"},
110     {0, NULL, NULL},
111   };
112
113   if (!queue_leaky_type) {
114     queue_leaky_type = g_enum_register_static ("GstQueueLeaky", queue_leaky);
115   }
116   return queue_leaky_type;
117 }
118
119 static GstElementClass *parent_class = NULL;
120 static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
121
122 GType
123 gst_queue_get_type (void)
124 {
125   static GType queue_type = 0;
126
127   if (!queue_type) {
128     static const GTypeInfo queue_info = {
129       sizeof (GstQueueClass),
130       (GBaseInitFunc) gst_queue_base_init,
131       NULL,
132       (GClassInitFunc) gst_queue_class_init,
133       NULL,
134       NULL,
135       sizeof (GstQueue),
136       0,
137       (GInstanceInitFunc) gst_queue_init,
138       NULL
139     };
140
141     queue_type = g_type_register_static (GST_TYPE_ELEMENT,
142         "GstQueue", &queue_info, 0);
143     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue_dataflow", 0,
144         "dataflow inside the queue element");
145   }
146
147   return queue_type;
148 }
149
150 static void
151 gst_queue_base_init (GstQueueClass * klass)
152 {
153   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
154
155   gst_element_class_set_details (gstelement_class, &gst_queue_details);
156 }
157
158 static void
159 gst_queue_class_init (GstQueueClass * klass)
160 {
161   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
162   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
163
164   parent_class = g_type_class_peek_parent (klass);
165
166   /* signals */
167   gst_queue_signals[SIGNAL_UNDERRUN] =
168       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
169       G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
170       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
171   gst_queue_signals[SIGNAL_RUNNING] =
172       g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
173       G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
174       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
175   gst_queue_signals[SIGNAL_OVERRUN] =
176       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
177       G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
178       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
179
180   /* properties */
181   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
182       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
183           "Current amount of data in the queue (bytes)",
184           0, G_MAXUINT, 0, G_PARAM_READABLE));
185   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS,
186       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
187           "Current number of buffers in the queue",
188           0, G_MAXUINT, 0, G_PARAM_READABLE));
189   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
190       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
191           "Current amount of data in the queue (in ns)",
192           0, G_MAXUINT64, 0, G_PARAM_READABLE));
193
194   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
195       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
196           "Max. amount of data in the queue (bytes, 0=disable)",
197           0, G_MAXUINT, 0, G_PARAM_READWRITE));
198   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
199       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
200           "Max. number of buffers in the queue (0=disable)",
201           0, G_MAXUINT, 0, G_PARAM_READWRITE));
202   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
203       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
204           "Max. amount of data in the queue (in ns, 0=disable)",
205           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
206
207   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BYTES,
208       g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
209           "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
210           0, G_MAXUINT, 0, G_PARAM_READWRITE));
211   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BUFFERS,
212       g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
213           "Min. number of buffers in the queue to allow reading (0=disable)",
214           0, G_MAXUINT, 0, G_PARAM_READWRITE));
215   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_TIME,
216       g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
217           "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
218           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
219
220   g_object_class_install_property (gobject_class, ARG_LEAKY,
221       g_param_spec_enum ("leaky", "Leaky",
222           "Where the queue leaks, if at all",
223           GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
224   g_object_class_install_property (gobject_class, ARG_MAY_DEADLOCK,
225       g_param_spec_boolean ("may_deadlock", "May Deadlock",
226           "The queue may deadlock if it's full and not PLAYING",
227           TRUE, G_PARAM_READWRITE));
228   g_object_class_install_property (gobject_class, ARG_BLOCK_TIMEOUT,
229       g_param_spec_uint64 ("block_timeout", "Timeout for Block",
230           "Nanoseconds until blocked queue times out and returns filler event. "
231           "Value of -1 disables timeout",
232           0, G_MAXUINT64, -1, G_PARAM_READWRITE));
233
234   /* set several parent class virtual functions */
235   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose);
236   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
237   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
238
239   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state);
240   gstelement_class->release_locks = GST_DEBUG_FUNCPTR (gst_queue_release_locks);
241 }
242
243 static void
244 gst_queue_init (GstQueue * queue)
245 {
246   /* scheduling on this kind of element is, well, interesting */
247   GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
248   GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE);
249
250   queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
251   gst_pad_set_chain_function (queue->sinkpad,
252       GST_DEBUG_FUNCPTR (gst_queue_chain));
253   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
254   gst_pad_set_link_function (queue->sinkpad,
255       GST_DEBUG_FUNCPTR (gst_queue_link));
256   gst_pad_set_getcaps_function (queue->sinkpad,
257       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
258   gst_pad_set_active (queue->sinkpad, TRUE);
259
260   queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
261   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
262   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
263   gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
264   gst_pad_set_getcaps_function (queue->srcpad,
265       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
266   gst_pad_set_event_function (queue->srcpad,
267       GST_DEBUG_FUNCPTR (gst_queue_handle_src_event));
268   gst_pad_set_active (queue->srcpad, TRUE);
269
270   queue->cur_level.buffers = 0; /* no content */
271   queue->cur_level.bytes = 0;   /* no content */
272   queue->cur_level.time = 0;    /* no content */
273   queue->max_size.buffers = 100;        /* 100 buffers */
274   queue->max_size.bytes = 10 * 1024 * 1024;     /* 10 MB */
275   queue->max_size.time = GST_SECOND;    /* 1 s. */
276   queue->min_threshold.buffers = 0;     /* no threshold */
277   queue->min_threshold.bytes = 0;       /* no threshold */
278   queue->min_threshold.time = 0;        /* no threshold */
279
280   queue->leaky = GST_QUEUE_NO_LEAK;
281   queue->may_deadlock = TRUE;
282   queue->block_timeout = GST_CLOCK_TIME_NONE;
283   queue->interrupt = FALSE;
284   queue->flush = FALSE;
285
286   queue->qlock = g_mutex_new ();
287   queue->item_add = g_cond_new ();
288   queue->item_del = g_cond_new ();
289   queue->event_done = g_cond_new ();
290   queue->events = g_queue_new ();
291   queue->event_lock = g_mutex_new ();
292   queue->queue = g_queue_new ();
293
294   GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
295       "initialized queue's not_empty & not_full conditions");
296 }
297
298 static void
299 gst_queue_dispose (GObject * object)
300 {
301   GstQueue *queue = GST_QUEUE (object);
302
303   gst_element_set_state (GST_ELEMENT (queue), GST_STATE_NULL);
304
305   while (!g_queue_is_empty (queue->queue)) {
306     GstData *data = g_queue_pop_head (queue->queue);
307
308     gst_data_unref (data);
309   }
310   g_queue_free (queue->queue);
311   g_mutex_free (queue->qlock);
312   g_cond_free (queue->item_add);
313   g_cond_free (queue->item_del);
314   g_cond_free (queue->event_done);
315   g_mutex_lock (queue->event_lock);
316   while (!g_queue_is_empty (queue->events)) {
317     GstQueueEventResponse *er = g_queue_pop_head (queue->events);
318
319     gst_event_unref (er->event);
320   }
321   g_mutex_unlock (queue->event_lock);
322   g_mutex_free (queue->event_lock);
323   g_queue_free (queue->events);
324
325   if (G_OBJECT_CLASS (parent_class)->dispose)
326     G_OBJECT_CLASS (parent_class)->dispose (object);
327 }
328
329 static GstCaps *
330 gst_queue_getcaps (GstPad * pad)
331 {
332   GstQueue *queue;
333
334   queue = GST_QUEUE (gst_pad_get_parent (pad));
335
336   if (queue->cur_level.bytes > 0) {
337     return gst_caps_copy (queue->negotiated_caps);
338   }
339
340   return gst_pad_proxy_getcaps (pad);
341 }
342
343 static GstPadLinkReturn
344 gst_queue_link (GstPad * pad, const GstCaps * caps)
345 {
346   GstQueue *queue;
347   GstPadLinkReturn link_ret;
348
349   queue = GST_QUEUE (gst_pad_get_parent (pad));
350
351   if (queue->cur_level.bytes > 0) {
352     if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
353       return GST_PAD_LINK_OK;
354     }
355     return GST_PAD_LINK_REFUSED;
356   }
357
358   link_ret = gst_pad_proxy_pad_link (pad, caps);
359
360   if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
361     /* we store an extra copy of the negotiated caps, just in case
362      * the pads become unnegotiated while we have buffers */
363     gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
364   }
365
366   return link_ret;
367 }
368
369 static void
370 gst_queue_locked_flush (GstQueue * queue)
371 {
372   while (!g_queue_is_empty (queue->queue)) {
373     GstData *data = g_queue_pop_head (queue->queue);
374
375     /* First loose the reference we added when putting that data in the queue */
376     gst_data_unref (data);
377     /* Then loose another reference because we are supposed to destroy that
378        data when flushing */
379     gst_data_unref (data);
380   }
381   queue->timeval = NULL;
382   queue->cur_level.buffers = 0;
383   queue->cur_level.bytes = 0;
384   queue->cur_level.time = 0;
385
386   /* make sure any pending buffers to be added are flushed too */
387   queue->flush = TRUE;
388
389   /* we deleted something... */
390   g_cond_signal (queue->item_del);
391 }
392
393 static void
394 gst_queue_handle_pending_events (GstQueue * queue)
395 {
396   /* check for events to send upstream */
397   /* g_queue_get_length is glib 2.4, so don't depend on it yet, use ->length */
398   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
399       "handling pending events, events queue of size %d",
400       queue->events->length);
401   g_mutex_lock (queue->event_lock);
402   while (!g_queue_is_empty (queue->events)) {
403     GstQueueEventResponse *er;
404
405     er = g_queue_pop_head (queue->events);
406
407     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
408         "sending event %p (%d) from event response %p upstream",
409         er->event, GST_EVENT_TYPE (er->event), er);
410     if (er->handled) {
411       /* change this to an assert when this file gets reviewed properly. */
412       GST_ELEMENT_ERROR (queue, CORE, EVENT, (NULL),
413           ("already handled event %p (%d) from event response %p upstream",
414               er->event, GST_EVENT_TYPE (er->event), er));
415       break;
416     }
417     g_mutex_unlock (queue->event_lock);
418     er->ret = gst_pad_event_default (queue->srcpad, er->event);
419     er->handled = TRUE;
420     g_cond_signal (queue->event_done);
421     g_mutex_lock (queue->event_lock);
422     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent");
423   }
424   g_mutex_unlock (queue->event_lock);
425 }
426
427 #define STATUS(queue, msg) \
428   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
429                       "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
430                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
431                       "-%" G_GUINT64_FORMAT " ns, %u elements", \
432                       GST_DEBUG_PAD_NAME (pad), \
433                       queue->cur_level.buffers, \
434                       queue->min_threshold.buffers, \
435                       queue->max_size.buffers, \
436                       queue->cur_level.bytes, \
437                       queue->min_threshold.bytes, \
438                       queue->max_size.bytes, \
439                       queue->cur_level.time, \
440                       queue->min_threshold.time, \
441                       queue->max_size.time, \
442                       queue->queue->length)
443
444 static void
445 gst_queue_chain (GstPad * pad, GstData * data)
446 {
447   GstQueue *queue;
448
449   g_return_if_fail (pad != NULL);
450   g_return_if_fail (GST_IS_PAD (pad));
451   g_return_if_fail (data != NULL);
452
453   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
454
455 restart:
456   /* we have to lock the queue since we span threads */
457   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "locking t:%p", g_thread_self ());
458   g_mutex_lock (queue->qlock);
459   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "locked t:%p", g_thread_self ());
460
461   gst_queue_handle_pending_events (queue);
462
463   /* assume don't need to flush this buffer when the queue is filled */
464   queue->flush = FALSE;
465
466   if (GST_IS_EVENT (data)) {
467     switch (GST_EVENT_TYPE (data)) {
468       case GST_EVENT_FLUSH:
469         STATUS (queue, "received flush event");
470         gst_queue_locked_flush (queue);
471         STATUS (queue, "after flush");
472         break;
473       case GST_EVENT_EOS:
474         STATUS (queue, "received EOS");
475         break;
476       default:
477         /* we put the event in the queue, we don't have to act ourselves */
478         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
479             "adding event %p of type %d", data, GST_EVENT_TYPE (data));
480         break;
481     }
482   }
483
484   if (GST_IS_BUFFER (data))
485     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
486         "adding buffer %p of size %d", data, GST_BUFFER_SIZE (data));
487
488   /* We make space available if we're "full" according to whatever
489    * the user defined as "full". Note that this only applies to buffers.
490    * We always handle events and they don't count in our statistics. */
491   if (GST_IS_BUFFER (data) &&
492       ((queue->max_size.buffers > 0 &&
493               queue->cur_level.buffers >= queue->max_size.buffers) ||
494           (queue->max_size.bytes > 0 &&
495               queue->cur_level.bytes >= queue->max_size.bytes) ||
496           (queue->max_size.time > 0 &&
497               queue->cur_level.time >= queue->max_size.time))) {
498     g_mutex_unlock (queue->qlock);
499     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
500     g_mutex_lock (queue->qlock);
501
502     /* how are we going to make space for this buffer? */
503     switch (queue->leaky) {
504         /* leak current buffer */
505       case GST_QUEUE_LEAK_UPSTREAM:
506         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
507             "queue is full, leaking buffer on upstream end");
508         /* now we can clean up and exit right away */
509         g_mutex_unlock (queue->qlock);
510         goto out_unref;
511
512         /* leak first buffer in the queue */
513       case GST_QUEUE_LEAK_DOWNSTREAM:{
514         /* this is a bit hacky. We'll manually iterate the list
515          * and find the first buffer from the head on. We'll
516          * unref that and "fix up" the GQueue object... */
517         GList *item;
518         GstData *leak = NULL;
519
520         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
521             "queue is full, leaking buffer on downstream end");
522
523         for (item = queue->queue->head; item != NULL; item = item->next) {
524           if (GST_IS_BUFFER (item->data)) {
525             leak = item->data;
526             break;
527           }
528         }
529
530         /* if we didn't find anything, it means we have no buffers
531          * in here. That cannot happen, since we had >= 1 bufs */
532         g_assert (leak);
533
534         /* Now remove it from the list, fixing up the GQueue
535          * CHECKME: is a queue->head the first or the last item? */
536         item = g_list_delete_link (queue->queue->head, item);
537         queue->queue->head = g_list_first (item);
538         queue->queue->tail = g_list_last (item);
539         queue->queue->length--;
540
541         /* and unref the data at the end. Twice, because we keep a ref
542          * to make things read-only. Also keep our list uptodate. */
543         queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
544         queue->cur_level.buffers--;
545         if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
546           queue->cur_level.time -= GST_BUFFER_DURATION (data);
547
548         gst_data_unref (data);
549         gst_data_unref (data);
550         break;
551       }
552
553       default:
554         g_warning ("Unknown leaky type, using default");
555         /* fall-through */
556
557         /* don't leak. Instead, wait for space to be available */
558       case GST_QUEUE_NO_LEAK:
559         STATUS (queue, "pre-full wait");
560
561         while ((queue->max_size.buffers > 0 &&
562                 queue->cur_level.buffers >= queue->max_size.buffers) ||
563             (queue->max_size.bytes > 0 &&
564                 queue->cur_level.bytes >= queue->max_size.bytes) ||
565             (queue->max_size.time > 0 &&
566                 queue->cur_level.time >= queue->max_size.time)) {
567           /* if there's a pending state change for this queue
568            * or its manager, switch back to iterator so bottom
569            * half of state change executes */
570           if (queue->interrupt) {
571             GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "interrupted");
572             g_mutex_unlock (queue->qlock);
573             if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad),
574                     GST_ELEMENT (queue))) {
575               goto out_unref;
576             }
577             /* if we got here because we were unlocked after a
578              * flush, we don't need to add the buffer to the
579              * queue again */
580             if (queue->flush) {
581               GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
582                   "not adding pending buffer after flush");
583               goto out_unref;
584             }
585             GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
586                 "adding pending buffer after interrupt");
587             goto restart;
588           }
589
590           if (GST_STATE (queue) != GST_STATE_PLAYING) {
591             /* this means the other end is shut down. Try to
592              * signal to resolve the error */
593             if (!queue->may_deadlock) {
594               g_mutex_unlock (queue->qlock);
595               gst_data_unref (data);
596               GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL),
597                   ("deadlock found, shutting down source pad elements"));
598               /* we don't go to out_unref here, since we want to
599                * unref the buffer *before* calling GST_ELEMENT_ERROR */
600               return;
601             } else {
602               GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
603                   "%s: waiting for the app to restart "
604                   "source pad elements", GST_ELEMENT_NAME (queue));
605             }
606           }
607
608           /* OK, we've got a serious issue here. Imagine the situation
609            * where the puller (next element) is sending an event here,
610            * so it cannot pull events from the queue, and we cannot
611            * push data further because the queue is 'full' and therefore,
612            * we wait here (and do not handle events): deadlock! to solve
613            * that, we handle pending upstream events here, too. */
614           gst_queue_handle_pending_events (queue);
615
616           STATUS (queue, "waiting for item_del signal");
617           g_cond_wait (queue->item_del, queue->qlock);
618           STATUS (queue, "received item_del signal");
619         }
620
621         STATUS (queue, "post-full wait");
622         g_mutex_unlock (queue->qlock);
623         g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
624         g_mutex_lock (queue->qlock);
625         break;
626     }
627   }
628
629   /* put the buffer on the tail of the list. We keep a reference,
630    * so that the data is read-only while in here. There's a good
631    * reason to do so: we have a size and time counter, and any
632    * modification to the content could change any of the two. */
633   gst_data_ref (data);
634   g_queue_push_tail (queue->queue, data);
635
636   /* Note that we only add buffers (not events) to the statistics */
637   if (GST_IS_BUFFER (data)) {
638     queue->cur_level.buffers++;
639     queue->cur_level.bytes += GST_BUFFER_SIZE (data);
640     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
641       queue->cur_level.time += GST_BUFFER_DURATION (data);
642   }
643
644   STATUS (queue, "+ level");
645
646   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
647   g_cond_signal (queue->item_add);
648   g_mutex_unlock (queue->qlock);
649
650   return;
651
652 out_unref:
653   gst_data_unref (data);
654   return;
655 }
656
657 static GstData *
658 gst_queue_get (GstPad * pad)
659 {
660   GstQueue *queue;
661   GstData *data;
662
663   g_return_val_if_fail (pad != NULL, NULL);
664   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
665
666   queue = GST_QUEUE (gst_pad_get_parent (pad));
667
668 restart:
669   /* have to lock for thread-safety */
670   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "locking t:%p", g_thread_self ());
671   g_mutex_lock (queue->qlock);
672   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "locked t:%p", g_thread_self ());
673
674   if (queue->queue->length == 0 ||
675       (queue->min_threshold.buffers > 0 &&
676           queue->cur_level.buffers < queue->min_threshold.buffers) ||
677       (queue->min_threshold.bytes > 0 &&
678           queue->cur_level.bytes < queue->min_threshold.bytes) ||
679       (queue->min_threshold.time > 0 &&
680           queue->cur_level.time < queue->min_threshold.time)) {
681     g_mutex_unlock (queue->qlock);
682     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
683     g_mutex_lock (queue->qlock);
684
685     STATUS (queue, "pre-empty wait");
686     while (queue->queue->length == 0 ||
687         (queue->min_threshold.buffers > 0 &&
688             queue->cur_level.buffers < queue->min_threshold.buffers) ||
689         (queue->min_threshold.bytes > 0 &&
690             queue->cur_level.bytes < queue->min_threshold.bytes) ||
691         (queue->min_threshold.time > 0 &&
692             queue->cur_level.time < queue->min_threshold.time)) {
693       /* if there's a pending state change for this queue or its
694        * manager, switch back to iterator so bottom half of state
695        * change executes. */
696       if (queue->interrupt) {
697         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "interrupted");
698         g_mutex_unlock (queue->qlock);
699         if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad),
700                 GST_ELEMENT (queue)))
701           return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
702         goto restart;
703       }
704       if (GST_STATE (queue) != GST_STATE_PLAYING) {
705         /* this means the other end is shut down */
706         if (!queue->may_deadlock) {
707           g_mutex_unlock (queue->qlock);
708           GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL),
709               ("deadlock found, shutting down sink pad elements"));
710           goto restart;
711         } else {
712           GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
713               "%s: waiting for the app to restart "
714               "source pad elements", GST_ELEMENT_NAME (queue));
715         }
716       }
717
718       STATUS (queue, "waiting for item_add");
719
720       if (queue->block_timeout != GST_CLOCK_TIME_NONE) {
721         GTimeVal timeout;
722
723         g_get_current_time (&timeout);
724         g_time_val_add (&timeout, queue->block_timeout / 1000);
725         if (!g_cond_timed_wait (queue->item_add, queue->qlock, &timeout)) {
726           g_mutex_unlock (queue->qlock);
727           GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
728               "Sending filler event");
729           return GST_DATA (gst_event_new_filler ());
730         }
731       } else {
732         g_cond_wait (queue->item_add, queue->qlock);
733       }
734       STATUS (queue, "got item_add signal");
735     }
736
737     STATUS (queue, "post-empty wait");
738     g_mutex_unlock (queue->qlock);
739     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
740     g_mutex_lock (queue->qlock);
741   }
742
743   /* There's something in the list now, whatever it is */
744   data = g_queue_pop_head (queue->queue);
745   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
746       "retrieved data %p from queue", data);
747
748   if (data == NULL)
749     return NULL;
750
751   if (GST_IS_BUFFER (data)) {
752     /* Update statistics */
753     queue->cur_level.buffers--;
754     queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
755     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
756       queue->cur_level.time -= GST_BUFFER_DURATION (data);
757   }
758
759   /* Now that we're done, we can lose our own reference to
760    * the item, since we're no longer in danger. */
761   gst_data_unref (data);
762
763   STATUS (queue, "after _get()");
764
765   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
766   g_cond_signal (queue->item_del);
767   g_mutex_unlock (queue->qlock);
768
769   /* FIXME: I suppose this needs to be locked, since the EOS
770    * bit affects the pipeline state. However, that bit is
771    * locked too so it'd cause a deadlock. */
772   if (GST_IS_EVENT (data)) {
773     GstEvent *event = GST_EVENT (data);
774
775     switch (GST_EVENT_TYPE (event)) {
776       case GST_EVENT_EOS:
777         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
778             "queue \"%s\" eos", GST_ELEMENT_NAME (queue));
779         gst_element_set_eos (GST_ELEMENT (queue));
780         break;
781       default:
782         break;
783     }
784   }
785
786   return data;
787 }
788
789
790 static gboolean
791 gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
792 {
793   GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
794   gboolean res;
795
796   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
797       event, GST_EVENT_TYPE (event));
798   g_mutex_lock (queue->qlock);
799
800   if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
801     GstQueueEventResponse er;
802
803     /* push the event to the queue and wait for upstream consumption */
804     er.event = event;
805     er.handled = FALSE;
806     g_mutex_lock (queue->event_lock);
807     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
808         "putting event %p (%d) on internal queue", event,
809         GST_EVENT_TYPE (event));
810     g_queue_push_tail (queue->events, &er);
811     g_mutex_unlock (queue->event_lock);
812     GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
813         "Preparing for loop for event handler");
814     /* see the chain function on why this is here - it prevents a deadlock */
815     g_cond_signal (queue->item_del);
816     while (!er.handled) {
817       GTimeVal timeout;
818
819       g_get_current_time (&timeout);
820       g_time_val_add (&timeout, 500 * 1000);    /* half a second */
821       if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
822           !er.handled) {
823         GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
824             "timeout in upstream event handling, dropping event %p (%s)",
825             er.event, GST_EVENT_TYPE (er.event));
826         g_mutex_lock (queue->event_lock);
827         /* since this queue is for src events (ie upstream), this thread is
828          * the only one that is pushing stuff on it, so we're sure that
829          * it's still the tail element.  FIXME: But in practice, we should use
830          * GList instead of GQueue for this so we can remove any element in
831          * the list. */
832         g_queue_pop_tail (queue->events);
833         g_mutex_unlock (queue->event_lock);
834         gst_event_unref (er.event);
835         res = FALSE;
836         goto handled;
837       }
838     }
839     GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Event handled");
840     res = er.ret;
841   } else {
842     res = gst_pad_event_default (pad, event);
843
844     switch (GST_EVENT_TYPE (event)) {
845       case GST_EVENT_FLUSH:
846         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
847             "FLUSH event, flushing queue\n");
848         gst_queue_locked_flush (queue);
849         break;
850       case GST_EVENT_SEEK:
851         if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
852           gst_queue_locked_flush (queue);
853         }
854       default:
855         break;
856     }
857   }
858 handled:
859   g_mutex_unlock (queue->qlock);
860
861   return res;
862 }
863
864 static gboolean
865 gst_queue_release_locks (GstElement * element)
866 {
867   GstQueue *queue;
868
869   queue = GST_QUEUE (element);
870
871   g_mutex_lock (queue->qlock);
872   queue->interrupt = TRUE;
873   g_cond_signal (queue->item_add);
874   g_cond_signal (queue->item_del);
875   g_mutex_unlock (queue->qlock);
876
877   return TRUE;
878 }
879
880 static GstElementStateReturn
881 gst_queue_change_state (GstElement * element)
882 {
883   GstQueue *queue;
884   GstElementStateReturn ret = GST_STATE_SUCCESS;
885
886   queue = GST_QUEUE (element);
887
888   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
889
890   /* lock the queue so another thread (not in sync with this thread's state)
891    * can't call this queue's _get (or whatever)
892    */
893   g_mutex_lock (queue->qlock);
894
895   switch (GST_STATE_TRANSITION (element)) {
896     case GST_STATE_NULL_TO_READY:
897       gst_queue_locked_flush (queue);
898       break;
899     case GST_STATE_PAUSED_TO_PLAYING:
900       if (!GST_PAD_IS_LINKED (queue->sinkpad)) {
901         GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
902             "queue %s is not linked", GST_ELEMENT_NAME (queue));
903         /* FIXME can this be? */
904         g_cond_signal (queue->item_add);
905
906         ret = GST_STATE_FAILURE;
907         goto error;
908       } else {
909         GstScheduler *src_sched, *sink_sched;
910
911         src_sched = gst_pad_get_scheduler (GST_PAD (queue->srcpad));
912         sink_sched = gst_pad_get_scheduler (GST_PAD (queue->sinkpad));
913
914         if (src_sched == sink_sched) {
915           GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
916               "queue %s does not connect different schedulers",
917               GST_ELEMENT_NAME (queue));
918
919           g_warning ("queue %s does not connect different schedulers",
920               GST_ELEMENT_NAME (queue));
921
922           ret = GST_STATE_FAILURE;
923           goto error;
924         }
925       }
926       queue->interrupt = FALSE;
927       break;
928     case GST_STATE_PAUSED_TO_READY:
929       gst_queue_locked_flush (queue);
930       gst_caps_replace (&queue->negotiated_caps, NULL);
931       break;
932     default:
933       break;
934   }
935
936   if (GST_ELEMENT_CLASS (parent_class)->change_state)
937     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
938
939   /* this is an ugly hack to make sure our pads are always active.
940    * Reason for this is that pad activation for the queue element
941    * depends on 2 schedulers (ugh) */
942   gst_pad_set_active (queue->sinkpad, TRUE);
943   gst_pad_set_active (queue->srcpad, TRUE);
944
945 error:
946   g_mutex_unlock (queue->qlock);
947
948   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
949
950   return ret;
951 }
952
953
954 static void
955 gst_queue_set_property (GObject * object,
956     guint prop_id, const GValue * value, GParamSpec * pspec)
957 {
958   GstQueue *queue = GST_QUEUE (object);
959
960   /* someone could change levels here, and since this
961    * affects the get/put funcs, we need to lock for safety. */
962   g_mutex_lock (queue->qlock);
963
964   switch (prop_id) {
965     case ARG_MAX_SIZE_BYTES:
966       queue->max_size.bytes = g_value_get_uint (value);
967       break;
968     case ARG_MAX_SIZE_BUFFERS:
969       queue->max_size.buffers = g_value_get_uint (value);
970       break;
971     case ARG_MAX_SIZE_TIME:
972       queue->max_size.time = g_value_get_uint64 (value);
973       break;
974     case ARG_MIN_THRESHOLD_BYTES:
975       queue->min_threshold.bytes = g_value_get_uint (value);
976       break;
977     case ARG_MIN_THRESHOLD_BUFFERS:
978       queue->min_threshold.buffers = g_value_get_uint (value);
979       break;
980     case ARG_MIN_THRESHOLD_TIME:
981       queue->min_threshold.time = g_value_get_uint64 (value);
982       break;
983     case ARG_LEAKY:
984       queue->leaky = g_value_get_enum (value);
985       break;
986     case ARG_MAY_DEADLOCK:
987       queue->may_deadlock = g_value_get_boolean (value);
988       break;
989     case ARG_BLOCK_TIMEOUT:
990       queue->block_timeout = g_value_get_uint64 (value);
991       break;
992     default:
993       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
994       break;
995   }
996
997   g_mutex_unlock (queue->qlock);
998 }
999
1000 static void
1001 gst_queue_get_property (GObject * object,
1002     guint prop_id, GValue * value, GParamSpec * pspec)
1003 {
1004   GstQueue *queue = GST_QUEUE (object);
1005
1006   switch (prop_id) {
1007     case ARG_CUR_LEVEL_BYTES:
1008       g_value_set_uint (value, queue->cur_level.bytes);
1009       break;
1010     case ARG_CUR_LEVEL_BUFFERS:
1011       g_value_set_uint (value, queue->cur_level.buffers);
1012       break;
1013     case ARG_CUR_LEVEL_TIME:
1014       g_value_set_uint64 (value, queue->cur_level.time);
1015       break;
1016     case ARG_MAX_SIZE_BYTES:
1017       g_value_set_uint (value, queue->max_size.bytes);
1018       break;
1019     case ARG_MAX_SIZE_BUFFERS:
1020       g_value_set_uint (value, queue->max_size.buffers);
1021       break;
1022     case ARG_MAX_SIZE_TIME:
1023       g_value_set_uint64 (value, queue->max_size.time);
1024       break;
1025     case ARG_MIN_THRESHOLD_BYTES:
1026       g_value_set_uint (value, queue->min_threshold.bytes);
1027       break;
1028     case ARG_MIN_THRESHOLD_BUFFERS:
1029       g_value_set_uint (value, queue->min_threshold.buffers);
1030       break;
1031     case ARG_MIN_THRESHOLD_TIME:
1032       g_value_set_uint64 (value, queue->min_threshold.time);
1033       break;
1034     case ARG_LEAKY:
1035       g_value_set_enum (value, queue->leaky);
1036       break;
1037     case ARG_MAY_DEADLOCK:
1038       g_value_set_boolean (value, queue->may_deadlock);
1039       break;
1040     case ARG_BLOCK_TIMEOUT:
1041       g_value_set_uint64 (value, queue->block_timeout);
1042       break;
1043     default:
1044       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1045       break;
1046   }
1047 }