Revert "GIOScheduler: Avoid constant iteration over pending job list"
[platform/upstream/glib.git] / gio / gconverterinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright (C) 2009 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <string.h>
26
27 #include "gconverterinputstream.h"
28 #include "gpollableinputstream.h"
29 #include "gsimpleasyncresult.h"
30 #include "gcancellable.h"
31 #include "gioenumtypes.h"
32 #include "gioerror.h"
33 #include "glibintl.h"
34
35
36 /**
37  * SECTION:gconverterinputstream
38  * @short_description: Converter Input Stream
39  * @include: gio/gio.h
40  * @see_also: #GInputStream, #GConverter
41  *
42  * Converter input stream implements #GInputStream and allows
43  * conversion of data of various types during reading.
44  *
45  * As of GLib 2.34, #GConverterInputStream implements
46  * #GPollableInputStream.
47  **/
48
49 #define INITIAL_BUFFER_SIZE 4096
50
51 typedef struct {
52   char *data;
53   gsize start;
54   gsize end;
55   gsize size;
56 } Buffer;
57
58 struct _GConverterInputStreamPrivate {
59   gboolean at_input_end;
60   gboolean finished;
61   gboolean need_input;
62   GConverter *converter;
63   Buffer input_buffer;
64   Buffer converted_buffer;
65 };
66
67 enum {
68   PROP_0,
69   PROP_CONVERTER
70 };
71
72 static void   g_converter_input_stream_set_property (GObject       *object,
73                                                      guint          prop_id,
74                                                      const GValue  *value,
75                                                      GParamSpec    *pspec);
76 static void   g_converter_input_stream_get_property (GObject       *object,
77                                                      guint          prop_id,
78                                                      GValue        *value,
79                                                      GParamSpec    *pspec);
80 static void   g_converter_input_stream_finalize     (GObject       *object);
81 static gssize g_converter_input_stream_read         (GInputStream  *stream,
82                                                      void          *buffer,
83                                                      gsize          count,
84                                                      GCancellable  *cancellable,
85                                                      GError       **error);
86
87 static gboolean g_converter_input_stream_can_poll         (GPollableInputStream *stream);
88 static gboolean g_converter_input_stream_is_readable      (GPollableInputStream *stream);
89 static gssize   g_converter_input_stream_read_nonblocking (GPollableInputStream  *stream,
90                                                            void                  *buffer,
91                                                            gsize                  size,
92                                                            GError               **error);
93
94 static GSource *g_converter_input_stream_create_source    (GPollableInputStream *stream,
95                                                            GCancellable          *cancellable);
96
97 static void g_converter_input_stream_pollable_iface_init  (GPollableInputStreamInterface *iface);
98
99 G_DEFINE_TYPE_WITH_CODE (GConverterInputStream,
100                          g_converter_input_stream,
101                          G_TYPE_FILTER_INPUT_STREAM,
102                          G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
103                                                 g_converter_input_stream_pollable_iface_init);
104                          )
105
106 static void
107 g_converter_input_stream_class_init (GConverterInputStreamClass *klass)
108 {
109   GObjectClass *object_class;
110   GInputStreamClass *istream_class;
111
112   g_type_class_add_private (klass, sizeof (GConverterInputStreamPrivate));
113
114   object_class = G_OBJECT_CLASS (klass);
115   object_class->get_property = g_converter_input_stream_get_property;
116   object_class->set_property = g_converter_input_stream_set_property;
117   object_class->finalize     = g_converter_input_stream_finalize;
118
119   istream_class = G_INPUT_STREAM_CLASS (klass);
120   istream_class->read_fn = g_converter_input_stream_read;
121
122   g_object_class_install_property (object_class,
123                                    PROP_CONVERTER,
124                                    g_param_spec_object ("converter",
125                                                         P_("Converter"),
126                                                         P_("The converter object"),
127                                                         G_TYPE_CONVERTER,
128                                                         G_PARAM_READWRITE|
129                                                         G_PARAM_CONSTRUCT_ONLY|
130                                                         G_PARAM_STATIC_STRINGS));
131
132 }
133
134 static void
135 g_converter_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
136 {
137   iface->can_poll = g_converter_input_stream_can_poll;
138   iface->is_readable = g_converter_input_stream_is_readable;
139   iface->read_nonblocking = g_converter_input_stream_read_nonblocking;
140   iface->create_source = g_converter_input_stream_create_source;
141 }
142
143 static void
144 g_converter_input_stream_finalize (GObject *object)
145 {
146   GConverterInputStreamPrivate *priv;
147   GConverterInputStream        *stream;
148
149   stream = G_CONVERTER_INPUT_STREAM (object);
150   priv = stream->priv;
151
152   g_free (priv->input_buffer.data);
153   g_free (priv->converted_buffer.data);
154   if (priv->converter)
155     g_object_unref (priv->converter);
156
157   G_OBJECT_CLASS (g_converter_input_stream_parent_class)->finalize (object);
158 }
159
160 static void
161 g_converter_input_stream_set_property (GObject      *object,
162                                        guint         prop_id,
163                                        const GValue *value,
164                                        GParamSpec   *pspec)
165 {
166   GConverterInputStream *cstream;
167
168   cstream = G_CONVERTER_INPUT_STREAM (object);
169
170    switch (prop_id)
171     {
172     case PROP_CONVERTER:
173       cstream->priv->converter = g_value_dup_object (value);
174       break;
175
176     default:
177       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
178       break;
179     }
180
181 }
182
183 static void
184 g_converter_input_stream_get_property (GObject    *object,
185                                        guint       prop_id,
186                                        GValue     *value,
187                                        GParamSpec *pspec)
188 {
189   GConverterInputStreamPrivate *priv;
190   GConverterInputStream        *cstream;
191
192   cstream = G_CONVERTER_INPUT_STREAM (object);
193   priv = cstream->priv;
194
195   switch (prop_id)
196     {
197     case PROP_CONVERTER:
198       g_value_set_object (value, priv->converter);
199       break;
200
201     default:
202       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
203       break;
204     }
205
206 }
207 static void
208 g_converter_input_stream_init (GConverterInputStream *stream)
209 {
210   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
211                                               G_TYPE_CONVERTER_INPUT_STREAM,
212                                               GConverterInputStreamPrivate);
213 }
214
215 /**
216  * g_converter_input_stream_new:
217  * @base_stream: a #GInputStream
218  * @converter: a #GConverter
219  *
220  * Creates a new converter input stream for the @base_stream.
221  *
222  * Returns: a new #GInputStream.
223  **/
224 GInputStream *
225 g_converter_input_stream_new (GInputStream *base_stream,
226                               GConverter   *converter)
227 {
228   GInputStream *stream;
229
230   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
231
232   stream = g_object_new (G_TYPE_CONVERTER_INPUT_STREAM,
233                          "base-stream", base_stream,
234                          "converter", converter,
235                          NULL);
236
237   return stream;
238 }
239
240 static gsize
241 buffer_data_size (Buffer *buffer)
242 {
243   return buffer->end - buffer->start;
244 }
245
246 static gsize
247 buffer_tailspace (Buffer *buffer)
248 {
249   return buffer->size - buffer->end;
250 }
251
252 static char *
253 buffer_data (Buffer *buffer)
254 {
255   return buffer->data + buffer->start;
256 }
257
258 static void
259 buffer_consumed (Buffer *buffer,
260                  gsize count)
261 {
262   buffer->start += count;
263   if (buffer->start == buffer->end)
264     buffer->start = buffer->end = 0;
265 }
266
267 static void
268 buffer_read (Buffer *buffer,
269              char *dest,
270              gsize count)
271 {
272   memcpy (dest, buffer->data + buffer->start, count);
273   buffer_consumed (buffer, count);
274 }
275
276 static void
277 compact_buffer (Buffer *buffer)
278 {
279   gsize in_buffer;
280
281   in_buffer = buffer_data_size (buffer);
282   memmove (buffer->data,
283            buffer->data + buffer->start,
284            in_buffer);
285   buffer->end -= buffer->start;
286   buffer->start = 0;
287 }
288
289 static void
290 grow_buffer (Buffer *buffer)
291 {
292   char *data;
293   gsize size, in_buffer;
294
295   if (buffer->size == 0)
296     size = INITIAL_BUFFER_SIZE;
297   else
298     size = buffer->size * 2;
299
300   data = g_malloc (size);
301   in_buffer = buffer_data_size (buffer);
302
303   memcpy (data,
304           buffer->data + buffer->start,
305           in_buffer);
306   g_free (buffer->data);
307   buffer->data = data;
308   buffer->end -= buffer->start;
309   buffer->start = 0;
310   buffer->size = size;
311 }
312
313 /* Ensures that the buffer can fit at_least_size bytes,
314  * *including* the current in-buffer data */
315 static void
316 buffer_ensure_space (Buffer *buffer,
317                      gsize at_least_size)
318 {
319   gsize in_buffer, left_to_fill;
320
321   in_buffer = buffer_data_size (buffer);
322
323   if (in_buffer >= at_least_size)
324     return;
325
326   left_to_fill = buffer_tailspace (buffer);
327
328   if (in_buffer + left_to_fill >= at_least_size)
329     {
330       /* We fit in remaining space at end */
331       /* If the copy is small, compact now anyway so we can fill more */
332       if (in_buffer < 256)
333         compact_buffer (buffer);
334     }
335   else if (buffer->size >= at_least_size)
336     {
337       /* We fit, but only if we compact */
338       compact_buffer (buffer);
339     }
340   else
341     {
342       /* Need to grow buffer */
343       while (buffer->size < at_least_size)
344         grow_buffer (buffer);
345     }
346 }
347
348 static gssize
349 fill_input_buffer (GConverterInputStream  *stream,
350                    gsize                   at_least_size,
351                    gboolean                blocking,
352                    GCancellable           *cancellable,
353                    GError                **error)
354 {
355   GConverterInputStreamPrivate *priv;
356   GInputStream *base_stream;
357   gssize nread;
358
359   priv = stream->priv;
360
361   buffer_ensure_space (&priv->input_buffer, at_least_size);
362
363   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
364   nread = g_pollable_stream_read (base_stream,
365                                   priv->input_buffer.data + priv->input_buffer.end,
366                                   buffer_tailspace (&priv->input_buffer),
367                                   blocking,
368                                   cancellable,
369                                   error);
370
371   if (nread > 0)
372     {
373       priv->input_buffer.end += nread;
374       priv->need_input = FALSE;
375     }
376
377   return nread;
378 }
379
380
381 static gssize
382 read_internal (GInputStream *stream,
383                void         *buffer,
384                gsize         count,
385                gboolean      blocking,
386                GCancellable *cancellable,
387                GError      **error)
388 {
389   GConverterInputStream *cstream;
390   GConverterInputStreamPrivate *priv;
391   gsize available, total_bytes_read;
392   gssize nread;
393   GConverterResult res;
394   gsize bytes_read;
395   gsize bytes_written;
396   GError *my_error;
397   GError *my_error2;
398
399   cstream = G_CONVERTER_INPUT_STREAM (stream);
400   priv = cstream->priv;
401
402   available = buffer_data_size (&priv->converted_buffer);
403
404   if (available > 0 &&
405       count <= available)
406     {
407       /* Converted data available, return that */
408       buffer_read (&priv->converted_buffer, buffer, count);
409       return count;
410     }
411
412   /* Full request not available, read all currently available and request
413      refill/conversion for more */
414
415   buffer_read (&priv->converted_buffer, buffer, available);
416
417   total_bytes_read = available;
418   buffer = (char *) buffer + available;
419   count -= available;
420
421   /* If there is no data to convert, and no pre-converted data,
422      do some i/o for more input */
423   if (buffer_data_size (&priv->input_buffer) == 0 &&
424       total_bytes_read == 0 &&
425       !priv->at_input_end)
426     {
427       nread = fill_input_buffer (cstream, count, blocking, cancellable, error);
428       if (nread < 0)
429         return -1;
430       if (nread == 0)
431         priv->at_input_end = TRUE;
432     }
433
434   /* First try to convert any available data (or state) directly to the user buffer: */
435   if (!priv->finished)
436     {
437       my_error = NULL;
438       res = g_converter_convert (priv->converter,
439                                  buffer_data (&priv->input_buffer),
440                                  buffer_data_size (&priv->input_buffer),
441                                  buffer, count,
442                                  priv->at_input_end ? G_CONVERTER_INPUT_AT_END : 0,
443                                  &bytes_read,
444                                  &bytes_written,
445                                  &my_error);
446       if (res != G_CONVERTER_ERROR)
447         {
448           total_bytes_read += bytes_written;
449           buffer_consumed (&priv->input_buffer, bytes_read);
450           if (res == G_CONVERTER_FINISHED)
451             priv->finished = TRUE; /* We're done converting */
452         }
453       else if (total_bytes_read == 0 &&
454                !g_error_matches (my_error,
455                                  G_IO_ERROR,
456                                  G_IO_ERROR_PARTIAL_INPUT) &&
457                !g_error_matches (my_error,
458                                  G_IO_ERROR,
459                                  G_IO_ERROR_NO_SPACE))
460         {
461           /* No previously read data and no "special" error, return error */
462           g_propagate_error (error, my_error);
463           return -1;
464         }
465       else
466         g_error_free (my_error);
467     }
468
469   /* We had some pre-converted data and/or we converted directly to the
470      user buffer */
471   if (total_bytes_read > 0)
472     return total_bytes_read;
473
474   /* If there is no more to convert, return EOF */
475   if (priv->finished)
476     {
477       g_assert (buffer_data_size (&priv->converted_buffer) == 0);
478       return 0;
479     }
480
481   /* There was "complexity" in the straight-to-buffer conversion,
482    * convert to our own buffer and write from that.
483    * At this point we didn't produce any data into @buffer.
484    */
485
486   /* Ensure we have *some* initial target space */
487   buffer_ensure_space (&priv->converted_buffer, count);
488
489   while (TRUE)
490     {
491       g_assert (!priv->finished);
492
493       /* Try to convert to our buffer */
494       my_error = NULL;
495       res = g_converter_convert (priv->converter,
496                                  buffer_data (&priv->input_buffer),
497                                  buffer_data_size (&priv->input_buffer),
498                                  buffer_data (&priv->converted_buffer),
499                                  buffer_tailspace (&priv->converted_buffer),
500                                  priv->at_input_end ? G_CONVERTER_INPUT_AT_END : 0,
501                                  &bytes_read,
502                                  &bytes_written,
503                                  &my_error);
504       if (res != G_CONVERTER_ERROR)
505         {
506           priv->converted_buffer.end += bytes_written;
507           buffer_consumed (&priv->input_buffer, bytes_read);
508
509           /* Maybe we consumed without producing any output */
510           if (buffer_data_size (&priv->converted_buffer) == 0 && res != G_CONVERTER_FINISHED)
511             continue; /* Convert more */
512
513           if (res == G_CONVERTER_FINISHED)
514             priv->finished = TRUE;
515
516           total_bytes_read = MIN (count, buffer_data_size (&priv->converted_buffer));
517           buffer_read (&priv->converted_buffer, buffer, total_bytes_read);
518
519           g_assert (priv->finished || total_bytes_read > 0);
520
521           return total_bytes_read;
522         }
523
524       /* There was some kind of error filling our buffer */
525
526       if (g_error_matches (my_error,
527                            G_IO_ERROR,
528                            G_IO_ERROR_PARTIAL_INPUT) &&
529           !priv->at_input_end)
530         {
531           /* Need more data */
532           my_error2 = NULL;
533           nread = fill_input_buffer (cstream,
534                                      buffer_data_size (&priv->input_buffer) + 4096,
535                                      blocking,
536                                      cancellable,
537                                      &my_error2);
538           if (nread < 0)
539             {
540               /* Can't read any more data, return that error */
541               g_error_free (my_error);
542               g_propagate_error (error, my_error2);
543               priv->need_input = TRUE;
544               return -1;
545             }
546           else if (nread == 0)
547             {
548               /* End of file, try INPUT_AT_END */
549               priv->at_input_end = TRUE;
550             }
551           g_error_free (my_error);
552           continue;
553         }
554
555       if (g_error_matches (my_error,
556                            G_IO_ERROR,
557                            G_IO_ERROR_NO_SPACE))
558         {
559           /* Need more destination space, grow it
560            * Note: if we actually grow the buffer (as opposed to compacting it),
561            * this will double the size, not just add one byte. */
562           buffer_ensure_space (&priv->converted_buffer,
563                                priv->converted_buffer.size + 1);
564           g_error_free (my_error);
565           continue;
566         }
567
568       /* Any other random error, return it */
569       g_propagate_error (error, my_error);
570       return -1;
571     }
572
573   g_assert_not_reached ();
574 }
575
576 static gssize
577 g_converter_input_stream_read (GInputStream *stream,
578                                void         *buffer,
579                                gsize         count,
580                                GCancellable *cancellable,
581                                GError      **error)
582 {
583   return read_internal (stream, buffer, count, TRUE, cancellable, error);
584 }
585
586 static gboolean
587 g_converter_input_stream_can_poll (GPollableInputStream *stream)
588 {
589   GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
590
591   return (G_IS_POLLABLE_INPUT_STREAM (base_stream) &&
592           g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (base_stream)));
593 }
594
595 static gboolean
596 g_converter_input_stream_is_readable (GPollableInputStream *stream)
597 {
598   GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
599   GConverterInputStream *cstream = G_CONVERTER_INPUT_STREAM (stream);
600
601   if (buffer_data_size (&cstream->priv->converted_buffer))
602     return TRUE;
603   else if (buffer_data_size (&cstream->priv->input_buffer) &&
604            !cstream->priv->need_input)
605     return TRUE;
606   else
607     return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (base_stream));
608 }
609
610 static gssize
611 g_converter_input_stream_read_nonblocking (GPollableInputStream  *stream,
612                                            void                  *buffer,
613                                            gsize                  count,
614                                            GError               **error)
615 {
616   return read_internal (G_INPUT_STREAM (stream), buffer, count,
617                         FALSE, NULL, error);
618 }
619
620 static GSource *
621 g_converter_input_stream_create_source (GPollableInputStream *stream,
622                                         GCancellable         *cancellable)
623 {
624   GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
625   GSource *base_source, *pollable_source;
626
627   if (g_pollable_input_stream_is_readable (stream))
628     base_source = g_timeout_source_new (0);
629   else
630     base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (base_stream), NULL);
631
632   pollable_source = g_pollable_source_new_full (stream, base_source,
633                                                 cancellable);
634   g_source_unref (base_source);
635
636   return pollable_source;
637 }
638
639
640 /**
641  * g_converter_input_stream_get_converter:
642  * @converter_stream: a #GConverterInputStream
643  *
644  * Gets the #GConverter that is used by @converter_stream.
645  *
646  * Returns: (transfer none): the converter of the converter input stream
647  *
648  * Since: 2.24
649  */
650 GConverter *
651 g_converter_input_stream_get_converter (GConverterInputStream *converter_stream)
652 {
653   return converter_stream->priv->converter;
654 }