udp: implement NetAddress with metadata
[platform/upstream/gst-plugins-good.git] / gst / udp / gstudpsrc.c
1 /* GStreamer
2  * Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
3  * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * SECTION:element-udpsrc
23  * @see_also: udpsink, multifdsink
24  *
25  * udpsrc is a network source that reads UDP packets from the network.
26  * It can be combined with RTP depayloaders to implement RTP streaming.
27  *
28  * The udpsrc element supports automatic port allocation by setting the
29  * #GstUDPSrc:port property to 0. After setting the udpsrc to PAUSED, the
30  * allocated port can be obtained by reading the port property.
31  *
32  * udpsrc can read from multicast groups by setting the #GstUDPSrc:multicast-group
33  * property to the IP address of the multicast group.
34  *
35  * Alternatively one can provide a custom socket to udpsrc with the #GstUDPSrc:sockfd
36  * property, udpsrc will then not allocate a socket itself but use the provided
37  * one.
38  *
39  * The #GstUDPSrc:caps property is mainly used to give a type to the UDP packet
40  * so that they can be autoplugged in GStreamer pipelines. This is very usefull
41  * for RTP implementations where the contents of the UDP packets is transfered
42  * out-of-bounds using SDP or other means.
43  *
44  * The #GstUDPSrc:buffer-size property is used to change the default kernel
45  * buffersizes used for receiving packets. The buffer size may be increased for
46  * high-volume connections, or may be decreased to limit the possible backlog of
47  * incoming data. The system places an absolute limit on these values, on Linux,
48  * for example, the default buffer size is typically 50K and can be increased to
49  * maximally 100K.
50  *
51  * The #GstUDPSrc:skip-first-bytes property is used to strip off an arbitrary
52  * number of bytes from the start of the raw udp packet and can be used to strip
53  * off proprietary header, for example.
54  *
55  * The udpsrc is always a live source. It does however not provide a #GstClock,
56  * this is left for upstream elements such as an RTP session manager or demuxer
57  * (such as an MPEG demuxer). As with all live sources, the captured buffers
58  * will have their timestamp set to the current running time of the pipeline.
59  *
60  * udpsrc implements a #GstURIHandler interface that handles udp://host:port
61  * type URIs.
62  *
63  * If the #GstUDPSrc:timeout property is set to a value bigger than 0, udpsrc
64  * will generate an element message named
65  * <classname>&quot;GstUDPSrcTimeout&quot;</classname>
66  * if no data was recieved in the given timeout.
67  * The message's structure contains one field:
68  * <itemizedlist>
69  * <listitem>
70  *   <para>
71  *   #guint64
72  *   <classname>&quot;timeout&quot;</classname>: the timeout in microseconds that
73  *   expired when waiting for data.
74  *   </para>
75  * </listitem>
76  * </itemizedlist>
77  * The message is typically used to detect that no UDP arrives in the receiver
78  * because it is blocked by a firewall.
79  * </para>
80  * <para>
81  * A custom file descriptor can be configured with the
82  * #GstUDPSrc:sockfd property. The socket will be closed when setting the
83  * element to READY by default. This behaviour can be
84  * overriden with the #GstUDPSrc:closefd property, in which case the application
85  * is responsible for closing the file descriptor.
86  *
87  * <refsect2>
88  * <title>Examples</title>
89  * |[
90  * gst-launch -v udpsrc ! fakesink dump=1
91  * ]| A pipeline to read from the default port and dump the udp packets.
92  * To actually generate udp packets on the default port one can use the
93  * udpsink element. When running the following pipeline in another terminal, the
94  * above mentioned pipeline should dump data packets to the console.
95  * |[
96  * gst-launch -v audiotestsrc ! udpsink
97  * ]|
98  * |[
99  * gst-launch -v udpsrc port=0 ! fakesink
100  * ]| read udp packets from a free port.
101  * </refsect2>
102  *
103  * Last reviewed on 2007-09-20 (0.10.7)
104  */
105 #ifdef HAVE_CONFIG_H
106 #include "config.h"
107 #endif
108
109 #include "gstudpsrc.h"
110 #ifdef HAVE_UNISTD_H
111 #include <unistd.h>
112 #endif
113 #include <stdlib.h>
114
115 #if defined _MSC_VER && (_MSC_VER >= 1400)
116 #include <io.h>
117 #endif
118
119 #include <gst/netbuffer/gstnetbuffer.h>
120
121 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
122 #include <sys/filio.h>
123 #endif
124
125 GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
126 #define GST_CAT_DEFAULT (udpsrc_debug)
127
128 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
129     GST_PAD_SRC,
130     GST_PAD_ALWAYS,
131     GST_STATIC_CAPS_ANY);
132
133 #define UDP_DEFAULT_PORT                4951
134 #define UDP_DEFAULT_MULTICAST_GROUP     "0.0.0.0"
135 #define UDP_DEFAULT_MULTICAST_IFACE     NULL
136 #define UDP_DEFAULT_URI                 "udp://"UDP_DEFAULT_MULTICAST_GROUP":"G_STRINGIFY(UDP_DEFAULT_PORT)
137 #define UDP_DEFAULT_CAPS                NULL
138 #define UDP_DEFAULT_SOCKFD              -1
139 #define UDP_DEFAULT_BUFFER_SIZE         0
140 #define UDP_DEFAULT_TIMEOUT             0
141 #define UDP_DEFAULT_SKIP_FIRST_BYTES    0
142 #define UDP_DEFAULT_CLOSEFD            TRUE
143 #define UDP_DEFAULT_SOCK                -1
144 #define UDP_DEFAULT_AUTO_MULTICAST     TRUE
145 #define UDP_DEFAULT_REUSE              TRUE
146
147 enum
148 {
149   PROP_0,
150
151   PROP_PORT,
152   PROP_MULTICAST_GROUP,
153   PROP_MULTICAST_IFACE,
154   PROP_URI,
155   PROP_CAPS,
156   PROP_SOCKFD,
157   PROP_BUFFER_SIZE,
158   PROP_TIMEOUT,
159   PROP_SKIP_FIRST_BYTES,
160   PROP_CLOSEFD,
161   PROP_SOCK,
162   PROP_AUTO_MULTICAST,
163   PROP_REUSE,
164
165   PROP_LAST
166 };
167
168 #define CLOSE_IF_REQUESTED(udpctx)                                        \
169 G_STMT_START {                                                            \
170   if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \
171     CLOSE_SOCKET(udpctx->sock.fd);                                        \
172     if (udpctx->sock.fd == udpctx->sockfd)                                \
173       udpctx->sockfd = UDP_DEFAULT_SOCKFD;                                \
174   }                                                                       \
175   udpctx->sock.fd = UDP_DEFAULT_SOCK;                                     \
176 } G_STMT_END
177
178 static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
179
180 static GstCaps *gst_udpsrc_getcaps (GstBaseSrc * src);
181
182 static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf);
183
184 static gboolean gst_udpsrc_start (GstBaseSrc * bsrc);
185
186 static gboolean gst_udpsrc_stop (GstBaseSrc * bsrc);
187
188 static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc);
189
190 static gboolean gst_udpsrc_unlock_stop (GstBaseSrc * bsrc);
191
192 static void gst_udpsrc_finalize (GObject * object);
193
194 static void gst_udpsrc_set_property (GObject * object, guint prop_id,
195     const GValue * value, GParamSpec * pspec);
196 static void gst_udpsrc_get_property (GObject * object, guint prop_id,
197     GValue * value, GParamSpec * pspec);
198
199 static void
200 _do_init (GType type)
201 {
202   static const GInterfaceInfo urihandler_info = {
203     gst_udpsrc_uri_handler_init,
204     NULL,
205     NULL
206   };
207
208   g_type_add_interface_static (type, GST_TYPE_URI_HANDLER, &urihandler_info);
209
210   GST_DEBUG_CATEGORY_INIT (udpsrc_debug, "udpsrc", 0, "UDP src");
211 }
212
213 GST_BOILERPLATE_FULL (GstUDPSrc, gst_udpsrc, GstPushSrc, GST_TYPE_PUSH_SRC,
214     _do_init);
215
216 static void
217 gst_udpsrc_base_init (gpointer g_class)
218 {
219   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
220
221   gst_element_class_add_pad_template (element_class,
222       gst_static_pad_template_get (&src_template));
223
224   gst_element_class_set_details_simple (element_class, "UDP packet receiver",
225       "Source/Network",
226       "Receive data over the network via UDP",
227       "Wim Taymans <wim@fluendo.com>, "
228       "Thijs Vermeir <thijs.vermeir@barco.com>");
229 }
230
231 static void
232 gst_udpsrc_class_init (GstUDPSrcClass * klass)
233 {
234   GObjectClass *gobject_class;
235   GstBaseSrcClass *gstbasesrc_class;
236   GstPushSrcClass *gstpushsrc_class;
237
238   gobject_class = (GObjectClass *) klass;
239   gstbasesrc_class = (GstBaseSrcClass *) klass;
240   gstpushsrc_class = (GstPushSrcClass *) klass;
241
242   gobject_class->set_property = gst_udpsrc_set_property;
243   gobject_class->get_property = gst_udpsrc_get_property;
244   gobject_class->finalize = gst_udpsrc_finalize;
245
246   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PORT,
247       g_param_spec_int ("port", "Port",
248           "The port to receive the packets from, 0=allocate", 0, G_MAXUINT16,
249           UDP_DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
250   g_object_class_install_property (gobject_class, PROP_MULTICAST_GROUP,
251       g_param_spec_string ("multicast-group", "Multicast Group",
252           "The Address of multicast group to join", UDP_DEFAULT_MULTICAST_GROUP,
253           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
254   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
255       g_param_spec_string ("multicast-iface", "Multicast Interface",
256           "The network interface on which to join the multicast group",
257           UDP_DEFAULT_MULTICAST_IFACE,
258           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
259   g_object_class_install_property (gobject_class, PROP_URI,
260       g_param_spec_string ("uri", "URI",
261           "URI in the form of udp://multicast_group:port", UDP_DEFAULT_URI,
262           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
263   g_object_class_install_property (gobject_class, PROP_CAPS,
264       g_param_spec_boxed ("caps", "Caps",
265           "The caps of the source pad", GST_TYPE_CAPS,
266           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
267   g_object_class_install_property (gobject_class, PROP_SOCKFD,
268       g_param_spec_int ("sockfd", "Socket Handle",
269           "Socket to use for UDP reception. (-1 == allocate)",
270           -1, G_MAXINT, UDP_DEFAULT_SOCKFD,
271           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
272   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
273       g_param_spec_int ("buffer-size", "Buffer Size",
274           "Size of the kernel receive buffer in bytes, 0=default", 0, G_MAXINT,
275           UDP_DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
276   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
277       g_param_spec_uint64 ("timeout", "Timeout",
278           "Post a message after timeout microseconds (0 = disabled)", 0,
279           G_MAXUINT64, UDP_DEFAULT_TIMEOUT,
280           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
281   g_object_class_install_property (G_OBJECT_CLASS (klass),
282       PROP_SKIP_FIRST_BYTES, g_param_spec_int ("skip-first-bytes",
283           "Skip first bytes", "number of bytes to skip for each udp packet", 0,
284           G_MAXINT, UDP_DEFAULT_SKIP_FIRST_BYTES,
285           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
286   g_object_class_install_property (gobject_class, PROP_CLOSEFD,
287       g_param_spec_boolean ("closefd", "Close sockfd",
288           "Close sockfd if passed as property on state change",
289           UDP_DEFAULT_CLOSEFD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
290   g_object_class_install_property (gobject_class, PROP_SOCK,
291       g_param_spec_int ("sock", "Socket Handle",
292           "Socket currently in use for UDP reception. (-1 = no socket)",
293           -1, G_MAXINT, UDP_DEFAULT_SOCK,
294           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
295   g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
296       g_param_spec_boolean ("auto-multicast", "Auto Multicast",
297           "Automatically join/leave multicast groups",
298           UDP_DEFAULT_AUTO_MULTICAST,
299           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
300   g_object_class_install_property (gobject_class, PROP_REUSE,
301       g_param_spec_boolean ("reuse", "Reuse", "Enable reuse of the port",
302           UDP_DEFAULT_REUSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
303
304   gstbasesrc_class->start = gst_udpsrc_start;
305   gstbasesrc_class->stop = gst_udpsrc_stop;
306   gstbasesrc_class->unlock = gst_udpsrc_unlock;
307   gstbasesrc_class->unlock_stop = gst_udpsrc_unlock_stop;
308   gstbasesrc_class->get_caps = gst_udpsrc_getcaps;
309
310   gstpushsrc_class->create = gst_udpsrc_create;
311 }
312
313 static void
314 gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
315 {
316   WSA_STARTUP (udpsrc);
317
318   gst_udp_uri_init (&udpsrc->uri, UDP_DEFAULT_MULTICAST_GROUP,
319       UDP_DEFAULT_PORT);
320
321   udpsrc->sockfd = UDP_DEFAULT_SOCKFD;
322   udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
323   udpsrc->buffer_size = UDP_DEFAULT_BUFFER_SIZE;
324   udpsrc->timeout = UDP_DEFAULT_TIMEOUT;
325   udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
326   udpsrc->closefd = UDP_DEFAULT_CLOSEFD;
327   udpsrc->externalfd = (udpsrc->sockfd != -1);
328   udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
329   udpsrc->sock.fd = UDP_DEFAULT_SOCK;
330   udpsrc->reuse = UDP_DEFAULT_REUSE;
331
332   /* configure basesrc to be a live source */
333   gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
334   /* make basesrc output a segment in time */
335   gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
336   /* make basesrc set timestamps on outgoing buffers based on the running_time
337    * when they were captured */
338   gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
339 }
340
341 static void
342 gst_udpsrc_finalize (GObject * object)
343 {
344   GstUDPSrc *udpsrc;
345
346   udpsrc = GST_UDPSRC (object);
347
348   if (udpsrc->caps)
349     gst_caps_unref (udpsrc->caps);
350
351   g_free (udpsrc->multi_iface);
352
353   gst_udp_uri_free (&udpsrc->uri);
354   g_free (udpsrc->uristr);
355
356   if (udpsrc->sockfd >= 0 && udpsrc->closefd)
357     CLOSE_SOCKET (udpsrc->sockfd);
358
359   WSA_CLEANUP (object);
360
361   G_OBJECT_CLASS (parent_class)->finalize (object);
362 }
363
364 static GstCaps *
365 gst_udpsrc_getcaps (GstBaseSrc * src)
366 {
367   GstUDPSrc *udpsrc;
368
369   udpsrc = GST_UDPSRC (src);
370
371   if (udpsrc->caps)
372     return gst_caps_ref (udpsrc->caps);
373   else
374     return gst_caps_new_any ();
375 }
376
377 /* read a message from the error queue */
378 static void
379 clear_error (GstUDPSrc * udpsrc)
380 {
381 #if defined (MSG_ERRQUEUE)
382   struct msghdr cmsg;
383   char cbuf[128];
384   char msgbuf[CMSG_SPACE (128)];
385   struct iovec iov;
386
387   /* Flush ERRORS from fd so next poll will not return at once */
388   /* No need for address : We look for local error */
389   cmsg.msg_name = NULL;
390   cmsg.msg_namelen = 0;
391
392   /* IOV */
393   memset (&cbuf, 0, sizeof (cbuf));
394   iov.iov_base = cbuf;
395   iov.iov_len = sizeof (cbuf);
396   cmsg.msg_iov = &iov;
397   cmsg.msg_iovlen = 1;
398
399   /* msg_control */
400   memset (&msgbuf, 0, sizeof (msgbuf));
401   cmsg.msg_control = &msgbuf;
402   cmsg.msg_controllen = sizeof (msgbuf);
403
404   recvmsg (udpsrc->sock.fd, &cmsg, MSG_ERRQUEUE);
405 #endif
406 }
407
408 static GstFlowReturn
409 gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
410 {
411   GstUDPSrc *udpsrc;
412   GstMetaNetAddress *meta;
413   GstBuffer *outbuf;
414   union gst_sockaddr
415   {
416     struct sockaddr sa;
417     struct sockaddr_in sa_in;
418     struct sockaddr_in6 sa_in6;
419     struct sockaddr_storage sa_stor;
420   } sa;
421   socklen_t slen;
422   guint8 *pktdata;
423   gint pktsize;
424 #ifdef G_OS_UNIX
425   gint readsize;
426 #elif defined G_OS_WIN32
427   gulong readsize;
428 #endif
429   GstClockTime timeout;
430   gint ret;
431   gboolean try_again;
432
433   udpsrc = GST_UDPSRC_CAST (psrc);
434
435 retry:
436   /* quick check, avoid going in select when we already have data */
437   readsize = 0;
438   if (G_UNLIKELY ((ret =
439               IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
440     goto ioctl_failed;
441
442   if (readsize > 0)
443     goto no_select;
444
445   if (udpsrc->timeout > 0) {
446     timeout = udpsrc->timeout * GST_USECOND;
447   } else {
448     timeout = GST_CLOCK_TIME_NONE;
449   }
450
451   do {
452     try_again = FALSE;
453
454     GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GUINT64_FORMAT,
455         udpsrc->timeout);
456
457     ret = gst_poll_wait (udpsrc->fdset, timeout);
458     GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
459     if (G_UNLIKELY (ret < 0)) {
460       if (errno == EBUSY)
461         goto stopped;
462 #ifdef G_OS_WIN32
463       if (WSAGetLastError () != WSAEINTR)
464         goto select_error;
465 #else
466       if (errno != EAGAIN && errno != EINTR)
467         goto select_error;
468 #endif
469       try_again = TRUE;
470     } else if (G_UNLIKELY (ret == 0)) {
471       /* timeout, post element message */
472       gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
473           gst_message_new_element (GST_OBJECT_CAST (udpsrc),
474               gst_structure_new ("GstUDPSrcTimeout",
475                   "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
476       try_again = TRUE;
477     }
478   } while (G_UNLIKELY (try_again));
479
480   /* ask how much is available for reading on the socket, this should be exactly
481    * one UDP packet. We will check the return value, though, because in some
482    * case it can return 0 and we don't want a 0 sized buffer. */
483   readsize = 0;
484   if (G_UNLIKELY ((ret =
485               IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
486     goto ioctl_failed;
487
488   /* if we get here and there is nothing to read from the socket, the select got
489    * woken up by activity on the socket but it was not a read. We know someone
490    * will also do something with the socket so that we don't go into an infinite
491    * loop in the select(). */
492   if (G_UNLIKELY (!readsize)) {
493     clear_error (udpsrc);
494     goto retry;
495   }
496
497 no_select:
498   GST_LOG_OBJECT (udpsrc, "ioctl says %d bytes available", (int) readsize);
499
500   pktdata = g_malloc (readsize);
501   pktsize = readsize;
502
503   while (TRUE) {
504     slen = sizeof (sa);
505 #ifdef G_OS_WIN32
506     ret = recvfrom (udpsrc->sock.fd, (char *) pktdata, pktsize, 0, &sa.sa,
507         &slen);
508 #else
509     ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize, 0, &sa.sa, &slen);
510 #endif
511     if (G_UNLIKELY (ret < 0)) {
512 #ifdef G_OS_WIN32
513       /* WSAECONNRESET for a UDP socket means that a packet sent with udpsink
514        * generated a "port unreachable" ICMP response. We ignore that and try
515        * again. */
516       if (WSAGetLastError () == WSAECONNRESET) {
517         g_free (pktdata);
518         pktdata = NULL;
519         goto retry;
520       }
521       if (WSAGetLastError () != WSAEINTR)
522         goto receive_error;
523 #else
524       if (errno != EAGAIN && errno != EINTR)
525         goto receive_error;
526 #endif
527     } else
528       break;
529   }
530
531   outbuf = gst_buffer_new ();
532   GST_BUFFER_MALLOCDATA (outbuf) = pktdata;
533
534   /* patch pktdata and len when stripping off the headers */
535   if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) {
536     if (G_UNLIKELY (readsize <= udpsrc->skip_first_bytes))
537       goto skip_error;
538
539     pktdata += udpsrc->skip_first_bytes;
540     ret -= udpsrc->skip_first_bytes;
541   }
542   GST_BUFFER_DATA (outbuf) = pktdata;
543   GST_BUFFER_SIZE (outbuf) = ret;
544
545   /* use buffer metadata so receivers can also track the address */
546   meta = gst_buffer_add_meta_net_address (outbuf);
547
548   switch (sa.sa.sa_family) {
549     case AF_INET:
550     {
551       gst_netaddress_set_ip4_address (&meta->naddr, sa.sa_in.sin_addr.s_addr,
552           sa.sa_in.sin_port);
553     }
554       break;
555     case AF_INET6:
556     {
557       guint8 ip6[16];
558
559       memcpy (ip6, &sa.sa_in6.sin6_addr, sizeof (ip6));
560       gst_netaddress_set_ip6_address (&meta->naddr, ip6, sa.sa_in6.sin6_port);
561     }
562       break;
563     default:
564 #ifdef G_OS_WIN32
565       WSASetLastError (WSAEAFNOSUPPORT);
566 #else
567       errno = EAFNOSUPPORT;
568 #endif
569       goto receive_error;
570   }
571   GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
572
573   *buf = GST_BUFFER_CAST (outbuf);
574
575   return GST_FLOW_OK;
576
577   /* ERRORS */
578 select_error:
579   {
580     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
581         ("select error %d: %s (%d)", ret, g_strerror (errno), errno));
582     return GST_FLOW_ERROR;
583   }
584 stopped:
585   {
586     GST_DEBUG ("stop called");
587     return GST_FLOW_WRONG_STATE;
588   }
589 ioctl_failed:
590   {
591     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
592         ("ioctl failed %d: %s (%d)", ret, g_strerror (errno), errno));
593     return GST_FLOW_ERROR;
594   }
595 receive_error:
596   {
597     g_free (pktdata);
598 #ifdef G_OS_WIN32
599     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
600         ("receive error %d (WSA error: %d)", ret, WSAGetLastError ()));
601 #else
602     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
603         ("receive error %d: %s (%d)", ret, g_strerror (errno), errno));
604 #endif
605     return GST_FLOW_ERROR;
606   }
607 skip_error:
608   {
609     GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL),
610         ("UDP buffer to small to skip header"));
611     return GST_FLOW_ERROR;
612   }
613 }
614
615 static gboolean
616 gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri)
617 {
618   if (gst_udp_parse_uri (uri, &src->uri) < 0)
619     goto wrong_uri;
620
621   if (src->uri.port == -1)
622     src->uri.port = UDP_DEFAULT_PORT;
623
624   return TRUE;
625
626   /* ERRORS */
627 wrong_uri:
628   {
629     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
630         ("error parsing uri %s", uri));
631     return FALSE;
632   }
633 }
634
635 static void
636 gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
637     GParamSpec * pspec)
638 {
639   GstUDPSrc *udpsrc = GST_UDPSRC (object);
640
641   switch (prop_id) {
642     case PROP_BUFFER_SIZE:
643       udpsrc->buffer_size = g_value_get_int (value);
644       break;
645     case PROP_PORT:
646       gst_udp_uri_update (&udpsrc->uri, NULL, g_value_get_int (value));
647       break;
648     case PROP_MULTICAST_GROUP:
649     {
650       const gchar *group;
651
652       if ((group = g_value_get_string (value)))
653         gst_udp_uri_update (&udpsrc->uri, group, -1);
654       else
655         gst_udp_uri_update (&udpsrc->uri, UDP_DEFAULT_MULTICAST_GROUP, -1);
656       break;
657     }
658     case PROP_MULTICAST_IFACE:
659       g_free (udpsrc->multi_iface);
660
661       if (g_value_get_string (value) == NULL)
662         udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
663       else
664         udpsrc->multi_iface = g_value_dup_string (value);
665       break;
666     case PROP_URI:
667       gst_udpsrc_set_uri (udpsrc, g_value_get_string (value));
668       break;
669     case PROP_CAPS:
670     {
671       const GstCaps *new_caps_val = gst_value_get_caps (value);
672
673       GstCaps *new_caps;
674
675       GstCaps *old_caps;
676
677       if (new_caps_val == NULL) {
678         new_caps = gst_caps_new_any ();
679       } else {
680         new_caps = gst_caps_copy (new_caps_val);
681       }
682
683       old_caps = udpsrc->caps;
684       udpsrc->caps = new_caps;
685       if (old_caps)
686         gst_caps_unref (old_caps);
687       gst_pad_set_caps (GST_BASE_SRC (udpsrc)->srcpad, new_caps);
688       break;
689     }
690     case PROP_SOCKFD:
691       if (udpsrc->sockfd >= 0 && udpsrc->sockfd != udpsrc->sock.fd &&
692           udpsrc->closefd)
693         CLOSE_SOCKET (udpsrc->sockfd);
694       udpsrc->sockfd = g_value_get_int (value);
695       GST_DEBUG ("setting SOCKFD to %d", udpsrc->sockfd);
696       break;
697     case PROP_TIMEOUT:
698       udpsrc->timeout = g_value_get_uint64 (value);
699       break;
700     case PROP_SKIP_FIRST_BYTES:
701       udpsrc->skip_first_bytes = g_value_get_int (value);
702       break;
703     case PROP_CLOSEFD:
704       udpsrc->closefd = g_value_get_boolean (value);
705       break;
706     case PROP_AUTO_MULTICAST:
707       udpsrc->auto_multicast = g_value_get_boolean (value);
708       break;
709     case PROP_REUSE:
710       udpsrc->reuse = g_value_get_boolean (value);
711       break;
712     default:
713       break;
714   }
715 }
716
717 static void
718 gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
719     GParamSpec * pspec)
720 {
721   GstUDPSrc *udpsrc = GST_UDPSRC (object);
722
723   switch (prop_id) {
724     case PROP_BUFFER_SIZE:
725       g_value_set_int (value, udpsrc->buffer_size);
726       break;
727     case PROP_PORT:
728       g_value_set_int (value, udpsrc->uri.port);
729       break;
730     case PROP_MULTICAST_GROUP:
731       g_value_set_string (value, udpsrc->uri.host);
732       break;
733     case PROP_MULTICAST_IFACE:
734       g_value_set_string (value, udpsrc->multi_iface);
735       break;
736     case PROP_URI:
737       g_value_take_string (value, gst_udp_uri_string (&udpsrc->uri));
738       break;
739     case PROP_CAPS:
740       gst_value_set_caps (value, udpsrc->caps);
741       break;
742     case PROP_SOCKFD:
743       g_value_set_int (value, udpsrc->sockfd);
744       break;
745     case PROP_TIMEOUT:
746       g_value_set_uint64 (value, udpsrc->timeout);
747       break;
748     case PROP_SKIP_FIRST_BYTES:
749       g_value_set_int (value, udpsrc->skip_first_bytes);
750       break;
751     case PROP_CLOSEFD:
752       g_value_set_boolean (value, udpsrc->closefd);
753       break;
754     case PROP_SOCK:
755       g_value_set_int (value, udpsrc->sock.fd);
756       break;
757     case PROP_AUTO_MULTICAST:
758       g_value_set_boolean (value, udpsrc->auto_multicast);
759       break;
760     case PROP_REUSE:
761       g_value_set_boolean (value, udpsrc->reuse);
762       break;
763     default:
764       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
765       break;
766   }
767 }
768
769 /* create a socket for sending to remote machine */
770 static gboolean
771 gst_udpsrc_start (GstBaseSrc * bsrc)
772 {
773   guint bc_val;
774   guint err_val;
775   gint reuse;
776   int port;
777   GstUDPSrc *src;
778   gint ret;
779   int rcvsize;
780   struct sockaddr_storage bind_address;
781   socklen_t len;
782   src = GST_UDPSRC (bsrc);
783
784   if (src->sockfd == -1) {
785     /* need to allocate a socket */
786     GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->uri.host,
787         src->uri.port);
788     if ((ret =
789             gst_udp_get_addr (src->uri.host, src->uri.port, &src->myaddr)) < 0)
790       goto getaddrinfo_error;
791
792     if ((ret = socket (src->myaddr.ss_family, SOCK_DGRAM, IPPROTO_UDP)) < 0)
793       goto no_socket;
794
795     src->sock.fd = ret;
796     src->externalfd = FALSE;
797
798     GST_DEBUG_OBJECT (src, "got socket %d", src->sock.fd);
799
800     GST_DEBUG_OBJECT (src, "setting reuse %d", src->reuse);
801     reuse = src->reuse ? 1 : 0;
802     if ((ret =
803             setsockopt (src->sock.fd, SOL_SOCKET, SO_REUSEADDR, &reuse,
804                 sizeof (reuse))) < 0)
805       goto setsockopt_error;
806
807     GST_DEBUG_OBJECT (src, "binding on port %d", src->uri.port);
808
809     /* Take a temporary copy of the address in case we need to fix it for bind */
810     memcpy (&bind_address, &src->myaddr, sizeof (struct sockaddr_storage));
811
812 #ifdef G_OS_WIN32
813     /* Windows does not allow binding to a multicast group so fix source address */
814     if (gst_udp_is_multicast (&src->myaddr)) {
815       switch (((struct sockaddr *) &bind_address)->sa_family) {
816         case AF_INET:
817           ((struct sockaddr_in *) &bind_address)->sin_addr.s_addr =
818               htonl (INADDR_ANY);
819           break;
820         case AF_INET6:
821           ((struct sockaddr_in6 *) &bind_address)->sin6_addr = in6addr_any;
822           break;
823         default:
824           break;
825       }
826     }
827 #endif
828
829     len = gst_udp_get_sockaddr_length (&bind_address);
830     if ((ret = bind (src->sock.fd, (struct sockaddr *) &bind_address, len)) < 0)
831       goto bind_error;
832
833     if (!gst_udp_is_multicast (&src->myaddr)) {
834       len = sizeof (src->myaddr);
835       if ((ret = getsockname (src->sock.fd, (struct sockaddr *) &src->myaddr,
836                   &len)) < 0)
837         goto getsockname_error;
838     }
839   } else {
840     GST_DEBUG_OBJECT (src, "using provided socket %d", src->sockfd);
841     /* we use the configured socket, try to get some info about it */
842     len = sizeof (src->myaddr);
843     if ((ret =
844             getsockname (src->sockfd, (struct sockaddr *) &src->myaddr,
845                 &len)) < 0)
846       goto getsockname_error;
847
848     src->sock.fd = src->sockfd;
849     src->externalfd = TRUE;
850   }
851
852   len = sizeof (rcvsize);
853   if (src->buffer_size != 0) {
854     rcvsize = src->buffer_size;
855
856     GST_DEBUG_OBJECT (src, "setting udp buffer of %d bytes", rcvsize);
857     /* set buffer size, Note that on Linux this is typically limited to a
858      * maximum of around 100K. Also a minimum of 128 bytes is required on
859      * Linux. */
860     ret =
861         setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize,
862         len);
863     if (ret != 0) {
864       GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
865           ("Could not create a buffer of requested %d bytes, %d: %s (%d)",
866               rcvsize, ret, g_strerror (errno), errno));
867     }
868   }
869
870   /* read the value of the receive buffer. Note that on linux this returns 2x the
871    * value we set because the kernel allocates extra memory for metadata.
872    * The default on Linux is about 100K (which is about 50K without metadata) */
873   ret =
874       getsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, &len);
875   if (ret == 0)
876     GST_DEBUG_OBJECT (src, "have udp buffer of %d bytes", rcvsize);
877   else
878     GST_DEBUG_OBJECT (src, "could not get udp buffer size");
879
880   bc_val = 1;
881   if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val,
882               sizeof (bc_val))) < 0) {
883     GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
884         ("could not configure socket for broadcast %d: %s (%d)", ret,
885             g_strerror (errno), errno));
886   }
887
888   /* Accept ERRQUEUE to get and flush icmp errors */
889   err_val = 1;
890 #if defined (IP_RECVERR)
891   if ((ret = setsockopt (src->sock.fd, IPPROTO_IP, IP_RECVERR, &err_val,
892               sizeof (err_val))) < 0) {
893     GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
894         ("could not configure socket for IP_RECVERR %d: %s (%d)", ret,
895             g_strerror (errno), errno));
896   }
897 #endif
898
899   if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
900     GST_DEBUG_OBJECT (src, "joining multicast group %s", src->uri.host);
901     ret = gst_udp_join_group (src->sock.fd, &src->myaddr, src->multi_iface);
902     if (ret < 0)
903       goto membership;
904   }
905
906   /* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port
907    * follows ss_family on both */
908   port = g_ntohs (((struct sockaddr_in *) &src->myaddr)->sin_port);
909   GST_DEBUG_OBJECT (src, "bound, on port %d", port);
910   if (port != src->uri.port) {
911     src->uri.port = port;
912     GST_DEBUG_OBJECT (src, "notifying port %d", port);
913     g_object_notify (G_OBJECT (src), "port");
914   }
915
916   if ((src->fdset = gst_poll_new (TRUE)) == NULL)
917     goto no_fdset;
918
919   gst_poll_add_fd (src->fdset, &src->sock);
920   gst_poll_fd_ctl_read (src->fdset, &src->sock, TRUE);
921
922   return TRUE;
923
924   /* ERRORS */
925 getaddrinfo_error:
926   {
927     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
928         ("getaddrinfo failed: %s (%d)", gai_strerror (ret), ret));
929     return FALSE;
930   }
931 no_socket:
932   {
933     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
934         ("no socket error %d: %s (%d)", ret, g_strerror (errno), errno));
935     return FALSE;
936   }
937 setsockopt_error:
938   {
939     CLOSE_IF_REQUESTED (src);
940     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
941         ("setsockopt failed %d: %s (%d)", ret, g_strerror (errno), errno));
942     return FALSE;
943   }
944 bind_error:
945   {
946     CLOSE_IF_REQUESTED (src);
947     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
948         ("bind failed %d: %s (%d)", ret, g_strerror (errno), errno));
949     return FALSE;
950   }
951 membership:
952   {
953     CLOSE_IF_REQUESTED (src);
954     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
955         ("could add membership %d: %s (%d)", ret, g_strerror (errno), errno));
956     return FALSE;
957   }
958 getsockname_error:
959   {
960     CLOSE_IF_REQUESTED (src);
961     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
962         ("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno));
963     return FALSE;
964   }
965 no_fdset:
966   {
967     CLOSE_IF_REQUESTED (src);
968     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
969         ("could not create an fdset %d: %s (%d)", ret, g_strerror (errno),
970             errno));
971     return FALSE;
972   }
973 }
974
975 static gboolean
976 gst_udpsrc_unlock (GstBaseSrc * bsrc)
977 {
978   GstUDPSrc *src;
979
980   src = GST_UDPSRC (bsrc);
981
982   GST_LOG_OBJECT (src, "Flushing");
983   gst_poll_set_flushing (src->fdset, TRUE);
984
985   return TRUE;
986 }
987
988 static gboolean
989 gst_udpsrc_unlock_stop (GstBaseSrc * bsrc)
990 {
991   GstUDPSrc *src;
992
993   src = GST_UDPSRC (bsrc);
994
995   GST_LOG_OBJECT (src, "No longer flushing");
996   gst_poll_set_flushing (src->fdset, FALSE);
997
998   return TRUE;
999 }
1000
1001 static gboolean
1002 gst_udpsrc_stop (GstBaseSrc * bsrc)
1003 {
1004   GstUDPSrc *src;
1005
1006   src = GST_UDPSRC (bsrc);
1007
1008   GST_DEBUG ("stopping, closing sockets");
1009
1010   if (src->sock.fd >= 0) {
1011     if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
1012       GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->uri.host);
1013       gst_udp_leave_group (src->sock.fd, &src->myaddr);
1014     }
1015     CLOSE_IF_REQUESTED (src);
1016   }
1017
1018   if (src->fdset) {
1019     gst_poll_free (src->fdset);
1020     src->fdset = NULL;
1021   }
1022
1023   return TRUE;
1024 }
1025
1026 /*** GSTURIHANDLER INTERFACE *************************************************/
1027
1028 static GstURIType
1029 gst_udpsrc_uri_get_type (void)
1030 {
1031   return GST_URI_SRC;
1032 }
1033
1034 static gchar **
1035 gst_udpsrc_uri_get_protocols (void)
1036 {
1037   static gchar *protocols[] = { (char *) "udp", NULL };
1038
1039   return protocols;
1040 }
1041
1042 static const gchar *
1043 gst_udpsrc_uri_get_uri (GstURIHandler * handler)
1044 {
1045   GstUDPSrc *src = GST_UDPSRC (handler);
1046
1047   g_free (src->uristr);
1048   src->uristr = gst_udp_uri_string (&src->uri);
1049
1050   return src->uristr;
1051 }
1052
1053 static gboolean
1054 gst_udpsrc_uri_set_uri (GstURIHandler * handler, const gchar * uri)
1055 {
1056   gboolean ret;
1057
1058   GstUDPSrc *src = GST_UDPSRC (handler);
1059
1060   ret = gst_udpsrc_set_uri (src, uri);
1061
1062   return ret;
1063 }
1064
1065 static void
1066 gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data)
1067 {
1068   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1069
1070   iface->get_type = gst_udpsrc_uri_get_type;
1071   iface->get_protocols = gst_udpsrc_uri_get_protocols;
1072   iface->get_uri = gst_udpsrc_uri_get_uri;
1073   iface->set_uri = gst_udpsrc_uri_set_uri;
1074 }