A discont event should not flush the queue
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *
5  * gstqueue.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /* #define DEBUG_ENABLED */
24 /* #define STATUS_ENABLED */
25
26 #ifdef STATUS_ENABLED
27 #define STATUS(A) GST_DEBUG(GST_CAT_DATAFLOW, A, GST_ELEMENT_NAME(queue))
28 #else
29 #define STATUS(A)
30 #endif
31
32 #include <pthread.h>
33
34 #include "config.h"
35 #include "gst_private.h"
36
37 #include "gstqueue.h"
38 #include "gstscheduler.h"
39 #include "gstevent.h"
40 #include "gstlog.h"
41
42 GstElementDetails gst_queue_details = {
43   "Queue",
44   "Generic",
45   "LGPL",
46   "Simple data queue",
47   VERSION,
48   "Erik Walthinsen <omega@cse.ogi.edu>",
49   "(C) 1999",
50 };
51
52
53 /* Queue signals and args */
54 enum {
55   LOW_WATERMARK,
56   HIGH_WATERMARK,
57   LAST_SIGNAL
58 };
59
60 enum {
61   ARG_0,
62   ARG_LEVEL_BUFFERS,
63   ARG_LEVEL_BYTES,
64   ARG_LEVEL_TIME,
65   ARG_SIZE_BUFFERS,
66   ARG_SIZE_BYTES,
67   ARG_SIZE_TIME,
68   ARG_LEAKY,
69   ARG_LEVEL,
70   ARG_MAX_LEVEL,
71   ARG_MAY_DEADLOCK,
72   ARG_BLOCK_TIMEOUT,
73 };
74
75
76 static void                     gst_queue_class_init            (GstQueueClass *klass);
77 static void                     gst_queue_init                  (GstQueue *queue);
78 static void                     gst_queue_dispose               (GObject *object);
79
80 static void                     gst_queue_set_property          (GObject *object, guint prop_id, 
81                                                                  const GValue *value, GParamSpec *pspec);
82 static void                     gst_queue_get_property          (GObject *object, guint prop_id, 
83                                                                  GValue *value, GParamSpec *pspec);
84
85 static void                     gst_queue_chain                 (GstPad *pad, GstBuffer *buf);
86 static GstBuffer *              gst_queue_get                   (GstPad *pad);
87 static GstBufferPool*           gst_queue_get_bufferpool        (GstPad *pad);
88         
89 static gboolean                 gst_queue_handle_src_event      (GstPad *pad, GstEvent *event);
90
91
92 static void                     gst_queue_locked_flush          (GstQueue *queue);
93
94 static GstElementStateReturn    gst_queue_change_state          (GstElement *element);
95 static gboolean                 gst_queue_release_locks         (GstElement *element);
96
97   
98 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type())
99 static GType
100 queue_leaky_get_type(void) {
101   static GType queue_leaky_type = 0;
102   static GEnumValue queue_leaky[] = {
103     { GST_QUEUE_NO_LEAK,                "0", "Not Leaky" },
104     { GST_QUEUE_LEAK_UPSTREAM,          "1", "Leaky on Upstream" },
105     { GST_QUEUE_LEAK_DOWNSTREAM,        "2", "Leaky on Downstream" },
106     { 0, NULL, NULL },
107   };
108   if (!queue_leaky_type) {
109     queue_leaky_type = g_enum_register_static("GstQueueLeaky", queue_leaky);
110   }
111   return queue_leaky_type;
112 }
113
114 static GstElementClass *parent_class = NULL;
115 /* static guint gst_queue_signals[LAST_SIGNAL] = { 0 }; */
116
117 GType
118 gst_queue_get_type(void) 
119 {
120   static GType queue_type = 0;
121
122   if (!queue_type) {
123     static const GTypeInfo queue_info = {
124       sizeof(GstQueueClass),
125       NULL,
126       NULL,
127       (GClassInitFunc)gst_queue_class_init,
128       NULL,
129       NULL,
130       sizeof(GstQueue),
131       4,
132       (GInstanceInitFunc)gst_queue_init,
133       NULL
134     };
135     queue_type = g_type_register_static (GST_TYPE_ELEMENT, "GstQueue", &queue_info, 0);
136   }
137   return queue_type;
138 }
139
140 static void
141 gst_queue_class_init (GstQueueClass *klass)
142 {
143   GObjectClass *gobject_class;
144   GstElementClass *gstelement_class;
145
146   gobject_class = (GObjectClass*)klass;
147   gstelement_class = (GstElementClass*)klass;
148
149   parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
150
151   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEAKY,
152     g_param_spec_enum ("leaky", "Leaky", "Where the queue leaks, if at all.",
153                        GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
154   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEVEL,
155     g_param_spec_int ("level", "Level", "How many buffers are in the queue.",
156                       0, G_MAXINT, 0, G_PARAM_READABLE));
157   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL,
158     g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.",
159                       0, G_MAXINT, 100, G_PARAM_READWRITE));
160   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK,
161     g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
162                       TRUE, G_PARAM_READWRITE));
163   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BLOCK_TIMEOUT,
164     g_param_spec_int ("block_timeout", "Timeout for Block", 
165                       "Microseconds until blocked queue times out and returns filler event. "
166                       "Value of -1 disables timeout",
167                       -1, G_MAXINT, -1, G_PARAM_READWRITE));
168
169   gobject_class->dispose                = GST_DEBUG_FUNCPTR (gst_queue_dispose);
170   gobject_class->set_property           = GST_DEBUG_FUNCPTR (gst_queue_set_property);
171   gobject_class->get_property           = GST_DEBUG_FUNCPTR (gst_queue_get_property);
172
173   gstelement_class->change_state  = GST_DEBUG_FUNCPTR(gst_queue_change_state);
174   gstelement_class->release_locks = GST_DEBUG_FUNCPTR(gst_queue_release_locks);
175 }
176
177 static GstPadConnectReturn
178 gst_queue_connect (GstPad *pad, GstCaps *caps)
179 {
180   GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
181   GstPad *otherpad;
182
183   if (pad == queue->srcpad) 
184     otherpad = queue->sinkpad;
185   else
186     otherpad = queue->srcpad;
187
188   return gst_pad_proxy_connect (otherpad, caps);
189 }
190
191 static GstCaps*
192 gst_queue_getcaps (GstPad *pad, GstCaps *caps)
193 {
194   GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
195   GstPad *otherpad;
196
197   if (pad == queue->srcpad) 
198     otherpad = queue->sinkpad;
199   else
200     otherpad = queue->srcpad;
201
202   return gst_pad_get_allowed_caps (otherpad);
203 }
204
205 static void
206 gst_queue_init (GstQueue *queue)
207 {
208   /* scheduling on this kind of element is, well, interesting */
209   GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
210   GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE);
211
212   queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
213   gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_chain));
214   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
215   gst_pad_set_bufferpool_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_get_bufferpool));
216   gst_pad_set_connect_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_connect));
217   gst_pad_set_getcaps_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps));
218
219   queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
220   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
221   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
222   gst_pad_set_connect_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_connect));
223   gst_pad_set_getcaps_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps));
224   gst_pad_set_event_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_handle_src_event));
225
226   queue->leaky = GST_QUEUE_NO_LEAK;
227   queue->queue = NULL;
228   queue->level_buffers = 0;
229   queue->level_bytes = 0;
230   queue->level_time = 0LL;
231   queue->size_buffers = 100;            /* 100 buffers */
232   queue->size_bytes = 100 * 1024;       /* 100KB */
233   queue->size_time = 1000000000LL;      /* 1sec */
234   queue->may_deadlock = TRUE;
235   queue->block_timeout = -1;
236   queue->interrupt = FALSE;
237   queue->flush = FALSE;
238
239   queue->qlock = g_mutex_new ();
240   queue->reader = FALSE;
241   queue->writer = FALSE;
242   queue->not_empty = g_cond_new ();
243   queue->not_full = g_cond_new ();
244   queue->events = g_async_queue_new();
245   queue->queue = g_queue_new ();
246   GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions");
247 }
248
249 static void
250 gst_queue_dispose (GObject *object)
251 {
252   GstQueue *queue = GST_QUEUE (object);
253
254   g_mutex_free (queue->qlock);
255   g_cond_free (queue->not_empty);
256   g_cond_free (queue->not_full);
257   gst_queue_locked_flush (queue);
258   g_queue_free (queue->queue);
259
260   g_async_queue_unref(queue->events);
261
262   G_OBJECT_CLASS (parent_class)->dispose (object);
263 }
264
265 static GstBufferPool*
266 gst_queue_get_bufferpool (GstPad *pad)
267 {
268   GstQueue *queue;
269
270   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
271
272   return gst_pad_get_bufferpool (queue->srcpad);
273 }
274
275 static void
276 gst_queue_cleanup_data (gpointer data, const gpointer user_data)
277 {
278   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p", data);
279
280   gst_data_unref (GST_DATA (data));
281 }
282
283 static void
284 gst_queue_locked_flush (GstQueue *queue)
285 {
286   gpointer data;
287   
288   while ((data = g_queue_pop_head (queue->queue))) {
289     gst_queue_cleanup_data (data, (gpointer) queue);
290   }
291   queue->timeval = NULL;
292   queue->level_buffers = 0;
293   queue->level_bytes = 0;
294   queue->level_time = 0LL;
295   /* make sure any pending buffers to be added are flushed too */
296   queue->flush = TRUE;
297 }
298
299 static void
300 gst_queue_chain (GstPad *pad, GstBuffer *buf)
301 {
302   GstQueue *queue;
303   gboolean reader;
304
305   g_return_if_fail (pad != NULL);
306   g_return_if_fail (GST_IS_PAD (pad));
307   g_return_if_fail (buf != NULL);
308
309   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
310   
311   /* check for events to send upstream */
312   g_async_queue_lock(queue->events);
313   while (g_async_queue_length_unlocked(queue->events) > 0){
314     GstEvent *event = (GstEvent*)g_async_queue_pop_unlocked(queue->events);
315     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "sending event upstream\n");
316     gst_pad_event_default (pad, event);
317     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "event sent\n");
318   }
319   g_async_queue_unlock(queue->events);
320
321 restart:
322   /* we have to lock the queue since we span threads */
323   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld", pthread_self ());
324   g_mutex_lock (queue->qlock);
325   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld", pthread_self ());
326
327   /* assume don't need to flush this buffer when the queue is filled */
328   queue->flush = FALSE;
329
330   if (GST_IS_EVENT (buf)) {
331     switch (GST_EVENT_TYPE (buf)) {
332       case GST_EVENT_FLUSH:
333         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n");
334         gst_queue_locked_flush (queue);
335         break;
336       case GST_EVENT_EOS:
337         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n", 
338                            GST_ELEMENT_NAME (queue), queue->level_buffers);
339         break;
340       default:
341         /* we put the event in the queue, we don't have to act ourselves */
342         break;
343     }
344   }
345
346   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d",buf,GST_BUFFER_SIZE(buf));
347
348   if (queue->level_buffers == queue->size_buffers) {
349     /* if this is a leaky queue... */
350     if (queue->leaky) {
351       /* FIXME don't want to leak events! */
352       /* if we leak on the upstream side, drop the current buffer */
353       if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
354         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end");
355         if (GST_IS_EVENT (buf))
356           fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
357               GST_ELEMENT_NAME(GST_ELEMENT(queue)),
358               GST_EVENT_TYPE(GST_EVENT(buf)));
359           GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end");
360         gst_buffer_unref(buf);
361         /* now we have to clean up and exit right away */
362         g_mutex_unlock (queue->qlock);
363         return;
364       }
365       /* otherwise we have to push a buffer off the other end */
366       else {
367         gpointer front;
368         GstBuffer *leakbuf;
369
370         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end");
371
372         front = g_queue_pop_head (queue->queue);
373         leakbuf = (GstBuffer *)(front);
374
375         if (GST_IS_EVENT (leakbuf)) {
376           fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
377               GST_ELEMENT_NAME(GST_ELEMENT(queue)),
378               GST_EVENT_TYPE(GST_EVENT(leakbuf)));
379         }
380         queue->level_buffers--;
381         queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
382         gst_data_unref (GST_DATA (leakbuf));
383       }
384     }
385
386     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d",
387                         queue->level_buffers, queue->size_buffers);
388
389     while (queue->level_buffers == queue->size_buffers) {
390       /* if there's a pending state change for this queue or its manager, switch */
391       /* back to iterator so bottom half of state change executes */
392       if (queue->interrupt) {
393         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!");
394         g_mutex_unlock (queue->qlock);
395         if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad), GST_ELEMENT (queue)))
396           return;
397         /* if we got here bacause we were unlocked after a flush, we don't need
398          * to add the buffer to the queue again */
399         if (queue->flush) {
400           GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "not adding pending buffer after flush");
401           return;
402         }
403         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding pending buffer after interrupt");
404         goto restart;
405       }
406       if (GST_STATE (queue) != GST_STATE_PLAYING) {
407         /* this means the other end is shut down */
408         /* try to signal to resolve the error */
409         if (!queue->may_deadlock) {
410           gst_data_unref (GST_DATA (buf));
411           g_mutex_unlock (queue->qlock);
412           gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
413           return;
414         }
415         else {
416           g_print ("%s: waiting for the app to restart source pad elements\n", GST_ELEMENT_NAME (queue));
417         }
418       }
419
420       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d", 
421                       queue->level_buffers, queue->size_buffers);
422       if (queue->writer)
423         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!");
424       queue->writer = TRUE;
425       g_cond_wait (queue->not_full, queue->qlock);
426       queue->writer = FALSE;
427       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal");
428     }
429     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d",
430         queue->level_buffers, queue->size_buffers);
431   }
432
433   /* put the buffer on the tail of the list */
434   g_queue_push_tail (queue->queue, buf);
435
436   queue->level_buffers++;
437   queue->level_bytes += GST_BUFFER_SIZE(buf);
438
439   /* this assertion _has_ to hold */
440   g_assert (queue->queue->length == queue->level_buffers);
441
442   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d",
443       GST_DEBUG_PAD_NAME(pad),
444       queue->level_buffers, queue->size_buffers);
445
446   /* reader waiting on an empty queue */
447   reader = queue->reader;
448
449   g_mutex_unlock (queue->qlock);
450
451   if (reader)
452   {
453     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty");
454     g_cond_signal (queue->not_empty);
455   }
456 }
457
458 static GstBuffer *
459 gst_queue_get (GstPad *pad)
460 {
461   GstQueue *queue;
462   GstBuffer *buf = NULL;
463   gpointer front;
464   gboolean writer;
465
466   g_assert(pad != NULL);
467   g_assert(GST_IS_PAD(pad));
468   g_return_val_if_fail (pad != NULL, NULL);
469   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
470
471   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
472
473 restart:
474   /* have to lock for thread-safety */
475   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld", pthread_self ());
476   g_mutex_lock (queue->qlock);
477   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p", pthread_self (), queue->not_empty);
478
479   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers);
480   while (queue->level_buffers == 0) {
481     /* if there's a pending state change for this queue or its manager, switch
482      * back to iterator so bottom half of state change executes
483      */ 
484     //while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
485     if (queue->interrupt) {
486       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!");
487       g_mutex_unlock (queue->qlock);
488       if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), GST_ELEMENT (queue)))
489         return NULL;
490       goto restart;
491     }
492     if (GST_STATE (queue) != GST_STATE_PLAYING) {
493       /* this means the other end is shut down */
494       if (!queue->may_deadlock) {
495         g_mutex_unlock (queue->qlock);
496         gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
497         goto restart;
498       }
499       else {
500         g_print ("%s: waiting for the app to restart source pad elements\n", GST_ELEMENT_NAME (queue));
501       }
502     }
503
504     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d", queue->level_buffers, queue->size_buffers);
505     if (queue->reader)
506       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!");
507     queue->reader = TRUE;
508     
509     //if (queue->block_timeout > -1){
510     if (FALSE) {
511       GTimeVal timeout;
512       g_get_current_time(&timeout);
513       g_time_val_add(&timeout, queue->block_timeout);
514       if (!g_cond_timed_wait (queue->not_empty, queue->qlock, &timeout)){
515         g_mutex_unlock (queue->qlock);
516         g_warning ("filler");
517         return GST_BUFFER(gst_event_new_filler());
518       }
519     }
520     else {
521       g_cond_wait (queue->not_empty, queue->qlock);
522     }
523     queue->reader = FALSE;
524     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal");
525   }
526   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers);
527
528   front = g_queue_pop_head (queue->queue);
529   buf = (GstBuffer *)(front);
530   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue", buf);
531
532   queue->level_buffers--;
533   queue->level_bytes -= GST_BUFFER_SIZE(buf);
534
535   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d",
536       GST_DEBUG_PAD_NAME(pad),
537       queue->level_buffers, queue->size_buffers);
538
539   /* this assertion _has_ to hold */
540   g_assert (queue->queue->length == queue->level_buffers);
541
542   /* writer waiting on a full queue */
543   writer = queue->writer;
544
545   g_mutex_unlock (queue->qlock);
546
547   if (writer)
548   {
549     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full");
550     g_cond_signal (queue->not_full);
551   }
552
553   /* FIXME where should this be? locked? */
554   if (GST_IS_EVENT(buf)) {
555     GstEvent *event = GST_EVENT(buf);
556     switch (GST_EVENT_TYPE(event)) {
557       case GST_EVENT_EOS:
558         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos", GST_ELEMENT_NAME (queue));
559         gst_element_set_eos (GST_ELEMENT (queue));
560         break;
561       default:
562         break;
563     }
564   }
565
566   return buf;
567 }
568
569
570 static gboolean
571 gst_queue_handle_src_event (GstPad *pad, GstEvent *event)
572 {
573   GstQueue *queue;
574   gboolean res;
575
576   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
577
578   g_mutex_lock (queue->qlock);
579
580   if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
581     /* push the event to the queue for upstream consumption */
582     g_async_queue_push(queue->events, event);
583     g_mutex_unlock (queue->qlock);
584     g_warning ("FIXME: sending event in a running queue");
585     /* FIXME wait for delivery of the event here, then return the result
586      * instead of FALSE */
587     return FALSE;
588   }
589
590   res = gst_pad_event_default (pad, event); 
591   switch (GST_EVENT_TYPE (event)) {
592     case GST_EVENT_FLUSH:
593       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n");
594       gst_queue_locked_flush (queue);
595       break;
596     case GST_EVENT_SEEK:
597       if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
598         gst_queue_locked_flush (queue);
599       }
600     default:
601       break;
602   }
603   g_mutex_unlock (queue->qlock);
604
605   /* we have to claim success, but we don't really know */
606   return TRUE;
607 }
608
609 static gboolean
610 gst_queue_release_locks (GstElement *element)
611 {
612   GstQueue *queue;
613
614   queue = GST_QUEUE (element);
615
616   g_mutex_lock (queue->qlock);
617   queue->interrupt = TRUE;
618   g_cond_signal (queue->not_full);
619   g_cond_signal (queue->not_empty); 
620   g_mutex_unlock (queue->qlock);
621
622   return TRUE;
623 }
624
625 static GstElementStateReturn
626 gst_queue_change_state (GstElement *element)
627 {
628   GstQueue *queue;
629   GstElementStateReturn ret;
630   GstElementState new_state;
631   g_return_val_if_fail (GST_IS_QUEUE (element), GST_STATE_FAILURE);
632
633   queue = GST_QUEUE (element);
634
635   GST_DEBUG_ENTER("('%s')", GST_ELEMENT_NAME (element));
636
637   /* lock the queue so another thread (not in sync with this thread's state)
638    * can't call this queue's _get (or whatever)
639    */
640   g_mutex_lock (queue->qlock);
641
642   new_state = GST_STATE_PENDING (element);
643
644   if (new_state == GST_STATE_READY) {
645     gst_queue_locked_flush (queue);
646   }
647   else if (new_state == GST_STATE_PLAYING) {
648     if (!GST_PAD_IS_USABLE (queue->sinkpad)) {
649       GST_DEBUG_ELEMENT (GST_CAT_STATES, queue, "queue %s is not connected", GST_ELEMENT_NAME (queue));
650       /* FIXME can this be? */
651       if (queue->reader)
652         g_cond_signal (queue->not_empty);
653       g_mutex_unlock (queue->qlock);
654
655       return GST_STATE_FAILURE;
656     }
657     queue->interrupt = FALSE;
658   }
659
660   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
661   g_mutex_unlock (queue->qlock);
662
663   GST_DEBUG_LEAVE("('%s')", GST_ELEMENT_NAME (element));
664   return ret;
665 }
666
667
668 static void
669 gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
670 {
671   GstQueue *queue;
672
673   /* it's not null if we got it, but it might not be ours */
674   g_return_if_fail (GST_IS_QUEUE (object));
675
676   queue = GST_QUEUE (object);
677
678   switch (prop_id) {
679     case ARG_LEAKY:
680       queue->leaky = g_value_get_enum (value);
681       break;
682     case ARG_MAX_LEVEL:
683       queue->size_buffers = g_value_get_int (value);
684       break;
685     case ARG_MAY_DEADLOCK:
686       queue->may_deadlock = g_value_get_boolean (value);
687       break;
688     case ARG_BLOCK_TIMEOUT:
689       queue->block_timeout = g_value_get_int (value);
690       break;
691     default:
692       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
693       break;
694   }
695 }
696
697 static void
698 gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
699 {
700   GstQueue *queue;
701
702   /* it's not null if we got it, but it might not be ours */
703   g_return_if_fail (GST_IS_QUEUE (object));
704
705   queue = GST_QUEUE (object);
706
707   switch (prop_id) {
708     case ARG_LEAKY:
709       g_value_set_enum (value, queue->leaky);
710       break;
711     case ARG_LEVEL:
712       g_value_set_int (value, queue->level_buffers);
713       break;
714     case ARG_MAX_LEVEL:
715       g_value_set_int (value, queue->size_buffers);
716       break;
717     case ARG_MAY_DEADLOCK:
718       g_value_set_boolean (value, queue->may_deadlock);
719       break;
720     case ARG_BLOCK_TIMEOUT:
721       g_value_set_int (value, queue->block_timeout);
722       break;
723     default:
724       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
725       break;
726   }
727 }