G_DEFINE_TYPE (GstBufferPool, gst_buffer_pool, GST_TYPE_OBJECT);
-static gboolean default_set_active (GstBufferPool * pool, gboolean active);
+static gboolean default_start (GstBufferPool * pool);
+static gboolean default_stop (GstBufferPool * pool);
static gboolean default_set_config (GstBufferPool * pool,
GstStructure * config);
static GstFlowReturn default_alloc_buffer (GstBufferPool * pool,
gobject_class->finalize = gst_buffer_pool_finalize;
- klass->set_active = default_set_active;
+ klass->start = default_start;
+ klass->stop = default_stop;
klass->set_config = default_set_config;
klass->acquire_buffer = default_acquire_buffer;
klass->alloc_buffer = default_alloc_buffer;
pool->poll = gst_poll_new_timer ();
pool->queue = gst_atomic_queue_new (10);
+ pool->flushing = TRUE;
+ pool->active = FALSE;
+ pool->configured = FALSE;
+ pool->started = FALSE;
pool->config = gst_structure_id_empty_new (GST_QUARK (BUFFER_POOL_CONFIG));
gst_buffer_pool_config_set (pool->config, 0, 0, 0, 0, 0, 1);
- default_set_active (pool, FALSE);
GST_DEBUG_OBJECT (pool, "created");
}
static void
-default_free_buffer (GstBufferPool * pool, GstBuffer * buffer)
-{
- gst_buffer_unref (buffer);
-}
-
-static void
-flush_buffers (GstBufferPool * pool)
-{
- GstBuffer *buffer;
- GstBufferPoolClass *pclass;
-
- pclass = GST_BUFFER_POOL_GET_CLASS (pool);
-
- /* clear the pool */
- while ((buffer = gst_atomic_queue_pop (pool->queue))) {
- gst_poll_read_control (pool->poll);
-
- if (G_LIKELY (pclass->free_buffer))
- pclass->free_buffer (pool, buffer);
- }
-}
-
-static void
gst_buffer_pool_finalize (GObject * object)
{
GstBufferPool *pool;
GST_DEBUG_OBJECT (pool, "finalize");
gst_buffer_pool_set_active (pool, FALSE);
- flush_buffers (pool);
gst_atomic_queue_unref (pool->queue);
gst_poll_free (pool->poll);
gst_structure_free (pool->config);
return result;
}
-/* the default implementation for allocating and freeing the
- * buffers when changing the active state */
+/* the default implementation for preallocating the buffers
+ * in the pool */
static gboolean
-default_set_active (GstBufferPool * pool, gboolean active)
+default_start (GstBufferPool * pool)
{
guint i;
GstBufferPoolPrivate *priv = pool->priv;
+ GstBufferPoolClass *pclass;
- if (active) {
+ pclass = GST_BUFFER_POOL_GET_CLASS (pool);
+
+ /* no alloc function, error */
+ if (G_UNLIKELY (pclass->alloc_buffer == NULL))
+ goto no_alloc;
+
+ /* we need to prealloc buffers */
+ for (i = 0; i < priv->min_buffers; i++) {
+ GstBuffer *buffer;
+
+ if (pclass->alloc_buffer (pool, &buffer, NULL) != GST_FLOW_OK)
+ goto alloc_failed;
+
+ /* store in the queue */
+ gst_atomic_queue_push (pool->queue, buffer);
+ gst_poll_write_control (pool->poll);
+ }
+ return TRUE;
+
+ /* ERRORS */
+no_alloc:
+ {
+ GST_WARNING_OBJECT (pool, "no alloc function");
+ return FALSE;
+ }
+alloc_failed:
+ {
+ GST_WARNING_OBJECT (pool, "alloc function failed");
+ return FALSE;
+ }
+}
+
+static void
+default_free_buffer (GstBufferPool * pool, GstBuffer * buffer)
+{
+ gst_buffer_unref (buffer);
+}
+
+static gboolean
+default_stop (GstBufferPool * pool)
+{
+ GstBuffer *buffer;
+ GstBufferPoolClass *pclass;
+
+ pclass = GST_BUFFER_POOL_GET_CLASS (pool);
+
+ /* clear the pool */
+ while ((buffer = gst_atomic_queue_pop (pool->queue))) {
+ gst_poll_read_control (pool->poll);
+
+ if (G_LIKELY (pclass->free_buffer))
+ pclass->free_buffer (pool, buffer);
+ }
+ return TRUE;
+}
+
+static gboolean
+do_start (GstBufferPool * pool)
+{
+
+ if (!pool->started) {
GstBufferPoolClass *pclass;
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
- if (G_UNLIKELY (pclass->alloc_buffer == NULL))
- return TRUE;
+ /* start the pool, subclasses should allocate buffers and put them
+ * in the queue */
+ if (G_LIKELY (pclass->start)) {
+ if (!pclass->start (pool))
+ return FALSE;
+ }
+ pool->started = TRUE;
+ }
+ return TRUE;
+}
- /* we need to prealloc buffers */
- for (i = priv->min_buffers; i > 0; i--) {
- GstBuffer *buffer;
+static gboolean
+do_stop (GstBufferPool * pool)
+{
+ if (pool->started) {
+ GstBufferPoolClass *pclass;
- if (pclass->alloc_buffer (pool, &buffer, NULL) != GST_FLOW_OK)
- return FALSE;
+ pclass = GST_BUFFER_POOL_GET_CLASS (pool);
- /* store in the queue */
- gst_atomic_queue_push (pool->queue, buffer);
- gst_poll_write_control (pool->poll);
+ if (G_LIKELY (pclass->stop)) {
+ if (!pclass->stop (pool))
+ return FALSE;
}
+ pool->started = FALSE;
}
return TRUE;
}
gst_buffer_pool_set_active (GstBufferPool * pool, gboolean active)
{
GstBufferPoolClass *pclass;
- gboolean res;
+ gboolean res = TRUE;
g_return_val_if_fail (GST_IS_BUFFER_POOL (pool), FALSE);
if (!pool->configured)
goto not_configured;
- if (!active) {
+ if (active) {
+ if (!do_start (pool))
+ goto start_failed;
+
+ /* unset the flushing state now */
+ gst_poll_read_control (pool->poll);
+ g_atomic_int_set (&pool->flushing, FALSE);
+ } else {
+ /* set to flushing first */
g_atomic_int_set (&pool->flushing, TRUE);
gst_poll_write_control (pool->poll);
- }
-
- if (G_LIKELY (pclass->set_active))
- res = pclass->set_active (pool, active);
- else
- res = TRUE;
- if (res) {
- if (active) {
- gst_poll_read_control (pool->poll);
- g_atomic_int_set (&pool->flushing, FALSE);
+ /* when all buffers are in the pool, free them. Else they will be
+ * freed when they are released */
+ if (g_atomic_int_get (&pool->outstanding) == 0) {
+ if (!do_stop (pool))
+ goto stop_failed;
}
-
- pool->active = active;
}
+ pool->active = active;
GST_BUFFER_POOL_UNLOCK (pool);
return res;
was_ok:
{
+ GST_DEBUG_OBJECT (pool, "pool was in the right state");
GST_BUFFER_POOL_UNLOCK (pool);
return TRUE;
}
not_configured:
{
+ GST_ERROR_OBJECT (pool, "pool was not configured");
+ GST_BUFFER_POOL_UNLOCK (pool);
+ return FALSE;
+ }
+start_failed:
+ {
+ GST_ERROR_OBJECT (pool, "start failed");
+ GST_BUFFER_POOL_UNLOCK (pool);
+ return FALSE;
+ }
+stop_failed:
+ {
+ GST_WARNING_OBJECT (pool, "stop failed");
GST_BUFFER_POOL_UNLOCK (pool);
return FALSE;
}
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
- /* free the buffer when we are inactive */
+ /* set the new config */
if (G_LIKELY (pclass->set_config))
result = pclass->set_config (pool, config);
else
/* ERRORS */
was_active:
{
+ GST_WARNING_OBJECT (pool, "can't change config, we are active");
GST_BUFFER_POOL_UNLOCK (pool);
return FALSE;
}
have_outstanding:
{
+ GST_WARNING_OBJECT (pool, "can't change config, have outstanding buffers");
GST_BUFFER_POOL_UNLOCK (pool);
return FALSE;
}
if (G_UNLIKELY (g_atomic_int_get (&pool->flushing))) {
if (g_atomic_int_dec_and_test (&pool->outstanding)) {
- flush_buffers (pool);
+ /* take the lock so that set_active is not run concurrently */
+ GST_BUFFER_POOL_LOCK (pool);
+ /* recheck the flushing state in the lock, the pool could have been
+ * set to active again */
+ if (g_atomic_int_get (&pool->flushing)) {
+ if (!do_stop (pool))
+ goto stop_failed;
+ }
+ GST_BUFFER_POOL_UNLOCK (pool);
}
}
+ return;
+
+stop_failed:
+ {
+ GST_WARNING_OBJECT (pool, "stop failed");
+ GST_BUFFER_POOL_UNLOCK (pool);
+ return;
+ }
}