4e5c35f7d1e70b69bede215f3c804b06a52c81bf
[platform/upstream/libsoup.git] / libsoup / soup-message-io.c
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * soup-message-io.c: HTTP message I/O
4  *
5  * Copyright (C) 2000-2003, Ximian, Inc.
6  */
7
8 #ifdef HAVE_CONFIG_H
9 #include "config.h"
10 #endif
11
12 #include <stdlib.h>
13 #include <string.h>
14
15 #include <glib/gi18n-lib.h>
16
17 #include "soup-body-input-stream.h"
18 #include "soup-body-output-stream.h"
19 #include "soup-client-input-stream.h"
20 #include "soup-connection.h"
21 #include "soup-content-sniffer-stream.h"
22 #include "soup-converter-wrapper.h"
23 #include "soup-filter-input-stream.h"
24 #include "soup-message.h"
25 #include "soup-message-private.h"
26 #include "soup-message-queue.h"
27 #include "soup-misc.h"
28 #include "soup-socket.h"
29
/* Which side of the HTTP exchange a SoupMessageIOData is driving. */
typedef enum {
        SOUP_MESSAGE_IO_CLIENT, /* writes the request, reads the response */
        SOUP_MESSAGE_IO_SERVER  /* reads the request, writes the response */
} SoupMessageIOMode;
34
/*
 * Progress of one direction (reading or writing) of a message's I/O.
 *
 * The enumerators are listed in the order in which they are normally
 * visited, and code in this file compares them with relational
 * operators (e.g. "read_state < SOUP_MESSAGE_IO_STATE_FINISHING"), so
 * the declaration order must not be changed.
 */
typedef enum {
        SOUP_MESSAGE_IO_STATE_NOT_STARTED,
        SOUP_MESSAGE_IO_STATE_ANY = SOUP_MESSAGE_IO_STATE_NOT_STARTED,
        SOUP_MESSAGE_IO_STATE_HEADERS,
        SOUP_MESSAGE_IO_STATE_BLOCKING,
        SOUP_MESSAGE_IO_STATE_BODY_START,
        SOUP_MESSAGE_IO_STATE_BODY,
        SOUP_MESSAGE_IO_STATE_BODY_DATA,
        SOUP_MESSAGE_IO_STATE_BODY_DONE,
        SOUP_MESSAGE_IO_STATE_FINISHING,
        SOUP_MESSAGE_IO_STATE_DONE
} SoupMessageIOState;

/*
 * TRUE if @state is neither NOT_STARTED, BLOCKING nor DONE.
 *
 * The argument is parenthesized so that an expression (for example a
 * conditional) can be passed safely. It is still evaluated more than
 * once, so it must not have side effects.
 */
#define SOUP_MESSAGE_IO_STATE_ACTIVE(state) \
        ((state) != SOUP_MESSAGE_IO_STATE_NOT_STARTED && \
         (state) != SOUP_MESSAGE_IO_STATE_BLOCKING && \
         (state) != SOUP_MESSAGE_IO_STATE_DONE)

/* TRUE if @state is active (see above) and is not BODY_DONE. */
#define SOUP_MESSAGE_IO_STATE_POLLABLE(state) \
        (SOUP_MESSAGE_IO_STATE_ACTIVE (state) && \
         (state) != SOUP_MESSAGE_IO_STATE_BODY_DONE)
55
/*
 * Per-message I/O state. It is reached through the message's private
 * data (priv->io_data) and is released by soup_message_io_cleanup().
 */
typedef struct {
        SoupMessageQueueItem *item;        /* may be NULL; unreffed in cleanup when set */
        SoupMessageIOMode     mode;        /* client or server side of the exchange */
        GCancellable         *cancellable;

        SoupSocket             *sock;          /* unreffed in cleanup; disconnected by soup_message_io_stop() on early stop */
        SoupFilterInputStream  *istream;       /* tracked with a weak pointer (removed in cleanup), not a reference */
        GInputStream           *body_istream;  /* body stream stacked on istream; unreffed in cleanup */
        GOutputStream          *ostream;       /* tracked with a weak pointer (removed in cleanup), not a reference */
        GOutputStream          *body_ostream;  /* body stream stacked on ostream; unreffed in cleanup */
        GMainContext           *async_context; /* unreffed in cleanup when set */
        gboolean                blocking;      /* passed as the "blocking" flag of every stream read/write */

        SoupMessageIOState    read_state;
        SoupEncoding          read_encoding;   /* filled in by parse_headers_cb */
        GByteArray           *read_header_buf; /* raw header bytes accumulated by read_headers() */
        SoupMessageBody      *read_body;       /* receives each chunk read from the body */
        goffset               read_length;     /* Content-Length of the incoming body, or -1 */

        SoupMessageIOState    write_state;
        SoupEncoding          write_encoding;  /* filled in by get_headers_cb */
        GString              *write_buf;       /* serialized headers waiting to be written */
        SoupMessageBody      *write_body;      /* source of outgoing body chunks */
        SoupBuffer           *write_chunk;     /* chunk currently being written, if any */
        goffset               write_body_offset; /* offset into write_body of the next chunk to fetch */
        goffset               write_length;    /* body bytes still to write when Content-Length encoded */
        goffset               written;         /* bytes of write_buf or write_chunk already written */

        GSource *io_source;      /* destroyed and cleared by soup_message_io_stop() */
        GSource *unpause_source; /* destroyed and cleared by soup_message_io_stop() */
        gboolean paused;

        SoupMessageGetHeadersFn   get_headers_cb;   /* serializes outgoing headers into write_buf */
        SoupMessageParseHeadersFn parse_headers_cb; /* parses read_header_buf; returns a status code */
        gpointer                  header_data;      /* user data for both header callbacks */
        SoupMessageCompletionFn   completion_cb;    /* optional; called by soup_message_io_finished() */
        gpointer                  completion_data;  /* user data for completion_cb */
} SoupMessageIOData;
94         
95
/* Bytes requested per read: used both when growing the header buffer
 * and as the size of the temporary body read buffer.
 */
#define RESPONSE_BLOCK_SIZE 8192
97
98 void
99 soup_message_io_cleanup (SoupMessage *msg)
100 {
101         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
102         SoupMessageIOData *io;
103
104         soup_message_io_stop (msg);
105
106         io = priv->io_data;
107         if (!io)
108                 return;
109         priv->io_data = NULL;
110
111         if (io->sock)
112                 g_object_unref (io->sock);
113         if (io->istream)
114                 g_object_remove_weak_pointer (G_OBJECT (io->istream), (gpointer *)&io->istream);
115         if (io->ostream)
116                 g_object_remove_weak_pointer (G_OBJECT (io->ostream), (gpointer *)&io->ostream);
117         if (io->body_istream)
118                 g_object_unref (io->body_istream);
119         if (io->body_ostream)
120                 g_object_unref (io->body_ostream);
121         if (io->async_context)
122                 g_main_context_unref (io->async_context);
123         if (io->item)
124                 soup_message_queue_item_unref (io->item);
125
126         g_byte_array_free (io->read_header_buf, TRUE);
127
128         g_string_free (io->write_buf, TRUE);
129         if (io->write_chunk)
130                 soup_buffer_free (io->write_chunk);
131
132         g_slice_free (SoupMessageIOData, io);
133 }
134
135 void
136 soup_message_io_stop (SoupMessage *msg)
137 {
138         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
139         SoupMessageIOData *io = priv->io_data;
140
141         if (!io)
142                 return;
143
144         if (io->io_source) {
145                 g_source_destroy (io->io_source);
146                 g_source_unref (io->io_source);
147                 io->io_source = NULL;
148         }
149
150         if (io->unpause_source) {
151                 g_source_destroy (io->unpause_source);
152                 g_source_unref (io->unpause_source);
153                 io->unpause_source = NULL;
154         }
155
156         if (io->read_state < SOUP_MESSAGE_IO_STATE_FINISHING)
157                 soup_socket_disconnect (io->sock);
158 }
159
160 void
161 soup_message_io_finished (SoupMessage *msg)
162 {
163         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
164         SoupMessageIOData *io = priv->io_data;
165         SoupMessageCompletionFn completion_cb = io->completion_cb;
166         gpointer completion_data = io->completion_data;
167
168         g_object_ref (msg);
169         soup_message_io_cleanup (msg);
170         if (completion_cb)
171                 completion_cb (msg, completion_data);
172         g_object_unref (msg);
173 }
174
175 static gboolean
176 request_is_idempotent (SoupMessage *msg)
177 {
178         /* FIXME */
179         return (msg->method == SOUP_METHOD_GET);
180 }
181
182 static void
183 io_error (SoupSocket *sock, SoupMessage *msg, GError *error)
184 {
185         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
186         SoupMessageIOData *io = priv->io_data;
187
188         if (error && error->domain == G_TLS_ERROR) {
189                 soup_message_set_status_full (msg,
190                                               SOUP_STATUS_SSL_FAILED,
191                                               error->message);
192         } else if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
193                    io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS &&
194                    io->read_header_buf->len == 0 &&
195                    soup_connection_get_ever_used (io->item->conn) &&
196                    !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) &&
197                    request_is_idempotent (msg)) {
198                 /* Connection got closed, but we can safely try again */
199                 io->item->state = SOUP_MESSAGE_RESTARTING;
200         } else if (!SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
201                 soup_message_set_status (msg, SOUP_STATUS_IO_ERROR);
202
203         if (error)
204                 g_error_free (error);
205
206         soup_message_io_finished (msg);
207 }
208
209 static gboolean
210 read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
211 {
212         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
213         SoupMessageIOData *io = priv->io_data;
214         gssize nread, old_len;
215         gboolean got_lf;
216
217         while (1) {
218                 old_len = io->read_header_buf->len;
219                 g_byte_array_set_size (io->read_header_buf, old_len + RESPONSE_BLOCK_SIZE);
220                 nread = soup_filter_input_stream_read_line (io->istream,
221                                                             io->read_header_buf->data + old_len,
222                                                             RESPONSE_BLOCK_SIZE,
223                                                             io->blocking,
224                                                             &got_lf,
225                                                             cancellable, error);
226                 io->read_header_buf->len = old_len + MAX (nread, 0);
227                 if (nread == 0) {
228                         soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
229                         g_set_error_literal (error, G_IO_ERROR,
230                                              G_IO_ERROR_PARTIAL_INPUT,
231                                              _("Connection terminated unexpectedly"));
232                 }
233                 if (nread <= 0)
234                         return FALSE;
235
236                 if (got_lf) {
237                         if (nread == 1 && old_len >= 2 &&
238                             !strncmp ((char *)io->read_header_buf->data +
239                                       io->read_header_buf->len - 2,
240                                       "\n\n", 2))
241                                 break;
242                         else if (nread == 2 && old_len >= 3 &&
243                                  !strncmp ((char *)io->read_header_buf->data +
244                                            io->read_header_buf->len - 3,
245                                            "\n\r\n", 3))
246                                 break;
247                 }
248         }
249
250         /* We need to "rewind" io->read_header_buf back one line.
251          * That SHOULD be two characters (CR LF), but if the
252          * web server was stupid, it might only be one.
253          */
254         if (io->read_header_buf->len < 3 ||
255             io->read_header_buf->data[io->read_header_buf->len - 2] == '\n')
256                 io->read_header_buf->len--;
257         else
258                 io->read_header_buf->len -= 2;
259         io->read_header_buf->data[io->read_header_buf->len] = '\0';
260
261         return TRUE;
262 }
263
264 static void
265 setup_body_istream (SoupMessage *msg)
266 {
267         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
268         SoupMessageIOData *io = priv->io_data;
269         GConverter *decoder, *wrapper;
270         GInputStream *filter;
271         GSList *d;
272
273         io->body_istream =
274                 soup_body_input_stream_new (io->istream,
275                                             io->read_encoding,
276                                             io->read_length);
277
278         for (d = priv->decoders; d; d = d->next) {
279                 decoder = d->data;
280                 wrapper = soup_converter_wrapper_new (decoder, msg);
281                 filter = g_object_new (G_TYPE_CONVERTER_INPUT_STREAM,
282                                        "base-stream", io->body_istream,
283                                        "converter", wrapper,
284                                        NULL);
285                 g_object_unref (io->body_istream);
286                 io->body_istream = filter;
287         }
288
289         if (priv->sniffer) {
290                 filter = soup_content_sniffer_stream_new (priv->sniffer,
291                                                           msg, io->body_istream);
292                 g_object_unref (io->body_istream);
293                 io->body_istream = filter;
294         }
295 }
296
297 /*
298  * There are two request/response formats: the basic request/response,
299  * possibly with one or more unsolicited informational responses (such
300  * as the WebDAV "102 Processing" response):
301  *
302  *     Client                            Server
303  *      W:HEADERS  / R:NOT_STARTED    ->  R:HEADERS  / W:NOT_STARTED
304  *      W:BODY     / R:NOT_STARTED    ->  R:BODY     / W:NOT_STARTED
305  *     [W:DONE     / R:HEADERS (1xx)  <-  R:DONE     / W:HEADERS (1xx) ...]
306  *      W:DONE     / R:HEADERS        <-  R:DONE     / W:HEADERS
307  *      W:DONE     / R:BODY           <-  R:DONE     / W:BODY
308  *      W:DONE     / R:DONE               R:DONE     / W:DONE
309  *     
310  * and the "Expect: 100-continue" request/response, with the client
311  * blocking halfway through its request, and then either continuing or
312  * aborting, depending on the server response:
313  *
314  *     Client                            Server
315  *      W:HEADERS  / R:NOT_STARTED    ->  R:HEADERS  / W:NOT_STARTED
316  *      W:BLOCKING / R:HEADERS        <-  R:BLOCKING / W:HEADERS
317  *     [W:BODY     / R:BLOCKING       ->  R:BODY     / W:BLOCKING]
318  *     [W:DONE     / R:HEADERS        <-  R:DONE     / W:HEADERS]
319  *      W:DONE     / R:BODY           <-  R:DONE     / W:BODY
320  *      W:DONE     / R:DONE               R:DONE     / W:DONE
321  */
322
323 /* Attempts to push forward the writing side of @msg's I/O. Returns
324  * %TRUE if it manages to make some progress, and it is likely that
325  * further progress can be made. Returns %FALSE if it has reached a
326  * stopping point of some sort (need input from the application,
327  * socket not writable, write is complete, etc).
328  */
329 static gboolean
330 io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
331 {
332         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
333         SoupMessageIOData *io = priv->io_data;
334         SoupBuffer *chunk;
335         gssize nwrote;
336
337         switch (io->write_state) {
338         case SOUP_MESSAGE_IO_STATE_HEADERS:
339                 if (!io->write_buf->len) {
340                         io->get_headers_cb (msg, io->write_buf,
341                                             &io->write_encoding,
342                                             io->header_data);
343                 }
344
345                 while (io->written < io->write_buf->len) {
346                         nwrote = g_pollable_stream_write (io->ostream,
347                                                           io->write_buf->str + io->written,
348                                                           io->write_buf->len - io->written,
349                                                           io->blocking,
350                                                           cancellable, error);
351                         if (nwrote == -1)
352                                 return FALSE;
353                         io->written += nwrote;
354                 }
355
356                 io->written = 0;
357                 g_string_truncate (io->write_buf, 0);
358
359                 if (io->mode == SOUP_MESSAGE_IO_SERVER &&
360                     SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
361                         if (msg->status_code == SOUP_STATUS_CONTINUE) {
362                                 /* Stop and wait for the body now */
363                                 io->write_state =
364                                         SOUP_MESSAGE_IO_STATE_BLOCKING;
365                                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
366                         } else {
367                                 /* We just wrote a 1xx response
368                                  * header, so stay in STATE_HEADERS.
369                                  * (The caller will pause us from the
370                                  * wrote_informational callback if he
371                                  * is not ready to send the final
372                                  * response.)
373                                  */
374                         }
375
376                         soup_message_wrote_informational (msg);
377                         soup_message_cleanup_response (msg);
378                         break;
379                 }
380
381                 if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
382                         SoupMessageHeaders *hdrs =
383                                 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
384                                 msg->request_headers : msg->response_headers;
385                         io->write_length = soup_message_headers_get_content_length (hdrs);
386                 }
387
388                 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
389                     soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
390                         /* Need to wait for the Continue response */
391                         io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
392                         io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
393                 } else {
394                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_START;
395
396                         /* If the client was waiting for a Continue
397                          * but we sent something else, then they're
398                          * now done writing.
399                          */
400                         if (io->mode == SOUP_MESSAGE_IO_SERVER &&
401                             io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
402                                 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
403                 }
404
405                 soup_message_wrote_headers (msg);
406                 break;
407
408
409         case SOUP_MESSAGE_IO_STATE_BODY_START:
410                 io->body_ostream = soup_body_output_stream_new (io->ostream,
411                                                                 io->write_encoding,
412                                                                 io->write_length);
413                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
414                 break;
415
416
417         case SOUP_MESSAGE_IO_STATE_BODY:
418                 if (!io->write_length &&
419                     io->write_encoding != SOUP_ENCODING_EOF &&
420                     io->write_encoding != SOUP_ENCODING_CHUNKED) {
421                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
422                         break;
423                 }
424
425                 if (!io->write_chunk) {
426                         io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
427                         if (!io->write_chunk) {
428                                 g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
429                                 soup_message_io_pause (msg);
430                                 return FALSE;
431                         }
432                         if (!io->write_chunk->length) {
433                                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
434                                 break;
435                         }
436                 }
437
438                 nwrote = g_pollable_stream_write (io->body_ostream,
439                                                   io->write_chunk->data + io->written,
440                                                   io->write_chunk->length - io->written,
441                                                   io->blocking,
442                                                   cancellable, error);
443                 if (nwrote == -1)
444                         return FALSE;
445
446                 chunk = soup_buffer_new_subbuffer (io->write_chunk,
447                                                    io->written, nwrote);
448                 io->written += nwrote;
449                 if (io->write_length)
450                         io->write_length -= nwrote;
451
452                 if (io->written == io->write_chunk->length)
453                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DATA;
454
455                 soup_message_wrote_body_data (msg, chunk);
456                 soup_buffer_free (chunk);
457                 break;
458
459
460         case SOUP_MESSAGE_IO_STATE_BODY_DATA:
461                 io->written = 0;
462                 if (io->write_chunk->length == 0) {
463                         io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
464                         break;
465                 }
466
467                 if (io->mode == SOUP_MESSAGE_IO_SERVER ||
468                     priv->msg_flags & SOUP_MESSAGE_CAN_REBUILD)
469                         soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
470                 io->write_body_offset += io->write_chunk->length;
471                 soup_buffer_free (io->write_chunk);
472                 io->write_chunk = NULL;
473
474                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
475                 soup_message_wrote_chunk (msg);
476                 break;
477
478
479         case SOUP_MESSAGE_IO_STATE_BODY_DONE:
480                 if (io->body_ostream) {
481                         if (!g_output_stream_close (io->body_ostream, cancellable, error))
482                                 return FALSE;
483                         g_clear_object (&io->body_ostream);
484                 }
485
486                 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
487                 soup_message_wrote_body (msg);
488                 break;
489
490
491         case SOUP_MESSAGE_IO_STATE_FINISHING:
492                 io->write_state = SOUP_MESSAGE_IO_STATE_DONE;
493
494                 if (io->mode == SOUP_MESSAGE_IO_CLIENT)
495                         io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
496                 break;
497
498
499         default:
500                 g_return_val_if_reached (FALSE);
501         }
502
503         return TRUE;
504 }
505
506 /* Attempts to push forward the reading side of @msg's I/O. Returns
507  * %TRUE if it manages to make some progress, and it is likely that
508  * further progress can be made. Returns %FALSE if it has reached a
509  * stopping point of some sort (need input from the application,
510  * socket not readable, read is complete, etc).
511  */
512 static gboolean
513 io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
514 {
515         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
516         SoupMessageIOData *io = priv->io_data;
517         guchar *stack_buf = NULL;
518         gssize nread;
519         SoupBuffer *buffer;
520         guint status;
521
522         switch (io->read_state) {
523         case SOUP_MESSAGE_IO_STATE_HEADERS:
524                 if (!read_headers (msg, cancellable, error))
525                         return FALSE;
526
527                 status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data,
528                                                io->read_header_buf->len,
529                                                &io->read_encoding,
530                                                io->header_data);
531                 g_byte_array_set_size (io->read_header_buf, 0);
532
533                 if (status != SOUP_STATUS_OK) {
534                         /* Either we couldn't parse the headers, or they
535                          * indicated something that would mean we wouldn't
536                          * be able to parse the body. (Eg, unknown
537                          * Transfer-Encoding.). Skip the rest of the
538                          * reading, and make sure the connection gets
539                          * closed when we're done.
540                          */
541                         soup_message_set_status (msg, status);
542                         soup_message_headers_append (msg->request_headers,
543                                                      "Connection", "close");
544                         io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
545                         break;
546                 }
547
548                 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
549                     SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
550                         if (msg->status_code == SOUP_STATUS_CONTINUE &&
551                             io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING) {
552                                 /* Pause the reader, unpause the writer */
553                                 io->read_state =
554                                         SOUP_MESSAGE_IO_STATE_BLOCKING;
555                                 io->write_state =
556                                         SOUP_MESSAGE_IO_STATE_BODY_START;
557                         } else {
558                                 /* Just stay in HEADERS */
559                                 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
560                         }
561
562                         /* Informational responses have no bodies, so
563                          * bail out here rather than parsing encoding, etc
564                          */
565                         soup_message_got_informational (msg);
566                         soup_message_cleanup_response (msg);
567                         break;
568                 } else if (io->mode == SOUP_MESSAGE_IO_SERVER &&
569                            soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
570                         /* The client requested a Continue response. The
571                          * got_headers handler may change this to something
572                          * else though.
573                          */
574                         soup_message_set_status (msg, SOUP_STATUS_CONTINUE);
575                         io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
576                         io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
577                 } else {
578                         io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
579
580                         /* If the client was waiting for a Continue
581                          * but got something else, then it's done
582                          * writing.
583                          */
584                         if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
585                             io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
586                                 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
587                 }
588
589                 if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
590                         SoupMessageHeaders *hdrs =
591                                 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
592                                 msg->response_headers : msg->request_headers;
593                         io->read_length = soup_message_headers_get_content_length (hdrs);
594
595                         if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
596                             !soup_message_is_keepalive (msg)) {
597                                 /* Some servers suck and send
598                                  * incorrect Content-Length values, so
599                                  * allow EOF termination in this case
600                                  * (iff the message is too short) too.
601                                  */
602                                 io->read_encoding = SOUP_ENCODING_EOF;
603                         }
604                 } else
605                         io->read_length = -1;
606
607                 soup_message_got_headers (msg);
608                 break;
609
610
611         case SOUP_MESSAGE_IO_STATE_BODY_START:
612                 if (!io->body_istream)
613                         setup_body_istream (msg);
614
615                 if (priv->sniffer) {
616                         SoupContentSnifferStream *sniffer_stream = SOUP_CONTENT_SNIFFER_STREAM (io->body_istream);
617                         const char *content_type;
618                         GHashTable *params;
619
620                         if (!soup_content_sniffer_stream_is_ready (sniffer_stream, io->blocking, cancellable, error))
621                                 return FALSE;
622
623                         content_type = soup_content_sniffer_stream_sniff (sniffer_stream, &params);
624                         soup_message_content_sniffed (msg, content_type, params);
625                 }
626
627                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
628                 break;
629
630
631         case SOUP_MESSAGE_IO_STATE_BODY:
632                 if (priv->chunk_allocator) {
633                         buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
634                         if (!buffer) {
635                                 g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
636                                 soup_message_io_pause (msg);
637                                 return FALSE;
638                         }
639                 } else {
640                         if (!stack_buf)
641                                 stack_buf = alloca (RESPONSE_BLOCK_SIZE);
642                         buffer = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
643                                                   stack_buf,
644                                                   RESPONSE_BLOCK_SIZE);
645                 }
646
647                 nread = g_pollable_stream_read (io->body_istream,
648                                                 (guchar *)buffer->data,
649                                                 buffer->length,
650                                                 io->blocking,
651                                                 cancellable, error);
652                 if (nread > 0) {
653                         buffer->length = nread;
654                         soup_message_body_got_chunk (io->read_body, buffer);
655                         soup_message_got_chunk (msg, buffer);
656                         soup_buffer_free (buffer);
657                         break;
658                 }
659
660                 soup_buffer_free (buffer);
661                 if (nread == -1)
662                         return FALSE;
663
664                 /* else nread == 0 */
665                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
666                 break;
667
668
669         case SOUP_MESSAGE_IO_STATE_BODY_DONE:
670                 io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
671                 soup_message_got_body (msg);
672                 break;
673
674
675         case SOUP_MESSAGE_IO_STATE_FINISHING:
676                 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
677
678                 if (io->mode == SOUP_MESSAGE_IO_SERVER)
679                         io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
680                 break;
681
682
683         default:
684                 g_return_val_if_reached (FALSE);
685         }
686
687         return TRUE;
688 }
689
/* GSource subtype that carries the SoupMessage passed to its callback. */
typedef struct {
        GSource source;   /* base struct; first so a GSource * can be cast to SoupMessageSource * */
        SoupMessage *msg; /* unreffed by message_source_finalize() */
} SoupMessageSource;
694
695 static gboolean
696 message_source_prepare (GSource *source,
697                         gint    *timeout)
698 {
699         *timeout = -1;
700         return FALSE;
701 }
702
703 static gboolean
704 message_source_check (GSource *source)
705 {
706         return FALSE;
707 }
708
709 static gboolean
710 message_source_dispatch (GSource     *source,
711                          GSourceFunc  callback,
712                          gpointer     user_data)
713 {
714   SoupMessageSourceFunc func = (SoupMessageSourceFunc)callback;
715   SoupMessageSource *message_source = (SoupMessageSource *)source;
716
717   return (*func) (message_source->msg, user_data);
718 }
719
720 static void
721 message_source_finalize (GSource *source)
722 {
723   SoupMessageSource *message_source = (SoupMessageSource *)source;
724
725   g_object_unref (message_source->msg);
726 }
727
728 static gboolean
729 message_source_closure_callback (SoupMessage *msg,
730                                  gpointer     data)
731 {
732   GClosure *closure = data;
733
734   GValue param = G_VALUE_INIT;
735   GValue result_value = G_VALUE_INIT;
736   gboolean result;
737
738   g_value_init (&result_value, G_TYPE_BOOLEAN);
739
740   g_value_init (&param, SOUP_TYPE_MESSAGE);
741   g_value_set_object (&param, msg);
742
743   g_closure_invoke (closure, &result_value, 1, &param, NULL);
744
745   result = g_value_get_boolean (&result_value);
746   g_value_unset (&result_value);
747   g_value_unset (&param);
748
749   return result;
750 }
751
752 static GSourceFuncs message_source_funcs =
753 {
754   message_source_prepare,
755   message_source_check,
756   message_source_dispatch,
757   message_source_finalize,
758   (GSourceFunc)message_source_closure_callback,
759   (GSourceDummyMarshal)g_cclosure_marshal_generic,
760 };
761
762 GSource *
763 soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
764                             SoupMessageSourceFunc callback, gpointer user_data)
765 {
766         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
767         SoupMessageIOData *io = priv->io_data;
768         GSource *base_source, *source;
769         SoupMessageSource *message_source;
770
771         if (io && SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
772                 GPollableInputStream *istream;
773
774                 if (io->body_istream)
775                         istream = G_POLLABLE_INPUT_STREAM (io->body_istream);
776                 else
777                         istream = G_POLLABLE_INPUT_STREAM (io->istream);
778                 base_source = g_pollable_input_stream_create_source (istream, cancellable);
779         } else if (io && SOUP_MESSAGE_IO_STATE_POLLABLE (io->write_state)) {
780                 GPollableOutputStream *ostream;
781
782                 if (io->body_ostream)
783                         ostream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
784                 else
785                         ostream = G_POLLABLE_OUTPUT_STREAM (io->ostream);
786                 base_source = g_pollable_output_stream_create_source (ostream, cancellable);
787         } else
788                 base_source = g_timeout_source_new (0);
789
790         g_source_set_dummy_callback (base_source);
791         source = g_source_new (&message_source_funcs,
792                                sizeof (SoupMessageSource));
793         g_source_set_name (source, "SoupMessageSource");
794         message_source = (SoupMessageSource *)source;
795         message_source->msg = g_object_ref (msg);
796
797         g_source_add_child_source (source, base_source);
798         g_source_unref (base_source);
799         g_source_set_callback (source, (GSourceFunc) callback, user_data, NULL);
800         return source;
801 }
802
803 static gboolean
804 io_run_until (SoupMessage *msg,
805               SoupMessageIOState read_state, SoupMessageIOState write_state,
806               GCancellable *cancellable, GError **error)
807 {
808         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
809         SoupMessageIOData *io = priv->io_data;
810         gboolean progress = TRUE, done;
811
812         if (g_cancellable_set_error_if_cancelled (cancellable, error))
813                 return FALSE;
814         else if (!io) {
815                 g_set_error_literal (error, G_IO_ERROR,
816                                      G_IO_ERROR_CANCELLED,
817                                      _("Operation was cancelled"));
818                 return FALSE;
819         }
820
821         g_object_ref (msg);
822
823         while (progress && priv->io_data == io && !io->paused &&
824                (io->read_state < read_state || io->write_state < write_state)) {
825
826                 if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
827                         progress = io_read (msg, cancellable, error);
828                 else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
829                         progress = io_write (msg, cancellable, error);
830                 else
831                         progress = FALSE;
832         }
833
834         done = (priv->io_data == io &&
835                 io->read_state >= read_state &&
836                 io->write_state >= write_state);
837
838         g_object_unref (msg);
839         return done;
840 }
841
842 static gboolean
843 io_run (SoupMessage *msg, gpointer user_data)
844 {
845         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
846         SoupMessageIOData *io = priv->io_data;
847         GError *error = NULL;
848
849         if (io->io_source) {
850                 g_source_destroy (io->io_source);
851                 g_source_unref (io->io_source);
852                 io->io_source = NULL;
853         }
854
855         g_object_ref (msg);
856
857         if (io_run_until (msg,
858                           SOUP_MESSAGE_IO_STATE_DONE,
859                           SOUP_MESSAGE_IO_STATE_DONE,
860                           io->cancellable, &error)) {
861                 soup_message_io_finished (msg);
862         } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
863                 g_clear_error (&error);
864                 io->io_source = soup_message_io_get_source (msg, NULL, io_run, msg);
865                 g_source_attach (io->io_source, io->async_context);
866         } else if (error) {
867                 io_error (io->sock, msg, error);
868         }
869
870         g_object_unref (msg);
871         return FALSE;
872 }
873
874 gboolean
875 soup_message_io_run_until_write (SoupMessage *msg,
876                                  GCancellable *cancellable, GError **error)
877 {
878         return io_run_until (msg,
879                              SOUP_MESSAGE_IO_STATE_ANY,
880                              SOUP_MESSAGE_IO_STATE_BODY,
881                              cancellable, error);
882 }
883
884 gboolean
885 soup_message_io_run_until_read (SoupMessage *msg,
886                                 GCancellable *cancellable, GError **error)
887 {
888         return io_run_until (msg,
889                              SOUP_MESSAGE_IO_STATE_BODY,
890                              SOUP_MESSAGE_IO_STATE_ANY,
891                              cancellable, error);
892 }
893
894 gboolean
895 soup_message_io_run_until_finish (SoupMessage   *msg,
896                                   GCancellable  *cancellable,
897                                   GError       **error)
898 {
899         g_object_ref (msg);
900
901         if (!io_run_until (msg,
902                            SOUP_MESSAGE_IO_STATE_DONE,
903                            SOUP_MESSAGE_IO_STATE_DONE,
904                            cancellable, error))
905                 return FALSE;
906
907         soup_message_io_finished (msg);
908         g_object_unref (msg);
909         return TRUE;
910 }
911
912 static void
913 client_stream_eof (SoupClientInputStream *stream, gpointer user_data)
914 {
915         SoupMessage *msg = user_data;
916         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
917         SoupMessageIOData *io = priv->io_data;
918
919         if (io && io->read_state == SOUP_MESSAGE_IO_STATE_BODY)
920                 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
921 }
922
923 GInputStream *
924 soup_message_io_get_response_istream (SoupMessage  *msg,
925                                       GError      **error)
926 {
927         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
928         SoupMessageIOData *io = priv->io_data;
929         GInputStream *client_stream;
930
931         g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, NULL);
932
933         if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
934                 g_set_error_literal (error, SOUP_HTTP_ERROR,
935                                      msg->status_code, msg->reason_phrase);
936                 return NULL;
937         }
938
939         client_stream = soup_client_input_stream_new (io->body_istream, msg);
940         g_signal_connect (client_stream, "eof",
941                           G_CALLBACK (client_stream_eof), msg);
942
943         return client_stream;
944 }
945
946
947 static SoupMessageIOData *
948 new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
949              SoupMessageGetHeadersFn get_headers_cb,
950              SoupMessageParseHeadersFn parse_headers_cb,
951              gpointer header_data,
952              SoupMessageCompletionFn completion_cb,
953              gpointer completion_data)
954 {
955         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
956         SoupMessageIOData *io;
957         gboolean non_blocking, use_thread_context;
958
959         io = g_slice_new0 (SoupMessageIOData);
960         io->mode = mode;
961         io->get_headers_cb   = get_headers_cb;
962         io->parse_headers_cb = parse_headers_cb;
963         io->header_data      = header_data;
964         io->completion_cb    = completion_cb;
965         io->completion_data  = completion_data;
966
967         io->sock = g_object_ref (sock);
968         io->istream = SOUP_FILTER_INPUT_STREAM (soup_socket_get_input_stream (sock));
969         if (io->istream)
970                 g_object_add_weak_pointer (G_OBJECT (io->istream), (gpointer *)&io->istream);
971         io->ostream = soup_socket_get_output_stream (sock);
972         if (io->ostream)
973                 g_object_add_weak_pointer (G_OBJECT (io->ostream), (gpointer *)&io->ostream);
974
975         g_object_get (io->sock,
976                       SOUP_SOCKET_FLAG_NONBLOCKING, &non_blocking,
977                       SOUP_SOCKET_USE_THREAD_CONTEXT, &use_thread_context,
978                       NULL);
979         io->blocking = !non_blocking;
980
981         if (use_thread_context) {
982                 io->async_context = g_main_context_get_thread_default ();
983                 if (io->async_context)
984                         g_main_context_ref (io->async_context);
985         } else {
986                 g_object_get (io->sock,
987                               SOUP_SOCKET_ASYNC_CONTEXT, &io->async_context,
988                               NULL);
989         }
990
991         io->read_header_buf = g_byte_array_new ();
992         io->write_buf       = g_string_new (NULL);
993
994         io->read_state  = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
995         io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
996
997         if (priv->io_data)
998                 soup_message_io_cleanup (msg);
999         priv->io_data = io;
1000         return io;
1001 }
1002
1003 void
1004 soup_message_io_client (SoupMessageQueueItem *item,
1005                         SoupMessageGetHeadersFn get_headers_cb,
1006                         SoupMessageParseHeadersFn parse_headers_cb,
1007                         gpointer header_data,
1008                         SoupMessageCompletionFn completion_cb,
1009                         gpointer completion_data)
1010 {
1011         SoupMessageIOData *io;
1012         SoupSocket *sock = soup_connection_get_socket (item->conn);
1013
1014         io = new_iostate (item->msg, sock, SOUP_MESSAGE_IO_CLIENT,
1015                           get_headers_cb, parse_headers_cb, header_data,
1016                           completion_cb, completion_data);
1017
1018         io->item = item;
1019         soup_message_queue_item_ref (item);
1020         io->cancellable = item->cancellable;
1021
1022         io->read_body       = item->msg->response_body;
1023         io->write_body      = item->msg->request_body;
1024
1025         io->write_state     = SOUP_MESSAGE_IO_STATE_HEADERS;
1026         if (!item->new_api)
1027                 io_run (item->msg, NULL);
1028 }
1029
1030 void
1031 soup_message_io_server (SoupMessage *msg, SoupSocket *sock,
1032                         SoupMessageGetHeadersFn get_headers_cb,
1033                         SoupMessageParseHeadersFn parse_headers_cb,
1034                         gpointer header_data,
1035                         SoupMessageCompletionFn completion_cb,
1036                         gpointer completion_data)
1037 {
1038         SoupMessageIOData *io;
1039
1040         io = new_iostate (msg, sock, SOUP_MESSAGE_IO_SERVER,
1041                           get_headers_cb, parse_headers_cb, header_data,
1042                           completion_cb, completion_data);
1043
1044         io->read_body       = msg->request_body;
1045         io->write_body      = msg->response_body;
1046
1047         io->read_state      = SOUP_MESSAGE_IO_STATE_HEADERS;
1048         io_run (msg, NULL);
1049 }
1050
1051 void  
1052 soup_message_io_pause (SoupMessage *msg)
1053 {
1054         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1055         SoupMessageIOData *io = priv->io_data;
1056
1057         g_return_if_fail (io != NULL);
1058
1059         if (io->io_source) {
1060                 g_source_destroy (io->io_source);
1061                 g_source_unref (io->io_source);
1062                 io->io_source = NULL;
1063         }
1064
1065         if (io->unpause_source) {
1066                 g_source_destroy (io->unpause_source);
1067                 io->unpause_source = NULL;
1068         }
1069
1070         io->paused = TRUE;
1071 }
1072
1073 static gboolean
1074 io_unpause_internal (gpointer msg)
1075 {
1076         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1077         SoupMessageIOData *io = priv->io_data;
1078
1079         g_return_val_if_fail (io != NULL, FALSE);
1080         io->unpause_source = NULL;
1081         io->paused = FALSE;
1082
1083         if (io->io_source)
1084                 return FALSE;
1085
1086         io_run (msg, NULL);
1087         return FALSE;
1088 }
1089
1090 void
1091 soup_message_io_unpause (SoupMessage *msg)
1092 {
1093         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1094         SoupMessageIOData *io = priv->io_data;
1095
1096         g_return_if_fail (io != NULL);
1097
1098         if (!io->blocking) {
1099                 if (!io->unpause_source) {
1100                         io->unpause_source = soup_add_completion (
1101                                 io->async_context, io_unpause_internal, msg);
1102                 }
1103         } else
1104                 io_unpause_internal (msg);
1105 }
1106
1107 /**
1108  * soup_message_io_in_progress:
1109  * @msg: a #SoupMessage
1110  *
1111  * Tests whether or not I/O is currently in progress on @msg.
1112  *
1113  * Return value: whether or not I/O is currently in progress.
1114  **/
1115 gboolean
1116 soup_message_io_in_progress (SoupMessage *msg)
1117 {
1118         SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
1119
1120         return priv->io_data != NULL;
1121 }