dynudpsink: return FLUSHING when sendto got canceled, not an error
[platform/upstream/gst-plugins-good.git] / gst / udp / gstdynudpsink.c
1 /* GStreamer
2  * Copyright (C) <2005> Philippe Khalaf <burger@speedy.org>
3  * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
4  * Copyright (C) <2006> Joni Valtanen <joni.valtanen@movial.fi>
5  * Copyright (C) <2012> Collabora Ltd.
6  *   Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23
24 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
25  * with newer GLib versions (>= 2.31.0) */
26 #define GLIB_DISABLE_DEPRECATION_WARNINGS
27
28 #ifdef HAVE_CONFIG_H
29 #include "config.h"
30 #endif
31 #include "gstdynudpsink.h"
32
33 #include <gst/net/gstnetaddressmeta.h>
34
35 GST_DEBUG_CATEGORY_STATIC (dynudpsink_debug);
36 #define GST_CAT_DEFAULT (dynudpsink_debug)
37
38 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
39     GST_PAD_SINK,
40     GST_PAD_ALWAYS,
41     GST_STATIC_CAPS_ANY);
42
43 /* DynUDPSink signals and args */
44 enum
45 {
46   /* methods */
47   SIGNAL_GET_STATS,
48
49   /* signals */
50
51   /* FILL ME */
52   LAST_SIGNAL
53 };
54
55 #define UDP_DEFAULT_SOCKET              NULL
56 #define UDP_DEFAULT_CLOSE_SOCKET        TRUE
57 #define UDP_DEFAULT_BIND_ADDRESS        NULL
58 #define UDP_DEFAULT_BIND_PORT           0
59
60 enum
61 {
62   PROP_0,
63   PROP_SOCKET,
64   PROP_SOCKET_V6,
65   PROP_CLOSE_SOCKET,
66   PROP_BIND_ADDRESS,
67   PROP_BIND_PORT
68 };
69
70 static void gst_dynudpsink_finalize (GObject * object);
71
72 static GstFlowReturn gst_dynudpsink_render (GstBaseSink * sink,
73     GstBuffer * buffer);
74 static gboolean gst_dynudpsink_stop (GstBaseSink * bsink);
75 static gboolean gst_dynudpsink_start (GstBaseSink * bsink);
76 static gboolean gst_dynudpsink_unlock (GstBaseSink * bsink);
77 static gboolean gst_dynudpsink_unlock_stop (GstBaseSink * bsink);
78
79 static void gst_dynudpsink_set_property (GObject * object, guint prop_id,
80     const GValue * value, GParamSpec * pspec);
81 static void gst_dynudpsink_get_property (GObject * object, guint prop_id,
82     GValue * value, GParamSpec * pspec);
83 static GstStructure *gst_dynudpsink_get_stats (GstDynUDPSink * sink,
84     const gchar * host, gint port);
85
86 static guint gst_dynudpsink_signals[LAST_SIGNAL] = { 0 };
87
88 #define gst_dynudpsink_parent_class parent_class
89 G_DEFINE_TYPE (GstDynUDPSink, gst_dynudpsink, GST_TYPE_BASE_SINK);
90
91 static void
92 gst_dynudpsink_class_init (GstDynUDPSinkClass * klass)
93 {
94   GObjectClass *gobject_class;
95   GstElementClass *gstelement_class;
96   GstBaseSinkClass *gstbasesink_class;
97
98   gobject_class = (GObjectClass *) klass;
99   gstelement_class = (GstElementClass *) klass;
100   gstbasesink_class = (GstBaseSinkClass *) klass;
101
102   parent_class = g_type_class_peek_parent (klass);
103
104   gobject_class->set_property = gst_dynudpsink_set_property;
105   gobject_class->get_property = gst_dynudpsink_get_property;
106   gobject_class->finalize = gst_dynudpsink_finalize;
107
108   gst_dynudpsink_signals[SIGNAL_GET_STATS] =
109       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
110       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
111       G_STRUCT_OFFSET (GstDynUDPSinkClass, get_stats),
112       NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_STRUCTURE, 2,
113       G_TYPE_STRING, G_TYPE_INT);
114
115   g_object_class_install_property (gobject_class, PROP_SOCKET,
116       g_param_spec_object ("socket", "Socket",
117           "Socket to use for UDP sending. (NULL == allocate)",
118           G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
119   g_object_class_install_property (gobject_class, PROP_SOCKET_V6,
120       g_param_spec_object ("socket-v6", "Socket IPv6",
121           "Socket to use for UDPv6 sending. (NULL == allocate)",
122           G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
123   g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
124       g_param_spec_boolean ("close-socket", "Close socket",
125           "Close socket if passed as property on state change",
126           UDP_DEFAULT_CLOSE_SOCKET,
127           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
128   g_object_class_install_property (gobject_class, PROP_BIND_ADDRESS,
129       g_param_spec_string ("bind-address", "Bind Address",
130           "Address to bind the socket to", UDP_DEFAULT_BIND_ADDRESS,
131           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
132   g_object_class_install_property (gobject_class, PROP_BIND_PORT,
133       g_param_spec_int ("bind-port", "Bind Port",
134           "Port to bind the socket to", 0, G_MAXUINT16,
135           UDP_DEFAULT_BIND_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
136
137   gst_element_class_add_pad_template (gstelement_class,
138       gst_static_pad_template_get (&sink_template));
139
140   gst_element_class_set_static_metadata (gstelement_class, "UDP packet sender",
141       "Sink/Network",
142       "Send data over the network via UDP",
143       "Philippe Khalaf <burger@speedy.org>");
144
145   gstbasesink_class->render = gst_dynudpsink_render;
146   gstbasesink_class->start = gst_dynudpsink_start;
147   gstbasesink_class->stop = gst_dynudpsink_stop;
148   gstbasesink_class->unlock = gst_dynudpsink_unlock;
149   gstbasesink_class->unlock_stop = gst_dynudpsink_unlock_stop;
150
151   klass->get_stats = gst_dynudpsink_get_stats;
152
153   GST_DEBUG_CATEGORY_INIT (dynudpsink_debug, "dynudpsink", 0, "UDP sink");
154 }
155
156 static void
157 gst_dynudpsink_init (GstDynUDPSink * sink)
158 {
159   sink->socket = UDP_DEFAULT_SOCKET;
160   sink->socket_v6 = UDP_DEFAULT_SOCKET;
161   sink->close_socket = UDP_DEFAULT_CLOSE_SOCKET;
162   sink->external_socket = FALSE;
163   sink->bind_address = UDP_DEFAULT_BIND_ADDRESS;
164   sink->bind_port = UDP_DEFAULT_BIND_PORT;
165
166   sink->used_socket = NULL;
167   sink->used_socket_v6 = NULL;
168   sink->cancellable = g_cancellable_new ();
169 }
170
171 static void
172 gst_dynudpsink_finalize (GObject * object)
173 {
174   GstDynUDPSink *sink;
175
176   sink = GST_DYNUDPSINK (object);
177
178   if (sink->cancellable)
179     g_object_unref (sink->cancellable);
180   sink->cancellable = NULL;
181
182   if (sink->socket)
183     g_object_unref (sink->socket);
184   sink->socket = NULL;
185
186   if (sink->socket_v6)
187     g_object_unref (sink->socket_v6);
188   sink->socket_v6 = NULL;
189
190   if (sink->used_socket)
191     g_object_unref (sink->used_socket);
192   sink->used_socket = NULL;
193
194   if (sink->used_socket_v6)
195     g_object_unref (sink->used_socket_v6);
196   sink->used_socket_v6 = NULL;
197
198   g_free (sink->bind_address);
199   sink->bind_address = NULL;
200
201   G_OBJECT_CLASS (parent_class)->finalize (object);
202 }
203
204 static GstFlowReturn
205 gst_dynudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
206 {
207   GstDynUDPSink *sink;
208   gssize ret;
209   GstMapInfo map;
210   GstNetAddressMeta *meta;
211   GSocketAddress *addr;
212   GError *err = NULL;
213   GSocketFamily family;
214   GSocket *socket;
215
216   meta = gst_buffer_get_net_address_meta (buffer);
217
218   if (meta == NULL) {
219     GST_DEBUG ("Received buffer without GstNetAddressMeta, skipping");
220     return GST_FLOW_OK;
221   }
222
223   sink = GST_DYNUDPSINK (bsink);
224
225   /* let's get the address from the metadata */
226   addr = meta->addr;
227
228   family = g_socket_address_get_family (addr);
229   if (family == G_SOCKET_FAMILY_IPV6 && !sink->used_socket_v6)
230     goto invalid_family;
231
232   gst_buffer_map (buffer, &map, GST_MAP_READ);
233
234   GST_DEBUG ("about to send %" G_GSIZE_FORMAT " bytes", map.size);
235
236 #ifndef GST_DISABLE_GST_DEBUG
237   {
238     gchar *host;
239
240     host =
241         g_inet_address_to_string (g_inet_socket_address_get_address
242         (G_INET_SOCKET_ADDRESS (addr)));
243     GST_DEBUG ("sending %" G_GSIZE_FORMAT " bytes to client %s port %d",
244         map.size, host,
245         g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr)));
246     g_free (host);
247   }
248 #endif
249
250   /* Select socket to send from for this address */
251   if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
252     socket = sink->used_socket_v6;
253   else
254     socket = sink->used_socket;
255
256   ret =
257       g_socket_send_to (socket, addr, (gchar *) map.data, map.size,
258       sink->cancellable, &err);
259   gst_buffer_unmap (buffer, &map);
260
261   if (ret < 0)
262     goto send_error;
263
264   GST_DEBUG ("sent %" G_GSSIZE_FORMAT " bytes", ret);
265
266   return GST_FLOW_OK;
267
268 send_error:
269   {
270     GstFlowReturn flow_ret;
271
272     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
273       GST_DEBUG_OBJECT (sink, "send cancelled");
274       flow_ret = GST_FLOW_FLUSHING;
275     } else {
276       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
277           ("send error: %s", err->message));
278       flow_ret = GST_FLOW_ERROR;
279     }
280     g_clear_error (&err);
281     return flow_ret;
282   }
283 invalid_family:
284   {
285     GST_DEBUG ("invalid address family (got %d)", family);
286     return GST_FLOW_ERROR;
287   }
288 }
289
290 static void
291 gst_dynudpsink_set_property (GObject * object, guint prop_id,
292     const GValue * value, GParamSpec * pspec)
293 {
294   GstDynUDPSink *udpsink;
295
296   udpsink = GST_DYNUDPSINK (object);
297
298   switch (prop_id) {
299     case PROP_SOCKET:
300       if (udpsink->socket != NULL && udpsink->socket != udpsink->used_socket &&
301           udpsink->close_socket) {
302         GError *err = NULL;
303
304         if (!g_socket_close (udpsink->socket, &err)) {
305           GST_ERROR ("failed to close socket %p: %s", udpsink->socket,
306               err->message);
307           g_clear_error (&err);
308         }
309       }
310       if (udpsink->socket)
311         g_object_unref (udpsink->socket);
312       udpsink->socket = g_value_dup_object (value);
313       GST_DEBUG ("setting socket to %p", udpsink->socket);
314       break;
315     case PROP_SOCKET_V6:
316       if (udpsink->socket_v6 != NULL
317           && udpsink->socket_v6 != udpsink->used_socket_v6
318           && udpsink->close_socket) {
319         GError *err = NULL;
320
321         if (!g_socket_close (udpsink->socket_v6, &err)) {
322           GST_ERROR ("failed to close socket %p: %s", udpsink->socket_v6,
323               err->message);
324           g_clear_error (&err);
325         }
326       }
327       if (udpsink->socket_v6)
328         g_object_unref (udpsink->socket_v6);
329       udpsink->socket_v6 = g_value_dup_object (value);
330       GST_DEBUG ("setting socket v6 to %p", udpsink->socket_v6);
331       break;
332     case PROP_CLOSE_SOCKET:
333       udpsink->close_socket = g_value_get_boolean (value);
334       break;
335     case PROP_BIND_ADDRESS:
336       g_free (udpsink->bind_address);
337       udpsink->bind_address = g_value_dup_string (value);
338       break;
339     case PROP_BIND_PORT:
340       udpsink->bind_port = g_value_get_int (value);
341       break;
342     default:
343       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
344       break;
345   }
346 }
347
348 static void
349 gst_dynudpsink_get_property (GObject * object, guint prop_id, GValue * value,
350     GParamSpec * pspec)
351 {
352   GstDynUDPSink *udpsink;
353
354   udpsink = GST_DYNUDPSINK (object);
355
356   switch (prop_id) {
357     case PROP_SOCKET:
358       g_value_set_object (value, udpsink->socket);
359       break;
360     case PROP_SOCKET_V6:
361       g_value_set_object (value, udpsink->socket_v6);
362       break;
363     case PROP_CLOSE_SOCKET:
364       g_value_set_boolean (value, udpsink->close_socket);
365       break;
366     case PROP_BIND_ADDRESS:
367       g_value_set_string (value, udpsink->bind_address);
368       break;
369     case PROP_BIND_PORT:
370       g_value_set_int (value, udpsink->bind_port);
371       break;
372     default:
373       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
374       break;
375   }
376 }
377
378 /* create a socket for sending to remote machine */
379 static gboolean
380 gst_dynudpsink_start (GstBaseSink * bsink)
381 {
382   GstDynUDPSink *udpsink;
383   GError *err = NULL;
384
385   udpsink = GST_DYNUDPSINK (bsink);
386
387   udpsink->external_socket = FALSE;
388
389   if (udpsink->socket) {
390     if (g_socket_get_family (udpsink->socket) == G_SOCKET_FAMILY_IPV6) {
391       udpsink->used_socket_v6 = G_SOCKET (g_object_ref (udpsink->socket));
392       udpsink->external_socket = TRUE;
393     } else {
394       udpsink->used_socket = G_SOCKET (g_object_ref (udpsink->socket));
395       udpsink->external_socket = TRUE;
396     }
397   }
398
399   if (udpsink->socket_v6) {
400     g_return_val_if_fail (g_socket_get_family (udpsink->socket) !=
401         G_SOCKET_FAMILY_IPV6, FALSE);
402
403     if (udpsink->used_socket_v6
404         && udpsink->used_socket_v6 != udpsink->socket_v6) {
405       GST_ERROR_OBJECT (udpsink,
406           "Provided different IPv6 sockets in socket and socket-v6 properties");
407       return FALSE;
408     }
409
410     udpsink->used_socket_v6 = G_SOCKET (g_object_ref (udpsink->socket_v6));
411     udpsink->external_socket = TRUE;
412   }
413
414   if (!udpsink->used_socket && !udpsink->used_socket_v6) {
415     GSocketAddress *bind_addr;
416     GInetAddress *bind_iaddr;
417
418     if (udpsink->bind_address) {
419       GSocketFamily family;
420
421       bind_iaddr = g_inet_address_new_from_string (udpsink->bind_address);
422       if (!bind_iaddr) {
423         GList *results;
424         GResolver *resolver;
425
426         resolver = g_resolver_get_default ();
427         results =
428             g_resolver_lookup_by_name (resolver, udpsink->bind_address,
429             udpsink->cancellable, &err);
430         if (!results) {
431           g_object_unref (resolver);
432           goto name_resolve;
433         }
434         bind_iaddr = G_INET_ADDRESS (g_object_ref (results->data));
435         g_resolver_free_addresses (results);
436         g_object_unref (resolver);
437       }
438
439       bind_addr = g_inet_socket_address_new (bind_iaddr, udpsink->bind_port);
440       g_object_unref (bind_iaddr);
441       family = g_socket_address_get_family (G_SOCKET_ADDRESS (bind_addr));
442
443       if ((udpsink->used_socket =
444               g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
445                   G_SOCKET_PROTOCOL_UDP, &err)) == NULL) {
446         g_object_unref (bind_addr);
447         goto no_socket;
448       }
449
450       g_socket_bind (udpsink->used_socket, bind_addr, TRUE, &err);
451       if (err != NULL)
452         goto bind_error;
453     } else {
454       /* create sender sockets if none available */
455       if ((udpsink->used_socket = g_socket_new (G_SOCKET_FAMILY_IPV4,
456                   G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
457         goto no_socket;
458
459       bind_iaddr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
460       bind_addr = g_inet_socket_address_new (bind_iaddr, 0);
461       g_socket_bind (udpsink->used_socket, bind_addr, TRUE, &err);
462       g_object_unref (bind_addr);
463       g_object_unref (bind_iaddr);
464       if (err != NULL)
465         goto bind_error;
466
467       if ((udpsink->used_socket_v6 = g_socket_new (G_SOCKET_FAMILY_IPV6,
468                   G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP,
469                   &err)) == NULL) {
470         GST_INFO_OBJECT (udpsink, "Failed to create IPv6 socket: %s",
471             err->message);
472         g_clear_error (&err);
473       } else {
474         bind_iaddr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV6);
475         bind_addr = g_inet_socket_address_new (bind_iaddr, 0);
476         g_socket_bind (udpsink->used_socket_v6, bind_addr, TRUE, &err);
477         g_object_unref (bind_addr);
478         g_object_unref (bind_iaddr);
479         if (err != NULL)
480           goto bind_error;
481       }
482     }
483   }
484
485   if (udpsink->used_socket)
486     g_socket_set_broadcast (udpsink->used_socket, TRUE);
487   if (udpsink->used_socket_v6)
488     g_socket_set_broadcast (udpsink->used_socket_v6, TRUE);
489
490   return TRUE;
491
492   /* ERRORS */
493 no_socket:
494   {
495     GST_ERROR_OBJECT (udpsink, "Failed to create IPv4 socket: %s",
496         err->message);
497     g_clear_error (&err);
498     return FALSE;
499   }
500 bind_error:
501   {
502     GST_ELEMENT_ERROR (udpsink, RESOURCE, FAILED, (NULL),
503         ("Failed to bind socket: %s", err->message));
504     g_clear_error (&err);
505     return FALSE;
506   }
507 name_resolve:
508   {
509     GST_ELEMENT_ERROR (udpsink, RESOURCE, FAILED, (NULL),
510         ("Failed to resolve bind address %s: %s", udpsink->bind_address,
511             err->message));
512     g_clear_error (&err);
513     return FALSE;
514   }
515 }
516
517 static GstStructure *
518 gst_dynudpsink_get_stats (GstDynUDPSink * sink, const gchar * host, gint port)
519 {
520   return NULL;
521 }
522
523 static gboolean
524 gst_dynudpsink_stop (GstBaseSink * bsink)
525 {
526   GstDynUDPSink *udpsink;
527
528   udpsink = GST_DYNUDPSINK (bsink);
529
530   if (udpsink->used_socket) {
531     if (udpsink->close_socket || !udpsink->external_socket) {
532       GError *err = NULL;
533
534       if (!g_socket_close (udpsink->used_socket, &err)) {
535         GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
536         g_clear_error (&err);
537       }
538     }
539
540     g_object_unref (udpsink->used_socket);
541     udpsink->used_socket = NULL;
542   }
543
544   if (udpsink->used_socket_v6) {
545     if (udpsink->close_socket || !udpsink->external_socket) {
546       GError *err = NULL;
547
548       if (!g_socket_close (udpsink->used_socket_v6, &err)) {
549         GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
550         g_clear_error (&err);
551       }
552     }
553
554     g_object_unref (udpsink->used_socket_v6);
555     udpsink->used_socket_v6 = NULL;
556   }
557
558   return TRUE;
559 }
560
561 static gboolean
562 gst_dynudpsink_unlock (GstBaseSink * bsink)
563 {
564   GstDynUDPSink *udpsink;
565
566   udpsink = GST_DYNUDPSINK (bsink);
567
568   g_cancellable_cancel (udpsink->cancellable);
569
570   return TRUE;
571 }
572
573 static gboolean
574 gst_dynudpsink_unlock_stop (GstBaseSink * bsink)
575 {
576   GstDynUDPSink *udpsink;
577
578   udpsink = GST_DYNUDPSINK (bsink);
579
580   g_cancellable_reset (udpsink->cancellable);
581
582   return TRUE;
583 }