116457da332da6ba4023454096e128cbaaa06709
[platform/upstream/glib.git] / gio / ginputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: Alexander Larsson <alexl@redhat.com>
19  */
20
21 #include "config.h"
22 #include <glib.h>
23 #include "glibintl.h"
24
25 #include "ginputstream.h"
26 #include "gioprivate.h"
27 #include "gseekable.h"
28 #include "gcancellable.h"
29 #include "gasyncresult.h"
30 #include "gioerror.h"
31 #include "gpollableinputstream.h"
32
33 /**
34  * SECTION:ginputstream
35  * @short_description: Base class for implementing streaming input
36  * @include: gio/gio.h
37  *
38  * #GInputStream has functions to read from a stream (g_input_stream_read()),
39  * to close a stream (g_input_stream_close()) and to skip some content
40  * (g_input_stream_skip()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice(). 
44  *
45  * All of these functions have async variants too.
46  **/
47
48 struct _GInputStreamPrivate {
49   guint closed : 1;
50   guint pending : 1;
51   GAsyncReadyCallback outstanding_callback;
52 };
53
54 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
55
56 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
57                                                   gsize                 count,
58                                                   GCancellable         *cancellable,
59                                                   GError              **error);
60 static void     g_input_stream_real_read_async   (GInputStream         *stream,
61                                                   void                 *buffer,
62                                                   gsize                 count,
63                                                   int                   io_priority,
64                                                   GCancellable         *cancellable,
65                                                   GAsyncReadyCallback   callback,
66                                                   gpointer              user_data);
67 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
68                                                   GAsyncResult         *result,
69                                                   GError              **error);
70 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
71                                                   gsize                 count,
72                                                   int                   io_priority,
73                                                   GCancellable         *cancellable,
74                                                   GAsyncReadyCallback   callback,
75                                                   gpointer              data);
76 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
77                                                   GAsyncResult         *result,
78                                                   GError              **error);
79 static void     g_input_stream_real_close_async  (GInputStream         *stream,
80                                                   int                   io_priority,
81                                                   GCancellable         *cancellable,
82                                                   GAsyncReadyCallback   callback,
83                                                   gpointer              data);
84 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
85                                                   GAsyncResult         *result,
86                                                   GError              **error);
87
88 static void
89 g_input_stream_dispose (GObject *object)
90 {
91   GInputStream *stream;
92
93   stream = G_INPUT_STREAM (object);
94   
95   if (!stream->priv->closed)
96     g_input_stream_close (stream, NULL, NULL);
97
98   G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
99 }
100
101
102 static void
103 g_input_stream_class_init (GInputStreamClass *klass)
104 {
105   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
106   
107   gobject_class->dispose = g_input_stream_dispose;
108   
109   klass->skip = g_input_stream_real_skip;
110   klass->read_async = g_input_stream_real_read_async;
111   klass->read_finish = g_input_stream_real_read_finish;
112   klass->skip_async = g_input_stream_real_skip_async;
113   klass->skip_finish = g_input_stream_real_skip_finish;
114   klass->close_async = g_input_stream_real_close_async;
115   klass->close_finish = g_input_stream_real_close_finish;
116 }
117
118 static void
119 g_input_stream_init (GInputStream *stream)
120 {
121   stream->priv = g_input_stream_get_instance_private (stream);
122 }
123
124 /**
125  * g_input_stream_read:
126  * @stream: a #GInputStream.
127  * @buffer: (array length=count) (element-type guint8): a buffer to
128  *     read data into (which should be at least count bytes long).
129  * @count: the number of bytes that will be read from the stream
130  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
131  * @error: location to store the error occurring, or %NULL to ignore
132  *
133  * Tries to read @count bytes from the stream into the buffer starting at
134  * @buffer. Will block during this read.
135  * 
136  * If count is zero returns zero and does nothing. A value of @count
137  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
138  *
139  * On success, the number of bytes read into the buffer is returned.
140  * It is not an error if this is not the same as the requested size, as it
141  * can happen e.g. near the end of a file. Zero is returned on end of file
142  * (or if @count is zero),  but never otherwise.
143  *
144  * If @cancellable is not %NULL, then the operation can be cancelled by
145  * triggering the cancellable object from another thread. If the operation
146  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
147  * operation was partially finished when the operation was cancelled the
148  * partial result will be returned, without an error.
149  *
150  * On error -1 is returned and @error is set accordingly.
151  * 
152  * Return value: Number of bytes read, or -1 on error, or 0 on end of file.
153  **/
154 gssize
155 g_input_stream_read  (GInputStream  *stream,
156                       void          *buffer,
157                       gsize          count,
158                       GCancellable  *cancellable,
159                       GError       **error)
160 {
161   GInputStreamClass *class;
162   gssize res;
163
164   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
165   g_return_val_if_fail (buffer != NULL, 0);
166
167   if (count == 0)
168     return 0;
169   
170   if (((gssize) count) < 0)
171     {
172       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
173                    _("Too large count value passed to %s"), G_STRFUNC);
174       return -1;
175     }
176
177   class = G_INPUT_STREAM_GET_CLASS (stream);
178
179   if (class->read_fn == NULL) 
180     {
181       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
182                            _("Input stream doesn't implement read"));
183       return -1;
184     }
185
186   if (!g_input_stream_set_pending (stream, error))
187     return -1;
188
189   if (cancellable)
190     g_cancellable_push_current (cancellable);
191   
192   res = class->read_fn (stream, buffer, count, cancellable, error);
193
194   if (cancellable)
195     g_cancellable_pop_current (cancellable);
196   
197   g_input_stream_clear_pending (stream);
198
199   return res;
200 }
201
202 /**
203  * g_input_stream_read_all:
204  * @stream: a #GInputStream.
205  * @buffer: (array length=count) (element-type guint8): a buffer to
206  *     read data into (which should be at least count bytes long).
207  * @count: the number of bytes that will be read from the stream
208  * @bytes_read: (out): location to store the number of bytes that was read from the stream
209  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
210  * @error: location to store the error occurring, or %NULL to ignore
211  *
212  * Tries to read @count bytes from the stream into the buffer starting at
213  * @buffer. Will block during this read.
214  *
215  * This function is similar to g_input_stream_read(), except it tries to
216  * read as many bytes as requested, only stopping on an error or end of stream.
217  *
218  * On a successful read of @count bytes, or if we reached the end of the
219  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
220  * read into @buffer.
221  * 
222  * If there is an error during the operation %FALSE is returned and @error
223  * is set to indicate the error status, @bytes_read is updated to contain
224  * the number of bytes read into @buffer before the error occurred.
225  *
226  * Return value: %TRUE on success, %FALSE if there was an error
227  **/
228 gboolean
229 g_input_stream_read_all (GInputStream  *stream,
230                          void          *buffer,
231                          gsize          count,
232                          gsize         *bytes_read,
233                          GCancellable  *cancellable,
234                          GError       **error)
235 {
236   gsize _bytes_read;
237   gssize res;
238
239   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
240   g_return_val_if_fail (buffer != NULL, FALSE);
241
242   _bytes_read = 0;
243   while (_bytes_read < count)
244     {
245       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
246                                  cancellable, error);
247       if (res == -1)
248         {
249           if (bytes_read)
250             *bytes_read = _bytes_read;
251           return FALSE;
252         }
253       
254       if (res == 0)
255         break;
256
257       _bytes_read += res;
258     }
259
260   if (bytes_read)
261     *bytes_read = _bytes_read;
262   return TRUE;
263 }
264
265 /**
266  * g_input_stream_read_bytes:
267  * @stream: a #GInputStream.
268  * @count: maximum number of bytes that will be read from the stream. Common
269  * values include 4096 and 8192.
270  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
271  * @error: location to store the error occurring, or %NULL to ignore
272  *
273  * Like g_input_stream_read(), this tries to read @count bytes from
274  * the stream in a blocking fashion. However, rather than reading into
275  * a user-supplied buffer, this will create a new #GBytes containing
276  * the data that was read. This may be easier to use from language
277  * bindings.
278  *
279  * If count is zero, returns a zero-length #GBytes and does nothing. A
280  * value of @count larger than %G_MAXSSIZE will cause a
281  * %G_IO_ERROR_INVALID_ARGUMENT error.
282  *
283  * On success, a new #GBytes is returned. It is not an error if the
284  * size of this object is not the same as the requested size, as it
285  * can happen e.g. near the end of a file. A zero-length #GBytes is
286  * returned on end of file (or if @count is zero), but never
287  * otherwise.
288  *
289  * If @cancellable is not %NULL, then the operation can be cancelled by
290  * triggering the cancellable object from another thread. If the operation
291  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
292  * operation was partially finished when the operation was cancelled the
293  * partial result will be returned, without an error.
294  *
295  * On error %NULL is returned and @error is set accordingly.
296  *
297  * Return value: a new #GBytes, or %NULL on error
298  **/
299 GBytes *
300 g_input_stream_read_bytes (GInputStream  *stream,
301                            gsize          count,
302                            GCancellable  *cancellable,
303                            GError       **error)
304 {
305   guchar *buf;
306   gssize nread;
307
308   buf = g_malloc (count);
309   nread = g_input_stream_read (stream, buf, count, cancellable, error);
310   if (nread == -1)
311     {
312       g_free (buf);
313       return NULL;
314     }
315   else if (nread == 0)
316     {
317       g_free (buf);
318       return g_bytes_new_static ("", 0);
319     }
320   else
321     return g_bytes_new_take (buf, nread);
322 }
323
324 /**
325  * g_input_stream_skip:
326  * @stream: a #GInputStream.
327  * @count: the number of bytes that will be skipped from the stream
328  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
329  * @error: location to store the error occurring, or %NULL to ignore
330  *
331  * Tries to skip @count bytes from the stream. Will block during the operation.
332  *
333  * This is identical to g_input_stream_read(), from a behaviour standpoint,
334  * but the bytes that are skipped are not returned to the user. Some
335  * streams have an implementation that is more efficient than reading the data.
336  *
337  * This function is optional for inherited classes, as the default implementation
338  * emulates it using read.
339  *
340  * If @cancellable is not %NULL, then the operation can be cancelled by
341  * triggering the cancellable object from another thread. If the operation
342  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
343  * operation was partially finished when the operation was cancelled the
344  * partial result will be returned, without an error.
345  *
346  * Return value: Number of bytes skipped, or -1 on error
347  **/
348 gssize
349 g_input_stream_skip (GInputStream  *stream,
350                      gsize          count,
351                      GCancellable  *cancellable,
352                      GError       **error)
353 {
354   GInputStreamClass *class;
355   gssize res;
356
357   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
358
359   if (count == 0)
360     return 0;
361
362   if (((gssize) count) < 0)
363     {
364       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
365                    _("Too large count value passed to %s"), G_STRFUNC);
366       return -1;
367     }
368   
369   class = G_INPUT_STREAM_GET_CLASS (stream);
370
371   if (!g_input_stream_set_pending (stream, error))
372     return -1;
373
374   if (cancellable)
375     g_cancellable_push_current (cancellable);
376   
377   res = class->skip (stream, count, cancellable, error);
378
379   if (cancellable)
380     g_cancellable_pop_current (cancellable);
381   
382   g_input_stream_clear_pending (stream);
383
384   return res;
385 }
386
387 static gssize
388 g_input_stream_real_skip (GInputStream  *stream,
389                           gsize          count,
390                           GCancellable  *cancellable,
391                           GError       **error)
392 {
393   GInputStreamClass *class;
394   gssize ret, read_bytes;
395   char buffer[8192];
396   GError *my_error;
397
398   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
399     {
400       if (g_seekable_seek (G_SEEKABLE (stream),
401                            count,
402                            G_SEEK_CUR,
403                            cancellable,
404                            NULL))
405         return count;
406     }
407
408   /* If not seekable, or seek failed, fall back to reading data: */
409
410   class = G_INPUT_STREAM_GET_CLASS (stream);
411
412   read_bytes = 0;
413   while (1)
414     {
415       my_error = NULL;
416
417       ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
418                             cancellable, &my_error);
419       if (ret == -1)
420         {
421           if (read_bytes > 0 &&
422               my_error->domain == G_IO_ERROR &&
423               my_error->code == G_IO_ERROR_CANCELLED)
424             {
425               g_error_free (my_error);
426               return read_bytes;
427             }
428
429           g_propagate_error (error, my_error);
430           return -1;
431         }
432
433       count -= ret;
434       read_bytes += ret;
435
436       if (ret == 0 || count == 0)
437         return read_bytes;
438     }
439 }
440
441 /**
442  * g_input_stream_close:
443  * @stream: A #GInputStream.
444  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
445  * @error: location to store the error occurring, or %NULL to ignore
446  *
447  * Closes the stream, releasing resources related to it.
448  *
449  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
450  * Closing a stream multiple times will not return an error.
451  *
452  * Streams will be automatically closed when the last reference
453  * is dropped, but you might want to call this function to make sure 
454  * resources are released as early as possible.
455  *
456  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
457  * open after the stream is closed. See the documentation for the individual
458  * stream for details.
459  *
460  * On failure the first error that happened will be reported, but the close
461  * operation will finish as much as possible. A stream that failed to
462  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
463  * is important to check and report the error to the user.
464  *
465  * If @cancellable is not %NULL, then the operation can be cancelled by
466  * triggering the cancellable object from another thread. If the operation
467  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
468  * Cancelling a close will still leave the stream closed, but some streams
469  * can use a faster close that doesn't block to e.g. check errors. 
470  *
471  * Return value: %TRUE on success, %FALSE on failure
472  **/
473 gboolean
474 g_input_stream_close (GInputStream  *stream,
475                       GCancellable  *cancellable,
476                       GError       **error)
477 {
478   GInputStreamClass *class;
479   gboolean res;
480
481   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
482
483   class = G_INPUT_STREAM_GET_CLASS (stream);
484
485   if (stream->priv->closed)
486     return TRUE;
487
488   res = TRUE;
489
490   if (!g_input_stream_set_pending (stream, error))
491     return FALSE;
492
493   if (cancellable)
494     g_cancellable_push_current (cancellable);
495
496   if (class->close_fn)
497     res = class->close_fn (stream, cancellable, error);
498
499   if (cancellable)
500     g_cancellable_pop_current (cancellable);
501
502   g_input_stream_clear_pending (stream);
503   
504   stream->priv->closed = TRUE;
505   
506   return res;
507 }
508
509 static void
510 async_ready_callback_wrapper (GObject      *source_object,
511                               GAsyncResult *res,
512                               gpointer      user_data)
513 {
514   GInputStream *stream = G_INPUT_STREAM (source_object);
515
516   g_input_stream_clear_pending (stream);
517   if (stream->priv->outstanding_callback)
518     (*stream->priv->outstanding_callback) (source_object, res, user_data);
519   g_object_unref (stream);
520 }
521
522 static void
523 async_ready_close_callback_wrapper (GObject      *source_object,
524                                     GAsyncResult *res,
525                                     gpointer      user_data)
526 {
527   GInputStream *stream = G_INPUT_STREAM (source_object);
528
529   g_input_stream_clear_pending (stream);
530   stream->priv->closed = TRUE;
531   if (stream->priv->outstanding_callback)
532     (*stream->priv->outstanding_callback) (source_object, res, user_data);
533   g_object_unref (stream);
534 }
535
536 /**
537  * g_input_stream_read_async:
538  * @stream: A #GInputStream.
539  * @buffer: (array length=count) (element-type guint8): a buffer to
540  *     read data into (which should be at least count bytes long).
541  * @count: the number of bytes that will be read from the stream
542  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
543  * of the request. 
544  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
545  * @callback: (scope async): callback to call when the request is satisfied
546  * @user_data: (closure): the data to pass to callback function
547  *
548  * Request an asynchronous read of @count bytes from the stream into the buffer
549  * starting at @buffer. When the operation is finished @callback will be called. 
550  * You can then call g_input_stream_read_finish() to get the result of the 
551  * operation.
552  *
553  * During an async request no other sync and async calls are allowed on @stream, and will
554  * result in %G_IO_ERROR_PENDING errors. 
555  *
556  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
557  *
558  * On success, the number of bytes read into the buffer will be passed to the
559  * callback. It is not an error if this is not the same as the requested size, as it
560  * can happen e.g. near the end of a file, but generally we try to read
561  * as many bytes as requested. Zero is returned on end of file
562  * (or if @count is zero),  but never otherwise.
563  *
564  * Any outstanding i/o request with higher priority (lower numerical value) will
565  * be executed before an outstanding request with lower priority. Default
566  * priority is %G_PRIORITY_DEFAULT.
567  *
568  * The asyncronous methods have a default fallback that uses threads to implement
569  * asynchronicity, so they are optional for inheriting classes. However, if you
570  * override one you must override all.
571  **/
572 void
573 g_input_stream_read_async (GInputStream        *stream,
574                            void                *buffer,
575                            gsize                count,
576                            int                  io_priority,
577                            GCancellable        *cancellable,
578                            GAsyncReadyCallback  callback,
579                            gpointer             user_data)
580 {
581   GInputStreamClass *class;
582   GError *error = NULL;
583
584   g_return_if_fail (G_IS_INPUT_STREAM (stream));
585   g_return_if_fail (buffer != NULL);
586
587   if (count == 0)
588     {
589       GTask *task;
590
591       task = g_task_new (stream, cancellable, callback, user_data);
592       g_task_set_source_tag (task, g_input_stream_read_async);
593       g_task_return_int (task, 0);
594       g_object_unref (task);
595       return;
596     }
597   
598   if (((gssize) count) < 0)
599     {
600       g_task_report_new_error (stream, callback, user_data,
601                                g_input_stream_read_async,
602                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
603                                _("Too large count value passed to %s"),
604                                G_STRFUNC);
605       return;
606     }
607
608   if (!g_input_stream_set_pending (stream, &error))
609     {
610       g_task_report_error (stream, callback, user_data,
611                            g_input_stream_read_async,
612                            error);
613       return;
614     }
615
616   class = G_INPUT_STREAM_GET_CLASS (stream);
617   stream->priv->outstanding_callback = callback;
618   g_object_ref (stream);
619   class->read_async (stream, buffer, count, io_priority, cancellable,
620                      async_ready_callback_wrapper, user_data);
621 }
622
623 /**
624  * g_input_stream_read_finish:
625  * @stream: a #GInputStream.
626  * @result: a #GAsyncResult.
627  * @error: a #GError location to store the error occurring, or %NULL to 
628  * ignore.
629  * 
630  * Finishes an asynchronous stream read operation. 
631  * 
632  * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
633  **/
634 gssize
635 g_input_stream_read_finish (GInputStream  *stream,
636                             GAsyncResult  *result,
637                             GError       **error)
638 {
639   GInputStreamClass *class;
640   
641   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
642   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
643
644   if (g_async_result_legacy_propagate_error (result, error))
645     return -1;
646   else if (g_async_result_is_tagged (result, g_input_stream_read_async))
647     return g_task_propagate_int (G_TASK (result), error);
648
649   class = G_INPUT_STREAM_GET_CLASS (stream);
650   return class->read_finish (stream, result, error);
651 }
652
653 static void
654 read_bytes_callback (GObject      *stream,
655                      GAsyncResult *result,
656                      gpointer      user_data)
657 {
658   GTask *task = user_data;
659   guchar *buf = g_task_get_task_data (task);
660   GError *error = NULL;
661   gssize nread;
662   GBytes *bytes = NULL;
663
664   nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
665                                       result, &error);
666   if (nread == -1)
667     {
668       g_free (buf);
669       g_task_return_error (task, error);
670     }
671   else if (nread == 0)
672     {
673       g_free (buf);
674       bytes = g_bytes_new_static ("", 0);
675     }
676   else
677     bytes = g_bytes_new_take (buf, nread);
678
679   if (bytes)
680     g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
681
682   g_object_unref (task);
683 }
684
685 /**
686  * g_input_stream_read_bytes_async:
687  * @stream: A #GInputStream.
688  * @count: the number of bytes that will be read from the stream
689  * @io_priority: the <link linkend="io-priority">I/O priority</link>
690  *   of the request.
691  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
692  * @callback: (scope async): callback to call when the request is satisfied
693  * @user_data: (closure): the data to pass to callback function
694  *
695  * Request an asynchronous read of @count bytes from the stream into a
696  * new #GBytes. When the operation is finished @callback will be
697  * called. You can then call g_input_stream_read_bytes_finish() to get the
698  * result of the operation.
699  *
700  * During an async request no other sync and async calls are allowed
701  * on @stream, and will result in %G_IO_ERROR_PENDING errors.
702  *
703  * A value of @count larger than %G_MAXSSIZE will cause a
704  * %G_IO_ERROR_INVALID_ARGUMENT error.
705  *
706  * On success, the new #GBytes will be passed to the callback. It is
707  * not an error if this is smaller than the requested size, as it can
708  * happen e.g. near the end of a file, but generally we try to read as
709  * many bytes as requested. Zero is returned on end of file (or if
710  * @count is zero), but never otherwise.
711  *
712  * Any outstanding I/O request with higher priority (lower numerical
713  * value) will be executed before an outstanding request with lower
714  * priority. Default priority is %G_PRIORITY_DEFAULT.
715  **/
716 void
717 g_input_stream_read_bytes_async (GInputStream          *stream,
718                                  gsize                  count,
719                                  int                    io_priority,
720                                  GCancellable          *cancellable,
721                                  GAsyncReadyCallback    callback,
722                                  gpointer               user_data)
723 {
724   GTask *task;
725   guchar *buf;
726
727   task = g_task_new (stream, cancellable, callback, user_data);
728   buf = g_malloc (count);
729   g_task_set_task_data (task, buf, NULL);
730
731   g_input_stream_read_async (stream, buf, count,
732                              io_priority, cancellable,
733                              read_bytes_callback, task);
734 }
735
736 /**
737  * g_input_stream_read_bytes_finish:
738  * @stream: a #GInputStream.
739  * @result: a #GAsyncResult.
740  * @error: a #GError location to store the error occurring, or %NULL to
741  *   ignore.
742  *
743  * Finishes an asynchronous stream read-into-#GBytes operation.
744  *
745  * Returns: the newly-allocated #GBytes, or %NULL on error
746  **/
747 GBytes *
748 g_input_stream_read_bytes_finish (GInputStream  *stream,
749                                   GAsyncResult  *result,
750                                   GError       **error)
751 {
752   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
753   g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
754
755   return g_task_propagate_pointer (G_TASK (result), error);
756 }
757
758 /**
759  * g_input_stream_skip_async:
760  * @stream: A #GInputStream.
761  * @count: the number of bytes that will be skipped from the stream
762  * @io_priority: the <link linkend="io-priority">I/O priority</link>
763  * of the request.
764  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
765  * @callback: (scope async): callback to call when the request is satisfied
766  * @user_data: (closure): the data to pass to callback function
767  *
768  * Request an asynchronous skip of @count bytes from the stream.
769  * When the operation is finished @callback will be called.
770  * You can then call g_input_stream_skip_finish() to get the result
771  * of the operation.
772  *
773  * During an async request no other sync and async calls are allowed,
774  * and will result in %G_IO_ERROR_PENDING errors.
775  *
776  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
777  *
778  * On success, the number of bytes skipped will be passed to the callback.
779  * It is not an error if this is not the same as the requested size, as it
780  * can happen e.g. near the end of a file, but generally we try to skip
781  * as many bytes as requested. Zero is returned on end of file
782  * (or if @count is zero), but never otherwise.
783  *
784  * Any outstanding i/o request with higher priority (lower numerical value)
785  * will be executed before an outstanding request with lower priority.
786  * Default priority is %G_PRIORITY_DEFAULT.
787  *
788  * The asynchronous methods have a default fallback that uses threads to
789  * implement asynchronicity, so they are optional for inheriting classes.
790  * However, if you override one, you must override all.
791  **/
792 void
793 g_input_stream_skip_async (GInputStream        *stream,
794                            gsize                count,
795                            int                  io_priority,
796                            GCancellable        *cancellable,
797                            GAsyncReadyCallback  callback,
798                            gpointer             user_data)
799 {
800   GInputStreamClass *class;
801   GError *error = NULL;
802
803   g_return_if_fail (G_IS_INPUT_STREAM (stream));
804
805   if (count == 0)
806     {
807       GTask *task;
808
809       task = g_task_new (stream, cancellable, callback, user_data);
810       g_task_set_source_tag (task, g_input_stream_skip_async);
811       g_task_return_int (task, 0);
812       g_object_unref (task);
813       return;
814     }
815   
816   if (((gssize) count) < 0)
817     {
818       g_task_report_new_error (stream, callback, user_data,
819                                g_input_stream_skip_async,
820                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
821                                _("Too large count value passed to %s"),
822                                G_STRFUNC);
823       return;
824     }
825
826   if (!g_input_stream_set_pending (stream, &error))
827     {
828       g_task_report_error (stream, callback, user_data,
829                            g_input_stream_skip_async,
830                            error);
831       return;
832     }
833
834   class = G_INPUT_STREAM_GET_CLASS (stream);
835   stream->priv->outstanding_callback = callback;
836   g_object_ref (stream);
837   class->skip_async (stream, count, io_priority, cancellable,
838                      async_ready_callback_wrapper, user_data);
839 }
840
841 /**
842  * g_input_stream_skip_finish:
843  * @stream: a #GInputStream.
844  * @result: a #GAsyncResult.
845  * @error: a #GError location to store the error occurring, or %NULL to 
846  * ignore.
847  * 
848  * Finishes a stream skip operation.
849  * 
850  * Returns: the size of the bytes skipped, or %-1 on error.
851  **/
852 gssize
853 g_input_stream_skip_finish (GInputStream  *stream,
854                             GAsyncResult  *result,
855                             GError       **error)
856 {
857   GInputStreamClass *class;
858
859   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
860   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
861
862   if (g_async_result_legacy_propagate_error (result, error))
863     return -1;
864   else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
865     return g_task_propagate_int (G_TASK (result), error);
866
867   class = G_INPUT_STREAM_GET_CLASS (stream);
868   return class->skip_finish (stream, result, error);
869 }
870
871 /**
872  * g_input_stream_close_async:
873  * @stream: A #GInputStream.
874  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
875  * of the request. 
876  * @cancellable: (allow-none): optional cancellable object
877  * @callback: (scope async): callback to call when the request is satisfied
878  * @user_data: (closure): the data to pass to callback function
879  *
880  * Requests an asynchronous closes of the stream, releasing resources related to it.
881  * When the operation is finished @callback will be called. 
882  * You can then call g_input_stream_close_finish() to get the result of the 
883  * operation.
884  *
885  * For behaviour details see g_input_stream_close().
886  *
887  * The asyncronous methods have a default fallback that uses threads to implement
888  * asynchronicity, so they are optional for inheriting classes. However, if you
889  * override one you must override all.
890  **/
891 void
892 g_input_stream_close_async (GInputStream        *stream,
893                             int                  io_priority,
894                             GCancellable        *cancellable,
895                             GAsyncReadyCallback  callback,
896                             gpointer             user_data)
897 {
898   GInputStreamClass *class;
899   GError *error = NULL;
900
901   g_return_if_fail (G_IS_INPUT_STREAM (stream));
902
903   if (stream->priv->closed)
904     {
905       GTask *task;
906
907       task = g_task_new (stream, cancellable, callback, user_data);
908       g_task_set_source_tag (task, g_input_stream_close_async);
909       g_task_return_boolean (task, TRUE);
910       g_object_unref (task);
911       return;
912     }
913
914   if (!g_input_stream_set_pending (stream, &error))
915     {
916       g_task_report_error (stream, callback, user_data,
917                            g_input_stream_close_async,
918                            error);
919       return;
920     }
921   
922   class = G_INPUT_STREAM_GET_CLASS (stream);
923   stream->priv->outstanding_callback = callback;
924   g_object_ref (stream);
925   class->close_async (stream, io_priority, cancellable,
926                       async_ready_close_callback_wrapper, user_data);
927 }
928
929 /**
930  * g_input_stream_close_finish:
931  * @stream: a #GInputStream.
932  * @result: a #GAsyncResult.
933  * @error: a #GError location to store the error occurring, or %NULL to 
934  * ignore.
935  * 
936  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
937  * 
938  * Returns: %TRUE if the stream was closed successfully.
939  **/
940 gboolean
941 g_input_stream_close_finish (GInputStream  *stream,
942                              GAsyncResult  *result,
943                              GError       **error)
944 {
945   GInputStreamClass *class;
946
947   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
948   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
949
950   if (g_async_result_legacy_propagate_error (result, error))
951     return FALSE;
952   else if (g_async_result_is_tagged (result, g_input_stream_close_async))
953     return g_task_propagate_boolean (G_TASK (result), error);
954
955   class = G_INPUT_STREAM_GET_CLASS (stream);
956   return class->close_finish (stream, result, error);
957 }
958
959 /**
960  * g_input_stream_is_closed:
961  * @stream: input stream.
962  * 
963  * Checks if an input stream is closed.
964  * 
965  * Returns: %TRUE if the stream is closed.
966  **/
967 gboolean
968 g_input_stream_is_closed (GInputStream *stream)
969 {
970   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
971   
972   return stream->priv->closed;
973 }
974  
975 /**
976  * g_input_stream_has_pending:
977  * @stream: input stream.
978  * 
979  * Checks if an input stream has pending actions.
980  * 
981  * Returns: %TRUE if @stream has pending actions.
982  **/  
983 gboolean
984 g_input_stream_has_pending (GInputStream *stream)
985 {
986   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
987   
988   return stream->priv->pending;
989 }
990
991 /**
992  * g_input_stream_set_pending:
993  * @stream: input stream
994  * @error: a #GError location to store the error occurring, or %NULL to 
995  * ignore.
996  * 
997  * Sets @stream to have actions pending. If the pending flag is
998  * already set or @stream is closed, it will return %FALSE and set
999  * @error.
1000  *
1001  * Return value: %TRUE if pending was previously unset and is now set.
1002  **/
1003 gboolean
1004 g_input_stream_set_pending (GInputStream *stream, GError **error)
1005 {
1006   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1007   
1008   if (stream->priv->closed)
1009     {
1010       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1011                            _("Stream is already closed"));
1012       return FALSE;
1013     }
1014   
1015   if (stream->priv->pending)
1016     {
1017       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1018                 /* Translators: This is an error you get if there is already an
1019                  * operation running against this stream when you try to start
1020                  * one */
1021                  _("Stream has outstanding operation"));
1022       return FALSE;
1023     }
1024   
1025   stream->priv->pending = TRUE;
1026   return TRUE;
1027 }
1028
1029 /**
1030  * g_input_stream_clear_pending:
1031  * @stream: input stream
1032  * 
1033  * Clears the pending flag on @stream.
1034  **/
1035 void
1036 g_input_stream_clear_pending (GInputStream *stream)
1037 {
1038   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1039   
1040   stream->priv->pending = FALSE;
1041 }
1042
1043 /**
1044  * g_input_stream_async_read_is_via_threads:
1045  * @stream: input stream
1046  *
1047  * Checks if an input stream's read_async function uses threads.
1048  *
1049  * Returns: %TRUE if @stream's read_async function uses threads.
1050  **/
1051 gboolean
1052 g_input_stream_async_read_is_via_threads (GInputStream *stream)
1053 {
1054   GInputStreamClass *class;
1055
1056   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1057
1058   class = G_INPUT_STREAM_GET_CLASS (stream);
1059
1060   return (class->read_async == g_input_stream_real_read_async &&
1061       !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1062         g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1063 }
1064
1065 /********************************************
1066  *   Default implementation of async ops    *
1067  ********************************************/
1068
1069 typedef struct {
1070   void   *buffer;
1071   gsize   count;
1072 } ReadData;
1073
1074 static void
1075 free_read_data (ReadData *op)
1076 {
1077   g_slice_free (ReadData, op);
1078 }
1079
1080 static void
1081 read_async_thread (GTask        *task,
1082                    gpointer      source_object,
1083                    gpointer      task_data,
1084                    GCancellable *cancellable)
1085 {
1086   GInputStream *stream = source_object;
1087   ReadData *op = task_data;
1088   GInputStreamClass *class;
1089   GError *error = NULL;
1090   gssize nread;
1091  
1092   class = G_INPUT_STREAM_GET_CLASS (stream);
1093
1094   nread = class->read_fn (stream,
1095                           op->buffer, op->count,
1096                           g_task_get_cancellable (task),
1097                           &error);
1098   if (nread == -1)
1099     g_task_return_error (task, error);
1100   else
1101     g_task_return_int (task, nread);
1102 }
1103
1104 static void read_async_pollable (GPollableInputStream *stream,
1105                                  GTask                *task);
1106
1107 static gboolean
1108 read_async_pollable_ready (GPollableInputStream *stream,
1109                            gpointer              user_data)
1110 {
1111   GTask *task = user_data;
1112
1113   read_async_pollable (stream, task);
1114   return FALSE;
1115 }
1116
1117 static void
1118 read_async_pollable (GPollableInputStream *stream,
1119                      GTask                *task)
1120 {
1121   ReadData *op = g_task_get_task_data (task);
1122   GError *error = NULL;
1123   gssize nread;
1124
1125   if (g_task_return_error_if_cancelled (task))
1126     return;
1127
1128   nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1129     read_nonblocking (stream, op->buffer, op->count, &error);
1130
1131   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1132     {
1133       GSource *source;
1134
1135       g_error_free (error);
1136
1137       source = g_pollable_input_stream_create_source (stream,
1138                                                       g_task_get_cancellable (task));
1139       g_task_attach_source (task, source,
1140                             (GSourceFunc) read_async_pollable_ready);
1141       g_source_unref (source);
1142       return;
1143     }
1144
1145   if (nread == -1)
1146     g_task_return_error (task, error);
1147   else
1148     g_task_return_int (task, nread);
1149   /* g_input_stream_real_read_async() unrefs task */
1150 }
1151
1152
1153 static void
1154 g_input_stream_real_read_async (GInputStream        *stream,
1155                                 void                *buffer,
1156                                 gsize                count,
1157                                 int                  io_priority,
1158                                 GCancellable        *cancellable,
1159                                 GAsyncReadyCallback  callback,
1160                                 gpointer             user_data)
1161 {
1162   GTask *task;
1163   ReadData *op;
1164   
1165   op = g_slice_new0 (ReadData);
1166   task = g_task_new (stream, cancellable, callback, user_data);
1167   g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1168   g_task_set_priority (task, io_priority);
1169   op->buffer = buffer;
1170   op->count = count;
1171
1172   if (!g_input_stream_async_read_is_via_threads (stream))
1173     read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1174   else
1175     g_task_run_in_thread (task, read_async_thread);
1176   g_object_unref (task);
1177 }
1178
1179 static gssize
1180 g_input_stream_real_read_finish (GInputStream  *stream,
1181                                  GAsyncResult  *result,
1182                                  GError       **error)
1183 {
1184   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1185
1186   return g_task_propagate_int (G_TASK (result), error);
1187 }
1188
1189
1190 static void
1191 skip_async_thread (GTask        *task,
1192                    gpointer      source_object,
1193                    gpointer      task_data,
1194                    GCancellable *cancellable)
1195 {
1196   GInputStream *stream = source_object;
1197   gsize count = GPOINTER_TO_SIZE (task_data);
1198   GInputStreamClass *class;
1199   GError *error = NULL;
1200   gssize ret;
1201
1202   class = G_INPUT_STREAM_GET_CLASS (stream);
1203   ret = class->skip (stream, count,
1204                      g_task_get_cancellable (task),
1205                      &error);
1206   if (ret == -1)
1207     g_task_return_error (task, error);
1208   else
1209     g_task_return_int (task, ret);
1210 }
1211
1212 typedef struct {
1213   char buffer[8192];
1214   gsize count;
1215   gsize count_skipped;
1216 } SkipFallbackAsyncData;
1217
1218 static void
1219 skip_callback_wrapper (GObject      *source_object,
1220                        GAsyncResult *res,
1221                        gpointer      user_data)
1222 {
1223   GInputStreamClass *class;
1224   GTask *task = user_data;
1225   SkipFallbackAsyncData *data = g_task_get_task_data (task);
1226   GError *error = NULL;
1227   gssize ret;
1228
1229   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1230
1231   if (ret > 0)
1232     {
1233       data->count -= ret;
1234       data->count_skipped += ret;
1235
1236       if (data->count > 0)
1237         {
1238           class = G_INPUT_STREAM_GET_CLASS (source_object);
1239           class->read_async (G_INPUT_STREAM (source_object),
1240                              data->buffer, MIN (8192, data->count),
1241                              g_task_get_priority (task),
1242                              g_task_get_cancellable (task),
1243                              skip_callback_wrapper, task);
1244           return;
1245         }
1246     }
1247
1248   if (ret == -1 &&
1249       g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1250       data->count_skipped)
1251     {
1252       /* No error, return partial read */
1253       g_clear_error (&error);
1254     }
1255
1256   if (error)
1257     g_task_return_error (task, error);
1258   else
1259     g_task_return_int (task, data->count_skipped);
1260   g_object_unref (task);
1261  }
1262
1263 static void
1264 g_input_stream_real_skip_async (GInputStream        *stream,
1265                                 gsize                count,
1266                                 int                  io_priority,
1267                                 GCancellable        *cancellable,
1268                                 GAsyncReadyCallback  callback,
1269                                 gpointer             user_data)
1270 {
1271   GInputStreamClass *class;
1272   SkipFallbackAsyncData *data;
1273   GTask *task;
1274
1275   class = G_INPUT_STREAM_GET_CLASS (stream);
1276
1277   task = g_task_new (stream, cancellable, callback, user_data);
1278   g_task_set_priority (task, io_priority);
1279
1280   if (g_input_stream_async_read_is_via_threads (stream))
1281     {
1282       /* Read is thread-using async fallback.
1283        * Make skip use threads too, so that we can use a possible sync skip
1284        * implementation. */
1285       g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1286
1287       g_task_run_in_thread (task, skip_async_thread);
1288       g_object_unref (task);
1289     }
1290   else
1291     {
1292       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1293       
1294       /* There is a custom async read function, lets use that. */
1295       data = g_new (SkipFallbackAsyncData, 1);
1296       data->count = count;
1297       data->count_skipped = 0;
1298       g_task_set_task_data (task, data, g_free);
1299       g_task_set_check_cancellable (task, FALSE);
1300       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1301                          skip_callback_wrapper, task);
1302     }
1303
1304 }
1305
1306 static gssize
1307 g_input_stream_real_skip_finish (GInputStream  *stream,
1308                                  GAsyncResult  *result,
1309                                  GError       **error)
1310 {
1311   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1312
1313   return g_task_propagate_int (G_TASK (result), error);
1314 }
1315
1316 static void
1317 close_async_thread (GTask        *task,
1318                     gpointer      source_object,
1319                     gpointer      task_data,
1320                     GCancellable *cancellable)
1321 {
1322   GInputStream *stream = source_object;
1323   GInputStreamClass *class;
1324   GError *error = NULL;
1325   gboolean result;
1326
1327   class = G_INPUT_STREAM_GET_CLASS (stream);
1328   if (class->close_fn)
1329     {
1330       result = class->close_fn (stream,
1331                                 g_task_get_cancellable (task),
1332                                 &error);
1333       if (!result)
1334         {
1335           g_task_return_error (task, error);
1336           return;
1337         }
1338     }
1339
1340   g_task_return_boolean (task, TRUE);
1341 }
1342
1343 static void
1344 g_input_stream_real_close_async (GInputStream        *stream,
1345                                  int                  io_priority,
1346                                  GCancellable        *cancellable,
1347                                  GAsyncReadyCallback  callback,
1348                                  gpointer             user_data)
1349 {
1350   GTask *task;
1351
1352   task = g_task_new (stream, cancellable, callback, user_data);
1353   g_task_set_check_cancellable (task, FALSE);
1354   g_task_set_priority (task, io_priority);
1355   
1356   g_task_run_in_thread (task, close_async_thread);
1357   g_object_unref (task);
1358 }
1359
1360 static gboolean
1361 g_input_stream_real_close_finish (GInputStream  *stream,
1362                                   GAsyncResult  *result,
1363                                   GError       **error)
1364 {
1365   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1366
1367   return g_task_propagate_boolean (G_TASK (result), error);
1368 }