bd7082d4703bcc7810446f78a314b613ec13d65d
[platform/upstream/gstreamer.git] / gst / udp / gstudpsrc.c
1 /* GStreamer
2  * Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
3  * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
4  * Copyright (C) <2012> Collabora Ltd.
5  *   Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:element-udpsrc
25  * @see_also: udpsink, multifdsink
26  *
27  * udpsrc is a network source that reads UDP packets from the network.
28  * It can be combined with RTP depayloaders to implement RTP streaming.
29  *
30  * The udpsrc element supports automatic port allocation by setting the
31  * #GstUDPSrc:port property to 0. After setting the udpsrc to PAUSED, the
32  * allocated port can be obtained by reading the port property.
33  *
34  * udpsrc can read from multicast groups by setting the #GstUDPSrc:multicast-group
35  * property to the IP address of the multicast group.
36  *
37  * Alternatively one can provide a custom socket to udpsrc with the #GstUDPSrc:sockfd
38  * property, udpsrc will then not allocate a socket itself but use the provided
39  * one.
40  *
41  * The #GstUDPSrc:caps property is mainly used to give a type to the UDP packet
42  * so that they can be autoplugged in GStreamer pipelines. This is very usefull
43  * for RTP implementations where the contents of the UDP packets is transfered
44  * out-of-bounds using SDP or other means.
45  *
46  * The #GstUDPSrc:buffer-size property is used to change the default kernel
47  * buffersizes used for receiving packets. The buffer size may be increased for
48  * high-volume connections, or may be decreased to limit the possible backlog of
49  * incoming data. The system places an absolute limit on these values, on Linux,
50  * for example, the default buffer size is typically 50K and can be increased to
51  * maximally 100K.
52  *
53  * The #GstUDPSrc:skip-first-bytes property is used to strip off an arbitrary
54  * number of bytes from the start of the raw udp packet and can be used to strip
55  * off proprietary header, for example.
56  *
57  * The udpsrc is always a live source. It does however not provide a #GstClock,
58  * this is left for upstream elements such as an RTP session manager or demuxer
59  * (such as an MPEG demuxer). As with all live sources, the captured buffers
60  * will have their timestamp set to the current running time of the pipeline.
61  *
62  * udpsrc implements a #GstURIHandler interface that handles udp://host:port
63  * type URIs.
64  *
65  * If the #GstUDPSrc:timeout property is set to a value bigger than 0, udpsrc
66  * will generate an element message named
67  * <classname>&quot;GstUDPSrcTimeout&quot;</classname>
68  * if no data was recieved in the given timeout.
69  * The message's structure contains one field:
70  * <itemizedlist>
71  * <listitem>
72  *   <para>
73  *   #guint64
74  *   <classname>&quot;timeout&quot;</classname>: the timeout in microseconds that
75  *   expired when waiting for data.
76  *   </para>
77  * </listitem>
78  * </itemizedlist>
79  * The message is typically used to detect that no UDP arrives in the receiver
80  * because it is blocked by a firewall.
81  * </para>
82  * <para>
83  * A custom file descriptor can be configured with the
84  * #GstUDPSrc:sockfd property. The socket will be closed when setting the
85  * element to READY by default. This behaviour can be
86  * overriden with the #GstUDPSrc:closefd property, in which case the application
87  * is responsible for closing the file descriptor.
88  *
89  * <refsect2>
90  * <title>Examples</title>
91  * |[
92  * gst-launch-1.0 -v udpsrc ! fakesink dump=1
93  * ]| A pipeline to read from the default port and dump the udp packets.
94  * To actually generate udp packets on the default port one can use the
95  * udpsink element. When running the following pipeline in another terminal, the
96  * above mentioned pipeline should dump data packets to the console.
97  * |[
98  * gst-launch-1.0 -v audiotestsrc ! udpsink
99  * ]|
100  * |[
101  * gst-launch-1.0 -v udpsrc port=0 ! fakesink
102  * ]| read udp packets from a free port.
103  * </refsect2>
104  *
105  * Last reviewed on 2007-09-20 (0.10.7)
106  */
107 #ifdef HAVE_CONFIG_H
108 #include "config.h"
109 #endif
110
111 #include "gstudpsrc.h"
112
113 #include <gst/net/gstnetaddressmeta.h>
114
115 #ifdef HAVE_SYS_SOCKET_H
116 #include <sys/socket.h>
117 #endif
118
119 GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
120 #define GST_CAT_DEFAULT (udpsrc_debug)
121
122 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
123     GST_PAD_SRC,
124     GST_PAD_ALWAYS,
125     GST_STATIC_CAPS_ANY);
126
127 #define UDP_DEFAULT_PORT                5004
128 #define UDP_DEFAULT_MULTICAST_GROUP     "0.0.0.0"
129 #define UDP_DEFAULT_MULTICAST_IFACE     NULL
130 #define UDP_DEFAULT_URI                 "udp://"UDP_DEFAULT_MULTICAST_GROUP":"G_STRINGIFY(UDP_DEFAULT_PORT)
131 #define UDP_DEFAULT_CAPS                NULL
132 #define UDP_DEFAULT_SOCKET              NULL
133 #define UDP_DEFAULT_BUFFER_SIZE         0
134 #define UDP_DEFAULT_TIMEOUT             0
135 #define UDP_DEFAULT_SKIP_FIRST_BYTES    0
136 #define UDP_DEFAULT_CLOSE_SOCKET       TRUE
137 #define UDP_DEFAULT_USED_SOCKET        NULL
138 #define UDP_DEFAULT_AUTO_MULTICAST     TRUE
139 #define UDP_DEFAULT_REUSE              TRUE
140
141 enum
142 {
143   PROP_0,
144
145   PROP_PORT,
146   PROP_MULTICAST_GROUP,
147   PROP_MULTICAST_IFACE,
148   PROP_URI,
149   PROP_CAPS,
150   PROP_SOCKET,
151   PROP_BUFFER_SIZE,
152   PROP_TIMEOUT,
153   PROP_SKIP_FIRST_BYTES,
154   PROP_CLOSE_SOCKET,
155   PROP_USED_SOCKET,
156   PROP_AUTO_MULTICAST,
157   PROP_REUSE,
158
159   PROP_LAST
160 };
161
162 static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
163
164 static GstCaps *gst_udpsrc_getcaps (GstBaseSrc * src, GstCaps * filter);
165
166 static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf);
167
168 static gboolean gst_udpsrc_start (GstBaseSrc * bsrc);
169
170 static gboolean gst_udpsrc_stop (GstBaseSrc * bsrc);
171
172 static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc);
173
174 static gboolean gst_udpsrc_unlock_stop (GstBaseSrc * bsrc);
175
176 static void gst_udpsrc_finalize (GObject * object);
177
178 static void gst_udpsrc_set_property (GObject * object, guint prop_id,
179     const GValue * value, GParamSpec * pspec);
180 static void gst_udpsrc_get_property (GObject * object, guint prop_id,
181     GValue * value, GParamSpec * pspec);
182
183 #define gst_udpsrc_parent_class parent_class
184 G_DEFINE_TYPE_WITH_CODE (GstUDPSrc, gst_udpsrc, GST_TYPE_PUSH_SRC,
185     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_udpsrc_uri_handler_init));
186
187 static void
188 gst_udpsrc_class_init (GstUDPSrcClass * klass)
189 {
190   GObjectClass *gobject_class;
191   GstElementClass *gstelement_class;
192   GstBaseSrcClass *gstbasesrc_class;
193   GstPushSrcClass *gstpushsrc_class;
194
195   gobject_class = (GObjectClass *) klass;
196   gstelement_class = (GstElementClass *) klass;
197   gstbasesrc_class = (GstBaseSrcClass *) klass;
198   gstpushsrc_class = (GstPushSrcClass *) klass;
199
200   GST_DEBUG_CATEGORY_INIT (udpsrc_debug, "udpsrc", 0, "UDP src");
201
202   gobject_class->set_property = gst_udpsrc_set_property;
203   gobject_class->get_property = gst_udpsrc_get_property;
204   gobject_class->finalize = gst_udpsrc_finalize;
205
206   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PORT,
207       g_param_spec_int ("port", "Port",
208           "The port to receive the packets from, 0=allocate", 0, G_MAXUINT16,
209           UDP_DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210   g_object_class_install_property (gobject_class, PROP_MULTICAST_GROUP,
211       g_param_spec_string ("multicast-group", "Multicast Group",
212           "The Address of multicast group to join", UDP_DEFAULT_MULTICAST_GROUP,
213           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
214   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
215       g_param_spec_string ("multicast-iface", "Multicast Interface",
216           "The network interface on which to join the multicast group",
217           UDP_DEFAULT_MULTICAST_IFACE,
218           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
219   g_object_class_install_property (gobject_class, PROP_URI,
220       g_param_spec_string ("uri", "URI",
221           "URI in the form of udp://multicast_group:port", UDP_DEFAULT_URI,
222           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
223   g_object_class_install_property (gobject_class, PROP_CAPS,
224       g_param_spec_boxed ("caps", "Caps",
225           "The caps of the source pad", GST_TYPE_CAPS,
226           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
227   g_object_class_install_property (gobject_class, PROP_SOCKET,
228       g_param_spec_object ("socket", "Socket",
229           "Socket to use for UDP reception. (NULL == allocate)",
230           G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
231   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
232       g_param_spec_int ("buffer-size", "Buffer Size",
233           "Size of the kernel receive buffer in bytes, 0=default", 0, G_MAXINT,
234           UDP_DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
235   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
236       g_param_spec_uint64 ("timeout", "Timeout",
237           "Post a message after timeout nanoseconds (0 = disabled)", 0,
238           G_MAXUINT64, UDP_DEFAULT_TIMEOUT,
239           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
240   g_object_class_install_property (G_OBJECT_CLASS (klass),
241       PROP_SKIP_FIRST_BYTES, g_param_spec_int ("skip-first-bytes",
242           "Skip first bytes", "number of bytes to skip for each udp packet", 0,
243           G_MAXINT, UDP_DEFAULT_SKIP_FIRST_BYTES,
244           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
245   g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
246       g_param_spec_boolean ("close-socket", "Close socket",
247           "Close socket if passed as property on state change",
248           UDP_DEFAULT_CLOSE_SOCKET,
249           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
250   g_object_class_install_property (gobject_class, PROP_USED_SOCKET,
251       g_param_spec_object ("used-socket", "Socket Handle",
252           "Socket currently in use for UDP reception. (NULL = no socket)",
253           G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
254   g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
255       g_param_spec_boolean ("auto-multicast", "Auto Multicast",
256           "Automatically join/leave multicast groups",
257           UDP_DEFAULT_AUTO_MULTICAST,
258           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
259   g_object_class_install_property (gobject_class, PROP_REUSE,
260       g_param_spec_boolean ("reuse", "Reuse", "Enable reuse of the port",
261           UDP_DEFAULT_REUSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
262
263   gst_element_class_add_pad_template (gstelement_class,
264       gst_static_pad_template_get (&src_template));
265
266   gst_element_class_set_static_metadata (gstelement_class,
267       "UDP packet receiver", "Source/Network",
268       "Receive data over the network via UDP",
269       "Wim Taymans <wim@fluendo.com>, "
270       "Thijs Vermeir <thijs.vermeir@barco.com>");
271
272   gstbasesrc_class->start = gst_udpsrc_start;
273   gstbasesrc_class->stop = gst_udpsrc_stop;
274   gstbasesrc_class->unlock = gst_udpsrc_unlock;
275   gstbasesrc_class->unlock_stop = gst_udpsrc_unlock_stop;
276   gstbasesrc_class->get_caps = gst_udpsrc_getcaps;
277
278   gstpushsrc_class->create = gst_udpsrc_create;
279 }
280
281 static void
282 gst_udpsrc_init (GstUDPSrc * udpsrc)
283 {
284   udpsrc->uri =
285       g_strdup_printf ("udp://%s:%u", UDP_DEFAULT_MULTICAST_GROUP,
286       UDP_DEFAULT_PORT);
287
288   udpsrc->host = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
289   udpsrc->port = UDP_DEFAULT_PORT;
290   udpsrc->socket = UDP_DEFAULT_SOCKET;
291   udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
292   udpsrc->buffer_size = UDP_DEFAULT_BUFFER_SIZE;
293   udpsrc->timeout = UDP_DEFAULT_TIMEOUT;
294   udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
295   udpsrc->close_socket = UDP_DEFAULT_CLOSE_SOCKET;
296   udpsrc->external_socket = (udpsrc->socket != NULL);
297   udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
298   udpsrc->used_socket = UDP_DEFAULT_USED_SOCKET;
299   udpsrc->reuse = UDP_DEFAULT_REUSE;
300
301   udpsrc->cancellable = g_cancellable_new ();
302
303   /* configure basesrc to be a live source */
304   gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
305   /* make basesrc output a segment in time */
306   gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
307   /* make basesrc set timestamps on outgoing buffers based on the running_time
308    * when they were captured */
309   gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
310 }
311
312 static void
313 gst_udpsrc_finalize (GObject * object)
314 {
315   GstUDPSrc *udpsrc;
316
317   udpsrc = GST_UDPSRC (object);
318
319   if (udpsrc->caps)
320     gst_caps_unref (udpsrc->caps);
321   udpsrc->caps = NULL;
322
323   g_free (udpsrc->multi_iface);
324   udpsrc->multi_iface = NULL;
325
326   g_free (udpsrc->uri);
327   udpsrc->uri = NULL;
328
329   g_free (udpsrc->host);
330   udpsrc->host = NULL;
331
332   if (udpsrc->socket)
333     g_object_unref (udpsrc->socket);
334   udpsrc->socket = NULL;
335
336   if (udpsrc->used_socket)
337     g_object_unref (udpsrc->used_socket);
338   udpsrc->used_socket = NULL;
339
340   if (udpsrc->cancellable)
341     g_object_unref (udpsrc->cancellable);
342   udpsrc->cancellable = NULL;
343
344   G_OBJECT_CLASS (parent_class)->finalize (object);
345 }
346
347 static GstCaps *
348 gst_udpsrc_getcaps (GstBaseSrc * src, GstCaps * filter)
349 {
350   GstUDPSrc *udpsrc;
351
352   udpsrc = GST_UDPSRC (src);
353
354   if (udpsrc->caps) {
355     return (filter) ? gst_caps_intersect_full (filter, udpsrc->caps,
356         GST_CAPS_INTERSECT_FIRST) : gst_caps_ref (udpsrc->caps);
357   } else {
358     return (filter) ? gst_caps_ref (filter) : gst_caps_new_any ();
359   }
360 }
361
362 static GstFlowReturn
363 gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
364 {
365   GstFlowReturn ret;
366   GstUDPSrc *udpsrc;
367   GstBuffer *outbuf;
368   GstMapInfo info;
369   GSocketAddress *saddr = NULL;
370   gsize offset;
371   gssize readsize;
372   gssize res;
373   gboolean try_again;
374   GError *err = NULL;
375
376   udpsrc = GST_UDPSRC_CAST (psrc);
377
378 retry:
379   /* quick check, avoid going in select when we already have data */
380   readsize = g_socket_get_available_bytes (udpsrc->used_socket);
381   if (readsize > 0)
382     goto no_select;
383
384   do {
385     gint64 timeout;
386
387     try_again = FALSE;
388
389     if (udpsrc->timeout)
390       timeout = udpsrc->timeout / 1000;
391     else
392       timeout = -1;
393
394     GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GUINT64_FORMAT,
395         timeout);
396
397     if (!g_socket_condition_timed_wait (udpsrc->used_socket, G_IO_IN | G_IO_PRI,
398             timeout, udpsrc->cancellable, &err)) {
399       if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)
400           || g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
401         goto stopped;
402       } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
403         g_clear_error (&err);
404         /* timeout, post element message */
405         gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
406             gst_message_new_element (GST_OBJECT_CAST (udpsrc),
407                 gst_structure_new ("GstUDPSrcTimeout",
408                     "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
409       } else {
410         goto select_error;
411       }
412
413       try_again = TRUE;
414     }
415   } while (G_UNLIKELY (try_again));
416
417   /* ask how much is available for reading on the socket, this should be exactly
418    * one UDP packet. We will check the return value, though, because in some
419    * case it can return 0 and we don't want a 0 sized buffer. */
420   readsize = g_socket_get_available_bytes (udpsrc->used_socket);
421   if (G_UNLIKELY (readsize < 0))
422     goto get_available_error;
423
424   /* If we get here and the readsize is zero, then either select was woken up
425    * by activity that is not a read, or a poll error occurred, or a UDP packet
426    * was received that has no data. Since we cannot identify which case it is,
427    * we handle all of them. This could possibly lead to a UDP packet getting
428    * lost, but since UDP is not reliable, we can accept this. */
429   if (G_UNLIKELY (!readsize)) {
430     /* try to read a packet (and it will be ignored),
431      * in case a packet with no data arrived */
432     res =
433         g_socket_receive_from (udpsrc->used_socket, NULL, NULL,
434         0, udpsrc->cancellable, &err);
435     if (G_UNLIKELY (res < 0))
436       goto receive_error;
437
438     /* poll again */
439     goto retry;
440   }
441
442 no_select:
443   GST_LOG_OBJECT (udpsrc, "ioctl says %d bytes available", (int) readsize);
444
445   ret = GST_BASE_SRC_CLASS (parent_class)->alloc (GST_BASE_SRC_CAST (udpsrc),
446       -1, readsize, &outbuf);
447   if (ret != GST_FLOW_OK)
448     goto alloc_failed;
449
450   gst_buffer_map (outbuf, &info, GST_MAP_WRITE);
451   offset = 0;
452
453   if (saddr)
454     g_object_unref (saddr);
455   saddr = NULL;
456
457   res =
458       g_socket_receive_from (udpsrc->used_socket, &saddr, (gchar *) info.data,
459       info.size, udpsrc->cancellable, &err);
460
461   if (G_UNLIKELY (res < 0)) {
462     /* EHOSTUNREACH for a UDP socket means that a packet sent with udpsink
463      * generated a "port unreachable" ICMP response. We ignore that and try
464      * again. */
465     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_HOST_UNREACHABLE)) {
466       gst_buffer_unmap (outbuf, &info);
467       gst_buffer_unref (outbuf);
468       outbuf = NULL;
469       g_clear_error (&err);
470       goto retry;
471     }
472     goto receive_error;
473   }
474
475   /* patch offset and size when stripping off the headers */
476   if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) {
477     if (G_UNLIKELY (readsize < udpsrc->skip_first_bytes))
478       goto skip_error;
479
480     offset += udpsrc->skip_first_bytes;
481     res -= udpsrc->skip_first_bytes;
482   }
483
484   gst_buffer_unmap (outbuf, &info);
485   gst_buffer_resize (outbuf, offset, res);
486
487   /* use buffer metadata so receivers can also track the address */
488   if (saddr) {
489     gst_buffer_add_net_address_meta (outbuf, saddr);
490     g_object_unref (saddr);
491   }
492   saddr = NULL;
493
494   GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
495
496   *buf = GST_BUFFER_CAST (outbuf);
497
498   return ret;
499
500   /* ERRORS */
501 select_error:
502   {
503     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
504         ("select error: %s", err->message));
505     g_clear_error (&err);
506     return GST_FLOW_ERROR;
507   }
508 stopped:
509   {
510     GST_DEBUG ("stop called");
511     g_clear_error (&err);
512     return GST_FLOW_FLUSHING;
513   }
514 get_available_error:
515   {
516     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
517         ("get available bytes failed"));
518     return GST_FLOW_ERROR;
519   }
520 alloc_failed:
521   {
522     GST_DEBUG ("Allocation failed");
523     return ret;
524   }
525 receive_error:
526   {
527     gst_buffer_unmap (outbuf, &info);
528     gst_buffer_unref (outbuf);
529
530     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY) ||
531         g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
532       g_clear_error (&err);
533       return GST_FLOW_FLUSHING;
534     } else {
535       GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
536           ("receive error %" G_GSSIZE_FORMAT ": %s", res, err->message));
537       g_clear_error (&err);
538       return GST_FLOW_ERROR;
539     }
540   }
541 skip_error:
542   {
543     gst_buffer_unmap (outbuf, &info);
544     gst_buffer_unref (outbuf);
545
546     GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL),
547         ("UDP buffer to small to skip header"));
548     return GST_FLOW_ERROR;
549   }
550 }
551
552 static gboolean
553 gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri, GError ** error)
554 {
555   gchar *host;
556   guint16 port;
557
558   if (!gst_udp_parse_uri (uri, &host, &port))
559     goto wrong_uri;
560
561   if (port == (guint16) - 1)
562     port = UDP_DEFAULT_PORT;
563
564   g_free (src->host);
565   src->host = host;
566   src->port = port;
567
568   g_free (src->uri);
569   src->uri = g_strdup (uri);
570
571   return TRUE;
572
573   /* ERRORS */
574 wrong_uri:
575   {
576     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
577         ("error parsing uri %s", uri));
578     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
579         "Could not parse UDP URI");
580     return FALSE;
581   }
582 }
583
584 static void
585 gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
586     GParamSpec * pspec)
587 {
588   GstUDPSrc *udpsrc = GST_UDPSRC (object);
589
590   switch (prop_id) {
591     case PROP_BUFFER_SIZE:
592       udpsrc->buffer_size = g_value_get_int (value);
593       break;
594     case PROP_PORT:
595       udpsrc->port = g_value_get_int (value);
596       g_free (udpsrc->uri);
597       udpsrc->uri = g_strdup_printf ("udp://%s:%u", udpsrc->host, udpsrc->port);
598       break;
599     case PROP_MULTICAST_GROUP:
600     {
601       const gchar *group;
602
603       g_free (udpsrc->host);
604       if ((group = g_value_get_string (value)))
605         udpsrc->host = g_strdup (group);
606       else
607         udpsrc->host = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
608
609       g_free (udpsrc->uri);
610       udpsrc->uri = g_strdup_printf ("udp://%s:%u", udpsrc->host, udpsrc->port);
611       break;
612     }
613     case PROP_MULTICAST_IFACE:
614       g_free (udpsrc->multi_iface);
615
616       if (g_value_get_string (value) == NULL)
617         udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
618       else
619         udpsrc->multi_iface = g_value_dup_string (value);
620       break;
621     case PROP_URI:
622       gst_udpsrc_set_uri (udpsrc, g_value_get_string (value), NULL);
623       break;
624     case PROP_CAPS:
625     {
626       const GstCaps *new_caps_val = gst_value_get_caps (value);
627
628       GstCaps *new_caps;
629
630       GstCaps *old_caps;
631
632       if (new_caps_val == NULL) {
633         new_caps = gst_caps_new_any ();
634       } else {
635         new_caps = gst_caps_copy (new_caps_val);
636       }
637
638       old_caps = udpsrc->caps;
639       udpsrc->caps = new_caps;
640       if (old_caps)
641         gst_caps_unref (old_caps);
642       gst_pad_set_caps (GST_BASE_SRC (udpsrc)->srcpad, new_caps);
643       break;
644     }
645     case PROP_SOCKET:
646       if (udpsrc->socket != NULL && udpsrc->socket != udpsrc->used_socket &&
647           udpsrc->close_socket) {
648         GError *err = NULL;
649
650         if (!g_socket_close (udpsrc->socket, &err)) {
651           GST_ERROR ("failed to close socket %p: %s", udpsrc->socket,
652               err->message);
653           g_clear_error (&err);
654         }
655       }
656       if (udpsrc->socket)
657         g_object_unref (udpsrc->socket);
658       udpsrc->socket = g_value_dup_object (value);
659       GST_DEBUG ("setting socket to %p", udpsrc->socket);
660       break;
661     case PROP_TIMEOUT:
662       udpsrc->timeout = g_value_get_uint64 (value);
663       break;
664     case PROP_SKIP_FIRST_BYTES:
665       udpsrc->skip_first_bytes = g_value_get_int (value);
666       break;
667     case PROP_CLOSE_SOCKET:
668       udpsrc->close_socket = g_value_get_boolean (value);
669       break;
670     case PROP_AUTO_MULTICAST:
671       udpsrc->auto_multicast = g_value_get_boolean (value);
672       break;
673     case PROP_REUSE:
674       udpsrc->reuse = g_value_get_boolean (value);
675       break;
676     default:
677       break;
678   }
679 }
680
681 static void
682 gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
683     GParamSpec * pspec)
684 {
685   GstUDPSrc *udpsrc = GST_UDPSRC (object);
686
687   switch (prop_id) {
688     case PROP_BUFFER_SIZE:
689       g_value_set_int (value, udpsrc->buffer_size);
690       break;
691     case PROP_PORT:
692       g_value_set_int (value, udpsrc->port);
693       break;
694     case PROP_MULTICAST_GROUP:
695       g_value_set_string (value, udpsrc->host);
696       break;
697     case PROP_MULTICAST_IFACE:
698       g_value_set_string (value, udpsrc->multi_iface);
699       break;
700     case PROP_URI:
701       g_value_set_string (value, udpsrc->uri);
702       break;
703     case PROP_CAPS:
704       gst_value_set_caps (value, udpsrc->caps);
705       break;
706     case PROP_SOCKET:
707       g_value_set_object (value, udpsrc->socket);
708       break;
709     case PROP_TIMEOUT:
710       g_value_set_uint64 (value, udpsrc->timeout);
711       break;
712     case PROP_SKIP_FIRST_BYTES:
713       g_value_set_int (value, udpsrc->skip_first_bytes);
714       break;
715     case PROP_CLOSE_SOCKET:
716       g_value_set_boolean (value, udpsrc->close_socket);
717       break;
718     case PROP_USED_SOCKET:
719       g_value_set_object (value, udpsrc->used_socket);
720       break;
721     case PROP_AUTO_MULTICAST:
722       g_value_set_boolean (value, udpsrc->auto_multicast);
723       break;
724     case PROP_REUSE:
725       g_value_set_boolean (value, udpsrc->reuse);
726       break;
727     default:
728       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
729       break;
730   }
731 }
732
733 /* create a socket for sending to remote machine */
734 static gboolean
735 gst_udpsrc_start (GstBaseSrc * bsrc)
736 {
737   GstUDPSrc *src;
738   GInetAddress *addr, *bind_addr;
739   GSocketAddress *bind_saddr;
740   GResolver *resolver;
741   GError *err = NULL;
742
743   src = GST_UDPSRC (bsrc);
744
745   if (src->socket == NULL) {
746     /* need to allocate a socket */
747     GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->host, src->port);
748
749     addr = g_inet_address_new_from_string (src->host);
750     if (!addr) {
751       GList *results;
752
753       GST_DEBUG_OBJECT (src, "resolving IP address for host %s", src->host);
754       resolver = g_resolver_get_default ();
755       results =
756           g_resolver_lookup_by_name (resolver, src->host, src->cancellable,
757           &err);
758       if (!results)
759         goto name_resolve;
760       addr = G_INET_ADDRESS (g_object_ref (results->data));
761
762       g_resolver_free_addresses (results);
763       g_object_unref (resolver);
764     }
765 #ifndef GST_DISABLE_GST_DEBUG
766     {
767       gchar *ip = g_inet_address_to_string (addr);
768
769       GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
770       g_free (ip);
771     }
772 #endif
773
774     if ((src->used_socket =
775             g_socket_new (g_inet_address_get_family (addr),
776                 G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
777       goto no_socket;
778
779     src->external_socket = FALSE;
780
781     GST_DEBUG_OBJECT (src, "got socket %p", src->used_socket);
782
783     if (src->addr)
784       g_object_unref (src->addr);
785     src->addr =
786         G_INET_SOCKET_ADDRESS (g_inet_socket_address_new (addr, src->port));
787
788     GST_DEBUG_OBJECT (src, "binding on port %d", src->port);
789
790     if (g_inet_address_get_is_multicast (addr))
791       bind_addr = g_inet_address_new_any (g_inet_address_get_family (addr));
792     else
793       bind_addr = G_INET_ADDRESS (g_object_ref (addr));
794
795     g_object_unref (addr);
796
797     bind_saddr = g_inet_socket_address_new (bind_addr, src->port);
798     g_object_unref (bind_addr);
799     if (!g_socket_bind (src->used_socket, bind_saddr, src->reuse, &err))
800       goto bind_error;
801
802     g_object_unref (bind_saddr);
803   } else {
804     GST_DEBUG_OBJECT (src, "using provided socket %p", src->socket);
805     /* we use the configured socket, try to get some info about it */
806     src->used_socket = G_SOCKET (g_object_ref (src->socket));
807     src->external_socket = TRUE;
808
809     if (src->addr)
810       g_object_unref (src->addr);
811     src->addr =
812         G_INET_SOCKET_ADDRESS (g_socket_get_local_address (src->used_socket,
813             &err));
814     if (!src->addr)
815       goto getsockname_error;
816   }
817
818 #ifdef SO_RCVBUF
819   {
820     gint rcvsize, ret;
821     socklen_t len;
822
823     len = sizeof (rcvsize);
824     if (src->buffer_size != 0) {
825       rcvsize = src->buffer_size;
826
827       GST_DEBUG_OBJECT (src, "setting udp buffer of %d bytes", rcvsize);
828       /* set buffer size, Note that on Linux this is typically limited to a
829        * maximum of around 100K. Also a minimum of 128 bytes is required on
830        * Linux. */
831       ret =
832           setsockopt (g_socket_get_fd (src->used_socket), SOL_SOCKET, SO_RCVBUF,
833           (void *) &rcvsize, len);
834       if (ret != 0) {
835         GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
836             ("Could not create a buffer of requested %d bytes, %d: %s (%d)",
837                 rcvsize, ret, g_strerror (errno), errno));
838       }
839     }
840
841     /* read the value of the receive buffer. Note that on linux this returns 2x the
842      * value we set because the kernel allocates extra memory for metadata.
843      * The default on Linux is about 100K (which is about 50K without metadata) */
844     ret =
845         getsockopt (g_socket_get_fd (src->used_socket), SOL_SOCKET, SO_RCVBUF,
846         (void *) &rcvsize, &len);
847     if (ret == 0)
848       GST_DEBUG_OBJECT (src, "have udp buffer of %d bytes", rcvsize);
849     else
850       GST_DEBUG_OBJECT (src, "could not get udp buffer size");
851   }
852 #endif
853
854   g_socket_set_broadcast (src->used_socket, TRUE);
855
856   if (src->auto_multicast
857       &&
858       g_inet_address_get_is_multicast (g_inet_socket_address_get_address
859           (src->addr))) {
860     GST_DEBUG_OBJECT (src, "joining multicast group %s", src->host);
861     if (!g_socket_join_multicast_group (src->used_socket,
862             g_inet_socket_address_get_address (src->addr),
863             FALSE, src->multi_iface, &err))
864       goto membership;
865   }
866
867   /* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port
868    * follows ss_family on both */
869   {
870     GInetSocketAddress *addr;
871     guint16 port;
872
873     addr =
874         G_INET_SOCKET_ADDRESS (g_socket_get_local_address (src->used_socket,
875             &err));
876     if (!addr)
877       goto getsockname_error;
878
879     port = g_inet_socket_address_get_port (addr);
880     GST_DEBUG_OBJECT (src, "bound, on port %d", port);
881     if (port != src->port) {
882       src->port = port;
883       GST_DEBUG_OBJECT (src, "notifying port %d", port);
884       g_object_notify (G_OBJECT (src), "port");
885     }
886     g_object_unref (addr);
887   }
888
889   return TRUE;
890
891   /* ERRORS */
892 name_resolve:
893   {
894     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
895         ("Name resolval failed: %s", err->message));
896     g_clear_error (&err);
897     g_object_unref (resolver);
898     return FALSE;
899   }
900 no_socket:
901   {
902     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
903         ("no socket error: %s", err->message));
904     g_clear_error (&err);
905     g_object_unref (addr);
906     return FALSE;
907   }
908 bind_error:
909   {
910     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
911         ("bind failed: %s", err->message));
912     g_clear_error (&err);
913     g_object_unref (bind_saddr);
914     gst_udpsrc_stop (GST_BASE_SRC (src));
915     return FALSE;
916   }
917 membership:
918   {
919     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
920         ("could add membership: %s", err->message));
921     g_clear_error (&err);
922     gst_udpsrc_stop (GST_BASE_SRC (src));
923     return FALSE;
924   }
925 getsockname_error:
926   {
927     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
928         ("getsockname failed: %s", err->message));
929     g_clear_error (&err);
930     gst_udpsrc_stop (GST_BASE_SRC (src));
931     return FALSE;
932   }
933 }
934
935 static gboolean
936 gst_udpsrc_unlock (GstBaseSrc * bsrc)
937 {
938   GstUDPSrc *src;
939
940   src = GST_UDPSRC (bsrc);
941
942   GST_LOG_OBJECT (src, "Flushing");
943   g_cancellable_cancel (src->cancellable);
944
945   return TRUE;
946 }
947
948 static gboolean
949 gst_udpsrc_unlock_stop (GstBaseSrc * bsrc)
950 {
951   GstUDPSrc *src;
952
953   src = GST_UDPSRC (bsrc);
954
955   GST_LOG_OBJECT (src, "No longer flushing");
956   g_cancellable_reset (src->cancellable);
957
958   return TRUE;
959 }
960
961 static gboolean
962 gst_udpsrc_stop (GstBaseSrc * bsrc)
963 {
964   GstUDPSrc *src;
965
966   src = GST_UDPSRC (bsrc);
967
968   GST_DEBUG ("stopping, closing sockets");
969
970   if (src->used_socket) {
971     if (src->auto_multicast
972         &&
973         g_inet_address_get_is_multicast (g_inet_socket_address_get_address
974             (src->addr))) {
975       GError *err = NULL;
976
977       GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->host);
978
979       if (!g_socket_leave_multicast_group (src->used_socket,
980               g_inet_socket_address_get_address (src->addr), FALSE,
981               src->multi_iface, &err)) {
982         GST_ERROR_OBJECT (src, "Failed to leave multicast group: %s",
983             err->message);
984         g_clear_error (&err);
985       }
986     }
987
988     if (src->close_socket || !src->external_socket) {
989       GError *err = NULL;
990       if (!g_socket_close (src->used_socket, &err)) {
991         GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
992         g_clear_error (&err);
993       }
994     }
995
996     g_object_unref (src->used_socket);
997     src->used_socket = NULL;
998     g_object_unref (src->addr);
999     src->addr = NULL;
1000   }
1001
1002   return TRUE;
1003 }
1004
1005 /*** GSTURIHANDLER INTERFACE *************************************************/
1006
1007 static GstURIType
1008 gst_udpsrc_uri_get_type (GType type)
1009 {
1010   return GST_URI_SRC;
1011 }
1012
1013 static const gchar *const *
1014 gst_udpsrc_uri_get_protocols (GType type)
1015 {
1016   static const gchar *protocols[] = { "udp", NULL };
1017
1018   return protocols;
1019 }
1020
1021 static gchar *
1022 gst_udpsrc_uri_get_uri (GstURIHandler * handler)
1023 {
1024   GstUDPSrc *src = GST_UDPSRC (handler);
1025
1026   return g_strdup (src->uri);
1027 }
1028
1029 static gboolean
1030 gst_udpsrc_uri_set_uri (GstURIHandler * handler, const gchar * uri,
1031     GError ** error)
1032 {
1033   return gst_udpsrc_set_uri (GST_UDPSRC (handler), uri, error);
1034 }
1035
1036 static void
1037 gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data)
1038 {
1039   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1040
1041   iface->get_type = gst_udpsrc_uri_get_type;
1042   iface->get_protocols = gst_udpsrc_uri_get_protocols;
1043   iface->get_uri = gst_udpsrc_uri_get_uri;
1044   iface->set_uri = gst_udpsrc_uri_set_uri;
1045 }