multisocketsink: Map `GstMemory`s individually when sending
authorWilliam Manley <will@williammanley.net>
Fri, 13 Mar 2015 12:49:31 +0000 (12:49 +0000)
committerWim Taymans <wtaymans@redhat.com>
Fri, 13 Mar 2015 15:20:50 +0000 (16:20 +0100)
If a buffer is made up of non-contiguous `GstMemory`s `gst_buffer_map`
has to copy all the data into a new `GstMemory` which is contiguous.  By
mapping all the `GstMemory`s individually and then using scatter-gather
IO we avoid this situation.

This is a preparatory step for adding support to multisocketsink for
sending file descriptors, where a GstBuffer may be made up of several
`GstMemory`s, some of which are backed by a memfd or file, but I think this
patch is valid and useful on its own.

Fixes https://bugzilla.gnome.org/show_bug.cgi?id=746150

gst/tcp/gstmultisocketsink.c
tests/check/elements/multisocketsink.c

index 01647b1..fa4eee5 100644 (file)
@@ -600,6 +600,100 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
   return ret;
 }
 
+/**
+ * map_memory_output_vector_n:
+ * @buf: The #GstBuffer that should be mapped
+ * @offset: Offset into the buffer that should be mapped
+ * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
+ * @mapinfo: (out,array length=num_vectors): an array of #GstMapInfo structs to write into
+ * @num_vectors: the number of elements in @vectors to prevent buffer overruns
+ *
+ * Maps a buffer into memory, populating a #GOutputVector to use scatter-gather
+ * I/O to send the data over a socket.  The whole buffer won't be mapped into
+ * memory if it consists of more than @num_vectors #GstMemory s.
+ *
+ * Use #unmap_n_memorys after you are
+ * finished with the mappings.
+ *
+ * Returns: The number of GstMemorys mapped
+ */
+static int
+map_n_memory_output_vector (GstBuffer * buf, size_t offset,
+    GOutputVector * vectors, GstMapInfo * mapinfo, int num_vectors)
+{
+  guint mem_idx, mem_len;
+  gsize mem_skip;
+  size_t maxsize;
+  int i;
+
+  g_return_val_if_fail (num_vectors > 0, 0);
+  memset (vectors, 0, sizeof (GOutputVector) * num_vectors);
+
+  maxsize = gst_buffer_get_size (buf) - offset;
+  if (!gst_buffer_find_memory (buf, offset, maxsize, &mem_idx, &mem_len,
+          &mem_skip))
+    g_error ("Unable to map memory at offset %" G_GSIZE_FORMAT ", buffer "
+        "length is %" G_GSIZE_FORMAT, offset, gst_buffer_get_size (buf));
+
+  for (i = 0; i < mem_len && i < num_vectors; i++) {
+    GstMapInfo map = { 0 };
+    GstMemory *mem = gst_buffer_peek_memory (buf, mem_idx + i);
+    if (!gst_memory_map (mem, &map, GST_MAP_READ))
+      g_error ("Unable to map memory %p.  This should never happen.", mem);
+
+    if (i == 0) {
+      vectors[i].buffer = map.data + mem_skip;
+      vectors[i].size = map.size - mem_skip;
+    } else {
+      vectors[i].buffer = map.data;
+      vectors[i].size = map.size;
+    }
+    mapinfo[i] = map;
+  }
+  return i;
+}
+
+/**
+ * map_n_memory_output_vector:
+ * @buf: The #GstBuffer that should be mapped
+ * @offset: Offset into the buffer that should be mapped
+ * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
+ * @num_vectors: the number of elements in @vectors to prevent buffer overruns
+ *
+ * Returns: The number of GstMemorys mapped
+ */
+static void
+unmap_n_memorys (GstMapInfo * mapinfo, int num_mappings)
+{
+  int i;
+  g_return_if_fail (num_mappings > 0);
+
+  for (i = 0; i < num_mappings; i++)
+    gst_memory_unmap (mapinfo[i].memory, &mapinfo[i]);
+}
+
+static gssize
+gst_multi_socket_sink_write (GstMultiSocketSink * sink,
+    GSocket * sock, GstBuffer * buffer, gsize bufoffset,
+    GCancellable * cancellable, GError ** err)
+{
+  GstMapInfo maps[8];
+  GOutputVector vec[8];
+  guint mems_mapped;
+  gssize wrote;
+
+  mems_mapped = map_n_memory_output_vector (buffer, bufoffset, vec, maps, 8);
+
+  wrote =
+      g_socket_send_message (sock, NULL, vec, mems_mapped, NULL, 0, 0,
+      cancellable, err);
+
+  unmap_n_memorys (maps, mems_mapped);
+
+  return wrote;
+}
+
+
 /* Handle a write on a client,
  * which indicates a read request from a client.
  *
@@ -644,8 +738,6 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
 
   more = TRUE;
   do {
-    gint maxsize;
-
     if (!mhclient->sending) {
       /* client is not working on a buffer */
       if (mhclient->bufpos == -1) {
@@ -725,22 +817,12 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
     if (mhclient->sending) {
       gssize wrote;
       GstBuffer *head;
-      GstMapInfo map;
 
       /* pick first buffer from list */
       head = GST_BUFFER (mhclient->sending->data);
 
-      gst_buffer_map (head, &map, GST_MAP_READ);
-      maxsize = map.size - mhclient->bufoffset;
-
-      /* FIXME: specific */
-      /* try to write the complete buffer */
-
-      wrote =
-          g_socket_send (mhclient->handle.socket,
-          (gchar *) map.data + mhclient->bufoffset, maxsize, sink->cancellable,
-          &err);
-      gst_buffer_unmap (head, &map);
+      wrote = gst_multi_socket_sink_write (sink, mhclient->handle.socket, head,
+          mhclient->bufoffset, sink->cancellable, &err);
 
       if (wrote < 0) {
         /* hmm error.. */
@@ -755,7 +837,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
           goto write_error;
         }
       } else {
-        if (wrote < maxsize) {
+        if (wrote < (gst_buffer_get_size (head) - mhclient->bufoffset)) {
           /* partial write, try again now */
           GST_LOG_OBJECT (sink,
               "partial write on %p of %" G_GSSIZE_FORMAT " bytes",
index a62e8d3..5d9d788 100644 (file)
@@ -180,12 +180,15 @@ GST_START_TEST (test_add_client)
   gst_check_setup_events (mysrcpad, sink, caps, GST_FORMAT_BYTES);
   ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
   gst_buffer_fill (buffer, 0, "dead", 4);
+  gst_buffer_append_memory (buffer,
+      gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, (gpointer) " good", 5,
+          0, 5, NULL, NULL));
   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
 
   GST_DEBUG ("reading");
-  fail_if (read_handle (srcsocket, data, 4) < 4);
-  fail_unless (strncmp (data, "dead", 4) == 0);
-  wait_bytes_served (sink, 4);
+  fail_if (read_handle (srcsocket, data, 9) < 9);
+  fail_unless (strncmp (data, "dead good", 9) == 0);
+  wait_bytes_served (sink, 9);
 
   GST_DEBUG ("cleaning up multisocketsink");
   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
@@ -200,6 +203,78 @@ GST_START_TEST (test_add_client)
 
 GST_END_TEST;
 
+typedef struct
+{
+  GSocket *sinksocket, *srcsocket;
+  GstElement *sink;
+} TestSinkAndSocket;
+
+static void
+setup_sink_with_socket (TestSinkAndSocket * tsas)
+{
+  GstCaps *caps = NULL;
+
+  tsas->sink = setup_multisocketsink ();
+  fail_unless (setup_handles (&tsas->sinksocket, &tsas->srcsocket));
+
+  ASSERT_SET_STATE (tsas->sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
+
+  /* add the client */
+  g_signal_emit_by_name (tsas->sink, "add", tsas->sinksocket);
+
+  caps = gst_caps_from_string ("application/x-gst-check");
+  gst_check_setup_events (mysrcpad, tsas->sink, caps, GST_FORMAT_BYTES);
+  gst_caps_unref (caps);
+}
+
+static void
+teardown_sink_with_socket (TestSinkAndSocket * tsas)
+{
+  if (tsas->sink != NULL) {
+    ASSERT_SET_STATE (tsas->sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+    cleanup_multisocketsink (tsas->sink);
+    tsas->sink = 0;
+  }
+  if (tsas->sinksocket != NULL) {
+    g_object_unref (tsas->sinksocket);
+    tsas->sinksocket = 0;
+  }
+  if (tsas->srcsocket != NULL) {
+    g_object_unref (tsas->srcsocket);
+    tsas->srcsocket = 0;
+  }
+}
+
+GST_START_TEST (test_sending_buffers_with_9_gstmemorys)
+{
+  TestSinkAndSocket tsas = { 0 };
+  GstBuffer *buffer;
+  int i;
+  const char *numbers[9] = { "one", "two", "three", "four", "five", "six",
+    "seven", "eight", "nine"
+  };
+  const char numbers_concat[] = "onetwothreefourfivesixseveneightnine";
+  gchar data[sizeof (numbers_concat)];
+  int len = sizeof (numbers_concat) - 1;
+
+  setup_sink_with_socket (&tsas);
+
+  buffer = gst_buffer_new ();
+  for (i = 0; i < sizeof (numbers) / sizeof (*numbers); i++)
+    gst_buffer_append_memory (buffer,
+        gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, (gpointer) numbers[i],
+            strlen (numbers[i]), 0, strlen (numbers[i]), NULL, NULL));
+  fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+
+  GST_DEBUG ("reading");
+  fail_if (read_handle (tsas.srcsocket, data, len) < len);
+  fail_unless (strncmp (data, numbers_concat, len) == 0);
+
+  teardown_sink_with_socket (&tsas);
+}
+
+GST_END_TEST;
+
 /* from the given two data buffers, create two streamheader buffers and
  * some caps that match it, and store them in the given pointers
  * returns  one ref to each of the buffers and the caps */
@@ -874,6 +949,7 @@ multisocketsink_suite (void)
   suite_add_tcase (s, tc_chain);
   tcase_add_test (tc_chain, test_no_clients);
   tcase_add_test (tc_chain, test_add_client);
+  tcase_add_test (tc_chain, test_sending_buffers_with_9_gstmemorys);
   tcase_add_test (tc_chain, test_streamheader);
   tcase_add_test (tc_chain, test_change_streamheader);
   tcase_add_test (tc_chain, test_burst_client_bytes);