2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2000 Wim Taymans <wtay@chello.be>
4 * 2003 Colin Walters <cwalters@gnome.org>
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21 * Boston, MA 02111-1307, USA.
25 #include "gst_private.h"
28 #include "gstscheduler.h"
/* Static pad templates: the queue exposes one always-available "sink" pad
 * and one always-available "src" pad (template bodies elided in this
 * excerpt; presumably GST_PAD_SINK/GST_PAD_SRC, ANY caps -- TODO confirm). */
33 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
38 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
/* Debug category used by the dataflow logging macros/calls below;
 * initialized in gst_queue_get_type(). */
43 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
/* Element metadata registered with the class in gst_queue_base_init(). */
45 static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
48 "Erik Walthinsen <omega@cse.ogi.edu>");
51 /* Queue signals and args */
/* GObject property IDs (enum is partial in this excerpt: the signal enum,
 * ARG_0 and several ARG_* entries appear to be elided). */
63 /* FIXME: don't we have another way of doing this
64 * "Gstreamer format" (frame/byte/time) queries? */
/* Read-only fill-level properties. */
65 ARG_CUR_LEVEL_BUFFERS,
/* Minimum fill thresholds below which gst_queue_get() blocks the reader. */
71 ARG_MIN_THRESHOLD_BUFFERS,
72 ARG_MIN_THRESHOLD_BYTES,
73 ARG_MIN_THRESHOLD_TIME,
/* Acquire the queue's qlock, logging the acquiring thread before and after.
 * NOTE(review): both macros reference a variable named `queue` that must be
 * in scope at the expansion site -- they are not parameterized. Macro bodies
 * are partially elided in this excerpt (closing G_STMT_END not visible). */
80 #define GST_QUEUE_MUTEX_LOCK G_STMT_START { \
81 GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
82 "locking qlock from thread %p", \
84 g_mutex_lock (queue->qlock); \
85 GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
86 "locked qlock from thread %p", \
/* Release the queue's qlock, logging the releasing thread. */
90 #define GST_QUEUE_MUTEX_UNLOCK G_STMT_START { \
91 GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
92 "unlocking qlock from thread %p", \
94 g_mutex_unlock (queue->qlock); \
/* Response record for upstream events pushed from the src-pad thread to the
 * chain thread (see gst_queue_handle_src_event / _handle_pending_events):
 * `ret` carries the event-handling result, `handled` signals completion.
 * The `event` member is not visible in this excerpt. */
98 typedef struct _GstQueueEventResponse
101 gboolean ret, handled;
103 GstQueueEventResponse;
/* GType boilerplate. */
105 static void gst_queue_base_init (GstQueueClass * klass);
106 static void gst_queue_class_init (GstQueueClass * klass);
107 static void gst_queue_init (GstQueue * queue);
108 static void gst_queue_finalize (GObject * object);
/* GObject property accessors. */
110 static void gst_queue_set_property (GObject * object,
111 guint prop_id, const GValue * value, GParamSpec * pspec);
112 static void gst_queue_get_property (GObject * object,
113 guint prop_id, GValue * value, GParamSpec * pspec);
/* Dataflow entry points: chain = sink-side push, get = src-side pull. */
115 static void gst_queue_chain (GstPad * pad, GstData * data);
116 static GstData *gst_queue_get (GstPad * pad);
118 static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event);
/* Caps negotiation proxies and internal flush helper. */
120 static GstCaps *gst_queue_getcaps (GstPad * pad);
121 static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
122 static void gst_queue_locked_flush (GstQueue * queue);
/* GstElement virtuals. */
124 static GstElementStateReturn gst_queue_change_state (GstElement * element);
125 static gboolean gst_queue_release_locks (GstElement * element);
128 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
/* Lazily register and return the GEnum type describing the queue's leaky
 * behavior (no leak / drop new data upstream / drop old data downstream).
 * NOTE(review): not thread-safe as written; presumably only called from
 * class initialization -- TODO confirm. */
131 queue_leaky_get_type (void)
133 static GType queue_leaky_type = 0;
134 static GEnumValue queue_leaky[] = {
135 {GST_QUEUE_NO_LEAK, "0", "Not Leaky"},
136 {GST_QUEUE_LEAK_UPSTREAM, "1", "Leaky on Upstream"},
137 {GST_QUEUE_LEAK_DOWNSTREAM, "2", "Leaky on Downstream"},
141 if (!queue_leaky_type) {
142 queue_leaky_type = g_enum_register_static ("GstQueueLeaky", queue_leaky);
144 return queue_leaky_type;
/* Parent class pointer (chained in finalize/change_state) and signal IDs. */
147 static GstElementClass *parent_class = NULL;
148 static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
/* Standard lazily-registered GType accessor for GstQueue; also initializes
 * the queue_dataflow debug category on first call. */
151 gst_queue_get_type (void)
153 static GType queue_type = 0;
156 static const GTypeInfo queue_info = {
157 sizeof (GstQueueClass),
158 (GBaseInitFunc) gst_queue_base_init,
160 (GClassInitFunc) gst_queue_class_init,
165 (GInstanceInitFunc) gst_queue_init,
169 queue_type = g_type_register_static (GST_TYPE_ELEMENT,
170 "GstQueue", &queue_info, 0);
171 GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue_dataflow", 0,
172 "dataflow inside the queue element");
/* Base-init: register the static pad templates and element details on the
 * element class (runs once per class, before class_init). */
179 gst_queue_base_init (GstQueueClass * klass)
181 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
183 gst_element_class_add_pad_template (gstelement_class,
184 gst_static_pad_template_get (&srctemplate));
185 gst_element_class_add_pad_template (gstelement_class,
186 gst_static_pad_template_get (&sinktemplate));
187 gst_element_class_set_details (gstelement_class, &gst_queue_details);
/* Class-init: install the underrun/running/overrun signals, the level,
 * size-limit, threshold, leaky, may_deadlock and block_timeout properties,
 * and wire up the GObject/GstElement virtual methods. */
191 gst_queue_class_init (GstQueueClass * klass)
193 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
194 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
196 parent_class = g_type_class_peek_parent (klass);
/* Signals emitted from chain/get around the blocking waits:
 * underrun = queue (nearly) empty, overrun = queue full,
 * running = resumed normal flow after a wait. */
199 gst_queue_signals[SIGNAL_UNDERRUN] =
200 g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
201 G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
202 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
203 gst_queue_signals[SIGNAL_RUNNING] =
204 g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
205 G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
206 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
207 gst_queue_signals[SIGNAL_OVERRUN] =
208 g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
209 G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
210 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
/* Read-only current-level properties.
 * NOTE(review): the nick strings say "(kB)" but the blurbs and the code
 * count plain bytes -- the nicks look wrong; verify before changing since
 * nicks are user-visible strings. */
213 g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
214 g_param_spec_uint ("current-level-bytes", "Current level (kB)",
215 "Current amount of data in the queue (bytes)",
216 0, G_MAXUINT, 0, G_PARAM_READABLE));
217 g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS,
218 g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
219 "Current number of buffers in the queue",
220 0, G_MAXUINT, 0, G_PARAM_READABLE));
221 g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
222 g_param_spec_uint64 ("current-level-time", "Current level (ns)",
223 "Current amount of data in the queue (in ns)",
224 0, G_MAXUINT64, 0, G_PARAM_READABLE));
/* Read-write upper limits; 0 disables the corresponding check. */
226 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
227 g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
228 "Max. amount of data in the queue (bytes, 0=disable)",
229 0, G_MAXUINT, 0, G_PARAM_READWRITE));
230 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
231 g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
232 "Max. number of buffers in the queue (0=disable)",
233 0, G_MAXUINT, 0, G_PARAM_READWRITE));
234 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
235 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
236 "Max. amount of data in the queue (in ns, 0=disable)",
237 0, G_MAXUINT64, 0, G_PARAM_READWRITE));
/* Read-write lower thresholds for the reader side; 0 disables. */
239 g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BYTES,
240 g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
241 "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
242 0, G_MAXUINT, 0, G_PARAM_READWRITE));
243 g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BUFFERS,
244 g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
245 "Min. number of buffers in the queue to allow reading (0=disable)",
246 0, G_MAXUINT, 0, G_PARAM_READWRITE));
247 g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_TIME,
248 g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
249 "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
250 0, G_MAXUINT64, 0, G_PARAM_READWRITE));
/* Behavioral knobs; see gst_queue_chain for leaky semantics and
 * gst_queue_get for block_timeout semantics. */
252 g_object_class_install_property (gobject_class, ARG_LEAKY,
253 g_param_spec_enum ("leaky", "Leaky",
254 "Where the queue leaks, if at all",
255 GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
256 g_object_class_install_property (gobject_class, ARG_MAY_DEADLOCK,
257 g_param_spec_boolean ("may_deadlock", "May Deadlock",
258 "The queue may deadlock if it's full and not PLAYING",
259 TRUE, G_PARAM_READWRITE));
260 g_object_class_install_property (gobject_class, ARG_BLOCK_TIMEOUT,
261 g_param_spec_uint64 ("block_timeout", "Timeout for Block",
262 "Nanoseconds until blocked queue times out and returns filler event. "
263 "Value of -1 disables timeout",
264 0, G_MAXUINT64, -1, G_PARAM_READWRITE));
266 /* set several parent class virtual functions */
267 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_queue_finalize);
268 gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
269 gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
271 gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state);
272 gstelement_class->release_locks = GST_DEBUG_FUNCPTR (gst_queue_release_locks);
/* Instance-init: create both pads with their chain/get/link/getcaps/event
 * functions, set default limits, and allocate the mutex, condition
 * variables and the two internal GQueues (data + pending upstream events). */
276 gst_queue_init (GstQueue * queue)
278 /* scheduling on this kind of element is, well, interesting */
279 GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
280 GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE);
/* Sink pad: receives data via gst_queue_chain. */
283 gst_pad_new_from_template (gst_static_pad_template_get (&sinktemplate),
285 gst_pad_set_chain_function (queue->sinkpad,
286 GST_DEBUG_FUNCPTR (gst_queue_chain));
287 gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
288 gst_pad_set_link_function (queue->sinkpad,
289 GST_DEBUG_FUNCPTR (gst_queue_link));
290 gst_pad_set_getcaps_function (queue->sinkpad,
291 GST_DEBUG_FUNCPTR (gst_queue_getcaps));
292 gst_pad_set_active (queue->sinkpad, TRUE);
/* Src pad: hands out data via gst_queue_get; events handled upstream-wise
 * through gst_queue_handle_src_event. */
295 gst_pad_new_from_template (gst_static_pad_template_get (&srctemplate),
297 gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
298 gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
299 gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
300 gst_pad_set_getcaps_function (queue->srcpad,
301 GST_DEBUG_FUNCPTR (gst_queue_getcaps));
302 gst_pad_set_event_function (queue->srcpad,
303 GST_DEBUG_FUNCPTR (gst_queue_handle_src_event));
304 gst_pad_set_active (queue->srcpad, TRUE);
/* Default fill state and limits. */
306 queue->cur_level.buffers = 0; /* no content */
307 queue->cur_level.bytes = 0; /* no content */
308 queue->cur_level.time = 0; /* no content */
309 queue->max_size.buffers = 100; /* 100 buffers */
310 queue->max_size.bytes = 10 * 1024 * 1024; /* 10 MB */
311 queue->max_size.time = GST_SECOND; /* 1 s. */
312 queue->min_threshold.buffers = 0; /* no threshold */
313 queue->min_threshold.bytes = 0; /* no threshold */
314 queue->min_threshold.time = 0; /* no threshold */
316 queue->leaky = GST_QUEUE_NO_LEAK;
317 queue->may_deadlock = TRUE;
318 queue->block_timeout = GST_CLOCK_TIME_NONE;
319 queue->interrupt = FALSE;
320 queue->flush = FALSE;
/* Synchronization primitives: qlock guards all queue state; item_add /
 * item_del wake the reader / writer; event_done signals that an upstream
 * event pushed via the events queue has been processed. */
322 queue->qlock = g_mutex_new ();
323 queue->item_add = g_cond_new ();
324 queue->item_del = g_cond_new ();
325 queue->event_done = g_cond_new ();
326 queue->events = g_queue_new ();
327 queue->event_lock = g_mutex_new ();
328 queue->queue = g_queue_new ();
330 GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
331 "initialized queue's not_empty & not_full conditions");
334 /* called only once, as opposed to dispose */
/* Finalize: drain and free the data queue, free the mutex/conds, drain the
 * pending-events queue (unreffing each held event), then chain up.
 * NOTE(review): each popped GstQueueEventResponse `er` has its event
 * unreffed but `er` itself does not appear to be freed here -- possible
 * small leak, though allocation ownership isn't visible in this excerpt. */
336 gst_queue_finalize (GObject * object)
338 GstQueue *queue = GST_QUEUE (object);
340 GST_DEBUG_OBJECT (queue, "finalizing queue");
342 while (!g_queue_is_empty (queue->queue)) {
343 GstData *data = g_queue_pop_head (queue->queue);
345 gst_data_unref (data);
347 g_queue_free (queue->queue);
348 g_mutex_free (queue->qlock);
349 g_cond_free (queue->item_add);
350 g_cond_free (queue->item_del);
351 g_cond_free (queue->event_done);
352 g_mutex_lock (queue->event_lock);
353 while (!g_queue_is_empty (queue->events)) {
354 GstQueueEventResponse *er = g_queue_pop_head (queue->events);
356 gst_event_unref (er->event);
358 g_mutex_unlock (queue->event_lock);
359 g_mutex_free (queue->event_lock);
360 g_queue_free (queue->events);
/* Chain up to the parent finalize. */
362 if (G_OBJECT_CLASS (parent_class)->finalize)
363 G_OBJECT_CLASS (parent_class)->finalize (object);
/* Getcaps for both pads: while the queue holds data, advertise only the
 * caps that were negotiated when that data entered (so caps cannot change
 * under buffered data); otherwise proxy getcaps to the peer. */
367 gst_queue_getcaps (GstPad * pad)
371 queue = GST_QUEUE (gst_pad_get_parent (pad));
373 if (queue->cur_level.bytes > 0) {
374 return gst_caps_copy (queue->negotiated_caps);
377 return gst_pad_proxy_getcaps (pad);
380 static GstPadLinkReturn
/* Link function for both pads: while data is buffered, only accept caps
 * equal to the stored negotiated caps (refuse anything else); when empty,
 * proxy the link and keep a copy of the successfully negotiated caps. */
381 gst_queue_link (GstPad * pad, const GstCaps * caps)
384 GstPadLinkReturn link_ret;
386 queue = GST_QUEUE (gst_pad_get_parent (pad));
388 if (queue->cur_level.bytes > 0) {
389 if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
390 return GST_PAD_LINK_OK;
392 return GST_PAD_LINK_REFUSED;
395 link_ret = gst_pad_proxy_pad_link (pad, caps);
397 if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
398 /* we store an extra copy of the negotiated caps, just in case
399 * the pads become unnegotiated while we have buffers */
400 gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
/* Flush the data queue. Caller must hold qlock (hence "locked" in the
 * name). Drops every queued item (two unrefs: the queue's read-only ref
 * plus the destroy-on-flush ref), resets the level counters, and signals
 * item_del so a writer blocked on a full queue wakes up. */
407 gst_queue_locked_flush (GstQueue * queue)
409 while (!g_queue_is_empty (queue->queue)) {
410 GstData *data = g_queue_pop_head (queue->queue);
412 /* First loose the reference we added when putting that data in the queue */
413 gst_data_unref (data);
414 /* Then loose another reference because we are supposed to destroy that
415 data when flushing */
416 gst_data_unref (data);
418 queue->timeval = NULL;
419 queue->cur_level.buffers = 0;
420 queue->cur_level.bytes = 0;
421 queue->cur_level.time = 0;
423 /* make sure any pending buffers to be added are flushed too */
426 /* we deleted something... */
427 g_cond_signal (queue->item_del);
/* Drain the pending upstream-event queue (filled by
 * gst_queue_handle_src_event from the src-pad thread): forward each event
 * upstream via the default handler and signal event_done so the waiting
 * src thread can collect the result. event_lock is dropped around the
 * actual event dispatch to avoid holding it across arbitrary upstream
 * code. Called with qlock held from the chain path. */
431 gst_queue_handle_pending_events (GstQueue * queue)
433 /* check for events to send upstream */
434 /* g_queue_get_length is glib 2.4, so don't depend on it yet, use ->length */
435 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
436 "handling pending events, events queue of size %d",
437 queue->events->length);
438 g_mutex_lock (queue->event_lock);
439 while (!g_queue_is_empty (queue->events)) {
440 GstQueueEventResponse *er;
442 er = g_queue_pop_head (queue->events);
444 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
445 "sending event %p (%d) from event response %p upstream",
446 er->event, GST_EVENT_TYPE (er->event), er);
/* Guard against double-handling (condition line elided in this excerpt). */
448 /* change this to an assert when this file gets reviewed properly. */
449 GST_ELEMENT_ERROR (queue, CORE, EVENT, (NULL),
450 ("already handled event %p (%d) from event response %p upstream",
451 er->event, GST_EVENT_TYPE (er->event), er));
454 g_mutex_unlock (queue->event_lock);
455 er->ret = gst_pad_event_default (queue->srcpad, er->event);
457 g_cond_signal (queue->event_done);
458 g_mutex_lock (queue->event_lock);
459 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent");
461 g_mutex_unlock (queue->event_lock);
464 #define STATUS(queue, msg) \
465 GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
466 "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
467 "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
468 "-%" G_GUINT64_FORMAT " ns, %u elements", \
469 GST_DEBUG_PAD_NAME (pad), \
470 queue->cur_level.buffers, \
471 queue->min_threshold.buffers, \
472 queue->max_size.buffers, \
473 queue->cur_level.bytes, \
474 queue->min_threshold.bytes, \
475 queue->max_size.bytes, \
476 queue->cur_level.time, \
477 queue->min_threshold.time, \
478 queue->max_size.time, \
479 queue->queue->length)
/* Sink-pad chain function: enqueue `data` (buffer or event) at the tail.
 * Runs in the upstream thread. Handles FLUSH/EOS events inline, services
 * pending upstream events (to avoid a mutual-wait deadlock with the src
 * thread), and when the queue is full either leaks data (per queue->leaky)
 * or blocks on item_del until the reader makes room. Takes a ref on queued
 * data so it is effectively read-only while buffered (the byte/time
 * counters depend on it not changing). Several lines (breaks, goto labels,
 * `item` declaration, closing braces) are elided in this excerpt. */
482 gst_queue_chain (GstPad * pad, GstData * data)
486 g_return_if_fail (pad != NULL);
487 g_return_if_fail (GST_IS_PAD (pad));
488 g_return_if_fail (data != NULL);
490 queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
493 /* we have to lock the queue since we span threads */
494 GST_QUEUE_MUTEX_LOCK;
496 gst_queue_handle_pending_events (queue);
498 /* assume don't need to flush this buffer when the queue is filled */
499 queue->flush = FALSE;
/* Events: FLUSH empties the queue immediately; EOS and others are queued
 * like data so the reader sees them in order. */
501 if (GST_IS_EVENT (data)) {
502 switch (GST_EVENT_TYPE (data)) {
503 case GST_EVENT_FLUSH:
504 STATUS (queue, "received flush event");
505 gst_queue_locked_flush (queue);
506 STATUS (queue, "after flush");
509 STATUS (queue, "received EOS");
512 /* we put the event in the queue, we don't have to act ourselves */
513 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
514 "adding event %p of type %d", data, GST_EVENT_TYPE (data));
519 if (GST_IS_BUFFER (data))
520 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
521 "adding buffer %p of size %d", data, GST_BUFFER_SIZE (data));
523 /* We make space available if we're "full" according to whatever
524 * the user defined as "full". Note that this only applies to buffers.
525 * We always handle events and they don't count in our statistics. */
526 if (GST_IS_BUFFER (data) &&
527 ((queue->max_size.buffers > 0 &&
528 queue->cur_level.buffers >= queue->max_size.buffers) ||
529 (queue->max_size.bytes > 0 &&
530 queue->cur_level.bytes >= queue->max_size.bytes) ||
531 (queue->max_size.time > 0 &&
532 queue->cur_level.time >= queue->max_size.time))) {
/* Emit "overrun" unlocked -- handlers may call back into the queue. */
533 GST_QUEUE_MUTEX_UNLOCK;
534 g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
535 GST_QUEUE_MUTEX_LOCK;
537 /* how are we going to make space for this buffer? */
538 switch (queue->leaky) {
539 /* leak current buffer */
540 case GST_QUEUE_LEAK_UPSTREAM:
541 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
542 "queue is full, leaking buffer on upstream end");
543 /* now we can clean up and exit right away */
544 GST_QUEUE_MUTEX_UNLOCK;
547 /* leak first buffer in the queue */
548 case GST_QUEUE_LEAK_DOWNSTREAM:{
549 /* this is a bit hacky. We'll manually iterate the list
550 * and find the first buffer from the head on. We'll
551 * unref that and "fix up" the GQueue object... */
553 GstData *leak = NULL;
555 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
556 "queue is full, leaking buffer on downstream end");
558 for (item = queue->queue->head; item != NULL; item = item->next) {
559 if (GST_IS_BUFFER (item->data)) {
565 /* if we didn't find anything, it means we have no buffers
566 * in here. That cannot happen, since we had >= 1 bufs */
569 /* Now remove it from the list, fixing up the GQueue
570 * CHECKME: is a queue->head the first or the last item? */
571 item = g_list_delete_link (queue->queue->head, item);
572 queue->queue->head = g_list_first (item);
573 queue->queue->tail = g_list_last (item);
574 queue->queue->length--;
576 /* and unref the data at the end. Twice, because we keep a ref
577 * to make things read-only. Also keep our list uptodate. */
/* NOTE(review): statistics are adjusted using `data` here, not the
 * leaked item -- presumably `data`/`leak` aliasing is resolved by
 * elided lines; verify against the full source. */
578 queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
579 queue->cur_level.buffers--;
580 if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
581 queue->cur_level.time -= GST_BUFFER_DURATION (data);
583 gst_data_unref (data);
584 gst_data_unref (data);
589 g_warning ("Unknown leaky type, using default");
592 /* don't leak. Instead, wait for space to be available */
593 case GST_QUEUE_NO_LEAK:
594 STATUS (queue, "pre-full wait");
596 while ((queue->max_size.buffers > 0 &&
597 queue->cur_level.buffers >= queue->max_size.buffers) ||
598 (queue->max_size.bytes > 0 &&
599 queue->cur_level.bytes >= queue->max_size.bytes) ||
600 (queue->max_size.time > 0 &&
601 queue->cur_level.time >= queue->max_size.time)) {
602 /* if there's a pending state change for this queue
603 * or its manager, switch back to iterator so bottom
604 * half of state change executes */
605 if (queue->interrupt) {
606 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "interrupted");
607 GST_QUEUE_MUTEX_UNLOCK;
608 if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad),
609 GST_ELEMENT (queue))) {
612 /* if we got here because we were unlocked after a
613 * flush, we don't need to add the buffer to the
616 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
617 "not adding pending buffer after flush");
620 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
621 "adding pending buffer after interrupt");
/* Not PLAYING while full: either error out (deadlock detection) or
 * warn and keep waiting, depending on queue->may_deadlock. */
625 if (GST_STATE (queue) != GST_STATE_PLAYING) {
626 /* this means the other end is shut down. Try to
627 * signal to resolve the error */
628 if (!queue->may_deadlock) {
629 GST_QUEUE_MUTEX_UNLOCK;
630 gst_data_unref (data);
631 GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL),
632 ("deadlock found, shutting down source pad elements"));
633 /* we don't go to out_unref here, since we want to
634 * unref the buffer *before* calling GST_ELEMENT_ERROR */
637 GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
638 "%s: waiting for the app to restart "
639 "source pad elements", GST_ELEMENT_NAME (queue));
643 /* OK, we've got a serious issue here. Imagine the situation
644 * where the puller (next element) is sending an event here,
645 * so it cannot pull events from the queue, and we cannot
646 * push data further because the queue is 'full' and therefore,
647 * we wait here (and do not handle events): deadlock! to solve
648 * that, we handle pending upstream events here, too. */
649 gst_queue_handle_pending_events (queue);
651 STATUS (queue, "waiting for item_del signal from thread using qlock");
652 g_cond_wait (queue->item_del, queue->qlock);
653 STATUS (queue, "received item_del signal from thread using qlock");
656 STATUS (queue, "post-full wait");
/* Emit "running" unlocked, same rationale as "overrun" above. */
657 GST_QUEUE_MUTEX_UNLOCK;
658 g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
659 GST_QUEUE_MUTEX_LOCK;
664 /* put the buffer on the tail of the list. We keep a reference,
665 * so that the data is read-only while in here. There's a good
666 * reason to do so: we have a size and time counter, and any
667 * modification to the content could change any of the two. */
669 g_queue_push_tail (queue->queue, data);
671 /* Note that we only add buffers (not events) to the statistics */
672 if (GST_IS_BUFFER (data)) {
673 queue->cur_level.buffers++;
674 queue->cur_level.bytes += GST_BUFFER_SIZE (data);
675 if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
676 queue->cur_level.time += GST_BUFFER_DURATION (data);
679 STATUS (queue, "+ level");
/* Wake a reader blocked in gst_queue_get. */
681 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
682 g_cond_signal (queue->item_add);
683 GST_QUEUE_MUTEX_UNLOCK;
/* Error/leak exit path: drop the caller's reference. */
688 gst_data_unref (data);
693 gst_queue_get (GstPad * pad)
698 g_return_val_if_fail (pad != NULL, NULL);
699 g_return_val_if_fail (GST_IS_PAD (pad), NULL);
701 queue = GST_QUEUE (gst_pad_get_parent (pad));
704 /* have to lock for thread-safety */
705 GST_QUEUE_MUTEX_LOCK;
707 if (queue->queue->length == 0 ||
708 (queue->min_threshold.buffers > 0 &&
709 queue->cur_level.buffers < queue->min_threshold.buffers) ||
710 (queue->min_threshold.bytes > 0 &&
711 queue->cur_level.bytes < queue->min_threshold.bytes) ||
712 (queue->min_threshold.time > 0 &&
713 queue->cur_level.time < queue->min_threshold.time)) {
714 GST_QUEUE_MUTEX_UNLOCK;
715 g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
716 GST_QUEUE_MUTEX_LOCK;
718 STATUS (queue, "pre-empty wait");
719 while (queue->queue->length == 0 ||
720 (queue->min_threshold.buffers > 0 &&
721 queue->cur_level.buffers < queue->min_threshold.buffers) ||
722 (queue->min_threshold.bytes > 0 &&
723 queue->cur_level.bytes < queue->min_threshold.bytes) ||
724 (queue->min_threshold.time > 0 &&
725 queue->cur_level.time < queue->min_threshold.time)) {
726 /* if there's a pending state change for this queue or its
727 * manager, switch back to iterator so bottom half of state
728 * change executes. */
729 if (queue->interrupt) {
730 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "interrupted");
731 GST_QUEUE_MUTEX_UNLOCK;
732 if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad),
733 GST_ELEMENT (queue)))
734 return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
737 if (GST_STATE (queue) != GST_STATE_PLAYING) {
738 /* this means the other end is shut down */
739 if (!queue->may_deadlock) {
740 GST_QUEUE_MUTEX_UNLOCK;
741 GST_ELEMENT_ERROR (queue, CORE, THREAD, (NULL),
742 ("deadlock found, shutting down sink pad elements"));
745 GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
746 "%s: waiting for the app to restart "
747 "source pad elements", GST_ELEMENT_NAME (queue));
751 STATUS (queue, "waiting for item_add");
753 if (queue->block_timeout != GST_CLOCK_TIME_NONE) {
756 g_get_current_time (&timeout);
757 g_time_val_add (&timeout, queue->block_timeout / 1000);
758 GST_LOG_OBJECT (queue, "g_cond_time_wait using qlock from thread %p",
760 if (!g_cond_timed_wait (queue->item_add, queue->qlock, &timeout)) {
761 GST_QUEUE_MUTEX_UNLOCK;
762 GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
763 "Sending filler event");
764 return GST_DATA (gst_event_new_filler ());
767 GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
769 g_cond_wait (queue->item_add, queue->qlock);
770 GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
773 STATUS (queue, "got item_add signal");
776 STATUS (queue, "post-empty wait");
777 GST_QUEUE_MUTEX_UNLOCK;
778 g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
779 GST_QUEUE_MUTEX_LOCK;
782 /* There's something in the list now, whatever it is */
783 data = g_queue_pop_head (queue->queue);
784 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
785 "retrieved data %p from queue", data);
790 if (GST_IS_BUFFER (data)) {
791 /* Update statistics */
792 queue->cur_level.buffers--;
793 queue->cur_level.bytes -= GST_BUFFER_SIZE (data);
794 if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
795 queue->cur_level.time -= GST_BUFFER_DURATION (data);
798 /* Now that we're done, we can lose our own reference to
799 * the item, since we're no longer in danger. */
800 gst_data_unref (data);
802 STATUS (queue, "after _get()");
804 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
805 g_cond_signal (queue->item_del);
806 GST_QUEUE_MUTEX_UNLOCK;
808 /* FIXME: I suppose this needs to be locked, since the EOS
809 * bit affects the pipeline state. However, that bit is
810 * locked too so it'd cause a deadlock. */
811 if (GST_IS_EVENT (data)) {
812 GstEvent *event = GST_EVENT (data);
814 switch (GST_EVENT_TYPE (event)) {
816 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
817 "queue \"%s\" eos", GST_ELEMENT_NAME (queue));
818 gst_element_set_eos (GST_ELEMENT (queue));
/* Src-pad event function, called from the downstream thread. When PLAYING,
 * the event cannot be pushed upstream directly (the chain thread owns that
 * direction), so it is parceled into a stack-allocated
 * GstQueueEventResponse, queued on queue->events, and this thread waits on
 * event_done (polling every 500 ms) until the chain thread dispatches it
 * via gst_queue_handle_pending_events. Otherwise the event is forwarded
 * with the default handler, and FLUSH / flushing SEEK also flush the
 * queue. qlock is held throughout; several lines (er setup, interrupt
 * check in the timeout condition, returns, braces) are elided. */
830 gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
832 GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
835 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
836 event, GST_EVENT_TYPE (event));
837 GST_QUEUE_MUTEX_LOCK;
839 if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
840 GstQueueEventResponse er;
842 /* push the event to the queue and wait for upstream consumption */
845 g_mutex_lock (queue->event_lock);
846 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
847 "putting event %p (%d) on internal queue", event,
848 GST_EVENT_TYPE (event));
/* NOTE(review): a pointer to the local `er` is queued; correctness
 * depends on this function not returning until er.handled is set. */
849 g_queue_push_tail (queue->events, &er);
850 g_mutex_unlock (queue->event_lock);
851 GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
852 "Preparing for loop for event handler");
853 /* see the chain function on why this is here - it prevents a deadlock */
854 g_cond_signal (queue->item_del);
855 while (!er.handled) {
858 g_get_current_time (&timeout);
859 g_time_val_add (&timeout, 500 * 1000); /* half a second */
860 GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
862 if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
/* Timed out and still unhandled: withdraw the event from the queue. */
864 GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
865 "timeout in upstream event handling, dropping event %p (%d)",
866 er.event, GST_EVENT_TYPE (er.event));
867 g_mutex_lock (queue->event_lock);
868 /* since this queue is for src events (ie upstream), this thread is
869 * the only one that is pushing stuff on it, so we're sure that
870 * it's still the tail element. FIXME: But in practice, we should use
871 * GList instead of GQueue for this so we can remove any element in
873 g_queue_pop_tail (queue->events);
874 g_mutex_unlock (queue->event_lock);
875 gst_event_unref (er.event);
880 GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Event handled");
/* Not PLAYING: forward directly and flush on FLUSH / flushing SEEK. */
883 res = gst_pad_event_default (pad, event);
885 switch (GST_EVENT_TYPE (event)) {
886 case GST_EVENT_FLUSH:
887 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
888 "FLUSH event, flushing queue\n");
889 gst_queue_locked_flush (queue);
892 if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
893 gst_queue_locked_flush (queue);
900 GST_QUEUE_MUTEX_UNLOCK;
/* GstElement::release_locks implementation: set the interrupt flag and
 * wake both the reader (item_add) and writer (item_del) so their blocking
 * waits in chain/get notice the pending state change and bail out. */
906 gst_queue_release_locks (GstElement * element)
910 queue = GST_QUEUE (element);
912 GST_QUEUE_MUTEX_LOCK;
913 queue->interrupt = TRUE;
914 g_cond_signal (queue->item_add);
915 g_cond_signal (queue->item_del);
916 GST_QUEUE_MUTEX_UNLOCK;
921 static GstElementStateReturn
/* State-change handler. Under qlock (so chain/get can't run mid-change):
 * NULL->READY and PAUSED->READY flush the queue (the latter also drops the
 * stored negotiated caps); PAUSED->PLAYING fails unless the sink pad is
 * linked and the two pads sit on different schedulers (a queue's purpose
 * is to decouple schedulers), and clears the interrupt flag. Chains up to
 * the parent, then force-reactivates both pads (scheduler workaround).
 * Some case labels/breaks are elided in this excerpt. */
922 gst_queue_change_state (GstElement * element)
925 GstElementStateReturn ret = GST_STATE_SUCCESS;
927 queue = GST_QUEUE (element);
929 GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
931 /* lock the queue so another thread (not in sync with this thread's state)
932 * can't call this queue's _get (or whatever)
934 GST_QUEUE_MUTEX_LOCK;
936 switch (GST_STATE_TRANSITION (element)) {
937 case GST_STATE_NULL_TO_READY:
938 gst_queue_locked_flush (queue);
940 case GST_STATE_PAUSED_TO_PLAYING:
941 if (!GST_PAD_IS_LINKED (queue->sinkpad)) {
942 GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
943 "queue %s is not linked", GST_ELEMENT_NAME (queue));
944 /* FIXME can this be? */
945 g_cond_signal (queue->item_add);
947 ret = GST_STATE_FAILURE;
950 GstScheduler *src_sched, *sink_sched;
952 src_sched = gst_pad_get_scheduler (GST_PAD (queue->srcpad));
953 sink_sched = gst_pad_get_scheduler (GST_PAD (queue->sinkpad));
955 if (src_sched == sink_sched) {
956 GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, queue,
957 "queue %s does not connect different schedulers",
958 GST_ELEMENT_NAME (queue));
960 g_warning ("queue %s does not connect different schedulers",
961 GST_ELEMENT_NAME (queue));
963 ret = GST_STATE_FAILURE;
967 queue->interrupt = FALSE;
969 case GST_STATE_PAUSED_TO_READY:
970 gst_queue_locked_flush (queue);
971 gst_caps_replace (&queue->negotiated_caps, NULL);
/* Chain up to the parent's change_state. */
977 if (GST_ELEMENT_CLASS (parent_class)->change_state)
978 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
980 /* this is an ugly hack to make sure our pads are always active.
981 * Reason for this is that pad activation for the queue element
982 * depends on 2 schedulers (ugh) */
983 gst_pad_set_active (queue->sinkpad, TRUE);
984 gst_pad_set_active (queue->srcpad, TRUE);
987 GST_QUEUE_MUTEX_UNLOCK;
989 GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
/* GObject set_property: store writable properties under qlock, since the
 * limits and thresholds are read concurrently by chain/get.
 * (break statements after each case are elided in this excerpt.) */
996 gst_queue_set_property (GObject * object,
997 guint prop_id, const GValue * value, GParamSpec * pspec)
999 GstQueue *queue = GST_QUEUE (object);
1001 /* someone could change levels here, and since this
1002 * affects the get/put funcs, we need to lock for safety. */
1003 GST_QUEUE_MUTEX_LOCK;
1006 case ARG_MAX_SIZE_BYTES:
1007 queue->max_size.bytes = g_value_get_uint (value);
1009 case ARG_MAX_SIZE_BUFFERS:
1010 queue->max_size.buffers = g_value_get_uint (value);
1012 case ARG_MAX_SIZE_TIME:
1013 queue->max_size.time = g_value_get_uint64 (value);
1015 case ARG_MIN_THRESHOLD_BYTES:
1016 queue->min_threshold.bytes = g_value_get_uint (value);
1018 case ARG_MIN_THRESHOLD_BUFFERS:
1019 queue->min_threshold.buffers = g_value_get_uint (value);
1021 case ARG_MIN_THRESHOLD_TIME:
1022 queue->min_threshold.time = g_value_get_uint64 (value);
1025 queue->leaky = g_value_get_enum (value);
1027 case ARG_MAY_DEADLOCK:
1028 queue->may_deadlock = g_value_get_boolean (value);
1030 case ARG_BLOCK_TIMEOUT:
1031 queue->block_timeout = g_value_get_uint64 (value);
1034 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1038 GST_QUEUE_MUTEX_UNLOCK;
/* GObject get_property: report current levels, limits, thresholds and
 * behavior flags. NOTE(review): unlike set_property, this reads the level
 * counters without taking qlock -- values may be momentarily stale, and
 * the switch statement/breaks are elided in this excerpt. */
1042 gst_queue_get_property (GObject * object,
1043 guint prop_id, GValue * value, GParamSpec * pspec)
1045 GstQueue *queue = GST_QUEUE (object);
1048 case ARG_CUR_LEVEL_BYTES:
1049 g_value_set_uint (value, queue->cur_level.bytes);
1051 case ARG_CUR_LEVEL_BUFFERS:
1052 g_value_set_uint (value, queue->cur_level.buffers);
1054 case ARG_CUR_LEVEL_TIME:
1055 g_value_set_uint64 (value, queue->cur_level.time);
1057 case ARG_MAX_SIZE_BYTES:
1058 g_value_set_uint (value, queue->max_size.bytes);
1060 case ARG_MAX_SIZE_BUFFERS:
1061 g_value_set_uint (value, queue->max_size.buffers);
1063 case ARG_MAX_SIZE_TIME:
1064 g_value_set_uint64 (value, queue->max_size.time);
1066 case ARG_MIN_THRESHOLD_BYTES:
1067 g_value_set_uint (value, queue->min_threshold.bytes);
1069 case ARG_MIN_THRESHOLD_BUFFERS:
1070 g_value_set_uint (value, queue->min_threshold.buffers);
1072 case ARG_MIN_THRESHOLD_TIME:
1073 g_value_set_uint64 (value, queue->min_threshold.time);
1076 g_value_set_enum (value, queue->leaky);
1078 case ARG_MAY_DEADLOCK:
1079 g_value_set_boolean (value, queue->may_deadlock);
1081 case ARG_BLOCK_TIMEOUT:
1082 g_value_set_uint64 (value, queue->block_timeout);
1085 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);