Landed the new improved capsnegotiation system.
[platform/upstream/gstreamer.git] / gst / gstqueue.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *
5  * gstqueue.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /* #define DEBUG_ENABLED */
24 /* #define STATUS_ENABLED */
25 #ifdef STATUS_ENABLED
26 #define STATUS(A) GST_DEBUG(GST_CAT_DATAFLOW, A, GST_ELEMENT_NAME(queue))
27 #else
28 #define STATUS(A)
29 #endif
30
31 #include <pthread.h>
32
33 #include "config.h"
34 #include "gst_private.h"
35
36 #include "gstqueue.h"
37 #include "gstscheduler.h"
38 #include "gstevent.h"
39
40 GstElementDetails gst_queue_details = {
41   "Queue",
42   "Connection",
43   "Simple data queue",
44   VERSION,
45   "Erik Walthinsen <omega@cse.ogi.edu>",
46   "(C) 1999",
47 };
48
49
50 /* Queue signals and args */
51 enum {
52   LOW_WATERMARK,
53   HIGH_WATERMARK,
54   LAST_SIGNAL
55 };
56
57 enum {
58   ARG_0,
59   ARG_LEVEL_BUFFERS,
60   ARG_LEVEL_BYTES,
61   ARG_LEVEL_TIME,
62   ARG_SIZE_BUFFERS,
63   ARG_SIZE_BYTES,
64   ARG_SIZE_TIME,
65   ARG_LEAKY,
66   ARG_LEVEL,
67   ARG_MAX_LEVEL,
68   ARG_MAY_DEADLOCK,
69 };
70
71
72 static void                     gst_queue_class_init            (GstQueueClass *klass);
73 static void                     gst_queue_init                  (GstQueue *queue);
74 static void                     gst_queue_dispose               (GObject *object);
75
76 static void                     gst_queue_set_property          (GObject *object, guint prop_id, 
77                                                                  const GValue *value, GParamSpec *pspec);
78 static void                     gst_queue_get_property          (GObject *object, guint prop_id, 
79                                                                  GValue *value, GParamSpec *pspec);
80
81 static void                     gst_queue_chain                 (GstPad *pad, GstBuffer *buf);
82 static GstBuffer *              gst_queue_get                   (GstPad *pad);
83 static GstBufferPool*           gst_queue_get_bufferpool        (GstPad *pad);
84         
85 static void                     gst_queue_locked_flush                  (GstQueue *queue);
86 static void                     gst_queue_flush                 (GstQueue *queue);
87
88 static GstElementStateReturn    gst_queue_change_state          (GstElement *element);
89
90   
91 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type())
92 static GType
93 queue_leaky_get_type(void) {
94   static GType queue_leaky_type = 0;
95   static GEnumValue queue_leaky[] = {
96     { GST_QUEUE_NO_LEAK,                "0", "Not Leaky" },
97     { GST_QUEUE_LEAK_UPSTREAM,          "1", "Leaky on Upstream" },
98     { GST_QUEUE_LEAK_DOWNSTREAM,        "2", "Leaky on Downstream" },
99     { 0, NULL, NULL },
100   };
101   if (!queue_leaky_type) {
102     queue_leaky_type = g_enum_register_static("GstQueueLeaky", queue_leaky);
103   }
104   return queue_leaky_type;
105 }
106
107 static GstElementClass *parent_class = NULL;
108 /* static guint gst_queue_signals[LAST_SIGNAL] = { 0 }; */
109
110 GType
111 gst_queue_get_type(void) 
112 {
113   static GType queue_type = 0;
114
115   if (!queue_type) {
116     static const GTypeInfo queue_info = {
117       sizeof(GstQueueClass),
118       NULL,
119       NULL,
120       (GClassInitFunc)gst_queue_class_init,
121       NULL,
122       NULL,
123       sizeof(GstQueue),
124       4,
125       (GInstanceInitFunc)gst_queue_init,
126       NULL
127     };
128     queue_type = g_type_register_static (GST_TYPE_ELEMENT, "GstQueue", &queue_info, 0);
129   }
130   return queue_type;
131 }
132
133 static void
134 gst_queue_class_init (GstQueueClass *klass)
135 {
136   GObjectClass *gobject_class;
137   GstElementClass *gstelement_class;
138
139   gobject_class = (GObjectClass*)klass;
140   gstelement_class = (GstElementClass*)klass;
141
142   parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
143
144   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEAKY,
145     g_param_spec_enum ("leaky", "Leaky", "Where the queue leaks, if at all.",
146                        GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
147   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEVEL,
148     g_param_spec_int ("level", "Level", "How many buffers are in the queue.",
149                       0, G_MAXINT, 0, G_PARAM_READABLE));
150   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL,
151     g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.",
152                       0, G_MAXINT, 100, G_PARAM_READWRITE));
153   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK,
154     g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
155                       TRUE, G_PARAM_READWRITE));
156
157   gobject_class->dispose                = GST_DEBUG_FUNCPTR (gst_queue_dispose);
158   gobject_class->set_property           = GST_DEBUG_FUNCPTR (gst_queue_set_property);
159   gobject_class->get_property           = GST_DEBUG_FUNCPTR (gst_queue_get_property);
160
161   gstelement_class->change_state = GST_DEBUG_FUNCPTR(gst_queue_change_state);
162 }
163
164 static GstPadConnectReturn
165 gst_queue_connect (GstPad *pad, GstCaps *caps)
166 {
167   GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
168   GstPad *otherpad;
169
170   if (pad == queue->srcpad) 
171     otherpad = queue->sinkpad;
172   else
173     otherpad = queue->srcpad;
174
175   return gst_pad_proxy_connect (otherpad, caps);
176 }
177
178 static GstCaps*
179 gst_queue_getcaps (GstPad *pad, GstCaps *caps)
180 {
181   GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
182   GstPad *otherpad;
183
184   if (pad == queue->srcpad) 
185     otherpad = queue->sinkpad;
186   else
187     otherpad = queue->srcpad;
188
189   return gst_pad_get_allowed_caps (otherpad);
190 }
191
192 static void
193 gst_queue_init (GstQueue *queue)
194 {
195   /* scheduling on this kind of element is, well, interesting */
196   GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
197   GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE);
198
199   queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
200   gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_chain));
201   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
202   gst_pad_set_bufferpool_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_get_bufferpool));
203   gst_pad_set_connect_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_connect));
204   gst_pad_set_getcaps_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps));
205
206   queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
207   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
208   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
209   gst_pad_set_connect_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_connect));
210   gst_pad_set_getcaps_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps));
211
212   queue->leaky = GST_QUEUE_NO_LEAK;
213   queue->queue = NULL;
214   queue->level_buffers = 0;
215   queue->level_bytes = 0;
216   queue->level_time = 0LL;
217   queue->size_buffers = 100;            /* 100 buffers */
218   queue->size_bytes = 100 * 1024;       /* 100KB */
219   queue->size_time = 1000000000LL;      /* 1sec */
220   queue->may_deadlock = TRUE;
221
222   queue->qlock = g_mutex_new ();
223   queue->reader = FALSE;
224   queue->writer = FALSE;
225   queue->not_empty = g_cond_new ();
226   queue->not_full = g_cond_new ();
227   GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
228 }
229
230 static void
231 gst_queue_dispose (GObject *object)
232 {
233   GstQueue *queue = GST_QUEUE (object);
234
235   g_mutex_free (queue->qlock);
236   g_cond_free (queue->not_empty);
237   g_cond_free (queue->not_full);
238
239   G_OBJECT_CLASS (parent_class)->dispose (object);
240 }
241
242 static GstBufferPool*
243 gst_queue_get_bufferpool (GstPad *pad)
244 {
245   GstQueue *queue;
246
247   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
248
249   return gst_pad_get_bufferpool (queue->srcpad);
250 }
251
252 static void
253 gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
254 {
255   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data);
256
257   if (GST_IS_BUFFER (data)) {
258     gst_buffer_unref (GST_BUFFER (data));
259   }
260   else {
261     gst_event_free (GST_EVENT (data));
262   }
263 }
264
265 static void
266 gst_queue_locked_flush (GstQueue *queue)
267 {
268   g_list_foreach (queue->queue, gst_queue_cleanup_buffers,
269                   (gpointer) queue);
270   g_list_free (queue->queue);
271
272   queue->queue = NULL;
273   queue->level_buffers = 0;
274   queue->timeval = NULL;
275 }
276
277 static void
278 gst_queue_flush (GstQueue *queue)
279 {
280   g_mutex_lock (queue->qlock);
281   gst_queue_locked_flush (queue);
282   g_mutex_unlock (queue->qlock);
283 }
284
285
286 static void
287 gst_queue_chain (GstPad *pad, GstBuffer *buf)
288 {
289   GstQueue *queue;
290   gboolean reader;
291
292   g_return_if_fail (pad != NULL);
293   g_return_if_fail (GST_IS_PAD (pad));
294   g_return_if_fail (buf != NULL);
295
296   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
297
298 restart:
299   /* we have to lock the queue since we span threads */
300   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
301   g_mutex_lock (queue->qlock);
302   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ());
303
304   if (GST_IS_EVENT (buf)) {
305     switch (GST_EVENT_TYPE (buf)) {
306       case GST_EVENT_FLUSH:
307         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n");
308         gst_queue_locked_flush (queue);
309         break;
310       case GST_EVENT_EOS:
311         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n", 
312                            GST_ELEMENT_NAME (queue), queue->level_buffers);
313         break;
314       default:
315         //gst_pad_event_default (pad, GST_EVENT (buf));
316         break;
317     }
318   }
319
320   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf));
321
322   if (queue->level_buffers == queue->size_buffers) {
323     /* if this is a leaky queue... */
324     if (queue->leaky) {
325       /* FIXME don't want to leak events! */
326       /* if we leak on the upstream side, drop the current buffer */
327       if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
328         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
329         if (GST_IS_EVENT (buf))
330           fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
331               GST_ELEMENT_NAME(GST_ELEMENT(queue)),
332               GST_EVENT_TYPE(GST_EVENT(buf)));
333           GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
334         gst_buffer_unref(buf);
335         /* now we have to clean up and exit right away */
336         g_mutex_unlock (queue->qlock);
337         return;
338       }
339       /* otherwise we have to push a buffer off the other end */
340       else {
341         GList *front;
342         GstBuffer *leakbuf;
343         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
344         front = queue->queue;
345         leakbuf = (GstBuffer *)(front->data);
346         if (GST_IS_EVENT (leakbuf))
347           fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
348               GST_ELEMENT_NAME(GST_ELEMENT(queue)),
349               GST_EVENT_TYPE(GST_EVENT(leakbuf)));
350         queue->level_buffers--;
351         queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
352         gst_buffer_unref(leakbuf);
353         queue->queue = g_list_remove_link (queue->queue, front);
354         g_list_free (front);
355       }
356     }
357
358     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n",
359         queue->level_buffers, queue->size_buffers);
360     while (queue->level_buffers == queue->size_buffers) {
361       /* if there's a pending state change for this queue or its manager, switch */
362       /* back to iterator so bottom half of state change executes */
363       while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
364         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
365         g_mutex_unlock (queue->qlock);
366         if (gst_element_interrupt (GST_ELEMENT (queue)))
367           return;
368         goto restart;
369       }
370       if (GST_STATE (queue) != GST_STATE_PLAYING) {
371         /* this means the other end is shut down */
372         /* try to signal to resolve the error */
373         if (!queue->may_deadlock) {
374           if (GST_IS_BUFFER (buf)) gst_buffer_unref (buf);
375           else gst_event_free (GST_EVENT (buf));
376           g_mutex_unlock (queue->qlock);
377           gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
378           return;
379         }
380         else {
381           gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements");
382         }
383       }
384
385       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
386       if (queue->writer)
387         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n");
388       queue->writer = TRUE;
389       g_cond_wait (queue->not_full, queue->qlock);
390       queue->writer = FALSE;
391       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n");
392     }
393     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n",
394         queue->level_buffers, queue->size_buffers);
395   }
396
397   /* put the buffer on the tail of the list */
398   queue->queue = g_list_append (queue->queue, buf);
399   queue->level_buffers++;
400   queue->level_bytes += GST_BUFFER_SIZE(buf);
401
402   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n",
403       GST_DEBUG_PAD_NAME(pad),
404       queue->level_buffers, queue->size_buffers);
405
406   /* this assertion _has_ to hold */
407   /* g_assert (g_list_length (queue->queue) == queue->level_buffers); */
408
409   /* reader waiting on an empty queue */
410   reader = queue->reader;
411
412   g_mutex_unlock (queue->qlock);
413
414   if (reader)
415   {
416     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n");
417     g_cond_signal (queue->not_empty);
418   }
419 }
420
421 static GstBuffer *
422 gst_queue_get (GstPad *pad)
423 {
424   GstQueue *queue;
425   GstBuffer *buf = NULL;
426   GList *front;
427   gboolean writer;
428
429   g_assert(pad != NULL);
430   g_assert(GST_IS_PAD(pad));
431   g_return_val_if_fail (pad != NULL, NULL);
432   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
433
434   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
435
436 restart:
437   /* have to lock for thread-safety */
438   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
439   g_mutex_lock (queue->qlock);
440   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty);
441
442   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
443   while (queue->level_buffers == 0) {
444     /* if there's a pending state change for this queue or its manager, switch
445      * back to iterator so bottom half of state change executes
446      */ 
447     while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
448       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
449       g_mutex_unlock (queue->qlock);
450       if (gst_element_interrupt (GST_ELEMENT (queue)))
451         return NULL;
452       goto restart;
453     }
454     if (GST_STATE (queue) != GST_STATE_PLAYING) {
455       /* this means the other end is shut down */
456       if (!queue->may_deadlock) {
457         g_mutex_unlock (queue->qlock);
458         gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
459         goto restart;
460       }
461       else {
462         gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements");
463       }
464     }
465
466     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
467     if (queue->reader)
468       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n");
469     queue->reader = TRUE;
470     g_cond_wait (queue->not_empty, queue->qlock);
471     queue->reader = FALSE;
472     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n");
473   }
474   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
475
476   front = queue->queue;
477   buf = (GstBuffer *)(front->data);
478   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf);
479   queue->queue = g_list_remove_link (queue->queue, front);
480   g_list_free (front);
481
482   queue->level_buffers--;
483   queue->level_bytes -= GST_BUFFER_SIZE(buf);
484
485   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n",
486       GST_DEBUG_PAD_NAME(pad),
487       queue->level_buffers, queue->size_buffers);
488
489   /* this assertion _has_ to hold */
490   /* g_assert (g_list_length (queue->queue) == queue->level_buffers); */
491
492   /* writer waiting on a full queue */
493   writer = queue->writer;
494
495   g_mutex_unlock (queue->qlock);
496
497   if (writer)
498   {
499     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n");
500     g_cond_signal (queue->not_full);
501   }
502
503   /* FIXME where should this be? locked? */
504   if (GST_IS_EVENT(buf)) {
505     GstEvent *event = GST_EVENT(buf);
506     switch (GST_EVENT_TYPE(event)) {
507       case GST_EVENT_EOS:
508         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos\n", GST_ELEMENT_NAME (queue));
509         gst_element_set_eos (GST_ELEMENT (queue));
510         break;
511       default:
512         break;
513     }
514   }
515
516   return buf;
517 }
518
519 static GstElementStateReturn
520 gst_queue_change_state (GstElement *element)
521 {
522   GstQueue *queue;
523   GstElementStateReturn ret;
524   GstElementState new_state;
525   g_return_val_if_fail (GST_IS_QUEUE (element), GST_STATE_FAILURE);
526
527   queue = GST_QUEUE (element);
528
529   GST_DEBUG_ENTER("('%s')", GST_ELEMENT_NAME (element));
530
531   /* lock the queue so another thread (not in sync with this thread's state)
532    * can't call this queue's _get (or whatever)
533    */
534   g_mutex_lock (queue->qlock);
535
536   new_state = GST_STATE_PENDING (element);
537
538   if (new_state == GST_STATE_PAUSED) {
539     //g_cond_signal (queue->not_full);
540     //g_cond_signal (queue->not_empty);
541   }
542   else if (new_state == GST_STATE_READY) {
543     gst_queue_locked_flush (queue);
544   }
545   else if (new_state == GST_STATE_PLAYING) {
546     if (!GST_PAD_IS_CONNECTED (queue->sinkpad)) {
547       /* FIXME can this be? */
548       if (queue->reader)
549         g_cond_signal (queue->not_empty);
550       g_mutex_unlock (queue->qlock);
551
552       return GST_STATE_FAILURE;
553     }
554   }
555
556   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
557   g_mutex_unlock (queue->qlock);
558
559   GST_DEBUG_LEAVE("('%s')", GST_ELEMENT_NAME (element));
560   return ret;
561 }
562
563
564 static void
565 gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
566 {
567   GstQueue *queue;
568
569   /* it's not null if we got it, but it might not be ours */
570   g_return_if_fail (GST_IS_QUEUE (object));
571
572   queue = GST_QUEUE (object);
573
574   switch (prop_id) {
575     case ARG_LEAKY:
576       queue->leaky = g_value_get_enum (value);
577       break;
578     case ARG_MAX_LEVEL:
579       queue->size_buffers = g_value_get_int (value);
580       break;
581     case ARG_MAY_DEADLOCK:
582       queue->may_deadlock = g_value_get_boolean (value);
583       break;
584     default:
585       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
586       break;
587   }
588 }
589
590 static void
591 gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
592 {
593   GstQueue *queue;
594
595   /* it's not null if we got it, but it might not be ours */
596   g_return_if_fail (GST_IS_QUEUE (object));
597
598   queue = GST_QUEUE (object);
599
600   switch (prop_id) {
601     case ARG_LEAKY:
602       g_value_set_enum (value, queue->leaky);
603       break;
604     case ARG_LEVEL:
605       g_value_set_int (value, queue->level_buffers);
606       break;
607     case ARG_MAX_LEVEL:
608       g_value_set_int (value, queue->size_buffers);
609       break;
610     case ARG_MAY_DEADLOCK:
611       g_value_set_boolean (value, queue->may_deadlock);
612       break;
613     default:
614       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
615       break;
616   }
617 }