/GstBuffer/GstData/ in the API where you can pass events. Fix the plugins to deal...
[platform/upstream/gstreamer.git] / gst / tcp / gsttcpsrc.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include "gsttcpsrc.h"
26 #include <unistd.h>
27
28 #define TCP_DEFAULT_PORT                4953
29
30 /* elementfactory information */
31 GstElementDetails gst_tcpsrc_details = {
32   "TCP packet receiver",
33   "Source/Network",
34   "LGPL",
35   "Receive data over the network via TCP",
36   VERSION,
37   "Zeeshan Ali <zak147@yahoo.com>",
38   "(C) 2003",
39 };
40
41 /* TCPSrc signals and args */
42 enum {
43   /* FILL ME */
44   LAST_SIGNAL
45 };
46
47 enum {
48   ARG_0,
49   ARG_PORT,
50   ARG_CONTROL,
51 /*  ARG_SOCKET_OPTIONS,*/
52   /* FILL ME */
53 };
54
55 #define GST_TYPE_TCPSRC_CONTROL (gst_tcpsrc_control_get_type())
56 static GType
57 gst_tcpsrc_control_get_type(void) {
58   static GType tcpsrc_control_type = 0;
59   static GEnumValue tcpsrc_control[] = {
60     {CONTROL_NONE, "1", "none"},
61     {CONTROL_TCP, "2", "tcp"},
62     {CONTROL_ZERO, NULL, NULL}
63   };
64   if (!tcpsrc_control_type) {
65     tcpsrc_control_type = g_enum_register_static("GstTCPSrcControl", tcpsrc_control);
66   }
67   return tcpsrc_control_type;
68 }
69
70 static void             gst_tcpsrc_class_init           (GstTCPSrc *klass);
71 static void             gst_tcpsrc_init                 (GstTCPSrc *tcpsrc);
72
73 static GstData* gst_tcpsrc_get                  (GstPad *pad);
74 static GstElementStateReturn
75                         gst_tcpsrc_change_state         (GstElement *element);
76
77 static void             gst_tcpsrc_set_property         (GObject *object, guint prop_id, 
78                                                          const GValue *value, GParamSpec *pspec);
79 static void             gst_tcpsrc_get_property         (GObject *object, guint prop_id, 
80                                                          GValue *value, GParamSpec *pspec);
81 static void             gst_tcpsrc_set_clock            (GstElement *element, GstClock *clock);
82
83 static GstElementClass *parent_class = NULL;
84 /*static guint gst_tcpsrc_signals[LAST_SIGNAL] = { 0 }; */
85
86 GType
87 gst_tcpsrc_get_type (void)
88 {
89   static GType tcpsrc_type = 0;
90
91
92   if (!tcpsrc_type) {
93     static const GTypeInfo tcpsrc_info = {
94       sizeof(GstTCPSrcClass),
95       NULL,
96       NULL,
97       (GClassInitFunc)gst_tcpsrc_class_init,
98       NULL,
99       NULL,
100       sizeof(GstTCPSrc),
101       0,
102       (GInstanceInitFunc)gst_tcpsrc_init,
103       NULL
104     };
105     tcpsrc_type = g_type_register_static (GST_TYPE_ELEMENT, "GstTCPSrc", &tcpsrc_info, 0);
106   }
107   return tcpsrc_type;
108 }
109
110 static void
111 gst_tcpsrc_class_init (GstTCPSrc *klass)
112 {
113   GObjectClass *gobject_class;
114   GstElementClass *gstelement_class;
115
116   gobject_class = (GObjectClass*) klass;
117   gstelement_class = (GstElementClass*) klass;
118
119   parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
120
121   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
122     g_param_spec_int ("port", "port", "The port to receive the packets from",
123                        0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE)); 
124   g_object_class_install_property (gobject_class, ARG_CONTROL,
125     g_param_spec_enum ("control", "control", "The type of control",
126                        GST_TYPE_TCPSRC_CONTROL, CONTROL_TCP, G_PARAM_READWRITE));
127 /*
128   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SOCKET_OPTIONS,
129     g_param_spec_boolean ("socketop", "socketop", "Enable or disable socket options REUSEADDR and KEEPALIVE",
130                         FALSE, G_PARAM_READWRITE));
131 */
132   gobject_class->set_property = gst_tcpsrc_set_property;
133   gobject_class->get_property = gst_tcpsrc_get_property;
134
135   gstelement_class->change_state = gst_tcpsrc_change_state;
136   gstelement_class->set_clock = gst_tcpsrc_set_clock;
137 }
138
139 static void
140 gst_tcpsrc_set_clock (GstElement *element, GstClock *clock)
141 {
142   GstTCPSrc *tcpsrc;
143               
144   tcpsrc = GST_TCPSRC (element);
145
146   tcpsrc->clock = clock;
147 }
148
149 static void
150 gst_tcpsrc_init (GstTCPSrc *tcpsrc)
151 {
152   /* create the src and src pads */
153   tcpsrc->srcpad = gst_pad_new ("src", GST_PAD_SRC);
154   gst_element_add_pad (GST_ELEMENT (tcpsrc), tcpsrc->srcpad);
155   gst_pad_set_get_function (tcpsrc->srcpad, gst_tcpsrc_get);
156
157   tcpsrc->port = TCP_DEFAULT_PORT;
158   tcpsrc->control = CONTROL_TCP;
159   tcpsrc->clock = NULL;
160   tcpsrc->sock = -1;
161   tcpsrc->control_sock = -1;
162   tcpsrc->client_sock = -1;
163   /*tcpsrc->socket_options = FALSE;*/
164
165   GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_OPEN);
166   GST_FLAG_SET (tcpsrc, GST_TCPSRC_1ST_BUF);
167   GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_CONNECTED);
168 }
169
170 static GstData*
171 gst_tcpsrc_get (GstPad *pad)
172 {
173   GstTCPSrc *tcpsrc;
174   GstBuffer *outbuf;
175   socklen_t len;
176   gint numbytes;
177   fd_set read_fds;
178   guint max_sock;
179   int ret, client_sock;
180   struct sockaddr client_addr;
181
182   g_return_val_if_fail (pad != NULL, NULL);
183   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
184
185   tcpsrc = GST_TCPSRC (GST_OBJECT_PARENT (pad));
186
187   FD_ZERO (&read_fds);
188   FD_SET (tcpsrc->sock, &read_fds);
189
190   max_sock = tcpsrc->sock;
191
192   if (tcpsrc->control_sock >= 0) {
193     FD_SET (tcpsrc->control_sock, &read_fds);
194     max_sock = MAX(tcpsrc->sock, tcpsrc->control_sock); 
195   }
196
197   /* Add to FD_SET client socket, when connection has been established */ 
198   if (tcpsrc->client_sock >= 0)  
199   {
200    FD_SET (tcpsrc->client_sock, &read_fds);
201    max_sock = MAX(tcpsrc->client_sock , max_sock);
202   }
203    
204
205   if (select (max_sock+1, &read_fds, NULL, NULL, NULL) > 0) {
206     if ((tcpsrc->control_sock != -1) && FD_ISSET (tcpsrc->control_sock, &read_fds)) 
207     {
208       guchar *buf=NULL;
209       xmlDocPtr doc;
210       GstCaps *caps;
211       
212       switch (tcpsrc->control) {
213         case CONTROL_TCP:
214
215 #ifndef GST_DISABLE_LOADSAVE
216            buf = g_malloc (1024*10);
217
218            len = sizeof (struct sockaddr);
219            client_sock = accept (tcpsrc->control_sock, &client_addr, &len);
220             
221            if (client_sock <= 0) {
222              perror ("control_sock accept");
223            }
224       
225            else if ((ret = read (client_sock, buf, 1024*10)) <= 0) {
226              perror ("control_sock read");
227            }
228
229            else {
230              buf[ret] = '\0';
231              doc = xmlParseMemory(buf, ret);
232              caps = gst_caps_load_thyself(doc->xmlRootNode);
233       
234              /* foward the connect, we don't signal back the result here... */
235              gst_pad_proxy_link (tcpsrc->srcpad, caps);
236            }
237       
238            g_free (buf);
239 #endif
240         break;
241         case CONTROL_NONE:
242         default:
243             g_free (buf);
244             return NULL;
245             break;
246       }
247       
248       outbuf = NULL;
249     }
250     else {
251       outbuf = gst_buffer_new ();
252       GST_BUFFER_DATA (outbuf) = g_malloc (24000);
253       GST_BUFFER_SIZE (outbuf) = 24000;
254
255       if (GST_FLAG_IS_SET (tcpsrc, GST_TCPSRC_1ST_BUF)) {
256         if (tcpsrc->clock) {
257            GstClockTime current_time;
258            GstEvent *discont;
259
260            current_time = gst_clock_get_time (tcpsrc->clock);
261            
262            GST_BUFFER_TIMESTAMP (outbuf) = current_time;
263
264            discont = gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME, 
265                                 current_time, NULL);
266
267            gst_pad_push (tcpsrc->srcpad, GST_DATA (discont));
268         }
269   
270         GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_1ST_BUF);
271       }
272       
273       else {
274         GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE;
275       }
276         
277       if (!GST_FLAG_IS_SET (tcpsrc, GST_TCPSRC_CONNECTED)) {
278         tcpsrc->client_sock = accept (tcpsrc->sock, &client_addr, &len);
279             
280         if (tcpsrc->client_sock <= 0) {
281           perror ("accept");
282         }
283
284         else {
285           GST_FLAG_SET (tcpsrc, GST_TCPSRC_CONNECTED);
286         }
287       }
288
289       numbytes = 
290         read (tcpsrc->client_sock, GST_BUFFER_DATA (outbuf), GST_BUFFER_SIZE (outbuf));
291
292       if (numbytes > 0) {
293         GST_BUFFER_SIZE (outbuf) = numbytes;
294       }
295       
296       else {
297         if (numbytes == -1){
298                 perror ("read");
299         }
300         else g_print("End of Stream reached\n");
301         gst_buffer_unref (outbuf);
302         outbuf = NULL;
303         close (tcpsrc->client_sock);
304         tcpsrc->client_sock = -1;
305         GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_CONNECTED);
306       }
307     }
308   }
309   
310   else {
311     perror ("select");
312     outbuf = NULL;
313   }
314   
315   return GST_DATA (outbuf);
316 }
317
318
319 static void
320 gst_tcpsrc_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
321 {
322   GstTCPSrc *tcpsrc;
323
324   /* it's not null if we got it, but it might not be ours */
325   g_return_if_fail(GST_IS_TCPSRC(object));
326   tcpsrc = GST_TCPSRC(object);
327
328   switch (prop_id) {
329     case ARG_PORT:
330         tcpsrc->port = g_value_get_int (value);
331       break;
332     case ARG_CONTROL:
333         tcpsrc->control = g_value_get_enum (value);
334       break;
335 /*    case ARG_SOCKET_OPTIONS:
336         tcpsrc->socket_options = g_value_get_boolean(value);    
337       break;    */
338     default:
339       break;
340   }
341 }
342
343 static void
344 gst_tcpsrc_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
345 {
346   GstTCPSrc *tcpsrc;
347
348   /* it's not null if we got it, but it might not be ours */
349   g_return_if_fail(GST_IS_TCPSRC(object));
350   tcpsrc = GST_TCPSRC(object);
351
352   switch (prop_id) {
353     case ARG_PORT:
354       g_value_set_int (value, tcpsrc->port);
355       break;
356     case ARG_CONTROL:
357       g_value_set_enum (value, tcpsrc->control);
358       break;
359 /*    case ARG_SOCKET_OPTIONS:
360       g_value_set_boolean(value,tcpsrc->socket_options);
361       break;*/
362     default:
363       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
364       break;
365   }
366 }
367
368 /* create a socket for sending to remote machine */
369 static gboolean
370 gst_tcpsrc_init_receive (GstTCPSrc *src)
371 {
372   guint val=0;
373   bzero (&src->myaddr, sizeof (src->myaddr));
374   src->myaddr.sin_family = AF_INET;           /* host byte order */
375   src->myaddr.sin_port = htons (src->port);   /* short, network byte order */
376   src->myaddr.sin_addr.s_addr = INADDR_ANY;
377
378   if ((src->sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
379     perror("stream_socket");
380     return FALSE;
381   }
382
383 /*  if (src->socket_options)
384   {*/
385    g_print("Socket Options SO_REUSEADDR, SO_KEEPALIVE\n");
386   /* Sock Options */ 
387   val = 1;
388   /* allow local address reuse */ 
389   if( setsockopt( src->sock,SOL_SOCKET,SO_REUSEADDR, &val, sizeof( int )) <0)
390     perror( "setsockopt()" );
391   val = 1;
392   /* periodically test if connection still alive */ 
393   if( setsockopt( src->sock,SOL_SOCKET,SO_KEEPALIVE, &val, sizeof( int )) <0)
394     perror( "setsockopt()" );
395   /* Sock Options */
396 /*  } */
397
398   if (bind (src->sock, (struct sockaddr *) &src->myaddr, sizeof (src->myaddr)) == -1) {
399     perror("stream_sock bind");
400     return FALSE;
401   }
402
403   if (listen (src->sock, 5) == -1) {
404     perror("stream_sock listen");
405     return FALSE;
406   }
407   
408   fcntl (src->sock, F_SETFL, O_NONBLOCK);
409   
410   switch (src->control) {
411     case CONTROL_TCP:
412         if ((src->control_sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
413           perror("control_socket");
414           return FALSE;
415         }
416         
417         src->myaddr.sin_port = htons (src->port+1);
418         if (bind (src->control_sock, (struct sockaddr *) &src->myaddr, sizeof (src->myaddr)) == -1) 
419         {
420           perror("control bind");
421           return FALSE;
422         }
423         
424         if (listen (src->control_sock, 5) == -1) {
425           perror("control listen");
426           return FALSE;
427         }
428   
429         fcntl (src->control_sock, F_SETFL, O_NONBLOCK);
430     case CONTROL_NONE:
431         GST_FLAG_SET (src, GST_TCPSRC_OPEN);
432         return TRUE;
433         break;
434     default:
435         return FALSE;
436         break;
437   }
438   
439   GST_FLAG_SET (src, GST_TCPSRC_OPEN);
440   
441   return TRUE;
442 }
443
444 static void
445 gst_tcpsrc_close (GstTCPSrc *src)
446 {
447   if (src->sock != -1) {
448     close (src->sock);
449     src->sock = -1;
450   }
451   if (src->control_sock != -1) {
452     close (src->control_sock);
453     src->control_sock = -1;
454   }
455   if (src->client_sock != -1) {
456     close(src->client_sock);
457     src->client_sock = -1;
458   } 
459
460   GST_FLAG_UNSET (src, GST_TCPSRC_OPEN);
461 }
462
463 static GstElementStateReturn
464 gst_tcpsrc_change_state (GstElement *element)
465 {
466   g_return_val_if_fail (GST_IS_TCPSRC (element), GST_STATE_FAILURE);
467
468   if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
469     if (GST_FLAG_IS_SET (element, GST_TCPSRC_OPEN))
470       gst_tcpsrc_close (GST_TCPSRC (element));
471   } else {
472     if (!GST_FLAG_IS_SET (element, GST_TCPSRC_OPEN)) {
473       if (!gst_tcpsrc_init_receive (GST_TCPSRC (element)))
474         return GST_STATE_FAILURE;
475     }
476   }
477
478   if (GST_ELEMENT_CLASS (parent_class)->change_state)
479     return GST_ELEMENT_CLASS (parent_class)->change_state (element);
480
481   return GST_STATE_SUCCESS;
482 }
483