tests: Split racy "closing" into a test for stopping while reading or writing
authorOlivier Crête <olivier.crete@collabora.com>
Tue, 25 Feb 2014 04:05:44 +0000 (23:05 -0500)
committerOlivier Crête <olivier.crete@collabora.com>
Tue, 25 Feb 2014 05:51:31 +0000 (00:51 -0500)
We wan't to make sure not to have a race where a thread blocks in a blocking
receive or send while nice_agent_remove_stream() is called

.gitignore
tests/Makefile.am
tests/test-io-stream-closing-read.c [moved from tests/test-io-stream-closing.c with 51% similarity]
tests/test-io-stream-closing-write.c [new file with mode: 0644]

index d39e398..3939076 100644 (file)
@@ -136,7 +136,8 @@ tests/test-dribble
 tests/test-fallback
 tests/test-fullmode
 tests/test-io-stream-cancelling
-tests/test-io-stream-closing
+tests/test-io-stream-closing-read
+tests/test-io-stream-closing-write
 tests/test-io-stream-thread
 tests/test-io-stream-pollable
 tests/test-send-recv
index 051eab0..f82983e 100644 (file)
@@ -28,7 +28,8 @@ check_PROGRAMS = \
        test-add-remove-stream \
        test-build-io-stream \
        test-io-stream-thread \
-       test-io-stream-closing \
+       test-io-stream-closing-write \
+       test-io-stream-closing-read \
        test-io-stream-cancelling \
        test-io-stream-pollable \
        test-send-recv \
@@ -64,8 +65,11 @@ test_build_io_stream_LDADD = $(COMMON_LDADD)
 test_io_stream_thread_SOURCES = test-io-stream-thread.c test-io-stream-common.c
 test_io_stream_thread_LDADD = $(COMMON_LDADD)
 
-test_io_stream_closing_SOURCES = test-io-stream-closing.c test-io-stream-common.c
-test_io_stream_closing_LDADD = $(COMMON_LDADD)
+test_io_stream_closing_write_SOURCES = test-io-stream-closing-write.c test-io-stream-common.c
+test_io_stream_closing_write_LDADD = $(COMMON_LDADD)
+
+test_io_stream_closing_read_SOURCES = test-io-stream-closing-read.c test-io-stream-common.c
+test_io_stream_closing_read_LDADD = $(COMMON_LDADD)
 
 test_io_stream_cancelling_SOURCES = test-io-stream-cancelling.c test-io-stream-common.c
 test_io_stream_cancelling_LDADD = $(COMMON_LDADD)
similarity index 51%
rename from tests/test-io-stream-closing.c
rename to tests/test-io-stream-closing-read.c
index 1803a72..ec434dd 100644 (file)
 #include <unistd.h>
 #endif
 
-typedef struct {
-  gint recv_count;
-  gint *other_recv_count;
-} ClosingData;
-
 #define NUM_MESSAGES 10
 
+guint count = 0;
+GMutex count_lock;
+GCond count_cond;
+
 static void
 read_thread_cb (GInputStream *input_stream, TestIOStreamThreadData *data)
 {
-  ClosingData *user_data = data->user_data;
-  GOutputStream *output_stream;
-  gpointer tmp;
-  guint stream_id;
   GError *error = NULL;
+  gssize len;
+  guint8 buf[MESSAGE_SIZE];
 
-  while (user_data->recv_count < NUM_MESSAGES) {
-    gchar expected_data[MESSAGE_SIZE];
-    guint8 buf[MESSAGE_SIZE];
-    gssize len;
-    gsize offset;
 
-    /* Block on receiving some data. */
-    len = g_input_stream_read (input_stream, buf, sizeof (buf), NULL, &error);
-    g_assert_no_error (error);
+  g_mutex_lock (&count_lock);
+  count++;
+  g_cond_broadcast (&count_cond);
+  g_mutex_unlock (&count_lock);
 
-    offset = 0;
-    while (len > 0) {
-      g_assert (len == MESSAGE_SIZE);
-      g_assert (user_data->recv_count < NUM_MESSAGES);
-
-      memset (expected_data, user_data->recv_count + '1', MESSAGE_SIZE);
-      g_assert (
-          memcmp (buf + offset, expected_data, sizeof (expected_data)) == 0);
-
-      user_data->recv_count++;
-
-      len -= MESSAGE_SIZE;
-      offset += MESSAGE_SIZE;
+  /* Block on receiving some data. */
+  do {
+    len = g_input_stream_read (input_stream, buf, sizeof (buf), NULL, &error);
+    if (!data->user_data) {
+      g_assert_cmpint (len, ==, sizeof(buf));
+      return;
     }
+  } while (len > 0);
+  g_assert_cmpint (len, ==, -1);
 
-    g_assert (len == 0);
-  }
-
-  /* Signal completion. */
-  output_stream = g_io_stream_get_output_stream (data->io_stream);
-  g_output_stream_write (output_stream, "Done", strlen ("Done"), NULL, &error);
-  g_assert_no_error (error);
+  g_assert_error (error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE);
+  g_clear_error (&error);
 
-  /* Wait for a done packet. */
-  while (TRUE) {
-    guint8 buf[4];
-    gssize len;
+  stop_main_loop (data->error_loop);
+}
 
-    len = g_input_stream_read (input_stream, buf, sizeof (buf), NULL, &error);
-    g_assert_no_error (error);
+static void
+write_thread_cb (GOutputStream *output_stream, TestIOStreamThreadData *data)
+{
+  gchar buf[MESSAGE_SIZE] = {0};
+  gssize ret;
+  GError *error = NULL;
+  gpointer tmp;
+  guint stream_id;
 
-    g_assert (len == 4);
-    g_assert (memcmp (buf, "Done", strlen ("Done")) == 0);
+  ret = g_output_stream_write (output_stream, buf, sizeof (buf), NULL,
+      &error);
 
-    break;
+  g_mutex_lock (&count_lock);
+  count++;
+  g_cond_broadcast (&count_cond);
+  if (data->user_data) {
+    g_assert_cmpint (ret, ==, sizeof(buf));
+    g_mutex_unlock (&count_lock);
+    return;
   }
 
-  user_data->recv_count = -1;
-
-  tmp = g_object_get_data (G_OBJECT (data->agent), "stream-id");
-  stream_id = GPOINTER_TO_UINT (tmp);
+  while (count != 4)
+    g_cond_wait (&count_cond, &count_lock);
+  g_mutex_unlock (&count_lock);
 
-  nice_agent_remove_stream (data->agent, stream_id);
 
-  /* Have both threads finished? */
-  if (user_data->recv_count == -1 &&
-      *user_data->other_recv_count == -1) {
-    g_main_loop_quit (data->error_loop);
-  }
-}
-
-static void
-write_thread_cb (GOutputStream *output_stream, TestIOStreamThreadData *data)
-{
-  gchar buf[MESSAGE_SIZE];
-  guint i;
+  /* Now we remove the stream, lets see how the writer handles that */
 
-  for (i = 0; i < NUM_MESSAGES; i++) {
-    GError *error = NULL;
-
-    memset (buf, i + '1', MESSAGE_SIZE);
+  tmp = g_object_get_data (G_OBJECT (data->other->agent), "stream-id");
+  stream_id = GPOINTER_TO_UINT (tmp);
 
-    g_output_stream_write (output_stream, buf, sizeof (buf), NULL, &error);
-    g_assert_no_error (error);
-  }
+  nice_agent_remove_stream (data->other->agent, stream_id);
 }
 
 int main (void)
 {
-  ClosingData *l_data, *r_data;
-
   const TestIOStreamCallbacks callbacks = {
     read_thread_cb,
     write_thread_cb,
@@ -156,19 +130,7 @@ int main (void)
   g_type_init ();
   g_thread_init (NULL);
 
-  l_data = g_malloc0 (sizeof (ClosingData));
-  r_data = g_malloc0 (sizeof (ClosingData));
-
-  l_data->recv_count = 0;
-  l_data->other_recv_count = &r_data->recv_count;
-
-  r_data->recv_count = 0;
-  r_data->other_recv_count = &l_data->recv_count;
-
-  run_io_stream_test (30, TRUE, &callbacks, l_data, NULL, r_data, NULL);
-
-  g_free (r_data);
-  g_free (l_data);
+  run_io_stream_test (30, TRUE, &callbacks, (gpointer) TRUE, NULL, NULL, NULL);
 
 #ifdef G_OS_WIN32
   WSACleanup ();
diff --git a/tests/test-io-stream-closing-write.c b/tests/test-io-stream-closing-write.c
new file mode 100644 (file)
index 0000000..97e8347
--- /dev/null
@@ -0,0 +1,140 @@
+/*
+ * This file is part of the Nice GLib ICE library.
+ *
+ * (C) 2014 Collabora Ltd.
+ *  Contact: Philip Withnall
+ *
+ * The contents of this file are subject to the Mozilla Public License Version
+ * 1.1 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS" basis,
+ * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+ * for the specific language governing rights and limitations under the
+ * License.
+ *
+ * The Original Code is the Nice GLib ICE library.
+ *
+ * The Initial Developers of the Original Code are Collabora Ltd and Nokia
+ * Corporation. All Rights Reserved.
+ *
+ * Contributors:
+ *   Philip Withnall, Collabora Ltd.
+ *
+ * Alternatively, the contents of this file may be used under the terms of the
+ * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
+ * case the provisions of LGPL are applicable instead of those above. If you
+ * wish to allow use of your version of this file only under the terms of the
+ * LGPL and not to allow others to use your version of this file under the
+ * MPL, indicate your decision by deleting the provisions above and replace
+ * them with the notice and other provisions required by the LGPL. If you do
+ * not delete the provisions above, a recipient may use your version of this
+ * file under either the MPL or the LGPL.
+ */
+#ifdef HAVE_CONFIG_H
+# include <config.h>
+#endif
+
+#include "agent.h"
+#include "test-io-stream-common.h"
+
+#include <stdlib.h>
+#include <string.h>
+#ifndef G_OS_WIN32
+#include <unistd.h>
+#endif
+
+#define NUM_MESSAGES 10
+
+guint count = 0;
+GMutex count_lock;
+GCond count_cond;
+
+static void
+read_thread_cb (GInputStream *input_stream, TestIOStreamThreadData *data)
+{
+  gpointer tmp;
+  guint stream_id;
+  GError *error = NULL;
+  gssize len;
+  guint8 buf[MESSAGE_SIZE];
+
+  /* Block on receiving some data. */
+  len = g_input_stream_read (input_stream, buf, sizeof (buf), NULL, &error);
+  g_assert_cmpint (len, ==, sizeof(buf));
+
+  g_mutex_lock (&count_lock);
+  count++;
+  g_cond_broadcast (&count_cond);
+  if (data->user_data) {
+    g_mutex_unlock (&count_lock);
+    return;
+  }
+
+  while (count != 4)
+    g_cond_wait (&count_cond, &count_lock);
+  g_mutex_unlock (&count_lock);
+
+  /* Now we remove the stream, lets see how the writer handles that */
+
+  tmp = g_object_get_data (G_OBJECT (data->other->agent), "stream-id");
+  stream_id = GPOINTER_TO_UINT (tmp);
+
+  nice_agent_remove_stream (data->other->agent, stream_id);
+}
+
+static void
+write_thread_cb (GOutputStream *output_stream, TestIOStreamThreadData *data)
+{
+  gchar buf[MESSAGE_SIZE] = {0};
+  gssize ret;
+  GError *error = NULL;
+
+  g_mutex_lock (&count_lock);
+  count++;
+  g_cond_broadcast (&count_cond);
+  g_mutex_unlock (&count_lock);
+
+  do {
+    g_assert_no_error (error);
+    ret = g_output_stream_write (output_stream, buf, sizeof (buf), NULL,
+        &error);
+
+    if (!data->user_data) {
+      g_assert_cmpint (ret, ==, sizeof (buf));
+      return;
+    }
+  } while (ret > 0);
+  g_assert_cmpint (ret, ==, -1);
+
+  g_assert_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED);
+  g_clear_error (&error);
+
+  stop_main_loop (data->error_loop);
+}
+
+int main (void)
+{
+  const TestIOStreamCallbacks callbacks = {
+    read_thread_cb,
+    write_thread_cb,
+    NULL,
+    NULL,
+  };
+
+#ifdef G_OS_WIN32
+  WSADATA w;
+  WSAStartup (0x0202, &w);
+#endif
+  g_type_init ();
+  g_thread_init (NULL);
+
+  run_io_stream_test (30, TRUE, &callbacks, (gpointer) TRUE, NULL, NULL, NULL);
+
+#ifdef G_OS_WIN32
+  WSACleanup ();
+#endif
+
+  return 0;
+}