980edd4c51cda94f73b0e1450c2916501e32e68c
[platform/upstream/gstreamer.git] / gst / tcp / gsttcpclientsrc.c
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) <2011> Collabora Ltd.
5  *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:element-tcpclientsrc
25  * @title: tcpclientsrc
26  * @see_also: #tcpclientsink
27  *
28  * ## Example launch line (server):
29  * |[
30  * nc -l -p 3000
31  * ]|
32  * ## Example launch line (client):
33  * |[
34  * gst-launch-1.0 tcpclientsrc port=3000 ! fdsink fd=2
35  * ]|
36  *  everything you type in the server is shown on the client.
37  * If you want to detect network failures and/or limit the time your tcp client
38  * keeps waiting for data from server setting a timeout value can be useful.
39  *
40  */
41
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
45
46 #include <gst/gst-i18n-plugin.h>
47 #include "gsttcpelements.h"
48 #include "gsttcpclientsrc.h"
49 #include "gsttcpsrcstats.h"
50 #include "gsttcp.h"
51
52 GST_DEBUG_CATEGORY_STATIC (tcpclientsrc_debug);
53 #define GST_CAT_DEFAULT tcpclientsrc_debug
54
55 #define MAX_READ_SIZE                   4 * 1024
56 #define TCP_DEFAULT_TIMEOUT             0
57
58
59 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
60     GST_PAD_SRC,
61     GST_PAD_ALWAYS,
62     GST_STATIC_CAPS_ANY);
63
64
65 enum
66 {
67   PROP_0,
68   PROP_HOST,
69   PROP_PORT,
70   PROP_TIMEOUT,
71   PROP_STATS,
72 };
73
74 #define gst_tcp_client_src_parent_class parent_class
75 G_DEFINE_TYPE (GstTCPClientSrc, gst_tcp_client_src, GST_TYPE_PUSH_SRC);
76 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (tcpclientsrc, "tcpclientsrc",
77     GST_RANK_NONE, GST_TYPE_TCP_CLIENT_SRC, tcp_element_init (plugin));
78
79 static void gst_tcp_client_src_finalize (GObject * gobject);
80
81 static GstCaps *gst_tcp_client_src_getcaps (GstBaseSrc * psrc,
82     GstCaps * filter);
83
84 static GstFlowReturn gst_tcp_client_src_create (GstPushSrc * psrc,
85     GstBuffer ** outbuf);
86 static gboolean gst_tcp_client_src_stop (GstBaseSrc * bsrc);
87 static gboolean gst_tcp_client_src_start (GstBaseSrc * bsrc);
88 static gboolean gst_tcp_client_src_unlock (GstBaseSrc * bsrc);
89 static gboolean gst_tcp_client_src_unlock_stop (GstBaseSrc * bsrc);
90
91 static void gst_tcp_client_src_set_property (GObject * object, guint prop_id,
92     const GValue * value, GParamSpec * pspec);
93 static void gst_tcp_client_src_get_property (GObject * object, guint prop_id,
94     GValue * value, GParamSpec * pspec);
95 static GstStructure *gst_tcp_client_src_get_stats (GstTCPClientSrc * src);
96
97 static void
98 gst_tcp_client_src_class_init (GstTCPClientSrcClass * klass)
99 {
100   GObjectClass *gobject_class;
101   GstElementClass *gstelement_class;
102   GstBaseSrcClass *gstbasesrc_class;
103   GstPushSrcClass *gstpush_src_class;
104
105   gobject_class = (GObjectClass *) klass;
106   gstelement_class = (GstElementClass *) klass;
107   gstbasesrc_class = (GstBaseSrcClass *) klass;
108   gstpush_src_class = (GstPushSrcClass *) klass;
109
110   gobject_class->set_property = gst_tcp_client_src_set_property;
111   gobject_class->get_property = gst_tcp_client_src_get_property;
112   gobject_class->finalize = gst_tcp_client_src_finalize;
113
114   g_object_class_install_property (gobject_class, PROP_HOST,
115       g_param_spec_string ("host", "Host",
116           "The host IP address to receive packets from", TCP_DEFAULT_HOST,
117           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
118   g_object_class_install_property (gobject_class, PROP_PORT,
119       g_param_spec_int ("port", "Port", "The port to receive packets from", 0,
120           TCP_HIGHEST_PORT, TCP_DEFAULT_PORT,
121           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
122
123   /**
124    * GstTCPClientSrc::timeout;
125    *
126    * Value in seconds to timeout a blocking I/O (0 = No timeout).
127    *
128    * Since: 1.12
129    */
130   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
131       g_param_spec_uint ("timeout", "timeout",
132           "Value in seconds to timeout a blocking I/O. 0 = No timeout. ", 0,
133           G_MAXUINT, TCP_DEFAULT_TIMEOUT,
134           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
135
136   /**
137    * GstTCPClientSrc::stats:
138    *
139    * Sends a GstStructure with statistics. We count bytes-received in a
140    * platform-independent way and the rest via the tcp_info struct, if it's
141    * available. The OS takes care of the TCP layer for us so we can't know it
142    * from here.
143    *
144    * Struct members:
145    *
146    * bytes-received (uint64): Total bytes received (platform-independent)
147    * reordering (uint): Amount of reordering (linux-specific)
148    * unacked (uint): Un-acked packets (linux-specific)
149    * sacked (uint): Selective acked packets (linux-specific)
150    * lost (uint): Lost packets (linux-specific)
151    * retrans (uint): Retransmits (linux-specific)
152    * fackets (uint): Forward acknowledgement (linux-specific)
153    *
154    * Since: 1.18
155    */
156   g_object_class_install_property (gobject_class, PROP_STATS,
157       g_param_spec_boxed ("stats", "Stats", "Retrieve a statistics structure",
158           GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
159
160   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
161
162   gst_element_class_set_static_metadata (gstelement_class,
163       "TCP client source", "Source/Network",
164       "Receive data as a client over the network via TCP",
165       "Thomas Vander Stichele <thomas at apestaart dot org>");
166
167   gstbasesrc_class->get_caps = gst_tcp_client_src_getcaps;
168   gstbasesrc_class->start = gst_tcp_client_src_start;
169   gstbasesrc_class->stop = gst_tcp_client_src_stop;
170   gstbasesrc_class->unlock = gst_tcp_client_src_unlock;
171   gstbasesrc_class->unlock_stop = gst_tcp_client_src_unlock_stop;
172
173   gstpush_src_class->create = gst_tcp_client_src_create;
174
175   GST_DEBUG_CATEGORY_INIT (tcpclientsrc_debug, "tcpclientsrc", 0,
176       "TCP Client Source");
177 }
178
179 static void
180 gst_tcp_client_src_init (GstTCPClientSrc * this)
181 {
182   this->port = TCP_DEFAULT_PORT;
183   this->host = g_strdup (TCP_DEFAULT_HOST);
184   this->timeout = TCP_DEFAULT_TIMEOUT;
185   this->socket = NULL;
186   this->cancellable = g_cancellable_new ();
187
188   GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SRC_OPEN);
189 }
190
191 static void
192 gst_tcp_client_src_finalize (GObject * gobject)
193 {
194   GstTCPClientSrc *this = GST_TCP_CLIENT_SRC (gobject);
195
196   if (this->cancellable)
197     g_object_unref (this->cancellable);
198   this->cancellable = NULL;
199   if (this->socket)
200     g_object_unref (this->socket);
201   this->socket = NULL;
202   g_free (this->host);
203   this->host = NULL;
204   gst_clear_structure (&this->stats);
205
206   G_OBJECT_CLASS (parent_class)->finalize (gobject);
207 }
208
209 static GstCaps *
210 gst_tcp_client_src_getcaps (GstBaseSrc * bsrc, GstCaps * filter)
211 {
212   GstTCPClientSrc *src;
213   GstCaps *caps = NULL;
214
215   src = GST_TCP_CLIENT_SRC (bsrc);
216
217   caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
218
219   GST_DEBUG_OBJECT (src, "returning caps %" GST_PTR_FORMAT, caps);
220   g_assert (GST_IS_CAPS (caps));
221   return caps;
222 }
223
224 static GstFlowReturn
225 gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
226 {
227   GstTCPClientSrc *src;
228   GstFlowReturn ret = GST_FLOW_OK;
229   gssize rret;
230   GError *err = NULL;
231   GstMapInfo map;
232   gssize avail, read;
233
234   src = GST_TCP_CLIENT_SRC (psrc);
235
236   if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_CLIENT_SRC_OPEN))
237     goto wrong_state;
238
239   GST_LOG_OBJECT (src, "asked for a buffer");
240
241   /* read the buffer header */
242   avail = g_socket_get_available_bytes (src->socket);
243   if (avail < 0) {
244     goto get_available_error;
245   } else if (avail == 0) {
246     GIOCondition condition;
247
248     if (!g_socket_condition_wait (src->socket,
249             G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
250       goto select_error;
251
252     condition =
253         g_socket_condition_check (src->socket,
254         G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
255
256     if ((condition & G_IO_ERR)) {
257       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
258           ("Socket in error state"));
259       *outbuf = NULL;
260       ret = GST_FLOW_ERROR;
261       goto done;
262     } else if ((condition & G_IO_HUP)) {
263       GST_DEBUG_OBJECT (src, "Connection closed");
264       *outbuf = NULL;
265       ret = GST_FLOW_EOS;
266       goto done;
267     }
268     avail = g_socket_get_available_bytes (src->socket);
269     if (avail < 0)
270       goto get_available_error;
271   }
272
273   if (avail > 0) {
274     read = MIN (avail, MAX_READ_SIZE);
275     *outbuf = gst_buffer_new_and_alloc (read);
276     gst_buffer_map (*outbuf, &map, GST_MAP_READWRITE);
277     rret =
278         g_socket_receive (src->socket, (gchar *) map.data, read,
279         src->cancellable, &err);
280   } else {
281     /* Connection closed */
282     *outbuf = NULL;
283     read = 0;
284     rret = 0;
285   }
286
287   if (rret == 0) {
288     GST_DEBUG_OBJECT (src, "Connection closed");
289     ret = GST_FLOW_EOS;
290     if (*outbuf) {
291       gst_buffer_unmap (*outbuf, &map);
292       gst_buffer_unref (*outbuf);
293     }
294     *outbuf = NULL;
295   } else if (rret < 0) {
296     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
297       ret = GST_FLOW_FLUSHING;
298       GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
299     } else {
300       ret = GST_FLOW_ERROR;
301       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
302           ("Failed to read from socket: %s", err->message));
303     }
304     gst_buffer_unmap (*outbuf, &map);
305     gst_buffer_unref (*outbuf);
306     *outbuf = NULL;
307   } else {
308     ret = GST_FLOW_OK;
309     gst_buffer_unmap (*outbuf, &map);
310     gst_buffer_resize (*outbuf, 0, rret);
311     src->bytes_received += read;
312
313     GST_LOG_OBJECT (src,
314         "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
315         GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
316         ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
317         gst_buffer_get_size (*outbuf),
318         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
319         GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
320         GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
321   }
322   g_clear_error (&err);
323
324 done:
325   return ret;
326
327 select_error:
328   {
329     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
330       GST_DEBUG_OBJECT (src, "Cancelled");
331       ret = GST_FLOW_FLUSHING;
332     } else {
333       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
334           ("Select failed: %s", err->message));
335       ret = GST_FLOW_ERROR;
336     }
337     g_clear_error (&err);
338     return ret;
339   }
340 get_available_error:
341   {
342     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
343         ("Failed to get available bytes from socket"));
344     return GST_FLOW_ERROR;
345   }
346 wrong_state:
347   {
348     GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
349     return GST_FLOW_FLUSHING;
350   }
351 }
352
353 static void
354 gst_tcp_client_src_set_property (GObject * object, guint prop_id,
355     const GValue * value, GParamSpec * pspec)
356 {
357   GstTCPClientSrc *tcpclientsrc = GST_TCP_CLIENT_SRC (object);
358
359   switch (prop_id) {
360     case PROP_HOST:
361       if (!g_value_get_string (value)) {
362         g_warning ("host property cannot be NULL");
363         break;
364       }
365       g_free (tcpclientsrc->host);
366       tcpclientsrc->host = g_value_dup_string (value);
367       break;
368     case PROP_PORT:
369       tcpclientsrc->port = g_value_get_int (value);
370       break;
371     case PROP_TIMEOUT:
372       tcpclientsrc->timeout = g_value_get_uint (value);
373       break;
374
375     default:
376       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
377       break;
378   }
379 }
380
381 static void
382 gst_tcp_client_src_get_property (GObject * object, guint prop_id,
383     GValue * value, GParamSpec * pspec)
384 {
385   GstTCPClientSrc *tcpclientsrc = GST_TCP_CLIENT_SRC (object);
386
387   switch (prop_id) {
388     case PROP_HOST:
389       g_value_set_string (value, tcpclientsrc->host);
390       break;
391     case PROP_PORT:
392       g_value_set_int (value, tcpclientsrc->port);
393       break;
394     case PROP_TIMEOUT:
395       g_value_set_uint (value, tcpclientsrc->timeout);
396       break;
397     case PROP_STATS:
398       g_value_take_boxed (value, gst_tcp_client_src_get_stats (tcpclientsrc));
399       break;
400     default:
401       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
402       break;
403   }
404 }
405
406 /* create a socket for connecting to remote server */
407 static gboolean
408 gst_tcp_client_src_start (GstBaseSrc * bsrc)
409 {
410   GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
411   GError *err = NULL;
412   GInetAddress *addr;
413   GSocketAddress *saddr;
414   GResolver *resolver;
415
416   src->bytes_received = 0;
417   gst_clear_structure (&src->stats);
418
419   /* look up name if we need to */
420   addr = g_inet_address_new_from_string (src->host);
421   if (!addr) {
422     GList *results;
423
424     resolver = g_resolver_get_default ();
425
426     results =
427         g_resolver_lookup_by_name (resolver, src->host, src->cancellable, &err);
428     if (!results)
429       goto name_resolve;
430     addr = G_INET_ADDRESS (g_object_ref (results->data));
431
432     g_resolver_free_addresses (results);
433     g_object_unref (resolver);
434   }
435 #ifndef GST_DISABLE_GST_DEBUG
436   {
437     gchar *ip = g_inet_address_to_string (addr);
438
439     GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
440     g_free (ip);
441   }
442 #endif
443
444   saddr = g_inet_socket_address_new (addr, src->port);
445   g_object_unref (addr);
446
447   /* create receiving client socket */
448   GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d",
449       src->host, src->port);
450
451   src->socket =
452       g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
453       G_SOCKET_PROTOCOL_TCP, &err);
454   if (!src->socket)
455     goto no_socket;
456
457   g_socket_set_timeout (src->socket, src->timeout);
458
459   GST_DEBUG_OBJECT (src, "opened receiving client socket");
460   GST_OBJECT_FLAG_SET (src, GST_TCP_CLIENT_SRC_OPEN);
461
462   /* connect to server */
463   if (!g_socket_connect (src->socket, saddr, src->cancellable, &err))
464     goto connect_failed;
465
466   g_object_unref (saddr);
467
468   return TRUE;
469
470 no_socket:
471   {
472     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
473         ("Failed to create socket: %s", err->message));
474     g_clear_error (&err);
475     g_object_unref (saddr);
476     return FALSE;
477   }
478 name_resolve:
479   {
480     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
481       GST_DEBUG_OBJECT (src, "Cancelled name resolval");
482     } else {
483       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
484           ("Failed to resolve host '%s': %s", src->host, err->message));
485     }
486     g_clear_error (&err);
487     g_object_unref (resolver);
488     return FALSE;
489   }
490 connect_failed:
491   {
492     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
493       GST_DEBUG_OBJECT (src, "Cancelled connecting");
494     } else {
495       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
496           ("Failed to connect to host '%s:%d': %s", src->host, src->port,
497               err->message));
498     }
499     g_clear_error (&err);
500     g_object_unref (saddr);
501     gst_tcp_client_src_stop (GST_BASE_SRC (src));
502     return FALSE;
503   }
504 }
505
506 /* close the socket and associated resources
507  * unset OPEN flag
508  * used both to recover from errors and go to NULL state */
509 static gboolean
510 gst_tcp_client_src_stop (GstBaseSrc * bsrc)
511 {
512   GstTCPClientSrc *src;
513   GError *err = NULL;
514
515   src = GST_TCP_CLIENT_SRC (bsrc);
516
517   if (src->socket) {
518     GST_DEBUG_OBJECT (src, "closing socket");
519
520     src->stats = gst_tcp_client_src_get_stats (src);
521
522     if (!g_socket_close (src->socket, &err)) {
523       GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
524       g_clear_error (&err);
525     }
526     g_object_unref (src->socket);
527     src->socket = NULL;
528   }
529
530   GST_OBJECT_FLAG_UNSET (src, GST_TCP_CLIENT_SRC_OPEN);
531
532   return TRUE;
533 }
534
535 /* will be called only between calls to start() and stop() */
536 static gboolean
537 gst_tcp_client_src_unlock (GstBaseSrc * bsrc)
538 {
539   GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
540
541   GST_DEBUG_OBJECT (src, "set to flushing");
542   g_cancellable_cancel (src->cancellable);
543
544   return TRUE;
545 }
546
547 /* will be called only between calls to start() and stop() */
548 static gboolean
549 gst_tcp_client_src_unlock_stop (GstBaseSrc * bsrc)
550 {
551   GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
552
553   GST_DEBUG_OBJECT (src, "unset flushing");
554   g_object_unref (src->cancellable);
555   src->cancellable = g_cancellable_new ();
556
557   return TRUE;
558 }
559
560 static GstStructure *
561 gst_tcp_client_src_get_stats (GstTCPClientSrc * src)
562 {
563   GstStructure *s;
564
565   /* we can't get the values post stop so just return the saved ones */
566   if (src->stats)
567     return gst_structure_copy (src->stats);
568
569   s = gst_structure_new ("GstTCPClientSrcStats",
570       "bytes-received", G_TYPE_UINT64, src->bytes_received, NULL);
571
572   gst_tcp_stats_from_socket (s, src->socket);
573
574   return s;
575 }