webrtc: do not tear down data channel before data is flushed
authorDaniel Moberg <daniemob@axis.com>
Tue, 18 Apr 2023 09:21:05 +0000 (11:21 +0200)
committerTim-Philipp Müller <tim@centricular.com>
Mon, 15 May 2023 07:09:36 +0000 (08:09 +0100)
Current implementation can in some cases detect
that all data is sent but in reality it is not,
leading to a push to an unlinked pad.
This is a race between the probe used to track data sent and a
call to close.

This patch sends an EOS before starting the close procedure
and then waits for the EOS event to come through to the
src pad before commencing with tear down.
This ensures that any queued data before EOS is flushed.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4633>

subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.c

index 85e8d23..1305c74 100644 (file)
@@ -437,11 +437,15 @@ _close_procedure (WebRTCDataChannel * channel, gpointer user_data)
     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
     g_object_notify (G_OBJECT (channel), "ready-state");
 
-    GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
-    if (channel->parent.buffered_amount <= 0) {
-      _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream,
-          NULL, NULL);
+    /* Make sure that all data enqueued gets properly sent before data channel is closed. */
+    GstFlowReturn ret =
+        gst_app_src_end_of_stream (GST_APP_SRC (WEBRTC_DATA_CHANNEL
+            (channel)->appsrc));
+    if (ret != GST_FLOW_OK) {
+      GST_WARNING_OBJECT (channel, "Send end of stream returned %i, %s", ret,
+          gst_flow_get_name (ret));
     }
+    return;
   }
 
   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
@@ -1045,6 +1049,16 @@ on_appsrc_data (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
   } else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
     GstBufferList *list = GST_PAD_PROBE_INFO_BUFFER_LIST (info);
     size = gst_buffer_list_calculate_size (list);
+  } else if (GST_PAD_PROBE_INFO_TYPE (info) &
+      GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
+    GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
+    if (GST_EVENT_TYPE (event) == GST_EVENT_EOS
+        && channel->parent.ready_state ==
+        GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING) {
+      _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream, NULL,
+          NULL);
+      return GST_PAD_PROBE_DROP;
+    }
   }
 
   if (size > 0) {
@@ -1063,11 +1077,6 @@ on_appsrc_data (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
           NULL);
     }
 
-    if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING
-        && channel->parent.buffered_amount <= 0) {
-      _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream, NULL,
-          NULL);
-    }
     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
     g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
   }