added taaz's threading patch, including queue events
authorErik Walthinsen <omega@temple-baptist.org>
Sat, 27 Oct 2001 20:28:31 +0000 (20:28 +0000)
committerErik Walthinsen <omega@temple-baptist.org>
Sat, 27 Oct 2001 20:28:31 +0000 (20:28 +0000)
Original commit message from CVS:
added taaz's threading patch, including queue events

gst/elements/gstfakesrc.c
gst/elements/gststatistics.c
gst/gstpad.h
gst/gstqueue.c
gst/gstqueue.h
gst/gstscheduler.c
gst/gstthread.c
plugins/elements/gstfakesrc.c
plugins/elements/gstqueue.c
plugins/elements/gstqueue.h
plugins/elements/gststatistics.c

index bed56e4..8dece3e 100644 (file)
@@ -679,8 +679,10 @@ gst_fakesrc_get(GstPad *pad)
     g_print("fakesrc: get      ******* (%s:%s)> (%d bytes, %llu) \n",
                GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
 
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, src, "pre handoff emit\n");
   g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
                    buf, pad);
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, src, "post handoff emit\n");
 
   return buf;
 }
index 398c119..fbfdbd9 100644 (file)
@@ -37,7 +37,6 @@ GstElementDetails gst_statistics_details = {
 /* Statistics signals and args */
 enum {
   SIGNAL_UPDATE,
-  /* FILL ME */
   LAST_SIGNAL
 };
 
@@ -66,7 +65,7 @@ static void gst_statistics_reset              (GstStatistics *statistics);
 static void gst_statistics_print               (GstStatistics *statistics);
 
 static GstElementClass *parent_class = NULL;
-static guint gst_statistics_signals[LAST_SIGNAL] = { 0 };
+static guint gst_statistics_signals[LAST_SIGNAL] = { 0, };
 
 static stats zero_stats = { 0, };
 
@@ -131,7 +130,7 @@ gst_statistics_class_init (GstStatisticsClass *klass)
   gst_statistics_signals[SIGNAL_UPDATE] =
     g_signal_new ("update", G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST,
                    G_STRUCT_OFFSET (GstStatisticsClass, update), NULL, NULL,
-                   g_cclosure_marshal_VOID__POINTER, G_TYPE_NONE, 0);
+                   g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
 
   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_statistics_set_property);
   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_statistics_get_property);
@@ -283,10 +282,12 @@ gst_statistics_chain (GstPad *pad, GstBuffer *buf)
 
   if (GST_IS_EVENT(buf)) {
     GstEvent *event = GST_EVENT (buf);
-    gst_element_set_state (GST_ELEMENT (statistics), GST_STATE_PAUSED);
     statistics->stats.events += 1;
-    if (statistics->update_on_eos && (GST_EVENT_TYPE(event) == GST_EVENT_EOS)) {
-      update = TRUE;
+    if (GST_EVENT_TYPE(event) == GST_EVENT_EOS) {
+      gst_element_set_state (GST_ELEMENT (statistics), GST_STATE_PAUSED);
+      if (statistics->update_on_eos) {
+        update = TRUE;
+      }
     }
     if (statistics->update_freq.events) {
       statistics->update_count.events += 1;
@@ -317,7 +318,9 @@ gst_statistics_chain (GstPad *pad, GstBuffer *buf)
 
   if (update) {
     if (statistics->update) {
+      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, statistics, "pre update emit\n");
       g_signal_emit (G_OBJECT (statistics), gst_statistics_signals[SIGNAL_UPDATE], 0);
+      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, statistics, "post update emit\n");
     }
     if (!statistics->silent) {
       gst_statistics_print(statistics);
index bfeddb6..be75ca9 100644 (file)
@@ -257,7 +257,7 @@ struct _GstGhostPadClass {
 #define GST_GPAD_REALPAD(pad)          (((GstGhostPad *)(pad))->realpad)
 
 /* Generic */
-#define GST_PAD_REALIZE(pad) (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad))
+#define GST_PAD_REALIZE(pad)   (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad))
 #define GST_PAD_DIRECTION(pad)         GST_RPAD_DIRECTION(GST_PAD_REALIZE(pad))
 #define GST_PAD_CAPS(pad)              GST_RPAD_CAPS(GST_PAD_REALIZE(pad))
 #define GST_PAD_PEER(pad)              GST_RPAD_PEER(GST_PAD_REALIZE(pad))
index 08cc5f8..6a25133 100644 (file)
@@ -81,6 +81,7 @@ static void                   gst_queue_chain                 (GstPad *pad, GstBuffer *buf);
 static GstBuffer *             gst_queue_get                   (GstPad *pad);
 static GstBufferPool*          gst_queue_get_bufferpool        (GstPad *pad);
        
+static void                    gst_queue_locked_flush                  (GstQueue *queue);
 static void                    gst_queue_flush                 (GstQueue *queue);
 
 static GstElementStateReturn   gst_queue_change_state          (GstElement *element);
@@ -180,9 +181,12 @@ gst_queue_init (GstQueue *queue)
   queue->size_bytes = 100 * 1024;      // 100KB
   queue->size_time = 1000000000LL;     // 1sec
 
-  queue->emptycond = g_cond_new ();
-  queue->fullcond = g_cond_new ();
-  GST_DEBUG(GST_CAT_THREAD, "initialized queue's emptycond and fullcond\n");
+  queue->qlock = g_mutex_new ();
+  queue->reader = FALSE;
+  queue->writer = FALSE;
+  queue->not_empty = g_cond_new ();
+  queue->not_full = g_cond_new ();
+  GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
 }
 
 static GstBufferPool*
@@ -215,41 +219,18 @@ gst_queue_handle_negotiate_sink (GstPad *pad, GstCaps **caps, gpointer *data)
   return gst_pad_negotiate_proxy (pad, queue->srcpad, caps);
 }
 
-static gboolean
-gst_queue_handle_event (GstPad *pad)
-{
-  GstQueue *queue;
-
-  queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
-
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s received event\n", GST_ELEMENT_NAME (queue));
-
-  GST_LOCK (queue);
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s has %d buffers left\n", GST_ELEMENT_NAME (queue),
-                 queue->level_buffers);
-
-  GST_FLAG_SET (pad, GST_PAD_EOS);
-
-  g_cond_signal (queue->emptycond);
-
-  GST_UNLOCK (queue);
-
-  return TRUE;
-}
-
 static void
 gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
 {
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s cleaning buffer %p\n", (gchar *)user_data, data);
-
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data);
   gst_buffer_unref (GST_BUFFER (data));
 }
 
 static void
-gst_queue_flush (GstQueue *queue)
+gst_queue_locked_flush (GstQueue *queue)
 {
   g_slist_foreach (queue->queue, gst_queue_cleanup_buffers,
-                 (char *) GST_ELEMENT_NAME (queue));
+                 (gpointer) queue);
   g_slist_free (queue->queue);
 
   queue->queue = NULL;
@@ -258,39 +239,62 @@ gst_queue_flush (GstQueue *queue)
 }
 
 static void
+gst_queue_flush (GstQueue *queue)
+{
+  g_mutex_lock (queue->qlock);
+  gst_queue_locked_flush (queue);
+  g_mutex_unlock (queue->qlock);
+}
+
+static void
 gst_queue_chain (GstPad *pad, GstBuffer *buf)
 {
   GstQueue *queue;
-  const guchar *name;
+  gboolean reader;
 
   g_return_if_fail (pad != NULL);
   g_return_if_fail (GST_IS_PAD (pad));
   g_return_if_fail (buf != NULL);
 
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
-  name = GST_ELEMENT_NAME (queue);
 
-  /* we have to lock the queue since we span threads */
+  reader = FALSE;
 
-//  GST_DEBUG (GST_CAT_DATAFLOW,"trying to get lock on queue \"%s\"\n",name);
-  GST_LOCK (queue);
+  /* we have to lock the queue since we span threads */
 
-  if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) {
-    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "buffer has FLUSH bit set, flushing queue\n");
-    gst_queue_flush (queue);
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
+  g_mutex_lock (queue->qlock);
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ());
+
+  if (GST_IS_EVENT(buf)) {
+    GstEvent *event = GST_EVENT(buf);
+    switch (GST_EVENT_TYPE(event)) {
+      case GST_EVENT_FLUSH:
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "flushing queue\n");
+        gst_queue_locked_flush (queue);
+        break;
+      default:
+        break;
+    }
   }
 
   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf));
 
-  if (queue->level_buffers >= queue->size_buffers) {
+  if (queue->level_buffers == queue->size_buffers) {
     // if this is a leaky queue...
     if (queue->leaky) {
+      // FIXME don't want to leak events!
       // if we leak on the upstream side, drop the current buffer
       if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
+        if (GST_IS_EVENT (buf))
+          fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
+              GST_ELEMENT_NAME(GST_ELEMENT(queue)),
+              GST_EVENT_TYPE(GST_EVENT(buf)));
+          GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
         gst_buffer_unref(buf);
         // now we have to clean up and exit right away
-        GST_UNLOCK (queue);
+        g_mutex_unlock (queue->qlock);
         return;
       }
       // otherwise we have to push a buffer off the other end
@@ -300,6 +304,10 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
         front = queue->queue;
         leakbuf = (GstBuffer *)(front->data);
+        if (GST_IS_EVENT (leakbuf))
+          fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
+              GST_ELEMENT_NAME(GST_ELEMENT(queue)),
+              GST_EVENT_TYPE(GST_EVENT(leakbuf)));
         queue->level_buffers--;
         queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
         gst_buffer_unref(leakbuf);
@@ -308,7 +316,9 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
       }
     }
 
-    while (queue->level_buffers >= queue->size_buffers) {
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n",
+        queue->level_buffers, queue->size_buffers);
+    while (queue->level_buffers == queue->size_buffers) {
       // if there's a pending state change for this queue or its manager, switch
       // back to iterator so bottom half of state change executes
       if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING ||
@@ -316,36 +326,45 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
 GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) != 
 GST_STATE_VOID_PENDING)
       {
-        GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
         if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)
-          GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
+          GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
         if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING)
-          GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
-        GST_UNLOCK(queue);
+          GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
+        g_mutex_unlock (queue->qlock);
         cothread_switch(cothread_current_main());
       }
 
-      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for space, level is %d\n", queue->level_buffers);
-      g_cond_signal (queue->emptycond);
-      g_cond_wait (queue->fullcond, GST_OBJECT(queue)->lock);
-      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "done waiting, level is now %d\n", queue->level_buffers);
+      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+      if (queue->writer)
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n");
+      queue->writer = TRUE;
+      g_cond_wait (queue->not_full, queue->qlock);
+      queue->writer = FALSE;
+      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n");
     }
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n",
+        queue->level_buffers, queue->size_buffers);
   }
 
   /* put the buffer on the tail of the list */
   queue->queue = g_slist_append (queue->queue, buf);
   queue->level_buffers++;
   queue->level_bytes += GST_BUFFER_SIZE(buf);
-//  GST_DEBUG (GST_CAT_DATAFLOW, "(%s:%s)+\n",GST_DEBUG_PAD_NAME(pad));
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n",
+      GST_DEBUG_PAD_NAME(pad),
+      queue->level_buffers, queue->size_buffers);
+
+  /* reader waiting on an empty queue */
+  reader = queue->reader;
 
-  /* if we were empty, but aren't any more, signal a condition */
-  if (queue->level_buffers == 1)
+  g_mutex_unlock (queue->qlock);
+
+  if (reader)
   {
-    GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling emptycond\n", name);
-    g_cond_signal (queue->emptycond);
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n");
+    g_cond_signal (queue->not_empty);
   }
-
-  GST_UNLOCK (queue);
 }
 
 static GstBuffer *
@@ -354,7 +373,7 @@ gst_queue_get (GstPad *pad)
   GstQueue *queue;
   GstBuffer *buf = NULL;
   GSList *front;
-  const guchar *name;
+  gboolean writer;
 
   g_assert(pad != NULL);
   g_assert(GST_IS_PAD(pad));
@@ -362,22 +381,16 @@ gst_queue_get (GstPad *pad)
   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
 
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
-  name = GST_ELEMENT_NAME (queue);
+
+  writer = FALSE;
 
   /* have to lock for thread-safety */
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s try have queue lock\n", name);
-  GST_LOCK (queue);
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s have queue lock\n", name);
-
-  while (!queue->level_buffers) {
-    if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) {
-      GST_DEBUG (GST_CAT_DATAFLOW, "%s U released lock\n", name);
-      GST_UNLOCK(queue);
-      // this return NULL shouldn't hurt anything...
-      return NULL;
-    }
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
+  g_mutex_lock (queue->qlock);
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty);
 
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+  while (queue->level_buffers == 0) {
     // if there's a pending state change for this queue or its manager, switch
     // back to iterator so bottom half of state change executes
     if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING ||
@@ -385,37 +398,60 @@ gst_queue_get (GstPad *pad)
 GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) != 
 GST_STATE_VOID_PENDING)
     {
-      GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
+      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
       if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)
-        GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
       if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING)
-        GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
-      GST_UNLOCK(queue);
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
+      g_mutex_unlock (queue->qlock);
       cothread_switch(cothread_current_main());
     }
 
-    g_cond_signal (queue->fullcond);
-    g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock);
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+    if (queue->reader)
+      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n");
+    queue->reader = TRUE;
+    g_cond_wait (queue->not_empty, queue->qlock);
+    queue->reader = FALSE;
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n");
   }
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
 
   front = queue->queue;
   buf = (GstBuffer *)(front->data);
-  GST_DEBUG (GST_CAT_DATAFLOW,"retrieved buffer %p from queue\n",buf);
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf);
   queue->queue = g_slist_remove_link (queue->queue, front);
   g_slist_free (front);
 
-//  if (queue->level_buffers < queue->size_buffers)
-  if (queue->level_buffers == queue->size_buffers)
-  {
-    GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling fullcond\n", name);
-    g_cond_signal (queue->fullcond);
-  }
-
   queue->level_buffers--;
   queue->level_bytes -= GST_BUFFER_SIZE(buf);
-  GST_DEBUG (GST_CAT_DATAFLOW,"(%s:%s)- ",GST_DEBUG_PAD_NAME(pad));
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n",
+      GST_DEBUG_PAD_NAME(pad),
+      queue->level_buffers, queue->size_buffers);
+
+  /* writer waiting on a full queue */
+  writer = queue->writer;
+
+  g_mutex_unlock (queue->qlock);
 
-  GST_UNLOCK(queue);
+  if (writer)
+  {
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n");
+    g_cond_signal (queue->not_full);
+  }
+
+  // FIXME where should this be? locked?
+  if (GST_IS_EVENT(buf)) {
+    GstEvent *event = GST_EVENT(buf);
+    switch (GST_EVENT_TYPE(event)) {
+      case GST_EVENT_EOS:
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue eos\n");
+        gst_element_set_state (GST_ELEMENT (queue), GST_STATE_PAUSED);
+        break;
+      default:
+        break;
+    }
+  }
 
   return buf;
 }
index 02b73f2..fb091e1 100644 (file)
@@ -75,9 +75,12 @@ struct _GstQueue {
 
   gint leaky;          /* whether the queue is leaky, and if so at which end */
 
-//  GMutex *lock;      (optimization?)
-  GCond *emptycond;
-  GCond *fullcond;
+  GMutex *qlock;       /* lock for queue (vs object lock) */
+  /* we are single reader and single writer queue */
+  gboolean reader;     /* reader waiting on empty queue */
+  gboolean writer;     /* writer waiting on full queue */
+  GCond *not_empty;    /* signals buffers now available for reading */
+  GCond *not_full;     /* signals space now available for writing */
 
   GTimeVal *timeval;   /* the timeout for the queue locking */
 };
index 1c10f13..4b69e11 100644 (file)
@@ -1490,6 +1490,7 @@ GST_DEBUG(GST_CAT_SCHEDULING,"there are %d elements in this chain\n",chain->num_
 
         } else {
           GST_INFO (GST_CAT_DATAFLOW,"NO ENTRY INTO CHAIN!");
+gst_schedule_show(sched);
          //eos = TRUE;
         }
       } else {
index 608e1f2..65a3793 100644 (file)
@@ -382,10 +382,10 @@ gst_thread_change_state (GstElement *element)
             //
             //FIXME also make this more efficient by keeping list of managed queues
             THR_DEBUG("waking queue \"%s\"\n",GST_ELEMENT_NAME(e));
-            GST_LOCK(e);
-            g_cond_signal((GST_QUEUE(e)->emptycond));
-            g_cond_signal((GST_QUEUE(e)->fullcond));
-            GST_UNLOCK(e);
+            //GST_LOCK(e);
+            g_cond_signal((GST_QUEUE(e)->not_empty));
+            g_cond_signal((GST_QUEUE(e)->not_full));
+            //GST_UNLOCK(e);
           }
           else
           {
@@ -417,10 +417,10 @@ gst_thread_change_state (GstElement *element)
               if (GST_ELEMENT_SCHED(peerelement) != GST_ELEMENT_SCHED(thread))
               {
                 THR_DEBUG("  element \"%s\" has pad cross sched boundary\n",GST_ELEMENT_NAME(e));
-                GST_LOCK(peerelement);
-                g_cond_signal(GST_QUEUE(peerelement)->emptycond);
-                g_cond_signal(GST_QUEUE(peerelement)->fullcond);
-                GST_UNLOCK(peerelement);
+                //GST_LOCK(peerelement);
+                g_cond_signal(GST_QUEUE(peerelement)->not_empty);
+                g_cond_signal(GST_QUEUE(peerelement)->not_full);
+                //GST_UNLOCK(peerelement);
               }
             }
           }
index bed56e4..8dece3e 100644 (file)
@@ -679,8 +679,10 @@ gst_fakesrc_get(GstPad *pad)
     g_print("fakesrc: get      ******* (%s:%s)> (%d bytes, %llu) \n",
                GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
 
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, src, "pre handoff emit\n");
   g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
                    buf, pad);
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, src, "post handoff emit\n");
 
   return buf;
 }
index 08cc5f8..6a25133 100644 (file)
@@ -81,6 +81,7 @@ static void                   gst_queue_chain                 (GstPad *pad, GstBuffer *buf);
 static GstBuffer *             gst_queue_get                   (GstPad *pad);
 static GstBufferPool*          gst_queue_get_bufferpool        (GstPad *pad);
        
+static void                    gst_queue_locked_flush                  (GstQueue *queue);
 static void                    gst_queue_flush                 (GstQueue *queue);
 
 static GstElementStateReturn   gst_queue_change_state          (GstElement *element);
@@ -180,9 +181,12 @@ gst_queue_init (GstQueue *queue)
   queue->size_bytes = 100 * 1024;      // 100KB
   queue->size_time = 1000000000LL;     // 1sec
 
-  queue->emptycond = g_cond_new ();
-  queue->fullcond = g_cond_new ();
-  GST_DEBUG(GST_CAT_THREAD, "initialized queue's emptycond and fullcond\n");
+  queue->qlock = g_mutex_new ();
+  queue->reader = FALSE;
+  queue->writer = FALSE;
+  queue->not_empty = g_cond_new ();
+  queue->not_full = g_cond_new ();
+  GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
 }
 
 static GstBufferPool*
@@ -215,41 +219,18 @@ gst_queue_handle_negotiate_sink (GstPad *pad, GstCaps **caps, gpointer *data)
   return gst_pad_negotiate_proxy (pad, queue->srcpad, caps);
 }
 
-static gboolean
-gst_queue_handle_event (GstPad *pad)
-{
-  GstQueue *queue;
-
-  queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
-
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s received event\n", GST_ELEMENT_NAME (queue));
-
-  GST_LOCK (queue);
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s has %d buffers left\n", GST_ELEMENT_NAME (queue),
-                 queue->level_buffers);
-
-  GST_FLAG_SET (pad, GST_PAD_EOS);
-
-  g_cond_signal (queue->emptycond);
-
-  GST_UNLOCK (queue);
-
-  return TRUE;
-}
-
 static void
 gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
 {
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s cleaning buffer %p\n", (gchar *)user_data, data);
-
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data);
   gst_buffer_unref (GST_BUFFER (data));
 }
 
 static void
-gst_queue_flush (GstQueue *queue)
+gst_queue_locked_flush (GstQueue *queue)
 {
   g_slist_foreach (queue->queue, gst_queue_cleanup_buffers,
-                 (char *) GST_ELEMENT_NAME (queue));
+                 (gpointer) queue);
   g_slist_free (queue->queue);
 
   queue->queue = NULL;
@@ -258,39 +239,62 @@ gst_queue_flush (GstQueue *queue)
 }
 
 static void
+gst_queue_flush (GstQueue *queue)
+{
+  g_mutex_lock (queue->qlock);
+  gst_queue_locked_flush (queue);
+  g_mutex_unlock (queue->qlock);
+}
+
+static void
 gst_queue_chain (GstPad *pad, GstBuffer *buf)
 {
   GstQueue *queue;
-  const guchar *name;
+  gboolean reader;
 
   g_return_if_fail (pad != NULL);
   g_return_if_fail (GST_IS_PAD (pad));
   g_return_if_fail (buf != NULL);
 
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
-  name = GST_ELEMENT_NAME (queue);
 
-  /* we have to lock the queue since we span threads */
+  reader = FALSE;
 
-//  GST_DEBUG (GST_CAT_DATAFLOW,"trying to get lock on queue \"%s\"\n",name);
-  GST_LOCK (queue);
+  /* we have to lock the queue since we span threads */
 
-  if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) {
-    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "buffer has FLUSH bit set, flushing queue\n");
-    gst_queue_flush (queue);
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
+  g_mutex_lock (queue->qlock);
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ());
+
+  if (GST_IS_EVENT(buf)) {
+    GstEvent *event = GST_EVENT(buf);
+    switch (GST_EVENT_TYPE(event)) {
+      case GST_EVENT_FLUSH:
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "flushing queue\n");
+        gst_queue_locked_flush (queue);
+        break;
+      default:
+        break;
+    }
   }
 
   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf));
 
-  if (queue->level_buffers >= queue->size_buffers) {
+  if (queue->level_buffers == queue->size_buffers) {
     // if this is a leaky queue...
     if (queue->leaky) {
+      // FIXME don't want to leak events!
       // if we leak on the upstream side, drop the current buffer
       if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
+        if (GST_IS_EVENT (buf))
+          fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
+              GST_ELEMENT_NAME(GST_ELEMENT(queue)),
+              GST_EVENT_TYPE(GST_EVENT(buf)));
+          GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
         gst_buffer_unref(buf);
         // now we have to clean up and exit right away
-        GST_UNLOCK (queue);
+        g_mutex_unlock (queue->qlock);
         return;
       }
       // otherwise we have to push a buffer off the other end
@@ -300,6 +304,10 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
         front = queue->queue;
         leakbuf = (GstBuffer *)(front->data);
+        if (GST_IS_EVENT (leakbuf))
+          fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
+              GST_ELEMENT_NAME(GST_ELEMENT(queue)),
+              GST_EVENT_TYPE(GST_EVENT(leakbuf)));
         queue->level_buffers--;
         queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
         gst_buffer_unref(leakbuf);
@@ -308,7 +316,9 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
       }
     }
 
-    while (queue->level_buffers >= queue->size_buffers) {
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n",
+        queue->level_buffers, queue->size_buffers);
+    while (queue->level_buffers == queue->size_buffers) {
       // if there's a pending state change for this queue or its manager, switch
       // back to iterator so bottom half of state change executes
       if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING ||
@@ -316,36 +326,45 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
 GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) != 
 GST_STATE_VOID_PENDING)
       {
-        GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
         if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)
-          GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
+          GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
         if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING)
-          GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
-        GST_UNLOCK(queue);
+          GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
+        g_mutex_unlock (queue->qlock);
         cothread_switch(cothread_current_main());
       }
 
-      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for space, level is %d\n", queue->level_buffers);
-      g_cond_signal (queue->emptycond);
-      g_cond_wait (queue->fullcond, GST_OBJECT(queue)->lock);
-      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "done waiting, level is now %d\n", queue->level_buffers);
+      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+      if (queue->writer)
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n");
+      queue->writer = TRUE;
+      g_cond_wait (queue->not_full, queue->qlock);
+      queue->writer = FALSE;
+      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n");
     }
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n",
+        queue->level_buffers, queue->size_buffers);
   }
 
   /* put the buffer on the tail of the list */
   queue->queue = g_slist_append (queue->queue, buf);
   queue->level_buffers++;
   queue->level_bytes += GST_BUFFER_SIZE(buf);
-//  GST_DEBUG (GST_CAT_DATAFLOW, "(%s:%s)+\n",GST_DEBUG_PAD_NAME(pad));
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n",
+      GST_DEBUG_PAD_NAME(pad),
+      queue->level_buffers, queue->size_buffers);
+
+  /* reader waiting on an empty queue */
+  reader = queue->reader;
 
-  /* if we were empty, but aren't any more, signal a condition */
-  if (queue->level_buffers == 1)
+  g_mutex_unlock (queue->qlock);
+
+  if (reader)
   {
-    GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling emptycond\n", name);
-    g_cond_signal (queue->emptycond);
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n");
+    g_cond_signal (queue->not_empty);
   }
-
-  GST_UNLOCK (queue);
 }
 
 static GstBuffer *
@@ -354,7 +373,7 @@ gst_queue_get (GstPad *pad)
   GstQueue *queue;
   GstBuffer *buf = NULL;
   GSList *front;
-  const guchar *name;
+  gboolean writer;
 
   g_assert(pad != NULL);
   g_assert(GST_IS_PAD(pad));
@@ -362,22 +381,16 @@ gst_queue_get (GstPad *pad)
   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
 
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
-  name = GST_ELEMENT_NAME (queue);
+
+  writer = FALSE;
 
   /* have to lock for thread-safety */
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s try have queue lock\n", name);
-  GST_LOCK (queue);
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
-  GST_DEBUG (GST_CAT_DATAFLOW,"%s have queue lock\n", name);
-
-  while (!queue->level_buffers) {
-    if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) {
-      GST_DEBUG (GST_CAT_DATAFLOW, "%s U released lock\n", name);
-      GST_UNLOCK(queue);
-      // this return NULL shouldn't hurt anything...
-      return NULL;
-    }
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
+  g_mutex_lock (queue->qlock);
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty);
 
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+  while (queue->level_buffers == 0) {
     // if there's a pending state change for this queue or its manager, switch
     // back to iterator so bottom half of state change executes
     if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING ||
@@ -385,37 +398,60 @@ gst_queue_get (GstPad *pad)
 GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) != 
 GST_STATE_VOID_PENDING)
     {
-      GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
+      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
       if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)
-        GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
       if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING)
-        GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
-      GST_UNLOCK(queue);
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
+      g_mutex_unlock (queue->qlock);
       cothread_switch(cothread_current_main());
     }
 
-    g_cond_signal (queue->fullcond);
-    g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock);
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
+    if (queue->reader)
+      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n");
+    queue->reader = TRUE;
+    g_cond_wait (queue->not_empty, queue->qlock);
+    queue->reader = FALSE;
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n");
   }
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
 
   front = queue->queue;
   buf = (GstBuffer *)(front->data);
-  GST_DEBUG (GST_CAT_DATAFLOW,"retrieved buffer %p from queue\n",buf);
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf);
   queue->queue = g_slist_remove_link (queue->queue, front);
   g_slist_free (front);
 
-//  if (queue->level_buffers < queue->size_buffers)
-  if (queue->level_buffers == queue->size_buffers)
-  {
-    GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling fullcond\n", name);
-    g_cond_signal (queue->fullcond);
-  }
-
   queue->level_buffers--;
   queue->level_bytes -= GST_BUFFER_SIZE(buf);
-  GST_DEBUG (GST_CAT_DATAFLOW,"(%s:%s)- ",GST_DEBUG_PAD_NAME(pad));
+  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n",
+      GST_DEBUG_PAD_NAME(pad),
+      queue->level_buffers, queue->size_buffers);
+
+  /* writer waiting on a full queue */
+  writer = queue->writer;
+
+  g_mutex_unlock (queue->qlock);
 
-  GST_UNLOCK(queue);
+  if (writer)
+  {
+    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n");
+    g_cond_signal (queue->not_full);
+  }
+
+  // FIXME where should this be? locked?
+  if (GST_IS_EVENT(buf)) {
+    GstEvent *event = GST_EVENT(buf);
+    switch (GST_EVENT_TYPE(event)) {
+      case GST_EVENT_EOS:
+        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue eos\n");
+        gst_element_set_state (GST_ELEMENT (queue), GST_STATE_PAUSED);
+        break;
+      default:
+        break;
+    }
+  }
 
   return buf;
 }
index 02b73f2..fb091e1 100644 (file)
@@ -75,9 +75,12 @@ struct _GstQueue {
 
   gint leaky;          /* whether the queue is leaky, and if so at which end */
 
-//  GMutex *lock;      (optimization?)
-  GCond *emptycond;
-  GCond *fullcond;
+  GMutex *qlock;       /* lock for queue (vs object lock) */
+  /* we are single reader and single writer queue */
+  gboolean reader;     /* reader waiting on empty queue */
+  gboolean writer;     /* writer waiting on full queue */
+  GCond *not_empty;    /* signals buffers now available for reading */
+  GCond *not_full;     /* signals space now available for writing */
 
   GTimeVal *timeval;   /* the timeout for the queue locking */
 };
index 398c119..fbfdbd9 100644 (file)
@@ -37,7 +37,6 @@ GstElementDetails gst_statistics_details = {
 /* Statistics signals and args */
 enum {
   SIGNAL_UPDATE,
-  /* FILL ME */
   LAST_SIGNAL
 };
 
@@ -66,7 +65,7 @@ static void gst_statistics_reset              (GstStatistics *statistics);
 static void gst_statistics_print               (GstStatistics *statistics);
 
 static GstElementClass *parent_class = NULL;
-static guint gst_statistics_signals[LAST_SIGNAL] = { 0 };
+static guint gst_statistics_signals[LAST_SIGNAL] = { 0, };
 
 static stats zero_stats = { 0, };
 
@@ -131,7 +130,7 @@ gst_statistics_class_init (GstStatisticsClass *klass)
   gst_statistics_signals[SIGNAL_UPDATE] =
     g_signal_new ("update", G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST,
                    G_STRUCT_OFFSET (GstStatisticsClass, update), NULL, NULL,
-                   g_cclosure_marshal_VOID__POINTER, G_TYPE_NONE, 0);
+                   g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
 
   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_statistics_set_property);
   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_statistics_get_property);
@@ -283,10 +282,12 @@ gst_statistics_chain (GstPad *pad, GstBuffer *buf)
 
   if (GST_IS_EVENT(buf)) {
     GstEvent *event = GST_EVENT (buf);
-    gst_element_set_state (GST_ELEMENT (statistics), GST_STATE_PAUSED);
     statistics->stats.events += 1;
-    if (statistics->update_on_eos && (GST_EVENT_TYPE(event) == GST_EVENT_EOS)) {
-      update = TRUE;
+    if (GST_EVENT_TYPE(event) == GST_EVENT_EOS) {
+      gst_element_set_state (GST_ELEMENT (statistics), GST_STATE_PAUSED);
+      if (statistics->update_on_eos) {
+        update = TRUE;
+      }
     }
     if (statistics->update_freq.events) {
       statistics->update_count.events += 1;
@@ -317,7 +318,9 @@ gst_statistics_chain (GstPad *pad, GstBuffer *buf)
 
   if (update) {
     if (statistics->update) {
+      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, statistics, "pre update emit\n");
       g_signal_emit (G_OBJECT (statistics), gst_statistics_signals[SIGNAL_UPDATE], 0);
+      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, statistics, "post update emit\n");
     }
     if (!statistics->silent) {
       gst_statistics_print(statistics);