1 /* GIO - GLib Input, Output and Streaming Library
3 * Copyright (C) 2006-2007 Red Hat, Inc.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18 * Boston, MA 02111-1307, USA.
20 * Author: Alexander Larsson <alexl@redhat.com>
25 #include <sys/types.h>
33 #include <glib/gstdio.h>
35 #include "gsimpleasyncresult.h"
36 #include "gunixinputstream.h"
37 #include "gcancellable.h"
38 #include "gasynchelper.h"
39 #include "gfiledescriptorbased.h"
44 * SECTION:gunixinputstream
45 * @short_description: Streaming input operations for UNIX file descriptors
46 * @include: gio/gunixinputstream.h
47 * @see_also: #GInputStream
49 * #GUnixInputStream implements #GInputStream for reading from a UNIX
50 * file descriptor, including asynchronous operations. (If the file
51 * descriptor refers to a socket or pipe, this will use poll() to do
52 * asynchronous I/O. If it refers to a regular file, it will fall back
53 * to doing asynchronous I/O in another thread like
54 * #GLocalFileInputStream.)
56 * Note that <filename><gio/gunixinputstream.h></filename> belongs
57 * to the UNIX-specific GIO interfaces, thus you have to use the
58 * <filename>gio-unix-2.0.pc</filename> pkg-config file when using it.
67 static void g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
68 static void g_unix_input_stream_file_descriptor_based_iface_init (GFileDescriptorBasedIface *iface);
70 G_DEFINE_TYPE_WITH_CODE (GUnixInputStream, g_unix_input_stream, G_TYPE_INPUT_STREAM,
71 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
72 g_unix_input_stream_pollable_iface_init)
73 G_IMPLEMENT_INTERFACE (G_TYPE_FILE_DESCRIPTOR_BASED,
74 g_unix_input_stream_file_descriptor_based_iface_init)
77 struct _GUnixInputStreamPrivate {
80 guint is_pipe_or_socket : 1;
83 static void g_unix_input_stream_set_property (GObject *object,
87 static void g_unix_input_stream_get_property (GObject *object,
91 static gssize g_unix_input_stream_read (GInputStream *stream,
94 GCancellable *cancellable,
96 static gboolean g_unix_input_stream_close (GInputStream *stream,
97 GCancellable *cancellable,
99 static void g_unix_input_stream_read_async (GInputStream *stream,
103 GCancellable *cancellable,
104 GAsyncReadyCallback callback,
106 static gssize g_unix_input_stream_read_finish (GInputStream *stream,
107 GAsyncResult *result,
109 static void g_unix_input_stream_skip_async (GInputStream *stream,
112 GCancellable *cancellable,
113 GAsyncReadyCallback callback,
115 static gssize g_unix_input_stream_skip_finish (GInputStream *stream,
116 GAsyncResult *result,
118 static void g_unix_input_stream_close_async (GInputStream *stream,
120 GCancellable *cancellable,
121 GAsyncReadyCallback callback,
123 static gboolean g_unix_input_stream_close_finish (GInputStream *stream,
124 GAsyncResult *result,
127 static gboolean g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream);
128 static GSource *g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
129 GCancellable *cancellable);
132 g_unix_input_stream_finalize (GObject *object)
134 G_OBJECT_CLASS (g_unix_input_stream_parent_class)->finalize (object);
138 g_unix_input_stream_class_init (GUnixInputStreamClass *klass)
140 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
141 GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
143 g_type_class_add_private (klass, sizeof (GUnixInputStreamPrivate));
145 gobject_class->get_property = g_unix_input_stream_get_property;
146 gobject_class->set_property = g_unix_input_stream_set_property;
147 gobject_class->finalize = g_unix_input_stream_finalize;
149 stream_class->read_fn = g_unix_input_stream_read;
150 stream_class->close_fn = g_unix_input_stream_close;
151 stream_class->read_async = g_unix_input_stream_read_async;
152 stream_class->read_finish = g_unix_input_stream_read_finish;
155 /* TODO: Implement instead of using fallbacks */
156 stream_class->skip_async = g_unix_input_stream_skip_async;
157 stream_class->skip_finish = g_unix_input_stream_skip_finish;
159 stream_class->close_async = g_unix_input_stream_close_async;
160 stream_class->close_finish = g_unix_input_stream_close_finish;
163 * GUnixInputStream:fd:
165 * The file descriptor that the stream reads from.
169 g_object_class_install_property (gobject_class,
171 g_param_spec_int ("fd",
172 P_("File descriptor"),
173 P_("The file descriptor to read from"),
174 G_MININT, G_MAXINT, -1,
175 G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK | G_PARAM_STATIC_BLURB));
178 * GUnixInputStream:close-fd:
180 * Whether to close the file descriptor when the stream is closed.
184 g_object_class_install_property (gobject_class,
186 g_param_spec_boolean ("close-fd",
187 P_("Close file descriptor"),
188 P_("Whether to close the file descriptor when the stream is closed"),
190 G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK | G_PARAM_STATIC_BLURB));
194 g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
196 iface->is_readable = g_unix_input_stream_pollable_is_readable;
197 iface->create_source = g_unix_input_stream_pollable_create_source;
201 g_unix_input_stream_file_descriptor_based_iface_init (GFileDescriptorBasedIface *iface)
203 iface->get_fd = (int (*) (GFileDescriptorBased *))g_unix_input_stream_get_fd;
207 g_unix_input_stream_set_property (GObject *object,
212 GUnixInputStream *unix_stream;
214 unix_stream = G_UNIX_INPUT_STREAM (object);
219 unix_stream->priv->fd = g_value_get_int (value);
220 if (lseek (unix_stream->priv->fd, 0, SEEK_CUR) == -1 && errno == ESPIPE)
221 unix_stream->priv->is_pipe_or_socket = TRUE;
223 unix_stream->priv->is_pipe_or_socket = FALSE;
226 unix_stream->priv->close_fd = g_value_get_boolean (value);
229 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
235 g_unix_input_stream_get_property (GObject *object,
240 GUnixInputStream *unix_stream;
242 unix_stream = G_UNIX_INPUT_STREAM (object);
247 g_value_set_int (value, unix_stream->priv->fd);
250 g_value_set_boolean (value, unix_stream->priv->close_fd);
253 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
258 g_unix_input_stream_init (GUnixInputStream *unix_stream)
260 unix_stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (unix_stream,
261 G_TYPE_UNIX_INPUT_STREAM,
262 GUnixInputStreamPrivate);
264 unix_stream->priv->fd = -1;
265 unix_stream->priv->close_fd = TRUE;
269 * g_unix_input_stream_new:
270 * @fd: a UNIX file descriptor
271 * @close_fd: %TRUE to close the file descriptor when done
273 * Creates a new #GUnixInputStream for the given @fd.
275 * If @close_fd is %TRUE, the file descriptor will be closed
276 * when the stream is closed.
278 * Returns: a new #GUnixInputStream
281 g_unix_input_stream_new (gint fd,
284 GUnixInputStream *stream;
286 g_return_val_if_fail (fd != -1, NULL);
288 stream = g_object_new (G_TYPE_UNIX_INPUT_STREAM,
290 "close-fd", close_fd,
293 return G_INPUT_STREAM (stream);
297 * g_unix_input_stream_set_close_fd:
298 * @stream: a #GUnixInputStream
299 * @close_fd: %TRUE to close the file descriptor when done
301 * Sets whether the file descriptor of @stream shall be closed
302 * when the stream is closed.
307 g_unix_input_stream_set_close_fd (GUnixInputStream *stream,
310 g_return_if_fail (G_IS_UNIX_INPUT_STREAM (stream));
312 close_fd = close_fd != FALSE;
313 if (stream->priv->close_fd != close_fd)
315 stream->priv->close_fd = close_fd;
316 g_object_notify (G_OBJECT (stream), "close-fd");
321 * g_unix_input_stream_get_close_fd:
322 * @stream: a #GUnixInputStream
324 * Returns whether the file descriptor of @stream will be
325 * closed when the stream is closed.
327 * Return value: %TRUE if the file descriptor is closed when done
332 g_unix_input_stream_get_close_fd (GUnixInputStream *stream)
334 g_return_val_if_fail (G_IS_UNIX_INPUT_STREAM (stream), FALSE);
336 return stream->priv->close_fd;
340 * g_unix_input_stream_get_fd:
341 * @stream: a #GUnixInputStream
343 * Return the UNIX file descriptor that the stream reads from.
345 * Return value: The file descriptor of @stream
350 g_unix_input_stream_get_fd (GUnixInputStream *stream)
352 g_return_val_if_fail (G_IS_UNIX_INPUT_STREAM (stream), -1);
354 return stream->priv->fd;
358 g_unix_input_stream_read (GInputStream *stream,
361 GCancellable *cancellable,
364 GUnixInputStream *unix_stream;
369 unix_stream = G_UNIX_INPUT_STREAM (stream);
371 if (unix_stream->priv->is_pipe_or_socket &&
372 g_cancellable_make_pollfd (cancellable, &poll_fds[1]))
374 poll_fds[0].fd = unix_stream->priv->fd;
375 poll_fds[0].events = G_IO_IN;
377 poll_ret = g_poll (poll_fds, 2, -1);
378 while (poll_ret == -1 && errno == EINTR);
379 g_cancellable_release_fd (cancellable);
385 g_set_error (error, G_IO_ERROR,
386 g_io_error_from_errno (errsv),
387 _("Error reading from unix: %s"),
395 if (g_cancellable_set_error_if_cancelled (cancellable, error))
397 res = read (unix_stream->priv->fd, buffer, count);
405 g_set_error (error, G_IO_ERROR,
406 g_io_error_from_errno (errsv),
407 _("Error reading from unix: %s"),
418 g_unix_input_stream_close (GInputStream *stream,
419 GCancellable *cancellable,
422 GUnixInputStream *unix_stream;
425 unix_stream = G_UNIX_INPUT_STREAM (stream);
427 if (!unix_stream->priv->close_fd)
432 /* This might block during the close. Doesn't seem to be a way to avoid it though. */
433 res = close (unix_stream->priv->fd);
438 g_set_error (error, G_IO_ERROR,
439 g_io_error_from_errno (errsv),
440 _("Error closing unix: %s"),
452 GAsyncReadyCallback callback;
454 GCancellable *cancellable;
455 GUnixInputStream *stream;
459 read_async_cb (int fd,
460 GIOCondition condition,
463 GSimpleAsyncResult *simple;
464 GError *error = NULL;
467 /* We know that we can read from fd once without blocking */
470 if (g_cancellable_set_error_if_cancelled (data->cancellable, &error))
475 count_read = read (data->stream->priv->fd, data->buffer, data->count);
476 if (count_read == -1)
483 g_set_error (&error, G_IO_ERROR,
484 g_io_error_from_errno (errsv),
485 _("Error reading from unix: %s"),
491 simple = g_simple_async_result_new (G_OBJECT (data->stream),
494 g_unix_input_stream_read_async);
496 g_simple_async_result_set_op_res_gssize (simple, count_read);
498 if (count_read == -1)
499 g_simple_async_result_take_error (simple, error);
501 /* Complete immediately, not in idle, since we're already in a mainloop callout */
502 g_simple_async_result_complete (simple);
503 g_object_unref (simple);
509 g_unix_input_stream_read_async (GInputStream *stream,
513 GCancellable *cancellable,
514 GAsyncReadyCallback callback,
518 GUnixInputStream *unix_stream;
521 unix_stream = G_UNIX_INPUT_STREAM (stream);
523 if (!unix_stream->priv->is_pipe_or_socket)
525 G_INPUT_STREAM_CLASS (g_unix_input_stream_parent_class)->
526 read_async (stream, buffer, count, io_priority,
527 cancellable, callback, user_data);
531 data = g_new0 (ReadAsyncData, 1);
533 data->buffer = buffer;
534 data->callback = callback;
535 data->user_data = user_data;
536 data->cancellable = cancellable;
537 data->stream = unix_stream;
539 source = _g_fd_source_new (unix_stream->priv->fd,
542 g_source_set_name (source, "GUnixInputStream");
544 g_source_set_callback (source, (GSourceFunc)read_async_cb, data, g_free);
545 g_source_attach (source, g_main_context_get_thread_default ());
547 g_source_unref (source);
551 g_unix_input_stream_read_finish (GInputStream *stream,
552 GAsyncResult *result,
555 GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
556 GSimpleAsyncResult *simple;
559 if (!unix_stream->priv->is_pipe_or_socket)
561 return G_INPUT_STREAM_CLASS (g_unix_input_stream_parent_class)->
562 read_finish (stream, result, error);
565 simple = G_SIMPLE_ASYNC_RESULT (result);
566 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_unix_input_stream_read_async);
568 nread = g_simple_async_result_get_op_res_gssize (simple);
573 g_unix_input_stream_skip_async (GInputStream *stream,
576 GCancellable *cancellable,
577 GAsyncReadyCallback callback,
580 g_warn_if_reached ();
581 /* TODO: Not implemented */
585 g_unix_input_stream_skip_finish (GInputStream *stream,
586 GAsyncResult *result,
589 g_warn_if_reached ();
591 /* TODO: Not implemented */
596 GInputStream *stream;
597 GAsyncReadyCallback callback;
602 close_async_data_free (gpointer _data)
604 CloseAsyncData *data = _data;
610 close_async_cb (CloseAsyncData *data)
612 GUnixInputStream *unix_stream;
613 GSimpleAsyncResult *simple;
614 GError *error = NULL;
618 unix_stream = G_UNIX_INPUT_STREAM (data->stream);
620 if (!unix_stream->priv->close_fd)
628 res = close (unix_stream->priv->fd);
633 g_set_error (&error, G_IO_ERROR,
634 g_io_error_from_errno (errsv),
635 _("Error closing unix: %s"),
644 simple = g_simple_async_result_new (G_OBJECT (data->stream),
647 g_unix_input_stream_close_async);
650 g_simple_async_result_take_error (simple, error);
652 /* Complete immediately, not in idle, since we're already in a mainloop callout */
653 g_simple_async_result_complete (simple);
654 g_object_unref (simple);
660 g_unix_input_stream_close_async (GInputStream *stream,
662 GCancellable *cancellable,
663 GAsyncReadyCallback callback,
667 CloseAsyncData *data;
669 data = g_new0 (CloseAsyncData, 1);
671 data->stream = stream;
672 data->callback = callback;
673 data->user_data = user_data;
675 idle = g_idle_source_new ();
676 g_source_set_callback (idle, (GSourceFunc)close_async_cb, data, close_async_data_free);
677 g_source_attach (idle, g_main_context_get_thread_default ());
678 g_source_unref (idle);
682 g_unix_input_stream_close_finish (GInputStream *stream,
683 GAsyncResult *result,
686 /* Failures handled in generic close_finish code */
691 g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream)
693 GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
697 poll_fd.fd = unix_stream->priv->fd;
698 poll_fd.events = G_IO_IN;
701 result = g_poll (&poll_fd, 1, 0);
702 while (result == -1 && errno == EINTR);
704 return poll_fd.revents != 0;
708 g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
709 GCancellable *cancellable)
711 GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
712 GSource *inner_source, *pollable_source;
714 pollable_source = g_pollable_source_new (G_OBJECT (stream));
716 inner_source = _g_fd_source_new (unix_stream->priv->fd, G_IO_IN, cancellable);
717 g_source_set_dummy_callback (inner_source);
718 g_source_add_child_source (pollable_source, inner_source);
719 g_source_unref (inner_source);
721 return pollable_source;