2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2000 Wim Taymans <wtay@chello.be>
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20 * Boston, MA 02111-1307, USA.
23 /* #define DEBUG_ENABLED */
24 /* #define STATUS_ENABLED */
26 #define STATUS(A) GST_DEBUG(GST_CAT_DATAFLOW, A, GST_ELEMENT_NAME(queue))
34 #include "gst_private.h"
37 #include "gstscheduler.h"
40 GstElementDetails gst_queue_details = {
45 "Erik Walthinsen <omega@cse.ogi.edu>",
50 /* Queue signals and args */
72 static void gst_queue_class_init (GstQueueClass *klass);
73 static void gst_queue_init (GstQueue *queue);
74 static void gst_queue_dispose (GObject *object);
76 static void gst_queue_set_property (GObject *object, guint prop_id,
77 const GValue *value, GParamSpec *pspec);
78 static void gst_queue_get_property (GObject *object, guint prop_id,
79 GValue *value, GParamSpec *pspec);
81 static void gst_queue_chain (GstPad *pad, GstBuffer *buf);
82 static GstBuffer * gst_queue_get (GstPad *pad);
83 static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad);
85 static void gst_queue_locked_flush (GstQueue *queue);
87 static GstElementStateReturn gst_queue_change_state (GstElement *element);
90 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type())
92 queue_leaky_get_type(void) {
93 static GType queue_leaky_type = 0;
94 static GEnumValue queue_leaky[] = {
95 { GST_QUEUE_NO_LEAK, "0", "Not Leaky" },
96 { GST_QUEUE_LEAK_UPSTREAM, "1", "Leaky on Upstream" },
97 { GST_QUEUE_LEAK_DOWNSTREAM, "2", "Leaky on Downstream" },
100 if (!queue_leaky_type) {
101 queue_leaky_type = g_enum_register_static("GstQueueLeaky", queue_leaky);
103 return queue_leaky_type;
106 static GstElementClass *parent_class = NULL;
107 /* static guint gst_queue_signals[LAST_SIGNAL] = { 0 }; */
110 gst_queue_get_type(void)
112 static GType queue_type = 0;
115 static const GTypeInfo queue_info = {
116 sizeof(GstQueueClass),
119 (GClassInitFunc)gst_queue_class_init,
124 (GInstanceInitFunc)gst_queue_init,
127 queue_type = g_type_register_static (GST_TYPE_ELEMENT, "GstQueue", &queue_info, 0);
133 gst_queue_class_init (GstQueueClass *klass)
135 GObjectClass *gobject_class;
136 GstElementClass *gstelement_class;
138 gobject_class = (GObjectClass*)klass;
139 gstelement_class = (GstElementClass*)klass;
141 parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
143 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEAKY,
144 g_param_spec_enum ("leaky", "Leaky", "Where the queue leaks, if at all.",
145 GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
146 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEVEL,
147 g_param_spec_int ("level", "Level", "How many buffers are in the queue.",
148 0, G_MAXINT, 0, G_PARAM_READABLE));
149 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL,
150 g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.",
151 0, G_MAXINT, 100, G_PARAM_READWRITE));
152 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK,
153 g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
154 TRUE, G_PARAM_READWRITE));
156 gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose);
157 gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
158 gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
160 gstelement_class->change_state = GST_DEBUG_FUNCPTR(gst_queue_change_state);
163 static GstPadConnectReturn
164 gst_queue_connect (GstPad *pad, GstCaps *caps)
166 GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
169 if (pad == queue->srcpad)
170 otherpad = queue->sinkpad;
172 otherpad = queue->srcpad;
174 return gst_pad_proxy_connect (otherpad, caps);
178 gst_queue_getcaps (GstPad *pad, GstCaps *caps)
180 GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
183 if (pad == queue->srcpad)
184 otherpad = queue->sinkpad;
186 otherpad = queue->srcpad;
188 return gst_pad_get_allowed_caps (otherpad);
192 gst_queue_init (GstQueue *queue)
194 /* scheduling on this kind of element is, well, interesting */
195 GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
196 GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE);
198 queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
199 gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_chain));
200 gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
201 gst_pad_set_bufferpool_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_get_bufferpool));
202 gst_pad_set_connect_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_connect));
203 gst_pad_set_getcaps_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps));
205 queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
206 gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
207 gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
208 gst_pad_set_connect_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_connect));
209 gst_pad_set_getcaps_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps));
211 queue->leaky = GST_QUEUE_NO_LEAK;
213 queue->level_buffers = 0;
214 queue->level_bytes = 0;
215 queue->level_time = 0LL;
216 queue->size_buffers = 100; /* 100 buffers */
217 queue->size_bytes = 100 * 1024; /* 100KB */
218 queue->size_time = 1000000000LL; /* 1sec */
219 queue->may_deadlock = TRUE;
221 queue->qlock = g_mutex_new ();
222 queue->reader = FALSE;
223 queue->writer = FALSE;
224 queue->not_empty = g_cond_new ();
225 queue->not_full = g_cond_new ();
226 GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
230 gst_queue_dispose (GObject *object)
232 GstQueue *queue = GST_QUEUE (object);
234 g_mutex_free (queue->qlock);
235 g_cond_free (queue->not_empty);
236 g_cond_free (queue->not_full);
238 G_OBJECT_CLASS (parent_class)->dispose (object);
241 static GstBufferPool*
242 gst_queue_get_bufferpool (GstPad *pad)
246 queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
248 return gst_pad_get_bufferpool (queue->srcpad);
252 gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
254 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data);
256 if (GST_IS_BUFFER (data)) {
257 gst_buffer_unref (GST_BUFFER (data));
260 gst_event_free (GST_EVENT (data));
265 gst_queue_locked_flush (GstQueue *queue)
267 g_list_foreach (queue->queue, gst_queue_cleanup_buffers,
269 g_list_free (queue->queue);
272 queue->level_buffers = 0;
273 queue->timeval = NULL;
277 gst_queue_chain (GstPad *pad, GstBuffer *buf)
282 g_return_if_fail (pad != NULL);
283 g_return_if_fail (GST_IS_PAD (pad));
284 g_return_if_fail (buf != NULL);
286 queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
289 /* we have to lock the queue since we span threads */
290 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
291 g_mutex_lock (queue->qlock);
292 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ());
294 if (GST_IS_EVENT (buf)) {
295 switch (GST_EVENT_TYPE (buf)) {
296 case GST_EVENT_FLUSH:
297 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n");
298 gst_queue_locked_flush (queue);
301 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n",
302 GST_ELEMENT_NAME (queue), queue->level_buffers);
305 /*gst_pad_event_default (pad, GST_EVENT (buf)); */
310 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf));
312 if (queue->level_buffers == queue->size_buffers) {
313 /* if this is a leaky queue... */
315 /* FIXME don't want to leak events! */
316 /* if we leak on the upstream side, drop the current buffer */
317 if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
318 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
319 if (GST_IS_EVENT (buf))
320 fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
321 GST_ELEMENT_NAME(GST_ELEMENT(queue)),
322 GST_EVENT_TYPE(GST_EVENT(buf)));
323 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
324 gst_buffer_unref(buf);
325 /* now we have to clean up and exit right away */
326 g_mutex_unlock (queue->qlock);
329 /* otherwise we have to push a buffer off the other end */
333 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
334 front = queue->queue;
335 leakbuf = (GstBuffer *)(front->data);
336 if (GST_IS_EVENT (leakbuf))
337 fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
338 GST_ELEMENT_NAME(GST_ELEMENT(queue)),
339 GST_EVENT_TYPE(GST_EVENT(leakbuf)));
340 queue->level_buffers--;
341 queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
342 gst_buffer_unref(leakbuf);
343 queue->queue = g_list_remove_link (queue->queue, front);
348 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n",
349 queue->level_buffers, queue->size_buffers);
350 while (queue->level_buffers == queue->size_buffers) {
351 /* if there's a pending state change for this queue or its manager, switch */
352 /* back to iterator so bottom half of state change executes */
353 while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
354 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
355 g_mutex_unlock (queue->qlock);
356 if (gst_element_interrupt (GST_ELEMENT (queue)))
360 if (GST_STATE (queue) != GST_STATE_PLAYING) {
361 /* this means the other end is shut down */
362 /* try to signal to resolve the error */
363 if (!queue->may_deadlock) {
364 if (GST_IS_BUFFER (buf)) gst_buffer_unref (buf);
365 else gst_event_free (GST_EVENT (buf));
366 g_mutex_unlock (queue->qlock);
367 gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
371 g_print ("%s: waiting for the app to restart source pad elements\n", GST_ELEMENT_NAME (queue));
375 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
377 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n");
378 queue->writer = TRUE;
379 g_cond_wait (queue->not_full, queue->qlock);
380 queue->writer = FALSE;
381 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n");
383 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n",
384 queue->level_buffers, queue->size_buffers);
387 /* put the buffer on the tail of the list */
388 queue->queue = g_list_append (queue->queue, buf);
389 queue->level_buffers++;
390 queue->level_bytes += GST_BUFFER_SIZE(buf);
392 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n",
393 GST_DEBUG_PAD_NAME(pad),
394 queue->level_buffers, queue->size_buffers);
396 /* this assertion _has_ to hold */
397 /* g_assert (g_list_length (queue->queue) == queue->level_buffers); */
399 /* reader waiting on an empty queue */
400 reader = queue->reader;
402 g_mutex_unlock (queue->qlock);
406 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n");
407 g_cond_signal (queue->not_empty);
412 gst_queue_get (GstPad *pad)
415 GstBuffer *buf = NULL;
419 g_assert(pad != NULL);
420 g_assert(GST_IS_PAD(pad));
421 g_return_val_if_fail (pad != NULL, NULL);
422 g_return_val_if_fail (GST_IS_PAD (pad), NULL);
424 queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
427 /* have to lock for thread-safety */
428 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
429 g_mutex_lock (queue->qlock);
430 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty);
432 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
433 while (queue->level_buffers == 0) {
434 /* if there's a pending state change for this queue or its manager, switch
435 * back to iterator so bottom half of state change executes
437 while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
438 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
439 g_mutex_unlock (queue->qlock);
440 if (gst_element_interrupt (GST_ELEMENT (queue)))
444 if (GST_STATE (queue) != GST_STATE_PLAYING) {
445 /* this means the other end is shut down */
446 if (!queue->may_deadlock) {
447 g_mutex_unlock (queue->qlock);
448 gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
452 g_print ("%s: waiting for the app to restart source pad elements\n", GST_ELEMENT_NAME (queue));
456 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
458 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n");
459 queue->reader = TRUE;
460 g_cond_wait (queue->not_empty, queue->qlock);
461 queue->reader = FALSE;
462 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n");
464 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
466 front = queue->queue;
467 buf = (GstBuffer *)(front->data);
468 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf);
469 queue->queue = g_list_remove_link (queue->queue, front);
472 queue->level_buffers--;
473 queue->level_bytes -= GST_BUFFER_SIZE(buf);
475 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n",
476 GST_DEBUG_PAD_NAME(pad),
477 queue->level_buffers, queue->size_buffers);
479 /* this assertion _has_ to hold */
480 /* g_assert (g_list_length (queue->queue) == queue->level_buffers); */
482 /* writer waiting on a full queue */
483 writer = queue->writer;
485 g_mutex_unlock (queue->qlock);
489 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n");
490 g_cond_signal (queue->not_full);
493 /* FIXME where should this be? locked? */
494 if (GST_IS_EVENT(buf)) {
495 GstEvent *event = GST_EVENT(buf);
496 switch (GST_EVENT_TYPE(event)) {
498 GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos\n", GST_ELEMENT_NAME (queue));
499 gst_element_set_eos (GST_ELEMENT (queue));
509 static GstElementStateReturn
510 gst_queue_change_state (GstElement *element)
513 GstElementStateReturn ret;
514 GstElementState new_state;
515 g_return_val_if_fail (GST_IS_QUEUE (element), GST_STATE_FAILURE);
517 queue = GST_QUEUE (element);
519 GST_DEBUG_ENTER("('%s')", GST_ELEMENT_NAME (element));
521 /* lock the queue so another thread (not in sync with this thread's state)
522 * can't call this queue's _get (or whatever)
524 g_mutex_lock (queue->qlock);
526 new_state = GST_STATE_PENDING (element);
528 if (new_state == GST_STATE_PAUSED) {
529 /*g_cond_signal (queue->not_full); */
530 /*g_cond_signal (queue->not_empty); */
532 else if (new_state == GST_STATE_READY) {
533 gst_queue_locked_flush (queue);
535 else if (new_state == GST_STATE_PLAYING) {
536 if (!GST_PAD_IS_CONNECTED (queue->sinkpad)) {
537 /* FIXME can this be? */
539 g_cond_signal (queue->not_empty);
540 g_mutex_unlock (queue->qlock);
542 return GST_STATE_FAILURE;
546 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
547 g_mutex_unlock (queue->qlock);
549 GST_DEBUG_LEAVE("('%s')", GST_ELEMENT_NAME (element));
555 gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
559 /* it's not null if we got it, but it might not be ours */
560 g_return_if_fail (GST_IS_QUEUE (object));
562 queue = GST_QUEUE (object);
566 queue->leaky = g_value_get_enum (value);
569 queue->size_buffers = g_value_get_int (value);
571 case ARG_MAY_DEADLOCK:
572 queue->may_deadlock = g_value_get_boolean (value);
575 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
581 gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
585 /* it's not null if we got it, but it might not be ours */
586 g_return_if_fail (GST_IS_QUEUE (object));
588 queue = GST_QUEUE (object);
592 g_value_set_enum (value, queue->leaky);
595 g_value_set_int (value, queue->level_buffers);
598 g_value_set_int (value, queue->size_buffers);
600 case ARG_MAY_DEADLOCK:
601 g_value_set_boolean (value, queue->may_deadlock);
604 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);