More EOS changes.
authorWim Taymans <wim.taymans@gmail.com>
Sat, 20 Jan 2001 17:59:25 +0000 (17:59 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Sat, 20 Jan 2001 17:59:25 +0000 (17:59 +0000)
Original commit message from CVS:
More EOS changes.
When a bin2 is found inside a bin1, we add the bin2 to the EOS providers
of the bin1. When there is nothing more to schedule in bin1 and bin2 has
fired EOS, bin1 is in EOS.
The queue overrides the EOS notification and calls EOS on the src pad
when the queue is empty and the sink pad is in EOS.

gst/gstbin.c
gst/gstbin.h
gst/gstelement.c
gst/gstqueue.c
gst/gstscheduler.c
gst/gstthread.c
plugins/elements/gstqueue.c

index 0e74010..bf0c9e1 100644 (file)
@@ -49,7 +49,7 @@ static gboolean                       gst_bin_change_state_type       (GstBin *bin,
                                                                 GtkType type);
 
 static void                    gst_bin_create_plan_func        (GstBin *bin);
-static void                    gst_bin_iterate_func            (GstBin *bin);
+static gboolean                        gst_bin_iterate_func            (GstBin *bin);
 
 static xmlNodePtr              gst_bin_save_thyself            (GstElement *element, xmlNodePtr parent);
 static void                    gst_bin_restore_thyself         (GstElement *element, xmlNodePtr parent,
@@ -134,6 +134,7 @@ gst_bin_init (GstBin *bin)
   bin->numchildren = 0;
   bin->children = NULL;
   bin->eos_providers = NULL;
+  bin->num_eos_providers = 0;
   bin->chains = NULL;
 // FIXME temporary testing measure
 //  bin->use_cothreads = TRUE;
@@ -508,19 +509,22 @@ gst_bin_use_cothreads (GstBin *bin,
  *
  * Iterates over the elements in this bin.
  */
-void
+gboolean
 gst_bin_iterate (GstBin *bin)
 {
   GstBinClass *oclass;
+  gboolean eos = TRUE;
 
   GST_DEBUG_ENTER("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
 
   oclass = GST_BIN_CLASS (GTK_OBJECT (bin)->klass);
 
   if (oclass->iterate)
-    (oclass->iterate) (bin);
+    eos = (oclass->iterate) (bin);
 
   GST_DEBUG_LEAVE("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
+
+  return eos;
 }
 
 /**
@@ -540,6 +544,18 @@ gst_bin_create_plan (GstBin *bin)
     (oclass->create_plan) (bin);
 }
 
+/* out internal element fired EOS, we decrement the number of pending EOS childs */
+static void
+gst_bin_received_eos (GstElement *element, GstBin *bin)
+{
+  GST_INFO_ELEMENT (GST_CAT_PLANNING, bin, "child %s fired eos, pending %d\n", gst_element_get_name (element),
+                 bin->num_eos_providers);
+
+  if (bin->num_eos_providers) {
+    bin->num_eos_providers--;
+  }
+}
+
 /**
  * gst_bin_schedule:
  * @bin: #GstBin to schedule
@@ -684,6 +700,11 @@ gst_bin_create_plan_func (GstBin *bin)
         if (GST_IS_BIN (element)) {
           GST_DEBUG (0,"flattened recurse into \"%s\"\n",elementname);
           pending = g_slist_prepend (pending, element);
+
+         gtk_signal_connect (GTK_OBJECT (element), "eos", gst_bin_received_eos, bin);
+         bin->eos_providers = g_list_prepend (bin->eos_providers, element);
+         bin->num_eos_providers++;
+
         // otherwise add it to the list of elements
         } else {
           GST_DEBUG (0,"found element \"%s\" that I manage\n",elementname);
@@ -701,7 +722,7 @@ gst_bin_create_plan_func (GstBin *bin)
   GST_DEBUG_LEAVE("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
 }
 
-static void
+static gboolean
 gst_bin_iterate_func (GstBin *bin)
 {
   GList *chains;
@@ -712,12 +733,13 @@ gst_bin_iterate_func (GstBin *bin)
   GstPad *pad;
   GstBuffer *buf = NULL;
   gint num_scheduled = 0;
+  gboolean eos = FALSE;
 
   GST_DEBUG_ENTER("(\"%s\")", gst_element_get_name (GST_ELEMENT (bin)));
 
-  g_return_if_fail (bin != NULL);
-  g_return_if_fail (GST_IS_BIN (bin));
-  g_return_if_fail (GST_STATE (bin) == GST_STATE_PLAYING);
+  g_return_val_if_fail (bin != NULL, TRUE);
+  g_return_val_if_fail (GST_IS_BIN (bin), TRUE);
+  g_return_val_if_fail (GST_STATE (bin) == GST_STATE_PLAYING, TRUE);
 
   // step through all the chains
   chains = bin->chains;
@@ -773,10 +795,18 @@ gst_bin_iterate_func (GstBin *bin)
     num_scheduled++;
   }
 
-  if (!num_scheduled) {
+  /*
+  g_print ("bin \"%s\", eos providers:%d, scheduled: %d\n",
+                 gst_element_get_name (GST_ELEMENT (bin)),
+                 bin->num_eos_providers, num_scheduled);
+                 */
+
+  if (!num_scheduled && !bin->num_eos_providers) {
     gst_element_signal_eos (GST_ELEMENT (bin));
+    eos = TRUE;
   }
 
   GST_DEBUG_LEAVE("(%s)", gst_element_get_name (GST_ELEMENT (bin)));
+  return !eos;
 }
 
index 077a1a3..4f238ec 100644 (file)
@@ -97,7 +97,7 @@ struct _GstBinClass {
   void         (*create_plan)          (GstBin *bin);
   void         (*schedule)             (GstBin *bin);
   /* run a full iteration of operation */
-  void         (*iterate)              (GstBin *bin);
+  gboolean     (*iterate)              (GstBin *bin);
 };
 
 struct __GstBinChain {
@@ -132,7 +132,7 @@ gboolean    gst_bin_set_state_type          (GstBin *bin,
                                                 GstElementState state,
                                                 GtkType type);
 
-void           gst_bin_iterate                 (GstBin *bin);
+gboolean       gst_bin_iterate                 (GstBin *bin);
 
 /* hack FIXME */
 void           gst_bin_use_cothreads           (GstBin *bin,
index 70fc8a0..cd8bdfc 100644 (file)
@@ -447,11 +447,13 @@ gst_element_connect (GstElement *src, const gchar *srcpadname,
   destparent = gst_object_get_parent (GST_OBJECT (dest));
 
   /* have to make sure that they have the same parents... */
+  /*
   if (srcparent != destparent) {
     GST_ERROR_OBJECT(srcparent,destparent,"%s and %s have different parents",
                  gst_element_get_name(src),gst_element_get_name(dest));
     return;
   }
+  */
 
   /* we're satisified they can be connected, let's do it */
   gst_pad_connect(srcpad,destpad);
index f5f1b86..6e44fd8 100644 (file)
@@ -2,7 +2,7 @@
  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
  *                    2000 Wim Taymans <wtay@chello.be>
  *
- * gstqueue.c: 
+ * gstqueue.c:
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
@@ -25,7 +25,7 @@
 #ifdef STATUS_ENABLED
 #define STATUS(A) GST_DEBUG(0,A, gst_element_get_name(GST_ELEMENT(queue)))
 #else
-#define STATUS(A) 
+#define STATUS(A)
 #endif
 
 #include <pthread.h>
@@ -60,18 +60,19 @@ enum {
 };
 
 
-static void                    gst_queue_class_init    (GstQueueClass *klass);
-static void                    gst_queue_init          (GstQueue *queue);
+static void                    gst_queue_class_init    (GstQueueClass *klass);
+static void                    gst_queue_init          (GstQueue *queue);
 
-static void                    gst_queue_set_arg       (GtkObject *object, GtkArg *arg, guint id);
-static void                    gst_queue_get_arg       (GtkObject *object, GtkArg *arg, guint id);
+static void                    gst_queue_set_arg       (GtkObject *object, GtkArg *arg, guint id);
+static void                    gst_queue_get_arg       (GtkObject *object, GtkArg *arg, guint id);
 
-static void                    gst_queue_chain         (GstPad *pad, GstBuffer *buf);
+static gboolean                        gst_queue_handle_eos    (GstPad *pad);
+static void                    gst_queue_chain         (GstPad *pad, GstBuffer *buf);
 static GstBuffer *             gst_queue_get           (GstPad *pad);
 
-static void                    gst_queue_flush         (GstQueue *queue);
+static void                    gst_queue_flush         (GstQueue *queue);
 
-static GstElementStateReturn   gst_queue_change_state  (GstElement *element);
+static GstElementStateReturn   gst_queue_change_state  (GstElement *element);
 
 
 static GstElementClass *parent_class = NULL;
@@ -97,8 +98,8 @@ gst_queue_get_type(void) {
   return queue_type;
 }
 
-static void 
-gst_queue_class_init (GstQueueClass *klass) 
+static void
+gst_queue_class_init (GstQueueClass *klass)
 {
   GtkObjectClass *gtkobject_class;
   GstElementClass *gstelement_class;
@@ -117,14 +118,14 @@ gst_queue_class_init (GstQueueClass *klass)
   gtk_object_add_arg_type ("GstQueue::timeout", GTK_TYPE_INT,
                            GTK_ARG_READWRITE, ARG_TIMEOUT);
 
-  gtkobject_class->set_arg = gst_queue_set_arg;  
+  gtkobject_class->set_arg = gst_queue_set_arg;
   gtkobject_class->get_arg = gst_queue_get_arg;
 
   gstelement_class->change_state = gst_queue_change_state;
 }
 
-static void 
-gst_queue_init (GstQueue *queue) 
+static void
+gst_queue_init (GstQueue *queue)
 {
   // scheduling on this kind of element is, well, interesting
   GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
@@ -132,6 +133,7 @@ gst_queue_init (GstQueue *queue)
   queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
   gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_chain));
   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
+  gst_pad_set_eos_function (queue->sinkpad, gst_queue_handle_eos);
 
   queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR(gst_queue_get));
@@ -149,28 +151,50 @@ gst_queue_init (GstQueue *queue)
   queue->fullcond = g_cond_new ();
 }
 
-static void 
-gst_queue_cleanup_buffers (gpointer data, const gpointer user_data) 
+static gboolean
+gst_queue_handle_eos (GstPad *pad)
+{
+  GstQueue *queue;
+
+  queue = GST_QUEUE(pad->parent);
+
+  GST_DEBUG (0,"queue: %s received eos\n", gst_element_get_name (GST_ELEMENT (queue)));
+
+  GST_LOCK (queue);
+  GST_DEBUG (0,"queue: %s has %d buffers left\n", gst_element_get_name (GST_ELEMENT (queue)),
+                 queue->level_buffers);
+
+  GST_FLAG_SET (pad, GST_PAD_EOS);
+
+  g_cond_signal (queue->emptycond);
+
+  GST_UNLOCK (queue);
+
+  return TRUE;
+}
+
+static void
+gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
 {
   GST_DEBUG (0,"queue: %s cleaning buffer %p\n", (gchar *)user_data, data);
-  
+
   gst_buffer_unref (GST_BUFFER (data));
 }
 
-static void 
-gst_queue_flush (GstQueue *queue) 
+static void
+gst_queue_flush (GstQueue *queue)
 {
-  g_slist_foreach (queue->queue, gst_queue_cleanup_buffers, 
-                 (char *)gst_element_get_name (GST_ELEMENT (queue))); 
+  g_slist_foreach (queue->queue, gst_queue_cleanup_buffers,
+                 (char *)gst_element_get_name (GST_ELEMENT (queue)));
   g_slist_free (queue->queue);
-  
+
   queue->queue = NULL;
   queue->level_buffers = 0;
   queue->timeval = NULL;
 }
 
-static void 
-gst_queue_chain (GstPad *pad, GstBuffer *buf) 
+static void
+gst_queue_chain (GstPad *pad, GstBuffer *buf)
 {
   GstQueue *queue;
   gboolean tosignal = FALSE;
@@ -228,7 +252,7 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
 }
 
 static GstBuffer *
-gst_queue_get (GstPad *pad) 
+gst_queue_get (GstPad *pad)
 {
   GstQueue *queue = GST_QUEUE (gst_pad_get_parent(pad));
   GstBuffer *buf = NULL;
@@ -252,6 +276,10 @@ gst_queue_get (GstPad *pad)
   while (!queue->level_buffers) {
     STATUS("queue: %s U released lock\n");
     //g_cond_timed_wait (queue->emptycond, queue->emptylock, queue->timeval);
+    if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) {
+      gst_pad_set_eos (queue->srcpad);
+      return NULL;
+    }
     //FIXME need to signal other thread in case signals got lost?
     g_cond_signal (queue->fullcond);
     g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock);
@@ -283,8 +311,8 @@ gst_queue_get (GstPad *pad)
   /* unlock now */
 }
 
-static GstElementStateReturn 
-gst_queue_change_state (GstElement *element) 
+static GstElementStateReturn
+gst_queue_change_state (GstElement *element)
 {
   GstQueue *queue;
   g_return_val_if_fail (GST_IS_QUEUE (element), GST_STATE_FAILURE);
@@ -306,14 +334,14 @@ gst_queue_change_state (GstElement *element)
 }
 
 
-static void 
-gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id) 
+static void
+gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id)
 {
   GstQueue *queue;
 
   /* it's not null if we got it, but it might not be ours */
   g_return_if_fail (GST_IS_QUEUE (object));
-  
+
   queue = GST_QUEUE (object);
 
   switch(id) {
@@ -330,14 +358,14 @@ gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id)
   }
 }
 
-static void 
-gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id) 
+static void
+gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id)
 {
   GstQueue *queue;
 
   /* it's not null if we got it, but it might not be ours */
   g_return_if_fail (GST_IS_QUEUE (object));
-  
+
   queue = GST_QUEUE (object);
 
   switch (id) {
index e7fca52..34f5a18 100644 (file)
@@ -431,6 +431,7 @@ void gst_bin_schedule_func(GstBin *bin) {
             if ((GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) &&
                 (GST_FLAG_IS_SET (peerparent, GST_ELEMENT_DECOUPLED))) {
               chain->entries = g_list_prepend (chain->entries, peerparent);
+              gtk_signal_connect (GTK_OBJECT (peerparent), "eos", gst_scheduler_handle_eos, chain);
               GST_DEBUG (0,"added '%s' as DECOUPLED entry into the chain\n",gst_element_get_name(peerparent));
             }
           } else
index 1fe53cc..4f82a31 100644 (file)
@@ -311,9 +311,13 @@ gst_thread_main_loop (void *arg)
   gst_thread_signal_thread (thread);
 
   while (!GST_FLAG_IS_SET (thread, GST_THREAD_STATE_REAPING)) {
-    if (GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING))
-      gst_bin_iterate (GST_BIN (thread));
+    if (GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING)) {
+      if (!gst_bin_iterate (GST_BIN (thread))) {
+       GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
+      }
+    }
     else {
+      GST_DEBUG (0, "thread \"%s\" waiting\n", gst_element_get_name (GST_ELEMENT (thread)));
       gst_thread_wait_thread (thread);
     }
   }
@@ -326,8 +330,8 @@ gst_thread_main_loop (void *arg)
   return NULL;
 }
 
-static void 
-gst_thread_signal_thread (GstThread *thread) 
+static void
+gst_thread_signal_thread (GstThread *thread)
 {
   GST_DEBUG (0,"signaling thread\n");
   g_mutex_lock (thread->lock);
@@ -345,10 +349,10 @@ gst_thread_wait_thread (GstThread *thread)
 }
 
 
-static void 
+static void
 gst_thread_restore_thyself (GstElement *element,
-                           xmlNodePtr parent, 
-                           GHashTable *elements) 
+                           xmlNodePtr parent,
+                           GHashTable *elements)
 {
   GST_DEBUG (0,"gstthread: restore\n");
 
@@ -356,9 +360,9 @@ gst_thread_restore_thyself (GstElement *element,
     GST_ELEMENT_CLASS (parent_class)->restore_thyself (element,parent, elements);
 }
 
-static xmlNodePtr 
+static xmlNodePtr
 gst_thread_save_thyself (GstElement *element,
-                        xmlNodePtr parent) 
+                        xmlNodePtr parent)
 {
   if (GST_ELEMENT_CLASS (parent_class)->save_thyself)
     GST_ELEMENT_CLASS (parent_class)->save_thyself (element,parent);
index f5f1b86..6e44fd8 100644 (file)
@@ -2,7 +2,7 @@
  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
  *                    2000 Wim Taymans <wtay@chello.be>
  *
- * gstqueue.c: 
+ * gstqueue.c:
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
@@ -25,7 +25,7 @@
 #ifdef STATUS_ENABLED
 #define STATUS(A) GST_DEBUG(0,A, gst_element_get_name(GST_ELEMENT(queue)))
 #else
-#define STATUS(A) 
+#define STATUS(A)
 #endif
 
 #include <pthread.h>
@@ -60,18 +60,19 @@ enum {
 };
 
 
-static void                    gst_queue_class_init    (GstQueueClass *klass);
-static void                    gst_queue_init          (GstQueue *queue);
+static void                    gst_queue_class_init    (GstQueueClass *klass);
+static void                    gst_queue_init          (GstQueue *queue);
 
-static void                    gst_queue_set_arg       (GtkObject *object, GtkArg *arg, guint id);
-static void                    gst_queue_get_arg       (GtkObject *object, GtkArg *arg, guint id);
+static void                    gst_queue_set_arg       (GtkObject *object, GtkArg *arg, guint id);
+static void                    gst_queue_get_arg       (GtkObject *object, GtkArg *arg, guint id);
 
-static void                    gst_queue_chain         (GstPad *pad, GstBuffer *buf);
+static gboolean                        gst_queue_handle_eos    (GstPad *pad);
+static void                    gst_queue_chain         (GstPad *pad, GstBuffer *buf);
 static GstBuffer *             gst_queue_get           (GstPad *pad);
 
-static void                    gst_queue_flush         (GstQueue *queue);
+static void                    gst_queue_flush         (GstQueue *queue);
 
-static GstElementStateReturn   gst_queue_change_state  (GstElement *element);
+static GstElementStateReturn   gst_queue_change_state  (GstElement *element);
 
 
 static GstElementClass *parent_class = NULL;
@@ -97,8 +98,8 @@ gst_queue_get_type(void) {
   return queue_type;
 }
 
-static void 
-gst_queue_class_init (GstQueueClass *klass) 
+static void
+gst_queue_class_init (GstQueueClass *klass)
 {
   GtkObjectClass *gtkobject_class;
   GstElementClass *gstelement_class;
@@ -117,14 +118,14 @@ gst_queue_class_init (GstQueueClass *klass)
   gtk_object_add_arg_type ("GstQueue::timeout", GTK_TYPE_INT,
                            GTK_ARG_READWRITE, ARG_TIMEOUT);
 
-  gtkobject_class->set_arg = gst_queue_set_arg;  
+  gtkobject_class->set_arg = gst_queue_set_arg;
   gtkobject_class->get_arg = gst_queue_get_arg;
 
   gstelement_class->change_state = gst_queue_change_state;
 }
 
-static void 
-gst_queue_init (GstQueue *queue) 
+static void
+gst_queue_init (GstQueue *queue)
 {
   // scheduling on this kind of element is, well, interesting
   GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
@@ -132,6 +133,7 @@ gst_queue_init (GstQueue *queue)
   queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
   gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_chain));
   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
+  gst_pad_set_eos_function (queue->sinkpad, gst_queue_handle_eos);
 
   queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR(gst_queue_get));
@@ -149,28 +151,50 @@ gst_queue_init (GstQueue *queue)
   queue->fullcond = g_cond_new ();
 }
 
-static void 
-gst_queue_cleanup_buffers (gpointer data, const gpointer user_data) 
+static gboolean
+gst_queue_handle_eos (GstPad *pad)
+{
+  GstQueue *queue;
+
+  queue = GST_QUEUE(pad->parent);
+
+  GST_DEBUG (0,"queue: %s received eos\n", gst_element_get_name (GST_ELEMENT (queue)));
+
+  GST_LOCK (queue);
+  GST_DEBUG (0,"queue: %s has %d buffers left\n", gst_element_get_name (GST_ELEMENT (queue)),
+                 queue->level_buffers);
+
+  GST_FLAG_SET (pad, GST_PAD_EOS);
+
+  g_cond_signal (queue->emptycond);
+
+  GST_UNLOCK (queue);
+
+  return TRUE;
+}
+
+static void
+gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
 {
   GST_DEBUG (0,"queue: %s cleaning buffer %p\n", (gchar *)user_data, data);
-  
+
   gst_buffer_unref (GST_BUFFER (data));
 }
 
-static void 
-gst_queue_flush (GstQueue *queue) 
+static void
+gst_queue_flush (GstQueue *queue)
 {
-  g_slist_foreach (queue->queue, gst_queue_cleanup_buffers, 
-                 (char *)gst_element_get_name (GST_ELEMENT (queue))); 
+  g_slist_foreach (queue->queue, gst_queue_cleanup_buffers,
+                 (char *)gst_element_get_name (GST_ELEMENT (queue)));
   g_slist_free (queue->queue);
-  
+
   queue->queue = NULL;
   queue->level_buffers = 0;
   queue->timeval = NULL;
 }
 
-static void 
-gst_queue_chain (GstPad *pad, GstBuffer *buf) 
+static void
+gst_queue_chain (GstPad *pad, GstBuffer *buf)
 {
   GstQueue *queue;
   gboolean tosignal = FALSE;
@@ -228,7 +252,7 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
 }
 
 static GstBuffer *
-gst_queue_get (GstPad *pad) 
+gst_queue_get (GstPad *pad)
 {
   GstQueue *queue = GST_QUEUE (gst_pad_get_parent(pad));
   GstBuffer *buf = NULL;
@@ -252,6 +276,10 @@ gst_queue_get (GstPad *pad)
   while (!queue->level_buffers) {
     STATUS("queue: %s U released lock\n");
     //g_cond_timed_wait (queue->emptycond, queue->emptylock, queue->timeval);
+    if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) {
+      gst_pad_set_eos (queue->srcpad);
+      return NULL;
+    }
     //FIXME need to signal other thread in case signals got lost?
     g_cond_signal (queue->fullcond);
     g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock);
@@ -283,8 +311,8 @@ gst_queue_get (GstPad *pad)
   /* unlock now */
 }
 
-static GstElementStateReturn 
-gst_queue_change_state (GstElement *element) 
+static GstElementStateReturn
+gst_queue_change_state (GstElement *element)
 {
   GstQueue *queue;
   g_return_val_if_fail (GST_IS_QUEUE (element), GST_STATE_FAILURE);
@@ -306,14 +334,14 @@ gst_queue_change_state (GstElement *element)
 }
 
 
-static void 
-gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id) 
+static void
+gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id)
 {
   GstQueue *queue;
 
   /* it's not null if we got it, but it might not be ours */
   g_return_if_fail (GST_IS_QUEUE (object));
-  
+
   queue = GST_QUEUE (object);
 
   switch(id) {
@@ -330,14 +358,14 @@ gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id)
   }
 }
 
-static void 
-gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id) 
+static void
+gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id)
 {
   GstQueue *queue;
 
   /* it's not null if we got it, but it might not be ours */
   g_return_if_fail (GST_IS_QUEUE (object));
-  
+
   queue = GST_QUEUE (object);
 
   switch (id) {