multiudpsink: fix client count after removal
[platform/upstream/gst-plugins-good.git] / gst / udp / gstmultiudpsink.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  * Copyright (C) <2009> Jarkko Palviainen <jarkko.palviainen@sesca.com>
4  * Copyright (C) <2012> Collabora Ltd.
5  *   Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:element-multiudpsink
25  * @see_also: udpsink, multifdsink
26  *
27  * multiudpsink is a network sink that sends UDP packets to multiple
28  * clients.
29  * It can be combined with rtp payload encoders to implement RTP streaming.
30  */
31
32 #ifdef HAVE_CONFIG_H
33 #include "config.h"
34 #endif
35 #include "gstmultiudpsink.h"
36
37 #include <string.h>
38 #ifdef HAVE_SYS_SOCKET_H
39 #include <sys/socket.h>
40 #endif
41
42 #ifndef G_OS_WIN32
43 #include <netinet/in.h>
44 #endif
45
46 #include "gst/glib-compat-private.h"
47
48 GST_DEBUG_CATEGORY_STATIC (multiudpsink_debug);
49 #define GST_CAT_DEFAULT (multiudpsink_debug)
50
51 #define UDP_MAX_SIZE 65507
52
53 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
54     GST_PAD_SINK,
55     GST_PAD_ALWAYS,
56     GST_STATIC_CAPS_ANY);
57
58 /* MultiUDPSink signals and args */
59 enum
60 {
61   /* methods */
62   SIGNAL_ADD,
63   SIGNAL_REMOVE,
64   SIGNAL_CLEAR,
65   SIGNAL_GET_STATS,
66
67   /* signals */
68   SIGNAL_CLIENT_ADDED,
69   SIGNAL_CLIENT_REMOVED,
70
71   /* FILL ME */
72   LAST_SIGNAL
73 };
74
75 #define DEFAULT_SOCKET             NULL
76 #define DEFAULT_CLOSE_SOCKET       TRUE
77 #define DEFAULT_USED_SOCKET        NULL
78 #define DEFAULT_CLIENTS            NULL
79 /* FIXME, this should be disabled by default, we don't need to join a multicast
80  * group for sending, if this socket is also used for receiving, it should
81  * be configured in the element that does the receive. */
82 #define DEFAULT_AUTO_MULTICAST     TRUE
83 #define DEFAULT_MULTICAST_IFACE    NULL
84 #define DEFAULT_TTL                64
85 #define DEFAULT_TTL_MC             1
86 #define DEFAULT_LOOP               TRUE
87 #define DEFAULT_FORCE_IPV4         FALSE
88 #define DEFAULT_QOS_DSCP           -1
89 #define DEFAULT_SEND_DUPLICATES    TRUE
90 #define DEFAULT_BUFFER_SIZE        0
91 #define DEFAULT_BIND_ADDRESS       NULL
92 #define DEFAULT_BIND_PORT          0
93
94 enum
95 {
96   PROP_0,
97   PROP_BYTES_TO_SERVE,
98   PROP_BYTES_SERVED,
99   PROP_SOCKET,
100   PROP_SOCKET_V6,
101   PROP_CLOSE_SOCKET,
102   PROP_USED_SOCKET,
103   PROP_USED_SOCKET_V6,
104   PROP_CLIENTS,
105   PROP_AUTO_MULTICAST,
106   PROP_MULTICAST_IFACE,
107   PROP_TTL,
108   PROP_TTL_MC,
109   PROP_LOOP,
110   PROP_FORCE_IPV4,
111   PROP_QOS_DSCP,
112   PROP_SEND_DUPLICATES,
113   PROP_BUFFER_SIZE,
114   PROP_BIND_ADDRESS,
115   PROP_BIND_PORT,
116   PROP_LAST
117 };
118
119 static void gst_multiudpsink_finalize (GObject * object);
120
121 static GstFlowReturn gst_multiudpsink_render (GstBaseSink * sink,
122     GstBuffer * buffer);
123 static GstFlowReturn gst_multiudpsink_render_list (GstBaseSink * bsink,
124     GstBufferList * buffer_list);
125
126 static gboolean gst_multiudpsink_start (GstBaseSink * bsink);
127 static gboolean gst_multiudpsink_stop (GstBaseSink * bsink);
128 static gboolean gst_multiudpsink_unlock (GstBaseSink * bsink);
129 static gboolean gst_multiudpsink_unlock_stop (GstBaseSink * bsink);
130
131 static void gst_multiudpsink_set_property (GObject * object, guint prop_id,
132     const GValue * value, GParamSpec * pspec);
133 static void gst_multiudpsink_get_property (GObject * object, guint prop_id,
134     GValue * value, GParamSpec * pspec);
135
136 static void gst_multiudpsink_add_internal (GstMultiUDPSink * sink,
137     const gchar * host, gint port, gboolean lock);
138 static void gst_multiudpsink_clear_internal (GstMultiUDPSink * sink,
139     gboolean lock);
140
141 static guint gst_multiudpsink_signals[LAST_SIGNAL] = { 0 };
142
143 #define gst_multiudpsink_parent_class parent_class
144 G_DEFINE_TYPE (GstMultiUDPSink, gst_multiudpsink, GST_TYPE_BASE_SINK);
145
146 static void
147 gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass)
148 {
149   GObjectClass *gobject_class;
150   GstElementClass *gstelement_class;
151   GstBaseSinkClass *gstbasesink_class;
152
153   gobject_class = (GObjectClass *) klass;
154   gstelement_class = (GstElementClass *) klass;
155   gstbasesink_class = (GstBaseSinkClass *) klass;
156
157   gobject_class->set_property = gst_multiudpsink_set_property;
158   gobject_class->get_property = gst_multiudpsink_get_property;
159   gobject_class->finalize = gst_multiudpsink_finalize;
160
161   /**
162    * GstMultiUDPSink::add:
163    * @gstmultiudpsink: the sink on which the signal is emitted
164    * @host: the hostname/IP address of the client to add
165    * @port: the port of the client to add
166    *
167    * Add a client with destination @host and @port to the list of
168    * clients. When the same host/port pair is added multiple times, the
169    * send-duplicates property defines if the packets are sent multiple times to
170    * the same host/port pair or not.
171    *
172    * When a host/port pair is added multiple times, an equal amount of remove
173    * calls must be performed to actually remove the host/port pair from the list
174    * of destinations.
175    */
176   gst_multiudpsink_signals[SIGNAL_ADD] =
177       g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
178       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
179       G_STRUCT_OFFSET (GstMultiUDPSinkClass, add),
180       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2,
181       G_TYPE_STRING, G_TYPE_INT);
182   /**
183    * GstMultiUDPSink::remove:
184    * @gstmultiudpsink: the sink on which the signal is emitted
185    * @host: the hostname/IP address of the client to remove
186    * @port: the port of the client to remove
187    *
188    * Remove the client with destination @host and @port from the list of
189    * clients.
190    */
191   gst_multiudpsink_signals[SIGNAL_REMOVE] =
192       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
193       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
194       G_STRUCT_OFFSET (GstMultiUDPSinkClass, remove),
195       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2,
196       G_TYPE_STRING, G_TYPE_INT);
197   /**
198    * GstMultiUDPSink::clear:
199    * @gstmultiudpsink: the sink on which the signal is emitted
200    *
201    * Clear the list of clients.
202    */
203   gst_multiudpsink_signals[SIGNAL_CLEAR] =
204       g_signal_new ("clear", G_TYPE_FROM_CLASS (klass),
205       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
206       G_STRUCT_OFFSET (GstMultiUDPSinkClass, clear),
207       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 0);
208   /**
209    * GstMultiUDPSink::get-stats:
210    * @gstmultiudpsink: the sink on which the signal is emitted
211    * @host: the hostname/IP address of the client to get stats on
212    * @port: the port of the client to get stats on
213    *
214    * Get the statistics of the client with destination @host and @port.
215    *
216    * Returns: a GstStructure: bytes_sent, packets_sent,
217    *           connect_time (in epoch seconds), disconnect_time (in epoch seconds)
218    */
219   gst_multiudpsink_signals[SIGNAL_GET_STATS] =
220       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
221       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
222       G_STRUCT_OFFSET (GstMultiUDPSinkClass, get_stats),
223       NULL, NULL, g_cclosure_marshal_generic, GST_TYPE_STRUCTURE, 2,
224       G_TYPE_STRING, G_TYPE_INT);
225   /**
226    * GstMultiUDPSink::client-added:
227    * @gstmultiudpsink: the sink emitting the signal
228    * @host: the hostname/IP address of the added client
229    * @port: the port of the added client
230    *
231    * Signal emited when a new client is added to the list of
232    * clients.
233    */
234   gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED] =
235       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
236       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass, client_added),
237       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2,
238       G_TYPE_STRING, G_TYPE_INT);
239   /**
240    * GstMultiUDPSink::client-removed:
241    * @gstmultiudpsink: the sink emitting the signal
242    * @host: the hostname/IP address of the removed client
243    * @port: the port of the removed client
244    *
245    * Signal emited when a client is removed from the list of
246    * clients.
247    */
248   gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED] =
249       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
250       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass,
251           client_removed), NULL, NULL, g_cclosure_marshal_generic,
252       G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_INT);
253
254   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_TO_SERVE,
255       g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
256           "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
257           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
258   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_SERVED,
259       g_param_spec_uint64 ("bytes-served", "Bytes served",
260           "Total number of bytes sent to all clients", 0, G_MAXUINT64, 0,
261           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
262   g_object_class_install_property (gobject_class, PROP_SOCKET,
263       g_param_spec_object ("socket", "Socket Handle",
264           "Socket to use for UDP sending. (NULL == allocate)",
265           G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266   g_object_class_install_property (gobject_class, PROP_SOCKET_V6,
267       g_param_spec_object ("socket-v6", "Socket Handle IPv6",
268           "Socket to use for UDPv6 sending. (NULL == allocate)",
269           G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
270   g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
271       g_param_spec_boolean ("close-socket", "Close socket",
272           "Close socket if passed as property on state change",
273           DEFAULT_CLOSE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
274   g_object_class_install_property (gobject_class, PROP_USED_SOCKET,
275       g_param_spec_object ("used-socket", "Used Socket Handle",
276           "Socket currently in use for UDP sending. (NULL == no socket)",
277           G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
278   g_object_class_install_property (gobject_class, PROP_USED_SOCKET_V6,
279       g_param_spec_object ("used-socket-v6", "Used Socket Handle IPv6",
280           "Socket currently in use for UDPv6 sending. (NULL == no socket)",
281           G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
282   g_object_class_install_property (gobject_class, PROP_CLIENTS,
283       g_param_spec_string ("clients", "Clients",
284           "A comma separated list of host:port pairs with destinations",
285           DEFAULT_CLIENTS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
286   g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
287       g_param_spec_boolean ("auto-multicast",
288           "Automatically join/leave multicast groups",
289           "Automatically join/leave the multicast groups, FALSE means user"
290           " has to do it himself", DEFAULT_AUTO_MULTICAST,
291           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
292   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
293       g_param_spec_string ("multicast-iface", "Multicast Interface",
294           "The network interface on which to join the multicast group",
295           DEFAULT_MULTICAST_IFACE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
296   g_object_class_install_property (gobject_class, PROP_TTL,
297       g_param_spec_int ("ttl", "Unicast TTL",
298           "Used for setting the unicast TTL parameter",
299           0, 255, DEFAULT_TTL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
300   g_object_class_install_property (gobject_class, PROP_TTL_MC,
301       g_param_spec_int ("ttl-mc", "Multicast TTL",
302           "Used for setting the multicast TTL parameter",
303           0, 255, DEFAULT_TTL_MC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
304   g_object_class_install_property (gobject_class, PROP_LOOP,
305       g_param_spec_boolean ("loop", "Multicast Loopback",
306           "Used for setting the multicast loop parameter. TRUE = enable,"
307           " FALSE = disable", DEFAULT_LOOP,
308           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
309   /**
310    * GstMultiUDPSink::force-ipv4:
311    *
312    * Force the use of an IPv4 socket.
313    *
314    * Since: 1.0.2
315    */
316 #ifndef GST_REMOVE_DEPRECATED
317   g_object_class_install_property (gobject_class, PROP_FORCE_IPV4,
318       g_param_spec_boolean ("force-ipv4", "Force IPv4",
319           "Forcing the use of an IPv4 socket (DEPRECATED, has no effect anymore)",
320           DEFAULT_FORCE_IPV4,
321           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
322 #endif
323   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_QOS_DSCP,
324       g_param_spec_int ("qos-dscp", "QoS diff srv code point",
325           "Quality of Service, differentiated services code point (-1 default)",
326           -1, 63, DEFAULT_QOS_DSCP,
327           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
328   /**
329    * GstMultiUDPSink::send-duplicates:
330    *
331    * When a host/port pair is added mutliple times, send the packet to the host
332    * multiple times as well.
333    */
334   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEND_DUPLICATES,
335       g_param_spec_boolean ("send-duplicates", "Send Duplicates",
336           "When a distination/port pair is added multiple times, send packets "
337           "multiple times as well", DEFAULT_SEND_DUPLICATES,
338           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
339
340   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
341       g_param_spec_int ("buffer-size", "Buffer Size",
342           "Size of the kernel send buffer in bytes, 0=default", 0, G_MAXINT,
343           DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
344
345   g_object_class_install_property (gobject_class, PROP_BIND_ADDRESS,
346       g_param_spec_string ("bind-address", "Bind Address",
347           "Address to bind the socket to", DEFAULT_BIND_ADDRESS,
348           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
349   g_object_class_install_property (gobject_class, PROP_BIND_PORT,
350       g_param_spec_int ("bind-port", "Bind Port",
351           "Port to bind the socket to", 0, G_MAXUINT16,
352           DEFAULT_BIND_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
353
354   gst_element_class_add_pad_template (gstelement_class,
355       gst_static_pad_template_get (&sink_template));
356
357   gst_element_class_set_static_metadata (gstelement_class, "UDP packet sender",
358       "Sink/Network",
359       "Send data over the network via UDP to one or multiple recipients "
360       "which can be added or removed at runtime using action signals",
361       "Wim Taymans <wim.taymans@gmail.com>");
362
363   gstbasesink_class->render = gst_multiudpsink_render;
364   gstbasesink_class->render_list = gst_multiudpsink_render_list;
365   gstbasesink_class->start = gst_multiudpsink_start;
366   gstbasesink_class->stop = gst_multiudpsink_stop;
367   gstbasesink_class->unlock = gst_multiudpsink_unlock;
368   gstbasesink_class->unlock_stop = gst_multiudpsink_unlock_stop;
369   klass->add = gst_multiudpsink_add;
370   klass->remove = gst_multiudpsink_remove;
371   klass->clear = gst_multiudpsink_clear;
372   klass->get_stats = gst_multiudpsink_get_stats;
373
374   GST_DEBUG_CATEGORY_INIT (multiudpsink_debug, "multiudpsink", 0, "UDP sink");
375 }
376
377
378 static void
379 gst_multiudpsink_init (GstMultiUDPSink * sink)
380 {
381   guint max_mem;
382
383   g_mutex_init (&sink->client_lock);
384   sink->clients = NULL;
385   sink->num_v4_unique = 0;
386   sink->num_v4_all = 0;
387   sink->num_v6_unique = 0;
388   sink->num_v6_all = 0;
389
390   sink->socket = DEFAULT_SOCKET;
391   sink->socket_v6 = DEFAULT_SOCKET;
392   sink->used_socket = DEFAULT_USED_SOCKET;
393   sink->used_socket_v6 = DEFAULT_USED_SOCKET;
394   sink->close_socket = DEFAULT_CLOSE_SOCKET;
395   sink->external_socket = (sink->socket != NULL);
396   sink->auto_multicast = DEFAULT_AUTO_MULTICAST;
397   sink->ttl = DEFAULT_TTL;
398   sink->ttl_mc = DEFAULT_TTL_MC;
399   sink->loop = DEFAULT_LOOP;
400   sink->force_ipv4 = DEFAULT_FORCE_IPV4;
401   sink->qos_dscp = DEFAULT_QOS_DSCP;
402   sink->send_duplicates = DEFAULT_SEND_DUPLICATES;
403   sink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
404
405   sink->cancellable = g_cancellable_new ();
406
407   /* allocate OutputVector and MapInfo for use in the render function, buffers can
408    * hold up to a maximum amount of memory so we can create a maximally sized
409    * array for them.  */
410   max_mem = gst_buffer_get_max_memory ();
411
412   sink->vec = g_new (GOutputVector, max_mem);
413   sink->map = g_new (GstMapInfo, max_mem);
414
415   /* we assume that the number of memories per buffer can fit into a guint8 */
416   g_warn_if_fail (max_mem <= G_MAXUINT8);
417 }
418
419 static GstUDPClient *
420 gst_udp_client_new (GstMultiUDPSink * sink, const gchar * host, gint port)
421 {
422   GstUDPClient *client;
423   GInetAddress *addr;
424   GResolver *resolver;
425   GError *err = NULL;
426
427   addr = g_inet_address_new_from_string (host);
428   if (!addr) {
429     GList *results;
430
431     resolver = g_resolver_get_default ();
432     results =
433         g_resolver_lookup_by_name (resolver, host, sink->cancellable, &err);
434     if (!results)
435       goto name_resolve;
436     addr = G_INET_ADDRESS (g_object_ref (results->data));
437
438     g_resolver_free_addresses (results);
439     g_object_unref (resolver);
440   }
441 #ifndef GST_DISABLE_GST_DEBUG
442   {
443     gchar *ip = g_inet_address_to_string (addr);
444
445     GST_DEBUG_OBJECT (sink, "IP address for host %s is %s", host, ip);
446     g_free (ip);
447   }
448 #endif
449
450   client = g_slice_new0 (GstUDPClient);
451   client->ref_count = 1;
452   client->add_count = 0;
453   client->host = g_strdup (host);
454   client->port = port;
455   client->addr = g_inet_socket_address_new (addr, port);
456   g_object_unref (addr);
457
458   return client;
459
460 name_resolve:
461   {
462     g_object_unref (resolver);
463
464     return NULL;
465   }
466 }
467
468 /* call with client lock held */
469 static void
470 gst_udp_client_unref (GstUDPClient * client)
471 {
472   if (--client->ref_count == 0) {
473     g_object_unref (client->addr);
474     g_free (client->host);
475     g_slice_free (GstUDPClient, client);
476   }
477 }
478
479 /* call with client lock held */
480 static inline GstUDPClient *
481 gst_udp_client_ref (GstUDPClient * client)
482 {
483   ++client->ref_count;
484   return client;
485 }
486
487 static gint
488 client_compare (GstUDPClient * a, GstUDPClient * b)
489 {
490   if ((a->port == b->port) && (strcmp (a->host, b->host) == 0))
491     return 0;
492
493   return 1;
494 }
495
496 static void
497 gst_multiudpsink_finalize (GObject * object)
498 {
499   GstMultiUDPSink *sink;
500
501   sink = GST_MULTIUDPSINK (object);
502
503   g_list_foreach (sink->clients, (GFunc) gst_udp_client_unref, NULL);
504   g_list_free (sink->clients);
505
506   if (sink->socket)
507     g_object_unref (sink->socket);
508   sink->socket = NULL;
509
510   if (sink->socket_v6)
511     g_object_unref (sink->socket_v6);
512   sink->socket_v6 = NULL;
513
514   if (sink->used_socket)
515     g_object_unref (sink->used_socket);
516   sink->used_socket = NULL;
517
518   if (sink->used_socket_v6)
519     g_object_unref (sink->used_socket_v6);
520   sink->used_socket_v6 = NULL;
521
522   if (sink->cancellable)
523     g_object_unref (sink->cancellable);
524   sink->cancellable = NULL;
525
526   g_free (sink->multi_iface);
527   sink->multi_iface = NULL;
528
529   g_free (sink->vec);
530   sink->vec = NULL;
531   g_free (sink->map);
532   sink->map = NULL;
533
534   g_free (sink->bind_address);
535   sink->bind_address = NULL;
536
537   g_mutex_clear (&sink->client_lock);
538
539   G_OBJECT_CLASS (parent_class)->finalize (object);
540 }
541
542 /* replacement until we can depend unconditionally on the real one in GLib */
543 #ifndef HAVE_G_SOCKET_SEND_MESSAGES
544 #define g_socket_send_messages gst_socket_send_messages
545
546 static gint
547 gst_socket_send_messages (GSocket * socket, GstOutputMessage * messages,
548     guint num_messages, gint flags, GCancellable * cancellable, GError ** error)
549 {
550   gssize result;
551   gint i;
552
553   for (i = 0; i < num_messages; ++i) {
554     GstOutputMessage *msg = &messages[i];
555     GError *msg_error = NULL;
556
557     result = g_socket_send_message (socket, msg->address,
558         msg->vectors, msg->num_vectors,
559         msg->control_messages, msg->num_control_messages,
560         flags, cancellable, &msg_error);
561
562     if (result < 0) {
563       /* if we couldn't send all messages, just return how many we did
564        * manage to send, provided we managed to send at least one */
565       if (msg_error->code == G_IO_ERROR_WOULD_BLOCK && i > 0) {
566         g_error_free (msg_error);
567         return i;
568       } else {
569         g_propagate_error (error, msg_error);
570         return -1;
571       }
572     }
573
574     msg->bytes_sent = result;
575   }
576
577   return i;
578 }
579 #endif /* HAVE_G_SOCKET_SEND_MESSAGES */
580
581 static gsize
582 fill_vectors (GOutputVector * vecs, GstMapInfo * maps, guint n, GstBuffer * buf)
583 {
584   GstMemory *mem;
585   gsize size = 0;
586   guint i;
587
588   g_assert (gst_buffer_n_memory (buf) == n);
589
590   for (i = 0; i < n; ++i) {
591     mem = gst_buffer_peek_memory (buf, i);
592     if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) {
593       vecs[i].buffer = maps[i].data;
594       vecs[i].size = maps[i].size;
595     } else {
596       GST_WARNING ("Failed to map memory %p for reading", mem);
597       vecs[i].buffer = "";
598       vecs[i].size = 0;
599     }
600     size += vecs[i].size;
601   }
602
603   return size;
604 }
605
606 static gsize
607 gst_udp_calc_message_size (GstOutputMessage * msg)
608 {
609   gsize size = 0;
610   guint i;
611
612   for (i = 0; i < msg->num_vectors; ++i)
613     size += msg->vectors[i].size;
614
615   return size;
616 }
617
618 static gint
619 gst_udp_messsages_find_first_not_sent (GstOutputMessage * messages,
620     guint num_messages)
621 {
622   guint i;
623
624   for (i = 0; i < num_messages; ++i) {
625     GstOutputMessage *msg = &messages[i];
626
627     if (msg->bytes_sent == 0 && gst_udp_calc_message_size (msg) > 0)
628       return i;
629   }
630
631   return -1;
632 }
633
634 static inline gchar *
635 gst_udp_address_get_string (GSocketAddress * addr, gchar * s, gsize size)
636 {
637   GInetSocketAddress *isa = G_INET_SOCKET_ADDRESS (addr);
638   GInetAddress *ia;
639   gchar *addr_str;
640
641   ia = g_inet_socket_address_get_address (isa);
642   addr_str = g_inet_address_to_string (ia);
643   g_snprintf (s, size, "%s:%u", addr_str, g_inet_socket_address_get_port (isa));
644   g_free (addr_str);
645   g_object_unref (ia);
646
647   return s;
648 }
649
650 /* Wrapper around g_socket_send_messages() plus error handling (ignoring).
651  * Returns FALSE if we got cancelled, otherwise TRUE. */
652 static gboolean
653 gst_multiudpsink_send_messages (GstMultiUDPSink * sink, GSocket * socket,
654     GstOutputMessage * messages, guint num_messages)
655 {
656   gboolean sent_max_size_warning = FALSE;
657
658   while (num_messages > 0) {
659     gchar astr[64] G_GNUC_UNUSED;
660     GError *err = NULL;
661     guint msg_size, skip, i;
662     gint ret, err_idx;
663
664     ret = g_socket_send_messages (socket, messages, num_messages, 0,
665         sink->cancellable, &err);
666
667     if (G_UNLIKELY (ret < 0)) {
668       GstOutputMessage *msg;
669
670       if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
671         g_clear_error (&err);
672         return FALSE;
673       }
674
675       err_idx = gst_udp_messsages_find_first_not_sent (messages, num_messages);
676       if (err_idx < 0)
677         break;
678
679       msg = &messages[err_idx];
680       msg_size = gst_udp_calc_message_size (msg);
681
682       GST_LOG_OBJECT (sink, "error sending %u bytes to client %s: %s", msg_size,
683           gst_udp_address_get_string (msg->address, astr, sizeof (astr)),
684           err->message);
685
686       skip = 1;
687       if (msg_size > UDP_MAX_SIZE) {
688         if (!sent_max_size_warning) {
689           GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
690               ("Attempting to send a UDP packets larger than maximum size "
691                   "(%u > %d)", msg_size, UDP_MAX_SIZE),
692               ("Reason: %s", err ? err->message : "unknown reason"));
693           sent_max_size_warning = FALSE;
694         }
695       } else {
696         GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
697             ("Error sending UDP packets"), ("client %s, reason: %s",
698                 gst_udp_address_get_string (msg->address, astr, sizeof (astr)),
699                 (err != NULL) ? err->message : "unknown reason"));
700
701         for (i = err_idx + 1; i < num_messages; ++i, ++skip) {
702           if (messages[i].address != msg->address)
703             break;
704         }
705         GST_DEBUG_OBJECT (sink, "skipping %d message(s) to same client", skip);
706       }
707
708       /* ignore any errors and try sending the rest */
709       g_clear_error (&err);
710       ret = skip;
711     }
712
713     g_assert (ret <= num_messages);
714
715     messages += ret;
716     num_messages -= ret;
717   }
718
719   return TRUE;
720 }
721
722 static GstFlowReturn
723 gst_multiudpsink_render_buffers (GstMultiUDPSink * sink, GstBuffer ** buffers,
724     guint num_buffers, guint8 * mem_nums, guint total_mem_num)
725 {
726   GstOutputMessage *msgs;
727   gboolean send_duplicates;
728   GstUDPClient **clients;
729   GOutputVector *vecs;
730   GstMapInfo *map_infos;
731   GstFlowReturn flow_ret;
732   guint num_addr_v4, num_addr_v6;
733   guint num_addr, num_msgs;
734   GError *err = NULL;
735   guint i, j, mem;
736   gsize size = 0;
737   GList *l;
738
739   send_duplicates = sink->send_duplicates;
740
741   g_mutex_lock (&sink->client_lock);
742
743   if (send_duplicates) {
744     num_addr_v4 = sink->num_v4_all;
745     num_addr_v6 = sink->num_v6_all;
746   } else {
747     num_addr_v4 = sink->num_v4_unique;
748     num_addr_v6 = sink->num_v6_unique;
749   }
750   num_addr = num_addr_v4 + num_addr_v6;
751
752   if (num_addr == 0)
753     goto no_clients;
754
755   clients = g_newa (GstUDPClient *, num_addr);
756   for (l = sink->clients, i = 0; l != NULL; l = l->next) {
757     GstUDPClient *client = l->data;
758
759     clients[i++] = gst_udp_client_ref (client);
760     for (j = 1; send_duplicates && j < client->add_count; ++j)
761       clients[i++] = gst_udp_client_ref (client);
762   }
763   g_assert_cmpuint (i, ==, num_addr);
764
765   g_mutex_unlock (&sink->client_lock);
766
767   GST_LOG_OBJECT (sink, "%u buffers, %u memories -> to be sent to %u clients",
768       num_buffers, total_mem_num, num_addr);
769
770   vecs = g_newa (GOutputVector, total_mem_num);
771   map_infos = g_newa (GstMapInfo, total_mem_num);
772
773   num_msgs = num_addr * num_buffers;
774   msgs = g_newa (GstOutputMessage, num_msgs);
775
776   /* populate first num_buffers messages with output vectors for the buffers */
777   for (i = 0, mem = 0; i < num_buffers; ++i) {
778     size += fill_vectors (&vecs[mem], &map_infos[mem], mem_nums[i], buffers[i]);
779     msgs[i].vectors = &vecs[mem];
780     msgs[i].num_vectors = mem_nums[i];
781     msgs[i].num_control_messages = 0;
782     msgs[i].control_messages = NULL;
783     msgs[i].address = clients[0]->addr;
784     mem += mem_nums[i];
785   }
786
787   /* FIXME: how about some locking? (there wasn't any before either, but..) */
788   sink->bytes_to_serve += size;
789
790   /* now copy the pre-filled num_buffer messages over to the next num_buffer
791    * messages for the next client, where we also change the target adddress */
792   for (i = 1; i < num_addr; ++i) {
793     for (j = 0; j < num_buffers; ++j) {
794       msgs[i * num_buffers + j] = msgs[j];
795       msgs[i * num_buffers + j].address = clients[i]->addr;
796     }
797   }
798
799   /* now send it! */
800   {
801     gboolean ret;
802
803     /* no IPv4 socket? Send it all from the IPv6 socket then.. */
804     if (sink->used_socket == NULL) {
805       ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6,
806           msgs, num_msgs);
807     } else {
808       guint num_msgs_v4 = num_buffers * num_addr_v4;
809       guint num_msgs_v6 = num_buffers * num_addr_v6;
810
811       /* our client list is sorted with IPv4 clients first and IPv6 ones last */
812       ret = gst_multiudpsink_send_messages (sink, sink->used_socket,
813           msgs, num_msgs_v4);
814
815       if (!ret)
816         goto cancelled;
817
818       ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6,
819           msgs + num_msgs_v4, num_msgs_v6);
820     }
821
822     if (!ret)
823       goto cancelled;
824   }
825
826   flow_ret = GST_FLOW_OK;
827
828   /* now update stats */
829   g_mutex_lock (&sink->client_lock);
830
831   for (i = 0; i < num_addr; ++i) {
832     GstUDPClient *client = clients[i];
833
834     for (j = 0; j < num_buffers; ++j) {
835       gsize bytes_sent;
836
837       bytes_sent = msgs[i * num_buffers + j].bytes_sent;
838
839       client->bytes_sent += bytes_sent;
840       client->packets_sent++;
841       sink->bytes_served += bytes_sent;
842     }
843     gst_udp_client_unref (client);
844   }
845
846   g_mutex_unlock (&sink->client_lock);
847
848 out:
849
850   for (i = 0; i < mem; ++i)
851     gst_memory_unmap (map_infos[i].memory, &map_infos[i]);
852
853   return flow_ret;;
854
855 no_clients:
856   {
857     g_mutex_unlock (&sink->client_lock);
858     GST_LOG_OBJECT (sink, "no clients");
859     return GST_FLOW_OK;
860   }
861 cancelled:
862   {
863     GST_INFO_OBJECT (sink, "cancelled");
864     g_clear_error (&err);
865     flow_ret = GST_FLOW_FLUSHING;
866
867     g_mutex_lock (&sink->client_lock);
868     for (i = 0; i < num_addr; ++i)
869       gst_udp_client_unref (clients[i]);
870     g_mutex_unlock (&sink->client_lock);
871     goto out;
872   }
873 }
874
875 static GstFlowReturn
876 gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
877 {
878   GstMultiUDPSink *sink;
879   GstBuffer **buffers;
880   GstFlowReturn flow;
881   guint8 *mem_nums;
882   guint total_mems;
883   guint i, num_buffers;
884
885   sink = GST_MULTIUDPSINK_CAST (bsink);
886
887   num_buffers = gst_buffer_list_length (buffer_list);
888   if (num_buffers == 0)
889     goto no_data;
890
891   buffers = g_newa (GstBuffer *, num_buffers);
892   mem_nums = g_newa (guint8, num_buffers);
893   for (i = 0, total_mems = 0; i < num_buffers; ++i) {
894     buffers[i] = gst_buffer_list_get (buffer_list, i);
895     mem_nums[i] = gst_buffer_n_memory (buffers[i]);
896     total_mems += mem_nums[i];
897   }
898
899   flow = gst_multiudpsink_render_buffers (sink, buffers, num_buffers,
900       mem_nums, total_mems);
901
902   return flow;
903
904 no_data:
905   {
906     GST_LOG_OBJECT (sink, "empty buffer");
907     return GST_FLOW_OK;
908   }
909 }
910
911 static GstFlowReturn
912 gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
913 {
914   GstMultiUDPSink *sink;
915   GList *clients;
916   GOutputVector *vec;
917   GstMapInfo *map;
918   guint n_mem, i;
919   gsize size;
920   GstMemory *mem;
921   gint num, no_clients;
922   GError *err = NULL;
923
924   sink = GST_MULTIUDPSINK_CAST (bsink);
925
926   n_mem = gst_buffer_n_memory (buffer);
927   if (n_mem == 0)
928     goto no_data;
929
930   /* pre-allocated, the max number of memory blocks is limited so this
931    * should not cause overflows */
932   vec = sink->vec;
933   map = sink->map;
934
935   size = 0;
936   for (i = 0; i < n_mem; i++) {
937     mem = gst_buffer_peek_memory (buffer, i);
938     gst_memory_map (mem, &map[i], GST_MAP_READ);
939
940     vec[i].buffer = map[i].data;
941     vec[i].size = map[i].size;
942
943     size += map[i].size;
944   }
945
946   sink->bytes_to_serve += size;
947
948   /* grab lock while iterating and sending to clients, this should be
949    * fast as UDP never blocks */
950   g_mutex_lock (&sink->client_lock);
951   GST_LOG_OBJECT (bsink, "about to send %" G_GSIZE_FORMAT " bytes in %u blocks",
952       size, n_mem);
953
954   no_clients = 0;
955   num = 0;
956   for (clients = sink->clients; clients; clients = g_list_next (clients)) {
957     GstUDPClient *client;
958     GSocket *socket;
959     GSocketFamily family;
960     gint count;
961
962     client = (GstUDPClient *) clients->data;
963     no_clients++;
964     GST_LOG_OBJECT (sink, "sending %" G_GSIZE_FORMAT " bytes to client %p",
965         size, client);
966
967     family = g_socket_address_get_family (G_SOCKET_ADDRESS (client->addr));
968     /* Select socket to send from for this address */
969     if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
970       socket = sink->used_socket_v6;
971     else
972       socket = sink->used_socket;
973
974     count = sink->send_duplicates ? client->add_count : 1;
975
976     while (count--) {
977       gssize ret;
978
979       ret =
980           g_socket_send_message (socket, client->addr, vec, n_mem,
981           NULL, 0, 0, sink->cancellable, &err);
982
983       if (G_UNLIKELY (ret < 0)) {
984         if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
985           goto flushing;
986
987         /* we continue after posting a warning, next packets might be ok
988          * again */
989         if (size > UDP_MAX_SIZE) {
990           GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
991               ("Attempting to send a UDP packet larger than maximum size "
992                   "(%" G_GSIZE_FORMAT " > %d)", size, UDP_MAX_SIZE),
993               ("Reason: %s", err ? err->message : "unknown reason"));
994         } else {
995           GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
996               ("Error sending UDP packet"), ("Reason: %s",
997                   err ? err->message : "unknown reason"));
998         }
999         g_clear_error (&err);
1000       } else {
1001         num++;
1002         client->bytes_sent += ret;
1003         client->packets_sent++;
1004         sink->bytes_served += ret;
1005       }
1006     }
1007   }
1008   g_mutex_unlock (&sink->client_lock);
1009
1010   /* unmap all memory again */
1011   for (i = 0; i < n_mem; i++)
1012     gst_memory_unmap (map[i].memory, &map[i]);
1013
1014   GST_LOG_OBJECT (sink, "sent %" G_GSIZE_FORMAT " bytes to %d (of %d) clients",
1015       size, num, no_clients);
1016
1017   return GST_FLOW_OK;
1018
1019 no_data:
1020   {
1021     return GST_FLOW_OK;
1022   }
1023 flushing:
1024   {
1025     GST_DEBUG ("we are flushing");
1026     g_mutex_unlock (&sink->client_lock);
1027     g_clear_error (&err);
1028
1029     /* unmap all memory */
1030     for (i = 0; i < n_mem; i++)
1031       gst_memory_unmap (map[i].memory, &map[i]);
1032
1033     return GST_FLOW_FLUSHING;
1034   }
1035 }
1036
1037 static void
1038 gst_multiudpsink_set_clients_string (GstMultiUDPSink * sink,
1039     const gchar * string)
1040 {
1041   gchar **clients;
1042   gint i;
1043
1044   clients = g_strsplit (string, ",", 0);
1045
1046   g_mutex_lock (&sink->client_lock);
1047   /* clear all existing clients */
1048   gst_multiudpsink_clear_internal (sink, FALSE);
1049   for (i = 0; clients[i]; i++) {
1050     gchar *host, *p;
1051     gint64 port = 0;
1052
1053     host = clients[i];
1054     p = strstr (clients[i], ":");
1055     if (p != NULL) {
1056       *p = '\0';
1057       port = g_ascii_strtoll (p + 1, NULL, 10);
1058     }
1059     if (port != 0)
1060       gst_multiudpsink_add_internal (sink, host, port, FALSE);
1061   }
1062   g_mutex_unlock (&sink->client_lock);
1063
1064   g_strfreev (clients);
1065 }
1066
1067 static gchar *
1068 gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink)
1069 {
1070   GString *str;
1071   GList *clients;
1072
1073   str = g_string_new ("");
1074
1075   g_mutex_lock (&sink->client_lock);
1076   clients = sink->clients;
1077   while (clients) {
1078     GstUDPClient *client;
1079     gint count;
1080
1081     client = (GstUDPClient *) clients->data;
1082
1083     clients = g_list_next (clients);
1084
1085     count = client->add_count;
1086     while (count--) {
1087       g_string_append_printf (str, "%s:%d%s", client->host, client->port,
1088           (clients || count > 1 ? "," : ""));
1089     }
1090   }
1091   g_mutex_unlock (&sink->client_lock);
1092
1093   return g_string_free (str, FALSE);
1094 }
1095
1096 static void
1097 gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink, GSocket * socket)
1098 {
1099   /* don't touch on -1 */
1100   if (sink->qos_dscp < 0)
1101     return;
1102
1103   if (socket == NULL)
1104     return;
1105
1106 #ifdef IP_TOS
1107   {
1108     gint tos;
1109     gint fd;
1110
1111     fd = g_socket_get_fd (socket);
1112
1113     GST_DEBUG_OBJECT (sink, "setting TOS to %d", sink->qos_dscp);
1114
1115     /* Extract and shift 6 bits of DSFIELD */
1116     tos = (sink->qos_dscp & 0x3f) << 2;
1117
1118     if (setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) {
1119       GST_ERROR_OBJECT (sink, "could not set TOS: %s", g_strerror (errno));
1120     }
1121 #ifdef IPV6_TCLASS
1122     if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0) {
1123       GST_ERROR_OBJECT (sink, "could not set TCLASS: %s", g_strerror (errno));
1124     }
1125 #endif
1126   }
1127 #endif
1128 }
1129
1130 static void
1131 gst_multiudpsink_set_property (GObject * object, guint prop_id,
1132     const GValue * value, GParamSpec * pspec)
1133 {
1134   GstMultiUDPSink *udpsink;
1135
1136   udpsink = GST_MULTIUDPSINK (object);
1137
1138   switch (prop_id) {
1139     case PROP_SOCKET:
1140       if (udpsink->socket != NULL && udpsink->socket != udpsink->used_socket &&
1141           udpsink->close_socket) {
1142         GError *err = NULL;
1143
1144         if (!g_socket_close (udpsink->socket, &err)) {
1145           GST_ERROR ("failed to close socket %p: %s", udpsink->socket,
1146               err->message);
1147           g_clear_error (&err);
1148         }
1149       }
1150       if (udpsink->socket)
1151         g_object_unref (udpsink->socket);
1152       udpsink->socket = g_value_dup_object (value);
1153       GST_DEBUG_OBJECT (udpsink, "setting socket to %p", udpsink->socket);
1154       break;
1155     case PROP_SOCKET_V6:
1156       if (udpsink->socket_v6 != NULL
1157           && udpsink->socket_v6 != udpsink->used_socket_v6
1158           && udpsink->close_socket) {
1159         GError *err = NULL;
1160
1161         if (!g_socket_close (udpsink->socket_v6, &err)) {
1162           GST_ERROR ("failed to close socket %p: %s", udpsink->socket_v6,
1163               err->message);
1164           g_clear_error (&err);
1165         }
1166       }
1167       if (udpsink->socket_v6)
1168         g_object_unref (udpsink->socket_v6);
1169       udpsink->socket_v6 = g_value_dup_object (value);
1170       GST_DEBUG_OBJECT (udpsink, "setting socket to %p", udpsink->socket_v6);
1171       break;
1172     case PROP_CLOSE_SOCKET:
1173       udpsink->close_socket = g_value_get_boolean (value);
1174       break;
1175     case PROP_CLIENTS:
1176       gst_multiudpsink_set_clients_string (udpsink, g_value_get_string (value));
1177       break;
1178     case PROP_AUTO_MULTICAST:
1179       udpsink->auto_multicast = g_value_get_boolean (value);
1180       break;
1181     case PROP_MULTICAST_IFACE:
1182       g_free (udpsink->multi_iface);
1183
1184       if (g_value_get_string (value) == NULL)
1185         udpsink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
1186       else
1187         udpsink->multi_iface = g_value_dup_string (value);
1188       break;
1189     case PROP_TTL:
1190       udpsink->ttl = g_value_get_int (value);
1191       break;
1192     case PROP_TTL_MC:
1193       udpsink->ttl_mc = g_value_get_int (value);
1194       break;
1195     case PROP_LOOP:
1196       udpsink->loop = g_value_get_boolean (value);
1197       break;
1198     case PROP_FORCE_IPV4:
1199       udpsink->force_ipv4 = g_value_get_boolean (value);
1200       break;
1201     case PROP_QOS_DSCP:
1202       udpsink->qos_dscp = g_value_get_int (value);
1203       gst_multiudpsink_setup_qos_dscp (udpsink, udpsink->used_socket);
1204       gst_multiudpsink_setup_qos_dscp (udpsink, udpsink->used_socket_v6);
1205       break;
1206     case PROP_SEND_DUPLICATES:
1207       udpsink->send_duplicates = g_value_get_boolean (value);
1208       break;
1209     case PROP_BUFFER_SIZE:
1210       udpsink->buffer_size = g_value_get_int (value);
1211       break;
1212     case PROP_BIND_ADDRESS:
1213       g_free (udpsink->bind_address);
1214       udpsink->bind_address = g_value_dup_string (value);
1215       break;
1216     case PROP_BIND_PORT:
1217       udpsink->bind_port = g_value_get_int (value);
1218       break;
1219     default:
1220       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1221       break;
1222   }
1223 }
1224
1225 static void
1226 gst_multiudpsink_get_property (GObject * object, guint prop_id, GValue * value,
1227     GParamSpec * pspec)
1228 {
1229   GstMultiUDPSink *udpsink;
1230
1231   udpsink = GST_MULTIUDPSINK (object);
1232
1233   switch (prop_id) {
1234     case PROP_BYTES_TO_SERVE:
1235       g_value_set_uint64 (value, udpsink->bytes_to_serve);
1236       break;
1237     case PROP_BYTES_SERVED:
1238       g_value_set_uint64 (value, udpsink->bytes_served);
1239       break;
1240     case PROP_SOCKET:
1241       g_value_set_object (value, udpsink->socket);
1242       break;
1243     case PROP_SOCKET_V6:
1244       g_value_set_object (value, udpsink->socket_v6);
1245       break;
1246     case PROP_CLOSE_SOCKET:
1247       g_value_set_boolean (value, udpsink->close_socket);
1248       break;
1249     case PROP_USED_SOCKET:
1250       g_value_set_object (value, udpsink->used_socket);
1251       break;
1252     case PROP_USED_SOCKET_V6:
1253       g_value_set_object (value, udpsink->used_socket_v6);
1254       break;
1255     case PROP_CLIENTS:
1256       g_value_take_string (value,
1257           gst_multiudpsink_get_clients_string (udpsink));
1258       break;
1259     case PROP_AUTO_MULTICAST:
1260       g_value_set_boolean (value, udpsink->auto_multicast);
1261       break;
1262     case PROP_MULTICAST_IFACE:
1263       g_value_set_string (value, udpsink->multi_iface);
1264       break;
1265     case PROP_TTL:
1266       g_value_set_int (value, udpsink->ttl);
1267       break;
1268     case PROP_TTL_MC:
1269       g_value_set_int (value, udpsink->ttl_mc);
1270       break;
1271     case PROP_LOOP:
1272       g_value_set_boolean (value, udpsink->loop);
1273       break;
1274     case PROP_FORCE_IPV4:
1275       g_value_set_boolean (value, udpsink->force_ipv4);
1276       break;
1277     case PROP_QOS_DSCP:
1278       g_value_set_int (value, udpsink->qos_dscp);
1279       break;
1280     case PROP_SEND_DUPLICATES:
1281       g_value_set_boolean (value, udpsink->send_duplicates);
1282       break;
1283     case PROP_BUFFER_SIZE:
1284       g_value_set_int (value, udpsink->buffer_size);
1285       break;
1286     case PROP_BIND_ADDRESS:
1287       g_value_set_string (value, udpsink->bind_address);
1288       break;
1289     case PROP_BIND_PORT:
1290       g_value_set_int (value, udpsink->bind_port);
1291       break;
1292     default:
1293       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1294       break;
1295   }
1296 }
1297
1298 static gboolean
1299 gst_multiudpsink_configure_client (GstMultiUDPSink * sink,
1300     GstUDPClient * client)
1301 {
1302   GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr);
1303   GInetAddress *addr = g_inet_socket_address_get_address (saddr);
1304   GSocketFamily family = g_socket_address_get_family (G_SOCKET_ADDRESS (saddr));
1305   GSocket *socket;
1306   GError *err = NULL;
1307
1308   GST_DEBUG_OBJECT (sink, "configuring client %p", client);
1309
1310   if (family == G_SOCKET_FAMILY_IPV6 && !sink->used_socket_v6)
1311     goto invalid_family;
1312
1313   /* Select socket to send from for this address */
1314   if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
1315     socket = sink->used_socket_v6;
1316   else
1317     socket = sink->used_socket;
1318
1319   if (g_inet_address_get_is_multicast (addr)) {
1320     GST_DEBUG_OBJECT (sink, "we have a multicast client %p", client);
1321     if (sink->auto_multicast) {
1322       GST_DEBUG_OBJECT (sink, "autojoining group");
1323       if (!g_socket_join_multicast_group (socket, addr, FALSE,
1324               sink->multi_iface, &err))
1325         goto join_group_failed;
1326     }
1327     GST_DEBUG_OBJECT (sink, "setting loop to %d", sink->loop);
1328     g_socket_set_multicast_loopback (socket, sink->loop);
1329     GST_DEBUG_OBJECT (sink, "setting ttl to %d", sink->ttl_mc);
1330     g_socket_set_multicast_ttl (socket, sink->ttl_mc);
1331   } else {
1332     GST_DEBUG_OBJECT (sink, "setting unicast ttl to %d", sink->ttl);
1333     g_socket_set_ttl (socket, sink->ttl);
1334   }
1335   return TRUE;
1336
1337   /* ERRORS */
1338 join_group_failed:
1339   {
1340     gst_multiudpsink_stop (GST_BASE_SINK (sink));
1341     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
1342         ("Could not join multicast group: %s",
1343             err ? err->message : "unknown reason"));
1344     g_clear_error (&err);
1345     return FALSE;
1346   }
1347 invalid_family:
1348   {
1349     gst_multiudpsink_stop (GST_BASE_SINK (sink));
1350     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
1351         ("Invalid address family (got %d)", family));
1352     return FALSE;
1353   }
1354 }
1355
1356 /* create a socket for sending to remote machine */
1357 static gboolean
1358 gst_multiudpsink_start (GstBaseSink * bsink)
1359 {
1360   GstMultiUDPSink *sink;
1361   GList *clients;
1362   GstUDPClient *client;
1363   GError *err = NULL;
1364
1365   sink = GST_MULTIUDPSINK (bsink);
1366
1367   sink->external_socket = FALSE;
1368
1369   if (sink->socket) {
1370     GST_DEBUG_OBJECT (sink, "using configured socket");
1371     if (g_socket_get_family (sink->socket) == G_SOCKET_FAMILY_IPV6) {
1372       sink->used_socket_v6 = G_SOCKET (g_object_ref (sink->socket));
1373       sink->external_socket = TRUE;
1374     } else {
1375       sink->used_socket = G_SOCKET (g_object_ref (sink->socket));
1376       sink->external_socket = TRUE;
1377     }
1378   }
1379
1380   if (sink->socket_v6) {
1381     GST_DEBUG_OBJECT (sink, "using configured IPv6 socket");
1382     g_return_val_if_fail (g_socket_get_family (sink->socket) !=
1383         G_SOCKET_FAMILY_IPV6, FALSE);
1384
1385     if (sink->used_socket_v6 && sink->used_socket_v6 != sink->socket_v6) {
1386       GST_ERROR_OBJECT (sink,
1387           "Provided different IPv6 sockets in socket and socket-v6 properties");
1388       return FALSE;
1389     }
1390
1391     sink->used_socket_v6 = G_SOCKET (g_object_ref (sink->socket_v6));
1392     sink->external_socket = TRUE;
1393   }
1394
1395   if (!sink->used_socket && !sink->used_socket_v6) {
1396     GSocketAddress *bind_addr;
1397     GInetAddress *bind_iaddr;
1398
1399     if (sink->bind_address) {
1400       GSocketFamily family;
1401
1402       bind_iaddr = g_inet_address_new_from_string (sink->bind_address);
1403       if (!bind_iaddr) {
1404         GList *results;
1405         GResolver *resolver;
1406
1407         resolver = g_resolver_get_default ();
1408         results =
1409             g_resolver_lookup_by_name (resolver, sink->bind_address,
1410             sink->cancellable, &err);
1411         if (!results) {
1412           g_object_unref (resolver);
1413           goto name_resolve;
1414         }
1415         bind_iaddr = G_INET_ADDRESS (g_object_ref (results->data));
1416         g_resolver_free_addresses (results);
1417         g_object_unref (resolver);
1418       }
1419
1420       bind_addr = g_inet_socket_address_new (bind_iaddr, sink->bind_port);
1421       g_object_unref (bind_iaddr);
1422       family = g_socket_address_get_family (G_SOCKET_ADDRESS (bind_addr));
1423
1424       if ((sink->used_socket =
1425               g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1426                   G_SOCKET_PROTOCOL_UDP, &err)) == NULL) {
1427         g_object_unref (bind_addr);
1428         goto no_socket;
1429       }
1430
1431       g_socket_bind (sink->used_socket, bind_addr, TRUE, &err);
1432       if (err != NULL)
1433         goto bind_error;
1434     } else {
1435       /* create sender sockets if none available */
1436       if ((sink->used_socket = g_socket_new (G_SOCKET_FAMILY_IPV4,
1437                   G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
1438         goto no_socket;
1439
1440       bind_iaddr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
1441       bind_addr = g_inet_socket_address_new (bind_iaddr, sink->bind_port);
1442       g_socket_bind (sink->used_socket, bind_addr, TRUE, &err);
1443       g_object_unref (bind_addr);
1444       g_object_unref (bind_iaddr);
1445       if (err != NULL)
1446         goto bind_error;
1447
1448       if ((sink->used_socket_v6 = g_socket_new (G_SOCKET_FAMILY_IPV6,
1449                   G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP,
1450                   &err)) == NULL) {
1451         GST_INFO_OBJECT (sink, "Failed to create IPv6 socket: %s",
1452             err->message);
1453         g_clear_error (&err);
1454       } else {
1455         bind_iaddr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV6);
1456         bind_addr = g_inet_socket_address_new (bind_iaddr, sink->bind_port);
1457         g_socket_bind (sink->used_socket_v6, bind_addr, TRUE, &err);
1458         g_object_unref (bind_addr);
1459         g_object_unref (bind_iaddr);
1460         if (err != NULL)
1461           goto bind_error;
1462       }
1463     }
1464   }
1465 #ifdef SO_SNDBUF
1466   {
1467     socklen_t len;
1468     gint sndsize, ret;
1469
1470     len = sizeof (sndsize);
1471     if (sink->buffer_size != 0) {
1472       sndsize = sink->buffer_size;
1473
1474       GST_DEBUG_OBJECT (sink, "setting udp buffer of %d bytes", sndsize);
1475       /* set buffer size, Note that on Linux this is typically limited to a
1476        * maximum of around 100K. Also a minimum of 128 bytes is required on
1477        * Linux. */
1478
1479       if (sink->used_socket) {
1480         ret =
1481             setsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET,
1482             SO_SNDBUF, (void *) &sndsize, len);
1483         if (ret != 0) {
1484           GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL),
1485               ("Could not create a buffer of requested %d bytes, %d: %s",
1486                   sndsize, ret, g_strerror (errno)));
1487         }
1488       }
1489
1490       if (sink->used_socket_v6) {
1491         ret =
1492             setsockopt (g_socket_get_fd (sink->used_socket_v6), SOL_SOCKET,
1493             SO_SNDBUF, (void *) &sndsize, len);
1494         if (ret != 0) {
1495           GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL),
1496               ("Could not create a buffer of requested %d bytes, %d: %s",
1497                   sndsize, ret, g_strerror (errno)));
1498         }
1499       }
1500     }
1501
1502     /* read the value of the receive buffer. Note that on linux this returns 2x the
1503      * value we set because the kernel allocates extra memory for metadata.
1504      * The default on Linux is about 100K (which is about 50K without metadata) */
1505     if (sink->used_socket) {
1506       ret =
1507           getsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET,
1508           SO_SNDBUF, (void *) &sndsize, &len);
1509       if (ret == 0)
1510         GST_DEBUG_OBJECT (sink, "have UDP buffer of %d bytes", sndsize);
1511       else
1512         GST_DEBUG_OBJECT (sink, "could not get UDP buffer size");
1513     }
1514
1515     if (sink->used_socket_v6) {
1516       ret =
1517           getsockopt (g_socket_get_fd (sink->used_socket_v6), SOL_SOCKET,
1518           SO_SNDBUF, (void *) &sndsize, &len);
1519       if (ret == 0)
1520         GST_DEBUG_OBJECT (sink, "have UDPv6 buffer of %d bytes", sndsize);
1521       else
1522         GST_DEBUG_OBJECT (sink, "could not get UDPv6 buffer size");
1523     }
1524   }
1525 #endif
1526
1527 #ifdef SO_BINDTODEVICE
1528   if (sink->multi_iface) {
1529     if (sink->used_socket) {
1530       if (setsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET,
1531               SO_BINDTODEVICE, sink->multi_iface,
1532               strlen (sink->multi_iface)) < 0)
1533         GST_WARNING_OBJECT (sink, "setsockopt SO_BINDTODEVICE failed: %s",
1534             strerror (errno));
1535     }
1536     if (sink->used_socket_v6) {
1537       if (setsockopt (g_socket_get_fd (sink->used_socket_v6), SOL_SOCKET,
1538               SO_BINDTODEVICE, sink->multi_iface,
1539               strlen (sink->multi_iface)) < 0)
1540         GST_WARNING_OBJECT (sink, "setsockopt SO_BINDTODEVICE failed (v6): %s",
1541             strerror (errno));
1542     }
1543   }
1544 #endif
1545
1546   if (sink->used_socket)
1547     g_socket_set_broadcast (sink->used_socket, TRUE);
1548   if (sink->used_socket_v6)
1549     g_socket_set_broadcast (sink->used_socket_v6, TRUE);
1550
1551   sink->bytes_to_serve = 0;
1552   sink->bytes_served = 0;
1553
1554   gst_multiudpsink_setup_qos_dscp (sink, sink->used_socket);
1555   gst_multiudpsink_setup_qos_dscp (sink, sink->used_socket_v6);
1556
1557   /* look for multicast clients and join multicast groups appropriately
1558      set also ttl and multicast loopback delivery appropriately  */
1559   for (clients = sink->clients; clients; clients = g_list_next (clients)) {
1560     client = (GstUDPClient *) clients->data;
1561
1562     if (!gst_multiudpsink_configure_client (sink, client))
1563       return FALSE;
1564   }
1565   return TRUE;
1566
1567   /* ERRORS */
1568 no_socket:
1569   {
1570     GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1571         ("Could not create socket: %s", err->message));
1572     g_clear_error (&err);
1573     return FALSE;
1574   }
1575 bind_error:
1576   {
1577     GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1578         ("Failed to bind socket: %s", err->message));
1579     g_clear_error (&err);
1580     return FALSE;
1581   }
1582 name_resolve:
1583   {
1584     GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1585         ("Failed to resolve bind address %s: %s", sink->bind_address,
1586             err->message));
1587     g_clear_error (&err);
1588     return FALSE;
1589   }
1590 }
1591
1592 static gboolean
1593 gst_multiudpsink_stop (GstBaseSink * bsink)
1594 {
1595   GstMultiUDPSink *udpsink;
1596
1597   udpsink = GST_MULTIUDPSINK (bsink);
1598
1599   if (udpsink->used_socket) {
1600     if (udpsink->close_socket || !udpsink->external_socket) {
1601       GError *err = NULL;
1602
1603       if (!g_socket_close (udpsink->used_socket, &err)) {
1604         GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
1605         g_clear_error (&err);
1606       }
1607     }
1608
1609     g_object_unref (udpsink->used_socket);
1610     udpsink->used_socket = NULL;
1611   }
1612
1613   if (udpsink->used_socket_v6) {
1614     if (udpsink->close_socket || !udpsink->external_socket) {
1615       GError *err = NULL;
1616
1617       if (!g_socket_close (udpsink->used_socket_v6, &err)) {
1618         GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
1619         g_clear_error (&err);
1620       }
1621     }
1622
1623     g_object_unref (udpsink->used_socket_v6);
1624     udpsink->used_socket_v6 = NULL;
1625   }
1626
1627   return TRUE;
1628 }
1629
1630 static gint
1631 gst_udp_client_compare_socket_family (GstUDPClient * a, GstUDPClient * b)
1632 {
1633   GSocketFamily fa = g_socket_address_get_family (a->addr);
1634   GSocketFamily fb = g_socket_address_get_family (b->addr);
1635
1636   if (fa == fb)
1637     return 0;
1638
1639   /* a should go before b */
1640   if (fa == G_SOCKET_FAMILY_IPV4 && fb == G_SOCKET_FAMILY_IPV6)
1641     return -1;
1642
1643   /* b should go before a */
1644   return 1;
1645 }
1646
1647 static void
1648 gst_multiudpsink_add_internal (GstMultiUDPSink * sink, const gchar * host,
1649     gint port, gboolean lock)
1650 {
1651   GSocketFamily family;
1652   GstUDPClient *client;
1653   GstUDPClient udpclient;
1654   GTimeVal now;
1655   GList *find;
1656
1657   udpclient.host = (gchar *) host;
1658   udpclient.port = port;
1659
1660   GST_DEBUG_OBJECT (sink, "adding client on host %s, port %d", host, port);
1661
1662   if (lock)
1663     g_mutex_lock (&sink->client_lock);
1664
1665   find = g_list_find_custom (sink->clients, &udpclient,
1666       (GCompareFunc) client_compare);
1667
1668   if (find) {
1669     client = (GstUDPClient *) find->data;
1670
1671     family = g_socket_address_get_family (client->addr);
1672
1673     GST_DEBUG_OBJECT (sink, "found %d existing clients with host %s, port %d",
1674         client->add_count, host, port);
1675   } else {
1676     client = gst_udp_client_new (sink, host, port);
1677     if (!client)
1678       goto error;
1679
1680     family = g_socket_address_get_family (client->addr);
1681
1682     g_get_current_time (&now);
1683     client->connect_time = GST_TIMEVAL_TO_TIME (now);
1684
1685     if (sink->used_socket)
1686       gst_multiudpsink_configure_client (sink, client);
1687
1688     GST_DEBUG_OBJECT (sink, "add client with host %s, port %d", host, port);
1689
1690     /* keep IPv4 clients at the beginning, and IPv6 at the end, we can make
1691      * use of this in gst_multiudpsink_render_buffers() */
1692     sink->clients = g_list_insert_sorted (sink->clients, client,
1693         (GCompareFunc) gst_udp_client_compare_socket_family);
1694
1695     if (family == G_SOCKET_FAMILY_IPV4)
1696       ++sink->num_v4_unique;
1697     else
1698       ++sink->num_v6_unique;
1699   }
1700
1701   ++client->add_count;
1702
1703   if (family == G_SOCKET_FAMILY_IPV4)
1704     ++sink->num_v4_all;
1705   else
1706     ++sink->num_v6_all;
1707
1708   if (lock)
1709     g_mutex_unlock (&sink->client_lock);
1710
1711   g_signal_emit (G_OBJECT (sink),
1712       gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED], 0, host, port);
1713
1714   GST_DEBUG_OBJECT (sink, "added client on host %s, port %d", host, port);
1715   return;
1716
1717   /* ERRORS */
1718 error:
1719   {
1720     GST_DEBUG_OBJECT (sink, "did not add client on host %s, port %d", host,
1721         port);
1722     if (lock)
1723       g_mutex_unlock (&sink->client_lock);
1724     return;
1725   }
1726 }
1727
1728 void
1729 gst_multiudpsink_add (GstMultiUDPSink * sink, const gchar * host, gint port)
1730 {
1731   gst_multiudpsink_add_internal (sink, host, port, TRUE);
1732 }
1733
1734 void
1735 gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port)
1736 {
1737   GSocketFamily family;
1738   GList *find;
1739   GstUDPClient udpclient;
1740   GstUDPClient *client;
1741   GTimeVal now;
1742
1743   udpclient.host = (gchar *) host;
1744   udpclient.port = port;
1745
1746   g_mutex_lock (&sink->client_lock);
1747   find = g_list_find_custom (sink->clients, &udpclient,
1748       (GCompareFunc) client_compare);
1749   if (!find)
1750     goto not_found;
1751
1752   client = (GstUDPClient *) find->data;
1753
1754   GST_DEBUG_OBJECT (sink, "found %d clients with host %s, port %d",
1755       client->add_count, host, port);
1756
1757   --client->add_count;
1758
1759   family = g_socket_address_get_family (client->addr);
1760   if (family == G_SOCKET_FAMILY_IPV4)
1761     --sink->num_v4_all;
1762   else
1763     --sink->num_v6_all;
1764
1765   if (client->add_count == 0) {
1766     GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr);
1767     GInetAddress *addr = g_inet_socket_address_get_address (saddr);
1768     GSocket *socket;
1769
1770     /* Select socket to send from for this address */
1771     if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
1772       socket = sink->used_socket_v6;
1773     else
1774       socket = sink->used_socket;
1775
1776     GST_DEBUG_OBJECT (sink, "remove client with host %s, port %d", host, port);
1777
1778     g_get_current_time (&now);
1779     client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
1780
1781     if (socket && sink->auto_multicast
1782         && g_inet_address_get_is_multicast (addr)) {
1783       GError *err = NULL;
1784
1785       if (!g_socket_leave_multicast_group (socket, addr, FALSE,
1786               sink->multi_iface, &err)) {
1787         GST_DEBUG_OBJECT (sink, "Failed to leave multicast group: %s",
1788             err->message);
1789         g_clear_error (&err);
1790       }
1791     }
1792
1793     if (family == G_SOCKET_FAMILY_IPV4)
1794       --sink->num_v4_unique;
1795     else
1796       --sink->num_v6_unique;
1797
1798     /* Unlock to emit signal before we delete the actual client */
1799     g_mutex_unlock (&sink->client_lock);
1800     g_signal_emit (G_OBJECT (sink),
1801         gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED], 0, host, port);
1802     g_mutex_lock (&sink->client_lock);
1803
1804     sink->clients = g_list_delete_link (sink->clients, find);
1805
1806     gst_udp_client_unref (client);
1807   }
1808   g_mutex_unlock (&sink->client_lock);
1809
1810   return;
1811
1812   /* ERRORS */
1813 not_found:
1814   {
1815     g_mutex_unlock (&sink->client_lock);
1816     GST_WARNING_OBJECT (sink, "client at host %s, port %d not found",
1817         host, port);
1818     return;
1819   }
1820 }
1821
1822 static void
1823 gst_multiudpsink_clear_internal (GstMultiUDPSink * sink, gboolean lock)
1824 {
1825   GST_DEBUG_OBJECT (sink, "clearing");
1826   /* we only need to remove the client structure, there is no additional
1827    * socket or anything to free for UDP */
1828   if (lock)
1829     g_mutex_lock (&sink->client_lock);
1830   g_list_foreach (sink->clients, (GFunc) gst_udp_client_unref, sink);
1831   g_list_free (sink->clients);
1832   sink->clients = NULL;
1833   sink->num_v4_unique = 0;
1834   sink->num_v4_all = 0;
1835   sink->num_v6_unique = 0;
1836   sink->num_v6_all = 0;
1837   if (lock)
1838     g_mutex_unlock (&sink->client_lock);
1839 }
1840
1841 void
1842 gst_multiudpsink_clear (GstMultiUDPSink * sink)
1843 {
1844   gst_multiudpsink_clear_internal (sink, TRUE);
1845 }
1846
1847 GstStructure *
1848 gst_multiudpsink_get_stats (GstMultiUDPSink * sink, const gchar * host,
1849     gint port)
1850 {
1851   GstUDPClient *client;
1852   GstStructure *result = NULL;
1853   GstUDPClient udpclient;
1854   GList *find;
1855
1856   udpclient.host = (gchar *) host;
1857   udpclient.port = port;
1858
1859   g_mutex_lock (&sink->client_lock);
1860
1861   find = g_list_find_custom (sink->clients, &udpclient,
1862       (GCompareFunc) client_compare);
1863   if (!find)
1864     goto not_found;
1865
1866   GST_DEBUG_OBJECT (sink, "stats for client with host %s, port %d", host, port);
1867
1868   client = (GstUDPClient *) find->data;
1869
1870   result = gst_structure_new_empty ("multiudpsink-stats");
1871
1872   gst_structure_set (result,
1873       "bytes-sent", G_TYPE_UINT64, client->bytes_sent,
1874       "packets-sent", G_TYPE_UINT64, client->packets_sent,
1875       "connect-time", G_TYPE_UINT64, client->connect_time,
1876       "disconnect-time", G_TYPE_UINT64, client->disconnect_time, NULL);
1877
1878   g_mutex_unlock (&sink->client_lock);
1879
1880   return result;
1881
1882   /* ERRORS */
1883 not_found:
1884   {
1885     g_mutex_unlock (&sink->client_lock);
1886     GST_WARNING_OBJECT (sink, "client with host %s, port %d not found",
1887         host, port);
1888     /* Apparently (see comment in gstmultifdsink.c) returning NULL from here may
1889      * confuse/break python bindings */
1890     return gst_structure_new_empty ("multiudpsink-stats");
1891   }
1892 }
1893
1894 static gboolean
1895 gst_multiudpsink_unlock (GstBaseSink * bsink)
1896 {
1897   GstMultiUDPSink *sink;
1898
1899   sink = GST_MULTIUDPSINK (bsink);
1900
1901   g_cancellable_cancel (sink->cancellable);
1902
1903   return TRUE;
1904 }
1905
1906 static gboolean
1907 gst_multiudpsink_unlock_stop (GstBaseSink * bsink)
1908 {
1909   GstMultiUDPSink *sink;
1910
1911   sink = GST_MULTIUDPSINK (bsink);
1912
1913   g_cancellable_reset (sink->cancellable);
1914
1915   return TRUE;
1916 }