tcp: cleanup files
[platform/upstream/gstreamer.git] / gst / tcp / gsttcpserversrc.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) <2011> Collabora Ltd.
5  *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:element-tcpserversrc
25  * @title: tcpserversrc
26  * @see_also: #tcpserversink
27  *
28  * ## Example launch line (server):
29  * |[
30  * gst-launch-1.0 tcpserversrc port=3000 ! fdsink fd=2
31  * ]|
32  * ## Example launch line (client):
33  * |[
34  * gst-launch-1.0 fdsrc fd=1 ! tcpclientsink port=3000
35  * ]|
36  *
37  */
38
39 #ifdef HAVE_CONFIG_H
40 #include "config.h"
41 #endif
42
43 #include <gst/gst-i18n-plugin.h>
44 #include "gsttcpelements.h"
45 #include "gsttcpsrcstats.h"
46 #include "gsttcpserversrc.h"
47
48 GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug);
49 #define GST_CAT_DEFAULT tcpserversrc_debug
50
51 #define TCP_DEFAULT_LISTEN_HOST         NULL    /* listen on all interfaces */
52 #define TCP_BACKLOG                     1       /* client connection queue */
53
54 #define MAX_READ_SIZE                   4 * 1024
55
56 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
57     GST_PAD_SRC,
58     GST_PAD_ALWAYS,
59     GST_STATIC_CAPS_ANY);
60
61 enum
62 {
63   PROP_0,
64   PROP_HOST,
65   PROP_PORT,
66   PROP_CURRENT_PORT,
67   PROP_STATS,
68 };
69
70 #define gst_tcp_server_src_parent_class parent_class
71 G_DEFINE_TYPE (GstTCPServerSrc, gst_tcp_server_src, GST_TYPE_PUSH_SRC);
72 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (tcpserversrc, "tcpserversrc",
73     GST_RANK_NONE, GST_TYPE_TCP_SERVER_SRC, tcp_element_init (plugin));
74
75 static void gst_tcp_server_src_finalize (GObject * gobject);
76
77 static gboolean gst_tcp_server_src_start (GstBaseSrc * bsrc);
78 static gboolean gst_tcp_server_src_stop (GstBaseSrc * bsrc);
79 static gboolean gst_tcp_server_src_unlock (GstBaseSrc * bsrc);
80 static gboolean gst_tcp_server_src_unlock_stop (GstBaseSrc * bsrc);
81 static GstFlowReturn gst_tcp_server_src_create (GstPushSrc * psrc,
82     GstBuffer ** buf);
83
84 static void gst_tcp_server_src_set_property (GObject * object, guint prop_id,
85     const GValue * value, GParamSpec * pspec);
86 static void gst_tcp_server_src_get_property (GObject * object, guint prop_id,
87     GValue * value, GParamSpec * pspec);
88 static GstStructure *gst_tcp_server_src_get_stats (GstTCPServerSrc * src);
89
90 static void
91 gst_tcp_server_src_class_init (GstTCPServerSrcClass * klass)
92 {
93   GObjectClass *gobject_class;
94   GstElementClass *gstelement_class;
95   GstBaseSrcClass *gstbasesrc_class;
96   GstPushSrcClass *gstpush_src_class;
97
98   gobject_class = (GObjectClass *) klass;
99   gstelement_class = (GstElementClass *) klass;
100   gstbasesrc_class = (GstBaseSrcClass *) klass;
101   gstpush_src_class = (GstPushSrcClass *) klass;
102
103   gobject_class->set_property = gst_tcp_server_src_set_property;
104   gobject_class->get_property = gst_tcp_server_src_get_property;
105   gobject_class->finalize = gst_tcp_server_src_finalize;
106
107   /* FIXME 2.0: Rename this to bind-address, host does not make much
108    * sense here */
109   g_object_class_install_property (gobject_class, PROP_HOST,
110       g_param_spec_string ("host", "Host", "The hostname to listen as",
111           TCP_DEFAULT_LISTEN_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
112   g_object_class_install_property (gobject_class, PROP_PORT,
113       g_param_spec_int ("port", "Port",
114           "The port to listen to (0=random available port)",
115           0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT,
116           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
117   /**
118    * GstTCPServerSrc:current-port:
119    *
120    * The port number the socket is currently bound to. Applications can use
121    * this property to retrieve the port number actually bound to in case
122    * the port requested was 0 (=allocate a random available port).
123    *
124    * Since: 1.0.2
125    **/
126   g_object_class_install_property (gobject_class, PROP_CURRENT_PORT,
127       g_param_spec_int ("current-port", "current-port",
128           "The port number the socket is currently bound to", 0,
129           TCP_HIGHEST_PORT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
130
131   /**
132    * GstTCPServerSrc::stats:
133    *
134    * Sends a GstStructure with statistics. We count bytes-received in a
135    * platform-independent way and the rest via the tcp_info struct, if it's
136    * available. The OS takes care of the TCP layer for us so we can't know it
137    * from here.
138    *
139    * Struct members:
140    *
141    * bytes-received (uint64): Total bytes received (platform-independent)
142    * reordering (uint): Amount of reordering (linux-specific)
143    * unacked (uint): Un-acked packets (linux-specific)
144    * sacked (uint): Selective acked packets (linux-specific)
145    * lost (uint): Lost packets (linux-specific)
146    * retrans (uint): Retransmits (linux-specific)
147    * fackets (uint): Forward acknowledgement (linux-specific)
148    *
149    * Since: 1.18
150    */
151   g_object_class_install_property (gobject_class, PROP_STATS,
152       g_param_spec_boxed ("stats", "Stats", "Retrieve a statistics structure",
153           GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
154
155   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
156
157   gst_element_class_set_static_metadata (gstelement_class,
158       "TCP server source", "Source/Network",
159       "Receive data as a server over the network via TCP",
160       "Thomas Vander Stichele <thomas at apestaart dot org>");
161
162   gstbasesrc_class->start = gst_tcp_server_src_start;
163   gstbasesrc_class->stop = gst_tcp_server_src_stop;
164   gstbasesrc_class->unlock = gst_tcp_server_src_unlock;
165   gstbasesrc_class->unlock_stop = gst_tcp_server_src_unlock_stop;
166
167   gstpush_src_class->create = gst_tcp_server_src_create;
168
169   GST_DEBUG_CATEGORY_INIT (tcpserversrc_debug, "tcpserversrc", 0,
170       "TCP Server Source");
171 }
172
173 static void
174 gst_tcp_server_src_init (GstTCPServerSrc * src)
175 {
176   src->server_port = TCP_DEFAULT_PORT;
177   src->host = g_strdup (TCP_DEFAULT_HOST);
178   src->server_socket = NULL;
179   src->client_socket = NULL;
180   src->cancellable = g_cancellable_new ();
181
182   GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
183 }
184
185 static void
186 gst_tcp_server_src_finalize (GObject * gobject)
187 {
188   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (gobject);
189
190   if (src->cancellable)
191     g_object_unref (src->cancellable);
192   src->cancellable = NULL;
193   if (src->server_socket)
194     g_object_unref (src->server_socket);
195   src->server_socket = NULL;
196   if (src->client_socket)
197     g_object_unref (src->client_socket);
198   src->client_socket = NULL;
199
200   g_free (src->host);
201   src->host = NULL;
202
203   gst_clear_structure (&src->stats);
204
205   G_OBJECT_CLASS (parent_class)->finalize (gobject);
206 }
207
208 static GstFlowReturn
209 gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
210 {
211   GstTCPServerSrc *src;
212   GstFlowReturn ret = GST_FLOW_OK;
213   gssize rret, avail;
214   gsize read;
215   GError *err = NULL;
216   GstMapInfo map;
217
218   src = GST_TCP_SERVER_SRC (psrc);
219
220   if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_SERVER_SRC_OPEN))
221     goto wrong_state;
222
223   if (!src->client_socket) {
224     /* wait on server socket for connections */
225     src->client_socket =
226         g_socket_accept (src->server_socket, src->cancellable, &err);
227     if (!src->client_socket)
228       goto accept_error;
229     GST_DEBUG_OBJECT (src, "closing server socket");
230
231     if (!g_socket_close (src->server_socket, &err)) {
232       GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
233       g_clear_error (&err);
234     }
235     /* now read from the socket. */
236   }
237
238   /* if we have a client, wait for read */
239   GST_LOG_OBJECT (src, "asked for a buffer");
240
241   /* read the buffer header */
242   avail = g_socket_get_available_bytes (src->client_socket);
243   if (avail < 0) {
244     goto get_available_error;
245   } else if (avail == 0) {
246     GIOCondition condition;
247
248     if (!g_socket_condition_wait (src->client_socket,
249             G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
250       goto select_error;
251
252     condition =
253         g_socket_condition_check (src->client_socket,
254         G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
255
256     if ((condition & G_IO_ERR)) {
257       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
258           ("Socket in error state"));
259       *outbuf = NULL;
260       ret = GST_FLOW_ERROR;
261       goto done;
262     } else if ((condition & G_IO_HUP)) {
263       GST_DEBUG_OBJECT (src, "Connection closed");
264       *outbuf = NULL;
265       ret = GST_FLOW_EOS;
266       goto done;
267     }
268     avail = g_socket_get_available_bytes (src->client_socket);
269     if (avail < 0)
270       goto get_available_error;
271   }
272
273   if (avail > 0) {
274     read = MIN (avail, MAX_READ_SIZE);
275     *outbuf = gst_buffer_new_and_alloc (read);
276     gst_buffer_map (*outbuf, &map, GST_MAP_READWRITE);
277     rret =
278         g_socket_receive (src->client_socket, (gchar *) map.data, read,
279         src->cancellable, &err);
280   } else {
281     /* Connection closed */
282     rret = 0;
283     *outbuf = NULL;
284     read = 0;
285   }
286
287   if (rret == 0) {
288     GST_DEBUG_OBJECT (src, "Connection closed");
289     ret = GST_FLOW_EOS;
290     if (*outbuf) {
291       gst_buffer_unmap (*outbuf, &map);
292       gst_buffer_unref (*outbuf);
293     }
294     *outbuf = NULL;
295   } else if (rret < 0) {
296     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
297       ret = GST_FLOW_FLUSHING;
298       GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
299     } else {
300       ret = GST_FLOW_ERROR;
301       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
302           ("Failed to read from socket: %s", err->message));
303     }
304     gst_buffer_unmap (*outbuf, &map);
305     gst_buffer_unref (*outbuf);
306     *outbuf = NULL;
307   } else {
308     ret = GST_FLOW_OK;
309     gst_buffer_unmap (*outbuf, &map);
310     gst_buffer_resize (*outbuf, 0, rret);
311     src->bytes_received += read;
312
313     GST_LOG_OBJECT (src,
314         "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
315         GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
316         ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
317         gst_buffer_get_size (*outbuf),
318         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
319         GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
320         GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
321   }
322   g_clear_error (&err);
323
324 done:
325   return ret;
326
327 wrong_state:
328   {
329     GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
330     return GST_FLOW_FLUSHING;
331   }
332 accept_error:
333   {
334     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
335       GST_DEBUG_OBJECT (src, "Cancelled accepting of client");
336       ret = GST_FLOW_FLUSHING;
337     } else {
338       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
339           ("Failed to accept client: %s", err->message));
340       ret = GST_FLOW_ERROR;
341     }
342     g_clear_error (&err);
343     return ret;
344   }
345 select_error:
346   {
347     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
348       GST_DEBUG_OBJECT (src, "Cancelled select");
349       ret = GST_FLOW_FLUSHING;
350     } else {
351       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
352           ("Select failed: %s", err->message));
353       ret = GST_FLOW_ERROR;
354     }
355     g_clear_error (&err);
356     return ret;
357   }
358 get_available_error:
359   {
360     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
361         ("Failed to get available bytes from socket"));
362     return GST_FLOW_ERROR;
363   }
364 }
365
366 static void
367 gst_tcp_server_src_set_property (GObject * object, guint prop_id,
368     const GValue * value, GParamSpec * pspec)
369 {
370   GstTCPServerSrc *tcpserversrc = GST_TCP_SERVER_SRC (object);
371
372   switch (prop_id) {
373     case PROP_HOST:
374       if (!g_value_get_string (value)) {
375         g_warning ("host property cannot be NULL");
376         break;
377       }
378       g_free (tcpserversrc->host);
379       tcpserversrc->host = g_value_dup_string (value);
380       break;
381     case PROP_PORT:
382       tcpserversrc->server_port = g_value_get_int (value);
383       break;
384
385     default:
386       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
387       break;
388   }
389 }
390
391 static void
392 gst_tcp_server_src_get_property (GObject * object, guint prop_id,
393     GValue * value, GParamSpec * pspec)
394 {
395   GstTCPServerSrc *tcpserversrc = GST_TCP_SERVER_SRC (object);
396
397   switch (prop_id) {
398     case PROP_HOST:
399       g_value_set_string (value, tcpserversrc->host);
400       break;
401     case PROP_PORT:
402       g_value_set_int (value, tcpserversrc->server_port);
403       break;
404     case PROP_CURRENT_PORT:
405       g_value_set_int (value, g_atomic_int_get (&tcpserversrc->current_port));
406       break;
407     case PROP_STATS:
408       g_value_take_boxed (value, gst_tcp_server_src_get_stats (tcpserversrc));
409       break;
410     default:
411       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
412       break;
413   }
414 }
415
416 /* set up server */
417 static gboolean
418 gst_tcp_server_src_start (GstBaseSrc * bsrc)
419 {
420   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
421   GError *err = NULL;
422   GInetAddress *addr;
423   GSocketAddress *saddr;
424   GResolver *resolver;
425   gint bound_port = 0;
426
427   src->bytes_received = 0;
428   gst_clear_structure (&src->stats);
429
430   /* look up name if we need to */
431   addr = g_inet_address_new_from_string (src->host);
432   if (!addr) {
433     GList *results;
434
435     resolver = g_resolver_get_default ();
436
437     results =
438         g_resolver_lookup_by_name (resolver, src->host, src->cancellable, &err);
439     if (!results)
440       goto name_resolve;
441     addr = G_INET_ADDRESS (g_object_ref (results->data));
442
443     g_resolver_free_addresses (results);
444     g_object_unref (resolver);
445   }
446 #ifndef GST_DISABLE_GST_DEBUG
447   {
448     gchar *ip = g_inet_address_to_string (addr);
449
450     GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
451     g_free (ip);
452   }
453 #endif
454
455   saddr = g_inet_socket_address_new (addr, src->server_port);
456   g_object_unref (addr);
457
458   /* create the server listener socket */
459   src->server_socket =
460       g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
461       G_SOCKET_PROTOCOL_TCP, &err);
462   if (!src->server_socket)
463     goto no_socket;
464
465   GST_DEBUG_OBJECT (src, "opened receiving server socket");
466
467   /* bind it */
468   GST_DEBUG_OBJECT (src, "binding server socket to address");
469   if (!g_socket_bind (src->server_socket, saddr, TRUE, &err))
470     goto bind_failed;
471
472   g_object_unref (saddr);
473
474   GST_DEBUG_OBJECT (src, "listening on server socket");
475
476   g_socket_set_listen_backlog (src->server_socket, TCP_BACKLOG);
477
478   if (!g_socket_listen (src->server_socket, &err))
479     goto listen_failed;
480
481   GST_OBJECT_FLAG_SET (src, GST_TCP_SERVER_SRC_OPEN);
482
483   if (src->server_port == 0) {
484     saddr = g_socket_get_local_address (src->server_socket, NULL);
485     bound_port = g_inet_socket_address_get_port ((GInetSocketAddress *) saddr);
486     g_object_unref (saddr);
487   } else {
488     bound_port = src->server_port;
489   }
490
491   GST_DEBUG_OBJECT (src, "listening on port %d", bound_port);
492
493   g_atomic_int_set (&src->current_port, bound_port);
494   g_object_notify (G_OBJECT (src), "current-port");
495
496   return TRUE;
497
498   /* ERRORS */
499 no_socket:
500   {
501     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
502         ("Failed to create socket: %s", err->message));
503     g_clear_error (&err);
504     g_object_unref (saddr);
505     return FALSE;
506   }
507 name_resolve:
508   {
509     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
510       GST_DEBUG_OBJECT (src, "Cancelled name resolval");
511     } else {
512       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
513           ("Failed to resolve host '%s': %s", src->host, err->message));
514     }
515     g_clear_error (&err);
516     g_object_unref (resolver);
517     return FALSE;
518   }
519 bind_failed:
520   {
521     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
522       GST_DEBUG_OBJECT (src, "Cancelled binding");
523     } else {
524       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
525           ("Failed to bind on host '%s:%d': %s", src->host, src->server_port,
526               err->message));
527     }
528     g_clear_error (&err);
529     g_object_unref (saddr);
530     gst_tcp_server_src_stop (GST_BASE_SRC (src));
531     return FALSE;
532   }
533 listen_failed:
534   {
535     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
536       GST_DEBUG_OBJECT (src, "Cancelled listening");
537     } else {
538       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
539           ("Failed to listen on host '%s:%d': %s", src->host, src->server_port,
540               err->message));
541     }
542     g_clear_error (&err);
543     gst_tcp_server_src_stop (GST_BASE_SRC (src));
544     return FALSE;
545   }
546 }
547
548 static gboolean
549 gst_tcp_server_src_stop (GstBaseSrc * bsrc)
550 {
551   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
552   GError *err = NULL;
553
554   if (src->client_socket) {
555     GST_DEBUG_OBJECT (src, "closing socket");
556
557     src->stats = gst_tcp_server_src_get_stats (src);
558
559     if (!g_socket_close (src->client_socket, &err)) {
560       GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
561       g_clear_error (&err);
562     }
563     g_object_unref (src->client_socket);
564     src->client_socket = NULL;
565   }
566
567   if (src->server_socket) {
568     GST_DEBUG_OBJECT (src, "closing socket");
569
570     if (!g_socket_close (src->server_socket, &err)) {
571       GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
572       g_clear_error (&err);
573     }
574     g_object_unref (src->server_socket);
575     src->server_socket = NULL;
576
577     g_atomic_int_set (&src->current_port, 0);
578     g_object_notify (G_OBJECT (src), "current-port");
579   }
580
581   GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
582
583   return TRUE;
584 }
585
586 /* will be called only between calls to start() and stop() */
587 static gboolean
588 gst_tcp_server_src_unlock (GstBaseSrc * bsrc)
589 {
590   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
591
592   g_cancellable_cancel (src->cancellable);
593
594   return TRUE;
595 }
596
597 static gboolean
598 gst_tcp_server_src_unlock_stop (GstBaseSrc * bsrc)
599 {
600   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
601
602   g_object_unref (src->cancellable);
603   src->cancellable = g_cancellable_new ();
604
605   return TRUE;
606 }
607
608
609 static GstStructure *
610 gst_tcp_server_src_get_stats (GstTCPServerSrc * src)
611 {
612   GstStructure *s;
613
614   /* we can't get the values post stop so just return the saved ones */
615   if (src->stats)
616     return gst_structure_copy (src->stats);
617
618   s = gst_structure_new ("GstTCPServerSrcStats",
619       "bytes-received", G_TYPE_UINT64, src->bytes_received, NULL);
620
621   gst_tcp_stats_from_socket (s, src->client_socket);
622
623   return s;
624 }