Merging gst-editing-services
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / udp / gstudpsrc.c
1 /* GStreamer
2  * Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
3  * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
4  * Copyright (C) <2012> Collabora Ltd.
5  *   Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  * Copyright (C) 2014 Tim-Philipp Müller <tim@centricular.com>
7  * Copyright (C) 2014 Centricular Ltd
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24
25 /**
26  * SECTION:element-udpsrc
27  * @title: udpsrc
28  * @see_also: udpsink, multifdsink
29  *
30  * udpsrc is a network source that reads UDP packets from the network.
31  * It can be combined with RTP depayloaders to implement RTP streaming.
32  *
33  * The udpsrc element supports automatic port allocation by setting the
34  * #GstUDPSrc:port property to 0. After setting the udpsrc to PAUSED, the
35  * allocated port can be obtained by reading the port property.
36  *
37  * udpsrc can read from multicast groups by setting the #GstUDPSrc:multicast-group
38  * property to the IP address of the multicast group.
39  *
40  * Alternatively one can provide a custom socket to udpsrc with the #GstUDPSrc:socket
41  * property, udpsrc will then not allocate a socket itself but use the provided
42  * one.
43  *
44  * The #GstUDPSrc:caps property is mainly used to give a type to the UDP packet
45  * so that they can be autoplugged in GStreamer pipelines. This is very useful
46  * for RTP implementations where the contents of the UDP packets is transferred
47  * out-of-bounds using SDP or other means.
48  *
49  * The #GstUDPSrc:buffer-size property is used to change the default kernel
50  * buffersizes used for receiving packets. The buffer size may be increased for
51  * high-volume connections, or may be decreased to limit the possible backlog of
52  * incoming data. The system places an absolute limit on these values, on Linux,
53  * for example, the default buffer size is typically 50K and can be increased to
54  * maximally 100K.
55  *
56  * The #GstUDPSrc:skip-first-bytes property is used to strip off an arbitrary
57  * number of bytes from the start of the raw udp packet and can be used to strip
58  * off proprietary header, for example.
59  *
60  * The udpsrc is always a live source. It does however not provide a #GstClock,
61  * this is left for downstream elements such as an RTP session manager or demuxer
62  * (such as an MPEG demuxer). As with all live sources, the captured buffers
63  * will have their timestamp set to the current running time of the pipeline.
64  *
65  * udpsrc implements a #GstURIHandler interface that handles udp://host:port
66  * type URIs.
67  *
68  * If the #GstUDPSrc:timeout property is set to a value bigger than 0, udpsrc
69  * will generate an element message named `GstUDPSrcTimeout`
70  * if no data was received in the given timeout.
71  *
72  * The message's structure contains one field:
73  *
74  * * #guint64 `timeout`: the timeout in microseconds that expired when waiting for data.
75  *
76  * The message is typically used to detect that no UDP arrives in the receiver
77  * because it is blocked by a firewall.
78  *
79  * A custom file descriptor can be configured with the
80  * #GstUDPSrc:socket property. The socket will be closed when setting
81  * the element to READY by default. This behaviour can be overridden
82  * with the #GstUDPSrc:close-socket property, in which case the
83  * application is responsible for closing the file descriptor.
84  *
85  * ## Examples
86  * |[
87  * gst-launch-1.0 -v udpsrc ! fakesink dump=1
88  * ]| A pipeline to read from the default port and dump the udp packets.
89  * To actually generate udp packets on the default port one can use the
90  * udpsink element. When running the following pipeline in another terminal, the
91  * above mentioned pipeline should dump data packets to the console.
92  * |[
93  * gst-launch-1.0 -v audiotestsrc ! udpsink
94  * ]|
95  * |[
96  * gst-launch-1.0 -v udpsrc port=0 ! fakesink
97  * ]| read udp packets from a free port.
98  *
99  */
100 #ifdef HAVE_CONFIG_H
101 #include "config.h"
102 #endif
103
104 /* Needed to get struct in6_pktinfo.
105  * Also all these have to be before glib.h is included as
106  * otherwise struct in6_pktinfo is not defined completely
107  * due to broken glibc headers */
108 #define _GNU_SOURCE
109 /* Needed for OSX/iOS to define the IPv6 variants */
110 #define __APPLE_USE_RFC_3542
111 #include <sys/types.h>
112 #ifdef HAVE_SYS_SOCKET_H
113 #include <sys/socket.h>
114 #endif
115
116 #include <string.h>
117 #include "gstudpelements.h"
118 #include "gstudpsrc.h"
119
120 #include <gst/net/gstnetaddressmeta.h>
121
122 #include <gio/gnetworking.h>
123
124 /* Required for other parts of in_pktinfo / in6_pktinfo but only
125  * on non-Windows and can be included after glib.h */
126 #ifndef G_PLATFORM_WIN32
127 #include <netinet/ip.h>
128 #endif
129
130 /* Control messages for getting the destination address */
131 #ifdef IP_PKTINFO
132 GType gst_ip_pktinfo_message_get_type (void);
133
134 #define GST_TYPE_IP_PKTINFO_MESSAGE         (gst_ip_pktinfo_message_get_type ())
135 #define GST_IP_PKTINFO_MESSAGE(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), GST_TYPE_IP_PKTINFO_MESSAGE, GstIPPktinfoMessage))
136 #define GST_IP_PKTINFO_MESSAGE_CLASS(c)     (G_TYPE_CHECK_CLASS_CAST ((c), GST_TYPE_IP_PKTINFO_MESSAGE, GstIPPktinfoMessageClass))
137 #define GST_IS_IP_PKTINFO_MESSAGE(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), GST_TYPE_IP_PKTINFO_MESSAGE))
138 #define GST_IS_IP_PKTINFO_MESSAGE_CLASS(c)  (G_TYPE_CHECK_CLASS_TYPE ((c), GST_TYPE_IP_PKTINFO_MESSAGE))
139 #define GST_IP_PKTINFO_MESSAGE_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), GST_TYPE_IP_PKTINFO_MESSAGE, GstIPPktinfoMessageClass))
140
141 typedef struct _GstIPPktinfoMessage GstIPPktinfoMessage;
142 typedef struct _GstIPPktinfoMessageClass GstIPPktinfoMessageClass;
143
144 struct _GstIPPktinfoMessageClass
145 {
146   GSocketControlMessageClass parent_class;
147
148 };
149
150 struct _GstIPPktinfoMessage
151 {
152   GSocketControlMessage parent;
153
154   guint ifindex;
155 #ifndef G_PLATFORM_WIN32
156 #ifndef __NetBSD__
157   struct in_addr spec_dst;
158 #endif
159 #endif
160   struct in_addr addr;
161 };
162
163 G_DEFINE_TYPE (GstIPPktinfoMessage, gst_ip_pktinfo_message,
164     G_TYPE_SOCKET_CONTROL_MESSAGE);
165
166 static gsize
167 gst_ip_pktinfo_message_get_size (GSocketControlMessage * message)
168 {
169   return sizeof (struct in_pktinfo);
170 }
171
172 static int
173 gst_ip_pktinfo_message_get_level (GSocketControlMessage * message)
174 {
175   return IPPROTO_IP;
176 }
177
178 static int
179 gst_ip_pktinfo_message_get_msg_type (GSocketControlMessage * message)
180 {
181   return IP_PKTINFO;
182 }
183
184 static GSocketControlMessage *
185 gst_ip_pktinfo_message_deserialize (gint level,
186     gint type, gsize size, gpointer data)
187 {
188   struct in_pktinfo *pktinfo;
189   GstIPPktinfoMessage *message;
190
191   if (level != IPPROTO_IP || type != IP_PKTINFO)
192     return NULL;
193
194   if (size < sizeof (struct in_pktinfo))
195     return NULL;
196
197   pktinfo = data;
198
199   message = g_object_new (GST_TYPE_IP_PKTINFO_MESSAGE, NULL);
200   message->ifindex = pktinfo->ipi_ifindex;
201 #ifndef G_PLATFORM_WIN32
202 #ifndef __NetBSD__
203   message->spec_dst = pktinfo->ipi_spec_dst;
204 #endif
205 #endif
206   message->addr = pktinfo->ipi_addr;
207
208   return G_SOCKET_CONTROL_MESSAGE (message);
209 }
210
211 static void
212 gst_ip_pktinfo_message_init (GstIPPktinfoMessage * message)
213 {
214 }
215
216 static void
217 gst_ip_pktinfo_message_class_init (GstIPPktinfoMessageClass * class)
218 {
219   GSocketControlMessageClass *scm_class;
220
221   scm_class = G_SOCKET_CONTROL_MESSAGE_CLASS (class);
222   scm_class->get_size = gst_ip_pktinfo_message_get_size;
223   scm_class->get_level = gst_ip_pktinfo_message_get_level;
224   scm_class->get_type = gst_ip_pktinfo_message_get_msg_type;
225   scm_class->deserialize = gst_ip_pktinfo_message_deserialize;
226 }
227 #endif
228
229 #ifdef IPV6_PKTINFO
230 GType gst_ipv6_pktinfo_message_get_type (void);
231
232 #define GST_TYPE_IPV6_PKTINFO_MESSAGE         (gst_ipv6_pktinfo_message_get_type ())
233 #define GST_IPV6_PKTINFO_MESSAGE(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), GST_TYPE_IPV6_PKTINFO_MESSAGE, GstIPV6PktinfoMessage))
234 #define GST_IPV6_PKTINFO_MESSAGE_CLASS(c)     (G_TYPE_CHECK_CLASS_CAST ((c), GST_TYPE_IPV6_PKTINFO_MESSAGE, GstIPV6PktinfoMessageClass))
235 #define GST_IS_IPV6_PKTINFO_MESSAGE(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), GST_TYPE_IPV6_PKTINFO_MESSAGE))
236 #define GST_IS_IPV6_PKTINFO_MESSAGE_CLASS(c)  (G_TYPE_CHECK_CLASS_TYPE ((c), GST_TYPE_IPV6_PKTINFO_MESSAGE))
237 #define GST_IPV6_PKTINFO_MESSAGE_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), GST_TYPE_IPV6_PKTINFO_MESSAGE, GstIPV6PktinfoMessageClass))
238
239 typedef struct _GstIPV6PktinfoMessage GstIPV6PktinfoMessage;
240 typedef struct _GstIPV6PktinfoMessageClass GstIPV6PktinfoMessageClass;
241
242 struct _GstIPV6PktinfoMessageClass
243 {
244   GSocketControlMessageClass parent_class;
245
246 };
247
248 struct _GstIPV6PktinfoMessage
249 {
250   GSocketControlMessage parent;
251
252   guint ifindex;
253   struct in6_addr addr;
254 };
255
256 G_DEFINE_TYPE (GstIPV6PktinfoMessage, gst_ipv6_pktinfo_message,
257     G_TYPE_SOCKET_CONTROL_MESSAGE);
258
259 static gsize
260 gst_ipv6_pktinfo_message_get_size (GSocketControlMessage * message)
261 {
262   return sizeof (struct in6_pktinfo);
263 }
264
265 static int
266 gst_ipv6_pktinfo_message_get_level (GSocketControlMessage * message)
267 {
268   return IPPROTO_IPV6;
269 }
270
271 static int
272 gst_ipv6_pktinfo_message_get_msg_type (GSocketControlMessage * message)
273 {
274   return IPV6_PKTINFO;
275 }
276
277 static GSocketControlMessage *
278 gst_ipv6_pktinfo_message_deserialize (gint level,
279     gint type, gsize size, gpointer data)
280 {
281   struct in6_pktinfo *pktinfo;
282   GstIPV6PktinfoMessage *message;
283
284   if (level != IPPROTO_IPV6 || type != IPV6_PKTINFO)
285     return NULL;
286
287   if (size < sizeof (struct in6_pktinfo))
288     return NULL;
289
290   pktinfo = data;
291
292   message = g_object_new (GST_TYPE_IPV6_PKTINFO_MESSAGE, NULL);
293   message->ifindex = pktinfo->ipi6_ifindex;
294   message->addr = pktinfo->ipi6_addr;
295
296   return G_SOCKET_CONTROL_MESSAGE (message);
297 }
298
299 static void
300 gst_ipv6_pktinfo_message_init (GstIPV6PktinfoMessage * message)
301 {
302 }
303
304 static void
305 gst_ipv6_pktinfo_message_class_init (GstIPV6PktinfoMessageClass * class)
306 {
307   GSocketControlMessageClass *scm_class;
308
309   scm_class = G_SOCKET_CONTROL_MESSAGE_CLASS (class);
310   scm_class->get_size = gst_ipv6_pktinfo_message_get_size;
311   scm_class->get_level = gst_ipv6_pktinfo_message_get_level;
312   scm_class->get_type = gst_ipv6_pktinfo_message_get_msg_type;
313   scm_class->deserialize = gst_ipv6_pktinfo_message_deserialize;
314 }
315
316 #endif
317
318 #ifdef IP_RECVDSTADDR
319 GType gst_ip_recvdstaddr_message_get_type (void);
320
321 #define GST_TYPE_IP_RECVDSTADDR_MESSAGE         (gst_ip_recvdstaddr_message_get_type ())
322 #define GST_IP_RECVDSTADDR_MESSAGE(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), GST_TYPE_IP_RECVDSTADDR_MESSAGE, GstIPRecvdstaddrMessage))
323 #define GST_IP_RECVDSTADDR_MESSAGE_CLASS(c)     (G_TYPE_CHECK_CLASS_CAST ((c), GST_TYPE_IP_RECVDSTADDR_MESSAGE, GstIPRecvdstaddrMessageClass))
324 #define GST_IS_IP_RECVDSTADDR_MESSAGE(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), GST_TYPE_IP_RECVDSTADDR_MESSAGE))
325 #define GST_IS_IP_RECVDSTADDR_MESSAGE_CLASS(c)  (G_TYPE_CHECK_CLASS_TYPE ((c), GST_TYPE_IP_RECVDSTADDR_MESSAGE))
326 #define GST_IP_RECVDSTADDR_MESSAGE_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), GST_TYPE_IP_RECVDSTADDR_MESSAGE, GstIPRecvdstaddrMessageClass))
327
328 typedef struct _GstIPRecvdstaddrMessage GstIPRecvdstaddrMessage;
329 typedef struct _GstIPRecvdstaddrMessageClass GstIPRecvdstaddrMessageClass;
330
331 struct _GstIPRecvdstaddrMessageClass
332 {
333   GSocketControlMessageClass parent_class;
334
335 };
336
337 struct _GstIPRecvdstaddrMessage
338 {
339   GSocketControlMessage parent;
340
341   guint ifindex;
342   struct in_addr addr;
343 };
344
345 G_DEFINE_TYPE (GstIPRecvdstaddrMessage, gst_ip_recvdstaddr_message,
346     G_TYPE_SOCKET_CONTROL_MESSAGE);
347
348 static gsize
349 gst_ip_recvdstaddr_message_get_size (GSocketControlMessage * message)
350 {
351   return sizeof (struct in_addr);
352 }
353
354 static int
355 gst_ip_recvdstaddr_message_get_level (GSocketControlMessage * message)
356 {
357   return IPPROTO_IP;
358 }
359
360 static int
361 gst_ip_recvdstaddr_message_get_msg_type (GSocketControlMessage * message)
362 {
363   return IP_RECVDSTADDR;
364 }
365
366 static GSocketControlMessage *
367 gst_ip_recvdstaddr_message_deserialize (gint level,
368     gint type, gsize size, gpointer data)
369 {
370   struct in_addr *addr;
371   GstIPRecvdstaddrMessage *message;
372
373   if (level != IPPROTO_IP || type != IP_RECVDSTADDR)
374     return NULL;
375
376   if (size < sizeof (struct in_addr))
377     return NULL;
378
379   addr = data;
380
381   message = g_object_new (GST_TYPE_IP_RECVDSTADDR_MESSAGE, NULL);
382   message->addr = *addr;
383
384   return G_SOCKET_CONTROL_MESSAGE (message);
385 }
386
387 static void
388 gst_ip_recvdstaddr_message_init (GstIPRecvdstaddrMessage * message)
389 {
390 }
391
392 static void
393 gst_ip_recvdstaddr_message_class_init (GstIPRecvdstaddrMessageClass * class)
394 {
395   GSocketControlMessageClass *scm_class;
396
397   scm_class = G_SOCKET_CONTROL_MESSAGE_CLASS (class);
398   scm_class->get_size = gst_ip_recvdstaddr_message_get_size;
399   scm_class->get_level = gst_ip_recvdstaddr_message_get_level;
400   scm_class->get_type = gst_ip_recvdstaddr_message_get_msg_type;
401   scm_class->deserialize = gst_ip_recvdstaddr_message_deserialize;
402 }
403 #endif
404
405 #define GST_TYPE_SOCKET_TIMESTAMP_MODE gst_socket_timestamp_mode_get_type()
406 #define GST_SOCKET_TIMESTAMP_MODE (gst_socket_timestamp_mode_get_type ())
407 static GType
408 gst_socket_timestamp_mode_get_type (void)
409 {
410   static GType socket_timestamp_mode_type = 0;
411   static const GEnumValue socket_timestamp_mode_types[] = {
412     {GST_SOCKET_TIMESTAMP_MODE_DISABLED, "Disable additional timestamps",
413         "disabled"},
414     {GST_SOCKET_TIMESTAMP_MODE_REALTIME,
415           "Timestamp with realtime clock (nsec resolution, may not be monotonic)",
416         "realtime"},
417     {0, NULL, NULL}
418   };
419
420   if (!socket_timestamp_mode_type)
421     socket_timestamp_mode_type =
422         g_enum_register_static ("GstSocketTimestampMode",
423         socket_timestamp_mode_types);
424
425   return socket_timestamp_mode_type;
426 }
427
428 #ifdef SO_TIMESTAMPNS
429 GType gst_socket_timestamp_message_get_type (void);
430
431 #define GST_TYPE_SOCKET_TIMESTAMP_MESSAGE          (gst_socket_timestamp_message_get_type ())
432 #define GST_SOCKET_TIMESTAMP_MESSAGE(o)            (G_TYPE_CHECK_INSTANCE_CAST ((o), GST_TYPE_SOCKET_TIMESTAMP_MESSAGE, GstSocketTimestampMessage))
433 #define GST_SOCKET_TIMESTAMP_MESSAGE_CLASS(c)      (G_TYPE_CHECK_CLASS_CAST ((c), GST_TYPE_SOCKET_TIMESTAMP_MESSAGE, GstSocketTimestampMessageClass))
434 #define GST_IS_SOCKET_TIMESTAMP_MESSAGE(o)         (G_TYPE_CHECK_INSTANCE_TYPE ((o), GST_TYPE_SOCKET_TIMESTAMP_MESSAGE))
435 #define GST_IS_SOCKET_TIMESTAMP_MESSAGE_CLASS(c)   (G_TYPE_CHECK_CLASS_TYPE ((c), GST_TYPE_SOCKET_TIMESTAMP_MESSAGE))
436 #define GST_SOCKET_TIMESTAMP_MESSAGE_GET_CLASS(o)  (G_TYPE_INSTANCE_GET_CLASS ((o), GST_TYPE_SOCKET_TIMESTAMP_MESSAGE, GstSocketTimestampMessageClass))
437
438 typedef struct _GstSocketTimestampMessage GstSocketTimestampMessage;
439 typedef struct _GstSocketTimestampMessageClass GstSocketTimestampMessageClass;
440
441 struct _GstSocketTimestampMessageClass
442 {
443   GSocketControlMessageClass parent_class;
444 };
445
446 struct _GstSocketTimestampMessage
447 {
448   GSocketControlMessage parent;
449   struct timespec socket_ts;
450 };
451
452 G_DEFINE_TYPE (GstSocketTimestampMessage, gst_socket_timestamp_message,
453     G_TYPE_SOCKET_CONTROL_MESSAGE);
454
455 static gsize
456 gst_socket_timestamp_message_get_size (GSocketControlMessage * message)
457 {
458   return sizeof (struct timespec);
459 }
460
461 static int
462 gst_socket_timestamp_message_get_level (GSocketControlMessage * message)
463 {
464   return SOL_SOCKET;
465 }
466
467 static int
468 gst_socket_timestamp_message_get_msg_type (GSocketControlMessage * message)
469 {
470   return SCM_TIMESTAMPNS;
471 }
472
473 static GSocketControlMessage *
474 gst_socket_timestamp_message_deserialize (gint level,
475     gint type, gsize size, gpointer data)
476 {
477   GstSocketTimestampMessage *message;
478
479   if (level != SOL_SOCKET)
480     return NULL;
481
482   if (size < sizeof (struct timespec))
483     return NULL;
484
485   message = g_object_new (GST_TYPE_SOCKET_TIMESTAMP_MESSAGE, NULL);
486   memcpy (&message->socket_ts, data, sizeof (struct timespec));
487
488   return G_SOCKET_CONTROL_MESSAGE (message);
489 }
490
491 static void
492 gst_socket_timestamp_message_init (GstSocketTimestampMessage * message)
493 {
494 }
495
496 static void
497 gst_socket_timestamp_message_class_init (GstSocketTimestampMessageClass * class)
498 {
499   GSocketControlMessageClass *scm_class;
500
501   scm_class = G_SOCKET_CONTROL_MESSAGE_CLASS (class);
502   scm_class->get_size = gst_socket_timestamp_message_get_size;
503   scm_class->get_level = gst_socket_timestamp_message_get_level;
504   scm_class->get_type = gst_socket_timestamp_message_get_msg_type;
505   scm_class->deserialize = gst_socket_timestamp_message_deserialize;
506 }
507 #endif
508
509 static gboolean
510 gst_udpsrc_decide_allocation (GstBaseSrc * bsrc, GstQuery * query)
511 {
512   GstUDPSrc *udpsrc;
513   GstBufferPool *pool;
514   gboolean update;
515   GstStructure *config;
516   GstCaps *caps = NULL;
517
518   udpsrc = GST_UDPSRC (bsrc);
519
520   if (gst_query_get_n_allocation_pools (query) > 0) {
521     update = TRUE;
522   } else {
523     update = FALSE;
524   }
525
526   pool = gst_buffer_pool_new ();
527
528   config = gst_buffer_pool_get_config (pool);
529
530   gst_query_parse_allocation (query, &caps, NULL);
531
532   gst_buffer_pool_config_set_params (config, caps, udpsrc->mtu, 0, 0);
533
534   gst_buffer_pool_set_config (pool, config);
535
536   if (update)
537     gst_query_set_nth_allocation_pool (query, 0, pool, udpsrc->mtu, 0, 0);
538   else
539     gst_query_add_allocation_pool (query, pool, udpsrc->mtu, 0, 0);
540
541   gst_object_unref (pool);
542
543   return TRUE;
544 }
545
546 /* not 100% correct, but a good upper bound for memory allocation purposes */
547 #define MAX_IPV4_UDP_PACKET_SIZE (65536 - 8)
548
549 GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
550 #define GST_CAT_DEFAULT (udpsrc_debug)
551
552 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
553     GST_PAD_SRC,
554     GST_PAD_ALWAYS,
555     GST_STATIC_CAPS_ANY);
556
557 #define UDP_DEFAULT_PORT                5004
558 #define UDP_DEFAULT_MULTICAST_GROUP     "0.0.0.0"
559 #define UDP_DEFAULT_MULTICAST_IFACE     NULL
560 #define UDP_DEFAULT_URI                 "udp://"UDP_DEFAULT_MULTICAST_GROUP":"G_STRINGIFY(UDP_DEFAULT_PORT)
561 #define UDP_DEFAULT_CAPS                NULL
562 #define UDP_DEFAULT_SOCKET              NULL
563 #define UDP_DEFAULT_BUFFER_SIZE         0
564 #define UDP_DEFAULT_TIMEOUT             0
565 #define UDP_DEFAULT_SKIP_FIRST_BYTES    0
566 #define UDP_DEFAULT_CLOSE_SOCKET       TRUE
567 #define UDP_DEFAULT_USED_SOCKET        NULL
568 #define UDP_DEFAULT_AUTO_MULTICAST     TRUE
569 #define UDP_DEFAULT_REUSE              TRUE
570 #define UDP_DEFAULT_LOOP               TRUE
571 #define UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS TRUE
572 #define UDP_DEFAULT_MTU                (1492)
573
574 enum
575 {
576   PROP_0,
577
578   PROP_PORT,
579   PROP_MULTICAST_GROUP,
580   PROP_MULTICAST_IFACE,
581   PROP_URI,
582   PROP_CAPS,
583   PROP_SOCKET,
584   PROP_BUFFER_SIZE,
585   PROP_TIMEOUT,
586   PROP_SKIP_FIRST_BYTES,
587   PROP_CLOSE_SOCKET,
588   PROP_USED_SOCKET,
589   PROP_AUTO_MULTICAST,
590   PROP_REUSE,
591   PROP_ADDRESS,
592   PROP_LOOP,
593   PROP_RETRIEVE_SENDER_ADDRESS,
594   PROP_MTU,
595   PROP_SOCKET_TIMESTAMP,
596 };
597
598 static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
599
600 static GstCaps *gst_udpsrc_getcaps (GstBaseSrc * src, GstCaps * filter);
601 static gboolean gst_udpsrc_close (GstUDPSrc * src);
602 static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc);
603 static gboolean gst_udpsrc_unlock_stop (GstBaseSrc * bsrc);
604 static GstFlowReturn gst_udpsrc_fill (GstPushSrc * psrc, GstBuffer * outbuf);
605
606 static void gst_udpsrc_finalize (GObject * object);
607
608 static void gst_udpsrc_set_property (GObject * object, guint prop_id,
609     const GValue * value, GParamSpec * pspec);
610 static void gst_udpsrc_get_property (GObject * object, guint prop_id,
611     GValue * value, GParamSpec * pspec);
612
613 static GstStateChangeReturn gst_udpsrc_change_state (GstElement * element,
614     GstStateChange transition);
615
616 #define gst_udpsrc_parent_class parent_class
617 G_DEFINE_TYPE_WITH_CODE (GstUDPSrc, gst_udpsrc, GST_TYPE_PUSH_SRC,
618     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_udpsrc_uri_handler_init));
619 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (udpsrc, "udpsrc", GST_RANK_NONE,
620     GST_TYPE_UDPSRC, udp_element_init (plugin));
621
622 static void
623 gst_udpsrc_class_init (GstUDPSrcClass * klass)
624 {
625   GObjectClass *gobject_class;
626   GstElementClass *gstelement_class;
627   GstBaseSrcClass *gstbasesrc_class;
628   GstPushSrcClass *gstpushsrc_class;
629
630   gobject_class = (GObjectClass *) klass;
631   gstelement_class = (GstElementClass *) klass;
632   gstbasesrc_class = (GstBaseSrcClass *) klass;
633   gstpushsrc_class = (GstPushSrcClass *) klass;
634
635   GST_DEBUG_CATEGORY_INIT (udpsrc_debug, "udpsrc", 0, "UDP src");
636
637 #ifdef IP_PKTINFO
638   GST_TYPE_IP_PKTINFO_MESSAGE;
639 #endif
640 #ifdef IPV6_PKTINFO
641   GST_TYPE_IPV6_PKTINFO_MESSAGE;
642 #endif
643 #ifdef IP_RECVDSTADDR
644   GST_TYPE_IP_RECVDSTADDR_MESSAGE;
645 #endif
646 #ifdef SO_TIMESTAMPNS
647   GST_TYPE_SOCKET_TIMESTAMP_MESSAGE;
648 #endif
649
650   gobject_class->set_property = gst_udpsrc_set_property;
651   gobject_class->get_property = gst_udpsrc_get_property;
652   gobject_class->finalize = gst_udpsrc_finalize;
653
654   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PORT,
655       g_param_spec_int ("port", "Port",
656           "The port to receive the packets from, 0=allocate", 0, G_MAXUINT16,
657           UDP_DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
658   /* FIXME 2.0: Remove multicast-group property */
659 #ifndef GST_REMOVE_DEPRECATED
660   g_object_class_install_property (gobject_class, PROP_MULTICAST_GROUP,
661       g_param_spec_string ("multicast-group", "Multicast Group",
662           "The Address of multicast group to join. (DEPRECATED: "
663           "Use address property instead)", UDP_DEFAULT_MULTICAST_GROUP,
664           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
665 #endif
666   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
667       g_param_spec_string ("multicast-iface", "Multicast Interface",
668           "The network interface on which to join the multicast group."
669           "This allows multiple interfaces separated by comma. (\"eth0,eth1\")",
670           UDP_DEFAULT_MULTICAST_IFACE,
671           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
672   g_object_class_install_property (gobject_class, PROP_URI,
673       g_param_spec_string ("uri", "URI",
674           "URI in the form of udp://multicast_group:port", UDP_DEFAULT_URI,
675           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
676   g_object_class_install_property (gobject_class, PROP_CAPS,
677       g_param_spec_boxed ("caps", "Caps",
678           "The caps of the source pad", GST_TYPE_CAPS,
679           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
680   g_object_class_install_property (gobject_class, PROP_SOCKET,
681       g_param_spec_object ("socket", "Socket",
682           "Socket to use for UDP reception. (NULL == allocate)",
683           G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
684   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
685       g_param_spec_int ("buffer-size", "Buffer Size",
686           "Size of the kernel receive buffer in bytes, 0=default", 0, G_MAXINT,
687           UDP_DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
688   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
689       g_param_spec_uint64 ("timeout", "Timeout",
690           "Post a message after timeout nanoseconds (0 = disabled)", 0,
691           G_MAXUINT64, UDP_DEFAULT_TIMEOUT,
692           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
693   g_object_class_install_property (G_OBJECT_CLASS (klass),
694       PROP_SKIP_FIRST_BYTES, g_param_spec_int ("skip-first-bytes",
695           "Skip first bytes", "number of bytes to skip for each udp packet", 0,
696           G_MAXINT, UDP_DEFAULT_SKIP_FIRST_BYTES,
697           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
698   g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
699       g_param_spec_boolean ("close-socket", "Close socket",
700           "Close socket if passed as property on state change",
701           UDP_DEFAULT_CLOSE_SOCKET,
702           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
703   g_object_class_install_property (gobject_class, PROP_USED_SOCKET,
704       g_param_spec_object ("used-socket", "Socket Handle",
705           "Socket currently in use for UDP reception. (NULL = no socket)",
706           G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
707   g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
708       g_param_spec_boolean ("auto-multicast", "Auto Multicast",
709           "Automatically join/leave multicast groups",
710           UDP_DEFAULT_AUTO_MULTICAST,
711           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
712   g_object_class_install_property (gobject_class, PROP_REUSE,
713       g_param_spec_boolean ("reuse", "Reuse", "Enable reuse of the port",
714           UDP_DEFAULT_REUSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
715   g_object_class_install_property (gobject_class, PROP_ADDRESS,
716       g_param_spec_string ("address", "Address",
717           "Address to receive packets for. This is equivalent to the "
718           "multicast-group property for now", UDP_DEFAULT_MULTICAST_GROUP,
719           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
720   /**
721    * GstUDPSrc:loop:
722    *
723    * Can be used to disable multicast loopback.
724    *
725    * Since: 1.8
726    */
727   g_object_class_install_property (gobject_class, PROP_LOOP,
728       g_param_spec_boolean ("loop", "Multicast Loopback",
729           "Used for setting the multicast loop parameter. TRUE = enable,"
730           " FALSE = disable", UDP_DEFAULT_LOOP,
731           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
732   /**
733    * GstUDPSrc:retrieve-sender-address:
734    *
735    * Whether to retrieve the sender address and add it to the buffers as
736    * meta. Disabling this might result in minor performance improvements
737    * in certain scenarios.
738    *
739    * Since: 1.10
740    */
741   g_object_class_install_property (gobject_class, PROP_RETRIEVE_SENDER_ADDRESS,
742       g_param_spec_boolean ("retrieve-sender-address",
743           "Retrieve Sender Address",
744           "Whether to retrieve the sender address and add it to buffers as "
745           "meta. Disabling this might result in minor performance improvements "
746           "in certain scenarios", UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS,
747           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
748   /**
749    * GstUDPSrc:mtu:
750    *
751    * Maximum expected packet size. This directly defines the allocation
752    * size of the receive buffer pool.
753    *
754    * In case more data is received, a new #GstMemory is appended to the
755    * output buffer, ensuring no data is lost, this however leads to that
756    * buffer being freed and reallocated.
757    *
758    * Since: 1.14
759    */
760   g_object_class_install_property (gobject_class, PROP_MTU,
761       g_param_spec_uint ("mtu", "Expected Maximum Transmission Unit",
762           "Maximum expected packet size. This directly defines the allocation"
763           "size of the receive buffer pool.",
764           0, G_MAXINT, UDP_DEFAULT_MTU,
765           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
766
767   /**
768    * GstUDPSrc:socket-timestamp:
769    *
770    * Can be used to read the timestamp on incoming buffers using socket
771    * control messages and set as the DTS.
772    *
773    * Since: 1.20
774    */
775   g_object_class_install_property (gobject_class, PROP_SOCKET_TIMESTAMP,
776       g_param_spec_enum ("socket-timestamp",
777           "Use Socket Control Message Timestamp for DTS",
778           "Used for adding alternative timestamp using SO_TIMESTAMP.",
779           GST_SOCKET_TIMESTAMP_MODE, GST_SOCKET_TIMESTAMP_MODE_REALTIME,
780           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
781
782   gst_element_class_add_static_pad_template (gstelement_class, &src_template);
783
784   gst_element_class_set_static_metadata (gstelement_class,
785       "UDP packet receiver", "Source/Network",
786       "Receive data over the network via UDP",
787       "Wim Taymans <wim@fluendo.com>, "
788       "Thijs Vermeir <thijs.vermeir@barco.com>");
789
790   gstelement_class->change_state = gst_udpsrc_change_state;
791
792   gstbasesrc_class->unlock = gst_udpsrc_unlock;
793   gstbasesrc_class->unlock_stop = gst_udpsrc_unlock_stop;
794   gstbasesrc_class->get_caps = gst_udpsrc_getcaps;
795   gstbasesrc_class->decide_allocation = gst_udpsrc_decide_allocation;
796
797   gstpushsrc_class->fill = gst_udpsrc_fill;
798
799   gst_type_mark_as_plugin_api (GST_TYPE_SOCKET_TIMESTAMP_MODE, 0);
800 }
801
802 static void
803 gst_udpsrc_init (GstUDPSrc * udpsrc)
804 {
805   udpsrc->uri =
806       g_strdup_printf ("udp://%s:%u", UDP_DEFAULT_MULTICAST_GROUP,
807       UDP_DEFAULT_PORT);
808
809   udpsrc->address = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
810   udpsrc->port = UDP_DEFAULT_PORT;
811   udpsrc->socket = UDP_DEFAULT_SOCKET;
812   udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
813   udpsrc->buffer_size = UDP_DEFAULT_BUFFER_SIZE;
814   udpsrc->timeout = UDP_DEFAULT_TIMEOUT;
815   udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
816   udpsrc->close_socket = UDP_DEFAULT_CLOSE_SOCKET;
817   udpsrc->external_socket = (udpsrc->socket != NULL);
818   udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
819   udpsrc->used_socket = UDP_DEFAULT_USED_SOCKET;
820   udpsrc->reuse = UDP_DEFAULT_REUSE;
821   udpsrc->loop = UDP_DEFAULT_LOOP;
822   udpsrc->retrieve_sender_address = UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS;
823   udpsrc->mtu = UDP_DEFAULT_MTU;
824
825   /* configure basesrc to be a live source */
826   gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
827   /* make basesrc output a segment in time */
828   gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
829   /* make basesrc set timestamps on outgoing buffers based on the running_time
830    * when they were captured */
831   gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
832 }
833
834 static void
835 gst_udpsrc_finalize (GObject * object)
836 {
837   GstUDPSrc *udpsrc;
838
839   udpsrc = GST_UDPSRC (object);
840
841   if (udpsrc->caps)
842     gst_caps_unref (udpsrc->caps);
843   udpsrc->caps = NULL;
844
845   g_free (udpsrc->multi_iface);
846   udpsrc->multi_iface = NULL;
847
848   g_free (udpsrc->uri);
849   udpsrc->uri = NULL;
850
851   g_free (udpsrc->address);
852   udpsrc->address = NULL;
853
854   if (udpsrc->socket)
855     g_object_unref (udpsrc->socket);
856   udpsrc->socket = NULL;
857
858   if (udpsrc->used_socket)
859     g_object_unref (udpsrc->used_socket);
860   udpsrc->used_socket = NULL;
861
862   if (udpsrc->extra_mem)
863     gst_memory_unref (udpsrc->extra_mem);
864   udpsrc->extra_mem = NULL;
865
866   G_OBJECT_CLASS (parent_class)->finalize (object);
867 }
868
869 static GstCaps *
870 gst_udpsrc_getcaps (GstBaseSrc * src, GstCaps * filter)
871 {
872   GstUDPSrc *udpsrc;
873   GstCaps *caps, *result;
874
875   udpsrc = GST_UDPSRC (src);
876
877   GST_OBJECT_LOCK (src);
878   if ((caps = udpsrc->caps))
879     gst_caps_ref (caps);
880   GST_OBJECT_UNLOCK (src);
881
882   if (caps) {
883     if (filter) {
884       result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
885       gst_caps_unref (caps);
886     } else {
887       result = caps;
888     }
889   } else {
890     result = (filter) ? gst_caps_ref (filter) : gst_caps_new_any ();
891   }
892   return result;
893 }
894
895 static void
896 gst_udpsrc_create_cancellable (GstUDPSrc * src)
897 {
898   GPollFD pollfd;
899
900   src->cancellable = g_cancellable_new ();
901   src->made_cancel_fd = g_cancellable_make_pollfd (src->cancellable, &pollfd);
902 }
903
904 static void
905 gst_udpsrc_free_cancellable (GstUDPSrc * src)
906 {
907   if (src->made_cancel_fd) {
908     g_cancellable_release_fd (src->cancellable);
909     src->made_cancel_fd = FALSE;
910   }
911   g_object_unref (src->cancellable);
912   src->cancellable = NULL;
913 }
914
915 static GstFlowReturn
916 gst_udpsrc_fill (GstPushSrc * psrc, GstBuffer * outbuf)
917 {
918   GstUDPSrc *udpsrc;
919   GSocketAddress *saddr = NULL;
920   GSocketAddress **p_saddr;
921   gint flags = G_SOCKET_MSG_NONE;
922   gboolean try_again;
923   GError *err = NULL;
924   gssize res;
925   gsize offset;
926   GSocketControlMessage **msgs = NULL;
927   GSocketControlMessage ***p_msgs;
928   gint n_msgs = 0, i;
929   GstMapInfo info;
930   GstMapInfo extra_info;
931   GInputVector ivec[2];
932
933   udpsrc = GST_UDPSRC_CAST (psrc);
934
935   /* optimization: use messages only in multicast mode and
936    * if we can't let the kernel do the filtering for us */
937   p_msgs =
938       (g_inet_address_get_is_multicast (g_inet_socket_address_get_address
939           (udpsrc->addr))) ? &msgs : NULL;
940 #ifdef IP_MULTICAST_ALL
941   if (g_inet_address_get_family (g_inet_socket_address_get_address
942           (udpsrc->addr)) == G_SOCKET_FAMILY_IPV4)
943     p_msgs = NULL;
944 #endif
945 #ifdef SO_TIMESTAMPNS
946   if (udpsrc->socket_timestamp_mode == GST_SOCKET_TIMESTAMP_MODE_REALTIME)
947     p_msgs = &msgs;
948 #endif
949
950   /* Retrieve sender address unless we've been configured not to do so */
951   p_saddr = (udpsrc->retrieve_sender_address) ? &saddr : NULL;
952
953   if (!gst_buffer_map (outbuf, &info, GST_MAP_READWRITE))
954     goto buffer_map_error;
955
956   ivec[0].buffer = info.data;
957   ivec[0].size = info.size;
958
959   /* Prepare memory in case the data size exceeds mtu */
960   if (udpsrc->extra_mem == NULL) {
961     GstBufferPool *pool;
962     GstStructure *config;
963     GstAllocator *allocator = NULL;
964     GstAllocationParams params;
965
966     pool = gst_base_src_get_buffer_pool (GST_BASE_SRC_CAST (psrc));
967     config = gst_buffer_pool_get_config (pool);
968     gst_buffer_pool_config_get_allocator (config, &allocator, &params);
969
970     udpsrc->extra_mem =
971         gst_allocator_alloc (allocator, MAX_IPV4_UDP_PACKET_SIZE, &params);
972
973     gst_object_unref (pool);
974     gst_structure_free (config);
975     if (allocator)
976       gst_object_unref (allocator);
977   }
978
979   if (!gst_memory_map (udpsrc->extra_mem, &extra_info, GST_MAP_READWRITE))
980     goto memory_map_error;
981
982   ivec[1].buffer = extra_info.data;
983   ivec[1].size = extra_info.size;
984
985 retry:
986   if (saddr != NULL) {
987     g_object_unref (saddr);
988     saddr = NULL;
989   }
990
991   do {
992     gint64 timeout;
993
994     try_again = FALSE;
995
996     if (udpsrc->timeout)
997       timeout = udpsrc->timeout / 1000;
998     else
999       timeout = -1;
1000
1001     GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GINT64_FORMAT, timeout);
1002
1003     if (!g_socket_condition_timed_wait (udpsrc->used_socket, G_IO_IN | G_IO_PRI,
1004             timeout, udpsrc->cancellable, &err)) {
1005       if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)
1006           || g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
1007         goto stopped;
1008       } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
1009         g_clear_error (&err);
1010         /* timeout, post element message */
1011         gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
1012             gst_message_new_element (GST_OBJECT_CAST (udpsrc),
1013                 gst_structure_new ("GstUDPSrcTimeout",
1014                     "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
1015       } else {
1016         goto select_error;
1017       }
1018
1019       try_again = TRUE;
1020     }
1021   } while (G_UNLIKELY (try_again));
1022
1023   res =
1024       g_socket_receive_message (udpsrc->used_socket, p_saddr, ivec, 2,
1025       p_msgs, &n_msgs, &flags, udpsrc->cancellable, &err);
1026
1027   if (G_UNLIKELY (res < 0)) {
1028     /* G_IO_ERROR_HOST_UNREACHABLE for a UDP socket means that a packet sent
1029      * with udpsink generated a "port unreachable" ICMP response. We ignore
1030      * that and try again.
1031      * On Windows we get G_IO_ERROR_CONNECTION_CLOSED instead */
1032     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_HOST_UNREACHABLE) ||
1033         g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) {
1034       g_clear_error (&err);
1035       goto retry;
1036     }
1037     goto receive_error;
1038   }
1039
1040   /* Retry if multicast and the destination address is not ours. We don't want
1041    * to receive arbitrary packets */
1042   if (p_msgs) {
1043     GInetAddress *iaddr = g_inet_socket_address_get_address (udpsrc->addr);
1044     gboolean skip_packet = FALSE;
1045     gsize iaddr_size = g_inet_address_get_native_size (iaddr);
1046     const guint8 *iaddr_bytes = g_inet_address_to_bytes (iaddr);
1047
1048     for (i = 0; i < n_msgs && !skip_packet; i++) {
1049 #ifdef IP_PKTINFO
1050       if (GST_IS_IP_PKTINFO_MESSAGE (msgs[i])) {
1051         GstIPPktinfoMessage *msg = GST_IP_PKTINFO_MESSAGE (msgs[i]);
1052
1053         if (sizeof (msg->addr) == iaddr_size
1054             && memcmp (iaddr_bytes, &msg->addr, sizeof (msg->addr)))
1055           skip_packet = TRUE;
1056       }
1057 #endif
1058 #ifdef IPV6_PKTINFO
1059       if (GST_IS_IPV6_PKTINFO_MESSAGE (msgs[i])) {
1060         GstIPV6PktinfoMessage *msg = GST_IPV6_PKTINFO_MESSAGE (msgs[i]);
1061
1062         if (sizeof (msg->addr) == iaddr_size
1063             && memcmp (iaddr_bytes, &msg->addr, sizeof (msg->addr)))
1064           skip_packet = TRUE;
1065       }
1066 #endif
1067 #ifdef IP_RECVDSTADDR
1068       if (GST_IS_IP_RECVDSTADDR_MESSAGE (msgs[i])) {
1069         GstIPRecvdstaddrMessage *msg = GST_IP_RECVDSTADDR_MESSAGE (msgs[i]);
1070
1071         if (sizeof (msg->addr) == iaddr_size
1072             && memcmp (iaddr_bytes, &msg->addr, sizeof (msg->addr)))
1073           skip_packet = TRUE;
1074       }
1075 #endif
1076 #ifdef SO_TIMESTAMPNS
1077       if (GST_IS_SOCKET_TIMESTAMP_MESSAGE (msgs[i])) {
1078         GstSocketTimestampMessage *msg = GST_SOCKET_TIMESTAMP_MESSAGE (msgs[i]);
1079         GstClock *clock;
1080         GstClockTime socket_ts;
1081
1082         socket_ts = GST_TIMESPEC_TO_TIME (msg->socket_ts);
1083         GST_TRACE_OBJECT (udpsrc,
1084             "Got SCM_TIMESTAMPNS %" GST_TIME_FORMAT " in msg",
1085             GST_TIME_ARGS (socket_ts));
1086
1087         clock = gst_element_get_clock (GST_ELEMENT_CAST (udpsrc));
1088         if (clock != NULL) {
1089           gint64 adjust_dts, cur_sys_time, delta;
1090           GstClockTime base_time, cur_gst_clk_time, running_time;
1091
1092           /*
1093            * We use g_get_real_time as the time reference for SCM timestamps
1094            * is always CLOCK_REALTIME.
1095            */
1096           cur_sys_time = g_get_real_time () * GST_USECOND;
1097           cur_gst_clk_time = gst_clock_get_time (clock);
1098
1099           delta = (gint64) cur_sys_time - (gint64) socket_ts;
1100           if (delta < 0) {
1101             /*
1102              * The current system time will always be greater than the SCM
1103              * timestamp as the packet would have been timestamped at least
1104              * some clock cycles before. If it is not, then the system time
1105              * was adjusted. Since we cannot rely on the delta calculation in
1106              * such a case, set the DTS to current pipeline clock when this
1107              * happens.
1108              */
1109             GST_LOG_OBJECT (udpsrc,
1110                 "Current system time is behind SCM timestamp, setting DTS to pipeline clock");
1111             GST_BUFFER_DTS (outbuf) = cur_gst_clk_time;
1112           } else {
1113             base_time = gst_element_get_base_time (GST_ELEMENT_CAST (udpsrc));
1114             running_time = cur_gst_clk_time - base_time;
1115             adjust_dts = (gint64) running_time - delta;
1116             /*
1117              * If the system time was adjusted much further ahead, we might
1118              * end up with delta > cur_gst_clk_time. Set the DTS to current
1119              * pipeline clock for this scenario as well.
1120              */
1121             if (adjust_dts < 0) {
1122               GST_LOG_OBJECT (udpsrc,
1123                   "Current system time much ahead in time, setting DTS to pipeline clock");
1124               GST_BUFFER_DTS (outbuf) = cur_gst_clk_time;
1125             } else {
1126               GST_BUFFER_DTS (outbuf) = adjust_dts;
1127               GST_LOG_OBJECT (udpsrc, "Setting DTS to %" GST_TIME_FORMAT,
1128                   GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)));
1129             }
1130           }
1131           g_object_unref (clock);
1132         } else {
1133           GST_ERROR_OBJECT (udpsrc,
1134               "Failed to get element clock, not setting DTS");
1135         }
1136       }
1137 #endif
1138     }
1139
1140     for (i = 0; i < n_msgs; i++) {
1141       g_object_unref (msgs[i]);
1142     }
1143     g_free (msgs);
1144
1145     if (skip_packet) {
1146       GST_DEBUG_OBJECT (udpsrc,
1147           "Dropping packet for a different multicast address");
1148       goto retry;
1149     }
1150   }
1151
1152   gst_buffer_unmap (outbuf, &info);
1153   gst_memory_unmap (udpsrc->extra_mem, &extra_info);
1154
1155   /* If this is the case, the buffer will be freed once unreffed,
1156    * and the buffer pool will have to reallocate a new one.
1157    */
1158   if (res > udpsrc->mtu) {
1159     gst_buffer_append_memory (outbuf, udpsrc->extra_mem);
1160     udpsrc->extra_mem = NULL;
1161   }
1162
1163   offset = udpsrc->skip_first_bytes;
1164
1165   if (G_UNLIKELY (offset > 0 && res < offset))
1166     goto skip_error;
1167
1168   gst_buffer_resize (outbuf, offset, res - offset);
1169
1170   /* use buffer metadata so receivers can also track the address */
1171   if (saddr) {
1172     gst_buffer_add_net_address_meta (outbuf, saddr);
1173     g_object_unref (saddr);
1174     saddr = NULL;
1175   }
1176
1177   GST_LOG_OBJECT (udpsrc, "read packet of %d bytes", (int) res);
1178
1179   return GST_FLOW_OK;
1180
1181   /* ERRORS */
1182 buffer_map_error:
1183   {
1184     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
1185         ("Failed to map memory"));
1186     return GST_FLOW_ERROR;
1187   }
1188 memory_map_error:
1189   {
1190     gst_buffer_unmap (outbuf, &info);
1191     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
1192         ("Failed to map memory"));
1193     return GST_FLOW_ERROR;
1194   }
1195 select_error:
1196   {
1197     gst_buffer_unmap (outbuf, &info);
1198     gst_memory_unmap (udpsrc->extra_mem, &extra_info);
1199     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
1200         ("select error: %s", err->message));
1201     g_clear_error (&err);
1202     return GST_FLOW_ERROR;
1203   }
1204 stopped:
1205   {
1206     gst_buffer_unmap (outbuf, &info);
1207     gst_memory_unmap (udpsrc->extra_mem, &extra_info);
1208     GST_DEBUG ("stop called");
1209     g_clear_error (&err);
1210     return GST_FLOW_FLUSHING;
1211   }
1212 receive_error:
1213   {
1214     gst_buffer_unmap (outbuf, &info);
1215     gst_memory_unmap (udpsrc->extra_mem, &extra_info);
1216     g_clear_object (&saddr);
1217     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY) ||
1218         g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
1219       g_clear_error (&err);
1220       return GST_FLOW_FLUSHING;
1221     } else {
1222       GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
1223           ("receive error %" G_GSSIZE_FORMAT ": %s", res, err->message));
1224       g_clear_error (&err);
1225       return GST_FLOW_ERROR;
1226     }
1227   }
1228 skip_error:
1229   {
1230     g_clear_object (&saddr);
1231     GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL),
1232         ("UDP buffer to small to skip header"));
1233     return GST_FLOW_ERROR;
1234   }
1235 }
1236
1237 static gboolean
1238 gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri, GError ** error)
1239 {
1240   gchar *address;
1241   guint16 port;
1242
1243   if (!gst_udp_parse_uri (uri, &address, &port))
1244     goto wrong_uri;
1245
1246   if (port == (guint16) - 1)
1247     port = UDP_DEFAULT_PORT;
1248
1249   g_free (src->address);
1250   src->address = address;
1251   src->port = port;
1252
1253   g_free (src->uri);
1254   src->uri = g_strdup (uri);
1255
1256   return TRUE;
1257
1258   /* ERRORS */
1259 wrong_uri:
1260   {
1261     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
1262         ("error parsing uri %s", uri));
1263     g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
1264         "Could not parse UDP URI");
1265     return FALSE;
1266   }
1267 }
1268
1269 static void
1270 gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
1271     GParamSpec * pspec)
1272 {
1273   GstUDPSrc *udpsrc = GST_UDPSRC (object);
1274
1275   switch (prop_id) {
1276     case PROP_BUFFER_SIZE:
1277       udpsrc->buffer_size = g_value_get_int (value);
1278       break;
1279     case PROP_PORT:
1280       udpsrc->port = g_value_get_int (value);
1281       g_free (udpsrc->uri);
1282       udpsrc->uri =
1283           g_strdup_printf ("udp://%s:%u", udpsrc->address, udpsrc->port);
1284       break;
1285     case PROP_MULTICAST_GROUP:
1286     case PROP_ADDRESS:
1287     {
1288       const gchar *group;
1289
1290       g_free (udpsrc->address);
1291       if ((group = g_value_get_string (value)))
1292         udpsrc->address = g_strdup (group);
1293       else
1294         udpsrc->address = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
1295
1296       g_free (udpsrc->uri);
1297       udpsrc->uri =
1298           g_strdup_printf ("udp://%s:%u", udpsrc->address, udpsrc->port);
1299       break;
1300     }
1301     case PROP_MULTICAST_IFACE:
1302       g_free (udpsrc->multi_iface);
1303
1304       if (g_value_get_string (value) == NULL)
1305         udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
1306       else
1307         udpsrc->multi_iface = g_value_dup_string (value);
1308       break;
1309     case PROP_URI:
1310       gst_udpsrc_set_uri (udpsrc, g_value_get_string (value), NULL);
1311       break;
1312     case PROP_CAPS:
1313     {
1314       const GstCaps *new_caps_val = gst_value_get_caps (value);
1315       GstCaps *new_caps;
1316       GstCaps *old_caps;
1317
1318       if (new_caps_val == NULL) {
1319         new_caps = gst_caps_new_any ();
1320       } else {
1321         new_caps = gst_caps_copy (new_caps_val);
1322       }
1323
1324       GST_OBJECT_LOCK (udpsrc);
1325       old_caps = udpsrc->caps;
1326       udpsrc->caps = new_caps;
1327       GST_OBJECT_UNLOCK (udpsrc);
1328       if (old_caps)
1329         gst_caps_unref (old_caps);
1330
1331       gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (udpsrc));
1332       break;
1333     }
1334     case PROP_SOCKET:
1335       if (udpsrc->socket != NULL && udpsrc->socket != udpsrc->used_socket &&
1336           udpsrc->close_socket) {
1337         GError *err = NULL;
1338
1339         if (!g_socket_close (udpsrc->socket, &err)) {
1340           GST_ERROR ("failed to close socket %p: %s", udpsrc->socket,
1341               err->message);
1342           g_clear_error (&err);
1343         }
1344       }
1345       if (udpsrc->socket)
1346         g_object_unref (udpsrc->socket);
1347       udpsrc->socket = g_value_dup_object (value);
1348       GST_DEBUG ("setting socket to %p", udpsrc->socket);
1349       break;
1350     case PROP_TIMEOUT:
1351       udpsrc->timeout = g_value_get_uint64 (value);
1352       break;
1353     case PROP_SKIP_FIRST_BYTES:
1354       udpsrc->skip_first_bytes = g_value_get_int (value);
1355       break;
1356     case PROP_CLOSE_SOCKET:
1357       udpsrc->close_socket = g_value_get_boolean (value);
1358       break;
1359     case PROP_AUTO_MULTICAST:
1360       udpsrc->auto_multicast = g_value_get_boolean (value);
1361       break;
1362     case PROP_REUSE:
1363       udpsrc->reuse = g_value_get_boolean (value);
1364       break;
1365     case PROP_LOOP:
1366       udpsrc->loop = g_value_get_boolean (value);
1367       break;
1368     case PROP_RETRIEVE_SENDER_ADDRESS:
1369       udpsrc->retrieve_sender_address = g_value_get_boolean (value);
1370       break;
1371     case PROP_MTU:
1372       udpsrc->mtu = g_value_get_uint (value);
1373       break;
1374     case PROP_SOCKET_TIMESTAMP:
1375       udpsrc->socket_timestamp_mode = g_value_get_enum (value);
1376       break;
1377     default:
1378       break;
1379   }
1380 }
1381
1382 static void
1383 gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
1384     GParamSpec * pspec)
1385 {
1386   GstUDPSrc *udpsrc = GST_UDPSRC (object);
1387
1388   switch (prop_id) {
1389     case PROP_BUFFER_SIZE:
1390       g_value_set_int (value, udpsrc->buffer_size);
1391       break;
1392     case PROP_PORT:
1393       g_value_set_int (value, udpsrc->port);
1394       break;
1395     case PROP_MULTICAST_GROUP:
1396     case PROP_ADDRESS:
1397       g_value_set_string (value, udpsrc->address);
1398       break;
1399     case PROP_MULTICAST_IFACE:
1400       g_value_set_string (value, udpsrc->multi_iface);
1401       break;
1402     case PROP_URI:
1403       g_value_set_string (value, udpsrc->uri);
1404       break;
1405     case PROP_CAPS:
1406       GST_OBJECT_LOCK (udpsrc);
1407       gst_value_set_caps (value, udpsrc->caps);
1408       GST_OBJECT_UNLOCK (udpsrc);
1409       break;
1410     case PROP_SOCKET:
1411       g_value_set_object (value, udpsrc->socket);
1412       break;
1413     case PROP_TIMEOUT:
1414       g_value_set_uint64 (value, udpsrc->timeout);
1415       break;
1416     case PROP_SKIP_FIRST_BYTES:
1417       g_value_set_int (value, udpsrc->skip_first_bytes);
1418       break;
1419     case PROP_CLOSE_SOCKET:
1420       g_value_set_boolean (value, udpsrc->close_socket);
1421       break;
1422     case PROP_USED_SOCKET:
1423       g_value_set_object (value, udpsrc->used_socket);
1424       break;
1425     case PROP_AUTO_MULTICAST:
1426       g_value_set_boolean (value, udpsrc->auto_multicast);
1427       break;
1428     case PROP_REUSE:
1429       g_value_set_boolean (value, udpsrc->reuse);
1430       break;
1431     case PROP_LOOP:
1432       g_value_set_boolean (value, udpsrc->loop);
1433       break;
1434     case PROP_RETRIEVE_SENDER_ADDRESS:
1435       g_value_set_boolean (value, udpsrc->retrieve_sender_address);
1436       break;
1437     case PROP_MTU:
1438       g_value_set_uint (value, udpsrc->mtu);
1439       break;
1440     case PROP_SOCKET_TIMESTAMP:
1441       g_value_set_enum (value, udpsrc->socket_timestamp_mode);
1442       break;
1443     default:
1444       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1445       break;
1446   }
1447 }
1448
1449 static GInetAddress *
1450 gst_udpsrc_resolve (GstUDPSrc * src, const gchar * address)
1451 {
1452   GInetAddress *addr;
1453   GError *err = NULL;
1454   GResolver *resolver;
1455
1456   addr = g_inet_address_new_from_string (address);
1457   if (!addr) {
1458     GList *results;
1459
1460     GST_DEBUG_OBJECT (src, "resolving IP address for host %s", address);
1461     resolver = g_resolver_get_default ();
1462     results =
1463         g_resolver_lookup_by_name (resolver, address, src->cancellable, &err);
1464     if (!results)
1465       goto name_resolve;
1466     addr = G_INET_ADDRESS (g_object_ref (results->data));
1467
1468     g_resolver_free_addresses (results);
1469     g_object_unref (resolver);
1470   }
1471 #ifndef GST_DISABLE_GST_DEBUG
1472   {
1473     gchar *ip = g_inet_address_to_string (addr);
1474
1475     GST_DEBUG_OBJECT (src, "IP address for host %s is %s", address, ip);
1476     g_free (ip);
1477   }
1478 #endif
1479
1480   return addr;
1481
1482 name_resolve:
1483   {
1484     GST_WARNING_OBJECT (src, "Failed to resolve %s: %s", address, err->message);
1485     g_clear_error (&err);
1486     g_object_unref (resolver);
1487     return NULL;
1488   }
1489 }
1490
1491 static gint
1492 gst_udpsrc_get_rcvbuf (GstUDPSrc * src)
1493 {
1494   gint val = 0;
1495
1496   /* read the value of the receive buffer. Note that on linux this returns
1497    * 2x the value we set because the kernel allocates extra memory for
1498    * metadata. The default on Linux is about 100K (which is about 50K
1499    * without metadata) */
1500   if (!g_socket_get_option (src->used_socket, SOL_SOCKET, SO_RCVBUF, &val,
1501           NULL)) {
1502     GST_DEBUG_OBJECT (src, "could not get udp buffer size");
1503     return 0;
1504   }
1505 #ifdef __linux__
1506   /* Devise by 2 so that the numbers matches when we do get/set */
1507   val /= 2;
1508 #endif
1509
1510   return val;
1511 }
1512
1513 /* create a socket for sending to remote machine */
1514 static gboolean
1515 gst_udpsrc_open (GstUDPSrc * src)
1516 {
1517   GInetAddress *addr, *bind_addr;
1518   GSocketAddress *bind_saddr;
1519   GError *err = NULL;
1520
1521   gst_udpsrc_create_cancellable (src);
1522
1523   if (src->socket == NULL) {
1524     /* need to allocate a socket */
1525     GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->address,
1526         src->port);
1527
1528     addr = gst_udpsrc_resolve (src, src->address);
1529     if (!addr)
1530       goto name_resolve;
1531
1532     if ((src->used_socket =
1533             g_socket_new (g_inet_address_get_family (addr),
1534                 G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
1535       goto no_socket;
1536
1537     src->external_socket = FALSE;
1538
1539     GST_DEBUG_OBJECT (src, "got socket %p", src->used_socket);
1540
1541     if (src->addr)
1542       g_object_unref (src->addr);
1543     src->addr =
1544         G_INET_SOCKET_ADDRESS (g_inet_socket_address_new (addr, src->port));
1545
1546     GST_DEBUG_OBJECT (src, "binding on port %d", src->port);
1547
1548     /* For multicast, bind to ANY and join the multicast group later */
1549     if (g_inet_address_get_is_multicast (addr))
1550       bind_addr = g_inet_address_new_any (g_inet_address_get_family (addr));
1551     else
1552       bind_addr = G_INET_ADDRESS (g_object_ref (addr));
1553
1554     g_object_unref (addr);
1555
1556     bind_saddr = g_inet_socket_address_new (bind_addr, src->port);
1557     g_object_unref (bind_addr);
1558     if (!g_socket_bind (src->used_socket, bind_saddr, src->reuse, &err)) {
1559       GST_ERROR_OBJECT (src, "%s: error binding to %s:%d", err->message,
1560           src->address, src->port);
1561       goto bind_error;
1562     }
1563
1564     g_object_unref (bind_saddr);
1565     g_socket_set_multicast_loopback (src->used_socket, src->loop);
1566   } else {
1567     GInetSocketAddress *local_addr;
1568
1569     GST_DEBUG_OBJECT (src, "using provided socket %p", src->socket);
1570     /* we use the configured socket, try to get some info about it */
1571     src->used_socket = G_SOCKET (g_object_ref (src->socket));
1572     src->external_socket = TRUE;
1573
1574     local_addr =
1575         G_INET_SOCKET_ADDRESS (g_socket_get_local_address (src->used_socket,
1576             &err));
1577     if (!local_addr)
1578       goto getsockname_error;
1579
1580     addr = gst_udpsrc_resolve (src, src->address);
1581     if (!addr)
1582       goto name_resolve;
1583
1584     /* If bound to ANY and address points to a multicast address, make
1585      * sure that address is not overridden with ANY but we have the
1586      * opportunity later to join the multicast address. This ensures that we
1587      * have the same behaviour as for sockets created by udpsrc */
1588     if (!src->auto_multicast ||
1589         !g_inet_address_get_is_any (g_inet_socket_address_get_address
1590             (local_addr))
1591         || !g_inet_address_get_is_multicast (addr)) {
1592       g_object_unref (addr);
1593       if (src->addr)
1594         g_object_unref (src->addr);
1595       src->addr = local_addr;
1596     } else {
1597       g_object_unref (local_addr);
1598       if (src->addr)
1599         g_object_unref (src->addr);
1600       src->addr =
1601           G_INET_SOCKET_ADDRESS (g_inet_socket_address_new (addr, src->port));
1602       g_object_unref (addr);
1603     }
1604   }
1605
1606   {
1607     gint val;
1608     GError *opt_err = NULL;
1609     gboolean force_rcvbuf G_GNUC_UNUSED = FALSE;
1610
1611     if (src->buffer_size != 0) {
1612       GST_INFO_OBJECT (src, "setting udp buffer of %d bytes", src->buffer_size);
1613       /* set buffer size, Note that on Linux this is typically limited to a
1614        * maximum of around 100K. Also a minimum of 128 bytes is required on
1615        * Linux. */
1616       if (!g_socket_set_option (src->used_socket, SOL_SOCKET, SO_RCVBUF,
1617               src->buffer_size, &opt_err)) {
1618         GST_INFO_OBJECT (src,
1619             "Could not create a buffer of requested %d bytes (%s) try forcing",
1620             src->buffer_size, opt_err->message);
1621         g_clear_error (&opt_err);
1622         force_rcvbuf = TRUE;
1623       }
1624     }
1625 #if defined(SO_RCVBUFFORCE)
1626     val = gst_udpsrc_get_rcvbuf (src);
1627     if (val < src->buffer_size)
1628       force_rcvbuf = TRUE;
1629
1630     if (force_rcvbuf) {
1631       GST_INFO_OBJECT (src,
1632           "forcibly setting udp buffer of %d bytes", src->buffer_size);
1633
1634       /* Will only work with CAP_NET_ADMIN privilege */
1635       if (!g_socket_set_option (src->used_socket, SOL_SOCKET, SO_RCVBUFFORCE,
1636               src->buffer_size, &opt_err)) {
1637         GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
1638             ("Could not create a buffer of requested %d bytes (%s). Need net.admin privilege?",
1639                 src->buffer_size, opt_err->message));
1640         g_clear_error (&opt_err);
1641       }
1642     }
1643 #endif
1644
1645     val = gst_udpsrc_get_rcvbuf (src);
1646     if (val < src->buffer_size)
1647       GST_WARNING_OBJECT (src,
1648           "have udp buffer of %d bytes while %d were requested",
1649           val, src->buffer_size);
1650     else
1651       GST_INFO_OBJECT (src, "have udp buffer of %d bytes", val);
1652   }
1653
1654   g_socket_set_broadcast (src->used_socket, TRUE);
1655
1656   if (src->auto_multicast
1657       &&
1658       g_inet_address_get_is_multicast (g_inet_socket_address_get_address
1659           (src->addr))) {
1660
1661     if (src->multi_iface) {
1662       GStrv multi_ifaces = g_strsplit (src->multi_iface, ",", -1);
1663       gchar **ifaces = multi_ifaces;
1664       while (*ifaces) {
1665         g_strstrip (*ifaces);
1666         GST_DEBUG_OBJECT (src, "joining multicast group %s interface %s",
1667             src->address, *ifaces);
1668         if (!g_socket_join_multicast_group (src->used_socket,
1669                 g_inet_socket_address_get_address (src->addr),
1670                 FALSE, *ifaces, &err)) {
1671           g_strfreev (multi_ifaces);
1672           goto membership;
1673         }
1674
1675         ifaces++;
1676       }
1677       g_strfreev (multi_ifaces);
1678     } else {
1679       GST_DEBUG_OBJECT (src, "joining multicast group %s", src->address);
1680       if (!g_socket_join_multicast_group (src->used_socket,
1681               g_inet_socket_address_get_address (src->addr), FALSE, NULL, &err))
1682         goto membership;
1683     }
1684
1685     if (g_inet_address_get_family (g_inet_socket_address_get_address
1686             (src->addr)) == G_SOCKET_FAMILY_IPV4) {
1687 #if defined(IP_MULTICAST_ALL)
1688       if (!g_socket_set_option (src->used_socket, IPPROTO_IP, IP_MULTICAST_ALL,
1689               0, &err)) {
1690         GST_WARNING_OBJECT (src, "Failed to disable IP_MULTICAST_ALL: %s",
1691             err->message);
1692         g_clear_error (&err);
1693       }
1694 #elif defined(IP_PKTINFO)
1695       if (!g_socket_set_option (src->used_socket, IPPROTO_IP, IP_PKTINFO, TRUE,
1696               &err)) {
1697         GST_WARNING_OBJECT (src, "Failed to enable IP_PKTINFO: %s",
1698             err->message);
1699         g_clear_error (&err);
1700       }
1701 #elif defined(IP_RECVDSTADDR)
1702       if (!g_socket_set_option (src->used_socket, IPPROTO_IP, IP_RECVDSTADDR,
1703               TRUE, &err)) {
1704         GST_WARNING_OBJECT (src, "Failed to enable IP_RECVDSTADDR: %s",
1705             err->message);
1706         g_clear_error (&err);
1707       }
1708 #else
1709 #pragma message("No API available for getting IPv4 destination address")
1710       GST_WARNING_OBJECT (src, "No API available for getting IPv4 destination "
1711           "address, will receive packets for every destination to our port");
1712 #endif
1713     } else
1714         if (g_inet_address_get_family (g_inet_socket_address_get_address
1715             (src->addr)) == G_SOCKET_FAMILY_IPV6) {
1716 #ifdef IPV6_PKTINFO
1717 #ifdef IPV6_RECVPKTINFO
1718       if (!g_socket_set_option (src->used_socket, IPPROTO_IPV6,
1719               IPV6_RECVPKTINFO, TRUE, &err)) {
1720 #else
1721       if (!g_socket_set_option (src->used_socket, IPPROTO_IPV6, IPV6_PKTINFO,
1722               TRUE, &err)) {
1723 #endif
1724         GST_WARNING_OBJECT (src, "Failed to enable IPV6_PKTINFO: %s",
1725             err->message);
1726         g_clear_error (&err);
1727       }
1728 #else
1729 #pragma message("No API available for getting IPv6 destination address")
1730       GST_WARNING_OBJECT (src, "No API available for getting IPv6 destination "
1731           "address, will receive packets for every destination to our port");
1732 #endif
1733     }
1734   }
1735
1736   if (src->socket_timestamp_mode == GST_SOCKET_TIMESTAMP_MODE_REALTIME) {
1737 #ifdef SO_TIMESTAMPNS
1738     if (!g_socket_set_option (src->used_socket, SOL_SOCKET, SO_TIMESTAMPNS,
1739             TRUE, &err)) {
1740       GST_WARNING_OBJECT (src,
1741           "Failed to enable socket control message timestamps: %s",
1742           err->message);
1743       g_clear_error (&err);
1744       src->socket_timestamp_mode = GST_SOCKET_TIMESTAMP_MODE_DISABLED;
1745       g_object_notify (G_OBJECT (src), "socket-timestamp");
1746     } else {
1747       GST_LOG_OBJECT (src, "Socket control message timestamps enabled");
1748     }
1749   }
1750 #else
1751     GST_WARNING_OBJECT (src,
1752         "socket-timestamp was requested but SO_TIMESTAMPNS is not defined");
1753   }
1754 #endif
1755
1756   /* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port
1757    * follows ss_family on both */
1758   {
1759     GInetSocketAddress *addr;
1760     guint16 port;
1761
1762     addr =
1763         G_INET_SOCKET_ADDRESS (g_socket_get_local_address (src->used_socket,
1764             &err));
1765     if (!addr)
1766       goto getsockname_error;
1767
1768     port = g_inet_socket_address_get_port (addr);
1769     GST_DEBUG_OBJECT (src, "bound, on port %d", port);
1770     if (port != src->port) {
1771       src->port = port;
1772       GST_DEBUG_OBJECT (src, "notifying port %d", port);
1773       g_object_notify (G_OBJECT (src), "port");
1774     }
1775     g_object_unref (addr);
1776   }
1777
1778   return TRUE;
1779
1780   /* ERRORS */
1781 name_resolve:
1782   {
1783     return FALSE;
1784   }
1785 no_socket:
1786   {
1787     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
1788         ("no socket error: %s", err->message));
1789     g_clear_error (&err);
1790     g_object_unref (addr);
1791     return FALSE;
1792   }
1793 bind_error:
1794   {
1795     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
1796         ("bind failed: %s", err->message));
1797     g_clear_error (&err);
1798     g_object_unref (bind_saddr);
1799     gst_udpsrc_close (src);
1800     return FALSE;
1801   }
1802 membership:
1803   {
1804     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
1805         ("could not add membership: %s", err->message));
1806     g_clear_error (&err);
1807     gst_udpsrc_close (src);
1808     return FALSE;
1809   }
1810 getsockname_error:
1811   {
1812     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
1813         ("getsockname failed: %s", err->message));
1814     g_clear_error (&err);
1815     gst_udpsrc_close (src);
1816     return FALSE;
1817   }
1818 }
1819
1820 static gboolean
1821 gst_udpsrc_unlock (GstBaseSrc * bsrc)
1822 {
1823   GstUDPSrc *src;
1824
1825   src = GST_UDPSRC (bsrc);
1826
1827   GST_LOG_OBJECT (src, "Flushing");
1828   g_cancellable_cancel (src->cancellable);
1829
1830   return TRUE;
1831 }
1832
1833 static gboolean
1834 gst_udpsrc_unlock_stop (GstBaseSrc * bsrc)
1835 {
1836   GstUDPSrc *src;
1837
1838   src = GST_UDPSRC (bsrc);
1839
1840   GST_LOG_OBJECT (src, "No longer flushing");
1841
1842   gst_udpsrc_free_cancellable (src);
1843   gst_udpsrc_create_cancellable (src);
1844
1845   return TRUE;
1846 }
1847
1848 static gboolean
1849 gst_udpsrc_close (GstUDPSrc * src)
1850 {
1851   GST_DEBUG ("closing sockets");
1852
1853   if (src->used_socket) {
1854     if (src->auto_multicast
1855         &&
1856         g_inet_address_get_is_multicast (g_inet_socket_address_get_address
1857             (src->addr))) {
1858       GError *err = NULL;
1859
1860       if (src->multi_iface) {
1861         GStrv multi_ifaces = g_strsplit (src->multi_iface, ",", -1);
1862         gchar **ifaces = multi_ifaces;
1863         while (*ifaces) {
1864           g_strstrip (*ifaces);
1865           GST_DEBUG_OBJECT (src, "leaving multicast group %s interface %s",
1866               src->address, *ifaces);
1867           if (!g_socket_leave_multicast_group (src->used_socket,
1868                   g_inet_socket_address_get_address (src->addr),
1869                   FALSE, *ifaces, &err)) {
1870             GST_ERROR_OBJECT (src, "Failed to leave multicast group: %s",
1871                 err->message);
1872             g_clear_error (&err);
1873           }
1874           ifaces++;
1875         }
1876         g_strfreev (multi_ifaces);
1877
1878       } else {
1879         GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->address);
1880         if (!g_socket_leave_multicast_group (src->used_socket,
1881                 g_inet_socket_address_get_address (src->addr), FALSE,
1882                 NULL, &err)) {
1883           GST_ERROR_OBJECT (src, "Failed to leave multicast group: %s",
1884               err->message);
1885           g_clear_error (&err);
1886         }
1887       }
1888     }
1889
1890     if (src->close_socket || !src->external_socket) {
1891       GError *err = NULL;
1892       if (!g_socket_close (src->used_socket, &err)) {
1893         GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
1894         g_clear_error (&err);
1895       }
1896     }
1897
1898     g_object_unref (src->used_socket);
1899     src->used_socket = NULL;
1900     g_object_unref (src->addr);
1901     src->addr = NULL;
1902   }
1903
1904   gst_udpsrc_free_cancellable (src);
1905
1906   return TRUE;
1907 }
1908
1909
1910 static GstStateChangeReturn
1911 gst_udpsrc_change_state (GstElement * element, GstStateChange transition)
1912 {
1913   GstUDPSrc *src;
1914   GstStateChangeReturn result;
1915
1916   src = GST_UDPSRC (element);
1917
1918   switch (transition) {
1919     case GST_STATE_CHANGE_NULL_TO_READY:
1920       if (!gst_udpsrc_open (src))
1921         goto open_failed;
1922       break;
1923     default:
1924       break;
1925   }
1926   if ((result =
1927           GST_ELEMENT_CLASS (parent_class)->change_state (element,
1928               transition)) == GST_STATE_CHANGE_FAILURE)
1929     goto failure;
1930
1931   switch (transition) {
1932     case GST_STATE_CHANGE_READY_TO_NULL:
1933       gst_udpsrc_close (src);
1934       break;
1935     default:
1936       break;
1937   }
1938   return result;
1939   /* ERRORS */
1940 open_failed:
1941   {
1942     GST_DEBUG_OBJECT (src, "failed to open socket");
1943     return GST_STATE_CHANGE_FAILURE;
1944   }
1945 failure:
1946   {
1947     GST_DEBUG_OBJECT (src, "parent failed state change");
1948     return result;
1949   }
1950 }
1951
1952
1953
1954
1955 /*** GSTURIHANDLER INTERFACE *************************************************/
1956
1957 static GstURIType
1958 gst_udpsrc_uri_get_type (GType type)
1959 {
1960   return GST_URI_SRC;
1961 }
1962
1963 static const gchar *const *
1964 gst_udpsrc_uri_get_protocols (GType type)
1965 {
1966   static const gchar *protocols[] = { "udp", NULL };
1967
1968   return protocols;
1969 }
1970
1971 static gchar *
1972 gst_udpsrc_uri_get_uri (GstURIHandler * handler)
1973 {
1974   GstUDPSrc *src = GST_UDPSRC (handler);
1975
1976   return g_strdup (src->uri);
1977 }
1978
1979 static gboolean
1980 gst_udpsrc_uri_set_uri (GstURIHandler * handler, const gchar * uri,
1981     GError ** error)
1982 {
1983   return gst_udpsrc_set_uri (GST_UDPSRC (handler), uri, error);
1984 }
1985
1986 static void
1987 gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data)
1988 {
1989   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1990
1991   iface->get_type = gst_udpsrc_uri_get_type;
1992   iface->get_protocols = gst_udpsrc_uri_get_protocols;
1993   iface->get_uri = gst_udpsrc_uri_get_uri;
1994   iface->set_uri = gst_udpsrc_uri_set_uri;
1995 }