Fix double semicolons
[platform/upstream/gst-plugins-good.git] / gst / udp / gstmultiudpsink.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  * Copyright (C) <2009> Jarkko Palviainen <jarkko.palviainen@sesca.com>
4  * Copyright (C) <2012> Collabora Ltd.
5  *   Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:element-multiudpsink
25  * @see_also: udpsink, multifdsink
26  *
27  * multiudpsink is a network sink that sends UDP packets to multiple
28  * clients.
29  * It can be combined with rtp payload encoders to implement RTP streaming.
30  */
31
32 #ifdef HAVE_CONFIG_H
33 #include "config.h"
34 #endif
35 #include "gstmultiudpsink.h"
36
37 #include <string.h>
38 #ifdef HAVE_SYS_SOCKET_H
39 #include <sys/socket.h>
40 #endif
41
42 #ifndef G_OS_WIN32
43 #include <netinet/in.h>
44 #endif
45
46 #include "gst/glib-compat-private.h"
47
48 GST_DEBUG_CATEGORY_STATIC (multiudpsink_debug);
49 #define GST_CAT_DEFAULT (multiudpsink_debug)
50
51 #define UDP_MAX_SIZE 65507
52
53 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
54     GST_PAD_SINK,
55     GST_PAD_ALWAYS,
56     GST_STATIC_CAPS_ANY);
57
58 /* MultiUDPSink signals and args */
59 enum
60 {
61   /* methods */
62   SIGNAL_ADD,
63   SIGNAL_REMOVE,
64   SIGNAL_CLEAR,
65   SIGNAL_GET_STATS,
66
67   /* signals */
68   SIGNAL_CLIENT_ADDED,
69   SIGNAL_CLIENT_REMOVED,
70
71   /* FILL ME */
72   LAST_SIGNAL
73 };
74
75 #define DEFAULT_SOCKET             NULL
76 #define DEFAULT_CLOSE_SOCKET       TRUE
77 #define DEFAULT_USED_SOCKET        NULL
78 #define DEFAULT_CLIENTS            NULL
79 /* FIXME, this should be disabled by default, we don't need to join a multicast
80  * group for sending, if this socket is also used for receiving, it should
81  * be configured in the element that does the receive. */
82 #define DEFAULT_AUTO_MULTICAST     TRUE
83 #define DEFAULT_MULTICAST_IFACE    NULL
84 #define DEFAULT_TTL                64
85 #define DEFAULT_TTL_MC             1
86 #define DEFAULT_LOOP               TRUE
87 #define DEFAULT_FORCE_IPV4         FALSE
88 #define DEFAULT_QOS_DSCP           -1
89 #define DEFAULT_SEND_DUPLICATES    TRUE
90 #define DEFAULT_BUFFER_SIZE        0
91 #define DEFAULT_BIND_ADDRESS       NULL
92 #define DEFAULT_BIND_PORT          0
93
94 enum
95 {
96   PROP_0,
97   PROP_BYTES_TO_SERVE,
98   PROP_BYTES_SERVED,
99   PROP_SOCKET,
100   PROP_SOCKET_V6,
101   PROP_CLOSE_SOCKET,
102   PROP_USED_SOCKET,
103   PROP_USED_SOCKET_V6,
104   PROP_CLIENTS,
105   PROP_AUTO_MULTICAST,
106   PROP_MULTICAST_IFACE,
107   PROP_TTL,
108   PROP_TTL_MC,
109   PROP_LOOP,
110   PROP_FORCE_IPV4,
111   PROP_QOS_DSCP,
112   PROP_SEND_DUPLICATES,
113   PROP_BUFFER_SIZE,
114   PROP_BIND_ADDRESS,
115   PROP_BIND_PORT,
116   PROP_LAST
117 };
118
119 static void gst_multiudpsink_finalize (GObject * object);
120
121 static GstFlowReturn gst_multiudpsink_render (GstBaseSink * sink,
122     GstBuffer * buffer);
123 static GstFlowReturn gst_multiudpsink_render_list (GstBaseSink * bsink,
124     GstBufferList * buffer_list);
125
126 static gboolean gst_multiudpsink_start (GstBaseSink * bsink);
127 static gboolean gst_multiudpsink_stop (GstBaseSink * bsink);
128 static gboolean gst_multiudpsink_unlock (GstBaseSink * bsink);
129 static gboolean gst_multiudpsink_unlock_stop (GstBaseSink * bsink);
130
131 static void gst_multiudpsink_set_property (GObject * object, guint prop_id,
132     const GValue * value, GParamSpec * pspec);
133 static void gst_multiudpsink_get_property (GObject * object, guint prop_id,
134     GValue * value, GParamSpec * pspec);
135
136 static void gst_multiudpsink_add_internal (GstMultiUDPSink * sink,
137     const gchar * host, gint port, gboolean lock);
138 static void gst_multiudpsink_clear_internal (GstMultiUDPSink * sink,
139     gboolean lock);
140
141 static guint gst_multiudpsink_signals[LAST_SIGNAL] = { 0 };
142
143 #define gst_multiudpsink_parent_class parent_class
144 G_DEFINE_TYPE (GstMultiUDPSink, gst_multiudpsink, GST_TYPE_BASE_SINK);
145
146 static void
147 gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass)
148 {
149   GObjectClass *gobject_class;
150   GstElementClass *gstelement_class;
151   GstBaseSinkClass *gstbasesink_class;
152
153   gobject_class = (GObjectClass *) klass;
154   gstelement_class = (GstElementClass *) klass;
155   gstbasesink_class = (GstBaseSinkClass *) klass;
156
157   gobject_class->set_property = gst_multiudpsink_set_property;
158   gobject_class->get_property = gst_multiudpsink_get_property;
159   gobject_class->finalize = gst_multiudpsink_finalize;
160
161   /**
162    * GstMultiUDPSink::add:
163    * @gstmultiudpsink: the sink on which the signal is emitted
164    * @host: the hostname/IP address of the client to add
165    * @port: the port of the client to add
166    *
167    * Add a client with destination @host and @port to the list of
168    * clients. When the same host/port pair is added multiple times, the
169    * send-duplicates property defines if the packets are sent multiple times to
170    * the same host/port pair or not.
171    *
172    * When a host/port pair is added multiple times, an equal amount of remove
173    * calls must be performed to actually remove the host/port pair from the list
174    * of destinations.
175    */
176   gst_multiudpsink_signals[SIGNAL_ADD] =
177       g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
178       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
179       G_STRUCT_OFFSET (GstMultiUDPSinkClass, add),
180       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2,
181       G_TYPE_STRING, G_TYPE_INT);
182   /**
183    * GstMultiUDPSink::remove:
184    * @gstmultiudpsink: the sink on which the signal is emitted
185    * @host: the hostname/IP address of the client to remove
186    * @port: the port of the client to remove
187    *
188    * Remove the client with destination @host and @port from the list of
189    * clients.
190    */
191   gst_multiudpsink_signals[SIGNAL_REMOVE] =
192       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
193       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
194       G_STRUCT_OFFSET (GstMultiUDPSinkClass, remove),
195       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2,
196       G_TYPE_STRING, G_TYPE_INT);
197   /**
198    * GstMultiUDPSink::clear:
199    * @gstmultiudpsink: the sink on which the signal is emitted
200    *
201    * Clear the list of clients.
202    */
203   gst_multiudpsink_signals[SIGNAL_CLEAR] =
204       g_signal_new ("clear", G_TYPE_FROM_CLASS (klass),
205       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
206       G_STRUCT_OFFSET (GstMultiUDPSinkClass, clear),
207       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 0);
208   /**
209    * GstMultiUDPSink::get-stats:
210    * @gstmultiudpsink: the sink on which the signal is emitted
211    * @host: the hostname/IP address of the client to get stats on
212    * @port: the port of the client to get stats on
213    *
214    * Get the statistics of the client with destination @host and @port.
215    *
216    * Returns: a GstStructure: bytes_sent, packets_sent,
217    *           connect_time (in epoch seconds), disconnect_time (in epoch seconds)
218    */
219   gst_multiudpsink_signals[SIGNAL_GET_STATS] =
220       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
221       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
222       G_STRUCT_OFFSET (GstMultiUDPSinkClass, get_stats),
223       NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_STRUCTURE, 2,
224       G_TYPE_STRING, G_TYPE_INT);
225   /**
226    * GstMultiUDPSink::client-added:
227    * @gstmultiudpsink: the sink emitting the signal
228    * @host: the hostname/IP address of the added client
229    * @port: the port of the added client
230    *
231    * Signal emited when a new client is added to the list of
232    * clients.
233    */
234   gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED] =
235       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
236       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass, client_added),
237       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2,
238       G_TYPE_STRING, G_TYPE_INT);
239   /**
240    * GstMultiUDPSink::client-removed:
241    * @gstmultiudpsink: the sink emitting the signal
242    * @host: the hostname/IP address of the removed client
243    * @port: the port of the removed client
244    *
245    * Signal emited when a client is removed from the list of
246    * clients.
247    */
248   gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED] =
249       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
250       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass,
251           client_removed), NULL, NULL, g_cclosure_marshal_generic,
252       G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_INT);
253
254   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_TO_SERVE,
255       g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
256           "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
257           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
258   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_SERVED,
259       g_param_spec_uint64 ("bytes-served", "Bytes served",
260           "Total number of bytes sent to all clients", 0, G_MAXUINT64, 0,
261           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
262   g_object_class_install_property (gobject_class, PROP_SOCKET,
263       g_param_spec_object ("socket", "Socket Handle",
264           "Socket to use for UDP sending. (NULL == allocate)",
265           G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266   g_object_class_install_property (gobject_class, PROP_SOCKET_V6,
267       g_param_spec_object ("socket-v6", "Socket Handle IPv6",
268           "Socket to use for UDPv6 sending. (NULL == allocate)",
269           G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
270   g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
271       g_param_spec_boolean ("close-socket", "Close socket",
272           "Close socket if passed as property on state change",
273           DEFAULT_CLOSE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
274   g_object_class_install_property (gobject_class, PROP_USED_SOCKET,
275       g_param_spec_object ("used-socket", "Used Socket Handle",
276           "Socket currently in use for UDP sending. (NULL == no socket)",
277           G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
278   g_object_class_install_property (gobject_class, PROP_USED_SOCKET_V6,
279       g_param_spec_object ("used-socket-v6", "Used Socket Handle IPv6",
280           "Socket currently in use for UDPv6 sending. (NULL == no socket)",
281           G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
282   g_object_class_install_property (gobject_class, PROP_CLIENTS,
283       g_param_spec_string ("clients", "Clients",
284           "A comma separated list of host:port pairs with destinations",
285           DEFAULT_CLIENTS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
286   g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
287       g_param_spec_boolean ("auto-multicast",
288           "Automatically join/leave multicast groups",
289           "Automatically join/leave the multicast groups, FALSE means user"
290           " has to do it himself", DEFAULT_AUTO_MULTICAST,
291           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
292   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
293       g_param_spec_string ("multicast-iface", "Multicast Interface",
294           "The network interface on which to join the multicast group",
295           DEFAULT_MULTICAST_IFACE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
296   g_object_class_install_property (gobject_class, PROP_TTL,
297       g_param_spec_int ("ttl", "Unicast TTL",
298           "Used for setting the unicast TTL parameter",
299           0, 255, DEFAULT_TTL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
300   g_object_class_install_property (gobject_class, PROP_TTL_MC,
301       g_param_spec_int ("ttl-mc", "Multicast TTL",
302           "Used for setting the multicast TTL parameter",
303           0, 255, DEFAULT_TTL_MC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
304   g_object_class_install_property (gobject_class, PROP_LOOP,
305       g_param_spec_boolean ("loop", "Multicast Loopback",
306           "Used for setting the multicast loop parameter. TRUE = enable,"
307           " FALSE = disable", DEFAULT_LOOP,
308           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
309   /**
310    * GstMultiUDPSink::force-ipv4:
311    *
312    * Force the use of an IPv4 socket.
313    *
314    * Since: 1.0.2
315    */
316 #ifndef GST_REMOVE_DEPRECATED
317   g_object_class_install_property (gobject_class, PROP_FORCE_IPV4,
318       g_param_spec_boolean ("force-ipv4", "Force IPv4",
319           "Forcing the use of an IPv4 socket (DEPRECATED, has no effect anymore)",
320           DEFAULT_FORCE_IPV4,
321           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
322 #endif
323   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_QOS_DSCP,
324       g_param_spec_int ("qos-dscp", "QoS diff srv code point",
325           "Quality of Service, differentiated services code point (-1 default)",
326           -1, 63, DEFAULT_QOS_DSCP,
327           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
328   /**
329    * GstMultiUDPSink::send-duplicates:
330    *
331    * When a host/port pair is added mutliple times, send the packet to the host
332    * multiple times as well.
333    */
334   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEND_DUPLICATES,
335       g_param_spec_boolean ("send-duplicates", "Send Duplicates",
336           "When a distination/port pair is added multiple times, send packets "
337           "multiple times as well", DEFAULT_SEND_DUPLICATES,
338           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
339
340   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
341       g_param_spec_int ("buffer-size", "Buffer Size",
342           "Size of the kernel send buffer in bytes, 0=default", 0, G_MAXINT,
343           DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
344
345   g_object_class_install_property (gobject_class, PROP_BIND_ADDRESS,
346       g_param_spec_string ("bind-address", "Bind Address",
347           "Address to bind the socket to", DEFAULT_BIND_ADDRESS,
348           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
349   g_object_class_install_property (gobject_class, PROP_BIND_PORT,
350       g_param_spec_int ("bind-port", "Bind Port",
351           "Port to bind the socket to", 0, G_MAXUINT16,
352           DEFAULT_BIND_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
353
354   gst_element_class_add_pad_template (gstelement_class,
355       gst_static_pad_template_get (&sink_template));
356
357   gst_element_class_set_static_metadata (gstelement_class, "UDP packet sender",
358       "Sink/Network",
359       "Send data over the network via UDP to one or multiple recipients "
360       "which can be added or removed at runtime using action signals",
361       "Wim Taymans <wim.taymans@gmail.com>");
362
363   gstbasesink_class->render = gst_multiudpsink_render;
364   gstbasesink_class->render_list = gst_multiudpsink_render_list;
365   gstbasesink_class->start = gst_multiudpsink_start;
366   gstbasesink_class->stop = gst_multiudpsink_stop;
367   gstbasesink_class->unlock = gst_multiudpsink_unlock;
368   gstbasesink_class->unlock_stop = gst_multiudpsink_unlock_stop;
369   klass->add = gst_multiudpsink_add;
370   klass->remove = gst_multiudpsink_remove;
371   klass->clear = gst_multiudpsink_clear;
372   klass->get_stats = gst_multiudpsink_get_stats;
373
374   GST_DEBUG_CATEGORY_INIT (multiudpsink_debug, "multiudpsink", 0, "UDP sink");
375 }
376
377 static void
378 gst_multiudpsink_init (GstMultiUDPSink * sink)
379 {
380   guint max_mem;
381
382   g_mutex_init (&sink->client_lock);
383   sink->clients = NULL;
384   sink->num_v4_unique = 0;
385   sink->num_v4_all = 0;
386   sink->num_v6_unique = 0;
387   sink->num_v6_all = 0;
388
389   sink->socket = DEFAULT_SOCKET;
390   sink->socket_v6 = DEFAULT_SOCKET;
391   sink->used_socket = DEFAULT_USED_SOCKET;
392   sink->used_socket_v6 = DEFAULT_USED_SOCKET;
393   sink->close_socket = DEFAULT_CLOSE_SOCKET;
394   sink->external_socket = (sink->socket != NULL);
395   sink->auto_multicast = DEFAULT_AUTO_MULTICAST;
396   sink->ttl = DEFAULT_TTL;
397   sink->ttl_mc = DEFAULT_TTL_MC;
398   sink->loop = DEFAULT_LOOP;
399   sink->force_ipv4 = DEFAULT_FORCE_IPV4;
400   sink->qos_dscp = DEFAULT_QOS_DSCP;
401   sink->send_duplicates = DEFAULT_SEND_DUPLICATES;
402   sink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
403
404   sink->cancellable = g_cancellable_new ();
405
406   /* pre-allocate OutputVector, MapInfo and OutputMessage arrays
407    * for use in the render and render_list functions */
408   max_mem = gst_buffer_get_max_memory ();
409
410   sink->n_vecs = max_mem;
411   sink->vecs = g_new (GOutputVector, sink->n_vecs);
412
413   sink->n_maps = max_mem;
414   sink->maps = g_new (GstMapInfo, sink->n_maps);
415
416   sink->n_messages = 1;
417   sink->messages = g_new (GstOutputMessage, sink->n_messages);
418
419   /* we assume that the number of memories per buffer can fit into a guint8 */
420   g_warn_if_fail (max_mem <= G_MAXUINT8);
421 }
422
423 static GstUDPClient *
424 gst_udp_client_new (GstMultiUDPSink * sink, const gchar * host, gint port)
425 {
426   GstUDPClient *client;
427   GInetAddress *addr;
428   GResolver *resolver;
429   GError *err = NULL;
430
431   addr = g_inet_address_new_from_string (host);
432   if (!addr) {
433     GList *results;
434
435     resolver = g_resolver_get_default ();
436     results =
437         g_resolver_lookup_by_name (resolver, host, sink->cancellable, &err);
438     if (!results)
439       goto name_resolve;
440     addr = G_INET_ADDRESS (g_object_ref (results->data));
441
442     g_resolver_free_addresses (results);
443     g_object_unref (resolver);
444   }
445 #ifndef GST_DISABLE_GST_DEBUG
446   {
447     gchar *ip = g_inet_address_to_string (addr);
448
449     GST_DEBUG_OBJECT (sink, "IP address for host %s is %s", host, ip);
450     g_free (ip);
451   }
452 #endif
453
454   client = g_slice_new0 (GstUDPClient);
455   client->ref_count = 1;
456   client->add_count = 0;
457   client->host = g_strdup (host);
458   client->port = port;
459   client->addr = g_inet_socket_address_new (addr, port);
460   g_object_unref (addr);
461
462   return client;
463
464 name_resolve:
465   {
466     g_object_unref (resolver);
467
468     return NULL;
469   }
470 }
471
472 /* call with client lock held */
473 static void
474 gst_udp_client_unref (GstUDPClient * client)
475 {
476   if (--client->ref_count == 0) {
477     g_object_unref (client->addr);
478     g_free (client->host);
479     g_slice_free (GstUDPClient, client);
480   }
481 }
482
483 /* call with client lock held */
484 static inline GstUDPClient *
485 gst_udp_client_ref (GstUDPClient * client)
486 {
487   ++client->ref_count;
488   return client;
489 }
490
491 static gint
492 client_compare (GstUDPClient * a, GstUDPClient * b)
493 {
494   if ((a->port == b->port) && (strcmp (a->host, b->host) == 0))
495     return 0;
496
497   return 1;
498 }
499
500 static void
501 gst_multiudpsink_finalize (GObject * object)
502 {
503   GstMultiUDPSink *sink;
504
505   sink = GST_MULTIUDPSINK (object);
506
507   g_list_foreach (sink->clients, (GFunc) gst_udp_client_unref, NULL);
508   g_list_free (sink->clients);
509
510   if (sink->socket)
511     g_object_unref (sink->socket);
512   sink->socket = NULL;
513
514   if (sink->socket_v6)
515     g_object_unref (sink->socket_v6);
516   sink->socket_v6 = NULL;
517
518   if (sink->used_socket)
519     g_object_unref (sink->used_socket);
520   sink->used_socket = NULL;
521
522   if (sink->used_socket_v6)
523     g_object_unref (sink->used_socket_v6);
524   sink->used_socket_v6 = NULL;
525
526   if (sink->cancellable)
527     g_object_unref (sink->cancellable);
528   sink->cancellable = NULL;
529
530   g_free (sink->multi_iface);
531   sink->multi_iface = NULL;
532
533   g_free (sink->vecs);
534   sink->vecs = NULL;
535   g_free (sink->maps);
536   sink->maps = NULL;
537   g_free (sink->messages);
538   sink->messages = NULL;
539
540   g_free (sink->bind_address);
541   sink->bind_address = NULL;
542
543   g_mutex_clear (&sink->client_lock);
544
545   G_OBJECT_CLASS (parent_class)->finalize (object);
546 }
547
548 /* replacement until we can depend unconditionally on the real one in GLib */
549 #ifndef HAVE_G_SOCKET_SEND_MESSAGES
550 #define g_socket_send_messages gst_socket_send_messages
551
552 static gint
553 gst_socket_send_messages (GSocket * socket, GstOutputMessage * messages,
554     guint num_messages, gint flags, GCancellable * cancellable, GError ** error)
555 {
556   gssize result;
557   gint i;
558
559   for (i = 0; i < num_messages; ++i) {
560     GstOutputMessage *msg = &messages[i];
561     GError *msg_error = NULL;
562
563     result = g_socket_send_message (socket, msg->address,
564         msg->vectors, msg->num_vectors,
565         msg->control_messages, msg->num_control_messages,
566         flags, cancellable, &msg_error);
567
568     if (result < 0) {
569       /* if we couldn't send all messages, just return how many we did
570        * manage to send, provided we managed to send at least one */
571       if (msg_error->code == G_IO_ERROR_WOULD_BLOCK && i > 0) {
572         g_error_free (msg_error);
573         return i;
574       } else {
575         g_propagate_error (error, msg_error);
576         return -1;
577       }
578     }
579
580     msg->bytes_sent = result;
581   }
582
583   return i;
584 }
585 #endif /* HAVE_G_SOCKET_SEND_MESSAGES */
586
587 static gsize
588 fill_vectors (GOutputVector * vecs, GstMapInfo * maps, guint n, GstBuffer * buf)
589 {
590   GstMemory *mem;
591   gsize size = 0;
592   guint i;
593
594   g_assert (gst_buffer_n_memory (buf) == n);
595
596   for (i = 0; i < n; ++i) {
597     mem = gst_buffer_peek_memory (buf, i);
598     if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) {
599       vecs[i].buffer = maps[i].data;
600       vecs[i].size = maps[i].size;
601     } else {
602       GST_WARNING ("Failed to map memory %p for reading", mem);
603       vecs[i].buffer = "";
604       vecs[i].size = 0;
605     }
606     size += vecs[i].size;
607   }
608
609   return size;
610 }
611
612 static gsize
613 gst_udp_calc_message_size (GstOutputMessage * msg)
614 {
615   gsize size = 0;
616   guint i;
617
618   for (i = 0; i < msg->num_vectors; ++i)
619     size += msg->vectors[i].size;
620
621   return size;
622 }
623
624 static gint
625 gst_udp_messsages_find_first_not_sent (GstOutputMessage * messages,
626     guint num_messages)
627 {
628   guint i;
629
630   for (i = 0; i < num_messages; ++i) {
631     GstOutputMessage *msg = &messages[i];
632
633     if (msg->bytes_sent == 0 && gst_udp_calc_message_size (msg) > 0)
634       return i;
635   }
636
637   return -1;
638 }
639
640 static inline gchar *
641 gst_udp_address_get_string (GSocketAddress * addr, gchar * s, gsize size)
642 {
643   GInetSocketAddress *isa = G_INET_SOCKET_ADDRESS (addr);
644   GInetAddress *ia;
645   gchar *addr_str;
646
647   ia = g_inet_socket_address_get_address (isa);
648   addr_str = g_inet_address_to_string (ia);
649   g_snprintf (s, size, "%s:%u", addr_str, g_inet_socket_address_get_port (isa));
650   g_free (addr_str);
651   g_object_unref (ia);
652
653   return s;
654 }
655
656 /* Wrapper around g_socket_send_messages() plus error handling (ignoring).
657  * Returns FALSE if we got cancelled, otherwise TRUE. */
658 static gboolean
659 gst_multiudpsink_send_messages (GstMultiUDPSink * sink, GSocket * socket,
660     GstOutputMessage * messages, guint num_messages)
661 {
662   gboolean sent_max_size_warning = FALSE;
663
664   while (num_messages > 0) {
665     gchar astr[64] G_GNUC_UNUSED;
666     GError *err = NULL;
667     guint msg_size, skip, i;
668     gint ret, err_idx;
669
670     ret = g_socket_send_messages (socket, messages, num_messages, 0,
671         sink->cancellable, &err);
672
673     if (G_UNLIKELY (ret < 0)) {
674       GstOutputMessage *msg;
675
676       if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
677         g_clear_error (&err);
678         return FALSE;
679       }
680
681       err_idx = gst_udp_messsages_find_first_not_sent (messages, num_messages);
682       if (err_idx < 0)
683         break;
684
685       msg = &messages[err_idx];
686       msg_size = gst_udp_calc_message_size (msg);
687
688       GST_LOG_OBJECT (sink, "error sending %u bytes to client %s: %s", msg_size,
689           gst_udp_address_get_string (msg->address, astr, sizeof (astr)),
690           err->message);
691
692       skip = 1;
693       if (msg_size > UDP_MAX_SIZE) {
694         if (!sent_max_size_warning) {
695           GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
696               ("Attempting to send a UDP packets larger than maximum size "
697                   "(%u > %d)", msg_size, UDP_MAX_SIZE),
698               ("Reason: %s", err ? err->message : "unknown reason"));
699           sent_max_size_warning = FALSE;
700         }
701       } else {
702         GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
703             ("Error sending UDP packets"), ("client %s, reason: %s",
704                 gst_udp_address_get_string (msg->address, astr, sizeof (astr)),
705                 (err != NULL) ? err->message : "unknown reason"));
706
707         for (i = err_idx + 1; i < num_messages; ++i, ++skip) {
708           if (messages[i].address != msg->address)
709             break;
710         }
711         GST_DEBUG_OBJECT (sink, "skipping %d message(s) to same client", skip);
712       }
713
714       /* ignore any errors and try sending the rest */
715       g_clear_error (&err);
716       ret = skip;
717     }
718
719     g_assert (ret <= num_messages);
720
721     messages += ret;
722     num_messages -= ret;
723   }
724
725   return TRUE;
726 }
727
728 static GstFlowReturn
729 gst_multiudpsink_render_buffers (GstMultiUDPSink * sink, GstBuffer ** buffers,
730     guint num_buffers, guint8 * mem_nums, guint total_mem_num)
731 {
732   GstOutputMessage *msgs;
733   gboolean send_duplicates;
734   GstUDPClient **clients;
735   GOutputVector *vecs;
736   GstMapInfo *map_infos;
737   GstFlowReturn flow_ret;
738   guint num_addr_v4, num_addr_v6;
739   guint num_addr, num_msgs;
740   GError *err = NULL;
741   guint i, j, mem;
742   gsize size = 0;
743   GList *l;
744
745   send_duplicates = sink->send_duplicates;
746
747   g_mutex_lock (&sink->client_lock);
748
749   if (send_duplicates) {
750     num_addr_v4 = sink->num_v4_all;
751     num_addr_v6 = sink->num_v6_all;
752   } else {
753     num_addr_v4 = sink->num_v4_unique;
754     num_addr_v6 = sink->num_v6_unique;
755   }
756   num_addr = num_addr_v4 + num_addr_v6;
757
758   if (num_addr == 0)
759     goto no_clients;
760
761   clients = g_newa (GstUDPClient *, num_addr);
762   for (l = sink->clients, i = 0; l != NULL; l = l->next) {
763     GstUDPClient *client = l->data;
764
765     clients[i++] = gst_udp_client_ref (client);
766     for (j = 1; send_duplicates && j < client->add_count; ++j)
767       clients[i++] = gst_udp_client_ref (client);
768   }
769   g_assert_cmpuint (i, ==, num_addr);
770
771   g_mutex_unlock (&sink->client_lock);
772
773   GST_LOG_OBJECT (sink, "%u buffers, %u memories -> to be sent to %u clients",
774       num_buffers, total_mem_num, num_addr);
775
776   /* ensure our pre-allocated scratch space arrays are large enough */
777   if (sink->n_vecs < total_mem_num) {
778     sink->n_vecs = GST_ROUND_UP_16 (total_mem_num);
779     g_free (sink->vecs);
780     sink->vecs = g_new (GOutputVector, sink->n_vecs);
781   }
782   vecs = sink->vecs;
783
784   if (sink->n_maps < total_mem_num) {
785     sink->n_maps = GST_ROUND_UP_16 (total_mem_num);
786     g_free (sink->maps);
787     sink->maps = g_new (GstMapInfo, sink->n_maps);
788   }
789   map_infos = sink->maps;
790
791   num_msgs = num_addr * num_buffers;
792   if (sink->n_messages < num_msgs) {
793     sink->n_messages = GST_ROUND_UP_16 (num_msgs);
794     g_free (sink->messages);
795     sink->messages = g_new (GstOutputMessage, sink->n_messages);
796   }
797   msgs = sink->messages;
798
799   /* populate first num_buffers messages with output vectors for the buffers */
800   for (i = 0, mem = 0; i < num_buffers; ++i) {
801     size += fill_vectors (&vecs[mem], &map_infos[mem], mem_nums[i], buffers[i]);
802     msgs[i].vectors = &vecs[mem];
803     msgs[i].num_vectors = mem_nums[i];
804     msgs[i].num_control_messages = 0;
805     msgs[i].control_messages = NULL;
806     msgs[i].address = clients[0]->addr;
807     mem += mem_nums[i];
808   }
809
810   /* FIXME: how about some locking? (there wasn't any before either, but..) */
811   sink->bytes_to_serve += size;
812
813   /* now copy the pre-filled num_buffer messages over to the next num_buffer
814    * messages for the next client, where we also change the target adddress */
815   for (i = 1; i < num_addr; ++i) {
816     for (j = 0; j < num_buffers; ++j) {
817       msgs[i * num_buffers + j] = msgs[j];
818       msgs[i * num_buffers + j].address = clients[i]->addr;
819     }
820   }
821
822   /* now send it! */
823   {
824     gboolean ret;
825
826     /* no IPv4 socket? Send it all from the IPv6 socket then.. */
827     if (sink->used_socket == NULL) {
828       ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6,
829           msgs, num_msgs);
830     } else {
831       guint num_msgs_v4 = num_buffers * num_addr_v4;
832       guint num_msgs_v6 = num_buffers * num_addr_v6;
833
834       /* our client list is sorted with IPv4 clients first and IPv6 ones last */
835       ret = gst_multiudpsink_send_messages (sink, sink->used_socket,
836           msgs, num_msgs_v4);
837
838       if (!ret)
839         goto cancelled;
840
841       ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6,
842           msgs + num_msgs_v4, num_msgs_v6);
843     }
844
845     if (!ret)
846       goto cancelled;
847   }
848
849   flow_ret = GST_FLOW_OK;
850
851   /* now update stats */
852   g_mutex_lock (&sink->client_lock);
853
854   for (i = 0; i < num_addr; ++i) {
855     GstUDPClient *client = clients[i];
856
857     for (j = 0; j < num_buffers; ++j) {
858       gsize bytes_sent;
859
860       bytes_sent = msgs[i * num_buffers + j].bytes_sent;
861
862       client->bytes_sent += bytes_sent;
863       client->packets_sent++;
864       sink->bytes_served += bytes_sent;
865     }
866     gst_udp_client_unref (client);
867   }
868
869   g_mutex_unlock (&sink->client_lock);
870
871 out:
872
873   for (i = 0; i < mem; ++i)
874     gst_memory_unmap (map_infos[i].memory, &map_infos[i]);
875
876   return flow_ret;
877
878 no_clients:
879   {
880     g_mutex_unlock (&sink->client_lock);
881     GST_LOG_OBJECT (sink, "no clients");
882     return GST_FLOW_OK;
883   }
884 cancelled:
885   {
886     GST_INFO_OBJECT (sink, "cancelled");
887     g_clear_error (&err);
888     flow_ret = GST_FLOW_FLUSHING;
889
890     g_mutex_lock (&sink->client_lock);
891     for (i = 0; i < num_addr; ++i)
892       gst_udp_client_unref (clients[i]);
893     g_mutex_unlock (&sink->client_lock);
894     goto out;
895   }
896 }
897
898 static GstFlowReturn
899 gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
900 {
901   GstMultiUDPSink *sink;
902   GstBuffer **buffers;
903   GstFlowReturn flow;
904   guint8 *mem_nums;
905   guint total_mems;
906   guint i, num_buffers;
907
908   sink = GST_MULTIUDPSINK_CAST (bsink);
909
910   num_buffers = gst_buffer_list_length (buffer_list);
911   if (num_buffers == 0)
912     goto no_data;
913
914   buffers = g_newa (GstBuffer *, num_buffers);
915   mem_nums = g_newa (guint8, num_buffers);
916   for (i = 0, total_mems = 0; i < num_buffers; ++i) {
917     buffers[i] = gst_buffer_list_get (buffer_list, i);
918     mem_nums[i] = gst_buffer_n_memory (buffers[i]);
919     total_mems += mem_nums[i];
920   }
921
922   flow = gst_multiudpsink_render_buffers (sink, buffers, num_buffers,
923       mem_nums, total_mems);
924
925   return flow;
926
927 no_data:
928   {
929     GST_LOG_OBJECT (sink, "empty buffer");
930     return GST_FLOW_OK;
931   }
932 }
933
934 static GstFlowReturn
935 gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
936 {
937   GstMultiUDPSink *sink;
938   GstFlowReturn flow;
939   guint8 n_mem;
940
941   sink = GST_MULTIUDPSINK_CAST (bsink);
942
943   n_mem = gst_buffer_n_memory (buffer);
944
945   if (n_mem > 0)
946     flow = gst_multiudpsink_render_buffers (sink, &buffer, 1, &n_mem, n_mem);
947   else
948     flow = GST_FLOW_OK;
949
950   return flow;
951 }
952
953 static void
954 gst_multiudpsink_set_clients_string (GstMultiUDPSink * sink,
955     const gchar * string)
956 {
957   gchar **clients;
958   gint i;
959
960   clients = g_strsplit (string, ",", 0);
961
962   g_mutex_lock (&sink->client_lock);
963   /* clear all existing clients */
964   gst_multiudpsink_clear_internal (sink, FALSE);
965   for (i = 0; clients[i]; i++) {
966     gchar *host, *p;
967     gint64 port = 0;
968
969     host = clients[i];
970     p = strstr (clients[i], ":");
971     if (p != NULL) {
972       *p = '\0';
973       port = g_ascii_strtoll (p + 1, NULL, 10);
974     }
975     if (port != 0)
976       gst_multiudpsink_add_internal (sink, host, port, FALSE);
977   }
978   g_mutex_unlock (&sink->client_lock);
979
980   g_strfreev (clients);
981 }
982
983 static gchar *
984 gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink)
985 {
986   GString *str;
987   GList *clients;
988
989   str = g_string_new ("");
990
991   g_mutex_lock (&sink->client_lock);
992   clients = sink->clients;
993   while (clients) {
994     GstUDPClient *client;
995     gint count;
996
997     client = (GstUDPClient *) clients->data;
998
999     clients = g_list_next (clients);
1000
1001     count = client->add_count;
1002     while (count--) {
1003       g_string_append_printf (str, "%s:%d%s", client->host, client->port,
1004           (clients || count > 1 ? "," : ""));
1005     }
1006   }
1007   g_mutex_unlock (&sink->client_lock);
1008
1009   return g_string_free (str, FALSE);
1010 }
1011
1012 static void
1013 gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink, GSocket * socket)
1014 {
1015   /* don't touch on -1 */
1016   if (sink->qos_dscp < 0)
1017     return;
1018
1019   if (socket == NULL)
1020     return;
1021
1022 #ifdef IP_TOS
1023   {
1024     gint tos;
1025     gint fd;
1026
1027     fd = g_socket_get_fd (socket);
1028
1029     GST_DEBUG_OBJECT (sink, "setting TOS to %d", sink->qos_dscp);
1030
1031     /* Extract and shift 6 bits of DSFIELD */
1032     tos = (sink->qos_dscp & 0x3f) << 2;
1033
1034     if (setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) {
1035       GST_ERROR_OBJECT (sink, "could not set TOS: %s", g_strerror (errno));
1036     }
1037 #ifdef IPV6_TCLASS
1038     if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0) {
1039       GST_ERROR_OBJECT (sink, "could not set TCLASS: %s", g_strerror (errno));
1040     }
1041 #endif
1042   }
1043 #endif
1044 }
1045
1046 static void
1047 gst_multiudpsink_set_property (GObject * object, guint prop_id,
1048     const GValue * value, GParamSpec * pspec)
1049 {
1050   GstMultiUDPSink *udpsink;
1051
1052   udpsink = GST_MULTIUDPSINK (object);
1053
1054   switch (prop_id) {
1055     case PROP_SOCKET:
1056       if (udpsink->socket != NULL && udpsink->socket != udpsink->used_socket &&
1057           udpsink->close_socket) {
1058         GError *err = NULL;
1059
1060         if (!g_socket_close (udpsink->socket, &err)) {
1061           GST_ERROR ("failed to close socket %p: %s", udpsink->socket,
1062               err->message);
1063           g_clear_error (&err);
1064         }
1065       }
1066       if (udpsink->socket)
1067         g_object_unref (udpsink->socket);
1068       udpsink->socket = g_value_dup_object (value);
1069       GST_DEBUG_OBJECT (udpsink, "setting socket to %p", udpsink->socket);
1070       break;
1071     case PROP_SOCKET_V6:
1072       if (udpsink->socket_v6 != NULL
1073           && udpsink->socket_v6 != udpsink->used_socket_v6
1074           && udpsink->close_socket) {
1075         GError *err = NULL;
1076
1077         if (!g_socket_close (udpsink->socket_v6, &err)) {
1078           GST_ERROR ("failed to close socket %p: %s", udpsink->socket_v6,
1079               err->message);
1080           g_clear_error (&err);
1081         }
1082       }
1083       if (udpsink->socket_v6)
1084         g_object_unref (udpsink->socket_v6);
1085       udpsink->socket_v6 = g_value_dup_object (value);
1086       GST_DEBUG_OBJECT (udpsink, "setting socket to %p", udpsink->socket_v6);
1087       break;
1088     case PROP_CLOSE_SOCKET:
1089       udpsink->close_socket = g_value_get_boolean (value);
1090       break;
1091     case PROP_CLIENTS:
1092       gst_multiudpsink_set_clients_string (udpsink, g_value_get_string (value));
1093       break;
1094     case PROP_AUTO_MULTICAST:
1095       udpsink->auto_multicast = g_value_get_boolean (value);
1096       break;
1097     case PROP_MULTICAST_IFACE:
1098       g_free (udpsink->multi_iface);
1099
1100       if (g_value_get_string (value) == NULL)
1101         udpsink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
1102       else
1103         udpsink->multi_iface = g_value_dup_string (value);
1104       break;
1105     case PROP_TTL:
1106       udpsink->ttl = g_value_get_int (value);
1107       break;
1108     case PROP_TTL_MC:
1109       udpsink->ttl_mc = g_value_get_int (value);
1110       break;
1111     case PROP_LOOP:
1112       udpsink->loop = g_value_get_boolean (value);
1113       break;
1114     case PROP_FORCE_IPV4:
1115       udpsink->force_ipv4 = g_value_get_boolean (value);
1116       break;
1117     case PROP_QOS_DSCP:
1118       udpsink->qos_dscp = g_value_get_int (value);
1119       gst_multiudpsink_setup_qos_dscp (udpsink, udpsink->used_socket);
1120       gst_multiudpsink_setup_qos_dscp (udpsink, udpsink->used_socket_v6);
1121       break;
1122     case PROP_SEND_DUPLICATES:
1123       udpsink->send_duplicates = g_value_get_boolean (value);
1124       break;
1125     case PROP_BUFFER_SIZE:
1126       udpsink->buffer_size = g_value_get_int (value);
1127       break;
1128     case PROP_BIND_ADDRESS:
1129       g_free (udpsink->bind_address);
1130       udpsink->bind_address = g_value_dup_string (value);
1131       break;
1132     case PROP_BIND_PORT:
1133       udpsink->bind_port = g_value_get_int (value);
1134       break;
1135     default:
1136       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1137       break;
1138   }
1139 }
1140
1141 static void
1142 gst_multiudpsink_get_property (GObject * object, guint prop_id, GValue * value,
1143     GParamSpec * pspec)
1144 {
1145   GstMultiUDPSink *udpsink;
1146
1147   udpsink = GST_MULTIUDPSINK (object);
1148
1149   switch (prop_id) {
1150     case PROP_BYTES_TO_SERVE:
1151       g_value_set_uint64 (value, udpsink->bytes_to_serve);
1152       break;
1153     case PROP_BYTES_SERVED:
1154       g_value_set_uint64 (value, udpsink->bytes_served);
1155       break;
1156     case PROP_SOCKET:
1157       g_value_set_object (value, udpsink->socket);
1158       break;
1159     case PROP_SOCKET_V6:
1160       g_value_set_object (value, udpsink->socket_v6);
1161       break;
1162     case PROP_CLOSE_SOCKET:
1163       g_value_set_boolean (value, udpsink->close_socket);
1164       break;
1165     case PROP_USED_SOCKET:
1166       g_value_set_object (value, udpsink->used_socket);
1167       break;
1168     case PROP_USED_SOCKET_V6:
1169       g_value_set_object (value, udpsink->used_socket_v6);
1170       break;
1171     case PROP_CLIENTS:
1172       g_value_take_string (value,
1173           gst_multiudpsink_get_clients_string (udpsink));
1174       break;
1175     case PROP_AUTO_MULTICAST:
1176       g_value_set_boolean (value, udpsink->auto_multicast);
1177       break;
1178     case PROP_MULTICAST_IFACE:
1179       g_value_set_string (value, udpsink->multi_iface);
1180       break;
1181     case PROP_TTL:
1182       g_value_set_int (value, udpsink->ttl);
1183       break;
1184     case PROP_TTL_MC:
1185       g_value_set_int (value, udpsink->ttl_mc);
1186       break;
1187     case PROP_LOOP:
1188       g_value_set_boolean (value, udpsink->loop);
1189       break;
1190     case PROP_FORCE_IPV4:
1191       g_value_set_boolean (value, udpsink->force_ipv4);
1192       break;
1193     case PROP_QOS_DSCP:
1194       g_value_set_int (value, udpsink->qos_dscp);
1195       break;
1196     case PROP_SEND_DUPLICATES:
1197       g_value_set_boolean (value, udpsink->send_duplicates);
1198       break;
1199     case PROP_BUFFER_SIZE:
1200       g_value_set_int (value, udpsink->buffer_size);
1201       break;
1202     case PROP_BIND_ADDRESS:
1203       g_value_set_string (value, udpsink->bind_address);
1204       break;
1205     case PROP_BIND_PORT:
1206       g_value_set_int (value, udpsink->bind_port);
1207       break;
1208     default:
1209       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1210       break;
1211   }
1212 }
1213
1214 static gboolean
1215 gst_multiudpsink_configure_client (GstMultiUDPSink * sink,
1216     GstUDPClient * client)
1217 {
1218   GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr);
1219   GInetAddress *addr = g_inet_socket_address_get_address (saddr);
1220   GSocketFamily family = g_socket_address_get_family (G_SOCKET_ADDRESS (saddr));
1221   GSocket *socket;
1222   GError *err = NULL;
1223
1224   GST_DEBUG_OBJECT (sink, "configuring client %p", client);
1225
1226   if (family == G_SOCKET_FAMILY_IPV6 && !sink->used_socket_v6)
1227     goto invalid_family;
1228
1229   /* Select socket to send from for this address */
1230   if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
1231     socket = sink->used_socket_v6;
1232   else
1233     socket = sink->used_socket;
1234
1235   if (g_inet_address_get_is_multicast (addr)) {
1236     GST_DEBUG_OBJECT (sink, "we have a multicast client %p", client);
1237     if (sink->auto_multicast) {
1238       GST_DEBUG_OBJECT (sink, "autojoining group");
1239       if (!g_socket_join_multicast_group (socket, addr, FALSE,
1240               sink->multi_iface, &err))
1241         goto join_group_failed;
1242     }
1243     GST_DEBUG_OBJECT (sink, "setting loop to %d", sink->loop);
1244     g_socket_set_multicast_loopback (socket, sink->loop);
1245     GST_DEBUG_OBJECT (sink, "setting ttl to %d", sink->ttl_mc);
1246     g_socket_set_multicast_ttl (socket, sink->ttl_mc);
1247   } else {
1248     GST_DEBUG_OBJECT (sink, "setting unicast ttl to %d", sink->ttl);
1249     g_socket_set_ttl (socket, sink->ttl);
1250   }
1251   return TRUE;
1252
1253   /* ERRORS */
1254 join_group_failed:
1255   {
1256     gst_multiudpsink_stop (GST_BASE_SINK (sink));
1257     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
1258         ("Could not join multicast group: %s",
1259             err ? err->message : "unknown reason"));
1260     g_clear_error (&err);
1261     return FALSE;
1262   }
1263 invalid_family:
1264   {
1265     gst_multiudpsink_stop (GST_BASE_SINK (sink));
1266     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
1267         ("Invalid address family (got %d)", family));
1268     return FALSE;
1269   }
1270 }
1271
1272 /* create a socket for sending to remote machine */
1273 static gboolean
1274 gst_multiudpsink_start (GstBaseSink * bsink)
1275 {
1276   GstMultiUDPSink *sink;
1277   GList *clients;
1278   GstUDPClient *client;
1279   GError *err = NULL;
1280
1281   sink = GST_MULTIUDPSINK (bsink);
1282
1283   sink->external_socket = FALSE;
1284
1285   if (sink->socket) {
1286     GST_DEBUG_OBJECT (sink, "using configured socket");
1287     if (g_socket_get_family (sink->socket) == G_SOCKET_FAMILY_IPV6) {
1288       sink->used_socket_v6 = G_SOCKET (g_object_ref (sink->socket));
1289       sink->external_socket = TRUE;
1290     } else {
1291       sink->used_socket = G_SOCKET (g_object_ref (sink->socket));
1292       sink->external_socket = TRUE;
1293     }
1294   }
1295
1296   if (sink->socket_v6) {
1297     GST_DEBUG_OBJECT (sink, "using configured IPv6 socket");
1298     g_return_val_if_fail (g_socket_get_family (sink->socket) !=
1299         G_SOCKET_FAMILY_IPV6, FALSE);
1300
1301     if (sink->used_socket_v6 && sink->used_socket_v6 != sink->socket_v6) {
1302       GST_ERROR_OBJECT (sink,
1303           "Provided different IPv6 sockets in socket and socket-v6 properties");
1304       return FALSE;
1305     }
1306
1307     sink->used_socket_v6 = G_SOCKET (g_object_ref (sink->socket_v6));
1308     sink->external_socket = TRUE;
1309   }
1310
1311   if (!sink->used_socket && !sink->used_socket_v6) {
1312     GSocketAddress *bind_addr;
1313     GInetAddress *bind_iaddr;
1314
1315     if (sink->bind_address) {
1316       GSocketFamily family;
1317
1318       bind_iaddr = g_inet_address_new_from_string (sink->bind_address);
1319       if (!bind_iaddr) {
1320         GList *results;
1321         GResolver *resolver;
1322
1323         resolver = g_resolver_get_default ();
1324         results =
1325             g_resolver_lookup_by_name (resolver, sink->bind_address,
1326             sink->cancellable, &err);
1327         if (!results) {
1328           g_object_unref (resolver);
1329           goto name_resolve;
1330         }
1331         bind_iaddr = G_INET_ADDRESS (g_object_ref (results->data));
1332         g_resolver_free_addresses (results);
1333         g_object_unref (resolver);
1334       }
1335
1336       bind_addr = g_inet_socket_address_new (bind_iaddr, sink->bind_port);
1337       g_object_unref (bind_iaddr);
1338       family = g_socket_address_get_family (G_SOCKET_ADDRESS (bind_addr));
1339
1340       if ((sink->used_socket =
1341               g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1342                   G_SOCKET_PROTOCOL_UDP, &err)) == NULL) {
1343         g_object_unref (bind_addr);
1344         goto no_socket;
1345       }
1346
1347       g_socket_bind (sink->used_socket, bind_addr, TRUE, &err);
1348       if (err != NULL)
1349         goto bind_error;
1350     } else {
1351       /* create sender sockets if none available */
1352       if ((sink->used_socket = g_socket_new (G_SOCKET_FAMILY_IPV4,
1353                   G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
1354         goto no_socket;
1355
1356       bind_iaddr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
1357       bind_addr = g_inet_socket_address_new (bind_iaddr, sink->bind_port);
1358       g_socket_bind (sink->used_socket, bind_addr, TRUE, &err);
1359       g_object_unref (bind_addr);
1360       g_object_unref (bind_iaddr);
1361       if (err != NULL)
1362         goto bind_error;
1363
1364       if ((sink->used_socket_v6 = g_socket_new (G_SOCKET_FAMILY_IPV6,
1365                   G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP,
1366                   &err)) == NULL) {
1367         GST_INFO_OBJECT (sink, "Failed to create IPv6 socket: %s",
1368             err->message);
1369         g_clear_error (&err);
1370       } else {
1371         bind_iaddr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV6);
1372         bind_addr = g_inet_socket_address_new (bind_iaddr, sink->bind_port);
1373         g_socket_bind (sink->used_socket_v6, bind_addr, TRUE, &err);
1374         g_object_unref (bind_addr);
1375         g_object_unref (bind_iaddr);
1376         if (err != NULL)
1377           goto bind_error;
1378       }
1379     }
1380   }
1381 #ifdef SO_SNDBUF
1382   {
1383     socklen_t len;
1384     gint sndsize, ret;
1385
1386     len = sizeof (sndsize);
1387     if (sink->buffer_size != 0) {
1388       sndsize = sink->buffer_size;
1389
1390       GST_DEBUG_OBJECT (sink, "setting udp buffer of %d bytes", sndsize);
1391       /* set buffer size, Note that on Linux this is typically limited to a
1392        * maximum of around 100K. Also a minimum of 128 bytes is required on
1393        * Linux. */
1394
1395       if (sink->used_socket) {
1396         ret =
1397             setsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET,
1398             SO_SNDBUF, (void *) &sndsize, len);
1399         if (ret != 0) {
1400           GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL),
1401               ("Could not create a buffer of requested %d bytes, %d: %s",
1402                   sndsize, ret, g_strerror (errno)));
1403         }
1404       }
1405
1406       if (sink->used_socket_v6) {
1407         ret =
1408             setsockopt (g_socket_get_fd (sink->used_socket_v6), SOL_SOCKET,
1409             SO_SNDBUF, (void *) &sndsize, len);
1410         if (ret != 0) {
1411           GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL),
1412               ("Could not create a buffer of requested %d bytes, %d: %s",
1413                   sndsize, ret, g_strerror (errno)));
1414         }
1415       }
1416     }
1417
1418     /* read the value of the receive buffer. Note that on linux this returns 2x the
1419      * value we set because the kernel allocates extra memory for metadata.
1420      * The default on Linux is about 100K (which is about 50K without metadata) */
1421     if (sink->used_socket) {
1422       ret =
1423           getsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET,
1424           SO_SNDBUF, (void *) &sndsize, &len);
1425       if (ret == 0)
1426         GST_DEBUG_OBJECT (sink, "have UDP buffer of %d bytes", sndsize);
1427       else
1428         GST_DEBUG_OBJECT (sink, "could not get UDP buffer size");
1429     }
1430
1431     if (sink->used_socket_v6) {
1432       ret =
1433           getsockopt (g_socket_get_fd (sink->used_socket_v6), SOL_SOCKET,
1434           SO_SNDBUF, (void *) &sndsize, &len);
1435       if (ret == 0)
1436         GST_DEBUG_OBJECT (sink, "have UDPv6 buffer of %d bytes", sndsize);
1437       else
1438         GST_DEBUG_OBJECT (sink, "could not get UDPv6 buffer size");
1439     }
1440   }
1441 #endif
1442
1443 #ifdef SO_BINDTODEVICE
1444   if (sink->multi_iface) {
1445     if (sink->used_socket) {
1446       if (setsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET,
1447               SO_BINDTODEVICE, sink->multi_iface,
1448               strlen (sink->multi_iface)) < 0)
1449         GST_WARNING_OBJECT (sink, "setsockopt SO_BINDTODEVICE failed: %s",
1450             strerror (errno));
1451     }
1452     if (sink->used_socket_v6) {
1453       if (setsockopt (g_socket_get_fd (sink->used_socket_v6), SOL_SOCKET,
1454               SO_BINDTODEVICE, sink->multi_iface,
1455               strlen (sink->multi_iface)) < 0)
1456         GST_WARNING_OBJECT (sink, "setsockopt SO_BINDTODEVICE failed (v6): %s",
1457             strerror (errno));
1458     }
1459   }
1460 #endif
1461
1462   if (sink->used_socket)
1463     g_socket_set_broadcast (sink->used_socket, TRUE);
1464   if (sink->used_socket_v6)
1465     g_socket_set_broadcast (sink->used_socket_v6, TRUE);
1466
1467   sink->bytes_to_serve = 0;
1468   sink->bytes_served = 0;
1469
1470   gst_multiudpsink_setup_qos_dscp (sink, sink->used_socket);
1471   gst_multiudpsink_setup_qos_dscp (sink, sink->used_socket_v6);
1472
1473   /* look for multicast clients and join multicast groups appropriately
1474      set also ttl and multicast loopback delivery appropriately  */
1475   for (clients = sink->clients; clients; clients = g_list_next (clients)) {
1476     client = (GstUDPClient *) clients->data;
1477
1478     if (!gst_multiudpsink_configure_client (sink, client))
1479       return FALSE;
1480   }
1481   return TRUE;
1482
1483   /* ERRORS */
1484 no_socket:
1485   {
1486     GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1487         ("Could not create socket: %s", err->message));
1488     g_clear_error (&err);
1489     return FALSE;
1490   }
1491 bind_error:
1492   {
1493     GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1494         ("Failed to bind socket: %s", err->message));
1495     g_clear_error (&err);
1496     return FALSE;
1497   }
1498 name_resolve:
1499   {
1500     GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1501         ("Failed to resolve bind address %s: %s", sink->bind_address,
1502             err->message));
1503     g_clear_error (&err);
1504     return FALSE;
1505   }
1506 }
1507
1508 static gboolean
1509 gst_multiudpsink_stop (GstBaseSink * bsink)
1510 {
1511   GstMultiUDPSink *udpsink;
1512
1513   udpsink = GST_MULTIUDPSINK (bsink);
1514
1515   if (udpsink->used_socket) {
1516     if (udpsink->close_socket || !udpsink->external_socket) {
1517       GError *err = NULL;
1518
1519       if (!g_socket_close (udpsink->used_socket, &err)) {
1520         GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
1521         g_clear_error (&err);
1522       }
1523     }
1524
1525     g_object_unref (udpsink->used_socket);
1526     udpsink->used_socket = NULL;
1527   }
1528
1529   if (udpsink->used_socket_v6) {
1530     if (udpsink->close_socket || !udpsink->external_socket) {
1531       GError *err = NULL;
1532
1533       if (!g_socket_close (udpsink->used_socket_v6, &err)) {
1534         GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
1535         g_clear_error (&err);
1536       }
1537     }
1538
1539     g_object_unref (udpsink->used_socket_v6);
1540     udpsink->used_socket_v6 = NULL;
1541   }
1542
1543   return TRUE;
1544 }
1545
1546 static gint
1547 gst_udp_client_compare_socket_family (GstUDPClient * a, GstUDPClient * b)
1548 {
1549   GSocketFamily fa = g_socket_address_get_family (a->addr);
1550   GSocketFamily fb = g_socket_address_get_family (b->addr);
1551
1552   if (fa == fb)
1553     return 0;
1554
1555   /* a should go before b */
1556   if (fa == G_SOCKET_FAMILY_IPV4 && fb == G_SOCKET_FAMILY_IPV6)
1557     return -1;
1558
1559   /* b should go before a */
1560   return 1;
1561 }
1562
1563 static void
1564 gst_multiudpsink_add_internal (GstMultiUDPSink * sink, const gchar * host,
1565     gint port, gboolean lock)
1566 {
1567   GSocketFamily family;
1568   GstUDPClient *client;
1569   GstUDPClient udpclient;
1570   GTimeVal now;
1571   GList *find;
1572
1573   udpclient.host = (gchar *) host;
1574   udpclient.port = port;
1575
1576   GST_DEBUG_OBJECT (sink, "adding client on host %s, port %d", host, port);
1577
1578   if (lock)
1579     g_mutex_lock (&sink->client_lock);
1580
1581   find = g_list_find_custom (sink->clients, &udpclient,
1582       (GCompareFunc) client_compare);
1583
1584   if (!find) {
1585     find = g_list_find_custom (sink->clients_to_be_removed, &udpclient,
1586         (GCompareFunc) client_compare);
1587     if (find)
1588       gst_udp_client_ref (find->data);
1589   }
1590
1591   if (find) {
1592     client = (GstUDPClient *) find->data;
1593
1594     family = g_socket_address_get_family (client->addr);
1595
1596     GST_DEBUG_OBJECT (sink, "found %d existing clients with host %s, port %d",
1597         client->add_count, host, port);
1598   } else {
1599     client = gst_udp_client_new (sink, host, port);
1600     if (!client)
1601       goto error;
1602
1603     family = g_socket_address_get_family (client->addr);
1604
1605     g_get_current_time (&now);
1606     client->connect_time = GST_TIMEVAL_TO_TIME (now);
1607
1608     if (sink->used_socket)
1609       gst_multiudpsink_configure_client (sink, client);
1610
1611     GST_DEBUG_OBJECT (sink, "add client with host %s, port %d", host, port);
1612
1613     /* keep IPv4 clients at the beginning, and IPv6 at the end, we can make
1614      * use of this in gst_multiudpsink_render_buffers() */
1615     sink->clients = g_list_insert_sorted (sink->clients, client,
1616         (GCompareFunc) gst_udp_client_compare_socket_family);
1617
1618     if (family == G_SOCKET_FAMILY_IPV4)
1619       ++sink->num_v4_unique;
1620     else
1621       ++sink->num_v6_unique;
1622   }
1623
1624   ++client->add_count;
1625
1626   if (family == G_SOCKET_FAMILY_IPV4)
1627     ++sink->num_v4_all;
1628   else
1629     ++sink->num_v6_all;
1630
1631   if (lock)
1632     g_mutex_unlock (&sink->client_lock);
1633
1634   g_signal_emit (G_OBJECT (sink),
1635       gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED], 0, host, port);
1636
1637   GST_DEBUG_OBJECT (sink, "added client on host %s, port %d", host, port);
1638   return;
1639
1640   /* ERRORS */
1641 error:
1642   {
1643     GST_DEBUG_OBJECT (sink, "did not add client on host %s, port %d", host,
1644         port);
1645     if (lock)
1646       g_mutex_unlock (&sink->client_lock);
1647     return;
1648   }
1649 }
1650
1651 void
1652 gst_multiudpsink_add (GstMultiUDPSink * sink, const gchar * host, gint port)
1653 {
1654   gst_multiudpsink_add_internal (sink, host, port, TRUE);
1655 }
1656
1657 void
1658 gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port)
1659 {
1660   GSocketFamily family;
1661   GList *find;
1662   GstUDPClient udpclient;
1663   GstUDPClient *client;
1664   GTimeVal now;
1665
1666   udpclient.host = (gchar *) host;
1667   udpclient.port = port;
1668
1669   g_mutex_lock (&sink->client_lock);
1670   find = g_list_find_custom (sink->clients, &udpclient,
1671       (GCompareFunc) client_compare);
1672   if (!find)
1673     goto not_found;
1674
1675   client = (GstUDPClient *) find->data;
1676
1677   GST_DEBUG_OBJECT (sink, "found %d clients with host %s, port %d",
1678       client->add_count, host, port);
1679
1680   --client->add_count;
1681
1682   family = g_socket_address_get_family (client->addr);
1683   if (family == G_SOCKET_FAMILY_IPV4)
1684     --sink->num_v4_all;
1685   else
1686     --sink->num_v6_all;
1687
1688   if (client->add_count == 0) {
1689     GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr);
1690     GInetAddress *addr = g_inet_socket_address_get_address (saddr);
1691     GSocket *socket;
1692
1693     /* Select socket to send from for this address */
1694     if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
1695       socket = sink->used_socket_v6;
1696     else
1697       socket = sink->used_socket;
1698
1699     GST_DEBUG_OBJECT (sink, "remove client with host %s, port %d", host, port);
1700
1701     g_get_current_time (&now);
1702     client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
1703
1704     if (socket && sink->auto_multicast
1705         && g_inet_address_get_is_multicast (addr)) {
1706       GError *err = NULL;
1707
1708       if (!g_socket_leave_multicast_group (socket, addr, FALSE,
1709               sink->multi_iface, &err)) {
1710         GST_DEBUG_OBJECT (sink, "Failed to leave multicast group: %s",
1711             err->message);
1712         g_clear_error (&err);
1713       }
1714     }
1715
1716     if (family == G_SOCKET_FAMILY_IPV4)
1717       --sink->num_v4_unique;
1718     else
1719       --sink->num_v6_unique;
1720
1721     /* Keep state consistent for streaming thread, so remove from client list,
1722      * but keep it around until after the signal has been emitted, in case a
1723      * callback wants to get stats for that client or so */
1724     sink->clients = g_list_delete_link (sink->clients, find);
1725
1726     sink->clients_to_be_removed =
1727         g_list_prepend (sink->clients_to_be_removed, client);
1728
1729     /* Unlock to emit signal before we delete the actual client */
1730     g_mutex_unlock (&sink->client_lock);
1731     g_signal_emit (G_OBJECT (sink),
1732         gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED], 0, host, port);
1733     g_mutex_lock (&sink->client_lock);
1734
1735     sink->clients_to_be_removed =
1736         g_list_remove (sink->clients_to_be_removed, client);
1737
1738     gst_udp_client_unref (client);
1739   }
1740   g_mutex_unlock (&sink->client_lock);
1741
1742   return;
1743
1744   /* ERRORS */
1745 not_found:
1746   {
1747     g_mutex_unlock (&sink->client_lock);
1748     GST_WARNING_OBJECT (sink, "client at host %s, port %d not found",
1749         host, port);
1750     return;
1751   }
1752 }
1753
1754 static void
1755 gst_multiudpsink_clear_internal (GstMultiUDPSink * sink, gboolean lock)
1756 {
1757   GST_DEBUG_OBJECT (sink, "clearing");
1758   /* we only need to remove the client structure, there is no additional
1759    * socket or anything to free for UDP */
1760   if (lock)
1761     g_mutex_lock (&sink->client_lock);
1762   g_list_foreach (sink->clients, (GFunc) gst_udp_client_unref, sink);
1763   g_list_free (sink->clients);
1764   sink->clients = NULL;
1765   sink->num_v4_unique = 0;
1766   sink->num_v4_all = 0;
1767   sink->num_v6_unique = 0;
1768   sink->num_v6_all = 0;
1769   if (lock)
1770     g_mutex_unlock (&sink->client_lock);
1771 }
1772
1773 void
1774 gst_multiudpsink_clear (GstMultiUDPSink * sink)
1775 {
1776   gst_multiudpsink_clear_internal (sink, TRUE);
1777 }
1778
1779 GstStructure *
1780 gst_multiudpsink_get_stats (GstMultiUDPSink * sink, const gchar * host,
1781     gint port)
1782 {
1783   GstUDPClient *client;
1784   GstStructure *result = NULL;
1785   GstUDPClient udpclient;
1786   GList *find;
1787
1788   udpclient.host = (gchar *) host;
1789   udpclient.port = port;
1790
1791   g_mutex_lock (&sink->client_lock);
1792
1793   find = g_list_find_custom (sink->clients, &udpclient,
1794       (GCompareFunc) client_compare);
1795
1796   if (!find)
1797     find = g_list_find_custom (sink->clients_to_be_removed, &udpclient,
1798         (GCompareFunc) client_compare);
1799
1800   if (!find)
1801     goto not_found;
1802
1803   GST_DEBUG_OBJECT (sink, "stats for client with host %s, port %d", host, port);
1804
1805   client = (GstUDPClient *) find->data;
1806
1807   result = gst_structure_new_empty ("multiudpsink-stats");
1808
1809   gst_structure_set (result,
1810       "bytes-sent", G_TYPE_UINT64, client->bytes_sent,
1811       "packets-sent", G_TYPE_UINT64, client->packets_sent,
1812       "connect-time", G_TYPE_UINT64, client->connect_time,
1813       "disconnect-time", G_TYPE_UINT64, client->disconnect_time, NULL);
1814
1815   g_mutex_unlock (&sink->client_lock);
1816
1817   return result;
1818
1819   /* ERRORS */
1820 not_found:
1821   {
1822     g_mutex_unlock (&sink->client_lock);
1823     GST_WARNING_OBJECT (sink, "client with host %s, port %d not found",
1824         host, port);
1825     /* Apparently (see comment in gstmultifdsink.c) returning NULL from here may
1826      * confuse/break python bindings */
1827     return gst_structure_new_empty ("multiudpsink-stats");
1828   }
1829 }
1830
1831 static gboolean
1832 gst_multiudpsink_unlock (GstBaseSink * bsink)
1833 {
1834   GstMultiUDPSink *sink;
1835
1836   sink = GST_MULTIUDPSINK (bsink);
1837
1838   g_cancellable_cancel (sink->cancellable);
1839
1840   return TRUE;
1841 }
1842
1843 static gboolean
1844 gst_multiudpsink_unlock_stop (GstBaseSink * bsink)
1845 {
1846   GstMultiUDPSink *sink;
1847
1848   sink = GST_MULTIUDPSINK (bsink);
1849
1850   g_cancellable_reset (sink->cancellable);
1851
1852   return TRUE;
1853 }