use G_DEFINE_TYPE some more
[platform/upstream/gstreamer.git] / gst / udp / gstudpsrc.c
1 /* GStreamer
2  * Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
3  * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * SECTION:element-udpsrc
23  * @see_also: udpsink, multifdsink
24  *
25  * udpsrc is a network source that reads UDP packets from the network.
26  * It can be combined with RTP depayloaders to implement RTP streaming.
27  *
28  * The udpsrc element supports automatic port allocation by setting the
29  * #GstUDPSrc:port property to 0. After setting the udpsrc to PAUSED, the
30  * allocated port can be obtained by reading the port property.
31  *
32  * udpsrc can read from multicast groups by setting the #GstUDPSrc:multicast-group
33  * property to the IP address of the multicast group.
34  *
35  * Alternatively one can provide a custom socket to udpsrc with the #GstUDPSrc:sockfd
36  * property, udpsrc will then not allocate a socket itself but use the provided
37  * one.
38  *
39  * The #GstUDPSrc:caps property is mainly used to give a type to the UDP packet
40  * so that they can be autoplugged in GStreamer pipelines. This is very usefull
41  * for RTP implementations where the contents of the UDP packets is transfered
42  * out-of-bounds using SDP or other means.
43  *
44  * The #GstUDPSrc:buffer-size property is used to change the default kernel
45  * buffersizes used for receiving packets. The buffer size may be increased for
46  * high-volume connections, or may be decreased to limit the possible backlog of
47  * incoming data. The system places an absolute limit on these values, on Linux,
48  * for example, the default buffer size is typically 50K and can be increased to
49  * maximally 100K.
50  *
51  * The #GstUDPSrc:skip-first-bytes property is used to strip off an arbitrary
52  * number of bytes from the start of the raw udp packet and can be used to strip
53  * off proprietary header, for example.
54  *
55  * The udpsrc is always a live source. It does however not provide a #GstClock,
56  * this is left for upstream elements such as an RTP session manager or demuxer
57  * (such as an MPEG demuxer). As with all live sources, the captured buffers
58  * will have their timestamp set to the current running time of the pipeline.
59  *
60  * udpsrc implements a #GstURIHandler interface that handles udp://host:port
61  * type URIs.
62  *
63  * If the #GstUDPSrc:timeout property is set to a value bigger than 0, udpsrc
64  * will generate an element message named
65  * <classname>&quot;GstUDPSrcTimeout&quot;</classname>
66  * if no data was recieved in the given timeout.
67  * The message's structure contains one field:
68  * <itemizedlist>
69  * <listitem>
70  *   <para>
71  *   #guint64
72  *   <classname>&quot;timeout&quot;</classname>: the timeout in microseconds that
73  *   expired when waiting for data.
74  *   </para>
75  * </listitem>
76  * </itemizedlist>
77  * The message is typically used to detect that no UDP arrives in the receiver
78  * because it is blocked by a firewall.
79  * </para>
80  * <para>
81  * A custom file descriptor can be configured with the
82  * #GstUDPSrc:sockfd property. The socket will be closed when setting the
83  * element to READY by default. This behaviour can be
84  * overriden with the #GstUDPSrc:closefd property, in which case the application
85  * is responsible for closing the file descriptor.
86  *
87  * <refsect2>
88  * <title>Examples</title>
89  * |[
90  * gst-launch -v udpsrc ! fakesink dump=1
91  * ]| A pipeline to read from the default port and dump the udp packets.
92  * To actually generate udp packets on the default port one can use the
93  * udpsink element. When running the following pipeline in another terminal, the
94  * above mentioned pipeline should dump data packets to the console.
95  * |[
96  * gst-launch -v audiotestsrc ! udpsink
97  * ]|
98  * |[
99  * gst-launch -v udpsrc port=0 ! fakesink
100  * ]| read udp packets from a free port.
101  * </refsect2>
102  *
103  * Last reviewed on 2007-09-20 (0.10.7)
104  */
105 #ifdef HAVE_CONFIG_H
106 #include "config.h"
107 #endif
108
109 #include "gstudpsrc.h"
110 #ifdef HAVE_UNISTD_H
111 #include <unistd.h>
112 #endif
113 #include <stdlib.h>
114
115 #if defined _MSC_VER && (_MSC_VER >= 1400)
116 #include <io.h>
117 #endif
118
119 #include <gst/netbuffer/gstnetbuffer.h>
120
121 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
122 #include <sys/filio.h>
123 #endif
124
125 GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
126 #define GST_CAT_DEFAULT (udpsrc_debug)
127
128 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
129     GST_PAD_SRC,
130     GST_PAD_ALWAYS,
131     GST_STATIC_CAPS_ANY);
132
133 #define UDP_DEFAULT_PORT                4951
134 #define UDP_DEFAULT_MULTICAST_GROUP     "0.0.0.0"
135 #define UDP_DEFAULT_MULTICAST_IFACE     NULL
136 #define UDP_DEFAULT_URI                 "udp://"UDP_DEFAULT_MULTICAST_GROUP":"G_STRINGIFY(UDP_DEFAULT_PORT)
137 #define UDP_DEFAULT_CAPS                NULL
138 #define UDP_DEFAULT_SOCKFD              -1
139 #define UDP_DEFAULT_BUFFER_SIZE         0
140 #define UDP_DEFAULT_TIMEOUT             0
141 #define UDP_DEFAULT_SKIP_FIRST_BYTES    0
142 #define UDP_DEFAULT_CLOSEFD            TRUE
143 #define UDP_DEFAULT_SOCK                -1
144 #define UDP_DEFAULT_AUTO_MULTICAST     TRUE
145 #define UDP_DEFAULT_REUSE              TRUE
146
147 enum
148 {
149   PROP_0,
150
151   PROP_PORT,
152   PROP_MULTICAST_GROUP,
153   PROP_MULTICAST_IFACE,
154   PROP_URI,
155   PROP_CAPS,
156   PROP_SOCKFD,
157   PROP_BUFFER_SIZE,
158   PROP_TIMEOUT,
159   PROP_SKIP_FIRST_BYTES,
160   PROP_CLOSEFD,
161   PROP_SOCK,
162   PROP_AUTO_MULTICAST,
163   PROP_REUSE,
164
165   PROP_LAST
166 };
167
168 #define CLOSE_IF_REQUESTED(udpctx)                                        \
169 G_STMT_START {                                                            \
170   if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \
171     CLOSE_SOCKET(udpctx->sock.fd);                                        \
172     if (udpctx->sock.fd == udpctx->sockfd)                                \
173       udpctx->sockfd = UDP_DEFAULT_SOCKFD;                                \
174   }                                                                       \
175   udpctx->sock.fd = UDP_DEFAULT_SOCK;                                     \
176 } G_STMT_END
177
178 static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
179
180 static GstCaps *gst_udpsrc_getcaps (GstBaseSrc * src);
181
182 static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf);
183
184 static gboolean gst_udpsrc_start (GstBaseSrc * bsrc);
185
186 static gboolean gst_udpsrc_stop (GstBaseSrc * bsrc);
187
188 static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc);
189
190 static gboolean gst_udpsrc_unlock_stop (GstBaseSrc * bsrc);
191
192 static void gst_udpsrc_finalize (GObject * object);
193
194 static void gst_udpsrc_set_property (GObject * object, guint prop_id,
195     const GValue * value, GParamSpec * pspec);
196 static void gst_udpsrc_get_property (GObject * object, guint prop_id,
197     GValue * value, GParamSpec * pspec);
198
199 #define gst_udpsrc_parent_class parent_class
200 G_DEFINE_TYPE_WITH_CODE (GstUDPSrc, gst_udpsrc, GST_TYPE_PUSH_SRC,
201     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_udpsrc_uri_handler_init));
202
203 static void
204 gst_udpsrc_class_init (GstUDPSrcClass * klass)
205 {
206   GObjectClass *gobject_class;
207   GstElementClass *gstelement_class;
208   GstBaseSrcClass *gstbasesrc_class;
209   GstPushSrcClass *gstpushsrc_class;
210
211   gobject_class = (GObjectClass *) klass;
212   gstelement_class = (GstElementClass *) klass;
213   gstbasesrc_class = (GstBaseSrcClass *) klass;
214   gstpushsrc_class = (GstPushSrcClass *) klass;
215
216   GST_DEBUG_CATEGORY_INIT (udpsrc_debug, "udpsrc", 0, "UDP src");
217
218   gobject_class->set_property = gst_udpsrc_set_property;
219   gobject_class->get_property = gst_udpsrc_get_property;
220   gobject_class->finalize = gst_udpsrc_finalize;
221
222   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PORT,
223       g_param_spec_int ("port", "Port",
224           "The port to receive the packets from, 0=allocate", 0, G_MAXUINT16,
225           UDP_DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226   g_object_class_install_property (gobject_class, PROP_MULTICAST_GROUP,
227       g_param_spec_string ("multicast-group", "Multicast Group",
228           "The Address of multicast group to join", UDP_DEFAULT_MULTICAST_GROUP,
229           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
230   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
231       g_param_spec_string ("multicast-iface", "Multicast Interface",
232           "The network interface on which to join the multicast group",
233           UDP_DEFAULT_MULTICAST_IFACE,
234           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
235   g_object_class_install_property (gobject_class, PROP_URI,
236       g_param_spec_string ("uri", "URI",
237           "URI in the form of udp://multicast_group:port", UDP_DEFAULT_URI,
238           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
239   g_object_class_install_property (gobject_class, PROP_CAPS,
240       g_param_spec_boxed ("caps", "Caps",
241           "The caps of the source pad", GST_TYPE_CAPS,
242           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
243   g_object_class_install_property (gobject_class, PROP_SOCKFD,
244       g_param_spec_int ("sockfd", "Socket Handle",
245           "Socket to use for UDP reception. (-1 == allocate)",
246           -1, G_MAXINT, UDP_DEFAULT_SOCKFD,
247           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
248   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
249       g_param_spec_int ("buffer-size", "Buffer Size",
250           "Size of the kernel receive buffer in bytes, 0=default", 0, G_MAXINT,
251           UDP_DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
252   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
253       g_param_spec_uint64 ("timeout", "Timeout",
254           "Post a message after timeout microseconds (0 = disabled)", 0,
255           G_MAXUINT64, UDP_DEFAULT_TIMEOUT,
256           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
257   g_object_class_install_property (G_OBJECT_CLASS (klass),
258       PROP_SKIP_FIRST_BYTES, g_param_spec_int ("skip-first-bytes",
259           "Skip first bytes", "number of bytes to skip for each udp packet", 0,
260           G_MAXINT, UDP_DEFAULT_SKIP_FIRST_BYTES,
261           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
262   g_object_class_install_property (gobject_class, PROP_CLOSEFD,
263       g_param_spec_boolean ("closefd", "Close sockfd",
264           "Close sockfd if passed as property on state change",
265           UDP_DEFAULT_CLOSEFD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266   g_object_class_install_property (gobject_class, PROP_SOCK,
267       g_param_spec_int ("sock", "Socket Handle",
268           "Socket currently in use for UDP reception. (-1 = no socket)",
269           -1, G_MAXINT, UDP_DEFAULT_SOCK,
270           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
271   g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
272       g_param_spec_boolean ("auto-multicast", "Auto Multicast",
273           "Automatically join/leave multicast groups",
274           UDP_DEFAULT_AUTO_MULTICAST,
275           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
276   g_object_class_install_property (gobject_class, PROP_REUSE,
277       g_param_spec_boolean ("reuse", "Reuse", "Enable reuse of the port",
278           UDP_DEFAULT_REUSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
279
280   gst_element_class_add_pad_template (gstelement_class,
281       gst_static_pad_template_get (&src_template));
282
283   gst_element_class_set_details_simple (gstelement_class, "UDP packet receiver",
284       "Source/Network",
285       "Receive data over the network via UDP",
286       "Wim Taymans <wim@fluendo.com>, "
287       "Thijs Vermeir <thijs.vermeir@barco.com>");
288
289   gstbasesrc_class->start = gst_udpsrc_start;
290   gstbasesrc_class->stop = gst_udpsrc_stop;
291   gstbasesrc_class->unlock = gst_udpsrc_unlock;
292   gstbasesrc_class->unlock_stop = gst_udpsrc_unlock_stop;
293   gstbasesrc_class->get_caps = gst_udpsrc_getcaps;
294
295   gstpushsrc_class->create = gst_udpsrc_create;
296 }
297
298 static void
299 gst_udpsrc_init (GstUDPSrc * udpsrc)
300 {
301   WSA_STARTUP (udpsrc);
302
303   gst_udp_uri_init (&udpsrc->uri, UDP_DEFAULT_MULTICAST_GROUP,
304       UDP_DEFAULT_PORT);
305
306   udpsrc->sockfd = UDP_DEFAULT_SOCKFD;
307   udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
308   udpsrc->buffer_size = UDP_DEFAULT_BUFFER_SIZE;
309   udpsrc->timeout = UDP_DEFAULT_TIMEOUT;
310   udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
311   udpsrc->closefd = UDP_DEFAULT_CLOSEFD;
312   udpsrc->externalfd = (udpsrc->sockfd != -1);
313   udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
314   udpsrc->sock.fd = UDP_DEFAULT_SOCK;
315   udpsrc->reuse = UDP_DEFAULT_REUSE;
316
317   /* configure basesrc to be a live source */
318   gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
319   /* make basesrc output a segment in time */
320   gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
321   /* make basesrc set timestamps on outgoing buffers based on the running_time
322    * when they were captured */
323   gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
324 }
325
326 static void
327 gst_udpsrc_finalize (GObject * object)
328 {
329   GstUDPSrc *udpsrc;
330
331   udpsrc = GST_UDPSRC (object);
332
333   if (udpsrc->caps)
334     gst_caps_unref (udpsrc->caps);
335
336   g_free (udpsrc->multi_iface);
337
338   gst_udp_uri_free (&udpsrc->uri);
339   g_free (udpsrc->uristr);
340
341   if (udpsrc->sockfd >= 0 && udpsrc->closefd)
342     CLOSE_SOCKET (udpsrc->sockfd);
343
344   WSA_CLEANUP (object);
345
346   G_OBJECT_CLASS (parent_class)->finalize (object);
347 }
348
349 static GstCaps *
350 gst_udpsrc_getcaps (GstBaseSrc * src)
351 {
352   GstUDPSrc *udpsrc;
353
354   udpsrc = GST_UDPSRC (src);
355
356   if (udpsrc->caps)
357     return gst_caps_ref (udpsrc->caps);
358   else
359     return gst_caps_new_any ();
360 }
361
362 /* read a message from the error queue */
363 static void
364 clear_error (GstUDPSrc * udpsrc)
365 {
366 #if defined (MSG_ERRQUEUE)
367   struct msghdr cmsg;
368   char cbuf[128];
369   char msgbuf[CMSG_SPACE (128)];
370   struct iovec iov;
371
372   /* Flush ERRORS from fd so next poll will not return at once */
373   /* No need for address : We look for local error */
374   cmsg.msg_name = NULL;
375   cmsg.msg_namelen = 0;
376
377   /* IOV */
378   memset (&cbuf, 0, sizeof (cbuf));
379   iov.iov_base = cbuf;
380   iov.iov_len = sizeof (cbuf);
381   cmsg.msg_iov = &iov;
382   cmsg.msg_iovlen = 1;
383
384   /* msg_control */
385   memset (&msgbuf, 0, sizeof (msgbuf));
386   cmsg.msg_control = &msgbuf;
387   cmsg.msg_controllen = sizeof (msgbuf);
388
389   recvmsg (udpsrc->sock.fd, &cmsg, MSG_ERRQUEUE);
390 #endif
391 }
392
393 static GstFlowReturn
394 gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
395 {
396   GstUDPSrc *udpsrc;
397   GstMetaNetAddress *meta;
398   GstBuffer *outbuf;
399   union gst_sockaddr
400   {
401     struct sockaddr sa;
402     struct sockaddr_in sa_in;
403     struct sockaddr_in6 sa_in6;
404     struct sockaddr_storage sa_stor;
405   } sa;
406   socklen_t slen;
407   guint8 *pktdata;
408   gint pktsize;
409   gsize offset;
410 #ifdef G_OS_UNIX
411   gint readsize;
412 #elif defined G_OS_WIN32
413   gulong readsize;
414 #endif
415   GstClockTime timeout;
416   gint ret;
417   gboolean try_again;
418
419   udpsrc = GST_UDPSRC_CAST (psrc);
420
421 retry:
422   /* quick check, avoid going in select when we already have data */
423   readsize = 0;
424   if (G_UNLIKELY ((ret =
425               IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
426     goto ioctl_failed;
427
428   if (readsize > 0)
429     goto no_select;
430
431   if (udpsrc->timeout > 0) {
432     timeout = udpsrc->timeout * GST_USECOND;
433   } else {
434     timeout = GST_CLOCK_TIME_NONE;
435   }
436
437   do {
438     try_again = FALSE;
439
440     GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GUINT64_FORMAT,
441         udpsrc->timeout);
442
443     ret = gst_poll_wait (udpsrc->fdset, timeout);
444     GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
445     if (G_UNLIKELY (ret < 0)) {
446       if (errno == EBUSY)
447         goto stopped;
448 #ifdef G_OS_WIN32
449       if (WSAGetLastError () != WSAEINTR)
450         goto select_error;
451 #else
452       if (errno != EAGAIN && errno != EINTR)
453         goto select_error;
454 #endif
455       try_again = TRUE;
456     } else if (G_UNLIKELY (ret == 0)) {
457       /* timeout, post element message */
458       gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
459           gst_message_new_element (GST_OBJECT_CAST (udpsrc),
460               gst_structure_new ("GstUDPSrcTimeout",
461                   "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
462       try_again = TRUE;
463     }
464   } while (G_UNLIKELY (try_again));
465
466   /* ask how much is available for reading on the socket, this should be exactly
467    * one UDP packet. We will check the return value, though, because in some
468    * case it can return 0 and we don't want a 0 sized buffer. */
469   readsize = 0;
470   if (G_UNLIKELY ((ret =
471               IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
472     goto ioctl_failed;
473
474   /* if we get here and there is nothing to read from the socket, the select got
475    * woken up by activity on the socket but it was not a read. We know someone
476    * will also do something with the socket so that we don't go into an infinite
477    * loop in the select(). */
478   if (G_UNLIKELY (!readsize)) {
479     clear_error (udpsrc);
480     goto retry;
481   }
482
483 no_select:
484   GST_LOG_OBJECT (udpsrc, "ioctl says %d bytes available", (int) readsize);
485
486   pktdata = g_malloc (readsize);
487   pktsize = readsize;
488   offset = 0;
489
490   while (TRUE) {
491     slen = sizeof (sa);
492 #ifdef G_OS_WIN32
493     ret = recvfrom (udpsrc->sock.fd, (char *) pktdata, pktsize, 0, &sa.sa,
494         &slen);
495 #else
496     ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize, 0, &sa.sa, &slen);
497 #endif
498     if (G_UNLIKELY (ret < 0)) {
499 #ifdef G_OS_WIN32
500       /* WSAECONNRESET for a UDP socket means that a packet sent with udpsink
501        * generated a "port unreachable" ICMP response. We ignore that and try
502        * again. */
503       if (WSAGetLastError () == WSAECONNRESET) {
504         g_free (pktdata);
505         pktdata = NULL;
506         goto retry;
507       }
508       if (WSAGetLastError () != WSAEINTR)
509         goto receive_error;
510 #else
511       if (errno != EAGAIN && errno != EINTR)
512         goto receive_error;
513 #endif
514     } else
515       break;
516   }
517
518   /* patch pktdata and len when stripping off the headers */
519   if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) {
520     if (G_UNLIKELY (readsize <= udpsrc->skip_first_bytes))
521       goto skip_error;
522
523     offset += udpsrc->skip_first_bytes;
524     ret -= udpsrc->skip_first_bytes;
525   }
526
527   outbuf = gst_buffer_new ();
528   gst_buffer_take_memory (outbuf,
529       gst_memory_new_wrapped (0, pktdata, g_free, pktsize, offset, ret));
530
531   /* use buffer metadata so receivers can also track the address */
532   meta = gst_buffer_add_meta_net_address (outbuf);
533
534   switch (sa.sa.sa_family) {
535     case AF_INET:
536     {
537       gst_netaddress_set_ip4_address (&meta->naddr, sa.sa_in.sin_addr.s_addr,
538           sa.sa_in.sin_port);
539     }
540       break;
541     case AF_INET6:
542     {
543       guint8 ip6[16];
544
545       memcpy (ip6, &sa.sa_in6.sin6_addr, sizeof (ip6));
546       gst_netaddress_set_ip6_address (&meta->naddr, ip6, sa.sa_in6.sin6_port);
547     }
548       break;
549     default:
550 #ifdef G_OS_WIN32
551       WSASetLastError (WSAEAFNOSUPPORT);
552 #else
553       errno = EAFNOSUPPORT;
554 #endif
555       goto receive_error;
556   }
557   GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
558
559   *buf = GST_BUFFER_CAST (outbuf);
560
561   return GST_FLOW_OK;
562
563   /* ERRORS */
564 select_error:
565   {
566     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
567         ("select error %d: %s (%d)", ret, g_strerror (errno), errno));
568     return GST_FLOW_ERROR;
569   }
570 stopped:
571   {
572     GST_DEBUG ("stop called");
573     return GST_FLOW_WRONG_STATE;
574   }
575 ioctl_failed:
576   {
577     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
578         ("ioctl failed %d: %s (%d)", ret, g_strerror (errno), errno));
579     return GST_FLOW_ERROR;
580   }
581 receive_error:
582   {
583     g_free (pktdata);
584 #ifdef G_OS_WIN32
585     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
586         ("receive error %d (WSA error: %d)", ret, WSAGetLastError ()));
587 #else
588     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
589         ("receive error %d: %s (%d)", ret, g_strerror (errno), errno));
590 #endif
591     return GST_FLOW_ERROR;
592   }
593 skip_error:
594   {
595     GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL),
596         ("UDP buffer to small to skip header"));
597     return GST_FLOW_ERROR;
598   }
599 }
600
601 static gboolean
602 gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri)
603 {
604   if (gst_udp_parse_uri (uri, &src->uri) < 0)
605     goto wrong_uri;
606
607   if (src->uri.port == -1)
608     src->uri.port = UDP_DEFAULT_PORT;
609
610   return TRUE;
611
612   /* ERRORS */
613 wrong_uri:
614   {
615     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
616         ("error parsing uri %s", uri));
617     return FALSE;
618   }
619 }
620
621 static void
622 gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
623     GParamSpec * pspec)
624 {
625   GstUDPSrc *udpsrc = GST_UDPSRC (object);
626
627   switch (prop_id) {
628     case PROP_BUFFER_SIZE:
629       udpsrc->buffer_size = g_value_get_int (value);
630       break;
631     case PROP_PORT:
632       gst_udp_uri_update (&udpsrc->uri, NULL, g_value_get_int (value));
633       break;
634     case PROP_MULTICAST_GROUP:
635     {
636       const gchar *group;
637
638       if ((group = g_value_get_string (value)))
639         gst_udp_uri_update (&udpsrc->uri, group, -1);
640       else
641         gst_udp_uri_update (&udpsrc->uri, UDP_DEFAULT_MULTICAST_GROUP, -1);
642       break;
643     }
644     case PROP_MULTICAST_IFACE:
645       g_free (udpsrc->multi_iface);
646
647       if (g_value_get_string (value) == NULL)
648         udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
649       else
650         udpsrc->multi_iface = g_value_dup_string (value);
651       break;
652     case PROP_URI:
653       gst_udpsrc_set_uri (udpsrc, g_value_get_string (value));
654       break;
655     case PROP_CAPS:
656     {
657       const GstCaps *new_caps_val = gst_value_get_caps (value);
658
659       GstCaps *new_caps;
660
661       GstCaps *old_caps;
662
663       if (new_caps_val == NULL) {
664         new_caps = gst_caps_new_any ();
665       } else {
666         new_caps = gst_caps_copy (new_caps_val);
667       }
668
669       old_caps = udpsrc->caps;
670       udpsrc->caps = new_caps;
671       if (old_caps)
672         gst_caps_unref (old_caps);
673       gst_pad_set_caps (GST_BASE_SRC (udpsrc)->srcpad, new_caps);
674       break;
675     }
676     case PROP_SOCKFD:
677       if (udpsrc->sockfd >= 0 && udpsrc->sockfd != udpsrc->sock.fd &&
678           udpsrc->closefd)
679         CLOSE_SOCKET (udpsrc->sockfd);
680       udpsrc->sockfd = g_value_get_int (value);
681       GST_DEBUG ("setting SOCKFD to %d", udpsrc->sockfd);
682       break;
683     case PROP_TIMEOUT:
684       udpsrc->timeout = g_value_get_uint64 (value);
685       break;
686     case PROP_SKIP_FIRST_BYTES:
687       udpsrc->skip_first_bytes = g_value_get_int (value);
688       break;
689     case PROP_CLOSEFD:
690       udpsrc->closefd = g_value_get_boolean (value);
691       break;
692     case PROP_AUTO_MULTICAST:
693       udpsrc->auto_multicast = g_value_get_boolean (value);
694       break;
695     case PROP_REUSE:
696       udpsrc->reuse = g_value_get_boolean (value);
697       break;
698     default:
699       break;
700   }
701 }
702
703 static void
704 gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
705     GParamSpec * pspec)
706 {
707   GstUDPSrc *udpsrc = GST_UDPSRC (object);
708
709   switch (prop_id) {
710     case PROP_BUFFER_SIZE:
711       g_value_set_int (value, udpsrc->buffer_size);
712       break;
713     case PROP_PORT:
714       g_value_set_int (value, udpsrc->uri.port);
715       break;
716     case PROP_MULTICAST_GROUP:
717       g_value_set_string (value, udpsrc->uri.host);
718       break;
719     case PROP_MULTICAST_IFACE:
720       g_value_set_string (value, udpsrc->multi_iface);
721       break;
722     case PROP_URI:
723       g_value_take_string (value, gst_udp_uri_string (&udpsrc->uri));
724       break;
725     case PROP_CAPS:
726       gst_value_set_caps (value, udpsrc->caps);
727       break;
728     case PROP_SOCKFD:
729       g_value_set_int (value, udpsrc->sockfd);
730       break;
731     case PROP_TIMEOUT:
732       g_value_set_uint64 (value, udpsrc->timeout);
733       break;
734     case PROP_SKIP_FIRST_BYTES:
735       g_value_set_int (value, udpsrc->skip_first_bytes);
736       break;
737     case PROP_CLOSEFD:
738       g_value_set_boolean (value, udpsrc->closefd);
739       break;
740     case PROP_SOCK:
741       g_value_set_int (value, udpsrc->sock.fd);
742       break;
743     case PROP_AUTO_MULTICAST:
744       g_value_set_boolean (value, udpsrc->auto_multicast);
745       break;
746     case PROP_REUSE:
747       g_value_set_boolean (value, udpsrc->reuse);
748       break;
749     default:
750       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
751       break;
752   }
753 }
754
755 /* create a socket for sending to remote machine */
756 static gboolean
757 gst_udpsrc_start (GstBaseSrc * bsrc)
758 {
759   guint bc_val;
760   guint err_val;
761   gint reuse;
762   int port;
763   GstUDPSrc *src;
764   gint ret;
765   int rcvsize;
766   struct sockaddr_storage bind_address;
767   socklen_t len;
768   src = GST_UDPSRC (bsrc);
769
770   if (src->sockfd == -1) {
771     /* need to allocate a socket */
772     GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->uri.host,
773         src->uri.port);
774     if ((ret =
775             gst_udp_get_addr (src->uri.host, src->uri.port, &src->myaddr)) < 0)
776       goto getaddrinfo_error;
777
778     if ((ret = socket (src->myaddr.ss_family, SOCK_DGRAM, IPPROTO_UDP)) < 0)
779       goto no_socket;
780
781     src->sock.fd = ret;
782     src->externalfd = FALSE;
783
784     GST_DEBUG_OBJECT (src, "got socket %d", src->sock.fd);
785
786     GST_DEBUG_OBJECT (src, "setting reuse %d", src->reuse);
787     reuse = src->reuse ? 1 : 0;
788     if ((ret =
789             setsockopt (src->sock.fd, SOL_SOCKET, SO_REUSEADDR, &reuse,
790                 sizeof (reuse))) < 0)
791       goto setsockopt_error;
792
793     GST_DEBUG_OBJECT (src, "binding on port %d", src->uri.port);
794
795     /* Take a temporary copy of the address in case we need to fix it for bind */
796     memcpy (&bind_address, &src->myaddr, sizeof (struct sockaddr_storage));
797
798 #ifdef G_OS_WIN32
799     /* Windows does not allow binding to a multicast group so fix source address */
800     if (gst_udp_is_multicast (&src->myaddr)) {
801       switch (((struct sockaddr *) &bind_address)->sa_family) {
802         case AF_INET:
803           ((struct sockaddr_in *) &bind_address)->sin_addr.s_addr =
804               htonl (INADDR_ANY);
805           break;
806         case AF_INET6:
807           ((struct sockaddr_in6 *) &bind_address)->sin6_addr = in6addr_any;
808           break;
809         default:
810           break;
811       }
812     }
813 #endif
814
815     len = gst_udp_get_sockaddr_length (&bind_address);
816     if ((ret = bind (src->sock.fd, (struct sockaddr *) &bind_address, len)) < 0)
817       goto bind_error;
818
819     if (!gst_udp_is_multicast (&src->myaddr)) {
820       len = sizeof (src->myaddr);
821       if ((ret = getsockname (src->sock.fd, (struct sockaddr *) &src->myaddr,
822                   &len)) < 0)
823         goto getsockname_error;
824     }
825   } else {
826     GST_DEBUG_OBJECT (src, "using provided socket %d", src->sockfd);
827     /* we use the configured socket, try to get some info about it */
828     len = sizeof (src->myaddr);
829     if ((ret =
830             getsockname (src->sockfd, (struct sockaddr *) &src->myaddr,
831                 &len)) < 0)
832       goto getsockname_error;
833
834     src->sock.fd = src->sockfd;
835     src->externalfd = TRUE;
836   }
837
838   len = sizeof (rcvsize);
839   if (src->buffer_size != 0) {
840     rcvsize = src->buffer_size;
841
842     GST_DEBUG_OBJECT (src, "setting udp buffer of %d bytes", rcvsize);
843     /* set buffer size, Note that on Linux this is typically limited to a
844      * maximum of around 100K. Also a minimum of 128 bytes is required on
845      * Linux. */
846     ret =
847         setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize,
848         len);
849     if (ret != 0) {
850       GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
851           ("Could not create a buffer of requested %d bytes, %d: %s (%d)",
852               rcvsize, ret, g_strerror (errno), errno));
853     }
854   }
855
856   /* read the value of the receive buffer. Note that on linux this returns 2x the
857    * value we set because the kernel allocates extra memory for metadata.
858    * The default on Linux is about 100K (which is about 50K without metadata) */
859   ret =
860       getsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, &len);
861   if (ret == 0)
862     GST_DEBUG_OBJECT (src, "have udp buffer of %d bytes", rcvsize);
863   else
864     GST_DEBUG_OBJECT (src, "could not get udp buffer size");
865
866   bc_val = 1;
867   if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val,
868               sizeof (bc_val))) < 0) {
869     GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
870         ("could not configure socket for broadcast %d: %s (%d)", ret,
871             g_strerror (errno), errno));
872   }
873
874   /* Accept ERRQUEUE to get and flush icmp errors */
875   err_val = 1;
876 #if defined (IP_RECVERR)
877   if ((ret = setsockopt (src->sock.fd, IPPROTO_IP, IP_RECVERR, &err_val,
878               sizeof (err_val))) < 0) {
879     GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
880         ("could not configure socket for IP_RECVERR %d: %s (%d)", ret,
881             g_strerror (errno), errno));
882   }
883 #endif
884
885   if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
886     GST_DEBUG_OBJECT (src, "joining multicast group %s", src->uri.host);
887     ret = gst_udp_join_group (src->sock.fd, &src->myaddr, src->multi_iface);
888     if (ret < 0)
889       goto membership;
890   }
891
892   /* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port
893    * follows ss_family on both */
894   port = g_ntohs (((struct sockaddr_in *) &src->myaddr)->sin_port);
895   GST_DEBUG_OBJECT (src, "bound, on port %d", port);
896   if (port != src->uri.port) {
897     src->uri.port = port;
898     GST_DEBUG_OBJECT (src, "notifying port %d", port);
899     g_object_notify (G_OBJECT (src), "port");
900   }
901
902   if ((src->fdset = gst_poll_new (TRUE)) == NULL)
903     goto no_fdset;
904
905   gst_poll_add_fd (src->fdset, &src->sock);
906   gst_poll_fd_ctl_read (src->fdset, &src->sock, TRUE);
907
908   return TRUE;
909
910   /* ERRORS */
911 getaddrinfo_error:
912   {
913     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
914         ("getaddrinfo failed: %s (%d)", gai_strerror (ret), ret));
915     return FALSE;
916   }
917 no_socket:
918   {
919     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
920         ("no socket error %d: %s (%d)", ret, g_strerror (errno), errno));
921     return FALSE;
922   }
923 setsockopt_error:
924   {
925     CLOSE_IF_REQUESTED (src);
926     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
927         ("setsockopt failed %d: %s (%d)", ret, g_strerror (errno), errno));
928     return FALSE;
929   }
930 bind_error:
931   {
932     CLOSE_IF_REQUESTED (src);
933     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
934         ("bind failed %d: %s (%d)", ret, g_strerror (errno), errno));
935     return FALSE;
936   }
937 membership:
938   {
939     CLOSE_IF_REQUESTED (src);
940     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
941         ("could add membership %d: %s (%d)", ret, g_strerror (errno), errno));
942     return FALSE;
943   }
944 getsockname_error:
945   {
946     CLOSE_IF_REQUESTED (src);
947     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
948         ("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno));
949     return FALSE;
950   }
951 no_fdset:
952   {
953     CLOSE_IF_REQUESTED (src);
954     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
955         ("could not create an fdset %d: %s (%d)", ret, g_strerror (errno),
956             errno));
957     return FALSE;
958   }
959 }
960
961 static gboolean
962 gst_udpsrc_unlock (GstBaseSrc * bsrc)
963 {
964   GstUDPSrc *src;
965
966   src = GST_UDPSRC (bsrc);
967
968   GST_LOG_OBJECT (src, "Flushing");
969   gst_poll_set_flushing (src->fdset, TRUE);
970
971   return TRUE;
972 }
973
974 static gboolean
975 gst_udpsrc_unlock_stop (GstBaseSrc * bsrc)
976 {
977   GstUDPSrc *src;
978
979   src = GST_UDPSRC (bsrc);
980
981   GST_LOG_OBJECT (src, "No longer flushing");
982   gst_poll_set_flushing (src->fdset, FALSE);
983
984   return TRUE;
985 }
986
987 static gboolean
988 gst_udpsrc_stop (GstBaseSrc * bsrc)
989 {
990   GstUDPSrc *src;
991
992   src = GST_UDPSRC (bsrc);
993
994   GST_DEBUG ("stopping, closing sockets");
995
996   if (src->sock.fd >= 0) {
997     if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
998       GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->uri.host);
999       gst_udp_leave_group (src->sock.fd, &src->myaddr);
1000     }
1001     CLOSE_IF_REQUESTED (src);
1002   }
1003
1004   if (src->fdset) {
1005     gst_poll_free (src->fdset);
1006     src->fdset = NULL;
1007   }
1008
1009   return TRUE;
1010 }
1011
1012 /*** GSTURIHANDLER INTERFACE *************************************************/
1013
1014 static GstURIType
1015 gst_udpsrc_uri_get_type (void)
1016 {
1017   return GST_URI_SRC;
1018 }
1019
1020 static gchar **
1021 gst_udpsrc_uri_get_protocols (void)
1022 {
1023   static gchar *protocols[] = { (char *) "udp", NULL };
1024
1025   return protocols;
1026 }
1027
1028 static const gchar *
1029 gst_udpsrc_uri_get_uri (GstURIHandler * handler)
1030 {
1031   GstUDPSrc *src = GST_UDPSRC (handler);
1032
1033   g_free (src->uristr);
1034   src->uristr = gst_udp_uri_string (&src->uri);
1035
1036   return src->uristr;
1037 }
1038
1039 static gboolean
1040 gst_udpsrc_uri_set_uri (GstURIHandler * handler, const gchar * uri)
1041 {
1042   gboolean ret;
1043
1044   GstUDPSrc *src = GST_UDPSRC (handler);
1045
1046   ret = gst_udpsrc_set_uri (src, uri);
1047
1048   return ret;
1049 }
1050
1051 static void
1052 gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data)
1053 {
1054   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1055
1056   iface->get_type = gst_udpsrc_uri_get_type;
1057   iface->get_protocols = gst_udpsrc_uri_get_protocols;
1058   iface->get_uri = gst_udpsrc_uri_get_uri;
1059   iface->set_uri = gst_udpsrc_uri_set_uri;
1060 }