1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
3 * soup-message-io.c: HTTP message I/O
5 * Copyright (C) 2000-2003, Ximian, Inc.
15 #include "soup-body-input-stream.h"
16 #include "soup-body-output-stream.h"
17 #include "soup-connection.h"
18 #include "soup-filter-input-stream.h"
19 #include "soup-message.h"
20 #include "soup-message-private.h"
21 #include "soup-message-queue.h"
22 #include "soup-misc.h"
23 #include "soup-socket.h"
/* Enumerators of SoupMessageIOMode (the type of the `mode` member of the
 * I/O state and of new_iostate()'s `mode` argument): they select whether
 * this file drives the client or the server side of an exchange.
 */
26 SOUP_MESSAGE_IO_CLIENT,
27 SOUP_MESSAGE_IO_SERVER
/* States used independently by the read side (io->read_state) and the
 * write side (io->write_state).  The declaration order is significant:
 * code in this file compares states with < and <=, e.g.
 * "read_state < SOUP_MESSAGE_IO_STATE_FINISHING" in soup_message_io_stop().
 */
31 SOUP_MESSAGE_IO_STATE_NOT_STARTED,
32 SOUP_MESSAGE_IO_STATE_HEADERS,
33 SOUP_MESSAGE_IO_STATE_BLOCKING,
34 SOUP_MESSAGE_IO_STATE_BODY_START,
35 SOUP_MESSAGE_IO_STATE_BODY,
36 SOUP_MESSAGE_IO_STATE_BODY_DATA,
37 SOUP_MESSAGE_IO_STATE_BODY_DONE,
38 SOUP_MESSAGE_IO_STATE_FINISHING,
39 SOUP_MESSAGE_IO_STATE_DONE
/* Evaluates to true for every state except NOT_STARTED, BLOCKING and
 * DONE.  io_run() uses it to choose which side to drive, and
 * soup_message_io_get_source() uses it to choose which stream to poll.
 * NOTE(review): `state` is not parenthesized in the expansion, so callers
 * should keep passing simple expressions.
 */
42 #define SOUP_MESSAGE_IO_STATE_ACTIVE(state) \
43 (state != SOUP_MESSAGE_IO_STATE_NOT_STARTED && \
44 state != SOUP_MESSAGE_IO_STATE_BLOCKING && \
45 state != SOUP_MESSAGE_IO_STATE_DONE)
/* Per-message I/O state.  An instance is allocated by new_iostate(),
 * reached through priv->io_data while the exchange is running, and
 * released by soup_message_io_cleanup().
 *
 * NOTE(review): this listing omits some member declarations; other code
 * in this file also uses io->sock, io->io_source, io->write_buf,
 * io->written, io->write_length, io->read_length, io->paused,
 * io->blocking and io->header_data.
 */
48 SoupMessageQueueItem *item;
49 SoupMessageIOMode mode;
50 GCancellable *cancellable;
/* Socket streams (tracked through weak pointers, see new_iostate()) and
 * the body streams that io_read()/io_write() create on top of them.
 */
53 SoupFilterInputStream *istream;
54 GInputStream *body_istream;
55 GOutputStream *ostream;
56 GOutputStream *body_ostream;
57 GMainContext *async_context;
/* Read side */
60 SoupMessageIOState read_state;
61 SoupEncoding read_encoding;
62 GByteArray *read_header_buf;
63 SoupMessageBody *read_body;
/* Content-sniffing bookkeeping, see io_handle_sniffing() */
66 gboolean need_content_sniffed, need_got_chunk;
67 SoupMessageBody *sniff_data;
/* Write side */
69 SoupMessageIOState write_state;
70 SoupEncoding write_encoding;
72 SoupMessageBody *write_body;
73 SoupBuffer *write_chunk;
74 goffset write_body_offset;
/* Source created by soup_message_io_unpause() through soup_add_completion() */
79 GSource *unpause_source;
/* Mode-specific header callbacks, and the callback invoked from
 * soup_message_io_finished() together with its user data.
 */
82 SoupMessageGetHeadersFn get_headers_cb;
83 SoupMessageParseHeadersFn parse_headers_cb;
85 SoupMessageCompletionFn completion_cb;
86 gpointer completion_data;
90 /* Put these around callback invocation if there is code afterward
91 * that depends on the IO having not been cancelled.
/* SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK opens a scope, declares `cancelled`
 * and takes a reference on msg.  The two RETURN macros close that scope
 * again: they record whether priv->io_data still equals io, drop the
 * reference, and return (optionally with val) when the I/O was cancelled
 * or io->paused is set.  All of them expect `msg`, `priv` and `io` to be
 * in scope at the point of use.
 * NOTE(review): dummy_to_make_emacs_happy expands to a lone "{"; going by
 * its name it presumably exists only to keep editor brace matching
 * balanced -- it is not used elsewhere in this listing.
 */
93 #define dummy_to_make_emacs_happy {
94 #define SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK { gboolean cancelled; g_object_ref (msg);
95 #define SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || io->paused) return; }
96 #define SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED(val) cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || io->paused) return val; }
/* Size in bytes of one read block: read_headers() grows the header buffer
 * by this much per line read, and io_read() uses it for its stack buffer.
 */
98 #define RESPONSE_BLOCK_SIZE 8192
/* Tears down msg's I/O state: stops any I/O in flight, detaches the state
 * from the message (priv->io_data = NULL) and releases everything the
 * state owns before freeing the state itself.
 *
 * NOTE(review): the statement that loads `io`, and the guards in front of
 * several of the unref/free calls below, are on lines missing from this
 * listing.
 */
101 soup_message_io_cleanup (SoupMessage *msg)
103 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
104 SoupMessageIOData *io;
106 soup_message_io_stop (msg);
111 priv->io_data = NULL;
/* The socket was reffed in new_iostate(); the streams were only tracked
 * through weak pointers there, so they are unregistered, not unreffed.
 */
114 g_object_unref (io->sock);
116 g_object_remove_weak_pointer (G_OBJECT (io->istream), (gpointer *)&io->istream);
118 g_object_remove_weak_pointer (G_OBJECT (io->ostream), (gpointer *)&io->ostream);
119 if (io->body_istream)
120 g_object_unref (io->body_istream);
121 if (io->body_ostream)
122 g_object_unref (io->body_ostream);
123 if (io->async_context)
124 g_main_context_unref (io->async_context);
126 soup_message_queue_item_unref (io->item);
/* Buffers and pending data owned by the state */
128 g_byte_array_free (io->read_header_buf, TRUE);
130 g_string_free (io->write_buf, TRUE);
132 soup_buffer_free (io->write_chunk);
135 soup_message_body_free (io->sniff_data);
137 g_slice_free (SoupMessageIOData, io);
/* Halts I/O on msg without freeing its I/O state: destroys the pending
 * I/O source and unpause source, and disconnects the socket when the read
 * side had not yet reached FINISHING.
 * NOTE(review): the guards around the first g_source_destroy() are
 * on lines missing from this listing.
 */
141 soup_message_io_stop (SoupMessage *msg)
143 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
144 SoupMessageIOData *io = priv->io_data;
150 g_source_destroy (io->io_source);
151 io->io_source = NULL;
154 if (io->unpause_source) {
155 g_source_destroy (io->unpause_source);
156 io->unpause_source = NULL;
/* Relies on the declaration order of SoupMessageIOState */
159 if (io->read_state < SOUP_MESSAGE_IO_STATE_FINISHING)
160 soup_socket_disconnect (io->sock);
/* Finishes the exchange: cleans up the I/O state and then invokes the
 * completion callback.
 * NOTE(review): the g_object_unref() at the end implies that a matching
 * reference is taken on a line missing from this listing.
 */
164 soup_message_io_finished (SoupMessage *msg)
166 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
167 SoupMessageIOData *io = priv->io_data;
/* Copy these out first: soup_message_io_cleanup() frees io. */
168 SoupMessageCompletionFn completion_cb = io->completion_cb;
169 gpointer completion_data = io->completion_data;
172 soup_message_io_cleanup (msg);
174 completion_cb (msg, completion_data);
175 g_object_unref (msg);
/* Used by io_error() to decide whether a failed request may be restarted.
 * Despite the name, the only method accepted here is GET.
 * NOTE(review): this is a pointer comparison, so it presumably relies on
 * msg->method being an interned string -- confirm against SoupMessage.
 */
179 request_is_idempotent (SoupMessage *msg)
182 return (msg->method == SOUP_METHOD_GET);
/* Translates an I/O failure into message state and then finishes the
 * message via soup_message_io_finished():
 *  - errors in the G_TLS_ERROR domain set SOUP_STATUS_SSL_FAILED;
 *  - a client request that failed before any response header bytes were
 *    read, on a connection that had been used before, not because of a
 *    timeout, and that request_is_idempotent() accepts, marks the queue
 *    item SOUP_MESSAGE_RESTARTING instead of setting an error status;
 *  - otherwise SOUP_STATUS_IO_ERROR is set, unless the message already
 *    carries a transport-error status.
 * The GError is freed here.
 * NOTE(review): read_headers() passes NULL for error, so the free is
 * presumably guarded on a line missing from this listing.  `sock` is not
 * referenced in the visible lines.
 */
186 io_error (SoupSocket *sock, SoupMessage *msg, GError *error)
188 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
189 SoupMessageIOData *io = priv->io_data;
191 if (error && error->domain == G_TLS_ERROR) {
192 soup_message_set_status_full (msg,
193 SOUP_STATUS_SSL_FAILED,
195 } else if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
196 io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS &&
197 io->read_header_buf->len == 0 &&
198 soup_connection_get_ever_used (io->item->conn) &&
199 !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) &&
200 request_is_idempotent (msg)) {
201 /* Connection got closed, but we can safely try again */
202 io->item->state = SOUP_MESSAGE_RESTARTING;
203 } else if (!SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
204 soup_message_set_status (msg, SOUP_STATUS_IO_ERROR);
207 g_error_free (error);
209 soup_message_io_finished (msg);
/* Runs content sniffing on the body data buffered in io->sniff_data and
 * delivers the deferred content-sniffed and got-chunk notifications.
 * Both io_read() call sites treat a false return as "stop for now"; the
 * guard macros used below return FALSE when a handler cancelled or paused
 * the I/O.
 *
 * NOTE(review): some lines of this function (the rest of the "not enough
 * data yet" condition, the guard around g_hash_table_destroy() and the
 * final return) are missing from this listing.
 */
213 io_handle_sniffing (SoupMessage *msg, gboolean done_reading)
215 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
216 SoupMessageIOData *io = priv->io_data;
217 SoupBuffer *sniffed_buffer;
218 char *sniffed_mime_type;
219 GHashTable *params = NULL;
/* Create the sniffing buffer on first use and mark sniffing as pending. */
224 if (!io->sniff_data) {
225 io->sniff_data = soup_message_body_new ();
226 io->need_content_sniffed = TRUE;
229 if (io->need_content_sniffed) {
/* Fewer bytes buffered than the sniffer wants; the remainder of this
 * condition is not visible here (presumably it consults done_reading).
 */
230 if (io->sniff_data->length < priv->bytes_for_sniffing &&
234 io->need_content_sniffed = FALSE;
/* Flatten what has been buffered and ask the sniffer for a MIME type.
 * `params` is passed by address so the sniffer can hand back the
 * content-type parameters in it.
 */
235 sniffed_buffer = soup_message_body_flatten (io->sniff_data);
236 sniffed_mime_type = soup_content_sniffer_sniff (priv->sniffer, msg, sniffed_buffer, &params);
/* A handler may cancel or pause the I/O, hence the guard macros. */
238 SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
239 soup_message_content_sniffed (msg, sniffed_mime_type, params);
240 g_free (sniffed_mime_type);
242 g_hash_table_destroy (params);
244 soup_buffer_free (sniffed_buffer);
245 SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
/* io_read() sets need_got_chunk when it buffers data instead of reporting
 * it; deliver that data now as a single flattened chunk.
 */
248 if (io->need_got_chunk) {
249 io->need_got_chunk = FALSE;
250 sniffed_buffer = soup_message_body_flatten (io->sniff_data);
252 SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
253 soup_message_got_chunk (msg, sniffed_buffer);
254 soup_buffer_free (sniffed_buffer);
255 SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
/* Accumulates header lines from io->istream in io->read_header_buf until
 * the header block is complete, then drops the trailing line terminator
 * (one or two bytes) and NUL-terminates the buffer.  io_read() treats a
 * false return as "headers not available (yet)".
 *
 * NOTE(review): the loop construct, the remaining arguments of the
 * read_line call, the string literals compared with strncmp() and the
 * return statements are on lines missing from this listing.
 */
262 read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
264 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
265 SoupMessageIOData *io = priv->io_data;
266 gssize nread, old_len;
/* Grow the buffer by one block, read a line into the new space, then set
 * the length to what was actually read (a negative nread counts as 0).
 */
270 old_len = io->read_header_buf->len;
271 g_byte_array_set_size (io->read_header_buf, old_len + RESPONSE_BLOCK_SIZE);
272 nread = soup_filter_input_stream_read_line (io->istream,
273 io->read_header_buf->data + old_len,
278 io->read_header_buf->len = old_len + MAX (nread, 0);
/* Reported without a GError; the condition guarding this call is not
 * visible in this listing.
 */
280 io_error (io->sock, msg, NULL);
/* Check whether the line just read is empty, i.e. only a terminator of 1
 * or 2 bytes directly following the previous line's terminator.
 */
285 if (nread == 1 && old_len >= 2 &&
286 !strncmp ((char *)io->read_header_buf->data +
287 io->read_header_buf->len - 2,
290 else if (nread == 2 && old_len >= 3 &&
291 !strncmp ((char *)io->read_header_buf->data +
292 io->read_header_buf->len - 3,
298 /* We need to "rewind" io->read_header_buf back one line.
299 * That SHOULD be two characters (CR LF), but if the
300 * web server was stupid, it might only be one.
302 if (io->read_header_buf->len < 3 ||
303 io->read_header_buf->data[io->read_header_buf->len - 2] == '\n')
304 io->read_header_buf->len--;
306 io->read_header_buf->len -= 2;
307 io->read_header_buf->data[io->read_header_buf->len] = '\0';
/* Runs the contents of buf through converter and returns the output as a
 * new SoupBuffer that takes ownership of the output memory
 * (SOUP_MEMORY_TAKE).  The output buffer is enlarged whenever the
 * converter runs out of space, and conversion continues until all input
 * is consumed or the converter reports G_CONVERTER_FINISHED.
 *
 * NOTE(review): the error-return path, the statement computing the new
 * outbuf_length and the declaration of `outbuf` are on lines missing from
 * this listing.
 */
313 content_decode_one (SoupBuffer *buf, GConverter *converter, GError **error)
315 gsize outbuf_length, outbuf_used, outbuf_cur, input_used, input_cur;
317 GConverterResult result;
318 gboolean dummy_zlib_header_used = FALSE;
/* Start with room for twice the input, but at least 1024 bytes. */
320 outbuf_length = MAX (buf->length * 2, 1024);
321 outbuf = g_malloc (outbuf_length);
322 outbuf_cur = input_cur = 0;
/* Convert from the current input offset into the free tail of outbuf. */
325 result = g_converter_convert (
327 buf->data + input_cur, buf->length - input_cur,
328 outbuf + outbuf_cur, outbuf_length - outbuf_cur,
329 0, &input_used, &outbuf_used, error);
330 input_cur += input_used;
331 outbuf_cur += outbuf_used;
/* Out of output space (reported as NO_SPACE, or the buffer was filled
 * exactly without an error): drop the error and enlarge the buffer.
 */
333 if (g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_NO_SPACE) ||
334 (!*error && outbuf_cur == outbuf_length)) {
335 g_clear_error (error);
337 outbuf = g_realloc (outbuf, outbuf_length);
/* No input consumed yet and a zlib decompressor rejected the data: try
 * the raw-deflate workaround below, at most once per call.
 */
338 } else if (input_cur == 0 &&
339 !dummy_zlib_header_used &&
340 G_IS_ZLIB_DECOMPRESSOR (converter) &&
341 g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_INVALID_DATA)) {
343 GZlibCompressorFormat format;
344 g_object_get (G_OBJECT (converter), "format", &format, NULL);
346 if (format == G_ZLIB_COMPRESSOR_FORMAT_ZLIB) {
347 /* Some servers (especially Apache with mod_deflate)
348 * return RAW compressed data without the zlib headers
349 * when the client claims to support deflate. For
350 * those cases use a dummy header (stolen from
351 * Mozilla's nsHTTPCompressConv.cpp) and try to
352 * continue uncompressing data.
354 static char dummy_zlib_header[2] = { 0x78, 0x9C };
356 g_converter_reset (converter);
357 result = g_converter_convert (converter,
358 dummy_zlib_header, sizeof(dummy_zlib_header),
359 outbuf + outbuf_cur, outbuf_length - outbuf_cur,
360 0, &input_used, &outbuf_used, NULL);
361 dummy_zlib_header_used = TRUE;
362 if (result == G_CONVERTER_CONVERTED) {
363 g_clear_error (error);
372 /* GZlibDecompressor can't ever return
373 * G_IO_ERROR_PARTIAL_INPUT unless we pass it
374 * input_length = 0, which we don't. Other
375 * converters might of course, so eventually
376 * this code needs to be rewritten to deal
382 } while (input_cur < buf->length && result != G_CONVERTER_FINISHED);
385 return soup_buffer_new (SOUP_MEMORY_TAKE, outbuf, outbuf_cur);
/* Passes buf through each decoder in priv->decoders via
 * content_decode_one().  io_read() replaces its buffer with the return
 * value.
 *
 * NOTE(review): the control flow between the statements below is on
 * lines missing from this listing.  From what is visible, a failed decode
 * logs a warning for G_IO_ERROR_FAILED errors, frees the error and clears
 * the SOUP_MESSAGE_CONTENT_DECODED flag; the input buffer is freed once it
 * has been superseded -- confirm the exact conditions against the full
 * source.
 */
393 content_decode (SoupMessage *msg, SoupBuffer *buf)
395 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
398 GError *error = NULL;
401 for (d = priv->decoders; d; d = d->next) {
404 decoded = content_decode_one (buf, decoder, &error);
406 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_FAILED))
407 g_warning ("Content-Decoding error: %s\n", error->message);
408 g_error_free (error);
410 soup_message_set_flags (msg, priv->msg_flags & ~SOUP_MESSAGE_CONTENT_DECODED);
414 soup_buffer_free (buf);
426 * There are two request/response formats: the basic request/response,
427 * possibly with one or more unsolicited informational responses (such
428 * as the WebDAV "102 Processing" response):
431 * W:HEADERS / R:NOT_STARTED -> R:HEADERS / W:NOT_STARTED
432 * W:BODY / R:NOT_STARTED -> R:BODY / W:NOT_STARTED
433 * [W:DONE / R:HEADERS (1xx) <- R:DONE / W:HEADERS (1xx) ...]
434 * W:DONE / R:HEADERS <- R:DONE / W:HEADERS
435 * W:DONE / R:BODY <- R:DONE / W:BODY
436 * W:DONE / R:DONE R:DONE / W:DONE
438 * and the "Expect: 100-continue" request/response, with the client
439 * blocking halfway through its request, and then either continuing or
440 * aborting, depending on the server response:
443 * W:HEADERS / R:NOT_STARTED -> R:HEADERS / W:NOT_STARTED
444 * W:BLOCKING / R:HEADERS <- R:BLOCKING / W:HEADERS
445 * [W:BODY / R:BLOCKING -> R:BODY / W:BLOCKING]
446 * [W:DONE / R:HEADERS <- R:DONE / W:HEADERS]
447 * W:DONE / R:BODY <- R:DONE / W:BODY
448 * W:DONE / R:DONE R:DONE / W:DONE
451 /* Attempts to push forward the writing side of @msg's I/O. Returns
452 * %TRUE if it manages to make some progress, and it is likely that
453 * further progress can be made. Returns %FALSE if it has reached a
454 * stopping point of some sort (need input from the application,
455 * socket not writable, write is complete, etc).
458 io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
460 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
461 SoupMessageIOData *io = priv->io_data;
/* NOTE(review): several lines of this function (local declarations,
 * return/break statements, else branches, closing braces and the guard
 * macros around callbacks) are missing from this listing; read the
 * statements below with that in mind.
 */
465 switch (io->write_state) {
466 case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
467 case SOUP_MESSAGE_IO_STATE_BLOCKING:
471 case SOUP_MESSAGE_IO_STATE_HEADERS:
/* write_buf is empty on the first pass through this state: ask the
 * mode-specific callback to serialize the headers into it.
 */
472 if (!io->write_buf->len) {
473 io->get_headers_cb (msg, io->write_buf,
/* Push the header bytes out; io->written tracks how far we have got, so
 * a partial write can be resumed on a later call.
 */
478 while (io->written < io->write_buf->len) {
479 nwrote = g_pollable_stream_write (io->ostream,
480 io->write_buf->str + io->written,
481 io->write_buf->len - io->written,
486 io->written += nwrote;
/* Emptying write_buf makes the next pass through HEADERS regenerate it. */
490 g_string_truncate (io->write_buf, 0);
492 if (io->mode == SOUP_MESSAGE_IO_SERVER &&
493 SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
494 if (msg->status_code == SOUP_STATUS_CONTINUE) {
495 /* Stop and wait for the body now */
497 SOUP_MESSAGE_IO_STATE_BLOCKING;
498 io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
500 /* We just wrote a 1xx response
501 * header, so stay in STATE_HEADERS.
502 * (The caller will pause us from the
503 * wrote_informational callback if he
504 * is not ready to send the final
509 soup_message_wrote_informational (msg);
510 soup_message_cleanup_response (msg);
514 if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
515 SoupMessageHeaders *hdrs =
516 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
517 msg->request_headers : msg->response_headers;
518 io->write_length = soup_message_headers_get_content_length (hdrs);
521 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
522 soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
523 /* Need to wait for the Continue response */
524 io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
525 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
527 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_START;
529 /* If the client was waiting for a Continue
530 * but we sent something else, then they're
533 if (io->mode == SOUP_MESSAGE_IO_SERVER &&
534 io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
535 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
538 soup_message_wrote_headers (msg);
542 case SOUP_MESSAGE_IO_STATE_BODY_START:
543 io->body_ostream = soup_body_output_stream_new (io->ostream,
546 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
550 case SOUP_MESSAGE_IO_STATE_BODY:
551 if (!io->write_length &&
552 io->write_encoding != SOUP_ENCODING_EOF &&
553 io->write_encoding != SOUP_ENCODING_CHUNKED) {
554 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
558 if (!io->write_chunk) {
559 io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
560 if (!io->write_chunk) {
561 soup_message_io_pause (msg);
564 if (!io->write_chunk->length) {
565 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
570 nwrote = g_pollable_stream_write (io->body_ostream,
571 io->write_chunk->data + io->written,
572 io->write_chunk->length - io->written,
578 chunk = soup_buffer_new_subbuffer (io->write_chunk,
579 io->written, nwrote);
580 io->written += nwrote;
581 if (io->write_length)
582 io->write_length -= nwrote;
584 if (io->written == io->write_chunk->length)
585 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DATA;
587 soup_message_wrote_body_data (msg, chunk);
588 soup_buffer_free (chunk);
592 case SOUP_MESSAGE_IO_STATE_BODY_DATA:
594 if (io->write_chunk->length == 0) {
595 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
599 if (io->mode == SOUP_MESSAGE_IO_SERVER ||
600 priv->msg_flags & SOUP_MESSAGE_CAN_REBUILD)
601 soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
602 io->write_body_offset += io->write_chunk->length;
603 soup_buffer_free (io->write_chunk);
604 io->write_chunk = NULL;
606 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
607 soup_message_wrote_chunk (msg);
611 case SOUP_MESSAGE_IO_STATE_BODY_DONE:
612 if (io->body_ostream) {
613 if (!g_output_stream_close (io->body_ostream, cancellable, error))
615 g_clear_object (&io->body_ostream);
618 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
619 soup_message_wrote_body (msg);
623 case SOUP_MESSAGE_IO_STATE_FINISHING:
624 io->write_state = SOUP_MESSAGE_IO_STATE_DONE;
626 if (io->mode == SOUP_MESSAGE_IO_CLIENT)
627 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
631 case SOUP_MESSAGE_IO_STATE_DONE:
633 g_return_val_if_reached (FALSE);
639 /* Attempts to push forward the reading side of @msg's I/O. Returns
640 * %TRUE if it manages to make some progress, and it is likely that
641 * further progress can be made. Returns %FALSE if it has reached a
642 * stopping point of some sort (need input from the application,
643 * socket not readable, read is complete, etc).
646 io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
648 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
649 SoupMessageIOData *io = priv->io_data;
/* Scratch read buffer: filled in with alloca() in the BODY state when the
 * message has no chunk allocator.
 */
650 guchar *stack_buf = NULL;
/* NOTE(review): several lines of this function (other local
 * declarations, return/break statements, else branches, closing braces
 * and the guard macros around callbacks) are missing from this listing.
 */
655 switch (io->read_state) {
656 case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
657 case SOUP_MESSAGE_IO_STATE_BLOCKING:
661 case SOUP_MESSAGE_IO_STATE_HEADERS:
/* read_headers() returns false until a complete header block is buffered. */
662 if (!read_headers (msg, cancellable, error))
/* Hand the raw header bytes to the mode-specific parser; its remaining
 * arguments are on lines not shown.  The returned status is checked
 * below.
 */
665 status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data,
666 io->read_header_buf->len,
/* The header bytes have been consumed: empty the buffer. */
669 g_byte_array_set_size (io->read_header_buf, 0);
671 if (status != SOUP_STATUS_OK) {
672 /* Either we couldn't parse the headers, or they
673 * indicated something that would mean we wouldn't
674 * be able to parse the body. (Eg, unknown
675 * Transfer-Encoding.). Skip the rest of the
676 * reading, and make sure the connection gets
677 * closed when we're done.
679 soup_message_set_status (msg, status);
680 soup_message_headers_append (msg->request_headers,
681 "Connection", "close");
682 io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
686 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
687 SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
688 if (msg->status_code == SOUP_STATUS_CONTINUE &&
689 io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING) {
690 /* Pause the reader, unpause the writer */
692 SOUP_MESSAGE_IO_STATE_BLOCKING;
694 SOUP_MESSAGE_IO_STATE_BODY_START;
696 /* Just stay in HEADERS */
697 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
700 /* Informational responses have no bodies, so
701 * bail out here rather than parsing encoding, etc
703 soup_message_got_informational (msg);
704 soup_message_cleanup_response (msg);
706 } else if (io->mode == SOUP_MESSAGE_IO_SERVER &&
707 soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
708 /* The client requested a Continue response. The
709 * got_headers handler may change this to something
712 soup_message_set_status (msg, SOUP_STATUS_CONTINUE);
713 io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
714 io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
716 io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
718 /* If the client was waiting for a Continue
719 * but got something else, then it's done
722 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
723 io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
724 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
727 if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
728 SoupMessageHeaders *hdrs =
729 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
730 msg->response_headers : msg->request_headers;
731 io->read_length = soup_message_headers_get_content_length (hdrs);
733 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
734 !soup_message_is_keepalive (msg)) {
735 /* Some servers suck and send
736 * incorrect Content-Length values, so
737 * allow EOF termination in this case
738 * (iff the message is too short) too.
740 io->read_encoding = SOUP_ENCODING_EOF;
743 io->read_length = -1;
745 io->body_istream = soup_body_input_stream_new (SOUP_FILTER_INPUT_STREAM (io->istream),
748 soup_message_got_headers (msg);
752 case SOUP_MESSAGE_IO_STATE_BODY:
753 if (!io_handle_sniffing (msg, FALSE))
756 if (priv->chunk_allocator) {
757 buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
759 soup_message_io_pause (msg);
764 stack_buf = alloca (RESPONSE_BLOCK_SIZE);
765 buffer = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
767 RESPONSE_BLOCK_SIZE);
770 nread = g_pollable_stream_read (io->body_istream,
771 (guchar *)buffer->data,
776 buffer->length = nread;
777 buffer = content_decode (msg, buffer);
781 soup_message_body_got_chunk (io->read_body, buffer);
783 if (io->need_content_sniffed) {
784 soup_message_body_append_buffer (io->sniff_data, buffer);
785 soup_buffer_free (buffer);
786 io->need_got_chunk = TRUE;
787 if (!io_handle_sniffing (msg, FALSE))
792 soup_message_got_chunk (msg, buffer);
793 soup_buffer_free (buffer);
797 soup_buffer_free (buffer);
801 /* else nread == 0 */
802 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
806 case SOUP_MESSAGE_IO_STATE_BODY_DONE:
807 if (!io_handle_sniffing (msg, TRUE))
810 io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
811 soup_message_got_body (msg);
815 case SOUP_MESSAGE_IO_STATE_FINISHING:
816 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
818 if (io->mode == SOUP_MESSAGE_IO_SERVER)
819 io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
823 case SOUP_MESSAGE_IO_STATE_DONE:
825 g_return_val_if_reached (FALSE);
/* Creates a GSource for whichever side of msg's I/O is currently active:
 * a source on the input stream when the read side is active, otherwise a
 * source on the output stream when the write side is active.  `callback`
 * and `user_data` are installed on the source.  If neither side is
 * active, the g_return_val_if_reached() path yields NULL.
 * NOTE(review): the declaration of `source` and the final return are on
 * lines missing from this listing; setup_io_source() stores the result.
 */
832 soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
833 GSourceFunc callback, gpointer user_data)
835 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
836 SoupMessageIOData *io = priv->io_data;
/* The read side takes precedence, matching the order used in io_run(). */
839 if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) {
840 source = g_pollable_input_stream_create_source (
841 G_POLLABLE_INPUT_STREAM (io->istream), cancellable);
842 } else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state)) {
843 source = g_pollable_output_stream_create_source (
844 G_POLLABLE_OUTPUT_STREAM (io->ostream), cancellable);
846 g_return_val_if_reached (NULL);
848 g_source_set_callback (source, callback, user_data, NULL);
/* Forward declaration: setup_io_source() installs io_run as the source
 * callback before io_run itself is defined.
 */
852 static gboolean io_run (GObject *stream, SoupMessage *msg);
/* Arms a source that calls io_run() for msg: the source is created
 * without a cancellable (NULL), attached to io->async_context, and the
 * local reference is dropped right away.  io->io_source keeps pointing at
 * it so that it can be destroyed later.
 * NOTE(review): after the unref the source is presumably kept alive by the
 * context it is attached to -- see the GSource documentation.
 */
855 setup_io_source (SoupMessage *msg)
857 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
858 SoupMessageIOData *io = priv->io_data;
860 io->io_source = soup_message_io_get_source (msg, NULL,
861 (GSourceFunc)io_run, msg);
862 g_source_attach (io->io_source, io->async_context);
863 g_source_unref (io->io_source);
/* Drives msg's I/O.  It is called directly by soup_message_io_client() and
 * installed as the source callback by setup_io_source().  While the state
 * is still attached to the message and not paused, it advances the read
 * side if that is active, otherwise the write side.  Afterwards a
 * would-block condition re-arms the I/O source, another error goes to
 * io_error(), and with both sides DONE the message is finished.
 *
 * NOTE(review): the guard around the first g_source_destroy(), the loop
 * exit, the conditions selecting the branches after the loop, the return
 * value and the g_object_ref() matching the final unref are on lines
 * missing from this listing.  `stream` is not referenced in the visible
 * lines.
 */
867 io_run (GObject *stream, SoupMessage *msg)
869 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
870 SoupMessageIOData *io = priv->io_data;
871 GError *error = NULL;
/* Drop the source that (possibly) got us here; a new one is created
 * below if the I/O would block again.
 */
874 g_source_destroy (io->io_source);
875 io->io_source = NULL;
/* priv->io_data != io means a callback cleaned up or replaced the state. */
880 while (priv->io_data == io && !io->paused) {
881 gboolean progress = FALSE;
883 if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
884 progress = io_read (msg, io->cancellable, &error);
885 else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
886 progress = io_write (msg, io->cancellable, &error);
893 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
894 g_clear_error (&error);
895 setup_io_source (msg);
897 io_error (io->sock, msg, error);
898 } else if (priv->io_data == io &&
899 io->read_state == SOUP_MESSAGE_IO_STATE_DONE &&
900 io->write_state == SOUP_MESSAGE_IO_STATE_DONE)
901 soup_message_io_finished (msg);
903 g_object_unref (msg);
/* Allocates and initialises the I/O state for msg on sock: stores the
 * callbacks and their data, takes a reference on the socket, registers
 * weak pointers to its streams, selects the GMainContext to run in, and
 * creates the header buffers with both sides in NOT_STARTED.
 *
 * NOTE(review): the io->mode assignment, the else between the two context
 * branches, the guard in front of the final soup_message_io_cleanup()
 * call, the assignment of the new state to priv->io_data and the return
 * are on lines missing from this listing.  The cleanup call presumably
 * runs only when the message already has an I/O state.
 */
908 static SoupMessageIOData *
909 new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
910 SoupMessageGetHeadersFn get_headers_cb,
911 SoupMessageParseHeadersFn parse_headers_cb,
912 gpointer header_data,
913 SoupMessageCompletionFn completion_cb,
914 gpointer completion_data)
916 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
917 SoupMessageIOData *io;
918 gboolean non_blocking, use_thread_context;
/* g_slice_new0() zero-fills, so every member not set below starts as 0/NULL. */
920 io = g_slice_new0 (SoupMessageIOData);
922 io->get_headers_cb = get_headers_cb;
923 io->parse_headers_cb = parse_headers_cb;
924 io->header_data = header_data;
925 io->completion_cb = completion_cb;
926 io->completion_data = completion_data;
/* Only the socket is reffed.  The streams are tracked through weak
 * pointers, which soup_message_io_cleanup() removes again.
 */
928 io->sock = g_object_ref (sock);
929 io->istream = SOUP_FILTER_INPUT_STREAM (soup_socket_get_input_stream (sock));
931 g_object_add_weak_pointer (G_OBJECT (io->istream), (gpointer *)&io->istream);
932 io->ostream = soup_socket_get_output_stream (sock);
934 g_object_add_weak_pointer (G_OBJECT (io->ostream), (gpointer *)&io->ostream);
936 g_object_get (io->sock,
937 SOUP_SOCKET_FLAG_NONBLOCKING, &non_blocking,
938 SOUP_SOCKET_USE_THREAD_CONTEXT, &use_thread_context,
940 io->blocking = !non_blocking;
/* Use the thread-default context (reffed when non-NULL) if the socket is
 * configured for it; the other branch reads the socket's async context.
 */
942 if (use_thread_context) {
943 io->async_context = g_main_context_get_thread_default ();
944 if (io->async_context)
945 g_main_context_ref (io->async_context);
947 g_object_get (io->sock,
948 SOUP_SOCKET_ASYNC_CONTEXT, &io->async_context,
952 io->read_header_buf = g_byte_array_new ();
953 io->write_buf = g_string_new (NULL);
955 io->read_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
956 io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
959 soup_message_io_cleanup (msg);
/* Starts client-side I/O for the message in item.  It builds the state in
 * client mode on the socket of item's connection, takes a reference on
 * the queue item, and uses item's cancellable.  Data is read into
 * response_body and written from request_body.  The write side is put in
 * HEADERS and io_run() is called immediately to start sending.
 * NOTE(review): the statement storing item in io->item is on a line
 * missing from this listing (soup_message_io_cleanup() unrefs io->item).
 */
965 soup_message_io_client (SoupMessageQueueItem *item,
966 SoupMessageGetHeadersFn get_headers_cb,
967 SoupMessageParseHeadersFn parse_headers_cb,
968 gpointer header_data,
969 SoupMessageCompletionFn completion_cb,
970 gpointer completion_data)
972 SoupMessageIOData *io;
973 SoupSocket *sock = soup_connection_get_socket (item->conn);
975 io = new_iostate (item->msg, sock, SOUP_MESSAGE_IO_CLIENT,
976 get_headers_cb, parse_headers_cb, header_data,
977 completion_cb, completion_data);
980 soup_message_queue_item_ref (item);
981 io->cancellable = item->cancellable;
983 io->read_body = item->msg->response_body;
984 io->write_body = item->msg->request_body;
/* A client starts by writing the request headers. */
986 io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
987 io_run (NULL, item->msg);
/* Sets up server-side I/O for msg on sock: builds the state in server
 * mode, reads into request_body, writes from response_body, and puts the
 * read side in HEADERS so that the request headers are read first.
 * NOTE(review): whatever starts the I/O loop after this point is on lines
 * missing from this listing.
 */
991 soup_message_io_server (SoupMessage *msg, SoupSocket *sock,
992 SoupMessageGetHeadersFn get_headers_cb,
993 SoupMessageParseHeadersFn parse_headers_cb,
994 gpointer header_data,
995 SoupMessageCompletionFn completion_cb,
996 gpointer completion_data)
998 SoupMessageIOData *io;
1000 io = new_iostate (msg, sock, SOUP_MESSAGE_IO_SERVER,
1001 get_headers_cb, parse_headers_cb, header_data,
1002 completion_cb, completion_data);
1004 io->read_body = msg->request_body;
1005 io->write_body = msg->response_body;
1007 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
/* Pauses I/O on msg: destroys the pending I/O source and any pending
 * unpause source.  It is a checked no-op (g_return_if_fail) when msg has
 * no I/O state.
 * NOTE(review): the statement that sets the paused flag tested by io_run()
 * is presumably on a line missing from this listing.
 */
1012 soup_message_io_pause (SoupMessage *msg)
1014 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1015 SoupMessageIOData *io = priv->io_data;
1017 g_return_if_fail (io != NULL);
1019 if (io->io_source) {
1020 g_source_destroy (io->io_source);
1021 io->io_source = NULL;
/* A pause cancels an unpause that was scheduled but has not run yet. */
1024 if (io->unpause_source) {
1025 g_source_destroy (io->unpause_source);
1026 io->unpause_source = NULL;
/* Callback that performs the actual unpause.  soup_message_io_unpause()
 * either schedules it through soup_add_completion() or calls it directly.
 * It returns FALSE if msg has no I/O state.
 * NOTE(review): only the start of this function is visible in this
 * listing; the statements that resume the I/O are on missing lines.
 */
1033 io_unpause_internal (gpointer msg)
1035 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1036 SoupMessageIOData *io = priv->io_data;
1038 g_return_val_if_fail (io != NULL, FALSE);
/* Forget the scheduled source; soup_message_io_unpause() checks this
 * pointer before scheduling another one.
 */
1039 io->unpause_source = NULL;
/* Resumes I/O on msg.  For non-blocking I/O the unpause is deferred:
 * io_unpause_internal() is scheduled on io->async_context through
 * soup_add_completion(), unless one is already pending.  The direct call
 * at the end is presumably the blocking-mode branch (its `else` is on a
 * line missing from this listing).
 */
1050 soup_message_io_unpause (SoupMessage *msg)
1052 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1053 SoupMessageIOData *io = priv->io_data;
1055 g_return_if_fail (io != NULL);
1057 if (!io->blocking) {
/* Schedule at most one unpause at a time. */
1058 if (!io->unpause_source) {
1059 io->unpause_source = soup_add_completion (
1060 io->async_context, io_unpause_internal, msg);
1063 io_unpause_internal (msg);
1067 * soup_message_io_in_progress:
1068 * @msg: a #SoupMessage
1070 * Tests whether or not I/O is currently in progress on @msg.
1072 * Return value: whether or not I/O is currently in progress.
1075 soup_message_io_in_progress (SoupMessage *msg)
1077 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
/* "In progress" means exactly that an I/O state is attached to msg;
 * soup_message_io_cleanup() resets io_data to NULL.
 */
1079 return priv->io_data != NULL;