gio: implement GPollableInput/OutputStream in more stream types
[platform/upstream/glib.git] / gio / gconverterinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright (C) 2009 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <string.h>
26
27 #include "gconverterinputstream.h"
28 #include "gpollableinputstream.h"
29 #include "gsimpleasyncresult.h"
30 #include "gcancellable.h"
31 #include "gioenumtypes.h"
32 #include "gioerror.h"
33 #include "glibintl.h"
34
35
36 /**
37  * SECTION:gconverterinputstream
38  * @short_description: Converter Input Stream
39  * @include: gio/gio.h
40  * @see_also: #GInputStream, #GConverter
41  *
42  * Converter input stream implements #GInputStream and allows
43  * conversion of data of various types during reading.
44  *
45  * As of GLib 2.34, #GConverterInputStream implements
46  * #GPollableInputStream.
47  **/
48
49 #define INITIAL_BUFFER_SIZE 4096
50
/* A simple byte buffer. The bytes currently held are those in
 * data[start .. end); the helpers below derive the amount of held
 * data as (end - start) and the free tail space as (size - end). */
typedef struct {
  char *data;   /* backing storage (allocated by grow_buffer()) */
  gsize start;  /* offset of the first held byte */
  gsize end;    /* offset one past the last held byte */
  gsize size;   /* capacity of data, in bytes */
} Buffer;
57
/* Per-instance private state of a GConverterInputStream. */
struct _GConverterInputStreamPrivate {
  gboolean at_input_end;    /* set once a read from the base stream returned 0 bytes */
  gboolean finished;        /* set once the converter reported G_CONVERTER_FINISHED */
  gboolean need_input;      /* set when a refill of the input buffer failed; cleared
                             * when a refill delivers data; checked by is_readable */
  GConverter *converter;    /* the "converter" property; unreffed in finalize */
  Buffer input_buffer;      /* data read from the base stream, not yet converted */
  Buffer converted_buffer;  /* converted data not yet handed to the caller */
};
66
/* Property identifiers used by set_property/get_property. */
enum {
  PROP_0,
  PROP_CONVERTER  /* the "converter" property installed in class_init */
};
71
/* GObject and GInputStream virtual method implementations. */
static void   g_converter_input_stream_set_property (GObject       *object,
                                                     guint          prop_id,
                                                     const GValue  *value,
                                                     GParamSpec    *pspec);
static void   g_converter_input_stream_get_property (GObject       *object,
                                                     guint          prop_id,
                                                     GValue        *value,
                                                     GParamSpec    *pspec);
static void   g_converter_input_stream_finalize     (GObject       *object);
static gssize g_converter_input_stream_read         (GInputStream  *stream,
                                                     void          *buffer,
                                                     gsize          count,
                                                     GCancellable  *cancellable,
                                                     GError       **error);

/* GPollableInputStream interface implementations. */
static gboolean g_converter_input_stream_can_poll         (GPollableInputStream *stream);
static gboolean g_converter_input_stream_is_readable      (GPollableInputStream *stream);
static gssize   g_converter_input_stream_read_nonblocking (GPollableInputStream  *stream,
                                                           void                  *buffer,
                                                           gsize                  size,
                                                           GError               **error);

static GSource *g_converter_input_stream_create_source    (GPollableInputStream *stream,
                                                           GCancellable          *cancellable);

/* Fills in the GPollableInputStream vtable with the functions above. */
static void g_converter_input_stream_pollable_iface_init  (GPollableInputStreamInterface *iface);
98
/* Defines the GConverterInputStream type with G_TYPE_FILTER_INPUT_STREAM as
 * its parent type and registers g_converter_input_stream_pollable_iface_init
 * as the initializer of its G_TYPE_POLLABLE_INPUT_STREAM interface.
 * NOTE(review): the ';' following G_IMPLEMENT_INTERFACE (...) looks
 * redundant -- confirm before removing. */
G_DEFINE_TYPE_WITH_CODE (GConverterInputStream,
                         g_converter_input_stream,
                         G_TYPE_FILTER_INPUT_STREAM,
                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
                                                g_converter_input_stream_pollable_iface_init);
                         )
105
106 static void
107 g_converter_input_stream_class_init (GConverterInputStreamClass *klass)
108 {
109   GObjectClass *object_class;
110   GInputStreamClass *istream_class;
111
112   g_type_class_add_private (klass, sizeof (GConverterInputStreamPrivate));
113
114   object_class = G_OBJECT_CLASS (klass);
115   object_class->get_property = g_converter_input_stream_get_property;
116   object_class->set_property = g_converter_input_stream_set_property;
117   object_class->finalize     = g_converter_input_stream_finalize;
118
119   istream_class = G_INPUT_STREAM_CLASS (klass);
120   istream_class->read_fn = g_converter_input_stream_read;
121
122   g_object_class_install_property (object_class,
123                                    PROP_CONVERTER,
124                                    g_param_spec_object ("converter",
125                                                         P_("Converter"),
126                                                         P_("The converter object"),
127                                                         G_TYPE_CONVERTER,
128                                                         G_PARAM_READWRITE|
129                                                         G_PARAM_CONSTRUCT_ONLY|
130                                                         G_PARAM_STATIC_STRINGS));
131
132 }
133
134 static void
135 g_converter_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
136 {
137   iface->can_poll = g_converter_input_stream_can_poll;
138   iface->is_readable = g_converter_input_stream_is_readable;
139   iface->read_nonblocking = g_converter_input_stream_read_nonblocking;
140   iface->create_source = g_converter_input_stream_create_source;
141 }
142
143 static void
144 g_converter_input_stream_finalize (GObject *object)
145 {
146   GConverterInputStreamPrivate *priv;
147   GConverterInputStream        *stream;
148
149   stream = G_CONVERTER_INPUT_STREAM (object);
150   priv = stream->priv;
151
152   g_free (priv->input_buffer.data);
153   g_free (priv->converted_buffer.data);
154   if (priv->converter)
155     g_object_unref (priv->converter);
156
157   G_OBJECT_CLASS (g_converter_input_stream_parent_class)->finalize (object);
158 }
159
160 static void
161 g_converter_input_stream_set_property (GObject      *object,
162                                        guint         prop_id,
163                                        const GValue *value,
164                                        GParamSpec   *pspec)
165 {
166   GConverterInputStream *cstream;
167
168   cstream = G_CONVERTER_INPUT_STREAM (object);
169
170    switch (prop_id)
171     {
172     case PROP_CONVERTER:
173       cstream->priv->converter = g_value_dup_object (value);
174       break;
175
176     default:
177       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
178       break;
179     }
180
181 }
182
183 static void
184 g_converter_input_stream_get_property (GObject    *object,
185                                        guint       prop_id,
186                                        GValue     *value,
187                                        GParamSpec *pspec)
188 {
189   GConverterInputStreamPrivate *priv;
190   GConverterInputStream        *cstream;
191
192   cstream = G_CONVERTER_INPUT_STREAM (object);
193   priv = cstream->priv;
194
195   switch (prop_id)
196     {
197     case PROP_CONVERTER:
198       g_value_set_object (value, priv->converter);
199       break;
200
201     default:
202       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
203       break;
204     }
205
206 }
207 static void
208 g_converter_input_stream_init (GConverterInputStream *stream)
209 {
210   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
211                                               G_TYPE_CONVERTER_INPUT_STREAM,
212                                               GConverterInputStreamPrivate);
213 }
214
215 /**
216  * g_converter_input_stream_new:
217  * @base_stream: a #GInputStream
218  * @converter: a #GConverter
219  *
220  * Creates a new converter input stream for the @base_stream.
221  *
222  * Returns: a new #GInputStream.
223  **/
224 GInputStream *
225 g_converter_input_stream_new (GInputStream *base_stream,
226                               GConverter   *converter)
227 {
228   GInputStream *stream;
229
230   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
231
232   stream = g_object_new (G_TYPE_CONVERTER_INPUT_STREAM,
233                          "base-stream", base_stream,
234                          "converter", converter,
235                          NULL);
236
237   return stream;
238 }
239
240 static gsize
241 buffer_data_size (Buffer *buffer)
242 {
243   return buffer->end - buffer->start;
244 }
245
246 static gsize
247 buffer_tailspace (Buffer *buffer)
248 {
249   return buffer->size - buffer->end;
250 }
251
252 static char *
253 buffer_data (Buffer *buffer)
254 {
255   return buffer->data + buffer->start;
256 }
257
258 static void
259 buffer_consumed (Buffer *buffer,
260                  gsize count)
261 {
262   buffer->start += count;
263   if (buffer->start == buffer->end)
264     buffer->start = buffer->end = 0;
265 }
266
267 static void
268 buffer_read (Buffer *buffer,
269              char *dest,
270              gsize count)
271 {
272   memcpy (dest, buffer->data + buffer->start, count);
273   buffer_consumed (buffer, count);
274 }
275
276 static void
277 compact_buffer (Buffer *buffer)
278 {
279   gsize in_buffer;
280
281   in_buffer = buffer_data_size (buffer);
282   memmove (buffer->data,
283            buffer->data + buffer->start,
284            in_buffer);
285   buffer->end -= buffer->start;
286   buffer->start = 0;
287 }
288
289 static void
290 grow_buffer (Buffer *buffer)
291 {
292   char *data;
293   gsize size, in_buffer;
294
295   if (buffer->size == 0)
296     size = INITIAL_BUFFER_SIZE;
297   else
298     size = buffer->size * 2;
299
300   data = g_malloc (size);
301   in_buffer = buffer_data_size (buffer);
302
303   memcpy (data,
304           buffer->data + buffer->start,
305           in_buffer);
306   g_free (buffer->data);
307   buffer->data = data;
308   buffer->end -= buffer->start;
309   buffer->start = 0;
310   buffer->size = size;
311 }
312
313 /* Ensures that the buffer can fit at_least_size bytes,
314  * *including* the current in-buffer data */
315 static void
316 buffer_ensure_space (Buffer *buffer,
317                      gsize at_least_size)
318 {
319   gsize in_buffer, left_to_fill;
320
321   in_buffer = buffer_data_size (buffer);
322
323   if (in_buffer >= at_least_size)
324     return;
325
326   left_to_fill = buffer_tailspace (buffer);
327
328   if (in_buffer + left_to_fill >= at_least_size)
329     {
330       /* We fit in remaining space at end */
331       /* If the copy is small, compact now anyway so we can fill more */
332       if (in_buffer < 256)
333         compact_buffer (buffer);
334     }
335   else if (buffer->size >= at_least_size)
336     {
337       /* We fit, but only if we compact */
338       compact_buffer (buffer);
339     }
340   else
341     {
342       /* Need to grow buffer */
343       while (buffer->size < at_least_size)
344         grow_buffer (buffer);
345     }
346 }
347
348 static gssize
349 fill_input_buffer (GConverterInputStream  *stream,
350                    gsize                   at_least_size,
351                    gboolean                blocking,
352                    GCancellable           *cancellable,
353                    GError                **error)
354 {
355   GConverterInputStreamPrivate *priv;
356   GInputStream *base_stream;
357   gssize nread;
358
359   priv = stream->priv;
360
361   buffer_ensure_space (&priv->input_buffer, at_least_size);
362
363   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
364   nread = g_pollable_stream_read (base_stream,
365                                   priv->input_buffer.data + priv->input_buffer.end,
366                                   buffer_tailspace (&priv->input_buffer),
367                                   blocking,
368                                   cancellable,
369                                   error);
370
371   if (nread > 0)
372     {
373       priv->input_buffer.end += nread;
374       priv->need_input = FALSE;
375     }
376
377   return nread;
378 }
379
380
381 static gssize
382 read_internal (GInputStream *stream,
383                void         *buffer,
384                gsize         count,
385                gboolean      blocking,
386                GCancellable *cancellable,
387                GError      **error)
388 {
389   GConverterInputStream *cstream;
390   GConverterInputStreamPrivate *priv;
391   gsize available, total_bytes_read;
392   gssize nread;
393   GConverterResult res;
394   gsize bytes_read;
395   gsize bytes_written;
396   GError *my_error;
397   GError *my_error2;
398
399   cstream = G_CONVERTER_INPUT_STREAM (stream);
400   priv = cstream->priv;
401
402   available = buffer_data_size (&priv->converted_buffer);
403
404   if (available > 0 &&
405       count <= available)
406     {
407       /* Converted data available, return that */
408       buffer_read (&priv->converted_buffer, buffer, count);
409       return count;
410     }
411
412   /* Full request not available, read all currently available and request
413      refill/conversion for more */
414
415   buffer_read (&priv->converted_buffer, buffer, available);
416
417   total_bytes_read = available;
418   count -= available;
419
420   /* If there is no data to convert, and no pre-converted data,
421      do some i/o for more input */
422   if (buffer_data_size (&priv->input_buffer) == 0 &&
423       total_bytes_read == 0 &&
424       !priv->at_input_end)
425     {
426       nread = fill_input_buffer (cstream, count, blocking, cancellable, error);
427       if (nread < 0)
428         return -1;
429       if (nread == 0)
430         priv->at_input_end = TRUE;
431     }
432
433   /* First try to convert any available data (or state) directly to the user buffer: */
434   if (!priv->finished)
435     {
436       my_error = NULL;
437       res = g_converter_convert (priv->converter,
438                                  buffer_data (&priv->input_buffer),
439                                  buffer_data_size (&priv->input_buffer),
440                                  buffer, count,
441                                  priv->at_input_end ? G_CONVERTER_INPUT_AT_END : 0,
442                                  &bytes_read,
443                                  &bytes_written,
444                                  &my_error);
445       if (res != G_CONVERTER_ERROR)
446         {
447           total_bytes_read += bytes_written;
448           buffer_consumed (&priv->input_buffer, bytes_read);
449           if (res == G_CONVERTER_FINISHED)
450             priv->finished = TRUE; /* We're done converting */
451         }
452       else if (total_bytes_read == 0 &&
453                !g_error_matches (my_error,
454                                  G_IO_ERROR,
455                                  G_IO_ERROR_PARTIAL_INPUT) &&
456                !g_error_matches (my_error,
457                                  G_IO_ERROR,
458                                  G_IO_ERROR_NO_SPACE))
459         {
460           /* No previously read data and no "special" error, return error */
461           g_propagate_error (error, my_error);
462           return -1;
463         }
464       else
465         g_error_free (my_error);
466     }
467
468   /* We had some pre-converted data and/or we converted directly to the
469      user buffer */
470   if (total_bytes_read > 0)
471     return total_bytes_read;
472
473   /* If there is no more to convert, return EOF */
474   if (priv->finished)
475     {
476       g_assert (buffer_data_size (&priv->converted_buffer) == 0);
477       return 0;
478     }
479
480   /* There was "complexity" in the straight-to-buffer conversion,
481    * convert to our own buffer and write from that.
482    * At this point we didn't produce any data into @buffer.
483    */
484
485   /* Ensure we have *some* initial target space */
486   buffer_ensure_space (&priv->converted_buffer, count);
487
488   while (TRUE)
489     {
490       g_assert (!priv->finished);
491
492       /* Try to convert to our buffer */
493       my_error = NULL;
494       res = g_converter_convert (priv->converter,
495                                  buffer_data (&priv->input_buffer),
496                                  buffer_data_size (&priv->input_buffer),
497                                  buffer_data (&priv->converted_buffer),
498                                  buffer_tailspace (&priv->converted_buffer),
499                                  priv->at_input_end ? G_CONVERTER_INPUT_AT_END : 0,
500                                  &bytes_read,
501                                  &bytes_written,
502                                  &my_error);
503       if (res != G_CONVERTER_ERROR)
504         {
505           priv->converted_buffer.end += bytes_written;
506           buffer_consumed (&priv->input_buffer, bytes_read);
507
508           /* Maybe we consumed without producing any output */
509           if (buffer_data_size (&priv->converted_buffer) == 0 && res != G_CONVERTER_FINISHED)
510             continue; /* Convert more */
511
512           if (res == G_CONVERTER_FINISHED)
513             priv->finished = TRUE;
514
515           total_bytes_read = MIN (count, buffer_data_size (&priv->converted_buffer));
516           buffer_read (&priv->converted_buffer, buffer, total_bytes_read);
517
518           g_assert (priv->finished || total_bytes_read > 0);
519
520           return total_bytes_read;
521         }
522
523       /* There was some kind of error filling our buffer */
524
525       if (g_error_matches (my_error,
526                            G_IO_ERROR,
527                            G_IO_ERROR_PARTIAL_INPUT) &&
528           !priv->at_input_end)
529         {
530           /* Need more data */
531           my_error2 = NULL;
532           nread = fill_input_buffer (cstream,
533                                      buffer_data_size (&priv->input_buffer) + 4096,
534                                      blocking,
535                                      cancellable,
536                                      &my_error2);
537           if (nread < 0)
538             {
539               /* Can't read any more data, return that error */
540               g_error_free (my_error);
541               g_propagate_error (error, my_error2);
542               priv->need_input = TRUE;
543               return -1;
544             }
545           else if (nread == 0)
546             {
547               /* End of file, try INPUT_AT_END */
548               priv->at_input_end = TRUE;
549             }
550           g_error_free (my_error);
551           continue;
552         }
553
554       if (g_error_matches (my_error,
555                            G_IO_ERROR,
556                            G_IO_ERROR_NO_SPACE))
557         {
558           /* Need more destination space, grow it
559            * Note: if we actually grow the buffer (as opposed to compacting it),
560            * this will double the size, not just add one byte. */
561           buffer_ensure_space (&priv->converted_buffer,
562                                priv->converted_buffer.size + 1);
563           g_error_free (my_error);
564           continue;
565         }
566
567       /* Any other random error, return it */
568       g_propagate_error (error, my_error);
569       return -1;
570     }
571
572   g_assert_not_reached ();
573 }
574
575 static gssize
576 g_converter_input_stream_read (GInputStream *stream,
577                                void         *buffer,
578                                gsize         count,
579                                GCancellable *cancellable,
580                                GError      **error)
581 {
582   return read_internal (stream, buffer, count, TRUE, cancellable, error);
583 }
584
585 static gboolean
586 g_converter_input_stream_can_poll (GPollableInputStream *stream)
587 {
588   GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
589
590   return (G_IS_POLLABLE_INPUT_STREAM (base_stream) &&
591           g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (base_stream)));
592 }
593
594 static gboolean
595 g_converter_input_stream_is_readable (GPollableInputStream *stream)
596 {
597   GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
598   GConverterInputStream *cstream = G_CONVERTER_INPUT_STREAM (stream);
599
600   if (buffer_data_size (&cstream->priv->converted_buffer))
601     return TRUE;
602   else if (buffer_data_size (&cstream->priv->input_buffer) &&
603            !cstream->priv->need_input)
604     return TRUE;
605   else
606     return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (base_stream));
607 }
608
609 static gssize
610 g_converter_input_stream_read_nonblocking (GPollableInputStream  *stream,
611                                            void                  *buffer,
612                                            gsize                  count,
613                                            GError               **error)
614 {
615   return read_internal (G_INPUT_STREAM (stream), buffer, count,
616                         FALSE, NULL, error);
617 }
618
619 static GSource *
620 g_converter_input_stream_create_source (GPollableInputStream *stream,
621                                         GCancellable         *cancellable)
622 {
623   GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
624   GSource *base_source, *pollable_source;
625
626   if (g_pollable_input_stream_is_readable (stream))
627     base_source = g_timeout_source_new (0);
628   else
629     base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (base_stream), NULL);
630
631   pollable_source = g_pollable_source_new_full (stream, base_source,
632                                                 cancellable);
633   g_source_unref (base_source);
634
635   return pollable_source;
636 }
637
638
639 /**
640  * g_converter_input_stream_get_converter:
641  * @converter_stream: a #GConverterInputStream
642  *
643  * Gets the #GConverter that is used by @converter_stream.
644  *
645  * Returns: (transfer none): the converter of the converter input stream
646  *
647  * Since: 2.24
648  */
649 GConverter *
650 g_converter_input_stream_get_converter (GConverterInputStream *converter_stream)
651 {
652   return converter_stream->priv->converter;
653 }