bufferpool: Rework buffer management a little
authorWim Taymans <wim.taymans@collabora.co.uk>
Mon, 21 Feb 2011 16:33:38 +0000 (17:33 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Wed, 2 Mar 2011 10:23:21 +0000 (11:23 +0100)
Add start/stop methods to allow for bulk allocation of buffers.
Free buffers only when all outstanding buffers returned.
Make things more threadsafe wrt flushing and starting/stopping by
keeping track of start and stop method calls.

gst/gstbufferpool.c
gst/gstbufferpool.h

index 8fbd4aa..7ab221a 100644 (file)
@@ -67,7 +67,8 @@ static void gst_buffer_pool_finalize (GObject * object);
 
 G_DEFINE_TYPE (GstBufferPool, gst_buffer_pool, GST_TYPE_OBJECT);
 
-static gboolean default_set_active (GstBufferPool * pool, gboolean active);
+static gboolean default_start (GstBufferPool * pool);
+static gboolean default_stop (GstBufferPool * pool);
 static gboolean default_set_config (GstBufferPool * pool,
     GstStructure * config);
 static GstFlowReturn default_alloc_buffer (GstBufferPool * pool,
@@ -84,7 +85,8 @@ gst_buffer_pool_class_init (GstBufferPoolClass * klass)
 
   gobject_class->finalize = gst_buffer_pool_finalize;
 
-  klass->set_active = default_set_active;
+  klass->start = default_start;
+  klass->stop = default_stop;
   klass->set_config = default_set_config;
   klass->acquire_buffer = default_acquire_buffer;
   klass->alloc_buffer = default_alloc_buffer;
@@ -101,37 +103,17 @@ gst_buffer_pool_init (GstBufferPool * pool)
 
   pool->poll = gst_poll_new_timer ();
   pool->queue = gst_atomic_queue_new (10);
+  pool->flushing = TRUE;
+  pool->active = FALSE;
+  pool->configured = FALSE;
+  pool->started = FALSE;
   pool->config = gst_structure_id_empty_new (GST_QUARK (BUFFER_POOL_CONFIG));
   gst_buffer_pool_config_set (pool->config, 0, 0, 0, 0, 0, 1);
-  default_set_active (pool, FALSE);
 
   GST_DEBUG_OBJECT (pool, "created");
 }
 
 static void
-default_free_buffer (GstBufferPool * pool, GstBuffer * buffer)
-{
-  gst_buffer_unref (buffer);
-}
-
-static void
-flush_buffers (GstBufferPool * pool)
-{
-  GstBuffer *buffer;
-  GstBufferPoolClass *pclass;
-
-  pclass = GST_BUFFER_POOL_GET_CLASS (pool);
-
-  /* clear the pool */
-  while ((buffer = gst_atomic_queue_pop (pool->queue))) {
-    gst_poll_read_control (pool->poll);
-
-    if (G_LIKELY (pclass->free_buffer))
-      pclass->free_buffer (pool, buffer);
-  }
-}
-
-static void
 gst_buffer_pool_finalize (GObject * object)
 {
   GstBufferPool *pool;
@@ -141,7 +123,6 @@ gst_buffer_pool_finalize (GObject * object)
   GST_DEBUG_OBJECT (pool, "finalize");
 
   gst_buffer_pool_set_active (pool, FALSE);
-  flush_buffers (pool);
   gst_atomic_queue_unref (pool->queue);
   gst_poll_free (pool->poll);
   gst_structure_free (pool->config);
@@ -168,33 +149,104 @@ gst_buffer_pool_new (void)
   return result;
 }
 
-/* the default implementation for allocating and freeing the
- * buffers when changing the active state */
+/* the default implementation for preallocating the buffers
+ * in the pool */
 static gboolean
-default_set_active (GstBufferPool * pool, gboolean active)
+default_start (GstBufferPool * pool)
 {
   guint i;
   GstBufferPoolPrivate *priv = pool->priv;
+  GstBufferPoolClass *pclass;
 
-  if (active) {
+  pclass = GST_BUFFER_POOL_GET_CLASS (pool);
+
+  /* no alloc function, error */
+  if (G_UNLIKELY (pclass->alloc_buffer == NULL))
+    goto no_alloc;
+
+  /* we need to prealloc buffers */
+  for (i = 0; i < priv->min_buffers; i++) {
+    GstBuffer *buffer;
+
+    if (pclass->alloc_buffer (pool, &buffer, NULL) != GST_FLOW_OK)
+      goto alloc_failed;
+
+    /* store in the queue */
+    gst_atomic_queue_push (pool->queue, buffer);
+    gst_poll_write_control (pool->poll);
+  }
+  return TRUE;
+
+  /* ERRORS */
+no_alloc:
+  {
+    GST_WARNING_OBJECT (pool, "no alloc function");
+    return FALSE;
+  }
+alloc_failed:
+  {
+    GST_WARNING_OBJECT (pool, "alloc function failed");
+    return FALSE;
+  }
+}
+
+static void
+default_free_buffer (GstBufferPool * pool, GstBuffer * buffer)
+{
+  gst_buffer_unref (buffer);
+}
+
+static gboolean
+default_stop (GstBufferPool * pool)
+{
+  GstBuffer *buffer;
+  GstBufferPoolClass *pclass;
+
+  pclass = GST_BUFFER_POOL_GET_CLASS (pool);
+
+  /* clear the pool */
+  while ((buffer = gst_atomic_queue_pop (pool->queue))) {
+    gst_poll_read_control (pool->poll);
+
+    if (G_LIKELY (pclass->free_buffer))
+      pclass->free_buffer (pool, buffer);
+  }
+  return TRUE;
+}
+
+static gboolean
+do_start (GstBufferPool * pool)
+{
+
+  if (!pool->started) {
     GstBufferPoolClass *pclass;
 
     pclass = GST_BUFFER_POOL_GET_CLASS (pool);
 
-    if (G_UNLIKELY (pclass->alloc_buffer == NULL))
-      return TRUE;
+    /* start the pool, subclasses should allocate buffers and put them
+     * in the queue */
+    if (G_LIKELY (pclass->start)) {
+      if (!pclass->start (pool))
+        return FALSE;
+    }
+    pool->started = TRUE;
+  }
+  return TRUE;
+}
 
-    /* we need to prealloc buffers */
-    for (i = priv->min_buffers; i > 0; i--) {
-      GstBuffer *buffer;
+static gboolean
+do_stop (GstBufferPool * pool)
+{
+  if (pool->started) {
+    GstBufferPoolClass *pclass;
 
-      if (pclass->alloc_buffer (pool, &buffer, NULL) != GST_FLOW_OK)
-        return FALSE;
+    pclass = GST_BUFFER_POOL_GET_CLASS (pool);
 
-      /* store in the queue */
-      gst_atomic_queue_push (pool->queue, buffer);
-      gst_poll_write_control (pool->poll);
+    if (G_LIKELY (pclass->stop)) {
+      if (!pclass->stop (pool))
+        return FALSE;
     }
+    pool->started = FALSE;
   }
   return TRUE;
 }
@@ -214,7 +266,7 @@ gboolean
 gst_buffer_pool_set_active (GstBufferPool * pool, gboolean active)
 {
   GstBufferPoolClass *pclass;
-  gboolean res;
+  gboolean res = TRUE;
 
   g_return_val_if_fail (GST_IS_BUFFER_POOL (pool), FALSE);
 
@@ -229,35 +281,51 @@ gst_buffer_pool_set_active (GstBufferPool * pool, gboolean active)
   if (!pool->configured)
     goto not_configured;
 
-  if (!active) {
+  if (active) {
+    if (!do_start (pool))
+      goto start_failed;
+
+    /* unset the flushing state now */
+    gst_poll_read_control (pool->poll);
+    g_atomic_int_set (&pool->flushing, FALSE);
+  } else {
+    /* set to flushing first */
     g_atomic_int_set (&pool->flushing, TRUE);
     gst_poll_write_control (pool->poll);
-  }
-
-  if (G_LIKELY (pclass->set_active))
-    res = pclass->set_active (pool, active);
-  else
-    res = TRUE;
 
-  if (res) {
-    if (active) {
-      gst_poll_read_control (pool->poll);
-      g_atomic_int_set (&pool->flushing, FALSE);
+    /* when all buffers are in the pool, free them. Else they will be
+     * freed when they are released */
+    if (g_atomic_int_get (&pool->outstanding) == 0) {
+      if (!do_stop (pool))
+        goto stop_failed;
     }
-
-    pool->active = active;
   }
+  pool->active = active;
   GST_BUFFER_POOL_UNLOCK (pool);
 
   return res;
 
 was_ok:
   {
+    GST_DEBUG_OBJECT (pool, "pool was in the right state");
     GST_BUFFER_POOL_UNLOCK (pool);
     return TRUE;
   }
 not_configured:
   {
+    GST_ERROR_OBJECT (pool, "pool was not configured");
+    GST_BUFFER_POOL_UNLOCK (pool);
+    return FALSE;
+  }
+start_failed:
+  {
+    GST_ERROR_OBJECT (pool, "start failed");
+    GST_BUFFER_POOL_UNLOCK (pool);
+    return FALSE;
+  }
+stop_failed:
+  {
+    GST_WARNING_OBJECT (pool, "stop failed");
     GST_BUFFER_POOL_UNLOCK (pool);
     return FALSE;
   }
@@ -310,7 +378,7 @@ gst_buffer_pool_set_config (GstBufferPool * pool, GstStructure * config)
 
   pclass = GST_BUFFER_POOL_GET_CLASS (pool);
 
-  /* free the buffer when we are inactive */
+  /* set the new config */
   if (G_LIKELY (pclass->set_config))
     result = pclass->set_config (pool, config);
   else
@@ -331,11 +399,13 @@ gst_buffer_pool_set_config (GstBufferPool * pool, GstStructure * config)
   /* ERRORS */
 was_active:
   {
+    GST_WARNING_OBJECT (pool, "can't change config, we are active");
     GST_BUFFER_POOL_UNLOCK (pool);
     return FALSE;
   }
 have_outstanding:
   {
+    GST_WARNING_OBJECT (pool, "can't change config, have outstanding buffers");
     GST_BUFFER_POOL_UNLOCK (pool);
     return FALSE;
   }
@@ -562,7 +632,23 @@ gst_buffer_pool_release_buffer (GstBufferPool * pool, GstBuffer * buffer)
 
   if (G_UNLIKELY (g_atomic_int_get (&pool->flushing))) {
     if (g_atomic_int_dec_and_test (&pool->outstanding)) {
-      flush_buffers (pool);
+      /* take the lock so that set_active is not run concurrently */
+      GST_BUFFER_POOL_LOCK (pool);
+      /* recheck the flushing state in the lock, the pool could have been
+       * set to active again */
+      if (g_atomic_int_get (&pool->flushing)) {
+        if (!do_stop (pool))
+          goto stop_failed;
+      }
+      GST_BUFFER_POOL_UNLOCK (pool);
     }
   }
+  return;
+
+stop_failed:
+  {
+    GST_WARNING_OBJECT (pool, "stop failed");
+    GST_BUFFER_POOL_UNLOCK (pool);
+    return;
+  }
 }
index e75b82f..48df0b8 100644 (file)
@@ -98,6 +98,7 @@ struct _GstBufferPool {
   /*< private >*/
   gboolean             active;
   gboolean             flushing;
+  gboolean             started;
   gint                 outstanding;
   GstAtomicQueue      *queue;
   GstPoll             *poll;
@@ -114,9 +115,11 @@ struct _GstBufferPoolClass {
   GstObjectClass    object_class;
 
   /* vmethods */
-  gboolean       (*set_active)     (GstBufferPool *pool, gboolean active);
   gboolean       (*set_config)     (GstBufferPool *pool, GstStructure *config);
 
+  gboolean       (*start)          (GstBufferPool *pool);
+  gboolean       (*stop)           (GstBufferPool *pool);
+
   GstFlowReturn  (*acquire_buffer) (GstBufferPool *pool, GstBuffer **buffer,
                                     GstBufferPoolParams *params);
   GstFlowReturn  (*alloc_buffer)   (GstBufferPool *pool, GstBuffer **buffer,