rtpsrc: re-use the same src pad for streams that have the same payload type
[platform/upstream/gstreamer.git] / gst / rtp / gstrtpsrc.c
1 /* GStreamer
2  * Copyright (C) <2018> Marc Leeman <marc.leeman@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION: gstrtpsrc
22  * @title: GstRtpSrc
23  * @short description: element with Uri interface to get RTP data from
24  * the network.
25  *
26  * RTP (RFC 3550) is a protocol to stream media over the network while
27  * retaining the timing information and providing enough information to
28  * reconstruct the correct timing domain by the receiver.
29  *
30  * The RTP data port should be even, while the RTCP port should be
31  * odd. The URI that is entered defines the data port, the RTCP port will
32  * be allocated to the next port.
33  *
34  * This element hooks up the correct sockets to support both RTP as the
35  * accompanying RTCP layer.
36  *
37  * This Bin handles taking in of data from the network and provides the
38  * RTP payloaded data.
39  *
40  * This element also implements the URI scheme `rtp://` allowing to render
41  * RTP streams in GStreamer based media players. The RTP URI handler also
42  * allows setting properties through the URI query.
43  */
44 #ifdef HAVE_CONFIG_H
45 #include <config.h>
46 #endif
47
48 #include <gst/net/net.h>
49 #include <gst/rtp/gstrtppayloads.h>
50
51 #include "gstrtpsrc.h"
52 #include "gstrtp-utils.h"
53
54 GST_DEBUG_CATEGORY_STATIC (gst_rtp_src_debug);
55 #define GST_CAT_DEFAULT gst_rtp_src_debug
56
57 #define DEFAULT_PROP_TTL              64
58 #define DEFAULT_PROP_TTL_MC           1
59 #define DEFAULT_PROP_ENCODING_NAME    NULL
60 #define DEFAULT_PROP_LATENCY          200
61
62 #define DEFAULT_PROP_ADDRESS          "0.0.0.0"
63 #define DEFAULT_PROP_PORT             5004
64 #define DEFAULT_PROP_URI              "rtp://"DEFAULT_PROP_ADDRESS":"G_STRINGIFY(DEFAULT_PROP_PORT)
65 #define DEFAULT_PROP_MULTICAST_IFACE  NULL
66
67 enum
68 {
69   PROP_0,
70
71   PROP_URI,
72   PROP_ADDRESS,
73   PROP_PORT,
74   PROP_TTL,
75   PROP_TTL_MC,
76   PROP_ENCODING_NAME,
77   PROP_LATENCY,
78   PROP_MULTICAST_IFACE,
79
80   PROP_LAST
81 };
82
83 static void gst_rtp_src_uri_handler_init (gpointer g_iface,
84     gpointer iface_data);
85
86 #define gst_rtp_src_parent_class parent_class
87 G_DEFINE_TYPE_WITH_CODE (GstRtpSrc, gst_rtp_src, GST_TYPE_BIN,
88     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_rtp_src_uri_handler_init);
89     GST_DEBUG_CATEGORY_INIT (gst_rtp_src_debug, "rtpsrc", 0, "RTP Source"));
90
91 #define GST_RTP_SRC_GET_LOCK(obj) (&((GstRtpSrc*)(obj))->lock)
92 #define GST_RTP_SRC_LOCK(obj) (g_mutex_lock (GST_RTP_SRC_GET_LOCK(obj)))
93 #define GST_RTP_SRC_UNLOCK(obj) (g_mutex_unlock (GST_RTP_SRC_GET_LOCK(obj)))
94
95 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
96     GST_PAD_SRC,
97     GST_PAD_SOMETIMES,
98     GST_STATIC_CAPS ("application/x-rtp"));
99
100 static GstStateChangeReturn
101 gst_rtp_src_change_state (GstElement * element, GstStateChange transition);
102
103 /**
104  * gst_rtp_src_rtpbin_request_pt_map_cb:
105  * @self: The current #GstRtpSrc object
106  *
107  * #GstRtpBin callback to map a pt on RTP caps.
108  *
109  * Returns: (transfer none): the guess on the RTP caps based on the PT
110  * and caps.
111  */
112 static GstCaps *
113 gst_rtp_src_rtpbin_request_pt_map_cb (GstElement * rtpbin, guint session_id,
114     guint pt, gpointer data)
115 {
116   GstRtpSrc *self = GST_RTP_SRC (data);
117   const GstRTPPayloadInfo *p = NULL;
118
119   GST_DEBUG_OBJECT (self,
120       "Requesting caps for session-id 0x%x and pt %u.", session_id, pt);
121
122   /* the encoding-name has more relevant information */
123   if (self->encoding_name != NULL) {
124     /* Unfortunately, the media needs to be passed in the function. Since
125      * it is not known, try for video if video not found. */
126     p = gst_rtp_payload_info_for_name ("video", self->encoding_name);
127     if (p == NULL)
128       p = gst_rtp_payload_info_for_name ("audio", self->encoding_name);
129
130   }
131
132   /* If info has been found before based on the encoding-name, go with
133    * it. If not, try to look it up on with a static one. Needs to be guarded
134    * because some encoders do not use dynamic values for H.264 */
135   if (p == NULL) {
136     /* Static payload types, this is a simple lookup */
137     if (!GST_RTP_PAYLOAD_IS_DYNAMIC (pt)) {
138       p = gst_rtp_payload_info_for_pt (pt);
139     }
140   }
141
142   if (p != NULL) {
143     GstCaps *ret = gst_caps_new_simple ("application/x-rtp",
144         "encoding-name", G_TYPE_STRING, p->encoding_name,
145         "clock-rate", G_TYPE_INT, p->clock_rate,
146         "media", G_TYPE_STRING, p->media, NULL);
147
148     GST_DEBUG_OBJECT (self, "Decided on caps %" GST_PTR_FORMAT, ret);
149
150     return ret;
151   }
152
153   GST_DEBUG_OBJECT (self, "Could not determine caps based on pt and"
154       " the encoding-name was not set.");
155   return NULL;
156 }
157
158 static void
159 gst_rtp_src_set_property (GObject * object, guint prop_id,
160     const GValue * value, GParamSpec * pspec)
161 {
162   GstRtpSrc *self = GST_RTP_SRC (object);
163   GstCaps *caps;
164
165   switch (prop_id) {
166     case PROP_URI:{
167       GstUri *uri = NULL;
168
169       GST_RTP_SRC_LOCK (object);
170       uri = gst_uri_from_string (g_value_get_string (value));
171       if (uri == NULL)
172         break;
173
174       if (self->uri)
175         gst_uri_unref (self->uri);
176       self->uri = uri;
177
178       /* Recursive set to self, do not use the same lock in all property
179        * setters. */
180       g_object_set (self, "address", gst_uri_get_host (self->uri), NULL);
181       g_object_set (self, "port", gst_uri_get_port (self->uri), NULL);
182       gst_rtp_utils_set_properties_from_uri_query (G_OBJECT (self), self->uri);
183       GST_RTP_SRC_UNLOCK (object);
184       break;
185     }
186     case PROP_ADDRESS:{
187       gst_uri_set_host (self->uri, g_value_get_string (value));
188       g_object_set_property (G_OBJECT (self->rtp_src), "address", value);
189       g_object_set_property (G_OBJECT (self->rtcp_src), "address", value);
190       break;
191     }
192     case PROP_PORT:{
193       guint port = g_value_get_uint (value);
194
195       /* According to RFC 3550, 11, RTCP receiver port should be even
196        * number and RTCP port should be the RTP port + 1 */
197       if (port & 0x1)
198         GST_WARNING_OBJECT (self,
199             "Port %u is odd, this is not standard (see RFC 3550).", port);
200
201       gst_uri_set_port (self->uri, port);
202       g_object_set (self->rtp_src, "port", port, NULL);
203       g_object_set (self->rtcp_src, "port", port + 1, NULL);
204       break;
205     }
206     case PROP_TTL:
207       self->ttl = g_value_get_int (value);
208       break;
209     case PROP_TTL_MC:
210       self->ttl_mc = g_value_get_int (value);
211       break;
212     case PROP_ENCODING_NAME:
213       g_free (self->encoding_name);
214       self->encoding_name = g_value_dup_string (value);
215       if (self->rtp_src) {
216         caps = gst_rtp_src_rtpbin_request_pt_map_cb (NULL, 0, 96, self);
217         g_object_set (G_OBJECT (self->rtp_src), "caps", caps, NULL);
218         gst_caps_unref (caps);
219       }
220       break;
221     case PROP_LATENCY:
222       g_object_set (self->rtpbin, "latency", g_value_get_uint (value), NULL);
223       break;
224     case PROP_MULTICAST_IFACE:
225       g_free (self->multi_iface);
226
227       if (g_value_get_string (value) == NULL)
228         self->multi_iface = g_strdup (DEFAULT_PROP_MULTICAST_IFACE);
229       else
230         self->multi_iface = g_value_dup_string (value);
231       break;
232     default:
233       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
234       break;
235   }
236 }
237
238 static void
239 gst_rtp_src_get_property (GObject * object, guint prop_id,
240     GValue * value, GParamSpec * pspec)
241 {
242   GstRtpSrc *self = GST_RTP_SRC (object);
243
244   switch (prop_id) {
245     case PROP_URI:
246       GST_RTP_SRC_LOCK (object);
247       if (self->uri)
248         g_value_take_string (value, gst_uri_to_string (self->uri));
249       else
250         g_value_set_string (value, NULL);
251       GST_RTP_SRC_UNLOCK (object);
252       break;
253     case PROP_ADDRESS:
254       g_value_set_string (value, gst_uri_get_host (self->uri));
255       break;
256     case PROP_PORT:
257       g_value_set_uint (value, gst_uri_get_port (self->uri));
258       break;
259     case PROP_TTL:
260       g_value_set_int (value, self->ttl);
261       break;
262     case PROP_TTL_MC:
263       g_value_set_int (value, self->ttl_mc);
264       break;
265     case PROP_ENCODING_NAME:
266       g_value_set_string (value, self->encoding_name);
267       break;
268     case PROP_LATENCY:
269       g_object_get_property (G_OBJECT (self->rtpbin), "latency", value);
270       break;
271     case PROP_MULTICAST_IFACE:
272       g_value_set_string (value, self->multi_iface);
273       break;
274     default:
275       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
276       break;
277   }
278 }
279
280 static void
281 gst_rtp_src_finalize (GObject * gobject)
282 {
283   GstRtpSrc *self = GST_RTP_SRC (gobject);
284
285   if (self->uri)
286     gst_uri_unref (self->uri);
287   g_free (self->encoding_name);
288
289   g_free (self->multi_iface);
290
291   g_mutex_clear (&self->lock);
292   G_OBJECT_CLASS (parent_class)->finalize (gobject);
293 }
294
295 static void
296 gst_rtp_src_handle_message (GstBin * bin, GstMessage * message)
297 {
298   switch (GST_MESSAGE_TYPE (message)) {
299     case GST_MESSAGE_STREAM_START:
300     case GST_MESSAGE_EOS:
301       /* drop stream-start & eos from our internal udp sink(s);
302          https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1368 */
303       gst_message_unref (message);
304       break;
305     default:
306       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
307       break;
308   }
309 }
310
311 static void
312 gst_rtp_src_class_init (GstRtpSrcClass * klass)
313 {
314   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
315   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
316   GstBinClass *gstbin_class = GST_BIN_CLASS (klass);
317
318   gobject_class->set_property = gst_rtp_src_set_property;
319   gobject_class->get_property = gst_rtp_src_get_property;
320   gobject_class->finalize = gst_rtp_src_finalize;
321   gstelement_class->change_state = gst_rtp_src_change_state;
322   gstbin_class->handle_message = gst_rtp_src_handle_message;
323
324   /**
325    * GstRtpSrc:uri:
326    *
327    * uri to an RTP from. All GStreamer parameters can be
328    * encoded in the URI, this URI format is RFC compliant.
329    */
330   g_object_class_install_property (gobject_class, PROP_URI,
331       g_param_spec_string ("uri", "URI",
332           "URI in the form of rtp://host:port?query", DEFAULT_PROP_URI,
333           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
334
335   /**
336    * GstRtpSrc:address:
337    *
338    * Address to receive packets from (can be IPv4 or IPv6).
339    */
340   g_object_class_install_property (gobject_class, PROP_ADDRESS,
341       g_param_spec_string ("address", "Address",
342           "Address to receive packets from (can be IPv4 or IPv6).",
343           DEFAULT_PROP_ADDRESS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
344
345   /**
346    * GstRtpSrc:port:
347    *
348    * The port to listen to RTP packets, the RTCP port is this value
349    * +1. This port must be an even number.
350    */
351   g_object_class_install_property (gobject_class, PROP_PORT,
352       g_param_spec_uint ("port", "Port", "The port to listen for RTP packets, "
353           "the RTCP port is this value + 1. This port must be an even number.",
354           2, 65534, DEFAULT_PROP_PORT,
355           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
356
357   /**
358    * GstRtpSrc:ttl:
359    *
360    * Set the unicast TTL parameter. In RTP this of importance for RTCP.
361    */
362   g_object_class_install_property (gobject_class, PROP_TTL,
363       g_param_spec_int ("ttl", "Unicast TTL",
364           "Used for setting the unicast TTL parameter",
365           0, 255, DEFAULT_PROP_TTL,
366           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
367
368   /**
369    * GstRtpSrc:ttl-mc:
370    *
371    * Set the multicast TTL parameter. In RTP this of importance for RTCP.
372    */
373   g_object_class_install_property (gobject_class, PROP_TTL_MC,
374       g_param_spec_int ("ttl-mc", "Multicast TTL",
375           "Used for setting the multicast TTL parameter", 0, 255,
376           DEFAULT_PROP_TTL_MC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
377
378   /**
379    * GstRtpSrc:encoding-name:
380    *
381    * Set the encoding name of the stream to use. This is a short-hand for
382    * the full caps and maps typically to the encoding-name in the RTP caps.
383    */
384   g_object_class_install_property (gobject_class, PROP_ENCODING_NAME,
385       g_param_spec_string ("encoding-name", "Caps encoding name",
386           "Encoding name use to determine caps parameters",
387           DEFAULT_PROP_ENCODING_NAME,
388           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
389
390   /**
391    * GstRtpSrc:latency:
392    *
393    * Set the size of the latency buffer in the
394    * GstRtpBin/GstRtpJitterBuffer to compensate for network jitter.
395    */
396   g_object_class_install_property (gobject_class, PROP_LATENCY,
397       g_param_spec_uint ("latency", "Buffer latency in ms",
398           "Default amount of ms to buffer in the jitterbuffers", 0,
399           G_MAXUINT, DEFAULT_PROP_LATENCY,
400           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
401
402   /**
403    * GstRtpSink:multicast-iface:
404    *
405    * The networkinterface on which to join the multicast group
406    */
407   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
408       g_param_spec_string ("multicast-iface", "Multicast Interface",
409           "The network interface on which to join the multicast group."
410           "This allows multiple interfaces separated by comma. (\"eth0,eth1\")",
411           DEFAULT_PROP_MULTICAST_IFACE,
412           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
413
414   gst_element_class_add_pad_template (gstelement_class,
415       gst_static_pad_template_get (&src_template));
416
417   gst_element_class_set_static_metadata (gstelement_class,
418       "RTP Source element",
419       "Generic/Bin/Src",
420       "Simple RTP src", "Marc Leeman <marc.leeman@gmail.com>");
421 }
422
423 static void
424 gst_rtp_src_rtpbin_pad_added_cb (GstElement * element, GstPad * pad,
425     gpointer data)
426 {
427   GstRtpSrc *self = GST_RTP_SRC (data);
428   GstCaps *caps = gst_pad_query_caps (pad, NULL);
429   const GstStructure *s;
430   GstPad *upad = NULL;
431   gint pt = -1;
432   gchar name[48];
433
434   /* Expose RTP data pad only */
435   GST_INFO_OBJECT (self,
436       "Element %" GST_PTR_FORMAT " added pad %" GST_PTR_FORMAT "with caps %"
437       GST_PTR_FORMAT ".", element, pad, caps);
438
439   /* Sanity checks */
440   if (GST_PAD_DIRECTION (pad) == GST_PAD_SINK) {
441     /* Sink pad, do not expose */
442     gst_caps_unref (caps);
443     return;
444   }
445
446   if (G_LIKELY (caps)) {
447     GstCaps *ref_caps = gst_caps_new_empty_simple ("application/x-rtcp");
448
449     if (gst_caps_can_intersect (caps, ref_caps)) {
450       /* SRC RTCP caps, do not expose */
451       gst_caps_unref (ref_caps);
452       gst_caps_unref (caps);
453
454       return;
455     }
456     gst_caps_unref (ref_caps);
457   } else {
458     GST_ERROR_OBJECT (self, "Pad with no caps detected.");
459     gst_caps_unref (caps);
460
461     return;
462   }
463
464   s = gst_caps_get_structure (caps, 0);
465   gst_structure_get_int (s, "payload", &pt);
466   gst_caps_unref (caps);
467
468   GST_RTP_SRC_LOCK (self);
469   g_snprintf (name, 48, "src_%u", pt);
470   upad = gst_element_get_static_pad (GST_ELEMENT (self), name);
471
472   if (!upad) {
473     GST_DEBUG_OBJECT (self, "Adding new pad: %s", name);
474
475     upad = gst_ghost_pad_new (name, pad);
476     gst_pad_set_active (upad, TRUE);
477     gst_element_add_pad (GST_ELEMENT (self), upad);
478   } else {
479     GST_DEBUG_OBJECT (self, "Re-using existing pad: %s", GST_PAD_NAME (upad));
480     gst_ghost_pad_set_target (GST_GHOST_PAD (upad), pad);
481     gst_object_unref (upad);
482   }
483   GST_RTP_SRC_UNLOCK (self);
484 }
485
486 static void
487 gst_rtp_src_rtpbin_pad_removed_cb (GstElement * element, GstPad * pad,
488     gpointer data)
489 {
490   GstRtpSrc *self = GST_RTP_SRC (data);
491   GST_INFO_OBJECT (self,
492       "Element %" GST_PTR_FORMAT " removed pad %" GST_PTR_FORMAT ".", element,
493       pad);
494 }
495
496 static void
497 gst_rtp_src_rtpbin_on_ssrc_collision_cb (GstElement * rtpbin, guint session_id,
498     guint ssrc, gpointer data)
499 {
500   GstRtpSrc *self = GST_RTP_SRC (data);
501
502   GST_INFO_OBJECT (self,
503       "Detected an SSRC collision: session-id 0x%x, ssrc 0x%x.", session_id,
504       ssrc);
505 }
506
507 static void
508 gst_rtp_src_rtpbin_on_new_ssrc_cb (GstElement * rtpbin, guint session_id,
509     guint ssrc, gpointer data)
510 {
511   GstRtpSrc *self = GST_RTP_SRC (data);
512
513   GST_INFO_OBJECT (self, "Detected a new SSRC: session-id 0x%x, ssrc 0x%x.",
514       session_id, ssrc);
515 }
516
517 static GstPadProbeReturn
518 gst_rtp_src_on_recv_rtcp (GstPad * pad, GstPadProbeInfo * info,
519     gpointer user_data)
520 {
521   GstRtpSrc *self = GST_RTP_SRC (user_data);
522   GstBuffer *buffer;
523   GstNetAddressMeta *meta;
524
525   if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) {
526     GstBufferList *buffer_list = info->data;
527     buffer = gst_buffer_list_get (buffer_list, 0);
528   } else {
529     buffer = info->data;
530   }
531
532   meta = gst_buffer_get_net_address_meta (buffer);
533
534   GST_OBJECT_LOCK (self);
535   g_clear_object (&self->rtcp_send_addr);
536   self->rtcp_send_addr = g_object_ref (meta->addr);
537   GST_OBJECT_UNLOCK (self);
538
539   return GST_PAD_PROBE_OK;
540 }
541
542 static inline void
543 gst_rtp_src_attach_net_address_meta (GstRtpSrc * self, GstBuffer * buffer)
544 {
545   GST_OBJECT_LOCK (self);
546   if (self->rtcp_send_addr)
547     gst_buffer_add_net_address_meta (buffer, self->rtcp_send_addr);
548   GST_OBJECT_UNLOCK (self);
549 }
550
551 static GstPadProbeReturn
552 gst_rtp_src_on_send_rtcp (GstPad * pad, GstPadProbeInfo * info,
553     gpointer user_data)
554 {
555   GstRtpSrc *self = GST_RTP_SRC (user_data);
556
557   if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) {
558     GstBufferList *buffer_list = info->data;
559     GstBuffer *buffer;
560     gint i;
561
562     info->data = buffer_list = gst_buffer_list_make_writable (buffer_list);
563     for (i = 0; i < gst_buffer_list_length (buffer_list); i++) {
564       buffer = gst_buffer_list_get (buffer_list, i);
565       gst_rtp_src_attach_net_address_meta (self, buffer);
566     }
567   } else {
568     GstBuffer *buffer = info->data;
569     info->data = buffer = gst_buffer_make_writable (buffer);
570     gst_rtp_src_attach_net_address_meta (self, buffer);
571   }
572
573   return GST_PAD_PROBE_OK;
574 }
575
576 static gboolean
577 gst_rtp_src_start (GstRtpSrc * self)
578 {
579   GstPad *pad;
580   GSocket *socket;
581   GInetAddress *iaddr;
582   GstCaps *caps;
583   GError *error = NULL;
584
585   /* Should not be NULL */
586   g_return_val_if_fail (self->uri != NULL, FALSE);
587
588   /* share the socket created by the source */
589   g_object_get (G_OBJECT (self->rtcp_src), "used-socket", &socket, NULL);
590   if (!G_IS_SOCKET (socket)) {
591     GST_WARNING_OBJECT (self, "Could not retrieve RTCP src socket.");
592   }
593
594   iaddr = g_inet_address_new_from_string (gst_uri_get_host (self->uri));
595   if (!iaddr) {
596     GList *results;
597     GResolver *resolver = NULL;
598
599     resolver = g_resolver_get_default ();
600     results =
601         g_resolver_lookup_by_name (resolver, gst_uri_get_host (self->uri), NULL,
602         &error);
603
604     if (!results) {
605       g_object_unref (resolver);
606       goto dns_resolve_failed;
607     }
608
609     iaddr = G_INET_ADDRESS (g_object_ref (results->data));
610
611     g_resolver_free_addresses (results);
612     g_object_unref (resolver);
613   }
614
615   if (g_inet_address_get_is_multicast (iaddr)) {
616     /* mc-ttl is not supported by dynudpsink */
617     g_socket_set_multicast_ttl (socket, self->ttl_mc);
618     /* In multicast, send RTCP to the multicast group */
619     self->rtcp_send_addr =
620         g_inet_socket_address_new (iaddr, gst_uri_get_port (self->uri) + 1);
621
622     /* set multicast-iface on the udpsrc and udpsink elements */
623     g_object_set (self->rtcp_src, "multicast-iface", self->multi_iface, NULL);
624     g_object_set (self->rtcp_sink, "multicast-iface", self->multi_iface, NULL);
625     g_object_set (self->rtp_src, "multicast-iface", self->multi_iface, NULL);
626   } else {
627     /* In unicast, send RTCP to the detected sender address */
628     g_socket_set_ttl (socket, self->ttl);
629     pad = gst_element_get_static_pad (self->rtcp_src, "src");
630     self->rtcp_recv_probe = gst_pad_add_probe (pad,
631         GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
632         gst_rtp_src_on_recv_rtcp, self, NULL);
633     gst_object_unref (pad);
634   }
635   g_object_unref (iaddr);
636
637   /* no need to set address if unicast */
638   caps = gst_caps_new_empty_simple ("application/x-rtcp");
639   g_object_set (self->rtcp_src, "caps", caps, NULL);
640   gst_caps_unref (caps);
641
642   pad = gst_element_get_static_pad (self->rtcp_sink, "sink");
643   self->rtcp_send_probe = gst_pad_add_probe (pad,
644       GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
645       gst_rtp_src_on_send_rtcp, self, NULL);
646   gst_object_unref (pad);
647
648   g_object_set (self->rtcp_sink, "socket", socket, "close-socket", FALSE, NULL);
649   g_object_unref (socket);
650
651   gst_element_set_locked_state (self->rtcp_sink, FALSE);
652   gst_element_sync_state_with_parent (self->rtcp_sink);
653
654   return TRUE;
655
656 dns_resolve_failed:
657   GST_ELEMENT_ERROR (self, RESOURCE, NOT_FOUND,
658       ("Could not resolve hostname '%s'", gst_uri_get_host (self->uri)),
659       ("DNS resolver reported: %s", error->message));
660   g_error_free (error);
661   return FALSE;
662 }
663
664 static void
665 gst_rtp_src_stop (GstRtpSrc * self)
666 {
667   GstPad *pad;
668
669   if (self->rtcp_recv_probe) {
670     pad = gst_element_get_static_pad (self->rtcp_src, "src");
671     gst_pad_remove_probe (pad, self->rtcp_recv_probe);
672     self->rtcp_recv_probe = 0;
673     gst_object_unref (pad);
674   }
675
676   pad = gst_element_get_static_pad (self->rtcp_sink, "sink");
677   gst_pad_remove_probe (pad, self->rtcp_send_probe);
678   self->rtcp_send_probe = 0;
679   gst_object_unref (pad);
680 }
681
682 static GstStateChangeReturn
683 gst_rtp_src_change_state (GstElement * element, GstStateChange transition)
684 {
685   GstRtpSrc *self = GST_RTP_SRC (element);
686   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
687
688   GST_DEBUG_OBJECT (self, "Changing state: %s => %s",
689       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
690       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
691
692   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
693   if (ret == GST_STATE_CHANGE_FAILURE)
694     return ret;
695
696   switch (transition) {
697     case GST_STATE_CHANGE_NULL_TO_READY:
698       if (gst_rtp_src_start (self) == FALSE)
699         return GST_STATE_CHANGE_FAILURE;
700       break;
701     case GST_STATE_CHANGE_READY_TO_PAUSED:
702       ret = GST_STATE_CHANGE_NO_PREROLL;
703       break;
704     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
705       ret = GST_STATE_CHANGE_NO_PREROLL;
706       break;
707     case GST_STATE_CHANGE_READY_TO_NULL:
708       gst_rtp_src_stop (self);
709       break;
710     default:
711       break;
712   }
713
714   return ret;
715 }
716
717 static void
718 gst_rtp_src_init (GstRtpSrc * self)
719 {
720   gchar name[48];
721   const gchar *missing_plugin = NULL;
722
723   self->rtpbin = NULL;
724   self->rtp_src = NULL;
725   self->rtcp_src = NULL;
726   self->rtcp_sink = NULL;
727   self->multi_iface = g_strdup (DEFAULT_PROP_MULTICAST_IFACE);
728
729   self->uri = gst_uri_from_string (DEFAULT_PROP_URI);
730   self->ttl = DEFAULT_PROP_TTL;
731   self->ttl_mc = DEFAULT_PROP_TTL_MC;
732   self->encoding_name = DEFAULT_PROP_ENCODING_NAME;
733
734   GST_OBJECT_FLAG_SET (GST_OBJECT (self), GST_ELEMENT_FLAG_SOURCE);
735   gst_bin_set_suppressed_flags (GST_BIN (self),
736       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
737
738   g_mutex_init (&self->lock);
739
740   /* Construct the RTP receiver pipeline.
741    *
742    * udpsrc -> [recv_rtp_sink_%u]  --------  [recv_rtp_src_%u_%u_%u]
743    *                              | rtpbin |
744    * udpsrc -> [recv_rtcp_sink_%u] --------  [send_rtcp_src_%u] -> udpsink
745    *
746    * This pipeline is fixed for now, note that optionally an FEC stream could
747    * be added later.
748    */
749
750   self->rtpbin = gst_element_factory_make ("rtpbin", "rtp_recv_rtpbin0");
751   if (self->rtpbin == NULL) {
752     missing_plugin = "rtpmanager";
753     goto missing_plugin;
754   }
755
756   gst_bin_add (GST_BIN (self), self->rtpbin);
757
758   /* Add rtpbin callbacks to monitor the operation of rtpbin */
759   g_signal_connect_object (self->rtpbin, "pad-added",
760       G_CALLBACK (gst_rtp_src_rtpbin_pad_added_cb), self, 0);
761   g_signal_connect_object (self->rtpbin, "pad-removed",
762       G_CALLBACK (gst_rtp_src_rtpbin_pad_removed_cb), self, 0);
763   g_signal_connect_object (self->rtpbin, "request-pt-map",
764       G_CALLBACK (gst_rtp_src_rtpbin_request_pt_map_cb), self, 0);
765   g_signal_connect_object (self->rtpbin, "on-new-ssrc",
766       G_CALLBACK (gst_rtp_src_rtpbin_on_new_ssrc_cb), self, 0);
767   g_signal_connect_object (self->rtpbin, "on-ssrc-collision",
768       G_CALLBACK (gst_rtp_src_rtpbin_on_ssrc_collision_cb), self, 0);
769
770   self->rtp_src = gst_element_factory_make ("udpsrc", "rtp_rtp_udpsrc0");
771   if (self->rtp_src == NULL) {
772     missing_plugin = "udp";
773     goto missing_plugin;
774   }
775
776   self->rtcp_src = gst_element_factory_make ("udpsrc", "rtp_rtcp_udpsrc0");
777   if (self->rtcp_src == NULL) {
778     missing_plugin = "udp";
779     goto missing_plugin;
780   }
781
782   self->rtcp_sink =
783       gst_element_factory_make ("dynudpsink", "rtp_rtcp_dynudpsink0");
784   if (self->rtcp_sink == NULL) {
785     missing_plugin = "udp";
786     goto missing_plugin;
787   }
788
789   /* Add elements as needed, since udpsrc/udpsink for RTCP share a socket,
790    * not all at the same moment */
791   gst_bin_add (GST_BIN (self), self->rtp_src);
792   gst_bin_add (GST_BIN (self), self->rtcp_src);
793   gst_bin_add (GST_BIN (self), self->rtcp_sink);
794
795   g_object_set (self->rtcp_sink, "sync", FALSE, "async", FALSE, NULL);
796   gst_element_set_locked_state (self->rtcp_sink, TRUE);
797
798   /* pads are all named */
799   g_snprintf (name, 48, "recv_rtp_sink_%u", GST_ELEMENT (self)->numpads);
800   gst_element_link_pads (self->rtp_src, "src", self->rtpbin, name);
801   g_snprintf (name, 48, "recv_rtcp_sink_%u", GST_ELEMENT (self)->numpads);
802   gst_element_link_pads (self->rtcp_src, "src", self->rtpbin, name);
803   g_snprintf (name, 48, "send_rtcp_src_%u", GST_ELEMENT (self)->numpads);
804   gst_element_link_pads (self->rtpbin, name, self->rtcp_sink, "sink");
805
806   if (missing_plugin == NULL)
807     return;
808
809 missing_plugin:
810   {
811     GST_ERROR_OBJECT (self, "'%s' plugin is missing.", missing_plugin);
812   }
813 }
814
815 static GstURIType
816 gst_rtp_src_uri_get_type (GType type)
817 {
818   return GST_URI_SRC;
819 }
820
821 static const gchar *const *
822 gst_rtp_src_uri_get_protocols (GType type)
823 {
824   static const gchar *protocols[] = { (char *) "rtp", NULL };
825
826   return protocols;
827 }
828
829 static gchar *
830 gst_rtp_src_uri_get_uri (GstURIHandler * handler)
831 {
832   GstRtpSrc *self = (GstRtpSrc *) handler;
833
834   return gst_uri_to_string (self->uri);
835 }
836
837 static gboolean
838 gst_rtp_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
839     GError ** error)
840 {
841   GstRtpSrc *self = (GstRtpSrc *) handler;
842
843   g_object_set (G_OBJECT (self), "uri", uri, NULL);
844
845   return TRUE;
846 }
847
848 static void
849 gst_rtp_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
850 {
851   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
852
853   iface->get_type = gst_rtp_src_uri_get_type;
854   iface->get_protocols = gst_rtp_src_uri_get_protocols;
855   iface->get_uri = gst_rtp_src_uri_get_uri;
856   iface->set_uri = gst_rtp_src_uri_set_uri;
857 }
858
859 /* ex: set tabstop=2 shiftwidth=2 expandtab: */