glib/tests: Clean up inclusion of unistd.h
[platform/upstream/glib.git] / gio / ginputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include <glib.h>
25 #include "glibintl.h"
26
27 #include "ginputstream.h"
28 #include "gioprivate.h"
29 #include "gseekable.h"
30 #include "gcancellable.h"
31 #include "gasyncresult.h"
32 #include "gioerror.h"
33 #include "gpollableinputstream.h"
34
35 /**
36  * SECTION:ginputstream
37  * @short_description: Base class for implementing streaming input
38  * @include: gio/gio.h
39  *
40  * #GInputStream has functions to read from a stream (g_input_stream_read()),
41  * to close a stream (g_input_stream_close()) and to skip some content
42  * (g_input_stream_skip()). 
43  *
44  * To copy the content of an input stream to an output stream without 
45  * manually handling the reads and writes, use g_output_stream_splice(). 
46  *
47  * All of these functions have async variants too.
48  **/
49
50 struct _GInputStreamPrivate {
51   guint closed : 1;
52   guint pending : 1;
53   GAsyncReadyCallback outstanding_callback;
54 };
55
56 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
57
58 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
59                                                   gsize                 count,
60                                                   GCancellable         *cancellable,
61                                                   GError              **error);
62 static void     g_input_stream_real_read_async   (GInputStream         *stream,
63                                                   void                 *buffer,
64                                                   gsize                 count,
65                                                   int                   io_priority,
66                                                   GCancellable         *cancellable,
67                                                   GAsyncReadyCallback   callback,
68                                                   gpointer              user_data);
69 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
70                                                   GAsyncResult         *result,
71                                                   GError              **error);
72 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
73                                                   gsize                 count,
74                                                   int                   io_priority,
75                                                   GCancellable         *cancellable,
76                                                   GAsyncReadyCallback   callback,
77                                                   gpointer              data);
78 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
79                                                   GAsyncResult         *result,
80                                                   GError              **error);
81 static void     g_input_stream_real_close_async  (GInputStream         *stream,
82                                                   int                   io_priority,
83                                                   GCancellable         *cancellable,
84                                                   GAsyncReadyCallback   callback,
85                                                   gpointer              data);
86 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
87                                                   GAsyncResult         *result,
88                                                   GError              **error);
89
90 static void
91 g_input_stream_dispose (GObject *object)
92 {
93   GInputStream *stream;
94
95   stream = G_INPUT_STREAM (object);
96   
97   if (!stream->priv->closed)
98     g_input_stream_close (stream, NULL, NULL);
99
100   G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
101 }
102
103
104 static void
105 g_input_stream_class_init (GInputStreamClass *klass)
106 {
107   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
108   
109   gobject_class->dispose = g_input_stream_dispose;
110   
111   klass->skip = g_input_stream_real_skip;
112   klass->read_async = g_input_stream_real_read_async;
113   klass->read_finish = g_input_stream_real_read_finish;
114   klass->skip_async = g_input_stream_real_skip_async;
115   klass->skip_finish = g_input_stream_real_skip_finish;
116   klass->close_async = g_input_stream_real_close_async;
117   klass->close_finish = g_input_stream_real_close_finish;
118 }
119
120 static void
121 g_input_stream_init (GInputStream *stream)
122 {
123   stream->priv = g_input_stream_get_instance_private (stream);
124 }
125
126 /**
127  * g_input_stream_read:
128  * @stream: a #GInputStream.
129  * @buffer: (array length=count) (element-type guint8): a buffer to
130  *     read data into (which should be at least count bytes long).
131  * @count: the number of bytes that will be read from the stream
132  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
133  * @error: location to store the error occurring, or %NULL to ignore
134  *
135  * Tries to read @count bytes from the stream into the buffer starting at
136  * @buffer. Will block during this read.
137  * 
138  * If count is zero returns zero and does nothing. A value of @count
139  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
140  *
141  * On success, the number of bytes read into the buffer is returned.
142  * It is not an error if this is not the same as the requested size, as it
143  * can happen e.g. near the end of a file. Zero is returned on end of file
144  * (or if @count is zero),  but never otherwise.
145  *
146  * If @cancellable is not %NULL, then the operation can be cancelled by
147  * triggering the cancellable object from another thread. If the operation
148  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
149  * operation was partially finished when the operation was cancelled the
150  * partial result will be returned, without an error.
151  *
152  * On error -1 is returned and @error is set accordingly.
153  * 
154  * Return value: Number of bytes read, or -1 on error, or 0 on end of file.
155  **/
156 gssize
157 g_input_stream_read  (GInputStream  *stream,
158                       void          *buffer,
159                       gsize          count,
160                       GCancellable  *cancellable,
161                       GError       **error)
162 {
163   GInputStreamClass *class;
164   gssize res;
165
166   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
167   g_return_val_if_fail (buffer != NULL, 0);
168
169   if (count == 0)
170     return 0;
171   
172   if (((gssize) count) < 0)
173     {
174       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
175                    _("Too large count value passed to %s"), G_STRFUNC);
176       return -1;
177     }
178
179   class = G_INPUT_STREAM_GET_CLASS (stream);
180
181   if (class->read_fn == NULL) 
182     {
183       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
184                            _("Input stream doesn't implement read"));
185       return -1;
186     }
187
188   if (!g_input_stream_set_pending (stream, error))
189     return -1;
190
191   if (cancellable)
192     g_cancellable_push_current (cancellable);
193   
194   res = class->read_fn (stream, buffer, count, cancellable, error);
195
196   if (cancellable)
197     g_cancellable_pop_current (cancellable);
198   
199   g_input_stream_clear_pending (stream);
200
201   return res;
202 }
203
204 /**
205  * g_input_stream_read_all:
206  * @stream: a #GInputStream.
207  * @buffer: (array length=count) (element-type guint8): a buffer to
208  *     read data into (which should be at least count bytes long).
209  * @count: the number of bytes that will be read from the stream
210  * @bytes_read: (out): location to store the number of bytes that was read from the stream
211  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
212  * @error: location to store the error occurring, or %NULL to ignore
213  *
214  * Tries to read @count bytes from the stream into the buffer starting at
215  * @buffer. Will block during this read.
216  *
217  * This function is similar to g_input_stream_read(), except it tries to
218  * read as many bytes as requested, only stopping on an error or end of stream.
219  *
220  * On a successful read of @count bytes, or if we reached the end of the
221  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
222  * read into @buffer.
223  * 
224  * If there is an error during the operation %FALSE is returned and @error
225  * is set to indicate the error status, @bytes_read is updated to contain
226  * the number of bytes read into @buffer before the error occurred.
227  *
228  * Return value: %TRUE on success, %FALSE if there was an error
229  **/
230 gboolean
231 g_input_stream_read_all (GInputStream  *stream,
232                          void          *buffer,
233                          gsize          count,
234                          gsize         *bytes_read,
235                          GCancellable  *cancellable,
236                          GError       **error)
237 {
238   gsize _bytes_read;
239   gssize res;
240
241   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
242   g_return_val_if_fail (buffer != NULL, FALSE);
243
244   _bytes_read = 0;
245   while (_bytes_read < count)
246     {
247       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
248                                  cancellable, error);
249       if (res == -1)
250         {
251           if (bytes_read)
252             *bytes_read = _bytes_read;
253           return FALSE;
254         }
255       
256       if (res == 0)
257         break;
258
259       _bytes_read += res;
260     }
261
262   if (bytes_read)
263     *bytes_read = _bytes_read;
264   return TRUE;
265 }
266
267 /**
268  * g_input_stream_read_bytes:
269  * @stream: a #GInputStream.
270  * @count: maximum number of bytes that will be read from the stream. Common
271  * values include 4096 and 8192.
272  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
273  * @error: location to store the error occurring, or %NULL to ignore
274  *
275  * Like g_input_stream_read(), this tries to read @count bytes from
276  * the stream in a blocking fashion. However, rather than reading into
277  * a user-supplied buffer, this will create a new #GBytes containing
278  * the data that was read. This may be easier to use from language
279  * bindings.
280  *
281  * If count is zero, returns a zero-length #GBytes and does nothing. A
282  * value of @count larger than %G_MAXSSIZE will cause a
283  * %G_IO_ERROR_INVALID_ARGUMENT error.
284  *
285  * On success, a new #GBytes is returned. It is not an error if the
286  * size of this object is not the same as the requested size, as it
287  * can happen e.g. near the end of a file. A zero-length #GBytes is
288  * returned on end of file (or if @count is zero), but never
289  * otherwise.
290  *
291  * If @cancellable is not %NULL, then the operation can be cancelled by
292  * triggering the cancellable object from another thread. If the operation
293  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
294  * operation was partially finished when the operation was cancelled the
295  * partial result will be returned, without an error.
296  *
297  * On error %NULL is returned and @error is set accordingly.
298  *
299  * Return value: a new #GBytes, or %NULL on error
300  **/
301 GBytes *
302 g_input_stream_read_bytes (GInputStream  *stream,
303                            gsize          count,
304                            GCancellable  *cancellable,
305                            GError       **error)
306 {
307   guchar *buf;
308   gssize nread;
309
310   buf = g_malloc (count);
311   nread = g_input_stream_read (stream, buf, count, cancellable, error);
312   if (nread == -1)
313     {
314       g_free (buf);
315       return NULL;
316     }
317   else if (nread == 0)
318     {
319       g_free (buf);
320       return g_bytes_new_static ("", 0);
321     }
322   else
323     return g_bytes_new_take (buf, nread);
324 }
325
326 /**
327  * g_input_stream_skip:
328  * @stream: a #GInputStream.
329  * @count: the number of bytes that will be skipped from the stream
330  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
331  * @error: location to store the error occurring, or %NULL to ignore
332  *
333  * Tries to skip @count bytes from the stream. Will block during the operation.
334  *
335  * This is identical to g_input_stream_read(), from a behaviour standpoint,
336  * but the bytes that are skipped are not returned to the user. Some
337  * streams have an implementation that is more efficient than reading the data.
338  *
339  * This function is optional for inherited classes, as the default implementation
340  * emulates it using read.
341  *
342  * If @cancellable is not %NULL, then the operation can be cancelled by
343  * triggering the cancellable object from another thread. If the operation
344  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
345  * operation was partially finished when the operation was cancelled the
346  * partial result will be returned, without an error.
347  *
348  * Return value: Number of bytes skipped, or -1 on error
349  **/
350 gssize
351 g_input_stream_skip (GInputStream  *stream,
352                      gsize          count,
353                      GCancellable  *cancellable,
354                      GError       **error)
355 {
356   GInputStreamClass *class;
357   gssize res;
358
359   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
360
361   if (count == 0)
362     return 0;
363
364   if (((gssize) count) < 0)
365     {
366       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
367                    _("Too large count value passed to %s"), G_STRFUNC);
368       return -1;
369     }
370   
371   class = G_INPUT_STREAM_GET_CLASS (stream);
372
373   if (!g_input_stream_set_pending (stream, error))
374     return -1;
375
376   if (cancellable)
377     g_cancellable_push_current (cancellable);
378   
379   res = class->skip (stream, count, cancellable, error);
380
381   if (cancellable)
382     g_cancellable_pop_current (cancellable);
383   
384   g_input_stream_clear_pending (stream);
385
386   return res;
387 }
388
389 static gssize
390 g_input_stream_real_skip (GInputStream  *stream,
391                           gsize          count,
392                           GCancellable  *cancellable,
393                           GError       **error)
394 {
395   GInputStreamClass *class;
396   gssize ret, read_bytes;
397   char buffer[8192];
398   GError *my_error;
399
400   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
401     {
402       if (g_seekable_seek (G_SEEKABLE (stream),
403                            count,
404                            G_SEEK_CUR,
405                            cancellable,
406                            NULL))
407         return count;
408     }
409
410   /* If not seekable, or seek failed, fall back to reading data: */
411
412   class = G_INPUT_STREAM_GET_CLASS (stream);
413
414   read_bytes = 0;
415   while (1)
416     {
417       my_error = NULL;
418
419       ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
420                             cancellable, &my_error);
421       if (ret == -1)
422         {
423           if (read_bytes > 0 &&
424               my_error->domain == G_IO_ERROR &&
425               my_error->code == G_IO_ERROR_CANCELLED)
426             {
427               g_error_free (my_error);
428               return read_bytes;
429             }
430
431           g_propagate_error (error, my_error);
432           return -1;
433         }
434
435       count -= ret;
436       read_bytes += ret;
437
438       if (ret == 0 || count == 0)
439         return read_bytes;
440     }
441 }
442
443 /**
444  * g_input_stream_close:
445  * @stream: A #GInputStream.
446  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
447  * @error: location to store the error occurring, or %NULL to ignore
448  *
449  * Closes the stream, releasing resources related to it.
450  *
451  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
452  * Closing a stream multiple times will not return an error.
453  *
454  * Streams will be automatically closed when the last reference
455  * is dropped, but you might want to call this function to make sure 
456  * resources are released as early as possible.
457  *
458  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
459  * open after the stream is closed. See the documentation for the individual
460  * stream for details.
461  *
462  * On failure the first error that happened will be reported, but the close
463  * operation will finish as much as possible. A stream that failed to
464  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
465  * is important to check and report the error to the user.
466  *
467  * If @cancellable is not %NULL, then the operation can be cancelled by
468  * triggering the cancellable object from another thread. If the operation
469  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
470  * Cancelling a close will still leave the stream closed, but some streams
471  * can use a faster close that doesn't block to e.g. check errors. 
472  *
473  * Return value: %TRUE on success, %FALSE on failure
474  **/
475 gboolean
476 g_input_stream_close (GInputStream  *stream,
477                       GCancellable  *cancellable,
478                       GError       **error)
479 {
480   GInputStreamClass *class;
481   gboolean res;
482
483   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
484
485   class = G_INPUT_STREAM_GET_CLASS (stream);
486
487   if (stream->priv->closed)
488     return TRUE;
489
490   res = TRUE;
491
492   if (!g_input_stream_set_pending (stream, error))
493     return FALSE;
494
495   if (cancellable)
496     g_cancellable_push_current (cancellable);
497
498   if (class->close_fn)
499     res = class->close_fn (stream, cancellable, error);
500
501   if (cancellable)
502     g_cancellable_pop_current (cancellable);
503
504   g_input_stream_clear_pending (stream);
505   
506   stream->priv->closed = TRUE;
507   
508   return res;
509 }
510
511 static void
512 async_ready_callback_wrapper (GObject      *source_object,
513                               GAsyncResult *res,
514                               gpointer      user_data)
515 {
516   GInputStream *stream = G_INPUT_STREAM (source_object);
517
518   g_input_stream_clear_pending (stream);
519   if (stream->priv->outstanding_callback)
520     (*stream->priv->outstanding_callback) (source_object, res, user_data);
521   g_object_unref (stream);
522 }
523
524 static void
525 async_ready_close_callback_wrapper (GObject      *source_object,
526                                     GAsyncResult *res,
527                                     gpointer      user_data)
528 {
529   GInputStream *stream = G_INPUT_STREAM (source_object);
530
531   g_input_stream_clear_pending (stream);
532   stream->priv->closed = TRUE;
533   if (stream->priv->outstanding_callback)
534     (*stream->priv->outstanding_callback) (source_object, res, user_data);
535   g_object_unref (stream);
536 }
537
538 /**
539  * g_input_stream_read_async:
540  * @stream: A #GInputStream.
541  * @buffer: (array length=count) (element-type guint8): a buffer to
542  *     read data into (which should be at least count bytes long).
543  * @count: the number of bytes that will be read from the stream
544  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
545  * of the request. 
546  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
547  * @callback: (scope async): callback to call when the request is satisfied
548  * @user_data: (closure): the data to pass to callback function
549  *
550  * Request an asynchronous read of @count bytes from the stream into the buffer
551  * starting at @buffer. When the operation is finished @callback will be called. 
552  * You can then call g_input_stream_read_finish() to get the result of the 
553  * operation.
554  *
555  * During an async request no other sync and async calls are allowed on @stream, and will
556  * result in %G_IO_ERROR_PENDING errors. 
557  *
558  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
559  *
560  * On success, the number of bytes read into the buffer will be passed to the
561  * callback. It is not an error if this is not the same as the requested size, as it
562  * can happen e.g. near the end of a file, but generally we try to read
563  * as many bytes as requested. Zero is returned on end of file
564  * (or if @count is zero),  but never otherwise.
565  *
566  * Any outstanding i/o request with higher priority (lower numerical value) will
567  * be executed before an outstanding request with lower priority. Default
568  * priority is %G_PRIORITY_DEFAULT.
569  *
570  * The asyncronous methods have a default fallback that uses threads to implement
571  * asynchronicity, so they are optional for inheriting classes. However, if you
572  * override one you must override all.
573  **/
574 void
575 g_input_stream_read_async (GInputStream        *stream,
576                            void                *buffer,
577                            gsize                count,
578                            int                  io_priority,
579                            GCancellable        *cancellable,
580                            GAsyncReadyCallback  callback,
581                            gpointer             user_data)
582 {
583   GInputStreamClass *class;
584   GError *error = NULL;
585
586   g_return_if_fail (G_IS_INPUT_STREAM (stream));
587   g_return_if_fail (buffer != NULL);
588
589   if (count == 0)
590     {
591       GTask *task;
592
593       task = g_task_new (stream, cancellable, callback, user_data);
594       g_task_set_source_tag (task, g_input_stream_read_async);
595       g_task_return_int (task, 0);
596       g_object_unref (task);
597       return;
598     }
599   
600   if (((gssize) count) < 0)
601     {
602       g_task_report_new_error (stream, callback, user_data,
603                                g_input_stream_read_async,
604                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
605                                _("Too large count value passed to %s"),
606                                G_STRFUNC);
607       return;
608     }
609
610   if (!g_input_stream_set_pending (stream, &error))
611     {
612       g_task_report_error (stream, callback, user_data,
613                            g_input_stream_read_async,
614                            error);
615       return;
616     }
617
618   class = G_INPUT_STREAM_GET_CLASS (stream);
619   stream->priv->outstanding_callback = callback;
620   g_object_ref (stream);
621   class->read_async (stream, buffer, count, io_priority, cancellable,
622                      async_ready_callback_wrapper, user_data);
623 }
624
625 /**
626  * g_input_stream_read_finish:
627  * @stream: a #GInputStream.
628  * @result: a #GAsyncResult.
629  * @error: a #GError location to store the error occurring, or %NULL to 
630  * ignore.
631  * 
632  * Finishes an asynchronous stream read operation. 
633  * 
634  * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
635  **/
636 gssize
637 g_input_stream_read_finish (GInputStream  *stream,
638                             GAsyncResult  *result,
639                             GError       **error)
640 {
641   GInputStreamClass *class;
642   
643   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
644   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
645
646   if (g_async_result_legacy_propagate_error (result, error))
647     return -1;
648   else if (g_async_result_is_tagged (result, g_input_stream_read_async))
649     return g_task_propagate_int (G_TASK (result), error);
650
651   class = G_INPUT_STREAM_GET_CLASS (stream);
652   return class->read_finish (stream, result, error);
653 }
654
655 static void
656 read_bytes_callback (GObject      *stream,
657                      GAsyncResult *result,
658                      gpointer      user_data)
659 {
660   GTask *task = user_data;
661   guchar *buf = g_task_get_task_data (task);
662   GError *error = NULL;
663   gssize nread;
664   GBytes *bytes = NULL;
665
666   nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
667                                       result, &error);
668   if (nread == -1)
669     {
670       g_free (buf);
671       g_task_return_error (task, error);
672     }
673   else if (nread == 0)
674     {
675       g_free (buf);
676       bytes = g_bytes_new_static ("", 0);
677     }
678   else
679     bytes = g_bytes_new_take (buf, nread);
680
681   if (bytes)
682     g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
683
684   g_object_unref (task);
685 }
686
687 /**
688  * g_input_stream_read_bytes_async:
689  * @stream: A #GInputStream.
690  * @count: the number of bytes that will be read from the stream
691  * @io_priority: the <link linkend="io-priority">I/O priority</link>
692  *   of the request.
693  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
694  * @callback: (scope async): callback to call when the request is satisfied
695  * @user_data: (closure): the data to pass to callback function
696  *
697  * Request an asynchronous read of @count bytes from the stream into a
698  * new #GBytes. When the operation is finished @callback will be
699  * called. You can then call g_input_stream_read_bytes_finish() to get the
700  * result of the operation.
701  *
702  * During an async request no other sync and async calls are allowed
703  * on @stream, and will result in %G_IO_ERROR_PENDING errors.
704  *
705  * A value of @count larger than %G_MAXSSIZE will cause a
706  * %G_IO_ERROR_INVALID_ARGUMENT error.
707  *
708  * On success, the new #GBytes will be passed to the callback. It is
709  * not an error if this is smaller than the requested size, as it can
710  * happen e.g. near the end of a file, but generally we try to read as
711  * many bytes as requested. Zero is returned on end of file (or if
712  * @count is zero), but never otherwise.
713  *
714  * Any outstanding I/O request with higher priority (lower numerical
715  * value) will be executed before an outstanding request with lower
716  * priority. Default priority is %G_PRIORITY_DEFAULT.
717  **/
718 void
719 g_input_stream_read_bytes_async (GInputStream          *stream,
720                                  gsize                  count,
721                                  int                    io_priority,
722                                  GCancellable          *cancellable,
723                                  GAsyncReadyCallback    callback,
724                                  gpointer               user_data)
725 {
726   GTask *task;
727   guchar *buf;
728
729   task = g_task_new (stream, cancellable, callback, user_data);
730   buf = g_malloc (count);
731   g_task_set_task_data (task, buf, NULL);
732
733   g_input_stream_read_async (stream, buf, count,
734                              io_priority, cancellable,
735                              read_bytes_callback, task);
736 }
737
738 /**
739  * g_input_stream_read_bytes_finish:
740  * @stream: a #GInputStream.
741  * @result: a #GAsyncResult.
742  * @error: a #GError location to store the error occurring, or %NULL to
743  *   ignore.
744  *
745  * Finishes an asynchronous stream read-into-#GBytes operation.
746  *
747  * Returns: the newly-allocated #GBytes, or %NULL on error
748  **/
749 GBytes *
750 g_input_stream_read_bytes_finish (GInputStream  *stream,
751                                   GAsyncResult  *result,
752                                   GError       **error)
753 {
754   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
755   g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
756
757   return g_task_propagate_pointer (G_TASK (result), error);
758 }
759
760 /**
761  * g_input_stream_skip_async:
762  * @stream: A #GInputStream.
763  * @count: the number of bytes that will be skipped from the stream
764  * @io_priority: the <link linkend="io-priority">I/O priority</link>
765  * of the request.
766  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
767  * @callback: (scope async): callback to call when the request is satisfied
768  * @user_data: (closure): the data to pass to callback function
769  *
770  * Request an asynchronous skip of @count bytes from the stream.
771  * When the operation is finished @callback will be called.
772  * You can then call g_input_stream_skip_finish() to get the result
773  * of the operation.
774  *
775  * During an async request no other sync and async calls are allowed,
776  * and will result in %G_IO_ERROR_PENDING errors.
777  *
778  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
779  *
780  * On success, the number of bytes skipped will be passed to the callback.
781  * It is not an error if this is not the same as the requested size, as it
782  * can happen e.g. near the end of a file, but generally we try to skip
783  * as many bytes as requested. Zero is returned on end of file
784  * (or if @count is zero), but never otherwise.
785  *
786  * Any outstanding i/o request with higher priority (lower numerical value)
787  * will be executed before an outstanding request with lower priority.
788  * Default priority is %G_PRIORITY_DEFAULT.
789  *
790  * The asynchronous methods have a default fallback that uses threads to
791  * implement asynchronicity, so they are optional for inheriting classes.
792  * However, if you override one, you must override all.
793  **/
794 void
795 g_input_stream_skip_async (GInputStream        *stream,
796                            gsize                count,
797                            int                  io_priority,
798                            GCancellable        *cancellable,
799                            GAsyncReadyCallback  callback,
800                            gpointer             user_data)
801 {
802   GInputStreamClass *class;
803   GError *error = NULL;
804
805   g_return_if_fail (G_IS_INPUT_STREAM (stream));
806
807   if (count == 0)
808     {
809       GTask *task;
810
811       task = g_task_new (stream, cancellable, callback, user_data);
812       g_task_set_source_tag (task, g_input_stream_skip_async);
813       g_task_return_int (task, 0);
814       g_object_unref (task);
815       return;
816     }
817   
818   if (((gssize) count) < 0)
819     {
820       g_task_report_new_error (stream, callback, user_data,
821                                g_input_stream_skip_async,
822                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
823                                _("Too large count value passed to %s"),
824                                G_STRFUNC);
825       return;
826     }
827
828   if (!g_input_stream_set_pending (stream, &error))
829     {
830       g_task_report_error (stream, callback, user_data,
831                            g_input_stream_skip_async,
832                            error);
833       return;
834     }
835
836   class = G_INPUT_STREAM_GET_CLASS (stream);
837   stream->priv->outstanding_callback = callback;
838   g_object_ref (stream);
839   class->skip_async (stream, count, io_priority, cancellable,
840                      async_ready_callback_wrapper, user_data);
841 }
842
843 /**
844  * g_input_stream_skip_finish:
845  * @stream: a #GInputStream.
846  * @result: a #GAsyncResult.
847  * @error: a #GError location to store the error occurring, or %NULL to 
848  * ignore.
849  * 
850  * Finishes a stream skip operation.
851  * 
852  * Returns: the size of the bytes skipped, or %-1 on error.
853  **/
854 gssize
855 g_input_stream_skip_finish (GInputStream  *stream,
856                             GAsyncResult  *result,
857                             GError       **error)
858 {
859   GInputStreamClass *class;
860
861   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
862   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
863
864   if (g_async_result_legacy_propagate_error (result, error))
865     return -1;
866   else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
867     return g_task_propagate_int (G_TASK (result), error);
868
869   class = G_INPUT_STREAM_GET_CLASS (stream);
870   return class->skip_finish (stream, result, error);
871 }
872
873 /**
874  * g_input_stream_close_async:
875  * @stream: A #GInputStream.
876  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
877  * of the request. 
878  * @cancellable: (allow-none): optional cancellable object
879  * @callback: (scope async): callback to call when the request is satisfied
880  * @user_data: (closure): the data to pass to callback function
881  *
882  * Requests an asynchronous closes of the stream, releasing resources related to it.
883  * When the operation is finished @callback will be called. 
884  * You can then call g_input_stream_close_finish() to get the result of the 
885  * operation.
886  *
887  * For behaviour details see g_input_stream_close().
888  *
889  * The asyncronous methods have a default fallback that uses threads to implement
890  * asynchronicity, so they are optional for inheriting classes. However, if you
891  * override one you must override all.
892  **/
893 void
894 g_input_stream_close_async (GInputStream        *stream,
895                             int                  io_priority,
896                             GCancellable        *cancellable,
897                             GAsyncReadyCallback  callback,
898                             gpointer             user_data)
899 {
900   GInputStreamClass *class;
901   GError *error = NULL;
902
903   g_return_if_fail (G_IS_INPUT_STREAM (stream));
904
905   if (stream->priv->closed)
906     {
907       GTask *task;
908
909       task = g_task_new (stream, cancellable, callback, user_data);
910       g_task_set_source_tag (task, g_input_stream_close_async);
911       g_task_return_boolean (task, TRUE);
912       g_object_unref (task);
913       return;
914     }
915
916   if (!g_input_stream_set_pending (stream, &error))
917     {
918       g_task_report_error (stream, callback, user_data,
919                            g_input_stream_close_async,
920                            error);
921       return;
922     }
923   
924   class = G_INPUT_STREAM_GET_CLASS (stream);
925   stream->priv->outstanding_callback = callback;
926   g_object_ref (stream);
927   class->close_async (stream, io_priority, cancellable,
928                       async_ready_close_callback_wrapper, user_data);
929 }
930
931 /**
932  * g_input_stream_close_finish:
933  * @stream: a #GInputStream.
934  * @result: a #GAsyncResult.
935  * @error: a #GError location to store the error occurring, or %NULL to 
936  * ignore.
937  * 
938  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
939  * 
940  * Returns: %TRUE if the stream was closed successfully.
941  **/
942 gboolean
943 g_input_stream_close_finish (GInputStream  *stream,
944                              GAsyncResult  *result,
945                              GError       **error)
946 {
947   GInputStreamClass *class;
948
949   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
950   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
951
952   if (g_async_result_legacy_propagate_error (result, error))
953     return FALSE;
954   else if (g_async_result_is_tagged (result, g_input_stream_close_async))
955     return g_task_propagate_boolean (G_TASK (result), error);
956
957   class = G_INPUT_STREAM_GET_CLASS (stream);
958   return class->close_finish (stream, result, error);
959 }
960
961 /**
962  * g_input_stream_is_closed:
963  * @stream: input stream.
964  * 
965  * Checks if an input stream is closed.
966  * 
967  * Returns: %TRUE if the stream is closed.
968  **/
969 gboolean
970 g_input_stream_is_closed (GInputStream *stream)
971 {
972   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
973   
974   return stream->priv->closed;
975 }
976  
977 /**
978  * g_input_stream_has_pending:
979  * @stream: input stream.
980  * 
981  * Checks if an input stream has pending actions.
982  * 
983  * Returns: %TRUE if @stream has pending actions.
984  **/  
985 gboolean
986 g_input_stream_has_pending (GInputStream *stream)
987 {
988   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
989   
990   return stream->priv->pending;
991 }
992
993 /**
994  * g_input_stream_set_pending:
995  * @stream: input stream
996  * @error: a #GError location to store the error occurring, or %NULL to 
997  * ignore.
998  * 
999  * Sets @stream to have actions pending. If the pending flag is
1000  * already set or @stream is closed, it will return %FALSE and set
1001  * @error.
1002  *
1003  * Return value: %TRUE if pending was previously unset and is now set.
1004  **/
1005 gboolean
1006 g_input_stream_set_pending (GInputStream *stream, GError **error)
1007 {
1008   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1009   
1010   if (stream->priv->closed)
1011     {
1012       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1013                            _("Stream is already closed"));
1014       return FALSE;
1015     }
1016   
1017   if (stream->priv->pending)
1018     {
1019       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1020                 /* Translators: This is an error you get if there is already an
1021                  * operation running against this stream when you try to start
1022                  * one */
1023                  _("Stream has outstanding operation"));
1024       return FALSE;
1025     }
1026   
1027   stream->priv->pending = TRUE;
1028   return TRUE;
1029 }
1030
1031 /**
1032  * g_input_stream_clear_pending:
1033  * @stream: input stream
1034  * 
1035  * Clears the pending flag on @stream.
1036  **/
1037 void
1038 g_input_stream_clear_pending (GInputStream *stream)
1039 {
1040   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1041   
1042   stream->priv->pending = FALSE;
1043 }
1044
1045 /**
1046  * g_input_stream_async_read_is_via_threads:
1047  * @stream: input stream
1048  *
1049  * Checks if an input stream's read_async function uses threads.
1050  *
1051  * Returns: %TRUE if @stream's read_async function uses threads.
1052  **/
1053 gboolean
1054 g_input_stream_async_read_is_via_threads (GInputStream *stream)
1055 {
1056   GInputStreamClass *class;
1057
1058   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1059
1060   class = G_INPUT_STREAM_GET_CLASS (stream);
1061
1062   return (class->read_async == g_input_stream_real_read_async &&
1063       !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1064         g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1065 }
1066
1067 /********************************************
1068  *   Default implementation of async ops    *
1069  ********************************************/
1070
1071 typedef struct {
1072   void   *buffer;
1073   gsize   count;
1074 } ReadData;
1075
1076 static void
1077 free_read_data (ReadData *op)
1078 {
1079   g_slice_free (ReadData, op);
1080 }
1081
1082 static void
1083 read_async_thread (GTask        *task,
1084                    gpointer      source_object,
1085                    gpointer      task_data,
1086                    GCancellable *cancellable)
1087 {
1088   GInputStream *stream = source_object;
1089   ReadData *op = task_data;
1090   GInputStreamClass *class;
1091   GError *error = NULL;
1092   gssize nread;
1093  
1094   class = G_INPUT_STREAM_GET_CLASS (stream);
1095
1096   nread = class->read_fn (stream,
1097                           op->buffer, op->count,
1098                           g_task_get_cancellable (task),
1099                           &error);
1100   if (nread == -1)
1101     g_task_return_error (task, error);
1102   else
1103     g_task_return_int (task, nread);
1104 }
1105
1106 static void read_async_pollable (GPollableInputStream *stream,
1107                                  GTask                *task);
1108
1109 static gboolean
1110 read_async_pollable_ready (GPollableInputStream *stream,
1111                            gpointer              user_data)
1112 {
1113   GTask *task = user_data;
1114
1115   read_async_pollable (stream, task);
1116   return FALSE;
1117 }
1118
1119 static void
1120 read_async_pollable (GPollableInputStream *stream,
1121                      GTask                *task)
1122 {
1123   ReadData *op = g_task_get_task_data (task);
1124   GError *error = NULL;
1125   gssize nread;
1126
1127   if (g_task_return_error_if_cancelled (task))
1128     return;
1129
1130   nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1131     read_nonblocking (stream, op->buffer, op->count, &error);
1132
1133   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1134     {
1135       GSource *source;
1136
1137       g_error_free (error);
1138
1139       source = g_pollable_input_stream_create_source (stream,
1140                                                       g_task_get_cancellable (task));
1141       g_task_attach_source (task, source,
1142                             (GSourceFunc) read_async_pollable_ready);
1143       g_source_unref (source);
1144       return;
1145     }
1146
1147   if (nread == -1)
1148     g_task_return_error (task, error);
1149   else
1150     g_task_return_int (task, nread);
1151   /* g_input_stream_real_read_async() unrefs task */
1152 }
1153
1154
1155 static void
1156 g_input_stream_real_read_async (GInputStream        *stream,
1157                                 void                *buffer,
1158                                 gsize                count,
1159                                 int                  io_priority,
1160                                 GCancellable        *cancellable,
1161                                 GAsyncReadyCallback  callback,
1162                                 gpointer             user_data)
1163 {
1164   GTask *task;
1165   ReadData *op;
1166   
1167   op = g_slice_new0 (ReadData);
1168   task = g_task_new (stream, cancellable, callback, user_data);
1169   g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1170   g_task_set_priority (task, io_priority);
1171   op->buffer = buffer;
1172   op->count = count;
1173
1174   if (!g_input_stream_async_read_is_via_threads (stream))
1175     read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1176   else
1177     g_task_run_in_thread (task, read_async_thread);
1178   g_object_unref (task);
1179 }
1180
1181 static gssize
1182 g_input_stream_real_read_finish (GInputStream  *stream,
1183                                  GAsyncResult  *result,
1184                                  GError       **error)
1185 {
1186   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1187
1188   return g_task_propagate_int (G_TASK (result), error);
1189 }
1190
1191
1192 static void
1193 skip_async_thread (GTask        *task,
1194                    gpointer      source_object,
1195                    gpointer      task_data,
1196                    GCancellable *cancellable)
1197 {
1198   GInputStream *stream = source_object;
1199   gsize count = GPOINTER_TO_SIZE (task_data);
1200   GInputStreamClass *class;
1201   GError *error = NULL;
1202   gssize ret;
1203
1204   class = G_INPUT_STREAM_GET_CLASS (stream);
1205   ret = class->skip (stream, count,
1206                      g_task_get_cancellable (task),
1207                      &error);
1208   if (ret == -1)
1209     g_task_return_error (task, error);
1210   else
1211     g_task_return_int (task, ret);
1212 }
1213
1214 typedef struct {
1215   char buffer[8192];
1216   gsize count;
1217   gsize count_skipped;
1218 } SkipFallbackAsyncData;
1219
1220 static void
1221 skip_callback_wrapper (GObject      *source_object,
1222                        GAsyncResult *res,
1223                        gpointer      user_data)
1224 {
1225   GInputStreamClass *class;
1226   GTask *task = user_data;
1227   SkipFallbackAsyncData *data = g_task_get_task_data (task);
1228   GError *error = NULL;
1229   gssize ret;
1230
1231   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1232
1233   if (ret > 0)
1234     {
1235       data->count -= ret;
1236       data->count_skipped += ret;
1237
1238       if (data->count > 0)
1239         {
1240           class = G_INPUT_STREAM_GET_CLASS (source_object);
1241           class->read_async (G_INPUT_STREAM (source_object),
1242                              data->buffer, MIN (8192, data->count),
1243                              g_task_get_priority (task),
1244                              g_task_get_cancellable (task),
1245                              skip_callback_wrapper, task);
1246           return;
1247         }
1248     }
1249
1250   if (ret == -1 &&
1251       g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1252       data->count_skipped)
1253     {
1254       /* No error, return partial read */
1255       g_clear_error (&error);
1256     }
1257
1258   if (error)
1259     g_task_return_error (task, error);
1260   else
1261     g_task_return_int (task, data->count_skipped);
1262   g_object_unref (task);
1263  }
1264
1265 static void
1266 g_input_stream_real_skip_async (GInputStream        *stream,
1267                                 gsize                count,
1268                                 int                  io_priority,
1269                                 GCancellable        *cancellable,
1270                                 GAsyncReadyCallback  callback,
1271                                 gpointer             user_data)
1272 {
1273   GInputStreamClass *class;
1274   SkipFallbackAsyncData *data;
1275   GTask *task;
1276
1277   class = G_INPUT_STREAM_GET_CLASS (stream);
1278
1279   task = g_task_new (stream, cancellable, callback, user_data);
1280   g_task_set_priority (task, io_priority);
1281
1282   if (g_input_stream_async_read_is_via_threads (stream))
1283     {
1284       /* Read is thread-using async fallback.
1285        * Make skip use threads too, so that we can use a possible sync skip
1286        * implementation. */
1287       g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1288
1289       g_task_run_in_thread (task, skip_async_thread);
1290       g_object_unref (task);
1291     }
1292   else
1293     {
1294       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1295       
1296       /* There is a custom async read function, lets use that. */
1297       data = g_new (SkipFallbackAsyncData, 1);
1298       data->count = count;
1299       data->count_skipped = 0;
1300       g_task_set_task_data (task, data, g_free);
1301       g_task_set_check_cancellable (task, FALSE);
1302       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1303                          skip_callback_wrapper, task);
1304     }
1305
1306 }
1307
1308 static gssize
1309 g_input_stream_real_skip_finish (GInputStream  *stream,
1310                                  GAsyncResult  *result,
1311                                  GError       **error)
1312 {
1313   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1314
1315   return g_task_propagate_int (G_TASK (result), error);
1316 }
1317
1318 static void
1319 close_async_thread (GTask        *task,
1320                     gpointer      source_object,
1321                     gpointer      task_data,
1322                     GCancellable *cancellable)
1323 {
1324   GInputStream *stream = source_object;
1325   GInputStreamClass *class;
1326   GError *error = NULL;
1327   gboolean result;
1328
1329   class = G_INPUT_STREAM_GET_CLASS (stream);
1330   if (class->close_fn)
1331     {
1332       result = class->close_fn (stream,
1333                                 g_task_get_cancellable (task),
1334                                 &error);
1335       if (!result)
1336         {
1337           g_task_return_error (task, error);
1338           return;
1339         }
1340     }
1341
1342   g_task_return_boolean (task, TRUE);
1343 }
1344
1345 static void
1346 g_input_stream_real_close_async (GInputStream        *stream,
1347                                  int                  io_priority,
1348                                  GCancellable        *cancellable,
1349                                  GAsyncReadyCallback  callback,
1350                                  gpointer             user_data)
1351 {
1352   GTask *task;
1353
1354   task = g_task_new (stream, cancellable, callback, user_data);
1355   g_task_set_check_cancellable (task, FALSE);
1356   g_task_set_priority (task, io_priority);
1357   
1358   g_task_run_in_thread (task, close_async_thread);
1359   g_object_unref (task);
1360 }
1361
1362 static gboolean
1363 g_input_stream_real_close_finish (GInputStream  *stream,
1364                                   GAsyncResult  *result,
1365                                   GError       **error)
1366 {
1367   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1368
1369   return g_task_propagate_boolean (G_TASK (result), error);
1370 }