a302465bd131bd3721b03c1d899abc3accd2db89
[platform/upstream/gst-plugins-good.git] / gst / udp / gstmultiudpsink.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  * Copyright (C) <2009> Jarkko Palviainen <jarkko.palviainen@sesca.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * SECTION:element-multiudpsink
23  * @see_also: udpsink, multifdsink
24  *
25  * multiudpsink is a network sink that sends UDP packets to multiple
26  * clients.
27  * It can be combined with rtp payload encoders to implement RTP streaming.
28  */
29
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif
33 #include "gstudp-marshal.h"
34 #include "gstmultiudpsink.h"
35
36 #include <stdio.h>
37 #include <stdlib.h>
38 #ifdef HAVE_UNISTD_H
39 #include <unistd.h>
40 #endif
41 #include <errno.h>
42 #include <string.h>
43
44 GST_DEBUG_CATEGORY_STATIC (multiudpsink_debug);
45 #define GST_CAT_DEFAULT (multiudpsink_debug)
46
47 #define UDP_MAX_SIZE 65507
48
49 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
50     GST_PAD_SINK,
51     GST_PAD_ALWAYS,
52     GST_STATIC_CAPS_ANY);
53
54 /* MultiUDPSink signals and args */
55 enum
56 {
57   /* methods */
58   SIGNAL_ADD,
59   SIGNAL_REMOVE,
60   SIGNAL_CLEAR,
61   SIGNAL_GET_STATS,
62
63   /* signals */
64   SIGNAL_CLIENT_ADDED,
65   SIGNAL_CLIENT_REMOVED,
66
67   /* FILL ME */
68   LAST_SIGNAL
69 };
70
71 #define DEFAULT_SOCKFD             -1
72 #define DEFAULT_CLOSEFD            TRUE
73 #define DEFAULT_SOCK               -1
74 #define DEFAULT_CLIENTS            NULL
75 #define DEFAULT_FAMILY             0
76 /* FIXME, this should be disabled by default, we don't need to join a multicast
77  * group for sending, if this socket is also used for receiving, it should
78  * be configured in the element that does the receive. */
79 #define DEFAULT_AUTO_MULTICAST     TRUE
80 #define DEFAULT_TTL                64
81 #define DEFAULT_TTL_MC             1
82 #define DEFAULT_LOOP               TRUE
83 #define DEFAULT_QOS_DSCP           -1
84 #define DEFAULT_SEND_DUPLICATES    TRUE
85 #define DEFAULT_BUFFER_SIZE        0
86
87 enum
88 {
89   PROP_0,
90   PROP_BYTES_TO_SERVE,
91   PROP_BYTES_SERVED,
92   PROP_SOCKFD,
93   PROP_CLOSEFD,
94   PROP_SOCK,
95   PROP_CLIENTS,
96   PROP_AUTO_MULTICAST,
97   PROP_TTL,
98   PROP_TTL_MC,
99   PROP_LOOP,
100   PROP_QOS_DSCP,
101   PROP_SEND_DUPLICATES,
102   PROP_BUFFER_SIZE,
103   PROP_LAST
104 };
105
106 #define CLOSE_IF_REQUESTED(udpctx)                                        \
107 G_STMT_START {                                                            \
108   if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \
109     CLOSE_SOCKET(udpctx->sock);                                           \
110     if (udpctx->sock == udpctx->sockfd)                                   \
111       udpctx->sockfd = DEFAULT_SOCKFD;                                    \
112   }                                                                       \
113   udpctx->sock = DEFAULT_SOCK;                                            \
114 } G_STMT_END
115
116 static void gst_multiudpsink_finalize (GObject * object);
117
118 static GstFlowReturn gst_multiudpsink_render (GstBaseSink * sink,
119     GstBuffer * buffer);
120 #if 0
121 #ifndef G_OS_WIN32              /* sendmsg() is not available on Windows */
122 static GstFlowReturn gst_multiudpsink_render_list (GstBaseSink * bsink,
123     GstBufferList * list);
124 #endif
125 #endif
126 static GstStateChangeReturn gst_multiudpsink_change_state (GstElement *
127     element, GstStateChange transition);
128
129 static void gst_multiudpsink_set_property (GObject * object, guint prop_id,
130     const GValue * value, GParamSpec * pspec);
131 static void gst_multiudpsink_get_property (GObject * object, guint prop_id,
132     GValue * value, GParamSpec * pspec);
133
134 static void gst_multiudpsink_add_internal (GstMultiUDPSink * sink,
135     const gchar * host, gint port, gboolean lock);
136 static void gst_multiudpsink_clear_internal (GstMultiUDPSink * sink,
137     gboolean lock);
138
139 static guint gst_multiudpsink_signals[LAST_SIGNAL] = { 0 };
140
141 #define gst_multiudpsink_parent_class parent_class
142 G_DEFINE_TYPE (GstMultiUDPSink, gst_multiudpsink, GST_TYPE_BASE_SINK);
143
144 static void
145 gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass)
146 {
147   GObjectClass *gobject_class;
148   GstElementClass *gstelement_class;
149   GstBaseSinkClass *gstbasesink_class;
150
151   gobject_class = (GObjectClass *) klass;
152   gstelement_class = (GstElementClass *) klass;
153   gstbasesink_class = (GstBaseSinkClass *) klass;
154
155   parent_class = g_type_class_peek_parent (klass);
156
157   gobject_class->set_property = gst_multiudpsink_set_property;
158   gobject_class->get_property = gst_multiudpsink_get_property;
159   gobject_class->finalize = gst_multiudpsink_finalize;
160
161   /**
162    * GstMultiUDPSink::add:
163    * @gstmultiudpsink: the sink on which the signal is emitted
164    * @host: the hostname/IP address of the client to add
165    * @port: the port of the client to add
166    *
167    * Add a client with destination @host and @port to the list of
168    * clients. When the same host/port pair is added multiple times, the
169    * send-duplicates property defines if the packets are sent multiple times to
170    * the same host/port pair or not.
171    *
172    * When a host/port pair is added multiple times, an equal amount of remove
173    * calls must be performed to actually remove the host/port pair from the list
174    * of destinations.
175    */
176   gst_multiudpsink_signals[SIGNAL_ADD] =
177       g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
178       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
179       G_STRUCT_OFFSET (GstMultiUDPSinkClass, add),
180       NULL, NULL, gst_udp_marshal_VOID__STRING_INT, G_TYPE_NONE, 2,
181       G_TYPE_STRING, G_TYPE_INT);
182   /**
183    * GstMultiUDPSink::remove:
184    * @gstmultiudpsink: the sink on which the signal is emitted
185    * @host: the hostname/IP address of the client to remove
186    * @port: the port of the client to remove
187    *
188    * Remove the client with destination @host and @port from the list of
189    * clients.
190    */
191   gst_multiudpsink_signals[SIGNAL_REMOVE] =
192       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
193       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
194       G_STRUCT_OFFSET (GstMultiUDPSinkClass, remove),
195       NULL, NULL, gst_udp_marshal_VOID__STRING_INT, G_TYPE_NONE, 2,
196       G_TYPE_STRING, G_TYPE_INT);
197   /**
198    * GstMultiUDPSink::clear:
199    * @gstmultiudpsink: the sink on which the signal is emitted
200    *
201    * Clear the list of clients.
202    */
203   gst_multiudpsink_signals[SIGNAL_CLEAR] =
204       g_signal_new ("clear", G_TYPE_FROM_CLASS (klass),
205       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
206       G_STRUCT_OFFSET (GstMultiUDPSinkClass, clear),
207       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
208   /**
209    * GstMultiUDPSink::get-stats:
210    * @gstmultiudpsink: the sink on which the signal is emitted
211    * @host: the hostname/IP address of the client to get stats on
212    * @port: the port of the client to get stats on
213    *
214    * Get the statistics of the client with destination @host and @port.
215    *
216    * Returns: a GValueArray of uint64: bytes_sent, packets_sent,
217    *           connect_time (in epoch seconds), disconnect_time (in epoch seconds)
218    */
219   gst_multiudpsink_signals[SIGNAL_GET_STATS] =
220       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
221       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
222       G_STRUCT_OFFSET (GstMultiUDPSinkClass, get_stats),
223       NULL, NULL, gst_udp_marshal_BOXED__STRING_INT, G_TYPE_VALUE_ARRAY, 2,
224       G_TYPE_STRING, G_TYPE_INT);
225   /**
226    * GstMultiUDPSink::client-added:
227    * @gstmultiudpsink: the sink emitting the signal
228    * @host: the hostname/IP address of the added client
229    * @port: the port of the added client
230    *
231    * Signal emited when a new client is added to the list of
232    * clients.
233    */
234   gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED] =
235       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
236       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass, client_added),
237       NULL, NULL, gst_udp_marshal_VOID__STRING_INT, G_TYPE_NONE, 2,
238       G_TYPE_STRING, G_TYPE_INT);
239   /**
240    * GstMultiUDPSink::client-removed:
241    * @gstmultiudpsink: the sink emitting the signal
242    * @host: the hostname/IP address of the removed client
243    * @port: the port of the removed client
244    *
245    * Signal emited when a client is removed from the list of
246    * clients.
247    */
248   gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED] =
249       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
250       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass,
251           client_removed), NULL, NULL, gst_udp_marshal_VOID__STRING_INT,
252       G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_INT);
253
254   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_TO_SERVE,
255       g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
256           "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
257           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
258   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_SERVED,
259       g_param_spec_uint64 ("bytes-served", "Bytes served",
260           "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
261           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
262   g_object_class_install_property (gobject_class, PROP_SOCKFD,
263       g_param_spec_int ("sockfd", "Socket Handle",
264           "Socket to use for UDP sending. (-1 == allocate)",
265           -1, G_MAXINT, DEFAULT_SOCKFD,
266           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
267   g_object_class_install_property (gobject_class, PROP_CLOSEFD,
268       g_param_spec_boolean ("closefd", "Close sockfd",
269           "Close sockfd if passed as property on state change",
270           DEFAULT_CLOSEFD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
271   g_object_class_install_property (gobject_class, PROP_SOCK,
272       g_param_spec_int ("sock", "Socket Handle",
273           "Socket currently in use for UDP sending. (-1 == no socket)",
274           -1, G_MAXINT, DEFAULT_SOCK,
275           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
276   g_object_class_install_property (gobject_class, PROP_CLIENTS,
277       g_param_spec_string ("clients", "Clients",
278           "A comma separated list of host:port pairs with destinations",
279           DEFAULT_CLIENTS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
280   g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
281       g_param_spec_boolean ("auto-multicast",
282           "Automatically join/leave multicast groups",
283           "Automatically join/leave the multicast groups, FALSE means user"
284           " has to do it himself", DEFAULT_AUTO_MULTICAST,
285           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
286   g_object_class_install_property (gobject_class, PROP_TTL,
287       g_param_spec_int ("ttl", "Unicast TTL",
288           "Used for setting the unicast TTL parameter",
289           0, 255, DEFAULT_TTL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
290   g_object_class_install_property (gobject_class, PROP_TTL_MC,
291       g_param_spec_int ("ttl-mc", "Multicast TTL",
292           "Used for setting the multicast TTL parameter",
293           0, 255, DEFAULT_TTL_MC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
294   g_object_class_install_property (gobject_class, PROP_LOOP,
295       g_param_spec_boolean ("loop", "Multicast Loopback",
296           "Used for setting the multicast loop parameter. TRUE = enable,"
297           " FALSE = disable", DEFAULT_LOOP,
298           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
299   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_QOS_DSCP,
300       g_param_spec_int ("qos-dscp", "QoS diff srv code point",
301           "Quality of Service, differentiated services code point (-1 default)",
302           -1, 63, DEFAULT_QOS_DSCP,
303           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
304   /**
305    * GstMultiUDPSink::send-duplicates
306    *
307    * When a host/port pair is added mutliple times, send the packet to the host
308    * multiple times as well.
309    *
310    * Since: 0.10.26
311    */
312   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEND_DUPLICATES,
313       g_param_spec_boolean ("send-duplicates", "Send Duplicates",
314           "When a distination/port pair is added multiple times, send packets "
315           "multiple times as well", DEFAULT_SEND_DUPLICATES,
316           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
317
318   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
319       g_param_spec_int ("buffer-size", "Buffer Size",
320           "Size of the kernel send buffer in bytes, 0=default", 0, G_MAXINT,
321           DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
322
323   gstelement_class->change_state = gst_multiudpsink_change_state;
324
325   gst_element_class_add_pad_template (gstelement_class,
326       gst_static_pad_template_get (&sink_template));
327
328   gst_element_class_set_details_simple (gstelement_class, "UDP packet sender",
329       "Sink/Network",
330       "Send data over the network via UDP",
331       "Wim Taymans <wim.taymans@gmail.com>");
332
333   gstbasesink_class->render = gst_multiudpsink_render;
334 #if 0
335 #ifndef G_OS_WIN32
336   gstbasesink_class->render_list = gst_multiudpsink_render_list;
337 #endif
338 #endif
339   klass->add = gst_multiudpsink_add;
340   klass->remove = gst_multiudpsink_remove;
341   klass->clear = gst_multiudpsink_clear;
342   klass->get_stats = gst_multiudpsink_get_stats;
343
344   GST_DEBUG_CATEGORY_INIT (multiudpsink_debug, "multiudpsink", 0, "UDP sink");
345 }
346
347
348 static void
349 gst_multiudpsink_init (GstMultiUDPSink * sink)
350 {
351   WSA_STARTUP (sink);
352
353   sink->client_lock = g_mutex_new ();
354   sink->sock = DEFAULT_SOCK;
355   sink->sockfd = DEFAULT_SOCKFD;
356   sink->closefd = DEFAULT_CLOSEFD;
357   sink->externalfd = (sink->sockfd != -1);
358   sink->auto_multicast = DEFAULT_AUTO_MULTICAST;
359   sink->ttl = DEFAULT_TTL;
360   sink->ttl_mc = DEFAULT_TTL_MC;
361   sink->loop = DEFAULT_LOOP;
362   sink->qos_dscp = DEFAULT_QOS_DSCP;
363   sink->ss_family = DEFAULT_FAMILY;
364   sink->send_duplicates = DEFAULT_SEND_DUPLICATES;
365 }
366
367 static GstUDPClient *
368 create_client (GstMultiUDPSink * sink, const gchar * host, gint port)
369 {
370   GstUDPClient *client;
371
372   client = g_slice_new0 (GstUDPClient);
373   client->refcount = 1;
374   client->host = g_strdup (host);
375   client->port = port;
376
377   return client;
378 }
379
380 static void
381 free_client (GstUDPClient * client)
382 {
383   g_free (client->host);
384   g_slice_free (GstUDPClient, client);
385 }
386
387 static gint
388 client_compare (GstUDPClient * a, GstUDPClient * b)
389 {
390   if ((a->port == b->port) && (strcmp (a->host, b->host) == 0))
391     return 0;
392
393   return 1;
394 }
395
396 static void
397 gst_multiudpsink_finalize (GObject * object)
398 {
399   GstMultiUDPSink *sink;
400
401   sink = GST_MULTIUDPSINK (object);
402
403   g_list_foreach (sink->clients, (GFunc) free_client, NULL);
404   g_list_free (sink->clients);
405
406   if (sink->sockfd >= 0 && sink->closefd)
407     CLOSE_SOCKET (sink->sockfd);
408
409   g_mutex_free (sink->client_lock);
410
411   WSA_CLEANUP (object);
412
413   G_OBJECT_CLASS (parent_class)->finalize (object);
414 }
415
416 static gboolean
417 socket_error_is_ignorable (void)
418 {
419 #ifdef G_OS_WIN32
420   /* Windows doesn't seem to have an EAGAIN for sockets */
421   return WSAGetLastError () == WSAEINTR;
422 #else
423   return errno == EINTR || errno == EAGAIN;
424 #endif
425 }
426
427 static int
428 socket_last_error_code (void)
429 {
430 #ifdef G_OS_WIN32
431   return WSAGetLastError ();
432 #else
433   return errno;
434 #endif
435 }
436
437 static gchar *
438 socket_last_error_message (void)
439 {
440 #ifdef G_OS_WIN32
441   int errorcode = WSAGetLastError ();
442   wchar_t buf[1024];
443   DWORD result =
444       FormatMessage (FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
445       NULL, errorcode, 0, (LPSTR) buf, sizeof (buf) / sizeof (wchar_t), NULL);
446   if (FAILED (result)) {
447     return g_strdup ("failed to get error message from system");
448   } else {
449     gchar *res =
450         g_convert ((gchar *) buf, -1, "UTF-16", "UTF-8", NULL, NULL, NULL);
451     /* g_convert() internally calls windows functions which reset the
452        windows error code, so fix it up again like this */
453     WSASetLastError (errorcode);
454     return res;
455   }
456 #else
457   return g_strdup (g_strerror (errno));
458 #endif
459 }
460
461 #ifdef G_OS_WIN32
462 /* version without sendmsg */
463 static GstFlowReturn
464 gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
465 {
466   GstMultiUDPSink *sink;
467   gint ret, num = 0, no_clients = 0;
468   gsize size;
469   guint8 *data;
470   GList *clients;
471   gint len;
472
473   sink = GST_MULTIUDPSINK (bsink);
474
475   data = gst_buffer_map (buffer, &size, NULL, GST_MAP_READ);
476
477   if (size > UDP_MAX_SIZE) {
478     GST_WARNING ("Attempting to send a UDP packet larger than maximum "
479         "size (%d > %d)", size, UDP_MAX_SIZE);
480   }
481
482   sink->bytes_to_serve += size;
483
484   /* grab lock while iterating and sending to clients, this should be
485    * fast as UDP never blocks */
486   g_mutex_lock (sink->client_lock);
487   GST_LOG_OBJECT (bsink, "about to send %d bytes", size);
488
489   for (clients = sink->clients; clients; clients = g_list_next (clients)) {
490     GstUDPClient *client;
491     gint count;
492
493     client = (GstUDPClient *) clients->data;
494     no_clients++;
495     GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client);
496
497     count = sink->send_duplicates ? client->refcount : 1;
498
499     while (count--) {
500       while (TRUE) {
501         len = gst_udp_get_sockaddr_length (&client->theiraddr);
502
503         ret = sendto (*client->sock,
504 #ifdef G_OS_WIN32
505             (char *) data,
506 #else
507             data,
508 #endif
509             size, 0, (struct sockaddr *) &client->theiraddr, len);
510
511         if (ret < 0) {
512           /* some error, just warn, it's likely recoverable and we don't want to
513            * break streaming. We break so that we stop retrying for this client. */
514           if (!socket_error_is_ignorable ()) {
515             gchar *errormessage = socket_last_error_message ();
516             GST_WARNING_OBJECT (sink, "client %p gave error %d (%s)", client,
517                 socket_last_error_code (), errormessage);
518             g_free (errormessage);
519             break;
520           }
521         } else {
522           num++;
523           client->bytes_sent += ret;
524           client->packets_sent++;
525           sink->bytes_served += ret;
526           break;
527         }
528       }
529     }
530   }
531   g_mutex_unlock (sink->client_lock);
532
533   gst_buffer_unmap (buffer, data, size);
534
535   GST_LOG_OBJECT (sink, "sent %d bytes to %d (of %d) clients", size, num,
536       no_clients);
537
538   return GST_FLOW_OK;
539 }
540 #else /* !G_OS_WIN32 */
541 /* version with sendmsg */
542 static GstFlowReturn
543 gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
544 {
545   GstMultiUDPSink *sink;
546   GList *clients;
547   gint ret, size = 0, num = 0, no_clients = 0;
548   struct iovec *iov;
549   struct msghdr msg = { 0 };
550   guint n_mem, i;
551   gpointer bdata;
552   gsize bsize;
553   GstMemory *mem;
554
555   sink = GST_MULTIUDPSINK (bsink);
556
557   msg.msg_iovlen = 0;
558   size = 0;
559
560   n_mem = gst_buffer_n_memory (buffer);
561   if (n_mem == 0)
562     goto no_data;
563
564   iov = (struct iovec *) g_malloc (n_mem * sizeof (struct iovec));
565   msg.msg_iov = iov;
566
567   for (i = 0; i < n_mem; i++) {
568     mem = gst_buffer_peek_memory (buffer, i, GST_MAP_READ);
569     bdata = gst_memory_map (mem, &bsize, NULL, GST_MAP_READ);
570
571     if (bsize > UDP_MAX_SIZE) {
572       GST_WARNING ("Attempting to send a UDP packet larger than maximum "
573           "size (%" G_GSIZE_FORMAT " > %d)", bsize, UDP_MAX_SIZE);
574     }
575
576     msg.msg_iov[msg.msg_iovlen].iov_len = bsize;
577     msg.msg_iov[msg.msg_iovlen].iov_base = bdata;
578     msg.msg_iovlen++;
579
580     size += bsize;
581   }
582
583   sink->bytes_to_serve += size;
584
585   /* grab lock while iterating and sending to clients, this should be
586    * fast as UDP never blocks */
587   g_mutex_lock (sink->client_lock);
588   GST_LOG_OBJECT (bsink, "about to send %d bytes", size);
589
590   for (clients = sink->clients; clients; clients = g_list_next (clients)) {
591     GstUDPClient *client;
592     gint count;
593
594     client = (GstUDPClient *) clients->data;
595     no_clients++;
596     GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client);
597
598     count = sink->send_duplicates ? client->refcount : 1;
599
600     while (count--) {
601       while (TRUE) {
602         msg.msg_name = (void *) &client->theiraddr;
603         msg.msg_namelen = sizeof (client->theiraddr);
604         ret = sendmsg (*client->sock, &msg, 0);
605
606         if (ret < 0) {
607           if (!socket_error_is_ignorable ()) {
608             gchar *errormessage = socket_last_error_message ();
609             GST_WARNING_OBJECT (sink, "client %p gave error %d (%s)", client,
610                 socket_last_error_code (), errormessage);
611             g_free (errormessage);
612             break;
613             break;
614           }
615         } else {
616           num++;
617           client->bytes_sent += ret;
618           client->packets_sent++;
619           sink->bytes_served += ret;
620           break;
621         }
622       }
623     }
624   }
625   g_mutex_unlock (sink->client_lock);
626
627   /* unmap all memory again */
628   for (i = 0; i < n_mem; i++) {
629     mem = gst_buffer_peek_memory (buffer, i, GST_MAP_READ);
630
631     bsize = msg.msg_iov[i].iov_len;
632     bdata = msg.msg_iov[i].iov_base;
633
634     gst_memory_unmap (mem, bdata, bsize);
635   }
636   g_free (iov);
637
638   GST_LOG_OBJECT (sink, "sent %d bytes to %d (of %d) clients", size, num,
639       no_clients);
640
641   return GST_FLOW_OK;
642
643 no_data:
644   {
645     return GST_FLOW_OK;
646   }
647 }
648 #endif
649
650 #if 0
651 /* DISABLED, core sends buffers to our render one by one, we can't really do
652  * much better */
653 static GstFlowReturn
654 gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * list)
655 {
656 }
657 #endif
658
659 static void
660 gst_multiudpsink_set_clients_string (GstMultiUDPSink * sink,
661     const gchar * string)
662 {
663   gchar **clients;
664   gint i;
665
666   clients = g_strsplit (string, ",", 0);
667
668   g_mutex_lock (sink->client_lock);
669   /* clear all existing clients */
670   gst_multiudpsink_clear_internal (sink, FALSE);
671   for (i = 0; clients[i]; i++) {
672     gchar *host, *p;
673     gint port = 0;
674
675     host = clients[i];
676     p = strstr (clients[i], ":");
677     if (p != NULL) {
678       *p = '\0';
679       port = atoi (p + 1);
680     }
681     if (port != 0)
682       gst_multiudpsink_add_internal (sink, host, port, FALSE);
683   }
684   g_mutex_unlock (sink->client_lock);
685
686   g_strfreev (clients);
687 }
688
689 static gchar *
690 gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink)
691 {
692   GString *str;
693   GList *clients;
694
695   str = g_string_new ("");
696
697   g_mutex_lock (sink->client_lock);
698   clients = sink->clients;
699   while (clients) {
700     GstUDPClient *client;
701     gint count;
702
703     client = (GstUDPClient *) clients->data;
704
705     clients = g_list_next (clients);
706
707     count = client->refcount;
708     while (count--) {
709       g_string_append_printf (str, "%s:%d%s", client->host, client->port,
710           (clients || count > 1 ? "," : ""));
711     }
712   }
713   g_mutex_unlock (sink->client_lock);
714
715   return g_string_free (str, FALSE);
716 }
717
718 static void
719 gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink)
720 {
721   gint tos;
722
723   /* don't touch on -1 */
724   if (sink->qos_dscp < 0)
725     return;
726
727   if (sink->sock < 0)
728     return;
729
730   GST_DEBUG_OBJECT (sink, "setting TOS to %d", sink->qos_dscp);
731
732   /* Extract and shift 6 bits of DSFIELD */
733   tos = (sink->qos_dscp & 0x3f) << 2;
734
735   if (setsockopt (sink->sock, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) {
736     gchar *errormessage = socket_last_error_message ();
737     GST_ERROR_OBJECT (sink, "could not set TOS: %s", errormessage);
738     g_free (errormessage);
739   }
740 #ifdef IPV6_TCLASS
741   if (setsockopt (sink->sock, IPPROTO_IPV6, IPV6_TCLASS, &tos,
742           sizeof (tos)) < 0) {
743     gchar *errormessage = socket_last_error_message ();
744     GST_ERROR_OBJECT (sink, "could not set TCLASS: %s", errormessage);
745     g_free (errormessage);
746   }
747 #endif
748 }
749
750 static void
751 gst_multiudpsink_set_property (GObject * object, guint prop_id,
752     const GValue * value, GParamSpec * pspec)
753 {
754   GstMultiUDPSink *udpsink;
755
756   udpsink = GST_MULTIUDPSINK (object);
757
758   switch (prop_id) {
759     case PROP_SOCKFD:
760       if (udpsink->sockfd >= 0 && udpsink->sockfd != udpsink->sock &&
761           udpsink->closefd)
762         CLOSE_SOCKET (udpsink->sockfd);
763       udpsink->sockfd = g_value_get_int (value);
764       GST_DEBUG_OBJECT (udpsink, "setting SOCKFD to %d", udpsink->sockfd);
765       break;
766     case PROP_CLOSEFD:
767       udpsink->closefd = g_value_get_boolean (value);
768       break;
769     case PROP_CLIENTS:
770       gst_multiudpsink_set_clients_string (udpsink, g_value_get_string (value));
771       break;
772     case PROP_AUTO_MULTICAST:
773       udpsink->auto_multicast = g_value_get_boolean (value);
774       break;
775     case PROP_TTL:
776       udpsink->ttl = g_value_get_int (value);
777       break;
778     case PROP_TTL_MC:
779       udpsink->ttl_mc = g_value_get_int (value);
780       break;
781     case PROP_LOOP:
782       udpsink->loop = g_value_get_boolean (value);
783       break;
784     case PROP_QOS_DSCP:
785       udpsink->qos_dscp = g_value_get_int (value);
786       gst_multiudpsink_setup_qos_dscp (udpsink);
787       break;
788     case PROP_SEND_DUPLICATES:
789       udpsink->send_duplicates = g_value_get_boolean (value);
790       break;
791     case PROP_BUFFER_SIZE:
792       udpsink->buffer_size = g_value_get_int (value);
793       break;
794     default:
795       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
796       break;
797   }
798 }
799
800 static void
801 gst_multiudpsink_get_property (GObject * object, guint prop_id, GValue * value,
802     GParamSpec * pspec)
803 {
804   GstMultiUDPSink *udpsink;
805
806   udpsink = GST_MULTIUDPSINK (object);
807
808   switch (prop_id) {
809     case PROP_BYTES_TO_SERVE:
810       g_value_set_uint64 (value, udpsink->bytes_to_serve);
811       break;
812     case PROP_BYTES_SERVED:
813       g_value_set_uint64 (value, udpsink->bytes_served);
814       break;
815     case PROP_SOCKFD:
816       g_value_set_int (value, udpsink->sockfd);
817       break;
818     case PROP_CLOSEFD:
819       g_value_set_boolean (value, udpsink->closefd);
820       break;
821     case PROP_SOCK:
822       g_value_set_int (value, udpsink->sock);
823       break;
824     case PROP_CLIENTS:
825       g_value_take_string (value,
826           gst_multiudpsink_get_clients_string (udpsink));
827       break;
828     case PROP_AUTO_MULTICAST:
829       g_value_set_boolean (value, udpsink->auto_multicast);
830       break;
831     case PROP_TTL:
832       g_value_set_int (value, udpsink->ttl);
833       break;
834     case PROP_TTL_MC:
835       g_value_set_int (value, udpsink->ttl_mc);
836       break;
837     case PROP_LOOP:
838       g_value_set_boolean (value, udpsink->loop);
839       break;
840     case PROP_QOS_DSCP:
841       g_value_set_int (value, udpsink->qos_dscp);
842       break;
843     case PROP_SEND_DUPLICATES:
844       g_value_set_boolean (value, udpsink->send_duplicates);
845       break;
846     case PROP_BUFFER_SIZE:
847       g_value_set_int (value, udpsink->buffer_size);
848       break;
849     default:
850       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
851       break;
852   }
853 }
854
855 static gboolean
856 gst_multiudpsink_configure_client (GstMultiUDPSink * sink,
857     GstUDPClient * client)
858 {
859   GST_DEBUG_OBJECT (sink, "configuring client %p", client);
860
861   if (gst_udp_is_multicast (&client->theiraddr)) {
862     GST_DEBUG_OBJECT (sink, "we have a multicast client %p", client);
863     if (sink->auto_multicast) {
864       GST_DEBUG_OBJECT (sink, "autojoining group");
865       if (gst_udp_join_group (*(client->sock), &client->theiraddr, NULL)
866           != 0)
867         goto join_group_failed;
868     }
869     GST_DEBUG_OBJECT (sink, "setting loop to %d", sink->loop);
870     if (gst_udp_set_loop (sink->sock, sink->ss_family, sink->loop) != 0)
871       goto loop_failed;
872     GST_DEBUG_OBJECT (sink, "setting ttl to %d", sink->ttl_mc);
873     if (gst_udp_set_ttl (sink->sock, sink->ss_family, sink->ttl_mc, TRUE) != 0)
874       goto ttl_failed;
875   } else {
876     GST_DEBUG_OBJECT (sink, "setting unicast ttl to %d", sink->ttl);
877     if (gst_udp_set_ttl (sink->sock, sink->ss_family, sink->ttl, FALSE) != 0)
878       goto ttl_failed;
879   }
880   return TRUE;
881
882   /* ERRORS */
883 join_group_failed:
884   {
885     gchar *errormessage = socket_last_error_message ();
886     int errorcode = socket_last_error_code ();
887     CLOSE_IF_REQUESTED (sink);
888     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
889         ("Could not join multicast group (%d): %s", errorcode, errormessage));
890     g_free (errormessage);
891     return FALSE;
892   }
893 ttl_failed:
894   {
895     gchar *errormessage = socket_last_error_message ();
896     int errorcode = socket_last_error_code ();
897     CLOSE_IF_REQUESTED (sink);
898     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
899         ("Could not set TTL socket option (%d): %s", errorcode, errormessage));
900     g_free (errormessage);
901     return FALSE;
902   }
903 loop_failed:
904   {
905     gchar *errormessage = socket_last_error_message ();
906     int errorcode = socket_last_error_code ();
907     CLOSE_IF_REQUESTED (sink);
908     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
909         ("Could not set loopback socket option (%d): %s",
910             errorcode, errormessage));
911     g_free (errormessage);
912     return FALSE;
913   }
914 }
915
916 /* create a socket for sending to remote machine */
917 static gboolean
918 gst_multiudpsink_init_send (GstMultiUDPSink * sink)
919 {
920   guint bc_val;
921   GList *clients;
922   GstUDPClient *client;
923   int sndsize, ret;
924   socklen_t len;
925
926   if (sink->sockfd == -1) {
927     GST_DEBUG_OBJECT (sink, "creating sockets");
928     /* create sender socket try IP6, fall back to IP4 */
929     sink->ss_family = AF_INET6;
930     if ((sink->sock = socket (AF_INET6, SOCK_DGRAM, 0)) == -1) {
931       sink->ss_family = AF_INET;
932       if ((sink->sock = socket (AF_INET, SOCK_DGRAM, 0)) == -1)
933         goto no_socket;
934     }
935
936     GST_DEBUG_OBJECT (sink, "have socket");
937     sink->externalfd = FALSE;
938   } else {
939     struct sockaddr_storage myaddr;
940
941     GST_DEBUG_OBJECT (sink, "using configured socket");
942     /* we use the configured socket, try to get some info about it */
943     len = sizeof (myaddr);
944     if (getsockname (sink->sockfd, (struct sockaddr *) &myaddr, &len) < 0)
945       goto getsockname_error;
946
947     sink->ss_family = myaddr.ss_family;
948     /* we use the configured socket */
949     sink->sock = sink->sockfd;
950     sink->externalfd = TRUE;
951   }
952
953   len = sizeof (sndsize);
954   if (sink->buffer_size != 0) {
955     sndsize = sink->buffer_size;
956
957     GST_DEBUG_OBJECT (sink, "setting udp buffer of %d bytes", sndsize);
958     /* set buffer size, Note that on Linux this is typically limited to a
959      * maximum of around 100K. Also a minimum of 128 bytes is required on
960      * Linux. */
961     ret =
962         setsockopt (sink->sockfd, SOL_SOCKET, SO_SNDBUF, (void *) &sndsize,
963         len);
964     if (ret != 0) {
965       GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL),
966           ("Could not create a buffer of requested %d bytes, %d: %s (%d)",
967               sndsize, ret, g_strerror (errno), errno));
968     }
969   }
970
971   /* read the value of the receive buffer. Note that on linux this returns 2x the
972    * value we set because the kernel allocates extra memory for metadata.
973    * The default on Linux is about 100K (which is about 50K without metadata) */
974   ret =
975       getsockopt (sink->sockfd, SOL_SOCKET, SO_SNDBUF, (void *) &sndsize, &len);
976   if (ret == 0)
977     GST_DEBUG_OBJECT (sink, "have udp buffer of %d bytes", sndsize);
978   else
979     GST_DEBUG_OBJECT (sink, "could not get udp buffer size");
980
981
982   bc_val = 1;
983   if (setsockopt (sink->sock, SOL_SOCKET, SO_BROADCAST, &bc_val,
984           sizeof (bc_val)) < 0)
985     goto no_broadcast;
986
987   sink->bytes_to_serve = 0;
988   sink->bytes_served = 0;
989
990   gst_multiudpsink_setup_qos_dscp (sink);
991
992   /* look for multicast clients and join multicast groups appropriately
993      set also ttl and multicast loopback delivery appropriately  */
994   for (clients = sink->clients; clients; clients = g_list_next (clients)) {
995     client = (GstUDPClient *) clients->data;
996
997     if (!gst_multiudpsink_configure_client (sink, client))
998       return FALSE;
999   }
1000   return TRUE;
1001
1002   /* ERRORS */
1003 no_socket:
1004   {
1005     gchar *errormessage = socket_last_error_message ();
1006     int errorcode = socket_last_error_code ();
1007     GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1008         ("Could not create socket (%d): %s", errorcode, errormessage));
1009     g_free (errormessage);
1010     return FALSE;
1011   }
1012 getsockname_error:
1013   {
1014     gchar *errormessage = socket_last_error_message ();
1015     int errorcode = socket_last_error_code ();
1016     GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1017         ("Could not getsockname (%d): %s", errorcode, errormessage));
1018     g_free (errormessage);
1019     return FALSE;
1020   }
1021 no_broadcast:
1022   {
1023     gchar *errormessage = socket_last_error_message ();
1024     int errorcode = socket_last_error_code ();
1025     CLOSE_IF_REQUESTED (sink);
1026     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
1027         ("Could not set broadcast socket option (%d): %s",
1028             errorcode, errormessage));
1029     g_free (errormessage);
1030     return FALSE;
1031   }
1032 }
1033
1034 static void
1035 gst_multiudpsink_close (GstMultiUDPSink * sink)
1036 {
1037   CLOSE_IF_REQUESTED (sink);
1038 }
1039
1040 static void
1041 gst_multiudpsink_add_internal (GstMultiUDPSink * sink, const gchar * host,
1042     gint port, gboolean lock)
1043 {
1044   GstUDPClient *client;
1045   GstUDPClient udpclient;
1046   GTimeVal now;
1047   GList *find;
1048
1049   udpclient.host = (gchar *) host;
1050   udpclient.port = port;
1051
1052   GST_DEBUG_OBJECT (sink, "adding client on host %s, port %d", host, port);
1053
1054   if (lock)
1055     g_mutex_lock (sink->client_lock);
1056
1057   find = g_list_find_custom (sink->clients, &udpclient,
1058       (GCompareFunc) client_compare);
1059   if (find) {
1060     client = (GstUDPClient *) find->data;
1061
1062     GST_DEBUG_OBJECT (sink, "found %d existing clients with host %s, port %d",
1063         client->refcount, host, port);
1064     client->refcount++;
1065   } else {
1066     client = create_client (sink, host, port);
1067
1068     client->sock = &sink->sock;
1069
1070     if (gst_udp_get_addr (host, port, &client->theiraddr) < 0)
1071       goto getaddrinfo_error;
1072
1073     g_get_current_time (&now);
1074     client->connect_time = GST_TIMEVAL_TO_TIME (now);
1075
1076     if (*client->sock > 0) {
1077       gst_multiudpsink_configure_client (sink, client);
1078     }
1079
1080     GST_DEBUG_OBJECT (sink, "add client with host %s, port %d", host, port);
1081     sink->clients = g_list_prepend (sink->clients, client);
1082   }
1083
1084   if (lock)
1085     g_mutex_unlock (sink->client_lock);
1086
1087   g_signal_emit (G_OBJECT (sink),
1088       gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED], 0, host, port);
1089
1090   GST_DEBUG_OBJECT (sink, "added client on host %s, port %d", host, port);
1091   return;
1092
1093   /* ERRORS */
1094 getaddrinfo_error:
1095   {
1096     GST_DEBUG_OBJECT (sink, "did not add client on host %s, port %d", host,
1097         port);
1098     GST_WARNING_OBJECT (sink, "getaddrinfo lookup error?");
1099     free_client (client);
1100     if (lock)
1101       g_mutex_unlock (sink->client_lock);
1102     return;
1103   }
1104 }
1105
1106 void
1107 gst_multiudpsink_add (GstMultiUDPSink * sink, const gchar * host, gint port)
1108 {
1109   gst_multiudpsink_add_internal (sink, host, port, TRUE);
1110 }
1111
1112 void
1113 gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port)
1114 {
1115   GList *find;
1116   GstUDPClient udpclient;
1117   GstUDPClient *client;
1118   GTimeVal now;
1119
1120   udpclient.host = (gchar *) host;
1121   udpclient.port = port;
1122
1123   g_mutex_lock (sink->client_lock);
1124   find = g_list_find_custom (sink->clients, &udpclient,
1125       (GCompareFunc) client_compare);
1126   if (!find)
1127     goto not_found;
1128
1129   client = (GstUDPClient *) find->data;
1130
1131   GST_DEBUG_OBJECT (sink, "found %d clients with host %s, port %d",
1132       client->refcount, host, port);
1133
1134   client->refcount--;
1135   if (client->refcount == 0) {
1136     GST_DEBUG_OBJECT (sink, "remove client with host %s, port %d", host, port);
1137
1138     g_get_current_time (&now);
1139     client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
1140
1141     if (*(client->sock) != -1 && sink->auto_multicast
1142         && gst_udp_is_multicast (&client->theiraddr))
1143       gst_udp_leave_group (*(client->sock), &client->theiraddr);
1144
1145     /* Unlock to emit signal before we delete the actual client */
1146     g_mutex_unlock (sink->client_lock);
1147     g_signal_emit (G_OBJECT (sink),
1148         gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED], 0, host, port);
1149     g_mutex_lock (sink->client_lock);
1150
1151     sink->clients = g_list_delete_link (sink->clients, find);
1152
1153     free_client (client);
1154   }
1155   g_mutex_unlock (sink->client_lock);
1156
1157   return;
1158
1159   /* ERRORS */
1160 not_found:
1161   {
1162     g_mutex_unlock (sink->client_lock);
1163     GST_WARNING_OBJECT (sink, "client at host %s, port %d not found",
1164         host, port);
1165     return;
1166   }
1167 }
1168
1169 static void
1170 gst_multiudpsink_clear_internal (GstMultiUDPSink * sink, gboolean lock)
1171 {
1172   GST_DEBUG_OBJECT (sink, "clearing");
1173   /* we only need to remove the client structure, there is no additional
1174    * socket or anything to free for UDP */
1175   if (lock)
1176     g_mutex_lock (sink->client_lock);
1177   g_list_foreach (sink->clients, (GFunc) free_client, sink);
1178   g_list_free (sink->clients);
1179   sink->clients = NULL;
1180   if (lock)
1181     g_mutex_unlock (sink->client_lock);
1182 }
1183
1184 void
1185 gst_multiudpsink_clear (GstMultiUDPSink * sink)
1186 {
1187   gst_multiudpsink_clear_internal (sink, TRUE);
1188 }
1189
1190 GValueArray *
1191 gst_multiudpsink_get_stats (GstMultiUDPSink * sink, const gchar * host,
1192     gint port)
1193 {
1194   GstUDPClient *client;
1195   GValueArray *result = NULL;
1196   GstUDPClient udpclient;
1197   GList *find;
1198   GValue value = { 0 };
1199
1200   udpclient.host = (gchar *) host;
1201   udpclient.port = port;
1202
1203   g_mutex_lock (sink->client_lock);
1204
1205   find = g_list_find_custom (sink->clients, &udpclient,
1206       (GCompareFunc) client_compare);
1207   if (!find)
1208     goto not_found;
1209
1210   GST_DEBUG_OBJECT (sink, "stats for client with host %s, port %d", host, port);
1211
1212   client = (GstUDPClient *) find->data;
1213
1214   /* Result is a value array of (bytes_sent, packets_sent,
1215    * connect_time, disconnect_time), all as uint64 */
1216   result = g_value_array_new (4);
1217
1218   g_value_init (&value, G_TYPE_UINT64);
1219   g_value_set_uint64 (&value, client->bytes_sent);
1220   result = g_value_array_append (result, &value);
1221   g_value_unset (&value);
1222
1223   g_value_init (&value, G_TYPE_UINT64);
1224   g_value_set_uint64 (&value, client->packets_sent);
1225   result = g_value_array_append (result, &value);
1226   g_value_unset (&value);
1227
1228   g_value_init (&value, G_TYPE_UINT64);
1229   g_value_set_uint64 (&value, client->connect_time);
1230   result = g_value_array_append (result, &value);
1231   g_value_unset (&value);
1232
1233   g_value_init (&value, G_TYPE_UINT64);
1234   g_value_set_uint64 (&value, client->disconnect_time);
1235   result = g_value_array_append (result, &value);
1236   g_value_unset (&value);
1237
1238   g_mutex_unlock (sink->client_lock);
1239
1240   return result;
1241
1242   /* ERRORS */
1243 not_found:
1244   {
1245     g_mutex_unlock (sink->client_lock);
1246     GST_WARNING_OBJECT (sink, "client with host %s, port %d not found",
1247         host, port);
1248     /* Apparently (see comment in gstmultifdsink.c) returning NULL from here may
1249      * confuse/break python bindings */
1250     return g_value_array_new (0);
1251   }
1252 }
1253
1254 static GstStateChangeReturn
1255 gst_multiudpsink_change_state (GstElement * element, GstStateChange transition)
1256 {
1257   GstStateChangeReturn ret;
1258   GstMultiUDPSink *sink;
1259
1260   sink = GST_MULTIUDPSINK (element);
1261
1262   switch (transition) {
1263     case GST_STATE_CHANGE_READY_TO_PAUSED:
1264       if (!gst_multiudpsink_init_send (sink))
1265         goto no_init;
1266       break;
1267     default:
1268       break;
1269   }
1270
1271   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1272
1273   switch (transition) {
1274     case GST_STATE_CHANGE_PAUSED_TO_READY:
1275       gst_multiudpsink_close (sink);
1276       break;
1277     default:
1278       break;
1279   }
1280   return ret;
1281
1282   /* ERRORS */
1283 no_init:
1284   {
1285     /* _init_send() posted specific error already */
1286     return GST_STATE_CHANGE_FAILURE;
1287   }
1288 }