win32: add pipe-io-cancel-test
[platform/upstream/glib.git] / gio / tests / win32-streams.c
1 /* GLib testing framework examples and tests
2  * Copyright (C) 2008 Red Hat, Inc
3  *
4  * This work is provided "as is"; redistribution and modification
5  * in whole or in part, in any medium, physical or electronic is
6  * permitted without restriction.
7  *
8  * This work is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11  *
12  * In no event shall the authors or contributors be liable for any
13  * direct, indirect, incidental, special, exemplary, or consequential
14  * damages (including, but not limited to, procurement of substitute
15  * goods or services; loss of use, data, or profits; or business
16  * interruption) however caused and on any theory of liability, whether
17  * in contract, strict liability, or tort (including negligence or
18  * otherwise) arising in any way out of the use of this software, even
19  * if advised of the possibility of such damage.
20  */
21
22 #include <glib/glib.h>
23 #include <gio/gio.h>
24 #include <gio/gwin32inputstream.h>
25 #include <gio/gwin32outputstream.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <fcntl.h>
29 #include <io.h>
30 #include <unistd.h>
31
32 #include <windows.h>
33
34 #define DATA "abcdefghijklmnopqrstuvwxyz"
35
36 int writer_pipe[2], reader_pipe[2];
37 GCancellable *writer_cancel, *reader_cancel, *main_cancel;
38 GMainLoop *loop;
39
40 static gpointer
41 writer_thread (gpointer user_data)
42 {
43   GOutputStream *out;
44   gssize nwrote, offset;
45   GError *err = NULL;
46   HANDLE out_handle;
47
48   g_assert (DuplicateHandle (GetCurrentProcess (),
49                              (HANDLE) (gintptr) _get_osfhandle (writer_pipe[1]),
50                              GetCurrentProcess (),
51                              &out_handle,
52                              0, FALSE,
53                              DUPLICATE_SAME_ACCESS));
54   close (writer_pipe[1]);
55
56   out = g_win32_output_stream_new (out_handle, TRUE);
57   do
58     {
59       g_usleep (10);
60
61       offset = 0;
62       while (offset < (gssize) sizeof (DATA))
63         {
64           nwrote = g_output_stream_write (out, DATA + offset,
65                                           sizeof (DATA) - offset,
66                                           writer_cancel, &err);
67           if (nwrote <= 0 || err != NULL)
68             break;
69           offset += nwrote;
70         }
71
72       g_assert (nwrote > 0 || err != NULL);
73     }
74   while (err == NULL);
75
76   if (g_cancellable_is_cancelled (writer_cancel))
77     {
78       g_cancellable_cancel (main_cancel);
79       g_object_unref (out);
80       return NULL;
81     }
82
83   g_warning ("writer: %s", err->message);
84   g_assert_not_reached ();
85 }
86
87 static gpointer
88 reader_thread (gpointer user_data)
89 {
90   GInputStream *in;
91   gssize nread = 0, total;
92   GError *err = NULL;
93   char buf[sizeof (DATA)];
94   HANDLE in_handle;
95
96   g_assert (DuplicateHandle (GetCurrentProcess (),
97                              (HANDLE) (gintptr) _get_osfhandle (reader_pipe[0]),
98                              GetCurrentProcess (),
99                              &in_handle,
100                              0, FALSE,
101                              DUPLICATE_SAME_ACCESS));
102   close (reader_pipe[0]);
103
104   in = g_win32_input_stream_new (in_handle, TRUE);
105
106   do
107     {
108       total = 0;
109       while (total < (gssize) sizeof (DATA))
110         {
111           nread = g_input_stream_read (in, buf + total, sizeof (buf) - total,
112                                        reader_cancel, &err);
113           if (nread <= 0 || err != NULL)
114             break;
115           total += nread;
116         }
117
118       if (err)
119         break;
120
121       if (nread == 0)
122         {
123           g_assert (err == NULL);
124           /* pipe closed */
125           g_object_unref (in);
126           return NULL;
127         }
128
129       g_assert_cmpstr (buf, ==, DATA);
130       g_assert (!g_cancellable_is_cancelled (reader_cancel));
131     }
132   while (err == NULL);
133
134   g_warning ("reader: %s", err->message);
135   g_assert_not_reached ();
136 }
137
138 char main_buf[sizeof (DATA)];
139 gssize main_len, main_offset;
140
141 static void readable (GObject *source, GAsyncResult *res, gpointer user_data);
142 static void writable (GObject *source, GAsyncResult *res, gpointer user_data);
143
144 static void
145 do_main_cancel (GOutputStream *out)
146 {
147   g_output_stream_close (out, NULL, NULL);
148   g_main_loop_quit (loop);
149 }
150
151 static void
152 readable (GObject *source, GAsyncResult *res, gpointer user_data)
153 {
154   GInputStream *in = G_INPUT_STREAM (source);
155   GOutputStream *out = user_data;
156   GError *err = NULL;
157
158   main_len = g_input_stream_read_finish (in, res, &err);
159
160   if (g_cancellable_is_cancelled (main_cancel))
161     {
162       do_main_cancel (out);
163       return;
164     }
165
166   g_assert (err == NULL);
167
168   main_offset = 0;
169   g_output_stream_write_async (out, main_buf, main_len,
170                                G_PRIORITY_DEFAULT, main_cancel,
171                                writable, in);
172 }
173
174 static void
175 writable (GObject *source, GAsyncResult *res, gpointer user_data)
176 {
177   GOutputStream *out = G_OUTPUT_STREAM (source);
178   GInputStream *in = user_data;
179   GError *err = NULL;
180   gssize nwrote;
181
182   nwrote = g_output_stream_write_finish (out, res, &err);
183
184   if (g_cancellable_is_cancelled (main_cancel))
185     {
186       do_main_cancel (out);
187       return;
188     }
189
190   g_assert (err == NULL);
191   g_assert_cmpint (nwrote, <=, main_len - main_offset);
192
193   main_offset += nwrote;
194   if (main_offset == main_len)
195     {
196       g_input_stream_read_async (in, main_buf, sizeof (main_buf),
197                                  G_PRIORITY_DEFAULT, main_cancel,
198                                  readable, out);
199     }
200   else
201     {
202       g_output_stream_write_async (out, main_buf + main_offset,
203                                    main_len - main_offset,
204                                    G_PRIORITY_DEFAULT, main_cancel,
205                                    writable, in);
206     }
207 }
208
209 static gboolean
210 timeout (gpointer cancellable)
211 {
212   g_cancellable_cancel (cancellable);
213   return FALSE;
214 }
215
216 static void
217 test_pipe_io (void)
218 {
219   GThread *writer, *reader;
220   GInputStream *in;
221   GOutputStream *out;
222   HANDLE in_handle, out_handle;
223
224   /* Split off two (additional) threads, a reader and a writer. From
225    * the writer thread, write data synchronously in small chunks,
226    * which gets read asynchronously by the main thread and then
227    * written asynchronously to the reader thread, which reads it
228    * synchronously. Eventually a timeout in the main thread will cause
229    * it to cancel the writer thread, which will in turn cancel the
230    * read op in the main thread, which will then close the pipe to
231    * the reader thread, causing the read op to fail.
232    */
233
234   g_assert (_pipe (writer_pipe, 10, _O_BINARY) == 0 && _pipe (reader_pipe, 10, _O_BINARY) == 0);
235
236   writer_cancel = g_cancellable_new ();
237   reader_cancel = g_cancellable_new ();
238   main_cancel = g_cancellable_new ();
239
240   writer = g_thread_new ("writer", writer_thread, NULL);
241   reader = g_thread_new ("reader", reader_thread, NULL);
242
243   g_assert (DuplicateHandle (GetCurrentProcess (),
244                              (HANDLE) (gintptr) _get_osfhandle (writer_pipe[0]),
245                              GetCurrentProcess (),
246                              &in_handle,
247                              0, FALSE,
248                              DUPLICATE_SAME_ACCESS));
249   close (writer_pipe[0]);
250
251   g_assert (DuplicateHandle (GetCurrentProcess (),
252                              (HANDLE) (gintptr) _get_osfhandle (reader_pipe[1]),
253                              GetCurrentProcess (),
254                              &out_handle,
255                              0, FALSE,
256                              DUPLICATE_SAME_ACCESS));
257   close (reader_pipe[1]);
258
259   in = g_win32_input_stream_new (in_handle, TRUE);
260   out = g_win32_output_stream_new (out_handle, TRUE);
261
262   g_input_stream_read_async (in, main_buf, sizeof (main_buf),
263                              G_PRIORITY_DEFAULT, main_cancel,
264                              readable, out);
265
266   g_timeout_add (500, timeout, writer_cancel);
267
268   loop = g_main_loop_new (NULL, TRUE);
269   g_main_loop_run (loop);
270   g_main_loop_unref (loop);
271
272   g_thread_join (reader);
273   g_thread_join (writer);
274
275   g_object_unref (main_cancel);
276   g_object_unref (reader_cancel);
277   g_object_unref (writer_cancel);
278   g_object_unref (in);
279   g_object_unref (out);
280 }
281
282 typedef struct _PipeIOOverlapReader
283 {
284   char buf[sizeof (DATA)];
285   GInputStream *in;
286   GThread *thread;
287   GCancellable *cancellable;
288   gboolean success;
289 } PipeIOOverlapReader;
290
291 #define TEST_PIPE_IO_OVERLAP (1024 * 4)
292
293 static gpointer
294 pipe_io_overlap_reader_thread (gpointer user_data)
295 {
296   PipeIOOverlapReader *p = user_data;
297   GError *err = NULL;
298   gsize read;
299   guint i;
300
301   for (i = 0; i < TEST_PIPE_IO_OVERLAP; ++i) {
302     memset (p->buf, 0, sizeof (p->buf));
303     g_input_stream_read_all (p->in, p->buf, sizeof (p->buf),
304                              &read, NULL, &err);
305
306     g_assert_cmpuint (read, ==, sizeof (p->buf));
307     g_assert_no_error (err);
308     g_assert_cmpstr (p->buf, ==, DATA);
309   }
310
311   return NULL;
312 }
313
314 static gpointer
315 pipe_io_overlap_writer_thread (gpointer user_data)
316 {
317   GOutputStream *out = user_data;
318   GError *err = NULL;
319   gsize bytes_written;
320   guint i;
321
322   for (i = 0; i < TEST_PIPE_IO_OVERLAP; ++i) {
323     g_output_stream_write_all (out, DATA, sizeof (DATA),
324                                &bytes_written, NULL, &err);
325
326     g_assert_cmpuint (bytes_written, ==, sizeof (DATA));
327     g_assert_no_error (err);
328   }
329
330   return NULL;
331 }
332
333 static void
334 test_pipe_io_overlap (void)
335 {
336   GOutputStream *out_server, *out_client;
337   GThread *writer_server, *writer_client;
338   PipeIOOverlapReader rs, rc;
339   HANDLE server, client;
340   gchar name[256];
341
342   g_snprintf (name, sizeof (name),
343               "\\\\.\\pipe\\gtest-io-overlap-%u", (guint) getpid ());
344
345   server = CreateNamedPipe (name,
346                             PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
347                             PIPE_READMODE_BYTE | PIPE_WAIT,
348                             1, 0, 0, 0, NULL);
349   g_assert (server != INVALID_HANDLE_VALUE);
350
351   client = CreateFile (name, GENERIC_WRITE | GENERIC_READ, 0, NULL, OPEN_EXISTING, 0, NULL);
352   g_assert (client != INVALID_HANDLE_VALUE);
353
354   out_server = g_win32_output_stream_new (server, TRUE);
355   writer_server = g_thread_new ("writer_server", pipe_io_overlap_writer_thread, out_server);
356   rs.in = g_win32_input_stream_new (server, TRUE);
357   rs.thread = g_thread_new ("reader_server", pipe_io_overlap_reader_thread, &rs);
358
359   out_client = g_win32_output_stream_new (client, TRUE);
360   writer_client = g_thread_new ("writer_client", pipe_io_overlap_writer_thread, out_client);
361   rc.in = g_win32_input_stream_new (client, TRUE);
362   rc.thread = g_thread_new ("reader_client", pipe_io_overlap_reader_thread, &rc);
363
364   g_thread_join (writer_client);
365   g_thread_join (writer_server);
366   g_thread_join (rc.thread);
367   g_thread_join (rs.thread);
368
369   g_object_unref (rs.in);
370   g_object_unref (rc.in);
371   g_object_unref (out_server);
372   g_object_unref (out_client);
373 }
374
375 static gpointer
376 pipe_io_concurrent_writer_thread (gpointer user_data)
377 {
378   GOutputStream *out = user_data;
379   GError *err = NULL;
380   gsize bytes_written;
381
382   g_output_stream_write_all (out, DATA, 1, &bytes_written, NULL, &err);
383
384   g_assert_cmpuint (bytes_written, ==, 1);
385   g_assert_no_error (err);
386
387   return NULL;
388 }
389
390 static gpointer
391 pipe_io_concurrent_reader_thread (gpointer user_data)
392 {
393   PipeIOOverlapReader *p = user_data;
394   GError *err = NULL;
395   gsize read;
396
397   memset (p->buf, 0, sizeof (p->buf));
398   p->success = g_input_stream_read_all (p->in, p->buf, 1, &read, p->cancellable, &err);
399
400   /* only one thread will succeed, the other will be cancelled */
401   if (p->success)
402     {
403       /* continue the main thread */
404       write (writer_pipe[1], "", 1);
405       g_assert_cmpuint (read, ==, 1);
406       g_assert_no_error (err);
407     }
408
409   return NULL;
410 }
411
412 static void
413 test_pipe_io_concurrent (void)
414 {
415   GOutputStream *out_server;
416   GThread *writer_server;
417   PipeIOOverlapReader rc1, rc2;
418   HANDLE server, client;
419   gchar name[256], c;
420
421   g_snprintf (name, sizeof (name),
422               "\\\\.\\pipe\\gtest-io-concurrent-%u", (guint) getpid ());
423
424   server = CreateNamedPipe (name,
425                             PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
426                             PIPE_READMODE_BYTE | PIPE_WAIT,
427                             1, 0, 0, 0, NULL);
428   g_assert (server != INVALID_HANDLE_VALUE);
429   g_assert (_pipe (writer_pipe, 10, _O_BINARY) == 0);
430
431   client = CreateFile (name, GENERIC_WRITE | GENERIC_READ, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
432   g_assert (client != INVALID_HANDLE_VALUE);
433
434   rc1.in = g_win32_input_stream_new (client, TRUE);
435   rc1.success = FALSE;
436   rc1.cancellable = g_cancellable_new ();
437   rc1.thread = g_thread_new ("reader_client", pipe_io_concurrent_reader_thread, &rc1);
438
439   rc2.in = g_win32_input_stream_new (client, TRUE);
440   rc2.success = FALSE;
441   rc2.cancellable = g_cancellable_new ();
442   rc2.thread = g_thread_new ("reader_client", pipe_io_concurrent_reader_thread, &rc2);
443
444   /* FIXME: how to synchronize on both reader thread waiting in read,
445      before starting the writer thread? */
446   g_usleep (G_USEC_PER_SEC / 10);
447
448   out_server = g_win32_output_stream_new (server, TRUE);
449   writer_server = g_thread_new ("writer_server", pipe_io_concurrent_writer_thread, out_server);
450
451   read (writer_pipe[0], &c, 1);
452
453   g_assert (rc1.success ^ rc2.success);
454
455   g_cancellable_cancel (rc1.cancellable);
456   g_cancellable_cancel (rc2.cancellable);
457
458   g_thread_join (writer_server);
459   g_thread_join (rc1.thread);
460   g_thread_join (rc2.thread);
461
462   g_object_unref (rc1.in);
463   g_object_unref (rc2.in);
464   g_object_unref (out_server);
465
466   close (writer_pipe[0]);
467   close (writer_pipe[1]);
468 }
469
470 static void
471 readable_cancel (GObject *source, GAsyncResult *res, gpointer user_data)
472 {
473   GInputStream *in = G_INPUT_STREAM (source);
474   GError *err = NULL;
475   gssize len;
476
477   len = g_input_stream_read_finish (in, res, &err);
478   g_assert_cmpint (len, ==, -1);
479   g_assert_error (err, G_IO_ERROR, G_IO_ERROR_CANCELLED);
480   g_error_free (err);
481
482   g_main_loop_quit (loop);
483 }
484
485 static void
486 test_pipe_io_cancel (void)
487 {
488   GInputStream *in;
489   GOutputStream *out;
490   HANDLE in_handle, out_handle;
491   gchar name[256];
492
493   g_snprintf (name, sizeof (name),
494               "\\\\.\\pipe\\gtest-io-cancel-%u", (guint) getpid ());
495
496   in_handle = CreateNamedPipe (name,
497                                PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
498                                PIPE_READMODE_BYTE | PIPE_WAIT,
499                                1, 0, 0, 0, NULL);
500   g_assert (in_handle != INVALID_HANDLE_VALUE);
501
502   out_handle = CreateFile (name, GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, NULL);
503   g_assert (out_handle != INVALID_HANDLE_VALUE);
504
505   in = g_win32_input_stream_new (in_handle, TRUE);
506   out = g_win32_output_stream_new (out_handle, TRUE);
507
508   reader_cancel = g_cancellable_new ();
509   g_input_stream_read_async (in, main_buf, sizeof (main_buf),
510                              G_PRIORITY_DEFAULT, reader_cancel,
511                              readable_cancel, out);
512
513   g_timeout_add (500, timeout, reader_cancel);
514
515   loop = g_main_loop_new (NULL, TRUE);
516   g_main_loop_run (loop);
517   g_main_loop_unref (loop);
518
519   g_object_unref (reader_cancel);
520   g_object_unref (in);
521   g_object_unref (out);
522 }
523
524 int
525 main (int   argc,
526       char *argv[])
527 {
528   g_type_init ();
529   g_test_init (&argc, &argv, NULL);
530
531   g_test_add_func ("/win32-streams/pipe-io-test", test_pipe_io);
532   g_test_add_func ("/win32-streams/pipe-io-cancel-test", test_pipe_io_cancel);
533   g_test_add_func ("/win32-streams/pipe-io-overlap-test", test_pipe_io_overlap);
534   g_test_add_func ("/win32-streams/pipe-io-concurrent-test", test_pipe_io_concurrent);
535
536   return g_test_run();
537 }