bufferpool: Refactor stopping of the pool
authorWim Taymans <wim.taymans@collabora.co.uk>
Mon, 21 Feb 2011 17:43:19 +0000 (18:43 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Wed, 2 Mar 2011 10:23:21 +0000 (11:23 +0100)
Move some methods around.
Make sure we check for config parsing errors.
Increment the outstanding buffers before calling acquire so that we can be sure
that set_active() doesn't free the pool from under us.

gst/gstbufferpool.c

index 7ab221a..8ac4cb6 100644 (file)
@@ -49,9 +49,9 @@
 struct _GstBufferPoolPrivate
 {
   GStaticRecMutex rec_lock;
+  guint size;
   guint min_buffers;
   guint max_buffers;
-  guint size;
   guint prefix;
   guint postfix;
   guint align;
@@ -149,6 +149,30 @@ gst_buffer_pool_new (void)
   return result;
 }
 
+static GstFlowReturn
+default_alloc_buffer (GstBufferPool * pool, GstBuffer ** buffer,
+    GstBufferPoolParams * params)
+{
+  guint size, align;
+  GstBufferPoolPrivate *priv = pool->priv;
+
+  *buffer = gst_buffer_new ();
+
+  align = priv->align - 1;
+  size = priv->prefix + priv->postfix + priv->size + align;
+  if (size > 0) {
+    guint8 *memptr;
+
+    memptr = g_malloc (size);
+    GST_BUFFER_MALLOCDATA (*buffer) = memptr;
+    memptr = (guint8 *) ((guintptr) (memptr + align) & ~align);
+    GST_BUFFER_DATA (*buffer) = memptr + priv->prefix;
+    GST_BUFFER_SIZE (*buffer) = priv->size;
+  }
+
+  return GST_FLOW_OK;
+}
+
 /* the default implementation for preallocating the buffers
  * in the pool */
 static gboolean
@@ -190,12 +214,34 @@ alloc_failed:
   }
 }
 
+/* must be called with the lock */
+static gboolean
+do_start (GstBufferPool * pool)
+{
+  if (!pool->started) {
+    GstBufferPoolClass *pclass;
+
+    pclass = GST_BUFFER_POOL_GET_CLASS (pool);
+
+    /* start the pool, subclasses should allocate buffers and put them
+     * in the queue */
+    if (G_LIKELY (pclass->start)) {
+      if (!pclass->start (pool))
+        return FALSE;
+    }
+    pool->started = TRUE;
+  }
+  return TRUE;
+}
+
+
 static void
 default_free_buffer (GstBufferPool * pool, GstBuffer * buffer)
 {
   gst_buffer_unref (buffer);
 }
 
+/* must be called with the lock */
 static gboolean
 default_stop (GstBufferPool * pool)
 {
@@ -214,26 +260,7 @@ default_stop (GstBufferPool * pool)
   return TRUE;
 }
 
-static gboolean
-do_start (GstBufferPool * pool)
-{
-
-  if (!pool->started) {
-    GstBufferPoolClass *pclass;
-
-    pclass = GST_BUFFER_POOL_GET_CLASS (pool);
-
-    /* start the pool, subclasses should allocate buffers and put them
-     * in the queue */
-    if (G_LIKELY (pclass->start)) {
-      if (!pclass->start (pool))
-        return FALSE;
-    }
-    pool->started = TRUE;
-  }
-  return TRUE;
-}
-
+/* must be called with the lock */
 static gboolean
 do_stop (GstBufferPool * pool)
 {
@@ -335,12 +362,28 @@ static gboolean
 default_set_config (GstBufferPool * pool, GstStructure * config)
 {
   GstBufferPoolPrivate *priv = pool->priv;
+  guint size, min_buffers, max_buffers;
+  guint prefix, postfix, align;
 
   /* parse the config and keep around */
-  gst_buffer_pool_config_get (config, &priv->size, &priv->min_buffers,
-      &priv->max_buffers, &priv->prefix, &priv->postfix, &priv->align);
+  if (!gst_buffer_pool_config_get (config, &size, &min_buffers,
+          &max_buffers, &prefix, &postfix, &align))
+    goto wrong_config;
+
+  priv->size = size;
+  priv->min_buffers = min_buffers;
+  priv->max_buffers = max_buffers;
+  priv->prefix = prefix;
+  priv->postfix = postfix;
+  priv->align = align;
 
   return TRUE;
+
+wrong_config:
+  {
+    GST_WARNING_OBJECT (pool, "invalid config");
+    return FALSE;
+  }
 }
 
 /**
@@ -493,30 +536,6 @@ gst_buffer_pool_config_get (GstStructure * config, guint * size,
 }
 
 static GstFlowReturn
-default_alloc_buffer (GstBufferPool * pool, GstBuffer ** buffer,
-    GstBufferPoolParams * params)
-{
-  guint size, align;
-  GstBufferPoolPrivate *priv = pool->priv;
-
-  *buffer = gst_buffer_new ();
-
-  align = priv->align - 1;
-  size = priv->prefix + priv->postfix + priv->size + align;
-  if (size > 0) {
-    guint8 *memptr;
-
-    memptr = g_malloc (size);
-    GST_BUFFER_MALLOCDATA (*buffer) = memptr;
-    memptr = (guint8 *) ((guintptr) (memptr + align) & ~align);
-    GST_BUFFER_DATA (*buffer) = memptr + priv->prefix;
-    GST_BUFFER_SIZE (*buffer) = priv->size;
-  }
-
-  return GST_FLOW_OK;
-}
-
-static GstFlowReturn
 default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
     GstBufferPoolParams * params)
 {
@@ -527,12 +546,12 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
   pclass = GST_BUFFER_POOL_GET_CLASS (pool);
 
   while (TRUE) {
-    if (g_atomic_int_get (&pool->flushing))
+    if (G_UNLIKELY (g_atomic_int_get (&pool->flushing)))
       return GST_FLOW_WRONG_STATE;
 
     /* try to get a buffer from the queue */
     *buffer = gst_atomic_queue_pop (pool->queue);
-    if (*buffer) {
+    if (G_LIKELY (*buffer)) {
       gst_poll_read_control (pool->poll);
       result = GST_FLOW_OK;
       break;
@@ -561,6 +580,24 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
   return result;
 }
 
+static inline void
+dec_outstanding (GstBufferPool * pool)
+{
+  if (g_atomic_int_dec_and_test (&pool->outstanding)) {
+    /* all buffers are returned to the pool, see if we need to free them */
+    if (g_atomic_int_get (&pool->flushing)) {
+      /* take the lock so that set_active is not run concurrently */
+      GST_BUFFER_POOL_LOCK (pool);
+      /* recheck the flushing state in the lock, the pool could have been
+       * set to active again */
+      if (g_atomic_int_get (&pool->flushing))
+        do_stop (pool);
+
+      GST_BUFFER_POOL_UNLOCK (pool);
+    }
+  }
+}
+
 /**
  * gst_buffer_pool_acquire_buffer:
  * @pool: a #GstBufferPool
@@ -587,13 +624,17 @@ gst_buffer_pool_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
 
   pclass = GST_BUFFER_POOL_GET_CLASS (pool);
 
+  /* assume we'll have one more outstanding buffer we need to do that so
+   * that concurrent set_active doesn't clear the buffers */
+  g_atomic_int_inc (&pool->outstanding);
+
   if (G_LIKELY (pclass->acquire_buffer))
     result = pclass->acquire_buffer (pool, buffer, params);
   else
     result = GST_FLOW_NOT_SUPPORTED;
 
-  if (G_LIKELY (result == GST_FLOW_OK && *buffer))
-    g_atomic_int_inc (&pool->outstanding);
+  if (G_UNLIKELY (result != GST_FLOW_OK))
+    dec_outstanding (pool);
 
   return result;
 }
@@ -630,25 +671,5 @@ gst_buffer_pool_release_buffer (GstBufferPool * pool, GstBuffer * buffer)
   if (G_LIKELY (pclass->release_buffer))
     pclass->release_buffer (pool, buffer);
 
-  if (G_UNLIKELY (g_atomic_int_get (&pool->flushing))) {
-    if (g_atomic_int_dec_and_test (&pool->outstanding)) {
-      /* take the lock so that set_active is not run concurrently */
-      GST_BUFFER_POOL_LOCK (pool);
-      /* recheck the flushing state in the lock, the pool could have been
-       * set to active again */
-      if (g_atomic_int_get (&pool->flushing)) {
-        if (!do_stop (pool))
-          goto stop_failed;
-      }
-      GST_BUFFER_POOL_UNLOCK (pool);
-    }
-  }
-  return;
-
-stop_failed:
-  {
-    GST_WARNING_OBJECT (pool, "stop failed");
-    GST_BUFFER_POOL_UNLOCK (pool);
-    return;
-  }
+  dec_outstanding (pool);
 }