va: pool, allocator: honor GST_BUFFER_POOL_ACQUIRE_FLAG_DONTWAIT
authorVíctor Manuel Jáquez Leal <vjaquez@igalia.com>
Tue, 17 Nov 2020 13:53:05 +0000 (14:53 +0100)
committerVíctor Manuel Jáquez Leal <vjaquez@igalia.com>
Tue, 24 Nov 2020 11:44:24 +0000 (12:44 +0100)
In order to honor GST_BUFFER_POOL_ACQUIRE_FLAG_DONTWAIT in VA pool, allocators'
wait_for_memory() has to be decoupled from their prepare_buffer() so it could be
called in pools' acquire_buffer() if the flag is not set.

wait_for_memory() functions are blocking so the received memories are assigned
to the fist requested buffer, if multithreaded calls. For this a new mutex were
added.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1815>

sys/va/gstvaallocator.c
sys/va/gstvaallocator.h
sys/va/gstvapool.c

index 18dc452..1d8d532 100644 (file)
@@ -403,6 +403,7 @@ struct _GstVaDmabufAllocator
 
   GstMemoryMapFunction parent_map;
 
+  GMutex buffer_lock;
   GCond buffer_cond;
 
   GstVideoInfo info;
@@ -446,6 +447,7 @@ gst_va_dmabuf_allocator_finalize (GObject * object)
 
   gst_atomic_queue_unref (self->available_mems);
   gst_clear_object (&self->display);
+  g_mutex_clear (&self->buffer_lock);
   g_cond_clear (&self->buffer_cond);
 
   G_OBJECT_CLASS (dmabuf_parent_class)->finalize (object);
@@ -477,6 +479,7 @@ static void
 gst_va_dmabuf_allocator_init (GstVaDmabufAllocator * self)
 {
   self->available_mems = gst_atomic_queue_new (2);
+  g_mutex_init (&self->buffer_lock);
   g_cond_init (&self->buffer_cond);
 
   self->parent_map = GST_ALLOCATOR (self)->mem_map;
@@ -509,13 +512,12 @@ gst_va_dmabuf_memory_release (GstMiniObject * mini_object)
   GstMemory *mem = GST_MEMORY_CAST (mini_object);
   GstVaDmabufAllocator *self = GST_VA_DMABUF_ALLOCATOR (mem->allocator);
 
-  GST_OBJECT_LOCK (self);
-
   GST_LOG ("releasing %p", mem);
+
+  g_mutex_lock (&self->buffer_lock);
   gst_atomic_queue_push (self->available_mems, gst_memory_ref (mem));
   g_cond_signal (&self->buffer_cond);
-
-  GST_OBJECT_UNLOCK (self);
+  g_mutex_unlock (&self->buffer_lock);
 
   /* Keep last in case we are holding on the last allocator ref */
   gst_object_unref (mem->allocator);
@@ -649,30 +651,23 @@ gst_va_dmabuf_allocator_setup_buffer (GstAllocator * allocator,
   return gst_va_dmabuf_allocator_setup_buffer_full (allocator, buffer, NULL);
 }
 
-gboolean
-gst_va_dmabuf_allocator_prepare_buffer (GstAllocator * allocator,
+static VASurfaceID
+gst_va_dmabuf_allocator_prepare_buffer_unlocked (GstVaDmabufAllocator * self,
     GstBuffer * buffer)
 {
-  GstMemory *pmem, *mem[GST_VIDEO_MAX_PLANES] = { 0, };
-  GstVaDmabufAllocator *self = GST_VA_DMABUF_ALLOCATOR (allocator);
-  VASurfaceID surface, psurface;
+  GstMemory *mem[GST_VIDEO_MAX_PLANES] = { 0, };
+  VASurfaceID surface;
   gint j, idx = 1;
 
-  GST_OBJECT_LOCK (self);
-
-  /* if available mems, use them */
-  while (gst_atomic_queue_length (self->available_mems) == 0 && !self->flushing)
-    g_cond_wait (&self->buffer_cond, GST_OBJECT_GET_LOCK (self));
-
-  if (self->flushing) {
-    GST_OBJECT_UNLOCK (self);
-    return FALSE;
-  }
-
   mem[0] = gst_atomic_queue_pop (self->available_mems);
+  if (!mem[0])
+    return VA_INVALID_ID;
+
   surface = gst_va_memory_get_surface (mem[0]);
+  while (surface != VA_INVALID_ID) {
+    GstMemory *pmem;
+    VASurfaceID psurface;
 
-  do {
     pmem = gst_atomic_queue_peek (self->available_mems);
     if (!pmem)
       break;
@@ -682,9 +677,7 @@ gst_va_dmabuf_allocator_prepare_buffer (GstAllocator * allocator,
       break;
 
     mem[idx++] = gst_atomic_queue_pop (self->available_mems);
-  } while (TRUE);
-
-  GST_OBJECT_UNLOCK (self);
+  };
 
   /* append them in reverse order */
   for (j = idx - 1; j >= 0; j--) {
@@ -692,9 +685,53 @@ gst_va_dmabuf_allocator_prepare_buffer (GstAllocator * allocator,
     gst_buffer_append_memory (buffer, mem[j]);
   }
 
-  GST_TRACE_OBJECT (self, "Prepared surface %#x in buffer %p", surface, buffer);
+  return surface;
+}
 
-  return TRUE;
+gboolean
+gst_va_dmabuf_allocator_prepare_buffer (GstAllocator * allocator,
+    GstBuffer * buffer)
+{
+  GstVaDmabufAllocator *self = GST_VA_DMABUF_ALLOCATOR (allocator);
+  VASurfaceID surface;
+
+  g_mutex_lock (&self->buffer_lock);
+  surface = gst_va_dmabuf_allocator_prepare_buffer_unlocked (self, buffer);
+  g_mutex_unlock (&self->buffer_lock);
+
+  if (surface != VA_INVALID_ID) {
+    GST_TRACE_OBJECT (self, "Prepared surface %#x in buffer %p", surface,
+        buffer);
+  }
+
+  return (surface != VA_INVALID_ID);
+}
+
+gboolean
+gst_va_dmabuf_allocator_wait_for_memory (GstAllocator * allocator,
+    GstBuffer * buffer)
+{
+  GstVaDmabufAllocator *self = GST_VA_DMABUF_ALLOCATOR (allocator);
+  VASurfaceID surface;
+
+  g_mutex_lock (&self->buffer_lock);
+  while (gst_atomic_queue_length (self->available_mems) == 0 && !self->flushing)
+    g_cond_wait (&self->buffer_cond, &self->buffer_lock);
+
+  if (self->flushing) {
+    g_mutex_unlock (&self->buffer_lock);
+    return FALSE;
+  }
+
+  surface = gst_va_dmabuf_allocator_prepare_buffer_unlocked (self, buffer);
+  g_mutex_unlock (&self->buffer_lock);
+
+  if (surface != VA_INVALID_ID) {
+    GST_TRACE_OBJECT (self, "Prepared surface %#x in buffer %p", surface,
+        buffer);
+  }
+
+  return (surface != VA_INVALID_ID);
 }
 
 void
@@ -702,13 +739,13 @@ gst_va_dmabuf_allocator_flush (GstAllocator * allocator)
 {
   GstVaDmabufAllocator *self = GST_VA_DMABUF_ALLOCATOR (allocator);
 
-  GST_OBJECT_LOCK (self);
+  g_mutex_lock (&self->buffer_lock);
   self->flushing = TRUE;
   _available_mems_flush (self->display, self->available_mems,
       &self->surface_count);
   self->flushing = FALSE;
   g_cond_broadcast (&self->buffer_cond);
-  GST_OBJECT_UNLOCK (self);
+  g_mutex_unlock (&self->buffer_lock);
 }
 
 static gboolean
@@ -867,6 +904,7 @@ struct _GstVaAllocator
   guint32 fourcc;
   guint32 rt_format;
 
+  GMutex buffer_lock;
   GCond buffer_cond;
 
   GstVideoInfo info;
@@ -906,6 +944,7 @@ gst_va_allocator_finalize (GObject * object)
   gst_atomic_queue_unref (self->available_mems);
   gst_clear_object (&self->display);
   g_clear_pointer (&self->surface_formats, g_array_unref);
+  g_mutex_clear (&self->buffer_lock);
   g_cond_clear (&self->buffer_cond);
 
   G_OBJECT_CLASS (gst_va_allocator_parent_class)->finalize (object);
@@ -1220,6 +1259,7 @@ gst_va_allocator_init (GstVaAllocator * self)
 
   self->use_derived = TRUE;
 
+  g_mutex_init (&self->buffer_lock);
   g_cond_init (&self->buffer_cond);
 
   GST_OBJECT_FLAG_SET (self, GST_ALLOCATOR_FLAG_CUSTOM_ALLOC);
@@ -1231,13 +1271,12 @@ gst_va_memory_release (GstMiniObject * mini_object)
   GstMemory *mem = GST_MEMORY_CAST (mini_object);
   GstVaAllocator *self = GST_VA_ALLOCATOR (mem->allocator);
 
-  GST_OBJECT_LOCK (self);
-
   GST_LOG ("releasing %p", mem);
+
+  g_mutex_lock (&self->buffer_lock);
   gst_atomic_queue_push (self->available_mems, gst_memory_ref (mem));
   g_cond_signal (&self->buffer_cond);
-
-  GST_OBJECT_UNLOCK (self);
+  g_mutex_unlock (&self->buffer_lock);
 
   /* Keep last in case we are holding on the last allocator ref */
   gst_object_unref (mem->allocator);
@@ -1296,34 +1335,66 @@ gst_va_allocator_new (GstVaDisplay * display, GArray * surface_formats)
   return GST_ALLOCATOR (self);
 }
 
+static VASurfaceID
+gst_va_allocator_prepare_buffer_unlocked (GstVaAllocator * self,
+    GstBuffer * buffer)
+{
+  GstMemory *mem;
+  VASurfaceID surface;
+
+  mem = gst_atomic_queue_pop (self->available_mems);
+  if (!mem)
+    return VA_INVALID_ID;
+
+  gst_object_ref (mem->allocator);
+  surface = gst_va_memory_get_surface (mem);
+  gst_buffer_append_memory (buffer, mem);
+
+  return surface;
+}
+
 gboolean
 gst_va_allocator_prepare_buffer (GstAllocator * allocator, GstBuffer * buffer)
 {
-  GstMemory *mem;
   GstVaAllocator *self = GST_VA_ALLOCATOR (allocator);
   VASurfaceID surface;
 
-  GST_OBJECT_LOCK (self);
+  g_mutex_lock (&self->buffer_lock);
+  surface = gst_va_allocator_prepare_buffer_unlocked (self, buffer);
+  g_mutex_unlock (&self->buffer_lock);
 
-  if (self->flushing) {
-    GST_OBJECT_UNLOCK (self);
-    return FALSE;
+  if (surface != VA_INVALID_ID) {
+    GST_TRACE_OBJECT (self, "Prepared surface %#x in buffer %p", surface,
+        buffer);
   }
 
-  /* if available mems, use them */
-  while (gst_atomic_queue_length (self->available_mems) == 0)
-    g_cond_wait (&self->buffer_cond, GST_OBJECT_GET_LOCK (self));
+  return (surface != VA_INVALID_ID);
+}
 
-  mem = gst_atomic_queue_pop (self->available_mems);
-  GST_OBJECT_UNLOCK (self);
+gboolean
+gst_va_allocator_wait_for_memory (GstAllocator * allocator, GstBuffer * buffer)
+{
+  GstVaAllocator *self = GST_VA_ALLOCATOR (allocator);
+  VASurfaceID surface;
 
-  gst_object_ref (mem->allocator);
-  surface = gst_va_memory_get_surface (mem);
-  gst_buffer_append_memory (buffer, mem);
+  g_mutex_lock (&self->buffer_lock);
+  while (gst_atomic_queue_length (self->available_mems) == 0 && !self->flushing)
+    g_cond_wait (&self->buffer_cond, &self->buffer_lock);
+
+  if (self->flushing) {
+    g_mutex_unlock (&self->buffer_lock);
+    return FALSE;
+  }
 
-  GST_TRACE_OBJECT (self, "Prepared surface %#x in buffer %p", surface, buffer);
+  surface = gst_va_allocator_prepare_buffer_unlocked (self, buffer);
+  g_mutex_unlock (&self->buffer_lock);
 
-  return TRUE;
+  if (surface != VA_INVALID_ID) {
+    GST_TRACE_OBJECT (self, "Prepared surface %#x in buffer %p", surface,
+        buffer);
+  }
+
+  return (surface != VA_INVALID_ID);
 }
 
 void
@@ -1331,13 +1402,13 @@ gst_va_allocator_flush (GstAllocator * allocator)
 {
   GstVaAllocator *self = GST_VA_ALLOCATOR (allocator);
 
-  GST_OBJECT_LOCK (self);
+  g_mutex_lock (&self->buffer_lock);
   self->flushing = TRUE;
   _available_mems_flush (self->display, self->available_mems,
       &self->surface_count);
   self->flushing = FALSE;
   g_cond_broadcast (&self->buffer_cond);
-  GST_OBJECT_UNLOCK (self);
+  g_mutex_unlock (&self->buffer_lock);
 }
 
 static gboolean
index bbe364c..4324e70 100644 (file)
@@ -36,6 +36,8 @@ gboolean              gst_va_dmabuf_allocator_setup_buffer (GstAllocator * alloc
                                                             GstBuffer * buffer);
 gboolean              gst_va_dmabuf_allocator_prepare_buffer (GstAllocator * allocator,
                                                               GstBuffer * buffer);
+gboolean              gst_va_dmabuf_allocator_wait_for_memory (GstAllocator * allocator,
+                                                               GstBuffer * buffer);
 void                  gst_va_dmabuf_allocator_flush       (GstAllocator * allocator);
 gboolean              gst_va_dmabuf_allocator_set_format  (GstAllocator * allocator,
                                                            GstVideoInfo * info,
@@ -64,6 +66,8 @@ GstAllocator *        gst_va_allocator_new                (GstVaDisplay * displa
 GstMemory *           gst_va_allocator_alloc              (GstAllocator * allocator);
 gboolean              gst_va_allocator_prepare_buffer     (GstAllocator * allocator,
                                                            GstBuffer * buffer);
+gboolean              gst_va_allocator_wait_for_memory    (GstAllocator * allocator,
+                                                           GstBuffer * buffer);
 void                  gst_va_allocator_flush              (GstAllocator * allocator);
 gboolean              gst_va_allocator_set_format         (GstAllocator * allocator,
                                                            GstVideoInfo * info,
index 2a54701..31753ef 100644 (file)
@@ -309,11 +309,28 @@ gst_va_pool_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
   if (GST_IS_VA_DMABUF_ALLOCATOR (vpool->allocator)) {
     if (gst_va_dmabuf_allocator_prepare_buffer (vpool->allocator, *buffer))
       return GST_FLOW_OK;
+
+    if (params && (params->flags & GST_BUFFER_POOL_ACQUIRE_FLAG_DONTWAIT))
+      return GST_FLOW_EOS;
+    if (!gst_va_dmabuf_allocator_wait_for_memory (vpool->allocator, *buffer))
+      goto flushing;
+
+    return GST_FLOW_OK;
   } else if (GST_IS_VA_ALLOCATOR (vpool->allocator)) {
     if (gst_va_allocator_prepare_buffer (vpool->allocator, *buffer))
       return GST_FLOW_OK;
+
+    if (params && (params->flags & GST_BUFFER_POOL_ACQUIRE_FLAG_DONTWAIT))
+      return GST_FLOW_EOS;
+    if (!gst_va_allocator_wait_for_memory (vpool->allocator, *buffer))
+      goto flushing;
+
+    return GST_FLOW_OK;
   }
 
+  return GST_FLOW_ERROR;
+
+flushing:
   gst_buffer_replace (buffer, NULL);
   return GST_FLOW_FLUSHING;
 }