Remove extern "C" wrapping other includes
[platform/upstream/libsoup.git] / libsoup / soup-message-io.c
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * soup-message-io.c: HTTP message I/O
4  *
5  * Copyright (C) 2000-2003, Ximian, Inc.
6  */
7
8 #ifdef HAVE_CONFIG_H
9 #include "config.h"
10 #endif
11
12 #include <glib/gi18n-lib.h>
13
14 #ifdef HAVE_SYSPROF
15 #include <sysprof-capture.h>
16 #endif
17
18 #include "soup.h"
19 #include "soup-body-input-stream.h"
20 #include "soup-body-output-stream.h"
21 #include "soup-client-input-stream.h"
22 #include "soup-connection.h"
23 #include "soup-content-processor.h"
24 #include "soup-content-sniffer-stream.h"
25 #include "soup-filter-input-stream.h"
26 #include "soup-message-private.h"
27 #include "soup-message-queue.h"
28 #include "soup-misc-private.h"
29
30 typedef enum {
31         SOUP_MESSAGE_IO_CLIENT,
32         SOUP_MESSAGE_IO_SERVER
33 } SoupMessageIOMode;
34
35 typedef enum {
36         SOUP_MESSAGE_IO_STATE_NOT_STARTED,
37         SOUP_MESSAGE_IO_STATE_ANY = SOUP_MESSAGE_IO_STATE_NOT_STARTED,
38         SOUP_MESSAGE_IO_STATE_HEADERS,
39         SOUP_MESSAGE_IO_STATE_BLOCKING,
40         SOUP_MESSAGE_IO_STATE_BODY_START,
41         SOUP_MESSAGE_IO_STATE_BODY,
42         SOUP_MESSAGE_IO_STATE_BODY_DATA,
43         SOUP_MESSAGE_IO_STATE_BODY_FLUSH,
44         SOUP_MESSAGE_IO_STATE_BODY_DONE,
45         SOUP_MESSAGE_IO_STATE_FINISHING,
46         SOUP_MESSAGE_IO_STATE_DONE
47 } SoupMessageIOState;
48
49 #define SOUP_MESSAGE_IO_STATE_ACTIVE(state) \
50         (state != SOUP_MESSAGE_IO_STATE_NOT_STARTED && \
51          state != SOUP_MESSAGE_IO_STATE_BLOCKING && \
52          state != SOUP_MESSAGE_IO_STATE_DONE)
53 #define SOUP_MESSAGE_IO_STATE_POLLABLE(state) \
54         (SOUP_MESSAGE_IO_STATE_ACTIVE (state) && \
55          state != SOUP_MESSAGE_IO_STATE_BODY_DONE)
56
57 typedef struct {
58         SoupMessageQueueItem *item;
59         SoupMessageIOMode     mode;
60         GCancellable         *cancellable;
61
62         GIOStream              *iostream;
63         SoupFilterInputStream  *istream;
64         GInputStream           *body_istream;
65         GOutputStream          *ostream;
66         GOutputStream          *body_ostream;
67         GMainContext           *async_context;
68
69         SoupMessageIOState    read_state;
70         SoupEncoding          read_encoding;
71         GByteArray           *read_header_buf;
72         SoupMessageBody      *read_body;
73         goffset               read_length;
74
75         SoupMessageIOState    write_state;
76         SoupEncoding          write_encoding;
77         GString              *write_buf;
78         SoupMessageBody      *write_body;
79         SoupBuffer           *write_chunk;
80         goffset               write_body_offset;
81         goffset               write_length;
82         goffset               written;
83
84         GSource *io_source;
85         GSource *unpause_source;
86         gboolean paused;
87
88         GCancellable *async_close_wait;
89         GError       *async_close_error;
90
91         SoupMessageGetHeadersFn   get_headers_cb;
92         SoupMessageParseHeadersFn parse_headers_cb;
93         gpointer                  header_data;
94         SoupMessageCompletionFn   completion_cb;
95         gpointer                  completion_data;
96
97 #ifdef HAVE_SYSPROF
98         gint64 begin_time_nsec;
99 #endif
100 } SoupMessageIOData;
101         
102 static void io_run (SoupMessage *msg, gboolean blocking);
103
104 #define RESPONSE_BLOCK_SIZE 8192
105 #define HEADER_SIZE_LIMIT (64 * 1024)
106
107 void
108 soup_message_io_cleanup (SoupMessage *msg)
109 {
110         SoupMessageIOData *io;
111
112         soup_message_io_stop (msg);
113
114         io = soup_message_get_io_data (msg);
115         if (!io)
116                 return;
117         soup_message_set_io_data (msg, NULL);
118
119         if (io->iostream)
120                 g_object_unref (io->iostream);
121         if (io->body_istream)
122                 g_object_unref (io->body_istream);
123         if (io->body_ostream)
124                 g_object_unref (io->body_ostream);
125         if (io->async_context)
126                 g_main_context_unref (io->async_context);
127         if (io->item)
128                 soup_message_queue_item_unref (io->item);
129
130         g_byte_array_free (io->read_header_buf, TRUE);
131
132         g_string_free (io->write_buf, TRUE);
133         if (io->write_chunk)
134                 soup_buffer_free (io->write_chunk);
135
136         if (io->async_close_wait) {
137                 g_cancellable_cancel (io->async_close_wait);
138                 g_clear_object (&io->async_close_wait);
139         }
140         g_clear_error (&io->async_close_error);
141
142         g_slice_free (SoupMessageIOData, io);
143 }
144
145 void
146 soup_message_io_stop (SoupMessage *msg)
147 {
148         SoupMessageIOData *io = soup_message_get_io_data (msg);
149
150         if (!io)
151                 return;
152
153         if (io->io_source) {
154                 g_source_destroy (io->io_source);
155                 g_source_unref (io->io_source);
156                 io->io_source = NULL;
157         }
158
159         if (io->unpause_source) {
160                 g_source_destroy (io->unpause_source);
161                 g_source_unref (io->unpause_source);
162                 io->unpause_source = NULL;
163         }
164 }
165
166 void
167 soup_message_io_finished (SoupMessage *msg)
168 {
169         SoupMessageIOData *io;
170         SoupMessageCompletionFn completion_cb;
171         gpointer completion_data;
172         SoupMessageIOCompletion completion;
173
174         io = soup_message_get_io_data (msg);
175         if (!io)
176                 return;
177
178         completion_cb = io->completion_cb;
179         completion_data = io->completion_data;
180
181         if ((io->read_state >= SOUP_MESSAGE_IO_STATE_FINISHING &&
182              io->write_state >= SOUP_MESSAGE_IO_STATE_FINISHING))
183                 completion = SOUP_MESSAGE_IO_COMPLETE;
184         else
185                 completion = SOUP_MESSAGE_IO_INTERRUPTED;
186
187         g_object_ref (msg);
188         soup_message_io_cleanup (msg);
189         if (completion_cb)
190                 completion_cb (msg, completion, completion_data);
191         g_object_unref (msg);
192 }
193
194 GIOStream *
195 soup_message_io_steal (SoupMessage *msg)
196 {
197         SoupMessageIOData *io;
198         SoupMessageCompletionFn completion_cb;
199         gpointer completion_data;
200         GIOStream *iostream;
201
202         io = soup_message_get_io_data (msg);
203         if (!io || !io->iostream)
204                 return NULL;
205
206         iostream = g_object_ref (io->iostream);
207         completion_cb = io->completion_cb;
208         completion_data = io->completion_data;
209
210         g_object_ref (msg);
211         soup_message_io_cleanup (msg);
212         if (completion_cb)
213                 completion_cb (msg, SOUP_MESSAGE_IO_STOLEN, completion_data);
214         g_object_unref (msg);
215
216         return iostream;
217 }
218
219 static gboolean
220 read_headers (SoupMessage *msg, gboolean blocking,
221               GCancellable *cancellable, GError **error)
222 {
223         SoupMessageIOData *io;
224         gssize nread, old_len;
225         gboolean got_lf;
226
227         io = soup_message_get_io_data (msg);
228         while (1) {
229                 old_len = io->read_header_buf->len;
230                 g_byte_array_set_size (io->read_header_buf, old_len + RESPONSE_BLOCK_SIZE);
231                 nread = soup_filter_input_stream_read_line (io->istream,
232                                                             io->read_header_buf->data + old_len,
233                                                             RESPONSE_BLOCK_SIZE,
234                                                             blocking,
235                                                             &got_lf,
236                                                             cancellable, error);
237                 io->read_header_buf->len = old_len + MAX (nread, 0);
238                 if (nread == 0) {
239                         if (io->read_header_buf->len > 0)
240                                 break;
241                         soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
242                         g_set_error_literal (error, G_IO_ERROR,
243                                              G_IO_ERROR_PARTIAL_INPUT,
244                                              _("Connection terminated unexpectedly"));
245                 }
246                 if (nread <= 0)
247                         return FALSE;
248
249                 if (got_lf) {
250                         if (nread == 1 && old_len >= 2 &&
251                             !strncmp ((char *)io->read_header_buf->data +
252                                       io->read_header_buf->len - 2,
253                                       "\n\n", 2)) {
254                                 io->read_header_buf->len--;
255                                 break;
256                         } else if (nread == 2 && old_len >= 3 &&
257                                  !strncmp ((char *)io->read_header_buf->data +
258                                            io->read_header_buf->len - 3,
259                                            "\n\r\n", 3)) {
260                                 io->read_header_buf->len -= 2;
261                                 break;
262                         }
263                 }
264
265                 if (io->read_header_buf->len > HEADER_SIZE_LIMIT) {
266                         soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
267                         g_set_error_literal (error, G_IO_ERROR,
268                                              G_IO_ERROR_PARTIAL_INPUT,
269                                              _("Header too big"));
270                         return FALSE;
271                 }
272         }
273
274         io->read_header_buf->data[io->read_header_buf->len] = '\0';
275         return TRUE;
276 }
277
278 static gint
279 processing_stage_cmp (gconstpointer a,
280                     gconstpointer b)
281 {
282         SoupProcessingStage stage_a = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR (a));
283         SoupProcessingStage stage_b = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR (b));
284
285         if (stage_a > stage_b)
286                 return 1;
287         if (stage_a == stage_b)
288                 return 0;
289         return -1;
290 }
291
292 GInputStream *
293 soup_message_setup_body_istream (GInputStream *body_stream,
294                                  SoupMessage *msg,
295                                  SoupSession *session,
296                                  SoupProcessingStage start_at_stage)
297 {
298         GInputStream *istream;
299         GSList *p, *processors;
300
301         istream = g_object_ref (body_stream);
302
303         processors = soup_session_get_features (session, SOUP_TYPE_CONTENT_PROCESSOR);
304         processors = g_slist_sort (processors, processing_stage_cmp);
305
306         for (p = processors; p; p = p->next) {
307                 GInputStream *wrapper;
308                 SoupContentProcessor *processor;
309
310                 processor = SOUP_CONTENT_PROCESSOR (p->data);
311                 if (soup_message_disables_feature (msg, p->data) ||
312                     soup_content_processor_get_processing_stage (processor) < start_at_stage)
313                         continue;
314
315                 wrapper = soup_content_processor_wrap_input (processor, istream, msg, NULL);
316                 if (wrapper) {
317                         g_object_unref (istream);
318                         istream = wrapper;
319                 }
320         }
321
322         g_slist_free (processors);
323
324         return istream;
325 }
326
327 static void
328 closed_async (GObject      *source,
329               GAsyncResult *result,
330               gpointer      user_data)
331 {
332         GOutputStream *body_ostream = G_OUTPUT_STREAM (source);
333         SoupMessage *msg = user_data;
334         SoupMessageIOData *io;
335         GCancellable *async_close_wait;
336
337         io = soup_message_get_io_data (msg);
338         if (!io || !io->async_close_wait || io->body_ostream != body_ostream) {
339                 g_object_unref (msg);
340                 return;
341         }
342
343         g_output_stream_close_finish (body_ostream, result, &io->async_close_error);
344         g_clear_object (&io->body_ostream);
345
346         async_close_wait = io->async_close_wait;
347         io->async_close_wait = NULL;
348         g_cancellable_cancel (async_close_wait);
349         g_object_unref (async_close_wait);
350
351         g_object_unref (msg);
352 }
353
354 /*
355  * There are two request/response formats: the basic request/response,
356  * possibly with one or more unsolicited informational responses (such
357  * as the WebDAV "102 Processing" response):
358  *
359  *     Client                            Server
360  *      W:HEADERS  / R:NOT_STARTED    ->  R:HEADERS  / W:NOT_STARTED
361  *      W:BODY     / R:NOT_STARTED    ->  R:BODY     / W:NOT_STARTED
362  *     [W:DONE     / R:HEADERS (1xx)  <-  R:DONE     / W:HEADERS (1xx) ...]
363  *      W:DONE     / R:HEADERS        <-  R:DONE     / W:HEADERS
364  *      W:DONE     / R:BODY           <-  R:DONE     / W:BODY
365  *      W:DONE     / R:DONE               R:DONE     / W:DONE
366  *     
367  * and the "Expect: 100-continue" request/response, with the client
368  * blocking halfway through its request, and then either continuing or
369  * aborting, depending on the server response:
370  *
371  *     Client                            Server
372  *      W:HEADERS  / R:NOT_STARTED    ->  R:HEADERS  / W:NOT_STARTED
373  *      W:BLOCKING / R:HEADERS        <-  R:BLOCKING / W:HEADERS
374  *     [W:BODY     / R:BLOCKING       ->  R:BODY     / W:BLOCKING]
375  *     [W:DONE     / R:HEADERS        <-  R:DONE     / W:HEADERS]
376  *      W:DONE     / R:BODY           <-  R:DONE     / W:BODY
377  *      W:DONE     / R:DONE               R:DONE     / W:DONE
378  */
379
380 /* Attempts to push forward the writing side of @msg's I/O. Returns
381  * %TRUE if it manages to make some progress, and it is likely that
382  * further progress can be made. Returns %FALSE if it has reached a
383  * stopping point of some sort (need input from the application,
384  * socket not writable, write is complete, etc).
385  */
386 static gboolean
387 io_write (SoupMessage *msg, gboolean blocking,
388           GCancellable *cancellable, GError **error)
389 {
390         SoupMessageIOData *io = soup_message_get_io_data (msg);
391         SoupBuffer *chunk;
392         gssize nwrote;
393
394         if (io->async_close_error) {
395                 g_propagate_error (error, io->async_close_error);
396                 io->async_close_error = NULL;
397                 return FALSE;
398         } else if (io->async_close_wait) {
399                 g_set_error_literal (error, G_IO_ERROR,
400                                      G_IO_ERROR_WOULD_BLOCK,
401                                      _("Operation would block"));
402                 return FALSE;
403         }
404
405         switch (io->write_state) {
406         case SOUP_MESSAGE_IO_STATE_HEADERS:
407                 if (io->mode == SOUP_MESSAGE_IO_SERVER &&
408                     io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING &&
409                     msg->status_code == 0) {
410                         /* Client requested "Expect: 100-continue", and
411                          * server did not set an error.
412                          */
413                         soup_message_set_status (msg, SOUP_STATUS_CONTINUE);
414                 }
415
416                 if (!io->write_buf->len) {
417                         io->get_headers_cb (msg, io->write_buf,
418                                             &io->write_encoding,
419                                             io->header_data);
420                 }
421
422                 while (io->written < io->write_buf->len) {
423                         nwrote = g_pollable_stream_write (io->ostream,
424                                                           io->write_buf->str + io->written,
425                                                           io->write_buf->len - io->written,
426                                                           blocking,
427                                                           cancellable, error);
428                         if (nwrote == -1)
429                                 return FALSE;
430                         io->written += nwrote;
431                 }
432
433                 io->written = 0;
434                 g_string_truncate (io->write_buf, 0);
435
436                 if (io->mode == SOUP_MESSAGE_IO_SERVER &&
437                     SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
438                         if (msg->status_code == SOUP_STATUS_CONTINUE) {
439                                 /* Stop and wait for the body now */
440                                 io->write_state =
441                                         SOUP_MESSAGE_IO_STATE_BLOCKING;
442                                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
443                         } else {
444                                 /* We just wrote a 1xx response
445                                  * header, so stay in STATE_HEADERS.
446                                  * (The caller will pause us from the
447                                  * wrote_informational callback if he
448                                  * is not ready to send the final
449                                  * response.)
450                                  */
451                         }
452
453                         soup_message_wrote_informational (msg);
454
455                         /* If this was "101 Switching Protocols", then
456                          * the server probably stole the connection...
457                          */
458                         if (io != soup_message_get_io_data (msg))
459                                 return FALSE;
460
461                         soup_message_cleanup_response (msg);
462                         break;
463                 }
464
465                 if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
466                         SoupMessageHeaders *hdrs =
467                                 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
468                                 msg->request_headers : msg->response_headers;
469                         io->write_length = soup_message_headers_get_content_length (hdrs);
470                 }
471
472                 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
473                     soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
474                         /* Need to wait for the Continue response */
475                         io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
476                         io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
477                 } else {
478                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_START;
479
480                         /* If the client was waiting for a Continue
481                          * but we sent something else, then they're
482                          * now done writing.
483                          */
484                         if (io->mode == SOUP_MESSAGE_IO_SERVER &&
485                             io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
486                                 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
487                 }
488
489                 soup_message_wrote_headers (msg);
490                 break;
491
492
493         case SOUP_MESSAGE_IO_STATE_BODY_START:
494                 io->body_ostream = soup_body_output_stream_new (io->ostream,
495                                                                 io->write_encoding,
496                                                                 io->write_length);
497                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
498                 break;
499
500
501         case SOUP_MESSAGE_IO_STATE_BODY:
502                 if (!io->write_length &&
503                     io->write_encoding != SOUP_ENCODING_EOF &&
504                     io->write_encoding != SOUP_ENCODING_CHUNKED) {
505                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
506                         break;
507                 }
508
509                 if (!io->write_chunk) {
510                         io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
511                         if (!io->write_chunk) {
512                                 g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
513                                 soup_message_io_pause (msg);
514                                 return FALSE;
515                         }
516                         if (!io->write_chunk->length) {
517                                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
518                                 break;
519                         }
520                 }
521
522                 nwrote = g_pollable_stream_write (io->body_ostream,
523                                                   io->write_chunk->data + io->written,
524                                                   io->write_chunk->length - io->written,
525                                                   blocking,
526                                                   cancellable, error);
527                 if (nwrote == -1)
528                         return FALSE;
529
530                 chunk = soup_buffer_new_subbuffer (io->write_chunk,
531                                                    io->written, nwrote);
532                 io->written += nwrote;
533                 if (io->write_length)
534                         io->write_length -= nwrote;
535
536                 if (io->written == io->write_chunk->length)
537                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DATA;
538
539                 soup_message_wrote_body_data (msg, chunk);
540                 soup_buffer_free (chunk);
541                 break;
542
543
544         case SOUP_MESSAGE_IO_STATE_BODY_DATA:
545                 io->written = 0;
546                 if (io->write_chunk->length == 0) {
547                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
548                         break;
549                 }
550
551                 if (io->mode == SOUP_MESSAGE_IO_SERVER ||
552                     soup_message_get_flags (msg) & SOUP_MESSAGE_CAN_REBUILD)
553                         soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
554                 io->write_body_offset += io->write_chunk->length;
555                 soup_buffer_free (io->write_chunk);
556                 io->write_chunk = NULL;
557
558                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
559                 soup_message_wrote_chunk (msg);
560                 break;
561
562
563         case SOUP_MESSAGE_IO_STATE_BODY_FLUSH:
564                 if (io->body_ostream) {
565                         if (blocking || io->write_encoding != SOUP_ENCODING_CHUNKED) {
566                                 if (!g_output_stream_close (io->body_ostream, cancellable, error))
567                                         return FALSE;
568                                 g_clear_object (&io->body_ostream);
569                         } else {
570                                 io->async_close_wait = g_cancellable_new ();
571                                 if (io->async_context)
572                                         g_main_context_push_thread_default (io->async_context);
573                                 g_output_stream_close_async (io->body_ostream,
574                                                              G_PRIORITY_DEFAULT, cancellable,
575                                                              closed_async, g_object_ref (msg));
576                                 if (io->async_context)
577                                         g_main_context_pop_thread_default (io->async_context);
578                         }
579                 }
580
581                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
582                 break;
583
584
585         case SOUP_MESSAGE_IO_STATE_BODY_DONE:
586                 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
587                 soup_message_wrote_body (msg);
588                 break;
589
590
591         case SOUP_MESSAGE_IO_STATE_FINISHING:
592                 io->write_state = SOUP_MESSAGE_IO_STATE_DONE;
593
594                 if (io->mode == SOUP_MESSAGE_IO_CLIENT)
595                         io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
596                 break;
597
598
599         default:
600                 g_return_val_if_reached (FALSE);
601         }
602
603         return TRUE;
604 }
605
606 /* Attempts to push forward the reading side of @msg's I/O. Returns
607  * %TRUE if it manages to make some progress, and it is likely that
608  * further progress can be made. Returns %FALSE if it has reached a
609  * stopping point of some sort (need input from the application,
610  * socket not readable, read is complete, etc).
611  */
612 static gboolean
613 io_read (SoupMessage *msg, gboolean blocking,
614          GCancellable *cancellable, GError **error)
615 {
616         SoupMessageIOData *io = soup_message_get_io_data (msg);
617         guchar *stack_buf = NULL;
618         gssize nread;
619         SoupBuffer *buffer;
620         guint status;
621
622         switch (io->read_state) {
623         case SOUP_MESSAGE_IO_STATE_HEADERS:
624                 if (!read_headers (msg, blocking, cancellable, error))
625                         return FALSE;
626
627                 status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data,
628                                                io->read_header_buf->len,
629                                                &io->read_encoding,
630                                                io->header_data, error);
631                 g_byte_array_set_size (io->read_header_buf, 0);
632
633                 if (status != SOUP_STATUS_OK) {
634                         /* Either we couldn't parse the headers, or they
635                          * indicated something that would mean we wouldn't
636                          * be able to parse the body. (Eg, unknown
637                          * Transfer-Encoding.). Skip the rest of the
638                          * reading, and make sure the connection gets
639                          * closed when we're done.
640                          */
641                         soup_message_set_status (msg, status);
642                         soup_message_headers_append (msg->request_headers,
643                                                      "Connection", "close");
644                         io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
645                         break;
646                 }
647
648                 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
649                     SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
650                         if (msg->status_code == SOUP_STATUS_CONTINUE &&
651                             io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING) {
652                                 /* Pause the reader, unpause the writer */
653                                 io->read_state =
654                                         SOUP_MESSAGE_IO_STATE_BLOCKING;
655                                 io->write_state =
656                                         SOUP_MESSAGE_IO_STATE_BODY_START;
657                         } else {
658                                 /* Just stay in HEADERS */
659                                 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
660                         }
661
662                         /* Informational responses have no bodies, so
663                          * bail out here rather than parsing encoding, etc
664                          */
665                         soup_message_got_informational (msg);
666
667                         /* If this was "101 Switching Protocols", then
668                          * the session may have stolen the connection...
669                          */
670                         if (io != soup_message_get_io_data (msg))
671                                 return FALSE;
672
673                         soup_message_cleanup_response (msg);
674                         break;
675                 } else if (io->mode == SOUP_MESSAGE_IO_SERVER &&
676                            soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
677                         /* We must return a status code and response
678                          * headers to the client; either an error to
679                          * be set by a got-headers handler below, or
680                          * else %SOUP_STATUS_CONTINUE otherwise.
681                          */
682                         io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
683                         io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
684                 } else {
685                         io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
686
687                         /* If the client was waiting for a Continue
688                          * but got something else, then it's done
689                          * writing.
690                          */
691                         if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
692                             io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
693                                 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
694                 }
695
696                 if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
697                         SoupMessageHeaders *hdrs =
698                                 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
699                                 msg->response_headers : msg->request_headers;
700                         io->read_length = soup_message_headers_get_content_length (hdrs);
701
702                         if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
703                             !soup_message_is_keepalive (msg)) {
704                                 /* Some servers suck and send
705                                  * incorrect Content-Length values, so
706                                  * allow EOF termination in this case
707                                  * (iff the message is too short) too.
708                                  */
709                                 io->read_encoding = SOUP_ENCODING_EOF;
710                         }
711                 } else
712                         io->read_length = -1;
713
714                 soup_message_got_headers (msg);
715                 break;
716
717
718         case SOUP_MESSAGE_IO_STATE_BODY_START:
719                 if (!io->body_istream) {
720                         GInputStream *body_istream = soup_body_input_stream_new (G_INPUT_STREAM (io->istream),
721                                                                                  io->read_encoding,
722                                                                                  io->read_length);
723
724                         /* TODO: server-side messages do not have a io->item. This means
725                          * that we cannot use content processors for them right now.
726                          */
727                         if (io->mode == SOUP_MESSAGE_IO_CLIENT) {
728                                 io->body_istream = soup_message_setup_body_istream (body_istream, msg,
729                                                                                     io->item->session,
730                                                                                     SOUP_STAGE_MESSAGE_BODY);
731                                 g_object_unref (body_istream);
732                         } else {
733                                 io->body_istream = body_istream;
734                         }
735                 }
736
737                 if (soup_message_get_content_sniffer (msg)) {
738                         SoupContentSnifferStream *sniffer_stream = SOUP_CONTENT_SNIFFER_STREAM (io->body_istream);
739                         const char *content_type;
740                         GHashTable *params;
741
742                         if (!soup_content_sniffer_stream_is_ready (sniffer_stream, blocking,
743                                                                    cancellable, error))
744                                 return FALSE;
745
746                         content_type = soup_content_sniffer_stream_sniff (sniffer_stream, &params);
747                         soup_message_content_sniffed (msg, content_type, params);
748                 }
749
750                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
751                 break;
752
753
754         case SOUP_MESSAGE_IO_STATE_BODY:
755                 if (soup_message_has_chunk_allocator (msg)) {
756                         buffer = soup_message_allocate_chunk (msg, io->read_length);
757                         if (!buffer) {
758                                 g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
759                                 soup_message_io_pause (msg);
760                                 return FALSE;
761                         }
762                 } else {
763                         if (!stack_buf)
764                                 stack_buf = alloca (RESPONSE_BLOCK_SIZE);
765                         buffer = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
766                                                   stack_buf,
767                                                   RESPONSE_BLOCK_SIZE);
768                 }
769
770                 nread = g_pollable_stream_read (io->body_istream,
771                                                 (guchar *)buffer->data,
772                                                 buffer->length,
773                                                 blocking,
774                                                 cancellable, error);
775                 if (nread > 0) {
776                         buffer->length = nread;
777                         soup_message_body_got_chunk (io->read_body, buffer);
778                         soup_message_got_chunk (msg, buffer);
779                         soup_buffer_free (buffer);
780                         break;
781                 }
782
783                 soup_buffer_free (buffer);
784                 if (nread == -1)
785                         return FALSE;
786
787                 /* else nread == 0 */
788                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
789                 break;
790
791
792         case SOUP_MESSAGE_IO_STATE_BODY_DONE:
793                 io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
794                 soup_message_got_body (msg);
795                 break;
796
797
798         case SOUP_MESSAGE_IO_STATE_FINISHING:
799                 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
800
801                 if (io->mode == SOUP_MESSAGE_IO_SERVER)
802                         io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
803                 break;
804
805
806         default:
807                 g_return_val_if_reached (FALSE);
808         }
809
810         return TRUE;
811 }
812
813 typedef struct {
814         GSource source;
815         SoupMessage *msg;
816         gboolean paused;
817 } SoupMessageSource;
818
819 static gboolean
820 message_source_check (GSource *source)
821 {
822         SoupMessageSource *message_source = (SoupMessageSource *)source;
823
824         if (message_source->paused) {
825                 SoupMessageIOData *io = soup_message_get_io_data (message_source->msg);
826
827                 if (io && io->paused)
828                         return FALSE;
829                 else
830                         return TRUE;
831         } else
832                 return FALSE;
833 }
834
835 static gboolean
836 message_source_prepare (GSource *source,
837                         gint    *timeout)
838 {
839         *timeout = -1;
840         return message_source_check (source);
841 }
842
843 static gboolean
844 message_source_dispatch (GSource     *source,
845                          GSourceFunc  callback,
846                          gpointer     user_data)
847 {
848         SoupMessageSourceFunc func = (SoupMessageSourceFunc)callback;
849         SoupMessageSource *message_source = (SoupMessageSource *)source;
850
851         return (*func) (message_source->msg, user_data);
852 }
853
854 static void
855 message_source_finalize (GSource *source)
856 {
857         SoupMessageSource *message_source = (SoupMessageSource *)source;
858
859         g_object_unref (message_source->msg);
860 }
861
862 static gboolean
863 message_source_closure_callback (SoupMessage *msg,
864                                  gpointer     data)
865 {
866         GClosure *closure = data;
867         GValue param = G_VALUE_INIT;
868         GValue result_value = G_VALUE_INIT;
869         gboolean result;
870
871         g_value_init (&result_value, G_TYPE_BOOLEAN);
872
873         g_value_init (&param, SOUP_TYPE_MESSAGE);
874         g_value_set_object (&param, msg);
875
876         g_closure_invoke (closure, &result_value, 1, &param, NULL);
877
878         result = g_value_get_boolean (&result_value);
879         g_value_unset (&result_value);
880         g_value_unset (&param);
881
882         return result;
883 }
884
885 static GSourceFuncs message_source_funcs =
886 {
887         message_source_prepare,
888         message_source_check,
889         message_source_dispatch,
890         message_source_finalize,
891         (GSourceFunc)message_source_closure_callback,
892         (GSourceDummyMarshal)g_cclosure_marshal_generic,
893 };
894
895 GSource *
896 soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
897                             SoupMessageSourceFunc callback, gpointer user_data)
898 {
899         SoupMessageIOData *io = soup_message_get_io_data (msg);
900         GSource *base_source, *source;
901         SoupMessageSource *message_source;
902
903         if (!io) {
904                 base_source = g_timeout_source_new (0);
905         } else if (io->paused) {
906                 base_source = NULL;
907         } else if (io->async_close_wait) {
908                 base_source = g_cancellable_source_new (io->async_close_wait);
909         } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
910                 GPollableInputStream *istream;
911
912                 if (io->body_istream)
913                         istream = G_POLLABLE_INPUT_STREAM (io->body_istream);
914                 else
915                         istream = G_POLLABLE_INPUT_STREAM (io->istream);
916                 base_source = g_pollable_input_stream_create_source (istream, cancellable);
917         } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->write_state)) {
918                 GPollableOutputStream *ostream;
919
920                 if (io->body_ostream)
921                         ostream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
922                 else
923                         ostream = G_POLLABLE_OUTPUT_STREAM (io->ostream);
924                 base_source = g_pollable_output_stream_create_source (ostream, cancellable);
925         } else
926                 base_source = g_timeout_source_new (0);
927
928         source = g_source_new (&message_source_funcs,
929                                sizeof (SoupMessageSource));
930         g_source_set_name (source, "SoupMessageSource");
931         message_source = (SoupMessageSource *)source;
932         message_source->msg = g_object_ref (msg);
933         message_source->paused = io && io->paused;
934
935         if (base_source) {
936                 g_source_set_dummy_callback (base_source);
937                 g_source_add_child_source (source, base_source);
938                 g_source_unref (base_source);
939         }
940         g_source_set_callback (source, (GSourceFunc) callback, user_data, NULL);
941         return source;
942 }
943
944 static gboolean
945 request_is_restartable (SoupMessage *msg, GError *error)
946 {
947         SoupMessageIOData *io = soup_message_get_io_data (msg);
948
949         if (!io)
950                 return FALSE;
951
952         return (io->mode == SOUP_MESSAGE_IO_CLIENT &&
953                 io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS &&
954                 io->read_header_buf->len == 0 &&
955                 soup_connection_get_ever_used (io->item->conn) &&
956                 !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) &&
957                 !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) &&
958                 error->domain != G_TLS_ERROR &&
959                 SOUP_METHOD_IS_IDEMPOTENT (msg->method));
960 }
961
962 static gboolean
963 io_run_until (SoupMessage *msg, gboolean blocking,
964               SoupMessageIOState read_state, SoupMessageIOState write_state,
965               GCancellable *cancellable, GError **error)
966 {
967         SoupMessageIOData *io = soup_message_get_io_data (msg);
968         gboolean progress = TRUE, done;
969         GError *my_error = NULL;
970
971         if (g_cancellable_set_error_if_cancelled (cancellable, error))
972                 return FALSE;
973         else if (!io) {
974                 g_set_error_literal (error, G_IO_ERROR,
975                                      G_IO_ERROR_CANCELLED,
976                                      _("Operation was cancelled"));
977                 return FALSE;
978         }
979
980         g_object_ref (msg);
981
982         while (progress && soup_message_get_io_data (msg) == io && !io->paused && !io->async_close_wait &&
983                (io->read_state < read_state || io->write_state < write_state)) {
984
985                 if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
986                         progress = io_read (msg, blocking, cancellable, &my_error);
987                 else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
988                         progress = io_write (msg, blocking, cancellable, &my_error);
989                 else
990                         progress = FALSE;
991         }
992
993         if (my_error) {
994                 if (request_is_restartable (msg, my_error)) {
995                         /* Connection got closed, but we can safely try again */
996                         g_error_free (my_error);
997                         g_set_error_literal (error, SOUP_HTTP_ERROR,
998                                              SOUP_STATUS_TRY_AGAIN, "");
999                         g_object_unref (msg);
1000                         return FALSE;
1001                 }
1002
1003                 g_propagate_error (error, my_error);
1004                 g_object_unref (msg);
1005                 return FALSE;
1006         } else if (soup_message_get_io_data (msg) != io) {
1007                 g_set_error_literal (error, G_IO_ERROR,
1008                                      G_IO_ERROR_CANCELLED,
1009                                      _("Operation was cancelled"));
1010                 g_object_unref (msg);
1011                 return FALSE;
1012         } else if (!io->async_close_wait &&
1013                    g_cancellable_set_error_if_cancelled (cancellable, error)) {
1014                 g_object_unref (msg);
1015                 return FALSE;
1016         }
1017
1018         done = (io->read_state >= read_state &&
1019                 io->write_state >= write_state);
1020
1021         if (!blocking && !done) {
1022                 g_set_error_literal (error, G_IO_ERROR,
1023                                      G_IO_ERROR_WOULD_BLOCK,
1024                                      _("Operation would block"));
1025                 g_object_unref (msg);
1026                 return FALSE;
1027         }
1028
1029 #ifdef HAVE_SYSPROF
1030         /* Allow profiling of network requests. */
1031         if (io->read_state == SOUP_MESSAGE_IO_STATE_DONE &&
1032             io->write_state == SOUP_MESSAGE_IO_STATE_DONE) {
1033                 SoupURI *uri = soup_message_get_uri (msg);
1034                 char *uri_str = soup_uri_to_string (uri, FALSE);
1035                 const gchar *last_modified = soup_message_headers_get_one (msg->request_headers, "Last-Modified");
1036                 const gchar *etag = soup_message_headers_get_one (msg->request_headers, "ETag");
1037
1038                 /* FIXME: Expand and generalise sysprof support:
1039                  * https://gitlab.gnome.org/GNOME/sysprof/-/issues/43 */
1040                 sysprof_collector_mark_printf (io->begin_time_nsec, SYSPROF_CAPTURE_CURRENT_TIME - io->begin_time_nsec,
1041                                                "libsoup", "message",
1042                                                "%s request/response to %s: "
1043                                                "read %" G_GOFFSET_FORMAT "B, "
1044                                                "wrote %" G_GOFFSET_FORMAT "B, "
1045                                                "Last-Modified: %s, "
1046                                                "ETag: %s",
1047                                                soup_message_get_https_status (msg, NULL, NULL) ? "HTTPS" : "HTTP",
1048                                                uri_str, io->read_length, io->write_length,
1049                                                (last_modified != NULL) ? last_modified : "(unset)",
1050                                                (etag != NULL) ? etag : "(unset)");
1051                 g_free (uri_str);
1052         }
1053 #endif  /* HAVE_SYSPROF */
1054
1055         g_object_unref (msg);
1056         return done;
1057 }
1058
1059 static gboolean
1060 io_run_ready (SoupMessage *msg, gpointer user_data)
1061 {
1062         io_run (msg, FALSE);
1063         return FALSE;
1064 }
1065
1066 static void
1067 io_run (SoupMessage *msg, gboolean blocking)
1068 {
1069         SoupMessageIOData *io = soup_message_get_io_data (msg);
1070         GError *error = NULL;
1071         GCancellable *cancellable;
1072
1073         if (io->io_source) {
1074                 g_source_destroy (io->io_source);
1075                 g_source_unref (io->io_source);
1076                 io->io_source = NULL;
1077         }
1078
1079         g_object_ref (msg);
1080         cancellable = io->cancellable ? g_object_ref (io->cancellable) : NULL;
1081
1082         if (io_run_until (msg, blocking,
1083                           SOUP_MESSAGE_IO_STATE_DONE,
1084                           SOUP_MESSAGE_IO_STATE_DONE,
1085                           cancellable, &error)) {
1086                 soup_message_io_finished (msg);
1087         } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
1088                 g_clear_error (&error);
1089                 io->io_source = soup_message_io_get_source (msg, NULL, io_run_ready, msg);
1090                 g_source_attach (io->io_source, io->async_context);
1091         } else if (error && soup_message_get_io_data (msg) == io) {
1092                 if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN))
1093                         io->item->state = SOUP_MESSAGE_RESTARTING;
1094                 else if (error->domain == G_TLS_ERROR) {
1095                         soup_message_set_status_full (msg,
1096                                                       SOUP_STATUS_SSL_FAILED,
1097                                                       error->message);
1098                 } else if (!SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
1099                         soup_message_set_status (msg, SOUP_STATUS_IO_ERROR);
1100
1101                 g_error_free (error);
1102                 soup_message_io_finished (msg);
1103         } else if (error)
1104                 g_error_free (error);
1105
1106         g_object_unref (msg);
1107         g_clear_object (&cancellable);
1108 }
1109
1110 gboolean
1111 soup_message_io_run_until_write (SoupMessage *msg, gboolean blocking,
1112                                  GCancellable *cancellable, GError **error)
1113 {
1114         return io_run_until (msg, blocking,
1115                              SOUP_MESSAGE_IO_STATE_ANY,
1116                              SOUP_MESSAGE_IO_STATE_BODY,
1117                              cancellable, error);
1118 }
1119
1120 gboolean
1121 soup_message_io_run_until_read (SoupMessage *msg, gboolean blocking,
1122                                 GCancellable *cancellable, GError **error)
1123 {
1124         return io_run_until (msg, blocking,
1125                              SOUP_MESSAGE_IO_STATE_BODY,
1126                              SOUP_MESSAGE_IO_STATE_ANY,
1127                              cancellable, error);
1128 }
1129
1130 gboolean
1131 soup_message_io_run_until_finish (SoupMessage   *msg,
1132                                   gboolean       blocking,
1133                                   GCancellable  *cancellable,
1134                                   GError       **error)
1135 {
1136         SoupMessageIOData *io = soup_message_get_io_data (msg);
1137         gboolean success;
1138
1139         g_object_ref (msg);
1140
1141         if (io) {
1142                 g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, FALSE);
1143
1144                 if (io->read_state < SOUP_MESSAGE_IO_STATE_BODY_DONE)
1145                         io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
1146         }
1147
1148         success = io_run_until (msg, blocking,
1149                                 SOUP_MESSAGE_IO_STATE_DONE,
1150                                 SOUP_MESSAGE_IO_STATE_DONE,
1151                                 cancellable, error);
1152
1153         g_object_unref (msg);
1154         return success;
1155 }
1156
1157 static void
1158 client_stream_eof (SoupClientInputStream *stream, gpointer user_data)
1159 {
1160         SoupMessage *msg = user_data;
1161         SoupMessageIOData *io = soup_message_get_io_data (msg);
1162
1163         if (io && io->read_state == SOUP_MESSAGE_IO_STATE_BODY)
1164                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
1165 }
1166
1167 GInputStream *
1168 soup_message_io_get_response_istream (SoupMessage  *msg,
1169                                       GError      **error)
1170 {
1171         SoupMessageIOData *io = soup_message_get_io_data (msg);
1172         GInputStream *client_stream;
1173
1174         g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, NULL);
1175
1176         if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
1177                 g_set_error_literal (error, SOUP_HTTP_ERROR,
1178                                      msg->status_code, msg->reason_phrase);
1179                 return NULL;
1180         }
1181
1182         client_stream = soup_client_input_stream_new (io->body_istream, msg);
1183         g_signal_connect (client_stream, "eof",
1184                           G_CALLBACK (client_stream_eof), msg);
1185
1186         return client_stream;
1187 }
1188
1189
1190 static SoupMessageIOData *
1191 new_iostate (SoupMessage *msg, GIOStream *iostream,
1192              GMainContext *async_context, SoupMessageIOMode mode,
1193              SoupMessageGetHeadersFn get_headers_cb,
1194              SoupMessageParseHeadersFn parse_headers_cb,
1195              gpointer header_data,
1196              SoupMessageCompletionFn completion_cb,
1197              gpointer completion_data)
1198 {
1199         SoupMessageIOData *io;
1200
1201         io = g_slice_new0 (SoupMessageIOData);
1202         io->mode = mode;
1203         io->get_headers_cb   = get_headers_cb;
1204         io->parse_headers_cb = parse_headers_cb;
1205         io->header_data      = header_data;
1206         io->completion_cb    = completion_cb;
1207         io->completion_data  = completion_data;
1208
1209         io->iostream = g_object_ref (iostream);
1210         io->istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (iostream));
1211         io->ostream = g_io_stream_get_output_stream (iostream);
1212
1213         if (async_context)
1214                 io->async_context = g_main_context_ref (async_context);
1215
1216         io->read_header_buf = g_byte_array_new ();
1217         io->write_buf       = g_string_new (NULL);
1218
1219         io->read_state  = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
1220         io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
1221
1222         if (soup_message_get_io_data (msg))
1223                 soup_message_io_cleanup (msg);
1224         soup_message_set_io_data (msg, io);
1225
1226 #ifdef HAVE_SYSPROF
1227         io->begin_time_nsec = SYSPROF_CAPTURE_CURRENT_TIME;
1228 #endif
1229
1230         return io;
1231 }
1232
1233 void
1234 soup_message_io_client (SoupMessageQueueItem *item,
1235                         GIOStream *iostream,
1236                         GMainContext *async_context,
1237                         SoupMessageGetHeadersFn get_headers_cb,
1238                         SoupMessageParseHeadersFn parse_headers_cb,
1239                         gpointer header_data,
1240                         SoupMessageCompletionFn completion_cb,
1241                         gpointer completion_data)
1242 {
1243         SoupMessageIOData *io;
1244
1245         io = new_iostate (item->msg, iostream, async_context,
1246                           SOUP_MESSAGE_IO_CLIENT,
1247                           get_headers_cb, parse_headers_cb, header_data,
1248                           completion_cb, completion_data);
1249
1250         io->item = item;
1251         soup_message_queue_item_ref (item);
1252         io->cancellable = item->cancellable;
1253
1254         io->read_body       = item->msg->response_body;
1255         io->write_body      = item->msg->request_body;
1256
1257         io->write_state     = SOUP_MESSAGE_IO_STATE_HEADERS;
1258
1259         if (!item->new_api) {
1260                 gboolean blocking =
1261                         SOUP_IS_SESSION_SYNC (item->session) ||
1262                         (!SOUP_IS_SESSION_ASYNC (item->session) && !item->async);
1263                 io_run (item->msg, blocking);
1264         }
1265 }
1266
1267 void
1268 soup_message_io_server (SoupMessage *msg,
1269                         GIOStream *iostream, GMainContext *async_context,
1270                         SoupMessageGetHeadersFn get_headers_cb,
1271                         SoupMessageParseHeadersFn parse_headers_cb,
1272                         gpointer header_data,
1273                         SoupMessageCompletionFn completion_cb,
1274                         gpointer completion_data)
1275 {
1276         SoupMessageIOData *io;
1277
1278         io = new_iostate (msg, iostream, async_context,
1279                           SOUP_MESSAGE_IO_SERVER,
1280                           get_headers_cb, parse_headers_cb, header_data,
1281                           completion_cb, completion_data);
1282
1283         io->read_body       = msg->request_body;
1284         io->write_body      = msg->response_body;
1285
1286         io->read_state      = SOUP_MESSAGE_IO_STATE_HEADERS;
1287         io_run (msg, FALSE);
1288 }
1289
1290 void  
1291 soup_message_io_pause (SoupMessage *msg)
1292 {
1293         SoupMessageIOData *io = soup_message_get_io_data (msg);
1294
1295         g_return_if_fail (io != NULL);
1296
1297         if (io->item && io->item->new_api)
1298                 g_return_if_fail (io->read_state < SOUP_MESSAGE_IO_STATE_BODY);
1299
1300         if (io->io_source) {
1301                 g_source_destroy (io->io_source);
1302                 g_source_unref (io->io_source);
1303                 io->io_source = NULL;
1304         }
1305
1306         if (io->unpause_source) {
1307                 g_source_destroy (io->unpause_source);
1308                 g_source_unref (io->unpause_source);
1309                 io->unpause_source = NULL;
1310         }
1311
1312         io->paused = TRUE;
1313 }
1314
1315 static gboolean
1316 io_unpause_internal (gpointer msg)
1317 {
1318         SoupMessageIOData *io = soup_message_get_io_data (msg);
1319
1320         g_return_val_if_fail (io != NULL, FALSE);
1321
1322         g_clear_pointer (&io->unpause_source, g_source_unref);
1323         io->paused = FALSE;
1324
1325         if (io->io_source)
1326                 return FALSE;
1327
1328         io_run (msg, FALSE);
1329         return FALSE;
1330 }
1331
1332 void
1333 soup_message_io_unpause (SoupMessage *msg)
1334 {
1335         SoupMessageIOData *io = soup_message_get_io_data (msg);
1336
1337         g_return_if_fail (io != NULL);
1338
1339         if (io->item && io->item->new_api) {
1340                 g_return_if_fail (io->read_state < SOUP_MESSAGE_IO_STATE_BODY);
1341                 io->paused = FALSE;
1342                 return;
1343         }
1344
1345         if (!io->unpause_source) {
1346                 io->unpause_source = soup_add_completion_reffed (io->async_context,
1347                                                                  io_unpause_internal, msg, NULL);
1348         }
1349 }
1350
1351 /**
1352  * soup_message_io_in_progress:
1353  * @msg: a #SoupMessage
1354  *
1355  * Tests whether or not I/O is currently in progress on @msg.
1356  *
1357  * Return value: whether or not I/O is currently in progress.
1358  **/
1359 gboolean
1360 soup_message_io_in_progress (SoupMessage *msg)
1361 {
1362         return soup_message_get_io_data (msg) != NULL;
1363 }