2 * Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:gstwebrtc-datachannel
22 * @short_description: RTCDataChannel object
23 * @title: GstWebRTCDataChannel
24 * @see_also: #GstWebRTCRTPTransceiver
26 * <http://w3c.github.io/webrtc-pc/#dom-rtcsctptransport>
33 #include "webrtcdatachannel.h"
34 #include <gst/app/gstappsink.h>
35 #include <gst/app/gstappsrc.h>
36 #include <gst/base/gstbytereader.h>
37 #include <gst/base/gstbytewriter.h>
38 #include <gst/sctp/sctpreceivemeta.h>
39 #include <gst/sctp/sctpsendmeta.h>
41 #include "gstwebrtcbin.h"
44 #define GST_CAT_DEFAULT webrtc_data_channel_debug
45 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
47 #define webrtc_data_channel_parent_class parent_class
48 G_DEFINE_TYPE_WITH_CODE (WebRTCDataChannel, webrtc_data_channel,
49 GST_TYPE_WEBRTC_DATA_CHANNEL,
50 GST_DEBUG_CATEGORY_INIT (webrtc_data_channel_debug, "webrtcdatachannel", 0,
51 "webrtcdatachannel"););
53 G_LOCK_DEFINE_STATIC (outstanding_channels_lock);
54 static GList *outstanding_channels;
58 DATA_CHANNEL_PPID_WEBRTC_CONTROL = 50,
59 DATA_CHANNEL_PPID_WEBRTC_STRING = 51,
60 DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL = 52, /* deprecated */
61 DATA_CHANNEL_PPID_WEBRTC_BINARY = 53,
62 DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL = 54, /* deprecated */
63 DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY = 56,
64 DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY = 57,
69 CHANNEL_TYPE_RELIABLE = 0x00,
70 CHANNEL_TYPE_RELIABLE_UNORDERED = 0x80,
71 CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT = 0x01,
72 CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT_UNORDERED = 0x81,
73 CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED = 0x02,
74 CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED_UNORDERED = 0x82,
75 } DataChannelReliabilityType;
79 CHANNEL_MESSAGE_ACK = 0x02,
80 CHANNEL_MESSAGE_OPEN = 0x03,
84 priority_type_to_uint (GstWebRTCPriorityType pri)
87 case GST_WEBRTC_PRIORITY_TYPE_VERY_LOW:
89 case GST_WEBRTC_PRIORITY_TYPE_LOW:
91 case GST_WEBRTC_PRIORITY_TYPE_MEDIUM:
93 case GST_WEBRTC_PRIORITY_TYPE_HIGH:
96 g_assert_not_reached ();
100 static GstWebRTCPriorityType
101 priority_uint_to_type (guint16 val)
104 return GST_WEBRTC_PRIORITY_TYPE_VERY_LOW;
106 return GST_WEBRTC_PRIORITY_TYPE_LOW;
108 return GST_WEBRTC_PRIORITY_TYPE_MEDIUM;
109 return GST_WEBRTC_PRIORITY_TYPE_HIGH;
113 construct_open_packet (WebRTCDataChannel * channel)
116 gsize label_len = strlen (channel->parent.label);
117 gsize proto_len = strlen (channel->parent.protocol);
118 gsize size = 12 + label_len + proto_len;
119 DataChannelReliabilityType reliability = 0;
120 guint32 reliability_param = 0;
126 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
127 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
128 * | Message Type | Channel Type | Priority |
129 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
130 * | Reliability Parameter |
131 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
132 * | Label Length | Protocol Length |
133 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
137 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
141 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
144 gst_byte_writer_init_with_size (&w, size, FALSE);
146 if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_OPEN))
147 g_return_val_if_reached (NULL);
149 if (!channel->parent.ordered)
151 if (channel->parent.max_retransmits != -1) {
153 reliability_param = channel->parent.max_retransmits;
155 if (channel->parent.max_packet_lifetime != -1) {
157 reliability_param = channel->parent.max_packet_lifetime;
160 priority = priority_type_to_uint (channel->parent.priority);
162 if (!gst_byte_writer_put_uint8 (&w, (guint8) reliability))
163 g_return_val_if_reached (NULL);
164 if (!gst_byte_writer_put_uint16_be (&w, (guint16) priority))
165 g_return_val_if_reached (NULL);
166 if (!gst_byte_writer_put_uint32_be (&w, (guint32) reliability_param))
167 g_return_val_if_reached (NULL);
168 if (!gst_byte_writer_put_uint16_be (&w, (guint16) label_len))
169 g_return_val_if_reached (NULL);
170 if (!gst_byte_writer_put_uint16_be (&w, (guint16) proto_len))
171 g_return_val_if_reached (NULL);
172 if (!gst_byte_writer_put_data (&w, (guint8 *) channel->parent.label,
174 g_return_val_if_reached (NULL);
175 if (!gst_byte_writer_put_data (&w, (guint8 *) channel->parent.protocol,
177 g_return_val_if_reached (NULL);
179 buf = gst_byte_writer_reset_and_get_buffer (&w);
181 /* send reliable and ordered */
182 gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
183 GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
189 construct_ack_packet (WebRTCDataChannel * channel)
196 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
197 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
202 gst_byte_writer_init_with_size (&w, 1, FALSE);
204 if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_ACK))
205 g_return_val_if_reached (NULL);
207 buf = gst_byte_writer_reset_and_get_buffer (&w);
209 /* send reliable and ordered */
210 gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
211 GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
216 typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
221 GstWebRTCDataChannel *channel;
224 GDestroyNotify notify;
227 static GstStructure *
228 _execute_task (GstWebRTCBin * webrtc, struct task *task)
231 task->func (task->channel, task->user_data);
237 _free_task (struct task *task)
239 gst_object_unref (task->channel);
242 task->notify (task->user_data);
247 _channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func,
248 gpointer user_data, GDestroyNotify notify)
250 struct task *task = g_new0 (struct task, 1);
252 task->channel = gst_object_ref (channel);
254 task->user_data = user_data;
255 task->notify = notify;
257 gst_webrtc_bin_enqueue_task (channel->webrtcbin,
258 (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task,
263 _channel_store_error (WebRTCDataChannel * channel, GError * error)
265 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
267 GST_WARNING_OBJECT (channel, "Error: %s",
268 error ? error->message : "Unknown");
269 if (!channel->stored_error)
270 channel->stored_error = error;
272 g_clear_error (&error);
274 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
278 _emit_on_open (WebRTCDataChannel * channel, gpointer user_data)
280 gst_webrtc_data_channel_on_open (GST_WEBRTC_DATA_CHANNEL (channel));
284 _transport_closed (WebRTCDataChannel * channel)
287 #ifndef TIZEN_FEATURE_WEBRTC_MODIFICATION
288 gboolean both_sides_closed;
291 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
292 error = channel->stored_error;
293 channel->stored_error = NULL;
295 #ifndef TIZEN_FEATURE_WEBRTC_MODIFICATION
297 channel->peer_closed && channel->parent.buffered_amount <= 0;
298 if (both_sides_closed || error) {
299 channel->peer_closed = FALSE;
302 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
305 gst_webrtc_data_channel_on_error (GST_WEBRTC_DATA_CHANNEL (channel), error);
306 g_clear_error (&error);
308 #ifdef TIZEN_FEATURE_WEBRTC_MODIFICATION
309 if (channel->parent.buffered_amount <= 0 || error) {
311 if (both_sides_closed || error) {
313 gst_webrtc_data_channel_on_close (GST_WEBRTC_DATA_CHANNEL (channel));
318 _close_sctp_stream (WebRTCDataChannel * channel, gpointer user_data)
322 GST_INFO_OBJECT (channel, "Closing outgoing SCTP stream %i label \"%s\"",
323 channel->parent.id, channel->parent.label);
325 pad = gst_element_get_static_pad (channel->appsrc, "src");
326 peer = gst_pad_get_peer (pad);
327 gst_object_unref (pad);
330 GstElement *sctpenc = gst_pad_get_parent_element (peer);
333 gst_element_release_request_pad (sctpenc, peer);
334 gst_object_unref (sctpenc);
336 gst_object_unref (peer);
339 _transport_closed (channel);
343 _close_procedure (WebRTCDataChannel * channel, gpointer user_data)
345 /* https://www.w3.org/TR/webrtc/#data-transport-closing-procedure */
346 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
347 if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED) {
348 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
350 } else if (channel->parent.ready_state ==
351 GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING) {
352 _channel_enqueue_task (channel, (ChannelTask) _transport_closed, NULL,
354 } else if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_OPEN) {
355 #ifdef TIZEN_FEATURE_WEBRTC_MODIFICATION
356 channel->parent.prev_ready_state = channel->parent.ready_state;
358 channel->parent.ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING;
359 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
360 g_object_notify (G_OBJECT (channel), "ready-state");
362 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
363 if (channel->parent.buffered_amount <= 0) {
364 _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream,
369 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
373 _on_sctp_stream_reset (WebRTCSCTPTransport * sctp, guint stream_id,
374 WebRTCDataChannel * channel)
376 if (channel->parent.id == stream_id) {
377 GST_INFO_OBJECT (channel,
378 "Received channel close for SCTP stream %i label \"%s\"",
379 channel->parent.id, channel->parent.label);
381 #ifndef TIZEN_FEATURE_WEBRTC_MODIFICATION
382 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
383 channel->peer_closed = TRUE;
384 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
387 _channel_enqueue_task (channel, (ChannelTask) _close_procedure,
388 GUINT_TO_POINTER (stream_id), NULL);
393 webrtc_data_channel_close (GstWebRTCDataChannel * channel)
395 _close_procedure (WEBRTC_DATA_CHANNEL (channel), NULL);
399 _parse_control_packet (WebRTCDataChannel * channel, guint8 * data,
400 gsize size, GError ** error)
408 g_return_val_if_reached (GST_FLOW_ERROR);
410 g_return_val_if_reached (GST_FLOW_ERROR);
412 gst_byte_reader_init (&r, data, size);
414 if (!gst_byte_reader_get_uint8 (&r, &message_type))
415 g_return_val_if_reached (GST_FLOW_ERROR);
417 if (message_type == CHANNEL_MESSAGE_ACK) {
419 GST_INFO_OBJECT (channel, "Received channel ack");
421 } else if (message_type == CHANNEL_MESSAGE_OPEN) {
423 guint32 reliability_param;
424 guint16 priority, label_len, proto_len;
429 GST_INFO_OBJECT (channel, "Received channel open");
431 if (channel->parent.negotiated) {
432 g_set_error (error, GST_WEBRTC_ERROR,
433 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
434 "Data channel was signalled as negotiated already");
435 g_return_val_if_reached (GST_FLOW_ERROR);
441 if (!gst_byte_reader_get_uint8 (&r, &reliability))
443 if (!gst_byte_reader_get_uint16_be (&r, &priority))
445 if (!gst_byte_reader_get_uint32_be (&r, &reliability_param))
447 if (!gst_byte_reader_get_uint16_be (&r, &label_len))
449 if (!gst_byte_reader_get_uint16_be (&r, &proto_len))
452 label = g_new0 (gchar, (gsize) label_len + 1);
453 proto = g_new0 (gchar, (gsize) proto_len + 1);
455 if (!gst_byte_reader_get_data (&r, label_len, &src))
457 memcpy (label, src, label_len);
458 label[label_len] = '\0';
459 if (!gst_byte_reader_get_data (&r, proto_len, &src))
461 memcpy (proto, src, proto_len);
462 proto[proto_len] = '\0';
464 g_free (channel->parent.label);
465 channel->parent.label = label;
466 g_free (channel->parent.protocol);
467 channel->parent.protocol = proto;
468 channel->parent.priority = priority_uint_to_type (priority);
469 channel->parent.ordered = !(reliability & 0x80);
470 if (reliability & 0x01) {
471 channel->parent.max_retransmits = reliability_param;
472 channel->parent.max_packet_lifetime = -1;
473 } else if (reliability & 0x02) {
474 channel->parent.max_retransmits = -1;
475 channel->parent.max_packet_lifetime = reliability_param;
477 channel->parent.max_retransmits = -1;
478 channel->parent.max_packet_lifetime = -1;
480 channel->opened = TRUE;
482 GST_INFO_OBJECT (channel, "Received channel open for SCTP stream %i "
483 "label \"%s\" protocol %s ordered %s", channel->parent.id,
484 channel->parent.label, channel->parent.protocol,
485 channel->parent.ordered ? "true" : "false");
487 _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
489 GST_INFO_OBJECT (channel, "Sending channel ack");
490 buffer = construct_ack_packet (channel);
492 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
493 channel->parent.buffered_amount += gst_buffer_get_size (buffer);
494 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
496 ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
497 if (ret != GST_FLOW_OK) {
498 g_set_error (error, GST_WEBRTC_ERROR,
499 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Could not send ack packet");
505 g_set_error (error, GST_WEBRTC_ERROR,
506 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
507 "Unknown message type in control protocol");
508 return GST_FLOW_ERROR;
515 g_set_error (error, GST_WEBRTC_ERROR,
516 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to parse packet");
517 g_return_val_if_reached (GST_FLOW_ERROR);
522 on_sink_eos (GstAppSink * sink, gpointer user_data)
533 buffer_unmap_and_unref (struct map_info *info)
535 gst_buffer_unmap (info->buffer, &info->map_info);
536 gst_buffer_unref (info->buffer);
541 _emit_have_data (WebRTCDataChannel * channel, GBytes * data)
543 gst_webrtc_data_channel_on_message_data (GST_WEBRTC_DATA_CHANNEL (channel),
548 _emit_have_string (GstWebRTCDataChannel * channel, gchar * str)
550 gst_webrtc_data_channel_on_message_string (GST_WEBRTC_DATA_CHANNEL (channel),
555 _data_channel_have_sample (WebRTCDataChannel * channel, GstSample * sample,
558 GstSctpReceiveMeta *receive;
560 GstFlowReturn ret = GST_FLOW_OK;
562 GST_LOG_OBJECT (channel, "Received sample %" GST_PTR_FORMAT, sample);
564 g_return_val_if_fail (channel->sctp_transport != NULL, GST_FLOW_ERROR);
566 buffer = gst_sample_get_buffer (sample);
568 g_set_error (error, GST_WEBRTC_ERROR,
569 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "No buffer to handle");
570 return GST_FLOW_ERROR;
572 receive = gst_sctp_buffer_get_receive_meta (buffer);
574 g_set_error (error, GST_WEBRTC_ERROR,
575 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
576 "No SCTP Receive meta on the buffer");
577 return GST_FLOW_ERROR;
580 switch (receive->ppid) {
581 case DATA_CHANNEL_PPID_WEBRTC_CONTROL:{
582 GstMapInfo info = GST_MAP_INFO_INIT;
583 if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
584 g_set_error (error, GST_WEBRTC_ERROR,
585 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
586 "Failed to map received buffer");
587 ret = GST_FLOW_ERROR;
589 ret = _parse_control_packet (channel, info.data, info.size, error);
590 gst_buffer_unmap (buffer, &info);
594 case DATA_CHANNEL_PPID_WEBRTC_STRING:
595 case DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL:{
596 GstMapInfo info = GST_MAP_INFO_INIT;
597 if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
598 g_set_error (error, GST_WEBRTC_ERROR,
599 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
600 "Failed to map received buffer");
601 ret = GST_FLOW_ERROR;
603 gchar *str = g_strndup ((gchar *) info.data, info.size);
604 _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, str,
606 gst_buffer_unmap (buffer, &info);
610 case DATA_CHANNEL_PPID_WEBRTC_BINARY:
611 case DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL:{
612 struct map_info *info = g_new0 (struct map_info, 1);
613 if (!gst_buffer_map (buffer, &info->map_info, GST_MAP_READ)) {
614 g_set_error (error, GST_WEBRTC_ERROR,
615 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
616 "Failed to map received buffer");
617 ret = GST_FLOW_ERROR;
619 GBytes *data = g_bytes_new_with_free_func (info->map_info.data,
620 info->map_info.size, (GDestroyNotify) buffer_unmap_and_unref, info);
621 info->buffer = gst_buffer_ref (buffer);
622 _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, data,
623 (GDestroyNotify) g_bytes_unref);
627 case DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY:
628 _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, NULL,
631 case DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY:
632 _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, NULL,
636 g_set_error (error, GST_WEBRTC_ERROR,
637 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
638 "Unknown SCTP PPID %u received", receive->ppid);
639 ret = GST_FLOW_ERROR;
647 on_sink_preroll (GstAppSink * sink, gpointer user_data)
649 WebRTCDataChannel *channel = user_data;
650 GstSample *sample = gst_app_sink_pull_preroll (sink);
654 /* This sample also seems to be provided by the sample callback
655 ret = _data_channel_have_sample (channel, sample); */
657 gst_sample_unref (sample);
658 } else if (gst_app_sink_is_eos (sink)) {
661 ret = GST_FLOW_ERROR;
664 if (ret != GST_FLOW_OK) {
665 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
672 on_sink_sample (GstAppSink * sink, gpointer user_data)
674 WebRTCDataChannel *channel = user_data;
675 GstSample *sample = gst_app_sink_pull_sample (sink);
677 GError *error = NULL;
680 ret = _data_channel_have_sample (channel, sample, &error);
681 gst_sample_unref (sample);
682 } else if (gst_app_sink_is_eos (sink)) {
685 ret = GST_FLOW_ERROR;
689 _channel_store_error (channel, error);
691 if (ret != GST_FLOW_OK) {
692 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
698 static GstAppSinkCallbacks sink_callbacks = {
705 webrtc_data_channel_start_negotiation (WebRTCDataChannel * channel)
709 g_return_if_fail (!channel->parent.negotiated);
710 g_return_if_fail (channel->parent.id != -1);
711 g_return_if_fail (channel->sctp_transport != NULL);
713 buffer = construct_open_packet (channel);
715 GST_INFO_OBJECT (channel, "Sending channel open for SCTP stream %i "
716 "label \"%s\" protocol %s ordered %s", channel->parent.id,
717 channel->parent.label, channel->parent.protocol,
718 channel->parent.ordered ? "true" : "false");
720 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
721 channel->parent.buffered_amount += gst_buffer_get_size (buffer);
722 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
723 g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
725 if (gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc),
726 buffer) == GST_FLOW_OK) {
727 channel->opened = TRUE;
728 #ifdef TIZEN_FEATURE_WEBRTC_MODIFICATION
729 channel->webrtcbin->priv->data_channels_opened++;
731 _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
733 GError *error = NULL;
734 g_set_error (&error, GST_WEBRTC_ERROR,
735 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
736 "Failed to send DCEP open packet");
737 _channel_store_error (channel, error);
738 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
743 _get_sctp_reliability (WebRTCDataChannel * channel,
744 GstSctpSendMetaPartiallyReliability * reliability, guint * rel_param)
746 if (channel->parent.max_retransmits != -1) {
747 *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX;
748 *rel_param = channel->parent.max_retransmits;
749 } else if (channel->parent.max_packet_lifetime != -1) {
750 *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL;
751 *rel_param = channel->parent.max_packet_lifetime;
753 *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE;
759 _is_within_max_message_size (WebRTCDataChannel * channel, gsize size)
761 return size <= channel->sctp_transport->max_message_size;
765 webrtc_data_channel_send_data (GstWebRTCDataChannel * base_channel,
768 WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (base_channel);
769 GstSctpSendMetaPartiallyReliability reliability;
776 buffer = gst_buffer_new ();
777 ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY;
782 data = (guint8 *) g_bytes_get_data (bytes, &size);
783 g_return_if_fail (data != NULL);
784 if (!_is_within_max_message_size (channel, size)) {
785 GError *error = NULL;
786 g_set_error (&error, GST_WEBRTC_ERROR,
787 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
788 "Requested to send data that is too large");
789 _channel_store_error (channel, error);
790 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
795 buffer = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, data, size,
796 0, size, g_bytes_ref (bytes), (GDestroyNotify) g_bytes_unref);
797 ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY;
800 _get_sctp_reliability (channel, &reliability, &rel_param);
801 gst_sctp_buffer_add_send_meta (buffer, ppid, channel->parent.ordered,
802 reliability, rel_param);
804 GST_LOG_OBJECT (channel, "Sending data using buffer %" GST_PTR_FORMAT,
807 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
808 channel->parent.buffered_amount += gst_buffer_get_size (buffer);
809 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
810 g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
812 ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
814 if (ret != GST_FLOW_OK) {
815 GError *error = NULL;
816 g_set_error (&error, GST_WEBRTC_ERROR,
817 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
818 _channel_store_error (channel, error);
819 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
824 webrtc_data_channel_send_string (GstWebRTCDataChannel * base_channel,
827 WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (base_channel);
828 GstSctpSendMetaPartiallyReliability reliability;
834 if (!channel->parent.negotiated)
835 g_return_if_fail (channel->opened);
836 g_return_if_fail (channel->sctp_transport != NULL);
839 buffer = gst_buffer_new ();
840 ppid = DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY;
842 gsize size = strlen (str);
845 if (!_is_within_max_message_size (channel, size)) {
846 GError *error = NULL;
847 g_set_error (&error, GST_WEBRTC_ERROR,
848 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
849 "Requested to send a string that is too large");
850 _channel_store_error (channel, error);
851 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
856 str_copy = g_strdup (str);
858 gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, str_copy,
859 size, 0, size, str_copy, g_free);
860 ppid = DATA_CHANNEL_PPID_WEBRTC_STRING;
863 _get_sctp_reliability (channel, &reliability, &rel_param);
864 gst_sctp_buffer_add_send_meta (buffer, ppid, channel->parent.ordered,
865 reliability, rel_param);
867 GST_TRACE_OBJECT (channel, "Sending string using buffer %" GST_PTR_FORMAT,
870 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
871 channel->parent.buffered_amount += gst_buffer_get_size (buffer);
872 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
873 g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
875 ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
877 if (ret != GST_FLOW_OK) {
878 GError *error = NULL;
879 g_set_error (&error, GST_WEBRTC_ERROR,
880 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send string");
881 _channel_store_error (channel, error);
882 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
887 _on_sctp_notify_state_unlocked (GObject * sctp_transport,
888 WebRTCDataChannel * channel)
890 GstWebRTCSCTPTransportState state;
892 g_object_get (sctp_transport, "state", &state, NULL);
893 if (state == GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED) {
894 if (channel->parent.negotiated)
895 _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
899 static WebRTCDataChannel *
900 ensure_channel_alive (WebRTCDataChannel * channel)
902 /* ghetto impl of, does the channel still exist?.
903 * Needed because g_signal_handler_disconnect*() will not disconnect any
904 * running functions and _finalize() implementation can complete and
905 * invalidate channel */
906 G_LOCK (outstanding_channels_lock);
907 if (g_list_find (outstanding_channels, channel)) {
908 g_object_ref (channel);
910 G_UNLOCK (outstanding_channels_lock);
913 G_UNLOCK (outstanding_channels_lock);
919 _on_sctp_notify_state (GObject * sctp_transport, GParamSpec * pspec,
920 WebRTCDataChannel * channel)
922 if (!(channel = ensure_channel_alive (channel)))
925 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
926 _on_sctp_notify_state_unlocked (sctp_transport, channel);
927 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
929 g_object_unref (channel);
933 _emit_low_threshold (WebRTCDataChannel * channel, gpointer user_data)
935 gst_webrtc_data_channel_on_buffered_amount_low (GST_WEBRTC_DATA_CHANNEL
939 static GstPadProbeReturn
940 on_appsrc_data (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
942 WebRTCDataChannel *channel = user_data;
946 if (GST_PAD_PROBE_INFO_TYPE (info) & (GST_PAD_PROBE_TYPE_BUFFER)) {
947 GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
948 size = gst_buffer_get_size (buffer);
949 } else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
950 GstBufferList *list = GST_PAD_PROBE_INFO_BUFFER_LIST (info);
951 size = gst_buffer_list_calculate_size (list);
955 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
956 prev_amount = channel->parent.buffered_amount;
957 channel->parent.buffered_amount -= size;
958 GST_TRACE_OBJECT (channel, "checking low-threshold: prev %"
959 G_GUINT64_FORMAT " low-threshold %" G_GUINT64_FORMAT " buffered %"
960 G_GUINT64_FORMAT, prev_amount,
961 channel->parent.buffered_amount_low_threshold,
962 channel->parent.buffered_amount);
963 if (prev_amount >= channel->parent.buffered_amount_low_threshold
964 && channel->parent.buffered_amount <
965 channel->parent.buffered_amount_low_threshold) {
966 _channel_enqueue_task (channel, (ChannelTask) _emit_low_threshold, NULL,
970 if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING
971 && channel->parent.buffered_amount <= 0) {
972 _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream, NULL,
975 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
976 g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
979 return GST_PAD_PROBE_OK;
983 gst_webrtc_data_channel_constructed (GObject * object)
985 WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (object);
989 caps = gst_caps_new_any ();
991 channel->appsrc = gst_element_factory_make ("appsrc", NULL);
992 gst_object_ref_sink (channel->appsrc);
993 pad = gst_element_get_static_pad (channel->appsrc, "src");
995 channel->src_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_BOTH,
996 (GstPadProbeCallback) on_appsrc_data, channel, NULL);
998 channel->appsink = gst_element_factory_make ("appsink", NULL);
999 gst_object_ref_sink (channel->appsink);
1000 g_object_set (channel->appsink, "sync", FALSE, "async", FALSE, "caps", caps,
1002 gst_app_sink_set_callbacks (GST_APP_SINK (channel->appsink), &sink_callbacks,
1005 gst_object_unref (pad);
1006 gst_caps_unref (caps);
1010 gst_webrtc_data_channel_dispose (GObject * object)
1012 G_LOCK (outstanding_channels_lock);
1013 outstanding_channels = g_list_remove (outstanding_channels, object);
1014 G_UNLOCK (outstanding_channels_lock);
1016 G_OBJECT_CLASS (parent_class)->dispose (object);
1020 gst_webrtc_data_channel_finalize (GObject * object)
1022 WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (object);
1024 if (channel->src_probe) {
1025 GstPad *pad = gst_element_get_static_pad (channel->appsrc, "src");
1026 gst_pad_remove_probe (pad, channel->src_probe);
1027 gst_object_unref (pad);
1028 channel->src_probe = 0;
1031 if (channel->sctp_transport)
1032 g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
1033 g_clear_object (&channel->sctp_transport);
1035 g_clear_object (&channel->appsrc);
1036 g_clear_object (&channel->appsink);
1038 G_OBJECT_CLASS (parent_class)->finalize (object);
1042 webrtc_data_channel_class_init (WebRTCDataChannelClass * klass)
1044 GObjectClass *gobject_class = (GObjectClass *) klass;
1045 GstWebRTCDataChannelClass *channel_class =
1046 (GstWebRTCDataChannelClass *) klass;
1048 gobject_class->constructed = gst_webrtc_data_channel_constructed;
1049 gobject_class->dispose = gst_webrtc_data_channel_dispose;
1050 gobject_class->finalize = gst_webrtc_data_channel_finalize;
1052 channel_class->send_data = webrtc_data_channel_send_data;
1053 channel_class->send_string = webrtc_data_channel_send_string;
1054 channel_class->close = webrtc_data_channel_close;
1058 webrtc_data_channel_init (WebRTCDataChannel * channel)
1060 G_LOCK (outstanding_channels_lock);
1061 outstanding_channels = g_list_prepend (outstanding_channels, channel);
1062 G_UNLOCK (outstanding_channels_lock);
1066 _data_channel_set_sctp_transport (WebRTCDataChannel * channel,
1067 WebRTCSCTPTransport * sctp)
1069 g_return_if_fail (GST_IS_WEBRTC_DATA_CHANNEL (channel));
1070 g_return_if_fail (GST_IS_WEBRTC_SCTP_TRANSPORT (sctp));
1072 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
1073 if (channel->sctp_transport)
1074 g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
1075 GST_TRACE ("%p set sctp %p", channel, sctp);
1077 gst_object_replace ((GstObject **) & channel->sctp_transport,
1081 g_signal_connect (sctp, "stream-reset", G_CALLBACK (_on_sctp_stream_reset),
1083 g_signal_connect (sctp, "notify::state", G_CALLBACK (_on_sctp_notify_state),
1086 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
1090 webrtc_data_channel_link_to_sctp (WebRTCDataChannel * channel,
1091 WebRTCSCTPTransport * sctp_transport)
1093 if (sctp_transport && !channel->sctp_transport) {
1096 g_object_get (channel, "id", &id, NULL);
1098 if (sctp_transport->association_established && id != -1) {
1101 _data_channel_set_sctp_transport (channel, sctp_transport);
1102 pad_name = g_strdup_printf ("sink_%u", id);
1103 if (!gst_element_link_pads (channel->appsrc, "src",
1104 channel->sctp_transport->sctpenc, pad_name))
1105 g_warn_if_reached ();
1108 _on_sctp_notify_state_unlocked (G_OBJECT (sctp_transport), channel);