fde12613c8d6bbd2b6e4f764a66071ac91d75db8
[platform/upstream/gstreamer.git] / ext / webrtc / webrtcdatachannel.c
1 /* GStreamer
2  * Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:gstwebrtc-datachannel
22  * @short_description: RTCDataChannel object
23  * @title: GstWebRTCDataChannel
24  * @see_also: #GstWebRTCRTPTransceiver
25  *
26  * <http://w3c.github.io/webrtc-pc/#dom-rtcsctptransport>
27  */
28
29 #ifdef HAVE_CONFIG_H
30 # include "config.h"
31 #endif
32
33 #include "webrtcdatachannel.h"
34 #include <gst/app/gstappsink.h>
35 #include <gst/app/gstappsrc.h>
36 #include <gst/base/gstbytereader.h>
37 #include <gst/base/gstbytewriter.h>
38 #include <gst/sctp/sctpreceivemeta.h>
39 #include <gst/sctp/sctpsendmeta.h>
40
41 #include "gstwebrtcbin.h"
42 #include "utils.h"
43
44 #define GST_CAT_DEFAULT webrtc_data_channel_debug
45 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
46
47 #define webrtc_data_channel_parent_class parent_class
48 G_DEFINE_TYPE_WITH_CODE (WebRTCDataChannel, webrtc_data_channel,
49     GST_TYPE_WEBRTC_DATA_CHANNEL,
50     GST_DEBUG_CATEGORY_INIT (webrtc_data_channel_debug, "webrtcdatachannel", 0,
51         "webrtcdatachannel"););
52
53 typedef enum
54 {
55   DATA_CHANNEL_PPID_WEBRTC_CONTROL = 50,
56   DATA_CHANNEL_PPID_WEBRTC_STRING = 51,
57   DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL = 52, /* deprecated */
58   DATA_CHANNEL_PPID_WEBRTC_BINARY = 53,
59   DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL = 54, /* deprecated */
60   DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY = 56,
61   DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY = 57,
62 } DataChannelPPID;
63
64 typedef enum
65 {
66   CHANNEL_TYPE_RELIABLE = 0x00,
67   CHANNEL_TYPE_RELIABLE_UNORDERED = 0x80,
68   CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT = 0x01,
69   CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT_UNORDERED = 0x81,
70   CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED = 0x02,
71   CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED_UNORDERED = 0x82,
72 } DataChannelReliabilityType;
73
74 typedef enum
75 {
76   CHANNEL_MESSAGE_ACK = 0x02,
77   CHANNEL_MESSAGE_OPEN = 0x03,
78 } DataChannelMessage;
79
80 static guint16
81 priority_type_to_uint (GstWebRTCPriorityType pri)
82 {
83   switch (pri) {
84     case GST_WEBRTC_PRIORITY_TYPE_VERY_LOW:
85       return 64;
86     case GST_WEBRTC_PRIORITY_TYPE_LOW:
87       return 192;
88     case GST_WEBRTC_PRIORITY_TYPE_MEDIUM:
89       return 384;
90     case GST_WEBRTC_PRIORITY_TYPE_HIGH:
91       return 768;
92   }
93   g_assert_not_reached ();
94   return 0;
95 }
96
97 static GstWebRTCPriorityType
98 priority_uint_to_type (guint16 val)
99 {
100   if (val <= 128)
101     return GST_WEBRTC_PRIORITY_TYPE_VERY_LOW;
102   if (val <= 256)
103     return GST_WEBRTC_PRIORITY_TYPE_LOW;
104   if (val <= 512)
105     return GST_WEBRTC_PRIORITY_TYPE_MEDIUM;
106   return GST_WEBRTC_PRIORITY_TYPE_HIGH;
107 }
108
109 static GstBuffer *
110 construct_open_packet (WebRTCDataChannel * channel)
111 {
112   GstByteWriter w;
113   gsize label_len = strlen (channel->parent.label);
114   gsize proto_len = strlen (channel->parent.protocol);
115   gsize size = 12 + label_len + proto_len;
116   DataChannelReliabilityType reliability = 0;
117   guint32 reliability_param = 0;
118   guint16 priority;
119   GstBuffer *buf;
120
121 /*
122  *    0                   1                   2                   3
123  *    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
124  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
125  *   |  Message Type |  Channel Type |            Priority           |
126  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
127  *   |                    Reliability Parameter                      |
128  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
129  *   |         Label Length          |       Protocol Length         |
130  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
131  *   \                                                               /
132  *   |                             Label                             |
133  *   /                                                               \
134  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
135  *   \                                                               /
136  *   |                            Protocol                           |
137  *   /                                                               \
138  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
139  */
140
141   gst_byte_writer_init_with_size (&w, size, FALSE);
142
143   if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_OPEN))
144     g_return_val_if_reached (NULL);
145
146   if (!channel->parent.ordered)
147     reliability |= 0x80;
148   if (channel->parent.max_retransmits != -1) {
149     reliability |= 0x01;
150     reliability_param = channel->parent.max_retransmits;
151   }
152   if (channel->parent.max_packet_lifetime != -1) {
153     reliability |= 0x02;
154     reliability_param = channel->parent.max_packet_lifetime;
155   }
156
157   priority = priority_type_to_uint (channel->parent.priority);
158
159   if (!gst_byte_writer_put_uint8 (&w, (guint8) reliability))
160     g_return_val_if_reached (NULL);
161   if (!gst_byte_writer_put_uint16_be (&w, (guint16) priority))
162     g_return_val_if_reached (NULL);
163   if (!gst_byte_writer_put_uint32_be (&w, (guint32) reliability_param))
164     g_return_val_if_reached (NULL);
165   if (!gst_byte_writer_put_uint16_be (&w, (guint16) label_len))
166     g_return_val_if_reached (NULL);
167   if (!gst_byte_writer_put_uint16_be (&w, (guint16) proto_len))
168     g_return_val_if_reached (NULL);
169   if (!gst_byte_writer_put_data (&w, (guint8 *) channel->parent.label,
170           label_len))
171     g_return_val_if_reached (NULL);
172   if (!gst_byte_writer_put_data (&w, (guint8 *) channel->parent.protocol,
173           proto_len))
174     g_return_val_if_reached (NULL);
175
176   buf = gst_byte_writer_reset_and_get_buffer (&w);
177
178   /* send reliable and ordered */
179   gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
180       GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
181
182   return buf;
183 }
184
185 static GstBuffer *
186 construct_ack_packet (WebRTCDataChannel * channel)
187 {
188   GstByteWriter w;
189   GstBuffer *buf;
190
191 /*
192  *   0                   1                   2                   3
193  *   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
194  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
195  *   |  Message Type |
196  *   +-+-+-+-+-+-+-+-+
197  */
198
199   gst_byte_writer_init_with_size (&w, 1, FALSE);
200
201   if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_ACK))
202     g_return_val_if_reached (NULL);
203
204   buf = gst_byte_writer_reset_and_get_buffer (&w);
205
206   /* send reliable and ordered */
207   gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
208       GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
209
210   return buf;
211 }
212
213 typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
214     gpointer user_data);
215
216 struct task
217 {
218   GstWebRTCDataChannel *channel;
219   ChannelTask func;
220   gpointer user_data;
221   GDestroyNotify notify;
222 };
223
224 static void
225 _execute_task (GstWebRTCBin * webrtc, struct task *task)
226 {
227   if (task->func)
228     task->func (task->channel, task->user_data);
229 }
230
231 static void
232 _free_task (struct task *task)
233 {
234   gst_object_unref (task->channel);
235
236   if (task->notify)
237     task->notify (task->user_data);
238   g_free (task);
239 }
240
241 static void
242 _channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func,
243     gpointer user_data, GDestroyNotify notify)
244 {
245   struct task *task = g_new0 (struct task, 1);
246
247   task->channel = gst_object_ref (channel);
248   task->func = func;
249   task->user_data = user_data;
250   task->notify = notify;
251
252   gst_webrtc_bin_enqueue_task (channel->webrtcbin,
253       (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task,
254       NULL);
255 }
256
257 static void
258 _channel_store_error (WebRTCDataChannel * channel, GError * error)
259 {
260   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
261   if (error) {
262     GST_WARNING_OBJECT (channel, "Error: %s",
263         error ? error->message : "Unknown");
264     if (!channel->stored_error)
265       channel->stored_error = error;
266     else
267       g_clear_error (&error);
268   }
269   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
270 }
271
272 static void
273 _emit_on_open (WebRTCDataChannel * channel, gpointer user_data)
274 {
275   gst_webrtc_data_channel_on_open (GST_WEBRTC_DATA_CHANNEL (channel));
276 }
277
278 static void
279 _transport_closed (WebRTCDataChannel * channel)
280 {
281   GError *error;
282
283   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
284   error = channel->stored_error;
285   channel->stored_error = NULL;
286   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
287
288   if (error) {
289     gst_webrtc_data_channel_on_error (GST_WEBRTC_DATA_CHANNEL (channel), error);
290     g_clear_error (&error);
291   }
292   gst_webrtc_data_channel_on_close (GST_WEBRTC_DATA_CHANNEL (channel));
293 }
294
295 static void
296 _close_sctp_stream (WebRTCDataChannel * channel, gpointer user_data)
297 {
298   GstPad *pad, *peer;
299
300   pad = gst_element_get_static_pad (channel->appsrc, "src");
301   peer = gst_pad_get_peer (pad);
302   gst_object_unref (pad);
303
304   if (peer) {
305     GstElement *sctpenc = gst_pad_get_parent_element (peer);
306
307     if (sctpenc) {
308       gst_element_release_request_pad (sctpenc, peer);
309       gst_object_unref (sctpenc);
310     }
311     gst_object_unref (peer);
312   }
313
314   _transport_closed (channel);
315 }
316
317 static void
318 _close_procedure (WebRTCDataChannel * channel, gpointer user_data)
319 {
320   /* https://www.w3.org/TR/webrtc/#data-transport-closing-procedure */
321   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
322   if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED
323       || channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING) {
324     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
325     return;
326   }
327   channel->parent.ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING;
328   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
329   g_object_notify (G_OBJECT (channel), "ready-state");
330
331   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
332   if (channel->parent.buffered_amount <= 0) {
333     _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream,
334         NULL, NULL);
335   }
336
337   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
338 }
339
340 static void
341 _on_sctp_reset_stream (GstWebRTCSCTPTransport * sctp, guint stream_id,
342     WebRTCDataChannel * channel)
343 {
344   if (channel->parent.id == stream_id)
345     _channel_enqueue_task (channel, (ChannelTask) _transport_closed,
346         GUINT_TO_POINTER (stream_id), NULL);
347 }
348
349 static void
350 webrtc_data_channel_close (GstWebRTCDataChannel * channel)
351 {
352   _close_procedure (WEBRTC_DATA_CHANNEL (channel), NULL);
353 }
354
355 static GstFlowReturn
356 _parse_control_packet (WebRTCDataChannel * channel, guint8 * data,
357     gsize size, GError ** error)
358 {
359   GstByteReader r;
360   guint8 message_type;
361   gchar *label = NULL;
362   gchar *proto = NULL;
363
364   if (!data)
365     g_return_val_if_reached (GST_FLOW_ERROR);
366   if (size < 1)
367     g_return_val_if_reached (GST_FLOW_ERROR);
368
369   gst_byte_reader_init (&r, data, size);
370
371   if (!gst_byte_reader_get_uint8 (&r, &message_type))
372     g_return_val_if_reached (GST_FLOW_ERROR);
373
374   if (message_type == CHANNEL_MESSAGE_ACK) {
375     /* all good */
376     GST_INFO_OBJECT (channel, "Received channel ack");
377     return GST_FLOW_OK;
378   } else if (message_type == CHANNEL_MESSAGE_OPEN) {
379     guint8 reliability;
380     guint32 reliability_param;
381     guint16 priority, label_len, proto_len;
382     const guint8 *src;
383     GstBuffer *buffer;
384     GstFlowReturn ret;
385
386     GST_INFO_OBJECT (channel, "Received channel open");
387
388     if (channel->parent.negotiated) {
389       g_set_error (error, GST_WEBRTC_BIN_ERROR,
390           GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
391           "Data channel was signalled as negotiated already");
392       g_return_val_if_reached (GST_FLOW_ERROR);
393     }
394
395     if (channel->opened)
396       return GST_FLOW_OK;
397
398     if (!gst_byte_reader_get_uint8 (&r, &reliability))
399       goto parse_error;
400     if (!gst_byte_reader_get_uint16_be (&r, &priority))
401       goto parse_error;
402     if (!gst_byte_reader_get_uint32_be (&r, &reliability_param))
403       goto parse_error;
404     if (!gst_byte_reader_get_uint16_be (&r, &label_len))
405       goto parse_error;
406     if (!gst_byte_reader_get_uint16_be (&r, &proto_len))
407       goto parse_error;
408
409     label = g_new0 (gchar, (gsize) label_len + 1);
410     proto = g_new0 (gchar, (gsize) proto_len + 1);
411
412     if (!gst_byte_reader_get_data (&r, label_len, &src))
413       goto parse_error;
414     memcpy (label, src, label_len);
415     label[label_len] = '\0';
416     if (!gst_byte_reader_get_data (&r, proto_len, &src))
417       goto parse_error;
418     memcpy (proto, src, proto_len);
419     proto[proto_len] = '\0';
420
421     g_free (channel->parent.label);
422     channel->parent.label = label;
423     g_free (channel->parent.protocol);
424     channel->parent.protocol = proto;
425     channel->parent.priority = priority_uint_to_type (priority);
426     channel->parent.ordered = !(reliability & 0x80);
427     if (reliability & 0x01) {
428       channel->parent.max_retransmits = reliability_param;
429       channel->parent.max_packet_lifetime = -1;
430     } else if (reliability & 0x02) {
431       channel->parent.max_retransmits = -1;
432       channel->parent.max_packet_lifetime = reliability_param;
433     } else {
434       channel->parent.max_retransmits = -1;
435       channel->parent.max_packet_lifetime = -1;
436     }
437     channel->opened = TRUE;
438
439     GST_INFO_OBJECT (channel, "Received channel open for SCTP stream %i "
440         "label %s protocol %s ordered %s", channel->parent.id,
441         channel->parent.label, channel->parent.protocol,
442         channel->parent.ordered ? "true" : "false");
443
444     _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
445
446     GST_INFO_OBJECT (channel, "Sending channel ack");
447     buffer = construct_ack_packet (channel);
448
449     GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
450     channel->parent.buffered_amount += gst_buffer_get_size (buffer);
451     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
452
453     ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
454     if (ret != GST_FLOW_OK) {
455       g_set_error (error, GST_WEBRTC_BIN_ERROR,
456           GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
457           "Could not send ack packet");
458       return ret;
459     }
460
461     return ret;
462   } else {
463     g_set_error (error, GST_WEBRTC_BIN_ERROR,
464         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
465         "Unknown message type in control protocol");
466     return GST_FLOW_ERROR;
467   }
468
469 parse_error:
470   {
471     g_free (label);
472     g_free (proto);
473     g_set_error (error, GST_WEBRTC_BIN_ERROR,
474         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to parse packet");
475     g_return_val_if_reached (GST_FLOW_ERROR);
476   }
477 }
478
479 static void
480 on_sink_eos (GstAppSink * sink, gpointer user_data)
481 {
482 }
483
484 struct map_info
485 {
486   GstBuffer *buffer;
487   GstMapInfo map_info;
488 };
489
490 static void
491 buffer_unmap_and_unref (struct map_info *info)
492 {
493   gst_buffer_unmap (info->buffer, &info->map_info);
494   gst_buffer_unref (info->buffer);
495   g_free (info);
496 }
497
498 static void
499 _emit_have_data (WebRTCDataChannel * channel, GBytes * data)
500 {
501   gst_webrtc_data_channel_on_message_data (GST_WEBRTC_DATA_CHANNEL (channel),
502       data);
503 }
504
505 static void
506 _emit_have_string (GstWebRTCDataChannel * channel, gchar * str)
507 {
508   gst_webrtc_data_channel_on_message_string (GST_WEBRTC_DATA_CHANNEL (channel),
509       str);
510 }
511
512 static GstFlowReturn
513 _data_channel_have_sample (WebRTCDataChannel * channel, GstSample * sample,
514     GError ** error)
515 {
516   GstSctpReceiveMeta *receive;
517   GstBuffer *buffer;
518   GstFlowReturn ret = GST_FLOW_OK;
519
520   GST_LOG_OBJECT (channel, "Received sample %" GST_PTR_FORMAT, sample);
521
522   g_return_val_if_fail (channel->sctp_transport != NULL, GST_FLOW_ERROR);
523
524   buffer = gst_sample_get_buffer (sample);
525   if (!buffer) {
526     g_set_error (error, GST_WEBRTC_BIN_ERROR,
527         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "No buffer to handle");
528     return GST_FLOW_ERROR;
529   }
530   receive = gst_sctp_buffer_get_receive_meta (buffer);
531   if (!receive) {
532     g_set_error (error, GST_WEBRTC_BIN_ERROR,
533         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
534         "No SCTP Receive meta on the buffer");
535     return GST_FLOW_ERROR;
536   }
537
538   switch (receive->ppid) {
539     case DATA_CHANNEL_PPID_WEBRTC_CONTROL:{
540       GstMapInfo info = GST_MAP_INFO_INIT;
541       if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
542         g_set_error (error, GST_WEBRTC_BIN_ERROR,
543             GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
544             "Failed to map received buffer");
545         ret = GST_FLOW_ERROR;
546       } else {
547         ret = _parse_control_packet (channel, info.data, info.size, error);
548         gst_buffer_unmap (buffer, &info);
549       }
550       break;
551     }
552     case DATA_CHANNEL_PPID_WEBRTC_STRING:
553     case DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL:{
554       GstMapInfo info = GST_MAP_INFO_INIT;
555       if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
556         g_set_error (error, GST_WEBRTC_BIN_ERROR,
557             GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
558             "Failed to map received buffer");
559         ret = GST_FLOW_ERROR;
560       } else {
561         gchar *str = g_strndup ((gchar *) info.data, info.size);
562         _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, str,
563             g_free);
564         gst_buffer_unmap (buffer, &info);
565       }
566       break;
567     }
568     case DATA_CHANNEL_PPID_WEBRTC_BINARY:
569     case DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL:{
570       struct map_info *info = g_new0 (struct map_info, 1);
571       if (!gst_buffer_map (buffer, &info->map_info, GST_MAP_READ)) {
572         g_set_error (error, GST_WEBRTC_BIN_ERROR,
573             GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
574             "Failed to map received buffer");
575         ret = GST_FLOW_ERROR;
576       } else {
577         GBytes *data = g_bytes_new_with_free_func (info->map_info.data,
578             info->map_info.size, (GDestroyNotify) buffer_unmap_and_unref, info);
579         info->buffer = gst_buffer_ref (buffer);
580         _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, data,
581             (GDestroyNotify) g_bytes_unref);
582       }
583       break;
584     }
585     case DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY:
586       _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, NULL,
587           NULL);
588       break;
589     case DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY:
590       _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, NULL,
591           NULL);
592       break;
593     default:
594       g_set_error (error, GST_WEBRTC_BIN_ERROR,
595           GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
596           "Unknown SCTP PPID %u received", receive->ppid);
597       ret = GST_FLOW_ERROR;
598       break;
599   }
600
601   return ret;
602 }
603
604 static GstFlowReturn
605 on_sink_preroll (GstAppSink * sink, gpointer user_data)
606 {
607   WebRTCDataChannel *channel = user_data;
608   GstSample *sample = gst_app_sink_pull_preroll (sink);
609   GstFlowReturn ret;
610
611   if (sample) {
612     /* This sample also seems to be provided by the sample callback
613        ret = _data_channel_have_sample (channel, sample); */
614     ret = GST_FLOW_OK;
615     gst_sample_unref (sample);
616   } else if (gst_app_sink_is_eos (sink)) {
617     ret = GST_FLOW_EOS;
618   } else {
619     ret = GST_FLOW_ERROR;
620   }
621
622   if (ret != GST_FLOW_OK) {
623     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
624   }
625
626   return ret;
627 }
628
629 static GstFlowReturn
630 on_sink_sample (GstAppSink * sink, gpointer user_data)
631 {
632   WebRTCDataChannel *channel = user_data;
633   GstSample *sample = gst_app_sink_pull_sample (sink);
634   GstFlowReturn ret;
635   GError *error = NULL;
636
637   if (sample) {
638     ret = _data_channel_have_sample (channel, sample, &error);
639     gst_sample_unref (sample);
640   } else if (gst_app_sink_is_eos (sink)) {
641     ret = GST_FLOW_EOS;
642   } else {
643     ret = GST_FLOW_ERROR;
644   }
645
646   if (error)
647     _channel_store_error (channel, error);
648
649   if (ret != GST_FLOW_OK) {
650     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
651   }
652
653   return ret;
654 }
655
656 static GstAppSinkCallbacks sink_callbacks = {
657   on_sink_eos,
658   on_sink_preroll,
659   on_sink_sample,
660 };
661
662 void
663 webrtc_data_channel_start_negotiation (WebRTCDataChannel * channel)
664 {
665   GstBuffer *buffer;
666
667   g_return_if_fail (!channel->parent.negotiated);
668   g_return_if_fail (channel->parent.id != -1);
669   g_return_if_fail (channel->sctp_transport != NULL);
670
671   buffer = construct_open_packet (channel);
672
673   GST_INFO_OBJECT (channel, "Sending channel open for SCTP stream %i "
674       "label %s protocol %s ordered %s", channel->parent.id,
675       channel->parent.label, channel->parent.protocol,
676       channel->parent.ordered ? "true" : "false");
677
678   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
679   channel->parent.buffered_amount += gst_buffer_get_size (buffer);
680   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
681
682   if (gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc),
683           buffer) == GST_FLOW_OK) {
684     channel->opened = TRUE;
685     _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
686   } else {
687     GError *error = NULL;
688     g_set_error (&error, GST_WEBRTC_BIN_ERROR,
689         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
690         "Failed to send DCEP open packet");
691     _channel_store_error (channel, error);
692     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
693   }
694 }
695
696 static void
697 _get_sctp_reliability (WebRTCDataChannel * channel,
698     GstSctpSendMetaPartiallyReliability * reliability, guint * rel_param)
699 {
700   if (channel->parent.max_retransmits != -1) {
701     *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX;
702     *rel_param = channel->parent.max_retransmits;
703   } else if (channel->parent.max_packet_lifetime != -1) {
704     *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL;
705     *rel_param = channel->parent.max_packet_lifetime;
706   } else {
707     *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE;
708     *rel_param = 0;
709   }
710 }
711
712 static gboolean
713 _is_within_max_message_size (WebRTCDataChannel * channel, gsize size)
714 {
715   return size <= channel->sctp_transport->max_message_size;
716 }
717
718 static void
719 webrtc_data_channel_send_data (GstWebRTCDataChannel * base_channel,
720     GBytes * bytes)
721 {
722   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (base_channel);
723   GstSctpSendMetaPartiallyReliability reliability;
724   guint rel_param;
725   guint32 ppid;
726   GstBuffer *buffer;
727   GstFlowReturn ret;
728
729   if (!bytes) {
730     buffer = gst_buffer_new ();
731     ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY;
732   } else {
733     gsize size;
734     guint8 *data;
735
736     data = (guint8 *) g_bytes_get_data (bytes, &size);
737     g_return_if_fail (data != NULL);
738     if (!_is_within_max_message_size (channel, size)) {
739       GError *error = NULL;
740       g_set_error (&error, GST_WEBRTC_BIN_ERROR,
741           GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
742           "Requested to send data that is too large");
743       _channel_store_error (channel, error);
744       _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
745           NULL);
746       return;
747     }
748
749     buffer = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, data, size,
750         0, size, g_bytes_ref (bytes), (GDestroyNotify) g_bytes_unref);
751     ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY;
752   }
753
754   _get_sctp_reliability (channel, &reliability, &rel_param);
755   gst_sctp_buffer_add_send_meta (buffer, ppid, channel->parent.ordered,
756       reliability, rel_param);
757
758   GST_LOG_OBJECT (channel, "Sending data using buffer %" GST_PTR_FORMAT,
759       buffer);
760
761   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
762   channel->parent.buffered_amount += gst_buffer_get_size (buffer);
763   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
764
765   ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
766
767   if (ret != GST_FLOW_OK) {
768     GError *error = NULL;
769     g_set_error (&error, GST_WEBRTC_BIN_ERROR,
770         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
771     _channel_store_error (channel, error);
772     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
773   }
774 }
775
776 static void
777 webrtc_data_channel_send_string (GstWebRTCDataChannel * base_channel,
778     const gchar * str)
779 {
780   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (base_channel);
781   GstSctpSendMetaPartiallyReliability reliability;
782   guint rel_param;
783   guint32 ppid;
784   GstBuffer *buffer;
785   GstFlowReturn ret;
786
787   if (!channel->parent.negotiated)
788     g_return_if_fail (channel->opened);
789   g_return_if_fail (channel->sctp_transport != NULL);
790
791   if (!str) {
792     buffer = gst_buffer_new ();
793     ppid = DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY;
794   } else {
795     gsize size = strlen (str);
796     gchar *str_copy = g_strdup (str);
797
798     if (!_is_within_max_message_size (channel, size)) {
799       GError *error = NULL;
800       g_set_error (&error, GST_WEBRTC_BIN_ERROR,
801           GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
802           "Requested to send a string that is too large");
803       _channel_store_error (channel, error);
804       _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
805           NULL);
806       return;
807     }
808
809     buffer =
810         gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, str_copy,
811         size, 0, size, str_copy, g_free);
812     ppid = DATA_CHANNEL_PPID_WEBRTC_STRING;
813   }
814
815   _get_sctp_reliability (channel, &reliability, &rel_param);
816   gst_sctp_buffer_add_send_meta (buffer, ppid, channel->parent.ordered,
817       reliability, rel_param);
818
819   GST_TRACE_OBJECT (channel, "Sending string using buffer %" GST_PTR_FORMAT,
820       buffer);
821
822   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
823   channel->parent.buffered_amount += gst_buffer_get_size (buffer);
824   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
825
826   ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
827
828   if (ret != GST_FLOW_OK) {
829     GError *error = NULL;
830     g_set_error (&error, GST_WEBRTC_BIN_ERROR,
831         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to send string");
832     _channel_store_error (channel, error);
833     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
834   }
835 }
836
837 static void
838 _on_sctp_notify_state_unlocked (GObject * sctp_transport,
839     WebRTCDataChannel * channel)
840 {
841   GstWebRTCSCTPTransportState state;
842
843   g_object_get (sctp_transport, "state", &state, NULL);
844   if (state == GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED) {
845     if (channel->parent.negotiated)
846       _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
847   }
848 }
849
850 static void
851 _on_sctp_notify_state (GObject * sctp_transport, GParamSpec * pspec,
852     WebRTCDataChannel * channel)
853 {
854   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
855   _on_sctp_notify_state_unlocked (sctp_transport, channel);
856   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
857 }
858
859 static void
860 _emit_low_threshold (WebRTCDataChannel * channel, gpointer user_data)
861 {
862   gst_webrtc_data_channel_on_buffered_amount_low (GST_WEBRTC_DATA_CHANNEL
863       (channel));
864 }
865
866 static GstPadProbeReturn
867 on_appsrc_data (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
868 {
869   WebRTCDataChannel *channel = user_data;
870   guint64 prev_amount;
871   guint64 size = 0;
872
873   if (GST_PAD_PROBE_INFO_TYPE (info) & (GST_PAD_PROBE_TYPE_BUFFER)) {
874     GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
875     size = gst_buffer_get_size (buffer);
876   } else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
877     GstBufferList *list = GST_PAD_PROBE_INFO_BUFFER_LIST (info);
878     size = gst_buffer_list_calculate_size (list);
879   }
880
881   if (size > 0) {
882     GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
883     prev_amount = channel->parent.buffered_amount;
884     channel->parent.buffered_amount -= size;
885     GST_TRACE_OBJECT (channel, "checking low-threshold: prev %"
886         G_GUINT64_FORMAT " low-threshold %" G_GUINT64_FORMAT " buffered %"
887         G_GUINT64_FORMAT, prev_amount,
888         channel->parent.buffered_amount_low_threshold,
889         channel->parent.buffered_amount);
890     if (prev_amount >= channel->parent.buffered_amount_low_threshold
891         && channel->parent.buffered_amount <
892         channel->parent.buffered_amount_low_threshold) {
893       _channel_enqueue_task (channel, (ChannelTask) _emit_low_threshold, NULL,
894           NULL);
895     }
896
897     if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING
898         && channel->parent.buffered_amount <= 0) {
899       _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream, NULL,
900           NULL);
901     }
902     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
903   }
904
905   return GST_PAD_PROBE_OK;
906 }
907
908 static void
909 gst_webrtc_data_channel_constructed (GObject * object)
910 {
911   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (object);
912   GstPad *pad;
913   GstCaps *caps;
914
915   caps = gst_caps_new_any ();
916
917   channel->appsrc = gst_element_factory_make ("appsrc", NULL);
918   gst_object_ref_sink (channel->appsrc);
919   pad = gst_element_get_static_pad (channel->appsrc, "src");
920
921   channel->src_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_BOTH,
922       (GstPadProbeCallback) on_appsrc_data, channel, NULL);
923
924   channel->appsink = gst_element_factory_make ("appsink", NULL);
925   gst_object_ref_sink (channel->appsink);
926   g_object_set (channel->appsink, "sync", FALSE, "async", FALSE, "caps", caps,
927       NULL);
928   gst_app_sink_set_callbacks (GST_APP_SINK (channel->appsink), &sink_callbacks,
929       channel, NULL);
930
931   gst_object_unref (pad);
932   gst_caps_unref (caps);
933 }
934
935 static void
936 gst_webrtc_data_channel_finalize (GObject * object)
937 {
938   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (object);
939
940   if (channel->src_probe) {
941     GstPad *pad = gst_element_get_static_pad (channel->appsrc, "src");
942     gst_pad_remove_probe (pad, channel->src_probe);
943     gst_object_unref (pad);
944     channel->src_probe = 0;
945   }
946
947   if (channel->sctp_transport)
948     g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
949   g_clear_object (&channel->sctp_transport);
950
951   g_clear_object (&channel->appsrc);
952   g_clear_object (&channel->appsink);
953
954   G_OBJECT_CLASS (parent_class)->finalize (object);
955 }
956
957 static void
958 webrtc_data_channel_class_init (WebRTCDataChannelClass * klass)
959 {
960   GObjectClass *gobject_class = (GObjectClass *) klass;
961   GstWebRTCDataChannelClass *channel_class =
962       (GstWebRTCDataChannelClass *) klass;
963
964   gobject_class->constructed = gst_webrtc_data_channel_constructed;
965   gobject_class->finalize = gst_webrtc_data_channel_finalize;
966
967   channel_class->send_data = webrtc_data_channel_send_data;
968   channel_class->send_string = webrtc_data_channel_send_string;
969   channel_class->close = webrtc_data_channel_close;
970 }
971
972 static void
973 webrtc_data_channel_init (WebRTCDataChannel * channel)
974 {
975 }
976
977 static void
978 _data_channel_set_sctp_transport (WebRTCDataChannel * channel,
979     GstWebRTCSCTPTransport * sctp)
980 {
981   g_return_if_fail (GST_IS_WEBRTC_DATA_CHANNEL (channel));
982   g_return_if_fail (GST_IS_WEBRTC_SCTP_TRANSPORT (sctp));
983
984   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
985   if (channel->sctp_transport)
986     g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
987
988   gst_object_replace ((GstObject **) & channel->sctp_transport,
989       GST_OBJECT (sctp));
990
991   if (sctp) {
992     g_signal_connect (sctp, "stream-reset", G_CALLBACK (_on_sctp_reset_stream),
993         channel);
994     g_signal_connect (sctp, "notify::state", G_CALLBACK (_on_sctp_notify_state),
995         channel);
996     _on_sctp_notify_state_unlocked (G_OBJECT (sctp), channel);
997   }
998   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
999 }
1000
1001 void
1002 webrtc_data_channel_link_to_sctp (WebRTCDataChannel * channel,
1003     GstWebRTCSCTPTransport * sctp_transport)
1004 {
1005   if (sctp_transport && !channel->sctp_transport) {
1006     gint id;
1007
1008     g_object_get (channel, "id", &id, NULL);
1009
1010     if (sctp_transport->association_established && id != -1) {
1011       gchar *pad_name;
1012
1013       _data_channel_set_sctp_transport (channel, sctp_transport);
1014       pad_name = g_strdup_printf ("sink_%u", id);
1015       if (!gst_element_link_pads (channel->appsrc, "src",
1016               channel->sctp_transport->sctpenc, pad_name))
1017         g_warn_if_reached ();
1018       g_free (pad_name);
1019     }
1020   }
1021 }