*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
*/
/**
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&srctemplate));
- gst_element_class_set_details_simple (gstelement_class,
+ gst_element_class_set_static_metadata (gstelement_class,
"TCP client source", "Source/Network",
"Receive data as a client over the network via TCP",
"Thomas Vander Stichele <thomas at apestaart dot org>");
GstFlowReturn ret = GST_FLOW_OK;
gssize rret;
GError *err = NULL;
- guint8 *data;
+ GstMapInfo map;
+ gssize avail, read;
src = GST_TCP_CLIENT_SRC (psrc);
GST_LOG_OBJECT (src, "asked for a buffer");
/* read the buffer header */
- *outbuf = gst_buffer_new_and_alloc (MAX_READ_SIZE);
- data = gst_buffer_map (*outbuf, NULL, NULL, GST_MAP_READWRITE);
- rret =
- g_socket_receive (src->socket, (gchar *) data, MAX_READ_SIZE,
- src->cancellable, &err);
+ avail = g_socket_get_available_bytes (src->socket);
+ if (avail < 0) {
+ goto get_available_error;
+ } else if (avail == 0) {
+ GIOCondition condition;
+
+ if (!g_socket_condition_wait (src->socket,
+ G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
+ goto select_error;
+
+ condition =
+ g_socket_condition_check (src->socket,
+ G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
+
+ if ((condition & G_IO_ERR)) {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Socket in error state"));
+ *outbuf = NULL;
+ ret = GST_FLOW_ERROR;
+ goto done;
+ } else if ((condition & G_IO_HUP)) {
+ GST_DEBUG_OBJECT (src, "Connection closed");
+ *outbuf = NULL;
+ ret = GST_FLOW_EOS;
+ goto done;
+ }
+ avail = g_socket_get_available_bytes (src->socket);
+ if (avail < 0)
+ goto get_available_error;
+ }
+
+ if (avail > 0) {
+ read = MIN (avail, MAX_READ_SIZE);
+ *outbuf = gst_buffer_new_and_alloc (read);
+ gst_buffer_map (*outbuf, &map, GST_MAP_READWRITE);
+ rret =
+ g_socket_receive (src->socket, (gchar *) map.data, read,
+ src->cancellable, &err);
+ } else {
+ /* Connection closed */
+ *outbuf = NULL;
+ read = 0;
+ rret = 0;
+ }
if (rret == 0) {
GST_DEBUG_OBJECT (src, "Connection closed");
ret = GST_FLOW_EOS;
- gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE);
- gst_buffer_unref (*outbuf);
+ if (*outbuf) {
+ gst_buffer_unmap (*outbuf, &map);
+ gst_buffer_unref (*outbuf);
+ }
*outbuf = NULL;
- } else if (ret < 0) {
+ } else if (rret < 0) {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
- ret = GST_FLOW_WRONG_STATE;
+ ret = GST_FLOW_FLUSHING;
GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
} else {
ret = GST_FLOW_ERROR;
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Failed to read from socket: %s", err->message));
}
- gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE);
+ gst_buffer_unmap (*outbuf, &map);
gst_buffer_unref (*outbuf);
*outbuf = NULL;
} else {
ret = GST_FLOW_OK;
- gst_buffer_unmap (*outbuf, data, rret);
+ gst_buffer_unmap (*outbuf, &map);
+ gst_buffer_resize (*outbuf, 0, rret);
GST_LOG_OBJECT (src,
"Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
}
g_clear_error (&err);
+done:
return ret;
+select_error:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Select failed: %s", err->message));
+ g_clear_error (&err);
+ return GST_FLOW_ERROR;
+ }
+get_available_error:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Failed to get available bytes from socket"));
+ return GST_FLOW_ERROR;
+ }
wrong_state:
{
GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
- return GST_FLOW_WRONG_STATE;
+ return GST_FLOW_FLUSHING;
}
}
GSocketAddress *saddr;
GResolver *resolver;
- /* create receiving client socket */
- GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d",
- src->host, src->port);
-
- src->socket =
- g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_STREAM,
- G_SOCKET_PROTOCOL_TCP, &err);
- if (!src->socket)
- goto no_socket;
-
- GST_DEBUG_OBJECT (src, "opened receiving client socket");
- GST_OBJECT_FLAG_SET (src, GST_TCP_CLIENT_SRC_OPEN);
-
/* look up name if we need to */
addr = g_inet_address_new_from_string (src->host);
if (!addr) {
}
#endif
- /* connect to server */
saddr = g_inet_socket_address_new (addr, src->port);
+ g_object_unref (addr);
+
+ /* create receiving client socket */
+ GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d",
+ src->host, src->port);
+
+ src->socket =
+ g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
+ G_SOCKET_PROTOCOL_TCP, &err);
+ if (!src->socket)
+ goto no_socket;
+
+ GST_DEBUG_OBJECT (src, "opened receiving client socket");
+ GST_OBJECT_FLAG_SET (src, GST_TCP_CLIENT_SRC_OPEN);
+
+ /* connect to server */
if (!g_socket_connect (src->socket, saddr, src->cancellable, &err))
goto connect_failed;
g_object_unref (saddr);
- g_object_unref (addr);
return TRUE;
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("Failed to create socket: %s", err->message));
g_clear_error (&err);
+ g_object_unref (saddr);
return FALSE;
}
name_resolve:
}
g_clear_error (&err);
g_object_unref (resolver);
- gst_tcp_client_src_stop (GST_BASE_SRC (src));
return FALSE;
}
connect_failed:
}
g_clear_error (&err);
g_object_unref (saddr);
- g_object_unref (addr);
gst_tcp_client_src_stop (GST_BASE_SRC (src));
return FALSE;
}