clean up debugging. change to finalize to free all resources
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2003 Colin Walters <cwalters@gnome.org>
5  *
6  * gstqueue.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23
24
25 #include "gst_private.h"
26
27 #include "gstqueue.h"
28 #include "gstscheduler.h"
29 #include "gstevent.h"
30 #include "gstinfo.h"
31 #include "gsterror.h"
32
33 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
34
35 static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
36     "Generic",
37     "Simple data queue",
38     "Erik Walthinsen <omega@cse.ogi.edu>");
39
40
41 /* Queue signals and args */
42 enum
43 {
44   SIGNAL_UNDERRUN,
45   SIGNAL_RUNNING,
46   SIGNAL_OVERRUN,
47   LAST_SIGNAL
48 };
49
50 enum
51 {
52   ARG_0,
53   /* FIXME: don't we have another way of doing this
54    * "Gstreamer format" (frame/byte/time) queries? */
55   ARG_CUR_LEVEL_BUFFERS,
56   ARG_CUR_LEVEL_BYTES,
57   ARG_CUR_LEVEL_TIME,
58   ARG_MAX_SIZE_BUFFERS,
59   ARG_MAX_SIZE_BYTES,
60   ARG_MAX_SIZE_TIME,
61   ARG_MIN_THRESHOLD_BUFFERS,
62   ARG_MIN_THRESHOLD_BYTES,
63   ARG_MIN_THRESHOLD_TIME,
64   ARG_LEAKY,
65   ARG_MAY_DEADLOCK,
66   ARG_BLOCK_TIMEOUT
67       /* FILL ME */
68 };
69
70 #define GST_QUEUE_MUTEX_LOCK G_STMT_START {                             \
71   GST_CAT_LOG_OBJECT (queue_dataflow, queue,                            \
72       "locking qlock from thread %p",                                   \
73       g_thread_self ());                                                \
74   g_mutex_lock (queue->qlock);                                          \
75   GST_CAT_LOG_OBJECT (queue_dataflow, queue,                            \
76       "locked qlock from thread %p",                                    \
77       g_thread_self ());                                                \
78 } G_STMT_END
79
80 #define GST_QUEUE_MUTEX_UNLOCK G_STMT_START {                           \
81   GST_CAT_LOG_OBJECT (queue_dataflow, queue,                            \
82       "unlocking qlock from thread %p",                                 \
83       g_thread_self ());                                                \
84   g_mutex_unlock (queue->qlock);                                        \
85 } G_STMT_END
86
87
88 typedef struct _GstQueueEventResponse
89 {
90   GstEvent *event;
91   gboolean ret, handled;
92 }
93 GstQueueEventResponse;
94
95 static void gst_queue_base_init (GstQueueClass * klass);
96 static void gst_queue_class_init (GstQueueClass * klass);
97 static void gst_queue_init (GstQueue * queue);
98 static void gst_queue_finalize (GObject * object);
99
100 static void gst_queue_set_property (GObject * object,
101     guint prop_id, const GValue * value, GParamSpec * pspec);
102 static void gst_queue_get_property (GObject * object,
103     guint prop_id, GValue * value, GParamSpec * pspec);
104
105 static void gst_queue_chain (GstPad * pad, GstData * data);
106 static GstData *gst_queue_get (GstPad * pad);
107
108 static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event);
109
110 static GstCaps *gst_queue_getcaps (GstPad * pad);
111 static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
112 static void gst_queue_locked_flush (GstQueue * queue);
113
114 static GstElementStateReturn gst_queue_change_state (GstElement * element);
115 static gboolean gst_queue_release_locks (GstElement * element);
116
117
118 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
119
120 static GType
121 queue_leaky_get_type (void)
122 {
123   static GType queue_leaky_type = 0;
124   static GEnumValue queue_leaky[] = {
125     {GST_QUEUE_NO_LEAK, "0", "Not Leaky"},
126     {GST_QUEUE_LEAK_UPSTREAM, "1", "Leaky on Upstream"},
127     {GST_QUEUE_LEAK_DOWNSTREAM, "2", "Leaky on Downstream"},
128     {0, NULL, NULL},
129   };
130
131   if (!queue_leaky_type) {
132     queue_leaky_type = g_enum_register_static ("GstQueueLeaky", queue_leaky);
133   }
134   return queue_leaky_type;
135 }
136
137 static GstElementClass *parent_class = NULL;
138 static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
139
140 GType
141 gst_queue_get_type (void)
142 {
143   static GType queue_type = 0;
144
145   if (!queue_type) {
146     static const GTypeInfo queue_info = {
147       sizeof (GstQueueClass),
148       (GBaseInitFunc) gst_queue_base_init,
149       NULL,
150       (GClassInitFunc) gst_queue_class_init,
151       NULL,
152       NULL,
153       sizeof (GstQueue),
154       0,
155       (GInstanceInitFunc) gst_queue_init,
156       NULL
157     };
158
159     queue_type = g_type_register_static (GST_TYPE_ELEMENT,
160         "GstQueue", &queue_info, 0);
161     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue_dataflow", 0,
162         "dataflow inside the queue element");
163   }
164
165   return queue_type;
166 }
167
168 static void
169 gst_queue_base_init (GstQueueClass * klass)
170 {
171   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
172
173   gst_element_class_set_details (gstelement_class, &gst_queue_details);
174 }
175
176 static void
177 gst_queue_class_init (GstQueueClass * klass)
178 {
179   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
180   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
181
182   parent_class = g_type_class_peek_parent (klass);
183
184   /* signals */
185   gst_queue_signals[SIGNAL_UNDERRUN] =
186       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
187       G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
188       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
189   gst_queue_signals[SIGNAL_RUNNING] =
190       g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
191       G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
192       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
193   gst_queue_signals[SIGNAL_OVERRUN] =
194       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
195       G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
196       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
197
198   /* properties */
199   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
200       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
201           "Current amount of data in the queue (bytes)",
202           0, G_MAXUINT, 0, G_PARAM_READABLE));
203   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS,
204       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
205           "Current number of buffers in the queue",
206           0, G_MAXUINT, 0, G_PARAM_READABLE));
207   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
208       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
209           "Current amount of data in the queue (in ns)",
210           0, G_MAXUINT64, 0, G_PARAM_READABLE));
211
212   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
213       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
214           "Max. amount of data in the queue (bytes, 0=disable)",
215           0, G_MAXUINT, 0, G_PARAM_READWRITE));
216   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
217       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
218           "Max. number of buffers in the queue (0=disable)",
219           0, G_MAXUINT, 0, G_PARAM_READWRITE));
220   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
221       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
222           "Max. amount of data in the queue (in ns, 0=disable)",
223           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
224
225   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BYTES,
226       g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
227           "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
228           0, G_MAXUINT, 0, G_PARAM_READWRITE));
229   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BUFFERS,
230       g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
231           "Min. number of buffers in the queue to allow reading (0=disable)",
232           0, G_MAXUINT, 0, G_PARAM_READWRITE));
233   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_TIME,
234       g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
235           "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
236           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
237
238   g_object_class_install_property (gobject_class, ARG_LEAKY,
239       g_param_spec_enum ("leaky", "Leaky",
240           "Where the queue leaks, if at all",
241           GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
242   g_object_class_install_property (gobject_class, ARG_MAY_DEADLOCK,
243       g_param_spec_boolean ("may_deadlock", "May Deadlock",
244           "The queue may deadlock if it's full and not PLAYING",
245           TRUE, G_PARAM_READWRITE));
246   g_object_class_install_property (gobject_class, ARG_BLOCK_TIMEOUT,
247       g_param_spec_uint64 ("block_timeout", "Timeout for Block",
248           "Nanoseconds until blocked queue times out and returns filler event. "
249           "Value of -1 disables timeout",
250           0, G_MAXUINT64, -1, G_PARAM_READWRITE));
251
252   /* set several parent class virtual functions */
253   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_queue_finalize);
254   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
255   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
256
257   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state);
258   gstelement_class->release_locks = GST_DEBUG_FUNCPTR (gst_queue_release_locks);
259 }
260
261 static void
262 gst_queue_init (GstQueue * queue)
263 {
264   /* scheduling on this kind of element is, well, interesting */
265   GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
266   GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE);
267
268   queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
269   gst_pad_set_chain_function (queue->sinkpad,
270       GST_DEBUG_FUNCPTR (gst_queue_chain));
271   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
272   gst_pad_set_link_function (queue->sinkpad,
273       GST_DEBUG_FUNCPTR (gst_queue_link));
274   gst_pad_set_getcaps_function (queue->sinkpad,
275       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
276   gst_pad_set_active (queue->sinkpad, TRUE);
277
278   queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
279   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
280   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
281   gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
282   gst_pad_set_getcaps_function (queue->srcpad,
283       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
284   gst_pad_set_event_function (queue->srcpad,
285       GST_DEBUG_FUNCPTR (gst_queue_handle_src_event));
286   gst_pad_set_active (queue->srcpad, TRUE);
287
288   queue->cur_level.buffers = 0; /* no content */
289   queue->cur_level.bytes = 0;   /* no content */
290   queue->cur_level.time = 0;    /* no content */
291   queue->max_size.buffers = 100;        /* 100 buffers */
292   queue->max_size.bytes = 10 * 1024 * 1024;     /* 10 MB */
293   queue->max_size.time = GST_SECOND;    /* 1 s. */
294   queue->min_threshold.buffers = 0;     /* no threshold */
295   queue->min_threshold.bytes = 0;       /* no threshold */
296   queue->min_threshold.time = 0;        /* no threshold */
297
298   queue->leaky = GST_QUEUE_NO_LEAK;
299   queue->may_deadlock = TRUE;
300   queue->block_timeout = GST_CLOCK_TIME_NONE;
301   queue->interrupt = FALSE;
302   queue->flush = FALSE;
303
304   queue->qlock = g_mutex_new ();
305   queue->item_add = g_cond_new ();
306   queue->item_del = g_cond_new ();
307   queue->event_done = g_cond_new ();
308   queue->events = g_queue_new ();
309   queue->event_lock = g_mutex_new ();
310   queue->queue = g_queue_new ();
311
312   GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
313       "initialized queue's not_empty & not_full conditions");
314 }
315
316 /* called only once, as opposed to dispose */
317 static void
318 gst_queue_finalize (GObject * object)
319 {
320   GstQueue *queue = GST_QUEUE (object);
321
322   GST_DEBUG_OBJECT (queue, "finalizing queue");
323
324   while (!g_queue_is_empty (queue->queue)) {
325     GstData *data = g_queue_pop_head (queue->queue);
326
327     gst_data_unref (data);
328   }
329   g_queue_free (queue->queue);
330   g_mutex_free (queue->qlock);
331   g_cond_free (queue->item_add);
332   g_cond_free (queue->item_del);
333   g_cond_free (queue->event_done);
334   g_mutex_lock (queue->event_lock);
335   while (!g_queue_is_empty (queue->events)) {
336     GstQueueEventResponse *er = g_queue_pop_head (queue->events);
337
338     gst_event_unref (er->event);
339   }
340   g_mutex_unlock (queue->event_lock);
341   g_mutex_free (queue->event_lock);
342   g_queue_free (queue->events);
343
344   if (G_OBJECT_CLASS (parent_class)->finalize)
345     G_OBJECT_CLASS (parent_class)->finalize (object);
346 }
347
348 static GstCaps *
349 gst_queue_getcaps (GstPad * pad)
350 {
351   GstQueue *queue;
352
353   queue = GST_QUEUE (gst_pad_get_parent (pad));
354
355   if (queue->cur_level.bytes > 0) {
356     return gst_caps_copy (queue->negotiated_caps);
357   }
358
359   return gst_pad_proxy_getcaps (pad);
360 }
361
362 static GstPadLinkReturn
363 gst_queue_link (GstPad * pad, const GstCaps * caps)
364 {
365   GstQueue *queue;
366   GstPadLinkReturn link_ret;
367
368   queue = GST_QUEUE (gst_pad_get_parent (pad));
369
370   if (queue->cur_level.bytes > 0) {
371     if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
372       return GST_PAD_LINK_OK;
373     }
374     return GST_PAD_LINK_REFUSED;
375   }
376
377   link_ret = gst_pad_proxy_pad_link (pad, caps);
378
379   if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
380     /* we store an extra copy of the negotiated caps, just in case
381      * the pads become unnegotiated while we have buffers */
382     gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
383   }
384
385   return link_ret;
386 }
387
388 static void
389 gst_queue_locked_flush (GstQueue * queue)
390 {
391   while (!g_queue_is_empty (queue->queue)) {
392     GstData *data = g_queue_pop_head (queue->queue);
393
394     /* First loose the reference we added when putting that data in the queue */
395     gst_data_unref (data);
396     /* Then loose another reference because we are supposed to destroy that
397        data when flushing */
398     gst_data_unref (data);
399   }
400   queue->timeval = NULL;
401   queue->cur_level.buffers = 0;
402   queue->cur_level.bytes = 0;
403   queue->cur_level.time = 0;
404
405   /* make sure any pending buffers to be added are flushed too */
406   queue->flush = TRUE;
407
408   /* we deleted something... */
409   g_cond_signal (queue->item_del);
410 }
411
412 static void
413 gst_queue_handle_pending_events (GstQueue * queue)
414 {
415   /* check for events to send upstream */
416   /* g_queue_get_length is glib 2.4, so don't depend on it yet, use ->length */
417   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
418       "handling pending events, events queue of size %d",
419       queue->events->length);
420   g_mutex_lock (queue->event_lock);
421   while (!g_queue_is_empty (queue->events)) {
422     GstQueueEventResponse *er;
423
424     er = g_queue_pop_head (queue->events);
425
426     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
427         "sending event %p (%d) from event response %p upstream",
428         er->event, GST_EVENT_TYPE (er->event), er);
429     if (er->handled) {
430       /* change this to an assert when this file gets reviewed properly. */
431       GST_ELEMENT_ERROR (queue, CORE, EVENT, (NULL),
432           ("already handled event %p (%d) from event response %p upstream",
433               er->event, GST_EVENT_TYPE (er->event), er));
434       break;
435     }
436     g_mutex_unlock (queue->event_lock);
437     er->ret = gst_pad_event_default (queue->srcpad, er->event);
438     er->handled = TRUE;
439     g_cond_signal (queue->event_done);
440     g_mutex_lock (queue->event_lock);
441     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent");
442   }
443   g_mutex_unlock (queue->event_lock);
444 }
445
446 #define STATUS(queue, msg) \
447   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
448                       "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
449                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
450                       "-%" G_GUINT64_FORMAT " ns, %u elements", \
451                       GST_DEBUG_PAD_NAME (pad), \
452                       queue->cur_level.buffers, \
453                       queue->min_threshold.buffers, \
454                       queue->max_size.buffers, \
455                       queue->cur_level.bytes, \
456                       queue->min_threshold.bytes, \
457                       queue->max_size.bytes, \
458                       queue->cur_level.time, \
459                       queue->min_threshold.time, \
460                       queue->max_size.time, \
461                       queue->queue->length)
462
463 static void
464 gst_queue_chain (GstPad * pad, GstData * data)
465 {
466   GstQueue *queue;
467
468   g_return_if_fail (pad != NULL);
469   g_return_if_fail (GST_IS_PAD (pad));
470   g_return_if_fail (data != NULL);
471
472   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
473
474 restart:
475   /* we have to lock the queue since we span threads */
476   GST_QUEUE_MUTEX_LOCK;
477
478   gst_queue_handle_pending_events (queue);
479
480   /* assume don't need to flush this buffer when the queue is filled */
481   queue->flush = FALSE;
482
483   if (GST_IS_EVENT (data)) {
484     switch (GST_EVENT_TYPE (data)) {
485       case GST_EVENT_FLUSH:
486         STATUS (queue, "received flush event");
487         gst_queue_locked_flush (queue);
488         STATUS (queue, "after flush");
489         break;
490       case GST_EVENT_EOS:
491         STATUS (queue, "received EOS");
492         break;
493       default:
494         /* we put the event in the queue, we don't have to act ourselves */
495         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
496             "adding event %p of type %d", data, GST_EVENT_TYPE (data));
497         break;
498     }
499   }
500
501   if (GST_IS_BUFFER (data))
502     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
503         "adding buffer %p of size %d", data, GST_BUFFER_SIZE (data));
504
505   /* We make space available if we're "full" according to whatever
506    * the user defined as "full". Note that this only applies to buffers.
507    * We always handle events and they don't count in our statistics. */
508   if (GST_IS_BUFFER (data) &&
509       ((queue->max_size.buffers > 0 &&
510               queue->cur_level.buffers >= queue->max_size.buffers) ||
511           (queue->max_size.bytes > 0 &&
512               queue->cur_level.bytes >= queue->max_size.bytes) ||
513           (queue->max_size.time > 0 &&
514               queue->cur_level.time >= queue->max_size.time))) {
515     GST_QUEUE_MUTEX_UNLOCK;
516     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
517     GST_QUEUE_MUTEX_LOCK;
518
519     /* how are we going to make space for this buffer? */
520     switch (queue->leaky) {
521         /* leak current buffer */
522       case GST_QUEUE_LEAK_UPSTREAM:
523         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
524             "queue is full, leaking buffer on upstream end");
525         /* now we can clean up and exit right away */
526         GST_QUEUE_MUTEX_UNLOCK;
527         goto out_unref;
528
529         /* leak first buffer in the queue */
530       case GST_QUEUE_LEAK_DOWNSTREAM:{
531         /* this is a bit hacky. We'll manually iterate the list
532          * and find the first buffer from the head on. We'll
533          * unref that and "fix up" the GQueue object... */
534         GList *item;
535         GstData *leak = NULL;
536
537         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
538             "queue is full, leaking buffer on downstream end");
539
540         for (item = queue->queue->head; item != NULL; item = item->next) {
541           if (GST_IS_BUFFER (item->data)) {
542             leak = item->data;
543             break;
544           }
545         }
546
547         /* if we didn't find anything, it means we have no buffers
548          * in here. That cannot happen, since we had >= 1 bufs */
549         g_assert (leak);
550
551         /* Now remove it from the list, fixing up the GQueue
552          * CHECKME: is a queue->head the first or the last item? */
553         item = g_list_delete_link (queue->queue->head, item);
554         queue->queue->head = g_list_first (item);
555         queue->queue->tail = g_list_last (item);
556         queue->queue->length--;
557
558         /* and unref the data at the end. Twice, because we keep a ref
559          * to make things read-only. Also keep our list uptodate. */
560         queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
561         queue->cur_level.buffers--;
562         if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
563           queue->cur_level.time -= GST_BUFFER_DURATION (data);
564
565         gst_data_unref (data);
566         gst_data_unref (data);
567         break;
568       }
569
570       default:
571         g_warning ("Unknown leaky type, using default");
572         /* fall-through */
573
574         /* don't leak. Instead, wait for space to be available */
575       case GST_QUEUE_NO_LEAK:
576         STATUS (queue, "pre-full wait");
577
578         while ((queue->max_size.buffers > 0 &&
579                 queue->cur_level.buffers >= queue->max_size.buffers) ||
580             (queue->max_size.bytes > 0 &&
581                 queue->cur_level.bytes >= queue->max_size.bytes) ||
582             (queue->max_size.time > 0 &&
583                 queue->cur_level.time >= queue->max_size.time)) {
584           /* if there's a pending state change for this queue
585            * or its manager, switch back to iterator so bottom
586            * half of state change executes */
587           if (queue->interrupt) {
588             GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "interrupted");
589             GST_QUEUE_MUTEX_UNLOCK;
590             if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad),
591                     GST_ELEMENT (queue))) {
592               goto out_unref;
593             }
594             /* if we got here because we were unlocked after a
595              * flush, we don't need to add the buffer to the
596              * queue again */
597             if (queue->flush) {
598               GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
599                   "not adding pending buffer after flush");
600               goto out_unref;
601             }
602             GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
603                 "adding pending buffer after interrupt");
604             goto restart;
605           }
606
607           if (GST_STATE (queue) != GST_STATE_PLAYING) {
608             /* this means the other end is shut down. Try to
609              * signal to resolve the error */
610             if (!queue->may_deadlock) {
611               GST_QUEUE_MUTEX_UNLOCK;
612               gst_data_unref (data);
613               GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL),
614                   ("deadlock found, shutting down source pad elements"));
615               /* we don't go to out_unref here, since we want to
616                * unref the buffer *before* calling GST_ELEMENT_ERROR */
617               return;
618             } else {
619               GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
620                   "%s: waiting for the app to restart "
621                   "source pad elements", GST_ELEMENT_NAME (queue));
622             }
623           }
624
625           /* OK, we've got a serious issue here. Imagine the situation
626            * where the puller (next element) is sending an event here,
627            * so it cannot pull events from the queue, and we cannot
628            * push data further because the queue is 'full' and therefore,
629            * we wait here (and do not handle events): deadlock! to solve
630            * that, we handle pending upstream events here, too. */
631           gst_queue_handle_pending_events (queue);
632
633           STATUS (queue, "waiting for item_del signal from thread using qlock");
634           g_cond_wait (queue->item_del, queue->qlock);
635           STATUS (queue, "received item_del signal from thread using qlock");
636         }
637
638         STATUS (queue, "post-full wait");
639         GST_QUEUE_MUTEX_UNLOCK;
640         g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
641         GST_QUEUE_MUTEX_LOCK;
642         break;
643     }
644   }
645
646   /* put the buffer on the tail of the list. We keep a reference,
647    * so that the data is read-only while in here. There's a good
648    * reason to do so: we have a size and time counter, and any
649    * modification to the content could change any of the two. */
650   gst_data_ref (data);
651   g_queue_push_tail (queue->queue, data);
652
653   /* Note that we only add buffers (not events) to the statistics */
654   if (GST_IS_BUFFER (data)) {
655     queue->cur_level.buffers++;
656     queue->cur_level.bytes += GST_BUFFER_SIZE (data);
657     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
658       queue->cur_level.time += GST_BUFFER_DURATION (data);
659   }
660
661   STATUS (queue, "+ level");
662
663   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
664   g_cond_signal (queue->item_add);
665   GST_QUEUE_MUTEX_UNLOCK;
666
667   return;
668
669 out_unref:
670   gst_data_unref (data);
671   return;
672 }
673
674 static GstData *
675 gst_queue_get (GstPad * pad)
676 {
677   GstQueue *queue;
678   GstData *data;
679
680   g_return_val_if_fail (pad != NULL, NULL);
681   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
682
683   queue = GST_QUEUE (gst_pad_get_parent (pad));
684
685 restart:
686   /* have to lock for thread-safety */
687   GST_QUEUE_MUTEX_LOCK;
688
689   if (queue->queue->length == 0 ||
690       (queue->min_threshold.buffers > 0 &&
691           queue->cur_level.buffers < queue->min_threshold.buffers) ||
692       (queue->min_threshold.bytes > 0 &&
693           queue->cur_level.bytes < queue->min_threshold.bytes) ||
694       (queue->min_threshold.time > 0 &&
695           queue->cur_level.time < queue->min_threshold.time)) {
696     GST_QUEUE_MUTEX_UNLOCK;
697     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
698     GST_QUEUE_MUTEX_LOCK;
699
700     STATUS (queue, "pre-empty wait");
701     while (queue->queue->length == 0 ||
702         (queue->min_threshold.buffers > 0 &&
703             queue->cur_level.buffers < queue->min_threshold.buffers) ||
704         (queue->min_threshold.bytes > 0 &&
705             queue->cur_level.bytes < queue->min_threshold.bytes) ||
706         (queue->min_threshold.time > 0 &&
707             queue->cur_level.time < queue->min_threshold.time)) {
708       /* if there's a pending state change for this queue or its
709        * manager, switch back to iterator so bottom half of state
710        * change executes. */
711       if (queue->interrupt) {
712         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "interrupted");
713         GST_QUEUE_MUTEX_UNLOCK;
714         if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad),
715                 GST_ELEMENT (queue)))
716           return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
717         goto restart;
718       }
719       if (GST_STATE (queue) != GST_STATE_PLAYING) {
720         /* this means the other end is shut down */
721         if (!queue->may_deadlock) {
722           GST_QUEUE_MUTEX_UNLOCK;
723           GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL),
724               ("deadlock found, shutting down sink pad elements"));
725           goto restart;
726         } else {
727           GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
728               "%s: waiting for the app to restart "
729               "source pad elements", GST_ELEMENT_NAME (queue));
730         }
731       }
732
733       STATUS (queue, "waiting for item_add");
734
735       if (queue->block_timeout != GST_CLOCK_TIME_NONE) {
736         GTimeVal timeout;
737
738         g_get_current_time (&timeout);
739         g_time_val_add (&timeout, queue->block_timeout / 1000);
740         GST_LOG_OBJECT (queue, "g_cond_time_wait using qlock from thread %p",
741             g_thread_self ());
742         if (!g_cond_timed_wait (queue->item_add, queue->qlock, &timeout)) {
743           GST_QUEUE_MUTEX_UNLOCK;
744           GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
745               "Sending filler event");
746           return GST_DATA (gst_event_new_filler ());
747         }
748       } else {
749         GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
750             g_thread_self ());
751         g_cond_wait (queue->item_add, queue->qlock);
752         GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
753             g_thread_self ());
754       }
755       STATUS (queue, "got item_add signal");
756     }
757
758     STATUS (queue, "post-empty wait");
759     GST_QUEUE_MUTEX_UNLOCK;
760     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
761     GST_QUEUE_MUTEX_LOCK;
762   }
763
764   /* There's something in the list now, whatever it is */
765   data = g_queue_pop_head (queue->queue);
766   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
767       "retrieved data %p from queue", data);
768
769   if (data == NULL)
770     return NULL;
771
772   if (GST_IS_BUFFER (data)) {
773     /* Update statistics */
774     queue->cur_level.buffers--;
775     queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
776     if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
777       queue->cur_level.time -= GST_BUFFER_DURATION (data);
778   }
779
780   /* Now that we're done, we can lose our own reference to
781    * the item, since we're no longer in danger. */
782   gst_data_unref (data);
783
784   STATUS (queue, "after _get()");
785
786   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
787   g_cond_signal (queue->item_del);
788   GST_QUEUE_MUTEX_UNLOCK;
789
790   /* FIXME: I suppose this needs to be locked, since the EOS
791    * bit affects the pipeline state. However, that bit is
792    * locked too so it'd cause a deadlock. */
793   if (GST_IS_EVENT (data)) {
794     GstEvent *event = GST_EVENT (data);
795
796     switch (GST_EVENT_TYPE (event)) {
797       case GST_EVENT_EOS:
798         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
799             "queue \"%s\" eos", GST_ELEMENT_NAME (queue));
800         gst_element_set_eos (GST_ELEMENT (queue));
801         break;
802       default:
803         break;
804     }
805   }
806
807   return data;
808 }
809
810
811 static gboolean
812 gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
813 {
814   GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
815   gboolean res;
816
817   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
818       event, GST_EVENT_TYPE (event));
819   GST_QUEUE_MUTEX_LOCK;
820
821   if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
822     GstQueueEventResponse er;
823
824     /* push the event to the queue and wait for upstream consumption */
825     er.event = event;
826     er.handled = FALSE;
827     g_mutex_lock (queue->event_lock);
828     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
829         "putting event %p (%d) on internal queue", event,
830         GST_EVENT_TYPE (event));
831     g_queue_push_tail (queue->events, &er);
832     g_mutex_unlock (queue->event_lock);
833     GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
834         "Preparing for loop for event handler");
835     /* see the chain function on why this is here - it prevents a deadlock */
836     g_cond_signal (queue->item_del);
837     while (!er.handled) {
838       GTimeVal timeout;
839
840       g_get_current_time (&timeout);
841       g_time_val_add (&timeout, 500 * 1000);    /* half a second */
842       GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
843           g_thread_self ());
844       if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
845           !er.handled) {
846         GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
847             "timeout in upstream event handling, dropping event %p (%d)",
848             er.event, GST_EVENT_TYPE (er.event));
849         g_mutex_lock (queue->event_lock);
850         /* since this queue is for src events (ie upstream), this thread is
851          * the only one that is pushing stuff on it, so we're sure that
852          * it's still the tail element.  FIXME: But in practice, we should use
853          * GList instead of GQueue for this so we can remove any element in
854          * the list. */
855         g_queue_pop_tail (queue->events);
856         g_mutex_unlock (queue->event_lock);
857         gst_event_unref (er.event);
858         res = FALSE;
859         goto handled;
860       }
861     }
862     GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Event handled");
863     res = er.ret;
864   } else {
865     res = gst_pad_event_default (pad, event);
866
867     switch (GST_EVENT_TYPE (event)) {
868       case GST_EVENT_FLUSH:
869         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
870             "FLUSH event, flushing queue\n");
871         gst_queue_locked_flush (queue);
872         break;
873       case GST_EVENT_SEEK:
874         if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
875           gst_queue_locked_flush (queue);
876         }
877       default:
878         break;
879     }
880   }
881 handled:
882   GST_QUEUE_MUTEX_UNLOCK;
883
884   return res;
885 }
886
887 static gboolean
888 gst_queue_release_locks (GstElement * element)
889 {
890   GstQueue *queue;
891
892   queue = GST_QUEUE (element);
893
894   GST_QUEUE_MUTEX_LOCK;
895   queue->interrupt = TRUE;
896   g_cond_signal (queue->item_add);
897   g_cond_signal (queue->item_del);
898   GST_QUEUE_MUTEX_UNLOCK;
899
900   return TRUE;
901 }
902
903 static GstElementStateReturn
904 gst_queue_change_state (GstElement * element)
905 {
906   GstQueue *queue;
907   GstElementStateReturn ret = GST_STATE_SUCCESS;
908
909   queue = GST_QUEUE (element);
910
911   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
912
913   /* lock the queue so another thread (not in sync with this thread's state)
914    * can't call this queue's _get (or whatever)
915    */
916   GST_QUEUE_MUTEX_LOCK;
917
918   switch (GST_STATE_TRANSITION (element)) {
919     case GST_STATE_NULL_TO_READY:
920       gst_queue_locked_flush (queue);
921       break;
922     case GST_STATE_PAUSED_TO_PLAYING:
923       if (!GST_PAD_IS_LINKED (queue->sinkpad)) {
924         GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
925             "queue %s is not linked", GST_ELEMENT_NAME (queue));
926         /* FIXME can this be? */
927         g_cond_signal (queue->item_add);
928
929         ret = GST_STATE_FAILURE;
930         goto unlock;
931       } else {
932         GstScheduler *src_sched, *sink_sched;
933
934         src_sched = gst_pad_get_scheduler (GST_PAD (queue->srcpad));
935         sink_sched = gst_pad_get_scheduler (GST_PAD (queue->sinkpad));
936
937         if (src_sched == sink_sched) {
938           GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
939               "queue %s does not connect different schedulers",
940               GST_ELEMENT_NAME (queue));
941
942           g_warning ("queue %s does not connect different schedulers",
943               GST_ELEMENT_NAME (queue));
944
945           ret = GST_STATE_FAILURE;
946           goto unlock;
947         }
948       }
949       queue->interrupt = FALSE;
950       break;
951     case GST_STATE_PAUSED_TO_READY:
952       gst_queue_locked_flush (queue);
953       gst_caps_replace (&queue->negotiated_caps, NULL);
954       break;
955     default:
956       break;
957   }
958
959   if (GST_ELEMENT_CLASS (parent_class)->change_state)
960     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
961
962   /* this is an ugly hack to make sure our pads are always active.
963    * Reason for this is that pad activation for the queue element
964    * depends on 2 schedulers (ugh) */
965   gst_pad_set_active (queue->sinkpad, TRUE);
966   gst_pad_set_active (queue->srcpad, TRUE);
967
968 unlock:
969   GST_QUEUE_MUTEX_UNLOCK;
970
971   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
972
973   return ret;
974 }
975
976
977 static void
978 gst_queue_set_property (GObject * object,
979     guint prop_id, const GValue * value, GParamSpec * pspec)
980 {
981   GstQueue *queue = GST_QUEUE (object);
982
983   /* someone could change levels here, and since this
984    * affects the get/put funcs, we need to lock for safety. */
985   GST_QUEUE_MUTEX_LOCK;
986
987   switch (prop_id) {
988     case ARG_MAX_SIZE_BYTES:
989       queue->max_size.bytes = g_value_get_uint (value);
990       break;
991     case ARG_MAX_SIZE_BUFFERS:
992       queue->max_size.buffers = g_value_get_uint (value);
993       break;
994     case ARG_MAX_SIZE_TIME:
995       queue->max_size.time = g_value_get_uint64 (value);
996       break;
997     case ARG_MIN_THRESHOLD_BYTES:
998       queue->min_threshold.bytes = g_value_get_uint (value);
999       break;
1000     case ARG_MIN_THRESHOLD_BUFFERS:
1001       queue->min_threshold.buffers = g_value_get_uint (value);
1002       break;
1003     case ARG_MIN_THRESHOLD_TIME:
1004       queue->min_threshold.time = g_value_get_uint64 (value);
1005       break;
1006     case ARG_LEAKY:
1007       queue->leaky = g_value_get_enum (value);
1008       break;
1009     case ARG_MAY_DEADLOCK:
1010       queue->may_deadlock = g_value_get_boolean (value);
1011       break;
1012     case ARG_BLOCK_TIMEOUT:
1013       queue->block_timeout = g_value_get_uint64 (value);
1014       break;
1015     default:
1016       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1017       break;
1018   }
1019
1020   GST_QUEUE_MUTEX_UNLOCK;
1021 }
1022
1023 static void
1024 gst_queue_get_property (GObject * object,
1025     guint prop_id, GValue * value, GParamSpec * pspec)
1026 {
1027   GstQueue *queue = GST_QUEUE (object);
1028
1029   switch (prop_id) {
1030     case ARG_CUR_LEVEL_BYTES:
1031       g_value_set_uint (value, queue->cur_level.bytes);
1032       break;
1033     case ARG_CUR_LEVEL_BUFFERS:
1034       g_value_set_uint (value, queue->cur_level.buffers);
1035       break;
1036     case ARG_CUR_LEVEL_TIME:
1037       g_value_set_uint64 (value, queue->cur_level.time);
1038       break;
1039     case ARG_MAX_SIZE_BYTES:
1040       g_value_set_uint (value, queue->max_size.bytes);
1041       break;
1042     case ARG_MAX_SIZE_BUFFERS:
1043       g_value_set_uint (value, queue->max_size.buffers);
1044       break;
1045     case ARG_MAX_SIZE_TIME:
1046       g_value_set_uint64 (value, queue->max_size.time);
1047       break;
1048     case ARG_MIN_THRESHOLD_BYTES:
1049       g_value_set_uint (value, queue->min_threshold.bytes);
1050       break;
1051     case ARG_MIN_THRESHOLD_BUFFERS:
1052       g_value_set_uint (value, queue->min_threshold.buffers);
1053       break;
1054     case ARG_MIN_THRESHOLD_TIME:
1055       g_value_set_uint64 (value, queue->min_threshold.time);
1056       break;
1057     case ARG_LEAKY:
1058       g_value_set_enum (value, queue->leaky);
1059       break;
1060     case ARG_MAY_DEADLOCK:
1061       g_value_set_boolean (value, queue->may_deadlock);
1062       break;
1063     case ARG_BLOCK_TIMEOUT:
1064       g_value_set_uint64 (value, queue->block_timeout);
1065       break;
1066     default:
1067       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1068       break;
1069   }
1070 }