From: William Manley Date: Fri, 13 Mar 2015 12:49:31 +0000 (+0000) Subject: multisocketsink: Map `GstMemory`s individually when sending X-Git-Tag: 1.6.0~462 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=b8232a74673afae38bc3bff70c7ed0d592209556;p=platform%2Fupstream%2Fgst-plugins-base.git multisocketsink: Map `GstMemory`s individually when sending If a buffer is made up of non-contiguous `GstMemory`s `gst_buffer_map` has to copy all the data into a new `GstMemory` which is contiguous. By mapping all the `GstMemory`s individually and then using scatter-gather IO we avoid this situation. This is a preparatory step for adding support to multisocketsink for sending file descriptors, where a GstBuffer may be made up of several `GstMemory`s, some of which are backed by a memfd or file, but I think this patch is valid and useful on its own. Fixes https://bugzilla.gnome.org/show_bug.cgi?id=746150 --- diff --git a/gst/tcp/gstmultisocketsink.c b/gst/tcp/gstmultisocketsink.c index 01647b1..fa4eee5 100644 --- a/gst/tcp/gstmultisocketsink.c +++ b/gst/tcp/gstmultisocketsink.c @@ -600,6 +600,100 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink, return ret; } +/** + * map_memory_output_vector_n: + * @buf: The #GstBuffer that should be mapped + * @offset: Offset into the buffer that should be mapped + * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into + * @mapinfo: (out,array length=num_vectors): an array of #GstMapInfo structs to write into + * @num_vectors: the number of elements in @vectors to prevent buffer overruns + * + * Maps a buffer into memory, populating a #GOutputVector to use scatter-gather + * I/O to send the data over a socket. The whole buffer won't be mapped into + * memory if it consists of more than @num_vectors #GstMemory s. + * + * Use #unmap_n_memorys after you are + * finished with the mappings. + * + * Returns: The number of GstMemorys mapped + */ +static int +map_n_memory_output_vector (GstBuffer * buf, size_t offset, + GOutputVector * vectors, GstMapInfo * mapinfo, int num_vectors) +{ + guint mem_idx, mem_len; + gsize mem_skip; + size_t maxsize; + int i; + + g_return_val_if_fail (num_vectors > 0, 0); + memset (vectors, 0, sizeof (GOutputVector) * num_vectors); + + maxsize = gst_buffer_get_size (buf) - offset; + if (!gst_buffer_find_memory (buf, offset, maxsize, &mem_idx, &mem_len, + &mem_skip)) + g_error ("Unable to map memory at offset %" G_GSIZE_FORMAT ", buffer " + "length is %" G_GSIZE_FORMAT, offset, gst_buffer_get_size (buf)); + + for (i = 0; i < mem_len && i < num_vectors; i++) { + GstMapInfo map = { 0 }; + GstMemory *mem = gst_buffer_peek_memory (buf, mem_idx + i); + if (!gst_memory_map (mem, &map, GST_MAP_READ)) + g_error ("Unable to map memory %p. This should never happen.", mem); + + if (i == 0) { + vectors[i].buffer = map.data + mem_skip; + vectors[i].size = map.size - mem_skip; + } else { + vectors[i].buffer = map.data; + vectors[i].size = map.size; + } + mapinfo[i] = map; + } + return i; +} + +/** + * map_n_memory_output_vector: + * @buf: The #GstBuffer that should be mapped + * @offset: Offset into the buffer that should be mapped + * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into + * @num_vectors: the number of elements in @vectors to prevent buffer overruns + * + * Returns: The number of GstMemorys mapped + */ +static void +unmap_n_memorys (GstMapInfo * mapinfo, int num_mappings) +{ + int i; + g_return_if_fail (num_mappings > 0); + + for (i = 0; i < num_mappings; i++) + gst_memory_unmap (mapinfo[i].memory, &mapinfo[i]); +} + +static gssize +gst_multi_socket_sink_write (GstMultiSocketSink * sink, + GSocket * sock, GstBuffer * buffer, gsize bufoffset, + GCancellable * cancellable, GError ** err) +{ + GstMapInfo maps[8]; + GOutputVector vec[8]; + guint mems_mapped; + gssize wrote; + + mems_mapped = map_n_memory_output_vector (buffer, bufoffset, vec, maps, 8); + + wrote = + g_socket_send_message (sock, NULL, vec, mems_mapped, NULL, 0, 0, + cancellable, err); + + unmap_n_memorys (maps, mems_mapped); + + return wrote; +} + + /* Handle a write on a client, * which indicates a read request from a client. * @@ -644,8 +738,6 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, more = TRUE; do { - gint maxsize; - if (!mhclient->sending) { /* client is not working on a buffer */ if (mhclient->bufpos == -1) { @@ -725,22 +817,12 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, if (mhclient->sending) { gssize wrote; GstBuffer *head; - GstMapInfo map; /* pick first buffer from list */ head = GST_BUFFER (mhclient->sending->data); - gst_buffer_map (head, &map, GST_MAP_READ); - maxsize = map.size - mhclient->bufoffset; - - /* FIXME: specific */ - /* try to write the complete buffer */ - - wrote = - g_socket_send (mhclient->handle.socket, - (gchar *) map.data + mhclient->bufoffset, maxsize, sink->cancellable, - &err); - gst_buffer_unmap (head, &map); + wrote = gst_multi_socket_sink_write (sink, mhclient->handle.socket, head, + mhclient->bufoffset, sink->cancellable, &err); if (wrote < 0) { /* hmm error.. */ @@ -755,7 +837,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, goto write_error; } } else { - if (wrote < maxsize) { + if (wrote < (gst_buffer_get_size (head) - mhclient->bufoffset)) { /* partial write, try again now */ GST_LOG_OBJECT (sink, "partial write on %p of %" G_GSSIZE_FORMAT " bytes", diff --git a/tests/check/elements/multisocketsink.c b/tests/check/elements/multisocketsink.c index a62e8d3..5d9d788 100644 --- a/tests/check/elements/multisocketsink.c +++ b/tests/check/elements/multisocketsink.c @@ -180,12 +180,15 @@ GST_START_TEST (test_add_client) gst_check_setup_events (mysrcpad, sink, caps, GST_FORMAT_BYTES); ASSERT_CAPS_REFCOUNT (caps, "caps", 3); gst_buffer_fill (buffer, 0, "dead", 4); + gst_buffer_append_memory (buffer, + gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, (gpointer) " good", 5, + 0, 5, NULL, NULL)); fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK); GST_DEBUG ("reading"); - fail_if (read_handle (srcsocket, data, 4) < 4); - fail_unless (strncmp (data, "dead", 4) == 0); - wait_bytes_served (sink, 4); + fail_if (read_handle (srcsocket, data, 9) < 9); + fail_unless (strncmp (data, "dead good", 9) == 0); + wait_bytes_served (sink, 9); GST_DEBUG ("cleaning up multisocketsink"); ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); @@ -200,6 +203,78 @@ GST_START_TEST (test_add_client) GST_END_TEST; +typedef struct +{ + GSocket *sinksocket, *srcsocket; + GstElement *sink; +} TestSinkAndSocket; + +static void +setup_sink_with_socket (TestSinkAndSocket * tsas) +{ + GstCaps *caps = NULL; + + tsas->sink = setup_multisocketsink (); + fail_unless (setup_handles (&tsas->sinksocket, &tsas->srcsocket)); + + ASSERT_SET_STATE (tsas->sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC); + + /* add the client */ + g_signal_emit_by_name (tsas->sink, "add", tsas->sinksocket); + + caps = gst_caps_from_string ("application/x-gst-check"); + gst_check_setup_events (mysrcpad, tsas->sink, caps, GST_FORMAT_BYTES); + gst_caps_unref (caps); +} + +static void +teardown_sink_with_socket (TestSinkAndSocket * tsas) +{ + if (tsas->sink != NULL) { + ASSERT_SET_STATE (tsas->sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); + cleanup_multisocketsink (tsas->sink); + tsas->sink = 0; + } + if (tsas->sinksocket != NULL) { + g_object_unref (tsas->sinksocket); + tsas->sinksocket = 0; + } + if (tsas->srcsocket != NULL) { + g_object_unref (tsas->srcsocket); + tsas->srcsocket = 0; + } +} + +GST_START_TEST (test_sending_buffers_with_9_gstmemorys) +{ + TestSinkAndSocket tsas = { 0 }; + GstBuffer *buffer; + int i; + const char *numbers[9] = { "one", "two", "three", "four", "five", "six", + "seven", "eight", "nine" + }; + const char numbers_concat[] = "onetwothreefourfivesixseveneightnine"; + gchar data[sizeof (numbers_concat)]; + int len = sizeof (numbers_concat) - 1; + + setup_sink_with_socket (&tsas); + + buffer = gst_buffer_new (); + for (i = 0; i < sizeof (numbers) / sizeof (*numbers); i++) + gst_buffer_append_memory (buffer, + gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, (gpointer) numbers[i], + strlen (numbers[i]), 0, strlen (numbers[i]), NULL, NULL)); + fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK); + + GST_DEBUG ("reading"); + fail_if (read_handle (tsas.srcsocket, data, len) < len); + fail_unless (strncmp (data, numbers_concat, len) == 0); + + teardown_sink_with_socket (&tsas); +} + +GST_END_TEST; + /* from the given two data buffers, create two streamheader buffers and * some caps that match it, and store them in the given pointers * returns one ref to each of the buffers and the caps */ @@ -874,6 +949,7 @@ multisocketsink_suite (void) suite_add_tcase (s, tc_chain); tcase_add_test (tc_chain, test_no_clients); tcase_add_test (tc_chain, test_add_client); + tcase_add_test (tc_chain, test_sending_buffers_with_9_gstmemorys); tcase_add_test (tc_chain, test_streamheader); tcase_add_test (tc_chain, test_change_streamheader); tcase_add_test (tc_chain, test_burst_client_bytes);