g_signal_emit (webrtc, gst_webrtc_bin_signals[PREPARE_DATA_CHANNEL_SIGNAL],
0, channel, FALSE);
- gst_bin_add (GST_BIN (webrtc), channel->appsrc);
- gst_bin_add (GST_BIN (webrtc), channel->appsink);
+ gst_bin_add (GST_BIN (webrtc), channel->src_bin);
+ gst_bin_add (GST_BIN (webrtc), channel->sink_bin);
- gst_element_sync_state_with_parent (channel->appsrc);
- gst_element_sync_state_with_parent (channel->appsink);
+ gst_element_sync_state_with_parent (channel->src_bin);
+ gst_element_sync_state_with_parent (channel->sink_bin);
webrtc_data_channel_link_to_sctp (channel, webrtc->priv->sctp_transport);
g_signal_connect (channel, "notify::ready-state",
G_CALLBACK (_on_data_channel_ready_state), webrtc);
- sink_pad = gst_element_get_static_pad (channel->appsink, "sink");
+ sink_pad = gst_element_get_static_pad (channel->sink_bin, "sink");
if (gst_pad_link (pad, sink_pad) != GST_PAD_LINK_OK)
GST_WARNING_OBJECT (channel, "Failed to link sctp pad %s with channel %"
GST_PTR_FORMAT, GST_PAD_NAME (pad), channel);
g_signal_emit (webrtc, gst_webrtc_bin_signals[PREPARE_DATA_CHANNEL_SIGNAL], 0,
ret, TRUE);
- gst_bin_add (GST_BIN (webrtc), ret->appsrc);
- gst_bin_add (GST_BIN (webrtc), ret->appsink);
+ gst_bin_add (GST_BIN (webrtc), ret->src_bin);
+ gst_bin_add (GST_BIN (webrtc), ret->sink_bin);
- gst_element_sync_state_with_parent (ret->appsrc);
- gst_element_sync_state_with_parent (ret->appsink);
+ gst_element_sync_state_with_parent (ret->src_bin);
+ gst_element_sync_state_with_parent (ret->sink_bin);
ret = gst_object_ref (ret);
ret->webrtcbin = webrtc;
#define GST_CAT_DEFAULT webrtc_data_channel_debug
GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
+static void _close_procedure (WebRTCDataChannel * channel, gpointer user_data);
+
+typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
+ gpointer user_data);
+
+struct task
+{
+ GstWebRTCDataChannel *channel;
+ ChannelTask func;
+ gpointer user_data;
+ GDestroyNotify notify;
+};
+
+static GstStructure *
+_execute_task (GstWebRTCBin * webrtc, struct task *task)
+{
+ if (task->func)
+ task->func (task->channel, task->user_data);
+
+ return NULL;
+}
+
+static void
+_free_task (struct task *task)
+{
+ gst_object_unref (task->channel);
+
+ if (task->notify)
+ task->notify (task->user_data);
+ g_free (task);
+}
+
+static void
+_channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func,
+ gpointer user_data, GDestroyNotify notify)
+{
+ struct task *task = g_new0 (struct task, 1);
+
+ task->channel = gst_object_ref (channel);
+ task->func = func;
+ task->user_data = user_data;
+ task->notify = notify;
+
+ gst_webrtc_bin_enqueue_task (channel->webrtcbin,
+ (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task,
+ NULL);
+}
+
+static void
+_channel_store_error (WebRTCDataChannel * channel, GError * error)
+{
+ GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
+ if (error) {
+ GST_WARNING_OBJECT (channel, "Error: %s",
+ error ? error->message : "Unknown");
+ if (!channel->stored_error)
+ channel->stored_error = error;
+ else
+ g_clear_error (&error);
+ }
+ GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
+}
+
+struct _WebRTCErrorIgnoreBin
+{
+ GstBin bin;
+
+ WebRTCDataChannel *data_channel;
+};
+
+G_DEFINE_TYPE (WebRTCErrorIgnoreBin, webrtc_error_ignore_bin, GST_TYPE_BIN);
+
+static void
+webrtc_error_ignore_bin_handle_message (GstBin * bin, GstMessage * message)
+{
+ WebRTCErrorIgnoreBin *self = WEBRTC_ERROR_IGNORE_BIN (bin);
+
+ switch (GST_MESSAGE_TYPE (message)) {
+ case GST_MESSAGE_ERROR:{
+ GError *error = NULL;
+ gst_message_parse_error (message, &error, NULL);
+ GST_DEBUG_OBJECT (bin, "handling error message from internal element");
+ _channel_store_error (self->data_channel, error);
+ _channel_enqueue_task (self->data_channel, (ChannelTask) _close_procedure,
+ NULL, NULL);
+ break;
+ }
+ default:
+ GST_BIN_CLASS (webrtc_error_ignore_bin_parent_class)->handle_message (bin,
+ message);
+ break;
+ }
+}
+
+static void
+webrtc_error_ignore_bin_class_init (WebRTCErrorIgnoreBinClass * klass)
+{
+ GstBinClass *bin_class = (GstBinClass *) klass;
+
+ bin_class->handle_message = webrtc_error_ignore_bin_handle_message;
+}
+
+static void
+webrtc_error_ignore_bin_init (WebRTCErrorIgnoreBin * bin)
+{
+}
+
+static GstElement *
+webrtc_error_ignore_bin_new (WebRTCDataChannel * data_channel,
+ GstElement * other)
+{
+ WebRTCErrorIgnoreBin *self;
+ GstPad *pad;
+
+ self = g_object_new (webrtc_error_ignore_bin_get_type (), NULL);
+ self->data_channel = data_channel;
+
+ gst_bin_add (GST_BIN (self), other);
+
+ pad = gst_element_get_static_pad (other, "src");
+ if (pad) {
+ GstPad *ghost_pad = gst_ghost_pad_new ("src", pad);
+ gst_element_add_pad (GST_ELEMENT (self), ghost_pad);
+ gst_clear_object (&pad);
+ }
+ pad = gst_element_get_static_pad (other, "sink");
+ if (pad) {
+ GstPad *ghost_pad = gst_ghost_pad_new ("sink", pad);
+ gst_element_add_pad (GST_ELEMENT (self), ghost_pad);
+ gst_clear_object (&pad);
+ }
+
+ return (GstElement *) self;
+}
+
#define webrtc_data_channel_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (WebRTCDataChannel, webrtc_data_channel,
GST_TYPE_WEBRTC_DATA_CHANNEL,
return buf;
}
-typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
- gpointer user_data);
-
-struct task
-{
- GstWebRTCDataChannel *channel;
- ChannelTask func;
- gpointer user_data;
- GDestroyNotify notify;
-};
-
-static GstStructure *
-_execute_task (GstWebRTCBin * webrtc, struct task *task)
-{
- if (task->func)
- task->func (task->channel, task->user_data);
-
- return NULL;
-}
-
-static void
-_free_task (struct task *task)
-{
- gst_object_unref (task->channel);
-
- if (task->notify)
- task->notify (task->user_data);
- g_free (task);
-}
-
-static void
-_channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func,
- gpointer user_data, GDestroyNotify notify)
-{
- struct task *task = g_new0 (struct task, 1);
-
- task->channel = gst_object_ref (channel);
- task->func = func;
- task->user_data = user_data;
- task->notify = notify;
-
- gst_webrtc_bin_enqueue_task (channel->webrtcbin,
- (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task,
- NULL);
-}
-
-static void
-_channel_store_error (WebRTCDataChannel * channel, GError * error)
-{
- GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
- if (error) {
- GST_WARNING_OBJECT (channel, "Error: %s",
- error ? error->message : "Unknown");
- if (!channel->stored_error)
- channel->stored_error = error;
- else
- g_clear_error (&error);
- }
- GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
-}
-
static void
_emit_on_open (WebRTCDataChannel * channel, gpointer user_data)
{
error = channel->stored_error;
channel->stored_error = NULL;
+ GST_TRACE_OBJECT (channel, "transport closed, peer closed %u error %p "
+ "buffered %" G_GUINT64_FORMAT, channel->peer_closed, error,
+ channel->parent.buffered_amount);
+
both_sides_closed =
channel->peer_closed && channel->parent.buffered_amount <= 0;
if (both_sides_closed || error) {
GST_INFO_OBJECT (channel, "Closing outgoing SCTP stream %i label \"%s\"",
channel->parent.id, channel->parent.label);
- pad = gst_element_get_static_pad (channel->appsrc, "src");
+ pad = gst_element_get_static_pad (channel->src_bin, "src");
peer = gst_pad_get_peer (pad);
gst_object_unref (pad);
GstElement *sctpenc = gst_pad_get_parent_element (peer);
if (sctpenc) {
+ GST_TRACE_OBJECT (channel, "removing sctpenc pad %" GST_PTR_FORMAT, peer);
gst_element_release_request_pad (sctpenc, peer);
gst_object_unref (sctpenc);
}
if (ret != GST_FLOW_OK) {
g_set_error (error, GST_WEBRTC_ERROR,
GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Could not send ack packet");
+ GST_WARNING_OBJECT (channel, "push returned %i, %s", ret,
+ gst_flow_get_name (ret));
return ret;
}
} else {
g_set_error (error, GST_WEBRTC_ERROR,
GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
+ GST_WARNING_OBJECT (channel, "push returned %i, %s", ret,
+ gst_flow_get_name (ret));
GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
channel->parent.buffered_amount -= size;
channel->src_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_BOTH,
(GstPadProbeCallback) on_appsrc_data, channel, NULL);
+ channel->src_bin = webrtc_error_ignore_bin_new (channel, channel->appsrc);
+
channel->appsink = gst_element_factory_make ("appsink", NULL);
gst_object_ref_sink (channel->appsink);
g_object_set (channel->appsink, "sync", FALSE, "async", FALSE, "caps", caps,
gst_app_sink_set_callbacks (GST_APP_SINK (channel->appsink), &sink_callbacks,
channel, NULL);
+ channel->sink_bin = webrtc_error_ignore_bin_new (channel, channel->appsink);
+
gst_object_unref (pad);
gst_caps_unref (caps);
}
GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
if (channel->sctp_transport)
g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
- GST_TRACE ("%p set sctp %p", channel, sctp);
+ GST_TRACE_OBJECT (channel, "set sctp %p", sctp);
gst_object_replace ((GstObject **) & channel->sctp_transport,
GST_OBJECT (sctp));
_data_channel_set_sctp_transport (channel, sctp_transport);
pad_name = g_strdup_printf ("sink_%u", id);
- if (!gst_element_link_pads (channel->appsrc, "src",
+ if (!gst_element_link_pads (channel->src_bin, "src",
channel->sctp_transport->sctpenc, pad_name))
g_warn_if_reached ();
g_free (pad_name);