2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
26 #include <gst/gst-i18n-plugin.h>
28 #include "gsttcpserversrc.h"
30 #include <sys/ioctl.h>
/* Control-channel helpers.  A pair of descriptors stored in o->control_fds
 * carries one-byte commands: unlock() in this file writes CONTROL_STOP with
 * SEND_COMMAND, and create() puts READ_SOCKET into its select() set so that a
 * pending command wakes it up.
 *
 *   CONTROL_SOCKETS(o)  the descriptor array itself (handed to socketpair())
 *   WRITE_SOCKET(o)     element [1], the end commands are written to
 *   READ_SOCKET(o)      element [0], the end commands are read from
 *   SEND_COMMAND        writes the single byte `command` to WRITE_SOCKET(o)
 *   READ_COMMAND        reads one byte from READ_SOCKET(o) into `command` and
 *                       stores read()'s return value in `res`
 *
 * NOTE(review): the macro bodies are only partially visible in this view.
 * NOTE(review): in the visible lines the result of write() is discarded and
 * the macro arguments are expanded without parentheses. */
34 /* control stuff stolen from fdsrc */
35 #define CONTROL_STOP 'S' /* stop the select call */
36 #define CONTROL_SOCKETS(o) o->control_fds
37 #define WRITE_SOCKET(o) o->control_fds[1]
38 #define READ_SOCKET(o) o->control_fds[0]
40 #define SEND_COMMAND(o, command) \
42 unsigned char c; c = command; \
43 write (WRITE_SOCKET(o), &c, 1); \
46 #define READ_COMMAND(o, command, res) \
48 res = read(READ_SOCKET(o), &command, 1); \
/* Debug category for this element; it is initialised in class_init with
 * GST_DEBUG_CATEGORY_INIT and made the file's GST_CAT_DEFAULT here. */
52 GST_DEBUG_CATEGORY (tcpserversrc_debug);
53 #define GST_CAT_DEFAULT tcpserversrc_debug
/* TCP_DEFAULT_LISTEN_HOST is the declared default of the "host" property;
 * TCP_BACKLOG is the queue length passed to listen() in start(). */
55 #define TCP_DEFAULT_LISTEN_HOST NULL /* listen on all interfaces */
56 #define TCP_BACKLOG 1 /* client connection queue */
/* Element details (name, description, author) attached to the element class
 * in base_init via gst_element_class_set_details().
 * NOTE(review): only three string arguments are visible in this view; confirm
 * the argument list against what GST_ELEMENT_DETAILS expects. */
59 static GstElementDetails gst_tcp_server_src_details =
60 GST_ELEMENT_DETAILS ("TCP Server source",
62 "Receive data as a server over the network via TCP",
63 "Thomas Vander Stichele <thomas at apestaart dot org>");
/* Static template for the pad named "src"; base_init adds it to the element
 * class.  NOTE(review): the remaining template arguments are not visible in
 * this view. */
65 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
/* Type boilerplate for GstTCPServerSrc using the gst_tcp_server_src prefix.
 * NOTE(review): presumably GstPushSrc is the parent type; the trailing
 * arguments are not visible here, so confirm. */
80 GST_BOILERPLATE (GstTCPServerSrc, gst_tcp_server_src, GstPushSrc,
/* Forward declarations of the handlers that class_init installs on the
 * GObject, GstBaseSrc and GstPushSrc class structures. */
84 static void gst_tcp_server_src_finalize (GObject * gobject);
/* GstBaseSrc handlers: start, stop and unlock. */
86 static gboolean gst_tcp_server_src_start (GstBaseSrc * bsrc);
87 static gboolean gst_tcp_server_src_stop (GstBaseSrc * bsrc);
88 static gboolean gst_tcp_server_src_unlock (GstBaseSrc * bsrc);
/* GstPushSrc create handler (the rest of this prototype is not visible in
 * this view). */
89 static GstFlowReturn gst_tcp_server_src_create (GstPushSrc * psrc,
/* GObject property accessors. */
92 static void gst_tcp_server_src_set_property (GObject * object, guint prop_id,
93 const GValue * value, GParamSpec * pspec);
94 static void gst_tcp_server_src_get_property (GObject * object, guint prop_id,
95 GValue * value, GParamSpec * pspec);
/* Base-init hook: registers the "src" pad template and the element details
 * on the element class.
 *
 * g_class: class structure being initialised; cast to GstElementClass. */
99 gst_tcp_server_src_base_init (gpointer g_class)
101 GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
/* add the static source pad template declared at file scope */
103 gst_element_class_add_pad_template (element_class,
104 gst_static_pad_template_get (&srctemplate));
/* attach the element details declared at file scope */
106 gst_element_class_set_details (element_class, &gst_tcp_server_src_details);
/* Class initialiser: installs the GObject property/finalize handlers,
 * registers the "host", "port" and "protocol" properties, installs the
 * GstBaseSrc and GstPushSrc handlers and initialises the debug category.
 *
 * klass: the GstTCPServerSrcClass being initialised. */
110 gst_tcp_server_src_class_init (GstTCPServerSrcClass * klass)
112 GObjectClass *gobject_class;
113 GstBaseSrcClass *gstbasesrc_class;
114 GstPushSrcClass *gstpush_src_class;
/* view the same class structure through each parent class type */
116 gobject_class = (GObjectClass *) klass;
117 gstbasesrc_class = (GstBaseSrcClass *) klass;
118 gstpush_src_class = (GstPushSrcClass *) klass;
/* GObject handlers */
120 gobject_class->set_property = gst_tcp_server_src_set_property;
121 gobject_class->get_property = gst_tcp_server_src_get_property;
122 gobject_class->finalize = gst_tcp_server_src_finalize;
/* "host": read/write string, default TCP_DEFAULT_LISTEN_HOST.
 * NOTE(review): this call and the next use G_OBJECT_CLASS (klass) while the
 * third uses gobject_class; they refer to the same class structure. */
124 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_HOST,
125 g_param_spec_string ("host", "Host", "The hostname to listen as",
126 TCP_DEFAULT_LISTEN_HOST, G_PARAM_READWRITE));
/* "port": read/write int in [0, TCP_HIGHEST_PORT], default TCP_DEFAULT_PORT */
127 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PORT,
128 g_param_spec_int ("port", "Port", "The port to listen to",
129 0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
/* "protocol": read/write enum of GST_TYPE_TCP_PROTOCOL, default
 * GST_TCP_PROTOCOL_NONE */
130 g_object_class_install_property (gobject_class, PROP_PROTOCOL,
131 g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
132 GST_TYPE_TCP_PROTOCOL, GST_TCP_PROTOCOL_NONE, G_PARAM_READWRITE));
/* GstBaseSrc handlers */
134 gstbasesrc_class->start = gst_tcp_server_src_start;
135 gstbasesrc_class->stop = gst_tcp_server_src_stop;
136 gstbasesrc_class->unlock = gst_tcp_server_src_unlock;
/* GstPushSrc handler that produces buffers */
138 gstpush_src_class->create = gst_tcp_server_src_create;
/* set up the debug category declared at file scope */
140 GST_DEBUG_CATEGORY_INIT (tcpserversrc_debug, "tcpserversrc", 0,
141 "TCP Server Source");
/* Instance initialiser: sets the default port, host and protocol, marks all
 * descriptors as unopened (-1) and clears the OPEN flag.
 *
 * src:     the instance being initialised.
 * g_class: the instance's class; not used in the visible lines. */
145 gst_tcp_server_src_init (GstTCPServerSrc * src, GstTCPServerSrcClass * g_class)
147 src->server_port = TCP_DEFAULT_PORT;
/* NOTE(review): the instance default comes from TCP_DEFAULT_HOST, whereas the
 * "host" property declares TCP_DEFAULT_LISTEN_HOST as its default; confirm
 * that the two are meant to differ. */
148 src->host = g_strdup (TCP_DEFAULT_HOST);
/* -1 marks a descriptor that is not open; stop() tests for this value */
149 src->server_sock_fd = -1;
150 src->client_sock_fd = -1;
151 src->protocol = GST_TCP_PROTOCOL_NONE;
/* control descriptors are created later, in start() */
153 READ_SOCKET (src) = -1;
154 WRITE_SOCKET (src) = -1;
156 GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
/* GObject finalize handler (installed in class_init).
 *
 * gobject: the instance being finalized; cast to GstTCPServerSrc.
 *
 * NOTE(review): the rest of the body is not visible in this view.  Confirm
 * that src->host, which init() and set_property() allocate with g_strdup(),
 * is released here. */
160 gst_tcp_server_src_finalize (GObject * gobject)
162 GstTCPServerSrc *src = GST_TCP_SERVER_SRC (gobject);
/* GstPushSrc create handler (installed in class_init).
 *
 * Waits in select() on the control socket plus either the client socket (when
 * a client is connected) or the listening socket (when none is), accepts a
 * client when the listening socket becomes readable, and otherwise reads one
 * buffer according to src->protocol.
 *
 * psrc:   the source element.
 * outbuf: receives the buffer produced by the protocol-specific read helper.
 *
 * Returns the GstFlowReturn of the read helper, GST_FLOW_WRONG_STATE on the
 * cancel/closed paths, or GST_FLOW_ERROR on the select/accept error paths.
 *
 * NOTE(review): several short lines of this function (local declarations,
 * labels, braces, some goto/break statements) are not visible in this view;
 * the comments below describe only what the visible lines do. */
168 gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
170 GstTCPServerSrc *src;
171 GstFlowReturn ret = GST_FLOW_OK;
175 src = GST_TCP_SERVER_SRC (psrc);
/* guard on the OPEN flag, which start() sets and init()/stop() clear */
177 if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_SERVER_SRC_OPEN))
181 /* do a blocking select on the socket */
184 /* always select on cancel socket */
185 FD_SET (READ_SOCKET (src), &testfds);
187 if (src->client_sock_fd >= 0) {
188 /* if we have a client, wait for read */
189 FD_SET (src->client_sock_fd, &testfds);
190 maxfdp1 = MAX (src->client_sock_fd, READ_SOCKET (src)) + 1;
192 /* else wait on server socket for connections */
193 FD_SET (src->server_sock_fd, &testfds);
194 maxfdp1 = MAX (src->server_sock_fd, READ_SOCKET (src)) + 1;
/* NOTE(review): the timeout argument is the literal 0, i.e. a null pointer,
 * so select() has no timeout; NULL would state that intent more clearly. */
197 /* no action (0) is an error too in our case */
198 if (select (maxfdp1, &testfds, NULL, NULL, 0) <= 0)
/* a readable control socket means a command is pending (unlock() writes one) */
201 if (FD_ISSET (READ_SOCKET (src), &testfds))
202 goto select_cancelled;
204 /* if we have no client socket we can accept one now */
205 if (src->client_sock_fd < 0) {
206 if (FD_ISSET (src->server_sock_fd, &testfds)) {
/* NOTE(review): src->client_sin_len is passed to accept() as the address
 * length; no visible line sets it to sizeof (src->client_sin) beforehand.
 * Confirm it is initialised. */
207 if ((src->client_sock_fd =
208 accept (src->server_sock_fd, (struct sockaddr *) &src->client_sin,
209 &src->client_sin_len)) == -1)
212 /* and restart now to poll the socket. */
216 GST_LOG_OBJECT (src, "asked for a buffer");
/* dispatch on the configured wire protocol */
218 switch (src->protocol) {
219 case GST_TCP_PROTOCOL_NONE:
/* raw mode: read from the client socket; the control socket is passed along
 * to the helper as well */
220 ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd,
221 READ_SOCKET (src), outbuf);
224 case GST_TCP_PROTOCOL_GDP:
/* GDP mode: caps are read once (caps_received is reset in start()) and set on
 * the source pad before any buffer is read */
225 if (!src->caps_received) {
229 ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd,
230 READ_SOCKET (src), &caps);
232 if (ret == GST_FLOW_WRONG_STATE)
235 if (ret != GST_FLOW_OK)
236 goto gdp_caps_read_error;
238 src->caps_received = TRUE;
239 string = gst_caps_to_string (caps);
240 GST_DEBUG_OBJECT (src, "Received caps through GDP: %s", string);
243 gst_pad_set_caps (GST_BASE_SRC_PAD (psrc), caps);
246 ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd,
247 READ_SOCKET (src), outbuf);
/* stamp successfully read buffers with the source pad's current caps */
249 if (ret == GST_FLOW_OK)
250 gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src)));
/* NOTE(review): g_assert() is given a string literal here.  A string literal
 * is always non-null, so this assertion can never fail and an unhandled
 * protocol passes through silently.  g_assert_not_reached() looks like what
 * was intended; confirm. */
255 /* need to assert as buf == NULL */
256 g_assert ("Unhandled protocol type");
/* log the properties of the buffer about to be returned */
260 if (ret == GST_FLOW_OK) {
262 "Returning buffer from _get of size %d, ts %"
263 GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
264 ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
265 GST_BUFFER_SIZE (*outbuf),
266 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
267 GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
268 GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
/* Exit paths follow; their labels are not visible in this view. */
/* not-open path: debug message only, reported as WRONG_STATE */
275 GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
276 return GST_FLOW_WRONG_STATE;
/* select() failure: posts an element error carrying the errno text */
280 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
281 ("Select error: %s", g_strerror (errno)));
282 return GST_FLOW_ERROR;
/* control socket became readable: treated as a cancel, not an error */
286 GST_DEBUG_OBJECT (src, "select canceled");
287 return GST_FLOW_WRONG_STATE;
/* accept() failure: posts an element error carrying the errno text */
291 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
292 ("Could not accept client on server socket: %s", g_strerror (errno)));
293 return GST_FLOW_ERROR;
/* GDP read was canceled */
297 GST_DEBUG_OBJECT (src, "reading gdp canceled");
298 return GST_FLOW_WRONG_STATE;
302 /* if we did not get canceled, report an error */
303 if (ret != GST_FLOW_WRONG_STATE) {
304 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
305 ("Could not read caps through GDP"));
/* GObject set_property handler: stores a new host string, port number or
 * protocol enum value on the instance, and warns about unknown property ids.
 *
 * object:  the GstTCPServerSrc instance.
 * prop_id: id of the property being set.
 * value:   the new value.
 * pspec:   the property's param spec, used for the invalid-id warning.
 *
 * NOTE(review): the switch/case lines are not visible in this view. */
312 gst_tcp_server_src_set_property (GObject * object, guint prop_id,
313 const GValue * value, GParamSpec * pspec)
315 GstTCPServerSrc *tcpserversrc = GST_TCP_SERVER_SRC (object);
/* host: a NULL string triggers a warning.
 * NOTE(review): the "host" param spec declares TCP_DEFAULT_LISTEN_HOST (NULL)
 * as its default, yet NULL is warned about here; confirm which is intended. */
319 if (!g_value_get_string (value)) {
320 g_warning ("host property cannot be NULL");
/* replace the stored host with a private copy of the new string */
323 g_free (tcpserversrc->host);
324 tcpserversrc->host = g_strdup (g_value_get_string (value));
/* port */
327 tcpserversrc->server_port = g_value_get_int (value);
/* protocol */
330 tcpserversrc->protocol = g_value_get_enum (value);
/* unknown property id */
334 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject get_property handler: copies the instance's host, port or protocol
 * into `value`, and warns about unknown property ids.
 *
 * object:  the GstTCPServerSrc instance.
 * prop_id: id of the property being read.
 * value:   destination GValue.
 * pspec:   the property's param spec, used for the invalid-id warning.
 *
 * NOTE(review): the switch/case lines are not visible in this view. */
340 gst_tcp_server_src_get_property (GObject * object, guint prop_id,
341 GValue * value, GParamSpec * pspec)
343 GstTCPServerSrc *tcpserversrc = GST_TCP_SERVER_SRC (object);
/* host */
347 g_value_set_string (value, tcpserversrc->host);
/* port */
350 g_value_set_int (value, tcpserversrc->server_port);
/* protocol */
353 g_value_set_enum (value, tcpserversrc->protocol);
/* unknown property id */
357 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GstBaseSrc start handler (installed in class_init): creates the control
 * socket pair, opens the listening TCP socket, binds it to the configured
 * address and port, starts listening and sets the OPEN flag.
 *
 * bsrc: the source element.
 *
 * NOTE(review): labels, return statements and some conditions are not visible
 * in this view; the comments below describe only the visible lines. */
364 gst_tcp_server_src_start (GstBaseSrc * bsrc)
367 GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
369 /* create the control sockets before anything */
370 if (socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src)) < 0)
/* NOTE(review): F_SETFL is given O_NONBLOCK alone, which replaces the status
 * flags rather than OR-ing into the result of F_GETFL; the fcntl() return
 * values are not checked. */
373 fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
374 fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
376 /* reset caps_received flag */
377 src->caps_received = FALSE;
379 /* create the server listener socket */
380 if ((src->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
383 GST_DEBUG_OBJECT (src, "opened receiving server socket with fd %d",
384 src->server_sock_fd);
/* `ret` is passed as the SO_REUSEADDR option value; the line assigning it is
 * not visible in this view */
386 /* make address reusable */
388 if (setsockopt (src->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret,
392 /* name the socket */
393 memset (&src->server_sin, 0, sizeof (src->server_sin));
394 src->server_sin.sin_family = AF_INET; /* network socket */
395 src->server_sin.sin_port = htons (src->server_port); /* on port */
/* Address selection: one visible path resolves src->host through
 * gst_tcp_host_to_ip() and inet_addr(); another uses INADDR_ANY.  The
 * condition choosing between them is not visible in this view. */
399 if (!(host = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host)))
401 src->server_sin.sin_addr.s_addr = inet_addr (host);
404 src->server_sin.sin_addr.s_addr = htonl (INADDR_ANY);
407 GST_DEBUG_OBJECT (src, "binding server socket to address");
408 if ((ret = bind (src->server_sock_fd, (struct sockaddr *) &src->server_sin,
409 sizeof (src->server_sin))) < 0)
412 GST_DEBUG_OBJECT (src, "listening on server socket %d with queue of %d",
413 src->server_sock_fd, TCP_BACKLOG);
415 if (listen (src->server_sock_fd, TCP_BACKLOG) == -1)
/* NOTE(review): this message is logged right after listen(); no accept() call
 * is visible in this function (clients are accepted in create()), so the text
 * looks misleading. */
418 GST_DEBUG_OBJECT (src, "received client");
/* mark the element open; create() checks this flag */
420 GST_OBJECT_FLAG_SET (src, GST_TCP_SERVER_SRC_OPEN);
/* Error paths follow; their labels are not visible in this view.
 * NOTE(review): the visible error paths close server_sock_fd but none visibly
 * closes the control socket pair created above; confirm they are not leaked
 * when start fails. */
427 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
433 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
438 GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
439 ("Could not setsockopt: %s", g_strerror (errno)));
444 gst_tcp_socket_close (&src->server_sock_fd);
449 gst_tcp_socket_close (&src->server_sock_fd);
452 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
453 ("bind failed: %s", g_strerror (errno)));
460 gst_tcp_socket_close (&src->server_sock_fd);
461 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
462 ("Could not listen on server socket: %s", g_strerror (errno)));
/* GstBaseSrc stop handler (installed in class_init): closes the listening
 * and client sockets if they are open, clears the OPEN flag, and closes and
 * resets both control descriptors.
 *
 * bsrc: the source element.
 *
 * NOTE(review): the return statement is not visible in this view. */
468 gst_tcp_server_src_stop (GstBaseSrc * bsrc)
470 GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
/* listening socket: close only if open, then mark as closed */
472 if (src->server_sock_fd != -1) {
473 close (src->server_sock_fd);
474 src->server_sock_fd = -1;
/* client socket: same treatment */
476 if (src->client_sock_fd != -1) {
477 close (src->client_sock_fd);
478 src->client_sock_fd = -1;
480 GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
/* control descriptors are closed without a -1 guard, unlike the two sockets
 * above.  NOTE(review): confirm stop() only runs after a successful start(). */
482 close (READ_SOCKET (src));
483 close (WRITE_SOCKET (src));
484 READ_SOCKET (src) = -1;
485 WRITE_SOCKET (src) = -1;
/* GstBaseSrc unlock handler (installed in class_init): writes CONTROL_STOP to
 * the control socket.  create() selects on the read end of that socket and
 * takes its "select canceled" path when it becomes readable.
 *
 * bsrc: the source element.
 *
 * NOTE(review): the return statement is not visible in this view. */
490 /* will be called only between calls to start() and stop() */
492 gst_tcp_server_src_unlock (GstBaseSrc * bsrc)
494 GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
496 SEND_COMMAND (src, CONTROL_STOP);