From a649fe2d61f021e729373f3c48d9eea34ec9599e Mon Sep 17 00:00:00 2001 From: =?utf8?q?Sebastian=20Dr=C3=B6ge?= Date: Tue, 17 Jan 2012 11:44:20 +0100 Subject: [PATCH] tcp: Only read as much as is currently available from the socket --- gst/tcp/gstmultisocketsink.c | 8 ++++++- gst/tcp/gsttcpclientsrc.c | 55 ++++++++++++++++++++++++++++++++++++++++---- gst/tcp/gsttcpserversrc.c | 53 +++++++++++++++++++++++++++++++++++++++--- 3 files changed, 108 insertions(+), 8 deletions(-) diff --git a/gst/tcp/gstmultisocketsink.c b/gst/tcp/gstmultisocketsink.c index 4c6e0dd..110ef1c 100644 --- a/gst/tcp/gstmultisocketsink.c +++ b/gst/tcp/gstmultisocketsink.c @@ -1160,11 +1160,17 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink, * to write to us except for closing the socket, I guess it's because we * like to listen to our customers. */ do { + gssize navail; + GST_DEBUG_OBJECT (sink, "[socket %p] client wants us to read", client->socket); + navail = g_socket_get_available_bytes (client->socket); + if (navail < 0) + break; + nread = - g_socket_receive (client->socket, dummy, sizeof (dummy), + g_socket_receive (client->socket, dummy, MIN (navail, sizeof (dummy)), sink->cancellable, &err); if (first && nread == 0) { /* client sent close, so remove it */ diff --git a/gst/tcp/gsttcpclientsrc.c b/gst/tcp/gsttcpclientsrc.c index f8ed6b9..2ea6dc0 100644 --- a/gst/tcp/gsttcpclientsrc.c +++ b/gst/tcp/gsttcpclientsrc.c @@ -180,6 +180,7 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) gssize rret; GError *err = NULL; guint8 *data; + gssize avail, read; src = GST_TCP_CLIENT_SRC (psrc); @@ -189,16 +190,48 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) GST_LOG_OBJECT (src, "asked for a buffer"); /* read the buffer header */ - *outbuf = gst_buffer_new_and_alloc (MAX_READ_SIZE); + avail = g_socket_get_available_bytes (src->socket); + if (avail < 0) { + goto get_available_error; + } else if (avail == 0) { + GIOCondition condition; + + if (!g_socket_condition_wait (src->socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err)) + goto select_error; + + condition = + g_socket_condition_check (src->socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP); + + if ((condition & G_IO_ERR)) { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("Socket in error state")); + *outbuf = NULL; + ret = GST_FLOW_ERROR; + goto done; + } else if ((condition & G_IO_HUP)) { + GST_DEBUG_OBJECT (src, "Connection closed"); + *outbuf = NULL; + ret = GST_FLOW_EOS; + goto done; + } + avail = g_socket_get_available_bytes (src->socket); + if (avail <= 0) + goto get_available_error; + } + + read = MIN (avail, MAX_READ_SIZE); + *outbuf = gst_buffer_new_and_alloc (read); data = gst_buffer_map (*outbuf, NULL, NULL, GST_MAP_READWRITE); rret = - g_socket_receive (src->socket, (gchar *) data, MAX_READ_SIZE, + g_socket_receive (src->socket, (gchar *) data, read, src->cancellable, &err); if (rret == 0) { GST_DEBUG_OBJECT (src, "Connection closed"); ret = GST_FLOW_EOS; - gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE); + gst_buffer_unmap (*outbuf, data, read); gst_buffer_unref (*outbuf); *outbuf = NULL; } else if (ret < 0) { @@ -210,7 +243,7 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("Failed to read from socket: %s", err->message)); } - gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE); + gst_buffer_unmap (*outbuf, data, read); gst_buffer_unref (*outbuf); *outbuf = NULL; } else { @@ -228,8 +261,22 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) } g_clear_error (&err); +done: return ret; +select_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("Select failed: %s", err->message)); + g_clear_error (&err); + return GST_FLOW_ERROR; + } +get_available_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("Select to get available bytes from socket")); + return GST_FLOW_ERROR; + } wrong_state: { GST_DEBUG_OBJECT (src, "connection to closed, cannot read data"); diff --git a/gst/tcp/gsttcpserversrc.c b/gst/tcp/gsttcpserversrc.c index 0cdfdd2..a217e07 100644 --- a/gst/tcp/gsttcpserversrc.c +++ b/gst/tcp/gsttcpserversrc.c @@ -162,7 +162,8 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) { GstTCPServerSrc *src; GstFlowReturn ret = GST_FLOW_OK; - gssize rret; + gssize rret, avail; + gsize read; GError *err = NULL; guint8 *data; @@ -184,10 +185,42 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) GST_LOG_OBJECT (src, "asked for a buffer"); /* read the buffer header */ - *outbuf = gst_buffer_new_and_alloc (MAX_READ_SIZE); + avail = g_socket_get_available_bytes (src->client_socket); + if (avail < 0) { + goto get_available_error; + } else if (avail == 0) { + GIOCondition condition; + + if (!g_socket_condition_wait (src->client_socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err)) + goto select_error; + + condition = + g_socket_condition_check (src->client_socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP); + + if ((condition & G_IO_ERR)) { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("Socket in error state")); + *outbuf = NULL; + ret = GST_FLOW_ERROR; + goto done; + } else if ((condition & G_IO_HUP)) { + GST_DEBUG_OBJECT (src, "Connection closed"); + *outbuf = NULL; + ret = GST_FLOW_EOS; + goto done; + } + avail = g_socket_get_available_bytes (src->client_socket); + if (avail <= 0) + goto get_available_error; + } + + read = MIN (avail, MAX_READ_SIZE); + *outbuf = gst_buffer_new_and_alloc (read); data = gst_buffer_map (*outbuf, NULL, NULL, GST_MAP_READWRITE); rret = - g_socket_receive (src->client_socket, (gchar *) data, MAX_READ_SIZE, + g_socket_receive (src->client_socket, (gchar *) data, read, src->cancellable, &err); if (rret == 0) { @@ -223,6 +256,7 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) } g_clear_error (&err); +done: return ret; wrong_state: @@ -241,6 +275,19 @@ accept_error: g_clear_error (&err); return GST_FLOW_ERROR; } +select_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("Select failed: %s", err->message)); + g_clear_error (&err); + return GST_FLOW_ERROR; + } +get_available_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("Select to get available bytes from socket")); + return GST_FLOW_ERROR; + } } static void -- 2.7.4