various: fix pad template leaks
[platform/upstream/gstreamer.git] / gst / udp / gstudpsrc.c
1 /* GStreamer
2  * Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
3  * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * SECTION:element-udpsrc
23  * @see_also: udpsink, multifdsink
24  *
25  * udpsrc is a network source that reads UDP packets from the network.
26  * It can be combined with RTP depayloaders to implement RTP streaming.
27  *
28  * The udpsrc element supports automatic port allocation by setting the
29  * #GstUDPSrc:port property to 0. After setting the udpsrc to PAUSED, the
30  * allocated port can be obtained by reading the port property.
31  *
32  * udpsrc can read from multicast groups by setting the #GstUDPSrc:multicast-group
33  * property to the IP address of the multicast group.
34  *
35  * Alternatively one can provide a custom socket to udpsrc with the #GstUDPSrc:sockfd
36  * property, udpsrc will then not allocate a socket itself but use the provided
37  * one.
38  *
39  * The #GstUDPSrc:caps property is mainly used to give a type to the UDP packet
40  * so that they can be autoplugged in GStreamer pipelines. This is very usefull
41  * for RTP implementations where the contents of the UDP packets is transfered
42  * out-of-bounds using SDP or other means.
43  *
44  * The #GstUDPSrc:buffer-size property is used to change the default kernel
45  * buffersizes used for receiving packets. The buffer size may be increased for
46  * high-volume connections, or may be decreased to limit the possible backlog of
47  * incoming data. The system places an absolute limit on these values, on Linux,
48  * for example, the default buffer size is typically 50K and can be increased to
49  * maximally 100K.
50  *
51  * The #GstUDPSrc:skip-first-bytes property is used to strip off an arbitrary
52  * number of bytes from the start of the raw udp packet and can be used to strip
53  * off proprietary header, for example.
54  *
55  * The udpsrc is always a live source. It does however not provide a #GstClock,
56  * this is left for upstream elements such as an RTP session manager or demuxer
57  * (such as an MPEG demuxer). As with all live sources, the captured buffers
58  * will have their timestamp set to the current running time of the pipeline.
59  *
60  * udpsrc implements a #GstURIHandler interface that handles udp://host:port
61  * type URIs.
62  *
63  * If the #GstUDPSrc:timeout property is set to a value bigger than 0, udpsrc
64  * will generate an element message named
65  * <classname>&quot;GstUDPSrcTimeout&quot;</classname>
66  * if no data was recieved in the given timeout.
67  * The message's structure contains one field:
68  * <itemizedlist>
69  * <listitem>
70  *   <para>
71  *   #guint64
72  *   <classname>&quot;timeout&quot;</classname>: the timeout in microseconds that
73  *   expired when waiting for data.
74  *   </para>
75  * </listitem>
76  * </itemizedlist>
77  * The message is typically used to detect that no UDP arrives in the receiver
78  * because it is blocked by a firewall.
79  * </para>
80  * <para>
81  * A custom file descriptor can be configured with the
82  * #GstUDPSrc:sockfd property. The socket will be closed when setting the
83  * element to READY by default. This behaviour can be
84  * overriden with the #GstUDPSrc:closefd property, in which case the application
85  * is responsible for closing the file descriptor.
86  *
87  * <refsect2>
88  * <title>Examples</title>
89  * |[
90  * gst-launch -v udpsrc ! fakesink dump=1
91  * ]| A pipeline to read from the default port and dump the udp packets.
92  * To actually generate udp packets on the default port one can use the
93  * udpsink element. When running the following pipeline in another terminal, the
94  * above mentioned pipeline should dump data packets to the console.
95  * |[
96  * gst-launch -v audiotestsrc ! udpsink
97  * ]|
98  * |[
99  * gst-launch -v udpsrc port=0 ! fakesink
100  * ]| read udp packets from a free port.
101  * </refsect2>
102  *
103  * Last reviewed on 2007-09-20 (0.10.7)
104  */
105 #ifdef HAVE_CONFIG_H
106 #include "config.h"
107 #endif
108
109 #include "gstudpsrc.h"
110 #ifdef HAVE_UNISTD_H
111 #include <unistd.h>
112 #endif
113 #include <stdlib.h>
114
115 #if defined _MSC_VER && (_MSC_VER >= 1400)
116 #include <io.h>
117 #endif
118
119 #include <gst/netbuffer/gstnetbuffer.h>
120
121 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
122 #include <sys/filio.h>
123 #endif
124
125 GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
126 #define GST_CAT_DEFAULT (udpsrc_debug)
127
128 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
129     GST_PAD_SRC,
130     GST_PAD_ALWAYS,
131     GST_STATIC_CAPS_ANY);
132
133 #define UDP_DEFAULT_PORT                4951
134 #define UDP_DEFAULT_MULTICAST_GROUP     "0.0.0.0"
135 #define UDP_DEFAULT_MULTICAST_IFACE     NULL
136 #define UDP_DEFAULT_URI                 "udp://"UDP_DEFAULT_MULTICAST_GROUP":"G_STRINGIFY(UDP_DEFAULT_PORT)
137 #define UDP_DEFAULT_CAPS                NULL
138 #define UDP_DEFAULT_SOCKFD              -1
139 #define UDP_DEFAULT_BUFFER_SIZE         0
140 #define UDP_DEFAULT_TIMEOUT             0
141 #define UDP_DEFAULT_SKIP_FIRST_BYTES    0
142 #define UDP_DEFAULT_CLOSEFD            TRUE
143 #define UDP_DEFAULT_SOCK                -1
144 #define UDP_DEFAULT_AUTO_MULTICAST     TRUE
145 #define UDP_DEFAULT_REUSE              TRUE
146
147 enum
148 {
149   PROP_0,
150
151   PROP_PORT,
152   PROP_MULTICAST_GROUP,
153   PROP_MULTICAST_IFACE,
154   PROP_URI,
155   PROP_CAPS,
156   PROP_SOCKFD,
157   PROP_BUFFER_SIZE,
158   PROP_TIMEOUT,
159   PROP_SKIP_FIRST_BYTES,
160   PROP_CLOSEFD,
161   PROP_SOCK,
162   PROP_AUTO_MULTICAST,
163   PROP_REUSE,
164
165   PROP_LAST
166 };
167
168 #define CLOSE_IF_REQUESTED(udpctx)                                        \
169 G_STMT_START {                                                            \
170   if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \
171     CLOSE_SOCKET(udpctx->sock.fd);                                        \
172     if (udpctx->sock.fd == udpctx->sockfd)                                \
173       udpctx->sockfd = UDP_DEFAULT_SOCKFD;                                \
174   }                                                                       \
175   udpctx->sock.fd = UDP_DEFAULT_SOCK;                                     \
176 } G_STMT_END
177
178 static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
179
180 static GstCaps *gst_udpsrc_getcaps (GstBaseSrc * src);
181
182 static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf);
183
184 static gboolean gst_udpsrc_start (GstBaseSrc * bsrc);
185
186 static gboolean gst_udpsrc_stop (GstBaseSrc * bsrc);
187
188 static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc);
189
190 static gboolean gst_udpsrc_unlock_stop (GstBaseSrc * bsrc);
191
192 static void gst_udpsrc_finalize (GObject * object);
193
194 static void gst_udpsrc_set_property (GObject * object, guint prop_id,
195     const GValue * value, GParamSpec * pspec);
196 static void gst_udpsrc_get_property (GObject * object, guint prop_id,
197     GValue * value, GParamSpec * pspec);
198
199 static void
200 _do_init (GType type)
201 {
202   static const GInterfaceInfo urihandler_info = {
203     gst_udpsrc_uri_handler_init,
204     NULL,
205     NULL
206   };
207
208   g_type_add_interface_static (type, GST_TYPE_URI_HANDLER, &urihandler_info);
209
210   GST_DEBUG_CATEGORY_INIT (udpsrc_debug, "udpsrc", 0, "UDP src");
211 }
212
213 GST_BOILERPLATE_FULL (GstUDPSrc, gst_udpsrc, GstPushSrc, GST_TYPE_PUSH_SRC,
214     _do_init);
215
216 static void
217 gst_udpsrc_base_init (gpointer g_class)
218 {
219   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
220
221   gst_element_class_add_static_pad_template (element_class, &src_template);
222
223   gst_element_class_set_details_simple (element_class, "UDP packet receiver",
224       "Source/Network",
225       "Receive data over the network via UDP",
226       "Wim Taymans <wim@fluendo.com>, "
227       "Thijs Vermeir <thijs.vermeir@barco.com>");
228 }
229
230 static void
231 gst_udpsrc_class_init (GstUDPSrcClass * klass)
232 {
233   GObjectClass *gobject_class;
234   GstBaseSrcClass *gstbasesrc_class;
235   GstPushSrcClass *gstpushsrc_class;
236
237   gobject_class = (GObjectClass *) klass;
238   gstbasesrc_class = (GstBaseSrcClass *) klass;
239   gstpushsrc_class = (GstPushSrcClass *) klass;
240
241   gobject_class->set_property = gst_udpsrc_set_property;
242   gobject_class->get_property = gst_udpsrc_get_property;
243   gobject_class->finalize = gst_udpsrc_finalize;
244
245   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PORT,
246       g_param_spec_int ("port", "Port",
247           "The port to receive the packets from, 0=allocate", 0, G_MAXUINT16,
248           UDP_DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
249   g_object_class_install_property (gobject_class, PROP_MULTICAST_GROUP,
250       g_param_spec_string ("multicast-group", "Multicast Group",
251           "The Address of multicast group to join", UDP_DEFAULT_MULTICAST_GROUP,
252           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
253   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
254       g_param_spec_string ("multicast-iface", "Multicast Interface",
255           "The network interface on which to join the multicast group",
256           UDP_DEFAULT_MULTICAST_IFACE,
257           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
258   g_object_class_install_property (gobject_class, PROP_URI,
259       g_param_spec_string ("uri", "URI",
260           "URI in the form of udp://multicast_group:port", UDP_DEFAULT_URI,
261           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
262   g_object_class_install_property (gobject_class, PROP_CAPS,
263       g_param_spec_boxed ("caps", "Caps",
264           "The caps of the source pad", GST_TYPE_CAPS,
265           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266   g_object_class_install_property (gobject_class, PROP_SOCKFD,
267       g_param_spec_int ("sockfd", "Socket Handle",
268           "Socket to use for UDP reception. (-1 == allocate)",
269           -1, G_MAXINT, UDP_DEFAULT_SOCKFD,
270           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
271   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
272       g_param_spec_int ("buffer-size", "Buffer Size",
273           "Size of the kernel receive buffer in bytes, 0=default", 0, G_MAXINT,
274           UDP_DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
275   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
276       g_param_spec_uint64 ("timeout", "Timeout",
277           "Post a message after timeout microseconds (0 = disabled)", 0,
278           G_MAXUINT64, UDP_DEFAULT_TIMEOUT,
279           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
280   g_object_class_install_property (G_OBJECT_CLASS (klass),
281       PROP_SKIP_FIRST_BYTES, g_param_spec_int ("skip-first-bytes",
282           "Skip first bytes", "number of bytes to skip for each udp packet", 0,
283           G_MAXINT, UDP_DEFAULT_SKIP_FIRST_BYTES,
284           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
285   g_object_class_install_property (gobject_class, PROP_CLOSEFD,
286       g_param_spec_boolean ("closefd", "Close sockfd",
287           "Close sockfd if passed as property on state change",
288           UDP_DEFAULT_CLOSEFD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
289   g_object_class_install_property (gobject_class, PROP_SOCK,
290       g_param_spec_int ("sock", "Socket Handle",
291           "Socket currently in use for UDP reception. (-1 = no socket)",
292           -1, G_MAXINT, UDP_DEFAULT_SOCK,
293           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
294   g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
295       g_param_spec_boolean ("auto-multicast", "Auto Multicast",
296           "Automatically join/leave multicast groups",
297           UDP_DEFAULT_AUTO_MULTICAST,
298           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
299   g_object_class_install_property (gobject_class, PROP_REUSE,
300       g_param_spec_boolean ("reuse", "Reuse", "Enable reuse of the port",
301           UDP_DEFAULT_REUSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
302
303   gstbasesrc_class->start = gst_udpsrc_start;
304   gstbasesrc_class->stop = gst_udpsrc_stop;
305   gstbasesrc_class->unlock = gst_udpsrc_unlock;
306   gstbasesrc_class->unlock_stop = gst_udpsrc_unlock_stop;
307   gstbasesrc_class->get_caps = gst_udpsrc_getcaps;
308
309   gstpushsrc_class->create = gst_udpsrc_create;
310 }
311
312 static void
313 gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
314 {
315   WSA_STARTUP (udpsrc);
316
317   gst_udp_uri_init (&udpsrc->uri, UDP_DEFAULT_MULTICAST_GROUP,
318       UDP_DEFAULT_PORT);
319
320   udpsrc->sockfd = UDP_DEFAULT_SOCKFD;
321   udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
322   udpsrc->buffer_size = UDP_DEFAULT_BUFFER_SIZE;
323   udpsrc->timeout = UDP_DEFAULT_TIMEOUT;
324   udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
325   udpsrc->closefd = UDP_DEFAULT_CLOSEFD;
326   udpsrc->externalfd = (udpsrc->sockfd != -1);
327   udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
328   udpsrc->sock.fd = UDP_DEFAULT_SOCK;
329   udpsrc->reuse = UDP_DEFAULT_REUSE;
330
331   /* configure basesrc to be a live source */
332   gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
333   /* make basesrc output a segment in time */
334   gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
335   /* make basesrc set timestamps on outgoing buffers based on the running_time
336    * when they were captured */
337   gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
338 }
339
340 static void
341 gst_udpsrc_finalize (GObject * object)
342 {
343   GstUDPSrc *udpsrc;
344
345   udpsrc = GST_UDPSRC (object);
346
347   if (udpsrc->caps)
348     gst_caps_unref (udpsrc->caps);
349
350   g_free (udpsrc->multi_iface);
351
352   gst_udp_uri_free (&udpsrc->uri);
353   g_free (udpsrc->uristr);
354
355   if (udpsrc->sockfd >= 0 && udpsrc->closefd)
356     CLOSE_SOCKET (udpsrc->sockfd);
357
358   WSA_CLEANUP (object);
359
360   G_OBJECT_CLASS (parent_class)->finalize (object);
361 }
362
363 static GstCaps *
364 gst_udpsrc_getcaps (GstBaseSrc * src)
365 {
366   GstUDPSrc *udpsrc;
367
368   udpsrc = GST_UDPSRC (src);
369
370   if (udpsrc->caps)
371     return gst_caps_ref (udpsrc->caps);
372   else
373     return gst_caps_new_any ();
374 }
375
376 /* read a message from the error queue */
377 static void
378 clear_error (GstUDPSrc * udpsrc)
379 {
380 #if defined (MSG_ERRQUEUE)
381   struct msghdr cmsg;
382   char cbuf[128];
383   char msgbuf[CMSG_SPACE (128)];
384   struct iovec iov;
385
386   /* Flush ERRORS from fd so next poll will not return at once */
387   /* No need for address : We look for local error */
388   cmsg.msg_name = NULL;
389   cmsg.msg_namelen = 0;
390
391   /* IOV */
392   memset (&cbuf, 0, sizeof (cbuf));
393   iov.iov_base = cbuf;
394   iov.iov_len = sizeof (cbuf);
395   cmsg.msg_iov = &iov;
396   cmsg.msg_iovlen = 1;
397
398   /* msg_control */
399   memset (&msgbuf, 0, sizeof (msgbuf));
400   cmsg.msg_control = &msgbuf;
401   cmsg.msg_controllen = sizeof (msgbuf);
402
403   recvmsg (udpsrc->sock.fd, &cmsg, MSG_ERRQUEUE);
404 #endif
405 }
406
407 static GstFlowReturn
408 gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
409 {
410   GstUDPSrc *udpsrc;
411   GstNetBuffer *outbuf;
412   union gst_sockaddr
413   {
414     struct sockaddr sa;
415     struct sockaddr_in sa_in;
416     struct sockaddr_in6 sa_in6;
417     struct sockaddr_storage sa_stor;
418   } sa;
419   socklen_t slen;
420   guint8 *pktdata;
421   gint pktsize;
422 #ifdef G_OS_UNIX
423   gint readsize;
424 #elif defined G_OS_WIN32
425   gulong readsize;
426 #endif
427   GstClockTime timeout;
428   gint ret;
429   gboolean try_again;
430
431   udpsrc = GST_UDPSRC_CAST (psrc);
432
433 retry:
434   /* quick check, avoid going in select when we already have data */
435   readsize = 0;
436   if (G_UNLIKELY ((ret =
437               IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
438     goto ioctl_failed;
439
440   if (readsize > 0)
441     goto no_select;
442
443   if (udpsrc->timeout > 0) {
444     timeout = udpsrc->timeout * GST_USECOND;
445   } else {
446     timeout = GST_CLOCK_TIME_NONE;
447   }
448
449   do {
450     try_again = FALSE;
451
452     GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GUINT64_FORMAT,
453         udpsrc->timeout);
454
455     ret = gst_poll_wait (udpsrc->fdset, timeout);
456     GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
457     if (G_UNLIKELY (ret < 0)) {
458       if (errno == EBUSY)
459         goto stopped;
460 #ifdef G_OS_WIN32
461       if (WSAGetLastError () != WSAEINTR)
462         goto select_error;
463 #else
464       if (errno != EAGAIN && errno != EINTR)
465         goto select_error;
466 #endif
467       try_again = TRUE;
468     } else if (G_UNLIKELY (ret == 0)) {
469       /* timeout, post element message */
470       gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
471           gst_message_new_element (GST_OBJECT_CAST (udpsrc),
472               gst_structure_new ("GstUDPSrcTimeout",
473                   "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
474       try_again = TRUE;
475     }
476   } while (G_UNLIKELY (try_again));
477
478   /* ask how much is available for reading on the socket, this should be exactly
479    * one UDP packet. We will check the return value, though, because in some
480    * case it can return 0 and we don't want a 0 sized buffer. */
481   readsize = 0;
482   if (G_UNLIKELY ((ret =
483               IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
484     goto ioctl_failed;
485
486   /* if we get here and there is nothing to read from the socket, the select got
487    * woken up by activity on the socket but it was not a read. We know someone
488    * will also do something with the socket so that we don't go into an infinite
489    * loop in the select(). */
490   if (G_UNLIKELY (!readsize)) {
491     clear_error (udpsrc);
492     goto retry;
493   }
494
495 no_select:
496   GST_LOG_OBJECT (udpsrc, "ioctl says %d bytes available", (int) readsize);
497
498   pktdata = g_malloc (readsize);
499   pktsize = readsize;
500
501   while (TRUE) {
502     slen = sizeof (sa);
503 #ifdef G_OS_WIN32
504     ret = recvfrom (udpsrc->sock.fd, (char *) pktdata, pktsize, 0, &sa.sa,
505         &slen);
506 #else
507     ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize, 0, &sa.sa, &slen);
508 #endif
509     if (G_UNLIKELY (ret < 0)) {
510 #ifdef G_OS_WIN32
511       /* WSAECONNRESET for a UDP socket means that a packet sent with udpsink
512        * generated a "port unreachable" ICMP response. We ignore that and try
513        * again. */
514       if (WSAGetLastError () == WSAECONNRESET) {
515         g_free (pktdata);
516         pktdata = NULL;
517         goto retry;
518       }
519       if (WSAGetLastError () != WSAEINTR)
520         goto receive_error;
521 #else
522       if (errno != EAGAIN && errno != EINTR)
523         goto receive_error;
524 #endif
525     } else
526       break;
527   }
528
529   /* special case buffer so receivers can also track the address */
530   outbuf = gst_netbuffer_new ();
531   GST_BUFFER_MALLOCDATA (outbuf) = pktdata;
532
533   /* patch pktdata and len when stripping off the headers */
534   if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) {
535     if (G_UNLIKELY (readsize < udpsrc->skip_first_bytes))
536       goto skip_error;
537
538     pktdata += udpsrc->skip_first_bytes;
539     ret -= udpsrc->skip_first_bytes;
540   }
541   GST_BUFFER_DATA (outbuf) = pktdata;
542   GST_BUFFER_SIZE (outbuf) = ret;
543
544   switch (sa.sa.sa_family) {
545     case AF_INET:
546     {
547       gst_netaddress_set_ip4_address (&outbuf->from, sa.sa_in.sin_addr.s_addr,
548           sa.sa_in.sin_port);
549     }
550       break;
551     case AF_INET6:
552     {
553       guint8 ip6[16];
554
555       memcpy (ip6, &sa.sa_in6.sin6_addr, sizeof (ip6));
556       gst_netaddress_set_ip6_address (&outbuf->from, ip6, sa.sa_in6.sin6_port);
557     }
558       break;
559     default:
560 #ifdef G_OS_WIN32
561       WSASetLastError (WSAEAFNOSUPPORT);
562 #else
563       errno = EAFNOSUPPORT;
564 #endif
565       goto receive_error;
566   }
567   GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
568
569   *buf = GST_BUFFER_CAST (outbuf);
570
571   return GST_FLOW_OK;
572
573   /* ERRORS */
574 select_error:
575   {
576     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
577         ("select error %d: %s (%d)", ret, g_strerror (errno), errno));
578     return GST_FLOW_ERROR;
579   }
580 stopped:
581   {
582     GST_DEBUG ("stop called");
583     return GST_FLOW_WRONG_STATE;
584   }
585 ioctl_failed:
586   {
587     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
588         ("ioctl failed %d: %s (%d)", ret, g_strerror (errno), errno));
589     return GST_FLOW_ERROR;
590   }
591 receive_error:
592   {
593     g_free (pktdata);
594 #ifdef G_OS_WIN32
595     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
596         ("receive error %d (WSA error: %d)", ret, WSAGetLastError ()));
597 #else
598     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
599         ("receive error %d: %s (%d)", ret, g_strerror (errno), errno));
600 #endif
601     return GST_FLOW_ERROR;
602   }
603 skip_error:
604   {
605     GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL),
606         ("UDP buffer to small to skip header"));
607     return GST_FLOW_ERROR;
608   }
609 }
610
611 static gboolean
612 gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri)
613 {
614   if (gst_udp_parse_uri (uri, &src->uri) < 0)
615     goto wrong_uri;
616
617   if (src->uri.port == -1)
618     src->uri.port = UDP_DEFAULT_PORT;
619
620   return TRUE;
621
622   /* ERRORS */
623 wrong_uri:
624   {
625     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
626         ("error parsing uri %s", uri));
627     return FALSE;
628   }
629 }
630
631 static void
632 gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
633     GParamSpec * pspec)
634 {
635   GstUDPSrc *udpsrc = GST_UDPSRC (object);
636
637   switch (prop_id) {
638     case PROP_BUFFER_SIZE:
639       udpsrc->buffer_size = g_value_get_int (value);
640       break;
641     case PROP_PORT:
642       gst_udp_uri_update (&udpsrc->uri, NULL, g_value_get_int (value));
643       break;
644     case PROP_MULTICAST_GROUP:
645     {
646       const gchar *group;
647
648       if ((group = g_value_get_string (value)))
649         gst_udp_uri_update (&udpsrc->uri, group, -1);
650       else
651         gst_udp_uri_update (&udpsrc->uri, UDP_DEFAULT_MULTICAST_GROUP, -1);
652       break;
653     }
654     case PROP_MULTICAST_IFACE:
655       g_free (udpsrc->multi_iface);
656
657       if (g_value_get_string (value) == NULL)
658         udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
659       else
660         udpsrc->multi_iface = g_value_dup_string (value);
661       break;
662     case PROP_URI:
663       gst_udpsrc_set_uri (udpsrc, g_value_get_string (value));
664       break;
665     case PROP_CAPS:
666     {
667       const GstCaps *new_caps_val = gst_value_get_caps (value);
668
669       GstCaps *new_caps;
670
671       GstCaps *old_caps;
672
673       if (new_caps_val == NULL) {
674         new_caps = gst_caps_new_any ();
675       } else {
676         new_caps = gst_caps_copy (new_caps_val);
677       }
678
679       old_caps = udpsrc->caps;
680       udpsrc->caps = new_caps;
681       if (old_caps)
682         gst_caps_unref (old_caps);
683       gst_pad_set_caps (GST_BASE_SRC (udpsrc)->srcpad, new_caps);
684       break;
685     }
686     case PROP_SOCKFD:
687       if (udpsrc->sockfd >= 0 && udpsrc->sockfd != udpsrc->sock.fd &&
688           udpsrc->closefd)
689         CLOSE_SOCKET (udpsrc->sockfd);
690       udpsrc->sockfd = g_value_get_int (value);
691       GST_DEBUG ("setting SOCKFD to %d", udpsrc->sockfd);
692       break;
693     case PROP_TIMEOUT:
694       udpsrc->timeout = g_value_get_uint64 (value);
695       break;
696     case PROP_SKIP_FIRST_BYTES:
697       udpsrc->skip_first_bytes = g_value_get_int (value);
698       break;
699     case PROP_CLOSEFD:
700       udpsrc->closefd = g_value_get_boolean (value);
701       break;
702     case PROP_AUTO_MULTICAST:
703       udpsrc->auto_multicast = g_value_get_boolean (value);
704       break;
705     case PROP_REUSE:
706       udpsrc->reuse = g_value_get_boolean (value);
707       break;
708     default:
709       break;
710   }
711 }
712
713 static void
714 gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
715     GParamSpec * pspec)
716 {
717   GstUDPSrc *udpsrc = GST_UDPSRC (object);
718
719   switch (prop_id) {
720     case PROP_BUFFER_SIZE:
721       g_value_set_int (value, udpsrc->buffer_size);
722       break;
723     case PROP_PORT:
724       g_value_set_int (value, udpsrc->uri.port);
725       break;
726     case PROP_MULTICAST_GROUP:
727       g_value_set_string (value, udpsrc->uri.host);
728       break;
729     case PROP_MULTICAST_IFACE:
730       g_value_set_string (value, udpsrc->multi_iface);
731       break;
732     case PROP_URI:
733       g_value_take_string (value, gst_udp_uri_string (&udpsrc->uri));
734       break;
735     case PROP_CAPS:
736       gst_value_set_caps (value, udpsrc->caps);
737       break;
738     case PROP_SOCKFD:
739       g_value_set_int (value, udpsrc->sockfd);
740       break;
741     case PROP_TIMEOUT:
742       g_value_set_uint64 (value, udpsrc->timeout);
743       break;
744     case PROP_SKIP_FIRST_BYTES:
745       g_value_set_int (value, udpsrc->skip_first_bytes);
746       break;
747     case PROP_CLOSEFD:
748       g_value_set_boolean (value, udpsrc->closefd);
749       break;
750     case PROP_SOCK:
751       g_value_set_int (value, udpsrc->sock.fd);
752       break;
753     case PROP_AUTO_MULTICAST:
754       g_value_set_boolean (value, udpsrc->auto_multicast);
755       break;
756     case PROP_REUSE:
757       g_value_set_boolean (value, udpsrc->reuse);
758       break;
759     default:
760       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
761       break;
762   }
763 }
764
765 /* create a socket for sending to remote machine */
766 static gboolean
767 gst_udpsrc_start (GstBaseSrc * bsrc)
768 {
769   guint bc_val;
770   guint err_val;
771   gint reuse;
772   int port;
773   GstUDPSrc *src;
774   gint ret;
775   int rcvsize;
776   struct sockaddr_storage bind_address;
777   socklen_t len;
778   src = GST_UDPSRC (bsrc);
779
780   if (src->sockfd == -1) {
781     /* need to allocate a socket */
782     GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->uri.host,
783         src->uri.port);
784     if ((ret =
785             gst_udp_get_addr (src->uri.host, src->uri.port, &src->myaddr)) < 0)
786       goto getaddrinfo_error;
787
788     if ((ret = socket (src->myaddr.ss_family, SOCK_DGRAM, IPPROTO_UDP)) < 0)
789       goto no_socket;
790
791     src->sock.fd = ret;
792     src->externalfd = FALSE;
793
794     GST_DEBUG_OBJECT (src, "got socket %d", src->sock.fd);
795
796     GST_DEBUG_OBJECT (src, "setting reuse %d", src->reuse);
797     reuse = src->reuse ? 1 : 0;
798     if ((ret =
799             setsockopt (src->sock.fd, SOL_SOCKET, SO_REUSEADDR, &reuse,
800                 sizeof (reuse))) < 0)
801       goto setsockopt_error;
802
803     GST_DEBUG_OBJECT (src, "binding on port %d", src->uri.port);
804
805     /* Take a temporary copy of the address in case we need to fix it for bind */
806     memcpy (&bind_address, &src->myaddr, sizeof (struct sockaddr_storage));
807
808 #ifdef G_OS_WIN32
809     /* Windows does not allow binding to a multicast group so fix source address */
810     if (gst_udp_is_multicast (&src->myaddr)) {
811       switch (((struct sockaddr *) &bind_address)->sa_family) {
812         case AF_INET:
813           ((struct sockaddr_in *) &bind_address)->sin_addr.s_addr =
814               htonl (INADDR_ANY);
815           break;
816         case AF_INET6:
817           ((struct sockaddr_in6 *) &bind_address)->sin6_addr = in6addr_any;
818           break;
819         default:
820           break;
821       }
822     }
823 #endif
824
825     len = gst_udp_get_sockaddr_length (&bind_address);
826     if ((ret = bind (src->sock.fd, (struct sockaddr *) &bind_address, len)) < 0)
827       goto bind_error;
828
829     if (!gst_udp_is_multicast (&src->myaddr)) {
830       len = sizeof (src->myaddr);
831       if ((ret = getsockname (src->sock.fd, (struct sockaddr *) &src->myaddr,
832                   &len)) < 0)
833         goto getsockname_error;
834     }
835   } else {
836     GST_DEBUG_OBJECT (src, "using provided socket %d", src->sockfd);
837     /* we use the configured socket, try to get some info about it */
838     len = sizeof (src->myaddr);
839     if ((ret =
840             getsockname (src->sockfd, (struct sockaddr *) &src->myaddr,
841                 &len)) < 0)
842       goto getsockname_error;
843
844     src->sock.fd = src->sockfd;
845     src->externalfd = TRUE;
846   }
847
848   len = sizeof (rcvsize);
849   if (src->buffer_size != 0) {
850     rcvsize = src->buffer_size;
851
852     GST_DEBUG_OBJECT (src, "setting udp buffer of %d bytes", rcvsize);
853     /* set buffer size, Note that on Linux this is typically limited to a
854      * maximum of around 100K. Also a minimum of 128 bytes is required on
855      * Linux. */
856     ret =
857         setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize,
858         len);
859     if (ret != 0) {
860       GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
861           ("Could not create a buffer of requested %d bytes, %d: %s (%d)",
862               rcvsize, ret, g_strerror (errno), errno));
863     }
864   }
865
866   /* read the value of the receive buffer. Note that on linux this returns 2x the
867    * value we set because the kernel allocates extra memory for metadata.
868    * The default on Linux is about 100K (which is about 50K without metadata) */
869   ret =
870       getsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, &len);
871   if (ret == 0)
872     GST_DEBUG_OBJECT (src, "have udp buffer of %d bytes", rcvsize);
873   else
874     GST_DEBUG_OBJECT (src, "could not get udp buffer size");
875
876   bc_val = 1;
877   if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val,
878               sizeof (bc_val))) < 0) {
879     GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
880         ("could not configure socket for broadcast %d: %s (%d)", ret,
881             g_strerror (errno), errno));
882   }
883
884   /* Accept ERRQUEUE to get and flush icmp errors */
885   err_val = 1;
886 #if defined (IP_RECVERR)
887   if ((ret = setsockopt (src->sock.fd, IPPROTO_IP, IP_RECVERR, &err_val,
888               sizeof (err_val))) < 0) {
889     GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
890         ("could not configure socket for IP_RECVERR %d: %s (%d)", ret,
891             g_strerror (errno), errno));
892   }
893 #endif
894
895   if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
896     GST_DEBUG_OBJECT (src, "joining multicast group %s", src->uri.host);
897     ret = gst_udp_join_group (src->sock.fd, &src->myaddr, src->multi_iface);
898     if (ret < 0)
899       goto membership;
900   }
901
902   /* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port
903    * follows ss_family on both */
904   port = g_ntohs (((struct sockaddr_in *) &src->myaddr)->sin_port);
905   GST_DEBUG_OBJECT (src, "bound, on port %d", port);
906   if (port != src->uri.port) {
907     src->uri.port = port;
908     GST_DEBUG_OBJECT (src, "notifying port %d", port);
909     g_object_notify (G_OBJECT (src), "port");
910   }
911
912   if ((src->fdset = gst_poll_new (TRUE)) == NULL)
913     goto no_fdset;
914
915   gst_poll_add_fd (src->fdset, &src->sock);
916   gst_poll_fd_ctl_read (src->fdset, &src->sock, TRUE);
917
918   return TRUE;
919
920   /* ERRORS */
921 getaddrinfo_error:
922   {
923     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
924         ("getaddrinfo failed: %s (%d)", gai_strerror (ret), ret));
925     return FALSE;
926   }
927 no_socket:
928   {
929     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
930         ("no socket error %d: %s (%d)", ret, g_strerror (errno), errno));
931     return FALSE;
932   }
933 setsockopt_error:
934   {
935     CLOSE_IF_REQUESTED (src);
936     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
937         ("setsockopt failed %d: %s (%d)", ret, g_strerror (errno), errno));
938     return FALSE;
939   }
940 bind_error:
941   {
942     CLOSE_IF_REQUESTED (src);
943     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
944         ("bind failed %d: %s (%d)", ret, g_strerror (errno), errno));
945     return FALSE;
946   }
947 membership:
948   {
949     CLOSE_IF_REQUESTED (src);
950     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
951         ("could add membership %d: %s (%d)", ret, g_strerror (errno), errno));
952     return FALSE;
953   }
954 getsockname_error:
955   {
956     CLOSE_IF_REQUESTED (src);
957     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
958         ("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno));
959     return FALSE;
960   }
961 no_fdset:
962   {
963     CLOSE_IF_REQUESTED (src);
964     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
965         ("could not create an fdset %d: %s (%d)", ret, g_strerror (errno),
966             errno));
967     return FALSE;
968   }
969 }
970
971 static gboolean
972 gst_udpsrc_unlock (GstBaseSrc * bsrc)
973 {
974   GstUDPSrc *src;
975
976   src = GST_UDPSRC (bsrc);
977
978   GST_LOG_OBJECT (src, "Flushing");
979   gst_poll_set_flushing (src->fdset, TRUE);
980
981   return TRUE;
982 }
983
984 static gboolean
985 gst_udpsrc_unlock_stop (GstBaseSrc * bsrc)
986 {
987   GstUDPSrc *src;
988
989   src = GST_UDPSRC (bsrc);
990
991   GST_LOG_OBJECT (src, "No longer flushing");
992   gst_poll_set_flushing (src->fdset, FALSE);
993
994   return TRUE;
995 }
996
997 static gboolean
998 gst_udpsrc_stop (GstBaseSrc * bsrc)
999 {
1000   GstUDPSrc *src;
1001
1002   src = GST_UDPSRC (bsrc);
1003
1004   GST_DEBUG ("stopping, closing sockets");
1005
1006   if (src->sock.fd >= 0) {
1007     if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
1008       GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->uri.host);
1009       gst_udp_leave_group (src->sock.fd, &src->myaddr);
1010     }
1011     CLOSE_IF_REQUESTED (src);
1012   }
1013
1014   if (src->fdset) {
1015     gst_poll_free (src->fdset);
1016     src->fdset = NULL;
1017   }
1018
1019   return TRUE;
1020 }
1021
1022 /*** GSTURIHANDLER INTERFACE *************************************************/
1023
1024 static GstURIType
1025 gst_udpsrc_uri_get_type (void)
1026 {
1027   return GST_URI_SRC;
1028 }
1029
1030 static gchar **
1031 gst_udpsrc_uri_get_protocols (void)
1032 {
1033   static gchar *protocols[] = { (char *) "udp", NULL };
1034
1035   return protocols;
1036 }
1037
1038 static const gchar *
1039 gst_udpsrc_uri_get_uri (GstURIHandler * handler)
1040 {
1041   GstUDPSrc *src = GST_UDPSRC (handler);
1042
1043   g_free (src->uristr);
1044   src->uristr = gst_udp_uri_string (&src->uri);
1045
1046   return src->uristr;
1047 }
1048
1049 static gboolean
1050 gst_udpsrc_uri_set_uri (GstURIHandler * handler, const gchar * uri)
1051 {
1052   gboolean ret;
1053
1054   GstUDPSrc *src = GST_UDPSRC (handler);
1055
1056   ret = gst_udpsrc_set_uri (src, uri);
1057
1058   return ret;
1059 }
1060
1061 static void
1062 gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data)
1063 {
1064   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1065
1066   iface->get_type = gst_udpsrc_uri_get_type;
1067   iface->get_protocols = gst_udpsrc_uri_get_protocols;
1068   iface->get_uri = gst_udpsrc_uri_get_uri;
1069   iface->set_uri = gst_udpsrc_uri_set_uri;
1070 }