socketsrc: Refactor to simplify
authorWilliam Manley <will@williammanley.net>
Fri, 13 Mar 2015 13:30:48 +0000 (13:30 +0000)
committerWim Taymans <wtaymans@redhat.com>
Fri, 13 Mar 2015 19:05:00 +0000 (20:05 +0100)
* Don't bother polling, just do a blocking read, the `GCancellable` will
  take care of unlocking.  This should also be faster on MS Windows where
  the GIO documentation for `g_socket_get_available_bytes` states: "Note
  that on Windows, this function is rather inefficient in the UDP case".

* Implement `GstPushSrc.fill` rather than `GstPushSrc.create`.  This means
  that we will be using the downstream allocator which may be more
  efficient.  It also means that socketsrc is likely to respect its
  "blocksize" property (assuming that there is enough data available).

See https://bugzilla.gnome.org/show_bug.cgi?id=739546

gst/tcp/gstsocketsrc.c

index cb70254..7263e40 100644 (file)
@@ -76,8 +76,8 @@ G_DEFINE_TYPE (GstSocketSrc, gst_socket_src, GST_TYPE_PUSH_SRC);
 
 static void gst_socket_src_finalize (GObject * gobject);
 
-static GstFlowReturn gst_socket_src_create (GstPushSrc * psrc,
-    GstBuffer ** outbuf);
+static GstFlowReturn gst_socket_src_fill (GstPushSrc * psrc,
+    GstBuffer * outbuf);
 static gboolean gst_socket_src_unlock (GstBaseSrc * bsrc);
 static gboolean gst_socket_src_unlock_stop (GstBaseSrc * bsrc);
 
@@ -120,7 +120,7 @@ gst_socket_src_class_init (GstSocketSrcClass * klass)
   gstbasesrc_class->unlock = gst_socket_src_unlock;
   gstbasesrc_class->unlock_stop = gst_socket_src_unlock_stop;
 
-  gstpush_src_class->create = gst_socket_src_create;
+  gstpush_src_class->fill = gst_socket_src_fill;
 
   GST_DEBUG_CATEGORY_INIT (socketsrc_debug, "socketsrc", 0, "Socket Source");
 }
@@ -148,14 +148,13 @@ gst_socket_src_finalize (GObject * gobject)
 }
 
 static GstFlowReturn
-gst_socket_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
+gst_socket_src_fill (GstPushSrc * psrc, GstBuffer * outbuf)
 {
   GstSocketSrc *src;
   GstFlowReturn ret = GST_FLOW_OK;
   gssize rret;
   GError *err = NULL;
   GstMapInfo map;
-  gssize avail, read;
   GSocket *socket;
 
   src = GST_SOCKET_SRC (psrc);
@@ -173,60 +172,14 @@ gst_socket_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 
   GST_LOG_OBJECT (src, "asked for a buffer");
 
-  /* read the buffer header */
-  avail = g_socket_get_available_bytes (socket);
-  if (avail < 0) {
-    goto get_available_error;
-  } else if (avail == 0) {
-    GIOCondition condition;
-
-    if (!g_socket_condition_wait (socket,
-            G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
-      goto select_error;
-
-    condition =
-        g_socket_condition_check (socket,
-        G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
-
-    if ((condition & G_IO_ERR)) {
-      GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-          ("Socket in error state"));
-      *outbuf = NULL;
-      ret = GST_FLOW_ERROR;
-      goto done;
-    } else if ((condition & G_IO_HUP)) {
-      GST_DEBUG_OBJECT (src, "Connection closed");
-      *outbuf = NULL;
-      ret = GST_FLOW_EOS;
-      goto done;
-    }
-    avail = g_socket_get_available_bytes (socket);
-    if (avail < 0)
-      goto get_available_error;
-  }
-
-  if (avail > 0) {
-    read = MIN (avail, MAX_READ_SIZE);
-    *outbuf = gst_buffer_new_and_alloc (read);
-    gst_buffer_map (*outbuf, &map, GST_MAP_READWRITE);
-    rret =
-        g_socket_receive (socket, (gchar *) map.data, read,
-        src->cancellable, &err);
-  } else {
-    /* Connection closed */
-    *outbuf = NULL;
-    read = 0;
-    rret = 0;
-  }
+  gst_buffer_map (outbuf, &map, GST_MAP_READWRITE);
+  rret = g_socket_receive_with_blocking (socket, (gchar *) map.data,
+      map.size, TRUE, src->cancellable, &err);
+  gst_buffer_unmap (outbuf, &map);
 
   if (rret == 0) {
     GST_DEBUG_OBJECT (src, "Connection closed");
     ret = GST_FLOW_EOS;
-    if (*outbuf) {
-      gst_buffer_unmap (*outbuf, &map);
-      gst_buffer_unref (*outbuf);
-    }
-    *outbuf = NULL;
   } else if (rret < 0) {
     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
       ret = GST_FLOW_FLUSHING;
@@ -236,50 +189,24 @@ gst_socket_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
           ("Failed to read from socket: %s", err->message));
     }
-    gst_buffer_unmap (*outbuf, &map);
-    gst_buffer_unref (*outbuf);
-    *outbuf = NULL;
   } else {
     ret = GST_FLOW_OK;
-    gst_buffer_unmap (*outbuf, &map);
-    gst_buffer_resize (*outbuf, 0, rret);
+    gst_buffer_resize (outbuf, 0, rret);
 
     GST_LOG_OBJECT (src,
         "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
         GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
         ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
-        gst_buffer_get_size (*outbuf),
-        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
-        GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
-        GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
+        gst_buffer_get_size (outbuf),
+        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
+        GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)),
+        GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf));
   }
   g_clear_error (&err);
 
-done:
   g_object_unref (socket);
   return ret;
 
-select_error:
-  {
-    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
-      GST_DEBUG_OBJECT (src, "Cancelled");
-      ret = GST_FLOW_FLUSHING;
-    } else {
-      GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-          ("Select failed: %s", err->message));
-      ret = GST_FLOW_ERROR;
-    }
-    g_clear_error (&err);
-    g_object_unref (socket);
-    return ret;
-  }
-get_available_error:
-  {
-    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-        ("Failed to get available bytes from socket"));
-    g_object_unref (socket);
-    return GST_FLOW_ERROR;
-  }
 no_socket:
   {
     GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL),