fix queue event code
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2003 Colin Walters <cwalters@gnome.org>
5  *
6  * gstqueue.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23
24
25 #include "gst_private.h"
26
27 #include "gstqueue.h"
28 #include "gstscheduler.h"
29 #include "gstevent.h"
30 #include "gstinfo.h"
31 #include "gsterror.h"
32
33 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
34
35 static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
36     "Generic",
37     "Simple data queue",
38     "Erik Walthinsen <omega@cse.ogi.edu>");
39
40
41 /* Queue signals and args */
42 enum
43 {
44   SIGNAL_UNDERRUN,
45   SIGNAL_RUNNING,
46   SIGNAL_OVERRUN,
47   LAST_SIGNAL
48 };
49
50 enum
51 {
52   ARG_0,
53   /* FIXME: don't we have another way of doing this
54    * "Gstreamer format" (frame/byte/time) queries? */
55   ARG_CUR_LEVEL_BUFFERS,
56   ARG_CUR_LEVEL_BYTES,
57   ARG_CUR_LEVEL_TIME,
58   ARG_MAX_SIZE_BUFFERS,
59   ARG_MAX_SIZE_BYTES,
60   ARG_MAX_SIZE_TIME,
61   ARG_MIN_THRESHOLD_BUFFERS,
62   ARG_MIN_THRESHOLD_BYTES,
63   ARG_MIN_THRESHOLD_TIME,
64   ARG_LEAKY,
65   ARG_MAY_DEADLOCK,
66   ARG_BLOCK_TIMEOUT
67       /* FILL ME */
68 };
69
70 typedef struct _GstQueueEventResponse
71 {
72   GstEvent *event;
73   gboolean ret, handled;
74 }
75 GstQueueEventResponse;
76
77 static void gst_queue_base_init (GstQueueClass * klass);
78 static void gst_queue_class_init (GstQueueClass * klass);
79 static void gst_queue_init (GstQueue * queue);
80 static void gst_queue_dispose (GObject * object);
81
82 static void gst_queue_set_property (GObject * object,
83     guint prop_id, const GValue * value, GParamSpec * pspec);
84 static void gst_queue_get_property (GObject * object,
85     guint prop_id, GValue * value, GParamSpec * pspec);
86
87 static void gst_queue_chain (GstPad * pad, GstData * data);
88 static GstData *gst_queue_get (GstPad * pad);
89
90 static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event);
91
92 static GstCaps *gst_queue_getcaps (GstPad * pad);
93 static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
94 static void gst_queue_locked_flush (GstQueue * queue);
95
96 static GstElementStateReturn gst_queue_change_state (GstElement * element);
97 static gboolean gst_queue_release_locks (GstElement * element);
98
99
100 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
101
102 static GType
103 queue_leaky_get_type (void)
104 {
105   static GType queue_leaky_type = 0;
106   static GEnumValue queue_leaky[] = {
107     {GST_QUEUE_NO_LEAK, "0", "Not Leaky"},
108     {GST_QUEUE_LEAK_UPSTREAM, "1", "Leaky on Upstream"},
109     {GST_QUEUE_LEAK_DOWNSTREAM, "2", "Leaky on Downstream"},
110     {0, NULL, NULL},
111   };
112
113   if (!queue_leaky_type) {
114     queue_leaky_type = g_enum_register_static ("GstQueueLeaky", queue_leaky);
115   }
116   return queue_leaky_type;
117 }
118
119 static GstElementClass *parent_class = NULL;
120 static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
121
122 GType
123 gst_queue_get_type (void)
124 {
125   static GType queue_type = 0;
126
127   if (!queue_type) {
128     static const GTypeInfo queue_info = {
129       sizeof (GstQueueClass),
130       (GBaseInitFunc) gst_queue_base_init,
131       NULL,
132       (GClassInitFunc) gst_queue_class_init,
133       NULL,
134       NULL,
135       sizeof (GstQueue),
136       0,
137       (GInstanceInitFunc) gst_queue_init,
138       NULL
139     };
140
141     queue_type = g_type_register_static (GST_TYPE_ELEMENT,
142         "GstQueue", &queue_info, 0);
143     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue_dataflow", 0,
144         "dataflow inside the queue element");
145   }
146
147   return queue_type;
148 }
149
150 static void
151 gst_queue_base_init (GstQueueClass * klass)
152 {
153   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
154
155   gst_element_class_set_details (gstelement_class, &gst_queue_details);
156 }
157
158 static void
159 gst_queue_class_init (GstQueueClass * klass)
160 {
161   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
162   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
163
164   parent_class = g_type_class_peek_parent (klass);
165
166   /* signals */
167   gst_queue_signals[SIGNAL_UNDERRUN] =
168       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
169       G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
170       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
171   gst_queue_signals[SIGNAL_RUNNING] =
172       g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
173       G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
174       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
175   gst_queue_signals[SIGNAL_OVERRUN] =
176       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
177       G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
178       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
179
180   /* properties */
181   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
182       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
183           "Current amount of data in the queue (bytes)",
184           0, G_MAXUINT, 0, G_PARAM_READABLE));
185   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS,
186       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
187           "Current number of buffers in the queue",
188           0, G_MAXUINT, 0, G_PARAM_READABLE));
189   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
190       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
191           "Current amount of data in the queue (in ns)",
192           0, G_MAXUINT64, 0, G_PARAM_READABLE));
193
194   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
195       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
196           "Max. amount of data in the queue (bytes, 0=disable)",
197           0, G_MAXUINT, 0, G_PARAM_READWRITE));
198   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
199       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
200           "Max. number of buffers in the queue (0=disable)",
201           0, G_MAXUINT, 0, G_PARAM_READWRITE));
202   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
203       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
204           "Max. amount of data in the queue (in ns, 0=disable)",
205           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
206
207   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BYTES,
208       g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
209           "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
210           0, G_MAXUINT, 0, G_PARAM_READWRITE));
211   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BUFFERS,
212       g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
213           "Min. number of buffers in the queue to allow reading (0=disable)",
214           0, G_MAXUINT, 0, G_PARAM_READWRITE));
215   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_TIME,
216       g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
217           "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
218           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
219
220   g_object_class_install_property (gobject_class, ARG_LEAKY,
221       g_param_spec_enum ("leaky", "Leaky",
222           "Where the queue leaks, if at all",
223           GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
224   g_object_class_install_property (gobject_class, ARG_MAY_DEADLOCK,
225       g_param_spec_boolean ("may_deadlock", "May Deadlock",
226           "The queue may deadlock if it's full and not PLAYING",
227           TRUE, G_PARAM_READWRITE));
228   g_object_class_install_property (gobject_class, ARG_BLOCK_TIMEOUT,
229       g_param_spec_uint64 ("block_timeout", "Timeout for Block",
230           "Nanoseconds until blocked queue times out and returns filler event. "
231           "Value of -1 disables timeout",
232           0, G_MAXUINT64, -1, G_PARAM_READWRITE));
233
234   /* set several parent class virtual functions */
235   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose);
236   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
237   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
238
239   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state);
240   gstelement_class->release_locks = GST_DEBUG_FUNCPTR (gst_queue_release_locks);
241 }
242
243 static void
244 gst_queue_init (GstQueue * queue)
245 {
246   /* scheduling on this kind of element is, well, interesting */
247   GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
248   GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE);
249
250   queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
251   gst_pad_set_chain_function (queue->sinkpad,
252       GST_DEBUG_FUNCPTR (gst_queue_chain));
253   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
254   gst_pad_set_link_function (queue->sinkpad,
255       GST_DEBUG_FUNCPTR (gst_queue_link));
256   gst_pad_set_getcaps_function (queue->sinkpad,
257       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
258   gst_pad_set_active (queue->sinkpad, TRUE);
259
260   queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
261   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
262   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
263   gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
264   gst_pad_set_getcaps_function (queue->srcpad,
265       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
266   gst_pad_set_event_function (queue->srcpad,
267       GST_DEBUG_FUNCPTR (gst_queue_handle_src_event));
268   gst_pad_set_active (queue->srcpad, TRUE);
269
270   queue->cur_level.buffers = 0; /* no content */
271   queue->cur_level.bytes = 0;   /* no content */
272   queue->cur_level.time = 0;    /* no content */
273   queue->max_size.buffers = 100;        /* 100 buffers */
274   queue->max_size.bytes = 10 * 1024 * 1024;     /* 10 MB */
275   queue->max_size.time = GST_SECOND;    /* 1 s. */
276   queue->min_threshold.buffers = 0;     /* no threshold */
277   queue->min_threshold.bytes = 0;       /* no threshold */
278   queue->min_threshold.time = 0;        /* no threshold */
279
280   queue->leaky = GST_QUEUE_NO_LEAK;
281   queue->may_deadlock = TRUE;
282   queue->block_timeout = GST_CLOCK_TIME_NONE;
283   queue->interrupt = FALSE;
284   queue->flush = FALSE;
285
286   queue->qlock = g_mutex_new ();
287   queue->item_add = g_cond_new ();
288   queue->item_del = g_cond_new ();
289   queue->event_done = g_cond_new ();
290   queue->events = g_queue_new ();
291   queue->event_lock = g_mutex_new ();
292   queue->queue = g_queue_new ();
293
294   GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
295       "initialized queue's not_empty & not_full conditions");
296 }
297
298 static void
299 gst_queue_dispose (GObject * object)
300 {
301   GstQueue *queue = GST_QUEUE (object);
302
303   gst_element_set_state (GST_ELEMENT (queue), GST_STATE_NULL);
304
305   while (!g_queue_is_empty (queue->queue)) {
306     GstData *data = g_queue_pop_head (queue->queue);
307
308     gst_data_unref (data);
309   }
310   g_queue_free (queue->queue);
311   g_mutex_free (queue->qlock);
312   g_cond_free (queue->item_add);
313   g_cond_free (queue->item_del);
314   g_cond_free (queue->event_done);
315   g_mutex_lock (queue->event_lock);
316   while (!g_queue_is_empty (queue->events)) {
317     GstQueueEventResponse *er = g_queue_pop_head (queue->events);
318
319     gst_event_unref (er->event);
320   }
321   g_mutex_unlock (queue->event_lock);
322   g_mutex_free (queue->event_lock);
323   g_queue_free (queue->events);
324
325   if (G_OBJECT_CLASS (parent_class)->dispose)
326     G_OBJECT_CLASS (parent_class)->dispose (object);
327 }
328
329 static GstCaps *
330 gst_queue_getcaps (GstPad * pad)
331 {
332   GstQueue *queue;
333
334   queue = GST_QUEUE (gst_pad_get_parent (pad));
335
336   if (queue->cur_level.bytes > 0) {
337     return gst_caps_copy (queue->negotiated_caps);
338   }
339
340   return gst_pad_proxy_getcaps (pad);
341 }
342
343 static GstPadLinkReturn
344 gst_queue_link (GstPad * pad, const GstCaps * caps)
345 {
346   GstQueue *queue;
347   GstPadLinkReturn link_ret;
348
349   queue = GST_QUEUE (gst_pad_get_parent (pad));
350
351   if (queue->cur_level.bytes > 0) {
352     if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
353       return GST_PAD_LINK_OK;
354     }
355     return GST_PAD_LINK_REFUSED;
356   }
357
358   link_ret = gst_pad_proxy_pad_link (pad, caps);
359
360   if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
361     /* we store an extra copy of the negotiated caps, just in case
362      * the pads become unnegotiated while we have buffers */
363     gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
364   }
365
366   return link_ret;
367 }
368
369 static void
370 gst_queue_locked_flush (GstQueue * queue)
371 {
372   while (!g_queue_is_empty (queue->queue)) {
373     GstData *data = g_queue_pop_head (queue->queue);
374
375     /* First loose the reference we added when putting that data in the queue */
376     gst_data_unref (data);
377     /* Then loose another reference because we are supposed to destroy that
378        data when flushing */
379     gst_data_unref (data);
380   }
381   queue->timeval = NULL;
382   queue->cur_level.buffers = 0;
383   queue->cur_level.bytes = 0;
384   queue->cur_level.time = 0;
385
386   /* make sure any pending buffers to be added are flushed too */
387   queue->flush = TRUE;
388
389   /* we deleted something... */
390   g_cond_signal (queue->item_del);
391 }
392
393 static void
394 gst_queue_handle_pending_events (GstQueue * queue)
395 {
396   /* check for events to send upstream */
397   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
398       "handling pending events, events queue of size %d",
399       g_queue_get_length (queue->events));
400   g_mutex_lock (queue->event_lock);
401   while (!g_queue_is_empty (queue->events)) {
402     GstQueueEventResponse *er;
403
404     er = g_queue_pop_head (queue->events);
405
406     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
407         "sending event %p (%d) from event response %p upstream",
408         er->event, GST_EVENT_TYPE (er->event), er);
409     if (er->handled) {
410       /* change this to an assert when this file gets reviewed properly. */
411       GST_ELEMENT_ERROR (queue, CORE, EVENT, (NULL),
412           ("already handled event %p (%d) from event response %p upstream",
413               er->event, GST_EVENT_TYPE (er->event), er));
414       break;
415     }
416     g_mutex_unlock (queue->event_lock);
417     er->ret = gst_pad_event_default (queue->srcpad, er->event);
418     er->handled = TRUE;
419     g_cond_signal (queue->event_done);
420     g_mutex_lock (queue->event_lock);
421     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent");
422   }
423   g_mutex_unlock (queue->event_lock);
424 }
425
426 #define STATUS(queue, msg) \
427   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
428                       "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
429                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
430                       "-%" G_GUINT64_FORMAT " ns, %u elements", \
431                       GST_DEBUG_PAD_NAME (pad), \
432                       queue->cur_level.buffers, \
433                       queue->min_threshold.buffers, \
434                       queue->max_size.buffers, \
435                       queue->cur_level.bytes, \
436                       queue->min_threshold.bytes, \
437                       queue->max_size.bytes, \
438                       queue->cur_level.time, \
439                       queue->min_threshold.time, \
440                       queue->max_size.time, \
441                       queue->queue->length)
442
443 static void
444 gst_queue_chain (GstPad * pad, GstData * data)
445 {
446   GstQueue *queue;
447
448   g_return_if_fail (pad != NULL);
449   g_return_if_fail (GST_IS_PAD (pad));
450   g_return_if_fail (data != NULL);
451
452   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
453
454 restart:
455   /* we have to lock the queue since we span threads */
456   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "locking t:%p", g_thread_self ());
457   g_mutex_lock (queue->qlock);
458   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "locked t:%p", g_thread_self ());
459
460   gst_queue_handle_pending_events (queue);
461
462   /* assume don't need to flush this buffer when the queue is filled */
463   queue->flush = FALSE;
464
465   if (GST_IS_EVENT (data)) {
466     switch (GST_EVENT_TYPE (data)) {
467       case GST_EVENT_FLUSH:
468         STATUS (queue, "received flush event");
469         gst_queue_locked_flush (queue);
470         STATUS (queue, "after flush");
471         break;
472       case GST_EVENT_EOS:
473         STATUS (queue, "received EOS");
474         break;
475       default:
476         /* we put the event in the queue, we don't have to act ourselves */
477         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
478             "adding event %p of type %d", data, GST_EVENT_TYPE (data));
479         break;
480     }
481   }
482
483   if (GST_IS_BUFFER (data))
484     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
485         "adding buffer %p of size %d", data, GST_BUFFER_SIZE (data));
486
487   /* We make space available if we're "full" according to whatever
488    * the user defined as "full". Note that this only applies to buffers.
489    * We always handle events and they don't count in our statistics. */
490   if (GST_IS_BUFFER (data) &&
491       ((queue->max_size.buffers > 0 &&
492               queue->cur_level.buffers >= queue->max_size.buffers) ||
493           (queue->max_size.bytes > 0 &&
494               queue->cur_level.bytes >= queue->max_size.bytes) ||
495           (queue->max_size.time > 0 &&
496               queue->cur_level.time >= queue->max_size.time))) {
497     g_mutex_unlock (queue->qlock);
498     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
499     g_mutex_lock (queue->qlock);
500
501     /* how are we going to make space for this buffer? */
502     switch (queue->leaky) {
503         /* leak current buffer */
504       case GST_QUEUE_LEAK_UPSTREAM:
505         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
506             "queue is full, leaking buffer on upstream end");
507         /* now we can clean up and exit right away */
508         g_mutex_unlock (queue->qlock);
509         goto out_unref;
510
511         /* leak first buffer in the queue */
512       case GST_QUEUE_LEAK_DOWNSTREAM:{
513         /* this is a bit hacky. We'll manually iterate the list
514          * and find the first buffer from the head on. We'll
515          * unref that and "fix up" the GQueue object... */
516         GList *item;
517         GstData *leak = NULL;
518
519         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
520             "queue is full, leaking buffer on downstream end");
521
522         for (item = queue->queue->head; item != NULL; item = item->next) {
523           if (GST_IS_BUFFER (item->data)) {
524             leak = item->data;
525             break;
526           }
527         }
528
529         /* if we didn't find anything, it means we have no buffers
530          * in here. That cannot happen, since we had >= 1 bufs */
531         g_assert (leak);
532
533         /* Now remove it from the list, fixing up the GQueue
534          * CHECKME: is a queue->head the first or the last item? */
535         item = g_list_delete_link (queue->queue->head, item);
536         queue->queue->head = g_list_first (item);
537         queue->queue->tail = g_list_last (item);
538         queue->queue->length--;
539
540         /* and unref the data at the end. Twice, because we keep a ref
541          * to make things read-only. Also keep our list uptodate. */
542         queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
543         queue->cur_level.buffers--;
544         if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
545           queue->cur_level.time -= GST_BUFFER_DURATION (data);
546
547         gst_data_unref (data);
548         gst_data_unref (data);
549         break;
550       }
551
552       default:
553         g_warning ("Unknown leaky type, using default");
554         /* fall-through */
555
556         /* don't leak. Instead, wait for space to be available */
557       case GST_QUEUE_NO_LEAK:
558         STATUS (queue, "pre-full wait");
559
560         while ((queue->max_size.buffers > 0 &&
561                 queue->cur_level.buffers >= queue->max_size.buffers) ||
562             (queue->max_size.bytes > 0 &&
563                 queue->cur_level.bytes >= queue->max_size.bytes) ||
564             (queue->max_size.time > 0 &&
565                 queue->cur_level.time >= queue->max_size.time)) {
566           /* if there's a pending state change for this queue
567            * or its manager, switch back to iterator so bottom
568            * half of state change executes */
569           if (queue->interrupt) {
570             GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "interrupted");
571             g_mutex_unlock (queue->qlock);
572             if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad),
573                     GST_ELEMENT (queue))) {
574               goto out_unref;
575             }
576             /* if we got here because we were unlocked after a
577              * flush, we don't need to add the buffer to the
578              * queue again */
579             if (queue->flush) {
580               GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
581                   "not adding pending buffer after flush");
582               goto out_unref;
583             }
584             GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
585                 "adding pending buffer after interrupt");
586             goto restart;
587           }
588
589           if (GST_STATE (queue) != GST_STATE_PLAYING) {
590             /* this means the other end is shut down. Try to
591              * signal to resolve the error */
592             if (!queue->may_deadlock) {
593               g_mutex_unlock (queue->qlock);
594               gst_data_unref (data);
595               GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL),
596                   ("deadlock found, shutting down source pad elements"));
597               /* we don't go to out_unref here, since we want to
598                * unref the buffer *before* calling GST_ELEMENT_ERROR */
599               return;
600             } else {
601               GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
602                   "%s: waiting for the app to restart "
603                   "source pad elements", GST_ELEMENT_NAME (queue));
604             }
605           }
606
607           /* OK, we've got a serious issue here. Imagine the situation
608            * where the puller (next element) is sending an event here,
609            * so it cannot pull events from the queue, and we cannot
610            * push data further because the queue is 'full' and therefore,
611            * we wait here (and do not handle events): deadlock! to solve
612            * that, we handle pending upstream events here, too. */
613           gst_queue_handle_pending_events (queue);
614
615           STATUS (queue, "waiting for item_del signal");
616           g_cond_wait (queue->item_del, queue->qlock);
617           STATUS (queue, "received item_del signal");
618         }
619
620         STATUS (queue, "post-full wait");
621         g_mutex_unlock (queue->qlock);
622         g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
623         g_mutex_lock (queue->qlock);
624         break;
625     }
626   }
627
628   /* put the buffer on the tail of the list. We keep a reference,
629    * so that the data is read-only while in here. There's a good
630    * reason to do so: we have a size and time counter, and any
631    * modification to the content could change any of the two. */
632   gst_data_ref (data);
633   g_queue_push_tail (queue->queue, data);
634
635   /* Note that we only add buffers (not events) to the statistics */
636   if (GST_IS_BUFFER (data)) {
637     queue->cur_level.buffers++;
638     queue->cur_level.bytes += GST_BUFFER_SIZE (data);
639     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
640       queue->cur_level.time += GST_BUFFER_DURATION (data);
641   }
642
643   STATUS (queue, "+ level");
644
645   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
646   g_cond_signal (queue->item_add);
647   g_mutex_unlock (queue->qlock);
648
649   return;
650
651 out_unref:
652   gst_data_unref (data);
653   return;
654 }
655
656 static GstData *
657 gst_queue_get (GstPad * pad)
658 {
659   GstQueue *queue;
660   GstData *data;
661
662   g_return_val_if_fail (pad != NULL, NULL);
663   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
664
665   queue = GST_QUEUE (gst_pad_get_parent (pad));
666
667 restart:
668   /* have to lock for thread-safety */
669   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "locking t:%p", g_thread_self ());
670   g_mutex_lock (queue->qlock);
671   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "locked t:%p", g_thread_self ());
672
673   if (queue->queue->length == 0 ||
674       (queue->min_threshold.buffers > 0 &&
675           queue->cur_level.buffers < queue->min_threshold.buffers) ||
676       (queue->min_threshold.bytes > 0 &&
677           queue->cur_level.bytes < queue->min_threshold.bytes) ||
678       (queue->min_threshold.time > 0 &&
679           queue->cur_level.time < queue->min_threshold.time)) {
680     g_mutex_unlock (queue->qlock);
681     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
682     g_mutex_lock (queue->qlock);
683
684     STATUS (queue, "pre-empty wait");
685     while (queue->queue->length == 0 ||
686         (queue->min_threshold.buffers > 0 &&
687             queue->cur_level.buffers < queue->min_threshold.buffers) ||
688         (queue->min_threshold.bytes > 0 &&
689             queue->cur_level.bytes < queue->min_threshold.bytes) ||
690         (queue->min_threshold.time > 0 &&
691             queue->cur_level.time < queue->min_threshold.time)) {
692       /* if there's a pending state change for this queue or its
693        * manager, switch back to iterator so bottom half of state
694        * change executes. */
695       if (queue->interrupt) {
696         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "interrupted");
697         g_mutex_unlock (queue->qlock);
698         if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad),
699                 GST_ELEMENT (queue)))
700           return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
701         goto restart;
702       }
703       if (GST_STATE (queue) != GST_STATE_PLAYING) {
704         /* this means the other end is shut down */
705         if (!queue->may_deadlock) {
706           g_mutex_unlock (queue->qlock);
707           GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL),
708               ("deadlock found, shutting down sink pad elements"));
709           goto restart;
710         } else {
711           GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
712               "%s: waiting for the app to restart "
713               "source pad elements", GST_ELEMENT_NAME (queue));
714         }
715       }
716
717       STATUS (queue, "waiting for item_add");
718
719       if (queue->block_timeout != GST_CLOCK_TIME_NONE) {
720         GTimeVal timeout;
721
722         g_get_current_time (&timeout);
723         g_time_val_add (&timeout, queue->block_timeout / 1000);
724         if (!g_cond_timed_wait (queue->item_add, queue->qlock, &timeout)) {
725           g_mutex_unlock (queue->qlock);
726           GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
727               "Sending filler event");
728           return GST_DATA (gst_event_new_filler ());
729         }
730       } else {
731         g_cond_wait (queue->item_add, queue->qlock);
732       }
733       STATUS (queue, "got item_add signal");
734     }
735
736     STATUS (queue, "post-empty wait");
737     g_mutex_unlock (queue->qlock);
738     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
739     g_mutex_lock (queue->qlock);
740   }
741
742   /* There's something in the list now, whatever it is */
743   data = g_queue_pop_head (queue->queue);
744   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
745       "retrieved data %p from queue", data);
746
747   if (data == NULL)
748     return NULL;
749
750   if (GST_IS_BUFFER (data)) {
751     /* Update statistics */
752     queue->cur_level.buffers--;
753     queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
754     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
755       queue->cur_level.time -= GST_BUFFER_DURATION (data);
756   }
757
758   /* Now that we're done, we can lose our own reference to
759    * the item, since we're no longer in danger. */
760   gst_data_unref (data);
761
762   STATUS (queue, "after _get()");
763
764   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
765   g_cond_signal (queue->item_del);
766   g_mutex_unlock (queue->qlock);
767
768   /* FIXME: I suppose this needs to be locked, since the EOS
769    * bit affects the pipeline state. However, that bit is
770    * locked too so it'd cause a deadlock. */
771   if (GST_IS_EVENT (data)) {
772     GstEvent *event = GST_EVENT (data);
773
774     switch (GST_EVENT_TYPE (event)) {
775       case GST_EVENT_EOS:
776         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
777             "queue \"%s\" eos", GST_ELEMENT_NAME (queue));
778         gst_element_set_eos (GST_ELEMENT (queue));
779         break;
780       default:
781         break;
782     }
783   }
784
785   return data;
786 }
787
788
789 static gboolean
790 gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
791 {
792   GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
793   gboolean res;
794
795   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
796       event, GST_EVENT_TYPE (event));
797   g_mutex_lock (queue->qlock);
798
799   if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
800     GstQueueEventResponse er;
801
802     /* push the event to the queue and wait for upstream consumption */
803     er.event = event;
804     er.handled = FALSE;
805     g_mutex_lock (queue->event_lock);
806     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
807         "putting event %p (%d) on internal queue", event,
808         GST_EVENT_TYPE (event));
809     g_queue_push_tail (queue->events, &er);
810     g_mutex_unlock (queue->event_lock);
811     GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
812         "Preparing for loop for event handler");
813     /* see the chain function on why this is here - it prevents a deadlock */
814     g_cond_signal (queue->item_del);
815     while (!er.handled) {
816       GTimeVal timeout;
817
818       g_get_current_time (&timeout);
819       g_time_val_add (&timeout, 500 * 1000);    /* half a second */
820       if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
821           !er.handled) {
822         GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
823             "timeout in upstream event handling, dropping event %p (%s)",
824             er.event, GST_EVENT_TYPE (er.event));
825         g_mutex_lock (queue->event_lock);
826         /* since this queue is for src events (ie upstream), this thread is
827          * the only one that is pushing stuff on it, so we're sure that
828          * it's still the tail element.  FIXME: But in practice, we should use
829          * GList instead of GQueue for this so we can remove any element in
830          * the list. */
831         g_queue_pop_tail (queue->events);
832         g_mutex_unlock (queue->event_lock);
833         gst_event_unref (er.event);
834         res = FALSE;
835         goto handled;
836       }
837     }
838     GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Event handled");
839     res = er.ret;
840   } else {
841     res = gst_pad_event_default (pad, event);
842
843     switch (GST_EVENT_TYPE (event)) {
844       case GST_EVENT_FLUSH:
845         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
846             "FLUSH event, flushing queue\n");
847         gst_queue_locked_flush (queue);
848         break;
849       case GST_EVENT_SEEK:
850         if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
851           gst_queue_locked_flush (queue);
852         }
853       default:
854         break;
855     }
856   }
857 handled:
858   g_mutex_unlock (queue->qlock);
859
860   return res;
861 }
862
863 static gboolean
864 gst_queue_release_locks (GstElement * element)
865 {
866   GstQueue *queue;
867
868   queue = GST_QUEUE (element);
869
870   g_mutex_lock (queue->qlock);
871   queue->interrupt = TRUE;
872   g_cond_signal (queue->item_add);
873   g_cond_signal (queue->item_del);
874   g_mutex_unlock (queue->qlock);
875
876   return TRUE;
877 }
878
879 static GstElementStateReturn
880 gst_queue_change_state (GstElement * element)
881 {
882   GstQueue *queue;
883   GstElementStateReturn ret = GST_STATE_SUCCESS;
884
885   queue = GST_QUEUE (element);
886
887   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
888
889   /* lock the queue so another thread (not in sync with this thread's state)
890    * can't call this queue's _get (or whatever)
891    */
892   g_mutex_lock (queue->qlock);
893
894   switch (GST_STATE_TRANSITION (element)) {
895     case GST_STATE_NULL_TO_READY:
896       gst_queue_locked_flush (queue);
897       break;
898     case GST_STATE_PAUSED_TO_PLAYING:
899       if (!GST_PAD_IS_LINKED (queue->sinkpad)) {
900         GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
901             "queue %s is not linked", GST_ELEMENT_NAME (queue));
902         /* FIXME can this be? */
903         g_cond_signal (queue->item_add);
904
905         ret = GST_STATE_FAILURE;
906         goto error;
907       } else {
908         GstScheduler *src_sched, *sink_sched;
909
910         src_sched = gst_pad_get_scheduler (GST_PAD (queue->srcpad));
911         sink_sched = gst_pad_get_scheduler (GST_PAD (queue->sinkpad));
912
913         if (src_sched == sink_sched) {
914           GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
915               "queue %s does not connect different schedulers",
916               GST_ELEMENT_NAME (queue));
917
918           g_warning ("queue %s does not connect different schedulers",
919               GST_ELEMENT_NAME (queue));
920
921           ret = GST_STATE_FAILURE;
922           goto error;
923         }
924       }
925       queue->interrupt = FALSE;
926       break;
927     case GST_STATE_PAUSED_TO_READY:
928       gst_queue_locked_flush (queue);
929       gst_caps_replace (&queue->negotiated_caps, NULL);
930       break;
931     default:
932       break;
933   }
934
935   if (GST_ELEMENT_CLASS (parent_class)->change_state)
936     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
937
938   /* this is an ugly hack to make sure our pads are always active.
939    * Reason for this is that pad activation for the queue element
940    * depends on 2 schedulers (ugh) */
941   gst_pad_set_active (queue->sinkpad, TRUE);
942   gst_pad_set_active (queue->srcpad, TRUE);
943
944 error:
945   g_mutex_unlock (queue->qlock);
946
947   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
948
949   return ret;
950 }
951
952
953 static void
954 gst_queue_set_property (GObject * object,
955     guint prop_id, const GValue * value, GParamSpec * pspec)
956 {
957   GstQueue *queue = GST_QUEUE (object);
958
959   /* someone could change levels here, and since this
960    * affects the get/put funcs, we need to lock for safety. */
961   g_mutex_lock (queue->qlock);
962
963   switch (prop_id) {
964     case ARG_MAX_SIZE_BYTES:
965       queue->max_size.bytes = g_value_get_uint (value);
966       break;
967     case ARG_MAX_SIZE_BUFFERS:
968       queue->max_size.buffers = g_value_get_uint (value);
969       break;
970     case ARG_MAX_SIZE_TIME:
971       queue->max_size.time = g_value_get_uint64 (value);
972       break;
973     case ARG_MIN_THRESHOLD_BYTES:
974       queue->min_threshold.bytes = g_value_get_uint (value);
975       break;
976     case ARG_MIN_THRESHOLD_BUFFERS:
977       queue->min_threshold.buffers = g_value_get_uint (value);
978       break;
979     case ARG_MIN_THRESHOLD_TIME:
980       queue->min_threshold.time = g_value_get_uint64 (value);
981       break;
982     case ARG_LEAKY:
983       queue->leaky = g_value_get_enum (value);
984       break;
985     case ARG_MAY_DEADLOCK:
986       queue->may_deadlock = g_value_get_boolean (value);
987       break;
988     case ARG_BLOCK_TIMEOUT:
989       queue->block_timeout = g_value_get_uint64 (value);
990       break;
991     default:
992       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
993       break;
994   }
995
996   g_mutex_unlock (queue->qlock);
997 }
998
999 static void
1000 gst_queue_get_property (GObject * object,
1001     guint prop_id, GValue * value, GParamSpec * pspec)
1002 {
1003   GstQueue *queue = GST_QUEUE (object);
1004
1005   switch (prop_id) {
1006     case ARG_CUR_LEVEL_BYTES:
1007       g_value_set_uint (value, queue->cur_level.bytes);
1008       break;
1009     case ARG_CUR_LEVEL_BUFFERS:
1010       g_value_set_uint (value, queue->cur_level.buffers);
1011       break;
1012     case ARG_CUR_LEVEL_TIME:
1013       g_value_set_uint64 (value, queue->cur_level.time);
1014       break;
1015     case ARG_MAX_SIZE_BYTES:
1016       g_value_set_uint (value, queue->max_size.bytes);
1017       break;
1018     case ARG_MAX_SIZE_BUFFERS:
1019       g_value_set_uint (value, queue->max_size.buffers);
1020       break;
1021     case ARG_MAX_SIZE_TIME:
1022       g_value_set_uint64 (value, queue->max_size.time);
1023       break;
1024     case ARG_MIN_THRESHOLD_BYTES:
1025       g_value_set_uint (value, queue->min_threshold.bytes);
1026       break;
1027     case ARG_MIN_THRESHOLD_BUFFERS:
1028       g_value_set_uint (value, queue->min_threshold.buffers);
1029       break;
1030     case ARG_MIN_THRESHOLD_TIME:
1031       g_value_set_uint64 (value, queue->min_threshold.time);
1032       break;
1033     case ARG_LEAKY:
1034       g_value_set_enum (value, queue->leaky);
1035       break;
1036     case ARG_MAY_DEADLOCK:
1037       g_value_set_boolean (value, queue->may_deadlock);
1038       break;
1039     case ARG_BLOCK_TIMEOUT:
1040       g_value_set_uint64 (value, queue->block_timeout);
1041       break;
1042     default:
1043       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1044       break;
1045   }
1046 }