2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
26 #include <gst/gst-i18n-plugin.h>
28 #include "gsttcpserversrc.h"
29 #include <string.h> /* memset */
31 #include <sys/ioctl.h>
/* Control channel used to interrupt a blocking select(), modelled on fdsrc.
 *
 * The object passed as "o" must expose an "int control_fds[2]" member:
 * element 0 is the end that is select()ed on and read from, element 1 is the
 * end that commands are written to.
 *
 * Every macro argument is parenthesised so that expressions such as
 * "&obj" can be passed safely.
 */
#define CONTROL_STOP            'S'     /* stop the select call */
#define CONTROL_SOCKETS(o)      ((o)->control_fds)
#define WRITE_SOCKET(o)         ((o)->control_fds[1])
#define READ_SOCKET(o)          ((o)->control_fds[0])

/* Write the single-byte @command to the write end of the control channel.
 * The write is retried when it was interrupted by a signal so the wake-up is
 * not lost; any other failure is deliberately ignored (best effort).
 * The temporary uses a long name to avoid clashing with caller variables. */
#define SEND_COMMAND(o, command)                                        \
do {                                                                    \
  unsigned char send_command_byte_ = (unsigned char) (command);         \
  while (write (WRITE_SOCKET (o), &send_command_byte_, 1) < 0           \
      && errno == EINTR) {                                              \
  }                                                                     \
} while (0)

/* Read one command byte from the read end of the control channel into
 * @command; @res receives the return value of read(). */
#define READ_COMMAND(o, command, res)                                   \
do {                                                                    \
  (res) = read (READ_SOCKET (o), &(command), 1);                        \
} while (0)
/* Debug category that the GST_* logging macros in this file log to. */
GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug);
#define GST_CAT_DEFAULT tcpserversrc_debug

/* Default value advertised for the "host" property: NULL means listen on
 * all interfaces. */
#define TCP_DEFAULT_LISTEN_HOST NULL
/* Length of the pending client connection queue passed to listen(). */
#define TCP_BACKLOG 1
60 static const GstElementDetails gst_tcp_server_src_details =
61 GST_ELEMENT_DETAILS ("TCP server source",
63 "Receive data as a server over the network via TCP",
64 "Thomas Vander Stichele <thomas at apestaart dot org>");
66 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
/* Type boilerplate: GstTCPServerSrc derives from GstPushSrc.
 * NOTE(review): the parent GType argument is not visible in this listing;
 * GST_TYPE_PUSH_SRC is restored to match the GstPushSrc parent type. */
GST_BOILERPLATE (GstTCPServerSrc, gst_tcp_server_src, GstPushSrc,
    GST_TYPE_PUSH_SRC);
85 static void gst_tcp_server_src_finalize (GObject * gobject);
87 static gboolean gst_tcp_server_src_start (GstBaseSrc * bsrc);
88 static gboolean gst_tcp_server_src_stop (GstBaseSrc * bsrc);
89 static gboolean gst_tcp_server_src_unlock (GstBaseSrc * bsrc);
90 static GstFlowReturn gst_tcp_server_src_create (GstPushSrc * psrc,
93 static void gst_tcp_server_src_set_property (GObject * object, guint prop_id,
94 const GValue * value, GParamSpec * pspec);
95 static void gst_tcp_server_src_get_property (GObject * object, guint prop_id,
96 GValue * value, GParamSpec * pspec);
100 gst_tcp_server_src_base_init (gpointer g_class)
102 GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
104 gst_element_class_add_pad_template (element_class,
105 gst_static_pad_template_get (&srctemplate));
107 gst_element_class_set_details (element_class, &gst_tcp_server_src_details);
111 gst_tcp_server_src_class_init (GstTCPServerSrcClass * klass)
113 GObjectClass *gobject_class;
114 GstBaseSrcClass *gstbasesrc_class;
115 GstPushSrcClass *gstpush_src_class;
117 gobject_class = (GObjectClass *) klass;
118 gstbasesrc_class = (GstBaseSrcClass *) klass;
119 gstpush_src_class = (GstPushSrcClass *) klass;
121 gobject_class->set_property = gst_tcp_server_src_set_property;
122 gobject_class->get_property = gst_tcp_server_src_get_property;
123 gobject_class->finalize = gst_tcp_server_src_finalize;
125 g_object_class_install_property (gobject_class, PROP_HOST,
126 g_param_spec_string ("host", "Host", "The hostname to listen as",
127 TCP_DEFAULT_LISTEN_HOST, G_PARAM_READWRITE));
128 g_object_class_install_property (gobject_class, PROP_PORT,
129 g_param_spec_int ("port", "Port", "The port to listen to",
130 0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
131 g_object_class_install_property (gobject_class, PROP_PROTOCOL,
132 g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
133 GST_TYPE_TCP_PROTOCOL, GST_TCP_PROTOCOL_NONE, G_PARAM_READWRITE));
135 gstbasesrc_class->start = gst_tcp_server_src_start;
136 gstbasesrc_class->stop = gst_tcp_server_src_stop;
137 gstbasesrc_class->unlock = gst_tcp_server_src_unlock;
139 gstpush_src_class->create = gst_tcp_server_src_create;
141 GST_DEBUG_CATEGORY_INIT (tcpserversrc_debug, "tcpserversrc", 0,
142 "TCP Server Source");
146 gst_tcp_server_src_init (GstTCPServerSrc * src, GstTCPServerSrcClass * g_class)
148 src->server_port = TCP_DEFAULT_PORT;
149 src->host = g_strdup (TCP_DEFAULT_HOST);
150 src->server_sock_fd = -1;
151 src->client_sock_fd = -1;
152 src->protocol = GST_TCP_PROTOCOL_NONE;
154 READ_SOCKET (src) = -1;
155 WRITE_SOCKET (src) = -1;
157 GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
161 gst_tcp_server_src_finalize (GObject * gobject)
163 GstTCPServerSrc *src = GST_TCP_SERVER_SRC (gobject);
167 G_OBJECT_CLASS (parent_class)->finalize (gobject);
171 gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
173 GstTCPServerSrc *src;
174 GstFlowReturn ret = GST_FLOW_OK;
178 src = GST_TCP_SERVER_SRC (psrc);
180 if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_SERVER_SRC_OPEN))
184 /* do a blocking select on the socket */
187 /* always select on cancel socket */
188 FD_SET (READ_SOCKET (src), &testfds);
190 if (src->client_sock_fd >= 0) {
191 /* if we have a client, wait for read */
192 FD_SET (src->client_sock_fd, &testfds);
193 maxfdp1 = MAX (src->client_sock_fd, READ_SOCKET (src)) + 1;
195 /* else wait on server socket for connections */
196 FD_SET (src->server_sock_fd, &testfds);
197 maxfdp1 = MAX (src->server_sock_fd, READ_SOCKET (src)) + 1;
200 /* no action (0) is an error too in our case */
201 if (select (maxfdp1, &testfds, NULL, NULL, 0) <= 0)
204 if (FD_ISSET (READ_SOCKET (src), &testfds))
205 goto select_cancelled;
207 /* if we have no client socket we can accept one now */
208 if (src->client_sock_fd < 0) {
209 if (FD_ISSET (src->server_sock_fd, &testfds)) {
210 if ((src->client_sock_fd =
211 accept (src->server_sock_fd, (struct sockaddr *) &src->client_sin,
212 &src->client_sin_len)) == -1)
215 /* and restart now to poll the socket. */
219 GST_LOG_OBJECT (src, "asked for a buffer");
221 switch (src->protocol) {
222 case GST_TCP_PROTOCOL_NONE:
223 ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd,
224 READ_SOCKET (src), outbuf);
227 case GST_TCP_PROTOCOL_GDP:
228 if (!src->caps_received) {
232 ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd,
233 READ_SOCKET (src), &caps);
235 if (ret == GST_FLOW_WRONG_STATE)
238 if (ret != GST_FLOW_OK)
239 goto gdp_caps_read_error;
241 src->caps_received = TRUE;
242 string = gst_caps_to_string (caps);
243 GST_DEBUG_OBJECT (src, "Received caps through GDP: %s", string);
246 gst_pad_set_caps (GST_BASE_SRC_PAD (psrc), caps);
249 ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd,
250 READ_SOCKET (src), outbuf);
252 if (ret == GST_FLOW_OK)
253 gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src)));
258 /* need to assert as buf == NULL */
259 g_assert ("Unhandled protocol type");
263 if (ret == GST_FLOW_OK) {
265 "Returning buffer from _get of size %d, ts %"
266 GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
267 ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
268 GST_BUFFER_SIZE (*outbuf),
269 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
270 GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
271 GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
278 GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
279 return GST_FLOW_WRONG_STATE;
283 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
284 ("Select error: %s", g_strerror (errno)));
285 return GST_FLOW_ERROR;
289 GST_DEBUG_OBJECT (src, "select canceled");
290 return GST_FLOW_WRONG_STATE;
294 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
295 ("Could not accept client on server socket: %s", g_strerror (errno)));
296 return GST_FLOW_ERROR;
300 GST_DEBUG_OBJECT (src, "reading gdp canceled");
301 return GST_FLOW_WRONG_STATE;
305 /* if we did not get canceled, report an error */
306 if (ret != GST_FLOW_WRONG_STATE) {
307 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
308 ("Could not read caps through GDP"));
315 gst_tcp_server_src_set_property (GObject * object, guint prop_id,
316 const GValue * value, GParamSpec * pspec)
318 GstTCPServerSrc *tcpserversrc = GST_TCP_SERVER_SRC (object);
322 if (!g_value_get_string (value)) {
323 g_warning ("host property cannot be NULL");
326 g_free (tcpserversrc->host);
327 tcpserversrc->host = g_strdup (g_value_get_string (value));
330 tcpserversrc->server_port = g_value_get_int (value);
333 tcpserversrc->protocol = g_value_get_enum (value);
337 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
343 gst_tcp_server_src_get_property (GObject * object, guint prop_id,
344 GValue * value, GParamSpec * pspec)
346 GstTCPServerSrc *tcpserversrc = GST_TCP_SERVER_SRC (object);
350 g_value_set_string (value, tcpserversrc->host);
353 g_value_set_int (value, tcpserversrc->server_port);
356 g_value_set_enum (value, tcpserversrc->protocol);
360 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
367 gst_tcp_server_src_start (GstBaseSrc * bsrc)
370 GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
372 /* create the control sockets before anything */
373 if (socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src)) < 0)
376 fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
377 fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
379 /* reset caps_received flag */
380 src->caps_received = FALSE;
382 /* create the server listener socket */
383 if ((src->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
386 GST_DEBUG_OBJECT (src, "opened receiving server socket with fd %d",
387 src->server_sock_fd);
389 /* make address reusable */
391 if (setsockopt (src->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret,
395 /* name the socket */
396 memset (&src->server_sin, 0, sizeof (src->server_sin));
397 src->server_sin.sin_family = AF_INET; /* network socket */
398 src->server_sin.sin_port = htons (src->server_port); /* on port */
402 if (!(host = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host)))
404 src->server_sin.sin_addr.s_addr = inet_addr (host);
407 src->server_sin.sin_addr.s_addr = htonl (INADDR_ANY);
410 GST_DEBUG_OBJECT (src, "binding server socket to address");
411 if ((ret = bind (src->server_sock_fd, (struct sockaddr *) &src->server_sin,
412 sizeof (src->server_sin))) < 0)
415 GST_DEBUG_OBJECT (src, "listening on server socket %d with queue of %d",
416 src->server_sock_fd, TCP_BACKLOG);
418 if (listen (src->server_sock_fd, TCP_BACKLOG) == -1)
421 GST_DEBUG_OBJECT (src, "received client");
423 GST_OBJECT_FLAG_SET (src, GST_TCP_SERVER_SRC_OPEN);
430 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
436 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
441 GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
442 ("Could not setsockopt: %s", g_strerror (errno)));
447 gst_tcp_socket_close (&src->server_sock_fd);
452 gst_tcp_socket_close (&src->server_sock_fd);
455 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
456 ("bind failed: %s", g_strerror (errno)));
463 gst_tcp_socket_close (&src->server_sock_fd);
464 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
465 ("Could not listen on server socket: %s", g_strerror (errno)));
471 gst_tcp_server_src_stop (GstBaseSrc * bsrc)
473 GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
475 if (src->server_sock_fd != -1) {
476 close (src->server_sock_fd);
477 src->server_sock_fd = -1;
479 if (src->client_sock_fd != -1) {
480 close (src->client_sock_fd);
481 src->client_sock_fd = -1;
483 GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
485 close (READ_SOCKET (src));
486 close (WRITE_SOCKET (src));
487 READ_SOCKET (src) = -1;
488 WRITE_SOCKET (src) = -1;
493 /* will be called only between calls to start() and stop() */
495 gst_tcp_server_src_unlock (GstBaseSrc * bsrc)
497 GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
499 SEND_COMMAND (src, CONTROL_STOP);