gst-indent run on core
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2003 Colin Walters <cwalters@gnome.org>
5  *
6  * gstqueue.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23
24
25 #include "gst_private.h"
26
27 #include "gstqueue.h"
28 #include "gstscheduler.h"
29 #include "gstevent.h"
30 #include "gstinfo.h"
31 #include "gsterror.h"
32
33 static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
34     "Generic",
35     "Simple data queue",
36     "Erik Walthinsen <omega@cse.ogi.edu>");
37
38
39 /* Queue signals and args */
40 enum
41 {
42   SIGNAL_UNDERRUN,
43   SIGNAL_RUNNING,
44   SIGNAL_OVERRUN,
45   LAST_SIGNAL
46 };
47
48 enum
49 {
50   ARG_0,
51   /* FIXME: don't we have another way of doing this
52    * "Gstreamer format" (frame/byte/time) queries? */
53   ARG_CUR_LEVEL_BUFFERS,
54   ARG_CUR_LEVEL_BYTES,
55   ARG_CUR_LEVEL_TIME,
56   ARG_MAX_SIZE_BUFFERS,
57   ARG_MAX_SIZE_BYTES,
58   ARG_MAX_SIZE_TIME,
59   ARG_MIN_THRESHOLD_BUFFERS,
60   ARG_MIN_THRESHOLD_BYTES,
61   ARG_MIN_THRESHOLD_TIME,
62   ARG_LEAKY,
63   ARG_MAY_DEADLOCK,
64   ARG_BLOCK_TIMEOUT
65       /* FILL ME */
66 };
67
68 typedef struct _GstQueueEventResponse
69 {
70   GstEvent *event;
71   gboolean ret, handled;
72 } GstQueueEventResponse;
73
74 static void gst_queue_base_init (GstQueueClass * klass);
75 static void gst_queue_class_init (GstQueueClass * klass);
76 static void gst_queue_init (GstQueue * queue);
77 static void gst_queue_dispose (GObject * object);
78
79 static void gst_queue_set_property (GObject * object,
80     guint prop_id, const GValue * value, GParamSpec * pspec);
81 static void gst_queue_get_property (GObject * object,
82     guint prop_id, GValue * value, GParamSpec * pspec);
83
84 static void gst_queue_chain (GstPad * pad, GstData * data);
85 static GstData *gst_queue_get (GstPad * pad);
86
87 static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event);
88
89 static GstCaps *gst_queue_getcaps (GstPad * pad);
90 static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
91 static void gst_queue_locked_flush (GstQueue * queue);
92
93 static GstElementStateReturn gst_queue_change_state (GstElement * element);
94 static gboolean gst_queue_release_locks (GstElement * element);
95
96
97 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
98
99 static GType
100 queue_leaky_get_type (void)
101 {
102   static GType queue_leaky_type = 0;
103   static GEnumValue queue_leaky[] = {
104     {GST_QUEUE_NO_LEAK, "0", "Not Leaky"},
105     {GST_QUEUE_LEAK_UPSTREAM, "1", "Leaky on Upstream"},
106     {GST_QUEUE_LEAK_DOWNSTREAM, "2", "Leaky on Downstream"},
107     {0, NULL, NULL},
108   };
109   if (!queue_leaky_type) {
110     queue_leaky_type = g_enum_register_static ("GstQueueLeaky", queue_leaky);
111   }
112   return queue_leaky_type;
113 }
114
115 static GstElementClass *parent_class = NULL;
116 static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
117
118 GType
119 gst_queue_get_type (void)
120 {
121   static GType queue_type = 0;
122
123   if (!queue_type) {
124     static const GTypeInfo queue_info = {
125       sizeof (GstQueueClass),
126       (GBaseInitFunc) gst_queue_base_init,
127       NULL,
128       (GClassInitFunc) gst_queue_class_init,
129       NULL,
130       NULL,
131       sizeof (GstQueue),
132       4,
133       (GInstanceInitFunc) gst_queue_init,
134       NULL
135     };
136
137     queue_type = g_type_register_static (GST_TYPE_ELEMENT,
138         "GstQueue", &queue_info, 0);
139   }
140
141   return queue_type;
142 }
143
144 static void
145 gst_queue_base_init (GstQueueClass * klass)
146 {
147   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
148
149   gst_element_class_set_details (gstelement_class, &gst_queue_details);
150 }
151
152 static void
153 gst_queue_class_init (GstQueueClass * klass)
154 {
155   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
156   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
157
158   parent_class = g_type_class_peek_parent (klass);
159
160   /* signals */
161   gst_queue_signals[SIGNAL_UNDERRUN] =
162       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
163       G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
164       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
165   gst_queue_signals[SIGNAL_RUNNING] =
166       g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
167       G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
168       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
169   gst_queue_signals[SIGNAL_OVERRUN] =
170       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
171       G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
172       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
173
174   /* properties */
175   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
176       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
177           "Current amount of data in the queue (bytes)",
178           0, G_MAXUINT, 0, G_PARAM_READABLE));
179   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS,
180       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
181           "Current number of buffers in the queue",
182           0, G_MAXUINT, 0, G_PARAM_READABLE));
183   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
184       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
185           "Current amount of data in the queue (in ns)",
186           0, G_MAXUINT64, 0, G_PARAM_READABLE));
187
188   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
189       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
190           "Max. amount of data in the queue (bytes, 0=disable)",
191           0, G_MAXUINT, 0, G_PARAM_READWRITE));
192   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
193       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
194           "Max. number of buffers in the queue (0=disable)",
195           0, G_MAXUINT, 0, G_PARAM_READWRITE));
196   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
197       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
198           "Max. amount of data in the queue (in ns, 0=disable)",
199           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
200
201   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BYTES,
202       g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
203           "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
204           0, G_MAXUINT, 0, G_PARAM_READWRITE));
205   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BUFFERS,
206       g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
207           "Min. number of buffers in the queue to allow reading (0=disable)",
208           0, G_MAXUINT, 0, G_PARAM_READWRITE));
209   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_TIME,
210       g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
211           "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
212           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
213
214   g_object_class_install_property (gobject_class, ARG_LEAKY,
215       g_param_spec_enum ("leaky", "Leaky",
216           "Where the queue leaks, if at all",
217           GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
218   g_object_class_install_property (gobject_class, ARG_MAY_DEADLOCK,
219       g_param_spec_boolean ("may_deadlock", "May Deadlock",
220           "The queue may deadlock if it's full and not PLAYING",
221           TRUE, G_PARAM_READWRITE));
222   g_object_class_install_property (gobject_class, ARG_BLOCK_TIMEOUT,
223       g_param_spec_uint64 ("block_timeout", "Timeout for Block",
224           "Nanoseconds until blocked queue times out and returns filler event. "
225           "Value of -1 disables timeout",
226           0, G_MAXUINT64, -1, G_PARAM_READWRITE));
227
228   /* set several parent class virtual functions */
229   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose);
230   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
231   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
232
233   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state);
234   gstelement_class->release_locks = GST_DEBUG_FUNCPTR (gst_queue_release_locks);
235 }
236
237 static void
238 gst_queue_init (GstQueue * queue)
239 {
240   /* scheduling on this kind of element is, well, interesting */
241   GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
242   GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE);
243
244   queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
245   gst_pad_set_chain_function (queue->sinkpad,
246       GST_DEBUG_FUNCPTR (gst_queue_chain));
247   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
248   gst_pad_set_link_function (queue->sinkpad,
249       GST_DEBUG_FUNCPTR (gst_queue_link));
250   gst_pad_set_getcaps_function (queue->sinkpad,
251       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
252   gst_pad_set_active (queue->sinkpad, TRUE);
253
254   queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
255   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
256   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
257   gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
258   gst_pad_set_getcaps_function (queue->srcpad,
259       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
260   gst_pad_set_event_function (queue->srcpad,
261       GST_DEBUG_FUNCPTR (gst_queue_handle_src_event));
262   gst_pad_set_active (queue->srcpad, TRUE);
263
264   queue->cur_level.buffers = 0; /* no content */
265   queue->cur_level.bytes = 0;   /* no content */
266   queue->cur_level.time = 0;    /* no content */
267   queue->max_size.buffers = 100;        /* 100 buffers */
268   queue->max_size.bytes = 10 * 1024 * 1024;     /* 10 MB */
269   queue->max_size.time = GST_SECOND;    /* 1 s. */
270   queue->min_threshold.buffers = 0;     /* no threshold */
271   queue->min_threshold.bytes = 0;       /* no threshold */
272   queue->min_threshold.time = 0;        /* no threshold */
273
274   queue->leaky = GST_QUEUE_NO_LEAK;
275   queue->may_deadlock = TRUE;
276   queue->block_timeout = GST_CLOCK_TIME_NONE;
277   queue->interrupt = FALSE;
278   queue->flush = FALSE;
279
280   queue->qlock = g_mutex_new ();
281   queue->item_add = g_cond_new ();
282   queue->item_del = g_cond_new ();
283   queue->event_done = g_cond_new ();
284   queue->events = g_queue_new ();
285   queue->queue = g_queue_new ();
286
287   GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
288       "initialized queue's not_empty & not_full conditions");
289 }
290
291 static void
292 gst_queue_dispose (GObject * object)
293 {
294   GstQueue *queue = GST_QUEUE (object);
295
296   gst_element_set_state (GST_ELEMENT (queue), GST_STATE_NULL);
297
298   while (!g_queue_is_empty (queue->queue)) {
299     GstData *data = g_queue_pop_head (queue->queue);
300
301     gst_data_unref (data);
302   }
303   g_queue_free (queue->queue);
304   g_mutex_free (queue->qlock);
305   g_cond_free (queue->item_add);
306   g_cond_free (queue->item_del);
307   g_cond_free (queue->event_done);
308   while (!g_queue_is_empty (queue->events)) {
309     GstEvent *event = g_queue_pop_head (queue->events);
310
311     gst_event_unref (event);
312   }
313
314   if (G_OBJECT_CLASS (parent_class)->dispose)
315     G_OBJECT_CLASS (parent_class)->dispose (object);
316 }
317
318 static GstCaps *
319 gst_queue_getcaps (GstPad * pad)
320 {
321   GstQueue *queue;
322
323   queue = GST_QUEUE (gst_pad_get_parent (pad));
324
325   if (queue->cur_level.bytes > 0) {
326     return gst_caps_copy (queue->negotiated_caps);
327   }
328
329   return gst_pad_proxy_getcaps (pad);
330 }
331
332 static GstPadLinkReturn
333 gst_queue_link (GstPad * pad, const GstCaps * caps)
334 {
335   GstQueue *queue;
336   GstPadLinkReturn link_ret;
337
338   queue = GST_QUEUE (gst_pad_get_parent (pad));
339
340   if (queue->cur_level.bytes > 0) {
341     if (gst_caps_is_equal_fixed (caps, queue->negotiated_caps)) {
342       return GST_PAD_LINK_OK;
343     }
344     return GST_PAD_LINK_REFUSED;
345   }
346
347   link_ret = gst_pad_proxy_pad_link (pad, caps);
348
349   if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
350     /* we store an extra copy of the negotiated caps, just in case
351      * the pads become unnegotiated while we have buffers */
352     gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
353   }
354
355   return link_ret;
356 }
357
358 static void
359 gst_queue_locked_flush (GstQueue * queue)
360 {
361   while (!g_queue_is_empty (queue->queue)) {
362     GstData *data = g_queue_pop_head (queue->queue);
363
364     /* First loose the reference we added when putting that data in the queue */
365     gst_data_unref (data);
366     /* Then loose another reference because we are supposed to destroy that
367        data when flushing */
368     gst_data_unref (data);
369   }
370   queue->timeval = NULL;
371   queue->cur_level.buffers = 0;
372   queue->cur_level.bytes = 0;
373   queue->cur_level.time = 0;
374
375   /* make sure any pending buffers to be added are flushed too */
376   queue->flush = TRUE;
377
378   /* we deleted something... */
379   g_cond_signal (queue->item_del);
380 }
381
382 static void
383 gst_queue_handle_pending_events (GstQueue * queue)
384 {
385   /* check for events to send upstream */
386   while (!g_queue_is_empty (queue->events)) {
387     GstQueueEventResponse *er = g_queue_pop_head (queue->events);
388
389     GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "sending event upstream");
390     er->ret = gst_pad_event_default (queue->srcpad, er->event);
391     er->handled = TRUE;
392     g_cond_signal (queue->event_done);
393     GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent");
394   }
395 }
396
397 #define STATUS(queue, msg) \
398   GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, \
399                       "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
400                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
401                       "-%" G_GUINT64_FORMAT " ns, %u elements", \
402                       GST_DEBUG_PAD_NAME (pad), \
403                       queue->cur_level.buffers, \
404                       queue->min_threshold.buffers, \
405                       queue->max_size.buffers, \
406                       queue->cur_level.bytes, \
407                       queue->min_threshold.bytes, \
408                       queue->max_size.bytes, \
409                       queue->cur_level.time, \
410                       queue->min_threshold.time, \
411                       queue->max_size.time, \
412                       queue->queue->length)
413
414 static void
415 gst_queue_chain (GstPad * pad, GstData * data)
416 {
417   GstQueue *queue;
418
419   g_return_if_fail (pad != NULL);
420   g_return_if_fail (GST_IS_PAD (pad));
421   g_return_if_fail (data != NULL);
422
423   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
424
425 restart:
426   /* we have to lock the queue since we span threads */
427   GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locking t:%p",
428       g_thread_self ());
429   g_mutex_lock (queue->qlock);
430   GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ());
431
432   gst_queue_handle_pending_events (queue);
433
434   /* assume don't need to flush this buffer when the queue is filled */
435   queue->flush = FALSE;
436
437   if (GST_IS_EVENT (data)) {
438     switch (GST_EVENT_TYPE (data)) {
439       case GST_EVENT_FLUSH:
440         STATUS (queue, "received flush event");
441         gst_queue_locked_flush (queue);
442         STATUS (queue, "after flush");
443         break;
444       case GST_EVENT_EOS:
445         STATUS (queue, "received EOS");
446         break;
447       default:
448         /* we put the event in the queue, we don't have to act ourselves */
449         GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
450             "adding event %p of type %d", data, GST_EVENT_TYPE (data));
451         break;
452     }
453   }
454
455   if (GST_IS_BUFFER (data))
456     GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
457         "adding buffer %p of size %d", data, GST_BUFFER_SIZE (data));
458
459   /* We make space available if we're "full" according to whatever
460    * the user defined as "full". Note that this only applies to buffers.
461    * We always handle events and they don't count in our statistics. */
462   if (GST_IS_BUFFER (data) &&
463       ((queue->max_size.buffers > 0 &&
464               queue->cur_level.buffers >= queue->max_size.buffers) ||
465           (queue->max_size.bytes > 0 &&
466               queue->cur_level.bytes >= queue->max_size.bytes) ||
467           (queue->max_size.time > 0 &&
468               queue->cur_level.time >= queue->max_size.time))) {
469     g_mutex_unlock (queue->qlock);
470     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
471     g_mutex_lock (queue->qlock);
472
473     /* how are we going to make space for this buffer? */
474     switch (queue->leaky) {
475         /* leak current buffer */
476       case GST_QUEUE_LEAK_UPSTREAM:
477         GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
478             "queue is full, leaking buffer on upstream end");
479         /* now we can clean up and exit right away */
480         g_mutex_unlock (queue->qlock);
481         goto out_unref;
482
483         /* leak first buffer in the queue */
484       case GST_QUEUE_LEAK_DOWNSTREAM:{
485         /* this is a bit hacky. We'll manually iterate the list
486          * and find the first buffer from the head on. We'll
487          * unref that and "fix up" the GQueue object... */
488         GList *item;
489         GstData *leak = NULL;
490
491         GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
492             "queue is full, leaking buffer on downstream end");
493
494         for (item = queue->queue->head; item != NULL; item = item->next) {
495           if (GST_IS_BUFFER (item->data)) {
496             leak = item->data;
497             break;
498           }
499         }
500
501         /* if we didn't find anything, it means we have no buffers
502          * in here. That cannot happen, since we had >= 1 bufs */
503         g_assert (leak);
504
505         /* Now remove it from the list, fixing up the GQueue
506          * CHECKME: is a queue->head the first or the last item? */
507         item = g_list_delete_link (queue->queue->head, item);
508         queue->queue->head = g_list_first (item);
509         queue->queue->tail = g_list_last (item);
510         queue->queue->length--;
511
512         /* and unref the data at the end. Twice, because we keep a ref
513          * to make things read-only. Also keep our list uptodate. */
514         queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
515         queue->cur_level.buffers--;
516         if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
517           queue->cur_level.time -= GST_BUFFER_DURATION (data);
518
519         gst_data_unref (data);
520         gst_data_unref (data);
521         break;
522       }
523
524       default:
525         g_warning ("Unknown leaky type, using default");
526         /* fall-through */
527
528         /* don't leak. Instead, wait for space to be available */
529       case GST_QUEUE_NO_LEAK:
530         STATUS (queue, "pre-full wait");
531
532         while ((queue->max_size.buffers > 0 &&
533                 queue->cur_level.buffers >= queue->max_size.buffers) ||
534             (queue->max_size.bytes > 0 &&
535                 queue->cur_level.bytes >= queue->max_size.bytes) ||
536             (queue->max_size.time > 0 &&
537                 queue->cur_level.time >= queue->max_size.time)) {
538           /* if there's a pending state change for this queue
539            * or its manager, switch back to iterator so bottom
540            * half of state change executes */
541           if (queue->interrupt) {
542             GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted");
543             g_mutex_unlock (queue->qlock);
544             if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad),
545                     GST_ELEMENT (queue))) {
546               goto out_unref;
547             }
548             /* if we got here because we were unlocked after a
549              * flush, we don't need to add the buffer to the
550              * queue again */
551             if (queue->flush) {
552               GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
553                   "not adding pending buffer after flush");
554               goto out_unref;
555             }
556             GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
557                 "adding pending buffer after interrupt");
558             goto restart;
559           }
560
561           if (GST_STATE (queue) != GST_STATE_PLAYING) {
562             /* this means the other end is shut down. Try to
563              * signal to resolve the error */
564             if (!queue->may_deadlock) {
565               g_mutex_unlock (queue->qlock);
566               gst_data_unref (data);
567               GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL),
568                   ("deadlock found, shutting down source pad elements"));
569               /* we don't go to out_unref here, since we want to
570                * unref the buffer *before* calling GST_ELEMENT_ERROR */
571               return;
572             } else {
573               GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
574                   "%s: waiting for the app to restart "
575                   "source pad elements", GST_ELEMENT_NAME (queue));
576             }
577           }
578
579           /* OK, we've got a serious issue here. Imagine the situation
580            * where the puller (next element) is sending an event here,
581            * so it cannot pull events from the queue, and we cannot
582            * push data further because the queue is 'full' and therefore,
583            * we wait here (and do not handle events): deadlock! to solve
584            * that, we handle pending upstream events here, too. */
585           gst_queue_handle_pending_events (queue);
586
587           STATUS (queue, "waiting for item_del signal");
588           g_cond_wait (queue->item_del, queue->qlock);
589           STATUS (queue, "received item_del signal");
590         }
591
592         STATUS (queue, "post-full wait");
593         g_mutex_unlock (queue->qlock);
594         g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
595         g_mutex_lock (queue->qlock);
596         break;
597     }
598   }
599
600   /* put the buffer on the tail of the list. We keep a reference,
601    * so that the data is read-only while in here. There's a good
602    * reason to do so: we have a size and time counter, and any
603    * modification to the content could change any of the two. */
604   gst_data_ref (data);
605   g_queue_push_tail (queue->queue, data);
606
607   /* Note that we only add buffers (not events) to the statistics */
608   if (GST_IS_BUFFER (data)) {
609     queue->cur_level.buffers++;
610     queue->cur_level.bytes += GST_BUFFER_SIZE (data);
611     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
612       queue->cur_level.time += GST_BUFFER_DURATION (data);
613   }
614
615   STATUS (queue, "+ level");
616
617   GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_add");
618   g_cond_signal (queue->item_add);
619   g_mutex_unlock (queue->qlock);
620
621   return;
622
623 out_unref:
624   gst_data_unref (data);
625   return;
626 }
627
628 static GstData *
629 gst_queue_get (GstPad * pad)
630 {
631   GstQueue *queue;
632   GstData *data;
633
634   g_return_val_if_fail (pad != NULL, NULL);
635   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
636
637   queue = GST_QUEUE (gst_pad_get_parent (pad));
638
639 restart:
640   /* have to lock for thread-safety */
641   GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
642       "locking t:%p", g_thread_self ());
643   g_mutex_lock (queue->qlock);
644   GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ());
645
646   if (queue->queue->length == 0 ||
647       (queue->min_threshold.buffers > 0 &&
648           queue->cur_level.buffers < queue->min_threshold.buffers) ||
649       (queue->min_threshold.bytes > 0 &&
650           queue->cur_level.bytes < queue->min_threshold.bytes) ||
651       (queue->min_threshold.time > 0 &&
652           queue->cur_level.time < queue->min_threshold.time)) {
653     g_mutex_unlock (queue->qlock);
654     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
655     g_mutex_lock (queue->qlock);
656
657     STATUS (queue, "pre-empty wait");
658     while (queue->queue->length == 0 ||
659         (queue->min_threshold.buffers > 0 &&
660             queue->cur_level.buffers < queue->min_threshold.buffers) ||
661         (queue->min_threshold.bytes > 0 &&
662             queue->cur_level.bytes < queue->min_threshold.bytes) ||
663         (queue->min_threshold.time > 0 &&
664             queue->cur_level.time < queue->min_threshold.time)) {
665       /* if there's a pending state change for this queue or its
666        * manager, switch back to iterator so bottom half of state
667        * change executes. */
668       if (queue->interrupt) {
669         GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted");
670         g_mutex_unlock (queue->qlock);
671         if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad),
672                 GST_ELEMENT (queue)))
673           return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
674         goto restart;
675       }
676       if (GST_STATE (queue) != GST_STATE_PLAYING) {
677         /* this means the other end is shut down */
678         if (!queue->may_deadlock) {
679           g_mutex_unlock (queue->qlock);
680           GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL),
681               ("deadlock found, shutting down sink pad elements"));
682           goto restart;
683         } else {
684           GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
685               "%s: waiting for the app to restart "
686               "source pad elements", GST_ELEMENT_NAME (queue));
687         }
688       }
689
690       STATUS (queue, "waiting for item_add");
691
692       if (queue->block_timeout != GST_CLOCK_TIME_NONE) {
693         GTimeVal timeout;
694
695         g_get_current_time (&timeout);
696         g_time_val_add (&timeout, queue->block_timeout / 1000);
697         if (!g_cond_timed_wait (queue->item_add, queue->qlock, &timeout)) {
698           g_mutex_unlock (queue->qlock);
699           GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
700               "Sending filler event");
701           return GST_DATA (gst_event_new_filler ());
702         }
703       } else {
704         g_cond_wait (queue->item_add, queue->qlock);
705       }
706       STATUS (queue, "got item_add signal");
707     }
708
709     STATUS (queue, "post-empty wait");
710     g_mutex_unlock (queue->qlock);
711     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
712     g_mutex_lock (queue->qlock);
713   }
714
715   /* There's something in the list now, whatever it is */
716   data = g_queue_pop_head (queue->queue);
717   GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
718       "retrieved data %p from queue", data);
719
720   if (data == NULL)
721     return NULL;
722
723   if (GST_IS_BUFFER (data)) {
724     /* Update statistics */
725     queue->cur_level.buffers--;
726     queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
727     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
728       queue->cur_level.time -= GST_BUFFER_DURATION (data);
729   }
730
731   /* Now that we're done, we can lose our own reference to
732    * the item, since we're no longer in danger. */
733   gst_data_unref (data);
734
735   STATUS (queue, "after _get()");
736
737   GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "signalling item_del");
738   g_cond_signal (queue->item_del);
739   g_mutex_unlock (queue->qlock);
740
741   /* FIXME: I suppose this needs to be locked, since the EOS
742    * bit affects the pipeline state. However, that bit is
743    * locked too so it'd cause a deadlock. */
744   if (GST_IS_EVENT (data)) {
745     GstEvent *event = GST_EVENT (data);
746
747     switch (GST_EVENT_TYPE (event)) {
748       case GST_EVENT_EOS:
749         GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
750             "queue \"%s\" eos", GST_ELEMENT_NAME (queue));
751         gst_element_set_eos (GST_ELEMENT (queue));
752         break;
753       default:
754         break;
755     }
756   }
757
758   return data;
759 }
760
761
762 static gboolean
763 gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
764 {
765   GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
766   gboolean res;
767
768   g_mutex_lock (queue->qlock);
769
770   if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
771     GstQueueEventResponse er;
772
773     /* push the event to the queue and wait for upstream consumption */
774     er.event = event;
775     er.handled = FALSE;
776     g_queue_push_tail (queue->events, &er);
777     GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
778         "Preparing for loop for event handler");
779     /* see the chain function on why this is here - it prevents a deadlock */
780     g_cond_signal (queue->item_del);
781     while (!er.handled) {
782       GTimeVal timeout;
783
784       g_get_current_time (&timeout);
785       g_time_val_add (&timeout, 500 * 1000);    /* half a second */
786       if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
787           !er.handled) {
788         GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue,
789             "timeout in upstream event handling");
790         /* remove ourselves from the pending list. Since we're
791          * locked, others cannot reference this anymore. */
792         queue->queue->head = g_list_remove (queue->queue->head, &er);
793         queue->queue->head = g_list_first (queue->queue->head);
794         queue->queue->tail = g_list_last (queue->queue->head);
795         queue->queue->length--;
796         res = FALSE;
797         goto handled;
798       }
799     }
800     GST_CAT_WARNING_OBJECT (GST_CAT_DATAFLOW, queue, "Event handled");
801     res = er.ret;
802   } else {
803     res = gst_pad_event_default (pad, event);
804
805     switch (GST_EVENT_TYPE (event)) {
806       case GST_EVENT_FLUSH:
807         GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue,
808             "FLUSH event, flushing queue\n");
809         gst_queue_locked_flush (queue);
810         break;
811       case GST_EVENT_SEEK:
812         if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
813           gst_queue_locked_flush (queue);
814         }
815       default:
816         break;
817     }
818   }
819 handled:
820   g_mutex_unlock (queue->qlock);
821
822   return res;
823 }
824
825 static gboolean
826 gst_queue_release_locks (GstElement * element)
827 {
828   GstQueue *queue;
829
830   queue = GST_QUEUE (element);
831
832   g_mutex_lock (queue->qlock);
833   queue->interrupt = TRUE;
834   g_cond_signal (queue->item_add);
835   g_cond_signal (queue->item_del);
836   g_mutex_unlock (queue->qlock);
837
838   return TRUE;
839 }
840
841 static GstElementStateReturn
842 gst_queue_change_state (GstElement * element)
843 {
844   GstQueue *queue;
845   GstElementStateReturn ret = GST_STATE_SUCCESS;
846
847   queue = GST_QUEUE (element);
848
849   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
850
851   /* lock the queue so another thread (not in sync with this thread's state)
852    * can't call this queue's _get (or whatever)
853    */
854   g_mutex_lock (queue->qlock);
855
856   switch (GST_STATE_TRANSITION (element)) {
857     case GST_STATE_NULL_TO_READY:
858       gst_queue_locked_flush (queue);
859       break;
860     case GST_STATE_PAUSED_TO_PLAYING:
861       if (!GST_PAD_IS_LINKED (queue->sinkpad)) {
862         GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
863             "queue %s is not linked", GST_ELEMENT_NAME (queue));
864         /* FIXME can this be? */
865         g_cond_signal (queue->item_add);
866
867         ret = GST_STATE_FAILURE;
868         goto error;
869       } else {
870         GstScheduler *src_sched, *sink_sched;
871
872         src_sched = gst_pad_get_scheduler (GST_PAD (queue->srcpad));
873         sink_sched = gst_pad_get_scheduler (GST_PAD (queue->sinkpad));
874
875         if (src_sched == sink_sched) {
876           GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
877               "queue %s does not connect different schedulers",
878               GST_ELEMENT_NAME (queue));
879
880           g_warning ("queue %s does not connect different schedulers",
881               GST_ELEMENT_NAME (queue));
882
883           ret = GST_STATE_FAILURE;
884           goto error;
885         }
886       }
887       queue->interrupt = FALSE;
888       break;
889     case GST_STATE_PAUSED_TO_READY:
890       gst_queue_locked_flush (queue);
891       break;
892     default:
893       break;
894   }
895
896   if (GST_ELEMENT_CLASS (parent_class)->change_state)
897     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
898
899   /* this is an ugly hack to make sure our pads are always active.
900    * Reason for this is that pad activation for the queue element
901    * depends on 2 schedulers (ugh) */
902   gst_pad_set_active (queue->sinkpad, TRUE);
903   gst_pad_set_active (queue->srcpad, TRUE);
904
905 error:
906   g_mutex_unlock (queue->qlock);
907
908   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
909
910   return ret;
911 }
912
913
914 static void
915 gst_queue_set_property (GObject * object,
916     guint prop_id, const GValue * value, GParamSpec * pspec)
917 {
918   GstQueue *queue = GST_QUEUE (object);
919
920   /* someone could change levels here, and since this
921    * affects the get/put funcs, we need to lock for safety. */
922   g_mutex_lock (queue->qlock);
923
924   switch (prop_id) {
925     case ARG_MAX_SIZE_BYTES:
926       queue->max_size.bytes = g_value_get_uint (value);
927       break;
928     case ARG_MAX_SIZE_BUFFERS:
929       queue->max_size.buffers = g_value_get_uint (value);
930       break;
931     case ARG_MAX_SIZE_TIME:
932       queue->max_size.time = g_value_get_uint64 (value);
933       break;
934     case ARG_MIN_THRESHOLD_BYTES:
935       queue->min_threshold.bytes = g_value_get_uint (value);
936       break;
937     case ARG_MIN_THRESHOLD_BUFFERS:
938       queue->min_threshold.buffers = g_value_get_uint (value);
939       break;
940     case ARG_MIN_THRESHOLD_TIME:
941       queue->min_threshold.time = g_value_get_uint64 (value);
942       break;
943     case ARG_LEAKY:
944       queue->leaky = g_value_get_enum (value);
945       break;
946     case ARG_MAY_DEADLOCK:
947       queue->may_deadlock = g_value_get_boolean (value);
948       break;
949     case ARG_BLOCK_TIMEOUT:
950       queue->block_timeout = g_value_get_uint64 (value);
951       break;
952     default:
953       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
954       break;
955   }
956
957   g_mutex_unlock (queue->qlock);
958 }
959
960 static void
961 gst_queue_get_property (GObject * object,
962     guint prop_id, GValue * value, GParamSpec * pspec)
963 {
964   GstQueue *queue = GST_QUEUE (object);
965
966   switch (prop_id) {
967     case ARG_CUR_LEVEL_BYTES:
968       g_value_set_uint (value, queue->cur_level.bytes);
969       break;
970     case ARG_CUR_LEVEL_BUFFERS:
971       g_value_set_uint (value, queue->cur_level.buffers);
972       break;
973     case ARG_CUR_LEVEL_TIME:
974       g_value_set_uint64 (value, queue->cur_level.time);
975       break;
976     case ARG_MAX_SIZE_BYTES:
977       g_value_set_uint (value, queue->max_size.bytes);
978       break;
979     case ARG_MAX_SIZE_BUFFERS:
980       g_value_set_uint (value, queue->max_size.buffers);
981       break;
982     case ARG_MAX_SIZE_TIME:
983       g_value_set_uint64 (value, queue->max_size.time);
984       break;
985     case ARG_MIN_THRESHOLD_BYTES:
986       g_value_set_uint (value, queue->min_threshold.bytes);
987       break;
988     case ARG_MIN_THRESHOLD_BUFFERS:
989       g_value_set_uint (value, queue->min_threshold.buffers);
990       break;
991     case ARG_MIN_THRESHOLD_TIME:
992       g_value_set_uint64 (value, queue->min_threshold.time);
993       break;
994     case ARG_LEAKY:
995       g_value_set_enum (value, queue->leaky);
996       break;
997     case ARG_MAY_DEADLOCK:
998       g_value_set_boolean (value, queue->may_deadlock);
999       break;
1000     case ARG_BLOCK_TIMEOUT:
1001       g_value_set_uint64 (value, queue->block_timeout);
1002       break;
1003     default:
1004       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1005       break;
1006   }
1007 }