udpsink: handle scather gather from buffers
authorWim Taymans <wim.taymans@collabora.co.uk>
Tue, 5 Apr 2011 17:15:11 +0000 (19:15 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Tue, 5 Apr 2011 17:15:11 +0000 (19:15 +0200)
Iterate the memory blocks on the buffer and send them using sendmsg.

gst/udp/gstmultiudpsink.c

index 19972d7..2a677fc 100644 (file)
@@ -488,6 +488,8 @@ socket_last_error_message (void)
 #endif
 }
 
+#ifdef G_OS_WIN32
+/* version without sendmsg */
 static GstFlowReturn
 gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
 {
@@ -565,106 +567,123 @@ gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
 
   return GST_FLOW_OK;
 }
-
-#if 0
-#ifndef G_OS_WIN32
+#else /* !G_OS_WIN32 */
+/* version with sendmsg */
 static GstFlowReturn
-gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * list)
+gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
 {
   GstMultiUDPSink *sink;
   GList *clients;
   gint ret, size = 0, num = 0, no_clients = 0;
   struct iovec *iov;
   struct msghdr msg = { 0 };
-  GstBufferListIterator *it;
-  guint gsize;
-  GstBuffer *buf;
+  guint n_mem, i;
+  gpointer bdata;
+  gsize bsize;
+  GstMemory *mem;
 
   sink = GST_MULTIUDPSINK (bsink);
 
-  g_return_val_if_fail (list != NULL, GST_FLOW_ERROR);
+  msg.msg_iovlen = 0;
+  size = 0;
+
+  n_mem = gst_buffer_n_memory (buffer);
+  if (n_mem == 0)
+    goto no_data;
 
-  it = gst_buffer_list_iterate (list);
-  g_return_val_if_fail (it != NULL, GST_FLOW_ERROR);
+  iov = (struct iovec *) g_malloc (n_mem * sizeof (struct iovec));
+  msg.msg_iov = iov;
 
-  while (gst_buffer_list_iterator_next_group (it)) {
-    msg.msg_iovlen = 0;
-    size = 0;
+  for (i = 0; i < n_mem; i++) {
+    mem = gst_buffer_peek_memory (buffer, i, GST_MAP_READ);
+    bdata = gst_memory_map (mem, &bsize, NULL, GST_MAP_READ);
 
-    if ((gsize = gst_buffer_list_iterator_n_buffers (it)) == 0) {
-      goto invalid_list;
+    if (bsize > UDP_MAX_SIZE) {
+      GST_WARNING ("Attempting to send a UDP packet larger than maximum "
+          "size (%d > %d)", bsize, UDP_MAX_SIZE);
     }
 
-    iov = (struct iovec *) g_malloc (gsize * sizeof (struct iovec));
-    msg.msg_iov = iov;
+    msg.msg_iov[msg.msg_iovlen].iov_len = bsize;
+    msg.msg_iov[msg.msg_iovlen].iov_base = bdata;
+    msg.msg_iovlen++;
 
-    while ((buf = gst_buffer_list_iterator_next (it))) {
-      if (GST_BUFFER_SIZE (buf) > UDP_MAX_SIZE) {
-        GST_WARNING ("Attempting to send a UDP packet larger than maximum "
-            "size (%d > %d)", GST_BUFFER_SIZE (buf), UDP_MAX_SIZE);
-      }
+    size += bsize;
+  }
 
-      msg.msg_iov[msg.msg_iovlen].iov_len = GST_BUFFER_SIZE (buf);
-      msg.msg_iov[msg.msg_iovlen].iov_base = GST_BUFFER_DATA (buf);
-      msg.msg_iovlen++;
-      size += GST_BUFFER_SIZE (buf);
-    }
+  sink->bytes_to_serve += size;
+
+  /* grab lock while iterating and sending to clients, this should be
+   * fast as UDP never blocks */
+  g_mutex_lock (sink->client_lock);
+  GST_LOG_OBJECT (bsink, "about to send %d bytes", size);
 
-    sink->bytes_to_serve += size;
+  for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+    GstUDPClient *client;
+    gint count;
 
-    /* grab lock while iterating and sending to clients, this should be
-     * fast as UDP never blocks */
-    g_mutex_lock (sink->client_lock);
-    GST_LOG_OBJECT (bsink, "about to send %d bytes", size);
-
-    for (clients = sink->clients; clients; clients = g_list_next (clients)) {
-      GstUDPClient *client;
-      gint count;
-
-      client = (GstUDPClient *) clients->data;
-      no_clients++;
-      GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client);
-
-      count = sink->send_duplicates ? client->refcount : 1;
-
-      while (count--) {
-        while (TRUE) {
-          msg.msg_name = (void *) &client->theiraddr;
-          msg.msg_namelen = sizeof (client->theiraddr);
-          ret = sendmsg (*client->sock, &msg, 0);
-
-          if (ret < 0) {
-            if (!socket_error_is_ignorable ()) {
-              break;
-            }
-          } else {
-            num++;
-            client->bytes_sent += ret;
-            client->packets_sent++;
-            sink->bytes_served += ret;
+    client = (GstUDPClient *) clients->data;
+    no_clients++;
+    GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client);
+
+    count = sink->send_duplicates ? client->refcount : 1;
+
+    while (count--) {
+      while (TRUE) {
+        msg.msg_name = (void *) &client->theiraddr;
+        msg.msg_namelen = sizeof (client->theiraddr);
+        ret = sendmsg (*client->sock, &msg, 0);
+
+        if (ret < 0) {
+          if (!socket_error_is_ignorable ()) {
+            gchar *errormessage = socket_last_error_message ();
+            GST_WARNING_OBJECT (sink, "client %p gave error %d (%s)", client,
+                socket_last_error_code (), errormessage);
+            g_free (errormessage);
+            break;
             break;
           }
+        } else {
+          num++;
+          client->bytes_sent += ret;
+          client->packets_sent++;
+          sink->bytes_served += ret;
+          break;
         }
       }
     }
-    g_mutex_unlock (sink->client_lock);
+  }
+  g_mutex_unlock (sink->client_lock);
+
+  /* unmap all memory again */
+  for (i = 0; i < n_mem; i++) {
+    mem = gst_buffer_peek_memory (buffer, i, GST_MAP_READ);
 
-    g_free (iov);
-    msg.msg_iov = NULL;
+    bsize = msg.msg_iov[i].iov_len;
+    bdata = msg.msg_iov[i].iov_base;
 
-    GST_LOG_OBJECT (sink, "sent %d bytes to %d (of %d) clients", size, num,
-        no_clients);
+    gst_memory_unmap (mem, bdata, bsize);
   }
+  g_free (iov);
 
-  gst_buffer_list_iterator_free (it);
+  GST_LOG_OBJECT (sink, "sent %d bytes to %d (of %d) clients", size, num,
+      no_clients);
 
   return GST_FLOW_OK;
 
-invalid_list:
-  gst_buffer_list_iterator_free (it);
-  return GST_FLOW_ERROR;
+no_data:
+  {
+    return GST_FLOW_OK;
+  }
 }
 #endif
+
+#if 0
+/* DISABLED, core sends buffers to our render one by one, we can't really do
+ * much better */
+static GstFlowReturn
+gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * list)
+{
+}
 #endif
 
 static void