1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
3 * soup-message-io.c: HTTP message I/O
5 * Copyright (C) 2000-2003, Ximian, Inc.
12 #include <glib/gi18n-lib.h>
15 #include <sysprof-capture.h>
19 #include "soup-body-input-stream.h"
20 #include "soup-body-output-stream.h"
21 #include "soup-client-input-stream.h"
22 #include "soup-connection.h"
23 #include "soup-content-processor.h"
24 #include "soup-content-sniffer-stream.h"
25 #include "soup-filter-input-stream.h"
26 #include "soup-message-private.h"
27 #include "soup-message-queue.h"
28 #include "soup-misc-private.h"
31 SOUP_MESSAGE_IO_CLIENT,
32 SOUP_MESSAGE_IO_SERVER
36 SOUP_MESSAGE_IO_STATE_NOT_STARTED,
37 SOUP_MESSAGE_IO_STATE_ANY = SOUP_MESSAGE_IO_STATE_NOT_STARTED,
38 SOUP_MESSAGE_IO_STATE_HEADERS,
39 SOUP_MESSAGE_IO_STATE_BLOCKING,
40 SOUP_MESSAGE_IO_STATE_BODY_START,
41 SOUP_MESSAGE_IO_STATE_BODY,
42 SOUP_MESSAGE_IO_STATE_BODY_DATA,
43 SOUP_MESSAGE_IO_STATE_BODY_FLUSH,
44 SOUP_MESSAGE_IO_STATE_BODY_DONE,
45 SOUP_MESSAGE_IO_STATE_FINISHING,
46 SOUP_MESSAGE_IO_STATE_DONE
/*
 * SOUP_MESSAGE_IO_STATE_ACTIVE:
 * @state: a SoupMessageIOState value
 *
 * Evaluates to non-zero when @state is none of NOT_STARTED, BLOCKING or
 * DONE. io_run_until() uses it to choose whether the read side or the
 * write side gets to run next.
 *
 * @state is parenthesized so that an expression argument (for example a
 * conditional expression) is compared as a whole. It is still evaluated
 * more than once, so pass a side-effect-free expression.
 */
#define SOUP_MESSAGE_IO_STATE_ACTIVE(state) \
	((state) != SOUP_MESSAGE_IO_STATE_NOT_STARTED && \
	 (state) != SOUP_MESSAGE_IO_STATE_BLOCKING && \
	 (state) != SOUP_MESSAGE_IO_STATE_DONE)

/*
 * SOUP_MESSAGE_IO_STATE_POLLABLE:
 * @state: a SoupMessageIOState value
 *
 * Evaluates to non-zero when @state is active (see
 * SOUP_MESSAGE_IO_STATE_ACTIVE) and is not BODY_DONE.
 * soup_message_io_get_source() uses it to decide whether to build its
 * base source from the pollable input or output stream.
 *
 * @state is evaluated more than once.
 */
#define SOUP_MESSAGE_IO_STATE_POLLABLE(state) \
	(SOUP_MESSAGE_IO_STATE_ACTIVE (state) && \
	 (state) != SOUP_MESSAGE_IO_STATE_BODY_DONE)
58 SoupMessageQueueItem *item;
59 SoupMessageIOMode mode;
60 GCancellable *cancellable;
63 SoupFilterInputStream *istream;
64 GInputStream *body_istream;
65 GOutputStream *ostream;
66 GOutputStream *body_ostream;
67 GMainContext *async_context;
69 SoupMessageIOState read_state;
70 SoupEncoding read_encoding;
71 GByteArray *read_header_buf;
72 SoupMessageBody *read_body;
75 SoupMessageIOState write_state;
76 SoupEncoding write_encoding;
78 SoupMessageBody *write_body;
79 SoupBuffer *write_chunk;
80 goffset write_body_offset;
85 GSource *unpause_source;
88 GCancellable *async_close_wait;
89 GError *async_close_error;
91 SoupMessageGetHeadersFn get_headers_cb;
92 SoupMessageParseHeadersFn parse_headers_cb;
94 SoupMessageCompletionFn completion_cb;
95 gpointer completion_data;
98 gint64 begin_time_nsec;
102 static void io_run (SoupMessage *msg, gboolean blocking);
/* Bytes by which read_headers() grows the header buffer before each line
 * read; also the size of the temporary stack buffer io_read() uses for
 * body data when the message has no chunk allocator.
 */
#define RESPONSE_BLOCK_SIZE (8 * 1024)
/* read_headers() marks the message SOUP_STATUS_MALFORMED and fails with
 * "Header too big" once the accumulated header bytes exceed this.
 */
#define HEADER_SIZE_LIMIT (64 * 1024)
108 soup_message_io_cleanup (SoupMessage *msg)
110 SoupMessageIOData *io;
112 soup_message_io_stop (msg);
114 io = soup_message_get_io_data (msg);
117 soup_message_set_io_data (msg, NULL);
120 g_object_unref (io->iostream);
121 if (io->body_istream)
122 g_object_unref (io->body_istream);
123 if (io->body_ostream)
124 g_object_unref (io->body_ostream);
125 if (io->async_context)
126 g_main_context_unref (io->async_context);
128 soup_message_queue_item_unref (io->item);
130 g_byte_array_free (io->read_header_buf, TRUE);
132 g_string_free (io->write_buf, TRUE);
134 soup_buffer_free (io->write_chunk);
136 if (io->async_close_wait) {
137 g_cancellable_cancel (io->async_close_wait);
138 g_clear_object (&io->async_close_wait);
140 g_clear_error (&io->async_close_error);
142 g_slice_free (SoupMessageIOData, io);
146 soup_message_io_stop (SoupMessage *msg)
148 SoupMessageIOData *io = soup_message_get_io_data (msg);
154 g_source_destroy (io->io_source);
155 g_source_unref (io->io_source);
156 io->io_source = NULL;
159 if (io->unpause_source) {
160 g_source_destroy (io->unpause_source);
161 g_source_unref (io->unpause_source);
162 io->unpause_source = NULL;
167 soup_message_io_finished (SoupMessage *msg)
169 SoupMessageIOData *io;
170 SoupMessageCompletionFn completion_cb;
171 gpointer completion_data;
172 SoupMessageIOCompletion completion;
174 io = soup_message_get_io_data (msg);
178 completion_cb = io->completion_cb;
179 completion_data = io->completion_data;
181 if ((io->read_state >= SOUP_MESSAGE_IO_STATE_FINISHING &&
182 io->write_state >= SOUP_MESSAGE_IO_STATE_FINISHING))
183 completion = SOUP_MESSAGE_IO_COMPLETE;
185 completion = SOUP_MESSAGE_IO_INTERRUPTED;
188 soup_message_io_cleanup (msg);
190 completion_cb (msg, completion, completion_data);
191 g_object_unref (msg);
195 soup_message_io_steal (SoupMessage *msg)
197 SoupMessageIOData *io;
198 SoupMessageCompletionFn completion_cb;
199 gpointer completion_data;
202 io = soup_message_get_io_data (msg);
203 if (!io || !io->iostream)
206 iostream = g_object_ref (io->iostream);
207 completion_cb = io->completion_cb;
208 completion_data = io->completion_data;
211 soup_message_io_cleanup (msg);
213 completion_cb (msg, SOUP_MESSAGE_IO_STOLEN, completion_data);
214 g_object_unref (msg);
220 read_headers (SoupMessage *msg, gboolean blocking,
221 GCancellable *cancellable, GError **error)
223 SoupMessageIOData *io;
224 gssize nread, old_len;
227 io = soup_message_get_io_data (msg);
229 old_len = io->read_header_buf->len;
230 g_byte_array_set_size (io->read_header_buf, old_len + RESPONSE_BLOCK_SIZE);
231 nread = soup_filter_input_stream_read_line (io->istream,
232 io->read_header_buf->data + old_len,
237 io->read_header_buf->len = old_len + MAX (nread, 0);
239 if (io->read_header_buf->len > 0)
241 soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
242 g_set_error_literal (error, G_IO_ERROR,
243 G_IO_ERROR_PARTIAL_INPUT,
244 _("Connection terminated unexpectedly"));
250 if (nread == 1 && old_len >= 2 &&
251 !strncmp ((char *)io->read_header_buf->data +
252 io->read_header_buf->len - 2,
254 io->read_header_buf->len--;
256 } else if (nread == 2 && old_len >= 3 &&
257 !strncmp ((char *)io->read_header_buf->data +
258 io->read_header_buf->len - 3,
260 io->read_header_buf->len -= 2;
265 if (io->read_header_buf->len > HEADER_SIZE_LIMIT) {
266 soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
267 g_set_error_literal (error, G_IO_ERROR,
268 G_IO_ERROR_PARTIAL_INPUT,
269 _("Header too big"));
274 io->read_header_buf->data[io->read_header_buf->len] = '\0';
279 processing_stage_cmp (gconstpointer a,
282 SoupProcessingStage stage_a = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR (a));
283 SoupProcessingStage stage_b = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR (b));
285 if (stage_a > stage_b)
287 if (stage_a == stage_b)
293 soup_message_setup_body_istream (GInputStream *body_stream,
295 SoupSession *session,
296 SoupProcessingStage start_at_stage)
298 GInputStream *istream;
299 GSList *p, *processors;
301 istream = g_object_ref (body_stream);
303 processors = soup_session_get_features (session, SOUP_TYPE_CONTENT_PROCESSOR);
304 processors = g_slist_sort (processors, processing_stage_cmp);
306 for (p = processors; p; p = p->next) {
307 GInputStream *wrapper;
308 SoupContentProcessor *processor;
310 processor = SOUP_CONTENT_PROCESSOR (p->data);
311 if (soup_message_disables_feature (msg, p->data) ||
312 soup_content_processor_get_processing_stage (processor) < start_at_stage)
315 wrapper = soup_content_processor_wrap_input (processor, istream, msg, NULL);
317 g_object_unref (istream);
322 g_slist_free (processors);
328 closed_async (GObject *source,
329 GAsyncResult *result,
332 GOutputStream *body_ostream = G_OUTPUT_STREAM (source);
333 SoupMessage *msg = user_data;
334 SoupMessageIOData *io;
335 GCancellable *async_close_wait;
337 io = soup_message_get_io_data (msg);
338 if (!io || !io->async_close_wait || io->body_ostream != body_ostream) {
339 g_object_unref (msg);
343 g_output_stream_close_finish (body_ostream, result, &io->async_close_error);
344 g_clear_object (&io->body_ostream);
346 async_close_wait = io->async_close_wait;
347 io->async_close_wait = NULL;
348 g_cancellable_cancel (async_close_wait);
349 g_object_unref (async_close_wait);
351 g_object_unref (msg);
355 * There are two request/response formats: the basic request/response,
356 * possibly with one or more unsolicited informational responses (such
357 * as the WebDAV "102 Processing" response):
360 * W:HEADERS / R:NOT_STARTED -> R:HEADERS / W:NOT_STARTED
361 * W:BODY / R:NOT_STARTED -> R:BODY / W:NOT_STARTED
362 * [W:DONE / R:HEADERS (1xx) <- R:DONE / W:HEADERS (1xx) ...]
363 * W:DONE / R:HEADERS <- R:DONE / W:HEADERS
364 * W:DONE / R:BODY <- R:DONE / W:BODY
365 * W:DONE / R:DONE R:DONE / W:DONE
367 * and the "Expect: 100-continue" request/response, with the client
368 * blocking halfway through its request, and then either continuing or
369 * aborting, depending on the server response:
372 * W:HEADERS / R:NOT_STARTED -> R:HEADERS / W:NOT_STARTED
373 * W:BLOCKING / R:HEADERS <- R:BLOCKING / W:HEADERS
374 * [W:BODY / R:BLOCKING -> R:BODY / W:BLOCKING]
375 * [W:DONE / R:HEADERS <- R:DONE / W:HEADERS]
376 * W:DONE / R:BODY <- R:DONE / W:BODY
377 * W:DONE / R:DONE R:DONE / W:DONE
380 /* Attempts to push forward the writing side of @msg's I/O. Returns
381 * %TRUE if it manages to make some progress, and it is likely that
382 * further progress can be made. Returns %FALSE if it has reached a
383 * stopping point of some sort (need input from the application,
384 * socket not writable, write is complete, etc).
387 io_write (SoupMessage *msg, gboolean blocking,
388 GCancellable *cancellable, GError **error)
390 SoupMessageIOData *io = soup_message_get_io_data (msg);
394 if (io->async_close_error) {
395 g_propagate_error (error, io->async_close_error);
396 io->async_close_error = NULL;
398 } else if (io->async_close_wait) {
399 g_set_error_literal (error, G_IO_ERROR,
400 G_IO_ERROR_WOULD_BLOCK,
401 _("Operation would block"));
405 switch (io->write_state) {
406 case SOUP_MESSAGE_IO_STATE_HEADERS:
407 if (io->mode == SOUP_MESSAGE_IO_SERVER &&
408 io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING &&
409 msg->status_code == 0) {
410 /* Client requested "Expect: 100-continue", and
411 * server did not set an error.
413 soup_message_set_status (msg, SOUP_STATUS_CONTINUE);
416 if (!io->write_buf->len) {
417 io->get_headers_cb (msg, io->write_buf,
422 while (io->written < io->write_buf->len) {
423 nwrote = g_pollable_stream_write (io->ostream,
424 io->write_buf->str + io->written,
425 io->write_buf->len - io->written,
430 io->written += nwrote;
434 g_string_truncate (io->write_buf, 0);
436 if (io->mode == SOUP_MESSAGE_IO_SERVER &&
437 SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
438 if (msg->status_code == SOUP_STATUS_CONTINUE) {
439 /* Stop and wait for the body now */
441 SOUP_MESSAGE_IO_STATE_BLOCKING;
442 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
444 /* We just wrote a 1xx response
445 * header, so stay in STATE_HEADERS.
446 * (The caller will pause us from the
447 * wrote_informational callback if he
448 * is not ready to send the final
453 soup_message_wrote_informational (msg);
455 /* If this was "101 Switching Protocols", then
456 * the server probably stole the connection...
458 if (io != soup_message_get_io_data (msg))
461 soup_message_cleanup_response (msg);
465 if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
466 SoupMessageHeaders *hdrs =
467 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
468 msg->request_headers : msg->response_headers;
469 io->write_length = soup_message_headers_get_content_length (hdrs);
472 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
473 soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
474 /* Need to wait for the Continue response */
475 io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
476 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
478 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_START;
480 /* If the client was waiting for a Continue
481 * but we sent something else, then they're
484 if (io->mode == SOUP_MESSAGE_IO_SERVER &&
485 io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
486 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
489 soup_message_wrote_headers (msg);
493 case SOUP_MESSAGE_IO_STATE_BODY_START:
494 io->body_ostream = soup_body_output_stream_new (io->ostream,
497 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
501 case SOUP_MESSAGE_IO_STATE_BODY:
502 if (!io->write_length &&
503 io->write_encoding != SOUP_ENCODING_EOF &&
504 io->write_encoding != SOUP_ENCODING_CHUNKED) {
505 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
509 if (!io->write_chunk) {
510 io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
511 if (!io->write_chunk) {
512 g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
513 soup_message_io_pause (msg);
516 if (!io->write_chunk->length) {
517 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
522 nwrote = g_pollable_stream_write (io->body_ostream,
523 io->write_chunk->data + io->written,
524 io->write_chunk->length - io->written,
530 chunk = soup_buffer_new_subbuffer (io->write_chunk,
531 io->written, nwrote);
532 io->written += nwrote;
533 if (io->write_length)
534 io->write_length -= nwrote;
536 if (io->written == io->write_chunk->length)
537 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DATA;
539 soup_message_wrote_body_data (msg, chunk);
540 soup_buffer_free (chunk);
544 case SOUP_MESSAGE_IO_STATE_BODY_DATA:
546 if (io->write_chunk->length == 0) {
547 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
551 if (io->mode == SOUP_MESSAGE_IO_SERVER ||
552 soup_message_get_flags (msg) & SOUP_MESSAGE_CAN_REBUILD)
553 soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
554 io->write_body_offset += io->write_chunk->length;
555 soup_buffer_free (io->write_chunk);
556 io->write_chunk = NULL;
558 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
559 soup_message_wrote_chunk (msg);
563 case SOUP_MESSAGE_IO_STATE_BODY_FLUSH:
564 if (io->body_ostream) {
565 if (blocking || io->write_encoding != SOUP_ENCODING_CHUNKED) {
566 if (!g_output_stream_close (io->body_ostream, cancellable, error))
568 g_clear_object (&io->body_ostream);
570 io->async_close_wait = g_cancellable_new ();
571 if (io->async_context)
572 g_main_context_push_thread_default (io->async_context);
573 g_output_stream_close_async (io->body_ostream,
574 G_PRIORITY_DEFAULT, cancellable,
575 closed_async, g_object_ref (msg));
576 if (io->async_context)
577 g_main_context_pop_thread_default (io->async_context);
581 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
585 case SOUP_MESSAGE_IO_STATE_BODY_DONE:
586 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
587 soup_message_wrote_body (msg);
591 case SOUP_MESSAGE_IO_STATE_FINISHING:
592 io->write_state = SOUP_MESSAGE_IO_STATE_DONE;
594 if (io->mode == SOUP_MESSAGE_IO_CLIENT)
595 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
600 g_return_val_if_reached (FALSE);
606 /* Attempts to push forward the reading side of @msg's I/O. Returns
607 * %TRUE if it manages to make some progress, and it is likely that
608 * further progress can be made. Returns %FALSE if it has reached a
609 * stopping point of some sort (need input from the application,
610 * socket not readable, read is complete, etc).
613 io_read (SoupMessage *msg, gboolean blocking,
614 GCancellable *cancellable, GError **error)
616 SoupMessageIOData *io = soup_message_get_io_data (msg);
617 guchar *stack_buf = NULL;
622 switch (io->read_state) {
623 case SOUP_MESSAGE_IO_STATE_HEADERS:
624 if (!read_headers (msg, blocking, cancellable, error))
627 status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data,
628 io->read_header_buf->len,
630 io->header_data, error);
631 g_byte_array_set_size (io->read_header_buf, 0);
633 if (status != SOUP_STATUS_OK) {
634 /* Either we couldn't parse the headers, or they
635 * indicated something that would mean we wouldn't
636 * be able to parse the body. (Eg, unknown
637 * Transfer-Encoding.). Skip the rest of the
638 * reading, and make sure the connection gets
639 * closed when we're done.
641 soup_message_set_status (msg, status);
642 soup_message_headers_append (msg->request_headers,
643 "Connection", "close");
644 io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
648 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
649 SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
650 if (msg->status_code == SOUP_STATUS_CONTINUE &&
651 io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING) {
652 /* Pause the reader, unpause the writer */
654 SOUP_MESSAGE_IO_STATE_BLOCKING;
656 SOUP_MESSAGE_IO_STATE_BODY_START;
658 /* Just stay in HEADERS */
659 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
662 /* Informational responses have no bodies, so
663 * bail out here rather than parsing encoding, etc
665 soup_message_got_informational (msg);
667 /* If this was "101 Switching Protocols", then
668 * the session may have stolen the connection...
670 if (io != soup_message_get_io_data (msg))
673 soup_message_cleanup_response (msg);
675 } else if (io->mode == SOUP_MESSAGE_IO_SERVER &&
676 soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
677 /* We must return a status code and response
678 * headers to the client; either an error to
679 * be set by a got-headers handler below, or
680 * else %SOUP_STATUS_CONTINUE otherwise.
682 io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
683 io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
685 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
687 /* If the client was waiting for a Continue
688 * but got something else, then it's done
691 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
692 io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
693 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
696 if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
697 SoupMessageHeaders *hdrs =
698 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
699 msg->response_headers : msg->request_headers;
700 io->read_length = soup_message_headers_get_content_length (hdrs);
702 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
703 !soup_message_is_keepalive (msg)) {
704 /* Some servers suck and send
705 * incorrect Content-Length values, so
706 * allow EOF termination in this case
707 * (iff the message is too short) too.
709 io->read_encoding = SOUP_ENCODING_EOF;
712 io->read_length = -1;
714 soup_message_got_headers (msg);
718 case SOUP_MESSAGE_IO_STATE_BODY_START:
719 if (!io->body_istream) {
720 GInputStream *body_istream = soup_body_input_stream_new (G_INPUT_STREAM (io->istream),
724 /* TODO: server-side messages do not have a io->item. This means
725 * that we cannot use content processors for them right now.
727 if (io->mode == SOUP_MESSAGE_IO_CLIENT) {
728 io->body_istream = soup_message_setup_body_istream (body_istream, msg,
730 SOUP_STAGE_MESSAGE_BODY);
731 g_object_unref (body_istream);
733 io->body_istream = body_istream;
737 if (soup_message_get_content_sniffer (msg)) {
738 SoupContentSnifferStream *sniffer_stream = SOUP_CONTENT_SNIFFER_STREAM (io->body_istream);
739 const char *content_type;
742 if (!soup_content_sniffer_stream_is_ready (sniffer_stream, blocking,
746 content_type = soup_content_sniffer_stream_sniff (sniffer_stream, &params);
747 soup_message_content_sniffed (msg, content_type, params);
750 io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
754 case SOUP_MESSAGE_IO_STATE_BODY:
755 if (soup_message_has_chunk_allocator (msg)) {
756 buffer = soup_message_allocate_chunk (msg, io->read_length);
758 g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
759 soup_message_io_pause (msg);
764 stack_buf = alloca (RESPONSE_BLOCK_SIZE);
765 buffer = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
767 RESPONSE_BLOCK_SIZE);
770 nread = g_pollable_stream_read (io->body_istream,
771 (guchar *)buffer->data,
776 buffer->length = nread;
777 soup_message_body_got_chunk (io->read_body, buffer);
778 soup_message_got_chunk (msg, buffer);
779 soup_buffer_free (buffer);
783 soup_buffer_free (buffer);
787 /* else nread == 0 */
788 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
792 case SOUP_MESSAGE_IO_STATE_BODY_DONE:
793 io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
794 soup_message_got_body (msg);
798 case SOUP_MESSAGE_IO_STATE_FINISHING:
799 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
801 if (io->mode == SOUP_MESSAGE_IO_SERVER)
802 io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
807 g_return_val_if_reached (FALSE);
820 message_source_check (GSource *source)
822 SoupMessageSource *message_source = (SoupMessageSource *)source;
824 if (message_source->paused) {
825 SoupMessageIOData *io = soup_message_get_io_data (message_source->msg);
827 if (io && io->paused)
836 message_source_prepare (GSource *source,
840 return message_source_check (source);
844 message_source_dispatch (GSource *source,
845 GSourceFunc callback,
848 SoupMessageSourceFunc func = (SoupMessageSourceFunc)callback;
849 SoupMessageSource *message_source = (SoupMessageSource *)source;
851 return (*func) (message_source->msg, user_data);
855 message_source_finalize (GSource *source)
857 SoupMessageSource *message_source = (SoupMessageSource *)source;
859 g_object_unref (message_source->msg);
863 message_source_closure_callback (SoupMessage *msg,
866 GClosure *closure = data;
867 GValue param = G_VALUE_INIT;
868 GValue result_value = G_VALUE_INIT;
871 g_value_init (&result_value, G_TYPE_BOOLEAN);
873 g_value_init (&param, SOUP_TYPE_MESSAGE);
874 g_value_set_object (&param, msg);
876 g_closure_invoke (closure, &result_value, 1, &param, NULL);
878 result = g_value_get_boolean (&result_value);
879 g_value_unset (&result_value);
880 g_value_unset (&param);
885 static GSourceFuncs message_source_funcs =
887 message_source_prepare,
888 message_source_check,
889 message_source_dispatch,
890 message_source_finalize,
891 (GSourceFunc)message_source_closure_callback,
892 (GSourceDummyMarshal)g_cclosure_marshal_generic,
896 soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
897 SoupMessageSourceFunc callback, gpointer user_data)
899 SoupMessageIOData *io = soup_message_get_io_data (msg);
900 GSource *base_source, *source;
901 SoupMessageSource *message_source;
904 base_source = g_timeout_source_new (0);
905 } else if (io->paused) {
907 } else if (io->async_close_wait) {
908 base_source = g_cancellable_source_new (io->async_close_wait);
909 } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
910 GPollableInputStream *istream;
912 if (io->body_istream)
913 istream = G_POLLABLE_INPUT_STREAM (io->body_istream);
915 istream = G_POLLABLE_INPUT_STREAM (io->istream);
916 base_source = g_pollable_input_stream_create_source (istream, cancellable);
917 } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->write_state)) {
918 GPollableOutputStream *ostream;
920 if (io->body_ostream)
921 ostream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
923 ostream = G_POLLABLE_OUTPUT_STREAM (io->ostream);
924 base_source = g_pollable_output_stream_create_source (ostream, cancellable);
926 base_source = g_timeout_source_new (0);
928 source = g_source_new (&message_source_funcs,
929 sizeof (SoupMessageSource));
930 g_source_set_name (source, "SoupMessageSource");
931 message_source = (SoupMessageSource *)source;
932 message_source->msg = g_object_ref (msg);
933 message_source->paused = io && io->paused;
936 g_source_set_dummy_callback (base_source);
937 g_source_add_child_source (source, base_source);
938 g_source_unref (base_source);
940 g_source_set_callback (source, (GSourceFunc) callback, user_data, NULL);
945 request_is_restartable (SoupMessage *msg, GError *error)
947 SoupMessageIOData *io = soup_message_get_io_data (msg);
952 return (io->mode == SOUP_MESSAGE_IO_CLIENT &&
953 io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS &&
954 io->read_header_buf->len == 0 &&
955 soup_connection_get_ever_used (io->item->conn) &&
956 !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) &&
957 !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) &&
958 error->domain != G_TLS_ERROR &&
959 SOUP_METHOD_IS_IDEMPOTENT (msg->method));
963 io_run_until (SoupMessage *msg, gboolean blocking,
964 SoupMessageIOState read_state, SoupMessageIOState write_state,
965 GCancellable *cancellable, GError **error)
967 SoupMessageIOData *io = soup_message_get_io_data (msg);
968 gboolean progress = TRUE, done;
969 GError *my_error = NULL;
971 if (g_cancellable_set_error_if_cancelled (cancellable, error))
974 g_set_error_literal (error, G_IO_ERROR,
975 G_IO_ERROR_CANCELLED,
976 _("Operation was cancelled"));
982 while (progress && soup_message_get_io_data (msg) == io && !io->paused && !io->async_close_wait &&
983 (io->read_state < read_state || io->write_state < write_state)) {
985 if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
986 progress = io_read (msg, blocking, cancellable, &my_error);
987 else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
988 progress = io_write (msg, blocking, cancellable, &my_error);
994 if (request_is_restartable (msg, my_error)) {
995 /* Connection got closed, but we can safely try again */
996 g_error_free (my_error);
997 g_set_error_literal (error, SOUP_HTTP_ERROR,
998 SOUP_STATUS_TRY_AGAIN, "");
999 g_object_unref (msg);
1003 g_propagate_error (error, my_error);
1004 g_object_unref (msg);
1006 } else if (soup_message_get_io_data (msg) != io) {
1007 g_set_error_literal (error, G_IO_ERROR,
1008 G_IO_ERROR_CANCELLED,
1009 _("Operation was cancelled"));
1010 g_object_unref (msg);
1012 } else if (!io->async_close_wait &&
1013 g_cancellable_set_error_if_cancelled (cancellable, error)) {
1014 g_object_unref (msg);
1018 done = (io->read_state >= read_state &&
1019 io->write_state >= write_state);
1021 if (!blocking && !done) {
1022 g_set_error_literal (error, G_IO_ERROR,
1023 G_IO_ERROR_WOULD_BLOCK,
1024 _("Operation would block"));
1025 g_object_unref (msg);
1030 /* Allow profiling of network requests. */
1031 if (io->read_state == SOUP_MESSAGE_IO_STATE_DONE &&
1032 io->write_state == SOUP_MESSAGE_IO_STATE_DONE) {
1033 SoupURI *uri = soup_message_get_uri (msg);
1034 char *uri_str = soup_uri_to_string (uri, FALSE);
1035 const gchar *last_modified = soup_message_headers_get_one (msg->request_headers, "Last-Modified");
1036 const gchar *etag = soup_message_headers_get_one (msg->request_headers, "ETag");
1038 /* FIXME: Expand and generalise sysprof support:
1039 * https://gitlab.gnome.org/GNOME/sysprof/-/issues/43 */
1040 sysprof_collector_mark_printf (io->begin_time_nsec, SYSPROF_CAPTURE_CURRENT_TIME - io->begin_time_nsec,
1041 "libsoup", "message",
1042 "%s request/response to %s: "
1043 "read %" G_GOFFSET_FORMAT "B, "
1044 "wrote %" G_GOFFSET_FORMAT "B, "
1045 "Last-Modified: %s, "
1047 soup_message_get_https_status (msg, NULL, NULL) ? "HTTPS" : "HTTP",
1048 uri_str, io->read_length, io->write_length,
1049 (last_modified != NULL) ? last_modified : "(unset)",
1050 (etag != NULL) ? etag : "(unset)");
1053 #endif /* HAVE_SYSPROF */
1055 g_object_unref (msg);
1060 io_run_ready (SoupMessage *msg, gpointer user_data)
1062 io_run (msg, FALSE);
1067 io_run (SoupMessage *msg, gboolean blocking)
1069 SoupMessageIOData *io = soup_message_get_io_data (msg);
1070 GError *error = NULL;
1071 GCancellable *cancellable;
1073 if (io->io_source) {
1074 g_source_destroy (io->io_source);
1075 g_source_unref (io->io_source);
1076 io->io_source = NULL;
1080 cancellable = io->cancellable ? g_object_ref (io->cancellable) : NULL;
1082 if (io_run_until (msg, blocking,
1083 SOUP_MESSAGE_IO_STATE_DONE,
1084 SOUP_MESSAGE_IO_STATE_DONE,
1085 cancellable, &error)) {
1086 soup_message_io_finished (msg);
1087 } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
1088 g_clear_error (&error);
1089 io->io_source = soup_message_io_get_source (msg, NULL, io_run_ready, msg);
1090 g_source_attach (io->io_source, io->async_context);
1091 } else if (error && soup_message_get_io_data (msg) == io) {
1092 if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN))
1093 io->item->state = SOUP_MESSAGE_RESTARTING;
1094 else if (error->domain == G_TLS_ERROR) {
1095 soup_message_set_status_full (msg,
1096 SOUP_STATUS_SSL_FAILED,
1098 } else if (!SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
1099 soup_message_set_status (msg, SOUP_STATUS_IO_ERROR);
1101 g_error_free (error);
1102 soup_message_io_finished (msg);
1104 g_error_free (error);
1106 g_object_unref (msg);
1107 g_clear_object (&cancellable);
1111 soup_message_io_run_until_write (SoupMessage *msg, gboolean blocking,
1112 GCancellable *cancellable, GError **error)
1114 return io_run_until (msg, blocking,
1115 SOUP_MESSAGE_IO_STATE_ANY,
1116 SOUP_MESSAGE_IO_STATE_BODY,
1117 cancellable, error);
1121 soup_message_io_run_until_read (SoupMessage *msg, gboolean blocking,
1122 GCancellable *cancellable, GError **error)
1124 return io_run_until (msg, blocking,
1125 SOUP_MESSAGE_IO_STATE_BODY,
1126 SOUP_MESSAGE_IO_STATE_ANY,
1127 cancellable, error);
1131 soup_message_io_run_until_finish (SoupMessage *msg,
1133 GCancellable *cancellable,
1136 SoupMessageIOData *io = soup_message_get_io_data (msg);
1142 g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, FALSE);
1144 if (io->read_state < SOUP_MESSAGE_IO_STATE_BODY_DONE)
1145 io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
1148 success = io_run_until (msg, blocking,
1149 SOUP_MESSAGE_IO_STATE_DONE,
1150 SOUP_MESSAGE_IO_STATE_DONE,
1151 cancellable, error);
1153 g_object_unref (msg);
1158 client_stream_eof (SoupClientInputStream *stream, gpointer user_data)
1160 SoupMessage *msg = user_data;
1161 SoupMessageIOData *io = soup_message_get_io_data (msg);
1163 if (io && io->read_state == SOUP_MESSAGE_IO_STATE_BODY)
1164 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
1168 soup_message_io_get_response_istream (SoupMessage *msg,
1171 SoupMessageIOData *io = soup_message_get_io_data (msg);
1172 GInputStream *client_stream;
1174 g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, NULL);
1176 if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
1177 g_set_error_literal (error, SOUP_HTTP_ERROR,
1178 msg->status_code, msg->reason_phrase);
1182 client_stream = soup_client_input_stream_new (io->body_istream, msg);
1183 g_signal_connect (client_stream, "eof",
1184 G_CALLBACK (client_stream_eof), msg);
1186 return client_stream;
1190 static SoupMessageIOData *
1191 new_iostate (SoupMessage *msg, GIOStream *iostream,
1192 GMainContext *async_context, SoupMessageIOMode mode,
1193 SoupMessageGetHeadersFn get_headers_cb,
1194 SoupMessageParseHeadersFn parse_headers_cb,
1195 gpointer header_data,
1196 SoupMessageCompletionFn completion_cb,
1197 gpointer completion_data)
1199 SoupMessageIOData *io;
1201 io = g_slice_new0 (SoupMessageIOData);
1203 io->get_headers_cb = get_headers_cb;
1204 io->parse_headers_cb = parse_headers_cb;
1205 io->header_data = header_data;
1206 io->completion_cb = completion_cb;
1207 io->completion_data = completion_data;
1209 io->iostream = g_object_ref (iostream);
1210 io->istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (iostream));
1211 io->ostream = g_io_stream_get_output_stream (iostream);
1214 io->async_context = g_main_context_ref (async_context);
1216 io->read_header_buf = g_byte_array_new ();
1217 io->write_buf = g_string_new (NULL);
1219 io->read_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
1220 io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
1222 if (soup_message_get_io_data (msg))
1223 soup_message_io_cleanup (msg);
1224 soup_message_set_io_data (msg, io);
1227 io->begin_time_nsec = SYSPROF_CAPTURE_CURRENT_TIME;
1234 soup_message_io_client (SoupMessageQueueItem *item,
1235 GIOStream *iostream,
1236 GMainContext *async_context,
1237 SoupMessageGetHeadersFn get_headers_cb,
1238 SoupMessageParseHeadersFn parse_headers_cb,
1239 gpointer header_data,
1240 SoupMessageCompletionFn completion_cb,
1241 gpointer completion_data)
1243 SoupMessageIOData *io;
1245 io = new_iostate (item->msg, iostream, async_context,
1246 SOUP_MESSAGE_IO_CLIENT,
1247 get_headers_cb, parse_headers_cb, header_data,
1248 completion_cb, completion_data);
1251 soup_message_queue_item_ref (item);
1252 io->cancellable = item->cancellable;
1254 io->read_body = item->msg->response_body;
1255 io->write_body = item->msg->request_body;
1257 io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
1259 if (!item->new_api) {
1261 SOUP_IS_SESSION_SYNC (item->session) ||
1262 (!SOUP_IS_SESSION_ASYNC (item->session) && !item->async);
1263 io_run (item->msg, blocking);
1268 soup_message_io_server (SoupMessage *msg,
1269 GIOStream *iostream, GMainContext *async_context,
1270 SoupMessageGetHeadersFn get_headers_cb,
1271 SoupMessageParseHeadersFn parse_headers_cb,
1272 gpointer header_data,
1273 SoupMessageCompletionFn completion_cb,
1274 gpointer completion_data)
1276 SoupMessageIOData *io;
1278 io = new_iostate (msg, iostream, async_context,
1279 SOUP_MESSAGE_IO_SERVER,
1280 get_headers_cb, parse_headers_cb, header_data,
1281 completion_cb, completion_data);
1283 io->read_body = msg->request_body;
1284 io->write_body = msg->response_body;
1286 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
1287 io_run (msg, FALSE);
1291 soup_message_io_pause (SoupMessage *msg)
1293 SoupMessageIOData *io = soup_message_get_io_data (msg);
1295 g_return_if_fail (io != NULL);
1297 if (io->item && io->item->new_api)
1298 g_return_if_fail (io->read_state < SOUP_MESSAGE_IO_STATE_BODY);
1300 if (io->io_source) {
1301 g_source_destroy (io->io_source);
1302 g_source_unref (io->io_source);
1303 io->io_source = NULL;
1306 if (io->unpause_source) {
1307 g_source_destroy (io->unpause_source);
1308 g_source_unref (io->unpause_source);
1309 io->unpause_source = NULL;
1316 io_unpause_internal (gpointer msg)
1318 SoupMessageIOData *io = soup_message_get_io_data (msg);
1320 g_return_val_if_fail (io != NULL, FALSE);
1322 g_clear_pointer (&io->unpause_source, g_source_unref);
1328 io_run (msg, FALSE);
1333 soup_message_io_unpause (SoupMessage *msg)
1335 SoupMessageIOData *io = soup_message_get_io_data (msg);
1337 g_return_if_fail (io != NULL);
1339 if (io->item && io->item->new_api) {
1340 g_return_if_fail (io->read_state < SOUP_MESSAGE_IO_STATE_BODY);
1345 if (!io->unpause_source) {
1346 io->unpause_source = soup_add_completion_reffed (io->async_context,
1347 io_unpause_internal, msg, NULL);
1352 * soup_message_io_in_progress:
1353 * @msg: a #SoupMessage
1355 * Tests whether or not I/O is currently in progress on @msg.
1357 * Return value: whether or not I/O is currently in progress.
1360 soup_message_io_in_progress (SoupMessage *msg)
1362 return soup_message_get_io_data (msg) != NULL;