upload tizen1.0 source
[framework/multimedia/gst-plugins-good0.10.git] / gst / udp / gstmultiudpsink.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  * Copyright (C) <2009> Jarkko Palviainen <jarkko.palviainen@sesca.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * SECTION:element-multiudpsink
23  * @see_also: udpsink, multifdsink
24  *
25  * multiudpsink is a network sink that sends UDP packets to multiple
26  * clients.
27  * It can be combined with rtp payload encoders to implement RTP streaming.
28  */
29
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif
33 #include "gstudp-marshal.h"
34 #include "gstmultiudpsink.h"
35
36 #include <stdio.h>
37 #include <stdlib.h>
38 #ifdef HAVE_UNISTD_H
39 #include <unistd.h>
40 #endif
41 #include <errno.h>
42 #include <string.h>
43
44 GST_DEBUG_CATEGORY_STATIC (multiudpsink_debug);
45 #define GST_CAT_DEFAULT (multiudpsink_debug)
46
47 #define UDP_MAX_SIZE 65507
48
49 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
50     GST_PAD_SINK,
51     GST_PAD_ALWAYS,
52     GST_STATIC_CAPS_ANY);
53
54 /* MultiUDPSink signals and args */
55 enum
56 {
57   /* methods */
58   SIGNAL_ADD,
59   SIGNAL_REMOVE,
60   SIGNAL_CLEAR,
61   SIGNAL_GET_STATS,
62
63   /* signals */
64   SIGNAL_CLIENT_ADDED,
65   SIGNAL_CLIENT_REMOVED,
66
67   /* FILL ME */
68   LAST_SIGNAL
69 };
70
71 #define DEFAULT_SOCKFD             -1
72 #define DEFAULT_CLOSEFD            TRUE
73 #define DEFAULT_SOCK               -1
74 #define DEFAULT_CLIENTS            NULL
75 #define DEFAULT_FAMILY             0
76 /* FIXME, this should be disabled by default, we don't need to join a multicast
77  * group for sending, if this socket is also used for receiving, it should
78  * be configured in the element that does the receive. */
79 #define DEFAULT_AUTO_MULTICAST     TRUE
80 #define DEFAULT_TTL                64
81 #define DEFAULT_TTL_MC             1
82 #define DEFAULT_LOOP               TRUE
83 #define DEFAULT_QOS_DSCP           -1
84 #define DEFAULT_SEND_DUPLICATES    TRUE
85 #define DEFAULT_BUFFER_SIZE        0
86
87 enum
88 {
89   PROP_0,
90   PROP_BYTES_TO_SERVE,
91   PROP_BYTES_SERVED,
92   PROP_SOCKFD,
93   PROP_CLOSEFD,
94   PROP_SOCK,
95   PROP_CLIENTS,
96   PROP_AUTO_MULTICAST,
97   PROP_TTL,
98   PROP_TTL_MC,
99   PROP_LOOP,
100   PROP_QOS_DSCP,
101   PROP_SEND_DUPLICATES,
102   PROP_BUFFER_SIZE,
103   PROP_LAST
104 };
105
106 #define CLOSE_IF_REQUESTED(udpctx)                                        \
107 G_STMT_START {                                                            \
108   if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \
109     CLOSE_SOCKET(udpctx->sock);                                           \
110     if (udpctx->sock == udpctx->sockfd)                                   \
111       udpctx->sockfd = DEFAULT_SOCKFD;                                    \
112   }                                                                       \
113   udpctx->sock = DEFAULT_SOCK;                                            \
114 } G_STMT_END
115
116 static void gst_multiudpsink_base_init (gpointer g_class);
117 static void gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass);
118 static void gst_multiudpsink_init (GstMultiUDPSink * udpsink);
119 static void gst_multiudpsink_finalize (GObject * object);
120
121 static GstFlowReturn gst_multiudpsink_render (GstBaseSink * sink,
122     GstBuffer * buffer);
123 #ifndef G_OS_WIN32              /* sendmsg() is not available on Windows */
124 static GstFlowReturn gst_multiudpsink_render_list (GstBaseSink * bsink,
125     GstBufferList * list);
126 #endif
127 static GstStateChangeReturn gst_multiudpsink_change_state (GstElement *
128     element, GstStateChange transition);
129
130 static void gst_multiudpsink_set_property (GObject * object, guint prop_id,
131     const GValue * value, GParamSpec * pspec);
132 static void gst_multiudpsink_get_property (GObject * object, guint prop_id,
133     GValue * value, GParamSpec * pspec);
134
135 static void gst_multiudpsink_add_internal (GstMultiUDPSink * sink,
136     const gchar * host, gint port, gboolean lock);
137 static void gst_multiudpsink_clear_internal (GstMultiUDPSink * sink,
138     gboolean lock);
139
140 static GstElementClass *parent_class = NULL;
141
142 static guint gst_multiudpsink_signals[LAST_SIGNAL] = { 0 };
143
144 GType
145 gst_multiudpsink_get_type (void)
146 {
147   static GType multiudpsink_type = 0;
148
149   if (!multiudpsink_type) {
150     static const GTypeInfo multiudpsink_info = {
151       sizeof (GstMultiUDPSinkClass),
152       gst_multiudpsink_base_init,
153       NULL,
154       (GClassInitFunc) gst_multiudpsink_class_init,
155       NULL,
156       NULL,
157       sizeof (GstMultiUDPSink),
158       0,
159       (GInstanceInitFunc) gst_multiudpsink_init,
160       NULL
161     };
162
163     multiudpsink_type =
164         g_type_register_static (GST_TYPE_BASE_SINK, "GstMultiUDPSink",
165         &multiudpsink_info, 0);
166   }
167   return multiudpsink_type;
168 }
169
170 static void
171 gst_multiudpsink_base_init (gpointer g_class)
172 {
173   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
174
175   gst_element_class_add_pad_template (element_class,
176       gst_static_pad_template_get (&sink_template));
177
178   gst_element_class_set_details_simple (element_class, "UDP packet sender",
179       "Sink/Network",
180       "Send data over the network via UDP",
181       "Wim Taymans <wim.taymans@gmail.com>");
182 }
183
184 static void
185 gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass)
186 {
187   GObjectClass *gobject_class;
188   GstElementClass *gstelement_class;
189   GstBaseSinkClass *gstbasesink_class;
190
191   gobject_class = (GObjectClass *) klass;
192   gstelement_class = (GstElementClass *) klass;
193   gstbasesink_class = (GstBaseSinkClass *) klass;
194
195   parent_class = g_type_class_peek_parent (klass);
196
197   gobject_class->set_property = gst_multiudpsink_set_property;
198   gobject_class->get_property = gst_multiudpsink_get_property;
199   gobject_class->finalize = gst_multiudpsink_finalize;
200
201   /**
202    * GstMultiUDPSink::add:
203    * @gstmultiudpsink: the sink on which the signal is emitted
204    * @host: the hostname/IP address of the client to add
205    * @port: the port of the client to add
206    *
207    * Add a client with destination @host and @port to the list of
208    * clients. When the same host/port pair is added multiple times, the
209    * send-duplicates property defines if the packets are sent multiple times to
210    * the same host/port pair or not.
211    *
212    * When a host/port pair is added multiple times, an equal amount of remove
213    * calls must be performed to actually remove the host/port pair from the list
214    * of destinations.
215    */
216   gst_multiudpsink_signals[SIGNAL_ADD] =
217       g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
218       G_STRUCT_OFFSET (GstMultiUDPSinkClass, add),
219       NULL, NULL, gst_udp_marshal_VOID__STRING_INT, G_TYPE_NONE, 2,
220       G_TYPE_STRING, G_TYPE_INT);
221   /**
222    * GstMultiUDPSink::remove:
223    * @gstmultiudpsink: the sink on which the signal is emitted
224    * @host: the hostname/IP address of the client to remove
225    * @port: the port of the client to remove
226    *
227    * Remove the client with destination @host and @port from the list of
228    * clients.
229    */
230   gst_multiudpsink_signals[SIGNAL_REMOVE] =
231       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
232       G_STRUCT_OFFSET (GstMultiUDPSinkClass, remove),
233       NULL, NULL, gst_udp_marshal_VOID__STRING_INT, G_TYPE_NONE, 2,
234       G_TYPE_STRING, G_TYPE_INT);
235   /**
236    * GstMultiUDPSink::clear:
237    * @gstmultiudpsink: the sink on which the signal is emitted
238    *
239    * Clear the list of clients.
240    */
241   gst_multiudpsink_signals[SIGNAL_CLEAR] =
242       g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
243       G_STRUCT_OFFSET (GstMultiUDPSinkClass, clear),
244       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
245   /**
246    * GstMultiUDPSink::get-stats:
247    * @gstmultiudpsink: the sink on which the signal is emitted
248    * @host: the hostname/IP address of the client to get stats on
249    * @port: the port of the client to get stats on
250    *
251    * Get the statistics of the client with destination @host and @port.
252    *
253    * Returns: a GValueArray of uint64: bytes_sent, packets_sent,
254    *           connect_time (in epoch seconds), disconnect_time (in epoch seconds)
255    */
256   gst_multiudpsink_signals[SIGNAL_GET_STATS] =
257       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
258       G_STRUCT_OFFSET (GstMultiUDPSinkClass, get_stats),
259       NULL, NULL, gst_udp_marshal_BOXED__STRING_INT, G_TYPE_VALUE_ARRAY, 2,
260       G_TYPE_STRING, G_TYPE_INT);
261   /**
262    * GstMultiUDPSink::client-added:
263    * @gstmultiudpsink: the sink emitting the signal
264    * @host: the hostname/IP address of the added client
265    * @port: the port of the added client
266    *
267    * Signal emited when a new client is added to the list of
268    * clients.
269    */
270   gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED] =
271       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
272       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass, client_added),
273       NULL, NULL, gst_udp_marshal_VOID__STRING_INT, G_TYPE_NONE, 2,
274       G_TYPE_STRING, G_TYPE_INT);
275   /**
276    * GstMultiUDPSink::client-removed:
277    * @gstmultiudpsink: the sink emitting the signal
278    * @host: the hostname/IP address of the removed client
279    * @port: the port of the removed client
280    *
281    * Signal emited when a client is removed from the list of
282    * clients.
283    */
284   gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED] =
285       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
286       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass,
287           client_removed), NULL, NULL, gst_udp_marshal_VOID__STRING_INT,
288       G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_INT);
289
290   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_TO_SERVE,
291       g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
292           "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
293           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
294   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_SERVED,
295       g_param_spec_uint64 ("bytes-served", "Bytes served",
296           "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
297           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
298   g_object_class_install_property (gobject_class, PROP_SOCKFD,
299       g_param_spec_int ("sockfd", "Socket Handle",
300           "Socket to use for UDP sending. (-1 == allocate)",
301           -1, G_MAXINT, DEFAULT_SOCKFD,
302           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
303   g_object_class_install_property (gobject_class, PROP_CLOSEFD,
304       g_param_spec_boolean ("closefd", "Close sockfd",
305           "Close sockfd if passed as property on state change",
306           DEFAULT_CLOSEFD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
307   g_object_class_install_property (gobject_class, PROP_SOCK,
308       g_param_spec_int ("sock", "Socket Handle",
309           "Socket currently in use for UDP sending. (-1 == no socket)",
310           -1, G_MAXINT, DEFAULT_SOCK,
311           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
312   g_object_class_install_property (gobject_class, PROP_CLIENTS,
313       g_param_spec_string ("clients", "Clients",
314           "A comma separated list of host:port pairs with destinations",
315           DEFAULT_CLIENTS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
316   g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
317       g_param_spec_boolean ("auto-multicast",
318           "Automatically join/leave multicast groups",
319           "Automatically join/leave the multicast groups, FALSE means user"
320           " has to do it himself", DEFAULT_AUTO_MULTICAST,
321           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
322   g_object_class_install_property (gobject_class, PROP_TTL,
323       g_param_spec_int ("ttl", "Unicast TTL",
324           "Used for setting the unicast TTL parameter",
325           0, 255, DEFAULT_TTL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
326   g_object_class_install_property (gobject_class, PROP_TTL_MC,
327       g_param_spec_int ("ttl-mc", "Multicast TTL",
328           "Used for setting the multicast TTL parameter",
329           0, 255, DEFAULT_TTL_MC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
330   g_object_class_install_property (gobject_class, PROP_LOOP,
331       g_param_spec_boolean ("loop", "Multicast Loopback",
332           "Used for setting the multicast loop parameter. TRUE = enable,"
333           " FALSE = disable", DEFAULT_LOOP,
334           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
335   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_QOS_DSCP,
336       g_param_spec_int ("qos-dscp", "QoS diff srv code point",
337           "Quality of Service, differentiated services code point (-1 default)",
338           -1, 63, DEFAULT_QOS_DSCP,
339           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
340   /**
341    * GstMultiUDPSink::send-duplicates
342    *
343    * When a host/port pair is added mutliple times, send the packet to the host
344    * multiple times as well.
345    *
346    * Since: 0.10.26
347    */
348   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEND_DUPLICATES,
349       g_param_spec_boolean ("send-duplicates", "Send Duplicates",
350           "When a distination/port pair is added multiple times, send packets "
351           "multiple times as well", DEFAULT_SEND_DUPLICATES,
352           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
353
354   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
355       g_param_spec_int ("buffer-size", "Buffer Size",
356           "Size of the kernel send buffer in bytes, 0=default", 0, G_MAXINT,
357           DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
358
359   gstelement_class->change_state = gst_multiudpsink_change_state;
360
361   gstbasesink_class->render = gst_multiudpsink_render;
362 #ifndef G_OS_WIN32
363   gstbasesink_class->render_list = gst_multiudpsink_render_list;
364 #endif
365   klass->add = gst_multiudpsink_add;
366   klass->remove = gst_multiudpsink_remove;
367   klass->clear = gst_multiudpsink_clear;
368   klass->get_stats = gst_multiudpsink_get_stats;
369
370   GST_DEBUG_CATEGORY_INIT (multiudpsink_debug, "multiudpsink", 0, "UDP sink");
371 }
372
373
374 static void
375 gst_multiudpsink_init (GstMultiUDPSink * sink)
376 {
377   WSA_STARTUP (sink);
378
379   sink->client_lock = g_mutex_new ();
380   sink->sock = DEFAULT_SOCK;
381   sink->sockfd = DEFAULT_SOCKFD;
382   sink->closefd = DEFAULT_CLOSEFD;
383   sink->externalfd = (sink->sockfd != -1);
384   sink->auto_multicast = DEFAULT_AUTO_MULTICAST;
385   sink->ttl = DEFAULT_TTL;
386   sink->ttl_mc = DEFAULT_TTL_MC;
387   sink->loop = DEFAULT_LOOP;
388   sink->qos_dscp = DEFAULT_QOS_DSCP;
389   sink->ss_family = DEFAULT_FAMILY;
390   sink->send_duplicates = DEFAULT_SEND_DUPLICATES;
391 }
392
393 static GstUDPClient *
394 create_client (GstMultiUDPSink * sink, const gchar * host, gint port)
395 {
396   GstUDPClient *client;
397
398   client = g_slice_new0 (GstUDPClient);
399   client->refcount = 1;
400   client->host = g_strdup (host);
401   client->port = port;
402
403   return client;
404 }
405
406 static void
407 free_client (GstUDPClient * client)
408 {
409   g_free (client->host);
410   g_slice_free (GstUDPClient, client);
411 }
412
413 static gint
414 client_compare (GstUDPClient * a, GstUDPClient * b)
415 {
416   if ((a->port == b->port) && (strcmp (a->host, b->host) == 0))
417     return 0;
418
419   return 1;
420 }
421
422 static void
423 gst_multiudpsink_finalize (GObject * object)
424 {
425   GstMultiUDPSink *sink;
426
427   sink = GST_MULTIUDPSINK (object);
428
429   g_list_foreach (sink->clients, (GFunc) free_client, NULL);
430   g_list_free (sink->clients);
431
432   if (sink->sockfd >= 0 && sink->closefd)
433     CLOSE_SOCKET (sink->sockfd);
434
435   g_mutex_free (sink->client_lock);
436
437   WSA_CLEANUP (object);
438
439   G_OBJECT_CLASS (parent_class)->finalize (object);
440 }
441
442 static gboolean
443 socket_error_is_ignorable (void)
444 {
445 #ifdef G_OS_WIN32
446   /* Windows doesn't seem to have an EAGAIN for sockets */
447   return WSAGetLastError () == WSAEINTR;
448 #else
449   return errno == EINTR || errno == EAGAIN;
450 #endif
451 }
452
453 static int
454 socket_last_error_code (void)
455 {
456 #ifdef G_OS_WIN32
457   return WSAGetLastError ();
458 #else
459   return errno;
460 #endif
461 }
462
463 static gchar *
464 socket_last_error_message (void)
465 {
466 #ifdef G_OS_WIN32
467   int errorcode = WSAGetLastError ();
468   wchar_t buf[1024];
469   DWORD result =
470       FormatMessage (FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
471       NULL, errorcode, 0, (LPSTR) buf, sizeof (buf) / sizeof (wchar_t), NULL);
472   if (FAILED (result)) {
473     return g_strdup ("failed to get error message from system");
474   } else {
475     gchar *res =
476         g_convert ((gchar *) buf, -1, "UTF-16", "UTF-8", NULL, NULL, NULL);
477     /* g_convert() internally calls windows functions which reset the
478        windows error code, so fix it up again like this */
479     WSASetLastError (errorcode);
480     return res;
481   }
482 #else
483   return g_strdup (g_strerror (errno));
484 #endif
485 }
486
487 static GstFlowReturn
488 gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
489 {
490   GstMultiUDPSink *sink;
491   gint ret, size, num = 0, no_clients = 0;
492   guint8 *data;
493   GList *clients;
494   gint len;
495
496   sink = GST_MULTIUDPSINK (bsink);
497
498   size = GST_BUFFER_SIZE (buffer);
499   data = GST_BUFFER_DATA (buffer);
500
501   if (size > UDP_MAX_SIZE) {
502     GST_WARNING ("Attempting to send a UDP packet larger than maximum "
503         "size (%d > %d)", size, UDP_MAX_SIZE);
504   }
505
506   sink->bytes_to_serve += size;
507
508   /* grab lock while iterating and sending to clients, this should be
509    * fast as UDP never blocks */
510   g_mutex_lock (sink->client_lock);
511   GST_LOG_OBJECT (bsink, "about to send %d bytes", size);
512
513   for (clients = sink->clients; clients; clients = g_list_next (clients)) {
514     GstUDPClient *client;
515     gint count;
516
517     client = (GstUDPClient *) clients->data;
518     no_clients++;
519     GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client);
520
521     count = sink->send_duplicates ? client->refcount : 1;
522
523     while (count--) {
524       while (TRUE) {
525         len = gst_udp_get_sockaddr_length (&client->theiraddr);
526
527         ret = sendto (*client->sock,
528 #ifdef G_OS_WIN32
529             (char *) data,
530 #else
531             data,
532 #endif
533             size, 0, (struct sockaddr *) &client->theiraddr, len);
534
535         if (ret < 0) {
536           /* some error, just warn, it's likely recoverable and we don't want to
537            * break streaming. We break so that we stop retrying for this client. */
538           if (!socket_error_is_ignorable ()) {
539             gchar *errormessage = socket_last_error_message ();
540             GST_WARNING_OBJECT (sink, "client %p gave error %d (%s)", client,
541                 socket_last_error_code (), errormessage);
542             g_free (errormessage);
543             break;
544           }
545         } else {
546           num++;
547           client->bytes_sent += ret;
548           client->packets_sent++;
549           sink->bytes_served += ret;
550           break;
551         }
552       }
553     }
554   }
555   g_mutex_unlock (sink->client_lock);
556
557   GST_LOG_OBJECT (sink, "sent %d bytes to %d (of %d) clients", size, num,
558       no_clients);
559
560   return GST_FLOW_OK;
561 }
562
563 #ifndef G_OS_WIN32
564 static GstFlowReturn
565 gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * list)
566 {
567   GstMultiUDPSink *sink;
568   GList *clients;
569   gint ret, size = 0, num = 0, no_clients = 0;
570   struct iovec *iov;
571   struct msghdr msg = { 0 };
572
573   GstBufferListIterator *it;
574   guint gsize;
575   GstBuffer *buf;
576
577   sink = GST_MULTIUDPSINK (bsink);
578
579   g_return_val_if_fail (list != NULL, GST_FLOW_ERROR);
580
581   it = gst_buffer_list_iterate (list);
582   g_return_val_if_fail (it != NULL, GST_FLOW_ERROR);
583
584   while (gst_buffer_list_iterator_next_group (it)) {
585     msg.msg_iovlen = 0;
586     size = 0;
587
588     if ((gsize = gst_buffer_list_iterator_n_buffers (it)) == 0) {
589       goto invalid_list;
590     }
591
592     iov = (struct iovec *) g_malloc (gsize * sizeof (struct iovec));
593     msg.msg_iov = iov;
594
595     while ((buf = gst_buffer_list_iterator_next (it))) {
596       if (GST_BUFFER_SIZE (buf) > UDP_MAX_SIZE) {
597         GST_WARNING ("Attempting to send a UDP packet larger than maximum "
598             "size (%d > %d)", GST_BUFFER_SIZE (buf), UDP_MAX_SIZE);
599       }
600
601       msg.msg_iov[msg.msg_iovlen].iov_len = GST_BUFFER_SIZE (buf);
602       msg.msg_iov[msg.msg_iovlen].iov_base = GST_BUFFER_DATA (buf);
603       msg.msg_iovlen++;
604       size += GST_BUFFER_SIZE (buf);
605     }
606
607     sink->bytes_to_serve += size;
608
609     /* grab lock while iterating and sending to clients, this should be
610      * fast as UDP never blocks */
611     g_mutex_lock (sink->client_lock);
612     GST_LOG_OBJECT (bsink, "about to send %d bytes", size);
613
614     for (clients = sink->clients; clients; clients = g_list_next (clients)) {
615       GstUDPClient *client;
616       gint count;
617
618       client = (GstUDPClient *) clients->data;
619       no_clients++;
620       GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client);
621
622       count = sink->send_duplicates ? client->refcount : 1;
623
624       while (count--) {
625         while (TRUE) {
626           msg.msg_name = (void *) &client->theiraddr;
627           msg.msg_namelen = sizeof (client->theiraddr);
628           ret = sendmsg (*client->sock, &msg, 0);
629
630           if (ret < 0) {
631             if (!socket_error_is_ignorable ()) {
632               break;
633             }
634           } else {
635             num++;
636             client->bytes_sent += ret;
637             client->packets_sent++;
638             sink->bytes_served += ret;
639             break;
640           }
641         }
642       }
643     }
644     g_mutex_unlock (sink->client_lock);
645
646     g_free (iov);
647     msg.msg_iov = NULL;
648
649     GST_LOG_OBJECT (sink, "sent %d bytes to %d (of %d) clients", size, num,
650         no_clients);
651   }
652
653   gst_buffer_list_iterator_free (it);
654
655   return GST_FLOW_OK;
656
657 invalid_list:
658   gst_buffer_list_iterator_free (it);
659   return GST_FLOW_ERROR;
660 }
661 #endif
662
663 static void
664 gst_multiudpsink_set_clients_string (GstMultiUDPSink * sink,
665     const gchar * string)
666 {
667   gchar **clients;
668   gint i;
669
670   clients = g_strsplit (string, ",", 0);
671
672   g_mutex_lock (sink->client_lock);
673   /* clear all existing clients */
674   gst_multiudpsink_clear_internal (sink, FALSE);
675   for (i = 0; clients[i]; i++) {
676     gchar *host, *p;
677     gint port = 0;
678
679     host = clients[i];
680     p = strstr (clients[i], ":");
681     if (p != NULL) {
682       *p = '\0';
683       port = atoi (p + 1);
684     }
685     if (port != 0)
686       gst_multiudpsink_add_internal (sink, host, port, FALSE);
687   }
688   g_mutex_unlock (sink->client_lock);
689
690   g_strfreev (clients);
691 }
692
693 static gchar *
694 gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink)
695 {
696   GString *str;
697   GList *clients;
698
699   str = g_string_new ("");
700
701   g_mutex_lock (sink->client_lock);
702   clients = sink->clients;
703   while (clients) {
704     GstUDPClient *client;
705     gint count;
706
707     client = (GstUDPClient *) clients->data;
708
709     clients = g_list_next (clients);
710
711     count = client->refcount;
712     while (count--) {
713       g_string_append_printf (str, "%s:%d%s", client->host, client->port,
714           (clients || count > 1 ? "," : ""));
715     }
716   }
717   g_mutex_unlock (sink->client_lock);
718
719   return g_string_free (str, FALSE);
720 }
721
722 static void
723 gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink)
724 {
725   gint tos;
726
727   /* don't touch on -1 */
728   if (sink->qos_dscp < 0)
729     return;
730
731   if (sink->sock < 0)
732     return;
733
734   GST_DEBUG_OBJECT (sink, "setting TOS to %d", sink->qos_dscp);
735
736   /* Extract and shift 6 bits of DSFIELD */
737   tos = (sink->qos_dscp & 0x3f) << 2;
738
739   if (setsockopt (sink->sock, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) {
740     gchar *errormessage = socket_last_error_message ();
741     GST_ERROR_OBJECT (sink, "could not set TOS: %s", errormessage);
742     g_free (errormessage);
743   }
744 #ifdef IPV6_TCLASS
745   if (setsockopt (sink->sock, IPPROTO_IPV6, IPV6_TCLASS, &tos,
746           sizeof (tos)) < 0) {
747     gchar *errormessage = socket_last_error_message ();
748     GST_ERROR_OBJECT (sink, "could not set TCLASS: %s", errormessage);
749     g_free (errormessage);
750   }
751 #endif
752 }
753
754 static void
755 gst_multiudpsink_set_property (GObject * object, guint prop_id,
756     const GValue * value, GParamSpec * pspec)
757 {
758   GstMultiUDPSink *udpsink;
759
760   udpsink = GST_MULTIUDPSINK (object);
761
762   switch (prop_id) {
763     case PROP_SOCKFD:
764       if (udpsink->sockfd >= 0 && udpsink->sockfd != udpsink->sock &&
765           udpsink->closefd)
766         CLOSE_SOCKET (udpsink->sockfd);
767       udpsink->sockfd = g_value_get_int (value);
768       GST_DEBUG_OBJECT (udpsink, "setting SOCKFD to %d", udpsink->sockfd);
769       break;
770     case PROP_CLOSEFD:
771       udpsink->closefd = g_value_get_boolean (value);
772       break;
773     case PROP_CLIENTS:
774       gst_multiudpsink_set_clients_string (udpsink, g_value_get_string (value));
775       break;
776     case PROP_AUTO_MULTICAST:
777       udpsink->auto_multicast = g_value_get_boolean (value);
778       break;
779     case PROP_TTL:
780       udpsink->ttl = g_value_get_int (value);
781       break;
782     case PROP_TTL_MC:
783       udpsink->ttl_mc = g_value_get_int (value);
784       break;
785     case PROP_LOOP:
786       udpsink->loop = g_value_get_boolean (value);
787       break;
788     case PROP_QOS_DSCP:
789       udpsink->qos_dscp = g_value_get_int (value);
790       gst_multiudpsink_setup_qos_dscp (udpsink);
791       break;
792     case PROP_SEND_DUPLICATES:
793       udpsink->send_duplicates = g_value_get_boolean (value);
794       break;
795     case PROP_BUFFER_SIZE:
796       udpsink->buffer_size = g_value_get_int (value);
797       break;
798     default:
799       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
800       break;
801   }
802 }
803
804 static void
805 gst_multiudpsink_get_property (GObject * object, guint prop_id, GValue * value,
806     GParamSpec * pspec)
807 {
808   GstMultiUDPSink *udpsink;
809
810   udpsink = GST_MULTIUDPSINK (object);
811
812   switch (prop_id) {
813     case PROP_BYTES_TO_SERVE:
814       g_value_set_uint64 (value, udpsink->bytes_to_serve);
815       break;
816     case PROP_BYTES_SERVED:
817       g_value_set_uint64 (value, udpsink->bytes_served);
818       break;
819     case PROP_SOCKFD:
820       g_value_set_int (value, udpsink->sockfd);
821       break;
822     case PROP_CLOSEFD:
823       g_value_set_boolean (value, udpsink->closefd);
824       break;
825     case PROP_SOCK:
826       g_value_set_int (value, udpsink->sock);
827       break;
828     case PROP_CLIENTS:
829       g_value_take_string (value,
830           gst_multiudpsink_get_clients_string (udpsink));
831       break;
832     case PROP_AUTO_MULTICAST:
833       g_value_set_boolean (value, udpsink->auto_multicast);
834       break;
835     case PROP_TTL:
836       g_value_set_int (value, udpsink->ttl);
837       break;
838     case PROP_TTL_MC:
839       g_value_set_int (value, udpsink->ttl_mc);
840       break;
841     case PROP_LOOP:
842       g_value_set_boolean (value, udpsink->loop);
843       break;
844     case PROP_QOS_DSCP:
845       g_value_set_int (value, udpsink->qos_dscp);
846       break;
847     case PROP_SEND_DUPLICATES:
848       g_value_set_boolean (value, udpsink->send_duplicates);
849       break;
850     case PROP_BUFFER_SIZE:
851       g_value_set_int (value, udpsink->buffer_size);
852       break;
853     default:
854       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
855       break;
856   }
857 }
858
859 static gboolean
860 gst_multiudpsink_configure_client (GstMultiUDPSink * sink,
861     GstUDPClient * client)
862 {
863   GST_DEBUG_OBJECT (sink, "configuring client %p", client);
864
865   if (gst_udp_is_multicast (&client->theiraddr)) {
866     GST_DEBUG_OBJECT (sink, "we have a multicast client %p", client);
867     if (sink->auto_multicast) {
868       GST_DEBUG_OBJECT (sink, "autojoining group");
869       if (gst_udp_join_group (*(client->sock), &client->theiraddr, NULL)
870           != 0)
871         goto join_group_failed;
872     }
873     GST_DEBUG_OBJECT (sink, "setting loop to %d", sink->loop);
874     if (gst_udp_set_loop (sink->sock, sink->ss_family, sink->loop) != 0)
875       goto loop_failed;
876     GST_DEBUG_OBJECT (sink, "setting ttl to %d", sink->ttl_mc);
877     if (gst_udp_set_ttl (sink->sock, sink->ss_family, sink->ttl_mc, TRUE) != 0)
878       goto ttl_failed;
879   } else {
880     GST_DEBUG_OBJECT (sink, "setting unicast ttl to %d", sink->ttl);
881     if (gst_udp_set_ttl (sink->sock, sink->ss_family, sink->ttl, FALSE) != 0)
882       goto ttl_failed;
883   }
884   return TRUE;
885
886   /* ERRORS */
887 join_group_failed:
888   {
889     gchar *errormessage = socket_last_error_message ();
890     int errorcode = socket_last_error_code ();
891     CLOSE_IF_REQUESTED (sink);
892     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
893         ("Could not join multicast group (%d): %s", errorcode, errormessage));
894     g_free (errormessage);
895     return FALSE;
896   }
897 ttl_failed:
898   {
899     gchar *errormessage = socket_last_error_message ();
900     int errorcode = socket_last_error_code ();
901     CLOSE_IF_REQUESTED (sink);
902     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
903         ("Could not set TTL socket option (%d): %s", errorcode, errormessage));
904     g_free (errormessage);
905     return FALSE;
906   }
907 loop_failed:
908   {
909     gchar *errormessage = socket_last_error_message ();
910     int errorcode = socket_last_error_code ();
911     CLOSE_IF_REQUESTED (sink);
912     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
913         ("Could not set loopback socket option (%d): %s",
914             errorcode, errormessage));
915     g_free (errormessage);
916     return FALSE;
917   }
918 }
919
920 /* create a socket for sending to remote machine */
921 static gboolean
922 gst_multiudpsink_init_send (GstMultiUDPSink * sink)
923 {
924   guint bc_val;
925   GList *clients;
926   GstUDPClient *client;
927   int sndsize, ret;
928   socklen_t len;
929
930   if (sink->sockfd == -1) {
931     GST_DEBUG_OBJECT (sink, "creating sockets");
932     /* create sender socket try IP6, fall back to IP4 */
933     sink->ss_family = AF_INET6;
934     if ((sink->sock = socket (AF_INET6, SOCK_DGRAM, 0)) == -1) {
935       sink->ss_family = AF_INET;
936       if ((sink->sock = socket (AF_INET, SOCK_DGRAM, 0)) == -1)
937         goto no_socket;
938     }
939
940     GST_DEBUG_OBJECT (sink, "have socket");
941     sink->externalfd = FALSE;
942   } else {
943     struct sockaddr_storage myaddr;
944
945     GST_DEBUG_OBJECT (sink, "using configured socket");
946     /* we use the configured socket, try to get some info about it */
947     len = sizeof (myaddr);
948     if (getsockname (sink->sockfd, (struct sockaddr *) &myaddr, &len) < 0)
949       goto getsockname_error;
950
951     sink->ss_family = myaddr.ss_family;
952     /* we use the configured socket */
953     sink->sock = sink->sockfd;
954     sink->externalfd = TRUE;
955   }
956
957   len = sizeof (sndsize);
958   if (sink->buffer_size != 0) {
959     sndsize = sink->buffer_size;
960
961     GST_DEBUG_OBJECT (sink, "setting udp buffer of %d bytes", sndsize);
962     /* set buffer size, Note that on Linux this is typically limited to a
963      * maximum of around 100K. Also a minimum of 128 bytes is required on
964      * Linux. */
965     ret =
966         setsockopt (sink->sockfd, SOL_SOCKET, SO_SNDBUF, (void *) &sndsize,
967         len);
968     if (ret != 0) {
969       GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL),
970           ("Could not create a buffer of requested %d bytes, %d: %s (%d)",
971               sndsize, ret, g_strerror (errno), errno));
972     }
973   }
974
975   /* read the value of the receive buffer. Note that on linux this returns 2x the
976    * value we set because the kernel allocates extra memory for metadata.
977    * The default on Linux is about 100K (which is about 50K without metadata) */
978   ret =
979       getsockopt (sink->sockfd, SOL_SOCKET, SO_SNDBUF, (void *) &sndsize, &len);
980   if (ret == 0)
981     GST_DEBUG_OBJECT (sink, "have udp buffer of %d bytes", sndsize);
982   else
983     GST_DEBUG_OBJECT (sink, "could not get udp buffer size");
984
985
986   bc_val = 1;
987   if (setsockopt (sink->sock, SOL_SOCKET, SO_BROADCAST, &bc_val,
988           sizeof (bc_val)) < 0)
989     goto no_broadcast;
990
991   sink->bytes_to_serve = 0;
992   sink->bytes_served = 0;
993
994   gst_multiudpsink_setup_qos_dscp (sink);
995
996   /* look for multicast clients and join multicast groups appropriately
997      set also ttl and multicast loopback delivery appropriately  */
998   for (clients = sink->clients; clients; clients = g_list_next (clients)) {
999     client = (GstUDPClient *) clients->data;
1000
1001     if (!gst_multiudpsink_configure_client (sink, client))
1002       return FALSE;
1003   }
1004   return TRUE;
1005
1006   /* ERRORS */
1007 no_socket:
1008   {
1009     gchar *errormessage = socket_last_error_message ();
1010     int errorcode = socket_last_error_code ();
1011     GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1012         ("Could not create socket (%d): %s", errorcode, errormessage));
1013     g_free (errormessage);
1014     return FALSE;
1015   }
1016 getsockname_error:
1017   {
1018     gchar *errormessage = socket_last_error_message ();
1019     int errorcode = socket_last_error_code ();
1020     GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1021         ("Could not getsockname (%d): %s", errorcode, errormessage));
1022     g_free (errormessage);
1023     return FALSE;
1024   }
1025 no_broadcast:
1026   {
1027     gchar *errormessage = socket_last_error_message ();
1028     int errorcode = socket_last_error_code ();
1029     CLOSE_IF_REQUESTED (sink);
1030     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
1031         ("Could not set broadcast socket option (%d): %s",
1032             errorcode, errormessage));
1033     g_free (errormessage);
1034     return FALSE;
1035   }
1036 }
1037
1038 static void
1039 gst_multiudpsink_close (GstMultiUDPSink * sink)
1040 {
1041   CLOSE_IF_REQUESTED (sink);
1042 }
1043
1044 static void
1045 gst_multiudpsink_add_internal (GstMultiUDPSink * sink, const gchar * host,
1046     gint port, gboolean lock)
1047 {
1048   GstUDPClient *client;
1049   GstUDPClient udpclient;
1050   GTimeVal now;
1051   GList *find;
1052
1053   udpclient.host = (gchar *) host;
1054   udpclient.port = port;
1055
1056   GST_DEBUG_OBJECT (sink, "adding client on host %s, port %d", host, port);
1057
1058   if (lock)
1059     g_mutex_lock (sink->client_lock);
1060
1061   find = g_list_find_custom (sink->clients, &udpclient,
1062       (GCompareFunc) client_compare);
1063   if (find) {
1064     client = (GstUDPClient *) find->data;
1065
1066     GST_DEBUG_OBJECT (sink, "found %d existing clients with host %s, port %d",
1067         client->refcount, host, port);
1068     client->refcount++;
1069   } else {
1070     client = create_client (sink, host, port);
1071
1072     client->sock = &sink->sock;
1073
1074     if (gst_udp_get_addr (host, port, &client->theiraddr) < 0)
1075       goto getaddrinfo_error;
1076
1077     g_get_current_time (&now);
1078     client->connect_time = GST_TIMEVAL_TO_TIME (now);
1079
1080     if (*client->sock > 0) {
1081       gst_multiudpsink_configure_client (sink, client);
1082     }
1083
1084     GST_DEBUG_OBJECT (sink, "add client with host %s, port %d", host, port);
1085     sink->clients = g_list_prepend (sink->clients, client);
1086   }
1087
1088   if (lock)
1089     g_mutex_unlock (sink->client_lock);
1090
1091   g_signal_emit (G_OBJECT (sink),
1092       gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED], 0, host, port);
1093
1094   GST_DEBUG_OBJECT (sink, "added client on host %s, port %d", host, port);
1095   return;
1096
1097   /* ERRORS */
1098 getaddrinfo_error:
1099   {
1100     GST_DEBUG_OBJECT (sink, "did not add client on host %s, port %d", host,
1101         port);
1102     GST_WARNING_OBJECT (sink, "getaddrinfo lookup error?");
1103     free_client (client);
1104     if (lock)
1105       g_mutex_unlock (sink->client_lock);
1106     return;
1107   }
1108 }
1109
1110 void
1111 gst_multiudpsink_add (GstMultiUDPSink * sink, const gchar * host, gint port)
1112 {
1113   gst_multiudpsink_add_internal (sink, host, port, TRUE);
1114 }
1115
1116 void
1117 gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port)
1118 {
1119   GList *find;
1120   GstUDPClient udpclient;
1121   GstUDPClient *client;
1122   GTimeVal now;
1123
1124   udpclient.host = (gchar *) host;
1125   udpclient.port = port;
1126
1127   g_mutex_lock (sink->client_lock);
1128   find = g_list_find_custom (sink->clients, &udpclient,
1129       (GCompareFunc) client_compare);
1130   if (!find)
1131     goto not_found;
1132
1133   client = (GstUDPClient *) find->data;
1134
1135   GST_DEBUG_OBJECT (sink, "found %d clients with host %s, port %d",
1136       client->refcount, host, port);
1137
1138   client->refcount--;
1139   if (client->refcount == 0) {
1140     GST_DEBUG_OBJECT (sink, "remove client with host %s, port %d", host, port);
1141
1142     g_get_current_time (&now);
1143     client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
1144
1145     if (*(client->sock) != -1 && sink->auto_multicast
1146         && gst_udp_is_multicast (&client->theiraddr))
1147       gst_udp_leave_group (*(client->sock), &client->theiraddr);
1148
1149     /* Unlock to emit signal before we delete the actual client */
1150     g_mutex_unlock (sink->client_lock);
1151     g_signal_emit (G_OBJECT (sink),
1152         gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED], 0, host, port);
1153     g_mutex_lock (sink->client_lock);
1154
1155     sink->clients = g_list_delete_link (sink->clients, find);
1156
1157     free_client (client);
1158   }
1159   g_mutex_unlock (sink->client_lock);
1160
1161   return;
1162
1163   /* ERRORS */
1164 not_found:
1165   {
1166     g_mutex_unlock (sink->client_lock);
1167     GST_WARNING_OBJECT (sink, "client at host %s, port %d not found",
1168         host, port);
1169     return;
1170   }
1171 }
1172
1173 static void
1174 gst_multiudpsink_clear_internal (GstMultiUDPSink * sink, gboolean lock)
1175 {
1176   GST_DEBUG_OBJECT (sink, "clearing");
1177   /* we only need to remove the client structure, there is no additional
1178    * socket or anything to free for UDP */
1179   if (lock)
1180     g_mutex_lock (sink->client_lock);
1181   g_list_foreach (sink->clients, (GFunc) free_client, sink);
1182   g_list_free (sink->clients);
1183   sink->clients = NULL;
1184   if (lock)
1185     g_mutex_unlock (sink->client_lock);
1186 }
1187
1188 void
1189 gst_multiudpsink_clear (GstMultiUDPSink * sink)
1190 {
1191   gst_multiudpsink_clear_internal (sink, TRUE);
1192 }
1193
1194 GValueArray *
1195 gst_multiudpsink_get_stats (GstMultiUDPSink * sink, const gchar * host,
1196     gint port)
1197 {
1198   GstUDPClient *client;
1199   GValueArray *result = NULL;
1200   GstUDPClient udpclient;
1201   GList *find;
1202   GValue value = { 0 };
1203
1204   udpclient.host = (gchar *) host;
1205   udpclient.port = port;
1206
1207   g_mutex_lock (sink->client_lock);
1208
1209   find = g_list_find_custom (sink->clients, &udpclient,
1210       (GCompareFunc) client_compare);
1211   if (!find)
1212     goto not_found;
1213
1214   GST_DEBUG_OBJECT (sink, "stats for client with host %s, port %d", host, port);
1215
1216   client = (GstUDPClient *) find->data;
1217
1218   /* Result is a value array of (bytes_sent, packets_sent,
1219    * connect_time, disconnect_time), all as uint64 */
1220   result = g_value_array_new (4);
1221
1222   g_value_init (&value, G_TYPE_UINT64);
1223   g_value_set_uint64 (&value, client->bytes_sent);
1224   result = g_value_array_append (result, &value);
1225   g_value_unset (&value);
1226
1227   g_value_init (&value, G_TYPE_UINT64);
1228   g_value_set_uint64 (&value, client->packets_sent);
1229   result = g_value_array_append (result, &value);
1230   g_value_unset (&value);
1231
1232   g_value_init (&value, G_TYPE_UINT64);
1233   g_value_set_uint64 (&value, client->connect_time);
1234   result = g_value_array_append (result, &value);
1235   g_value_unset (&value);
1236
1237   g_value_init (&value, G_TYPE_UINT64);
1238   g_value_set_uint64 (&value, client->disconnect_time);
1239   result = g_value_array_append (result, &value);
1240   g_value_unset (&value);
1241
1242   g_mutex_unlock (sink->client_lock);
1243
1244   return result;
1245
1246   /* ERRORS */
1247 not_found:
1248   {
1249     g_mutex_unlock (sink->client_lock);
1250     GST_WARNING_OBJECT (sink, "client with host %s, port %d not found",
1251         host, port);
1252     /* Apparently (see comment in gstmultifdsink.c) returning NULL from here may
1253      * confuse/break python bindings */
1254     return g_value_array_new (0);
1255   }
1256 }
1257
1258 static GstStateChangeReturn
1259 gst_multiudpsink_change_state (GstElement * element, GstStateChange transition)
1260 {
1261   GstStateChangeReturn ret;
1262   GstMultiUDPSink *sink;
1263
1264   sink = GST_MULTIUDPSINK (element);
1265
1266   switch (transition) {
1267     case GST_STATE_CHANGE_READY_TO_PAUSED:
1268       if (!gst_multiudpsink_init_send (sink))
1269         goto no_init;
1270       break;
1271     default:
1272       break;
1273   }
1274
1275   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1276
1277   switch (transition) {
1278     case GST_STATE_CHANGE_PAUSED_TO_READY:
1279       gst_multiudpsink_close (sink);
1280       break;
1281     default:
1282       break;
1283   }
1284   return ret;
1285
1286   /* ERRORS */
1287 no_init:
1288   {
1289     /* _init_send() posted specific error already */
1290     return GST_STATE_CHANGE_FAILURE;
1291   }
1292 }