* to write to us except for closing the socket, I guess it's because we
* like to listen to our customers. */
do {
+ gssize navail;
+
GST_DEBUG_OBJECT (sink, "[socket %p] client wants us to read",
client->socket);
+ navail = g_socket_get_available_bytes (client->socket);
+ if (navail < 0)
+ break;
+
nread =
- g_socket_receive (client->socket, dummy, sizeof (dummy),
+ g_socket_receive (client->socket, dummy, MIN (navail, sizeof (dummy)),
sink->cancellable, &err);
if (first && nread == 0) {
/* client sent close, so remove it */
gssize rret;
GError *err = NULL;
guint8 *data;
+ gssize avail, read;
src = GST_TCP_CLIENT_SRC (psrc);
GST_LOG_OBJECT (src, "asked for a buffer");
/* read the buffer header */
- *outbuf = gst_buffer_new_and_alloc (MAX_READ_SIZE);
+ avail = g_socket_get_available_bytes (src->socket);
+ if (avail < 0) {
+ goto get_available_error;
+ } else if (avail == 0) {
+ GIOCondition condition;
+
+ if (!g_socket_condition_wait (src->socket,
+ G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
+ goto select_error;
+
+ condition =
+ g_socket_condition_check (src->socket,
+ G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
+
+ if ((condition & G_IO_ERR)) {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Socket in error state"));
+ *outbuf = NULL;
+ ret = GST_FLOW_ERROR;
+ goto done;
+ } else if ((condition & G_IO_HUP)) {
+ GST_DEBUG_OBJECT (src, "Connection closed");
+ *outbuf = NULL;
+ ret = GST_FLOW_EOS;
+ goto done;
+ }
+ avail = g_socket_get_available_bytes (src->socket);
+ if (avail <= 0)
+ goto get_available_error;
+ }
+
+ read = MIN (avail, MAX_READ_SIZE);
+ *outbuf = gst_buffer_new_and_alloc (read);
data = gst_buffer_map (*outbuf, NULL, NULL, GST_MAP_READWRITE);
rret =
- g_socket_receive (src->socket, (gchar *) data, MAX_READ_SIZE,
+ g_socket_receive (src->socket, (gchar *) data, read,
src->cancellable, &err);
if (rret == 0) {
GST_DEBUG_OBJECT (src, "Connection closed");
ret = GST_FLOW_EOS;
- gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE);
+ gst_buffer_unmap (*outbuf, data, read);
gst_buffer_unref (*outbuf);
*outbuf = NULL;
} else if (ret < 0) {
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Failed to read from socket: %s", err->message));
}
- gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE);
+ gst_buffer_unmap (*outbuf, data, read);
gst_buffer_unref (*outbuf);
*outbuf = NULL;
} else {
}
g_clear_error (&err);
+done:
return ret;
+select_error:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Select failed: %s", err->message));
+ g_clear_error (&err);
+ return GST_FLOW_ERROR;
+ }
+get_available_error:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Select to get available bytes from socket"));
+ return GST_FLOW_ERROR;
+ }
wrong_state:
{
GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
{
GstTCPServerSrc *src;
GstFlowReturn ret = GST_FLOW_OK;
- gssize rret;
+ gssize rret, avail;
+ gsize read;
GError *err = NULL;
guint8 *data;
GST_LOG_OBJECT (src, "asked for a buffer");
/* read the buffer header */
- *outbuf = gst_buffer_new_and_alloc (MAX_READ_SIZE);
+ avail = g_socket_get_available_bytes (src->client_socket);
+ if (avail < 0) {
+ goto get_available_error;
+ } else if (avail == 0) {
+ GIOCondition condition;
+
+ if (!g_socket_condition_wait (src->client_socket,
+ G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
+ goto select_error;
+
+ condition =
+ g_socket_condition_check (src->client_socket,
+ G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
+
+ if ((condition & G_IO_ERR)) {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Socket in error state"));
+ *outbuf = NULL;
+ ret = GST_FLOW_ERROR;
+ goto done;
+ } else if ((condition & G_IO_HUP)) {
+ GST_DEBUG_OBJECT (src, "Connection closed");
+ *outbuf = NULL;
+ ret = GST_FLOW_EOS;
+ goto done;
+ }
+ avail = g_socket_get_available_bytes (src->client_socket);
+ if (avail <= 0)
+ goto get_available_error;
+ }
+
+ read = MIN (avail, MAX_READ_SIZE);
+ *outbuf = gst_buffer_new_and_alloc (read);
data = gst_buffer_map (*outbuf, NULL, NULL, GST_MAP_READWRITE);
rret =
- g_socket_receive (src->client_socket, (gchar *) data, MAX_READ_SIZE,
+ g_socket_receive (src->client_socket, (gchar *) data, read,
src->cancellable, &err);
if (rret == 0) {
}
g_clear_error (&err);
+done:
return ret;
wrong_state:
g_clear_error (&err);
return GST_FLOW_ERROR;
}
+select_error:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Select failed: %s", err->message));
+ g_clear_error (&err);
+ return GST_FLOW_ERROR;
+ }
+get_available_error:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Select to get available bytes from socket"));
+ return GST_FLOW_ERROR;
+ }
}
static void