08e1657129dcfe480cbb2047c7bec8eb314762c0
[platform/upstream/gstreamer.git] / gst / rist / gstristsink.c
1 /* GStreamer RIST plugin
2  * Copyright (C) 2019 Net Insight AB
3  *     Author: Nicolas Dufresne <nicolas.dufresne@collabora.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * SECTION:element-ristsink
23  * @title: ristsink
24  * @see_also: ristsrc
25  *
26  * This element implements RIST TR-06-1 Simple Profile transmitter. It
27  * currently supports any registered RTP static payload types such as
28  * MPEG TS. The stream passed to this element must be already RTP
29  * payloaded.  Even though RTP SSRC collision are rare in
30  * unidirectional streaming, this element expects the upstream elements
31  * to obey to collision events and change the SSRC in use. Collisions
32  * will occur when transmitting and receiving over multicast on the
33  * same host, and will be properly ignored.
34  *
35  * ## Example gst-launch line
36  * |[
37  * gst-launch-1.0 udpsrc ! tsparse set-timestamps=1 smoothing-latency=40000 ! \
38  * rtpmp2tpay ! ristsink address=10.0.0.1 port=5004
39  * ]|
40  *
41  * Additionally, this element supports bonding, which consist of using
42  * multiple links in order to transmit the streams. The address of
43  * each link is configured through the "bonding-addresses"
44  * property. When set, this will replace the value that might have
45  * been set on the "address" and "port" properties. Each link will be
46  * mapped to its own RTP session. RTX request are only replied to on the
47  * link the NACK was received from.
48  *
49  * There are currently two bonding methods in place: "broadcast" and "round-robin".
50  * In "broadcast" mode, all the packets are duplicated over all sessions.
51  * While in "round-robin" mode, packets are evenly distributed over the links. One
52  * can also implement its own dispatcher element and configure it using the
53  * "dispatcher" property. As a reference, "broadcast" mode is implemented with
54  * the "tee" element, while "round-robin" mode is implemented with the
55  * "round-robin" element.
56  *
57  * ## Example gst-launch line for bonding
58  * |[
59  * gst-launch-1.0 udpsrc ! tsparse set-timestamps=1 smoothing-latency=40000 ! \
60  *  rtpmp2tpay ! ristsink bonding-addresses="10.0.0.1:5004,11.0.0.1:5006"
61  * ]|
62  */
63
64 /* using GValueArray, which has not replacement */
65 #define GLIB_DISABLE_DEPRECATION_WARNINGS
66
67 #ifdef HAVE_CONFIG_H
68 #include "config.h"
69 #endif
70
71 #include <gio/gio.h>
72 #include <gst/rtp/rtp.h>
73
74 /* for strtol() */
75 #include <stdlib.h>
76
77 #include "gstrist.h"
78
79 GST_DEBUG_CATEGORY_STATIC (gst_rist_sink_debug);
80 #define GST_CAT_DEFAULT gst_rist_sink_debug
81
82 enum
83 {
84   PROP_ADDRESS = 1,
85   PROP_PORT,
86   PROP_SENDER_BUFFER,
87   PROP_MIN_RTCP_INTERVAL,
88   PROP_MAX_RTCP_BANDWIDTH,
89   PROP_STATS_UPDATE_INTERVAL,
90   PROP_STATS,
91   PROP_CNAME,
92   PROP_MULTICAST_LOOPBACK,
93   PROP_MULTICAST_IFACE,
94   PROP_MULTICAST_TTL,
95   PROP_BONDING_ADDRESSES,
96   PROP_BONDING_METHOD,
97   PROP_DISPATCHER
98 };
99
100 typedef enum
101 {
102   GST_RIST_BONDING_METHOD_BROADCAST,
103   GST_RIST_BONDING_METHOD_ROUND_ROBIN,
104 } GstRistBondingMethod;
105
106 static GstStaticPadTemplate sink_templ = GST_STATIC_PAD_TEMPLATE ("sink",
107     GST_PAD_SINK,
108     GST_PAD_ALWAYS,
109     GST_STATIC_CAPS ("application/x-rtp"));
110
111 typedef struct
112 {
113   guint session;
114   gchar *address;
115   gchar *multicast_iface;
116   guint port;
117   GstElement *rtcp_src;
118   GstElement *rtp_sink;
119   GstElement *rtcp_sink;
120   GstElement *rtx_send;
121   GstElement *rtx_queue;
122   guint32 rtcp_ssrc;
123 } RistSenderBond;
124
125 struct _GstRistSink
126 {
127   GstBin parent;
128
129   /* Common elements in the pipeline */
130   GstElement *rtpbin;
131   GstElement *ssrc_filter;
132   GstPad *sinkpad;
133   GstElement *rtxbin;
134   GstElement *dispatcher;
135
136   /* Common properties, protected by bonds_lock */
137   gint multicast_ttl;
138   gboolean multicast_loopback;
139   GstClockTime min_rtcp_interval;
140   gdouble max_rtcp_bandwidth;
141   GstRistBondingMethod bonding_method;
142
143   /* Bonds */
144   GPtrArray *bonds;
145   /* this is needed as setting sibling properties will try to take the object
146    * lock. Thus, any properties that affects the bonds will be protected with
147    * that lock instead of the object lock. */
148   GMutex bonds_lock;
149
150   /* For stats */
151   guint stats_interval;
152   guint32 rtp_ssrc;
153   GstClockID stats_cid;
154
155   /* This is set whenever there is a pipeline construction failure, and used
156    * to fail state changes later */
157   gboolean construct_failed;
158   const gchar *missing_plugin;
159 };
160
161 static GType
162 gst_rist_bonding_method_get_type (void)
163 {
164   static gsize id = 0;
165   static const GEnumValue values[] = {
166     {GST_RIST_BONDING_METHOD_BROADCAST,
167         "GST_RIST_BONDING_METHOD_BROADCAST", "broadcast"},
168     {GST_RIST_BONDING_METHOD_ROUND_ROBIN,
169         "GST_RIST_BONDING_METHOD_ROUND_ROBIN", "round-robin"},
170     {0, NULL, NULL}
171   };
172
173   if (g_once_init_enter (&id)) {
174     GType tmp = g_enum_register_static ("GstRistBondingMethodType", values);
175     g_once_init_leave (&id, tmp);
176   }
177
178   return (GType) id;
179 }
180
181 G_DEFINE_TYPE_WITH_CODE (GstRistSink, gst_rist_sink, GST_TYPE_BIN,
182     GST_DEBUG_CATEGORY_INIT (gst_rist_sink_debug, "ristsink", 0, "RIST Sink"));
183
184 static RistSenderBond *
185 gst_rist_sink_add_bond (GstRistSink * sink)
186 {
187   RistSenderBond *bond = g_slice_new0 (RistSenderBond);
188   GstPad *pad, *gpad;
189   gchar name[32];
190
191   bond->session = sink->bonds->len;
192   bond->address = g_strdup ("localhost");
193
194   g_snprintf (name, 32, "rist_rtp_udpsink%u", bond->session);
195   bond->rtp_sink = gst_element_factory_make ("udpsink", name);
196   if (!bond->rtp_sink) {
197     g_slice_free (RistSenderBond, bond);
198     sink->missing_plugin = "udp";
199     return NULL;
200   }
201
202   /* these are all from UDP plugin, so they cannot fail */
203   g_snprintf (name, 32, "rist_rtcp_udpsrc%u", bond->session);
204   bond->rtcp_src = gst_element_factory_make ("udpsrc", name);
205   g_snprintf (name, 32, "rist_rtcp_udpsink%u", bond->session);
206   bond->rtcp_sink = gst_element_factory_make ("udpsink", name);
207   g_object_set (bond->rtcp_sink, "async", FALSE, NULL);
208
209   gst_bin_add_many (GST_BIN (sink), bond->rtp_sink, bond->rtcp_src,
210       bond->rtcp_sink, NULL);
211   gst_element_set_locked_state (bond->rtcp_src, TRUE);
212   gst_element_set_locked_state (bond->rtcp_sink, TRUE);
213
214   g_snprintf (name, 32, "rist_rtx_queue%u", bond->session);
215   bond->rtx_queue = gst_element_factory_make ("queue", name);
216   gst_bin_add (GST_BIN (sink->rtxbin), bond->rtx_queue);
217
218   g_snprintf (name, 32, "rist_rtx_send%u", bond->session);
219   bond->rtx_send = gst_element_factory_make ("ristrtxsend", name);
220   if (!bond->rtx_send) {
221     sink->missing_plugin = "rtpmanager";
222     g_slice_free (RistSenderBond, bond);
223     return NULL;
224   }
225   gst_bin_add (GST_BIN (sink->rtxbin), bond->rtx_send);
226
227   gst_element_link (bond->rtx_queue, bond->rtx_send);
228
229   pad = gst_element_get_static_pad (bond->rtx_send, "src");
230   g_snprintf (name, 32, "src_%u", bond->session);
231   gpad = gst_ghost_pad_new (name, pad);
232   gst_object_unref (pad);
233   gst_element_add_pad (sink->rtxbin, gpad);
234
235   g_object_set (bond->rtx_send, "max-size-packets", 0, NULL);
236
237   g_snprintf (name, 32, "send_rtp_sink_%u", bond->session);
238   if (bond->session == 0) {
239     gst_element_link_pads (sink->ssrc_filter, "src", sink->rtpbin, name);
240   } else {
241     GstPad *pad;
242
243     /* to make a sender, we need to create an unused pad on rtpbin, which will
244      * require an unused pad on the rtxbin */
245     g_snprintf (name, 32, "sink_%u", bond->session);
246     pad = gst_ghost_pad_new_no_target (name, GST_PAD_SINK);
247     gst_element_add_pad (sink->rtxbin, pad);
248
249     g_snprintf (name, 32, "send_rtp_sink_%u", bond->session);
250     pad = gst_element_get_request_pad (sink->rtpbin, name);
251     gst_object_unref (pad);
252   }
253
254   g_snprintf (name, 32, "send_rtp_src_%u", bond->session);
255   gst_element_link_pads (sink->rtpbin, name, bond->rtp_sink, "sink");
256
257   g_snprintf (name, 32, "recv_rtcp_sink_%u", bond->session);
258   gst_element_link_pads (bond->rtcp_src, "src", sink->rtpbin, name);
259
260   g_snprintf (name, 32, "send_rtcp_src_%u", bond->session);
261   gst_element_link_pads (sink->rtpbin, name, bond->rtcp_sink, "sink");
262
263   g_ptr_array_add (sink->bonds, bond);
264   return bond;
265 }
266
267 static GstCaps *
268 gst_rist_sink_request_pt_map (GstRistSrc * sink, guint session_id, guint pt,
269     GstElement * rtpbin)
270 {
271   const GstRTPPayloadInfo *pt_info;
272   GstCaps *ret;
273
274   pt_info = gst_rtp_payload_info_for_pt (pt);
275   if (!pt_info || !pt_info->clock_rate)
276     return NULL;
277
278   ret = gst_caps_new_simple ("application/x-rtp",
279       "media", G_TYPE_STRING, pt_info->media,
280       "encoding_name", G_TYPE_STRING, pt_info->encoding_name,
281       "clock-rate", G_TYPE_INT, (gint) pt_info->clock_rate, NULL);
282
283   /* FIXME add sprop-parameter-set if any */
284   g_warn_if_fail (pt_info->encoding_parameters == NULL);
285
286   return ret;
287 }
288
289 static GstElement *
290 gst_rist_sink_request_aux_sender (GstRistSink * sink, guint session_id,
291     GstElement * rtpbin)
292 {
293   return gst_object_ref (sink->rtxbin);
294 }
295
296 static void
297 on_app_rtcp (GObject * session, guint32 subtype, guint32 ssrc,
298     const gchar * name, GstBuffer * data, GstElement * rtpsession)
299 {
300   if (g_str_equal (name, "RIST")) {
301     if (subtype == 0) {
302       GstEvent *event;
303       GstPad *send_rtp_sink;
304       GstMapInfo map;
305       gint i;
306
307       send_rtp_sink = gst_element_get_static_pad (rtpsession, "send_rtp_sink");
308       if (send_rtp_sink) {
309         gst_buffer_map (data, &map, GST_MAP_READ);
310
311         for (i = 0; i < map.size; i += sizeof (guint32)) {
312           guint32 dword = GST_READ_UINT32_BE (map.data + i);
313           guint16 seqnum = dword >> 16;
314           guint16 num = dword & 0x0000FFFF;
315           guint16 j;
316
317           GST_DEBUG ("got RIST nack packet, #%u %u", seqnum, num);
318
319           /* num is inclusive, i.e. it can be 0, which means exactly 1 seqnum */
320           for (j = 0; j <= num; j++) {
321             event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
322                 gst_structure_new ("GstRTPRetransmissionRequest",
323                     "seqnum", G_TYPE_UINT, (guint) seqnum + j,
324                     "ssrc", G_TYPE_UINT, (guint) ssrc, NULL));
325             gst_pad_push_event (send_rtp_sink, event);
326           }
327         }
328
329         gst_buffer_unmap (data, &map);
330         gst_object_unref (send_rtp_sink);
331       }
332     }
333   }
334 }
335
336 static void
337 gst_rist_sink_on_new_sender_ssrc (GstRistSink * sink, guint session_id,
338     guint ssrc, GstElement * rtpbin)
339 {
340   GObject *gstsession = NULL;
341   GObject *session = NULL;
342   GObject *source = NULL;
343
344   if (session_id != 0)
345     return;
346
347   g_signal_emit_by_name (rtpbin, "get-session", session_id, &gstsession);
348   g_signal_emit_by_name (rtpbin, "get-internal-session", session_id, &session);
349   g_signal_emit_by_name (session, "get-source-by-ssrc", ssrc, &source);
350
351   if (ssrc & 1)
352     g_object_set (source, "disable-rtcp", TRUE, NULL);
353   else
354     g_signal_connect (session, "on-app-rtcp", (GCallback) on_app_rtcp,
355         gstsession);
356
357   g_object_unref (source);
358   g_object_unref (session);
359 }
360
361 static void
362 gst_rist_sink_on_new_receiver_ssrc (GstRistSink * sink, guint session_id,
363     guint ssrc, GstElement * rtpbin)
364 {
365   RistSenderBond *bond;
366
367   if (session_id != 0)
368     return;
369
370   GST_INFO_OBJECT (sink, "Got RTCP remote SSRC %u", ssrc);
371   bond = g_ptr_array_index (sink->bonds, session_id);
372   bond->rtcp_ssrc = ssrc;
373 }
374
375 static GstPadProbeReturn
376 gst_rist_sink_fix_collision (GstPad * pad, GstPadProbeInfo * info,
377     gpointer user_data)
378 {
379   GstEvent *event = info->data;
380   const GstStructure *cs;
381   GstStructure *s;
382   guint ssrc;
383
384   /* We simply ignore collisions */
385   if (GST_EVENT_TYPE (event) != GST_EVENT_CUSTOM_UPSTREAM)
386     return GST_PAD_PROBE_OK;
387
388   cs = gst_event_get_structure (event);
389   if (!gst_structure_has_name (cs, "GstRTPCollision"))
390     return GST_PAD_PROBE_OK;
391
392   gst_structure_get_uint (cs, "suggested-ssrc", &ssrc);
393   if ((ssrc & 1) == 0)
394     return GST_PAD_PROBE_OK;
395
396   event = info->data = gst_event_make_writable (event);
397   /* we can drop the const qualifier as we ensured writability */
398   s = (GstStructure *) gst_event_get_structure (event);
399   gst_structure_set (s, "suggested-ssrc", G_TYPE_UINT, ssrc - 1, NULL);
400
401   return GST_PAD_PROBE_OK;
402 }
403
404 static gboolean
405 gst_rist_sink_set_caps (GstRistSink * sink, GstCaps * caps)
406 {
407   const GstStructure *s = gst_caps_get_structure (caps, 0);
408
409   if (!gst_structure_get_uint (s, "ssrc", &sink->rtp_ssrc)) {
410     GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, ("No 'ssrc' field in caps."),
411         (NULL));
412     return FALSE;
413   }
414
415   if (sink->rtp_ssrc & 1) {
416     GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION,
417         ("Invalid RIST SSRC, LSB must be zero."), (NULL));
418     return FALSE;
419   }
420
421   return TRUE;
422 }
423
424 static gboolean
425 gst_rist_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
426 {
427   GstRistSink *sink = GST_RIST_SINK (parent);
428   GstCaps *caps;
429   gboolean ret = TRUE;
430
431   switch (GST_EVENT_TYPE (event)) {
432     case GST_EVENT_CAPS:
433       gst_event_parse_caps (event, &caps);
434       ret = gst_rist_sink_set_caps (sink, caps);
435       break;
436     default:
437       break;
438   }
439
440   if (ret)
441     ret = gst_pad_event_default (pad, parent, event);
442   else
443     gst_event_unref (event);
444
445   return ret;
446 }
447
448 static void
449 gst_rist_sink_init (GstRistSink * sink)
450 {
451   GstPad *ssrc_filter_sinkpad, *rtxbin_gpad;
452   GstCaps *ssrc_caps;
453   GstStructure *sdes = NULL;
454   RistSenderBond *bond;
455
456   g_mutex_init (&sink->bonds_lock);
457   sink->bonds = g_ptr_array_new ();
458
459   /* Construct the RIST RTP sender pipeline.
460    *
461    * capsfilter*-> [send_rtp_sink_%u]   --------  [send_rtp_src_%u]  -> udpsink
462    *                                   | rtpbin |
463    * udpsrc     -> [recv_rtcp_sink_%u]  --------  [send_rtcp_src_%u] -> * udpsink
464    *
465    * * To select RIST compatible SSRC
466    */
467   sink->rtpbin = gst_element_factory_make ("rtpbin", "rist_send_rtpbin");
468   if (!sink->rtpbin) {
469     sink->missing_plugin = "rtpmanager";
470     goto missing_plugin;
471   }
472
473   /* RIST specification says the SDES should only contain the CNAME */
474   g_object_get (sink->rtpbin, "sdes", &sdes, NULL);
475   gst_structure_remove_field (sdes, "tool");
476
477   gst_bin_add (GST_BIN (sink), sink->rtpbin);
478   g_object_set (sink->rtpbin, "do-retransmission", TRUE,
479       "rtp-profile", 3 /* AVFP */ ,
480       "sdes", sdes, NULL);
481   gst_structure_free (sdes);
482
483   g_signal_connect_swapped (sink->rtpbin, "request-pt-map",
484       G_CALLBACK (gst_rist_sink_request_pt_map), sink);
485   g_signal_connect_swapped (sink->rtpbin, "request-aux-sender",
486       G_CALLBACK (gst_rist_sink_request_aux_sender), sink);
487   g_signal_connect_swapped (sink->rtpbin, "on-new-sender-ssrc",
488       G_CALLBACK (gst_rist_sink_on_new_sender_ssrc), sink);
489   g_signal_connect_swapped (sink->rtpbin, "on-new-ssrc",
490       G_CALLBACK (gst_rist_sink_on_new_receiver_ssrc), sink);
491
492   sink->rtxbin = gst_bin_new ("rist_send_rtxbin");
493   g_object_ref_sink (sink->rtxbin);
494
495   rtxbin_gpad = gst_ghost_pad_new_no_target ("sink_0", GST_PAD_SINK);
496   gst_element_add_pad (sink->rtxbin, rtxbin_gpad);
497
498   sink->ssrc_filter = gst_element_factory_make ("capsfilter",
499       "rist_ssrc_filter");
500   gst_bin_add (GST_BIN (sink), sink->ssrc_filter);
501
502   /* RIST RTP SSRC should have LSB set to 0 */
503   sink->rtp_ssrc = g_random_int () & ~1;
504   ssrc_caps = gst_caps_new_simple ("application/x-rtp",
505       "ssrc", G_TYPE_UINT, sink->rtp_ssrc, NULL);
506   gst_caps_append_structure (ssrc_caps,
507       gst_structure_new_empty ("application/x-rtp"));
508   g_object_set (sink->ssrc_filter, "caps", ssrc_caps, NULL);
509   gst_caps_unref (ssrc_caps);
510
511   ssrc_filter_sinkpad = gst_element_get_static_pad (sink->ssrc_filter, "sink");
512   sink->sinkpad = gst_ghost_pad_new_from_template ("sink", ssrc_filter_sinkpad,
513       gst_static_pad_template_get (&sink_templ));
514   gst_pad_set_event_function (sink->sinkpad, gst_rist_sink_event);
515   gst_element_add_pad (GST_ELEMENT (sink), sink->sinkpad);
516   gst_object_unref (ssrc_filter_sinkpad);
517
518   gst_pad_add_probe (sink->sinkpad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
519       gst_rist_sink_fix_collision, sink, NULL);
520
521   bond = gst_rist_sink_add_bond (sink);
522   if (!bond)
523     goto missing_plugin;
524
525   return;
526
527 missing_plugin:
528   {
529     GST_ERROR_OBJECT (sink, "'%s' plugin is missing.", sink->missing_plugin);
530     sink->construct_failed = TRUE;
531     /* Just make our element valid, so we fail cleanly */
532     gst_element_add_pad (GST_ELEMENT (sink),
533         gst_pad_new_from_static_template (&sink_templ, "sink"));
534   }
535 }
536
537 static gboolean
538 gst_rist_sink_setup_rtcp_socket (GstRistSink * sink, RistSenderBond * bond)
539 {
540   GSocket *socket = NULL;
541   GInetAddress *iaddr = NULL;
542   gchar *remote_addr = NULL;
543   guint port = bond->port + 1;
544   GError *error = NULL;
545
546   iaddr = g_inet_address_new_from_string (bond->address);
547   if (!iaddr) {
548     GList *results;
549     GResolver *resolver = NULL;
550
551     resolver = g_resolver_get_default ();
552     results = g_resolver_lookup_by_name (resolver, bond->address, NULL, &error);
553
554     if (!results) {
555       g_object_unref (resolver);
556       goto dns_resolve_failed;
557     }
558
559     iaddr = G_INET_ADDRESS (g_object_ref (results->data));
560
561     g_resolver_free_addresses (results);
562     g_object_unref (resolver);
563   }
564   remote_addr = g_inet_address_to_string (iaddr);
565
566   if (g_inet_address_get_is_multicast (iaddr)) {
567     g_object_set (bond->rtcp_src, "address", remote_addr, "port", port, NULL);
568   } else {
569     const gchar *any_addr;
570
571     if (g_inet_address_get_family (iaddr) == G_SOCKET_FAMILY_IPV6)
572       any_addr = "::";
573     else
574       any_addr = "0.0.0.0";
575
576     g_object_set (bond->rtcp_src, "address", any_addr, "port", 0, NULL);
577   }
578   g_object_unref (iaddr);
579
580   gst_element_set_locked_state (bond->rtcp_src, FALSE);
581   gst_element_sync_state_with_parent (bond->rtcp_src);
582
583   /* share the socket created by the sink */
584   g_object_get (bond->rtcp_src, "used-socket", &socket, NULL);
585   g_object_set (bond->rtcp_sink, "socket", socket, "auto-multicast", FALSE,
586       "close-socket", FALSE, NULL);
587   g_object_unref (socket);
588
589   gst_element_set_locked_state (bond->rtcp_sink, FALSE);
590   gst_element_sync_state_with_parent (bond->rtcp_sink);
591
592   return GST_STATE_CHANGE_SUCCESS;
593
594 dns_resolve_failed:
595   GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND,
596       ("Could not resolve hostname '%s'", GST_STR_NULL (remote_addr)),
597       ("DNS resolver reported: %s", error->message));
598   g_free (remote_addr);
599   g_error_free (error);
600   return GST_STATE_CHANGE_FAILURE;
601 }
602
603 static GstStateChangeReturn
604 gst_rist_sink_start (GstRistSink * sink)
605 {
606   GstPad *dispatcher_sinkpad, *rtxbin_gpad;
607   gint i;
608
609   /* Unless a custom dispatcher was provided, use the specified bonding method
610    * to create one */
611   if (!sink->dispatcher) {
612     switch (sink->bonding_method) {
613       case GST_RIST_BONDING_METHOD_BROADCAST:
614         sink->dispatcher = gst_element_factory_make ("tee", "rist_dispatcher");
615         if (!sink->dispatcher) {
616           sink->missing_plugin = "coreelements";
617           sink->construct_failed = TRUE;
618         }
619         break;
620       case GST_RIST_BONDING_METHOD_ROUND_ROBIN:
621         sink->dispatcher = gst_element_factory_make ("roundrobin",
622             "rist_dispatcher");
623         g_assert (sink->dispatcher);
624         break;
625     }
626   }
627
628   if (sink->construct_failed) {
629     GST_ELEMENT_ERROR (sink, CORE, MISSING_PLUGIN,
630         ("Your GStreamer installation is missing plugin '%s'",
631             sink->missing_plugin), (NULL));
632     return GST_STATE_CHANGE_FAILURE;
633   }
634
635   gst_bin_add (GST_BIN (sink->rtxbin), sink->dispatcher);
636   dispatcher_sinkpad = gst_element_get_static_pad (sink->dispatcher, "sink");
637   rtxbin_gpad = gst_element_get_static_pad (sink->rtxbin, "sink_0");
638   gst_ghost_pad_set_target (GST_GHOST_PAD (rtxbin_gpad), dispatcher_sinkpad);
639   gst_object_unref (dispatcher_sinkpad);
640   gst_object_unref (rtxbin_gpad);
641
642   for (i = 0; i < sink->bonds->len; i++) {
643     RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
644     GObject *session = NULL;
645     GstPad *pad;
646     gchar name[32];
647
648     g_signal_emit_by_name (sink->rtpbin, "get-session", i, &session);
649     g_object_set (session, "rtcp-min-interval", sink->min_rtcp_interval,
650         "rtcp-fraction", sink->max_rtcp_bandwidth, NULL);
651     g_object_unref (session);
652
653     g_snprintf (name, 32, "src_%u", bond->session);
654     pad = gst_element_get_request_pad (sink->dispatcher, name);
655     gst_element_link_pads (sink->dispatcher, name, bond->rtx_queue, "sink");
656     gst_object_unref (pad);
657
658     if (!gst_rist_sink_setup_rtcp_socket (sink, bond))
659       return GST_STATE_CHANGE_FAILURE;
660   }
661
662   return GST_STATE_CHANGE_SUCCESS;
663 }
664
665
666 static GstStructure *
667 gst_rist_sink_create_stats (GstRistSink * sink)
668 {
669   RistSenderBond *bond;
670   GstStructure *ret;
671   GValueArray *session_stats;
672   guint64 total_pkt_sent = 0, total_rtx_sent = 0;
673   gint i;
674
675   ret = gst_structure_new_empty ("rist/x-sender-stats");
676   session_stats = g_value_array_new (sink->bonds->len);
677
678   for (i = 0; i < sink->bonds->len; i++) {
679     GObject *session = NULL, *source = NULL;
680     GstStructure *sstats = NULL, *stats;
681     guint64 pkt_sent = 0, rtx_sent = 0, rtt;
682     guint rb_rtt = 0;
683     GValue value = G_VALUE_INIT;
684
685     g_signal_emit_by_name (sink->rtpbin, "get-internal-session", i, &session);
686     if (!session)
687       continue;
688
689     stats = gst_structure_new_empty ("rist/x-sender-session-stats");
690     bond = g_ptr_array_index (sink->bonds, i);
691
692     g_signal_emit_by_name (session, "get-source-by-ssrc", sink->rtp_ssrc,
693         &source);
694     if (source) {
695       g_object_get (source, "stats", &sstats, NULL);
696       gst_structure_get_uint64 (sstats, "packets-sent", &pkt_sent);
697       gst_structure_free (sstats);
698       g_clear_object (&source);
699     }
700
701     g_signal_emit_by_name (session, "get-source-by-ssrc", bond->rtcp_ssrc,
702         &source);
703     if (source) {
704       g_object_get (source, "stats", &sstats, NULL);
705       gst_structure_get_uint (sstats, "rb-round-trip", &rb_rtt);
706       gst_structure_free (sstats);
707       g_clear_object (&source);
708     }
709     g_object_unref (session);
710
711     g_object_get (bond->rtx_send, "num-rtx-packets", &rtx_sent, NULL);
712
713     /* rb_rtt is in Q16 in NTP time */
714     rtt = gst_util_uint64_scale (rb_rtt, GST_SECOND, 65536);
715
716     gst_structure_set (stats, "session-id", G_TYPE_INT, i,
717         "sent-original-packets", G_TYPE_UINT64, pkt_sent,
718         "sent-retransmitted-packets", G_TYPE_UINT64, rtx_sent,
719         "round-trip-time", G_TYPE_UINT64, rtt, NULL);
720
721     g_value_init (&value, GST_TYPE_STRUCTURE);
722     g_value_take_boxed (&value, stats);
723     g_value_array_append (session_stats, &value);
724     g_value_unset (&value);
725
726     total_pkt_sent += pkt_sent;
727     total_rtx_sent += rtx_sent;
728   }
729
730   gst_structure_set (ret,
731       "sent-original-packets", G_TYPE_UINT64, total_pkt_sent,
732       "sent-retransmitted-packets", G_TYPE_UINT64, total_rtx_sent,
733       "session-stats", G_TYPE_VALUE_ARRAY, session_stats, NULL);
734   g_value_array_free (session_stats);
735
736   return ret;
737 }
738
739 static gboolean
740 gst_rist_sink_dump_stats (GstClock * clock, GstClockTime time, GstClockID id,
741     gpointer user_data)
742 {
743   GstRistSink *sink = GST_RIST_SINK (user_data);
744   GstStructure *stats = gst_rist_sink_create_stats (sink);
745
746   gst_println ("%s: %" GST_PTR_FORMAT, GST_OBJECT_NAME (sink), stats);
747
748   gst_structure_free (stats);
749   return TRUE;
750 }
751
752 static void
753 gst_rist_sink_enable_stats_interval (GstRistSink * sink)
754 {
755   GstClock *clock;
756   GstClockTime start, interval;
757
758   if (sink->stats_interval == 0)
759     return;
760
761   interval = sink->stats_interval * GST_MSECOND;
762   clock = gst_system_clock_obtain ();
763   start = gst_clock_get_time (clock) + interval;
764
765   sink->stats_cid = gst_clock_new_periodic_id (clock, start, interval);
766   gst_clock_id_wait_async (sink->stats_cid, gst_rist_sink_dump_stats,
767       gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
768
769   gst_object_unref (clock);
770 }
771
772 static void
773 gst_rist_sink_disable_stats_interval (GstRistSink * sink)
774 {
775   if (sink->stats_cid) {
776     gst_clock_id_unschedule (sink->stats_cid);
777     gst_clock_id_unref (sink->stats_cid);
778     sink->stats_cid = NULL;
779   }
780 }
781
782 static GstStateChangeReturn
783 gst_rist_sink_change_state (GstElement * element, GstStateChange transition)
784 {
785   GstRistSink *sink = GST_RIST_SINK (element);
786   GstStateChangeReturn ret;
787
788   switch (transition) {
789     case GST_STATE_CHANGE_PAUSED_TO_READY:
790       gst_rist_sink_disable_stats_interval (sink);
791       break;
792     default:
793       break;
794   }
795
796   ret = GST_ELEMENT_CLASS (gst_rist_sink_parent_class)->change_state (element,
797       transition);
798
799   switch (transition) {
800     case GST_STATE_CHANGE_NULL_TO_READY:
801       ret = gst_rist_sink_start (sink);
802       break;
803     case GST_STATE_CHANGE_READY_TO_PAUSED:
804       gst_rist_sink_enable_stats_interval (sink);
805       break;
806     default:
807       break;
808   }
809
810   return ret;
811 }
812
813 /* called with bonds lock */
814 static void
815 gst_rist_sink_update_bond_address (GstRistSink * sink, RistSenderBond * bond,
816     const gchar * address, guint port, const gchar * multicast_iface)
817 {
818   g_free (bond->address);
819   g_free (bond->multicast_iface);
820   bond->address = g_strdup (address);
821   bond->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
822   bond->port = port;
823
824   g_object_set (G_OBJECT (bond->rtp_sink), "host", address, "port", port,
825       "multicast-iface", bond->multicast_iface, NULL);
826   g_object_set (G_OBJECT (bond->rtcp_sink), "host", address,
827       "port", port + 1, "multicast-iface", bond->multicast_iface, NULL);
828
829   /* TODO add runtime support
830    *  - add blocking the pad probe
831    *  - update RTCP socket
832    *  - cycle elements through NULL state
833    */
834 }
835
836 /* called with bonds lock */
837 static gchar *
838 gst_rist_sink_get_bonds (GstRistSink * sink)
839 {
840   GString *bonds = g_string_new ("");
841   gint i;
842
843   for (i = 0; i < sink->bonds->len; i++) {
844     RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
845     if (bonds->len > 0)
846       g_string_append_c (bonds, ':');
847
848     g_string_append_printf (bonds, "%s:%u", bond->address, bond->port);
849
850     if (bond->multicast_iface)
851       g_string_append_printf (bonds, "/%s", bond->multicast_iface);
852   }
853
854   return g_string_free (bonds, FALSE);
855 }
856
857 struct RistAddress
858 {
859   gchar *address;
860   char *multicast_iface;
861   guint port;
862 };
863
864 /* called with bonds lock */
865 static void
866 gst_rist_sink_set_bonds (GstRistSink * sink, const gchar * bonds)
867 {
868   GStrv tokens = NULL;
869   struct RistAddress *addrs;
870   gint i;
871
872   if (bonds == NULL)
873     goto missing_address;
874
875   tokens = g_strsplit (bonds, ",", 0);
876   if (tokens[0] == NULL)
877     goto missing_address;
878
879   addrs = g_new0 (struct RistAddress, g_strv_length (tokens));
880
881   /* parse the address list */
882   for (i = 0; tokens[i]; i++) {
883     gchar *address = tokens[i];
884     char *port_ptr, *iface_ptr, *endptr;
885     guint port;
886
887     port_ptr = g_utf8_strrchr (address, -1, ':');
888     iface_ptr = g_utf8_strrchr (address, -1, '/');
889
890     if (!port_ptr)
891       goto bad_parameter;
892     if (!g_ascii_isdigit (port_ptr[1]))
893       goto bad_parameter;
894
895     if (iface_ptr) {
896       if (iface_ptr < port_ptr)
897         goto bad_parameter;
898       iface_ptr[0] = '\0';
899     }
900
901     port = strtol (port_ptr + 1, &endptr, 0);
902     if (endptr[0] != '\0')
903       goto bad_parameter;
904
905     /* port must be a multiple of 2 between 2 and 65534 */
906     if (port < 2 || (port & 1) || port > G_MAXUINT16)
907       goto invalid_port;
908
909     port_ptr[0] = '\0';
910     addrs[i].port = port;
911     addrs[i].address = g_strstrip (address);
912     if (iface_ptr)
913       addrs[i].multicast_iface = g_strstrip (iface_ptr + 1);
914   }
915
916   /* configure the bonds */
917   for (i = 0; tokens[i]; i++) {
918     RistSenderBond *bond = NULL;
919
920     if (i < sink->bonds->len)
921       bond = g_ptr_array_index (sink->bonds, i);
922     else
923       bond = gst_rist_sink_add_bond (sink);
924
925     gst_rist_sink_update_bond_address (sink, bond, addrs[i].address,
926         addrs[i].port, addrs[i].multicast_iface);
927   }
928
929   g_strfreev (tokens);
930   return;
931
932 missing_address:
933   g_warning ("'bonding-addresses' cannot be empty");
934   g_strfreev (tokens);
935   return;
936
937 bad_parameter:
938   g_warning ("Failed to parse address '%s", tokens[i]);
939   g_strfreev (tokens);
940   g_free (addrs);
941   return;
942
943 invalid_port:
944   g_warning ("RIST port must valid UDP port and a multiple of 2.");
945   g_strfreev (tokens);
946   g_free (addrs);
947   return;
948 }
949
950 static void
951 gst_rist_sink_set_multicast_loopback (GstRistSink * sink, gboolean loop)
952 {
953   gint i;
954
955   sink->multicast_loopback = loop;
956   for (i = 0; i < sink->bonds->len; i++) {
957     RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
958     g_object_set (G_OBJECT (bond->rtp_sink), "loop", loop, NULL);
959     g_object_set (G_OBJECT (bond->rtcp_sink), "loop", loop, NULL);
960   }
961 }
962
963 /* called with bonds lock */
964 static void
965 gst_rist_sink_set_multicast_ttl (GstRistSink * sink, gint ttl)
966 {
967   gint i;
968
969   sink->multicast_ttl = ttl;
970   for (i = 0; i < sink->bonds->len; i++) {
971     RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
972     g_object_set (G_OBJECT (bond->rtp_sink), "ttl-mc", ttl, NULL);
973     g_object_set (G_OBJECT (bond->rtcp_sink), "ttl-mc", ttl, NULL);
974   }
975 }
976
977 static void
978 gst_rist_sink_get_property (GObject * object, guint prop_id,
979     GValue * value, GParamSpec * pspec)
980 {
981   GstRistSink *sink = GST_RIST_SINK (object);
982   GstStructure *sdes;
983   RistSenderBond *bond;
984
985   if (sink->construct_failed)
986     return;
987
988   g_mutex_lock (&sink->bonds_lock);
989
990   bond = g_ptr_array_index (sink->bonds, 0);
991
992   switch (prop_id) {
993     case PROP_ADDRESS:
994       g_value_set_string (value, bond->address);
995       break;
996
997     case PROP_PORT:
998       g_value_set_uint (value, bond->port);
999       break;
1000
1001     case PROP_SENDER_BUFFER:
1002       g_object_get_property (G_OBJECT (bond->rtx_send), "max-size-time", value);
1003       break;
1004
1005     case PROP_MIN_RTCP_INTERVAL:
1006       g_value_set_uint (value, (guint) (sink->min_rtcp_interval / GST_MSECOND));
1007       break;
1008
1009     case PROP_MAX_RTCP_BANDWIDTH:
1010       g_value_set_double (value, sink->max_rtcp_bandwidth);
1011       break;
1012
1013     case PROP_STATS_UPDATE_INTERVAL:
1014       g_value_set_uint (value, sink->stats_interval);
1015       break;
1016
1017     case PROP_STATS:
1018       g_value_take_boxed (value, gst_rist_sink_create_stats (sink));
1019       break;
1020
1021     case PROP_CNAME:
1022       g_object_get (sink->rtpbin, "sdes", &sdes, NULL);
1023       g_value_set_string (value, gst_structure_get_string (sdes, "cname"));
1024       gst_structure_free (sdes);
1025       break;
1026
1027     case PROP_MULTICAST_LOOPBACK:
1028       g_value_set_boolean (value, sink->multicast_loopback);
1029       break;
1030
1031     case PROP_MULTICAST_IFACE:
1032       g_value_set_string (value, bond->multicast_iface);
1033       break;
1034
1035     case PROP_MULTICAST_TTL:
1036       g_value_set_int (value, sink->multicast_ttl);
1037       break;
1038
1039     case PROP_BONDING_ADDRESSES:
1040       g_value_take_string (value, gst_rist_sink_get_bonds (sink));
1041       break;
1042
1043     case PROP_BONDING_METHOD:
1044       g_value_set_enum (value, sink->bonding_method);
1045       break;
1046
1047     case PROP_DISPATCHER:
1048       g_value_set_object (value, sink->dispatcher);
1049       break;
1050
1051     default:
1052       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1053       break;
1054   }
1055
1056   g_mutex_unlock (&sink->bonds_lock);
1057 }
1058
1059 static void
1060 gst_rist_sink_set_property (GObject * object, guint prop_id,
1061     const GValue * value, GParamSpec * pspec)
1062 {
1063   GstRistSink *sink = GST_RIST_SINK (object);
1064   GstStructure *sdes;
1065   RistSenderBond *bond;
1066
1067   if (sink->construct_failed)
1068     return;
1069
1070   g_mutex_lock (&sink->bonds_lock);
1071
1072   bond = g_ptr_array_index (sink->bonds, 0);
1073
1074   switch (prop_id) {
1075     case PROP_ADDRESS:
1076       g_free (bond->address);
1077       bond->address = g_value_dup_string (value);
1078       g_object_set_property (G_OBJECT (bond->rtp_sink), "host", value);
1079       g_object_set_property (G_OBJECT (bond->rtcp_sink), "host", value);
1080       break;
1081
1082     case PROP_PORT:{
1083       guint port = g_value_get_uint (value);
1084
1085       /* According to 5.1.1, RTCP receiver port most be event number and RTCP
1086        * port should be the RTP port + 1 */
1087
1088       if (port & 0x1) {
1089         g_warning ("Invalid RIST port %u, should be an even number.", port);
1090         return;
1091       }
1092
1093       bond->port = port;
1094       g_object_set (bond->rtp_sink, "port", port, NULL);
1095       g_object_set (bond->rtcp_sink, "port", port + 1, NULL);
1096       break;
1097     }
1098
1099     case PROP_SENDER_BUFFER:
1100       g_object_set (bond->rtx_send,
1101           "max-size-time", g_value_get_uint (value), NULL);
1102       break;
1103
1104     case PROP_MIN_RTCP_INTERVAL:
1105       sink->min_rtcp_interval = g_value_get_uint (value) * GST_MSECOND;
1106       break;
1107
1108     case PROP_MAX_RTCP_BANDWIDTH:
1109       sink->max_rtcp_bandwidth = g_value_get_double (value);
1110       break;
1111
1112     case PROP_STATS_UPDATE_INTERVAL:
1113       sink->stats_interval = g_value_get_uint (value);
1114       break;
1115
1116     case PROP_CNAME:
1117       g_object_get (sink->rtpbin, "sdes", &sdes, NULL);
1118       gst_structure_set_value (sdes, "cname", value);
1119       g_object_set (sink->rtpbin, "sdes", sdes, NULL);
1120       gst_structure_free (sdes);
1121       break;
1122
1123     case PROP_MULTICAST_LOOPBACK:
1124       gst_rist_sink_set_multicast_loopback (sink, g_value_get_boolean (value));
1125       break;
1126
1127     case PROP_MULTICAST_IFACE:
1128       g_free (bond->multicast_iface);
1129       bond->multicast_iface = g_value_dup_string (value);
1130       g_object_set_property (G_OBJECT (bond->rtp_sink),
1131           "multicast-iface", value);
1132       g_object_set_property (G_OBJECT (bond->rtcp_sink),
1133           "multicast-iface", value);
1134       break;
1135
1136     case PROP_MULTICAST_TTL:
1137       gst_rist_sink_set_multicast_ttl (sink, g_value_get_int (value));
1138       break;
1139
1140     case PROP_BONDING_ADDRESSES:
1141       gst_rist_sink_set_bonds (sink, g_value_get_string (value));
1142       break;
1143
1144     case PROP_BONDING_METHOD:
1145       sink->bonding_method = g_value_get_enum (value);
1146       break;
1147
1148     case PROP_DISPATCHER:
1149       if (sink->dispatcher)
1150         g_object_unref (sink->dispatcher);
1151       sink->dispatcher = g_object_ref_sink (g_value_get_object (value));
1152       break;
1153
1154     default:
1155       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1156       break;
1157   }
1158
1159   g_mutex_unlock (&sink->bonds_lock);
1160 }
1161
1162 static void
1163 gst_rist_sink_finalize (GObject * object)
1164 {
1165   GstRistSink *sink = GST_RIST_SINK (object);
1166   gint i;
1167
1168   g_mutex_lock (&sink->bonds_lock);
1169
1170   for (i = 0; i < sink->bonds->len; i++) {
1171     RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
1172     g_free (bond->address);
1173     g_free (bond->multicast_iface);
1174     g_slice_free (RistSenderBond, bond);
1175   }
1176   g_ptr_array_free (sink->bonds, TRUE);
1177
1178   g_clear_object (&sink->rtxbin);
1179
1180   g_mutex_unlock (&sink->bonds_lock);
1181   g_mutex_clear (&sink->bonds_lock);
1182
1183   G_OBJECT_CLASS (gst_rist_sink_parent_class)->finalize (object);
1184 }
1185
1186 static void
1187 gst_rist_sink_class_init (GstRistSinkClass * klass)
1188 {
1189   GstElementClass *element_class = (GstElementClass *) klass;
1190   GObjectClass *object_class = (GObjectClass *) klass;
1191
1192   gst_element_class_set_metadata (element_class,
1193       "RIST Sink", "Source/Network",
1194       "Sink that implements RIST TR-06-1 streaming specification",
1195       "Nicolas Dufresne <nicolas.dufresne@collabora.com");
1196   gst_element_class_add_static_pad_template (element_class, &sink_templ);
1197
1198   element_class->change_state = gst_rist_sink_change_state;
1199
1200   object_class->get_property = gst_rist_sink_get_property;
1201   object_class->set_property = gst_rist_sink_set_property;
1202   object_class->finalize = gst_rist_sink_finalize;
1203
1204   g_object_class_install_property (object_class, PROP_ADDRESS,
1205       g_param_spec_string ("address", "Address",
1206           "Address to send packets to (can be IPv4 or IPv6).", "0.0.0.0",
1207           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1208
1209   g_object_class_install_property (object_class, PROP_PORT,
1210       g_param_spec_uint ("port", "Port", "The port RTP packets will be sent, "
1211           "the RTCP port is this value + 1. This port must be an even number.",
1212           2, 65534, 5004,
1213           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1214
1215   g_object_class_install_property (object_class, PROP_SENDER_BUFFER,
1216       g_param_spec_uint ("sender-buffer", "Sender Buffer",
1217           "Size of the retransmission queue (in ms)", 0, G_MAXUINT, 1200,
1218           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1219
1220   g_object_class_install_property (object_class, PROP_MIN_RTCP_INTERVAL,
1221       g_param_spec_uint ("min-rtcp-interval", "Minimum RTCP Intercal",
1222           "The minimum interval (in ms) between two regular successive RTCP "
1223           "packets.", 0, 100, 100,
1224           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1225
1226   g_object_class_install_property (object_class, PROP_MAX_RTCP_BANDWIDTH,
1227       g_param_spec_double ("max-rtcp-bandwidth", "Maximum RTCP Bandwidth",
1228           "The maximum bandwidth used for RTCP as a fraction of RTP bandwdith",
1229           0.0, 0.05, 0.05,
1230           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1231
1232   g_object_class_install_property (object_class, PROP_STATS_UPDATE_INTERVAL,
1233       g_param_spec_uint ("stats-update-interval", "Statistics Update Interval",
1234           "The interval between 'stats' update notification (in ms) (0 disabled)",
1235           0, G_MAXUINT, 0,
1236           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1237
1238   g_object_class_install_property (object_class, PROP_STATS,
1239       g_param_spec_boxed ("stats", "Statistics",
1240           "Statistic in a GstStructure named 'rist/x-sender-stats'",
1241           GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1242
1243   g_object_class_install_property (object_class, PROP_CNAME,
1244       g_param_spec_string ("cname", "CName",
1245           "Set the CNAME in the SDES block of the sender report.", NULL,
1246           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
1247           GST_PARAM_DOC_SHOW_DEFAULT));
1248
1249   g_object_class_install_property (object_class, PROP_MULTICAST_LOOPBACK,
1250       g_param_spec_boolean ("multicast-loopback", "Multicast Loopback",
1251           "When enabled, the packet will be received locally.", FALSE,
1252           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1253
1254   g_object_class_install_property (object_class, PROP_MULTICAST_IFACE,
1255       g_param_spec_string ("multicast-iface", "multicast-iface",
1256           "The multicast interface to use to send packets.", NULL,
1257           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1258
1259   g_object_class_install_property (object_class, PROP_MULTICAST_TTL,
1260       g_param_spec_int ("multicast-ttl", "Multicast TTL",
1261           "The multicast time-to-live parameter.", 0, 255, 1,
1262           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1263
1264   g_object_class_install_property (object_class, PROP_BONDING_ADDRESSES,
1265       g_param_spec_string ("bonding-addresses", "Bonding Addresses",
1266           "Comma (,) seperated list of <address>:<port> to send to. ", NULL,
1267           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1268
1269   g_object_class_install_property (object_class, PROP_BONDING_METHOD,
1270       g_param_spec_enum ("bonding-method", "Bonding Method",
1271           "Defines the bonding method to use.",
1272           gst_rist_bonding_method_get_type (),
1273           GST_RIST_BONDING_METHOD_BROADCAST,
1274           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1275
1276   g_object_class_install_property (object_class, PROP_DISPATCHER,
1277       g_param_spec_object ("dispatcher", "Bonding Dispatcher",
1278           "An element that takes care of multi-plexing bonded links. When set "
1279           "\"bonding-method\" is ignored.",
1280           GST_TYPE_ELEMENT,
1281           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
1282           GST_PARAM_MUTABLE_READY));
1283 }