gsize count,
GCancellable *cancellable,
GError **error);
-static void g_buffered_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data);
-static gssize g_buffered_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error);
static gssize g_buffered_input_stream_real_fill (GBufferedInputStream *stream,
gssize count,
GCancellable *cancellable,
istream_class->skip_async = g_buffered_input_stream_skip_async;
istream_class->skip_finish = g_buffered_input_stream_skip_finish;
istream_class->read_fn = g_buffered_input_stream_read;
- istream_class->read_async = g_buffered_input_stream_read_async;
- istream_class->read_finish = g_buffered_input_stream_read_finish;
bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
bstream_class->fill = g_buffered_input_stream_real_fill;
typedef struct
{
- gssize bytes_read;
- gssize count;
- void *buffer;
-} ReadAsyncData;
-
-static void
-free_read_async_data (gpointer _data)
-{
- ReadAsyncData *data = _data;
- g_slice_free (ReadAsyncData, data);
-}
-
-static void
-large_read_callback (GObject *source_object,
- GAsyncResult *result,
- gpointer user_data)
-{
- GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
- ReadAsyncData *data;
- GError *error;
- gssize nread;
-
- data = g_simple_async_result_get_op_res_gpointer (simple);
-
- error = NULL;
- nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
- result, &error);
-
- /* Only report the error if we've not already read some data */
- if (nread < 0 && data->bytes_read == 0)
- g_simple_async_result_take_error (simple, error);
- else if (error)
- g_error_free (error);
-
- if (nread > 0)
- data->bytes_read += nread;
-
- /* Complete immediately, not in idle, since we're already
- * in a mainloop callout
- */
- g_simple_async_result_complete (simple);
- g_object_unref (simple);
-}
-
-static void
-read_fill_buffer_callback (GObject *source_object,
- GAsyncResult *result,
- gpointer user_data)
-{
- GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
- GBufferedInputStream *bstream;
- GBufferedInputStreamPrivate *priv;
- ReadAsyncData *data;
- GError *error;
- gssize nread;
- gsize available;
-
- bstream = G_BUFFERED_INPUT_STREAM (source_object);
- priv = bstream->priv;
-
- data = g_simple_async_result_get_op_res_gpointer (simple);
-
- error = NULL;
- nread = g_buffered_input_stream_fill_finish (bstream,
- result, &error);
-
- if (nread < 0 && data->bytes_read == 0)
- g_simple_async_result_take_error (simple, error);
- else if (error)
- g_error_free (error);
-
- if (nread > 0)
- {
- available = priv->end - priv->pos;
- data->count = MIN (data->count, available);
-
- memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
- data->bytes_read += data->count;
- priv->pos += data->count;
- }
-
- /* Complete immediately, not in idle, since we're already
- * in a mainloop callout
- */
- g_simple_async_result_complete (simple);
- g_object_unref (simple);
-}
-
-static void
-g_buffered_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- GBufferedInputStream *bstream;
- GBufferedInputStreamPrivate *priv;
- GBufferedInputStreamClass *class;
- GInputStream *base_stream;
- gsize available;
- GSimpleAsyncResult *simple;
- ReadAsyncData *data;
-
- bstream = G_BUFFERED_INPUT_STREAM (stream);
- priv = bstream->priv;
-
- data = g_slice_new (ReadAsyncData);
- data->buffer = buffer;
- data->bytes_read = 0;
- simple = g_simple_async_result_new (G_OBJECT (stream),
- callback, user_data,
- g_buffered_input_stream_read_async);
- g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
-
- available = priv->end - priv->pos;
-
- if (count <= available)
- {
- memcpy (buffer, priv->buffer + priv->pos, count);
- priv->pos += count;
- data->bytes_read = count;
-
- g_simple_async_result_complete_in_idle (simple);
- g_object_unref (simple);
- return;
- }
-
-
- /* Full request not available, read all currently available
- * and request refill for more
- */
-
- memcpy (buffer, priv->buffer + priv->pos, available);
- priv->pos = 0;
- priv->end = 0;
-
- count -= available;
-
- data->bytes_read = available;
- data->count = count;
-
- if (count > priv->len)
- {
- /* Large request, shortcut buffer */
-
- base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
-
- g_input_stream_read_async (base_stream,
- (char *)buffer + data->bytes_read,
- count,
- io_priority, cancellable,
- large_read_callback,
- simple);
- }
- else
- {
- class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
- class->fill_async (bstream, priv->len, io_priority, cancellable,
- read_fill_buffer_callback, simple);
- }
-}
-
-static gssize
-g_buffered_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GSimpleAsyncResult *simple;
- ReadAsyncData *data;
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
-
- g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
-
- data = g_simple_async_result_get_op_res_gpointer (simple);
-
- return data->bytes_read;
-}
-
-typedef struct
-{
gssize bytes_skipped;
gssize count;
} SkipAsyncData;
GCancellable *cancellable,
GError **error);
-static void g_buffered_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data);
-static gssize g_buffered_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error);
static void g_buffered_output_stream_flush_async (GOutputStream *stream,
int io_priority,
GCancellable *cancellable,
ostream_class->write_fn = g_buffered_output_stream_write;
ostream_class->flush = g_buffered_output_stream_flush;
ostream_class->close_fn = g_buffered_output_stream_close;
- ostream_class->write_async = g_buffered_output_stream_write_async;
- ostream_class->write_finish = g_buffered_output_stream_write_finish;
ostream_class->flush_async = g_buffered_output_stream_flush_async;
ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
ostream_class->close_async = g_buffered_output_stream_close_async;
g_simple_async_result_take_error (result, error);
}
-typedef struct {
-
- FlushData fdata;
-
- gsize count;
- const void *buffer;
-
-} WriteData;
-
-static void
-free_write_data (gpointer data)
-{
- g_slice_free (WriteData, data);
-}
-
-static void
-g_buffered_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data)
-{
- GBufferedOutputStream *buffered_stream;
- GBufferedOutputStreamPrivate *priv;
- GSimpleAsyncResult *res;
- WriteData *wdata;
-
- buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
- priv = buffered_stream->priv;
-
- wdata = g_slice_new (WriteData);
- wdata->count = count;
- wdata->buffer = buffer;
-
- res = g_simple_async_result_new (G_OBJECT (stream),
- callback,
- data,
- g_buffered_output_stream_write_async);
-
- g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data);
-
- /* if we have space left directly call the
- * callback (from idle) otherwise schedule a buffer
- * flush in the thread. In both cases the actual
- * copying of the data to the buffer will be done in
- * the write_finish () func since that should
- * be fast enough */
- if (priv->len - priv->pos > 0)
- {
- g_simple_async_result_complete_in_idle (res);
- }
- else
- {
- wdata->fdata.flush_stream = FALSE;
- wdata->fdata.close_stream = FALSE;
- g_simple_async_result_run_in_thread (res,
- flush_buffer_thread,
- io_priority,
- cancellable);
- }
- g_object_unref (res);
-}
-
-static gssize
-g_buffered_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GBufferedOutputStreamPrivate *priv;
- GBufferedOutputStream *buffered_stream;
- GSimpleAsyncResult *simple;
- WriteData *wdata;
- gssize count;
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
- buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
- priv = buffered_stream->priv;
-
- g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
- g_buffered_output_stream_write_async);
-
- wdata = g_simple_async_result_get_op_res_gpointer (simple);
-
- /* Now do the real copying of data to the buffer */
- count = priv->len - priv->pos;
- count = MIN (wdata->count, count);
-
- memcpy (priv->buffer + priv->pos, wdata->buffer, count);
-
- priv->pos += count;
-
- return count;
-}
-
static void
g_buffered_output_stream_flush_async (GOutputStream *stream,
int io_priority,
#include "gasyncresult.h"
#include "gsimpleasyncresult.h"
#include "gioerror.h"
-
+#include "gpollableinputstream.h"
/**
* SECTION:ginputstream
void *buffer;
gsize count_requested;
gssize count_read;
+
+ GCancellable *cancellable;
+ gint io_priority;
+ gboolean need_idle;
} ReadData;
static void
g_simple_async_result_take_error (res, error);
}
+static void read_async_pollable (GPollableInputStream *stream,
+ GSimpleAsyncResult *result);
+
+static gboolean
+read_async_pollable_ready (GPollableInputStream *stream,
+ gpointer user_data)
+{
+ GSimpleAsyncResult *result = user_data;
+
+ read_async_pollable (stream, result);
+ return FALSE;
+}
+
+static void
+read_async_pollable (GPollableInputStream *stream,
+ GSimpleAsyncResult *result)
+{
+ GError *error = NULL;
+ ReadData *op = g_simple_async_result_get_op_res_gpointer (result);
+
+ if (g_cancellable_set_error_if_cancelled (op->cancellable, &error))
+ op->count_read = -1;
+ else
+ {
+ op->count_read = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
+ read_nonblocking (stream, op->buffer, op->count_requested, &error);
+ }
+
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+ {
+ GSource *source;
+
+ g_error_free (error);
+ op->need_idle = FALSE;
+
+ source = g_pollable_input_stream_create_source (stream, op->cancellable);
+ g_source_set_callback (source,
+ (GSourceFunc) read_async_pollable_ready,
+ g_object_ref (result), g_object_unref);
+ g_source_set_priority (source, op->io_priority);
+ g_source_attach (source, g_main_context_get_thread_default ());
+ g_source_unref (source);
+ return;
+ }
+
+ if (op->count_read == -1)
+ g_simple_async_result_take_error (result, error);
+
+ if (op->need_idle)
+ g_simple_async_result_complete_in_idle (result);
+ else
+ g_simple_async_result_complete (result);
+}
+
static void
g_input_stream_real_read_async (GInputStream *stream,
void *buffer,
g_simple_async_result_set_op_res_gpointer (res, op, g_free);
op->buffer = buffer;
op->count_requested = count;
-
- g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable);
+ op->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+ op->io_priority = io_priority;
+ op->need_idle = TRUE;
+
+ if (G_IS_POLLABLE_INPUT_STREAM (stream) &&
+ g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream)))
+ read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), res);
+ else
+ g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable);
g_object_unref (res);
}
static gboolean g_memory_input_stream_close (GInputStream *stream,
GCancellable *cancellable,
GError **error);
-static void g_memory_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data);
-static gssize g_memory_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error);
static void g_memory_input_stream_skip_async (GInputStream *stream,
gsize count,
int io_priority,
istream_class->skip = g_memory_input_stream_skip;
istream_class->close_fn = g_memory_input_stream_close;
- istream_class->read_async = g_memory_input_stream_read_async;
- istream_class->read_finish = g_memory_input_stream_read_finish;
istream_class->skip_async = g_memory_input_stream_skip_async;
istream_class->skip_finish = g_memory_input_stream_skip_finish;
istream_class->close_async = g_memory_input_stream_close_async;
}
static void
-g_memory_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- GSimpleAsyncResult *simple;
- GError *error = NULL;
- gssize nread;
-
- nread = G_INPUT_STREAM_GET_CLASS (stream)->read_fn (stream,
- buffer,
- count,
- cancellable,
- &error);
- simple = g_simple_async_result_new (G_OBJECT (stream),
- callback,
- user_data,
- g_memory_input_stream_read_async);
- if (error)
- g_simple_async_result_take_error (simple, error);
- else
- g_simple_async_result_set_op_res_gssize (simple, nread);
- g_simple_async_result_complete_in_idle (simple);
- g_object_unref (simple);
-}
-
-static gssize
-g_memory_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GSimpleAsyncResult *simple;
- gssize nread;
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
- g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_memory_input_stream_read_async);
-
- nread = g_simple_async_result_get_op_res_gssize (simple);
- return nread;
-}
-
-static void
g_memory_input_stream_skip_async (GInputStream *stream,
gsize count,
int io_priority,
GCancellable *cancellable,
GError **error);
-static void g_memory_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data);
-static gssize g_memory_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error);
static void g_memory_output_stream_close_async (GOutputStream *stream,
int io_priority,
GCancellable *cancellable,
ostream_class->write_fn = g_memory_output_stream_write;
ostream_class->close_fn = g_memory_output_stream_close;
- ostream_class->write_async = g_memory_output_stream_write_async;
- ostream_class->write_finish = g_memory_output_stream_write_finish;
ostream_class->close_async = g_memory_output_stream_close_async;
ostream_class->close_finish = g_memory_output_stream_close_finish;
}
static void
-g_memory_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data)
-{
- GSimpleAsyncResult *simple;
- GError *error = NULL;
- gssize nwritten;
-
- nwritten = G_OUTPUT_STREAM_GET_CLASS (stream)->write_fn (stream,
- buffer,
- count,
- cancellable,
- &error);
-
- simple = g_simple_async_result_new (G_OBJECT (stream),
- callback,
- data,
- g_memory_output_stream_write_async);
-
- if (error)
- g_simple_async_result_take_error (simple, error);
- else
- g_simple_async_result_set_op_res_gssize (simple, nwritten);
- g_simple_async_result_complete_in_idle (simple);
- g_object_unref (simple);
-}
-
-static gssize
-g_memory_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GSimpleAsyncResult *simple;
- gssize nwritten;
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
-
- g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
- g_memory_output_stream_write_async);
-
- nwritten = g_simple_async_result_get_op_res_gssize (simple);
-
- return nwritten;
-}
-
-static void
g_memory_output_stream_close_async (GOutputStream *stream,
int io_priority,
GCancellable *cancellable,
#include "ginputstream.h"
#include "gioerror.h"
#include "glibintl.h"
-
+#include "gpollableoutputstream.h"
/**
* SECTION:goutputstream
const void *buffer;
gsize count_requested;
gssize count_written;
+
+ GCancellable *cancellable;
+ gint io_priority;
+ gboolean need_idle;
} WriteData;
static void
g_simple_async_result_take_error (res, error);
}
+static void write_async_pollable (GPollableOutputStream *stream,
+ GSimpleAsyncResult *result);
+
+static gboolean
+write_async_pollable_ready (GPollableOutputStream *stream,
+ gpointer user_data)
+{
+ GSimpleAsyncResult *result = user_data;
+
+ write_async_pollable (stream, result);
+ return FALSE;
+}
+
+static void
+write_async_pollable (GPollableOutputStream *stream,
+ GSimpleAsyncResult *result)
+{
+ GError *error = NULL;
+ WriteData *op = g_simple_async_result_get_op_res_gpointer (result);
+
+ if (g_cancellable_set_error_if_cancelled (op->cancellable, &error))
+ op->count_written = -1;
+ else
+ {
+ op->count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
+ write_nonblocking (stream, op->buffer, op->count_requested, &error);
+ }
+
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+ {
+ GSource *source;
+
+ g_error_free (error);
+ op->need_idle = FALSE;
+
+ source = g_pollable_output_stream_create_source (stream, op->cancellable);
+ g_source_set_callback (source,
+ (GSourceFunc) write_async_pollable_ready,
+ g_object_ref (result), g_object_unref);
+ g_source_set_priority (source, op->io_priority);
+ g_source_attach (source, g_main_context_get_thread_default ());
+ g_source_unref (source);
+ return;
+ }
+
+ if (op->count_written == -1)
+ g_simple_async_result_take_error (result, error);
+
+ if (op->need_idle)
+ g_simple_async_result_complete_in_idle (result);
+ else
+ g_simple_async_result_complete (result);
+}
+
static void
g_output_stream_real_write_async (GOutputStream *stream,
const void *buffer,
op->buffer = buffer;
op->count_requested = count;
- g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable);
+ if (G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
+ g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))
+ write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), res);
+ else
+ g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable);
g_object_unref (res);
}
}
static gboolean
-g_socket_input_stream_read_ready (GSocket *socket,
- GIOCondition condition,
- GSocketInputStream *stream)
-{
- GSimpleAsyncResult *simple;
- GError *error = NULL;
- gssize result;
-
- result = g_socket_receive_with_blocking (stream->priv->socket,
- stream->priv->buffer,
- stream->priv->count,
- FALSE,
- stream->priv->cancellable,
- &error);
-
- if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
- return TRUE;
-
- simple = stream->priv->result;
- stream->priv->result = NULL;
-
- if (result >= 0)
- g_simple_async_result_set_op_res_gssize (simple, result);
-
- if (error)
- g_simple_async_result_take_error (simple, error);
-
- if (stream->priv->cancellable)
- g_object_unref (stream->priv->cancellable);
-
- g_simple_async_result_complete (simple);
- g_object_unref (simple);
-
- return FALSE;
-}
-
-static void
-g_socket_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- gint io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (stream);
- GSource *source;
-
- g_assert (input_stream->priv->result == NULL);
-
- input_stream->priv->result =
- g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
- g_socket_input_stream_read_async);
- if (cancellable)
- g_object_ref (cancellable);
- input_stream->priv->cancellable = cancellable;
- input_stream->priv->buffer = buffer;
- input_stream->priv->count = count;
-
- source = g_socket_create_source (input_stream->priv->socket,
- G_IO_IN | G_IO_HUP | G_IO_ERR,
- cancellable);
- g_source_set_callback (source,
- (GSourceFunc) g_socket_input_stream_read_ready,
- g_object_ref (input_stream), g_object_unref);
- g_source_attach (source, g_main_context_get_thread_default ());
- g_source_unref (source);
-}
-
-static gssize
-g_socket_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GSimpleAsyncResult *simple;
- gssize count;
-
- g_return_val_if_fail (G_IS_SOCKET_INPUT_STREAM (stream), -1);
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
-
- g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_socket_input_stream_read_async);
-
- count = g_simple_async_result_get_op_res_gssize (simple);
-
- return count;
-}
-
-static gboolean
g_socket_input_stream_pollable_is_readable (GPollableInputStream *pollable)
{
GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
gobject_class->set_property = g_socket_input_stream_set_property;
ginputstream_class->read_fn = g_socket_input_stream_read;
- ginputstream_class->read_async = g_socket_input_stream_read_async;
- ginputstream_class->read_finish = g_socket_input_stream_read_finish;
g_object_class_install_property (gobject_class, PROP_SOCKET,
g_param_spec_object ("socket",
}
static gboolean
-g_socket_output_stream_write_ready (GSocket *socket,
- GIOCondition condition,
- GSocketOutputStream *stream)
+g_socket_output_stream_pollable_is_writable (GPollableOutputStream *pollable)
{
- GSimpleAsyncResult *simple;
- GError *error = NULL;
- gssize result;
-
- result = g_socket_send_with_blocking (stream->priv->socket,
- stream->priv->buffer,
- stream->priv->count,
- FALSE,
- stream->priv->cancellable,
- &error);
-
- if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
- return TRUE;
-
- simple = stream->priv->result;
- stream->priv->result = NULL;
-
- if (result >= 0)
- g_simple_async_result_set_op_res_gssize (simple, result);
-
- if (error)
- g_simple_async_result_take_error (simple, error);
-
- if (stream->priv->cancellable)
- g_object_unref (stream->priv->cancellable);
-
- g_simple_async_result_complete (simple);
- g_object_unref (simple);
-
- return FALSE;
-}
+ GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
-static void
-g_socket_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- gint io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (stream);
- GSource *source;
-
- g_assert (output_stream->priv->result == NULL);
-
- output_stream->priv->result =
- g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
- g_socket_output_stream_write_async);
- if (cancellable)
- g_object_ref (cancellable);
- output_stream->priv->cancellable = cancellable;
- output_stream->priv->buffer = buffer;
- output_stream->priv->count = count;
-
- source = g_socket_create_source (output_stream->priv->socket,
- G_IO_OUT | G_IO_HUP | G_IO_ERR,
- cancellable);
- g_source_set_callback (source,
- (GSourceFunc) g_socket_output_stream_write_ready,
- g_object_ref (output_stream), g_object_unref);
- g_source_attach (source, g_main_context_get_thread_default ());
- g_source_unref (source);
+ return g_socket_condition_check (output_stream->priv->socket, G_IO_OUT);
}
static gssize
-g_socket_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GSimpleAsyncResult *simple;
- gssize count;
-
- g_return_val_if_fail (G_IS_SOCKET_OUTPUT_STREAM (stream), -1);
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
-
- g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_socket_output_stream_write_async);
-
- count = g_simple_async_result_get_op_res_gssize (simple);
-
- return count;
-}
-
-static gboolean
-g_socket_output_stream_pollable_is_writable (GPollableOutputStream *pollable)
+g_socket_output_stream_pollable_write_nonblocking (GPollableOutputStream *pollable,
+ const void *buffer,
+ gsize size,
+ GError **error)
{
GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
- return g_socket_condition_check (output_stream->priv->socket, G_IO_OUT);
+ return g_socket_send_with_blocking (output_stream->priv->socket,
+ buffer, size, FALSE,
+ NULL, error);
}
static GSource *
return pollable_source;
}
-static gssize
-g_socket_output_stream_pollable_write_nonblocking (GPollableOutputStream *pollable,
- const void *buffer,
- gsize size,
- GError **error)
-{
- GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
-
- return g_socket_send_with_blocking (output_stream->priv->socket,
- buffer, size, FALSE,
- NULL, error);
-}
-
#ifdef G_OS_UNIX
static int
g_socket_output_stream_get_fd (GFileDescriptorBased *fd_based)
gobject_class->set_property = g_socket_output_stream_set_property;
goutputstream_class->write_fn = g_socket_output_stream_write;
- goutputstream_class->write_async = g_socket_output_stream_write_async;
- goutputstream_class->write_finish = g_socket_output_stream_write_finish;
g_object_class_install_property (gobject_class, PROP_SOCKET,
g_param_spec_object ("socket",
static gboolean g_unix_input_stream_close (GInputStream *stream,
GCancellable *cancellable,
GError **error);
-static void g_unix_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data);
-static gssize g_unix_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error);
static void g_unix_input_stream_skip_async (GInputStream *stream,
gsize count,
int io_priority,
GAsyncResult *result,
GError **error);
+static gboolean g_unix_input_stream_pollable_can_poll (GPollableInputStream *stream);
static gboolean g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream);
static GSource *g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
GCancellable *cancellable);
stream_class->read_fn = g_unix_input_stream_read;
stream_class->close_fn = g_unix_input_stream_close;
- stream_class->read_async = g_unix_input_stream_read_async;
- stream_class->read_finish = g_unix_input_stream_read_finish;
if (0)
{
/* TODO: Implement instead of using fallbacks */
static void
g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
{
+ iface->can_poll = g_unix_input_stream_pollable_can_poll;
iface->is_readable = g_unix_input_stream_pollable_is_readable;
iface->create_source = g_unix_input_stream_pollable_create_source;
}
return res != -1;
}
-typedef struct {
- gsize count;
- void *buffer;
- GAsyncReadyCallback callback;
- gpointer user_data;
- GCancellable *cancellable;
- GUnixInputStream *stream;
-} ReadAsyncData;
-
-static gboolean
-read_async_cb (int fd,
- GIOCondition condition,
- ReadAsyncData *data)
-{
- GSimpleAsyncResult *simple;
- GError *error = NULL;
- gssize count_read;
-
- /* We know that we can read from fd once without blocking */
- while (1)
- {
- if (g_cancellable_set_error_if_cancelled (data->cancellable, &error))
- {
- count_read = -1;
- break;
- }
- count_read = read (data->stream->priv->fd, data->buffer, data->count);
- if (count_read == -1)
- {
- int errsv = errno;
-
- if (errsv == EINTR || errsv == EAGAIN)
- return TRUE;
-
- g_set_error (&error, G_IO_ERROR,
- g_io_error_from_errno (errsv),
- _("Error reading from file descriptor: %s"),
- g_strerror (errsv));
- }
- break;
- }
-
- simple = g_simple_async_result_new (G_OBJECT (data->stream),
- data->callback,
- data->user_data,
- g_unix_input_stream_read_async);
-
- g_simple_async_result_set_op_res_gssize (simple, count_read);
-
- if (count_read == -1)
- g_simple_async_result_take_error (simple, error);
-
- /* Complete immediately, not in idle, since we're already in a mainloop callout */
- g_simple_async_result_complete (simple);
- g_object_unref (simple);
-
- return FALSE;
-}
-
-static void
-g_unix_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- GSource *source;
- GUnixInputStream *unix_stream;
- ReadAsyncData *data;
-
- unix_stream = G_UNIX_INPUT_STREAM (stream);
-
- if (!unix_stream->priv->is_pipe_or_socket)
- {
- G_INPUT_STREAM_CLASS (g_unix_input_stream_parent_class)->
- read_async (stream, buffer, count, io_priority,
- cancellable, callback, user_data);
- return;
- }
-
- data = g_new0 (ReadAsyncData, 1);
- data->count = count;
- data->buffer = buffer;
- data->callback = callback;
- data->user_data = user_data;
- data->cancellable = cancellable;
- data->stream = unix_stream;
-
- source = _g_fd_source_new (unix_stream->priv->fd,
- G_IO_IN,
- cancellable);
- g_source_set_name (source, "GUnixInputStream");
-
- g_source_set_callback (source, (GSourceFunc)read_async_cb, data, g_free);
- g_source_attach (source, g_main_context_get_thread_default ());
-
- g_source_unref (source);
-}
-
-static gssize
-g_unix_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
- GSimpleAsyncResult *simple;
- gssize nread;
-
- if (!unix_stream->priv->is_pipe_or_socket)
- {
- return G_INPUT_STREAM_CLASS (g_unix_input_stream_parent_class)->
- read_finish (stream, result, error);
- }
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
- g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_unix_input_stream_read_async);
-
- nread = g_simple_async_result_get_op_res_gssize (simple);
- return nread;
-}
-
static void
g_unix_input_stream_skip_async (GInputStream *stream,
gsize count,
}
static gboolean
+g_unix_input_stream_pollable_can_poll (GPollableInputStream *stream)
+{
+ return G_UNIX_INPUT_STREAM (stream)->priv->is_pipe_or_socket;
+}
+
+static gboolean
g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream)
{
GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
static gboolean g_unix_output_stream_close (GOutputStream *stream,
GCancellable *cancellable,
GError **error);
-static void g_unix_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data);
-static gssize g_unix_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error);
static void g_unix_output_stream_close_async (GOutputStream *stream,
int io_priority,
GCancellable *cancellable,
stream_class->write_fn = g_unix_output_stream_write;
stream_class->close_fn = g_unix_output_stream_close;
- stream_class->write_async = g_unix_output_stream_write_async;
- stream_class->write_finish = g_unix_output_stream_write_finish;
stream_class->close_async = g_unix_output_stream_close_async;
stream_class->close_finish = g_unix_output_stream_close_finish;
}
typedef struct {
- gsize count;
- const void *buffer;
- GAsyncReadyCallback callback;
- gpointer user_data;
- GCancellable *cancellable;
- GUnixOutputStream *stream;
-} WriteAsyncData;
-
-static gboolean
-write_async_cb (int fd,
- GIOCondition condition,
- WriteAsyncData *data)
-{
- GSimpleAsyncResult *simple;
- GError *error = NULL;
- gssize count_written;
-
- while (1)
- {
- if (g_cancellable_set_error_if_cancelled (data->cancellable, &error))
- {
- count_written = -1;
- break;
- }
-
- count_written = write (data->stream->priv->fd, data->buffer, data->count);
- if (count_written == -1)
- {
- int errsv = errno;
-
- if (errsv == EINTR || errsv == EAGAIN)
- return TRUE;
-
- g_set_error (&error, G_IO_ERROR,
- g_io_error_from_errno (errsv),
- _("Error writing to file descriptor: %s"),
- g_strerror (errsv));
- }
- break;
- }
-
- simple = g_simple_async_result_new (G_OBJECT (data->stream),
- data->callback,
- data->user_data,
- g_unix_output_stream_write_async);
-
- g_simple_async_result_set_op_res_gssize (simple, count_written);
-
- if (count_written == -1)
- g_simple_async_result_take_error (simple, error);
-
- /* Complete immediately, not in idle, since we're already in a mainloop callout */
- g_simple_async_result_complete (simple);
- g_object_unref (simple);
-
- return FALSE;
-}
-
-static void
-g_unix_output_stream_write_async (GOutputStream *stream,
- const void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- GSource *source;
- GUnixOutputStream *unix_stream;
- WriteAsyncData *data;
-
- unix_stream = G_UNIX_OUTPUT_STREAM (stream);
-
- if (!unix_stream->priv->is_pipe_or_socket)
- {
- G_OUTPUT_STREAM_CLASS (g_unix_output_stream_parent_class)->
- write_async (stream, buffer, count, io_priority,
- cancellable, callback, user_data);
- return;
- }
-
- data = g_new0 (WriteAsyncData, 1);
- data->count = count;
- data->buffer = buffer;
- data->callback = callback;
- data->user_data = user_data;
- data->cancellable = cancellable;
- data->stream = unix_stream;
-
- source = _g_fd_source_new (unix_stream->priv->fd,
- G_IO_OUT,
- cancellable);
- g_source_set_name (source, "GUnixOutputStream");
-
- g_source_set_callback (source, (GSourceFunc)write_async_cb, data, g_free);
- g_source_attach (source, g_main_context_get_thread_default ());
-
- g_source_unref (source);
-}
-
-static gssize
-g_unix_output_stream_write_finish (GOutputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream);
- GSimpleAsyncResult *simple;
- gssize nwritten;
-
- if (!unix_stream->priv->is_pipe_or_socket)
- {
- return G_OUTPUT_STREAM_CLASS (g_unix_output_stream_parent_class)->
- write_finish (stream, result, error);
- }
-
- simple = G_SIMPLE_ASYNC_RESULT (result);
- g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_unix_output_stream_write_async);
-
- nwritten = g_simple_async_result_get_op_res_gssize (simple);
- return nwritten;
-}
-
-typedef struct {
GOutputStream *stream;
GAsyncReadyCallback callback;
gpointer user_data;