/* GIO - GLib Input, Output and Streaming Library
 *
 * Copyright (C) 2006-2007 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General
 * Public License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 * Author: Alexander Larsson <alexl@redhat.com>
 */
25 #include <sys/types.h>
34 #include <glib/gstdio.h>
36 #include "gsimpleasyncresult.h"
37 #include "gsocketinputstream.h"
38 #include "gcancellable.h"
39 #include "gasynchelper.h"
/**
 * SECTION:gsocketinputstream
 * @short_description: Socket Input Stream
 * @see_also: #GInputStream.
 *
 * #GSocketInputStream implements #GInputStream for reading from a socket,
 * including asynchronous operations.
 **/
51 G_DEFINE_TYPE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM);
53 struct _GSocketInputStreamPrivate {
55 gboolean close_fd_at_close;
58 static gssize g_socket_input_stream_read (GInputStream *stream,
61 GCancellable *cancellable,
63 static gboolean g_socket_input_stream_close (GInputStream *stream,
64 GCancellable *cancellable,
66 static void g_socket_input_stream_read_async (GInputStream *stream,
70 GCancellable *cancellable,
71 GAsyncReadyCallback callback,
73 static gssize g_socket_input_stream_read_finish (GInputStream *stream,
76 static void g_socket_input_stream_skip_async (GInputStream *stream,
79 GCancellable *cancellable,
80 GAsyncReadyCallback callback,
82 static gssize g_socket_input_stream_skip_finish (GInputStream *stream,
85 static void g_socket_input_stream_close_async (GInputStream *stream,
87 GCancellable *cancellable,
88 GAsyncReadyCallback callback,
90 static gboolean g_socket_input_stream_close_finish (GInputStream *stream,
95 g_socket_input_stream_finalize (GObject *object)
97 GSocketInputStream *stream;
99 stream = G_SOCKET_INPUT_STREAM (object);
101 if (G_OBJECT_CLASS (g_socket_input_stream_parent_class)->finalize)
102 (*G_OBJECT_CLASS (g_socket_input_stream_parent_class)->finalize) (object);
106 g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
108 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
109 GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
111 g_type_class_add_private (klass, sizeof (GSocketInputStreamPrivate));
113 gobject_class->finalize = g_socket_input_stream_finalize;
115 stream_class->read = g_socket_input_stream_read;
116 stream_class->close = g_socket_input_stream_close;
117 stream_class->read_async = g_socket_input_stream_read_async;
118 stream_class->read_finish = g_socket_input_stream_read_finish;
121 /* TODO: Implement instead of using fallbacks */
122 stream_class->skip_async = g_socket_input_stream_skip_async;
123 stream_class->skip_finish = g_socket_input_stream_skip_finish;
125 stream_class->close_async = g_socket_input_stream_close_async;
126 stream_class->close_finish = g_socket_input_stream_close_finish;
130 g_socket_input_stream_init (GSocketInputStream *socket)
132 socket->priv = G_TYPE_INSTANCE_GET_PRIVATE (socket,
133 G_TYPE_SOCKET_INPUT_STREAM,
134 GSocketInputStreamPrivate);
138 * g_socket_input_stream_new:
139 * @fd: socket file descriptor.
140 * @close_fd_at_close: a #gboolean.
142 * Creates a new #GSocketInputStream for the given @fd. If @close_fd_at_close
143 * is %TRUE, the socket will be closed when the stream is closed.
145 * Returns: a #GSocketInputStream.
148 g_socket_input_stream_new (int fd,
149 gboolean close_fd_at_close)
151 GSocketInputStream *stream;
153 g_return_val_if_fail (fd != -1, NULL);
155 stream = g_object_new (G_TYPE_SOCKET_INPUT_STREAM, NULL);
157 stream->priv->fd = fd;
158 stream->priv->close_fd_at_close = close_fd_at_close;
160 return G_INPUT_STREAM (stream);
164 g_socket_input_stream_read (GInputStream *stream,
167 GCancellable *cancellable,
170 GSocketInputStream *socket_stream;
172 struct pollfd poll_fds[2];
176 socket_stream = G_SOCKET_INPUT_STREAM (stream);
178 cancel_fd = g_cancellable_get_fd (cancellable);
183 poll_fds[0].events = POLLIN;
184 poll_fds[0].fd = socket_stream->priv->fd;
185 poll_fds[1].events = POLLIN;
186 poll_fds[1].fd = cancel_fd;
187 poll_ret = poll (poll_fds, 2, -1);
189 while (poll_ret == -1 && errno == EINTR);
193 g_set_error (error, G_IO_ERROR,
194 g_io_error_from_errno (errno),
195 _("Error reading from socket: %s"),
203 if (g_cancellable_set_error_if_cancelled (cancellable, error))
205 res = read (socket_stream->priv->fd, buffer, count);
211 g_set_error (error, G_IO_ERROR,
212 g_io_error_from_errno (errno),
213 _("Error reading from socket: %s"),
224 g_socket_input_stream_close (GInputStream *stream,
225 GCancellable *cancellable,
228 GSocketInputStream *socket_stream;
231 socket_stream = G_SOCKET_INPUT_STREAM (stream);
233 if (!socket_stream->priv->close_fd_at_close)
238 /* This might block during the close. Doesn't seem to be a way to avoid it though. */
239 res = close (socket_stream->priv->fd);
242 g_set_error (error, G_IO_ERROR,
243 g_io_error_from_errno (errno),
244 _("Error closing socket: %s"),
256 GAsyncReadyCallback callback;
258 GCancellable *cancellable;
259 GSocketInputStream *stream;
263 read_async_cb (ReadAsyncData *data,
264 GIOCondition condition,
267 GSimpleAsyncResult *simple;
268 GError *error = NULL;
271 /* We know that we can read from fd once without blocking */
274 if (g_cancellable_set_error_if_cancelled (data->cancellable, &error))
279 count_read = read (data->stream->priv->fd, data->buffer, data->count);
280 if (count_read == -1)
285 g_set_error (&error, G_IO_ERROR,
286 g_io_error_from_errno (errno),
287 _("Error reading from socket: %s"),
293 simple = g_simple_async_result_new (G_OBJECT (data->stream),
296 g_socket_input_stream_read_async);
298 g_simple_async_result_set_op_res_gssize (simple, count_read);
300 if (count_read == -1)
302 g_simple_async_result_set_from_error (simple, error);
303 g_error_free (error);
306 /* Complete immediately, not in idle, since we're already in a mainloop callout */
307 g_simple_async_result_complete (simple);
308 g_object_unref (simple);
314 g_socket_input_stream_read_async (GInputStream *stream,
318 GCancellable *cancellable,
319 GAsyncReadyCallback callback,
323 GSocketInputStream *socket_stream;
326 socket_stream = G_SOCKET_INPUT_STREAM (stream);
328 data = g_new0 (ReadAsyncData, 1);
330 data->buffer = buffer;
331 data->callback = callback;
332 data->user_data = user_data;
333 data->cancellable = cancellable;
334 data->stream = socket_stream;
336 source = _g_fd_source_new (socket_stream->priv->fd,
340 g_source_set_callback (source, (GSourceFunc)read_async_cb, data, g_free);
341 g_source_attach (source, NULL);
343 g_source_unref (source);
347 g_socket_input_stream_read_finish (GInputStream *stream,
348 GAsyncResult *result,
351 GSimpleAsyncResult *simple;
354 simple = G_SIMPLE_ASYNC_RESULT (result);
355 g_assert (g_simple_async_result_get_source_tag (simple) == g_socket_input_stream_read_async);
357 nread = g_simple_async_result_get_op_res_gssize (simple);
362 g_socket_input_stream_skip_async (GInputStream *stream,
365 GCancellable *cancellable,
366 GAsyncReadyCallback callback,
369 g_assert_not_reached ();
370 /* TODO: Not implemented */
374 g_socket_input_stream_skip_finish (GInputStream *stream,
375 GAsyncResult *result,
378 g_assert_not_reached ();
379 /* TODO: Not implemented */
384 GInputStream *stream;
385 GAsyncReadyCallback callback;
390 close_async_data_free (gpointer _data)
392 CloseAsyncData *data = _data;
398 close_async_cb (CloseAsyncData *data)
400 GSocketInputStream *socket_stream;
401 GSimpleAsyncResult *simple;
402 GError *error = NULL;
406 socket_stream = G_SOCKET_INPUT_STREAM (data->stream);
408 if (!socket_stream->priv->close_fd_at_close)
416 res = close (socket_stream->priv->fd);
419 g_set_error (&error, G_IO_ERROR,
420 g_io_error_from_errno (errno),
421 _("Error closing socket: %s"),
430 simple = g_simple_async_result_new (G_OBJECT (data->stream),
433 g_socket_input_stream_close_async);
437 g_simple_async_result_set_from_error (simple, error);
438 g_error_free (error);
441 /* Complete immediately, not in idle, since we're already in a mainloop callout */
442 g_simple_async_result_complete (simple);
443 g_object_unref (simple);
449 g_socket_input_stream_close_async (GInputStream *stream,
451 GCancellable *cancellable,
452 GAsyncReadyCallback callback,
456 CloseAsyncData *data;
458 data = g_new0 (CloseAsyncData, 1);
460 data->stream = stream;
461 data->callback = callback;
462 data->user_data = user_data;
464 idle = g_idle_source_new ();
465 g_source_set_callback (idle, (GSourceFunc)close_async_cb, data, close_async_data_free);
466 g_source_attach (idle, NULL);
467 g_source_unref (idle);
471 g_socket_input_stream_close_finish (GInputStream *stream,
472 GAsyncResult *result,
475 /* Failures handled in generic close_finish code */