webrtcdatachannel: Emit 'on-close' signal when calling 'close' action signal
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-bad / ext / webrtc / webrtcdatachannel.c
1 /* GStreamer
2  * Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:gstwebrtc-datachannel
22  * @short_description: RTCDataChannel object
23  * @title: GstWebRTCDataChannel
24  * @see_also: #GstWebRTCRTPTransceiver
25  *
26  * <http://w3c.github.io/webrtc-pc/#dom-rtcsctptransport>
27  */
28
29 #ifdef HAVE_CONFIG_H
30 # include "config.h"
31 #endif
32
33 #include "webrtcdatachannel.h"
34 #include <gst/app/gstappsink.h>
35 #include <gst/app/gstappsrc.h>
36 #include <gst/base/gstbytereader.h>
37 #include <gst/base/gstbytewriter.h>
38 #include <gst/sctp/sctpreceivemeta.h>
39 #include <gst/sctp/sctpsendmeta.h>
40
41 #include "gstwebrtcbin.h"
42 #include "utils.h"
43
44 #define GST_CAT_DEFAULT webrtc_data_channel_debug
45 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
46
47 #define webrtc_data_channel_parent_class parent_class
48 G_DEFINE_TYPE_WITH_CODE (WebRTCDataChannel, webrtc_data_channel,
49     GST_TYPE_WEBRTC_DATA_CHANNEL,
50     GST_DEBUG_CATEGORY_INIT (webrtc_data_channel_debug, "webrtcdatachannel", 0,
51         "webrtcdatachannel"););
52
53 G_LOCK_DEFINE_STATIC (outstanding_channels_lock);
54 static GList *outstanding_channels;
55
56 typedef enum
57 {
58   DATA_CHANNEL_PPID_WEBRTC_CONTROL = 50,
59   DATA_CHANNEL_PPID_WEBRTC_STRING = 51,
60   DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL = 52, /* deprecated */
61   DATA_CHANNEL_PPID_WEBRTC_BINARY = 53,
62   DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL = 54, /* deprecated */
63   DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY = 56,
64   DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY = 57,
65 } DataChannelPPID;
66
67 typedef enum
68 {
69   CHANNEL_TYPE_RELIABLE = 0x00,
70   CHANNEL_TYPE_RELIABLE_UNORDERED = 0x80,
71   CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT = 0x01,
72   CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT_UNORDERED = 0x81,
73   CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED = 0x02,
74   CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED_UNORDERED = 0x82,
75 } DataChannelReliabilityType;
76
77 typedef enum
78 {
79   CHANNEL_MESSAGE_ACK = 0x02,
80   CHANNEL_MESSAGE_OPEN = 0x03,
81 } DataChannelMessage;
82
83 static guint16
84 priority_type_to_uint (GstWebRTCPriorityType pri)
85 {
86   switch (pri) {
87     case GST_WEBRTC_PRIORITY_TYPE_VERY_LOW:
88       return 64;
89     case GST_WEBRTC_PRIORITY_TYPE_LOW:
90       return 192;
91     case GST_WEBRTC_PRIORITY_TYPE_MEDIUM:
92       return 384;
93     case GST_WEBRTC_PRIORITY_TYPE_HIGH:
94       return 768;
95   }
96   g_assert_not_reached ();
97   return 0;
98 }
99
100 static GstWebRTCPriorityType
101 priority_uint_to_type (guint16 val)
102 {
103   if (val <= 128)
104     return GST_WEBRTC_PRIORITY_TYPE_VERY_LOW;
105   if (val <= 256)
106     return GST_WEBRTC_PRIORITY_TYPE_LOW;
107   if (val <= 512)
108     return GST_WEBRTC_PRIORITY_TYPE_MEDIUM;
109   return GST_WEBRTC_PRIORITY_TYPE_HIGH;
110 }
111
112 static GstBuffer *
113 construct_open_packet (WebRTCDataChannel * channel)
114 {
115   GstByteWriter w;
116   gsize label_len = strlen (channel->parent.label);
117   gsize proto_len = strlen (channel->parent.protocol);
118   gsize size = 12 + label_len + proto_len;
119   DataChannelReliabilityType reliability = 0;
120   guint32 reliability_param = 0;
121   guint16 priority;
122   GstBuffer *buf;
123
124 /*
125  *    0                   1                   2                   3
126  *    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
127  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
128  *   |  Message Type |  Channel Type |            Priority           |
129  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
130  *   |                    Reliability Parameter                      |
131  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
132  *   |         Label Length          |       Protocol Length         |
133  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
134  *   \                                                               /
135  *   |                             Label                             |
136  *   /                                                               \
137  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
138  *   \                                                               /
139  *   |                            Protocol                           |
140  *   /                                                               \
141  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
142  */
143
144   gst_byte_writer_init_with_size (&w, size, FALSE);
145
146   if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_OPEN))
147     g_return_val_if_reached (NULL);
148
149   if (!channel->parent.ordered)
150     reliability |= 0x80;
151   if (channel->parent.max_retransmits != -1) {
152     reliability |= 0x01;
153     reliability_param = channel->parent.max_retransmits;
154   }
155   if (channel->parent.max_packet_lifetime != -1) {
156     reliability |= 0x02;
157     reliability_param = channel->parent.max_packet_lifetime;
158   }
159
160   priority = priority_type_to_uint (channel->parent.priority);
161
162   if (!gst_byte_writer_put_uint8 (&w, (guint8) reliability))
163     g_return_val_if_reached (NULL);
164   if (!gst_byte_writer_put_uint16_be (&w, (guint16) priority))
165     g_return_val_if_reached (NULL);
166   if (!gst_byte_writer_put_uint32_be (&w, (guint32) reliability_param))
167     g_return_val_if_reached (NULL);
168   if (!gst_byte_writer_put_uint16_be (&w, (guint16) label_len))
169     g_return_val_if_reached (NULL);
170   if (!gst_byte_writer_put_uint16_be (&w, (guint16) proto_len))
171     g_return_val_if_reached (NULL);
172   if (!gst_byte_writer_put_data (&w, (guint8 *) channel->parent.label,
173           label_len))
174     g_return_val_if_reached (NULL);
175   if (!gst_byte_writer_put_data (&w, (guint8 *) channel->parent.protocol,
176           proto_len))
177     g_return_val_if_reached (NULL);
178
179   buf = gst_byte_writer_reset_and_get_buffer (&w);
180
181   /* send reliable and ordered */
182   gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
183       GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
184
185   return buf;
186 }
187
188 static GstBuffer *
189 construct_ack_packet (WebRTCDataChannel * channel)
190 {
191   GstByteWriter w;
192   GstBuffer *buf;
193
194 /*
195  *   0                   1                   2                   3
196  *   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
197  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
198  *   |  Message Type |
199  *   +-+-+-+-+-+-+-+-+
200  */
201
202   gst_byte_writer_init_with_size (&w, 1, FALSE);
203
204   if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_ACK))
205     g_return_val_if_reached (NULL);
206
207   buf = gst_byte_writer_reset_and_get_buffer (&w);
208
209   /* send reliable and ordered */
210   gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
211       GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
212
213   return buf;
214 }
215
216 typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
217     gpointer user_data);
218
219 struct task
220 {
221   GstWebRTCDataChannel *channel;
222   ChannelTask func;
223   gpointer user_data;
224   GDestroyNotify notify;
225 };
226
227 static GstStructure *
228 _execute_task (GstWebRTCBin * webrtc, struct task *task)
229 {
230   if (task->func)
231     task->func (task->channel, task->user_data);
232
233   return NULL;
234 }
235
236 static void
237 _free_task (struct task *task)
238 {
239   gst_object_unref (task->channel);
240
241   if (task->notify)
242     task->notify (task->user_data);
243   g_free (task);
244 }
245
246 static void
247 _channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func,
248     gpointer user_data, GDestroyNotify notify)
249 {
250   struct task *task = g_new0 (struct task, 1);
251
252   task->channel = gst_object_ref (channel);
253   task->func = func;
254   task->user_data = user_data;
255   task->notify = notify;
256
257   gst_webrtc_bin_enqueue_task (channel->webrtcbin,
258       (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task,
259       NULL);
260 }
261
262 static void
263 _channel_store_error (WebRTCDataChannel * channel, GError * error)
264 {
265   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
266   if (error) {
267     GST_WARNING_OBJECT (channel, "Error: %s",
268         error ? error->message : "Unknown");
269     if (!channel->stored_error)
270       channel->stored_error = error;
271     else
272       g_clear_error (&error);
273   }
274   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
275 }
276
277 static void
278 _emit_on_open (WebRTCDataChannel * channel, gpointer user_data)
279 {
280   gst_webrtc_data_channel_on_open (GST_WEBRTC_DATA_CHANNEL (channel));
281 }
282
283 static void
284 _transport_closed (WebRTCDataChannel * channel)
285 {
286   GError *error;
287 #ifndef TIZEN_FEATURE_WEBRTC_MODIFICATION
288   gboolean both_sides_closed;
289 #endif
290
291   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
292   error = channel->stored_error;
293   channel->stored_error = NULL;
294
295 #ifndef TIZEN_FEATURE_WEBRTC_MODIFICATION
296   both_sides_closed =
297       channel->peer_closed && channel->parent.buffered_amount <= 0;
298   if (both_sides_closed || error) {
299     channel->peer_closed = FALSE;
300   }
301 #endif
302   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
303
304   if (error) {
305     gst_webrtc_data_channel_on_error (GST_WEBRTC_DATA_CHANNEL (channel), error);
306     g_clear_error (&error);
307   }
308 #ifdef TIZEN_FEATURE_WEBRTC_MODIFICATION
309   if (channel->parent.buffered_amount <= 0 || error) {
310 #else
311   if (both_sides_closed || error) {
312 #endif
313     gst_webrtc_data_channel_on_close (GST_WEBRTC_DATA_CHANNEL (channel));
314   }
315 }
316
317 static void
318 _close_sctp_stream (WebRTCDataChannel * channel, gpointer user_data)
319 {
320   GstPad *pad, *peer;
321
322   GST_INFO_OBJECT (channel, "Closing outgoing SCTP stream %i label \"%s\"",
323       channel->parent.id, channel->parent.label);
324
325   pad = gst_element_get_static_pad (channel->appsrc, "src");
326   peer = gst_pad_get_peer (pad);
327   gst_object_unref (pad);
328
329   if (peer) {
330     GstElement *sctpenc = gst_pad_get_parent_element (peer);
331
332     if (sctpenc) {
333       gst_element_release_request_pad (sctpenc, peer);
334       gst_object_unref (sctpenc);
335     }
336     gst_object_unref (peer);
337   }
338
339   _transport_closed (channel);
340 }
341
342 static void
343 _close_procedure (WebRTCDataChannel * channel, gpointer user_data)
344 {
345   /* https://www.w3.org/TR/webrtc/#data-transport-closing-procedure */
346   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
347   if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED) {
348     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
349     return;
350   } else if (channel->parent.ready_state ==
351       GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING) {
352     _channel_enqueue_task (channel, (ChannelTask) _transport_closed, NULL,
353         NULL);
354   } else if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_OPEN) {
355 #ifdef TIZEN_FEATURE_WEBRTC_MODIFICATION
356     channel->parent.prev_ready_state = channel->parent.ready_state;
357 #endif
358     channel->parent.ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING;
359     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
360     g_object_notify (G_OBJECT (channel), "ready-state");
361
362     GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
363     if (channel->parent.buffered_amount <= 0) {
364       _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream,
365           NULL, NULL);
366     }
367   }
368
369   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
370 }
371
372 static void
373 _on_sctp_stream_reset (WebRTCSCTPTransport * sctp, guint stream_id,
374     WebRTCDataChannel * channel)
375 {
376   if (channel->parent.id == stream_id) {
377     GST_INFO_OBJECT (channel,
378         "Received channel close for SCTP stream %i label \"%s\"",
379         channel->parent.id, channel->parent.label);
380
381 #ifndef TIZEN_FEATURE_WEBRTC_MODIFICATION
382     GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
383     channel->peer_closed = TRUE;
384     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
385
386 #endif
387     _channel_enqueue_task (channel, (ChannelTask) _close_procedure,
388         GUINT_TO_POINTER (stream_id), NULL);
389   }
390 }
391
392 static void
393 webrtc_data_channel_close (GstWebRTCDataChannel * channel)
394 {
395   _close_procedure (WEBRTC_DATA_CHANNEL (channel), NULL);
396 }
397
398 static GstFlowReturn
399 _parse_control_packet (WebRTCDataChannel * channel, guint8 * data,
400     gsize size, GError ** error)
401 {
402   GstByteReader r;
403   guint8 message_type;
404   gchar *label = NULL;
405   gchar *proto = NULL;
406
407   if (!data)
408     g_return_val_if_reached (GST_FLOW_ERROR);
409   if (size < 1)
410     g_return_val_if_reached (GST_FLOW_ERROR);
411
412   gst_byte_reader_init (&r, data, size);
413
414   if (!gst_byte_reader_get_uint8 (&r, &message_type))
415     g_return_val_if_reached (GST_FLOW_ERROR);
416
417   if (message_type == CHANNEL_MESSAGE_ACK) {
418     /* all good */
419     GST_INFO_OBJECT (channel, "Received channel ack");
420     return GST_FLOW_OK;
421   } else if (message_type == CHANNEL_MESSAGE_OPEN) {
422     guint8 reliability;
423     guint32 reliability_param;
424     guint16 priority, label_len, proto_len;
425     const guint8 *src;
426     GstBuffer *buffer;
427     GstFlowReturn ret;
428
429     GST_INFO_OBJECT (channel, "Received channel open");
430
431     if (channel->parent.negotiated) {
432       g_set_error (error, GST_WEBRTC_ERROR,
433           GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
434           "Data channel was signalled as negotiated already");
435       g_return_val_if_reached (GST_FLOW_ERROR);
436     }
437
438     if (channel->opened)
439       return GST_FLOW_OK;
440
441     if (!gst_byte_reader_get_uint8 (&r, &reliability))
442       goto parse_error;
443     if (!gst_byte_reader_get_uint16_be (&r, &priority))
444       goto parse_error;
445     if (!gst_byte_reader_get_uint32_be (&r, &reliability_param))
446       goto parse_error;
447     if (!gst_byte_reader_get_uint16_be (&r, &label_len))
448       goto parse_error;
449     if (!gst_byte_reader_get_uint16_be (&r, &proto_len))
450       goto parse_error;
451
452     label = g_new0 (gchar, (gsize) label_len + 1);
453     proto = g_new0 (gchar, (gsize) proto_len + 1);
454
455     if (!gst_byte_reader_get_data (&r, label_len, &src))
456       goto parse_error;
457     memcpy (label, src, label_len);
458     label[label_len] = '\0';
459     if (!gst_byte_reader_get_data (&r, proto_len, &src))
460       goto parse_error;
461     memcpy (proto, src, proto_len);
462     proto[proto_len] = '\0';
463
464     g_free (channel->parent.label);
465     channel->parent.label = label;
466     g_free (channel->parent.protocol);
467     channel->parent.protocol = proto;
468     channel->parent.priority = priority_uint_to_type (priority);
469     channel->parent.ordered = !(reliability & 0x80);
470     if (reliability & 0x01) {
471       channel->parent.max_retransmits = reliability_param;
472       channel->parent.max_packet_lifetime = -1;
473     } else if (reliability & 0x02) {
474       channel->parent.max_retransmits = -1;
475       channel->parent.max_packet_lifetime = reliability_param;
476     } else {
477       channel->parent.max_retransmits = -1;
478       channel->parent.max_packet_lifetime = -1;
479     }
480     channel->opened = TRUE;
481
482     GST_INFO_OBJECT (channel, "Received channel open for SCTP stream %i "
483         "label \"%s\" protocol %s ordered %s", channel->parent.id,
484         channel->parent.label, channel->parent.protocol,
485         channel->parent.ordered ? "true" : "false");
486
487     _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
488
489     GST_INFO_OBJECT (channel, "Sending channel ack");
490     buffer = construct_ack_packet (channel);
491
492     GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
493     channel->parent.buffered_amount += gst_buffer_get_size (buffer);
494     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
495
496     ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
497     if (ret != GST_FLOW_OK) {
498       g_set_error (error, GST_WEBRTC_ERROR,
499           GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Could not send ack packet");
500       return ret;
501     }
502
503     return ret;
504   } else {
505     g_set_error (error, GST_WEBRTC_ERROR,
506         GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
507         "Unknown message type in control protocol");
508     return GST_FLOW_ERROR;
509   }
510
511 parse_error:
512   {
513     g_free (label);
514     g_free (proto);
515     g_set_error (error, GST_WEBRTC_ERROR,
516         GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to parse packet");
517     g_return_val_if_reached (GST_FLOW_ERROR);
518   }
519 }
520
521 static void
522 on_sink_eos (GstAppSink * sink, gpointer user_data)
523 {
524 }
525
526 struct map_info
527 {
528   GstBuffer *buffer;
529   GstMapInfo map_info;
530 };
531
532 static void
533 buffer_unmap_and_unref (struct map_info *info)
534 {
535   gst_buffer_unmap (info->buffer, &info->map_info);
536   gst_buffer_unref (info->buffer);
537   g_free (info);
538 }
539
540 static void
541 _emit_have_data (WebRTCDataChannel * channel, GBytes * data)
542 {
543   gst_webrtc_data_channel_on_message_data (GST_WEBRTC_DATA_CHANNEL (channel),
544       data);
545 }
546
547 static void
548 _emit_have_string (GstWebRTCDataChannel * channel, gchar * str)
549 {
550   gst_webrtc_data_channel_on_message_string (GST_WEBRTC_DATA_CHANNEL (channel),
551       str);
552 }
553
554 static GstFlowReturn
555 _data_channel_have_sample (WebRTCDataChannel * channel, GstSample * sample,
556     GError ** error)
557 {
558   GstSctpReceiveMeta *receive;
559   GstBuffer *buffer;
560   GstFlowReturn ret = GST_FLOW_OK;
561
562   GST_LOG_OBJECT (channel, "Received sample %" GST_PTR_FORMAT, sample);
563
564   g_return_val_if_fail (channel->sctp_transport != NULL, GST_FLOW_ERROR);
565
566   buffer = gst_sample_get_buffer (sample);
567   if (!buffer) {
568     g_set_error (error, GST_WEBRTC_ERROR,
569         GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "No buffer to handle");
570     return GST_FLOW_ERROR;
571   }
572   receive = gst_sctp_buffer_get_receive_meta (buffer);
573   if (!receive) {
574     g_set_error (error, GST_WEBRTC_ERROR,
575         GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
576         "No SCTP Receive meta on the buffer");
577     return GST_FLOW_ERROR;
578   }
579
580   switch (receive->ppid) {
581     case DATA_CHANNEL_PPID_WEBRTC_CONTROL:{
582       GstMapInfo info = GST_MAP_INFO_INIT;
583       if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
584         g_set_error (error, GST_WEBRTC_ERROR,
585             GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
586             "Failed to map received buffer");
587         ret = GST_FLOW_ERROR;
588       } else {
589         ret = _parse_control_packet (channel, info.data, info.size, error);
590         gst_buffer_unmap (buffer, &info);
591       }
592       break;
593     }
594     case DATA_CHANNEL_PPID_WEBRTC_STRING:
595     case DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL:{
596       GstMapInfo info = GST_MAP_INFO_INIT;
597       if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
598         g_set_error (error, GST_WEBRTC_ERROR,
599             GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
600             "Failed to map received buffer");
601         ret = GST_FLOW_ERROR;
602       } else {
603         gchar *str = g_strndup ((gchar *) info.data, info.size);
604         _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, str,
605             g_free);
606         gst_buffer_unmap (buffer, &info);
607       }
608       break;
609     }
610     case DATA_CHANNEL_PPID_WEBRTC_BINARY:
611     case DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL:{
612       struct map_info *info = g_new0 (struct map_info, 1);
613       if (!gst_buffer_map (buffer, &info->map_info, GST_MAP_READ)) {
614         g_set_error (error, GST_WEBRTC_ERROR,
615             GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
616             "Failed to map received buffer");
617         ret = GST_FLOW_ERROR;
618       } else {
619         GBytes *data = g_bytes_new_with_free_func (info->map_info.data,
620             info->map_info.size, (GDestroyNotify) buffer_unmap_and_unref, info);
621         info->buffer = gst_buffer_ref (buffer);
622         _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, data,
623             (GDestroyNotify) g_bytes_unref);
624       }
625       break;
626     }
627     case DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY:
628       _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, NULL,
629           NULL);
630       break;
631     case DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY:
632       _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, NULL,
633           NULL);
634       break;
635     default:
636       g_set_error (error, GST_WEBRTC_ERROR,
637           GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
638           "Unknown SCTP PPID %u received", receive->ppid);
639       ret = GST_FLOW_ERROR;
640       break;
641   }
642
643   return ret;
644 }
645
646 static GstFlowReturn
647 on_sink_preroll (GstAppSink * sink, gpointer user_data)
648 {
649   WebRTCDataChannel *channel = user_data;
650   GstSample *sample = gst_app_sink_pull_preroll (sink);
651   GstFlowReturn ret;
652
653   if (sample) {
654     /* This sample also seems to be provided by the sample callback
655        ret = _data_channel_have_sample (channel, sample); */
656     ret = GST_FLOW_OK;
657     gst_sample_unref (sample);
658   } else if (gst_app_sink_is_eos (sink)) {
659     ret = GST_FLOW_EOS;
660   } else {
661     ret = GST_FLOW_ERROR;
662   }
663
664   if (ret != GST_FLOW_OK) {
665     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
666   }
667
668   return ret;
669 }
670
671 static GstFlowReturn
672 on_sink_sample (GstAppSink * sink, gpointer user_data)
673 {
674   WebRTCDataChannel *channel = user_data;
675   GstSample *sample = gst_app_sink_pull_sample (sink);
676   GstFlowReturn ret;
677   GError *error = NULL;
678
679   if (sample) {
680     ret = _data_channel_have_sample (channel, sample, &error);
681     gst_sample_unref (sample);
682   } else if (gst_app_sink_is_eos (sink)) {
683     ret = GST_FLOW_EOS;
684   } else {
685     ret = GST_FLOW_ERROR;
686   }
687
688   if (error)
689     _channel_store_error (channel, error);
690
691   if (ret != GST_FLOW_OK) {
692     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
693   }
694
695   return ret;
696 }
697
698 static GstAppSinkCallbacks sink_callbacks = {
699   on_sink_eos,
700   on_sink_preroll,
701   on_sink_sample,
702 };
703
704 void
705 webrtc_data_channel_start_negotiation (WebRTCDataChannel * channel)
706 {
707   GstBuffer *buffer;
708
709   g_return_if_fail (!channel->parent.negotiated);
710   g_return_if_fail (channel->parent.id != -1);
711   g_return_if_fail (channel->sctp_transport != NULL);
712
713   buffer = construct_open_packet (channel);
714
715   GST_INFO_OBJECT (channel, "Sending channel open for SCTP stream %i "
716       "label \"%s\" protocol %s ordered %s", channel->parent.id,
717       channel->parent.label, channel->parent.protocol,
718       channel->parent.ordered ? "true" : "false");
719
720   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
721   channel->parent.buffered_amount += gst_buffer_get_size (buffer);
722   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
723   g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
724
725   if (gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc),
726           buffer) == GST_FLOW_OK) {
727     channel->opened = TRUE;
728 #ifdef TIZEN_FEATURE_WEBRTC_MODIFICATION
729     channel->webrtcbin->priv->data_channels_opened++;
730 #endif
731     _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
732   } else {
733     GError *error = NULL;
734     g_set_error (&error, GST_WEBRTC_ERROR,
735         GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
736         "Failed to send DCEP open packet");
737     _channel_store_error (channel, error);
738     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
739   }
740 }
741
742 static void
743 _get_sctp_reliability (WebRTCDataChannel * channel,
744     GstSctpSendMetaPartiallyReliability * reliability, guint * rel_param)
745 {
746   if (channel->parent.max_retransmits != -1) {
747     *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX;
748     *rel_param = channel->parent.max_retransmits;
749   } else if (channel->parent.max_packet_lifetime != -1) {
750     *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL;
751     *rel_param = channel->parent.max_packet_lifetime;
752   } else {
753     *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE;
754     *rel_param = 0;
755   }
756 }
757
758 static gboolean
759 _is_within_max_message_size (WebRTCDataChannel * channel, gsize size)
760 {
761   return size <= channel->sctp_transport->max_message_size;
762 }
763
764 static void
765 webrtc_data_channel_send_data (GstWebRTCDataChannel * base_channel,
766     GBytes * bytes)
767 {
768   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (base_channel);
769   GstSctpSendMetaPartiallyReliability reliability;
770   guint rel_param;
771   guint32 ppid;
772   GstBuffer *buffer;
773   GstFlowReturn ret;
774
775   if (!bytes) {
776     buffer = gst_buffer_new ();
777     ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY;
778   } else {
779     gsize size;
780     guint8 *data;
781
782     data = (guint8 *) g_bytes_get_data (bytes, &size);
783     g_return_if_fail (data != NULL);
784     if (!_is_within_max_message_size (channel, size)) {
785       GError *error = NULL;
786       g_set_error (&error, GST_WEBRTC_ERROR,
787           GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
788           "Requested to send data that is too large");
789       _channel_store_error (channel, error);
790       _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
791           NULL);
792       return;
793     }
794
795     buffer = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, data, size,
796         0, size, g_bytes_ref (bytes), (GDestroyNotify) g_bytes_unref);
797     ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY;
798   }
799
800   _get_sctp_reliability (channel, &reliability, &rel_param);
801   gst_sctp_buffer_add_send_meta (buffer, ppid, channel->parent.ordered,
802       reliability, rel_param);
803
804   GST_LOG_OBJECT (channel, "Sending data using buffer %" GST_PTR_FORMAT,
805       buffer);
806
807   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
808   channel->parent.buffered_amount += gst_buffer_get_size (buffer);
809   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
810   g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
811
812   ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
813
814   if (ret != GST_FLOW_OK) {
815     GError *error = NULL;
816     g_set_error (&error, GST_WEBRTC_ERROR,
817         GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
818     _channel_store_error (channel, error);
819     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
820   }
821 }
822
823 static void
824 webrtc_data_channel_send_string (GstWebRTCDataChannel * base_channel,
825     const gchar * str)
826 {
827   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (base_channel);
828   GstSctpSendMetaPartiallyReliability reliability;
829   guint rel_param;
830   guint32 ppid;
831   GstBuffer *buffer;
832   GstFlowReturn ret;
833
834   if (!channel->parent.negotiated)
835     g_return_if_fail (channel->opened);
836   g_return_if_fail (channel->sctp_transport != NULL);
837
838   if (!str) {
839     buffer = gst_buffer_new ();
840     ppid = DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY;
841   } else {
842     gsize size = strlen (str);
843     gchar *str_copy;
844
845     if (!_is_within_max_message_size (channel, size)) {
846       GError *error = NULL;
847       g_set_error (&error, GST_WEBRTC_ERROR,
848           GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
849           "Requested to send a string that is too large");
850       _channel_store_error (channel, error);
851       _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
852           NULL);
853       return;
854     }
855
856     str_copy = g_strdup (str);
857     buffer =
858         gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, str_copy,
859         size, 0, size, str_copy, g_free);
860     ppid = DATA_CHANNEL_PPID_WEBRTC_STRING;
861   }
862
863   _get_sctp_reliability (channel, &reliability, &rel_param);
864   gst_sctp_buffer_add_send_meta (buffer, ppid, channel->parent.ordered,
865       reliability, rel_param);
866
867   GST_TRACE_OBJECT (channel, "Sending string using buffer %" GST_PTR_FORMAT,
868       buffer);
869
870   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
871   channel->parent.buffered_amount += gst_buffer_get_size (buffer);
872   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
873   g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
874
875   ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
876
877   if (ret != GST_FLOW_OK) {
878     GError *error = NULL;
879     g_set_error (&error, GST_WEBRTC_ERROR,
880         GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send string");
881     _channel_store_error (channel, error);
882     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
883   }
884 }
885
886 static void
887 _on_sctp_notify_state_unlocked (GObject * sctp_transport,
888     WebRTCDataChannel * channel)
889 {
890   GstWebRTCSCTPTransportState state;
891
892   g_object_get (sctp_transport, "state", &state, NULL);
893   if (state == GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED) {
894     if (channel->parent.negotiated)
895       _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
896   }
897 }
898
899 static WebRTCDataChannel *
900 ensure_channel_alive (WebRTCDataChannel * channel)
901 {
902   /* ghetto impl of, does the channel still exist?.
903    * Needed because g_signal_handler_disconnect*() will not disconnect any
904    * running functions and _finalize() implementation can complete and
905    * invalidate channel */
906   G_LOCK (outstanding_channels_lock);
907   if (g_list_find (outstanding_channels, channel)) {
908     g_object_ref (channel);
909   } else {
910     G_UNLOCK (outstanding_channels_lock);
911     return NULL;
912   }
913   G_UNLOCK (outstanding_channels_lock);
914
915   return channel;
916 }
917
918 static void
919 _on_sctp_notify_state (GObject * sctp_transport, GParamSpec * pspec,
920     WebRTCDataChannel * channel)
921 {
922   if (!(channel = ensure_channel_alive (channel)))
923     return;
924
925   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
926   _on_sctp_notify_state_unlocked (sctp_transport, channel);
927   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
928
929   g_object_unref (channel);
930 }
931
932 static void
933 _emit_low_threshold (WebRTCDataChannel * channel, gpointer user_data)
934 {
935   gst_webrtc_data_channel_on_buffered_amount_low (GST_WEBRTC_DATA_CHANNEL
936       (channel));
937 }
938
939 static GstPadProbeReturn
940 on_appsrc_data (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
941 {
942   WebRTCDataChannel *channel = user_data;
943   guint64 prev_amount;
944   guint64 size = 0;
945
946   if (GST_PAD_PROBE_INFO_TYPE (info) & (GST_PAD_PROBE_TYPE_BUFFER)) {
947     GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
948     size = gst_buffer_get_size (buffer);
949   } else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
950     GstBufferList *list = GST_PAD_PROBE_INFO_BUFFER_LIST (info);
951     size = gst_buffer_list_calculate_size (list);
952   }
953
954   if (size > 0) {
955     GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
956     prev_amount = channel->parent.buffered_amount;
957     channel->parent.buffered_amount -= size;
958     GST_TRACE_OBJECT (channel, "checking low-threshold: prev %"
959         G_GUINT64_FORMAT " low-threshold %" G_GUINT64_FORMAT " buffered %"
960         G_GUINT64_FORMAT, prev_amount,
961         channel->parent.buffered_amount_low_threshold,
962         channel->parent.buffered_amount);
963     if (prev_amount >= channel->parent.buffered_amount_low_threshold
964         && channel->parent.buffered_amount <
965         channel->parent.buffered_amount_low_threshold) {
966       _channel_enqueue_task (channel, (ChannelTask) _emit_low_threshold, NULL,
967           NULL);
968     }
969
970     if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING
971         && channel->parent.buffered_amount <= 0) {
972       _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream, NULL,
973           NULL);
974     }
975     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
976     g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
977   }
978
979   return GST_PAD_PROBE_OK;
980 }
981
982 static void
983 gst_webrtc_data_channel_constructed (GObject * object)
984 {
985   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (object);
986   GstPad *pad;
987   GstCaps *caps;
988
989   caps = gst_caps_new_any ();
990
991   channel->appsrc = gst_element_factory_make ("appsrc", NULL);
992   gst_object_ref_sink (channel->appsrc);
993   pad = gst_element_get_static_pad (channel->appsrc, "src");
994
995   channel->src_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_BOTH,
996       (GstPadProbeCallback) on_appsrc_data, channel, NULL);
997
998   channel->appsink = gst_element_factory_make ("appsink", NULL);
999   gst_object_ref_sink (channel->appsink);
1000   g_object_set (channel->appsink, "sync", FALSE, "async", FALSE, "caps", caps,
1001       NULL);
1002   gst_app_sink_set_callbacks (GST_APP_SINK (channel->appsink), &sink_callbacks,
1003       channel, NULL);
1004
1005   gst_object_unref (pad);
1006   gst_caps_unref (caps);
1007 }
1008
1009 static void
1010 gst_webrtc_data_channel_dispose (GObject * object)
1011 {
1012   G_LOCK (outstanding_channels_lock);
1013   outstanding_channels = g_list_remove (outstanding_channels, object);
1014   G_UNLOCK (outstanding_channels_lock);
1015
1016   G_OBJECT_CLASS (parent_class)->dispose (object);
1017 }
1018
1019 static void
1020 gst_webrtc_data_channel_finalize (GObject * object)
1021 {
1022   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (object);
1023
1024   if (channel->src_probe) {
1025     GstPad *pad = gst_element_get_static_pad (channel->appsrc, "src");
1026     gst_pad_remove_probe (pad, channel->src_probe);
1027     gst_object_unref (pad);
1028     channel->src_probe = 0;
1029   }
1030
1031   if (channel->sctp_transport)
1032     g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
1033   g_clear_object (&channel->sctp_transport);
1034
1035   g_clear_object (&channel->appsrc);
1036   g_clear_object (&channel->appsink);
1037
1038   G_OBJECT_CLASS (parent_class)->finalize (object);
1039 }
1040
1041 static void
1042 webrtc_data_channel_class_init (WebRTCDataChannelClass * klass)
1043 {
1044   GObjectClass *gobject_class = (GObjectClass *) klass;
1045   GstWebRTCDataChannelClass *channel_class =
1046       (GstWebRTCDataChannelClass *) klass;
1047
1048   gobject_class->constructed = gst_webrtc_data_channel_constructed;
1049   gobject_class->dispose = gst_webrtc_data_channel_dispose;
1050   gobject_class->finalize = gst_webrtc_data_channel_finalize;
1051
1052   channel_class->send_data = webrtc_data_channel_send_data;
1053   channel_class->send_string = webrtc_data_channel_send_string;
1054   channel_class->close = webrtc_data_channel_close;
1055 }
1056
1057 static void
1058 webrtc_data_channel_init (WebRTCDataChannel * channel)
1059 {
1060   G_LOCK (outstanding_channels_lock);
1061   outstanding_channels = g_list_prepend (outstanding_channels, channel);
1062   G_UNLOCK (outstanding_channels_lock);
1063 }
1064
1065 static void
1066 _data_channel_set_sctp_transport (WebRTCDataChannel * channel,
1067     WebRTCSCTPTransport * sctp)
1068 {
1069   g_return_if_fail (GST_IS_WEBRTC_DATA_CHANNEL (channel));
1070   g_return_if_fail (GST_IS_WEBRTC_SCTP_TRANSPORT (sctp));
1071
1072   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
1073   if (channel->sctp_transport)
1074     g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
1075   GST_TRACE ("%p set sctp %p", channel, sctp);
1076
1077   gst_object_replace ((GstObject **) & channel->sctp_transport,
1078       GST_OBJECT (sctp));
1079
1080   if (sctp) {
1081     g_signal_connect (sctp, "stream-reset", G_CALLBACK (_on_sctp_stream_reset),
1082         channel);
1083     g_signal_connect (sctp, "notify::state", G_CALLBACK (_on_sctp_notify_state),
1084         channel);
1085   }
1086   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
1087 }
1088
1089 void
1090 webrtc_data_channel_link_to_sctp (WebRTCDataChannel * channel,
1091     WebRTCSCTPTransport * sctp_transport)
1092 {
1093   if (sctp_transport && !channel->sctp_transport) {
1094     gint id;
1095
1096     g_object_get (channel, "id", &id, NULL);
1097
1098     if (sctp_transport->association_established && id != -1) {
1099       gchar *pad_name;
1100
1101       _data_channel_set_sctp_transport (channel, sctp_transport);
1102       pad_name = g_strdup_printf ("sink_%u", id);
1103       if (!gst_element_link_pads (channel->appsrc, "src",
1104               channel->sctp_transport->sctpenc, pad_name))
1105         g_warn_if_reached ();
1106       g_free (pad_name);
1107
1108       _on_sctp_notify_state_unlocked (G_OBJECT (sctp_transport), channel);
1109     }
1110   }
1111 }