gio: callback_data is the task not the task data.
[platform/upstream/glib.git] / gio / ginputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include <glib.h>
25 #include "glibintl.h"
26
27 #include "ginputstream.h"
28 #include "gseekable.h"
29 #include "gcancellable.h"
30 #include "gasyncresult.h"
31 #include "gioerror.h"
32 #include "gpollableinputstream.h"
33
34 /**
35  * SECTION:ginputstream
36  * @short_description: Base class for implementing streaming input
37  * @include: gio/gio.h
38  *
39  * #GInputStream has functions to read from a stream (g_input_stream_read()),
40  * to close a stream (g_input_stream_close()) and to skip some content
41  * (g_input_stream_skip()). 
42  *
43  * To copy the content of an input stream to an output stream without 
44  * manually handling the reads and writes, use g_output_stream_splice(). 
45  *
46  * All of these functions have async variants too.
47  **/
48
49 G_DEFINE_ABSTRACT_TYPE (GInputStream, g_input_stream, G_TYPE_OBJECT);
50
51 struct _GInputStreamPrivate {
52   guint closed : 1;
53   guint pending : 1;
54   GAsyncReadyCallback outstanding_callback;
55 };
56
57 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
58                                                   gsize                 count,
59                                                   GCancellable         *cancellable,
60                                                   GError              **error);
61 static void     g_input_stream_real_read_async   (GInputStream         *stream,
62                                                   void                 *buffer,
63                                                   gsize                 count,
64                                                   int                   io_priority,
65                                                   GCancellable         *cancellable,
66                                                   GAsyncReadyCallback   callback,
67                                                   gpointer              user_data);
68 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
69                                                   GAsyncResult         *result,
70                                                   GError              **error);
71 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
72                                                   gsize                 count,
73                                                   int                   io_priority,
74                                                   GCancellable         *cancellable,
75                                                   GAsyncReadyCallback   callback,
76                                                   gpointer              data);
77 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
78                                                   GAsyncResult         *result,
79                                                   GError              **error);
80 static void     g_input_stream_real_close_async  (GInputStream         *stream,
81                                                   int                   io_priority,
82                                                   GCancellable         *cancellable,
83                                                   GAsyncReadyCallback   callback,
84                                                   gpointer              data);
85 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
86                                                   GAsyncResult         *result,
87                                                   GError              **error);
88
89 static void
90 g_input_stream_finalize (GObject *object)
91 {
92   G_OBJECT_CLASS (g_input_stream_parent_class)->finalize (object);
93 }
94
95 static void
96 g_input_stream_dispose (GObject *object)
97 {
98   GInputStream *stream;
99
100   stream = G_INPUT_STREAM (object);
101   
102   if (!stream->priv->closed)
103     g_input_stream_close (stream, NULL, NULL);
104
105   G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
106 }
107
108
109 static void
110 g_input_stream_class_init (GInputStreamClass *klass)
111 {
112   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
113   
114   g_type_class_add_private (klass, sizeof (GInputStreamPrivate));
115   
116   gobject_class->finalize = g_input_stream_finalize;
117   gobject_class->dispose = g_input_stream_dispose;
118   
119   klass->skip = g_input_stream_real_skip;
120   klass->read_async = g_input_stream_real_read_async;
121   klass->read_finish = g_input_stream_real_read_finish;
122   klass->skip_async = g_input_stream_real_skip_async;
123   klass->skip_finish = g_input_stream_real_skip_finish;
124   klass->close_async = g_input_stream_real_close_async;
125   klass->close_finish = g_input_stream_real_close_finish;
126 }
127
128 static void
129 g_input_stream_init (GInputStream *stream)
130 {
131   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
132                                               G_TYPE_INPUT_STREAM,
133                                               GInputStreamPrivate);
134 }
135
136 /**
137  * g_input_stream_read:
138  * @stream: a #GInputStream.
139  * @buffer: (array length=count) (element-type guint8): a buffer to
140  *     read data into (which should be at least count bytes long).
141  * @count: the number of bytes that will be read from the stream
142  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
143  * @error: location to store the error occurring, or %NULL to ignore
144  *
145  * Tries to read @count bytes from the stream into the buffer starting at
146  * @buffer. Will block during this read.
147  * 
148  * If count is zero returns zero and does nothing. A value of @count
149  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
150  *
151  * On success, the number of bytes read into the buffer is returned.
152  * It is not an error if this is not the same as the requested size, as it
153  * can happen e.g. near the end of a file. Zero is returned on end of file
154  * (or if @count is zero),  but never otherwise.
155  *
156  * If @cancellable is not %NULL, then the operation can be cancelled by
157  * triggering the cancellable object from another thread. If the operation
158  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
159  * operation was partially finished when the operation was cancelled the
160  * partial result will be returned, without an error.
161  *
162  * On error -1 is returned and @error is set accordingly.
163  * 
164  * Return value: Number of bytes read, or -1 on error, or 0 on end of file.
165  **/
166 gssize
167 g_input_stream_read  (GInputStream  *stream,
168                       void          *buffer,
169                       gsize          count,
170                       GCancellable  *cancellable,
171                       GError       **error)
172 {
173   GInputStreamClass *class;
174   gssize res;
175
176   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
177   g_return_val_if_fail (buffer != NULL, 0);
178
179   if (count == 0)
180     return 0;
181   
182   if (((gssize) count) < 0)
183     {
184       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
185                    _("Too large count value passed to %s"), G_STRFUNC);
186       return -1;
187     }
188
189   class = G_INPUT_STREAM_GET_CLASS (stream);
190
191   if (class->read_fn == NULL) 
192     {
193       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
194                            _("Input stream doesn't implement read"));
195       return -1;
196     }
197
198   if (!g_input_stream_set_pending (stream, error))
199     return -1;
200
201   if (cancellable)
202     g_cancellable_push_current (cancellable);
203   
204   res = class->read_fn (stream, buffer, count, cancellable, error);
205
206   if (cancellable)
207     g_cancellable_pop_current (cancellable);
208   
209   g_input_stream_clear_pending (stream);
210
211   return res;
212 }
213
214 /**
215  * g_input_stream_read_all:
216  * @stream: a #GInputStream.
217  * @buffer: (array length=count) (element-type guint8): a buffer to
218  *     read data into (which should be at least count bytes long).
219  * @count: the number of bytes that will be read from the stream
220  * @bytes_read: (out): location to store the number of bytes that was read from the stream
221  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
222  * @error: location to store the error occurring, or %NULL to ignore
223  *
224  * Tries to read @count bytes from the stream into the buffer starting at
225  * @buffer. Will block during this read.
226  *
227  * This function is similar to g_input_stream_read(), except it tries to
228  * read as many bytes as requested, only stopping on an error or end of stream.
229  *
230  * On a successful read of @count bytes, or if we reached the end of the
231  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
232  * read into @buffer.
233  * 
234  * If there is an error during the operation %FALSE is returned and @error
235  * is set to indicate the error status, @bytes_read is updated to contain
236  * the number of bytes read into @buffer before the error occurred.
237  *
238  * Return value: %TRUE on success, %FALSE if there was an error
239  **/
240 gboolean
241 g_input_stream_read_all (GInputStream  *stream,
242                          void          *buffer,
243                          gsize          count,
244                          gsize         *bytes_read,
245                          GCancellable  *cancellable,
246                          GError       **error)
247 {
248   gsize _bytes_read;
249   gssize res;
250
251   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
252   g_return_val_if_fail (buffer != NULL, FALSE);
253
254   _bytes_read = 0;
255   while (_bytes_read < count)
256     {
257       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
258                                  cancellable, error);
259       if (res == -1)
260         {
261           if (bytes_read)
262             *bytes_read = _bytes_read;
263           return FALSE;
264         }
265       
266       if (res == 0)
267         break;
268
269       _bytes_read += res;
270     }
271
272   if (bytes_read)
273     *bytes_read = _bytes_read;
274   return TRUE;
275 }
276
277 /**
278  * g_input_stream_read_bytes:
279  * @stream: a #GInputStream.
280  * @count: maximum number of bytes that will be read from the stream. Common
281  * values include 4096 and 8192.
282  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
283  * @error: location to store the error occurring, or %NULL to ignore
284  *
285  * Like g_input_stream_read(), this tries to read @count bytes from
286  * the stream in a blocking fashion. However, rather than reading into
287  * a user-supplied buffer, this will create a new #GBytes containing
288  * the data that was read. This may be easier to use from language
289  * bindings.
290  *
291  * If count is zero, returns a zero-length #GBytes and does nothing. A
292  * value of @count larger than %G_MAXSSIZE will cause a
293  * %G_IO_ERROR_INVALID_ARGUMENT error.
294  *
295  * On success, a new #GBytes is returned. It is not an error if the
296  * size of this object is not the same as the requested size, as it
297  * can happen e.g. near the end of a file. A zero-length #GBytes is
298  * returned on end of file (or if @count is zero), but never
299  * otherwise.
300  *
301  * If @cancellable is not %NULL, then the operation can be cancelled by
302  * triggering the cancellable object from another thread. If the operation
303  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
304  * operation was partially finished when the operation was cancelled the
305  * partial result will be returned, without an error.
306  *
307  * On error %NULL is returned and @error is set accordingly.
308  *
309  * Return value: a new #GBytes, or %NULL on error
310  **/
311 GBytes *
312 g_input_stream_read_bytes (GInputStream  *stream,
313                            gsize          count,
314                            GCancellable  *cancellable,
315                            GError       **error)
316 {
317   guchar *buf;
318   gssize nread;
319
320   buf = g_malloc (count);
321   nread = g_input_stream_read (stream, buf, count, cancellable, error);
322   if (nread == -1)
323     {
324       g_free (buf);
325       return NULL;
326     }
327   else if (nread == 0)
328     {
329       g_free (buf);
330       return g_bytes_new_static ("", 0);
331     }
332   else
333     return g_bytes_new_take (buf, nread);
334 }
335
336 /**
337  * g_input_stream_skip:
338  * @stream: a #GInputStream.
339  * @count: the number of bytes that will be skipped from the stream
340  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
341  * @error: location to store the error occurring, or %NULL to ignore
342  *
343  * Tries to skip @count bytes from the stream. Will block during the operation.
344  *
345  * This is identical to g_input_stream_read(), from a behaviour standpoint,
346  * but the bytes that are skipped are not returned to the user. Some
347  * streams have an implementation that is more efficient than reading the data.
348  *
349  * This function is optional for inherited classes, as the default implementation
350  * emulates it using read.
351  *
352  * If @cancellable is not %NULL, then the operation can be cancelled by
353  * triggering the cancellable object from another thread. If the operation
354  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
355  * operation was partially finished when the operation was cancelled the
356  * partial result will be returned, without an error.
357  *
358  * Return value: Number of bytes skipped, or -1 on error
359  **/
360 gssize
361 g_input_stream_skip (GInputStream  *stream,
362                      gsize          count,
363                      GCancellable  *cancellable,
364                      GError       **error)
365 {
366   GInputStreamClass *class;
367   gssize res;
368
369   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
370
371   if (count == 0)
372     return 0;
373
374   if (((gssize) count) < 0)
375     {
376       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
377                    _("Too large count value passed to %s"), G_STRFUNC);
378       return -1;
379     }
380   
381   class = G_INPUT_STREAM_GET_CLASS (stream);
382
383   if (!g_input_stream_set_pending (stream, error))
384     return -1;
385
386   if (cancellable)
387     g_cancellable_push_current (cancellable);
388   
389   res = class->skip (stream, count, cancellable, error);
390
391   if (cancellable)
392     g_cancellable_pop_current (cancellable);
393   
394   g_input_stream_clear_pending (stream);
395
396   return res;
397 }
398
399 static gssize
400 g_input_stream_real_skip (GInputStream  *stream,
401                           gsize          count,
402                           GCancellable  *cancellable,
403                           GError       **error)
404 {
405   GInputStreamClass *class;
406   gssize ret, read_bytes;
407   char buffer[8192];
408   GError *my_error;
409
410   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
411     {
412       if (g_seekable_seek (G_SEEKABLE (stream),
413                            count,
414                            G_SEEK_CUR,
415                            cancellable,
416                            NULL))
417         return count;
418     }
419
420   /* If not seekable, or seek failed, fall back to reading data: */
421
422   class = G_INPUT_STREAM_GET_CLASS (stream);
423
424   read_bytes = 0;
425   while (1)
426     {
427       my_error = NULL;
428
429       ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
430                             cancellable, &my_error);
431       if (ret == -1)
432         {
433           if (read_bytes > 0 &&
434               my_error->domain == G_IO_ERROR &&
435               my_error->code == G_IO_ERROR_CANCELLED)
436             {
437               g_error_free (my_error);
438               return read_bytes;
439             }
440
441           g_propagate_error (error, my_error);
442           return -1;
443         }
444
445       count -= ret;
446       read_bytes += ret;
447
448       if (ret == 0 || count == 0)
449         return read_bytes;
450     }
451 }
452
453 /**
454  * g_input_stream_close:
455  * @stream: A #GInputStream.
456  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
457  * @error: location to store the error occurring, or %NULL to ignore
458  *
459  * Closes the stream, releasing resources related to it.
460  *
461  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
462  * Closing a stream multiple times will not return an error.
463  *
464  * Streams will be automatically closed when the last reference
465  * is dropped, but you might want to call this function to make sure 
466  * resources are released as early as possible.
467  *
468  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
469  * open after the stream is closed. See the documentation for the individual
470  * stream for details.
471  *
472  * On failure the first error that happened will be reported, but the close
473  * operation will finish as much as possible. A stream that failed to
474  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
475  * is important to check and report the error to the user.
476  *
477  * If @cancellable is not %NULL, then the operation can be cancelled by
478  * triggering the cancellable object from another thread. If the operation
479  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
480  * Cancelling a close will still leave the stream closed, but some streams
481  * can use a faster close that doesn't block to e.g. check errors. 
482  *
483  * Return value: %TRUE on success, %FALSE on failure
484  **/
485 gboolean
486 g_input_stream_close (GInputStream  *stream,
487                       GCancellable  *cancellable,
488                       GError       **error)
489 {
490   GInputStreamClass *class;
491   gboolean res;
492
493   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
494
495   class = G_INPUT_STREAM_GET_CLASS (stream);
496
497   if (stream->priv->closed)
498     return TRUE;
499
500   res = TRUE;
501
502   if (!g_input_stream_set_pending (stream, error))
503     return FALSE;
504
505   if (cancellable)
506     g_cancellable_push_current (cancellable);
507
508   if (class->close_fn)
509     res = class->close_fn (stream, cancellable, error);
510
511   if (cancellable)
512     g_cancellable_pop_current (cancellable);
513
514   g_input_stream_clear_pending (stream);
515   
516   stream->priv->closed = TRUE;
517   
518   return res;
519 }
520
521 static void
522 async_ready_callback_wrapper (GObject      *source_object,
523                               GAsyncResult *res,
524                               gpointer      user_data)
525 {
526   GInputStream *stream = G_INPUT_STREAM (source_object);
527
528   g_input_stream_clear_pending (stream);
529   if (stream->priv->outstanding_callback)
530     (*stream->priv->outstanding_callback) (source_object, res, user_data);
531   g_object_unref (stream);
532 }
533
534 static void
535 async_ready_close_callback_wrapper (GObject      *source_object,
536                                     GAsyncResult *res,
537                                     gpointer      user_data)
538 {
539   GInputStream *stream = G_INPUT_STREAM (source_object);
540
541   g_input_stream_clear_pending (stream);
542   stream->priv->closed = TRUE;
543   if (stream->priv->outstanding_callback)
544     (*stream->priv->outstanding_callback) (source_object, res, user_data);
545   g_object_unref (stream);
546 }
547
548 /**
549  * g_input_stream_read_async:
550  * @stream: A #GInputStream.
551  * @buffer: (array length=count) (element-type guint8): a buffer to
552  *     read data into (which should be at least count bytes long).
553  * @count: the number of bytes that will be read from the stream
554  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
555  * of the request. 
556  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
557  * @callback: (scope async): callback to call when the request is satisfied
558  * @user_data: (closure): the data to pass to callback function
559  *
560  * Request an asynchronous read of @count bytes from the stream into the buffer
561  * starting at @buffer. When the operation is finished @callback will be called. 
562  * You can then call g_input_stream_read_finish() to get the result of the 
563  * operation.
564  *
565  * During an async request no other sync and async calls are allowed on @stream, and will
566  * result in %G_IO_ERROR_PENDING errors. 
567  *
568  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
569  *
570  * On success, the number of bytes read into the buffer will be passed to the
571  * callback. It is not an error if this is not the same as the requested size, as it
572  * can happen e.g. near the end of a file, but generally we try to read
573  * as many bytes as requested. Zero is returned on end of file
574  * (or if @count is zero),  but never otherwise.
575  *
576  * Any outstanding i/o request with higher priority (lower numerical value) will
577  * be executed before an outstanding request with lower priority. Default
578  * priority is %G_PRIORITY_DEFAULT.
579  *
580  * The asyncronous methods have a default fallback that uses threads to implement
581  * asynchronicity, so they are optional for inheriting classes. However, if you
582  * override one you must override all.
583  **/
584 void
585 g_input_stream_read_async (GInputStream        *stream,
586                            void                *buffer,
587                            gsize                count,
588                            int                  io_priority,
589                            GCancellable        *cancellable,
590                            GAsyncReadyCallback  callback,
591                            gpointer             user_data)
592 {
593   GInputStreamClass *class;
594   GError *error = NULL;
595
596   g_return_if_fail (G_IS_INPUT_STREAM (stream));
597   g_return_if_fail (buffer != NULL);
598
599   if (count == 0)
600     {
601       GTask *task;
602
603       task = g_task_new (stream, cancellable, callback, user_data);
604       g_task_set_source_tag (task, g_input_stream_read_async);
605       g_task_return_int (task, 0);
606       g_object_unref (task);
607       return;
608     }
609   
610   if (((gssize) count) < 0)
611     {
612       g_task_report_new_error (stream, callback, user_data,
613                                g_input_stream_read_async,
614                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
615                                _("Too large count value passed to %s"),
616                                G_STRFUNC);
617       return;
618     }
619
620   if (!g_input_stream_set_pending (stream, &error))
621     {
622       g_task_report_error (stream, callback, user_data,
623                            g_input_stream_read_async,
624                            error);
625       return;
626     }
627
628   class = G_INPUT_STREAM_GET_CLASS (stream);
629   stream->priv->outstanding_callback = callback;
630   g_object_ref (stream);
631   class->read_async (stream, buffer, count, io_priority, cancellable,
632                      async_ready_callback_wrapper, user_data);
633 }
634
635 /**
636  * g_input_stream_read_finish:
637  * @stream: a #GInputStream.
638  * @result: a #GAsyncResult.
639  * @error: a #GError location to store the error occurring, or %NULL to 
640  * ignore.
641  * 
642  * Finishes an asynchronous stream read operation. 
643  * 
644  * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
645  **/
646 gssize
647 g_input_stream_read_finish (GInputStream  *stream,
648                             GAsyncResult  *result,
649                             GError       **error)
650 {
651   GInputStreamClass *class;
652   
653   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
654   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
655
656   if (g_async_result_legacy_propagate_error (result, error))
657     return -1;
658   else if (g_async_result_is_tagged (result, g_input_stream_read_async))
659     return g_task_propagate_int (G_TASK (result), error);
660
661   class = G_INPUT_STREAM_GET_CLASS (stream);
662   return class->read_finish (stream, result, error);
663 }
664
665 static void
666 read_bytes_callback (GObject      *stream,
667                      GAsyncResult *result,
668                      gpointer      user_data)
669 {
670   GTask *task = user_data;
671   guchar *buf = g_task_get_task_data (task);
672   GError *error = NULL;
673   gssize nread;
674   GBytes *bytes = NULL;
675
676   nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
677                                       result, &error);
678   if (nread == -1)
679     {
680       g_free (buf);
681       g_task_return_error (task, error);
682     }
683   else if (nread == 0)
684     {
685       g_free (buf);
686       bytes = g_bytes_new_static ("", 0);
687     }
688   else
689     bytes = g_bytes_new_take (buf, nread);
690
691   if (bytes)
692     g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
693
694   g_object_unref (task);
695 }
696
697 /**
698  * g_input_stream_read_bytes_async:
699  * @stream: A #GInputStream.
700  * @count: the number of bytes that will be read from the stream
701  * @io_priority: the <link linkend="io-priority">I/O priority</link>
702  *   of the request.
703  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
704  * @callback: (scope async): callback to call when the request is satisfied
705  * @user_data: (closure): the data to pass to callback function
706  *
707  * Request an asynchronous read of @count bytes from the stream into a
708  * new #GBytes. When the operation is finished @callback will be
709  * called. You can then call g_input_stream_read_bytes_finish() to get the
710  * result of the operation.
711  *
712  * During an async request no other sync and async calls are allowed
713  * on @stream, and will result in %G_IO_ERROR_PENDING errors.
714  *
715  * A value of @count larger than %G_MAXSSIZE will cause a
716  * %G_IO_ERROR_INVALID_ARGUMENT error.
717  *
718  * On success, the new #GBytes will be passed to the callback. It is
719  * not an error if this is smaller than the requested size, as it can
720  * happen e.g. near the end of a file, but generally we try to read as
721  * many bytes as requested. Zero is returned on end of file (or if
722  * @count is zero), but never otherwise.
723  *
724  * Any outstanding I/O request with higher priority (lower numerical
725  * value) will be executed before an outstanding request with lower
726  * priority. Default priority is %G_PRIORITY_DEFAULT.
727  **/
728 void
729 g_input_stream_read_bytes_async (GInputStream          *stream,
730                                  gsize                  count,
731                                  int                    io_priority,
732                                  GCancellable          *cancellable,
733                                  GAsyncReadyCallback    callback,
734                                  gpointer               user_data)
735 {
736   GTask *task;
737   guchar *buf;
738
739   task = g_task_new (stream, cancellable, callback, user_data);
740   buf = g_malloc (count);
741   g_task_set_task_data (task, buf, NULL);
742
743   g_input_stream_read_async (stream, buf, count,
744                              io_priority, cancellable,
745                              read_bytes_callback, task);
746 }
747
748 /**
749  * g_input_stream_read_bytes_finish:
750  * @stream: a #GInputStream.
751  * @result: a #GAsyncResult.
752  * @error: a #GError location to store the error occurring, or %NULL to
753  *   ignore.
754  *
755  * Finishes an asynchronous stream read-into-#GBytes operation.
756  *
757  * Returns: the newly-allocated #GBytes, or %NULL on error
758  **/
759 GBytes *
760 g_input_stream_read_bytes_finish (GInputStream  *stream,
761                                   GAsyncResult  *result,
762                                   GError       **error)
763 {
764   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
765   g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
766
767   return g_task_propagate_pointer (G_TASK (result), error);
768 }
769
770 /**
771  * g_input_stream_skip_async:
772  * @stream: A #GInputStream.
773  * @count: the number of bytes that will be skipped from the stream
774  * @io_priority: the <link linkend="io-priority">I/O priority</link>
775  * of the request.
776  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
777  * @callback: (scope async): callback to call when the request is satisfied
778  * @user_data: (closure): the data to pass to callback function
779  *
780  * Request an asynchronous skip of @count bytes from the stream.
781  * When the operation is finished @callback will be called.
782  * You can then call g_input_stream_skip_finish() to get the result
783  * of the operation.
784  *
785  * During an async request no other sync and async calls are allowed,
786  * and will result in %G_IO_ERROR_PENDING errors.
787  *
788  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
789  *
790  * On success, the number of bytes skipped will be passed to the callback.
791  * It is not an error if this is not the same as the requested size, as it
792  * can happen e.g. near the end of a file, but generally we try to skip
793  * as many bytes as requested. Zero is returned on end of file
794  * (or if @count is zero), but never otherwise.
795  *
796  * Any outstanding i/o request with higher priority (lower numerical value)
797  * will be executed before an outstanding request with lower priority.
798  * Default priority is %G_PRIORITY_DEFAULT.
799  *
800  * The asynchronous methods have a default fallback that uses threads to
801  * implement asynchronicity, so they are optional for inheriting classes.
802  * However, if you override one, you must override all.
803  **/
804 void
805 g_input_stream_skip_async (GInputStream        *stream,
806                            gsize                count,
807                            int                  io_priority,
808                            GCancellable        *cancellable,
809                            GAsyncReadyCallback  callback,
810                            gpointer             user_data)
811 {
812   GInputStreamClass *class;
813   GError *error = NULL;
814
815   g_return_if_fail (G_IS_INPUT_STREAM (stream));
816
817   if (count == 0)
818     {
819       GTask *task;
820
821       task = g_task_new (stream, cancellable, callback, user_data);
822       g_task_set_source_tag (task, g_input_stream_skip_async);
823       g_task_return_int (task, 0);
824       g_object_unref (task);
825       return;
826     }
827   
828   if (((gssize) count) < 0)
829     {
830       g_task_report_new_error (stream, callback, user_data,
831                                g_input_stream_skip_async,
832                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
833                                _("Too large count value passed to %s"),
834                                G_STRFUNC);
835       return;
836     }
837
838   if (!g_input_stream_set_pending (stream, &error))
839     {
840       g_task_report_error (stream, callback, user_data,
841                            g_input_stream_skip_async,
842                            error);
843       return;
844     }
845
846   class = G_INPUT_STREAM_GET_CLASS (stream);
847   stream->priv->outstanding_callback = callback;
848   g_object_ref (stream);
849   class->skip_async (stream, count, io_priority, cancellable,
850                      async_ready_callback_wrapper, user_data);
851 }
852
853 /**
854  * g_input_stream_skip_finish:
855  * @stream: a #GInputStream.
856  * @result: a #GAsyncResult.
857  * @error: a #GError location to store the error occurring, or %NULL to 
858  * ignore.
859  * 
860  * Finishes a stream skip operation.
861  * 
862  * Returns: the size of the bytes skipped, or %-1 on error.
863  **/
864 gssize
865 g_input_stream_skip_finish (GInputStream  *stream,
866                             GAsyncResult  *result,
867                             GError       **error)
868 {
869   GInputStreamClass *class;
870
871   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
872   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
873
874   if (g_async_result_legacy_propagate_error (result, error))
875     return -1;
876   else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
877     return g_task_propagate_int (G_TASK (result), error);
878
879   class = G_INPUT_STREAM_GET_CLASS (stream);
880   return class->skip_finish (stream, result, error);
881 }
882
883 /**
884  * g_input_stream_close_async:
885  * @stream: A #GInputStream.
886  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
887  * of the request. 
888  * @cancellable: (allow-none): optional cancellable object
889  * @callback: (scope async): callback to call when the request is satisfied
890  * @user_data: (closure): the data to pass to callback function
891  *
892  * Requests an asynchronous closes of the stream, releasing resources related to it.
893  * When the operation is finished @callback will be called. 
894  * You can then call g_input_stream_close_finish() to get the result of the 
895  * operation.
896  *
897  * For behaviour details see g_input_stream_close().
898  *
899  * The asyncronous methods have a default fallback that uses threads to implement
900  * asynchronicity, so they are optional for inheriting classes. However, if you
901  * override one you must override all.
902  **/
903 void
904 g_input_stream_close_async (GInputStream        *stream,
905                             int                  io_priority,
906                             GCancellable        *cancellable,
907                             GAsyncReadyCallback  callback,
908                             gpointer             user_data)
909 {
910   GInputStreamClass *class;
911   GError *error = NULL;
912
913   g_return_if_fail (G_IS_INPUT_STREAM (stream));
914
915   if (stream->priv->closed)
916     {
917       GTask *task;
918
919       task = g_task_new (stream, cancellable, callback, user_data);
920       g_task_set_source_tag (task, g_input_stream_close_async);
921       g_task_return_boolean (task, TRUE);
922       g_object_unref (task);
923       return;
924     }
925
926   if (!g_input_stream_set_pending (stream, &error))
927     {
928       g_task_report_error (stream, callback, user_data,
929                            g_input_stream_close_async,
930                            error);
931       return;
932     }
933   
934   class = G_INPUT_STREAM_GET_CLASS (stream);
935   stream->priv->outstanding_callback = callback;
936   g_object_ref (stream);
937   class->close_async (stream, io_priority, cancellable,
938                       async_ready_close_callback_wrapper, user_data);
939 }
940
941 /**
942  * g_input_stream_close_finish:
943  * @stream: a #GInputStream.
944  * @result: a #GAsyncResult.
945  * @error: a #GError location to store the error occurring, or %NULL to 
946  * ignore.
947  * 
948  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
949  * 
950  * Returns: %TRUE if the stream was closed successfully.
951  **/
952 gboolean
953 g_input_stream_close_finish (GInputStream  *stream,
954                              GAsyncResult  *result,
955                              GError       **error)
956 {
957   GInputStreamClass *class;
958
959   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
960   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
961
962   if (g_async_result_legacy_propagate_error (result, error))
963     return FALSE;
964   else if (g_async_result_is_tagged (result, g_input_stream_close_async))
965     return g_task_propagate_boolean (G_TASK (result), error);
966
967   class = G_INPUT_STREAM_GET_CLASS (stream);
968   return class->close_finish (stream, result, error);
969 }
970
971 /**
972  * g_input_stream_is_closed:
973  * @stream: input stream.
974  * 
975  * Checks if an input stream is closed.
976  * 
977  * Returns: %TRUE if the stream is closed.
978  **/
979 gboolean
980 g_input_stream_is_closed (GInputStream *stream)
981 {
982   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
983   
984   return stream->priv->closed;
985 }
986  
987 /**
988  * g_input_stream_has_pending:
989  * @stream: input stream.
990  * 
991  * Checks if an input stream has pending actions.
992  * 
993  * Returns: %TRUE if @stream has pending actions.
994  **/  
995 gboolean
996 g_input_stream_has_pending (GInputStream *stream)
997 {
998   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
999   
1000   return stream->priv->pending;
1001 }
1002
1003 /**
1004  * g_input_stream_set_pending:
1005  * @stream: input stream
1006  * @error: a #GError location to store the error occurring, or %NULL to 
1007  * ignore.
1008  * 
1009  * Sets @stream to have actions pending. If the pending flag is
1010  * already set or @stream is closed, it will return %FALSE and set
1011  * @error.
1012  *
1013  * Return value: %TRUE if pending was previously unset and is now set.
1014  **/
1015 gboolean
1016 g_input_stream_set_pending (GInputStream *stream, GError **error)
1017 {
1018   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1019   
1020   if (stream->priv->closed)
1021     {
1022       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1023                            _("Stream is already closed"));
1024       return FALSE;
1025     }
1026   
1027   if (stream->priv->pending)
1028     {
1029       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1030                 /* Translators: This is an error you get if there is already an
1031                  * operation running against this stream when you try to start
1032                  * one */
1033                  _("Stream has outstanding operation"));
1034       return FALSE;
1035     }
1036   
1037   stream->priv->pending = TRUE;
1038   return TRUE;
1039 }
1040
1041 /**
1042  * g_input_stream_clear_pending:
1043  * @stream: input stream
1044  * 
1045  * Clears the pending flag on @stream.
1046  **/
1047 void
1048 g_input_stream_clear_pending (GInputStream *stream)
1049 {
1050   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1051   
1052   stream->priv->pending = FALSE;
1053 }
1054
1055 /********************************************
1056  *   Default implementation of async ops    *
1057  ********************************************/
1058
1059 typedef struct {
1060   void   *buffer;
1061   gsize   count;
1062 } ReadData;
1063
1064 static void
1065 free_read_data (ReadData *op)
1066 {
1067   g_slice_free (ReadData, op);
1068 }
1069
1070 static void
1071 read_async_thread (GTask        *task,
1072                    gpointer      source_object,
1073                    gpointer      task_data,
1074                    GCancellable *cancellable)
1075 {
1076   GInputStream *stream = source_object;
1077   ReadData *op = task_data;
1078   GInputStreamClass *class;
1079   GError *error = NULL;
1080   gssize nread;
1081  
1082   class = G_INPUT_STREAM_GET_CLASS (stream);
1083
1084   nread = class->read_fn (stream,
1085                           op->buffer, op->count,
1086                           g_task_get_cancellable (task),
1087                           &error);
1088   if (nread == -1)
1089     g_task_return_error (task, error);
1090   else
1091     g_task_return_int (task, nread);
1092 }
1093
1094 static void read_async_pollable (GPollableInputStream *stream,
1095                                  GTask                *task);
1096
1097 static gboolean
1098 read_async_pollable_ready (GPollableInputStream *stream,
1099                            gpointer              user_data)
1100 {
1101   GTask *task = user_data;
1102
1103   read_async_pollable (stream, task);
1104   return FALSE;
1105 }
1106
1107 static void
1108 read_async_pollable (GPollableInputStream *stream,
1109                      GTask                *task)
1110 {
1111   ReadData *op = g_task_get_task_data (task);
1112   GError *error = NULL;
1113   gssize nread;
1114
1115   if (g_task_return_error_if_cancelled (task))
1116     return;
1117
1118   nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1119     read_nonblocking (stream, op->buffer, op->count, &error);
1120
1121   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1122     {
1123       GSource *source;
1124
1125       g_error_free (error);
1126
1127       source = g_pollable_input_stream_create_source (stream,
1128                                                       g_task_get_cancellable (task));
1129       g_task_attach_source (task, source,
1130                             (GSourceFunc) read_async_pollable_ready);
1131       g_source_unref (source);
1132       return;
1133     }
1134
1135   if (nread == -1)
1136     g_task_return_error (task, error);
1137   else
1138     g_task_return_int (task, nread);
1139   /* g_input_stream_real_read_async() unrefs task */
1140 }
1141
1142 #define CAN_DO_NONBLOCKING_READS(stream) \
1143   (G_IS_POLLABLE_INPUT_STREAM (stream) && \
1144    g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream)))
1145
1146
1147 static void
1148 g_input_stream_real_read_async (GInputStream        *stream,
1149                                 void                *buffer,
1150                                 gsize                count,
1151                                 int                  io_priority,
1152                                 GCancellable        *cancellable,
1153                                 GAsyncReadyCallback  callback,
1154                                 gpointer             user_data)
1155 {
1156   GTask *task;
1157   ReadData *op;
1158   
1159   op = g_slice_new0 (ReadData);
1160   task = g_task_new (stream, cancellable, callback, user_data);
1161   g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1162   g_task_set_priority (task, io_priority);
1163   op->buffer = buffer;
1164   op->count = count;
1165
1166   if (CAN_DO_NONBLOCKING_READS (stream))
1167     read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1168   else
1169     g_task_run_in_thread (task, read_async_thread);
1170   g_object_unref (task);
1171 }
1172
1173 static gssize
1174 g_input_stream_real_read_finish (GInputStream  *stream,
1175                                  GAsyncResult  *result,
1176                                  GError       **error)
1177 {
1178   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1179
1180   return g_task_propagate_int (G_TASK (result), error);
1181 }
1182
1183
1184 static void
1185 skip_async_thread (GTask        *task,
1186                    gpointer      source_object,
1187                    gpointer      task_data,
1188                    GCancellable *cancellable)
1189 {
1190   GInputStream *stream = source_object;
1191   gsize count = GPOINTER_TO_SIZE (task_data);
1192   GInputStreamClass *class;
1193   GError *error = NULL;
1194   gssize ret;
1195
1196   class = G_INPUT_STREAM_GET_CLASS (stream);
1197   ret = class->skip (stream, count,
1198                      g_task_get_cancellable (task),
1199                      &error);
1200   if (ret == -1)
1201     g_task_return_error (task, error);
1202   else
1203     g_task_return_int (task, ret);
1204 }
1205
1206 typedef struct {
1207   char buffer[8192];
1208   gsize count;
1209   gsize count_skipped;
1210 } SkipFallbackAsyncData;
1211
1212 static void
1213 skip_callback_wrapper (GObject      *source_object,
1214                        GAsyncResult *res,
1215                        gpointer      user_data)
1216 {
1217   GInputStreamClass *class;
1218   GTask *task = user_data;
1219   SkipFallbackAsyncData *data = g_task_get_task_data (task);
1220   GError *error = NULL;
1221   gssize ret;
1222
1223   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1224
1225   if (ret > 0)
1226     {
1227       data->count -= ret;
1228       data->count_skipped += ret;
1229
1230       if (data->count > 0)
1231         {
1232           class = G_INPUT_STREAM_GET_CLASS (source_object);
1233           class->read_async (G_INPUT_STREAM (source_object),
1234                              data->buffer, MIN (8192, data->count),
1235                              g_task_get_priority (task),
1236                              g_task_get_cancellable (task),
1237                              skip_callback_wrapper, task);
1238           return;
1239         }
1240     }
1241
1242   if (ret == -1 &&
1243       g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1244       data->count_skipped)
1245     {
1246       /* No error, return partial read */
1247       g_clear_error (&error);
1248     }
1249
1250   if (error)
1251     g_task_return_error (task, error);
1252   else
1253     g_task_return_int (task, data->count_skipped);
1254   g_object_unref (task);
1255  }
1256
1257 static void
1258 g_input_stream_real_skip_async (GInputStream        *stream,
1259                                 gsize                count,
1260                                 int                  io_priority,
1261                                 GCancellable        *cancellable,
1262                                 GAsyncReadyCallback  callback,
1263                                 gpointer             user_data)
1264 {
1265   GInputStreamClass *class;
1266   SkipFallbackAsyncData *data;
1267   GTask *task;
1268
1269   class = G_INPUT_STREAM_GET_CLASS (stream);
1270
1271   task = g_task_new (stream, cancellable, callback, user_data);
1272   g_task_set_priority (task, io_priority);
1273
1274   if (class->read_async == g_input_stream_real_read_async &&
1275       !CAN_DO_NONBLOCKING_READS (stream))
1276     {
1277       /* Read is thread-using async fallback.
1278        * Make skip use threads too, so that we can use a possible sync skip
1279        * implementation. */
1280       g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1281
1282       g_task_run_in_thread (task, skip_async_thread);
1283       g_object_unref (task);
1284     }
1285   else
1286     {
1287       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1288       
1289       /* There is a custom async read function, lets use that. */
1290       data = g_new (SkipFallbackAsyncData, 1);
1291       data->count = count;
1292       data->count_skipped = 0;
1293       g_task_set_task_data (task, data, g_free);
1294       g_task_set_check_cancellable (task, FALSE);
1295       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1296                          skip_callback_wrapper, task);
1297     }
1298
1299 }
1300
1301 static gssize
1302 g_input_stream_real_skip_finish (GInputStream  *stream,
1303                                  GAsyncResult  *result,
1304                                  GError       **error)
1305 {
1306   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1307
1308   return g_task_propagate_int (G_TASK (result), error);
1309 }
1310
1311 static void
1312 close_async_thread (GTask        *task,
1313                     gpointer      source_object,
1314                     gpointer      task_data,
1315                     GCancellable *cancellable)
1316 {
1317   GInputStream *stream = source_object;
1318   GInputStreamClass *class;
1319   GError *error = NULL;
1320   gboolean result;
1321
1322   class = G_INPUT_STREAM_GET_CLASS (stream);
1323   if (class->close_fn)
1324     {
1325       result = class->close_fn (stream,
1326                                 g_task_get_cancellable (task),
1327                                 &error);
1328       if (!result)
1329         {
1330           g_task_return_error (task, error);
1331           return;
1332         }
1333     }
1334
1335   g_task_return_boolean (task, TRUE);
1336 }
1337
1338 static void
1339 g_input_stream_real_close_async (GInputStream        *stream,
1340                                  int                  io_priority,
1341                                  GCancellable        *cancellable,
1342                                  GAsyncReadyCallback  callback,
1343                                  gpointer             user_data)
1344 {
1345   GTask *task;
1346
1347   task = g_task_new (stream, cancellable, callback, user_data);
1348   g_task_set_check_cancellable (task, FALSE);
1349   g_task_set_priority (task, io_priority);
1350   
1351   g_task_run_in_thread (task, close_async_thread);
1352   g_object_unref (task);
1353 }
1354
1355 static gboolean
1356 g_input_stream_real_close_finish (GInputStream  *stream,
1357                                   GAsyncResult  *result,
1358                                   GError       **error)
1359 {
1360   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1361
1362   return g_task_propagate_boolean (G_TASK (result), error);
1363 }