webrtcbin: an element that handles the transport aspects of webrtc connections
[platform/upstream/gst-plugins-bad.git] / ext / webrtc / transportreceivebin.c
1 /* GStreamer
2  * Copyright (C) 2017 Matthew Waters <matthew@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 # include "config.h"
22 #endif
23
24 #include "transportreceivebin.h"
25 #include "utils.h"
26
27 /*
28  * ,----------------------------transport_receive_%u-----------------------------,
29  * ;       (rtp)                                                                 ;
30  * ;  ,---nicesrc----,  ,-capsfilter-,  ,----dtlssrtpdec----,      ,--funnel--,  ;
31  * ;  ;          src o--o sink   src o--o sink      rtp_src o------o sink_0   ;  ;
32  * ;  '--------------'  '------------'  ;                   ;      ;      src o--o rtp_src
33  * ;                                    ;          rtcp_src o-, ,--o sink_1   ;  ;
34  * ;                                    '-------------------' ; ;  '----------'  ;
35  * ;                                                          ; ;  ,--funnel--,  ;
36  * ;                                                          '-+--o sink_0   ;  ;
37  * ;                                                          ,-'  ;      src o--o rtcp_src
38  * ;       (rtcp)                                             ;  ,-o sink_1   ;  ;
39  * ;  ,---nicesrc----,  ,-capsfilter-,  ,----dtlssrtpdec----, ;  ; '----------'  ;
40  * ;  ;          src o--o sink   src o--o sink      rtp_src o-'  ;               ;
41  * ;  '--------------'  '------------'  ;                   ;    ;               ;
42  * ;                                    ;          rtcp_src o----'               ;
43  * ;                                    '-------------------'                    ;
44  * '-----------------------------------------------------------------------------'
45  *
46  * Do we really wnat to be *that* permissive in what we accept?
47  *
48  * FIXME: When and how do we want to clear the possibly stored buffers?
49  */
50
51 #define GST_CAT_DEFAULT gst_webrtc_transport_receive_bin_debug
52 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
53
54 #define transport_receive_bin_parent_class parent_class
55 G_DEFINE_TYPE_WITH_CODE (TransportReceiveBin, transport_receive_bin,
56     GST_TYPE_BIN,
57     GST_DEBUG_CATEGORY_INIT (gst_webrtc_transport_receive_bin_debug,
58         "webrtctransportreceivebin", 0, "webrtctransportreceivebin");
59     );
60
61 static GstStaticPadTemplate rtp_sink_template =
62 GST_STATIC_PAD_TEMPLATE ("rtp_src",
63     GST_PAD_SINK,
64     GST_PAD_ALWAYS,
65     GST_STATIC_CAPS ("application/x-rtp"));
66
67 static GstStaticPadTemplate rtcp_sink_template =
68 GST_STATIC_PAD_TEMPLATE ("rtcp_src",
69     GST_PAD_SINK,
70     GST_PAD_ALWAYS,
71     GST_STATIC_CAPS ("application/x-rtp"));
72
73 enum
74 {
75   PROP_0,
76   PROP_STREAM,
77 };
78
79 static const gchar *
80 _receive_state_to_string (ReceiveState state)
81 {
82   switch (state) {
83     case RECEIVE_STATE_BLOCK:
84       return "block";
85     case RECEIVE_STATE_DROP:
86       return "drop";
87     case RECEIVE_STATE_PASS:
88       return "pass";
89     default:
90       return "Unknown";
91   }
92 }
93
94 static GstPadProbeReturn
95 pad_block (GstPad * pad, GstPadProbeInfo * info, TransportReceiveBin * receive)
96 {
97   GstPadProbeReturn ret;
98
99   g_mutex_lock (&receive->pad_block_lock);
100   while (receive->receive_state == RECEIVE_STATE_BLOCK) {
101     g_cond_wait (&receive->pad_block_cond, &receive->pad_block_lock);
102     GST_DEBUG_OBJECT (pad, "probe waited. new state %s",
103         _receive_state_to_string (receive->receive_state));
104   }
105   ret = GST_PAD_PROBE_PASS;
106
107   if (receive->receive_state == RECEIVE_STATE_DROP) {
108     ret = GST_PAD_PROBE_DROP;
109   } else if (receive->receive_state == RECEIVE_STATE_PASS) {
110     ret = GST_PAD_PROBE_OK;
111   }
112
113   g_mutex_unlock (&receive->pad_block_lock);
114
115   return ret;
116 }
117
118 void
119 transport_receive_bin_set_receive_state (TransportReceiveBin * receive,
120     ReceiveState state)
121 {
122   g_mutex_lock (&receive->pad_block_lock);
123   receive->receive_state = state;
124   GST_DEBUG_OBJECT (receive, "changing receive state to %s",
125       _receive_state_to_string (state));
126   g_cond_signal (&receive->pad_block_cond);
127   g_mutex_unlock (&receive->pad_block_lock);
128 }
129
130 static void
131 transport_receive_bin_set_property (GObject * object, guint prop_id,
132     const GValue * value, GParamSpec * pspec)
133 {
134   TransportReceiveBin *receive = TRANSPORT_RECEIVE_BIN (object);
135
136   GST_OBJECT_LOCK (receive);
137   switch (prop_id) {
138     case PROP_STREAM:
139       /* XXX: weak-ref this? */
140       receive->stream = TRANSPORT_STREAM (g_value_get_object (value));
141       break;
142     default:
143       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
144       break;
145   }
146   GST_OBJECT_UNLOCK (receive);
147 }
148
149 static void
150 transport_receive_bin_get_property (GObject * object, guint prop_id,
151     GValue * value, GParamSpec * pspec)
152 {
153   TransportReceiveBin *receive = TRANSPORT_RECEIVE_BIN (object);
154
155   GST_OBJECT_LOCK (receive);
156   switch (prop_id) {
157     case PROP_STREAM:
158       g_value_set_object (value, receive->stream);
159       break;
160     default:
161       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
162       break;
163   }
164   GST_OBJECT_UNLOCK (receive);
165 }
166
167 static void
168 transport_receive_bin_finalize (GObject * object)
169 {
170   TransportReceiveBin *receive = TRANSPORT_RECEIVE_BIN (object);
171
172   g_mutex_clear (&receive->pad_block_lock);
173   g_cond_clear (&receive->pad_block_cond);
174
175   G_OBJECT_CLASS (parent_class)->finalize (object);
176 }
177
178 static GstStateChangeReturn
179 transport_receive_bin_change_state (GstElement * element,
180     GstStateChange transition)
181 {
182   TransportReceiveBin *receive = TRANSPORT_RECEIVE_BIN (element);
183   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
184
185   GST_DEBUG ("changing state: %s => %s",
186       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
187       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
188
189   switch (transition) {
190     case GST_STATE_CHANGE_NULL_TO_READY:{
191       GstElement *elem;
192
193       receive->rtp_block =
194           _create_pad_block (GST_ELEMENT (receive), receive->rtp_src, 0, NULL,
195           NULL);
196       receive->rtp_block->block_id =
197           gst_pad_add_probe (receive->rtp_src, GST_PAD_PROBE_TYPE_ALL_BOTH,
198           (GstPadProbeCallback) pad_block, receive, NULL);
199
200       /* XXX: because nice needs the nicesrc internal main loop running in order
201        * correctly STUN... */
202       /* FIXME: this races with the pad exposure later and may get not-linked */
203       elem = receive->stream->transport->transport->src;
204       gst_element_set_locked_state (elem, TRUE);
205       gst_element_set_state (elem, GST_STATE_PLAYING);
206       elem = receive->stream->rtcp_transport->transport->src;
207       gst_element_set_locked_state (elem, TRUE);
208       gst_element_set_state (elem, GST_STATE_PLAYING);
209       break;
210     }
211     default:
212       break;
213   }
214
215   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
216   if (ret == GST_STATE_CHANGE_FAILURE)
217     return ret;
218
219   switch (transition) {
220     case GST_STATE_CHANGE_READY_TO_NULL:{
221       GstElement *elem;
222
223       elem = receive->stream->transport->transport->src;
224       gst_element_set_locked_state (elem, FALSE);
225       gst_element_set_state (elem, GST_STATE_NULL);
226       elem = receive->stream->rtcp_transport->transport->src;
227       gst_element_set_locked_state (elem, FALSE);
228       gst_element_set_state (elem, GST_STATE_NULL);
229
230       if (receive->rtp_block)
231         _free_pad_block (receive->rtp_block);
232       receive->rtp_block = NULL;
233       break;
234     }
235     default:
236       break;
237   }
238
239   return ret;
240 }
241
242 static void
243 rtp_queue_overrun (GstElement * queue, TransportReceiveBin * receive)
244 {
245   GST_WARNING_OBJECT (receive, "Internal receive queue overrun. Dropping data");
246 }
247
248 static void
249 transport_receive_bin_constructed (GObject * object)
250 {
251   TransportReceiveBin *receive = TRANSPORT_RECEIVE_BIN (object);
252   GstWebRTCDTLSTransport *transport;
253   GstPad *ghost, *pad;
254   GstElement *capsfilter, *funnel, *queue;
255   GstCaps *caps;
256
257   g_return_if_fail (receive->stream);
258
259   /* link ice src, dtlsrtp together for rtp */
260   transport = receive->stream->transport;
261   gst_bin_add (GST_BIN (receive), GST_ELEMENT (transport->dtlssrtpdec));
262
263   capsfilter = gst_element_factory_make ("capsfilter", NULL);
264   caps = gst_caps_new_empty_simple ("application/x-rtp");
265   g_object_set (capsfilter, "caps", caps, NULL);
266   gst_caps_unref (caps);
267
268   gst_bin_add (GST_BIN (receive), GST_ELEMENT (capsfilter));
269   if (!gst_element_link_pads (capsfilter, "src", transport->dtlssrtpdec,
270           "sink"))
271     g_warn_if_reached ();
272
273   gst_bin_add (GST_BIN (receive), GST_ELEMENT (transport->transport->src));
274
275   if (!gst_element_link_pads (GST_ELEMENT (transport->transport->src), "src",
276           GST_ELEMENT (capsfilter), "sink"))
277     g_warn_if_reached ();
278
279   /* link ice src, dtlsrtp together for rtcp */
280   transport = receive->stream->rtcp_transport;
281   gst_bin_add (GST_BIN (receive), GST_ELEMENT (transport->dtlssrtpdec));
282
283   capsfilter = gst_element_factory_make ("capsfilter", NULL);
284   caps = gst_caps_new_empty_simple ("application/x-rtcp");
285   g_object_set (capsfilter, "caps", caps, NULL);
286   gst_caps_unref (caps);
287
288   gst_bin_add (GST_BIN (receive), GST_ELEMENT (capsfilter));
289   if (!gst_element_link_pads (capsfilter, "src", transport->dtlssrtpdec,
290           "sink"))
291     g_warn_if_reached ();
292
293   gst_bin_add (GST_BIN (receive), GST_ELEMENT (transport->transport->src));
294
295   if (!gst_element_link_pads (GST_ELEMENT (transport->transport->src), "src",
296           GST_ELEMENT (capsfilter), "sink"))
297     g_warn_if_reached ();
298
299   /* create funnel for rtp_src */
300   funnel = gst_element_factory_make ("funnel", NULL);
301   gst_bin_add (GST_BIN (receive), funnel);
302   if (!gst_element_link_pads (receive->stream->transport->dtlssrtpdec,
303           "rtp_src", funnel, "sink_0"))
304     g_warn_if_reached ();
305   if (!gst_element_link_pads (receive->stream->rtcp_transport->dtlssrtpdec,
306           "rtp_src", funnel, "sink_1"))
307     g_warn_if_reached ();
308
309   queue = gst_element_factory_make ("queue", NULL);
310   /* FIXME: make this configurable? */
311   g_object_set (queue, "leaky", 2, "max-size-time", (guint64) 0,
312       "max-size-buffers", 0, "max-size-bytes", 5 * 1024 * 1024, NULL);
313   g_signal_connect (queue, "overrun", G_CALLBACK (rtp_queue_overrun), receive);
314   gst_bin_add (GST_BIN (receive), queue);
315   if (!gst_element_link_pads (funnel, "src", queue, "sink"))
316     g_warn_if_reached ();
317
318   pad = gst_element_get_static_pad (queue, "src");
319   receive->rtp_src = gst_ghost_pad_new ("rtp_src", pad);
320
321   gst_element_add_pad (GST_ELEMENT (receive), receive->rtp_src);
322   gst_object_unref (pad);
323
324   /* create funnel for rtcp_src */
325   funnel = gst_element_factory_make ("funnel", NULL);
326   gst_bin_add (GST_BIN (receive), funnel);
327   if (!gst_element_link_pads (receive->stream->transport->dtlssrtpdec,
328           "rtcp_src", funnel, "sink_0"))
329     g_warn_if_reached ();
330   if (!gst_element_link_pads (receive->stream->rtcp_transport->dtlssrtpdec,
331           "rtcp_src", funnel, "sink_1"))
332     g_warn_if_reached ();
333
334   pad = gst_element_get_static_pad (funnel, "src");
335   ghost = gst_ghost_pad_new ("rtcp_src", pad);
336   gst_element_add_pad (GST_ELEMENT (receive), ghost);
337   gst_object_unref (pad);
338
339   G_OBJECT_CLASS (parent_class)->constructed (object);
340 }
341
342 static void
343 transport_receive_bin_class_init (TransportReceiveBinClass * klass)
344 {
345   GObjectClass *gobject_class = (GObjectClass *) klass;
346   GstElementClass *element_class = (GstElementClass *) klass;
347
348   element_class->change_state = transport_receive_bin_change_state;
349
350   gst_element_class_add_static_pad_template (element_class, &rtp_sink_template);
351   gst_element_class_add_static_pad_template (element_class,
352       &rtcp_sink_template);
353
354   gst_element_class_set_metadata (element_class, "WebRTC Transport Receive Bin",
355       "Filter/Network/WebRTC", "A bin for webrtc connections",
356       "Matthew Waters <matthew@centricular.com>");
357
358   gobject_class->constructed = transport_receive_bin_constructed;
359   gobject_class->get_property = transport_receive_bin_get_property;
360   gobject_class->set_property = transport_receive_bin_set_property;
361   gobject_class->finalize = transport_receive_bin_finalize;
362
363   g_object_class_install_property (gobject_class,
364       PROP_STREAM,
365       g_param_spec_object ("stream", "Stream",
366           "The TransportStream for this receiveing bin",
367           transport_stream_get_type (),
368           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
369 }
370
371 static void
372 transport_receive_bin_init (TransportReceiveBin * receive)
373 {
374   g_mutex_init (&receive->pad_block_lock);
375   g_cond_init (&receive->pad_block_cond);
376 }