From: Tim-Philipp Müller Date: Tue, 9 Sep 2014 12:46:56 +0000 (+0100) Subject: udpsrc: more efficient memory handling X-Git-Tag: 1.6.0~937 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=e6f77948acb5fdd5bac673a4ad669ed4360a6952;p=platform%2Fupstream%2Fgst-plugins-good.git udpsrc: more efficient memory handling Drop use of g_socket_get_available_bytes() which is not useful on all systems (where it returns the size of the entire buffer not that of the next pending packet), and is yet another syscall and apparently very inefficient on Windows in the UDP case. Instead, when reading UDP packets, use the more featureful g_socket_receive_message() call that allows to read into scattered memory, and allocate one memory chunk which is likely to be large enough for a packet, while also providing a larger allocated memory chunk just in case the packet is larger than expected. If the received data fits into the first chunk, we'll just add that to the buffer we return and re-use the fallback buffer for next time, otherwise we add both chunks to the buffer. This reduces memory waste more reliably on systems where get_available_bytes() doesn't work properly. In a multimedia streaming scenario, incoming UDP packets are almost never fragmented and thus almost always smaller than the MTU size, which is also why we don't try to do something smarter with more fallback memory chunks of different sizes. The fallback scenario is just for when someone built a broken sender pipeline (not using a payloader or somesuch) https://bugzilla.gnome.org/show_bug.cgi?id=610364 --- diff --git a/gst/udp/gstudpsrc.c b/gst/udp/gstudpsrc.c index f1ac223..c6e41d4 100644 --- a/gst/udp/gstudpsrc.c +++ b/gst/udp/gstudpsrc.c @@ -503,16 +503,14 @@ gst_udpsrc_ensure_mem (GstUDPSrc * src) static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) { - GstFlowReturn ret; GstUDPSrc *udpsrc; GstBuffer *outbuf = NULL; - GstMapInfo info; GSocketAddress *saddr = NULL; - gsize offset; - gssize readsize; - gssize res; + gint flags = G_SOCKET_MSG_NONE; gboolean try_again; GError *err = NULL; + gssize res; + gsize offset; udpsrc = GST_UDPSRC_CAST (psrc); @@ -520,10 +518,6 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) goto memory_alloc_error; retry: - /* quick check, avoid going in select when we already have data */ - readsize = g_socket_get_available_bytes (udpsrc->used_socket); - if (readsize > 0) - goto no_select; do { gint64 timeout; @@ -557,63 +551,20 @@ retry: } } while (G_UNLIKELY (try_again)); - /* ask how much is available for reading on the socket, this should be exactly - * one UDP packet. We will check the return value, though, because in some - * case it can return 0 and we don't want a 0 sized buffer. */ - readsize = g_socket_get_available_bytes (udpsrc->used_socket); - if (G_UNLIKELY (readsize < 0)) - goto get_available_error; - - /* If we get here and the readsize is zero, then either select was woken up - * by activity that is not a read, or a poll error occurred, or a UDP packet - * was received that has no data. Since we cannot identify which case it is, - * we handle all of them. This could possibly lead to a UDP packet getting - * lost, but since UDP is not reliable, we can accept this. */ - if (G_UNLIKELY (!readsize)) { - /* try to read a packet (and it will be ignored), - * in case a packet with no data arrived */ - res = - g_socket_receive_from (udpsrc->used_socket, NULL, NULL, - 0, udpsrc->cancellable, &err); - if (G_UNLIKELY (res < 0)) - goto receive_error; - - /* poll again */ - goto retry; - } - -no_select: - GST_LOG_OBJECT (udpsrc, "ioctl says %d bytes available", (int) readsize); - - /* sanity check value from _get_available_bytes(), which might be as - * large as the kernel-side buffer on some operating systems */ - if (g_socket_get_family (udpsrc->used_socket) == G_SOCKET_FAMILY_IPV4) - readsize = MIN (MAX_IPV4_UDP_PACKET_SIZE, readsize); - - ret = GST_BASE_SRC_CLASS (parent_class)->alloc (GST_BASE_SRC_CAST (udpsrc), - -1, readsize, &outbuf); - if (ret != GST_FLOW_OK) - goto alloc_failed; - - gst_buffer_map (outbuf, &info, GST_MAP_WRITE); - offset = 0; - - if (saddr) + if (saddr != NULL) { g_object_unref (saddr); - saddr = NULL; + saddr = NULL; + } res = - g_socket_receive_from (udpsrc->used_socket, &saddr, (gchar *) info.data, - info.size, udpsrc->cancellable, &err); + g_socket_receive_message (udpsrc->used_socket, &saddr, udpsrc->vec, 2, + NULL, NULL, &flags, udpsrc->cancellable, &err); if (G_UNLIKELY (res < 0)) { /* EHOSTUNREACH for a UDP socket means that a packet sent with udpsink * generated a "port unreachable" ICMP response. We ignore that and try * again. */ if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_HOST_UNREACHABLE)) { - gst_buffer_unmap (outbuf, &info); - gst_buffer_unref (outbuf); - outbuf = NULL; g_clear_error (&err); goto retry; } @@ -624,30 +575,46 @@ no_select: if (res > udpsrc->max_size) udpsrc->max_size = res; - /* patch offset and size when stripping off the headers */ - if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) { - if (G_UNLIKELY (readsize < udpsrc->skip_first_bytes)) - goto skip_error; + outbuf = gst_buffer_new (); + + /* append first memory chunk to buffer */ + gst_buffer_append_memory (outbuf, udpsrc->mem); - offset += udpsrc->skip_first_bytes; - res -= udpsrc->skip_first_bytes; + /* if the packet didn't fit into the first chunk, add second one as well */ + if (res > udpsrc->map.size) { + gst_buffer_append_memory (outbuf, udpsrc->mem_max); + gst_memory_unmap (udpsrc->mem_max, &udpsrc->map_max); + udpsrc->vec[1].buffer = NULL; + udpsrc->vec[1].size = 0; + udpsrc->mem_max = NULL; } - gst_buffer_unmap (outbuf, &info); - gst_buffer_resize (outbuf, offset, res); + /* make sure we allocate a new chunk next time (we do this only here because + * we look at map.size to see if the second memory chunk is needed above) */ + gst_memory_unmap (udpsrc->mem, &udpsrc->map); + udpsrc->vec[0].buffer = NULL; + udpsrc->vec[0].size = 0; + udpsrc->mem = NULL; + + offset = udpsrc->skip_first_bytes; + + if (G_UNLIKELY (offset > 0 && res < offset)) + goto skip_error; + + gst_buffer_resize (outbuf, offset, res - offset); /* use buffer metadata so receivers can also track the address */ if (saddr) { gst_buffer_add_net_address_meta (outbuf, saddr); g_object_unref (saddr); + saddr = NULL; } - saddr = NULL; - GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize); + GST_LOG_OBJECT (udpsrc, "read packet of %d bytes", (int) res); *buf = GST_BUFFER_CAST (outbuf); - return ret; + return GST_FLOW_OK; /* ERRORS */ memory_alloc_error: @@ -669,24 +636,8 @@ stopped: g_clear_error (&err); return GST_FLOW_FLUSHING; } -get_available_error: - { - GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL), - ("get available bytes failed")); - return GST_FLOW_ERROR; - } -alloc_failed: - { - GST_DEBUG ("Allocation failed"); - return ret; - } receive_error: { - if (outbuf != NULL) { - gst_buffer_unmap (outbuf, &info); - gst_buffer_unref (outbuf); - } - if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY) || g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { g_clear_error (&err); @@ -700,7 +651,6 @@ receive_error: } skip_error: { - gst_buffer_unmap (outbuf, &info); gst_buffer_unref (outbuf); GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL),