GApplication: add a "resource base path"
[platform/upstream/glib.git] / gio / ginputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: Alexander Larsson <alexl@redhat.com>
19  */
20
21 #include "config.h"
22 #include <glib.h>
23 #include "glibintl.h"
24
25 #include "ginputstream.h"
26 #include "gioprivate.h"
27 #include "gseekable.h"
28 #include "gcancellable.h"
29 #include "gasyncresult.h"
30 #include "gioerror.h"
31 #include "gpollableinputstream.h"
32
33 /**
34  * SECTION:ginputstream
35  * @short_description: Base class for implementing streaming input
36  * @include: gio/gio.h
37  *
38  * #GInputStream has functions to read from a stream (g_input_stream_read()),
39  * to close a stream (g_input_stream_close()) and to skip some content
40  * (g_input_stream_skip()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice(). 
44  *
45  * All of these functions have async variants too.
46  **/
47
48 struct _GInputStreamPrivate {
49   guint closed : 1;
50   guint pending : 1;
51   GAsyncReadyCallback outstanding_callback;
52 };
53
54 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
55
56 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
57                                                   gsize                 count,
58                                                   GCancellable         *cancellable,
59                                                   GError              **error);
60 static void     g_input_stream_real_read_async   (GInputStream         *stream,
61                                                   void                 *buffer,
62                                                   gsize                 count,
63                                                   int                   io_priority,
64                                                   GCancellable         *cancellable,
65                                                   GAsyncReadyCallback   callback,
66                                                   gpointer              user_data);
67 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
68                                                   GAsyncResult         *result,
69                                                   GError              **error);
70 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
71                                                   gsize                 count,
72                                                   int                   io_priority,
73                                                   GCancellable         *cancellable,
74                                                   GAsyncReadyCallback   callback,
75                                                   gpointer              data);
76 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
77                                                   GAsyncResult         *result,
78                                                   GError              **error);
79 static void     g_input_stream_real_close_async  (GInputStream         *stream,
80                                                   int                   io_priority,
81                                                   GCancellable         *cancellable,
82                                                   GAsyncReadyCallback   callback,
83                                                   gpointer              data);
84 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
85                                                   GAsyncResult         *result,
86                                                   GError              **error);
87
88 static void
89 g_input_stream_dispose (GObject *object)
90 {
91   GInputStream *stream;
92
93   stream = G_INPUT_STREAM (object);
94   
95   if (!stream->priv->closed)
96     g_input_stream_close (stream, NULL, NULL);
97
98   G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
99 }
100
101
102 static void
103 g_input_stream_class_init (GInputStreamClass *klass)
104 {
105   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
106   
107   gobject_class->dispose = g_input_stream_dispose;
108   
109   klass->skip = g_input_stream_real_skip;
110   klass->read_async = g_input_stream_real_read_async;
111   klass->read_finish = g_input_stream_real_read_finish;
112   klass->skip_async = g_input_stream_real_skip_async;
113   klass->skip_finish = g_input_stream_real_skip_finish;
114   klass->close_async = g_input_stream_real_close_async;
115   klass->close_finish = g_input_stream_real_close_finish;
116 }
117
118 static void
119 g_input_stream_init (GInputStream *stream)
120 {
121   stream->priv = g_input_stream_get_instance_private (stream);
122 }
123
124 /**
125  * g_input_stream_read:
126  * @stream: a #GInputStream.
127  * @buffer: (array length=count) (element-type guint8): a buffer to
128  *     read data into (which should be at least count bytes long).
129  * @count: the number of bytes that will be read from the stream
130  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
131  * @error: location to store the error occurring, or %NULL to ignore
132  *
133  * Tries to read @count bytes from the stream into the buffer starting at
134  * @buffer. Will block during this read.
135  * 
136  * If count is zero returns zero and does nothing. A value of @count
137  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
138  *
139  * On success, the number of bytes read into the buffer is returned.
140  * It is not an error if this is not the same as the requested size, as it
141  * can happen e.g. near the end of a file. Zero is returned on end of file
142  * (or if @count is zero),  but never otherwise.
143  *
144  * The returned @buffer is not a nul-terminated string, it can contain nul bytes
145  * at any position, and this function doesn't nul-terminate the @buffer.
146  *
147  * If @cancellable is not %NULL, then the operation can be cancelled by
148  * triggering the cancellable object from another thread. If the operation
149  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
150  * operation was partially finished when the operation was cancelled the
151  * partial result will be returned, without an error.
152  *
153  * On error -1 is returned and @error is set accordingly.
154  * 
155  * Returns: Number of bytes read, or -1 on error, or 0 on end of file.
156  **/
157 gssize
158 g_input_stream_read  (GInputStream  *stream,
159                       void          *buffer,
160                       gsize          count,
161                       GCancellable  *cancellable,
162                       GError       **error)
163 {
164   GInputStreamClass *class;
165   gssize res;
166
167   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
168   g_return_val_if_fail (buffer != NULL, 0);
169
170   if (count == 0)
171     return 0;
172   
173   if (((gssize) count) < 0)
174     {
175       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
176                    _("Too large count value passed to %s"), G_STRFUNC);
177       return -1;
178     }
179
180   class = G_INPUT_STREAM_GET_CLASS (stream);
181
182   if (class->read_fn == NULL) 
183     {
184       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
185                            _("Input stream doesn't implement read"));
186       return -1;
187     }
188
189   if (!g_input_stream_set_pending (stream, error))
190     return -1;
191
192   if (cancellable)
193     g_cancellable_push_current (cancellable);
194   
195   res = class->read_fn (stream, buffer, count, cancellable, error);
196
197   if (cancellable)
198     g_cancellable_pop_current (cancellable);
199   
200   g_input_stream_clear_pending (stream);
201
202   return res;
203 }
204
205 /**
206  * g_input_stream_read_all:
207  * @stream: a #GInputStream.
208  * @buffer: (array length=count) (element-type guint8): a buffer to
209  *     read data into (which should be at least count bytes long).
210  * @count: the number of bytes that will be read from the stream
211  * @bytes_read: (out): location to store the number of bytes that was read from the stream
212  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
213  * @error: location to store the error occurring, or %NULL to ignore
214  *
215  * Tries to read @count bytes from the stream into the buffer starting at
216  * @buffer. Will block during this read.
217  *
218  * This function is similar to g_input_stream_read(), except it tries to
219  * read as many bytes as requested, only stopping on an error or end of stream.
220  *
221  * On a successful read of @count bytes, or if we reached the end of the
222  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
223  * read into @buffer.
224  * 
225  * If there is an error during the operation %FALSE is returned and @error
226  * is set to indicate the error status, @bytes_read is updated to contain
227  * the number of bytes read into @buffer before the error occurred.
228  *
229  * Returns: %TRUE on success, %FALSE if there was an error
230  **/
231 gboolean
232 g_input_stream_read_all (GInputStream  *stream,
233                          void          *buffer,
234                          gsize          count,
235                          gsize         *bytes_read,
236                          GCancellable  *cancellable,
237                          GError       **error)
238 {
239   gsize _bytes_read;
240   gssize res;
241
242   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
243   g_return_val_if_fail (buffer != NULL, FALSE);
244
245   _bytes_read = 0;
246   while (_bytes_read < count)
247     {
248       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
249                                  cancellable, error);
250       if (res == -1)
251         {
252           if (bytes_read)
253             *bytes_read = _bytes_read;
254           return FALSE;
255         }
256       
257       if (res == 0)
258         break;
259
260       _bytes_read += res;
261     }
262
263   if (bytes_read)
264     *bytes_read = _bytes_read;
265   return TRUE;
266 }
267
268 /**
269  * g_input_stream_read_bytes:
270  * @stream: a #GInputStream.
271  * @count: maximum number of bytes that will be read from the stream. Common
272  * values include 4096 and 8192.
273  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
274  * @error: location to store the error occurring, or %NULL to ignore
275  *
276  * Like g_input_stream_read(), this tries to read @count bytes from
277  * the stream in a blocking fashion. However, rather than reading into
278  * a user-supplied buffer, this will create a new #GBytes containing
279  * the data that was read. This may be easier to use from language
280  * bindings.
281  *
282  * If count is zero, returns a zero-length #GBytes and does nothing. A
283  * value of @count larger than %G_MAXSSIZE will cause a
284  * %G_IO_ERROR_INVALID_ARGUMENT error.
285  *
286  * On success, a new #GBytes is returned. It is not an error if the
287  * size of this object is not the same as the requested size, as it
288  * can happen e.g. near the end of a file. A zero-length #GBytes is
289  * returned on end of file (or if @count is zero), but never
290  * otherwise.
291  *
292  * If @cancellable is not %NULL, then the operation can be cancelled by
293  * triggering the cancellable object from another thread. If the operation
294  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
295  * operation was partially finished when the operation was cancelled the
296  * partial result will be returned, without an error.
297  *
298  * On error %NULL is returned and @error is set accordingly.
299  *
300  * Returns: a new #GBytes, or %NULL on error
301  **/
302 GBytes *
303 g_input_stream_read_bytes (GInputStream  *stream,
304                            gsize          count,
305                            GCancellable  *cancellable,
306                            GError       **error)
307 {
308   guchar *buf;
309   gssize nread;
310
311   buf = g_malloc (count);
312   nread = g_input_stream_read (stream, buf, count, cancellable, error);
313   if (nread == -1)
314     {
315       g_free (buf);
316       return NULL;
317     }
318   else if (nread == 0)
319     {
320       g_free (buf);
321       return g_bytes_new_static ("", 0);
322     }
323   else
324     return g_bytes_new_take (buf, nread);
325 }
326
327 /**
328  * g_input_stream_skip:
329  * @stream: a #GInputStream.
330  * @count: the number of bytes that will be skipped from the stream
331  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
332  * @error: location to store the error occurring, or %NULL to ignore
333  *
334  * Tries to skip @count bytes from the stream. Will block during the operation.
335  *
336  * This is identical to g_input_stream_read(), from a behaviour standpoint,
337  * but the bytes that are skipped are not returned to the user. Some
338  * streams have an implementation that is more efficient than reading the data.
339  *
340  * This function is optional for inherited classes, as the default implementation
341  * emulates it using read.
342  *
343  * If @cancellable is not %NULL, then the operation can be cancelled by
344  * triggering the cancellable object from another thread. If the operation
345  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
346  * operation was partially finished when the operation was cancelled the
347  * partial result will be returned, without an error.
348  *
349  * Returns: Number of bytes skipped, or -1 on error
350  **/
351 gssize
352 g_input_stream_skip (GInputStream  *stream,
353                      gsize          count,
354                      GCancellable  *cancellable,
355                      GError       **error)
356 {
357   GInputStreamClass *class;
358   gssize res;
359
360   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
361
362   if (count == 0)
363     return 0;
364
365   if (((gssize) count) < 0)
366     {
367       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
368                    _("Too large count value passed to %s"), G_STRFUNC);
369       return -1;
370     }
371   
372   class = G_INPUT_STREAM_GET_CLASS (stream);
373
374   if (!g_input_stream_set_pending (stream, error))
375     return -1;
376
377   if (cancellable)
378     g_cancellable_push_current (cancellable);
379   
380   res = class->skip (stream, count, cancellable, error);
381
382   if (cancellable)
383     g_cancellable_pop_current (cancellable);
384   
385   g_input_stream_clear_pending (stream);
386
387   return res;
388 }
389
390 static gssize
391 g_input_stream_real_skip (GInputStream  *stream,
392                           gsize          count,
393                           GCancellable  *cancellable,
394                           GError       **error)
395 {
396   GInputStreamClass *class;
397   gssize ret, read_bytes;
398   char buffer[8192];
399   GError *my_error;
400
401   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
402     {
403       if (g_seekable_seek (G_SEEKABLE (stream),
404                            count,
405                            G_SEEK_CUR,
406                            cancellable,
407                            NULL))
408         return count;
409     }
410
411   /* If not seekable, or seek failed, fall back to reading data: */
412
413   class = G_INPUT_STREAM_GET_CLASS (stream);
414
415   read_bytes = 0;
416   while (1)
417     {
418       my_error = NULL;
419
420       ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
421                             cancellable, &my_error);
422       if (ret == -1)
423         {
424           if (read_bytes > 0 &&
425               my_error->domain == G_IO_ERROR &&
426               my_error->code == G_IO_ERROR_CANCELLED)
427             {
428               g_error_free (my_error);
429               return read_bytes;
430             }
431
432           g_propagate_error (error, my_error);
433           return -1;
434         }
435
436       count -= ret;
437       read_bytes += ret;
438
439       if (ret == 0 || count == 0)
440         return read_bytes;
441     }
442 }
443
444 /**
445  * g_input_stream_close:
446  * @stream: A #GInputStream.
447  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
448  * @error: location to store the error occurring, or %NULL to ignore
449  *
450  * Closes the stream, releasing resources related to it.
451  *
452  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
453  * Closing a stream multiple times will not return an error.
454  *
455  * Streams will be automatically closed when the last reference
456  * is dropped, but you might want to call this function to make sure 
457  * resources are released as early as possible.
458  *
459  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
460  * open after the stream is closed. See the documentation for the individual
461  * stream for details.
462  *
463  * On failure the first error that happened will be reported, but the close
464  * operation will finish as much as possible. A stream that failed to
465  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
466  * is important to check and report the error to the user.
467  *
468  * If @cancellable is not %NULL, then the operation can be cancelled by
469  * triggering the cancellable object from another thread. If the operation
470  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
471  * Cancelling a close will still leave the stream closed, but some streams
472  * can use a faster close that doesn't block to e.g. check errors. 
473  *
474  * Returns: %TRUE on success, %FALSE on failure
475  **/
476 gboolean
477 g_input_stream_close (GInputStream  *stream,
478                       GCancellable  *cancellable,
479                       GError       **error)
480 {
481   GInputStreamClass *class;
482   gboolean res;
483
484   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
485
486   class = G_INPUT_STREAM_GET_CLASS (stream);
487
488   if (stream->priv->closed)
489     return TRUE;
490
491   res = TRUE;
492
493   if (!g_input_stream_set_pending (stream, error))
494     return FALSE;
495
496   if (cancellable)
497     g_cancellable_push_current (cancellable);
498
499   if (class->close_fn)
500     res = class->close_fn (stream, cancellable, error);
501
502   if (cancellable)
503     g_cancellable_pop_current (cancellable);
504
505   g_input_stream_clear_pending (stream);
506   
507   stream->priv->closed = TRUE;
508   
509   return res;
510 }
511
512 static void
513 async_ready_callback_wrapper (GObject      *source_object,
514                               GAsyncResult *res,
515                               gpointer      user_data)
516 {
517   GInputStream *stream = G_INPUT_STREAM (source_object);
518
519   g_input_stream_clear_pending (stream);
520   if (stream->priv->outstanding_callback)
521     (*stream->priv->outstanding_callback) (source_object, res, user_data);
522   g_object_unref (stream);
523 }
524
525 static void
526 async_ready_close_callback_wrapper (GObject      *source_object,
527                                     GAsyncResult *res,
528                                     gpointer      user_data)
529 {
530   GInputStream *stream = G_INPUT_STREAM (source_object);
531
532   g_input_stream_clear_pending (stream);
533   stream->priv->closed = TRUE;
534   if (stream->priv->outstanding_callback)
535     (*stream->priv->outstanding_callback) (source_object, res, user_data);
536   g_object_unref (stream);
537 }
538
539 /**
540  * g_input_stream_read_async:
541  * @stream: A #GInputStream.
542  * @buffer: (array length=count) (element-type guint8): a buffer to
543  *     read data into (which should be at least count bytes long).
544  * @count: the number of bytes that will be read from the stream
545  * @io_priority: the [I/O priority][io-priority]
546  * of the request. 
547  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
548  * @callback: (scope async): callback to call when the request is satisfied
549  * @user_data: (closure): the data to pass to callback function
550  *
551  * Request an asynchronous read of @count bytes from the stream into the buffer
552  * starting at @buffer. When the operation is finished @callback will be called. 
553  * You can then call g_input_stream_read_finish() to get the result of the 
554  * operation.
555  *
556  * During an async request no other sync and async calls are allowed on @stream, and will
557  * result in %G_IO_ERROR_PENDING errors. 
558  *
559  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
560  *
561  * On success, the number of bytes read into the buffer will be passed to the
562  * callback. It is not an error if this is not the same as the requested size, as it
563  * can happen e.g. near the end of a file, but generally we try to read
564  * as many bytes as requested. Zero is returned on end of file
565  * (or if @count is zero),  but never otherwise.
566  *
567  * Any outstanding i/o request with higher priority (lower numerical value) will
568  * be executed before an outstanding request with lower priority. Default
569  * priority is %G_PRIORITY_DEFAULT.
570  *
571  * The asyncronous methods have a default fallback that uses threads to implement
572  * asynchronicity, so they are optional for inheriting classes. However, if you
573  * override one you must override all.
574  **/
575 void
576 g_input_stream_read_async (GInputStream        *stream,
577                            void                *buffer,
578                            gsize                count,
579                            int                  io_priority,
580                            GCancellable        *cancellable,
581                            GAsyncReadyCallback  callback,
582                            gpointer             user_data)
583 {
584   GInputStreamClass *class;
585   GError *error = NULL;
586
587   g_return_if_fail (G_IS_INPUT_STREAM (stream));
588   g_return_if_fail (buffer != NULL);
589
590   if (count == 0)
591     {
592       GTask *task;
593
594       task = g_task_new (stream, cancellable, callback, user_data);
595       g_task_set_source_tag (task, g_input_stream_read_async);
596       g_task_return_int (task, 0);
597       g_object_unref (task);
598       return;
599     }
600   
601   if (((gssize) count) < 0)
602     {
603       g_task_report_new_error (stream, callback, user_data,
604                                g_input_stream_read_async,
605                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
606                                _("Too large count value passed to %s"),
607                                G_STRFUNC);
608       return;
609     }
610
611   if (!g_input_stream_set_pending (stream, &error))
612     {
613       g_task_report_error (stream, callback, user_data,
614                            g_input_stream_read_async,
615                            error);
616       return;
617     }
618
619   class = G_INPUT_STREAM_GET_CLASS (stream);
620   stream->priv->outstanding_callback = callback;
621   g_object_ref (stream);
622   class->read_async (stream, buffer, count, io_priority, cancellable,
623                      async_ready_callback_wrapper, user_data);
624 }
625
626 /**
627  * g_input_stream_read_finish:
628  * @stream: a #GInputStream.
629  * @result: a #GAsyncResult.
630  * @error: a #GError location to store the error occurring, or %NULL to 
631  * ignore.
632  * 
633  * Finishes an asynchronous stream read operation. 
634  * 
635  * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
636  **/
637 gssize
638 g_input_stream_read_finish (GInputStream  *stream,
639                             GAsyncResult  *result,
640                             GError       **error)
641 {
642   GInputStreamClass *class;
643   
644   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
645   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
646
647   if (g_async_result_legacy_propagate_error (result, error))
648     return -1;
649   else if (g_async_result_is_tagged (result, g_input_stream_read_async))
650     return g_task_propagate_int (G_TASK (result), error);
651
652   class = G_INPUT_STREAM_GET_CLASS (stream);
653   return class->read_finish (stream, result, error);
654 }
655
656 static void
657 read_bytes_callback (GObject      *stream,
658                      GAsyncResult *result,
659                      gpointer      user_data)
660 {
661   GTask *task = user_data;
662   guchar *buf = g_task_get_task_data (task);
663   GError *error = NULL;
664   gssize nread;
665   GBytes *bytes = NULL;
666
667   nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
668                                       result, &error);
669   if (nread == -1)
670     {
671       g_free (buf);
672       g_task_return_error (task, error);
673     }
674   else if (nread == 0)
675     {
676       g_free (buf);
677       bytes = g_bytes_new_static ("", 0);
678     }
679   else
680     bytes = g_bytes_new_take (buf, nread);
681
682   if (bytes)
683     g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
684
685   g_object_unref (task);
686 }
687
688 /**
689  * g_input_stream_read_bytes_async:
690  * @stream: A #GInputStream.
691  * @count: the number of bytes that will be read from the stream
692  * @io_priority: the [I/O priority][io-priority] of the request
693  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
694  * @callback: (scope async): callback to call when the request is satisfied
695  * @user_data: (closure): the data to pass to callback function
696  *
697  * Request an asynchronous read of @count bytes from the stream into a
698  * new #GBytes. When the operation is finished @callback will be
699  * called. You can then call g_input_stream_read_bytes_finish() to get the
700  * result of the operation.
701  *
702  * During an async request no other sync and async calls are allowed
703  * on @stream, and will result in %G_IO_ERROR_PENDING errors.
704  *
705  * A value of @count larger than %G_MAXSSIZE will cause a
706  * %G_IO_ERROR_INVALID_ARGUMENT error.
707  *
708  * On success, the new #GBytes will be passed to the callback. It is
709  * not an error if this is smaller than the requested size, as it can
710  * happen e.g. near the end of a file, but generally we try to read as
711  * many bytes as requested. Zero is returned on end of file (or if
712  * @count is zero), but never otherwise.
713  *
714  * Any outstanding I/O request with higher priority (lower numerical
715  * value) will be executed before an outstanding request with lower
716  * priority. Default priority is %G_PRIORITY_DEFAULT.
717  **/
718 void
719 g_input_stream_read_bytes_async (GInputStream          *stream,
720                                  gsize                  count,
721                                  int                    io_priority,
722                                  GCancellable          *cancellable,
723                                  GAsyncReadyCallback    callback,
724                                  gpointer               user_data)
725 {
726   GTask *task;
727   guchar *buf;
728
729   task = g_task_new (stream, cancellable, callback, user_data);
730   buf = g_malloc (count);
731   g_task_set_task_data (task, buf, NULL);
732
733   g_input_stream_read_async (stream, buf, count,
734                              io_priority, cancellable,
735                              read_bytes_callback, task);
736 }
737
738 /**
739  * g_input_stream_read_bytes_finish:
740  * @stream: a #GInputStream.
741  * @result: a #GAsyncResult.
742  * @error: a #GError location to store the error occurring, or %NULL to
743  *   ignore.
744  *
745  * Finishes an asynchronous stream read-into-#GBytes operation.
746  *
747  * Returns: the newly-allocated #GBytes, or %NULL on error
748  **/
749 GBytes *
750 g_input_stream_read_bytes_finish (GInputStream  *stream,
751                                   GAsyncResult  *result,
752                                   GError       **error)
753 {
754   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
755   g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
756
757   return g_task_propagate_pointer (G_TASK (result), error);
758 }
759
760 /**
761  * g_input_stream_skip_async:
762  * @stream: A #GInputStream.
763  * @count: the number of bytes that will be skipped from the stream
764  * @io_priority: the [I/O priority][io-priority] of the request
765  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
766  * @callback: (scope async): callback to call when the request is satisfied
767  * @user_data: (closure): the data to pass to callback function
768  *
769  * Request an asynchronous skip of @count bytes from the stream.
770  * When the operation is finished @callback will be called.
771  * You can then call g_input_stream_skip_finish() to get the result
772  * of the operation.
773  *
774  * During an async request no other sync and async calls are allowed,
775  * and will result in %G_IO_ERROR_PENDING errors.
776  *
777  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
778  *
779  * On success, the number of bytes skipped will be passed to the callback.
780  * It is not an error if this is not the same as the requested size, as it
781  * can happen e.g. near the end of a file, but generally we try to skip
782  * as many bytes as requested. Zero is returned on end of file
783  * (or if @count is zero), but never otherwise.
784  *
785  * Any outstanding i/o request with higher priority (lower numerical value)
786  * will be executed before an outstanding request with lower priority.
787  * Default priority is %G_PRIORITY_DEFAULT.
788  *
789  * The asynchronous methods have a default fallback that uses threads to
790  * implement asynchronicity, so they are optional for inheriting classes.
791  * However, if you override one, you must override all.
792  **/
793 void
794 g_input_stream_skip_async (GInputStream        *stream,
795                            gsize                count,
796                            int                  io_priority,
797                            GCancellable        *cancellable,
798                            GAsyncReadyCallback  callback,
799                            gpointer             user_data)
800 {
801   GInputStreamClass *class;
802   GError *error = NULL;
803
804   g_return_if_fail (G_IS_INPUT_STREAM (stream));
805
806   if (count == 0)
807     {
808       GTask *task;
809
810       task = g_task_new (stream, cancellable, callback, user_data);
811       g_task_set_source_tag (task, g_input_stream_skip_async);
812       g_task_return_int (task, 0);
813       g_object_unref (task);
814       return;
815     }
816   
817   if (((gssize) count) < 0)
818     {
819       g_task_report_new_error (stream, callback, user_data,
820                                g_input_stream_skip_async,
821                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
822                                _("Too large count value passed to %s"),
823                                G_STRFUNC);
824       return;
825     }
826
827   if (!g_input_stream_set_pending (stream, &error))
828     {
829       g_task_report_error (stream, callback, user_data,
830                            g_input_stream_skip_async,
831                            error);
832       return;
833     }
834
835   class = G_INPUT_STREAM_GET_CLASS (stream);
836   stream->priv->outstanding_callback = callback;
837   g_object_ref (stream);
838   class->skip_async (stream, count, io_priority, cancellable,
839                      async_ready_callback_wrapper, user_data);
840 }
841
842 /**
843  * g_input_stream_skip_finish:
844  * @stream: a #GInputStream.
845  * @result: a #GAsyncResult.
846  * @error: a #GError location to store the error occurring, or %NULL to 
847  * ignore.
848  * 
849  * Finishes a stream skip operation.
850  * 
851  * Returns: the size of the bytes skipped, or %-1 on error.
852  **/
853 gssize
854 g_input_stream_skip_finish (GInputStream  *stream,
855                             GAsyncResult  *result,
856                             GError       **error)
857 {
858   GInputStreamClass *class;
859
860   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
861   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
862
863   if (g_async_result_legacy_propagate_error (result, error))
864     return -1;
865   else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
866     return g_task_propagate_int (G_TASK (result), error);
867
868   class = G_INPUT_STREAM_GET_CLASS (stream);
869   return class->skip_finish (stream, result, error);
870 }
871
872 /**
873  * g_input_stream_close_async:
874  * @stream: A #GInputStream.
875  * @io_priority: the [I/O priority][io-priority] of the request
876  * @cancellable: (allow-none): optional cancellable object
877  * @callback: (scope async): callback to call when the request is satisfied
878  * @user_data: (closure): the data to pass to callback function
879  *
880  * Requests an asynchronous closes of the stream, releasing resources related to it.
881  * When the operation is finished @callback will be called. 
882  * You can then call g_input_stream_close_finish() to get the result of the 
883  * operation.
884  *
885  * For behaviour details see g_input_stream_close().
886  *
887  * The asyncronous methods have a default fallback that uses threads to implement
888  * asynchronicity, so they are optional for inheriting classes. However, if you
889  * override one you must override all.
890  **/
891 void
892 g_input_stream_close_async (GInputStream        *stream,
893                             int                  io_priority,
894                             GCancellable        *cancellable,
895                             GAsyncReadyCallback  callback,
896                             gpointer             user_data)
897 {
898   GInputStreamClass *class;
899   GError *error = NULL;
900
901   g_return_if_fail (G_IS_INPUT_STREAM (stream));
902
903   if (stream->priv->closed)
904     {
905       GTask *task;
906
907       task = g_task_new (stream, cancellable, callback, user_data);
908       g_task_set_source_tag (task, g_input_stream_close_async);
909       g_task_return_boolean (task, TRUE);
910       g_object_unref (task);
911       return;
912     }
913
914   if (!g_input_stream_set_pending (stream, &error))
915     {
916       g_task_report_error (stream, callback, user_data,
917                            g_input_stream_close_async,
918                            error);
919       return;
920     }
921   
922   class = G_INPUT_STREAM_GET_CLASS (stream);
923   stream->priv->outstanding_callback = callback;
924   g_object_ref (stream);
925   class->close_async (stream, io_priority, cancellable,
926                       async_ready_close_callback_wrapper, user_data);
927 }
928
929 /**
930  * g_input_stream_close_finish:
931  * @stream: a #GInputStream.
932  * @result: a #GAsyncResult.
933  * @error: a #GError location to store the error occurring, or %NULL to 
934  * ignore.
935  * 
936  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
937  * 
938  * Returns: %TRUE if the stream was closed successfully.
939  **/
940 gboolean
941 g_input_stream_close_finish (GInputStream  *stream,
942                              GAsyncResult  *result,
943                              GError       **error)
944 {
945   GInputStreamClass *class;
946
947   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
948   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
949
950   if (g_async_result_legacy_propagate_error (result, error))
951     return FALSE;
952   else if (g_async_result_is_tagged (result, g_input_stream_close_async))
953     return g_task_propagate_boolean (G_TASK (result), error);
954
955   class = G_INPUT_STREAM_GET_CLASS (stream);
956   return class->close_finish (stream, result, error);
957 }
958
959 /**
960  * g_input_stream_is_closed:
961  * @stream: input stream.
962  * 
963  * Checks if an input stream is closed.
964  * 
965  * Returns: %TRUE if the stream is closed.
966  **/
967 gboolean
968 g_input_stream_is_closed (GInputStream *stream)
969 {
970   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
971   
972   return stream->priv->closed;
973 }
974  
975 /**
976  * g_input_stream_has_pending:
977  * @stream: input stream.
978  * 
979  * Checks if an input stream has pending actions.
980  * 
981  * Returns: %TRUE if @stream has pending actions.
982  **/  
983 gboolean
984 g_input_stream_has_pending (GInputStream *stream)
985 {
986   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
987   
988   return stream->priv->pending;
989 }
990
991 /**
992  * g_input_stream_set_pending:
993  * @stream: input stream
994  * @error: a #GError location to store the error occurring, or %NULL to 
995  * ignore.
996  * 
997  * Sets @stream to have actions pending. If the pending flag is
998  * already set or @stream is closed, it will return %FALSE and set
999  * @error.
1000  *
1001  * Returns: %TRUE if pending was previously unset and is now set.
1002  **/
1003 gboolean
1004 g_input_stream_set_pending (GInputStream *stream, GError **error)
1005 {
1006   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1007   
1008   if (stream->priv->closed)
1009     {
1010       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1011                            _("Stream is already closed"));
1012       return FALSE;
1013     }
1014   
1015   if (stream->priv->pending)
1016     {
1017       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1018                 /* Translators: This is an error you get if there is already an
1019                  * operation running against this stream when you try to start
1020                  * one */
1021                  _("Stream has outstanding operation"));
1022       return FALSE;
1023     }
1024   
1025   stream->priv->pending = TRUE;
1026   return TRUE;
1027 }
1028
1029 /**
1030  * g_input_stream_clear_pending:
1031  * @stream: input stream
1032  * 
1033  * Clears the pending flag on @stream.
1034  **/
1035 void
1036 g_input_stream_clear_pending (GInputStream *stream)
1037 {
1038   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1039   
1040   stream->priv->pending = FALSE;
1041 }
1042
1043 /**
1044  * g_input_stream_async_read_is_via_threads:
1045  * @stream: input stream
1046  *
1047  * Checks if an input stream's read_async function uses threads.
1048  *
1049  * Returns: %TRUE if @stream's read_async function uses threads.
1050  **/
1051 gboolean
1052 g_input_stream_async_read_is_via_threads (GInputStream *stream)
1053 {
1054   GInputStreamClass *class;
1055
1056   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1057
1058   class = G_INPUT_STREAM_GET_CLASS (stream);
1059
1060   return (class->read_async == g_input_stream_real_read_async &&
1061       !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1062         g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1063 }
1064
1065 /********************************************
1066  *   Default implementation of async ops    *
1067  ********************************************/
1068
1069 typedef struct {
1070   void   *buffer;
1071   gsize   count;
1072 } ReadData;
1073
1074 static void
1075 free_read_data (ReadData *op)
1076 {
1077   g_slice_free (ReadData, op);
1078 }
1079
1080 static void
1081 read_async_thread (GTask        *task,
1082                    gpointer      source_object,
1083                    gpointer      task_data,
1084                    GCancellable *cancellable)
1085 {
1086   GInputStream *stream = source_object;
1087   ReadData *op = task_data;
1088   GInputStreamClass *class;
1089   GError *error = NULL;
1090   gssize nread;
1091  
1092   class = G_INPUT_STREAM_GET_CLASS (stream);
1093
1094   nread = class->read_fn (stream,
1095                           op->buffer, op->count,
1096                           g_task_get_cancellable (task),
1097                           &error);
1098   if (nread == -1)
1099     g_task_return_error (task, error);
1100   else
1101     g_task_return_int (task, nread);
1102 }
1103
1104 static void read_async_pollable (GPollableInputStream *stream,
1105                                  GTask                *task);
1106
1107 static gboolean
1108 read_async_pollable_ready (GPollableInputStream *stream,
1109                            gpointer              user_data)
1110 {
1111   GTask *task = user_data;
1112
1113   read_async_pollable (stream, task);
1114   return FALSE;
1115 }
1116
1117 static void
1118 read_async_pollable (GPollableInputStream *stream,
1119                      GTask                *task)
1120 {
1121   ReadData *op = g_task_get_task_data (task);
1122   GError *error = NULL;
1123   gssize nread;
1124
1125   if (g_task_return_error_if_cancelled (task))
1126     return;
1127
1128   nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1129     read_nonblocking (stream, op->buffer, op->count, &error);
1130
1131   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1132     {
1133       GSource *source;
1134
1135       g_error_free (error);
1136
1137       source = g_pollable_input_stream_create_source (stream,
1138                                                       g_task_get_cancellable (task));
1139       g_task_attach_source (task, source,
1140                             (GSourceFunc) read_async_pollable_ready);
1141       g_source_unref (source);
1142       return;
1143     }
1144
1145   if (nread == -1)
1146     g_task_return_error (task, error);
1147   else
1148     g_task_return_int (task, nread);
1149   /* g_input_stream_real_read_async() unrefs task */
1150 }
1151
1152
1153 static void
1154 g_input_stream_real_read_async (GInputStream        *stream,
1155                                 void                *buffer,
1156                                 gsize                count,
1157                                 int                  io_priority,
1158                                 GCancellable        *cancellable,
1159                                 GAsyncReadyCallback  callback,
1160                                 gpointer             user_data)
1161 {
1162   GTask *task;
1163   ReadData *op;
1164   
1165   op = g_slice_new0 (ReadData);
1166   task = g_task_new (stream, cancellable, callback, user_data);
1167   g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1168   g_task_set_priority (task, io_priority);
1169   op->buffer = buffer;
1170   op->count = count;
1171
1172   if (!g_input_stream_async_read_is_via_threads (stream))
1173     read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1174   else
1175     g_task_run_in_thread (task, read_async_thread);
1176   g_object_unref (task);
1177 }
1178
1179 static gssize
1180 g_input_stream_real_read_finish (GInputStream  *stream,
1181                                  GAsyncResult  *result,
1182                                  GError       **error)
1183 {
1184   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1185
1186   return g_task_propagate_int (G_TASK (result), error);
1187 }
1188
1189
1190 static void
1191 skip_async_thread (GTask        *task,
1192                    gpointer      source_object,
1193                    gpointer      task_data,
1194                    GCancellable *cancellable)
1195 {
1196   GInputStream *stream = source_object;
1197   gsize count = GPOINTER_TO_SIZE (task_data);
1198   GInputStreamClass *class;
1199   GError *error = NULL;
1200   gssize ret;
1201
1202   class = G_INPUT_STREAM_GET_CLASS (stream);
1203   ret = class->skip (stream, count,
1204                      g_task_get_cancellable (task),
1205                      &error);
1206   if (ret == -1)
1207     g_task_return_error (task, error);
1208   else
1209     g_task_return_int (task, ret);
1210 }
1211
1212 typedef struct {
1213   char buffer[8192];
1214   gsize count;
1215   gsize count_skipped;
1216 } SkipFallbackAsyncData;
1217
1218 static void
1219 skip_callback_wrapper (GObject      *source_object,
1220                        GAsyncResult *res,
1221                        gpointer      user_data)
1222 {
1223   GInputStreamClass *class;
1224   GTask *task = user_data;
1225   SkipFallbackAsyncData *data = g_task_get_task_data (task);
1226   GError *error = NULL;
1227   gssize ret;
1228
1229   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1230
1231   if (ret > 0)
1232     {
1233       data->count -= ret;
1234       data->count_skipped += ret;
1235
1236       if (data->count > 0)
1237         {
1238           class = G_INPUT_STREAM_GET_CLASS (source_object);
1239           class->read_async (G_INPUT_STREAM (source_object),
1240                              data->buffer, MIN (8192, data->count),
1241                              g_task_get_priority (task),
1242                              g_task_get_cancellable (task),
1243                              skip_callback_wrapper, task);
1244           return;
1245         }
1246     }
1247
1248   if (ret == -1 &&
1249       g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1250       data->count_skipped)
1251     {
1252       /* No error, return partial read */
1253       g_clear_error (&error);
1254     }
1255
1256   if (error)
1257     g_task_return_error (task, error);
1258   else
1259     g_task_return_int (task, data->count_skipped);
1260   g_object_unref (task);
1261  }
1262
1263 static void
1264 g_input_stream_real_skip_async (GInputStream        *stream,
1265                                 gsize                count,
1266                                 int                  io_priority,
1267                                 GCancellable        *cancellable,
1268                                 GAsyncReadyCallback  callback,
1269                                 gpointer             user_data)
1270 {
1271   GInputStreamClass *class;
1272   SkipFallbackAsyncData *data;
1273   GTask *task;
1274
1275   class = G_INPUT_STREAM_GET_CLASS (stream);
1276
1277   task = g_task_new (stream, cancellable, callback, user_data);
1278   g_task_set_priority (task, io_priority);
1279
1280   if (g_input_stream_async_read_is_via_threads (stream))
1281     {
1282       /* Read is thread-using async fallback.
1283        * Make skip use threads too, so that we can use a possible sync skip
1284        * implementation. */
1285       g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1286
1287       g_task_run_in_thread (task, skip_async_thread);
1288       g_object_unref (task);
1289     }
1290   else
1291     {
1292       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1293       
1294       /* There is a custom async read function, lets use that. */
1295       data = g_new (SkipFallbackAsyncData, 1);
1296       data->count = count;
1297       data->count_skipped = 0;
1298       g_task_set_task_data (task, data, g_free);
1299       g_task_set_check_cancellable (task, FALSE);
1300       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1301                          skip_callback_wrapper, task);
1302     }
1303
1304 }
1305
1306 static gssize
1307 g_input_stream_real_skip_finish (GInputStream  *stream,
1308                                  GAsyncResult  *result,
1309                                  GError       **error)
1310 {
1311   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1312
1313   return g_task_propagate_int (G_TASK (result), error);
1314 }
1315
1316 static void
1317 close_async_thread (GTask        *task,
1318                     gpointer      source_object,
1319                     gpointer      task_data,
1320                     GCancellable *cancellable)
1321 {
1322   GInputStream *stream = source_object;
1323   GInputStreamClass *class;
1324   GError *error = NULL;
1325   gboolean result;
1326
1327   class = G_INPUT_STREAM_GET_CLASS (stream);
1328   if (class->close_fn)
1329     {
1330       result = class->close_fn (stream,
1331                                 g_task_get_cancellable (task),
1332                                 &error);
1333       if (!result)
1334         {
1335           g_task_return_error (task, error);
1336           return;
1337         }
1338     }
1339
1340   g_task_return_boolean (task, TRUE);
1341 }
1342
1343 static void
1344 g_input_stream_real_close_async (GInputStream        *stream,
1345                                  int                  io_priority,
1346                                  GCancellable        *cancellable,
1347                                  GAsyncReadyCallback  callback,
1348                                  gpointer             user_data)
1349 {
1350   GTask *task;
1351
1352   task = g_task_new (stream, cancellable, callback, user_data);
1353   g_task_set_check_cancellable (task, FALSE);
1354   g_task_set_priority (task, io_priority);
1355   
1356   g_task_run_in_thread (task, close_async_thread);
1357   g_object_unref (task);
1358 }
1359
1360 static gboolean
1361 g_input_stream_real_close_finish (GInputStream  *stream,
1362                                   GAsyncResult  *result,
1363                                   GError       **error)
1364 {
1365   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1366
1367   return g_task_propagate_boolean (G_TASK (result), error);
1368 }