Tizen 2.0 Release
[framework/multimedia/gst-plugins-good0.10.git] / gst / udp / gstudpsrc.c
1 /* GStreamer
2  * Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
3  * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * SECTION:element-udpsrc
23  * @see_also: udpsink, multifdsink
24  *
25  * udpsrc is a network source that reads UDP packets from the network.
26  * It can be combined with RTP depayloaders to implement RTP streaming.
27  *
28  * The udpsrc element supports automatic port allocation by setting the
29  * #GstUDPSrc:port property to 0. After setting the udpsrc to PAUSED, the
30  * allocated port can be obtained by reading the port property.
31  *
32  * udpsrc can read from multicast groups by setting the #GstUDPSrc:multicast-group
33  * property to the IP address of the multicast group.
34  *
35  * Alternatively one can provide a custom socket to udpsrc with the #GstUDPSrc:sockfd
36  * property, udpsrc will then not allocate a socket itself but use the provided
37  * one.
38  *
39  * The #GstUDPSrc:caps property is mainly used to give a type to the UDP packet
40  * so that they can be autoplugged in GStreamer pipelines. This is very usefull
41  * for RTP implementations where the contents of the UDP packets is transfered
42  * out-of-bounds using SDP or other means.
43  *
44  * The #GstUDPSrc:buffer-size property is used to change the default kernel
45  * buffersizes used for receiving packets. The buffer size may be increased for
46  * high-volume connections, or may be decreased to limit the possible backlog of
47  * incoming data. The system places an absolute limit on these values, on Linux,
48  * for example, the default buffer size is typically 50K and can be increased to
49  * maximally 100K.
50  *
51  * The #GstUDPSrc:skip-first-bytes property is used to strip off an arbitrary
52  * number of bytes from the start of the raw udp packet and can be used to strip
53  * off proprietary header, for example.
54  *
55  * The udpsrc is always a live source. It does however not provide a #GstClock,
56  * this is left for upstream elements such as an RTP session manager or demuxer
57  * (such as an MPEG demuxer). As with all live sources, the captured buffers
58  * will have their timestamp set to the current running time of the pipeline.
59  *
60  * udpsrc implements a #GstURIHandler interface that handles udp://host:port
61  * type URIs.
62  *
63  * If the #GstUDPSrc:timeout property is set to a value bigger than 0, udpsrc
64  * will generate an element message named
65  * <classname>&quot;GstUDPSrcTimeout&quot;</classname>
66  * if no data was recieved in the given timeout.
67  * The message's structure contains one field:
68  * <itemizedlist>
69  * <listitem>
70  *   <para>
71  *   #guint64
72  *   <classname>&quot;timeout&quot;</classname>: the timeout in microseconds that
73  *   expired when waiting for data.
74  *   </para>
75  * </listitem>
76  * </itemizedlist>
77  * The message is typically used to detect that no UDP arrives in the receiver
78  * because it is blocked by a firewall.
79  * </para>
80  * <para>
81  * A custom file descriptor can be configured with the
82  * #GstUDPSrc:sockfd property. The socket will be closed when setting the
83  * element to READY by default. This behaviour can be
84  * overriden with the #GstUDPSrc:closefd property, in which case the application
85  * is responsible for closing the file descriptor.
86  *
87  * <refsect2>
88  * <title>Examples</title>
89  * |[
90  * gst-launch -v udpsrc ! fakesink dump=1
91  * ]| A pipeline to read from the default port and dump the udp packets.
92  * To actually generate udp packets on the default port one can use the
93  * udpsink element. When running the following pipeline in another terminal, the
94  * above mentioned pipeline should dump data packets to the console.
95  * |[
96  * gst-launch -v audiotestsrc ! udpsink
97  * ]|
98  * |[
99  * gst-launch -v udpsrc port=0 ! fakesink
100  * ]| read udp packets from a free port.
101  * </refsect2>
102  *
103  * Last reviewed on 2007-09-20 (0.10.7)
104  */
105 #ifdef HAVE_CONFIG_H
106 #include "config.h"
107 #endif
108
109 #include "gstudpsrc.h"
110 #ifdef HAVE_UNISTD_H
111 #include <unistd.h>
112 #endif
113 #include <stdlib.h>
114
115 #if defined _MSC_VER && (_MSC_VER >= 1400)
116 #include <io.h>
117 #endif
118
119 #include <gst/netbuffer/gstnetbuffer.h>
120
121 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
122 #include <sys/filio.h>
123 #endif
124
125 GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
126 #define GST_CAT_DEFAULT (udpsrc_debug)
127
128 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
129     GST_PAD_SRC,
130     GST_PAD_ALWAYS,
131     GST_STATIC_CAPS_ANY);
132
133 #define UDP_DEFAULT_PORT                4951
134 #define UDP_DEFAULT_MULTICAST_GROUP     "0.0.0.0"
135 #define UDP_DEFAULT_MULTICAST_IFACE     NULL
136 #define UDP_DEFAULT_URI                 "udp://"UDP_DEFAULT_MULTICAST_GROUP":"G_STRINGIFY(UDP_DEFAULT_PORT)
137 #define UDP_DEFAULT_CAPS                NULL
138 #define UDP_DEFAULT_SOCKFD              -1
139 #define UDP_DEFAULT_BUFFER_SIZE         0
140 #define UDP_DEFAULT_TIMEOUT             0
141 #define UDP_DEFAULT_SKIP_FIRST_BYTES    0
142 #define UDP_DEFAULT_CLOSEFD            TRUE
143 #define UDP_DEFAULT_SOCK                -1
144 #define UDP_DEFAULT_AUTO_MULTICAST     TRUE
145 #define UDP_DEFAULT_REUSE              TRUE
146
147 enum
148 {
149   PROP_0,
150
151   PROP_PORT,
152   PROP_MULTICAST_GROUP,
153   PROP_MULTICAST_IFACE,
154   PROP_URI,
155   PROP_CAPS,
156   PROP_SOCKFD,
157   PROP_BUFFER_SIZE,
158   PROP_TIMEOUT,
159   PROP_SKIP_FIRST_BYTES,
160   PROP_CLOSEFD,
161   PROP_SOCK,
162   PROP_AUTO_MULTICAST,
163   PROP_REUSE,
164
165   PROP_LAST
166 };
167
168 #define CLOSE_IF_REQUESTED(udpctx)                                        \
169 G_STMT_START {                                                            \
170   if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \
171     CLOSE_SOCKET(udpctx->sock.fd);                                        \
172     if (udpctx->sock.fd == udpctx->sockfd)                                \
173       udpctx->sockfd = UDP_DEFAULT_SOCKFD;                                \
174   }                                                                       \
175   udpctx->sock.fd = UDP_DEFAULT_SOCK;                                     \
176 } G_STMT_END
177
178 static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
179
180 static GstCaps *gst_udpsrc_getcaps (GstBaseSrc * src);
181
182 static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf);
183
184 static gboolean gst_udpsrc_start (GstBaseSrc * bsrc);
185
186 static gboolean gst_udpsrc_stop (GstBaseSrc * bsrc);
187
188 static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc);
189
190 static gboolean gst_udpsrc_unlock_stop (GstBaseSrc * bsrc);
191
192 static void gst_udpsrc_finalize (GObject * object);
193
194 static void gst_udpsrc_set_property (GObject * object, guint prop_id,
195     const GValue * value, GParamSpec * pspec);
196 static void gst_udpsrc_get_property (GObject * object, guint prop_id,
197     GValue * value, GParamSpec * pspec);
198
199 static void
200 _do_init (GType type)
201 {
202   static const GInterfaceInfo urihandler_info = {
203     gst_udpsrc_uri_handler_init,
204     NULL,
205     NULL
206   };
207
208   g_type_add_interface_static (type, GST_TYPE_URI_HANDLER, &urihandler_info);
209
210   GST_DEBUG_CATEGORY_INIT (udpsrc_debug, "udpsrc", 0, "UDP src");
211 }
212
213 GST_BOILERPLATE_FULL (GstUDPSrc, gst_udpsrc, GstPushSrc, GST_TYPE_PUSH_SRC,
214     _do_init);
215
216 static void
217 gst_udpsrc_base_init (gpointer g_class)
218 {
219   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
220
221   gst_element_class_add_static_pad_template (element_class, &src_template);
222
223   gst_element_class_set_details_simple (element_class, "UDP packet receiver",
224       "Source/Network",
225       "Receive data over the network via UDP",
226       "Wim Taymans <wim@fluendo.com>, "
227       "Thijs Vermeir <thijs.vermeir@barco.com>");
228 }
229
230 static void
231 gst_udpsrc_class_init (GstUDPSrcClass * klass)
232 {
233   GObjectClass *gobject_class;
234   GstBaseSrcClass *gstbasesrc_class;
235   GstPushSrcClass *gstpushsrc_class;
236
237   gobject_class = (GObjectClass *) klass;
238   gstbasesrc_class = (GstBaseSrcClass *) klass;
239   gstpushsrc_class = (GstPushSrcClass *) klass;
240
241   gobject_class->set_property = gst_udpsrc_set_property;
242   gobject_class->get_property = gst_udpsrc_get_property;
243   gobject_class->finalize = gst_udpsrc_finalize;
244
245   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PORT,
246       g_param_spec_int ("port", "Port",
247           "The port to receive the packets from, 0=allocate", 0, G_MAXUINT16,
248           UDP_DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
249   g_object_class_install_property (gobject_class, PROP_MULTICAST_GROUP,
250       g_param_spec_string ("multicast-group", "Multicast Group",
251           "The Address of multicast group to join", UDP_DEFAULT_MULTICAST_GROUP,
252           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
253   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
254       g_param_spec_string ("multicast-iface", "Multicast Interface",
255           "The network interface on which to join the multicast group",
256           UDP_DEFAULT_MULTICAST_IFACE,
257           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
258   g_object_class_install_property (gobject_class, PROP_URI,
259       g_param_spec_string ("uri", "URI",
260           "URI in the form of udp://multicast_group:port", UDP_DEFAULT_URI,
261           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
262   g_object_class_install_property (gobject_class, PROP_CAPS,
263       g_param_spec_boxed ("caps", "Caps",
264           "The caps of the source pad", GST_TYPE_CAPS,
265           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266   g_object_class_install_property (gobject_class, PROP_SOCKFD,
267       g_param_spec_int ("sockfd", "Socket Handle",
268           "Socket to use for UDP reception. (-1 == allocate)",
269           -1, G_MAXINT, UDP_DEFAULT_SOCKFD,
270           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
271   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
272       g_param_spec_int ("buffer-size", "Buffer Size",
273           "Size of the kernel receive buffer in bytes, 0=default", 0, G_MAXINT,
274           UDP_DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
275   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
276       g_param_spec_uint64 ("timeout", "Timeout",
277           "Post a message after timeout microseconds (0 = disabled)", 0,
278           G_MAXUINT64, UDP_DEFAULT_TIMEOUT,
279           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
280   g_object_class_install_property (G_OBJECT_CLASS (klass),
281       PROP_SKIP_FIRST_BYTES, g_param_spec_int ("skip-first-bytes",
282           "Skip first bytes", "number of bytes to skip for each udp packet", 0,
283           G_MAXINT, UDP_DEFAULT_SKIP_FIRST_BYTES,
284           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
285   g_object_class_install_property (gobject_class, PROP_CLOSEFD,
286       g_param_spec_boolean ("closefd", "Close sockfd",
287           "Close sockfd if passed as property on state change",
288           UDP_DEFAULT_CLOSEFD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
289   g_object_class_install_property (gobject_class, PROP_SOCK,
290       g_param_spec_int ("sock", "Socket Handle",
291           "Socket currently in use for UDP reception. (-1 = no socket)",
292           -1, G_MAXINT, UDP_DEFAULT_SOCK,
293           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
294   g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
295       g_param_spec_boolean ("auto-multicast", "Auto Multicast",
296           "Automatically join/leave multicast groups",
297           UDP_DEFAULT_AUTO_MULTICAST,
298           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
299   g_object_class_install_property (gobject_class, PROP_REUSE,
300       g_param_spec_boolean ("reuse", "Reuse", "Enable reuse of the port",
301           UDP_DEFAULT_REUSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
302
303   gstbasesrc_class->start = gst_udpsrc_start;
304   gstbasesrc_class->stop = gst_udpsrc_stop;
305   gstbasesrc_class->unlock = gst_udpsrc_unlock;
306   gstbasesrc_class->unlock_stop = gst_udpsrc_unlock_stop;
307   gstbasesrc_class->get_caps = gst_udpsrc_getcaps;
308
309   gstpushsrc_class->create = gst_udpsrc_create;
310 }
311
312 static void
313 gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
314 {
315   WSA_STARTUP (udpsrc);
316
317   gst_udp_uri_init (&udpsrc->uri, UDP_DEFAULT_MULTICAST_GROUP,
318       UDP_DEFAULT_PORT);
319
320   udpsrc->sockfd = UDP_DEFAULT_SOCKFD;
321   udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
322   udpsrc->buffer_size = UDP_DEFAULT_BUFFER_SIZE;
323   udpsrc->timeout = UDP_DEFAULT_TIMEOUT;
324   udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
325   udpsrc->closefd = UDP_DEFAULT_CLOSEFD;
326   udpsrc->externalfd = (udpsrc->sockfd != -1);
327   udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
328   udpsrc->sock.fd = UDP_DEFAULT_SOCK;
329   udpsrc->reuse = UDP_DEFAULT_REUSE;
330
331   /* configure basesrc to be a live source */
332   gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
333   /* make basesrc output a segment in time */
334   gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
335   /* make basesrc set timestamps on outgoing buffers based on the running_time
336    * when they were captured */
337   gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
338 }
339
340 static void
341 gst_udpsrc_finalize (GObject * object)
342 {
343   GstUDPSrc *udpsrc;
344
345   udpsrc = GST_UDPSRC (object);
346
347   if (udpsrc->caps)
348     gst_caps_unref (udpsrc->caps);
349
350   g_free (udpsrc->multi_iface);
351
352   gst_udp_uri_free (&udpsrc->uri);
353   g_free (udpsrc->uristr);
354
355   if (udpsrc->sockfd >= 0 && udpsrc->closefd)
356     CLOSE_SOCKET (udpsrc->sockfd);
357
358   WSA_CLEANUP (object);
359
360   G_OBJECT_CLASS (parent_class)->finalize (object);
361 }
362
363 static GstCaps *
364 gst_udpsrc_getcaps (GstBaseSrc * src)
365 {
366   GstUDPSrc *udpsrc;
367
368   udpsrc = GST_UDPSRC (src);
369
370   if (udpsrc->caps)
371     return gst_caps_ref (udpsrc->caps);
372   else
373     return gst_caps_new_any ();
374 }
375
376 /* read a message from the error queue */
377 static void
378 clear_error (GstUDPSrc * udpsrc)
379 {
380 #if defined (MSG_ERRQUEUE)
381   struct msghdr cmsg;
382   char cbuf[128];
383   char msgbuf[CMSG_SPACE (128)];
384   struct iovec iov;
385
386   /* Flush ERRORS from fd so next poll will not return at once */
387   /* No need for address : We look for local error */
388   cmsg.msg_name = NULL;
389   cmsg.msg_namelen = 0;
390
391   /* IOV */
392   memset (&cbuf, 0, sizeof (cbuf));
393   iov.iov_base = cbuf;
394   iov.iov_len = sizeof (cbuf);
395   cmsg.msg_iov = &iov;
396   cmsg.msg_iovlen = 1;
397
398   /* msg_control */
399   memset (&msgbuf, 0, sizeof (msgbuf));
400   cmsg.msg_control = &msgbuf;
401   cmsg.msg_controllen = sizeof (msgbuf);
402
403   recvmsg (udpsrc->sock.fd, &cmsg, MSG_ERRQUEUE);
404 #endif
405 }
406
407 static GstFlowReturn
408 gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
409 {
410   GstUDPSrc *udpsrc;
411   GstNetBuffer *outbuf;
412   union gst_sockaddr
413   {
414     struct sockaddr sa;
415     struct sockaddr_in sa_in;
416     struct sockaddr_in6 sa_in6;
417     struct sockaddr_storage sa_stor;
418   } sa;
419   socklen_t slen;
420   guint8 *pktdata;
421   gint pktsize;
422 #ifdef G_OS_UNIX
423   gint readsize;
424 #elif defined G_OS_WIN32
425   gulong readsize;
426 #endif
427   GstClockTime timeout;
428   gint ret;
429   gboolean try_again;
430
431   udpsrc = GST_UDPSRC_CAST (psrc);
432
433 retry:
434   /* quick check, avoid going in select when we already have data */
435   readsize = 0;
436   if (G_UNLIKELY ((ret =
437               IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
438     goto ioctl_failed;
439
440   if (readsize > 0)
441     goto no_select;
442
443   if (udpsrc->timeout > 0) {
444     timeout = udpsrc->timeout * GST_USECOND;
445   } else {
446     timeout = GST_CLOCK_TIME_NONE;
447   }
448
449   do {
450     try_again = FALSE;
451
452     GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GUINT64_FORMAT,
453         udpsrc->timeout);
454
455     ret = gst_poll_wait (udpsrc->fdset, timeout);
456     GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
457     if (G_UNLIKELY (ret < 0)) {
458       if (errno == EBUSY)
459         goto stopped;
460 #ifdef G_OS_WIN32
461       if (WSAGetLastError () != WSAEINTR)
462         goto select_error;
463 #else
464       if (errno != EAGAIN && errno != EINTR)
465         goto select_error;
466 #endif
467       try_again = TRUE;
468     } else if (G_UNLIKELY (ret == 0)) {
469       /* timeout, post element message */
470       gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
471           gst_message_new_element (GST_OBJECT_CAST (udpsrc),
472               gst_structure_new ("GstUDPSrcTimeout",
473                   "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
474       try_again = TRUE;
475     }
476   } while (G_UNLIKELY (try_again));
477
478   /* ask how much is available for reading on the socket, this should be exactly
479    * one UDP packet. We will check the return value, though, because in some
480    * case it can return 0 and we don't want a 0 sized buffer. */
481   readsize = 0;
482   if (G_UNLIKELY ((ret =
483               IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
484     goto ioctl_failed;
485
486   /* If we get here and the readsize is zero, then either select was woken up
487    * by activity that is not a read, or a poll error occurred, or a UDP packet
488    * was received that has no data. Since we cannot identify which case it is,
489    * we handle all of them. This could possibly lead to a UDP packet getting
490    * lost, but since UDP is not reliable, we can accept this. */
491   if (G_UNLIKELY (!readsize)) {
492     /* try to read a packet (and it will be ignored),
493      * in case a packet with no data arrived */
494     slen = sizeof (sa);
495     recvfrom (udpsrc->sock.fd, (char *) &slen, 0, 0, &sa.sa, &slen);
496
497     /* clear any error, in case a poll error occurred */
498     clear_error (udpsrc);
499
500     /* poll again */
501     goto retry;
502   }
503
504 no_select:
505   GST_LOG_OBJECT (udpsrc, "ioctl says %d bytes available", (int) readsize);
506
507   pktdata = g_malloc (readsize);
508   pktsize = readsize;
509
510   while (TRUE) {
511     slen = sizeof (sa);
512 #ifdef G_OS_WIN32
513     ret = recvfrom (udpsrc->sock.fd, (char *) pktdata, pktsize, 0, &sa.sa,
514         &slen);
515 #else
516     ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize, 0, &sa.sa, &slen);
517 #endif
518     if (G_UNLIKELY (ret < 0)) {
519 #ifdef G_OS_WIN32
520       /* WSAECONNRESET for a UDP socket means that a packet sent with udpsink
521        * generated a "port unreachable" ICMP response. We ignore that and try
522        * again. */
523       if (WSAGetLastError () == WSAECONNRESET) {
524         g_free (pktdata);
525         pktdata = NULL;
526         goto retry;
527       }
528       if (WSAGetLastError () != WSAEINTR)
529         goto receive_error;
530 #else
531       if (errno != EAGAIN && errno != EINTR)
532         goto receive_error;
533 #endif
534     } else
535       break;
536   }
537
538   /* special case buffer so receivers can also track the address */
539   outbuf = gst_netbuffer_new ();
540   GST_BUFFER_MALLOCDATA (outbuf) = pktdata;
541
542   /* patch pktdata and len when stripping off the headers */
543   if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) {
544     if (G_UNLIKELY (readsize < udpsrc->skip_first_bytes))
545       goto skip_error;
546
547     pktdata += udpsrc->skip_first_bytes;
548     ret -= udpsrc->skip_first_bytes;
549   }
550   GST_BUFFER_DATA (outbuf) = pktdata;
551   GST_BUFFER_SIZE (outbuf) = ret;
552
553   switch (sa.sa.sa_family) {
554     case AF_INET:
555     {
556       gst_netaddress_set_ip4_address (&outbuf->from, sa.sa_in.sin_addr.s_addr,
557           sa.sa_in.sin_port);
558     }
559       break;
560     case AF_INET6:
561     {
562       guint8 ip6[16];
563
564       memcpy (ip6, &sa.sa_in6.sin6_addr, sizeof (ip6));
565       gst_netaddress_set_ip6_address (&outbuf->from, ip6, sa.sa_in6.sin6_port);
566     }
567       break;
568     default:
569 #ifdef G_OS_WIN32
570       WSASetLastError (WSAEAFNOSUPPORT);
571 #else
572       errno = EAFNOSUPPORT;
573 #endif
574       goto receive_error;
575   }
576   GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
577
578   *buf = GST_BUFFER_CAST (outbuf);
579
580   return GST_FLOW_OK;
581
582   /* ERRORS */
583 select_error:
584   {
585     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
586         ("select error %d: %s (%d)", ret, g_strerror (errno), errno));
587     return GST_FLOW_ERROR;
588   }
589 stopped:
590   {
591     GST_DEBUG ("stop called");
592     return GST_FLOW_WRONG_STATE;
593   }
594 ioctl_failed:
595   {
596     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
597         ("ioctl failed %d: %s (%d)", ret, g_strerror (errno), errno));
598     return GST_FLOW_ERROR;
599   }
600 receive_error:
601   {
602     g_free (pktdata);
603 #ifdef G_OS_WIN32
604     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
605         ("receive error %d (WSA error: %d)", ret, WSAGetLastError ()));
606 #else
607     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
608         ("receive error %d: %s (%d)", ret, g_strerror (errno), errno));
609 #endif
610     return GST_FLOW_ERROR;
611   }
612 skip_error:
613   {
614     GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL),
615         ("UDP buffer to small to skip header"));
616     return GST_FLOW_ERROR;
617   }
618 }
619
620 static gboolean
621 gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri)
622 {
623   if (gst_udp_parse_uri (uri, &src->uri) < 0)
624     goto wrong_uri;
625
626   if (src->uri.port == -1)
627     src->uri.port = UDP_DEFAULT_PORT;
628
629   return TRUE;
630
631   /* ERRORS */
632 wrong_uri:
633   {
634     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
635         ("error parsing uri %s", uri));
636     return FALSE;
637   }
638 }
639
640 static void
641 gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
642     GParamSpec * pspec)
643 {
644   GstUDPSrc *udpsrc = GST_UDPSRC (object);
645
646   switch (prop_id) {
647     case PROP_BUFFER_SIZE:
648       udpsrc->buffer_size = g_value_get_int (value);
649       break;
650     case PROP_PORT:
651       gst_udp_uri_update (&udpsrc->uri, NULL, g_value_get_int (value));
652       break;
653     case PROP_MULTICAST_GROUP:
654     {
655       const gchar *group;
656
657       if ((group = g_value_get_string (value)))
658         gst_udp_uri_update (&udpsrc->uri, group, -1);
659       else
660         gst_udp_uri_update (&udpsrc->uri, UDP_DEFAULT_MULTICAST_GROUP, -1);
661       break;
662     }
663     case PROP_MULTICAST_IFACE:
664       g_free (udpsrc->multi_iface);
665
666       if (g_value_get_string (value) == NULL)
667         udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
668       else
669         udpsrc->multi_iface = g_value_dup_string (value);
670       break;
671     case PROP_URI:
672       gst_udpsrc_set_uri (udpsrc, g_value_get_string (value));
673       break;
674     case PROP_CAPS:
675     {
676       const GstCaps *new_caps_val = gst_value_get_caps (value);
677
678       GstCaps *new_caps;
679
680       GstCaps *old_caps;
681
682       if (new_caps_val == NULL) {
683         new_caps = gst_caps_new_any ();
684       } else {
685         new_caps = gst_caps_copy (new_caps_val);
686       }
687
688       old_caps = udpsrc->caps;
689       udpsrc->caps = new_caps;
690       if (old_caps)
691         gst_caps_unref (old_caps);
692       gst_pad_set_caps (GST_BASE_SRC (udpsrc)->srcpad, new_caps);
693       break;
694     }
695     case PROP_SOCKFD:
696       if (udpsrc->sockfd >= 0 && udpsrc->sockfd != udpsrc->sock.fd &&
697           udpsrc->closefd)
698         CLOSE_SOCKET (udpsrc->sockfd);
699       udpsrc->sockfd = g_value_get_int (value);
700       GST_DEBUG ("setting SOCKFD to %d", udpsrc->sockfd);
701       break;
702     case PROP_TIMEOUT:
703       udpsrc->timeout = g_value_get_uint64 (value);
704       break;
705     case PROP_SKIP_FIRST_BYTES:
706       udpsrc->skip_first_bytes = g_value_get_int (value);
707       break;
708     case PROP_CLOSEFD:
709       udpsrc->closefd = g_value_get_boolean (value);
710       break;
711     case PROP_AUTO_MULTICAST:
712       udpsrc->auto_multicast = g_value_get_boolean (value);
713       break;
714     case PROP_REUSE:
715       udpsrc->reuse = g_value_get_boolean (value);
716       break;
717     default:
718       break;
719   }
720 }
721
722 static void
723 gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
724     GParamSpec * pspec)
725 {
726   GstUDPSrc *udpsrc = GST_UDPSRC (object);
727
728   switch (prop_id) {
729     case PROP_BUFFER_SIZE:
730       g_value_set_int (value, udpsrc->buffer_size);
731       break;
732     case PROP_PORT:
733       g_value_set_int (value, udpsrc->uri.port);
734       break;
735     case PROP_MULTICAST_GROUP:
736       g_value_set_string (value, udpsrc->uri.host);
737       break;
738     case PROP_MULTICAST_IFACE:
739       g_value_set_string (value, udpsrc->multi_iface);
740       break;
741     case PROP_URI:
742       g_value_take_string (value, gst_udp_uri_string (&udpsrc->uri));
743       break;
744     case PROP_CAPS:
745       gst_value_set_caps (value, udpsrc->caps);
746       break;
747     case PROP_SOCKFD:
748       g_value_set_int (value, udpsrc->sockfd);
749       break;
750     case PROP_TIMEOUT:
751       g_value_set_uint64 (value, udpsrc->timeout);
752       break;
753     case PROP_SKIP_FIRST_BYTES:
754       g_value_set_int (value, udpsrc->skip_first_bytes);
755       break;
756     case PROP_CLOSEFD:
757       g_value_set_boolean (value, udpsrc->closefd);
758       break;
759     case PROP_SOCK:
760       g_value_set_int (value, udpsrc->sock.fd);
761       break;
762     case PROP_AUTO_MULTICAST:
763       g_value_set_boolean (value, udpsrc->auto_multicast);
764       break;
765     case PROP_REUSE:
766       g_value_set_boolean (value, udpsrc->reuse);
767       break;
768     default:
769       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
770       break;
771   }
772 }
773
774 /* create a socket for sending to remote machine */
775 static gboolean
776 gst_udpsrc_start (GstBaseSrc * bsrc)
777 {
778   guint bc_val;
779   guint err_val;
780   gint reuse;
781   int port;
782   GstUDPSrc *src;
783   gint ret;
784   int rcvsize;
785   struct sockaddr_storage bind_address;
786   socklen_t len;
787   src = GST_UDPSRC (bsrc);
788
789   if (src->sockfd == -1) {
790     /* need to allocate a socket */
791     GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->uri.host,
792         src->uri.port);
793     if ((ret =
794             gst_udp_get_addr (src->uri.host, src->uri.port, &src->myaddr)) < 0)
795       goto getaddrinfo_error;
796
797     if ((ret = socket (src->myaddr.ss_family, SOCK_DGRAM, IPPROTO_UDP)) < 0)
798       goto no_socket;
799
800     src->sock.fd = ret;
801     src->externalfd = FALSE;
802
803     GST_DEBUG_OBJECT (src, "got socket %d", src->sock.fd);
804
805     GST_DEBUG_OBJECT (src, "setting reuse %d", src->reuse);
806     reuse = src->reuse ? 1 : 0;
807     if ((ret =
808             setsockopt (src->sock.fd, SOL_SOCKET, SO_REUSEADDR, &reuse,
809                 sizeof (reuse))) < 0)
810       goto setsockopt_error;
811
812     GST_DEBUG_OBJECT (src, "binding on port %d", src->uri.port);
813
814     /* Take a temporary copy of the address in case we need to fix it for bind */
815     memcpy (&bind_address, &src->myaddr, sizeof (struct sockaddr_storage));
816
817 #ifdef G_OS_WIN32
818     /* Windows does not allow binding to a multicast group so fix source address */
819     if (gst_udp_is_multicast (&src->myaddr)) {
820       switch (((struct sockaddr *) &bind_address)->sa_family) {
821         case AF_INET:
822           ((struct sockaddr_in *) &bind_address)->sin_addr.s_addr =
823               htonl (INADDR_ANY);
824           break;
825         case AF_INET6:
826           ((struct sockaddr_in6 *) &bind_address)->sin6_addr = in6addr_any;
827           break;
828         default:
829           break;
830       }
831     }
832 #endif
833
834     len = gst_udp_get_sockaddr_length (&bind_address);
835     if ((ret = bind (src->sock.fd, (struct sockaddr *) &bind_address, len)) < 0)
836       goto bind_error;
837
838     if (!gst_udp_is_multicast (&src->myaddr)) {
839       len = sizeof (src->myaddr);
840       if ((ret = getsockname (src->sock.fd, (struct sockaddr *) &src->myaddr,
841                   &len)) < 0)
842         goto getsockname_error;
843     }
844   } else {
845     GST_DEBUG_OBJECT (src, "using provided socket %d", src->sockfd);
846     /* we use the configured socket, try to get some info about it */
847     len = sizeof (src->myaddr);
848     if ((ret =
849             getsockname (src->sockfd, (struct sockaddr *) &src->myaddr,
850                 &len)) < 0)
851       goto getsockname_error;
852
853     src->sock.fd = src->sockfd;
854     src->externalfd = TRUE;
855   }
856
857   len = sizeof (rcvsize);
858   if (src->buffer_size != 0) {
859     rcvsize = src->buffer_size;
860
861     GST_DEBUG_OBJECT (src, "setting udp buffer of %d bytes", rcvsize);
862     /* set buffer size, Note that on Linux this is typically limited to a
863      * maximum of around 100K. Also a minimum of 128 bytes is required on
864      * Linux. */
865     ret =
866         setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize,
867         len);
868     if (ret != 0) {
869       GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
870           ("Could not create a buffer of requested %d bytes, %d: %s (%d)",
871               rcvsize, ret, g_strerror (errno), errno));
872     }
873   }
874
875   /* read the value of the receive buffer. Note that on linux this returns 2x the
876    * value we set because the kernel allocates extra memory for metadata.
877    * The default on Linux is about 100K (which is about 50K without metadata) */
878   ret =
879       getsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, &len);
880   if (ret == 0)
881     GST_DEBUG_OBJECT (src, "have udp buffer of %d bytes", rcvsize);
882   else
883     GST_DEBUG_OBJECT (src, "could not get udp buffer size");
884
885   bc_val = 1;
886   if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val,
887               sizeof (bc_val))) < 0) {
888     GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
889         ("could not configure socket for broadcast %d: %s (%d)", ret,
890             g_strerror (errno), errno));
891   }
892
893   /* Accept ERRQUEUE to get and flush icmp errors */
894   err_val = 1;
895 #if defined (IP_RECVERR)
896   if ((ret = setsockopt (src->sock.fd, IPPROTO_IP, IP_RECVERR, &err_val,
897               sizeof (err_val))) < 0) {
898     GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
899         ("could not configure socket for IP_RECVERR %d: %s (%d)", ret,
900             g_strerror (errno), errno));
901   }
902 #endif
903
904   if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
905     GST_DEBUG_OBJECT (src, "joining multicast group %s", src->uri.host);
906     ret = gst_udp_join_group (src->sock.fd, &src->myaddr, src->multi_iface);
907     if (ret < 0)
908       goto membership;
909   }
910
911   /* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port
912    * follows ss_family on both */
913   port = g_ntohs (((struct sockaddr_in *) &src->myaddr)->sin_port);
914   GST_DEBUG_OBJECT (src, "bound, on port %d", port);
915   if (port != src->uri.port) {
916     src->uri.port = port;
917     GST_DEBUG_OBJECT (src, "notifying port %d", port);
918     g_object_notify (G_OBJECT (src), "port");
919   }
920
921   if ((src->fdset = gst_poll_new (TRUE)) == NULL)
922     goto no_fdset;
923
924   gst_poll_add_fd (src->fdset, &src->sock);
925   gst_poll_fd_ctl_read (src->fdset, &src->sock, TRUE);
926
927   return TRUE;
928
929   /* ERRORS */
930 getaddrinfo_error:
931   {
932     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
933         ("getaddrinfo failed: %s (%d)", gai_strerror (ret), ret));
934     return FALSE;
935   }
936 no_socket:
937   {
938     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
939         ("no socket error %d: %s (%d)", ret, g_strerror (errno), errno));
940     return FALSE;
941   }
942 setsockopt_error:
943   {
944     CLOSE_IF_REQUESTED (src);
945     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
946         ("setsockopt failed %d: %s (%d)", ret, g_strerror (errno), errno));
947     return FALSE;
948   }
949 bind_error:
950   {
951     CLOSE_IF_REQUESTED (src);
952     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
953         ("bind failed %d: %s (%d)", ret, g_strerror (errno), errno));
954     return FALSE;
955   }
956 membership:
957   {
958     CLOSE_IF_REQUESTED (src);
959     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
960         ("could add membership %d: %s (%d)", ret, g_strerror (errno), errno));
961     return FALSE;
962   }
963 getsockname_error:
964   {
965     CLOSE_IF_REQUESTED (src);
966     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
967         ("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno));
968     return FALSE;
969   }
970 no_fdset:
971   {
972     CLOSE_IF_REQUESTED (src);
973     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
974         ("could not create an fdset %d: %s (%d)", ret, g_strerror (errno),
975             errno));
976     return FALSE;
977   }
978 }
979
980 static gboolean
981 gst_udpsrc_unlock (GstBaseSrc * bsrc)
982 {
983   GstUDPSrc *src;
984
985   src = GST_UDPSRC (bsrc);
986
987   GST_LOG_OBJECT (src, "Flushing");
988   gst_poll_set_flushing (src->fdset, TRUE);
989
990   return TRUE;
991 }
992
993 static gboolean
994 gst_udpsrc_unlock_stop (GstBaseSrc * bsrc)
995 {
996   GstUDPSrc *src;
997
998   src = GST_UDPSRC (bsrc);
999
1000   GST_LOG_OBJECT (src, "No longer flushing");
1001   gst_poll_set_flushing (src->fdset, FALSE);
1002
1003   return TRUE;
1004 }
1005
1006 static gboolean
1007 gst_udpsrc_stop (GstBaseSrc * bsrc)
1008 {
1009   GstUDPSrc *src;
1010
1011   src = GST_UDPSRC (bsrc);
1012
1013   GST_DEBUG ("stopping, closing sockets");
1014
1015   if (src->sock.fd >= 0) {
1016     if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
1017       GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->uri.host);
1018       gst_udp_leave_group (src->sock.fd, &src->myaddr);
1019     }
1020     CLOSE_IF_REQUESTED (src);
1021   }
1022
1023   if (src->fdset) {
1024     gst_poll_free (src->fdset);
1025     src->fdset = NULL;
1026   }
1027
1028   return TRUE;
1029 }
1030
1031 /*** GSTURIHANDLER INTERFACE *************************************************/
1032
1033 static GstURIType
1034 gst_udpsrc_uri_get_type (void)
1035 {
1036   return GST_URI_SRC;
1037 }
1038
1039 static gchar **
1040 gst_udpsrc_uri_get_protocols (void)
1041 {
1042   static gchar *protocols[] = { (char *) "udp", NULL };
1043
1044   return protocols;
1045 }
1046
1047 static const gchar *
1048 gst_udpsrc_uri_get_uri (GstURIHandler * handler)
1049 {
1050   GstUDPSrc *src = GST_UDPSRC (handler);
1051
1052   g_free (src->uristr);
1053   src->uristr = gst_udp_uri_string (&src->uri);
1054
1055   return src->uristr;
1056 }
1057
1058 static gboolean
1059 gst_udpsrc_uri_set_uri (GstURIHandler * handler, const gchar * uri)
1060 {
1061   gboolean ret;
1062
1063   GstUDPSrc *src = GST_UDPSRC (handler);
1064
1065   ret = gst_udpsrc_set_uri (src, uri);
1066
1067   return ret;
1068 }
1069
1070 static void
1071 gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data)
1072 {
1073   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1074
1075   iface->get_type = gst_udpsrc_uri_get_type;
1076   iface->get_protocols = gst_udpsrc_uri_get_protocols;
1077   iface->get_uri = gst_udpsrc_uri_get_uri;
1078   iface->set_uri = gst_udpsrc_uri_set_uri;
1079 }