/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
/*
 * soup-filter-input-stream.c
 *
 * Copyright 2012 Red Hat, Inc.
 */
14 #include "soup-filter-input-stream.h"
/* This is essentially a subset of GDataInputStream, except that we
 * can do the equivalent of "fill_nonblocking()" on it. (We could use
 * an actual GDataInputStream, and implement the nonblocking semantics
 * via fill_async(), but that would be more work...)
 */
/* Private per-instance state for SoupFilterInputStream.
 *
 * NOTE(review): only the in_read_until member is visible in this excerpt.
 * The functions in this file also use priv->buf (a GByteArray) and
 * priv->need_more, so those members are presumably declared on lines that
 * are missing from this view, along with the closing brace.
 */
23 struct _SoupFilterInputStreamPrivate {
/* Set to TRUE around the g_pollable_stream_read() call that
 * soup_filter_input_stream_read_until() makes on the filter stream itself,
 * and reset to FALSE immediately afterwards.  The read implementations test
 * it to decide whether to serve data from priv->buf.
 */
26 gboolean in_read_until;
/* Forward declaration: the interface initializer is referenced by the
 * G_DEFINE_TYPE_WITH_CODE() invocation that follows, before its definition
 * later in the file.
 */
29 static void soup_filter_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
/* Define the SoupFilterInputStream type with G_TYPE_FILTER_INPUT_STREAM as
 * its parent type, and register its implementation of the
 * G_TYPE_POLLABLE_INPUT_STREAM interface, initialized by
 * soup_filter_input_stream_pollable_init().
 */
31 G_DEFINE_TYPE_WITH_CODE (SoupFilterInputStream, soup_filter_input_stream, G_TYPE_FILTER_INPUT_STREAM,
32 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
33 soup_filter_input_stream_pollable_init))
/* Instance initializer: stores the pointer returned by
 * G_TYPE_INSTANCE_GET_PRIVATE() in stream->priv, which is how the rest of
 * this file reaches the private state.
 * (The return type and braces are not visible in this excerpt.)
 */
36 soup_filter_input_stream_init (SoupFilterInputStream *stream)
38 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
39 SOUP_TYPE_FILTER_INPUT_STREAM,
40 SoupFilterInputStreamPrivate);
/* GObject finalize handler (installed in class_init): releases any buffered
 * data and then chains up to the parent class's finalize.
 */
44 soup_filter_input_stream_finalize (GObject *object)
46 SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (object);
/* g_clear_pointer() calls g_byte_array_unref() on priv->buf only if it is
 * non-NULL, and leaves the field set to NULL.
 */
48 g_clear_pointer (&fstream->priv->buf, g_byte_array_unref);
/* Chain up so the parent class can release its own resources. */
50 G_OBJECT_CLASS (soup_filter_input_stream_parent_class)->finalize (object);
/* Copy @count bytes from the front of fstream->priv->buf into @buffer and
 * remove them from the buffered data.
 *
 * NOTE(review): several lines of this function are not visible in this
 * excerpt (its return type, braces, whatever precedes the memcpy(), the
 * line separating the two branches below, the last argument of memmove()
 * and the return statement).  How @count is bounded against buf->len, and
 * the exact return value, therefore cannot be confirmed from here.
 */
54 read_from_buf (SoupFilterInputStream *fstream, gpointer buffer, gsize count)
56 GByteArray *buf = fstream->priv->buf;
60 memcpy (buffer, buf->data, count);
/* Everything buffered was consumed: free the array and clear the pointer.
 * A NULL priv->buf is what the "if (priv->buf ...)" tests elsewhere in this
 * file treat as "nothing buffered".
 */
62 if (count == buf->len) {
63 g_byte_array_free (buf, TRUE);
64 fstream->priv->buf = NULL;
/* Partial consumption (presumably the else branch; that line is not
 * visible): slide the unread tail down to the start of the array, then
 * shrink the array to the tail's length.  memmove() is used because the
 * source and destination ranges can overlap.
 */
66 memmove (buf->data, buf->data + count,
68 g_byte_array_set_size (buf, buf->len - count);
/* GInputStreamClass read_fn implementation (installed in class_init).
 * Serves the read from previously buffered data when there is any, and
 * otherwise reads from the base stream.
 *
 * NOTE(review): part of the parameter list is not visible in this excerpt;
 * the body uses buffer, count and error, which are presumably declared on
 * the missing lines.
 */
75 soup_filter_input_stream_read_fn (GInputStream *stream,
78 GCancellable *cancellable,
81 SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (stream);
/* A read that does not come from inside read_until() clears need_more. */
83 if (!fstream->priv->in_read_until)
84 fstream->priv->need_more = FALSE;
/* Buffered bytes are handed out first -- except when read_until() is the
 * caller, because it reads through this stream in order to append to that
 * very buffer and needs fresh data from the base stream instead.
 */
86 if (fstream->priv->buf && !fstream->priv->in_read_until) {
87 return read_from_buf (fstream, buffer, count);
/* Nothing buffered, or called from read_until(): read from the base stream.
 * The TRUE below is the "blocking" argument of g_pollable_stream_read().
 */
89 return g_pollable_stream_read (G_FILTER_INPUT_STREAM (fstream)->base_stream,
91 TRUE, cancellable, error);
/* GPollableInputStream is_readable implementation (installed in
 * soup_filter_input_stream_pollable_init()).
 */
96 soup_filter_input_stream_is_readable (GPollableInputStream *stream)
98 SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (stream);
/* Buffered data exists and the last read_until() did not flag that more
 * input is needed.  NOTE(review): the statement guarded by this test is not
 * visible in this excerpt -- presumably it reports the stream as readable.
 */
100 if (fstream->priv->buf && !fstream->priv->need_more)
/* Otherwise readability is whatever the base stream reports. */
103 return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (G_FILTER_INPUT_STREAM (fstream)->base_stream));
/* GPollableInputStream read_nonblocking implementation (installed in
 * soup_filter_input_stream_pollable_init()).  The visible logic mirrors
 * soup_filter_input_stream_read_fn(): buffered data first, base stream
 * otherwise.
 *
 * NOTE(review): the rest of the parameter list and the trailing arguments
 * of the g_pollable_stream_read() call are not visible in this excerpt, so
 * the blocking flag passed here cannot be confirmed (presumably FALSE).
 */
107 soup_filter_input_stream_read_nonblocking (GPollableInputStream *stream,
112 SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (stream);
/* A read that does not come from inside read_until() clears need_more. */
114 if (!fstream->priv->in_read_until)
115 fstream->priv->need_more = FALSE;
/* Hand out buffered bytes first, unless read_until() is the caller (it is
 * reading in order to append to the buffer).
 */
117 if (fstream->priv->buf && !fstream->priv->in_read_until) {
118 return read_from_buf (fstream, buffer, count);
/* Nothing buffered, or called from read_until(): go to the base stream. */
120 return g_pollable_stream_read (G_FILTER_INPUT_STREAM (fstream)->base_stream,
/* GPollableInputStream create_source implementation (installed in
 * soup_filter_input_stream_pollable_init()).  Returns a new pollable source
 * for @stream whose readiness is driven by a child source chosen below.
 */
127 soup_filter_input_stream_create_source (GPollableInputStream *stream,
128 GCancellable *cancellable)
130 SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (stream);
131 GSource *base_source, *pollable_source;
/* Same test as is_readable(): when usable buffered data exists, a
 * zero-interval timeout source is used so the source fires without waiting
 * on the base stream.  Otherwise the base stream's own source is used
 * (presumably via an else whose line is not visible in this excerpt).
 */
133 if (fstream->priv->buf && !fstream->priv->need_more)
134 base_source = g_timeout_source_new (0);
136 base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (G_FILTER_INPUT_STREAM (fstream)->base_stream), cancellable);
/* The child source only serves to trigger the parent, so it gets a dummy
 * callback.  The parent holds its own reference to the child once it has
 * been added, so the local reference is dropped afterwards.
 */
138 g_source_set_dummy_callback (base_source);
139 pollable_source = g_pollable_source_new (G_OBJECT (stream));
140 g_source_add_child_source (pollable_source, base_source);
141 g_source_unref (base_source);
143 return pollable_source;
/* Class initializer: registers the size of the private struct and installs
 * this file's finalize and read_fn implementations in the class vtables.
 */
147 soup_filter_input_stream_class_init (SoupFilterInputStreamClass *stream_class)
149 GObjectClass *object_class = G_OBJECT_CLASS (stream_class);
150 GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class);
/* Pairs with the G_TYPE_INSTANCE_GET_PRIVATE() lookup in the instance
 * initializer.
 */
152 g_type_class_add_private (stream_class, sizeof (SoupFilterInputStreamPrivate));
154 object_class->finalize = soup_filter_input_stream_finalize;
156 input_stream_class->read_fn = soup_filter_input_stream_read_fn;
/* GPollableInputStream interface initializer, named in the
 * G_DEFINE_TYPE_WITH_CODE() invocation near the top of the file.  Fills in
 * the three interface methods implemented here; @interface_data is unused
 * in the visible code.
 */
160 soup_filter_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
161 gpointer interface_data)
163 pollable_interface->is_readable = soup_filter_input_stream_is_readable;
164 pollable_interface->read_nonblocking = soup_filter_input_stream_read_nonblocking;
165 pollable_interface->create_source = soup_filter_input_stream_create_source;
/* Create a new SoupFilterInputStream wrapping @base_stream.
 *
 * "close-base-stream" is set to FALSE, so closing the filter stream does
 * not close @base_stream.
 *
 * NOTE(review): the return type and the end of the g_object_new() argument
 * list are not visible in this excerpt.
 */
169 soup_filter_input_stream_new (GInputStream *base_stream)
171 return g_object_new (SOUP_TYPE_FILTER_INPUT_STREAM,
172 "base-stream", base_stream,
173 "close-base-stream", FALSE,
/* Convenience wrapper that forwards to
 * soup_filter_input_stream_read_until() with @fstream, buffer and length.
 *
 * NOTE(review): most of the parameter list and the remaining arguments of
 * the call (including the boundary that is passed) are not visible in this
 * excerpt; going by the function name the boundary is presumably a line
 * terminator -- confirm against the full source.
 */
178 soup_filter_input_stream_read_line (SoupFilterInputStream *fstream,
183 GCancellable *cancellable,
186 return soup_filter_input_stream_read_until (fstream, buffer, length,
/* Read data from @fstream into the caller's buffer, stopping at the first
 * occurrence of @boundary when one is found within the buffered data.
 *
 * Data is accumulated in fstream->priv->buf (sized up to length bytes),
 * scanned for @boundary, and the selected prefix is handed to the caller
 * through read_from_buf(); whatever is left stays buffered for later reads.
 *
 * @boundary, @boundary_length: the byte sequence to search for.
 * @include_boundary: if TRUE the returned data extends past the boundary;
 *   this requires @boundary_length < length (checked below).
 * @got_boundary: out parameter; FALSE on entry to the scan, TRUE if the
 *   boundary was found.
 * @cancellable: passed through to the underlying read.
 *
 * Returns -1 if either precondition check fails; otherwise the value comes
 * from read_from_buf() or from a return that is not visible here.
 *
 * NOTE(review): this excerpt is missing a number of the function's lines
 * (part of the parameter list -- buffer, length and error are used below
 * but not visibly declared -- plus several conditions, branch bodies and
 * local declarations).  The comments below describe only what is visible
 * and mark guesses as such.
 */
193 soup_filter_input_stream_read_until (SoupFilterInputStream *fstream,
196 const void *boundary,
197 gsize boundary_length,
199 gboolean include_boundary,
200 gboolean *got_boundary,
201 GCancellable *cancellable,
205 guint8 *p, *buf, *end;
206 gboolean eof = FALSE;
207 GError *my_error = NULL;
/* Preconditions: a valid stream, and -- when the boundary is to be included
 * in the output -- a boundary that is shorter than length.
 */
209 g_return_val_if_fail (SOUP_IS_FILTER_INPUT_STREAM (fstream), -1);
210 g_return_val_if_fail (!include_boundary || (boundary_length < length), -1);
/* Reset the per-call result and the "need more input" flag. */
212 *got_boundary = FALSE;
213 fstream->priv->need_more = FALSE;
/* Fewer than boundary_length bytes are buffered, so the buffer cannot
 * contain the boundary yet: read more first.
 */
215 if (!fstream->priv->buf || fstream->priv->buf->len < boundary_length) {
219 if (!fstream->priv->buf)
220 fstream->priv->buf = g_byte_array_new ();
/* Remember how much was already buffered, then grow the array to length so
 * the read below can append directly after the existing prev_len bytes.
 */
221 prev_len = fstream->priv->buf->len;
222 g_byte_array_set_size (fstream->priv->buf, length);
223 buf = fstream->priv->buf->data;
/* The read goes through this filter stream itself; in_read_until tells
 * read_fn()/read_nonblocking() to skip the buffer and read from the base
 * stream instead.
 */
225 fstream->priv->in_read_until = TRUE;
226 nread = g_pollable_stream_read (G_INPUT_STREAM (fstream),
227 buf + prev_len, length - prev_len,
229 cancellable, &my_error);
230 fstream->priv->in_read_until = FALSE;
/* NOTE(review): the conditions selecting among the next statements are not
 * visible in this excerpt.  Visible effects: either the array length is put
 * back to prev_len (undoing the grow above), or the array is freed and
 * priv->buf cleared.
 */
233 fstream->priv->buf->len = prev_len;
235 g_byte_array_free (fstream->priv->buf, TRUE);
236 fstream->priv->buf = NULL;
/* A zero-byte read while earlier data is still buffered.  The guarded
 * statement is not visible here -- presumably it records end-of-file in
 * eof so the buffered remainder can still be returned.
 */
239 if (nread == 0 && prev_len)
/* A would-block result is remembered in need_more, which makes
 * is_readable() and create_source() defer to the base stream rather than
 * treat the buffered partial data as readable.
 */
242 if (g_error_matches (my_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
243 fstream->priv->need_more = TRUE;
245 g_propagate_error (error, my_error);
251 g_propagate_error (error, my_error);
/* Valid length becomes the old contents plus the bytes just read (the
 * array had been sized to length for the read).
 */
253 fstream->priv->buf->len = prev_len + nread;
/* Presumably the branch taken when no read was needed: scan the data that
 * is already buffered.
 */
255 buf = fstream->priv->buf->data;
257 /* Scan for the boundary */
258 end = buf + fstream->priv->buf->len;
/* Stop boundary_length bytes short of the end so the memcmp() below reads
 * boundary_length bytes starting within the buffered data.  (Whether this
 * adjustment is conditional is not visible in this excerpt.)
 */
260 end -= boundary_length;
261 for (p = buf; p <= end; p++) {
/* Cheap first-byte comparison before the full memcmp(). */
262 if (*p == *(guint8*)boundary &&
263 !memcmp (p, boundary, boundary_length)) {
264 if (include_boundary)
265 p += boundary_length;
266 *got_boundary = TRUE;
/* No boundary found, the buffer still has room, and no EOF was recorded.
 * The guarded statement is not visible here -- presumably control goes back
 * to read more data.
 */
271 if (!*got_boundary && fstream->priv->buf->len < length && !eof)
274 /* Return everything up to 'p' (which is either just after the boundary if
275 * include_boundary is TRUE, just before the boundary if include_boundary is
276 * FALSE, @boundary_len - 1 bytes before the end of the buffer, or end-of-
279 return read_from_buf (fstream, buffer, p - buf);