Support g_main_context_push_thread_default() in gio
[platform/upstream/glib.git] / gio / gunixinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <sys/types.h>
26 #include <sys/stat.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <stdio.h>
30 #include <fcntl.h>
31
32 #include <glib.h>
33 #include <glib/gstdio.h>
34 #include "gioerror.h"
35 #include "gsimpleasyncresult.h"
36 #include "gunixinputstream.h"
37 #include "gcancellable.h"
38 #include "gasynchelper.h"
39 #include "glibintl.h"
40
41 #include "gioalias.h"
42
43 /**
44  * SECTION:gunixinputstream
45  * @short_description: Streaming input operations for UNIX file descriptors
46  * @include: gio/gunixinputstream.h
47  * @see_also: #GInputStream
48  *
49  * #GUnixInputStream implements #GInputStream for reading from a
50  * UNIX file descriptor, including asynchronous operations. The file
51  * descriptor must be selectable, so it doesn't work with opened files.
52  *
53  * Note that <filename>&lt;gio/gunixinputstream.h&gt;</filename> belongs 
54  * to the UNIX-specific GIO interfaces, thus you have to use the 
55  * <filename>gio-unix-2.0.pc</filename> pkg-config file when using it.
56  */
57
/* GObject property identifiers for GUnixInputStream. */
enum
{
  PROP_0,        /* placeholder so that the real property ids start at 1 */
  PROP_FD,       /* "fd" */
  PROP_CLOSE_FD  /* "close-fd" */
};
63
64 G_DEFINE_TYPE (GUnixInputStream, g_unix_input_stream, G_TYPE_INPUT_STREAM);
65
66 struct _GUnixInputStreamPrivate {
67   int fd;
68   gboolean close_fd;
69 };
70
71 static void     g_unix_input_stream_set_property (GObject              *object,
72                                                   guint                 prop_id,
73                                                   const GValue         *value,
74                                                   GParamSpec           *pspec);
75 static void     g_unix_input_stream_get_property (GObject              *object,
76                                                   guint                 prop_id,
77                                                   GValue               *value,
78                                                   GParamSpec           *pspec);
79 static gssize   g_unix_input_stream_read         (GInputStream         *stream,
80                                                   void                 *buffer,
81                                                   gsize                 count,
82                                                   GCancellable         *cancellable,
83                                                   GError              **error);
84 static gboolean g_unix_input_stream_close        (GInputStream         *stream,
85                                                   GCancellable         *cancellable,
86                                                   GError              **error);
87 static void     g_unix_input_stream_read_async   (GInputStream         *stream,
88                                                   void                 *buffer,
89                                                   gsize                 count,
90                                                   int                   io_priority,
91                                                   GCancellable         *cancellable,
92                                                   GAsyncReadyCallback   callback,
93                                                   gpointer              data);
94 static gssize   g_unix_input_stream_read_finish  (GInputStream         *stream,
95                                                   GAsyncResult         *result,
96                                                   GError              **error);
97 static void     g_unix_input_stream_skip_async   (GInputStream         *stream,
98                                                   gsize                 count,
99                                                   int                   io_priority,
100                                                   GCancellable         *cancellable,
101                                                   GAsyncReadyCallback   callback,
102                                                   gpointer              data);
103 static gssize   g_unix_input_stream_skip_finish  (GInputStream         *stream,
104                                                   GAsyncResult         *result,
105                                                   GError              **error);
106 static void     g_unix_input_stream_close_async  (GInputStream         *stream,
107                                                   int                   io_priority,
108                                                   GCancellable         *cancellable,
109                                                   GAsyncReadyCallback   callback,
110                                                   gpointer              data);
111 static gboolean g_unix_input_stream_close_finish (GInputStream         *stream,
112                                                   GAsyncResult         *result,
113                                                   GError              **error);
114
115
116 static void
117 g_unix_input_stream_finalize (GObject *object)
118 {
119   GUnixInputStream *stream;
120   
121   stream = G_UNIX_INPUT_STREAM (object);
122
123   G_OBJECT_CLASS (g_unix_input_stream_parent_class)->finalize (object);
124 }
125
126 static void
127 g_unix_input_stream_class_init (GUnixInputStreamClass *klass)
128 {
129   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
130   GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
131   
132   g_type_class_add_private (klass, sizeof (GUnixInputStreamPrivate));
133
134   gobject_class->get_property = g_unix_input_stream_get_property;
135   gobject_class->set_property = g_unix_input_stream_set_property;
136   gobject_class->finalize = g_unix_input_stream_finalize;
137
138   stream_class->read_fn = g_unix_input_stream_read;
139   stream_class->close_fn = g_unix_input_stream_close;
140   stream_class->read_async = g_unix_input_stream_read_async;
141   stream_class->read_finish = g_unix_input_stream_read_finish;
142   if (0)
143     {
144       /* TODO: Implement instead of using fallbacks */
145       stream_class->skip_async = g_unix_input_stream_skip_async;
146       stream_class->skip_finish = g_unix_input_stream_skip_finish;
147     }
148   stream_class->close_async = g_unix_input_stream_close_async;
149   stream_class->close_finish = g_unix_input_stream_close_finish;
150
151   /**
152    * GUnixInputStream:fd:
153    *
154    * The file descriptor that the stream reads from.
155    *
156    * Since: 2.20
157    */
158   g_object_class_install_property (gobject_class,
159                                    PROP_FD,
160                                    g_param_spec_int ("fd",
161                                                      P_("File descriptor"),
162                                                      P_("The file descriptor to read from"),
163                                                      G_MININT, G_MAXINT, -1,
164                                                      G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK | G_PARAM_STATIC_BLURB));
165
166   /**
167    * GUnixInputStream:close-fd:
168    *
169    * Whether to close the file descriptor when the stream is closed.
170    *
171    * Since: 2.20
172    */
173   g_object_class_install_property (gobject_class,
174                                    PROP_CLOSE_FD,
175                                    g_param_spec_boolean ("close-fd",
176                                                          P_("Close file descriptor"),
177                                                          P_("Whether to close the file descriptor when the stream is closed"),
178                                                          TRUE,
179                                                          G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK | G_PARAM_STATIC_BLURB));
180 }
181
182 static void
183 g_unix_input_stream_set_property (GObject         *object,
184                                   guint            prop_id,
185                                   const GValue    *value,
186                                   GParamSpec      *pspec)
187 {
188   GUnixInputStream *unix_stream;
189   
190   unix_stream = G_UNIX_INPUT_STREAM (object);
191
192   switch (prop_id)
193     {
194     case PROP_FD:
195       unix_stream->priv->fd = g_value_get_int (value);
196       break;
197     case PROP_CLOSE_FD:
198       unix_stream->priv->close_fd = g_value_get_boolean (value);
199       break;
200     default:
201       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
202       break;
203     }
204 }
205
206 static void
207 g_unix_input_stream_get_property (GObject    *object,
208                                   guint       prop_id,
209                                   GValue     *value,
210                                   GParamSpec *pspec)
211 {
212   GUnixInputStream *unix_stream;
213
214   unix_stream = G_UNIX_INPUT_STREAM (object);
215
216   switch (prop_id)
217     {
218     case PROP_FD:
219       g_value_set_int (value, unix_stream->priv->fd);
220       break;
221     case PROP_CLOSE_FD:
222       g_value_set_boolean (value, unix_stream->priv->close_fd);
223       break;
224     default:
225       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
226     }
227 }
228
229 static void
230 g_unix_input_stream_init (GUnixInputStream *unix_stream)
231 {
232   unix_stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (unix_stream,
233                                                    G_TYPE_UNIX_INPUT_STREAM,
234                                                    GUnixInputStreamPrivate);
235
236   unix_stream->priv->fd = -1;
237   unix_stream->priv->close_fd = TRUE;
238 }
239
240 /**
241  * g_unix_input_stream_new:
242  * @fd: a UNIX file descriptor
243  * @close_fd: %TRUE to close the file descriptor when done
244  * 
245  * Creates a new #GUnixInputStream for the given @fd. 
246  *
247  * If @close_fd is %TRUE, the file descriptor will be closed 
248  * when the stream is closed.
249  * 
250  * Returns: a new #GUnixInputStream
251  **/
252 GInputStream *
253 g_unix_input_stream_new (gint     fd,
254                          gboolean close_fd)
255 {
256   GUnixInputStream *stream;
257
258   g_return_val_if_fail (fd != -1, NULL);
259
260   stream = g_object_new (G_TYPE_UNIX_INPUT_STREAM,
261                          "fd", fd,
262                          "close-fd", close_fd,
263                          NULL);
264
265   return G_INPUT_STREAM (stream);
266 }
267
268 /**
269  * g_unix_input_stream_set_close_fd:
270  * @stream: a #GUnixInputStream
271  * @close_fd: %TRUE to close the file descriptor when done
272  *
273  * Sets whether the file descriptor of @stream shall be closed
274  * when the stream is closed.
275  *
276  * Since: 2.20
277  */
278 void
279 g_unix_input_stream_set_close_fd (GUnixInputStream *stream,
280                                   gboolean          close_fd)
281 {
282   g_return_if_fail (G_IS_UNIX_INPUT_STREAM (stream));
283
284   close_fd = close_fd != FALSE;
285   if (stream->priv->close_fd != close_fd)
286     {
287       stream->priv->close_fd = close_fd;
288       g_object_notify (G_OBJECT (stream), "close-fd");
289     }
290 }
291
292 /**
293  * g_unix_input_stream_get_close_fd:
294  * @stream: a #GUnixInputStream
295  *
296  * Returns whether the file descriptor of @stream will be
297  * closed when the stream is closed.
298  *
299  * Return value: %TRUE if the file descriptor is closed when done
300  *
301  * Since: 2.20
302  */
303 gboolean
304 g_unix_input_stream_get_close_fd (GUnixInputStream *stream)
305 {
306   g_return_val_if_fail (G_IS_UNIX_INPUT_STREAM (stream), FALSE);
307
308   return stream->priv->close_fd;
309 }
310
311 /**
312  * g_unix_input_stream_get_fd:
313  * @stream: a #GUnixInputStream
314  *
315  * Return the UNIX file descriptor that the stream reads from.
316  *
317  * Return value: The file descriptor of @stream
318  *
319  * Since: 2.20
320  */
321 gint
322 g_unix_input_stream_get_fd (GUnixInputStream *stream)
323 {
324   g_return_val_if_fail (G_IS_UNIX_INPUT_STREAM (stream), -1);
325   
326   return stream->priv->fd;
327 }
328
329 static gssize
330 g_unix_input_stream_read (GInputStream  *stream,
331                           void          *buffer,
332                           gsize          count,
333                           GCancellable  *cancellable,
334                           GError       **error)
335 {
336   GUnixInputStream *unix_stream;
337   gssize res;
338   GPollFD poll_fds[2];
339   int poll_ret;
340
341   unix_stream = G_UNIX_INPUT_STREAM (stream);
342
343   if (cancellable)
344     {
345       poll_fds[0].fd = unix_stream->priv->fd;
346       poll_fds[0].events = G_IO_IN;
347       g_cancellable_make_pollfd (cancellable, &poll_fds[1]);
348       do
349         poll_ret = g_poll (poll_fds, 2, -1);
350       while (poll_ret == -1 && errno == EINTR);
351       
352       if (poll_ret == -1)
353         {
354           int errsv = errno;
355
356           g_set_error (error, G_IO_ERROR,
357                        g_io_error_from_errno (errsv),
358                        _("Error reading from unix: %s"),
359                        g_strerror (errsv));
360           return -1;
361         }
362     }
363
364   while (1)
365     {
366       if (g_cancellable_set_error_if_cancelled (cancellable, error))
367         return -1;
368       res = read (unix_stream->priv->fd, buffer, count);
369       if (res == -1)
370         {
371           int errsv = errno;
372
373           if (errsv == EINTR)
374             continue;
375           
376           g_set_error (error, G_IO_ERROR,
377                        g_io_error_from_errno (errsv),
378                        _("Error reading from unix: %s"),
379                        g_strerror (errsv));
380         }
381       
382       break;
383     }
384
385   return res;
386 }
387
388 static gboolean
389 g_unix_input_stream_close (GInputStream  *stream,
390                            GCancellable  *cancellable,
391                            GError       **error)
392 {
393   GUnixInputStream *unix_stream;
394   int res;
395
396   unix_stream = G_UNIX_INPUT_STREAM (stream);
397
398   if (!unix_stream->priv->close_fd)
399     return TRUE;
400   
401   while (1)
402     {
403       /* This might block during the close. Doesn't seem to be a way to avoid it though. */
404       res = close (unix_stream->priv->fd);
405       if (res == -1)
406         {
407           int errsv = errno;
408
409           g_set_error (error, G_IO_ERROR,
410                        g_io_error_from_errno (errsv),
411                        _("Error closing unix: %s"),
412                        g_strerror (errsv));
413         }
414       break;
415     }
416   
417   return res != -1;
418 }
419
420 typedef struct {
421   gsize count;
422   void *buffer;
423   GAsyncReadyCallback callback;
424   gpointer user_data;
425   GCancellable *cancellable;
426   GUnixInputStream *stream;
427 } ReadAsyncData;
428
429 static gboolean
430 read_async_cb (ReadAsyncData *data,
431                GIOCondition   condition,
432                int            fd)
433 {
434   GSimpleAsyncResult *simple;
435   GError *error = NULL;
436   gssize count_read;
437
438   /* We know that we can read from fd once without blocking */
439   while (1)
440     {
441       if (g_cancellable_set_error_if_cancelled (data->cancellable, &error))
442         {
443           count_read = -1;
444           break;
445         }
446       count_read = read (data->stream->priv->fd, data->buffer, data->count);
447       if (count_read == -1)
448         {
449           int errsv = errno;
450
451           if (errsv == EINTR)
452             continue;
453           
454           g_set_error (&error, G_IO_ERROR,
455                        g_io_error_from_errno (errsv),
456                        _("Error reading from unix: %s"),
457                        g_strerror (errsv));
458         }
459       break;
460     }
461
462   simple = g_simple_async_result_new (G_OBJECT (data->stream),
463                                       data->callback,
464                                       data->user_data,
465                                       g_unix_input_stream_read_async);
466
467   g_simple_async_result_set_op_res_gssize (simple, count_read);
468
469   if (count_read == -1)
470     {
471       g_simple_async_result_set_from_error (simple, error);
472       g_error_free (error);
473     }
474
475   /* Complete immediately, not in idle, since we're already in a mainloop callout */
476   g_simple_async_result_complete (simple);
477   g_object_unref (simple);
478
479   return FALSE;
480 }
481
482 static void
483 g_unix_input_stream_read_async (GInputStream        *stream,
484                                 void                *buffer,
485                                 gsize                count,
486                                 int                  io_priority,
487                                 GCancellable        *cancellable,
488                                 GAsyncReadyCallback  callback,
489                                 gpointer             user_data)
490 {
491   GSource *source;
492   GUnixInputStream *unix_stream;
493   ReadAsyncData *data;
494
495   unix_stream = G_UNIX_INPUT_STREAM (stream);
496
497   data = g_new0 (ReadAsyncData, 1);
498   data->count = count;
499   data->buffer = buffer;
500   data->callback = callback;
501   data->user_data = user_data;
502   data->cancellable = cancellable;
503   data->stream = unix_stream;
504
505   source = _g_fd_source_new (unix_stream->priv->fd,
506                              G_IO_IN,
507                              cancellable);
508   
509   g_source_set_callback (source, (GSourceFunc)read_async_cb, data, g_free);
510   g_source_attach (source, g_main_context_get_thread_default ());
511  
512   g_source_unref (source);
513 }
514
515 static gssize
516 g_unix_input_stream_read_finish (GInputStream  *stream,
517                                  GAsyncResult  *result,
518                                  GError       **error)
519 {
520   GSimpleAsyncResult *simple;
521   gssize nread;
522
523   simple = G_SIMPLE_ASYNC_RESULT (result);
524   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_unix_input_stream_read_async);
525   
526   nread = g_simple_async_result_get_op_res_gssize (simple);
527   return nread;
528 }
529
530 static void
531 g_unix_input_stream_skip_async (GInputStream        *stream,
532                                 gsize                count,
533                                 int                  io_priority,
534                                 GCancellable        *cancellable,
535                                 GAsyncReadyCallback  callback,
536                                 gpointer             data)
537 {
538   g_warn_if_reached ();
539   /* TODO: Not implemented */
540 }
541
542 static gssize
543 g_unix_input_stream_skip_finish  (GInputStream  *stream,
544                                   GAsyncResult  *result,
545                                   GError       **error)
546 {
547   g_warn_if_reached ();
548   return 0;
549   /* TODO: Not implemented */
550 }
551
552
553 typedef struct {
554   GInputStream *stream;
555   GAsyncReadyCallback callback;
556   gpointer user_data;
557 } CloseAsyncData;
558
559 static void
560 close_async_data_free (gpointer _data)
561 {
562   CloseAsyncData *data = _data;
563
564   g_free (data);
565 }
566
567 static gboolean
568 close_async_cb (CloseAsyncData *data)
569 {
570   GUnixInputStream *unix_stream;
571   GSimpleAsyncResult *simple;
572   GError *error = NULL;
573   gboolean result;
574   int res;
575
576   unix_stream = G_UNIX_INPUT_STREAM (data->stream);
577
578   if (!unix_stream->priv->close_fd)
579     {
580       result = TRUE;
581       goto out;
582     }
583   
584   while (1)
585     {
586       res = close (unix_stream->priv->fd);
587       if (res == -1)
588         {
589           int errsv = errno;
590
591           g_set_error (&error, G_IO_ERROR,
592                        g_io_error_from_errno (errsv),
593                        _("Error closing unix: %s"),
594                        g_strerror (errsv));
595         }
596       break;
597     }
598   
599   result = res != -1;
600
601  out:
602   simple = g_simple_async_result_new (G_OBJECT (data->stream),
603                                       data->callback,
604                                       data->user_data,
605                                       g_unix_input_stream_close_async);
606
607   if (!result)
608     {
609       g_simple_async_result_set_from_error (simple, error);
610       g_error_free (error);
611     }
612
613   /* Complete immediately, not in idle, since we're already in a mainloop callout */
614   g_simple_async_result_complete (simple);
615   g_object_unref (simple);
616   
617   return FALSE;
618 }
619
620 static void
621 g_unix_input_stream_close_async (GInputStream        *stream,
622                                  int                  io_priority,
623                                  GCancellable        *cancellable,
624                                  GAsyncReadyCallback  callback,
625                                  gpointer             user_data)
626 {
627   GSource *idle;
628   CloseAsyncData *data;
629
630   data = g_new0 (CloseAsyncData, 1);
631
632   data->stream = stream;
633   data->callback = callback;
634   data->user_data = user_data;
635   
636   idle = g_idle_source_new ();
637   g_source_set_callback (idle, (GSourceFunc)close_async_cb, data, close_async_data_free);
638   g_source_attach (idle, g_main_context_get_thread_default ());
639   g_source_unref (idle);
640 }
641
642 static gboolean
643 g_unix_input_stream_close_finish (GInputStream  *stream,
644                                   GAsyncResult  *result,
645                                   GError       **error)
646 {
647   /* Failures handled in generic close_finish code */
648   return TRUE;
649 }
650
651 #define __G_UNIX_INPUT_STREAM_C__
652 #include "gioaliasdef.c"