upload tizen1.0 source
[framework/multimedia/gst-plugins-good0.10.git] / gst / udp / gstudpsrc.c
1 /* GStreamer
2  * Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
3  * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * SECTION:element-udpsrc
23  * @see_also: udpsink, multifdsink
24  *
25  * udpsrc is a network source that reads UDP packets from the network.
26  * It can be combined with RTP depayloaders to implement RTP streaming.
27  *
28  * The udpsrc element supports automatic port allocation by setting the
29  * #GstUDPSrc:port property to 0. After setting the udpsrc to PAUSED, the
30  * allocated port can be obtained by reading the port property.
31  *
32  * udpsrc can read from multicast groups by setting the #GstUDPSrc:multicast-group
33  * property to the IP address of the multicast group.
34  *
35  * Alternatively one can provide a custom socket to udpsrc with the #GstUDPSrc:sockfd
36  * property, udpsrc will then not allocate a socket itself but use the provided
37  * one.
38  *
39  * The #GstUDPSrc:caps property is mainly used to give a type to the UDP packet
40  * so that they can be autoplugged in GStreamer pipelines. This is very usefull
41  * for RTP implementations where the contents of the UDP packets is transfered
42  * out-of-bounds using SDP or other means.
43  *
44  * The #GstUDPSrc:buffer-size property is used to change the default kernel
45  * buffersizes used for receiving packets. The buffer size may be increased for
46  * high-volume connections, or may be decreased to limit the possible backlog of
47  * incoming data. The system places an absolute limit on these values, on Linux,
48  * for example, the default buffer size is typically 50K and can be increased to
49  * maximally 100K.
50  *
51  * The #GstUDPSrc:skip-first-bytes property is used to strip off an arbitrary
52  * number of bytes from the start of the raw udp packet and can be used to strip
53  * off proprietary header, for example.
54  *
55  * The udpsrc is always a live source. It does however not provide a #GstClock,
56  * this is left for upstream elements such as an RTP session manager or demuxer
57  * (such as an MPEG demuxer). As with all live sources, the captured buffers
58  * will have their timestamp set to the current running time of the pipeline.
59  *
60  * udpsrc implements a #GstURIHandler interface that handles udp://host:port
61  * type URIs.
62  *
63  * If the #GstUDPSrc:timeout property is set to a value bigger than 0, udpsrc
64  * will generate an element message named
65  * <classname>&quot;GstUDPSrcTimeout&quot;</classname>
66  * if no data was recieved in the given timeout.
67  * The message's structure contains one field:
68  * <itemizedlist>
69  * <listitem>
70  *   <para>
71  *   #guint64
72  *   <classname>&quot;timeout&quot;</classname>: the timeout in microseconds that
73  *   expired when waiting for data.
74  *   </para>
75  * </listitem>
76  * </itemizedlist>
77  * The message is typically used to detect that no UDP arrives in the receiver
78  * because it is blocked by a firewall.
79  * </para>
80  * <para>
81  * A custom file descriptor can be configured with the
82  * #GstUDPSrc:sockfd property. The socket will be closed when setting the
83  * element to READY by default. This behaviour can be
84  * overriden with the #GstUDPSrc:closefd property, in which case the application
85  * is responsible for closing the file descriptor.
86  *
87  * <refsect2>
88  * <title>Examples</title>
89  * |[
90  * gst-launch -v udpsrc ! fakesink dump=1
91  * ]| A pipeline to read from the default port and dump the udp packets.
92  * To actually generate udp packets on the default port one can use the
93  * udpsink element. When running the following pipeline in another terminal, the
94  * above mentioned pipeline should dump data packets to the console.
95  * |[
96  * gst-launch -v audiotestsrc ! udpsink
97  * ]|
98  * |[
99  * gst-launch -v udpsrc port=0 ! fakesink
100  * ]| read udp packets from a free port.
101  * </refsect2>
102  *
103  * Last reviewed on 2007-09-20 (0.10.7)
104  */
105 #ifdef HAVE_CONFIG_H
106 #include "config.h"
107 #endif
108
109 #include "gstudpsrc.h"
110 #ifdef HAVE_UNISTD_H
111 #include <unistd.h>
112 #endif
113 #include <stdlib.h>
114
115 #if defined _MSC_VER && (_MSC_VER >= 1400)
116 #include <io.h>
117 #endif
118
119 #include <gst/netbuffer/gstnetbuffer.h>
120
121 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
122 #include <sys/filio.h>
123 #endif
124
125 GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
126 #define GST_CAT_DEFAULT (udpsrc_debug)
127
128 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
129     GST_PAD_SRC,
130     GST_PAD_ALWAYS,
131     GST_STATIC_CAPS_ANY);
132
133 #define UDP_DEFAULT_PORT                4951
134 #define UDP_DEFAULT_MULTICAST_GROUP     "0.0.0.0"
135 #define UDP_DEFAULT_MULTICAST_IFACE     NULL
136 #define UDP_DEFAULT_URI                 "udp://"UDP_DEFAULT_MULTICAST_GROUP":"G_STRINGIFY(UDP_DEFAULT_PORT)
137 #define UDP_DEFAULT_CAPS                NULL
138 #define UDP_DEFAULT_SOCKFD              -1
139 #define UDP_DEFAULT_BUFFER_SIZE         0
140 #define UDP_DEFAULT_TIMEOUT             0
141 #define UDP_DEFAULT_SKIP_FIRST_BYTES    0
142 #define UDP_DEFAULT_CLOSEFD            TRUE
143 #define UDP_DEFAULT_SOCK                -1
144 #define UDP_DEFAULT_AUTO_MULTICAST     TRUE
145 #define UDP_DEFAULT_REUSE              TRUE
146
147 enum
148 {
149   PROP_0,
150
151   PROP_PORT,
152   PROP_MULTICAST_GROUP,
153   PROP_MULTICAST_IFACE,
154   PROP_URI,
155   PROP_CAPS,
156   PROP_SOCKFD,
157   PROP_BUFFER_SIZE,
158   PROP_TIMEOUT,
159   PROP_SKIP_FIRST_BYTES,
160   PROP_CLOSEFD,
161   PROP_SOCK,
162   PROP_AUTO_MULTICAST,
163   PROP_REUSE,
164
165   PROP_LAST
166 };
167
168 #define CLOSE_IF_REQUESTED(udpctx)                                        \
169 G_STMT_START {                                                            \
170   if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \
171     CLOSE_SOCKET(udpctx->sock.fd);                                        \
172     if (udpctx->sock.fd == udpctx->sockfd)                                \
173       udpctx->sockfd = UDP_DEFAULT_SOCKFD;                                \
174   }                                                                       \
175   udpctx->sock.fd = UDP_DEFAULT_SOCK;                                     \
176 } G_STMT_END
177
178 static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
179
180 static GstCaps *gst_udpsrc_getcaps (GstBaseSrc * src);
181
182 static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf);
183
184 static gboolean gst_udpsrc_start (GstBaseSrc * bsrc);
185
186 static gboolean gst_udpsrc_stop (GstBaseSrc * bsrc);
187
188 static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc);
189
190 static gboolean gst_udpsrc_unlock_stop (GstBaseSrc * bsrc);
191
192 static void gst_udpsrc_finalize (GObject * object);
193
194 static void gst_udpsrc_set_property (GObject * object, guint prop_id,
195     const GValue * value, GParamSpec * pspec);
196 static void gst_udpsrc_get_property (GObject * object, guint prop_id,
197     GValue * value, GParamSpec * pspec);
198
199 static void
200 _do_init (GType type)
201 {
202   static const GInterfaceInfo urihandler_info = {
203     gst_udpsrc_uri_handler_init,
204     NULL,
205     NULL
206   };
207
208   g_type_add_interface_static (type, GST_TYPE_URI_HANDLER, &urihandler_info);
209
210   GST_DEBUG_CATEGORY_INIT (udpsrc_debug, "udpsrc", 0, "UDP src");
211 }
212
213 GST_BOILERPLATE_FULL (GstUDPSrc, gst_udpsrc, GstPushSrc, GST_TYPE_PUSH_SRC,
214     _do_init);
215
216 static void
217 gst_udpsrc_base_init (gpointer g_class)
218 {
219   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
220
221   gst_element_class_add_pad_template (element_class,
222       gst_static_pad_template_get (&src_template));
223
224   gst_element_class_set_details_simple (element_class, "UDP packet receiver",
225       "Source/Network",
226       "Receive data over the network via UDP",
227       "Wim Taymans <wim@fluendo.com>, "
228       "Thijs Vermeir <thijs.vermeir@barco.com>");
229 }
230
231 static void
232 gst_udpsrc_class_init (GstUDPSrcClass * klass)
233 {
234   GObjectClass *gobject_class;
235   GstBaseSrcClass *gstbasesrc_class;
236   GstPushSrcClass *gstpushsrc_class;
237
238   gobject_class = (GObjectClass *) klass;
239   gstbasesrc_class = (GstBaseSrcClass *) klass;
240   gstpushsrc_class = (GstPushSrcClass *) klass;
241
242   gobject_class->set_property = gst_udpsrc_set_property;
243   gobject_class->get_property = gst_udpsrc_get_property;
244   gobject_class->finalize = gst_udpsrc_finalize;
245
246   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PORT,
247       g_param_spec_int ("port", "Port",
248           "The port to receive the packets from, 0=allocate", 0, G_MAXUINT16,
249           UDP_DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
250   g_object_class_install_property (gobject_class, PROP_MULTICAST_GROUP,
251       g_param_spec_string ("multicast-group", "Multicast Group",
252           "The Address of multicast group to join", UDP_DEFAULT_MULTICAST_GROUP,
253           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
254   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
255       g_param_spec_string ("multicast-iface", "Multicast Interface",
256           "The network interface on which to join the multicast group",
257           UDP_DEFAULT_MULTICAST_IFACE,
258           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
259   g_object_class_install_property (gobject_class, PROP_URI,
260       g_param_spec_string ("uri", "URI",
261           "URI in the form of udp://multicast_group:port", UDP_DEFAULT_URI,
262           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
263   g_object_class_install_property (gobject_class, PROP_CAPS,
264       g_param_spec_boxed ("caps", "Caps",
265           "The caps of the source pad", GST_TYPE_CAPS,
266           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
267   g_object_class_install_property (gobject_class, PROP_SOCKFD,
268       g_param_spec_int ("sockfd", "Socket Handle",
269           "Socket to use for UDP reception. (-1 == allocate)",
270           -1, G_MAXINT, UDP_DEFAULT_SOCKFD,
271           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
272   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
273       g_param_spec_int ("buffer-size", "Buffer Size",
274           "Size of the kernel receive buffer in bytes, 0=default", 0, G_MAXINT,
275           UDP_DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
276   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
277       g_param_spec_uint64 ("timeout", "Timeout",
278           "Post a message after timeout microseconds (0 = disabled)", 0,
279           G_MAXUINT64, UDP_DEFAULT_TIMEOUT,
280           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
281   g_object_class_install_property (G_OBJECT_CLASS (klass),
282       PROP_SKIP_FIRST_BYTES, g_param_spec_int ("skip-first-bytes",
283           "Skip first bytes", "number of bytes to skip for each udp packet", 0,
284           G_MAXINT, UDP_DEFAULT_SKIP_FIRST_BYTES,
285           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
286   g_object_class_install_property (gobject_class, PROP_CLOSEFD,
287       g_param_spec_boolean ("closefd", "Close sockfd",
288           "Close sockfd if passed as property on state change",
289           UDP_DEFAULT_CLOSEFD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
290   g_object_class_install_property (gobject_class, PROP_SOCK,
291       g_param_spec_int ("sock", "Socket Handle",
292           "Socket currently in use for UDP reception. (-1 = no socket)",
293           -1, G_MAXINT, UDP_DEFAULT_SOCK,
294           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
295   g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
296       g_param_spec_boolean ("auto-multicast", "Auto Multicast",
297           "Automatically join/leave multicast groups",
298           UDP_DEFAULT_AUTO_MULTICAST,
299           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
300   g_object_class_install_property (gobject_class, PROP_REUSE,
301       g_param_spec_boolean ("reuse", "Reuse", "Enable reuse of the port",
302           UDP_DEFAULT_REUSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
303
304   gstbasesrc_class->start = gst_udpsrc_start;
305   gstbasesrc_class->stop = gst_udpsrc_stop;
306   gstbasesrc_class->unlock = gst_udpsrc_unlock;
307   gstbasesrc_class->unlock_stop = gst_udpsrc_unlock_stop;
308   gstbasesrc_class->get_caps = gst_udpsrc_getcaps;
309
310   gstpushsrc_class->create = gst_udpsrc_create;
311 }
312
313 static void
314 gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
315 {
316   WSA_STARTUP (udpsrc);
317
318   gst_udp_uri_init (&udpsrc->uri, UDP_DEFAULT_MULTICAST_GROUP,
319       UDP_DEFAULT_PORT);
320
321   udpsrc->sockfd = UDP_DEFAULT_SOCKFD;
322   udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
323   udpsrc->buffer_size = UDP_DEFAULT_BUFFER_SIZE;
324   udpsrc->timeout = UDP_DEFAULT_TIMEOUT;
325   udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
326   udpsrc->closefd = UDP_DEFAULT_CLOSEFD;
327   udpsrc->externalfd = (udpsrc->sockfd != -1);
328   udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
329   udpsrc->sock.fd = UDP_DEFAULT_SOCK;
330   udpsrc->reuse = UDP_DEFAULT_REUSE;
331
332   /* configure basesrc to be a live source */
333   gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
334   /* make basesrc output a segment in time */
335   gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
336   /* make basesrc set timestamps on outgoing buffers based on the running_time
337    * when they were captured */
338   gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
339 }
340
341 static void
342 gst_udpsrc_finalize (GObject * object)
343 {
344   GstUDPSrc *udpsrc;
345
346   udpsrc = GST_UDPSRC (object);
347
348   if (udpsrc->caps)
349     gst_caps_unref (udpsrc->caps);
350
351   g_free (udpsrc->multi_iface);
352
353   gst_udp_uri_free (&udpsrc->uri);
354   g_free (udpsrc->uristr);
355
356   if (udpsrc->sockfd >= 0 && udpsrc->closefd)
357     CLOSE_SOCKET (udpsrc->sockfd);
358
359   WSA_CLEANUP (object);
360
361   G_OBJECT_CLASS (parent_class)->finalize (object);
362 }
363
364 static GstCaps *
365 gst_udpsrc_getcaps (GstBaseSrc * src)
366 {
367   GstUDPSrc *udpsrc;
368
369   udpsrc = GST_UDPSRC (src);
370
371   if (udpsrc->caps)
372     return gst_caps_ref (udpsrc->caps);
373   else
374     return gst_caps_new_any ();
375 }
376
377 /* read a message from the error queue */
378 static void
379 clear_error (GstUDPSrc * udpsrc)
380 {
381 #if defined (MSG_ERRQUEUE)
382   struct msghdr cmsg;
383   char cbuf[128];
384   char msgbuf[CMSG_SPACE (128)];
385   struct iovec iov;
386
387   /* Flush ERRORS from fd so next poll will not return at once */
388   /* No need for address : We look for local error */
389   cmsg.msg_name = NULL;
390   cmsg.msg_namelen = 0;
391
392   /* IOV */
393   memset (&cbuf, 0, sizeof (cbuf));
394   iov.iov_base = cbuf;
395   iov.iov_len = sizeof (cbuf);
396   cmsg.msg_iov = &iov;
397   cmsg.msg_iovlen = 1;
398
399   /* msg_control */
400   memset (&msgbuf, 0, sizeof (msgbuf));
401   cmsg.msg_control = &msgbuf;
402   cmsg.msg_controllen = sizeof (msgbuf);
403
404   recvmsg (udpsrc->sock.fd, &cmsg, MSG_ERRQUEUE);
405 #endif
406 }
407
408 static GstFlowReturn
409 gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
410 {
411   GstUDPSrc *udpsrc;
412   GstNetBuffer *outbuf;
413   union gst_sockaddr
414   {
415     struct sockaddr sa;
416     struct sockaddr_in sa_in;
417     struct sockaddr_in6 sa_in6;
418     struct sockaddr_storage sa_stor;
419   } sa;
420   socklen_t slen;
421   guint8 *pktdata;
422   gint pktsize;
423 #ifdef G_OS_UNIX
424   gint readsize;
425 #elif defined G_OS_WIN32
426   gulong readsize;
427 #endif
428   GstClockTime timeout;
429   gint ret;
430   gboolean try_again;
431
432   udpsrc = GST_UDPSRC_CAST (psrc);
433
434 retry:
435   /* quick check, avoid going in select when we already have data */
436   readsize = 0;
437   if (G_UNLIKELY ((ret =
438               IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
439     goto ioctl_failed;
440
441   if (readsize > 0)
442     goto no_select;
443
444   if (udpsrc->timeout > 0) {
445     timeout = udpsrc->timeout * GST_USECOND;
446   } else {
447     timeout = GST_CLOCK_TIME_NONE;
448   }
449
450   do {
451     try_again = FALSE;
452
453     GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GUINT64_FORMAT,
454         udpsrc->timeout);
455
456     ret = gst_poll_wait (udpsrc->fdset, timeout);
457     GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
458     if (G_UNLIKELY (ret < 0)) {
459       if (errno == EBUSY)
460         goto stopped;
461 #ifdef G_OS_WIN32
462       if (WSAGetLastError () != WSAEINTR)
463         goto select_error;
464 #else
465       if (errno != EAGAIN && errno != EINTR)
466         goto select_error;
467 #endif
468       try_again = TRUE;
469     } else if (G_UNLIKELY (ret == 0)) {
470       /* timeout, post element message */
471       gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
472           gst_message_new_element (GST_OBJECT_CAST (udpsrc),
473               gst_structure_new ("GstUDPSrcTimeout",
474                   "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
475       try_again = TRUE;
476     }
477   } while (G_UNLIKELY (try_again));
478
479   /* ask how much is available for reading on the socket, this should be exactly
480    * one UDP packet. We will check the return value, though, because in some
481    * case it can return 0 and we don't want a 0 sized buffer. */
482   readsize = 0;
483   if (G_UNLIKELY ((ret =
484               IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
485     goto ioctl_failed;
486
487   /* if we get here and there is nothing to read from the socket, the select got
488    * woken up by activity on the socket but it was not a read. We know someone
489    * will also do something with the socket so that we don't go into an infinite
490    * loop in the select(). */
491   if (G_UNLIKELY (!readsize)) {
492     clear_error (udpsrc);
493     goto retry;
494   }
495
496 no_select:
497   GST_LOG_OBJECT (udpsrc, "ioctl says %d bytes available", (int) readsize);
498
499   pktdata = g_malloc (readsize);
500   pktsize = readsize;
501
502   while (TRUE) {
503     slen = sizeof (sa);
504 #ifdef G_OS_WIN32
505     ret = recvfrom (udpsrc->sock.fd, (char *) pktdata, pktsize, 0, &sa.sa,
506         &slen);
507 #else
508     ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize, 0, &sa.sa, &slen);
509 #endif
510     if (G_UNLIKELY (ret < 0)) {
511 #ifdef G_OS_WIN32
512       /* WSAECONNRESET for a UDP socket means that a packet sent with udpsink
513        * generated a "port unreachable" ICMP response. We ignore that and try
514        * again. */
515       if (WSAGetLastError () == WSAECONNRESET) {
516         g_free (pktdata);
517         pktdata = NULL;
518         goto retry;
519       }
520       if (WSAGetLastError () != WSAEINTR)
521         goto receive_error;
522 #else
523       if (errno != EAGAIN && errno != EINTR)
524         goto receive_error;
525 #endif
526     } else
527       break;
528   }
529
530   /* special case buffer so receivers can also track the address */
531   outbuf = gst_netbuffer_new ();
532   GST_BUFFER_MALLOCDATA (outbuf) = pktdata;
533
534   /* patch pktdata and len when stripping off the headers */
535   if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) {
536     if (G_UNLIKELY (readsize <= udpsrc->skip_first_bytes))
537       goto skip_error;
538
539     pktdata += udpsrc->skip_first_bytes;
540     ret -= udpsrc->skip_first_bytes;
541   }
542   GST_BUFFER_DATA (outbuf) = pktdata;
543   GST_BUFFER_SIZE (outbuf) = ret;
544
545   switch (sa.sa.sa_family) {
546     case AF_INET:
547     {
548       gst_netaddress_set_ip4_address (&outbuf->from, sa.sa_in.sin_addr.s_addr,
549           sa.sa_in.sin_port);
550     }
551       break;
552     case AF_INET6:
553     {
554       guint8 ip6[16];
555
556       memcpy (ip6, &sa.sa_in6.sin6_addr, sizeof (ip6));
557       gst_netaddress_set_ip6_address (&outbuf->from, ip6, sa.sa_in6.sin6_port);
558     }
559       break;
560     default:
561 #ifdef G_OS_WIN32
562       WSASetLastError (WSAEAFNOSUPPORT);
563 #else
564       errno = EAFNOSUPPORT;
565 #endif
566       goto receive_error;
567   }
568   GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
569
570   *buf = GST_BUFFER_CAST (outbuf);
571
572   return GST_FLOW_OK;
573
574   /* ERRORS */
575 select_error:
576   {
577     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
578         ("select error %d: %s (%d)", ret, g_strerror (errno), errno));
579     return GST_FLOW_ERROR;
580   }
581 stopped:
582   {
583     GST_DEBUG ("stop called");
584     return GST_FLOW_WRONG_STATE;
585   }
586 ioctl_failed:
587   {
588     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
589         ("ioctl failed %d: %s (%d)", ret, g_strerror (errno), errno));
590     return GST_FLOW_ERROR;
591   }
592 receive_error:
593   {
594     g_free (pktdata);
595 #ifdef G_OS_WIN32
596     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
597         ("receive error %d (WSA error: %d)", ret, WSAGetLastError ()));
598 #else
599     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
600         ("receive error %d: %s (%d)", ret, g_strerror (errno), errno));
601 #endif
602     return GST_FLOW_ERROR;
603   }
604 skip_error:
605   {
606     GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL),
607         ("UDP buffer to small to skip header"));
608     return GST_FLOW_ERROR;
609   }
610 }
611
612 static gboolean
613 gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri)
614 {
615   if (gst_udp_parse_uri (uri, &src->uri) < 0)
616     goto wrong_uri;
617
618   if (src->uri.port == -1)
619     src->uri.port = UDP_DEFAULT_PORT;
620
621   return TRUE;
622
623   /* ERRORS */
624 wrong_uri:
625   {
626     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
627         ("error parsing uri %s", uri));
628     return FALSE;
629   }
630 }
631
632 static void
633 gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
634     GParamSpec * pspec)
635 {
636   GstUDPSrc *udpsrc = GST_UDPSRC (object);
637
638   switch (prop_id) {
639     case PROP_BUFFER_SIZE:
640       udpsrc->buffer_size = g_value_get_int (value);
641       break;
642     case PROP_PORT:
643       gst_udp_uri_update (&udpsrc->uri, NULL, g_value_get_int (value));
644       break;
645     case PROP_MULTICAST_GROUP:
646     {
647       const gchar *group;
648
649       if ((group = g_value_get_string (value)))
650         gst_udp_uri_update (&udpsrc->uri, group, -1);
651       else
652         gst_udp_uri_update (&udpsrc->uri, UDP_DEFAULT_MULTICAST_GROUP, -1);
653       break;
654     }
655     case PROP_MULTICAST_IFACE:
656       g_free (udpsrc->multi_iface);
657
658       if (g_value_get_string (value) == NULL)
659         udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
660       else
661         udpsrc->multi_iface = g_value_dup_string (value);
662       break;
663     case PROP_URI:
664       gst_udpsrc_set_uri (udpsrc, g_value_get_string (value));
665       break;
666     case PROP_CAPS:
667     {
668       const GstCaps *new_caps_val = gst_value_get_caps (value);
669
670       GstCaps *new_caps;
671
672       GstCaps *old_caps;
673
674       if (new_caps_val == NULL) {
675         new_caps = gst_caps_new_any ();
676       } else {
677         new_caps = gst_caps_copy (new_caps_val);
678       }
679
680       old_caps = udpsrc->caps;
681       udpsrc->caps = new_caps;
682       if (old_caps)
683         gst_caps_unref (old_caps);
684       gst_pad_set_caps (GST_BASE_SRC (udpsrc)->srcpad, new_caps);
685       break;
686     }
687     case PROP_SOCKFD:
688       if (udpsrc->sockfd >= 0 && udpsrc->sockfd != udpsrc->sock.fd &&
689           udpsrc->closefd)
690         CLOSE_SOCKET (udpsrc->sockfd);
691       udpsrc->sockfd = g_value_get_int (value);
692       GST_DEBUG ("setting SOCKFD to %d", udpsrc->sockfd);
693       break;
694     case PROP_TIMEOUT:
695       udpsrc->timeout = g_value_get_uint64 (value);
696       break;
697     case PROP_SKIP_FIRST_BYTES:
698       udpsrc->skip_first_bytes = g_value_get_int (value);
699       break;
700     case PROP_CLOSEFD:
701       udpsrc->closefd = g_value_get_boolean (value);
702       break;
703     case PROP_AUTO_MULTICAST:
704       udpsrc->auto_multicast = g_value_get_boolean (value);
705       break;
706     case PROP_REUSE:
707       udpsrc->reuse = g_value_get_boolean (value);
708       break;
709     default:
710       break;
711   }
712 }
713
714 static void
715 gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
716     GParamSpec * pspec)
717 {
718   GstUDPSrc *udpsrc = GST_UDPSRC (object);
719
720   switch (prop_id) {
721     case PROP_BUFFER_SIZE:
722       g_value_set_int (value, udpsrc->buffer_size);
723       break;
724     case PROP_PORT:
725       g_value_set_int (value, udpsrc->uri.port);
726       break;
727     case PROP_MULTICAST_GROUP:
728       g_value_set_string (value, udpsrc->uri.host);
729       break;
730     case PROP_MULTICAST_IFACE:
731       g_value_set_string (value, udpsrc->multi_iface);
732       break;
733     case PROP_URI:
734       g_value_take_string (value, gst_udp_uri_string (&udpsrc->uri));
735       break;
736     case PROP_CAPS:
737       gst_value_set_caps (value, udpsrc->caps);
738       break;
739     case PROP_SOCKFD:
740       g_value_set_int (value, udpsrc->sockfd);
741       break;
742     case PROP_TIMEOUT:
743       g_value_set_uint64 (value, udpsrc->timeout);
744       break;
745     case PROP_SKIP_FIRST_BYTES:
746       g_value_set_int (value, udpsrc->skip_first_bytes);
747       break;
748     case PROP_CLOSEFD:
749       g_value_set_boolean (value, udpsrc->closefd);
750       break;
751     case PROP_SOCK:
752       g_value_set_int (value, udpsrc->sock.fd);
753       break;
754     case PROP_AUTO_MULTICAST:
755       g_value_set_boolean (value, udpsrc->auto_multicast);
756       break;
757     case PROP_REUSE:
758       g_value_set_boolean (value, udpsrc->reuse);
759       break;
760     default:
761       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
762       break;
763   }
764 }
765
766 /* create a socket for sending to remote machine */
767 static gboolean
768 gst_udpsrc_start (GstBaseSrc * bsrc)
769 {
770   guint bc_val;
771   guint err_val;
772   gint reuse;
773   int port;
774   GstUDPSrc *src;
775   gint ret;
776   int rcvsize;
777   struct sockaddr_storage bind_address;
778   socklen_t len;
779   src = GST_UDPSRC (bsrc);
780
781   if (src->sockfd == -1) {
782     /* need to allocate a socket */
783     GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->uri.host,
784         src->uri.port);
785     if ((ret =
786             gst_udp_get_addr (src->uri.host, src->uri.port, &src->myaddr)) < 0)
787       goto getaddrinfo_error;
788
789     if ((ret = socket (src->myaddr.ss_family, SOCK_DGRAM, IPPROTO_UDP)) < 0)
790       goto no_socket;
791
792     src->sock.fd = ret;
793     src->externalfd = FALSE;
794
795     GST_DEBUG_OBJECT (src, "got socket %d", src->sock.fd);
796
797     GST_DEBUG_OBJECT (src, "setting reuse %d", src->reuse);
798     reuse = src->reuse ? 1 : 0;
799     if ((ret =
800             setsockopt (src->sock.fd, SOL_SOCKET, SO_REUSEADDR, &reuse,
801                 sizeof (reuse))) < 0)
802       goto setsockopt_error;
803
804     GST_DEBUG_OBJECT (src, "binding on port %d", src->uri.port);
805
806     /* Take a temporary copy of the address in case we need to fix it for bind */
807     memcpy (&bind_address, &src->myaddr, sizeof (struct sockaddr_storage));
808
809 #ifdef G_OS_WIN32
810     /* Windows does not allow binding to a multicast group so fix source address */
811     if (gst_udp_is_multicast (&src->myaddr)) {
812       switch (((struct sockaddr *) &bind_address)->sa_family) {
813         case AF_INET:
814           ((struct sockaddr_in *) &bind_address)->sin_addr.s_addr =
815               htonl (INADDR_ANY);
816           break;
817         case AF_INET6:
818           ((struct sockaddr_in6 *) &bind_address)->sin6_addr = in6addr_any;
819           break;
820         default:
821           break;
822       }
823     }
824 #endif
825
826     len = gst_udp_get_sockaddr_length (&bind_address);
827     if ((ret = bind (src->sock.fd, (struct sockaddr *) &bind_address, len)) < 0)
828       goto bind_error;
829
830     if (!gst_udp_is_multicast (&src->myaddr)) {
831       len = sizeof (src->myaddr);
832       if ((ret = getsockname (src->sock.fd, (struct sockaddr *) &src->myaddr,
833                   &len)) < 0)
834         goto getsockname_error;
835     }
836   } else {
837     GST_DEBUG_OBJECT (src, "using provided socket %d", src->sockfd);
838     /* we use the configured socket, try to get some info about it */
839     len = sizeof (src->myaddr);
840     if ((ret =
841             getsockname (src->sockfd, (struct sockaddr *) &src->myaddr,
842                 &len)) < 0)
843       goto getsockname_error;
844
845     src->sock.fd = src->sockfd;
846     src->externalfd = TRUE;
847   }
848
849   len = sizeof (rcvsize);
850   if (src->buffer_size != 0) {
851     rcvsize = src->buffer_size;
852
853     GST_DEBUG_OBJECT (src, "setting udp buffer of %d bytes", rcvsize);
854     /* set buffer size, Note that on Linux this is typically limited to a
855      * maximum of around 100K. Also a minimum of 128 bytes is required on
856      * Linux. */
857     ret =
858         setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize,
859         len);
860     if (ret != 0) {
861       GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
862           ("Could not create a buffer of requested %d bytes, %d: %s (%d)",
863               rcvsize, ret, g_strerror (errno), errno));
864     }
865   }
866
867   /* read the value of the receive buffer. Note that on linux this returns 2x the
868    * value we set because the kernel allocates extra memory for metadata.
869    * The default on Linux is about 100K (which is about 50K without metadata) */
870   ret =
871       getsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, &len);
872   if (ret == 0)
873     GST_DEBUG_OBJECT (src, "have udp buffer of %d bytes", rcvsize);
874   else
875     GST_DEBUG_OBJECT (src, "could not get udp buffer size");
876
877   bc_val = 1;
878   if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val,
879               sizeof (bc_val))) < 0) {
880     GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
881         ("could not configure socket for broadcast %d: %s (%d)", ret,
882             g_strerror (errno), errno));
883   }
884
885   /* Accept ERRQUEUE to get and flush icmp errors */
886   err_val = 1;
887 #if defined (IP_RECVERR)
888   if ((ret = setsockopt (src->sock.fd, IPPROTO_IP, IP_RECVERR, &err_val,
889               sizeof (err_val))) < 0) {
890     GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
891         ("could not configure socket for IP_RECVERR %d: %s (%d)", ret,
892             g_strerror (errno), errno));
893   }
894 #endif
895
896   if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
897     GST_DEBUG_OBJECT (src, "joining multicast group %s", src->uri.host);
898     ret = gst_udp_join_group (src->sock.fd, &src->myaddr, src->multi_iface);
899     if (ret < 0)
900       goto membership;
901   }
902
903   /* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port
904    * follows ss_family on both */
905   port = g_ntohs (((struct sockaddr_in *) &src->myaddr)->sin_port);
906   GST_DEBUG_OBJECT (src, "bound, on port %d", port);
907   if (port != src->uri.port) {
908     src->uri.port = port;
909     GST_DEBUG_OBJECT (src, "notifying port %d", port);
910     g_object_notify (G_OBJECT (src), "port");
911   }
912
913   if ((src->fdset = gst_poll_new (TRUE)) == NULL)
914     goto no_fdset;
915
916   gst_poll_add_fd (src->fdset, &src->sock);
917   gst_poll_fd_ctl_read (src->fdset, &src->sock, TRUE);
918
919   return TRUE;
920
921   /* ERRORS */
922 getaddrinfo_error:
923   {
924     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
925         ("getaddrinfo failed: %s (%d)", gai_strerror (ret), ret));
926     return FALSE;
927   }
928 no_socket:
929   {
930     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
931         ("no socket error %d: %s (%d)", ret, g_strerror (errno), errno));
932     return FALSE;
933   }
934 setsockopt_error:
935   {
936     CLOSE_IF_REQUESTED (src);
937     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
938         ("setsockopt failed %d: %s (%d)", ret, g_strerror (errno), errno));
939     return FALSE;
940   }
941 bind_error:
942   {
943     CLOSE_IF_REQUESTED (src);
944     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
945         ("bind failed %d: %s (%d)", ret, g_strerror (errno), errno));
946     return FALSE;
947   }
948 membership:
949   {
950     CLOSE_IF_REQUESTED (src);
951     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
952         ("could add membership %d: %s (%d)", ret, g_strerror (errno), errno));
953     return FALSE;
954   }
955 getsockname_error:
956   {
957     CLOSE_IF_REQUESTED (src);
958     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
959         ("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno));
960     return FALSE;
961   }
962 no_fdset:
963   {
964     CLOSE_IF_REQUESTED (src);
965     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
966         ("could not create an fdset %d: %s (%d)", ret, g_strerror (errno),
967             errno));
968     return FALSE;
969   }
970 }
971
972 static gboolean
973 gst_udpsrc_unlock (GstBaseSrc * bsrc)
974 {
975   GstUDPSrc *src;
976
977   src = GST_UDPSRC (bsrc);
978
979   GST_LOG_OBJECT (src, "Flushing");
980   gst_poll_set_flushing (src->fdset, TRUE);
981
982   return TRUE;
983 }
984
985 static gboolean
986 gst_udpsrc_unlock_stop (GstBaseSrc * bsrc)
987 {
988   GstUDPSrc *src;
989
990   src = GST_UDPSRC (bsrc);
991
992   GST_LOG_OBJECT (src, "No longer flushing");
993   gst_poll_set_flushing (src->fdset, FALSE);
994
995   return TRUE;
996 }
997
998 static gboolean
999 gst_udpsrc_stop (GstBaseSrc * bsrc)
1000 {
1001   GstUDPSrc *src;
1002
1003   src = GST_UDPSRC (bsrc);
1004
1005   GST_DEBUG ("stopping, closing sockets");
1006
1007   if (src->sock.fd >= 0) {
1008     if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
1009       GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->uri.host);
1010       gst_udp_leave_group (src->sock.fd, &src->myaddr);
1011     }
1012     CLOSE_IF_REQUESTED (src);
1013   }
1014
1015   if (src->fdset) {
1016     gst_poll_free (src->fdset);
1017     src->fdset = NULL;
1018   }
1019
1020   return TRUE;
1021 }
1022
1023 /*** GSTURIHANDLER INTERFACE *************************************************/
1024
1025 static GstURIType
1026 gst_udpsrc_uri_get_type (void)
1027 {
1028   return GST_URI_SRC;
1029 }
1030
1031 static gchar **
1032 gst_udpsrc_uri_get_protocols (void)
1033 {
1034   static gchar *protocols[] = { (char *) "udp", NULL };
1035
1036   return protocols;
1037 }
1038
1039 static const gchar *
1040 gst_udpsrc_uri_get_uri (GstURIHandler * handler)
1041 {
1042   GstUDPSrc *src = GST_UDPSRC (handler);
1043
1044   g_free (src->uristr);
1045   src->uristr = gst_udp_uri_string (&src->uri);
1046
1047   return src->uristr;
1048 }
1049
1050 static gboolean
1051 gst_udpsrc_uri_set_uri (GstURIHandler * handler, const gchar * uri)
1052 {
1053   gboolean ret;
1054
1055   GstUDPSrc *src = GST_UDPSRC (handler);
1056
1057   ret = gst_udpsrc_set_uri (src, uri);
1058
1059   return ret;
1060 }
1061
1062 static void
1063 gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data)
1064 {
1065   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1066
1067   iface->get_type = gst_udpsrc_uri_get_type;
1068   iface->get_protocols = gst_udpsrc_uri_get_protocols;
1069   iface->get_uri = gst_udpsrc_uri_get_uri;
1070   iface->set_uri = gst_udpsrc_uri_set_uri;
1071 }