docs: use "Returns:" consistently
[platform/upstream/glib.git] / gio / ginputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: Alexander Larsson <alexl@redhat.com>
19  */
20
21 #include "config.h"
22 #include <glib.h>
23 #include "glibintl.h"
24
25 #include "ginputstream.h"
26 #include "gioprivate.h"
27 #include "gseekable.h"
28 #include "gcancellable.h"
29 #include "gasyncresult.h"
30 #include "gioerror.h"
31 #include "gpollableinputstream.h"
32
33 /**
34  * SECTION:ginputstream
35  * @short_description: Base class for implementing streaming input
36  * @include: gio/gio.h
37  *
38  * #GInputStream has functions to read from a stream (g_input_stream_read()),
39  * to close a stream (g_input_stream_close()) and to skip some content
40  * (g_input_stream_skip()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice(). 
44  *
45  * All of these functions have async variants too.
46  **/
47
48 struct _GInputStreamPrivate {
49   guint closed : 1;
50   guint pending : 1;
51   GAsyncReadyCallback outstanding_callback;
52 };
53
54 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
55
56 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
57                                                   gsize                 count,
58                                                   GCancellable         *cancellable,
59                                                   GError              **error);
60 static void     g_input_stream_real_read_async   (GInputStream         *stream,
61                                                   void                 *buffer,
62                                                   gsize                 count,
63                                                   int                   io_priority,
64                                                   GCancellable         *cancellable,
65                                                   GAsyncReadyCallback   callback,
66                                                   gpointer              user_data);
67 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
68                                                   GAsyncResult         *result,
69                                                   GError              **error);
70 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
71                                                   gsize                 count,
72                                                   int                   io_priority,
73                                                   GCancellable         *cancellable,
74                                                   GAsyncReadyCallback   callback,
75                                                   gpointer              data);
76 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
77                                                   GAsyncResult         *result,
78                                                   GError              **error);
79 static void     g_input_stream_real_close_async  (GInputStream         *stream,
80                                                   int                   io_priority,
81                                                   GCancellable         *cancellable,
82                                                   GAsyncReadyCallback   callback,
83                                                   gpointer              data);
84 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
85                                                   GAsyncResult         *result,
86                                                   GError              **error);
87
88 static void
89 g_input_stream_dispose (GObject *object)
90 {
91   GInputStream *stream;
92
93   stream = G_INPUT_STREAM (object);
94   
95   if (!stream->priv->closed)
96     g_input_stream_close (stream, NULL, NULL);
97
98   G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
99 }
100
101
102 static void
103 g_input_stream_class_init (GInputStreamClass *klass)
104 {
105   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
106   
107   gobject_class->dispose = g_input_stream_dispose;
108   
109   klass->skip = g_input_stream_real_skip;
110   klass->read_async = g_input_stream_real_read_async;
111   klass->read_finish = g_input_stream_real_read_finish;
112   klass->skip_async = g_input_stream_real_skip_async;
113   klass->skip_finish = g_input_stream_real_skip_finish;
114   klass->close_async = g_input_stream_real_close_async;
115   klass->close_finish = g_input_stream_real_close_finish;
116 }
117
118 static void
119 g_input_stream_init (GInputStream *stream)
120 {
121   stream->priv = g_input_stream_get_instance_private (stream);
122 }
123
124 /**
125  * g_input_stream_read:
126  * @stream: a #GInputStream.
127  * @buffer: (array length=count) (element-type guint8): a buffer to
128  *     read data into (which should be at least count bytes long).
129  * @count: the number of bytes that will be read from the stream
130  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
131  * @error: location to store the error occurring, or %NULL to ignore
132  *
133  * Tries to read @count bytes from the stream into the buffer starting at
134  * @buffer. Will block during this read.
135  * 
136  * If count is zero returns zero and does nothing. A value of @count
137  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
138  *
139  * On success, the number of bytes read into the buffer is returned.
140  * It is not an error if this is not the same as the requested size, as it
141  * can happen e.g. near the end of a file. Zero is returned on end of file
142  * (or if @count is zero),  but never otherwise.
143  *
144  * If @cancellable is not %NULL, then the operation can be cancelled by
145  * triggering the cancellable object from another thread. If the operation
146  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
147  * operation was partially finished when the operation was cancelled the
148  * partial result will be returned, without an error.
149  *
150  * On error -1 is returned and @error is set accordingly.
151  * 
152  * Returns: Number of bytes read, or -1 on error, or 0 on end of file.
153  **/
154 gssize
155 g_input_stream_read  (GInputStream  *stream,
156                       void          *buffer,
157                       gsize          count,
158                       GCancellable  *cancellable,
159                       GError       **error)
160 {
161   GInputStreamClass *class;
162   gssize res;
163
164   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
165   g_return_val_if_fail (buffer != NULL, 0);
166
167   if (count == 0)
168     return 0;
169   
170   if (((gssize) count) < 0)
171     {
172       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
173                    _("Too large count value passed to %s"), G_STRFUNC);
174       return -1;
175     }
176
177   class = G_INPUT_STREAM_GET_CLASS (stream);
178
179   if (class->read_fn == NULL) 
180     {
181       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
182                            _("Input stream doesn't implement read"));
183       return -1;
184     }
185
186   if (!g_input_stream_set_pending (stream, error))
187     return -1;
188
189   if (cancellable)
190     g_cancellable_push_current (cancellable);
191   
192   res = class->read_fn (stream, buffer, count, cancellable, error);
193
194   if (cancellable)
195     g_cancellable_pop_current (cancellable);
196   
197   g_input_stream_clear_pending (stream);
198
199   return res;
200 }
201
202 /**
203  * g_input_stream_read_all:
204  * @stream: a #GInputStream.
205  * @buffer: (array length=count) (element-type guint8): a buffer to
206  *     read data into (which should be at least count bytes long).
207  * @count: the number of bytes that will be read from the stream
208  * @bytes_read: (out): location to store the number of bytes that was read from the stream
209  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
210  * @error: location to store the error occurring, or %NULL to ignore
211  *
212  * Tries to read @count bytes from the stream into the buffer starting at
213  * @buffer. Will block during this read.
214  *
215  * This function is similar to g_input_stream_read(), except it tries to
216  * read as many bytes as requested, only stopping on an error or end of stream.
217  *
218  * On a successful read of @count bytes, or if we reached the end of the
219  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
220  * read into @buffer.
221  * 
222  * If there is an error during the operation %FALSE is returned and @error
223  * is set to indicate the error status, @bytes_read is updated to contain
224  * the number of bytes read into @buffer before the error occurred.
225  *
226  * Returns: %TRUE on success, %FALSE if there was an error
227  **/
228 gboolean
229 g_input_stream_read_all (GInputStream  *stream,
230                          void          *buffer,
231                          gsize          count,
232                          gsize         *bytes_read,
233                          GCancellable  *cancellable,
234                          GError       **error)
235 {
236   gsize _bytes_read;
237   gssize res;
238
239   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
240   g_return_val_if_fail (buffer != NULL, FALSE);
241
242   _bytes_read = 0;
243   while (_bytes_read < count)
244     {
245       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
246                                  cancellable, error);
247       if (res == -1)
248         {
249           if (bytes_read)
250             *bytes_read = _bytes_read;
251           return FALSE;
252         }
253       
254       if (res == 0)
255         break;
256
257       _bytes_read += res;
258     }
259
260   if (bytes_read)
261     *bytes_read = _bytes_read;
262   return TRUE;
263 }
264
265 /**
266  * g_input_stream_read_bytes:
267  * @stream: a #GInputStream.
268  * @count: maximum number of bytes that will be read from the stream. Common
269  * values include 4096 and 8192.
270  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
271  * @error: location to store the error occurring, or %NULL to ignore
272  *
273  * Like g_input_stream_read(), this tries to read @count bytes from
274  * the stream in a blocking fashion. However, rather than reading into
275  * a user-supplied buffer, this will create a new #GBytes containing
276  * the data that was read. This may be easier to use from language
277  * bindings.
278  *
279  * If count is zero, returns a zero-length #GBytes and does nothing. A
280  * value of @count larger than %G_MAXSSIZE will cause a
281  * %G_IO_ERROR_INVALID_ARGUMENT error.
282  *
283  * On success, a new #GBytes is returned. It is not an error if the
284  * size of this object is not the same as the requested size, as it
285  * can happen e.g. near the end of a file. A zero-length #GBytes is
286  * returned on end of file (or if @count is zero), but never
287  * otherwise.
288  *
289  * If @cancellable is not %NULL, then the operation can be cancelled by
290  * triggering the cancellable object from another thread. If the operation
291  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
292  * operation was partially finished when the operation was cancelled the
293  * partial result will be returned, without an error.
294  *
295  * On error %NULL is returned and @error is set accordingly.
296  *
297  * Returns: a new #GBytes, or %NULL on error
298  **/
299 GBytes *
300 g_input_stream_read_bytes (GInputStream  *stream,
301                            gsize          count,
302                            GCancellable  *cancellable,
303                            GError       **error)
304 {
305   guchar *buf;
306   gssize nread;
307
308   buf = g_malloc (count);
309   nread = g_input_stream_read (stream, buf, count, cancellable, error);
310   if (nread == -1)
311     {
312       g_free (buf);
313       return NULL;
314     }
315   else if (nread == 0)
316     {
317       g_free (buf);
318       return g_bytes_new_static ("", 0);
319     }
320   else
321     return g_bytes_new_take (buf, nread);
322 }
323
324 /**
325  * g_input_stream_skip:
326  * @stream: a #GInputStream.
327  * @count: the number of bytes that will be skipped from the stream
328  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
329  * @error: location to store the error occurring, or %NULL to ignore
330  *
331  * Tries to skip @count bytes from the stream. Will block during the operation.
332  *
333  * This is identical to g_input_stream_read(), from a behaviour standpoint,
334  * but the bytes that are skipped are not returned to the user. Some
335  * streams have an implementation that is more efficient than reading the data.
336  *
337  * This function is optional for inherited classes, as the default implementation
338  * emulates it using read.
339  *
340  * If @cancellable is not %NULL, then the operation can be cancelled by
341  * triggering the cancellable object from another thread. If the operation
342  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
343  * operation was partially finished when the operation was cancelled the
344  * partial result will be returned, without an error.
345  *
346  * Returns: Number of bytes skipped, or -1 on error
347  **/
348 gssize
349 g_input_stream_skip (GInputStream  *stream,
350                      gsize          count,
351                      GCancellable  *cancellable,
352                      GError       **error)
353 {
354   GInputStreamClass *class;
355   gssize res;
356
357   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
358
359   if (count == 0)
360     return 0;
361
362   if (((gssize) count) < 0)
363     {
364       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
365                    _("Too large count value passed to %s"), G_STRFUNC);
366       return -1;
367     }
368   
369   class = G_INPUT_STREAM_GET_CLASS (stream);
370
371   if (!g_input_stream_set_pending (stream, error))
372     return -1;
373
374   if (cancellable)
375     g_cancellable_push_current (cancellable);
376   
377   res = class->skip (stream, count, cancellable, error);
378
379   if (cancellable)
380     g_cancellable_pop_current (cancellable);
381   
382   g_input_stream_clear_pending (stream);
383
384   return res;
385 }
386
387 static gssize
388 g_input_stream_real_skip (GInputStream  *stream,
389                           gsize          count,
390                           GCancellable  *cancellable,
391                           GError       **error)
392 {
393   GInputStreamClass *class;
394   gssize ret, read_bytes;
395   char buffer[8192];
396   GError *my_error;
397
398   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
399     {
400       if (g_seekable_seek (G_SEEKABLE (stream),
401                            count,
402                            G_SEEK_CUR,
403                            cancellable,
404                            NULL))
405         return count;
406     }
407
408   /* If not seekable, or seek failed, fall back to reading data: */
409
410   class = G_INPUT_STREAM_GET_CLASS (stream);
411
412   read_bytes = 0;
413   while (1)
414     {
415       my_error = NULL;
416
417       ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
418                             cancellable, &my_error);
419       if (ret == -1)
420         {
421           if (read_bytes > 0 &&
422               my_error->domain == G_IO_ERROR &&
423               my_error->code == G_IO_ERROR_CANCELLED)
424             {
425               g_error_free (my_error);
426               return read_bytes;
427             }
428
429           g_propagate_error (error, my_error);
430           return -1;
431         }
432
433       count -= ret;
434       read_bytes += ret;
435
436       if (ret == 0 || count == 0)
437         return read_bytes;
438     }
439 }
440
441 /**
442  * g_input_stream_close:
443  * @stream: A #GInputStream.
444  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
445  * @error: location to store the error occurring, or %NULL to ignore
446  *
447  * Closes the stream, releasing resources related to it.
448  *
449  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
450  * Closing a stream multiple times will not return an error.
451  *
452  * Streams will be automatically closed when the last reference
453  * is dropped, but you might want to call this function to make sure 
454  * resources are released as early as possible.
455  *
456  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
457  * open after the stream is closed. See the documentation for the individual
458  * stream for details.
459  *
460  * On failure the first error that happened will be reported, but the close
461  * operation will finish as much as possible. A stream that failed to
462  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
463  * is important to check and report the error to the user.
464  *
465  * If @cancellable is not %NULL, then the operation can be cancelled by
466  * triggering the cancellable object from another thread. If the operation
467  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
468  * Cancelling a close will still leave the stream closed, but some streams
469  * can use a faster close that doesn't block to e.g. check errors. 
470  *
471  * Returns: %TRUE on success, %FALSE on failure
472  **/
473 gboolean
474 g_input_stream_close (GInputStream  *stream,
475                       GCancellable  *cancellable,
476                       GError       **error)
477 {
478   GInputStreamClass *class;
479   gboolean res;
480
481   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
482
483   class = G_INPUT_STREAM_GET_CLASS (stream);
484
485   if (stream->priv->closed)
486     return TRUE;
487
488   res = TRUE;
489
490   if (!g_input_stream_set_pending (stream, error))
491     return FALSE;
492
493   if (cancellable)
494     g_cancellable_push_current (cancellable);
495
496   if (class->close_fn)
497     res = class->close_fn (stream, cancellable, error);
498
499   if (cancellable)
500     g_cancellable_pop_current (cancellable);
501
502   g_input_stream_clear_pending (stream);
503   
504   stream->priv->closed = TRUE;
505   
506   return res;
507 }
508
509 static void
510 async_ready_callback_wrapper (GObject      *source_object,
511                               GAsyncResult *res,
512                               gpointer      user_data)
513 {
514   GInputStream *stream = G_INPUT_STREAM (source_object);
515
516   g_input_stream_clear_pending (stream);
517   if (stream->priv->outstanding_callback)
518     (*stream->priv->outstanding_callback) (source_object, res, user_data);
519   g_object_unref (stream);
520 }
521
522 static void
523 async_ready_close_callback_wrapper (GObject      *source_object,
524                                     GAsyncResult *res,
525                                     gpointer      user_data)
526 {
527   GInputStream *stream = G_INPUT_STREAM (source_object);
528
529   g_input_stream_clear_pending (stream);
530   stream->priv->closed = TRUE;
531   if (stream->priv->outstanding_callback)
532     (*stream->priv->outstanding_callback) (source_object, res, user_data);
533   g_object_unref (stream);
534 }
535
536 /**
537  * g_input_stream_read_async:
538  * @stream: A #GInputStream.
539  * @buffer: (array length=count) (element-type guint8): a buffer to
540  *     read data into (which should be at least count bytes long).
541  * @count: the number of bytes that will be read from the stream
542  * @io_priority: the [I/O priority][io-priority]
543  * of the request. 
544  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
545  * @callback: (scope async): callback to call when the request is satisfied
546  * @user_data: (closure): the data to pass to callback function
547  *
548  * Request an asynchronous read of @count bytes from the stream into the buffer
549  * starting at @buffer. When the operation is finished @callback will be called. 
550  * You can then call g_input_stream_read_finish() to get the result of the 
551  * operation.
552  *
553  * During an async request no other sync and async calls are allowed on @stream, and will
554  * result in %G_IO_ERROR_PENDING errors. 
555  *
556  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
557  *
558  * On success, the number of bytes read into the buffer will be passed to the
559  * callback. It is not an error if this is not the same as the requested size, as it
560  * can happen e.g. near the end of a file, but generally we try to read
561  * as many bytes as requested. Zero is returned on end of file
562  * (or if @count is zero),  but never otherwise.
563  *
564  * Any outstanding i/o request with higher priority (lower numerical value) will
565  * be executed before an outstanding request with lower priority. Default
566  * priority is %G_PRIORITY_DEFAULT.
567  *
568  * The asyncronous methods have a default fallback that uses threads to implement
569  * asynchronicity, so they are optional for inheriting classes. However, if you
570  * override one you must override all.
571  **/
572 void
573 g_input_stream_read_async (GInputStream        *stream,
574                            void                *buffer,
575                            gsize                count,
576                            int                  io_priority,
577                            GCancellable        *cancellable,
578                            GAsyncReadyCallback  callback,
579                            gpointer             user_data)
580 {
581   GInputStreamClass *class;
582   GError *error = NULL;
583
584   g_return_if_fail (G_IS_INPUT_STREAM (stream));
585   g_return_if_fail (buffer != NULL);
586
587   if (count == 0)
588     {
589       GTask *task;
590
591       task = g_task_new (stream, cancellable, callback, user_data);
592       g_task_set_source_tag (task, g_input_stream_read_async);
593       g_task_return_int (task, 0);
594       g_object_unref (task);
595       return;
596     }
597   
598   if (((gssize) count) < 0)
599     {
600       g_task_report_new_error (stream, callback, user_data,
601                                g_input_stream_read_async,
602                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
603                                _("Too large count value passed to %s"),
604                                G_STRFUNC);
605       return;
606     }
607
608   if (!g_input_stream_set_pending (stream, &error))
609     {
610       g_task_report_error (stream, callback, user_data,
611                            g_input_stream_read_async,
612                            error);
613       return;
614     }
615
616   class = G_INPUT_STREAM_GET_CLASS (stream);
617   stream->priv->outstanding_callback = callback;
618   g_object_ref (stream);
619   class->read_async (stream, buffer, count, io_priority, cancellable,
620                      async_ready_callback_wrapper, user_data);
621 }
622
623 /**
624  * g_input_stream_read_finish:
625  * @stream: a #GInputStream.
626  * @result: a #GAsyncResult.
627  * @error: a #GError location to store the error occurring, or %NULL to 
628  * ignore.
629  * 
630  * Finishes an asynchronous stream read operation. 
631  * 
632  * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
633  **/
634 gssize
635 g_input_stream_read_finish (GInputStream  *stream,
636                             GAsyncResult  *result,
637                             GError       **error)
638 {
639   GInputStreamClass *class;
640   
641   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
642   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
643
644   if (g_async_result_legacy_propagate_error (result, error))
645     return -1;
646   else if (g_async_result_is_tagged (result, g_input_stream_read_async))
647     return g_task_propagate_int (G_TASK (result), error);
648
649   class = G_INPUT_STREAM_GET_CLASS (stream);
650   return class->read_finish (stream, result, error);
651 }
652
653 static void
654 read_bytes_callback (GObject      *stream,
655                      GAsyncResult *result,
656                      gpointer      user_data)
657 {
658   GTask *task = user_data;
659   guchar *buf = g_task_get_task_data (task);
660   GError *error = NULL;
661   gssize nread;
662   GBytes *bytes = NULL;
663
664   nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
665                                       result, &error);
666   if (nread == -1)
667     {
668       g_free (buf);
669       g_task_return_error (task, error);
670     }
671   else if (nread == 0)
672     {
673       g_free (buf);
674       bytes = g_bytes_new_static ("", 0);
675     }
676   else
677     bytes = g_bytes_new_take (buf, nread);
678
679   if (bytes)
680     g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
681
682   g_object_unref (task);
683 }
684
685 /**
686  * g_input_stream_read_bytes_async:
687  * @stream: A #GInputStream.
688  * @count: the number of bytes that will be read from the stream
689  * @io_priority: the [I/O priority][io-priority] of the request
690  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
691  * @callback: (scope async): callback to call when the request is satisfied
692  * @user_data: (closure): the data to pass to callback function
693  *
694  * Request an asynchronous read of @count bytes from the stream into a
695  * new #GBytes. When the operation is finished @callback will be
696  * called. You can then call g_input_stream_read_bytes_finish() to get the
697  * result of the operation.
698  *
699  * During an async request no other sync and async calls are allowed
700  * on @stream, and will result in %G_IO_ERROR_PENDING errors.
701  *
702  * A value of @count larger than %G_MAXSSIZE will cause a
703  * %G_IO_ERROR_INVALID_ARGUMENT error.
704  *
705  * On success, the new #GBytes will be passed to the callback. It is
706  * not an error if this is smaller than the requested size, as it can
707  * happen e.g. near the end of a file, but generally we try to read as
708  * many bytes as requested. Zero is returned on end of file (or if
709  * @count is zero), but never otherwise.
710  *
711  * Any outstanding I/O request with higher priority (lower numerical
712  * value) will be executed before an outstanding request with lower
713  * priority. Default priority is %G_PRIORITY_DEFAULT.
714  **/
715 void
716 g_input_stream_read_bytes_async (GInputStream          *stream,
717                                  gsize                  count,
718                                  int                    io_priority,
719                                  GCancellable          *cancellable,
720                                  GAsyncReadyCallback    callback,
721                                  gpointer               user_data)
722 {
723   GTask *task;
724   guchar *buf;
725
726   task = g_task_new (stream, cancellable, callback, user_data);
727   buf = g_malloc (count);
728   g_task_set_task_data (task, buf, NULL);
729
730   g_input_stream_read_async (stream, buf, count,
731                              io_priority, cancellable,
732                              read_bytes_callback, task);
733 }
734
735 /**
736  * g_input_stream_read_bytes_finish:
737  * @stream: a #GInputStream.
738  * @result: a #GAsyncResult.
739  * @error: a #GError location to store the error occurring, or %NULL to
740  *   ignore.
741  *
742  * Finishes an asynchronous stream read-into-#GBytes operation.
743  *
744  * Returns: the newly-allocated #GBytes, or %NULL on error
745  **/
746 GBytes *
747 g_input_stream_read_bytes_finish (GInputStream  *stream,
748                                   GAsyncResult  *result,
749                                   GError       **error)
750 {
751   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
752   g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
753
754   return g_task_propagate_pointer (G_TASK (result), error);
755 }
756
757 /**
758  * g_input_stream_skip_async:
759  * @stream: A #GInputStream.
760  * @count: the number of bytes that will be skipped from the stream
761  * @io_priority: the [I/O priority][io-priority] of the request
762  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
763  * @callback: (scope async): callback to call when the request is satisfied
764  * @user_data: (closure): the data to pass to callback function
765  *
766  * Request an asynchronous skip of @count bytes from the stream.
767  * When the operation is finished @callback will be called.
768  * You can then call g_input_stream_skip_finish() to get the result
769  * of the operation.
770  *
771  * During an async request no other sync and async calls are allowed,
772  * and will result in %G_IO_ERROR_PENDING errors.
773  *
774  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
775  *
776  * On success, the number of bytes skipped will be passed to the callback.
777  * It is not an error if this is not the same as the requested size, as it
778  * can happen e.g. near the end of a file, but generally we try to skip
779  * as many bytes as requested. Zero is returned on end of file
780  * (or if @count is zero), but never otherwise.
781  *
782  * Any outstanding i/o request with higher priority (lower numerical value)
783  * will be executed before an outstanding request with lower priority.
784  * Default priority is %G_PRIORITY_DEFAULT.
785  *
786  * The asynchronous methods have a default fallback that uses threads to
787  * implement asynchronicity, so they are optional for inheriting classes.
788  * However, if you override one, you must override all.
789  **/
790 void
791 g_input_stream_skip_async (GInputStream        *stream,
792                            gsize                count,
793                            int                  io_priority,
794                            GCancellable        *cancellable,
795                            GAsyncReadyCallback  callback,
796                            gpointer             user_data)
797 {
798   GInputStreamClass *class;
799   GError *error = NULL;
800
801   g_return_if_fail (G_IS_INPUT_STREAM (stream));
802
803   if (count == 0)
804     {
805       GTask *task;
806
807       task = g_task_new (stream, cancellable, callback, user_data);
808       g_task_set_source_tag (task, g_input_stream_skip_async);
809       g_task_return_int (task, 0);
810       g_object_unref (task);
811       return;
812     }
813   
814   if (((gssize) count) < 0)
815     {
816       g_task_report_new_error (stream, callback, user_data,
817                                g_input_stream_skip_async,
818                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
819                                _("Too large count value passed to %s"),
820                                G_STRFUNC);
821       return;
822     }
823
824   if (!g_input_stream_set_pending (stream, &error))
825     {
826       g_task_report_error (stream, callback, user_data,
827                            g_input_stream_skip_async,
828                            error);
829       return;
830     }
831
832   class = G_INPUT_STREAM_GET_CLASS (stream);
833   stream->priv->outstanding_callback = callback;
834   g_object_ref (stream);
835   class->skip_async (stream, count, io_priority, cancellable,
836                      async_ready_callback_wrapper, user_data);
837 }
838
839 /**
840  * g_input_stream_skip_finish:
841  * @stream: a #GInputStream.
842  * @result: a #GAsyncResult.
843  * @error: a #GError location to store the error occurring, or %NULL to 
844  * ignore.
845  * 
846  * Finishes a stream skip operation.
847  * 
848  * Returns: the size of the bytes skipped, or %-1 on error.
849  **/
850 gssize
851 g_input_stream_skip_finish (GInputStream  *stream,
852                             GAsyncResult  *result,
853                             GError       **error)
854 {
855   GInputStreamClass *class;
856
857   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
858   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
859
860   if (g_async_result_legacy_propagate_error (result, error))
861     return -1;
862   else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
863     return g_task_propagate_int (G_TASK (result), error);
864
865   class = G_INPUT_STREAM_GET_CLASS (stream);
866   return class->skip_finish (stream, result, error);
867 }
868
869 /**
870  * g_input_stream_close_async:
871  * @stream: A #GInputStream.
872  * @io_priority: the [I/O priority][io-priority] of the request
873  * @cancellable: (allow-none): optional cancellable object
874  * @callback: (scope async): callback to call when the request is satisfied
875  * @user_data: (closure): the data to pass to callback function
876  *
877  * Requests an asynchronous closes of the stream, releasing resources related to it.
878  * When the operation is finished @callback will be called. 
879  * You can then call g_input_stream_close_finish() to get the result of the 
880  * operation.
881  *
882  * For behaviour details see g_input_stream_close().
883  *
884  * The asyncronous methods have a default fallback that uses threads to implement
885  * asynchronicity, so they are optional for inheriting classes. However, if you
886  * override one you must override all.
887  **/
888 void
889 g_input_stream_close_async (GInputStream        *stream,
890                             int                  io_priority,
891                             GCancellable        *cancellable,
892                             GAsyncReadyCallback  callback,
893                             gpointer             user_data)
894 {
895   GInputStreamClass *class;
896   GError *error = NULL;
897
898   g_return_if_fail (G_IS_INPUT_STREAM (stream));
899
900   if (stream->priv->closed)
901     {
902       GTask *task;
903
904       task = g_task_new (stream, cancellable, callback, user_data);
905       g_task_set_source_tag (task, g_input_stream_close_async);
906       g_task_return_boolean (task, TRUE);
907       g_object_unref (task);
908       return;
909     }
910
911   if (!g_input_stream_set_pending (stream, &error))
912     {
913       g_task_report_error (stream, callback, user_data,
914                            g_input_stream_close_async,
915                            error);
916       return;
917     }
918   
919   class = G_INPUT_STREAM_GET_CLASS (stream);
920   stream->priv->outstanding_callback = callback;
921   g_object_ref (stream);
922   class->close_async (stream, io_priority, cancellable,
923                       async_ready_close_callback_wrapper, user_data);
924 }
925
926 /**
927  * g_input_stream_close_finish:
928  * @stream: a #GInputStream.
929  * @result: a #GAsyncResult.
930  * @error: a #GError location to store the error occurring, or %NULL to 
931  * ignore.
932  * 
933  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
934  * 
935  * Returns: %TRUE if the stream was closed successfully.
936  **/
937 gboolean
938 g_input_stream_close_finish (GInputStream  *stream,
939                              GAsyncResult  *result,
940                              GError       **error)
941 {
942   GInputStreamClass *class;
943
944   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
945   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
946
947   if (g_async_result_legacy_propagate_error (result, error))
948     return FALSE;
949   else if (g_async_result_is_tagged (result, g_input_stream_close_async))
950     return g_task_propagate_boolean (G_TASK (result), error);
951
952   class = G_INPUT_STREAM_GET_CLASS (stream);
953   return class->close_finish (stream, result, error);
954 }
955
956 /**
957  * g_input_stream_is_closed:
958  * @stream: input stream.
959  * 
960  * Checks if an input stream is closed.
961  * 
962  * Returns: %TRUE if the stream is closed.
963  **/
964 gboolean
965 g_input_stream_is_closed (GInputStream *stream)
966 {
967   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
968   
969   return stream->priv->closed;
970 }
971  
972 /**
973  * g_input_stream_has_pending:
974  * @stream: input stream.
975  * 
976  * Checks if an input stream has pending actions.
977  * 
978  * Returns: %TRUE if @stream has pending actions.
979  **/  
980 gboolean
981 g_input_stream_has_pending (GInputStream *stream)
982 {
983   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
984   
985   return stream->priv->pending;
986 }
987
988 /**
989  * g_input_stream_set_pending:
990  * @stream: input stream
991  * @error: a #GError location to store the error occurring, or %NULL to 
992  * ignore.
993  * 
994  * Sets @stream to have actions pending. If the pending flag is
995  * already set or @stream is closed, it will return %FALSE and set
996  * @error.
997  *
998  * Returns: %TRUE if pending was previously unset and is now set.
999  **/
1000 gboolean
1001 g_input_stream_set_pending (GInputStream *stream, GError **error)
1002 {
1003   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1004   
1005   if (stream->priv->closed)
1006     {
1007       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1008                            _("Stream is already closed"));
1009       return FALSE;
1010     }
1011   
1012   if (stream->priv->pending)
1013     {
1014       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1015                 /* Translators: This is an error you get if there is already an
1016                  * operation running against this stream when you try to start
1017                  * one */
1018                  _("Stream has outstanding operation"));
1019       return FALSE;
1020     }
1021   
1022   stream->priv->pending = TRUE;
1023   return TRUE;
1024 }
1025
1026 /**
1027  * g_input_stream_clear_pending:
1028  * @stream: input stream
1029  * 
1030  * Clears the pending flag on @stream.
1031  **/
1032 void
1033 g_input_stream_clear_pending (GInputStream *stream)
1034 {
1035   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1036   
1037   stream->priv->pending = FALSE;
1038 }
1039
1040 /**
1041  * g_input_stream_async_read_is_via_threads:
1042  * @stream: input stream
1043  *
1044  * Checks if an input stream's read_async function uses threads.
1045  *
1046  * Returns: %TRUE if @stream's read_async function uses threads.
1047  **/
1048 gboolean
1049 g_input_stream_async_read_is_via_threads (GInputStream *stream)
1050 {
1051   GInputStreamClass *class;
1052
1053   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1054
1055   class = G_INPUT_STREAM_GET_CLASS (stream);
1056
1057   return (class->read_async == g_input_stream_real_read_async &&
1058       !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1059         g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1060 }
1061
1062 /********************************************
1063  *   Default implementation of async ops    *
1064  ********************************************/
1065
1066 typedef struct {
1067   void   *buffer;
1068   gsize   count;
1069 } ReadData;
1070
1071 static void
1072 free_read_data (ReadData *op)
1073 {
1074   g_slice_free (ReadData, op);
1075 }
1076
1077 static void
1078 read_async_thread (GTask        *task,
1079                    gpointer      source_object,
1080                    gpointer      task_data,
1081                    GCancellable *cancellable)
1082 {
1083   GInputStream *stream = source_object;
1084   ReadData *op = task_data;
1085   GInputStreamClass *class;
1086   GError *error = NULL;
1087   gssize nread;
1088  
1089   class = G_INPUT_STREAM_GET_CLASS (stream);
1090
1091   nread = class->read_fn (stream,
1092                           op->buffer, op->count,
1093                           g_task_get_cancellable (task),
1094                           &error);
1095   if (nread == -1)
1096     g_task_return_error (task, error);
1097   else
1098     g_task_return_int (task, nread);
1099 }
1100
1101 static void read_async_pollable (GPollableInputStream *stream,
1102                                  GTask                *task);
1103
1104 static gboolean
1105 read_async_pollable_ready (GPollableInputStream *stream,
1106                            gpointer              user_data)
1107 {
1108   GTask *task = user_data;
1109
1110   read_async_pollable (stream, task);
1111   return FALSE;
1112 }
1113
1114 static void
1115 read_async_pollable (GPollableInputStream *stream,
1116                      GTask                *task)
1117 {
1118   ReadData *op = g_task_get_task_data (task);
1119   GError *error = NULL;
1120   gssize nread;
1121
1122   if (g_task_return_error_if_cancelled (task))
1123     return;
1124
1125   nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1126     read_nonblocking (stream, op->buffer, op->count, &error);
1127
1128   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1129     {
1130       GSource *source;
1131
1132       g_error_free (error);
1133
1134       source = g_pollable_input_stream_create_source (stream,
1135                                                       g_task_get_cancellable (task));
1136       g_task_attach_source (task, source,
1137                             (GSourceFunc) read_async_pollable_ready);
1138       g_source_unref (source);
1139       return;
1140     }
1141
1142   if (nread == -1)
1143     g_task_return_error (task, error);
1144   else
1145     g_task_return_int (task, nread);
1146   /* g_input_stream_real_read_async() unrefs task */
1147 }
1148
1149
1150 static void
1151 g_input_stream_real_read_async (GInputStream        *stream,
1152                                 void                *buffer,
1153                                 gsize                count,
1154                                 int                  io_priority,
1155                                 GCancellable        *cancellable,
1156                                 GAsyncReadyCallback  callback,
1157                                 gpointer             user_data)
1158 {
1159   GTask *task;
1160   ReadData *op;
1161   
1162   op = g_slice_new0 (ReadData);
1163   task = g_task_new (stream, cancellable, callback, user_data);
1164   g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1165   g_task_set_priority (task, io_priority);
1166   op->buffer = buffer;
1167   op->count = count;
1168
1169   if (!g_input_stream_async_read_is_via_threads (stream))
1170     read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1171   else
1172     g_task_run_in_thread (task, read_async_thread);
1173   g_object_unref (task);
1174 }
1175
1176 static gssize
1177 g_input_stream_real_read_finish (GInputStream  *stream,
1178                                  GAsyncResult  *result,
1179                                  GError       **error)
1180 {
1181   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1182
1183   return g_task_propagate_int (G_TASK (result), error);
1184 }
1185
1186
1187 static void
1188 skip_async_thread (GTask        *task,
1189                    gpointer      source_object,
1190                    gpointer      task_data,
1191                    GCancellable *cancellable)
1192 {
1193   GInputStream *stream = source_object;
1194   gsize count = GPOINTER_TO_SIZE (task_data);
1195   GInputStreamClass *class;
1196   GError *error = NULL;
1197   gssize ret;
1198
1199   class = G_INPUT_STREAM_GET_CLASS (stream);
1200   ret = class->skip (stream, count,
1201                      g_task_get_cancellable (task),
1202                      &error);
1203   if (ret == -1)
1204     g_task_return_error (task, error);
1205   else
1206     g_task_return_int (task, ret);
1207 }
1208
1209 typedef struct {
1210   char buffer[8192];
1211   gsize count;
1212   gsize count_skipped;
1213 } SkipFallbackAsyncData;
1214
1215 static void
1216 skip_callback_wrapper (GObject      *source_object,
1217                        GAsyncResult *res,
1218                        gpointer      user_data)
1219 {
1220   GInputStreamClass *class;
1221   GTask *task = user_data;
1222   SkipFallbackAsyncData *data = g_task_get_task_data (task);
1223   GError *error = NULL;
1224   gssize ret;
1225
1226   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1227
1228   if (ret > 0)
1229     {
1230       data->count -= ret;
1231       data->count_skipped += ret;
1232
1233       if (data->count > 0)
1234         {
1235           class = G_INPUT_STREAM_GET_CLASS (source_object);
1236           class->read_async (G_INPUT_STREAM (source_object),
1237                              data->buffer, MIN (8192, data->count),
1238                              g_task_get_priority (task),
1239                              g_task_get_cancellable (task),
1240                              skip_callback_wrapper, task);
1241           return;
1242         }
1243     }
1244
1245   if (ret == -1 &&
1246       g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1247       data->count_skipped)
1248     {
1249       /* No error, return partial read */
1250       g_clear_error (&error);
1251     }
1252
1253   if (error)
1254     g_task_return_error (task, error);
1255   else
1256     g_task_return_int (task, data->count_skipped);
1257   g_object_unref (task);
1258  }
1259
1260 static void
1261 g_input_stream_real_skip_async (GInputStream        *stream,
1262                                 gsize                count,
1263                                 int                  io_priority,
1264                                 GCancellable        *cancellable,
1265                                 GAsyncReadyCallback  callback,
1266                                 gpointer             user_data)
1267 {
1268   GInputStreamClass *class;
1269   SkipFallbackAsyncData *data;
1270   GTask *task;
1271
1272   class = G_INPUT_STREAM_GET_CLASS (stream);
1273
1274   task = g_task_new (stream, cancellable, callback, user_data);
1275   g_task_set_priority (task, io_priority);
1276
1277   if (g_input_stream_async_read_is_via_threads (stream))
1278     {
1279       /* Read is thread-using async fallback.
1280        * Make skip use threads too, so that we can use a possible sync skip
1281        * implementation. */
1282       g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1283
1284       g_task_run_in_thread (task, skip_async_thread);
1285       g_object_unref (task);
1286     }
1287   else
1288     {
1289       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1290       
1291       /* There is a custom async read function, lets use that. */
1292       data = g_new (SkipFallbackAsyncData, 1);
1293       data->count = count;
1294       data->count_skipped = 0;
1295       g_task_set_task_data (task, data, g_free);
1296       g_task_set_check_cancellable (task, FALSE);
1297       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1298                          skip_callback_wrapper, task);
1299     }
1300
1301 }
1302
1303 static gssize
1304 g_input_stream_real_skip_finish (GInputStream  *stream,
1305                                  GAsyncResult  *result,
1306                                  GError       **error)
1307 {
1308   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1309
1310   return g_task_propagate_int (G_TASK (result), error);
1311 }
1312
1313 static void
1314 close_async_thread (GTask        *task,
1315                     gpointer      source_object,
1316                     gpointer      task_data,
1317                     GCancellable *cancellable)
1318 {
1319   GInputStream *stream = source_object;
1320   GInputStreamClass *class;
1321   GError *error = NULL;
1322   gboolean result;
1323
1324   class = G_INPUT_STREAM_GET_CLASS (stream);
1325   if (class->close_fn)
1326     {
1327       result = class->close_fn (stream,
1328                                 g_task_get_cancellable (task),
1329                                 &error);
1330       if (!result)
1331         {
1332           g_task_return_error (task, error);
1333           return;
1334         }
1335     }
1336
1337   g_task_return_boolean (task, TRUE);
1338 }
1339
1340 static void
1341 g_input_stream_real_close_async (GInputStream        *stream,
1342                                  int                  io_priority,
1343                                  GCancellable        *cancellable,
1344                                  GAsyncReadyCallback  callback,
1345                                  gpointer             user_data)
1346 {
1347   GTask *task;
1348
1349   task = g_task_new (stream, cancellable, callback, user_data);
1350   g_task_set_check_cancellable (task, FALSE);
1351   g_task_set_priority (task, io_priority);
1352   
1353   g_task_run_in_thread (task, close_async_thread);
1354   g_object_unref (task);
1355 }
1356
1357 static gboolean
1358 g_input_stream_real_close_finish (GInputStream  *stream,
1359                                   GAsyncResult  *result,
1360                                   GError       **error)
1361 {
1362   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1363
1364   return g_task_propagate_boolean (G_TASK (result), error);
1365 }