v4l2: Record buffer states in pool to fix dequeue race
authorJames Cowgill <james.cowgill@blaize.com>
Mon, 27 Sep 2021 15:52:22 +0000 (16:52 +0100)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Thu, 11 Nov 2021 22:30:31 +0000 (22:30 +0000)
The `gst_v4l2_buffer_pool_dqbuf` function contains this ominous comment:

    /* get our GstBuffer with that index from the pool, if the buffer was
     * outstanding we have a serious problem.
     */
    outbuf = pool->buffers[group->buffer.index];

Unfortunately it is common for buffers in _output_ buffer pools to be
both queued and outstanding at the same time. This can happen if the
upstream element keeps a reference to the buffer, or in an encoder
element itself when it keeps a reference to the input buffer for each
frame.

Since the current code doesn't handle this case properly we can end up
with crashes in other elements such as:

    (gst-launch-1.0:32559): CRITICAL **: 17:33:35.740: gst_video_frame_map_id: assertion 'GST_IS_BUFFER (buffer)' failed

and:

    (gst-launch-1.0:231): GStreamer-CRITICAL **: 00:16:20.882: write map requested on non-writable buffer

Both these crashes are caused by a race condition related to releasing
the same buffer twice from two different threads. If a buffer is queued
and outstanding this situation is possible:

**Thread 1**
- Calls `gst_buffer_unref` decrementing the reference count to zero.
- The core GstBufferPool object marks the buffer non-outstanding.
- Calls the V4L2 release buffer function.
- If the buffer is _not_ queued:
  - Release it back to the free pool (containing non-queued buffers).

**Thread 2**
- Dequeues the queued output buffer.
  - Marks the buffer as not queued.
- If the buffer is _not_ outstanding:
  - Calls the V4L2 release buffer function.
  - Release it back to the free pool (containing non-queued buffers).

If both of these threads run at exactly the same time there is a small
window where the buffer is marked both not outstanding and not queued
but before it has been released. In this case the buffer will be freed
twice causing the above crashes.

Unfortunately the variable recording whether a buffer is outstanding is
part of the core `GstBuffer` object and is managed by `GstBufferPool` so
it's not as straightforward as adding a mutex. Instead we can fix this
by additionally recording the buffer state in `GstV4l2BufferPool`, and
handle "internal" and "external" buffer release separately so we can
detect when a buffer becomes not outstanding.

In the new solution:
- The "external" buffer pool release and the "dqbuf" functions
  atomically update the buffer state and determine if a buffer is still
  queued or outstanding.
- Subsequent code and a new
  `gst_v4l2_buffer_pool_complete_release_buffer` function can proceed to
  release (or not) a buffer knowing that it's not racing with another
  thread.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1010>

subprojects/gst-plugins-good/sys/v4l2/gstv4l2bufferpool.c
subprojects/gst-plugins-good/sys/v4l2/gstv4l2bufferpool.h

index 45c672d114c5f32b079220c624b2083a6df2eaee..58667de434ebeb4b9ac850ecf234d4e7197cade0 100644 (file)
@@ -66,8 +66,22 @@ enum _GstV4l2BufferPoolAcquireFlags
   GST_V4L2_BUFFER_POOL_ACQUIRE_FLAG_LAST
 };
 
-static void gst_v4l2_buffer_pool_release_buffer (GstBufferPool * bpool,
-    GstBuffer * buffer);
+/* Buffer state flags */
+enum _GstV4l2BufferState
+{
+  /* Buffer is free (either on the GstBufferPool free queue, or no GstBuffer has
+   * been allocated yet) */
+  BUFFER_STATE_FREE = 0,
+
+  /* Buffer had outstanding external users */
+  BUFFER_STATE_OUTSTANDING = 1,
+
+  /* Buffer is on one of the kernel queues */
+  BUFFER_STATE_QUEUED = 2,
+};
+
+static void gst_v4l2_buffer_pool_complete_release_buffer (GstBufferPool * bpool,
+    GstBuffer * buffer, gboolean queued);
 
 static gboolean
 gst_v4l2_is_buffer_valid (GstBuffer * buffer, GstV4l2MemoryGroup ** out_group)
@@ -458,6 +472,11 @@ gst_v4l2_buffer_pool_alloc_buffer (GstBufferPool * bpool, GstBuffer ** buffer,
 
     for (i = 0; i < group->n_mem; i++)
       gst_buffer_append_memory (newbuf, group->mem[i]);
+
+    if (g_atomic_int_get (&pool->buffer_state[group->buffer.index])) {
+      GST_WARNING_OBJECT (pool, "newly allocated buffer %u is not free",
+          group->buffer.index);
+    }
   } else if (newbuf == NULL) {
     goto allocation_failed;
   }
@@ -723,16 +742,21 @@ gst_v4l2_buffer_pool_streamoff (GstV4l2BufferPool * pool)
   }
 
   for (i = 0; i < VIDEO_MAX_FRAME; i++) {
-    if (pool->buffers[i]) {
+    gint old_buffer_state =
+        g_atomic_int_and (&pool->buffer_state[i], ~BUFFER_STATE_QUEUED);
+    if (old_buffer_state & BUFFER_STATE_QUEUED) {
       GstBuffer *buffer = pool->buffers[i];
       GstBufferPool *bpool = GST_BUFFER_POOL (pool);
 
       pool->buffers[i] = NULL;
 
-      if (V4L2_TYPE_IS_OUTPUT (pool->obj->type))
-        gst_v4l2_buffer_pool_release_buffer (bpool, buffer);
-      else                      /* Don't re-enqueue capture buffer on stop */
-        pclass->release_buffer (bpool, buffer);
+      if (!(old_buffer_state & BUFFER_STATE_OUTSTANDING)) {
+        if (V4L2_TYPE_IS_OUTPUT (pool->obj->type))
+          gst_v4l2_buffer_pool_complete_release_buffer (bpool, buffer, FALSE);
+
+        else                    /* Don't re-enqueue capture buffer on stop */
+          pclass->release_buffer (bpool, buffer);
+      }
 
       g_atomic_int_add (&pool->num_queued, -1);
     }
@@ -1175,14 +1199,18 @@ gst_v4l2_buffer_pool_qbuf (GstV4l2BufferPool * pool, GstBuffer * buf,
     GstV4l2MemoryGroup * group, guint32 * frame_number)
 {
   const GstV4l2Object *obj = pool->obj;
+  gint old_buffer_state;
   gint index;
 
   index = group->buffer.index;
 
-  if (pool->buffers[index] != NULL)
+  old_buffer_state =
+      g_atomic_int_or (&pool->buffer_state[index], BUFFER_STATE_QUEUED);
+  if (old_buffer_state & BUFFER_STATE_QUEUED)
     goto already_queued;
 
-  GST_LOG_OBJECT (pool, "queuing buffer %i", index);
+  GST_LOG_OBJECT (pool, "queuing buffer %i, previous-state = %i", index,
+      old_buffer_state);
 
   if (V4L2_TYPE_IS_OUTPUT (obj->type)) {
     enum v4l2_field field;
@@ -1238,6 +1266,7 @@ was_orphaned:
   {
     GST_DEBUG_OBJECT (pool, "pool was orphaned, not queuing back buffer.");
     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_TAG_MEMORY);
+    g_atomic_int_and (&pool->buffer_state[index], ~BUFFER_STATE_QUEUED);
     GST_OBJECT_UNLOCK (pool);
     return GST_FLOW_FLUSHING;
   }
@@ -1248,6 +1277,7 @@ queue_failed:
     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_TAG_MEMORY);
     g_atomic_int_add (&pool->num_queued, -1);
     pool->buffers[index] = NULL;
+    g_atomic_int_and (&pool->buffer_state[index], ~BUFFER_STATE_QUEUED);
     GST_OBJECT_UNLOCK (pool);
     return GST_FLOW_ERROR;
   }
@@ -1255,7 +1285,7 @@ queue_failed:
 
 static GstFlowReturn
 gst_v4l2_buffer_pool_dqbuf (GstV4l2BufferPool * pool, GstBuffer ** buffer,
-    gboolean wait)
+    gboolean * outstanding, gboolean wait)
 {
   GstFlowReturn res;
   GstBuffer *outbuf = NULL;
@@ -1265,6 +1295,7 @@ gst_v4l2_buffer_pool_dqbuf (GstV4l2BufferPool * pool, GstBuffer ** buffer,
   GstVideoMeta *vmeta;
   gsize size;
   gint i;
+  gint old_buffer_state;
 
   if ((res = gst_v4l2_buffer_pool_poll (pool, wait)) < GST_FLOW_OK)
     goto poll_failed;
@@ -1287,14 +1318,23 @@ gst_v4l2_buffer_pool_dqbuf (GstV4l2BufferPool * pool, GstBuffer ** buffer,
   if (res != GST_FLOW_OK)
     goto dqbuf_failed;
 
-  /* get our GstBuffer with that index from the pool, if the buffer was
-   * outstanding we have a serious problem.
-   */
+  old_buffer_state =
+      g_atomic_int_and (&pool->buffer_state[group->buffer.index],
+      ~BUFFER_STATE_QUEUED);
+  if (!(old_buffer_state & BUFFER_STATE_QUEUED))
+    goto no_buffer;
+
+  if (outstanding) {
+    *outstanding = (old_buffer_state & BUFFER_STATE_OUTSTANDING) != 0;
+  } else if (old_buffer_state & BUFFER_STATE_OUTSTANDING) {
+    GST_WARNING_OBJECT (pool, "unexpected outstanding buffer %u",
+        group->buffer.index);
+  }
+
   outbuf = pool->buffers[group->buffer.index];
   if (outbuf == NULL)
     goto no_buffer;
 
-  /* mark the buffer outstanding */
   pool->buffers[group->buffer.index] = NULL;
   if (g_atomic_int_dec_and_test (&pool->num_queued)) {
     GST_OBJECT_LOCK (pool);
@@ -1309,10 +1349,10 @@ gst_v4l2_buffer_pool_dqbuf (GstV4l2BufferPool * pool, GstBuffer ** buffer,
   for (i = 0; i < group->n_mem; i++) {
     GST_LOG_OBJECT (pool,
         "dequeued buffer %p seq:%d (ix=%d), mem %p used %d, plane=%d, flags %08x, ts %"
-        GST_TIME_FORMAT ", pool-queued=%d, buffer=%p", outbuf,
-        group->buffer.sequence, group->buffer.index, group->mem[i],
+        GST_TIME_FORMAT ", pool-queued=%d, buffer=%p, previous-state=%i",
+        outbuf, group->buffer.sequence, group->buffer.index, group->mem[i],
         group->planes[i].bytesused, i, group->buffer.flags,
-        GST_TIME_ARGS (timestamp), pool->num_queued, outbuf);
+        GST_TIME_ARGS (timestamp), pool->num_queued, outbuf, old_buffer_state);
 
     if (vmeta) {
       vmeta->offset[i] = size;
@@ -1473,7 +1513,7 @@ gst_v4l2_buffer_pool_acquire_buffer (GstBufferPool * bpool, GstBuffer ** buffer,
           /* just dequeue a buffer, we basically use the queue of v4l2 as the
            * storage for our buffers. This function does poll first so we can
            * interrupt it fine. */
-          ret = gst_v4l2_buffer_pool_dqbuf (pool, buffer, TRUE);
+          ret = gst_v4l2_buffer_pool_dqbuf (pool, buffer, NULL, TRUE);
           break;
         }
         default:
@@ -1514,23 +1554,48 @@ gst_v4l2_buffer_pool_acquire_buffer (GstBufferPool * bpool, GstBuffer ** buffer,
       break;
   }
 done:
+  /* Mark buffer as outstanding */
+  if (ret == GST_FLOW_OK) {
+    GstV4l2MemoryGroup *group;
+    if (gst_v4l2_is_buffer_valid (*buffer, &group)) {
+      GST_LOG_OBJECT (pool, "mark buffer %u outstanding", group->buffer.index);
+      g_atomic_int_or (&pool->buffer_state[group->buffer.index],
+          BUFFER_STATE_OUTSTANDING);
+    }
+  }
+
   return ret;
 }
 
+/*
+ * Completes a release buffer operation
+ *
+ * Before calling this function:
+ * - The buffer state (if applicable) must have already been updated.
+ * - The buffer must not be outstanding.
+ * - The "queued" argument contains whether the buffer is currently queued.
+ */
 static void
-gst_v4l2_buffer_pool_release_buffer (GstBufferPool * bpool, GstBuffer * buffer)
+gst_v4l2_buffer_pool_complete_release_buffer (GstBufferPool * bpool,
+    GstBuffer * buffer, gboolean queued)
 {
   GstV4l2BufferPool *pool = GST_V4L2_BUFFER_POOL (bpool);
   GstBufferPoolClass *pclass = GST_BUFFER_POOL_CLASS (parent_class);
   GstV4l2Object *obj = pool->obj;
 
-  GST_DEBUG_OBJECT (pool, "release buffer %p", buffer);
+  GST_DEBUG_OBJECT (pool, "complete release buffer %p (queued = %s)", buffer,
+      queued ? "yes" : "no");
 
   switch (obj->type) {
     case V4L2_BUF_TYPE_VIDEO_CAPTURE:
     case V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE:
       /* capture, put the buffer back in the queue so that we can refill it
        * later. */
+      if (queued) {
+        GST_WARNING_OBJECT (pool,
+            "capture buffer %p was release while still queued", buffer);
+      }
+
       switch (obj->mode) {
         case GST_V4L2_IO_RW:
           /* release back in the pool */
@@ -1594,7 +1659,7 @@ gst_v4l2_buffer_pool_release_buffer (GstBufferPool * bpool, GstBuffer * buffer)
 
           index = group->buffer.index;
 
-          if (pool->buffers[index] == NULL) {
+          if (!queued) {
             GST_LOG_OBJECT (pool, "buffer %u not queued, putting on free list",
                 index);
 
@@ -1628,6 +1693,25 @@ gst_v4l2_buffer_pool_release_buffer (GstBufferPool * bpool, GstBuffer * buffer)
   }
 }
 
+static void
+gst_v4l2_buffer_pool_release_buffer (GstBufferPool * bpool, GstBuffer * buffer)
+{
+  GstV4l2BufferPool *pool = GST_V4L2_BUFFER_POOL (bpool);
+  GstV4l2MemoryGroup *group;
+  gboolean queued = FALSE;
+
+  if (gst_v4l2_is_buffer_valid (buffer, &group)) {
+    gint old_buffer_state =
+        g_atomic_int_and (&pool->buffer_state[group->buffer.index],
+        ~BUFFER_STATE_OUTSTANDING);
+    queued = (old_buffer_state & BUFFER_STATE_QUEUED) != 0;
+    GST_LOG_OBJECT (pool, "mark buffer %u not outstanding",
+        group->buffer.index);
+  }
+
+  gst_v4l2_buffer_pool_complete_release_buffer (bpool, buffer, queued);
+}
+
 static void
 gst_v4l2_buffer_pool_dispose (GObject * object)
 {
@@ -1678,6 +1762,8 @@ gst_v4l2_buffer_pool_init (GstV4l2BufferPool * pool)
   g_cond_init (&pool->empty_cond);
   pool->empty = TRUE;
   pool->orphaned = FALSE;
+  for (gint i = 0; i < VIDEO_MAX_FRAME; i++)
+    g_atomic_int_set (&pool->buffer_state[i], BUFFER_STATE_FREE);
 }
 
 static void
@@ -1931,13 +2017,13 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf,
           }
 
           /* buffer not from our pool, grab a frame and copy it into the target */
-          if ((ret = gst_v4l2_buffer_pool_dqbuf (pool, &tmp, TRUE))
+          if ((ret = gst_v4l2_buffer_pool_dqbuf (pool, &tmp, NULL, TRUE))
               != GST_FLOW_OK)
             goto done;
 
           /* An empty buffer on capture indicates the end of stream */
           if (gst_buffer_get_size (tmp) == 0) {
-            gst_v4l2_buffer_pool_release_buffer (bpool, tmp);
+            gst_v4l2_buffer_pool_complete_release_buffer (bpool, tmp, FALSE);
 
             /* Legacy M2M devices return empty buffer when drained */
             if (GST_V4L2_IS_M2M (obj->device_caps))
@@ -1947,7 +2033,7 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf,
           ret = gst_v4l2_buffer_pool_copy_buffer (pool, *buf, tmp);
 
           /* an queue the buffer again after the copy */
-          gst_v4l2_buffer_pool_release_buffer (bpool, tmp);
+          gst_v4l2_buffer_pool_complete_release_buffer (bpool, tmp, FALSE);
 
           if (ret != GST_FLOW_OK)
             goto copy_failed;
@@ -2014,6 +2100,7 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf,
           GstBuffer *buffer;
           GstV4l2MemoryGroup *group;
           gint index;
+          gboolean outstanding;
 
           if ((*buf)->pool != bpool)
             goto copying;
@@ -2025,7 +2112,8 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf,
 
           GST_LOG_OBJECT (pool, "processing buffer %i from our pool", index);
 
-          if (pool->buffers[index] != NULL) {
+          if (g_atomic_int_get (&pool->buffer_state[index]) &
+              BUFFER_STATE_QUEUED) {
             GST_LOG_OBJECT (pool, "buffer %i already queued, copying", index);
             goto copying;
           }
@@ -2076,6 +2164,8 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf,
             gst_v4l2_allocator_flush (pool->vallocator);
 
             pool->buffers[group->buffer.index] = NULL;
+            g_atomic_int_and (&pool->buffer_state[group->buffer.index],
+                ~BUFFER_STATE_QUEUED);
 
             gst_mini_object_set_qdata (GST_MINI_OBJECT (to_queue),
                 GST_V4L2_IMPORT_QUARK, NULL, NULL);
@@ -2089,20 +2179,23 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf,
           gst_buffer_unref (to_queue);
 
           /* release as many buffer as possible */
-          while (gst_v4l2_buffer_pool_dqbuf (pool, &buffer, FALSE) ==
-              GST_FLOW_OK) {
-            if (buffer->pool == NULL)
-              gst_v4l2_buffer_pool_release_buffer (bpool, buffer);
+          while (gst_v4l2_buffer_pool_dqbuf (pool, &buffer, &outstanding,
+                  FALSE) == GST_FLOW_OK) {
+            if (!outstanding)
+              gst_v4l2_buffer_pool_complete_release_buffer (bpool, buffer,
+                  FALSE);
           }
 
           if (g_atomic_int_get (&pool->num_queued) >= pool->min_latency) {
             /* all buffers are queued, try to dequeue one and release it back
              * into the pool so that _acquire can get to it again. */
-            ret = gst_v4l2_buffer_pool_dqbuf (pool, &buffer, TRUE);
-            if (ret == GST_FLOW_OK && buffer->pool == NULL)
+            ret =
+                gst_v4l2_buffer_pool_dqbuf (pool, &buffer, &outstanding, TRUE);
+            if (ret == GST_FLOW_OK && !outstanding)
               /* release the rendered buffer back into the pool. This wakes up any
                * thread waiting for a buffer in _acquire(). */
-              gst_v4l2_buffer_pool_release_buffer (bpool, buffer);
+              gst_v4l2_buffer_pool_complete_release_buffer (bpool, buffer,
+                  FALSE);
           }
           break;
         }
index f4fdf92b4e516c11af0d1c30e975e51df583224b..10742634f08bd211e076fef481315fb2d890633a 100644 (file)
@@ -93,6 +93,7 @@ struct _GstV4l2BufferPool
   gboolean flushing;
 
   GstBuffer *buffers[VIDEO_MAX_FRAME];
+  volatile gint buffer_state[VIDEO_MAX_FRAME];
 
   /* signal handlers */
   gulong group_released_handler;