gkdbus: Fix underflow and unreachable code bug
[platform/upstream/glib.git] / gio / tests / unix-streams.c
1 /* GLib testing framework examples and tests
2  * Copyright (C) 2008 Red Hat, Inc
3  *
4  * SPDX-License-Identifier: LicenseRef-old-glib-tests
5  *
6  * This work is provided "as is"; redistribution and modification
7  * in whole or in part, in any medium, physical or electronic is
8  * permitted without restriction.
9  *
10  * This work is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13  *
14  * In no event shall the authors or contributors be liable for any
15  * direct, indirect, incidental, special, exemplary, or consequential
16  * damages (including, but not limited to, procurement of substitute
17  * goods or services; loss of use, data, or profits; or business
18  * interruption) however caused and on any theory of liability, whether
19  * in contract, strict liability, or tort (including negligence or
20  * otherwise) arising in any way out of the use of this software, even
21  * if advised of the possibility of such damage.
22  */
23
24 #include <gio/gio.h>
25 #include <gio/gunixinputstream.h>
26 #include <gio/gunixoutputstream.h>
27 #include <glib.h>
28 #include <glib/glib-unix.h>
29 #include <signal.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <unistd.h>
33 #include <fcntl.h>
34
35 /* sizeof(DATA) will give the number of bytes in the array, plus the terminating nul */
36 static const gchar DATA[] = "abcdefghijklmnopqrstuvwxyz";
37
38 int writer_pipe[2], reader_pipe[2];
39 GCancellable *writer_cancel, *reader_cancel, *main_cancel;
40 GMainLoop *loop;
41
42
43 static gpointer
44 writer_thread (gpointer user_data)
45 {
46   GOutputStream *out;
47   gssize nwrote, offset;
48   GError *err = NULL;
49
50   out = g_unix_output_stream_new (writer_pipe[1], TRUE);
51
52   do
53     {
54       g_usleep (10);
55
56       offset = 0;
57       while (offset < (gssize) sizeof (DATA))
58         {
59           nwrote = g_output_stream_write (out, DATA + offset,
60                                           sizeof (DATA) - offset,
61                                           writer_cancel, &err);
62           if (nwrote <= 0 || err != NULL)
63             break;
64           offset += nwrote;
65         }
66
67       g_assert_true (nwrote > 0 || err != NULL);
68     }
69   while (err == NULL);
70
71   if (g_cancellable_is_cancelled (writer_cancel))
72     {
73       g_clear_error (&err);
74       g_cancellable_cancel (main_cancel);
75       g_object_unref (out);
76       return NULL;
77     }
78
79   g_warning ("writer: %s", err->message);
80   g_assert_not_reached ();
81 }
82
83 static gpointer
84 reader_thread (gpointer user_data)
85 {
86   GInputStream *in;
87   gssize nread = 0, total;
88   GError *err = NULL;
89   char buf[sizeof (DATA)];
90
91   in = g_unix_input_stream_new (reader_pipe[0], TRUE);
92
93   do
94     {
95       total = 0;
96       while (total < (gssize) sizeof (DATA))
97         {
98           nread = g_input_stream_read (in, buf + total, sizeof (buf) - total,
99                                        reader_cancel, &err);
100           if (nread <= 0 || err != NULL)
101             break;
102           total += nread;
103         }
104
105       if (err)
106         break;
107
108       if (nread == 0)
109         {
110           g_assert_no_error (err);
111           /* pipe closed */
112           g_object_unref (in);
113           return NULL;
114         }
115
116       g_assert_cmpstr (buf, ==, DATA);
117       g_assert_false (g_cancellable_is_cancelled (reader_cancel));
118     }
119   while (err == NULL);
120
121   g_warning ("reader: %s", err->message);
122   g_assert_not_reached ();
123 }
124
125 static char main_buf[sizeof (DATA)];
126 static gssize main_len, main_offset;
127
128 static void main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data);
129 static void main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data);
130 static void main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data);
131
132 static void
133 do_main_cancel (GOutputStream *out)
134 {
135   g_output_stream_close (out, NULL, NULL);
136   g_main_loop_quit (loop);
137 }
138
139 static void
140 main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data)
141 {
142   GInputStream *in = G_INPUT_STREAM (source);
143   GOutputStream *out = user_data;
144   GError *err = NULL;
145   gssize nskipped;
146
147   nskipped = g_input_stream_skip_finish (in, res, &err);
148
149   if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
150     {
151       g_assert_true (g_cancellable_is_cancelled (main_cancel));
152       do_main_cancel (out);
153       g_clear_error (&err);
154       return;
155     }
156
157   g_assert_no_error (err);
158
159   main_offset += nskipped;
160   if (main_offset == main_len)
161     {
162       main_offset = 0;
163       g_output_stream_write_async (out, main_buf, main_len,
164                                    G_PRIORITY_DEFAULT, main_cancel,
165                                    main_thread_wrote, in);
166     }
167   else
168     {
169       g_input_stream_skip_async (in, main_len - main_offset,
170                                  G_PRIORITY_DEFAULT, main_cancel,
171                                  main_thread_skipped, out);
172     }
173 }
174
175 static void
176 main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data)
177 {
178   GInputStream *in = G_INPUT_STREAM (source);
179   GOutputStream *out = user_data;
180   GError *err = NULL;
181   gssize nread;
182
183   nread = g_input_stream_read_finish (in, res, &err);
184
185   if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
186     {
187       g_assert_true (g_cancellable_is_cancelled (main_cancel));
188       do_main_cancel (out);
189       g_clear_error (&err);
190       return;
191     }
192
193   g_assert_no_error (err);
194
195   main_offset += nread;
196   if (main_offset == sizeof (DATA))
197     {
198       main_len = main_offset;
199       main_offset = 0;
200       /* Now skip the same amount */
201       g_input_stream_skip_async (in, main_len,
202                                  G_PRIORITY_DEFAULT, main_cancel,
203                                  main_thread_skipped, out);
204     }
205   else
206     {
207       g_input_stream_read_async (in, main_buf, sizeof (main_buf),
208                                  G_PRIORITY_DEFAULT, main_cancel,
209                                  main_thread_read, out);
210     }
211 }
212
213 static void
214 main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data)
215 {
216   GOutputStream *out = G_OUTPUT_STREAM (source);
217   GInputStream *in = user_data;
218   GError *err = NULL;
219   gssize nwrote;
220
221   nwrote = g_output_stream_write_finish (out, res, &err);
222
223   if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
224     {
225       g_assert_true (g_cancellable_is_cancelled (main_cancel));
226       do_main_cancel (out);
227       g_clear_error (&err);
228       return;
229     }
230
231   g_assert_no_error (err);
232   g_assert_cmpint (nwrote, <=, main_len - main_offset);
233
234   main_offset += nwrote;
235   if (main_offset == main_len)
236     {
237       main_offset = 0;
238       g_input_stream_read_async (in, main_buf, sizeof (main_buf),
239                                  G_PRIORITY_DEFAULT, main_cancel,
240                                  main_thread_read, out);
241     }
242   else
243     {
244       g_output_stream_write_async (out, main_buf + main_offset,
245                                    main_len - main_offset,
246                                    G_PRIORITY_DEFAULT, main_cancel,
247                                    main_thread_wrote, in);
248     }
249 }
250
251 static gboolean
252 timeout (gpointer cancellable)
253 {
254   g_cancellable_cancel (cancellable);
255   return FALSE;
256 }
257
258 static void
259 test_pipe_io (gconstpointer nonblocking)
260 {
261   GThread *writer, *reader;
262   GInputStream *in;
263   GOutputStream *out;
264
265   /* Split off two (additional) threads, a reader and a writer. From
266    * the writer thread, write data synchronously in small chunks,
267    * which gets alternately read and skipped asynchronously by the
268    * main thread and then (if not skipped) written asynchronously to
269    * the reader thread, which reads it synchronously. Eventually a
270    * timeout in the main thread will cause it to cancel the writer
271    * thread, which will in turn cancel the read op in the main thread,
272    * which will then close the pipe to the reader thread, causing the
273    * read op to fail.
274    */
275
276   g_assert_true (pipe (writer_pipe) == 0 && pipe (reader_pipe) == 0);
277
278   if (nonblocking)
279     {
280       GError *error = NULL;
281
282       g_unix_set_fd_nonblocking (writer_pipe[0], TRUE, &error);
283       g_assert_no_error (error);
284       g_unix_set_fd_nonblocking (writer_pipe[1], TRUE, &error);
285       g_assert_no_error (error);
286       g_unix_set_fd_nonblocking (reader_pipe[0], TRUE, &error);
287       g_assert_no_error (error);
288       g_unix_set_fd_nonblocking (reader_pipe[1], TRUE, &error);
289       g_assert_no_error (error);
290     }
291
292   writer_cancel = g_cancellable_new ();
293   reader_cancel = g_cancellable_new ();
294   main_cancel = g_cancellable_new ();
295
296   writer = g_thread_new ("writer", writer_thread, NULL);
297   reader = g_thread_new ("reader", reader_thread, NULL);
298
299   in = g_unix_input_stream_new (writer_pipe[0], TRUE);
300   out = g_unix_output_stream_new (reader_pipe[1], TRUE);
301
302   g_input_stream_read_async (in, main_buf, sizeof (main_buf),
303                              G_PRIORITY_DEFAULT, main_cancel,
304                              main_thread_read, out);
305
306   g_timeout_add (500, timeout, writer_cancel);
307
308   loop = g_main_loop_new (NULL, TRUE);
309   g_main_loop_run (loop);
310   g_main_loop_unref (loop);
311
312   g_thread_join (reader);
313   g_thread_join (writer);
314
315   g_object_unref (main_cancel);
316   g_object_unref (reader_cancel);
317   g_object_unref (writer_cancel);
318   g_object_unref (in);
319   g_object_unref (out);
320 }
321
322 static void
323 test_basic (void)
324 {
325   GUnixInputStream *is;
326   GUnixOutputStream *os;
327   gint fd;
328   gboolean close_fd;
329
330   is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (0, TRUE));
331   g_object_get (is,
332                 "fd", &fd,
333                 "close-fd", &close_fd,
334                 NULL);
335   g_assert_cmpint (fd, ==, 0);
336   g_assert_true (close_fd);
337
338   g_unix_input_stream_set_close_fd (is, FALSE);
339   g_assert_false (g_unix_input_stream_get_close_fd (is));
340   g_assert_cmpint (g_unix_input_stream_get_fd (is), ==, 0);
341
342   g_assert_false (g_input_stream_has_pending (G_INPUT_STREAM (is)));
343
344   g_object_unref (is);
345
346   os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (1, TRUE));
347   g_object_get (os,
348                 "fd", &fd,
349                 "close-fd", &close_fd,
350                 NULL);
351   g_assert_cmpint (fd, ==, 1);
352   g_assert_true (close_fd);
353
354   g_unix_output_stream_set_close_fd (os, FALSE);
355   g_assert_false (g_unix_output_stream_get_close_fd (os));
356   g_assert_cmpint (g_unix_output_stream_get_fd (os), ==, 1);
357
358   g_assert_false (g_output_stream_has_pending (G_OUTPUT_STREAM (os)));
359
360   g_object_unref (os);
361 }
362
363 typedef struct {
364   GInputStream *is;
365   GOutputStream *os;
366   const guint8 *write_data;
367   guint8 *read_data;
368 } TestReadWriteData;
369
370 static gpointer
371 test_read_write_write_thread (gpointer user_data)
372 {
373   TestReadWriteData *data = user_data;
374   gsize bytes_written;
375   GError *error = NULL;
376   gboolean res;
377
378   res = g_output_stream_write_all (data->os, data->write_data, 1024, &bytes_written, NULL, &error);
379   g_assert_true (res);
380   g_assert_no_error (error);
381   g_assert_cmpuint (bytes_written, ==, 1024);
382
383   return NULL;
384 }
385
386 static gpointer
387 test_read_write_read_thread (gpointer user_data)
388 {
389   TestReadWriteData *data = user_data;
390   gsize bytes_read;
391   GError *error = NULL;
392   gboolean res;
393
394   res = g_input_stream_read_all (data->is, data->read_data, 1024, &bytes_read, NULL, &error);
395   g_assert_true (res);
396   g_assert_no_error (error);
397   g_assert_cmpuint (bytes_read, ==, 1024);
398
399   return NULL;
400 }
401
402 static gpointer
403 test_read_write_writev_thread (gpointer user_data)
404 {
405   TestReadWriteData *data = user_data;
406   gsize bytes_written;
407   GError *error = NULL;
408   gboolean res;
409   GOutputVector vectors[3];
410
411   vectors[0].buffer = data->write_data;
412   vectors[0].size = 256;
413   vectors[1].buffer = data->write_data + 256;
414   vectors[1].size = 256;
415   vectors[2].buffer = data->write_data + 512;
416   vectors[2].size = 512;
417
418   res = g_output_stream_writev_all (data->os, vectors, G_N_ELEMENTS (vectors), &bytes_written, NULL, &error);
419   g_assert_true (res);
420   g_assert_no_error (error);
421   g_assert_cmpuint (bytes_written, ==, 1024);
422
423   return NULL;
424 }
425
426 /* test if normal writing/reading from a pipe works */
427 static void
428 test_read_write (gconstpointer user_data)
429 {
430   gboolean writev = GPOINTER_TO_INT (user_data);
431   GUnixInputStream *is;
432   GUnixOutputStream *os;
433   gint fd[2];
434   guint8 data_write[1024], data_read[1024];
435   guint i;
436   GThread *write_thread, *read_thread;
437   TestReadWriteData data;
438
439   for (i = 0; i < sizeof (data_write); i++)
440     data_write[i] = i;
441
442   g_assert_cmpint (pipe (fd), ==, 0);
443
444   is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
445   os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
446
447   data.is = G_INPUT_STREAM (is);
448   data.os = G_OUTPUT_STREAM (os);
449   data.read_data = data_read;
450   data.write_data = data_write;
451
452   if (writev)
453     write_thread = g_thread_new ("writer", test_read_write_writev_thread, &data);
454   else
455     write_thread = g_thread_new ("writer", test_read_write_write_thread, &data);
456   read_thread = g_thread_new ("reader", test_read_write_read_thread, &data);
457
458   g_thread_join (write_thread);
459   g_thread_join (read_thread);
460
461   g_assert_cmpmem (data_write, sizeof data_write, data_read, sizeof data_read);
462
463   g_object_unref (os);
464   g_object_unref (is);
465 }
466
467 /* test if g_pollable_output_stream_write_nonblocking() and
468  * g_pollable_output_stream_read_nonblocking() correctly return WOULD_BLOCK
469  * and correctly reset their status afterwards again, and all data that is
470  * written can also be read again.
471  */
472 static void
473 test_write_wouldblock (void)
474 {
475 #ifndef F_GETPIPE_SZ
476   g_test_skip ("F_GETPIPE_SZ not defined");
477 #else  /* if F_GETPIPE_SZ */
478   GUnixInputStream *is;
479   GUnixOutputStream *os;
480   gint fd[2];
481   GError *err = NULL;
482   guint8 data_write[1024], data_read[1024];
483   gsize i;
484   int retval;
485   gsize pipe_capacity;
486
487   for (i = 0; i < sizeof (data_write); i++)
488     data_write[i] = i;
489
490   g_assert_cmpint (pipe (fd), ==, 0);
491
492   g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
493   retval = fcntl (fd[0], F_GETPIPE_SZ);
494   g_assert_cmpint (retval, >=, 0);
495   pipe_capacity = (gsize) retval;
496   g_assert_cmpint (pipe_capacity, >=, 4096);
497   g_assert_cmpint (pipe_capacity % 1024, >=, 0);
498
499   is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
500   os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
501
502   /* Run the whole thing three times to make sure that the streams
503    * reset the writability/readability state again */
504   for (i = 0; i < 3; i++) {
505     gssize written = 0, written_complete = 0;
506     gssize read = 0, read_complete = 0;
507
508     do
509       {
510         written_complete += written;
511         written = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (os),
512                                                               data_write,
513                                                               sizeof (data_write),
514                                                               NULL,
515                                                               &err);
516       }
517     while (written > 0);
518
519     g_assert_cmpuint (written_complete, >, 0);
520     g_assert_nonnull (err);
521     g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
522     g_clear_error (&err);
523
524     do
525       {
526         read_complete += read;
527         read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
528                                                          data_read,
529                                                          sizeof (data_read),
530                                                          NULL,
531                                                          &err);
532         if (read > 0)
533           g_assert_cmpmem (data_read, read, data_write, sizeof (data_write));
534       }
535     while (read > 0);
536
537     g_assert_cmpuint (read_complete, ==, written_complete);
538     g_assert_nonnull (err);
539     g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
540     g_clear_error (&err);
541   }
542
543   g_object_unref (os);
544   g_object_unref (is);
545 #endif  /* if F_GETPIPE_SZ */
546 }
547
548 /* test if g_pollable_output_stream_writev_nonblocking() and
549  * g_pollable_output_stream_read_nonblocking() correctly return WOULD_BLOCK
550  * and correctly reset their status afterwards again, and all data that is
551  * written can also be read again.
552  */
553 static void
554 test_writev_wouldblock (void)
555 {
556 #ifndef F_GETPIPE_SZ
557   g_test_skip ("F_GETPIPE_SZ not defined");
558 #else  /* if F_GETPIPE_SZ */
559   GUnixInputStream *is;
560   GUnixOutputStream *os;
561   gint fd[2];
562   GError *err = NULL;
563   guint8 data_write[1024], data_read[1024];
564   gsize i;
565   int retval;
566   gsize pipe_capacity;
567   GOutputVector vectors[4];
568   GPollableReturn res;
569
570   for (i = 0; i < sizeof (data_write); i++)
571     data_write[i] = i;
572
573   g_assert_cmpint (pipe (fd), ==, 0);
574
575   g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
576   retval = fcntl (fd[0], F_GETPIPE_SZ);
577   g_assert_cmpint (retval, >=, 0);
578   pipe_capacity = (gsize) retval;
579   g_assert_cmpint (pipe_capacity, >=, 4096);
580   g_assert_cmpint (pipe_capacity % 1024, >=, 0);
581
582   is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
583   os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
584
585   /* Run the whole thing three times to make sure that the streams
586    * reset the writability/readability state again */
587   for (i = 0; i < 3; i++) {
588     gsize written = 0, written_complete = 0;
589     gssize read = 0, read_complete = 0;
590
591     do
592     {
593         written_complete += written;
594
595         vectors[0].buffer = data_write;
596         vectors[0].size = 256;
597         vectors[1].buffer = data_write + 256;
598         vectors[1].size = 256;
599         vectors[2].buffer = data_write + 512;
600         vectors[2].size = 256;
601         vectors[3].buffer = data_write + 768;
602         vectors[3].size = 256;
603
604         res = g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM (os),
605                                                            vectors,
606                                                            G_N_ELEMENTS (vectors),
607                                                            &written,
608                                                            NULL,
609                                                            &err);
610       }
611     while (res == G_POLLABLE_RETURN_OK);
612
613     g_assert_cmpuint (written_complete, >, 0);
614     g_assert_null (err);
615     g_assert_cmpint (res, ==, G_POLLABLE_RETURN_WOULD_BLOCK);
616     /* writev() on UNIX streams either succeeds fully or not at all */
617     g_assert_cmpuint (written, ==, 0);
618
619     do
620       {
621         read_complete += read;
622         read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
623                                                          data_read,
624                                                          sizeof (data_read),
625                                                          NULL,
626                                                          &err);
627         if (read > 0)
628           g_assert_cmpmem (data_read, read, data_write, sizeof (data_write));
629       }
630     while (read > 0);
631
632     g_assert_cmpuint (read_complete, ==, written_complete);
633     g_assert_nonnull (err);
634     g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
635     g_clear_error (&err);
636   }
637
638   g_object_unref (os);
639   g_object_unref (is);
640 #endif  /* if F_GETPIPE_SZ */
641 }
642
643 #ifdef F_GETPIPE_SZ
644 static void
645 write_async_wouldblock_cb (GUnixOutputStream *os,
646                            GAsyncResult      *result,
647                            gpointer           user_data)
648 {
649   gsize *bytes_written = user_data;
650   GError *err = NULL;
651
652   g_output_stream_write_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, &err);
653   g_assert_no_error (err);
654 }
655
656 static void
657 read_async_wouldblock_cb (GUnixInputStream  *is,
658                           GAsyncResult      *result,
659                           gpointer           user_data)
660 {
661   gsize *bytes_read = user_data;
662   GError *err = NULL;
663
664   g_input_stream_read_all_finish (G_INPUT_STREAM (is), result, bytes_read, &err);
665   g_assert_no_error (err);
666 }
667 #endif  /* if F_GETPIPE_SZ */
668
669 /* test if the async implementation of write_all() and read_all() in G*Stream
670  * around the GPollable*Stream API is working correctly.
671  */
672 static void
673 test_write_async_wouldblock (void)
674 {
675 #ifndef F_GETPIPE_SZ
676   g_test_skip ("F_GETPIPE_SZ not defined");
677 #else  /* if F_GETPIPE_SZ */
678   GUnixInputStream *is;
679   GUnixOutputStream *os;
680   gint fd[2];
681   guint8 *data, *data_read;
682   gsize i;
683   int retval;
684   gsize pipe_capacity;
685   gsize bytes_written = 0, bytes_read = 0;
686
687   g_assert_cmpint (pipe (fd), ==, 0);
688
689   /* FIXME: These should not be needed but otherwise
690    * g_unix_output_stream_write() will block because
691    *   a) the fd is writable
692    *   b) writing 4x capacity will block because writes are atomic
693    *   c) the fd is blocking
694    *
695    * See https://gitlab.gnome.org/GNOME/glib/issues/1654
696    */
697   g_unix_set_fd_nonblocking (fd[0], TRUE, NULL);
698   g_unix_set_fd_nonblocking (fd[1], TRUE, NULL);
699
700   g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
701   retval = fcntl (fd[0], F_GETPIPE_SZ);
702   g_assert_cmpint (retval, >=, 0);
703   pipe_capacity = (gsize) retval;
704   g_assert_cmpint (pipe_capacity, >=, 4096);
705
706   data = g_new (guint8, 4 * pipe_capacity);
707   for (i = 0; i < 4 * pipe_capacity; i++)
708     data[i] = i;
709   data_read = g_new (guint8, 4 * pipe_capacity);
710
711   is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
712   os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
713
714   g_output_stream_write_all_async (G_OUTPUT_STREAM (os),
715                                    data,
716                                    4 * pipe_capacity,
717                                    G_PRIORITY_DEFAULT,
718                                    NULL,
719                                    (GAsyncReadyCallback) write_async_wouldblock_cb,
720                                    &bytes_written);
721
722   g_input_stream_read_all_async (G_INPUT_STREAM (is),
723                                  data_read,
724                                  4 * pipe_capacity,
725                                  G_PRIORITY_DEFAULT,
726                                  NULL,
727                                  (GAsyncReadyCallback) read_async_wouldblock_cb,
728                                  &bytes_read);
729
730   while (bytes_written == 0 && bytes_read == 0)
731     g_main_context_iteration (NULL, TRUE);
732
733   g_assert_cmpuint (bytes_written, ==, 4 * pipe_capacity);
734   g_assert_cmpuint (bytes_read, ==, 4 * pipe_capacity);
735   g_assert_cmpmem (data_read, bytes_read, data, bytes_written);
736
737   g_free (data);
738   g_free (data_read);
739
740   g_object_unref (os);
741   g_object_unref (is);
742 #endif  /* if F_GETPIPE_SZ */
743 }
744
745 #ifdef F_GETPIPE_SZ
746 static void
747 writev_async_wouldblock_cb (GUnixOutputStream *os,
748                             GAsyncResult      *result,
749                             gpointer           user_data)
750 {
751   gsize *bytes_written = user_data;
752   GError *err = NULL;
753
754   g_output_stream_writev_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, &err);
755   g_assert_no_error (err);
756 }
757 #endif  /* if F_GETPIPE_SZ */
758
759 /* test if the async implementation of writev_all() and read_all() in G*Stream
760  * around the GPollable*Stream API is working correctly.
761  */
762 static void
763 test_writev_async_wouldblock (void)
764 {
765 #ifndef F_GETPIPE_SZ
766   g_test_skip ("F_GETPIPE_SZ not defined");
767 #else  /* if F_GETPIPE_SZ */
768   GUnixInputStream *is;
769   GUnixOutputStream *os;
770   gint fd[2];
771   guint8 *data, *data_read;
772   gsize i;
773   int retval;
774   gsize pipe_capacity;
775   gsize bytes_written = 0, bytes_read = 0;
776   GOutputVector vectors[4];
777
778   g_assert_cmpint (pipe (fd), ==, 0);
779
780   /* FIXME: These should not be needed but otherwise
781    * g_unix_output_stream_writev() will block because
782    *   a) the fd is writable
783    *   b) writing 4x capacity will block because writes are atomic
784    *   c) the fd is blocking
785    *
786    * See https://gitlab.gnome.org/GNOME/glib/issues/1654
787    */
788   g_unix_set_fd_nonblocking (fd[0], TRUE, NULL);
789   g_unix_set_fd_nonblocking (fd[1], TRUE, NULL);
790
791   g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
792   retval = fcntl (fd[0], F_GETPIPE_SZ);
793   g_assert_cmpint (retval, >=, 0);
794   pipe_capacity = (gsize) retval;
795   g_assert_cmpint (pipe_capacity, >=, 4096);
796
797   data = g_new (guint8, 4 * pipe_capacity);
798   for (i = 0; i < 4 * pipe_capacity; i++)
799     data[i] = i;
800   data_read = g_new (guint8, 4 * pipe_capacity);
801
802   vectors[0].buffer = data;
803   vectors[0].size = 1024;
804   vectors[1].buffer = data + 1024;
805   vectors[1].size = 1024;
806   vectors[2].buffer = data + 2048;
807   vectors[2].size = 1024;
808   vectors[3].buffer = data + 3072;
809   vectors[3].size = 4 * pipe_capacity - 3072;
810
811   is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
812   os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
813
814   g_output_stream_writev_all_async (G_OUTPUT_STREAM (os),
815                                     vectors,
816                                     G_N_ELEMENTS (vectors),
817                                     G_PRIORITY_DEFAULT,
818                                     NULL,
819                                     (GAsyncReadyCallback) writev_async_wouldblock_cb,
820                                     &bytes_written);
821
822   g_input_stream_read_all_async (G_INPUT_STREAM (is),
823                                  data_read,
824                                  4 * pipe_capacity,
825                                  G_PRIORITY_DEFAULT,
826                                  NULL,
827                                  (GAsyncReadyCallback) read_async_wouldblock_cb,
828                                  &bytes_read);
829
830   while (bytes_written == 0 && bytes_read == 0)
831     g_main_context_iteration (NULL, TRUE);
832
833   g_assert_cmpuint (bytes_written, ==, 4 * pipe_capacity);
834   g_assert_cmpuint (bytes_read, ==, 4 * pipe_capacity);
835   g_assert_cmpmem (data_read, bytes_read, data, bytes_written);
836
837   g_free (data);
838   g_free (data_read);
839
840   g_object_unref (os);
841   g_object_unref (is);
842 #endif  /* F_GETPIPE_SZ */
843 }
844
845 int
846 main (int   argc,
847       char *argv[])
848 {
849   g_test_init (&argc, &argv, NULL);
850
851   g_test_add_func ("/unix-streams/basic", test_basic);
852   g_test_add_data_func ("/unix-streams/pipe-io-test",
853                         GINT_TO_POINTER (FALSE),
854                         test_pipe_io);
855   g_test_add_data_func ("/unix-streams/nonblocking-io-test",
856                         GINT_TO_POINTER (TRUE),
857                         test_pipe_io);
858
859   g_test_add_data_func ("/unix-streams/read_write",
860                         GINT_TO_POINTER (FALSE),
861                         test_read_write);
862
863   g_test_add_data_func ("/unix-streams/read_writev",
864                         GINT_TO_POINTER (TRUE),
865                         test_read_write);
866
867   g_test_add_func ("/unix-streams/write-wouldblock",
868                    test_write_wouldblock);
869   g_test_add_func ("/unix-streams/writev-wouldblock",
870                    test_writev_wouldblock);
871
872   g_test_add_func ("/unix-streams/write-async-wouldblock",
873                    test_write_async_wouldblock);
874   g_test_add_func ("/unix-streams/writev-async-wouldblock",
875                    test_writev_async_wouldblock);
876
877   return g_test_run();
878 }