webrtc/datachannel: handle error messages from appsrc/sink
authorMatthew Waters <matthew@centricular.com>
Thu, 10 Nov 2022 03:31:43 +0000 (14:31 +1100)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Fri, 11 Nov 2022 10:13:27 +0000 (10:13 +0000)
Fixes a possible race where closing a data channel may produce e.g.
not-linked errors.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3381>

subprojects/gst-plugins-bad/ext/webrtc/gstwebrtcbin.c
subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.c
subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.h

index dd16885..5d59ee6 100644 (file)
@@ -2407,11 +2407,11 @@ _on_sctpdec_pad_added (GstElement * sctpdec, GstPad * pad,
     g_signal_emit (webrtc, gst_webrtc_bin_signals[PREPARE_DATA_CHANNEL_SIGNAL],
         0, channel, FALSE);
 
-    gst_bin_add (GST_BIN (webrtc), channel->appsrc);
-    gst_bin_add (GST_BIN (webrtc), channel->appsink);
+    gst_bin_add (GST_BIN (webrtc), channel->src_bin);
+    gst_bin_add (GST_BIN (webrtc), channel->sink_bin);
 
-    gst_element_sync_state_with_parent (channel->appsrc);
-    gst_element_sync_state_with_parent (channel->appsink);
+    gst_element_sync_state_with_parent (channel->src_bin);
+    gst_element_sync_state_with_parent (channel->sink_bin);
 
     webrtc_data_channel_link_to_sctp (channel, webrtc->priv->sctp_transport);
 
@@ -2422,7 +2422,7 @@ _on_sctpdec_pad_added (GstElement * sctpdec, GstPad * pad,
   g_signal_connect (channel, "notify::ready-state",
       G_CALLBACK (_on_data_channel_ready_state), webrtc);
 
-  sink_pad = gst_element_get_static_pad (channel->appsink, "sink");
+  sink_pad = gst_element_get_static_pad (channel->sink_bin, "sink");
   if (gst_pad_link (pad, sink_pad) != GST_PAD_LINK_OK)
     GST_WARNING_OBJECT (channel, "Failed to link sctp pad %s with channel %"
         GST_PTR_FORMAT, GST_PAD_NAME (pad), channel);
@@ -6968,11 +6968,11 @@ gst_webrtc_bin_create_data_channel (GstWebRTCBin * webrtc, const gchar * label,
   g_signal_emit (webrtc, gst_webrtc_bin_signals[PREPARE_DATA_CHANNEL_SIGNAL], 0,
       ret, TRUE);
 
-  gst_bin_add (GST_BIN (webrtc), ret->appsrc);
-  gst_bin_add (GST_BIN (webrtc), ret->appsink);
+  gst_bin_add (GST_BIN (webrtc), ret->src_bin);
+  gst_bin_add (GST_BIN (webrtc), ret->sink_bin);
 
-  gst_element_sync_state_with_parent (ret->appsrc);
-  gst_element_sync_state_with_parent (ret->appsink);
+  gst_element_sync_state_with_parent (ret->src_bin);
+  gst_element_sync_state_with_parent (ret->sink_bin);
 
   ret = gst_object_ref (ret);
   ret->webrtcbin = webrtc;
index bb2b023..0260c61 100644 (file)
 #define GST_CAT_DEFAULT webrtc_data_channel_debug
 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
 
+static void _close_procedure (WebRTCDataChannel * channel, gpointer user_data);
+
+typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
+    gpointer user_data);
+
+struct task
+{
+  GstWebRTCDataChannel *channel;
+  ChannelTask func;
+  gpointer user_data;
+  GDestroyNotify notify;
+};
+
+static GstStructure *
+_execute_task (GstWebRTCBin * webrtc, struct task *task)
+{
+  if (task->func)
+    task->func (task->channel, task->user_data);
+
+  return NULL;
+}
+
+static void
+_free_task (struct task *task)
+{
+  gst_object_unref (task->channel);
+
+  if (task->notify)
+    task->notify (task->user_data);
+  g_free (task);
+}
+
+static void
+_channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func,
+    gpointer user_data, GDestroyNotify notify)
+{
+  struct task *task = g_new0 (struct task, 1);
+
+  task->channel = gst_object_ref (channel);
+  task->func = func;
+  task->user_data = user_data;
+  task->notify = notify;
+
+  gst_webrtc_bin_enqueue_task (channel->webrtcbin,
+      (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task,
+      NULL);
+}
+
+static void
+_channel_store_error (WebRTCDataChannel * channel, GError * error)
+{
+  GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
+  if (error) {
+    GST_WARNING_OBJECT (channel, "Error: %s",
+        error ? error->message : "Unknown");
+    if (!channel->stored_error)
+      channel->stored_error = error;
+    else
+      g_clear_error (&error);
+  }
+  GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
+}
+
+struct _WebRTCErrorIgnoreBin
+{
+  GstBin bin;
+
+  WebRTCDataChannel *data_channel;
+};
+
+G_DEFINE_TYPE (WebRTCErrorIgnoreBin, webrtc_error_ignore_bin, GST_TYPE_BIN);
+
+static void
+webrtc_error_ignore_bin_handle_message (GstBin * bin, GstMessage * message)
+{
+  WebRTCErrorIgnoreBin *self = WEBRTC_ERROR_IGNORE_BIN (bin);
+
+  switch (GST_MESSAGE_TYPE (message)) {
+    case GST_MESSAGE_ERROR:{
+      GError *error = NULL;
+      gst_message_parse_error (message, &error, NULL);
+      GST_DEBUG_OBJECT (bin, "handling error message from internal element");
+      _channel_store_error (self->data_channel, error);
+      _channel_enqueue_task (self->data_channel, (ChannelTask) _close_procedure,
+          NULL, NULL);
+      break;
+    }
+    default:
+      GST_BIN_CLASS (webrtc_error_ignore_bin_parent_class)->handle_message (bin,
+          message);
+      break;
+  }
+}
+
+static void
+webrtc_error_ignore_bin_class_init (WebRTCErrorIgnoreBinClass * klass)
+{
+  GstBinClass *bin_class = (GstBinClass *) klass;
+
+  bin_class->handle_message = webrtc_error_ignore_bin_handle_message;
+}
+
+static void
+webrtc_error_ignore_bin_init (WebRTCErrorIgnoreBin * bin)
+{
+}
+
+static GstElement *
+webrtc_error_ignore_bin_new (WebRTCDataChannel * data_channel,
+    GstElement * other)
+{
+  WebRTCErrorIgnoreBin *self;
+  GstPad *pad;
+
+  self = g_object_new (webrtc_error_ignore_bin_get_type (), NULL);
+  self->data_channel = data_channel;
+
+  gst_bin_add (GST_BIN (self), other);
+
+  pad = gst_element_get_static_pad (other, "src");
+  if (pad) {
+    GstPad *ghost_pad = gst_ghost_pad_new ("src", pad);
+    gst_element_add_pad (GST_ELEMENT (self), ghost_pad);
+    gst_clear_object (&pad);
+  }
+  pad = gst_element_get_static_pad (other, "sink");
+  if (pad) {
+    GstPad *ghost_pad = gst_ghost_pad_new ("sink", pad);
+    gst_element_add_pad (GST_ELEMENT (self), ghost_pad);
+    gst_clear_object (&pad);
+  }
+
+  return (GstElement *) self;
+}
+
 #define webrtc_data_channel_parent_class parent_class
 G_DEFINE_TYPE_WITH_CODE (WebRTCDataChannel, webrtc_data_channel,
     GST_TYPE_WEBRTC_DATA_CHANNEL,
@@ -213,67 +348,6 @@ construct_ack_packet (WebRTCDataChannel * channel)
   return buf;
 }
 
-typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
-    gpointer user_data);
-
-struct task
-{
-  GstWebRTCDataChannel *channel;
-  ChannelTask func;
-  gpointer user_data;
-  GDestroyNotify notify;
-};
-
-static GstStructure *
-_execute_task (GstWebRTCBin * webrtc, struct task *task)
-{
-  if (task->func)
-    task->func (task->channel, task->user_data);
-
-  return NULL;
-}
-
-static void
-_free_task (struct task *task)
-{
-  gst_object_unref (task->channel);
-
-  if (task->notify)
-    task->notify (task->user_data);
-  g_free (task);
-}
-
-static void
-_channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func,
-    gpointer user_data, GDestroyNotify notify)
-{
-  struct task *task = g_new0 (struct task, 1);
-
-  task->channel = gst_object_ref (channel);
-  task->func = func;
-  task->user_data = user_data;
-  task->notify = notify;
-
-  gst_webrtc_bin_enqueue_task (channel->webrtcbin,
-      (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task,
-      NULL);
-}
-
-static void
-_channel_store_error (WebRTCDataChannel * channel, GError * error)
-{
-  GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
-  if (error) {
-    GST_WARNING_OBJECT (channel, "Error: %s",
-        error ? error->message : "Unknown");
-    if (!channel->stored_error)
-      channel->stored_error = error;
-    else
-      g_clear_error (&error);
-  }
-  GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
-}
-
 static void
 _emit_on_open (WebRTCDataChannel * channel, gpointer user_data)
 {
@@ -290,6 +364,10 @@ _transport_closed (WebRTCDataChannel * channel)
   error = channel->stored_error;
   channel->stored_error = NULL;
 
+  GST_TRACE_OBJECT (channel, "transport closed, peer closed %u error %p "
+      "buffered %" G_GUINT64_FORMAT, channel->peer_closed, error,
+      channel->parent.buffered_amount);
+
   both_sides_closed =
       channel->peer_closed && channel->parent.buffered_amount <= 0;
   if (both_sides_closed || error) {
@@ -314,7 +392,7 @@ _close_sctp_stream (WebRTCDataChannel * channel, gpointer user_data)
   GST_INFO_OBJECT (channel, "Closing outgoing SCTP stream %i label \"%s\"",
       channel->parent.id, channel->parent.label);
 
-  pad = gst_element_get_static_pad (channel->appsrc, "src");
+  pad = gst_element_get_static_pad (channel->src_bin, "src");
   peer = gst_pad_get_peer (pad);
   gst_object_unref (pad);
 
@@ -322,6 +400,7 @@ _close_sctp_stream (WebRTCDataChannel * channel, gpointer user_data)
     GstElement *sctpenc = gst_pad_get_parent_element (peer);
 
     if (sctpenc) {
+      GST_TRACE_OBJECT (channel, "removing sctpenc pad %" GST_PTR_FORMAT, peer);
       gst_element_release_request_pad (sctpenc, peer);
       gst_object_unref (sctpenc);
     }
@@ -484,6 +563,8 @@ _parse_control_packet (WebRTCDataChannel * channel, guint8 * data,
     if (ret != GST_FLOW_OK) {
       g_set_error (error, GST_WEBRTC_ERROR,
           GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Could not send ack packet");
+      GST_WARNING_OBJECT (channel, "push returned %i, %s", ret,
+          gst_flow_get_name (ret));
       return ret;
     }
 
@@ -800,6 +881,8 @@ webrtc_data_channel_send_data (GstWebRTCDataChannel * base_channel,
   } else {
     g_set_error (error, GST_WEBRTC_ERROR,
         GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
+    GST_WARNING_OBJECT (channel, "push returned %i, %s", ret,
+        gst_flow_get_name (ret));
 
     GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
     channel->parent.buffered_amount -= size;
@@ -1001,6 +1084,8 @@ gst_webrtc_data_channel_constructed (GObject * object)
   channel->src_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_BOTH,
       (GstPadProbeCallback) on_appsrc_data, channel, NULL);
 
+  channel->src_bin = webrtc_error_ignore_bin_new (channel, channel->appsrc);
+
   channel->appsink = gst_element_factory_make ("appsink", NULL);
   gst_object_ref_sink (channel->appsink);
   g_object_set (channel->appsink, "sync", FALSE, "async", FALSE, "caps", caps,
@@ -1008,6 +1093,8 @@ gst_webrtc_data_channel_constructed (GObject * object)
   gst_app_sink_set_callbacks (GST_APP_SINK (channel->appsink), &sink_callbacks,
       channel, NULL);
 
+  channel->sink_bin = webrtc_error_ignore_bin_new (channel, channel->appsink);
+
   gst_object_unref (pad);
   gst_caps_unref (caps);
 }
@@ -1078,7 +1165,7 @@ _data_channel_set_sctp_transport (WebRTCDataChannel * channel,
   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
   if (channel->sctp_transport)
     g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
-  GST_TRACE ("%p set sctp %p", channel, sctp);
+  GST_TRACE_OBJECT (channel, "set sctp %p", sctp);
 
   gst_object_replace ((GstObject **) & channel->sctp_transport,
       GST_OBJECT (sctp));
@@ -1106,7 +1193,7 @@ webrtc_data_channel_link_to_sctp (WebRTCDataChannel * channel,
 
       _data_channel_set_sctp_transport (channel, sctp_transport);
       pad_name = g_strdup_printf ("sink_%u", id);
-      if (!gst_element_link_pads (channel->appsrc, "src",
+      if (!gst_element_link_pads (channel->src_bin, "src",
               channel->sctp_transport->sctpenc, pad_name))
         g_warn_if_reached ();
       g_free (pad_name);
index a0b38a7..dd65a66 100644 (file)
@@ -46,7 +46,9 @@ struct _WebRTCDataChannel
   GstWebRTCDataChannel              parent;
 
   WebRTCSCTPTransport              *sctp_transport;
+  GstElement                       *src_bin;
   GstElement                       *appsrc;
+  GstElement                       *sink_bin;
   GstElement                       *appsink;
 
   GstWebRTCBin                     *webrtcbin;
@@ -70,6 +72,8 @@ G_GNUC_INTERNAL
 void    webrtc_data_channel_link_to_sctp (WebRTCDataChannel                 *channel,
                                           WebRTCSCTPTransport               *sctp_transport);
 
+G_DECLARE_FINAL_TYPE (WebRTCErrorIgnoreBin, webrtc_error_ignore_bin, WEBRTC, ERROR_IGNORE_BIN, GstBin);
+
 G_END_DECLS
 
 #endif /* __WEBRTC_DATA_CHANNEL_H__ */