/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
/*
 * soup-body-input-stream.c
 *
 * Copyright 2012 Red Hat, Inc.
 */
14 #include <glib/gi18n-lib.h>
16 #include "soup-body-input-stream.h"
18 #include "soup-filter-input-stream.h"
/*
 * Decoder states used while reading a chunked body.
 *
 * CHUNK_SIZE is the first enumerator (value 0) and is also the state
 * selected explicitly when the "encoding" property is set to chunked.
 */
typedef enum {
	SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE,	/* next: a chunk-size line */
	SOUP_BODY_INPUT_STREAM_STATE_CHUNK_END,		/* next: the line ending that follows chunk data */
	SOUP_BODY_INPUT_STREAM_STATE_CHUNK,		/* next: chunk payload bytes */
	SOUP_BODY_INPUT_STREAM_STATE_TRAILERS,		/* next: trailer lines after the zero-size chunk */
	SOUP_BODY_INPUT_STREAM_STATE_DONE		/* body finished; reads return 0 */
} SoupBodyInputStreamState;
28 struct _SoupBodyInputStreamPrivate {
29 GInputStream *base_stream;
31 SoupEncoding encoding;
33 SoupBodyInputStreamState chunked_state;
44 static guint signals[LAST_SIGNAL] = { 0 };
53 static void soup_body_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
54 static void soup_body_input_stream_seekable_init (GSeekableIface *seekable_interface);
56 G_DEFINE_TYPE_WITH_CODE (SoupBodyInputStream, soup_body_input_stream, G_TYPE_FILTER_INPUT_STREAM,
57 G_ADD_PRIVATE (SoupBodyInputStream)
58 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
59 soup_body_input_stream_pollable_init)
60 G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
61 soup_body_input_stream_seekable_init))
64 soup_body_input_stream_init (SoupBodyInputStream *bistream)
66 bistream->priv = soup_body_input_stream_get_instance_private (bistream);
67 bistream->priv->encoding = SOUP_ENCODING_NONE;
71 soup_body_input_stream_constructed (GObject *object)
73 SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (object);
75 bistream->priv->base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (bistream));
77 if (bistream->priv->encoding == SOUP_ENCODING_NONE ||
78 (bistream->priv->encoding == SOUP_ENCODING_CONTENT_LENGTH &&
79 bistream->priv->read_length == 0))
80 bistream->priv->eof = TRUE;
84 soup_body_input_stream_set_property (GObject *object, guint prop_id,
85 const GValue *value, GParamSpec *pspec)
87 SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (object);
91 bistream->priv->encoding = g_value_get_enum (value);
92 if (bistream->priv->encoding == SOUP_ENCODING_CHUNKED)
93 bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE;
95 case PROP_CONTENT_LENGTH:
96 bistream->priv->read_length = g_value_get_int64 (value);
99 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
105 soup_body_input_stream_get_property (GObject *object, guint prop_id,
106 GValue *value, GParamSpec *pspec)
108 SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (object);
112 g_value_set_enum (value, bistream->priv->encoding);
115 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
121 soup_body_input_stream_read_raw (SoupBodyInputStream *bistream,
125 GCancellable *cancellable,
130 nread = g_pollable_stream_read (bistream->priv->base_stream,
135 bistream->priv->eof = TRUE;
136 if (bistream->priv->encoding != SOUP_ENCODING_EOF) {
137 g_set_error_literal (error, G_IO_ERROR,
138 G_IO_ERROR_PARTIAL_INPUT,
139 _("Connection terminated unexpectedly"));
147 soup_body_input_stream_read_chunked (SoupBodyInputStream *bistream,
151 GCancellable *cancellable,
154 SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (bistream->priv->base_stream);
160 switch (bistream->priv->chunked_state) {
161 case SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE:
162 nread = soup_filter_input_stream_read_line (
163 fstream, metabuf, sizeof (metabuf), blocking,
164 &got_line, cancellable, error);
168 g_set_error_literal (error, G_IO_ERROR,
169 G_IO_ERROR_PARTIAL_INPUT,
170 _("Connection terminated unexpectedly"));
174 bistream->priv->read_length = strtoul (metabuf, NULL, 16);
175 if (bistream->priv->read_length > 0)
176 bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK;
178 bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_TRAILERS;
181 case SOUP_BODY_INPUT_STREAM_STATE_CHUNK:
182 nread = soup_body_input_stream_read_raw (
184 MIN (count, bistream->priv->read_length),
185 blocking, cancellable, error);
187 bistream->priv->read_length -= nread;
188 if (bistream->priv->read_length == 0)
189 bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK_END;
193 case SOUP_BODY_INPUT_STREAM_STATE_CHUNK_END:
194 nread = soup_filter_input_stream_read_line (
195 SOUP_FILTER_INPUT_STREAM (bistream->priv->base_stream),
196 metabuf, sizeof (metabuf), blocking,
197 &got_line, cancellable, error);
201 g_set_error_literal (error, G_IO_ERROR,
202 G_IO_ERROR_PARTIAL_INPUT,
203 _("Connection terminated unexpectedly"));
207 bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE;
210 case SOUP_BODY_INPUT_STREAM_STATE_TRAILERS:
211 nread = soup_filter_input_stream_read_line (
212 fstream, buffer, count, blocking,
213 &got_line, cancellable, error);
217 if (strncmp (buffer, "\r\n", nread) || strncmp (buffer, "\n", nread)) {
218 bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_DONE;
219 bistream->priv->eof = TRUE;
223 case SOUP_BODY_INPUT_STREAM_STATE_DONE:
231 read_internal (GInputStream *stream,
235 GCancellable *cancellable,
238 SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream);
241 if (bistream->priv->eof)
244 switch (bistream->priv->encoding) {
245 case SOUP_ENCODING_NONE:
248 case SOUP_ENCODING_CHUNKED:
249 return soup_body_input_stream_read_chunked (bistream, buffer, count,
250 blocking, cancellable, error);
252 case SOUP_ENCODING_CONTENT_LENGTH:
253 case SOUP_ENCODING_EOF:
254 if (bistream->priv->read_length != -1) {
255 count = MIN (count, bistream->priv->read_length);
260 nread = soup_body_input_stream_read_raw (bistream, buffer, count,
261 blocking, cancellable, error);
262 if (bistream->priv->read_length != -1 && nread > 0)
263 bistream->priv->read_length -= nread;
265 if (bistream->priv->encoding == SOUP_ENCODING_CONTENT_LENGTH)
266 bistream->priv->pos += nread;
270 g_return_val_if_reached (-1);
275 soup_body_input_stream_skip (GInputStream *stream,
277 GCancellable *cancellable,
280 SoupBodyInputStreamPrivate *priv = SOUP_BODY_INPUT_STREAM(stream)->priv;
283 skipped = g_input_stream_skip (G_FILTER_INPUT_STREAM (stream)->base_stream,
284 MIN (count, priv->read_length),
289 else if (skipped > 0)
290 priv->pos += skipped;
296 soup_body_input_stream_read_fn (GInputStream *stream,
299 GCancellable *cancellable,
302 return read_internal (stream, buffer, count, TRUE,
307 soup_body_input_stream_close_fn (GInputStream *stream,
308 GCancellable *cancellable,
311 g_signal_emit (stream, signals[CLOSED], 0);
313 return G_INPUT_STREAM_CLASS (soup_body_input_stream_parent_class)->close_fn (stream, cancellable, error);
317 soup_body_input_stream_is_readable (GPollableInputStream *stream)
319 SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream);
321 return bistream->priv->eof ||
322 g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (bistream->priv->base_stream));
326 soup_body_input_stream_can_poll (GPollableInputStream *pollable)
328 GInputStream *base_stream = SOUP_BODY_INPUT_STREAM (pollable)->priv->base_stream;
330 return G_IS_POLLABLE_INPUT_STREAM (base_stream) &&
331 g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (base_stream));
335 soup_body_input_stream_read_nonblocking (GPollableInputStream *stream,
340 return read_internal (G_INPUT_STREAM (stream), buffer, count, FALSE,
345 soup_body_input_stream_create_source (GPollableInputStream *stream,
346 GCancellable *cancellable)
348 SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream);
349 GSource *base_source, *pollable_source;
351 if (bistream->priv->eof)
352 base_source = g_timeout_source_new (0);
354 base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (bistream->priv->base_stream), cancellable);
355 g_source_set_dummy_callback (base_source);
357 pollable_source = g_pollable_source_new (G_OBJECT (stream));
358 g_source_add_child_source (pollable_source, base_source);
359 g_source_unref (base_source);
361 return pollable_source;
365 soup_body_input_stream_class_init (SoupBodyInputStreamClass *stream_class)
367 GObjectClass *object_class = G_OBJECT_CLASS (stream_class);
368 GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class);
370 object_class->constructed = soup_body_input_stream_constructed;
371 object_class->set_property = soup_body_input_stream_set_property;
372 object_class->get_property = soup_body_input_stream_get_property;
374 input_stream_class->skip = soup_body_input_stream_skip;
375 input_stream_class->read_fn = soup_body_input_stream_read_fn;
376 input_stream_class->close_fn = soup_body_input_stream_close_fn;
379 g_signal_new ("closed",
380 G_OBJECT_CLASS_TYPE (object_class),
387 g_object_class_install_property (
388 object_class, PROP_ENCODING,
389 g_param_spec_enum ("encoding",
391 "Message body encoding",
394 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
395 g_object_class_install_property (
396 object_class, PROP_CONTENT_LENGTH,
397 g_param_spec_int64 ("content-length",
399 "Message body Content-Length",
401 G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
405 soup_body_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
406 gpointer interface_data)
408 pollable_interface->can_poll = soup_body_input_stream_can_poll;
409 pollable_interface->is_readable = soup_body_input_stream_is_readable;
410 pollable_interface->read_nonblocking = soup_body_input_stream_read_nonblocking;
411 pollable_interface->create_source = soup_body_input_stream_create_source;
415 soup_body_input_stream_tell (GSeekable *seekable)
417 return SOUP_BODY_INPUT_STREAM (seekable)->priv->pos;
421 soup_body_input_stream_can_seek (GSeekable *seekable)
423 SoupBodyInputStreamPrivate *priv = SOUP_BODY_INPUT_STREAM (seekable)->priv;
425 return priv->encoding == SOUP_ENCODING_CONTENT_LENGTH
426 && G_IS_SEEKABLE (priv->base_stream)
427 && g_seekable_can_seek (G_SEEKABLE (priv->base_stream));
431 soup_body_input_stream_seek (GSeekable *seekable,
434 GCancellable *cancellable,
437 SoupBodyInputStreamPrivate *priv = SOUP_BODY_INPUT_STREAM (seekable)->priv;
438 goffset position, end_position;
440 end_position = priv->pos + priv->read_length;
443 position = priv->pos + offset;
449 position = end_position + offset;
452 g_return_val_if_reached (FALSE);
455 if (position < 0 || position >= end_position) {
456 g_set_error_literal (error,
458 G_IO_ERROR_INVALID_ARGUMENT,
459 _("Invalid seek request"));
463 if (!g_seekable_seek (G_SEEKABLE (priv->base_stream), position - priv->pos,
464 G_SEEK_CUR, cancellable, error))
467 priv->pos = position;
473 soup_body_input_stream_can_truncate (GSeekable *seekable)
479 soup_body_input_stream_truncate_fn (GSeekable *seekable,
481 GCancellable *cancellable,
484 g_set_error_literal (error,
486 G_IO_ERROR_NOT_SUPPORTED,
487 _("Cannot truncate SoupBodyInputStream"));
492 soup_body_input_stream_seekable_init (GSeekableIface *seekable_interface)
494 seekable_interface->tell = soup_body_input_stream_tell;
495 seekable_interface->can_seek = soup_body_input_stream_can_seek;
496 seekable_interface->seek = soup_body_input_stream_seek;
497 seekable_interface->can_truncate = soup_body_input_stream_can_truncate;
498 seekable_interface->truncate_fn = soup_body_input_stream_truncate_fn;
502 soup_body_input_stream_new (GInputStream *base_stream,
503 SoupEncoding encoding,
504 goffset content_length)
506 if (encoding == SOUP_ENCODING_CHUNKED)
507 g_return_val_if_fail (SOUP_IS_FILTER_INPUT_STREAM (base_stream), NULL);
509 return g_object_new (SOUP_TYPE_BODY_INPUT_STREAM,
510 "base-stream", base_stream,
511 "close-base-stream", FALSE,
512 "encoding", encoding,
513 "content-length", content_length,