2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 02111-1307, USA.
25 #include "gsttcpsrc.h"
/* Port number used as the default of the "port" property and as the initial
 * port of a freshly initialised element. */
28 #define TCP_DEFAULT_PORT 4953
30 /* elementfactory information */
/* NOTE(review): only some lines of this initializer are visible here; the
 * three strings below look like the element's name, its description and its
 * author -- confirm against the GstElementDetails field order. */
31 GstElementDetails gst_tcpsrc_details = {
32 "TCP packet receiver",
35 "Receive data over the network via TCP",
37 "Zeeshan Ali <zak147@yahoo.com>",
41 /* TCPSrc signals and args */
51 /* ARG_SOCKET_OPTIONS,*/
/* Shorthand for the GType of the "control" enum. */
55 #define GST_TYPE_TCPSRC_CONTROL (gst_tcpsrc_control_get_type())
/*
 * Returns the GType of the enum behind the "control" property.
 *
 * The type is registered with g_enum_register_static() under the name
 * "GstTCPSrcControl" the first time this function runs; the result is kept
 * in a function-local static and handed back on every call.
 */
57 gst_tcpsrc_control_get_type(void) {
58 static GType tcpsrc_control_type = 0;
/* Value table passed to g_enum_register_static().
 * NOTE(review): the last entry has NULL strings; presumably CONTROL_ZERO is 0
 * so that it acts as the table terminator -- confirm against the header. */
59 static GEnumValue tcpsrc_control[] = {
60 {CONTROL_NONE, "1", "none"},
61 {CONTROL_TCP, "2", "tcp"},
62 {CONTROL_ZERO, NULL, NULL}
/* Register only while the cached type is still unset. */
64 if (!tcpsrc_control_type) {
65 tcpsrc_control_type = g_enum_register_static("GstTCPSrcControl", tcpsrc_control);
67 return tcpsrc_control_type;
/* Forward declarations of the file-local functions defined below. */
/* NOTE(review): gst_tcpsrc_class_init is declared with a GstTCPSrc pointer,
 * yet it is registered as the GClassInitFunc and its body casts the argument
 * to GObjectClass and GstElementClass pointers; GstTCPSrcClass looks like the
 * intended parameter type -- confirm and change prototype and definition
 * together. */
70 static void gst_tcpsrc_class_init (GstTCPSrc *klass);
71 static void gst_tcpsrc_init (GstTCPSrc *tcpsrc);
73 static GstData* gst_tcpsrc_get (GstPad *pad);
74 static GstElementStateReturn
75 gst_tcpsrc_change_state (GstElement *element);
77 static void gst_tcpsrc_set_property (GObject *object, guint prop_id,
78 const GValue *value, GParamSpec *pspec);
79 static void gst_tcpsrc_get_property (GObject *object, guint prop_id,
80 GValue *value, GParamSpec *pspec);
81 static void gst_tcpsrc_set_clock (GstElement *element, GstClock *clock);
/* Parent class structure; assigned in gst_tcpsrc_class_init() and used by
 * gst_tcpsrc_change_state() to chain up to the parent implementation. */
83 static GstElementClass *parent_class = NULL;
84 /*static guint gst_tcpsrc_signals[LAST_SIGNAL] = { 0 }; */
/*
 * Provides the GType of the element.
 *
 * The type is registered with g_type_register_static() as "GstTCPSrc",
 * derived from GST_TYPE_ELEMENT, using gst_tcpsrc_class_init as the class
 * initialiser and gst_tcpsrc_init as the instance initialiser.
 * NOTE(review): the guard around the registration and the return statement
 * are not visible in this view; presumably the type is registered once and
 * the cached tcpsrc_type is returned -- confirm.
 */
87 gst_tcpsrc_get_type (void)
/* Cache for the registered type id; 0 means "not registered yet". */
89 static GType tcpsrc_type = 0;
93 static const GTypeInfo tcpsrc_info = {
94 sizeof(GstTCPSrcClass),
97 (GClassInitFunc)gst_tcpsrc_class_init,
102 (GInstanceInitFunc)gst_tcpsrc_init,
105 tcpsrc_type = g_type_register_static (GST_TYPE_ELEMENT, "GstTCPSrc", &tcpsrc_info, 0);
/*
 * Class initialiser: remembers the parent class, installs the element's
 * properties and hooks up the property, state-change and clock handlers.
 *
 * klass: the class structure being initialised.
 * NOTE(review): klass is typed as a GstTCPSrc pointer although it is used as
 * a class structure below; GstTCPSrcClass looks like the intended type --
 * confirm (the forward declaration must change with it).
 */
111 gst_tcpsrc_class_init (GstTCPSrc *klass)
113 GObjectClass *gobject_class;
114 GstElementClass *gstelement_class;
/* Two views of the same class structure. */
116 gobject_class = (GObjectClass*) klass;
117 gstelement_class = (GstElementClass*) klass;
/* Kept so that gst_tcpsrc_change_state() can chain up. */
119 parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
/* "port": integer in 0..32768, default TCP_DEFAULT_PORT.
 * NOTE(review): TCP ports go up to 65535, so 32768 looks like an accidental
 * upper bound -- confirm before widening. */
121 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
122 g_param_spec_int ("port", "port", "The port to receive the packets from",
123 0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
/* "control": enum of type GST_TYPE_TCPSRC_CONTROL, default CONTROL_TCP. */
124 g_object_class_install_property (gobject_class, ARG_CONTROL,
125 g_param_spec_enum ("control", "control", "The type of control",
126 GST_TYPE_TCPSRC_CONTROL, CONTROL_TCP, G_PARAM_READWRITE));
/* "socketop": boolean, default FALSE.
 * NOTE(review): ARG_SOCKET_OPTIONS is commented out in the property enum and
 * in both property handlers of this file, so this installation is presumably
 * meant to be disabled as well -- confirm. */
128 g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SOCKET_OPTIONS,
129 g_param_spec_boolean ("socketop", "socketop", "Enable or disable socket options REUSEADDR and KEEPALIVE",
130 FALSE, G_PARAM_READWRITE));
/* GObject property accessors. */
132 gobject_class->set_property = gst_tcpsrc_set_property;
133 gobject_class->get_property = gst_tcpsrc_get_property;
/* GstElement virtual methods. */
135 gstelement_class->change_state = gst_tcpsrc_change_state;
136 gstelement_class->set_clock = gst_tcpsrc_set_clock;
/*
 * set_clock handler: stores the given clock pointer in the element.
 * The stored clock is later read by gst_tcpsrc_get() to timestamp the first
 * buffer.  No reference is taken on the clock in the visible code.
 *
 * element: the element, cast to GstTCPSrc.
 * clock:   the clock to remember.
 */
140 gst_tcpsrc_set_clock (GstElement *element, GstClock *clock)
144 tcpsrc = GST_TCPSRC (element);
146 tcpsrc->clock = clock;
/*
 * Instance initialiser: creates the source pad and puts every field and flag
 * into its "nothing open yet" state.
 *
 * tcpsrc: the instance being initialised.
 */
150 gst_tcpsrc_init (GstTCPSrc *tcpsrc)
152 /* create the src pad, add it to the element and install the get function */
153 tcpsrc->srcpad = gst_pad_new ("src", GST_PAD_SRC);
154 gst_element_add_pad (GST_ELEMENT (tcpsrc), tcpsrc->srcpad);
155 gst_pad_set_get_function (tcpsrc->srcpad, gst_tcpsrc_get);
/* Same defaults as the "port" and "control" property specs. */
157 tcpsrc->port = TCP_DEFAULT_PORT;
158 tcpsrc->control = CONTROL_TCP;
159 tcpsrc->clock = NULL;
/* -1 marks a socket that is not open. */
161 tcpsrc->control_sock = -1;
162 tcpsrc->client_sock = -1;
163 /*tcpsrc->socket_options = FALSE;*/
/* Not open, not connected; the next buffer produced is the first one. */
165 GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_OPEN);
166 GST_FLAG_SET (tcpsrc, GST_TCPSRC_1ST_BUF);
167 GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_CONNECTED);
/*
 * Get function of the source pad.
 *
 * Waits in select() on the listening socket, the control socket (if open)
 * and the client socket (if connected).  Control data is parsed as XML caps
 * and proxied to the source pad; stream data is read from the client socket
 * into a newly allocated buffer that is returned as GstData.
 *
 * pad: the source pad; must be a non-NULL GstPad, otherwise NULL is returned.
 *
 * NOTE(review): several statements of this function are not visible in this
 * view, so the comments below describe only the visible lines.
 */
171 gst_tcpsrc_get (GstPad *pad)
179 int ret, client_sock;
180 struct sockaddr client_addr;
182 g_return_val_if_fail (pad != NULL, NULL);
183 g_return_val_if_fail (GST_IS_PAD (pad), NULL);
/* The element owning the pad. */
185 tcpsrc = GST_TCPSRC (GST_OBJECT_PARENT (pad));
/* The listening socket is always watched. */
188 FD_SET (tcpsrc->sock, &read_fds);
190 max_sock = tcpsrc->sock;
/* Watch the control socket too when it is open. */
192 if (tcpsrc->control_sock >= 0) {
193 FD_SET (tcpsrc->control_sock, &read_fds);
194 max_sock = MAX(tcpsrc->sock, tcpsrc->control_sock);
197 /* Add to FD_SET client socket, when connection has been established */
198 if (tcpsrc->client_sock >= 0)
200 FD_SET (tcpsrc->client_sock, &read_fds);
201 max_sock = MAX(tcpsrc->client_sock , max_sock);
/* NULL timeout: wait until at least one watched descriptor is readable. */
205 if (select (max_sock+1, &read_fds, NULL, NULL, NULL) > 0) {
/* Control connection pending? */
206 if ((tcpsrc->control_sock != -1) && FD_ISSET (tcpsrc->control_sock, &read_fds))
212 switch (tcpsrc->control) {
215 #ifndef GST_DISABLE_LOADSAVE
/* 10 KiB scratch buffer for the control message. */
216 buf = g_malloc (1024*10);
218 len = sizeof (struct sockaddr);
219 client_sock = accept (tcpsrc->control_sock, &client_addr, &len);
/* NOTE(review): accept() reports failure with -1 and 0 is a valid
 * descriptor, so "<= 0" treats descriptor 0 as an error -- confirm and
 * consider "< 0". */
221 if (client_sock <= 0) {
222 perror ("control_sock accept");
225 else if ((ret = read (client_sock, buf, 1024*10)) <= 0) {
226 perror ("control_sock read");
/* Interpret the received bytes as an XML description of caps.
 * NOTE(review): doc is dereferenced without a visible NULL check;
 * xmlParseMemory() can return NULL for unparsable input -- confirm. */
231 doc = xmlParseMemory(buf, ret);
232 caps = gst_caps_load_thyself(doc->xmlRootNode);
234 /* forward the connect, we don't signal back the result here... */
235 gst_pad_proxy_link (tcpsrc->srcpad, caps);
/* Output buffer with room for 24000 bytes of stream data. */
251 outbuf = gst_buffer_new ();
252 GST_BUFFER_DATA (outbuf) = g_malloc (24000);
253 GST_BUFFER_SIZE (outbuf) = 24000;
/* First buffer: stamp it with the current clock time, push a discontinuity
 * event downstream and clear the first-buffer flag. */
255 if (GST_FLAG_IS_SET (tcpsrc, GST_TCPSRC_1ST_BUF)) {
257 GstClockTime current_time;
260 current_time = gst_clock_get_time (tcpsrc->clock);
262 GST_BUFFER_TIMESTAMP (outbuf) = current_time;
264 discont = gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME,
267 gst_pad_push (tcpsrc->srcpad, GST_DATA (discont));
270 GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_1ST_BUF);
/* Buffer without a timestamp (presumably the branch for later buffers). */
274 GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE;
/* No client yet: accept one from the listening socket.
 * NOTE(review): len is only visibly assigned in the control-socket path
 * above; if that path did not run, accept() may see an indeterminate
 * address length -- confirm.  The "<= 0" test below has the same
 * descriptor-0 issue as the control accept. */
277 if (!GST_FLAG_IS_SET (tcpsrc, GST_TCPSRC_CONNECTED)) {
278 tcpsrc->client_sock = accept (tcpsrc->sock, &client_addr, &len);
280 if (tcpsrc->client_sock <= 0) {
285 GST_FLAG_SET (tcpsrc, GST_TCPSRC_CONNECTED);
/* Read stream data straight into the output buffer. */
290 read (tcpsrc->client_sock, GST_BUFFER_DATA (outbuf), GST_BUFFER_SIZE (outbuf));
/* Shrink the buffer to the number of bytes obtained. */
293 GST_BUFFER_SIZE (outbuf) = numbytes;
/* End of stream: drop the buffer, close the client connection and mark the
 * element as disconnected. */
300 else g_print("End of Stream reached\n");
301 gst_buffer_unref (outbuf);
303 close (tcpsrc->client_sock);
304 tcpsrc->client_sock = -1;
305 GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_CONNECTED);
315 return GST_DATA (outbuf);
/*
 * GObject set_property handler.
 *
 * object:  the element; rejected unless it is a GstTCPSrc.
 * prop_id: id of the property being written.
 * value:   new value.
 * pspec:   property specification.
 *
 * The visible code stores an integer value in the port field and an enum
 * value in the control field; the socket-options case is commented out.
 */
320 gst_tcpsrc_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
324 /* it's not null if we got it, but it might not be ours */
325 g_return_if_fail(GST_IS_TCPSRC(object));
326 tcpsrc = GST_TCPSRC(object);
/* Port to listen on. */
330 tcpsrc->port = g_value_get_int (value);
/* Kind of control channel. */
333 tcpsrc->control = g_value_get_enum (value);
335 /* case ARG_SOCKET_OPTIONS:
336 tcpsrc->socket_options = g_value_get_boolean(value);
/*
 * GObject get_property handler.
 *
 * object:  the element; rejected unless it is a GstTCPSrc.
 * prop_id: id of the property being read.
 * value:   receives the property value.
 * pspec:   property specification.
 *
 * The visible code copies the port field out as an integer and the control
 * field out as an enum; G_OBJECT_WARN_INVALID_PROPERTY_ID is invoked further
 * down (presumably for unknown ids).  The socket-options case is commented
 * out.
 */
344 gst_tcpsrc_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
348 /* it's not null if we got it, but it might not be ours */
349 g_return_if_fail(GST_IS_TCPSRC(object));
350 tcpsrc = GST_TCPSRC(object);
/* Current port. */
354 g_value_set_int (value, tcpsrc->port);
/* Current control mode. */
357 g_value_set_enum (value, tcpsrc->control);
359 /* case ARG_SOCKET_OPTIONS:
360 g_value_set_boolean(value,tcpsrc->socket_options);
363 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
368 /* create a listening socket for receiving data from the remote machine */
/*
 * Opens the sockets the element listens on.
 *
 * A TCP stream socket is created, bound to src->port on INADDR_ANY, put into
 * listening state with a backlog of 5 and switched to non-blocking mode.
 * Depending on src->control, a second listening socket is set up the same
 * way on src->port + 1.  GST_TCPSRC_OPEN is set once the sockets are ready.
 *
 * src: the element whose sock, control_sock and myaddr fields are filled in.
 *
 * The caller in gst_tcpsrc_change_state() treats a zero return value as
 * failure.  NOTE(review): the return statements themselves are not visible
 * in this view.
 */
370 gst_tcpsrc_init_receive (GstTCPSrc *src)
/* Local address: any interface, configured port. */
373 bzero (&src->myaddr, sizeof (src->myaddr));
374 src->myaddr.sin_family = AF_INET; /* host byte order */
375 src->myaddr.sin_port = htons (src->port); /* short, network byte order */
376 src->myaddr.sin_addr.s_addr = INADDR_ANY;
/* Listening socket for the data stream. */
378 if ((src->sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
379 perror("stream_socket");
383 /* if (src->socket_options)
385 g_print("Socket Options SO_REUSEADDR, SO_KEEPALIVE\n");
388 /* allow local address reuse */
389 if( setsockopt( src->sock,SOL_SOCKET,SO_REUSEADDR, &val, sizeof( int )) <0)
390 perror( "setsockopt()" );
392 /* periodically test if connection still alive */
393 if( setsockopt( src->sock,SOL_SOCKET,SO_KEEPALIVE, &val, sizeof( int )) <0)
394 perror( "setsockopt()" );
398 if (bind (src->sock, (struct sockaddr *) &src->myaddr, sizeof (src->myaddr)) == -1) {
399 perror("stream_sock bind");
/* Backlog of 5 pending connections. */
403 if (listen (src->sock, 5) == -1) {
404 perror("stream_sock listen");
/* Non-blocking, so accept() on this socket does not stall the element. */
408 fcntl (src->sock, F_SETFL, O_NONBLOCK);
/* Optional control channel. */
410 switch (src->control) {
412 if ((src->control_sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
413 perror("control_socket");
/* The control socket reuses myaddr with the port number one above the data
 * port. */
417 src->myaddr.sin_port = htons (src->port+1);
418 if (bind (src->control_sock, (struct sockaddr *) &src->myaddr, sizeof (src->myaddr)) == -1)
420 perror("control bind");
424 if (listen (src->control_sock, 5) == -1) {
425 perror("control listen");
429 fcntl (src->control_sock, F_SETFL, O_NONBLOCK);
431 GST_FLAG_SET (src, GST_TCPSRC_OPEN);
/* Path without the control socket set-up above (presumably another case of
 * the switch): the element is marked open as well. */
439 GST_FLAG_SET (src, GST_TCPSRC_OPEN);
/*
 * Releases the element's sockets and clears GST_TCPSRC_OPEN.
 *
 * The control and client sockets are closed when they are open (not -1) and
 * reset to -1 afterwards.  NOTE(review): the body of the src->sock check is
 * not visible in this view; presumably it closes and resets the listening
 * socket the same way -- confirm.
 *
 * src: the element to close.
 */
445 gst_tcpsrc_close (GstTCPSrc *src)
/* Listening socket. */
447 if (src->sock != -1) {
/* Control socket. */
451 if (src->control_sock != -1) {
452 close (src->control_sock);
453 src->control_sock = -1;
/* Connected client, if any. */
455 if (src->client_sock != -1) {
456 close(src->client_sock);
457 src->client_sock = -1;
460 GST_FLAG_UNSET (src, GST_TCPSRC_OPEN);
/*
 * change_state handler of the element.
 *
 * When the pending state is GST_STATE_NULL an open element is closed.  An
 * element that is not open gets its sockets opened through
 * gst_tcpsrc_init_receive() (presumably in the branch for every other
 * pending state); if that fails, GST_STATE_FAILURE is returned.  Afterwards
 * the parent class handler is invoked when there is one.
 *
 * element: the element changing state; must be a GstTCPSrc.
 * Returns the parent handler's result, GST_STATE_SUCCESS when the parent has
 * no handler, or GST_STATE_FAILURE as described above.
 */
463 static GstElementStateReturn
464 gst_tcpsrc_change_state (GstElement *element)
466 g_return_val_if_fail (GST_IS_TCPSRC (element), GST_STATE_FAILURE);
/* Going to NULL: release the sockets. */
468 if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
469 if (GST_FLAG_IS_SET (element, GST_TCPSRC_OPEN))
470 gst_tcpsrc_close (GST_TCPSRC (element));
/* Open the sockets when they are not open yet. */
472 if (!GST_FLAG_IS_SET (element, GST_TCPSRC_OPEN)) {
473 if (!gst_tcpsrc_init_receive (GST_TCPSRC (element)))
474 return GST_STATE_FAILURE;
/* Chain up. */
478 if (GST_ELEMENT_CLASS (parent_class)->change_state)
479 return GST_ELEMENT_CLASS (parent_class)->change_state (element);
481 return GST_STATE_SUCCESS;