GST_DEBUG_CATEGORY_INIT (webrtc_data_channel_debug, "webrtcdatachannel", 0,
"webrtcdatachannel"););
+G_LOCK_DEFINE_STATIC (outstanding_channels_lock);
+static GList *outstanding_channels;
+
typedef enum
{
DATA_CHANNEL_PPID_WEBRTC_CONTROL = 50,
GST_INFO_OBJECT (channel, "Received channel open");
if (channel->parent.negotiated) {
- g_set_error (error, GST_WEBRTC_BIN_ERROR,
- GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ g_set_error (error, GST_WEBRTC_ERROR,
+ GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
"Data channel was signalled as negotiated already");
g_return_val_if_reached (GST_FLOW_ERROR);
}
ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
if (ret != GST_FLOW_OK) {
- g_set_error (error, GST_WEBRTC_BIN_ERROR,
- GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
- "Could not send ack packet");
+ g_set_error (error, GST_WEBRTC_ERROR,
+ GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Could not send ack packet");
return ret;
}
return ret;
} else {
- g_set_error (error, GST_WEBRTC_BIN_ERROR,
- GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ g_set_error (error, GST_WEBRTC_ERROR,
+ GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
"Unknown message type in control protocol");
return GST_FLOW_ERROR;
}
{
g_free (label);
g_free (proto);
- g_set_error (error, GST_WEBRTC_BIN_ERROR,
- GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to parse packet");
+ g_set_error (error, GST_WEBRTC_ERROR,
+ GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to parse packet");
g_return_val_if_reached (GST_FLOW_ERROR);
}
}
buffer = gst_sample_get_buffer (sample);
if (!buffer) {
- g_set_error (error, GST_WEBRTC_BIN_ERROR,
- GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "No buffer to handle");
+ g_set_error (error, GST_WEBRTC_ERROR,
+ GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "No buffer to handle");
return GST_FLOW_ERROR;
}
receive = gst_sctp_buffer_get_receive_meta (buffer);
if (!receive) {
- g_set_error (error, GST_WEBRTC_BIN_ERROR,
- GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ g_set_error (error, GST_WEBRTC_ERROR,
+ GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
"No SCTP Receive meta on the buffer");
return GST_FLOW_ERROR;
}
case DATA_CHANNEL_PPID_WEBRTC_CONTROL:{
GstMapInfo info = GST_MAP_INFO_INIT;
if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
- g_set_error (error, GST_WEBRTC_BIN_ERROR,
- GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ g_set_error (error, GST_WEBRTC_ERROR,
+ GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
"Failed to map received buffer");
ret = GST_FLOW_ERROR;
} else {
case DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL:{
GstMapInfo info = GST_MAP_INFO_INIT;
if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
- g_set_error (error, GST_WEBRTC_BIN_ERROR,
- GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ g_set_error (error, GST_WEBRTC_ERROR,
+ GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
"Failed to map received buffer");
ret = GST_FLOW_ERROR;
} else {
case DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL:{
struct map_info *info = g_new0 (struct map_info, 1);
if (!gst_buffer_map (buffer, &info->map_info, GST_MAP_READ)) {
- g_set_error (error, GST_WEBRTC_BIN_ERROR,
- GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ g_set_error (error, GST_WEBRTC_ERROR,
+ GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
"Failed to map received buffer");
ret = GST_FLOW_ERROR;
} else {
NULL);
break;
default:
- g_set_error (error, GST_WEBRTC_BIN_ERROR,
- GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ g_set_error (error, GST_WEBRTC_ERROR,
+ GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
"Unknown SCTP PPID %u received", receive->ppid);
ret = GST_FLOW_ERROR;
break;
GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
channel->parent.buffered_amount += gst_buffer_get_size (buffer);
GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
+ g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
if (gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc),
buffer) == GST_FLOW_OK) {
_channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
} else {
GError *error = NULL;
- g_set_error (&error, GST_WEBRTC_BIN_ERROR,
- GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ g_set_error (&error, GST_WEBRTC_ERROR,
+ GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
"Failed to send DCEP open packet");
_channel_store_error (channel, error);
_channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
g_return_if_fail (data != NULL);
if (!_is_within_max_message_size (channel, size)) {
GError *error = NULL;
- g_set_error (&error, GST_WEBRTC_BIN_ERROR,
- GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ g_set_error (&error, GST_WEBRTC_ERROR,
+ GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
"Requested to send data that is too large");
_channel_store_error (channel, error);
_channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
channel->parent.buffered_amount += gst_buffer_get_size (buffer);
GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
+ g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
if (ret != GST_FLOW_OK) {
GError *error = NULL;
- g_set_error (&error, GST_WEBRTC_BIN_ERROR,
- GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
+ g_set_error (&error, GST_WEBRTC_ERROR,
+ GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
_channel_store_error (channel, error);
_channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
}
ppid = DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY;
} else {
gsize size = strlen (str);
- gchar *str_copy = g_strdup (str);
+ gchar *str_copy;
if (!_is_within_max_message_size (channel, size)) {
GError *error = NULL;
- g_set_error (&error, GST_WEBRTC_BIN_ERROR,
- GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
+ g_set_error (&error, GST_WEBRTC_ERROR,
+ GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
"Requested to send a string that is too large");
_channel_store_error (channel, error);
_channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
return;
}
+ str_copy = g_strdup (str);
buffer =
gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, str_copy,
size, 0, size, str_copy, g_free);
GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
channel->parent.buffered_amount += gst_buffer_get_size (buffer);
GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
+ g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
if (ret != GST_FLOW_OK) {
GError *error = NULL;
- g_set_error (&error, GST_WEBRTC_BIN_ERROR,
- GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to send string");
+ g_set_error (&error, GST_WEBRTC_ERROR,
+ GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send string");
_channel_store_error (channel, error);
_channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
}
}
}
+static WebRTCDataChannel *
+ensure_channel_alive (WebRTCDataChannel * channel)
+{
+ /* ghetto impl of, does the channel still exist?.
+ * Needed because g_signal_handler_disconnect*() will not disconnect any
+ * running functions and _finalize() implementation can complete and
+ * invalidate channel */
+ G_LOCK (outstanding_channels_lock);
+ if (g_list_find (outstanding_channels, channel)) {
+ g_object_ref (channel);
+ } else {
+ G_UNLOCK (outstanding_channels_lock);
+ return NULL;
+ }
+ G_UNLOCK (outstanding_channels_lock);
+
+ return channel;
+}
+
static void
_on_sctp_notify_state (GObject * sctp_transport, GParamSpec * pspec,
WebRTCDataChannel * channel)
{
+ if (!(channel = ensure_channel_alive (channel)))
+ return;
+
GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
_on_sctp_notify_state_unlocked (sctp_transport, channel);
GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
+
+ g_object_unref (channel);
}
static void
channel->parent.buffered_amount_low_threshold,
channel->parent.buffered_amount);
if (prev_amount >= channel->parent.buffered_amount_low_threshold
- && channel->parent.buffered_amount <
+ && channel->parent.buffered_amount <=
channel->parent.buffered_amount_low_threshold) {
_channel_enqueue_task (channel, (ChannelTask) _emit_low_threshold, NULL,
NULL);
NULL);
}
GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
+ g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
}
return GST_PAD_PROBE_OK;
}
static void
+gst_webrtc_data_channel_dispose (GObject * object)
+{
+ G_LOCK (outstanding_channels_lock);
+ outstanding_channels = g_list_remove (outstanding_channels, object);
+ G_UNLOCK (outstanding_channels_lock);
+
+ G_OBJECT_CLASS (parent_class)->dispose (object);
+}
+
+static void
gst_webrtc_data_channel_finalize (GObject * object)
{
WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (object);
(GstWebRTCDataChannelClass *) klass;
gobject_class->constructed = gst_webrtc_data_channel_constructed;
+ gobject_class->dispose = gst_webrtc_data_channel_dispose;
gobject_class->finalize = gst_webrtc_data_channel_finalize;
channel_class->send_data = webrtc_data_channel_send_data;
static void
webrtc_data_channel_init (WebRTCDataChannel * channel)
{
+ G_LOCK (outstanding_channels_lock);
+ outstanding_channels = g_list_prepend (outstanding_channels, channel);
+ G_UNLOCK (outstanding_channels_lock);
}
static void
GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
if (channel->sctp_transport)
g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
+ GST_TRACE ("%p set sctp %p", channel, sctp);
gst_object_replace ((GstObject **) & channel->sctp_transport,
GST_OBJECT (sctp));