webrtcbin: Move GstPromise reply to operation framework
[platform/upstream/gstreamer.git] / ext / webrtc / webrtcdatachannel.c
1 /* GStreamer
2  * Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:gstwebrtc-datachannel
22  * @short_description: RTCDataChannel object
23  * @title: GstWebRTCDataChannel
24  * @see_also: #GstWebRTCRTPTransceiver
25  *
26  * <http://w3c.github.io/webrtc-pc/#dom-rtcsctptransport>
27  */
28
29 #ifdef HAVE_CONFIG_H
30 # include "config.h"
31 #endif
32
33 #include "webrtcdatachannel.h"
34 #include <gst/app/gstappsink.h>
35 #include <gst/app/gstappsrc.h>
36 #include <gst/base/gstbytereader.h>
37 #include <gst/base/gstbytewriter.h>
38 #include <gst/sctp/sctpreceivemeta.h>
39 #include <gst/sctp/sctpsendmeta.h>
40
41 #include "gstwebrtcbin.h"
42 #include "utils.h"
43
44 #define GST_CAT_DEFAULT webrtc_data_channel_debug
45 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
46
47 #define webrtc_data_channel_parent_class parent_class
48 G_DEFINE_TYPE_WITH_CODE (WebRTCDataChannel, webrtc_data_channel,
49     GST_TYPE_WEBRTC_DATA_CHANNEL,
50     GST_DEBUG_CATEGORY_INIT (webrtc_data_channel_debug, "webrtcdatachannel", 0,
51         "webrtcdatachannel"););
52
53 typedef enum
54 {
55   DATA_CHANNEL_PPID_WEBRTC_CONTROL = 50,
56   DATA_CHANNEL_PPID_WEBRTC_STRING = 51,
57   DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL = 52, /* deprecated */
58   DATA_CHANNEL_PPID_WEBRTC_BINARY = 53,
59   DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL = 54, /* deprecated */
60   DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY = 56,
61   DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY = 57,
62 } DataChannelPPID;
63
64 typedef enum
65 {
66   CHANNEL_TYPE_RELIABLE = 0x00,
67   CHANNEL_TYPE_RELIABLE_UNORDERED = 0x80,
68   CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT = 0x01,
69   CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT_UNORDERED = 0x81,
70   CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED = 0x02,
71   CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED_UNORDERED = 0x82,
72 } DataChannelReliabilityType;
73
74 typedef enum
75 {
76   CHANNEL_MESSAGE_ACK = 0x02,
77   CHANNEL_MESSAGE_OPEN = 0x03,
78 } DataChannelMessage;
79
80 static guint16
81 priority_type_to_uint (GstWebRTCPriorityType pri)
82 {
83   switch (pri) {
84     case GST_WEBRTC_PRIORITY_TYPE_VERY_LOW:
85       return 64;
86     case GST_WEBRTC_PRIORITY_TYPE_LOW:
87       return 192;
88     case GST_WEBRTC_PRIORITY_TYPE_MEDIUM:
89       return 384;
90     case GST_WEBRTC_PRIORITY_TYPE_HIGH:
91       return 768;
92   }
93   g_assert_not_reached ();
94   return 0;
95 }
96
97 static GstWebRTCPriorityType
98 priority_uint_to_type (guint16 val)
99 {
100   if (val <= 128)
101     return GST_WEBRTC_PRIORITY_TYPE_VERY_LOW;
102   if (val <= 256)
103     return GST_WEBRTC_PRIORITY_TYPE_LOW;
104   if (val <= 512)
105     return GST_WEBRTC_PRIORITY_TYPE_MEDIUM;
106   return GST_WEBRTC_PRIORITY_TYPE_HIGH;
107 }
108
109 static GstBuffer *
110 construct_open_packet (WebRTCDataChannel * channel)
111 {
112   GstByteWriter w;
113   gsize label_len = strlen (channel->parent.label);
114   gsize proto_len = strlen (channel->parent.protocol);
115   gsize size = 12 + label_len + proto_len;
116   DataChannelReliabilityType reliability = 0;
117   guint32 reliability_param = 0;
118   guint16 priority;
119   GstBuffer *buf;
120
121 /*
122  *    0                   1                   2                   3
123  *    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
124  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
125  *   |  Message Type |  Channel Type |            Priority           |
126  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
127  *   |                    Reliability Parameter                      |
128  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
129  *   |         Label Length          |       Protocol Length         |
130  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
131  *   \                                                               /
132  *   |                             Label                             |
133  *   /                                                               \
134  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
135  *   \                                                               /
136  *   |                            Protocol                           |
137  *   /                                                               \
138  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
139  */
140
141   gst_byte_writer_init_with_size (&w, size, FALSE);
142
143   if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_OPEN))
144     g_return_val_if_reached (NULL);
145
146   if (!channel->parent.ordered)
147     reliability |= 0x80;
148   if (channel->parent.max_retransmits != -1) {
149     reliability |= 0x01;
150     reliability_param = channel->parent.max_retransmits;
151   }
152   if (channel->parent.max_packet_lifetime != -1) {
153     reliability |= 0x02;
154     reliability_param = channel->parent.max_packet_lifetime;
155   }
156
157   priority = priority_type_to_uint (channel->parent.priority);
158
159   if (!gst_byte_writer_put_uint8 (&w, (guint8) reliability))
160     g_return_val_if_reached (NULL);
161   if (!gst_byte_writer_put_uint16_be (&w, (guint16) priority))
162     g_return_val_if_reached (NULL);
163   if (!gst_byte_writer_put_uint32_be (&w, (guint32) reliability_param))
164     g_return_val_if_reached (NULL);
165   if (!gst_byte_writer_put_uint16_be (&w, (guint16) label_len))
166     g_return_val_if_reached (NULL);
167   if (!gst_byte_writer_put_uint16_be (&w, (guint16) proto_len))
168     g_return_val_if_reached (NULL);
169   if (!gst_byte_writer_put_data (&w, (guint8 *) channel->parent.label,
170           label_len))
171     g_return_val_if_reached (NULL);
172   if (!gst_byte_writer_put_data (&w, (guint8 *) channel->parent.protocol,
173           proto_len))
174     g_return_val_if_reached (NULL);
175
176   buf = gst_byte_writer_reset_and_get_buffer (&w);
177
178   /* send reliable and ordered */
179   gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
180       GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
181
182   return buf;
183 }
184
185 static GstBuffer *
186 construct_ack_packet (WebRTCDataChannel * channel)
187 {
188   GstByteWriter w;
189   GstBuffer *buf;
190
191 /*
192  *   0                   1                   2                   3
193  *   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
194  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
195  *   |  Message Type |
196  *   +-+-+-+-+-+-+-+-+
197  */
198
199   gst_byte_writer_init_with_size (&w, 1, FALSE);
200
201   if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_ACK))
202     g_return_val_if_reached (NULL);
203
204   buf = gst_byte_writer_reset_and_get_buffer (&w);
205
206   /* send reliable and ordered */
207   gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
208       GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
209
210   return buf;
211 }
212
213 typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
214     gpointer user_data);
215
216 struct task
217 {
218   GstWebRTCDataChannel *channel;
219   ChannelTask func;
220   gpointer user_data;
221   GDestroyNotify notify;
222 };
223
224 static GstStructure *
225 _execute_task (GstWebRTCBin * webrtc, struct task *task)
226 {
227   if (task->func)
228     task->func (task->channel, task->user_data);
229
230   return NULL;
231 }
232
233 static void
234 _free_task (struct task *task)
235 {
236   gst_object_unref (task->channel);
237
238   if (task->notify)
239     task->notify (task->user_data);
240   g_free (task);
241 }
242
243 static void
244 _channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func,
245     gpointer user_data, GDestroyNotify notify)
246 {
247   struct task *task = g_new0 (struct task, 1);
248
249   task->channel = gst_object_ref (channel);
250   task->func = func;
251   task->user_data = user_data;
252   task->notify = notify;
253
254   gst_webrtc_bin_enqueue_task (channel->webrtcbin,
255       (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task,
256       NULL);
257 }
258
259 static void
260 _channel_store_error (WebRTCDataChannel * channel, GError * error)
261 {
262   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
263   if (error) {
264     GST_WARNING_OBJECT (channel, "Error: %s",
265         error ? error->message : "Unknown");
266     if (!channel->stored_error)
267       channel->stored_error = error;
268     else
269       g_clear_error (&error);
270   }
271   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
272 }
273
274 static void
275 _emit_on_open (WebRTCDataChannel * channel, gpointer user_data)
276 {
277   gst_webrtc_data_channel_on_open (GST_WEBRTC_DATA_CHANNEL (channel));
278 }
279
280 static void
281 _transport_closed (WebRTCDataChannel * channel)
282 {
283   GError *error;
284
285   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
286   error = channel->stored_error;
287   channel->stored_error = NULL;
288   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
289
290   if (error) {
291     gst_webrtc_data_channel_on_error (GST_WEBRTC_DATA_CHANNEL (channel), error);
292     g_clear_error (&error);
293   }
294   gst_webrtc_data_channel_on_close (GST_WEBRTC_DATA_CHANNEL (channel));
295 }
296
297 static void
298 _close_sctp_stream (WebRTCDataChannel * channel, gpointer user_data)
299 {
300   GstPad *pad, *peer;
301
302   pad = gst_element_get_static_pad (channel->appsrc, "src");
303   peer = gst_pad_get_peer (pad);
304   gst_object_unref (pad);
305
306   if (peer) {
307     GstElement *sctpenc = gst_pad_get_parent_element (peer);
308
309     if (sctpenc) {
310       gst_element_release_request_pad (sctpenc, peer);
311       gst_object_unref (sctpenc);
312     }
313     gst_object_unref (peer);
314   }
315
316   _transport_closed (channel);
317 }
318
319 static void
320 _close_procedure (WebRTCDataChannel * channel, gpointer user_data)
321 {
322   /* https://www.w3.org/TR/webrtc/#data-transport-closing-procedure */
323   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
324   if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED
325       || channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING) {
326     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
327     return;
328   }
329   channel->parent.ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING;
330   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
331   g_object_notify (G_OBJECT (channel), "ready-state");
332
333   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
334   if (channel->parent.buffered_amount <= 0) {
335     _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream,
336         NULL, NULL);
337   }
338
339   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
340 }
341
342 static void
343 _on_sctp_reset_stream (GstWebRTCSCTPTransport * sctp, guint stream_id,
344     WebRTCDataChannel * channel)
345 {
346   if (channel->parent.id == stream_id)
347     _channel_enqueue_task (channel, (ChannelTask) _transport_closed,
348         GUINT_TO_POINTER (stream_id), NULL);
349 }
350
351 static void
352 webrtc_data_channel_close (GstWebRTCDataChannel * channel)
353 {
354   _close_procedure (WEBRTC_DATA_CHANNEL (channel), NULL);
355 }
356
357 static GstFlowReturn
358 _parse_control_packet (WebRTCDataChannel * channel, guint8 * data,
359     gsize size, GError ** error)
360 {
361   GstByteReader r;
362   guint8 message_type;
363   gchar *label = NULL;
364   gchar *proto = NULL;
365
366   if (!data)
367     g_return_val_if_reached (GST_FLOW_ERROR);
368   if (size < 1)
369     g_return_val_if_reached (GST_FLOW_ERROR);
370
371   gst_byte_reader_init (&r, data, size);
372
373   if (!gst_byte_reader_get_uint8 (&r, &message_type))
374     g_return_val_if_reached (GST_FLOW_ERROR);
375
376   if (message_type == CHANNEL_MESSAGE_ACK) {
377     /* all good */
378     GST_INFO_OBJECT (channel, "Received channel ack");
379     return GST_FLOW_OK;
380   } else if (message_type == CHANNEL_MESSAGE_OPEN) {
381     guint8 reliability;
382     guint32 reliability_param;
383     guint16 priority, label_len, proto_len;
384     const guint8 *src;
385     GstBuffer *buffer;
386     GstFlowReturn ret;
387
388     GST_INFO_OBJECT (channel, "Received channel open");
389
390     if (channel->parent.negotiated) {
391       g_set_error (error, GST_WEBRTC_BIN_ERROR,
392           GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
393           "Data channel was signalled as negotiated already");
394       g_return_val_if_reached (GST_FLOW_ERROR);
395     }
396
397     if (channel->opened)
398       return GST_FLOW_OK;
399
400     if (!gst_byte_reader_get_uint8 (&r, &reliability))
401       goto parse_error;
402     if (!gst_byte_reader_get_uint16_be (&r, &priority))
403       goto parse_error;
404     if (!gst_byte_reader_get_uint32_be (&r, &reliability_param))
405       goto parse_error;
406     if (!gst_byte_reader_get_uint16_be (&r, &label_len))
407       goto parse_error;
408     if (!gst_byte_reader_get_uint16_be (&r, &proto_len))
409       goto parse_error;
410
411     label = g_new0 (gchar, (gsize) label_len + 1);
412     proto = g_new0 (gchar, (gsize) proto_len + 1);
413
414     if (!gst_byte_reader_get_data (&r, label_len, &src))
415       goto parse_error;
416     memcpy (label, src, label_len);
417     label[label_len] = '\0';
418     if (!gst_byte_reader_get_data (&r, proto_len, &src))
419       goto parse_error;
420     memcpy (proto, src, proto_len);
421     proto[proto_len] = '\0';
422
423     g_free (channel->parent.label);
424     channel->parent.label = label;
425     g_free (channel->parent.protocol);
426     channel->parent.protocol = proto;
427     channel->parent.priority = priority_uint_to_type (priority);
428     channel->parent.ordered = !(reliability & 0x80);
429     if (reliability & 0x01) {
430       channel->parent.max_retransmits = reliability_param;
431       channel->parent.max_packet_lifetime = -1;
432     } else if (reliability & 0x02) {
433       channel->parent.max_retransmits = -1;
434       channel->parent.max_packet_lifetime = reliability_param;
435     } else {
436       channel->parent.max_retransmits = -1;
437       channel->parent.max_packet_lifetime = -1;
438     }
439     channel->opened = TRUE;
440
441     GST_INFO_OBJECT (channel, "Received channel open for SCTP stream %i "
442         "label %s protocol %s ordered %s", channel->parent.id,
443         channel->parent.label, channel->parent.protocol,
444         channel->parent.ordered ? "true" : "false");
445
446     _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
447
448     GST_INFO_OBJECT (channel, "Sending channel ack");
449     buffer = construct_ack_packet (channel);
450
451     GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
452     channel->parent.buffered_amount += gst_buffer_get_size (buffer);
453     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
454
455     ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
456     if (ret != GST_FLOW_OK) {
457       g_set_error (error, GST_WEBRTC_BIN_ERROR,
458           GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
459           "Could not send ack packet");
460       return ret;
461     }
462
463     return ret;
464   } else {
465     g_set_error (error, GST_WEBRTC_BIN_ERROR,
466         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
467         "Unknown message type in control protocol");
468     return GST_FLOW_ERROR;
469   }
470
471 parse_error:
472   {
473     g_free (label);
474     g_free (proto);
475     g_set_error (error, GST_WEBRTC_BIN_ERROR,
476         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to parse packet");
477     g_return_val_if_reached (GST_FLOW_ERROR);
478   }
479 }
480
481 static void
482 on_sink_eos (GstAppSink * sink, gpointer user_data)
483 {
484 }
485
486 struct map_info
487 {
488   GstBuffer *buffer;
489   GstMapInfo map_info;
490 };
491
492 static void
493 buffer_unmap_and_unref (struct map_info *info)
494 {
495   gst_buffer_unmap (info->buffer, &info->map_info);
496   gst_buffer_unref (info->buffer);
497   g_free (info);
498 }
499
500 static void
501 _emit_have_data (WebRTCDataChannel * channel, GBytes * data)
502 {
503   gst_webrtc_data_channel_on_message_data (GST_WEBRTC_DATA_CHANNEL (channel),
504       data);
505 }
506
507 static void
508 _emit_have_string (GstWebRTCDataChannel * channel, gchar * str)
509 {
510   gst_webrtc_data_channel_on_message_string (GST_WEBRTC_DATA_CHANNEL (channel),
511       str);
512 }
513
514 static GstFlowReturn
515 _data_channel_have_sample (WebRTCDataChannel * channel, GstSample * sample,
516     GError ** error)
517 {
518   GstSctpReceiveMeta *receive;
519   GstBuffer *buffer;
520   GstFlowReturn ret = GST_FLOW_OK;
521
522   GST_LOG_OBJECT (channel, "Received sample %" GST_PTR_FORMAT, sample);
523
524   g_return_val_if_fail (channel->sctp_transport != NULL, GST_FLOW_ERROR);
525
526   buffer = gst_sample_get_buffer (sample);
527   if (!buffer) {
528     g_set_error (error, GST_WEBRTC_BIN_ERROR,
529         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "No buffer to handle");
530     return GST_FLOW_ERROR;
531   }
532   receive = gst_sctp_buffer_get_receive_meta (buffer);
533   if (!receive) {
534     g_set_error (error, GST_WEBRTC_BIN_ERROR,
535         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
536         "No SCTP Receive meta on the buffer");
537     return GST_FLOW_ERROR;
538   }
539
540   switch (receive->ppid) {
541     case DATA_CHANNEL_PPID_WEBRTC_CONTROL:{
542       GstMapInfo info = GST_MAP_INFO_INIT;
543       if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
544         g_set_error (error, GST_WEBRTC_BIN_ERROR,
545             GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
546             "Failed to map received buffer");
547         ret = GST_FLOW_ERROR;
548       } else {
549         ret = _parse_control_packet (channel, info.data, info.size, error);
550         gst_buffer_unmap (buffer, &info);
551       }
552       break;
553     }
554     case DATA_CHANNEL_PPID_WEBRTC_STRING:
555     case DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL:{
556       GstMapInfo info = GST_MAP_INFO_INIT;
557       if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
558         g_set_error (error, GST_WEBRTC_BIN_ERROR,
559             GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
560             "Failed to map received buffer");
561         ret = GST_FLOW_ERROR;
562       } else {
563         gchar *str = g_strndup ((gchar *) info.data, info.size);
564         _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, str,
565             g_free);
566         gst_buffer_unmap (buffer, &info);
567       }
568       break;
569     }
570     case DATA_CHANNEL_PPID_WEBRTC_BINARY:
571     case DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL:{
572       struct map_info *info = g_new0 (struct map_info, 1);
573       if (!gst_buffer_map (buffer, &info->map_info, GST_MAP_READ)) {
574         g_set_error (error, GST_WEBRTC_BIN_ERROR,
575             GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
576             "Failed to map received buffer");
577         ret = GST_FLOW_ERROR;
578       } else {
579         GBytes *data = g_bytes_new_with_free_func (info->map_info.data,
580             info->map_info.size, (GDestroyNotify) buffer_unmap_and_unref, info);
581         info->buffer = gst_buffer_ref (buffer);
582         _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, data,
583             (GDestroyNotify) g_bytes_unref);
584       }
585       break;
586     }
587     case DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY:
588       _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, NULL,
589           NULL);
590       break;
591     case DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY:
592       _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, NULL,
593           NULL);
594       break;
595     default:
596       g_set_error (error, GST_WEBRTC_BIN_ERROR,
597           GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
598           "Unknown SCTP PPID %u received", receive->ppid);
599       ret = GST_FLOW_ERROR;
600       break;
601   }
602
603   return ret;
604 }
605
606 static GstFlowReturn
607 on_sink_preroll (GstAppSink * sink, gpointer user_data)
608 {
609   WebRTCDataChannel *channel = user_data;
610   GstSample *sample = gst_app_sink_pull_preroll (sink);
611   GstFlowReturn ret;
612
613   if (sample) {
614     /* This sample also seems to be provided by the sample callback
615        ret = _data_channel_have_sample (channel, sample); */
616     ret = GST_FLOW_OK;
617     gst_sample_unref (sample);
618   } else if (gst_app_sink_is_eos (sink)) {
619     ret = GST_FLOW_EOS;
620   } else {
621     ret = GST_FLOW_ERROR;
622   }
623
624   if (ret != GST_FLOW_OK) {
625     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
626   }
627
628   return ret;
629 }
630
631 static GstFlowReturn
632 on_sink_sample (GstAppSink * sink, gpointer user_data)
633 {
634   WebRTCDataChannel *channel = user_data;
635   GstSample *sample = gst_app_sink_pull_sample (sink);
636   GstFlowReturn ret;
637   GError *error = NULL;
638
639   if (sample) {
640     ret = _data_channel_have_sample (channel, sample, &error);
641     gst_sample_unref (sample);
642   } else if (gst_app_sink_is_eos (sink)) {
643     ret = GST_FLOW_EOS;
644   } else {
645     ret = GST_FLOW_ERROR;
646   }
647
648   if (error)
649     _channel_store_error (channel, error);
650
651   if (ret != GST_FLOW_OK) {
652     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
653   }
654
655   return ret;
656 }
657
658 static GstAppSinkCallbacks sink_callbacks = {
659   on_sink_eos,
660   on_sink_preroll,
661   on_sink_sample,
662 };
663
664 void
665 webrtc_data_channel_start_negotiation (WebRTCDataChannel * channel)
666 {
667   GstBuffer *buffer;
668
669   g_return_if_fail (!channel->parent.negotiated);
670   g_return_if_fail (channel->parent.id != -1);
671   g_return_if_fail (channel->sctp_transport != NULL);
672
673   buffer = construct_open_packet (channel);
674
675   GST_INFO_OBJECT (channel, "Sending channel open for SCTP stream %i "
676       "label %s protocol %s ordered %s", channel->parent.id,
677       channel->parent.label, channel->parent.protocol,
678       channel->parent.ordered ? "true" : "false");
679
680   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
681   channel->parent.buffered_amount += gst_buffer_get_size (buffer);
682   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
683
684   if (gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc),
685           buffer) == GST_FLOW_OK) {
686     channel->opened = TRUE;
687     _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
688   } else {
689     GError *error = NULL;
690     g_set_error (&error, GST_WEBRTC_BIN_ERROR,
691         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
692         "Failed to send DCEP open packet");
693     _channel_store_error (channel, error);
694     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
695   }
696 }
697
698 static void
699 _get_sctp_reliability (WebRTCDataChannel * channel,
700     GstSctpSendMetaPartiallyReliability * reliability, guint * rel_param)
701 {
702   if (channel->parent.max_retransmits != -1) {
703     *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX;
704     *rel_param = channel->parent.max_retransmits;
705   } else if (channel->parent.max_packet_lifetime != -1) {
706     *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL;
707     *rel_param = channel->parent.max_packet_lifetime;
708   } else {
709     *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE;
710     *rel_param = 0;
711   }
712 }
713
714 static gboolean
715 _is_within_max_message_size (WebRTCDataChannel * channel, gsize size)
716 {
717   return size <= channel->sctp_transport->max_message_size;
718 }
719
720 static void
721 webrtc_data_channel_send_data (GstWebRTCDataChannel * base_channel,
722     GBytes * bytes)
723 {
724   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (base_channel);
725   GstSctpSendMetaPartiallyReliability reliability;
726   guint rel_param;
727   guint32 ppid;
728   GstBuffer *buffer;
729   GstFlowReturn ret;
730
731   if (!bytes) {
732     buffer = gst_buffer_new ();
733     ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY;
734   } else {
735     gsize size;
736     guint8 *data;
737
738     data = (guint8 *) g_bytes_get_data (bytes, &size);
739     g_return_if_fail (data != NULL);
740     if (!_is_within_max_message_size (channel, size)) {
741       GError *error = NULL;
742       g_set_error (&error, GST_WEBRTC_BIN_ERROR,
743           GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
744           "Requested to send data that is too large");
745       _channel_store_error (channel, error);
746       _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
747           NULL);
748       return;
749     }
750
751     buffer = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, data, size,
752         0, size, g_bytes_ref (bytes), (GDestroyNotify) g_bytes_unref);
753     ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY;
754   }
755
756   _get_sctp_reliability (channel, &reliability, &rel_param);
757   gst_sctp_buffer_add_send_meta (buffer, ppid, channel->parent.ordered,
758       reliability, rel_param);
759
760   GST_LOG_OBJECT (channel, "Sending data using buffer %" GST_PTR_FORMAT,
761       buffer);
762
763   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
764   channel->parent.buffered_amount += gst_buffer_get_size (buffer);
765   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
766
767   ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
768
769   if (ret != GST_FLOW_OK) {
770     GError *error = NULL;
771     g_set_error (&error, GST_WEBRTC_BIN_ERROR,
772         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
773     _channel_store_error (channel, error);
774     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
775   }
776 }
777
778 static void
779 webrtc_data_channel_send_string (GstWebRTCDataChannel * base_channel,
780     const gchar * str)
781 {
782   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (base_channel);
783   GstSctpSendMetaPartiallyReliability reliability;
784   guint rel_param;
785   guint32 ppid;
786   GstBuffer *buffer;
787   GstFlowReturn ret;
788
789   if (!channel->parent.negotiated)
790     g_return_if_fail (channel->opened);
791   g_return_if_fail (channel->sctp_transport != NULL);
792
793   if (!str) {
794     buffer = gst_buffer_new ();
795     ppid = DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY;
796   } else {
797     gsize size = strlen (str);
798     gchar *str_copy = g_strdup (str);
799
800     if (!_is_within_max_message_size (channel, size)) {
801       GError *error = NULL;
802       g_set_error (&error, GST_WEBRTC_BIN_ERROR,
803           GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
804           "Requested to send a string that is too large");
805       _channel_store_error (channel, error);
806       _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
807           NULL);
808       return;
809     }
810
811     buffer =
812         gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, str_copy,
813         size, 0, size, str_copy, g_free);
814     ppid = DATA_CHANNEL_PPID_WEBRTC_STRING;
815   }
816
817   _get_sctp_reliability (channel, &reliability, &rel_param);
818   gst_sctp_buffer_add_send_meta (buffer, ppid, channel->parent.ordered,
819       reliability, rel_param);
820
821   GST_TRACE_OBJECT (channel, "Sending string using buffer %" GST_PTR_FORMAT,
822       buffer);
823
824   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
825   channel->parent.buffered_amount += gst_buffer_get_size (buffer);
826   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
827
828   ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
829
830   if (ret != GST_FLOW_OK) {
831     GError *error = NULL;
832     g_set_error (&error, GST_WEBRTC_BIN_ERROR,
833         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to send string");
834     _channel_store_error (channel, error);
835     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
836   }
837 }
838
839 static void
840 _on_sctp_notify_state_unlocked (GObject * sctp_transport,
841     WebRTCDataChannel * channel)
842 {
843   GstWebRTCSCTPTransportState state;
844
845   g_object_get (sctp_transport, "state", &state, NULL);
846   if (state == GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED) {
847     if (channel->parent.negotiated)
848       _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
849   }
850 }
851
852 static void
853 _on_sctp_notify_state (GObject * sctp_transport, GParamSpec * pspec,
854     WebRTCDataChannel * channel)
855 {
856   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
857   _on_sctp_notify_state_unlocked (sctp_transport, channel);
858   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
859 }
860
861 static void
862 _emit_low_threshold (WebRTCDataChannel * channel, gpointer user_data)
863 {
864   gst_webrtc_data_channel_on_buffered_amount_low (GST_WEBRTC_DATA_CHANNEL
865       (channel));
866 }
867
868 static GstPadProbeReturn
869 on_appsrc_data (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
870 {
871   WebRTCDataChannel *channel = user_data;
872   guint64 prev_amount;
873   guint64 size = 0;
874
875   if (GST_PAD_PROBE_INFO_TYPE (info) & (GST_PAD_PROBE_TYPE_BUFFER)) {
876     GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
877     size = gst_buffer_get_size (buffer);
878   } else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
879     GstBufferList *list = GST_PAD_PROBE_INFO_BUFFER_LIST (info);
880     size = gst_buffer_list_calculate_size (list);
881   }
882
883   if (size > 0) {
884     GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
885     prev_amount = channel->parent.buffered_amount;
886     channel->parent.buffered_amount -= size;
887     GST_TRACE_OBJECT (channel, "checking low-threshold: prev %"
888         G_GUINT64_FORMAT " low-threshold %" G_GUINT64_FORMAT " buffered %"
889         G_GUINT64_FORMAT, prev_amount,
890         channel->parent.buffered_amount_low_threshold,
891         channel->parent.buffered_amount);
892     if (prev_amount >= channel->parent.buffered_amount_low_threshold
893         && channel->parent.buffered_amount <
894         channel->parent.buffered_amount_low_threshold) {
895       _channel_enqueue_task (channel, (ChannelTask) _emit_low_threshold, NULL,
896           NULL);
897     }
898
899     if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING
900         && channel->parent.buffered_amount <= 0) {
901       _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream, NULL,
902           NULL);
903     }
904     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
905   }
906
907   return GST_PAD_PROBE_OK;
908 }
909
910 static void
911 gst_webrtc_data_channel_constructed (GObject * object)
912 {
913   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (object);
914   GstPad *pad;
915   GstCaps *caps;
916
917   caps = gst_caps_new_any ();
918
919   channel->appsrc = gst_element_factory_make ("appsrc", NULL);
920   gst_object_ref_sink (channel->appsrc);
921   pad = gst_element_get_static_pad (channel->appsrc, "src");
922
923   channel->src_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_BOTH,
924       (GstPadProbeCallback) on_appsrc_data, channel, NULL);
925
926   channel->appsink = gst_element_factory_make ("appsink", NULL);
927   gst_object_ref_sink (channel->appsink);
928   g_object_set (channel->appsink, "sync", FALSE, "async", FALSE, "caps", caps,
929       NULL);
930   gst_app_sink_set_callbacks (GST_APP_SINK (channel->appsink), &sink_callbacks,
931       channel, NULL);
932
933   gst_object_unref (pad);
934   gst_caps_unref (caps);
935 }
936
937 static void
938 gst_webrtc_data_channel_finalize (GObject * object)
939 {
940   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (object);
941
942   if (channel->src_probe) {
943     GstPad *pad = gst_element_get_static_pad (channel->appsrc, "src");
944     gst_pad_remove_probe (pad, channel->src_probe);
945     gst_object_unref (pad);
946     channel->src_probe = 0;
947   }
948
949   if (channel->sctp_transport)
950     g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
951   g_clear_object (&channel->sctp_transport);
952
953   g_clear_object (&channel->appsrc);
954   g_clear_object (&channel->appsink);
955
956   G_OBJECT_CLASS (parent_class)->finalize (object);
957 }
958
959 static void
960 webrtc_data_channel_class_init (WebRTCDataChannelClass * klass)
961 {
962   GObjectClass *gobject_class = (GObjectClass *) klass;
963   GstWebRTCDataChannelClass *channel_class =
964       (GstWebRTCDataChannelClass *) klass;
965
966   gobject_class->constructed = gst_webrtc_data_channel_constructed;
967   gobject_class->finalize = gst_webrtc_data_channel_finalize;
968
969   channel_class->send_data = webrtc_data_channel_send_data;
970   channel_class->send_string = webrtc_data_channel_send_string;
971   channel_class->close = webrtc_data_channel_close;
972 }
973
974 static void
975 webrtc_data_channel_init (WebRTCDataChannel * channel)
976 {
977 }
978
979 static void
980 _data_channel_set_sctp_transport (WebRTCDataChannel * channel,
981     GstWebRTCSCTPTransport * sctp)
982 {
983   g_return_if_fail (GST_IS_WEBRTC_DATA_CHANNEL (channel));
984   g_return_if_fail (GST_IS_WEBRTC_SCTP_TRANSPORT (sctp));
985
986   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
987   if (channel->sctp_transport)
988     g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
989
990   gst_object_replace ((GstObject **) & channel->sctp_transport,
991       GST_OBJECT (sctp));
992
993   if (sctp) {
994     g_signal_connect (sctp, "stream-reset", G_CALLBACK (_on_sctp_reset_stream),
995         channel);
996     g_signal_connect (sctp, "notify::state", G_CALLBACK (_on_sctp_notify_state),
997         channel);
998     _on_sctp_notify_state_unlocked (G_OBJECT (sctp), channel);
999   }
1000   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
1001 }
1002
1003 void
1004 webrtc_data_channel_link_to_sctp (WebRTCDataChannel * channel,
1005     GstWebRTCSCTPTransport * sctp_transport)
1006 {
1007   if (sctp_transport && !channel->sctp_transport) {
1008     gint id;
1009
1010     g_object_get (channel, "id", &id, NULL);
1011
1012     if (sctp_transport->association_established && id != -1) {
1013       gchar *pad_name;
1014
1015       _data_channel_set_sctp_transport (channel, sctp_transport);
1016       pad_name = g_strdup_printf ("sink_%u", id);
1017       if (!gst_element_link_pads (channel->appsrc, "src",
1018               channel->sctp_transport->sctpenc, pad_name))
1019         g_warn_if_reached ();
1020       g_free (pad_name);
1021     }
1022   }
1023 }