From 48b9fa2c24ba3bde9cfc04144accca2d95244594 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Olivier=20Cr=C3=AAte?= Date: Wed, 26 Sep 2012 16:00:39 -0400 Subject: [PATCH] shmsink: Add custom allocator to allow for zero-copy shared memory use --- sys/shm/gstshmsink.c | 445 ++++++++++++++++++++++++++++++++++++++++----------- sys/shm/gstshmsink.h | 5 + sys/shm/gstshmsrc.c | 8 +- sys/shm/shmpipe.c | 41 +++-- sys/shm/shmpipe.h | 21 ++- 5 files changed, 402 insertions(+), 118 deletions(-) diff --git a/sys/shm/gstshmsink.c b/sys/shm/gstshmsink.c index 7529016..bf64200 100644 --- a/sys/shm/gstshmsink.c +++ b/sys/shm/gstshmsink.c @@ -95,11 +95,251 @@ static GstFlowReturn gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf); static gboolean gst_shm_sink_event (GstBaseSink * bsink, GstEvent * event); static gboolean gst_shm_sink_unlock (GstBaseSink * bsink); static gboolean gst_shm_sink_unlock_stop (GstBaseSink * bsink); +static gboolean gst_shm_sink_propose_allocation (GstBaseSink * sink, + GstQuery * query); static gpointer pollthread_func (gpointer data); static guint signals[LAST_SIGNAL] = { 0 }; + + +/******************** + * CUSTOM ALLOCATOR * + ********************/ + +#define GST_TYPE_SHM_SINK_ALLOCATOR \ + (gst_shm_sink_allocator_get_type()) +#define GST_SHM_SINK_ALLOCATOR(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SHM_SINK_ALLOCATOR, \ + GstShmSinkAllocator)) +#define GST_SHM_SINK_ALLOCATOR_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SHM_SINK_ALLOCATOR, \ + GstShmSinkAllocatorClass)) +#define GST_IS_SHM_SINK_ALLOCATOR(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SHM_SINK_ALLOCATOR)) +#define GST_IS_SHM_SINK_ALLOCATOR_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SHM_SINK_ALLOCATOR)) + +struct _GstShmSinkAllocator +{ + GstAllocator parent; + + GstShmSink *sink; +}; + +typedef struct _GstShmSinkAllocatorClass +{ + GstAllocatorClass parent; +} GstShmSinkAllocatorClass; + +typedef struct _GstShmSinkMemory +{ + GstMemory mem; + + gchar *data; + GstShmSink *sink; + ShmBlock *block; +} GstShmSinkMemory; + +GType gst_shm_sink_allocator_get_type (void); + +G_DEFINE_TYPE (GstShmSinkAllocator, gst_shm_sink_allocator, GST_TYPE_ALLOCATOR); + +static void +gst_shm_sink_allocator_dispose (GObject * object) +{ + GstShmSinkAllocator *self = GST_SHM_SINK_ALLOCATOR (object); + + if (self->sink) + gst_object_unref (self->sink); + self->sink = NULL; + + G_OBJECT_CLASS (gst_shm_sink_allocator_parent_class)->dispose (object); +} + +static void +gst_shm_sink_allocator_free (GstAllocator * allocator, GstMemory * mem) +{ + GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem; + + if (mymem->block) { + GST_OBJECT_LOCK (mymem->sink); + sp_writer_free_block (mymem->block); + GST_OBJECT_UNLOCK (mymem->sink); + gst_object_unref (mymem->sink); + } + gst_object_unref (mem->allocator); + + g_slice_free (GstShmSinkMemory, mymem); +} + +static gpointer +gst_shm_sink_allocator_mem_map (GstMemory * mem, gsize maxsize, + GstMapFlags flags) +{ + GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem; + + return mymem->data; +} + +static void +gst_shm_sink_allocator_mem_unmap (GstMemory * mem) +{ +} + +static GstMemory * +gst_shm_sink_allocator_mem_share (GstMemory * mem, gssize offset, gssize size) +{ + GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem; + GstShmSinkMemory *mysub; + GstMemory *parent; + + /* find the real parent */ + if ((parent = mem->parent) == NULL) + parent = mem; + + if (size == -1) + size = mem->size - offset; + + mysub = g_slice_new0 (GstShmSinkMemory); + /* the shared memory is always readonly */ + gst_memory_init (GST_MEMORY_CAST (mysub), GST_MINI_OBJECT_FLAGS (parent) | + GST_MINI_OBJECT_FLAG_LOCK_READONLY, gst_object_ref (mem->allocator), + parent, mem->maxsize, mem->align, mem->offset + offset, size); + mysub->data = mymem->data; + + return (GstMemory *) mysub; +} + +static gboolean +gst_shm_sink_allocator_mem_is_span (GstMemory * mem1, GstMemory * mem2, + gsize * offset) +{ + GstShmSinkMemory *mymem1 = (GstShmSinkMemory *) mem1; + GstShmSinkMemory *mymem2 = (GstShmSinkMemory *) mem2; + + if (offset) { + GstMemory *parent; + + parent = mem1->parent; + + *offset = mem1->offset - parent->offset; + } + + /* and memory is contiguous */ + return mymem1->data + mem1->offset + mem1->size == + mymem2->data + mem2->offset; +} + +static void +gst_shm_sink_allocator_init (GstShmSinkAllocator * self) +{ + GstAllocator *allocator = GST_ALLOCATOR (self); + + allocator->mem_map = gst_shm_sink_allocator_mem_map; + allocator->mem_unmap = gst_shm_sink_allocator_mem_unmap; + allocator->mem_share = gst_shm_sink_allocator_mem_share; + allocator->mem_is_span = gst_shm_sink_allocator_mem_is_span; +} + + +static GstMemory * +gst_shm_sink_allocator_alloc_locked (GstShmSinkAllocator * self, gsize size, + GstAllocationParams * params) +{ + GstMemory *memory = NULL; + ShmBlock *block = NULL; + gsize maxsize = size + params->prefix + params->padding; + gsize align = params->align; + + /* ensure configured alignment */ + align |= gst_memory_alignment; + /* allocate more to compensate for alignment */ + maxsize += align; + + block = sp_writer_alloc_block (self->sink->pipe, size); + if (block) { + GstShmSinkMemory *mymem; + gsize aoffset, padding; + + GST_LOG_OBJECT (self, "Allocated block %p with %u bytes at %p", + block, size, sp_writer_block_get_buf (block)); + + mymem = g_slice_new0 (GstShmSinkMemory); + memory = GST_MEMORY_CAST (mymem); + mymem->data = sp_writer_block_get_buf (block); + mymem->sink = gst_object_ref (self->sink); + mymem->block = block; + + /* do alignment */ + if ((aoffset = ((guintptr) mymem->data & align))) { + aoffset = (align + 1) - aoffset; + mymem->data += aoffset; + maxsize -= aoffset; + } + + if (params->prefix && (params->flags & GST_MEMORY_FLAG_ZERO_PREFIXED)) + memset (mymem->data, 0, params->prefix); + + padding = maxsize - (params->prefix + size); + if (padding && (params->flags & GST_MEMORY_FLAG_ZERO_PADDED)) + memset (mymem->data + params->prefix + size, 0, padding); + + gst_memory_init (memory, params->flags, g_object_ref (self), NULL, + maxsize, align, params->prefix, size); + } + + return memory; +} + +static GstMemory * +gst_shm_sink_allocator_alloc (GstAllocator * allocator, gsize size, + GstAllocationParams * params) +{ + GstShmSinkAllocator *self = GST_SHM_SINK_ALLOCATOR (allocator); + GstMemory *memory = NULL; + + GST_OBJECT_LOCK (self->sink); + memory = gst_shm_sink_allocator_alloc_locked (self, size, params); + GST_OBJECT_UNLOCK (self->sink); + + if (!memory) { + memory = gst_allocator_alloc (NULL, size, params); + GST_LOG_OBJECT (self, "Not enough shared memory for GstMemory of %u bytes, " + "allocating using standard allocator", size); + } + + return memory; +} + + +static void +gst_shm_sink_allocator_class_init (GstShmSinkAllocatorClass * klass) +{ + GstAllocatorClass *allocator_class = GST_ALLOCATOR_CLASS (klass); + GObjectClass *object_class = G_OBJECT_CLASS (klass); + + allocator_class->alloc = gst_shm_sink_allocator_alloc; + allocator_class->free = gst_shm_sink_allocator_free; + object_class->dispose = gst_shm_sink_allocator_dispose; +} + +static GstShmSinkAllocator * +gst_shm_sink_allocator_new (GstShmSink * sink) +{ + GstShmSinkAllocator *self = g_object_new (GST_TYPE_SHM_SINK_ALLOCATOR, NULL); + + self->sink = gst_object_ref (sink); + + return self; +} + + +/*************** + * MAIN OBJECT * + ***************/ + static void gst_shm_sink_init (GstShmSink * self) { @@ -107,6 +347,8 @@ gst_shm_sink_init (GstShmSink * self) self->size = DEFAULT_SIZE; self->wait_for_connection = DEFAULT_WAIT_FOR_CONNECTION; self->perms = DEFAULT_PERMS; + + gst_allocation_params_init (&self->params); } static void @@ -130,6 +372,8 @@ gst_shm_sink_class_init (GstShmSinkClass * klass) gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_shm_sink_event); gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock); gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock_stop); + gstbasesink_class->propose_allocation = + GST_DEBUG_FUNCPTR (gst_shm_sink_propose_allocation); g_object_class_install_property (gobject_class, PROP_SOCKET_PATH, g_param_spec_string ("socket-path", @@ -225,12 +469,18 @@ gst_shm_sink_set_property (GObject * object, guint prop_id, case PROP_SHM_SIZE: GST_OBJECT_LOCK (object); if (self->pipe) { - if (sp_writer_resize (self->pipe, g_value_get_uint (value)) < 0) + if (sp_writer_resize (self->pipe, g_value_get_uint (value)) < 0) { + /* Swap allocators, so we can know immediately if the memory is + * ours */ + gst_object_unref (self->allocator); + self->allocator = gst_shm_sink_allocator_new (self); + GST_DEBUG_OBJECT (self, "Resized shared memory area from %u to " "%u bytes", self->size, g_value_get_uint (value)); - else + } else { GST_WARNING_OBJECT (self, "Could not resize shared memory area from" "%u to %u bytes", self->size, g_value_get_uint (value)); + } } self->size = g_value_get_uint (value); GST_OBJECT_UNLOCK (object); @@ -329,11 +579,13 @@ gst_shm_sink_start (GstBaseSink * bsink) if (!self->pollthread) goto thread_error; + self->allocator = gst_shm_sink_allocator_new (self); + return TRUE; thread_error: - sp_close (self->pipe); + sp_writer_close (self->pipe, NULL, NULL); self->pipe = NULL; gst_poll_free (self->poll); @@ -352,6 +604,10 @@ gst_shm_sink_stop (GstBaseSink * bsink) self->stop = TRUE; gst_poll_set_flushing (self->poll, TRUE); + if (self->allocator) + gst_object_unref (self->allocator); + self->allocator = NULL; + g_thread_join (self->pollthread); self->pollthread = NULL; @@ -360,7 +616,8 @@ gst_shm_sink_stop (GstBaseSink * bsink) while (self->clients) { struct GstShmClient *client = self->clients->data; self->clients = g_list_remove (self->clients, client); - sp_writer_close_client (self->pipe, client->client); + sp_writer_close_client (self->pipe, client->client, + (sp_buffer_free_callback) gst_buffer_unref, NULL); g_signal_emit (self, signals[SIGNAL_CLIENT_DISCONNECTED], 0, client->pollfd.fd); g_slice_free (struct GstShmClient, client); @@ -369,7 +626,7 @@ gst_shm_sink_stop (GstBaseSink * bsink) gst_poll_free (self->poll); self->poll = NULL; - sp_close (self->pipe); + sp_writer_close (self->pipe, NULL, NULL); self->pipe = NULL; return TRUE; @@ -385,8 +642,8 @@ gst_shm_sink_can_render (GstShmSink * self, GstClockTime time) b = sp_writer_get_pending_buffers (self->pipe); for (; b != NULL; b = sp_writer_get_next_buffer (b)) { - GstClockTime t = sp_writer_buf_get_tag (b); - if (GST_CLOCK_DIFF (time, t) > self->buffer_time) + GstBuffer *buf = sp_writer_buf_get_tag (b); + if (GST_CLOCK_DIFF (time, GST_BUFFER_PTS (buf)) > self->buffer_time) return FALSE; } @@ -397,38 +654,44 @@ static GstFlowReturn gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf) { GstShmSink *self = GST_SHM_SINK (bsink); - int rv; + int rv = 0; GstMapInfo map; + gboolean need_new_memory = FALSE; + GstFlowReturn ret = GST_FLOW_OK; + GstMemory *memory = NULL; + GstBuffer *sendbuf = NULL; GST_OBJECT_LOCK (self); while (self->wait_for_connection && !self->clients) { g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self)); - if (self->unlock) { - GST_OBJECT_UNLOCK (self); - return GST_FLOW_FLUSHING; - } + if (self->unlock) + goto flushing; } while (!gst_shm_sink_can_render (self, GST_BUFFER_TIMESTAMP (buf))) { g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self)); - if (self->unlock) { - GST_OBJECT_UNLOCK (self); - return GST_FLOW_FLUSHING; - } + if (self->unlock) + goto flushing; } - gst_buffer_map (buf, &map, GST_MAP_READ); - rv = sp_writer_send_buf (self->pipe, (char *) map.data, map.size, - GST_BUFFER_TIMESTAMP (buf)); - gst_buffer_unmap (buf, &map); - if (rv == -1) { - ShmBlock *block = NULL; - gchar *shmbuf = NULL; + if (gst_buffer_n_memory (buf) > 1) { + GST_LOG_OBJECT (self, "Buffer %p has %d GstMemory, we only support a single" + " one, need to do a memcpy", buf, gst_buffer_n_memory (buf)); + need_new_memory = TRUE; + } else { + memory = gst_buffer_peek_memory (buf, 0); + if (memory->allocator != GST_ALLOCATOR (self->allocator)) { + need_new_memory = TRUE; + GST_LOG_OBJECT (self, "Memory in buffer %p was not allocated by " + "%" GST_PTR_FORMAT ", will memcpy", buf, memory->allocator); + } + } + + if (need_new_memory) { if (gst_buffer_get_size (buf) > sp_writer_get_max_buf_size (self->pipe)) { gsize area_size = sp_writer_get_max_buf_size (self->pipe); - GST_OBJECT_UNLOCK (self); GST_ELEMENT_ERROR (self, RESOURCE, NO_SPACE_LEFT, ("Shared memory area is too small"), @@ -438,95 +701,70 @@ gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf) return GST_FLOW_ERROR; } - while ((block = sp_writer_alloc_block (self->pipe, - gst_buffer_get_size (buf))) == NULL) { + while ((memory = + gst_shm_sink_allocator_alloc_locked (self->allocator, + gst_buffer_get_size (buf), &self->params)) == NULL) { g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self)); - if (self->unlock) { - GST_OBJECT_UNLOCK (self); - return GST_FLOW_FLUSHING; - } + if (self->unlock) + goto flushing; } + while (self->wait_for_connection && !self->clients) { g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self)); if (self->unlock) { - sp_writer_free_block (block); + gst_memory_unref (memory); GST_OBJECT_UNLOCK (self); return GST_FLOW_FLUSHING; } } - shmbuf = sp_writer_block_get_buf (block); - gst_buffer_extract (buf, 0, shmbuf, gst_buffer_get_size (buf)); - sp_writer_send_buf (self->pipe, shmbuf, gst_buffer_get_size (buf), - GST_BUFFER_TIMESTAMP (buf)); - sp_writer_free_block (block); + gst_memory_map (memory, &map, GST_MAP_WRITE); + gst_buffer_extract (buf, 0, map.data, map.size); + gst_memory_unmap (memory, &map); + + sendbuf = gst_buffer_new (); + gst_buffer_copy_into (sendbuf, buf, GST_BUFFER_COPY_METADATA, 0, -1); + gst_buffer_append_memory (sendbuf, memory); + } else { + sendbuf = gst_buffer_ref (buf); } - GST_OBJECT_UNLOCK (self); + gst_buffer_map (sendbuf, &map, GST_MAP_READ); + /* Make the memory readonly as of now as we've sent it to the other side + * We know it's not mapped for writing anywhere as we just mapped it for + * reading + */ - return GST_FLOW_OK; -} + rv = sp_writer_send_buf (self->pipe, (char *) map.data, map.size, sendbuf); -#if 0 + gst_buffer_unmap (sendbuf, &map); -/* FIXME 0.11 implement some bufferpool support */ + GST_OBJECT_UNLOCK (self); -static void -gst_shm_sink_free_buffer (gpointer data) -{ - ShmPipe *pipe; - ShmBlock *block = data; - GstShmSink *self; + if (rv == -1) { + GST_ELEMENT_ERROR (self, STREAM, FAILED, ("Invalid allocated buffer"), + ("The shmpipe library rejects our buffer, this is a bug")); + ret = GST_FLOW_ERROR; + } - pipe = sp_writer_block_get_pipe (block); - self = sp_get_data (pipe); + /* If we allocated our own memory, then unmap it */ - GST_OBJECT_LOCK (self); - sp_writer_free_block (block); + return ret; + +flushing: GST_OBJECT_UNLOCK (self); - g_object_unref (self); + return GST_FLOW_FLUSHING; } -static GstFlowReturn -gst_shm_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size, - GstCaps * caps, GstBuffer ** out_buf) +static void +free_buffer_locked (GstBuffer * buffer, void *data) { - GstShmSink *self = GST_SHM_SINK (sink); - GstBuffer *buffer; - ShmBlock *block = NULL; - gpointer buf = NULL; - - GST_OBJECT_LOCK (self); - block = sp_writer_alloc_block (self->pipe, size); - if (block) { - buf = sp_writer_block_get_buf (block); - g_object_ref (self); - } - GST_OBJECT_UNLOCK (self); - - if (block) { - buffer = gst_buffer_new (); - GST_BUFFER_DATA (buffer) = buf; - GST_BUFFER_MALLOCDATA (buffer) = (guint8 *) block; - GST_BUFFER_FREE_FUNC (buffer) = - GST_DEBUG_FUNCPTR (gst_shm_sink_free_buffer); - GST_BUFFER_SIZE (buffer) = size; - GST_LOG_OBJECT (self, - "Allocated buffer of %u bytes from shared memory at %p", size, buf); - } else { - buffer = gst_buffer_new_and_alloc (size); - GST_LOG_OBJECT (self, "Not enough shared memory for buffer of %u bytes, " - "allocating using standard allocator", size); - } - - GST_BUFFER_OFFSET (buffer) = offset; - gst_buffer_set_caps (buffer, caps); + GSList **list = data; - *out_buf = buffer; + g_assert (buffer != NULL); - return GST_FLOW_OK; + *list = g_slist_prepend (*list, buffer); } -#endif static gpointer pollthread_func (gpointer data) @@ -605,9 +843,10 @@ pollthread_func (gpointer data) if (gst_poll_fd_can_read (self->poll, &gclient->pollfd)) { int rv; + gpointer tag = NULL; GST_OBJECT_LOCK (self); - rv = sp_writer_recv (self->pipe, gclient->client); + rv = sp_writer_recv (self->pipe, gclient->client, &tag); GST_OBJECT_UNLOCK (self); if (rv < 0) { @@ -615,12 +854,22 @@ pollthread_func (gpointer data) " closing (retval: %d errno: %d)", rv, errno); goto close_client; } + + g_assert (rv == 0 || tag == NULL); + + if (rv == 0) + gst_buffer_unref (tag); } continue; close_client: - GST_OBJECT_LOCK (self); - sp_writer_close_client (self->pipe, gclient->client); - GST_OBJECT_UNLOCK (self); + { + GSList *list = NULL; + GST_OBJECT_LOCK (self); + sp_writer_close_client (self->pipe, gclient->client, + (sp_buffer_free_callback) free_buffer_locked, (void **) &list); + GST_OBJECT_UNLOCK (self); + g_slist_free_full (list, (GDestroyNotify) gst_buffer_unref); + } gst_poll_remove_fd (self->poll, &gclient->pollfd); self->clients = g_list_remove (self->clients, gclient); @@ -683,3 +932,15 @@ gst_shm_sink_unlock_stop (GstBaseSink * bsink) return TRUE; } + +static gboolean +gst_shm_sink_propose_allocation (GstBaseSink * sink, GstQuery * query) +{ + GstShmSink *self = GST_SHM_SINK (sink); + + if (self->allocator) + gst_query_add_allocation_param (query, GST_ALLOCATOR (self->allocator), + NULL); + + return TRUE; +} diff --git a/sys/shm/gstshmsink.h b/sys/shm/gstshmsink.h index ca1ee12..a514096 100644 --- a/sys/shm/gstshmsink.h +++ b/sys/shm/gstshmsink.h @@ -40,6 +40,7 @@ G_BEGIN_DECLS (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SHM_SINK)) typedef struct _GstShmSink GstShmSink; typedef struct _GstShmSinkClass GstShmSinkClass; +typedef struct _GstShmSinkAllocator GstShmSinkAllocator; struct _GstShmSink { @@ -64,6 +65,10 @@ struct _GstShmSink GstClockTimeDiff buffer_time; GCond cond; + + GstShmSinkAllocator *allocator; + + GstAllocationParams params; }; struct _GstShmSinkClass diff --git a/sys/shm/gstshmsrc.c b/sys/shm/gstshmsrc.c index 198a0b9..d8ee560 100644 --- a/sys/shm/gstshmsrc.c +++ b/sys/shm/gstshmsrc.c @@ -357,10 +357,8 @@ gst_shm_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) gsb->pipe = self->pipe; gst_shm_pipe_inc (self->pipe); - *outbuf = gst_buffer_new (); - gst_buffer_append_memory (*outbuf, - gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, - buf, rv, 0, rv, gsb, free_buffer)); + *outbuf = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, + buf, rv, 0, rv, gsb, free_buffer); return GST_FLOW_OK; } @@ -445,7 +443,7 @@ gst_shm_pipe_dec (GstShmPipe * pipe) } if (pipe->pipe) - sp_close (pipe->pipe); + sp_client_close (pipe->pipe); GST_OBJECT_UNLOCK (pipe->src); gst_object_unref (pipe->src); diff --git a/sys/shm/shmpipe.c b/sys/shm/shmpipe.c index 6c7b80b..677113b 100644 --- a/sys/shm/shmpipe.c +++ b/sys/shm/shmpipe.c @@ -118,7 +118,7 @@ struct _ShmBuffer int num_clients; int clients[0]; - uint64_t tag; + void *tag; }; @@ -183,14 +183,14 @@ struct CommandBuffer static ShmArea *sp_open_shm (char *path, int id, mode_t perms, size_t size); static void sp_close_shm (ShmArea * area); static int sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, - ShmBuffer * prev_buf, ShmClient * client); + ShmBuffer * prev_buf, ShmClient * client, void **tag); static void sp_shm_area_dec (ShmPipe * self, ShmArea * area); #define RETURN_ERROR(format, ...) do { \ fprintf (stderr, format, __VA_ARGS__); \ - sp_close (self); \ + sp_writer_close (self, NULL, NULL); \ return NULL; \ } while (0) @@ -424,7 +424,8 @@ sp_dec (ShmPipe * self) } void -sp_close (ShmPipe * self) +sp_writer_close (ShmPipe * self, sp_buffer_free_callback callback, + void *user_data) { if (self->main_socket >= 0) close (self->main_socket); @@ -435,11 +436,18 @@ sp_close (ShmPipe * self) } while (self->clients) - sp_writer_close_client (self, self->clients); + sp_writer_close_client (self, self->clients, callback, user_data); sp_dec (self); } +void +sp_client_close (ShmPipe * self) +{ + sp_writer_close (self, NULL, NULL); +} + + int sp_writer_setperms_shm (ShmPipe * self, mode_t perms) { @@ -560,7 +568,7 @@ sp_writer_free_block (ShmBlock * block) /* Returns the number of client this has successfully been sent to */ int -sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, uint64_t tag) +sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag) { ShmArea *area = NULL; unsigned long offset = 0; @@ -699,7 +707,7 @@ sp_client_recv (ShmPipe * self, char **buf) } int -sp_writer_recv (ShmPipe * self, ShmClient * client) +sp_writer_recv (ShmPipe * self, ShmClient * client, void **tag) { ShmBuffer *buf = NULL, *prev_buf = NULL; struct CommandBuffer cb; @@ -713,7 +721,7 @@ sp_writer_recv (ShmPipe * self, ShmClient * client) for (buf = self->buffers; buf; buf = buf->next) { if (buf->shm_area->id == cb.area_id && buf->offset == cb.payload.ack_buffer.offset) { - sp_shmbuf_dec (self, buf, prev_buf, client); + return sp_shmbuf_dec (self, buf, prev_buf, client, tag); break; } prev_buf = buf; @@ -786,7 +794,7 @@ sp_client_open (const char *path) return self; error: - sp_close (self); + sp_client_close (self); return NULL; } @@ -837,7 +845,7 @@ error: static int sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf, - ShmClient * client) + ShmClient * client, void **tag) { int i; int had_client = 0; @@ -866,6 +874,8 @@ sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf, else self->buffers = buf->next; + if (tag) + *tag = buf->tag; shm_alloc_space_block_dec (buf->ablock); sp_shm_area_dec (self, buf->shm_area); spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * buf->num_clients, buf); @@ -875,7 +885,8 @@ sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf, } void -sp_writer_close_client (ShmPipe * self, ShmClient * client) +sp_writer_close_client (ShmPipe * self, ShmClient * client, + sp_buffer_free_callback callback, void *user_data) { ShmBuffer *buffer = NULL, *prev_buf = NULL; ShmClient *item = NULL, *prev_item = NULL; @@ -885,11 +896,15 @@ sp_writer_close_client (ShmPipe * self, ShmClient * client) again: for (buffer = self->buffers; buffer; buffer = buffer->next) { int i; + void *tag = NULL; for (i = 0; i < buffer->num_clients; i++) { if (buffer->clients[i] == client->fd) { - if (!sp_shmbuf_dec (self, buffer, prev_buf, client)) + if (!sp_shmbuf_dec (self, buffer, prev_buf, client, &tag)) { + if (callback) + callback (tag, user_data); goto again; + } break; } } @@ -949,7 +964,7 @@ sp_writer_get_next_buffer (ShmBuffer * buffer) return buffer->next; } -uint64_t +void * sp_writer_buf_get_tag (ShmBuffer * buffer) { return buffer->tag; diff --git a/sys/shm/shmpipe.h b/sys/shm/shmpipe.h index ecac432..6d75629 100644 --- a/sys/shm/shmpipe.h +++ b/sys/shm/shmpipe.h @@ -78,9 +78,12 @@ typedef struct _ShmPipe ShmPipe; typedef struct _ShmBlock ShmBlock; typedef struct _ShmBuffer ShmBuffer; +typedef void (*sp_buffer_free_callback) (void * tag, void * user_data); + ShmPipe *sp_writer_create (const char *path, size_t size, mode_t perms); const char *sp_writer_get_path (ShmPipe *pipe); -void sp_close (ShmPipe * self); +void sp_writer_close (ShmPipe * self, sp_buffer_free_callback callback, + void * user_data); void *sp_get_data (ShmPipe * self); void sp_set_data (ShmPipe * self, void *data); @@ -92,24 +95,26 @@ int sp_writer_get_client_fd (ShmClient * client); ShmBlock *sp_writer_alloc_block (ShmPipe * self, size_t size); void sp_writer_free_block (ShmBlock *block); -int sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, uint64_t tag); +int sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void * tag); char *sp_writer_block_get_buf (ShmBlock *block); ShmPipe *sp_writer_block_get_pipe (ShmBlock *block); size_t sp_writer_get_max_buf_size (ShmPipe * self); ShmClient * sp_writer_accept_client (ShmPipe * self); -void sp_writer_close_client (ShmPipe *self, ShmClient * client); -int sp_writer_recv (ShmPipe * self, ShmClient * client); +void sp_writer_close_client (ShmPipe *self, ShmClient * client, + sp_buffer_free_callback callback, void * user_data); +int sp_writer_recv (ShmPipe * self, ShmClient * client, void ** tag); int sp_writer_pending_writes (ShmPipe * self); +ShmBuffer *sp_writer_get_pending_buffers (ShmPipe * self); +ShmBuffer *sp_writer_get_next_buffer (ShmBuffer * buffer); +void *sp_writer_buf_get_tag (ShmBuffer * buffer); + ShmPipe *sp_client_open (const char *path); long int sp_client_recv (ShmPipe * self, char **buf); int sp_client_recv_finish (ShmPipe * self, char *buf); - -ShmBuffer *sp_writer_get_pending_buffers (ShmPipe * self); -ShmBuffer *sp_writer_get_next_buffer (ShmBuffer * buffer); -uint64_t sp_writer_buf_get_tag (ShmBuffer * buffer); +void sp_client_close (ShmPipe * self); #ifdef __cplusplus } -- 2.7.4