3b9cb449b9b7b78ca496734febe7c96c4695ea7c
[platform/upstream/gstreamer.git] / gst / gstqueue.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2003 Colin Walters <cwalters@gnome.org>
5  *
6  * gstqueue.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23
24
25 #include "gst_private.h"
26
27 #include "gstqueue.h"
28 #include "gstscheduler.h"
29 #include "gstevent.h"
30 #include "gstinfo.h"
31 #include "gsterror.h"
32
33 static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
34     "Generic",
35     "Simple data queue",
36     "Erik Walthinsen <omega@cse.ogi.edu>");
37
38
39 /* Queue signals and args */
40 enum
41 {
42   SIGNAL_UNDERRUN,
43   SIGNAL_RUNNING,
44   SIGNAL_OVERRUN,
45   LAST_SIGNAL
46 };
47
48 enum
49 {
50   ARG_0,
51   /* FIXME: don't we have another way of doing this
52    * "Gstreamer format" (frame/byte/time) queries? */
53   ARG_CUR_LEVEL_BUFFERS,
54   ARG_CUR_LEVEL_BYTES,
55   ARG_CUR_LEVEL_TIME,
56   ARG_MAX_SIZE_BUFFERS,
57   ARG_MAX_SIZE_BYTES,
58   ARG_MAX_SIZE_TIME,
59   ARG_MIN_THRESHOLD_BUFFERS,
60   ARG_MIN_THRESHOLD_BYTES,
61   ARG_MIN_THRESHOLD_TIME,
62   ARG_LEAKY,
63   ARG_MAY_DEADLOCK,
64   ARG_BLOCK_TIMEOUT
65       /* FILL ME */
66 };
67
68 typedef struct _GstQueueEventResponse
69 {
70   GstEvent *event;
71   gboolean ret, handled;
72 }
73 GstQueueEventResponse;
74
75 static void gst_queue_base_init (GstQueueClass * klass);
76 static void gst_queue_class_init (GstQueueClass * klass);
77 static void gst_queue_init (GstQueue * queue);
78 static void gst_queue_dispose (GObject * object);
79
80 static void gst_queue_set_property (GObject * object,
81     guint prop_id, const GValue * value, GParamSpec * pspec);
82 static void gst_queue_get_property (GObject * object,
83     guint prop_id, GValue * value, GParamSpec * pspec);
84
85 static void gst_queue_chain (GstPad * pad, GstData * data);
86 static GstData *gst_queue_get (GstPad * pad);
87
88 static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event);
89
90 static GstCaps *gst_queue_getcaps (GstPad * pad);
91 static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
92 static void gst_queue_locked_flush (GstQueue * queue);
93
94 static GstElementStateReturn gst_queue_change_state (GstElement * element);
95 static gboolean gst_queue_release_locks (GstElement * element);
96
97
98 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
99
100 static GType
101 queue_leaky_get_type (void)
102 {
103   static GType queue_leaky_type = 0;
104   static GEnumValue queue_leaky[] = {
105     {GST_QUEUE_NO_LEAK, "0", "Not Leaky"},
106     {GST_QUEUE_LEAK_UPSTREAM, "1", "Leaky on Upstream"},
107     {GST_QUEUE_LEAK_DOWNSTREAM, "2", "Leaky on Downstream"},
108     {0, NULL, NULL},
109   };
110
111   if (!queue_leaky_type) {
112     queue_leaky_type = g_enum_register_static ("GstQueueLeaky", queue_leaky);
113   }
114   return queue_leaky_type;
115 }
116
117 static GstElementClass *parent_class = NULL;
118 static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
119
120 GType
121 gst_queue_get_type (void)
122 {
123   static GType queue_type = 0;
124
125   if (!queue_type) {
126     static const GTypeInfo queue_info = {
127       sizeof (GstQueueClass),
128       (GBaseInitFunc) gst_queue_base_init,
129       NULL,
130       (GClassInitFunc) gst_queue_class_init,
131       NULL,
132       NULL,
133       sizeof (GstQueue),
134       0,
135       (GInstanceInitFunc) gst_queue_init,
136       NULL
137     };
138
139     queue_type = g_type_register_static (GST_TYPE_ELEMENT,
140         "GstQueue", &queue_info, 0);
141   }
142
143   return queue_type;
144 }
145
146 static void
147 gst_queue_base_init (GstQueueClass * klass)
148 {
149   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
150
151   gst_element_class_set_details (gstelement_class, &gst_queue_details);
152 }
153
154 static void
155 gst_queue_class_init (GstQueueClass * klass)
156 {
157   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
158   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
159
160   parent_class = g_type_class_peek_parent (klass);
161
162   /* signals */
163   gst_queue_signals[SIGNAL_UNDERRUN] =
164       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
165       G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
166       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
167   gst_queue_signals[SIGNAL_RUNNING] =
168       g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
169       G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
170       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
171   gst_queue_signals[SIGNAL_OVERRUN] =
172       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
173       G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
174       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
175
176   /* properties */
177   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
178       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
179           "Current amount of data in the queue (bytes)",
180           0, G_MAXUINT, 0, G_PARAM_READABLE));
181   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS,
182       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
183           "Current number of buffers in the queue",
184           0, G_MAXUINT, 0, G_PARAM_READABLE));
185   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
186       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
187           "Current amount of data in the queue (in ns)",
188           0, G_MAXUINT64, 0, G_PARAM_READABLE));
189
190   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
191       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
192           "Max. amount of data in the queue (bytes, 0=disable)",
193           0, G_MAXUINT, 0, G_PARAM_READWRITE));
194   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
195       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
196           "Max. number of buffers in the queue (0=disable)",
197           0, G_MAXUINT, 0, G_PARAM_READWRITE));
198   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
199       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
200           "Max. amount of data in the queue (in ns, 0=disable)",
201           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
202
203   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BYTES,
204       g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
205           "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
206           0, G_MAXUINT, 0, G_PARAM_READWRITE));
207   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BUFFERS,
208       g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
209           "Min. number of buffers in the queue to allow reading (0=disable)",
210           0, G_MAXUINT, 0, G_PARAM_READWRITE));
211   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_TIME,
212       g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
213           "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
214           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
215
216   g_object_class_install_property (gobject_class, ARG_LEAKY,
217       g_param_spec_enum ("leaky", "Leaky",
218           "Where the queue leaks, if at all",
219           GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
220   g_object_class_install_property (gobject_class, ARG_MAY_DEADLOCK,
221       g_param_spec_boolean ("may_deadlock", "May Deadlock",
222           "The queue may deadlock if it's full and not PLAYING",
223           TRUE, G_PARAM_READWRITE));
224   g_object_class_install_property (gobject_class, ARG_BLOCK_TIMEOUT,
225       g_param_spec_uint64 ("block_timeout", "Timeout for Block",
226           "Nanoseconds until blocked queue times out and returns filler event. "
227           "Value of -1 disables timeout",
228           0, G_MAXUINT64, -1, G_PARAM_READWRITE));
229
230   /* set several parent class virtual functions */
231   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose);
232   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
233   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
234
235   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state);
236   gstelement_class->release_locks = GST_DEBUG_FUNCPTR (gst_queue_release_locks);
237 }
238
239 static void
240 gst_queue_init (GstQueue * queue)
241 {
242   /* scheduling on this kind of element is, well, interesting */
243   GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
244   GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE);
245
246   queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
247   gst_pad_set_chain_function (queue->sinkpad,
248       GST_DEBUG_FUNCPTR (gst_queue_chain));
249   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
250   gst_pad_set_link_function (queue->sinkpad,
251       GST_DEBUG_FUNCPTR (gst_queue_link));
252   gst_pad_set_getcaps_function (queue->sinkpad,
253       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
254   gst_pad_set_active (queue->sinkpad, TRUE);
255
256   queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
257   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
258   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
259   gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
260   gst_pad_set_getcaps_function (queue->srcpad,
261       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
262   gst_pad_set_event_function (queue->srcpad,
263       GST_DEBUG_FUNCPTR (gst_queue_handle_src_event));
264   gst_pad_set_active (queue->srcpad, TRUE);
265
266   queue->cur_level.buffers = 0; /* no content */
267   queue->cur_level.bytes = 0;   /* no content */
268   queue->cur_level.time = 0;    /* no content */
269   queue->max_size.buffers = 100;        /* 100 buffers */
270   queue->max_size.bytes = 10 * 1024 * 1024;     /* 10 MB */
271   queue->max_size.time = GST_SECOND;    /* 1 s. */
272   queue->min_threshold.buffers = 0;     /* no threshold */
273   queue->min_threshold.bytes = 0;       /* no threshold */
274   queue->min_threshold.time = 0;        /* no threshold */
275
276   queue->leaky = GST_QUEUE_NO_LEAK;
277   queue->may_deadlock = TRUE;
278   queue->block_timeout = GST_CLOCK_TIME_NONE;
279   queue->interrupt = FALSE;
280   queue->flush = FALSE;
281
282   queue->qlock = g_mutex_new ();
283   queue->item_add = g_cond_new ();
284   queue->item_del = g_cond_new ();
285   queue->event_done = g_cond_new ();
286   queue->events = g_queue_new ();
287   queue->queue = g_queue_new ();
288
289   GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
290       "initialized queue's not_empty & not_full conditions");
291 }
292
293 static void
294 gst_queue_dispose (GObject * object)
295 {
296   GstQueue *queue = GST_QUEUE (object);
297
298   gst_element_set_state (GST_ELEMENT (queue), GST_STATE_NULL);
299
300   while (!g_queue_is_empty (queue->queue)) {
301     GstData *data = g_queue_pop_head (queue->queue);
302
303     gst_data_unref (data);
304   }
305   g_queue_free (queue->queue);
306   g_mutex_free (queue->qlock);
307   g_cond_free (queue->item_add);
308   g_cond_free (queue->item_del);
309   g_cond_free (queue->event_done);
310   while (!g_queue_is_empty (queue->events)) {
311     GstEvent *event = g_queue_pop_head (queue->events);
312
313     gst_event_unref (event);
314   }
315
316   if (G_OBJECT_CLASS (parent_class)->dispose)
317     G_OBJECT_CLASS (parent_class)->dispose (object);
318 }
319
320 static GstCaps *
321 gst_queue_getcaps (GstPad * pad)
322 {
323   GstQueue *queue;
324
325   queue = GST_QUEUE (gst_pad_get_parent (pad));
326
327   if (queue->cur_level.bytes > 0) {
328     return gst_caps_copy (queue->negotiated_caps);
329   }
330
331   return gst_pad_proxy_getcaps (pad);
332 }
333
334 static GstPadLinkReturn
335 gst_queue_link (GstPad * pad, const GstCaps * caps)
336 {
337   GstQueue *queue;
338   GstPadLinkReturn link_ret;
339
340   queue = GST_QUEUE (gst_pad_get_parent (pad));
341
342   if (queue->cur_level.bytes > 0) {
343     if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
344       return GST_PAD_LINK_OK;
345     }
346     return GST_PAD_LINK_REFUSED;
347   }
348
349   link_ret = gst_pad_proxy_pad_link (pad, caps);
350
351   if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
352     /* we store an extra copy of the negotiated caps, just in case
353      * the pads become unnegotiated while we have buffers */
354     gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
355   }
356
357   return link_ret;
358 }
359
360 static void
361 gst_queue_locked_flush (GstQueue * queue)
362 {
363   while (!g_queue_is_empty (queue->queue)) {
364     GstData *data = g_queue_pop_head (queue->queue);
365
366     /* First loose the reference we added when putting that data in the queue */
367     gst_data_unref (data);
368     /* Then loose another reference because we are supposed to destroy that
369        data when flushing */
370     gst_data_unref (data);
371   }
372   queue->timeval = NULL;
373   queue->cur_level.buffers = 0;
374   queue->cur_level.bytes = 0;
375   queue->cur_level.time = 0;
376
377   /* make sure any pending buffers to be added are flushed too */
378   queue->flush = TRUE;
379
380   /* we deleted something... */
381   g_cond_signal (queue->item_del);
382 }
383
384 static void
385 gst_queue_handle_pending_events (GstQueue * queue)
386 {
387   /* check for events to send upstream */
388   while (!g_queue_is_empty (queue->events)) {
389     GstQueueEventResponse *er = g_queue_pop_head (queue->events);
390
391     GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream");
392     er->ret = gst_pad_event_default (queue->srcpad, er->event);
393     er->handled = TRUE;
394     g_cond_signal (queue->event_done);
395     GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent");
396   }
397 }
398
399 #define STATUS(queue, msg) \
400   GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, \
401                       "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
402                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
403                       "-%" G_GUINT64_FORMAT " ns, %u elements", \
404                       GST_DEBUG_PAD_NAME (pad), \
405                       queue->cur_level.buffers, \
406                       queue->min_threshold.buffers, \
407                       queue->max_size.buffers, \
408                       queue->cur_level.bytes, \
409                       queue->min_threshold.bytes, \
410                       queue->max_size.bytes, \
411                       queue->cur_level.time, \
412                       queue->min_threshold.time, \
413                       queue->max_size.time, \
414                       queue->queue->length)
415
416 static void
417 gst_queue_chain (GstPad * pad, GstData * data)
418 {
419   GstQueue *queue;
420
421   g_return_if_fail (pad != NULL);
422   g_return_if_fail (GST_IS_PAD (pad));
423   g_return_if_fail (data != NULL);
424
425   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
426
427 restart:
428   /* we have to lock the queue since we span threads */
429   GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locking t:%p",
430       g_thread_self ());
431   g_mutex_lock (queue->qlock);
432   GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ());
433
434   gst_queue_handle_pending_events (queue);
435
436   /* assume don't need to flush this buffer when the queue is filled */
437   queue->flush = FALSE;
438
439   if (GST_IS_EVENT (data)) {
440     switch (GST_EVENT_TYPE (data)) {
441       case GST_EVENT_FLUSH:
442         STATUS (queue, "received flush event");
443         gst_queue_locked_flush (queue);
444         STATUS (queue, "after flush");
445         break;
446       case GST_EVENT_EOS:
447         STATUS (queue, "received EOS");
448         break;
449       default:
450         /* we put the event in the queue, we don't have to act ourselves */
451         GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
452             "adding event %p of type %d", data, GST_EVENT_TYPE (data));
453         break;
454     }
455   }
456
457   if (GST_IS_BUFFER (data))
458     GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
459         "adding buffer %p of size %d", data, GST_BUFFER_SIZE (data));
460
461   /* We make space available if we're "full" according to whatever
462    * the user defined as "full". Note that this only applies to buffers.
463    * We always handle events and they don't count in our statistics. */
464   if (GST_IS_BUFFER (data) &&
465       ((queue->max_size.buffers > 0 &&
466               queue->cur_level.buffers >= queue->max_size.buffers) ||
467           (queue->max_size.bytes > 0 &&
468               queue->cur_level.bytes >= queue->max_size.bytes) ||
469           (queue->max_size.time > 0 &&
470               queue->cur_level.time >= queue->max_size.time))) {
471     g_mutex_unlock (queue->qlock);
472     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
473     g_mutex_lock (queue->qlock);
474
475     /* how are we going to make space for this buffer? */
476     switch (queue->leaky) {
477         /* leak current buffer */
478       case GST_QUEUE_LEAK_UPSTREAM:
479         GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
480             "queue is full, leaking buffer on upstream end");
481         /* now we can clean up and exit right away */
482         g_mutex_unlock (queue->qlock);
483         goto out_unref;
484
485         /* leak first buffer in the queue */
486       case GST_QUEUE_LEAK_DOWNSTREAM:{
487         /* this is a bit hacky. We'll manually iterate the list
488          * and find the first buffer from the head on. We'll
489          * unref that and "fix up" the GQueue object... */
490         GList *item;
491         GstData *leak = NULL;
492
493         GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
494             "queue is full, leaking buffer on downstream end");
495
496         for (item = queue->queue->head; item != NULL; item = item->next) {
497           if (GST_IS_BUFFER (item->data)) {
498             leak = item->data;
499             break;
500           }
501         }
502
503         /* if we didn't find anything, it means we have no buffers
504          * in here. That cannot happen, since we had >= 1 bufs */
505         g_assert (leak);
506
507         /* Now remove it from the list, fixing up the GQueue
508          * CHECKME: is a queue->head the first or the last item? */
509         item = g_list_delete_link (queue->queue->head, item);
510         queue->queue->head = g_list_first (item);
511         queue->queue->tail = g_list_last (item);
512         queue->queue->length--;
513
514         /* and unref the data at the end. Twice, because we keep a ref
515          * to make things read-only. Also keep our list uptodate. */
516         queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
517         queue->cur_level.buffers--;
518         if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
519           queue->cur_level.time -= GST_BUFFER_DURATION (data);
520
521         gst_data_unref (data);
522         gst_data_unref (data);
523         break;
524       }
525
526       default:
527         g_warning ("Unknown leaky type, using default");
528         /* fall-through */
529
530         /* don't leak. Instead, wait for space to be available */
531       case GST_QUEUE_NO_LEAK:
532         STATUS (queue, "pre-full wait");
533
534         while ((queue->max_size.buffers > 0 &&
535                 queue->cur_level.buffers >= queue->max_size.buffers) ||
536             (queue->max_size.bytes > 0 &&
537                 queue->cur_level.bytes >= queue->max_size.bytes) ||
538             (queue->max_size.time > 0 &&
539                 queue->cur_level.time >= queue->max_size.time)) {
540           /* if there's a pending state change for this queue
541            * or its manager, switch back to iterator so bottom
542            * half of state change executes */
543           if (queue->interrupt) {
544             GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted");
545             g_mutex_unlock (queue->qlock);
546             if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad),
547                     GST_ELEMENT (queue))) {
548               goto out_unref;
549             }
550             /* if we got here because we were unlocked after a
551              * flush, we don't need to add the buffer to the
552              * queue again */
553             if (queue->flush) {
554               GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
555                   "not adding pending buffer after flush");
556               goto out_unref;
557             }
558             GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
559                 "adding pending buffer after interrupt");
560             goto restart;
561           }
562
563           if (GST_STATE (queue) != GST_STATE_PLAYING) {
564             /* this means the other end is shut down. Try to
565              * signal to resolve the error */
566             if (!queue->may_deadlock) {
567               g_mutex_unlock (queue->qlock);
568               gst_data_unref (data);
569               GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL),
570                   ("deadlock found, shutting down source pad elements"));
571               /* we don't go to out_unref here, since we want to
572                * unref the buffer *before* calling GST_ELEMENT_ERROR */
573               return;
574             } else {
575               GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
576                   "%s: waiting for the app to restart "
577                   "source pad elements", GST_ELEMENT_NAME (queue));
578             }
579           }
580
581           /* OK, we've got a serious issue here. Imagine the situation
582            * where the puller (next element) is sending an event here,
583            * so it cannot pull events from the queue, and we cannot
584            * push data further because the queue is 'full' and therefore,
585            * we wait here (and do not handle events): deadlock! to solve
586            * that, we handle pending upstream events here, too. */
587           gst_queue_handle_pending_events (queue);
588
589           STATUS (queue, "waiting for item_del signal");
590           g_cond_wait (queue->item_del, queue->qlock);
591           STATUS (queue, "received item_del signal");
592         }
593
594         STATUS (queue, "post-full wait");
595         g_mutex_unlock (queue->qlock);
596         g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
597         g_mutex_lock (queue->qlock);
598         break;
599     }
600   }
601
602   /* put the buffer on the tail of the list. We keep a reference,
603    * so that the data is read-only while in here. There's a good
604    * reason to do so: we have a size and time counter, and any
605    * modification to the content could change any of the two. */
606   gst_data_ref (data);
607   g_queue_push_tail (queue->queue, data);
608
609   /* Note that we only add buffers (not events) to the statistics */
610   if (GST_IS_BUFFER (data)) {
611     queue->cur_level.buffers++;
612     queue->cur_level.bytes += GST_BUFFER_SIZE (data);
613     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
614       queue->cur_level.time += GST_BUFFER_DURATION (data);
615   }
616
617   STATUS (queue, "+ level");
618
619   GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_add");
620   g_cond_signal (queue->item_add);
621   g_mutex_unlock (queue->qlock);
622
623   return;
624
625 out_unref:
626   gst_data_unref (data);
627   return;
628 }
629
630 static GstData *
631 gst_queue_get (GstPad * pad)
632 {
633   GstQueue *queue;
634   GstData *data;
635
636   g_return_val_if_fail (pad != NULL, NULL);
637   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
638
639   queue = GST_QUEUE (gst_pad_get_parent (pad));
640
641 restart:
642   /* have to lock for thread-safety */
643   GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
644       "locking t:%p", g_thread_self ());
645   g_mutex_lock (queue->qlock);
646   GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ());
647
648   if (queue->queue->length == 0 ||
649       (queue->min_threshold.buffers > 0 &&
650           queue->cur_level.buffers < queue->min_threshold.buffers) ||
651       (queue->min_threshold.bytes > 0 &&
652           queue->cur_level.bytes < queue->min_threshold.bytes) ||
653       (queue->min_threshold.time > 0 &&
654           queue->cur_level.time < queue->min_threshold.time)) {
655     g_mutex_unlock (queue->qlock);
656     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
657     g_mutex_lock (queue->qlock);
658
659     STATUS (queue, "pre-empty wait");
660     while (queue->queue->length == 0 ||
661         (queue->min_threshold.buffers > 0 &&
662             queue->cur_level.buffers < queue->min_threshold.buffers) ||
663         (queue->min_threshold.bytes > 0 &&
664             queue->cur_level.bytes < queue->min_threshold.bytes) ||
665         (queue->min_threshold.time > 0 &&
666             queue->cur_level.time < queue->min_threshold.time)) {
667       /* if there's a pending state change for this queue or its
668        * manager, switch back to iterator so bottom half of state
669        * change executes. */
670       if (queue->interrupt) {
671         GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted");
672         g_mutex_unlock (queue->qlock);
673         if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad),
674                 GST_ELEMENT (queue)))
675           return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
676         goto restart;
677       }
678       if (GST_STATE (queue) != GST_STATE_PLAYING) {
679         /* this means the other end is shut down */
680         if (!queue->may_deadlock) {
681           g_mutex_unlock (queue->qlock);
682           GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL),
683               ("deadlock found, shutting down sink pad elements"));
684           goto restart;
685         } else {
686           GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
687               "%s: waiting for the app to restart "
688               "source pad elements", GST_ELEMENT_NAME (queue));
689         }
690       }
691
692       STATUS (queue, "waiting for item_add");
693
694       if (queue->block_timeout != GST_CLOCK_TIME_NONE) {
695         GTimeVal timeout;
696
697         g_get_current_time (&timeout);
698         g_time_val_add (&timeout, queue->block_timeout / 1000);
699         if (!g_cond_timed_wait (queue->item_add, queue->qlock, &timeout)) {
700           g_mutex_unlock (queue->qlock);
701           GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
702               "Sending filler event");
703           return GST_DATA (gst_event_new_filler ());
704         }
705       } else {
706         g_cond_wait (queue->item_add, queue->qlock);
707       }
708       STATUS (queue, "got item_add signal");
709     }
710
711     STATUS (queue, "post-empty wait");
712     g_mutex_unlock (queue->qlock);
713     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
714     g_mutex_lock (queue->qlock);
715   }
716
717   /* There's something in the list now, whatever it is */
718   data = g_queue_pop_head (queue->queue);
719   GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
720       "retrieved data %p from queue", data);
721
722   if (data == NULL)
723     return NULL;
724
725   if (GST_IS_BUFFER (data)) {
726     /* Update statistics */
727     queue->cur_level.buffers--;
728     queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
729     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
730       queue->cur_level.time -= GST_BUFFER_DURATION (data);
731   }
732
733   /* Now that we're done, we can lose our own reference to
734    * the item, since we're no longer in danger. */
735   gst_data_unref (data);
736
737   STATUS (queue, "after _get()");
738
739   GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_del");
740   g_cond_signal (queue->item_del);
741   g_mutex_unlock (queue->qlock);
742
743   /* FIXME: I suppose this needs to be locked, since the EOS
744    * bit affects the pipeline state. However, that bit is
745    * locked too so it'd cause a deadlock. */
746   if (GST_IS_EVENT (data)) {
747     GstEvent *event = GST_EVENT (data);
748
749     switch (GST_EVENT_TYPE (event)) {
750       case GST_EVENT_EOS:
751         GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
752             "queue \"%s\" eos", GST_ELEMENT_NAME (queue));
753         gst_element_set_eos (GST_ELEMENT (queue));
754         break;
755       default:
756         break;
757     }
758   }
759
760   return data;
761 }
762
763
764 static gboolean
765 gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
766 {
767   GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
768   gboolean res;
769
770   g_mutex_lock (queue->qlock);
771
772   if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
773     GstQueueEventResponse er;
774
775     /* push the event to the queue and wait for upstream consumption */
776     er.event = event;
777     er.handled = FALSE;
778     g_queue_push_tail (queue->events, &er);
779     GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
780         "Preparing for loop for event handler");
781     /* see the chain function on why this is here - it prevents a deadlock */
782     g_cond_signal (queue->item_del);
783     while (!er.handled) {
784       GTimeVal timeout;
785
786       g_get_current_time (&timeout);
787       g_time_val_add (&timeout, 500 * 1000);    /* half a second */
788       if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
789           !er.handled) {
790         GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
791             "timeout in upstream event handling");
792         /* remove ourselves from the pending list. Since we're
793          * locked, others cannot reference this anymore. */
794         queue->queue->head = g_list_remove (queue->queue->head, &er);
795         queue->queue->head = g_list_first (queue->queue->head);
796         queue->queue->tail = g_list_last (queue->queue->head);
797         queue->queue->length--;
798         res = FALSE;
799         goto handled;
800       }
801     }
802     GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, "Event handled");
803     res = er.ret;
804   } else {
805     res = gst_pad_event_default (pad, event);
806
807     switch (GST_EVENT_TYPE (event)) {
808       case GST_EVENT_FLUSH:
809         GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
810             "FLUSH event, flushing queue\n");
811         gst_queue_locked_flush (queue);
812         break;
813       case GST_EVENT_SEEK:
814         if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
815           gst_queue_locked_flush (queue);
816         }
817       default:
818         break;
819     }
820   }
821 handled:
822   g_mutex_unlock (queue->qlock);
823
824   return res;
825 }
826
827 static gboolean
828 gst_queue_release_locks (GstElement * element)
829 {
830   GstQueue *queue;
831
832   queue = GST_QUEUE (element);
833
834   g_mutex_lock (queue->qlock);
835   queue->interrupt = TRUE;
836   g_cond_signal (queue->item_add);
837   g_cond_signal (queue->item_del);
838   g_mutex_unlock (queue->qlock);
839
840   return TRUE;
841 }
842
843 static GstElementStateReturn
844 gst_queue_change_state (GstElement * element)
845 {
846   GstQueue *queue;
847   GstElementStateReturn ret = GST_STATE_SUCCESS;
848
849   queue = GST_QUEUE (element);
850
851   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
852
853   /* lock the queue so another thread (not in sync with this thread's state)
854    * can't call this queue's _get (or whatever)
855    */
856   g_mutex_lock (queue->qlock);
857
858   switch (GST_STATE_TRANSITION (element)) {
859     case GST_STATE_NULL_TO_READY:
860       gst_queue_locked_flush (queue);
861       break;
862     case GST_STATE_PAUSED_TO_PLAYING:
863       if (!GST_PAD_IS_LINKED (queue->sinkpad)) {
864         GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
865             "queue %s is not linked", GST_ELEMENT_NAME (queue));
866         /* FIXME can this be? */
867         g_cond_signal (queue->item_add);
868
869         ret = GST_STATE_FAILURE;
870         goto error;
871       } else {
872         GstScheduler *src_sched, *sink_sched;
873
874         src_sched = gst_pad_get_scheduler (GST_PAD (queue->srcpad));
875         sink_sched = gst_pad_get_scheduler (GST_PAD (queue->sinkpad));
876
877         if (src_sched == sink_sched) {
878           GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
879               "queue %s does not connect different schedulers",
880               GST_ELEMENT_NAME (queue));
881
882           g_warning ("queue %s does not connect different schedulers",
883               GST_ELEMENT_NAME (queue));
884
885           ret = GST_STATE_FAILURE;
886           goto error;
887         }
888       }
889       queue->interrupt = FALSE;
890       break;
891     case GST_STATE_PAUSED_TO_READY:
892       gst_queue_locked_flush (queue);
893       break;
894     default:
895       break;
896   }
897
898   if (GST_ELEMENT_CLASS (parent_class)->change_state)
899     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
900
901   /* this is an ugly hack to make sure our pads are always active.
902    * Reason for this is that pad activation for the queue element
903    * depends on 2 schedulers (ugh) */
904   gst_pad_set_active (queue->sinkpad, TRUE);
905   gst_pad_set_active (queue->srcpad, TRUE);
906
907 error:
908   g_mutex_unlock (queue->qlock);
909
910   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
911
912   return ret;
913 }
914
915
916 static void
917 gst_queue_set_property (GObject * object,
918     guint prop_id, const GValue * value, GParamSpec * pspec)
919 {
920   GstQueue *queue = GST_QUEUE (object);
921
922   /* someone could change levels here, and since this
923    * affects the get/put funcs, we need to lock for safety. */
924   g_mutex_lock (queue->qlock);
925
926   switch (prop_id) {
927     case ARG_MAX_SIZE_BYTES:
928       queue->max_size.bytes = g_value_get_uint (value);
929       break;
930     case ARG_MAX_SIZE_BUFFERS:
931       queue->max_size.buffers = g_value_get_uint (value);
932       break;
933     case ARG_MAX_SIZE_TIME:
934       queue->max_size.time = g_value_get_uint64 (value);
935       break;
936     case ARG_MIN_THRESHOLD_BYTES:
937       queue->min_threshold.bytes = g_value_get_uint (value);
938       break;
939     case ARG_MIN_THRESHOLD_BUFFERS:
940       queue->min_threshold.buffers = g_value_get_uint (value);
941       break;
942     case ARG_MIN_THRESHOLD_TIME:
943       queue->min_threshold.time = g_value_get_uint64 (value);
944       break;
945     case ARG_LEAKY:
946       queue->leaky = g_value_get_enum (value);
947       break;
948     case ARG_MAY_DEADLOCK:
949       queue->may_deadlock = g_value_get_boolean (value);
950       break;
951     case ARG_BLOCK_TIMEOUT:
952       queue->block_timeout = g_value_get_uint64 (value);
953       break;
954     default:
955       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
956       break;
957   }
958
959   g_mutex_unlock (queue->qlock);
960 }
961
962 static void
963 gst_queue_get_property (GObject * object,
964     guint prop_id, GValue * value, GParamSpec * pspec)
965 {
966   GstQueue *queue = GST_QUEUE (object);
967
968   switch (prop_id) {
969     case ARG_CUR_LEVEL_BYTES:
970       g_value_set_uint (value, queue->cur_level.bytes);
971       break;
972     case ARG_CUR_LEVEL_BUFFERS:
973       g_value_set_uint (value, queue->cur_level.buffers);
974       break;
975     case ARG_CUR_LEVEL_TIME:
976       g_value_set_uint64 (value, queue->cur_level.time);
977       break;
978     case ARG_MAX_SIZE_BYTES:
979       g_value_set_uint (value, queue->max_size.bytes);
980       break;
981     case ARG_MAX_SIZE_BUFFERS:
982       g_value_set_uint (value, queue->max_size.buffers);
983       break;
984     case ARG_MAX_SIZE_TIME:
985       g_value_set_uint64 (value, queue->max_size.time);
986       break;
987     case ARG_MIN_THRESHOLD_BYTES:
988       g_value_set_uint (value, queue->min_threshold.bytes);
989       break;
990     case ARG_MIN_THRESHOLD_BUFFERS:
991       g_value_set_uint (value, queue->min_threshold.buffers);
992       break;
993     case ARG_MIN_THRESHOLD_TIME:
994       g_value_set_uint64 (value, queue->min_threshold.time);
995       break;
996     case ARG_LEAKY:
997       g_value_set_enum (value, queue->leaky);
998       break;
999     case ARG_MAY_DEADLOCK:
1000       g_value_set_boolean (value, queue->may_deadlock);
1001       break;
1002     case ARG_BLOCK_TIMEOUT:
1003       g_value_set_uint64 (value, queue->block_timeout);
1004       break;
1005     default:
1006       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1007       break;
1008   }
1009 }