2 * Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:gstwebrtc-datachannel
22 * @short_description: RTCDataChannel object
23 * @title: GstWebRTCDataChannel
24 * @see_also: #GstWebRTCRTPTransceiver
26 * <https://w3c.github.io/webrtc-pc/#dom-rtcdatachannel>
33 #include "webrtcdatachannel.h"
34 #include <gst/app/gstappsink.h>
35 #include <gst/app/gstappsrc.h>
36 #include <gst/base/gstbytereader.h>
37 #include <gst/base/gstbytewriter.h>
38 #include <gst/sctp/sctpreceivemeta.h>
39 #include <gst/sctp/sctpsendmeta.h>
41 #include "gstwebrtcbin.h"
/* Default debug category for the GST_*_OBJECT logging calls in this file. */
44 #define GST_CAT_DEFAULT webrtc_data_channel_debug
45 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
/* Registers the WebRTCDataChannel type with GST_TYPE_WEBRTC_DATA_CHANNEL as
 * its parent type; the trailing code argument initialises the
 * "webrtcdatachannel" debug category as part of type registration. */
47 #define webrtc_data_channel_parent_class parent_class
48 G_DEFINE_TYPE_WITH_CODE (WebRTCDataChannel, webrtc_data_channel,
49 GST_TYPE_WEBRTC_DATA_CHANNEL,
50 GST_DEBUG_CATEGORY_INIT (webrtc_data_channel_debug, "webrtcdatachannel", 0,
51 "webrtcdatachannel"););
/* SCTP payload protocol identifiers (PPIDs).  They are attached to outgoing
 * buffers through the SCTP send meta and _data_channel_have_sample()
 * dispatches on them to tell control, string and binary messages apart. */
55 DATA_CHANNEL_PPID_WEBRTC_CONTROL = 50,
56 DATA_CHANNEL_PPID_WEBRTC_STRING = 51,
57 DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL = 52, /* deprecated */
58 DATA_CHANNEL_PPID_WEBRTC_BINARY = 53,
59 DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL = 54, /* deprecated */
/* The *_EMPTY identifiers are used for messages that carry no payload. */
60 DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY = 56,
61 DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY = 57,
/* Values of the "Channel Type" byte of an OPEN message.  Bit 0x80 selects
 * unordered delivery; 0x01 and 0x02 select whether the reliability parameter
 * limits the number of retransmissions or the packet lifetime. */
66 CHANNEL_TYPE_RELIABLE = 0x00,
67 CHANNEL_TYPE_RELIABLE_UNORDERED = 0x80,
68 CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT = 0x01,
69 CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT_UNORDERED = 0x81,
70 CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED = 0x02,
71 CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED_UNORDERED = 0x82,
72 } DataChannelReliabilityType;
/* Message types of the control protocol; this is the first byte of every
 * packet sent or received with DATA_CHANNEL_PPID_WEBRTC_CONTROL. */
76 CHANNEL_MESSAGE_ACK = 0x02,
77 CHANNEL_MESSAGE_OPEN = 0x03,
/* Converts a GstWebRTCPriorityType into the 16-bit priority value that
 * construct_open_packet() writes into the OPEN message.
 * NOTE(review): the value returned for each case is not visible in this
 * excerpt; any enum value outside the four cases hits the assertion. */
81 priority_type_to_uint (GstWebRTCPriorityType pri)
84 case GST_WEBRTC_PRIORITY_TYPE_VERY_LOW:
86 case GST_WEBRTC_PRIORITY_TYPE_LOW:
88 case GST_WEBRTC_PRIORITY_TYPE_MEDIUM:
90 case GST_WEBRTC_PRIORITY_TYPE_HIGH:
93 g_assert_not_reached ();
/* Converts the 16-bit priority read from a received OPEN message into a
 * GstWebRTCPriorityType.  GST_WEBRTC_PRIORITY_TYPE_HIGH is the fall-through
 * result.
 * NOTE(review): the thresholds guarding the first three returns are not
 * visible in this excerpt. */
97 static GstWebRTCPriorityType
98 priority_uint_to_type (guint16 val)
101 return GST_WEBRTC_PRIORITY_TYPE_VERY_LOW;
103 return GST_WEBRTC_PRIORITY_TYPE_LOW;
105 return GST_WEBRTC_PRIORITY_TYPE_MEDIUM;
106 return GST_WEBRTC_PRIORITY_TYPE_HIGH;
110 construct_open_packet (WebRTCDataChannel * channel)
113 gsize label_len = strlen (channel->parent.label);
114 gsize proto_len = strlen (channel->parent.protocol);
115 gsize size = 12 + label_len + proto_len;
116 DataChannelReliabilityType reliability = 0;
117 guint32 reliability_param = 0;
123 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
124 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
125 * | Message Type | Channel Type | Priority |
126 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
127 * | Reliability Parameter |
128 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
129 * | Label Length | Protocol Length |
130 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
134 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
138 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
141 gst_byte_writer_init_with_size (&w, size, FALSE);
143 if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_OPEN))
144 g_return_val_if_reached (NULL);
146 if (!channel->parent.ordered)
148 if (channel->parent.max_retransmits != -1) {
150 reliability_param = channel->parent.max_retransmits;
152 if (channel->parent.max_packet_lifetime != -1) {
154 reliability_param = channel->parent.max_packet_lifetime;
157 priority = priority_type_to_uint (channel->parent.priority);
159 if (!gst_byte_writer_put_uint8 (&w, (guint8) reliability))
160 g_return_val_if_reached (NULL);
161 if (!gst_byte_writer_put_uint16_be (&w, (guint16) priority))
162 g_return_val_if_reached (NULL);
163 if (!gst_byte_writer_put_uint32_be (&w, (guint32) reliability_param))
164 g_return_val_if_reached (NULL);
165 if (!gst_byte_writer_put_uint16_be (&w, (guint16) label_len))
166 g_return_val_if_reached (NULL);
167 if (!gst_byte_writer_put_uint16_be (&w, (guint16) proto_len))
168 g_return_val_if_reached (NULL);
169 if (!gst_byte_writer_put_data (&w, (guint8 *) channel->parent.label,
171 g_return_val_if_reached (NULL);
172 if (!gst_byte_writer_put_data (&w, (guint8 *) channel->parent.protocol,
174 g_return_val_if_reached (NULL);
176 buf = gst_byte_writer_reset_and_get_buffer (&w);
178 /* send reliable and ordered */
179 gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
180 GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
186 construct_ack_packet (WebRTCDataChannel * channel)
193 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
194 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
199 gst_byte_writer_init_with_size (&w, 1, FALSE);
201 if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_ACK))
202 g_return_val_if_reached (NULL);
204 buf = gst_byte_writer_reset_and_get_buffer (&w);
206 /* send reliable and ordered */
207 gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
208 GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
/* Signature of a unit of work queued through _channel_enqueue_task(). */
213 typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
/* Bookkeeping for one queued task.  The channel reference is taken in
 * _channel_enqueue_task() and dropped again in _free_task(). */
218 GstWebRTCDataChannel *channel;
/* Called on the task's user_data by _free_task(). */
221 GDestroyNotify notify;
224 static GstStructure *
225 _execute_task (GstWebRTCBin * webrtc, struct task *task)
228 task->func (task->channel, task->user_data);
234 _free_task (struct task *task)
236 gst_object_unref (task->channel);
239 task->notify (task->user_data);
244 _channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func,
245 gpointer user_data, GDestroyNotify notify)
247 struct task *task = g_new0 (struct task, 1);
249 task->channel = gst_object_ref (channel);
251 task->user_data = user_data;
252 task->notify = notify;
254 gst_webrtc_bin_enqueue_task (channel->webrtcbin,
255 (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task,
260 _channel_store_error (WebRTCDataChannel * channel, GError * error)
262 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
264 GST_WARNING_OBJECT (channel, "Error: %s",
265 error ? error->message : "Unknown");
266 if (!channel->stored_error)
267 channel->stored_error = error;
269 g_clear_error (&error);
271 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
275 _emit_on_open (WebRTCDataChannel * channel, gpointer user_data)
277 gst_webrtc_data_channel_on_open (GST_WEBRTC_DATA_CHANNEL (channel));
281 _transport_closed (WebRTCDataChannel * channel)
285 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
286 error = channel->stored_error;
287 channel->stored_error = NULL;
288 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
291 gst_webrtc_data_channel_on_error (GST_WEBRTC_DATA_CHANNEL (channel), error);
292 g_clear_error (&error);
294 gst_webrtc_data_channel_on_close (GST_WEBRTC_DATA_CHANNEL (channel));
298 _close_sctp_stream (WebRTCDataChannel * channel, gpointer user_data)
302 pad = gst_element_get_static_pad (channel->appsrc, "src");
303 peer = gst_pad_get_peer (pad);
304 gst_object_unref (pad);
307 GstElement *sctpenc = gst_pad_get_parent_element (peer);
310 gst_element_release_request_pad (sctpenc, peer);
311 gst_object_unref (sctpenc);
313 gst_object_unref (peer);
316 _transport_closed (channel);
320 _close_procedure (WebRTCDataChannel * channel, gpointer user_data)
322 /* https://www.w3.org/TR/webrtc/#data-transport-closing-procedure */
323 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
324 if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED
325 || channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING) {
326 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
329 channel->parent.ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING;
330 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
331 g_object_notify (G_OBJECT (channel), "ready-state");
333 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
334 if (channel->parent.buffered_amount <= 0) {
335 _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream,
339 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
343 _on_sctp_reset_stream (GstWebRTCSCTPTransport * sctp, guint stream_id,
344 WebRTCDataChannel * channel)
346 if (channel->parent.id == stream_id)
347 _channel_enqueue_task (channel, (ChannelTask) _transport_closed,
348 GUINT_TO_POINTER (stream_id), NULL);
352 webrtc_data_channel_close (GstWebRTCDataChannel * channel)
354 _close_procedure (WEBRTC_DATA_CHANNEL (channel), NULL);
/*
 * Handles one control-protocol message received with the control PPID.
 *
 * channel: channel the message arrived on
 * data, size: mapped payload of the received buffer
 * error: filled in when the message is rejected
 *
 * An ACK is only logged.  An OPEN is rejected on a channel that was
 * negotiated out of band; otherwise its fields replace the channel's label,
 * protocol, priority, ordering and reliability settings, the channel is
 * marked opened, an on-open emission is queued and an ACK is pushed into the
 * appsrc.  Any other message type is reported as an error.
 */
358 _parse_control_packet (WebRTCDataChannel * channel, guint8 * data,
359 gsize size, GError ** error)
367 g_return_val_if_reached (GST_FLOW_ERROR);
369 g_return_val_if_reached (GST_FLOW_ERROR);
371 gst_byte_reader_init (&r, data, size);
/* The first byte selects the message type. */
373 if (!gst_byte_reader_get_uint8 (&r, &message_type))
374 g_return_val_if_reached (GST_FLOW_ERROR);
376 if (message_type == CHANNEL_MESSAGE_ACK) {
378 GST_INFO_OBJECT (channel, "Received channel ack");
380 } else if (message_type == CHANNEL_MESSAGE_OPEN) {
382 guint32 reliability_param;
383 guint16 priority, label_len, proto_len;
388 GST_INFO_OBJECT (channel, "Received channel open");
/* An OPEN message is not expected on a channel negotiated out of band. */
390 if (channel->parent.negotiated) {
391 g_set_error (error, GST_WEBRTC_BIN_ERROR,
392 GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
393 "Data channel was signalled as negotiated already");
394 g_return_val_if_reached (GST_FLOW_ERROR);
/* Fixed-size part of the OPEN message, read in wire order (big-endian). */
400 if (!gst_byte_reader_get_uint8 (&r, &reliability))
402 if (!gst_byte_reader_get_uint16_be (&r, &priority))
404 if (!gst_byte_reader_get_uint32_be (&r, &reliability_param))
406 if (!gst_byte_reader_get_uint16_be (&r, &label_len))
408 if (!gst_byte_reader_get_uint16_be (&r, &proto_len))
/* One extra byte each so the copies can be NUL terminated. */
411 label = g_new0 (gchar, (gsize) label_len + 1);
412 proto = g_new0 (gchar, (gsize) proto_len + 1);
414 if (!gst_byte_reader_get_data (&r, label_len, &src))
416 memcpy (label, src, label_len);
417 label[label_len] = '\0';
418 if (!gst_byte_reader_get_data (&r, proto_len, &src))
420 memcpy (proto, src, proto_len);
421 proto[proto_len] = '\0';
/* From here on the channel owns the two copies. */
423 g_free (channel->parent.label);
424 channel->parent.label = label;
425 g_free (channel->parent.protocol);
426 channel->parent.protocol = proto;
427 channel->parent.priority = priority_uint_to_type (priority);
/* Bit 0x80 of the channel type marks unordered delivery; bits 0x01 and 0x02
 * select which limit the reliability parameter expresses. */
428 channel->parent.ordered = !(reliability & 0x80);
429 if (reliability & 0x01) {
430 channel->parent.max_retransmits = reliability_param;
431 channel->parent.max_packet_lifetime = -1;
432 } else if (reliability & 0x02) {
433 channel->parent.max_retransmits = -1;
434 channel->parent.max_packet_lifetime = reliability_param;
436 channel->parent.max_retransmits = -1;
437 channel->parent.max_packet_lifetime = -1;
439 channel->opened = TRUE;
441 GST_INFO_OBJECT (channel, "Received channel open for SCTP stream %i "
442 "label %s protocol %s ordered %s", channel->parent.id,
443 channel->parent.label, channel->parent.protocol,
444 channel->parent.ordered ? "true" : "false");
446 _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
448 GST_INFO_OBJECT (channel, "Sending channel ack");
449 buffer = construct_ack_packet (channel);
/* The ack counts towards buffered_amount until on_appsrc_data() sees it
 * leave the appsrc. */
451 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
452 channel->parent.buffered_amount += gst_buffer_get_size (buffer);
453 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
455 ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
456 if (ret != GST_FLOW_OK) {
457 g_set_error (error, GST_WEBRTC_BIN_ERROR,
458 GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
459 "Could not send ack packet");
/* Neither ACK nor OPEN. */
465 g_set_error (error, GST_WEBRTC_BIN_ERROR,
466 GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
467 "Unknown message type in control protocol");
468 return GST_FLOW_ERROR;
/* Error exit reporting a packet that could not be parsed.
 * NOTE(review): the jumps leading here are not visible in this excerpt. */
475 g_set_error (error, GST_WEBRTC_BIN_ERROR,
476 GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to parse packet");
477 g_return_val_if_reached (GST_FLOW_ERROR);
/* EOS callback for the appsink.
 * NOTE(review): the body is not visible in this excerpt; presumably this is
 * installed through sink_callbacks - confirm. */
482 on_sink_eos (GstAppSink * sink, gpointer user_data)
493 buffer_unmap_and_unref (struct map_info *info)
495 gst_buffer_unmap (info->buffer, &info->map_info);
496 gst_buffer_unref (info->buffer);
501 _emit_have_data (WebRTCDataChannel * channel, GBytes * data)
503 gst_webrtc_data_channel_on_message_data (GST_WEBRTC_DATA_CHANNEL (channel),
508 _emit_have_string (GstWebRTCDataChannel * channel, gchar * str)
510 gst_webrtc_data_channel_on_message_string (GST_WEBRTC_DATA_CHANNEL (channel),
515 _data_channel_have_sample (WebRTCDataChannel * channel, GstSample * sample,
518 GstSctpReceiveMeta *receive;
520 GstFlowReturn ret = GST_FLOW_OK;
522 GST_LOG_OBJECT (channel, "Received sample %" GST_PTR_FORMAT, sample);
524 g_return_val_if_fail (channel->sctp_transport != NULL, GST_FLOW_ERROR);
526 buffer = gst_sample_get_buffer (sample);
528 g_set_error (error, GST_WEBRTC_BIN_ERROR,
529 GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "No buffer to handle");
530 return GST_FLOW_ERROR;
532 receive = gst_sctp_buffer_get_receive_meta (buffer);
534 g_set_error (error, GST_WEBRTC_BIN_ERROR,
535 GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
536 "No SCTP Receive meta on the buffer");
537 return GST_FLOW_ERROR;
540 switch (receive->ppid) {
541 case DATA_CHANNEL_PPID_WEBRTC_CONTROL:{
542 GstMapInfo info = GST_MAP_INFO_INIT;
543 if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
544 g_set_error (error, GST_WEBRTC_BIN_ERROR,
545 GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
546 "Failed to map received buffer");
547 ret = GST_FLOW_ERROR;
549 ret = _parse_control_packet (channel, info.data, info.size, error);
550 gst_buffer_unmap (buffer, &info);
554 case DATA_CHANNEL_PPID_WEBRTC_STRING:
555 case DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL:{
556 GstMapInfo info = GST_MAP_INFO_INIT;
557 if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
558 g_set_error (error, GST_WEBRTC_BIN_ERROR,
559 GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
560 "Failed to map received buffer");
561 ret = GST_FLOW_ERROR;
563 gchar *str = g_strndup ((gchar *) info.data, info.size);
564 _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, str,
566 gst_buffer_unmap (buffer, &info);
570 case DATA_CHANNEL_PPID_WEBRTC_BINARY:
571 case DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL:{
572 struct map_info *info = g_new0 (struct map_info, 1);
573 if (!gst_buffer_map (buffer, &info->map_info, GST_MAP_READ)) {
574 g_set_error (error, GST_WEBRTC_BIN_ERROR,
575 GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
576 "Failed to map received buffer");
577 ret = GST_FLOW_ERROR;
579 GBytes *data = g_bytes_new_with_free_func (info->map_info.data,
580 info->map_info.size, (GDestroyNotify) buffer_unmap_and_unref, info);
581 info->buffer = gst_buffer_ref (buffer);
582 _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, data,
583 (GDestroyNotify) g_bytes_unref);
587 case DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY:
588 _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, NULL,
591 case DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY:
592 _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, NULL,
596 g_set_error (error, GST_WEBRTC_BIN_ERROR,
597 GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
598 "Unknown SCTP PPID %u received", receive->ppid);
599 ret = GST_FLOW_ERROR;
607 on_sink_preroll (GstAppSink * sink, gpointer user_data)
609 WebRTCDataChannel *channel = user_data;
610 GstSample *sample = gst_app_sink_pull_preroll (sink);
614 /* This sample also seems to be provided by the sample callback
615 ret = _data_channel_have_sample (channel, sample); */
617 gst_sample_unref (sample);
618 } else if (gst_app_sink_is_eos (sink)) {
621 ret = GST_FLOW_ERROR;
624 if (ret != GST_FLOW_OK) {
625 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
632 on_sink_sample (GstAppSink * sink, gpointer user_data)
634 WebRTCDataChannel *channel = user_data;
635 GstSample *sample = gst_app_sink_pull_sample (sink);
637 GError *error = NULL;
640 ret = _data_channel_have_sample (channel, sample, &error);
641 gst_sample_unref (sample);
642 } else if (gst_app_sink_is_eos (sink)) {
645 ret = GST_FLOW_ERROR;
649 _channel_store_error (channel, error);
651 if (ret != GST_FLOW_OK) {
652 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
/* Callback table installed on the appsink in
 * gst_webrtc_data_channel_constructed().
 * NOTE(review): the initialiser entries are not visible in this excerpt. */
658 static GstAppSinkCallbacks sink_callbacks = {
665 webrtc_data_channel_start_negotiation (WebRTCDataChannel * channel)
669 g_return_if_fail (!channel->parent.negotiated);
670 g_return_if_fail (channel->parent.id != -1);
671 g_return_if_fail (channel->sctp_transport != NULL);
673 buffer = construct_open_packet (channel);
675 GST_INFO_OBJECT (channel, "Sending channel open for SCTP stream %i "
676 "label %s protocol %s ordered %s", channel->parent.id,
677 channel->parent.label, channel->parent.protocol,
678 channel->parent.ordered ? "true" : "false");
680 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
681 channel->parent.buffered_amount += gst_buffer_get_size (buffer);
682 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
684 if (gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc),
685 buffer) == GST_FLOW_OK) {
686 channel->opened = TRUE;
687 _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
689 GError *error = NULL;
690 g_set_error (&error, GST_WEBRTC_BIN_ERROR,
691 GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
692 "Failed to send DCEP open packet");
693 _channel_store_error (channel, error);
694 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
699 _get_sctp_reliability (WebRTCDataChannel * channel,
700 GstSctpSendMetaPartiallyReliability * reliability, guint * rel_param)
702 if (channel->parent.max_retransmits != -1) {
703 *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX;
704 *rel_param = channel->parent.max_retransmits;
705 } else if (channel->parent.max_packet_lifetime != -1) {
706 *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL;
707 *rel_param = channel->parent.max_packet_lifetime;
709 *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE;
715 _is_within_max_message_size (WebRTCDataChannel * channel, gsize size)
717 return size <= channel->sctp_transport->max_message_size;
721 webrtc_data_channel_send_data (GstWebRTCDataChannel * base_channel,
724 WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (base_channel);
725 GstSctpSendMetaPartiallyReliability reliability;
732 buffer = gst_buffer_new ();
733 ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY;
738 data = (guint8 *) g_bytes_get_data (bytes, &size);
739 g_return_if_fail (data != NULL);
740 if (!_is_within_max_message_size (channel, size)) {
741 GError *error = NULL;
742 g_set_error (&error, GST_WEBRTC_BIN_ERROR,
743 GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
744 "Requested to send data that is too large");
745 _channel_store_error (channel, error);
746 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
751 buffer = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, data, size,
752 0, size, g_bytes_ref (bytes), (GDestroyNotify) g_bytes_unref);
753 ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY;
756 _get_sctp_reliability (channel, &reliability, &rel_param);
757 gst_sctp_buffer_add_send_meta (buffer, ppid, channel->parent.ordered,
758 reliability, rel_param);
760 GST_LOG_OBJECT (channel, "Sending data using buffer %" GST_PTR_FORMAT,
763 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
764 channel->parent.buffered_amount += gst_buffer_get_size (buffer);
765 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
767 ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
769 if (ret != GST_FLOW_OK) {
770 GError *error = NULL;
771 g_set_error (&error, GST_WEBRTC_BIN_ERROR,
772 GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
773 _channel_store_error (channel, error);
774 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
779 webrtc_data_channel_send_string (GstWebRTCDataChannel * base_channel,
782 WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (base_channel);
783 GstSctpSendMetaPartiallyReliability reliability;
789 if (!channel->parent.negotiated)
790 g_return_if_fail (channel->opened);
791 g_return_if_fail (channel->sctp_transport != NULL);
794 buffer = gst_buffer_new ();
795 ppid = DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY;
797 gsize size = strlen (str);
798 gchar *str_copy = g_strdup (str);
800 if (!_is_within_max_message_size (channel, size)) {
801 GError *error = NULL;
802 g_set_error (&error, GST_WEBRTC_BIN_ERROR,
803 GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
804 "Requested to send a string that is too large");
805 _channel_store_error (channel, error);
806 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
812 gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, str_copy,
813 size, 0, size, str_copy, g_free);
814 ppid = DATA_CHANNEL_PPID_WEBRTC_STRING;
817 _get_sctp_reliability (channel, &reliability, &rel_param);
818 gst_sctp_buffer_add_send_meta (buffer, ppid, channel->parent.ordered,
819 reliability, rel_param);
821 GST_TRACE_OBJECT (channel, "Sending string using buffer %" GST_PTR_FORMAT,
824 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
825 channel->parent.buffered_amount += gst_buffer_get_size (buffer);
826 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
828 ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
830 if (ret != GST_FLOW_OK) {
831 GError *error = NULL;
832 g_set_error (&error, GST_WEBRTC_BIN_ERROR,
833 GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to send string");
834 _channel_store_error (channel, error);
835 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
840 _on_sctp_notify_state_unlocked (GObject * sctp_transport,
841 WebRTCDataChannel * channel)
843 GstWebRTCSCTPTransportState state;
845 g_object_get (sctp_transport, "state", &state, NULL);
846 if (state == GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED) {
847 if (channel->parent.negotiated)
848 _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
853 _on_sctp_notify_state (GObject * sctp_transport, GParamSpec * pspec,
854 WebRTCDataChannel * channel)
856 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
857 _on_sctp_notify_state_unlocked (sctp_transport, channel);
858 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
862 _emit_low_threshold (WebRTCDataChannel * channel, gpointer user_data)
864 gst_webrtc_data_channel_on_buffered_amount_low (GST_WEBRTC_DATA_CHANNEL
868 static GstPadProbeReturn
869 on_appsrc_data (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
871 WebRTCDataChannel *channel = user_data;
875 if (GST_PAD_PROBE_INFO_TYPE (info) & (GST_PAD_PROBE_TYPE_BUFFER)) {
876 GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
877 size = gst_buffer_get_size (buffer);
878 } else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
879 GstBufferList *list = GST_PAD_PROBE_INFO_BUFFER_LIST (info);
880 size = gst_buffer_list_calculate_size (list);
884 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
885 prev_amount = channel->parent.buffered_amount;
886 channel->parent.buffered_amount -= size;
887 GST_TRACE_OBJECT (channel, "checking low-threshold: prev %"
888 G_GUINT64_FORMAT " low-threshold %" G_GUINT64_FORMAT " buffered %"
889 G_GUINT64_FORMAT, prev_amount,
890 channel->parent.buffered_amount_low_threshold,
891 channel->parent.buffered_amount);
892 if (prev_amount >= channel->parent.buffered_amount_low_threshold
893 && channel->parent.buffered_amount <
894 channel->parent.buffered_amount_low_threshold) {
895 _channel_enqueue_task (channel, (ChannelTask) _emit_low_threshold, NULL,
899 if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING
900 && channel->parent.buffered_amount <= 0) {
901 _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream, NULL,
904 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
907 return GST_PAD_PROBE_OK;
911 gst_webrtc_data_channel_constructed (GObject * object)
913 WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (object);
917 caps = gst_caps_new_any ();
919 channel->appsrc = gst_element_factory_make ("appsrc", NULL);
920 gst_object_ref_sink (channel->appsrc);
921 pad = gst_element_get_static_pad (channel->appsrc, "src");
923 channel->src_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_BOTH,
924 (GstPadProbeCallback) on_appsrc_data, channel, NULL);
926 channel->appsink = gst_element_factory_make ("appsink", NULL);
927 gst_object_ref_sink (channel->appsink);
928 g_object_set (channel->appsink, "sync", FALSE, "async", FALSE, "caps", caps,
930 gst_app_sink_set_callbacks (GST_APP_SINK (channel->appsink), &sink_callbacks,
933 gst_object_unref (pad);
934 gst_caps_unref (caps);
938 gst_webrtc_data_channel_finalize (GObject * object)
940 WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (object);
942 if (channel->src_probe) {
943 GstPad *pad = gst_element_get_static_pad (channel->appsrc, "src");
944 gst_pad_remove_probe (pad, channel->src_probe);
945 gst_object_unref (pad);
946 channel->src_probe = 0;
949 if (channel->sctp_transport)
950 g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
951 g_clear_object (&channel->sctp_transport);
953 g_clear_object (&channel->appsrc);
954 g_clear_object (&channel->appsink);
956 G_OBJECT_CLASS (parent_class)->finalize (object);
960 webrtc_data_channel_class_init (WebRTCDataChannelClass * klass)
962 GObjectClass *gobject_class = (GObjectClass *) klass;
963 GstWebRTCDataChannelClass *channel_class =
964 (GstWebRTCDataChannelClass *) klass;
966 gobject_class->constructed = gst_webrtc_data_channel_constructed;
967 gobject_class->finalize = gst_webrtc_data_channel_finalize;
969 channel_class->send_data = webrtc_data_channel_send_data;
970 channel_class->send_string = webrtc_data_channel_send_string;
971 channel_class->close = webrtc_data_channel_close;
/* Instance initialiser referenced by G_DEFINE_TYPE_WITH_CODE.
 * NOTE(review): the body is not visible in this excerpt; the elements are
 * created in gst_webrtc_data_channel_constructed(). */
975 webrtc_data_channel_init (WebRTCDataChannel * channel)
980 _data_channel_set_sctp_transport (WebRTCDataChannel * channel,
981 GstWebRTCSCTPTransport * sctp)
983 g_return_if_fail (GST_IS_WEBRTC_DATA_CHANNEL (channel));
984 g_return_if_fail (GST_IS_WEBRTC_SCTP_TRANSPORT (sctp));
986 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
987 if (channel->sctp_transport)
988 g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
990 gst_object_replace ((GstObject **) & channel->sctp_transport,
994 g_signal_connect (sctp, "stream-reset", G_CALLBACK (_on_sctp_reset_stream),
996 g_signal_connect (sctp, "notify::state", G_CALLBACK (_on_sctp_notify_state),
998 _on_sctp_notify_state_unlocked (G_OBJECT (sctp), channel);
1000 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
1004 webrtc_data_channel_link_to_sctp (WebRTCDataChannel * channel,
1005 GstWebRTCSCTPTransport * sctp_transport)
1007 if (sctp_transport && !channel->sctp_transport) {
1010 g_object_get (channel, "id", &id, NULL);
1012 if (sctp_transport->association_established && id != -1) {
1015 _data_channel_set_sctp_transport (channel, sctp_transport);
1016 pad_name = g_strdup_printf ("sink_%u", id);
1017 if (!gst_element_link_pads (channel->appsrc, "src",
1018 channel->sctp_transport->sctpenc, pad_name))
1019 g_warn_if_reached ();