remove unused enum items PROP_LAST
[platform/upstream/gst-plugins-good.git] / gst / udp / gstmultiudpsink.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  * Copyright (C) <2009> Jarkko Palviainen <jarkko.palviainen@sesca.com>
4  * Copyright (C) <2012> Collabora Ltd.
5  *   Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:element-multiudpsink
25  * @see_also: udpsink, multifdsink
26  *
27  * multiudpsink is a network sink that sends UDP packets to multiple
28  * clients.
29  * It can be combined with rtp payload encoders to implement RTP streaming.
30  */
31
32 #ifdef HAVE_CONFIG_H
33 #include "config.h"
34 #endif
35 #include "gstmultiudpsink.h"
36
37 #include <string.h>
38 #ifdef HAVE_SYS_SOCKET_H
39 #include <sys/socket.h>
40 #endif
41
42 #ifndef G_OS_WIN32
43 #include <netinet/in.h>
44 #endif
45
46 #include "gst/glib-compat-private.h"
47
48 GST_DEBUG_CATEGORY_STATIC (multiudpsink_debug);
49 #define GST_CAT_DEFAULT (multiudpsink_debug)
50
51 #define UDP_MAX_SIZE 65507
52
53 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
54     GST_PAD_SINK,
55     GST_PAD_ALWAYS,
56     GST_STATIC_CAPS_ANY);
57
58 /* MultiUDPSink signals and args */
59 enum
60 {
61   /* methods */
62   SIGNAL_ADD,
63   SIGNAL_REMOVE,
64   SIGNAL_CLEAR,
65   SIGNAL_GET_STATS,
66
67   /* signals */
68   SIGNAL_CLIENT_ADDED,
69   SIGNAL_CLIENT_REMOVED,
70
71   /* FILL ME */
72   LAST_SIGNAL
73 };
74
75 #define DEFAULT_SOCKET             NULL
76 #define DEFAULT_CLOSE_SOCKET       TRUE
77 #define DEFAULT_USED_SOCKET        NULL
78 #define DEFAULT_CLIENTS            NULL
79 /* FIXME, this should be disabled by default, we don't need to join a multicast
80  * group for sending, if this socket is also used for receiving, it should
81  * be configured in the element that does the receive. */
82 #define DEFAULT_AUTO_MULTICAST     TRUE
83 #define DEFAULT_MULTICAST_IFACE    NULL
84 #define DEFAULT_TTL                64
85 #define DEFAULT_TTL_MC             1
86 #define DEFAULT_LOOP               TRUE
87 #define DEFAULT_FORCE_IPV4         FALSE
88 #define DEFAULT_QOS_DSCP           -1
89 #define DEFAULT_SEND_DUPLICATES    TRUE
90 #define DEFAULT_BUFFER_SIZE        0
91 #define DEFAULT_BIND_ADDRESS       NULL
92 #define DEFAULT_BIND_PORT          0
93
94 enum
95 {
96   PROP_0,
97   PROP_BYTES_TO_SERVE,
98   PROP_BYTES_SERVED,
99   PROP_SOCKET,
100   PROP_SOCKET_V6,
101   PROP_CLOSE_SOCKET,
102   PROP_USED_SOCKET,
103   PROP_USED_SOCKET_V6,
104   PROP_CLIENTS,
105   PROP_AUTO_MULTICAST,
106   PROP_MULTICAST_IFACE,
107   PROP_TTL,
108   PROP_TTL_MC,
109   PROP_LOOP,
110   PROP_FORCE_IPV4,
111   PROP_QOS_DSCP,
112   PROP_SEND_DUPLICATES,
113   PROP_BUFFER_SIZE,
114   PROP_BIND_ADDRESS,
115   PROP_BIND_PORT
116 };
117
118 static void gst_multiudpsink_finalize (GObject * object);
119
120 static GstFlowReturn gst_multiudpsink_render (GstBaseSink * sink,
121     GstBuffer * buffer);
122 static GstFlowReturn gst_multiudpsink_render_list (GstBaseSink * bsink,
123     GstBufferList * buffer_list);
124
125 static gboolean gst_multiudpsink_start (GstBaseSink * bsink);
126 static gboolean gst_multiudpsink_stop (GstBaseSink * bsink);
127 static gboolean gst_multiudpsink_unlock (GstBaseSink * bsink);
128 static gboolean gst_multiudpsink_unlock_stop (GstBaseSink * bsink);
129
130 static void gst_multiudpsink_set_property (GObject * object, guint prop_id,
131     const GValue * value, GParamSpec * pspec);
132 static void gst_multiudpsink_get_property (GObject * object, guint prop_id,
133     GValue * value, GParamSpec * pspec);
134
135 static void gst_multiudpsink_add_internal (GstMultiUDPSink * sink,
136     const gchar * host, gint port, gboolean lock);
137 static void gst_multiudpsink_clear_internal (GstMultiUDPSink * sink,
138     gboolean lock);
139
140 static guint gst_multiudpsink_signals[LAST_SIGNAL] = { 0 };
141
142 #define gst_multiudpsink_parent_class parent_class
143 G_DEFINE_TYPE (GstMultiUDPSink, gst_multiudpsink, GST_TYPE_BASE_SINK);
144
145 static void
146 gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass)
147 {
148   GObjectClass *gobject_class;
149   GstElementClass *gstelement_class;
150   GstBaseSinkClass *gstbasesink_class;
151
152   gobject_class = (GObjectClass *) klass;
153   gstelement_class = (GstElementClass *) klass;
154   gstbasesink_class = (GstBaseSinkClass *) klass;
155
156   gobject_class->set_property = gst_multiudpsink_set_property;
157   gobject_class->get_property = gst_multiudpsink_get_property;
158   gobject_class->finalize = gst_multiudpsink_finalize;
159
160   /**
161    * GstMultiUDPSink::add:
162    * @gstmultiudpsink: the sink on which the signal is emitted
163    * @host: the hostname/IP address of the client to add
164    * @port: the port of the client to add
165    *
166    * Add a client with destination @host and @port to the list of
167    * clients. When the same host/port pair is added multiple times, the
168    * send-duplicates property defines if the packets are sent multiple times to
169    * the same host/port pair or not.
170    *
171    * When a host/port pair is added multiple times, an equal amount of remove
172    * calls must be performed to actually remove the host/port pair from the list
173    * of destinations.
174    */
175   gst_multiudpsink_signals[SIGNAL_ADD] =
176       g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
177       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
178       G_STRUCT_OFFSET (GstMultiUDPSinkClass, add),
179       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2,
180       G_TYPE_STRING, G_TYPE_INT);
181   /**
182    * GstMultiUDPSink::remove:
183    * @gstmultiudpsink: the sink on which the signal is emitted
184    * @host: the hostname/IP address of the client to remove
185    * @port: the port of the client to remove
186    *
187    * Remove the client with destination @host and @port from the list of
188    * clients.
189    */
190   gst_multiudpsink_signals[SIGNAL_REMOVE] =
191       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
192       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
193       G_STRUCT_OFFSET (GstMultiUDPSinkClass, remove),
194       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2,
195       G_TYPE_STRING, G_TYPE_INT);
196   /**
197    * GstMultiUDPSink::clear:
198    * @gstmultiudpsink: the sink on which the signal is emitted
199    *
200    * Clear the list of clients.
201    */
202   gst_multiudpsink_signals[SIGNAL_CLEAR] =
203       g_signal_new ("clear", G_TYPE_FROM_CLASS (klass),
204       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
205       G_STRUCT_OFFSET (GstMultiUDPSinkClass, clear),
206       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 0);
207   /**
208    * GstMultiUDPSink::get-stats:
209    * @gstmultiudpsink: the sink on which the signal is emitted
210    * @host: the hostname/IP address of the client to get stats on
211    * @port: the port of the client to get stats on
212    *
213    * Get the statistics of the client with destination @host and @port.
214    *
215    * Returns: a GstStructure: bytes_sent, packets_sent,
216    *           connect_time (in epoch seconds), disconnect_time (in epoch seconds)
217    */
218   gst_multiudpsink_signals[SIGNAL_GET_STATS] =
219       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
220       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
221       G_STRUCT_OFFSET (GstMultiUDPSinkClass, get_stats),
222       NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_STRUCTURE, 2,
223       G_TYPE_STRING, G_TYPE_INT);
224   /**
225    * GstMultiUDPSink::client-added:
226    * @gstmultiudpsink: the sink emitting the signal
227    * @host: the hostname/IP address of the added client
228    * @port: the port of the added client
229    *
230    * Signal emited when a new client is added to the list of
231    * clients.
232    */
233   gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED] =
234       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
235       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass, client_added),
236       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2,
237       G_TYPE_STRING, G_TYPE_INT);
238   /**
239    * GstMultiUDPSink::client-removed:
240    * @gstmultiudpsink: the sink emitting the signal
241    * @host: the hostname/IP address of the removed client
242    * @port: the port of the removed client
243    *
244    * Signal emited when a client is removed from the list of
245    * clients.
246    */
247   gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED] =
248       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
249       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass,
250           client_removed), NULL, NULL, g_cclosure_marshal_generic,
251       G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_INT);
252
253   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_TO_SERVE,
254       g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
255           "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
256           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
257   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_SERVED,
258       g_param_spec_uint64 ("bytes-served", "Bytes served",
259           "Total number of bytes sent to all clients", 0, G_MAXUINT64, 0,
260           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
261   g_object_class_install_property (gobject_class, PROP_SOCKET,
262       g_param_spec_object ("socket", "Socket Handle",
263           "Socket to use for UDP sending. (NULL == allocate)",
264           G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
265   g_object_class_install_property (gobject_class, PROP_SOCKET_V6,
266       g_param_spec_object ("socket-v6", "Socket Handle IPv6",
267           "Socket to use for UDPv6 sending. (NULL == allocate)",
268           G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
269   g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
270       g_param_spec_boolean ("close-socket", "Close socket",
271           "Close socket if passed as property on state change",
272           DEFAULT_CLOSE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
273   g_object_class_install_property (gobject_class, PROP_USED_SOCKET,
274       g_param_spec_object ("used-socket", "Used Socket Handle",
275           "Socket currently in use for UDP sending. (NULL == no socket)",
276           G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
277   g_object_class_install_property (gobject_class, PROP_USED_SOCKET_V6,
278       g_param_spec_object ("used-socket-v6", "Used Socket Handle IPv6",
279           "Socket currently in use for UDPv6 sending. (NULL == no socket)",
280           G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
281   g_object_class_install_property (gobject_class, PROP_CLIENTS,
282       g_param_spec_string ("clients", "Clients",
283           "A comma separated list of host:port pairs with destinations",
284           DEFAULT_CLIENTS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
285   g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
286       g_param_spec_boolean ("auto-multicast",
287           "Automatically join/leave multicast groups",
288           "Automatically join/leave the multicast groups, FALSE means user"
289           " has to do it himself", DEFAULT_AUTO_MULTICAST,
290           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
291   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
292       g_param_spec_string ("multicast-iface", "Multicast Interface",
293           "The network interface on which to join the multicast group",
294           DEFAULT_MULTICAST_IFACE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
295   g_object_class_install_property (gobject_class, PROP_TTL,
296       g_param_spec_int ("ttl", "Unicast TTL",
297           "Used for setting the unicast TTL parameter",
298           0, 255, DEFAULT_TTL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
299   g_object_class_install_property (gobject_class, PROP_TTL_MC,
300       g_param_spec_int ("ttl-mc", "Multicast TTL",
301           "Used for setting the multicast TTL parameter",
302           0, 255, DEFAULT_TTL_MC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
303   g_object_class_install_property (gobject_class, PROP_LOOP,
304       g_param_spec_boolean ("loop", "Multicast Loopback",
305           "Used for setting the multicast loop parameter. TRUE = enable,"
306           " FALSE = disable", DEFAULT_LOOP,
307           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
308   /**
309    * GstMultiUDPSink::force-ipv4:
310    *
311    * Force the use of an IPv4 socket.
312    *
313    * Since: 1.0.2
314    */
315 #ifndef GST_REMOVE_DEPRECATED
316   g_object_class_install_property (gobject_class, PROP_FORCE_IPV4,
317       g_param_spec_boolean ("force-ipv4", "Force IPv4",
318           "Forcing the use of an IPv4 socket (DEPRECATED, has no effect anymore)",
319           DEFAULT_FORCE_IPV4,
320           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
321 #endif
322   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_QOS_DSCP,
323       g_param_spec_int ("qos-dscp", "QoS diff srv code point",
324           "Quality of Service, differentiated services code point (-1 default)",
325           -1, 63, DEFAULT_QOS_DSCP,
326           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
327   /**
328    * GstMultiUDPSink::send-duplicates:
329    *
330    * When a host/port pair is added mutliple times, send the packet to the host
331    * multiple times as well.
332    */
333   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEND_DUPLICATES,
334       g_param_spec_boolean ("send-duplicates", "Send Duplicates",
335           "When a distination/port pair is added multiple times, send packets "
336           "multiple times as well", DEFAULT_SEND_DUPLICATES,
337           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
338
339   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
340       g_param_spec_int ("buffer-size", "Buffer Size",
341           "Size of the kernel send buffer in bytes, 0=default", 0, G_MAXINT,
342           DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
343
344   g_object_class_install_property (gobject_class, PROP_BIND_ADDRESS,
345       g_param_spec_string ("bind-address", "Bind Address",
346           "Address to bind the socket to", DEFAULT_BIND_ADDRESS,
347           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
348   g_object_class_install_property (gobject_class, PROP_BIND_PORT,
349       g_param_spec_int ("bind-port", "Bind Port",
350           "Port to bind the socket to", 0, G_MAXUINT16,
351           DEFAULT_BIND_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
352
353   gst_element_class_add_pad_template (gstelement_class,
354       gst_static_pad_template_get (&sink_template));
355
356   gst_element_class_set_static_metadata (gstelement_class, "UDP packet sender",
357       "Sink/Network",
358       "Send data over the network via UDP to one or multiple recipients "
359       "which can be added or removed at runtime using action signals",
360       "Wim Taymans <wim.taymans@gmail.com>");
361
362   gstbasesink_class->render = gst_multiudpsink_render;
363   gstbasesink_class->render_list = gst_multiudpsink_render_list;
364   gstbasesink_class->start = gst_multiudpsink_start;
365   gstbasesink_class->stop = gst_multiudpsink_stop;
366   gstbasesink_class->unlock = gst_multiudpsink_unlock;
367   gstbasesink_class->unlock_stop = gst_multiudpsink_unlock_stop;
368   klass->add = gst_multiudpsink_add;
369   klass->remove = gst_multiudpsink_remove;
370   klass->clear = gst_multiudpsink_clear;
371   klass->get_stats = gst_multiudpsink_get_stats;
372
373   GST_DEBUG_CATEGORY_INIT (multiudpsink_debug, "multiudpsink", 0, "UDP sink");
374 }
375
376 static void
377 gst_multiudpsink_init (GstMultiUDPSink * sink)
378 {
379   guint max_mem;
380
381   g_mutex_init (&sink->client_lock);
382   sink->clients = NULL;
383   sink->num_v4_unique = 0;
384   sink->num_v4_all = 0;
385   sink->num_v6_unique = 0;
386   sink->num_v6_all = 0;
387
388   sink->socket = DEFAULT_SOCKET;
389   sink->socket_v6 = DEFAULT_SOCKET;
390   sink->used_socket = DEFAULT_USED_SOCKET;
391   sink->used_socket_v6 = DEFAULT_USED_SOCKET;
392   sink->close_socket = DEFAULT_CLOSE_SOCKET;
393   sink->external_socket = (sink->socket != NULL);
394   sink->auto_multicast = DEFAULT_AUTO_MULTICAST;
395   sink->ttl = DEFAULT_TTL;
396   sink->ttl_mc = DEFAULT_TTL_MC;
397   sink->loop = DEFAULT_LOOP;
398   sink->force_ipv4 = DEFAULT_FORCE_IPV4;
399   sink->qos_dscp = DEFAULT_QOS_DSCP;
400   sink->send_duplicates = DEFAULT_SEND_DUPLICATES;
401   sink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
402
403   sink->cancellable = g_cancellable_new ();
404
405   /* pre-allocate OutputVector, MapInfo and OutputMessage arrays
406    * for use in the render and render_list functions */
407   max_mem = gst_buffer_get_max_memory ();
408
409   sink->n_vecs = max_mem;
410   sink->vecs = g_new (GOutputVector, sink->n_vecs);
411
412   sink->n_maps = max_mem;
413   sink->maps = g_new (GstMapInfo, sink->n_maps);
414
415   sink->n_messages = 1;
416   sink->messages = g_new (GstOutputMessage, sink->n_messages);
417
418   /* we assume that the number of memories per buffer can fit into a guint8 */
419   g_warn_if_fail (max_mem <= G_MAXUINT8);
420 }
421
422 static GstUDPClient *
423 gst_udp_client_new (GstMultiUDPSink * sink, const gchar * host, gint port)
424 {
425   GstUDPClient *client;
426   GInetAddress *addr;
427   GResolver *resolver;
428   GError *err = NULL;
429
430   addr = g_inet_address_new_from_string (host);
431   if (!addr) {
432     GList *results;
433
434     resolver = g_resolver_get_default ();
435     results =
436         g_resolver_lookup_by_name (resolver, host, sink->cancellable, &err);
437     if (!results)
438       goto name_resolve;
439     addr = G_INET_ADDRESS (g_object_ref (results->data));
440
441     g_resolver_free_addresses (results);
442     g_object_unref (resolver);
443   }
444 #ifndef GST_DISABLE_GST_DEBUG
445   {
446     gchar *ip = g_inet_address_to_string (addr);
447
448     GST_DEBUG_OBJECT (sink, "IP address for host %s is %s", host, ip);
449     g_free (ip);
450   }
451 #endif
452
453   client = g_slice_new0 (GstUDPClient);
454   client->ref_count = 1;
455   client->add_count = 0;
456   client->host = g_strdup (host);
457   client->port = port;
458   client->addr = g_inet_socket_address_new (addr, port);
459   g_object_unref (addr);
460
461   return client;
462
463 name_resolve:
464   {
465     g_object_unref (resolver);
466
467     return NULL;
468   }
469 }
470
471 /* call with client lock held */
472 static void
473 gst_udp_client_unref (GstUDPClient * client)
474 {
475   if (--client->ref_count == 0) {
476     g_object_unref (client->addr);
477     g_free (client->host);
478     g_slice_free (GstUDPClient, client);
479   }
480 }
481
482 /* call with client lock held */
483 static inline GstUDPClient *
484 gst_udp_client_ref (GstUDPClient * client)
485 {
486   ++client->ref_count;
487   return client;
488 }
489
490 static gint
491 client_compare (GstUDPClient * a, GstUDPClient * b)
492 {
493   if ((a->port == b->port) && (strcmp (a->host, b->host) == 0))
494     return 0;
495
496   return 1;
497 }
498
499 static void
500 gst_multiudpsink_finalize (GObject * object)
501 {
502   GstMultiUDPSink *sink;
503
504   sink = GST_MULTIUDPSINK (object);
505
506   g_list_foreach (sink->clients, (GFunc) gst_udp_client_unref, NULL);
507   g_list_free (sink->clients);
508
509   if (sink->socket)
510     g_object_unref (sink->socket);
511   sink->socket = NULL;
512
513   if (sink->socket_v6)
514     g_object_unref (sink->socket_v6);
515   sink->socket_v6 = NULL;
516
517   if (sink->used_socket)
518     g_object_unref (sink->used_socket);
519   sink->used_socket = NULL;
520
521   if (sink->used_socket_v6)
522     g_object_unref (sink->used_socket_v6);
523   sink->used_socket_v6 = NULL;
524
525   if (sink->cancellable)
526     g_object_unref (sink->cancellable);
527   sink->cancellable = NULL;
528
529   g_free (sink->multi_iface);
530   sink->multi_iface = NULL;
531
532   g_free (sink->vecs);
533   sink->vecs = NULL;
534   g_free (sink->maps);
535   sink->maps = NULL;
536   g_free (sink->messages);
537   sink->messages = NULL;
538
539   g_free (sink->bind_address);
540   sink->bind_address = NULL;
541
542   g_mutex_clear (&sink->client_lock);
543
544   G_OBJECT_CLASS (parent_class)->finalize (object);
545 }
546
547 /* replacement until we can depend unconditionally on the real one in GLib */
548 #ifndef HAVE_G_SOCKET_SEND_MESSAGES
549 #define g_socket_send_messages gst_socket_send_messages
550
551 static gint
552 gst_socket_send_messages (GSocket * socket, GstOutputMessage * messages,
553     guint num_messages, gint flags, GCancellable * cancellable, GError ** error)
554 {
555   gssize result;
556   gint i;
557
558   for (i = 0; i < num_messages; ++i) {
559     GstOutputMessage *msg = &messages[i];
560     GError *msg_error = NULL;
561
562     result = g_socket_send_message (socket, msg->address,
563         msg->vectors, msg->num_vectors,
564         msg->control_messages, msg->num_control_messages,
565         flags, cancellable, &msg_error);
566
567     if (result < 0) {
568       /* if we couldn't send all messages, just return how many we did
569        * manage to send, provided we managed to send at least one */
570       if (msg_error->code == G_IO_ERROR_WOULD_BLOCK && i > 0) {
571         g_error_free (msg_error);
572         return i;
573       } else {
574         g_propagate_error (error, msg_error);
575         return -1;
576       }
577     }
578
579     msg->bytes_sent = result;
580   }
581
582   return i;
583 }
584 #endif /* HAVE_G_SOCKET_SEND_MESSAGES */
585
586 static gsize
587 fill_vectors (GOutputVector * vecs, GstMapInfo * maps, guint n, GstBuffer * buf)
588 {
589   GstMemory *mem;
590   gsize size = 0;
591   guint i;
592
593   g_assert (gst_buffer_n_memory (buf) == n);
594
595   for (i = 0; i < n; ++i) {
596     mem = gst_buffer_peek_memory (buf, i);
597     if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) {
598       vecs[i].buffer = maps[i].data;
599       vecs[i].size = maps[i].size;
600     } else {
601       GST_WARNING ("Failed to map memory %p for reading", mem);
602       vecs[i].buffer = "";
603       vecs[i].size = 0;
604     }
605     size += vecs[i].size;
606   }
607
608   return size;
609 }
610
611 static gsize
612 gst_udp_calc_message_size (GstOutputMessage * msg)
613 {
614   gsize size = 0;
615   guint i;
616
617   for (i = 0; i < msg->num_vectors; ++i)
618     size += msg->vectors[i].size;
619
620   return size;
621 }
622
623 static gint
624 gst_udp_messsages_find_first_not_sent (GstOutputMessage * messages,
625     guint num_messages)
626 {
627   guint i;
628
629   for (i = 0; i < num_messages; ++i) {
630     GstOutputMessage *msg = &messages[i];
631
632     if (msg->bytes_sent == 0 && gst_udp_calc_message_size (msg) > 0)
633       return i;
634   }
635
636   return -1;
637 }
638
639 static inline gchar *
640 gst_udp_address_get_string (GSocketAddress * addr, gchar * s, gsize size)
641 {
642   GInetSocketAddress *isa = G_INET_SOCKET_ADDRESS (addr);
643   GInetAddress *ia;
644   gchar *addr_str;
645
646   ia = g_inet_socket_address_get_address (isa);
647   addr_str = g_inet_address_to_string (ia);
648   g_snprintf (s, size, "%s:%u", addr_str, g_inet_socket_address_get_port (isa));
649   g_free (addr_str);
650
651   return s;
652 }
653
654 /* Wrapper around g_socket_send_messages() plus error handling (ignoring).
655  * Returns FALSE if we got cancelled, otherwise TRUE. */
656 static gboolean
657 gst_multiudpsink_send_messages (GstMultiUDPSink * sink, GSocket * socket,
658     GstOutputMessage * messages, guint num_messages)
659 {
660   gboolean sent_max_size_warning = FALSE;
661
662   while (num_messages > 0) {
663     gchar astr[64] G_GNUC_UNUSED;
664     GError *err = NULL;
665     guint msg_size, skip, i;
666     gint ret, err_idx;
667
668     ret = g_socket_send_messages (socket, messages, num_messages, 0,
669         sink->cancellable, &err);
670
671     if (G_UNLIKELY (ret < 0)) {
672       GstOutputMessage *msg;
673
674       if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
675         g_clear_error (&err);
676         return FALSE;
677       }
678
679       err_idx = gst_udp_messsages_find_first_not_sent (messages, num_messages);
680       if (err_idx < 0)
681         break;
682
683       msg = &messages[err_idx];
684       msg_size = gst_udp_calc_message_size (msg);
685
686       GST_LOG_OBJECT (sink, "error sending %u bytes to client %s: %s", msg_size,
687           gst_udp_address_get_string (msg->address, astr, sizeof (astr)),
688           err->message);
689
690       skip = 1;
691       if (msg_size > UDP_MAX_SIZE) {
692         if (!sent_max_size_warning) {
693           GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
694               ("Attempting to send a UDP packets larger than maximum size "
695                   "(%u > %d)", msg_size, UDP_MAX_SIZE),
696               ("Reason: %s", err ? err->message : "unknown reason"));
697           sent_max_size_warning = FALSE;
698         }
699       } else {
700         GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
701             ("Error sending UDP packets"), ("client %s, reason: %s",
702                 gst_udp_address_get_string (msg->address, astr, sizeof (astr)),
703                 (err != NULL) ? err->message : "unknown reason"));
704
705         for (i = err_idx + 1; i < num_messages; ++i, ++skip) {
706           if (messages[i].address != msg->address)
707             break;
708         }
709         GST_DEBUG_OBJECT (sink, "skipping %d message(s) to same client", skip);
710       }
711
712       /* ignore any errors and try sending the rest */
713       g_clear_error (&err);
714       ret = skip;
715     }
716
717     g_assert (ret <= num_messages);
718
719     messages += ret;
720     num_messages -= ret;
721   }
722
723   return TRUE;
724 }
725
726 static GstFlowReturn
727 gst_multiudpsink_render_buffers (GstMultiUDPSink * sink, GstBuffer ** buffers,
728     guint num_buffers, guint8 * mem_nums, guint total_mem_num)
729 {
730   GstOutputMessage *msgs;
731   gboolean send_duplicates;
732   GstUDPClient **clients;
733   GOutputVector *vecs;
734   GstMapInfo *map_infos;
735   GstFlowReturn flow_ret;
736   guint num_addr_v4, num_addr_v6;
737   guint num_addr, num_msgs;
738   GError *err = NULL;
739   guint i, j, mem;
740   gsize size = 0;
741   GList *l;
742
743   send_duplicates = sink->send_duplicates;
744
745   g_mutex_lock (&sink->client_lock);
746
747   if (send_duplicates) {
748     num_addr_v4 = sink->num_v4_all;
749     num_addr_v6 = sink->num_v6_all;
750   } else {
751     num_addr_v4 = sink->num_v4_unique;
752     num_addr_v6 = sink->num_v6_unique;
753   }
754   num_addr = num_addr_v4 + num_addr_v6;
755
756   if (num_addr == 0)
757     goto no_clients;
758
759   clients = g_newa (GstUDPClient *, num_addr);
760   for (l = sink->clients, i = 0; l != NULL; l = l->next) {
761     GstUDPClient *client = l->data;
762
763     clients[i++] = gst_udp_client_ref (client);
764     for (j = 1; send_duplicates && j < client->add_count; ++j)
765       clients[i++] = gst_udp_client_ref (client);
766   }
767   g_assert_cmpuint (i, ==, num_addr);
768
769   g_mutex_unlock (&sink->client_lock);
770
771   GST_LOG_OBJECT (sink, "%u buffers, %u memories -> to be sent to %u clients",
772       num_buffers, total_mem_num, num_addr);
773
774   /* ensure our pre-allocated scratch space arrays are large enough */
775   if (sink->n_vecs < total_mem_num) {
776     sink->n_vecs = GST_ROUND_UP_16 (total_mem_num);
777     g_free (sink->vecs);
778     sink->vecs = g_new (GOutputVector, sink->n_vecs);
779   }
780   vecs = sink->vecs;
781
782   if (sink->n_maps < total_mem_num) {
783     sink->n_maps = GST_ROUND_UP_16 (total_mem_num);
784     g_free (sink->maps);
785     sink->maps = g_new (GstMapInfo, sink->n_maps);
786   }
787   map_infos = sink->maps;
788
789   num_msgs = num_addr * num_buffers;
790   if (sink->n_messages < num_msgs) {
791     sink->n_messages = GST_ROUND_UP_16 (num_msgs);
792     g_free (sink->messages);
793     sink->messages = g_new (GstOutputMessage, sink->n_messages);
794   }
795   msgs = sink->messages;
796
797   /* populate first num_buffers messages with output vectors for the buffers */
798   for (i = 0, mem = 0; i < num_buffers; ++i) {
799     size += fill_vectors (&vecs[mem], &map_infos[mem], mem_nums[i], buffers[i]);
800     msgs[i].vectors = &vecs[mem];
801     msgs[i].num_vectors = mem_nums[i];
802     msgs[i].num_control_messages = 0;
803     msgs[i].control_messages = NULL;
804     msgs[i].address = clients[0]->addr;
805     mem += mem_nums[i];
806   }
807
808   /* FIXME: how about some locking? (there wasn't any before either, but..) */
809   sink->bytes_to_serve += size;
810
811   /* now copy the pre-filled num_buffer messages over to the next num_buffer
812    * messages for the next client, where we also change the target adddress */
813   for (i = 1; i < num_addr; ++i) {
814     for (j = 0; j < num_buffers; ++j) {
815       msgs[i * num_buffers + j] = msgs[j];
816       msgs[i * num_buffers + j].address = clients[i]->addr;
817     }
818   }
819
820   /* now send it! */
821   {
822     gboolean ret;
823
824     /* no IPv4 socket? Send it all from the IPv6 socket then.. */
825     if (sink->used_socket == NULL) {
826       ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6,
827           msgs, num_msgs);
828     } else {
829       guint num_msgs_v4 = num_buffers * num_addr_v4;
830       guint num_msgs_v6 = num_buffers * num_addr_v6;
831
832       /* our client list is sorted with IPv4 clients first and IPv6 ones last */
833       ret = gst_multiudpsink_send_messages (sink, sink->used_socket,
834           msgs, num_msgs_v4);
835
836       if (!ret)
837         goto cancelled;
838
839       ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6,
840           msgs + num_msgs_v4, num_msgs_v6);
841     }
842
843     if (!ret)
844       goto cancelled;
845   }
846
847   flow_ret = GST_FLOW_OK;
848
849   /* now update stats */
850   g_mutex_lock (&sink->client_lock);
851
852   for (i = 0; i < num_addr; ++i) {
853     GstUDPClient *client = clients[i];
854
855     for (j = 0; j < num_buffers; ++j) {
856       gsize bytes_sent;
857
858       bytes_sent = msgs[i * num_buffers + j].bytes_sent;
859
860       client->bytes_sent += bytes_sent;
861       client->packets_sent++;
862       sink->bytes_served += bytes_sent;
863     }
864     gst_udp_client_unref (client);
865   }
866
867   g_mutex_unlock (&sink->client_lock);
868
869 out:
870
871   for (i = 0; i < mem; ++i)
872     gst_memory_unmap (map_infos[i].memory, &map_infos[i]);
873
874   return flow_ret;
875
876 no_clients:
877   {
878     g_mutex_unlock (&sink->client_lock);
879     GST_LOG_OBJECT (sink, "no clients");
880     return GST_FLOW_OK;
881   }
882 cancelled:
883   {
884     GST_INFO_OBJECT (sink, "cancelled");
885     g_clear_error (&err);
886     flow_ret = GST_FLOW_FLUSHING;
887
888     g_mutex_lock (&sink->client_lock);
889     for (i = 0; i < num_addr; ++i)
890       gst_udp_client_unref (clients[i]);
891     g_mutex_unlock (&sink->client_lock);
892     goto out;
893   }
894 }
895
896 static GstFlowReturn
897 gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
898 {
899   GstMultiUDPSink *sink;
900   GstBuffer **buffers;
901   GstFlowReturn flow;
902   guint8 *mem_nums;
903   guint total_mems;
904   guint i, num_buffers;
905
906   sink = GST_MULTIUDPSINK_CAST (bsink);
907
908   num_buffers = gst_buffer_list_length (buffer_list);
909   if (num_buffers == 0)
910     goto no_data;
911
912   buffers = g_newa (GstBuffer *, num_buffers);
913   mem_nums = g_newa (guint8, num_buffers);
914   for (i = 0, total_mems = 0; i < num_buffers; ++i) {
915     buffers[i] = gst_buffer_list_get (buffer_list, i);
916     mem_nums[i] = gst_buffer_n_memory (buffers[i]);
917     total_mems += mem_nums[i];
918   }
919
920   flow = gst_multiudpsink_render_buffers (sink, buffers, num_buffers,
921       mem_nums, total_mems);
922
923   return flow;
924
925 no_data:
926   {
927     GST_LOG_OBJECT (sink, "empty buffer");
928     return GST_FLOW_OK;
929   }
930 }
931
932 static GstFlowReturn
933 gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
934 {
935   GstMultiUDPSink *sink;
936   GstFlowReturn flow;
937   guint8 n_mem;
938
939   sink = GST_MULTIUDPSINK_CAST (bsink);
940
941   n_mem = gst_buffer_n_memory (buffer);
942
943   if (n_mem > 0)
944     flow = gst_multiudpsink_render_buffers (sink, &buffer, 1, &n_mem, n_mem);
945   else
946     flow = GST_FLOW_OK;
947
948   return flow;
949 }
950
951 static void
952 gst_multiudpsink_set_clients_string (GstMultiUDPSink * sink,
953     const gchar * string)
954 {
955   gchar **clients;
956   gint i;
957
958   clients = g_strsplit (string, ",", 0);
959
960   g_mutex_lock (&sink->client_lock);
961   /* clear all existing clients */
962   gst_multiudpsink_clear_internal (sink, FALSE);
963   for (i = 0; clients[i]; i++) {
964     gchar *host, *p;
965     gint64 port = 0;
966
967     host = clients[i];
968     p = strstr (clients[i], ":");
969     if (p != NULL) {
970       *p = '\0';
971       port = g_ascii_strtoll (p + 1, NULL, 10);
972     }
973     if (port != 0)
974       gst_multiudpsink_add_internal (sink, host, port, FALSE);
975   }
976   g_mutex_unlock (&sink->client_lock);
977
978   g_strfreev (clients);
979 }
980
981 static gchar *
982 gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink)
983 {
984   GString *str;
985   GList *clients;
986
987   str = g_string_new ("");
988
989   g_mutex_lock (&sink->client_lock);
990   clients = sink->clients;
991   while (clients) {
992     GstUDPClient *client;
993     gint count;
994
995     client = (GstUDPClient *) clients->data;
996
997     clients = g_list_next (clients);
998
999     count = client->add_count;
1000     while (count--) {
1001       g_string_append_printf (str, "%s:%d%s", client->host, client->port,
1002           (clients || count > 1 ? "," : ""));
1003     }
1004   }
1005   g_mutex_unlock (&sink->client_lock);
1006
1007   return g_string_free (str, FALSE);
1008 }
1009
1010 static void
1011 gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink, GSocket * socket)
1012 {
1013   /* don't touch on -1 */
1014   if (sink->qos_dscp < 0)
1015     return;
1016
1017   if (socket == NULL)
1018     return;
1019
1020 #ifdef IP_TOS
1021   {
1022     gint tos;
1023     gint fd;
1024
1025     fd = g_socket_get_fd (socket);
1026
1027     GST_DEBUG_OBJECT (sink, "setting TOS to %d", sink->qos_dscp);
1028
1029     /* Extract and shift 6 bits of DSFIELD */
1030     tos = (sink->qos_dscp & 0x3f) << 2;
1031
1032     if (setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) {
1033       GST_ERROR_OBJECT (sink, "could not set TOS: %s", g_strerror (errno));
1034     }
1035 #ifdef IPV6_TCLASS
1036     if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0) {
1037       GST_ERROR_OBJECT (sink, "could not set TCLASS: %s", g_strerror (errno));
1038     }
1039 #endif
1040   }
1041 #endif
1042 }
1043
1044 static void
1045 gst_multiudpsink_set_property (GObject * object, guint prop_id,
1046     const GValue * value, GParamSpec * pspec)
1047 {
1048   GstMultiUDPSink *udpsink;
1049
1050   udpsink = GST_MULTIUDPSINK (object);
1051
1052   switch (prop_id) {
1053     case PROP_SOCKET:
1054       if (udpsink->socket != NULL && udpsink->socket != udpsink->used_socket &&
1055           udpsink->close_socket) {
1056         GError *err = NULL;
1057
1058         if (!g_socket_close (udpsink->socket, &err)) {
1059           GST_ERROR ("failed to close socket %p: %s", udpsink->socket,
1060               err->message);
1061           g_clear_error (&err);
1062         }
1063       }
1064       if (udpsink->socket)
1065         g_object_unref (udpsink->socket);
1066       udpsink->socket = g_value_dup_object (value);
1067       GST_DEBUG_OBJECT (udpsink, "setting socket to %p", udpsink->socket);
1068       break;
1069     case PROP_SOCKET_V6:
1070       if (udpsink->socket_v6 != NULL
1071           && udpsink->socket_v6 != udpsink->used_socket_v6
1072           && udpsink->close_socket) {
1073         GError *err = NULL;
1074
1075         if (!g_socket_close (udpsink->socket_v6, &err)) {
1076           GST_ERROR ("failed to close socket %p: %s", udpsink->socket_v6,
1077               err->message);
1078           g_clear_error (&err);
1079         }
1080       }
1081       if (udpsink->socket_v6)
1082         g_object_unref (udpsink->socket_v6);
1083       udpsink->socket_v6 = g_value_dup_object (value);
1084       GST_DEBUG_OBJECT (udpsink, "setting socket to %p", udpsink->socket_v6);
1085       break;
1086     case PROP_CLOSE_SOCKET:
1087       udpsink->close_socket = g_value_get_boolean (value);
1088       break;
1089     case PROP_CLIENTS:
1090       gst_multiudpsink_set_clients_string (udpsink, g_value_get_string (value));
1091       break;
1092     case PROP_AUTO_MULTICAST:
1093       udpsink->auto_multicast = g_value_get_boolean (value);
1094       break;
1095     case PROP_MULTICAST_IFACE:
1096       g_free (udpsink->multi_iface);
1097
1098       if (g_value_get_string (value) == NULL)
1099         udpsink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
1100       else
1101         udpsink->multi_iface = g_value_dup_string (value);
1102       break;
1103     case PROP_TTL:
1104       udpsink->ttl = g_value_get_int (value);
1105       break;
1106     case PROP_TTL_MC:
1107       udpsink->ttl_mc = g_value_get_int (value);
1108       break;
1109     case PROP_LOOP:
1110       udpsink->loop = g_value_get_boolean (value);
1111       break;
1112     case PROP_FORCE_IPV4:
1113       udpsink->force_ipv4 = g_value_get_boolean (value);
1114       break;
1115     case PROP_QOS_DSCP:
1116       udpsink->qos_dscp = g_value_get_int (value);
1117       gst_multiudpsink_setup_qos_dscp (udpsink, udpsink->used_socket);
1118       gst_multiudpsink_setup_qos_dscp (udpsink, udpsink->used_socket_v6);
1119       break;
1120     case PROP_SEND_DUPLICATES:
1121       udpsink->send_duplicates = g_value_get_boolean (value);
1122       break;
1123     case PROP_BUFFER_SIZE:
1124       udpsink->buffer_size = g_value_get_int (value);
1125       break;
1126     case PROP_BIND_ADDRESS:
1127       g_free (udpsink->bind_address);
1128       udpsink->bind_address = g_value_dup_string (value);
1129       break;
1130     case PROP_BIND_PORT:
1131       udpsink->bind_port = g_value_get_int (value);
1132       break;
1133     default:
1134       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1135       break;
1136   }
1137 }
1138
1139 static void
1140 gst_multiudpsink_get_property (GObject * object, guint prop_id, GValue * value,
1141     GParamSpec * pspec)
1142 {
1143   GstMultiUDPSink *udpsink;
1144
1145   udpsink = GST_MULTIUDPSINK (object);
1146
1147   switch (prop_id) {
1148     case PROP_BYTES_TO_SERVE:
1149       g_value_set_uint64 (value, udpsink->bytes_to_serve);
1150       break;
1151     case PROP_BYTES_SERVED:
1152       g_value_set_uint64 (value, udpsink->bytes_served);
1153       break;
1154     case PROP_SOCKET:
1155       g_value_set_object (value, udpsink->socket);
1156       break;
1157     case PROP_SOCKET_V6:
1158       g_value_set_object (value, udpsink->socket_v6);
1159       break;
1160     case PROP_CLOSE_SOCKET:
1161       g_value_set_boolean (value, udpsink->close_socket);
1162       break;
1163     case PROP_USED_SOCKET:
1164       g_value_set_object (value, udpsink->used_socket);
1165       break;
1166     case PROP_USED_SOCKET_V6:
1167       g_value_set_object (value, udpsink->used_socket_v6);
1168       break;
1169     case PROP_CLIENTS:
1170       g_value_take_string (value,
1171           gst_multiudpsink_get_clients_string (udpsink));
1172       break;
1173     case PROP_AUTO_MULTICAST:
1174       g_value_set_boolean (value, udpsink->auto_multicast);
1175       break;
1176     case PROP_MULTICAST_IFACE:
1177       g_value_set_string (value, udpsink->multi_iface);
1178       break;
1179     case PROP_TTL:
1180       g_value_set_int (value, udpsink->ttl);
1181       break;
1182     case PROP_TTL_MC:
1183       g_value_set_int (value, udpsink->ttl_mc);
1184       break;
1185     case PROP_LOOP:
1186       g_value_set_boolean (value, udpsink->loop);
1187       break;
1188     case PROP_FORCE_IPV4:
1189       g_value_set_boolean (value, udpsink->force_ipv4);
1190       break;
1191     case PROP_QOS_DSCP:
1192       g_value_set_int (value, udpsink->qos_dscp);
1193       break;
1194     case PROP_SEND_DUPLICATES:
1195       g_value_set_boolean (value, udpsink->send_duplicates);
1196       break;
1197     case PROP_BUFFER_SIZE:
1198       g_value_set_int (value, udpsink->buffer_size);
1199       break;
1200     case PROP_BIND_ADDRESS:
1201       g_value_set_string (value, udpsink->bind_address);
1202       break;
1203     case PROP_BIND_PORT:
1204       g_value_set_int (value, udpsink->bind_port);
1205       break;
1206     default:
1207       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1208       break;
1209   }
1210 }
1211
1212 static gboolean
1213 gst_multiudpsink_configure_client (GstMultiUDPSink * sink,
1214     GstUDPClient * client)
1215 {
1216   GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr);
1217   GInetAddress *addr = g_inet_socket_address_get_address (saddr);
1218   GSocketFamily family = g_socket_address_get_family (G_SOCKET_ADDRESS (saddr));
1219   GSocket *socket;
1220   GError *err = NULL;
1221
1222   GST_DEBUG_OBJECT (sink, "configuring client %p", client);
1223
1224   if (family == G_SOCKET_FAMILY_IPV6 && !sink->used_socket_v6)
1225     goto invalid_family;
1226
1227   /* Select socket to send from for this address */
1228   if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
1229     socket = sink->used_socket_v6;
1230   else
1231     socket = sink->used_socket;
1232
1233   if (g_inet_address_get_is_multicast (addr)) {
1234     GST_DEBUG_OBJECT (sink, "we have a multicast client %p", client);
1235     if (sink->auto_multicast) {
1236       GST_DEBUG_OBJECT (sink, "autojoining group");
1237       if (!g_socket_join_multicast_group (socket, addr, FALSE,
1238               sink->multi_iface, &err))
1239         goto join_group_failed;
1240     }
1241     GST_DEBUG_OBJECT (sink, "setting loop to %d", sink->loop);
1242     g_socket_set_multicast_loopback (socket, sink->loop);
1243     GST_DEBUG_OBJECT (sink, "setting ttl to %d", sink->ttl_mc);
1244     g_socket_set_multicast_ttl (socket, sink->ttl_mc);
1245   } else {
1246     GST_DEBUG_OBJECT (sink, "setting unicast ttl to %d", sink->ttl);
1247     g_socket_set_ttl (socket, sink->ttl);
1248   }
1249   return TRUE;
1250
1251   /* ERRORS */
1252 join_group_failed:
1253   {
1254     gst_multiudpsink_stop (GST_BASE_SINK (sink));
1255     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
1256         ("Could not join multicast group: %s",
1257             err ? err->message : "unknown reason"));
1258     g_clear_error (&err);
1259     return FALSE;
1260   }
1261 invalid_family:
1262   {
1263     gst_multiudpsink_stop (GST_BASE_SINK (sink));
1264     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
1265         ("Invalid address family (got %d)", family));
1266     return FALSE;
1267   }
1268 }
1269
1270 /* create a socket for sending to remote machine */
1271 static gboolean
1272 gst_multiudpsink_start (GstBaseSink * bsink)
1273 {
1274   GstMultiUDPSink *sink;
1275   GList *clients;
1276   GstUDPClient *client;
1277   GError *err = NULL;
1278
1279   sink = GST_MULTIUDPSINK (bsink);
1280
1281   sink->external_socket = FALSE;
1282
1283   if (sink->socket) {
1284     GST_DEBUG_OBJECT (sink, "using configured socket");
1285     if (g_socket_get_family (sink->socket) == G_SOCKET_FAMILY_IPV6) {
1286       sink->used_socket_v6 = G_SOCKET (g_object_ref (sink->socket));
1287       sink->external_socket = TRUE;
1288     } else {
1289       sink->used_socket = G_SOCKET (g_object_ref (sink->socket));
1290       sink->external_socket = TRUE;
1291     }
1292   }
1293
1294   if (sink->socket_v6) {
1295     GST_DEBUG_OBJECT (sink, "using configured IPv6 socket");
1296     g_return_val_if_fail (g_socket_get_family (sink->socket) !=
1297         G_SOCKET_FAMILY_IPV6, FALSE);
1298
1299     if (sink->used_socket_v6 && sink->used_socket_v6 != sink->socket_v6) {
1300       GST_ERROR_OBJECT (sink,
1301           "Provided different IPv6 sockets in socket and socket-v6 properties");
1302       return FALSE;
1303     }
1304
1305     sink->used_socket_v6 = G_SOCKET (g_object_ref (sink->socket_v6));
1306     sink->external_socket = TRUE;
1307   }
1308
1309   if (!sink->used_socket && !sink->used_socket_v6) {
1310     GSocketAddress *bind_addr;
1311     GInetAddress *bind_iaddr;
1312
1313     if (sink->bind_address) {
1314       GSocketFamily family;
1315
1316       bind_iaddr = g_inet_address_new_from_string (sink->bind_address);
1317       if (!bind_iaddr) {
1318         GList *results;
1319         GResolver *resolver;
1320
1321         resolver = g_resolver_get_default ();
1322         results =
1323             g_resolver_lookup_by_name (resolver, sink->bind_address,
1324             sink->cancellable, &err);
1325         if (!results) {
1326           g_object_unref (resolver);
1327           goto name_resolve;
1328         }
1329         bind_iaddr = G_INET_ADDRESS (g_object_ref (results->data));
1330         g_resolver_free_addresses (results);
1331         g_object_unref (resolver);
1332       }
1333
1334       bind_addr = g_inet_socket_address_new (bind_iaddr, sink->bind_port);
1335       g_object_unref (bind_iaddr);
1336       family = g_socket_address_get_family (G_SOCKET_ADDRESS (bind_addr));
1337
1338       if ((sink->used_socket =
1339               g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1340                   G_SOCKET_PROTOCOL_UDP, &err)) == NULL) {
1341         g_object_unref (bind_addr);
1342         goto no_socket;
1343       }
1344
1345       g_socket_bind (sink->used_socket, bind_addr, TRUE, &err);
1346       if (err != NULL)
1347         goto bind_error;
1348     } else {
1349       /* create sender sockets if none available */
1350       if ((sink->used_socket = g_socket_new (G_SOCKET_FAMILY_IPV4,
1351                   G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
1352         goto no_socket;
1353
1354       bind_iaddr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
1355       bind_addr = g_inet_socket_address_new (bind_iaddr, sink->bind_port);
1356       g_socket_bind (sink->used_socket, bind_addr, TRUE, &err);
1357       g_object_unref (bind_addr);
1358       g_object_unref (bind_iaddr);
1359       if (err != NULL)
1360         goto bind_error;
1361
1362       if ((sink->used_socket_v6 = g_socket_new (G_SOCKET_FAMILY_IPV6,
1363                   G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP,
1364                   &err)) == NULL) {
1365         GST_INFO_OBJECT (sink, "Failed to create IPv6 socket: %s",
1366             err->message);
1367         g_clear_error (&err);
1368       } else {
1369         bind_iaddr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV6);
1370         bind_addr = g_inet_socket_address_new (bind_iaddr, sink->bind_port);
1371         g_socket_bind (sink->used_socket_v6, bind_addr, TRUE, &err);
1372         g_object_unref (bind_addr);
1373         g_object_unref (bind_iaddr);
1374         if (err != NULL)
1375           goto bind_error;
1376       }
1377     }
1378   }
1379 #ifdef SO_SNDBUF
1380   {
1381     socklen_t len;
1382     gint sndsize, ret;
1383
1384     len = sizeof (sndsize);
1385     if (sink->buffer_size != 0) {
1386       sndsize = sink->buffer_size;
1387
1388       GST_DEBUG_OBJECT (sink, "setting udp buffer of %d bytes", sndsize);
1389       /* set buffer size, Note that on Linux this is typically limited to a
1390        * maximum of around 100K. Also a minimum of 128 bytes is required on
1391        * Linux. */
1392
1393       if (sink->used_socket) {
1394         ret =
1395             setsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET,
1396             SO_SNDBUF, (void *) &sndsize, len);
1397         if (ret != 0) {
1398           GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL),
1399               ("Could not create a buffer of requested %d bytes, %d: %s",
1400                   sndsize, ret, g_strerror (errno)));
1401         }
1402       }
1403
1404       if (sink->used_socket_v6) {
1405         ret =
1406             setsockopt (g_socket_get_fd (sink->used_socket_v6), SOL_SOCKET,
1407             SO_SNDBUF, (void *) &sndsize, len);
1408         if (ret != 0) {
1409           GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL),
1410               ("Could not create a buffer of requested %d bytes, %d: %s",
1411                   sndsize, ret, g_strerror (errno)));
1412         }
1413       }
1414     }
1415
1416     /* read the value of the receive buffer. Note that on linux this returns 2x the
1417      * value we set because the kernel allocates extra memory for metadata.
1418      * The default on Linux is about 100K (which is about 50K without metadata) */
1419     if (sink->used_socket) {
1420       ret =
1421           getsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET,
1422           SO_SNDBUF, (void *) &sndsize, &len);
1423       if (ret == 0)
1424         GST_DEBUG_OBJECT (sink, "have UDP buffer of %d bytes", sndsize);
1425       else
1426         GST_DEBUG_OBJECT (sink, "could not get UDP buffer size");
1427     }
1428
1429     if (sink->used_socket_v6) {
1430       ret =
1431           getsockopt (g_socket_get_fd (sink->used_socket_v6), SOL_SOCKET,
1432           SO_SNDBUF, (void *) &sndsize, &len);
1433       if (ret == 0)
1434         GST_DEBUG_OBJECT (sink, "have UDPv6 buffer of %d bytes", sndsize);
1435       else
1436         GST_DEBUG_OBJECT (sink, "could not get UDPv6 buffer size");
1437     }
1438   }
1439 #endif
1440
1441 #ifdef SO_BINDTODEVICE
1442   if (sink->multi_iface) {
1443     if (sink->used_socket) {
1444       if (setsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET,
1445               SO_BINDTODEVICE, sink->multi_iface,
1446               strlen (sink->multi_iface)) < 0)
1447         GST_WARNING_OBJECT (sink, "setsockopt SO_BINDTODEVICE failed: %s",
1448             strerror (errno));
1449     }
1450     if (sink->used_socket_v6) {
1451       if (setsockopt (g_socket_get_fd (sink->used_socket_v6), SOL_SOCKET,
1452               SO_BINDTODEVICE, sink->multi_iface,
1453               strlen (sink->multi_iface)) < 0)
1454         GST_WARNING_OBJECT (sink, "setsockopt SO_BINDTODEVICE failed (v6): %s",
1455             strerror (errno));
1456     }
1457   }
1458 #endif
1459
1460   if (sink->used_socket)
1461     g_socket_set_broadcast (sink->used_socket, TRUE);
1462   if (sink->used_socket_v6)
1463     g_socket_set_broadcast (sink->used_socket_v6, TRUE);
1464
1465   sink->bytes_to_serve = 0;
1466   sink->bytes_served = 0;
1467
1468   gst_multiudpsink_setup_qos_dscp (sink, sink->used_socket);
1469   gst_multiudpsink_setup_qos_dscp (sink, sink->used_socket_v6);
1470
1471   /* look for multicast clients and join multicast groups appropriately
1472      set also ttl and multicast loopback delivery appropriately  */
1473   for (clients = sink->clients; clients; clients = g_list_next (clients)) {
1474     client = (GstUDPClient *) clients->data;
1475
1476     if (!gst_multiudpsink_configure_client (sink, client))
1477       return FALSE;
1478   }
1479   return TRUE;
1480
1481   /* ERRORS */
1482 no_socket:
1483   {
1484     GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1485         ("Could not create socket: %s", err->message));
1486     g_clear_error (&err);
1487     return FALSE;
1488   }
1489 bind_error:
1490   {
1491     GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1492         ("Failed to bind socket: %s", err->message));
1493     g_clear_error (&err);
1494     return FALSE;
1495   }
1496 name_resolve:
1497   {
1498     GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1499         ("Failed to resolve bind address %s: %s", sink->bind_address,
1500             err->message));
1501     g_clear_error (&err);
1502     return FALSE;
1503   }
1504 }
1505
1506 static gboolean
1507 gst_multiudpsink_stop (GstBaseSink * bsink)
1508 {
1509   GstMultiUDPSink *udpsink;
1510
1511   udpsink = GST_MULTIUDPSINK (bsink);
1512
1513   if (udpsink->used_socket) {
1514     if (udpsink->close_socket || !udpsink->external_socket) {
1515       GError *err = NULL;
1516
1517       if (!g_socket_close (udpsink->used_socket, &err)) {
1518         GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
1519         g_clear_error (&err);
1520       }
1521     }
1522
1523     g_object_unref (udpsink->used_socket);
1524     udpsink->used_socket = NULL;
1525   }
1526
1527   if (udpsink->used_socket_v6) {
1528     if (udpsink->close_socket || !udpsink->external_socket) {
1529       GError *err = NULL;
1530
1531       if (!g_socket_close (udpsink->used_socket_v6, &err)) {
1532         GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
1533         g_clear_error (&err);
1534       }
1535     }
1536
1537     g_object_unref (udpsink->used_socket_v6);
1538     udpsink->used_socket_v6 = NULL;
1539   }
1540
1541   return TRUE;
1542 }
1543
1544 static gint
1545 gst_udp_client_compare_socket_family (GstUDPClient * a, GstUDPClient * b)
1546 {
1547   GSocketFamily fa = g_socket_address_get_family (a->addr);
1548   GSocketFamily fb = g_socket_address_get_family (b->addr);
1549
1550   if (fa == fb)
1551     return 0;
1552
1553   /* a should go before b */
1554   if (fa == G_SOCKET_FAMILY_IPV4 && fb == G_SOCKET_FAMILY_IPV6)
1555     return -1;
1556
1557   /* b should go before a */
1558   return 1;
1559 }
1560
1561 static void
1562 gst_multiudpsink_add_internal (GstMultiUDPSink * sink, const gchar * host,
1563     gint port, gboolean lock)
1564 {
1565   GSocketFamily family;
1566   GstUDPClient *client;
1567   GstUDPClient udpclient;
1568   GTimeVal now;
1569   GList *find;
1570
1571   udpclient.host = (gchar *) host;
1572   udpclient.port = port;
1573
1574   GST_DEBUG_OBJECT (sink, "adding client on host %s, port %d", host, port);
1575
1576   if (lock)
1577     g_mutex_lock (&sink->client_lock);
1578
1579   find = g_list_find_custom (sink->clients, &udpclient,
1580       (GCompareFunc) client_compare);
1581
1582   if (!find) {
1583     find = g_list_find_custom (sink->clients_to_be_removed, &udpclient,
1584         (GCompareFunc) client_compare);
1585     if (find)
1586       gst_udp_client_ref (find->data);
1587   }
1588
1589   if (find) {
1590     client = (GstUDPClient *) find->data;
1591
1592     family = g_socket_address_get_family (client->addr);
1593
1594     GST_DEBUG_OBJECT (sink, "found %d existing clients with host %s, port %d",
1595         client->add_count, host, port);
1596   } else {
1597     client = gst_udp_client_new (sink, host, port);
1598     if (!client)
1599       goto error;
1600
1601     family = g_socket_address_get_family (client->addr);
1602
1603     g_get_current_time (&now);
1604     client->connect_time = GST_TIMEVAL_TO_TIME (now);
1605
1606     if (sink->used_socket)
1607       gst_multiudpsink_configure_client (sink, client);
1608
1609     GST_DEBUG_OBJECT (sink, "add client with host %s, port %d", host, port);
1610
1611     /* keep IPv4 clients at the beginning, and IPv6 at the end, we can make
1612      * use of this in gst_multiudpsink_render_buffers() */
1613     sink->clients = g_list_insert_sorted (sink->clients, client,
1614         (GCompareFunc) gst_udp_client_compare_socket_family);
1615
1616     if (family == G_SOCKET_FAMILY_IPV4)
1617       ++sink->num_v4_unique;
1618     else
1619       ++sink->num_v6_unique;
1620   }
1621
1622   ++client->add_count;
1623
1624   if (family == G_SOCKET_FAMILY_IPV4)
1625     ++sink->num_v4_all;
1626   else
1627     ++sink->num_v6_all;
1628
1629   if (lock)
1630     g_mutex_unlock (&sink->client_lock);
1631
1632   g_signal_emit (G_OBJECT (sink),
1633       gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED], 0, host, port);
1634
1635   GST_DEBUG_OBJECT (sink, "added client on host %s, port %d", host, port);
1636   return;
1637
1638   /* ERRORS */
1639 error:
1640   {
1641     GST_DEBUG_OBJECT (sink, "did not add client on host %s, port %d", host,
1642         port);
1643     if (lock)
1644       g_mutex_unlock (&sink->client_lock);
1645     return;
1646   }
1647 }
1648
1649 void
1650 gst_multiudpsink_add (GstMultiUDPSink * sink, const gchar * host, gint port)
1651 {
1652   gst_multiudpsink_add_internal (sink, host, port, TRUE);
1653 }
1654
1655 void
1656 gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port)
1657 {
1658   GSocketFamily family;
1659   GList *find;
1660   GstUDPClient udpclient;
1661   GstUDPClient *client;
1662   GTimeVal now;
1663
1664   udpclient.host = (gchar *) host;
1665   udpclient.port = port;
1666
1667   g_mutex_lock (&sink->client_lock);
1668   find = g_list_find_custom (sink->clients, &udpclient,
1669       (GCompareFunc) client_compare);
1670   if (!find)
1671     goto not_found;
1672
1673   client = (GstUDPClient *) find->data;
1674
1675   GST_DEBUG_OBJECT (sink, "found %d clients with host %s, port %d",
1676       client->add_count, host, port);
1677
1678   --client->add_count;
1679
1680   family = g_socket_address_get_family (client->addr);
1681   if (family == G_SOCKET_FAMILY_IPV4)
1682     --sink->num_v4_all;
1683   else
1684     --sink->num_v6_all;
1685
1686   if (client->add_count == 0) {
1687     GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr);
1688     GInetAddress *addr = g_inet_socket_address_get_address (saddr);
1689     GSocket *socket;
1690
1691     /* Select socket to send from for this address */
1692     if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
1693       socket = sink->used_socket_v6;
1694     else
1695       socket = sink->used_socket;
1696
1697     GST_DEBUG_OBJECT (sink, "remove client with host %s, port %d", host, port);
1698
1699     g_get_current_time (&now);
1700     client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
1701
1702     if (socket && sink->auto_multicast
1703         && g_inet_address_get_is_multicast (addr)) {
1704       GError *err = NULL;
1705
1706       if (!g_socket_leave_multicast_group (socket, addr, FALSE,
1707               sink->multi_iface, &err)) {
1708         GST_DEBUG_OBJECT (sink, "Failed to leave multicast group: %s",
1709             err->message);
1710         g_clear_error (&err);
1711       }
1712     }
1713
1714     if (family == G_SOCKET_FAMILY_IPV4)
1715       --sink->num_v4_unique;
1716     else
1717       --sink->num_v6_unique;
1718
1719     /* Keep state consistent for streaming thread, so remove from client list,
1720      * but keep it around until after the signal has been emitted, in case a
1721      * callback wants to get stats for that client or so */
1722     sink->clients = g_list_delete_link (sink->clients, find);
1723
1724     sink->clients_to_be_removed =
1725         g_list_prepend (sink->clients_to_be_removed, client);
1726
1727     /* Unlock to emit signal before we delete the actual client */
1728     g_mutex_unlock (&sink->client_lock);
1729     g_signal_emit (G_OBJECT (sink),
1730         gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED], 0, host, port);
1731     g_mutex_lock (&sink->client_lock);
1732
1733     sink->clients_to_be_removed =
1734         g_list_remove (sink->clients_to_be_removed, client);
1735
1736     gst_udp_client_unref (client);
1737   }
1738   g_mutex_unlock (&sink->client_lock);
1739
1740   return;
1741
1742   /* ERRORS */
1743 not_found:
1744   {
1745     g_mutex_unlock (&sink->client_lock);
1746     GST_WARNING_OBJECT (sink, "client at host %s, port %d not found",
1747         host, port);
1748     return;
1749   }
1750 }
1751
1752 static void
1753 gst_multiudpsink_clear_internal (GstMultiUDPSink * sink, gboolean lock)
1754 {
1755   GST_DEBUG_OBJECT (sink, "clearing");
1756   /* we only need to remove the client structure, there is no additional
1757    * socket or anything to free for UDP */
1758   if (lock)
1759     g_mutex_lock (&sink->client_lock);
1760   g_list_foreach (sink->clients, (GFunc) gst_udp_client_unref, sink);
1761   g_list_free (sink->clients);
1762   sink->clients = NULL;
1763   sink->num_v4_unique = 0;
1764   sink->num_v4_all = 0;
1765   sink->num_v6_unique = 0;
1766   sink->num_v6_all = 0;
1767   if (lock)
1768     g_mutex_unlock (&sink->client_lock);
1769 }
1770
1771 void
1772 gst_multiudpsink_clear (GstMultiUDPSink * sink)
1773 {
1774   gst_multiudpsink_clear_internal (sink, TRUE);
1775 }
1776
1777 GstStructure *
1778 gst_multiudpsink_get_stats (GstMultiUDPSink * sink, const gchar * host,
1779     gint port)
1780 {
1781   GstUDPClient *client;
1782   GstStructure *result = NULL;
1783   GstUDPClient udpclient;
1784   GList *find;
1785
1786   udpclient.host = (gchar *) host;
1787   udpclient.port = port;
1788
1789   g_mutex_lock (&sink->client_lock);
1790
1791   find = g_list_find_custom (sink->clients, &udpclient,
1792       (GCompareFunc) client_compare);
1793
1794   if (!find)
1795     find = g_list_find_custom (sink->clients_to_be_removed, &udpclient,
1796         (GCompareFunc) client_compare);
1797
1798   if (!find)
1799     goto not_found;
1800
1801   GST_DEBUG_OBJECT (sink, "stats for client with host %s, port %d", host, port);
1802
1803   client = (GstUDPClient *) find->data;
1804
1805   result = gst_structure_new_empty ("multiudpsink-stats");
1806
1807   gst_structure_set (result,
1808       "bytes-sent", G_TYPE_UINT64, client->bytes_sent,
1809       "packets-sent", G_TYPE_UINT64, client->packets_sent,
1810       "connect-time", G_TYPE_UINT64, client->connect_time,
1811       "disconnect-time", G_TYPE_UINT64, client->disconnect_time, NULL);
1812
1813   g_mutex_unlock (&sink->client_lock);
1814
1815   return result;
1816
1817   /* ERRORS */
1818 not_found:
1819   {
1820     g_mutex_unlock (&sink->client_lock);
1821     GST_WARNING_OBJECT (sink, "client with host %s, port %d not found",
1822         host, port);
1823     /* Apparently (see comment in gstmultifdsink.c) returning NULL from here may
1824      * confuse/break python bindings */
1825     return gst_structure_new_empty ("multiudpsink-stats");
1826   }
1827 }
1828
1829 static gboolean
1830 gst_multiudpsink_unlock (GstBaseSink * bsink)
1831 {
1832   GstMultiUDPSink *sink;
1833
1834   sink = GST_MULTIUDPSINK (bsink);
1835
1836   g_cancellable_cancel (sink->cancellable);
1837
1838   return TRUE;
1839 }
1840
1841 static gboolean
1842 gst_multiudpsink_unlock_stop (GstBaseSink * bsink)
1843 {
1844   GstMultiUDPSink *sink;
1845
1846   sink = GST_MULTIUDPSINK (bsink);
1847
1848   g_cancellable_reset (sink->cancellable);
1849
1850   return TRUE;
1851 }