Revert "GIOScheduler: Avoid constant iteration over pending job list"
[platform/upstream/glib.git] / gio / gconverteroutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright (C) 2009 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24
25 #include <string.h>
26
27 #include "gconverteroutputstream.h"
28 #include "gpollableoutputstream.h"
29 #include "gsimpleasyncresult.h"
30 #include "gcancellable.h"
31 #include "gioenumtypes.h"
32 #include "gioerror.h"
33 #include "glibintl.h"
34
35
36 /**
37  * SECTION:gconverteroutputstream
38  * @short_description: Converter Output Stream
39  * @include: gio/gio.h
40  * @see_also: #GOutputStream, #GConverter
41  *
 * Converter output stream implements #GOutputStream and allows
 * conversion of data of various types during writing.
44  *
45  * As of GLib 2.34, #GConverterOutputStream implements
46  * #GPollableOutputStream.
47  **/
48
49 #define INITIAL_BUFFER_SIZE 4096
50
51 typedef struct {
52   char *data;
53   gsize start;
54   gsize end;
55   gsize size;
56 } Buffer;
57
58 struct _GConverterOutputStreamPrivate {
59   gboolean at_output_end;
60   gboolean finished;
61   GConverter *converter;
62   Buffer output_buffer; /* To be converted and written */
63   Buffer converted_buffer; /* Already converted */
64 };
65
66 /* Buffering strategy:
67  *
68  * Each time we write we must at least consume some input, or
69  * return an error. Thus we start with writing all already
70  * converted data and *then* we start converting (reporting
71  * an error at any point in this).
72  *
 * It's possible that what the user wrote is not enough data
 * for the converter, so we must then buffer it in output_buffer
 * and ask for more data, but we want to avoid this as much as
 * possible, converting directly from the user's buffer.
77  */
78
/* GObject property identifiers; 0 is reserved and never installed. */
enum {
  PROP_0 = 0,
  PROP_CONVERTER
};
83
84 static void   g_converter_output_stream_set_property (GObject        *object,
85                                                       guint           prop_id,
86                                                       const GValue   *value,
87                                                       GParamSpec     *pspec);
88 static void   g_converter_output_stream_get_property (GObject        *object,
89                                                       guint           prop_id,
90                                                       GValue         *value,
91                                                       GParamSpec     *pspec);
92 static void   g_converter_output_stream_finalize     (GObject        *object);
93 static gssize g_converter_output_stream_write        (GOutputStream  *stream,
94                                                       const void     *buffer,
95                                                       gsize           count,
96                                                       GCancellable   *cancellable,
97                                                       GError        **error);
98 static gboolean g_converter_output_stream_flush      (GOutputStream  *stream,
99                                                       GCancellable   *cancellable,
100                                                       GError        **error);
101
102 static gboolean g_converter_output_stream_can_poll          (GPollableOutputStream *stream);
103 static gboolean g_converter_output_stream_is_writable       (GPollableOutputStream *stream);
104 static gssize   g_converter_output_stream_write_nonblocking (GPollableOutputStream  *stream,
105                                                              const void             *buffer,
106                                                              gsize                  size,
107                                                              GError               **error);
108
109 static GSource *g_converter_output_stream_create_source     (GPollableOutputStream *stream,
110                                                              GCancellable          *cancellable);
111
112 static void g_converter_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
113
114 G_DEFINE_TYPE_WITH_CODE (GConverterOutputStream,
115                          g_converter_output_stream,
116                          G_TYPE_FILTER_OUTPUT_STREAM,
117                          G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
118                                                 g_converter_output_stream_pollable_iface_init);
119                          )
120
121 static void
122 g_converter_output_stream_class_init (GConverterOutputStreamClass *klass)
123 {
124   GObjectClass *object_class;
125   GOutputStreamClass *istream_class;
126
127   g_type_class_add_private (klass, sizeof (GConverterOutputStreamPrivate));
128
129   object_class = G_OBJECT_CLASS (klass);
130   object_class->get_property = g_converter_output_stream_get_property;
131   object_class->set_property = g_converter_output_stream_set_property;
132   object_class->finalize     = g_converter_output_stream_finalize;
133
134   istream_class = G_OUTPUT_STREAM_CLASS (klass);
135   istream_class->write_fn = g_converter_output_stream_write;
136   istream_class->flush = g_converter_output_stream_flush;
137
138   g_object_class_install_property (object_class,
139                                    PROP_CONVERTER,
140                                    g_param_spec_object ("converter",
141                                                         P_("Converter"),
142                                                         P_("The converter object"),
143                                                         G_TYPE_CONVERTER,
144                                                         G_PARAM_READWRITE|
145                                                         G_PARAM_CONSTRUCT_ONLY|
146                                                         G_PARAM_STATIC_STRINGS));
147
148 }
149
150 static void
151 g_converter_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
152 {
153   iface->can_poll = g_converter_output_stream_can_poll;
154   iface->is_writable = g_converter_output_stream_is_writable;
155   iface->write_nonblocking = g_converter_output_stream_write_nonblocking;
156   iface->create_source = g_converter_output_stream_create_source;
157 }
158
159 static void
160 g_converter_output_stream_finalize (GObject *object)
161 {
162   GConverterOutputStreamPrivate *priv;
163   GConverterOutputStream        *stream;
164
165   stream = G_CONVERTER_OUTPUT_STREAM (object);
166   priv = stream->priv;
167
168   g_free (priv->output_buffer.data);
169   g_free (priv->converted_buffer.data);
170   if (priv->converter)
171     g_object_unref (priv->converter);
172
173   G_OBJECT_CLASS (g_converter_output_stream_parent_class)->finalize (object);
174 }
175
176 static void
177 g_converter_output_stream_set_property (GObject      *object,
178                                        guint         prop_id,
179                                        const GValue *value,
180                                        GParamSpec   *pspec)
181 {
182   GConverterOutputStream *cstream;
183
184   cstream = G_CONVERTER_OUTPUT_STREAM (object);
185
186    switch (prop_id)
187     {
188     case PROP_CONVERTER:
189       cstream->priv->converter = g_value_dup_object (value);
190       break;
191
192     default:
193       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
194       break;
195     }
196
197 }
198
199 static void
200 g_converter_output_stream_get_property (GObject    *object,
201                                        guint       prop_id,
202                                        GValue     *value,
203                                        GParamSpec *pspec)
204 {
205   GConverterOutputStreamPrivate *priv;
206   GConverterOutputStream        *cstream;
207
208   cstream = G_CONVERTER_OUTPUT_STREAM (object);
209   priv = cstream->priv;
210
211   switch (prop_id)
212     {
213     case PROP_CONVERTER:
214       g_value_set_object (value, priv->converter);
215       break;
216
217     default:
218       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
219       break;
220     }
221 }
222
223 static void
224 g_converter_output_stream_init (GConverterOutputStream *stream)
225 {
226   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
227                                               G_TYPE_CONVERTER_OUTPUT_STREAM,
228                                               GConverterOutputStreamPrivate);
229 }
230
231 /**
232  * g_converter_output_stream_new:
233  * @base_stream: a #GOutputStream
234  * @converter: a #GConverter
235  *
236  * Creates a new converter output stream for the @base_stream.
237  *
238  * Returns: a new #GOutputStream.
239  **/
240 GOutputStream *
241 g_converter_output_stream_new (GOutputStream *base_stream,
242                                GConverter    *converter)
243 {
244   GOutputStream *stream;
245
246   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
247
248   stream = g_object_new (G_TYPE_CONVERTER_OUTPUT_STREAM,
249                          "base-stream", base_stream,
250                          "converter", converter,
251                          NULL);
252
253   return stream;
254 }
255
256 static gsize
257 buffer_data_size (Buffer *buffer)
258 {
259   return buffer->end - buffer->start;
260 }
261
262 static gsize
263 buffer_tailspace (Buffer *buffer)
264 {
265   return buffer->size - buffer->end;
266 }
267
268 static char *
269 buffer_data (Buffer *buffer)
270 {
271   return buffer->data + buffer->start;
272 }
273
274 static void
275 buffer_consumed (Buffer *buffer,
276                  gsize count)
277 {
278   buffer->start += count;
279   if (buffer->start == buffer->end)
280     buffer->start = buffer->end = 0;
281 }
282
283 static void
284 compact_buffer (Buffer *buffer)
285 {
286   gsize in_buffer;
287
288   in_buffer = buffer_data_size (buffer);
289   memmove (buffer->data,
290            buffer->data + buffer->start,
291            in_buffer);
292   buffer->end -= buffer->start;
293   buffer->start = 0;
294 }
295
296 static void
297 grow_buffer (Buffer *buffer)
298 {
299   char *data;
300   gsize size, in_buffer;
301
302   if (buffer->size == 0)
303     size = INITIAL_BUFFER_SIZE;
304   else
305     size = buffer->size * 2;
306
307   data = g_malloc (size);
308   in_buffer = buffer_data_size (buffer);
309
310   memcpy (data,
311           buffer->data + buffer->start,
312           in_buffer);
313   g_free (buffer->data);
314   buffer->data = data;
315   buffer->end -= buffer->start;
316   buffer->start = 0;
317   buffer->size = size;
318 }
319
320 /* Ensures that the buffer can fit at_least_size bytes,
321  * *including* the current in-buffer data */
322 static void
323 buffer_ensure_space (Buffer *buffer,
324                      gsize at_least_size)
325 {
326   gsize in_buffer, left_to_fill;
327
328   in_buffer = buffer_data_size (buffer);
329
330   if (in_buffer >= at_least_size)
331     return;
332
333   left_to_fill = buffer_tailspace (buffer);
334
335   if (in_buffer + left_to_fill >= at_least_size)
336     {
337       /* We fit in remaining space at end */
338       /* If the copy is small, compact now anyway so we can fill more */
339       if (in_buffer < 256)
340         compact_buffer (buffer);
341     }
342   else if (buffer->size >= at_least_size)
343     {
344       /* We fit, but only if we compact */
345       compact_buffer (buffer);
346     }
347   else
348     {
349       /* Need to grow buffer */
350       while (buffer->size < at_least_size)
351         grow_buffer (buffer);
352     }
353 }
354
355 static void
356 buffer_append (Buffer *buffer,
357                const char *data,
358                gsize data_size)
359 {
360   buffer_ensure_space (buffer,
361                        buffer_data_size (buffer) + data_size);
362   memcpy (buffer->data + buffer->end, data, data_size);
363   buffer->end += data_size;
364 }
365
366
367 static gboolean
368 flush_buffer (GConverterOutputStream *stream,
369               gboolean                blocking,
370               GCancellable           *cancellable,
371               GError                **error)
372 {
373   GConverterOutputStreamPrivate *priv;
374   GOutputStream *base_stream;
375   gsize nwritten;
376   gsize available;
377   gboolean res;
378
379   priv = stream->priv;
380
381   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
382
383   available = buffer_data_size (&priv->converted_buffer);
384   if (available > 0)
385     {
386       res = g_pollable_stream_write_all (base_stream,
387                                          buffer_data (&priv->converted_buffer),
388                                          available,
389                                          blocking,
390                                          &nwritten,
391                                          cancellable,
392                                          error);
393       buffer_consumed (&priv->converted_buffer, nwritten);
394       return res;
395     }
396   return TRUE;
397 }
398
399
400 static gssize
401 write_internal (GOutputStream  *stream,
402                 const void     *buffer,
403                 gsize           count,
404                 gboolean        blocking,
405                 GCancellable   *cancellable,
406                 GError        **error)
407 {
408   GConverterOutputStream *cstream;
409   GConverterOutputStreamPrivate *priv;
410   gssize retval;
411   GConverterResult res;
412   gsize bytes_read;
413   gsize bytes_written;
414   GError *my_error;
415   const char *to_convert;
416   gsize to_convert_size, converted_bytes;
417   gboolean converting_from_buffer;
418
419   cstream = G_CONVERTER_OUTPUT_STREAM (stream);
420   priv = cstream->priv;
421
422   /* Write out all available pre-converted data and fail if
423      not possible */
424   if (!flush_buffer (cstream, blocking, cancellable, error))
425     return -1;
426
427   if (priv->finished)
428     return 0;
429
430   /* Convert as much as possible */
431   if (buffer_data_size (&priv->output_buffer) > 0)
432     {
433       converting_from_buffer = TRUE;
434       buffer_append (&priv->output_buffer, buffer, count);
435       to_convert = buffer_data (&priv->output_buffer);
436       to_convert_size = buffer_data_size (&priv->output_buffer);
437     }
438   else
439     {
440       converting_from_buffer = FALSE;
441       to_convert = buffer;
442       to_convert_size = count;
443     }
444
445   /* Ensure we have *some* initial target space */
446   buffer_ensure_space (&priv->converted_buffer, to_convert_size);
447
448   converted_bytes = 0;
449   while (!priv->finished && converted_bytes < to_convert_size)
450     {
451       /* Ensure we have *some* target space */
452       if (buffer_tailspace (&priv->converted_buffer) == 0)
453         grow_buffer (&priv->converted_buffer);
454
455       /* Try to convert to our buffer */
456       my_error = NULL;
457       res = g_converter_convert (priv->converter,
458                                  to_convert + converted_bytes,
459                                  to_convert_size - converted_bytes,
460                                  buffer_data (&priv->converted_buffer) + buffer_data_size (&priv->converted_buffer),
461                                  buffer_tailspace (&priv->converted_buffer),
462                                  0,
463                                  &bytes_read,
464                                  &bytes_written,
465                                  &my_error);
466
467       if (res != G_CONVERTER_ERROR)
468         {
469           priv->converted_buffer.end += bytes_written;
470           converted_bytes += bytes_read;
471
472           if (res == G_CONVERTER_FINISHED)
473             priv->finished = TRUE;
474         }
475       else
476         {
477           /* No-space errors can be handled locally: */
478           if (g_error_matches (my_error,
479                                G_IO_ERROR,
480                                G_IO_ERROR_NO_SPACE))
481             {
482               /* Need more destination space, grow it
483                * Note: if we actually grow the buffer (as opposed to compacting it),
484                * this will double the size, not just add one byte. */
485               buffer_ensure_space (&priv->converted_buffer,
486                                    priv->converted_buffer.size + 1);
487               g_error_free (my_error);
488               continue;
489             }
490
491           if (converted_bytes > 0)
492             {
493               /* We got an conversion error, but we did convert some bytes before
494                  that, so handle those before reporting the error */
495               g_error_free (my_error);
496               break;
497             }
498
499           if (g_error_matches (my_error,
500                                G_IO_ERROR,
501                                G_IO_ERROR_PARTIAL_INPUT))
502             {
503               /* Consume everything to buffer that we append to next time
504                  we write */
505               if (!converting_from_buffer)
506                 buffer_append (&priv->output_buffer, buffer, count);
507               /* in the converting_from_buffer case we already appended this */
508
509               g_error_free (my_error);
510               return count; /* consume everything */
511             }
512
513           /* Converted no data and got an normal error, return it */
514           g_propagate_error (error, my_error);
515           return -1;
516         }
517     }
518
519   if (converting_from_buffer)
520     {
521       buffer_consumed (&priv->output_buffer, converted_bytes);
522       retval = count;
523     }
524   else
525     retval = converted_bytes;
526
527   /* We now successfully consumed retval bytes, so we can't return an error,
528      even if writing this to the base stream fails. If it does we'll just
529      stop early and report this error when we try again on the next
530      write call. */
531   flush_buffer (cstream, blocking, cancellable, NULL);
532
533   return retval;
534 }
535
536 static gssize
537 g_converter_output_stream_write (GOutputStream  *stream,
538                                  const void     *buffer,
539                                  gsize           count,
540                                  GCancellable   *cancellable,
541                                  GError        **error)
542 {
543   return write_internal (stream, buffer, count, TRUE, cancellable, error);
544 }
545
546 static gboolean
547 g_converter_output_stream_flush (GOutputStream  *stream,
548                                  GCancellable   *cancellable,
549                                  GError        **error)
550 {
551   GConverterOutputStream *cstream;
552   GConverterOutputStreamPrivate *priv;
553   GConverterResult res;
554   GError *my_error;
555   gboolean is_closing;
556   gboolean flushed;
557   gsize bytes_read;
558   gsize bytes_written;
559
560   cstream = G_CONVERTER_OUTPUT_STREAM (stream);
561   priv = cstream->priv;
562
563   is_closing = g_output_stream_is_closing (stream);
564
565   /* Write out all available pre-converted data and fail if
566      not possible */
567   if (!flush_buffer (cstream, TRUE, cancellable, error))
568     return FALSE;
569
570   /* Ensure we have *some* initial target space */
571   buffer_ensure_space (&priv->converted_buffer, 1);
572
573   /* Convert whole buffer */
574   flushed = FALSE;
575   while (!priv->finished && !flushed)
576     {
577       /* Ensure we have *some* target space */
578       if (buffer_tailspace (&priv->converted_buffer) == 0)
579         grow_buffer (&priv->converted_buffer);
580
581       /* Try to convert to our buffer */
582       my_error = NULL;
583       res = g_converter_convert (priv->converter,
584                                  buffer_data (&priv->output_buffer),
585                                  buffer_data_size (&priv->output_buffer),
586                                  buffer_data (&priv->converted_buffer) + buffer_data_size (&priv->converted_buffer),
587                                  buffer_tailspace (&priv->converted_buffer),
588                                  is_closing ? G_CONVERTER_INPUT_AT_END : G_CONVERTER_FLUSH,
589                                  &bytes_read,
590                                  &bytes_written,
591                                  &my_error);
592
593       if (res != G_CONVERTER_ERROR)
594         {
595           priv->converted_buffer.end += bytes_written;
596           buffer_consumed (&priv->output_buffer, bytes_read);
597
598           if (res == G_CONVERTER_FINISHED)
599             priv->finished = TRUE;
600           if (!is_closing &&
601               res == G_CONVERTER_FLUSHED)
602             {
603               /* Should not have retured FLUSHED with input left */
604               g_assert (buffer_data_size (&priv->output_buffer) == 0);
605               flushed = TRUE;
606             }
607         }
608       else
609         {
610           /* No-space errors can be handled locally: */
611           if (g_error_matches (my_error,
612                                G_IO_ERROR,
613                                G_IO_ERROR_NO_SPACE))
614             {
615               /* Need more destination space, grow it
616                * Note: if we actually grow the buffer (as opposed to compacting it),
617                * this will double the size, not just add one byte. */
618               buffer_ensure_space (&priv->converted_buffer,
619                                    priv->converted_buffer.size + 1);
620               g_error_free (my_error);
621               continue;
622             }
623
624           /* Any other error, including PARTIAL_INPUT can't be fixed by now
625              and is an error */
626           g_propagate_error (error, my_error);
627           return FALSE;
628         }
629     }
630
631   /* Now write all converted data to base stream */
632   if (!flush_buffer (cstream, TRUE, cancellable, error))
633     return FALSE;
634
635   return TRUE;
636 }
637
638 static gboolean
639 g_converter_output_stream_can_poll (GPollableOutputStream *stream)
640 {
641   GOutputStream *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
642
643   return (G_IS_POLLABLE_OUTPUT_STREAM (base_stream) &&
644           g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (base_stream)));
645 }
646
647 static gboolean
648 g_converter_output_stream_is_writable (GPollableOutputStream *stream)
649 {
650   GOutputStream *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
651
652   return g_pollable_output_stream_is_writable (G_POLLABLE_OUTPUT_STREAM (base_stream));
653 }
654
655 static gssize
656 g_converter_output_stream_write_nonblocking (GPollableOutputStream  *stream,
657                                              const void             *buffer,
658                                              gsize                   count,
659                                              GError                **error)
660 {
661   return write_internal (G_OUTPUT_STREAM (stream), buffer, count, FALSE,
662                          NULL, error);
663 }
664
665 static GSource *
666 g_converter_output_stream_create_source (GPollableOutputStream *stream,
667                                          GCancellable          *cancellable)
668 {
669   GOutputStream *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
670   GSource *base_source, *pollable_source;
671
672   base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (base_stream), NULL);
673   pollable_source = g_pollable_source_new_full (stream, base_source,
674                                                 cancellable);
675   g_source_unref (base_source);
676
677   return pollable_source;
678 }
679
680 /**
681  * g_converter_output_stream_get_converter:
682  * @converter_stream: a #GConverterOutputStream
683  *
684  * Gets the #GConverter that is used by @converter_stream.
685  *
686  * Returns: (transfer none): the converter of the converter output stream
687  *
688  * Since: 2.24
689  */
690 GConverter *
691 g_converter_output_stream_get_converter (GConverterOutputStream *converter_stream)
692 {
693   return converter_stream->priv->converter;
694 }