plugins/elements/gstmultiqueue.*: revert extra-size-buffers stuff, caused some race...
authorThijs Vermeir <thijsvermeir@gmail.com>
Thu, 26 Jun 2008 20:27:00 +0000 (20:27 +0000)
committerThijs Vermeir <thijsvermeir@gmail.com>
Thu, 26 Jun 2008 20:27:00 +0000 (20:27 +0000)
Original commit message from CVS:
* plugins/elements/gstmultiqueue.c:
* plugins/elements/gstmultiqueue.h:
revert extra-size-buffers stuff, caused some race conditions
and extra-size-buffers is not used anymore. Docs needs some updates

ChangeLog
plugins/elements/gstmultiqueue.c
plugins/elements/gstmultiqueue.h

index 33b968a..cea8f2d 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,10 @@
+2008-06-26  Thijs Vermeir  <thijsvermeir@gmail.com>
+
+       * plugins/elements/gstmultiqueue.c:
+       * plugins/elements/gstmultiqueue.h:
+       revert extra-size-buffers stuff, caused some race conditions
+       and extra-size-buffers is not used anymore. Docs needs some updates
+
 2008-06-26  Tim-Philipp Müller  <tim.muller at collabora co uk>
 
        * win32/common/config.h:
index 1e0b9ba..291f136 100644 (file)
@@ -2,7 +2,6 @@
  * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
  * Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
  * Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
- * Copyright (C) 2008 Thijs Vermeir <thijsvermeir@gmail.com>
  *
  * gstmultiqueue.c:
  *
@@ -219,11 +218,11 @@ enum
 };
 
 #define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
-  g_mutex_lock (q->qlock);                                                    \
+  g_mutex_lock (q->qlock);                                              \
 } G_STMT_END
 
 #define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                        \
-  g_mutex_unlock (q->qlock);                                                  \
+  g_mutex_unlock (q->qlock);                                            \
 } G_STMT_END
 
 static void gst_multi_queue_finalize (GObject * object);
@@ -253,8 +252,7 @@ gst_multi_queue_base_init (gpointer g_class)
 
   gst_element_class_set_details_simple (gstelement_class,
       "MultiQueue",
-      "Generic", "Multiple data queue", "Edward Hervey <edward@fluendo.com>\n"
-      "Thijs Vermeir <thijsvermeir@gmail.com>");
+      "Generic", "Multiple data queue", "Edward Hervey <edward@fluendo.com>");
   gst_element_class_add_pad_template (gstelement_class,
       gst_static_pad_template_get (&sinktemplate));
   gst_element_class_add_pad_template (gstelement_class,
@@ -381,14 +379,13 @@ gst_multi_queue_finalize (GObject * object)
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
-#define SET_CHILD_PROPERTY(mq,format) G_STMT_START {            \
-    GList * tmp = mq->queues;                                   \
-    while (tmp) {                                               \
-      GstSingleQueue *q = (GstSingleQueue*)tmp->data;           \
+#define SET_CHILD_PROPERTY(mq,format) G_STMT_START {           \
+    GList * tmp = mq->queues;                                  \
+    while (tmp) {                                              \
+      GstSingleQueue *q = (GstSingleQueue*)tmp->data;          \
       q->max_size.format = mq->max_size.format;                 \
-      q->extra_size.format = mq->extra_size.format;             \
-      tmp = g_list_next(tmp);                                   \
-    };                                                          \
+      tmp = g_list_next(tmp);                                  \
+    };                                                         \
 } G_STMT_END
 
 static void
@@ -397,37 +394,38 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
 {
   GstMultiQueue *mq = GST_MULTI_QUEUE (object);
 
-  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
   switch (prop_id) {
     case ARG_MAX_SIZE_BYTES:
+      GST_MULTI_QUEUE_MUTEX_LOCK (mq);
       mq->max_size.bytes = g_value_get_uint (value);
       SET_CHILD_PROPERTY (mq, bytes);
+      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
       break;
     case ARG_MAX_SIZE_BUFFERS:
+      GST_MULTI_QUEUE_MUTEX_LOCK (mq);
       mq->max_size.visible = g_value_get_uint (value);
       SET_CHILD_PROPERTY (mq, visible);
+      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
       break;
     case ARG_MAX_SIZE_TIME:
+      GST_MULTI_QUEUE_MUTEX_LOCK (mq);
       mq->max_size.time = g_value_get_uint64 (value);
       SET_CHILD_PROPERTY (mq, time);
+      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
       break;
     case ARG_EXTRA_SIZE_BYTES:
       mq->extra_size.bytes = g_value_get_uint (value);
-      SET_CHILD_PROPERTY (mq, bytes);
       break;
     case ARG_EXTRA_SIZE_BUFFERS:
       mq->extra_size.visible = g_value_get_uint (value);
-      SET_CHILD_PROPERTY (mq, visible);
       break;
     case ARG_EXTRA_SIZE_TIME:
       mq->extra_size.time = g_value_get_uint64 (value);
-      SET_CHILD_PROPERTY (mq, time);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
   }
-  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
 }
 
 static void
@@ -503,6 +501,7 @@ no_parent:
   }
 }
 
+
 /*
  * GstElement methods
  */
@@ -601,7 +600,6 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
     sq->srcresult = GST_FLOW_OK;
     sq->cur_time = 0;
     sq->max_size.visible = mq->max_size.visible;
-    sq->extra_size.visible = mq->extra_size.visible;
     sq->is_eos = FALSE;
     sq->inextra = FALSE;
     sq->nextid = 0;
@@ -1272,67 +1270,70 @@ compute_high_id (GstMultiQueue * mq)
 #define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \
      (sq->max_size.format) <= (value))
 
+/*
+ * GstSingleQueue functions
+ */
 static void
-single_queue_overrun_cb_unlocked (GstDataQueue * dq, GstSingleQueue * sq)
+single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
 {
   GstMultiQueue *mq = sq->mqueue;
   GList *tmp;
   GstDataQueueSize size;
+  gboolean filled = FALSE;
 
   gst_data_queue_get_level (sq->queue, &size);
-  mq->filled = FALSE;
 
-  /* if we have reached max visible we can maybe bump this
-   * if another queue is empty, skip this if we can't grow anymore
-   */
-  if (IS_FILLED (visible, size.visible) && sq->extra_size.visible > 0) {
-    for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
-      GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
+  GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id);
 
-      if (ssq == sq)
-        continue;
+  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+  for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
+    GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
+    GstDataQueueSize ssize;
+
+    GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id);
 
-      if (gst_data_queue_is_empty (ssq->queue)) {
-        sq->max_size.visible++;
-        sq->extra_size.visible--;
+    if (gst_data_queue_is_empty (ssq->queue)) {
+      GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id);
+      if (IS_FILLED (visible, size.visible)) {
+        sq->max_size.visible = size.visible + 1;
         GST_DEBUG_OBJECT (mq,
-            "queue %d is empty, bumping single queue %d max visible to %d",
-            ssq->id, sq->id, sq->max_size.visible);
-        break;
+            "Another queue is empty, bumping single queue %d max visible to %d",
+            sq->id, sq->max_size.visible);
       }
+      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+      goto beach;
     }
-    /* check if the queue is still full */
-    mq->filled = gst_data_queue_is_full (sq->queue);
-  }
-}
-
-/*
- * GstSingleQueue functions
- */
-static void
-single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
-{
-  GstMultiQueue *mq = sq->mqueue;
-
-  GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id);
-
-  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
-
-  single_queue_overrun_cb_unlocked (dq, sq);
+    /* check if we reached the hard time/bytes limits */
+    gst_data_queue_get_level (ssq->queue, &ssize);
 
+    GST_DEBUG_OBJECT (mq,
+        "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
+        G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible,
+        ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
+
+    /* if this queue is filled completely we must signal overrun */
+    if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) {
+      GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id);
+      filled = TRUE;
+    }
+  }
+  /* no queues were empty */
   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
 
-  if (mq->filled) {
+  /* Overrun is always forwarded, since this is blocking the upstream element */
+  if (filled) {
     GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun");
     g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
-    mq->filled = FALSE;
   }
+
+beach:
+  return;
 }
 
 static void
 single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
 {
-  gboolean all_empty = TRUE;
+  gboolean empty = TRUE;
   GstMultiQueue *mq = sq->mqueue;
   GList *tmp;
 
@@ -1341,36 +1342,29 @@ single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
 
   GST_MULTI_QUEUE_MUTEX_LOCK (mq);
   for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
-    GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
+    GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
 
-    if (sq == ssq)
-      continue;
+    if (gst_data_queue_is_full (sq->queue)) {
+      GstDataQueueSize size;
 
-    /* prevent data starvation */
-    if (gst_data_queue_is_full (ssq->queue)) {
-      single_queue_overrun_cb_unlocked (dq, ssq);
-      goto check_filled;
-    }
-
-    if (!gst_data_queue_is_empty (ssq->queue)) {
-      all_empty = FALSE;
-      break;
+      gst_data_queue_get_level (sq->queue, &size);
+      if (IS_FILLED (visible, size.visible)) {
+        sq->max_size.visible = size.visible + 1;
+        GST_DEBUG_OBJECT (mq,
+            "queue %d is filled, bumping its max visible to %d", sq->id,
+            sq->max_size.visible);
+        gst_data_queue_limits_changed (sq->queue);
+      }
     }
+    if (!gst_data_queue_is_empty (sq->queue))
+      empty = FALSE;
   }
   GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
 
-  if (all_empty) {
+  if (empty) {
     GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
     g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
   }
-  return;
-
-check_filled:
-  if (mq->filled) {
-    GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun");
-    g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
-    mq->filled = FALSE;
-  }
 }
 
 static gboolean
@@ -1448,7 +1442,7 @@ gst_single_queue_new (GstMultiQueue * mqueue)
   sq->oldid = 0;
   sq->turn = g_cond_new ();
 
-  /* attach to underrun/overrun signals to handle non-starvation */
+  /* attach to underrun/overrun signals to handle non-starvation  */
   g_signal_connect (G_OBJECT (sq->queue), "full",
       G_CALLBACK (single_queue_overrun_cb), sq);
   g_signal_connect (G_OBJECT (sq->queue), "empty",
index 1666b5d..4874563 100644 (file)
@@ -68,7 +68,6 @@ struct _GstMultiQueue {
   gint nextnotlinked;  /* ID of the next queue not linked (-1 : none) */
 
   gint numwaiting;     /* number of not-linked pads waiting */
-  gboolean filled; /* overrun detected */
 };
 
 struct _GstMultiQueueClass {