GUnixInputStream, GUnixOutputStream: support ordinary files better
[platform/upstream/glib.git] / gio / gunixinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <sys/types.h>
26 #include <sys/stat.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <stdio.h>
30 #include <fcntl.h>
31
32 #include <glib.h>
33 #include <glib/gstdio.h>
34 #include "gioerror.h"
35 #include "gsimpleasyncresult.h"
36 #include "gunixinputstream.h"
37 #include "gcancellable.h"
38 #include "gasynchelper.h"
39 #include "gfiledescriptorbased.h"
40 #include "glibintl.h"
41
42
43 /**
44  * SECTION:gunixinputstream
45  * @short_description: Streaming input operations for UNIX file descriptors
46  * @include: gio/gunixinputstream.h
47  * @see_also: #GInputStream
48  *
49  * #GUnixInputStream implements #GInputStream for reading from a UNIX
50  * file descriptor, including asynchronous operations. (If the file
51  * descriptor refers to a socket or pipe, this will use poll() to do
52  * asynchronous I/O. If it refers to a regular file, it will fall back
53  * to doing asynchronous I/O in another thread like
54  * #GLocalFileInputStream.)
55  *
56  * Note that <filename>&lt;gio/gunixinputstream.h&gt;</filename> belongs
57  * to the UNIX-specific GIO interfaces, thus you have to use the
58  * <filename>gio-unix-2.0.pc</filename> pkg-config file when using it.
59  */
60
/* Property identifiers used by set_property()/get_property().
 * PROP_0 is only a placeholder so that the real ids start at 1.
 */
enum
{
  PROP_0 = 0,
  PROP_FD = 1,        /* "fd" */
  PROP_CLOSE_FD = 2   /* "close-fd" */
};
66
67 static void g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
68 static void g_unix_input_stream_file_descriptor_based_iface_init (GFileDescriptorBasedIface *iface);
69
70 G_DEFINE_TYPE_WITH_CODE (GUnixInputStream, g_unix_input_stream, G_TYPE_INPUT_STREAM,
71                          G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
72                                                 g_unix_input_stream_pollable_iface_init)
73                          G_IMPLEMENT_INTERFACE (G_TYPE_FILE_DESCRIPTOR_BASED,
74                                                 g_unix_input_stream_file_descriptor_based_iface_init)
75                          )
76
77 struct _GUnixInputStreamPrivate {
78   int fd;
79   guint close_fd : 1;
80   guint is_pipe_or_socket : 1;
81 };
82
83 static void     g_unix_input_stream_set_property (GObject              *object,
84                                                   guint                 prop_id,
85                                                   const GValue         *value,
86                                                   GParamSpec           *pspec);
87 static void     g_unix_input_stream_get_property (GObject              *object,
88                                                   guint                 prop_id,
89                                                   GValue               *value,
90                                                   GParamSpec           *pspec);
91 static gssize   g_unix_input_stream_read         (GInputStream         *stream,
92                                                   void                 *buffer,
93                                                   gsize                 count,
94                                                   GCancellable         *cancellable,
95                                                   GError              **error);
96 static gboolean g_unix_input_stream_close        (GInputStream         *stream,
97                                                   GCancellable         *cancellable,
98                                                   GError              **error);
99 static void     g_unix_input_stream_read_async   (GInputStream         *stream,
100                                                   void                 *buffer,
101                                                   gsize                 count,
102                                                   int                   io_priority,
103                                                   GCancellable         *cancellable,
104                                                   GAsyncReadyCallback   callback,
105                                                   gpointer              data);
106 static gssize   g_unix_input_stream_read_finish  (GInputStream         *stream,
107                                                   GAsyncResult         *result,
108                                                   GError              **error);
109 static void     g_unix_input_stream_skip_async   (GInputStream         *stream,
110                                                   gsize                 count,
111                                                   int                   io_priority,
112                                                   GCancellable         *cancellable,
113                                                   GAsyncReadyCallback   callback,
114                                                   gpointer              data);
115 static gssize   g_unix_input_stream_skip_finish  (GInputStream         *stream,
116                                                   GAsyncResult         *result,
117                                                   GError              **error);
118 static void     g_unix_input_stream_close_async  (GInputStream         *stream,
119                                                   int                   io_priority,
120                                                   GCancellable         *cancellable,
121                                                   GAsyncReadyCallback   callback,
122                                                   gpointer              data);
123 static gboolean g_unix_input_stream_close_finish (GInputStream         *stream,
124                                                   GAsyncResult         *result,
125                                                   GError              **error);
126
127 static gboolean g_unix_input_stream_pollable_is_readable   (GPollableInputStream *stream);
128 static GSource *g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
129                                                             GCancellable         *cancellable);
130
131 static void
132 g_unix_input_stream_finalize (GObject *object)
133 {
134   G_OBJECT_CLASS (g_unix_input_stream_parent_class)->finalize (object);
135 }
136
137 static void
138 g_unix_input_stream_class_init (GUnixInputStreamClass *klass)
139 {
140   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
141   GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
142   
143   g_type_class_add_private (klass, sizeof (GUnixInputStreamPrivate));
144
145   gobject_class->get_property = g_unix_input_stream_get_property;
146   gobject_class->set_property = g_unix_input_stream_set_property;
147   gobject_class->finalize = g_unix_input_stream_finalize;
148
149   stream_class->read_fn = g_unix_input_stream_read;
150   stream_class->close_fn = g_unix_input_stream_close;
151   stream_class->read_async = g_unix_input_stream_read_async;
152   stream_class->read_finish = g_unix_input_stream_read_finish;
153   if (0)
154     {
155       /* TODO: Implement instead of using fallbacks */
156       stream_class->skip_async = g_unix_input_stream_skip_async;
157       stream_class->skip_finish = g_unix_input_stream_skip_finish;
158     }
159   stream_class->close_async = g_unix_input_stream_close_async;
160   stream_class->close_finish = g_unix_input_stream_close_finish;
161
162   /**
163    * GUnixInputStream:fd:
164    *
165    * The file descriptor that the stream reads from.
166    *
167    * Since: 2.20
168    */
169   g_object_class_install_property (gobject_class,
170                                    PROP_FD,
171                                    g_param_spec_int ("fd",
172                                                      P_("File descriptor"),
173                                                      P_("The file descriptor to read from"),
174                                                      G_MININT, G_MAXINT, -1,
175                                                      G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK | G_PARAM_STATIC_BLURB));
176
177   /**
178    * GUnixInputStream:close-fd:
179    *
180    * Whether to close the file descriptor when the stream is closed.
181    *
182    * Since: 2.20
183    */
184   g_object_class_install_property (gobject_class,
185                                    PROP_CLOSE_FD,
186                                    g_param_spec_boolean ("close-fd",
187                                                          P_("Close file descriptor"),
188                                                          P_("Whether to close the file descriptor when the stream is closed"),
189                                                          TRUE,
190                                                          G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK | G_PARAM_STATIC_BLURB));
191 }
192
193 static void
194 g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
195 {
196   iface->is_readable = g_unix_input_stream_pollable_is_readable;
197   iface->create_source = g_unix_input_stream_pollable_create_source;
198 }
199
200 static void
201 g_unix_input_stream_file_descriptor_based_iface_init (GFileDescriptorBasedIface *iface)
202 {
203   iface->get_fd = (int (*) (GFileDescriptorBased *))g_unix_input_stream_get_fd;
204 }
205
206 static void
207 g_unix_input_stream_set_property (GObject         *object,
208                                   guint            prop_id,
209                                   const GValue    *value,
210                                   GParamSpec      *pspec)
211 {
212   GUnixInputStream *unix_stream;
213   
214   unix_stream = G_UNIX_INPUT_STREAM (object);
215
216   switch (prop_id)
217     {
218     case PROP_FD:
219       unix_stream->priv->fd = g_value_get_int (value);
220       if (lseek (unix_stream->priv->fd, 0, SEEK_CUR) == -1 && errno == ESPIPE)
221         unix_stream->priv->is_pipe_or_socket = TRUE;
222       else
223         unix_stream->priv->is_pipe_or_socket = FALSE;
224       break;
225     case PROP_CLOSE_FD:
226       unix_stream->priv->close_fd = g_value_get_boolean (value);
227       break;
228     default:
229       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
230       break;
231     }
232 }
233
234 static void
235 g_unix_input_stream_get_property (GObject    *object,
236                                   guint       prop_id,
237                                   GValue     *value,
238                                   GParamSpec *pspec)
239 {
240   GUnixInputStream *unix_stream;
241
242   unix_stream = G_UNIX_INPUT_STREAM (object);
243
244   switch (prop_id)
245     {
246     case PROP_FD:
247       g_value_set_int (value, unix_stream->priv->fd);
248       break;
249     case PROP_CLOSE_FD:
250       g_value_set_boolean (value, unix_stream->priv->close_fd);
251       break;
252     default:
253       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
254     }
255 }
256
257 static void
258 g_unix_input_stream_init (GUnixInputStream *unix_stream)
259 {
260   unix_stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (unix_stream,
261                                                    G_TYPE_UNIX_INPUT_STREAM,
262                                                    GUnixInputStreamPrivate);
263
264   unix_stream->priv->fd = -1;
265   unix_stream->priv->close_fd = TRUE;
266 }
267
268 /**
269  * g_unix_input_stream_new:
270  * @fd: a UNIX file descriptor
271  * @close_fd: %TRUE to close the file descriptor when done
272  * 
273  * Creates a new #GUnixInputStream for the given @fd. 
274  *
275  * If @close_fd is %TRUE, the file descriptor will be closed 
276  * when the stream is closed.
277  * 
278  * Returns: a new #GUnixInputStream
279  **/
280 GInputStream *
281 g_unix_input_stream_new (gint     fd,
282                          gboolean close_fd)
283 {
284   GUnixInputStream *stream;
285
286   g_return_val_if_fail (fd != -1, NULL);
287
288   stream = g_object_new (G_TYPE_UNIX_INPUT_STREAM,
289                          "fd", fd,
290                          "close-fd", close_fd,
291                          NULL);
292
293   return G_INPUT_STREAM (stream);
294 }
295
296 /**
297  * g_unix_input_stream_set_close_fd:
298  * @stream: a #GUnixInputStream
299  * @close_fd: %TRUE to close the file descriptor when done
300  *
301  * Sets whether the file descriptor of @stream shall be closed
302  * when the stream is closed.
303  *
304  * Since: 2.20
305  */
306 void
307 g_unix_input_stream_set_close_fd (GUnixInputStream *stream,
308                                   gboolean          close_fd)
309 {
310   g_return_if_fail (G_IS_UNIX_INPUT_STREAM (stream));
311
312   close_fd = close_fd != FALSE;
313   if (stream->priv->close_fd != close_fd)
314     {
315       stream->priv->close_fd = close_fd;
316       g_object_notify (G_OBJECT (stream), "close-fd");
317     }
318 }
319
320 /**
321  * g_unix_input_stream_get_close_fd:
322  * @stream: a #GUnixInputStream
323  *
324  * Returns whether the file descriptor of @stream will be
325  * closed when the stream is closed.
326  *
327  * Return value: %TRUE if the file descriptor is closed when done
328  *
329  * Since: 2.20
330  */
331 gboolean
332 g_unix_input_stream_get_close_fd (GUnixInputStream *stream)
333 {
334   g_return_val_if_fail (G_IS_UNIX_INPUT_STREAM (stream), FALSE);
335
336   return stream->priv->close_fd;
337 }
338
339 /**
340  * g_unix_input_stream_get_fd:
341  * @stream: a #GUnixInputStream
342  *
343  * Return the UNIX file descriptor that the stream reads from.
344  *
345  * Return value: The file descriptor of @stream
346  *
347  * Since: 2.20
348  */
349 gint
350 g_unix_input_stream_get_fd (GUnixInputStream *stream)
351 {
352   g_return_val_if_fail (G_IS_UNIX_INPUT_STREAM (stream), -1);
353   
354   return stream->priv->fd;
355 }
356
357 static gssize
358 g_unix_input_stream_read (GInputStream  *stream,
359                           void          *buffer,
360                           gsize          count,
361                           GCancellable  *cancellable,
362                           GError       **error)
363 {
364   GUnixInputStream *unix_stream;
365   gssize res;
366   GPollFD poll_fds[2];
367   int poll_ret;
368
369   unix_stream = G_UNIX_INPUT_STREAM (stream);
370
371   if (unix_stream->priv->is_pipe_or_socket &&
372       g_cancellable_make_pollfd (cancellable, &poll_fds[1]))
373     {
374       poll_fds[0].fd = unix_stream->priv->fd;
375       poll_fds[0].events = G_IO_IN;
376       do
377         poll_ret = g_poll (poll_fds, 2, -1);
378       while (poll_ret == -1 && errno == EINTR);
379       g_cancellable_release_fd (cancellable);
380
381       if (poll_ret == -1)
382         {
383           int errsv = errno;
384
385           g_set_error (error, G_IO_ERROR,
386                        g_io_error_from_errno (errsv),
387                        _("Error reading from unix: %s"),
388                        g_strerror (errsv));
389           return -1;
390         }
391     }
392
393   while (1)
394     {
395       if (g_cancellable_set_error_if_cancelled (cancellable, error))
396         return -1;
397       res = read (unix_stream->priv->fd, buffer, count);
398       if (res == -1)
399         {
400           int errsv = errno;
401
402           if (errsv == EINTR)
403             continue;
404           
405           g_set_error (error, G_IO_ERROR,
406                        g_io_error_from_errno (errsv),
407                        _("Error reading from unix: %s"),
408                        g_strerror (errsv));
409         }
410       
411       break;
412     }
413
414   return res;
415 }
416
417 static gboolean
418 g_unix_input_stream_close (GInputStream  *stream,
419                            GCancellable  *cancellable,
420                            GError       **error)
421 {
422   GUnixInputStream *unix_stream;
423   int res;
424
425   unix_stream = G_UNIX_INPUT_STREAM (stream);
426
427   if (!unix_stream->priv->close_fd)
428     return TRUE;
429   
430   while (1)
431     {
432       /* This might block during the close. Doesn't seem to be a way to avoid it though. */
433       res = close (unix_stream->priv->fd);
434       if (res == -1)
435         {
436           int errsv = errno;
437
438           g_set_error (error, G_IO_ERROR,
439                        g_io_error_from_errno (errsv),
440                        _("Error closing unix: %s"),
441                        g_strerror (errsv));
442         }
443       break;
444     }
445   
446   return res != -1;
447 }
448
449 typedef struct {
450   gsize count;
451   void *buffer;
452   GAsyncReadyCallback callback;
453   gpointer user_data;
454   GCancellable *cancellable;
455   GUnixInputStream *stream;
456 } ReadAsyncData;
457
458 static gboolean
459 read_async_cb (int            fd,
460                GIOCondition   condition,
461                ReadAsyncData *data)
462 {
463   GSimpleAsyncResult *simple;
464   GError *error = NULL;
465   gssize count_read;
466
467   /* We know that we can read from fd once without blocking */
468   while (1)
469     {
470       if (g_cancellable_set_error_if_cancelled (data->cancellable, &error))
471         {
472           count_read = -1;
473           break;
474         }
475       count_read = read (data->stream->priv->fd, data->buffer, data->count);
476       if (count_read == -1)
477         {
478           int errsv = errno;
479
480           if (errsv == EINTR)
481             continue;
482           
483           g_set_error (&error, G_IO_ERROR,
484                        g_io_error_from_errno (errsv),
485                        _("Error reading from unix: %s"),
486                        g_strerror (errsv));
487         }
488       break;
489     }
490
491   simple = g_simple_async_result_new (G_OBJECT (data->stream),
492                                       data->callback,
493                                       data->user_data,
494                                       g_unix_input_stream_read_async);
495
496   g_simple_async_result_set_op_res_gssize (simple, count_read);
497
498   if (count_read == -1)
499     g_simple_async_result_take_error (simple, error);
500
501   /* Complete immediately, not in idle, since we're already in a mainloop callout */
502   g_simple_async_result_complete (simple);
503   g_object_unref (simple);
504
505   return FALSE;
506 }
507
508 static void
509 g_unix_input_stream_read_async (GInputStream        *stream,
510                                 void                *buffer,
511                                 gsize                count,
512                                 int                  io_priority,
513                                 GCancellable        *cancellable,
514                                 GAsyncReadyCallback  callback,
515                                 gpointer             user_data)
516 {
517   GSource *source;
518   GUnixInputStream *unix_stream;
519   ReadAsyncData *data;
520
521   unix_stream = G_UNIX_INPUT_STREAM (stream);
522
523   if (!unix_stream->priv->is_pipe_or_socket)
524     {
525       G_INPUT_STREAM_CLASS (g_unix_input_stream_parent_class)->
526         read_async (stream, buffer, count, io_priority,
527                     cancellable, callback, user_data);
528       return;
529     }
530
531   data = g_new0 (ReadAsyncData, 1);
532   data->count = count;
533   data->buffer = buffer;
534   data->callback = callback;
535   data->user_data = user_data;
536   data->cancellable = cancellable;
537   data->stream = unix_stream;
538
539   source = _g_fd_source_new (unix_stream->priv->fd,
540                              G_IO_IN,
541                              cancellable);
542   g_source_set_name (source, "GUnixInputStream");
543   
544   g_source_set_callback (source, (GSourceFunc)read_async_cb, data, g_free);
545   g_source_attach (source, g_main_context_get_thread_default ());
546  
547   g_source_unref (source);
548 }
549
550 static gssize
551 g_unix_input_stream_read_finish (GInputStream  *stream,
552                                  GAsyncResult  *result,
553                                  GError       **error)
554 {
555   GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
556   GSimpleAsyncResult *simple;
557   gssize nread;
558
559   if (!unix_stream->priv->is_pipe_or_socket)
560     {
561       return G_INPUT_STREAM_CLASS (g_unix_input_stream_parent_class)->
562         read_finish (stream, result, error);
563     }
564
565   simple = G_SIMPLE_ASYNC_RESULT (result);
566   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_unix_input_stream_read_async);
567   
568   nread = g_simple_async_result_get_op_res_gssize (simple);
569   return nread;
570 }
571
572 static void
573 g_unix_input_stream_skip_async (GInputStream        *stream,
574                                 gsize                count,
575                                 int                  io_priority,
576                                 GCancellable        *cancellable,
577                                 GAsyncReadyCallback  callback,
578                                 gpointer             data)
579 {
580   g_warn_if_reached ();
581   /* TODO: Not implemented */
582 }
583
584 static gssize
585 g_unix_input_stream_skip_finish  (GInputStream  *stream,
586                                   GAsyncResult  *result,
587                                   GError       **error)
588 {
589   g_warn_if_reached ();
590   return 0;
591   /* TODO: Not implemented */
592 }
593
594
595 typedef struct {
596   GInputStream *stream;
597   GAsyncReadyCallback callback;
598   gpointer user_data;
599 } CloseAsyncData;
600
601 static void
602 close_async_data_free (gpointer _data)
603 {
604   CloseAsyncData *data = _data;
605
606   g_free (data);
607 }
608
609 static gboolean
610 close_async_cb (CloseAsyncData *data)
611 {
612   GUnixInputStream *unix_stream;
613   GSimpleAsyncResult *simple;
614   GError *error = NULL;
615   gboolean result;
616   int res;
617
618   unix_stream = G_UNIX_INPUT_STREAM (data->stream);
619
620   if (!unix_stream->priv->close_fd)
621     {
622       result = TRUE;
623       goto out;
624     }
625   
626   while (1)
627     {
628       res = close (unix_stream->priv->fd);
629       if (res == -1)
630         {
631           int errsv = errno;
632
633           g_set_error (&error, G_IO_ERROR,
634                        g_io_error_from_errno (errsv),
635                        _("Error closing unix: %s"),
636                        g_strerror (errsv));
637         }
638       break;
639     }
640   
641   result = res != -1;
642
643  out:
644   simple = g_simple_async_result_new (G_OBJECT (data->stream),
645                                       data->callback,
646                                       data->user_data,
647                                       g_unix_input_stream_close_async);
648
649   if (!result)
650     g_simple_async_result_take_error (simple, error);
651
652   /* Complete immediately, not in idle, since we're already in a mainloop callout */
653   g_simple_async_result_complete (simple);
654   g_object_unref (simple);
655   
656   return FALSE;
657 }
658
659 static void
660 g_unix_input_stream_close_async (GInputStream        *stream,
661                                  int                  io_priority,
662                                  GCancellable        *cancellable,
663                                  GAsyncReadyCallback  callback,
664                                  gpointer             user_data)
665 {
666   GSource *idle;
667   CloseAsyncData *data;
668
669   data = g_new0 (CloseAsyncData, 1);
670
671   data->stream = stream;
672   data->callback = callback;
673   data->user_data = user_data;
674   
675   idle = g_idle_source_new ();
676   g_source_set_callback (idle, (GSourceFunc)close_async_cb, data, close_async_data_free);
677   g_source_attach (idle, g_main_context_get_thread_default ());
678   g_source_unref (idle);
679 }
680
681 static gboolean
682 g_unix_input_stream_close_finish (GInputStream  *stream,
683                                   GAsyncResult  *result,
684                                   GError       **error)
685 {
686   /* Failures handled in generic close_finish code */
687   return TRUE;
688 }
689
690 static gboolean
691 g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream)
692 {
693   GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
694   GPollFD poll_fd;
695   gint result;
696
697   poll_fd.fd = unix_stream->priv->fd;
698   poll_fd.events = G_IO_IN;
699
700   do
701     result = g_poll (&poll_fd, 1, 0);
702   while (result == -1 && errno == EINTR);
703
704   return poll_fd.revents != 0;
705 }
706
707 static GSource *
708 g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
709                                             GCancellable         *cancellable)
710 {
711   GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
712   GSource *inner_source, *pollable_source;
713
714   pollable_source = g_pollable_source_new (G_OBJECT (stream));
715
716   inner_source = _g_fd_source_new (unix_stream->priv->fd, G_IO_IN, cancellable);
717   g_source_set_dummy_callback (inner_source);
718   g_source_add_child_source (pollable_source, inner_source);
719   g_source_unref (inner_source);
720
721   return pollable_source;
722 }