f7006cb1fc210b952f1beffd3471d8deccedea46
[platform/upstream/glib.git] / gio / ginputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include <glib.h>
25 #include "glibintl.h"
26
27 #include "ginputstream.h"
28 #include "gseekable.h"
29 #include "gcancellable.h"
30 #include "gasyncresult.h"
31 #include "gioerror.h"
32 #include "gpollableinputstream.h"
33
34 /**
35  * SECTION:ginputstream
36  * @short_description: Base class for implementing streaming input
37  * @include: gio/gio.h
38  *
39  * #GInputStream has functions to read from a stream (g_input_stream_read()),
40  * to close a stream (g_input_stream_close()) and to skip some content
41  * (g_input_stream_skip()). 
42  *
43  * To copy the content of an input stream to an output stream without 
44  * manually handling the reads and writes, use g_output_stream_splice(). 
45  *
46  * All of these functions have async variants too.
47  **/
48
49 G_DEFINE_ABSTRACT_TYPE (GInputStream, g_input_stream, G_TYPE_OBJECT);
50
51 struct _GInputStreamPrivate {
52   guint closed : 1;
53   guint pending : 1;
54   GAsyncReadyCallback outstanding_callback;
55 };
56
57 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
58                                                   gsize                 count,
59                                                   GCancellable         *cancellable,
60                                                   GError              **error);
61 static void     g_input_stream_real_read_async   (GInputStream         *stream,
62                                                   void                 *buffer,
63                                                   gsize                 count,
64                                                   int                   io_priority,
65                                                   GCancellable         *cancellable,
66                                                   GAsyncReadyCallback   callback,
67                                                   gpointer              user_data);
68 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
69                                                   GAsyncResult         *result,
70                                                   GError              **error);
71 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
72                                                   gsize                 count,
73                                                   int                   io_priority,
74                                                   GCancellable         *cancellable,
75                                                   GAsyncReadyCallback   callback,
76                                                   gpointer              data);
77 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
78                                                   GAsyncResult         *result,
79                                                   GError              **error);
80 static void     g_input_stream_real_close_async  (GInputStream         *stream,
81                                                   int                   io_priority,
82                                                   GCancellable         *cancellable,
83                                                   GAsyncReadyCallback   callback,
84                                                   gpointer              data);
85 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
86                                                   GAsyncResult         *result,
87                                                   GError              **error);
88
89 static void
90 g_input_stream_finalize (GObject *object)
91 {
92   G_OBJECT_CLASS (g_input_stream_parent_class)->finalize (object);
93 }
94
95 static void
96 g_input_stream_dispose (GObject *object)
97 {
98   GInputStream *stream;
99
100   stream = G_INPUT_STREAM (object);
101   
102   if (!stream->priv->closed)
103     g_input_stream_close (stream, NULL, NULL);
104
105   G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
106 }
107
108
109 static void
110 g_input_stream_class_init (GInputStreamClass *klass)
111 {
112   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
113   
114   g_type_class_add_private (klass, sizeof (GInputStreamPrivate));
115   
116   gobject_class->finalize = g_input_stream_finalize;
117   gobject_class->dispose = g_input_stream_dispose;
118   
119   klass->skip = g_input_stream_real_skip;
120   klass->read_async = g_input_stream_real_read_async;
121   klass->read_finish = g_input_stream_real_read_finish;
122   klass->skip_async = g_input_stream_real_skip_async;
123   klass->skip_finish = g_input_stream_real_skip_finish;
124   klass->close_async = g_input_stream_real_close_async;
125   klass->close_finish = g_input_stream_real_close_finish;
126 }
127
128 static void
129 g_input_stream_init (GInputStream *stream)
130 {
131   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
132                                               G_TYPE_INPUT_STREAM,
133                                               GInputStreamPrivate);
134 }
135
136 /**
137  * g_input_stream_read:
138  * @stream: a #GInputStream.
139  * @buffer: a buffer to read data into (which should be at least count bytes long).
140  * @count: the number of bytes that will be read from the stream
141  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
142  * @error: location to store the error occurring, or %NULL to ignore
143  *
144  * Tries to read @count bytes from the stream into the buffer starting at
145  * @buffer. Will block during this read.
146  * 
147  * If count is zero returns zero and does nothing. A value of @count
148  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
149  *
150  * On success, the number of bytes read into the buffer is returned.
151  * It is not an error if this is not the same as the requested size, as it
152  * can happen e.g. near the end of a file. Zero is returned on end of file
153  * (or if @count is zero),  but never otherwise.
154  *
155  * If @cancellable is not %NULL, then the operation can be cancelled by
156  * triggering the cancellable object from another thread. If the operation
157  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
158  * operation was partially finished when the operation was cancelled the
159  * partial result will be returned, without an error.
160  *
161  * On error -1 is returned and @error is set accordingly.
162  * 
163  * Return value: Number of bytes read, or -1 on error, or 0 on end of file.
164  **/
165 gssize
166 g_input_stream_read  (GInputStream  *stream,
167                       void          *buffer,
168                       gsize          count,
169                       GCancellable  *cancellable,
170                       GError       **error)
171 {
172   GInputStreamClass *class;
173   gssize res;
174
175   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
176   g_return_val_if_fail (buffer != NULL, 0);
177
178   if (count == 0)
179     return 0;
180   
181   if (((gssize) count) < 0)
182     {
183       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
184                    _("Too large count value passed to %s"), G_STRFUNC);
185       return -1;
186     }
187
188   class = G_INPUT_STREAM_GET_CLASS (stream);
189
190   if (class->read_fn == NULL) 
191     {
192       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
193                            _("Input stream doesn't implement read"));
194       return -1;
195     }
196
197   if (!g_input_stream_set_pending (stream, error))
198     return -1;
199
200   if (cancellable)
201     g_cancellable_push_current (cancellable);
202   
203   res = class->read_fn (stream, buffer, count, cancellable, error);
204
205   if (cancellable)
206     g_cancellable_pop_current (cancellable);
207   
208   g_input_stream_clear_pending (stream);
209
210   return res;
211 }
212
213 /**
214  * g_input_stream_read_all:
215  * @stream: a #GInputStream.
216  * @buffer: a buffer to read data into (which should be at least count bytes long).
217  * @count: the number of bytes that will be read from the stream
218  * @bytes_read: (out): location to store the number of bytes that was read from the stream
219  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
220  * @error: location to store the error occurring, or %NULL to ignore
221  *
222  * Tries to read @count bytes from the stream into the buffer starting at
223  * @buffer. Will block during this read.
224  *
225  * This function is similar to g_input_stream_read(), except it tries to
226  * read as many bytes as requested, only stopping on an error or end of stream.
227  *
228  * On a successful read of @count bytes, or if we reached the end of the
229  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
230  * read into @buffer.
231  * 
232  * If there is an error during the operation %FALSE is returned and @error
233  * is set to indicate the error status, @bytes_read is updated to contain
234  * the number of bytes read into @buffer before the error occurred.
235  *
236  * Return value: %TRUE on success, %FALSE if there was an error
237  **/
238 gboolean
239 g_input_stream_read_all (GInputStream  *stream,
240                          void          *buffer,
241                          gsize          count,
242                          gsize         *bytes_read,
243                          GCancellable  *cancellable,
244                          GError       **error)
245 {
246   gsize _bytes_read;
247   gssize res;
248
249   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
250   g_return_val_if_fail (buffer != NULL, FALSE);
251
252   _bytes_read = 0;
253   while (_bytes_read < count)
254     {
255       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
256                                  cancellable, error);
257       if (res == -1)
258         {
259           if (bytes_read)
260             *bytes_read = _bytes_read;
261           return FALSE;
262         }
263       
264       if (res == 0)
265         break;
266
267       _bytes_read += res;
268     }
269
270   if (bytes_read)
271     *bytes_read = _bytes_read;
272   return TRUE;
273 }
274
275 /**
276  * g_input_stream_read_bytes:
277  * @stream: a #GInputStream.
278  * @count: maximum number of bytes that will be read from the stream. Common
279  * values include 4096 and 8192.
280  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
281  * @error: location to store the error occurring, or %NULL to ignore
282  *
283  * Like g_input_stream_read(), this tries to read @count bytes from
284  * the stream in a blocking fashion. However, rather than reading into
285  * a user-supplied buffer, this will create a new #GBytes containing
286  * the data that was read. This may be easier to use from language
287  * bindings.
288  *
289  * If count is zero, returns a zero-length #GBytes and does nothing. A
290  * value of @count larger than %G_MAXSSIZE will cause a
291  * %G_IO_ERROR_INVALID_ARGUMENT error.
292  *
293  * On success, a new #GBytes is returned. It is not an error if the
294  * size of this object is not the same as the requested size, as it
295  * can happen e.g. near the end of a file. A zero-length #GBytes is
296  * returned on end of file (or if @count is zero), but never
297  * otherwise.
298  *
299  * If @cancellable is not %NULL, then the operation can be cancelled by
300  * triggering the cancellable object from another thread. If the operation
301  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
302  * operation was partially finished when the operation was cancelled the
303  * partial result will be returned, without an error.
304  *
305  * On error %NULL is returned and @error is set accordingly.
306  *
307  * Return value: a new #GBytes, or %NULL on error
308  **/
309 GBytes *
310 g_input_stream_read_bytes (GInputStream  *stream,
311                            gsize          count,
312                            GCancellable  *cancellable,
313                            GError       **error)
314 {
315   guchar *buf;
316   gssize nread;
317
318   buf = g_malloc (count);
319   nread = g_input_stream_read (stream, buf, count, cancellable, error);
320   if (nread == -1)
321     {
322       g_free (buf);
323       return NULL;
324     }
325   else if (nread == 0)
326     {
327       g_free (buf);
328       return g_bytes_new_static ("", 0);
329     }
330   else
331     return g_bytes_new_take (buf, nread);
332 }
333
334 /**
335  * g_input_stream_skip:
336  * @stream: a #GInputStream.
337  * @count: the number of bytes that will be skipped from the stream
338  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
339  * @error: location to store the error occurring, or %NULL to ignore
340  *
341  * Tries to skip @count bytes from the stream. Will block during the operation.
342  *
343  * This is identical to g_input_stream_read(), from a behaviour standpoint,
344  * but the bytes that are skipped are not returned to the user. Some
345  * streams have an implementation that is more efficient than reading the data.
346  *
347  * This function is optional for inherited classes, as the default implementation
348  * emulates it using read.
349  *
350  * If @cancellable is not %NULL, then the operation can be cancelled by
351  * triggering the cancellable object from another thread. If the operation
352  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
353  * operation was partially finished when the operation was cancelled the
354  * partial result will be returned, without an error.
355  *
356  * Return value: Number of bytes skipped, or -1 on error
357  **/
358 gssize
359 g_input_stream_skip (GInputStream  *stream,
360                      gsize          count,
361                      GCancellable  *cancellable,
362                      GError       **error)
363 {
364   GInputStreamClass *class;
365   gssize res;
366
367   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
368
369   if (count == 0)
370     return 0;
371
372   if (((gssize) count) < 0)
373     {
374       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
375                    _("Too large count value passed to %s"), G_STRFUNC);
376       return -1;
377     }
378   
379   class = G_INPUT_STREAM_GET_CLASS (stream);
380
381   if (!g_input_stream_set_pending (stream, error))
382     return -1;
383
384   if (cancellable)
385     g_cancellable_push_current (cancellable);
386   
387   res = class->skip (stream, count, cancellable, error);
388
389   if (cancellable)
390     g_cancellable_pop_current (cancellable);
391   
392   g_input_stream_clear_pending (stream);
393
394   return res;
395 }
396
397 static gssize
398 g_input_stream_real_skip (GInputStream  *stream,
399                           gsize          count,
400                           GCancellable  *cancellable,
401                           GError       **error)
402 {
403   GInputStreamClass *class;
404   gssize ret, read_bytes;
405   char buffer[8192];
406   GError *my_error;
407
408   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
409     {
410       if (g_seekable_seek (G_SEEKABLE (stream),
411                            count,
412                            G_SEEK_CUR,
413                            cancellable,
414                            NULL))
415         return count;
416     }
417
418   /* If not seekable, or seek failed, fall back to reading data: */
419
420   class = G_INPUT_STREAM_GET_CLASS (stream);
421
422   read_bytes = 0;
423   while (1)
424     {
425       my_error = NULL;
426
427       ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
428                             cancellable, &my_error);
429       if (ret == -1)
430         {
431           if (read_bytes > 0 &&
432               my_error->domain == G_IO_ERROR &&
433               my_error->code == G_IO_ERROR_CANCELLED)
434             {
435               g_error_free (my_error);
436               return read_bytes;
437             }
438
439           g_propagate_error (error, my_error);
440           return -1;
441         }
442
443       count -= ret;
444       read_bytes += ret;
445
446       if (ret == 0 || count == 0)
447         return read_bytes;
448     }
449 }
450
451 /**
452  * g_input_stream_close:
453  * @stream: A #GInputStream.
454  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
455  * @error: location to store the error occurring, or %NULL to ignore
456  *
457  * Closes the stream, releasing resources related to it.
458  *
459  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
460  * Closing a stream multiple times will not return an error.
461  *
462  * Streams will be automatically closed when the last reference
463  * is dropped, but you might want to call this function to make sure 
464  * resources are released as early as possible.
465  *
466  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
467  * open after the stream is closed. See the documentation for the individual
468  * stream for details.
469  *
470  * On failure the first error that happened will be reported, but the close
471  * operation will finish as much as possible. A stream that failed to
472  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
473  * is important to check and report the error to the user.
474  *
475  * If @cancellable is not %NULL, then the operation can be cancelled by
476  * triggering the cancellable object from another thread. If the operation
477  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
478  * Cancelling a close will still leave the stream closed, but some streams
479  * can use a faster close that doesn't block to e.g. check errors. 
480  *
481  * Return value: %TRUE on success, %FALSE on failure
482  **/
483 gboolean
484 g_input_stream_close (GInputStream  *stream,
485                       GCancellable  *cancellable,
486                       GError       **error)
487 {
488   GInputStreamClass *class;
489   gboolean res;
490
491   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
492
493   class = G_INPUT_STREAM_GET_CLASS (stream);
494
495   if (stream->priv->closed)
496     return TRUE;
497
498   res = TRUE;
499
500   if (!g_input_stream_set_pending (stream, error))
501     return FALSE;
502
503   if (cancellable)
504     g_cancellable_push_current (cancellable);
505
506   if (class->close_fn)
507     res = class->close_fn (stream, cancellable, error);
508
509   if (cancellable)
510     g_cancellable_pop_current (cancellable);
511
512   g_input_stream_clear_pending (stream);
513   
514   stream->priv->closed = TRUE;
515   
516   return res;
517 }
518
519 static void
520 async_ready_callback_wrapper (GObject      *source_object,
521                               GAsyncResult *res,
522                               gpointer      user_data)
523 {
524   GInputStream *stream = G_INPUT_STREAM (source_object);
525
526   g_input_stream_clear_pending (stream);
527   if (stream->priv->outstanding_callback)
528     (*stream->priv->outstanding_callback) (source_object, res, user_data);
529   g_object_unref (stream);
530 }
531
532 static void
533 async_ready_close_callback_wrapper (GObject      *source_object,
534                                     GAsyncResult *res,
535                                     gpointer      user_data)
536 {
537   GInputStream *stream = G_INPUT_STREAM (source_object);
538
539   g_input_stream_clear_pending (stream);
540   stream->priv->closed = TRUE;
541   if (stream->priv->outstanding_callback)
542     (*stream->priv->outstanding_callback) (source_object, res, user_data);
543   g_object_unref (stream);
544 }
545
546 /**
547  * g_input_stream_read_async:
548  * @stream: A #GInputStream.
549  * @buffer: a buffer to read data into (which should be at least count bytes long).
550  * @count: the number of bytes that will be read from the stream
551  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
552  * of the request. 
553  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
554  * @callback: (scope async): callback to call when the request is satisfied
555  * @user_data: (closure): the data to pass to callback function
556  *
557  * Request an asynchronous read of @count bytes from the stream into the buffer
558  * starting at @buffer. When the operation is finished @callback will be called. 
559  * You can then call g_input_stream_read_finish() to get the result of the 
560  * operation.
561  *
562  * During an async request no other sync and async calls are allowed on @stream, and will
563  * result in %G_IO_ERROR_PENDING errors. 
564  *
565  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
566  *
567  * On success, the number of bytes read into the buffer will be passed to the
568  * callback. It is not an error if this is not the same as the requested size, as it
569  * can happen e.g. near the end of a file, but generally we try to read
570  * as many bytes as requested. Zero is returned on end of file
571  * (or if @count is zero),  but never otherwise.
572  *
573  * Any outstanding i/o request with higher priority (lower numerical value) will
574  * be executed before an outstanding request with lower priority. Default
575  * priority is %G_PRIORITY_DEFAULT.
576  *
577  * The asyncronous methods have a default fallback that uses threads to implement
578  * asynchronicity, so they are optional for inheriting classes. However, if you
579  * override one you must override all.
580  **/
581 void
582 g_input_stream_read_async (GInputStream        *stream,
583                            void                *buffer,
584                            gsize                count,
585                            int                  io_priority,
586                            GCancellable        *cancellable,
587                            GAsyncReadyCallback  callback,
588                            gpointer             user_data)
589 {
590   GInputStreamClass *class;
591   GError *error = NULL;
592
593   g_return_if_fail (G_IS_INPUT_STREAM (stream));
594   g_return_if_fail (buffer != NULL);
595
596   if (count == 0)
597     {
598       GTask *task;
599
600       task = g_task_new (stream, cancellable, callback, user_data);
601       g_task_set_source_tag (task, g_input_stream_read_async);
602       g_task_return_int (task, 0);
603       g_object_unref (task);
604       return;
605     }
606   
607   if (((gssize) count) < 0)
608     {
609       g_task_report_new_error (stream, callback, user_data,
610                                g_input_stream_read_async,
611                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
612                                _("Too large count value passed to %s"),
613                                G_STRFUNC);
614       return;
615     }
616
617   if (!g_input_stream_set_pending (stream, &error))
618     {
619       g_task_report_error (stream, callback, user_data,
620                            g_input_stream_read_async,
621                            error);
622       return;
623     }
624
625   class = G_INPUT_STREAM_GET_CLASS (stream);
626   stream->priv->outstanding_callback = callback;
627   g_object_ref (stream);
628   class->read_async (stream, buffer, count, io_priority, cancellable,
629                      async_ready_callback_wrapper, user_data);
630 }
631
632 /**
633  * g_input_stream_read_finish:
634  * @stream: a #GInputStream.
635  * @result: a #GAsyncResult.
636  * @error: a #GError location to store the error occurring, or %NULL to 
637  * ignore.
638  * 
639  * Finishes an asynchronous stream read operation. 
640  * 
641  * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
642  **/
643 gssize
644 g_input_stream_read_finish (GInputStream  *stream,
645                             GAsyncResult  *result,
646                             GError       **error)
647 {
648   GInputStreamClass *class;
649   
650   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
651   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
652
653   if (g_async_result_legacy_propagate_error (result, error))
654     return -1;
655   else if (g_async_result_is_tagged (result, g_input_stream_read_async))
656     return g_task_propagate_int (G_TASK (result), error);
657
658   class = G_INPUT_STREAM_GET_CLASS (stream);
659   return class->read_finish (stream, result, error);
660 }
661
662 static void
663 read_bytes_callback (GObject      *stream,
664                      GAsyncResult *result,
665                      gpointer      user_data)
666 {
667   GTask *task = user_data;
668   guchar *buf = g_task_get_task_data (task);
669   GError *error = NULL;
670   gssize nread;
671   GBytes *bytes = NULL;
672
673   nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
674                                       result, &error);
675   if (nread == -1)
676     {
677       g_free (buf);
678       g_task_return_error (task, error);
679     }
680   else if (nread == 0)
681     {
682       g_free (buf);
683       bytes = g_bytes_new_static ("", 0);
684     }
685   else
686     bytes = g_bytes_new_take (buf, nread);
687
688   if (bytes)
689     g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
690
691   g_object_unref (task);
692 }
693
694 /**
695  * g_input_stream_read_bytes_async:
696  * @stream: A #GInputStream.
697  * @count: the number of bytes that will be read from the stream
698  * @io_priority: the <link linkend="io-priority">I/O priority</link>
699  *   of the request.
700  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
701  * @callback: (scope async): callback to call when the request is satisfied
702  * @user_data: (closure): the data to pass to callback function
703  *
704  * Request an asynchronous read of @count bytes from the stream into a
705  * new #GBytes. When the operation is finished @callback will be
706  * called. You can then call g_input_stream_read_bytes_finish() to get the
707  * result of the operation.
708  *
709  * During an async request no other sync and async calls are allowed
710  * on @stream, and will result in %G_IO_ERROR_PENDING errors.
711  *
712  * A value of @count larger than %G_MAXSSIZE will cause a
713  * %G_IO_ERROR_INVALID_ARGUMENT error.
714  *
715  * On success, the new #GBytes will be passed to the callback. It is
716  * not an error if this is smaller than the requested size, as it can
717  * happen e.g. near the end of a file, but generally we try to read as
718  * many bytes as requested. Zero is returned on end of file (or if
719  * @count is zero), but never otherwise.
720  *
721  * Any outstanding I/O request with higher priority (lower numerical
722  * value) will be executed before an outstanding request with lower
723  * priority. Default priority is %G_PRIORITY_DEFAULT.
724  **/
725 void
726 g_input_stream_read_bytes_async (GInputStream          *stream,
727                                  gsize                  count,
728                                  int                    io_priority,
729                                  GCancellable          *cancellable,
730                                  GAsyncReadyCallback    callback,
731                                  gpointer               user_data)
732 {
733   GTask *task;
734   guchar *buf;
735
736   task = g_task_new (stream, cancellable, callback, user_data);
737   buf = g_malloc (count);
738   g_task_set_task_data (task, buf, NULL);
739
740   g_input_stream_read_async (stream, buf, count,
741                              io_priority, cancellable,
742                              read_bytes_callback, task);
743 }
744
745 /**
746  * g_input_stream_read_bytes_finish:
747  * @stream: a #GInputStream.
748  * @result: a #GAsyncResult.
749  * @error: a #GError location to store the error occurring, or %NULL to
750  *   ignore.
751  *
752  * Finishes an asynchronous stream read-into-#GBytes operation.
753  *
754  * Returns: the newly-allocated #GBytes, or %NULL on error
755  **/
756 GBytes *
757 g_input_stream_read_bytes_finish (GInputStream  *stream,
758                                   GAsyncResult  *result,
759                                   GError       **error)
760 {
761   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
762   g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
763
764   return g_task_propagate_pointer (G_TASK (result), error);
765 }
766
767 /**
768  * g_input_stream_skip_async:
769  * @stream: A #GInputStream.
770  * @count: the number of bytes that will be skipped from the stream
771  * @io_priority: the <link linkend="io-priority">I/O priority</link>
772  * of the request.
773  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
774  * @callback: (scope async): callback to call when the request is satisfied
775  * @user_data: (closure): the data to pass to callback function
776  *
777  * Request an asynchronous skip of @count bytes from the stream.
778  * When the operation is finished @callback will be called.
779  * You can then call g_input_stream_skip_finish() to get the result
780  * of the operation.
781  *
782  * During an async request no other sync and async calls are allowed,
783  * and will result in %G_IO_ERROR_PENDING errors.
784  *
785  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
786  *
787  * On success, the number of bytes skipped will be passed to the callback.
788  * It is not an error if this is not the same as the requested size, as it
789  * can happen e.g. near the end of a file, but generally we try to skip
790  * as many bytes as requested. Zero is returned on end of file
791  * (or if @count is zero), but never otherwise.
792  *
793  * Any outstanding i/o request with higher priority (lower numerical value)
794  * will be executed before an outstanding request with lower priority.
795  * Default priority is %G_PRIORITY_DEFAULT.
796  *
797  * The asynchronous methods have a default fallback that uses threads to
798  * implement asynchronicity, so they are optional for inheriting classes.
799  * However, if you override one, you must override all.
800  **/
801 void
802 g_input_stream_skip_async (GInputStream        *stream,
803                            gsize                count,
804                            int                  io_priority,
805                            GCancellable        *cancellable,
806                            GAsyncReadyCallback  callback,
807                            gpointer             user_data)
808 {
809   GInputStreamClass *class;
810   GError *error = NULL;
811
812   g_return_if_fail (G_IS_INPUT_STREAM (stream));
813
814   if (count == 0)
815     {
816       GTask *task;
817
818       task = g_task_new (stream, cancellable, callback, user_data);
819       g_task_set_source_tag (task, g_input_stream_skip_async);
820       g_task_return_int (task, 0);
821       g_object_unref (task);
822       return;
823     }
824   
825   if (((gssize) count) < 0)
826     {
827       g_task_report_new_error (stream, callback, user_data,
828                                g_input_stream_skip_async,
829                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
830                                _("Too large count value passed to %s"),
831                                G_STRFUNC);
832       return;
833     }
834
835   if (!g_input_stream_set_pending (stream, &error))
836     {
837       g_task_report_error (stream, callback, user_data,
838                            g_input_stream_skip_async,
839                            error);
840       return;
841     }
842
843   class = G_INPUT_STREAM_GET_CLASS (stream);
844   stream->priv->outstanding_callback = callback;
845   g_object_ref (stream);
846   class->skip_async (stream, count, io_priority, cancellable,
847                      async_ready_callback_wrapper, user_data);
848 }
849
850 /**
851  * g_input_stream_skip_finish:
852  * @stream: a #GInputStream.
853  * @result: a #GAsyncResult.
854  * @error: a #GError location to store the error occurring, or %NULL to 
855  * ignore.
856  * 
857  * Finishes a stream skip operation.
858  * 
859  * Returns: the size of the bytes skipped, or %-1 on error.
860  **/
861 gssize
862 g_input_stream_skip_finish (GInputStream  *stream,
863                             GAsyncResult  *result,
864                             GError       **error)
865 {
866   GInputStreamClass *class;
867
868   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
869   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
870
871   if (g_async_result_legacy_propagate_error (result, error))
872     return -1;
873   else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
874     return g_task_propagate_int (G_TASK (result), error);
875
876   class = G_INPUT_STREAM_GET_CLASS (stream);
877   return class->skip_finish (stream, result, error);
878 }
879
880 /**
881  * g_input_stream_close_async:
882  * @stream: A #GInputStream.
883  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
884  * of the request. 
885  * @cancellable: (allow-none): optional cancellable object
886  * @callback: (scope async): callback to call when the request is satisfied
887  * @user_data: (closure): the data to pass to callback function
888  *
889  * Requests an asynchronous closes of the stream, releasing resources related to it.
890  * When the operation is finished @callback will be called. 
891  * You can then call g_input_stream_close_finish() to get the result of the 
892  * operation.
893  *
894  * For behaviour details see g_input_stream_close().
895  *
896  * The asyncronous methods have a default fallback that uses threads to implement
897  * asynchronicity, so they are optional for inheriting classes. However, if you
898  * override one you must override all.
899  **/
900 void
901 g_input_stream_close_async (GInputStream        *stream,
902                             int                  io_priority,
903                             GCancellable        *cancellable,
904                             GAsyncReadyCallback  callback,
905                             gpointer             user_data)
906 {
907   GInputStreamClass *class;
908   GError *error = NULL;
909
910   g_return_if_fail (G_IS_INPUT_STREAM (stream));
911
912   if (stream->priv->closed)
913     {
914       GTask *task;
915
916       task = g_task_new (stream, cancellable, callback, user_data);
917       g_task_set_source_tag (task, g_input_stream_close_async);
918       g_task_return_boolean (task, TRUE);
919       g_object_unref (task);
920       return;
921     }
922
923   if (!g_input_stream_set_pending (stream, &error))
924     {
925       g_task_report_error (stream, callback, user_data,
926                            g_input_stream_close_async,
927                            error);
928       return;
929     }
930   
931   class = G_INPUT_STREAM_GET_CLASS (stream);
932   stream->priv->outstanding_callback = callback;
933   g_object_ref (stream);
934   class->close_async (stream, io_priority, cancellable,
935                       async_ready_close_callback_wrapper, user_data);
936 }
937
938 /**
939  * g_input_stream_close_finish:
940  * @stream: a #GInputStream.
941  * @result: a #GAsyncResult.
942  * @error: a #GError location to store the error occurring, or %NULL to 
943  * ignore.
944  * 
945  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
946  * 
947  * Returns: %TRUE if the stream was closed successfully.
948  **/
949 gboolean
950 g_input_stream_close_finish (GInputStream  *stream,
951                              GAsyncResult  *result,
952                              GError       **error)
953 {
954   GInputStreamClass *class;
955
956   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
957   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
958
959   if (g_async_result_legacy_propagate_error (result, error))
960     return FALSE;
961   else if (g_async_result_is_tagged (result, g_input_stream_close_async))
962     return g_task_propagate_boolean (G_TASK (result), error);
963
964   class = G_INPUT_STREAM_GET_CLASS (stream);
965   return class->close_finish (stream, result, error);
966 }
967
968 /**
969  * g_input_stream_is_closed:
970  * @stream: input stream.
971  * 
972  * Checks if an input stream is closed.
973  * 
974  * Returns: %TRUE if the stream is closed.
975  **/
976 gboolean
977 g_input_stream_is_closed (GInputStream *stream)
978 {
979   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
980   
981   return stream->priv->closed;
982 }
983  
984 /**
985  * g_input_stream_has_pending:
986  * @stream: input stream.
987  * 
988  * Checks if an input stream has pending actions.
989  * 
990  * Returns: %TRUE if @stream has pending actions.
991  **/  
992 gboolean
993 g_input_stream_has_pending (GInputStream *stream)
994 {
995   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
996   
997   return stream->priv->pending;
998 }
999
1000 /**
1001  * g_input_stream_set_pending:
1002  * @stream: input stream
1003  * @error: a #GError location to store the error occurring, or %NULL to 
1004  * ignore.
1005  * 
1006  * Sets @stream to have actions pending. If the pending flag is
1007  * already set or @stream is closed, it will return %FALSE and set
1008  * @error.
1009  *
1010  * Return value: %TRUE if pending was previously unset and is now set.
1011  **/
1012 gboolean
1013 g_input_stream_set_pending (GInputStream *stream, GError **error)
1014 {
1015   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1016   
1017   if (stream->priv->closed)
1018     {
1019       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1020                            _("Stream is already closed"));
1021       return FALSE;
1022     }
1023   
1024   if (stream->priv->pending)
1025     {
1026       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1027                 /* Translators: This is an error you get if there is already an
1028                  * operation running against this stream when you try to start
1029                  * one */
1030                  _("Stream has outstanding operation"));
1031       return FALSE;
1032     }
1033   
1034   stream->priv->pending = TRUE;
1035   return TRUE;
1036 }
1037
1038 /**
1039  * g_input_stream_clear_pending:
1040  * @stream: input stream
1041  * 
1042  * Clears the pending flag on @stream.
1043  **/
1044 void
1045 g_input_stream_clear_pending (GInputStream *stream)
1046 {
1047   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1048   
1049   stream->priv->pending = FALSE;
1050 }
1051
1052 /********************************************
1053  *   Default implementation of async ops    *
1054  ********************************************/
1055
1056 typedef struct {
1057   void   *buffer;
1058   gsize   count;
1059 } ReadData;
1060
1061 static void
1062 free_read_data (ReadData *op)
1063 {
1064   g_slice_free (ReadData, op);
1065 }
1066
1067 static void
1068 read_async_thread (GTask        *task,
1069                    gpointer      source_object,
1070                    gpointer      task_data,
1071                    GCancellable *cancellable)
1072 {
1073   GInputStream *stream = source_object;
1074   ReadData *op = task_data;
1075   GInputStreamClass *class;
1076   GError *error = NULL;
1077   gssize nread;
1078  
1079   class = G_INPUT_STREAM_GET_CLASS (stream);
1080
1081   nread = class->read_fn (stream,
1082                           op->buffer, op->count,
1083                           g_task_get_cancellable (task),
1084                           &error);
1085   if (nread == -1)
1086     g_task_return_error (task, error);
1087   else
1088     g_task_return_int (task, nread);
1089 }
1090
1091 static void read_async_pollable (GPollableInputStream *stream,
1092                                  GTask                *task);
1093
1094 static gboolean
1095 read_async_pollable_ready (GPollableInputStream *stream,
1096                            gpointer              user_data)
1097 {
1098   GTask *task = user_data;
1099
1100   read_async_pollable (stream, task);
1101   return FALSE;
1102 }
1103
1104 static void
1105 read_async_pollable (GPollableInputStream *stream,
1106                      GTask                *task)
1107 {
1108   ReadData *op = g_task_get_task_data (task);
1109   GError *error = NULL;
1110   gssize nread;
1111
1112   if (g_task_return_error_if_cancelled (task))
1113     return;
1114
1115   nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1116     read_nonblocking (stream, op->buffer, op->count, &error);
1117
1118   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1119     {
1120       GSource *source;
1121
1122       g_error_free (error);
1123
1124       source = g_pollable_input_stream_create_source (stream,
1125                                                       g_task_get_cancellable (task));
1126       g_task_attach_source (task, source,
1127                             (GSourceFunc) read_async_pollable_ready);
1128       g_source_unref (source);
1129       return;
1130     }
1131
1132   if (nread == -1)
1133     g_task_return_error (task, error);
1134   else
1135     g_task_return_int (task, nread);
1136   /* g_input_stream_real_read_async() unrefs task */
1137 }
1138
1139 #define CAN_DO_NONBLOCKING_READS(stream) \
1140   (G_IS_POLLABLE_INPUT_STREAM (stream) && \
1141    g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream)))
1142
1143
1144 static void
1145 g_input_stream_real_read_async (GInputStream        *stream,
1146                                 void                *buffer,
1147                                 gsize                count,
1148                                 int                  io_priority,
1149                                 GCancellable        *cancellable,
1150                                 GAsyncReadyCallback  callback,
1151                                 gpointer             user_data)
1152 {
1153   GTask *task;
1154   ReadData *op;
1155   
1156   op = g_slice_new0 (ReadData);
1157   task = g_task_new (stream, cancellable, callback, user_data);
1158   g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1159   g_task_set_priority (task, io_priority);
1160   op->buffer = buffer;
1161   op->count = count;
1162
1163   if (CAN_DO_NONBLOCKING_READS (stream))
1164     read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1165   else
1166     g_task_run_in_thread (task, read_async_thread);
1167   g_object_unref (task);
1168 }
1169
1170 static gssize
1171 g_input_stream_real_read_finish (GInputStream  *stream,
1172                                  GAsyncResult  *result,
1173                                  GError       **error)
1174 {
1175   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1176
1177   return g_task_propagate_int (G_TASK (result), error);
1178 }
1179
1180
1181 static void
1182 skip_async_thread (GTask        *task,
1183                    gpointer      source_object,
1184                    gpointer      task_data,
1185                    GCancellable *cancellable)
1186 {
1187   GInputStream *stream = source_object;
1188   gsize count = GPOINTER_TO_SIZE (task_data);
1189   GInputStreamClass *class;
1190   GError *error = NULL;
1191   gssize ret;
1192
1193   class = G_INPUT_STREAM_GET_CLASS (stream);
1194   ret = class->skip (stream, count,
1195                      g_task_get_cancellable (task),
1196                      &error);
1197   if (ret == -1)
1198     g_task_return_error (task, error);
1199   else
1200     g_task_return_int (task, ret);
1201 }
1202
1203 typedef struct {
1204   char buffer[8192];
1205   gsize count;
1206   gsize count_skipped;
1207 } SkipFallbackAsyncData;
1208
1209 static void
1210 skip_callback_wrapper (GObject      *source_object,
1211                        GAsyncResult *res,
1212                        gpointer      user_data)
1213 {
1214   GInputStreamClass *class;
1215   GTask *task = user_data;
1216   SkipFallbackAsyncData *data = g_task_get_task_data (task);
1217   GError *error = NULL;
1218   gssize ret;
1219
1220   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1221
1222   if (ret > 0)
1223     {
1224       data->count -= ret;
1225       data->count_skipped += ret;
1226
1227       if (data->count > 0)
1228         {
1229           class = G_INPUT_STREAM_GET_CLASS (source_object);
1230           class->read_async (G_INPUT_STREAM (source_object),
1231                              data->buffer, MIN (8192, data->count),
1232                              g_task_get_priority (task),
1233                              g_task_get_cancellable (task),
1234                              skip_callback_wrapper, data);
1235           return;
1236         }
1237     }
1238
1239   if (ret == -1 &&
1240       g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1241       data->count_skipped)
1242     {
1243       /* No error, return partial read */
1244       g_clear_error (&error);
1245     }
1246
1247   if (error)
1248     g_task_return_error (task, error);
1249   else
1250     g_task_return_int (task, data->count_skipped);
1251   g_object_unref (task);
1252  }
1253
1254 static void
1255 g_input_stream_real_skip_async (GInputStream        *stream,
1256                                 gsize                count,
1257                                 int                  io_priority,
1258                                 GCancellable        *cancellable,
1259                                 GAsyncReadyCallback  callback,
1260                                 gpointer             user_data)
1261 {
1262   GInputStreamClass *class;
1263   SkipFallbackAsyncData *data;
1264   GTask *task;
1265
1266   class = G_INPUT_STREAM_GET_CLASS (stream);
1267
1268   task = g_task_new (stream, cancellable, callback, user_data);
1269   g_task_set_priority (task, io_priority);
1270
1271   if (class->read_async == g_input_stream_real_read_async &&
1272       !CAN_DO_NONBLOCKING_READS (stream))
1273     {
1274       /* Read is thread-using async fallback.
1275        * Make skip use threads too, so that we can use a possible sync skip
1276        * implementation. */
1277       g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1278
1279       g_task_run_in_thread (task, skip_async_thread);
1280       g_object_unref (task);
1281     }
1282   else
1283     {
1284       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1285       
1286       /* There is a custom async read function, lets use that. */
1287       data = g_new (SkipFallbackAsyncData, 1);
1288       data->count = count;
1289       data->count_skipped = 0;
1290       g_task_set_task_data (task, data, g_free);
1291       g_task_set_check_cancellable (task, FALSE);
1292       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1293                          skip_callback_wrapper, data);
1294     }
1295
1296 }
1297
1298 static gssize
1299 g_input_stream_real_skip_finish (GInputStream  *stream,
1300                                  GAsyncResult  *result,
1301                                  GError       **error)
1302 {
1303   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1304
1305   return g_task_propagate_int (G_TASK (result), error);
1306 }
1307
1308 static void
1309 close_async_thread (GTask        *task,
1310                     gpointer      source_object,
1311                     gpointer      task_data,
1312                     GCancellable *cancellable)
1313 {
1314   GInputStream *stream = source_object;
1315   GInputStreamClass *class;
1316   GError *error = NULL;
1317   gboolean result;
1318
1319   class = G_INPUT_STREAM_GET_CLASS (stream);
1320   if (class->close_fn)
1321     {
1322       result = class->close_fn (stream,
1323                                 g_task_get_cancellable (task),
1324                                 &error);
1325       if (!result)
1326         {
1327           g_task_return_error (task, error);
1328           return;
1329         }
1330     }
1331
1332   g_task_return_boolean (task, TRUE);
1333 }
1334
1335 static void
1336 g_input_stream_real_close_async (GInputStream        *stream,
1337                                  int                  io_priority,
1338                                  GCancellable        *cancellable,
1339                                  GAsyncReadyCallback  callback,
1340                                  gpointer             user_data)
1341 {
1342   GTask *task;
1343
1344   task = g_task_new (stream, cancellable, callback, user_data);
1345   g_task_set_check_cancellable (task, FALSE);
1346   g_task_set_priority (task, io_priority);
1347   
1348   g_task_run_in_thread (task, close_async_thread);
1349   g_object_unref (task);
1350 }
1351
1352 static gboolean
1353 g_input_stream_real_close_finish (GInputStream  *stream,
1354                                   GAsyncResult  *result,
1355                                   GError       **error)
1356 {
1357   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1358
1359   return g_task_propagate_boolean (G_TASK (result), error);
1360 }