udpsrc: more efficient memory handling
authorTim-Philipp Müller <tim@centricular.com>
Tue, 9 Sep 2014 12:46:56 +0000 (13:46 +0100)
committerTim-Philipp Müller <tim@centricular.com>
Tue, 9 Sep 2014 16:38:52 +0000 (17:38 +0100)
Drop use of g_socket_get_available_bytes() which is
not useful on all systems (where it returns the size
of the entire buffer not that of the next pending
packet), and is yet another syscall and apparently
very inefficient on Windows in the UDP case.

Instead, when reading UDP packets, use the more featureful
g_socket_receive_message() call that allows to read into
scattered memory, and allocate one memory chunk which is
likely to be large enough for a packet, while also providing
a larger allocated memory chunk just in case the packet
is larger than expected. If the received data fits into the
first chunk, we'll just add that to the buffer we return
and re-use the fallback buffer for next time, otherwise we
add both chunks to the buffer.

This reduces memory waste more reliably on systems where
get_available_bytes() doesn't work properly.

In a multimedia streaming scenario, incoming UDP packets
are almost never fragmented and thus almost always smaller
than the MTU size, which is also why we don't try to do
something smarter with more fallback memory chunks of
different sizes. The fallback scenario is just for when
someone built a broken sender pipeline (not using a
payloader or somesuch)

https://bugzilla.gnome.org/show_bug.cgi?id=610364

gst/udp/gstudpsrc.c

index f1ac223..c6e41d4 100644 (file)
@@ -503,16 +503,14 @@ gst_udpsrc_ensure_mem (GstUDPSrc * src)
 static GstFlowReturn
 gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
 {
-  GstFlowReturn ret;
   GstUDPSrc *udpsrc;
   GstBuffer *outbuf = NULL;
-  GstMapInfo info;
   GSocketAddress *saddr = NULL;
-  gsize offset;
-  gssize readsize;
-  gssize res;
+  gint flags = G_SOCKET_MSG_NONE;
   gboolean try_again;
   GError *err = NULL;
+  gssize res;
+  gsize offset;
 
   udpsrc = GST_UDPSRC_CAST (psrc);
 
@@ -520,10 +518,6 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
     goto memory_alloc_error;
 
 retry:
-  /* quick check, avoid going in select when we already have data */
-  readsize = g_socket_get_available_bytes (udpsrc->used_socket);
-  if (readsize > 0)
-    goto no_select;
 
   do {
     gint64 timeout;
@@ -557,63 +551,20 @@ retry:
     }
   } while (G_UNLIKELY (try_again));
 
-  /* ask how much is available for reading on the socket, this should be exactly
-   * one UDP packet. We will check the return value, though, because in some
-   * case it can return 0 and we don't want a 0 sized buffer. */
-  readsize = g_socket_get_available_bytes (udpsrc->used_socket);
-  if (G_UNLIKELY (readsize < 0))
-    goto get_available_error;
-
-  /* If we get here and the readsize is zero, then either select was woken up
-   * by activity that is not a read, or a poll error occurred, or a UDP packet
-   * was received that has no data. Since we cannot identify which case it is,
-   * we handle all of them. This could possibly lead to a UDP packet getting
-   * lost, but since UDP is not reliable, we can accept this. */
-  if (G_UNLIKELY (!readsize)) {
-    /* try to read a packet (and it will be ignored),
-     * in case a packet with no data arrived */
-    res =
-        g_socket_receive_from (udpsrc->used_socket, NULL, NULL,
-        0, udpsrc->cancellable, &err);
-    if (G_UNLIKELY (res < 0))
-      goto receive_error;
-
-    /* poll again */
-    goto retry;
-  }
-
-no_select:
-  GST_LOG_OBJECT (udpsrc, "ioctl says %d bytes available", (int) readsize);
-
-  /* sanity check value from _get_available_bytes(), which might be as
-   * large as the kernel-side buffer on some operating systems */
-  if (g_socket_get_family (udpsrc->used_socket) == G_SOCKET_FAMILY_IPV4)
-    readsize = MIN (MAX_IPV4_UDP_PACKET_SIZE, readsize);
-
-  ret = GST_BASE_SRC_CLASS (parent_class)->alloc (GST_BASE_SRC_CAST (udpsrc),
-      -1, readsize, &outbuf);
-  if (ret != GST_FLOW_OK)
-    goto alloc_failed;
-
-  gst_buffer_map (outbuf, &info, GST_MAP_WRITE);
-  offset = 0;
-
-  if (saddr)
+  if (saddr != NULL) {
     g_object_unref (saddr);
-  saddr = NULL;
+    saddr = NULL;
+  }
 
   res =
-      g_socket_receive_from (udpsrc->used_socket, &saddr, (gchar *) info.data,
-      info.size, udpsrc->cancellable, &err);
+      g_socket_receive_message (udpsrc->used_socket, &saddr, udpsrc->vec, 2,
+      NULL, NULL, &flags, udpsrc->cancellable, &err);
 
   if (G_UNLIKELY (res < 0)) {
     /* EHOSTUNREACH for a UDP socket means that a packet sent with udpsink
      * generated a "port unreachable" ICMP response. We ignore that and try
      * again. */
     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_HOST_UNREACHABLE)) {
-      gst_buffer_unmap (outbuf, &info);
-      gst_buffer_unref (outbuf);
-      outbuf = NULL;
       g_clear_error (&err);
       goto retry;
     }
@@ -624,30 +575,46 @@ no_select:
   if (res > udpsrc->max_size)
     udpsrc->max_size = res;
 
-  /* patch offset and size when stripping off the headers */
-  if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) {
-    if (G_UNLIKELY (readsize < udpsrc->skip_first_bytes))
-      goto skip_error;
+  outbuf = gst_buffer_new ();
+
+  /* append first memory chunk to buffer */
+  gst_buffer_append_memory (outbuf, udpsrc->mem);
 
-    offset += udpsrc->skip_first_bytes;
-    res -= udpsrc->skip_first_bytes;
+  /* if the packet didn't fit into the first chunk, add second one as well */
+  if (res > udpsrc->map.size) {
+    gst_buffer_append_memory (outbuf, udpsrc->mem_max);
+    gst_memory_unmap (udpsrc->mem_max, &udpsrc->map_max);
+    udpsrc->vec[1].buffer = NULL;
+    udpsrc->vec[1].size = 0;
+    udpsrc->mem_max = NULL;
   }
 
-  gst_buffer_unmap (outbuf, &info);
-  gst_buffer_resize (outbuf, offset, res);
+  /* make sure we allocate a new chunk next time (we do this only here because
+   * we look at map.size to see if the second memory chunk is needed above) */
+  gst_memory_unmap (udpsrc->mem, &udpsrc->map);
+  udpsrc->vec[0].buffer = NULL;
+  udpsrc->vec[0].size = 0;
+  udpsrc->mem = NULL;
+
+  offset = udpsrc->skip_first_bytes;
+
+  if (G_UNLIKELY (offset > 0 && res < offset))
+    goto skip_error;
+
+  gst_buffer_resize (outbuf, offset, res - offset);
 
   /* use buffer metadata so receivers can also track the address */
   if (saddr) {
     gst_buffer_add_net_address_meta (outbuf, saddr);
     g_object_unref (saddr);
+    saddr = NULL;
   }
-  saddr = NULL;
 
-  GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
+  GST_LOG_OBJECT (udpsrc, "read packet of %d bytes", (int) res);
 
   *buf = GST_BUFFER_CAST (outbuf);
 
-  return ret;
+  return GST_FLOW_OK;
 
   /* ERRORS */
 memory_alloc_error:
@@ -669,24 +636,8 @@ stopped:
     g_clear_error (&err);
     return GST_FLOW_FLUSHING;
   }
-get_available_error:
-  {
-    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
-        ("get available bytes failed"));
-    return GST_FLOW_ERROR;
-  }
-alloc_failed:
-  {
-    GST_DEBUG ("Allocation failed");
-    return ret;
-  }
 receive_error:
   {
-    if (outbuf != NULL) {
-      gst_buffer_unmap (outbuf, &info);
-      gst_buffer_unref (outbuf);
-    }
-
     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY) ||
         g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
       g_clear_error (&err);
@@ -700,7 +651,6 @@ receive_error:
   }
 skip_error:
   {
-    gst_buffer_unmap (outbuf, &info);
     gst_buffer_unref (outbuf);
 
     GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL),