char buf[sizeof (DATA)];
GInputStream *in;
GThread *thread;
+ GCancellable *cancellable;
+ gboolean success;
} PipeIOOverlapReader;
#define TEST_PIPE_IO_OVERLAP (1024 * 4)
g_object_unref (out_client);
}
+static gpointer
+pipe_io_concurrent_writer_thread (gpointer user_data)
+{
+ GOutputStream *out = user_data;
+ GError *err = NULL;
+ gsize bytes_written;
+
+ g_output_stream_write_all (out, DATA, 1, &bytes_written, NULL, &err);
+
+ g_assert_cmpuint (bytes_written, ==, 1);
+ g_assert_no_error (err);
+
+ return NULL;
+}
+
+static gpointer
+pipe_io_concurrent_reader_thread (gpointer user_data)
+{
+ PipeIOOverlapReader *p = user_data;
+ GError *err = NULL;
+ gsize read;
+
+ memset (p->buf, 0, sizeof (p->buf));
+ p->success = g_input_stream_read_all (p->in, p->buf, 1, &read, p->cancellable, &err);
+
+ /* only one thread will succeed, the other will be cancelled */
+ if (p->success)
+ {
+ /* continue the main thread */
+ write (writer_pipe[1], "", 1);
+ g_assert_cmpuint (read, ==, 1);
+ g_assert_no_error (err);
+ }
+
+ return NULL;
+}
+
+static void
+test_pipe_io_concurrent (void)
+{
+ GOutputStream *out_server;
+ GThread *writer_server;
+ PipeIOOverlapReader rc1, rc2;
+ HANDLE server, client;
+ gchar name[256], c;
+
+ g_snprintf (name, sizeof (name),
+ "\\\\.\\pipe\\gtest-io-concurrent-%u", (guint) getpid ());
+
+ server = CreateNamedPipe (name,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
+ PIPE_READMODE_BYTE | PIPE_WAIT,
+ 1, 0, 0, 0, NULL);
+ g_assert (server != INVALID_HANDLE_VALUE);
+ g_assert (_pipe (writer_pipe, 10, _O_BINARY) == 0);
+
+ client = CreateFile (name, GENERIC_WRITE | GENERIC_READ, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
+ g_assert (client != INVALID_HANDLE_VALUE);
+
+ rc1.in = g_win32_input_stream_new (client, TRUE);
+ rc1.success = FALSE;
+ rc1.cancellable = g_cancellable_new ();
+ rc1.thread = g_thread_new ("reader_client", pipe_io_concurrent_reader_thread, &rc1);
+
+ rc2.in = g_win32_input_stream_new (client, TRUE);
+ rc2.success = FALSE;
+ rc2.cancellable = g_cancellable_new ();
+ rc2.thread = g_thread_new ("reader_client", pipe_io_concurrent_reader_thread, &rc2);
+
+ /* FIXME: how to synchronize on both reader thread waiting in read,
+ before starting the writer thread? */
+ g_usleep (G_USEC_PER_SEC / 10);
+
+ out_server = g_win32_output_stream_new (server, TRUE);
+ writer_server = g_thread_new ("writer_server", pipe_io_concurrent_writer_thread, out_server);
+
+ read (writer_pipe[0], &c, 1);
+
+ g_assert (rc1.success ^ rc2.success);
+
+ g_cancellable_cancel (rc1.cancellable);
+ g_cancellable_cancel (rc2.cancellable);
+
+ g_thread_join (writer_server);
+ g_thread_join (rc1.thread);
+ g_thread_join (rc2.thread);
+
+ g_object_unref (rc1.in);
+ g_object_unref (rc2.in);
+ g_object_unref (out_server);
+
+ close (writer_pipe[0]);
+ close (writer_pipe[1]);
+}
+
int
main (int argc,
char *argv[])
g_test_add_func ("/win32-streams/pipe-io-test", test_pipe_io);
g_test_add_func ("/win32-streams/pipe-io-overlap-test", test_pipe_io_overlap);
+ g_test_add_func ("/win32-streams/pipe-io-concurrent-test", test_pipe_io_concurrent);
return g_test_run();
}