audiotestsrc: fix rounding errors that might cause segments to be one sample too...
[platform/upstream/gstreamer.git] / gst / tcp / gsttcpserversrc.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) <2011> Collabora Ltd.
5  *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:element-tcpserversrc
25  * @see_also: #tcpserversink
26  *
27  * <refsect2>
28  * <title>Example launch line</title>
29  * |[
30  * # server:
31  * gst-launch tcpserversrc port=3000 ! fdsink fd=2
32  * # client:
33  * gst-launch fdsrc fd=1 ! tcpclientsink port=3000
34  * ]| 
35  * </refsect2>
36  */
37
38 #ifdef HAVE_CONFIG_H
39 #include "config.h"
40 #endif
41
42 #include <gst/gst-i18n-plugin.h>
43 #include "gsttcp.h"
44 #include "gsttcpserversrc.h"
45
46 GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug);
47 #define GST_CAT_DEFAULT tcpserversrc_debug
48
49 #define TCP_DEFAULT_LISTEN_HOST         NULL    /* listen on all interfaces */
50 #define TCP_BACKLOG                     1       /* client connection queue */
51
52 #define MAX_READ_SIZE                   4 * 1024
53
54 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
55     GST_PAD_SRC,
56     GST_PAD_ALWAYS,
57     GST_STATIC_CAPS_ANY);
58
59 enum
60 {
61   PROP_0,
62   PROP_HOST,
63   PROP_PORT,
64   PROP_CURRENT_PORT
65 };
66
67 #define gst_tcp_server_src_parent_class parent_class
68 G_DEFINE_TYPE (GstTCPServerSrc, gst_tcp_server_src, GST_TYPE_PUSH_SRC);
69
70 static void gst_tcp_server_src_finalize (GObject * gobject);
71
72 static gboolean gst_tcp_server_src_start (GstBaseSrc * bsrc);
73 static gboolean gst_tcp_server_src_stop (GstBaseSrc * bsrc);
74 static gboolean gst_tcp_server_src_unlock (GstBaseSrc * bsrc);
75 static gboolean gst_tcp_server_src_unlock_stop (GstBaseSrc * bsrc);
76 static GstFlowReturn gst_tcp_server_src_create (GstPushSrc * psrc,
77     GstBuffer ** buf);
78
79 static void gst_tcp_server_src_set_property (GObject * object, guint prop_id,
80     const GValue * value, GParamSpec * pspec);
81 static void gst_tcp_server_src_get_property (GObject * object, guint prop_id,
82     GValue * value, GParamSpec * pspec);
83
84 static void
85 gst_tcp_server_src_class_init (GstTCPServerSrcClass * klass)
86 {
87   GObjectClass *gobject_class;
88   GstElementClass *gstelement_class;
89   GstBaseSrcClass *gstbasesrc_class;
90   GstPushSrcClass *gstpush_src_class;
91
92   gobject_class = (GObjectClass *) klass;
93   gstelement_class = (GstElementClass *) klass;
94   gstbasesrc_class = (GstBaseSrcClass *) klass;
95   gstpush_src_class = (GstPushSrcClass *) klass;
96
97   gobject_class->set_property = gst_tcp_server_src_set_property;
98   gobject_class->get_property = gst_tcp_server_src_get_property;
99   gobject_class->finalize = gst_tcp_server_src_finalize;
100
101   g_object_class_install_property (gobject_class, PROP_HOST,
102       g_param_spec_string ("host", "Host", "The hostname to listen as",
103           TCP_DEFAULT_LISTEN_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
104   g_object_class_install_property (gobject_class, PROP_PORT,
105       g_param_spec_int ("port", "Port",
106           "The port to listen to (0=random available port)",
107           0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT,
108           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
109   /**
110    * GstTCPServerSrc:current-port:
111    *
112    * The port number the socket is currently bound to. Applications can use
113    * this property to retrieve the port number actually bound to in case
114    * the port requested was 0 (=allocate a random available port).
115    *
116    * Since: 1.0.2
117    **/
118   g_object_class_install_property (gobject_class, PROP_CURRENT_PORT,
119       g_param_spec_int ("current-port", "current-port",
120           "The port number the socket is currently bound to", 0,
121           TCP_HIGHEST_PORT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
122
123   gst_element_class_add_pad_template (gstelement_class,
124       gst_static_pad_template_get (&srctemplate));
125
126   gst_element_class_set_static_metadata (gstelement_class,
127       "TCP server source", "Source/Network",
128       "Receive data as a server over the network via TCP",
129       "Thomas Vander Stichele <thomas at apestaart dot org>");
130
131   gstbasesrc_class->start = gst_tcp_server_src_start;
132   gstbasesrc_class->stop = gst_tcp_server_src_stop;
133   gstbasesrc_class->unlock = gst_tcp_server_src_unlock;
134   gstbasesrc_class->unlock_stop = gst_tcp_server_src_unlock_stop;
135
136   gstpush_src_class->create = gst_tcp_server_src_create;
137
138   GST_DEBUG_CATEGORY_INIT (tcpserversrc_debug, "tcpserversrc", 0,
139       "TCP Server Source");
140 }
141
142 static void
143 gst_tcp_server_src_init (GstTCPServerSrc * src)
144 {
145   src->server_port = TCP_DEFAULT_PORT;
146   src->host = g_strdup (TCP_DEFAULT_HOST);
147   src->server_socket = NULL;
148   src->client_socket = NULL;
149   src->cancellable = g_cancellable_new ();
150
151   GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
152 }
153
154 static void
155 gst_tcp_server_src_finalize (GObject * gobject)
156 {
157   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (gobject);
158
159   if (src->cancellable)
160     g_object_unref (src->cancellable);
161   src->cancellable = NULL;
162   if (src->server_socket)
163     g_object_unref (src->server_socket);
164   src->server_socket = NULL;
165   if (src->client_socket)
166     g_object_unref (src->client_socket);
167   src->client_socket = NULL;
168
169   g_free (src->host);
170   src->host = NULL;
171
172   G_OBJECT_CLASS (parent_class)->finalize (gobject);
173 }
174
175 static GstFlowReturn
176 gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
177 {
178   GstTCPServerSrc *src;
179   GstFlowReturn ret = GST_FLOW_OK;
180   gssize rret, avail;
181   gsize read;
182   GError *err = NULL;
183   GstMapInfo map;
184
185   src = GST_TCP_SERVER_SRC (psrc);
186
187   if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_SERVER_SRC_OPEN))
188     goto wrong_state;
189
190   if (!src->client_socket) {
191     /* wait on server socket for connections */
192     src->client_socket =
193         g_socket_accept (src->server_socket, src->cancellable, &err);
194     if (!src->client_socket)
195       goto accept_error;
196     /* now read from the socket. */
197   }
198
199   /* if we have a client, wait for read */
200   GST_LOG_OBJECT (src, "asked for a buffer");
201
202   /* read the buffer header */
203   avail = g_socket_get_available_bytes (src->client_socket);
204   if (avail < 0) {
205     goto get_available_error;
206   } else if (avail == 0) {
207     GIOCondition condition;
208
209     if (!g_socket_condition_wait (src->client_socket,
210             G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
211       goto select_error;
212
213     condition =
214         g_socket_condition_check (src->client_socket,
215         G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
216
217     if ((condition & G_IO_ERR)) {
218       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
219           ("Socket in error state"));
220       *outbuf = NULL;
221       ret = GST_FLOW_ERROR;
222       goto done;
223     } else if ((condition & G_IO_HUP)) {
224       GST_DEBUG_OBJECT (src, "Connection closed");
225       *outbuf = NULL;
226       ret = GST_FLOW_EOS;
227       goto done;
228     }
229     avail = g_socket_get_available_bytes (src->client_socket);
230     if (avail < 0)
231       goto get_available_error;
232   }
233
234   if (avail > 0) {
235     read = MIN (avail, MAX_READ_SIZE);
236     *outbuf = gst_buffer_new_and_alloc (read);
237     gst_buffer_map (*outbuf, &map, GST_MAP_READWRITE);
238     rret =
239         g_socket_receive (src->client_socket, (gchar *) map.data, read,
240         src->cancellable, &err);
241   } else {
242     /* Connection closed */
243     rret = 0;
244     *outbuf = NULL;
245     read = 0;
246   }
247
248   if (rret == 0) {
249     GST_DEBUG_OBJECT (src, "Connection closed");
250     ret = GST_FLOW_EOS;
251     if (*outbuf) {
252       gst_buffer_unmap (*outbuf, &map);
253       gst_buffer_unref (*outbuf);
254     }
255     *outbuf = NULL;
256   } else if (rret < 0) {
257     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
258       ret = GST_FLOW_FLUSHING;
259       GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
260     } else {
261       ret = GST_FLOW_ERROR;
262       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
263           ("Failed to read from socket: %s", err->message));
264     }
265     gst_buffer_unmap (*outbuf, &map);
266     gst_buffer_unref (*outbuf);
267     *outbuf = NULL;
268   } else {
269     ret = GST_FLOW_OK;
270     gst_buffer_unmap (*outbuf, &map);
271     gst_buffer_resize (*outbuf, 0, rret);
272
273     GST_LOG_OBJECT (src,
274         "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
275         GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
276         ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
277         gst_buffer_get_size (*outbuf),
278         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
279         GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
280         GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
281   }
282   g_clear_error (&err);
283
284 done:
285   return ret;
286
287 wrong_state:
288   {
289     GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
290     return GST_FLOW_FLUSHING;
291   }
292 accept_error:
293   {
294     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
295       GST_DEBUG_OBJECT (src, "Cancelled accepting of client");
296     } else {
297       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
298           ("Failed to accept client: %s", err->message));
299     }
300     g_clear_error (&err);
301     return GST_FLOW_ERROR;
302   }
303 select_error:
304   {
305     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
306         ("Select failed: %s", err->message));
307     g_clear_error (&err);
308     return GST_FLOW_ERROR;
309   }
310 get_available_error:
311   {
312     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
313         ("Failed to get available bytes from socket"));
314     return GST_FLOW_ERROR;
315   }
316 }
317
318 static void
319 gst_tcp_server_src_set_property (GObject * object, guint prop_id,
320     const GValue * value, GParamSpec * pspec)
321 {
322   GstTCPServerSrc *tcpserversrc = GST_TCP_SERVER_SRC (object);
323
324   switch (prop_id) {
325     case PROP_HOST:
326       if (!g_value_get_string (value)) {
327         g_warning ("host property cannot be NULL");
328         break;
329       }
330       g_free (tcpserversrc->host);
331       tcpserversrc->host = g_strdup (g_value_get_string (value));
332       break;
333     case PROP_PORT:
334       tcpserversrc->server_port = g_value_get_int (value);
335       break;
336
337     default:
338       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
339       break;
340   }
341 }
342
343 static void
344 gst_tcp_server_src_get_property (GObject * object, guint prop_id,
345     GValue * value, GParamSpec * pspec)
346 {
347   GstTCPServerSrc *tcpserversrc = GST_TCP_SERVER_SRC (object);
348
349   switch (prop_id) {
350     case PROP_HOST:
351       g_value_set_string (value, tcpserversrc->host);
352       break;
353     case PROP_PORT:
354       g_value_set_int (value, tcpserversrc->server_port);
355       break;
356     case PROP_CURRENT_PORT:
357       g_value_set_int (value, g_atomic_int_get (&tcpserversrc->current_port));
358       break;
359     default:
360       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
361       break;
362   }
363 }
364
365 /* set up server */
366 static gboolean
367 gst_tcp_server_src_start (GstBaseSrc * bsrc)
368 {
369   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
370   GError *err = NULL;
371   GInetAddress *addr;
372   GSocketAddress *saddr;
373   GResolver *resolver;
374   gint bound_port = 0;
375
376   /* look up name if we need to */
377   addr = g_inet_address_new_from_string (src->host);
378   if (!addr) {
379     GList *results;
380
381     resolver = g_resolver_get_default ();
382
383     results =
384         g_resolver_lookup_by_name (resolver, src->host, src->cancellable, &err);
385     if (!results)
386       goto name_resolve;
387     addr = G_INET_ADDRESS (g_object_ref (results->data));
388
389     g_resolver_free_addresses (results);
390     g_object_unref (resolver);
391   }
392 #ifndef GST_DISABLE_GST_DEBUG
393   {
394     gchar *ip = g_inet_address_to_string (addr);
395
396     GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
397     g_free (ip);
398   }
399 #endif
400
401   saddr = g_inet_socket_address_new (addr, src->server_port);
402   g_object_unref (addr);
403
404   /* create the server listener socket */
405   src->server_socket =
406       g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
407       G_SOCKET_PROTOCOL_TCP, &err);
408   if (!src->server_socket)
409     goto no_socket;
410
411   GST_DEBUG_OBJECT (src, "opened receiving server socket");
412
413   /* bind it */
414   GST_DEBUG_OBJECT (src, "binding server socket to address");
415   if (!g_socket_bind (src->server_socket, saddr, TRUE, &err))
416     goto bind_failed;
417
418   g_object_unref (saddr);
419
420   GST_DEBUG_OBJECT (src, "listening on server socket");
421
422   g_socket_set_listen_backlog (src->server_socket, TCP_BACKLOG);
423
424   if (!g_socket_listen (src->server_socket, &err))
425     goto listen_failed;
426
427   GST_OBJECT_FLAG_SET (src, GST_TCP_SERVER_SRC_OPEN);
428
429   if (src->server_port == 0) {
430     saddr = g_socket_get_local_address (src->server_socket, NULL);
431     bound_port = g_inet_socket_address_get_port ((GInetSocketAddress *) saddr);
432     g_object_unref (saddr);
433   } else {
434     bound_port = src->server_port;
435   }
436
437   GST_DEBUG_OBJECT (src, "listening on port %d", bound_port);
438
439   g_atomic_int_set (&src->current_port, bound_port);
440   g_object_notify (G_OBJECT (src), "current-port");
441
442   return TRUE;
443
444   /* ERRORS */
445 no_socket:
446   {
447     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
448         ("Failed to create socket: %s", err->message));
449     g_clear_error (&err);
450     g_object_unref (saddr);
451     return FALSE;
452   }
453 name_resolve:
454   {
455     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
456       GST_DEBUG_OBJECT (src, "Cancelled name resolval");
457     } else {
458       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
459           ("Failed to resolve host '%s': %s", src->host, err->message));
460     }
461     g_clear_error (&err);
462     g_object_unref (resolver);
463     return FALSE;
464   }
465 bind_failed:
466   {
467     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
468       GST_DEBUG_OBJECT (src, "Cancelled binding");
469     } else {
470       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
471           ("Failed to bind on host '%s:%d': %s", src->host, src->server_port,
472               err->message));
473     }
474     g_clear_error (&err);
475     g_object_unref (saddr);
476     gst_tcp_server_src_stop (GST_BASE_SRC (src));
477     return FALSE;
478   }
479 listen_failed:
480   {
481     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
482       GST_DEBUG_OBJECT (src, "Cancelled listening");
483     } else {
484       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
485           ("Failed to listen on host '%s:%d': %s", src->host, src->server_port,
486               err->message));
487     }
488     g_clear_error (&err);
489     gst_tcp_server_src_stop (GST_BASE_SRC (src));
490     return FALSE;
491   }
492 }
493
494 static gboolean
495 gst_tcp_server_src_stop (GstBaseSrc * bsrc)
496 {
497   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
498   GError *err = NULL;
499
500   if (src->client_socket) {
501     GST_DEBUG_OBJECT (src, "closing socket");
502
503     if (!g_socket_close (src->client_socket, &err)) {
504       GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
505       g_clear_error (&err);
506     }
507     g_object_unref (src->client_socket);
508     src->client_socket = NULL;
509   }
510
511   if (src->server_socket) {
512     GST_DEBUG_OBJECT (src, "closing socket");
513
514     if (!g_socket_close (src->server_socket, &err)) {
515       GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
516       g_clear_error (&err);
517     }
518     g_object_unref (src->server_socket);
519     src->server_socket = NULL;
520
521     g_atomic_int_set (&src->current_port, 0);
522     g_object_notify (G_OBJECT (src), "current-port");
523   }
524
525   GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
526
527   return TRUE;
528 }
529
530 /* will be called only between calls to start() and stop() */
531 static gboolean
532 gst_tcp_server_src_unlock (GstBaseSrc * bsrc)
533 {
534   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
535
536   g_cancellable_cancel (src->cancellable);
537
538   return TRUE;
539 }
540
541 static gboolean
542 gst_tcp_server_src_unlock_stop (GstBaseSrc * bsrc)
543 {
544   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
545
546   g_cancellable_reset (src->cancellable);
547
548   return TRUE;
549 }