Remove build warning
[platform/upstream/libsoup.git] / libsoup / soup-cache-input-stream.c
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * Copyright (C) 2012 Igalia, S.L.
4  */
5
6 #ifdef HAVE_CONFIG_H
7 #include <config.h>
8 #endif
9
10 #include <glib/gi18n-lib.h>
11 #include "soup-cache-input-stream.h"
12 #include "soup-message-body.h"
13
14 static void soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
15
16 G_DEFINE_TYPE_WITH_CODE (SoupCacheInputStream, soup_cache_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
17                          G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
18                                                 soup_cache_input_stream_pollable_init))
19
/* Property identifiers. PROP_0 is a placeholder so that the first real
 * property id is 1; LAST_PROP marks the end of the list. */
enum {
        PROP_0,
        PROP_OUTPUT_STREAM,     /* "output-stream" */
        LAST_PROP
};
28
29 enum {
30         CACHING_FINISHED,
31
32         LAST_SIGNAL
33 };
34
35 static guint signals[LAST_SIGNAL] = { 0 };
36
37 struct _SoupCacheInputStreamPrivate
38 {
39         GOutputStream *output_stream;
40         GCancellable *cancellable;
41         gsize bytes_written;
42
43         gboolean read_finished;
44         SoupBuffer *current_writing_buffer;
45         GQueue *buffer_queue;
46 };
47
48 static void soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream);
49
50 static inline void
51 notify_and_clear (SoupCacheInputStream *istream, GError *error)
52 {
53         SoupCacheInputStreamPrivate *priv = istream->priv;
54
55         g_signal_emit (istream, signals[CACHING_FINISHED], 0, priv->bytes_written, error);
56
57         g_clear_object (&priv->cancellable);
58         g_clear_object (&priv->output_stream);
59 }
60
61 static inline void
62 try_write_next_buffer (SoupCacheInputStream *istream)
63 {
64         SoupCacheInputStreamPrivate *priv = istream->priv;
65
66         if (priv->current_writing_buffer == NULL && priv->buffer_queue->length)
67                 soup_cache_input_stream_write_next_buffer (istream);
68         else if (priv->read_finished)
69                 notify_and_clear (istream, NULL);
70         else if (g_input_stream_is_closed (G_INPUT_STREAM (istream))) {
71                 GError *error = NULL;
72                 g_set_error_literal (&error, G_IO_ERROR, G_IO_ERROR_CLOSED,
73                                      _("Network stream unexpectedly closed"));
74                 notify_and_clear (istream, error);
75         }
76 }
77
78 static void
79 file_replaced_cb (GObject      *source,
80                   GAsyncResult *res,
81                   gpointer      user_data)
82 {
83         SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (user_data);
84         SoupCacheInputStreamPrivate *priv = istream->priv;
85         GError *error = NULL;
86
87         priv->output_stream = (GOutputStream *) g_file_replace_finish (G_FILE (source), res, &error);
88
89         if (error)
90                 notify_and_clear (istream, error);
91         else
92                 try_write_next_buffer (istream);
93
94         g_object_unref (istream);
95 }
96
97 static void
98 soup_cache_input_stream_init (SoupCacheInputStream *self)
99 {
100         SoupCacheInputStreamPrivate *priv =
101                 G_TYPE_INSTANCE_GET_PRIVATE (self, SOUP_TYPE_CACHE_INPUT_STREAM,
102                                              SoupCacheInputStreamPrivate);
103
104         priv->buffer_queue = g_queue_new ();
105         self->priv = priv;
106 }
107
108 static void
109 soup_cache_input_stream_get_property (GObject *object,
110                                       guint property_id, GValue *value, GParamSpec *pspec)
111 {
112         SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
113         SoupCacheInputStreamPrivate *priv = self->priv;
114
115         switch (property_id) {
116         case PROP_OUTPUT_STREAM:
117                 g_value_set_object (value, priv->output_stream);
118                 break;
119         default:
120                 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
121                 break;
122         }
123 }
124
125 static void
126 soup_cache_input_stream_set_property (GObject *object,
127                                       guint property_id, const GValue *value, GParamSpec *pspec)
128 {
129         SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
130         SoupCacheInputStreamPrivate *priv = self->priv;
131
132         switch (property_id) {
133         case PROP_OUTPUT_STREAM:
134                 priv->output_stream = g_value_dup_object (value);
135                 break;
136         default:
137                 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
138                 break;
139         }
140 }
141
142 static void
143 soup_cache_input_stream_finalize (GObject *object)
144 {
145         SoupCacheInputStream *self = (SoupCacheInputStream *)object;
146         SoupCacheInputStreamPrivate *priv = self->priv;
147
148         g_clear_object (&priv->cancellable);
149         g_clear_object (&priv->output_stream);
150         g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
151         g_queue_free_full (priv->buffer_queue, (GDestroyNotify) soup_buffer_free);
152
153         G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->finalize (object);
154 }
155
156 static void
157 write_ready_cb (GObject *source, GAsyncResult *result, SoupCacheInputStream *istream)
158 {
159         GOutputStream *ostream = G_OUTPUT_STREAM (source);
160         SoupCacheInputStreamPrivate *priv = istream->priv;
161         gssize write_size;
162         gsize pending;
163         GError *error = NULL;
164
165         write_size = g_output_stream_write_finish (ostream, result, &error);
166         if (error) {
167                 notify_and_clear (istream, error);
168                 g_object_unref (istream);
169                 return;
170         }
171
172         /* Check that we have written everything */
173         pending = priv->current_writing_buffer->length - write_size;
174         if (pending) {
175                 SoupBuffer *subbuffer = soup_buffer_new_subbuffer (priv->current_writing_buffer,
176                                                                    write_size, pending);
177                 g_queue_push_head (priv->buffer_queue, subbuffer);
178         }
179
180         priv->bytes_written += write_size;
181         g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
182
183         try_write_next_buffer (istream);
184         g_object_unref (istream);
185 }
186
187 static void
188 soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream)
189 {
190         SoupCacheInputStreamPrivate *priv = istream->priv;
191         SoupBuffer *buffer = g_queue_pop_head (priv->buffer_queue);
192         int priority;
193
194         g_assert (priv->output_stream && !g_output_stream_is_closed (priv->output_stream));
195
196         g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
197         priv->current_writing_buffer = buffer;
198
199         if (priv->buffer_queue->length > 10)
200                 priority = G_PRIORITY_DEFAULT;
201         else
202                 priority = G_PRIORITY_LOW;
203
204         g_output_stream_write_async (priv->output_stream, buffer->data, buffer->length,
205                                      priority, priv->cancellable,
206                                      (GAsyncReadyCallback) write_ready_cb,
207                                      g_object_ref (istream));
208 }
209
210 static gssize
211 read_internal (GInputStream  *stream,
212                void          *buffer,
213                gsize          count,
214                gboolean       blocking,
215                GCancellable  *cancellable,
216                GError       **error)
217 {
218         SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
219         SoupCacheInputStreamPrivate *priv = istream->priv;
220         GInputStream *base_stream;
221         gssize nread;
222
223         base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (stream));
224         nread = g_pollable_stream_read (base_stream, buffer, count, blocking,
225                                         cancellable, error);
226
227         if (G_UNLIKELY (nread == -1 || priv->read_finished))
228                 return nread;
229
230         if (nread == 0) {
231                 priv->read_finished = TRUE;
232
233                 if (priv->current_writing_buffer == NULL && priv->output_stream)
234                         notify_and_clear (istream, NULL);
235         } else {
236                 SoupBuffer *soup_buffer = soup_buffer_new (SOUP_MEMORY_COPY, buffer, nread);
237                 g_queue_push_tail (priv->buffer_queue, soup_buffer);
238
239                 if (priv->current_writing_buffer == NULL && priv->output_stream)
240                         soup_cache_input_stream_write_next_buffer (istream);
241         }
242
243         return nread;
244 }
245
246 static gssize
247 soup_cache_input_stream_read_fn (GInputStream  *stream,
248                                  void          *buffer,
249                                  gsize          count,
250                                  GCancellable  *cancellable,
251                                  GError       **error)
252 {
253         return read_internal (stream, buffer, count, TRUE,
254                               cancellable, error);
255 }
256
257 static gssize
258 soup_cache_input_stream_read_nonblocking (GPollableInputStream  *stream,
259                                           void                  *buffer,
260                                           gsize                  count,
261                                           GError               **error)
262 {
263         return read_internal (G_INPUT_STREAM (stream), buffer, count, FALSE,
264                               NULL, error);
265 }
266
267 static void
268 soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
269                                        gpointer interface_data)
270 {
271         pollable_interface->read_nonblocking = soup_cache_input_stream_read_nonblocking;
272 }
273
274 static gboolean
275 soup_cache_input_stream_close_fn (GInputStream  *stream,
276                                   GCancellable  *cancellable,
277                                   GError       **error)
278 {
279         SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
280         SoupCacheInputStreamPrivate *priv = istream->priv;
281
282         if (!priv->read_finished) {
283                 if (priv->output_stream) {
284                         /* Cancel any pending write operation or return an error if none. */
285                         if (g_output_stream_has_pending (priv->output_stream))
286                                 g_cancellable_cancel (priv->cancellable);
287                         else {
288                                 GError *error = NULL;
289                                 g_set_error_literal (&error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT,
290                                                      _("Failed to completely cache the resource"));
291                                 notify_and_clear (istream, error);
292                         }
293                 } else if (priv->cancellable)
294                         /* The file_replace_async() hasn't finished yet */
295                         g_cancellable_cancel (priv->cancellable);
296         }
297
298         return G_INPUT_STREAM_CLASS (soup_cache_input_stream_parent_class)->close_fn (stream, cancellable, error);
299 }
300
301 static void
302 soup_cache_input_stream_class_init (SoupCacheInputStreamClass *klass)
303 {
304         GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
305         GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass);
306
307         g_type_class_add_private (klass, sizeof (SoupCacheInputStreamPrivate));
308
309         gobject_class->get_property = soup_cache_input_stream_get_property;
310         gobject_class->set_property = soup_cache_input_stream_set_property;
311         gobject_class->finalize = soup_cache_input_stream_finalize;
312
313         istream_class->read_fn = soup_cache_input_stream_read_fn;
314         istream_class->close_fn = soup_cache_input_stream_close_fn;
315
316         g_object_class_install_property (gobject_class, PROP_OUTPUT_STREAM,
317                                          g_param_spec_object ("output-stream", "Output stream",
318                                                               "the output stream where to write.",
319                                                               G_TYPE_OUTPUT_STREAM,
320                                                               G_PARAM_READWRITE |
321                                                               G_PARAM_CONSTRUCT_ONLY |
322                                                               G_PARAM_STATIC_STRINGS));
323
324         signals[CACHING_FINISHED] =
325                 g_signal_new ("caching-finished",
326                               G_OBJECT_CLASS_TYPE (gobject_class),
327                               G_SIGNAL_RUN_FIRST,
328                               G_STRUCT_OFFSET (SoupCacheInputStreamClass, caching_finished),
329                               NULL, NULL,
330                               NULL,
331                               G_TYPE_NONE, 2,
332                               G_TYPE_INT, G_TYPE_ERROR);
333 }
334
335 GInputStream *
336 soup_cache_input_stream_new (GInputStream *base_stream,
337                              GFile        *file)
338 {
339         SoupCacheInputStream *istream = g_object_new (SOUP_TYPE_CACHE_INPUT_STREAM,
340                                               "base-stream", base_stream,
341                                               "close-base-stream", FALSE,
342                                               NULL);
343
344         istream->priv->cancellable = g_cancellable_new ();
345         g_file_replace_async (file, NULL, FALSE,
346                               G_FILE_CREATE_PRIVATE | G_FILE_CREATE_REPLACE_DESTINATION,
347                               G_PRIORITY_DEFAULT, istream->priv->cancellable,
348                               file_replaced_cb, g_object_ref (istream));
349
350         return (GInputStream *) istream;
351 }