char main_buf[sizeof (DATA)];
gssize main_len, main_offset;
-static void readable (GObject *source, GAsyncResult *res, gpointer user_data);
-static void writable (GObject *source, GAsyncResult *res, gpointer user_data);
+static void main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data);
+static void main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data);
+static void main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data);
static void
do_main_cancel (GOutputStream *out)
}
static void
-readable (GObject *source, GAsyncResult *res, gpointer user_data)
+main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data)
{
GInputStream *in = G_INPUT_STREAM (source);
GOutputStream *out = user_data;
GError *err = NULL;
+ gssize nskipped;
- main_len = g_input_stream_read_finish (in, res, &err);
+ nskipped = g_input_stream_skip_finish (in, res, &err);
if (g_cancellable_is_cancelled (main_cancel))
{
return;
}
- g_assert (err == NULL);
+ g_assert_no_error (err);
- main_offset = 0;
- g_output_stream_write_async (out, main_buf, main_len,
- G_PRIORITY_DEFAULT, main_cancel,
- writable, in);
+ main_offset += nskipped;
+ if (main_offset == main_len)
+ {
+ main_offset = 0;
+ g_output_stream_write_async (out, main_buf, main_len,
+ G_PRIORITY_DEFAULT, main_cancel,
+ main_thread_wrote, in);
+ }
+ else
+ {
+ g_input_stream_skip_async (in, main_len - main_offset,
+ G_PRIORITY_DEFAULT, main_cancel,
+ main_thread_skipped, out);
+ }
}
static void
-writable (GObject *source, GAsyncResult *res, gpointer user_data)
+main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data)
+{
+ GInputStream *in = G_INPUT_STREAM (source);
+ GOutputStream *out = user_data;
+ GError *err = NULL;
+ gssize nread;
+
+ nread = g_input_stream_read_finish (in, res, &err);
+
+ if (g_cancellable_is_cancelled (main_cancel))
+ {
+ do_main_cancel (out);
+ return;
+ }
+
+ g_assert_no_error (err);
+
+ main_offset += nread;
+ if (main_offset == sizeof (DATA))
+ {
+ main_len = main_offset;
+ main_offset = 0;
+ /* Now skip the same amount */
+ g_input_stream_skip_async (in, main_len,
+ G_PRIORITY_DEFAULT, main_cancel,
+ main_thread_skipped, out);
+ }
+ else
+ {
+ g_input_stream_read_async (in, main_buf, sizeof (main_buf),
+ G_PRIORITY_DEFAULT, main_cancel,
+ main_thread_read, out);
+ }
+}
+
+static void
+main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data)
{
GOutputStream *out = G_OUTPUT_STREAM (source);
GInputStream *in = user_data;
return;
}
- g_assert (err == NULL);
+ g_assert_no_error (err);
g_assert_cmpint (nwrote, <=, main_len - main_offset);
main_offset += nwrote;
if (main_offset == main_len)
{
+ main_offset = 0;
g_input_stream_read_async (in, main_buf, sizeof (main_buf),
G_PRIORITY_DEFAULT, main_cancel,
- readable, out);
+ main_thread_read, out);
}
else
{
g_output_stream_write_async (out, main_buf + main_offset,
main_len - main_offset,
G_PRIORITY_DEFAULT, main_cancel,
- writable, in);
+ main_thread_wrote, in);
}
}
/* Split off two (additional) threads, a reader and a writer. From
* the writer thread, write data synchronously in small chunks,
- * which gets read asynchronously by the main thread and then
- * written asynchronously to the reader thread, which reads it
- * synchronously. Eventually a timeout in the main thread will cause
- * it to cancel the writer thread, which will in turn cancel the
- * read op in the main thread, which will then close the pipe to
- * the reader thread, causing the read op to fail.
+ * which gets alternately read and skipped asynchronously by the
+ * main thread and then (if not skipped) written asynchronously to
+ * the reader thread, which reads it synchronously. Eventually a
+ * timeout in the main thread will cause it to cancel the writer
+ * thread, which will in turn cancel the read op in the main thread,
+ * which will then close the pipe to the reader thread, causing the
+ * read op to fail.
*/
g_assert (pipe (writer_pipe) == 0 && pipe (reader_pipe) == 0);
g_input_stream_read_async (in, main_buf, sizeof (main_buf),
G_PRIORITY_DEFAULT, main_cancel,
- readable, out);
+ main_thread_read, out);
g_timeout_add (500, timeout, writer_cancel);
g_assert (!g_unix_input_stream_get_close_fd (is));
g_assert_cmpint (g_unix_input_stream_get_fd (is), ==, 0);
+ g_assert (!g_input_stream_has_pending (G_INPUT_STREAM (is)));
+
g_object_unref (is);
os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (1, TRUE));
g_assert (!g_unix_output_stream_get_close_fd (os));
g_assert_cmpint (g_unix_output_stream_get_fd (os), ==, 1);
+ g_assert (!g_output_stream_has_pending (G_OUTPUT_STREAM (os)));
+
g_object_unref (os);
}