shmsink: Add custom allocator to allow for zero-copy shared memory use
authorOlivier Crête <olivier.crete@collabora.com>
Wed, 26 Sep 2012 20:00:39 +0000 (16:00 -0400)
committerOlivier Crête <olivier.crete@collabora.com>
Thu, 28 Feb 2013 23:46:02 +0000 (18:46 -0500)
sys/shm/gstshmsink.c
sys/shm/gstshmsink.h
sys/shm/gstshmsrc.c
sys/shm/shmpipe.c
sys/shm/shmpipe.h

index 7529016..bf64200 100644 (file)
@@ -95,11 +95,251 @@ static GstFlowReturn gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf);
 static gboolean gst_shm_sink_event (GstBaseSink * bsink, GstEvent * event);
 static gboolean gst_shm_sink_unlock (GstBaseSink * bsink);
 static gboolean gst_shm_sink_unlock_stop (GstBaseSink * bsink);
+static gboolean gst_shm_sink_propose_allocation (GstBaseSink * sink,
+    GstQuery * query);
 
 static gpointer pollthread_func (gpointer data);
 
 static guint signals[LAST_SIGNAL] = { 0 };
 
+
+
+/********************
+ * CUSTOM ALLOCATOR *
+ ********************/
+
+#define GST_TYPE_SHM_SINK_ALLOCATOR \
+  (gst_shm_sink_allocator_get_type())
+#define GST_SHM_SINK_ALLOCATOR(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SHM_SINK_ALLOCATOR, \
+      GstShmSinkAllocator))
+#define GST_SHM_SINK_ALLOCATOR_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SHM_SINK_ALLOCATOR, \
+      GstShmSinkAllocatorClass))
+#define GST_IS_SHM_SINK_ALLOCATOR(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SHM_SINK_ALLOCATOR))
+#define GST_IS_SHM_SINK_ALLOCATOR_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SHM_SINK_ALLOCATOR))
+
+struct _GstShmSinkAllocator
+{
+  GstAllocator parent;
+
+  GstShmSink *sink;
+};
+
+typedef struct _GstShmSinkAllocatorClass
+{
+  GstAllocatorClass parent;
+} GstShmSinkAllocatorClass;
+
+typedef struct _GstShmSinkMemory
+{
+  GstMemory mem;
+
+  gchar *data;
+  GstShmSink *sink;
+  ShmBlock *block;
+} GstShmSinkMemory;
+
+GType gst_shm_sink_allocator_get_type (void);
+
+G_DEFINE_TYPE (GstShmSinkAllocator, gst_shm_sink_allocator, GST_TYPE_ALLOCATOR);
+
+static void
+gst_shm_sink_allocator_dispose (GObject * object)
+{
+  GstShmSinkAllocator *self = GST_SHM_SINK_ALLOCATOR (object);
+
+  if (self->sink)
+    gst_object_unref (self->sink);
+  self->sink = NULL;
+
+  G_OBJECT_CLASS (gst_shm_sink_allocator_parent_class)->dispose (object);
+}
+
+static void
+gst_shm_sink_allocator_free (GstAllocator * allocator, GstMemory * mem)
+{
+  GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem;
+
+  if (mymem->block) {
+    GST_OBJECT_LOCK (mymem->sink);
+    sp_writer_free_block (mymem->block);
+    GST_OBJECT_UNLOCK (mymem->sink);
+    gst_object_unref (mymem->sink);
+  }
+  gst_object_unref (mem->allocator);
+
+  g_slice_free (GstShmSinkMemory, mymem);
+}
+
+static gpointer
+gst_shm_sink_allocator_mem_map (GstMemory * mem, gsize maxsize,
+    GstMapFlags flags)
+{
+  GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem;
+
+  return mymem->data;
+}
+
+static void
+gst_shm_sink_allocator_mem_unmap (GstMemory * mem)
+{
+}
+
+static GstMemory *
+gst_shm_sink_allocator_mem_share (GstMemory * mem, gssize offset, gssize size)
+{
+  GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem;
+  GstShmSinkMemory *mysub;
+  GstMemory *parent;
+
+  /* find the real parent */
+  if ((parent = mem->parent) == NULL)
+    parent = mem;
+
+  if (size == -1)
+    size = mem->size - offset;
+
+  mysub = g_slice_new0 (GstShmSinkMemory);
+  /* the shared memory is always readonly */
+  gst_memory_init (GST_MEMORY_CAST (mysub), GST_MINI_OBJECT_FLAGS (parent) |
+      GST_MINI_OBJECT_FLAG_LOCK_READONLY, gst_object_ref (mem->allocator),
+      parent, mem->maxsize, mem->align, mem->offset + offset, size);
+  mysub->data = mymem->data;
+
+  return (GstMemory *) mysub;
+}
+
+static gboolean
+gst_shm_sink_allocator_mem_is_span (GstMemory * mem1, GstMemory * mem2,
+    gsize * offset)
+{
+  GstShmSinkMemory *mymem1 = (GstShmSinkMemory *) mem1;
+  GstShmSinkMemory *mymem2 = (GstShmSinkMemory *) mem2;
+
+  if (offset) {
+    GstMemory *parent;
+
+    parent = mem1->parent;
+
+    *offset = mem1->offset - parent->offset;
+  }
+
+  /* and memory is contiguous */
+  return mymem1->data + mem1->offset + mem1->size ==
+      mymem2->data + mem2->offset;
+}
+
+static void
+gst_shm_sink_allocator_init (GstShmSinkAllocator * self)
+{
+  GstAllocator *allocator = GST_ALLOCATOR (self);
+
+  allocator->mem_map = gst_shm_sink_allocator_mem_map;
+  allocator->mem_unmap = gst_shm_sink_allocator_mem_unmap;
+  allocator->mem_share = gst_shm_sink_allocator_mem_share;
+  allocator->mem_is_span = gst_shm_sink_allocator_mem_is_span;
+}
+
+
+static GstMemory *
+gst_shm_sink_allocator_alloc_locked (GstShmSinkAllocator * self, gsize size,
+    GstAllocationParams * params)
+{
+  GstMemory *memory = NULL;
+  ShmBlock *block = NULL;
+  gsize maxsize = size + params->prefix + params->padding;
+  gsize align = params->align;
+
+  /* ensure configured alignment */
+  align |= gst_memory_alignment;
+  /* allocate more to compensate for alignment */
+  maxsize += align;
+
+  block = sp_writer_alloc_block (self->sink->pipe, size);
+  if (block) {
+    GstShmSinkMemory *mymem;
+    gsize aoffset, padding;
+
+    GST_LOG_OBJECT (self, "Allocated block %p with %u bytes at %p",
+        block, size, sp_writer_block_get_buf (block));
+
+    mymem = g_slice_new0 (GstShmSinkMemory);
+    memory = GST_MEMORY_CAST (mymem);
+    mymem->data = sp_writer_block_get_buf (block);
+    mymem->sink = gst_object_ref (self->sink);
+    mymem->block = block;
+
+    /* do alignment */
+    if ((aoffset = ((guintptr) mymem->data & align))) {
+      aoffset = (align + 1) - aoffset;
+      mymem->data += aoffset;
+      maxsize -= aoffset;
+    }
+
+    if (params->prefix && (params->flags & GST_MEMORY_FLAG_ZERO_PREFIXED))
+      memset (mymem->data, 0, params->prefix);
+
+    padding = maxsize - (params->prefix + size);
+    if (padding && (params->flags & GST_MEMORY_FLAG_ZERO_PADDED))
+      memset (mymem->data + params->prefix + size, 0, padding);
+
+    gst_memory_init (memory, params->flags, g_object_ref (self), NULL,
+        maxsize, align, params->prefix, size);
+  }
+
+  return memory;
+}
+
+static GstMemory *
+gst_shm_sink_allocator_alloc (GstAllocator * allocator, gsize size,
+    GstAllocationParams * params)
+{
+  GstShmSinkAllocator *self = GST_SHM_SINK_ALLOCATOR (allocator);
+  GstMemory *memory = NULL;
+
+  GST_OBJECT_LOCK (self->sink);
+  memory = gst_shm_sink_allocator_alloc_locked (self, size, params);
+  GST_OBJECT_UNLOCK (self->sink);
+
+  if (!memory) {
+    memory = gst_allocator_alloc (NULL, size, params);
+    GST_LOG_OBJECT (self, "Not enough shared memory for GstMemory of %u bytes, "
+        "allocating using standard allocator", size);
+  }
+
+  return memory;
+}
+
+
+static void
+gst_shm_sink_allocator_class_init (GstShmSinkAllocatorClass * klass)
+{
+  GstAllocatorClass *allocator_class = GST_ALLOCATOR_CLASS (klass);
+  GObjectClass *object_class = G_OBJECT_CLASS (klass);
+
+  allocator_class->alloc = gst_shm_sink_allocator_alloc;
+  allocator_class->free = gst_shm_sink_allocator_free;
+  object_class->dispose = gst_shm_sink_allocator_dispose;
+}
+
+static GstShmSinkAllocator *
+gst_shm_sink_allocator_new (GstShmSink * sink)
+{
+  GstShmSinkAllocator *self = g_object_new (GST_TYPE_SHM_SINK_ALLOCATOR, NULL);
+
+  self->sink = gst_object_ref (sink);
+
+  return self;
+}
+
+
+/***************
+ * MAIN OBJECT *
+ ***************/
+
 static void
 gst_shm_sink_init (GstShmSink * self)
 {
@@ -107,6 +347,8 @@ gst_shm_sink_init (GstShmSink * self)
   self->size = DEFAULT_SIZE;
   self->wait_for_connection = DEFAULT_WAIT_FOR_CONNECTION;
   self->perms = DEFAULT_PERMS;
+
+  gst_allocation_params_init (&self->params);
 }
 
 static void
@@ -130,6 +372,8 @@ gst_shm_sink_class_init (GstShmSinkClass * klass)
   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_shm_sink_event);
   gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock);
   gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock_stop);
+  gstbasesink_class->propose_allocation =
+      GST_DEBUG_FUNCPTR (gst_shm_sink_propose_allocation);
 
   g_object_class_install_property (gobject_class, PROP_SOCKET_PATH,
       g_param_spec_string ("socket-path",
@@ -225,12 +469,18 @@ gst_shm_sink_set_property (GObject * object, guint prop_id,
     case PROP_SHM_SIZE:
       GST_OBJECT_LOCK (object);
       if (self->pipe) {
-        if (sp_writer_resize (self->pipe, g_value_get_uint (value)) < 0)
+        if (sp_writer_resize (self->pipe, g_value_get_uint (value)) < 0) {
+          /* Swap allocators, so we can know immediately if the memory is
+           * ours */
+          gst_object_unref (self->allocator);
+          self->allocator = gst_shm_sink_allocator_new (self);
+
           GST_DEBUG_OBJECT (self, "Resized shared memory area from %u to "
               "%u bytes", self->size, g_value_get_uint (value));
-        else
+        } else {
           GST_WARNING_OBJECT (self, "Could not resize shared memory area from"
               "%u to %u bytes", self->size, g_value_get_uint (value));
+        }
       }
       self->size = g_value_get_uint (value);
       GST_OBJECT_UNLOCK (object);
@@ -329,11 +579,13 @@ gst_shm_sink_start (GstBaseSink * bsink)
   if (!self->pollthread)
     goto thread_error;
 
+  self->allocator = gst_shm_sink_allocator_new (self);
+
   return TRUE;
 
 thread_error:
 
-  sp_close (self->pipe);
+  sp_writer_close (self->pipe, NULL, NULL);
   self->pipe = NULL;
   gst_poll_free (self->poll);
 
@@ -352,6 +604,10 @@ gst_shm_sink_stop (GstBaseSink * bsink)
   self->stop = TRUE;
   gst_poll_set_flushing (self->poll, TRUE);
 
+  if (self->allocator)
+    gst_object_unref (self->allocator);
+  self->allocator = NULL;
+
   g_thread_join (self->pollthread);
   self->pollthread = NULL;
 
@@ -360,7 +616,8 @@ gst_shm_sink_stop (GstBaseSink * bsink)
   while (self->clients) {
     struct GstShmClient *client = self->clients->data;
     self->clients = g_list_remove (self->clients, client);
-    sp_writer_close_client (self->pipe, client->client);
+    sp_writer_close_client (self->pipe, client->client,
+        (sp_buffer_free_callback) gst_buffer_unref, NULL);
     g_signal_emit (self, signals[SIGNAL_CLIENT_DISCONNECTED], 0,
         client->pollfd.fd);
     g_slice_free (struct GstShmClient, client);
@@ -369,7 +626,7 @@ gst_shm_sink_stop (GstBaseSink * bsink)
   gst_poll_free (self->poll);
   self->poll = NULL;
 
-  sp_close (self->pipe);
+  sp_writer_close (self->pipe, NULL, NULL);
   self->pipe = NULL;
 
   return TRUE;
@@ -385,8 +642,8 @@ gst_shm_sink_can_render (GstShmSink * self, GstClockTime time)
 
   b = sp_writer_get_pending_buffers (self->pipe);
   for (; b != NULL; b = sp_writer_get_next_buffer (b)) {
-    GstClockTime t = sp_writer_buf_get_tag (b);
-    if (GST_CLOCK_DIFF (time, t) > self->buffer_time)
+    GstBuffer *buf = sp_writer_buf_get_tag (b);
+    if (GST_CLOCK_DIFF (time, GST_BUFFER_PTS (buf)) > self->buffer_time)
       return FALSE;
   }
 
@@ -397,38 +654,44 @@ static GstFlowReturn
 gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf)
 {
   GstShmSink *self = GST_SHM_SINK (bsink);
-  int rv;
+  int rv = 0;
   GstMapInfo map;
+  gboolean need_new_memory = FALSE;
+  GstFlowReturn ret = GST_FLOW_OK;
+  GstMemory *memory = NULL;
+  GstBuffer *sendbuf = NULL;
 
   GST_OBJECT_LOCK (self);
   while (self->wait_for_connection && !self->clients) {
     g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
-    if (self->unlock) {
-      GST_OBJECT_UNLOCK (self);
-      return GST_FLOW_FLUSHING;
-    }
+    if (self->unlock)
+      goto flushing;
   }
 
   while (!gst_shm_sink_can_render (self, GST_BUFFER_TIMESTAMP (buf))) {
     g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
-    if (self->unlock) {
-      GST_OBJECT_UNLOCK (self);
-      return GST_FLOW_FLUSHING;
-    }
+    if (self->unlock)
+      goto flushing;
   }
 
-  gst_buffer_map (buf, &map, GST_MAP_READ);
-  rv = sp_writer_send_buf (self->pipe, (char *) map.data, map.size,
-      GST_BUFFER_TIMESTAMP (buf));
-  gst_buffer_unmap (buf, &map);
 
-  if (rv == -1) {
-    ShmBlock *block = NULL;
-    gchar *shmbuf = NULL;
+  if (gst_buffer_n_memory (buf) > 1) {
+    GST_LOG_OBJECT (self, "Buffer %p has %d GstMemory, we only support a single"
+        " one, need to do a memcpy", buf, gst_buffer_n_memory (buf));
+    need_new_memory = TRUE;
+  } else {
+    memory = gst_buffer_peek_memory (buf, 0);
 
+    if (memory->allocator != GST_ALLOCATOR (self->allocator)) {
+      need_new_memory = TRUE;
+      GST_LOG_OBJECT (self, "Memory in buffer %p was not allocated by "
+          "%" GST_PTR_FORMAT ", will memcpy", buf, memory->allocator);
+    }
+  }
+
+  if (need_new_memory) {
     if (gst_buffer_get_size (buf) > sp_writer_get_max_buf_size (self->pipe)) {
       gsize area_size = sp_writer_get_max_buf_size (self->pipe);
-
       GST_OBJECT_UNLOCK (self);
       GST_ELEMENT_ERROR (self, RESOURCE, NO_SPACE_LEFT,
           ("Shared memory area is too small"),
@@ -438,95 +701,70 @@ gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf)
       return GST_FLOW_ERROR;
     }
 
-    while ((block = sp_writer_alloc_block (self->pipe,
-                gst_buffer_get_size (buf))) == NULL) {
+    while ((memory =
+            gst_shm_sink_allocator_alloc_locked (self->allocator,
+                gst_buffer_get_size (buf), &self->params)) == NULL) {
       g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
-      if (self->unlock) {
-        GST_OBJECT_UNLOCK (self);
-        return GST_FLOW_FLUSHING;
-      }
+      if (self->unlock)
+        goto flushing;
     }
+
     while (self->wait_for_connection && !self->clients) {
       g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
       if (self->unlock) {
-        sp_writer_free_block (block);
+        gst_memory_unref (memory);
         GST_OBJECT_UNLOCK (self);
         return GST_FLOW_FLUSHING;
       }
     }
 
-    shmbuf = sp_writer_block_get_buf (block);
-    gst_buffer_extract (buf, 0, shmbuf, gst_buffer_get_size (buf));
-    sp_writer_send_buf (self->pipe, shmbuf, gst_buffer_get_size (buf),
-        GST_BUFFER_TIMESTAMP (buf));
-    sp_writer_free_block (block);
+    gst_memory_map (memory, &map, GST_MAP_WRITE);
+    gst_buffer_extract (buf, 0, map.data, map.size);
+    gst_memory_unmap (memory, &map);
+
+    sendbuf = gst_buffer_new ();
+    gst_buffer_copy_into (sendbuf, buf, GST_BUFFER_COPY_METADATA, 0, -1);
+    gst_buffer_append_memory (sendbuf, memory);
+  } else {
+    sendbuf = gst_buffer_ref (buf);
   }
 
-  GST_OBJECT_UNLOCK (self);
+  gst_buffer_map (sendbuf, &map, GST_MAP_READ);
+  /* Make the memory readonly as of now as we've sent it to the other side
+   * We know it's not mapped for writing anywhere as we just mapped it for
+   * reading
+   */
 
-  return GST_FLOW_OK;
-}
+  rv = sp_writer_send_buf (self->pipe, (char *) map.data, map.size, sendbuf);
 
-#if 0
+  gst_buffer_unmap (sendbuf, &map);
 
-/* FIXME 0.11 implement some bufferpool support */
+  GST_OBJECT_UNLOCK (self);
 
-static void
-gst_shm_sink_free_buffer (gpointer data)
-{
-  ShmPipe *pipe;
-  ShmBlock *block = data;
-  GstShmSink *self;
+  if (rv == -1) {
+    GST_ELEMENT_ERROR (self, STREAM, FAILED, ("Invalid allocated buffer"),
+        ("The shmpipe library rejects our buffer, this is a bug"));
+    ret = GST_FLOW_ERROR;
+  }
 
-  pipe = sp_writer_block_get_pipe (block);
-  self = sp_get_data (pipe);
+  /* If we allocated our own memory, then unmap it */
 
-  GST_OBJECT_LOCK (self);
-  sp_writer_free_block (block);
+  return ret;
+
+flushing:
   GST_OBJECT_UNLOCK (self);
-  g_object_unref (self);
+  return GST_FLOW_FLUSHING;
 }
 
-static GstFlowReturn
-gst_shm_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
-    GstCaps * caps, GstBuffer ** out_buf)
+static void
+free_buffer_locked (GstBuffer * buffer, void *data)
 {
-  GstShmSink *self = GST_SHM_SINK (sink);
-  GstBuffer *buffer;
-  ShmBlock *block = NULL;
-  gpointer buf = NULL;
-
-  GST_OBJECT_LOCK (self);
-  block = sp_writer_alloc_block (self->pipe, size);
-  if (block) {
-    buf = sp_writer_block_get_buf (block);
-    g_object_ref (self);
-  }
-  GST_OBJECT_UNLOCK (self);
-
-  if (block) {
-    buffer = gst_buffer_new ();
-    GST_BUFFER_DATA (buffer) = buf;
-    GST_BUFFER_MALLOCDATA (buffer) = (guint8 *) block;
-    GST_BUFFER_FREE_FUNC (buffer) =
-        GST_DEBUG_FUNCPTR (gst_shm_sink_free_buffer);
-    GST_BUFFER_SIZE (buffer) = size;
-    GST_LOG_OBJECT (self,
-        "Allocated buffer of %u bytes from shared memory at %p", size, buf);
-  } else {
-    buffer = gst_buffer_new_and_alloc (size);
-    GST_LOG_OBJECT (self, "Not enough shared memory for buffer of %u bytes, "
-        "allocating using standard allocator", size);
-  }
-
-  GST_BUFFER_OFFSET (buffer) = offset;
-  gst_buffer_set_caps (buffer, caps);
+  GSList **list = data;
 
-  *out_buf = buffer;
+  g_assert (buffer != NULL);
 
-  return GST_FLOW_OK;
+  *list = g_slist_prepend (*list, buffer);
 }
-#endif
 
 static gpointer
 pollthread_func (gpointer data)
@@ -605,9 +843,10 @@ pollthread_func (gpointer data)
 
       if (gst_poll_fd_can_read (self->poll, &gclient->pollfd)) {
         int rv;
+        gpointer tag = NULL;
 
         GST_OBJECT_LOCK (self);
-        rv = sp_writer_recv (self->pipe, gclient->client);
+        rv = sp_writer_recv (self->pipe, gclient->client, &tag);
         GST_OBJECT_UNLOCK (self);
 
         if (rv < 0) {
@@ -615,12 +854,22 @@ pollthread_func (gpointer data)
               " closing (retval: %d errno: %d)", rv, errno);
           goto close_client;
         }
+
+        g_assert (rv == 0 || tag == NULL);
+
+        if (rv == 0)
+          gst_buffer_unref (tag);
       }
       continue;
     close_client:
-      GST_OBJECT_LOCK (self);
-      sp_writer_close_client (self->pipe, gclient->client);
-      GST_OBJECT_UNLOCK (self);
+      {
+        GSList *list = NULL;
+        GST_OBJECT_LOCK (self);
+        sp_writer_close_client (self->pipe, gclient->client,
+            (sp_buffer_free_callback) free_buffer_locked, (void **) &list);
+        GST_OBJECT_UNLOCK (self);
+        g_slist_free_full (list, (GDestroyNotify) gst_buffer_unref);
+      }
 
       gst_poll_remove_fd (self->poll, &gclient->pollfd);
       self->clients = g_list_remove (self->clients, gclient);
@@ -683,3 +932,15 @@ gst_shm_sink_unlock_stop (GstBaseSink * bsink)
 
   return TRUE;
 }
+
+static gboolean
+gst_shm_sink_propose_allocation (GstBaseSink * sink, GstQuery * query)
+{
+  GstShmSink *self = GST_SHM_SINK (sink);
+
+  if (self->allocator)
+    gst_query_add_allocation_param (query, GST_ALLOCATOR (self->allocator),
+        NULL);
+
+  return TRUE;
+}
index ca1ee12..a514096 100644 (file)
@@ -40,6 +40,7 @@ G_BEGIN_DECLS
   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SHM_SINK))
 typedef struct _GstShmSink GstShmSink;
 typedef struct _GstShmSinkClass GstShmSinkClass;
+typedef struct _GstShmSinkAllocator GstShmSinkAllocator;
 
 struct _GstShmSink
 {
@@ -64,6 +65,10 @@ struct _GstShmSink
   GstClockTimeDiff buffer_time;
 
   GCond cond;
+
+  GstShmSinkAllocator *allocator;
+
+  GstAllocationParams params;
 };
 
 struct _GstShmSinkClass
index 198a0b9..d8ee560 100644 (file)
@@ -357,10 +357,8 @@ gst_shm_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
   gsb->pipe = self->pipe;
   gst_shm_pipe_inc (self->pipe);
 
-  *outbuf = gst_buffer_new ();
-  gst_buffer_append_memory (*outbuf,
-      gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
-          buf, rv, 0, rv, gsb, free_buffer));
+  *outbuf = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY,
+      buf, rv, 0, rv, gsb, free_buffer);
 
   return GST_FLOW_OK;
 }
@@ -445,7 +443,7 @@ gst_shm_pipe_dec (GstShmPipe * pipe)
   }
 
   if (pipe->pipe)
-    sp_close (pipe->pipe);
+    sp_client_close (pipe->pipe);
   GST_OBJECT_UNLOCK (pipe->src);
 
   gst_object_unref (pipe->src);
index 6c7b80b..677113b 100644 (file)
@@ -118,7 +118,7 @@ struct _ShmBuffer
   int num_clients;
   int clients[0];
 
-  uint64_t tag;
+  void *tag;
 };
 
 
@@ -183,14 +183,14 @@ struct CommandBuffer
 static ShmArea *sp_open_shm (char *path, int id, mode_t perms, size_t size);
 static void sp_close_shm (ShmArea * area);
 static int sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf,
-    ShmBuffer * prev_buf, ShmClient * client);
+    ShmBuffer * prev_buf, ShmClient * client, void **tag);
 static void sp_shm_area_dec (ShmPipe * self, ShmArea * area);
 
 
 
 #define RETURN_ERROR(format, ...) do {                  \
   fprintf (stderr, format, __VA_ARGS__);                \
-  sp_close (self);                                      \
+  sp_writer_close (self, NULL, NULL);                   \
   return NULL;                                          \
   } while (0)
 
@@ -424,7 +424,8 @@ sp_dec (ShmPipe * self)
 }
 
 void
-sp_close (ShmPipe * self)
+sp_writer_close (ShmPipe * self, sp_buffer_free_callback callback,
+    void *user_data)
 {
   if (self->main_socket >= 0)
     close (self->main_socket);
@@ -435,11 +436,18 @@ sp_close (ShmPipe * self)
   }
 
   while (self->clients)
-    sp_writer_close_client (self, self->clients);
+    sp_writer_close_client (self, self->clients, callback, user_data);
 
   sp_dec (self);
 }
 
+void
+sp_client_close (ShmPipe * self)
+{
+  sp_writer_close (self, NULL, NULL);
+}
+
+
 int
 sp_writer_setperms_shm (ShmPipe * self, mode_t perms)
 {
@@ -560,7 +568,7 @@ sp_writer_free_block (ShmBlock * block)
 /* Returns the number of client this has successfully been sent to */
 
 int
-sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, uint64_t tag)
+sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag)
 {
   ShmArea *area = NULL;
   unsigned long offset = 0;
@@ -699,7 +707,7 @@ sp_client_recv (ShmPipe * self, char **buf)
 }
 
 int
-sp_writer_recv (ShmPipe * self, ShmClient * client)
+sp_writer_recv (ShmPipe * self, ShmClient * client, void **tag)
 {
   ShmBuffer *buf = NULL, *prev_buf = NULL;
   struct CommandBuffer cb;
@@ -713,7 +721,7 @@ sp_writer_recv (ShmPipe * self, ShmClient * client)
       for (buf = self->buffers; buf; buf = buf->next) {
         if (buf->shm_area->id == cb.area_id &&
             buf->offset == cb.payload.ack_buffer.offset) {
-          sp_shmbuf_dec (self, buf, prev_buf, client);
+          return sp_shmbuf_dec (self, buf, prev_buf, client, tag);
           break;
         }
         prev_buf = buf;
@@ -786,7 +794,7 @@ sp_client_open (const char *path)
   return self;
 
 error:
-  sp_close (self);
+  sp_client_close (self);
   return NULL;
 }
 
@@ -837,7 +845,7 @@ error:
 
 static int
 sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf,
-    ShmClient * client)
+    ShmClient * client, void **tag)
 {
   int i;
   int had_client = 0;
@@ -866,6 +874,8 @@ sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf,
     else
       self->buffers = buf->next;
 
+    if (tag)
+      *tag = buf->tag;
     shm_alloc_space_block_dec (buf->ablock);
     sp_shm_area_dec (self, buf->shm_area);
     spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * buf->num_clients, buf);
@@ -875,7 +885,8 @@ sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf,
 }
 
 void
-sp_writer_close_client (ShmPipe * self, ShmClient * client)
+sp_writer_close_client (ShmPipe * self, ShmClient * client,
+    sp_buffer_free_callback callback, void *user_data)
 {
   ShmBuffer *buffer = NULL, *prev_buf = NULL;
   ShmClient *item = NULL, *prev_item = NULL;
@@ -885,11 +896,15 @@ sp_writer_close_client (ShmPipe * self, ShmClient * client)
 again:
   for (buffer = self->buffers; buffer; buffer = buffer->next) {
     int i;
+    void *tag = NULL;
 
     for (i = 0; i < buffer->num_clients; i++) {
       if (buffer->clients[i] == client->fd) {
-        if (!sp_shmbuf_dec (self, buffer, prev_buf, client))
+        if (!sp_shmbuf_dec (self, buffer, prev_buf, client, &tag)) {
+          if (callback)
+            callback (tag, user_data);
           goto again;
+        }
         break;
       }
     }
@@ -949,7 +964,7 @@ sp_writer_get_next_buffer (ShmBuffer * buffer)
   return buffer->next;
 }
 
-uint64_t
+void *
 sp_writer_buf_get_tag (ShmBuffer * buffer)
 {
   return buffer->tag;
index ecac432..6d75629 100644 (file)
@@ -78,9 +78,12 @@ typedef struct _ShmPipe ShmPipe;
 typedef struct _ShmBlock ShmBlock;
 typedef struct _ShmBuffer ShmBuffer;
 
+typedef void (*sp_buffer_free_callback) (void * tag, void * user_data);
+
 ShmPipe *sp_writer_create (const char *path, size_t size, mode_t perms);
 const char *sp_writer_get_path (ShmPipe *pipe);
-void sp_close (ShmPipe * self);
+void sp_writer_close (ShmPipe * self, sp_buffer_free_callback callback,
+    void * user_data);
 void *sp_get_data (ShmPipe * self);
 void sp_set_data (ShmPipe * self, void *data);
 
@@ -92,24 +95,26 @@ int sp_writer_get_client_fd (ShmClient * client);
 
 ShmBlock *sp_writer_alloc_block (ShmPipe * self, size_t size);
 void sp_writer_free_block (ShmBlock *block);
-int sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, uint64_t tag);
+int sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void * tag);
 char *sp_writer_block_get_buf (ShmBlock *block);
 ShmPipe *sp_writer_block_get_pipe (ShmBlock *block);
 size_t sp_writer_get_max_buf_size (ShmPipe * self);
 
 ShmClient * sp_writer_accept_client (ShmPipe * self);
-void sp_writer_close_client (ShmPipe *self, ShmClient * client);
-int sp_writer_recv (ShmPipe * self, ShmClient * client);
+void sp_writer_close_client (ShmPipe *self, ShmClient * client,
+    sp_buffer_free_callback callback, void * user_data);
+int sp_writer_recv (ShmPipe * self, ShmClient * client, void ** tag);
 
 int sp_writer_pending_writes (ShmPipe * self);
 
+ShmBuffer *sp_writer_get_pending_buffers (ShmPipe * self);
+ShmBuffer *sp_writer_get_next_buffer (ShmBuffer * buffer);
+void *sp_writer_buf_get_tag (ShmBuffer * buffer);
+
 ShmPipe *sp_client_open (const char *path);
 long int sp_client_recv (ShmPipe * self, char **buf);
 int sp_client_recv_finish (ShmPipe * self, char *buf);
-
-ShmBuffer *sp_writer_get_pending_buffers (ShmPipe * self);
-ShmBuffer *sp_writer_get_next_buffer (ShmBuffer * buffer);
-uint64_t sp_writer_buf_get_tag (ShmBuffer * buffer);
+void sp_client_close (ShmPipe * self);
 
 #ifdef __cplusplus
 }