gio: Use the new private instance data declaration
[platform/upstream/glib.git] / gio / ginputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include <glib.h>
25 #include "glibintl.h"
26
27 #include "ginputstream.h"
28 #include "gseekable.h"
29 #include "gcancellable.h"
30 #include "gasyncresult.h"
31 #include "gioerror.h"
32 #include "gpollableinputstream.h"
33
34 /**
35  * SECTION:ginputstream
36  * @short_description: Base class for implementing streaming input
37  * @include: gio/gio.h
38  *
39  * #GInputStream has functions to read from a stream (g_input_stream_read()),
40  * to close a stream (g_input_stream_close()) and to skip some content
41  * (g_input_stream_skip()). 
42  *
43  * To copy the content of an input stream to an output stream without 
44  * manually handling the reads and writes, use g_output_stream_splice(). 
45  *
46  * All of these functions have async variants too.
47  **/
48
49 struct _GInputStreamPrivate {
50   guint closed : 1;
51   guint pending : 1;
52   GAsyncReadyCallback outstanding_callback;
53 };
54
55 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
56
57 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
58                                                   gsize                 count,
59                                                   GCancellable         *cancellable,
60                                                   GError              **error);
61 static void     g_input_stream_real_read_async   (GInputStream         *stream,
62                                                   void                 *buffer,
63                                                   gsize                 count,
64                                                   int                   io_priority,
65                                                   GCancellable         *cancellable,
66                                                   GAsyncReadyCallback   callback,
67                                                   gpointer              user_data);
68 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
69                                                   GAsyncResult         *result,
70                                                   GError              **error);
71 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
72                                                   gsize                 count,
73                                                   int                   io_priority,
74                                                   GCancellable         *cancellable,
75                                                   GAsyncReadyCallback   callback,
76                                                   gpointer              data);
77 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
78                                                   GAsyncResult         *result,
79                                                   GError              **error);
80 static void     g_input_stream_real_close_async  (GInputStream         *stream,
81                                                   int                   io_priority,
82                                                   GCancellable         *cancellable,
83                                                   GAsyncReadyCallback   callback,
84                                                   gpointer              data);
85 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
86                                                   GAsyncResult         *result,
87                                                   GError              **error);
88
89 static void
90 g_input_stream_dispose (GObject *object)
91 {
92   GInputStream *stream;
93
94   stream = G_INPUT_STREAM (object);
95   
96   if (!stream->priv->closed)
97     g_input_stream_close (stream, NULL, NULL);
98
99   G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
100 }
101
102
103 static void
104 g_input_stream_class_init (GInputStreamClass *klass)
105 {
106   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
107   
108   gobject_class->dispose = g_input_stream_dispose;
109   
110   klass->skip = g_input_stream_real_skip;
111   klass->read_async = g_input_stream_real_read_async;
112   klass->read_finish = g_input_stream_real_read_finish;
113   klass->skip_async = g_input_stream_real_skip_async;
114   klass->skip_finish = g_input_stream_real_skip_finish;
115   klass->close_async = g_input_stream_real_close_async;
116   klass->close_finish = g_input_stream_real_close_finish;
117 }
118
119 static void
120 g_input_stream_init (GInputStream *stream)
121 {
122   stream->priv = g_input_stream_get_private (stream);
123 }
124
125 /**
126  * g_input_stream_read:
127  * @stream: a #GInputStream.
128  * @buffer: (array length=count) (element-type guint8): a buffer to
129  *     read data into (which should be at least count bytes long).
130  * @count: the number of bytes that will be read from the stream
131  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
132  * @error: location to store the error occurring, or %NULL to ignore
133  *
134  * Tries to read @count bytes from the stream into the buffer starting at
135  * @buffer. Will block during this read.
136  * 
137  * If count is zero returns zero and does nothing. A value of @count
138  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
139  *
140  * On success, the number of bytes read into the buffer is returned.
141  * It is not an error if this is not the same as the requested size, as it
142  * can happen e.g. near the end of a file. Zero is returned on end of file
143  * (or if @count is zero),  but never otherwise.
144  *
145  * If @cancellable is not %NULL, then the operation can be cancelled by
146  * triggering the cancellable object from another thread. If the operation
147  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
148  * operation was partially finished when the operation was cancelled the
149  * partial result will be returned, without an error.
150  *
151  * On error -1 is returned and @error is set accordingly.
152  * 
153  * Return value: Number of bytes read, or -1 on error, or 0 on end of file.
154  **/
155 gssize
156 g_input_stream_read  (GInputStream  *stream,
157                       void          *buffer,
158                       gsize          count,
159                       GCancellable  *cancellable,
160                       GError       **error)
161 {
162   GInputStreamClass *class;
163   gssize res;
164
165   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
166   g_return_val_if_fail (buffer != NULL, 0);
167
168   if (count == 0)
169     return 0;
170   
171   if (((gssize) count) < 0)
172     {
173       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
174                    _("Too large count value passed to %s"), G_STRFUNC);
175       return -1;
176     }
177
178   class = G_INPUT_STREAM_GET_CLASS (stream);
179
180   if (class->read_fn == NULL) 
181     {
182       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
183                            _("Input stream doesn't implement read"));
184       return -1;
185     }
186
187   if (!g_input_stream_set_pending (stream, error))
188     return -1;
189
190   if (cancellable)
191     g_cancellable_push_current (cancellable);
192   
193   res = class->read_fn (stream, buffer, count, cancellable, error);
194
195   if (cancellable)
196     g_cancellable_pop_current (cancellable);
197   
198   g_input_stream_clear_pending (stream);
199
200   return res;
201 }
202
203 /**
204  * g_input_stream_read_all:
205  * @stream: a #GInputStream.
206  * @buffer: (array length=count) (element-type guint8): a buffer to
207  *     read data into (which should be at least count bytes long).
208  * @count: the number of bytes that will be read from the stream
209  * @bytes_read: (out): location to store the number of bytes that was read from the stream
210  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
211  * @error: location to store the error occurring, or %NULL to ignore
212  *
213  * Tries to read @count bytes from the stream into the buffer starting at
214  * @buffer. Will block during this read.
215  *
216  * This function is similar to g_input_stream_read(), except it tries to
217  * read as many bytes as requested, only stopping on an error or end of stream.
218  *
219  * On a successful read of @count bytes, or if we reached the end of the
220  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
221  * read into @buffer.
222  * 
223  * If there is an error during the operation %FALSE is returned and @error
224  * is set to indicate the error status, @bytes_read is updated to contain
225  * the number of bytes read into @buffer before the error occurred.
226  *
227  * Return value: %TRUE on success, %FALSE if there was an error
228  **/
229 gboolean
230 g_input_stream_read_all (GInputStream  *stream,
231                          void          *buffer,
232                          gsize          count,
233                          gsize         *bytes_read,
234                          GCancellable  *cancellable,
235                          GError       **error)
236 {
237   gsize _bytes_read;
238   gssize res;
239
240   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
241   g_return_val_if_fail (buffer != NULL, FALSE);
242
243   _bytes_read = 0;
244   while (_bytes_read < count)
245     {
246       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
247                                  cancellable, error);
248       if (res == -1)
249         {
250           if (bytes_read)
251             *bytes_read = _bytes_read;
252           return FALSE;
253         }
254       
255       if (res == 0)
256         break;
257
258       _bytes_read += res;
259     }
260
261   if (bytes_read)
262     *bytes_read = _bytes_read;
263   return TRUE;
264 }
265
266 /**
267  * g_input_stream_read_bytes:
268  * @stream: a #GInputStream.
269  * @count: maximum number of bytes that will be read from the stream. Common
270  * values include 4096 and 8192.
271  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
272  * @error: location to store the error occurring, or %NULL to ignore
273  *
274  * Like g_input_stream_read(), this tries to read @count bytes from
275  * the stream in a blocking fashion. However, rather than reading into
276  * a user-supplied buffer, this will create a new #GBytes containing
277  * the data that was read. This may be easier to use from language
278  * bindings.
279  *
280  * If count is zero, returns a zero-length #GBytes and does nothing. A
281  * value of @count larger than %G_MAXSSIZE will cause a
282  * %G_IO_ERROR_INVALID_ARGUMENT error.
283  *
284  * On success, a new #GBytes is returned. It is not an error if the
285  * size of this object is not the same as the requested size, as it
286  * can happen e.g. near the end of a file. A zero-length #GBytes is
287  * returned on end of file (or if @count is zero), but never
288  * otherwise.
289  *
290  * If @cancellable is not %NULL, then the operation can be cancelled by
291  * triggering the cancellable object from another thread. If the operation
292  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
293  * operation was partially finished when the operation was cancelled the
294  * partial result will be returned, without an error.
295  *
296  * On error %NULL is returned and @error is set accordingly.
297  *
298  * Return value: a new #GBytes, or %NULL on error
299  **/
300 GBytes *
301 g_input_stream_read_bytes (GInputStream  *stream,
302                            gsize          count,
303                            GCancellable  *cancellable,
304                            GError       **error)
305 {
306   guchar *buf;
307   gssize nread;
308
309   buf = g_malloc (count);
310   nread = g_input_stream_read (stream, buf, count, cancellable, error);
311   if (nread == -1)
312     {
313       g_free (buf);
314       return NULL;
315     }
316   else if (nread == 0)
317     {
318       g_free (buf);
319       return g_bytes_new_static ("", 0);
320     }
321   else
322     return g_bytes_new_take (buf, nread);
323 }
324
325 /**
326  * g_input_stream_skip:
327  * @stream: a #GInputStream.
328  * @count: the number of bytes that will be skipped from the stream
329  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
330  * @error: location to store the error occurring, or %NULL to ignore
331  *
332  * Tries to skip @count bytes from the stream. Will block during the operation.
333  *
334  * This is identical to g_input_stream_read(), from a behaviour standpoint,
335  * but the bytes that are skipped are not returned to the user. Some
336  * streams have an implementation that is more efficient than reading the data.
337  *
338  * This function is optional for inherited classes, as the default implementation
339  * emulates it using read.
340  *
341  * If @cancellable is not %NULL, then the operation can be cancelled by
342  * triggering the cancellable object from another thread. If the operation
343  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
344  * operation was partially finished when the operation was cancelled the
345  * partial result will be returned, without an error.
346  *
347  * Return value: Number of bytes skipped, or -1 on error
348  **/
349 gssize
350 g_input_stream_skip (GInputStream  *stream,
351                      gsize          count,
352                      GCancellable  *cancellable,
353                      GError       **error)
354 {
355   GInputStreamClass *class;
356   gssize res;
357
358   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
359
360   if (count == 0)
361     return 0;
362
363   if (((gssize) count) < 0)
364     {
365       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
366                    _("Too large count value passed to %s"), G_STRFUNC);
367       return -1;
368     }
369   
370   class = G_INPUT_STREAM_GET_CLASS (stream);
371
372   if (!g_input_stream_set_pending (stream, error))
373     return -1;
374
375   if (cancellable)
376     g_cancellable_push_current (cancellable);
377   
378   res = class->skip (stream, count, cancellable, error);
379
380   if (cancellable)
381     g_cancellable_pop_current (cancellable);
382   
383   g_input_stream_clear_pending (stream);
384
385   return res;
386 }
387
388 static gssize
389 g_input_stream_real_skip (GInputStream  *stream,
390                           gsize          count,
391                           GCancellable  *cancellable,
392                           GError       **error)
393 {
394   GInputStreamClass *class;
395   gssize ret, read_bytes;
396   char buffer[8192];
397   GError *my_error;
398
399   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
400     {
401       if (g_seekable_seek (G_SEEKABLE (stream),
402                            count,
403                            G_SEEK_CUR,
404                            cancellable,
405                            NULL))
406         return count;
407     }
408
409   /* If not seekable, or seek failed, fall back to reading data: */
410
411   class = G_INPUT_STREAM_GET_CLASS (stream);
412
413   read_bytes = 0;
414   while (1)
415     {
416       my_error = NULL;
417
418       ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
419                             cancellable, &my_error);
420       if (ret == -1)
421         {
422           if (read_bytes > 0 &&
423               my_error->domain == G_IO_ERROR &&
424               my_error->code == G_IO_ERROR_CANCELLED)
425             {
426               g_error_free (my_error);
427               return read_bytes;
428             }
429
430           g_propagate_error (error, my_error);
431           return -1;
432         }
433
434       count -= ret;
435       read_bytes += ret;
436
437       if (ret == 0 || count == 0)
438         return read_bytes;
439     }
440 }
441
442 /**
443  * g_input_stream_close:
444  * @stream: A #GInputStream.
445  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
446  * @error: location to store the error occurring, or %NULL to ignore
447  *
448  * Closes the stream, releasing resources related to it.
449  *
450  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
451  * Closing a stream multiple times will not return an error.
452  *
453  * Streams will be automatically closed when the last reference
454  * is dropped, but you might want to call this function to make sure 
455  * resources are released as early as possible.
456  *
457  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
458  * open after the stream is closed. See the documentation for the individual
459  * stream for details.
460  *
461  * On failure the first error that happened will be reported, but the close
462  * operation will finish as much as possible. A stream that failed to
463  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
464  * is important to check and report the error to the user.
465  *
466  * If @cancellable is not %NULL, then the operation can be cancelled by
467  * triggering the cancellable object from another thread. If the operation
468  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
469  * Cancelling a close will still leave the stream closed, but some streams
470  * can use a faster close that doesn't block to e.g. check errors. 
471  *
472  * Return value: %TRUE on success, %FALSE on failure
473  **/
474 gboolean
475 g_input_stream_close (GInputStream  *stream,
476                       GCancellable  *cancellable,
477                       GError       **error)
478 {
479   GInputStreamClass *class;
480   gboolean res;
481
482   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
483
484   class = G_INPUT_STREAM_GET_CLASS (stream);
485
486   if (stream->priv->closed)
487     return TRUE;
488
489   res = TRUE;
490
491   if (!g_input_stream_set_pending (stream, error))
492     return FALSE;
493
494   if (cancellable)
495     g_cancellable_push_current (cancellable);
496
497   if (class->close_fn)
498     res = class->close_fn (stream, cancellable, error);
499
500   if (cancellable)
501     g_cancellable_pop_current (cancellable);
502
503   g_input_stream_clear_pending (stream);
504   
505   stream->priv->closed = TRUE;
506   
507   return res;
508 }
509
510 static void
511 async_ready_callback_wrapper (GObject      *source_object,
512                               GAsyncResult *res,
513                               gpointer      user_data)
514 {
515   GInputStream *stream = G_INPUT_STREAM (source_object);
516
517   g_input_stream_clear_pending (stream);
518   if (stream->priv->outstanding_callback)
519     (*stream->priv->outstanding_callback) (source_object, res, user_data);
520   g_object_unref (stream);
521 }
522
523 static void
524 async_ready_close_callback_wrapper (GObject      *source_object,
525                                     GAsyncResult *res,
526                                     gpointer      user_data)
527 {
528   GInputStream *stream = G_INPUT_STREAM (source_object);
529
530   g_input_stream_clear_pending (stream);
531   stream->priv->closed = TRUE;
532   if (stream->priv->outstanding_callback)
533     (*stream->priv->outstanding_callback) (source_object, res, user_data);
534   g_object_unref (stream);
535 }
536
537 /**
538  * g_input_stream_read_async:
539  * @stream: A #GInputStream.
540  * @buffer: (array length=count) (element-type guint8): a buffer to
541  *     read data into (which should be at least count bytes long).
542  * @count: the number of bytes that will be read from the stream
543  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
544  * of the request. 
545  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
546  * @callback: (scope async): callback to call when the request is satisfied
547  * @user_data: (closure): the data to pass to callback function
548  *
549  * Request an asynchronous read of @count bytes from the stream into the buffer
550  * starting at @buffer. When the operation is finished @callback will be called. 
551  * You can then call g_input_stream_read_finish() to get the result of the 
552  * operation.
553  *
554  * During an async request no other sync and async calls are allowed on @stream, and will
555  * result in %G_IO_ERROR_PENDING errors. 
556  *
557  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
558  *
559  * On success, the number of bytes read into the buffer will be passed to the
560  * callback. It is not an error if this is not the same as the requested size, as it
561  * can happen e.g. near the end of a file, but generally we try to read
562  * as many bytes as requested. Zero is returned on end of file
563  * (or if @count is zero),  but never otherwise.
564  *
565  * Any outstanding i/o request with higher priority (lower numerical value) will
566  * be executed before an outstanding request with lower priority. Default
567  * priority is %G_PRIORITY_DEFAULT.
568  *
569  * The asyncronous methods have a default fallback that uses threads to implement
570  * asynchronicity, so they are optional for inheriting classes. However, if you
571  * override one you must override all.
572  **/
573 void
574 g_input_stream_read_async (GInputStream        *stream,
575                            void                *buffer,
576                            gsize                count,
577                            int                  io_priority,
578                            GCancellable        *cancellable,
579                            GAsyncReadyCallback  callback,
580                            gpointer             user_data)
581 {
582   GInputStreamClass *class;
583   GError *error = NULL;
584
585   g_return_if_fail (G_IS_INPUT_STREAM (stream));
586   g_return_if_fail (buffer != NULL);
587
588   if (count == 0)
589     {
590       GTask *task;
591
592       task = g_task_new (stream, cancellable, callback, user_data);
593       g_task_set_source_tag (task, g_input_stream_read_async);
594       g_task_return_int (task, 0);
595       g_object_unref (task);
596       return;
597     }
598   
599   if (((gssize) count) < 0)
600     {
601       g_task_report_new_error (stream, callback, user_data,
602                                g_input_stream_read_async,
603                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
604                                _("Too large count value passed to %s"),
605                                G_STRFUNC);
606       return;
607     }
608
609   if (!g_input_stream_set_pending (stream, &error))
610     {
611       g_task_report_error (stream, callback, user_data,
612                            g_input_stream_read_async,
613                            error);
614       return;
615     }
616
617   class = G_INPUT_STREAM_GET_CLASS (stream);
618   stream->priv->outstanding_callback = callback;
619   g_object_ref (stream);
620   class->read_async (stream, buffer, count, io_priority, cancellable,
621                      async_ready_callback_wrapper, user_data);
622 }
623
624 /**
625  * g_input_stream_read_finish:
626  * @stream: a #GInputStream.
627  * @result: a #GAsyncResult.
628  * @error: a #GError location to store the error occurring, or %NULL to 
629  * ignore.
630  * 
631  * Finishes an asynchronous stream read operation. 
632  * 
633  * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
634  **/
635 gssize
636 g_input_stream_read_finish (GInputStream  *stream,
637                             GAsyncResult  *result,
638                             GError       **error)
639 {
640   GInputStreamClass *class;
641   
642   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
643   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
644
645   if (g_async_result_legacy_propagate_error (result, error))
646     return -1;
647   else if (g_async_result_is_tagged (result, g_input_stream_read_async))
648     return g_task_propagate_int (G_TASK (result), error);
649
650   class = G_INPUT_STREAM_GET_CLASS (stream);
651   return class->read_finish (stream, result, error);
652 }
653
654 static void
655 read_bytes_callback (GObject      *stream,
656                      GAsyncResult *result,
657                      gpointer      user_data)
658 {
659   GTask *task = user_data;
660   guchar *buf = g_task_get_task_data (task);
661   GError *error = NULL;
662   gssize nread;
663   GBytes *bytes = NULL;
664
665   nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
666                                       result, &error);
667   if (nread == -1)
668     {
669       g_free (buf);
670       g_task_return_error (task, error);
671     }
672   else if (nread == 0)
673     {
674       g_free (buf);
675       bytes = g_bytes_new_static ("", 0);
676     }
677   else
678     bytes = g_bytes_new_take (buf, nread);
679
680   if (bytes)
681     g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
682
683   g_object_unref (task);
684 }
685
686 /**
687  * g_input_stream_read_bytes_async:
688  * @stream: A #GInputStream.
689  * @count: the number of bytes that will be read from the stream
690  * @io_priority: the <link linkend="io-priority">I/O priority</link>
691  *   of the request.
692  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
693  * @callback: (scope async): callback to call when the request is satisfied
694  * @user_data: (closure): the data to pass to callback function
695  *
696  * Request an asynchronous read of @count bytes from the stream into a
697  * new #GBytes. When the operation is finished @callback will be
698  * called. You can then call g_input_stream_read_bytes_finish() to get the
699  * result of the operation.
700  *
701  * During an async request no other sync and async calls are allowed
702  * on @stream, and will result in %G_IO_ERROR_PENDING errors.
703  *
704  * A value of @count larger than %G_MAXSSIZE will cause a
705  * %G_IO_ERROR_INVALID_ARGUMENT error.
706  *
707  * On success, the new #GBytes will be passed to the callback. It is
708  * not an error if this is smaller than the requested size, as it can
709  * happen e.g. near the end of a file, but generally we try to read as
710  * many bytes as requested. Zero is returned on end of file (or if
711  * @count is zero), but never otherwise.
712  *
713  * Any outstanding I/O request with higher priority (lower numerical
714  * value) will be executed before an outstanding request with lower
715  * priority. Default priority is %G_PRIORITY_DEFAULT.
716  **/
717 void
718 g_input_stream_read_bytes_async (GInputStream          *stream,
719                                  gsize                  count,
720                                  int                    io_priority,
721                                  GCancellable          *cancellable,
722                                  GAsyncReadyCallback    callback,
723                                  gpointer               user_data)
724 {
725   GTask *task;
726   guchar *buf;
727
728   task = g_task_new (stream, cancellable, callback, user_data);
729   buf = g_malloc (count);
730   g_task_set_task_data (task, buf, NULL);
731
732   g_input_stream_read_async (stream, buf, count,
733                              io_priority, cancellable,
734                              read_bytes_callback, task);
735 }
736
737 /**
738  * g_input_stream_read_bytes_finish:
739  * @stream: a #GInputStream.
740  * @result: a #GAsyncResult.
741  * @error: a #GError location to store the error occurring, or %NULL to
742  *   ignore.
743  *
744  * Finishes an asynchronous stream read-into-#GBytes operation.
745  *
746  * Returns: the newly-allocated #GBytes, or %NULL on error
747  **/
748 GBytes *
749 g_input_stream_read_bytes_finish (GInputStream  *stream,
750                                   GAsyncResult  *result,
751                                   GError       **error)
752 {
753   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
754   g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
755
756   return g_task_propagate_pointer (G_TASK (result), error);
757 }
758
759 /**
760  * g_input_stream_skip_async:
761  * @stream: A #GInputStream.
762  * @count: the number of bytes that will be skipped from the stream
763  * @io_priority: the <link linkend="io-priority">I/O priority</link>
764  * of the request.
765  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
766  * @callback: (scope async): callback to call when the request is satisfied
767  * @user_data: (closure): the data to pass to callback function
768  *
769  * Request an asynchronous skip of @count bytes from the stream.
770  * When the operation is finished @callback will be called.
771  * You can then call g_input_stream_skip_finish() to get the result
772  * of the operation.
773  *
774  * During an async request no other sync and async calls are allowed,
775  * and will result in %G_IO_ERROR_PENDING errors.
776  *
777  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
778  *
779  * On success, the number of bytes skipped will be passed to the callback.
780  * It is not an error if this is not the same as the requested size, as it
781  * can happen e.g. near the end of a file, but generally we try to skip
782  * as many bytes as requested. Zero is returned on end of file
783  * (or if @count is zero), but never otherwise.
784  *
785  * Any outstanding i/o request with higher priority (lower numerical value)
786  * will be executed before an outstanding request with lower priority.
787  * Default priority is %G_PRIORITY_DEFAULT.
788  *
789  * The asynchronous methods have a default fallback that uses threads to
790  * implement asynchronicity, so they are optional for inheriting classes.
791  * However, if you override one, you must override all.
792  **/
793 void
794 g_input_stream_skip_async (GInputStream        *stream,
795                            gsize                count,
796                            int                  io_priority,
797                            GCancellable        *cancellable,
798                            GAsyncReadyCallback  callback,
799                            gpointer             user_data)
800 {
801   GInputStreamClass *class;
802   GError *error = NULL;
803
804   g_return_if_fail (G_IS_INPUT_STREAM (stream));
805
806   if (count == 0)
807     {
808       GTask *task;
809
810       task = g_task_new (stream, cancellable, callback, user_data);
811       g_task_set_source_tag (task, g_input_stream_skip_async);
812       g_task_return_int (task, 0);
813       g_object_unref (task);
814       return;
815     }
816   
817   if (((gssize) count) < 0)
818     {
819       g_task_report_new_error (stream, callback, user_data,
820                                g_input_stream_skip_async,
821                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
822                                _("Too large count value passed to %s"),
823                                G_STRFUNC);
824       return;
825     }
826
827   if (!g_input_stream_set_pending (stream, &error))
828     {
829       g_task_report_error (stream, callback, user_data,
830                            g_input_stream_skip_async,
831                            error);
832       return;
833     }
834
835   class = G_INPUT_STREAM_GET_CLASS (stream);
836   stream->priv->outstanding_callback = callback;
837   g_object_ref (stream);
838   class->skip_async (stream, count, io_priority, cancellable,
839                      async_ready_callback_wrapper, user_data);
840 }
841
842 /**
843  * g_input_stream_skip_finish:
844  * @stream: a #GInputStream.
845  * @result: a #GAsyncResult.
846  * @error: a #GError location to store the error occurring, or %NULL to 
847  * ignore.
848  * 
849  * Finishes a stream skip operation.
850  * 
851  * Returns: the size of the bytes skipped, or %-1 on error.
852  **/
853 gssize
854 g_input_stream_skip_finish (GInputStream  *stream,
855                             GAsyncResult  *result,
856                             GError       **error)
857 {
858   GInputStreamClass *class;
859
860   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
861   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
862
863   if (g_async_result_legacy_propagate_error (result, error))
864     return -1;
865   else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
866     return g_task_propagate_int (G_TASK (result), error);
867
868   class = G_INPUT_STREAM_GET_CLASS (stream);
869   return class->skip_finish (stream, result, error);
870 }
871
872 /**
873  * g_input_stream_close_async:
874  * @stream: A #GInputStream.
875  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
876  * of the request. 
877  * @cancellable: (allow-none): optional cancellable object
878  * @callback: (scope async): callback to call when the request is satisfied
879  * @user_data: (closure): the data to pass to callback function
880  *
881  * Requests an asynchronous closes of the stream, releasing resources related to it.
882  * When the operation is finished @callback will be called. 
883  * You can then call g_input_stream_close_finish() to get the result of the 
884  * operation.
885  *
886  * For behaviour details see g_input_stream_close().
887  *
888  * The asyncronous methods have a default fallback that uses threads to implement
889  * asynchronicity, so they are optional for inheriting classes. However, if you
890  * override one you must override all.
891  **/
892 void
893 g_input_stream_close_async (GInputStream        *stream,
894                             int                  io_priority,
895                             GCancellable        *cancellable,
896                             GAsyncReadyCallback  callback,
897                             gpointer             user_data)
898 {
899   GInputStreamClass *class;
900   GError *error = NULL;
901
902   g_return_if_fail (G_IS_INPUT_STREAM (stream));
903
904   if (stream->priv->closed)
905     {
906       GTask *task;
907
908       task = g_task_new (stream, cancellable, callback, user_data);
909       g_task_set_source_tag (task, g_input_stream_close_async);
910       g_task_return_boolean (task, TRUE);
911       g_object_unref (task);
912       return;
913     }
914
915   if (!g_input_stream_set_pending (stream, &error))
916     {
917       g_task_report_error (stream, callback, user_data,
918                            g_input_stream_close_async,
919                            error);
920       return;
921     }
922   
923   class = G_INPUT_STREAM_GET_CLASS (stream);
924   stream->priv->outstanding_callback = callback;
925   g_object_ref (stream);
926   class->close_async (stream, io_priority, cancellable,
927                       async_ready_close_callback_wrapper, user_data);
928 }
929
930 /**
931  * g_input_stream_close_finish:
932  * @stream: a #GInputStream.
933  * @result: a #GAsyncResult.
934  * @error: a #GError location to store the error occurring, or %NULL to 
935  * ignore.
936  * 
937  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
938  * 
939  * Returns: %TRUE if the stream was closed successfully.
940  **/
941 gboolean
942 g_input_stream_close_finish (GInputStream  *stream,
943                              GAsyncResult  *result,
944                              GError       **error)
945 {
946   GInputStreamClass *class;
947
948   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
949   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
950
951   if (g_async_result_legacy_propagate_error (result, error))
952     return FALSE;
953   else if (g_async_result_is_tagged (result, g_input_stream_close_async))
954     return g_task_propagate_boolean (G_TASK (result), error);
955
956   class = G_INPUT_STREAM_GET_CLASS (stream);
957   return class->close_finish (stream, result, error);
958 }
959
960 /**
961  * g_input_stream_is_closed:
962  * @stream: input stream.
963  * 
964  * Checks if an input stream is closed.
965  * 
966  * Returns: %TRUE if the stream is closed.
967  **/
968 gboolean
969 g_input_stream_is_closed (GInputStream *stream)
970 {
971   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
972   
973   return stream->priv->closed;
974 }
975  
976 /**
977  * g_input_stream_has_pending:
978  * @stream: input stream.
979  * 
980  * Checks if an input stream has pending actions.
981  * 
982  * Returns: %TRUE if @stream has pending actions.
983  **/  
984 gboolean
985 g_input_stream_has_pending (GInputStream *stream)
986 {
987   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
988   
989   return stream->priv->pending;
990 }
991
992 /**
993  * g_input_stream_set_pending:
994  * @stream: input stream
995  * @error: a #GError location to store the error occurring, or %NULL to 
996  * ignore.
997  * 
998  * Sets @stream to have actions pending. If the pending flag is
999  * already set or @stream is closed, it will return %FALSE and set
1000  * @error.
1001  *
1002  * Return value: %TRUE if pending was previously unset and is now set.
1003  **/
1004 gboolean
1005 g_input_stream_set_pending (GInputStream *stream, GError **error)
1006 {
1007   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1008   
1009   if (stream->priv->closed)
1010     {
1011       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1012                            _("Stream is already closed"));
1013       return FALSE;
1014     }
1015   
1016   if (stream->priv->pending)
1017     {
1018       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1019                 /* Translators: This is an error you get if there is already an
1020                  * operation running against this stream when you try to start
1021                  * one */
1022                  _("Stream has outstanding operation"));
1023       return FALSE;
1024     }
1025   
1026   stream->priv->pending = TRUE;
1027   return TRUE;
1028 }
1029
1030 /**
1031  * g_input_stream_clear_pending:
1032  * @stream: input stream
1033  * 
1034  * Clears the pending flag on @stream.
1035  **/
1036 void
1037 g_input_stream_clear_pending (GInputStream *stream)
1038 {
1039   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1040   
1041   stream->priv->pending = FALSE;
1042 }
1043
1044 /********************************************
1045  *   Default implementation of async ops    *
1046  ********************************************/
1047
1048 typedef struct {
1049   void   *buffer;
1050   gsize   count;
1051 } ReadData;
1052
1053 static void
1054 free_read_data (ReadData *op)
1055 {
1056   g_slice_free (ReadData, op);
1057 }
1058
1059 static void
1060 read_async_thread (GTask        *task,
1061                    gpointer      source_object,
1062                    gpointer      task_data,
1063                    GCancellable *cancellable)
1064 {
1065   GInputStream *stream = source_object;
1066   ReadData *op = task_data;
1067   GInputStreamClass *class;
1068   GError *error = NULL;
1069   gssize nread;
1070  
1071   class = G_INPUT_STREAM_GET_CLASS (stream);
1072
1073   nread = class->read_fn (stream,
1074                           op->buffer, op->count,
1075                           g_task_get_cancellable (task),
1076                           &error);
1077   if (nread == -1)
1078     g_task_return_error (task, error);
1079   else
1080     g_task_return_int (task, nread);
1081 }
1082
1083 static void read_async_pollable (GPollableInputStream *stream,
1084                                  GTask                *task);
1085
1086 static gboolean
1087 read_async_pollable_ready (GPollableInputStream *stream,
1088                            gpointer              user_data)
1089 {
1090   GTask *task = user_data;
1091
1092   read_async_pollable (stream, task);
1093   return FALSE;
1094 }
1095
1096 static void
1097 read_async_pollable (GPollableInputStream *stream,
1098                      GTask                *task)
1099 {
1100   ReadData *op = g_task_get_task_data (task);
1101   GError *error = NULL;
1102   gssize nread;
1103
1104   if (g_task_return_error_if_cancelled (task))
1105     return;
1106
1107   nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1108     read_nonblocking (stream, op->buffer, op->count, &error);
1109
1110   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1111     {
1112       GSource *source;
1113
1114       g_error_free (error);
1115
1116       source = g_pollable_input_stream_create_source (stream,
1117                                                       g_task_get_cancellable (task));
1118       g_task_attach_source (task, source,
1119                             (GSourceFunc) read_async_pollable_ready);
1120       g_source_unref (source);
1121       return;
1122     }
1123
1124   if (nread == -1)
1125     g_task_return_error (task, error);
1126   else
1127     g_task_return_int (task, nread);
1128   /* g_input_stream_real_read_async() unrefs task */
1129 }
1130
1131 #define CAN_DO_NONBLOCKING_READS(stream) \
1132   (G_IS_POLLABLE_INPUT_STREAM (stream) && \
1133    g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream)))
1134
1135
1136 static void
1137 g_input_stream_real_read_async (GInputStream        *stream,
1138                                 void                *buffer,
1139                                 gsize                count,
1140                                 int                  io_priority,
1141                                 GCancellable        *cancellable,
1142                                 GAsyncReadyCallback  callback,
1143                                 gpointer             user_data)
1144 {
1145   GTask *task;
1146   ReadData *op;
1147   
1148   op = g_slice_new0 (ReadData);
1149   task = g_task_new (stream, cancellable, callback, user_data);
1150   g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1151   g_task_set_priority (task, io_priority);
1152   op->buffer = buffer;
1153   op->count = count;
1154
1155   if (CAN_DO_NONBLOCKING_READS (stream))
1156     read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1157   else
1158     g_task_run_in_thread (task, read_async_thread);
1159   g_object_unref (task);
1160 }
1161
1162 static gssize
1163 g_input_stream_real_read_finish (GInputStream  *stream,
1164                                  GAsyncResult  *result,
1165                                  GError       **error)
1166 {
1167   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1168
1169   return g_task_propagate_int (G_TASK (result), error);
1170 }
1171
1172
1173 static void
1174 skip_async_thread (GTask        *task,
1175                    gpointer      source_object,
1176                    gpointer      task_data,
1177                    GCancellable *cancellable)
1178 {
1179   GInputStream *stream = source_object;
1180   gsize count = GPOINTER_TO_SIZE (task_data);
1181   GInputStreamClass *class;
1182   GError *error = NULL;
1183   gssize ret;
1184
1185   class = G_INPUT_STREAM_GET_CLASS (stream);
1186   ret = class->skip (stream, count,
1187                      g_task_get_cancellable (task),
1188                      &error);
1189   if (ret == -1)
1190     g_task_return_error (task, error);
1191   else
1192     g_task_return_int (task, ret);
1193 }
1194
1195 typedef struct {
1196   char buffer[8192];
1197   gsize count;
1198   gsize count_skipped;
1199 } SkipFallbackAsyncData;
1200
1201 static void
1202 skip_callback_wrapper (GObject      *source_object,
1203                        GAsyncResult *res,
1204                        gpointer      user_data)
1205 {
1206   GInputStreamClass *class;
1207   GTask *task = user_data;
1208   SkipFallbackAsyncData *data = g_task_get_task_data (task);
1209   GError *error = NULL;
1210   gssize ret;
1211
1212   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1213
1214   if (ret > 0)
1215     {
1216       data->count -= ret;
1217       data->count_skipped += ret;
1218
1219       if (data->count > 0)
1220         {
1221           class = G_INPUT_STREAM_GET_CLASS (source_object);
1222           class->read_async (G_INPUT_STREAM (source_object),
1223                              data->buffer, MIN (8192, data->count),
1224                              g_task_get_priority (task),
1225                              g_task_get_cancellable (task),
1226                              skip_callback_wrapper, task);
1227           return;
1228         }
1229     }
1230
1231   if (ret == -1 &&
1232       g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1233       data->count_skipped)
1234     {
1235       /* No error, return partial read */
1236       g_clear_error (&error);
1237     }
1238
1239   if (error)
1240     g_task_return_error (task, error);
1241   else
1242     g_task_return_int (task, data->count_skipped);
1243   g_object_unref (task);
1244  }
1245
1246 static void
1247 g_input_stream_real_skip_async (GInputStream        *stream,
1248                                 gsize                count,
1249                                 int                  io_priority,
1250                                 GCancellable        *cancellable,
1251                                 GAsyncReadyCallback  callback,
1252                                 gpointer             user_data)
1253 {
1254   GInputStreamClass *class;
1255   SkipFallbackAsyncData *data;
1256   GTask *task;
1257
1258   class = G_INPUT_STREAM_GET_CLASS (stream);
1259
1260   task = g_task_new (stream, cancellable, callback, user_data);
1261   g_task_set_priority (task, io_priority);
1262
1263   if (class->read_async == g_input_stream_real_read_async &&
1264       !CAN_DO_NONBLOCKING_READS (stream))
1265     {
1266       /* Read is thread-using async fallback.
1267        * Make skip use threads too, so that we can use a possible sync skip
1268        * implementation. */
1269       g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1270
1271       g_task_run_in_thread (task, skip_async_thread);
1272       g_object_unref (task);
1273     }
1274   else
1275     {
1276       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1277       
1278       /* There is a custom async read function, lets use that. */
1279       data = g_new (SkipFallbackAsyncData, 1);
1280       data->count = count;
1281       data->count_skipped = 0;
1282       g_task_set_task_data (task, data, g_free);
1283       g_task_set_check_cancellable (task, FALSE);
1284       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1285                          skip_callback_wrapper, task);
1286     }
1287
1288 }
1289
1290 static gssize
1291 g_input_stream_real_skip_finish (GInputStream  *stream,
1292                                  GAsyncResult  *result,
1293                                  GError       **error)
1294 {
1295   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1296
1297   return g_task_propagate_int (G_TASK (result), error);
1298 }
1299
1300 static void
1301 close_async_thread (GTask        *task,
1302                     gpointer      source_object,
1303                     gpointer      task_data,
1304                     GCancellable *cancellable)
1305 {
1306   GInputStream *stream = source_object;
1307   GInputStreamClass *class;
1308   GError *error = NULL;
1309   gboolean result;
1310
1311   class = G_INPUT_STREAM_GET_CLASS (stream);
1312   if (class->close_fn)
1313     {
1314       result = class->close_fn (stream,
1315                                 g_task_get_cancellable (task),
1316                                 &error);
1317       if (!result)
1318         {
1319           g_task_return_error (task, error);
1320           return;
1321         }
1322     }
1323
1324   g_task_return_boolean (task, TRUE);
1325 }
1326
1327 static void
1328 g_input_stream_real_close_async (GInputStream        *stream,
1329                                  int                  io_priority,
1330                                  GCancellable        *cancellable,
1331                                  GAsyncReadyCallback  callback,
1332                                  gpointer             user_data)
1333 {
1334   GTask *task;
1335
1336   task = g_task_new (stream, cancellable, callback, user_data);
1337   g_task_set_check_cancellable (task, FALSE);
1338   g_task_set_priority (task, io_priority);
1339   
1340   g_task_run_in_thread (task, close_async_thread);
1341   g_object_unref (task);
1342 }
1343
1344 static gboolean
1345 g_input_stream_real_close_finish (GInputStream  *stream,
1346                                   GAsyncResult  *result,
1347                                   GError       **error)
1348 {
1349   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1350
1351   return g_task_propagate_boolean (G_TASK (result), error);
1352 }