From: Tim-Philipp Müller Date: Fri, 20 Jun 2014 10:36:19 +0000 (+0100) Subject: multiudpsink: add sendmmsg-ready render_list function prototype X-Git-Tag: 1.6.0~705 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=e1a7deb27f7eeb037e3361551e5e853df0920b05;p=platform%2Fupstream%2Fgst-plugins-good.git multiudpsink: add sendmmsg-ready render_list function prototype Add prototype for a render_list() function that can use a sendmmsg-style g_socket_send_messages() function once it lands in GLib. We can use this infrastructure to send multiple buffers made up by multiple memories to multiple clients in one go, which drastically reduces the number of syscalls made when sending high-bitrate video streams. https://bugzilla.gnome.org/show_bug.cgi?id=732152 --- diff --git a/gst/udp/gstmultiudpsink.c b/gst/udp/gstmultiudpsink.c index 2b346a1..d2fbca7 100644 --- a/gst/udp/gstmultiudpsink.c +++ b/gst/udp/gstmultiudpsink.c @@ -120,6 +120,8 @@ static void gst_multiudpsink_finalize (GObject * object); static GstFlowReturn gst_multiudpsink_render (GstBaseSink * sink, GstBuffer * buffer); +static GstFlowReturn gst_multiudpsink_render_list (GstBaseSink * bsink, + GstBufferList * buffer_list); static gboolean gst_multiudpsink_start (GstBaseSink * bsink); static gboolean gst_multiudpsink_stop (GstBaseSink * bsink); @@ -359,6 +361,7 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) "Wim Taymans "); gstbasesink_class->render = gst_multiudpsink_render; + gstbasesink_class->render_list = gst_multiudpsink_render_list; gstbasesink_class->start = gst_multiudpsink_start; gstbasesink_class->stop = gst_multiudpsink_stop; gstbasesink_class->unlock = gst_multiudpsink_unlock; @@ -408,6 +411,9 @@ gst_multiudpsink_init (GstMultiUDPSink * sink) sink->vec = g_new (GOutputVector, max_mem); sink->map = g_new (GstMapInfo, max_mem); + + /* we assume that the number of memories per buffer can fit into a guint8 */ + g_warn_if_fail (max_mem <= G_MAXUINT8); } static GstUDPClient * @@ -533,6 +539,375 @@ gst_multiudpsink_finalize (GObject * object) G_OBJECT_CLASS (parent_class)->finalize (object); } +/* replacement until we can depend unconditionally on the real one in GLib */ +#ifndef HAVE_G_SOCKET_SEND_MESSAGES +#define g_socket_send_messages gst_socket_send_messages + +static gint +gst_socket_send_messages (GSocket * socket, GstOutputMessage * messages, + guint num_messages, gint flags, GCancellable * cancellable, GError ** error) +{ + gssize result; + gint i; + + for (i = 0; i < num_messages; ++i) { + GstOutputMessage *msg = &messages[i]; + GError *msg_error = NULL; + + result = g_socket_send_message (socket, msg->address, + msg->vectors, msg->num_vectors, + msg->control_messages, msg->num_control_messages, + flags, cancellable, &msg_error); + + if (result < 0) { + /* if we couldn't send all messages, just return how many we did + * manage to send, provided we managed to send at least one */ + if (msg_error->code == G_IO_ERROR_WOULD_BLOCK && i > 0) { + g_error_free (msg_error); + return i; + } else { + g_propagate_error (error, msg_error); + return -1; + } + } + + msg->bytes_sent = result; + } + + return i; +} +#endif /* HAVE_G_SOCKET_SEND_MESSAGES */ + +static gsize +fill_vectors (GOutputVector * vecs, GstMapInfo * maps, guint n, GstBuffer * buf) +{ + GstMemory *mem; + gsize size = 0; + guint i; + + g_assert (gst_buffer_n_memory (buf) == n); + + for (i = 0; i < n; ++i) { + mem = gst_buffer_peek_memory (buf, i); + if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) { + vecs[i].buffer = maps[i].data; + vecs[i].size = maps[i].size; + } else { + GST_WARNING ("Failed to map memory %p for reading", mem); + vecs[i].buffer = ""; + vecs[i].size = 0; + } + size += vecs[i].size; + } + + return size; +} + +static gsize +gst_udp_calc_message_size (GstOutputMessage * msg) +{ + gsize size = 0; + guint i; + + for (i = 0; i < msg->num_vectors; ++i) + size += msg->vectors[i].size; + + return size; +} + +static gint +gst_udp_messsages_find_first_not_sent (GstOutputMessage * messages, + guint num_messages) +{ + guint i; + + for (i = 0; i < num_messages; ++i) { + GstOutputMessage *msg = &messages[i]; + + if (msg->bytes_sent == 0 && gst_udp_calc_message_size (msg) > 0) + return i; + } + + return -1; +} + +static inline gchar * +gst_udp_address_get_string (GSocketAddress * addr, gchar * s, gsize size) +{ + GInetSocketAddress *isa = G_INET_SOCKET_ADDRESS (addr); + GInetAddress *ia; + gchar *addr_str; + + ia = g_inet_socket_address_get_address (isa); + addr_str = g_inet_address_to_string (ia); + g_snprintf (s, size, "%s:%u", addr_str, g_inet_socket_address_get_port (isa)); + g_free (addr_str); + g_object_unref (ia); + + return s; +} + +/* Wrapper around g_socket_send_messages() plus error handling (ignoring). + * Returns FALSE if we got cancelled, otherwise TRUE. */ +static gboolean +gst_multiudpsink_send_messages (GstMultiUDPSink * sink, GSocket * socket, + GstOutputMessage * messages, guint num_messages) +{ + gboolean sent_max_size_warning = FALSE; + + while (num_messages > 0) { + gchar astr[64] G_GNUC_UNUSED; + GError *err = NULL; + guint msg_size, skip, i; + gint ret, err_idx; + + ret = g_socket_send_messages (socket, messages, num_messages, 0, + sink->cancellable, &err); + + if (G_UNLIKELY (ret < 0)) { + GstOutputMessage *msg; + + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + g_clear_error (&err); + return FALSE; + } + + err_idx = gst_udp_messsages_find_first_not_sent (messages, num_messages); + if (err_idx < 0) + break; + + msg = &messages[err_idx]; + msg_size = gst_udp_calc_message_size (msg); + + GST_LOG_OBJECT (sink, "error sending %u bytes to client %s: %s", msg_size, + gst_udp_address_get_string (msg->address, astr, sizeof (astr)), + err->message); + + skip = 1; + if (msg_size > UDP_MAX_SIZE) { + if (!sent_max_size_warning) { + GST_ELEMENT_WARNING (sink, RESOURCE, WRITE, + ("Attempting to send a UDP packets larger than maximum size " + "(%u > %d)", msg_size, UDP_MAX_SIZE), + ("Reason: %s", err ? err->message : "unknown reason")); + sent_max_size_warning = FALSE; + } + } else { + GST_ELEMENT_WARNING (sink, RESOURCE, WRITE, + ("Error sending UDP packets"), ("client %s, reason: %s", + gst_udp_address_get_string (msg->address, astr, sizeof (astr)), + (err != NULL) ? err->message : "unknown reason")); + + for (i = err_idx + 1; i < num_messages; ++i, ++skip) { + if (messages[i].address != msg->address) + break; + } + GST_DEBUG_OBJECT (sink, "skipping %d message(s) to same client", skip); + } + + /* ignore any errors and try sending the rest */ + g_clear_error (&err); + ret = skip; + } + + g_assert (ret <= num_messages); + + messages += ret; + num_messages -= ret; + } + + return TRUE; +} + +static GstFlowReturn +gst_multiudpsink_render_buffers (GstMultiUDPSink * sink, GstBuffer ** buffers, + guint num_buffers, guint8 * mem_nums, guint total_mem_num) +{ + GstOutputMessage *msgs; + gboolean send_duplicates; + GstUDPClient **clients; + GOutputVector *vecs; + GstMapInfo *map_infos; + GstFlowReturn flow_ret; + guint num_addr_v4, num_addr_v6; + guint num_addr, num_msgs; + GError *err = NULL; + guint i, j, mem; + gsize size = 0; + GList *l; + + send_duplicates = sink->send_duplicates; + + g_mutex_lock (&sink->client_lock); + + if (send_duplicates) { + num_addr_v4 = sink->num_v4_all; + num_addr_v6 = sink->num_v6_all; + } else { + num_addr_v4 = sink->num_v4_unique; + num_addr_v6 = sink->num_v6_unique; + } + num_addr = num_addr_v4 + num_addr_v6; + + if (num_addr == 0) + goto no_clients; + + clients = g_newa (GstUDPClient *, num_addr); + for (l = sink->clients, i = 0; l != NULL; l = l->next) { + GstUDPClient *client = l->data; + + clients[i++] = gst_udp_client_ref (client); + for (j = 1; send_duplicates && j < client->add_count; ++j) + clients[i++] = gst_udp_client_ref (client); + } + g_assert_cmpuint (i, ==, num_addr); + + g_mutex_unlock (&sink->client_lock); + + GST_LOG_OBJECT (sink, "%u buffers, %u memories -> to be sent to %u clients", + num_buffers, total_mem_num, num_addr); + + vecs = g_newa (GOutputVector, total_mem_num); + map_infos = g_newa (GstMapInfo, total_mem_num); + + num_msgs = num_addr * num_buffers; + msgs = g_newa (GstOutputMessage, num_msgs); + + /* populate first num_buffers messages with output vectors for the buffers */ + for (i = 0, mem = 0; i < num_buffers; ++i) { + size += fill_vectors (&vecs[mem], &map_infos[mem], mem_nums[i], buffers[i]); + msgs[i].vectors = &vecs[mem]; + msgs[i].num_vectors = mem_nums[i]; + msgs[i].num_control_messages = 0; + msgs[i].control_messages = NULL; + msgs[i].address = clients[0]->addr; + mem += mem_nums[i]; + } + + /* FIXME: how about some locking? (there wasn't any before either, but..) */ + sink->bytes_to_serve += size; + + /* now copy the pre-filled num_buffer messages over to the next num_buffer + * messages for the next client, where we also change the target adddress */ + for (i = 1; i < num_addr; ++i) { + for (j = 0; j < num_buffers; ++j) { + msgs[i * num_buffers + j] = msgs[j]; + msgs[i * num_buffers + j].address = clients[i]->addr; + } + } + + /* now send it! */ + { + gboolean ret; + + /* no IPv4 socket? Send it all from the IPv6 socket then.. */ + if (sink->used_socket == NULL) { + ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6, + msgs, num_msgs); + } else { + guint num_msgs_v4 = num_buffers * num_addr_v4; + guint num_msgs_v6 = num_buffers * num_addr_v6; + + /* FIXME: assumes clients are sorted in our list! */ + ret = gst_multiudpsink_send_messages (sink, sink->used_socket, + msgs, num_msgs_v4); + + if (!ret) + goto cancelled; + + ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6, + msgs + num_msgs_v4, num_msgs_v6); + } + + if (!ret) + goto cancelled; + } + + flow_ret = GST_FLOW_OK; + + /* now update stats */ + g_mutex_lock (&sink->client_lock); + + for (i = 0; i < num_addr; ++i) { + GstUDPClient *client = clients[i]; + + for (j = 0; j < num_buffers; ++j) { + gsize bytes_sent; + + bytes_sent = msgs[i * num_buffers + j].bytes_sent; + + client->bytes_sent += bytes_sent; + client->packets_sent++; + sink->bytes_served += bytes_sent; + } + gst_udp_client_unref (client); + } + + g_mutex_unlock (&sink->client_lock); + +out: + + for (i = 0; i < mem; ++i) + gst_memory_unmap (map_infos[i].memory, &map_infos[i]); + + return flow_ret;; + +no_clients: + { + g_mutex_unlock (&sink->client_lock); + GST_LOG_OBJECT (sink, "no clients"); + return GST_FLOW_OK; + } +cancelled: + { + GST_INFO_OBJECT (sink, "cancelled"); + g_clear_error (&err); + flow_ret = GST_FLOW_FLUSHING; + + g_mutex_lock (&sink->client_lock); + for (i = 0; i < num_addr; ++i) + gst_udp_client_unref (clients[i]); + g_mutex_unlock (&sink->client_lock); + goto out; + } +} + +static GstFlowReturn +gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list) +{ + GstMultiUDPSink *sink; + GstBuffer **buffers; + GstFlowReturn flow; + guint8 *mem_nums; + guint total_mems; + guint i, num_buffers; + + sink = GST_MULTIUDPSINK_CAST (bsink); + + num_buffers = gst_buffer_list_length (buffer_list); + if (num_buffers == 0) + goto no_data; + + buffers = g_newa (GstBuffer *, num_buffers); + mem_nums = g_newa (guint8, num_buffers); + for (i = 0, total_mems = 0; i < num_buffers; ++i) { + buffers[i] = gst_buffer_list_get (buffer_list, i); + mem_nums[i] = gst_buffer_n_memory (buffers[i]); + total_mems += mem_nums[i]; + } + + flow = gst_multiudpsink_render_buffers (sink, buffers, num_buffers, + mem_nums, total_mems); + + return flow; + +no_data: + { + GST_LOG_OBJECT (sink, "empty buffer"); + return GST_FLOW_OK; + } +} + static GstFlowReturn gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) { diff --git a/gst/udp/gstmultiudpsink.h b/gst/udp/gstmultiudpsink.h index ee9776d..f443dd9 100644 --- a/gst/udp/gstmultiudpsink.h +++ b/gst/udp/gstmultiudpsink.h @@ -38,6 +38,28 @@ G_BEGIN_DECLS typedef struct _GstMultiUDPSink GstMultiUDPSink; typedef struct _GstMultiUDPSinkClass GstMultiUDPSinkClass; +#if GLIB_CHECK_VERSION (2, 43, 2) +#define HAVE_G_SOCKET_SEND_MESSAGES +#endif + +#ifndef HAVE_G_SOCKET_SEND_MESSAGES +/* same as GOutputMessage used for g_socket_send_messages() */ +typedef struct { + /*< private >*/ + GSocketAddress *address; + + GOutputVector *vectors; + guint num_vectors; + + guint bytes_sent; + + GSocketControlMessage **control_messages; + guint num_control_messages; +} GstOutputMessage; +#else +typedef GOutputMessage GstOutputMessage; +#endif /* HAVE_G_SOCKET_SEND_MESSAGES*/ + typedef struct { gint ref_count; /* for memory management */ gint add_count; /* how often this address has been added */ @@ -61,6 +83,7 @@ struct _GstMultiUDPSink { GSocket *used_socket, *used_socket_v6; GCancellable *cancellable; + /* client management */ GMutex client_lock; GList *clients; guint num_v4_unique; /* number IPv4 clients (excluding duplicates) */ @@ -68,6 +91,7 @@ struct _GstMultiUDPSink { guint num_v6_unique; /* number IPv6 clients (excluding duplicates) */ guint num_v6_all; /* number IPv6 clients (including duplicates) */ + /* pre-allocated scrap space for render function */ GOutputVector *vec; GstMapInfo *map;