From 0c054aa00d99f02a73e83261ccb08f237c5e7b70 Mon Sep 17 00:00:00 2001 From: William Manley Date: Fri, 13 Mar 2015 13:30:48 +0000 Subject: [PATCH] socketsrc: Refactor to simplify * Don't bother polling, just do a blocking read, the `GCancellable` will take care of unlocking. This should also be faster on MS Windows where the GIO documentation for `g_socket_get_available_bytes` states: "Note that on Windows, this function is rather inefficient in the UDP case". * Implement `GstPushSrc.fill` rather than `GstPushSrc.create`. This means that we will be using the downstream allocator which may be more efficient. It also means that socketsrc is likely to respect its "blocksize" property (assuming that there is enough data available). See https://bugzilla.gnome.org/show_bug.cgi?id=739546 --- gst/tcp/gstsocketsrc.c | 99 +++++++------------------------------------------- 1 file changed, 13 insertions(+), 86 deletions(-) diff --git a/gst/tcp/gstsocketsrc.c b/gst/tcp/gstsocketsrc.c index cb70254..7263e40 100644 --- a/gst/tcp/gstsocketsrc.c +++ b/gst/tcp/gstsocketsrc.c @@ -76,8 +76,8 @@ G_DEFINE_TYPE (GstSocketSrc, gst_socket_src, GST_TYPE_PUSH_SRC); static void gst_socket_src_finalize (GObject * gobject); -static GstFlowReturn gst_socket_src_create (GstPushSrc * psrc, - GstBuffer ** outbuf); +static GstFlowReturn gst_socket_src_fill (GstPushSrc * psrc, + GstBuffer * outbuf); static gboolean gst_socket_src_unlock (GstBaseSrc * bsrc); static gboolean gst_socket_src_unlock_stop (GstBaseSrc * bsrc); @@ -120,7 +120,7 @@ gst_socket_src_class_init (GstSocketSrcClass * klass) gstbasesrc_class->unlock = gst_socket_src_unlock; gstbasesrc_class->unlock_stop = gst_socket_src_unlock_stop; - gstpush_src_class->create = gst_socket_src_create; + gstpush_src_class->fill = gst_socket_src_fill; GST_DEBUG_CATEGORY_INIT (socketsrc_debug, "socketsrc", 0, "Socket Source"); } @@ -148,14 +148,13 @@ gst_socket_src_finalize (GObject * gobject) } static GstFlowReturn -gst_socket_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) +gst_socket_src_fill (GstPushSrc * psrc, GstBuffer * outbuf) { GstSocketSrc *src; GstFlowReturn ret = GST_FLOW_OK; gssize rret; GError *err = NULL; GstMapInfo map; - gssize avail, read; GSocket *socket; src = GST_SOCKET_SRC (psrc); @@ -173,60 +172,14 @@ gst_socket_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) GST_LOG_OBJECT (src, "asked for a buffer"); - /* read the buffer header */ - avail = g_socket_get_available_bytes (socket); - if (avail < 0) { - goto get_available_error; - } else if (avail == 0) { - GIOCondition condition; - - if (!g_socket_condition_wait (socket, - G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err)) - goto select_error; - - condition = - g_socket_condition_check (socket, - G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP); - - if ((condition & G_IO_ERR)) { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Socket in error state")); - *outbuf = NULL; - ret = GST_FLOW_ERROR; - goto done; - } else if ((condition & G_IO_HUP)) { - GST_DEBUG_OBJECT (src, "Connection closed"); - *outbuf = NULL; - ret = GST_FLOW_EOS; - goto done; - } - avail = g_socket_get_available_bytes (socket); - if (avail < 0) - goto get_available_error; - } - - if (avail > 0) { - read = MIN (avail, MAX_READ_SIZE); - *outbuf = gst_buffer_new_and_alloc (read); - gst_buffer_map (*outbuf, &map, GST_MAP_READWRITE); - rret = - g_socket_receive (socket, (gchar *) map.data, read, - src->cancellable, &err); - } else { - /* Connection closed */ - *outbuf = NULL; - read = 0; - rret = 0; - } + gst_buffer_map (outbuf, &map, GST_MAP_READWRITE); + rret = g_socket_receive_with_blocking (socket, (gchar *) map.data, + map.size, TRUE, src->cancellable, &err); + gst_buffer_unmap (outbuf, &map); if (rret == 0) { GST_DEBUG_OBJECT (src, "Connection closed"); ret = GST_FLOW_EOS; - if (*outbuf) { - gst_buffer_unmap (*outbuf, &map); - gst_buffer_unref (*outbuf); - } - *outbuf = NULL; } else if (rret < 0) { if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { ret = GST_FLOW_FLUSHING; @@ -236,50 +189,24 @@ gst_socket_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("Failed to read from socket: %s", err->message)); } - gst_buffer_unmap (*outbuf, &map); - gst_buffer_unref (*outbuf); - *outbuf = NULL; } else { ret = GST_FLOW_OK; - gst_buffer_unmap (*outbuf, &map); - gst_buffer_resize (*outbuf, 0, rret); + gst_buffer_resize (outbuf, 0, rret); GST_LOG_OBJECT (src, "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT, - gst_buffer_get_size (*outbuf), - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)), - GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)), - GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf)); + gst_buffer_get_size (outbuf), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)), + GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)), + GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf)); } g_clear_error (&err); -done: g_object_unref (socket); return ret; -select_error: - { - if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { - GST_DEBUG_OBJECT (src, "Cancelled"); - ret = GST_FLOW_FLUSHING; - } else { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Select failed: %s", err->message)); - ret = GST_FLOW_ERROR; - } - g_clear_error (&err); - g_object_unref (socket); - return ret; - } -get_available_error: - { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Failed to get available bytes from socket")); - g_object_unref (socket); - return GST_FLOW_ERROR; - } no_socket: { GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL), -- 2.7.4