udpsrc: Ignore G_IO_ERROR_CONNECTION_CLOSED when receiving data
[platform/upstream/gst-plugins-good.git] / gst / udp / gstdynudpsink.c
1 /* GStreamer
2  * Copyright (C) <2005> Philippe Khalaf <burger@speedy.org>
3  * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
4  * Copyright (C) <2006> Joni Valtanen <joni.valtanen@movial.fi>
5  * Copyright (C) <2012> Collabora Ltd.
6  *   Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23
24 #ifdef HAVE_CONFIG_H
25 #include "config.h"
26 #endif
27 #include "gstdynudpsink.h"
28
29 #include <gst/net/gstnetaddressmeta.h>
30
31 GST_DEBUG_CATEGORY_STATIC (dynudpsink_debug);
32 #define GST_CAT_DEFAULT (dynudpsink_debug)
33
34 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
35     GST_PAD_SINK,
36     GST_PAD_ALWAYS,
37     GST_STATIC_CAPS_ANY);
38
39 /* DynUDPSink signals and args */
40 enum
41 {
42   /* methods */
43   SIGNAL_GET_STATS,
44
45   /* signals */
46
47   /* FILL ME */
48   LAST_SIGNAL
49 };
50
51 #define UDP_DEFAULT_SOCKET              NULL
52 #define UDP_DEFAULT_CLOSE_SOCKET        TRUE
53 #define UDP_DEFAULT_BIND_ADDRESS        NULL
54 #define UDP_DEFAULT_BIND_PORT           0
55
56 enum
57 {
58   PROP_0,
59   PROP_SOCKET,
60   PROP_SOCKET_V6,
61   PROP_CLOSE_SOCKET,
62   PROP_BIND_ADDRESS,
63   PROP_BIND_PORT
64 };
65
66 static void gst_dynudpsink_finalize (GObject * object);
67
68 static GstFlowReturn gst_dynudpsink_render (GstBaseSink * sink,
69     GstBuffer * buffer);
70 static gboolean gst_dynudpsink_stop (GstBaseSink * bsink);
71 static gboolean gst_dynudpsink_start (GstBaseSink * bsink);
72 static gboolean gst_dynudpsink_unlock (GstBaseSink * bsink);
73 static gboolean gst_dynudpsink_unlock_stop (GstBaseSink * bsink);
74
75 static void gst_dynudpsink_set_property (GObject * object, guint prop_id,
76     const GValue * value, GParamSpec * pspec);
77 static void gst_dynudpsink_get_property (GObject * object, guint prop_id,
78     GValue * value, GParamSpec * pspec);
79 static GstStructure *gst_dynudpsink_get_stats (GstDynUDPSink * sink,
80     const gchar * host, gint port);
81
82 static guint gst_dynudpsink_signals[LAST_SIGNAL] = { 0 };
83
84 #define gst_dynudpsink_parent_class parent_class
85 G_DEFINE_TYPE (GstDynUDPSink, gst_dynudpsink, GST_TYPE_BASE_SINK);
86
87 static void
88 gst_dynudpsink_class_init (GstDynUDPSinkClass * klass)
89 {
90   GObjectClass *gobject_class;
91   GstElementClass *gstelement_class;
92   GstBaseSinkClass *gstbasesink_class;
93
94   gobject_class = (GObjectClass *) klass;
95   gstelement_class = (GstElementClass *) klass;
96   gstbasesink_class = (GstBaseSinkClass *) klass;
97
98   parent_class = g_type_class_peek_parent (klass);
99
100   gobject_class->set_property = gst_dynudpsink_set_property;
101   gobject_class->get_property = gst_dynudpsink_get_property;
102   gobject_class->finalize = gst_dynudpsink_finalize;
103
104   gst_dynudpsink_signals[SIGNAL_GET_STATS] =
105       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
106       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
107       G_STRUCT_OFFSET (GstDynUDPSinkClass, get_stats),
108       NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_STRUCTURE, 2,
109       G_TYPE_STRING, G_TYPE_INT);
110
111   g_object_class_install_property (gobject_class, PROP_SOCKET,
112       g_param_spec_object ("socket", "Socket",
113           "Socket to use for UDP sending. (NULL == allocate)",
114           G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
115   g_object_class_install_property (gobject_class, PROP_SOCKET_V6,
116       g_param_spec_object ("socket-v6", "Socket IPv6",
117           "Socket to use for UDPv6 sending. (NULL == allocate)",
118           G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
119   g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
120       g_param_spec_boolean ("close-socket", "Close socket",
121           "Close socket if passed as property on state change",
122           UDP_DEFAULT_CLOSE_SOCKET,
123           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
124   g_object_class_install_property (gobject_class, PROP_BIND_ADDRESS,
125       g_param_spec_string ("bind-address", "Bind Address",
126           "Address to bind the socket to", UDP_DEFAULT_BIND_ADDRESS,
127           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
128   g_object_class_install_property (gobject_class, PROP_BIND_PORT,
129       g_param_spec_int ("bind-port", "Bind Port",
130           "Port to bind the socket to", 0, G_MAXUINT16,
131           UDP_DEFAULT_BIND_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
132
133   gst_element_class_add_pad_template (gstelement_class,
134       gst_static_pad_template_get (&sink_template));
135
136   gst_element_class_set_static_metadata (gstelement_class, "UDP packet sender",
137       "Sink/Network",
138       "Send data over the network via UDP with packet destinations picked up "
139       "dynamically from meta on the buffers passed",
140       "Philippe Khalaf <burger@speedy.org>");
141
142   gstbasesink_class->render = gst_dynudpsink_render;
143   gstbasesink_class->start = gst_dynudpsink_start;
144   gstbasesink_class->stop = gst_dynudpsink_stop;
145   gstbasesink_class->unlock = gst_dynudpsink_unlock;
146   gstbasesink_class->unlock_stop = gst_dynudpsink_unlock_stop;
147
148   klass->get_stats = gst_dynudpsink_get_stats;
149
150   GST_DEBUG_CATEGORY_INIT (dynudpsink_debug, "dynudpsink", 0, "UDP sink");
151 }
152
153 static void
154 gst_dynudpsink_init (GstDynUDPSink * sink)
155 {
156   sink->socket = UDP_DEFAULT_SOCKET;
157   sink->socket_v6 = UDP_DEFAULT_SOCKET;
158   sink->close_socket = UDP_DEFAULT_CLOSE_SOCKET;
159   sink->external_socket = FALSE;
160   sink->bind_address = UDP_DEFAULT_BIND_ADDRESS;
161   sink->bind_port = UDP_DEFAULT_BIND_PORT;
162
163   sink->used_socket = NULL;
164   sink->used_socket_v6 = NULL;
165 }
166
167 static void
168 gst_dynudpsink_finalize (GObject * object)
169 {
170   GstDynUDPSink *sink;
171
172   sink = GST_DYNUDPSINK (object);
173
174   if (sink->socket)
175     g_object_unref (sink->socket);
176   sink->socket = NULL;
177
178   if (sink->socket_v6)
179     g_object_unref (sink->socket_v6);
180   sink->socket_v6 = NULL;
181
182   if (sink->used_socket)
183     g_object_unref (sink->used_socket);
184   sink->used_socket = NULL;
185
186   if (sink->used_socket_v6)
187     g_object_unref (sink->used_socket_v6);
188   sink->used_socket_v6 = NULL;
189
190   g_free (sink->bind_address);
191   sink->bind_address = NULL;
192
193   G_OBJECT_CLASS (parent_class)->finalize (object);
194 }
195
196 static GstFlowReturn
197 gst_dynudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
198 {
199   GstDynUDPSink *sink;
200   gssize ret;
201   GstMapInfo map;
202   GstNetAddressMeta *meta;
203   GSocketAddress *addr;
204   GError *err = NULL;
205   GSocketFamily family;
206   GSocket *socket;
207
208   meta = gst_buffer_get_net_address_meta (buffer);
209
210   if (meta == NULL) {
211     GST_DEBUG ("Received buffer without GstNetAddressMeta, skipping");
212     return GST_FLOW_OK;
213   }
214
215   sink = GST_DYNUDPSINK (bsink);
216
217   /* let's get the address from the metadata */
218   addr = meta->addr;
219
220   family = g_socket_address_get_family (addr);
221   if (family == G_SOCKET_FAMILY_IPV6 && !sink->used_socket_v6)
222     goto invalid_family;
223
224   gst_buffer_map (buffer, &map, GST_MAP_READ);
225
226   GST_DEBUG ("about to send %" G_GSIZE_FORMAT " bytes", map.size);
227
228 #ifndef GST_DISABLE_GST_DEBUG
229   {
230     gchar *host;
231
232     host =
233         g_inet_address_to_string (g_inet_socket_address_get_address
234         (G_INET_SOCKET_ADDRESS (addr)));
235     GST_DEBUG ("sending %" G_GSIZE_FORMAT " bytes to client %s port %d",
236         map.size, host,
237         g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr)));
238     g_free (host);
239   }
240 #endif
241
242   /* Select socket to send from for this address */
243   if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
244     socket = sink->used_socket_v6;
245   else
246     socket = sink->used_socket;
247
248   ret =
249       g_socket_send_to (socket, addr, (gchar *) map.data, map.size,
250       sink->cancellable, &err);
251   gst_buffer_unmap (buffer, &map);
252
253   if (ret < 0)
254     goto send_error;
255
256   GST_DEBUG ("sent %" G_GSSIZE_FORMAT " bytes", ret);
257
258   return GST_FLOW_OK;
259
260 send_error:
261   {
262     GstFlowReturn flow_ret;
263
264     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
265       GST_DEBUG_OBJECT (sink, "send cancelled");
266       flow_ret = GST_FLOW_FLUSHING;
267     } else {
268       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
269           ("send error: %s", err->message));
270       flow_ret = GST_FLOW_ERROR;
271     }
272     g_clear_error (&err);
273     return flow_ret;
274   }
275 invalid_family:
276   {
277     GST_DEBUG ("invalid address family (got %d)", family);
278     return GST_FLOW_ERROR;
279   }
280 }
281
282 static void
283 gst_dynudpsink_set_property (GObject * object, guint prop_id,
284     const GValue * value, GParamSpec * pspec)
285 {
286   GstDynUDPSink *udpsink;
287
288   udpsink = GST_DYNUDPSINK (object);
289
290   switch (prop_id) {
291     case PROP_SOCKET:
292       if (udpsink->socket != NULL && udpsink->socket != udpsink->used_socket &&
293           udpsink->close_socket) {
294         GError *err = NULL;
295
296         if (!g_socket_close (udpsink->socket, &err)) {
297           GST_ERROR ("failed to close socket %p: %s", udpsink->socket,
298               err->message);
299           g_clear_error (&err);
300         }
301       }
302       if (udpsink->socket)
303         g_object_unref (udpsink->socket);
304       udpsink->socket = g_value_dup_object (value);
305       GST_DEBUG ("setting socket to %p", udpsink->socket);
306       break;
307     case PROP_SOCKET_V6:
308       if (udpsink->socket_v6 != NULL
309           && udpsink->socket_v6 != udpsink->used_socket_v6
310           && udpsink->close_socket) {
311         GError *err = NULL;
312
313         if (!g_socket_close (udpsink->socket_v6, &err)) {
314           GST_ERROR ("failed to close socket %p: %s", udpsink->socket_v6,
315               err->message);
316           g_clear_error (&err);
317         }
318       }
319       if (udpsink->socket_v6)
320         g_object_unref (udpsink->socket_v6);
321       udpsink->socket_v6 = g_value_dup_object (value);
322       GST_DEBUG ("setting socket v6 to %p", udpsink->socket_v6);
323       break;
324     case PROP_CLOSE_SOCKET:
325       udpsink->close_socket = g_value_get_boolean (value);
326       break;
327     case PROP_BIND_ADDRESS:
328       g_free (udpsink->bind_address);
329       udpsink->bind_address = g_value_dup_string (value);
330       break;
331     case PROP_BIND_PORT:
332       udpsink->bind_port = g_value_get_int (value);
333       break;
334     default:
335       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
336       break;
337   }
338 }
339
340 static void
341 gst_dynudpsink_get_property (GObject * object, guint prop_id, GValue * value,
342     GParamSpec * pspec)
343 {
344   GstDynUDPSink *udpsink;
345
346   udpsink = GST_DYNUDPSINK (object);
347
348   switch (prop_id) {
349     case PROP_SOCKET:
350       g_value_set_object (value, udpsink->socket);
351       break;
352     case PROP_SOCKET_V6:
353       g_value_set_object (value, udpsink->socket_v6);
354       break;
355     case PROP_CLOSE_SOCKET:
356       g_value_set_boolean (value, udpsink->close_socket);
357       break;
358     case PROP_BIND_ADDRESS:
359       g_value_set_string (value, udpsink->bind_address);
360       break;
361     case PROP_BIND_PORT:
362       g_value_set_int (value, udpsink->bind_port);
363       break;
364     default:
365       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
366       break;
367   }
368 }
369
370 static void
371 gst_dynudpsink_create_cancellable (GstDynUDPSink * sink)
372 {
373   GPollFD pollfd;
374
375   sink->cancellable = g_cancellable_new ();
376   sink->made_cancel_fd = g_cancellable_make_pollfd (sink->cancellable, &pollfd);
377 }
378
379 static void
380 gst_dynudpsink_free_cancellable (GstDynUDPSink * sink)
381 {
382   if (sink->made_cancel_fd) {
383     g_cancellable_release_fd (sink->cancellable);
384     sink->made_cancel_fd = FALSE;
385   }
386   g_object_unref (sink->cancellable);
387   sink->cancellable = NULL;
388 }
389
390 /* create a socket for sending to remote machine */
391 static gboolean
392 gst_dynudpsink_start (GstBaseSink * bsink)
393 {
394   GstDynUDPSink *udpsink;
395   GError *err = NULL;
396
397   udpsink = GST_DYNUDPSINK (bsink);
398
399   gst_dynudpsink_create_cancellable (udpsink);
400
401   udpsink->external_socket = FALSE;
402
403   if (udpsink->socket) {
404     if (g_socket_get_family (udpsink->socket) == G_SOCKET_FAMILY_IPV6) {
405       udpsink->used_socket_v6 = G_SOCKET (g_object_ref (udpsink->socket));
406       udpsink->external_socket = TRUE;
407     } else {
408       udpsink->used_socket = G_SOCKET (g_object_ref (udpsink->socket));
409       udpsink->external_socket = TRUE;
410     }
411   }
412
413   if (udpsink->socket_v6) {
414     g_return_val_if_fail (g_socket_get_family (udpsink->socket) !=
415         G_SOCKET_FAMILY_IPV6, FALSE);
416
417     if (udpsink->used_socket_v6
418         && udpsink->used_socket_v6 != udpsink->socket_v6) {
419       GST_ERROR_OBJECT (udpsink,
420           "Provided different IPv6 sockets in socket and socket-v6 properties");
421       return FALSE;
422     }
423
424     udpsink->used_socket_v6 = G_SOCKET (g_object_ref (udpsink->socket_v6));
425     udpsink->external_socket = TRUE;
426   }
427
428   if (!udpsink->used_socket && !udpsink->used_socket_v6) {
429     GSocketAddress *bind_addr;
430     GInetAddress *bind_iaddr;
431
432     if (udpsink->bind_address) {
433       GSocketFamily family;
434
435       bind_iaddr = g_inet_address_new_from_string (udpsink->bind_address);
436       if (!bind_iaddr) {
437         GList *results;
438         GResolver *resolver;
439
440         resolver = g_resolver_get_default ();
441         results =
442             g_resolver_lookup_by_name (resolver, udpsink->bind_address,
443             udpsink->cancellable, &err);
444         if (!results) {
445           g_object_unref (resolver);
446           goto name_resolve;
447         }
448         bind_iaddr = G_INET_ADDRESS (g_object_ref (results->data));
449         g_resolver_free_addresses (results);
450         g_object_unref (resolver);
451       }
452
453       bind_addr = g_inet_socket_address_new (bind_iaddr, udpsink->bind_port);
454       g_object_unref (bind_iaddr);
455       family = g_socket_address_get_family (G_SOCKET_ADDRESS (bind_addr));
456
457       if ((udpsink->used_socket =
458               g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
459                   G_SOCKET_PROTOCOL_UDP, &err)) == NULL) {
460         g_object_unref (bind_addr);
461         goto no_socket;
462       }
463
464       g_socket_bind (udpsink->used_socket, bind_addr, TRUE, &err);
465       if (err != NULL)
466         goto bind_error;
467     } else {
468       /* create sender sockets if none available */
469       if ((udpsink->used_socket = g_socket_new (G_SOCKET_FAMILY_IPV4,
470                   G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
471         goto no_socket;
472
473       bind_iaddr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
474       bind_addr = g_inet_socket_address_new (bind_iaddr, 0);
475       g_socket_bind (udpsink->used_socket, bind_addr, TRUE, &err);
476       g_object_unref (bind_addr);
477       g_object_unref (bind_iaddr);
478       if (err != NULL)
479         goto bind_error;
480
481       if ((udpsink->used_socket_v6 = g_socket_new (G_SOCKET_FAMILY_IPV6,
482                   G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP,
483                   &err)) == NULL) {
484         GST_INFO_OBJECT (udpsink, "Failed to create IPv6 socket: %s",
485             err->message);
486         g_clear_error (&err);
487       } else {
488         bind_iaddr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV6);
489         bind_addr = g_inet_socket_address_new (bind_iaddr, 0);
490         g_socket_bind (udpsink->used_socket_v6, bind_addr, TRUE, &err);
491         g_object_unref (bind_addr);
492         g_object_unref (bind_iaddr);
493         if (err != NULL)
494           goto bind_error;
495       }
496     }
497   }
498
499   if (udpsink->used_socket)
500     g_socket_set_broadcast (udpsink->used_socket, TRUE);
501   if (udpsink->used_socket_v6)
502     g_socket_set_broadcast (udpsink->used_socket_v6, TRUE);
503
504   return TRUE;
505
506   /* ERRORS */
507 no_socket:
508   {
509     GST_ERROR_OBJECT (udpsink, "Failed to create IPv4 socket: %s",
510         err->message);
511     g_clear_error (&err);
512     return FALSE;
513   }
514 bind_error:
515   {
516     GST_ELEMENT_ERROR (udpsink, RESOURCE, FAILED, (NULL),
517         ("Failed to bind socket: %s", err->message));
518     g_clear_error (&err);
519     return FALSE;
520   }
521 name_resolve:
522   {
523     GST_ELEMENT_ERROR (udpsink, RESOURCE, FAILED, (NULL),
524         ("Failed to resolve bind address %s: %s", udpsink->bind_address,
525             err->message));
526     g_clear_error (&err);
527     return FALSE;
528   }
529 }
530
531 static GstStructure *
532 gst_dynudpsink_get_stats (GstDynUDPSink * sink, const gchar * host, gint port)
533 {
534   return NULL;
535 }
536
537 static gboolean
538 gst_dynudpsink_stop (GstBaseSink * bsink)
539 {
540   GstDynUDPSink *udpsink;
541
542   udpsink = GST_DYNUDPSINK (bsink);
543
544   if (udpsink->used_socket) {
545     if (udpsink->close_socket || !udpsink->external_socket) {
546       GError *err = NULL;
547
548       if (!g_socket_close (udpsink->used_socket, &err)) {
549         GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
550         g_clear_error (&err);
551       }
552     }
553
554     g_object_unref (udpsink->used_socket);
555     udpsink->used_socket = NULL;
556   }
557
558   if (udpsink->used_socket_v6) {
559     if (udpsink->close_socket || !udpsink->external_socket) {
560       GError *err = NULL;
561
562       if (!g_socket_close (udpsink->used_socket_v6, &err)) {
563         GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
564         g_clear_error (&err);
565       }
566     }
567
568     g_object_unref (udpsink->used_socket_v6);
569     udpsink->used_socket_v6 = NULL;
570   }
571
572   gst_dynudpsink_free_cancellable (udpsink);
573
574   return TRUE;
575 }
576
577 static gboolean
578 gst_dynudpsink_unlock (GstBaseSink * bsink)
579 {
580   GstDynUDPSink *udpsink;
581
582   udpsink = GST_DYNUDPSINK (bsink);
583
584   g_cancellable_cancel (udpsink->cancellable);
585
586   return TRUE;
587 }
588
589 static gboolean
590 gst_dynudpsink_unlock_stop (GstBaseSink * bsink)
591 {
592   GstDynUDPSink *udpsink;
593
594   udpsink = GST_DYNUDPSINK (bsink);
595
596   gst_dynudpsink_free_cancellable (udpsink);
597   gst_dynudpsink_create_cancellable (udpsink);
598
599   return TRUE;
600 }