1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
3 * soup-message-io.c: HTTP message I/O
5 * Copyright (C) 2000-2003, Ximian, Inc.
15 #include <glib/gi18n-lib.h>
17 #include "soup-body-input-stream.h"
18 #include "soup-body-output-stream.h"
19 #include "soup-client-input-stream.h"
20 #include "soup-connection.h"
21 #include "soup-content-sniffer-stream.h"
22 #include "soup-converter-wrapper.h"
23 #include "soup-filter-input-stream.h"
24 #include "soup-message.h"
25 #include "soup-message-private.h"
26 #include "soup-message-queue.h"
27 #include "soup-misc.h"
28 #include "soup-misc-private.h"
/* Enumerators naming which side of the HTTP exchange an I/O state drives;
 * they are compared against io->mode throughout this file.  The enclosing
 * typedef/enum lines are not visible in this extract. */
31 SOUP_MESSAGE_IO_CLIENT,
32 SOUP_MESSAGE_IO_SERVER
/* Enumerators of the per-direction I/O state; the same set is used for both
 * io->read_state and io->write_state.  Declaration order matters because
 * this file compares states with relational operators (for example
 * "io->read_state < read_state" in io_run_until).  The enclosing
 * typedef/enum lines are not visible in this extract. */
36 SOUP_MESSAGE_IO_STATE_NOT_STARTED,
/* Alias of NOT_STARTED, the first enumerator listed here; callers pass it
 * to io_run_until as the target for a direction they do not care about. */
37 SOUP_MESSAGE_IO_STATE_ANY = SOUP_MESSAGE_IO_STATE_NOT_STARTED,
38 SOUP_MESSAGE_IO_STATE_HEADERS,
/* BLOCKING: this direction is waiting on the other one (used for the
 * "Expect: 100-continue" exchange handled in io_read and io_write). */
39 SOUP_MESSAGE_IO_STATE_BLOCKING,
40 SOUP_MESSAGE_IO_STATE_BODY_START,
41 SOUP_MESSAGE_IO_STATE_BODY,
42 SOUP_MESSAGE_IO_STATE_BODY_DATA,
43 SOUP_MESSAGE_IO_STATE_BODY_DONE,
44 SOUP_MESSAGE_IO_STATE_FINISHING,
45 SOUP_MESSAGE_IO_STATE_DONE
/* SOUP_MESSAGE_IO_STATE_ACTIVE(state): true unless @state is NOT_STARTED,
 * BLOCKING or DONE; io_run_until uses it to pick which direction to drive.
 * SOUP_MESSAGE_IO_STATE_POLLABLE(state): ACTIVE and additionally not
 * BODY_DONE; soup_message_io_get_source uses it to decide whether to build
 * a pollable stream source for that direction.
 * Note that @state is expanded several times and is not parenthesized, so
 * only simple, side-effect-free expressions should be passed. */
48 #define SOUP_MESSAGE_IO_STATE_ACTIVE(state) \
49 (state != SOUP_MESSAGE_IO_STATE_NOT_STARTED && \
50 state != SOUP_MESSAGE_IO_STATE_BLOCKING && \
51 state != SOUP_MESSAGE_IO_STATE_DONE)
52 #define SOUP_MESSAGE_IO_STATE_POLLABLE(state) \
53 (SOUP_MESSAGE_IO_STATE_ACTIVE (state) && \
54 state != SOUP_MESSAGE_IO_STATE_BODY_DONE)
/* Members of SoupMessageIOData, the per-message I/O state that
 * priv->io_data points to.  Other members that this file uses (for example
 * iostream, io_source, write_buf, written, write_length, read_length,
 * paused, blocking and header_data) are not visible in this extract, and
 * neither are the enclosing typedef/struct lines. */
/* Queue item (set only by soup_message_io_client), client/server mode, and
 * the cancellable copied from the queue item. */
57 SoupMessageQueueItem *item;
58 SoupMessageIOMode mode;
59 GCancellable *cancellable;
/* Streams: istream/ostream are obtained from the GIOStream in new_iostate;
 * body_istream/body_ostream are created later (setup_body_istream and the
 * BODY_START step of io_write) and unreffed in soup_message_io_cleanup. */
62 SoupFilterInputStream *istream;
63 GInputStream *body_istream;
64 GOutputStream *ostream;
65 GOutputStream *body_ostream;
/* Main context used for attaching sources; new_iostate refs it only when a
 * context is supplied, and marks the I/O as blocking otherwise. */
66 GMainContext *async_context;
/* Read side: current state, body framing, accumulated header bytes and the
 * message body that received chunks are appended to. */
69 SoupMessageIOState read_state;
70 SoupEncoding read_encoding;
71 GByteArray *read_header_buf;
72 SoupMessageBody *read_body;
/* Write side: current state, body framing, source body, the chunk
 * currently being written and the offset of that chunk within the body. */
75 SoupMessageIOState write_state;
76 SoupEncoding write_encoding;
78 SoupMessageBody *write_body;
79 SoupBuffer *write_chunk;
80 goffset write_body_offset;
/* Pending deferred-unpause source scheduled by soup_message_io_unpause. */
85 GSource *unpause_source;
/* Callbacks supplied to new_iostate: header serialization and parsing. */
88 SoupMessageGetHeadersFn get_headers_cb;
89 SoupMessageParseHeadersFn parse_headers_cb;
/* Invoked by soup_message_io_finished once I/O on the message is over. */
91 SoupMessageCompletionFn completion_cb;
92 gpointer completion_data;
/* Size in bytes of one read step: read_headers grows the header buffer by
 * this much before each line read, and io_read uses it as the size of its
 * temporary body buffer. */
96 #define RESPONSE_BLOCK_SIZE 8192
/*
 * soup_message_io_cleanup:
 * @msg: message whose I/O state should be torn down
 *
 * Stops any in-progress I/O via soup_message_io_stop(), detaches the
 * SoupMessageIOData from the message's private data and releases what it
 * holds: the streams, the main context, the queue item, the header and
 * write buffers, the pending write chunk and finally the struct itself.
 *
 * NOTE(review): lines are missing from this extract -- the assignment of
 * `io`, any early return when no I/O data exists, and the guards in front
 * of several of the release calls are not visible here.
 */
99 soup_message_io_cleanup (SoupMessage *msg)
101 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
102 SoupMessageIOData *io;
104 soup_message_io_stop (msg);
/* Detach first: from here on soup_message_io_in_progress() sees NULL. */
109 priv->io_data = NULL;
112 g_object_unref (io->iostream);
113 if (io->body_istream)
114 g_object_unref (io->body_istream);
115 if (io->body_ostream)
116 g_object_unref (io->body_ostream);
117 if (io->async_context)
118 g_main_context_unref (io->async_context);
120 soup_message_queue_item_unref (io->item);
/* TRUE: free the underlying byte/character data along with the wrapper. */
122 g_byte_array_free (io->read_header_buf, TRUE);
124 g_string_free (io->write_buf, TRUE);
126 soup_buffer_free (io->write_chunk);
/* Matches the g_slice_new0() allocation in new_iostate. */
128 g_slice_free (SoupMessageIOData, io);
/*
 * soup_message_io_stop:
 * @msg: message whose I/O should be halted
 *
 * Destroys the I/O source and any pending unpause source.  When the read
 * side has not reached SOUP_MESSAGE_IO_STATE_FINISHING the exchange is
 * incomplete: the visible code disconnects the connection if a queue item
 * with a connection exists, and closes io->iostream (the branch structure
 * between those two calls is not visible in this extract).
 *
 * NOTE(review): the early return for a missing io_data and the guard
 * around the io_source teardown are not visible here.
 * NOTE(review): this function unrefs unpause_source after destroying it,
 * whereas soup_message_io_pause() only destroys it -- confirm which of the
 * two matches the ownership returned by soup_add_completion().
 */
132 soup_message_io_stop (SoupMessage *msg)
134 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
135 SoupMessageIOData *io = priv->io_data;
141 g_source_destroy (io->io_source);
142 g_source_unref (io->io_source);
143 io->io_source = NULL;
146 if (io->unpause_source) {
147 g_source_destroy (io->unpause_source);
148 g_source_unref (io->unpause_source);
149 io->unpause_source = NULL;
/* Stopped before the read side finished: the connection cannot be reused. */
152 if (io->read_state < SOUP_MESSAGE_IO_STATE_FINISHING) {
153 if (io->item && io->item->conn)
154 soup_connection_disconnect (io->item->conn);
156 g_io_stream_close (io->iostream, NULL, NULL);
/*
 * soup_message_io_finished:
 * @msg: message whose I/O has completed (successfully or not)
 *
 * Copies the completion callback and its data out of the I/O state first,
 * because soup_message_io_cleanup() frees that state, then invokes the
 * callback and drops a reference on @msg.
 *
 * NOTE(review): the g_object_ref() that the final unref balances is
 * presumably on a line missing from this extract -- confirm.
 */
161 soup_message_io_finished (SoupMessage *msg)
163 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
164 SoupMessageIOData *io = priv->io_data;
165 SoupMessageCompletionFn completion_cb = io->completion_cb;
166 gpointer completion_data = io->completion_data;
169 soup_message_io_cleanup (msg);
171 completion_cb (msg, completion_data);
172 g_object_unref (msg);
/* Returns whether @msg may be transparently re-sent by io_error() after a
 * connection failure.  Only the GET method qualifies here; the check is a
 * pointer comparison against SOUP_METHOD_GET. */
176 request_is_idempotent (SoupMessage *msg)
179 return (msg->method == SOUP_METHOD_GET);
/*
 * io_error:
 * @msg: message whose I/O failed
 * @error: the failure; freed by this function
 *
 * Maps an I/O failure onto the message and then finishes the I/O:
 *  - a G_TLS_ERROR sets SOUP_STATUS_SSL_FAILED;
 *  - on the client side, if no response bytes have been read yet on a
 *    previously used connection, the error is not a timeout and the
 *    request is idempotent, the queue item is marked for restart instead
 *    of failing;
 *  - otherwise SOUP_STATUS_IO_ERROR is set unless the message already
 *    carries a transport-error status.
 *
 * NOTE(review): the last argument of soup_message_set_status_full() and
 * any guard in front of g_error_free() are not visible in this extract.
 */
183 io_error (SoupMessage *msg, GError *error)
185 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
186 SoupMessageIOData *io = priv->io_data;
188 if (error && error->domain == G_TLS_ERROR) {
189 soup_message_set_status_full (msg,
190 SOUP_STATUS_SSL_FAILED,
192 } else if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
193 io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS &&
194 io->read_header_buf->len == 0 &&
195 soup_connection_get_ever_used (io->item->conn) &&
196 !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) &&
197 request_is_idempotent (msg)) {
198 /* Connection got closed, but we can safely try again */
199 io->item->state = SOUP_MESSAGE_RESTARTING;
200 } else if (!SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
201 soup_message_set_status (msg, SOUP_STATUS_IO_ERROR);
204 g_error_free (error);
/* Runs cleanup and the completion callback; io is invalid afterwards. */
206 soup_message_io_finished (msg);
/*
 * read_headers:
 * @msg: message whose headers are being read
 * @cancellable: cancellation object -- presumably passed to the line read,
 *   whose trailing arguments are not visible in this extract
 * @error: set to G_IO_ERROR_PARTIAL_INPUT on the failure path shown below
 *
 * Accumulates header lines from io->istream into io->read_header_buf, one
 * soup_filter_input_stream_read_line() call at a time, and looks for the
 * blank line that ends the header block.  Afterwards the buffer is rewound
 * by that final line terminator and NUL-terminated.
 *
 * NOTE(review): the return type, the loop construct, the conditions around
 * the failure path, the comparison strings of the strncmp() calls and the
 * return statements are missing from this extract.
 */
210 read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
212 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
213 SoupMessageIOData *io = priv->io_data;
214 gssize nread, old_len;
/* Reserve RESPONSE_BLOCK_SIZE bytes past the current end, read a line into
 * that space, then shrink len back to the bytes actually received. */
218 old_len = io->read_header_buf->len;
219 g_byte_array_set_size (io->read_header_buf, old_len + RESPONSE_BLOCK_SIZE);
220 nread = soup_filter_input_stream_read_line (io->istream,
221 io->read_header_buf->data + old_len,
/* MAX() keeps len unchanged when the read reported a negative result. */
226 io->read_header_buf->len = old_len + MAX (nread, 0);
/* Failure path: mark the message malformed and report a truncated
 * connection (the condition selecting this path is not visible). */
228 soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
229 g_set_error_literal (error, G_IO_ERROR,
230 G_IO_ERROR_PARTIAL_INPUT,
231 _("Connection terminated unexpectedly"));
/* End-of-headers detection: a 1- or 2-byte line whose bytes, together with
 * the end of the previous line, match a terminator sequence. */
237 if (nread == 1 && old_len >= 2 &&
238 !strncmp ((char *)io->read_header_buf->data +
239 io->read_header_buf->len - 2,
242 else if (nread == 2 && old_len >= 3 &&
243 !strncmp ((char *)io->read_header_buf->data +
244 io->read_header_buf->len - 3,
250 /* We need to "rewind" io->read_header_buf back one line.
251 * That SHOULD be two characters (CR LF), but if the
252 * web server was stupid, it might only be one.
// NOTE(review): the closing delimiter of the comment above and the "else"
// that presumably pairs the two length adjustments below are missing from
// this extract.
254 if (io->read_header_buf->len < 3 ||
255 io->read_header_buf->data[io->read_header_buf->len - 2] == '\n')
256 io->read_header_buf->len--;
258 io->read_header_buf->len -= 2;
259 io->read_header_buf->data[io->read_header_buf->len] = '\0';
// setup_body_istream:
// @msg: message whose response/request body is about to be read
//
// Builds the input-stream stack used to read the body.  The visible lines
// create a SoupBodyInputStream over io->istream, then wrap the current
// io->body_istream once per entry in priv->decoders with a
// GConverterInputStream, and finally wrap it in a content-sniffer stream.
// After each wrap the previous stream is unreffed and io->body_istream is
// pointed at the new outer stream.
//
// NOTE(review): the return type, the braces, the declaration of `d`, the
// assignment of `decoder`, the target of the soup_body_input_stream_new()
// result, the remaining arguments of the constructor calls and any
// condition guarding the sniffer wrap are missing from this extract.
265 setup_body_istream (SoupMessage *msg)
267 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
268 SoupMessageIOData *io = priv->io_data;
269 GConverter *decoder, *wrapper;
270 GInputStream *filter;
274 soup_body_input_stream_new (io->istream,
// One converter layer per decoder in the list.
278 for (d = priv->decoders; d; d = d->next) {
280 wrapper = soup_converter_wrapper_new (decoder, msg);
281 filter = g_object_new (G_TYPE_CONVERTER_INPUT_STREAM,
282 "base-stream", io->body_istream,
283 "converter", wrapper,
// Drop this function's reference to the inner stream and keep the wrapper.
285 g_object_unref (io->body_istream);
286 io->body_istream = filter;
// Outermost layer: content sniffing using priv->sniffer.
290 filter = soup_content_sniffer_stream_new (priv->sniffer,
291 msg, io->body_istream);
292 g_object_unref (io->body_istream);
293 io->body_istream = filter;
298 * There are two request/response formats: the basic request/response,
299 * possibly with one or more unsolicited informational responses (such
300 * as the WebDAV "102 Processing" response):
303 * W:HEADERS / R:NOT_STARTED -> R:HEADERS / W:NOT_STARTED
304 * W:BODY / R:NOT_STARTED -> R:BODY / W:NOT_STARTED
305 * [W:DONE / R:HEADERS (1xx) <- R:DONE / W:HEADERS (1xx) ...]
306 * W:DONE / R:HEADERS <- R:DONE / W:HEADERS
307 * W:DONE / R:BODY <- R:DONE / W:BODY
308 * W:DONE / R:DONE R:DONE / W:DONE
310 * and the "Expect: 100-continue" request/response, with the client
311 * blocking halfway through its request, and then either continuing or
312 * aborting, depending on the server response:
315 * W:HEADERS / R:NOT_STARTED -> R:HEADERS / W:NOT_STARTED
316 * W:BLOCKING / R:HEADERS <- R:BLOCKING / W:HEADERS
317 * [W:BODY / R:BLOCKING -> R:BODY / W:BLOCKING]
318 * [W:DONE / R:HEADERS <- R:DONE / W:HEADERS]
319 * W:DONE / R:BODY <- R:DONE / W:BODY
320 * W:DONE / R:DONE R:DONE / W:DONE
323 /* Attempts to push forward the writing side of @msg's I/O. Returns
324 * %TRUE if it manages to make some progress, and it is likely that
325 * further progress can be made. Returns %FALSE if it has reached a
326 * stopping point of some sort (need input from the application,
327 * socket not writable, write is complete, etc).
// NOTE(review): the closing delimiter of the comment above, the return
// type, the braces, the local declarations (nwrote, chunk), the trailing
// arguments of the write calls and most error-check / break / return lines
// of this function are missing from this extract.  The notes below describe
// only what the visible lines show.
//
// The function is a single step of a state machine keyed on
// io->write_state; each case performs one action and advances the state.
330 io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
332 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
333 SoupMessageIOData *io = priv->io_data;
337 switch (io->write_state) {
// HEADERS: serialize the headers once (only while write_buf is empty), then
// write them out, resuming from io->written after a partial write.
338 case SOUP_MESSAGE_IO_STATE_HEADERS:
339 if (!io->write_buf->len) {
340 io->get_headers_cb (msg, io->write_buf,
345 while (io->written < io->write_buf->len) {
346 nwrote = g_pollable_stream_write (io->ostream,
347 io->write_buf->str + io->written,
348 io->write_buf->len - io->written,
353 io->written += nwrote;
// Header block fully written: reset the buffer for the next use.
357 g_string_truncate (io->write_buf, 0);
// Server just wrote a 1xx response: 100 Continue hands control to the
// reader to fetch the body; other 1xx codes keep the writer in HEADERS.
359 if (io->mode == SOUP_MESSAGE_IO_SERVER &&
360 SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
361 if (msg->status_code == SOUP_STATUS_CONTINUE) {
362 /* Stop and wait for the body now */
364 SOUP_MESSAGE_IO_STATE_BLOCKING;
365 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
367 /* We just wrote a 1xx response
368 * header, so stay in STATE_HEADERS.
369 * (The caller will pause us from the
370 * wrote_informational callback if he
371 * is not ready to send the final
376 soup_message_wrote_informational (msg);
377 soup_message_cleanup_response (msg);
// With Content-Length framing, remember how many body bytes must be sent;
// the headers consulted depend on which side is writing.
381 if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
382 SoupMessageHeaders *hdrs =
383 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
384 msg->request_headers : msg->response_headers;
385 io->write_length = soup_message_headers_get_content_length (hdrs);
// Client sent "Expect: 100-continue": block the writer and start reading
// the server's interim response; otherwise move on to the body.
388 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
389 soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
390 /* Need to wait for the Continue response */
391 io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
392 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
394 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_START;
396 /* If the client was waiting for a Continue
397 * but we sent something else, then they're
400 if (io->mode == SOUP_MESSAGE_IO_SERVER &&
401 io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
402 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
405 soup_message_wrote_headers (msg);
// BODY_START: create the body output stream on top of io->ostream.
409 case SOUP_MESSAGE_IO_STATE_BODY_START:
410 io->body_ostream = soup_body_output_stream_new (io->ostream,
413 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
// BODY: write the current chunk.  A zero remaining length with a framing
// other than EOF or chunked means there is nothing (more) to send.
417 case SOUP_MESSAGE_IO_STATE_BODY:
418 if (!io->write_length &&
419 io->write_encoding != SOUP_ENCODING_EOF &&
420 io->write_encoding != SOUP_ENCODING_CHUNKED) {
421 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
// No chunk in hand: fetch the one at the current body offset.  If none is
// available yet, pause until the application supplies more data.
425 if (!io->write_chunk) {
426 io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
427 if (!io->write_chunk) {
428 g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
429 soup_message_io_pause (msg);
// A zero-length chunk ends the body.
432 if (!io->write_chunk->length) {
433 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
438 nwrote = g_pollable_stream_write (io->body_ostream,
439 io->write_chunk->data + io->written,
440 io->write_chunk->length - io->written,
// Report exactly the bytes written in this step through a sub-buffer.
446 chunk = soup_buffer_new_subbuffer (io->write_chunk,
447 io->written, nwrote);
448 io->written += nwrote;
449 if (io->write_length)
450 io->write_length -= nwrote;
452 if (io->written == io->write_chunk->length)
453 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DATA;
455 soup_message_wrote_body_data (msg, chunk);
456 soup_buffer_free (chunk);
// BODY_DATA: a whole chunk has been written; account for it, release it
// and go back to BODY for the next one.
460 case SOUP_MESSAGE_IO_STATE_BODY_DATA:
462 if (io->write_chunk->length == 0) {
463 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
467 if (io->mode == SOUP_MESSAGE_IO_SERVER ||
468 priv->msg_flags & SOUP_MESSAGE_CAN_REBUILD)
469 soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
470 io->write_body_offset += io->write_chunk->length;
471 soup_buffer_free (io->write_chunk);
472 io->write_chunk = NULL;
474 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
475 soup_message_wrote_chunk (msg);
// BODY_DONE: close and drop the body stream, then signal wrote-body.
479 case SOUP_MESSAGE_IO_STATE_BODY_DONE:
480 if (io->body_ostream) {
481 if (!g_output_stream_close (io->body_ostream, cancellable, error))
483 g_clear_object (&io->body_ostream);
486 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
487 soup_message_wrote_body (msg);
// FINISHING: writing is complete; on the client this starts the read side.
491 case SOUP_MESSAGE_IO_STATE_FINISHING:
492 io->write_state = SOUP_MESSAGE_IO_STATE_DONE;
494 if (io->mode == SOUP_MESSAGE_IO_CLIENT)
495 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
// Any other state is a programming error.
500 g_return_val_if_reached (FALSE);
506 /* Attempts to push forward the reading side of @msg's I/O. Returns
507 * %TRUE if it manages to make some progress, and it is likely that
508 * further progress can be made. Returns %FALSE if it has reached a
509 * stopping point of some sort (need input from the application,
510 * socket not readable, read is complete, etc).
// NOTE(review): the closing delimiter of the comment above, the return
// type, the braces, several local declarations (status, nread, buffer,
// params), the trailing arguments of some calls and most error-check /
// break / return lines of this function are missing from this extract.
// The notes below describe only what the visible lines show.
//
// The function is a single step of a state machine keyed on
// io->read_state; each case performs one action and advances the state.
513 io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
515 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
516 SoupMessageIOData *io = priv->io_data;
517 guchar *stack_buf = NULL;
522 switch (io->read_state) {
// HEADERS: read the full header block, hand it to parse_headers_cb, then
// empty the header buffer.
523 case SOUP_MESSAGE_IO_STATE_HEADERS:
524 if (!read_headers (msg, cancellable, error))
527 status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data,
528 io->read_header_buf->len,
531 g_byte_array_set_size (io->read_header_buf, 0);
533 if (status != SOUP_STATUS_OK) {
534 /* Either we couldn't parse the headers, or they
535 * indicated something that would mean we wouldn't
536 * be able to parse the body. (Eg, unknown
537 * Transfer-Encoding.). Skip the rest of the
538 * reading, and make sure the connection gets
539 * closed when we're done.
541 soup_message_set_status (msg, status);
542 soup_message_headers_append (msg->request_headers,
543 "Connection", "close");
544 io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
// Client received a 1xx response: a 100 Continue that the writer was
// waiting for unblocks the writer; any other 1xx keeps reading headers.
548 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
549 SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
550 if (msg->status_code == SOUP_STATUS_CONTINUE &&
551 io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING) {
552 /* Pause the reader, unpause the writer */
554 SOUP_MESSAGE_IO_STATE_BLOCKING;
556 SOUP_MESSAGE_IO_STATE_BODY_START;
558 /* Just stay in HEADERS */
559 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
562 /* Informational responses have no bodies, so
563 * bail out here rather than parsing encoding, etc
565 soup_message_got_informational (msg);
566 soup_message_cleanup_response (msg);
// Server received "Expect: 100-continue": prepare a 100 Continue status,
// start the writer on its headers and block the reader meanwhile.
568 } else if (io->mode == SOUP_MESSAGE_IO_SERVER &&
569 soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
570 /* The client requested a Continue response. The
571 * got_headers handler may change this to something
574 soup_message_set_status (msg, SOUP_STATUS_CONTINUE);
575 io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
576 io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
578 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
580 /* If the client was waiting for a Continue
581 * but got something else, then it's done
584 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
585 io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
586 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
// With Content-Length framing, record the expected body length; the
// headers consulted depend on which side is reading.
589 if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
590 SoupMessageHeaders *hdrs =
591 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
592 msg->response_headers : msg->request_headers;
593 io->read_length = soup_message_headers_get_content_length (hdrs);
595 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
596 !soup_message_is_keepalive (msg)) {
597 /* Some servers suck and send
598 * incorrect Content-Length values, so
599 * allow EOF termination in this case
600 * (iff the message is too short) too.
602 io->read_encoding = SOUP_ENCODING_EOF;
// -1 is stored when the length is not known (the branch selecting this
// assignment is not visible in this extract).
605 io->read_length = -1;
607 soup_message_got_headers (msg);
// BODY_START: lazily build the body stream stack, run content sniffing
// when applicable, then move on to BODY.
611 case SOUP_MESSAGE_IO_STATE_BODY_START:
612 if (!io->body_istream)
613 setup_body_istream (msg);
616 SoupContentSnifferStream *sniffer_stream = SOUP_CONTENT_SNIFFER_STREAM (io->body_istream);
617 const char *content_type;
620 if (!soup_content_sniffer_stream_is_ready (sniffer_stream, io->blocking, cancellable, error))
// NOTE(review): the second argument on the next line contains a mis-encoded
// character; it is presumably the address of the local `params` -- restore
// it from the pristine file.
623 content_type = soup_content_sniffer_stream_sniff (sniffer_stream, ¶ms);
624 soup_message_content_sniffed (msg, content_type, params);
627 io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
// BODY: obtain a buffer (from the application's chunk allocator when one is
// set, otherwise a temporary RESPONSE_BLOCK_SIZE stack buffer), read into
// it and publish the data as a chunk.
631 case SOUP_MESSAGE_IO_STATE_BODY:
632 if (priv->chunk_allocator) {
633 buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
635 g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
636 soup_message_io_pause (msg);
641 stack_buf = alloca (RESPONSE_BLOCK_SIZE);
642 buffer = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
644 RESPONSE_BLOCK_SIZE);
647 nread = g_pollable_stream_read (io->body_istream,
648 (guchar *)buffer->data,
// Data path: trim the buffer to the bytes read and hand it on.
653 buffer->length = nread;
654 soup_message_body_got_chunk (io->read_body, buffer);
655 soup_message_got_chunk (msg, buffer);
656 soup_buffer_free (buffer);
// Non-data path: the buffer is released without being published.
660 soup_buffer_free (buffer);
664 /* else nread == 0 */
665 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
// BODY_DONE: signal got-body and proceed to FINISHING.
669 case SOUP_MESSAGE_IO_STATE_BODY_DONE:
670 io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
671 soup_message_got_body (msg);
// FINISHING: reading is complete; on the server this starts the write side.
675 case SOUP_MESSAGE_IO_STATE_FINISHING:
676 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
678 if (io->mode == SOUP_MESSAGE_IO_SERVER)
679 io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
// Any other state is a programming error.
684 g_return_val_if_reached (FALSE);
/*
 * message_source_check:
 * GSourceFuncs check function for SoupMessageSource.  For a source that
 * was created while the message was paused, it looks at the message's
 * current I/O state and tests whether the I/O is gone or still paused.
 *
 * NOTE(review): the return type, the braces and every return statement are
 * missing from this extract, so the values returned for each case cannot
 * be confirmed from here.
 */
697 message_source_check (GSource *source)
699 SoupMessageSource *message_source = (SoupMessageSource *)source;
701 if (message_source->paused) {
702 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (message_source->msg);
703 SoupMessageIOData *io = priv->io_data;
705 if (!io || io->paused)
/*
 * message_source_prepare:
 * GSourceFuncs prepare function for SoupMessageSource; the readiness
 * decision is delegated to message_source_check().
 *
 * NOTE(review): the remaining parameter(s) and any handling of them are
 * missing from this extract.
 */
714 message_source_prepare (GSource *source,
718 return message_source_check (source);
/*
 * message_source_dispatch:
 * GSourceFuncs dispatch function for SoupMessageSource.  The generic
 * GSourceFunc callback is really a SoupMessageSourceFunc, so it is cast
 * back and invoked with the source's message and the user data; its result
 * is returned to the main loop.
 *
 * NOTE(review): the declaration of the user_data parameter is on a line
 * missing from this extract.
 */
722 message_source_dispatch (GSource *source,
723 GSourceFunc callback,
726 SoupMessageSourceFunc func = (SoupMessageSourceFunc)callback;
727 SoupMessageSource *message_source = (SoupMessageSource *)source;
729 return (*func) (message_source->msg, user_data);
/* GSourceFuncs finalize function for SoupMessageSource: drops the message
 * reference that soup_message_io_get_source() took when it created the
 * source. */
733 message_source_finalize (GSource *source)
735 SoupMessageSource *message_source = (SoupMessageSource *)source;
737 g_object_unref (message_source->msg);
741 message_source_closure_callback (SoupMessage *msg,
744 GClosure *closure = data;
745 GValue param = G_VALUE_INIT;
746 GValue result_value = G_VALUE_INIT;
749 g_value_init (&result_value, G_TYPE_BOOLEAN);
751 g_value_init (¶m, SOUP_TYPE_MESSAGE);
752 g_value_set_object (¶m, msg);
754 g_closure_invoke (closure, &result_value, 1, ¶m, NULL);
756 result = g_value_get_boolean (&result_value);
757 g_value_unset (&result_value);
758 g_value_unset (¶m);
/* GSourceFuncs table for SoupMessageSource, passed to g_source_new() in
 * soup_message_io_get_source().  The first four entries are the prepare,
 * check, dispatch and finalize hooks; the last two fill the
 * closure_callback and closure_marshal slots so the source can be driven
 * by a GClosure. */
763 static GSourceFuncs message_source_funcs =
765 message_source_prepare,
766 message_source_check,
767 message_source_dispatch,
768 message_source_finalize,
769 (GSourceFunc)message_source_closure_callback,
770 (GSourceDummyMarshal)g_cclosure_marshal_generic,
/*
 * soup_message_io_get_source:
 * @msg: message to wait on
 * @cancellable: passed to the pollable stream source constructors
 * @callback: function to run when the source fires
 * @user_data: data for @callback
 *
 * Creates a SoupMessageSource that holds a reference on @msg and wraps a
 * "base" child source chosen from the current I/O state:
 *  - a zero-delay timeout source in the first branch and in the final
 *    fallback branch;
 *  - nothing visible for the paused branch;
 *  - a pollable input source on the body stream (or the raw input stream
 *    when no body stream exists) when the read state is pollable;
 *  - otherwise a pollable output source, chosen the same way, when the
 *    write state is pollable.
 * The source records whether the I/O was paused at creation time, which
 * message_source_check() consults later.
 *
 * NOTE(review): the return type, the first branch's condition, the body of
 * the paused branch, any guard around attaching the base source and the
 * return statement are missing from this extract.
 */
774 soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
775 SoupMessageSourceFunc callback, gpointer user_data)
777 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
778 SoupMessageIOData *io = priv->io_data;
779 GSource *base_source, *source;
780 SoupMessageSource *message_source;
783 base_source = g_timeout_source_new (0);
784 } else if (io->paused) {
786 } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
787 GPollableInputStream *istream;
789 if (io->body_istream)
790 istream = G_POLLABLE_INPUT_STREAM (io->body_istream);
792 istream = G_POLLABLE_INPUT_STREAM (io->istream);
793 base_source = g_pollable_input_stream_create_source (istream, cancellable);
794 } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->write_state)) {
795 GPollableOutputStream *ostream;
797 if (io->body_ostream)
798 ostream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
800 ostream = G_POLLABLE_OUTPUT_STREAM (io->ostream);
801 base_source = g_pollable_output_stream_create_source (ostream, cancellable);
803 base_source = g_timeout_source_new (0);
805 source = g_source_new (&message_source_funcs,
806 sizeof (SoupMessageSource));
807 g_source_set_name (source, "SoupMessageSource");
808 message_source = (SoupMessageSource *)source;
/* Released again in message_source_finalize(). */
809 message_source->msg = g_object_ref (msg);
810 message_source->paused = io && io->paused;
/* The child only needs to wake the parent up, so it gets a dummy callback;
 * the parent then owns it and the local reference is dropped. */
813 g_source_set_dummy_callback (base_source);
814 g_source_add_child_source (source, base_source);
815 g_source_unref (base_source);
817 g_source_set_callback (source, (GSourceFunc) callback, user_data, NULL);
/*
 * io_run_until:
 * @msg: message to drive
 * @read_state: read-side state to reach
 * @write_state: write-side state to reach
 * @cancellable: cancellation object, checked before and after the loop
 * @error: receives the failure, including G_IO_ERROR_CANCELLED and
 *   G_IO_ERROR_WOULD_BLOCK as produced below
 *
 * Repeatedly calls io_read() or io_write() -- whichever side is currently
 * active, the read side first -- while progress is being made, the
 * message's I/O state is still the one captured on entry, the I/O is not
 * paused, and at least one side is still short of its target state.
 *
 * After the loop the visible exits are: a propagated I/O error, a
 * cancellation, replacement/removal of the I/O state during the run
 * (reported as "cancelled"), and "would block" when the I/O is paused
 * before the targets were reached.  Each exit path drops a reference on
 * @msg.
 *
 * NOTE(review): the return type, the braces, the condition that selects the
 * first "Operation was cancelled" error, the g_object_ref() balanced by the
 * unrefs, the condition around g_propagate_error() and all return
 * statements are missing from this extract.
 */
822 io_run_until (SoupMessage *msg,
823 SoupMessageIOState read_state, SoupMessageIOState write_state,
824 GCancellable *cancellable, GError **error)
826 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
827 SoupMessageIOData *io = priv->io_data;
828 gboolean progress = TRUE, done;
829 GError *my_error = NULL;
831 if (g_cancellable_set_error_if_cancelled (cancellable, error))
834 g_set_error_literal (error, G_IO_ERROR,
835 G_IO_ERROR_CANCELLED,
836 _("Operation was cancelled"));
/* priv->io_data is re-read on every iteration because callbacks run by
 * io_read()/io_write() may finish or replace the I/O. */
842 while (progress && priv->io_data == io && !io->paused &&
843 (io->read_state < read_state || io->write_state < write_state)) {
845 if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
846 progress = io_read (msg, cancellable, &my_error);
847 else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
848 progress = io_write (msg, cancellable, &my_error);
854 g_propagate_error (error, my_error);
855 g_object_unref (msg);
857 } else if (g_cancellable_set_error_if_cancelled (cancellable, error)) {
858 g_object_unref (msg);
860 } else if (priv->io_data != io) {
861 g_set_error_literal (error, G_IO_ERROR,
862 G_IO_ERROR_CANCELLED,
863 _("Operation was cancelled"));
864 g_object_unref (msg);
/* Both sides must have reached (or passed) their targets. */
868 done = (io->read_state >= read_state &&
869 io->write_state >= write_state);
871 if (io->paused && !done) {
872 g_set_error_literal (error, G_IO_ERROR,
873 G_IO_ERROR_WOULD_BLOCK,
874 _("Operation would block"));
875 g_object_unref (msg);
879 g_object_unref (msg);
/*
 * io_run:
 * @msg: message to drive
 * @user_data: not referenced by the visible lines
 *
 * Drives @msg toward completion of both directions; it is also installed as
 * the callback of the source created below.  It discards the current I/O
 * source, then runs io_run_until() with DONE as the target for both sides:
 *  - on success the I/O is finished via soup_message_io_finished();
 *  - on G_IO_ERROR_WOULD_BLOCK a new source is created and attached to
 *    io->async_context so this function runs again later;
 *  - on any other error, if the I/O state is still the same one, the error
 *    is handed to io_error().
 * A reference on the cancellable is held across the run because the I/O
 * state that owns the pointer may be freed meanwhile.
 *
 * NOTE(review): the return type, the guard around the source teardown, the
 * g_object_ref() balanced by the final unref, whatever follows the
 * io_error() branch and the return statement are missing from this
 * extract.
 */
884 io_run (SoupMessage *msg, gpointer user_data)
886 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
887 SoupMessageIOData *io = priv->io_data;
888 GError *error = NULL;
889 GCancellable *cancellable;
892 g_source_destroy (io->io_source);
893 g_source_unref (io->io_source);
894 io->io_source = NULL;
898 cancellable = io->cancellable ? g_object_ref (io->cancellable) : NULL;
900 if (io_run_until (msg,
901 SOUP_MESSAGE_IO_STATE_DONE,
902 SOUP_MESSAGE_IO_STATE_DONE,
903 cancellable, &error)) {
904 soup_message_io_finished (msg);
905 } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
906 g_clear_error (&error);
907 io->io_source = soup_message_io_get_source (msg, NULL, io_run, msg);
908 g_source_attach (io->io_source, io->async_context);
909 } else if (error && priv->io_data == io) {
910 io_error (msg, error);
913 g_object_unref (msg);
914 g_clear_object (&cancellable);
/*
 * soup_message_io_run_until_write:
 * Runs @msg's I/O via io_run_until() until the write side has reached at
 * least SOUP_MESSAGE_IO_STATE_BODY; the read side is unconstrained
 * (SOUP_MESSAGE_IO_STATE_ANY).  The result of io_run_until() is returned.
 *
 * NOTE(review): the return type and the trailing arguments of the call are
 * missing from this extract.
 */
920 soup_message_io_run_until_write (SoupMessage *msg,
921 GCancellable *cancellable, GError **error)
923 return io_run_until (msg,
924 SOUP_MESSAGE_IO_STATE_ANY,
925 SOUP_MESSAGE_IO_STATE_BODY,
/*
 * soup_message_io_run_until_read:
 * Runs @msg's I/O via io_run_until() until the read side has reached at
 * least SOUP_MESSAGE_IO_STATE_BODY; the write side is unconstrained
 * (SOUP_MESSAGE_IO_STATE_ANY).  The result of io_run_until() is returned.
 *
 * NOTE(review): the return type and the trailing arguments of the call are
 * missing from this extract.
 */
930 soup_message_io_run_until_read (SoupMessage *msg,
931 GCancellable *cancellable, GError **error)
933 return io_run_until (msg,
934 SOUP_MESSAGE_IO_STATE_BODY,
935 SOUP_MESSAGE_IO_STATE_ANY,
/*
 * soup_message_io_run_until_finish:
 * Runs @msg's I/O via io_run_until() with DONE as the target for both
 * directions.  The visible lines then finish the I/O with
 * soup_message_io_finished() and drop a reference on @msg.
 *
 * NOTE(review): the return type, the last parameter, the g_object_ref()
 * balanced by the unref, the body of the failure branch and the return
 * statements are missing from this extract.
 */
940 soup_message_io_run_until_finish (SoupMessage *msg,
941 GCancellable *cancellable,
946 if (!io_run_until (msg,
947 SOUP_MESSAGE_IO_STATE_DONE,
948 SOUP_MESSAGE_IO_STATE_DONE,
952 soup_message_io_finished (msg);
953 g_object_unref (msg);
/* Handler for the "eof" signal of the client input stream created in
 * soup_message_io_get_response_istream(); @user_data is the SoupMessage.
 * If the message still has I/O state and its read side is in BODY, the read
 * side is advanced to BODY_DONE. */
958 client_stream_eof (SoupClientInputStream *stream, gpointer user_data)
960 SoupMessage *msg = user_data;
961 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
962 SoupMessageIOData *io = priv->io_data;
964 if (io && io->read_state == SOUP_MESSAGE_IO_STATE_BODY)
965 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
/*
 * soup_message_io_get_response_istream:
 * @msg: a message whose I/O is in client mode
 *
 * Returns a new SoupClientInputStream wrapping the message's body input
 * stream, with client_stream_eof() connected to its "eof" signal.  If the
 * message already carries a transport-error status, an error in the
 * SOUP_HTTP_ERROR domain is set from the status code and reason phrase
 * instead.
 *
 * NOTE(review): the return type, the error parameter's declaration and the
 * return statement of the error branch are missing from this extract.
 */
969 soup_message_io_get_response_istream (SoupMessage *msg,
972 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
973 SoupMessageIOData *io = priv->io_data;
974 GInputStream *client_stream;
976 g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, NULL);
978 if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
979 g_set_error_literal (error, SOUP_HTTP_ERROR,
980 msg->status_code, msg->reason_phrase);
984 client_stream = soup_client_input_stream_new (io->body_istream, msg);
985 g_signal_connect (client_stream, "eof",
986 G_CALLBACK (client_stream_eof), msg);
988 return client_stream;
/*
 * new_iostate:
 * @msg: message the I/O state is for
 * @iostream: connection stream; a reference is taken
 * @async_context: main context for non-blocking operation, or NULL
 * @mode: client or server side
 * @get_headers_cb: serializes outgoing headers
 * @parse_headers_cb: parses incoming headers
 * @header_data: stored in the I/O state for the header callbacks
 * @completion_cb: called when I/O on the message finishes
 * @completion_data: data for @completion_cb
 *
 * Allocates a zeroed SoupMessageIOData and fills in the callbacks, the
 * streams derived from @iostream, the (empty) header and write buffers and
 * the initial NOT_STARTED states.  The I/O is non-blocking when
 * @async_context is supplied and blocking otherwise.
 *
 * NOTE(review): the braces, the store of @mode, the condition in front of
 * the soup_message_io_cleanup() call (presumably "an older I/O state
 * exists"), the store into priv->io_data and the return statement are
 * missing from this extract.
 */
992 static SoupMessageIOData *
993 new_iostate (SoupMessage *msg, GIOStream *iostream,
994 GMainContext *async_context, SoupMessageIOMode mode,
995 SoupMessageGetHeadersFn get_headers_cb,
996 SoupMessageParseHeadersFn parse_headers_cb,
997 gpointer header_data,
998 SoupMessageCompletionFn completion_cb,
999 gpointer completion_data)
1001 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1002 SoupMessageIOData *io;
1004 io = g_slice_new0 (SoupMessageIOData);
1006 io->get_headers_cb = get_headers_cb;
1007 io->parse_headers_cb = parse_headers_cb;
1008 io->header_data = header_data;
1009 io->completion_cb = completion_cb;
1010 io->completion_data = completion_data;
/* Only the GIOStream itself is reffed here; the two sub-streams are stored
 * as obtained from it. */
1012 io->iostream = g_object_ref (iostream);
1013 io->istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (iostream));
1014 io->ostream = g_io_stream_get_output_stream (iostream);
1016 if (async_context) {
1017 io->async_context = g_main_context_ref (async_context);
1018 io->blocking = FALSE;
1020 io->blocking = TRUE;
1022 io->read_header_buf = g_byte_array_new ();
1023 io->write_buf = g_string_new (NULL);
1025 io->read_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
1026 io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
1029 soup_message_io_cleanup (msg);
/*
 * soup_message_io_client:
 * @item: queue item whose message is to be sent
 * @iostream: connection stream
 * @async_context: main context, or NULL for blocking I/O
 * @get_headers_cb: serializes the request headers
 * @parse_headers_cb: parses the response headers
 * @header_data: data for the header callbacks
 * @completion_cb: called when I/O finishes
 * @completion_data: data for @completion_cb
 *
 * Starts client-side I/O for @item's message: creates the I/O state, takes
 * a reference on @item, reads into the response body and writes from the
 * request body, puts the write side in HEADERS and immediately runs the
 * I/O via io_run().
 *
 * NOTE(review): the return type, the braces and the store of @item into the
 * I/O state are missing from this extract.  The cancellable pointer is
 * copied without taking a reference in the visible line.
 */
1035 soup_message_io_client (SoupMessageQueueItem *item,
1036 GIOStream *iostream,
1037 GMainContext *async_context,
1038 SoupMessageGetHeadersFn get_headers_cb,
1039 SoupMessageParseHeadersFn parse_headers_cb,
1040 gpointer header_data,
1041 SoupMessageCompletionFn completion_cb,
1042 gpointer completion_data)
1044 SoupMessageIOData *io;
1046 io = new_iostate (item->msg, iostream, async_context,
1047 SOUP_MESSAGE_IO_CLIENT,
1048 get_headers_cb, parse_headers_cb, header_data,
1049 completion_cb, completion_data);
/* Released in soup_message_io_cleanup(). */
1052 soup_message_queue_item_ref (item);
1053 io->cancellable = item->cancellable;
1055 io->read_body = item->msg->response_body;
1056 io->write_body = item->msg->request_body;
/* A client starts by writing its request. */
1058 io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
1060 io_run (item->msg, NULL);
/*
 * soup_message_io_server:
 * @msg: message to receive a request into and answer
 * @iostream: connection stream
 * @async_context: main context, or NULL for blocking I/O
 * @get_headers_cb: serializes the response headers
 * @parse_headers_cb: parses the request headers
 * @header_data: data for the header callbacks
 * @completion_cb: called when I/O finishes
 * @completion_data: data for @completion_cb
 *
 * Sets up server-side I/O for @msg: creates the I/O state, reads into the
 * request body and writes from the response body, and puts the read side
 * in HEADERS, since a server starts by reading the request.
 *
 * NOTE(review): the return type, the braces and whatever starts the I/O
 * after the state is set up are missing from this extract.
 */
1064 soup_message_io_server (SoupMessage *msg,
1065 GIOStream *iostream, GMainContext *async_context,
1066 SoupMessageGetHeadersFn get_headers_cb,
1067 SoupMessageParseHeadersFn parse_headers_cb,
1068 gpointer header_data,
1069 SoupMessageCompletionFn completion_cb,
1070 gpointer completion_data)
1072 SoupMessageIOData *io;
1074 io = new_iostate (msg, iostream, async_context,
1075 SOUP_MESSAGE_IO_SERVER,
1076 get_headers_cb, parse_headers_cb, header_data,
1077 completion_cb, completion_data);
1079 io->read_body = msg->request_body;
1080 io->write_body = msg->response_body;
1082 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
/*
 * soup_message_io_pause:
 * @msg: message whose I/O should be paused; it must have I/O state
 *
 * Tears down the active I/O source and cancels any pending unpause source.
 * For queue items using the new API, pausing is only accepted while the
 * read side has not yet reached SOUP_MESSAGE_IO_STATE_BODY.
 *
 * NOTE(review): the return type, the braces and the line that records the
 * paused state are missing from this extract.
 * NOTE(review): unpause_source is destroyed but not unreffed here, whereas
 * soup_message_io_stop() also unrefs it -- confirm which matches the
 * ownership returned by soup_add_completion().
 */
1087 soup_message_io_pause (SoupMessage *msg)
1089 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1090 SoupMessageIOData *io = priv->io_data;
1092 g_return_if_fail (io != NULL);
1094 if (io->item && io->item->new_api)
1095 g_return_if_fail (io->read_state < SOUP_MESSAGE_IO_STATE_BODY);
1097 if (io->io_source) {
1098 g_source_destroy (io->io_source);
1099 g_source_unref (io->io_source);
1100 io->io_source = NULL;
1103 if (io->unpause_source) {
1104 g_source_destroy (io->unpause_source);
1105 io->unpause_source = NULL;
/*
 * io_unpause_internal:
 * @msg: the SoupMessage, passed as a gpointer so the function can serve as
 *   a source callback (see soup_message_io_unpause)
 *
 * Performs the actual unpause.  The visible lines require the message to
 * have I/O state and clear the record of the pending unpause source.
 *
 * NOTE(review): the return type and everything after the pointer is
 * cleared (resetting the paused state, restarting the I/O, the return
 * value) are missing from this extract.
 */
1112 io_unpause_internal (gpointer msg)
1114 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1115 SoupMessageIOData *io = priv->io_data;
1117 g_return_val_if_fail (io != NULL, FALSE);
1118 io->unpause_source = NULL;
/*
 * soup_message_io_unpause:
 * @msg: message whose I/O should resume; it must have I/O state
 *
 * Resumes paused I/O.  For non-blocking I/O the real work is deferred:
 * io_unpause_internal() is scheduled on io->async_context through
 * soup_add_completion(), at most once at a time.  The direct call to
 * io_unpause_internal() is presumably the blocking-I/O branch.
 * For queue items using the new API, unpausing is only accepted while the
 * read side has not yet reached SOUP_MESSAGE_IO_STATE_BODY.
 *
 * NOTE(review): the return type, the braces, the rest of the new-API branch
 * and the "else" separating the deferred and direct paths are missing from
 * this extract.
 */
1129 soup_message_io_unpause (SoupMessage *msg)
1131 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1132 SoupMessageIOData *io = priv->io_data;
1134 g_return_if_fail (io != NULL);
1136 if (io->item && io->item->new_api) {
1137 g_return_if_fail (io->read_state < SOUP_MESSAGE_IO_STATE_BODY);
1142 if (!io->blocking) {
1143 if (!io->unpause_source) {
1144 io->unpause_source = soup_add_completion (
1145 io->async_context, io_unpause_internal, msg);
1148 io_unpause_internal (msg);
/* I/O counts as "in progress" from the moment an I/O state is attached to
 * the message until soup_message_io_cleanup() clears priv->io_data.
 * NOTE(review): the opening and closing delimiters of the documentation
 * comment below, and the function's return type, are missing from this
 * extract. */
1152 * soup_message_io_in_progress:
1153 * @msg: a #SoupMessage
1155 * Tests whether or not I/O is currently in progress on @msg.
1157 * Return value: whether or not I/O is currently in progress.
1160 soup_message_io_in_progress (SoupMessage *msg)
1162 SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1164 return priv->io_data != NULL;