+ avail = g_socket_get_available_bytes (src->socket);
+ if (avail < 0) {
+ goto get_available_error;
+ } else if (avail == 0) {
+ GIOCondition condition;
+
+ if (!g_socket_condition_wait (src->socket,
+ G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
+ goto select_error;
+
+ condition =
+ g_socket_condition_check (src->socket,
+ G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
+
+ if ((condition & G_IO_ERR)) {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Socket in error state"));
+ *outbuf = NULL;
+ ret = GST_FLOW_ERROR;
+ goto done;
+ } else if ((condition & G_IO_HUP)) {
+ GST_DEBUG_OBJECT (src, "Connection closed");
+ *outbuf = NULL;
+ ret = GST_FLOW_EOS;
+ goto done;
+ }
+ avail = g_socket_get_available_bytes (src->socket);
+ if (avail < 0)
+ goto get_available_error;
+ }
+
+ if (avail > 0) {
+ read = MIN (avail, MAX_READ_SIZE);
+ *outbuf = gst_buffer_new_and_alloc (read);
+ gst_buffer_map (*outbuf, &map, GST_MAP_READWRITE);
+ rret =
+ g_socket_receive (src->socket, (gchar *) map.data, read,
+ src->cancellable, &err);
+ } else {
+ /* Connection closed */
+ *outbuf = NULL;
+ read = 0;
+ rret = 0;
+ }
+
+ if (rret == 0) {
+ GST_DEBUG_OBJECT (src, "Connection closed");
+ ret = GST_FLOW_EOS;
+ if (*outbuf) {
+ gst_buffer_unmap (*outbuf, &map);
+ gst_buffer_unref (*outbuf);
+ }
+ *outbuf = NULL;
+ } else if (rret < 0) {
+ if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+ ret = GST_FLOW_FLUSHING;
+ GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
+ } else {
+ ret = GST_FLOW_ERROR;
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Failed to read from socket: %s", err->message));
+ }
+ gst_buffer_unmap (*outbuf, &map);
+ gst_buffer_unref (*outbuf);
+ *outbuf = NULL;
+ } else {
+ ret = GST_FLOW_OK;
+ gst_buffer_unmap (*outbuf, &map);
+ gst_buffer_resize (*outbuf, 0, rret);