1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
3 * soup-body-input-stream.c
5 * Copyright 2012 Red Hat, Inc.
14 #include <glib/gi18n-lib.h>
16 #include "soup-body-input-stream.h"
18 #include "soup-filter-input-stream.h"
19 #include "soup-marshal.h"
22 SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE,
23 SOUP_BODY_INPUT_STREAM_STATE_CHUNK_END,
24 SOUP_BODY_INPUT_STREAM_STATE_CHUNK,
25 SOUP_BODY_INPUT_STREAM_STATE_TRAILERS,
26 SOUP_BODY_INPUT_STREAM_STATE_DONE
27 } SoupBodyInputStreamState;
29 struct _SoupBodyInputStreamPrivate {
30 GInputStream *base_stream;
32 SoupEncoding encoding;
34 SoupBodyInputStreamState chunked_state;
45 static guint signals[LAST_SIGNAL] = { 0 };
54 static void soup_body_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
55 static void soup_body_input_stream_seekable_init (GSeekableIface *seekable_interface);
57 G_DEFINE_TYPE_WITH_CODE (SoupBodyInputStream, soup_body_input_stream, G_TYPE_FILTER_INPUT_STREAM,
58 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
59 soup_body_input_stream_pollable_init)
60 G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
61 soup_body_input_stream_seekable_init))
64 soup_body_input_stream_init (SoupBodyInputStream *bistream)
66 bistream->priv = G_TYPE_INSTANCE_GET_PRIVATE (bistream,
67 SOUP_TYPE_BODY_INPUT_STREAM,
68 SoupBodyInputStreamPrivate);
69 bistream->priv->encoding = SOUP_ENCODING_NONE;
73 soup_body_input_stream_constructed (GObject *object)
75 SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (object);
77 bistream->priv->base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (bistream));
79 if (bistream->priv->encoding == SOUP_ENCODING_NONE ||
80 (bistream->priv->encoding == SOUP_ENCODING_CONTENT_LENGTH &&
81 bistream->priv->read_length == 0))
82 bistream->priv->eof = TRUE;
86 soup_body_input_stream_set_property (GObject *object, guint prop_id,
87 const GValue *value, GParamSpec *pspec)
89 SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (object);
93 bistream->priv->encoding = g_value_get_enum (value);
94 if (bistream->priv->encoding == SOUP_ENCODING_CHUNKED)
95 bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE;
97 case PROP_CONTENT_LENGTH:
98 bistream->priv->read_length = g_value_get_int64 (value);
101 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
107 soup_body_input_stream_get_property (GObject *object, guint prop_id,
108 GValue *value, GParamSpec *pspec)
110 SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (object);
114 g_value_set_enum (value, bistream->priv->encoding);
117 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
123 soup_body_input_stream_read_raw (SoupBodyInputStream *bistream,
127 GCancellable *cancellable,
132 nread = g_pollable_stream_read (bistream->priv->base_stream,
137 bistream->priv->eof = TRUE;
138 if (bistream->priv->encoding != SOUP_ENCODING_EOF) {
139 g_set_error_literal (error, G_IO_ERROR,
140 G_IO_ERROR_PARTIAL_INPUT,
141 _("Connection terminated unexpectedly"));
149 soup_body_input_stream_read_chunked (SoupBodyInputStream *bistream,
153 GCancellable *cancellable,
156 SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (bistream->priv->base_stream);
162 switch (bistream->priv->chunked_state) {
163 case SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE:
164 nread = soup_filter_input_stream_read_line (
165 fstream, metabuf, sizeof (metabuf), blocking,
166 &got_line, cancellable, error);
170 g_set_error_literal (error, G_IO_ERROR,
171 G_IO_ERROR_PARTIAL_INPUT,
172 _("Connection terminated unexpectedly"));
176 bistream->priv->read_length = strtoul (metabuf, NULL, 16);
177 if (bistream->priv->read_length > 0)
178 bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK;
180 bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_TRAILERS;
183 case SOUP_BODY_INPUT_STREAM_STATE_CHUNK:
184 nread = soup_body_input_stream_read_raw (
186 MIN (count, bistream->priv->read_length),
187 blocking, cancellable, error);
189 bistream->priv->read_length -= nread;
190 if (bistream->priv->read_length == 0)
191 bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK_END;
195 case SOUP_BODY_INPUT_STREAM_STATE_CHUNK_END:
196 nread = soup_filter_input_stream_read_line (
197 SOUP_FILTER_INPUT_STREAM (bistream->priv->base_stream),
198 metabuf, sizeof (metabuf), blocking,
199 &got_line, cancellable, error);
203 g_set_error_literal (error, G_IO_ERROR,
204 G_IO_ERROR_PARTIAL_INPUT,
205 _("Connection terminated unexpectedly"));
209 bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE;
212 case SOUP_BODY_INPUT_STREAM_STATE_TRAILERS:
213 nread = soup_filter_input_stream_read_line (
214 fstream, buffer, count, blocking,
215 &got_line, cancellable, error);
219 if (strncmp (buffer, "\r\n", nread) || strncmp (buffer, "\n", nread)) {
220 bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_DONE;
221 bistream->priv->eof = TRUE;
225 case SOUP_BODY_INPUT_STREAM_STATE_DONE:
233 read_internal (GInputStream *stream,
237 GCancellable *cancellable,
240 SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream);
243 if (bistream->priv->eof)
246 switch (bistream->priv->encoding) {
247 case SOUP_ENCODING_NONE:
250 case SOUP_ENCODING_CHUNKED:
251 return soup_body_input_stream_read_chunked (bistream, buffer, count,
252 blocking, cancellable, error);
254 case SOUP_ENCODING_CONTENT_LENGTH:
255 case SOUP_ENCODING_EOF:
256 if (bistream->priv->read_length != -1) {
257 count = MIN (count, bistream->priv->read_length);
262 nread = soup_body_input_stream_read_raw (bistream, buffer, count,
263 blocking, cancellable, error);
264 if (bistream->priv->read_length != -1 && nread > 0)
265 bistream->priv->read_length -= nread;
267 if (bistream->priv->encoding == SOUP_ENCODING_CONTENT_LENGTH)
268 bistream->priv->pos += nread;
272 g_return_val_if_reached (-1);
277 soup_body_input_stream_skip (GInputStream *stream,
279 GCancellable *cancellable,
282 SoupBodyInputStreamPrivate *priv = SOUP_BODY_INPUT_STREAM(stream)->priv;
285 skipped = g_input_stream_skip (G_FILTER_INPUT_STREAM (stream)->base_stream,
286 MIN (count, priv->read_length),
290 priv->pos += skipped;
296 soup_body_input_stream_read_fn (GInputStream *stream,
299 GCancellable *cancellable,
302 return read_internal (stream, buffer, count, TRUE,
307 soup_body_input_stream_close_fn (GInputStream *stream,
308 GCancellable *cancellable,
311 g_signal_emit (stream, signals[CLOSED], 0);
313 return G_INPUT_STREAM_CLASS (soup_body_input_stream_parent_class)->close_fn (stream, cancellable, error);
317 soup_body_input_stream_is_readable (GPollableInputStream *stream)
319 SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream);
321 return bistream->priv->eof ||
322 g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (bistream->priv->base_stream));
326 soup_body_input_stream_can_poll (GPollableInputStream *pollable)
328 GInputStream *base_stream = SOUP_BODY_INPUT_STREAM (pollable)->priv->base_stream;
330 return G_IS_POLLABLE_INPUT_STREAM (base_stream) &&
331 g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (base_stream));
335 soup_body_input_stream_read_nonblocking (GPollableInputStream *stream,
340 return read_internal (G_INPUT_STREAM (stream), buffer, count, FALSE,
345 soup_body_input_stream_create_source (GPollableInputStream *stream,
346 GCancellable *cancellable)
348 SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream);
349 GSource *base_source, *pollable_source;
351 if (bistream->priv->eof)
352 base_source = g_timeout_source_new (0);
354 base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (bistream->priv->base_stream), cancellable);
355 g_source_set_dummy_callback (base_source);
357 pollable_source = g_pollable_source_new (G_OBJECT (stream));
358 g_source_add_child_source (pollable_source, base_source);
359 g_source_unref (base_source);
361 return pollable_source;
365 soup_body_input_stream_class_init (SoupBodyInputStreamClass *stream_class)
367 GObjectClass *object_class = G_OBJECT_CLASS (stream_class);
368 GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class);
370 g_type_class_add_private (stream_class, sizeof (SoupBodyInputStreamPrivate));
372 object_class->constructed = soup_body_input_stream_constructed;
373 object_class->set_property = soup_body_input_stream_set_property;
374 object_class->get_property = soup_body_input_stream_get_property;
376 input_stream_class->skip = soup_body_input_stream_skip;
377 input_stream_class->read_fn = soup_body_input_stream_read_fn;
378 input_stream_class->close_fn = soup_body_input_stream_close_fn;
381 g_signal_new ("closed",
382 G_OBJECT_CLASS_TYPE (object_class),
386 _soup_marshal_NONE__NONE,
389 g_object_class_install_property (
390 object_class, PROP_ENCODING,
391 g_param_spec_enum ("encoding",
393 "Message body encoding",
396 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
397 g_object_class_install_property (
398 object_class, PROP_CONTENT_LENGTH,
399 g_param_spec_int64 ("content-length",
401 "Message body Content-Length",
403 G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY));
407 soup_body_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
408 gpointer interface_data)
410 pollable_interface->can_poll = soup_body_input_stream_can_poll;
411 pollable_interface->is_readable = soup_body_input_stream_is_readable;
412 pollable_interface->read_nonblocking = soup_body_input_stream_read_nonblocking;
413 pollable_interface->create_source = soup_body_input_stream_create_source;
417 soup_body_input_stream_tell (GSeekable *seekable)
419 return SOUP_BODY_INPUT_STREAM (seekable)->priv->pos;
423 soup_body_input_stream_can_seek (GSeekable *seekable)
425 SoupBodyInputStreamPrivate *priv = SOUP_BODY_INPUT_STREAM (seekable)->priv;
427 return priv->encoding == SOUP_ENCODING_CONTENT_LENGTH
428 && G_IS_SEEKABLE (priv->base_stream)
429 && g_seekable_can_seek (G_SEEKABLE (priv->base_stream));
433 soup_body_input_stream_seek (GSeekable *seekable,
436 GCancellable *cancellable,
439 SoupBodyInputStreamPrivate *priv = SOUP_BODY_INPUT_STREAM (seekable)->priv;
440 goffset position, end_position;
442 end_position = priv->pos + priv->read_length;
445 position = priv->pos + offset;
451 position = end_position + offset;
454 g_return_val_if_reached (FALSE);
457 if (position < 0 || position >= end_position) {
458 g_set_error_literal (error,
460 G_IO_ERROR_INVALID_ARGUMENT,
461 _("Invalid seek request"));
465 if (!g_seekable_seek (G_SEEKABLE (priv->base_stream), position - priv->pos,
466 G_SEEK_CUR, cancellable, error))
469 priv->pos = position;
475 soup_body_input_stream_can_truncate (GSeekable *seekable)
481 soup_body_input_stream_truncate_fn (GSeekable *seekable,
483 GCancellable *cancellable,
486 g_set_error_literal (error,
488 G_IO_ERROR_NOT_SUPPORTED,
489 _("Cannot truncate SoupBodyInputStream"));
494 soup_body_input_stream_seekable_init (GSeekableIface *seekable_interface)
496 seekable_interface->tell = soup_body_input_stream_tell;
497 seekable_interface->can_seek = soup_body_input_stream_can_seek;
498 seekable_interface->seek = soup_body_input_stream_seek;
499 seekable_interface->can_truncate = soup_body_input_stream_can_truncate;
500 seekable_interface->truncate_fn = soup_body_input_stream_truncate_fn;
504 soup_body_input_stream_new (GInputStream *base_stream,
505 SoupEncoding encoding,
506 goffset content_length)
508 if (encoding == SOUP_ENCODING_CHUNKED)
509 g_return_val_if_fail (SOUP_IS_FILTER_INPUT_STREAM (base_stream), NULL);
511 return g_object_new (SOUP_TYPE_BODY_INPUT_STREAM,
512 "base-stream", base_stream,
513 "close-base-stream", FALSE,
514 "encoding", encoding,
515 "content-length", content_length,