static GstFlowReturn
gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
{
- GstFlowReturn ret;
GstUDPSrc *udpsrc;
GstBuffer *outbuf = NULL;
- GstMapInfo info;
GSocketAddress *saddr = NULL;
- gsize offset;
- gssize readsize;
- gssize res;
+ gint flags = G_SOCKET_MSG_NONE;
gboolean try_again;
GError *err = NULL;
+ gssize res;
+ gsize offset;
udpsrc = GST_UDPSRC_CAST (psrc);
goto memory_alloc_error;
retry:
- /* quick check, avoid going in select when we already have data */
- readsize = g_socket_get_available_bytes (udpsrc->used_socket);
- if (readsize > 0)
- goto no_select;
do {
gint64 timeout;
}
} while (G_UNLIKELY (try_again));
- /* ask how much is available for reading on the socket, this should be exactly
- * one UDP packet. We will check the return value, though, because in some
- * case it can return 0 and we don't want a 0 sized buffer. */
- readsize = g_socket_get_available_bytes (udpsrc->used_socket);
- if (G_UNLIKELY (readsize < 0))
- goto get_available_error;
-
- /* If we get here and the readsize is zero, then either select was woken up
- * by activity that is not a read, or a poll error occurred, or a UDP packet
- * was received that has no data. Since we cannot identify which case it is,
- * we handle all of them. This could possibly lead to a UDP packet getting
- * lost, but since UDP is not reliable, we can accept this. */
- if (G_UNLIKELY (!readsize)) {
- /* try to read a packet (and it will be ignored),
- * in case a packet with no data arrived */
- res =
- g_socket_receive_from (udpsrc->used_socket, NULL, NULL,
- 0, udpsrc->cancellable, &err);
- if (G_UNLIKELY (res < 0))
- goto receive_error;
-
- /* poll again */
- goto retry;
- }
-
-no_select:
- GST_LOG_OBJECT (udpsrc, "ioctl says %d bytes available", (int) readsize);
-
- /* sanity check value from _get_available_bytes(), which might be as
- * large as the kernel-side buffer on some operating systems */
- if (g_socket_get_family (udpsrc->used_socket) == G_SOCKET_FAMILY_IPV4)
- readsize = MIN (MAX_IPV4_UDP_PACKET_SIZE, readsize);
-
- ret = GST_BASE_SRC_CLASS (parent_class)->alloc (GST_BASE_SRC_CAST (udpsrc),
- -1, readsize, &outbuf);
- if (ret != GST_FLOW_OK)
- goto alloc_failed;
-
- gst_buffer_map (outbuf, &info, GST_MAP_WRITE);
- offset = 0;
-
- if (saddr)
+ if (saddr != NULL) {
g_object_unref (saddr);
- saddr = NULL;
+ saddr = NULL;
+ }
res =
- g_socket_receive_from (udpsrc->used_socket, &saddr, (gchar *) info.data,
- info.size, udpsrc->cancellable, &err);
+ g_socket_receive_message (udpsrc->used_socket, &saddr, udpsrc->vec, 2,
+ NULL, NULL, &flags, udpsrc->cancellable, &err);
if (G_UNLIKELY (res < 0)) {
/* EHOSTUNREACH for a UDP socket means that a packet sent with udpsink
* generated a "port unreachable" ICMP response. We ignore that and try
* again. */
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_HOST_UNREACHABLE)) {
- gst_buffer_unmap (outbuf, &info);
- gst_buffer_unref (outbuf);
- outbuf = NULL;
g_clear_error (&err);
goto retry;
}
if (res > udpsrc->max_size)
udpsrc->max_size = res;
- /* patch offset and size when stripping off the headers */
- if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) {
- if (G_UNLIKELY (readsize < udpsrc->skip_first_bytes))
- goto skip_error;
+ outbuf = gst_buffer_new ();
+
+ /* append first memory chunk to buffer */
+ gst_buffer_append_memory (outbuf, udpsrc->mem);
- offset += udpsrc->skip_first_bytes;
- res -= udpsrc->skip_first_bytes;
+ /* if the packet didn't fit into the first chunk, add second one as well */
+ if (res > udpsrc->map.size) {
+ gst_buffer_append_memory (outbuf, udpsrc->mem_max);
+ gst_memory_unmap (udpsrc->mem_max, &udpsrc->map_max);
+ udpsrc->vec[1].buffer = NULL;
+ udpsrc->vec[1].size = 0;
+ udpsrc->mem_max = NULL;
}
- gst_buffer_unmap (outbuf, &info);
- gst_buffer_resize (outbuf, offset, res);
+ /* make sure we allocate a new chunk next time (we do this only here because
+ * we look at map.size to see if the second memory chunk is needed above) */
+ gst_memory_unmap (udpsrc->mem, &udpsrc->map);
+ udpsrc->vec[0].buffer = NULL;
+ udpsrc->vec[0].size = 0;
+ udpsrc->mem = NULL;
+
+ offset = udpsrc->skip_first_bytes;
+
+ if (G_UNLIKELY (offset > 0 && res < offset))
+ goto skip_error;
+
+ gst_buffer_resize (outbuf, offset, res - offset);
/* use buffer metadata so receivers can also track the address */
if (saddr) {
gst_buffer_add_net_address_meta (outbuf, saddr);
g_object_unref (saddr);
+ saddr = NULL;
}
- saddr = NULL;
- GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
+ GST_LOG_OBJECT (udpsrc, "read packet of %d bytes", (int) res);
*buf = GST_BUFFER_CAST (outbuf);
- return ret;
+ return GST_FLOW_OK;
/* ERRORS */
memory_alloc_error:
g_clear_error (&err);
return GST_FLOW_FLUSHING;
}
-get_available_error:
- {
- GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
- ("get available bytes failed"));
- return GST_FLOW_ERROR;
- }
-alloc_failed:
- {
- GST_DEBUG ("Allocation failed");
- return ret;
- }
receive_error:
{
- if (outbuf != NULL) {
- gst_buffer_unmap (outbuf, &info);
- gst_buffer_unref (outbuf);
- }
-
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY) ||
g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
g_clear_error (&err);
}
skip_error:
{
- gst_buffer_unmap (outbuf, &info);
gst_buffer_unref (outbuf);
GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL),