static gboolean gst_shm_sink_event (GstBaseSink * bsink, GstEvent * event);
static gboolean gst_shm_sink_unlock (GstBaseSink * bsink);
static gboolean gst_shm_sink_unlock_stop (GstBaseSink * bsink);
+static gboolean gst_shm_sink_propose_allocation (GstBaseSink * sink,
+ GstQuery * query);
static gpointer pollthread_func (gpointer data);
static guint signals[LAST_SIGNAL] = { 0 };
+
+
+/********************
+ * CUSTOM ALLOCATOR *
+ ********************/
+
+#define GST_TYPE_SHM_SINK_ALLOCATOR \
+ (gst_shm_sink_allocator_get_type())
+#define GST_SHM_SINK_ALLOCATOR(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SHM_SINK_ALLOCATOR, \
+ GstShmSinkAllocator))
+#define GST_SHM_SINK_ALLOCATOR_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SHM_SINK_ALLOCATOR, \
+ GstShmSinkAllocatorClass))
+#define GST_IS_SHM_SINK_ALLOCATOR(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SHM_SINK_ALLOCATOR))
+#define GST_IS_SHM_SINK_ALLOCATOR_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SHM_SINK_ALLOCATOR))
+
+struct _GstShmSinkAllocator
+{
+ GstAllocator parent;
+
+ GstShmSink *sink;
+};
+
+typedef struct _GstShmSinkAllocatorClass
+{
+ GstAllocatorClass parent;
+} GstShmSinkAllocatorClass;
+
+typedef struct _GstShmSinkMemory
+{
+ GstMemory mem;
+
+ gchar *data;
+ GstShmSink *sink;
+ ShmBlock *block;
+} GstShmSinkMemory;
+
+GType gst_shm_sink_allocator_get_type (void);
+
+G_DEFINE_TYPE (GstShmSinkAllocator, gst_shm_sink_allocator, GST_TYPE_ALLOCATOR);
+
+static void
+gst_shm_sink_allocator_dispose (GObject * object)
+{
+ GstShmSinkAllocator *self = GST_SHM_SINK_ALLOCATOR (object);
+
+ if (self->sink)
+ gst_object_unref (self->sink);
+ self->sink = NULL;
+
+ G_OBJECT_CLASS (gst_shm_sink_allocator_parent_class)->dispose (object);
+}
+
+static void
+gst_shm_sink_allocator_free (GstAllocator * allocator, GstMemory * mem)
+{
+ GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem;
+
+ if (mymem->block) {
+ GST_OBJECT_LOCK (mymem->sink);
+ sp_writer_free_block (mymem->block);
+ GST_OBJECT_UNLOCK (mymem->sink);
+ gst_object_unref (mymem->sink);
+ }
+ gst_object_unref (mem->allocator);
+
+ g_slice_free (GstShmSinkMemory, mymem);
+}
+
+static gpointer
+gst_shm_sink_allocator_mem_map (GstMemory * mem, gsize maxsize,
+ GstMapFlags flags)
+{
+ GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem;
+
+ return mymem->data;
+}
+
+static void
+gst_shm_sink_allocator_mem_unmap (GstMemory * mem)
+{
+}
+
+static GstMemory *
+gst_shm_sink_allocator_mem_share (GstMemory * mem, gssize offset, gssize size)
+{
+ GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem;
+ GstShmSinkMemory *mysub;
+ GstMemory *parent;
+
+ /* find the real parent */
+ if ((parent = mem->parent) == NULL)
+ parent = mem;
+
+ if (size == -1)
+ size = mem->size - offset;
+
+ mysub = g_slice_new0 (GstShmSinkMemory);
+ /* the shared memory is always readonly */
+ gst_memory_init (GST_MEMORY_CAST (mysub), GST_MINI_OBJECT_FLAGS (parent) |
+ GST_MINI_OBJECT_FLAG_LOCK_READONLY, gst_object_ref (mem->allocator),
+ parent, mem->maxsize, mem->align, mem->offset + offset, size);
+ mysub->data = mymem->data;
+
+ return (GstMemory *) mysub;
+}
+
+static gboolean
+gst_shm_sink_allocator_mem_is_span (GstMemory * mem1, GstMemory * mem2,
+ gsize * offset)
+{
+ GstShmSinkMemory *mymem1 = (GstShmSinkMemory *) mem1;
+ GstShmSinkMemory *mymem2 = (GstShmSinkMemory *) mem2;
+
+ if (offset) {
+ GstMemory *parent;
+
+ parent = mem1->parent;
+
+ *offset = mem1->offset - parent->offset;
+ }
+
+ /* and memory is contiguous */
+ return mymem1->data + mem1->offset + mem1->size ==
+ mymem2->data + mem2->offset;
+}
+
+static void
+gst_shm_sink_allocator_init (GstShmSinkAllocator * self)
+{
+ GstAllocator *allocator = GST_ALLOCATOR (self);
+
+ allocator->mem_map = gst_shm_sink_allocator_mem_map;
+ allocator->mem_unmap = gst_shm_sink_allocator_mem_unmap;
+ allocator->mem_share = gst_shm_sink_allocator_mem_share;
+ allocator->mem_is_span = gst_shm_sink_allocator_mem_is_span;
+}
+
+
+static GstMemory *
+gst_shm_sink_allocator_alloc_locked (GstShmSinkAllocator * self, gsize size,
+ GstAllocationParams * params)
+{
+ GstMemory *memory = NULL;
+ ShmBlock *block = NULL;
+ gsize maxsize = size + params->prefix + params->padding;
+ gsize align = params->align;
+
+ /* ensure configured alignment */
+ align |= gst_memory_alignment;
+ /* allocate more to compensate for alignment */
+ maxsize += align;
+
+ block = sp_writer_alloc_block (self->sink->pipe, size);
+ if (block) {
+ GstShmSinkMemory *mymem;
+ gsize aoffset, padding;
+
+ GST_LOG_OBJECT (self, "Allocated block %p with %u bytes at %p",
+ block, size, sp_writer_block_get_buf (block));
+
+ mymem = g_slice_new0 (GstShmSinkMemory);
+ memory = GST_MEMORY_CAST (mymem);
+ mymem->data = sp_writer_block_get_buf (block);
+ mymem->sink = gst_object_ref (self->sink);
+ mymem->block = block;
+
+ /* do alignment */
+ if ((aoffset = ((guintptr) mymem->data & align))) {
+ aoffset = (align + 1) - aoffset;
+ mymem->data += aoffset;
+ maxsize -= aoffset;
+ }
+
+ if (params->prefix && (params->flags & GST_MEMORY_FLAG_ZERO_PREFIXED))
+ memset (mymem->data, 0, params->prefix);
+
+ padding = maxsize - (params->prefix + size);
+ if (padding && (params->flags & GST_MEMORY_FLAG_ZERO_PADDED))
+ memset (mymem->data + params->prefix + size, 0, padding);
+
+ gst_memory_init (memory, params->flags, g_object_ref (self), NULL,
+ maxsize, align, params->prefix, size);
+ }
+
+ return memory;
+}
+
+static GstMemory *
+gst_shm_sink_allocator_alloc (GstAllocator * allocator, gsize size,
+ GstAllocationParams * params)
+{
+ GstShmSinkAllocator *self = GST_SHM_SINK_ALLOCATOR (allocator);
+ GstMemory *memory = NULL;
+
+ GST_OBJECT_LOCK (self->sink);
+ memory = gst_shm_sink_allocator_alloc_locked (self, size, params);
+ GST_OBJECT_UNLOCK (self->sink);
+
+ if (!memory) {
+ memory = gst_allocator_alloc (NULL, size, params);
+ GST_LOG_OBJECT (self, "Not enough shared memory for GstMemory of %u bytes, "
+ "allocating using standard allocator", size);
+ }
+
+ return memory;
+}
+
+
+static void
+gst_shm_sink_allocator_class_init (GstShmSinkAllocatorClass * klass)
+{
+ GstAllocatorClass *allocator_class = GST_ALLOCATOR_CLASS (klass);
+ GObjectClass *object_class = G_OBJECT_CLASS (klass);
+
+ allocator_class->alloc = gst_shm_sink_allocator_alloc;
+ allocator_class->free = gst_shm_sink_allocator_free;
+ object_class->dispose = gst_shm_sink_allocator_dispose;
+}
+
+static GstShmSinkAllocator *
+gst_shm_sink_allocator_new (GstShmSink * sink)
+{
+ GstShmSinkAllocator *self = g_object_new (GST_TYPE_SHM_SINK_ALLOCATOR, NULL);
+
+ self->sink = gst_object_ref (sink);
+
+ return self;
+}
+
+
+/***************
+ * MAIN OBJECT *
+ ***************/
+
static void
gst_shm_sink_init (GstShmSink * self)
{
self->size = DEFAULT_SIZE;
self->wait_for_connection = DEFAULT_WAIT_FOR_CONNECTION;
self->perms = DEFAULT_PERMS;
+
+ gst_allocation_params_init (&self->params);
}
static void
gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_shm_sink_event);
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock);
gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock_stop);
+ gstbasesink_class->propose_allocation =
+ GST_DEBUG_FUNCPTR (gst_shm_sink_propose_allocation);
g_object_class_install_property (gobject_class, PROP_SOCKET_PATH,
g_param_spec_string ("socket-path",
case PROP_SHM_SIZE:
GST_OBJECT_LOCK (object);
if (self->pipe) {
- if (sp_writer_resize (self->pipe, g_value_get_uint (value)) < 0)
+ if (sp_writer_resize (self->pipe, g_value_get_uint (value)) < 0) {
+ /* Swap allocators, so we can know immediately if the memory is
+ * ours */
+ gst_object_unref (self->allocator);
+ self->allocator = gst_shm_sink_allocator_new (self);
+
GST_DEBUG_OBJECT (self, "Resized shared memory area from %u to "
"%u bytes", self->size, g_value_get_uint (value));
- else
+ } else {
GST_WARNING_OBJECT (self, "Could not resize shared memory area from"
"%u to %u bytes", self->size, g_value_get_uint (value));
+ }
}
self->size = g_value_get_uint (value);
GST_OBJECT_UNLOCK (object);
if (!self->pollthread)
goto thread_error;
+ self->allocator = gst_shm_sink_allocator_new (self);
+
return TRUE;
thread_error:
- sp_close (self->pipe);
+ sp_writer_close (self->pipe, NULL, NULL);
self->pipe = NULL;
gst_poll_free (self->poll);
self->stop = TRUE;
gst_poll_set_flushing (self->poll, TRUE);
+ if (self->allocator)
+ gst_object_unref (self->allocator);
+ self->allocator = NULL;
+
g_thread_join (self->pollthread);
self->pollthread = NULL;
while (self->clients) {
struct GstShmClient *client = self->clients->data;
self->clients = g_list_remove (self->clients, client);
- sp_writer_close_client (self->pipe, client->client);
+ sp_writer_close_client (self->pipe, client->client,
+ (sp_buffer_free_callback) gst_buffer_unref, NULL);
g_signal_emit (self, signals[SIGNAL_CLIENT_DISCONNECTED], 0,
client->pollfd.fd);
g_slice_free (struct GstShmClient, client);
gst_poll_free (self->poll);
self->poll = NULL;
- sp_close (self->pipe);
+ sp_writer_close (self->pipe, NULL, NULL);
self->pipe = NULL;
return TRUE;
b = sp_writer_get_pending_buffers (self->pipe);
for (; b != NULL; b = sp_writer_get_next_buffer (b)) {
- GstClockTime t = sp_writer_buf_get_tag (b);
- if (GST_CLOCK_DIFF (time, t) > self->buffer_time)
+ GstBuffer *buf = sp_writer_buf_get_tag (b);
+ if (GST_CLOCK_DIFF (time, GST_BUFFER_PTS (buf)) > self->buffer_time)
return FALSE;
}
gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf)
{
GstShmSink *self = GST_SHM_SINK (bsink);
- int rv;
+ int rv = 0;
GstMapInfo map;
+ gboolean need_new_memory = FALSE;
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstMemory *memory = NULL;
+ GstBuffer *sendbuf = NULL;
GST_OBJECT_LOCK (self);
while (self->wait_for_connection && !self->clients) {
g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
- if (self->unlock) {
- GST_OBJECT_UNLOCK (self);
- return GST_FLOW_FLUSHING;
- }
+ if (self->unlock)
+ goto flushing;
}
while (!gst_shm_sink_can_render (self, GST_BUFFER_TIMESTAMP (buf))) {
g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
- if (self->unlock) {
- GST_OBJECT_UNLOCK (self);
- return GST_FLOW_FLUSHING;
- }
+ if (self->unlock)
+ goto flushing;
}
- gst_buffer_map (buf, &map, GST_MAP_READ);
- rv = sp_writer_send_buf (self->pipe, (char *) map.data, map.size,
- GST_BUFFER_TIMESTAMP (buf));
- gst_buffer_unmap (buf, &map);
- if (rv == -1) {
- ShmBlock *block = NULL;
- gchar *shmbuf = NULL;
+ if (gst_buffer_n_memory (buf) > 1) {
+ GST_LOG_OBJECT (self, "Buffer %p has %d GstMemory, we only support a single"
+ " one, need to do a memcpy", buf, gst_buffer_n_memory (buf));
+ need_new_memory = TRUE;
+ } else {
+ memory = gst_buffer_peek_memory (buf, 0);
+ if (memory->allocator != GST_ALLOCATOR (self->allocator)) {
+ need_new_memory = TRUE;
+ GST_LOG_OBJECT (self, "Memory in buffer %p was not allocated by "
+ "%" GST_PTR_FORMAT ", will memcpy", buf, memory->allocator);
+ }
+ }
+
+ if (need_new_memory) {
if (gst_buffer_get_size (buf) > sp_writer_get_max_buf_size (self->pipe)) {
gsize area_size = sp_writer_get_max_buf_size (self->pipe);
-
GST_OBJECT_UNLOCK (self);
GST_ELEMENT_ERROR (self, RESOURCE, NO_SPACE_LEFT,
("Shared memory area is too small"),
return GST_FLOW_ERROR;
}
- while ((block = sp_writer_alloc_block (self->pipe,
- gst_buffer_get_size (buf))) == NULL) {
+ while ((memory =
+ gst_shm_sink_allocator_alloc_locked (self->allocator,
+ gst_buffer_get_size (buf), &self->params)) == NULL) {
g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
- if (self->unlock) {
- GST_OBJECT_UNLOCK (self);
- return GST_FLOW_FLUSHING;
- }
+ if (self->unlock)
+ goto flushing;
}
+
while (self->wait_for_connection && !self->clients) {
g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
if (self->unlock) {
- sp_writer_free_block (block);
+ gst_memory_unref (memory);
GST_OBJECT_UNLOCK (self);
return GST_FLOW_FLUSHING;
}
}
- shmbuf = sp_writer_block_get_buf (block);
- gst_buffer_extract (buf, 0, shmbuf, gst_buffer_get_size (buf));
- sp_writer_send_buf (self->pipe, shmbuf, gst_buffer_get_size (buf),
- GST_BUFFER_TIMESTAMP (buf));
- sp_writer_free_block (block);
+ gst_memory_map (memory, &map, GST_MAP_WRITE);
+ gst_buffer_extract (buf, 0, map.data, map.size);
+ gst_memory_unmap (memory, &map);
+
+ sendbuf = gst_buffer_new ();
+ gst_buffer_copy_into (sendbuf, buf, GST_BUFFER_COPY_METADATA, 0, -1);
+ gst_buffer_append_memory (sendbuf, memory);
+ } else {
+ sendbuf = gst_buffer_ref (buf);
}
- GST_OBJECT_UNLOCK (self);
+ gst_buffer_map (sendbuf, &map, GST_MAP_READ);
+ /* Make the memory readonly as of now as we've sent it to the other side
+ * We know it's not mapped for writing anywhere as we just mapped it for
+ * reading
+ */
- return GST_FLOW_OK;
-}
+ rv = sp_writer_send_buf (self->pipe, (char *) map.data, map.size, sendbuf);
-#if 0
+ gst_buffer_unmap (sendbuf, &map);
-/* FIXME 0.11 implement some bufferpool support */
+ GST_OBJECT_UNLOCK (self);
-static void
-gst_shm_sink_free_buffer (gpointer data)
-{
- ShmPipe *pipe;
- ShmBlock *block = data;
- GstShmSink *self;
+ if (rv == -1) {
+ GST_ELEMENT_ERROR (self, STREAM, FAILED, ("Invalid allocated buffer"),
+ ("The shmpipe library rejects our buffer, this is a bug"));
+ ret = GST_FLOW_ERROR;
+ }
- pipe = sp_writer_block_get_pipe (block);
- self = sp_get_data (pipe);
+ /* If we allocated our own memory, then unmap it */
- GST_OBJECT_LOCK (self);
- sp_writer_free_block (block);
+ return ret;
+
+flushing:
GST_OBJECT_UNLOCK (self);
- g_object_unref (self);
+ return GST_FLOW_FLUSHING;
}
-static GstFlowReturn
-gst_shm_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
- GstCaps * caps, GstBuffer ** out_buf)
+static void
+free_buffer_locked (GstBuffer * buffer, void *data)
{
- GstShmSink *self = GST_SHM_SINK (sink);
- GstBuffer *buffer;
- ShmBlock *block = NULL;
- gpointer buf = NULL;
-
- GST_OBJECT_LOCK (self);
- block = sp_writer_alloc_block (self->pipe, size);
- if (block) {
- buf = sp_writer_block_get_buf (block);
- g_object_ref (self);
- }
- GST_OBJECT_UNLOCK (self);
-
- if (block) {
- buffer = gst_buffer_new ();
- GST_BUFFER_DATA (buffer) = buf;
- GST_BUFFER_MALLOCDATA (buffer) = (guint8 *) block;
- GST_BUFFER_FREE_FUNC (buffer) =
- GST_DEBUG_FUNCPTR (gst_shm_sink_free_buffer);
- GST_BUFFER_SIZE (buffer) = size;
- GST_LOG_OBJECT (self,
- "Allocated buffer of %u bytes from shared memory at %p", size, buf);
- } else {
- buffer = gst_buffer_new_and_alloc (size);
- GST_LOG_OBJECT (self, "Not enough shared memory for buffer of %u bytes, "
- "allocating using standard allocator", size);
- }
-
- GST_BUFFER_OFFSET (buffer) = offset;
- gst_buffer_set_caps (buffer, caps);
+ GSList **list = data;
- *out_buf = buffer;
+ g_assert (buffer != NULL);
- return GST_FLOW_OK;
+ *list = g_slist_prepend (*list, buffer);
}
-#endif
static gpointer
pollthread_func (gpointer data)
if (gst_poll_fd_can_read (self->poll, &gclient->pollfd)) {
int rv;
+ gpointer tag = NULL;
GST_OBJECT_LOCK (self);
- rv = sp_writer_recv (self->pipe, gclient->client);
+ rv = sp_writer_recv (self->pipe, gclient->client, &tag);
GST_OBJECT_UNLOCK (self);
if (rv < 0) {
" closing (retval: %d errno: %d)", rv, errno);
goto close_client;
}
+
+ g_assert (rv == 0 || tag == NULL);
+
+ if (rv == 0)
+ gst_buffer_unref (tag);
}
continue;
close_client:
- GST_OBJECT_LOCK (self);
- sp_writer_close_client (self->pipe, gclient->client);
- GST_OBJECT_UNLOCK (self);
+ {
+ GSList *list = NULL;
+ GST_OBJECT_LOCK (self);
+ sp_writer_close_client (self->pipe, gclient->client,
+ (sp_buffer_free_callback) free_buffer_locked, (void **) &list);
+ GST_OBJECT_UNLOCK (self);
+ g_slist_free_full (list, (GDestroyNotify) gst_buffer_unref);
+ }
gst_poll_remove_fd (self->poll, &gclient->pollfd);
self->clients = g_list_remove (self->clients, gclient);
return TRUE;
}
+
+static gboolean
+gst_shm_sink_propose_allocation (GstBaseSink * sink, GstQuery * query)
+{
+ GstShmSink *self = GST_SHM_SINK (sink);
+
+ if (self->allocator)
+ gst_query_add_allocation_param (query, GST_ALLOCATOR (self->allocator),
+ NULL);
+
+ return TRUE;
+}
int num_clients;
int clients[0];
- uint64_t tag;
+ void *tag;
};
static ShmArea *sp_open_shm (char *path, int id, mode_t perms, size_t size);
static void sp_close_shm (ShmArea * area);
static int sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf,
- ShmBuffer * prev_buf, ShmClient * client);
+ ShmBuffer * prev_buf, ShmClient * client, void **tag);
static void sp_shm_area_dec (ShmPipe * self, ShmArea * area);
#define RETURN_ERROR(format, ...) do { \
fprintf (stderr, format, __VA_ARGS__); \
- sp_close (self); \
+ sp_writer_close (self, NULL, NULL); \
return NULL; \
} while (0)
}
void
-sp_close (ShmPipe * self)
+sp_writer_close (ShmPipe * self, sp_buffer_free_callback callback,
+ void *user_data)
{
if (self->main_socket >= 0)
close (self->main_socket);
}
while (self->clients)
- sp_writer_close_client (self, self->clients);
+ sp_writer_close_client (self, self->clients, callback, user_data);
sp_dec (self);
}
+void
+sp_client_close (ShmPipe * self)
+{
+ sp_writer_close (self, NULL, NULL);
+}
+
+
int
sp_writer_setperms_shm (ShmPipe * self, mode_t perms)
{
/* Returns the number of client this has successfully been sent to */
int
-sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, uint64_t tag)
+sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag)
{
ShmArea *area = NULL;
unsigned long offset = 0;
}
int
-sp_writer_recv (ShmPipe * self, ShmClient * client)
+sp_writer_recv (ShmPipe * self, ShmClient * client, void **tag)
{
ShmBuffer *buf = NULL, *prev_buf = NULL;
struct CommandBuffer cb;
for (buf = self->buffers; buf; buf = buf->next) {
if (buf->shm_area->id == cb.area_id &&
buf->offset == cb.payload.ack_buffer.offset) {
- sp_shmbuf_dec (self, buf, prev_buf, client);
+ return sp_shmbuf_dec (self, buf, prev_buf, client, tag);
break;
}
prev_buf = buf;
return self;
error:
- sp_close (self);
+ sp_client_close (self);
return NULL;
}
static int
sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf,
- ShmClient * client)
+ ShmClient * client, void **tag)
{
int i;
int had_client = 0;
else
self->buffers = buf->next;
+ if (tag)
+ *tag = buf->tag;
shm_alloc_space_block_dec (buf->ablock);
sp_shm_area_dec (self, buf->shm_area);
spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * buf->num_clients, buf);
}
void
-sp_writer_close_client (ShmPipe * self, ShmClient * client)
+sp_writer_close_client (ShmPipe * self, ShmClient * client,
+ sp_buffer_free_callback callback, void *user_data)
{
ShmBuffer *buffer = NULL, *prev_buf = NULL;
ShmClient *item = NULL, *prev_item = NULL;
again:
for (buffer = self->buffers; buffer; buffer = buffer->next) {
int i;
+ void *tag = NULL;
for (i = 0; i < buffer->num_clients; i++) {
if (buffer->clients[i] == client->fd) {
- if (!sp_shmbuf_dec (self, buffer, prev_buf, client))
+ if (!sp_shmbuf_dec (self, buffer, prev_buf, client, &tag)) {
+ if (callback)
+ callback (tag, user_data);
goto again;
+ }
break;
}
}
return buffer->next;
}
-uint64_t
+void *
sp_writer_buf_get_tag (ShmBuffer * buffer)
{
return buffer->tag;