webrtcbin: Add unit test for closing of data channels
authorJohan Sternerup <johast@axis.com>
Thu, 29 Apr 2021 14:51:27 +0000 (16:51 +0200)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 12 May 2021 03:02:27 +0000 (03:02 +0000)
Add test for verifying that the data channel "close" action signal
triggers an SCTP_RESET_STREAMS request that is propagated to the other
side and eventually leads to both sides closing properly.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/2186>

tests/check/elements/webrtcbin.c

index 282c4b0..03a6671 100644 (file)
@@ -2045,6 +2045,183 @@ GST_START_TEST (test_data_channel_create_after_negotiate)
 
 GST_END_TEST;
 
+struct test_data_channel
+{
+  GObject *dc1;
+  GObject *dc2;
+  gint n_open;
+  gint n_closed;
+  gint n_destroyed;
+};
+
+static void
+have_data_channel_mark_open (struct test_webrtc *t,
+    GstElement * element, GObject * our, gpointer user_data)
+{
+  struct test_data_channel *tdc = t->data_channel_data;
+
+  tdc->dc2 = our;
+  if (g_atomic_int_add (&tdc->n_open, 1) == 1) {
+    test_webrtc_signal_state_unlocked (t, STATE_CUSTOM);
+  }
+}
+
+static gboolean
+is_data_channel_open (GObject * channel)
+{
+  GstWebRTCDataChannelState ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED;
+
+  if (channel) {
+    g_object_get (channel, "ready-state", &ready_state, NULL);
+  }
+
+  return ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_OPEN;
+}
+
+static void
+on_data_channel_open (GObject * channel, GParamSpec * pspec,
+    struct test_webrtc *t)
+{
+  struct test_data_channel *tdc = t->data_channel_data;
+
+  if (is_data_channel_open (channel)) {
+    if (g_atomic_int_add (&tdc->n_open, 1) == 1) {
+      test_webrtc_signal_state (t, STATE_CUSTOM);
+    }
+  }
+}
+
+static void
+on_data_channel_close (GObject * channel, GParamSpec * pspec,
+    struct test_webrtc *t)
+{
+  struct test_data_channel *tdc = t->data_channel_data;
+  GstWebRTCDataChannelState ready_state;
+
+  g_object_get (channel, "ready-state", &ready_state, NULL);
+
+  if (ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED) {
+    g_atomic_int_add (&tdc->n_closed, 1);
+  }
+}
+
+static void
+on_data_channel_destroyed (gpointer data, GObject * where_the_object_was)
+{
+  struct test_webrtc *t = data;
+  struct test_data_channel *tdc = t->data_channel_data;
+
+  if (where_the_object_was == tdc->dc1) {
+    tdc->dc1 = NULL;
+  } else if (where_the_object_was == tdc->dc2) {
+    tdc->dc2 = NULL;
+  }
+
+  if (g_atomic_int_add (&tdc->n_destroyed, 1) == 1) {
+    test_webrtc_signal_state (t, STATE_CUSTOM);
+  }
+}
+
+GST_START_TEST (test_data_channel_close)
+{
+#define NUM_CHANNELS 3
+  struct test_webrtc *t = test_webrtc_new ();
+  struct test_data_channel tdc = { NULL, NULL, 0, 0, 0 };
+  guint channel_id[NUM_CHANNELS] = { 0, 1, 2 };
+  gulong sigid = 0;
+  int i;
+  VAL_SDP_INIT (media_count, _count_num_sdp_media, GUINT_TO_POINTER (1), NULL);
+  VAL_SDP_INIT (offer, on_sdp_has_datachannel, NULL, &media_count);
+
+  t->on_negotiation_needed = NULL;
+  t->on_ice_candidate = NULL;
+  t->on_data_channel = have_data_channel_mark_open;
+  t->data_channel_data = &tdc;
+
+  fail_if (gst_element_set_state (t->webrtc1,
+          GST_STATE_READY) == GST_STATE_CHANGE_FAILURE);
+  fail_if (gst_element_set_state (t->webrtc2,
+          GST_STATE_READY) == GST_STATE_CHANGE_FAILURE);
+
+  /* open and close NUM_CHANNELS data channels to verify that we can reuse the
+   * stream id of a previously closed data channel and that we have the same
+   * behaviour no matter if we create the channel in READY or PLAYING state */
+  for (i = 0; i < NUM_CHANNELS; i++) {
+    tdc.n_open = 0;
+    tdc.n_closed = 0;
+    tdc.n_destroyed = 0;
+
+    g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL,
+        &tdc.dc1);
+    g_assert_nonnull (tdc.dc1);
+    g_object_unref (tdc.dc1);   /* webrtcbin should still hold a ref */
+    g_object_weak_ref (tdc.dc1, on_data_channel_destroyed, t);
+    g_signal_connect (tdc.dc1, "on-error",
+        G_CALLBACK (on_channel_error_not_reached), NULL);
+    sigid = g_signal_connect (tdc.dc1, "notify::ready-state",
+        G_CALLBACK (on_data_channel_open), t);
+
+    if (i == 0) {
+      fail_if (gst_element_set_state (t->webrtc1,
+              GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE);
+      fail_if (gst_element_set_state (t->webrtc2,
+              GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE);
+
+      test_validate_sdp_full (t, &offer, &offer, 1 << STATE_CUSTOM, FALSE);
+    } else {
+      /* FIXME: Creating a data channel may result in "on-open" being sent
+       * before we even had a chance to register the signal. For this test we
+       * want to make sure that the channel is actually open before we try to
+       * close it. So if we didn't receive the signal we fall back to a 1s
+       * timeout where we explicitly check if both channels are open. */
+      gint64 timeout = g_get_monotonic_time () + 1 * G_TIME_SPAN_SECOND;
+      g_mutex_lock (&t->lock);
+      while (((1 << t->state) & STATE_CUSTOM) == 0) {
+        if (!g_cond_wait_until (&t->cond, &t->lock, timeout)) {
+          g_assert (is_data_channel_open (tdc.dc1)
+              && is_data_channel_open (tdc.dc2));
+          break;
+        }
+      }
+      g_mutex_unlock (&t->lock);
+    }
+
+    g_object_get (tdc.dc1, "id", &channel_id[i], NULL);
+
+    g_signal_handler_disconnect (tdc.dc1, sigid);
+    g_object_weak_ref (tdc.dc2, on_data_channel_destroyed, t);
+    g_signal_connect (tdc.dc1, "notify::ready-state",
+        G_CALLBACK (on_data_channel_close), t);
+    g_signal_connect (tdc.dc2, "notify::ready-state",
+        G_CALLBACK (on_data_channel_close), t);
+    test_webrtc_signal_state (t, STATE_NEW);
+
+    /* currently we assume there is no renegotiation if the last data channel is
+     * removed but if it changes this test could be extended to verify both
+     * the behaviour of removing the last channel as well as the behaviour when
+     * there are still data channels remaining */
+    t->on_negotiation_needed = _negotiation_not_reached;
+    g_signal_emit_by_name (tdc.dc1, "close");
+
+    test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM);
+
+    assert_equals_int (g_atomic_int_get (&tdc.n_closed), 2);
+    assert_equals_pointer (tdc.dc1, NULL);
+    assert_equals_pointer (tdc.dc2, NULL);
+
+    test_webrtc_signal_state (t, STATE_NEW);
+  }
+
+  /* verify the same stream id has been reused for each data channel */
+  assert_equals_int (channel_id[0], channel_id[1]);
+  assert_equals_int (channel_id[0], channel_id[2]);
+
+  test_webrtc_free (t);
+#undef NUM_CHANNELS
+}
+
+GST_END_TEST;
+
 static void
 on_buffered_amount_low_emitted (GObject * channel, struct test_webrtc *t)
 {
@@ -3752,6 +3929,7 @@ webrtcbin_suite (void)
       tcase_add_test (tc, test_data_channel_transfer_string);
       tcase_add_test (tc, test_data_channel_transfer_data);
       tcase_add_test (tc, test_data_channel_create_after_negotiate);
+      tcase_add_test (tc, test_data_channel_close);
       tcase_add_test (tc, test_data_channel_low_threshold);
       tcase_add_test (tc, test_data_channel_max_message_size);
       tcase_add_test (tc, test_data_channel_pre_negotiated);